Member since: 08-01-2021
Posts: 49
Kudos Received: 10
Solutions: 7
My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
| | 2794 | 11-18-2022 09:06 AM |
| | 3959 | 11-15-2022 05:46 PM |
| | 2836 | 10-12-2022 03:18 AM |
| | 2022 | 10-11-2022 08:52 AM |
| | 5013 | 10-08-2022 08:23 AM |
10-15-2022
07:09 PM
Hi Green, really appreciate the assistance; if anything it's probably me not clearly articulating what I'm trying to achieve, two weeks in with NiFi.

The raw content from the API is received as follows:

```json
{
  "ret_code" : 0,
  "ret_msg" : "OK",
  "ext_code" : "",
  "ext_info" : "",
  "result" : [ {
    "id" : "187715622692",
    "symbol" : "BTCUSDT",
    "price" : 19109.5,
    "qty" : 0.004,
    "side" : "Buy",
    "time" : "2022-10-16T01:25:31.000Z",
    "trade_time_ms" : 1665883531832,
    "is_block_trade" : false
  }, {
    "id" : "187715618142",
    "symbol" : "BTCUSDT",
    "price" : 19109.5,
    "qty" : 0.882,
    "side" : "Buy",
    "time" : "2022-10-16T01:25:31.000Z",
    "trade_time_ms" : 1665883531123,
    "is_block_trade" : false
  }, {
    "id" : "187715614682",
    "symbol" : "BTCUSDT",
    "price" : 19109.5,
    "qty" : 0.001,
    "side" : "Buy",
    "time" : "2022-10-16T01:25:30.000Z",
    "trade_time_ms" : 1665883530414,
    "is_block_trade" : false
  } ]
}
```

Likely I was trying to over-engineer with the Jolt, and ended up with:

```json
{
  "id" : [ 1.84310970522E11, 1.84310967802E11 ],
  "symbol" : [ "BTCUSDT", "BTCUSDT" ],
  "price" : [ 19241.5, 19241.0 ],
  "qty" : [ 1.896, 0.002 ],
  "side" : [ "Buy", "Sell" ],
  "time" : [ "2022-10-11T12:26:21.000Z", "2022-10-11T12:26:21.000Z" ],
  "trade_time_ms" : [ 1.665491181666E12, 1.665491181604E12 ],
  "is_block_trade" : [ false, false ]
}
```

Whereas if I just used a JSON split, it gave the following:

```json
{
  "id" : "187715622692",
  "symbol" : "BTCUSDT",
  "price" : 19109.5,
  "qty" : 0.004,
  "side" : "Buy",
  "time" : "2022-10-16T01:25:31.000Z",
  "trade_time_ms" : 1665883531832,
  "is_block_trade" : false
}
```

That format was accepted by PutDatabaseRecord, where the issue moved to duplicate IDs being written. Setting ID as the primary key seems to have stopped the duplicates being written, but is probably not the cleanest solution...

The other API I'm dealing with is where I think the solution you suggested would be the ideal use case:

```json
{
  "lastUpdateId" : 579582125552,
  "E" : 1665885750096,
  "T" : 1665885750088,
  "symbol" : "BTCUSD_PERP",
  "pair" : "BTCUSD",
  "bids" : [ [ "19139.5", "8824" ], [ "19139.4", "757" ] ],
  "asks" : [ [ "19139.6", "3165" ], [ "19139.7", "812" ] ]
}
```

The requirement would be to extract, for example,

```json
"bids" : [ [ "19139.5", "8824" ], [ "19139.4", "757" ] ]
```

into two separate records, maintaining the fields in each.
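For the bids case, a minimal JoltTransformJSON shift spec along these lines might get each price level into its own object; the price/qty field names are my own labels, not anything defined by the API:

```json
[
  {
    "operation": "shift",
    "spec": {
      "bids": {
        "*": {
          "0": "[&1].price",
          "1": "[&1].qty"
        }
      }
    }
  }
]
```

That should yield `[ { "price" : "19139.5", "qty" : "8824" }, { "price" : "19139.4", "qty" : "757" } ]`, which a SplitJson on `$.*` could then break into one record per level; the same pattern would apply to asks.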
10-14-2022
02:07 AM
Thanks, Green! Your idea helped me solve the problem.
10-10-2022
10:43 PM
Perhaps the connection you set between PutHDFS and UpdateHive3Table doesn't send the original file? I am a bit confused by your flow in general: why convert to Avro? Where are you reading files from? Why PutHDFS and then UpdateHive3Table instead of just using PutHiveQL?
10-10-2022
10:36 PM
@morti To add to this, this is essentially (if not just very similar) to nifi's Record API where Record processors require a RecordReader/Writer controller service where the schema for the incoming/outgoing files is defined. All these processors can get their schema from some registry, or have it configured hard-coded, or try to infer the schema, or simply rely on the schema that was given to the flowfile earlier in the flow. I think it's worth looking into Records in nifi, they're designed specifically for well-defined data and use-cases similar to what your described
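As a rough illustration of the hard-coded option: record readers such as JsonTreeReader let you set the Schema Access Strategy to use the "Schema Text" property and paste an Avro schema directly, something like this (the field names here are hypothetical):

```json
{
  "type" : "record",
  "name" : "event",
  "fields" : [
    { "name" : "id",        "type" : "string" },
    { "name" : "timestamp", "type" : "long" },
    { "name" : "value",     "type" : "double" }
  ]
}
```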
10-08-2022
10:15 PM
1 Kudo
The ReplaceText processor has a Prepend/Append mode which might be of help.
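If it fits your case, the configuration would look roughly like this (the replacement value is just a placeholder); with Evaluation Mode set to "Entire text" the value should land once at the start of the content, while "Line-by-Line" would prepend it to every line:

```
Replacement Strategy : Prepend
Replacement Value    : some header text
Evaluation Mode      : Entire text
```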
10-08-2022
08:23 AM
1 Kudo
Hey @Fredi, I believe the answer to your problem is the UpdateRecord processor. UpdateRecord allows you to directly manipulate the fields in your file content. You add dynamic properties to the processor where the key of the property is /<field> (so in your case, /api_value), and in the value of this dynamic property you write the logic that determines what value to insert into api_value.

The processor has a property called "Replacement Value Strategy", which defines how the value of the dynamic property will be read. If you set this to "Record Path Value", you can give a path to a different field in your file (url_value!). I can't test this right now because I'm not at my office, and I'm not entirely sure whether you can manipulate the result after giving a record path (to extract the api_value from the evaluated url_value).

Regardless, I'm just about 100% sure this can be done with two processors (see the sketch below): one EvaluateJsonPath to extract the url_value into an attribute, then an UpdateRecord that uses the "Literal Value" replacement strategy. With this strategy, you can just add a property with key /api_value and value ${url_value} (or whatever attribute name you gave the extracted url_value), and once you can access url_value with the expression language (via ${url_value}) you can use all the available functions to manipulate expression language variables.

Here's an article with a couple of examples on UpdateRecord: https://community.cloudera.com/t5/Community-Articles/Update-the-Contents-of-FlowFile-by-using-UpdateRecord/ta-p/248267 (I noticed the article uses some record-path functions like "replaceRegex", so I believe there might be a way to use those and limit the entire issue to just one UpdateRecord processor! Sadly, I'm not too familiar with these myself and this was the first time I've seen them.)

And here's the expression language documentation: https://nifi.apache.org/docs/nifi-docs/html/expression-language-guide.html You can see there are lots of useful functions to extract your api_value once you have ${url_value} as an attribute variable, for example "substring", "find"/"replace", "ifElse", etc., all of which you can try to use to ensure only the api_value is left in the end.

Hope this helps! I'm sure ReplaceText and possibly JoltTransform could provide alternate solutions, but I believe UpdateRecord is the cleanest solution for this and truly makes use of the processor's abilities. If you struggle to use it correctly, you can reply with an example JSON and expected output and I'll try to write down the flow when I have time.
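To make the two-processor version concrete, here is a rough sketch of the configuration; the JSON path and the substringAfterLast('/') extraction are assumptions about where the api_value sits inside url_value, so adjust them to your actual data:

```
EvaluateJsonPath
  Destination : flowfile-attribute
  url_value   : $.url_value          (hypothetical path to the URL field)

UpdateRecord  (with a JsonTreeReader / JsonRecordSetWriter)
  Replacement Value Strategy : Literal Value
  /api_value                 : ${url_value:substringAfterLast('/')}
```

substringAfterLast('/') is just one example; any of the expression language functions mentioned above would slot into the same place.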
10-06-2022
11:42 AM
Brilliant! That does the trick. Thanks!
10-06-2022
05:11 AM
@Green_ wrote: I've run into similar issues and haven't reached a clear conclusion either. It seems you have very high heap usage, which might potentially be relevant.

Only once we stop NiFi does the heap usage reduce. There is no data in the queue, but the heap stays high; we are unable to understand why it is not reducing automatically.
07-12-2022
03:08 PM
@Chakkara As far as I remember, the distributed cache does not guarantee consistency. You could use HBase or HDFS to store the success/failure status of the processors for the downstream application. Once you have saved the success and failure statuses in HBase, you can retrieve them with the FetchHBaseRow processor using the row ID. Build a REST API NiFi flow to pull the status from HBase, for example: HandleHTTPRequest --> FetchHBaseRow --> HandleHTTPResponse. You can then call the HTTP API (request and response) via shell script/curl and call the script from Control-M, as in the sketch below.
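For illustration, assuming HandleHTTPRequest listens on port 9091 and the row ID arrives as a query parameter (both hypothetical), the script called from Control-M could be as simple as:

```bash
#!/bin/sh
# Hypothetical host, port and parameter name; match them to your
# HandleHTTPRequest configuration. FetchHBaseRow looks up the status
# row and HandleHTTPResponse returns it as the HTTP response body.
curl -s "http://nifi-host:9091/status?rowId=flow_20221012_001"
```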