Using MergeContent to create ZIP archive

Contributor
I have a flow that fetches all files from a given directory—they could be gzip, zip or csv, with the gzip and zip files holding a single csv file. I then route on MIME type, decompress the gzip files, unpack the zip files, and then bring what are now ALL csv files back together. This is working. I then want to create a zip archive of all of these csv files, and MergeContent seemed like a good candidate (outside of using ExecuteStreamCommand to run zip from the OS).

 

But no matter what I do, the results are inconsistent:

  • The first time I ran it with default properties in MergeContent (and setting Merge Format to ZIP), it created a single zip file — I thought I was done!
  • But on the second time, it created six zip files, so I realized I wasn't done, and started playing with properties
  • Changed Maximum number of Bins to 1 - created five zip files
  • Changed Correlation Attribute Name to absolute.path, which is the same for all flow files - created four zip files
  • Changed Maximum Group Size to 500 MB (all csv files zipped up are ~500 KB) - created two zip files
  • Changed Minimum Number of Entries to 1000, Maximum to 5000 (62 csv files) - created forty zip files
  • Changed load balancing on the queue to single node - created twelve zip files

Clearly I'm throwing darts hoping something will stick. Documentation (and the general wisdom of the web) hasn't been particularly helpful in understanding how this works, what a "bin" is, what a "bundle" is, and most of it seems geared towards breaking apart a single flow file, doing some processing, then bringing it back together. That's not what I'm doing. I'm starting with multiple flow files and want to bring all of them always, every time, to a single flow file.

 

If the answer is that this can't be done with MergeContent, then I'll just run zip through the OS, but that would mean writing the files to disk, then zipping, and I wanted to try to keep this native NiFi.

 

Again, I started with default properties, except changing Merge Format to ZIP, and then made my modifications from there. And, yes, I am using the "merged" relationship.

1 ACCEPTED SOLUTION

Super Mentor

@noncitizen 

 

MergeContent processor.


A "bin" is a virtual container to which FlowFiles are assigned during execution of the MergeContent processor. FlowFiles that are allocated to a bin remain in NiFi heap memory and cannot be swapped out to disk.

How FlowFiles are allocated to bins from inbound connections during execution depends on the configured "merge strategy".  

  • Bin-Packing Algorithm - Allocates FlowFiles to one bin until that bin has reached the configured minimums (Minimum Number of Entries and Minimum Group Size). If a FlowFile cannot be allocated to that bin (for example, because doing so would exceed the configured Maximum Group Size), it is allocated to a second bin.
  • Defragment - A use-case-specific strategy that depends on the source FlowFiles carrying specific attributes about each fragment (fragment.identifier, fragment.index, fragment.count, and segment.original.filename). A new bin is used for each unique fragment.identifier attribute value.

For the use case you describe, you would use the "Bin-Packing Algorithm" merge strategy.

 

When MergeContent executes (a run schedule of 0 secs means execute as often as possible), it looks at the unallocated FlowFiles in one of the inbound connections at that exact moment and allocates them to an existing bin or bins as described above. Once binning is done, it checks whether any bins are eligible to be merged. MergeContent will merge a bin when any one of the following is true:

  • Both minimums have been met for the bin (Minimum Number of Entries AND Minimum Group Size). Minimum Group Size is ignored if blank.
  • The bin contains all fragments of a fragmented FlowFile (merge strategy = Defragment only).
  • The bin has reached the configured Max Bin Age. Max Bin Age forces the merge of a bin after the configured amount of time, and the age starts when the first FlowFile is allocated to the bin. This prevents a bin that never reaches the configured minimums from sitting unmerged indefinitely.
  • All bins have FlowFiles allocated to them and the next unallocated FlowFile cannot be allocated to any of these existing bins. In that case the oldest bin is forced to merge, which frees a bin for the new FlowFile. When merge strategy = Defragment, the oldest bin of FlowFiles is routed to the "failure" relationship instead of being force-merged.
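
To make the first and third conditions concrete, here is a minimal Python sketch of the eligibility check for the Bin-Packing Algorithm strategy. It is a toy model and not NiFi's actual code: the Bin class, the ready_to_merge function, and all parameter names are made up for illustration, and it ignores the forced merge that happens when every bin is in use.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class Bin:
    """Toy stand-in for a MergeContent bin (hypothetical, not a NiFi class)."""
    created_at: float                               # time the first FlowFile was allocated
    sizes: List[int] = field(default_factory=list)  # byte size of each FlowFile in the bin


def ready_to_merge(bin_: Bin, now: float, min_entries: int,
                   min_group_size: Optional[int],
                   max_bin_age: Optional[float]) -> bool:
    """Return True if the bin meets both minimums or has reached its max age."""
    if not bin_.sizes:
        return False
    # A blank (None) minimum group size is ignored.
    size_ok = min_group_size is None or sum(bin_.sizes) >= min_group_size
    mins_met = len(bin_.sizes) >= min_entries and size_ok
    # A blank (None) max bin age means the bin never ages out.
    aged_out = max_bin_age is not None and (now - bin_.created_at) >= max_bin_age
    return mins_met or aged_out


# Ten small FlowFiles sitting in one bin that was started at t=0 seconds.
ten_files = Bin(created_at=0.0, sizes=[8_000] * 10)
# With a minimum of 1 entry the bin is eligible right away.
print(ready_to_merge(ten_files, now=1.0, min_entries=1,
                     min_group_size=None, max_bin_age=None))
# With a minimum of 100 entries it waits until the bin is 120 seconds old.
print(ready_to_merge(ten_files, now=1.0, min_entries=100,
                     min_group_size=None, max_bin_age=120.0))
print(ready_to_merge(ten_files, now=120.0, min_entries=100,
                     min_group_size=None, max_bin_age=120.0))
```

The point of the sketch is that a low minimum makes a bin eligible as soon as anything lands in it, while a high minimum leaves the bin age as the only trigger.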

I suspect that with only 1 bin, a forced merge is happening in some of your tests. In others, the minimums are set too low and the bin becomes eligible for merge before all FlowFiles have been allocated to it. (You reported that this worked once, probably because all 63 CSVs were already queued in the inbound connection before you started MergeContent. In the runs that failed, all components were running while data streamed through your dataflow.)

 

The MergeContent processor has no idea how many FlowFiles should go into a bin (unless merge strategy = Defragment). Also keep in mind that each node in a NiFi cluster executes dataflows independently of the other nodes. Each node has its own copy of the flow.json.gz loaded in memory, each node has its own content and FlowFile repositories, and each node operates only on the FlowFiles present on that node. So if you have multiple nodes ingesting data that you want to merge into a single FlowFile (zip), then using a "single node" load-balanced connection in front of the MergeContent processor is the correct approach.

 

So now let's look at what configuration would most likely work for you:

  1. Merge Strategy = Bin-Packing Algorithm
  2. Merge Format = ZIP
  3. Correlation Attribute Name = <blank>, since you are not trying to divide incoming FlowFiles into different bins.
  4. Minimum Number of Entries = 100 (higher than your 63 FlowFiles, so the bin cannot merge on the minimum alone, no matter how many processor executions it takes for all of them to arrive)
  5. Maximum Number of Entries = 1000 (default)
  6. Max Bin Age = 2 mins (set this high enough that you feel confident all FlowFiles will reach the inbound connection before the bin is forced to merge; the default is blank, and depending on server resources this processor may execute many times per second)
  7. Maximum number of Bins = 5 (default). I never recommend having only 1 bin.

 

All other properties are defaults. What this does is allow 2 mins for all 63 of your FlowFiles to get placed in one bin before the Max Bin Age kicks in and forces that bin to merge. Of course you can adjust this after testing. (You have source FlowFiles that are already CSV, but you have others that need to be unpacked, which may delay them reaching MergeContent, even if only by milliseconds. Even that short delay could mean that different executions of MergeContent bin and merge them separately.) Also, single node is important if your FlowFiles are spread across all your cluster nodes, since MergeContent can only merge FlowFiles that are on the same node.
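
As a rough illustration of why the timing matters, the following Python sketch simulates a single bin with FlowFiles trickling in. It is a simplified model under stated assumptions, not NiFi's implementation: the simulate function, the 10 ms execution interval, and the arrival pattern (40 CSVs queued immediately, 23 unpacked ones arriving 5 ms apart) are all invented for the example.

```python
def simulate(arrivals_ms, min_entries, max_bin_age_ms=None, tick_ms=10):
    """Return the FlowFile count of each merged bundle (toy single-bin model)."""
    queue = sorted(arrivals_ms)
    bundles = []
    in_bin = 0        # FlowFiles currently allocated to the bin
    bin_started = 0   # time the first FlowFile entered the current bin
    now = 0
    i = 0
    while i < len(queue) or in_bin:
        # Binning: allocate everything queued at this exact moment.
        while i < len(queue) and queue[i] <= now:
            if in_bin == 0:
                bin_started = now
            in_bin += 1
            i += 1
        mins_met = in_bin >= min_entries
        aged_out = (max_bin_age_ms is not None
                    and now - bin_started >= max_bin_age_ms)
        if in_bin and (mins_met or aged_out):
            bundles.append(in_bin)   # the bin is merged into one zip
            in_bin = 0
        elif in_bin and i == len(queue) and max_bin_age_ms is None:
            break  # nothing more will arrive and no max age: bin never merges
        now += tick_ms
    return bundles


# Assumed arrival pattern: 40 plain CSVs at t=0, 23 unpacked CSVs 5 ms apart.
arrivals = [0] * 40 + [50 + 5 * k for k in range(23)]

low_min = simulate(arrivals, min_entries=1)
high_min = simulate(arrivals, min_entries=100, max_bin_age_ms=120_000)
print(len(low_min), "bundles with min entries = 1:", low_min)
print(len(high_min), "bundle(s) with min entries = 100 and a 2 min bin age:", high_min)
```

In this model, a minimum of 1 merges whatever happens to be queued at each execution, so the 63 files end up in several zips, while a minimum above 63 combined with a bin age holds the bin open until everything has arrived.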

 

If you found that the provided solution(s) assisted you with your query, please take a moment to login and click Accept as Solution below each response that helped.

Thank you,

Matt

3 REPLIES 3

Contributor

Matt, wow. Thank you for the tremendous detail about how this processor works internally. This fills a gap here on the web that I know will help many in days to come. Yes, I wonder about myself sometimes. Max Bin Age was the one property I did not tinker with, but it makes perfect sense that this is what got me over the goal line—which also shows I'm still learning in my overall understanding of how NiFi works. I set it down to 15 secs, and so far so good. Many, many thanks!

Super Mentor

@noncitizen 
Welcome to the Community!!!

Apache NiFi is a very large open source project. Over the 8+ years since it was originally open sourced, it has grown so large that the download distribution has reached the maximum allowable size and does not include all the components that the community has developed for NiFi. There are more than 400 unique components developed for NiFi, and the number grows every year. Many of these "add-on" components can be found in various open source repositories, and NiFi makes it very easy to add them to your NiFi (even hot loading is possible). As is true of many open source products with lots of contributors, the documentation usually comes after the development and may at times be lacking in detail. Sometimes this is because the originator could not anticipate all the possible use cases for a given component, or because being so close to the development leaves a good amount of knowledge and understanding implied rather than written down. I myself have been working with NiFi for more than 8 years and have been exposed to many use cases, bugs, improvements, etc. I look forward to seeing you more in the community as you learn and grow and begin to help others using that newfound knowledge.