Implementing custom grouping on Storm throws an error

Master Mentor

I'm trying to implement a custom grouping, as OpenSOC does with its HBaseStreamPartitioner.

I'm declaring it like so:

declarer.customGrouping(
    MAPPER_BOLT_ID,
    new HBaseStreamPartitioner("clicks_tbl", 0, 60, zkQuorum, zkParent, zkPort));

What would cause the error below? What is the default stream id when you implement a custom grouping? My HBase bolt is the second bolt in the chain, after a mapper bolt that just deserializes the byte-array stream from Kafka into parsed values.

2015-10-21 11:06:56.829 b.s.d.executor [ERROR]
java.lang.RuntimeException: java.lang.IllegalArgumentException: No matching clause: default
at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128) ~[storm-core-0.10.0.2.3.2.0-2950.jar:0.10.0.2.3.2.0-2950]
at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99) ~[storm-core-0.10.0.2.3.2.0-2950.jar:0.10.0.2.3.2.0-2950]
at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80) ~[storm-core-0.10.0.2.3.2.0-2950.jar:0.10.0.2.3.2.0-2950]
at backtype.storm.daemon.executor$fn__6214$fn__6227$fn__6278.invoke(executor.clj:808) ~[storm-core-0.10.0.2.3.2.0-2950.jar:0.10.0.2.3.2.0-2950]
at backtype.storm.util$async_loop$fn__543.invoke(util.clj:475) [storm-core-0.10.0.2.3.2.0-2950.jar:0.10.0.2.3.2.0-2950]
at clojure.lang.AFn.run(AFn.java:22) [clojure-1.6.0.jar:?]
at java.lang.Thread.run(Thread.java:745) [?:1.8.0_40]
Caused by: java.lang.IllegalArgumentException: No matching clause: default
at backtype.storm.daemon.acker$mk_acker_bolt$reify__1380.execute(acker.clj:59) ~[storm-core-0.10.0.2.3.2.0-2950.jar:0.10.0.2.3.2.0-2950]
at backtype.storm.daemon.acker$_execute.invoke(acker.clj:101) ~[storm-core-0.10.0.2.3.2.0-2950.jar:0.10.0.2.3.2.0-2950]
at backtype.storm.daemon.acker.execute(Unknown Source) ~[storm-core-0.10.0.2.3.2.0-2950.jar:0.10.0.2.3.2.0-2950]
at backtype.storm.daemon.executor$fn__6214$tuple_action_fn__6216.invoke(executor.clj:670) ~[storm-core-0.10.0.2.3.2.0-2950.jar:0.10.0.2.3.2.0-2950]
at backtype.storm.daemon.executor$mk_task_receiver$fn__6137.invoke(executor.clj:426) ~[storm-core-0.10.0.2.3.2.0-2950.jar:0.10.0.2.3.2.0-2950]
at backtype.storm.disruptor$clojure_handler$reify__5713.onEvent(disruptor.clj:58) ~[storm-core-0.10.0.2.3.2.0-2950.jar:0.10.0.2.3.2.0-2950]
at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:120) ~[storm-core-0.10.0.2.3.2.0-2950.jar:0.10.0.2.3.2.0-2950]
1 ACCEPTED SOLUTION

Rising Star

Hi Artem,

The default stream id is "default". Check whether the mapper bolt you mention declares a new output stream; this would occur in the declareOutputFields method of that class. Also check the collector.emit call within the execute method to confirm which stream it emits on. Unless a stream id is passed, emit uses the "default" stream.
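
To make the stream-id matching concrete, here is a minimal sketch in plain Java. It is not the Storm API: the class StreamRoutingSketch, its methods subscribe and targetsFor, and the stream name "parsed" are all hypothetical stand-ins. It only models the idea that a subscription made without a stream id listens on "default", so tuples emitted on any other stream never reach it. In Storm itself the corresponding places to look are declarer.declare versus declarer.declareStream in declareOutputFields, and whether collector.emit is called with a stream id.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of stream subscriptions; NOT part of Storm.
public class StreamRoutingSketch {
    // Storm's default stream id is the literal string "default".
    public static final String DEFAULT_STREAM_ID = "default";

    // Key: "sourceComponent/streamId"; value: consumers subscribed to that stream.
    private final Map<String, List<String>> subscriptions = new HashMap<>();

    // Subscribing without a stream id means subscribing to the default stream.
    public void subscribe(String consumerId, String sourceId) {
        subscribe(consumerId, sourceId, DEFAULT_STREAM_ID);
    }

    public void subscribe(String consumerId, String sourceId, String streamId) {
        subscriptions
            .computeIfAbsent(sourceId + "/" + streamId, k -> new ArrayList<>())
            .add(consumerId);
    }

    // Returns the consumers that would receive a tuple emitted on the given stream.
    public List<String> targetsFor(String sourceId, String streamId) {
        return subscriptions.getOrDefault(sourceId + "/" + streamId, new ArrayList<>());
    }

    public static void main(String[] args) {
        StreamRoutingSketch topology = new StreamRoutingSketch();
        // The HBase bolt subscribes to the mapper bolt without naming a stream.
        topology.subscribe("hbase-bolt", "mapper-bolt");

        // Mapper emits on the default stream: the HBase bolt receives it.
        System.out.println(topology.targetsFor("mapper-bolt", DEFAULT_STREAM_ID)); // [hbase-bolt]
        // Mapper emits on a named stream nobody subscribed to: no receivers.
        System.out.println(topology.targetsFor("mapper-bolt", "parsed")); // []
    }
}
```

If the mapper bolt does turn out to emit on a named stream, the grouping declaration on the HBase bolt would need to name that same stream id rather than rely on the default.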

Contributor

Hi Artem,

Are you trying to implement OpenSOC? Ryan is correct: the default stream id is "default".