Created 11-15-2016 02:46 PM
I get a ClosedChannelException when I try to read Kafka topics from my Storm spout.
Storm Java snippet:
BrokerHosts hosts = new ZkHosts(zkHostString, "/brokers");
SpoutConfig spoutConfig = new SpoutConfig(hosts, "testTopic", "/myOffsets/testTopic", "myGroup");
spoutConfig.securityProtocol = "PLAINTEXTSASL";
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
In the logs you can see that the GlobalPartitionInformation contains no broker host or port (the partitionMap entries are null:-1):
2016-11-14 21:43:59.846 o.a.s.k.DynamicBrokersReader [INFO] Read partition info from zookeeper: GlobalPartitionInformation{topic=testTopic, partitionMap={0=null:-1, 1=null:-1}}
In my opinion, that is because the broker information in ZooKeeper is stored in the endpoints field instead of the host and port fields.
[zk: zk1370:2181(CONNECTED) 10] get /brokers/ids/1001
{"jmx_port":-1,"timestamp":"1477657407466","endpoints":["SASL_PLAINTEXT://broker0190:6667","SASL_SSL://broker0190:6668"],"host":null,"version":3,"port":-1}
[zk: zk1370:2181(CONNECTED) 9] get /brokers/ids/1002
{"jmx_port":-1,"timestamp":"1477658692052","endpoints":["SASL_PLAINTEXT://broker0210:6667","SASL_SSL://broker0210:6668"],"host":null,"version":3,"port":-1}
[zk: zk1370:2181(CONNECTED) 12] get /brokers/topics/testTopic
{"version":1,"partitions":{"1":[1001,1002],"0":[1002,1001]}}
The odd thing is that on a non-Kerberos cluster these values are not null but contain the proper hostnames and ports.
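For what it's worth, the endpoint strings in the broker znodes do carry the host and port. As a plain-Java sketch (class and method names are mine, not part of storm-kafka, and a real JSON parser would be more robust than a regex), they can be pulled out of the znode data like this:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class BrokerEndpoints {
    // Pull the "PROTOCOL://host:port" entries out of a broker znode's JSON.
    // Quick regex sketch; a proper JSON parser would be more robust.
    public static List<String> endpoints(String brokerJson) {
        List<String> result = new ArrayList<>();
        Matcher m = Pattern.compile("([A-Z_]+://[^\"]+)").matcher(brokerJson);
        while (m.find()) {
            result.add(m.group(1));
        }
        return result;
    }

    public static void main(String[] args) {
        String json = "{\"jmx_port\":-1,\"endpoints\":[\"SASL_PLAINTEXT://broker0190:6667\","
                + "\"SASL_SSL://broker0190:6668\"],\"host\":null,\"version\":3,\"port\":-1}";
        System.out.println(endpoints(json));
        // [SASL_PLAINTEXT://broker0190:6667, SASL_SSL://broker0190:6668]
    }
}
```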
See also this similar question: Getting Kafka Consumer offsets - getting null for both kafka-consumer-groups.sh and kafka-consumer-o...
Therefore my question:
Why are the host and port values NULL in a kerberized cluster, and what is the best practice for using the endpoint information in my Kafka spout?
Created 11-26-2016 07:00 PM
I believe we have resolved this bug.
In a kerberized cluster the storm-kafka spout will get the host and port values from the endpoints; the null values were due to a bug.
Created 10-01-2018 07:56 PM
I am still seeing this error in HDP 2.6.4.0. Any workaround?
Created 12-12-2016 09:03 AM
We have a kerberized cluster with the latest HDP-2.5.3.0 (Kafka 0.10.0 and ZooKeeper 3.4.6), and we still see host:null and port:-1:
[zk: localhost:2181(CONNECTED) 0] get /brokers/ids/1003
{"jmx_port":-1,"timestamp":"1481302072354","endpoints":["SASL_PLAINTEXT://hdw2dn2.infovidematrix.pl:6667"],"host":null,"version":3,"port":-1}
cZxid = 0xf00000bcf
ctime = Fri Dec 09 17:47:52 CET 2016
mZxid = 0xf00000bcf
mtime = Fri Dec 09 17:47:52 CET 2016
pZxid = 0xf00000bcf
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x158e42671f40014
dataLength = 141
numChildren = 0
Created 01-04-2017 08:53 AM
ZooKeeper will have these values as null and -1, but the Storm topology will parse the listener:
SASL_PLAINTEXT://hdw2dn2.infovidematrix.pl:6667
to get the broker hostname and port, and this is expected. The bug I mentioned above was that the spout was unable to parse the listener to get the hostname and port. This is fixed in the HDP version you are using (HDP 2.5.3).
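To illustrate what that parsing amounts to, splitting a listener string into host and port needs only plain string operations. This is a self-contained sketch with illustrative names, not the actual storm-kafka code:

```java
public class ListenerParser {
    // Split a "PROTOCOL://host:port" listener string into its host part.
    // Sketch only; not the actual storm-kafka implementation.
    public static String host(String listener) {
        String hostPort = listener.substring(listener.indexOf("://") + 3);
        return hostPort.substring(0, hostPort.lastIndexOf(':'));
    }

    // ... and into its port part.
    public static int port(String listener) {
        return Integer.parseInt(listener.substring(listener.lastIndexOf(':') + 1));
    }

    public static void main(String[] args) {
        String listener = "SASL_PLAINTEXT://hdw2dn2.infovidematrix.pl:6667";
        System.out.println(host(listener) + " / " + port(listener));
        // hdw2dn2.infovidematrix.pl / 6667
    }
}
```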
Created 10-01-2018 07:09 PM
I am still seeing this error with HDP 2.6.4.0. Is there a workaround?
Created 01-03-2017 10:27 PM
This is a typical problem when the pom.xml does not contain the correct jar versions. The library versions in your application's pom.xml must match the versions installed on the Hadoop cluster, e.g.:
<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <storm.version>0.10.0.2.4.2.0-258</storm.version>
    <storm.kafka.version>0.10.0.2.4.2.0-258</storm.kafka.version>
    <kafka.version>0.9.0.2.4.2.0-258</kafka.version>
    <hadoop.version>2.7.1.2.4.2.0-258</hadoop.version>
    <hbase.version>1.1.2.2.4.2.0-258</hbase.version>
    <log4j.version>1.2.17</log4j.version>
    <storm.hdfs.version>0.10.0.2.4.2.0-258</storm.hdfs.version>
    <hive.version>1.2.1000.2.4.2.0-258</hive.version>
</properties>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>${kafka.version}</version>
    <exclusions>
        <exclusion>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
        <exclusion>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
        </exclusion>
    </exclusions>
</dependency>

<!-- Storm Dependencies -->
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>${storm.version}</version>
    <scope>provided</scope>
    <exclusions>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>log4j-over-slf4j</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
    </exclusions>
</dependency>

<!-- Storm Kafka Dependencies -->
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-kafka</artifactId>
    <version>${storm.kafka.version}</version>
</dependency>