Support Questions

Find answers, ask questions, and share your expertise
Announcements
Celebrating as our community reaches 100,000 members! Thank you!

Merge Fileflow files based on time rather than size or number of entries

avatar
Expert Contributor

Hello, I am trying to use the MergeContent processor in Nifi to deliver a single hdfs file for a specified length of time (no maximum number of entries) rather than the options presented in the MergeContent processor: Number of Entries, Group Size.

I see there is something called Max Bin Age, but I am under the impression that Bins are different than Bundles and it doesn't work as I'd hoped when I use Max Bin Size to do what I am trying to do.

To be clear, I am trying to deliver data into separate directories that are divided according to /year/month/day with one file in each day directory. I still have yet to attempt to create the directory structure.

Thanks for any help.

1 ACCEPTED SOLUTION

avatar
Super Mentor
@Eric Lloyd

The MergeContent processor adds FlowFiles from the incoming queue to virtual bins. Once the configured criteria on a bin is met all the FlowFile in that Bin are merged.

So if you want to continue to merge incoming FlowFiles until X amount of time has passed then setting the "Max bin age" property is what you want.

Note: Be careful how many FlowFiles you merge. The FlowFile attributes for all incoming FlowFiles being merged in a single bin live in the NiFi JVM heap memory. Merging to many FlowFiles at once can result in OutOfMemory (OOM) errors. There is no formula for the exact number you can merge per bundle/bin. It depends on how many attributes exist on a FlowFile and how large the values are associated to those attributes.

Thanks,

Matt

View solution in original post

6 REPLIES 6

avatar
Super Mentor
@Eric Lloyd

The MergeContent processor adds FlowFiles from the incoming queue to virtual bins. Once the configured criteria on a bin is met all the FlowFile in that Bin are merged.

So if you want to continue to merge incoming FlowFiles until X amount of time has passed then setting the "Max bin age" property is what you want.

Note: Be careful how many FlowFiles you merge. The FlowFile attributes for all incoming FlowFiles being merged in a single bin live in the NiFi JVM heap memory. Merging to many FlowFiles at once can result in OutOfMemory (OOM) errors. There is no formula for the exact number you can merge per bundle/bin. It depends on how many attributes exist on a FlowFile and how large the values are associated to those attributes.

Thanks,

Matt

avatar
Expert Contributor

So is it correct to say that a bin and a bundle are the same thing?

avatar
Super Mentor

@Eric Lloyd

If you set an attribute on all your FlowFiles with the a value of "<year/month/day>" for the FlowFile, you can use that attribute as your "Correlation Attribute Name" in the mergeContent processor to make sure that only FlowFile from the same day are added to a bin.

avatar
Expert Contributor

Also, after viewing your answer, Im wondering if processing 24 hours worth of data and having it stored on the JVM heap memory would be too much. Probably. This is unfortunate. When we were using Flume, it would create a .tmp file that would be constantly gathering the data into it rather than storing it in memory so you could make them for as large as your wanted. This is not an appealling part of Nifi.

avatar
Expert Contributor

Its interesting because I am trying your methods of having the bin only complete according to a period of time and neither are working. I have added an attribute called hour which retrieves the yyyy-MM-dd-HH and saves it. Then I tell the MergeProcessor Correlation Attribute Name property to group according to "hour". I can see the actual Attribute when I view the files in the queue and the hour attribute looks correct ... it almost seems like the value in Minimum Group Size is overriding the Correlation Attribute Name. Is there a way to tell the MergeProcessor to ONLY use the Correlation Attribute Name to judge bin size and ignore the number of entries and Group Size?

Attached is a screenshot of my MergeProcessor config values and a screenshot of the value of my "hour" attribute.

13355-screen-shot-2017-03-07-at-23853-pm.png

13356-screen-shot-2017-03-07-at-23938-pm.png

avatar
Super Mentor
@Eric Lloyd

With the above configuration, it would only take 1 FlowFile to be assigned to a bin before that bin was marked eligible for merging. There is nothing there that force the processor to wait for other FlowFiles to be allocated to a bin before merge, Both minimums are set to 1 FlowFile and 0 Bytes. In order to actually get 100,000 Flowfiles (this is high and may trigger OOM), there would need to be 100,000 Flowfiles all with the same correlation attribute value in the incoming connection queue at the time the processor runs. This is almost certainly not going to be the case.

The Max bin age simply sets an exist strategy here. It will merge a bin regardless if minimums have been met if the bin age has reached this value.

You may want to set more reasonable values for your mins and also consider using multiple mergeContent processors in series to step up to the final merged number you are looking for.

Thanks,

Matt