Explorer
Posts: 10
Registered: 09-07-2017

Flume is not able to send events to Kafka over a Kerberized cluster

Hello,

I am getting the exception below every time I send messages from Flume to Kafka on a Kerberized cluster. The error from flume.log and the Flume configuration I am using are both below. Please suggest what could be the reason for this error.

 

 

2017-09-06 16:30:54,654 INFO kafka.producer.SyncProducer: Connected to :9092 for producing
2017-09-06 16:30:54,654 INFO kafka.producer.SyncProducer: Disconnecting from :9092
2017-09-06 16:30:54,654 WARN kafka.client.ClientUtils$: Fetching topic metadata with correlation id 156180 for topics [Set(topic1)] from broker [id:2,host:port:9092] failed
java.io.EOFException: Received -1 when reading from channel, socket has likely been closed.
    at kafka.utils.Utils$.read(Utils.scala:376)
    at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
    at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
    at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
    at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
    at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:74)
    at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:71)
    at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
    at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
    at kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartitionInfo.scala:49)
    at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartitionListForTopic(DefaultEventHandler.scala:186)
    at kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:150)
    at kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:149)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:149)
    at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:95)
    at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72)

Flume Conf:

#Source
agent.sources=source
agent.sources.source.type=spooldir
agent.sources.source.spoolDir=/TestPoolDir
agent.sources.source.fileHeader=false
agent.sources.source.channels=channel

#Channel
agent.channels=channel
agent.channels.channel.groupId=flume
agent.channels.channel.pareAsFlumeEvent=false
agent.channels.channel.type=org.apache.flume.channel.kafka.KafkaChannel
agent.channels.channel.brokerList=xxxx:9092,xxxx:9092,xxxx:9092
agent.channels.channel.topic=topic1
agent.channels.channel.zookeeperConnect=yyyy:2181,yyyy:2181,yyyy:2181
agent.channels.channel.readSmallestOffset=false
agent.channels.channel.kafka.consumer.timeout.ms=1000
agent.channels.channel.kafka.parseAsFlumeEvent=true
agent.channels.channel.kafka.consumer.sasl.mechanism = GSSAPI
agent.channels.channel.kafka.producer.security.protocol = SASL_PLAINTEXT
agent.channels.channel.kafka.producer.sasl.kerberos.service.name = kafka
agent.channels.channel.generateKeytabFor = $KERBEROS_PRINCIPAL

#Sink
agent.sinks=sink
agent.sinks.sink.hdfs.fileType=DataStream
agent.sinks.sink.hdfs.useLocalTimeStamp=true
agent.sinks.sink.hdfs.rollSize=1342
agent.sinks.sink.hdfs.roundValue=1
agent.sinks.sink.hdfs.rollTimerPoolSize=1
agent.sinks.sink.hdfs.path=hdfs://:8020/user/admin
agent.sinks.sink.hdfs.maxOpenFiles=5000
agent.sinks.sink.hdfs.batchSize=100
agent.sinks.sink.hdfs.threadsPoolSize=10
agent.sinks.sink.hdfs.round=false
agent.sinks.sink.hdfs.rollInterval=0
agent.sinks.sink.hdfs.filePrefix=Location
agent.sinks.sink.type=hdfs
#agent.sinks.sink.hdfs.path = /user/testflume
agent.sinks.sink.hdfs.idleTimeout=0
#agent.sinks.sink.hdfs.fileSuffix=.avro
agent.sinks.sink.hdfs.fileSuffix=.txt
agent.sinks.sink.hdfs.roundUnit=second
agent.sinks.sink.hdfs.inUseSuffix=.tmp
agent.sinks.sink.hdfs.retryInterval=180
agent.sinks.sink.serializer=TEXT
agent.sinks.sink.channel=channel
agent.sinks.sink.hdfs.callTimeout=10000
agent.sinks.sink.hdfs.closeTries=0
agent.sinks.sink.hdfs.rollCount=0
agent.sinks.sink.hdfs.timeZone=Local Time

# Security Setup
agent.sinks.sink.kafka.consumer.sasl.mechanism = GSSAPI
agent.sinks.sink.kafka.producer.security.protocol = SASL_PLAINTEXT
agent.sinks.sink.kafka.producer.sasl.kerberos.service.name = kafka
agent.sinks.sink.generateKeytabFor = $KERBEROS_PRINCIPAL
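
For reference, with SASL_PLAINTEXT/GSSAPI the Kafka client inside the Flume agent also needs Kerberos credentials via a JAAS configuration. When Cloudera Manager manages the agent, the generateKeytabFor lines above typically take care of this; a hand-written equivalent would be a JAAS file with a KafkaClient section along these lines (the keytab path, principal, and realm are placeholders, not values from this setup):

KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  storeKey=true
  keyTab="/path/to/flume.keytab"
  principal="flume/host.example.com@EXAMPLE.COM";
};

The agent JVM would then be started with -Djava.security.auth.login.config pointing at that file.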

Explorer
Posts: 10
Registered: 09-07-2017

Re: Flume is not able to send events to Kafka over a Kerberized cluster

Can someone give an update on this? We have not been able to find a solution for a few days now.

 

Thanks

deepak

Cloudera Employee
Posts: 198
Registered: 01-09-2014

Re: Flume is not able to send events to Kafka over a Kerberized cluster

This seems to indicate that your listeners are not set up correctly:
2017-09-06 16:30:54,654 INFO kafka.producer.SyncProducer: Connected to :9092 for producing

In the Kafka configuration, are you setting a safety valve for the listeners property? If you are using something like:
listeners=SASL_PLAINTEXT://0.0.0.0:9092

then you also need to set a per-broker advertised.listeners property with the hostname that clients will connect to, since they can't connect to 0.0.0.0.
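
For example (the hostname is a placeholder; each broker advertises its own resolvable name):

listeners=SASL_PLAINTEXT://0.0.0.0:9092
advertised.listeners=SASL_PLAINTEXT://broker1.example.com:9092

Without the advertised name, the metadata the broker hands back to clients can carry an empty or unusable host, which would be consistent with the empty host in the "Connected to :9092" line above.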

Do you need Kafka to listen on multiple addresses? If not, you don't need to specify the listeners safety valve at all.
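
One way to isolate the broker side from Flume is to try the console producer with the same security settings. A sketch (client.properties and the JAAS path are placeholder names; xxxx stands for one of the broker hosts in the config above):

# client.properties
security.protocol=SASL_PLAINTEXT
sasl.kerberos.service.name=kafka

# point the client at a JAAS file, then try to produce to the same topic
export KAFKA_OPTS="-Djava.security.auth.login.config=/path/to/jaas.conf"
kafka-console-producer --broker-list xxxx:9092 --topic topic1 --producer.config client.properties

If this fails with the same EOFException, the problem is on the broker/listener side rather than in the Flume agent configuration.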

-pd
Explorer
Posts: 16
Registered: 01-11-2017

Re: Flume is not able to send events to Kafka over a Kerberized cluster

Can you upload your Kafka and Flume configs?
Cloudera Employee
Posts: 198
Registered: 01-09-2014

Re: Flume is not able to send events to Kafka over a Kerberized cluster

Just to confirm, what version of CDH are you using?

-pd