
ClosedChannelException - Kafka Spout cannot read kafka broker endpoint out of zookeeper

New Contributor

I get a ClosedChannelException when I try to read Kafka topics from my Storm spout.

Storm Java snippet:

BrokerHosts hosts = new ZkHosts(zkHostString, "/brokers");
SpoutConfig spoutConfig = new SpoutConfig(hosts, "testTopic", "/myOffsets/testTopic", "myGroup");
spoutConfig.securityProtocol = "PLAINTEXTSASL";
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
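
For context, here is the same snippet as a complete, minimal topology sketch. The class name, component names, parallelism, and the Storm 1.x package prefix (org.apache.storm, as shipped with HDP 2.5) are my assumptions and not the exact production code:

import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.kafka.BrokerHosts;
import org.apache.storm.kafka.KafkaSpout;
import org.apache.storm.kafka.SpoutConfig;
import org.apache.storm.kafka.ZkHosts;
import org.apache.storm.topology.TopologyBuilder;

public class KafkaSpoutTestTopology {
    public static void main(String[] args) throws Exception {
        // Broker metadata is discovered under /brokers in ZooKeeper.
        String zkHostString = "zk1370:2181"; // illustrative ZooKeeper connect string
        BrokerHosts hosts = new ZkHosts(zkHostString, "/brokers");

        // Offsets are stored under /myOffsets/testTopic for consumer group "myGroup".
        SpoutConfig spoutConfig = new SpoutConfig(hosts, "testTopic", "/myOffsets/testTopic", "myGroup");
        spoutConfig.securityProtocol = "PLAINTEXTSASL"; // kerberized (SASL) listener
        KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

        // Wire the spout into a one-component topology and submit it.
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("kafka-spout", kafkaSpout, 1);

        Config conf = new Config();
        conf.setNumWorkers(1);
        StormSubmitter.submitTopology("kafka-spout-test", conf, builder.createTopology());
    }
}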

In the logs you can see that the GlobalPartitionInformation read from ZooKeeper contains no broker host or port (null:-1 for each partition):

2016-11-14 21:43:59.846 o.a.s.k.DynamicBrokersReader [INFO] Read partition info from zookeeper: GlobalPartitionInformation{topic=testTopic, partitionMap={0=null:-1, 1=null:-1}} 

In my opinion, that is because the broker information in ZooKeeper is stored in the endpoints field instead of the host and port fields.

[zk: zk1370:2181(CONNECTED) 10] get /brokers/ids/1001
{"jmx_port":-1,"timestamp":"1477657407466","endpoints":["SASL_PLAINTEXT://broker0190:6667","SASL_SSL://broker0190:6668"],"host":null,"version":3,"port":-1}
[zk: zk1370:2181(CONNECTED) 9] get /brokers/ids/1002
{"jmx_port":-1,"timestamp":"1477658692052","endpoints":["SASL_PLAINTEXT://broker0210:6667","SASL_SSL://broker0210:6668"],"host":null,"version":3,"port":-1}
[zk: zk1370:2181(CONNECTED) 12] get /brokers/topics/testTopic
{"version":1,"partitions":{"1":[1001,1002],"0":[1002,1001]}}

The odd thing is that on a non-Kerberos cluster these values are not set to null but to the proper hostnames and ports.

See also this similar question: Getting Kafka Consumer offsets - getting null for both kafka-consumer-groups.sh and kafka-consumer-o...

Therefore my question:

Why are the host and port values NULL in a kerberized cluster, and what is the best practice for using the endpoints information in my Kafka spout?

1 ACCEPTED SOLUTION

@Nico Inhoffen

I believe this bug has been resolved.

In a kerberized cluster the storm-kafka spout gets the host and port values from the endpoints field; the null values you saw in the partition information were due to a bug in that parsing.


6 REPLIES

Contributor

I am still seeing this error in HDP 2.6.4.0. Is there any workaround?

New Contributor

We have a kerberized cluster with the latest HDP-2.5.3.0 (Kafka 0.10.0 and ZooKeeper 3.4.6) and we still get host: null, port: -1:

[zk: localhost:2181(CONNECTED) 0] get /brokers/ids/1003
{"jmx_port":-1,"timestamp":"1481302072354","endpoints":["SASL_PLAINTEXT://hdw2dn2.infovidematrix.pl:6667"],"host":null,"version":3,"port":-1}
cZxid = 0xf00000bcf
ctime = Fri Dec 09 17:47:52 CET 2016
mZxid = 0xf00000bcf
mtime = Fri Dec 09 17:47:52 CET 2016
pZxid = 0xf00000bcf
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x158e42671f40014
dataLength = 141
numChildren = 0

@Maciej Bogucki

ZooKeeper will have these values as null and -1, but the Storm topology will parse the listener

SASL_PLAINTEXT://hdw2dn2.infovidematrix.pl:6667

to get the broker hostname and port, and this is expected. The bug I mentioned above was that the spout was unable to parse the listener and extract the hostname and port. That is fixed in the HDP version you are using (HDP 2.5.3).
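
For illustration, the parsing is roughly of this shape; a sketch only, not the actual storm-kafka code:

public class EndpointParser {
    public static void main(String[] args) {
        // Example listener taken from the znode above.
        String endpoint = "SASL_PLAINTEXT://hdw2dn2.infovidematrix.pl:6667";

        // Split off the protocol, then split host and port on the last ':'.
        String hostAndPort = endpoint.split("://", 2)[1];
        int colon = hostAndPort.lastIndexOf(':');
        String host = hostAndPort.substring(0, colon);
        int port = Integer.parseInt(hostAndPort.substring(colon + 1));

        System.out.println(host + " -> " + port); // hdw2dn2.infovidematrix.pl -> 6667
    }
}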

Contributor

I am still seeing this error with HDP 2.6.4.0. Is there a workaround?

New Contributor

This is a typical problem when the pom.xml does not reference the correct JAR versions. The library versions in your application's pom.xml must match the versions installed on the Hadoop cluster, e.g.:

<properties>
  <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  <storm.version>0.10.0.2.4.2.0-258</storm.version>
  <storm.kafka.version>0.10.0.2.4.2.0-258</storm.kafka.version>
  <kafka.version>0.9.0.2.4.2.0-258</kafka.version>
  <hadoop.version>2.7.1.2.4.2.0-258</hadoop.version>
  <hbase.version>1.1.2.2.4.2.0-258</hbase.version>
  <log4j.version>1.2.17</log4j.version>
  <storm.hdfs.version>0.10.0.2.4.2.0-258</storm.hdfs.version>
  <hive.version>1.2.1000.2.4.2.0-258</hive.version>
</properties>

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka_2.11</artifactId>
  <version>${kafka.version}</version>
  <exclusions>
    <exclusion>
      <groupId>org.apache.zookeeper</groupId>
      <artifactId>zookeeper</artifactId>
    </exclusion>
    <exclusion>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
    </exclusion>
    <exclusion>
      <groupId>log4j</groupId>
      <artifactId>log4j</artifactId>
    </exclusion>
  </exclusions>
</dependency>

<!-- Storm Dependencies -->
<dependency>
  <groupId>org.apache.storm</groupId>
  <artifactId>storm-core</artifactId>
  <version>${storm.version}</version>
  <scope>provided</scope>
  <exclusions>
    <exclusion>
      <groupId>org.slf4j</groupId>
      <artifactId>log4j-over-slf4j</artifactId>
    </exclusion>
    <exclusion>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
    </exclusion>
  </exclusions>
</dependency>

<!-- Storm Kafka Dependencies -->
<dependency>
  <groupId>org.apache.storm</groupId>
  <artifactId>storm-kafka</artifactId>
  <version>${storm.kafka.version}</version>
</dependency>