Support Questions

Find answers, ask questions, and share your expertise
Announcements
Celebrating as our community reaches 100,000 members! Thank you!

nifi publishkafka TimeoutException: Failed to update metadata after 5000 ms. what can i do

avatar
Expert Contributor

Hi

I am trying to create a kafka message queue uswing nifi and the publishkafka processor

we are runing kerberos security on our cluster

it times out ( se my log )

I have following settings in my processor

Kafka Brokers sktudv01hdp01.ccta.dk:2181,sktudv01hdp03.ccta.dk:2181,sktudv01hdp02.ccta.dk:2181

Security Protocol PLAINTEXT

Kerberos Service Name

kafka/_HOST@CCTA.DK

SSL Context Service No value set

Topic Name simonkafka

Delivery Guarantee Best Effort

Kafka Key No value set

Key Attribute Encoding UTF-8 Encoded

In my ambari kafka config i have following

Kafka Broker host sktudv01hdp03.ccta.dk zookeeper.connect sktudv01hdp01.ccta.dk:2181,sktudv01hdp03.ccta.dk:2181,sktudv01hdp02.ccta.dk:2181 listeners PLAINTEXT://localhost:6667

the kafka service seems to run without problems on my hadoop environment

My nifi log

[w20960@sktudv01hdf01 nifi]$ tail -f nifi-app.log | grep kafka
        key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
        sasl.kerberos.service.name = kafka/_HOST@CCTA.DK
        value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
        partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
2017-09-13 13:44:34,113 WARN [Timer-Driven Process Thread-6] o.a.k.clients.producer.ProducerConfig The configuration sasl.kerberos.service.name = kafka/_HOST@CCTA.DK was supplied but isn't a known config.
2017-09-13 13:44:34,113 INFO [Timer-Driven Process Thread-6] o.a.kafka.common.utils.AppInfoParser Kafka version : 0.9.0.1
2017-09-13 13:44:34,113 INFO [Timer-Driven Process Thread-6] o.a.kafka.common.utils.AppInfoParser Kafka commitId : 23c69d62a0cabf06
2017-09-13 13:44:39,113 ERROR [Timer-Driven Process Thread-6] o.a.n.p.kafka.pubsub.PublishKafka PublishKafka[id=7a067740-015e-1000-ffff-ffffaeaac0ec] Failed to send all message for StandardFlowFileRecord[uuid=1402ef55-e3db-42e3-901e-28ebcf240224,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1505294789599-13484, container=default, section=172], offset=40948, length=17],offset=0,name=3131919623882367,size=17] to Kafka; routing to failure due to org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 5000 ms.: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 5000 ms.
2017-09-13 13:44:39,113 ERROR [Timer-Driven Process Thread-6] o.a.n.p.kafka.pubsub.PublishKafka
org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 5000 ms.
2017-09-13 13:44:39,113 INFO [Timer-Driven Process Thread-6] o.a.kafka.clients.producer.KafkaProducer Closing the Kafka producer with timeoutMillis = 5000 ms.
        key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
        sasl.kerberos.service.name = kafka/_HOST@CCTA.DK
        value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
        partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
2017-09-13 13:44:39,115 WARN [Timer-Driven Process Thread-6] o.a.k.clients.producer.ProducerConfig The configuration sasl.kerberos.service.name = kafka/_HOST@CCTA.DK was supplied but isn't a known config.
2017-09-13 13:44:39,115 INFO [Timer-Driven Process Thread-6] o.a.kafka.common.utils.AppInfoParser Kafka version : 0.9.0.1
2017-09-13 13:44:39,115 INFO [Timer-Driven Process Thread-6] o.a.kafka.common.utils.AppInfoParser Kafka commitId : 23c69d62a0cabf06
2017-09-13 13:44:44,115 ERROR [Timer-Driven Process Thread-6] o.a.n.p.kafka.pubsub.PublishKafka PublishKafka[id=7a067740-015e-1000-ffff-ffffaeaac0ec] Failed to send all message for StandardFlowFileRecord[uuid=88645193-ca0d-441e-a79b-487870bdf401,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1505294789599-13484, container=default, section=172], offset=41210, length=17],offset=0,name=3131979624448053,size=17] to Kafka; routing to failure due to org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 5000 ms.: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 5000 ms.
2017-09-13 13:44:44,116 ERROR [Timer-Driven Process Thread-6] o.a.n.p.kafka.pubsub.PublishKafka
org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 5000 ms.
2017-09-13 13:44:44,116 INFO [Timer-Driven Process Thread-6] o.a.kafka.clients.producer.KafkaProducer Closing the Kafka producer with timeoutMillis = 5000 ms.
^C
[w20960@sktudv01hdf01 nifi]$


8 REPLIES 8

avatar
@Simon Jespersen

You need to configure two additional properties in the processor:

34788-screen-shot-2017-09-13-at-70917-pm.png

Change the Kerberos Service name to just kafka

avatar
Expert Contributor

The processor PublishKafka does not support those two additional properties.

38639-publishkafka.png

avatar
@Simon Jespersen

What version of NiFi are you using?

avatar
Expert Contributor

@Wynner we are on Powered by Apache NiFi - Version 1.1.0.2.1.2.0-10

avatar
@Simon Jespersen

Create a jaas-client.config file with the principal and keytab for NiFi to use to connect to the broker. Here is one I use for my NiFi:

KafkaClient {
        com.sun.security.auth.module.Krb5LoginModule required
        useKeyTab=true
        storeKey=true
        keyTab="/etc/nifi/2.1.1.0-2/0/kafka.service.keytab"
        principal="kafka/hdp-24-03.openstacklocal@SMENIFI.COM";
    };

Make sure to put it in a directory that the nifi user can access, mine is located in the /etc/nifi/2.1.1.0-2/0/ directory.

Then add the highlighted property to the Advanced nifi-bootstrap-env section in Ambari

34797-screen-shot-2017-09-14-at-95856-am.png

This file must be on all of the nodes in your cluster.

avatar
Expert Contributor

Hi @Wynner thanks for the reply

I created the jaas file on my nifi host

jaas-client.config KafkaClient

<br>[root@sktudv01hdf01 nifi]# cat /etc/nifi/jaas-client.config
KafkaClient {
        com.sun.security.auth.module.Krb5LoginModule required
        useKeyTab=true
        storeKey=true
        keyTab="/etc/security/keytabs/kafka.service.keytab"
        principal="kafka/_HOST@CCTA.DK";
    };
[root@sktudv01hdf01 nifi]#

My kafka configuration is

38649-kafkabroker-config.png

my nifi publisher is configured like this

38650-nifi-kafkapublisher.png

The log file looks like this

2017-09-15 10:03:34,878 WARN [Timer-Driven Process Thread-10] o.a.n.p.kafka.pubsub.PublishKafka PublishKafka[id=7a067740-015e-1000-ffff-ffffaeaac0ec] Processor Administratively Yielded for 1 sec due to processing failure
2017-09-15 10:03:34,878 WARN [Timer-Driven Process Thread-10] o.a.n.c.t.ContinuallyRunProcessorTask Administratively Yielding PublishKafka[id=7a067740-015e-1000-ffff-ffffaeaac0ec] due to uncaught Exception: org.apache.kafka.common.KafkaException: Failed to construct kafka producer
org.apache.kafka.common.KafkaException: Failed to construct kafka producer
        at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:321) ~[na:na]
        at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:156) ~[na:na]
        at org.apache.nifi.processors.kafka.pubsub.PublisherPool.createLease(PublisherPool.java:61) ~[na:na]
        at org.apache.nifi.processors.kafka.pubsub.PublisherPool.obtainPublisher(PublisherPool.java:56) ~[na:na]
        at org.apache.nifi.processors.kafka.pubsub.PublishKafka.onTrigger(PublishKafka.java:312) ~[na:na]
Caused by: org.apache.kafka.common.KafkaException: javax.security.auth.login.LoginException: Could not login: the client is being asked for a password, but the Kafka client code does not currently support obtaining a password from the user. Make sure -Djava.security.auth.login.config property passed to JVM and the client is configured to use a ticket cache (using the JAAS configuration setting 'useTicketCache=true)'. Make sure you are using FQDN of the Kafka broker you are trying to connect to. not available to garner  authentication information from the user
        at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:74) ~[na:na]
        at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:60) ~[na:na]
        at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:79) ~[na:na]
        at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:271) ~[na:na]
        at org.apache.kafka.common.security.kerberos.Login.login(Login.java:298) ~[na:na]
        at org.apache.kafka.common.security.kerberos.Login.<init>(Login.java:104) ~[na:na]
        at org.apache.kafka.common.security.kerberos.LoginManager.<init>(LoginManager.java:44) ~[na:na]
        at org.apache.kafka.common.security.kerberos.LoginManager.acquireLoginManager(LoginManager.java:85) ~[na:na]
        at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:55) ~[na:na]
^C
[w20960@sktudv01hdf01 nifi]$

It seems that my JAAS file doesnt work properly

avatar

@Simon Jespersen

Run this command on your keytab file:

klist -kt /etc/security/keytabs/kafka.service.keytab

Whatever is displayed should be your principal in the jaas-client.config file.

avatar
Contributor

@Simon Jespersen

This looks like an authentication issue.

For the given topic can you add ACLs for anonymous user as the protocol is PLAINTEXT?

bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=<zookeeper:host> --add --allow-principal User:ANONYMOUS --operation Read --operation Write --operation Describe --topic <topic>