Created 06-13-2017 09:03 PM
As part of our application, we consume events from Kafka and do data transformation before sending the events to a downstream system. When we run load as part of performance testing, we noticed we are not getting even distribution across all the nodes through ConsumeKafka. We have 9 nodes in the cluster. I have attached a document with snapshots of the Kafka properties, the count of events on all nodes from Splunk, and the FlowFile status history from ConsumeKafka in NiFi. We are trying to understand the behaviour here, and we have also noticed the below error intermittently:
o.a.n.p.kafka.pubsub.ConsumeKafka org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed due to group rebalance
1) Why is the data not distributed across the nodes?
2) Why do we see the error intermittently?
Created 06-13-2017 09:25 PM
You have 9 NiFi nodes all running a ConsumeKafka processor configured with 3 concurrent tasks. That totals 27 consumers. Does the Kafka topic you are consuming from have 27 partitions? There can only be one consumer per partition on a topic. If you have more consumers than partitions, some of those consumers will never get any data. This likely explains the load distribution you are seeing.
Whenever a new consumer is added or an existing consumer is removed, a rebalance is triggered.
You will achieve your best performance when the number of partitions equals the number of consumers.
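If you want to double-check how many partitions the topic actually has from the client side, here is a minimal sketch using the Kafka Java client. The broker address and topic name below are placeholders, so substitute your own values:

```java
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class PartitionCheck {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Placeholder broker address - replace with your own cluster.
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.deserializer", ByteArrayDeserializer.class.getName());
        props.put("value.deserializer", ByteArrayDeserializer.class.getName());

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            // partitionsFor() returns metadata for every partition of the topic.
            List<PartitionInfo> partitions = consumer.partitionsFor("my-topic");
            System.out.println("Partition count: " + partitions.size());
            // Compare this number against (NiFi nodes) x (concurrent tasks).
        }
    }
}
```

Any consumers in the same consumer group beyond the partition count will sit idle.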
Thanks,
Matt
Created 06-14-2017 05:09 PM
Thanks much Matt for the quick response. We do not have 27 partitions, so what you have mentioned above makes sense. We are now looking at modifying the configuration to see how things behave. I will update this thread if we still see the issue after making the changes based on your response.
Created 06-14-2017 09:48 PM
Follow-up question: we have 10 partitions in Kafka and 9 NiFi nodes, and we are deciding on the best way to configure this. It would help us to know how NiFi scales. Does it try to distribute consumers across all the nodes, or does it spawn additional consumers on the same node because resources are available there? From what we observed, it looks like the latter, but we wanted to confirm whether that is the intended behaviour.
Would you suggest we configure one concurrent task per node and leave it at that?
Created on 06-15-2017 01:07 PM - edited 08-17-2019 09:23 PM
Nodes in a NiFi cluster have no idea about the existence of other nodes in the cluster. Nodes simply send health and status heartbeat messages to the currently elected cluster coordinator. As such, each node runs its own copy of the flow.xml.gz file and works on its own set of FlowFiles.
So if you have 9 NiFi nodes, each node will be running its own copy of the ConsumeKafka processor. With 1 concurrent task set on the processor, each node will establish one consumer connection to the Kafka topic, so you would have 9 consumers for 10 partitions. In order to consume from all partitions you will need to configure 2 concurrent tasks. This will give you 18 consumers for 10 partitions. Kafka will assign the partitions to consumers within this pool of 18, so ideally you would see one active consumer on 8 of your nodes and two on the remaining node. The data coming into your NiFi cluster will not be evenly balanced because of the imbalance between the number of consumers and the number of partitions.
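To make the assignment behaviour concrete, here is a rough sketch of what each of those 18 consumers is effectively doing (ConsumeKafka handles this internally; the broker address, topic, and group id below are placeholders). Consumers that do not get a partition simply receive an empty assignment and sit idle:

```java
import java.util.Collection;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class AssignmentDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Placeholder broker and group id - substitute your own values.
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "my-group");
        props.put("key.deserializer", ByteArrayDeserializer.class.getName());
        props.put("value.deserializer", ByteArrayDeserializer.class.getName());

        KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("my-topic"), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                System.out.println("Revoked: " + partitions);
            }
            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                // With more consumers than partitions, this collection is empty
                // for the surplus consumers - they never receive data.
                System.out.println("Assigned: " + partitions);
            }
        });
        consumer.poll(1000); // join the group and trigger the first assignment
        consumer.close();
    }
}
```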
As far as your Kafka broker rebalance goes.... Kafka will trigger a rebalance whenever a consumer disconnects or a new consumer connects. Things that can cause a consumer to disconnect include:
1. Shutting down one or more of your NiFi nodes.
2. Connection timeout between a consumer and a Kafka broker.
- Triggered by network issues between a NiFi node and Kafka broker
- Triggered by setting the ConsumeKafka run schedule longer than the configured timeout, for example a 60 second run schedule with a 30 second timeout.
- Triggered by backpressure being applied on the connection leading off the ConsumeKafka, causing ConsumeKafka to not run until the backpressure is gone. *** This trigger was fixed in NiFi 1.2, but I don't know what version you are running. See the sketch after this list for how these timeouts relate to the commit error you are seeing.
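To show how the timeouts and the commit error fit together, here is a hedged sketch using the plain Kafka Java client; the broker, topic, group id, and timeout values are illustrative only, and ConsumeKafka exposes equivalent settings through its processor and Kafka configuration properties:

```java
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class CommitTimeoutDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Placeholder values - tune these to match your environment.
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "my-group");
        props.put("key.deserializer", ByteArrayDeserializer.class.getName());
        props.put("value.deserializer", ByteArrayDeserializer.class.getName());
        // If the consumer stops heartbeating or polling for longer than these
        // timeouts allow (details vary by client version), the broker drops it
        // from the group and rebalances; the next commit then fails.
        props.put("session.timeout.ms", "30000");
        props.put("heartbeat.interval.ms", "10000");

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("my-topic"));
            consumer.poll(1000); // fetch a batch; processing omitted for brevity
            try {
                consumer.commitSync();
            } catch (CommitFailedException e) {
                // This is the "Commit cannot be completed due to group rebalance"
                // error from the original post: the group rebalanced before the
                // offsets could be committed.
                System.err.println("Commit failed after rebalance: " + e.getMessage());
            }
        }
    }
}
```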
If you feel I have addressed your original question, please mark this answer as accepted to close out this thread.
Thank you,
Matt