
Problem with MergeContent in NiFi

New Contributor

Hi, I have a problem with NiFi. I consume JSON records from Kafka and split them into single records; the outbound connection uses the Round Robin load-balancing strategy. I add attributes, assign an id attribute, and divide the flow into two paths: on one path I call InvokeHTTP to fetch some data and turn it into attributes, add more attributes, and clear the content; then I want to merge the two paths back together, keeping the JSON from one path and the attributes from the other. I use the MergeContent processor for this, merging on the id attribute that I assigned at the beginning.

It works, but with a larger number of records, e.g. 200, it fails with a BIN_MANAGER_FULL error. Minimum Number of Entries is 2 and Maximum Number of Entries is 2, while Maximum number of Bins is 10, and I would not like to increase it. Is there any way to work around this and make it work for a larger number of records? For example, could I apply a limit so that it merges only when there is a free bin, or divide the content and wait? I have no idea how to resolve this problem.
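
For reference, my MergeContent processor is configured roughly like this (the values I mentioned above, with everything else left at its default):

    Correlation Attribute Name   = id
    Minimum Number of Entries    = 2
    Maximum Number of Entries    = 2
    Maximum number of Bins       = 10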

3 REPLIES

Community Manager

@donaldo71 Welcome to the Cloudera Community!

To help you get the best possible solution, I have tagged our NiFi experts @MattWho and @mburgess, who may be able to assist you further.

Please keep us updated on your post, and we hope you find a satisfactory solution to your query.


Regards,

Diana Torres,
Community Moderator



Master Mentor

@donaldo71 

You posted the exact same question as @Sofia71 here:
https://community.cloudera.com/t5/Support-Questions/Problem-with-merge-BIN-MANAGER-FULL/td-p/388211, to which I have already responded with the following:

Sharing the specific processors used in your dataflow and the configuration of some of them, including your MergeContent processor configuration, may help clarify your specific setup.

You are using a ConsumeKafka processor to consume messages (multi-record content) from a Kafka topic. I am assuming that each of these consumed messages contains only two records, so the SplitRecord processor produces just 2 FlowFiles for every FlowFile it splits?

Immediately after the SplitRecord processor you have configured the "Round Robin" load-balancing strategy on the outbound connection. This is probably going to be your first issue here. Each node in a NiFi cluster runs its own identical copy of the flow against only the FlowFiles present on that one specific node. A node has no access to, or ability to read, FlowFiles present on other nodes. So if one FlowFile produced by splitting a record is on node 1 and the other FlowFile is on node 2, the downstream MergeContent is not going to be able to merge them back together.

So the first question is whether you even need to set up load balancing on the connection at all, since you are consuming your messages from a Kafka topic.
1. How many nodes in your NiFi Cluster?
2. How many partitions on the Kafka topic from which you are consuming?
The ConsumeKafka processor uses a "Group ID" to identify a consumer group, so every node in your NiFi cluster that is running this ConsumeKafka processor is a member of the same consumer group. Let's assume your source Kafka topic has 3 partitions and your NiFi cluster has 3 nodes. What would happen here is that each node's ConsumeKafka is assigned one of those partitions. This means that each node is already consuming a unique set of messages from the topic, so there is no need to load balance, as illustrated below.
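
To make that concrete (the topic name and group id below are placeholders; the property names are the standard ConsumeKafka ones):

    ConsumeKafka (scheduled on all nodes of the 3-node cluster)
      Topic Name(s) = my-topic
      Group ID      = my-nifi-consumer-group

    Kafka assigns the partitions within the consumer group, e.g.:
      partition 0 -> node 1, partition 1 -> node 2, partition 2 -> node 3

Each node then consumes its own unique subset of messages, so the FlowFiles split from any given message are already on the same node and no load balancing is needed.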

Assuming the above is not what you are doing, the proper load-balancing strategy to use would be "Partition by attribute", which uses an attribute on the FlowFile to make sure that FlowFiles with the same attribute value get sent to the same node.
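
For example, on the connection where you currently have Round Robin, the settings would look like this ("id" being the attribute you assign at the start of your flow):

    Load Balance Strategy = Partition by attribute
    Attribute Name        = id

All FlowFiles carrying the same id value then land on the same node, so the downstream MergeContent on that node can see both of them.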

Now on to the MergeContent processor.
Upon execution, MergeContent reads from the inbound connection queue and starts assigning FlowFiles to bins. It does not search the inbound connection for matches; it simply reads the queue in the order listed and works its way down. The first FlowFile is allocated to a bin; if the next FlowFile can't be allocated to the same bin, it is placed in a second bin, and so on. If every bin already has FlowFiles allocated and the next FlowFile does not belong to any of those bins, MergeContent force-merges the oldest bin to free up a bin for the new FlowFile. There is no way to change how this works, as the processor is not designed to parse through all queued FlowFiles looking for matches before allocating to bins; that would not exhibit very good performance characteristics. What is your concern with increasing the number of bins? A sketch of the binning behavior follows below.
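
Here is what that binning looks like with your Maximum number of Bins = 10 when FlowFiles with more than 10 different correlation ids arrive interleaved:

    FlowFile id=1  -> allocated to bin 1
    FlowFile id=2  -> allocated to bin 2
    ...
    FlowFile id=10 -> allocated to bin 10
    FlowFile id=11 -> no free bin, so the oldest bin (id=1) is force-merged
                      even if its second FlowFile has not arrived yet

With 200 interleaved records in flight and only 10 bins, this is why the merge breaks down at higher volumes.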

This might be a use case for the Wait/Notify processors. After you split the record into two FlowFiles, one FlowFile is currently routed to an InvokeHTTP for further attribute enrichment and the other FlowFile is routed directly to MergeContent? If so, this means that the FlowFiles that don't get the additional processing will queue up much sooner at MergeContent. But if you add a Notify processor after the InvokeHTTP processor and a Wait processor in the other FlowFile path before MergeContent, you could control the release of FlowFiles to the MergeContent processor.
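
A minimal sketch of that pattern, assuming your correlation attribute is named "id" and that you have a DistributedMapCacheClientService (and its server counterpart) configured; both are assumptions on my part:

    Notify (after InvokeHTTP, on the enrichment path)
      Release Signal Identifier = ${id}
      Distributed Cache Service = <your DistributedMapCacheClientService>

    Wait (before MergeContent, on the pass-through path)
      Release Signal Identifier = ${id}
      Target Signal Count       = 1
      Distributed Cache Service = <same cache service>

The Wait processor holds each pass-through FlowFile until its partner has been enriched and hits Notify, so both FlowFiles reach MergeContent at about the same time and bins fill and merge quickly instead of sitting open.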

This is just one suggestion you could try, but I would start first by making sure you are handling the distribution of your split FlowFiles correctly.

Please help our community grow. If any of the suggestions/solutions provided helped you with solving your issue or answering your question, please take a moment to log in and click "Accept as Solution" on one or more of them that helped.

Thank you,
Matt

Community Manager

@donaldo71 Has the reply helped resolve your issue? If so, please mark the appropriate reply as the solution, as it will make it easier for others to find the answer in the future. Thanks.


Regards,

Diana Torres,
Community Moderator

