Support Questions

Find answers, ask questions, and share your expertise

NiFi mergeontent max file size handle

avatar
Expert Contributor

Hi,

 

   I'm currently using merge_content processor as merge avro files from two sources like kafka_consumer processor and fetchHDFS file. While converting avro file into one with merge content processor yesterday around 680MB trying to convert but processor drop the file and join with new files and i can't able to recover that data also, because content_repository backup i limit. Can you please help me out for this use_case processor how much size can be good or is there any setting needs to modifiy in nifi.properties. 

1 ACCEPTED SOLUTION

avatar
Master Mentor

@varun_rathinam 

 

Can you please elaborate on "processor drop the file and join with new files"?

And also "content_repository backup i limit"?  <-- Are you referring to the "nifi.content.repository.archive.max.retention.period" and "nifi.content.repository.archive.max.usage.percentage" configuration settings in the nifi.properties file?


Also sharing a screenshot of your current MergeContent processor's configuration along with more details around your use case.  What result are you seeing now and what is the desired result?

The MergeContent processor takes multiple FlowFiles from same NiFi node and merges the content of those FlowFiles based on the processor's configuration in to one or more new FlowFile's per node.  The processor cannot merge FlowFiles residing on different NiFi nodes in an NiFi cluster into one FlowFile.

 

FlowFiles from the inbound connection queue are allocated to bins based on the following configuration properties:
Correlation Attribute Name  <-- (Optional) when used, only FlowFile with same value in the configured FlowFile attribute will be placed in same bin.

Maximum Number of Entries  <-- Maximum number of FlowFiles that can be allocated to a single bin before a new bin is used.

Maximum Group Size  <-- (Optional) Maximum cumulative size of the content that can be allocated to a bin

 

When a "bin" is eligible to be merged is controlled by these configuration properties:

Minimum Number of Entries  <-- If at end of thread execution (after all FlowFiles from inbound connection have been allocated to one or more bins) the number of FlowFiles allocated to a bin meets this min and meets configured min group size, the FlowFiles in that bin will be merged.

Minimum Group Size  <-- Same as above

Max Bin Age  <-- A bin that has not reached or exceeded both above min values will merge once the bin has had FlowFiles in it for this amount of time 

Maximum number of Bins  <-- If FlowFile have been allocated to every bin and another bin is needed, the oldest bin will be forced to merge to free a bin.

 

It is possible that one or both min values are never reached if a Max bin setting is reached first.  This means that because of max additional FlowFiles can not be allocated to that bin and the only setting that will force that bin to merge is "Max Bin Age" or you run out of free bins.


As far as bin Max values, NiFi really does not care about content size as it streams the merged FlowFiles content in to a new FlowFile and does not hold that content in memory.  NiFi can experience Out OF Memory (OOM) conditions if the number of FlowFiles Max is set too high since all the attributes for every FlowFile currently allocated to bin(s) is held in heap memory.  NiFi's allocated heap memory is set in the nifi.properties configuration file. So a Max number of entries should be limited to 10000 (but this varies based on memory availability and number and size of attributes on your FlowFiles.  You can use multiple MergeContent processors in series (one after another) to merge multiple merged FlowFiles in to even larger merged FlowFiles if desired.

 

Hope this helps with understanding the MergeContent processor,

Matt

View solution in original post

3 REPLIES 3

avatar
Master Mentor

@varun_rathinam 

 

Can you please elaborate on "processor drop the file and join with new files"?

And also "content_repository backup i limit"?  <-- Are you referring to the "nifi.content.repository.archive.max.retention.period" and "nifi.content.repository.archive.max.usage.percentage" configuration settings in the nifi.properties file?


Also sharing a screenshot of your current MergeContent processor's configuration along with more details around your use case.  What result are you seeing now and what is the desired result?

The MergeContent processor takes multiple FlowFiles from same NiFi node and merges the content of those FlowFiles based on the processor's configuration in to one or more new FlowFile's per node.  The processor cannot merge FlowFiles residing on different NiFi nodes in an NiFi cluster into one FlowFile.

 

FlowFiles from the inbound connection queue are allocated to bins based on the following configuration properties:
Correlation Attribute Name  <-- (Optional) when used, only FlowFile with same value in the configured FlowFile attribute will be placed in same bin.

Maximum Number of Entries  <-- Maximum number of FlowFiles that can be allocated to a single bin before a new bin is used.

Maximum Group Size  <-- (Optional) Maximum cumulative size of the content that can be allocated to a bin

 

When a "bin" is eligible to be merged is controlled by these configuration properties:

Minimum Number of Entries  <-- If at end of thread execution (after all FlowFiles from inbound connection have been allocated to one or more bins) the number of FlowFiles allocated to a bin meets this min and meets configured min group size, the FlowFiles in that bin will be merged.

Minimum Group Size  <-- Same as above

Max Bin Age  <-- A bin that has not reached or exceeded both above min values will merge once the bin has had FlowFiles in it for this amount of time 

Maximum number of Bins  <-- If FlowFile have been allocated to every bin and another bin is needed, the oldest bin will be forced to merge to free a bin.

 

It is possible that one or both min values are never reached if a Max bin setting is reached first.  This means that because of max additional FlowFiles can not be allocated to that bin and the only setting that will force that bin to merge is "Max Bin Age" or you run out of free bins.


As far as bin Max values, NiFi really does not care about content size as it streams the merged FlowFiles content in to a new FlowFile and does not hold that content in memory.  NiFi can experience Out OF Memory (OOM) conditions if the number of FlowFiles Max is set too high since all the attributes for every FlowFile currently allocated to bin(s) is held in heap memory.  NiFi's allocated heap memory is set in the nifi.properties configuration file. So a Max number of entries should be limited to 10000 (but this varies based on memory availability and number and size of attributes on your FlowFiles.  You can use multiple MergeContent processors in series (one after another) to merge multiple merged FlowFiles in to even larger merged FlowFiles if desired.

 

Hope this helps with understanding the MergeContent processor,

Matt

avatar
Expert Contributor

@MattWho Thanks for the reply. Sure here i have attached my existing merge_content processor configuration and single node ec2 with 8GB of RAM. can you please clarify if flowfile size gets increase even TB's of data shall i processed with same approach multiple merge_content processor or if it is good with multi-node or increase single node maximum memory availability. 

JVM Configuration :

# JVM memory settings
java.arg.2=-Xms2048m
java.arg.3=-Xmx2048m

 

mergecontent_new.png

avatar
Master Mentor

@varun_rathinam 

 

Observations from your configuration:

1. You are using "Defragment" merge strategy which tells me that somewhere upstream in your dataflow you are splitting some FlowFile in to fragments and then you are using this processor to merge those fragments back in to the original FlowFile.  Correct?  When using Defragment you can not use multiple MergeContent processors in series as i mentioned earlier because the defragment strategy is expecting to find all fragments from the fragment count before merging them.
2. When using the defragment strategy it is the fragment.count attribute on the FlowFiles that dictates when the bin should be merged and not the min number of entries. 
3. Each FlowFile that has a unique value in the fragment.identifier will be allocated to a different bin. Setting the number of bins to "1" will never work no matter which merge strategy you choose to use.  When the MergeContent processor executes it first checks to see if a free bin is available (if not it merges oldest bin or routes oldest bins FlowFiles to failure in case of Defragment to free up a bin), then it looks at the current FlowFiles in the inbound connection at that exact moment in time and starts allocating them to existing bins or new bins.  So at a minimum you should always have at least "2" bins.  The default is "5" bins.  Having multiple bins does not mean that all those available bins will be used.
4. I see you changed Maximum Number of Entries from default 1000 to 100000.  Is this because you know each of the FlowFiles you split will produce up to 100,000 FlowFiles?  As i mentioned the  ALL FlowFiles allocated to bins have their attributes held in heap memory.  Adding to that... If you have multiple bins being filled because you have unique fragment.identifiers being defragmented, you could have even more than 100,000 FlowFiles worth of attributes in heap memory.

So your NiFi JVM heap memory being set at only 2GB may lead you to hitting Out Of Memory (OOM) conditions with such a dataflow design.  Also want to add that where ever you are doing the original splitting of your FlowFile in your dataflow will also have an impact on heap memory because the FlowFile Attributes for every FlowFile being produced during the split process is held in heap memory until every new split FlowFile being produced is committed to a downstream connection.  NiFi connections between processors have swapping enabled by default to help reduce heap usage when queues get large, but same does not apply within the internals of a processors execution.

 

As i mentioned before, the MergeContent does not load FlowFile content in heap memory, so the size of your FlowFiles does not impact heap here.


So you really want to step back and look at your use case again and ask yourself: "do I really need to split my source FlowFile and merge it back in to the original FlowFile to satisfy my use case?"

NiFi has numerous record based processors for working with records avoiding the need to split them in many use cases.

Hope this helps,

Matt