Created on 05-09-2017 03:50 PM - edited 08-17-2019 06:36 PM
Hello, here is our setup.
We have 4 servers that are sending data through PublishKafka_0_10 to a different topic each.
We have 1 receiving server that uses ConsumeKafka_0_10 in FOUR flows, one for each sending server for each topic.
(see screenshots)
We are trying to parse out the events by changing the filename to 2017_05_09_15_topic.log
We are getting the error shown in the screenshot: the commit failed in ConsumeKafka because the group has already rebalanced. The message suggests increasing the session timeout or reducing the maximum size of batches returned with max.poll.records.
We are under the impression that:
session timeout = Kafka configurable property: offsets.commit.timeout.ms which we changed from 5000 -> 25000
and
max.poll.records is the ConsumeKafka property: Max Poll Records which we have changed from 10,000 -> 1,000
We have also tried tuning the Max Uncommitted Time both higher than 3 secs and lower than 3 secs.
What we are seeing is that we are sometimes missing data and sometimes getting duplicate data, and the filenames are odd. Some files have the correct name format, but they are missing data that instead ends up in a file named in the format 2017_05_09__topic.log (which is missing the minutes).
I realize there are probably a lot of issues here. This question is mainly about understanding the error we received, finding out whether the parameters we modified are the ones the error message refers to (which I suspect they aren't), and possible solutions.
Thank you.
Created 05-09-2017 09:16 PM
First, it's important to understand how consuming from Kafka works in NiFi. The ConsumeKafka section of this post was my attempt to explain it: http://bryanbende.com/development/2016/09/15/apache-nifi-and-apache-kafka
So if you have a single NiFi node with 1 concurrent task, then the consumer group has 1 consumer.
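As a rough illustration of that counting (a minimal sketch, not NiFi or Kafka code: `group_size` and `assign_partitions` are hypothetical helpers, and the round-robin assignment is a simplifying assumption), the group size is the number of nodes times the concurrent tasks, and each partition is owned by only one consumer in the group at a time:

```python
from typing import Dict, List


def group_size(nifi_nodes: int, concurrent_tasks: int) -> int:
    """Each concurrent task on each node acts as one consumer in the group."""
    return nifi_nodes * concurrent_tasks


def assign_partitions(num_partitions: int, num_consumers: int) -> Dict[int, List[int]]:
    """Spread partitions over consumers round-robin (simplified assumption)."""
    assignment: Dict[int, List[int]] = {c: [] for c in range(num_consumers)}
    for partition in range(num_partitions):
        # A partition goes to exactly one consumer in the group.
        assignment[partition % num_consumers].append(partition)
    return assignment


if __name__ == "__main__":
    consumers = group_size(nifi_nodes=1, concurrent_tasks=1)
    print(consumers)
    print(assign_partitions(num_partitions=4, num_consumers=consumers))
```

In this model, a topic with 1 partition and 5 consumers leaves four consumers with nothing assigned, so extra concurrent tasks beyond the partition count do not add throughput.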
The error means that a consumer pulled some data and went to commit offsets to Kafka, but Kafka hadn't heard from the consumer for longer than the configured session timeout. Kafka is effectively saying "sorry, you can't do that" because the partition was already reassigned to another consumer, or is possibly unassigned if there are no consumers alive.
The most common cause of this in NiFi was back-pressure between ConsumeKafka and the next processor. Back-pressure causes the preceding processor to not execute until the queue drops back below the back-pressure threshold. So if ConsumeKafka wasn't allowed to run for a couple of minutes, and then the back-pressure cleared and it ran again, you would get this error.
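To make that sequence concrete, here is a toy model of it. It is only a sketch under stated assumptions: `GroupCoordinator` and `run_consumer` are hypothetical names, the 30-second timeout is an illustrative value, and real Kafka group management is considerably more involved.

```python
from typing import Dict


class GroupCoordinator:
    """Toy stand-in for Kafka's group coordinator (assumption, not real Kafka code)."""

    def __init__(self, session_timeout_s: float) -> None:
        self.session_timeout_s = session_timeout_s
        self.last_seen: Dict[str, float] = {}  # consumer id -> time of last contact

    def heartbeat(self, consumer: str, now: float) -> None:
        """Record that the consumer was heard from at time `now`."""
        self.last_seen[consumer] = now

    def is_member(self, consumer: str, now: float) -> bool:
        """A consumer stays in the group only while it is heard from in time."""
        last = self.last_seen.get(consumer)
        return last is not None and (now - last) <= self.session_timeout_s

    def commit(self, consumer: str, now: float) -> bool:
        """Accept the offset commit only if the consumer is still a live member."""
        if not self.is_member(consumer, now):
            # The group has already rebalanced, so the commit is refused.
            self.last_seen.pop(consumer, None)
            return False
        self.last_seen[consumer] = now
        return True


def run_consumer(coordinator: GroupCoordinator, stall_s: float) -> bool:
    """Poll at t=0, get held up by back-pressure for stall_s seconds, then commit."""
    coordinator.heartbeat("consumer-1", now=0.0)
    return coordinator.commit("consumer-1", now=stall_s)


if __name__ == "__main__":
    print(run_consumer(GroupCoordinator(session_timeout_s=30.0), stall_s=5.0))
    print(run_consumer(GroupCoordinator(session_timeout_s=30.0), stall_s=120.0))
```

With a 5-second stall the commit is accepted; with a 120-second stall, which is the "couple of minutes" case, it is refused because the stall outlasted the session timeout.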
This back-pressure issue was addressed in a ticket for the upcoming Apache NiFi 1.2.0 release:
Created 05-09-2017 05:32 PM
It's important to note here that when we only collected from two hosts, it worked just fine.
Created 05-10-2017 02:08 PM
Thanks Bryan, that article was very helpful in understanding this.
We resolved this error by increasing the Maximum Timer Driven Thread Count and Maximum Event Driven Thread Count in the general settings of the NiFi instance that is consuming.
We are currently only testing, so each topic has only 1 partition. For each topic we are using a ConsumeKafka with 5 Concurrent Tasks, so according to your article we should have a surplus of tasks.
The error from this issue has been resolved but now we experience data loss when retrieving from more than 2 hosts. I guess that is the topic for another question though.
Created 12-10-2018 04:40 PM
Thank you @Bryan Bende for this response
My ConsumeKafka_0_10 works fine and I don't have any errors like @Eric Lloyd, but I can't see my data in the processor.
Is that what we call back-pressure? Could this be resolved by increasing the Maximum Timer Driven Thread Count and Maximum Event Driven Thread Count?
What does that mean? Could you please give some suggestions?
Thank you in advance.