Implementing custom grouping on Storm throws an error
Labels: Apache HBase, Apache Storm
Created ‎10-21-2015 03:19 PM
I'm trying to implement a custom grouping, using the HBaseStreamPartitioner as seen in OpenSOC.
I'm declaring it like so:
declarer.customGrouping(MAPPER_BOLT_ID, new HBaseStreamPartitioner("clicks_tbl", 0, 60, zkQuorum, zkParent, zkPort));
What would cause the error below? What is the default stream id when you implement a custom grouping? My HBase bolt is the second bolt in the chain, after a mapper bolt that just deserializes the byte array stream from Kafka into parsed values.
2015-10-21 11:06:56.829 b.s.d.executor [ERROR]
java.lang.RuntimeException: java.lang.IllegalArgumentException: No matching clause: default
    at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128) ~[storm-core-0.10.0.2.3.2.0-2950.jar:0.10.0.2.3.2.0-2950]
    at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99) ~[storm-core-0.10.0.2.3.2.0-2950.jar:0.10.0.2.3.2.0-2950]
    at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80) ~[storm-core-0.10.0.2.3.2.0-2950.jar:0.10.0.2.3.2.0-2950]
    at backtype.storm.daemon.executor$fn__6214$fn__6227$fn__6278.invoke(executor.clj:808) ~[storm-core-0.10.0.2.3.2.0-2950.jar:0.10.0.2.3.2.0-2950]
    at backtype.storm.util$async_loop$fn__543.invoke(util.clj:475) [storm-core-0.10.0.2.3.2.0-2950.jar:0.10.0.2.3.2.0-2950]
    at clojure.lang.AFn.run(AFn.java:22) [clojure-1.6.0.jar:?]
    at java.lang.Thread.run(Thread.java:745) [?:1.8.0_40]
Caused by: java.lang.IllegalArgumentException: No matching clause: default
    at backtype.storm.daemon.acker$mk_acker_bolt$reify__1380.execute(acker.clj:59) ~[storm-core-0.10.0.2.3.2.0-2950.jar:0.10.0.2.3.2.0-2950]
    at backtype.storm.daemon.acker$_execute.invoke(acker.clj:101) ~[storm-core-0.10.0.2.3.2.0-2950.jar:0.10.0.2.3.2.0-2950]
    at backtype.storm.daemon.acker.execute(Unknown Source) ~[storm-core-0.10.0.2.3.2.0-2950.jar:0.10.0.2.3.2.0-2950]
    at backtype.storm.daemon.executor$fn__6214$tuple_action_fn__6216.invoke(executor.clj:670) ~[storm-core-0.10.0.2.3.2.0-2950.jar:0.10.0.2.3.2.0-2950]
    at backtype.storm.daemon.executor$mk_task_receiver$fn__6137.invoke(executor.clj:426) ~[storm-core-0.10.0.2.3.2.0-2950.jar:0.10.0.2.3.2.0-2950]
    at backtype.storm.disruptor$clojure_handler$reify__5713.onEvent(disruptor.clj:58) ~[storm-core-0.10.0.2.3.2.0-2950.jar:0.10.0.2.3.2.0-2950]
    at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:120) ~[storm-core-0.10.0.2.3.2.0-2950.jar:0.10.0.2.3.2.0-2950]
Created ‎10-21-2015 08:03 PM
Hi Artem,
The default stream id is "default". Check whether the mapper bolt you mention declares a new output stream; this would happen in the declareOutputFields method of that class. Also check the collector.emit call within its execute method. The default behavior is to emit on the "default" stream, so you can confirm which stream the bolt is actually emitting on.
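To make the stream id check concrete, here is a minimal sketch of how a subscription and an emit have to agree on the stream id. It is a simplified stand-in written for illustration, not Storm's API: the class StreamIdSketch and its methods subscribe, emit and receivedOn are hypothetical names. In Storm itself, the two-argument customGrouping(componentId, grouping) subscribes to the "default" stream, and collector.emit without a stream id emits on "default", so a bolt that emits on a named stream needs the three-argument customGrouping(componentId, streamId, grouping) on the consumer side.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified model of stream id matching; this is NOT Storm's API.
public class StreamIdSketch {
    // Same string Storm uses for the implicit stream.
    static final String DEFAULT_STREAM_ID = "default";

    // Tuples delivered per subscribed stream id.
    private final Map<String, List<List<Object>>> delivered = new HashMap<>();

    // Models a consumer subscribing to one stream of the upstream bolt.
    void subscribe(String streamId) {
        delivered.putIfAbsent(streamId, new ArrayList<>());
    }

    // Models an emit without a stream id: it goes to the default stream.
    boolean emit(List<Object> values) {
        return emit(DEFAULT_STREAM_ID, values);
    }

    // Models an emit on an explicitly named stream.
    // Returns false when no consumer subscribed to that stream id.
    boolean emit(String streamId, List<Object> values) {
        List<List<Object>> queue = delivered.get(streamId);
        if (queue == null) {
            return false;
        }
        queue.add(values);
        return true;
    }

    // Number of tuples a consumer of the given stream has received.
    int receivedOn(String streamId) {
        List<List<Object>> queue = delivered.get(streamId);
        return queue == null ? 0 : queue.size();
    }

    public static void main(String[] args) {
        StreamIdSketch router = new StreamIdSketch();
        // The downstream bolt subscribes to the default stream only.
        router.subscribe(DEFAULT_STREAM_ID);

        // Emitting without a stream id reaches the subscriber.
        boolean onDefault = router.emit(Arrays.asList((Object) "row-1"));
        // Emitting on a named stream ("parsed" is a made-up id) does not.
        boolean onNamed = router.emit("parsed", Arrays.asList((Object) "row-2"));

        System.out.println("default stream delivered: " + onDefault);
        System.out.println("named stream delivered: " + onNamed);
        System.out.println("tuples received on default: "
                + router.receivedOn(DEFAULT_STREAM_ID));
    }
}
```

If the mapper bolt turns out to emit on a named stream, the subscription on the HBase bolt would need to name that same stream id.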
Created ‎10-26-2015 09:46 PM
Hi Artem,
Are you trying to implement OpenSOC? Ryan is correct: the default stream id is "default".
