Efficiently read HBase records without schema in Spark?

Hi,

I have an HBase table with some records that don't have a fixed set of columns - and I want to read/process those records with Spark. At the time of reading the data from HBase it is not possible to list/name the columns that I need.

Currently I read the data as follows:

import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat

// sc is the already existing SparkContext
val conf = HBaseConfiguration.create()
conf.addResource(new Path("/usr/hdp/current/hbase-client/conf/hbase-site.xml"))
conf.set("hbase.zookeeper.quorum", "xxx")
conf.set(TableInputFormat.INPUT_TABLE, "TEST_TABLE")
// Full table scan: each record is a (row key, Result) pair
val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])
hBaseRDD.count()

This works, but it is super slow! On a test HBase table with just 16 million records (each with only 1 column), the count takes 4.5 minutes. (The test table is distributed across 2 HBase regions, so only 2 Spark executors read the data.)
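To put those numbers in perspective, here is a rough back-of-the-envelope throughput calculation. It uses only the figures from this post; the variable names are just for illustration.

```scala
// Figures from the post: 16 million records, 4.5 minutes, 2 regions
val records = 16000000L
val seconds = 4.5 * 60
val regions = 2

// Overall scan rate, and the rate each region (one input split) is read at
val totalPerSecond = records / seconds
val perRegionPerSecond = totalPerSecond / regions

println(f"total: $totalPerSecond%.0f records/s, per region: $perRegionPerSecond%.0f records/s")
```

So each of the two readers handles somewhat under 30,000 records per second.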

Do you have any ideas or suggestions for speeding up the read?

Unfortunately, I did not find any alternative to reading the data with sc.newAPIHadoopRDD(), as all the HBase-Spark connectors I came across seem to require the "schema" to be provided beforehand.

Thanks for your help in advance!

Daniel

3 REPLIES

avatar

You may want to look at the Spark HBase connector (SHC), which is based on DataFrames and should give better performance.

https://github.com/hortonworks-spark/shc

avatar
New Contributor

I came across this connector already, but it requires you to define the columns you want to read (right?). That is not an option in my case: the column names contain product IDs, for example, so there is a huge number of distinct values, and they also change over time...
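For context, this is roughly how the rows can be processed without naming any columns up front when using the plain RDD route. It is only a minimal sketch: the helper name resultToColumns is made up, and it assumes that row keys, qualifiers and values are all UTF-8 strings.

```scala
import org.apache.hadoop.hbase.{Cell, CellUtil}
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.util.Bytes

// Hypothetical helper: turns one HBase row into
// (rowKey, Map("family:qualifier" -> value)) without a predefined schema.
// Assumes every byte array decodes as a UTF-8 string.
def resultToColumns(result: Result): (String, Map[String, String]) = {
  val rowKey = Bytes.toString(result.getRow)
  // rawCells() can be null for an empty Result, so guard against that
  val cells = Option(result.rawCells()).getOrElse(Array.empty[Cell])
  val columns = cells.map { cell =>
    val family = Bytes.toString(CellUtil.cloneFamily(cell))
    val qualifier = Bytes.toString(CellUtil.cloneQualifier(cell))
    s"$family:$qualifier" -> Bytes.toString(CellUtil.cloneValue(cell))
  }.toMap
  (rowKey, columns)
}
```

With the RDD returned by sc.newAPIHadoopRDD this would be applied as hBaseRDD.map { case (_, result) => resultToColumns(result) }, so whatever columns a row happens to have end up in the map.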

avatar
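import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{Result, Scan}
import org.apache.hadoop.hbase.filter.{FilterList, FirstKeyOnlyFilter, KeyOnlyFilter}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.{TableInputFormat, TableMapReduceUtil}

// Named "scan" so that it does not shadow the SparkContext "sc"
val scan = new Scan()
// Return only the first cell of each row, with its value stripped (enough for a count)
scan.setFilter(new FilterList(new FirstKeyOnlyFilter(), new KeyOnlyFilter()))
val conf = HBaseConfiguration.create()
conf.addResource(new Path("/usr/hdp/current/hbase-client/conf/hbase-site.xml"))
conf.set("hbase.zookeeper.quorum", "xxx")
conf.set(TableInputFormat.INPUT_TABLE, "TEST_TABLE")
// Serialize the scan; assumes an HBase version where convertScanToString is public
conf.set(TableInputFormat.SCAN, TableMapReduceUtil.convertScanToString(scan))
val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])
hBaseRDD.count()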
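On top of the two filters, it may also be worth raising the scanner caching so that each RPC to a region server returns more rows. The following is only a sketch under assumptions: the helper name countScan and the value 1000 are illustrative, not tuned for any particular cluster.

```scala
import org.apache.hadoop.hbase.client.Scan
import org.apache.hadoop.hbase.filter.{FilterList, FirstKeyOnlyFilter, KeyOnlyFilter}

// Hypothetical helper: builds a Scan meant for counting rows.
def countScan(rowsPerRpc: Int): Scan = {
  val scan = new Scan()
  // Keep only the first cell of each row and drop its value
  scan.setFilter(new FilterList(new FirstKeyOnlyFilter(), new KeyOnlyFilter()))
  // Number of rows fetched per RPC (illustrative value, adjust to your row size)
  scan.setCaching(rowsPerRpc)
  // A one-off full scan should not fill the region server block cache
  scan.setCacheBlocks(false)
  scan
}

val scan = countScan(1000)
```

The resulting scan would then be passed to the job via conf.set(TableInputFormat.SCAN, ...) as in the steps above. Note that the filters only help for counting; if the column values are needed later, leave the filters out and keep just the caching settings.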