Created 05-22-2024 04:42 AM
Hi guys, I have a problem with my NiFi flow. I consume records from Kafka and split them so there is a single record per FlowFile; the outbound connection here uses Round Robin load balancing. Next I add an id attribute and then divide the flow into two paths. On one path I do some actions with InvokeHTTP to produce new attributes, then I clear the content on that path and want to merge it back with the other path, so that the JSON content and the new attributes end up together in one FlowFile. I use a MergeContent processor for this, merging on the id attribute I assigned at the beginning. It works, but with a larger number of records (e.g. 200) it fails with a BIN_MANAGER_FULL error. Minimum Number of Entries is 2 and Maximum Number of Entries is 2 as well, while Maximum Number of Bins is 10, and I would prefer not to increase it. Is there any way to work around this so it handles a larger number of records? For example, could it merge only when there is space, or wait for space? I would be grateful for tips on how to solve this problem.
Created 05-22-2024 02:10 PM
@Sofia71 Welcome to the Cloudera Community!
To help you get the best possible solution, I have tagged our NiFi experts @SAMSAL @MattWho who may be able to assist you further.
Please keep us updated on your post, and we hope you find a satisfactory solution to your query.
Regards,
Diana Torres
Created 05-23-2024 05:44 AM
@Sofia71
Sharing the specific processors used in your dataflow, along with the configuration of some of them (including your MergeContent processor), would help clarify your specific setup.
You are using a ConsumeKafka processor to consume messages (multi-record content) from a Kafka topic. I am assuming that each consumed message contains only two records, so the SplitRecord processor produces 2 FlowFiles for every FlowFile it splits?
Immediately after the SplitRecord processor you have configured the "Round Robin" load-balancing strategy on the outbound connection. This is probably going to be your first issue. Each node in a NiFi cluster runs its own identical copy of the flow against only the FlowFiles present on that one node; a node has no access to, or ability to read, FlowFiles present on other nodes. So if one FlowFile produced by splitting a record lands on node1 and the other on node2, the downstream MergeContent will never be able to merge them back together.
So the first question is whether you even need load balancing on that connection at all, since you are consuming your messages from a Kafka topic.
1. How many nodes in your NiFi Cluster?
2. How many partitions on the Kafka topic from which you are consuming?
The ConsumeKafka processor uses a "Group ID" to identify a consumer group, so every node in your NiFi cluster running this ConsumeKafka processor is a member of the same consumer group. Let's assume your source Kafka topic has 3 partitions and your NiFi cluster has 3 nodes. Each node's ConsumeKafka would then be assigned one of those partitions, which means each node consumes a unique set of messages from the topic. So there is no need to load balance afterwards.
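To make the consumer-group point concrete, here is a rough Python sketch of how partitions get spread across a consumer group (the function name and shape are illustrative, not the Kafka client API):

```python
def assign_partitions(partitions, consumers):
    """Spread partitions across consumers; each partition goes to exactly one consumer."""
    assignment = {c: [] for c in consumers}
    for i, p in enumerate(partitions):
        assignment[consumers[i % len(consumers)]].append(p)
    return assignment

# 3 partitions on the topic, 3 NiFi nodes in the same consumer group:
result = assign_partitions([0, 1, 2], ["node1", "node2", "node3"])
print(result)  # each node consumes one unique partition, so no load balancing is needed
```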
Assuming the above is not your situation, the proper load-balancing strategy to use would be "Partition by attribute", which uses an attribute on the FlowFile to make sure that FlowFiles with the same attribute value are sent to the same node.
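The idea behind "Partition by attribute" can be sketched as a consistent hash of the attribute value (this is only an illustration of the routing idea, not NiFi's internal implementation):

```python
import zlib

def target_node(attribute_value: str, nodes: list) -> str:
    # Deterministic hash of the attribute, so both halves of a split
    # that carry the same "id" attribute are routed to the same node.
    return nodes[zlib.crc32(attribute_value.encode()) % len(nodes)]

nodes = ["node1", "node2", "node3"]
# The two FlowFiles sharing id "abc-123" always land on the same node:
assert target_node("abc-123", nodes) == target_node("abc-123", nodes)
```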
Now on to the MergeContent processor.
When MergeContent executes, it reads from the inbound connection queue and starts assigning FlowFiles to bins. It does not search the inbound connection for matches; it simply reads FlowFiles in the order listed and works its way down the list. The first FlowFile is allocated to a bin; if the next FlowFile can't be allocated to that same bin, it is placed in a second bin, and so on. Once every bin has a FlowFile allocated to it and the next FlowFile does not belong in any of those bins, MergeContent force-merges the oldest bin to free up a bin for the new FlowFile. There is no way to change how this works, as the processor is not designed to parse through all queued FlowFiles looking for matches before allocating bins; that would not exhibit very good performance characteristics. What is your concern with increasing the number of bins?
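A minimal Python sketch of the bin-allocation behavior described above may help (this is illustrative pseudologic, not NiFi source code; it assumes a correlation attribute named "id" and 2 entries per bin, like your configuration):

```python
def allocate(flowfiles, max_bins=10, entries_per_bin=2):
    bins = {}                    # correlation id -> FlowFiles, oldest bin first
    merged, forced = [], []
    for ff in flowfiles:
        cid = ff["id"]
        if cid not in bins:
            if len(bins) == max_bins:
                # No free bin for a new correlation id: the oldest bin is
                # forced out before it is complete (your BIN_MANAGER_FULL case).
                oldest = next(iter(bins))
                forced.append(bins.pop(oldest))
            bins[cid] = []
        bins[cid].append(ff)
        if len(bins[cid]) == entries_per_bin:
            merged.append(bins.pop(cid))   # bin reached Max Entries, merge it
    return merged, forced

# Simulate the problem: 11 "fast path" FlowFiles with distinct ids arrive
# before any of their enriched partners do.
fast = [{"id": i, "path": "fast"} for i in range(11)]
merged, forced = allocate(fast)
print(len(merged), len(forced))  # 0 1 -> id 0's bin was forced out incomplete
```

This shows why the error scales with record count: the more split pairs are in flight with their two halves arriving far apart, the more often all 10 bins fill with incomplete pairs.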
This might be a use case for the Wait/Notify processors. After you split the record into two FlowFiles, one FlowFile is currently routed to an InvokeHTTP for further attribute enrichment and the other FlowFile is routed directly to MergeContent? If so, this means that the FlowFiles that don't get additional processing will queue up at MergeContent much sooner. But if you add a Notify processor after the InvokeHTTP processor and a Wait processor in the other FlowFile path before MergeContent, you could control the release of FlowFiles to the MergeContent processor.
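The Wait/Notify pattern can be sketched roughly like this in Python (these are illustrative functions standing in for the NiFi processors, with a dict standing in for the DistributedMapCache that the real processors share):

```python
import threading

signals = {}                     # stand-in for NiFi's DistributedMapCache
cond = threading.Condition()

def notify(release_id):
    """Stand-in for the Notify processor, placed after InvokeHTTP."""
    with cond:
        signals[release_id] = signals.get(release_id, 0) + 1
        cond.notify_all()

def wait_for(release_id, timeout=5.0):
    """Stand-in for the Wait processor, placed before MergeContent
    on the fast path. False means the wait expired."""
    with cond:
        if not signals.get(release_id):
            if not cond.wait(timeout):
                return False     # timed out -> routed to Wait's 'expired'
            if not signals.get(release_id):
                return False
        signals[release_id] -= 1
        return True
```

In the real flow, the Release Signal Identifier on both processors would be your id attribute, so the fast half of each pair is only released toward MergeContent once its enriched partner has arrived, keeping the bins from filling with incomplete pairs.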
This is just one suggestion you could try, but I would start by making sure you are handling the distribution of your split FlowFiles correctly.
Please help our community grow. If you found any of the suggestions/solutions provided helped you with solving your issue or answering your question, please take a moment to login and click "Accept as Solution" on one or more of them that helped.
Thank you,
Matt
Created 05-23-2024 02:26 PM
@Sofia71 Has the reply helped resolve your issue? If so, please mark the appropriate reply as the solution, as it will make it easier for others to find the answer in the future. Thanks.
Regards,
Diana Torres,