Merge big JSON using Wait/Notify in Apache NiFi

Explorer

I am having trouble with my dataflow in NiFi, surely because I didn't understand how Wait/Notify works. Here is my basic data flow:
- I listed my S3 bucket containing zip files.
- I extracted my JSON file from each zip. Each JSON is 2 GB up to 3 GB.
- I then split my JSON several times to avoid 'Out of memory' errors and obtain single-record FlowFiles (here comes the trouble).
- I want to flatten each FlowFile.
- I want to merge my flattened FlowFiles to re-obtain the original JSON. I know I cannot do this with 'MergeContent' alone, as my JSON goes through several split processors.

Can you please explain how to use Wait/Notify?

Does Wait automatically merge the FlowFiles?

I ask because I did not understand it. I also looked at this post, but it is still not clear.
Thanks

10 REPLIES

Master Mentor

Explorer

I need to be honest: I might be the only one, but it is still not clear to me how the Wait/Notify pattern works. Do you have any other resources?

Super Mentor

@3nomis

The Wait and Notify processors have nothing to do with the merging of FlowFiles.

The Wait processor prevents FlowFiles from passing through it until a release signal for the FlowFile is found in the configured cache service.


The Wait processor would be configured to read some attribute's value from an incoming FlowFile. That value should be unique and shared only with the corresponding FlowFile that will go to the Notify processor. The Wait processor then checks the configured cache service for the existence of a cache key that matches that value.


The Notify processor would likewise be configured to read some attribute's value from an incoming FlowFile. That value should be unique to that FlowFile and to the corresponding FlowFile currently waiting at the Wait processor. The Notify processor will create the cache key entry if it does not exist in the configured cache service. JSON content similar to the following will be generated and assigned to that cache key:


{"counts":{"counter":1},"attributes":{},"releasableCount":0}


If another FlowFile with the same cache key value comes to the Notify processor, the "counter" is incremented.

When the Wait processor finally sees that the cache key for a waiting FlowFile contains this JSON, it decrements the counter by the configured signal count and moves the resulting number from "counter" to "releasableCount". If releasableCount equals 0, the cache key is deleted; otherwise, the cache entry is updated with the new decremented value. Additional FlowFiles looking at this same cache key will be able to pass through Wait until counter and releasableCount are both zero. Remember that Notify increments these counts by one for each FlowFile that "notifies" using the same cache key value.
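
For example, a minimal Wait/Notify pairing (just a sketch; the attribute name transaction.id is my own illustration, and I am assuming a DistributedMapCacheClientService as the cache) might be configured like this:

Wait processor:
    Release Signal Identifier  =  ${transaction.id}
    Target Signal Count        =  1
    Distributed Cache Service  =  DistributedMapCacheClientService

Notify processor:
    Release Signal Identifier  =  ${transaction.id}
    Signal Counter Delta       =  1
    Distributed Cache Service  =  DistributedMapCacheClientService

Here a FlowFile queued at Wait is released once a FlowFile carrying the same transaction.id value has passed through Notify.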

The design intent here is to hold processing of some source FlowFile until some side processing is complete.


In your use case, I do not believe this is what you are trying to do. You are just looking for a way to merge all your splits back into the same original FlowFile after flattening the JSON in each split.


In this case, you may want to add an UpdateAttribute processor between your two split-based processors.


Following the split of a FlowFile, each produced split FlowFile has these three FlowFile attributes assigned:

fragment.identifier: All split FlowFiles produced from the same parent FlowFile will have the same randomly generated UUID added for this attribute.
fragment.index: A one-up number that indicates the ordering of the split FlowFiles that were created from a single parent FlowFile.
fragment.count: The number of split FlowFiles generated from the parent FlowFile.


Using the UpdateAttribute Processor, you can create a new set of attributes to save these values so they do not get lost when you split again.

for example:

fragment.identifier.first.split  =  ${fragment.identifier}
fragment.index.first.split  =  ${fragment.index}
fragment.count.first.split  =  ${fragment.count}


Then pass the first set of splits to the next split processor. Each produced FlowFile will get new fragment values but retain the above first.split attributes.
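
To illustrate (all values here are made up), a FlowFile coming out of the second split might then carry:

fragment.identifier              =  e7b1c3... (new UUID from the second split)
fragment.index                   =  3
fragment.count                   =  250
fragment.identifier.first.split  =  9f41aa... (UUID saved from the first split)
fragment.index.first.split       =  1
fragment.count.first.split       =  4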


Then perform your processing to flatten the JSON content of each resulting FlowFile.
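
For instance, if you use the FlattenJson processor for this step (an assumption on my part), with its default "." separator a record like

{"user":{"name":"a","age":1}}

would become

{"user.name":"a","user.age":1}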


Then we are on to the merge process, where we merge twice. First merge based on the fragment.identifier, fragment.index, and fragment.count assigned by the second split. Following that merge, use an UpdateAttribute processor to move the *.first.split values back into the corresponding fragment.identifier, fragment.index, and fragment.count attributes. Now you can merge these FlowFiles back into the original FlowFile.
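
That second UpdateAttribute simply reverses the earlier mapping, for example:

fragment.identifier  =  ${fragment.identifier.first.split}
fragment.index       =  ${fragment.index.first.split}
fragment.count       =  ${fragment.count.first.split}

Both MergeContent processors would use the Defragment merge strategy, which reassembles FlowFiles based on these fragment.* attributes.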


Thank you,

Matt


If you found this answer addressed your question, please take a moment to log in and click the "ACCEPT" link.

Explorer

@Matt Clarke

Hi, I tried what you suggested, but with 3 splits. However, the last merge tells me 'cannot merge because the fragment.index is not an integer value'. Is there anything wrong in my flow?

[screenshot: flow overview (108489-1557238371402.png)]

[screenshot: last merge (108459-1557239956807.png)]

[screenshot: Update attr (108475-1557239964861.png)]

Super Mentor

@3nomis

I suggest inspecting your FlowFiles in the connection before the final MergeContent to see what values have been assigned to these three attributes (fragment.identifier, fragment.count, and fragment.index), since the error you are seeing says fragment.index does not contain an integer value.

Super Mentor

@3nomis

I just ran a small test flow to double-check that a triple split and triple merge works.

[screenshot: test flow (108429-1557250817468.png)]

I attached a template of the above test flow for your reference:


Multi-stage-Split-Then-Merge.xml


Hope this helps.


Thanks,

Matt

Explorer

I tried exactly what you did. The merge processors somehow do not seem to wait for all FlowFiles to be queued; they merge as FlowFiles arrive, which confuses the next merge.

I proved this by starting one merge at a time and waiting for all FlowFiles; that way it works, but it is not deployable.

Super Mentor

@3nomis

How many unique source FlowFiles (at the very start of the flow, before the first split) are you working with? Is it possible you do not have enough bins configured on your MergeContent?

What is Max Bin Age set to on your MergeContent processor? If you remove it or set it to a much higher value, do you then see merging happening correctly?
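
As a rough sketch (the numbers are only illustrative), a defragmenting MergeContent for this flow might look like:

Merge Strategy          =  Defragment
Maximum number of Bins  =  25    (comfortably more than the number of parent FlowFiles in flight; the default is 5)
Max Bin Age             =  (unset, or generously high, so a bin can wait for all of its fragments)

If I recall correctly, with Defragment a bin that hits Max Bin Age before all of its fragments arrive is routed to failure, and when every bin is in use the oldest bin is forced out, so too few bins or too low an age can break the reassembly you describe.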

Explorer

@Matt Clarke

The initial files are 20, each around 2 GB. Regarding MergeContent, I left all the default values, such as 'Maximum number of Bins' = 5 and so on. Which parameters do you recommend tuning in the MergeContent processors? I ask because I noticed that it takes time until the queue before the merging gets filled.