Member since: 12-16-2016 · Posts: 6 · Kudos Received: 0 · Solutions: 0
01-17-2017 03:19 PM
Quite a thing to do in a point release.
01-06-2017 04:55 AM
Yes, I was wrong about securityProtocol and fixated on it too long. We altered the cluster-level worker.childopts, and our ZK connections are now running as plaintext. However, we have since noticed that the new offset lag monitor is also trying to make a SASL connection to ZK. I didn't realize this at first, as that component is new to me; I simply overlooked it.
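For reference, the cluster-level change can be sketched as a storm.yaml fragment. This is an assumption on my part, not the exact settings we used: the property name zookeeper.sasl.client is the standard ZooKeeper client switch, and any other flags already in worker.childopts must be preserved alongside it.

```yaml
# Hypothetical storm.yaml sketch: pass a JVM flag to every worker so the
# ZooKeeper client used by storm-kafka skips SASL and connects as plaintext.
# "zookeeper.sasl.client" is the standard ZooKeeper client system property;
# keep whatever flags (heap size, GC, etc.) you already set here.
worker.childopts: "-Xmx768m -Dzookeeper.sasl.client=false"
```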
01-06-2017 04:50 AM
2.5.3.0-37 storm-kafka added an additional argument (securityProtocol) to the SimpleConsumer constructor call:

this._connections.put(host, new DynamicPartitionConnections.ConnectionInfo(
    new SimpleConsumer(host.host, host.port, this._config.socketTimeoutMs,
        this._config.bufferSizeBytes, this._config.clientId,
        this._config.securityProtocol)));
This constructor does not exist in the Apache Kafka trunk SimpleConsumer:

@threadsafe
class SimpleConsumer(val host: String,
                     val port: Int,
                     val soTimeout: Int,
                     val bufferSize: Int,
                     val clientId: String) extends Logging {
I did find a reference to this constructor in the last HDP Kafka build, 0.8.2.2.3.5.4-5:

/**
 * A consumer of kafka messages
 */
@threadsafe
class SimpleConsumer(val host: String,
                     val port: Int,
                     val soTimeout: Int,
                     val bufferSize: Int,
                     val clientId: String,
                     val protocol: SecurityProtocol = SecurityProtocol.PLAINTEXT) extends Logging {
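Since the two builds differ only in constructor signature, one way to see which SimpleConsumer a worker actually has on its classpath is a small reflection probe. This is a stand-alone sketch, not code from the original post: with the Kafka jar present you would probe kafka.javaapi.consumer.SimpleConsumer for the 6-argument constructor, but the demo below runs against a JDK class so it works without Kafka on the classpath.

```java
import java.lang.reflect.Constructor;

public class ConstructorProbe {

    // Returns true if the named class is on the classpath and has at least
    // one public constructor taking exactly `arity` parameters.
    static boolean hasConstructorWithArity(String className, int arity) {
        try {
            for (Constructor<?> c : Class.forName(className).getConstructors()) {
                if (c.getParameterCount() == arity) {
                    return true;
                }
            }
        } catch (ClassNotFoundException e) {
            // Class not on the classpath at all.
        }
        return false;
    }

    public static void main(String[] args) {
        // With the Kafka jar on the classpath you would check for the
        // 6-argument (securityProtocol) constructor like this:
        //   hasConstructorWithArity("kafka.javaapi.consumer.SimpleConsumer", 6)
        // Demonstrated on java.lang.String so the sketch runs stand-alone:
        System.out.println(hasConstructorWithArity("java.lang.String", 1));
        System.out.println(hasConstructorWithArity("no.such.Class", 6));
    }
}
```

Run this inside the topology (or a one-off worker command) to confirm which jar won the classpath fight before the spout dies at fetch time.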
However, this version does not work, and it doesn't feel like I should be going back to an HDP 2.3.5 Kafka client for this:

7062 [Thread-14-kafka_spout-executor[2 2]] INFO o.a.s.k.PartitionManager - Starting Kafka 10.63.80.217 Partition{host=ip:9008, topic=asdf, partition=1} from offset 106867
7063 [Thread-14-kafka_spout-executor[2 2]] INFO o.a.s.k.ZkCoordinator - Task [1/1] Finished refreshing
8315 [Thread-14-kafka_spout-executor[2 2]] ERROR o.a.s.util - Async loop died!
java.lang.RuntimeException: java.lang.IllegalArgumentException
at org.apache.storm.kafka.KafkaUtils.fetchMessages(KafkaUtils.java:207) ~[storm-kafka-1.0.1.2.5.3.0-37.jar:1.0.1.2.5.3.0-37]
at org.apache.storm.kafka.PartitionManager.fill(PartitionManager.java:200) ~[storm-kafka-1.0.1.2.5.3.0-37.jar:1.0.1.2.5.3.0-37]
at org.apache.storm.kafka.PartitionManager.next(PartitionManager.java:149) ~[storm-kafka-1.0.1.2.5.3.0-37.jar:1.0.1.2.5.3.0-37]
at org.apache.storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:136) ~[storm-kafka-1.0.1.2.5.3.0-37.jar:1.0.1.2.5.3.0-37]
at org.apache.storm.daemon.executor$fn__5192$fn__5207$fn__5238.invoke(executor.clj:651) ~[storm-core-1.0.1.2.5.3.0-37.jar:1.0.1.2.5.3.0-37]
at org.apache.storm.util$async_loop$fn__554.invoke(util.clj:484) [storm-core-1.0.1.2.5.3.0-37.jar:1.0.1.2.5.3.0-37]
at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
at java.lang.Thread.run(Thread.java:745) [?:1.8.0_92]
Caused by: java.lang.IllegalArgumentException
at java.nio.Buffer.limit(Buffer.java:275) ~[?:1.8.0_92]
at kafka.api.FetchResponsePartitionData$.readFrom(FetchResponse.scala:37) ~[kafka_2.11-0.8.2.2.3.5.4-5.jar:?]
at kafka.api.TopicData$$anonfun$1.apply(FetchResponse.scala:99) ~[kafka_2.11-0.8.2.2.3.5.4-5.jar:?]
at kafka.api.TopicData$$anonfun$1.apply(FetchResponse.scala:97) ~[kafka_2.11-0.8.2.2.3.5.4-5.jar:?]
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245) ~[scala-library-2.11.7.jar:?]
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245) ~[scala-library-2.11.7.jar:?]
at scala.collection.immutable.Range.foreach(Range.scala:166) ~[scala-library-2.11.7.jar:?]
at scala.collection.TraversableLike$class.map(TraversableLike.scala:245) ~[scala-library-2.11.7.jar:?]
at scala.collection.AbstractTraversable.map(Traversable.scala:104) ~[scala-library-2.11.7.jar:?]
at kafka.api.TopicData$.readFrom(FetchResponse.scala:97) ~[kafka_2.11-0.8.2.2.3.5.4-5.jar:?]
at kafka.api.FetchResponse$$anonfun$4.apply(FetchResponse.scala:169) ~[kafka_2.11-0.8.2.2.3.5.4-5.jar:?]
at kafka.api.FetchResponse$$anonfun$4.apply(FetchResponse.scala:168) ~[kafka_2.11-0.8.2.2.3.5.4-5.jar:?]
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:252) ~[scala-library-2.11.7.jar:?]
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:252) ~[scala-library-2.11.7.jar:?]
at scala.collection.immutable.Range.foreach(Range.scala:166) ~[scala-library-2.11.7.jar:?]
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:252) ~[scala-library-2.11.7.jar:?]
at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104) ~[scala-library-2.11.7.jar:?]
at kafka.api.FetchResponse$.readFrom(FetchResponse.scala:168) ~[kafka_2.11-0.8.2.2.3.5.4-5.jar:?]
at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:151) ~[kafka_2.11-0.8.2.2.3.5.4-5.jar:?]
at kafka.javaapi.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:54) ~[kafka_2.11-0.8.2.2.3.5.4-5.jar:?]
at org.apache.storm.kafka.KafkaUtils.fetchMessages(KafkaUtils.java:197) ~[storm-kafka-1.0.1.2.5.3.0-37.jar:1.0.1.2.5.3.0-37]
... 7 more
Using the previous HDP Storm build (1.0.1.2.5.0.0-1245), which does not have securityProtocol in SimpleConsumer, together with a non-HDP Kafka 0.8 dependency does work. Since this effectively makes HDP 2.5.3.0-37 storm-kafka incompatible with a Kafka 0.8 SimpleConsumer, will there be a new HDP build of Kafka 0.8 with a SimpleConsumer constructor that includes securityProtocol? If not, is the expectation that users pull in storm-kafka from Apache in order to work with Kafka 0.8?
Labels:
- Apache Kafka
- Apache Storm
01-03-2017 03:38 PM
Does that mean that the topology-level configuration 'KafkaConfig.securityProtocol = "PLAINTEXT";' is not respected?
12-28-2016 02:22 PM
Hi @Sriharsha Chintalapani - Where exactly is the problem in storm-kafka? Can you link to an Apache JIRA? I cannot get a kerberized cluster to read from a non-kerberized Kafka, even with 1.0.1.2.5.3.0-37. It does look like Storm is requesting a SASL connection to an unsecured ZooKeeper. To note, running the 1.0.1.2.5.3.0-37 topology in local mode does connect to the unsecured ZooKeeper and read the data from the topic. I also set KafkaConfig.securityProtocol = "PLAINTEXT"; explicitly. Here is the underlying exception that bubbles up (an AuthFailedException):

java.lang.RuntimeException: java.lang.RuntimeException: org.apache.zookeeper.KeeperException$AuthFailedException: KeeperErrorCode = AuthFailed for /brokers/topics/UserJsonString/partitions
at org.apache.storm.kafka.DynamicBrokersReader.getBrokerInfo(DynamicBrokersReader.java:101) ~[stormjar.jar:?]
at org.apache.storm.kafka.trident.ZkBrokerReader.<init>(ZkBrokerReader.java:44) ~[stormjar.jar:?]
at org.apache.storm.kafka.KafkaUtils.makeBrokerReader(KafkaUtils.java:64) ~[stormjar.jar:?]
at org.apache.storm.kafka.KafkaSpout.open(KafkaSpout.java:78) ~[stormjar.jar:?]
at org.apache.storm.daemon.executor$fn__6505$fn__6520.invoke(executor.clj:607) ~[storm-core-1.0.1.2.5.3.0-37.jar:1.0.1.2.5.3.0-37]
at org.apache.storm.util$async_loop$fn__554.invoke(util.clj:482) [storm-core-1.0.1.2.5.3.0-37.jar:1.0.1.2.5.3.0-37]
at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
at java.lang.Thread.run(Thread.java:745) [?:1.8.0_73]
Caused by: java.lang.RuntimeException: org.apache.zookeeper.KeeperException$AuthFailedException: KeeperErrorCode = AuthFailed for /brokers/topics/UserJsonString/partitions
at org.apache.storm.kafka.DynamicBrokersReader.getNumPartitions(DynamicBrokersReader.java:115) ~[stormjar.jar:?]
at org.apache.storm.kafka.DynamicBrokersReader.getBrokerInfo(DynamicBrokersReader.java:85) ~[stormjar.jar:?]
... 7 more
Caused by: org.apache.zookeeper.KeeperException$AuthFailedException: KeeperErrorCode = AuthFailed for /brokers/topics/UserJsonString/partitions
at org.apache.zookeeper.KeeperException.create(KeeperException.java:123) ~[zookeeper-3.4.6.2.5.3.0-37.jar:3.4.6-37--1]
at org.apache.zookeeper.KeeperException.create(KeeperException.java:51) ~[zookeeper-3.4.6.2.5.3.0-37.jar:3.4.6-37--1]
at org.apache.zookeeper.ZooKeeper.getChildren(ZooKeeper.java:1590) ~[zookeeper-3.4.6.2.5.3.0-37.jar:3.4.6-37--1]
at org.apache.curator.framework.imps.GetChildrenBuilderImpl$3.call(GetChildrenBuilderImpl.java:214) ~[stormjar.jar:?]
at org.apache.curator.framework.imps.GetChildrenBuilderImpl$3.call(GetChildrenBuilderImpl.java:203) ~[stormjar.jar:?]
at org.apache.curator.RetryLoop.callWithRetry(RetryLoop.java:108) ~[stormjar.jar:?]
at org.apache.curator.framework.imps.GetChildrenBuilderImpl.pathInForeground(GetChildrenBuilderImpl.java:200) ~[stormjar.jar:?]
at org.apache.curator.framework.imps.GetChildrenBuilderImpl.forPath(GetChildrenBuilderImpl.java:191) ~[stormjar.jar:?]
at org.apache.curator.framework.imps.GetChildrenBuilderImpl.forPath(GetChildrenBuilderImpl.java:38) ~[stormjar.jar:?]
at org.apache.storm.kafka.DynamicBrokersReader.getNumPartitions(DynamicBrokersReader.java:112) ~[stormjar.jar:?]
at org.apache.storm.kafka.DynamicBrokersReader.getBrokerInfo(DynamicBrokersReader.java:85) ~[stormjar.jar:?]
... 7 more
2016-12-27 18:18:14.879 o.a.s.util [ERROR] Halting process: ("Worker died")
java.lang.RuntimeException: ("Worker died")
at org.apache.storm.util$exit_process_BANG_.doInvoke(util.clj:341) [storm-core-1.0.1.2.5.3.0-37.jar:1.0.1.2.5.3.0-37]
at clojure.lang.RestFn.invoke(RestFn.java:423) [clojure-1.7.0.jar:?]
at org.apache.storm.daemon.worker$fn__7178$fn__7179.invoke(worker.clj:765) [storm-core-1.0.1.2.5.3.0-37.jar:1.0.1.2.5.3.0-37]
at org.apache.storm.daemon.executor$mk_executor_data$fn__6390$fn__6391.invoke(executor.clj:275) [storm-core-1.0.1.2.5.3.0-37.jar:1.0.1.2.5.3.0-37]
at org.apache.storm.util$async_loop$fn__554.invoke(util.clj:494) [storm-core-1.0.1.2.5.3.0-37.jar:1.0.1.2.5.3.0-37]
at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
at java.lang.Thread.run(Thread.java:745) [?:1.8.0_73]
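The AuthFailed above comes from the ZooKeeper client attempting SASL against an unsecured quorum. As a minimal stand-alone sketch (my assumption about the fix, not something stated in this thread), the client-side switch is the zookeeper.sasl.client system property; on a Storm cluster it has to reach each worker JVM, e.g. via worker.childopts, and it must be set before the ZooKeeper client is created.

```java
public class ZkSaslSwitch {
    public static void main(String[] args) {
        // The ZooKeeper client reads this property when opening a connection;
        // "false" makes it skip SASL and connect as plaintext. In a topology
        // it is more reliable to pass it on the worker command line as
        // -Dzookeeper.sasl.client=false than to set it in code, since the
        // spout may create its ZK client before your code runs.
        System.setProperty("zookeeper.sasl.client", "false");
        System.out.println(System.getProperty("zookeeper.sasl.client"));
    }
}
```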