Member since 09-15-2020
4 Posts, 0 Kudos Received, 0 Solutions

02-17-2021 09:33 AM
1 Kudo

In this article, we will learn how to run scheduled ETL workloads on CDE (Cloudera Data Engineering) and CDW (Cloudera Data Warehouse) using Airflow.

The CDE 1.4 release adds the ability to schedule Airflow jobs (Airflow ships with CDE). To learn more about Airflow, refer to the Airflow website.
This release adds two Airflow operators: CDEJobRunOperator and CDWOperator.
Behind the scenes, the CDEJobRunOperator calls the CDE jobs API to run Spark jobs. More details on the jobs API can be found here.

Behind the scenes, the CDWOperator uses a JDBC connection to interact with a CDW Hive or Impala virtual warehouse.

To demonstrate an ETL workload, we will read a CSV file from an AWS S3 bucket using Spark, perform a transformation (via CDEJobRunOperator), and load the processed data into a CDW warehouse using the Airflow CDWOperator.
Prerequisite
A CDP environment is created, and the CDW and CDE experiences are provisioned.
CDE Job
The CDE job is run from Airflow using the CDEJobRunOperator. The job reads CSV data (sample available in GitHub) and writes the transformed data back to S3.

Run a Maven build in this repo; the resulting artifact is airflow-cde-etl-1.0-SNAPSHOT.jar.
Upload the jar in the Create Job screen of the CDE virtual cluster UI, as shown in the following screenshot.

Note: {{ dataSetBucketPath }} and {{ writeBucketPath }} are variables that can be overridden from the Airflow DAG.
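To make the note above concrete, here is a minimal Python sketch of how `{{ name }}` placeholders in job arguments could be resolved from override values. It is only an illustration: the `render` helper and the bucket paths are hypothetical, and this is not how CDE itself implements variable substitution.

```python
import re

# Hypothetical helper for illustration only; NOT CDE's actual implementation.
# Matches placeholders such as "{{ dataSetBucketPath }}".
_PLACEHOLDER = re.compile(r"\{\{\s*(\w+)\s*\}\}")


def render(template: str, variables: dict) -> str:
    """Replace each {{ name }} in template with variables[name]."""
    def substitute(match):
        name = match.group(1)
        if name not in variables:
            raise KeyError(f"no value supplied for variable '{name}'")
        return str(variables[name])
    return _PLACEHOLDER.sub(substitute, template)


# Job arguments as they might appear in the job definition.
job_args = ["{{ dataSetBucketPath }}", "{{ writeBucketPath }}"]

# Override values a DAG might supply; these bucket paths are made up.
overrides = {
    "dataSetBucketPath": "s3a://example-bucket/input/Provider.csv",
    "writeBucketPath": "s3a://example-bucket/output/",
}

resolved = [render(arg, overrides) for arg in job_args]
print(resolved)
```

The sketch raises an error for a placeholder with no supplied value, which is a design choice made here so that a forgotten override fails loudly instead of passing a literal `{{ ... }}` string to the job.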
Configuring the Hive|Impala VW in Airflow
Launch the Airflow UI of the CDE virtual cluster; the link is on the virtual cluster details screen.
Add the Hive|Impala connection details as shown in the following screenshot (available under Admin > Connections).

The hostname can be obtained from the Copy JDBC URL option of the Hive/Impala virtual warehouse, as follows:

Note: The username and password are the CDP workload username and password.
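If the copied JDBC URL is long, the hostname for the Airflow connection can be pulled out programmatically. The following is a minimal sketch using only the Python standard library; the `jdbc_host` helper and the example URL are hypothetical, and the exact shape of the URL copied from your virtual warehouse may differ.

```python
from urllib.parse import urlsplit


def jdbc_host(jdbc_url: str) -> str:
    """Return the hostname part of a JDBC URL like jdbc:hive2://host/db;opts."""
    if not jdbc_url.startswith("jdbc:"):
        raise ValueError("not a JDBC URL")
    # Drop the leading "jdbc:" so the rest parses like an ordinary URL.
    parts = urlsplit(jdbc_url[len("jdbc:"):])
    if not parts.hostname:
        raise ValueError("no hostname found in JDBC URL")
    return parts.hostname


# Made-up example URL; substitute the value copied from the CDW UI.
example_url = (
    "jdbc:hive2://hs2-example-vw.env-abc123.dw.example.cloudera.site/default;"
    "transportMode=http;httpPath=cliservice;ssl=true"
)
print(jdbc_host(example_url))
```

The returned value is what goes into the Host field of the Airflow connection; the remaining URL parameters are not part of the hostname.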
Submit Airflow DAG
Before submitting the job, upload Provider.csv to the CDP S3 bucket. Then, submit the Airflow DAG from the CDE job creation UI as follows:


Note: Replace test_data_bucket in the DAG code with the applicable bucket name.
There you go, we have scheduled an Airflow job to read from the S3 bucket and load into a CDW Hive virtual warehouse.
    
						
					
01-08-2021 04:28 AM
1 Kudo

COD - CDE using Phoenix

In this article, we will walk through the steps required to read from and write to COD (Cloudera Operational Database) using Phoenix JDBC from Spark on CDE (Cloudera Data Engineering).

Assumption
COD is already provisioned and the database cod-db is created. Refer to this link for the same.
CDE is already provisioned and a virtual cluster is already created. Refer to this link for the same.

COD
Configuration
For Spark in CDE to be able to talk to COD, it requires the hbase-site.xml config of the COD cluster. Do the following steps to retrieve it:

Go to the COD control plane UI and click on the cod-db database.
Under the "Connect" tab of the COD database, look for the HBase Client Configuration URL field. Following is the screenshot for the same.
The configuration can be downloaded using the following curl command:

    curl -o "hbase-config.zip" -u "santhosh" "https://XXXXXX.cldr.work/clouderamanager/api/v41/clusters/cod-hvtur2ovawfr/services/hbase/clientConfig"

Make sure to provide the "workload" password for the above curl call.
Explore the downloaded zip file to obtain the hbase-site.xml file.
Note down the Phoenix JDBC URL from the Phoenix (Thick) tab.

Create table in COD
We need to create a table in COD using Phoenix for this demo. To do so, log in to the gateway node of the COD cluster and run the phoenix-sqlline command to create the table as follows:

    CREATE TABLE OUTPUT_TEST_TABLE (id BIGINT NOT NULL PRIMARY KEY, col1 VARCHAR, col2 INTEGER);

The node details can be obtained from the datahub hardware tab of the control plane UI. Following is the screenshot for the same.

Build phoenix-spark project
Build the following Spark Phoenix demo Maven project:
https://github.com/bgsanthosh/spark-hbase/tree/master/spark-hbase
Make sure to bump the phoenix-spark and hbase-client versions to match the Cloudera runtime version.
Run the following command from the project home:

    mvn clean compile package

CDE
Configure and Run Job
Configure the CDE CLI to point to the virtual cluster created in the above step. For more details, see Configuring the CLI client.
Create a resource using the following command:

    cde resource create --name odx-spark-resource

Upload hbase-site.xml:

    cde resource upload --name odx-spark-resource --local-path hbase-site.xml --resource-path conf/hbase-site.xml

Upload the demo app jar that was built earlier:

    cde resource upload --name odx-spark-resource --local-path ./spark-hbase/target/spark-hbase-1.0-SNAPSHOT.jar --resource-path spark-hbase-1.0-SNAPSHOT.jar

Create the CDE job using the following JSON and import command:

    {
      "mounts": [
        {
          "resourceName": "odx-spark-resource"
        }
      ],
      "name": "odx-read-write",
      "spark": {
        "className": "org.cloudera.e2e.spark.PhoenixReadWrite",
        "conf": {
          "spark.executor.extraClassPath": "/app/mount/conf",
          "spark.driver.extraClassPath": "/app/mount/conf"
        },
        "args": ["{{ phoenix_jdbc_url }}"],
        "driverCores": 1,
        "driverMemory": "1g",
        "executorCores": 1,
        "executorMemory": "1g",
        "file": "spark-hbase-1.0-SNAPSHOT.jar",
        "pyFiles": [],
        "files": ["conf/hbase-site.xml"],
        "numExecutors": 4
      }
    }

    cde job import --file odx-readwrite-var.json

Next, run the job using the following command:

    cde job run --name odx-read-write --variable phoenix_jdbc_url=<phoenix_jdbc_url>

Check in phoenix-sqlline; the following data should be present:

    0: jdbc:phoenix:> select * from OUTPUT_TEST_TABLE;
    +-----+-------+-------+
    | ID  | COL1  | COL2  |
    +-----+-------+-------+
    | 1   | 1     | 1     |
    | 2   | 2     | 2     |
    | 3   | 3     | 3     |
    +-----+-------+-------+
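The earlier step of exploring the downloaded zip file for hbase-site.xml can also be scripted. The following is a minimal Python sketch, assuming the archive contains a file named hbase-site.xml somewhere inside it (possibly in a subdirectory); the `extract_hbase_site` helper is hypothetical, and the demo builds a stand-in archive because the real layout of hbase-config.zip is not shown in this article.

```python
import tempfile
import zipfile
from pathlib import Path, PurePosixPath


def extract_hbase_site(zip_path, dest_dir="."):
    """Copy the first hbase-site.xml found in the archive into dest_dir."""
    with zipfile.ZipFile(zip_path) as archive:
        for member in archive.namelist():
            # Zip member names always use "/" separators.
            if PurePosixPath(member).name == "hbase-site.xml":
                target = Path(dest_dir) / "hbase-site.xml"
                target.write_bytes(archive.read(member))
                return target
    raise FileNotFoundError(f"hbase-site.xml not found in {zip_path}")


# Demo with a stand-in archive; point zip_path at the real hbase-config.zip.
with tempfile.TemporaryDirectory() as tmp:
    demo_zip = Path(tmp) / "hbase-config.zip"
    with zipfile.ZipFile(demo_zip, "w") as zf:
        # Assumed layout: the file sits inside a subdirectory of the archive.
        zf.writestr("hbase-conf/hbase-site.xml", "<configuration/>")
    extracted = extract_hbase_site(demo_zip, tmp)
    print(extracted.name)
```

The extracted file is the one uploaded with `cde resource upload ... --resource-path conf/hbase-site.xml` in the steps above.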
						
					
09-28-2020 05:54 AM

							 
This article is inspired by another article, Spark Structured Streaming example with CDE, which describes how to use CDE to read from and write to Kafka using Spark Streaming. The difference is that this article describes how to connect to Kafka on a CDP Streams Messaging Datahub cluster.

A CDP Streams Messaging Datahub cluster differs in the following ways:
 
 Kafka of CDP datahub is SASL_SSL enabled. 
 CDE needs the CA certificate of Kafka to connect. 
 
 Prerequisites 
 
Knowledge of CDP, NiFi, Kafka, and CDE
CDE cluster provisioned from CDP Environment
CDP Flow Management Datahub Cluster (NiFi)
CDP Streams Messaging Datahub Cluster (Kafka, Schema Registry)
CDE CLI

The assumption is that the CDP environment, Data Lake, Flow Management, and Streams Messaging Datahub clusters are already created.
Flow Management Datahub Cluster

Once the above clusters are provisioned, set up NiFi to read tweets from Twitter and publish them to Kafka.
Upload this NiFi template to NiFi and set up the Read Twitter > Push Kafka pipeline:
Edit the KafkaProducer processor and set the Kafka broker to the Kafka hostname of the Streams Messaging cluster.
Edit the GetTwitter processor group with the Twitter developer credentials [access key, secret].

Streams Messaging Datahub Cluster
Nothing to be done 🙂
CDE Cluster
The overall goal of the Spark job is to read from the Kafka topic named "twitter" that NiFi publishes to, extract the text from the tweet messages, and push it to another Kafka topic named "tweet".

Note: The Spark streaming job needs to talk to Kafka over SSL, which means the CA certificate of Kafka needs to be configured in the Spark job. The JRE of the default Spark Docker image already has the CA certificate of Kafka. The following link points to the relevant code:
https://github.com/bgsanthosh/spark-streaming/blob/master/spark-streaming/src/main/scala/org/cloudera/qe/e2es/TwitterStructureStreaming.scala#L23

To configure a CA certificate of Kafka in the Spark job, do the following:

Note: For a custom trust-store, the CA certificate of Kafka can be downloaded from the Control Plane UI ("Get FreeIPA Certificate"), and the Docker path to it can be provided in the job.
    
   
 
Build the Spark streaming application from the following repo:
https://github.com/bgsanthosh/spark-streaming/tree/master/spark-streaming
Create a virtual cluster. For more details, see Creating virtual clusters.
Configure the CDE CLI to point to the virtual cluster created in the above step. For more details, see Configuring the CLI client.
Create a resource using the following command:

    cde resource create --name spark-streaming

Upload the Spark job jar built in the first step using the following command:

    cde resource upload --name spark-streaming --local-path spark-streaming.jar --resource-path spark-streaming.jar

Create a CDE job definition file with the following content and import it:

    cat twitter-streaming.json
    {
      "mounts": [
        {
          "resourceName": "spark-streaming"
        }
      ],
      "name": "twitter-streaming",
      "spark": {
        "className": "org.cloudera.qe.e2es.TwitterStructuredStreaming",
        "args": [],
        "driverCores": 1,
        "driverMemory": "1g",
        "executorCores": 1,
        "executorMemory": "1g",
        "file": "spark-streaming.jar",
        "pyFiles": [],
        "files": [],
        "numExecutors": 4
      }
    }

    cde job import --file twitter-streaming.json

Run the Spark Streaming job using the following command:

    cde job run --name twitter-streaming
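For readers wondering what the SASL_SSL connection settings mentioned in the notes above look like, here is a minimal Python sketch that assembles the options a Spark Kafka source would be given. The option keys are the standard `kafka.`-prefixed Kafka client settings accepted by Spark's Kafka source; everything else is an assumption for illustration: the `kafka_sasl_ssl_options` helper, the PLAIN SASL mechanism, the broker address, and the truststore path are not taken from the linked application.

```python
def kafka_sasl_ssl_options(bootstrap_servers, topic, username, password,
                           truststore_path, truststore_password=None):
    """Build Spark Kafka source options for a SASL_SSL-enabled cluster.

    Assumption: the cluster accepts the PLAIN mechanism with workload
    credentials; adjust the mechanism and JAAS module if yours differs.
    """
    jaas = (
        "org.apache.kafka.common.security.plain.PlainLoginModule required "
        f'username="{username}" password="{password}";'
    )
    options = {
        "kafka.bootstrap.servers": bootstrap_servers,
        "subscribe": topic,
        "kafka.security.protocol": "SASL_SSL",
        "kafka.sasl.mechanism": "PLAIN",  # assumed mechanism
        "kafka.sasl.jaas.config": jaas,
        # Truststore holding the Kafka CA certificate (path is hypothetical).
        "kafka.ssl.truststore.location": truststore_path,
    }
    if truststore_password is not None:
        options["kafka.ssl.truststore.password"] = truststore_password
    return options


opts = kafka_sasl_ssl_options(
    bootstrap_servers="kafka-broker.example.site:9093",  # made-up address
    topic="twitter",
    username="workload-user",
    password="workload-password",
    truststore_path="/path/to/truststore.jks",  # hypothetical path
)
for key in sorted(opts):
    if key != "kafka.sasl.jaas.config":  # avoid printing credentials
        print(key, "=", opts[key])
```

In a PySpark job, a dictionary like this could be passed with `spark.readStream.format("kafka").options(**opts).load()`; the Scala application linked above would set the same keys through its own `option(...)` calls.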
 
There you go! Spark Streaming on CDE is up and running!
References
To create a CDP environment and provision the Flow Management, Streams Messaging Datahub, and CDE clusters, see the following documents:
   
 
 Getstart Quick Starts 
 Creating your first Flow Management cluster 
 Enabling Cloudera Data Engineering 
 
 
						
					
 
        

