Created on 02-25-2016 04:55 PM - edited 09-16-2022 03:05 AM
Setting the property
listeners=PLAINTEXT://host1:9092,PLAINTEXT://host2:9092,PLAINTEXT://host3:9092,PLAINTEXT://host4:9092,
Fails with exception
java.lang.IllegalArgumentException: requirement failed: Each listener must have a different port at scala.Predef$.require(Predef.scala:219) at kafka.server.KafkaConfig.validateUniquePortAndProtocol(KafkaConfig.scala:903) at kafka.server.KafkaConfig.getListeners(KafkaConfig.scala:911) at kafka.server.KafkaConfig.<init>(KafkaConfig.scala:864) at kafka.server.KafkaConfig$.fromProps(KafkaConfig.scala:698) at kafka.server.KafkaConfig$.fromProps(KafkaConfig.scala:695) at kafka.server.KafkaServerStartable$.fromProps(KafkaServerStartable.scala:28) at kafka.Kafka$.main(Kafka.scala:58) at com.cloudera.kafka.wrap.Kafka$.main(Kafka.scala:76) at com.cloudera.kafka.wrap.Kafka.main(Kafka.scala)
As long as each listener is on a different machine, why should " each listener must have a different port"
Created 02-25-2016 05:06 PM
Created on 02-25-2016 05:17 PM - edited 02-25-2016 05:27 PM
If I do that I run into consumer messages like this in a continuous loop
2016-02-25 17:16:25,437 INFO kafka.consumer.ConsumerFetcherManager: [ConsumerFetcherManager-1456449328092] Added fetcher for partitions ArrayBuffer() 2016-02-25 17:16:25,440 INFO kafka.utils.VerifiableProperties: Verifying properties 2016-02-25 17:16:25,440 INFO kafka.utils.VerifiableProperties: Property client.id is overridden to SYSTEM_COMMAND_group_VirusTotalSink_cdh1.dev.countertack.com 2016-02-25 17:16:25,440 INFO kafka.utils.VerifiableProperties: Property metadata.broker.list is overridden to 0.0.0.0:9092,0.0.0.0:9092,0.0.0.0:9092,0.0.0.0:9092 2016-02-25 17:16:25,440 INFO kafka.utils.VerifiableProperties: Property request.timeout.ms is overridden to 30000 2016-02-25 17:16:25,440 INFO kafka.client.ClientUtils$: Fetching metadata from broker id:133,host:0.0.0.0,port:9092 with correlation id 70 for 1 topic(s) Set(SYSTEM_COMMAND) 2016-02-25 17:16:25,440 INFO kafka.producer.SyncProducer: Connected to 0.0.0.0:9092 for producing 2016-02-25 17:16:25,440 INFO kafka.producer.SyncProducer: Disconnecting from 0.0.0.0:9092
and the errors look like
2016-02-25 17:08:22,565 WARN kafka.consumer.ConsumerFetcherManager$LeaderFinderThread: [SYSTEM_COMMAND_group_CEFSink_cdh1.dev.xxxx.com_cdh1.dev.xxxx.com-1456444748972-5451c7a3-leader-finder-thread], Failed to find leader for Set([SYSTEM_COMMAND,0]) kafka.common.KafkaException: fetching topic metadata for topics [Set(SYSTEM_COMMAND)] from broker [ArrayBuffer(id:135,host:0.0.0.0,port:9092, id:136,host:0.0.0.0,port:9092, id:134,host:0.0.0.0,port:9092)] failed at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:67) at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:88) at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66) at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51) Caused by: java.net.ConnectException: Connection refused at sun.nio.ch.Net.connect0(Native Method) at sun.nio.ch.Net.connect(Net.java:454) at sun.nio.ch.Net.connect(Net.java:446) at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:648) at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57) at kafka.producer.SyncProducer.connect(SyncProducer.scala:141) at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:156) at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68) at kafka.producer.SyncProducer.send(SyncProducer.scala:112) at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53) ... 3 more
Created 09-07-2017 02:33 AM
Hi harsh
what combination need to be used in case of using kerberos
i am specifying SASL_PLAINTEXT://0.0.0.0:9092 AND ALSO TRIED 9093
kafka brokers are not coming up after that
Created 09-08-2017 08:48 AM
Created 09-08-2017 06:35 PM
Hi PD
Thanks for your response, Unfortunaltly I have tried with Brokers IP's also in the advertised.ports property with the port details But I am getting the below exception everytime when I process the message from Flume to Kafka over kerberos . The below error coming in flume log also below is flume.log used. Please suggest what is the reason for this error ?
2017-09-06 16:30:54,654 INFO kafka.producer.SyncProducer: Connected to <xxxxxx>:9092 for producing
2017-09-06 16:30:54,654 INFO kafka.producer.SyncProducer: Disconnecting from <xxxxxx>:9092
2017-09-06 16:30:54,654 WARN kafka.client.ClientUtils$: Fetching topic metadata with correlation id 156180 for topics [Set(topic1)] from broker [id:2,host:<xxxxxx>port:9092] failed
java.io.EOFException: Received -1 when reading from channel, socket has likely been closed.
at kafka.utils.Utils$.read(Utils.scala:376)
at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:74)
at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:71)
at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
at kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartitionInfo.scala:49)
at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartitionListForTopic(DefaultEventHandler.scala:186)
at kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:150)
at kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:149)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:149)
at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:95)
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72)
Flume Conf :
#Source
agent.sources=source
agent.sources.source.type = spooldir
agent.sources.source.spoolDir=/TestPoolDir
agent.sources.source.fileHeader = false
agent.sources.source.channels=channel
#Channel
agent.channels=channel
agent.channels.channel.groupId=flume
agent.channels.channel.pareAsFlumeEvent=false
agent.channels.channel.type=org.apache.flume.channel.kafka.KafkaChannel
agent.channels.channel.brokerList=xxxx:9092,xxxx:9092,xxxx:9092
agent.channels.channel.topic=topic1
agent.channels.channel.zookeeperConnect=yyyy:2181,yyyy:2181,yyyy:2181
agent.channels.channel.readSmallestOffset=false
agent.channels.channel.kafka.consumer.timeout.ms=1000
agent.channels.channel.kafka.parseAsFlumeEvent=true
agent.channels.channel.kafka.consumer.sasl.mechanism = GSSAPI
agent.channels.channel.kafka.producer.security.protocol = SASL_PLAINTEXT
agent.channels.channel.kafka.producer.sasl.kerberos.service.name = kafka
agent.channels.channel.generateKeytabFor = $KERBEROS_PRINCIPAL
#Sink
agent.sinks=sink
agent.sinks.sink.hdfs.fileType=DataStream
agent.sinks.sink.hdfs.useLocalTimeStamp=true
agent.sinks.sink.hdfs.rollSize=1342
agent.sinks.sink.hdfs.roundValue=1
agent.sinks.sink.hdfs.rollTimerPoolSize=1
agent.sinks.sink.hdfs.path=hdfs://<Namenode IP>:8020/user/admin
agent.sinks.sink.hdfs.maxOpenFiles=5000
agent.sinks.sink.hdfs.batchSize=100
agent.sinks.sink.hdfs.threadsPoolSize=10
agent.sinks.sink.hdfs.round=false
agent.sinks.sink.hdfs.rollInterval=0
agent.sinks.sink.hdfs.filePrefix=Location
agent.sinks.sink.type=hdfs
#agent.sinks.sink.hdfs.path = /user/testflume
agent.sinks.sink.hdfs.idleTimeout=0
#agent.sinks.sink.hdfs.fileSuffix=.avro
agent.sinks.sink.hdfs.fileSuffix=.txt
agent.sinks.sink.hdfs.roundUnit=second
agent.sinks.sink.hdfs.inUseSuffix=.tmp
agent.sinks.sink.hdfs.retryInterval=180
agent.sinks.sink.serializer=TEXT
agent.sinks.sink.channel=channel
agent.sinks.sink.hdfs.callTimeout=10000
agent.sinks.sink.hdfs.closeTries=0
agent.sinks.sink.hdfs.rollCount=0
agent.sinks.sink.hdfs.timeZone=Local Time
agent.sinks.sink.kafka.consumer.sasl.mechanism = GSSAPI
# Security Setup
agent.sinks.sink.kafka.producer.security.protocol = SASL_PLAINTEXT
agent.sinks.sink.kafka.producer.sasl.kerberos.service.name = kafka
agent.sinks.sink.generateKeytabFor = $KERBEROS_PRINCIPAL
Created 09-14-2017 11:51 PM
Hi
Can you please help to give update on below error ?
Thanks
deepak
Created 09-15-2017 09:18 AM
Created 09-17-2017 08:28 PM
Thanks PD
I can providre you the required details but before that just want to check the product compatability of Flume with Kafka over kerberos ob CDH version lower than 5.7
The below link shows the product limitations of Flume with Kafka over secured transport
https://www.cloudera.com/documentation/kafka/latest/PDF/cloudera-kafka.pdf
on Page 20 , below is the limitation mentioned
FlumeshippedwithCDH5.7andlowercanonlysenddatatoKafka2.0andhigherviaunsecuredtransport. SecurityadditionstoKafka2.0arenotsupportedbyFlumeinCDH5.7(orlowerversions).
We are using CDH 5.5.2
Flume version : 1.6.0-cdh 5.5.2
Kafka : 2.0.1-1.2.0.1.p.0.5
So is that the product limitation that flume will not work on 5.5.2 with flume with Kafka over kerberos.
Please confirm
Thanks
Deepak
Created 09-17-2017 09:17 PM
PD/ Cloudera Team - can you please provide your inputs . We need to take the further steps accordingly if its product limitation.
Appreciate your support
This is Urgent pls.
Thanks
Deepak