Created 10-21-2015 03:19 PM
I'm trying to implement custom grouping feature as seen in OpenSOC for HBaseStreamPartitioner
I'm declaring it like so
declarer.customGrouping( MAPPER_BOLT_ID, new HBaseStreamPartitioner("clicks_tbl", 0, 60, zkQuorum, zkParent, zkPort));
What would cause the error below? What is the default stream id when you implement a custom grouping? My HBase bolt is 2nd bolt in chain after a mapper bolt that just deserializes byte array stream from Kafka to parsed values.
2015-10-21 11:06:56.829 b.s.d.executor [ERROR] java.lang.RuntimeException: java.lang.IllegalArgumentException: No matching clause: default at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128) ~[storm-core-0.10.0.2.3.2.0-2950.jar:0.10.0.2.3.2.0-2950] at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99) ~[storm-core-0.10.0.2.3.2.0-2950.jar:0.10.0.2.3.2.0-2950] at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80) ~[storm-core-0.10.0.2.3.2.0-2950.jar:0.10.0.2.3.2.0-2950] at backtype.storm.daemon.executor$fn__6214$fn__6227$fn__6278.invoke(executor.clj:808) ~[storm-core-0.10.0.2.3.2.0-2950.jar:0.10.0.2.3.2.0-2950] at backtype.storm.util$async_loop$fn__543.invoke(util.clj:475) [storm-core-0.10.0.2.3.2.0-2950.jar:0.10.0.2.3.2.0-2950] at clojure.lang.AFn.run(AFn.java:22) [clojure-1.6.0.jar:?] at java.lang.Thread.run(Thread.java:745) [?:1.8.0_40] Caused by: java.lang.IllegalArgumentException: No matching clause: default at backtype.storm.daemon.acker$mk_acker_bolt$reify__1380.execute(acker.clj:59) ~[storm-core-0.10.0.2.3.2.0-2950.jar:0.10.0.2.3.2.0-2950] at backtype.storm.daemon.acker$_execute.invoke(acker.clj:101) ~[storm-core-0.10.0.2.3.2.0-2950.jar:0.10.0.2.3.2.0-2950] at backtype.storm.daemon.acker.execute(Unknown Source) ~[storm-core-0.10.0.2.3.2.0-2950.jar:0.10.0.2.3.2.0-2950] at backtype.storm.daemon.executor$fn__6214$tuple_action_fn__6216.invoke(executor.clj:670) ~[storm-core-0.10.0.2.3.2.0-2950.jar:0.10.0.2.3.2.0-2950] at backtype.storm.daemon.executor$mk_task_receiver$fn__6137.invoke(executor.clj:426) ~[storm-core-0.10.0.2.3.2.0-2950.jar:0.10.0.2.3.2.0-2950] at backtype.storm.disruptor$clojure_handler$reify__5713.onEvent(disruptor.clj:58) ~[storm-core-0.10.0.2.3.2.0-2950.jar:0.10.0.2.3.2.0-2950] at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:120) ~[storm-core-0.10.0.2.3.2.0-2950.jar:0.10.0.2.3.2.0-2950]
Created 10-21-2015 08:03 PM
Hi Artem,
The default stream id is "default". Check to see if the mapper bolt you mention is declaring a new outputStream. This would occur in the declareOutputFields method of that class. Also, check the collector.emit call within the execute method. The default behavior is to emit on the "default" stream so you can confirm what stream this is emitting on.
Created 10-21-2015 08:03 PM
Hi Artem,
The default stream id is "default". Check to see if the mapper bolt you mention is declaring a new outputStream. This would occur in the declareOutputFields method of that class. Also, check the collector.emit call within the execute method. The default behavior is to emit on the "default" stream so you can confirm what stream this is emitting on.
Created 10-26-2015 09:46 PM
Hi Artem,
Are you trying to implement OpenSOC? Ryan is correct. The default stream id is "default"