Created 03-20-2017 05:17 PM
Hi,
I'm currently trying to do some simple reads, sorts, aggregations and filters on some data in HBase using Spark (Java). I found some sample code and tried a lot, but nothing really works. Since I'm a beginner, I don't really know how to get this to work.
I loaded a dataset of climate data into HBase via bulk load. The table name is "climate" and it has the column families "date", "temp" and "place". The rows themselves are organized as follows:
id,date:dt,temp:AverageTemperature,temp:AverageTemperatureUncertainty,place:City,place:Country,place:Latitude,place:Longitude
I tried some sample code from a guide I found and started to work on it. It currently looks like this:
public class IBMREAD {

    public static void main(String[] args) throws Exception {

        // define Spark Context
        SparkConf sparkConf = new SparkConf().setAppName("SparkHBaseTest");
        JavaSparkContext jsc = new JavaSparkContext(sparkConf);
        SQLContext sqlContext = new SQLContext(jsc);

        // create connection with HBase
        Configuration config = null;
        try {
            config = HBaseConfiguration.create();
            config.addResource(new Path("/etc/hbase/conf/core-site.xml"));
            config.addResource(new Path("/etc/hbase/conf/hbase-site.xml"));
            HBaseAdmin.checkHBaseAvailable(config);
            System.out.println("-----------------------HBase is running!");
        } catch (MasterNotRunningException e) {
            System.out.println("--------------------HBase is not running!");
            System.exit(1);
        } catch (Exception ce) {
            ce.printStackTrace();
        }

        System.out.println("--------------------SET TABLES!");
        config.set(TableInputFormat.INPUT_TABLE, "climate");

        System.out.println("--------------------Creating hbase rdd!");
        JavaPairRDD<ImmutableBytesWritable, Result> hBaseRDD =
                jsc.newAPIHadoopRDD(config, TableInputFormat.class,
                        ImmutableBytesWritable.class, Result.class);

        // in the rowPairRDD the key is hbase's row key, The Row is the hbase's Row data
        JavaPairRDD<String, TestData> rowPairRDD = hBaseRDD.mapToPair(
                new PairFunction<Tuple2<ImmutableBytesWritable, Result>, String, TestData>() {
                    @Override
                    public Tuple2<String, TestData> call(
                            Tuple2<ImmutableBytesWritable, Result> entry) throws Exception {
                        System.out.println("--------------------Getting ID!");
                        Result r = entry._2;
                        String keyRow = Bytes.toString(r.getRow());

                        System.out.println("--------------------Define JavaBean!");
                        TestData cd = new TestData();
                        cd.setId(keyRow);
                        cd.setDt((String) Bytes.toString(
                                r.getValue(Bytes.toBytes("date"), Bytes.toBytes("dt"))));
                        cd.setAverageTemperature((double) Bytes.toDouble(
                                r.getValue(Bytes.toBytes("temp"), Bytes.toBytes("AverageTemperature"))));
                        cd.setAverageTemperature((double) Bytes.toDouble(
                                r.getValue(Bytes.toBytes("temp"), Bytes.toBytes("AverageTemperatureUncertainty"))));
                        cd.setCity((String) Bytes.toString(
                                r.getValue(Bytes.toBytes("place"), Bytes.toBytes("City"))));
                        cd.setCity((String) Bytes.toString(
                                r.getValue(Bytes.toBytes("place"), Bytes.toBytes("Country"))));
                        cd.setCity((String) Bytes.toString(
                                r.getValue(Bytes.toBytes("place"), Bytes.toBytes("Latitude"))));
                        cd.setCity((String) Bytes.toString(
                                r.getValue(Bytes.toBytes("place"), Bytes.toBytes("Longitude"))));
                        return new Tuple2<String, TestData>(keyRow, cd);
                    }
                });

        System.out.println("--------------------COUNT RDD " + rowPairRDD.count());

        System.out.println("--------------------Create DataFrame!");
        DataFrame schemaRDD = sqlContext.createDataFrame(rowPairRDD.values(), TestData.class);

        System.out.println("--------------------Loading Schema");
        schemaRDD.printSchema();
        System.out.println("--------------------COUNT Schema " + schemaRDD.count());
    }
}
The code seems to connect to HBase, which I'm really happy about, and it seems to find the table "climate", although I'm not sure about that. When I submit this Spark application I get the following exception:
17/03/21 04:11:41 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, c7010.ambari.apache.org): java.lang.IllegalArgumentException: offset (0) + length (8) exceed the capacity of the array: 5
    at org.apache.hadoop.hbase.util.Bytes.explainWrongLengthOrOffset(Bytes.java:632)
    at org.apache.hadoop.hbase.util.Bytes.toLong(Bytes.java:606)
    at org.apache.hadoop.hbase.util.Bytes.toDouble(Bytes.java:730)
    at org.apache.hadoop.hbase.util.Bytes.toDouble(Bytes.java:721)
    at thesis.test.sparkthesistest.hbase.read.IBMREAD$1.call(IBMREAD.java:75)
    at thesis.test.sparkthesistest.hbase.read.IBMREAD$1.call(IBMREAD.java:62)
    at org.apache.spark.api.java.JavaPairRDD$$anonfun$pairFunToScalaFun$1.apply(JavaPairRDD.scala:1018)
    at org.apache.spark.api.java.JavaPairRDD$$anonfun$pairFunToScalaFun$1.apply(JavaPairRDD.scala:1018)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1633)
    at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1164)
    at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1164)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1882)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1882)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
This exception occurs about three times with different TIDs until Spark aborts the job. What am I doing wrong?
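A note on the stack trace above: Bytes.toDouble expects an 8-byte binary value, but the cell it received is only 5 bytes long. That would be consistent with the bulk load having stored the numbers as text (a string such as "6.068" is exactly 5 bytes), although this is an assumption about how the table was loaded. If so, the value has to be decoded as a string and then parsed. Below is a minimal sketch using only the JDK; the class and helper names (TextCellDecoder, parseDoubleCell) are hypothetical and not part of HBase or of the original code.

```java
import java.nio.charset.StandardCharsets;

class TextCellDecoder {

    // Hypothetical helper: decodes a cell that was stored as text (e.g. "6.068")
    // instead of as an 8-byte binary double. Returns null for missing or empty
    // cells so the caller can decide how to treat gaps in the data.
    static Double parseDoubleCell(byte[] raw) {
        if (raw == null || raw.length == 0) {
            return null;
        }
        String text = new String(raw, StandardCharsets.UTF_8).trim();
        if (text.isEmpty()) {
            return null;
        }
        return Double.parseDouble(text);
    }

    public static void main(String[] args) {
        // Assumed example value; a text-encoded number is not 8 bytes long.
        byte[] cell = "6.068".getBytes(StandardCharsets.UTF_8);
        System.out.println(cell.length);           // prints 5
        System.out.println(parseDoubleCell(cell)); // prints 6.068
    }
}
```

In the mapper, the same idea would mean passing the result of r.getValue(...) to a helper like this instead of to Bytes.toDouble. Separately, the mapper calls setAverageTemperature twice and setCity four times, so each later value overwrites the earlier one; that looks like a copy-paste slip and is worth checking against the setters TestData actually has.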
-----------------------------EDIT-------------------------------
I also tried this example:
SparkConf sparkConf = new SparkConf().setAppName("HBaseRead");
JavaSparkContext jsc = new JavaSparkContext(sparkConf);

Configuration conf = HBaseConfiguration.create();
conf.addResource(new Path("/etc/hbase/conf/core-site.xml"));
conf.addResource(new Path("/etc/hbase/conf/hbase-site.xml"));

JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);

Scan scan = new Scan();
scan.setCaching(100);

JavaRDD<Tuple2<ImmutableBytesWritable, Result>> hbaseRdd =
        hbaseContext.hbaseRDD(TableName.valueOf("climate"), scan);

System.out.println("Number of Records found : " + hbaseRdd.count());
If I execute this code I get
Exception in thread "dag-scheduler-event-loop" java.lang.NoClassDefFoundError: org/apache/hadoop/hbase/regionserver/StoreFileWriter
Why is it so hard to reach HBase with Spark?
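A note on the NoClassDefFoundError above: it usually means a class that was present at compile time is missing from the runtime classpath. As far as I know, org.apache.hadoop.hbase.regionserver.StoreFileWriter only exists in HBase 2.x, so one possible explanation (an assumption, not something the thread confirms) is that the hbase-spark jar was built against a newer HBase than the hbase-server jar available on the cluster. A small probe like the one below can show which jar, if any, provides a given class. It uses only the JDK; the class name ClasspathProbe and the method locate are hypothetical.

```java
import java.security.CodeSource;

class ClasspathProbe {

    // Hypothetical helper: reports where a class is loaded from, without
    // initializing it, or a marker string if it cannot be loaded at all.
    static String locate(String className) {
        try {
            Class<?> c = Class.forName(className, false,
                    ClasspathProbe.class.getClassLoader());
            CodeSource src = c.getProtectionDomain().getCodeSource();
            return src == null ? "<bootstrap or unknown>" : src.getLocation().toString();
        } catch (ClassNotFoundException | LinkageError e) {
            return "<not on classpath>";
        }
    }

    public static void main(String[] args) {
        // The class from the error message, plus a long-standing HBase class
        // from the same package for comparison.
        String[] names = {
            "org.apache.hadoop.hbase.regionserver.StoreFileWriter",
            "org.apache.hadoop.hbase.regionserver.StoreFile"
        };
        for (String name : names) {
            System.out.println(name + " -> " + locate(name));
        }
    }
}
```

To be meaningful, the probe has to run with the same classpath the Spark job uses. If StoreFile resolves to a jar but StoreFileWriter does not, that would point to mismatched HBase versions rather than to a problem in the code itself.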
Created 03-20-2017 05:48 PM
There are so many working examples. Sincerely, only you can debug your code. I suggest you take an easier path and use Phoenix, which you can use like any JDBC driver, with all its SQL capability. It will simplify your code and will also take advantage of Phoenix's distributed nature out of the box.
Created 03-20-2017 05:49 PM
Not to mention, especially because you are a beginner, you should start directly with DataFrames instead of RDDs. That is where Spark is going.
Created 03-20-2017 07:08 PM
Thanks for the answer. I am doing research for my university, and for this I really want to use Spark on HBase. I did not find a single working example. Why do you suggest not using Spark with HBase? Is it hard, or just not a good idea?