
how to configure Flume Kafka Sink on HDP 2.5.3 with Kerberos enabled.


First of all, there is a discrepancy between the docs and real life: the property names are not the same.

I have the following configuration, built according to the docs:

# Describe the sink
a1.sinks.sink1.channel = channel1
a1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
#a1.sinks.sink1.kafka.flumeBatchSize = 20
#a1.sinks.sink1.kafka.producer.acks = 1
#a1.sinks.sink1.kafka.producer.linger.ms = 1
#a1.sinks.sink1.kafka.producer.compression.type = snappy
a1.sinks.sink1.kafka.bootstrap.servers = ip-172-30-4-132.ec2.internal:6667
a1.sinks.sink1.kafka.topic = test
a1.sinks.sink1.kafka.producer.security.protocol = SASL_PLAINTEXT
a1.sinks.sink1.kafka.producer.sasl.mechanism = GSSAPI
#a1.sinks.sink1.kafka.producer.sasl.kerberos.service.name = kafka
#a1.sinks.sink1.brokerList = ip-172-30-4-132.ec2.internal:6667
#a1.sinks.sink1.topic = test
#a1.sinks.sink1.producer.security.protocol = SASL_PLAINTEXT
#a1.sinks.sink1.producer.sasl.mechanism = GSSAPI
#a1.sinks.sink1.producer.sasl.kerberos.service.name = kafka

When starting the agent, I got the following:

17/10/06 19:12:26 INFO sink.DefaultSinkFactory: Creating instance of sink: sink1, type: org.apache.flume.sink.kafka.KafkaSink
17/10/06 19:12:26 WARN kafka.KafkaSink: The Property 'topic' is not set. Using the default topic name: default-flume-topic
17/10/06 19:12:26 INFO kafka.KafkaSinkUtil: context={ parameters:{kafka.producer.security.protocol=SASL_PLAINTEXT, kafka.bootstrap.servers=ip-172-30-4-132.ec2.internal:6667, kafka.producer.sasl.mechanism=GSSAPI, channel=channel1, kafka.topic=test, type=org.apache.flume.sink.kafka.KafkaSink} }
17/10/06 19:12:26 ERROR node.AbstractConfigurationProvider: Sink sink1 has been removed due to an error during configuration
org.apache.flume.conf.ConfigurationException: brokerList must contain at least one Kafka broker
        at org.apache.flume.sink.kafka.KafkaSinkUtil.addDocumentedKafkaProps(KafkaSinkUtil.java:55)
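For context, the Kafka sink property names changed across Flume releases, and if I read the error right, the sink shipped with this HDP build still expects the old names (the `KafkaSinkUtil` class in the stack trace belongs to the pre-1.7 code). A side-by-side sketch of the two naming schemes; the exact version boundary is my assumption:

```
# Old-style names (Flume 1.6 and earlier; what this build apparently expects):
a1.sinks.sink1.brokerList = ip-172-30-4-132.ec2.internal:6667
a1.sinks.sink1.topic = test

# New-style names (Flume 1.7+ docs; kafka.producer.* is passed through to the producer):
a1.sinks.sink1.kafka.bootstrap.servers = ip-172-30-4-132.ec2.internal:6667
a1.sinks.sink1.kafka.topic = test
```

So the "topic is not set" warning and the `brokerList` error both point the same way: the new-style `kafka.*` keys are simply not recognized by this sink version.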

I then changed the config to the older form found in the old Flume docs:

# Describe the sink
a1.sinks.sink1.channel = channel1
a1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
#a1.sinks.sink1.kafka.flumeBatchSize = 20
#a1.sinks.sink1.kafka.producer.acks = 1
#a1.sinks.sink1.kafka.producer.linger.ms = 1
#a1.sinks.sink1.kafka.producer.compression.type = snappy
#a1.sinks.sink1.kafka.bootstrap.servers = ip-172-30-4-132.ec2.internal:6667
#a1.sinks.sink1.kafka.topic = test
#a1.sinks.sink1.kafka.producer.security.protocol = SASL_PLAINTEXT
#a1.sinks.sink1.kafka.producer.sasl.mechanism = GSSAPI
#a1.sinks.sink1.kafka.producer.sasl.kerberos.service.name = kafka
a1.sinks.sink1.brokerList = ip-172-30-4-132.ec2.internal:6667
a1.sinks.sink1.topic = test
a1.sinks.sink1.producer.security.protocol = SASL_PLAINTEXT
a1.sinks.sink1.producer.sasl.mechanism = GSSAPI
#a1.sinks.sink1.producer.sasl.kerberos.service.name = kafka

After the change I got this:

17/10/06 19:13:45 INFO utils.VerifiableProperties: Verifying properties
17/10/06 19:13:45 INFO utils.VerifiableProperties: Property key.serializer.class is overridden to kafka.serializer.StringEncoder
17/10/06 19:13:45 INFO utils.VerifiableProperties: Property metadata.broker.list is overridden to ip-172-30-4-132.ec2.internal:6667
17/10/06 19:13:45 INFO utils.VerifiableProperties: Property request.required.acks is overridden to 1
17/10/06 19:13:45 INFO utils.VerifiableProperties: Property serializer.class is overridden to kafka.serializer.DefaultEncoder
17/10/06 19:13:45 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SINK, name: sink1: Successfully registered new MBean.
17/10/06 19:13:45 INFO instrumentation.MonitoredCounterGroup: Component type: SINK, name: sink1 started
17/10/06 19:13:45 INFO client.ClientUtils$: Fetching metadata from broker BrokerEndPoint(0,ip-172-30-4-132.ec2.internal,6667) with correlation id 0 for 1 topic(s) Set(test)
17/10/06 19:13:45 INFO producer.SyncProducer: Connected to ip-172-30-4-132.ec2.internal:6667 for producing
17/10/06 19:13:45 INFO producer.SyncProducer: Disconnecting from ip-172-30-4-132.ec2.internal:6667
17/10/06 19:13:45 WARN client.ClientUtils$: Fetching topic metadata with correlation id 0 for topics [Set(test)] from broker [BrokerEndPoint(0,ip-172-30-4-132.ec2.internal,6667)] failed
java.io.EOFException
        at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:83)
        at kafka.network.BlockingChannel.readCompletely(BlockingChannel.scala:140)

It does not even get to Kerberos at all.
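Once the sink does reach Kerberos, the Kafka client will also need a JAAS configuration passed to the agent's JVM. A minimal sketch, assuming a Flume service keytab exists; the file path, keytab path, principal, and EXAMPLE.COM realm below are all placeholders, not taken from this cluster:

```
// /etc/flume/conf/flume_jaas.conf (hypothetical path)
KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  storeKey=true
  keyTab="/etc/security/keytabs/flume.service.keytab"
  serviceName="kafka"
  principal="flume/ip-172-30-4-132.ec2.internal@EXAMPLE.COM";
};
```

The file would then be wired in via the agent's JVM options, e.g. in flume-env.sh: `export JAVA_OPTS="$JAVA_OPTS -Djava.security.auth.login.config=/etc/flume/conf/flume_jaas.conf"`. But that only matters after the broker connection itself works; the EOFException above suggests the client never completes the SASL handshake in the first place.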

I need to build the following pipeline:

source -> flume -> kafka -> flume -> hbase

The last part (flume -> hbase) works, but I cannot define the Kafka sink correctly.

What did I do wrong?

Thank you for any suggestions!
