Support Questions

Find answers, ask questions, and share your expertise

How to configure flume with kerberized kafka source?

avatar
New Contributor

Hi, I'm using HDP 2.6.3.0 with kerberos enabled on cluster, need to configure flume to read from kafka source, here's what i did:

1.Modify flume.config:

a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.channels = c1
a1.sources.r1.batchSize = 5000
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.kafka.topics = testtopic
a1.sources.r1.kafka.bootstrap.servers = hdp-host-01-lntest.mxnavi.com:6667
a1.sources.r1.kafka.consumer.group.id = flumetest
a1.sources.r1.kafka.consumer.security.protocol = SASL_PLAINTEXT
a1.sources.r1.kafka.consumer.sasl.mechanism = GSSAPI
a1.sources.r1.kafka.consumer.sasl.kerberos.service.name = kafka

a1.sinks.k1.channel = c1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.kerberosPrincipal = flume/_HOST@MXNAVI.COM
a1.sinks.k1.hdfs.kerberosKeytab = /etc/security/keytabs/flume.service.keytab
a1.sinks.k1.hdfs.path = /tmp/flume/%y-%m-%d/%H%M/%S
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.useLocalTimeStamp = true

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

2.Create a jaas file in /usr/hdp/2.6.3.0-235/flume/conf/:

KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    storeKey=true
    serviceName="kafka"
    keyTab="/etc/security/keytabs/kafka.service.keytab"
    principal="kafka/hdp-host-01-lntest.mxnavi.com@MXNAVI.COM";
};

3.Modify the flume-env template:

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


# If this file is placed at FLUME_CONF_DIR/flume-env.sh, it will be sourced
# during Flume startup.


# Enviroment variables can be set here.


export JAVA_HOME={{java_home}}


# Give Flume more memory and pre-allocate, enable remote monitoring via JMX
# export JAVA_OPTS="-Xms100m -Xmx2000m -Dcom.sun.management.jmxremote"


export JAVA_OPTS="-Xms100m -Xmx2000m -Dcom.sun.management.jmxremote -Dflume.monitoring.type=http -Dflume.monitoring.port=34545 -Djava.security.auth.login.config=/usr/hdp/2.6.3.0-235/flume/conf/flume_kafka_jaas.conf"


# Note that the Flume conf directory is always included in the classpath.
# Add flume sink to classpath
if [ -e "/usr/lib/flume/lib/ambari-metrics-flume-sink.jar" ]; then
  export FLUME_CLASSPATH=$FLUME_CLASSPATH:/usr/lib/flume/lib/ambari-metrics-flume-sink.jar
fi


export HIVE_HOME={{flume_hive_home}}
export HCAT_HOME={{flume_hcat_home}}

4.Add flume to kerberos by following this guid, to get hdfs sink.

Then I got errors in the agent log, witch indicates flume failed to connect to kafka:

ERROR [PollableSourceRunner-KafkaSource-r1] (org.apache.flume.source.PollableSourceRunner$PollingRunner.run:154)  - Unhandled exception, logging and sleeping for 5000ms
org.apache.flume.FlumeException: Source had error configuring or starting
        at org.apache.flume.source.AbstractPollableSource.process(AbstractPollableSource.java:52)
        at org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:137)
        at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:703)
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:584)
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:566)
        at org.apache.flume.source.kafka.KafkaSource.doStart(KafkaSource.java:503)
        at org.apache.flume.source.BasicSourceSemantics.start(BasicSourceSemantics.java:83)
        at org.apache.flume.source.PollableSourceRunner.start(PollableSourceRunner.java:72)
        at org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:251)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        ... 1 more
Caused by: org.apache.kafka.common.KafkaException: javax.security.auth.login.LoginException: Could not login: the client is being asked for a password, but the Kafka client code does not currently support obtaining a password from the user. not available to garner  authentication information from the user
        at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:86)
        at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:71)
        at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:85)
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:644)
        ... 13 more
Caused by: javax.security.auth.login.LoginException: Could not login: the client is being asked for a password, but the Kafka client code does not currently support obtaining a password from the user. not available to garner  authentication information from the user
        at com.sun.security.auth.module.Krb5LoginModule.promptForPass(Krb5LoginModule.java:940)
        at com.sun.security.auth.module.Krb5LoginModule.attemptAuthentication(Krb5LoginModule.java:760)
        at com.sun.security.auth.module.Krb5LoginModule.login(Krb5LoginModule.java:617)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at javax.security.auth.login.LoginContext.invoke(LoginContext.java:755)
        at javax.security.auth.login.LoginContext.access$000(LoginContext.java:195)
        at javax.security.auth.login.LoginContext$4.run(LoginContext.java:682)
        at javax.security.auth.login.LoginContext$4.run(LoginContext.java:680)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.login.LoginContext.invokePriv(LoginContext.java:680)
        at javax.security.auth.login.LoginContext.login(LoginContext.java:587)
        at org.apache.kafka.common.security.authenticator.AbstractLogin.login(AbstractLogin.java:69)
        at org.apache.kafka.common.security.kerberos.KerberosLogin.login(KerberosLogin.java:110)
        at org.apache.kafka.common.security.authenticator.LoginManager.<init>(LoginManager.java:46)
        at org.apache.kafka.common.security.authenticator.LoginManager.acquireLoginManager(LoginManager.java:68)
        at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:78)
        ... 16 more

I'm not sure if it's because the version ( flume 1.5.2 ) doesn't support for kererized kafka source as I went through the user guide but found nothing for it, however when I followed this guid to add kafka channel to flume ( which 1.5.2 supports ), I got the same error.

Got no clue for the next step to make it work, please kindly advise, thanks in advance.



1 ACCEPTED SOLUTION

avatar
Expert Contributor

@L Ning

Hi, have you given read permission to other user on Kafka service keytab? Because in the Jaas file you are using kafka service keytab:

keyTab="/etc/security/keytabs/kafka.service.keytab"

The recommended approach would be to use user keytab and principal in your Jaas file.

Thank you!

View solution in original post

3 REPLIES 3

avatar
Expert Contributor

@L Ning

Hi, have you given read permission to other user on Kafka service keytab? Because in the Jaas file you are using kafka service keytab:

keyTab="/etc/security/keytabs/kafka.service.keytab"

The recommended approach would be to use user keytab and principal in your Jaas file.

Thank you!

avatar
New Contributor

That's the very cause of the problem! After enabling the user to read the Jaas file now the agents are running fine, will try with flume's own principal, thanks for the great help!

avatar
Expert Contributor

@L Nin I am glad it worked!

Thanks!