CDP Data Engineering (CDE) is the only cloud-native service purpose-built for enterprise data engineering teams. Built on Apache Spark, CDE is an all-inclusive data engineering toolset that enables orchestration automation with Apache Airflow, advanced pipeline monitoring, visual troubleshooting, and comprehensive management tools to streamline ETL processes across enterprise analytics teams.
To manage job dependencies, CDE supports creating custom Python environments dedicated to Airflow using the airflow-python-env resource type. With this option, you can install custom libraries for running your Directed Acyclic Graphs (DAGs). The supported version is Python 3.8.
A resource is a named collection of files or other resources referenced by a job. The airflow-python-env resource type allows you to specify a requirements.txt file that defines an environment, which you can then activate globally for Airflow deployments in a Virtual Cluster.
You can install and use custom Python packages for Airflow with CDE. Typically this feature is used to install third-party Airflow providers in CDE. However, it can also be used to install any Python package and use it within the DAG logic.
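For example, to make the Amazon Provider for Airflow used later in this article available to your DAGs, the requirements.txt only needs to list the provider package, optionally with a version pin:

apache-airflow-providers-amazon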
CDEPY is a package that allows you to do all of the above with the convenience of Python. With it, you can remotely connect to a Virtual Cluster from your local machine or a third-party tool, as long as it supports Python. It is available on PyPI and can be installed with the command pip install cdepy.
In this example, you will use CDEPY to create a CDE Airflow Python environment with the Amazon Provider for Airflow. Then, you will deploy an Airflow DAG that creates an S3 bucket, reads a text file from a CDE Files Resource, writes it to the S3 bucket, launches a CDE Spark job, and finally deletes the S3 bucket.
Requirements
pip install cdepy
End-to-End Example
from cdepy import cdeconnection
from cdepy import cdeairflowpython
from cdepy import cderesource
from cdepy import cdejob
from cdepy import cdemanager
import os
import json
Connect via CdeConnection Object
# Enter your CDE Virtual Cluster's Jobs API URL and your CDP Workload User credentials
JOBS_API_URL = "<myJobsAPIurl>"
WORKLOAD_USER = "<myusername>"
WORKLOAD_PASSWORD = "<mypwd>"
myCdeConnection = cdeconnection.CdeConnection(JOBS_API_URL, WORKLOAD_USER, WORKLOAD_PASSWORD)
# Obtain an API token for the Virtual Cluster
myCdeConnection.setToken()
# Instantiate the manager for CDE Airflow Python environments
myAirflowPythonEnvManager = cdeairflowpython.CdeAirflowPythonEnv(myCdeConnection)
# Create a Maintenance Session in order to perform any Python environment changes
myAirflowPythonEnvManager.createMaintenanceSession()
# Register a pip repository for the environment build
myAirflowPythonEnvManager.createPipRepository()
# Check on the environment status
myAirflowPythonEnvManager.checkAirflowPythonEnvStatus()
# Build the environment from a local requirements.txt file; the build may take a few minutes
pathToRequirementsTxt = "/resources/requirements.txt"
myAirflowPythonEnvManager.buildAirflowPythonEnv(pathToRequirementsTxt)
myAirflowPythonEnvManager.checkAirflowPythonEnvStatus()
# Explore the environment's packages and the Maintenance Session logs
myAirflowPythonEnvManager.getAirflowPythonEnvironmentDetails()
myAirflowPythonEnvManager.viewMaintenanceSessionLogs()
# Activate the environment for the Virtual Cluster and confirm its status
myAirflowPythonEnvManager.activateAirflowPythonEnv()
myAirflowPythonEnvManager.checkAirflowPythonEnvStatus()
# Create a CDE Files Resource to host the Airflow DAG file
CDE_RESOURCE_NAME = "my_pipeline_resource"
myCdeFilesResource = cderesource.CdeFilesResource(CDE_RESOURCE_NAME)
myCdeFilesResourceDefinition = myCdeFilesResource.createResourceDefinition()
LOCAL_FILE_PATH = "resources"
LOCAL_FILE_NAME = "s3BucketDag.py"
myCdeClusterManager = cdemanager.CdeClusterManager(myCdeConnection)
myCdeClusterManager.createResource(myCdeFilesResourceDefinition)
myCdeClusterManager.uploadFileToResource(CDE_RESOURCE_NAME, LOCAL_FILE_PATH, LOCAL_FILE_NAME)
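The s3BucketDag.py file itself is not listed in this article. The following is only a minimal sketch of what it could look like, assuming the Amazon Provider has been installed through the custom Python environment, that an Airflow connection to AWS named aws_default exists in the Virtual Cluster, and that the bucket name and the path to my_file.txt are placeholders you would adapt to your environment. The CDEJobRunOperator import path shown is the one used by CDE's embedded Airflow.

from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.amazon.aws.operators.s3 import S3CreateBucketOperator, S3DeleteBucketOperator
from cloudera.cdp.airflow.operators.cde_operator import CDEJobRunOperator

S3_BUCKET = "my-example-bucket"  # placeholder bucket name
AWS_CONN_ID = "aws_default"  # assumes this Airflow connection is defined in the Virtual Cluster
TXT_FILE_PATH = "/app/mount/my_file.txt"  # placeholder: adjust to where the Files Resource is made available

def upload_txt_file(**context):
    # Read the text file from the CDE Files Resource and write it to the S3 bucket
    S3Hook(aws_conn_id=AWS_CONN_ID).load_file(
        filename=TXT_FILE_PATH,
        key="my_file.txt",
        bucket_name=S3_BUCKET,
    )

with DAG(
    dag_id="s3_bucket_dag",
    start_date=datetime(2024, 1, 1),
    schedule=None,
    catchup=False,
) as dag:
    create_bucket = S3CreateBucketOperator(
        task_id="create_bucket",
        bucket_name=S3_BUCKET,
        aws_conn_id=AWS_CONN_ID,
    )
    upload_file = PythonOperator(task_id="upload_file", python_callable=upload_txt_file)
    run_spark_job = CDEJobRunOperator(task_id="run_spark_job", job_name="simple-pyspark")  # the CDE Spark job created below
    delete_bucket = S3DeleteBucketOperator(
        task_id="delete_bucket",
        bucket_name=S3_BUCKET,
        force_delete=True,
        aws_conn_id=AWS_CONN_ID,
    )
    create_bucket >> upload_file >> run_spark_job >> delete_bucket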
# Create a second Files Resource for the text file that the DAG writes to S3
CDE_RESOURCE_NAME = "my_file_resource"
myCdeFilesResource = cderesource.CdeFilesResource(CDE_RESOURCE_NAME)
myCdeFilesResourceDefinition = myCdeFilesResource.createResourceDefinition()
LOCAL_FILE_PATH = "resources"
LOCAL_FILE_NAME = "my_file.txt"
myCdeClusterManager.createResource(myCdeFilesResourceDefinition)
myCdeClusterManager.uploadFileToResource(CDE_RESOURCE_NAME, LOCAL_FILE_PATH, LOCAL_FILE_NAME)
# Create a third Files Resource for the PySpark application file
CDE_RESOURCE_NAME = "my_script_resource"
myCdeFilesResource = cderesource.CdeFilesResource(CDE_RESOURCE_NAME)
myCdeFilesResourceDefinition = myCdeFilesResource.createResourceDefinition()
LOCAL_FILE_PATH = "resources"
LOCAL_FILE_NAME = "pysparksql.py"
myCdeClusterManager.createResource(myCdeFilesResourceDefinition)
myCdeClusterManager.uploadFileToResource(CDE_RESOURCE_NAME, LOCAL_FILE_PATH, LOCAL_FILE_NAME)
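The pysparksql.py application is also not listed in this article. A minimal placeholder that the simple-pyspark job defined next could run might look like the following; the table and column names are purely illustrative.

from pyspark.sql import SparkSession

# Start a Spark session inside the CDE job
spark = SparkSession.builder.appName("simple-pyspark").getOrCreate()

# Create a small in-memory DataFrame and query it with Spark SQL
df = spark.createDataFrame([(1, "hello"), (2, "world")], ["id", "message"])
df.createOrReplaceTempView("messages")
spark.sql("SELECT id, message FROM messages ORDER BY id").show()

spark.stop()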
# Define the CDE Spark job and create it in the Virtual Cluster
CDE_JOB_NAME = "simple-pyspark"
myCdeSparkJob = cdejob.CdeSparkJob(myCdeConnection)
myCdeSparkJobDefinition = myCdeSparkJob.createJobDefinition(CDE_JOB_NAME, CDE_RESOURCE_NAME, APPLICATION_FILE_NAME=LOCAL_FILE_NAME, executorMemory="2g", executorCores=2)
myCdeClusterManager.createJob(myCdeSparkJobDefinition)
# Define the CDE Airflow job from the DAG file in the pipeline resource, then create and run it
CDE_JOB_NAME = "PythonEnvDag"
DAG_FILE = "s3BucketDag.py"
CDE_RESOURCE_NAME = "my_pipeline_resource"
myCdeAirflowJob = cdejob.CdeAirflowJob(myCdeConnection)
myCdeAirflowJobDefinition = myCdeAirflowJob.createJobDefinition(CDE_JOB_NAME, DAG_FILE, CDE_RESOURCE_NAME)
myCdeClusterManager.createJob(myCdeAirflowJobDefinition)
myCdeClusterManager.runJob(CDE_JOB_NAME)
# When you are done, open a new Maintenance Session to remove the Python environment
myAirflowPythonEnvManager.createMaintenanceSession()
myAirflowPythonEnvManager.deleteAirflowPythonEnv()
myAirflowPythonEnvManager.deleteMaintenanceSession()