Created 10-13-2016 08:51 AM
I am writing a Storm topology to read data from HBase using DRPC. Essentially this performs a scan to get data, enriches the data and returns it.
I can easily get a basic DRPC example working (based on http://storm.apache.org/releases/current/Distributed-RPC.html). However, when I insert the code for the scan, the process takes a very long time, and after a minute I get the following error:
backtype.storm.generated.DRPCExecutionException
at backtype.storm.daemon.drpc$service_handler$reify__8688.failRequest(drpc.clj:136) ~[storm-core-0.10.0.2.4.2.0-258.jar:0.10.0.2.4.2.0-258]
at backtype.storm.drpc.DRPCSpout.fail(DRPCSpout.java:241) ~[storm-core-0.10.0.2.4.2.0-258.jar:0.10.0.2.4.2.0-258]
A short while later, I get org.apache.hadoop.hbase.client.RetriesExhaustedException. This doesn't always happen, but it is very common. Based on this, I assume the problem is one of two things: either the connection to HBase is failing, or the DRPC request is timing out.
I know that the connection to HBase is fine: I have added debugging output and the bolt gets the results. However due to the DRPCExecutionException, these results are never returned over DRPC.
I thought the issue was the DRPC timeout; however, I have increased the DRPC timeout substantially and I get the same result in the same amount of time. After Googling I found someone else with the same issue (http://stackoverflow.com/questions/35940623/stormdrpc-request-failed), but there is no indication of how to fix it.
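(For reference, the timeout in question is controlled by drpc.request.timeout.secs in storm.yaml on the DRPC server nodes; a sketch of the override I mean, with the value chosen arbitrarily:)

```
# storm.yaml on the DRPC server nodes; restart the DRPC servers after changing
drpc.request.timeout.secs: 1800
```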
For reference I am adding my code below:
try (Table table = HbaseClient.connection().getTable(TableName.valueOf("EPG_URI")))
{
    List<Filter> filters = new ArrayList<>();

    String startRowString = "start";
    String endRowString = "end";

    RowFilter startRow = new RowFilter(CompareFilter.CompareOp.GREATER_OR_EQUAL,
            new BinaryPrefixComparator(startRowString.getBytes()));
    filters.add(startRow);
    RowFilter endRow = new RowFilter(CompareFilter.CompareOp.LESS_OR_EQUAL,
            new BinaryPrefixComparator(endRowString.getBytes()));
    filters.add(endRow);

    FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL, filters);

    Scan scan = new Scan();
    scan.addFamily("f1".getBytes());
    scan.setFilter(filterList);

    ResultScanner scanner = table.getScanner(scan);
    for (Result result : scanner)
    {
        hbaseValues.add(result);
    }
}
Thanks in advance for the help.
Created 10-20-2016 03:16 PM
Can you share the full HBase errors/exceptions you see? It's not clear to me from your description whether this should be approached from the Storm side or the HBase side.
Created 10-21-2016 05:14 AM
@Josh Elser It wasn't actually giving me any clear errors or exceptions, simply hanging. However, I successfully fixed it by simplifying my scan as below:
Scan scan = new Scan(startRowString.getBytes(), endRowString.getBytes());
scan.addFamily("f1".getBytes());
ResultScanner scanner = table.getScanner(scan);
for (Result r : scanner)
{...}
Based on that, the problem seemed to be on the HBase side rather than the Storm side: giving the Scan explicit start and stop rows lets HBase seek directly to the relevant range, whereas the RowFilter approach evaluates every row in the table, which is what made the request so slow.
Created 10-21-2016 03:31 PM
Hrm. Curious. Maybe the HBase client API is doing multiple retries before giving up? You could try reducing "Maximum Client Retries" on the Ambari configuration page for HBase from 35 down to 5 or 10, to make the client give up sooner and tell you why it was failing/retrying. Just a guess given what you've been able to provide so far.
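(If going through Ambari isn't convenient, the same knob can be set directly in the client's hbase-site.xml; as far as I know, "Maximum Client Retries" maps to the hbase.client.retries.number property, so a sketch would be:)

```
<property>
  <name>hbase.client.retries.number</name>
  <value>5</value>
</property>
```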