
In this article, we walk through the steps to integrate the Cloudera Data Engineering (CDE) experience with the Cloudera Operational Database (COD) experience in a few clicks, using a PySpark script that reads from and writes to Phoenix tables as the example.

As you know, Cloudera Operational Database supports both traditional relational schemas and evolutionary schemas, and it can transact on petabytes of structured and unstructured data.

 

Cloudera Data Engineering, in turn, streamlines and operationalizes data pipelines securely at any scale to increase efficiency and accelerate time to value.

Example: Customer 360. Cloudera Operational Database stores the customer profiling data used by business applications, and that data is modeled and prepared with Cloudera Data Engineering. Both capabilities come with High Availability, Scalability, Mission Critical Performance, and Flexibility for handling any type of data.

 

In our demo we use CDP Public Cloud, but the same setup can be done on CDP Private Cloud Data Services.

Pre-Req:

  • A CDP Environment must be provisioned.
  • A Cloudera Operational Database must be provisioned.
  • A Cloudera Data Engineering Virtual Cluster must be provisioned.

Step-1:

As a first step, we need to make the Cloudera Operational Database experience accessible to the Cloudera Data Engineering experience. To do that, we need the hbase-site.xml config file, which can be obtained with the steps below:

  1. Assuming the Cluster Environment is ready as described in the Pre-Req steps, go to the home page of Cloudera Operational Database.
  2. Click on the database you created as described in the Pre-Req steps (in our example, demo-database).
  3. Under the "Connect" tab of the database, copy the HBase Client Configuration URL field.
  4. Run curl in your terminal with the URL copied in step 3 to download the HBase client configuration archive, as shown below:

 

curl -f -o "hbase-config.zip" -u "workload-username" "xxxxxxxxxxxxx/clouderamanager/api/v41/clusters/cod--f12gny48396d/services/hbase/clientConfig"

 

 

  • Provide your workload username and workload password for the curl call above (when only the username is passed to -u, curl prompts for the password). Follow the CDP documentation to obtain these credentials.
  • Extract the downloaded zip file to obtain the hbase-site.xml file.
  • Note down the Phoenix JDBC URL from the Phoenix (Thick) tab.
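
The extraction step above can also be scripted. The following is a minimal sketch, not part of the original walkthrough: the helper name `extract_hbase_site` is hypothetical, and the internal layout of the archive is an assumption, so the helper searches for the file by base name rather than by a fixed path.

```python
import zipfile
from pathlib import Path


def extract_hbase_site(zip_path, dest_dir):
    """Copy hbase-site.xml out of the client-config zip into dest_dir.

    Assumption: the file may sit at the top level of the archive or inside
    a sub-folder, so every member is matched on its base name.
    """
    dest = Path(dest_dir)
    dest.mkdir(parents=True, exist_ok=True)
    with zipfile.ZipFile(zip_path) as archive:
        for member in archive.namelist():
            if Path(member).name == "hbase-site.xml":
                target = dest / "hbase-site.xml"
                target.write_bytes(archive.read(member))
                return target
    raise FileNotFoundError("hbase-site.xml not found in " + str(zip_path))
```

For example, `extract_hbase_site("hbase-config.zip", "conf")` would leave the file at `conf/hbase-site.xml`, ready to be uploaded in Step-3.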

Step-2: 

Preparing the Operational Database Table.

Cloudera gives data practitioners the choice of the command line (CLI) or the GUI web console. Both options are described below:

  • Option 1: Preparing Operational Database using CLI:
    1. Log in to the Gateway node of the cluster.
      • Go to the Environment of the Operational Database Cluster > Nodes.
      • Copy the IP address of the Gateway node.
    2. Run the commands below (phoenix-sqlline opens the Phoenix SQL shell; the SQL statements are entered at its prompt):

 

 

$ phoenix-sqlline
CREATE TABLE customerrecords (ID BIGINT NOT NULL PRIMARY KEY, Name VARCHAR, Email VARCHAR);
UPSERT INTO customerrecords (ID, Name, Email) VALUES (1, 'John', 'john@gmail.com');
UPSERT INTO customerrecords (ID, Name, Email) VALUES (2, 'Lisa', 'lisa@gmail.com');

 

 


 

 

  • Option 2: Preparing Operational Database using SQL on Hue UI:
    1. Go to the Operational Database (Example: Demo-database) control plane UI and click on Hue.
    2. Run the below commands: 

 

CREATE TABLE customerrecords (ID BIGINT NOT NULL PRIMARY KEY, Name VARCHAR, Email VARCHAR);
UPSERT INTO customerrecords (ID, Name, Email) VALUES (1, 'John', 'john@gmail.com');
UPSERT INTO customerrecords (ID, Name, Email) VALUES (2, 'Lisa', 'lisa@gmail.com');
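
String values in these statements must be wrapped in plain single quotes. If you need to seed more than a couple of rows, one option is to generate the UPSERT statements rather than type them. The sketch below is illustrative only: the helper names `sql_literal` and `upsert_statement` are hypothetical, and it handles only integers and strings.

```python
def sql_literal(value):
    """Render a Python int or str as a SQL literal."""
    if isinstance(value, bool):
        raise TypeError("booleans are not handled in this sketch")
    if isinstance(value, int):
        return str(value)
    if isinstance(value, str):
        # A single quote inside a string literal is escaped by doubling it.
        return "'" + value.replace("'", "''") + "'"
    raise TypeError("unsupported type: " + type(value).__name__)


def upsert_statement(table, columns, row):
    """Build one UPSERT statement for a single row tuple."""
    cols = ", ".join(columns)
    vals = ", ".join(sql_literal(v) for v in row)
    return "UPSERT INTO {} ({}) VALUES ({});".format(table, cols, vals)
```

The generated text can then be pasted into phoenix-sqlline or the Hue editor. Table and column names are inserted as-is, so this is only suitable for trusted input.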

 

 


 

Step-3:

Create & Run CDE Job.

In this final step, we create a Data Engineering job from the Cloudera Data Engineering GUI with a few simple clicks, to write to and read from the Phoenix table prepared in Step-2.

  1. Assuming the Data Engineering Virtual Cluster is ready as described in the Pre-Req steps, go to the home page of Cloudera Data Engineering and create a new job.
  2. Provide the Python job script and its dependencies in the job as follows:
    1. Name: Job Name (Example: boitest5)
    2. Application File: PySpark script to write & read the Phoenix Table (Sample Script Pasted at the end).
    3. Configurations: (The Spark driver and executors read the hbase-site.xml file from the path below)
      1. spark.driver.extraClassPath = /app/mount/conf
      2. spark.executor.extraClassPath = /app/mount/conf
    4. Python Version: Python 2
  3. Advanced Options:
    1. Jar Files:  Downloaded from the Gateway Node of the Operational Database Cluster (Version of the jars depends on the COD version & Jars can be obtained under /opt/cloudera/parcels/xxx/jars/xxx).
      • phoenix5-spark-shaded-6.0.x.x.x.jar
      • hbase-shaded-mapreduce-2.x.x.jar
      • opentelemetry-context-0.x.x.jar
      • opentelemetry-api-0.x.x.jar
    2. Other Dependencies: 
      1. Create a folder under resources - /conf
      2. Place the hbase-site.xml file (extracted from the zip downloaded in Step-1) in it.
  4. Create and Run Job.
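
Before uploading hbase-site.xml to the /conf resource folder, it can help to confirm that the file actually carries the ZooKeeper settings the job depends on. The following is a minimal sketch, assuming the file follows the usual Hadoop `<configuration>/<property>` layout; the helper names are hypothetical, and whether all three properties are present in a COD client config is an assumption.

```python
import xml.etree.ElementTree as ET


def read_hadoop_config(xml_text):
    """Parse Hadoop-style <configuration> XML into a dict of name -> value."""
    root = ET.fromstring(xml_text)
    props = {}
    for prop in root.findall("property"):
        name = prop.findtext("name")
        if name is not None:
            props[name.strip()] = (prop.findtext("value") or "").strip()
    return props


def zookeeper_settings(props):
    """Return (quorum, client port, znode parent); None where absent."""
    return (
        props.get("hbase.zookeeper.quorum"),
        props.get("hbase.zookeeper.property.clientPort"),
        props.get("zookeeper.znode.parent"),
    )
```

Reading the file with `open("conf/hbase-site.xml").read()` and passing the text to `read_hadoop_config` gives a dictionary you can inspect. A missing quorum entry suggests that the wrong file was extracted.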

OUTPUT/Result: 

  • Go to CDE Virtual Cluster > Job (boitest5) > Logs > stdout

Sample Script (test.py): 

 

 

from pyspark.sql import SparkSession


def create_spark_session():
    """
    Create a Spark Session
    """
    spark_session = SparkSession.builder \
        .appName("CDPSparkPhoenixTestApp") \
        .enableHiveSupport() \
        .getOrCreate()

    # DEBUG logging is verbose; it is useful while troubleshooting the connection.
    spark_session.sparkContext.setLogLevel('DEBUG')

    return spark_session


zookeeper_urls = "<Phoenix (Thick)URL copied from COD Database UI>:2181"
phoenix_tablename = "customerrecords"

spark = create_spark_session()

# Read the rows that were inserted in Step-2.
# spark.read is used directly, so a separate SQLContext is not needed.
df = spark.read \
    .format("phoenix") \
    .option("table", phoenix_tablename) \
    .option("zkUrl", zookeeper_urls) \
    .load()
df.show(truncate=False)

# New rows to write to the Phoenix table.
records = [
    (3, 'Dan', 'dan@gmail.com'),
    (4, 'Peter', 'peter@gmail.com'),
    (5, 'Shaun', 'shaun@gmail.com'),
    (6, 'Pablo', 'pablo@gmail.com'),
]
columns = ["id", "name", "email"]

df = spark.createDataFrame(data=records, schema=columns)

df.write \
    .format("phoenix") \
    .mode("overwrite") \
    .option("table", phoenix_tablename) \
    .option("zkUrl", zookeeper_urls) \
    .save()

# Read the table again to confirm the new rows are present.
df = spark.read \
    .format("phoenix") \
    .option("table", phoenix_tablename) \
    .option("zkUrl", zookeeper_urls) \
    .load()
df.show(truncate=False)

spark.stop()
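
The `zookeeper_urls` value in the script is built by hand from the Phoenix (Thick) JDBC URL noted in Step-1. As a rough sketch, a small helper could derive it instead. This assumes the URL has the common `jdbc:phoenix:<hosts>:<port>:<znode>` shape, which may differ from what the COD UI shows. The function name `zk_url_from_jdbc` is hypothetical.

```python
def zk_url_from_jdbc(jdbc_url, default_port="2181"):
    """Derive a 'hosts:port' string from a Phoenix thick-client JDBC URL.

    Assumed input shape: jdbc:phoenix:host1,host2:2181:/hbase
    Thin-client URLs (jdbc:phoenix:thin:...) and IPv6 hosts are not handled.
    """
    prefix = "jdbc:phoenix:"
    if not jdbc_url.startswith(prefix):
        raise ValueError("not a Phoenix JDBC URL: " + jdbc_url)
    # Drop any trailing ';key=value' connection properties.
    body = jdbc_url[len(prefix):].split(";", 1)[0]
    parts = body.split(":")
    hosts = parts[0]
    if not hosts or hosts == "thin":
        raise ValueError("no ZooKeeper hosts found in: " + jdbc_url)
    # Use the port from the URL when present, otherwise the default.
    port = parts[1] if len(parts) > 1 and parts[1].isdigit() else default_port
    return hosts + ":" + port
```

The result can be assigned to `zookeeper_urls` in place of the hand-edited placeholder. Check it against the ZooKeeper quorum and port in hbase-site.xml before relying on it.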

 

 

Thank you and feel free to reach out for any questions 🙂 

Comments

Please Note:

1. The above job is tested on Spark 2.4.8 with Python 2. 

2. The above job is tested on Spark 2.4.8 with Python 3.