
CDP Data Engineering (CDE) is the only cloud-native service purpose-built for enterprise data engineering teams. Building on Apache Spark, Data Engineering is an all-inclusive data engineering toolset that enables orchestration automation with Apache Airflow, advanced pipeline monitoring, visual troubleshooting, and comprehensive management tools to streamline ETL processes across enterprise analytics teams.

To manage job dependencies, CDE supports creating custom Python environments dedicated to Airflow using the airflow-python-env resource type. With this option, you can install custom libraries for running your Directed Acyclic Graphs (DAGs). The supported version is Python 3.8.

A resource is a named collection of files or other resources referenced by a job. The airflow-python-env resource type lets you specify a requirements.txt file that defines an environment, which you can then activate globally for Airflow deployments in a virtual cluster.

You can install and use custom Python packages for Airflow with CDE. Typically this feature is used to install third-party Airflow providers in CDE. However, it can also be used to install any Python package and use it within the DAG logic.
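As a rough illustration of the requirements.txt file described above, the sketch below writes a minimal file that lists the Amazon provider under its PyPI name, apache-airflow-providers-amazon. The helper name write_requirements is hypothetical, and leaving the package unpinned is an assumption made for brevity; pin versions that match the Airflow release of your Virtual Cluster.

```python
from pathlib import Path
from typing import Iterable


def write_requirements(path: str, packages: Iterable[str]) -> Path:
    """Write one requirement per line, skipping blanks and duplicates.

    Hypothetical helper for illustration; it is not part of cdepy.
    """
    unique = []
    for package in packages:
        package = package.strip()
        if package and package not in unique:
            unique.append(package)
    target = Path(path)
    # Create the parent folder (for example "resources") if it is missing.
    target.parent.mkdir(parents=True, exist_ok=True)
    target.write_text("\n".join(unique) + "\n", encoding="utf-8")
    return target


# Example call (writes into the local "resources" folder):
# write_requirements("resources/requirements.txt", ["apache-airflow-providers-amazon"])
```

Generating the file from a list keeps the environment definition next to the rest of the deployment script, which can help when the same script is rerun against several Virtual Clusters.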

CDEPY is a package that allows you to do all the above with the convenience of Python. With it, you can remotely connect to a Virtual Cluster from your local machine or from any third-party tool that supports Python. It is available on PyPI and can be installed with the "pip install cdepy" command.
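The example later in this article passes a Jobs API URL, a workload user, and a workload password to cdepy as plain variables. One possible way to keep those values out of the script is to read them from environment variables, as sketched below. Using JOBS_API_URL, WORKLOAD_USER, and WORKLOAD_PASSWORD as environment variable names is an assumption borrowed from the variable names in this article, and load_cde_credentials is a hypothetical helper, not a cdepy function.

```python
import os
from typing import Mapping, NamedTuple, Optional


class CdeCredentials(NamedTuple):
    jobs_api_url: str
    workload_user: str
    workload_password: str


def load_cde_credentials(env: Optional[Mapping[str, str]] = None) -> CdeCredentials:
    """Read the three connection settings from a mapping (os.environ by default)."""
    env = os.environ if env is None else env
    # Assumed variable names; rename them to match your own conventions.
    names = ("JOBS_API_URL", "WORKLOAD_USER", "WORKLOAD_PASSWORD")
    missing = [name for name in names if not env.get(name)]
    if missing:
        raise KeyError("missing environment variables: " + ", ".join(missing))
    return CdeCredentials(*(env[name] for name in names))


# Example use, once the variables are exported in your shell:
# creds = load_cde_credentials()
# creds.jobs_api_url, creds.workload_user, creds.workload_password
```

Failing early with the list of missing names tends to be easier to debug than an authentication error returned by the cluster.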

In this example, you will use CDEPY to create a CDE Airflow Python environment with the Amazon Provider for Airflow. Then, you will deploy an Airflow DAG that creates an S3 bucket, reads a text file from a CDE Files Resource, writes it to the S3 bucket, launches a CDE Spark Job, and finally deletes the S3 bucket.

Requirements

  • A CDE Service with Version 1.21 or above.
  • The code, including the Airflow DAG, the PySpark script, and associated resources, is available in the accompanying git repository.
  • A local machine with Python and the latest version of the cdepy Python package installed.
    pip install cdepy
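To confirm that the package from the last requirement is present in the current interpreter, a small check along these lines may help. It relies only on the standard library's importlib.metadata; the helper name installed_version is hypothetical. Note that it reports the installed version and does not verify that it is the latest release.

```python
from importlib import metadata
from typing import Optional


def installed_version(package: str) -> Optional[str]:
    """Return the installed version of a distribution, or None if it is absent."""
    try:
        return metadata.version(package)
    except metadata.PackageNotFoundError:
        return None


version = installed_version("cdepy")
if version is None:
    print("cdepy is not installed; run: pip install cdepy")
else:
    print(f"cdepy {version} is installed")
```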

 

 

End-to-End Example

  1. Import cdepy modules and set the connection variables:
    from cdepy import cdeconnection
    from cdepy import cdeairflowpython
    from cdepy import cderesource
    from cdepy import cdemanager
    from cdepy import cdejob
    import os
    import json
    
    # Connection details used by the CdeConnection object
    JOBS_API_URL = "<myJobsAPIurl>"
    WORKLOAD_USER = "<myusername>"
    WORKLOAD_PASSWORD = "<mypwd>"
  2. Instantiate a CdeConnection object to be able to connect to the CDE Virtual Cluster.
    myCdeConnection = cdeconnection.CdeConnection(JOBS_API_URL, WORKLOAD_USER, WORKLOAD_PASSWORD)
    myCdeConnection.setToken()
  3. Instantiate a CdeAirflowPythonEnv object to manage Airflow Python Environments.
    myAirflowPythonEnvManager = cdeairflowpython.CdeAirflowPythonEnv(myCdeConnection)
  4. Create a Maintenance Session to perform any Airflow Python Environments-related actions.
    myAirflowPythonEnvManager.createMaintenanceSession()
  5. Register a pip repository in CDE.
    myAirflowPythonEnvManager.createPipRepository()
  6. Check the status of the Maintenance Session. The output should be ```{"status":"pip-repos-defined"}```.
    myAirflowPythonEnvManager.checkAirflowPythonEnvStatus()
  7. Load the requirements.txt file. The file must be customized before it is uploaded.
    pathToRequirementsTxt = "/resources/requirements.txt"
    myAirflowPythonEnvManager.buildAirflowPythonEnv(pathToRequirementsTxt)
  8. Check the build status. The response status should be ```{"status":"building"}```.
    myAirflowPythonEnvManager.checkAirflowPythonEnvStatus()
  9. Repeat the request after a couple of minutes. Once the response status becomes ```{"status":"built"}```, you are ready to move on.
  10. Validate the status of the Python environment.
    myAirflowPythonEnvManager.getAirflowPythonEnvironmentDetails()
  11. Explore the Maintenance Session logs.
    myAirflowPythonEnvManager.viewMaintenanceSessionLogs()
  12. Activate the Python environment.
    myAirflowPythonEnvManager.activateAirflowPythonEnv()
  13. Check the status of the Python environment activation.
    myAirflowPythonEnvManager.checkAirflowPythonEnvStatus()
  14. The response should be ```{"status":"activating"}```. The maintenance session will then end after a couple of minutes. This means that the environment has been activated.
  15. Once the Airflow Python environment has been activated, you can create a CDE Airflow Job. First, create a pipeline resource and upload the DAG to it:
    CDE_RESOURCE_NAME = "my_pipeline_resource"
    myCdeFilesResource = cderesource.CdeFilesResource(CDE_RESOURCE_NAME)
    myCdeFilesResourceDefinition = myCdeFilesResource.createResourceDefinition()
    
    LOCAL_FILE_PATH = "resources"
    LOCAL_FILE_NAME = "s3BucketDag.py"
    
    myCdeClusterManager = cdemanager.CdeClusterManager(myCdeConnection)
    
    myCdeClusterManager.createResource(myCdeFilesResourceDefinition)
    myCdeClusterManager.uploadFileToResource(CDE_RESOURCE_NAME, LOCAL_FILE_PATH, LOCAL_FILE_NAME)
  16. Create a files resource for the text file. The Airflow DAG will use the S3BucketOperator and the BashOperator to read the file from the CDE Files Resource and write it to an S3 bucket.
    CDE_RESOURCE_NAME = "my_file_resource"
    myCdeFilesResource = cderesource.CdeFilesResource(CDE_RESOURCE_NAME)
    myCdeFilesResourceDefinition = myCdeFilesResource.createResourceDefinition()
    
    LOCAL_FILE_PATH = "resources"
    LOCAL_FILE_NAME = "my_file.txt"
    
    myCdeClusterManager.createResource(myCdeFilesResourceDefinition)
    myCdeClusterManager.uploadFileToResource(CDE_RESOURCE_NAME, LOCAL_FILE_PATH, LOCAL_FILE_NAME)
  17. Create a CDE Spark Job along with its resources:
    CDE_RESOURCE_NAME = "my_script_resource"
    myCdeFilesResource = cderesource.CdeFilesResource(CDE_RESOURCE_NAME)
    myCdeFilesResourceDefinition = myCdeFilesResource.createResourceDefinition()
    
    LOCAL_FILE_PATH = "resources"
    LOCAL_FILE_NAME = "pysparksql.py"
    
    myCdeClusterManager.createResource(myCdeFilesResourceDefinition)
    myCdeClusterManager.uploadFileToResource(CDE_RESOURCE_NAME, LOCAL_FILE_PATH, LOCAL_FILE_NAME)
    
    CDE_JOB_NAME = "simple-pyspark"
    
    myCdeSparkJob = cdejob.CdeSparkJob(myCdeConnection)
    myCdeSparkJobDefinition = myCdeSparkJob.createJobDefinition(CDE_JOB_NAME, CDE_RESOURCE_NAME, APPLICATION_FILE_NAME=LOCAL_FILE_NAME, executorMemory="2g", executorCores=2)
    
    # Create the job only after its definition has been built
    myCdeClusterManager.createJob(myCdeSparkJobDefinition)
  18. Create & Run CDE Airflow Job:
    CDE_JOB_NAME = "PythonEnvDag"
    DAG_FILE = "s3BucketDag.py"
    CDE_RESOURCE_NAME = "my_pipeline_resource"
    
    myCdeAirflowJob = cdejob.CdeAirflowJob(myCdeConnection)
    myCdeAirflowJobDefinition = myCdeAirflowJob.createJobDefinition(CDE_JOB_NAME, DAG_FILE, CDE_RESOURCE_NAME)
    
    myCdeClusterManager.createJob(myCdeAirflowJobDefinition)
    myCdeClusterManager.runJob(CDE_JOB_NAME)
  19. Optional: Create a new Maintenance Session to delete the Python environment:
    myAirflowPythonEnvManager.createMaintenanceSession()
    myAirflowPythonEnvManager.deleteAirflowPythonEnv()
  20. Optional: End the Maintenance Session once you have deleted the Python environment:
    myAirflowPythonEnvManager.deleteMaintenanceSession()
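
The status checks in the steps above are repeated by hand until the expected status appears. A small polling helper along the following lines could automate that wait. This is only a sketch: wait_for_status and parse_status are hypothetical helpers, and the code assumes that the status call returns its response as a dict or a JSON string such as {"status":"built"}. If your cdepy version only prints the response, the callable passed in would need to be adapted.

```python
import json
import time
from typing import Any, Callable


def parse_status(response: Any) -> str:
    """Extract the "status" field from a dict or a JSON string."""
    if isinstance(response, (str, bytes)):
        response = json.loads(response)
    return response["status"]


def wait_for_status(check: Callable[[], Any], target: str,
                    interval: float = 30.0, timeout: float = 1800.0,
                    sleep: Callable[[float], None] = time.sleep) -> str:
    """Call check() until it reports the target status.

    Raises TimeoutError if the target is not reached after roughly
    `timeout` seconds of waiting between calls.
    """
    waited = 0.0
    while True:
        status = parse_status(check())
        if status == target:
            return status
        if waited >= timeout:
            raise TimeoutError(f"status is still {status!r} after {waited:.0f}s")
        sleep(interval)
        waited += interval


# Assumed usage, if the status method returns the response:
# wait_for_status(myAirflowPythonEnvManager.checkAirflowPythonEnvStatus, "built")
```

Passing the sleep function as a parameter keeps the helper easy to test without real waiting, and the timeout guards against a build that never leaves the "building" state.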

 

 
