Created on 12-10-2023 10:05 PM - edited on 12-11-2023 12:22 AM by VidyaSargur
This article walks through integrating Spark and HBase using Livy with a complete example that writes a DataFrame to an HBase table and reads it back. The example submits the Spark job to a YARN cluster through Livy's REST API, so the application runs remotely against HBase data. The steps cover creating the HBase table, writing and testing the PySpark application, uploading it to HDFS, submitting it through Livy, and checking the job state and logs.
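Note: If your cluster is Kerberized, grant the user the required Ranger HBase permissions and run kinit to obtain a Kerberos ticket before you continue.
Create the employees table with the column families per and prof using the HBase shell: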
hbase shell
create 'employees', 'per', 'prof'
quit
Create a Python file named hbase_spark_connector_app.py with the following content:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, LongType, ShortType, FloatType
import json


def main():
    spark = SparkSession.builder.appName("HBase Spark Connector App").getOrCreate()

    # Sample employee records: (id, name, age, salary)
    data = [(1, "Ranga", 34, 15000.5), (2, "Nishanth", 5, 35000.5), (3, "Meena", 30, 25000.5)]
    schema = StructType([
        StructField("id", LongType(), True),
        StructField("name", StringType(), True),
        StructField("age", ShortType(), True),
        StructField("salary", FloatType(), True),
    ])
    employeeDF = spark.createDataFrame(data=data, schema=schema)

    # Catalog that maps DataFrame columns to the HBase table: "id" becomes the row key,
    # name and age go to the "per" column family, salary goes to "prof".
    catalog = json.dumps({
        "table": {"namespace": "default", "name": "employees"},
        "rowkey": "key",
        "columns": {
            "id": {"cf": "rowkey", "col": "key", "type": "long"},
            "name": {"cf": "per", "col": "name", "type": "string"},
            "age": {"cf": "per", "col": "age", "type": "short"},
            "salary": {"cf": "prof", "col": "salary", "type": "float"},
        },
    })

    # hbase.spark.use.hbasecontext=False tells the connector to create a new HBaseContext
    # instead of reusing an existing one, since this PySpark application does not create one.
    (employeeDF.write.format("org.apache.hadoop.hbase.spark")
        .options(catalog=catalog)
        .option("hbase.spark.use.hbasecontext", False)
        .save())

    # Read the rows back from HBase through the same catalog.
    df = (spark.read.format("org.apache.hadoop.hbase.spark")
          .options(catalog=catalog)
          .option("hbase.spark.use.hbasecontext", False)
          .load())
    df.show()

    spark.stop()


if __name__ == "__main__":
    main()
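Writing the catalog by hand becomes error-prone as more columns are mapped. The sketch below is a hypothetical helper, build_hbase_catalog (it is not part of the HBase Spark connector), that produces the same catalog JSON as the application above from a compact mapping: the row key column uses the special rowkey column family, and every other column names its column family, qualifier, and type.

```python
import json


def build_hbase_catalog(namespace, table, rowkey_field, rowkey_type, columns):
    """Build a catalog JSON string for the HBase Spark connector (hypothetical helper).

    rowkey_field/rowkey_type describe the DataFrame column stored as the HBase row key.
    columns maps every other DataFrame column to a (column_family, qualifier, type) tuple.
    """
    catalog_columns = {
        # The row key is mapped through the special "rowkey" column family.
        rowkey_field: {"cf": "rowkey", "col": "key", "type": rowkey_type},
    }
    for field, (cf, qualifier, col_type) in columns.items():
        catalog_columns[field] = {"cf": cf, "col": qualifier, "type": col_type}
    return json.dumps({
        "table": {"namespace": namespace, "name": table},
        "rowkey": "key",
        "columns": catalog_columns,
    })


# Reproduces the catalog used in hbase_spark_connector_app.py.
catalog = build_hbase_catalog(
    "default", "employees", "id", "long",
    {
        "name": ("per", "name", "string"),
        "age": ("per", "age", "short"),
        "salary": ("prof", "salary", "float"),
    },
)
print(catalog)
```

The resulting string can be passed to .options(catalog=catalog) exactly like the hand-written one.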
Before submitting the application through Livy, run it with spark-submit to verify that it works without any issues:
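Note: The JAR file names below match version 7.1.9.0-387 of the CDH parcel; adjust them to the versions present in /opt/cloudera/parcels/CDH/jars on your cluster.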
spark-submit \
--master yarn \
--deploy-mode client \
--jars /opt/cloudera/parcels/CDH/jars/hbase-spark-protocol-shaded-1.0.0.7.1.9.0-387.jar,/opt/cloudera/parcels/CDH/jars/hbase-spark-1.0.0.7.1.9.0-387.jar \
hbase_spark_connector_app.py
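If you run this check from a script, note that --jars takes a single comma-separated list with no spaces. A minimal sketch, using a hypothetical build_spark_submit_cmd helper, that assembles the same command as an argument list suitable for subprocess.run; the JAR paths are copied from the command above and may differ on your cluster.

```python
import shlex

# JAR paths copied from the spark-submit example; adjust the version to your cluster.
CONNECTOR_JARS = [
    "/opt/cloudera/parcels/CDH/jars/hbase-spark-protocol-shaded-1.0.0.7.1.9.0-387.jar",
    "/opt/cloudera/parcels/CDH/jars/hbase-spark-1.0.0.7.1.9.0-387.jar",
]


def build_spark_submit_cmd(app, jars, master="yarn", deploy_mode="client"):
    """Return the spark-submit argument list (hypothetical helper).

    --jars receives one comma-separated value containing every JAR path.
    """
    return [
        "spark-submit",
        "--master", master,
        "--deploy-mode", deploy_mode,
        "--jars", ",".join(jars),
        app,
    ]


cmd = build_spark_submit_cmd("hbase_spark_connector_app.py", CONNECTOR_JARS)
# Print a shell-ready version of the command; to run it, use subprocess.run(cmd, check=True).
print(shlex.join(cmd))
```

Passing a list instead of a single shell string avoids quoting problems if a path ever contains spaces.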
Upload the Python file hbase_spark_connector_app.py and the two HBase Spark Connector JAR files to an HDFS directory, for example /tmp:
hdfs dfs -put hbase_spark_connector_app.py /tmp
hdfs dfs -put /opt/cloudera/parcels/CDH/jars/hbase-spark-protocol-shaded-1.0.0.7.1.9.0-387.jar /tmp
hdfs dfs -put /opt/cloudera/parcels/CDH/jars/hbase-spark-1.0.0.7.1.9.0-387.jar /tmp
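Submit the application to Livy as a batch job by sending a POST request to the /batches endpoint. Replace <LIVY_SERVER_HOST> and <LIVY_SERVER_PORT> with your Livy server's host and port.
Non-kerberized cluster: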
curl -k \
-H "Content-Type: application/json" \
-X POST \
-d '{
  "file": "/tmp/hbase_spark_connector_app.py",
  "name": "Spark HBase Connector Example",
  "driverMemory": "1g",
  "driverCores": 1,
  "executorMemory": "1g",
  "executorCores": 1,
  "jars": ["/tmp/hbase-spark-protocol-shaded-1.0.0.7.1.9.0-387.jar", "/tmp/hbase-spark-1.0.0.7.1.9.0-387.jar"],
  "conf": {
    "spark.dynamicAllocation.enabled": "false",
    "spark.executor.instances": "1"
  }
}' \
https://<LIVY_SERVER_HOST>:<LIVY_SERVER_PORT>/batches/
Kerberized cluster:
curl -k \
--negotiate -u: \
-H "Content-Type: application/json" \
-X POST \
-d '{
  "file": "/tmp/hbase_spark_connector_app.py",
  "name": "Spark HBase Connector Example",
  "driverMemory": "1g",
  "driverCores": 1,
  "executorMemory": "1g",
  "executorCores": 1,
  "jars": ["/tmp/hbase-spark-protocol-shaded-1.0.0.7.1.9.0-387.jar", "/tmp/hbase-spark-1.0.0.7.1.9.0-387.jar"],
  "conf": {
    "spark.dynamicAllocation.enabled": "false",
    "spark.executor.instances": "1"
  }
}' \
https://<LIVY_SERVER_HOST>:<LIVY_SERVER_PORT>/batches/
Livy submits the Spark job to YARN and runs it on your cluster. The JSON response includes the batch ID in its id field; you need this value for the commands that follow. You can monitor the job using the Livy REST API or the Livy web UI.
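The same submission can be scripted. Below is a minimal sketch using only the Python standard library, assuming a non-kerberized cluster; LIVY_URL is a hypothetical placeholder, and disabling certificate verification mirrors curl -k. urllib cannot perform SPNEGO (--negotiate) authentication, so a Kerberized cluster would need a SPNEGO-capable HTTP client, for example the requests-kerberos package, which is not shown here.

```python
import json
import ssl
import urllib.request

# Hypothetical placeholder; replace with https://<LIVY_SERVER_HOST>:<LIVY_SERVER_PORT>.
LIVY_URL = "https://livy.example.com:8998"

# Same payload as the curl example; Spark conf values are sent as strings.
PAYLOAD = {
    "file": "/tmp/hbase_spark_connector_app.py",
    "name": "Spark HBase Connector Example",
    "driverMemory": "1g",
    "driverCores": 1,
    "executorMemory": "1g",
    "executorCores": 1,
    "jars": [
        "/tmp/hbase-spark-protocol-shaded-1.0.0.7.1.9.0-387.jar",
        "/tmp/hbase-spark-1.0.0.7.1.9.0-387.jar",
    ],
    "conf": {
        "spark.dynamicAllocation.enabled": "false",
        "spark.executor.instances": "1",
    },
}


def build_batch_request(livy_url, payload):
    """Build the POST /batches request that the curl command sends."""
    return urllib.request.Request(
        url=livy_url.rstrip("/") + "/batches",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def submit_batch(livy_url, payload):
    """Submit the batch and return the batch ID from the response (non-kerberized only)."""
    # Equivalent of curl -k: skip TLS certificate verification (use only for testing).
    ctx = ssl.create_default_context()
    ctx.check_hostname = False
    ctx.verify_mode = ssl.CERT_NONE
    with urllib.request.urlopen(build_batch_request(livy_url, payload), context=ctx) as resp:
        batch = json.load(resp)
    return batch["id"]
```

Calling batch_id = submit_batch(LIVY_URL, PAYLOAD) returns the value to use as <BATCH_ID> below. Building the request separately from sending it keeps the payload easy to inspect before anything reaches the server.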
To check the state of the Livy job, run the following command, replacing <LIVY_SERVER_HOST>, <LIVY_SERVER_PORT>, and <BATCH_ID> (the batch ID returned when you submitted the job).
Non-kerberized cluster:
curl -k \
-H "Content-Type: application/json" \
-X GET \
https://<LIVY_SERVER_HOST>:<LIVY_SERVER_PORT>/batches/<BATCH_ID>/state
Kerberized cluster:
curl -k \
--negotiate -u: \
-H "Content-Type: application/json" \
-X GET \
https://<LIVY_SERVER_HOST>:<LIVY_SERVER_PORT>/batches/<BATCH_ID>/state
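Rather than re-running the state command by hand, you can poll it until the job finishes. A minimal sketch, assuming that "success", "dead", "killed", and "error" are the states that mean a batch has stopped; wait_for_batch is a hypothetical helper, and fetch_state is any callable that returns the parsed JSON of GET /batches/<BATCH_ID>/state, such as {"id": 0, "state": "running"}. Injecting fetch_state and sleep keeps the loop testable without a Livy server.

```python
import time

# Assumption: Livy reports one of these states once a batch has stopped running.
TERMINAL_STATES = {"success", "dead", "killed", "error"}


def wait_for_batch(fetch_state, poll_interval=10.0, timeout=3600.0, sleep=time.sleep):
    """Poll fetch_state() until the batch reaches a terminal state and return that state.

    Raises TimeoutError if the batch is still active after `timeout` seconds of polling.
    """
    waited = 0.0
    while True:
        state = fetch_state()["state"]
        if state in TERMINAL_STATES:
            return state
        if waited >= timeout:
            raise TimeoutError(f"batch still {state!r} after {timeout} seconds")
        sleep(poll_interval)
        waited += poll_interval


# Simulated /state responses standing in for real HTTP calls.
responses = iter([
    {"id": 0, "state": "starting"},
    {"id": 0, "state": "running"},
    {"id": 0, "state": "success"},
])
result = wait_for_batch(lambda: next(responses), sleep=lambda seconds: None)
print(result)
```

In a real script, fetch_state would send the GET request shown above and decode the JSON body.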
To view the Livy job logs, run the following command, replacing <LIVY_SERVER_HOST>, <LIVY_SERVER_PORT>, and <BATCH_ID> (the batch ID returned when you submitted the job).
Non-kerberized cluster:
curl -k \
-H "Content-Type: application/json" \
-X GET \
https://<LIVY_SERVER_HOST>:<LIVY_SERVER_PORT>/batches/<BATCH_ID>/log
Kerberized cluster:
curl -k \
--negotiate -u: \
-H "Content-Type: application/json" \
-X GET \
https://<LIVY_SERVER_HOST>:<LIVY_SERVER_PORT>/batches/<BATCH_ID>/log
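For long logs, the log endpoint accepts from (offset) and size (maximum number of lines) query parameters, and the response carries the lines in a log list. A minimal sketch with two hypothetical helpers, one that builds a paged log URL and one that joins the returned lines; the sample response is made up for illustration.

```python
import urllib.parse


def build_log_url(livy_url, batch_id, start=0, size=100):
    """URL for GET /batches/<BATCH_ID>/log using the from/size paging parameters."""
    query = urllib.parse.urlencode({"from": start, "size": size})
    return f"{livy_url.rstrip('/')}/batches/{batch_id}/log?{query}"


def format_log(response):
    """Join the "log" lines of a parsed /log response into a single string."""
    return "\n".join(response.get("log", []))


# Hypothetical host and batch ID, plus a made-up response body for illustration.
url = build_log_url("https://livy.example.com:8998", 3, size=50)
sample = {"id": 3, "log": ["stdout: ", "+---+-----+"]}
print(url)
print(format_log(sample))
```

Fetching the next page is a matter of increasing start by the number of lines already read.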