Created on 07-17-2023 03:04 AM - edited 11-29-2023 09:17 PM
Spark3 HBase Integration
This blog post guides you through integrating Spark 3 with HBase, with step-by-step instructions.
Note: This blog post covers the Spark3 and HBase services only when they exist in the same cluster. If HBase is in a remote cluster, refer to the hbase-configure-spark-connector-remote-cluster tutorial.
Prerequisites
- To use the HBase-Spark3 connector, ensure that the CDP version is 7.1.7 SP1 with the Spark3 parcel or above.
- Ensure that every Spark node has the HBase Master, Region Server, or Gateway role assigned to it. If no HBase role is assigned to a Spark node, add the HBase Gateway role to it, which ensures that the HBase configuration files are available on the Spark node.
HBase Integration with Spark3
HBase integration with Spark can be achieved using the HBase-Spark Connector, which provides a seamless way to interact with HBase from within Spark applications.
Here's how you can integrate HBase with Spark3 using the HBase Spark Connector:
1. Configure HBase-Spark connector
Step 1: Add the hbase-spark connector jars to the classpath.
- Cloudera Manager --> Spark3 --> Configuration
- Ensure that the HBase service is selected in Spark Service as a dependency.
- Locate the Spark 3 Client Advanced Configuration Snippet (Safety Valve) for spark3-conf/spark-defaults.conf property or search for it by typing its name in the Search box.
- Add the following properties to ensure that all required HBase platform dependencies are available on the classpath for the Spark executors and drivers.
spark.driver.extraClassPath=/opt/cloudera/parcels/SPARK3/lib/spark3/hbase_connectors/lib/hbase-spark3-***VERSION-NUMBER***.jar:/opt/cloudera/parcels/SPARK3/lib/spark3/hbase_connectors/lib/hbase-spark3-protocol-shaded-***VERSION-NUMBER***.jar
spark.executor.extraClassPath=/opt/cloudera/parcels/SPARK3/lib/spark3/hbase_connectors/lib/hbase-spark3-***VERSION-NUMBER***.jar:/opt/cloudera/parcels/SPARK3/lib/spark3/hbase_connectors/lib/hbase-spark3-protocol-shaded-***VERSION-NUMBER***.jar
Note: Before adding the above parameters, find the connector jar version in your cluster and replace VERSION-NUMBER with it. For example, if the version number is 1.0.0.3.2.7171000.2-1:
spark.driver.extraClassPath=/opt/cloudera/parcels/SPARK3/lib/spark3/hbase_connectors/lib/hbase-spark3-1.0.0.3.2.7171000.2-1.jar:/opt/cloudera/parcels/SPARK3/lib/spark3/hbase_connectors/lib/hbase-spark3-protocol-shaded-1.0.0.3.2.7171000.2-1.jar
spark.executor.extraClassPath=/opt/cloudera/parcels/SPARK3/lib/spark3/hbase_connectors/lib/hbase-spark3-1.0.0.3.2.7171000.2-1.jar:/opt/cloudera/parcels/SPARK3/lib/spark3/hbase_connectors/lib/hbase-spark3-protocol-shaded-1.0.0.3.2.7171000.2-1.jar
- Save the Changes and restart the Spark3 service.
Step 2: This step is optional if you are not using any Spark SQL filters.
- Go to Cloudera Manager > HBase > Configuration.
- Locate the RegionServer Environment Advanced Configuration Snippet (Safety Valve) property or Search for regionserver environment.
- Click the plus icon to add the following property:
Key: HBASE_CLASSPATH
Value:
/opt/cloudera/parcels/SPARK3/lib/spark3/hbase_connectors/lib/hbase-spark3-***VERSION-NUMBER***.jar:/opt/cloudera/parcels/SPARK3/lib/spark3/hbase_connectors/lib/hbase-spark3-protocol-shaded-***VERSION-NUMBER***.jar:/opt/cloudera/parcels/CDH/jars/scala-library-***SCALA-VERSION***.jar
Note: Ensure that the listed jars have the correct version number in their name.
Example:
/opt/cloudera/parcels/SPARK3/lib/spark3/hbase_connectors/lib/hbase-spark3-1.0.0.3.2.7171000.2-1.jar:/opt/cloudera/parcels/SPARK3/lib/spark3/hbase_connectors/lib/hbase-spark3-protocol-shaded-1.0.0.3.2.7171000.2-1.jar:/opt/cloudera/parcels/CDH/jars/scala-library-2.12.10.jar
- Save the changes and restart the HBase RegionServers.
2. HBase-Spark connector Example
2.1 Schema:
To use the HBase-Spark connector, you need to define a schema that maps the attributes/columns between HBase and Spark.
Let's assume you have an employee table with the columns id (Long), name (String), age (Short), and salary (Float). The personal information, i.e., id, name, and age, is stored in the per column family, and the professional information, i.e., salary, is stored in the prof column family. The row key of the HBase table is the id attribute.
|  | Spark | HBase |
| --- | --- | --- |
| Type/Table | Employee | employees |
| Id | id: Long | key |
| Name | name: String | per:name |
| Age | age: Short | per:age |
| Salary | salary: Float | prof:salary |
There are two ways we can specify the schema:
- Using the hbase.columns.mapping property: This is a simple way to map the HBase columns to Spark data types. For example:
val hbase_table = "employees"
val hbase_column_mapping = "id LONG :key, name STRING per:name,age SHORT per:age, salary FLOAT prof:salary"
val df = spark.read.format("org.apache.hadoop.hbase.spark").option("hbase.columns.mapping", hbase_column_mapping).option("hbase.table", hbase_table).option("hbase.spark.use.hbasecontext", false).load()
- Using the HBaseTableCatalog.tableCatalog property: This is useful if you want to specify the schema in JSON format, and also when you migrate code from SHC to the HBase Spark Connector. For example:
val catalog = s"""{
|"table":{"namespace":"my_nm", "name":"employees"},
|"rowkey":"key",
|"columns":{
|"id":{"cf":"rowkey", "col":"key", "type":"long"},
|"name":{"cf":"per", "col":"name", "type":"string"},
|"age":{"cf":"per", "col":"age", "type":"short"},
|"salary":{"cf":"prof", "col":"salary", "type":"float"}
|}
|}""".stripMargin
import org.apache.hadoop.hbase.spark.datasources.HBaseTableCatalog
val df = spark.read.format("org.apache.hadoop.hbase.spark").options(Map(HBaseTableCatalog.tableCatalog->catalog)).load()
df.show()
In the above example, the catalog definition configures the namespace as my_nm.
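For completeness, here is a hedged sketch of a write using the same JSON catalog. It assumes employeeDF is a DataFrame whose columns match the catalog, and that the employees table in the my_nm namespace already exists with the per and prof column families (see the RetriesExhaustedWithDetailsException entry in the Troubleshooting section).
// Hypothetical write using the JSON catalog defined above; the target table must already exist
import org.apache.hadoop.hbase.spark.datasources.HBaseTableCatalog
employeeDF.write.format("org.apache.hadoop.hbase.spark").options(Map(HBaseTableCatalog.tableCatalog -> catalog)).option("hbase.spark.use.hbasecontext", false).save()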
2.2 HBase:
Step 1: Launch the hbase-shell
hbase shell
Step 2: Create the HBase table.
create 'employees', 'per', 'prof'
Step 3: Quit the HBase shell
quit
2.3 Spark:
Step 1: Launch the spark3-shell
- If you have configured the HBase Spark Connector at the cluster level, run the following command to launch the spark3-shell.
spark3-shell
- If you have not configured the HBase Spark Connector at the cluster level, use the following command to launch the spark3-shell with the connector jars.
spark3-shell --jars /opt/cloudera/parcels/SPARK3/lib/spark3/hbase_connectors/lib/hbase-spark3-<VERSION-NUMBER>.jar,/opt/cloudera/parcels/SPARK3/lib/spark3/hbase_connectors/lib/hbase-spark3-protocol-shaded-<VERSION-NUMBER>.jar
Note: You need to replace the VERSION-NUMBER according to your cluster.
Step 2: Run the following spark code to write the data to HBase.
// Create sample dataset to insert
case class Employee(id:Long, name: String, age: Short, salary: Float)
val employeeDF = Seq(
Employee(1L, "Ranga", 34, 15000.5f),
Employee(2L, "Nishanth", 5, 35000.5f),
Employee(3L, "Meena", 30, 25000.5f)
).toDF()
// Save the data to HBase
val hbase_column_mapping = "id LONG :key, name STRING per:name,age SHORT per:age, salary FLOAT prof:salary"
val hbase_table = "employees"
employeeDF.write.format("org.apache.hadoop.hbase.spark").option("hbase.columns.mapping",hbase_column_mapping).option("hbase.table",hbase_table).option("hbase.spark.use.hbasecontext",false).save()
Step 3: Run the following spark code to read the data from HBase.
val hbase_column_mapping = "id LONG :key, name STRING per:name,age SHORT per:age, salary FLOAT prof:salary"
val hbase_table = "employees"
// Load the data from HBase
val df = spark.read.format("org.apache.hadoop.hbase.spark").option("hbase.columns.mapping", hbase_column_mapping).option("hbase.table", hbase_table).option("hbase.spark.use.hbasecontext", false).load()
// Display the data
df.show()
// Filter the data
df.filter("age>10").show()
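Optionally, you can register the DataFrame as a temporary view and query it with Spark SQL; the view name employees_view below is only an illustration.
// Register the HBase-backed DataFrame as a temporary view and query it with Spark SQL
df.createOrReplaceTempView("employees_view")
spark.sql("SELECT name, salary FROM employees_view WHERE salary > 20000").show()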
PySpark code:
/tmp/hbase_spark_connector_app.py:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, LongType, ShortType, FloatType
import json
def main():
    spark = SparkSession.builder.appName("HBase Spark Connector App").getOrCreate()

    # Create sample dataset to insert
    data = [(1, "Ranga", 34, 15000.5), (2, "Nishanth", 5, 35000.5), (3, "Meena", 30, 25000.5)]

    schema = StructType([
        StructField("id", LongType(), True),
        StructField("name", StringType(), True),
        StructField("age", ShortType(), True),
        StructField("salary", FloatType(), True)
    ])

    employeeDF = spark.createDataFrame(data=data, schema=schema)

    catalog = json.dumps({
        "table": {"namespace": "default", "name": "employees"},
        "rowkey": "key",
        "columns": {
            "id": {"cf": "rowkey", "col": "key", "type": "long"},
            "name": {"cf": "per", "col": "name", "type": "string"},
            "age": {"cf": "per", "col": "age", "type": "short"},
            "salary": {"cf": "prof", "col": "salary", "type": "float"}
        }
    })

    # Write the data to HBase
    employeeDF.write.format("org.apache.hadoop.hbase.spark").options(catalog=catalog).option("hbase.spark.use.hbasecontext", False).save()

    # Read the data back from HBase and display it
    df = spark.read.format("org.apache.hadoop.hbase.spark").options(catalog=catalog).option("hbase.spark.use.hbasecontext", False).load()
    df.show()

    spark.stop()

if __name__ == "__main__":
    main()
Spark-Submit:
spark3-submit --master yarn \
--deploy-mode client \
/tmp/hbase_spark_connector_app.py
3. Running in Secure Cluster
To run in a Kerberos-enabled cluster, the user has to include the HBase-related jars in the classpath, as the HBase token retrieval and renewal is done by Spark and is independent of the connector.
In other words, the user needs to initialize the Kerberos environment in the normal way, either through kinit or by providing a principal/keytab.
4. Tuning Parameters:
There are several tuning parameters in the HBase-Spark connector.
For example:
- hbase.spark.query.batchsize - Set the maximum number of values to return for each call to next() in scan.
- hbase.spark.query.cachedrows - The number of rows for caching that will be passed to scan.
More configuration options are available in the HBaseSparkConf class (see the References section).
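As a minimal sketch, these parameters can be passed as read options. The values below are illustrative only, and hbase_column_mapping and hbase_table are the variables defined in the earlier read example.
// Illustrative tuning values only; adjust them for your workload
val tunedDF = spark.read.format("org.apache.hadoop.hbase.spark").
  option("hbase.columns.mapping", hbase_column_mapping).
  option("hbase.table", hbase_table).
  option("hbase.spark.use.hbasecontext", false).
  option("hbase.spark.query.batchsize", "256").
  option("hbase.spark.query.cachedrows", "1000").
  load()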
5. Troubleshooting:
1. org.apache.hadoop.hbase.security.AccessDeniedException: Insufficient permissions for user:
org.apache.hadoop.hbase.security.AccessDeniedException: Insufficient permissions for user 'rangareddy@HADOOP.COM', action: scannerOpen, tableName:employees, family:prof, column: salary
at org.apache.ranger.authorization.hbase.RangerAuthorizationCoprocessor.authorizeAccess(RangerAuthorizationCoprocessor.java:568)
at org.apache.ranger.authorization.hbase.RangerAuthorizationCoprocessor.preScannerOpen(RangerAuthorizationCoprocessor.java:1013)
at org.apache.ranger.authorization.hbase.RangerAuthorizationCoprocessor.preScannerOpen(RangerAuthorizationCoprocessor.java:710)
at org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost$40.call(RegionCoprocessorHost.java:1231)
at org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost$40.call(RegionCoprocessorHost.java:1228)
at org.apache.hadoop.hbase.coprocessor.CoprocessorHost$ObserverOperationWithoutResult.callObserver(CoprocessorHost.java:558)
at org.apache.hadoop.hbase.coprocessor.CoprocessorHost.execOperation(CoprocessorHost.java:631)
Problem:
The user doesn't have the proper permissions to access the table/namespace. To access the HBase data, grant the required Ranger permissions to the user.
Solution:
Go to Ranger UI --> Access Manager --> HBASE --> click on cm_hbase --> create a new policy or use an existing policy and grant the necessary permissions to access the HBase table.
2. java.lang.IllegalArgumentException: offset (0) + length (4) exceed the capacity of the array
java.lang.IllegalArgumentException: offset (0) + length (4) exceed the capacity of the array: 2
at org.apache.hadoop.hbase.util.Bytes.explainWrongLengthOrOffset(Bytes.java:789)
at org.apache.hadoop.hbase.util.Bytes.toInt(Bytes.java:943)
at org.apache.hadoop.hbase.util.Bytes.toInt(Bytes.java:929)
at org.apache.hadoop.hbase.spark.datasources.Utils$.hbaseFieldToScalaType(Utils.scala:53)
at org.apache.hadoop.hbase.spark.HBaseRelation.$anonfun$buildRow$2(DefaultSource.scala:289)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
at scala.collection.mutable.ArraySeq.foreach(ArraySeq.scala:75)
at scala.collection.TraversableLike.map(TraversableLike.scala:286)
at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
at scala.collection.AbstractTraversable.map(Traversable.scala:108)
at org.apache.hadoop.hbase.spark.HBaseRelation.buildRow(DefaultSource.scala:280)
at org.apache.hadoop.hbase.spark.HBaseRelation.$anonfun$buildScan$9(DefaultSource.scala:366)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:364)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:890)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:890)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
Problem:
The java.lang.IllegalArgumentException occurs due to a schema mismatch between Spark and HBase; for example, the Spark-side data type is Int while the HBase-side value is stored as a String.
Solution:
There are two solutions for this issue:
1. Provide the correct schema mapping on both the Spark and HBase sides.
2. On the Spark side, map the column as a String, as in the sketch below.
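A minimal sketch of the second option, assuming the age column bytes in HBase were actually written as a string: map it as STRING in the column mapping so the connector does not try to decode it as a fixed-length numeric value.
// age is mapped as STRING here because the stored bytes are not a 2-byte Short
val hbase_column_mapping = "id LONG :key, name STRING per:name, age STRING per:age, salary FLOAT prof:salary"
val df = spark.read.format("org.apache.hadoop.hbase.spark").option("hbase.columns.mapping", hbase_column_mapping).option("hbase.table", "employees").option("hbase.spark.use.hbasecontext", false).load()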
3. java.lang.NullPointerException at org.apache.hadoop.hbase.spark.HBaseRelation
java.lang.NullPointerException
at org.apache.hadoop.hbase.spark.HBaseRelation.<init>(DefaultSource.scala:138)
at org.apache.hadoop.hbase.spark.DefaultSource.createRelation(DefaultSource.scala:78)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
Problem:
When reading or writing data with the HBase Spark Connector, the connector tries to get or initialize an HBaseContext. If the hbase.spark.use.hbasecontext parameter is true (the default), it fetches the HBaseContext from the cache; when no HBaseContext exists in the cache, you get the NullPointerException.
HBaseSparkConf configuration:
val USE_HBASECONTEXT = "hbase.spark.use.hbasecontext"
val DEFAULT_USE_HBASECONTEXT = true
Get the HBaseContext code:
// create or get latest HBaseContext
val hbaseContext: HBaseContext = if (useHBaseContext) {
  LatestHBaseContextCache.latest
} else {
  val config = HBaseConfiguration.create()
  configResources.map(resource => resource.split(",").foreach(r => config.addResource(r)))
  new HBaseContext(sqlContext.sparkContext, config)
}
There is already an open Jira for this issue: https://issues.apache.org/jira/browse/HBASE-18570
Solution:
There are multiple solutions for this issue:
- Set hbase.spark.use.hbasecontext to false.
- Initialize the HBaseContext explicitly before loading the data with the HBase Spark Connector (see the sketch below).
- Wait for HBASE-18570 to be fixed.
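A minimal sketch of the second option, assuming the HBase client configuration is available on the classpath: constructing an HBaseContext up front populates LatestHBaseContextCache, so the connector can find it when hbase.spark.use.hbasecontext is left at its default of true.
// Create the HBaseContext once before using the connector; its constructor registers it in the cache
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.spark.HBaseContext
new HBaseContext(spark.sparkContext, HBaseConfiguration.create())
After this, the read and write examples from section 2.3 work unchanged.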
4. org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException: Failed 1 action: employee: 1 time, servers with issues: null
org.apache.spark.SparkException: Task failed while writing rows
at org.apache.spark.internal.io.SparkHadoopWriter$.executeTask(SparkHadoopWriter.scala:163)
at org.apache.spark.internal.io.SparkHadoopWriter$.$anonfun$write$1(SparkHadoopWriter.scala:88)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException: Failed 1 action: employee: 1 time, servers with issues: null
at org.apache.hadoop.hbase.client.BatchErrors.makeException(BatchErrors.java:54)
at org.apache.hadoop.hbase.client.AsyncRequestFutureImpl.getErrors(AsyncRequestFutureImpl.java:1196)
at org.apache.hadoop.hbase.client.BufferedMutatorImpl.doFlush(BufferedMutatorImpl.java:309)
at org.apache.hadoop.hbase.client.BufferedMutatorImpl.close(BufferedMutatorImpl.java:241)
at org.apache.hadoop.hbase.mapred.TableOutputFormat$TableRecordWriter.close(TableOutputFormat.java:91)
at org.apache.spark.internal.io.HadoopMapRedWriteConfigUtil.closeWriter(SparkHadoopWriter.scala:251)
at org.apache.spark.internal.io.SparkHadoopWriter$.$anonfun$executeTask$1(SparkHadoopWriter.scala:145)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1496)
at org.apache.spark.internal.io.SparkHadoopWriter$.executeTask(SparkHadoopWriter.scala:135)
... 9 more
Problem:
While saving data to HBase, if the target table doesn't exist, the above exception occurs.
Solution:
Create the HBase table with the proper column families before writing the data.
6. References
- Configuring HBase-Spark connector using Cloudera Manager when HBase and Spark are on the same cluste...
- hbase-connectors/spark/
- hbase-connectors/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseSp...
- HBASE-18570