08-07-2019
01:41 PM
I would like to achieve exactly-once semantics using the "Use Transactions" property of the PublishKafkaRecord processor (together with Guarantee Replicated Delivery). Everything works fine when "Use Transactions" is set to false, but after switching it to true I get the following error messages and can no longer publish to Kafka:
2019-08-07 11:00:09,099 WARN [Timer-Driven Process Thread-23] o.a.n.p.k.pubsub.PublishKafkaRecord_1_0 PublishKafkaRecord_1_0[id=e6fb1268-3f76-3db3-a7db-575fb72bc4c5] Processor Administratively Yielded for 1 sec due to processing failure
2019-08-07 11:00:09,099 WARN [Timer-Driven Process Thread-23] o.a.n.c.t.ContinuallyRunProcessorTask Administratively Yielding PublishKafkaRecord_1_0[id=e6fb1268-3f76-3db3-a7db-575fb72bc4c5] due to uncaught Exception: org.apache.kafka.common.KafkaException: TransactionalId 5fdee084-1995-46ac-847c-bebfcc30a5f5: Invalid transition attempted from state FATAL_ERROR to state INITIALIZING
2019-08-07 11:00:09,099 WARN [Timer-Driven Process Thread-23] o.a.n.c.t.ContinuallyRunProcessorTask
org.apache.kafka.common.KafkaException: TransactionalId 5fdee084-1995-46ac-847c-bebfcc30a5f5: Invalid transition attempted from state FATAL_ERROR to state INITIALIZING
at org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:755)
at org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:749)
at org.apache.kafka.clients.producer.internals.TransactionManager.initializeTransactions(TransactionManager.java:203)
at org.apache.kafka.clients.producer.KafkaProducer.initTransactions(KafkaProducer.java:544)
at org.apache.nifi.processors.kafka.pubsub.PublisherLease.beginTransaction(PublisherLease.java:92)
at org.apache.nifi.processors.kafka.pubsub.PublishKafkaRecord_1_0.onTrigger(PublishKafkaRecord_1_0.java:378)
at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1122)
at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:147)
at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47)
at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:128)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
I can't figure out what's going wrong. I'm using PublishKafkaRecord_1_0 1.5.0.3.1.1.0-35 and Kafka 1.0 (which supports transactions). Any help would be much appreciated. Thanks
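To make the error message concrete, here is a minimal sketch of the kind of state check the trace points at. It is a simplified, hypothetical model and not Kafka's actual TransactionManager code: the class name TxStateSketch, the reduced set of states, and the transition rules are assumptions made for illustration. The point it shows is that once a transactional producer has entered FATAL_ERROR, a further initTransactions-style call (the move to INITIALIZING seen in the trace) is rejected, so the original failure must have happened earlier than the exception logged above.

```java
public class TxStateSketch {
    // Reduced, illustrative set of states (assumption: the real client has more).
    enum State { UNINITIALIZED, INITIALIZING, READY, IN_TRANSACTION, FATAL_ERROR }

    // Returns true if moving from 'from' to 'to' is allowed in this toy model.
    static boolean isTransitionValid(State from, State to) {
        switch (to) {
            case INITIALIZING:
                return from == State.UNINITIALIZED;   // initialization happens only once
            case READY:
                return from == State.INITIALIZING || from == State.IN_TRANSACTION;
            case IN_TRANSACTION:
                return from == State.READY;
            case FATAL_ERROR:
                return true;                          // a fatal error can occur in any state
            default:
                return false;
        }
    }

    private State current = State.UNINITIALIZED;

    // Applies the transition or throws, using wording similar to the logged message.
    void transitionTo(State target) {
        if (!isTransitionValid(current, target)) {
            throw new IllegalStateException(
                "Invalid transition attempted from state " + current + " to state " + target);
        }
        current = target;
    }

    State current() {
        return current;
    }

    public static void main(String[] args) {
        TxStateSketch tm = new TxStateSketch();
        tm.transitionTo(State.INITIALIZING);  // first initialization attempt
        tm.transitionTo(State.FATAL_ERROR);   // something fails fatally during it
        try {
            tm.transitionTo(State.INITIALIZING); // a retry on the same instance is rejected
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

If this model matches what is happening, the exception in the log is a follow-on symptom of reusing a producer that already failed, and the first fatal error (logged earlier, possibly at a different level) would be the one worth finding.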
Labels: Apache Kafka, Apache NiFi