04-29-2020
06:36 AM
Preface
Apache HBase is an established NoSQL storage system that has been proven in sizable production deployments. Although HBase is a key-value store, there is high demand for accessing its data more easily, as in SQL. Apache Spark includes great support for basic filtering and insertion of data via SQL through its Spark SQL offering. The HBase project provides integration with Spark SQL through the work of the HBase Connectors subproject. This article describes the steps required to access data stored in HBase in SQL fashion.
History
Several integrations for accessing HBase from Spark have been developed in the past. The first connector, called Spark on HBase, was developed by Cloudera Professional Services. A version of that early work was contributed to the upstream HBase project as a feature that the community considered experimental. Cloudera included a derivative of this community version (called hbase-spark) in both CDH 5 and CDH 6. Around the same time, Hortonworks also came up with an implementation called SHC (Spark HBase connector). Based on these two implementations, we created a new upstream project, hbase-connectors.
Compatibility & Support
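+------------------+-------+------------------+
| Implementation   | Spark | Distribution     |
+------------------+-------+------------------+
| hbase-spark      | 1.6   | CDH 5            |
| hbase-spark      | 2.4   | CDH 6            |
| hbase-spark      | 2.3   | HDP 2.6, HDP 3.1 |
| SHC              | 2.3   | HDP 2.6, HDP 3.1 |
| hbase-connectors | 2.4   | CDP              |
+------------------+-------+------------------+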
Note that not every implementation is compatible with every Spark version or distribution. This article focuses only on hbase-connectors because it is the newest implementation. The implementations share almost the same codebase, so migrating to hbase-connectors should not be difficult.
Overview
When Spark is used with HBase, it requires some HBase libraries. The HBase Spark connector exports HBase APIs and also provides HBase-specific implementations for RDDs and DataSources.
HBase Region Servers also require Spark classes on the classpath when Spark SQL queries are in use. These SQL queries are evaluated by Region Servers. For more information, see the Filter Algebra section below.
Configuration
HBase
HBase Region Server configuration must be edited when using Spark SQL queries.
Steps
Go to Cloudera Manager and select the HBase service.
Search for “regionserver environment”.
Add a new environment variable using the RegionServer Environment Advanced Configuration Snippet (Safety Valve):
Key: HBASE_CLASSPATH
Value: /opt/cloudera/parcels/CDH/lib/hbase_connectors/lib/hbase-spark-1.0.0.7.0.3.0-79.jar:/opt/cloudera/parcels/CDH/lib/hbase_connectors/lib/hbase-spark-protocol-shaded-1.0.0.7.0.3.0-79.jar:/opt/cloudera/parcels/CDH/jars/scala-library-2.11.12.jar
Ensure you use the appropriate version numbers.
Restart Region Servers.
Spark
Spark runtime also needs HBase bindings. Add the necessary jars to spark-shell or spark-submit:
spark-shell --jars /opt/cloudera/parcels/CDH/lib/hbase_connectors/lib/hbase-spark-1.0.0.7.0.3.0-79.jar,/opt/cloudera/parcels/CDH/lib/hbase_connectors/lib/hbase-spark-protocol-shaded-1.0.0.7.0.3.0-79.jar
Ensure you use the appropriate version numbers.
Our task
Let’s imagine storing personal data in HBase. Data is stored in the person table using several Column Families:
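+---------------+--------------------------------------------------+
| Column Family | Description                                      |
+---------------+--------------------------------------------------+
| p             | Personal information like birth date and height. |
| c             | Contact information like email address.          |
| avro          | For Apache Avro encoded data.                    |
+---------------+--------------------------------------------------+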
The row key is the person's name, which is assumed to be unique.
Create the table in HBase Shell:
shell> create 'person', 'p', 'c', 'avro'
Connecting to HBase
You can connect to HBase from Spark in two ways:
using the default configuration
passing a configuration object
Default configuration
spark.sqlContext.read.format("org.apache.hadoop.hbase.spark").
option("hbase.spark.use.hbasecontext", false)
// additional options...
Ensure that the hbase.spark.use.hbasecontext option is set to false when no HBaseContext has been created, or a NullPointerException will be thrown.
Customized configuration
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.spark.HBaseContext
val conf = HBaseConfiguration.create()
conf.set("hbase.zookeeper.quorum", "hbase-1.example.com")
// the most recently created HBaseContext will be used afterwards
new HBaseContext(spark.sparkContext, conf)
spark.sqlContext.read.format("org.apache.hadoop.hbase.spark")
// additional options...
In this case, a custom configuration object is created, where the address of ZooKeeper is explicitly set. Spark will use the latest instantiated HBaseContext object, and the hbase.spark.use.hbasecontext option is now omitted.
Mapping
Spark applications store data as typed values, while HBase stores everything as byte[] arrays, so application data has to be mapped and serialized. We assume that application data has a fixed schema: it consists of several fields (integers, strings, dates, etc.), and these fields make up a record. One record is mapped to one HBase row.
There are two, fundamentally different, common approaches:
map fields to separate HBase columns
compose one big column of the record using Apache Avro
Field-to-Column mapping
Field-to-Column mapping means application data is stored in HBase with the same structure it has in the application. In this example, the following Scala case class holds the personal data:
import java.sql.Date
case class Person(name: String, email: String, birthDate: Date, height: Float)
The Person class stores
the name and email address as Strings
the birth date as a Date field
the height as a floating point number
These fields are mapped to different columns with the following mapping:
spark.sqlContext.read.format("org.apache.hadoop.hbase.spark").
option("hbase.columns.mapping", "name STRING :key, email STRING c:email, " +
"birthDate DATE p:birthDate, height FLOAT p:height")
// additional options...
The mapping is defined by the hbase.columns.mapping option. Each entry has the format <field name> <sql type> <column family:qualifier>, and entries are separated by commas. The row key must be referred to as :key. The previous example maps:
name to the row key,
email to the c column family and email column,
birthDate to the p column family and birthDate column,
height to the p column family and height column.
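To make the structure of the mapping string concrete, here is a minimal sketch that splits it into its parts. ColumnMapping and MappingSketch are hypothetical names introduced only for illustration; this is not the connector's own parser and it assumes the simple comma-separated form shown above.

```scala
// Hypothetical helper types for illustration; not part of the HBase Spark connector.
final case class ColumnMapping(field: String, sqlType: String, family: String, qualifier: String)

object MappingSketch {
  // Splits "name STRING :key, email STRING c:email" into one ColumnMapping per entry.
  def parse(mapping: String): Seq[ColumnMapping] =
    mapping.split(",").toSeq.map(_.trim).filter(_.nonEmpty).map { entry =>
      entry.split("\\s+") match {
        case Array(field, sqlType, column) if column.contains(":") =>
          // The row key is written as ":key", so its family part is empty.
          val parts = column.split(":", 2)
          ColumnMapping(field, sqlType, parts(0), parts(1))
        case _ =>
          throw new IllegalArgumentException(s"Malformed mapping entry: '$entry'")
      }
    }
}

val parsed = MappingSketch.parse(
  "name STRING :key, email STRING c:email, " +
    "birthDate DATE p:birthDate, height FLOAT p:height")
parsed.foreach(println)
```

The row key entry comes out with an empty column family, which mirrors the :key notation: the key does not belong to any column family.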
The following data types are supported:
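+-------------+----------------+-----------------------------------------+
| Scala type  | Spark SQL type | HBase (byte[]) representation           |
+-------------+----------------+-----------------------------------------+
| Boolean     | BOOLEAN        | 1 byte (0xFF - true, otherwise - false) |
| Byte        | BYTE           | 1 byte                                  |
| Short       | SHORT          | 2 bytes (Big endian)                    |
| Integer     | INTEGER        | 4 bytes (Big endian)                    |
| Long        | LONG           | 8 bytes (Big endian)                    |
| Float       | FLOAT          | 4 bytes IEEE754                         |
| Double      | DOUBLE         | 8 bytes IEEE754                         |
| Date        | DATE           | 8 bytes (Unix time in milliseconds)     |
| Timestamp   | TIMESTAMP      | 8 bytes (Unix time in milliseconds)     |
| String      | STRING         | UTF-8 encoded string                    |
| Array[Byte] | BINARY         | byte[]                                  |
+-------------+----------------+-----------------------------------------+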
These types are encoded with the standard HBase Bytes.toBytes() methods.
Note: The connector does not perform any data type conversion, so the Scala type and the SQL type must match. For example, if a particular column mapping is FLOAT, then the values must be Float instances.
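The byte layouts in the table can be reproduced without an HBase dependency. The sketch below uses java.nio.ByteBuffer, which is big-endian by default, as a stand-in for Bytes.toBytes(); the assumption is that both produce the same bytes for these types. EncodingSketch and its method names are made up for this illustration.

```scala
import java.nio.ByteBuffer

// Illustrative stand-in for HBase's Bytes.toBytes(); assumes the same big-endian layout.
object EncodingSketch {
  def intBytes(v: Int): Array[Byte] = ByteBuffer.allocate(4).putInt(v).array()
  def longBytes(v: Long): Array[Byte] = ByteBuffer.allocate(8).putLong(v).array()
  def floatBytes(v: Float): Array[Byte] = ByteBuffer.allocate(4).putFloat(v).array()
  def doubleBytes(v: Double): Array[Byte] = ByteBuffer.allocate(8).putDouble(v).array()

  // Renders a byte array as space-separated upper-case hex pairs.
  def hex(bytes: Array[Byte]): String = bytes.map(b => f"${b & 0xff}%02X").mkString(" ")
}

println(EncodingSketch.hex(EncodingSketch.intBytes(1)))      // 00 00 00 01
println(EncodingSketch.hex(EncodingSketch.floatBytes(5.6f))) // 40 B3 33 33
println(EncodingSketch.doubleBytes(5.6).length)              // 8
```

The last line hints at why the types must match: the same number written as a Double occupies 8 bytes, so a reader expecting a 4-byte FLOAT would not be able to interpret it.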
Apache Avro mapping
Avro is a data serialization system. It can serialize multiple fields into a single byte array when used with HBase and Spark. This approach is fundamentally different from the field-to-column mapping because the result of Avro mapping is one big value instead of multiple values. Please also note that Avro uses its own serialization algorithm which is different from the standard Java serialization format.
The schema must be defined in the Avro world:
def avroSchemaString =
s"""{"type": "record", "name": "Person", "namespace": "com.example", "fields": [
| {"name": "email", "type": "string"},
| {"name": "age", "type": "int"}
|]}""".stripMargin
A catalog object must also be defined, which describes where the serialized data should be stored:
def catalog = s"""{
| "table": {"namespace": "default", "name": "person"},
| "rowkey": "key",
| "columns":{
| "key": {"cf": "rowkey", "col": "key", "type": "string"},
| "value": {"cf": "avro", "col": "", "avro": "avroSchema"}
| }
|}""".stripMargin
In this example, data is stored in the default namespace and person table, the key column of Avro records is going to be the row key in HBase, and Avro serialized data is stored in the avro column family with an empty column name.
Spark SQL has to be configured to use the Avro schema:
spark.sqlContext.read.format("org.apache.hadoop.hbase.spark").
option("avroSchema", avroSchemaString).
option("catalog", catalog)
This mapping is more difficult because it requires an Avro schema and an HBase mapping (catalog) as well.
Mixed mapping
Data access is pretty fast in HBase when the key is searched, but what happens when data must be searched by multiple values?
In these cases it is common to use composite keys, where the key is composed of multiple fields while the other, non-key fields are mapped to separate columns. This is not covered in this article, but it is possible to use Avro mapping for the keys and field-to-column mapping for non-key fields.
Other mappings
Field-to-column mapping relies on HBase Bytes.toBytes() methods, while Avro has its own serialization format. These approaches can be mixed, but other mappings are not supported, for example, Phoenix's mapping.
Phoenix
Phoenix stores signed integer types slightly differently: it flips the sign bit, so the 4-byte signed integer 0 is encoded as 0x80000000. Unsigned integers do not have a sign bit, so nothing is flipped. The advantage is that byte-level comparison of signed integers works as expected, meaning that negative numbers are sorted before positive ones. The Spark integration provided by the HBase project does not support Phoenix integers, which can lead to unexpected results. The Phoenix project provides its own integration for working with Spark, which is out of scope for this article.
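The effect of flipping the sign bit can be seen in a small sketch. It only illustrates the idea described above and is not Phoenix code; SignFlipSketch and its methods are hypothetical names, and compareUnsigned is an assumed model of byte-wise unsigned comparison.

```scala
import java.nio.ByteBuffer

// Illustration of sign-bit flipping; not taken from Phoenix or HBase.
object SignFlipSketch {
  // Plain big-endian two's complement, as in the field-to-column mapping.
  def plain(v: Int): Array[Byte] = ByteBuffer.allocate(4).putInt(v).array()

  // Same layout with the sign bit flipped.
  def flipped(v: Int): Array[Byte] = plain(v ^ Int.MinValue)

  // Byte-by-byte comparison that treats every byte as unsigned.
  def compareUnsigned(a: Array[Byte], b: Array[Byte]): Int =
    a.zip(b).map { case (x, y) => (x & 0xff) - (y & 0xff) }
      .find(_ != 0).getOrElse(a.length - b.length)

  def hex(bytes: Array[Byte]): String = bytes.map(b => f"${b & 0xff}%02X").mkString(" ")
}

println(SignFlipSketch.hex(SignFlipSketch.flipped(0))) // 80 00 00 00
// Plain encoding: -1 (FF FF FF FF) sorts after 1 (00 00 00 01).
println(SignFlipSketch.compareUnsigned(SignFlipSketch.plain(-1), SignFlipSketch.plain(1)) > 0)
// Flipped encoding: -1 (7F FF FF FF) sorts before 1 (80 00 00 01).
println(SignFlipSketch.compareUnsigned(SignFlipSketch.flipped(-1), SignFlipSketch.flipped(1)) < 0)
```

Reading a flipped value with the plain decoding yields a number that is off by 2^31, which is the kind of unexpected result the paragraph above warns about.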
Putting it all together
Field-to-column mapping
Insert
Data insertion is done by the following Scala code. Paste the code into Spark Shell:
import java.sql.Date
case class Person(name: String, email: String, birthDate: Date, height: Float)
val personDS = Seq(
Person("alice", "alice@example.com", Date.valueOf("2000-01-01"), 5.6f),
Person("bob", "bob@example.com", Date.valueOf("2001-10-17"), 6.0f)
).toDS
personDS.write.format("org.apache.hadoop.hbase.spark").
option("hbase.columns.mapping", "name STRING :key, email STRING c:email, " +
"birthDate DATE p:birthDate, height FLOAT p:height").
option("hbase.table", "person").option("hbase.spark.use.hbasecontext", false).save()
Verify
Use the HBase shell to verify the inserted data:
shell> scan 'person'
ROW COLUMN+CELL
alice column=c:email, timestamp=1568723598292, value=alice@example.com
alice column=p:birthDate, timestamp=1568723598292, value=\x00\x00\x00\xDCl\x87 \x00
alice column=p:height, timestamp=1568723598292, value=@\xB333
bob column=c:email, timestamp=1568723598521, value=bob@example.com
bob column=p:birthDate, timestamp=1568723598521, value=\x00\x00\x00\xE9\x99u\x95\x80
bob column=p:height, timestamp=1568723598521, value=@\xC0\x00\x00
2 row(s)
As you can see, the HBase Spark connector created separate columns for the fields of the Person class.
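The binary values in the scan output can be turned back into typed values to check them against the inserted data. The sketch below assumes the shell prints non-printable bytes as \xNN and every other byte as its ASCII character; ShellValueSketch and unescape are hypothetical names used only here.

```scala
import java.nio.ByteBuffer
import scala.collection.mutable.ArrayBuffer

// Illustrative decoder for values copied from the HBase shell output.
object ShellValueSketch {
  // "\xNN" becomes one byte; any other character becomes its ASCII byte.
  def unescape(s: String): Array[Byte] = {
    val out = ArrayBuffer.empty[Byte]
    var i = 0
    while (i < s.length) {
      if (s.startsWith("\\x", i) && i + 4 <= s.length) {
        out += Integer.parseInt(s.substring(i + 2, i + 4), 16).toByte
        i += 4
      } else {
        out += s.charAt(i).toByte
        i += 1
      }
    }
    out.toArray
  }
}

// Values of alice's p:height and p:birthDate cells, copied from the scan above.
val height = ByteBuffer.wrap(ShellValueSketch.unescape("""@\xB333""")).getFloat
val birthMillis = ByteBuffer.wrap(ShellValueSketch.unescape("""\x00\x00\x00\xDCl\x87 \x00""")).getLong
println(height)      // 5.6
println(birthMillis) // 946713600000
println(java.time.Instant.ofEpochMilli(birthMillis))
```

The birth date decodes to 2000-01-01 08:00 UTC. Date.valueOf interprets the date in the JVM's local time zone, so this value suggests the data was written from a machine running at UTC-8.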
Select
Data can be queried with Spark shell again:
val df = spark.sqlContext.read.format("org.apache.hadoop.hbase.spark").
option("hbase.columns.mapping", "name STRING :key, email STRING c:email, " +
"birthDate DATE p:birthDate, height FLOAT p:height").
option("hbase.table", "person").option("hbase.spark.use.hbasecontext", false).load()
df.createOrReplaceTempView("personView")
val results = spark.sqlContext.sql("SELECT * FROM personView WHERE name = 'alice'")
results.show()
The output of the job above should be the following:
+-----+------+-----------------+----------+
| name|height| email| birthDate|
+-----+------+-----------------+----------+
|alice| 5.6|alice@example.com|2000-01-01|
+-----+------+-----------------+----------+
Avro mapping
Insert
import org.apache.avro.Schema
import org.apache.hadoop.hbase.spark.SchemaConverters
import org.apache.spark.sql._
import org.apache.spark.sql.types._
def avroSchemaString =
s"""{"type": "record", "name": "Person", "namespace": "com.example", "fields": [
| {"name": "email", "type": "string"},
| {"name": "age", "type": "int"}
|]}""".stripMargin
val avroSchema = new Schema.Parser().parse(avroSchemaString)
def catalog = s"""{
| "table": {"namespace": "default", "name": "person"},
| "rowkey": "key",
| "columns":{
| "key": {"cf": "rowkey", "col": "key", "type": "string"},
| "value": {"cf": "avro", "col": "", "avro": "avroSchema"}
| }
|}""".stripMargin
val rowSchema = StructType(List(
StructField("key", StringType, true),
StructField("value", SchemaConverters.toSqlType(avroSchema).dataType, true))
)
val data = Seq(
Row("charlie", Row("charlie@example.com", 20)),
Row("diana", Row("diana@example.com", 23))
)
val df = spark.createDataFrame(
spark.sparkContext.parallelize(data),
rowSchema
)
df.write.format("org.apache.hadoop.hbase.spark").
option("avroSchema", avroSchemaString).option("catalog", catalog).
option("hbase.spark.use.hbasecontext", false).save()
Verify
Use HBase Shell to verify the inserted data:
shell> scan 'person'
ROW COLUMN+CELL
charlie column=avro:, timestamp=1582644932655, value=&charlie@example.com(
diana column=avro:, timestamp=1582644932655, value="diana@example.com.
2 row(s)
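The results contain only one column for each row with a composite value, as expected. Avro writes integers and string lengths as zigzag-encoded variable-length values, so a small non-negative number n (below 64) is stored as the single byte 2n. The value &charlie@example.com( means:
& (ASCII code 38): the length of the following string is 19 bytes,
charlie@example.com: the email address,
( (ASCII code 40): the age is 20 years.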
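The step from the ASCII codes to the actual numbers is Avro's zigzag encoding of integers. Here is a minimal sketch of that arithmetic; it covers only values that fit in a single byte, and ZigZagSketch is a name chosen for this example rather than an Avro API.

```scala
// Zigzag arithmetic for 32-bit integers; single-byte values only, for illustration.
object ZigZagSketch {
  // Maps 0, -1, 1, -2, 2, ... to 0, 1, 2, 3, 4, ...
  def encode(n: Int): Int = (n << 1) ^ (n >> 31)

  // Inverse of encode.
  def decode(z: Int): Int = (z >>> 1) ^ -(z & 1)
}

println(ZigZagSketch.decode('&'.toInt)) // 19, the length of "charlie@example.com"
println(ZigZagSketch.decode('('.toInt)) // 20, charlie's age
println(ZigZagSketch.encode(23).toChar) // '.', the last character of diana's value
```

The same rule explains diana's row: her email address is 17 bytes long, and 17 encodes to 34, which is the double quote character at the start of the value.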
Select
import org.apache.spark.sql._
import org.apache.spark.sql.types._
def avroSchemaString =
s"""{"type": "record", "name": "Person", "namespace": "com.example", "fields": [
| {"name": "email", "type": "string"},
| {"name": "age", "type": "int"}
|]}""".stripMargin
def catalog = s"""{
| "table": {"namespace": "default", "name": "person"},
| "rowkey": "key",
| "columns":{
| "key": {"cf": "rowkey", "col": "key", "type": "string"},
| "value": {"cf": "avro", "col": "", "avro": "avroSchema"}
| }
|}""".stripMargin
val results = spark.sqlContext.read.format("org.apache.hadoop.hbase.spark").
option("avroSchema", avroSchemaString).
option("catalog", catalog).
option("hbase.spark.use.hbasecontext", false).load()
results.show(false)
The results of this Scala script should be:
+-------+-------------------------+
| key| value|
+-------+-------------------------+
|charlie|[charlie@example.com, 20]|
|diana |[diana@example.com, 23] |
+-------+-------------------------+
Filter algebra
Spark SQL supports extensive WHERE clauses for filtering rows. Filtering is more performant when it is done by the Region Servers and only by the necessary Region Servers. WHERE clauses are transformed to HBase Filters using the following rules:
Expressions on the key field are transformed to RowFilters, so the query will be properly partitioned.
Expressions on the values are evaluated by SQLPushDownFilter on the Region Server.
The following expressions are supported by HBase Spark connector:
==, !=, <, <=, >, >=
IS NULL, IS NOT NULL
AND, OR, (, )
The following expressions are currently not supported:
IN, LIKE
It is possible to use unsupported expressions in WHERE clauses, and the results will still be correct because Spark filters the rows itself when aggregating the results. However, the evaluation will be slower, as it cannot be done by the Region Servers. In addition, partitioning cannot be used with unsupported expressions.
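One way to stay within the supported expressions is to write an IN list as a chain of equality tests joined by OR. The helper below is a hypothetical sketch that only builds the WHERE fragment as a string. Whether the rewritten predicate is actually pushed down depends on how Spark's optimizer handles it, which is an assumption here and is worth checking with explain() on the resulting query.

```scala
// Hypothetical helper for illustration; builds SQL text only and does not touch Spark or HBase.
object PredicateSketch {
  // Turns ("name", Seq("alice", "bob")) into "(name = 'alice' OR name = 'bob')".
  def inAsOr(column: String, values: Seq[String]): String =
    values.map { v =>
      // Double any single quote so that the value stays a valid SQL string literal.
      val escaped = v.replace("'", "''")
      s"$column = '$escaped'"
    }.mkString("(", " OR ", ")")
}

val where = PredicateSketch.inAsOr("name", Seq("alice", "bob"))
println(s"SELECT * FROM personView WHERE $where")
```

The generated text can be passed to spark.sqlContext.sql in the same way as the query in the Select example.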
Summary
The HBase Spark connector is available and ready for use.