06-07-2017 03:23 PM
4 Kudos
Well, it's a good question. I will first explain what Field Grouping is and how a Window Bolt works, then give my suggestions on how to solve this problem.

1) Field Grouping and Window Bolt

When you use fields grouping on some key, Storm guarantees that tuples with the same key flow into the same bolt. But with a window bolt (BaseWindowedBolt), in your case averaging the last 3 events, no matter how many bolt instances you have, Storm does not guarantee that 3 successive tuples with the same key land in the same window of the same bolt. I will explain with the following example.

First we define a window bolt with a tumbling window of 3 tuples. Then we connect this window bolt to the upstream spout with fieldsGrouping on "key". For convenience we use some abbreviations: D stands for deviceid, S for siteid, P for ip, T for timestamp, V for val. You want data with the same device_site_ip (D_S_P) to flow into the same bolt within the same window, so we combine D_S_P as the key and use K to stand for it. Then assume the following 15 test tuples:

1 K1, T1, V1
2 K2, T2, V2
3 K3, T3, V3
4 K1, T4, V4
5 K1, T5, V5
6 K1, T6, V6
7 K3, T7, V7
8 K2, T8, V8
9 K3, T9, V9
10 K4, T10, V10
11 K4, T11, V11
12 K1, T12, V12
13 K4, T13, V13
14 K3, T14, V14
15 K2, T15, V15

The core wiring code we want is as follows (AvgWindowBolt here stands for whatever BaseWindowedBolt subclass you define; the original variable winBolt cannot also serve as the class name):

BaseWindowedBolt winBolt = new AvgWindowBolt().withTumblingWindow(new BaseWindowedBolt.Count(3));
builder.setBolt("winBolt", winBolt, boltCount).fieldsGrouping("spout", new Fields("key"));

We assume there are 2 winBolt instances (Bolt1 and Bolt2). Fields grouping computes the hash code of the key and routes keys with the same hash code to the same bolt instance (a simplified routing sketch follows after the two lists below). Say the hash codes of K1, K2, K3, K4 are H1, H2, H3, H4, and that all H1 and H3 tuples flow into Bolt1 while all H2 and H4 tuples flow into Bolt2. In Bolt1 we then have these 9 tuples:

1 K1, T1, V1
3 K3, T3, V3
4 K1, T4, V4
5 K1, T5, V5
6 K1, T6, V6
7 K3, T7, V7
9 K3, T9, V9
12 K1, T12, V12
14 K3, T14, V14

In Bolt2 we have these 6 tuples:

2 K2, T2, V2
8 K2, T8, V8
10 K4, T10, V10
11 K4, T11, V11
13 K4, T13, V13
15 K2, T15, V15
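
To make the hash routing mentioned above concrete, here is a simplified sketch. It is not Storm's actual implementation (which hashes the list of grouping field values), but the effect is the same: equal keys always land on the same task.

import java.util.Arrays;

// Simplified illustration of how fields grouping picks a target task:
// hash the grouping key and take it modulo the number of bolt tasks.
// Which bolt a given key really lands on depends on its actual hash
// code; the point is only that the mapping is deterministic per key.
public class FieldsGroupingSketch {
    public static int targetTask(String key, int numTasks) {
        return Math.abs(key.hashCode() % numTasks);
    }

    public static void main(String[] args) {
        for (String k : Arrays.asList("K1", "K2", "K3", "K4")) {
            System.out.println(k + " -> Bolt" + (targetTask(k, 2) + 1));
        }
    }
}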
The per-bolt contents shown above are what fields grouping alone produces. As for the window bolt's behavior, with a 3-tuple tumbling window you will see the following happen.

In Bolt1, the first window is:

1 K1, T1, V1
3 K3, T3, V3
4 K1, T4, V4

The second window is:

5 K1, T5, V5
6 K1, T6, V6
7 K3, T7, V7

The third window is:

9 K3, T9, V9
12 K1, T12, V12
14 K3, T14, V14

In Bolt2, the first window is:

2 K2, T2, V2
8 K2, T8, V8
10 K4, T10, V10

The second window is:

11 K4, T11, V11
13 K4, T13, V13
15 K2, T15, V15

Now you can see why the result does not match the expectation that the same key lands in the same bolt within the same window.

2) My suggestions on how to solve this problem

I have 2 methods to solve this. The first one is simpler and only uses Storm. We don't need a window bolt, just fields grouping, which guarantees the same key flows to the same bolt worker. Then, in the downstream bolt, we keep a HashMap whose key is the (D_S_P) pair and whose value is a fixed-size queue (which you should implement yourself) that remembers the last N tuples for that key. Each time a new tuple arrives at this bolt worker, we use its key to find the fixed-size queue for that key, add the tuple to the queue, and if necessary evict the oldest tuple. After updating the queue, we calculate the current average of the last N tuples and emit it. The disadvantage of this approach is that the HashMap is in memory, so there is no persistence: each time we relaunch the topology we have to rebuild the HashMap and re-accumulate the fixed-size queue for each key. (A sketch follows below.)

The second method is to use HBase as storage. In Storm we then need neither fields grouping nor a window bolt; shuffle grouping is enough. For each tuple, after some parsing, we put it into an HBase table whose row key is the (D_S_P) key and whose column family is configured to keep N versions, so the table retains the last N values for each key. We then read those N versions for the current key back from HBase, calculate their average, and emit it. (A second sketch follows below.)
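
A minimal sketch of the first approach, assuming the upstream spout declares the fields ("key", "timestamp", "val"); the class name, field names, and N are illustrative, not from the original question:

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

// Hypothetical rolling-average bolt: one fixed-size queue of the
// last N values per (D_S_P) key, held in an in-memory HashMap.
public class RollingAverageBolt extends BaseRichBolt {
    private static final int N = 3;                  // last N tuples per key
    private Map<String, Deque<Double>> lastValues;   // key -> fixed-size queue
    private OutputCollector collector;

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.lastValues = new HashMap<>();
        this.collector = collector;
    }

    @Override
    public void execute(Tuple tuple) {
        String key = tuple.getStringByField("key");  // the combined D_S_P key
        double val = tuple.getDoubleByField("val");

        // Fetch (or create) this key's queue, append, evict the oldest if over N.
        Deque<Double> queue = lastValues.computeIfAbsent(key, k -> new ArrayDeque<>());
        queue.addLast(val);
        if (queue.size() > N) {
            queue.removeFirst();
        }

        // Average whatever we have so far (up to N values) and emit it.
        double avg = queue.stream().mapToDouble(Double::doubleValue).average().orElse(0.0);
        collector.emit(new Values(key, avg));
        collector.ack(tuple);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("key", "avg"));
    }
}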
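
And a hedged sketch of the second approach using the HBase client API. The table name, column family, and N are assumptions for illustration, and the column family must be created with VERSIONS => N so HBase keeps the last N cell versions per row key:

import java.io.IOException;
import java.util.List;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

// Hypothetical helper: the table "device_vals" with family "m" is assumed
// to exist, created with VERSIONS => N so each row keeps its last N values.
public class HBaseRollingAverage {
    private static final int N = 3;
    private static final byte[] FAMILY = Bytes.toBytes("m");
    private static final byte[] QUALIFIER = Bytes.toBytes("val");

    // Store the newest value for this D_S_P key; older versions stay in HBase.
    public static void putValue(Connection conn, String dspKey, double val) throws IOException {
        try (Table table = conn.getTable(TableName.valueOf("device_vals"))) {
            Put put = new Put(Bytes.toBytes(dspKey));
            put.addColumn(FAMILY, QUALIFIER, Bytes.toBytes(val));
            table.put(put);
        }
    }

    // Read back up to N versions for this key and average them.
    public static double averageLastN(Connection conn, String dspKey) throws IOException {
        try (Table table = conn.getTable(TableName.valueOf("device_vals"))) {
            Get get = new Get(Bytes.toBytes(dspKey));
            get.setMaxVersions(N);                   // ask for the last N cell versions
            Result result = table.get(get);
            List<Cell> cells = result.getColumnCells(FAMILY, QUALIFIER);
            double sum = 0;
            for (Cell cell : cells) {
                sum += Bytes.toDouble(CellUtil.cloneValue(cell));
            }
            return cells.isEmpty() ? 0.0 : sum / cells.size();
        }
    }
}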
01-24-2017 12:31 AM
@Bikas, is it correct to create the LivyClient object during application startup and then use that same object for all the API requests that submit the jar?
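
For reference, a minimal sketch of the pattern being asked about, assuming the Livy programmatic Java API; the URL and jar path are placeholders (older Livy releases use the com.cloudera.livy package instead of org.apache.livy):

import java.io.File;
import java.net.URI;
import org.apache.livy.LivyClient;
import org.apache.livy.LivyClientBuilder;

// Hypothetical singleton holder: build one LivyClient at startup and
// reuse it for every jar upload / job submission.
public final class LivyClientHolder {
    private static LivyClient client;

    public static synchronized LivyClient get() throws Exception {
        if (client == null) {
            client = new LivyClientBuilder()
                    .setURI(new URI("http://livy-host:8998"))
                    .build();
            // Upload the application jar once, right after startup.
            client.uploadJar(new File("/path/to/app.jar")).get();
        }
        return client;
    }
}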
09-12-2016 05:16 PM
3 Kudos
I am assuming you are using the Hortonworks Data Platform. Create an external Hive table pointing to the HDFS location of these CSV files. Once the data lands on your server, move the files to that HDFS location with a file-watcher job such as incron (an inotify-based cron), which triggers as soon as a file has finished copying into the source directory. You could then read the data with SELECT statements from your Java program via JDBC, REST, or whatever suits you; a sketch follows below.
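
A minimal Java sketch of the JDBC route, assuming HiveServer2 is reachable at a placeholder URL; the table and column names are illustrative, not from this thread:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

// Hypothetical reader for the external table described above, assumed
// to have been created along the lines of:
//   CREATE EXTERNAL TABLE csv_events (col1 STRING, col2 STRING)
//   ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
//   LOCATION '/data/incoming/csv';
public class HiveCsvReader {
    public static void main(String[] args) throws Exception {
        Class.forName("org.apache.hive.jdbc.HiveDriver");   // HiveServer2 JDBC driver
        try (Connection conn = DriverManager.getConnection(
                     "jdbc:hive2://hive-host:10000/default", "hive", "");
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("SELECT * FROM csv_events LIMIT 10")) {
            while (rs.next()) {
                System.out.println(rs.getString(1));
            }
        }
    }
}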
09-12-2016 11:58 PM
Yes. That is exactly what I wanted. Thank you!