I would like to achieve 'exactly-once' semantics, i.e. no duplicate messages when writing records into a Kafka cluster.
As far as I know, Kafka has supported an 'idempotent' producer to help achieve 'exactly-once' since version 0.11.
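For reference, idempotence and transactions map to a few client-side settings in kafka-clients 0.11+. This is a minimal sketch of the raw producer configuration, independent of NiFi (the broker address is the one from this thread; the transactional.id value is a made-up placeholder):

```java
import java.util.Properties;

public class ExactlyOnceProducerConfig {
    // Sketch of the producer settings exactly-once writes rely on.
    public static Properties build() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:49092"); // broker from this thread
        props.put("enable.idempotence", "true");  // broker de-duplicates producer retries per partition
        props.put("acks", "all");                 // required when idempotence is enabled
        // Setting a transactional.id additionally enables transactions
        // (atomic writes across partitions); the value here is a placeholder.
        props.put("transactional.id", "my-producer-tx-id");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(ExactlyOnceProducerConfig.build());
    }
}
```

Note that idempotence alone only de-duplicates retries within a single producer session; surviving producer restarts is what the transactional.id is for.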
I have tried enabling the 'Use Transactions' option in the PublishKafka processor, but message duplication still happens.
So, I would like to know
1. Does PublishKafka_0_11 / PublishKafka_1_0 support exactly-once semantics?
2. Besides de-duplication on the consumer side, is there any way to remove duplicate messages?
Both the 0.11 and 1.0 processors support transactions, which is how you would get exactly-once semantics.
How are you determining that there are duplicates?
Are the duplicates coming from the producer producing the same message twice, or from the consumer consuming the same message twice?
Is there a specific error that occurs that causes duplicates? Even without transactions you should only see duplicates if there was an error that caused a producer to retry or a consumer to reconsume the same message.
My flow is simple: ListFile > FetchFile > PublishKafka_0_11 (with Use Transactions=true, Message Demarcator=\n)
To test whether there are duplicates, I:
1. Check offset of the topic
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:49092 --topic MY_TOPIC --time -1
2. Run kafka-console-consumer.sh with param "--from-beginning" and compare the result with the source file.
My source file contains 9.26 million rows. Using method 1, I got offset=18460643, far more than the row count. Using method 2, I consumed ~18 million messages.
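One rough way to quantify the duplicates from method 2 is to compare the total message count against the distinct count once the consumed messages are dumped to a file. A minimal sketch, assuming unique source rows and that the messages fit in memory:

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;

public class DuplicateCheck {
    // Number of distinct messages in the consumed dump.
    public static long distinctCount(List<String> messages) {
        return new HashSet<>(messages).size();
    }

    public static void main(String[] args) {
        // Placeholder data; in practice, read the lines dumped by kafka-console-consumer.
        List<String> consumed = Arrays.asList("row1", "row2", "row1", "row3", "row2");
        System.out.println("total=" + consumed.size()
                + " distinct=" + DuplicateCheck.distinctCount(consumed));
    }
}
```

If total is roughly double distinct, every row was published about twice rather than a few rows being repeated many times, which helps narrow down the cause.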
p.s. The topic is newly created when running the flow.
No error occurred in the flow where I found the duplicates.
But in general, I occasionally encounter a ProducerFencedException from Kafka when using the PublishKafka processor.
FYI, on the consumer side, if I use the ConsumeKafka processor with 'Honor Transactions=true', I cannot get any flowfiles. Only with 'Honor Transactions=false' do I get those ~18 million duplicated messages.
@Michael LY Thanks for all the info.
Do you happen to have the stacktrace from nifi-app.log from a time when you saw ProducerFencedException? It would be helpful to dig into that and see why that is occurring.
In your test scenario, do you have a single stand-alone NiFi node, or is NiFi clustered? If it is clustered then how many nodes?
The reason I am asking is because the fact that you have 9 million rows in your input file and then 18 million in the topics seems meaningful in that it is twice the amount. I was wondering if the file is somehow being published twice, like two nodes of the cluster both publish the file to the same topic.
The above case was run on a standalone NiFi node.
I can confirm that the file is not simply being published twice by a failure retry, as I set the failure relationship to be auto-terminated.
Also, the fact that ConsumeKafka can only see the messages when "Honor Transactions = false" means that the transaction that was used to publish the messages was never committed (or maybe there wasn't a transaction).
Basically, setting Honor Transactions to true means the consumer will only see messages for committed transactions.
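Honor Transactions=true corresponds to the Kafka consumer's isolation.level=read_committed setting, which is why uncommitted messages are invisible there. A minimal sketch of that consumer configuration (broker address is the one from this thread; the group id is a placeholder):

```java
import java.util.Properties;

public class ReadCommittedConsumerConfig {
    // Sketch of a consumer that only sees messages from committed transactions.
    public static Properties build() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:49092"); // broker from this thread
        props.put("group.id", "dup-check");                // placeholder group id
        props.put("isolation.level", "read_committed");    // hide aborted/open transactions
        props.put("auto.offset.reset", "earliest");        // like --from-beginning
        return props;
    }

    public static void main(String[] args) {
        System.out.println(ReadCommittedConsumerConfig.build());
    }
}
```

The same effect can be tested from the CLI by passing isolation.level=read_committed as a consumer property to kafka-console-consumer.sh.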
I just re-ran the above case. This time, for the topic with duplicates, I found this exception in the Kafka logs:
ERROR [Replica Manager on Broker 0]: Error processing append operation on partition MY_TOPIC-0 (kafka.server.ReplicaManager) org.apache.kafka.common.errors.ProducerFencedException: Producer's epoch is no longer valid. There is probably another producer with a newer epoch. 0 (request epoch), 1 (server epoch)
Did I configure anything wrong on my Kafka server?