05-30-2024
05:19 PM
CDP Data Engineering (CDE) is the only cloud-native service purpose-built for enterprise data engineering teams. Building on Apache Spark, Data Engineering is an all-inclusive data engineering toolset that enables orchestration automation with Apache Airflow, advanced pipeline monitoring, visual troubleshooting, and comprehensive management tools to streamline ETL processes across enterprise analytics teams.
To manage job dependencies, CDE supports creating custom Python environments dedicated to Airflow using the airflow-python-env resource type. With this option, you can install custom libraries for running your Directed Acyclic Graphs (DAGs). The supported version is Python 3.8.
A resource is a named collection of files or other resources referenced by a job. The airflow-python-env resource type allows you to specify a requirements.txt file that defines an environment that you can then activate globally for airflow deployments in a virtual cluster.
You can install and use custom Python packages for Airflow with CDE. Typically this feature is used to install third-party Airflow providers in CDE. However, it can also be used to install any Python package and use it within the DAG logic.
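For instance, to make the Amazon provider used later in this example available to your DAGs, the requirements.txt could contain as little as the single line below (the exact package list and any version pins are assumptions to adapt to your environment):
apache-airflow-providers-amazon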
CDEPY is a package that allows you to do all the above with the convenience of Python. With it, you can remotely connect to a Virtual Cluster from your local machine or 3rd party tool as long as it supports Python. It is available on PyPi at this URL and can be easily installed with a "pip install cdepy" command.
In this example, you will use CDEPY to create a CDE Airflow Python environment with the Amazon Provider for Airflow. Then, you will deploy an Airflow DAG that creates an S3 bucket, reads a text file from a CDE Files Resource and writes it to the S3 bucket, launches a CDE Spark Job, and finally deletes the S3 bucket.
Requirements
A CDE Service with Version 1.21 or above.
The code, including the Airflow DAG, PySpark script, and associated resources, is available in this git repository.
A local machine with Python and the latest version of the cdepy Python package installed. pip install cdepy
End-to-End Example
Import cdepy modules and set environment variables: from cdepy import cdeconnection
from cdepy import cdeairflowpython
from cdepy import cderesource
from cdepy import cdejob
from cdepy import cdemanager
import os
import json
Connect via CdeConnection Object
JOBS_API_URL = "<myJobsAPIurl>"
WORKLOAD_USER = "<myusername>"
WORKLOAD_PASSWORD = "<mypwd>"
Instantiate a CdeConnection object to be able to connect to the CDE Virtual Cluster. myCdeConnection = cdeconnection.CdeConnection(JOBS_API_URL, WORKLOAD_USER, WORKLOAD_PASSWORD)
myCdeConnection.setToken()
Instantiate a CdeAirflowPythonEnv object to manage Airflow Python Environments. myAirflowPythonEnvManager = cdeairflowpython.CdeAirflowPythonEnv(myCdeConnection)
Create a Maintenance Session to perform any Airflow Python Environments-related actions. myAirflowPythonEnvManager.createMaintenanceSession()
Register a pip repository in CDE. myAirflowPythonEnvManager.createPipRepository()
Check on the status of the Maintenance Session. myAirflowPythonEnvManager.checkAirflowPythonEnvStatus()
The output should be ```{"status":"pip-repos-defined"}```. Load the requirements.txt file: pathToRequirementsTxt = "/resources/requirements.txt"
myAirflowPythonEnvManager.buildAirflowPythonEnv(pathToRequirementsTxt)
Note that the requirements.txt file must be customized before it is uploaded. Check the build status again: myAirflowPythonEnvManager.checkAirflowPythonEnvStatus()
The response status should be ```{"status":"building"}```. Repeat the request in a couple of minutes. Eventually, once the response status becomes ```{"status":"built"}``` you will be ready to move on.
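If you are scripting this step, a simple polling loop can wait for the build to finish. This is only a sketch: it assumes checkAirflowPythonEnvStatus() returns the status JSON shown above as a string, so adjust it if your cdepy version prints the status instead.
import json
import time
# Hypothetical polling loop; assumes the status method returns the JSON string shown above.
while True:
    envStatus = json.loads(myAirflowPythonEnvManager.checkAirflowPythonEnvStatus())
    if envStatus.get("status") == "built":
        break
    time.sleep(60)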
Validate the status of the Python environment. myAirflowPythonEnvManager.getAirflowPythonEnvironmentDetails()
Explore Maintenance Session logs. myAirflowPythonEnvManager.viewMaintenanceSessionLogs()
Activate the Python environment. myAirflowPythonEnvManager.activateAirflowPythonEnv()
Check on the Python environment build status. myAirflowPythonEnvManager.checkAirflowPythonEnvStatus()
The response should be ```{"status":"activating"}```. The maintenance session will then end after a couple of minutes. This means that the environment has been activated.
Once the Airflow Python environment has been activated, you can create a CDE Airflow Job. First, create a pipeline resource and upload the dag to it: CDE_RESOURCE_NAME = "my_pipeline_resource"
myCdeFilesResource = cderesource.CdeFilesResource(CDE_RESOURCE_NAME)
myCdeFilesResourceDefinition = myCdeFilesResource.createResourceDefinition()
LOCAL_FILE_PATH = "resources"
LOCAL_FILE_NAME = "s3BucketDag.py"
myCdeClusterManager = cdemanager.CdeClusterManager(myCdeConnection)
myCdeClusterManager.createResource(myCdeFilesResourceDefinition)
myCdeClusterManager.uploadFileToResource(CDE_RESOURCE_NAME, LOCAL_FILE_PATH, LOCAL_FILE_NAME)
Create a Files resource for the text file. The Airflow DAG will use the S3BucketOperator and the BashOperator to read the file from the CDE Files Resource and write it to the S3 bucket; a hedged sketch of such a DAG is shown after this code block. CDE_RESOURCE_NAME = "my_file_resource"
myCdeFilesResource = cderesource.CdeFilesResource(CDE_RESOURCE_NAME)
myCdeFilesResourceDefinition = myCdeFilesResource.createResourceDefinition()
LOCAL_FILE_PATH = "resources"
LOCAL_FILE_NAME = "my_file.txt"
myCdeClusterManager.createResource(myCdeFilesResourceDefinition)
myCdeClusterManager.uploadFileToResource(CDE_RESOURCE_NAME, LOCAL_FILE_PATH, LOCAL_FILE_NAME)
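For orientation, here is a rough sketch of what s3BucketDag.py could look like. It is an illustrative outline only, not the author's DAG: the real file lives in the git repository, the bucket name and bash command are placeholders, and the CDEJobRunOperator import path and the availability of the AWS CLI on the Airflow workers are assumptions to verify in your environment.
from datetime import datetime
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.providers.amazon.aws.operators.s3 import S3CreateBucketOperator, S3DeleteBucketOperator
# Assumption: operator shipped with CDE's embedded Airflow; verify the import path for your CDE version.
from cloudera.cdp.airflow.operators.cde_operator import CDEJobRunOperator

with DAG(
    dag_id="s3BucketDag",
    start_date=datetime(2024, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    create_bucket = S3CreateBucketOperator(
        task_id="create_s3_bucket",
        bucket_name="my-example-bucket",  # placeholder bucket name
        aws_conn_id="aws_default",
    )
    # The text file uploaded to the CDE Files Resource is expected under /app/mount;
    # assumption: the AWS CLI is available to the Airflow workers.
    copy_file_to_s3 = BashOperator(
        task_id="copy_file_to_s3",
        bash_command="aws s3 cp /app/mount/my_file.txt s3://my-example-bucket/",
    )
    run_spark_job = CDEJobRunOperator(
        task_id="run_spark_job",
        job_name="simple-pyspark",  # the CDE Spark Job created later in this article
    )
    delete_bucket = S3DeleteBucketOperator(
        task_id="delete_s3_bucket",
        bucket_name="my-example-bucket",
        force_delete=True,
        aws_conn_id="aws_default",
    )
    create_bucket >> copy_file_to_s3 >> run_spark_job >> delete_bucket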
Create a CDE Spark Job along with its resources: CDE_RESOURCE_NAME = "my_script_resource"
myCdeFilesResource = cderesource.CdeFilesResource(CDE_RESOURCE_NAME)
myCdeFilesResourceDefinition = myCdeFilesResource.createResourceDefinition()
LOCAL_FILE_PATH = "resources"
LOCAL_FILE_NAME = "pysparksql.py"
myCdeClusterManager.createResource(myCdeFilesResourceDefinition)
myCdeClusterManager.uploadFileToResource(CDE_RESOURCE_NAME, LOCAL_FILE_PATH, LOCAL_FILE_NAME)
CDE_JOB_NAME = "simple-pyspark"
myCdeSparkJob = cdejob.CdeSparkJob(myCdeConnection)
myCdeSparkJobDefinition = myCdeSparkJob.createJobDefinition(CDE_JOB_NAME, CDE_RESOURCE_NAME, APPLICATION_FILE_NAME=LOCAL_FILE_NAME, executorMemory="2g", executorCores=2)
myCdeClusterManager.createJob(myCdeSparkJobDefinition)
Create & Run CDE Airflow Job: CDE_JOB_NAME = "PythonEnvDag"
DAG_FILE = "s3BucketDag.py"
CDE_RESOURCE_NAME = "my_pipeline_resource"
myCdeAirflowJob = cdejob.CdeAirflowJob(myCdeConnection)
myCdeAirflowJobDefinition = myCdeAirflowJob.createJobDefinition(CDE_JOB_NAME, DAG_FILE, CDE_RESOURCE_NAME)
myCdeClusterManager.createJob(myCdeAirflowJobDefinition)
myCdeClusterManager.runJob(CDE_JOB_NAME)
Optional: Create a new Maintenance Session to delete the Python environment myAirflowPythonEnvManager.createMaintenanceSession()
myAirflowPythonEnvManager.deleteAirflowPythonEnv()
Optional: End the Maintenance Session once you have deleted the Python environment: myAirflowPythonEnvManager.deleteMaintenanceSession()
References
Documentation
Introductory Article to CDEPY
CDEPY on PyPi
05-30-2024
05:12 PM
CDP Data Engineering (CDE) is the only cloud-native service purpose-built for enterprise data engineering teams. Building on Apache Spark, Data Engineering is an all-inclusive data engineering toolset that enables orchestration automation with Apache Airflow, advanced pipeline monitoring, visual troubleshooting, and comprehensive management tools to streamline ETL processes across enterprise analytics teams.
Git repositories allow teams to collaborate, manage project artifacts, and promote applications from lower to higher environments. CDE supports integration with Git providers such as GitHub, GitLab, and Bitbucket to synchronize job runs with different versions of your code.
CDEPY is a package that allows you to do all the above with the convenience of Python. With it, you can remotely connect to a Virtual Cluster from your local machine or 3rd party tool as long as it supports Python.
In this tutorial, you will use CDEPY to create a CDE Repository from a Git repository and create a CDE Spark Job using the PySpark script loaded in the repository.
Requirements
A CDE Service with Version 1.21 or above.
The code supporting this article is available in this git repository.
A local machine with Python and the latest version of the cdepy Python package installed. pip install cdepy
End-to-End Example
Import cdepy modules and set environment variables: from cdepy import cdeconnection
from cdepy import cdeairflowpython
from cdepy import cderepositories
from cdepy import cdejob
from cdepy import cdemanager
import os
import json
#Connect via CdeConnection Object
JOBS_API_URL = "<myJobsAPIurl>"
WORKLOAD_USER = "<myusername>"
WORKLOAD_PASSWORD = "<mypwd>"
Instantiate a CdeConnection object to be able to connect to the CDE Virtual Cluster. myCdeConnection = cdeconnection.CdeConnection(JOBS_API_URL, WORKLOAD_USER, WORKLOAD_PASSWORD)
myCdeConnection.setToken()
Instantiate a CdeRepositoryManager object to be able to interact with CDE repositories. myRepoManager = cderepositories.CdeRepositoryManager(myCdeConnection)
Provide git repository information. Use the provided git repository for testing purposes. repoName = "exampleGitRepository"
repoPath = "https://github.com/pdefusco/cde_git_repo.git"
Create CDE Repository from Git Repository. myRepoManager.createRepository(repoName, repoPath, repoBranch="main")
Show available CDE repositories. json.loads(myRepoManager.listRepositories())
Show CDE Repository Metadata. json.loads(myRepoManager.describeRepository(repoName))
Download the file from the CDE Repository. filePath = "simple-pyspark-sql.py"
myRepoManager.downloadFileFromRepo(repoName, filePath)
Create a CDE Spark Job from a CDE Repository: CDE_JOB_NAME = "sparkJobFromRepo"
#Set path of PySpark script inside the CDE Repository:
applicationFilePath = "simple-pyspark-sql.py"
myCdeSparkJob = cdejob.CdeSparkJob(myCdeConnection)
myCdeSparkJobDefinition = myCdeSparkJob.createJobDefinition(CDE_JOB_NAME=CDE_JOB_NAME, \
CDE_RESOURCE_NAME=repoName, \
APPLICATION_FILE_NAME=applicationFilePath, \
executorMemory="2g", \
executorCores=2)
myCdeClusterManager = cdemanager.CdeClusterManager(myCdeConnection)
myCdeClusterManager.createJob(myCdeSparkJobDefinition)
myCdeClusterManager.runJob(CDE_JOB_NAME)
Optional: update code in "simple-pyspark-sql.py" in your git repository.
Then pull from git repo to CDE repo to synchronize code changes. myRepoManager.pullRepository(repoName)
Describe the CDE repository again. Notice changes to metadata. json.loads(myRepoManager.describeRepository(repoName))
Download the file from the CDE Repository. myRepoManager.downloadFileFromRepo(repoName, filePath)
Delete CDE Repository. myRepoManager.deleteRepository(repoName)
Validate CDE Repository Deletion. json.loads(myRepoManager.listRepositories())
References
Documentation
Introductory Article to CDEPY
CDEPY on PyPi
05-13-2024
05:33 PM
CDE is the Cloudera Data Engineering Service, a containerized managed service for the Cloudera Data Platform designed for Large Scale Batch and Streaming Pipelines with Spark, Airflow, and Iceberg. It allows you to submit batch jobs to auto-scaling virtual clusters. As a Cloud-Native service, CDE enables you to spend more time on your applications, and less time on infrastructure.
A CDE Session is an interactive short-lived development environment for running Python, Scala, and Spark commands to help you iterate upon and build your Spark workloads. You can use CDE Sessions in CDE Virtual Clusters of type "All Purpose - Tier 2".
A resource in CDE is a named collection of files used by a job or a session. Resources can include application code, configuration files, custom Docker images, and Python virtual environment specifications.
As of version 1.20, CDE introduces the ability to leverage CDE Resources when running CDE Interactive Sessions. This means that you can leverage files and Python environments when working with your data interactively.
In this example, we will leverage both CDE Files and Python resources to parse a YAML file.
CDE provides a command line interface (CLI) client. You can use the CLI to create and update jobs, view job details, manage job resources, run jobs, sessions, etc. We will use the CDE CLI to run the following example commands:
Create the CDE Files resource and load a YAML file to it.
%cde resource create --type files --name myYamlFiles
%cde resource upload --name myYamlFiles --local-path cdesessionsresources/myyaml.yaml
Create a CDE Python resource and activate the environment.
%cde resource create --type python-env --name py_yaml_resource
%cde resource upload --name py_yaml_resource --local-path cdesessionsresources/requirements.txt
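In this example, the requirements.txt only needs to pull in the PyYAML package used below. The single-line content shown here is an assumption; pin a version if your environment requires it.
PyYAML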
Allow a moment for the Python resource to build. Check the build status in the UI.
Next, launch the CDE Session:
%cde session create --name pyyamlSession --type pyspark --python-env-resource-name py_yaml_resource --mount-1-resource myYamlFiles
{
"name": "pyyamlSession",
"type": "pyspark",
"creator": "pauldefusco",
"created": "2024-05-13T23:47:29Z",
"mounts": [
{
"dirPrefix": "/",
"resourceName": "myYamlFiles"
}
],
"lastStateUpdated": "2024-05-13T23:47:29Z",
"state": "starting",
"interactiveSpark": {
"id": 4,
"driverCores": 1,
"executorCores": 1,
"driverMemory": "1g",
"executorMemory": "1g",
"numExecutors": 1,
"pythonEnvResourceName": "py_yaml_resource"
}
}
Open the PySpark shell and execute the rest of the following code from there:
%cde session interact --name pyyamlSession
Starting REPL...
Waiting for the session to go into an available state...
Connected to Cloudera Data Engineering...
Press Ctrl+D (i.e. EOF) to exit
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\
/_/
Type in expressions to have them evaluated.
>>>
Import the PyYaml Python module:
>>> import yaml
>>> from yaml import load, dump
>>> from yaml import Loader, Dumper
Load the data from the files resource:
>>> myYamlDocument = "/app/mount/myyaml.yaml"
Parse the file with PyYaml:
>>> with open(myYamlDocument) as stream:
...     data = yaml.load(stream, Loader=Loader)
...     print(data)
{'name': "Martin D'vloper", 'job': 'Developer', 'skill': 'Elite', 'employed': True, 'foods': ['Apple', 'Orange', 'Strawberry', 'Mango'], 'languages': {'perl': 'Elite', 'python': 'Elite', 'pascal': 'Lame'}, 'education': '4 GCSEs\n3 A-Levels\nBSc in the Internet of Things\n'}
03-27-2024
05:39 PM
Logging Iceberg Metrics with MLFlow Tracking in CML
Cloudera Machine Learning (CML) on Cloudera Data Platform (CDP) accelerates time-to-value by enabling data scientists to collaborate in a single unified platform that is all inclusive for powering any AI use case. Purpose-built for agile experimentation and production ML workflows, Cloudera Machine Learning manages everything from data preparation to MLOps, to predictive reporting.
CML is compatible with the MLflow Tracking API and makes use of the MLflow client library as the default method to log experiments. Existing projects with existing experiments are still available and usable. CML's experiment tracking features allow you to use the MLflow client library for logging parameters, code versions, metrics, and output files when running your machine learning code.
Apache Iceberg is a table format for huge analytics datasets in the cloud that defines how metadata is stored and data files are organized. Iceberg is also a library that compute engines can use to read/write a table. CML offers Data Connections to connect to Data Sources available within the CDP Environment, including Iceberg Open Lakehouses.
In this example we will create an experiment with MLFlow Tracking and log Iceberg metadata in order to enhance machine learning reproducibility in the context of MLOps.
Step by Step Guide
The code samples provided below are extracts from the accompanying notebook. The full code can be found in this git repository.
Setup
Create a CML Project with Python 3.9 / JupyterLab Editor Runtime. Launch a CML Session and install requirements.
Run Notebook
Run each cell in the notebook.
Code highlights:
MLFlow Tracking supports modules built specifically for some of the most popular open source frameworks. In this case we will import "mlflow.spark".
You can leverage CML Spark Data Connections to launch a SparkSession object with the recommended Iceberg Spark configurations. Spark Data Connections make connecting to your Iceberg data effortless.
import mlflow.spark
import cml.data_v1 as cmldata
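# The experiment functions below also use MLflow and Spark ML classes; a minimal set of
# additional imports (not shown in the original extract, standard package paths) would be:
import mlflow
from pyspark.ml import Pipeline
from pyspark.ml.feature import Tokenizer, HashingTF
from pyspark.ml.classification import LogisticRegression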
#Edit your connection name here:
CONNECTION_NAME = "se-aw-mdl"
conn = cmldata.get_connection(CONNECTION_NAME)
spark = conn.get_spark_session()
The exp1 method acts as a wrapper for your first MLFlow experiment. The experiment name is set with the mlflow.set_experiment method. Data is written from a PySpark dataframe to an Iceberg table via a simple routine: "df.writeTo().createOrReplace()". The Iceberg History and Snapshots tables are available for you to monitor Iceberg metadata; in this example we save the latest snapshot ID along with its timestamp and parent snapshot ID into Python variables. Within the context of this experiment run, a Spark ML Pipeline is trained to tokenize and classify text. MLFlow Tracking allows you to set custom tags, which can be used to search your experiments using the MLFlow client. MLFlow Tracking also allows you to create a run context to track metrics according to a specific run; in this particular case we attach the Iceberg variables corresponding to the snapshot and the write operation timestamp to the run. Once the experiment completes, you can retrieve its ID and more metadata using the MLFlow client.
def exp1(df):
    mlflow.set_experiment("sparkml-experiment")
    ##EXPERIMENT 1
    df.writeTo("spark_catalog.default.training").using("iceberg").createOrReplace()
    spark.sql("SELECT * FROM spark_catalog.default.training").show()
    ### SHOW TABLE HISTORY AND SNAPSHOTS
    spark.read.format("iceberg").load("spark_catalog.default.training.history").show(20, False)
    spark.read.format("iceberg").load("spark_catalog.default.training.snapshots").show(20, False)
    snapshot_id = spark.read.format("iceberg").load("spark_catalog.default.training.snapshots").select("snapshot_id").tail(1)[0][0]
    committed_at = spark.read.format("iceberg").load("spark_catalog.default.training.snapshots").select("committed_at").tail(1)[0][0].strftime('%m/%d/%Y')
    parent_id = spark.read.format("iceberg").load("spark_catalog.default.training.snapshots").select("parent_id").tail(1)[0][0]
    tags = {
        "iceberg_snapshot_id": snapshot_id,
        "iceberg_snapshot_committed_at": committed_at,
        "iceberg_parent_id": parent_id,
        "row_count": training_df.count()
    }
    ### MLFLOW EXPERIMENT RUN
    with mlflow.start_run() as run:
        maxIter=8
        regParam=0.01
        tokenizer = Tokenizer(inputCol="text", outputCol="words")
        hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
        lr = LogisticRegression(maxIter=maxIter, regParam=regParam)
        pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
        model = pipeline.fit(training_df)
        mlflow.log_param("maxIter", maxIter)
        mlflow.log_param("regParam", regParam)
        #prediction = model.transform(test)
        mlflow.set_tags(tags)
    mlflow.end_run()
    experiment_id = mlflow.get_experiment_by_name("sparkml-experiment").experiment_id
    runs_df = mlflow.search_runs(experiment_id, run_view_type=1)
    return runs_df
The second experiment is very similar to the first, except data is appended to the Iceberg table via "df.writeTo().append()". As data is inserted into the table, new Iceberg metadata is generated in the Iceberg Metadata Layer and becomes available in the Snapshots and History tables. This metadata is tracked into new Python variables, again capturing the Iceberg Snapshot ID and timestamp for this append operation. Within the context of this experiment run, the Spark ML Pipeline is retrained for the same purpose of tokenizing and classifying text, but using the new version of the data after the append operation.
def exp2(df):
    mlflow.set_experiment("sparkml-experiment")
    ##EXPERIMENT 2
    ### ICEBERG INSERT DATA - APPEND FROM DATAFRAME
    # PRE-INSERT
    spark.sql("SELECT * FROM spark_catalog.default.training").show()
    temp_df = spark.sql("SELECT * FROM spark_catalog.default.training")
    temp_df.writeTo("spark_catalog.default.training").append()
    df = spark.sql("SELECT * FROM spark_catalog.default.training")
    # POST-INSERT
    spark.sql("SELECT * FROM spark_catalog.default.training").show()
    spark.read.format("iceberg").load("spark_catalog.default.training.history").show(20, False)
    spark.read.format("iceberg").load("spark_catalog.default.training.snapshots").show(20, False)
    snapshot_id = spark.read.format("iceberg").load("spark_catalog.default.training.snapshots").select("snapshot_id").tail(1)[0][0]
    committed_at = spark.read.format("iceberg").load("spark_catalog.default.training.snapshots").select("committed_at").tail(1)[0][0].strftime('%m/%d/%Y')
    parent_id = spark.read.format("iceberg").load("spark_catalog.default.training.snapshots").select("parent_id").tail(1)[0][0]
    tags = {
        "iceberg_snapshot_id": snapshot_id,
        "iceberg_snapshot_committed_at": committed_at,
        "iceberg_parent_id": parent_id,
        "row_count": df.count()
    }
    ### MLFLOW EXPERIMENT RUN
    with mlflow.start_run() as run:
        maxIter=10
        regParam=0.002
        tokenizer = Tokenizer(inputCol="text", outputCol="words")
        hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
        lr = LogisticRegression(maxIter=maxIter, regParam=regParam)
        pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
        model = pipeline.fit(training_df)
        mlflow.log_param("maxIter", maxIter)
        mlflow.log_param("regParam", regParam)
        #prediction = model.transform(test)
        mlflow.set_tags(tags)
    mlflow.end_run()
    experiment_id = mlflow.get_experiment_by_name("sparkml-experiment").experiment_id
    runs_df = mlflow.search_runs(experiment_id, run_view_type=1)
    return runs_df
Finally, in the third experiment we retrain the Spark ML Pipeline but first we retrieve the data as it was prior to the append operation by applying the provided Iceberg Snapshot ID in the "spark.read.table" method.
def exp3(df, snapshot_id):
    ##EXPERIMENT 3
    df = spark.read.option("snapshot-id", snapshot_id).table("spark_catalog.default.training")
    committed_at = spark.sql("SELECT committed_at FROM spark_catalog.default.training.snapshots WHERE snapshot_id = {};".format(snapshot_id)).collect()[0][0].strftime('%m/%d/%Y')
    parent_id = str(spark.sql("SELECT parent_id FROM spark_catalog.default.training.snapshots WHERE snapshot_id = {};".format(snapshot_id)).tail(1)[0][0])
    tags = {
        "iceberg_snapshot_id": snapshot_id,
        "iceberg_snapshot_committed_at": committed_at,
        "iceberg_parent_id": parent_id,
        "row_count": training_df.count()
    }
    ### MLFLOW EXPERIMENT RUN
    with mlflow.start_run() as run:
        maxIter=7
        regParam=0.005
        tokenizer = Tokenizer(inputCol="text", outputCol="words")
        hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
        lr = LogisticRegression(maxIter=maxIter, regParam=regParam)
        pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
        model = pipeline.fit(training_df)
        mlflow.log_param("maxIter", maxIter)
        mlflow.log_param("regParam", regParam)
        #prediction = model.transform(test)
        mlflow.set_tags(tags)
    mlflow.end_run()
    experiment_id = mlflow.get_experiment_by_name("sparkml-experiment").experiment_id
    runs_df = mlflow.search_runs(experiment_id, run_view_type=1)
    #spark.stop()
    return runs_df
Summary and Next Steps
Large ML organizations require standardized best practices such as tracking models and their respective dependencies and model developers, and matching those with datasets, in order to keep a consistent view of all MLOps practices. MLFlow Tracking in CML allows you to achieve this goal by letting you specify datasets and other custom metadata when tracking experiment runs. In the above example we tracked Iceberg metadata in order to allow data scientists to retrain an existing pipeline with datasets as of arbitrary points in time. In the process, we used tags in order to implement a consistent taxonomy across all experiment runs.
CML Model Deployment with MLFlow and APIv2
Spark in CML: Recommendations for using Spark
Experiments with MLFlow
Registering and Deploying Models with Model Registry
Apache Iceberg Documentation
Iceberg Time Travel
Introducing MLOps and SDX for Models in CML
03-27-2024
04:42 PM
CML Model Deployment with MLFlow and APIv2
Cloudera Machine Learning (CML) on Cloudera Data Platform (CDP) accelerates time-to-value by enabling data scientists to collaborate in a single unified platform that is all inclusive for powering any AI use case. Purpose-built for agile experimentation and production ML workflows, Cloudera Machine Learning manages everything from data preparation to MLOps, to predictive reporting.
CML exposes a REST API that you can use to perform operations related to projects, jobs, and runs. You can use API commands to integrate CML with third-party workflow tools or to control CML from the command line.
In this example we will showcase how to use APIv2 to programmatically register an XGBoost experiment with MLFlow Tracking and the MLFlow Registry, and then deploy it as a model endpoint in CML.
MLOps in CML
CML has extensive MLOps features and a set of model and lifecycle management capabilities to enable the repeatable, transparent, and governed approaches necessary for scaling model deployments and ML use cases. It's built to support open source standards and is fully integrated with CDP, enabling customers to integrate into existing and future tooling while not being locked into a single vendor.
CML enables enterprises to proactively monitor technical metrics such as service level agreement (SLA) adherence, uptime, and resource use, as well as prediction metrics including model distribution, drift, and skew, from a single governance interface. Users can set custom alerts and eliminate the model "black box" effect with native tools for visualizing model lineage, performance, and trends.
Some of the benefits of CML include:
Model cataloging and lineage capabilities to allow visibility into the entire ML lifecycle, which eliminates silos and blind spots for full lifecycle transparency, explainability, and accountability.
Full end-to-end machine learning lifecycle management that includes everything required to securely deploy machine learning models to production, ensure accuracy, and scale use cases.
An extensive model monitoring service designed to track and monitor both technical aspects and accuracy of predictions in a repeatable, secure, and scalable way.
New MLOps features for monitoring the functional and business performance of machine learning models, such as detecting model performance and drift over time with native storage and access to custom and arbitrary model metrics; measuring and tracking individual prediction accuracy; and ensuring models are compliant and performing optimally.
The ability to track, manage, and understand large numbers of ML models deployed across the enterprise with model cataloging, full lifecycle lineage, and custom metadata in Apache Atlas.
The ability to view the lineage of data tied to the models built and deployed in a single system to help manage and govern the ML lifecycle.
Increased model security for Model REST endpoints, which allows models to be served in a CML production environment without compromising security.
Use Case
In this example we will create a basic MLOps pipeline to put a credit card fraud classifier into production. We will create a model prototype with XGBoost, register and manage experiments with MLFlow Tracking, and stage the best experiment run in the MLFlow Registry. Next, we will deploy the model from the Registry into an API Endpoint, and finally redeploy it with additional resources for High Availability and increased serving performance. The full code is available in this git repository.
Step by Step Guide
Setup
Create a CML Project with Python 3.9 / Workbench Editor Runtime. Launch a CML Session and install requirements. Open script "00_datagen.py" and update lines 140 and 141 with your Iceberg database name and Spark Data Connection name. Then run it.
Script 1: Create the Model Experiment
Run script "01_train_xgboost.py" in order to create an MLFlow Experiment.
Code highlights:
MLFlow is installed in CML by default. You must import mlflow in order to use it in your script.
The experiment run is determined by the "mlflow tracking run context". When this executes for the first time, an experiment is created. If the same code runs again without changing EXPERIMENT_NAME, a new Experiment Run is logged for the same experiment. Otherwise, a new experiment is created.
You can log one or multiple parameters for the specific run with the "mlflow.log_param()" method. Model artifacts such as useful metadata and dependencies are logged with the "mlflow.xgboost.log_model()" method.
import mlflow
EXPERIMENT_NAME = "xgb-cc-fraud-{0}-{1}".format(USERNAME, DATE)
mlflow.set_experiment(EXPERIMENT_NAME)
with mlflow.start_run():
    model = XGBClassifier(use_label_encoder=False, eval_metric="logloss")
    model.fit(X_train, y_train, eval_set=[(X_test, y_test)], verbose=False)
    y_pred = model.predict(X_test)
    accuracy = accuracy_score(y_test, y_pred)
    mlflow.log_param("accuracy", accuracy)
    mlflow.xgboost.log_model(model, artifact_path="artifacts")
You can use the mlflow library or instantiate an mlflow client to manage experiments. In this example we use the "mlflow.get_experiment_by_name()", "mlflow.search_runs()" and "mlflow.get_run()" methods, and we also instantiate the client to list artifacts for a specific run.
def getLatestExperimentInfo(experimentName):
"""
Method to capture the latest Experiment Id and Run ID for the provided experimentName
"""
experimentId = mlflow.get_experiment_by_name(experimentName).experiment_id
runsDf = mlflow.search_runs(experimentId, run_view_type=1)
experimentId = runsDf.iloc[-1]['experiment_id']
experimentRunId = runsDf.iloc[-1]['run_id']
return experimentId, experimentRunId
experimentId, experimentRunId = getLatestExperimentInfo(EXPERIMENT_NAME)
#Replace Experiment Run ID here:
run = mlflow.get_run(experimentRunId)
pd.DataFrame(data=[run.data.params], index=["Value"]).T
pd.DataFrame(data=[run.data.metrics], index=["Value"]).T
client = mlflow.tracking.MlflowClient()
client.list_artifacts(run_id=run.info.run_id)
Script 2: Register the Model Experiment
Run script "02_cml_api_endpoint.py" in order to register the MLFlow Experiment.
Code highlights:
CML APIv2 is installed in your workspace by default. You must import cmlapi in order to use it in your script. The API provides about 100 Python methods for MLOps.
In this example, we created a ModelRegistration class whose "registerModelFromExperimentRun" method acts as a wrapper to the API's create_registered_model() method and registers the model. Creating your own Python classes and methods to implement the API methods in the context of your project is highly recommended.
import cmlapi
from cmlapi.rest import ApiException
class ModelRegistration():
"""
Class to manage the model deployment of the xgboost model
"""
def __init__(self, username, experimentName):
self.client = cmlapi.default_client()
self.username = username
self.experimentName = experimentName
def registerModelFromExperimentRun(self, modelName, experimentId, experimentRunId, modelPath):
"""
Method to register a model from an Experiment Run
Input: requires an experiment run
Output: api_response object
"""
model_name = 'xgb-cc-' + username
CreateRegisteredModelRequest = {
"project_id": os.environ['CDSW_PROJECT_ID'],
"experiment_id" : experimentId,
"run_id": experimentRunId,
"model_name": modelName,
"model_path": modelPath
}
try:
# Register a model.
api_response = self.client.create_registered_model(CreateRegisteredModelRequest)
#pprint(api_response)
except ApiException as e:
print("Exception when calling CMLServiceApi->create_registered_model: %s\n" % e)
return api_response
The response from this request contains very useful information. In this example, the registeredModelResponse object includes the modelId and modelVersionId values, which are in turn used by other API methods.
modelReg = ModelRegistration(username, experimentName)
modelPath = "artifacts"
modelName = "FraudCLF-" + username
registeredModelResponse = modelReg.registerModelFromExperimentRun(modelName, experimentId, experimentRunId, modelPath)
modelId = registeredModelResponse.model_id
modelVersionId = registeredModelResponse.model_versions[0].model_version_id
registeredModelResponse.model_versions[0].model_version_id
Script 3: Deploy Endpoint from Registry Model
Run script "03_api_deployment.py" in order to create a Model Endpoint from the registered model.
Code highlights:
In this example we created a ModelDeployment class to manage multiple API wrapper methods.
The "listRegisteredModels()" method is a wrapper to the API's "list_registered_models()" method. Notice it is arbitrarily preconfigured to list models corresponding to your user and model name. In the context of a broader MLOps pipeline, these values can obviously be parameterized. This method is necessary for obtaining the "registeredModelId" variable needed for model deployment.
The "getRegisteredModel()" method is a wrapper to the API's "get_registered_model()" method. This method is necessary for obtaining the "modelVersionId" variable needed for model deployment.
Once registeredModelId and modelVersionId are obtained, you can begin the deployment. The deployment consists of three phases: model creation, model build, and model deployment.
The model creation corresponds to the creation of an API Endpoint. Once you run this, you will see a new entry in the Model Deployments tab.
The model build corresponds to the creation of the model's container. Thanks to the MLFlow Registry, CML automatically packages all dependencies used to train the Experiment into the model endpoint for you.
The model deployment corresponds to the activation of the model endpoint. This is when the container, with its associated resource profile and endpoint, is actually deployed so inference can start.
The "listRuntimes()" method is an example of querying the Workspace for all available runtimes in order to select the most appropriate one for the model build.
class ModelDeployment():
"""
Class to manage the model deployment of the xgboost model
"""
def __init__(self, projectId, username):
self.client = cmlapi.default_client()
self.projectId = projectId
self.username = username
def listRegisteredModels(self):
"""
Method to retrieve registered models by the user
"""
#str | Search filter is an optional HTTP parameter to filter results by. Supported search_filter = {\"model_name\": \"model_name\"} search_filter = {\"creator_id\": \"<sso name or user name>\"}. (optional)
search_filter = {"creator_id" : self.username, "model_name": "FraudCLF-"+self.username}
search = json.dumps(search_filter)
page_size = 1000
try:
# List registered models.
api_response = self.client.list_registered_models(search_filter=search, page_size=page_size)
#pprint(api_response)
except ApiException as e:
print("Exception when calling CMLServiceApi->list_registered_models: %s\n" % e)
return api_response
def getRegisteredModel(self, modelId):
"""
Method to return registered model metadata including model version id
"""
search_filter = {"creator_id" : self.username}
search = json.dumps(search_filter)
try:
# Get a registered model.
api_response = self.client.get_registered_model(modelId, search_filter=search, page_size=1000)
#pprint(api_response)
except ApiException as e:
print("Exception when calling CMLServiceApi->get_registered_model: %s\n" % e)
return api_response
def createModel(self, projectId, modelName, registeredModelId, description = "Fraud Detection 2024"):
"""
Method to create a model
"""
CreateModelRequest = {
"project_id": projectId,
"name" : modelName,
"description": description,
"registered_model_id": registeredModelId,
"disable_authentication": True
}
try:
# Create a model.
api_response = self.client.create_model(CreateModelRequest, projectId)
pprint(api_response)
except ApiException as e:
print("Exception when calling CMLServiceApi->create_model: %s\n" % e)
return api_response
def createModelBuild(self, projectId, modelVersionId, modelCreationId, runtimeId):
"""
Method to create a Model build
"""
# Create Model Build
CreateModelBuildRequest = {
"registered_model_version_id": modelVersionId,
"runtime_identifier": runtimeId,
"comment": "invoking model build",
"model_id": modelCreationId
}
try:
# Create a model build.
api_response = self.client.create_model_build(CreateModelBuildRequest, projectId, modelCreationId)
#pprint(api_response)
except ApiException as e:
print("Exception when calling CMLServiceApi->create_model_build: %s\n" % e)
return api_response
def createModelDeployment(self, modelBuildId, projectId, modelCreationId):
"""
Method to deploy a model build
"""
CreateModelDeploymentRequest = {
"cpu" : "2",
"memory" : "4"
}
try:
# Create a model deployment.
api_response = self.client.create_model_deployment(CreateModelDeploymentRequest, projectId, modelCreationId, modelBuildId)
pprint(api_response)
except ApiException as e:
print("Exception when calling CMLServiceApi->create_model_deployment: %s\n" % e)
return api_response
def listRuntimes(self):
"""
Method to list available runtimes
"""
search_filter = {"kernel": "Python 3.9", "edition": "Standard", "editor": "Workbench"}
# str | Search filter is an optional HTTP parameter to filter results by.
# Supported search filter keys are: [\"image_identifier\", \"editor\", \"kernel\", \"edition\", \"description\", \"full_version\"].
# For example: search_filter = {\"kernel\":\"Python 3.7\",\"editor\":\"JupyterLab\"},. (optional)
search = json.dumps(search_filter)
try:
# List the available runtimes, optionally filtered, sorted, and paginated.
api_response = self.client.list_runtimes(search_filter=search, page_size=1000)
#pprint(api_response)
except ApiException as e:
print("Exception when calling CMLServiceApi->list_runtimes: %s\n" % e)
return api_response
Once you have created your model endpoint, give it a minute and then try a test request:
{"dataframe_split":
{"columns": ["age", "credit_card_balance", "bank_account_balance", "mortgage_balance", "sec_bank_account_balance", "savings_account_balance", "sec_savings_account_balance", "total_est_nworth", "primary_loan_balance", "secondary_loan_balance", "uni_loan_balance", "longitude", "latitude", "transaction_amount"],
"data":[[35.5, 20000.5, 3900.5, 14000.5, 2944.5, 3400.5, 12000.5, 29000.5, 1300.5, 15000.5, 10000.5, 2000.5, 90.5, 120.5]]}} Script 4: Endpoint Redeployment Run script "04_api_redeployment.py" in order to create a new model deployment with increased resources. Code highlights: A slightly different version of the ModelDeployment class is implemented. This includes the "get_latest_deployment_details()" as an example of creating a wrapper method to the API's "list_models()" and "list_model_deployments()" methods all in one. You can implement your own methods in a similar fashion as best needed in the context of your MLOps pipeline. Once the latest model deployment's metadata has been obtained in one go, a new model build is created with additional CPU, Memory and Replicas. Notice that in the process you also have the ability to switch to a different runtime as needed. deployment = ModelDeployment(projectId, username)
getLatestDeploymentResponse = deployment.get_latest_deployment_details(modelName)
listRuntimesResponse = deployment.listRuntimes()
listRuntimesResponse
runtimeId = 'docker.repository.cloudera.com/cloudera/cdsw/ml-runtime-workbench-python3.9-standard:2024.02.1-b4' # Copy a runtime ID from previous output
cpu = 2
mem = 4
replicas = 2
createModelBuildResponse = deployment.createModelBuild(projectId, modelVersionId, modelCreationId, runtimeId, cpu, mem, replicas)
modelBuildId = createModelBuildResponse.id
deployment.createModelDeployment(modelBuildId, projectId, modelCreationId)
Summary and Next Steps
Cloudera Machine Learning exposes a REST API that you can use to perform operations related to projects, jobs, and runs. You can use API commands to integrate CML with third-party workflow tools or to control CML from the command line. CML's API accelerates your data science projects by allowing you to build end to end pipelines programmatically. When coupled with CML MLFlow Tracking and MLFlow Registry, it can be used to manage models from inception to production.
APIv2 Documentation
APIv2 Examples
APIv2 AMP
Registering and Deploying Models with Model Registry
Securing Models
CML Projects
CML Runtimes
Introducing MLOps and SDX for Models in CML
Model Registry GA in CML
02-28-2024
01:56 PM
Objective
CDE is the Cloudera Data Engineering Service, a containerized managed service for Cloudera Data Platform designed for Large Scale Batch and Streaming Pipelines with Spark, Airflow and Iceberg. It allows you to submit batch jobs to auto-scaling virtual clusters. As a Cloud-Native service, CDE enables you to spend more time on your applications, and less time on infrastructure.
CDE allows you to create, manage, and schedule Apache Spark jobs without the overhead of creating and maintaining Spark clusters. With CDE, you define virtual clusters with a range of CPU and memory resources, and the cluster scales up and down as needed to run your Spark workloads, helping to control your cloud costs.
CDE Sessions are short-lived development environments used by Data Engineers and Analysts in order to iterate upon and build Spark workloads, prototype pipelines, or simply explore data. You can use CDE Sessions in CDE Virtual Clusters of type "All Purpose - Tier 2".
In this example we will use a CDE Session to perform basic cloud storage file system operations with the Hadoop API. The API was originally used to perform file system operations in HDFS, but it is still used to interact with S3 and ADLS in the Public Cloud.
Requirements
The only requirement for this exercise is a CDE Virtual Cluster in a CDE Service on version 1.19+. Version 1.20+ is recommended.
Copying the following commands into a CDE Session is recommended, although you can also run the same code in a CDE Job of type Spark.
Step by Step Instructions
Preload some data in a cloud storage location of your choice. Then, specify the path as a Python variable. In this example we use two locations to read and write data with AWS S3.
>>> readlocation = "s3a://go01-demo/mycsvfile.csv"
>>> writelocation = "s3a://go01-demo/newdir/myfile"
Read the file in a Spark Dataframe:
>>> df=spark.read.csv(readlocation)
Obtain the configured FileSystem implementation:
>>> myPath = "s3a://go01-demo/newdir"
>>> sc=df.rdd.context
>>> hadoop = sc._jvm.org.apache.hadoop
>>> hdfs = hadoop.fs.FileSystem.get(sc._jvm.java.net.URI.create(myPath), sc._jsc.hadoopConfiguration())
Write the dataframe as a file in Cloud Storage:
>>> df.write.mode("overwrite").parquet(writelocation)
List the contents of the directory:
>>> status = hdfs.listStatus(hadoop.fs.Path("s3a://go01-demo/newdir"))
>>> for fileStatus in status:
...     print(fileStatus.getPath())
s3a://go01-demo/newdir/myfile
List the contents of a directory with a filter pattern:
>>> status = hdfs.globStatus(hadoop.fs.Path("s3a://go01-demo/*.csv"))
>>> for fileStatus in status:
...     print(fileStatus.getPath())
s3a://go01-demo/test_file.csv
Rename file:
>>> hdfs.rename(hadoop.fs.Path("s3a://go01-demo/test_file.csv"), hadoop.fs.Path("s3a://go01-demo/test_file_2.csv"))
True
>>> status = hdfs.globStatus(hadoop.fs.Path("s3a://go01-demo/*.csv"))
>>> for fileStatus in status:
...     print(fileStatus.getPath())
s3a://go01-demo/test_file_2.csv
Delete file:
>>> hdfs.delete(hadoop.fs.Path("s3a://go01-demo/test_file_2.csv"), True)
True
>>> status = hdfs.globStatus(hadoop.fs.Path("s3a://go01-demo/*.csv"))
>>> for fileStatus in status:
...     print(fileStatus.getPath())
Create a subdirectory:
>>> hdfs.mkdirs(hadoop.fs.Path("s3a://go01-demo/newdir/newsubdir"))
True
References
The full API method list is located at this link.
Summary and Next Steps
In this article we reviewed how to use the Hadoop FileSystem API to work with cloud storage from a CDE Session. Cloudera Data Engineering (CDE) also provides a command line interface (CLI) client; you can use the CLI to create and update jobs, view job details, manage job resources, run jobs, and so on. If you are using the CDE CLI, you might also find the following articles and demos interesting:
Installing the CDE CLI
Simple Introduction to the CDE CLI
CDE CLI Demo
CDE Concepts
CDE CLI Command Reference
CDE CLI Spark Flag Reference
CDE CLI Airflow Flag Reference
CDE CLI list command syntax reference
CDE Jobs API Reference
12-13-2023
06:02 PM
CDE is the Cloudera Data Engineering Service, a containerized managed service for the Cloudera Data Platform designed for Large Scale Batch Pipelines with Spark, Airflow, and Iceberg. It allows you to submit batch jobs to auto-scaling virtual clusters. As a Cloud-Native service, CDE enables you to spend more time on your applications and less time on infrastructure.
CDE allows you to create, manage, and schedule Apache Spark jobs without the overhead of creating and maintaining Spark clusters. With CDE, you define virtual clusters with a range of CPU and memory resources, and the cluster scales up and down as needed to run your Spark workloads, helping to control your cloud costs.
Cloudera Data Engineering (CDE) provides a command-line interface (CLI) client. The CDE CLI not only lets you create CDE Spark Jobs with syntax that is nearly identical to that of a Spark Submit; it also offers invaluable advantages as part of your daily Spark pipeline development and management activities, including enhanced observability, reusability, and overall organization of Spark dependencies and configurations.
Simply put, if you're creating and managing many Spark Submits, the CDE CLI will save you time and dramatically increase your productivity.
In this article, you will learn about the different options for parameterizing a CDE Spark Job, i.e., passing Spark configurations, options, files, and arguments.
Requirements
The following are required in order to reproduce these commands in your CDE environment:
A CDE Service on version 1.19.0 or above.
A working installation of the CDE CLI. Please follow these instructions to install the CLI.
CDE Spark Jobs and Spark Configurations
The CDE Spark Job is a reusable definition of a Spark Application, consisting of its code; its file, Docker, and Python dependencies; and its Spark configurations and options.
Once a CDE Spark Job is created, its definition is accessible in the CDE UI or via the CDE CLI.
cde job create --name myJob\
--type Spark\
--application-file myScript.py\
--mount-1-resource myFilesResource
Importantly, creating the job does not run it. The Application is actually executed with a separate command.
cde job run --name myJob
Generally, parameterizing a CDE Spark Job allows you to dynamically set important configurations at runtime, i.e., override default parameters based on some changing external input.
Before moving on, let us review the different types of parameters that can be set in a CDE Spark Job:
1. Spark Application File
This is the PySpark script or Jar containing the Application code. In CDE you must use the --application-file flag and you can only set this at CDE Spark Job creation time.
Examples:
cde job create --name myPySparkJob\
--type spark\
--application-file myScript.py\
--mount-1-resource myFilesResource
cde job create --name myScalaJob\
--type spark\
--application-file myJar.jar\
--mount-1-resource myFilesResource
The files can only be found by CDE if they are present in a Files resource. As shown above, the Files resource is then set with the --mount-N-resource flag. More on that in the end to end example section.
2. Spark Resources
These include basic job resources such as executor and driver memory and cores, initial number of executors, min and max executors for Spark Dynamic Allocation.
They can be set both at CDE Spark Job creation time and at runtime. Each of these has a default value, which is used unless specified in your CLI command.
Examples:
cde job create --name myPySparkJob\
--type spark\
--application-file myScript.py\
--mount-1-resource myFilesResource\
--executor-cores 4\
--executor-memory "2g" cde job run --name myPySparkJob
--executor-cores 4
--driver-cores 2
The above run will use --executor-cores 4 and --driver-cores 2 from the run command, and --executor-memory "2g" from the job definition: flags set at runtime override the values chosen at creation time, while unspecified ones keep their creation-time or default values.
3. Spark Configurations
Spark offers a wide range of configurations ranging from Python version to memory options, join and dynamic allocation thresholds, and much more.
These can be set via the --conf flag, e.g. --conf spark.executor.memoryOverhead=6g, --conf spark.pyspark.python=python3, or --conf spark.yarn.maxAppAttempts=1.
They can also be overridden at runtime.
Examples:
cde job create --name myPySparkJob\
--type spark\
--application-file myScript.py\
--mount-1-resource myFilesResource\
--conf spark.executor.memoryOverhead=6g
cde job run --name myPySparkJob\
--conf spark.executor.memoryOverhead=2g
In the above example, the "memoryOverhead" setting is overriden to "2g".
4. Command Line Arguments
These are specific variable inputs that lend themselves particularly well to being overridden at runtime.
They are defined with the --arg flag in the CDE Spark Job definition and are read in the Spark Application code via Python's sys.argv list.
For example, a CDE Spark Job can include the --arg 10 argument from the CLI so that the value "10" can be used as part of the Spark Application code.
Example CDE CLI command to create the CDE Spark Job:
cde job create --name myPySparkJob\
--type spark\
--application-file myScript.py\
--mount-1-resource myFilesResource\
--conf spark.executor.memoryOverhead=6g\
--arg 10
Example CDE Spark Job application code referencing the argument:
import sys
from pyspark.sql import SparkSession
spark = SparkSession\
.builder\
.appName("CLIArguments")\
.getOrCreate()
print("JOB ARGUMENT")
print(sys.argv[1])
You can override CLI arguments at runtime. The following example sets the input to 5 rather than 10:
cde job run --name myPySparkJob --arg 5
5. Files
These can be set via the --file or --py-files options and allow you to specify file dependencies at CDE Spark Job creation.
More importantly, the functionality provided by these flags is enhanced by CDE Files Resources, and using them instead is generally recommended.
CDE Files Resources are unique to CDE and allow you to pass and manage entire files as CDE Spark Job dependencies.
You can reference these files within Spark Application code via the /app/mount prefix.
Example:
cde job create --name penv-udf-test --type spark \
--mount-1-prefix appFiles/ \
--mount-1-resource cde-cli-files-resource \
--python-env-resource-name cde-cli-penv-resource \
--application-file /app/mount/appFiles/deps_udf_dependency.py \
--conf spark.executorEnv.PYTHONPATH=/app/mount/appFiles/deps_udf.zip \
--py-file /app/mount/appFiles/deps_udf.zip
Here, the "--py-file" flag is used to set in order to distribute dependencies that have been compressed and then loaded in a CDE Files Resource.
Notice that unlike in a Spark Submit, you cannot use --file to specify the application code and you must use the --application-file flag as shown in example 1.
CDE Files Resources are mounted on the Spark Application pod at CDE Job runtime. Thanks to this, you can also reference a file from within your Spark Application code by using the ConfigParser module and referencing the "/app/mount" directory.
Example properties file "parameters.conf" that has been loaded in the CDE Files Resource referenced by the CDE Spark Job:
[general]
property_1: s3a://go01-demo/
property_2: datalake/
Example PySpark application code referencing the "parameters.conf" file:
from pyspark.sql import SparkSession
import configparser
config = configparser.ConfigParser()
config.read('/app/mount/parameters.conf')
data_lake_name=config.get("general","property_1")
file_path=config.get("general","property_2")
spark = SparkSession.\
builder.\
appName('INGEST').\
config("spark.kubernetes.access.hadoopFileSystems", data_lake_name).getOrCreate()
cloudPath=data_lake_name+file_path
myDf = spark.read.csv(cloudPath + "/myCsv.csv", header=True, inferSchema=True)
myDf.show()
End to End CDE Spark Job Workflow Example
Now that you have gained exposure to the different options, we will utilize them in the context of a simplified CDE Spark Job creation workflow that shows how to actually implement the above:
Create a CDE Files Resource in order to host all Application code and file dependencies.
Upload Application code and file dependencies to the CDE Files Resource in order to make those accessible to CDE Spark Jobs in the future.
Create the CDE Spark Job by referencing the Application code, file dependencies, and other Spark configurations.
Run the CDE Spark Job, either with no additional configurations or by overriding configurations, in order to execute your Spark Application.
1. Create a CDE Files Resource
cde resource create --name myProperties
2. Upload Application Code and File Dependencies
cde resource upload --name myProperties\
--local-path cde_jobs/propertiesFile_1.conf\
--local-path cde_jobs/propertiesFile_2.conf\
--local-path cde_jobs/sparkJob.py
3. Create CDE Spark Job
cde job create --name myPySparkJob\
--type spark\
--application-file sparkJob.py\
--mount-1-resource myProperties\
--executor-cores 2\
--executor-memory "2g"
4. Run the CDE Spark Job with Different Options
Example 1: Run the job with two CLI arguments and read properties file 1
cde job run --name myPySparkJob\
--arg MY_DB\
--arg CUSTOMER_TABLE\
--arg propertiesFile_1.conf
Example 2: Run the job with two CLI arguments and read properties file 2
cde job run --name myPySparkJob\
--arg MY_DB\
--arg SALES_TABLE\
--arg propertiesFile_2.conf
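The contents of the two properties files are not shown in this article. Assuming they follow the same [general] layout as the parameters.conf example above, propertiesFile_1.conf might look like this (the values are placeholders):
[general]
property_1: s3a://go01-demo/
property_2: datalake/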
Application code in sparkJob.py:
import sys
from pyspark.sql import SparkSession
import configparser
spark = SparkSession\
.builder\
.appName("PythonSQL")\
.getOrCreate()
print("JOB ARGUMENTS")
print(sys.argv)
print(sys.argv[0])
print(sys.argv[1])
print(sys.argv[2])
print(sys.argv[3])
dbname = sys.argv[1]
tablename = sys.argv[2]
config = configparser.ConfigParser()
config.read('/app/mount/{}'.format(sys.argv[3]))
property_1=config.get("general","property_1")
property_2=config.get("general","property_2")
def myFunction(dbname, tablename, property_1, property_2):
    print("DBNAME\n")
    print(dbname)
    print("TABLENAME\n")
    print(tablename)
    print("PROPERTY1\n")
    print(property_1)
    print("PROPERTY2\n")
    print(property_2)
    print("COMPLETE!\n")
# Print the job arguments and the properties read from the mounted configuration file
myFunction(dbname, tablename, property_1, property_2)
Summary and Next Steps
Cloudera Data Engineering (CDE) provides a command line interface (CLI) client. You can use the CLI to create and update jobs, view job details, manage job resources, run jobs, and more.
In this article we have reviewed some advanced use cases for the CLI. If you are using the CDE CLI you might also find the following articles and demos interesting:
Installing the CDE CLI
Simple Introduction to the CDE CLI
CDE CLI Demo
CDE Concepts
CDE CLI Command Reference
CDE CLI Spark Flag Reference
CDE CLI Airflow Flag Reference
CDE CLI list command syntax reference
CDE Jobs API Reference
11-29-2023
04:52 PM
Cloudera Data Engineering
CDE is the Cloudera Data Engineering Service, a containerized managed service for Cloudera Data Platform designed for Large Scale Batch Pipelines with Spark, Airflow and Iceberg. It allows you to submit batch jobs to auto-scaling virtual clusters. As a Cloud-Native service, CDE enables you to spend more time on your applications, and less time on infrastructure.
CDE allows you to create, manage, and schedule Apache Spark jobs without the overhead of creating and maintaining Spark clusters. With CDE, you define virtual clusters with a range of CPU and memory resources, and the cluster scales up and down as needed to run your Spark workloads, helping to control your cloud costs.
Cloudera Data Engineering (CDE) provides a command line interface (CLI) client. You can use the CLI to create and update jobs, view job details, manage job resources, run jobs, and more.
Requirements
The following are required in order to reproduce these commands in your CDE environment:
A CDE Service on version 1.19.0 or above.
A working installation of the CDE CLI. Please follow these instructions to install the CLI.
Steps
Clone this git repository and run the following commands in the terminal with the CLI.
You can easily list all jobs and job runs:
% cde job list
% cde run list However, that is often impractical if you have a large number of jobs/runs in your Virtual Cluster. Therefore, using filters can be very important. Setup Prior to running the filtering commands, you must set up some jobs and related dependencies. Run the following commands in bulk. To learn more about these, please visit this Cloudera Community Article. % cde resource create --name myScripts \
--type files
% cde resource upload --name myScripts \
--local-path cde_jobs/spark_geospatial.py \
--local-path cde_jobs/utils.py
% cde resource describe --name myScripts
% cde resource create --name myData \
--type files
% cde resource upload-archive --name myData \
--local-path data/ne_50m_admin_0_countries_lakes.zip
cde credential create --name my-docker-creds \
--type docker-basic \
--docker-server hub.docker.com \
--docker-username pauldefusco
cde resource create --name dex-spark-runtime-sedona-geospatial \
--image pauldefusco/dex-spark-runtime-3.2.3-7.2.15.8:1.20.0-b15-sedona-geospatial-003 \
--image-engine spark3 \
--type custom-runtime-image
cde job create --name geospatialRdd \
--type spark \
--mount-1-prefix code/ --mount-1-resource myScripts \
--mount-2-prefix data/ --mount-2-resource myData \
--runtime-image-resource-name dex-spark-runtime-sedona-geospatial \
--packages org.apache.sedona:sedona-spark-shaded-3.0_2.12:1.5.0,org.datasyslab:geotools-wrapper:1.5.0-28.2 \
--application-file code/spark_geospatial.py \
--arg myArg \
--max-executors 4 \
--min-executors 2 \
--executor-cores 2
cde job run --name geospatialRdd --executor-cores 4 Monitoring Examples Filter all jobs by name, where name equals "geospatialRdd": % cde job list --filter 'name[eq]geospatialRdd'
[
{
"name": "geospatialRdd",
"type": "spark",
"created": "2023-11-29T00:59:11Z",
"modified": "2023-11-29T00:59:11Z",
"retentionPolicy": "keep_indefinitely",
"mounts": [
{
"dirPrefix": "data/",
"resourceName": "myData"
},
{
"dirPrefix": "code/",
"resourceName": "myScripts"
}
],
"spark": {
"file": "code/spark_geospatial.py",
"args": [
"myArg"
],
"driverMemory": "1g",
"driverCores": 1,
"executorMemory": "1g",
"executorCores": 2,
"conf": {
"spark.dynamicAllocation.maxExecutors": "4",
"spark.dynamicAllocation.minExecutors": "2",
"spark.jars.packages": "org.apache.sedona:sedona-spark-shaded-3.0_2.12:1.5.0,org.datasyslab:geotools-wrapper:1.5.0-28.2"
}
},
"schedule": {
"enabled": false,
"user": "pauldefusco"
},
"runtimeImageResourceName": "dex-spark-runtime-sedona-geospatial"
}
] You can nest filters. For example, filter all jobs where job application file equals "code/spark_geospatial.py": % cde job list --filter 'spark.file[eq]code/spark_geospatial.py'
[
{
"name": "geospatialRdd",
"type": "spark",
"created": "2023-11-29T00:59:11Z",
"modified": "2023-11-29T00:59:11Z",
"retentionPolicy": "keep_indefinitely",
"mounts": [
{
"dirPrefix": "data/",
"resourceName": "myData"
},
{
"dirPrefix": "code/",
"resourceName": "myScripts"
}
],
"spark": {
"file": "code/spark_geospatial.py",
"args": [
"myArg"
],
"driverMemory": "1g",
"driverCores": 1,
"executorMemory": "1g",
"executorCores": 2,
"conf": {
"spark.dynamicAllocation.maxExecutors": "4",
"spark.dynamicAllocation.minExecutors": "2",
"spark.jars.packages": "org.apache.sedona:sedona-spark-shaded-3.0_2.12:1.5.0,org.datasyslab:geotools-wrapper:1.5.0-28.2"
}
},
"schedule": {
"enabled": false,
"user": "pauldefusco"
},
"runtimeImageResourceName": "dex-spark-runtime-sedona-geospatial"
}
] You can use different operators. For example, search all jobs whose name contains "spark": % cde job list --filter 'name[rlike]spark'
[
{
"name": "sparkxml",
"type": "spark",
"created": "2023-11-23T07:11:41Z",
"modified": "2023-11-23T07:39:32Z",
"retentionPolicy": "keep_indefinitely",
"mounts": [
{
"dirPrefix": "/",
"resourceName": "files"
}
],
"spark": {
"file": "read_xml.py",
"driverMemory": "1g",
"driverCores": 1,
"executorMemory": "1g",
"executorCores": 1,
"conf": {
"dex.safariEnabled": "false",
"spark.jars.packages": "com.databricks:spark-xml_2.12:0.16.0",
"spark.pyspark.python": "python3"
},
"logLevel": "INFO"
},
"schedule": {
"enabled": false,
"user": "pauldefusco"
}
}
] Search all jobs created on or after 11/23/23: % cde job list --filter 'created[gte]2023-11-23'
[
{
"name": "asdfsdfsdfsdf",
"type": "airflow",
"created": "2023-11-23T06:56:45Z",
"modified": "2023-11-23T06:56:45Z",
"retentionPolicy": "keep_indefinitely",
"mounts": [
{
"resourceName": "PipelineResource-asdfsdfsdfsdf-1700722602468"
}
],
"airflow": {
"dagID": "asdfsdfsdfsdf",
"dagFile": "dag.py"
},
"schedule": {
"enabled": false,
"user": "dschoberle",
"start": "Thu, 23 Nov 2023 06:56:44 GMT",
"catchup": true
}
},
{
"name": "geospatialRdd",
"type": "spark",
"created": "2023-11-29T00:59:11Z",
"modified": "2023-11-29T00:59:11Z",
"retentionPolicy": "keep_indefinitely",
"mounts": [
{
"dirPrefix": "data/",
"resourceName": "myData"
},
{
"dirPrefix": "code/",
"resourceName": "myScripts"
}
],
"spark": {
"file": "code/spark_geospatial.py",
"args": [
"myArg"
],
"driverMemory": "1g",
"driverCores": 1,
"executorMemory": "1g",
"executorCores": 2,
"conf": {
"spark.dynamicAllocation.maxExecutors": "4",
"spark.dynamicAllocation.minExecutors": "2",
"spark.jars.packages": "org.apache.sedona:sedona-spark-shaded-3.0_2.12:1.5.0,org.datasyslab:geotools-wrapper:1.5.0-28.2"
}
},
"schedule": {
"enabled": false,
"user": "pauldefusco"
},
"runtimeImageResourceName": "dex-spark-runtime-sedona-geospatial"
}
] Search all jobs with executorCores less than 2: % cde job list --filter 'spark.executorCores[lt]2'
[
{
"name": "CDEPY_SPARK_JOB",
"type": "spark",
"created": "2023-11-14T23:02:48Z",
"modified": "2023-11-14T23:02:48Z",
"retentionPolicy": "keep_indefinitely",
"mounts": [
{
"resourceName": "CDEPY_DEMO"
}
],
"spark": {
"file": "pysparksql.py",
"driverMemory": "1g",
"driverCores": 1,
"executorMemory": "4g",
"executorCores": 1,
"conf": {
"spark.pyspark.python": "python3"
},
"logLevel": "INFO"
},
"schedule": {
"enabled": false,
"user": "pauldefusco"
}
},
{
"name": "CDEPY_SPARK_JOB_APAC",
"type": "spark",
"created": "2023-11-15T03:33:36Z",
"modified": "2023-11-15T03:33:36Z",
"retentionPolicy": "keep_indefinitely",
"mounts": [
{
"resourceName": "CDEPY_DEMO_APAC"
}
],
"spark": {
"file": "pysparksql.py",
"driverMemory": "1g",
"driverCores": 1,
"executorMemory": "4g",
"executorCores": 1,
"conf": {
"spark.pyspark.python": "python3"
},
"logLevel": "INFO"
},
"schedule": {
"enabled": false,
"user": "pauldefusco"
}
}
] List all runs for job "geospatialRdd": % cde run list --filter 'job[eq]geospatialRdd'
[
{
"id": 21815,
"job": "geospatialRdd",
"type": "spark",
"status": "failed",
"user": "pauldefusco",
"started": "2023-11-29T00:32:02Z",
"ended": "2023-11-29T00:32:36Z",
"mounts": [
{
"dirPrefix": "data/",
"resourceName": "myData"
},
{
"dirPrefix": "code/",
"resourceName": "myScripts"
}
],
"runtimeImageResourceName": "dex-spark-runtime-sedona-geospatial",
"spark": {
"sparkAppID": "spark-f542530da24f485da4993338dca81d3c",
"sparkAppURL": "https://58kqsms2.cde-g6hpr9f8.go01-dem.ylcu-atmi.cloudera.site/hs/history/spark-f542530da24f485da4993338dca81d3c/jobs/",
"spec": {
"file": "code/geospatial_rdd.py",
"args": [
"myArg"
],
"driverMemory": "1g",
"driverCores": 1,
"executorMemory": "1g",
"executorCores": 4,
"conf": {
"spark.dynamicAllocation.maxExecutors": "4",
"spark.dynamicAllocation.minExecutors": "2",
"spark.jars.packages": "org.apache.sedona:sedona-spark-shaded-3.0_2.12:1.5.0,org.datasyslab:geotools-wrapper:1.5.0-28.2"
}
}
},
"identity": {
"disableRoleProxy": true,
"role": "instance"
}
},
{
"id": 21825,
"job": "geospatialRdd",
"type": "spark",
"status": "failed",
"user": "pauldefusco",
"started": "2023-11-29T00:48:29Z",
"ended": "2023-11-29T00:49:01Z",
"mounts": [
{
"dirPrefix": "data/",
"resourceName": "myData"
},
{
"dirPrefix": "code/",
"resourceName": "myScripts"
}
],
"runtimeImageResourceName": "dex-spark-runtime-sedona-geospatial",
"spark": {
"sparkAppID": "spark-e5460856fb3a459ba7ee2c748c802d07",
"sparkAppURL": "https://58kqsms2.cde-g6hpr9f8.go01-dem.ylcu-atmi.cloudera.site/hs/history/spark-e5460856fb3a459ba7ee2c748c802d07/jobs/",
"spec": {
"file": "myScripts/geospatial_rdd.py",
"args": [
"myArg"
],
"driverMemory": "1g",
"driverCores": 1,
"executorMemory": "1g",
"executorCores": 2,
"conf": {
"spark.dynamicAllocation.maxExecutors": "4",
"spark.dynamicAllocation.minExecutors": "2",
"spark.jars.packages": "org.apache.sedona:sedona-spark-shaded-3.0_2.12:1.5.0,org.datasyslab:geotools-wrapper:1.5.0-28.2"
}
}
},
"identity": {
"disableRoleProxy": true,
"role": "instance"
}
}
] You can combine multiple filters. For example, return all job runs from today (11/29/23), i.e. where the start date is greater than or equal to 11/29 and the end date is less than or equal to 11/30. Notice that all times default to the UTC (+00:00) timezone. % cde run list --filter 'started[gte]2023-11-29' --filter 'ended[lte]2023-11-30'
[
{
"id": 21907,
"job": "ge_data_quality-pauldefusco-banking",
"type": "spark",
"status": "succeeded",
"user": "pauldefusco",
"started": "2023-11-29T02:56:44Z",
"ended": "2023-11-29T02:57:46Z",
"mounts": [
{
"dirPrefix": "/",
"resourceName": "cde_demo_files-pauldefusco-banking"
}
],
"runtimeImageResourceName": "dex-spark-runtime-ge-data-quality-pauldefusco-banking",
"spark": {
"sparkAppID": "spark-8f9d7999056f4b53a01cc2afc5304cca",
"sparkAppURL": "https://58kqsms2.cde-g6hpr9f8.go01-dem.ylcu-atmi.cloudera.site/hs/history/spark-8f9d7999056f4b53a01cc2afc5304cca/jobs/",
"spec": {
"file": "ge_data_quality.py",
"driverMemory": "1g",
"driverCores": 1,
"executorMemory": "1g",
"executorCores": 1
}
},
"identity": {
"disableRoleProxy": true,
"role": "instance"
}
},
{
"id": 21908,
"job": "data_quality_orchestration-pauldefusco-banking",
"type": "airflow",
"status": "running",
"user": "pauldefusco",
"started": "2023-11-29T03:00:01Z",
"ended": "0001-01-01T00:00:00Z",
"airflow": {
"dagID": "CDE_Demo_pauldefusco-banking",
"dagRunID": "scheduled__2023-11-29T02:55:00+00:00",
"dagFile": "airflow.py",
"executionDate": "2023-11-29T02:55:00Z"
}
},
{
"id": 21909,
"job": "batch_load-pauldefusco-banking",
"type": "spark",
"status": "running",
"user": "pauldefusco",
"started": "2023-11-29T03:00:14Z",
"ended": "0001-01-01T00:00:00Z",
"mounts": [
{
"dirPrefix": "jobCode/",
"resourceName": "cde_demo_files-pauldefusco-banking"
}
],
"runtimeImageResourceName": "dex-spark-runtime-ge-data-quality-pauldefusco-banking",
"spark": {
"sparkAppID": "spark-3d8a4704418841929d325af0e0190a20",
"sparkAppURL": "https://58kqsms2.cde-g6hpr9f8.go01-dem.ylcu-atmi.cloudera.site/livy-batch-14907-dyL7LLeM",
"spec": {
"file": "jobCode/batch_load.py",
"driverMemory": "1g",
"driverCores": 1,
"executorMemory": "1g",
"executorCores": 1
}
},
"identity": {
"disableRoleProxy": true,
"role": "instance"
}
}
] List all successful airflow jobs created by user pauldefusco that started after 3 am UTC on 11/29/23: % cde run list --filter 'type[eq]airflow' --filter 'status[eq]succeeded' --filter 'user[eq]pauldefusco' --filter 'started[gte]2023-11-29T03'
[
{
"id": 21908,
"job": "data_quality_orchestration-pauldefusco-banking",
"type": "airflow",
"status": "succeeded",
"user": "pauldefusco",
"started": "2023-11-29T03:00:01Z",
"ended": "2023-11-29T03:03:01Z",
"airflow": {
"dagID": "CDE_Demo_pauldefusco-banking",
"dagRunID": "scheduled__2023-11-29T02:55:00+00:00",
"dagFile": "airflow.py",
"executionDate": "2023-11-29T02:55:00Z"
}
}
] Listing all CDE Resources returns all types ("python-env", "files", "custom-runtime-image"): % cde resource list
[
{
"name": "BankingPyEnv",
"type": "python-env",
"status": "pending-build",
"created": "2023-11-07T21:27:16Z",
"modified": "2023-11-07T21:27:16Z",
"retentionPolicy": "keep_indefinitely",
"pythonEnv": {
"pythonVersion": "python3",
"type": "python-env"
}
},
{
"name": "CDEPY_DEMO_APAC",
"type": "files",
"status": "ready",
"signature": "5d216f3c4a10578ffadba415b13022d9e383bc22",
"created": "2023-11-15T03:33:36Z",
"modified": "2023-11-15T03:33:36Z",
"retentionPolicy": "keep_indefinitely"
},
{
"name": "dex-spark-runtime-sedona-geospatial",
"type": "custom-runtime-image",
"status": "ready",
"created": "2023-11-28T23:51:11Z",
"modified": "2023-11-28T23:51:11Z",
"retentionPolicy": "keep_indefinitely",
"customRuntimeImage": {
"engine": "spark3",
"image": "pauldefusco/dex-spark-runtime-3.2.3-7.2.15.8:1.20.0-b15-sedona-geospatial-003"
}
}
] List all CDE Resources named "myScripts": % cde resource list --filter 'name[eq]myScripts'
[
{
"name": "myScripts",
"type": "files",
"status": "ready",
"signature": "17f820aacdad9bbd17a24d78a5b93cd0ec9e467b",
"created": "2023-11-28T23:31:31Z",
"modified": "2023-11-29T01:48:12Z",
"retentionPolicy": "keep_indefinitely"
}
] List all CDE Resources of type Python Environment: % cde resource list --filter 'type[eq]python-env'
[
{
"name": "BankingPyEnv",
"type": "python-env",
"status": "pending-build",
"created": "2023-11-07T21:27:16Z",
"modified": "2023-11-07T21:27:16Z",
"retentionPolicy": "keep_indefinitely",
"pythonEnv": {
"pythonVersion": "python3",
"type": "python-env"
}
}
] Summary and Next Steps Cloudera Data Engineering (CDE) provides a command line interface (CLI) client. You can use the CLI to create and update jobs, view job details, manage job resources, run jobs, and so on. In this article we have reviewed some advanced use cases for the CLI. If you are using the CDE CLI you might also find the following articles and demos interesting: Installing the CDE CLI Simple Introduction to the CDE CLI CDE CLI Demo CDE Concepts CDE CLI Command Reference CDE CLI Spark Flag Reference CDE CLI Airflow Flag Reference CDE CLI list command syntax reference CDE Jobs API Reference
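As a closing note, every cde job list and cde run list command shown above prints plain JSON, so the same filters can also be driven from a script. A minimal sketch using Python's standard library, assuming the cde CLI is installed locally and configured so that it does not prompt for a password (the job name is reused from the examples above):
import json
import subprocess

# Run a filtered listing and parse the JSON that the CLI prints to stdout
result = subprocess.run(
    ["cde", "job", "list", "--filter", "name[eq]geospatialRdd"],
    capture_output=True, text=True, check=True,
)

for job in json.loads(result.stdout):
    # Field names match the CLI output shown in the examples above
    print(job["name"], job["type"], job["created"])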
... View more
11-29-2023
04:42 PM
2 Kudos
Cloudera Data Engineering CDE is the Cloudera Data Engineering Service, a containerized managed service for Cloudera Data Platform designed for Large Scale Batch Pipelines with Spark, Airflow and Iceberg. It allows you to submit batch jobs to auto-scaling virtual clusters. As a Cloud-Native service, CDE enables you to spend more time on your applications, and less time on infrastructure. CDE allows you to create, manage, and schedule Apache Spark jobs without the overhead of creating and maintaining Spark clusters. With CDE, you define virtual clusters with a range of CPU and memory resources, and the cluster scales up and down as needed to run your Spark workloads, helping to control your cloud costs. Cloudera Data Engineering (CDE) provides a command line interface (CLI) client. You can use the CLI to create and update jobs, view job details, manage job resources, run jobs, and more. Apache Iceberg Apache Iceberg is a cloud-native, high-performance open table format for organizing petabyte-scale analytic datasets on a file system or object store. Combined with Cloudera Data Platform (CDP), users can build an open data lakehouse architecture for multi-function analytics and to deploy large scale end-to-end pipelines. Open data lakehouse on CDP simplifies advanced analytics on all data with a unified platform for structured and unstructured data and integrated data services to enable any analytics use case from ML, BI to stream analytics and real-time analytics. Apache Iceberg is the secret sauce of the open lakehouse. CDE Sessions A Cloudera Data Engineering (CDE) Session is an interactive short-lived development environment for running Spark commands to help you iterate upon and build your Spark workloads. You can use CDE Sessions in CDE Virtual Clusters of type "All Purpose - Tier 2". The following commands illustrate a basic Iceberg Time Travel Example. Requirements The following are required in order to reproduce these commands in your CDE environment: A CDE Service on version 1.19.0 or above. A working installation of the CDE CLI. Please follow these instructions to install the CLI. Steps Clone this git repository and run the following commands in your terminal. Create the Session: % cde session create --name interactiveSession \
--type pyspark \
--executor-cores 2 \
--executor-memory "2g"
{
"name": "interactiveSession",
"creator": "pauldefusco",
"created": "2023-11-28T22:00:47Z",
"type": "pyspark",
"lastStateUpdated": "2023-11-28T22:00:47Z",
"state": "starting",
"interactiveSpark": {
"id": 5,
"driverCores": 1,
"executorCores": 2,
"driverMemory": "1g",
"executorMemory": "2g",
"numExecutors": 1
}
} Show session metadata: % cde session describe --name interactiveSession
{
"name": "interactiveSession",
"creator": "pauldefusco",
"created": "2023-11-28T22:00:47Z",
"type": "pyspark",
"lastStateUpdated": "2023-11-28T22:01:16Z",
"state": "available",
"interactiveSpark": {
"id": 5,
"appId": "spark-3fe3bd8905a04eef8805e6b973ec4289",
"driverCores": 1,
"executorCores": 2,
"driverMemory": "1g",
"executorMemory": "2g",
"numExecutors": 1
}
} Interact via the PySpark Shell from your terminal (the session is running in CDE): % cde session interact --name interactiveSession
Starting REPL...
Waiting for the session to go into an available state...
Connected to Cloudera Data Engineering...
Press Ctrl+D (i.e. EOF) to exit
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\
/_/
Type in expressions to have them evaluated.
>>> Run some basic Spark SQL operations: from pyspark.sql.types import Row, StructField, StructType, StringType, IntegerType
rows = [Row(name="John", age=19), Row(name="Smith", age=23), Row(name="Sarah", age=18)]
some_df = spark.createDataFrame(rows)
some_df.printSchema()
>>> from pyspark.sql.types import Row, StructField, StructType, StringType, IntegerType
>>> rows = [Row(name="John", age=19), Row(name="Smith", age=23), Row(name="Sarah", age=18)]
>>> some_df = spark.createDataFrame(rows)
>>> some_df.printSchema()
root
|-- name: string (nullable = true)
|-- age: long (nullable = true)
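As a side note, the StructType and StructField imports above are only needed if you want to supply an explicit schema rather than letting Spark infer one from the Row objects. A minimal sketch that could be typed into the same session (the column names reuse the rows defined above):
# Explicit schema built from the types imported earlier; age becomes int instead of inferred long
schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
])

typed_df = spark.createDataFrame(rows, schema)
typed_df.printSchema()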
>>> Notice that we didn't need to create a Spark Session. The Spark Context is already running inside the CDE Session. Here are all the confs the session is running with. Notice that Iceberg dependencies have already been accounted for: >>> def printConfs(confs):
for ele1,ele2 in confs:
print("{:<14}{:<11}".format(ele1,ele2))
>>> confs = spark.sparkContext.getConf().getAll()
>>> printConfs(confs)
spark.eventLog.enabledtrue
spark.driver.hostinteractivesession-b7d65d8c1d6005a9-driver-svc.dex-app-58kqsms2.svc
spark.kubernetes.executor.annotation.created-bylivy
spark.kubernetes.memoryOverheadFactor0.1
spark.sql.catalog.spark_catalogorg.apache.iceberg.spark.SparkSessionCatalog
spark.kubernetes.container.imagecontainer.repository.cloudera.com/cloudera/dex/dex-livy-runtime-3.3.0-7.2.16.3:1.19.3-b29
spark.kubernetes.executor.label.nameexecutor
spark.kubernetes.driver.connectionTimeout60000
spark.hadoop.yarn.resourcemanager.principalpauldefusco
...
spark.yarn.isPythontrue
spark.kubernetes.submission.connectionTimeout60000
spark.kryo.registrationRequiredfalse
spark.sql.catalog.spark_catalog.typehive
spark.kubernetes.driver.pod.nameinteractivesession-b7d65d8c1d6005a9-driver Load a CSV file from Cloud Storage: >>> cloudPath = "s3a://go01-demo/datalake/pdefusco/cde119_workshop"
>>> car_installs = spark.read.csv(cloudPath + "/car_installs_119.csv", header=True, inferSchema=True)
>>> car_installs.show()
+-----+-----+----------------+--------------------+
| id|model| VIN| serial_no|
+-----+-----+----------------+--------------------+
|16413| D|433248UCGTTV245J|5600942CL3R015666...|
|16414| D|404328UCGTTV965J|204542CL4R0156661...|
|16415| B|647168UCGTTV8Z5J|6302942CL2R015666...|
|16416| B|454608UCGTTV7H5J|4853942CL1R015666...|
|16417| D|529408UCGTTV6R5J|2428342CL9R015666...|
|16418| B|362858UCGTTV7A5J|903142CL2R0156661...|
|16419| E|609158UCGTTV245J|3804142CL7R015666...|
|16420| D| 8478UCGTTV825J|6135442CL7R015666...|
|16421| B|539488UCGTTV4R5J|306642CL6R0156661...|
|16422| B|190928UCGTTV6A5J|5466242CL1R015666...|
|16423| B|316268UCGTTV4M5J|4244342CL5R015666...|
|16424| B|298898UCGTTV3Y5J|3865742CL4R015666...|
|16425| B| 28688UCGTTV9T5J|6328542CL5R015666...|
|16426| D|494858UCGTTV295J|463642CL5R0156661...|
|16427| D|503338UCGTTV5Y5J|4358642CL2R015666...|
|16428| D|167128UCGTTV2H5J|3809342CL1R015666...|
|16429| D|547178UCGTTV7M5J|2768042CL3R015666...|
|16430| B|503998UCGTTV4Q5J|2568142CL6R015666...|
|16431| D|433998UCGTTV9Y5J|6338642CL6R015666...|
|16432| B|378548UCGTTV7V5J|2648942CL1R015666...|
+-----+-----+----------------+--------------------+ Create a Hive Managed Table with Spark: >>> username = "pauldefusco"
>>> spark.sql("DROP DATABASE IF EXISTS MYDB_{} CASCADE".format(username))
>>> spark.sql("CREATE DATABASE IF NOT EXISTS MYDB_{}".format(username))
>>> car_installs.write.mode("overwrite").saveAsTable('MYDB_{0}.CAR_INSTALLS_{0}'.format(username), format="parquet") Migrate the table to Iceberg Table Format: spark.sql("ALTER TABLE MYDB_{0}.CAR_INSTALLS_{0} UNSET TBLPROPERTIES ('TRANSLATED_TO_EXTERNAL')".format(username))
spark.sql("CALL spark_catalog.system.migrate('MYDB_{0}.CAR_INSTALLS_{0}')".format(username)) You can query Iceberg Metadata tables to track Iceberg Snapshots, History, Partitions, etc: >>> spark.read.format("iceberg").load("spark_catalog.MYDB_{0}.CAR_INSTALLS_{0}.history".format(username)).show(20, False)
+-----------------------+-------------------+---------+-------------------+
|made_current_at |snapshot_id |parent_id|is_current_ancestor|
+-----------------------+-------------------+---------+-------------------+
|2023-11-29 23:58:43.427|6191572403226489858|null |true |
+-----------------------+-------------------+---------+-------------------+
>>> spark.read.format("iceberg").load("spark_catalog.MYDB_{0}.CAR_INSTALLS_{0}.snapshots".format(username)).show(20, False)
+-----------------------+-------------------+---------+---------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|committed_at |snapshot_id |parent_id|operation|manifest_list |summary |
+-----------------------+-------------------+---------+---------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|2023-11-29 23:58:43.427|6191572403226489858|null |append |s3a://go01-demo/warehouse/tablespace/external/hive/mydb_pauldefusco.db/car_installs_pauldefusco/metadata/snap-6191572403226489858-1-bf191e06-38cd-4d6e-9757-b8762c999177.avro|{added-data-files -> 2, added-records -> 82066, added-files-size -> 1825400, changed-partition-count -> 1, total-records -> 82066, total-files-size -> 1825400, total-data-files -> 2, total-delete-files -> 0, total-position-deletes -> 0, total-equality-deletes -> 0}|
+-----------------------+-------------------+---------+---------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ Insert some data. Notice that Iceberg provides a PySpark API to create, append, and overwrite data in an Iceberg table from a Spark Dataframe. In this case we will append some data that we sample from the same table: # PRE-INSERT TIMESTAMP
>>> from datetime import datetime
>>> now = datetime.now()
>>> timestamp = datetime.timestamp(now)
>>> print("PRE-INSERT TIMESTAMP: ", timestamp)
PRE-INSERT TIMESTAMP: 1701302029.338524
# PRE-INSERT COUNT
>>> spark.sql("SELECT COUNT(*) FROM spark_catalog.MYDB_{0}.CAR_INSTALLS_{0}".format(username)).show()
+--------+
|count(1)|
+--------+
| 82066|
+--------+
>>> temp_df = spark.sql("SELECT * FROM spark_catalog.MYDB_{0}.CAR_INSTALLS_{0}".format(username)).sample(fraction=0.1, seed=3)
>>> temp_df.writeTo("spark_catalog.MYDB_{0}.CAR_INSTALLS_{0}".format(username)).append() Check the new count after the insert: # POST-INSERT COUNT
>>> spark.sql("SELECT COUNT(*) FROM spark_catalog.MYDB_{0}.CAR_INSTALLS_{0}".format(username)).show()
+--------+
|count(1)|
+--------+
| 90276|
+--------+ Notice that the table history and snapshots have been updated: >>> spark.sql("SELECT * FROM spark_catalog.MYDB_{0}.CAR_INSTALLS_{0}.history".format(username)).show(20, False)
+-----------------------+-------------------+-------------------+-------------------+
|made_current_at |snapshot_id |parent_id |is_current_ancestor|
+-----------------------+-------------------+-------------------+-------------------+
|2023-11-29 23:58:43.427|6191572403226489858|null |true |
|2023-11-30 00:00:15.263|1032812961485886468|6191572403226489858|true |
+-----------------------+-------------------+-------------------+-------------------+
>>> spark.sql("SELECT * FROM spark_catalog.MYDB_{0}.CAR_INSTALLS_{0}.snapshots".format(username)).show(20, False)
+-----------------------+-------------------+-------------------+---------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|committed_at |snapshot_id |parent_id |operation|manifest_list |summary |
+-----------------------+-------------------+-------------------+---------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|2023-11-29 23:58:43.427|6191572403226489858|null |append |s3a://go01-demo/warehouse/tablespace/external/hive/mydb_pauldefusco.db/car_installs_pauldefusco/metadata/snap-6191572403226489858-1-bf191e06-38cd-4d6e-9757-b8762c999177.avro|{added-data-files -> 2, added-records -> 82066, added-files-size -> 1825400, changed-partition-count -> 1, total-records -> 82066, total-files-size -> 1825400, total-data-files -> 2, total-delete-files -> 0, total-position-deletes -> 0, total-equality-deletes -> 0} |
|2023-11-30 00:00:15.263|1032812961485886468|6191572403226489858|append |s3a://go01-demo/warehouse/tablespace/external/hive/mydb_pauldefusco.db/car_installs_pauldefusco/metadata/snap-1032812961485886468-1-142965b8-67ea-4b53-b76d-558ab5e74e1f.avro|{spark.app.id -> spark-93d1909a680948fea5303b55986704ac, added-data-files -> 1, added-records -> 8210, added-files-size -> 183954, changed-partition-count -> 1, total-records -> 90276, total-files-size -> 2009354, total-data-files -> 3, total-delete-files -> 0, total-position-deletes -> 0, total-equality-deletes -> 0}|
+-----------------------+-------------------+-------------------+---------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ Time travel to pre-insert table state: # TIME TRAVEL AS OF PREVIOUS TIMESTAMP
>>> df = spark.read.option("as-of-timestamp", int(timestamp*1000)).format("iceberg").load("spark_catalog.MYDB_{0}.CAR_INSTALLS_{0}".format(username))
# POST TIME TRAVEL COUNT
>>> print(df.count())
82066 Finally, drop the database: >>> spark.sql("DROP DATABASE IF EXISTS MYDB_{} CASCADE".format(username)) Exit the Spark Shell (Ctrl+D). List commands that were run in the session. Notice that this could be a lot, so the example below only includes a few initial commands. % cde session statements --name interactiveSession
+--------------------------------+------------------------------------------+
| CODE | OUTPUT |
+--------------------------------+------------------------------------------+
| print("hello Spark") | hello Spark |
+--------------------------------+------------------------------------------+
| from pyspark.sql.types import | |
| Row, StructField, StructType, | |
| StringType, IntegerType | |
+--------------------------------+------------------------------------------+
| rows = [Row(name="John", | |
| age=19), Row(name="Smith", | |
| age=23), Row(name="Sarah", | |
| age=18)] | |
+--------------------------------+------------------------------------------+
| some_df = | |
| spark.createDataFrame(rows) | |
+--------------------------------+------------------------------------------+
| some_df.printSchema() | root |-- name: string |
| | (nullable = true) |-- age: |
| | long (nullable = true) |
+--------------------------------+------------------------------------------+ List all sessions: % cde session list
+---------------------------+-----------+---------+-------------+----------------------+----------------------+-------------+
| NAME | STATE | TYPE | DESCRIPTION | CREATED | LAST UPDATED | CREATOR |
+---------------------------+-----------+---------+-------------+----------------------+----------------------+-------------+
| francetemp | killed | pyspark | | 2023-11-16T15:59:35Z | 2023-11-16T16:02:16Z | jmarchand |
| IcebergSession | available | pyspark | | 2023-11-29T21:24:27Z | 2023-11-29T21:56:56Z | pauldefusco |
| interactiveSession | killed | pyspark | | 2023-11-28T22:00:47Z | 2023-11-28T22:01:16Z | pauldefusco |
| interactiveSessionIceberg | available | pyspark | | 2023-11-29T23:17:58Z | 2023-11-29T23:56:06Z | pauldefusco |
| myNewSession | killed | pyspark | | 2023-11-28T21:58:38Z | 2023-11-28T21:59:06Z | pauldefusco |
| mySparkSession | killed | pyspark | | 2023-11-28T21:44:30Z | 2023-11-28T21:45:01Z | pauldefusco |
| TA-demo | killed | pyspark | | 2023-11-13T10:12:12Z | 2023-11-13T10:13:41Z | glivni |
+---------------------------+-----------+---------+-------------+----------------------+----------------------+-------------+ Kill session: % cde session kill --name interactiveSession Summary and Next Steps Cloudera Data Engineering (CDE) provides a command line interface (CLI) client. You can use the CLI to create and update jobs, view job details, manage job resources, run jobs, and so on. In this article we have reviewed some advanced use cases for the CLI. If you are using the CDE CLI you might also find the following articles and demos interesting: Installing the CDE CLI Simple Introduction to the CDE CLI CDE CLI Demo CDE Concepts CDE CLI Command Reference CDE CLI Spark Flag Reference CDE CLI Airflow Flag Reference CDE CLI list command syntax reference CDE Jobs API Reference
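A closing note on the time travel example above: in addition to timestamp-based reads, Iceberg's Spark integration also supports pinning a read to a specific snapshot ID. A minimal sketch that would need to run inside the same session, before the database is dropped; the snapshot ID is copied from the snapshots output above:
# Snapshot ID taken from the snapshots metadata table shown earlier in this article
snapshot_id = 6191572403226489858

# Pin the read to that snapshot instead of a timestamp
df_at_snapshot = spark.read \
    .option("snapshot-id", snapshot_id) \
    .format("iceberg") \
    .load("spark_catalog.MYDB_{0}.CAR_INSTALLS_{0}".format(username))

print(df_at_snapshot.count())  # row count as of that snapshot (82066 above)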
... View more
11-29-2023
04:34 PM
Cloudera Data Engineering CDE is the Cloudera Data Engineering Service, a containerized managed service for Cloudera Data Platform designed for Large Scale Batch Pipelines with Spark, Airflow and Iceberg. It allows you to submit batch jobs to auto-scaling virtual clusters. As a Cloud-Native service, CDE enables you to spend more time on your applications, and less time on infrastructure. CDE allows you to create, manage, and schedule Apache Spark jobs without the overhead of creating and maintaining Spark clusters. With CDE, you define virtual clusters with a range of CPU and memory resources, and the cluster scales up and down as needed to run your Spark workloads, helping to control your cloud costs. Cloudera Data Engineering (CDE) provides a command line interface (CLI) client. You can use the CLI to create and update jobs, view job details, manage job resources, run jobs, and more. CDE Files Resources A resource in Cloudera Data Engineering (CDE) is a named collection of files used by a job. Resources can include application code, configuration files, custom Docker images, and Python virtual environment specifications. Resources are associated with virtual clusters. A resource can be used by multiple jobs, and jobs can use multiple resources. The resource types supported by CDE are files, python-env, and custom-runtime-image. In this article we will walk through some useful commands for operating with CDE Files Resources efficiently. Requirements The following are required in order to reproduce these commands in your CDE environment: A CDE Service on version 1.19.0 or above. A working installation of the CDE CLI. Please follow these instructions to install the CLI. Steps Clone this git repository to your local machine. Then run the following commands: Create a Files Resource: % cde resource create --name myScripts \
--type files Upload multiple files to the same Files Resource: % cde resource upload --name myScripts \
--local-path cde_jobs/spark_geospatial.py \
--local-path cde_jobs/utils.py
3.5KB/3.5KB 100% [==============================================] spark_geospatial.py
4.0KB/4.0KB 100% [==============================================] utils.py Describe Files resource: % cde resource describe --name myScripts
{
"name": "myScripts",
"type": "files",
"status": "ready",
"signature": "08531cbe538858eb20bda5ff1b7567ae4623d885",
"created": "2023-11-28T23:31:31Z",
"modified": "2023-11-28T23:33:32Z",
"retentionPolicy": "keep_indefinitely",
"files": [
{
"path": "spark_geospatial.py",
"signature": "ec91fb5bddfcd16a0bcbe344f229b5e326b759c5",
"sizeBytes": 3529,
"created": "2023-11-28T23:33:32Z",
"modified": "2023-11-28T23:33:32Z"
},
{
"path": "utils.py",
"signature": "aa5a8ea4b4f240183da8bd2d2b354eeaa58fd97a",
"sizeBytes": 3996,
"created": "2023-11-28T23:33:32Z",
"modified": "2023-11-28T23:33:32Z"
}
]
} Create a Files Resource for data: % cde resource create --name myData \
--type files Upload an archive file to the resource: % cde resource upload-archive --name myData \
--local-path data/ne_50m_admin_0_countries_lakes.zip
817.5KB/817.5KB 100% [==============================================] ne_50m_admin_0_countries_lakes.zip Describe resource metadata. Notice that the archive has been unarchived for you: % cde resource describe --name myData
{
"name": "myData",
"type": "files",
"status": "ready",
"signature": "d552dff8fb80a0c7067afa1c4227b29010cce67b",
"created": "2023-11-28T23:35:43Z",
"modified": "2023-11-28T23:36:56Z",
"retentionPolicy": "keep_indefinitely",
"files": [
{
"path": "ne_50m_admin_0_countries_lakes.cpg",
"signature": "663b90c899fa25a111067be0c22ffc64dcf581c2",
"sizeBytes": 5,
"created": "2023-11-28T23:36:55Z",
"modified": "2023-11-28T23:36:55Z"
},
{
"path": "ne_50m_admin_0_countries_lakes.dbf",
"signature": "eec48a122399782bbef02aa8108e99aeaf52e506",
"sizeBytes": 786828,
"created": "2023-11-28T23:36:56Z",
"modified": "2023-11-28T23:36:56Z"
},
{
"path": "ne_50m_admin_0_countries_lakes.prj",
"signature": "308d6355be935e0f7853161b1adda5bcd48188ff",
"sizeBytes": 143,
"created": "2023-11-28T23:36:56Z",
"modified": "2023-11-28T23:36:56Z"
},
{
"path": "ne_50m_admin_0_countries_lakes.README.html",
"signature": "4bec87fbbe5f4e0e18edb3d6a4f10e9e2a705581",
"sizeBytes": 38988,
"created": "2023-11-28T23:36:55Z",
"modified": "2023-11-28T23:36:55Z"
},
{
"path": "ne_50m_admin_0_countries_lakes.shp",
"signature": "57c38f48c5234db925a9fb1b31785250bd7c8d86",
"sizeBytes": 1652200,
"created": "2023-11-28T23:36:56Z",
"modified": "2023-11-28T23:36:56Z"
},
{
"path": "ne_50m_admin_0_countries_lakes.shx",
"signature": "983eba7b34cf94b0cfe8bda8b2b7d533bd233c49",
"sizeBytes": 2036,
"created": "2023-11-28T23:36:56Z",
"modified": "2023-11-28T23:36:56Z"
},
{
"path": "ne_50m_admin_0_countries_lakes.VERSION.txt",
"signature": "9e3d18e5216a7b4dfd402b00a25ee794842b481b",
"sizeBytes": 7,
"created": "2023-11-28T23:36:55Z",
"modified": "2023-11-28T23:36:55Z"
}
]
} Create CDE Credentials for accessing your Docker repository: cde credential create --name my-docker-creds \
--type docker-basic \
--docker-server hub.docker.com \
--docker-username pauldefusco Create a CDE Custom Docker Runtime Resource: cde resource create --name dex-spark-runtime-sedona-geospatial \
--image pauldefusco/dex-spark-runtime-3.2.3-7.2.15.8:1.20.0-b15-sedona-geospatial-003 \
--image-engine spark3 \
--type custom-runtime-image Notice that the custom image has already been created. If you want to learn more about how to create Custom Docker Resources, please visit this Cloudera Community Article. Finally, create a job leveraging all three resources above: cde job create --name geospatialRdd \
--type spark \
--mount-1-prefix code/ --mount-1-resource myScripts \
--mount-2-prefix data/ --mount-2-resource myData \
--runtime-image-resource-name dex-spark-runtime-sedona-geospatial \
--packages org.apache.sedona:sedona-spark-shaded-3.0_2.12:1.5.0,org.datasyslab:geotools-wrapper:1.5.0-28.2 \
--application-file code/spark_geospatial.py \
--arg myArg \
--max-executors 4 \
--min-executors 2 \
--executor-cores 2 Notice the following: When using multiple Files resources you should prefix each, i.e. "data" and "code". The "data" prefix is used at line 82 ("/app/mount/data") in the "spark_geospatial.py" script in order to access the data from the resource. The "code" prefix is used in the same CLI command in the application file argument. If you are using any Spark packages you can set these directly at job creation. You can pass one or multiple arguments to the Python script via the --arg argument. The arguments are referenced in the script with the "sys.argv" syntax e.g. line 60 in "geospatial_rdd.py". Run the job: cde job run --name geospatialRdd --executor-cores 4 Notice that at runtime, you can override Spark configs that were set at job creation. For example, the "--executor-cores" was originally set to 2 and is now overridden to 4. List job runs filtering by job name: % cde run list --filter 'job[eq]geospatialRdd'
[
{
"id": 21815,
"job": "geospatialRdd",
"type": "spark",
"status": "failed",
"user": "pauldefusco",
"started": "2023-11-29T00:32:02Z",
"ended": "2023-11-29T00:32:36Z",
"mounts": [
{
"dirPrefix": "data/",
"resourceName": "myData"
},
{
"dirPrefix": "code/",
"resourceName": "myScripts"
}
],
"runtimeImageResourceName": "dex-spark-runtime-sedona-geospatial",
"spark": {
"sparkAppID": "spark-f542530da24f485da4993338dca81d3c",
"sparkAppURL": "https://58kqsms2.cde-g6hpr9f8.go01-dem.ylcu-atmi.cloudera.site/hs/history/spark-f542530da24f485da4993338dca81d3c/jobs/",
"spec": {
"file": "code/geospatial_rdd.py",
"args": [
"myArg"
],
"driverMemory": "1g",
"driverCores": 1,
"executorMemory": "1g",
"executorCores": 4,
"conf": {
"spark.dynamicAllocation.maxExecutors": "4",
"spark.dynamicAllocation.minExecutors": "2",
"spark.jars.packages": "org.apache.sedona:sedona-spark-shaded-3.0_2.12:1.5.0,org.datasyslab:geotools-wrapper:1.5.0-28.2"
}
}
},
"identity": {
"disableRoleProxy": true,
"role": "instance"
}
}
] Open Spark UI for respective run: % cde run ui --id 21815 You can delete single files from resources: % cde resource delete-file --name myScripts --resource-path utils.py You can delete the resource: % cde resource delete --name myScripts Summary and Next Steps Cloudera Data Engineering (CDE) provides a command line interface (CLI) client. You can use the CLI to create and update jobs, view job details, manage job resources, run jobs, and so on. In this article we have reviewed some advanced use cases for the CLI. If you are using the CDE CLI you might also find the following articles and demos interesting: Installing the CDE CLI Simple Introduction to the CDE CLI CDE CLI Demo CDE Concepts CDE CLI Command Reference CDE CLI Spark Flag Reference CDE CLI Airflow Flag Reference CDE CLI list command syntax reference CDE Jobs API Reference
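As a closing illustration of the mount prefixes discussed above, this is roughly how a job script can locate the two mounted resources and read the CLI argument. A minimal sketch; the /app/mount paths follow the code/ and data/ prefixes used in this article, and the shapefile name comes from the myData resource listing (treat the exact file layout as an assumption):
import os
import sys

# CDE mounts each Files resource under /app/mount/<prefix>/
CODE_DIR = "/app/mount/code"
DATA_DIR = "/app/mount/data"

# First CLI argument passed with --arg ("myArg" in the job definition above)
my_arg = sys.argv[1]

# Shapefile extracted from the archive uploaded to the myData resource
shapefile = os.path.join(DATA_DIR, "ne_50m_admin_0_countries_lakes.shp")

print("Argument:", my_arg)
print("Shapefile present:", os.path.exists(shapefile))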
... View more