Support Questions
Find answers, ask questions, and share your expertise
Announcements
Check out our newest addition to the community, the Cloudera Innovation Accelerator group hub.

HDP 2.5.3.0-37 kafka-storm not compatible with non-HDP Kafka

2.5.3.0-37 kafka-storm added an additional argument (securityProtocol) to the SimpleConsumer constructor call in

this._connections.put(host, new DynamicPartitionConnections.ConnectionInfo(new SimpleConsumer(host.host, host.port, this._config.socketTimeoutMs, this._config.bufferSizeBytes, this._config.clientId, this._config.securityProtocol)));

This constructor does not exist in Apache Kafka trunk SimpleConsumer:

*/
@threadsafe
class SimpleConsumer(val host: String,
                     val port: Int,
                     val soTimeout: Int,
                     val bufferSize: Int,
                     val clientId: String) extends Logging {


I did find reference to this constructor in the last HDP Kafka build 0.8.2.2.3.5.4-5

/**
 * A consumer of kafka messages
 */
@threadsafe
class SimpleConsumer(val host: String,
                     val port: Int,
                     val soTimeout: Int,
                     val bufferSize: Int,
                     val clientId: String,
                     val protocol: SecurityProtocol = SecurityProtocol.PLAINTEXT) extends Logging {					


However, this version does not work and it doesn't feel like I should be going back to HDP 2.2.3 for this:

7062 [Thread-14-kafka_spout-executor[2 2]] INFO  o.a.s.k.PartitionManager - Starting Kafka 10.63.80.217 Partition{host=ip:9008, topic=asdf, partition=1} from offset 106867
7063 [Thread-14-kafka_spout-executor[2 2]] INFO  o.a.s.k.ZkCoordinator - Task [1/1] Finished refreshing
8315 [Thread-14-kafka_spout-executor[2 2]] ERROR o.a.s.util - Async loop died!
java.lang.RuntimeException: java.lang.IllegalArgumentException
	at org.apache.storm.kafka.KafkaUtils.fetchMessages(KafkaUtils.java:207) ~[storm-kafka-1.0.1.2.5.3.0-37.jar:1.0.1.2.5.3.0-37]
	at org.apache.storm.kafka.PartitionManager.fill(PartitionManager.java:200) ~[storm-kafka-1.0.1.2.5.3.0-37.jar:1.0.1.2.5.3.0-37]
	at org.apache.storm.kafka.PartitionManager.next(PartitionManager.java:149) ~[storm-kafka-1.0.1.2.5.3.0-37.jar:1.0.1.2.5.3.0-37]
	at org.apache.storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:136) ~[storm-kafka-1.0.1.2.5.3.0-37.jar:1.0.1.2.5.3.0-37]
	at org.apache.storm.daemon.executor$fn__5192$fn__5207$fn__5238.invoke(executor.clj:651) ~[storm-core-1.0.1.2.5.3.0-37.jar:1.0.1.2.5.3.0-37]
	at org.apache.storm.util$async_loop$fn__554.invoke(util.clj:484) [storm-core-1.0.1.2.5.3.0-37.jar:1.0.1.2.5.3.0-37]
	at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
	at java.lang.Thread.run(Thread.java:745) [?:1.8.0_92]
Caused by: java.lang.IllegalArgumentException
	at java.nio.Buffer.limit(Buffer.java:275) ~[?:1.8.0_92]
	at kafka.api.FetchResponsePartitionData$.readFrom(FetchResponse.scala:37) ~[kafka_2.11-0.8.2.2.3.5.4-5.jar:?]
	at kafka.api.TopicData$$anonfun$1.apply(FetchResponse.scala:99) ~[kafka_2.11-0.8.2.2.3.5.4-5.jar:?]
	at kafka.api.TopicData$$anonfun$1.apply(FetchResponse.scala:97) ~[kafka_2.11-0.8.2.2.3.5.4-5.jar:?]
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245) ~[scala-library-2.11.7.jar:?]
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245) ~[scala-library-2.11.7.jar:?]
	at scala.collection.immutable.Range.foreach(Range.scala:166) ~[scala-library-2.11.7.jar:?]
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:245) ~[scala-library-2.11.7.jar:?]
	at scala.collection.AbstractTraversable.map(Traversable.scala:104) ~[scala-library-2.11.7.jar:?]
	at kafka.api.TopicData$.readFrom(FetchResponse.scala:97) ~[kafka_2.11-0.8.2.2.3.5.4-5.jar:?]
	at kafka.api.FetchResponse$$anonfun$4.apply(FetchResponse.scala:169) ~[kafka_2.11-0.8.2.2.3.5.4-5.jar:?]
	at kafka.api.FetchResponse$$anonfun$4.apply(FetchResponse.scala:168) ~[kafka_2.11-0.8.2.2.3.5.4-5.jar:?]
	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:252) ~[scala-library-2.11.7.jar:?]
	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:252) ~[scala-library-2.11.7.jar:?]
	at scala.collection.immutable.Range.foreach(Range.scala:166) ~[scala-library-2.11.7.jar:?]
	at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:252) ~[scala-library-2.11.7.jar:?]
	at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104) ~[scala-library-2.11.7.jar:?]
	at kafka.api.FetchResponse$.readFrom(FetchResponse.scala:168) ~[kafka_2.11-0.8.2.2.3.5.4-5.jar:?]
	at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:151) ~[kafka_2.11-0.8.2.2.3.5.4-5.jar:?]
	at kafka.javaapi.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:54) ~[kafka_2.11-0.8.2.2.3.5.4-5.jar:?]
	at org.apache.storm.kafka.KafkaUtils.fetchMessages(KafkaUtils.java:197) ~[storm-kafka-1.0.1.2.5.3.0-37.jar:1.0.1.2.5.3.0-37]
	... 7 more

Using the previous HDP Storm build (1.0.1.2.5.0.0-1245)that does not have the securityProtocol in SimpleConsumer and a non HDP Kafka .8 dep does work.

Since this effectively makes HDP 2.3.5.0-37 storm-kafka not compatible with a Kafka .8 SimpleConsumer, will there be a new HDP build for Kafka .8 with a SimpleConsumer constructor that includes securityProtocol?

If not, is the expectation that users pull in storm-kafka from Apache in order to work with Kafka .8?

1 ACCEPTED SOLUTION

Explorer

@Kristopher Kane

Apache Kafka's old scala based clients does not support security (SASL/SSL) features. Only new java based clients support security. In HDP, we patched the old scala clients to support security. So HDP 2.5.3.0-37 kafka-storm depends on HDP Kafka. We suggest you to use HDP Kafka for security features.

Yes, If are unable to use HDP Kafka , then you have to use apache artifacts for both Storm-kafka and Kafka ,

View solution in original post

2 REPLIES 2

Explorer

@Kristopher Kane

Apache Kafka's old scala based clients does not support security (SASL/SSL) features. Only new java based clients support security. In HDP, we patched the old scala clients to support security. So HDP 2.5.3.0-37 kafka-storm depends on HDP Kafka. We suggest you to use HDP Kafka for security features.

Yes, If are unable to use HDP Kafka , then you have to use apache artifacts for both Storm-kafka and Kafka ,

Quite a thing to do in a point release.