Created 06-15-2018 03:09 PM
Hi, I'm using HDP 2.6.3.0 with Kerberos enabled on the cluster, and I need to configure Flume to read from a Kafka source. Here's what I did:
1. Modify flume.config:
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.channels = c1
a1.sources.r1.batchSize = 5000
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.kafka.topics = testtopic
a1.sources.r1.kafka.bootstrap.servers = hdp-host-01-lntest.mxnavi.com:6667
a1.sources.r1.kafka.consumer.group.id = flumetest
a1.sources.r1.kafka.consumer.security.protocol = SASL_PLAINTEXT
a1.sources.r1.kafka.consumer.sasl.mechanism = GSSAPI
a1.sources.r1.kafka.consumer.sasl.kerberos.service.name = kafka
a1.sinks.k1.channel = c1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.kerberosPrincipal = flume/_HOST@MXNAVI.COM
a1.sinks.k1.hdfs.kerberosKeytab = /etc/security/keytabs/flume.service.keytab
a1.sinks.k1.hdfs.path = /tmp/flume/%y-%m-%d/%H%M/%S
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
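The kafka.consumer.* keys are how the security settings reach Kafka: the Flume Kafka source (as documented for Flume 1.7 and later; we assume the HDP 2.6 build behaves the same) hands every property carrying the kafka.consumer. prefix to the KafkaConsumer with that prefix removed. A minimal sketch that lists what the consumer would receive, assuming bash and a config file with one "key = value" property per line; consumer_props is a hypothetical helper name:

```shell
#!/usr/bin/env bash
# Print the properties a Flume Kafka source passes to its KafkaConsumer:
# every "<agent>.sources.<src>.kafka.consumer.<key> = <value>" line,
# emitted as "<key>=<value>" with the prefix stripped.
consumer_props() {
  local conf_file=$1 agent=$2 src=$3
  local prefix="${agent}.sources.${src}.kafka.consumer."
  # Escape the dots so sed matches them literally instead of "any character".
  local re_prefix
  re_prefix=$(printf '%s' "$prefix" | sed 's/\./\\./g')
  # Capture the key (no spaces or "=") and everything after the "=".
  sed -n "s/^[[:space:]]*${re_prefix}\([^[:space:]=]*\)[[:space:]]*=[[:space:]]*\(.*\)\$/\1=\2/p" "$conf_file"
}
```

Run against the agent config above (consumer_props /path/to/flume.conf a1 r1, with the path as a placeholder), it should print group.id=flumetest, security.protocol=SASL_PLAINTEXT, sasl.mechanism=GSSAPI and sasl.kerberos.service.name=kafka, one per line.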
2. Create a JAAS file in /usr/hdp/2.6.3.0-235/flume/conf/:
KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  storeKey=true
  serviceName="kafka"
  keyTab="/etc/security/keytabs/kafka.service.keytab"
  principal="kafka/hdp-host-01-lntest.mxnavi.com@MXNAVI.COM";
};
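The KafkaClient section is where the Kafka client's Krb5LoginModule gets its keytab and principal, so those two values decide which identity the agent logs in as. A small sketch for reading them back out of the file, assuming bash, GNU or BSD grep (for -o), and that each option is written once as name="value"; jaas_value is a hypothetical helper name:

```shell
#!/usr/bin/env bash
# Print the value of a quoted JAAS option such as keyTab or principal.
# Works whether the KafkaClient section is on one line or several, because
# grep -o extracts just the name="value" token.
jaas_value() {
  local option=$1 jaas_file=$2
  grep -o "${option}=\"[^\"]*\"" "$jaas_file" |
    head -n 1 |
    sed 's/^[^"]*"//; s/"$//'   # strip 'name="' and the closing quote
}
```

For example, klist -kt "$(jaas_value keyTab flume_kafka_jaas.conf)" lists the entries stored in that keytab, which should include the value printed by jaas_value principal flume_kafka_jaas.conf.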
3. Modify the flume-env template:
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# If this file is placed at FLUME_CONF_DIR/flume-env.sh, it will be sourced
# during Flume startup.

# Enviroment variables can be set here.

export JAVA_HOME={{java_home}}

# Give Flume more memory and pre-allocate, enable remote monitoring via JMX
# export JAVA_OPTS="-Xms100m -Xmx2000m -Dcom.sun.management.jmxremote"
export JAVA_OPTS="-Xms100m -Xmx2000m -Dcom.sun.management.jmxremote -Dflume.monitoring.type=http -Dflume.monitoring.port=34545 -Djava.security.auth.login.config=/usr/hdp/2.6.3.0-235/flume/conf/flume_kafka_jaas.conf"

# Note that the Flume conf directory is always included in the classpath.
# Add flume sink to classpath
if [ -e "/usr/lib/flume/lib/ambari-metrics-flume-sink.jar" ]; then
  export FLUME_CLASSPATH=$FLUME_CLASSPATH:/usr/lib/flume/lib/ambari-metrics-flume-sink.jar
fi

export HIVE_HOME={{flume_hive_home}}
export HCAT_HOME={{flume_hcat_home}}
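The -Djava.security.auth.login.config option in JAVA_OPTS is what points the JVM at the JAAS file, so a typo in that path, or a file the agent's user cannot read, typically stops the Kafka client from loading its login configuration at all. A sketch that pulls the path out of a JAVA_OPTS string and checks it, assuming bash and whitespace-separated options with no quoted spaces; both function names are hypothetical:

```shell
#!/usr/bin/env bash
# Extract the JAAS path from a JAVA_OPTS string (whitespace-separated options).
jaas_path_from_opts() {
  local opt
  local -a opts
  read -r -a opts <<< "$1"   # split on whitespace without glob expansion
  for opt in "${opts[@]}"; do
    case $opt in
      -Djava.security.auth.login.config=*)
        printf '%s\n' "${opt#*=}"   # drop everything up to the first "="
        return 0
        ;;
    esac
  done
  return 1
}

# Report whether that JAAS file exists and is readable by the current user.
check_jaas_opt() {
  local path
  if ! path=$(jaas_path_from_opts "$1"); then
    echo "java.security.auth.login.config is not set" >&2
    return 1
  fi
  if [ ! -r "$path" ]; then
    echo "JAAS file missing or unreadable for $(id -un): $path" >&2
    return 1
  fi
  echo "JAAS file OK: $path"
}
```

Run it as the agent's user with the JAVA_OPTS value from the rendered flume-env.sh, for example check_jaas_opt "$JAVA_OPTS".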
4. Add Flume to Kerberos by following this guide, so that the HDFS sink works.
Then I got the following errors in the agent log, which indicate that Flume failed to connect to Kafka:
ERROR [PollableSourceRunner-KafkaSource-r1] (org.apache.flume.source.PollableSourceRunner$PollingRunner.run:154) - Unhandled exception, logging and sleeping for 5000ms
org.apache.flume.FlumeException: Source had error configuring or starting
    at org.apache.flume.source.AbstractPollableSource.process(AbstractPollableSource.java:52)
    at org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:137)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:703)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:584)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:566)
    at org.apache.flume.source.kafka.KafkaSource.doStart(KafkaSource.java:503)
    at org.apache.flume.source.BasicSourceSemantics.start(BasicSourceSemantics.java:83)
    at org.apache.flume.source.PollableSourceRunner.start(PollableSourceRunner.java:72)
    at org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:251)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    ... 1 more
Caused by: org.apache.kafka.common.KafkaException: javax.security.auth.login.LoginException: Could not login: the client is being asked for a password, but the Kafka client code does not currently support obtaining a password from the user. not available to garner authentication information from the user
    at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:86)
    at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:71)
    at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:85)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:644)
    ... 13 more
Caused by: javax.security.auth.login.LoginException: Could not login: the client is being asked for a password, but the Kafka client code does not currently support obtaining a password from the user. not available to garner authentication information from the user
    at com.sun.security.auth.module.Krb5LoginModule.promptForPass(Krb5LoginModule.java:940)
    at com.sun.security.auth.module.Krb5LoginModule.attemptAuthentication(Krb5LoginModule.java:760)
    at com.sun.security.auth.module.Krb5LoginModule.login(Krb5LoginModule.java:617)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at javax.security.auth.login.LoginContext.invoke(LoginContext.java:755)
    at javax.security.auth.login.LoginContext.access$000(LoginContext.java:195)
    at javax.security.auth.login.LoginContext$4.run(LoginContext.java:682)
    at javax.security.auth.login.LoginContext$4.run(LoginContext.java:680)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.login.LoginContext.invokePriv(LoginContext.java:680)
    at javax.security.auth.login.LoginContext.login(LoginContext.java:587)
    at org.apache.kafka.common.security.authenticator.AbstractLogin.login(AbstractLogin.java:69)
    at org.apache.kafka.common.security.kerberos.KerberosLogin.login(KerberosLogin.java:110)
    at org.apache.kafka.common.security.authenticator.LoginManager.<init>(LoginManager.java:46)
    at org.apache.kafka.common.security.authenticator.LoginManager.acquireLoginManager(LoginManager.java:68)
    at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:78)
    ... 16 more
I'm not sure whether this is because this version (Flume 1.5.2) doesn't support a Kerberized Kafka source; I went through the user guide but found nothing about it. However, when I followed this guide to add a Kafka channel to Flume (which 1.5.2 does support), I got the same error.
I have no clue what to try next to make it work. Please advise, and thanks in advance.
Created 06-15-2018 08:24 PM
Hi, have you given other users read permission on the Kafka service keytab? I ask because your JAAS file uses the Kafka service keytab:
keyTab="/etc/security/keytabs/kafka.service.keytab"
The recommended approach would be to use a user keytab and principal in your JAAS file.
Thank you!
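This fits the last Caused by in the question: with useKeyTab=true, Krb5LoginModule only falls back to asking for a password when it could not get a key for the principal from the keytab, for example because the file is unreadable by the user the agent runs as, or does not contain that principal. A minimal check, assuming bash and, optionally, the MIT Kerberos klist tool; check_keytab and its exit codes are a hypothetical convention of this sketch:

```shell
#!/usr/bin/env bash
# Check that a keytab exists and is readable by the current user and, when a
# principal is given and klist is installed, that the keytab lists it.
# Returns 0 if all checks pass, 1 if the file is missing or unreadable,
# 2 if the principal is not listed.
check_keytab() {
  local keytab=$1 principal=${2:-}
  if [ ! -e "$keytab" ]; then
    echo "missing: $keytab" >&2
    return 1
  fi
  if [ ! -r "$keytab" ]; then
    echo "not readable by $(id -un): $keytab" >&2
    return 1
  fi
  # klist -k lists the entries stored in a keytab; skipped if klist is absent.
  if [ -n "$principal" ] && command -v klist >/dev/null 2>&1; then
    if ! klist -k "$keytab" 2>/dev/null | grep -qF "$principal"; then
      echo "principal $principal not found in $keytab" >&2
      return 2
    fi
  fi
  echo "ok: $keytab"
}
```

Running it as root proves little, since root can read any file. To test with the agent's identity instead, assuming the agent runs as the flume user: sudo -u flume bash -c "$(declare -f check_keytab); check_keytab /etc/security/keytabs/kafka.service.keytab kafka/hdp-host-01-lntest.mxnavi.com@MXNAVI.COM"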
Created 06-19-2018 06:09 AM
That was exactly the cause of the problem! After granting the user read access as you suggested, the agents are now running fine. I'll also try with Flume's own principal. Thanks for the great help!
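For the follow-up of switching to Flume's own principal, which is the approach recommended in the answer, the KafkaClient section can point at the keytab the HDFS sink already uses. A sketch that writes such a file, assuming /etc/security/keytabs/flume.service.keytab holds a flume/<FQDN>@MXNAVI.COM entry (confirm with klist -kt). Krb5LoginModule does not, as far as we know, expand _HOST the way the HDFS sink's Hadoop security code does, so the host name is written out. write_flume_kafka_jaas is a hypothetical helper:

```shell
#!/usr/bin/env bash
# Write a KafkaClient JAAS section that logs in as the Flume service principal.
# Assumption: the keytab contains flume/<host>@MXNAVI.COM; host defaults to
# this machine's FQDN because _HOST is not expanded inside a JAAS file.
write_flume_kafka_jaas() {
  local out=$1 host=${2:-$(hostname -f)}
  cat > "$out" <<EOF
KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  storeKey=true
  serviceName="kafka"
  keyTab="/etc/security/keytabs/flume.service.keytab"
  principal="flume/${host}@MXNAVI.COM";
};
EOF
}
```

Usage: write_flume_kafka_jaas /usr/hdp/2.6.3.0-235/flume/conf/flume_kafka_jaas.conf, then restart the agent. If Kafka authorization (Ranger or ACLs) is enabled, the flume principal will presumably also need consume rights on testtopic and the flumetest consumer group.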
Created 06-19-2018 09:20 PM
@L Nin I am glad it worked!
Thanks!