Storm topology not able to consume message

Expert Contributor

Hi Team,

 

From the logs I see there was an issue on one of the worker nodes due to which the Storm topology spout was not able to consume messages. Can you help me resolve the issue?

Logs below -

 

 

 

2019-11-21 07:28:10.478 o.a.k.c.c.i.AbstractCoordinator Thread-22-ciFileSpout-executor[91 91] [INFO] [Consumer clientId=consumer-1, groupId=acp_c1_prv_input_client_instruction-consumer] Marking the coordinator testnode8o.example.com:6667 (id: 2147482643 rack: null) dead
2019-11-21 07:28:10.480 o.a.k.c.c.i.AbstractCoordinator Thread-22-ciFileSpout-executor[91 91] [INFO] [Consumer clientId=consumer-1, groupId=acp_c1_prv_input_client_instruction-consumer] Discovered coordinator testnode8o.example.com:6667 (id: 2147482643 rack: null)
2019-11-21 07:37:43.749 o.a.s.d.executor Thread-22-ciFileSpout-executor[91 91] [INFO] Deactivating spout ciFileSpout:(91)
2019-11-21 07:37:43.750 o.a.k.c.c.i.AbstractCoordinator Thread-22-ciFileSpout-executor[91 91] [INFO] [Consumer clientId=consumer-1, groupId=acp_c1_prv_input_client_instruction-consumer] Marking the coordinator testnode8o.example.com:6667 (id: 2147482643 rack: null) dead
2019-11-21 07:37:47.346 c.c.m.a.s.c.IgniteCacheServiceRegistryPropertyImpl tcp-client-disco-reconnector-#5%null% [ERROR] Failed to reconnect to cluster (consider increasing 'networkTimeout' configuration property) [networkTimeout=5000]
2019-11-21 07:37:48.970 o.a.s.util Thread-22-ciFileSpout-executor[91 91] [ERROR] Async loop died!
java.lang.RuntimeException: java.lang.IllegalStateException: This consumer has already been closed.
        at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:485) ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
        at org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:451) ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
        at org.apache.storm.utils.DisruptorQueue.consumeBatch(DisruptorQueue.java:441) ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
        at org.apache.storm.disruptor$consume_batch.invoke(disruptor.clj:69) ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
        at org.apache.storm.daemon.executor$fn__10125$fn__10140$fn__10173.invoke(executor.clj:632) ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
        at org.apache.storm.util$async_loop$fn__1221.invoke(util.clj:484) [storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
        at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_172]
Caused by: java.lang.IllegalStateException: This consumer has already been closed.
        at org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:1787) ~[stormjar.jar:?]
        at org.apache.kafka.clients.consumer.KafkaConsumer.beginningOffsets(KafkaConsumer.java:1622) ~[stormjar.jar:?]
        at org.apache.storm.kafka.spout.metrics.KafkaOffsetMetric.getValueAndReset(KafkaOffsetMetric.java:79) ~[stormjar.jar:?]
        at org.apache.storm.daemon.executor$metrics_tick$fn__10050.invoke(executor.clj:345) ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
        at clojure.core$map$fn__4553.invoke(core.clj:2622) ~[clojure-1.7.0.jar:?]
        at clojure.lang.LazySeq.sval(LazySeq.java:40) ~[clojure-1.7.0.jar:?]
        at clojure.lang.LazySeq.seq(LazySeq.java:49) ~[clojure-1.7.0.jar:?]
        at clojure.lang.RT.seq(RT.java:507) ~[clojure-1.7.0.jar:?]
        at clojure.core$seq__4128.invoke(core.clj:137) ~[clojure-1.7.0.jar:?]
        at clojure.core$filter$fn__4580.invoke(core.clj:2679) ~[clojure-1.7.0.jar:?]
        at clojure.lang.LazySeq.sval(LazySeq.java:40) ~[clojure-1.7.0.jar:?]
        at clojure.lang.LazySeq.seq(LazySeq.java:49) ~[clojure-1.7.0.jar:?]
        at clojure.lang.Cons.next(Cons.java:39) ~[clojure-1.7.0.jar:?]
        at clojure.lang.RT.next(RT.java:674) ~[clojure-1.7.0.jar:?]
        at clojure.core$next__4112.invoke(core.clj:64) ~[clojure-1.7.0.jar:?]
        at clojure.core.protocols$fn__6523.invoke(protocols.clj:170) ~[clojure-1.7.0.jar:?]
        at clojure.core.protocols$fn__6478$G__6473__6487.invoke(protocols.clj:19) ~[clojure-1.7.0.jar:?]
        at clojure.core.protocols$seq_reduce.invoke(protocols.clj:31) ~[clojure-1.7.0.jar:?]
        at clojure.core.protocols$fn__6506.invoke(protocols.clj:101) ~[clojure-1.7.0.jar:?]
        at clojure.core.protocols$fn__6452$G__6447__6465.invoke(protocols.clj:13) ~[clojure-1.7.0.jar:?]
        at clojure.core$reduce.invoke(core.clj:6519) ~[clojure-1.7.0.jar:?]
        at clojure.core$into.invoke(core.clj:6600) ~[clojure-1.7.0.jar:?]
        at org.apache.storm.daemon.executor$metrics_tick.invoke(executor.clj:349) ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
        at org.apache.storm.daemon.executor$fn__10125$tuple_action_fn__10131.invoke(executor.clj:520) ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
        at org.apache.storm.daemon.executor$mk_task_receiver$fn__10114.invoke(executor.clj:469) ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
        at org.apache.storm.disruptor$clojure_handler$reify__4137.onEvent(disruptor.clj:40) ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
        at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:472) ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
        ... 7 more
2019-11-21 07:37:48.978 o.a.s.d.executor Thread-22-ciFileSpout-executor[91 91] [ERROR]
java.lang.RuntimeException: java.lang.IllegalStateException: This consumer has already been closed.
        at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:485) ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
        at org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:451) ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
        at org.apache.storm.utils.DisruptorQueue.consumeBatch(DisruptorQueue.java:441) ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
        at org.apache.storm.disruptor$consume_batch.invoke(disruptor.clj:69) ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
        at org.apache.storm.daemon.executor$fn__10125$fn__10140$fn__10173.invoke(executor.clj:632) ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
        at org.apache.storm.util$async_loop$fn__1221.invoke(util.clj:484) [storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
        at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_172]

 

 


Re: Storm topology not able to consume message

Contributor

@sagarshimpi 

 

From the log files, I can see that the coordinator is being marked dead:

 

2019-11-21 07:37:43.750 o.a.k.c.c.i.AbstractCoordinator Thread-22-ciFileSpout-executor[91 91] [INFO] [Consumer clientId=consumer-1, groupId=acp_c1_prv_input_client_instruction-consumer] Marking the coordinator testnode8o.example.com:6667 (id: 2147482643 rack: null) dead

 

 

Can you share the output of the topic describe command?

 

bin/kafka-topics.sh --describe --zookeeper <zkHost>:<zkPort>

 

 

Also, check whether you are able to produce and consume messages in Kafka using the console clients:

 

 

bin/kafka-console-producer.sh --broker-list testnode8o.example.com:6667 --topic <topicName>

 

 

 

bin/kafka-console-consumer.sh --bootstrap-server testnode8o.example.com:6667 --topic <topicName> --from-beginning
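
It may also be worth checking the state and lag of the consumer group that appears in your worker log. A minimal sketch, assuming the group id from your log snippet and that this broker address is reachable from where you run it:

bin/kafka-consumer-groups.sh --bootstrap-server testnode8o.example.com:6667 --describe --group acp_c1_prv_input_client_instruction-consumer

If the group shows no active members, or the lag keeps growing while the topology is supposedly running, that points back at the spout-side consumer rather than the brokers.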

 


Re: Storm topology not able to consume message

Expert Contributor

Hi @ManuelCalvo 

 

Below is the output -

[Note: for security reasons I have changed the topic name below to test_c1]

Describe output -

Topic:test_c1_prv_input_client_instruction       PartitionCount:8        ReplicationFactor:4     Configs:retention.ms=604800000,retention.bytes=1073741824
        Topic: test_c1_prv_input_client_instruction      Partition: 0    Leader: 1001    Replicas: 1001,1003,1006,1004   Isr: 1003,1006,1004,1001
        Topic: test_c1_prv_input_client_instruction      Partition: 1    Leader: 1005    Replicas: 1005,1006,1004,1001   Isr: 1005,1006,1001,1004
        Topic: test_c1_prv_input_client_instruction      Partition: 2    Leader: 1007    Replicas: 1007,1004,1001,1005   Isr: 1007,1004,1001,1005
        Topic: test_c1_prv_input_client_instruction      Partition: 3    Leader: 1008    Replicas: 1008,1001,1005,1007   Isr: 1008,1007,1001,1005
        Topic: test_c1_prv_input_client_instruction      Partition: 4    Leader: 1002    Replicas: 1002,1005,1007,1008   Isr: 1007,1002,1008,1005
        Topic: test_c1_prv_input_client_instruction      Partition: 5    Leader: 1003    Replicas: 1003,1007,1008,1002   Isr: 1003,1007,1008,1002
        Topic: test_c1_prv_input_client_instruction      Partition: 6    Leader: 1006    Replicas: 1006,1008,1002,1003   Isr: 1003,1006,1008,1002
        Topic: test_c1_prv_input_client_instruction      Partition: 7    Leader: 1004    Replicas: 1004,1002,1003,1006   Isr: 1003,1006,1004,1002

 

I also tried with the console consumer and I am able to fetch data, so it looks like the issue existed only at that point in time.

 

 


Re: Storm topology not able to consume message

Contributor

@sagarshimpi 

 

So if I understood correctly, the issue is not occurring now, right? If that is the case, it could have been related to a transient connection issue at that time:

 

[ERROR] Failed to reconnect to cluster (consider increasing 'networkTimeout' configuration property) [networkTimeout=5000]
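
If it happens again, a quick way to rule out basic connectivity from the worker node to that broker is to probe the Kafka port directly. A minimal sketch, assuming the hostname and port from your logs and that nc is available on the worker host:

nc -vz testnode8o.example.com 6667

If the port is reachable but the clients still time out, the problem is more likely on the application side (for example the Ignite 'networkTimeout' mentioned in the error above) than on the network itself.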

 

 


Re: Storm topology not able to consume message

Expert Contributor

@ManuelCalvo we already checked with the network team, and they reported no issues.

How can we debug this issue further?

Currently we have enabled DEBUG logging for the worker logs and relaunched the topologies. Any more suggestions?
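
For reference, on Storm 1.x the worker log level can also be raised at runtime without relaunching the topology. A minimal sketch, using a placeholder topology name (the trailing value is a timeout in seconds after which the level reverts):

storm set_log_level <topologyName> -l ROOT=DEBUG:30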


Re: Storm topology not able to consume message

Contributor

@sagarshimpi 

 

I believe enabling debug logging on the worker nodes is fine for the Storm side. On the Kafka side, we can review the broker log files, usually located under /var/log/kafka/server.log. Also, you can monitor whether there are any under-replicated partitions by running the command below:

 

 

bin/kafka-topics.sh $ZK --describe --under-replicated-partitions
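
To narrow down what the broker saw at the time of the incident, you can also pull the warnings and errors from the broker log around that window. A minimal sketch, assuming the log path above and the timestamps from this thread (both may differ on your cluster):

grep '2019-11-21 07:3' /var/log/kafka/server.log | grep -E 'WARN|ERROR'

ISR shrink/expand messages or a controller re-election around 07:37 could explain the consumer briefly losing its coordinator.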

 
