02-17-2021
09:33 AM
1 Kudo
In this article, we will learn how to run scheduled ETL workloads on CDE (Cloudera Data Engineering) and CDW (Cloudera Data Warehouse) using Airflow.
The CDE 1.4 release adds the capability to schedule Airflow jobs (Airflow is shipped along with CDE). To learn more about Airflow, refer to the Airflow website.
This release adds two Airflow operators: CDEJobRunOperator and CDWOperator.
The CDEJobRunOperator calls the CDE jobs API behind the scenes to run Spark jobs. More details on the jobs API can be found here.
The CDWOperator uses a JDBC connection behind the scenes to interact with a CDW Hive or Impala virtual warehouse.
To demonstrate an ETL workload in this article, we will read a CSV file from an AWS S3 bucket with Spark and perform a transformation (via the CDEJobRunOperator), then load the processed data into the CDW warehouse using the Airflow CDWOperator.
Prerequisites
A CDP environment is created, and the CDW and CDE experiences are provisioned.
CDE Job
The CDE job will be run from Airflow using the CDEJobRunOperator. The job reads CSV data (a sample is available in GitHub) and writes the transformed data back to S3.
Run a Maven build in this repo; the resulting artifact will be airflow-cde-etl-1.0-SNAPSHOT.jar.
Upload the jar in the Create Job screen of the CDE virtual cluster UI, as shown in the following screenshot.
Note: {{ dataSetBucketPath }} and {{ writeBucketPath }} are variables that can be overridden from the Airflow DAG.
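For illustration only, the following PySpark sketch shows the shape of such a job: read a CSV from one S3 path, apply a simple transformation, and write the result to another path. The actual job in the repo is a Scala Spark application; the paths, column handling, and output format here are assumptions, not the repo's code.

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
import sys

# Hypothetical PySpark equivalent of the airflow-cde-etl job.
# The two S3 paths arrive as arguments so that Airflow can override
# {{ dataSetBucketPath }} and {{ writeBucketPath }} at run time.
if __name__ == "__main__":
    data_set_bucket_path, write_bucket_path = sys.argv[1], sys.argv[2]

    spark = SparkSession.builder.appName("airflow-cde-etl").getOrCreate()

    # Read the raw CSV (e.g. Provider.csv) with a header row; columns are read as strings.
    df = spark.read.option("header", "true").csv(data_set_bucket_path)

    # Illustrative transformation: trim string columns and drop duplicate rows.
    cleaned = df.dropDuplicates().select(
        *[F.trim(F.col(c)).alias(c) for c in df.columns]
    )

    # Write the transformed data back to S3 as Parquet.
    cleaned.write.mode("overwrite").parquet(write_bucket_path)

    spark.stop()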
Configure the Hive/Impala VW Connection in Airflow
Launch the Airflow UI of the CDE virtual cluster; the link is available on the virtual cluster details screen.
Add the Hive/Impala connection details (available under Admin > Connections) as shown in the following screenshot.
The hostname can be obtained from the Copy JDBC Url option of the Hive/Impala virtual warehouse, as shown below.
Note: The username/password is your CDP workload username and password.
Submit Airflow DAG
Before submitting the job, upload Provider.csv to the CDP S3 bucket. Then, submit the Airflow DAG from the CDE job create UI as follows:
Note: Replace test_data_bucket in the DAG code with the applicable bucket name.
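For reference, a minimal DAG sketch that chains the two operators is shown below. This is not the exact DAG from the article: the CDE job name, Airflow connection ID, variable values, and the Hive query are assumptions for illustration, and the operator import paths and parameters may differ slightly between CDE releases.

from datetime import datetime, timedelta

from airflow import DAG
# Operators shipped with CDE's embedded Airflow; import paths may vary by release.
from cloudera.cdp.airflow.operators.cde_operator import CDEJobRunOperator
from cloudera.cdp.airflow.operators.cdw_operator import CDWOperator

default_args = {
    "owner": "etl",
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}

with DAG(
    dag_id="cde_cdw_etl",
    default_args=default_args,
    start_date=datetime(2021, 2, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:

    # Run the Spark job created in the CDE UI, overriding the two bucket variables.
    transform = CDEJobRunOperator(
        task_id="transform_csv",
        job_name="airflow-cde-etl",  # name given to the job in the CDE UI (assumed)
        variables={
            "dataSetBucketPath": "s3a://test_data_bucket/Provider.csv",
            "writeBucketPath": "s3a://test_data_bucket/processed/",
        },
    )

    # Load the transformed data into the CDW Hive virtual warehouse over JDBC.
    load = CDWOperator(
        task_id="load_to_cdw",
        cli_conn_id="hive_vw",  # Airflow connection created under Admin > Connections (assumed name)
        hql="""
            CREATE EXTERNAL TABLE IF NOT EXISTS provider_processed (provider_id STRING)
            STORED AS PARQUET LOCATION 's3a://test_data_bucket/processed/'
        """,
    )

    transform >> load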
There you go: we have scheduled an Airflow job that reads from the S3 bucket and loads the data into the CDW Hive virtual warehouse.
01-08-2021
04:28 AM
1 Kudo
COD - CDE using Phoenix
In this article, we will walk through the steps required to read/write to COD (Cloudera Operational Database) from Spark on CDE (Cloudera Data Engineering) using Phoenix JDBC.
Assumptions
COD is already provisioned and the database cod-db is created. Refer to this link for the same.
CDE is already provisioned and a virtual cluster is already created. Refer to this link for the same.
COD Configuration
For Spark in CDE to be able to talk to COD, it requires the hbase-site.xml config of the COD cluster. Do the following steps to retrieve it:
Go to the COD control plane UI and click the cod-db database.
Under the "Connect" tab of the COD database, look for the HBase Client Configuration URL field. Following is the screenshot for the same.
The configuration can be downloaded using the following curl command:
curl -o "hbase-config.zip" -u "santhosh" "https://XXXXXX.cldr.work/clouderamanager/api/v41/clusters/cod-hvtur2ovawfr/services/hbase/clientConfig"
Make sure to provide the "workload" password for the above curl call.
Extract the downloaded zip file to obtain the hbase-site.xml file.
Note down the Phoenix JDBC URL from the Phoenix (Thick) tab.
Create table in COD
We need to create a table in COD using Phoenix for this demo. To do so, log in to the gateway node of the COD cluster and run the phoenix-sqlline command to create the table as follows:
CREATE TABLE OUTPUT_TEST_TABLE (id BIGINT NOT NULL PRIMARY KEY, col1 VARCHAR, col2 INTEGER);
The node details can be obtained from the datahub Hardware tab in the control plane UI. Following is the screenshot for the same.
Build phoenix-spark project
Build the following Spark Phoenix demo Maven project:
https://github.com/bgsanthosh/spark-hbase/tree/master/spark-hbase
Make sure to bump the phoenix-spark and hbase-client versions as per the Cloudera Runtime version.
Run the following command from the project home:
mvn clean compile package
CDE Configure and Run Job
Configure the CDE CLI to point to the virtual cluster created in the above step. For more details, see Configuring the CLI client.
Create a resource using the following command:
cde resource create --name odx-spark-resource
Upload hbase-site.xml:
cde resource upload --name odx-spark-resource --local-path hbase-site.xml --resource-path conf/hbase-site.xml
Upload the demo app jar that was built earlier:
cde resource upload --name odx-spark-resource --local-path ./spark-hbase/target/spark-hbase-1.0-SNAPSHOT.jar --resource-path spark-hbase-1.0-SNAPSHOT.jar
Create the CDE job using the following JSON and import command:
{
"mounts": [
{
"resourceName": "odx-spark-resource"
}
],
"name": "odx-read-write",
"spark": {
"className": "org.cloudera.e2e.spark.PhoenixReadWrite",
"conf": {
"spark.executor.extraClassPath": "/app/mount/conf",
"spark.driver.extraClassPath": "/app/mount/conf"
},
"args": [ "{{ phoenix_jdbc_url }}"],
"driverCores": 1,
"driverMemory": "1g",
"executorCores": 1,
"executorMemory": "1g",
"file": "spark-hbase-1.0-SNAPSHOT.jar",
"pyFiles": [],
"files": ["conf/hbase-site.xml"],
"numExecutors": 4
}
}
cde job import --file odx-readwrite-var.json
Next, run the job using the following command:
cde job run --name odx-read-write --variable phoenix_jdbc_url=<phoenix_jdbc_url>
Check in phoenix-sqlline; the following data should be present:
0: jdbc:phoenix:> select * from OUTPUT_TEST_TABLE;
+-----+-------+-------+
| ID | COL1 | COL2 |
+-----+-------+-------+
| 1 | 1 | 1 |
| 2 | 2 | 2 |
| 3 | 3 | 3 |
+-----+-------+-------+
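Purely for illustration (the demo job in the repo, org.cloudera.e2e.spark.PhoenixReadWrite, is written in Scala), the following PySpark sketch shows the same pattern with the phoenix-spark connector: write a few rows to OUTPUT_TEST_TABLE using the Phoenix JDBC URL passed as an argument, then read them back. The option names ("table" and "jdbcUrl"/"zkUrl") and the required save mode depend on the connector version, so treat them as assumptions.

import sys
from pyspark.sql import SparkSession

# Hypothetical PySpark equivalent of the PhoenixReadWrite demo job.
# The Phoenix JDBC URL is passed in by CDE via --variable phoenix_jdbc_url=...
if __name__ == "__main__":
    phoenix_jdbc_url = sys.argv[1]

    spark = SparkSession.builder.appName("phoenix-read-write").getOrCreate()

    # Rows matching the OUTPUT_TEST_TABLE schema (ID BIGINT, COL1 VARCHAR, COL2 INTEGER).
    rows = spark.createDataFrame(
        [(1, "1", 1), (2, "2", 2), (3, "3", 3)],
        ["ID", "COL1", "COL2"],
    )

    # Write via the phoenix-spark connector; older connector versions use "zkUrl"
    # instead of "jdbcUrl" and expect SaveMode.Overwrite (which performs an upsert).
    rows.write.format("phoenix") \
        .option("table", "OUTPUT_TEST_TABLE") \
        .option("jdbcUrl", phoenix_jdbc_url) \
        .mode("overwrite") \
        .save()

    # Read the table back and show the contents.
    spark.read.format("phoenix") \
        .option("table", "OUTPUT_TEST_TABLE") \
        .option("jdbcUrl", phoenix_jdbc_url) \
        .load() \
        .show()

    spark.stop()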
09-28-2020
05:54 AM
This article is inspired by another article, Spark Structured Streaming example with CDE, which describes how to use CDE to read from and write to Kafka using Spark Streaming. The difference here is that this article covers connecting to the Kafka of a CDP Streams Messaging data hub cluster.
The following are the differences with a CDP Streams Messaging data hub cluster:
Kafka in the CDP datahub is SASL_SSL enabled.
CDE needs the CA certificate of Kafka to connect.
Prerequisites
Knowledge of CDP, Nifi, Kafka, and CDE
CDE cluster provisioned from CDP Environment
CDP Flow Management Datahub Cluster (NiFi)
CDP Streams Messaging Datahub Cluster (Kafka, Schema Registry)
CDE CLI
The assumption is that the CDP environment, Data Lake, Flow Management, and Streams Messaging datahub clusters are already created.
Flow Management Datahub Cluster
Once the above clusters are provisioned, set up NiFi to read tweets from Twitter and publish them to Kafka.
Use this NiFi template to upload to NiFi and set up the Read Twitter > Push Kafka pipeline:
Edit the KafkaProducer processor and set the Kafka broker to the broker hostname of the Streams Messaging cluster.
Edit the GetTwitter processor group with the Twitter developer credentials [access key, secret].
Streams Messaging Datahub Cluster
Nothing to be done 🙂
CDE Cluster
The overall goal of the Spark job is to read from the Kafka topic named "twitter" (populated by NiFi), extract the text from the tweet messages, and push it to another Kafka topic named "tweet".
Note: The Spark streaming job needs to talk to Kafka over SSL, which means the CA certificate of Kafka needs to be configured in the Spark job. The JRE of the default Spark docker image already includes the CA certificate of Kafka. The following is a link to the code that relies on this:
https://github.com/bgsanthosh/spark-streaming/blob/master/spark-streaming/src/main/scala/org/cloudera/qe/e2es/TwitterStructureStreaming.scala#L23
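For illustration only (the actual job in the repo is the Scala class linked above), a PySpark structured streaming sketch of the same flow is shown below: read the "twitter" topic over SASL_SSL, pull the tweet text out of the JSON payload, and write it to the "tweet" topic. The broker address, SASL mechanism, JSON field name, truststore path, and checkpoint location are assumptions.

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# Hypothetical PySpark sketch of the twitter -> tweet streaming flow.
spark = SparkSession.builder.appName("twitter-structured-streaming").getOrCreate()

kafka_options = {
    "kafka.bootstrap.servers": "kafka-broker.example.cloudera.site:9093",  # assumed broker
    "kafka.security.protocol": "SASL_SSL",
    "kafka.sasl.mechanism": "PLAIN",
    "kafka.sasl.jaas.config": (
        "org.apache.kafka.common.security.plain.PlainLoginModule required "
        'username="<workload-user>" password="<workload-password>";'
    ),
    # Only needed when the default JRE truststore does not already trust the cluster CA.
    "kafka.ssl.truststore.location": "/app/mount/truststore.jks",
}

# Read raw tweets published by NiFi to the "twitter" topic.
tweets = (
    spark.readStream.format("kafka")
    .options(**kafka_options)
    .option("subscribe", "twitter")
    .load()
)

# Extract the tweet text from the JSON value (field name assumed to be "text").
texts = tweets.select(
    F.get_json_object(F.col("value").cast("string"), "$.text").alias("value")
)

# Write the extracted text to the "tweet" topic.
query = (
    texts.writeStream.format("kafka")
    .options(**kafka_options)
    .option("topic", "tweet")
    .option("checkpointLocation", "/tmp/tweet-checkpoint")
    .start()
)

query.awaitTermination()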
To configure the CA certificate of Kafka in the Spark job, do the following:
Note: For a custom trust-store, the CA certificate of Kafka can be downloaded from the Control Plane UI ("Get FreeIPA Certificate"), and the path to it inside the docker image can be provided in the job.
Build the spark streaming application from the following repo. https://github.com/bgsanthosh/spark-streaming/tree/master/spark-streaming
Create a virtual cluster. For more details, see Creating virtual clusters.
Configure CDE CLI to point to the virtual cluster created in the above step. For more details, see Configuring the CLI client.
Create resources using the following command: cde resource create --name spark-streaming
Upload the Spark job from Step#1 using the following command: cde resource upload --name spark-streaming --local-path spark-streaming.jar --resource-path spark-streaming.jar
Create a CDE job definition using the following command: cat twitter-streaming.json
{
"mounts": [
{
"resourceName": "spark-streaming"
}
],
"name": "twitter-streaming",
"spark": {
"className": "org.cloudera.qe.e2es.TwitterStructuredStreaming",
"args": [
],
"driverCores": 1,
"driverMemory": "1g",
"executorCores": 1,
"executorMemory": "1g",
"file": "spark-streaming.jar",
"pyFiles": [],
"files": [],
"numExecutors": 4
}
}
cde job import --file twitter-streaming.json
Run the Spark streaming job using the following command: cde job run --name twitter-streaming
There you go! Spark Streaming on CDE is up and running!
References
To create a CDP environment and provision the Flow Management and Streams Messaging datahubs and the CDE cluster, see the following documents:
CDP Quick Starts
Creating your first Flow Management cluster
Enabling Cloudera Data Engineering