CDP Data Engineering (CDE) is the only cloud-native service purpose-built for enterprise data engineering teams. Built on Apache Spark, CDE is an all-inclusive data engineering toolset that enables orchestration automation with Apache Airflow, advanced pipeline monitoring, visual troubleshooting, and comprehensive management tools to streamline ETL processes across enterprise analytics teams.
Git repositories allow teams to collaborate, manage project artifacts, and promote applications from lower to higher environments. CDE supports integration with Git providers such as GitHub, GitLab, and Bitbucket to synchronize job runs with different versions of your code.
CDEPY is a package that lets you do all of the above with the convenience of Python. With it, you can remotely connect to a Virtual Cluster from your local machine or from any third-party tool, as long as it supports Python.
In this tutorial, you will use CDEPY to create a CDE Repository from a Git repository and create a CDE Spark Job using the PySpark script loaded in the repository.
Requirements
Install the cdepy package from PyPI:

pip install cdepy
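To confirm the installation, you can try importing one of the package's modules (a quick, optional check):

python -c "from cdepy import cdeconnection"

If the command exits without an ImportError, the package is ready to use.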
End-to-End Example
from cdepy import cdeconnection
from cdepy import cderepositories
from cdepy import cdejob
from cdepy import cdemanager
import json
# Connect to the CDE Virtual Cluster via a CdeConnection object
JOBS_API_URL = "<myJobsAPIurl>"
WORKLOAD_USER = "<myusername>"
WORKLOAD_PASSWORD = "<mypwd>"

myCdeConnection = cdeconnection.CdeConnection(JOBS_API_URL, WORKLOAD_USER, WORKLOAD_PASSWORD)

# Exchange your workload credentials for an API token
myCdeConnection.setToken()
# Register the Git repository as a CDE Repository
myRepoManager = cderepositories.CdeRepositoryManager(myCdeConnection)

repoName = "exampleGitRepository"
repoPath = "https://github.com/pdefusco/cde_git_repo.git"

myRepoManager.createRepository(repoName, repoPath, repoBranch="main")

# Confirm the repository was created and inspect its metadata
json.loads(myRepoManager.listRepositories())
json.loads(myRepoManager.describeRepository(repoName))
# Download the PySpark script from the CDE Repository to verify its contents
filePath = "simple-pyspark-sql.py"
myRepoManager.downloadFileFromRepo(repoName, filePath)
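For context, simple-pyspark-sql.py is a small PySpark SQL script stored in the example Git repository. The sketch below is illustrative only; the actual contents of the file in pdefusco/cde_git_repo may differ:

from pyspark.sql import SparkSession

# Illustrative sketch: not the exact contents of simple-pyspark-sql.py
spark = SparkSession.builder.appName("simple-pyspark-sql").getOrCreate()

df = spark.createDataFrame([(1, "a"), (2, "b"), (3, "c")], ["id", "letter"])
df.createOrReplaceTempView("sample")
spark.sql("SELECT id, letter FROM sample WHERE id > 1").show()

spark.stop()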
CDE_JOB_NAME = "sparkJobFromRepo"

# Set the path of the PySpark script inside the CDE Repository
applicationFilePath = "simple-pyspark-sql.py"
myCdeSparkJob = cdejob.CdeSparkJob(myCdeConnection)
myCdeSparkJobDefinition = myCdeSparkJob.createJobDefinition(
    CDE_JOB_NAME=CDE_JOB_NAME,
    CDE_RESOURCE_NAME=repoName,
    APPLICATION_FILE_NAME=applicationFilePath,
    executorMemory="2g",
    executorCores=2,
)
# Create the job in the Virtual Cluster and trigger a run
myCdeClusterManager = cdemanager.CdeClusterManager(myCdeConnection)
myCdeClusterManager.createJob(myCdeSparkJobDefinition)
myCdeClusterManager.runJob(CDE_JOB_NAME)
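Once the run is triggered, you can monitor it from the same session. Assuming your cdepy release exposes a listJobRuns method on CdeClusterManager (check the package documentation for your version), a status check looks like this:

# Assumption: listJobRuns is available on CdeClusterManager in your cdepy release
jobRunsResponse = myCdeClusterManager.listJobRuns()
print(jobRunsResponse)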
# Pull the latest commits from the Git repository into the CDE Repository,
# then verify the update and re-download the script
myRepoManager.pullRepository(repoName)
json.loads(myRepoManager.describeRepository(repoName))
myRepoManager.downloadFileFromRepo(repoName, filePath)
# Clean up: delete the CDE Repository and confirm it is gone
myRepoManager.deleteRepository(repoName)
json.loads(myRepoManager.listRepositories())