Community Articles

Find and share helpful community-sourced technical articles.
Announcements
Celebrating as our community reaches 100,000 members! Thank you!
avatar
Contributor

What is Cloudera Data Engineering

CDP Data Engineering (CDE) is the only cloud-native service purpose-built for enterprise data engineering teams. Building on Apache Spark, Data Engineering is an all-inclusive data engineering toolset that enables orchestration automation with Apache Airflow, advanced pipeline monitoring, visual troubleshooting, and comprehensive management tools to streamline ETL processes across enterprise analytics teams.

Typically CDE users leverage the CLI and API to accomplish tasks such as creating, running and orchestrating Spark and Airflow Jobs, uploading files such as Python wheel files, Jars, etc to the Virtual Cluster, and creating reusable Python environments for Spark and Airflow Jobs. 

CDEPY is a package that allows you to do all the above with the convenience of Python. With it you can remotely connect to a Virtual Cluster from your local machine or 3rd party tool as long as it supports Python. It is available on PyPi at this URL and can be easily installed with a "pip install cdepy" command. At the time of this writing the latest is version 0.1.5. 

In the rest of this article we will walk through some basic steps for getting started with the package.

Package Imports

from cdepy import cdeconnection
from cdepy import cdejob
from cdepy import cdemanager
from cdepy import cderesource

CDECONNECTION Module

The cdeconnection module allows you to establish a connection to a CDE Virtual Cluster. To instantiate a connection object you will need a JOBS_API_URL (available in the CDE Virtual Cluster Service Details page); and your CDP Workload User and Password.

JOBS_API_URL = "https://<YOUR-CLUSTER>.cloudera.site/dex/api/v1"
WORKLOAD_USER = "<Your-CDP-Workload-User>"
WORKLOAD_PASSWORD = "<Your-CDP-Workload-Password>"
myCdeConnection = cdeconnection.CdeConnection(JOBS_API_URL, WORKLOAD_USER, WORKLOAD_PASSWORD)

Next you will need to obtain a Token to connect to the cluster. This can be achieved with the following method. Once this is run, the Token is applied to the Connection object.

myCdeConnection.setToken()

CDERESOURCE Module

The cderesource module allows you to create CDE Resources. If you are new to CDE, a CDE Resource can be of type "Files", "Python", or "Custom Docker Runtime".  Mirroring this, the cderesource module allows you to create two types of objects: CdeFilesResource and CdePythonResource (support for Custom Docker Resources will be available in a future release).

CDE_RESOURCE_NAME = "myFilesCdeResource"
myCdeFilesResource = cderesource.CdeFilesResource(CDE_RESOURCE_NAME)
myCdeFilesResourceDefinition = myCdeFilesResource.createResourceDefinition()

In both cases the object is a just definition for the CDE Resource and the actual Resource will be created with the cdemanager module. Before we get there though, we have to create a definition for a CDE Job. 

CDEJOB Module

The cdejob module allows you to create Job Definitions of type Spark and Airflow. The definition includes important parameters such as Spark Resources Configurations (e.g. driver/executor memory and driver/executor cores) and Airflow DAG configurations (e.g. DAG schedule interval). 

Below we create a definition for a CDE Job of type Spark. The job in the CDE Jobs UI will be named myCdeSparkJob using the local pysparksql.py script.

CDE_JOB_NAME = "myCdeSparkJob"
APPLICATION_FILE_NAME = "pysparksql.py"

myCdeSparkJob = cdejob.CdeSparkJob(myCdeConnection)
myCdeSparkJobDefinition = myCdeSparkJob.createJobDefinition(CDE_JOB_NAME, CDE_RESOURCE_NAME, APPLICATION_FILE_NAME)

Just like in the case of the Resource Definition object, the job has not yet been created in CDE. Next, we will use the cdemanager module to create the Resource and Job in CDE.

CDEMANAGER Module

First, the CdeClusterManager object is instantiated with the CdeConnection object:

myCdeClusterManager = cdemanager.CdeClusterManager(myCdeConnection)

Next, the Resource is created with the CdeFilesResourceDefinition object we created above.

myCdeClusterManager.createResource(myCdeFilesResourceDefinition)

Now that the Resource has actually been created in CDE we can upload our PySpark script to it:

LOCAL_FILE_PATH = "examples"
LOCAL_FILE_NAME = "pysparksql.py"
myCdeClusterManager.uploadFile(CDE_RESOURCE_NAME, LOCAL_FILE_PATH, LOCAL_FILE_NAME) 

Finally we are now ready to create the CDE Spark Job. The Spark Job will recognize the CDE Files Resource we just created because we set the name of the Resource in the CdeSparkJobDefinition object earlier.

myCdeClusterManager.createJob(myCdeSparkJobDefinition)

The CdeClusterManager object allows you to run a Job and monitor its results.

myCdeClusterManager.runJob(CDE_JOB_NAME)
myCdeClusterManager.listJobRuns()

SUMMARY

In the above workflow we used CDEPY and Python to create a CDE Files Resource and Spark Job, run it, and finally monitor its results. 

Generally, if you use Spark and/or Airflow in Cloudera Data Engineering you can leverage its API with Python to create your own framework. This allows you to integrate your existing Spark and Airflow applications with CDE and/or build Cluster and Job observability applications based on your business requirements. 

617 Views