Created 09-13-2017 11:54 AM
Hi
I am trying to create a kafka message queue uswing nifi and the publishkafka processor
we are runing kerberos security on our cluster
it times out ( se my log )
I have following settings in my processor
Kafka Brokers sktudv01hdp01.ccta.dk:2181,sktudv01hdp03.ccta.dk:2181,sktudv01hdp02.ccta.dk:2181
Security Protocol PLAINTEXT
Kerberos Service Name
kafka/_HOST@CCTA.DK
SSL Context Service No value set
Topic Name simonkafka
Delivery Guarantee Best Effort
Kafka Key No value set
Key Attribute Encoding UTF-8 Encoded
In my ambari kafka config i have following
Kafka Broker host sktudv01hdp03.ccta.dk zookeeper.connect sktudv01hdp01.ccta.dk:2181,sktudv01hdp03.ccta.dk:2181,sktudv01hdp02.ccta.dk:2181 listeners PLAINTEXT://localhost:6667
the kafka service seems to run without problems on my hadoop environment
My nifi log
[w20960@sktudv01hdf01 nifi]$ tail -f nifi-app.log | grep kafka key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer sasl.kerberos.service.name = kafka/_HOST@CCTA.DK value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner 2017-09-13 13:44:34,113 WARN [Timer-Driven Process Thread-6] o.a.k.clients.producer.ProducerConfig The configuration sasl.kerberos.service.name = kafka/_HOST@CCTA.DK was supplied but isn't a known config. 2017-09-13 13:44:34,113 INFO [Timer-Driven Process Thread-6] o.a.kafka.common.utils.AppInfoParser Kafka version : 0.9.0.1 2017-09-13 13:44:34,113 INFO [Timer-Driven Process Thread-6] o.a.kafka.common.utils.AppInfoParser Kafka commitId : 23c69d62a0cabf06 2017-09-13 13:44:39,113 ERROR [Timer-Driven Process Thread-6] o.a.n.p.kafka.pubsub.PublishKafka PublishKafka[id=7a067740-015e-1000-ffff-ffffaeaac0ec] Failed to send all message for StandardFlowFileRecord[uuid=1402ef55-e3db-42e3-901e-28ebcf240224,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1505294789599-13484, container=default, section=172], offset=40948, length=17],offset=0,name=3131919623882367,size=17] to Kafka; routing to failure due to org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 5000 ms.: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 5000 ms. 2017-09-13 13:44:39,113 ERROR [Timer-Driven Process Thread-6] o.a.n.p.kafka.pubsub.PublishKafka org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 5000 ms. 2017-09-13 13:44:39,113 INFO [Timer-Driven Process Thread-6] o.a.kafka.clients.producer.KafkaProducer Closing the Kafka producer with timeoutMillis = 5000 ms. key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer sasl.kerberos.service.name = kafka/_HOST@CCTA.DK value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner 2017-09-13 13:44:39,115 WARN [Timer-Driven Process Thread-6] o.a.k.clients.producer.ProducerConfig The configuration sasl.kerberos.service.name = kafka/_HOST@CCTA.DK was supplied but isn't a known config. 2017-09-13 13:44:39,115 INFO [Timer-Driven Process Thread-6] o.a.kafka.common.utils.AppInfoParser Kafka version : 0.9.0.1 2017-09-13 13:44:39,115 INFO [Timer-Driven Process Thread-6] o.a.kafka.common.utils.AppInfoParser Kafka commitId : 23c69d62a0cabf06 2017-09-13 13:44:44,115 ERROR [Timer-Driven Process Thread-6] o.a.n.p.kafka.pubsub.PublishKafka PublishKafka[id=7a067740-015e-1000-ffff-ffffaeaac0ec] Failed to send all message for StandardFlowFileRecord[uuid=88645193-ca0d-441e-a79b-487870bdf401,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1505294789599-13484, container=default, section=172], offset=41210, length=17],offset=0,name=3131979624448053,size=17] to Kafka; routing to failure due to org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 5000 ms.: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 5000 ms. 2017-09-13 13:44:44,116 ERROR [Timer-Driven Process Thread-6] o.a.n.p.kafka.pubsub.PublishKafka org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 5000 ms. 2017-09-13 13:44:44,116 INFO [Timer-Driven Process Thread-6] o.a.kafka.clients.producer.KafkaProducer Closing the Kafka producer with timeoutMillis = 5000 ms. ^C [w20960@sktudv01hdf01 nifi]$
Created on 09-13-2017 11:11 PM - edited 08-18-2019 01:43 AM
You need to configure two additional properties in the processor:
Change the Kerberos Service name to just kafka
Created on 09-14-2017 09:06 AM - edited 08-18-2019 01:43 AM
The processor PublishKafka does not support those two additional properties.
Created 09-14-2017 01:22 PM
What version of NiFi are you using?
Created 09-14-2017 01:50 PM
@Wynner we are on Powered by Apache NiFi - Version 1.1.0.2.1.2.0-10
Created on 09-14-2017 02:01 PM - edited 08-18-2019 01:43 AM
Create a jaas-client.config file with the principal and keytab for NiFi to use to connect to the broker. Here is one I use for my NiFi:
KafkaClient { com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab="/etc/nifi/2.1.1.0-2/0/kafka.service.keytab" principal="kafka/hdp-24-03.openstacklocal@SMENIFI.COM"; };
Make sure to put it in a directory that the nifi user can access, mine is located in the /etc/nifi/2.1.1.0-2/0/ directory.
Then add the highlighted property to the Advanced nifi-bootstrap-env section in Ambari
This file must be on all of the nodes in your cluster.
Created on 09-15-2017 08:07 AM - edited 08-18-2019 01:42 AM
Hi @Wynner thanks for the reply
I created the jaas file on my nifi host
jaas-client.config KafkaClient
<br>[root@sktudv01hdf01 nifi]# cat /etc/nifi/jaas-client.config KafkaClient { com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab="/etc/security/keytabs/kafka.service.keytab" principal="kafka/_HOST@CCTA.DK"; }; [root@sktudv01hdf01 nifi]#
My kafka configuration is
my nifi publisher is configured like this
The log file looks like this
2017-09-15 10:03:34,878 WARN [Timer-Driven Process Thread-10] o.a.n.p.kafka.pubsub.PublishKafka PublishKafka[id=7a067740-015e-1000-ffff-ffffaeaac0ec] Processor Administratively Yielded for 1 sec due to processing failure 2017-09-15 10:03:34,878 WARN [Timer-Driven Process Thread-10] o.a.n.c.t.ContinuallyRunProcessorTask Administratively Yielding PublishKafka[id=7a067740-015e-1000-ffff-ffffaeaac0ec] due to uncaught Exception: org.apache.kafka.common.KafkaException: Failed to construct kafka producer org.apache.kafka.common.KafkaException: Failed to construct kafka producer at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:321) ~[na:na] at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:156) ~[na:na] at org.apache.nifi.processors.kafka.pubsub.PublisherPool.createLease(PublisherPool.java:61) ~[na:na] at org.apache.nifi.processors.kafka.pubsub.PublisherPool.obtainPublisher(PublisherPool.java:56) ~[na:na] at org.apache.nifi.processors.kafka.pubsub.PublishKafka.onTrigger(PublishKafka.java:312) ~[na:na] Caused by: org.apache.kafka.common.KafkaException: javax.security.auth.login.LoginException: Could not login: the client is being asked for a password, but the Kafka client code does not currently support obtaining a password from the user. Make sure -Djava.security.auth.login.config property passed to JVM and the client is configured to use a ticket cache (using the JAAS configuration setting 'useTicketCache=true)'. Make sure you are using FQDN of the Kafka broker you are trying to connect to. not available to garner authentication information from the user at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:74) ~[na:na] at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:60) ~[na:na] at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:79) ~[na:na] at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:271) ~[na:na] at org.apache.kafka.common.security.kerberos.Login.login(Login.java:298) ~[na:na] at org.apache.kafka.common.security.kerberos.Login.<init>(Login.java:104) ~[na:na] at org.apache.kafka.common.security.kerberos.LoginManager.<init>(LoginManager.java:44) ~[na:na] at org.apache.kafka.common.security.kerberos.LoginManager.acquireLoginManager(LoginManager.java:85) ~[na:na] at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:55) ~[na:na] ^C [w20960@sktudv01hdf01 nifi]$
It seems that my JAAS file doesnt work properly
Created 09-15-2017 07:52 PM
Run this command on your keytab file:
klist -kt /etc/security/keytabs/kafka.service.keytab
Whatever is displayed should be your principal in the jaas-client.config file.
Created 09-18-2017 06:39 AM
This looks like an authentication issue.
For the given topic can you add ACLs for anonymous user as the protocol is PLAINTEXT?
bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=<zookeeper:host> --add --allow-principal User:ANONYMOUS --operation Read --operation Write --operation Describe --topic <topic>