Archives of Support Questions (Read Only)

This is an archived board for historical reference. Information and links may no longer be available or relevant
Announcements
This board is archived and read-only for historical reference. To ask a new question, please post a new topic on the appropriate active board.

HDP 2.5.3.0-37 kafka-storm not compatible with non-HDP Kafka

avatar

2.5.3.0-37 kafka-storm added an additional argument (securityProtocol) to the SimpleConsumer constructor call in

this._connections.put(host, new DynamicPartitionConnections.ConnectionInfo(new SimpleConsumer(host.host, host.port, this._config.socketTimeoutMs, this._config.bufferSizeBytes, this._config.clientId, this._config.securityProtocol)));

This constructor does not exist in Apache Kafka trunk SimpleConsumer:

*/
@threadsafe
class SimpleConsumer(val host: String,
                     val port: Int,
                     val soTimeout: Int,
                     val bufferSize: Int,
                     val clientId: String) extends Logging {


I did find reference to this constructor in the last HDP Kafka build 0.8.2.2.3.5.4-5

/**
 * A consumer of kafka messages
 */
@threadsafe
class SimpleConsumer(val host: String,
                     val port: Int,
                     val soTimeout: Int,
                     val bufferSize: Int,
                     val clientId: String,
                     val protocol: SecurityProtocol = SecurityProtocol.PLAINTEXT) extends Logging {					


However, this version does not work and it doesn't feel like I should be going back to HDP 2.2.3 for this:

7062 [Thread-14-kafka_spout-executor[2 2]] INFO  o.a.s.k.PartitionManager - Starting Kafka 10.63.80.217 Partition{host=ip:9008, topic=asdf, partition=1} from offset 106867
7063 [Thread-14-kafka_spout-executor[2 2]] INFO  o.a.s.k.ZkCoordinator - Task [1/1] Finished refreshing
8315 [Thread-14-kafka_spout-executor[2 2]] ERROR o.a.s.util - Async loop died!
java.lang.RuntimeException: java.lang.IllegalArgumentException
	at org.apache.storm.kafka.KafkaUtils.fetchMessages(KafkaUtils.java:207) ~[storm-kafka-1.0.1.2.5.3.0-37.jar:1.0.1.2.5.3.0-37]
	at org.apache.storm.kafka.PartitionManager.fill(PartitionManager.java:200) ~[storm-kafka-1.0.1.2.5.3.0-37.jar:1.0.1.2.5.3.0-37]
	at org.apache.storm.kafka.PartitionManager.next(PartitionManager.java:149) ~[storm-kafka-1.0.1.2.5.3.0-37.jar:1.0.1.2.5.3.0-37]
	at org.apache.storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:136) ~[storm-kafka-1.0.1.2.5.3.0-37.jar:1.0.1.2.5.3.0-37]
	at org.apache.storm.daemon.executor$fn__5192$fn__5207$fn__5238.invoke(executor.clj:651) ~[storm-core-1.0.1.2.5.3.0-37.jar:1.0.1.2.5.3.0-37]
	at org.apache.storm.util$async_loop$fn__554.invoke(util.clj:484) [storm-core-1.0.1.2.5.3.0-37.jar:1.0.1.2.5.3.0-37]
	at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
	at java.lang.Thread.run(Thread.java:745) [?:1.8.0_92]
Caused by: java.lang.IllegalArgumentException
	at java.nio.Buffer.limit(Buffer.java:275) ~[?:1.8.0_92]
	at kafka.api.FetchResponsePartitionData$.readFrom(FetchResponse.scala:37) ~[kafka_2.11-0.8.2.2.3.5.4-5.jar:?]
	at kafka.api.TopicData$$anonfun$1.apply(FetchResponse.scala:99) ~[kafka_2.11-0.8.2.2.3.5.4-5.jar:?]
	at kafka.api.TopicData$$anonfun$1.apply(FetchResponse.scala:97) ~[kafka_2.11-0.8.2.2.3.5.4-5.jar:?]
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245) ~[scala-library-2.11.7.jar:?]
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245) ~[scala-library-2.11.7.jar:?]
	at scala.collection.immutable.Range.foreach(Range.scala:166) ~[scala-library-2.11.7.jar:?]
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:245) ~[scala-library-2.11.7.jar:?]
	at scala.collection.AbstractTraversable.map(Traversable.scala:104) ~[scala-library-2.11.7.jar:?]
	at kafka.api.TopicData$.readFrom(FetchResponse.scala:97) ~[kafka_2.11-0.8.2.2.3.5.4-5.jar:?]
	at kafka.api.FetchResponse$$anonfun$4.apply(FetchResponse.scala:169) ~[kafka_2.11-0.8.2.2.3.5.4-5.jar:?]
	at kafka.api.FetchResponse$$anonfun$4.apply(FetchResponse.scala:168) ~[kafka_2.11-0.8.2.2.3.5.4-5.jar:?]
	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:252) ~[scala-library-2.11.7.jar:?]
	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:252) ~[scala-library-2.11.7.jar:?]
	at scala.collection.immutable.Range.foreach(Range.scala:166) ~[scala-library-2.11.7.jar:?]
	at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:252) ~[scala-library-2.11.7.jar:?]
	at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104) ~[scala-library-2.11.7.jar:?]
	at kafka.api.FetchResponse$.readFrom(FetchResponse.scala:168) ~[kafka_2.11-0.8.2.2.3.5.4-5.jar:?]
	at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:151) ~[kafka_2.11-0.8.2.2.3.5.4-5.jar:?]
	at kafka.javaapi.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:54) ~[kafka_2.11-0.8.2.2.3.5.4-5.jar:?]
	at org.apache.storm.kafka.KafkaUtils.fetchMessages(KafkaUtils.java:197) ~[storm-kafka-1.0.1.2.5.3.0-37.jar:1.0.1.2.5.3.0-37]
	... 7 more

Using the previous HDP Storm build (1.0.1.2.5.0.0-1245)that does not have the securityProtocol in SimpleConsumer and a non HDP Kafka .8 dep does work.

Since this effectively makes HDP 2.3.5.0-37 storm-kafka not compatible with a Kafka .8 SimpleConsumer, will there be a new HDP build for Kafka .8 with a SimpleConsumer constructor that includes securityProtocol?

If not, is the expectation that users pull in storm-kafka from Apache in order to work with Kafka .8?

1 ACCEPTED SOLUTION

avatar
New Member

@Kristopher Kane

Apache Kafka's old scala based clients does not support security (SASL/SSL) features. Only new java based clients support security. In HDP, we patched the old scala clients to support security. So HDP 2.5.3.0-37 kafka-storm depends on HDP Kafka. We suggest you to use HDP Kafka for security features.

Yes, If are unable to use HDP Kafka , then you have to use apache artifacts for both Storm-kafka and Kafka ,

View solution in original post

2 REPLIES 2

avatar
New Member

@Kristopher Kane

Apache Kafka's old scala based clients does not support security (SASL/SSL) features. Only new java based clients support security. In HDP, we patched the old scala clients to support security. So HDP 2.5.3.0-37 kafka-storm depends on HDP Kafka. We suggest you to use HDP Kafka for security features.

Yes, If are unable to use HDP Kafka , then you have to use apache artifacts for both Storm-kafka and Kafka ,

avatar

Quite a thing to do in a point release.