Created on 08-21-2018 08:43 PM - edited 08-17-2019 05:50 PM
We have two clusters that were built for a proof of concept: one with HDP 2.6.2.0, the other with HDF 3.1.2.0. Both were kerberized using an MIT KDC in the same realm. The kerberization was done entirely through the Ambari wizard without issues.
After this, the PublishKafka_1_0 and ConsumeKafka_1_0 processors started throwing errors. The PublishKafka_1_0 processor is configured as shown below.
The HDFS and Hive processors worked fine after similar setups; only Kafka is not working.
Here’s the nifi-app.log snippet with the error:
2018-08-21 17:13:05,009 WARN [kafka-producer-network-thread | producer-3] org.apache.kafka.clients.NetworkClient [Producer clientId=producer-3] Connection to node -1 could not be established. Broker may not be available.
2018-08-21 17:13:05,862 WARN [kafka-producer-network-thread | producer-3] org.apache.kafka.clients.NetworkClient [Producer clientId=producer-3] Connection to node -1 could not be established. Broker may not be available.
2018-08-21 17:13:06,933 WARN [kafka-producer-network-thread | producer-3] org.apache.kafka.clients.NetworkClient [Producer clientId=producer-3] Connection to node -1 could not be established. Broker may not be available.
2018-08-21 17:13:07,760 WARN [kafka-producer-network-thread | producer-3] org.apache.kafka.clients.NetworkClient [Producer clientId=producer-3] Connection to node -1 could not be established. Broker may not be available.
2018-08-21 17:13:08,713 WARN [kafka-producer-network-thread | producer-3] org.apache.kafka.clients.NetworkClient [Producer clientId=producer-3] Connection to node -1 could not be established. Broker may not be available.
2018-08-21 17:13:09,406 ERROR [Timer-Driven Process Thread-2] o.a.n.p.kafka.pubsub.PublishKafka_1_0 PublishKafka_1_0[id=58dabab0-0165-1000-0000-00005b66debd] Failed to send StandardFlowFileRecord[uuid=b8d0d94b-e241-448d-9c00-5b7aaca393b6,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1534875917068-1, container=default, section=1], offset=46931, length=334],offset=0,name=448674237495637,size=332] to Kafka: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 5000 ms.
org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 5000 ms.
2018-08-21 17:13:09,410 WARN [kafka-kerberos-refresh-thread-hdf_user1@---] o.a.k.c.security.kerberos.KerberosLogin [Principal=hdf_user1@---]: TGT renewal thread has been interrupted and will exit.
2018-08-21 17:13:09,411 ERROR [Timer-Driven Process Thread-2] o.a.n.p.kafka.pubsub.PublishKafka_1_0 PublishKafka_1_0[id=58dabab0-0165-1000-0000-00005b66debd] PublishKafka_1_0[id=58dabab0-0165-1000-0000-00005b66debd] failed to process due to org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=46169fd4-1fd9-47af-969b-9fc4301a6f1d,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1534875917068-1, container=default, section=1], offset=47265, length=334],offset=0,name=448673432494738,size=332] is already marked for transfer; rolling back session: {}
org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=46169fd4-1fd9-47af-969b-9fc4301a6f1d,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1534875917068-1, container=default, section=1], offset=47265, length=334],offset=0,name=448673432494738,size=332] is already marked for transfer
    at org.apache.nifi.controller.repository.StandardProcessSession.validateRecordState(StandardProcessSession.java:3073)
    at org.apache.nifi.controller.repository.StandardProcessSession.validateRecordState(StandardProcessSession.java:3055)
    at org.apache.nifi.controller.repository.StandardProcessSession.validateRecordState(StandardProcessSession.java:3086)
    at org.apache.nifi.controller.repository.StandardProcessSession.transfer(StandardProcessSession.java:1902)
    at org.apache.nifi.processors.kafka.pubsub.PublishKafka_1_0.onTrigger(PublishKafka_1_0.java:416)
    at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
    at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1124)
    at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:147)
    at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47)
    at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:128)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
2018-08-21 17:13:09,412 ERROR [Timer-Driven Process Thread-2] o.a.n.p.kafka.pubsub.PublishKafka_1_0 PublishKafka_1_0[id=58dabab0-0165-1000-0000-00005b66debd] PublishKafka_1_0[id=58dabab0-0165-1000-0000-00005b66debd] failed to process session due to org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=46169fd4-1fd9-47af-969b-9fc4301a6f1d,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1534875917068-1, container=default, section=1], offset=47265, length=334],offset=0,name=448673432494738,size=332] is already marked for transfer: {}
org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=46169fd4-1fd9-47af-969b-9fc4301a6f1d,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1534875917068-1, container=default, section=1], offset=47265, length=334],offset=0,name=448673432494738,size=332] is already marked for transfer
    at org.apache.nifi.controller.repository.StandardProcessSession.validateRecordState(StandardProcessSession.java:3073)
    at org.apache.nifi.controller.repository.StandardProcessSession.validateRecordState(StandardProcessSession.java:3055)
    at org.apache.nifi.controller.repository.StandardProcessSession.validateRecordState(StandardProcessSession.java:3086)
    at org.apache.nifi.controller.repository.StandardProcessSession.transfer(StandardProcessSession.java:1902)
    at org.apache.nifi.processors.kafka.pubsub.PublishKafka_1_0.onTrigger(PublishKafka_1_0.java:416)
    at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
    at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1124)
    at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:147)
    at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47)
    at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:128)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
2018-08-21 17:13:09,412 WARN [Timer-Driven Process Thread-2] o.a.n.p.kafka.pubsub.PublishKafka_1_0 PublishKafka_1_0[id=58dabab0-0165-1000-0000-00005b66debd] Processor Administratively Yielded for 1 sec due to processing failure
2018-08-21 17:13:09,412 WARN [Timer-Driven Process Thread-2] o.a.n.c.t.ContinuallyRunProcessorTask Administratively Yielding PublishKafka_1_0[id=58dabab0-0165-1000-0000-00005b66debd] due to uncaught Exception: org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=46169fd4-1fd9-47af-969b-9fc4301a6f1d,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1534875917068-1, container=default, section=1], offset=47265, length=334],offset=0,name=448673432494738,size=332] is already marked for transfer
2018-08-21 17:13:09,412 WARN [Timer-Driven Process Thread-2] o.a.n.c.t.ContinuallyRunProcessorTask
org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=46169fd4-1fd9-47af-969b-9fc4301a6f1d,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1534875917068-1, container=default, section=1], offset=47265, length=334],offset=0,name=448673432494738,size=332] is already marked for transfer
    at org.apache.nifi.controller.repository.StandardProcessSession.validateRecordState(StandardProcessSession.java:3073)
    at org.apache.nifi.controller.repository.StandardProcessSession.validateRecordState(StandardProcessSession.java:3055)
    at org.apache.nifi.controller.repository.StandardProcessSession.validateRecordState(StandardProcessSession.java:3086)
    at org.apache.nifi.controller.repository.StandardProcessSession.transfer(StandardProcessSession.java:1902)
    at org.apache.nifi.processors.kafka.pubsub.PublishKafka_1_0.onTrigger(PublishKafka_1_0.java:416)
    at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
    at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1124)
    at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:147)
    at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47)
    at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:128)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Created 08-22-2018 11:28 AM
Replace the broker's IP address with its FQDN in the Kafka processor configuration. This should help resolve the issue.
Created 08-22-2018 06:37 PM
Hi @Sandeep Nemuri.
That didn't help. This is a small proof-of-concept environment that doesn't use DNS. Is that why you recommended the FQDN? Does it have something to do with the Kerberos realm?
Created 08-22-2018 06:59 PM
DNS is optional. You can add the remote cluster's host entries to the /etc/hosts file. Kerberos doesn't work with IP addresses; it works with FQDNs.
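To illustrate why the broker address matters: with SASL/GSSAPI, the Kafka client requests a service ticket for a principal built from the Kerberos service name and the broker host as it is written in the client's broker list. The Python sketch below only illustrates that naming rule; it is not NiFi or Kafka code. The function names, the host broker1.example.com, the port 6667, and the realm EXAMPLE.COM are hypothetical placeholders.

```python
import ipaddress


def is_ip_literal(host: str) -> bool:
    """Return True if host is an IPv4/IPv6 literal rather than a hostname."""
    try:
        ipaddress.ip_address(host)
        return True
    except ValueError:
        return False


def expected_service_principal(bootstrap_server: str, realm: str,
                               service_name: str = "kafka") -> str:
    """Build the service principal a client would ask the KDC for.

    bootstrap_server is a "host:port" string (hypothetical example values).
    Raises ValueError for IP literals, because the KDC normally only has
    principals registered for host names, e.g. kafka/<fqdn>@REALM.
    """
    host, _, _port = bootstrap_server.rpartition(":")
    if not host:
        raise ValueError("expected host:port, got %r" % bootstrap_server)
    if is_ip_literal(host):
        raise ValueError("broker given as IP %r; use the FQDN instead" % host)
    return f"{service_name}/{host}@{realm}"


if __name__ == "__main__":
    # Placeholder values, not taken from the cluster in this thread.
    print(expected_service_principal("broker1.example.com:6667", "EXAMPLE.COM"))
```

If the name printed here does not match a principal that exists in the KDC (for example, as listed by kadmin), the ticket request would be expected to fail regardless of whether the user itself can authenticate.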
Created 08-23-2018 12:19 AM
@Sandeep Nemuri, name resolution works, but using the broker's FQDN neither fixed the problem nor changed the error message.
The other cluster services and I can authenticate with Kerberos on all machines without problems; only Kafka is not working in NiFi.
Created 08-28-2018 04:45 PM
Are you able to run the console consumer/producer there? You would also need to grant this user permission to consume from, produce to, and describe the topic.
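Before running the console tools, it may help to separate network problems from Kerberos problems with a plain TCP check against the broker listener, since the "Connection to node -1 could not be established" warnings in the log suggest the client may not be reaching the listener at all. The following is a minimal Python sketch under that assumption; the function name broker_reachable and the host and port in the usage example are hypothetical placeholders, so substitute the listener address from your own broker configuration.

```python
import socket


def broker_reachable(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port can be opened.

    This only checks network reachability of the listener; it says
    nothing about Kerberos authentication or topic permissions.
    """
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and name resolution failures.
        return False


if __name__ == "__main__":
    # Placeholder address, not taken from the cluster in this thread.
    print(broker_reachable("broker1.example.com", 6667))
```

If this returns False from the NiFi host, the listener address, port, or a firewall is the more likely culprit; if it returns True, the security protocol settings and the user's topic permissions are the next things to check.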