NiFi: how to work with the MergeContent processor?

Contributor

I have more than 2,000 FlowFiles with the same name. How can I configure MergeContent so that it merges all of this data into one file?

1 ACCEPTED SOLUTION

Master Mentor
@sally sally

If you do not know the exact number of files you expect to merge, you must consider how much FlowFile latency you can tolerate.

[Screenshot: MergeContent processor configuration]

Consider the above MergeContent processor configuration:

1. "Correlation Attribute Name" <-- This property determines which bin each inbound queued FlowFile is assigned to. (Make sure you have enough bins to accommodate the number of unique filenames you expect to deal with.)

2. "Minimum Number of Entries" <-- This is the minimum number of FlowFiles that must be allocated to a bin before it is considered eligible to be merged. (Since you do not know how many files will have the same filename (2,000+), I set this to 10,000.) NiFi will continue to bin FlowFiles with the same filename until 10,000 is reached or the bin has existed for "Max Bin Age".

3. "Maximum Number of Entries" <-- If the inbound queue holds more than 20,000 FlowFiles with the same filename, this property triggers the bin to merge at 20,000, and a new bin is started for that filename. 20,000 is generally considered a good ceiling here to prevent excessive heap usage during merge.

4. "Max Bin Age" <-- This is your force-merge property. No matter how many FlowFiles have been assigned to a given bin, that bin will be merged once it has existed for this amount of time. Set this to the maximum latency you are willing to accept for this dataflow.
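One rough way to picture how the four properties above interact is the small Python sketch below. It is a simplified model of the rules described in this post, not NiFi code: the `Bin` class and `should_merge` function are hypothetical names, the 10,000 and 20,000 values come from the post, and the 5 minute Max Bin Age is an assumed value.

```python
from dataclasses import dataclass


@dataclass
class Bin:
    """Hypothetical stand-in for one MergeContent bin (not a NiFi class)."""
    correlation_value: str  # e.g. the shared filename
    entries: int            # FlowFiles currently in the bin
    age_seconds: float      # how long the bin has existed


def should_merge(bin_: Bin, min_entries: int, max_entries: int,
                 max_bin_age_seconds: float) -> bool:
    """Return True when the simplified rules say the bin should merge."""
    if bin_.entries >= max_entries:
        return True  # bin is full: merge now, a new bin starts for this value
    if bin_.age_seconds >= max_bin_age_seconds:
        return True  # forced merge, whatever the entry count
    return bin_.entries >= min_entries  # enough entries to be eligible


# Min and max entries are from the post; the Max Bin Age is an assumed value.
MIN_ENTRIES, MAX_ENTRIES, MAX_AGE_S = 10_000, 20_000, 300.0

young_small = Bin("data.csv", 2_000, 10.0)
old_small = Bin("data.csv", 2_000, 300.0)
young_full = Bin("data.csv", 20_000, 10.0)

print(should_merge(young_small, MIN_ENTRIES, MAX_ENTRIES, MAX_AGE_S))  # False
print(should_merge(old_small, MIN_ENTRIES, MAX_ENTRIES, MAX_AGE_S))    # True
print(should_merge(young_full, MIN_ENTRIES, MAX_ENTRIES, MAX_AGE_S))   # True
```

With 2,000 FlowFiles and a minimum of 10,000, the bin in this model only merges once Max Bin Age expires, which is why that property sets the latency of the flow.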

Thank you,

Matt

@sally sally @Matt Clarke Is there a way to specify that a bin should wait for at least a minimum amount of time (a min bin age)? I use a split processor to split incoming FlowFiles, enrich each split, and finally merge them back into the original FlowFile. The enrichment may be delayed, so I want to wait until all the splits have arrived. I can't use the Defragment strategy because I may not have all the splits (I want to reject some splits based on certain criteria). Can you please help here?

Master Mentor

@Jose Paul

The Defragment strategy bins FlowFiles based on the fragment.identifier attribute set on the FlowFiles by the split processor in your flow.

A bin will only be merged when using this strategy if one of the following occurs:

1. All the fragments for a given fragment.identifier are currently in the bin.

2. Max bin age has been reached. The Max Bin Age property works as an exit strategy in the event a bin never contains all the required pieces, such as when one or more fragments never arrive at the processor. It will force a merge or failure (depending on the strategy in use).

3. There are not enough free bins. For example, you have 5 bins, yet your incoming connection contains more than 5 unique fragment identifier values. This forces the oldest bin to merge or route to failure in order to free a bin for the next fragment identifier. Set the number of bins to the number of unique values + 1. You always want to have at least 1 free bin.
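The three conditions above can be sketched as a small decision function. This is only an illustration of the rules as described here, not NiFi's implementation: `defragment_decision` and `suggested_bin_count` are hypothetical names, and the "no free bins" case assumes a FlowFile with a new fragment identifier is waiting for a bin.

```python
from typing import Optional


def defragment_decision(fragments_in_bin: int, expected_fragments: int,
                        bin_age_s: float, max_bin_age_s: Optional[float],
                        free_bins: int, is_oldest_bin: bool) -> str:
    """Simplified model of when a Defragment bin leaves the processor."""
    if fragments_in_bin >= expected_fragments:
        return "merge: all fragments present"
    if max_bin_age_s is not None and bin_age_s >= max_bin_age_s:
        return "forced out: max bin age reached"
    # Assumes a new fragment identifier is waiting and needs a bin.
    if free_bins == 0 and is_oldest_bin:
        return "forced out: no free bins"
    return "wait"


def suggested_bin_count(unique_identifiers: int) -> int:
    """Rule of thumb from the post: unique values + 1, so one bin stays free."""
    return unique_identifiers + 1


print(defragment_decision(10, 10, 5.0, 60.0, 3, False))  # merge: all fragments present
print(defragment_decision(7, 10, 60.0, 60.0, 3, False))  # forced out: max bin age reached
print(defragment_decision(7, 10, 5.0, 60.0, 0, True))    # forced out: no free bins
print(defragment_decision(7, 10, 5.0, 60.0, 3, False))   # wait
print(suggested_bin_count(5))                            # 6
```

"Forced out" here stands for either a merge or a route to failure, depending on the strategy in use.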

Thank you,

Matt


@Matt Clarke: Thanks for the response. Can you please clarify: if I use the Bin-Packing Algorithm with the correlation attribute set to filename, minEntries of 1, no MaxBinAge, and MaxBins of 500, when will the merge happen?

Master Mentor

@Jose Paul

A bin would be eligible for merge with only 1 FlowFile in it, since you set minEntries to 1.

When the processor gets scheduled to execute (based on the configured run schedule and scheduling strategy), it looks at one of possibly many incoming connections and considers only the FlowFiles queued at that exact moment in time. It then bins those FlowFiles based on the configuration. So if multiple FlowFiles with the same filename attribute value happen to exist in that connection, they will be placed in the same bin. Once those FlowFiles have been placed in bins, the bins are evaluated to see whether they are eligible to be merged. In your case, since minEntries is 1, all bins with 1 or more FlowFiles would be merged.
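As a rough illustration of a single execution, the sketch below bins one snapshot of the queue by filename and returns every bin that meets minEntries. It is a toy model of the behaviour described above, not NiFi code; `run_once` and the sample filenames are made up for the example.

```python
from collections import defaultdict


def run_once(queued_filenames, min_entries=1):
    """Bin one snapshot of the queue by filename; return the bins that would merge."""
    bins = defaultdict(list)
    for position, name in enumerate(queued_filenames):
        bins[name].append(position)  # same filename -> same bin
    return {name: members for name, members in bins.items()
            if len(members) >= min_entries}


# One execution that sees four queued FlowFiles at once.
snapshot = ["a.txt", "b.txt", "a.txt", "a.txt"]
print(run_once(snapshot))  # {'a.txt': [0, 2, 3], 'b.txt': [1]}

# Four executions that each see a single queued FlowFile.
for name in snapshot:
    print(run_once([name]))  # one single-entry merge per execution
```

In this model the same four FlowFiles produce two merged outputs when they are queued together, but four separate single-FlowFile outputs when each execution only sees one of them.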

If your run schedule is set to run as fast as possible (Timer Driven with a run schedule of 0 sec), the processor may be reading the inbound connection so fast that it only contains 1 or just a few FlowFiles per execution.

The other scenario is an inbound connection with over 500 queued FlowFiles at time of execution. If we assume there are more than 500 FlowFiles with unique values assigned to the filename attribute, each would end up being placed in a new bin (correlation attribute config). As soon as bin 500 has a FlowFile assigned to it and MergeContent tries to bin unique filename number 501, it has no available bins left, so it forces the merging of the oldest bin to free a bin.
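The 501st-filename case can be pictured with the sketch below, which keeps bins in creation order and forces out the oldest one when no bin is free. Again this is a simplified model under the assumptions in this post, not the actual MergeContent code; `bin_with_eviction` and the generated filenames are hypothetical.

```python
from collections import OrderedDict


def bin_with_eviction(filenames, max_bins):
    """Assign FlowFiles to bins by filename, forcing out the oldest bin when full."""
    bins = OrderedDict()  # insertion order is bin creation order, oldest first
    forced_merges = []    # (filename, entry count) for each bin forced to merge
    for name in filenames:
        if name not in bins:
            if len(bins) >= max_bins:
                forced_merges.append(bins.popitem(last=False))  # evict oldest bin
            bins[name] = 0
        bins[name] += 1
    return forced_merges, bins


# 501 unique filenames against 500 bins, as in the scenario above.
names = [f"file-{i}.txt" for i in range(501)]
forced, remaining = bin_with_eviction(names, max_bins=500)
print(forced)          # [('file-0.txt', 1)]
print(len(remaining))  # 500
```

The first bin created is the one forced to merge, with whatever it holds at that moment, which in this example is a single FlowFile.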

Thank you,

Matt