Support Questions


Spark on HBase Read (Java)


Hi,

I'm currently trying to do some simple reads, sorts, aggregations and filters on some data in HBase using Spark (Java). I found some sample code and tried a lot, but nothing really works. Since I'm a beginner, I don't really know how to get this working.

I loaded a dataset of climate data into HBase via bulk load. The table name is "climate" and it has the column families "date", "temp" and "place". The rows themselves are organized as follows:

id,date:dt,temp:AverageTemperature,temp:AverageTemperatureUncertainty,place:City,place:Country,place:Latitude,place:Longitude

I tried some sample code from a guide I found and started working on it. It currently looks like this:

public class IBMREAD {
    public static void main(String[] args) throws Exception {

        // define Spark Context
        SparkConf sparkConf = new SparkConf().setAppName("SparkHBaseTest");
        JavaSparkContext jsc = new JavaSparkContext(sparkConf);
        SQLContext sqlContext = new SQLContext(jsc);

        // create connection with HBase
        Configuration config = null;
        try {
               config = HBaseConfiguration.create();
               config.addResource(new Path("/etc/hbase/conf/core-site.xml"));
               config.addResource(new Path("/etc/hbase/conf/hbase-site.xml"));
               HBaseAdmin.checkHBaseAvailable(config);
               System.out.println("-----------------------HBase is running!");
        } catch (MasterNotRunningException e) {
                System.out.println("--------------------HBase is not running!");
                System.exit(1);
        } catch (Exception ce) {
                ce.printStackTrace();
        }


        System.out.println("--------------------SET TABLES!");
        config.set(TableInputFormat.INPUT_TABLE, "climate");
              
        System.out.println("--------------------Creating hbase rdd!");
        JavaPairRDD<ImmutableBytesWritable, Result> hBaseRDD = 
          jsc.newAPIHadoopRDD(config, TableInputFormat.class, ImmutableBytesWritable.class, Result.class);
        // in rowPairRDD the key is the HBase row key and the value holds the HBase row data
        JavaPairRDD<String, TestData> rowPairRDD = hBaseRDD.mapToPair(
                new PairFunction<Tuple2<ImmutableBytesWritable, Result>, String, TestData>() {
                @Override
                public Tuple2<String, TestData> call(
                    Tuple2<ImmutableBytesWritable, Result> entry) throws Exception {
                    
                        System.out.println("--------------------Getting ID!");
                        Result r = entry._2;
                        String keyRow = Bytes.toString(r.getRow());

                        System.out.println("--------------------Define JavaBean!");
                        TestData cd = new TestData();
                        cd.setId(keyRow);
                        cd.setDt(Bytes.toString(r.getValue(Bytes.toBytes("date"), Bytes.toBytes("dt"))));
                        cd.setAverageTemperature(Bytes.toDouble(r.getValue(Bytes.toBytes("temp"), Bytes.toBytes("AverageTemperature"))));
                        cd.setAverageTemperatureUncertainty(Bytes.toDouble(r.getValue(Bytes.toBytes("temp"), Bytes.toBytes("AverageTemperatureUncertainty"))));
                        cd.setCity(Bytes.toString(r.getValue(Bytes.toBytes("place"), Bytes.toBytes("City"))));
                        cd.setCountry(Bytes.toString(r.getValue(Bytes.toBytes("place"), Bytes.toBytes("Country"))));
                        cd.setLatitude(Bytes.toString(r.getValue(Bytes.toBytes("place"), Bytes.toBytes("Latitude"))));
                        cd.setLongitude(Bytes.toString(r.getValue(Bytes.toBytes("place"), Bytes.toBytes("Longitude"))));
                    return new Tuple2<String, TestData>(keyRow, cd);
            }
        });
        
        System.out.println("--------------------COUNT RDD " + rowPairRDD.count()); 
        System.out.println("--------------------Create DataFrame!");
        DataFrame schemaRDD = sqlContext.createDataFrame(rowPairRDD.values(), TestData.class);
        System.out.println("--------------------Loading Schema");
        schemaRDD.printSchema();
        System.out.println("--------------------COUNT Schema " + schemaRDD.count());
    }
}
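
For completeness, TestData is just a plain serializable JavaBean. A minimal sketch of how it might look (field names assumed to match the setters used above):

    // Minimal sketch of the TestData bean; field names are assumed from the setters above.
    // sqlContext.createDataFrame(..., TestData.class) relies on standard getters/setters.
    public class TestData implements java.io.Serializable {
        private String id;
        private String dt;
        private double averageTemperature;
        private double averageTemperatureUncertainty;
        private String city;
        private String country;
        private String latitude;
        private String longitude;

        public String getId() { return id; }
        public void setId(String id) { this.id = id; }
        public String getDt() { return dt; }
        public void setDt(String dt) { this.dt = dt; }
        public double getAverageTemperature() { return averageTemperature; }
        public void setAverageTemperature(double v) { this.averageTemperature = v; }
        public double getAverageTemperatureUncertainty() { return averageTemperatureUncertainty; }
        public void setAverageTemperatureUncertainty(double v) { this.averageTemperatureUncertainty = v; }
        public String getCity() { return city; }
        public void setCity(String city) { this.city = city; }
        public String getCountry() { return country; }
        public void setCountry(String country) { this.country = country; }
        public String getLatitude() { return latitude; }
        public void setLatitude(String latitude) { this.latitude = latitude; }
        public String getLongitude() { return longitude; }
        public void setLongitude(String longitude) { this.longitude = longitude; }
    }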

The code seems to connect to HBase, which I'm really happy about, and it also seems to find the table "climate", although I'm not sure about that. If I submit this Spark application, I get the following exception:

17/03/21 04:11:41 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, c7010.ambari.apache.org): java.lang.IllegalArgumentException: offset (0) + length (8) exceed the capacity of the array: 5
        at org.apache.hadoop.hbase.util.Bytes.explainWrongLengthOrOffset(Bytes.java:632)
        at org.apache.hadoop.hbase.util.Bytes.toLong(Bytes.java:606)
        at org.apache.hadoop.hbase.util.Bytes.toDouble(Bytes.java:730)
        at org.apache.hadoop.hbase.util.Bytes.toDouble(Bytes.java:721)
        at thesis.test.sparkthesistest.hbase.read.IBMREAD$1.call(IBMREAD.java:75)
        at thesis.test.sparkthesistest.hbase.read.IBMREAD$1.call(IBMREAD.java:62)
        at org.apache.spark.api.java.JavaPairRDD$$anonfun$pairFunToScalaFun$1.apply(JavaPairRDD.scala:1018)
        at org.apache.spark.api.java.JavaPairRDD$$anonfun$pairFunToScalaFun$1.apply(JavaPairRDD.scala:1018)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1633)
        at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1164)
        at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1164)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1882)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1882)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
        at org.apache.spark.scheduler.Task.run(Task.scala:89)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

This exception occurs about three times with different TIDs until Spark aborts the job. What am I doing wrong?
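
One guess I have (not sure if it is right): the error says the stored value is only 5 bytes, while Bytes.toDouble expects an 8-byte encoded double, so maybe my bulk load wrote the temperatures as plain text rather than serialized doubles. If that is the case, would I have to read them as strings and parse them, roughly like this?

    // Guess: if the bulk load stored the CSV values as text (e.g. "12.34" = 5 bytes),
    // the cell cannot be decoded with Bytes.toDouble and would have to be parsed instead.
    private static double parseDoubleCell(Result r, String family, String qualifier) {
        byte[] raw = r.getValue(Bytes.toBytes(family), Bytes.toBytes(qualifier));
        return raw == null ? Double.NaN : Double.parseDouble(Bytes.toString(raw));
    }

    // inside call(...) above:
    // cd.setAverageTemperature(parseDoubleCell(r, "temp", "AverageTemperature"));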

-----------------------------EDIT-------------------------------

I also tried this example:

    SparkConf sparkConf = new SparkConf().setAppName("HBaseRead");
    JavaSparkContext jsc = new JavaSparkContext(sparkConf);
    Configuration conf = HBaseConfiguration.create();
    conf.addResource(new Path("/etc/hbase/conf/core-site.xml"));
    conf.addResource(new Path("/etc/hbase/conf/hbase-site.xml"));
    JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);


    Scan scan = new Scan();
    scan.setCaching(100);

    JavaRDD<Tuple2<ImmutableBytesWritable, Result>> hbaseRdd = hbaseContext.hbaseRDD(TableName.valueOf("climate"), scan);

    System.out.println("Number of Records found : " + hbaseRdd.count());

If I execute this code I get

Exception in thread "dag-scheduler-event-loop" java.lang.NoClassDefFoundError: org/apache/hadoop/hbase/regionserver/StoreFileWriter

Why is it so hard to reach HBase with Spark?

3 REPLIES

Super Guru

@Morten R

There are so many working examples out there, but honestly, only you can debug your code. I suggest you take an easier path and use Phoenix, which you can use like any JDBC driver, with all of its SQL capability. It will simplify your code and also take advantage of Phoenix's distributed nature out of the box.

https://github.com/apache/phoenix/tree/master/phoenix-spark
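
For example, with the phoenix-spark plugin on the classpath, reading a Phoenix table into a DataFrame takes only a few lines. This is just a sketch: the table name CLIMATE, the column names and the ZooKeeper quorum below are placeholders for whatever you define on your cluster.

    // Sketch: read a Phoenix table into a Spark DataFrame via the phoenix-spark plugin.
    // "CLIMATE", the column names and the zkUrl value are placeholders for your own setup.
    SparkConf sparkConf = new SparkConf().setAppName("PhoenixSparkRead");
    JavaSparkContext jsc = new JavaSparkContext(sparkConf);
    SQLContext sqlContext = new SQLContext(jsc);

    DataFrame df = sqlContext.read()
        .format("org.apache.phoenix.spark")
        .option("table", "CLIMATE")
        .option("zkUrl", "your-zookeeper-host:2181")
        .load();

    // filters, sorts and aggregations then come for free on the DataFrame
    df.filter(df.col("COUNTRY").equalTo("Germany"))
      .groupBy("CITY")
      .agg(org.apache.spark.sql.functions.avg("AVERAGETEMPERATURE"))
      .show();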

Super Guru

@Morten R

Not to mention, especially since you are a beginner, you should start directly with DataFrames instead of RDDs. That is where Spark is going...
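
For instance, with the DataFrame you already build from the TestData bean, the reads, sorts, aggregations and filters you mention can be expressed directly. A rough sketch (column names follow your bean properties, so adjust them if yours differ):

    // Sketch: DataFrame operations on the frame created with createDataFrame(..., TestData.class).
    DataFrame climate = sqlContext.createDataFrame(rowPairRDD.values(), TestData.class);

    // filter + sort
    climate.filter(climate.col("country").equalTo("Germany"))
           .orderBy(climate.col("averageTemperature").desc())
           .show(10);

    // aggregation: average temperature per city
    climate.groupBy("city")
           .agg(org.apache.spark.sql.functions.avg("averageTemperature").alias("avgTemp"))
           .orderBy(org.apache.spark.sql.functions.col("avgTemp").desc())
           .show(10);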


Thanks for the answer. I'm doing research for my university, and for that I really want to use Spark on HBase. I did not find a single working example. Why do you suggest not using Spark with HBase directly? Is it hard, or just not a good idea?