Support Questions

Find answers, ask questions, and share your expertise
Announcements
Celebrating as our community reaches 100,000 members! Thank you!

Kafka.properties override for listeners property fails

avatar
Explorer

Setting the property

listeners=PLAINTEXT://host1:9092,PLAINTEXT://host2:9092,PLAINTEXT://host3:9092,PLAINTEXT://host4:9092,

Fails with exception

java.lang.IllegalArgumentException: requirement failed: Each listener must have a different port
	at scala.Predef$.require(Predef.scala:219)
	at kafka.server.KafkaConfig.validateUniquePortAndProtocol(KafkaConfig.scala:903)
	at kafka.server.KafkaConfig.getListeners(KafkaConfig.scala:911)
	at kafka.server.KafkaConfig.<init>(KafkaConfig.scala:864)
	at kafka.server.KafkaConfig$.fromProps(KafkaConfig.scala:698)
	at kafka.server.KafkaConfig$.fromProps(KafkaConfig.scala:695)
	at kafka.server.KafkaServerStartable$.fromProps(KafkaServerStartable.scala:28)
	at kafka.Kafka$.main(Kafka.scala:58)
	at com.cloudera.kafka.wrap.Kafka$.main(Kafka.scala:76)
	at com.cloudera.kafka.wrap.Kafka.main(Kafka.scala)

As long as each listener is on a different machine, why should " each listener must have a different port"

14 REPLIES 14

avatar
Mentor
The listener feature is to make Kafka Brokers listen on multiple ports: http://kafka.apache.org/documentation.html#security_configbroker.

Specifying hostnames along with that is just a way of specifying an interface (i.e. if not to be wild-carded). Thereby, the validation that each element's port be different if multiple are specified, is the right thing for it to do.

If you wanted a single port to be listened to globally, simply use PLAINTEXT://0.0.0.0:9092, instead of a multi-host list.

avatar
Explorer

If I do that I run into consumer messages like this in a continuous loop

2016-02-25 17:16:25,437 INFO kafka.consumer.ConsumerFetcherManager: [ConsumerFetcherManager-1456449328092] Added fetcher for partitions ArrayBuffer()
2016-02-25 17:16:25,440 INFO kafka.utils.VerifiableProperties: Verifying properties
2016-02-25 17:16:25,440 INFO kafka.utils.VerifiableProperties: Property client.id is overridden to SYSTEM_COMMAND_group_VirusTotalSink_cdh1.dev.countertack.com
2016-02-25 17:16:25,440 INFO kafka.utils.VerifiableProperties: Property metadata.broker.list is overridden to 0.0.0.0:9092,0.0.0.0:9092,0.0.0.0:9092,0.0.0.0:9092
2016-02-25 17:16:25,440 INFO kafka.utils.VerifiableProperties: Property request.timeout.ms is overridden to 30000
2016-02-25 17:16:25,440 INFO kafka.client.ClientUtils$: Fetching metadata from broker id:133,host:0.0.0.0,port:9092 with correlation id 70 for 1 topic(s) Set(SYSTEM_COMMAND)
2016-02-25 17:16:25,440 INFO kafka.producer.SyncProducer: Connected to 0.0.0.0:9092 for producing
2016-02-25 17:16:25,440 INFO kafka.producer.SyncProducer: Disconnecting from 0.0.0.0:9092

 and the errors look like

2016-02-25 17:08:22,565 WARN kafka.consumer.ConsumerFetcherManager$LeaderFinderThread: [SYSTEM_COMMAND_group_CEFSink_cdh1.dev.xxxx.com_cdh1.dev.xxxx.com-1456444748972-5451c7a3-leader-finder-thread], Failed to find leader for Set([SYSTEM_COMMAND,0])
kafka.common.KafkaException: fetching topic metadata for topics [Set(SYSTEM_COMMAND)] from broker [ArrayBuffer(id:135,host:0.0.0.0,port:9092, id:136,host:0.0.0.0,port:9092, id:134,host:0.0.0.0,port:9092)] failed
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:67)
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:88)
        at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
Caused by: java.net.ConnectException: Connection refused
        at sun.nio.ch.Net.connect0(Native Method)
        at sun.nio.ch.Net.connect(Net.java:454)
        at sun.nio.ch.Net.connect(Net.java:446)
        at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:648)
        at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
        at kafka.producer.SyncProducer.connect(SyncProducer.scala:141)
        at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:156)
        at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
        at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
        ... 3 more

avatar

Hi harsh

 

what combination need to be used in case of using kerberos 

 

i am specifying  SASL_PLAINTEXT://0.0.0.0:9092 AND ALSO TRIED 9093

 

kafka brokers are not coming up after that

avatar
If you are using 0.0.0.0 for listeners, you'll also need an advertised.listeners safety valve to specify what host:port the clients should connect to, as they can't connect to 0.0.0.0. Generally, this is done with external/internal DNS.

-pd

avatar

Hi PD

 

Thanks for your response, Unfortunaltly I have tried with Brokers IP's also in the advertised.ports property with the port details But I am getting the below exception everytime when I process the message from Flume to Kafka over kerberos . The below error coming in flume log also below is flume.log used. Please suggest what is the reason for this error ?

 

 

2017-09-06 16:30:54,654 INFO kafka.producer.SyncProducer: Connected to <xxxxxx>:9092 for producing
2017-09-06 16:30:54,654 INFO kafka.producer.SyncProducer: Disconnecting from <xxxxxx>:9092
2017-09-06 16:30:54,654 WARN kafka.client.ClientUtils$: Fetching topic metadata with correlation id 156180 for topics [Set(topic1)] from broker [id:2,host:<xxxxxx>port:9092] failed
java.io.EOFException: Received -1 when reading from channel, socket has likely been closed.
 at kafka.utils.Utils$.read(Utils.scala:376)
 at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
 at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
 at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
 at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
 at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:74)
 at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:71)
 at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
 at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
 at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
 at kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartitionInfo.scala:49)
 at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartitionListForTopic(DefaultEventHandler.scala:186)
 at kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:150)
 at kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:149)
 at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
 at kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:149)
 at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:95)
 at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72)

 

Flume Conf :

 

#Source
agent.sources=source
agent.sources.source.type = spooldir
agent.sources.source.spoolDir=/TestPoolDir
agent.sources.source.fileHeader = false
agent.sources.source.channels=channel
#Channel
agent.channels=channel
agent.channels.channel.groupId=flume
agent.channels.channel.pareAsFlumeEvent=false
agent.channels.channel.type=org.apache.flume.channel.kafka.KafkaChannel
agent.channels.channel.brokerList=xxxx:9092,xxxx:9092,xxxx:9092
agent.channels.channel.topic=topic1
agent.channels.channel.zookeeperConnect=yyyy:2181,yyyy:2181,yyyy:2181
agent.channels.channel.readSmallestOffset=false
agent.channels.channel.kafka.consumer.timeout.ms=1000
agent.channels.channel.kafka.parseAsFlumeEvent=true
agent.channels.channel.kafka.consumer.sasl.mechanism = GSSAPI
agent.channels.channel.kafka.producer.security.protocol = SASL_PLAINTEXT
agent.channels.channel.kafka.producer.sasl.kerberos.service.name = kafka
agent.channels.channel.generateKeytabFor = $KERBEROS_PRINCIPAL

#Sink
agent.sinks=sink
agent.sinks.sink.hdfs.fileType=DataStream
agent.sinks.sink.hdfs.useLocalTimeStamp=true
agent.sinks.sink.hdfs.rollSize=1342
agent.sinks.sink.hdfs.roundValue=1
agent.sinks.sink.hdfs.rollTimerPoolSize=1
agent.sinks.sink.hdfs.path=hdfs://<Namenode IP>:8020/user/admin
agent.sinks.sink.hdfs.maxOpenFiles=5000
agent.sinks.sink.hdfs.batchSize=100
agent.sinks.sink.hdfs.threadsPoolSize=10
agent.sinks.sink.hdfs.round=false
agent.sinks.sink.hdfs.rollInterval=0
agent.sinks.sink.hdfs.filePrefix=Location
agent.sinks.sink.type=hdfs
#agent.sinks.sink.hdfs.path = /user/testflume
agent.sinks.sink.hdfs.idleTimeout=0
#agent.sinks.sink.hdfs.fileSuffix=.avro
agent.sinks.sink.hdfs.fileSuffix=.txt
agent.sinks.sink.hdfs.roundUnit=second
agent.sinks.sink.hdfs.inUseSuffix=.tmp
agent.sinks.sink.hdfs.retryInterval=180
agent.sinks.sink.serializer=TEXT
agent.sinks.sink.channel=channel
agent.sinks.sink.hdfs.callTimeout=10000
agent.sinks.sink.hdfs.closeTries=0
agent.sinks.sink.hdfs.rollCount=0
agent.sinks.sink.hdfs.timeZone=Local Time
agent.sinks.sink.kafka.consumer.sasl.mechanism = GSSAPI

# Security Setup
agent.sinks.sink.kafka.producer.security.protocol = SASL_PLAINTEXT
agent.sinks.sink.kafka.producer.sasl.kerberos.service.name = kafka
agent.sinks.sink.generateKeytabFor = $KERBEROS_PRINCIPAL

 

avatar

Hi

 

Can you please help to give update on below error ?

 

Thanks

deepak

avatar
You need to provide your broker config as well. Are you seeing any errors on the broker logs? Are you able to use kafka-console-consumer and kafka-console-producer to send messages?

-pd

avatar

Thanks PD 

 

I can providre you the required details but before that just want to check the product compatability of Flume with Kafka over kerberos ob CDH version lower than 5.7

 

The below link shows the product limitations of Flume with Kafka over secured transport

 

https://www.cloudera.com/documentation/kafka/latest/PDF/cloudera-kafka.pdf

 

on Page 20 , below is the limitation mentioned

 

FlumeshippedwithCDH5.7andlowercanonlysenddatatoKafka2.0andhigherviaunsecuredtransport. SecurityadditionstoKafka2.0arenotsupportedbyFlumeinCDH5.7(orlowerversions).

 

We are using CDH 5.5.2

Flume  version : 1.6.0-cdh 5.5.2

Kafka : 2.0.1-1.2.0.1.p.0.5

 

So is that the product limitation that flume will not work on 5.5.2 with flume with Kafka over kerberos.

 

Please confirm

 

Thanks

Deepak

 

 

 

 

avatar

PD/ Cloudera Team - can you please provide your inputs . We need to take the further steps accordingly if its product limitation.

 

Appreciate your support 

 

This is Urgent pls.

 

Thanks

Deepak