- Subscribe to RSS Feed
- Mark Question as New
- Mark Question as Read
- Float this Question for Current User
- Bookmark
- Subscribe
- Mute
- Printer Friendly Page
ClosedChannelException - Kafka Spout cannot read kafka broker endpoint out of zookeeper
Created ‎11-15-2016 02:46 PM
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
I get an ClosedChannelException when I try to read kafka topics from my storm spout.
Storm java snipped:
BrokerHosts hosts = new ZkHosts(zkHostString, "/brokers"); SpoutConfig spoutConfig = new SpoutConfig(hosts, "testTopic", "/myOffsets/testTopic", "myGroup"); spoutConfig.securityProtocol = "PLAINTEXTSASL"; KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
In the logs you can see, that no GlobalPartitionInformation can be found.
2016-11-14 21:43:59.846 o.a.s.k.DynamicBrokersReader [INFO] Read partition info from zookeeper: GlobalPartitionInformation{topic=testTopic, partitionMap={0=null:-1, 1=null:-1}}
In my opinion, that is because of the information in zookeeper are stored in the endpoint instead of HOST and PORT fields.
[zk: zk1370:2181(CONNECTED) 10] get /brokers/ids/1001 {"jmx_port":-1,"timestamp":"1477657407466","endpoints":["SASL_PLAINTEXT://broker0190:6667","SASL_SSL://broker0190:6668"],"host":null,"version":3,"port":-1}
[zk: zk1370:2181(CONNECTED) 9] get /brokers/ids/1002 {"jmx_port":-1,"timestamp":"1477658692052","endpoints":["SASL_PLAINTEXT://broker0210:6667","SASL_SSL://broker0210:6668"],"host":null,"version":3,"port":-1}
[zk: zk1370:2181(CONNECTED) 12] get /brokers/topics/testTopic {"version":1,"partitions":{"1":[1001,1002],"0":[1002,1001]}}
The odd thing is, on a non-kerberos cluster these values are not set to null but to the proper hostnames and port.
See also this similar question: Getting Kafka Consumer offsets - getting null for both kafka-consumer-groups.sh and kafka-consumer-o...
Therefore my question:
Why are the host and port values in a kerberized cluster NULL and what is the best practice to use the endpoint information for my kafka spout?
Created ‎11-26-2016 07:00 PM
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
I believe we have resolved this BUG.
In kerberized cluster Storm-kafka spout will get the host and port values from the endpoints and the null values were due to a bug.
Created ‎11-26-2016 07:00 PM
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
I believe we have resolved this BUG.
In kerberized cluster Storm-kafka spout will get the host and port values from the endpoints and the null values were due to a bug.
Created ‎10-01-2018 07:56 PM
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
I am still seeing this error in HDP 2.6.4.0? Any workaround?
Created ‎12-12-2016 09:03 AM
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
We have kerberized cluster with the latest HDP-2.5.3.0 (kafka 0.10.0 and ZooKeeper 3.4.6) and we still get host null, port null
[zk: localhost:2181(CONNECTED) 0] get /brokers/ids/1003 {"jmx_port":-1,"timestamp":"1481302072354","endpoints":["SASL_PLAINTEXT://hdw2dn2.infovidematrix.pl:6667"],"host":null,"version":3,"port":-1} cZxid = 0xf00000bcf ctime = Fri Dec 09 17:47:52 CET 2016 mZxid = 0xf00000bcf mtime = Fri Dec 09 17:47:52 CET 2016 pZxid = 0xf00000bcf cversion = 0 dataVersion = 0 aclVersion = 0 ephemeralOwner = 0x158e42671f40014 dataLength = 141 numChildren = 0 [zk: localhost:2181(CONNECTED) 1]
Created ‎01-04-2017 08:53 AM
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
Zookeeper will have these values as null and -1, But the storm topology will parse the listener :
SASL_PLAINTEXT://hdw2dn2.infovidematrix.pl:6667
to get broker hostname and port and this is expected. The bug i mentioned above as the spout was unable to parse the listener and get the hostname and port. This is fixed in the HDP version you are using. (HDP 2.5.3)
Created ‎10-01-2018 07:09 PM
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
I am still seeing this error with HDP
2.6.4.0.. Is there a work around?
Created ‎01-03-2017 10:27 PM
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
This typical problem in the case where pom.xml does not contain the correct version jar. You must have the same version of the libraries in the pom.xml application that is in a cluster hadoop. etc:
<properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<storm.version>0.10.0.2.4.2.0-258</storm.version>
<storm.kafka.version>0.10.0.2.4.2.0-258</storm.kafka.version>
<kafka.version>0.9.0.2.4.2.0-258</kafka.version>
<hadoop.version>2.7.1.2.4.2.0-258</hadoop.version>
<hbase.version>1.1.2.2.4.2.0-258</hbase.version>
<log4j.version>1.2.17</log4j.version>
<storm.hdfs.version>0.10.0.2.4.2.0-258</storm.hdfs.version>
<hive.version>1.2.1000.2.4.2.0-258</hive.version> </properties>
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.11</artifactId> <version>${kafka.version}</version> <exclusions> <exclusion> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> </exclusion> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> </exclusion> <exclusion> <groupId>log4j</groupId> <artifactId>log4j</artifactId> </exclusion> </exclusions> </dependency>
<!-- Storm Dependencies --> <dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-core</artifactId> <version>${storm.version}</version> <scope>provided</scope> <exclusions> <exclusion> <groupId>org.slf4j</groupId> <artifactId>log4j-over-slf4j</artifactId> </exclusion> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> </exclusion> </exclusions> </dependency> <!-- Storm Kafka Dependencies --> <dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-kafka</artifactId> <version>${storm.kafka.version}</version> </dependency>
