Support Questions

Find answers, ask questions, and share your expertise

Nifi Transaction with Kafka (exactly-once)

avatar
New Contributor

I would like to achieve exactly-one semantics with "use Transactions" property from PublishKafkaRecord processor (and Guarantee Replicated Delivery).

Everything's working fine if "use Transactions" is set to false but after switching to true got following error messages and can't publish to kafka anymore.

2019-08-07 11:00:09,099 WARN [Timer-Driven Process Thread-23] o.a.n.p.k.pubsub.PublishKafkaRecord_1_0 PublishKafkaRecord_1_0[id=e6fb1268-3f76-3db3-a7db-575fb72bc4c5] Processor Administratively Yielded for 1 sec due to processing failure
2019-08-07 11:00:09,099 WARN [Timer-Driven Process Thread-23] o.a.n.c.t.ContinuallyRunProcessorTask Administratively Yielding PublishKafkaRecord_1_0[id=e6fb1268-3f76-3db3-a7db-575fb72bc4c5] due to uncaught Exception: org.apache.kafka.common.KafkaException: TransactionalId 5fdee084-1995-46ac-847c-bebfcc30a5f5: Invalid transition attempted from state FATAL_ERROR to state INITIALIZING
2019-08-07 11:00:09,099 WARN [Timer-Driven Process Thread-23] o.a.n.c.t.ContinuallyRunProcessorTask 
org.apache.kafka.common.KafkaException: TransactionalId 5fdee084-1995-46ac-847c-bebfcc30a5f5: Invalid transition attempted from state FATAL_ERROR to state INITIALIZING
    at org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:755)
    at org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:749)
    at org.apache.kafka.clients.producer.internals.TransactionManager.initializeTransactions(TransactionManager.java:203)
    at org.apache.kafka.clients.producer.KafkaProducer.initTransactions(KafkaProducer.java:544)
    at org.apache.nifi.processors.kafka.pubsub.PublisherLease.beginTransaction(PublisherLease.java:92)
    at org.apache.nifi.processors.kafka.pubsub.PublishKafkaRecord_1_0.onTrigger(PublishKafkaRecord_1_0.java:378)
    at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
    at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1122)
    at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:147)
    at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47)
    at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:128)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Can't figure out what's going wrong.

I'm using PublishKafkaRecord_1_0 1.5.0.3.1.1.0-35 and Kafka 1.0 (which supports transactions).

Any help would be much appreciated. Thanks

2 REPLIES 2

avatar
Rising Star

@nicolasgernigon 

I'm kind of curious about this issue. Did you manage to figure this one out?

I tried to look through the Kafka producer code a little bit to get a better understanding of what might have happened but it needs a bit more digging. So far what I have found is that a transaction by the Kafka producer will maintain different states throughout its lifetime, where it would start with an UNINITIALIZED state. However, a starting state can never be FATAL_ERROR and so the attempt by the Kafka producer to try and initialize this transaction and change its state from FATAL_ERROR to INITIALIZING is an invalid one.  This all seems to imply from my understanding that there was another issue that happened beforehand causing this particular transaction to fail in the first place. 

I don't see why the Kafka producer would stop sending any messages all together after this failure, that seems to be an issue in possibly the Kafka client.

Could you try enabling the 'use Transactions' property for your PublishKafkaRecord processor but at the same time also be sure to change the topic name to a new one?  Be sure that this topic is a newly created one that hasn't been written to previously.

Please do let me know if you experience the same issue or not once you make that change.

avatar
Explorer

I'm seeing the same issue.

I can see "Transition from state INITIALIZING to error state FATAL_ERROR" once I set "Use Transactions"="true" and "Delivery Guarantee"="Guarantee Replicated Delivery".