Created 02-21-2017 12:30 PM
Hello,
We are planning to use NiFi to consume data from Kafka, transform it and store it in HDFS. We are using Kafka 0.10.0, hence the ConsumeKafka processor. It works perfectly fine when all the properties are set correctly. However, we have a few questions, and it would be great if someone could help answer them.
1. How does NiFi handle the Kafka offset? Does it maintain anything itself, or does it depend on the default topic "__consumer_offsets"?
2. We set "Offset reset" to "earliest", i.e. start from the beginning when there is no initial offset in Kafka or the offset is no longer valid. Apart from "earliest/latest/none", is there any other mechanism for handling offsets?
3. Can we parallelize NiFi execution? Can it run on multiple servers in a distributed fashion, or do we need to install it on every server we want it to run on? If it runs on only a single machine, how do we scale it? Where does NiFi execute its back-end jobs: in a local JVM, a Jetty server process, or something else?
Thanks in advance! Anshu
Created 02-21-2017 01:20 PM
Hi @Anshuman Ghosh,
It does not answer all of your questions, but you may want to have a look at:
http://bryanbende.com/development/2016/09/15/apache-nifi-and-apache-kafka
The offset is handled by Kafka itself: when NiFi consumes data from Kafka, the offset is committed to Kafka (in its internal "__consumer_offsets" topic), and NiFi does not store it. That's why there is the "Offset reset" property, for the case where there is no offset on Kafka's side. For that case you only have the values offered by the processor. The property corresponds to Kafka's 'auto.offset.reset' consumer property; you may want to look at Kafka's documentation for more details.
https://kafka.apache.org/documentation/#newconsumerconfigs
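To make concrete what "Offset reset" decides, here is a simplified Python model of the consumer-side logic behind auto.offset.reset. It is only a sketch, not Kafka's or NiFi's code: the function and exception names are hypothetical, and it folds Kafka's two separate error cases (no committed offset at all vs. a committed offset that is out of range, e.g. because retention already deleted that data) into one exception.

```python
from typing import Optional


class NoOffsetForPartitionError(Exception):
    """Hypothetical error: policy is 'none' and no usable committed offset exists."""


def resolve_start_offset(committed: Optional[int], log_start: int, log_end: int,
                         policy: str) -> int:
    """Simplified model of how a Kafka consumer picks its starting position.

    committed: offset committed for this group/partition, or None if there is none.
    log_start: earliest offset still retained in the partition.
    log_end: offset the next written record will get.
    policy: value of auto.offset.reset ("earliest", "latest" or "none").
    """
    # A committed offset that still points inside the retained log is resumed from.
    if committed is not None and log_start <= committed <= log_end:
        return committed
    # No commit, or a commit that is no longer valid: the reset policy decides.
    if policy == "earliest":
        return log_start
    if policy == "latest":
        return log_end
    if policy == "none":
        raise NoOffsetForPartitionError("no valid committed offset and policy is 'none'")
    raise ValueError(f"unknown auto.offset.reset value: {policy!r}")
```

For example, with a partition retaining offsets 100 to 250, a group without a commit starts at 100 under "earliest" and at 250 under "latest", while a group that committed 180 resumes at 180 regardless of the policy.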
Regarding scaling, the link I provided above should give you a good idea. In short, NiFi scales very well with Kafka: you can increase the number of threads (concurrent tasks) consuming data from Kafka within the JVM (Jetty is not involved at all), and you can also run NiFi in cluster mode so that multiple NiFi nodes consume data (each node can also run multiple threads).
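One way to picture this scaling: all consuming threads on all nodes join the same consumer group, and Kafka spreads the topic's partitions across them. Below is a simplified Python sketch modelled on Kafka's range assignment strategy for a single topic. The assumption that each ConsumeKafka concurrent task acts as one consumer in the group, and the consumer naming, are hypothetical and only for illustration.

```python
from typing import Dict, List


def range_assign(num_partitions: int, consumers: List[str]) -> Dict[str, List[int]]:
    """Simplified single-topic version of Kafka's range assignment strategy."""
    if not consumers:
        raise ValueError("at least one consumer is required")
    members = sorted(consumers)  # members are ordered before partitions are handed out
    per_consumer, extra = divmod(num_partitions, len(members))
    assignment: Dict[str, List[int]] = {}
    start = 0
    for i, member in enumerate(members):
        # The first `extra` members get one additional partition each.
        count = per_consumer + (1 if i < extra else 0)
        assignment[member] = list(range(start, start + count))
        start += count
    return assignment


def nifi_consumers(nodes: int, concurrent_tasks: int) -> List[str]:
    """Hypothetical naming: one Kafka consumer per (NiFi node, concurrent task)."""
    return [f"node{n}-task{t}"
            for n in range(1, nodes + 1)
            for t in range(1, concurrent_tasks + 1)]
```

With 3 nodes × 2 concurrent tasks (6 consumers) and 8 partitions, the first two consumers get two partitions each and the other four get one. With only 4 partitions, two of the six consumers would receive nothing and sit idle, so adding threads or nodes beyond the partition count does not increase consumption parallelism.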
Hope this helps.
Created 02-21-2017 04:11 PM
Thank you @Pierre Villard, I will surely have a look.
Created 09-21-2020 02:11 AM
Hi,
How can I reset the Kafka offset from within NiFi, in case I want to reprocess all the messages in NiFi for the same consumer group?
Thanks
Deepak
Created 09-21-2020 05:53 AM
It is always best to start a new thread/question rather than add a new question to an existing thread that already has an accepted answer.
As far as resetting the offset for a specific consumer group from within NiFi itself, this is not something that can be done via the ConsumeKafka processors. The offset is not stored by NiFi; offsets for each consumer group are stored in Kafka. Even if it were possible, it would not make much sense to build such an option into a NiFi processor: the reset would happen every time the processor executes, which is probably not the desired outcome. There are numerous threads online about resetting offsets in Kafka that you may want to explore. Here are a couple:
https://gist.github.com/marwei/cd40657c481f94ebe273ecc16601674b
https://gist.github.com/mduhan/0e0a4b08694f50d8a646d2adf02542fc
If you can figure out how to accomplish the reset via a custom script or external command, NiFi does offer several script-execution and command-line-execution processors. You may be able to use these processors to execute your script to reset the offset in Kafka.
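As a sketch of such an external command, the Python helper below wraps Kafka's own kafka-consumer-groups.sh tool, whose --reset-offsets option exists in Kafka 0.11.0 and later (not in the 0.10.0 release from the original question). The function names, broker address, group and topic are hypothetical, and the script assumes the tool is on the PATH of the machine running it. Kafka only allows the reset while the group has no active members, so the ConsumeKafka processor using that group must be stopped first.

```python
import shlex
import subprocess
from typing import List


def build_reset_command(bootstrap: str, group: str, topic: str,
                        execute: bool = False) -> List[str]:
    """Build argv for resetting `group` on `topic` to the earliest offsets."""
    return [
        "kafka-consumer-groups.sh",
        "--bootstrap-server", bootstrap,
        "--group", group,
        "--topic", topic,
        "--reset-offsets", "--to-earliest",
        # --dry-run only prints the planned offsets; --execute commits them.
        "--execute" if execute else "--dry-run",
    ]


def reset_group_to_earliest(bootstrap: str, group: str, topic: str,
                            execute: bool = False) -> str:
    """Run the tool and return its output (raises CalledProcessError on failure)."""
    cmd = build_reset_command(bootstrap, group, topic, execute)
    result = subprocess.run(cmd, capture_output=True, text=True, check=True)
    return result.stdout


if __name__ == "__main__":
    # Print the dry-run command line without executing anything.
    print(shlex.join(build_reset_command("broker1:9092", "nifi-consumers", "events")))
```

Running it first in dry-run mode lets you check the planned offsets before committing them with execute=True; a processor such as ExecuteStreamCommand or ExecuteProcess could then invoke the script.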
Aside from the above, you can change the "Group ID" (i.e. use a new consumer group) and set "Offset Reset" to "earliest". Then restart the processor to consume the topic from the beginning again as a different consumer group.
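Why a new group ID works: committed offsets are tracked per (group, topic, partition), so a group that has never committed has nothing to resume from and falls back to the offset reset policy. The simplified Python model below illustrates this; the in-memory dictionary is a hypothetical stand-in for what Kafka keeps in "__consumer_offsets", and it skips the out-of-range check a real consumer also performs.

```python
from typing import Dict, Tuple

# (group.id, topic, partition) -> committed offset; hypothetical stand-in for Kafka's store.
OffsetKey = Tuple[str, str, int]


def starting_offset(committed: Dict[OffsetKey, int], group: str, topic: str,
                    partition: int, log_start: int, log_end: int,
                    offset_reset: str) -> int:
    key = (group, topic, partition)
    if key in committed:
        # Existing group: resume where it left off; the reset policy is ignored.
        return committed[key]
    # Group has never committed for this partition: the reset policy applies.
    if offset_reset == "earliest":
        return log_start
    if offset_reset == "latest":
        return log_end
    raise LookupError(f"no committed offset for {key} and offset reset is {offset_reset!r}")
```

With committed = {("nifi-group", "events", 0): 5000} and a partition holding offsets 0 to 6000, the old group "nifi-group" resumes at 5000 even with "earliest", while a new group "nifi-group-replay" with "earliest" starts at 0 and re-reads the whole topic.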
Hope this helps,
Matt