Created 03-06-2017 06:17 PM
Hello, I am trying to use the MergeContent processor in Nifi to deliver a single hdfs file for a specified length of time (no maximum number of entries) rather than the options presented in the MergeContent processor: Number of Entries, Group Size.
I see there is something called Max Bin Age, but I am under the impression that Bins are different than Bundles and it doesn't work as I'd hoped when I use Max Bin Size to do what I am trying to do.
To be clear, I am trying to deliver data into separate directories that are divided according to /year/month/day with one file in each day directory. I still have yet to attempt to create the directory structure.
Thanks for any help.
Created 03-06-2017 06:23 PM
The MergeContent processor adds FlowFiles from the incoming queue to virtual bins. Once the configured criteria on a bin is met all the FlowFile in that Bin are merged.
So if you want to continue to merge incoming FlowFiles until X amount of time has passed then setting the "Max bin age" property is what you want.
Note: Be careful how many FlowFiles you merge. The FlowFile attributes for all incoming FlowFiles being merged in a single bin live in the NiFi JVM heap memory. Merging to many FlowFiles at once can result in OutOfMemory (OOM) errors. There is no formula for the exact number you can merge per bundle/bin. It depends on how many attributes exist on a FlowFile and how large the values are associated to those attributes.
Thanks,
Matt
Created 03-06-2017 06:23 PM
The MergeContent processor adds FlowFiles from the incoming queue to virtual bins. Once the configured criteria on a bin is met all the FlowFile in that Bin are merged.
So if you want to continue to merge incoming FlowFiles until X amount of time has passed then setting the "Max bin age" property is what you want.
Note: Be careful how many FlowFiles you merge. The FlowFile attributes for all incoming FlowFiles being merged in a single bin live in the NiFi JVM heap memory. Merging to many FlowFiles at once can result in OutOfMemory (OOM) errors. There is no formula for the exact number you can merge per bundle/bin. It depends on how many attributes exist on a FlowFile and how large the values are associated to those attributes.
Thanks,
Matt
Created 03-06-2017 06:26 PM
So is it correct to say that a bin and a bundle are the same thing?
Created 03-06-2017 06:26 PM
If you set an attribute on all your FlowFiles with the a value of "<year/month/day>" for the FlowFile, you can use that attribute as your "Correlation Attribute Name" in the mergeContent processor to make sure that only FlowFile from the same day are added to a bin.
Created 03-06-2017 06:28 PM
Also, after viewing your answer, Im wondering if processing 24 hours worth of data and having it stored on the JVM heap memory would be too much. Probably. This is unfortunate. When we were using Flume, it would create a .tmp file that would be constantly gathering the data into it rather than storing it in memory so you could make them for as large as your wanted. This is not an appealling part of Nifi.
Created on 03-07-2017 07:41 PM - edited 08-19-2019 01:16 AM
Its interesting because I am trying your methods of having the bin only complete according to a period of time and neither are working. I have added an attribute called hour which retrieves the yyyy-MM-dd-HH and saves it. Then I tell the MergeProcessor Correlation Attribute Name property to group according to "hour". I can see the actual Attribute when I view the files in the queue and the hour attribute looks correct ... it almost seems like the value in Minimum Group Size is overriding the Correlation Attribute Name. Is there a way to tell the MergeProcessor to ONLY use the Correlation Attribute Name to judge bin size and ignore the number of entries and Group Size?
Attached is a screenshot of my MergeProcessor config values and a screenshot of the value of my "hour" attribute.
Created 01-16-2018 05:07 PM
With the above configuration, it would only take 1 FlowFile to be assigned to a bin before that bin was marked eligible for merging. There is nothing there that force the processor to wait for other FlowFiles to be allocated to a bin before merge, Both minimums are set to 1 FlowFile and 0 Bytes. In order to actually get 100,000 Flowfiles (this is high and may trigger OOM), there would need to be 100,000 Flowfiles all with the same correlation attribute value in the incoming connection queue at the time the processor runs. This is almost certainly not going to be the case.
The Max bin age simply sets an exist strategy here. It will merge a bin regardless if minimums have been met if the bin age has reached this value.
You may want to set more reasonable values for your mins and also consider using multiple mergeContent processors in series to step up to the final merged number you are looking for.
Thanks,
Matt