
This article walks through a complete example of integrating Spark and HBase using Livy, covering how to write data from Spark to HBase and read it back. The example uses Livy to submit Spark jobs to a YARN cluster, so that Spark applications working on HBase data can be run remotely.

Prerequisites:

  • Apache Spark installed and configured
  • Apache Livy installed and configured
  • Apache HBase installed and configured
  • HBase Spark Connector jar file available

Steps:

The following steps show how to integrate Spark and HBase using Livy.

Step 1: Create an HBase Table

Note: If your cluster is kerberized, grant the user the proper Ranger HBase permissions and run kinit before connecting.

  1. Connect to your HBase cluster using the HBase shell:
    hbase shell
  2. Create an HBase table named employees with two column families, per and prof:
    create 'employees', 'per', 'prof'
  3. Exit the HBase shell:
    quit

Step 2: Create the PySpark Code

  1. Create a Python file (e.g., hbase_spark_connector_app.py) and add the following code:
    import json

    from pyspark.sql import SparkSession
    from pyspark.sql.types import StructType, StructField, StringType, LongType, ShortType, FloatType


    def main():
        spark = SparkSession.builder.appName("HBase Spark Connector App").getOrCreate()

        # Sample employee rows: (id, name, age, salary)
        data = [(1, "Ranga", 34, 15000.5), (2, "Nishanth", 5, 35000.5), (3, "Meena", 30, 25000.5)]
        schema = StructType([
            StructField("id", LongType(), True),
            StructField("name", StringType(), True),
            StructField("age", ShortType(), True),
            StructField("salary", FloatType(), True)
        ])
        employeeDF = spark.createDataFrame(data=data, schema=schema)

        # Catalog that maps DataFrame columns to the HBase row key and column families
        catalog = json.dumps({
            "table": {"namespace": "default", "name": "employees"},
            "rowkey": "key",
            "columns": {
                "id": {"cf": "rowkey", "col": "key", "type": "long"},
                "name": {"cf": "per", "col": "name", "type": "string"},
                "age": {"cf": "per", "col": "age", "type": "short"},
                "salary": {"cf": "prof", "col": "salary", "type": "float"}
            }
        })

        # Write the DataFrame to the employees table
        employeeDF.write.format("org.apache.hadoop.hbase.spark").options(catalog=catalog).option("hbase.spark.use.hbasecontext", False).save()

        # Read the table back and display it to confirm the write
        df = spark.read.format("org.apache.hadoop.hbase.spark").options(catalog=catalog).option("hbase.spark.use.hbasecontext", False).load()
        df.show()

        spark.stop()


    if __name__ == "__main__":
        main()
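The catalog in the code above is plain JSON, so it can be generated from a column mapping instead of being written by hand. The following is a minimal sketch of that idea in plain Python (no Spark required). The helper name build_catalog and its parameters are hypothetical and are not part of the HBase Spark Connector; only the resulting JSON layout is taken from the example above.

```python
import json


def build_catalog(table, rowkey_column, columns, namespace="default"):
    """Return an HBase Spark Connector catalog as a JSON string.

    rowkey_column is a (dataframe_column, type) tuple for the row key.
    columns maps a DataFrame column name to a
    (column_family, qualifier, type) tuple.
    """
    rowkey_name, rowkey_type = rowkey_column
    # The row key is listed under the special "rowkey" column family.
    mapping = {rowkey_name: {"cf": "rowkey", "col": "key", "type": rowkey_type}}
    for df_column, (family, qualifier, hbase_type) in columns.items():
        mapping[df_column] = {"cf": family, "col": qualifier, "type": hbase_type}
    return json.dumps({
        "table": {"namespace": namespace, "name": table},
        "rowkey": "key",
        "columns": mapping,
    })


# Rebuild the catalog used for the employees table.
catalog = build_catalog(
    table="employees",
    rowkey_column=("id", "long"),
    columns={
        "name": ("per", "name", "string"),
        "age": ("per", "age", "short"),
        "salary": ("prof", "salary", "float"),
    },
)
print(catalog)
```

The returned string can be passed to options(catalog=catalog) exactly like the hand-written catalog.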

Step 3: Verify the PySpark Code Using spark-submit

Run the following command to verify that the application works without any issues.

Note: 

  1. Update the hbase-spark JAR version(s) to match your cluster's CDP version.
  2. If your cluster is kerberized, run kinit first:
    spark-submit \
    --master yarn \
    --deploy-mode client \
    --jars /opt/cloudera/parcels/CDH/jars/hbase-spark-protocol-shaded-1.0.0.7.1.9.0-387.jar,/opt/cloudera/parcels/CDH/jars/hbase-spark-1.0.0.7.1.9.0-387.jar \
    hbase_spark_connector_app.py
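Because the JAR file names embed the connector and CDP version, only the version string needs to change from cluster to cluster. As a rough sketch, the command above can be assembled from that version string. The function names connector_jars and spark_submit_command are hypothetical helpers, and the default JAR directory is the one used in this article; adjust it if your parcels are installed elsewhere.

```python
import shlex


def connector_jars(version, jar_dir="/opt/cloudera/parcels/CDH/jars"):
    """Return the two HBase Spark Connector JAR paths for a given version."""
    return [
        f"{jar_dir}/hbase-spark-protocol-shaded-{version}.jar",
        f"{jar_dir}/hbase-spark-{version}.jar",
    ]


def spark_submit_command(app, version, deploy_mode="client"):
    """Build the spark-submit argument list used in this step."""
    return [
        "spark-submit",
        "--master", "yarn",
        "--deploy-mode", deploy_mode,
        # --jars expects a single comma-separated value
        "--jars", ",".join(connector_jars(version)),
        app,
    ]


command = spark_submit_command("hbase_spark_connector_app.py", "1.0.0.7.1.9.0-387")
# Print a shell-safe version of the command for review before running it.
print(shlex.join(command))
```

The list form can also be passed to subprocess.run(command, check=True) on a host where spark-submit is available.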

Step 4: Upload Resources to HDFS

Upload the hbase_spark_connector_app.py file and the HBase Spark Connector JAR files to an HDFS directory, for example /tmp:

 

hdfs dfs -put hbase_spark_connector_app.py /tmp
hdfs dfs -put /opt/cloudera/parcels/CDH/jars/hbase-spark-protocol-shaded-1.0.0.7.1.9.0-387.jar /tmp
hdfs dfs -put /opt/cloudera/parcels/CDH/jars/hbase-spark-1.0.0.7.1.9.0-387.jar /tmp

 

Step 5: Submit the Spark Job to Livy

  1. Submit the Spark job to Livy using the Livy REST API:
    Note: Replace LIVY_SERVER_HOST (for example, localhost) and LIVY_SERVER_PORT (for example, 8998) with the values for your Livy server.

Non-kerberized cluster:

 

curl -k \
    -H "Content-Type: application/json" \
    -X POST \
    -d '{
            "file": "/tmp/hbase_spark_connector_app.py",
            "name": "Spark HBase Connector Example",
            "driverMemory": "1g",
            "driverCores": 1,
            "executorMemory": "1g",
            "executorCores": 1,
            "jars" : ["/tmp/hbase-spark-protocol-shaded-1.0.0.7.1.9.0-387.jar","/tmp/hbase-spark-1.0.0.7.1.9.0-387.jar"],
            "conf":{
                "spark.dynamicAllocation.enabled":"false",
                "spark.executor.instances":1
            }
        }' \
    https://<LIVY_SERVER_HOST>:<LIVY_SERVER_PORT>/batches/

 

 

Kerberized cluster:
Run the kinit command, then run the following curl command:

 

curl -k \
    --negotiate -u: \
    -H "Content-Type: application/json" \
    -X POST \
    -d '{
            "file": "/tmp/hbase_spark_connector_app.py",
            "name": "Spark HBase Connector Example",
            "driverMemory": "1g",
            "driverCores": 1,
            "executorMemory": "1g",
            "executorCores": 1,
            "jars" : ["/tmp/hbase-spark-protocol-shaded-1.0.0.7.1.9.0-387.jar","/tmp/hbase-spark-1.0.0.7.1.9.0-387.jar"],
            "conf":{
                "spark.dynamicAllocation.enabled":"false",
                "spark.executor.instances":1
            }
        }' \
    https://<LIVY_SERVER_HOST>:<LIVY_SERVER_PORT>/batches/

 

This will submit the Spark job to Livy and execute it on your cluster. You can monitor the job status using the Livy REST API or the Livy web UI.
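The same submission can be made from Python instead of curl. The sketch below uses only the standard library and covers the non-kerberized case; SPNEGO authentication (the curl --negotiate option) is not handled here and would need an additional library. The function names build_batch_payload and submit_batch, and the host livy.example.com, are placeholders chosen for illustration.

```python
import json
import ssl
import urllib.request


def build_batch_payload(app_path, jars, name="Spark HBase Connector Example"):
    """Build the JSON body for POST /batches, mirroring the curl example."""
    return {
        "file": app_path,
        "name": name,
        "driverMemory": "1g",
        "driverCores": 1,
        "executorMemory": "1g",
        "executorCores": 1,
        "jars": list(jars),
        "conf": {
            # Spark configuration values are passed as strings here.
            "spark.dynamicAllocation.enabled": "false",
            "spark.executor.instances": "1",
        },
    }


def submit_batch(livy_url, payload, verify_tls=True):
    """POST the payload to <livy_url>/batches and return the parsed response."""
    request = urllib.request.Request(
        f"{livy_url.rstrip('/')}/batches",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    context = ssl.create_default_context()
    if not verify_tls:
        # Same effect as curl -k: skips certificate checks (test clusters only).
        context.check_hostname = False
        context.verify_mode = ssl.CERT_NONE
    with urllib.request.urlopen(request, context=context) as response:
        return json.load(response)


payload = build_batch_payload(
    "/tmp/hbase_spark_connector_app.py",
    [
        "/tmp/hbase-spark-protocol-shaded-1.0.0.7.1.9.0-387.jar",
        "/tmp/hbase-spark-1.0.0.7.1.9.0-387.jar",
    ],
)
print(json.dumps(payload, indent=2))

# Example call against a placeholder host (not executed here):
# batch = submit_batch("https://livy.example.com:8998", payload, verify_tls=False)
# print(batch["id"], batch["state"])
```

The id field of the response is the BATCH_ID needed to query the job state and logs.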

Step 6: Monitor the Livy Job State

To check the Livy job state, run the following command, replacing LIVY_SERVER_HOST, LIVY_SERVER_PORT, and BATCH_ID (generated in Step 5).

Non-kerberized cluster:

 

curl -k \
    -H "Content-Type: application/json" \
    -X GET \
  https://<LIVY_SERVER_HOST>:<LIVY_SERVER_PORT>/batches/<BATCH_ID>/state

 

Kerberized cluster:

 

curl -k \
    --negotiate -u: \
    -H "Content-Type: application/json" \
    -X GET \
  https://<LIVY_SERVER_HOST>:<LIVY_SERVER_PORT>/batches/<BATCH_ID>/state
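Rather than re-running the state command by hand, the state endpoint can be polled until the batch finishes. The following is a minimal sketch for a non-kerberized cluster. The set of terminal states is an assumption based on the states Livy commonly reports for batches, so verify it against your Livy version; wait_for_batch and livy_state_fetcher are hypothetical helper names.

```python
import json
import time
import urllib.request

# Assumed terminal states; adjust if your Livy version reports others.
TERMINAL_STATES = {"success", "dead", "killed", "error"}


def livy_state_fetcher(livy_url, batch_id):
    """Return a function that fetches the current state of one batch."""
    url = f"{livy_url.rstrip('/')}/batches/{batch_id}/state"

    def fetch():
        with urllib.request.urlopen(url) as response:
            return json.load(response)["state"]

    return fetch


def wait_for_batch(fetch_state, interval_seconds=5.0, max_polls=120, sleep=time.sleep):
    """Poll fetch_state() until a terminal state is seen, then return it."""
    state = None
    for _ in range(max_polls):
        state = fetch_state()
        if state in TERMINAL_STATES:
            return state
        sleep(interval_seconds)
    raise TimeoutError(f"batch still in state {state!r} after {max_polls} polls")


# Demonstration with a fake fetcher, so no Livy server is needed.
fake_states = iter(["starting", "running", "running", "success"])
final_state = wait_for_batch(lambda: next(fake_states), sleep=lambda _: None)
print(final_state)  # success
```

Against a real server, pass livy_state_fetcher(livy_url, batch_id) as the first argument to wait_for_batch.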

 

Step 7: Verify the Livy job logs

To check the Livy job logs, run the following command, replacing LIVY_SERVER_HOST, LIVY_SERVER_PORT, and BATCH_ID (generated in Step 5).

Non-kerberized cluster:

 

curl -k \
    -H "Content-Type: application/json" \
    -X GET \
  https://<LIVY_SERVER_HOST>:<LIVY_SERVER_PORT>/batches/<BATCH_ID>/log

 

Kerberized cluster:

 

curl -k \
    --negotiate -u: \
    -H "Content-Type: application/json" \
    -X GET \
  https://<LIVY_SERVER_HOST>:<LIVY_SERVER_PORT>/batches/<BATCH_ID>/log
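The log endpoint returns JSON in which the log lines are held in a list under the log key. The sketch below builds the log URL and joins those lines into readable text. The from and size query parameters reflect the Livy REST API as I understand it, so treat them as an assumption and check them against your Livy version; batch_log_url and format_log are hypothetical helper names, and livy.example.com is a placeholder host.

```python
from urllib.parse import urlencode


def batch_log_url(livy_url, batch_id, start=0, size=100):
    """Build the log URL for a batch, requesting `size` lines from `start`."""
    query = urlencode({"from": start, "size": size})
    return f"{livy_url.rstrip('/')}/batches/{batch_id}/log?{query}"


def format_log(response):
    """Join the log lines of a parsed /log response into one string."""
    return "\n".join(response.get("log", []))


url = batch_log_url("https://livy.example.com:8998", 7, start=0, size=50)
print(url)

# Demonstration with a hand-written response, so no Livy server is needed.
sample_response = {"id": 7, "from": 0, "log": ["stdout: ", "first line", "second line"]}
print(format_log(sample_response))
```

Increasing start by the number of lines already read allows longer logs to be fetched page by page.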

 

 

 

Last update:
‎12-11-2023 12:22 AM