Created on 09-20-2016 10:02 AM - edited 08-18-2019 04:48 AM
Hi,
I want to use a standalone NiFi 0.6 to ingest data into Kafka within a kerberized HDP 2.3.4 cluster.
The base configuration ran smoothly, and the additional configuration from https://community.hortonworks.com/articles/28180/how-to-configure-hdf-12-to-send-to-and-get-data-fr...., "Section 2", has been applied.
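(For reference, the core of that section is a JVM argument in conf/bootstrap.conf that points NiFi's JVM at the JAAS file; the argument index below is just an example and may differ in other setups:)
java.arg.15=-Djava.security.auth.login.config=/data/nifi/current/conf/zookeeper-jaas.conf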
To start playing around I added a GetFile processor and connected it to PutKafka, but after starting the flow, PutKafka fails with the following error:
2016-09-20 11:44:38,076 INFO [StandardProcessScheduler Thread-6] o.a.kafka.clients.producer.KafkaProducer Closing the Kafka producer with timeoutMillis = 0 ms.
2016-09-20 11:44:38,076 ERROR [StandardProcessScheduler Thread-6] o.apache.nifi.processors.kafka.PutKafka PutKafka[id=9293cf22-d7c2-40a6-ba7e-a6d41942fc30] PutKafka[id=9293cf22-d7c2-40a6-ba7e-a6d41942fc30] failed to invoke @OnScheduled method due to java.lang.RuntimeException: Failed while executing one of processor's OnScheduled task.; processor will not be scheduled to run for 30000 milliseconds: java.lang.RuntimeException: Failed while executing one of processor's OnScheduled task.
2016-09-20 11:44:38,082 ERROR [StandardProcessScheduler Thread-6] org.apache.nifi.engine.FlowEngine A flow controller task execution stopped abnormally
java.util.concurrent.ExecutionException: java.lang.reflect.InvocationTargetException
    at java.util.concurrent.FutureTask.report(FutureTask.java:122) ~[na:1.7.0_79]
    at java.util.concurrent.FutureTask.get(FutureTask.java:188) ~[na:1.7.0_79]
    at org.apache.nifi.engine.FlowEngine.afterExecute(FlowEngine.java:100) ~[na:na]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1153) [na:1.7.0_79]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [na:1.7.0_79]
    at java.lang.Thread.run(Thread.java:745) [na:1.7.0_79]
Caused by: java.lang.reflect.InvocationTargetException: null
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.7.0_79]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) ~[na:1.7.0_79]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.7.0_79]
    at java.lang.reflect.Method.invoke(Method.java:606) ~[na:1.7.0_79]
    at org.apache.nifi.util.ReflectionUtils.invokeMethodsWithAnnotations(ReflectionUtils.java:137) ~[na:na]
    at org.apache.nifi.util.ReflectionUtils.invokeMethodsWithAnnotations(ReflectionUtils.java:125) ~[na:na]
    at org.apache.nifi.util.ReflectionUtils.invokeMethodsWithAnnotations(ReflectionUtils.java:70) ~[na:na]
    at org.apache.nifi.controller.StandardProcessorNode$1$1.call(StandardProcessorNode.java:1247) ~[na:na]
    at org.apache.nifi.controller.StandardProcessorNode$1$1.call(StandardProcessorNode.java:1243) ~[na:na]
    at java.util.concurrent.FutureTask.run(FutureTask.java:262) ~[na:1.7.0_79]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:178) ~[na:1.7.0_79]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:292) ~[na:1.7.0_79]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_79]
    ... 2 common frames omitted
Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka producer
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:275) ~[na:na]
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:176) ~[na:na]
    at org.apache.nifi.processors.kafka.KafkaPublisher.<init>(KafkaPublisher.java:66) ~[na:na]
    at org.apache.nifi.processors.kafka.PutKafka.createKafkaPublisher(PutKafka.java:281) ~[na:na]
    ... 15 common frames omitted
Caused by: org.apache.kafka.common.KafkaException: java.lang.RuntimeException: java.security.NoSuchAlgorithmException: Error constructing implementation (algorithm: JavaLoginConfig, provider: SUN, class: sun.security.provider.ConfigSpiFile)
    at org.apache.kafka.common.network.Selector.<init>(Selector.java:135) ~[na:na]
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:231) ~[na:na]
    ... 18 common frames omitted
Caused by: java.lang.RuntimeException: java.security.NoSuchAlgorithmException: Error constructing implementation (algorithm: JavaLoginConfig, provider: SUN, class: sun.security.provider.ConfigSpiFile)
    at org.apache.kafka.common.security.kerberos.Login.login(Login.java:306) ~[na:na]
    at org.apache.kafka.common.security.kerberos.Login.<init>(Login.java:101) ~[na:na]
    at org.apache.kafka.common.security.kerberos.LoginFactory.<init>(LoginFactory.java:34) ~[na:na]
    at org.apache.kafka.common.network.Selector.<init>(Selector.java:132) ~[na:na]
    ... 19 common frames omitted
Caused by: java.security.NoSuchAlgorithmException: Error constructing implementation (algorithm: JavaLoginConfig, provider: SUN, class: sun.security.provider.ConfigSpiFile)
    at java.security.Provider$Service.newInstance(Provider.java:1259) ~[na:1.7.0_79]
    at sun.security.jca.GetInstance.getInstance(GetInstance.java:243) ~[na:1.7.0_79]
    at sun.security.jca.GetInstance.getInstance(GetInstance.java:190) ~[na:1.7.0_79]
    at javax.security.auth.login.Configuration.getInstance(Configuration.java:352) ~[na:1.7.0_79]
    at org.apache.kafka.common.security.kerberos.Login.login(Login.java:303) ~[na:na]
    ... 22 common frames omitted
Caused by: java.io.IOException: Configuration Error:
	Line 7: expected [option key]
    at com.sun.security.auth.login.ConfigFile.match(ConfigFile.java:550) ~[na:1.7.0_79]
    at com.sun.security.auth.login.ConfigFile.parseLoginEntry(ConfigFile.java:439) ~[na:1.7.0_79]
    at com.sun.security.auth.login.ConfigFile.readConfig(ConfigFile.java:383) ~[na:1.7.0_79]
    at com.sun.security.auth.login.ConfigFile.init(ConfigFile.java:283) ~[na:1.7.0_79]
    at com.sun.security.auth.login.ConfigFile.init(ConfigFile.java:166) ~[na:1.7.0_79]
    at com.sun.security.auth.login.ConfigFile.<init>(ConfigFile.java:124) ~[na:1.7.0_79]
    at sun.security.provider.ConfigSpiFile$1.run(ConfigSpiFile.java:72) ~[na:1.7.0_79]
    at sun.security.provider.ConfigSpiFile$1.run(ConfigSpiFile.java:61) ~[na:1.7.0_79]
    at java.security.AccessController.doPrivileged(Native Method) ~[na:1.7.0_79]
    at sun.security.provider.ConfigSpiFile.<init>(ConfigSpiFile.java:61) ~[na:1.7.0_79]
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) ~[na:1.7.0_79]
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57) ~[na:1.7.0_79]
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) ~[na:1.7.0_79]
    at java.lang.reflect.Constructor.newInstance(Constructor.java:526) ~[na:1.7.0_79]
    at java.security.Provider$Service.newInstance(Provider.java:1254) ~[na:1.7.0_79]
    ... 26 common frames omitted
The PutKafka processor looks like:
and the zookeeper-jaas.conf looks like ("REALM" has been replaced):
#>cat /data/nifi/current/conf/zookeeper-jaas.conf
Client {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  keyTab="/etc/security/keytabs/nifi.user.keytab"
  storeKey=true
  useTicketCache=false
  principal="nifi@REALM;
};
KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useTicketCache=true
  renewTicket=true
  serviceName="kafka"
  useKeyTab=true
  keyTab="/etc/security/keytabs/nifi.user.keytab"
  principal="nifi@REALM";
};
=> keytab file is in place and readable
Any hints on what is causing this "Configuration Error" in the PutKafka processor?
Thanks...
Created 09-21-2016 11:51 AM
After a closer look at the JAAS file you posted above, I believe your issue is caused by a missing closing quote (") in the following line:
principal="nifi@REALM;
This line should actually be:
principal="nifi@REALM";
Try making the above change and restarting your NiFi.
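For clarity, the corrected file as a whole would then look like this (the same entries you posted above, with only the closing quote added; "REALM" still replaced):
Client {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  keyTab="/etc/security/keytabs/nifi.user.keytab"
  storeKey=true
  useTicketCache=false
  principal="nifi@REALM";
};
KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useTicketCache=true
  renewTicket=true
  serviceName="kafka"
  useKeyTab=true
  keyTab="/etc/security/keytabs/nifi.user.keytab"
  principal="nifi@REALM";
};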
Thanks,
Matt
Created 09-20-2016 10:09 AM
Hi,
What version of Kafka are you using?
Created 09-20-2016 10:16 AM
Hi @Pierre Villard,
it is Kafka 0.9.0.2.3, shipped with HDP 2.3.4 and installed via Ambari.
Created 09-20-2016 11:08 AM
It is not possible to use PutKafka with Kafka 0.9; you need to use the Publish/ConsumeKafka processors instead. Have a look here [1]. Besides, I think those processors were introduced in NiFi 0.7 [2] (the Apache JIRA website seems to have some problems at the moment).
Created on 09-20-2016 12:14 PM - edited 08-18-2019 04:48 AM
Hi @Pierre Villard,
thanks for sharing this.
I am a bit confused, because [1] says that HDF 1.0.0 contains the Publish/ConsumeKafka processors, but I do not have them available although I am using HDF 1.2 (NiFi 0.6):
Also https://community.hortonworks.com/questions/23298/nifi-does-getkafka-processor-support-reading-from.... says "HDF releases of NiFi have patched support for Kerberized Kafka clusters in HDP".
Am I missing something here?!
Created 09-20-2016 12:27 PM
There are two versions in play here:
- HDF, which is the Hortonworks stack containing NiFi along with other components (Kafka, Storm, ZooKeeper, ...).
- Apache NiFi, which is the release from the Apache community.
You are using HDF 1.2.0.1, which contains NiFi 0.6.0 + patches.
Last week, HDF 2.0.0 was released, containing NiFi 1.0.0 + patches.
As you can see, the NiFi release contained in an HDF release can come with early patches to help users with better integration.
Created 09-21-2016 06:47 AM
Hello @Pierre Villard,
yes, using HDF 2.0.0 would be my preferred way, but (at the moment) I have to stick to Java 1.7, hence I cannot go to HDF 2.0....
Created 09-20-2016 12:35 PM
The question here is: are you running Apache NiFi 0.6 or HDF 1.2? I believe you are using Apache NiFi 0.6, which does not understand PLAINTEXTSASL as the security protocol.
The Kafka 0.8 in HDP 2.3.2 and the Kafka 0.9 in HDP 2.3.4 both use a custom Hortonworks Kafka client library. Kafka 0.8 in HDP 2.3.2 introduced support for Kerberos before it was supported in the community, and that support introduced the PLAINTEXTSASL security protocol. Later, when Apache Kafka 0.9 added Kerberos support, it used a different security protocol (SASL_PLAINTEXT). In order for HDF 1.2 to work with HDP 2.3.2, the GetKafka processor was modified from the Apache GetKafka to use that modified client library. Hortonworks again modified the client lib in HDP 2.3.4 for Kafka 0.9 so that it remained backwards compatible and still supported the PLAINTEXTSASL security protocol. So the bottom line here is that HDF 1.2 NiFi can talk Kerberos to both HDP 2.3.2 (Kafka 0.8) and HDP 2.3.4 (Kafka 0.9), but Apache NiFi cannot.
The ConsumeKafka and PublishKafka processors available in NiFi 0.7, NiFi 1.0, and HDF 2.0 do not use the Hortonworks custom Kafka client lib and can be used with Kafka 0.9, but not Kafka 0.8. You will need to use the SASL_PLAINTEXT security protocol with these new processors.
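As a rough sketch only (the property names are approximate and the broker host/port are placeholders, with 6667 being the usual HDP Kafka port), a PublishKafka configuration against a kerberized Kafka 0.9 broker would look something like:
Kafka Brokers: broker1.example.com:6667
Security Protocol: SASL_PLAINTEXT
Kerberos Service Name: kafka
Topic Name: test-topic
The KafkaClient entry in your zookeeper-jaas.conf should be able to stay as it is; only the protocol name on the processor side changes.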
Thanks,
Matt
Created on 09-20-2016 01:45 PM - edited 08-18-2019 04:48 AM
Can you try changing the value you have for "Message Delimiter" from "\n" to an actual new line in your PutKafka processor?
You can add a new line by holding the Shift key while hitting Enter.
The result will appear as below:
Thanks,
Matt
Created 09-21-2016 07:02 AM
Hello @mclark ,
thanks for explaining the versioning topic. Although it is a bit 'complex', if I interpret it correctly, my combination of HDP 2.3.4 and NiFi 0.6 (extracted from HDF 1.2) should work, meaning PutKafka should be able to write to kerberized Kafka 0.9 with PLAINTEXTSASL.
Unfortunately it does not, even after I modified the "Message Delimiter", either to an actual new line or to 'not set'.
Just to make sure: the NiFi welcome page shows "Hortonworks Data Flow .... powered by Apache NiFi", the 'about' dialog is the one I pasted in this thread above, and the whole NiFi directory from which I start it has been extracted from HDF-1.2.0.1-1.tar.gz. Therefore I am pretty sure I am running the HDF version of NiFi...
What can I check further?
Thanks in advance....