Created on 04-29-2020 06:36 AM - edited on 04-30-2020 01:12 AM by VidyaSargur
Apache HBase is an established NoSQL storage system that has been proven in sizable production deployments. Although HBase is a key-value store, there is high demand for accessing its data more easily, as in SQL. Apache Spark includes great support for basic filtering and insertion of data via SQL through its Spark SQL offering. The HBase project provides integration with Spark SQL through the work of the HBase Connectors subproject. This article describes the steps required to access data stored in HBase in SQL fashion.
Several integrations for accessing HBase from Spark have been built in the past. The first connector, called Spark on HBase, was developed by Cloudera Professional Services. A version of that early work was contributed to the upstream HBase project as a feature that the community considered experimental. Cloudera included a derivative of this community version (called hbase-spark) in both CDH 5 and CDH 6. Around the same time, Hortonworks came up with its own implementation, called SHC (Spark HBase Connector). Based on these two implementations, we created a new upstream project, hbase-connectors.
Implementation | Spark | Distribution |
hbase-spark | 1.6 | CDH 5 |
hbase-spark | 2.4 | CDH 6 |
hbase-spark | 2.3 | HDP 2.6, HDP 3.1 |
SHC | 2.3 | HDP 2.6, HDP 3.1 |
hbase-connectors | 2.4 | CDP |
Note that not every implementation is compatible with all of the versions. This article focuses only on hbase-connectors because it is the newest implementation. The implementations share almost the same codebase, so migrating to hbase-connectors should not be difficult.
When Spark is used with HBase, it requires some HBase libraries. The HBase Spark connector exports HBase APIs and also provides HBase-specific implementations for RDDs and DataSources.
HBase Region Servers also require Spark classes on the classpath when Spark SQL queries are in use. These SQL queries are evaluated by Region Servers. For more information, see the Filter Algebra section below.
HBase Region Server configuration must be edited when using Spark SQL queries.
Steps
Key: HBASE_CLASSPATH
Value: /opt/cloudera/parcels/CDH/lib/hbase_connectors/lib/hbase-spark-1.0.0.7.0.3.0-79.jar:/opt/cloudera/parcels/CDH/lib/hbase_connectors/lib/hbase-spark-protocol-shaded-1.0.0.7.0.3.0-79.jar:/opt/cloudera/parcels/CDH/jars/scala-library-2.11.12.jar
Ensure you use the appropriate version numbers.
Spark runtime also needs HBase bindings. Add the necessary jars to spark-shell or spark-submit:
spark-shell --jars /opt/cloudera/parcels/CDH/lib/hbase_connectors/lib/hbase-spark-1.0.0.7.0.3.0-79.jar,/opt/cloudera/parcels/CDH/lib/hbase_connectors/lib/hbase-spark-protocol-shaded-1.0.0.7.0.3.0-79.jar
Ensure you use the appropriate version numbers.
Let’s imagine storing personal data in HBase. Data is stored in the person table using several Column Families:
Column Family | Description |
p | Personal information like birth date and height. |
c | Contact information like email address. |
avro | For Apache Avro encoded data. |
The row key is the name of the person, assuming it is unique.
Create the table in HBase Shell:
shell> create 'person', 'p', 'c', 'avro'
You can connect to HBase from Spark in two ways:
spark.sqlContext.read.format("org.apache.hadoop.hbase.spark").
option("hbase.spark.use.hbasecontext", false)
// additional options...
Ensure that the hbase.spark.use.hbasecontext configuration property is set, or a NullPointerException will be thrown.
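import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.spark.HBaseContext
// HBaseConfiguration.create() replaces the deprecated constructor
val conf = HBaseConfiguration.create()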
conf.set("hbase.zookeeper.quorum", "hbase-1.example.com")
// the latest HBaseContext will be used afterwards
new HBaseContext(spark.sparkContext, conf)
spark.sqlContext.read.format("org.apache.hadoop.hbase.spark")
// additional options...
In this case, a custom configuration object is created, where the address of ZooKeeper is explicitly set. Spark will use the latest instantiated HBaseContext object, and the hbase.spark.use.hbasecontext option is now omitted.
Spark applications store data as typed values, while HBase stores everything as byte[] arrays, so application data has to be mapped and serialized. We assume that application data has a fixed schema: it consists of several fields of integers, strings, dates, and so on, and these fields make up a record. One record is mapped to one HBase row.
There are two fundamentally different, common approaches: field-to-column mapping and Avro mapping.
Field-to-column mapping means application data is stored in the same structure in HBase. The following Spark class stores the personal data in this example:
case class Person(name: String, email: String, birthDate: Date, height: Float)
The Person class stores the name, email address, birth date, and height of a person.
These fields are mapped to different columns with the following mapping:
spark.sqlContext.read.format("org.apache.hadoop.hbase.spark").
option("hbase.columns.mapping", "name STRING :key, email STRING c:email, " +
"birthDate DATE p:birthDate, height FLOAT p:height")
// additional options...
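The mapping is defined by the hbase.columns.mapping option. Its format is <field name> <sql type> <column family:qualifier>, with entries separated by commas. The key field must be referred to as :key. The previous example maps the name field to the row key, email to the c:email column, birthDate to the p:birthDate column, and height to the p:height column.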
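To make the mapping format concrete, the following minimal sketch splits a mapping string into its parts. The ColumnMapping case class and the parseMapping helper are hypothetical names used only for illustration; they are not part of the connector's API, and the connector's own parser may treat edge cases differently.

```scala
// Hypothetical illustration of the mapping format; not the connector's own parser.
final case class ColumnMapping(field: String, sqlType: String, family: String, qualifier: String)

def parseMapping(mapping: String): Seq[ColumnMapping] =
  mapping.split(",").toSeq.map(_.trim).filter(_.nonEmpty).map { entry =>
    // Each entry has the form "<field name> <sql type> <column family:qualifier>".
    val Array(field, sqlType, column) = entry.split("\\s+")
    // ":key" has an empty family; a split limit of 2 keeps both halves.
    val Array(family, qualifier) = column.split(":", 2)
    ColumnMapping(field, sqlType, family, qualifier)
  }

val parsed = parseMapping(
  "name STRING :key, email STRING c:email, " +
  "birthDate DATE p:birthDate, height FLOAT p:height")
parsed.foreach(println)
```

In this sketch the row key shows up as an entry with an empty column family and the qualifier key, while every other field carries the family and qualifier it is stored under.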
The following data types are supported:
Scala type | Spark SQL type | HBase (byte[]) representation |
Boolean | BOOLEAN | 1 byte (0xFF - true, otherwise - false) |
Byte | BYTE | 1 byte |
Short | SHORT | 2 bytes (Big endian) |
Integer | INTEGER | 4 bytes (Big endian) |
Long | LONG | 8 bytes (Big endian) |
Float | FLOAT | 4 bytes IEEE754 |
Double | DOUBLE | 8 bytes IEEE754 |
Date | DATE | 8 bytes (Unix time in milliseconds) |
Timestamp | TIMESTAMP | 8 bytes (Unix time in milliseconds) |
String | STRING | UTF-8 encoded string |
Array[Byte] | BINARY | byte[] |
These types are encoded with the standard HBase Bytes.toBytes() methods.
Note: The connector does not perform any data type conversion, so the Scala type and the SQL type must match. For example, if a particular column mapping is FLOAT, then the values must be Float instances.
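The layouts listed for these types can be reproduced without an HBase installation. The sketch below uses java.nio.ByteBuffer, whose default byte order is big endian, as a stand-in for Bytes.toBytes(); treating the two as equivalent for these types is an assumption based on the representations listed above, and the hex helper is a hypothetical name.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

// Hypothetical helper: render a byte array as space-separated hex.
def hex(bytes: Array[Byte]): String = bytes.map(b => f"${b & 0xff}%02X").mkString(" ")

// ByteBuffer writes big endian by default (assumed to match Bytes.toBytes()).
val intBytes    = ByteBuffer.allocate(4).putInt(1).array()
val longBytes   = ByteBuffer.allocate(8).putLong(1L).array()
val floatBytes  = ByteBuffer.allocate(4).putFloat(6.0f).array()
val stringBytes = "bob".getBytes(StandardCharsets.UTF_8)

println(hex(intBytes))    // most significant byte first
println(hex(floatBytes))  // IEEE 754 single precision bit pattern
println(hex(stringBytes)) // UTF-8 bytes, no length prefix
```

Because the encodings carry no type tag or length prefix, the reader has to know the type of each column in advance, which is why the mapping must name it explicitly.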
Avro is a data serialization system. It can serialize multiple fields into a single byte array when used with HBase and Spark. This approach is fundamentally different from the field-to-column mapping because the result of Avro mapping is one big value instead of multiple values. Please also note that Avro uses its own serialization algorithm which is different from the standard Java serialization format.
The schema must be defined in the Avro world:
def avroSchemaString =
s"""{"type": "record", "name": "Person", "namespace": "com.example", "fields": [
| {"name": "email", "type": "string"},
| {"name": "age", "type": "int"}
|]}""".stripMargin
A catalog object must also be defined, which describes where the serialized data should be stored:
def catalog = s"""{
| "table": {"namespace": "default", "name": "person"},
| "rowkey": "key",
| "columns":{
| "key": {"cf": "rowkey", "col": "key", "type": "string"},
| "value": {"cf": "avro", "col": "", "avro": "avroSchema"}
| }
|}""".stripMargin
In this example, data is stored in the default namespace and person table, the key column of Avro records is going to be the row key in HBase, and Avro serialized data is stored in the avro column family with an empty column name.
Spark SQL has to be configured to use Avro schema:
spark.sqlContext.read.format("org.apache.hadoop.hbase.spark").
option("avroSchema", avroSchemaString).
option("catalog", catalog)
This mapping is more difficult because it requires an Avro schema and an HBase mapping (catalog) as well.
Data access is pretty fast in HBase when the key is searched, but what happens when data must be searched by multiple values?
It is pretty common to use composite keys in these cases when the key is composed of multiple fields, but other non-key fields are mapped to separate columns. This is not covered in this article, but it is possible to use Avro mapping for the keys and field-to-column mapping for non-key fields.
Field-to-column mapping relies on HBase Bytes.toBytes() methods, while Avro has its own serialization format. These approaches can be mixed, but other mappings are not supported, for example, Phoenix's mapping.
Phoenix stores integer types slightly differently. It flips the sign bit, so the 4-byte signed integer 0 is encoded as 0x80000000. Unsigned integers do not have a sign bit, so nothing is flipped. The advantage is that byte-level comparison of signed integers works as expected, meaning that negative numbers are sorted before positive ones. The Spark integration provided by the HBase project does not support Phoenix integers, so reading them can lead to unexpected results. The Phoenix project provides its own integration for working with Spark, which is out of scope for this article.
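The effect of the sign-bit flip on sort order can be seen in a small, self-contained sketch. It models only the flip described above and is not Phoenix's actual code; plainInt, flippedInt, and compareBytes are hypothetical helper names, and compareBytes assumes the unsigned, byte-by-byte ordering that HBase applies to keys.

```scala
import java.nio.ByteBuffer

// Plain big-endian encoding, as used by the field-to-column mapping.
def plainInt(v: Int): Array[Byte] = ByteBuffer.allocate(4).putInt(v).array()

// Sign-bit-flipped encoding, modeling the Phoenix behavior described above.
def flippedInt(v: Int): Array[Byte] = ByteBuffer.allocate(4).putInt(v ^ Int.MinValue).array()

// Unsigned lexicographic comparison of two byte arrays.
def compareBytes(a: Array[Byte], b: Array[Byte]): Int =
  a.zip(b).map { case (x, y) => (x & 0xff) - (y & 0xff) }
    .find(_ != 0).getOrElse(a.length - b.length)

// Plain encoding: -1 is FF FF FF FF, so it sorts after 1 (00 00 00 01).
val plainOrder = compareBytes(plainInt(-1), plainInt(1))
// Flipped encoding: -1 is 7F FF FF FF, so it sorts before 1 (80 00 00 01).
val flippedOrder = compareBytes(flippedInt(-1), flippedInt(1))

println(s"plain: $plainOrder, flipped: $flippedOrder")
```

The same bytes therefore decode to different numbers depending on which convention the reader assumes, which is why values written by Phoenix should not be read through the field-to-column mapping.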
Data insertion is done by the following Scala code. Paste the code into Spark Shell:
import java.sql.Date
case class Person(name: String, email: String, birthDate: Date, height: Float)
val personDS = Seq(
Person("alice", "alice@example.com", Date.valueOf("2000-01-01"), 5.6f),
Person("bob", "bob@example.com", Date.valueOf("2001-10-17"), 6.0f)
).toDS
personDS.write.format("org.apache.hadoop.hbase.spark").
option("hbase.columns.mapping", "name STRING :key, email STRING c:email, " +
"birthDate DATE p:birthDate, height FLOAT p:height").
option("hbase.table", "person").option("hbase.spark.use.hbasecontext", false).save()
Use the HBase shell to verify the inserted data:
shell> scan 'person'
ROW COLUMN+CELL
alice column=c:email, timestamp=1568723598292, value=alice@example.com
alice column=p:birthDate, timestamp=1568723598292, value=\x00\x00\x00\xDCl\x87 \x00
alice column=p:height, timestamp=1568723598292, value=@\xB333
bob column=c:email, timestamp=1568723598521, value=bob@example.com
bob column=p:birthDate, timestamp=1568723598521, value=\x00\x00\x00\xE9\x99u\x95\x80
bob column=p:height, timestamp=1568723598521, value=@\xC0\x00\x00
2 row(s)
As you can see, the HBase Spark connector created separate columns for the fields of the Person class.
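The binary values in the scan output can be decoded by hand to check them against the inserted data. This sketch copies the bytes printed for alice (the HBase shell shows printable bytes such as l, @, and 3 as characters) and reads them with java.nio.ByteBuffer, assuming its default big-endian order matches the layout written by the connector.

```scala
import java.nio.ByteBuffer

// p:birthDate for alice: \x00\x00\x00\xDCl\x87 \x00  ('l' = 0x6C, ' ' = 0x20)
val birthDateBytes = Array(0x00, 0x00, 0x00, 0xDC, 0x6C, 0x87, 0x20, 0x00).map(_.toByte)
// p:height for alice: @\xB333  ('@' = 0x40, '3' = 0x33)
val heightBytes = Array(0x40, 0xB3, 0x33, 0x33).map(_.toByte)

val birthDateMillis = ByteBuffer.wrap(birthDateBytes).getLong // Unix time in milliseconds
val height = ByteBuffer.wrap(heightBytes).getFloat            // IEEE 754 single precision

println(birthDateMillis)
println(height)
```

The decoded height is 5.6, and the decoded birth date is 946713600000 milliseconds, which is 2000-01-01 08:00 UTC. That is consistent with java.sql.Date.valueOf interpreting the date as midnight in the local time zone of the machine that ran the insert, apparently UTC-8 here.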
Data can be queried with Spark shell again:
val df = spark.sqlContext.read.format("org.apache.hadoop.hbase.spark").
option("hbase.columns.mapping", "name STRING :key, email STRING c:email, " +
"birthDate DATE p:birthDate, height FLOAT p:height").
option("hbase.table", "person").option("hbase.spark.use.hbasecontext", false).load()
df.createOrReplaceTempView("personView")
val results = spark.sqlContext.sql("SELECT * FROM personView WHERE name = 'alice'")
results.show()
The output of this job should be the following:
+-----+------+-----------------+----------+
| name|height| email| birthDate|
+-----+------+-----------------+----------+
|alice| 5.6|alice@example.com|2000-01-01|
+-----+------+-----------------+----------+
import org.apache.avro.Schema
import org.apache.hadoop.hbase.spark.SchemaConverters
import org.apache.spark.sql._
import org.apache.spark.sql.types._
def avroSchemaString =
s"""{"type": "record", "name": "Person", "namespace": "com.example", "fields": [
| {"name": "email", "type": "string"},
| {"name": "age", "type": "int"}
|]}""".stripMargin
val avroSchema = new Schema.Parser().parse(avroSchemaString)
def catalog = s"""{
| "table": {"namespace": "default", "name": "person"},
| "rowkey": "key",
| "columns":{
| "key": {"cf": "rowkey", "col": "key", "type": "string"},
| "value": {"cf": "avro", "col": "", "avro": "avroSchema"}
| }
|}""".stripMargin
val rowSchema = StructType(List(
StructField("key", StringType, true),
StructField("value", SchemaConverters.toSqlType(avroSchema).dataType, true))
)
val data = Seq(
Row("charlie", Row("charlie@example.com", 20)),
Row("diana", Row("diana@example.com", 23))
)
val df = spark.createDataFrame(
spark.sparkContext.parallelize(data),
rowSchema
)
df.write.format("org.apache.hadoop.hbase.spark").
option("avroSchema", avroSchemaString).option("catalog", catalog).
option("hbase.spark.use.hbasecontext", false).save()
Use HBase Shell to verify the inserted data:
shell> scan 'person'
ROW COLUMN+CELL
charlie column=avro:, timestamp=1582644932655, value=&charlie@example.com(
diana column=avro:, timestamp=1582644932655, value="diana@example.com.
2 row(s)
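The results contain only one column for each row, with a composite value as expected. The value &charlie@example.com( is the Avro binary encoding of the record: & (0x26) is the zigzag-encoded length of the email string (19), charlie@example.com is the UTF-8 encoded email address, and ( (0x28) is the zigzag-encoded age (20).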
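The value can be decoded by hand using Avro's binary encoding rules, in which string lengths and int values are written as zigzag-encoded variable-length integers. The sketch below is not an Avro decoder: it assumes each integer fits into a single byte, which holds for the small values in this example, and zigZagDecode is a hypothetical helper name.

```scala
import java.nio.charset.StandardCharsets

// Zigzag decoding for a single-byte varint (values small enough to fit in 7 bits).
def zigZagDecode(n: Int): Int = (n >>> 1) ^ -(n & 1)

// The cell value exactly as printed by the HBase shell for the row "charlie".
val raw = "&charlie@example.com(".getBytes(StandardCharsets.UTF_8)

val emailLength = zigZagDecode(raw(0))                              // '&' = 0x26
val email = new String(raw, 1, emailLength, StandardCharsets.UTF_8) // the string payload
val age = zigZagDecode(raw(1 + emailLength))                        // '(' = 0x28

println(s"email=$email (length $emailLength), age=$age")
```

The field names do not appear anywhere in the stored bytes, so the same Avro schema must be supplied again when the data is read back.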
import org.apache.spark.sql._
import org.apache.spark.sql.types._
def avroSchemaString =
s"""{"type": "record", "name": "Person", "namespace": "com.example", "fields": [
| {"name": "email", "type": "string"},
| {"name": "age", "type": "int"}
|]}""".stripMargin
def catalog = s"""{
| "table": {"namespace": "default", "name": "person"},
| "rowkey": "key",
| "columns":{
| "key": {"cf": "rowkey", "col": "key", "type": "string"},
| "value": {"cf": "avro", "col": "", "avro": "avroSchema"}
| }
|}""".stripMargin
val results = spark.sqlContext.read.format("org.apache.hadoop.hbase.spark").
option("avroSchema", avroSchemaString).
option("catalog", catalog).
option("hbase.spark.use.hbasecontext", false).load()
results.show(false)
The results of this Scala script should be:
+-------+-------------------------+
| key| value|
+-------+-------------------------+
|charlie|[charlie@example.com, 20]|
|diana |[diana@example.com, 23] |
+-------+-------------------------+
Spark SQL supports extensive WHERE clauses for filtering rows. Filtering is more performant when it is done by the Region Servers and only by the necessary Region Servers. WHERE clauses are transformed to HBase Filters using the following rules:
The following expressions are supported by HBase Spark connector:
The following expressions are currently not supported:
Unsupported expressions can still be used in WHERE clauses, and the results will be correct because Spark filters the rows itself when aggregating the results. However, the evaluation will be slower, because it cannot be done by the Region Servers. In addition, partitioning cannot be used with unsupported expressions.
HBase Spark connection is here, ready for use.