In this article, we will walk through the steps to integrate the Cloudera Data Engineering experience and the Cloudera Operational Database experience within a few clicks, with an example of reading and writing data in Phoenix tables using a PySpark script.
As you may know, Cloudera Operational Database supports both traditional relational schemas and evolutionary schemas, with the capability to transact on petabytes of structured and unstructured data generated by businesses today (see the product documentation for further details).
Cloudera Data Engineering, in turn, streamlines and operationalizes data pipelines securely at any scale to increase efficiency and accelerate time to value (again, see the product documentation for further details).
Example: Customer 360. Cloudera Operational Database stores the customer profiling data for business applications, and the data is modeled and prepared with Cloudera Data Engineering. Both capabilities come with high availability, scalability, mission-critical performance, and the flexibility to handle any type of data.
In this demo we are leveraging CDP Public Cloud, but the same experience/setup can be achieved on CDP Private Cloud Data Services.
As a first step, we need to make the Cloudera Operational Database experience accessible to the Cloudera Data Engineering experience. To do that, we need the hbase-site.xml client configuration file, which can be obtained easily with the call below:
curl -f -o "hbase-config.zip" -u "workload-username" "xxxxxxxxxxxxx/clouderamanager/api/v41/clusters/cod--f12gny48396d/services/hbase/clientConfig"
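Once downloaded, unzip the archive and locate hbase-site.xml inside it; this is the file we will make available to the CDE job later. A minimal sketch (the directory layout inside the archive can vary from cluster to cluster):
$ unzip hbase-config.zip
$ find . -name hbase-site.xml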
Preparing the Operational Database Table.
Cloudera gives data practitioners the flexibility to use either the command line (CLI) or the GUI web console; both options are described below.
Option 1 (CLI): launch phoenix-sqlline and run the following statements to create the table and insert a couple of sample records:
$ phoenix-sqlline
CREATE TABLE customerrecords (ID BIGINT NOT NULL PRIMARY KEY, Name VARCHAR, Email VARCHAR);
UPSERT INTO customerrecords (ID, Name, Email) VALUES (1, 'John', 'john@gmail.com');
UPSERT INTO customerrecords (ID, Name, Email) VALUES (2, 'Lisa', 'lisa@gmail.com');
Option 2 (GUI web console): the same statements can be executed from the SQL editor (Hue) available in the COD database web UI.
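Whichever option you use, a quick query confirms the rows are in place (table and column names are the ones created above):
SELECT * FROM customerrecords;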
Create & Run CDE Job:
In this final step, we will create a Data Engineering job through the Cloudera Data Engineering GUI, with a few simple clicks, to write to and read from the Phoenix table we prepared in the previous step.
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext


def create_spark_session():
    """
    Create a Spark Session
    """
    spark_session = SparkSession.builder \
        .appName("CDPSparkPhoenixTestApp") \
        .enableHiveSupport() \
        .getOrCreate()
    spark_session.sparkContext.setLogLevel('DEBUG')
    return spark_session


# Phoenix (Thick) URL copied from the COD Database UI, plus the ZooKeeper port
zookeeper_urls = "<Phoenix (Thick) URL copied from COD Database UI>:2181"
phoenix_tablename = "customerrecords"

spark = create_spark_session()
sqlContext = SQLContext(spark)

# Read the existing rows from the Phoenix table prepared in the previous step
df = sqlContext.read \
    .format("phoenix") \
    .option("table", phoenix_tablename) \
    .option("zkUrl", zookeeper_urls) \
    .load()
df.show(truncate=False)

# New customer records to write into the Phoenix table
records = [
    (3, 'Dan', 'dan@gmail.com'),
    (4, 'Peter', 'peter@gmail.com'),
    (5, 'Shaun', 'shaun@gmail.com'),
    (6, 'Pablo', 'pablo@gmail.com')
]
columns = ["id", "name", "email"]
df = spark.createDataFrame(data=records, schema=columns)

# Write (upsert) the new rows into the Phoenix table
df.write \
    .format("phoenix") \
    .mode("overwrite") \
    .option("table", phoenix_tablename) \
    .option("zkUrl", zookeeper_urls) \
    .save()

# Read the table back to confirm the new rows were written
df = sqlContext.read \
    .format("phoenix") \
    .option("table", phoenix_tablename) \
    .option("zkUrl", zookeeper_urls) \
    .load()
df.show(truncate=False)

spark.stop()
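As an alternative to the GUI, the same job can be packaged and triggered from the CDE CLI. The sketch below is only illustrative: the resource name phoenix-demo and the script file name pyspark_phoenix.py are placeholders, and flag names can vary between CDE CLI versions (check cde job create --help):
$ cde resource create --name phoenix-demo
$ cde resource upload --name phoenix-demo --local-path pyspark_phoenix.py --local-path hbase-site.xml
$ cde job create --name phoenix-demo-job --type spark --application-file pyspark_phoenix.py --mount-1-resource phoenix-demo
$ cde job run --name phoenix-demo-job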
Thank you and feel free to reach out for any questions 🙂
Please Note: The above job was tested on Spark 2.4.8 with both Python 2 and Python 3.