Support Questions

Find answers, ask questions, and share your expertise
Announcements
Celebrating as our community reaches 100,000 members! Thank you!

Offset handling in Apache NiFi consumeKafka

avatar

Hello,

We are planning to use NiFi to consume data from Kafka, transform and store the same onto HDFS. We are using Kafka 0.10.0 version hence using consumeKafka. It works perfectly fine when all the properties are set correctly. However we have few questions and it will be great if someone can help with the answers.

1. How NiFi handles the Kafka offset, does it maintain anything by itself or depends upon default topic "__consumer_offsets"?

2. As we set "Offset reset" to "earliest" i.e. to start from the beginning in case there is no initial offset to Kafka or the offset is no longer valid. Apart from the "earliest/ latest/ none" is there any other mechanism to handle the offsets?

3. Can we parallelize NiFi execution? Can it run on multiple servers on a distributed fashion or we need to install it in every single server we want it to run? If it is running only on a single machine how to scale it? Where does NiFi executes its back-end jobs, is it on a local JVM or Jetty Server process or something else?

Thanking in advance! Anshu

1 ACCEPTED SOLUTION

avatar

Hi @Anshuman Ghosh,

It does not answer to all of your questions but you may want to have a look at:

http://bryanbende.com/development/2016/09/15/apache-nifi-and-apache-kafka

The offset is handled by Kafka itself, in other words when NiFi consumes data from Kafka the offset is committed to Kafka and NiFi does not store it. That's why there is this property "Offset reset" in case there is no offset on Kafka's side. In this case you only have the values proposed by the processor. It corresponds to Kafka's 'auto.offset.reset' property. You may want to look at Kafka's documentation for more details.

https://kafka.apache.org/documentation/#newconsumerconfigs

Regarding scaling, the link I provided before should give you a good idea. But in short... NiFi does scale very well with Kafka, you can increase the number of threads running in the JVM (Jetty is not involved at all) to consume data from Kafka, but you can also install NiFi in cluster mode to have multiple nodes of NiFi consuming data (and even multiple threads for each one of the nodes of your cluster).

Hope this helps.

View solution in original post

4 REPLIES 4

avatar

Hi @Anshuman Ghosh,

It does not answer to all of your questions but you may want to have a look at:

http://bryanbende.com/development/2016/09/15/apache-nifi-and-apache-kafka

The offset is handled by Kafka itself, in other words when NiFi consumes data from Kafka the offset is committed to Kafka and NiFi does not store it. That's why there is this property "Offset reset" in case there is no offset on Kafka's side. In this case you only have the values proposed by the processor. It corresponds to Kafka's 'auto.offset.reset' property. You may want to look at Kafka's documentation for more details.

https://kafka.apache.org/documentation/#newconsumerconfigs

Regarding scaling, the link I provided before should give you a good idea. But in short... NiFi does scale very well with Kafka, you can increase the number of threads running in the JVM (Jetty is not involved at all) to consume data from Kafka, but you can also install NiFi in cluster mode to have multiple nodes of NiFi consuming data (and even multiple threads for each one of the nodes of your cluster).

Hope this helps.

avatar

Thank you @Pierre Villard I will surely have a look.

avatar
New Contributor

Hi,

 

How can i reset Kafka Offset from within NiFi, incase i want to reprocess all the messages within nifi for same consumer group?

 

Thanks

Deepak

avatar
Super Mentor

@dvmishra 

 

It always best to start a new thread/question rather than adding a new question to an existing thread that already has an excepted answer.

As far as being able to reset the offset for a specific consumer group from within NiFi itself, this is not something that can be done via the ConsumeKafka processors.  The offset is not stored by NiFi.  Offsets for each consumer group are stored in Kafka. Would not make make much sense to build such an option in to a NiFi processor if it was possible. Every time the processor executes it would reset in that case which is probably not the desired outcome. There are numerous threads online around reseting the offset in Kafka you may want to explore. Here are a couple:
https://gist.github.com/marwei/cd40657c481f94ebe273ecc16601674b
https://gist.github.com/mduhan/0e0a4b08694f50d8a646d2adf02542fc

If you can figure out how to accomplish reset via a custom script of external command, NiFi does offer several script execution and command line execution processors.  You may be able to use these processors to execute your script to rest the offset in Kafka.

Aside from above, you can change the "group id" (new consumer group) and change the "offset reset" to "earliest".  Then restart processor to start consuming topic form beginning again as a different consumer group.

Hope this helps,
Matt