This blog post walks you through integrating Spark 3 with HBase, with step-by-step instructions.
Note: This blog post covers Spark3 and HBase only when both services exist in the same cluster. If HBase is in a remote cluster, follow the hbase-configure-spark-connector-remote-cluster tutorial instead.
HBase integration with Spark can be achieved using the HBase-Spark Connector, which provides a seamless way to interact with HBase from within Spark applications.
Here's how you can integrate HBase with Spark3 using the HBase Spark Connector:
Step 1: Add the hbase-spark connector JARs to the Spark classpath by setting the following properties:
spark.driver.extraClassPath=/opt/cloudera/parcels/SPARK3/lib/spark3/hbase_connectors/lib/hbase-spark3-***VERSION-NUMBER***.jar:/opt/cloudera/parcels/SPARK3/lib/spark3/hbase_connectors/lib/hbase-spark3-protocol-shaded-***VERSION-NUMBER***.jar
spark.executor.extraClassPath=/opt/cloudera/parcels/SPARK3/lib/spark3/hbase_connectors/lib/hbase-spark3-***VERSION-NUMBER***.jar:/opt/cloudera/parcels/SPARK3/lib/spark3/hbase_connectors/lib/hbase-spark3-protocol-shaded-***VERSION-NUMBER***.jar
Note: Before adding the above parameters, find the connector version installed on your cluster and replace VERSION-NUMBER accordingly (listing /opt/cloudera/parcels/SPARK3/lib/spark3/hbase_connectors/lib/ shows the exact JAR names). For example, if the version number is 1.0.0.3.2.7171000.2-1:
spark.driver.extraClassPath=/opt/cloudera/parcels/SPARK3/lib/spark3/hbase_connectors/lib/hbase-spark3-1.0.0.3.2.7171000.2-1.jar:/opt/cloudera/parcels/SPARK3/lib/spark3/hbase_connectors/lib/hbase-spark3-protocol-shaded-1.0.0.3.2.7171000.2-1.jar
spark.executor.extraClassPath=/opt/cloudera/parcels/SPARK3/lib/spark3/hbase_connectors/lib/hbase-spark3-1.0.0.3.2.7171000.2-1.jar:/opt/cloudera/parcels/SPARK3/lib/spark3/hbase_connectors/lib/hbase-spark3-protocol-shaded-1.0.0.3.2.7171000.2-1.jar
Step 2: Add the connector JARs to the HBase classpath by setting the HBASE_CLASSPATH environment variable (for example, through the HBase service environment configuration in Cloudera Manager). This step is optional when you are not using any Spark SQL filters.
Key: HBASE_CLASSPATH
Value: /opt/cloudera/parcels/SPARK3/lib/spark3/hbase_connectors/lib/hbase-spark3-***VERSION-NUMBER***.jar:/opt/cloudera/parcels/SPARK3/lib/spark3/hbase_connectors/lib/hbase-spark3-protocol-shaded-***VERSION-NUMBER***.jar:/opt/cloudera/parcels/CDH/jars/scala-library-***SCALA-VERSION***.jar
Note: Ensure that the listed jars have the correct version number in their name.
Example:
/opt/cloudera/parcels/SPARK3/lib/spark3/hbase_connectors/lib/hbase-spark3-1.0.0.3.2.7171000.2-1.jar:/opt/cloudera/parcels/SPARK3/lib/spark3/hbase_connectors/lib/hbase-spark3-protocol-shaded-1.0.0.3.2.7171000.2-1.jar:/opt/cloudera/parcels/CDH/jars/scala-library-2.12.10.jar
To use the HBase-Spark connector, you need to define a schema that maps attributes/columns between HBase and Spark.
Let's assume you have an employee table with the columns id (Long), name (String), age (Short), and salary (Float). The employee's personal information (id, name, and age) is stored in the per column family, and the professional information (salary) is stored in the prof column family. The id attribute is used as the HBase row key.
           | Spark          | HBase
Type/Table | Employee       | employees
Id         | id: Long       | key
Name       | name: String   | per:name
Age        | age: Short     | per:age
Salary     | salary: Float  | prof:salary
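On the Spark side, this mapping corresponds to a simple case class (the same one used in the write example later in this post):
// Spark-side representation of the employees table
case class Employee(id: Long, name: String, age: Short, salary: Float)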
There are two ways to specify the schema:
1. Using the hbase.columns.mapping option:
val hbase_table = "employees"
val hbase_column_mapping = "id LONG :key, name STRING per:name, age SHORT per:age, salary FLOAT prof:salary"
val df = spark.read.format("org.apache.hadoop.hbase.spark")
  .option("hbase.columns.mapping", hbase_column_mapping)
  .option("hbase.table", hbase_table)
  .option("hbase.spark.use.hbasecontext", false)
  .load()
2. Using a catalog (JSON definition):
val catalog = s"""{
|"table":{"namespace":"my_nm", "name":"employees"},
|"rowkey":"key",
|"columns":{
|"id":{"cf":"rowkey", "col":"key", "type":"long"},
|"name":{"cf":"per", "col":"name", "type":"string"},
|"age":{"cf":"per", "col":"age", "type":"short"},
|"salary":{"cf":"prof", "col":"salary", "type":"float"}
|}
|}""".stripMargin
import org.apache.hadoop.hbase.spark.datasources.HBaseTableCatalog
val df = spark.read.format("org.apache.hadoop.hbase.spark")
  .options(Map(HBaseTableCatalog.tableCatalog -> catalog))
  .load()
df.show()
In the catalog definition above, note that the namespace is set to my_nm; if your table lives in the default namespace, use default instead (as the PySpark example below does).
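The same catalog can also be used for writing (this mirrors the PySpark example later in this post). A minimal sketch, assuming the employees table already exists in the my_nm namespace and that employeeDF matches the catalog schema:
// Write a DataFrame to HBase using the catalog definition
employeeDF.write.format("org.apache.hadoop.hbase.spark")
  .options(Map(HBaseTableCatalog.tableCatalog -> catalog))
  .option("hbase.spark.use.hbasecontext", false)
  .save()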
Create the HBase table for the demo:
Step 1: Launch the hbase shell.
hbase shell
Step 2: Create the HBase table.
create 'employees', 'per', 'prof'
Step 3: Quit the HBase shell
quit
Scala example (spark3-shell):
Step 1: Launch the spark3-shell.
spark3-shell
If the connector JARs are not already on the classpath (see the classpath configuration earlier in this post), pass them with --jars:
spark3-shell --jars /opt/cloudera/parcels/SPARK3/lib/spark3/hbase_connectors/lib/hbase-spark3-<VERSION-NUMBER>.jar,/opt/cloudera/parcels/SPARK3/lib/spark3/hbase_connectors/lib/hbase-spark3-protocol-shaded-<VERSION-NUMBER>.jar
Note: Replace VERSION-NUMBER with the connector version from your cluster.
Step 2: Run the following spark code to write the data to HBase.
// Create sample dataset to insert
case class Employee(id:Long, name: String, age: Short, salary: Float)
val employeeDF = Seq(
Employee(1L, "Ranga", 34, 15000.5f),
Employee(2L, "Nishanth", 5, 35000.5f),
Employee(3L, "Meena", 30, 25000.5f)
).toDF()
// Save the data to HBase
val hbase_column_mapping = "id LONG :key, name STRING per:name, age SHORT per:age, salary FLOAT prof:salary"
val hbase_table = "employees"
employeeDF.write.format("org.apache.hadoop.hbase.spark")
  .option("hbase.columns.mapping", hbase_column_mapping)
  .option("hbase.table", hbase_table)
  .option("hbase.spark.use.hbasecontext", false)
  .save()
Step 3: Run the following spark code to read the data from HBase.
val hbase_column_mapping = "id LONG :key, name STRING per:name, age SHORT per:age, salary FLOAT prof:salary"
val hbase_table = "employees"
// Load the data from HBase
val df = spark.read.format("org.apache.hadoop.hbase.spark")
  .option("hbase.columns.mapping", hbase_column_mapping)
  .option("hbase.table", hbase_table)
  .option("hbase.spark.use.hbasecontext", false)
  .load()
// Display the data
df.show()
// Filter the data
df.filter("age>10").show()
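You can also register the DataFrame as a temporary view and query it with Spark SQL; a short sketch using the df loaded above (employees_view is just an illustrative view name):
// Query the HBase-backed DataFrame with Spark SQL
df.createOrReplaceTempView("employees_view")
spark.sql("SELECT name, salary FROM employees_view WHERE age > 10").show()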
PySpark example. Save the following code as /tmp/hbase_spark_connector_app.py:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, LongType, ShortType, FloatType
import json
def main():
    spark = SparkSession.builder.appName("HBase Spark Connector App").getOrCreate()

    data = [(1, "Ranga", 34, 15000.5), (2, "Nishanth", 5, 35000.5), (3, "Meena", 30, 25000.5)]

    schema = StructType([
        StructField("id", LongType(), True),
        StructField("name", StringType(), True),
        StructField("age", ShortType(), True),
        StructField("salary", FloatType(), True)
    ])

    employeeDF = spark.createDataFrame(data=data, schema=schema)

    catalog = json.dumps({
        "table": {"namespace": "default", "name": "employees"},
        "rowkey": "key",
        "columns": {
            "id": {"cf": "rowkey", "col": "key", "type": "long"},
            "name": {"cf": "per", "col": "name", "type": "string"},
            "age": {"cf": "per", "col": "age", "type": "short"},
            "salary": {"cf": "prof", "col": "salary", "type": "float"}
        }
    })

    # Write the data to HBase
    employeeDF.write.format("org.apache.hadoop.hbase.spark") \
        .options(catalog=catalog) \
        .option("hbase.spark.use.hbasecontext", False) \
        .save()

    # Read the data back from HBase
    df = spark.read.format("org.apache.hadoop.hbase.spark") \
        .options(catalog=catalog) \
        .option("hbase.spark.use.hbasecontext", False) \
        .load()
    df.show()

    spark.stop()

if __name__ == "__main__":
    main()
Spark-Submit (the connector JARs must be on the classpath, either via the configuration from Step 1 or via --jars):
spark3-submit --master yarn \
--deploy-mode client \
/tmp/hbase_spark_connector_app.py
For running in a Kerberos-enabled cluster, the user has to include the HBase-related JARs on the classpath, because HBase token retrieval and renewal is done by Spark itself and is independent of the connector.
In other words, the user needs to initialize the Kerberos environment in the usual way, either through kinit or by providing a principal and keytab.
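For example, a sketch of a spark3-submit invocation on a Kerberized cluster (the principal and keytab path below are placeholders for your own values):
spark3-submit --master yarn \
  --deploy-mode client \
  --principal user@EXAMPLE.COM \
  --keytab /path/to/user.keytab \
  /tmp/hbase_spark_connector_app.py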
There are several tuning parameters in the HBase-Spark connector; for example, hbase.spark.use.hbasecontext, which is used throughout this post. A sketch of passing such options is shown below. Refer to the HBaseSparkConf class in the connector for more configurations.
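A minimal sketch of passing connector options on a read, reusing the hbase_table and hbase_column_mapping values defined earlier; hbase.spark.pushdown.columnfilter is assumed here to be the HBaseSparkConf option that controls filter pushdown (verify the name against your connector version):
// Connector tuning options are passed per read/write via .option(...)
val dfTuned = spark.read.format("org.apache.hadoop.hbase.spark")
  .option("hbase.columns.mapping", hbase_column_mapping)
  .option("hbase.table", hbase_table)
  .option("hbase.spark.use.hbasecontext", false)
  .option("hbase.spark.pushdown.columnfilter", true) // assumed option name
  .load()
dfTuned.show()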
Troubleshooting common exceptions:
org.apache.hadoop.hbase.security.AccessDeniedException: Insufficient permissions for user 'rangareddy@HADOOP.COM', action: scannerOpen, tableName: employees, family: prof, column: salary
at org.apache.ranger.authorization.hbase.RangerAuthorizationCoprocessor.authorizeAccess(RangerAuthorizationCoprocessor.java:568)
at org.apache.ranger.authorization.hbase.RangerAuthorizationCoprocessor.preScannerOpen(RangerAuthorizationCoprocessor.java:1013)
at org.apache.ranger.authorization.hbase.RangerAuthorizationCoprocessor.preScannerOpen(RangerAuthorizationCoprocessor.java:710)
at org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost$40.call(RegionCoprocessorHost.java:1231)
at org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost$40.call(RegionCoprocessorHost.java:1228)
at org.apache.hadoop.hbase.coprocessor.CoprocessorHost$ObserverOperationWithoutResult.callObserver(CoprocessorHost.java:558)
at org.apache.hadoop.hbase.coprocessor.CoprocessorHost.execOperation(CoprocessorHost.java:631)
Problem:
The user doesn't have the required permission(s) to access the table/namespace. To access the HBase data, the user must be granted the necessary Ranger permission(s).
Solution:
Go to Ranger UI --> Access Manager --> HBASE --> click on cm_hbase --> create a new policy (or use an existing one) and grant the necessary permissions to access the HBase table.
java.lang.IllegalArgumentException: offset (0) + length (4) exceed the capacity of the array: 2
at org.apache.hadoop.hbase.util.Bytes.explainWrongLengthOrOffset(Bytes.java:789)
at org.apache.hadoop.hbase.util.Bytes.toInt(Bytes.java:943)
at org.apache.hadoop.hbase.util.Bytes.toInt(Bytes.java:929)
at org.apache.hadoop.hbase.spark.datasources.Utils$.hbaseFieldToScalaType(Utils.scala:53)
at org.apache.hadoop.hbase.spark.HBaseRelation.$anonfun$buildRow$2(DefaultSource.scala:289)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
at scala.collection.mutable.ArraySeq.foreach(ArraySeq.scala:75)
at scala.collection.TraversableLike.map(TraversableLike.scala:286)
at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
at scala.collection.AbstractTraversable.map(Traversable.scala:108)
at org.apache.hadoop.hbase.spark.HBaseRelation.buildRow(DefaultSource.scala:280)
at org.apache.hadoop.hbase.spark.HBaseRelation.$anonfun$buildScan$9(DefaultSource.scala:366)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:364)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:890)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:890)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
Problem:
A java.lang.IllegalArgumentException occurs due to a schema mismatch between Spark and HBase, for example when the Spark-side data type is Int but the value stored in HBase is a String.
Solution:
There are two solutions for this issue:
1. Provide the correct schema mapping between the Spark and HBase sides.
2. On the Spark side, map the mismatched column as a String and cast it after loading (see the sketch below).
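A minimal sketch of solution 2, assuming the age column was written to HBase as a string; it is mapped as STRING on the Spark side and cast back afterwards:
import org.apache.spark.sql.functions.col

// Map the mismatched column (age) as STRING to avoid the byte-length error
val mapping = "id LONG :key, name STRING per:name, age STRING per:age, salary FLOAT prof:salary"
val employeesDF = spark.read.format("org.apache.hadoop.hbase.spark")
  .option("hbase.columns.mapping", mapping)
  .option("hbase.table", "employees")
  .option("hbase.spark.use.hbasecontext", false)
  .load()

// Cast back to the intended numeric type on the Spark side
employeesDF.withColumn("age", col("age").cast("short")).show()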
java.lang.NullPointerException
at org.apache.hadoop.hbase.spark.HBaseRelation.<init>(DefaultSource.scala:138)
at org.apache.hadoop.hbase.spark.DefaultSource.createRelation(DefaultSource.scala:78)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
Problem:
When reading or writing data, the HBase-Spark connector tries to get or initialize an HBaseContext. It first checks the hbase.spark.use.hbasecontext parameter (true by default); when it is true, the connector fetches the HBaseContext from a cache, and if no HBaseContext exists in that cache, a NullPointerException is thrown.
HBaseSparkConf configuration:
val USE_HBASECONTEXT = "hbase.spark.use.hbasecontext"
val DEFAULT_USE_HBASECONTEXT = true
Get the HBaseContext code:
// create or get latest HBaseContext
val hbaseContext: HBaseContext = if (useHBaseContext) {
  LatestHBaseContextCache.latest
} else {
  val config = HBaseConfiguration.create()
  configResources.map(resource => resource.split(",").foreach(r => config.addResource(r)))
  new HBaseContext(sqlContext.sparkContext, config)
}
There is already an open JIRA for this issue: https://issues.apache.org/jira/browse/HBASE-18570
Solution:
There are multiple solutions for this issue:
1. Set hbase.spark.use.hbasecontext to false on read/write (as the examples in this post do), so the connector builds its own HBaseContext from the HBase configuration.
2. Create an HBaseContext explicitly before using the connector, so that a cached context exists (see the sketch below).
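A minimal sketch of option 2, run in the spark3-shell before reading or writing with the connector:
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.spark.HBaseContext

// Creating an HBaseContext registers it in LatestHBaseContextCache,
// so the connector can find it when hbase.spark.use.hbasecontext is true.
val hbaseConf = HBaseConfiguration.create()
val hbaseContext = new HBaseContext(spark.sparkContext, hbaseConf)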
org.apache.spark.SparkException: Task failed while writing rows
at org.apache.spark.internal.io.SparkHadoopWriter$.executeTask(SparkHadoopWriter.scala:163)
at org.apache.spark.internal.io.SparkHadoopWriter$.$anonfun$write$1(SparkHadoopWriter.scala:88)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException: Failed 1 action: employee: 1 time, servers with issues: null
at org.apache.hadoop.hbase.client.BatchErrors.makeException(BatchErrors.java:54)
at org.apache.hadoop.hbase.client.AsyncRequestFutureImpl.getErrors(AsyncRequestFutureImpl.java:1196)
at org.apache.hadoop.hbase.client.BufferedMutatorImpl.doFlush(BufferedMutatorImpl.java:309)
at org.apache.hadoop.hbase.client.BufferedMutatorImpl.close(BufferedMutatorImpl.java:241)
at org.apache.hadoop.hbase.mapred.TableOutputFormat$TableRecordWriter.close(TableOutputFormat.java:91)
at org.apache.spark.internal.io.HadoopMapRedWriteConfigUtil.closeWriter(SparkHadoopWriter.scala:251)
at org.apache.spark.internal.io.SparkHadoopWriter$.$anonfun$executeTask$1(SparkHadoopWriter.scala:145)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1496)
at org.apache.spark.internal.io.SparkHadoopWriter$.executeTask(SparkHadoopWriter.scala:135)
... 9 more
Problem:
While saving data to HBase, if the target table does not exist, the above exception occurs.
Solution:
Create the HBase table with the proper column families before writing (see the create 'employees', 'per', 'prof' example above).
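If you want to fail fast with a clearer message, you can check for the table from Spark before writing; a minimal sketch using the plain HBase client API (not part of the connector):
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.ConnectionFactory

// Verify that the target table exists before writing with the connector
val conn = ConnectionFactory.createConnection(HBaseConfiguration.create())
val tableExists = conn.getAdmin.tableExists(TableName.valueOf("employees"))
conn.close()
require(tableExists, "HBase table 'employees' does not exist; create it first")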