CDP Data Engineering (CDE) is the only cloud-native service purpose-built for enterprise data engineering teams. Building on Apache Spark, Data Engineering is an all-inclusive data engineering toolset that enables orchestration automation with Apache Airflow, advanced pipeline monitoring, visual troubleshooting, and comprehensive management tools to streamline ETL processes across enterprise analytics teams.

Git repositories allow teams to collaborate, manage project artifacts, and promote applications from lower to higher environments. CDE supports integration with Git providers such as GitHub, GitLab, and Bitbucket to synchronize job runs with different versions of your code.

CDEPY is a Python package that lets you work with CDE jobs and repositories from Python. With it, you can connect remotely to a Virtual Cluster from your local machine or from any third-party tool that supports Python.

In this tutorial, you will use CDEPY to create a CDE Repository from a Git repository and create a CDE Spark Job using the PySpark script loaded in the repository.

Requirements

  • A CDE Service with Version 1.21 or above.
  • The code supporting this article is available in this git repository.
  • A local machine with Python and the latest version of the cdepy Python package installed.
    pip install cdepy
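
To confirm the installation before going further, a small check like the following can help. It is a minimal sketch that relies only on the standard library's importlib.metadata (Python 3.8 or later); the helper name installed_version is made up for illustration and is not part of cdepy.

```python
from importlib import metadata


def installed_version(package_name):
    """Return the installed version string of a package, or None if it is absent."""
    try:
        return metadata.version(package_name)
    except metadata.PackageNotFoundError:
        return None


# Print the cdepy version if it is installed, otherwise a hint to install it.
version = installed_version("cdepy")
if version is None:
    print("cdepy is not installed; run: pip install cdepy")
else:
    print("cdepy version:", version)
```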

End-to-End Example

  1. Import the cdepy modules and set the connection variables:
    from cdepy import cdeconnection
    from cdepy import cderepositories
    from cdepy import cdejob
    from cdepy import cdemanager
    import json
    
    # Connection details for the CDE Virtual Cluster; replace the placeholders
    JOBS_API_URL = "<myJobsAPIurl>"
    WORKLOAD_USER = "<myusername>"
    WORKLOAD_PASSWORD = "<mypwd>"
  2. Instantiate a CdeConnection object and call setToken() to authenticate with the CDE Virtual Cluster.
    myCdeConnection = cdeconnection.CdeConnection(JOBS_API_URL, WORKLOAD_USER, WORKLOAD_PASSWORD)
    myCdeConnection.setToken()
  3. Instantiate a CdeRepositoryManager object to interact with CDE repositories.
    myRepoManager = cderepositories.CdeRepositoryManager(myCdeConnection)
  4. Provide the Git repository information. For testing, use the Git repository provided here.
    repoName = "exampleGitRepository"
    repoPath = "https://github.com/pdefusco/cde_git_repo.git"
  5. Create CDE Repository from Git Repository.
    myRepoManager.createRepository(repoName, repoPath, repoBranch="main")
  6. Show available CDE repositories.
    json.loads(myRepoManager.listRepositories())
  7. Show CDE Repository Metadata.
    json.loads(myRepoManager.describeRepository(repoName))
  8. Download the file from the CDE Repository.
    filePath = "simple-pyspark-sql.py"
    myRepoManager.downloadFileFromRepo(repoName, filePath)
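  9. Optional: delete the CDE Repository. Skip this step and the next one if you plan to create the Spark Job in step 11, which requires the repository to exist; otherwise, recreate the repository by repeating step 5 before continuing.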
    myRepoManager.deleteRepository(repoName)
  10. Validate CDE Repository Deletion.
    json.loads(myRepoManager.listRepositories())
  11. Create a CDE Spark Job from a CDE Repository:
    CDE_JOB_NAME = "sparkJobFromRepo"
    
    #Set path of PySpark script inside the CDE Repository:
    applicationFilePath = "simple-pyspark-sql.py"
    
    myCdeSparkJob = cdejob.CdeSparkJob(myCdeConnection)
    myCdeSparkJobDefinition = myCdeSparkJob.createJobDefinition(
        CDE_JOB_NAME=CDE_JOB_NAME,
        CDE_RESOURCE_NAME=repoName,
        APPLICATION_FILE_NAME=applicationFilePath,
        executorMemory="2g",
        executorCores=2)
    
    myCdeClusterManager = cdemanager.CdeClusterManager(myCdeConnection)
    myCdeClusterManager.createJob(myCdeSparkJobDefinition)
    myCdeClusterManager.runJob(CDE_JOB_NAME)
  12. Optional: update code in "simple-pyspark-sql.py" in your git repository.
  13. Pull from the Git repository into the CDE Repository to synchronize the code changes.
    myRepoManager.pullRepository(repoName)
  14. Describe the CDE repository again. Notice changes to metadata.
    json.loads(myRepoManager.describeRepository(repoName))
  15. Download the file from the CDE Repository.
    myRepoManager.downloadFileFromRepo(repoName, filePath)
  16. Delete CDE Repository.
    myRepoManager.deleteRepository(repoName)
  17. Validate CDE Repository Deletion.
    json.loads(myRepoManager.listRepositories())
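
Step 1 places the workload password directly in the script. One alternative is to read the connection details from environment variables. The sketch below assumes the three values are exported under the same names used in step 1; that convention and the helper name load_cde_settings are assumptions for illustration, not something cdepy requires or provides.

```python
import os

# Names mirror the variables from step 1; exporting them as environment
# variables is an assumption of this sketch.
REQUIRED_VARS = ("JOBS_API_URL", "WORKLOAD_USER", "WORKLOAD_PASSWORD")


def load_cde_settings(environ=None):
    """Collect the connection settings from a mapping (default: os.environ)."""
    environ = os.environ if environ is None else environ
    missing = [name for name in REQUIRED_VARS if not environ.get(name)]
    if missing:
        raise KeyError("Missing settings: " + ", ".join(missing))
    return {name: environ[name] for name in REQUIRED_VARS}


# Demonstration with a stand-in mapping instead of the real environment.
demo = load_cde_settings({
    "JOBS_API_URL": "<myJobsAPIurl>",
    "WORKLOAD_USER": "<myusername>",
    "WORKLOAD_PASSWORD": "<mypwd>",
})
print(sorted(demo))
```

With the variables exported in your shell, calling load_cde_settings() without arguments returns the same dictionary, and its three values can be passed to cdeconnection.CdeConnection as in step 2.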

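The json.loads calls in the steps above suggest that the repository manager methods return JSON text. To make that output easier to read in a terminal, a small formatting helper can be used. This is a sketch only: pretty_json is a hypothetical helper name, and the sample payload is made up and does not reflect the actual response fields.

```python
import json


def pretty_json(raw_text):
    """Parse a JSON string and re-serialize it with indentation and sorted keys."""
    return json.dumps(json.loads(raw_text), indent=2, sort_keys=True)


# Made-up sample payload; the real response fields may differ.
sample = '{"name": "exampleGitRepository", "branch": "main"}'
print(pretty_json(sample))
```

In the tutorial, the same helper could wrap a call such as myRepoManager.describeRepository(repoName), assuming that call returns a JSON string.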
 

 
