Member since: 08-01-2021
Posts: 52
Kudos Received: 10
Solutions: 7
My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
| | 2940 | 11-18-2022 09:06 AM |
| | 4154 | 11-15-2022 05:46 PM |
| | 2973 | 10-12-2022 03:18 AM |
| | 2133 | 10-11-2022 08:52 AM |
| | 5226 | 10-08-2022 08:23 AM |
11-15-2022
05:46 PM
1 Kudo
For your attempt to use only a CSVReader, have you configured the "Timestamp Format" property in the controller service to describe a format that includes microseconds? I have encountered a very similar issue in the past. In my case, I was writing Avro files using PutDatabaseRecord, and those files already had their schema embedded. The schema itself was incorrect and described date fields as strings - as such, when I wrote to my DB (which had a date type with millisecond precision), the milliseconds would get cut off, and presumably some default parsing allowed the rest of the date to be written correctly. The solution I found was to turn my Avro files into CSV, then use a CSVReader in my PutDatabaseRecord processor. In my case, configuring the timestamp format to include milliseconds (.SSS at the end) allowed the processor to write the records to the DB correctly without any data loss.
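To make that concrete (just a sketch; the exact property name can vary slightly between NiFi versions), the relevant part of the CSVReader controller service would be something like:
Timestamp Format: yyyy-MM-dd HH:mm:ss.SSS
For microseconds you would extend the fractional part (e.g. yyyy-MM-dd HH:mm:ss.SSSSSS), assuming your NiFi version's date parsing accepts six fractional digits; if it does not, .SSS for milliseconds is the closest you can get.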
10-21-2022
07:54 AM
In the end, I never found a perfect solution and just opted to use attributes and RouteOnAttribute. If you find that ValidateXML can actually perform all the checks you need, it shouldn't be too bad to use a ConvertRecord processor to transform JSON to XML for the validation (or perhaps just use XML instead of JSON in the first place, if that fits your use case).
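Just as a sketch (these are the standard NiFi controller services, but the exact configuration depends on your schemas), the conversion step could be a ConvertRecord processor configured as:
Record Reader: JsonTreeReader
Record Writer: XMLRecordSetWriter
The converted flowfile would then go into ValidateXML against your schema file.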
10-14-2022
07:30 AM
I think I may not have understood your requirement correctly, so please correct me if I'm wrong. Currently, you have managed to transform your data into this format:
{
"ret_code":0,
"ret_msg":"OK",
"ext_code":"",
"ext_info":"",
"result":[
{
"id":"184310970522",
"symbol":"BTCUSDT",
"price":19241.5,
"qty":1.896,
"side":"Buy",
"time":"2022-10-11T12:26:21.000Z",
"trade_time_ms":1665491181666,
"is_block_trade":false
},
{
"id":"184310967802",
"symbol":"BTCUSDT",
"price":19241,
"qty":0.002,
"side":"Sell",
"time":"2022-10-11T12:26:21.000Z",
"trade_time_ms":1665491181604,
"is_block_trade":false
}
],
"time_now":"1665491183.183636"
}
where you have a single JSON record with a field "result" that is an array of results. I understood your need as transforming this single JSON with two results into two separate records with one result each, where the rest of their fields are identical (ret_code, ret_msg, ext_code, etc.), i.e.:
[
{
"ret_code":0,
"ret_msg":"OK",
"ext_code":"",
"ext_info":"",
"result":{
"id":"184310970522",
"symbol":"BTCUSDT",
"price":19241.5,
"qty":1.896,
"side":"Buy",
"time":"2022-10-11T12:26:21.000Z",
"trade_time_ms":1665491181666,
"is_block_trade":false
},
"time_now":"1665491183.183636"
},
{
"ret_code":0,
"ret_msg":"OK",
"ext_code":"",
"ext_info":"",
"result":{
"id":"184310967802",
"symbol":"BTCUSDT",
"price":19241,
"qty":0.002,
"side":"Sell",
"time":"2022-10-11T12:26:21.000Z",
"trade_time_ms":1665491181604,
"is_block_trade":false
},
"time_now":"1665491183.183636"
}
]
If this is not what you need, I'd like you to post an example of what your data needs to look like before inserting it into your DB. I can only help with transforming it to the right format if I'm sure of what is required 🙂
10-14-2022
07:10 AM
I'm not too sure what you mean, so let me see if I got it right. You managed to get your data into this format:
{
"ret_code":0,
"ret_msg":"OK",
"ext_code":"",
"ext_info":"",
"result":[
{
"id":"184310970522",
"symbol":"BTCUSDT",
"price":19241.5,
"qty":1.896,
"side":"Buy",
"time":"2022-10-11T12:26:21.000Z",
"trade_time_ms":1665491181666,
"is_block_trade":false
},
{
"id":"184310967802",
"symbol":"BTCUSDT",
"price":19241,
"qty":0.002,
"side":"Sell",
"time":"2022-10-11T12:26:21.000Z",
"trade_time_ms":1665491181604,
"is_block_trade":false
}
],
"time_now":"1665491183.183636"
}
Now you want to insert this data into your DB. You mentioned that you need to split the data before you can write it, which I assumed meant you want to transform the above example into:
[
{
"ret_code":0,
"ret_msg":"OK",
"ext_code":"",
"ext_info":"",
"result":{
"id":"184310970522",
"symbol":"BTCUSDT",
"price":19241.5,
"qty":1.896,
"side":"Buy",
"time":"2022-10-11T12:26:21.000Z",
"trade_time_ms":1665491181666,
"is_block_trade":false
},
"time_now":"1665491183.183636"
},
{
"ret_code":0,
"ret_msg":"OK",
"ext_code":"",
"ext_info":"",
"result":{
"id":"184310967802",
"symbol":"BTCUSDT",
"price":19241,
"qty":0.002,
"side":"Sell",
"time":"2022-10-11T12:26:21.000Z",
"trade_time_ms":1665491181604,
"is_block_trade":false
},
"time_now":"1665491183.183636"
}
]
This is an array of two records, both of which have identical ret_code, ret_msg, ext_code, ext_info & time_now fields, except that instead of the result field being an array of two results, each of the two records has only one result inside it. Could you give a better example of what the data you want to insert into your DB should look like? I can help with figuring out how to transform it into a valid format, but first I need a better understanding of what your data looks like currently and what it needs to turn into 🙂
10-12-2022
03:18 AM
1 Kudo
Perhaps JoltTransformRecord could help you. The Jolt "shift" transformation lets you rename fields - in a CSV file's case, headers. If you know all your replacements ahead of time, you could define a transformation like:
[
  {
    "operation": "shift",
    "spec": {
      "kgg": "g",
      "c34": "c",
      "*": "&"
    }
  }
]
Note the last shift, "*": "&", which carries over the rest of the headers you didn't specifically rename.
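To illustrate with made-up data (only "kgg" and "c34" come from the spec above; the third field is just an example), an input record like
{ "kgg": 1, "c34": 2, "price": 3 }
would come out as
{ "g": 1, "c": 2, "price": 3 }
since the literal matches rename "kgg" and "c34", while "price" falls through the "*": "&" rule unchanged.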
10-11-2022
08:52 AM
1 Kudo
Perhaps you can use the CalculateRecordStats processor to count how many records of each situation code you have in a given file. Once those counts are put in your attributes, you can try a couple of different methods to decide what to do. For example, you could use RouteOnAttribute to check whether the attribute sit_code.pod > 0 and, if so, route to extract_one_pod_element, where you could use QueryRecord to take out exactly one POD element; otherwise route to LIV, AAR, etc.
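As a rough sketch (the exact attribute name depends on how you configure CalculateRecordStats; sit_code.pod below simply follows the naming in this example), the RouteOnAttribute side could be a single dynamic property such as:
extract_one_pod_element: ${sit_code.pod:gt(0)}
with Routing Strategy set to "Route to Property name", so files containing at least one POD record follow that relationship and everything else goes to unmatched.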
10-11-2022
07:56 AM
I believe ForkRecord is exactly the solution you are looking for. Read here: https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-standard-nar/1.17.0/org.apache.nifi.processors.standard.ForkRecord/additionalDetails.html The processor will allow you to split one JSON that has a nested array field "result" into many JSONs with the same parent fields but only one result per JSON. Hope this helps!
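As a rough configuration sketch (the reader/writer services are up to you; the property names are the ones from the ForkRecord documentation linked above), it could look something like:
Record Reader: JsonTreeReader
Record Writer: JsonRecordSetWriter
Mode: Split
result (dynamic property): /result
In Split mode each output record keeps the parent fields and a "result" array containing a single element; if you would rather have the element's fields flattened into the parent record, the documentation describes Extract mode together with "Include Parent Fields" as the alternative.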
10-11-2022
07:49 AM
I believe QueryRecord could take you in the right direction. You could define properties to route files based on their situation code, for example:
Key: POD
Value: SELECT * FROM flowfile WHERE situation_code = 'POD' LIMIT 1
This would route one element of your input array where the code is POD to the new relationship "POD" (same as the dynamic property name). This doesn't exactly match your case of retrieving an element only if higher-priority codes weren't found, but perhaps you could use this example to get closer to a solution. Good luck!
10-10-2022
10:43 PM
Perhaps the connection you set between PutHDFS and UpdateHive3Table doesn't send the original file? I am a bit confused by your flow in general... why convert to Avro? Where are you reading files from? Why do you use PutHDFS and then UpdateHive3Table instead of just using PutHiveQL?
10-10-2022
10:36 PM
@morti To add to this: this is essentially NiFi's Record API (or at least very similar to it), where record-based processors require a RecordReader/RecordWriter controller service in which the schema for the incoming/outgoing files is defined. All of these controller services can get their schema from a schema registry, have it configured hard-coded, try to infer it, or simply rely on a schema that was attached to the flowfile earlier in the flow. I think it's worth looking into Records in NiFi - they're designed specifically for well-defined data and use cases similar to what you described.
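As a small illustration (JsonTreeReader is the standard JSON reader; the schema below is just an example), a reader configured with an explicit schema might look like:
Schema Access Strategy: Use 'Schema Text' Property
Schema Text:
{
  "type": "record",
  "name": "example",
  "fields": [
    { "name": "id", "type": "string" },
    { "name": "price", "type": "double" }
  ]
}
Switching Schema Access Strategy to "Infer Schema", or to one of the schema-registry-based options, gives you the other behaviors mentioned above.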