Apache Nifi : Merge continuously incoming flowfiles from a processor into a single flowfile

New Contributor

Hi,

I'm new to NiFi and have a scenario where a ConsumeKafkaRecord processor fetches 10 million records (not in a single queue/FlowFile). The processor runs for a certain amount of time, and I need to merge all of the resulting FlowFiles into a single FlowFile. The problem is that when the processor first runs, it fetches only a few FlowFiles, and these need to be held until the same processor finishes running and fetches the remaining FlowFiles. Is there a way I can do this? I have tried to pull all 10 million records into a single FlowFile, which does not seem possible and is not a best practice either. Please help.

1 REPLY

Super Mentor

@Kit2020

You will want to use the MergeContent or MergeRecord processor to merge your incoming data from Kafka.

A NiFi FlowFile consists of two parts:
1. FlowFile attributes/metadata --> held in heap memory
2. FlowFile content --> held in NiFi's content repository on disk

 

The merge processors above hold the attributes of every FlowFile being merged (that is, every FlowFile allocated to a merge bin) in heap memory. Since you are talking about merging 10 million FlowFiles, you will likely hit an out-of-memory condition if you try to merge all of them with a single merge processor. A better approach is to use two merge processors in series: the first merges batches of at least 10,000 to 20,000 FlowFiles, and the second merges those batches again, so that all FlowFiles end up merged into one.
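
To make the arithmetic behind the two-stage layout concrete, here is a rough Python sketch. It is not NiFi code: the function name `two_stage_plan` is made up for illustration, and it assumes a single bin per processor and evenly filled batches.

```python
import math


def two_stage_plan(total_flowfiles, stage_one_batch):
    """Estimate how many FlowFiles each merge stage has to hold in a bin.

    Hypothetical helper, assuming one bin per processor and full batches.
    """
    # Each stage-one merge turns up to `stage_one_batch` FlowFiles into one.
    stage_one_outputs = math.ceil(total_flowfiles / stage_one_batch)
    return {
        "stage_one_outputs": stage_one_outputs,
        # Stage one never bins more than one batch at a time.
        "peak_binned_stage_one": min(stage_one_batch, total_flowfiles),
        # Stage two only sees the already merged bundles.
        "peak_binned_stage_two": stage_one_outputs,
    }


plan = two_stage_plan(total_flowfiles=10_000_000, stage_one_batch=10_000)
print(plan)
```

Under these assumptions, neither stage bins more than 10,000 FlowFiles at once, compared with 10 million for a single merge processor.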

 

A merge processor bin is eligible to be merged once its configured minimums are met. For example, if you set min entries/records to 1 and max to 10000, a bin can be merged with only one FlowFile in it. Each time the processor executes, the thread takes whatever is in the inbound connection at that moment and allocates it to a bin, then checks whether that bin has met its minimums and, if so, merges it. So make sure the minimums on your merge processors are set high enough (for example, 10000 on the first and 1000 on the second, which gives 10,000 x 1,000 = 10,000,000 FlowFiles merged into one).
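
The effect of the minimum setting can be illustrated with a small simulation. This is only a simplified model of the behaviour described above, not how NiFi is implemented: it tracks a single bin, `simulate_bin` and the queue sizes are hypothetical, and each number in `snapshots` stands for how many FlowFiles are queued when the processor executes.

```python
def simulate_bin(snapshots, min_entries, max_entries):
    """Return (sizes of merged bins, FlowFiles still waiting in the bin).

    Simplified single-bin model; all names and numbers are illustrative.
    """
    merged = []
    in_bin = 0
    for queued in snapshots:
        remaining = queued
        while remaining > 0:
            # Allocate queued FlowFiles to the bin until it is full.
            take = min(remaining, max_entries - in_bin)
            in_bin += take
            remaining -= take
            if in_bin >= max_entries:
                merged.append(in_bin)  # a full bin is merged
                in_bin = 0
        # At the end of the execution, merge if the minimum is met.
        if in_bin and in_bin >= min_entries:
            merged.append(in_bin)
            in_bin = 0
    return merged, in_bin


snapshots = [3, 250, 10_000]
low_min = simulate_bin(snapshots, min_entries=1, max_entries=10_000)
high_min = simulate_bin(snapshots, min_entries=10_000, max_entries=10_000)
print(low_min)   # many small merges, one per execution
print(high_min)  # one full merge, the rest keeps waiting
```

With a minimum of 1, every execution produces its own small merged FlowFile. With a minimum of 10000, the bin waits until it is full, and the leftover FlowFiles stay binned until more data arrives.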

 

If you do not know the exact number of records you need to merge, set the minimums on the second merge processor higher than anything you expect to receive. Then set "Max bin age" to a value you can accept for data latency. The max bin age is your force-merge setting: even if the minimum values have not been reached, a bin that has existed for this length of time is forced to merge. Setting the bin age on both processors is important, with a higher value on the second than on the first. This gives a bin in the first merge processor (typically the last one created) that never reaches the 10000 minimum time to be merged while the second processor is still waiting.
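
The interaction between the two bin ages can be sketched the same way. Again, this is a toy model under stated assumptions rather than NiFi's actual logic: `merge_with_bin_age` is a hypothetical helper, times are in seconds, a bin's age counts from its first FlowFile, and the 300/900/200 second values are invented for the example.

```python
def merge_with_bin_age(arrivals, min_entries, max_bin_age):
    """Simulate one bin. `arrivals` is a time-sorted list of (time, count).

    Returns a list of (merge_time, merged_count, reason). Illustrative only.
    """
    merges = []
    size = 0
    created = None
    for t, count in arrivals:
        # An aged-out bin is force-merged before new data is added.
        if size and t - created >= max_bin_age:
            merges.append((created + max_bin_age, size, "max bin age"))
            size = 0
        if size == 0:
            created = t  # a new bin starts with this arrival
        size += count
        if size >= min_entries:
            merges.append((t, size, "minimum reached"))
            size = 0
    if size:
        # The last partial bin is merged once it reaches max bin age.
        merges.append((created + max_bin_age, size, "max bin age"))
    return merges


# Stage one: 12,500 FlowFiles arrive, so the last bin never reaches 10,000.
stage_one = merge_with_bin_age(
    [(0, 6_000), (60, 4_000), (120, 2_500)], min_entries=10_000, max_bin_age=300
)
# Each stage-one merge becomes one FlowFile arriving at stage two.
bundles = [(t, 1) for t, _, _ in stage_one]
longer_age = merge_with_bin_age(bundles, min_entries=1_000, max_bin_age=900)
shorter_age = merge_with_bin_age(bundles, min_entries=1_000, max_bin_age=200)
print(stage_one)
print(longer_age)
print(shorter_age)
```

In this model, a second-stage bin age longer than the first lets the late partial bundle join the final merge, giving one output FlowFile. With the shorter bin age, the second stage gives up early and produces two outputs.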

 

Hope this helps,

Matt