PublishKafka_1_0 and Consume_Kafka_1_0 processors after kerberization

We have two clusters that were built for a proof of concept: one with HDP 2.6.2.0, the other with HDF 3.1.2.0. Both were kerberized against an MIT KDC on the same realm, and the kerberization was done entirely through the Ambari wizard without issues.

After this, the PublishKafka_1_0 and ConsumeKafka_1_0 processors started throwing errors. The PublishKafka_1_0 processor is configured as shown below.

86639-config.png
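For reference, a kerberized PublishKafka_1_0 processor typically needs properties along these lines (the values below are placeholders, not the actual settings from the screenshot):

```
Kafka Brokers         : broker1.example.com:6667
Security Protocol     : SASL_PLAINTEXT
Kerberos Service Name : kafka
Kerberos Principal    : hdf_user1@EXAMPLE.COM
Kerberos Keytab       : /etc/security/keytabs/hdf_user1.keytab
Topic Name            : test
```

The Kerberos Service Name must match the service principal the brokers were kerberized with (usually `kafka` on HDP), and the Security Protocol must match the broker listener.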

The HDFS and Hive processors worked fine after a similar setup; only Kafka is failing.

Here’s the nifi-app.log snippet with the error:

2018-08-21 17:13:05,009 WARN [kafka-producer-network-thread | producer-3] org.apache.kafka.clients.NetworkClient [Producer clientId=producer-3] Connection to node -1 could not be established. Broker may not be available.
2018-08-21 17:13:05,862 WARN [kafka-producer-network-thread | producer-3] org.apache.kafka.clients.NetworkClient [Producer clientId=producer-3] Connection to node -1 could not be established. Broker may not be available.
2018-08-21 17:13:06,933 WARN [kafka-producer-network-thread | producer-3] org.apache.kafka.clients.NetworkClient [Producer clientId=producer-3] Connection to node -1 could not be established. Broker may not be available.
2018-08-21 17:13:07,760 WARN [kafka-producer-network-thread | producer-3] org.apache.kafka.clients.NetworkClient [Producer clientId=producer-3] Connection to node -1 could not be established. Broker may not be available.
2018-08-21 17:13:08,713 WARN [kafka-producer-network-thread | producer-3] org.apache.kafka.clients.NetworkClient [Producer clientId=producer-3] Connection to node -1 could not be established. Broker may not be available.
2018-08-21 17:13:09,406 ERROR [Timer-Driven Process Thread-2] o.a.n.p.kafka.pubsub.PublishKafka_1_0 PublishKafka_1_0[id=58dabab0-0165-1000-0000-00005b66debd] Failed to send StandardFlowFileRecord[uuid=b8d0d94b-e241-448d-9c00-5b7aaca393b6,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1534875917068-1, container=default, section=1], offset=46931, length=334],offset=0,name=448674237495637,size=332] to Kafka: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 5000 ms.
org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 5000 ms.
2018-08-21 17:13:09,410 WARN [kafka-kerberos-refresh-thread-hdf_user1@---] o.a.k.c.security.kerberos.KerberosLogin [Principal=hdf_user1@---]: TGT renewal thread has been interrupted and will exit.
2018-08-21 17:13:09,411 ERROR [Timer-Driven Process Thread-2] o.a.n.p.kafka.pubsub.PublishKafka_1_0 PublishKafka_1_0[id=58dabab0-0165-1000-0000-00005b66debd] PublishKafka_1_0[id=58dabab0-0165-1000-0000-00005b66debd] failed to process due to org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=46169fd4-1fd9-47af-969b-9fc4301a6f1d,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1534875917068-1, container=default, section=1], offset=47265, length=334],offset=0,name=448673432494738,size=332] is already marked for transfer; rolling back session: {}
org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=46169fd4-1fd9-47af-969b-9fc4301a6f1d,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1534875917068-1, container=default, section=1], offset=47265, length=334],offset=0,name=448673432494738,size=332] is already marked for transfer
    at org.apache.nifi.controller.repository.StandardProcessSession.validateRecordState(StandardProcessSession.java:3073)
    at org.apache.nifi.controller.repository.StandardProcessSession.validateRecordState(StandardProcessSession.java:3055)
    at org.apache.nifi.controller.repository.StandardProcessSession.validateRecordState(StandardProcessSession.java:3086)
    at org.apache.nifi.controller.repository.StandardProcessSession.transfer(StandardProcessSession.java:1902)
    at org.apache.nifi.processors.kafka.pubsub.PublishKafka_1_0.onTrigger(PublishKafka_1_0.java:416)
    at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
    at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1124)
    at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:147)
    at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47)
    at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:128)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
2018-08-21 17:13:09,412 ERROR [Timer-Driven Process Thread-2] o.a.n.p.kafka.pubsub.PublishKafka_1_0 PublishKafka_1_0[id=58dabab0-0165-1000-0000-00005b66debd] PublishKafka_1_0[id=58dabab0-0165-1000-0000-00005b66debd] failed to process session due to org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=46169fd4-1fd9-47af-969b-9fc4301a6f1d,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1534875917068-1, container=default, section=1], offset=47265, length=334],offset=0,name=448673432494738,size=332] is already marked for transfer: {}
org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=46169fd4-1fd9-47af-969b-9fc4301a6f1d,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1534875917068-1, container=default, section=1], offset=47265, length=334],offset=0,name=448673432494738,size=332] is already marked for transfer
    at org.apache.nifi.controller.repository.StandardProcessSession.validateRecordState(StandardProcessSession.java:3073)
    at org.apache.nifi.controller.repository.StandardProcessSession.validateRecordState(StandardProcessSession.java:3055)
    at org.apache.nifi.controller.repository.StandardProcessSession.validateRecordState(StandardProcessSession.java:3086)
    at org.apache.nifi.controller.repository.StandardProcessSession.transfer(StandardProcessSession.java:1902)
    at org.apache.nifi.processors.kafka.pubsub.PublishKafka_1_0.onTrigger(PublishKafka_1_0.java:416)
    at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
    at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1124)
    at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:147)
    at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47)
    at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:128)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
2018-08-21 17:13:09,412 WARN [Timer-Driven Process Thread-2] o.a.n.p.kafka.pubsub.PublishKafka_1_0 PublishKafka_1_0[id=58dabab0-0165-1000-0000-00005b66debd] Processor Administratively Yielded for 1 sec due to processing failure
2018-08-21 17:13:09,412 WARN [Timer-Driven Process Thread-2] o.a.n.c.t.ContinuallyRunProcessorTask Administratively Yielding PublishKafka_1_0[id=58dabab0-0165-1000-0000-00005b66debd] due to uncaught Exception: org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=46169fd4-1fd9-47af-969b-9fc4301a6f1d,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1534875917068-1, container=default, section=1], offset=47265, length=334],offset=0,name=448673432494738,size=332] is already marked for transfer
2018-08-21 17:13:09,412 WARN [Timer-Driven Process Thread-2] o.a.n.c.t.ContinuallyRunProcessorTask
org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=46169fd4-1fd9-47af-969b-9fc4301a6f1d,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1534875917068-1, container=default, section=1], offset=47265, length=334],offset=0,name=448673432494738,size=332] is already marked for transfer
    at org.apache.nifi.controller.repository.StandardProcessSession.validateRecordState(StandardProcessSession.java:3073)
    at org.apache.nifi.controller.repository.StandardProcessSession.validateRecordState(StandardProcessSession.java:3055)
    at org.apache.nifi.controller.repository.StandardProcessSession.validateRecordState(StandardProcessSession.java:3086)
    at org.apache.nifi.controller.repository.StandardProcessSession.transfer(StandardProcessSession.java:1902)
    at org.apache.nifi.processors.kafka.pubsub.PublishKafka_1_0.onTrigger(PublishKafka_1_0.java:416)
    at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
    at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1124)
    at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:147)
    at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47)
    at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:128)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
5 REPLIES

Re: PublishKafka_1_0 and Consume_Kafka_1_0 processors after kerberization

@Douglas Spadotto

Replace the broker's IP with its FQDN in the Kafka processor configuration. This should help resolve the issue.
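For example, if the Kafka Brokers property currently points at an IP, the change would look roughly like this (both values are placeholders; 6667 is the usual HDP Kafka listener port):

```
# Before (hypothetical)
Kafka Brokers: 192.168.0.11:6667

# After
Kafka Brokers: broker1.example.com:6667
```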


Re: PublishKafka_1_0 and Consume_Kafka_1_0 processors after kerberization

Hi @Sandeep Nemuri.

That didn't help. This is a small proof-of-concept environment that doesn't use DNS. Is that why you recommended the FQDN? Does it have a relationship with the Kerberos realm?


Re: PublishKafka_1_0 and Consume_Kafka_1_0 processors after kerberization

DNS is optional; you can add the remote cluster's host entries to the /etc/hosts file. Kerberos doesn't work with IPs, it works with FQDNs.
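As a sketch, with placeholder addresses and hostnames, each NiFi node's /etc/hosts would gain entries for the brokers like:

```
# /etc/hosts on each NiFi node (hypothetical addresses and names)
192.168.0.11  broker1.example.com  broker1
192.168.0.12  broker2.example.com  broker2
```

The FQDN used here must match the host part of the broker's Kerberos service principal (e.g. kafka/broker1.example.com@REALM), since Kerberos validates the service by name, not by address.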

Re: PublishKafka_1_0 and Consume_Kafka_1_0 processors after kerberization

@Sandeep Nemuri, name resolution works, but using the broker's FQDN didn't fix the problem or change the error message.

I (and the other cluster services) can authenticate with Kerberos fine on all machines; only Kafka is failing in NiFi.


Re: PublishKafka_1_0 and Consume_Kafka_1_0 processors after kerberization

Are you able to run the console consumer/producer there? Also, you'd need to grant this user permissions to consume, produce, and describe the topic.
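For example, roughly like this (principal, keytab path, topic, and the ZooKeeper address are placeholders; on HDP the Kafka CLI scripts typically live under /usr/hdp/current/kafka-broker/bin, and the flags shown are the Kafka 1.0-era ones):

```
# Grant produce and consume (which includes describe) on the topic
/usr/hdp/current/kafka-broker/bin/kafka-acls.sh \
  --authorizer-properties zookeeper.connect=zk1.example.com:2181 \
  --add --allow-principal User:hdf_user1 \
  --producer --topic test

/usr/hdp/current/kafka-broker/bin/kafka-acls.sh \
  --authorizer-properties zookeeper.connect=zk1.example.com:2181 \
  --add --allow-principal User:hdf_user1 \
  --consumer --topic test --group '*'

# Then test from a kinit'ed session with the console producer
kinit -kt /etc/security/keytabs/hdf_user1.keytab hdf_user1
/usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh \
  --broker-list broker1.example.com:6667 --topic test \
  --producer-property security.protocol=SASL_PLAINTEXT
```

If the console producer fails outside NiFi with the same metadata timeout, the problem is on the broker/Kerberos side rather than in the processor configuration.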