Retrieving from multiple Kafka topics through NiFi causes data misplacement/loss

Expert Contributor

Hello, here is our setup.

We have 4 servers, each sending data through PublishKafka_0_10 to its own topic.

We have 1 receiving server that uses ConsumeKafka_0_10 in four flows, one per sending server (and therefore one per topic).

(see screenshots)

We are trying to split the events into separate files by setting the filename to a pattern like 2017_05_09_15_topic.log

We are getting the error shown in the screenshot: the commit in ConsumeKafka fails because the group has already rebalanced, and the message suggests either increasing the session timeout or reducing the maximum size of batches returned in poll() with max.poll.records.

We are under the impression that:

session timeout = the Kafka configuration property offsets.commit.timeout.ms, which we changed from 5000 to 25000

and

max.poll.records = the ConsumeKafka property Max Poll Records, which we changed from 10,000 to 1,000
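The error's advice to reduce max.poll.records comes down to simple arithmetic: the bigger the batch returned by one poll, the longer the consumer can spend working through it before it polls again. A minimal Python sketch under a simplified model, assuming (hypothetically) that the consumer is not heard from while it processes a batch; `batch_fits_session` and the per-record cost are made-up illustrations, not NiFi or Kafka settings:

```python
def batch_fits_session(max_poll_records: int, ms_per_record: float,
                       session_timeout_ms: int) -> bool:
    """Return True if one full batch is processed before the session times out.

    Simplified model: the consumer is assumed to be silent while it works
    through a batch, so the whole batch must finish within the timeout.
    """
    processing_ms = max_poll_records * ms_per_record
    return processing_ms < session_timeout_ms


# Hypothetical numbers: 2 ms of work per record, 10 s session timeout.
print(batch_fits_session(10_000, 2.0, 10_000))  # 20,000 ms of work -> False
print(batch_fits_session(1_000, 2.0, 10_000))   # 2,000 ms of work  -> True
```

Shrinking the batch only helps if per-batch processing is what delays the next poll; if the consumer is not being scheduled at all, batch size makes no difference.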

We have also tried setting Max Uncommitted Time both above and below 3 secs.

What we are seeing is that we sometimes miss data and sometimes get duplicates, and the filenames are odd: most files have the correct format, but some of the data that should be in them ends up in a file named like 2017_05_09__topic.log (with the time component missing).
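The post doesn't show how the filename is built, but one plausible explanation for the `2017_05_09__topic.log` pattern is that the time component is taken from a per-event value that is sometimes empty; in NiFi Expression Language, a reference to a missing attribute evaluates to an empty string. A minimal Python sketch of that hypothesis; `log_filename` and its parameters are hypothetical:

```python
from typing import Optional


def log_filename(date_part: str, hour: Optional[str], topic: str) -> str:
    """Build a name like 2017_05_09_15_topic.log.

    Hypothetical: `hour` comes from a per-event value. When it is missing it
    renders as "", just as an unset attribute would, leaving a double "_".
    """
    return f"{date_part}_{hour or ''}_{topic}.log"


print(log_filename("2017_05_09", "15", "topic"))  # 2017_05_09_15_topic.log
print(log_filename("2017_05_09", None, "topic"))  # 2017_05_09__topic.log
```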

I realize there are probably a lot of issues here. This question is mainly about understanding the error we received, finding out whether the parameters we modified are the ones the error refers to (I suspect they aren't), and possible solutions.

Thank you.

[Four screenshots attached to the original post: the ConsumeKafka flows and the commit error.]

1 ACCEPTED SOLUTION

Master Guru

First, it's important to understand how consuming from Kafka works in NiFi. The ConsumeKafka section of this post is my attempt to explain it: http://bryanbende.com/development/2016/09/15/apache-nifi-and-apache-kafka

  • The Kafka client automatically assigns partitions to consumers, and within a consumer group a partition can only be consumed by 1 consumer.
  • The number of consumers = the number of NiFi nodes x the number of concurrent tasks on ConsumeKafka

So if you have a single NiFi node with 1 concurrent task, then the consumer group has 1 consumer.
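The consumer arithmetic above can be sketched in a few lines of Python. This is a simplified model: partitions are dealt out round-robin, which is not necessarily the assignment strategy the Kafka client uses by default, and `assign_partitions` and the consumer names are hypothetical. It shows why a single-partition topic read with 5 concurrent tasks on one node leaves 4 consumers with nothing to do:

```python
def assign_partitions(num_nodes: int, tasks_per_node: int,
                      num_partitions: int) -> dict[str, list[int]]:
    """Deal partitions round-robin over the consumers of one consumer group.

    Consumers = NiFi nodes x concurrent tasks on ConsumeKafka. Each partition
    goes to exactly one consumer; consumers beyond the partition count get none.
    """
    consumers = [f"node{n}-task{t}"
                 for n in range(num_nodes) for t in range(tasks_per_node)]
    assignment: dict[str, list[int]] = {c: [] for c in consumers}
    for p in range(num_partitions):
        assignment[consumers[p % len(consumers)]].append(p)
    return assignment


# One NiFi node, 5 concurrent tasks, a topic with a single partition.
a = assign_partitions(1, 5, 1)
print(sum(1 for parts in a.values() if not parts))  # 4 idle consumers
```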

The error means that a consumer pulled some data and then tried to commit its offsets to Kafka, but Kafka had not heard from that consumer for longer than the configured session timeout. Kafka therefore rejects the commit, because the partition has already been reassigned to another consumer, or is possibly unassigned if no consumers are alive.
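A toy Python model of the check described above. The timeout involved is the consumer setting `session.timeout.ms`; `offsets.commit.timeout.ms` is a broker setting that bounds how long an offset commit waits for the offsets topic's replicas, so it is not the value the error refers to. ConsumeKafka_0_10 passes dynamic properties through to the Kafka consumer configuration, so `session.timeout.ms` can be added there (the broker only accepts values between its `group.min.session.timeout.ms` and `group.max.session.timeout.ms`). The `GroupCoordinator` class is a hypothetical simplification, not Kafka's implementation:

```python
from dataclasses import dataclass, field


@dataclass
class GroupCoordinator:
    """Toy model of the broker-side check behind the commit-failed error.

    It only tracks when each member was last heard from and evicts members
    that stay silent for longer than session_timeout_ms.
    """
    session_timeout_ms: int
    last_seen: dict[str, int] = field(default_factory=dict)

    def heartbeat(self, member: str, now_ms: int) -> None:
        self.last_seen[member] = now_ms

    def commit(self, member: str, now_ms: int) -> bool:
        last = self.last_seen.get(member)
        if last is None or now_ms - last > self.session_timeout_ms:
            # Member is considered dead and its partitions were rebalanced.
            self.last_seen.pop(member, None)
            return False
        self.last_seen[member] = now_ms  # a successful commit counts as contact
        return True


coord = GroupCoordinator(session_timeout_ms=10_000)
coord.heartbeat("consumer-1", now_ms=0)
print(coord.commit("consumer-1", now_ms=5_000))    # True
print(coord.commit("consumer-1", now_ms=120_000))  # False: silent for 115 s
```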

The most common cause of this in NiFi is back-pressure between ConsumeKafka and the next processor. Back-pressure prevents the preceding processor (here, ConsumeKafka) from being scheduled until the queue drops back below the back-pressure threshold. So if ConsumeKafka isn't allowed to run for a couple of minutes, and then the back-pressure clears and it runs again, you get this error.
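How back-pressure turns into a session timeout can be shown with a small Python model: at each scheduling tick, ConsumeKafka is skipped while its outgoing queue is at or above the threshold, and the longest run of skipped ticks is how long the consumer goes silent. A minimal sketch; `longest_gap_ms`, the threshold, and the queue sizes are hypothetical:

```python
def longest_gap_ms(queue_sizes: list[int], threshold: int, tick_ms: int) -> int:
    """Longest stretch during which the upstream processor is not scheduled.

    Simplified back-pressure model: at each tick, if the outgoing queue holds
    at least `threshold` objects the processor is skipped; otherwise it runs.
    """
    longest = current = 0
    for size in queue_sizes:
        if size >= threshold:
            current += tick_ms
            longest = max(longest, current)
        else:
            current = 0
    return longest


# Hypothetical: the queue stays full for 12 one-second ticks.
sizes = [500] * 3 + [10_000] * 12 + [500] * 3
gap = longest_gap_ms(sizes, threshold=10_000, tick_ms=1_000)
print(gap)           # 12000
print(gap > 10_000)  # True: longer than a 10 s session timeout
```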

This was addressed in a ticket for the upcoming Apache NiFi 1.2.0 release:

https://issues.apache.org/jira/browse/NIFI-3189

Expert Contributor

It's important to note that when we collected from only two hosts, it worked fine.

Expert Contributor

Thanks, Bryan, that article was very helpful.

We resolved this error by increasing Maximum Timer Driven Thread Count and Maximum Event Driven Thread Count in the general settings of the consuming NiFi instance.

We are only testing right now, so each topic has just 1 partition, and each topic is read by a ConsumeKafka with 5 concurrent tasks. According to your article, we should therefore have a surplus of tasks.
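Why raising the thread counts helped can be seen with a rough model: every concurrent task on every processor competes for the same timer-driven thread pool, so 4 ConsumeKafka processors with 5 concurrent tasks each ask for 20 threads before any other processor in the flow is counted. A minimal Python sketch, assuming all tasks want to run at once and each run holds one thread for one round; `scheduling_rounds` is hypothetical. The more rounds it takes, the longer some consumers go without polling, which is the same kind of gap that trips the session timeout:

```python
import math


def scheduling_rounds(concurrent_tasks: list[int], max_threads: int) -> int:
    """Rounds needed to give every requested task a thread at least once.

    Simplified model: all tasks want to run simultaneously and each run
    occupies one thread from the shared pool for one round.
    """
    total = sum(concurrent_tasks)
    return math.ceil(total / max_threads)


# Four ConsumeKafka processors with 5 concurrent tasks each = 20 tasks.
print(scheduling_rounds([5, 5, 5, 5], max_threads=10))  # 2
print(scheduling_rounds([5, 5, 5, 5], max_threads=20))  # 1
```

With only one partition per topic, 4 of the 5 consumers per processor have nothing assigned, yet in this model they still take turns on the shared pool.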

The error itself is resolved, but we now see data loss when retrieving from more than 2 hosts. I guess that is a topic for another question, though.

Explorer

Thank you, @Bryan Bende, for this response.

My ConsumeKafka_0_10 works fine, and I don't get errors like @Eric Lloyd, but I can't see my data in the processor.

Is that what is called back-pressure? Could it be resolved by increasing Maximum Timer Driven Thread Count and Maximum Event Driven Thread Count?

What does that mean? Could you please give some suggestions?

Thank you in advance.