NiFi SplitJson - JoltTransformJSON

New Contributor

Hi all,

I've been struggling to write data to PostgreSQL. If I accept a single line, I can write it, but the data is a real-time feed and I need to ingest it at a higher rate.

Using Jolt, I have got the fields correct:

{
   "ret_code" : 0,
   "ret_msg" : "OK",
   "ext_code" : "",
   "ext_info" : "",
   "result" : [ {
      "id" : "184310970522",
      "symbol" : "BTCUSDT",
      "price" : 19241.5,
      "qty" : 1.896,
      "side" : "Buy",
      "time" : "2022-10-11T12:26:21.000Z",
      "trade_time_ms" : 1665491181666,
      "is_block_trade" : false
   }, {
      "id" : "184310967802",
      "symbol" : "BTCUSDT",
      "price" : 19241,
      "qty" : 0.002,
      "side" : "Sell",
      "time" : "2022-10-11T12:26:21.000Z",
      "trade_time_ms" : 1665491181604,
      "is_block_trade" : false
   } ],
   "time_now" : "1665491183.183636"
}

The issue is that I need to split the JSON to write it to the DB.

I've ended up with the following:

{
   "id" : [ 1.84310970522E11, 1.84310967802E11 ],
   "symbol" : [ "BTCUSDT", "BTCUSDT" ],
   "price" : [ 19241.5, 19241.0 ],
   "qty" : [ 1.896, 0.002 ],
   "side" : [ "Buy", "Sell" ],
   "time" : [ "2022-10-11T12:26:21.000Z", "2022-10-11T12:26:21.000Z" ],
   "trade_time_ms" : [ 1.665491181666E12, 1.665491181604E12 ],
   "is_block_trade" : [ false, false ]
}
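The Jolt output above is column-oriented (one array per field), whereas a row insert needs one object per trade. Purely as an illustration outside NiFi, the hypothetical Python helper `columns_to_rows` shows the re-pairing that would be needed to get rows back. It also shows that `id` and `trade_time_ms` have become floats in exponent form, so they would need converting back; this is one reason splitting the original `result` array is usually the simpler route.

```python
import json

# Column-oriented output of the Jolt transform from the question (one array per field).
COLUMNAR = json.loads("""
{
  "id" : [ 1.84310970522E11, 1.84310967802E11 ],
  "symbol" : [ "BTCUSDT", "BTCUSDT" ],
  "price" : [ 19241.5, 19241.0 ],
  "qty" : [ 1.896, 0.002 ],
  "side" : [ "Buy", "Sell" ],
  "time" : [ "2022-10-11T12:26:21.000Z", "2022-10-11T12:26:21.000Z" ],
  "trade_time_ms" : [ 1.665491181666E12, 1.665491181604E12 ],
  "is_block_trade" : [ false, false ]
}
""")


def columns_to_rows(columns):
    """Hypothetical helper: turn {"f": [a, b], ...} into [{"f": a, ...}, {"f": b, ...}]."""
    lengths = {len(values) for values in columns.values()}
    if len(lengths) > 1:
        raise ValueError("every field array must have the same length")
    keys = list(columns)
    # zip(*...) walks all field arrays in lockstep, yielding one tuple per row.
    return [dict(zip(keys, row)) for row in zip(*columns.values())]


rows = columns_to_rows(COLUMNAR)
for row in rows:
    # These floats hold exact integers (values below 2**53), so int() recovers
    # the original numbers; the id is turned back into a string as in the API.
    row["id"] = str(int(row["id"]))
    row["trade_time_ms"] = int(row["trade_time_ms"])

for row in rows:
    print(json.dumps(row))
```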

But I am unable to write the arrays to the DB. I'm relatively new at this and have tried all the options suggested in the forum.

Any guidance would be appreciated.

Expert Contributor

I believe ForkRecord is exactly the solution you are looking for. Read here:

https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-standard-nar/1.17.0/org.apach...

The processor will allow you to split one JSON record that has a nested array field "result" into many JSON records with the same parent fields but only one result each.
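As a rough illustration of the behaviour described above (not ForkRecord's actual implementation, which works on NiFi records through a Record Reader/Writer and has its own Mode and Include Parent Fields properties, see the documentation linked above), the hypothetical Python function `fork` emits one record per element of the `result` array and copies the parent fields into each.

```python
import copy
import json


def fork(record, array_field, include_parent_fields=True):
    """Hypothetical helper: return one output record per element of record[array_field].

    With include_parent_fields=True, every output keeps the other top-level
    fields and holds a single element in place of the array; otherwise only
    the element itself is returned.
    """
    forked = []
    for element in record[array_field]:
        if include_parent_fields:
            # Rebuild the dict in the original key order, swapping in one element.
            child = {
                key: copy.deepcopy(element if key == array_field else value)
                for key, value in record.items()
            }
        else:
            child = copy.deepcopy(element)
        forked.append(child)
    return forked


doc = {
    "ret_code": 0,
    "ret_msg": "OK",
    "ext_code": "",
    "ext_info": "",
    "result": [
        {"id": "184310970522", "symbol": "BTCUSDT", "price": 19241.5, "qty": 1.896,
         "side": "Buy", "time": "2022-10-11T12:26:21.000Z",
         "trade_time_ms": 1665491181666, "is_block_trade": False},
        {"id": "184310967802", "symbol": "BTCUSDT", "price": 19241, "qty": 0.002,
         "side": "Sell", "time": "2022-10-11T12:26:21.000Z",
         "trade_time_ms": 1665491181604, "is_block_trade": False},
    ],
    "time_now": "1665491183.183636",
}

for child in fork(doc, "result"):
    print(json.dumps(child))
```

Calling `fork(doc, "result", include_parent_fields=False)` returns just the elements of `result`, which is roughly the shape SplitJson produces for a JsonPath Expression of `$.result`.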

Hope this helps! 

New Contributor

Hi Green, I appreciate the response. I'm working through testing options based on your suggestion.

Looking at the documentation, the output seems to be in line with the schema of the data prior to the JoltTransform, as shown below. Apart from the parent fields, isn't that really the same output I'd get by removing the data before "result" in the Jolt transform?

{
   "ret_code" : 0,
   "ret_msg" : "OK",
   "ext_code" : "",
   "ext_info" : "",
   "result" : [ {
      "id" : "186781027962",
      "symbol" : "BTCUSDT",
      "price" : 19699.5,
      "qty" : 0.279,
      "side" : "Sell",
      "time" : "2022-10-14T13:29:58.000Z",
      "trade_time_ms" : 1665754198616,
      "is_block_trade" : false
   }, {
      "id" : "186781021442",
      "symbol" : "BTCUSDT",
      "price" : 19700,
      "qty" : 0.003,
      "side" : "Buy",
      "time" : "2022-10-14T13:29:58.000Z",
      "trade_time_ms" : 1665754198129,
      "is_block_trade" : false
   } ],
   "time_now" : "1665754200.378419"
}

Expert Contributor

I'm not too sure what you mean, so let me see if I got it right.

You managed to get your data into the format of:

{
   "ret_code":0,
   "ret_msg":"OK",
   "ext_code":"",
   "ext_info":"",
   "result":[
      {
         "id":"184310970522",
         "symbol":"BTCUSDT",
         "price":19241.5,
         "qty":1.896,
         "side":"Buy",
         "time":"2022-10-11T12:26:21.000Z",
         "trade_time_ms":1665491181666,
         "is_block_trade":false
      },
      {
         "id":"184310967802",
         "symbol":"BTCUSDT",
         "price":19241,
         "qty":0.002,
         "side":"Sell",
         "time":"2022-10-11T12:26:21.000Z",
         "trade_time_ms":1665491181604,
         "is_block_trade":false
      }
   ],
   "time_now":"1665491183.183636"
}

And now you want to insert this data into your DB. Then, you mentioned how you need to split the data before you can write, which I assumed meant you want to transform the above example into:

[
   {
      "ret_code":0,
      "ret_msg":"OK",
      "ext_code":"",
      "ext_info":"",
      "result":{
         "id":"184310970522",
         "symbol":"BTCUSDT",
         "price":19241.5,
         "qty":1.896,
         "side":"Buy",
         "time":"2022-10-11T12:26:21.000Z",
         "trade_time_ms":1665491181666,
         "is_block_trade":false
      },
      "time_now":"1665491183.183636"
   },
   {
      "ret_code":0,
      "ret_msg":"OK",
      "ext_code":"",
      "ext_info":"",
      "result":{
         "id":"184310967802",
         "symbol":"BTCUSDT",
         "price":19241,
         "qty":0.002,
         "side":"Sell",
         "time":"2022-10-11T12:26:21.000Z",
         "trade_time_ms":1665491181604,
         "is_block_trade":false
      },
      "time_now":"1665491183.183636"
   }
]

That is an array of two records, both of which have identical ret_code, ret_msg, ext_code, ext_info and time_now fields, except that instead of the result field being an array of two results, each record now has only one result inside it.

Could you give a better example of what the data you want to insert into your DB should look like? I can help figure out how to transform it into a valid format, but first I need a better understanding of what your data looks like currently and what it needs to turn into 🙂

Expert Contributor

I think I may not have understood your requirement correctly so please correct me if I'm wrong.

Currently, you have managed to transform your data into the format of:

{
   "ret_code":0,
   "ret_msg":"OK",
   "ext_code":"",
   "ext_info":"",
   "result":[
      {
         "id":"184310970522",
         "symbol":"BTCUSDT",
         "price":19241.5,
         "qty":1.896,
         "side":"Buy",
         "time":"2022-10-11T12:26:21.000Z",
         "trade_time_ms":1665491181666,
         "is_block_trade":false
      },
      {
         "id":"184310967802",
         "symbol":"BTCUSDT",
         "price":19241,
         "qty":0.002,
         "side":"Sell",
         "time":"2022-10-11T12:26:21.000Z",
         "trade_time_ms":1665491181604,
         "is_block_trade":false
      }
   ],
   "time_now":"1665491183.183636"
}

That is, a single JSON record with a field "result" that is an array of results.

I understood your need as transforming this single JSON with two results into two separate records with one result each, where the rest of their fields are identical (ret_code, ret_msg, ext_code, etc.), i.e.:

[
   {
      "ret_code":0,
      "ret_msg":"OK",
      "ext_code":"",
      "ext_info":"",
      "result":{
         "id":"184310970522",
         "symbol":"BTCUSDT",
         "price":19241.5,
         "qty":1.896,
         "side":"Buy",
         "time":"2022-10-11T12:26:21.000Z",
         "trade_time_ms":1665491181666,
         "is_block_trade":false
      },
      "time_now":"1665491183.183636"
   },
   {
      "ret_code":0,
      "ret_msg":"OK",
      "ext_code":"",
      "ext_info":"",
      "result":{
         "id":"184310967802",
         "symbol":"BTCUSDT",
         "price":19241,
         "qty":0.002,
         "side":"Sell",
         "time":"2022-10-11T12:26:21.000Z",
         "trade_time_ms":1665491181604,
         "is_block_trade":false
      },
      "time_now":"1665491183.183636"
   }
]

If this is not what you need, please post an example of what your data needs to look like before inserting it into your DB. I can only help transform it into the right format once I'm sure of what is required 🙂

New Contributor

Hi Green,

I really appreciate the assistance. If anything, it's probably me not clearly articulating what I'm trying to achieve; I'm two weeks in with NiFi.

The raw content from the API is received as follows:

{
   "ret_code" : 0,
   "ret_msg" : "OK",
   "ext_code" : "",
   "ext_info" : "",
   "result" : [ {
      "id" : "187715622692",
      "symbol" : "BTCUSDT",
      "price" : 19109.5,
      "qty" : 0.004,
      "side" : "Buy",
      "time" : "2022-10-16T01:25:31.000Z",
      "trade_time_ms" : 1665883531832,
      "is_block_trade" : false
   }, {
      "id" : "187715618142",
      "symbol" : "BTCUSDT",
      "price" : 19109.5,
      "qty" : 0.882,
      "side" : "Buy",
      "time" : "2022-10-16T01:25:31.000Z",
      "trade_time_ms" : 1665883531123,
      "is_block_trade" : false
   }, {
      "id" : "187715614682",
      "symbol" : "BTCUSDT",
      "price" : 19109.5,
      "qty" : 0.001,
      "side" : "Buy",
      "time" : "2022-10-16T01:25:30.000Z",
      "trade_time_ms" : 1665883530414,
      "is_block_trade" : false
...

I was likely trying to over-engineer with Jolt, and ended up with:

{
   "id" : [ 1.84310970522E11, 1.84310967802E11 ],
   "symbol" : [ "BTCUSDT", "BTCUSDT" ],
   "price" : [ 19241.5, 19241.0 ],
   "qty" : [ 1.896, 0.002 ],
   "side" : [ "Buy", "Sell" ],
   "time" : [ "2022-10-11T12:26:21.000Z", "2022-10-11T12:26:21.000Z" ],
   "trade_time_ms" : [ 1.665491181666E12, 1.665491181604E12 ],
   "is_block_trade" : [ false, false ]
}

whereas if I just use SplitJson, I get the following:

{
   "id" : "187715622692",
   "symbol" : "BTCUSDT",
   "price" : 19109.5,
   "qty" : 0.004,
   "side" : "Buy",
   "time" : "2022-10-16T01:25:31.000Z",
   "trade_time_ms" : 1665883531832,
   "is_block_trade" : false
}

The format above was accepted by PutDatabaseRecord, but then the issue became duplicate IDs being written. Setting id as the primary key seems to have stopped the duplicates from being written, but it is probably not the cleanest solution.
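A primary key is a reasonable guard. One likely cause of the duplicates (an assumption, not confirmed in this thread) is that consecutive polls of the trades endpoint return overlapping windows of trades. In that case, rather than letting the primary-key violation fail the insert, the statement can skip conflicting rows: PostgreSQL supports `INSERT ... ON CONFLICT (id) DO NOTHING`, and depending on your NiFi version, PutDatabaseRecord's Statement Type may offer an INSERT_IGNORE option aimed at this behaviour (check the processor documentation for your version). The sketch below demonstrates the idea with Python's built-in sqlite3, since SQLite 3.24+ accepts the same clause, so it runs without a PostgreSQL server; the `trades` table, its columns and the split into two polls are hypothetical.

```python
import sqlite3

# Two consecutive (hypothetical) polls whose windows overlap on one trade id.
POLL_1 = [("187715622692", 19109.5, 0.004), ("187715618142", 19109.5, 0.882)]
POLL_2 = [("187715618142", 19109.5, 0.882), ("187715614682", 19109.5, 0.001)]

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE trades (id TEXT PRIMARY KEY, price REAL NOT NULL, qty REAL NOT NULL)"
)

for poll in (POLL_1, POLL_2):
    # Rows whose id already exists are skipped instead of raising an
    # IntegrityError, so a single duplicate does not abort the rest of the batch.
    conn.executemany(
        "INSERT INTO trades (id, price, qty) VALUES (?, ?, ?) "
        "ON CONFLICT (id) DO NOTHING",
        poll,
    )
conn.commit()

count = conn.execute("SELECT COUNT(*) FROM trades").fetchone()[0]
print(count)
```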

The other API I'm dealing with is where I think the solution you suggested would be the ideal use case:

{
   "lastUpdateId" : 579582125552,
   "E" : 1665885750096,
   "T" : 1665885750088,
   "symbol" : "BTCUSD_PERP",
   "pair" : "BTCUSD",
   "bids" : [ [ "19139.5", "8824" ], [ "19139.4", "757" ] ],
   "asks" : [ [ "19139.6", "3165" ], [ "19139.7", "812" ] ]
}

Here the requirement would be to extract, for example, "bids" : [ [ "19139.5", "8824" ], [ "19139.4", "757" ] ] into two separate records, maintaining the other fields in each.
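To pin down the target shape before choosing a NiFi processor for it, here is a minimal Python sketch. It assumes each `bids` entry is a `[price, quantity]` pair of strings, as in the sample (with its `bids` and `asks` arrays closed). The hypothetical function `explode_levels` turns each level into its own record and copies the scalar top-level fields (`lastUpdateId`, `E`, `T`, `symbol`, `pair`) into it; the output field names `side`, `price` and `qty` are illustrative, not part of the API.

```python
import json

BOOK = json.loads("""
{
  "lastUpdateId" : 579582125552,
  "E" : 1665885750096,
  "T" : 1665885750088,
  "symbol" : "BTCUSD_PERP",
  "pair" : "BTCUSD",
  "bids" : [ [ "19139.5", "8824" ], [ "19139.4", "757" ] ],
  "asks" : [ [ "19139.6", "3165" ], [ "19139.7", "812" ] ]
}
""")


def explode_levels(book, side):
    """Hypothetical helper: one flat record per [price, qty] level in book[side]."""
    # Keep only scalar top-level fields; the bids/asks arrays are not copied.
    parent = {key: value for key, value in book.items() if not isinstance(value, list)}
    records = []
    for price, qty in book[side]:
        record = dict(parent)
        # Prices and quantities stay as strings, exactly as the API sends them,
        # so no precision is lost before the database casts them.
        record.update({"side": side, "price": price, "qty": qty})
        records.append(record)
    return records


for rec in explode_levels(BOOK, "bids"):
    print(json.dumps(rec))
```

The same call with `"asks"` produces the ask levels, so both sides can land in one table distinguished by the `side` column.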