06-19-2018
06:09 AM
That was exactly the cause of the problem! After giving the user read access to the JAAS file, the agents are running fine. I will try with Flume's own principal next. Thanks for the great help!
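For anyone hitting the same thing, here is a minimal sketch of the check that would have caught it. It only reports which files the current user cannot read, so it has to be run as whichever account starts the Flume agent. The two paths are the ones from the question; including the keytab is an assumption on my part (the reply only mentions the JAAS file, but the JAAS entry points at a keytab that must be readable too). The function name `unreadable_paths` is made up for illustration.

```python
import os


def unreadable_paths(paths):
    """Return the subset of paths that the current user cannot read."""
    return [p for p in paths if not os.access(p, os.R_OK)]


if __name__ == "__main__":
    # Paths taken from the question; run this as the user that starts the agent.
    candidates = [
        "/usr/hdp/2.6.3.0-235/flume/conf/flume_kafka_jaas.conf",
        "/etc/security/keytabs/kafka.service.keytab",  # assumption: keytab must be readable as well
    ]
    for path in unreadable_paths(candidates):
        print("not readable by this user:", path)
```

A missing file is reported the same way as an unreadable one, which is usually what you want for a quick sanity check.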
06-15-2018
03:09 PM
Hi, I'm using HDP 2.6.3.0 with Kerberos enabled on the cluster and need to configure Flume to read from a Kafka source. Here's what I did:

1. Modify flume.config:
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.channels = c1
a1.sources.r1.batchSize = 5000
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.kafka.topics = testtopic
a1.sources.r1.kafka.bootstrap.servers = hdp-host-01-lntest.mxnavi.com:6667
a1.sources.r1.kafka.consumer.group.id = flumetest
a1.sources.r1.kafka.consumer.security.protocol = SASL_PLAINTEXT
a1.sources.r1.kafka.consumer.sasl.mechanism = GSSAPI
a1.sources.r1.kafka.consumer.sasl.kerberos.service.name = kafka
a1.sinks.k1.channel = c1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.kerberosPrincipal = flume/_HOST@MXNAVI.COM
a1.sinks.k1.hdfs.kerberosKeytab = /etc/security/keytabs/flume.service.keytab
a1.sinks.k1.hdfs.path = /tmp/flume/%y-%m-%d/%H%M/%S
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

2. Create a JAAS file in /usr/hdp/2.6.3.0-235/flume/conf/:
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
serviceName="kafka"
keyTab="/etc/security/keytabs/kafka.service.keytab"
principal="kafka/hdp-host-01-lntest.mxnavi.com@MXNAVI.COM";
};

3. Modify the flume-env template:
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# If this file is placed at FLUME_CONF_DIR/flume-env.sh, it will be sourced
# during Flume startup.
# Enviroment variables can be set here.
export JAVA_HOME={{java_home}}
# Give Flume more memory and pre-allocate, enable remote monitoring via JMX
# export JAVA_OPTS="-Xms100m -Xmx2000m -Dcom.sun.management.jmxremote"
export JAVA_OPTS="-Xms100m -Xmx2000m -Dcom.sun.management.jmxremote -Dflume.monitoring.type=http -Dflume.monitoring.port=34545 -Djava.security.auth.login.config=/usr/hdp/2.6.3.0-235/flume/conf/flume_kafka_jaas.conf"
# Note that the Flume conf directory is always included in the classpath.
# Add flume sink to classpath
if [ -e "/usr/lib/flume/lib/ambari-metrics-flume-sink.jar" ]; then
export FLUME_CLASSPATH=$FLUME_CLASSPATH:/usr/lib/flume/lib/ambari-metrics-flume-sink.jar
fi
export HIVE_HOME={{flume_hive_home}}
export HCAT_HOME={{flume_hcat_home}}

4. Add Flume to Kerberos by following this guide, to get the HDFS sink working.

Then I got errors in the agent log, which indicate that Flume failed to connect to Kafka:
ERROR [PollableSourceRunner-KafkaSource-r1] (org.apache.flume.source.PollableSourceRunner$PollingRunner.run:154) - Unhandled exception, logging and sleeping for 5000ms
org.apache.flume.FlumeException: Source had error configuring or starting
at org.apache.flume.source.AbstractPollableSource.process(AbstractPollableSource.java:52)
at org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:137)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:703)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:584)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:566)
at org.apache.flume.source.kafka.KafkaSource.doStart(KafkaSource.java:503)
at org.apache.flume.source.BasicSourceSemantics.start(BasicSourceSemantics.java:83)
at org.apache.flume.source.PollableSourceRunner.start(PollableSourceRunner.java:72)
at org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:251)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
... 1 more
Caused by: org.apache.kafka.common.KafkaException: javax.security.auth.login.LoginException: Could not login: the client is being asked for a password, but the Kafka client code does not currently support obtaining a password from the user. not available to garner authentication information from the user
at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:86)
at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:71)
at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:85)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:644)
... 13 more
Caused by: javax.security.auth.login.LoginException: Could not login: the client is being asked for a password, but the Kafka client code does not currently support obtaining a password from the user. not available to garner authentication information from the user
at com.sun.security.auth.module.Krb5LoginModule.promptForPass(Krb5LoginModule.java:940)
at com.sun.security.auth.module.Krb5LoginModule.attemptAuthentication(Krb5LoginModule.java:760)
at com.sun.security.auth.module.Krb5LoginModule.login(Krb5LoginModule.java:617)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at javax.security.auth.login.LoginContext.invoke(LoginContext.java:755)
at javax.security.auth.login.LoginContext.access$000(LoginContext.java:195)
at javax.security.auth.login.LoginContext$4.run(LoginContext.java:682)
at javax.security.auth.login.LoginContext$4.run(LoginContext.java:680)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.login.LoginContext.invokePriv(LoginContext.java:680)
at javax.security.auth.login.LoginContext.login(LoginContext.java:587)
at org.apache.kafka.common.security.authenticator.AbstractLogin.login(AbstractLogin.java:69)
at org.apache.kafka.common.security.kerberos.KerberosLogin.login(KerberosLogin.java:110)
at org.apache.kafka.common.security.authenticator.LoginManager.<init>(LoginManager.java:46)
at org.apache.kafka.common.security.authenticator.LoginManager.acquireLoginManager(LoginManager.java:68)
at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:78)
... 16 more
I'm not sure whether this is because this version (Flume 1.5.2) doesn't support a Kerberized Kafka source: I went through the user guide and found nothing about it. However, when I followed this guide to add a Kafka channel to Flume (which 1.5.2 supports), I got the same error. I have no clue what to try next to make it work. Please kindly advise, thanks in advance.
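The trace ends in Krb5LoginModule.promptForPass, so the login module fell back to asking for a password instead of using the keytab. One plausible reading is that it could not use what the KafkaClient entry points at. As a rough sketch for checking that, the snippet below pulls the options out of a JAAS entry so the keyTab and principal values can be inspected. It is a simple regex reader written for this post, not a real JAAS parser, and `parse_jaas_section` is a made-up name.

```python
import re

# JAAS entry copied from step 2 of the question.
JAAS_TEXT = """
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
serviceName="kafka"
keyTab="/etc/security/keytabs/kafka.service.keytab"
principal="kafka/hdp-host-01-lntest.mxnavi.com@MXNAVI.COM";
};
"""


def parse_jaas_section(text, section="KafkaClient"):
    """Return the key=value options of one JAAS section as a dict of strings."""
    match = re.search(r"\b" + re.escape(section) + r"\s*\{(.*?)\}\s*;", text, re.DOTALL)
    if match is None:
        return {}
    options = {}
    # Values are either double-quoted or a bare token such as true/false.
    for key, quoted, bare in re.findall(r'(\w+)\s*=\s*(?:"([^"]*)"|([^\s;]+))', match.group(1)):
        options[key] = quoted if quoted else bare
    return options


if __name__ == "__main__":
    opts = parse_jaas_section(JAAS_TEXT)
    print("keyTab:", opts.get("keyTab"))
    print("principal:", opts.get("principal"))
```

With the keyTab path in hand, it is easy to confirm as the agent's user that the file exists and is readable, and that the principal listed is one the keytab actually contains.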
Labels: Apache Flume, Apache Kafka