Spark3 HBase Integration

 


This blog post will guide you through the process of integrating Spark 3 with HBase, providing you with valuable insights and step-by-step instructions.

Note: This blog post covers only the case where the Spark3 and HBase services exist in the same cluster. If HBase is in a remote cluster, see the hbase-configure-spark-connector-remote-cluster tutorial.

Prerequisites

  1. To use the HBase-Spark3 connector, ensure that the cluster runs CDP 7.1.7 SP1 or above with the Spark3 parcel.
  2. Ensure that every Spark node has the HBase Master, Region Server, or Gateway role assigned to it. If no HBase role is assigned to a Spark node, add the HBase Gateway role to it, which ensures that the HBase configuration files are available on the Spark node.

HBase Integration with Spark3

HBase integration with Spark can be achieved using the HBase-Spark Connector, which lets Spark applications read from and write to HBase tables.

Here's how you can integrate HBase with Spark3 using the HBase Spark Connector:

1. Configure HBase-Spark connector

Step1: Adding the hbase-spark connector jars to classpath.

  1. Cloudera Manager --> Spark3 --> Configuration
  2. Ensure that the HBase service is selected in Spark Service as a dependency.
  3. Locate the Spark 3 Client Advanced Configuration Snippet (Safety Valve) for spark3-conf/spark-defaults.conf property or search for it by typing its name in the Search box.
  4. Add the following properties to ensure that all required HBase platform dependencies are available on the classpath for the Spark executors and drivers.

 

spark.driver.extraClassPath=/opt/cloudera/parcels/SPARK3/lib/spark3/hbase_connectors/lib/hbase-spark3-***VERSION-NUMBER***.jar:/opt/cloudera/parcels/SPARK3/lib/spark3/hbase_connectors/lib/hbase-spark3-protocol-shaded-***VERSION-NUMBER***.jar
spark.executor.extraClassPath=/opt/cloudera/parcels/SPARK3/lib/spark3/hbase_connectors/lib/hbase-spark3-***VERSION-NUMBER***.jar:/opt/cloudera/parcels/SPARK3/lib/spark3/hbase_connectors/lib/hbase-spark3-protocol-shaded-***VERSION-NUMBER***.jar


Note: Before adding these properties, replace VERSION-NUMBER with the connector version installed on your cluster. For example, if the version number is 1.0.0.3.2.7171000.2-1:

spark.driver.extraClassPath=/opt/cloudera/parcels/SPARK3/lib/spark3/hbase_connectors/lib/hbase-spark3-1.0.0.3.2.7171000.2-1.jar:/opt/cloudera/parcels/SPARK3/lib/spark3/hbase_connectors/lib/hbase-spark3-protocol-shaded-1.0.0.3.2.7171000.2-1.jar
spark.executor.extraClassPath=/opt/cloudera/parcels/SPARK3/lib/spark3/hbase_connectors/lib/hbase-spark3-1.0.0.3.2.7171000.2-1.jar:/opt/cloudera/parcels/SPARK3/lib/spark3/hbase_connectors/lib/hbase-spark3-protocol-shaded-1.0.0.3.2.7171000.2-1.jar

 

  • Save the Changes and restart the Spark3 service.
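The version string has to match the jar files that are actually present on the cluster. As a minimal sketch, assuming you have already listed the file names in the connector lib directory (for example with ls), the following Python helper picks the version out of the hbase-spark3 jar name and prints the two properties. The function names are hypothetical and are not part of any Cloudera tooling.

```python
import re

# Directory taken from the properties above; adjust it if your parcel lives elsewhere.
LIB_DIR = "/opt/cloudera/parcels/SPARK3/lib/spark3/hbase_connectors/lib"

def find_connector_version(jar_names):
    """Return the version embedded in hbase-spark3-<version>.jar, or None."""
    pattern = re.compile(r"hbase-spark3-(\d[\w.\-]*)\.jar")
    for name in jar_names:
        match = pattern.fullmatch(name)
        if match:
            return match.group(1)
    return None

def classpath_properties(version):
    """Build the driver and executor extraClassPath lines for one version."""
    jars = [
        f"{LIB_DIR}/hbase-spark3-{version}.jar",
        f"{LIB_DIR}/hbase-spark3-protocol-shaded-{version}.jar",
    ]
    value = ":".join(jars)
    return [
        f"spark.driver.extraClassPath={value}",
        f"spark.executor.extraClassPath={value}",
    ]

# Example file names, using the version shown in this article.
names = [
    "hbase-spark3-protocol-shaded-1.0.0.3.2.7171000.2-1.jar",
    "hbase-spark3-1.0.0.3.2.7171000.2-1.jar",
]
version = find_connector_version(names)
for line in classpath_properties(version):
    print(line)
```

The pattern requires a digit right after "hbase-spark3-", so the protocol-shaded jar is skipped and only the main connector jar determines the version.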

Step2: Adding the hbase-spark connector jars to the RegionServer classpath. This step is optional if you do not use any Spark SQL filters.

  1. Go to Cloudera Manager > HBase > Configuration.
  2. Locate the RegionServer Environment Advanced Configuration Snippet (Safety Valve)  property or Search for regionserver environment.
  3. Click the plus icon to add the following property:

    Key: HBASE_CLASSPATH

    Value:

 

/opt/cloudera/parcels/SPARK3/lib/spark3/hbase_connectors/lib/hbase-spark3-***VERSION-NUMBER***.jar:/opt/cloudera/parcels/SPARK3/lib/spark3/hbase_connectors/lib/hbase-spark3-protocol-shaded-***VERSION-NUMBER***.jar:/opt/cloudera/parcels/CDH/jars/scala-library-***SCALA-VERSION***.jar

Note: Ensure that the listed jars have the correct version number in their name.
Example:

/opt/cloudera/parcels/SPARK3/lib/spark3/hbase_connectors/lib/hbase-spark3-1.0.0.3.2.7171000.2-1.jar:/opt/cloudera/parcels/SPARK3/lib/spark3/hbase_connectors/lib/hbase-spark3-protocol-shaded-1.0.0.3.2.7171000.2-1.jar:/opt/cloudera/parcels/CDH/jars/scala-library-2.12.10.jar

 

  • Save Changes and Restart the Region Server.

2. HBase-Spark connector Example 

2.1 Schema:

In order to use the HBase-Spark connector, you need to define the schema to map the attributes/columns from HBase to Spark or vice versa.

Let's assume you have an employee table with the columns id (Long), name (String), age (Short), and salary (Float). The employee's personal information (id, name, and age) is stored in the per column family, and the professional information (salary) is stored in the prof column family. The id attribute is the row key of the HBase table.

              Spark            HBase
Type/Table    Employee         employees
Id            id: Long         key
Name          name: String     per:name
Age           age: Short       per:age
Salary        salary: Float    prof:salary

 

There are two ways we can specify the schema:

  1. Using the hbase.columns.mapping property: This is a very simple way to map the HBase columns to Spark data types. For example:

 

val hbase_table = "employees"

val hbase_column_mapping = "id LONG :key, name STRING per:name,age SHORT per:age, salary FLOAT prof:salary"

val df = spark.read.format("org.apache.hadoop.hbase.spark").option("hbase.columns.mapping", hbase_column_mapping).option("hbase.table", hbase_table).option("hbase.spark.use.hbasecontext", false).load()

 

  2. Using the HBaseTableCatalog.tableCatalog property: This is useful if you want to specify the schema in JSON format, and also when you migrate your code from SHC to the HBase Spark Connector. For example:

 

val catalog = s"""{
    |"table":{"namespace":"my_nm", "name":"employees"},
    |"rowkey":"key",
    |"columns":{
      |"id":{"cf":"rowkey", "col":"key", "type":"long"},
      |"name":{"cf":"per", "col":"name", "type":"string"},
      |"age":{"cf":"per", "col":"age", "type":"short"},
      |"salary":{"cf":"prof", "col":"salary", "type":"float"}
    |}
|}""".stripMargin

import org.apache.hadoop.hbase.spark.datasources.HBaseTableCatalog

val df = spark.read.format("org.apache.hadoop.hbase.spark").options(Map(HBaseTableCatalog.tableCatalog->catalog)).load()
df.show()

In the catalog definition above, the namespace is set to my_nm.
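Both schema styles describe the same mapping, so they can be generated from one field list. The following is a minimal Python sketch under that assumption. FIELDS, columns_mapping, and table_catalog are hypothetical names introduced only for illustration, and the output formats are copied from the examples in this article.

```python
import json

# (spark_column, type, column_family, qualifier); the row key has no column family.
FIELDS = [
    ("id", "long", None, "key"),
    ("name", "string", "per", "name"),
    ("age", "short", "per", "age"),
    ("salary", "float", "prof", "salary"),
]

def columns_mapping(fields):
    """Build the hbase.columns.mapping string, e.g. 'id LONG :key, ...'."""
    parts = []
    for column, type_name, family, qualifier in fields:
        target = f":{qualifier}" if family is None else f"{family}:{qualifier}"
        parts.append(f"{column} {type_name.upper()} {target}")
    return ", ".join(parts)

def table_catalog(namespace, table, fields):
    """Build the JSON catalog; the row key column uses the 'rowkey' family."""
    columns = {}
    row_key = None
    for column, type_name, family, qualifier in fields:
        if family is None:
            row_key = qualifier
        columns[column] = {"cf": family or "rowkey", "col": qualifier, "type": type_name}
    return json.dumps({
        "table": {"namespace": namespace, "name": table},
        "rowkey": row_key,
        "columns": columns,
    })

print(columns_mapping(FIELDS))
print(table_catalog("my_nm", "employees", FIELDS))
```

Keeping a single field list avoids the two definitions drifting apart when a column is added or its type changes.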

 

2.2 HBase:

Step 1: Launch the hbase-shell

 

 

hbase shell

 

 

Step 2: Create the HBase table.

 

 

create 'employees', 'per', 'prof'

 

 

Step 3: Quit the HBase shell

 

 

quit

 

 

2.3 Spark:

Step 1: Launch the spark3-shell

  1. If you have configured the HBase Spark Connector at the cluster level, run the following command to launch the spark3-shell.

 

spark3-shell

 

  2. If you have not configured the HBase Spark Connector at the cluster level, use the following command to launch the spark3-shell.

 

spark3-shell --jars /opt/cloudera/parcels/SPARK3/lib/spark3/hbase_connectors/lib/hbase-spark3-<VERSION-NUMBER>.jar,/opt/cloudera/parcels/SPARK3/lib/spark3/hbase_connectors/lib/hbase-spark3-protocol-shaded-<VERSION-NUMBER>.jar

Note: You need to replace the VERSION-NUMBER according to your cluster.

 

Step 2: Run the following spark code to write the data to HBase.

 

 

// Create sample dataset to insert
case class Employee(id:Long, name: String, age: Short, salary: Float)

val employeeDF = Seq(
  Employee(1L, "Ranga", 34, 15000.5f),
  Employee(2L, "Nishanth", 5, 35000.5f),
  Employee(3L, "Meena", 30, 25000.5f)
).toDF()

// Save the data to HBase
val hbase_column_mapping = "id LONG :key, name STRING per:name,age SHORT per:age, salary FLOAT prof:salary"
val hbase_table = "employees"

employeeDF.write.format("org.apache.hadoop.hbase.spark").option("hbase.columns.mapping",hbase_column_mapping).option("hbase.table",hbase_table).option("hbase.spark.use.hbasecontext",false).save()

 

 

Step 3: Run the following spark code to read the data from HBase.

 

 

val hbase_column_mapping = "id LONG :key, name STRING per:name,age SHORT per:age, salary FLOAT prof:salary"
val hbase_table = "employees"

// Load the data from HBase
val df = spark.read.format("org.apache.hadoop.hbase.spark").option("hbase.columns.mapping", hbase_column_mapping).option("hbase.table", hbase_table).option("hbase.spark.use.hbasecontext", false).load()

// Display the data
df.show()

// Filter the data
df.filter("age>10").show()

 

 

PySpark code:

/tmp/hbase_spark_connector_app.py:

 

 

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, LongType, ShortType, FloatType
import json

def main():
    spark = SparkSession.builder.appName("HBase Spark Connector App").getOrCreate()

    data = [(1, "Ranga", 34, 15000.5), (2, "Nishanth", 5, 35000.5),(3, "Meena", 30, 25000.5)]

    schema = StructType([
        StructField("id", LongType(), True),
        StructField("name", StringType(), True),
        StructField("age", ShortType(), True),
        StructField("salary", FloatType(), True),
    ])
 
    employeeDF = spark.createDataFrame(data=data,schema=schema)

    catalog = json.dumps({
          "table":{"namespace":"default", "name":"employees"},
          "rowkey":"key",
          "columns":{
              "id":{"cf":"rowkey", "col":"key", "type":"long"},
              "name":{"cf":"per", "col":"name", "type":"string"},
              "age":{"cf":"per", "col":"age", "type":"short"},
              "salary":{"cf":"prof", "col":"salary", "type":"float"}
          }
    })

    employeeDF.write.format("org.apache.hadoop.hbase.spark").options(catalog=catalog).option("hbase.spark.use.hbasecontext", False).save()

    df = spark.read.format("org.apache.hadoop.hbase.spark").options(catalog=catalog).option("hbase.spark.use.hbasecontext", False).load()
    df.show()

    spark.stop()

if __name__ == "__main__":
    main()

 

 

Spark-Submit:

 

 

spark3-submit --master yarn \
--deploy-mode client \
/tmp/hbase_spark_connector_app.py

 

 

3. Running in Secure Cluster

To run in a Kerberos-enabled cluster, include the HBase-related jars in the classpath, because HBase token retrieval and renewal are done by Spark and are independent of the connector.

In other words, set up the environment in the usual way, either by running kinit or by providing a principal and keytab.
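As a rough sketch of the principal/keytab route, the Python helper below assembles a spark3-submit command line using the standard spark-submit flags --jars, --principal, and --keytab. The helper name, the principal user@EXAMPLE.COM, and the keytab path are placeholders assumed for illustration; substitute the values for your own cluster.

```python
import shlex

def submit_command(app, jars=(), principal=None, keytab=None):
    """Assemble the spark3-submit argument list for a YARN client-mode job."""
    cmd = ["spark3-submit", "--master", "yarn", "--deploy-mode", "client"]
    if jars:
        # Connector jars are only needed here when they are not on the cluster classpath.
        cmd += ["--jars", ",".join(jars)]
    if principal and keytab:
        # Let Spark log in with the keytab instead of relying on a prior kinit.
        cmd += ["--principal", principal, "--keytab", keytab]
    cmd.append(app)
    return cmd

# Placeholder principal and keytab path, not real values.
cmd = submit_command(
    "/tmp/hbase_spark_connector_app.py",
    principal="user@EXAMPLE.COM",
    keytab="/path/to/user.keytab",
)
print(shlex.join(cmd))
```

When a valid ticket already exists from kinit, the principal and keytab arguments can be left out and the command reduces to the spark3-submit call shown earlier.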

4. Tuning Parameters:

There are several tuning parameters in the HBase-Spark connector.

For example:

  • hbase.spark.query.batchsize - Set the maximum number of values to return for each call to next() in scan.
  • hbase.spark.query.cachedrows - The number of rows for caching that will be passed to scan.

For more configuration options, see the hbase-connectors links in the References section.
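A minimal sketch of how these keys might be supplied, assuming the connector reads them from the data source options in the same way as hbase.table and hbase.columns.mapping. The helper name and the numeric values are illustrative only and are not recommendations.

```python
def tuning_options(batch_size=None, cached_rows=None):
    """Collect the scan tuning keys as string options, skipping unset ones."""
    options = {}
    if batch_size is not None:
        options["hbase.spark.query.batchsize"] = str(batch_size)
    if cached_rows is not None:
        options["hbase.spark.query.cachedrows"] = str(cached_rows)
    return options

# Illustrative values; suitable numbers depend on row size and cluster memory.
opts = tuning_options(batch_size=100, cached_rows=1000)
print(opts)

# In a PySpark job the dictionary could then be applied to the reader, e.g.:
#   spark.read.format("org.apache.hadoop.hbase.spark").options(**opts)...
```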

5. Troubleshooting:

1. org.apache.hadoop.hbase.security.AccessDeniedException: Insufficient permissions for user:

 

 

org.apache.hadoop.hbase.security.AccessDeniedException: Insufficient permissions for user 'rangareddy@HADOOP.COM',action: scannerOpen, tableName:employees, family:prof, column: salary
    at org.apache.ranger.authorization.hbase.RangerAuthorizationCoprocessor.authorizeAccess(RangerAuthorizationCoprocessor.java:568)
    at org.apache.ranger.authorization.hbase.RangerAuthorizationCoprocessor.preScannerOpen(RangerAuthorizationCoprocessor.java:1013)
    at org.apache.ranger.authorization.hbase.RangerAuthorizationCoprocessor.preScannerOpen(RangerAuthorizationCoprocessor.java:710)
    at org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost$40.call(RegionCoprocessorHost.java:1231)
    at org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost$40.call(RegionCoprocessorHost.java:1228)
    at org.apache.hadoop.hbase.coprocessor.CoprocessorHost$ObserverOperationWithoutResult.callObserver(CoprocessorHost.java:558)
    at org.apache.hadoop.hbase.coprocessor.CoprocessorHost.execOperation(CoprocessorHost.java:631)

 

 

Problem:

The user doesn't have the required permissions to access the table or namespace. To access the HBase data, grant the user the necessary Ranger permissions.

Solution:

Go to Ranger UI --> Access Manager --> HBASE --> Click on cm_hbase --> Create a new policy or use existing policy and provide necessary permissions to access HBase table.

2. java.lang.IllegalArgumentException: offset (0) + length (4) exceed the capacity of the array

 

 

java.lang.IllegalArgumentException: offset (0) + length (4) exceed the capacity of the array: 2
    at org.apache.hadoop.hbase.util.Bytes.explainWrongLengthOrOffset(Bytes.java:789)
    at org.apache.hadoop.hbase.util.Bytes.toInt(Bytes.java:943)
    at org.apache.hadoop.hbase.util.Bytes.toInt(Bytes.java:929)
    at org.apache.hadoop.hbase.spark.datasources.Utils$.hbaseFieldToScalaType(Utils.scala:53)
    at org.apache.hadoop.hbase.spark.HBaseRelation.$anonfun$buildRow$2(DefaultSource.scala:289)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
    at scala.collection.mutable.ArraySeq.foreach(ArraySeq.scala:75)
    at scala.collection.TraversableLike.map(TraversableLike.scala:286)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
    at scala.collection.AbstractTraversable.map(Traversable.scala:108)
    at org.apache.hadoop.hbase.spark.HBaseRelation.buildRow(DefaultSource.scala:280)
    at org.apache.hadoop.hbase.spark.HBaseRelation.$anonfun$buildScan$9(DefaultSource.scala:366)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:364)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:890)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:890)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)

 

 

Problem:

This java.lang.IllegalArgumentException occurs when the schema differs between Spark and HBase. For example, the Spark side declares the column as Int while the HBase side stores it as String.

Solution:

There are two solutions for this issue:

1. Provide the correct schema mapping on both the Spark and HBase sides.

2. On the Spark side, convert the data type to String.
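The numbers in the message can be reproduced outside HBase. HBase stores values as raw big-endian bytes, so a Short occupies 2 bytes while reading an Int needs 4. The sketch below uses Python's struct module to imitate that behaviour. The function names are hypothetical, and the error text is modelled on the stack trace above rather than produced by HBase itself.

```python
import struct

def hbase_bytes_from_short(value):
    """Encode a short as 2 big-endian bytes, the layout HBase uses for shorts."""
    return struct.pack(">h", value)

def read_as_int(raw):
    """Decode a 4-byte big-endian int, failing like the trace when bytes are missing."""
    if len(raw) < 4:
        raise ValueError(
            f"offset (0) + length (4) exceed the capacity of the array: {len(raw)}"
        )
    return struct.unpack(">i", raw[:4])[0]

def read_as_short(raw):
    """Decode a 2-byte big-endian short."""
    return struct.unpack(">h", raw[:2])[0]

stored = hbase_bytes_from_short(34)   # value written with a SHORT mapping
print(len(stored))                    # 2
print(read_as_short(stored))          # 34, the mapping matches the stored bytes
try:
    read_as_int(stored)               # mapping declares INT, but only 2 bytes exist
except ValueError as err:
    print(err)
```

The trailing ": 2" in the exception is therefore the actual length of the stored value, which is a useful hint about the type that was originally written.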

3. java.lang.NullPointerException at org.apache.hadoop.hbase.spark.HBaseRelation

 

 

 java.lang.NullPointerException
	at org.apache.hadoop.hbase.spark.HBaseRelation.<init>(DefaultSource.scala:138)
	at org.apache.hadoop.hbase.spark.DefaultSource.createRelation(DefaultSource.scala:78)
	at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)

 

 

Problem:

When reading or writing data, the HBase Spark Connector first obtains an HBaseContext. It checks the hbase.spark.use.hbasecontext parameter, which is true by default. If the value is true, the connector takes the HBaseContext from a cache. If the application has not created an HBaseContext, the cache is empty and the connector throws the NullPointerException.

 HBaseSparkConf configuration:

val USE_HBASECONTEXT = "hbase.spark.use.hbasecontext"
val DEFAULT_USE_HBASECONTEXT = true

Get the HBaseContext code:

  // create or get latest HBaseContext
  val hbaseContext:HBaseContext = if (useHBaseContext) {
    LatestHBaseContextCache.latest
  } else {
    val config = HBaseConfiguration.create()
    configResources.map(resource => resource.split(",").foreach(r => config.addResource(r)))
    new HBaseContext(sqlContext.sparkContext, config)
  }

An open Jira already tracks this issue: https://issues.apache.org/jira/browse/HBASE-18570

Solution:

There are multiple solutions for this issue:

  1. Set hbase.spark.use.hbasecontext to false.
  2. Initialise the HBaseContext before loading the data with the HBase Spark Connector.
  3. Wait for the HBASE-18570 Jira to be fixed.
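The first two solutions can be pictured with a simplified Python model of the lookup logic. This is not the connector's code: the class and function names are stand-ins, and the model assumes that constructing a context registers it as the latest one in the cache, which is what makes the second solution work.

```python
class LatestContextCache:
    """Stand-in for the connector's cache of the most recent context."""
    latest = None

class Context:
    """Stand-in for HBaseContext; creating one registers it in the cache."""
    def __init__(self, label):
        self.label = label
        LatestContextCache.latest = self

def resolve_context(use_hbasecontext):
    """Mirror the if/else from the Scala snippet above."""
    if use_hbasecontext:
        return LatestContextCache.latest  # None when nothing was created yet
    return Context("created from configuration")

# Default flag with an empty cache: nothing usable comes back.
print(resolve_context(True))

# Solution 1: turn the flag off so a context is built on demand.
print(resolve_context(False).label)

# Solution 2: create a context first, then the cached lookup succeeds.
Context("created by the application")
print(resolve_context(True).label)
```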

4. org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException: Failed 1 action: employee: 1 time, servers with issues: null

 

 

org.apache.spark.SparkException: Task failed while writing rows
    at org.apache.spark.internal.io.SparkHadoopWriter$.executeTask(SparkHadoopWriter.scala:163)
    at org.apache.spark.internal.io.SparkHadoopWriter$.$anonfun$write$1(SparkHadoopWriter.scala:88)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException: Failed 1 action: employee: 1 time, servers with issues: null
    at org.apache.hadoop.hbase.client.BatchErrors.makeException(BatchErrors.java:54)
    at org.apache.hadoop.hbase.client.AsyncRequestFutureImpl.getErrors(AsyncRequestFutureImpl.java:1196)
    at org.apache.hadoop.hbase.client.BufferedMutatorImpl.doFlush(BufferedMutatorImpl.java:309)
    at org.apache.hadoop.hbase.client.BufferedMutatorImpl.close(BufferedMutatorImpl.java:241)
    at org.apache.hadoop.hbase.mapred.TableOutputFormat$TableRecordWriter.close(TableOutputFormat.java:91)
    at org.apache.spark.internal.io.HadoopMapRedWriteConfigUtil.closeWriter(SparkHadoopWriter.scala:251)
    at org.apache.spark.internal.io.SparkHadoopWriter$.$anonfun$executeTask$1(SparkHadoopWriter.scala:145)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1496)
    at org.apache.spark.internal.io.SparkHadoopWriter$.executeTask(SparkHadoopWriter.scala:135)
    ... 9 more

 

 

Problem:

If the table does not exist when saving data to HBase, the above exception is thrown.

Solution:

Create the HBase table with the proper column families before writing the data.
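One way to avoid a mismatch is to derive the column families from the same mapping string that the Spark job uses. The sketch below parses an hbase.columns.mapping value and prints the matching HBase shell create statement. The function names are hypothetical, and the statement still has to be run manually in the hbase shell.

```python
def required_families(mapping):
    """Return the distinct column families referenced by a columns mapping."""
    families = []
    for definition in mapping.split(","):
        target = definition.split()[-1]      # e.g. "per:name" or ":key"
        family = target.split(":", 1)[0]     # empty for the row key
        if family and family not in families:
            families.append(family)
    return families

def create_statement(table, mapping):
    """Build the HBase shell command that creates the table and its families."""
    names = [table] + required_families(mapping)
    return "create " + ", ".join(f"'{name}'" for name in names)

mapping = "id LONG :key, name STRING per:name,age SHORT per:age, salary FLOAT prof:salary"
print(create_statement("employees", mapping))
```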

6. References

  1. Configuring HBase-Spark connector using Cloudera Manager when HBase and Spark are on the same cluste...
  2. hbase-connectors /spark/
  3. hbase-connectors/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources /HBaseSp...
  4. HBASE-18570

 

 

 

Version history
Last update:
‎11-29-2023 09:17 PM
Updated by: