I am having trouble with my dataflow in NiFi, surely because I didn't understand how Wait/Notify works. Here is my basic data flow:
- I list my S3 bucket containing zip files.
- I extract the JSON file from each zip. Each JSON is 2 GB up to 3 GB.
- I then split my JSON several times to avoid 'Out of memory' errors, to obtain single-record FlowFiles (here comes the trouble).
- I want to flatten each FlowFile.
- I want to merge my flattened FlowFiles to re-obtain the original JSON. I know I cannot do this with a single MergeContent, as my JSON goes through several split processors.
Can you please explain how to use the Wait/Notify?
Does Wait automatically merge the FlowFiles? I did not understand that part. I also looked at this post, but it is still not clear. Thanks
The Wait and Notify processors have nothing to do with the merging of FlowFiles.
The Wait processor prevents FlowFiles from passing through it until a release signal for the FlowFile is found in the configured cache service.
The Wait processor would be configured to read some attribute's value from an incoming FlowFile. That value should be unique and shared only with the corresponding FlowFile that will go to the Notify processor. The Wait processor then checks the configured cache service for the existence of a cache key that matches that value.
The Notify processor would be configured to read some attribute's value from an incoming FlowFile. That value should be unique to that FlowFile and to the corresponding FlowFile currently waiting at the Wait processor. The Notify processor will create the cache key entry if it does not exist in the configured cache service, and will generate JSON content recording the signal counts, which it assigns to that cache key.
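As an illustration only (the exact field names can vary by NiFi version, so treat this shape as an assumption), the cached signal JSON looks roughly like:

```json
{
  "counts": { "default": 1 },
  "attributes": {},
  "releasableCount": 0
}
```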
If another FlowFile with the same cache key value comes to the Notify processor, the "counter" is incremented.
When the Wait processor finally sees that the cache key for a waiting FlowFile contains this JSON, it decrements the counter by the configured signal count and moves the resulting number from the counter to "releasableCount". If the releasableCount reaches 0, the cache key is deleted; otherwise, the cache entry is updated with the new decremented values. Additional FlowFiles watching this same cache key will be able to pass through Wait until counter and releasableCount are both zero. Remember that Notify will increment these counts by one for each FlowFile that "notifies" using the same cache key value.
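The counting behavior described above can be sketched in a few lines of Python (a toy in-memory model, not NiFi code; it ignores per-signal counter names and attributes for simplicity):

```python
# Toy model of the Wait/Notify counter handshake.
# Assumption: one signal counter per cache key, target count known up front.

def notify(cache, key):
    """Notify: create the cache entry if missing, then increment its counter."""
    entry = cache.setdefault(key, {"counter": 0, "releasableCount": 0})
    entry["counter"] += 1

def wait(cache, key, signal_count=1):
    """Wait: release one FlowFile once the counter has reached signal_count."""
    entry = cache.get(key)
    if entry is None or entry["counter"] < signal_count:
        return False  # keep the FlowFile waiting
    entry["counter"] -= signal_count
    if entry["counter"] == 0 and entry["releasableCount"] == 0:
        del cache[key]  # nothing left to release; drop the key
    return True

cache = {}
for _ in range(3):                     # three FlowFiles hit Notify
    notify(cache, "batch-42")
released = wait(cache, "batch-42", signal_count=3)
print(released, "batch-42" in cache)   # True False
```

The key idea is that Wait is a gate keyed on a shared identifier, not a merge step.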
The design intent here is to hold processing on some source file until some side processing is complete.
In your use case, I do not believe this is what you are trying to do. You are just looking for a way to merge all your splits back into the same original FlowFile after flattening the JSON in each split.
In this case, you may want to add an UpdateAttribute processor between your two split-based processors.
Following the split of a FlowFile, each produced split FlowFile has these three FlowFile attributes assigned:
- fragment.identifier: all split FlowFiles produced from the same parent FlowFile will have the same randomly generated UUID added for this attribute
- fragment.index: a one-up number that indicates the ordering of the split FlowFiles that were created from a single parent FlowFile
- fragment.count: the number of split FlowFiles generated from the parent FlowFile
Using the UpdateAttribute Processor, you can create a new set of attributes to save these values so they do not get lost when you split again.
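As a sketch (the first.split.* names are just illustrative; any attribute names that won't collide with fragment.* will work), the UpdateAttribute properties could be:

```
first.split.identifier = ${fragment.identifier}
first.split.index      = ${fragment.index}
first.split.count      = ${fragment.count}
```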
Then pass the first set of splits to the next split processor. Each produced FlowFile will get new fragment.* values but retain the first.split attributes saved above.
Then perform your processing to flatten each of your resulting FlowFile's json content.
Then we are on to the merge process, where we merge twice. First merge based on the fragment.identifier, fragment.index, and fragment.count assigned by the second split. Following that merge, use an UpdateAttribute processor to move the first.split fragment values back to the corresponding fragment.identifier, fragment.index, and fragment.count attributes. Now you can merge these FlowFiles back into the original FlowFile.
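The restore step could then be another UpdateAttribute, with properties along these lines (again using the illustrative first.split.* names):

```
fragment.identifier = ${first.split.identifier}
fragment.index      = ${first.split.index}
fragment.count      = ${first.split.count}
```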
I suggest inspecting your FlowFiles in the connection before the final MergeContent to see what values have been assigned to these three attributes (fragment.identifier, fragment.count, and fragment.index), since the error you are seeing says that fragment.index does not contain an integer value.
The initial files are 20, each around 2 GB. Regarding the MergeContent, I left all default values, such as 'bins number' = 5 and so on. Which parameters do you recommend tuning in the MergeContent processors? Because I noticed that it takes time until the queue before the merging gets filled.
MergeContent should be using the Defragment merge strategy. There is no default value for Max Bin Age, so I am not sure what you set there. If left blank, the processor will wait forever to merge a bin unless you run out of bins. Also make sure you adjust the object and size back-pressure thresholds on the connections feeding the MergeContent processors so that they are large enough to accommodate the number of splits that need to be merged. Considering the size of the FlowFiles being merged, it may take time to merge all of them. As far as bins, try setting 21 of them.
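Putting those suggestions together, the MergeContent configuration would look roughly like this (the Max Bin Age value is only an example timeout, not a recommendation from this thread; note that with Defragment, a bin that ages out before all fragments arrive is routed to failure):

```
Merge Strategy         = Defragment
Maximum number of Bins = 21
Max Bin Age            = 10 min   (example value; leave blank to wait indefinitely)
```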