<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>question Spark On Hbase Read (Java) in Support Questions</title>
    <link>https://community.cloudera.com/t5/Support-Questions/Spark-On-Hbase-Read-Java/m-p/165976#M128328</link>
    <description>&lt;P&gt;Hi,&lt;/P&gt;&lt;P&gt;I'm currently trying to run some simple reads, sorts, aggregations, and filters on some data in HBase using Spark (Java). I found some sample code and tried a lot, but nothing really works. Since I'm a beginner, I don't really know how to get this working.&lt;/P&gt;&lt;P&gt;I loaded a dataset of climate data into HBase via bulk load. The table name is "climate" and it has the column families "date", "temp", and "place". The rows themselves are organized as follows:&lt;/P&gt;&lt;PRE&gt;id,date:dt,temp:AverageTemperature,temp:AverageTemperatureUncertainty,place:City,place:Country,place:Latitude,place:Longitude&lt;/PRE&gt;&lt;P&gt;I took some sample code from a guide I found and started working on it. It currently looks like this:&lt;/P&gt;&lt;PRE&gt;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import scala.Tuple2;

public class IBMREAD {
    public static void main(String[] args) throws Exception {

        // define Spark Context
        SparkConf sparkConf = new SparkConf().setAppName("SparkHBaseTest");
        JavaSparkContext jsc = new JavaSparkContext(sparkConf);
        SQLContext sqlContext = new SQLContext(jsc);     

        // create connection with HBase
        Configuration config = null;
        try {
               config = HBaseConfiguration.create();
               config.addResource(new Path("/etc/hbase/conf/core-site.xml"));
               config.addResource(new Path("/etc/hbase/conf/hbase-site.xml"));
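               // fail fast if the HBase master cannot be reached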
               HBaseAdmin.checkHBaseAvailable(config);
               System.out.println("-----------------------HBase is running!");
        }
        catch (MasterNotRunningException e) {
                System.out.println("--------------------HBase is not running!");
                System.exit(1);
        } catch (Exception ce) {
                ce.printStackTrace();
        }


        System.out.println("--------------------SET TABLES!");
        config.set(TableInputFormat.INPUT_TABLE, "climate");
              
        System.out.println("--------------------Creating hbase rdd!");
        JavaPairRDD&amp;lt;ImmutableBytesWritable, Result&amp;gt; hBaseRDD = 
          jsc.newAPIHadoopRDD(config, TableInputFormat.class, ImmutableBytesWritable.class, Result.class);
        // in the rowPairRDD the key is hbase's row key, The Row is the hbase's Row data 
        JavaPairRDD&amp;lt;String, TestData&amp;gt; rowPairRDD = hBaseRDD.mapToPair(
                new PairFunction&amp;lt;Tuple2&amp;lt;ImmutableBytesWritable, Result&amp;gt;, String, TestData&amp;gt;() {
                @Override
                public Tuple2&amp;lt;String, TestData&amp;gt; call(
                    Tuple2&amp;lt;ImmutableBytesWritable, Result&amp;gt; entry) throws Exception {
                    
                        System.out.println("--------------------Getting ID!");
                        Result r = entry._2;
                        String keyRow = Bytes.toString(r.getRow());

                        System.out.println("--------------------Define JavaBean!");
                        TestData cd = new TestData();
                        cd.setId(keyRow);
                        cd.setDt(Bytes.toString(r.getValue(Bytes.toBytes("date"), Bytes.toBytes("dt"))));
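                        // NOTE: Bytes.toDouble expects a raw 8-byte encoded double in the cell;
                        // cells bulk-loaded from CSV text are usually UTF-8 strings instead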
                        cd.setAverageTemperature(Bytes.toDouble(r.getValue(Bytes.toBytes("temp"), Bytes.toBytes("AverageTemperature"))));
                        cd.setAverageTemperatureUncertainty(Bytes.toDouble(r.getValue(Bytes.toBytes("temp"), Bytes.toBytes("AverageTemperatureUncertainty"))));
                        cd.setCity(Bytes.toString(r.getValue(Bytes.toBytes("place"), Bytes.toBytes("City"))));
                        cd.setCountry(Bytes.toString(r.getValue(Bytes.toBytes("place"), Bytes.toBytes("Country"))));
                        cd.setLatitude(Bytes.toString(r.getValue(Bytes.toBytes("place"), Bytes.toBytes("Latitude"))));
                        cd.setLongitude(Bytes.toString(r.getValue(Bytes.toBytes("place"), Bytes.toBytes("Longitude"))));
                    return new Tuple2&amp;lt;String, TestData&amp;gt;(keyRow, cd);
            }
        });
        
        System.out.println("--------------------COUNT RDD " + rowPairRDD.count()); 
        System.out.println("--------------------Create DataFrame!");
        DataFrame schemaRDD = sqlContext.createDataFrame(rowPairRDD.values(), TestData.class);
        System.out.println("--------------------Loading Schema");
        schemaRDD.printSchema();
        System.out.println("--------------------COUNT Schema " + schemaRDD.count());
    }
}
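
// For reference, a minimal TestData JavaBean that this snippet assumes (the
// original post does not show it). Spark's createDataFrame infers the schema
// from these getters/setters, and instances must be Serializable to cross tasks.
class TestData implements java.io.Serializable {
    private String id, dt, city, country, latitude, longitude;
    private double averageTemperature, averageTemperatureUncertainty;
    public String getId() { return id; }
    public void setId(String id) { this.id = id; }
    public String getDt() { return dt; }
    public void setDt(String dt) { this.dt = dt; }
    public double getAverageTemperature() { return averageTemperature; }
    public void setAverageTemperature(double v) { this.averageTemperature = v; }
    public double getAverageTemperatureUncertainty() { return averageTemperatureUncertainty; }
    public void setAverageTemperatureUncertainty(double v) { this.averageTemperatureUncertainty = v; }
    public String getCity() { return city; }
    public void setCity(String city) { this.city = city; }
    public String getCountry() { return country; }
    public void setCountry(String country) { this.country = country; }
    public String getLatitude() { return latitude; }
    public void setLatitude(String latitude) { this.latitude = latitude; }
    public String getLongitude() { return longitude; }
    public void setLongitude(String longitude) { this.longitude = longitude; }
}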
&lt;/PRE&gt;&lt;P&gt;The code seems to connect to HBase, which I'm really happy about, and it seems to find the "climate" table, though I'm not sure about that. If I submit this Spark application, I get the following exception:&lt;/P&gt;&lt;PRE&gt;17/03/21 04:11:41 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, c7010.ambari.apache.org): java.lang.IllegalArgumentException: offset (0) + length (8) exceed the capacity of the array: 5
        at org.apache.hadoop.hbase.util.Bytes.explainWrongLengthOrOffset(Bytes.java:632)
        at org.apache.hadoop.hbase.util.Bytes.toLong(Bytes.java:606)
        at org.apache.hadoop.hbase.util.Bytes.toDouble(Bytes.java:730)
        at org.apache.hadoop.hbase.util.Bytes.toDouble(Bytes.java:721)
        at thesis.test.sparkthesistest.hbase.read.IBMREAD$1.call(IBMREAD.java:75)
        at thesis.test.sparkthesistest.hbase.read.IBMREAD$1.call(IBMREAD.java:62)
        at org.apache.spark.api.java.JavaPairRDD$$anonfun$pairFunToScalaFun$1.apply(JavaPairRDD.scala:1018)
        at org.apache.spark.api.java.JavaPairRDD$$anonfun$pairFunToScalaFun$1.apply(JavaPairRDD.scala:1018)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1633)
        at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1164)
        at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1164)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1882)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1882)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
        at org.apache.spark.scheduler.Task.run(Task.scala:89)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
&lt;/PRE&gt;&lt;P&gt;This exception occurs about three times with different TIDs until Spark aborts the job. What am I doing wrong?&lt;/P&gt;
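&lt;P&gt;A note on the trace: Bytes.toDouble expects the cell to contain a raw 8-byte encoded double, but a CSV bulk load typically stores every value as a UTF-8 string, which would explain why the failing cell is only 5 bytes long. A minimal sketch of reading such cells as text and parsing them instead (assuming string-encoded values):&lt;/P&gt;&lt;PRE&gt;byte[] raw = r.getValue(Bytes.toBytes("temp"), Bytes.toBytes("AverageTemperature"));
if (raw != null) {
    cd.setAverageTemperature(Double.parseDouble(Bytes.toString(raw)));
}&lt;/PRE&gt;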
&lt;P&gt;-----------------------------EDIT-------------------------------&lt;/P&gt;&lt;P&gt;I also tried this example:&lt;/P&gt;&lt;PRE&gt;    SparkConf sparkConf = new SparkConf().setAppName("HBaseRead");
    JavaSparkContext jsc = new JavaSparkContext(sparkConf);
    Configuration conf = HBaseConfiguration.create();
    conf.addResource(new Path("/etc/hbase/conf/core-site.xml"));
    conf.addResource(new Path("/etc/hbase/conf/hbase-site.xml"));
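    // JavaHBaseContext comes from the hbase-spark module and wires the HBase configuration into Spark jobs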
    JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);


    Scan scan = new Scan();
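    // scanner caching: fetch 100 rows per RPC round trip to the region servers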
    scan.setCaching(100);

    JavaRDD&amp;lt;Tuple2&amp;lt;ImmutableBytesWritable, Result&amp;gt;&amp;gt; hbaseRdd = hbaseContext.hbaseRDD(TableName.valueOf("climate"), scan);

    System.out.println("Number of Records found : " + hbaseRdd.count());
&lt;/PRE&gt;&lt;P&gt;If I execute this code, I get:&lt;/P&gt;&lt;PRE&gt;Exception in thread "dag-scheduler-event-loop" java.lang.NoClassDefFoundError: org/apache/hadoop/hbase/regionserver/StoreFileWriter
&lt;/PRE&gt;&lt;P&gt;Why is it so hard to reach HBase from Spark?&lt;/P&gt;
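&lt;P&gt;A guess on this second error: org.apache.hadoop.hbase.regionserver.StoreFileWriter only exists in newer HBase releases, so the hbase-spark jar was probably built against a different HBase version than the one running on the cluster. Matching the connector build to the cluster's HBase and shipping the HBase jars with the job might help; a sketch of the submit command (the jar names and HDP-style paths are placeholders, not verified):&lt;/P&gt;&lt;PRE&gt;spark-submit \
  --class thesis.test.sparkthesistest.hbase.read.IBMREAD \
  --master yarn-client \
  --jars /usr/hdp/current/hbase-client/lib/hbase-client.jar,/usr/hdp/current/hbase-client/lib/hbase-common.jar,/usr/hdp/current/hbase-client/lib/hbase-server.jar,/usr/hdp/current/hbase-client/lib/hbase-protocol.jar \
  my-spark-hbase-app.jar&lt;/PRE&gt;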
</description>
    <pubDate>Tue, 21 Mar 2017 00:17:52 GMT</pubDate>
    <dc:creator>morten_riedel</dc:creator>
    <dc:date>2017-03-21T00:17:52Z</dc:date>
  </channel>
</rss>