Created 09-15-2017 01:20 AM
Hello, I am getting the exception below every time a message is processed from Flume to Kafka over Kerberos. The error appears in the Flume log; the log excerpt and the Flume configuration we are using are both pasted below. Can you please suggest what the reason for this error might be?
2017-09-06 16:30:54,654 INFO kafka.producer.SyncProducer: Connected to :9092 for producing
2017-09-06 16:30:54,654 INFO kafka.producer.SyncProducer: Disconnecting from :9092
2017-09-06 16:30:54,654 WARN kafka.client.ClientUtils$: Fetching topic metadata with correlation id 156180 for topics [Set(topic1)] from broker [id:2,host:port:9092] failed
java.io.EOFException: Received -1 when reading from channel, socket has likely been closed.
        at kafka.utils.Utils$.read(Utils.scala:376)
        at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
        at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
        at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
        at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
        at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:74)
        at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:71)
        at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
        at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
        at kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartitionInfo.scala:49)
        at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartitionListForTopic(DefaultEventHandler.scala:186)
        at kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:150)
        at kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:149)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:149)
        at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:95)
        at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72)

Flume Conf :

#Source
agent.sources=source
agent.sources.source.type = spooldir
agent.sources.source.spoolDir=/TestPoolDir
agent.sources.source.fileHeader = false
agent.sources.source.channels=channel

#Channel
agent.channels=channel
agent.channels.channel.groupId=flume
agent.channels.channel.pareAsFlumeEvent=false
agent.channels.channel.type=org.apache.flume.channel.kafka.KafkaChannel
agent.channels.channel.brokerList=xxxx:9092,xxxx:9092,xxxx:9092
agent.channels.channel.topic=topic1
agent.channels.channel.zookeeperConnect=yyyy:2181,yyyy:2181,yyyy:2181
agent.channels.channel.readSmallestOffset=false
agent.channels.channel.kafka.consumer.timeout.ms=1000
agent.channels.channel.kafka.parseAsFlumeEvent=true
agent.channels.channel.kafka.consumer.sasl.mechanism = GSSAPI
agent.channels.channel.kafka.producer.security.protocol = SASL_PLAINTEXT
agent.channels.channel.kafka.producer.sasl.kerberos.service.name = kafka
agent.channels.channel.generateKeytabFor = $KERBEROS_PRINCIPAL

#Sink
agent.sinks=sink
agent.sinks.sink.hdfs.fileType=DataStream
agent.sinks.sink.hdfs.useLocalTimeStamp=true
agent.sinks.sink.hdfs.rollSize=1342
agent.sinks.sink.hdfs.roundValue=1
agent.sinks.sink.hdfs.rollTimerPoolSize=1
agent.sinks.sink.hdfs.path=hdfs://:8020/user/admin
agent.sinks.sink.hdfs.maxOpenFiles=5000
agent.sinks.sink.hdfs.batchSize=100
agent.sinks.sink.hdfs.threadsPoolSize=10
agent.sinks.sink.hdfs.round=false
agent.sinks.sink.hdfs.rollInterval=0
agent.sinks.sink.hdfs.filePrefix=Location
agent.sinks.sink.type=hdfs
#agent.sinks.sink.hdfs.path = /user/testflume
agent.sinks.sink.hdfs.idleTimeout=0
#agent.sinks.sink.hdfs.fileSuffix=.avro
agent.sinks.sink.hdfs.fileSuffix=.txt
agent.sinks.sink.hdfs.roundUnit=second
agent.sinks.sink.hdfs.inUseSuffix=.tmp
agent.sinks.sink.hdfs.retryInterval=180
agent.sinks.sink.serializer=TEXT
agent.sinks.sink.channel=channel
agent.sinks.sink.hdfs.callTimeout=10000
agent.sinks.sink.hdfs.closeTries=0
agent.sinks.sink.hdfs.rollCount=0
agent.sinks.sink.hdfs.timeZone=Local Time
agent.sinks.sink.kafka.consumer.sasl.mechanism = GSSAPI
# Security Setup
agent.sinks.sink.kafka.producer.security.protocol = SASL_PLAINTEXT
agent.sinks.sink.kafka.producer.sasl.kerberos.service.name = kafka
agent.sinks.sink.generateKeytabFor = $KERBEROS_PRINCIPAL
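For reference, this is the kind of JAAS setup we understand a Kerberized Kafka client needs on the Flume host. It is only a minimal sketch; the keytab path, principal, and file locations below are placeholders, not our actual values:

# /etc/flume/conf/jaas.conf (placeholder path)
KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  storeKey=true
  keyTab="/etc/security/keytabs/flume.keytab"
  principal="flume/host.example.com@EXAMPLE.COM";
};

# flume-env.sh -- pass the JAAS file to the agent JVM
export JAVA_OPTS="$JAVA_OPTS -Djava.security.auth.login.config=/etc/flume/conf/jaas.conf"

Also, from the Flume documentation our understanding is that a Kafka channel needs the security protocol set on both sides, i.e. agent.channels.channel.kafka.consumer.security.protocol in addition to the kafka.producer.security.protocol we already have, but we are not sure whether that explains the metadata fetch failure above.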
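If it helps to rule out a listener mismatch (our reading is that the EOFException means the broker closed the socket during the metadata fetch, e.g. a plaintext handshake arriving on a SASL port), a standalone check with the console producer that ships with Kafka might look like the sketch below. Broker address, topic, and file paths are placeholders:

# client-sasl.properties (placeholder file)
security.protocol=SASL_PLAINTEXT
sasl.kerberos.service.name=kafka

# reuse the same JAAS file, then send a test message to the SASL port
export KAFKA_OPTS="-Djava.security.auth.login.config=/etc/flume/conf/jaas.conf"
kafka-console-producer --broker-list xxxx:9092 --topic topic1 --producer.config client-sasl.properties

If this standalone producer also fails against the same port, the problem would seem to be on the broker/listener side rather than in the Flume configuration.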
Created 09-15-2017 03:34 AM
Can someone update on this? We have not been able to find a solution for a few days now.
Thanks
deepak