@noncitizen
MergeContent processor.
A "bin" is a virtual container to which FlowFiles are assigned during execution of the MergeContent processor. FlowFiles that are allocated to a bin remain in NiFi heap memory and cannot be swapped out to disk.
How FlowFiles are allocated to bins from inbound connections during execution depends on the configured "merge strategy".
- Bin-Packing Algorithm - Allocates FlowFiles to one bin until that bin has reached the configured mins (min num entries and min group size). If a FlowFile cannot be allocated to a bin (for example, because doing so would exceed the configured max group size), the FlowFile is allocated to a second bin.
- Defragment - A use-case-specific strategy that depends on the source FlowFiles having specific attributes describing each fragment (fragment.identifier, fragment.index, fragment.count, and segment.original.filename). A new bin is used for each unique fragment.identifier attribute value.
For the use case you describe, you would use the "Bin-Packing Algorithm" merge strategy.
When MergeContent executes (a run schedule of 0 secs means execute as often as possible), it looks at the unallocated FlowFiles in one of the inbound connections at that exact moment in time and allocates them to an existing bin or bins as described above. After binning the FlowFiles, it checks whether any bins are eligible to be merged. MergeContent will merge a bin when any one of the following is true:
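As a rough illustration of the bin-packing description above, here is a simplified Python model. It is not NiFi's actual implementation; the function name `allocate` and the representation of a bin as a list of FlowFile sizes are made up for this sketch, and the max number of bins limit is left out.

```python
from typing import List


def allocate(bins: List[List[int]], size: int,
             max_entries: int, max_group_size: int) -> int:
    """Put a FlowFile of `size` bytes in the first bin that has room.

    Each bin is modeled as a list of FlowFile sizes. Returns the index
    of the bin that received the FlowFile.
    """
    for index, current in enumerate(bins):
        fits_count = len(current) < max_entries
        fits_size = sum(current) + size <= max_group_size
        if fits_count and fits_size:
            current.append(size)
            return index
    # No existing bin can take the FlowFile, so it goes to another bin.
    bins.append([size])
    return len(bins) - 1
```

With a max group size of 100, FlowFiles of 60 and 30 bytes share the first bin, and a following 50-byte FlowFile lands in a second bin because 140 would exceed the max.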
- Both mins have been met for the bin (min num entries AND min group size). Min group size is ignored if blank.
- Bin contains all fragments of a fragmented FlowFile (merge strategy = defragment only)
- Bin has reached the configured max bin age. Max bin age forces the merge of a bin after the configured amount of time; the age starts when the first FlowFile is allocated to the bin. This prevents a bin that never reaches the configured mins from sitting unmerged indefinitely.
- All bins have FlowFiles allocated to them and the next unallocated FlowFile cannot be allocated to any of these existing bins. The oldest bin is then forced to merge to free a bin for that new FlowFile. When merge strategy = defragment, the oldest bin of FlowFiles is routed to the "failure" relationship instead of being force-merged.
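The mins and max bin age checks from the list above can be sketched in Python like this. It is only a model of the rules as described here, not NiFi code; the `Bin` class and `ready_to_merge` function are hypothetical names, and the defragment and forced-merge cases are left out.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class Bin:
    created_at: float  # time (seconds) the first FlowFile was allocated
    sizes: List[int] = field(default_factory=list)  # bytes per FlowFile


def ready_to_merge(bin_: Bin, now: float, min_entries: int,
                   min_group_size: Optional[int],
                   max_bin_age: Optional[float]) -> bool:
    """Return True if the bin meets both mins or has aged out."""
    if not bin_.sizes:
        return False
    # Both mins must be met; a blank (None) min group size is ignored.
    mins_met = len(bin_.sizes) >= min_entries and (
        min_group_size is None or sum(bin_.sizes) >= min_group_size
    )
    # A blank (None) max bin age means the bin never ages out.
    aged_out = (max_bin_age is not None
                and now - bin_.created_at >= max_bin_age)
    return mins_met or aged_out
```

For example, a bin holding 63 FlowFiles with min number of entries = 100 is not eligible 10 seconds in, but becomes eligible once a 120-second max bin age has elapsed.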
I suspect that by having only 1 bin, a forced merge is happening in some of your tests. In others, the min(s) are set too low and the bin becomes eligible for merge before all FlowFiles have been allocated to it. (You reported that this worked once, probably because you had all 63 CSVs queued in the inbound connection before you started MergeContent. The times it failed, all components were running as data streamed through your dataflow.)
The MergeContent processor has no idea how many FlowFiles should go into a bin (unless merge strategy = defragment). Also keep in mind that the nodes in a NiFi cluster execute dataflows independently of each other. Each node has its own copy of the flow.json.gz loaded in memory, each node has its own content and FlowFile repositories, and each node operates only on the FlowFiles present on that node. So if you have multiple nodes ingesting data that you want to merge into a single FlowFile (zip), then using a "single node" load balanced connection before the MergeContent processor is the correct approach.
So now let's look at what configuration would most likely work for you:
- Merge Strategy = Bin-Packing Algorithm
- Merge Format = zip
- Correlation Attribute = <blank> since you are not trying to divide incoming FlowFiles into different bins.
- min number of entries = 100 (higher than your 63 FlowFiles, so the bin keeps collecting until all of them have made it in, regardless of how many processor executions that takes)
- max number of entries = 1000 (default)
- max bin age = 2 mins (set this high enough that you feel confident all FlowFiles will reach the inbound connection before the bin is forced to merge. The default is blank; depending on server resources, the processor may execute many times per second)
- max number of bins = 5 (default) I never recommend having only 1 bin.
All other properties are defaults. What this does is allow 2 mins for all 63 of your FlowFiles to get placed in one bin before the max bin age kicks in and forces that bin to merge. Of course, you can adjust this after testing. (You have source FlowFiles that are already CSV, but you have others that need to be unpacked, which may delay them reaching MergeContent even if only by milliseconds. Even that short delay could mean that different executions of MergeContent bin and merge them separately.) Also, "single node" is important if your FlowFiles are spread across all your cluster nodes, since MergeContent can only merge those on the same node.
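To see why the mins and max bin age matter here, a toy simulation in Python can help. This is a heavily simplified model and not how NiFi is implemented: it uses a single bin, runs the processor once per second, and ignores the max limits. The `simulate` function is a made-up name, and the split of 40 FlowFiles arriving at second 0 and 23 arriving at second 3 is invented for illustration; only the total of 63 comes from your description.

```python
from typing import List, Optional


def simulate(arrivals: List[int], min_entries: int,
             max_bin_age: Optional[int], horizon: int = 600) -> List[int]:
    """Return the FlowFile count of each merged bundle, in merge order.

    arrivals: the second at which each FlowFile reaches the connection.
    The processor is modeled as executing once per second up to `horizon`.
    """
    queue = sorted(arrivals)
    merged: List[int] = []
    count, started = 0, 0
    for now in range(horizon + 1):
        # Bin everything that is queued at this exact moment.
        while queue and queue[0] <= now:
            queue.pop(0)
            if count == 0:
                started = now  # bin age starts with the first FlowFile
            count += 1
        if count == 0:
            continue
        aged_out = max_bin_age is not None and now - started >= max_bin_age
        if count >= min_entries or aged_out:
            merged.append(count)
            count = 0
    return merged


arrivals = [0] * 40 + [3] * 23  # hypothetical timing for 63 FlowFiles
print(simulate(arrivals, min_entries=1, max_bin_age=None))   # [40, 23]
print(simulate(arrivals, min_entries=100, max_bin_age=120))  # [63]
```

With a low min, the first execution merges whatever happens to be queued and the late arrivals end up in a second zip. With min number of entries = 100 and max bin age = 120 seconds, the bin waits and all 63 are merged together when the bin ages out.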
If you found that the provided solution(s) assisted you with your query, please take a moment to log in and click Accept as Solution below each response that helped.
Thank you,
Matt