Support Questions

Find answers, ask questions, and share your expertise
Announcements
Celebrating as our community reaches 100,000 members! Thank you!

What are some strategies to merge the content of flowfiles coming from different streams at the same time?

avatar
New Contributor

We currently have 2 ReplaceText processors (and we'll potentially add more in the future) that each contain a list of file names in their Replacement Value attribute, such that the corresponding flowfiles contain this list of file names in their content. We want to know if there are recommended strategies to merge these flowfiles coming from different streams, but I need the merging to happen at the same time because the output will be used to count the total number of files, and the latter will be passed to a Wait processor for some other functionality of the flow that uses the Wait/Notify pattern (the total number of files is assigned to "Target Signal Count" in the Wait processor).

The main problem we're facing is that we don't know in advance which ReplaceText processors will run in the first place (could be one or the other, or both), so we don't know which flowfiles we have to merge, but we need a way to merge the flowfiles of the ReplaceText processors that are running.

One way I've solved this is by sending the flowfiles to a MergeContent processor, and setting the "Minimum Number of Entries" to 2, and the "Max Bin Age" to 10 sec; with this approach, the flowfiles in the bin will wait for 10 sec before being merged, regardless of whether we have 2 flowfiles (2 lists) or not in the bin. The idea is to add enough buffer time for the other flowfiles to hit MergeContent.

I'm concerned that the above approach is not robust enough as it has the potential to fail as the 10 sec buffer is arbitrary and may not account for any network-related delays and other variables...

Do you have any fool-proof way of solving this issue that doesn't involve the MergeContent approach described above or hardcoding the total number of files in the Wait processor used in the Wait/Notify pattern?

1 ACCEPTED SOLUTION

avatar
Super Mentor

@c i

I am not clear from your description what your flow is doing.
- What does the original FlowFile content look like before these replaceText processors?
- How are the replaceText processors configured?


The description you have of how the mergeContent processor works is not accurate.

The MergeContent processor will merge a bin when any ONE of the following as occurred:
1. A given bin meets the configured minimum configured values for "number of entries" and "group size" (Bin-Packing Algorithm Merge Strategy)

2. A given bin contains all fragments of a fragmented FlowFile (Defragment Merge Strategy)

3. A bin has reached the configured "max bin age". Bin age starts when first FlowFile is allocated to a bin. The max bing age property exists to keep bins from staying around forever when the conditions outlined in 1 and 2 above are never met.

4. Processor runs out of bins. If FlowFiles have been allocated to each available bin already and a new FlowFile does not meet criteria to be addd to one of these existing bins, the oldest bin will be merged in order to free a bin to use for this new FlowFile. (This typically occurs when using the Correlation Attribute Name property and the source FlowFiles have more unique attribute values than bins allocated).


The MergeContent processor will not always wait the configured max bin age before merging.

Also keep in mind that with a NiFi cluster, each NiFi node is running the MergeContent processor independently of the other nodes and can only merge FlowFiles on the same node. So while an inbound connection queue to the MergeContent processor may show 2 queued FlowFiles, each of those FlowFiles may exists on different nodes and thus would not be merged together in to one FlowFile.


Consider the "Max bin age" as the max latency you are willing to allow in your flow. So that a bin is merged even if it does not meet min criteria in the allotted time. So even if you set this to 30 minutes, a bin that meets the min criteria in 2 mins will get merged at 2 minutes.


More details about your overall dataflow is probably needed to offer alternative suggestions here.


Thank you,

Matt


View solution in original post

5 REPLIES 5

avatar
Super Mentor

@c i

I am not clear from your description what your flow is doing.
- What does the original FlowFile content look like before these replaceText processors?
- How are the replaceText processors configured?


The description you have of how the mergeContent processor works is not accurate.

The MergeContent processor will merge a bin when any ONE of the following as occurred:
1. A given bin meets the configured minimum configured values for "number of entries" and "group size" (Bin-Packing Algorithm Merge Strategy)

2. A given bin contains all fragments of a fragmented FlowFile (Defragment Merge Strategy)

3. A bin has reached the configured "max bin age". Bin age starts when first FlowFile is allocated to a bin. The max bing age property exists to keep bins from staying around forever when the conditions outlined in 1 and 2 above are never met.

4. Processor runs out of bins. If FlowFiles have been allocated to each available bin already and a new FlowFile does not meet criteria to be addd to one of these existing bins, the oldest bin will be merged in order to free a bin to use for this new FlowFile. (This typically occurs when using the Correlation Attribute Name property and the source FlowFiles have more unique attribute values than bins allocated).


The MergeContent processor will not always wait the configured max bin age before merging.

Also keep in mind that with a NiFi cluster, each NiFi node is running the MergeContent processor independently of the other nodes and can only merge FlowFiles on the same node. So while an inbound connection queue to the MergeContent processor may show 2 queued FlowFiles, each of those FlowFiles may exists on different nodes and thus would not be merged together in to one FlowFile.


Consider the "Max bin age" as the max latency you are willing to allow in your flow. So that a bin is merged even if it does not meet min criteria in the allotted time. So even if you set this to 30 minutes, a bin that meets the min criteria in 2 mins will get merged at 2 minutes.


More details about your overall dataflow is probably needed to offer alternative suggestions here.


Thank you,

Matt


avatar
Super Mentor

@c i

*** Community Forum Tip: Try to avoid starting a new answer in response to an existing answer. Instead use comments to respond to existing answers. There is no guaranteed order to different answer which can make it hard following a discussion.

*** Community Forum Tip: If you want user to be notified about your response, try using the <@name> tag on your response.


1. I believe this approach is more robust than using Max Bin Age/max latency, what do you think? -- I agree this is more robust. Also the fact that by adding the UpdateAttribute between your MergeContent and the two ReplaceText processors you have only one inbound connection to your MergeContent processor. On each execution of the MergeContent processor it will bin FlowFiles from only one inbound connection. It then round robins all inbound connection. Having only on inbound connection will improve efficiency. Without the updateAttribute processor I would have suggested using a "Funnel" to reduce the two connections to just one.


2. would you suggest then to set the "Execution" of MergeContent to Primary node instead of All nodes? --> NO processor that has inbound connections to it should ever be configured for "Primary node" only execution. The elected "Primary node" in a NiFi cluster can change at any time which could lead to FlowFiles queued in connections to processors that are no longer scheduled to run on the previously elected primary node. I do see however that your GenerateFlowFile processors are not configured for "Primary node" execution. This means every time they are scheduled to run every node in your NiFi cluster is going to run that processor and produce FlowFile(s). So if intent is to produce only 1 FlowFile per execution, you would want those generateFlowFiles processors running on primary node only.


In other scenarios where your Source data comes in to NiFi via other means that may be distributed across all your NiFi nodes, you can use the load-balanced capability available on connections to redistribute those FlowFiles in a variety of ways. One of the methods is "Single node" which is great when you need to get all queued FlowFiles moved over to a single node's mergeContent processor.

Thank you,

Matt


If you found the assistance I provided via this answer addressed your question, please take a moment to login in and click the "ACCEPT" link below my answer.

avatar
New Contributor
I just wanted to take a moment to say thank you for the excellent explanation and for making this all much clearer to me. Have a great rest of day!

Chris

avatar
New Contributor

First of all, thank you for your response Matt.

Sorry I wasn't clear enough in my MergeContent description, but it's as you said; I'm using the Bin-Packing Algorithm merge strategy, and I've set the "Minimum Number of Entries" equal to the amount of ReplaceText processors I have (2 in this case), such that the MergeContent processor will merge a given bin when it meets the configured values for "Minimum Number of Entries" (2) and "group size" (Minimum Group Size is set to 1 B and I haven't set a Maximum Group Size). And with respect to the third point you outlined, I've set the "Max Bin Age" attribute to 10 sec such that if the bin doesn't meet the configured values for "Minimum Number of Entries" and "group size" within 10 sec then it will merge the flowfiles currently allocated to the bin.

For more context here's a template of the situation I'm describing, where the second screenshot shows the configurations for the MergeContent processor:



108421-1556887964086.png


108431-1556887992727.png

For "List1" and "List2" above, as an example, the "Replacement Value" attributes contain the following respectively:

abc1
abc2

abc3

and

abc2

abc3

abc4


I've since found a way to avoid adding arbitrary max latency and have opted to use a global variable instead that will be incremented after each ReplaceText processor using UpdateAttribute (and connecting UpdateAttribute to MergeContent directly), and I'll set "Minimum Number of Entries" to that global variable. In this way, each time a ReplaceText "stream" is executed, the global variable is incremented signifying that we're expecting one more list, and the MergeContent processor will wait for all the flowfiles required (each containing a different list of course) to merge the bin, assuming of course that I don't assign a value to "Max Bin Age".

I believe this approach is more robust than using Max Bin Age/max latency, what do you think?

Also, regarding this comment:

"Also keep in mind that with a NiFi cluster, each NiFi node is running the MergeContent processor independently of the other nodes and can only merge FlowFiles on the same node. So while an inbound connection queue to the MergeContent processor may show 2 queued FlowFiles, each of those FlowFiles may exist on different nodes and thus would not be merged together in to one FlowFile. "

would you suggest then to set the "Execution" of MergeContent to Primary node instead of All nodes?


Thank you for your time and I look forward to your response,


Chris

avatar
New Contributor

 

Hi,

 

I've tried to set a global variable for "Minimum Number of Entries" but it doesn't work. How did you manage to do this?


I have n number of flowfiles and I need to wait for all of them to arrive at "MergeContent" processor so they can be merged. I only know the number of flowfiles at runtime so I keep updating a  counter set as a global variable, but then I tried to set the "Minimum Number of Entries" of "MergeContent" processor to that variable but it doesn't work.
I don't know how long I need wait to get all the flowfiles so I can't use that either.

I don't know what to do, can someone help?