Created 01-06-2017 04:50 AM
HDP 2.5.3.0-37 storm-kafka added an additional argument (securityProtocol) to the SimpleConsumer constructor call:
this._connections.put(host, new DynamicPartitionConnections.ConnectionInfo(new SimpleConsumer(host.host, host.port, this._config.socketTimeoutMs, this._config.bufferSizeBytes, this._config.clientId, this._config.securityProtocol)));
This constructor does not exist in Apache Kafka trunk SimpleConsumer:
@threadsafe
class SimpleConsumer(val host: String, val port: Int, val soTimeout: Int, val bufferSize: Int, val clientId: String) extends Logging {
I did find a reference to this constructor in the last HDP Kafka build, 0.8.2.2.3.5.4-5:
/**
 * A consumer of kafka messages
 */
@threadsafe
class SimpleConsumer(val host: String, val port: Int, val soTimeout: Int, val bufferSize: Int, val clientId: String, val protocol: SecurityProtocol = SecurityProtocol.PLAINTEXT) extends Logging {
However, this version does not work and it doesn't feel like I should be going back to HDP 2.2.3 for this:
7062 [Thread-14-kafka_spout-executor[2 2]] INFO o.a.s.k.PartitionManager - Starting Kafka 10.63.80.217 Partition{host=ip:9008, topic=asdf, partition=1} from offset 106867
7063 [Thread-14-kafka_spout-executor[2 2]] INFO o.a.s.k.ZkCoordinator - Task [1/1] Finished refreshing
8315 [Thread-14-kafka_spout-executor[2 2]] ERROR o.a.s.util - Async loop died!
java.lang.RuntimeException: java.lang.IllegalArgumentException
    at org.apache.storm.kafka.KafkaUtils.fetchMessages(KafkaUtils.java:207) ~[storm-kafka-1.0.1.2.5.3.0-37.jar:1.0.1.2.5.3.0-37]
    at org.apache.storm.kafka.PartitionManager.fill(PartitionManager.java:200) ~[storm-kafka-1.0.1.2.5.3.0-37.jar:1.0.1.2.5.3.0-37]
    at org.apache.storm.kafka.PartitionManager.next(PartitionManager.java:149) ~[storm-kafka-1.0.1.2.5.3.0-37.jar:1.0.1.2.5.3.0-37]
    at org.apache.storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:136) ~[storm-kafka-1.0.1.2.5.3.0-37.jar:1.0.1.2.5.3.0-37]
    at org.apache.storm.daemon.executor$fn__5192$fn__5207$fn__5238.invoke(executor.clj:651) ~[storm-core-1.0.1.2.5.3.0-37.jar:1.0.1.2.5.3.0-37]
    at org.apache.storm.util$async_loop$fn__554.invoke(util.clj:484) [storm-core-1.0.1.2.5.3.0-37.jar:1.0.1.2.5.3.0-37]
    at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
    at java.lang.Thread.run(Thread.java:745) [?:1.8.0_92]
Caused by: java.lang.IllegalArgumentException
    at java.nio.Buffer.limit(Buffer.java:275) ~[?:1.8.0_92]
    at kafka.api.FetchResponsePartitionData$.readFrom(FetchResponse.scala:37) ~[kafka_2.11-0.8.2.2.3.5.4-5.jar:?]
    at kafka.api.TopicData$$anonfun$1.apply(FetchResponse.scala:99) ~[kafka_2.11-0.8.2.2.3.5.4-5.jar:?]
    at kafka.api.TopicData$$anonfun$1.apply(FetchResponse.scala:97) ~[kafka_2.11-0.8.2.2.3.5.4-5.jar:?]
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245) ~[scala-library-2.11.7.jar:?]
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245) ~[scala-library-2.11.7.jar:?]
    at scala.collection.immutable.Range.foreach(Range.scala:166) ~[scala-library-2.11.7.jar:?]
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:245) ~[scala-library-2.11.7.jar:?]
    at scala.collection.AbstractTraversable.map(Traversable.scala:104) ~[scala-library-2.11.7.jar:?]
    at kafka.api.TopicData$.readFrom(FetchResponse.scala:97) ~[kafka_2.11-0.8.2.2.3.5.4-5.jar:?]
    at kafka.api.FetchResponse$$anonfun$4.apply(FetchResponse.scala:169) ~[kafka_2.11-0.8.2.2.3.5.4-5.jar:?]
    at kafka.api.FetchResponse$$anonfun$4.apply(FetchResponse.scala:168) ~[kafka_2.11-0.8.2.2.3.5.4-5.jar:?]
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:252) ~[scala-library-2.11.7.jar:?]
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:252) ~[scala-library-2.11.7.jar:?]
    at scala.collection.immutable.Range.foreach(Range.scala:166) ~[scala-library-2.11.7.jar:?]
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:252) ~[scala-library-2.11.7.jar:?]
    at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104) ~[scala-library-2.11.7.jar:?]
    at kafka.api.FetchResponse$.readFrom(FetchResponse.scala:168) ~[kafka_2.11-0.8.2.2.3.5.4-5.jar:?]
    at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:151) ~[kafka_2.11-0.8.2.2.3.5.4-5.jar:?]
    at kafka.javaapi.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:54) ~[kafka_2.11-0.8.2.2.3.5.4-5.jar:?]
    at org.apache.storm.kafka.KafkaUtils.fetchMessages(KafkaUtils.java:197) ~[storm-kafka-1.0.1.2.5.3.0-37.jar:1.0.1.2.5.3.0-37]
    ... 7 more
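For readers following the trace: the innermost frame is java.nio.Buffer.limit, which throws IllegalArgumentException when asked to set a limit that is negative or larger than the buffer's capacity. The trace does not show why FetchResponsePartitionData.readFrom computed such a limit, so the sketch below only reproduces the JDK behavior in isolation. BufferLimitDemo and limitRejected are names made up for this illustration and are not part of Kafka or Storm.

```java
import java.nio.ByteBuffer;

final class BufferLimitDemo {

    /**
     * Returns true if a buffer of the given capacity rejects the requested limit.
     * Buffer.limit(int) throws IllegalArgumentException when the new limit is
     * negative or greater than the capacity.
     */
    static boolean limitRejected(int capacity, int newLimit) {
        ByteBuffer buffer = ByteBuffer.allocate(capacity);
        try {
            buffer.limit(newLimit);
            return false;
        } catch (IllegalArgumentException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(limitRejected(16, 8));   // limit within capacity
        System.out.println(limitRejected(16, 32));  // limit beyond capacity
        System.out.println(limitRejected(16, -1));  // negative limit
    }
}
```

A size field read from a response in a layout the client does not expect could produce such a value, but that is only one possible explanation and nothing in the log confirms it.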
Using the previous HDP Storm build (1.0.1.2.5.0.0-1245), which does not pass securityProtocol to SimpleConsumer, together with a non-HDP Kafka 0.8 dependency does work.
Since this effectively makes HDP 2.5.3.0-37 storm-kafka incompatible with a Kafka 0.8 SimpleConsumer, will there be a new HDP build for Kafka 0.8 with a SimpleConsumer constructor that includes securityProtocol?
If not, is the expectation that users pull in storm-kafka from Apache in order to work with Kafka 0.8?
Created 01-11-2017 09:06 AM
Apache Kafka's old Scala-based clients do not support security (SASL/SSL) features; only the new Java-based clients do. In HDP, we patched the old Scala clients to support security, so HDP 2.5.3.0-37 storm-kafka depends on HDP Kafka. We suggest using HDP Kafka for security features.
Yes, if you are unable to use HDP Kafka, then you have to use Apache artifacts for both storm-kafka and Kafka.
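One way to see which flavor of Kafka client ends up on a topology's classpath is to look for the six-argument SimpleConsumer constructor by reflection. This is a minimal sketch, not an official tool: ConstructorCheck and hasConstructorWithArity are hypothetical names, and the class name kafka.javaapi.consumer.SimpleConsumer is taken from the stack trace in the question on the assumption that it is the class storm-kafka instantiates. The check uses only the JDK, so it runs without Kafka present and reports false in that case.

```java
import java.lang.reflect.Constructor;

final class ConstructorCheck {

    /**
     * Returns true if the named class is loadable and declares a public
     * constructor taking exactly `arity` parameters. Returns false if the
     * class is not on the classpath.
     */
    static boolean hasConstructorWithArity(String className, int arity) {
        try {
            Class<?> cls = Class.forName(className);
            for (Constructor<?> c : cls.getConstructors()) {
                if (c.getParameterCount() == arity) {
                    return true;
                }
            }
            return false;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumed class name, copied from the stack trace in the question.
        String consumer = "kafka.javaapi.consumer.SimpleConsumer";
        // Five arguments: host, port, soTimeout, bufferSize, clientId.
        System.out.println("5-arg constructor: " + hasConstructorWithArity(consumer, 5));
        // Six arguments: the above plus the security protocol.
        System.out.println("6-arg constructor: " + hasConstructorWithArity(consumer, 6));
    }
}
```

The result depends entirely on which Kafka jar is on the classpath when the check runs, so it is best run with the same dependencies as the topology.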
Created 01-17-2017 03:19 PM
Quite a thing to do in a point release.