Member since: 09-07-2017
Posts: 13
Kudos Received: 0
Solutions: 0
09-19-2017
02:59 AM
I want to know why the current version does not support this, and what change has been made to make it work?
09-19-2017
02:59 AM
Thanks PD for the confirmation, appreciate that. Can you please let me know, technically, what change is made on the Flume or Kafka side to make it work over Kerberos on versions > 5.7?
09-18-2017
04:35 AM
Cloudera Team, can I get confirmation on this product limitation please? Thanks, Deepak
09-17-2017
09:17 PM
PD / Cloudera Team - can you please provide your inputs? If it is a product limitation, we need to take the further steps accordingly. Appreciate your support; this is urgent please. Thanks, Deepak
09-17-2017
08:28 PM
Thanks PD. I can provide you the required details, but before that I just want to check the product compatibility of Flume with Kafka over Kerberos on CDH versions lower than 5.7.

The link below shows the product limitations of Flume with Kafka over secured transport: https://www.cloudera.com/documentation/kafka/latest/PDF/cloudera-kafka.pdf

On page 20, the following limitation is mentioned: "Flume shipped with CDH 5.7 and lower can only send data to Kafka 2.0 and higher via unsecured transport. Security additions to Kafka 2.0 are not supported by Flume in CDH 5.7 (or lower versions)."

We are using:
CDH 5.5.2
Flume version: 1.6.0-cdh5.5.2
Kafka: 2.0.1-1.2.0.1.p.0.5

So is it a product limitation that Flume on CDH 5.5.2 will not work with Kafka over Kerberos? Please confirm.

Thanks, Deepak
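For my own clarity on what "unsecured transport" means here, this is roughly how I read the two modes in the Flume Kafka channel configuration (a sketch only, not our real config; broker hostnames are placeholders):

# Unsecured transport - per the documentation, the only mode Flume in CDH 5.7 and lower supports
agent.channels.channel.type = org.apache.flume.channel.kafka.KafkaChannel
agent.channels.channel.brokerList = broker1:9092,broker2:9092,broker3:9092
agent.channels.channel.topic = topic1
# no security.protocol / sasl.* properties, so the client talks plain PLAINTEXT

# Kerberos (secured) transport - the properties we are trying to use, which per the
# documentation only work with the Flume shipped in CDH versions higher than 5.7
agent.channels.channel.kafka.producer.security.protocol = SASL_PLAINTEXT
agent.channels.channel.kafka.producer.sasl.kerberos.service.name = kafka
agent.channels.channel.kafka.consumer.sasl.mechanism = GSSAPI

Please correct me if this reading of the limitation is wrong.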
09-14-2017
11:51 PM
Hi, can you please help by giving an update on the error below? Thanks, Deepak
09-08-2017
06:35 PM
Hi PD, thanks for your response. Unfortunately I have also tried the brokers' IPs in the advertised.ports property along with the port details, but I get the exception below every time I send a message from Flume to Kafka over Kerberos. The error appears in the Flume log; the flume.conf used is also below. Please suggest what the reason for this error is.

2017-09-06 16:30:54,654 INFO kafka.producer.SyncProducer: Connected to <xxxxxx>:9092 for producing
2017-09-06 16:30:54,654 INFO kafka.producer.SyncProducer: Disconnecting from <xxxxxx>:9092
2017-09-06 16:30:54,654 WARN kafka.client.ClientUtils$: Fetching topic metadata with correlation id 156180 for topics [Set(topic1)] from broker [id:2,host:<xxxxxx>,port:9092] failed
java.io.EOFException: Received -1 when reading from channel, socket has likely been closed.
        at kafka.utils.Utils$.read(Utils.scala:376)
        at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
        at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
        at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
        at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
        at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:74)
        at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:71)
        at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
        at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
        at kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartitionInfo.scala:49)
        at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartitionListForTopic(DefaultEventHandler.scala:186)
        at kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:150)
        at kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:149)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:149)
        at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:95)
        at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72)

Flume Conf:

#Source
agent.sources=source
agent.sources.source.type=spooldir
agent.sources.source.spoolDir=/TestPoolDir
agent.sources.source.fileHeader=false
agent.sources.source.channels=channel

#Channel
agent.channels=channel
agent.channels.channel.groupId=flume
agent.channels.channel.pareAsFlumeEvent=false
agent.channels.channel.type=org.apache.flume.channel.kafka.KafkaChannel
agent.channels.channel.brokerList=xxxx:9092,xxxx:9092,xxxx:9092
agent.channels.channel.topic=topic1
agent.channels.channel.zookeeperConnect=yyyy:2181,yyyy:2181,yyyy:2181
agent.channels.channel.readSmallestOffset=false
agent.channels.channel.kafka.consumer.timeout.ms=1000
agent.channels.channel.kafka.parseAsFlumeEvent=true
agent.channels.channel.kafka.consumer.sasl.mechanism = GSSAPI
agent.channels.channel.kafka.producer.security.protocol = SASL_PLAINTEXT
agent.channels.channel.kafka.producer.sasl.kerberos.service.name = kafka
agent.channels.channel.generateKeytabFor = $KERBEROS_PRINCIPAL

#Sink
agent.sinks=sink
agent.sinks.sink.type=hdfs
agent.sinks.sink.channel=channel
agent.sinks.sink.serializer=TEXT
agent.sinks.sink.hdfs.fileType=DataStream
agent.sinks.sink.hdfs.useLocalTimeStamp=true
agent.sinks.sink.hdfs.rollSize=1342
agent.sinks.sink.hdfs.roundValue=1
agent.sinks.sink.hdfs.rollTimerPoolSize=1
agent.sinks.sink.hdfs.path=hdfs://<Namenode IP>:8020/user/admin
#agent.sinks.sink.hdfs.path = /user/testflume
agent.sinks.sink.hdfs.maxOpenFiles=5000
agent.sinks.sink.hdfs.batchSize=100
agent.sinks.sink.hdfs.threadsPoolSize=10
agent.sinks.sink.hdfs.round=false
agent.sinks.sink.hdfs.rollInterval=0
agent.sinks.sink.hdfs.filePrefix=Location
agent.sinks.sink.hdfs.idleTimeout=0
#agent.sinks.sink.hdfs.fileSuffix=.avro
agent.sinks.sink.hdfs.fileSuffix=.txt
agent.sinks.sink.hdfs.roundUnit=second
agent.sinks.sink.hdfs.inUseSuffix=.tmp
agent.sinks.sink.hdfs.retryInterval=180
agent.sinks.sink.hdfs.callTimeout=10000
agent.sinks.sink.hdfs.closeTries=0
agent.sinks.sink.hdfs.rollCount=0
agent.sinks.sink.hdfs.timeZone=Local Time

# Security Setup
agent.sinks.sink.kafka.consumer.sasl.mechanism = GSSAPI
agent.sinks.sink.kafka.producer.security.protocol = SASL_PLAINTEXT
agent.sinks.sink.kafka.producer.sasl.kerberos.service.name = kafka
agent.sinks.sink.generateKeytabFor = $KERBEROS_PRINCIPAL
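For completeness, this is the kind of JAAS setup I understand the Flume agent JVM needs for GSSAPI (a sketch only; the keytab path and principal below are placeholders, not our actual values):

# jaas.conf loaded by the Flume agent JVM
KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  storeKey=true
  keyTab="/etc/flume-ng/conf/flume.keytab"
  principal="flume/host.example.com@EXAMPLE.COM";
};

# pointed to from the agent's Java options, e.g. in flume-env.sh
JAVA_OPTS="$JAVA_OPTS -Djava.security.auth.login.config=/etc/flume-ng/conf/jaas.conf"

Please let me know if something beyond this is required on the Flume side.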
09-07-2017
02:33 AM
Hi Harsh, what combination needs to be used in the case of Kerberos? I am specifying SASL_PLAINTEXT://0.0.0.0:9092 and have also tried 9093, but the Kafka brokers are not coming up after that.
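For reference, this is the broker-side combination I understand is needed for Kerberos (a sketch only; the hostname, port, and path are placeholders, not our actual values):

# server.properties on each broker
listeners=SASL_PLAINTEXT://0.0.0.0:9093
advertised.listeners=SASL_PLAINTEXT://broker1.example.com:9093
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.kerberos.service.name=kafka

# the broker JVM also needs a JAAS file with a KafkaServer section, passed via
# -Djava.security.auth.login.config=/etc/kafka/conf/jaas.conf

Is this the combination you meant, or is something else required for the brokers to start?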