
Storm filtering kafka events for calculating moving average

I have a requirement to consume Kafka events and calculate a moving average.

The structure of a Kafka event looks like:

{'deviceid':'a','siteid':'s1','ip':'i1','timestamp':123,'val':1}

The same timestamp can appear for different deviceid, siteid and ip values. I need to calculate the moving average for a particular deviceid/siteid/ip combination. The events are pushed to the Kafka queue from different devices/IPs.

I have tried field grouping in the bolt and using the withWindow function to calculate the average of the last 3 events.

When I tested, the events were not grouped by the fields: I get tuples for different keys in my bolt's TupleWindow, i.e. the events are not being filtered.

Appreciate your help! @Bryan Bende

1 ACCEPTED SOLUTION

Cloudera Employee

Well, it's a good question. I will first explain what Field Grouping is and how a Window Bolt works, then give my suggestions on how to solve this problem.

1) Field Grouping and Window Bolt

When you use field grouping on some key, Storm guarantees that tuples with the same key flow into the same bolt instance.

But with a window bolt (BaseWindowedBolt), in your case averaging the last 3 events, no matter how many bolt instances you have, Storm will not guarantee that 3 successive tuples of the same key fall into the same window.

I will explain it with the following example:

First we define a window bolt with a tumbling window of 3 tuples.

Then we connect this window bolt to the upstream spout with fieldsGrouping on "key".

In your case, for convenience, we use the following abbreviations:

D stands for deviceid,

S stands for siteid,

P stands for ip,

T stands for timestamp,

V stands for val

You want data with the same device_site_ip (D_S_P) combination to flow into the same bolt within the same window, so we can combine D_S_P into one key and use K to stand for it. Assume the following 15 test tuples:

1  K1, T1, V1
2  K2, T2, V2
3  K3, T3, V3
4  K1, T4, V4
5  K1, T5, V5
6  K1, T6, V6
7  K3, T7, V7
8  K2, T8, V8
9  K3, T9, V9
10 K4, T10, V10
11 K4, T11, V11
12 K1, T12, V12
13 K4, T13, V13
14 K3, T14, V14
15 K2, T15, V15
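As a side note, building the composite D_S_P key from the raw event fields is a one-liner; a sketch (class and method names are mine, for illustration only):

```java
public class KeyBuilder {
    // Combine deviceid, siteid and ip into the single grouping key (D_S_P).
    public static String buildKey(String deviceid, String siteid, String ip) {
        return deviceid + "_" + siteid + "_" + ip;
    }
}
```

The spout (or a parsing bolt) would emit this string in a "key" field, which the field grouping then hashes.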

The core code we want to use is as follows:

BaseWindowedBolt winBolt = new WinBolt().withTumblingWindow(new BaseWindowedBolt.Count(3)); // WinBolt extends BaseWindowedBolt
builder.setBolt("winBolt", winBolt, boltCount).fieldsGrouping("spout", new Fields("key"));

We assume there are 2 winBolt instances (Bolt1 and Bolt2), with field grouping and the window bolt.

Field grouping calculates the hash code of the key and routes tuples with the same hash code to the same bolt instance.

Say the hash code for K1 is H1, for K2 is H2, for K3 is H3, and for K4 is H4.

Then all H1 and H3 tuples flow into Bolt1, and all H2 and H4 tuples flow into Bolt2.
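Conceptually, the routing can be sketched as hashing the key modulo the number of bolt tasks (an illustration of the idea, not Storm's exact internal code):

```java
public class FieldGroupingDemo {
    // Same key -> same hash -> same bolt task index.
    public static int taskFor(String key, int numTasks) {
        return Math.floorMod(key.hashCode(), numTasks);
    }

    public static void main(String[] args) {
        for (String k : new String[]{"K1", "K2", "K3", "K4"}) {
            System.out.println(k + " -> Bolt" + (taskFor(k, 2) + 1));
        }
        // With 2 tasks, K1 and K3 happen to share one task and K2 and K4 the
        // other, matching the Bolt1/Bolt2 split in this example.
    }
}
```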

In Bolt1, we have these 9 tuples:

1  K1, T1, V1
3  K3, T3, V3
4  K1, T4, V4
5  K1, T5, V5
6  K1, T6, V6
7  K3, T7, V7
9  K3, T9, V9
12 K1, T12, V12
14 K3, T14, V14

In Bolt2, we have these 6 tuples:

2  K2, T2, V2
8  K2, T8, V8
10 K4, T10, V10
11 K4, T11, V11
13 K4, T13, V13
15 K2, T15, V15

The distribution above inside each bolt is the result of field grouping alone.

As for the window bolt, with a 3-tuple tumbling window, the following happens:

In Bolt1:

First window we got is:

1  K1, T1, V1
3  K3, T3, V3
4  K1, T4, V4

Second window we got is:

5  K1, T5, V5
6  K1, T6, V6
7  K3, T7, V7

Third window we got is:

9  K3, T9, V9
12 K1, T12, V12
14 K3, T14, V14

In Bolt2:

First window we got is:

2  K2, T2, V2
8  K2, T8, V8
10 K4, T10, V10

Second window we got is:

11 K4, T11, V11
13 K4, T13, V13
15 K2, T15, V15

Now you can see why the result does not match the expectation that the same key lands in the same bolt in the same window: field grouping only fixes which bolt a key's tuples go to; each window then simply takes the next 3 tuples to arrive at that bolt, whatever their keys.
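The walkthrough above is easy to reproduce in code: a count-based tumbling window just slices the per-bolt arrival order into fixed-size chunks, ignoring keys entirely. A sketch over the keys of the 9 tuples that reached Bolt1:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class TumblingWindowDemo {
    // Slice a stream into consecutive chunks of `size`, the way a count-based
    // tumbling window does: arrival order only, keys ignored.
    public static List<List<String>> windows(List<String> stream, int size) {
        List<List<String>> out = new ArrayList<>();
        for (int i = 0; i + size <= stream.size(); i += size) {
            out.add(new ArrayList<>(stream.subList(i, i + size)));
        }
        return out;
    }

    public static void main(String[] args) {
        // Keys of the 9 tuples that reached Bolt1 in the walkthrough above.
        List<String> bolt1Keys = Arrays.asList("K1", "K3", "K1", "K1", "K1", "K3", "K3", "K1", "K3");
        System.out.println(windows(bolt1Keys, 3));
        // Every window mixes K1 and K3, so an average over one window mixes devices.
    }
}
```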

2) My suggestions for how to solve this problem

I have two methods to solve this.

The first method is simpler and only uses Storm. We don't need a window bolt, just field grouping, which guarantees that the same key flows to the same bolt worker. In the downstream bolt we keep a HashMap whose key is the (D_S_P) pair and whose value is a fixed-size queue (which you implement yourself) holding the last N tuples for that key. Each time a new tuple arrives at the bolt worker, we use its key to find that key's queue, add the tuple, and if necessary remove the oldest tuple from the queue. After updating, we calculate the current average of the last N values and emit it. The disadvantage of this approach is that the HashMap lives in memory, so there is no data persistence: each time we relaunch the topology we have to rebuild the HashMap and re-accumulate the fixed-size queue for each key.
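A minimal sketch of this in-memory approach (class and method names are mine, not from Storm); in a real topology this state would live inside the bolt and update() would be called from execute() before emitting the result:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

public class MovingAverageState {
    private final int n;                                   // window length, e.g. 3
    private final Map<String, Deque<Double>> lastN = new HashMap<>();

    public MovingAverageState(int n) { this.n = n; }

    // Record one value for a key and return the average of that key's last n values.
    public double update(String key, double val) {
        Deque<Double> q = lastN.computeIfAbsent(key, k -> new ArrayDeque<>());
        q.addLast(val);
        if (q.size() > n) q.removeFirst();                 // evict the oldest value
        double sum = 0;
        for (double v : q) sum += v;
        return sum / q.size();
    }
}
```

Because of the field grouping, each key's queue lives on exactly one bolt instance, so no cross-worker coordination is needed.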

The second method is to use HBase as storage. In Storm we then need neither field grouping nor a window bolt; shuffle grouping is enough. For each tuple, after some parsing, we put it into an HBase table whose row key is the (D_S_P) key, with the column family's maximum number of cell versions set to N, so the table keeps the last N values per key. We then read those N versions for the current key back from HBase, calculate their average, and emit it.
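For illustration, in the HBase shell the N = 3 setup would look something like this (table and column family names are made up):

```
create 'device_vals', {NAME => 'v', VERSIONS => 3}
```

Each put on row key D_S_P then keeps only the 3 most recent cell versions, which a Get with setMaxVersions(3) reads back for averaging.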


5 REPLIES

When calling setBolt() on the TopologyBuilder, are you then using fieldsGrouping() to link it to a particular component/stream? And you do understand that each bolt instance won't receive only a single combination of deviceid/siteid/ip, but that all tuples of a given combination should go to the same bolt instance, right?

Maybe you could show some code so someone could help you better, as it does sound like you are going down the right path. It would also be interesting (maybe even helpful) to know how many instances of the bolt you have, as well as a guesstimate of the number of distinct deviceid/siteid/ip combinations.

The input from the Kafka queue:

{'key':'deviceid1_siteid1_ipaddress1','timestamp':12345678.123,'x':12,'y':13,'z':14}
{'key':'deviceid2_siteid1_ipaddress2','timestamp':12345678.123,'x':13,'y':13,'z':14}
{'key':'deviceid1_siteid1_ipaddress3','timestamp':12345678.123,'x':12,'y':13,'z':14}
{'key':'deviceid1_siteid2_ipaddress1','timestamp':12345678.123,'x':15,'y':13,'z':14}
{'key':'deviceid3_siteid3_ipaddress1','timestamp':12345678.123,'x':12,'y':14,'z':14}
{'key':'deviceid3_siteid2_ipaddress2','timestamp':12345678.123,'x':13,'y':13,'z':14}
{'key':'deviceid2_siteid1_ipaddress3','timestamp':12345678.123,'x':16,'y':13,'z':6}

I need to filter the events by key, sort them by timestamp, and calculate the moving average of the last 3 events.

Topology code:

builder.setSpout("kafkaSpout", kafkaSpout, 1);

builder.setBolt("movingSpeedBolt", movingSpeedBolt, 1).fieldsGrouping("kafkaSpout", new Fields("key"));

The distinct number of deviceid/siteid/ip combinations would be 800.

Is that tuple definition of key and timestamp part of the declareOutputFields() method of your spout? My Topology code (snippet below) did NOT have a chance to declare output fields from my Kafka bolt (or maybe I just didn't wire it up right).

TopologyBuilder builder = new TopologyBuilder();
BrokerHosts hosts = new ZkHosts("zk1:2181,zk2:2181,zk3:2181");

SpoutConfig sc = new SpoutConfig(hosts,
        "s20-logs", "/s20-logs",
        UUID.randomUUID().toString());
sc.scheme = new SchemeAsMultiScheme(new StringScheme());

KafkaSpout spout = new KafkaSpout(sc);
builder.setSpout("log-spout", spout, 1);

builder.setBolt("message-tokenizer",
        new MessageTokenizerBolt(), 1)
        .shuffleGrouping("log-spout");

My Kafka messages were just a long tab-separated string of values so my MessageTokenizerBolt (shown below) broke this apart and declared fields, such as ip-address, that could then later be used in a fieldsGrouping() further in the topology.

public class MessageTokenizerBolt extends BaseBasicBolt {
    public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
        // Split the tab-separated log line into its fields
        String[] logElements = StringUtils.split(tuple.getString(0), '\t');
        String ipAddress      = logElements[2];
        String messageType    = logElements[3];
        String messageDetails = logElements[4];
        // Emit in the order of the field names declared below
        basicOutputCollector.emit(new Values(ipAddress, messageType, messageDetails));
    }
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("ip-address", "message-type", "message-details"));
    }
}

I'm guessing this isn't your problem as I was thinking you'd get a runtime exception if the field name you were trying to group on wasn't declared as a field name in the stream you are listening to.

Maybe you can provide a bit more of your code?

I am using a custom scheme which outputs the values (key, timestamp, x, y, z):

public class KeyScheme implements Scheme {
    public List<Object> deserialize(ByteBuffer buffer) {
        // ... parse the raw message into key, timestamp, x, y, z ...
        return new Values(key, timestamp, x, y, z);
    }
    public Fields getOutputFields() {
        return new Fields("key", "timestamp", "x", "y", "z");
    }
}

Topology:

kafkaConf.scheme = new SchemeAsMultiScheme(new KeyScheme());
