I'm running Apache Kafka and Flume-1.8 on two different RHEL servers. Kafka topic has no partitioning, one topic, for which multiple messages have been produced from the kafka console. Flume agent configured to read messages from kafka topic (kafka source) and save them in hdfs (hdfs sink). Agent's channel is memory. When I produce messages on kafka console they go through the flume into hdfs successfully. That works well. But when I'd like to re-consume All messages from kafka broker - I stop Flume, re-configure the flume agent by adding the "*.auto.offset.reset = earliest" (as flume-1.8 doc advises) to its kafka source and start flume again then the messages are Not re-consuming from kafka topic at all. Am I missing something ? May be something else need to be done on the flume or/and kafka side ? Thanks. Vadim Dzyubam
The property kafka.consumer.auto.offset.reset comes into picture when there is no initial offset in Kafka or if the current offset does not exist any more on the server. As per the flume docs:
One workaround that I can think of is changing the kafka.consumer.group.id and restart the agent. Kindly let me know if that helps.
Thank you very much dbains for the answer and you are right if I change the value for the kafka.consumer.group.id and restart the flume agent it will allow to re-consume/re-read all messages from kafka topic.
(1)But it is inconvenient and might require the manual intervention which might brake the automation process. Let's assume the real time processing in a system with the events going through the Kafka => Flume => HDFS for archiving/audit purposes. And the system should prevent data loss and provide the reconciliation and recovery process. Suppose the scheduled and running periodically script identified the data loss in the system in chain Flume-HDFS. In this case Flume agent need to be stopped, the group.id in config file need to be changed and the flume agent need to be started again to re-consume messages with new group.id. It seems should be something more easier and effective in flume. Or may be it is but I'm not aware of it. If so, please let me know.
(2)Can you please let me know in which section of the Flume User Guide 1.8 there is the statement about "The property kafka.consumer.auto.offset.reset comes into picture when there is no initial offset in Kafka or if the current offset does not exist any more on the server" ?
Hi @Vadim Dzyuban,
I'm facing the same issue - What is finally your recommendation/best practice in order to re-consume the whole kafka queue in a specific topic ?
Should I change the kafka.consumer.group.id , then restart the flume agent with this specific setting:
*.auto.offset.reset = earliest
In advance, thank you.