Efficiently read HBase records without schema in Spark?

New Contributor

Hi,

I have an HBase table whose records do not have a fixed set of columns, and I want to read and process those records with Spark. At the time of reading the data from HBase it is not possible to list or name the columns I need.

Currently I read the data as follows:

import org.apache.spark._
import org.apache.spark.rdd.NewHadoopRDD
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.HColumnDescriptor
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}
import org.apache.hadoop.hbase.client.{HBaseAdmin, Put, HTable, Result}
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.io.ImmutableBytesWritable

val conf = HBaseConfiguration.create()
conf.addResource(new Path("/usr/hdp/current/hbase-client/conf/hbase-site.xml"))
conf.set("hbase.zookeeper.quorum", "xxx")
conf.set(TableInputFormat.INPUT_TABLE, "TEST_TABLE")
val hBaseRDD = sc.newAPIHadoopRDD(conf,
  classOf[TableInputFormat],
  classOf[ImmutableBytesWritable],
  classOf[Result])
hBaseRDD.count()

This works - but it is super slow! In a test HBase table with just 16 million records (each having only one column) it takes 4.5 minutes to execute the count. (The test table is distributed across 2 HBase regions, so the data is read by only 2 Spark executors.)

Do you have any ideas or suggestions to accelerate reading the data?

Unfortunately I did not find an alternative to reading the data with sc.newAPIHadoopRDD(), as all HBase-Spark connectors I came across seem to require the "schema" to be provided beforehand.

Thanks for your help in advance!

Daniel

3 REPLIES


You may want to look at the Spark HBase connector (SHC), which is DataFrame-based and should give better performance.

https://github.com/hortonworks-spark/shc
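
A minimal sketch of how the connector is used (assuming the shc-core package is on the spark-shell classpath; the column family "cf1" and column "col1" below are placeholders for your own layout):

import org.apache.spark.sql.execution.datasources.hbase.HBaseTableCatalog

// Catalog mapping the HBase table to a DataFrame schema - every column
// you want to read has to be declared here (placeholder names below).
val catalog = s"""{
  |"table":{"namespace":"default", "name":"TEST_TABLE"},
  |"rowkey":"key",
  |"columns":{
    |"key":{"cf":"rowkey", "col":"key", "type":"string"},
    |"col1":{"cf":"cf1", "col":"col1", "type":"string"}
  |}
|}""".stripMargin

// Read the table as a DataFrame through the connector and count it.
val df = spark.read
  .options(Map(HBaseTableCatalog.tableCatalog -> catalog))
  .format("org.apache.spark.sql.execution.datasources.hbase")
  .load()

df.count()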

New Contributor

I already came across this connector - but it requires you to define the columns you want to read (right?), which is not an option in my case: the column names contain product IDs, for example, so there is a huge number of distinct values and they also change over time...
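
For reference, with the plain TableInputFormat approach from my first post I don't actually need the column names - each Result can be unpacked cell by cell. A rough sketch (assuming string-encoded values; it reuses the hBaseRDD from above):

import org.apache.hadoop.hbase.CellUtil
import org.apache.hadoop.hbase.util.Bytes

// Turn every cell of every row into a (rowKey, family, qualifier, value) tuple,
// without knowing the qualifiers (e.g. product IDs) up front.
val cells = hBaseRDD.flatMap { case (_, result) =>
  result.rawCells().map { cell =>
    (Bytes.toString(CellUtil.cloneRow(cell)),
     Bytes.toString(CellUtil.cloneFamily(cell)),
     Bytes.toString(CellUtil.cloneQualifier(cell)),
     Bytes.toString(CellUtil.cloneValue(cell)))
  }
}
cells.take(5).foreach(println)

So reading without a schema works - it is just slow.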

import org.apache.hadoop.hbase.client.Scan
import org.apache.hadoop.hbase.filter.{FilterList, FirstKeyOnlyFilter, KeyOnlyFilter}
import org.apache.hadoop.hbase.mapreduce.{TableInputFormat, TableMapReduceUtil}

// Scan that returns only the first key of each row, with the values stripped out.
val scan = new Scan()
scan.setFilter(new FilterList(new FirstKeyOnlyFilter(), new KeyOnlyFilter()))

val conf = HBaseConfiguration.create()
conf.addResource(new Path("/usr/hdp/current/hbase-client/conf/hbase-site.xml"))
conf.set("hbase.zookeeper.quorum", "xxx")
conf.set(TableInputFormat.INPUT_TABLE, "TEST_TABLE")
// Serialize the Scan so TableInputFormat picks it up.
conf.set(TableInputFormat.SCAN, TableMapReduceUtil.convertScanToString(scan))

val hBaseRDD = sc.newAPIHadoopRDD(conf,
  classOf[TableInputFormat],
  classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
  classOf[org.apache.hadoop.hbase.client.Result])
hBaseRDD.count()
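
FirstKeyOnlyFilter makes the scan return only the first cell of each row and KeyOnlyFilter strips the cell values, so for a pure count almost no data has to be shipped to the Spark executors. Note that TableInputFormat still creates one Spark partition per HBase region, so with only 2 regions the count still runs on 2 executors.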