NiFi MergeContent Not Merging


I have 4 Hive queries returning 4 separate FlowFiles that go into MergeContent. I'd like the 4 files to be merged into one, but nothing I've tried is working.

[screenshot: flow showing the merge issue]

- All input queues feeding MergeContent have Back Pressure Object Threshold = 1
- I want it to require all 4 FlowFiles before continuing to merge

4 files go in and 4 files come out?

[screenshot: MergeContent properties]

10 Replies

Master Guru

It's hard to tell from your flow whether the 4 FlowFiles you want to merge have their "fragment.*" attributes set correctly. If you use Defragment as the Merge Strategy, then the FlowFiles must share the same value for the fragment.count and fragment.identifier attributes. If those are not set and you just want to take the first 4 you get, set Merge Strategy to Bin-Packing Algorithm.
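
For illustration only, here is a rough Python sketch of how the two strategies decide when a set of FlowFiles can be merged. This is a simplification, not NiFi's actual code; the dictionary keys are the fragment.* attributes mentioned above.

    from collections import defaultdict

    def defragment_ready_bins(flowfiles):
        """Group FlowFiles by fragment.identifier; a bin can only be merged
        once it holds exactly fragment.count members with a matching count."""
        bins = defaultdict(list)
        for ff in flowfiles:
            bins[ff["fragment.identifier"]].append(ff)
        ready = []
        for group in bins.values():
            counts = {ff["fragment.count"] for ff in group}
            if len(counts) == 1 and len(group) == int(counts.pop()):
                ready.append(group)
        return ready

    def bin_packing_ready_bins(flowfiles, min_entries=4):
        """Bin-Packing ignores the fragment.* attributes and simply fills a
        bin until the configured minimum number of entries is reached."""
        return [flowfiles[i:i + min_entries]
                for i in range(0, len(flowfiles) - min_entries + 1, min_entries)]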

I tried Bin-Packing as well, and that also did not produce a single output file. Are the fragment attributes something that can be set manually? Let me look at that.

Super Mentor
@Wesley Bohannon

The issue you are most likely running into is caused by only having 1 bin.

https://issues.apache.org/jira/browse/NIFI-4299


Change the number of bins to at least 2 and see if that resolves your issue.
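
As an illustration of which settings are in play (a sketch only, not a recommended configuration; property names follow the MergeContent configuration dialog):

    # Sketch of the MergeContent properties discussed in this thread.
    merge_content_properties = {
        "Merge Strategy": "Defragment",       # or "Bin-Packing Algorithm"
        "Maximum number of Bins": "2",        # at least 2, per NIFI-4299 above
        "Minimum Number of Entries": "4",     # only relevant for Bin-Packing
    }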

Thanks,

Matt

I previously had the defaults of 5 and 4, but got the same result: 4 in and 4 out.

Super Mentor

@Wesley Bohannon

Is this a NiFi standalone or a NiFi cluster?

If it is a cluster, are the FlowFiles from each of your SelectHiveQL processors all being produced on the same node? The MergeContent processor will not merge FlowFiles from different cluster nodes.

Assuming that all FlowFiles are on the same NiFi instance, the only ways I could reproduce your scenario were:

  1. Each FlowFile had a different value assigned to the "table_name" FlowFile Attribute and Merge Strategy was set to "Bin-Packing Algorithm". This caused each FlowFile to be placed in its own bin. At the end of the 5 minute max bin age, each bin of 1 was merged. If the intent is always to merge one FlowFile from each incoming connection, what is the purpose of setting a "Correlation Attribute Name"?
  2. Setting Maximum number of bins to 1 and having the 4 source FlowFiles become queued at different times.
  3. The "Defragment" Merge Strategy will bin FlowFiles based on matching values in the "fragment.identifier" FlowFile Attribute. It will then merge the FlowFiles using the "fragment.index" and "fragment.count" attributes. Since you have also specified a correlation attribute, the MergeContent processor will use the value of that attribute rather than "fragment.identifier" to bin your files. If I have a unique value for "table_name" on each FlowFile, then each FlowFile ends up in a different bin and is routed to failure right away (if bins is set to 1) or after the 5 minute max bin age, since not all fragments were present.
  4. The other possibility is that "fragment.count" and "fragment.index" are set to 1 on every FlowFile.

I would stop your MergeContent processor and allow 1 FlowFile to queue in each connection feeding it. Then use the "list queue" capability to inspect the attributes on each queued FlowFile.

What values are associated with each FlowFile for the following attributes:

  • fragment.identifier
  • fragment.count
  • fragment.index
  • table_name
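
For comparison, here is a hypothetical example (the identifier and table_name values are invented, but the pattern follows the Defragment requirements described above) of what a mergeable set of 4 queued FlowFiles would show:

    # Hypothetical attribute values for 4 FlowFiles that Defragment could merge:
    # same identifier, same count, a full set of indexes, and the same
    # correlation attribute (table_name) on every file.
    mergeable_set = [
        {"fragment.identifier": "abc-123", "fragment.index": "1",
         "fragment.count": "4", "table_name": "my_table"},
        {"fragment.identifier": "abc-123", "fragment.index": "2",
         "fragment.count": "4", "table_name": "my_table"},
        {"fragment.identifier": "abc-123", "fragment.index": "3",
         "fragment.count": "4", "table_name": "my_table"},
        {"fragment.identifier": "abc-123", "fragment.index": "4",
         "fragment.count": "4", "table_name": "my_table"},
    ]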

Thank you,

Matt

This was standalone, and all FlowFiles had table_name set to the same value. I was able to resolve it by adding UpdateAttribute processors after all of the Hive processors and manually setting fragment.index to 0-3 and fragment.count to 4. That way it knew to combine all the fragments into 1 output FlowFile. I wish the attributes could be updated within the Hive processor itself, to avoid adding yet more processors just to set attributes.
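
Roughly, the four UpdateAttribute processors described above would each set something like the following (a sketch of the configuration, not an export of the actual flow):

    # One UpdateAttribute processor per Hive branch; each sets a unique
    # fragment.index (0-3 here) and the same fragment.count of 4.
    update_attribute_per_branch = [
        {"fragment.index": "0", "fragment.count": "4"},
        {"fragment.index": "1", "fragment.count": "4"},
        {"fragment.index": "2", "fragment.count": "4"},
        {"fragment.index": "3", "fragment.count": "4"},
    ]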

[screenshot: resolution flow]

[screenshot: UpdateAttribute resolution settings]

Thanks Matt and Matt!

Ugh... I was incorrect. It does NOT wait for all 4 when all the processors are running... Back to unresolved. I must have had MergeContent stopped on the previous run and then started it afterward.

Super Mentor

@Wesley Bohannon

I set up a similar dataflow that is working as expected. The only difference is that you made your fragment.index values 0-3 and I made mine 1-4. Is the FlowFile Attribute "table_name" set on all four FlowFiles? Is the value associated with the FlowFile Attribute "table_name" exactly the same on all 4 FlowFiles?

Below is my test flow that worked:

[screenshot: test flow, 2017-08-29]

As you can see, one 4-FlowFile merge was successful and a second bin is waiting for its 4th file before being merged.

Thanks,

Matt

@Matt Clarke For some reason I was unable to reply directly to your comment above... Yes, table_name was the same across all inputs. I'm not sure why it wasn't working, but I moved to a different approach to resolve it: I basically unioned all the independent Hive queries into one input.
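
A minimal sketch of that final approach, assuming four placeholder queries (the real queries are not shown in the thread): the statements are combined with UNION ALL (or UNION, if duplicates need to be removed) so a single SelectHiveQL processor emits one FlowFile.

    # Hypothetical queries; substitute the four real Hive queries.
    queries = [
        "SELECT a, b FROM table_one",
        "SELECT a, b FROM table_two",
        "SELECT a, b FROM table_three",
        "SELECT a, b FROM table_four",
    ]

    # Combine them into one statement for a single SelectHiveQL processor.
    combined_query = "\nUNION ALL\n".join(queries)
    print(combined_query)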