
Preface

Apache HBase is an established NoSQL storage system that has proven itself in sizable production deployments. Although HBase is a key-value store, there is high demand for accessing its data more conveniently, in a SQL-like fashion. Apache Spark provides solid support for filtering and inserting data via SQL through its Spark SQL offering. The HBase project provides integration with Spark SQL through the work of the HBase Connectors subproject. This article describes the steps required to access HBase-stored data in a SQL fashion.

History

Several integrations for accessing HBase from Spark have been created over time. The first connector, called Spark on HBase, was developed by Cloudera Professional Services. A version of that early work was contributed to the upstream HBase project as a feature that the community considered experimental. Cloudera included a derivative of this community version (called hbase-spark) in both CDH 5 and CDH 6. Around the same time, Hortonworks came up with its own implementation, called SHC (Spark HBase Connector). Based on these two implementations, we created a new upstream project, hbase-connectors.

Compatibility & Support

| Implementation   | Spark | Distribution     |
|------------------|-------|------------------|
| hbase-spark      | 1.6   | CDH 5            |
| hbase-spark      | 2.4   | CDH 6            |
| hbase-spark      | 2.3   | HDP 2.6, HDP 3.1 |
| SHC              | 2.3   | HDP 2.6, HDP 3.1 |
| hbase-connectors | 2.4   | CDP              |

Please note that not every implementation is compatible with every version. This article focuses only on hbase-connectors because it is the newest one. The implementations share almost the same codebase, so migrating to hbase-connectors should not be difficult.

Overview

[Figure: HBase-Spark connector overview]

When Spark is used with HBase, it requires certain HBase libraries. The HBase Spark connector exposes the HBase APIs and also provides HBase-specific implementations of RDDs and DataSources.

HBase Region Servers also require the Spark classes on their classpath when Spark SQL queries are used, because parts of these queries are evaluated by the Region Servers. For more information, see the Filter algebra section below.
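
The DataSource API is what the rest of this article uses. The RDD-level API is not covered further, but a minimal sketch of it could look like the following (an illustration added for clarity, not taken from the original article; it assumes the connector jars described in the Configuration section below are on the Spark classpath and that a person table with a c column family exists, as created later in this article):

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.spark.HBaseContext
import org.apache.hadoop.hbase.util.Bytes

// Build an HBaseContext from the default HBase client configuration.
val hbaseContext = new HBaseContext(spark.sparkContext, HBaseConfiguration.create())

// A tiny RDD of (rowKey, email) pairs to write through the RDD API instead of SQL.
val rdd = spark.sparkContext.parallelize(Seq(("alice", "alice@example.com")))

// bulkPut turns each element into an HBase Put and writes it to the table.
hbaseContext.bulkPut[(String, String)](
  rdd,
  TableName.valueOf("person"),
  { case (key, email) =>
    new Put(Bytes.toBytes(key))
      .addColumn(Bytes.toBytes("c"), Bytes.toBytes("email"), Bytes.toBytes(email))
  })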

Configuration

HBase

HBase Region Server configuration must be edited when using Spark SQL queries.

Steps

  1. Go to Cloudera Manager and select the HBase service.
  2. Search for “regionserver environment”.
  3. Add a new environment variable using the RegionServer Environment Advanced Configuration Snippet (Safety Valve):
    • Key: HBASE_CLASSPATH

    • Value: /opt/cloudera/parcels/CDH/lib/hbase_connectors/lib/hbase-spark-1.0.0.7.0.3.0-79.jar:/opt/cloudera/parcels/CDH/lib/hbase_connectors/lib/hbase-spark-protocol-shaded-1.0.0.7.0.3.0-79.jar:/opt/cloudera/parcels/CDH/jars/scala-library-2.11.12.jar

      Ensure you use the appropriate version numbers.

  4. Restart Region Servers.

Spark

The Spark runtime also needs the HBase bindings. Add the necessary jars when starting spark-shell or spark-submit:

 

spark-shell --jars /opt/cloudera/parcels/CDH/lib/hbase_connectors/lib/hbase-spark-1.0.0.7.0.3.0-79.jar,/opt/cloudera/parcels/CDH/lib/hbase_connectors/lib/hbase-spark-protocol-shaded-1.0.0.7.0.3.0-79.jar

 

Ensure you use the appropriate version numbers.

Our task

Let’s imagine storing personal data in HBase. Data is stored in the person table using several Column Families:

| Column Family | Description                                       |
|---------------|---------------------------------------------------|
| p             | Personal information like birth date and height.  |
| c             | Contact information like email address.           |
| avro          | For Apache Avro encoded data.                     |

The row key is the person's name, which we assume is unique.

Create the table in HBase Shell:

 

shell> create 'person', 'p', 'c', 'avro'

 

Connecting to HBase

You can connect to HBase from Spark in two ways:

  • using the default configuration
  • passing a configuration object

Default configuration

 

spark.sqlContext.read.format("org.apache.hadoop.hbase.spark").
  option("hbase.spark.use.hbasecontext", false)
  // additional options...

 

Ensure that the hbase.spark.use.hbasecontext configuration property is set, or a NullPointerException will be thrown.

Customized configuration

 

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.spark.HBaseContext

val conf = HBaseConfiguration.create()
conf.set("hbase.zookeeper.quorum", "hbase-1.example.com")

// the latest HBaseContext will be used afterwards
new HBaseContext(spark.sparkContext, conf)

spark.sqlContext.read.format("org.apache.hadoop.hbase.spark")
  // additional options...

 

 

In this case, a custom configuration object is created in which the ZooKeeper quorum address is explicitly set. Spark will use the most recently instantiated HBaseContext object, so the hbase.spark.use.hbasecontext option can be omitted.

Mapping

Spark applications store data as typed values, while HBase stores everything as byte[] arrays, so application data has to be mapped and serialized. We assume that the application data has a fixed schema: it consists of several fields such as integers, strings, and dates, and these fields make up a record. One record is mapped to one HBase row.

 

There are two common, fundamentally different approaches:

  • map fields to separate HBase columns
  • compose one big column of the record using Apache Avro

Field-to-Column mapping

Field-to-column mapping means that application data is stored with the same structure in HBase. The following Spark case class holds the personal data in this example:

 

case class Person(name: String, email: String, birthDate: Date, height: Float)

 

 

The Person class stores:

  • the name and email address as Strings
  • the birth date as a Date
  • the height as a floating-point number

These fields are mapped to different columns with the following mapping:

 

spark.sqlContext.read.format("org.apache.hadoop.hbase.spark").
  option("hbase.columns.mapping", "name STRING :key, email STRING c:email, " +
    "birthDate DATE p:birthDate, height FLOAT p:height")
  // additional options...

 

 

The mapping is defined by the hbase.columns.mapping option, whose format is <field name> <sql type> <column family:qualifier>. The key field must be referred to as :key. The previous example maps:

  • name to the row key,
  • email to the c column family and email column,
  • birthDate to the p column family and birthDate column,
  • height to the p column family and height column.

The following data types are supported:

| Scala type  | Spark SQL type | HBase (byte[]) representation          |
|-------------|----------------|----------------------------------------|
| Boolean     | BOOLEAN        | 1 byte (0xFF = true, otherwise false)  |
| Byte        | BYTE           | 1 byte                                 |
| Short       | SHORT          | 2 bytes (big endian)                   |
| Integer     | INTEGER        | 4 bytes (big endian)                   |
| Long        | LONG           | 8 bytes (big endian)                   |
| Float       | FLOAT          | 4 bytes (IEEE 754)                     |
| Double      | DOUBLE         | 8 bytes (IEEE 754)                     |
| Date        | DATE           | 8 bytes (Unix time in milliseconds)    |
| Timestamp   | TIMESTAMP      | 8 bytes (Unix time in milliseconds)    |
| String      | STRING         | UTF-8 encoded string                   |
| Array[Byte] | BINARY         | byte[]                                 |

These types are encoded with the standard HBase Bytes.toBytes() methods.

 

Note: the connector does not perform any data type conversion, so the Scala type and the SQL type must match. For example, if a column is mapped as FLOAT, the corresponding values must be Float instances.
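
As a quick illustration of this encoding (a minimal sketch added here, not part of the original example; it only uses the standard org.apache.hadoop.hbase.util.Bytes utility):

import org.apache.hadoop.hbase.util.Bytes

// FLOAT columns are stored as 4-byte IEEE 754 values ...
val heightBytes = Bytes.toBytes(5.6f)        // 0x40, 0xB3, 0x33, 0x33
// ... and STRING columns as UTF-8 bytes.
val nameBytes   = Bytes.toBytes("alice")     // 0x61, 0x6C, 0x69, 0x63, 0x65

// Decoding requires the matching Bytes.toXxx method, mirroring the type-match rule above.
val height = Bytes.toFloat(heightBytes)      // 5.6f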

Apache Avro mapping

Avro is a data serialization system. It can serialize multiple fields into a single byte array when used with HBase and Spark. This approach is fundamentally different from the field-to-column mapping because the result of Avro mapping is one big value instead of multiple values. Please also note that Avro uses its own serialization algorithm which is different from the standard Java serialization format.

The schema must be defined in the Avro world:

 

 

def avroSchemaString =
  s"""{"type": "record", "name": "Person", "namespace": "com.example", "fields": [
      | {"name": "email", "type": "string"},
      | {"name": "age", "type": "int"}
      |]}""".stripMargin

 

 

A catalog object must also be defined, which describes where the serialized data is stored:

 

 

def catalog = s"""{
                  | "table": {"namespace": "default", "name": "person"},
                  | "rowkey": "key",
                  | "columns":{
                  |   "key": {"cf": "rowkey", "col": "key", "type": "string"},
                  |   "value": {"cf": "avro", "col": "", "avro": "avroSchema"}
                  | }
                  |}""".stripMargin

 

 

In this example, the data is stored in the person table of the default namespace; the key column of the Avro records becomes the row key in HBase, and the Avro-serialized data is stored in the avro column family under an empty column qualifier.

Spark SQL has to be configured to use the Avro schema:

 

spark.sqlContext.read.format("org.apache.hadoop.hbase.spark").
  option("avroSchema", avroSchemaString).
  option("catalog", catalog)

 

 

This mapping is more involved because it requires both an Avro schema and an HBase mapping (the catalog).

Mixed mapping

Data access is fast in HBase when searching by key, but what happens when data must be searched by multiple values?

It is common to use composite keys in these cases: the key is composed of multiple fields, while the other, non-key fields are mapped to separate columns. Using Avro mapping for the key and field-to-column mapping for the non-key fields is also possible, but it is not covered in this article.
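
A minimal sketch of the simpler, concatenated-key variant (an illustration added here, not from the original article; the country field and the '#' separator are arbitrary assumptions):

case class PersonWithCountry(country: String, name: String, email: String)

// Build a composite row key by concatenating fields; the non-key field stays a separate column.
val people = Seq(PersonWithCountry("us", "alice", "alice@example.com")).toDS.
  map(p => (s"${p.country}#${p.name}", p.email)).toDF("key", "email")

people.write.format("org.apache.hadoop.hbase.spark").
  option("hbase.columns.mapping", "key STRING :key, email STRING c:email").
  option("hbase.table", "person").option("hbase.spark.use.hbasecontext", false).save()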

Other mappings

Field-to-column mapping relies on the HBase Bytes.toBytes() methods, while Avro has its own serialization format. These two approaches can be mixed, but other mappings, such as Phoenix's, are not supported.

Phoenix

Phoenix stores integer types slightly differently: it flips the sign bit, so the 4-byte signed integer 0 is encoded as 0x80000000. Unsigned integers have no sign bit, so nothing is flipped. The advantage is that byte-level comparison of signed integers works as expected: negative numbers sort before positive ones. The Spark integration provided by the HBase project does not support Phoenix-encoded integers, which can lead to unexpected results. The Phoenix project provides its own Spark integration, which is out of scope for this article.
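
A rough illustration of the difference (a sketch added here for clarity; the sign-bit flip shows the general idea, not Phoenix's actual implementation code):

import org.apache.hadoop.hbase.util.Bytes

val value = 0

// Standard HBase encoding used by the connector: 0 -> 0x00, 0x00, 0x00, 0x00
val hbaseEncoded   = Bytes.toBytes(value)
// Phoenix-style signed-integer encoding flips the sign bit: 0 -> 0x80, 0x00, 0x00, 0x00
val phoenixEncoded = Bytes.toBytes(value ^ Int.MinValue)

// With the flipped bit, unsigned byte-wise comparison orders negative values before positive ones.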

Putting it all together

Field-to-column mapping

Insert

Data insertion is done by the following Scala code. Paste it into the Spark shell:

 

 

import java.sql.Date

case class Person(name: String, email: String, birthDate: Date, height: Float)

var personDS = Seq(
  Person("alice", "alice@example.com", Date.valueOf("2000-01-01"), 5.6f),
  Person("bob", "bob@example.com", Date.valueOf("2001-10-17"), 6.0f)
).toDS

personDS.write.format("org.apache.hadoop.hbase.spark").
  option("hbase.columns.mapping", "name STRING :key, email STRING c:email, " +
    "birthDate DATE p:birthDate, height FLOAT p:height").
  option("hbase.table", "person").option("hbase.spark.use.hbasecontext", false).save()

 

 

Verify

Use the HBase shell to verify the inserted data:

 

 

shell> scan 'person'
ROW     COLUMN+CELL
 alice  column=c:email, timestamp=1568723598292, value=alice@example.com
 alice  column=p:birthDate, timestamp=1568723598292, value=\x00\x00\x00\xDCl\x87 \x00
 alice  column=p:height, timestamp=1568723598292, value=@\xB333
 bob    column=c:email, timestamp=1568723598521, value=bob@example.com
 bob    column=p:birthDate, timestamp=1568723598521, value=\x00\x00\x00\xE9\x99u\x95\x80
 bob    column=p:height, timestamp=1568723598521, value=@\xC0\x00\x00
2 row(s)

 

 

As you can see, the HBase Spark connector created separate columns for the fields of the Person class.

Select

The data can be queried from the Spark shell:

 

 

val df = spark.sqlContext.read.format("org.apache.hadoop.hbase.spark").
  option("hbase.columns.mapping", "name STRING :key, email STRING c:email, " +
    "birthDate DATE p:birthDate, height FLOAT p:height").
  option("hbase.table", "person").option("hbase.spark.use.hbasecontext", false).load()

df.createOrReplaceTempView("personView")

val results = spark.sqlContext.sql("SELECT * FROM personView WHERE name = 'alice'")
results.show()

 

 

The output of this query should be the following:

 

 

+-----+------+-----------------+----------+
| name|height|            email| birthDate|
+-----+------+-----------------+----------+
|alice|   5.6|alice@example.com|2000-01-01|
+-----+------+-----------------+----------+

 

 

Avro mapping

Insert

 

 

import org.apache.avro.Schema
import org.apache.hadoop.hbase.spark.SchemaConverters
import org.apache.spark.sql._
import org.apache.spark.sql.types._

def avroSchemaString =
  s"""{"type": "record", "name": "Person", "namespace": "com.example", "fields": [
      | {"name": "email", "type": "string"},
      | {"name": "age", "type": "int"}
      |]}""".stripMargin
val avroSchema = new Schema.Parser().parse(avroSchemaString)

def catalog = s"""{
                  | "table": {"namespace": "default", "name": "person"},
                  | "rowkey": "key",
                  | "columns":{
                  |   "key": {"cf": "rowkey", "col": "key", "type": "string"},
                  |   "value": {"cf": "avro", "col": "", "avro": "avroSchema"}
                  | }
                  |}""".stripMargin

val rowSchema = StructType(List(
  StructField("key", StringType, true),
  StructField("value", SchemaConverters.toSqlType(avroSchema).dataType, true))
)

val data = Seq(
  Row("charlie", Row("charlie@example.com", 20)),
  Row("diana", Row("diana@example.com", 23))
)

val df = spark.createDataFrame(
  spark.sparkContext.parallelize(data),
  rowSchema
)

df.write.format("org.apache.hadoop.hbase.spark").
  option("avroSchema", avroSchemaString).option("catalog", catalog).
  option("hbase.spark.use.hbasecontext", false). save()

 

 

Verify

Use HBase Shell to verify the inserted data:

 

 

shell> scan 'person'
ROW        COLUMN+CELL
 charlie    column=avro:, timestamp=1582644932655, value=&charlie@example.com(
 diana      column=avro:, timestamp=1582644932655, value="diana@example.com.
2 row(s)

 

 

The results contain only one column per row, holding a composite value, as expected. Avro zig-zag encodes integers (a non-negative n becomes 2n), which explains the bytes in the value &charlie@example.com(:

  • & (ASCII 38): the zig-zag encoded length of the following string, 19 bytes,
  • charlie@example.com: the email address,
  • ( (ASCII 40): the zig-zag encoded age, 20 years.
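
A minimal sketch of the zig-zag transformation Avro applies before its varint encoding (added here for clarity; it covers only single-byte values and ignores multi-byte varint continuation):

// Zig-zag maps small signed ints to small unsigned ints: 0 -> 0, -1 -> 1, 1 -> 2, -2 -> 3, ...
def zigZag(n: Int): Int = (n << 1) ^ (n >> 31)

zigZag(19)  // 38, the '&' byte prefixing the 19-character email
zigZag(20)  // 40, the '(' byte representing age 20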

Select

 

 

import org.apache.spark.sql._
import org.apache.spark.sql.types._

def avroSchemaString =
  s"""{"type": "record", "name": "Person", "namespace": "com.example", "fields": [
      | {"name": "email", "type": "string"},
      | {"name": "age", "type": "int"}
      |]}""".stripMargin

def catalog = s"""{
                  | "table": {"namespace": "default", "name": "person"},
                  | "rowkey": "key",
                  | "columns":{
                  |   "key": {"cf": "rowkey", "col": "key", "type": "string"},
                  |   "value": {"cf": "avro", "col": "", "avro": "avroSchema"}
                  | }
                  |}""".stripMargin

val results = spark.sqlContext.read.format("org.apache.hadoop.hbase.spark").
  option("avroSchema", avroSchemaString).
  option("catalog", catalog).
  option("hbase.spark.use.hbasecontext", false).load()

results.show(false)

 

 

The results of this Scala script should be:

 

 

+-------+-------------------------+
|    key|                    value|
+-------+-------------------------+
|charlie|[charlie@example.com, 20]|
|diana  |[diana@example.com, 23]  |
+-------+-------------------------+

 

 

Filter algebra

Spark SQL supports extensive WHERE clauses for filtering rows. Filtering is most efficient when it is performed by the Region Servers, and only by the Region Servers that hold the relevant data. WHERE clauses are transformed to HBase filters using the following rules:

  • Expressions on the key field are transformed to RowFilters, so the query will be properly partitioned.
  • Expressions on the values are evaluated by SQLPushDownFilter on the Region Server.

The following expressions are supported by the HBase Spark connector:

  • ==, !=, <, <=, >, >=
  • IS NULL, IS NOT NULL
  • AND, OR, (, )

The following expressions are currently not supported:

  • IN, LIKE

Unsupported expressions can still be used in WHERE clauses, and the results will nevertheless be correct, because Spark applies the remaining filters itself after collecting the data. However, the evaluation will be slower, as it cannot be done by the Region Servers, and partitioning cannot be used with unsupported expressions.
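
A short sketch of the difference, reusing the personView temporary view registered earlier (an illustration added here; the exact queries are arbitrary):

// Supported expressions: the key predicate becomes a RowFilter and the value
// predicate is evaluated by SQLPushDownFilter on the Region Servers.
spark.sql("SELECT name, height FROM personView WHERE name >= 'alice' AND height > 5.0").show()

// LIKE is not pushed down: the rows are returned to Spark, which applies the filter itself.
spark.sql("SELECT name, email FROM personView WHERE email LIKE '%@example.com'").show()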

Summary

The HBase Spark connector is here, ready for use.
