Support Questions

Find answers, ask questions, and share your expertise
Announcements
Celebrating as our community reaches 100,000 members! Thank you!

Apache Nifi - Merge Content

avatar
Rising Star

This is my flow

SplitText -> De-Deuplicator (here, I use the processor to De-Dupe the records within a File) -> MergeContent

Here, my question is regarding the working of MergeContent. Say suppose I keep watch on a folder where 6 files of 2GB each fall. Will the output of mergecontent maintain the input files as is? I ask this question because I see various optional properties in MergeContent which says "if not specified there is no maximum". What exactly does this mean? Is there any default maximum which cut off the files?

1 ACCEPTED SOLUTION

avatar
Super Mentor

@bala krishnan

When the MergeContent processor runs (based on run schedule configuration), it looks at the FlowFiles on the incoming queue and groups them in bins (think of these as logical containers) based on processor properties configurations.

You have two Merge Strategies to choose from:

1. Defragmentation: This strategy requires that FlowFiles have the following attributes set on them.

------------------------------

9042-screen-shot-2016-11-01-at-84900-am.png

*** Each "bundle" will go in to its own bin.

-----------------------------

2. Bin-Packing Algorithm (default): FlowFiles are simply placed in bins based on the criteria discussed below.

Here are the properties that affect how the bins FlowFile are placed in are handled:

9041-screen-shot-2016-11-01-at-84308-am.png

--------------

*** FlowFile content is never truncated. For example: If a FlowFile where its content is larger then the configured "maximum Group Size" exists on incoming queue will simply become a merged file of one and passed directly to the merged relationship.

--------------

*** No maximum simply means that there is no ceiling on the "maximum number of entries" or "maximum group size". Does this mean a bin will never get merged? NO. The currently queued FlowFiles on an incoming connection are placed in bin(s). If at the end of being placed in those bins, any of the bins has reached or exceeded the "minimum number of entries" or "minimum group size", that bin(s) is merged.

--------------

*** Don't forget the "Max bin age" acts as your trump card. A bin will be merged if it has been around this long without being merged regardless of the other settings.

--------------

Thanks,

Matt

View solution in original post

6 REPLIES 6

avatar
Super Mentor

@bala krishnan

When the MergeContent processor runs (based on run schedule configuration), it looks at the FlowFiles on the incoming queue and groups them in bins (think of these as logical containers) based on processor properties configurations.

You have two Merge Strategies to choose from:

1. Defragmentation: This strategy requires that FlowFiles have the following attributes set on them.

------------------------------

9042-screen-shot-2016-11-01-at-84900-am.png

*** Each "bundle" will go in to its own bin.

-----------------------------

2. Bin-Packing Algorithm (default): FlowFiles are simply placed in bins based on the criteria discussed below.

Here are the properties that affect how the bins FlowFile are placed in are handled:

9041-screen-shot-2016-11-01-at-84308-am.png

--------------

*** FlowFile content is never truncated. For example: If a FlowFile where its content is larger then the configured "maximum Group Size" exists on incoming queue will simply become a merged file of one and passed directly to the merged relationship.

--------------

*** No maximum simply means that there is no ceiling on the "maximum number of entries" or "maximum group size". Does this mean a bin will never get merged? NO. The currently queued FlowFiles on an incoming connection are placed in bin(s). If at the end of being placed in those bins, any of the bins has reached or exceeded the "minimum number of entries" or "minimum group size", that bin(s) is merged.

--------------

*** Don't forget the "Max bin age" acts as your trump card. A bin will be merged if it has been around this long without being merged regardless of the other settings.

--------------

Thanks,

Matt

avatar

I have a doubt @Matt Clarke
Does NiFi always merge the bins after 'Minimum Number of Entries'? If yes, what's the use for 'Maximum Number of Entries'?

My doubt originates from the following use case. I have to merge flowfiles based on the following two conditions:

A) No. of flowfiles in the bin reaches 6000

B) Bin age crosses 5 min and at least one flowfile in the bin

What should be the settings for this case?

Thanks!

avatar
Super Mentor

@Carrick

NiFi will merge a bin that has met minimum as part of a thread execution.

Lets assume a steady stream of FlowFiles is entering the incoming connection queue feeding the MergeContent processor. When MergeContent runs (obtains a thread) in looks at incoming queue and grabs from the active queue only those Flowfiles which are there at that exact moment in time. That thread will not know about any FlowFiles that entered that queue after that moment in time. Upon placing those Flowfiles in one or more bins that same thread will asses if a bin has satisfied the minimum requirements and if so the bin will be merged.

Now consider a MergeContent processor with a run schedule of 0 sec (default). It will be requesting tasks as fast as possible, which means each executed thread may only see as few as one new FlowFile in the incoming Queue when it runs. Which means you would end up with Merged Flowfiles that consist of only one FlowFIle.

Now lets assume you have MergeContent only run every 1 minute. In between those two executions of 10,000 new FlowFiles queue on the incoming Connection. On next run the mergeContent thread now sees 10,000 new Flowfiles. It will allocate 6,000 to 1 bin (because you set a max) and play the other 4,000 in another bin. At end of the thread both those bins are eligible to be merged because they both met the min, but as you can see 1 did end up with 6,000 FlowFiles in it.

- If the intent is never to have 1 FlowFile in a merge, do not set min to 1.

- If the Flow feeding the MergeContent is slow, change the run schedule so it does not run as often allowing more FlowFiles to queue between executions.

- If setting min settings to any value beyond 1, make sure you set max bin age. This setting makes sure that a bin will eventually be merged even if it never meets the minimum configured values.

Hope this clarifies how this processor works.

Thanks,

Matt

avatar

Thanks a lot Matt! This very well clears the merging logic.

However, currently I am facing a strange issue.

Suppose I have an attribute called 'trigger.type' on the flowfile, which can be 'mouse' or 'keyboard'.

Now, I want to merge flowfiles with same 'trigger.type' values so I have the Correlation Attribute Name set to ${trigger.type}

However, it's merging flowfiles with different values for this attribute into the same flowfile.

I can't comprehent what am I missing here?

These are my settings for the 'MergeContent' processor:

Merge Strategy

Bin-Packing Algorithm

Merge Format

Binary Concatenation
Attribute Strategy
Keep Only Common Attributes
Correlation Attribute Name
${trigger.type}
Metadata Strategy
Ignore Metadata
Minimum Number of Entries
6000
Maximum Number of Entries
6000
Minimum Group Size
0 B
Maximum Group Size No value set Max Bin Age
300 seconds
Maximum number of Bins
15
Delimiter Strategy
Text
Header
[
Footer
]
Demarcator
,
Compression Level
1
Keep Path
false
Tar Modified Time
${file.lastModifiedTime}

avatar

Okay, so apparently, Correlation Attribute Name does not support expression language.

Changing, Correlation Attribute Name from

${trigger.type}

to

trigger.type

solved the issue.

Thanks still!

avatar
Super Mentor

@Carrick

That property does support Expression Language, but you need to understand what this property is doing with the EL you provided.

${trigger.type} 
<-- This is an expression language statement. It is asking NiFi to check for the existence of an attribute on the incoming FlowFile, assigned to a Process group, found in the NiFi registry file, found in a JVM property and finally found in NiFi user env variable. (check in that order, first match ends search.) If not found nothing is returned. If found the value assigned to that attribute is returned.

So in your case what is being returned is 'mouse' or 'keyboard'. So now the merge content processor is using mouse or keyboard as the correlation attribute name. And in you case there is no attribute named mouse or keyboard so all the flowfiles with null values are ending up in the same bin.

As soon as you removed the EL and simply defined the attribute to use for correlation it worked.

Thanks,

Matt