Merging JSON flowfiles from 2 processors into one flowfile

Explorer
Hey all,
 
I was just wondering what the best way is to merge 2 JSON flowfiles together. I have 2 processes that generate pretty much identical data but from different sources (so I can't do it as one process), and I'd like to merge them into one JSON file if possible.
 
Example data is something like...
 
Process 1:
 

 

[
  {
    "code": "ABC123",
    "product": "My Product",
    "prices": [
      {
        "valid_from": "20180913 09:00:08",
        "name": "pricelist1",
        "abc": "14.74"
      },
      {
        "valid_from": "20180913 09:00:08",
        "name": "pricelist2",
        "abc": "10.70"
      }
    ]
  }
]

 

 
Process 2:
 

 

[
  {
    "code": "ABC123",
    "product": "My Product",
    "prices": [
      {
        "valid_from": "20180913 09:00:08",
        "name": "pricelist1",
        "xyz": "53.74"
      },
      {
        "valid_from": "20180913 09:00:08",
        "name": "pricelist2",
        "xyz": "21.70"
      }
    ]
  }
]

 

 
Desired Outcome:
 

 

[
  {
    "code": "ABC123",
    "product": "My Product",
    "prices": [
      {
        "valid_from": "20180913 09:00:08",
        "name": "pricelist1",
        "abc": "14.74",
        "xyz": "53.74"
      },
      {
        "valid_from": "20180913 09:00:08",
        "name": "pricelist2",
        "abc": "10.70",
        "xyz": "21.70"
      }
    ]
  }
]

 

 
There is actually an array of products, although I’ve only shown 1 example above.
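
For reference, the merge described above can be sketched outside NiFi in plain Python. This is only an illustration of the desired result, not a NiFi configuration. It assumes products are matched on "code" and prices on "name", and that when both sources carry the same field the second source wins. The function name merge_products is made up for this example.

```python
import json

def merge_products(first, second):
    """Merge two product lists, matching products on "code" and prices on "name"."""
    merged = {}
    for product in first + second:
        target = merged.setdefault(
            product["code"],
            {"code": product["code"], "product": product["product"], "prices": {}},
        )
        for price in product["prices"]:
            # Fields from a later source are added to the matching price entry.
            target["prices"].setdefault(price["name"], {}).update(price)
    # Turn the per-name price dicts back into lists.
    return [{**p, "prices": list(p["prices"].values())} for p in merged.values()]

process_1 = [{
    "code": "ABC123", "product": "My Product",
    "prices": [
        {"valid_from": "20180913 09:00:08", "name": "pricelist1", "abc": "14.74"},
        {"valid_from": "20180913 09:00:08", "name": "pricelist2", "abc": "10.70"},
    ],
}]
process_2 = [{
    "code": "ABC123", "product": "My Product",
    "prices": [
        {"valid_from": "20180913 09:00:08", "name": "pricelist1", "xyz": "53.74"},
        {"valid_from": "20180913 09:00:08", "name": "pricelist2", "xyz": "21.70"},
    ],
}]

result = merge_products(process_1, process_2)
print(json.dumps(result, indent=2))
```

Because products are keyed on "code", the same logic extends to an array of many products.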
 
Thanks in advance for any help
4 REPLIES

Super Guru

Hi @Crags,

It depends. Are you getting this data from the different sources around the same time, maybe using a common trigger or cron schedule, or are you getting the data independently of each other?

If it's the first case, then you might be able to take advantage of ForkEnrichment/JoinEnrichment. JoinEnrichment provides several merge strategies that you can select from based on the structure and order of your data.

If it's the second case, and let's assume that you end up saving this information to a database, then you can apply the merge in the database by passing the product data to a SQL stored procedure, for example. The stored procedure will check whether data exists for the given ID; if it does, it applies an update statement, otherwise an insert statement.
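
As a rough illustration of that update-or-insert check, here is a minimal sketch using Python's built-in sqlite3 module in place of a real stored procedure. The table layout (a prices table keyed on code and name, with abc and xyz columns) and the helper name upsert_price are assumptions made up for the example, not something from the original flow.

```python
import sqlite3

def upsert_price(conn, code, name, column, value):
    """Update the row for (code, name) if it exists, otherwise insert it."""
    # Column names cannot be bound as SQL parameters, so whitelist them.
    if column not in ("abc", "xyz"):
        raise ValueError(f"unexpected column: {column}")
    exists = conn.execute(
        "SELECT 1 FROM prices WHERE code = ? AND name = ?", (code, name)
    ).fetchone()
    if exists:
        conn.execute(
            f"UPDATE prices SET {column} = ? WHERE code = ? AND name = ?",
            (value, code, name),
        )
    else:
        conn.execute(
            f"INSERT INTO prices (code, name, {column}) VALUES (?, ?, ?)",
            (code, name, value),
        )

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE prices (code TEXT, name TEXT, abc TEXT, xyz TEXT, "
    "PRIMARY KEY (code, name))"
)
upsert_price(conn, "ABC123", "pricelist1", "abc", "14.74")  # first source: insert
upsert_price(conn, "ABC123", "pricelist1", "xyz", "53.74")  # second source: update
rows = conn.execute("SELECT code, name, abc, xyz FROM prices").fetchall()
print(rows)
```

Whichever source arrives first creates the row, and the other fills in its own column, so the arrival order does not matter.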

Hope that helps.

 

Explorer

Hi @SAMSAL 

It's actually more of case 1, so the exact scenario is...

1. A file is uploaded to a location.
2. A processor sees there is a new file and then initiates the 2 separate flows based on different sheets in a spreadsheet.
3. After the 2 processes have run and generated their data, I'd like to merge that back together into a single JSON file.

I'm not sure how the Fork/Join Enrichment works, so any light you can shed on that would be great.

Alternatively, is there a better way to access data from a spreadsheet (2 different sheets, but the format and naming of the data is exactly the same) that I can then run JOLT transforms on or something?

I tried listing both sheets in the record reader, but data 2 overwrites data 1 as they have exactly the same field / header names etc.

Thanks in advance for all the help!

Super Guru

Hi,

Well, let's start with the last thing you said, because I'm curious how it happens that one record overrides the other. I assume you are using ExcelReader, correct? If that is the case, I tried creating an Excel file with two sheets that share the same schema, and both sheets have the same records, as follows:

Sheet1: Address1

SAMSAL_0-1726701119254.png

Sheet2: Address2

SAMSAL_1-1726701168669.png

I read the file using the FetchFile processor, then passed the content to a ConvertRecord processor where the reader is ExcelReader and the writer is JsonRecordSetWriter, configured as follows:

ExcelReader:

SAMSAL_2-1726701294238.png

I'm passing an Avro schema to assign the proper field names and types, as follows:

{
  "namespace": "nifi",
  "name": "user",
  "type": "record",
  "fields": [
    { "name": "ID", "type": "int" },
    { "name": "NAME", "type": "string" },
    { "name": "ADDRESS", "type": "string" }
  ]
}

For JsonRecordSetWriter I used the default settings with no changes.

Here is what my output looked like. It accounts for all the records from both sheets, even the duplicates (1, sam, TX):

[ {
  "ID" : 1,
  "NAME" : "sam",
  "ADDRESS" : "TX"
}, {
  "ID" : 2,
  "NAME" : "Ali",
  "ADDRESS" : "WA"
}, {
  "ID" : 1,
  "NAME" : "sam",
  "ADDRESS" : "TX"
}, {
  "ID" : 2,
  "NAME" : "Ali",
  "ADDRESS" : "FL"
} ]

So I'm curious what is happening in your case that causes the record to be overwritten. Maybe if we figure out this problem we can solve it, as you said, by using JOLT.

I think Fork/Join Enrichment will still work, especially since you are reading the info that you are trying to merge from the same file. The flow is going to be different, though, and you might need two ExcelReaders, one per sheet, which means that you need to read the Excel file twice. At a high level, the flow will look like the following:

SAMSAL_3-1726702615276.png

You can avoid having two ExcelReaders by passing the sheet info as a flowfile attribute. However, you still need to read the file twice, once for each sheet, and then join the info based on the joining strategy that works best for you.

Hope that helps.

 

 

 

Explorer

Hey @SAMSAL 

Amazing response - thank you!

I'll go over a couple of things you have mentioned. Firstly, the overwriting... I was wrong about that one. When I converted it to CSV instead using the 2 sheets, it actually appends the information from sheet 2 to the end of sheet 1 (leaving a gap of 2 rows first), so in your example above you would actually end up with something like this in CSV (when reimported into Excel):

Crags_0-1726843368083.png

I'm not sure if I can work with that, as I don't see any way in any of the readers to "split" it (or to read only from row 2 to X, for example) whilst keeping the flow etc. and just using some of the values.
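
Outside of the NiFi readers, splitting at that gap could be sketched in plain Python as below. This is only an illustration of the idea, not a NiFi feature. The sample CSV text is hypothetical (it assumes each sheet's block starts with its own header row), and split_on_blank_rows is a made-up helper name.

```python
import csv
import io

def split_on_blank_rows(csv_text):
    """Split CSV text into blocks of rows separated by one or more blank rows."""
    blocks, current = [], []
    for row in csv.reader(io.StringIO(csv_text)):
        if any(cell.strip() for cell in row):
            current.append(row)
        elif current:
            # A blank row closes the block collected so far.
            blocks.append(current)
            current = []
    if current:
        blocks.append(current)
    return blocks

# Hypothetical CSV: two sheets appended one after the other with a 2-row gap.
sample = (
    "ID,NAME,ADDRESS\n1,sam,TX\n2,Ali,WA\n"
    "\n\n"
    "ID,NAME,ADDRESS\n1,sam,TX\n2,Ali,FL\n"
)
sheet_1, sheet_2 = split_on_blank_rows(sample)
print(sheet_1)
print(sheet_2)
```

Rows made only of empty cells (for example ",,") are treated as blank too, since a CSV export may write the gap either way.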

The other issue is with the Avro schema you have set up. Because we don't know what the column headers (field names) will be, I'm unable to define an Avro schema for this, as they can add and remove columns at will. The only thing I will be able to rely on is that both sheets will have identical columns. When I convert directly to JSON, I get something more like this:

[ {
  "column_0" : "ID",
  "column_1" : "NAME",
  "column_2" : "ADDRESS"
}, {
  "column_0" : "1",
  "column_1" : "Sam",
  "column_2" : "TX"
}, {
  "column_0" : "2",
  "column_1" : "Ali",
  "column_2" : "FL"
}]

Again, I'm not sure this is something I can use in this format, but I'm just not familiar enough with the NiFi elements at this stage, I guess.
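
One way to think about making that format usable is to treat the first record as the header row and rename the fields of the remaining records. The Python below is a minimal sketch of that idea outside NiFi. It assumes the keys always follow the column_N pattern shown above, and promote_header_row is a made-up helper name.

```python
def promote_header_row(records):
    """Use the first record's values as field names for the remaining records."""
    header, *rows = records
    # Sort keys numerically so that column_10 comes after column_9.
    keys = sorted(header, key=lambda k: int(k.rsplit("_", 1)[1]))
    return [{header[k]: row.get(k) for k in keys} for row in rows]

records = [
    {"column_0": "ID", "column_1": "NAME", "column_2": "ADDRESS"},
    {"column_0": "1", "column_1": "Sam", "column_2": "TX"},
    {"column_0": "2", "column_1": "Ali", "column_2": "FL"},
]
named = promote_header_row(records)
print(named)
```

Since the field names come from the data itself, this does not depend on knowing the columns in advance.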

I'll try the Fork/Join Enrichment method though, see if there is some mileage in that, and then feed back.