04-03-2025
06:18 PM
Objective
In this article, you will create a CDE Spark Job with dependencies in multiple CDE Repositories.
On CDE and the CLI
CDE is the Cloudera Data Engineering Service, a containerized managed service for Cloudera Data Platform designed for large-scale batch and streaming pipelines with Spark, Airflow, and Iceberg. It allows you to submit batch jobs to auto-scaling virtual clusters. As a cloud-native service, CDE enables you to spend more time on your applications and less on infrastructure.
CDE allows you to create, manage, and schedule Apache Spark jobs without the overhead of creating and maintaining Spark clusters. With CDE, you define virtual clusters with a range of CPU and memory resources, and the cluster scales up and down as needed to run your Spark workloads, helping to control your cloud costs.
CDE provides a command-line interface (CLI) client. You can use the CLI to create and update jobs, view job details, manage job resources, run jobs, etc.
Requirements
The following are required to reproduce these commands in your CDE environment:
A CDE Service on version 1.23.
A working installation of the CDE CLI. Please follow these instructions to install the CLI.
A working git installation. The two sample git repositories referenced in the CLI commands below have already been created and can be reused.
Code
The application code and utility functions are located in two different repositories. Create two CDE Repositories by cloning the git repositories below.
cde repository create \
--name function_repo \
--url https://github.com/pdefusco/cde_git_repo_2.git \
--branch main
cde repository create \
--name app_code_repo \
--url https://github.com/pdefusco/cde_git_repo.git \
--branch main
Optionally, sync the repositories as needed:
cde repository sync \
--name function_repo
cde repository sync \
--name app_code_repo
Create the job by referencing the repositories as if they were CDE Files Resources. In other words, use a `--mount-N-resource` flag for each repository (for example, `--mount-1-resource` and `--mount-2-resource`), as needed:
cde job create \
--name from_multiple_repos \
--type spark \
--mount-1-resource app_code_repo \
--mount-2-resource function_repo \
--application-file app_code.py
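For illustration, here is a minimal sketch of what `app_code.py` could look like when it imports a helper from the second repository. The module and function names (`my_functions`, `enrich`) are hypothetical placeholders for whatever `function_repo` actually contains; because both repositories are mounted into the job, the import resolves as if the files lived side by side.
# app_code.py (hypothetical sketch): the Spark application from app_code_repo
from pyspark.sql import SparkSession

# my_functions.py is assumed to be provided by the function_repo mount; the name is illustrative only.
from my_functions import enrich

spark = SparkSession.builder.appName("from_multiple_repos").getOrCreate()

df = spark.createDataFrame([(1, "a"), (2, "b")], ["id", "letter"])
enrich(df).show()

spark.stop()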
Run the Job:
cde job run \
--name from_multiple_repos
Summary and Next Steps
Cloudera Data Engineering (CDE) provides a command-line interface (CLI) client. You can use the CLI to create and update jobs, view job details, manage job resources, run jobs, and so on.
In this article, we have reviewed some advanced use cases for the CLI. If you are using the CDE CLI, you might also find the following articles and demos interesting:
Installing the CDE CLI
Simple Introduction to the CDE CLI
CDE CLI Demo
CDE Concepts
CDE CLI Command Reference
CDE CLI Spark Flag Reference
CDE CLI Airflow Flag Reference
CDE CLI list command syntax reference
CDE Jobs API Reference
CDE CLI Articles
Creating a CDE Job with Spark Application Code Located in S3
How to Create a CDE Repository with a Private git Repository
JupyterLab and Spark Connect Quickstart in CDE
How to Create an Iceberg Table with PySpark in CDE
How to Simplify Spark-Submit Jar Dependency Management with CDE
CDE Spark Job with Python Wheel
Iceberg Table Tagging and Branching with PySpark in CDE
Using CDE Resources in CDE Sessions
Working with Iceberg in CDE Sessions
Cloud Storage File System Operations with the Hadoop API in CDE
Working with CDE Spark Job Parameters in CDE
Efficiently Monitoring Jobs, Runs and Resources with the CDE CLI
Working with CDE Files Resources
Enterprise Data Quality at Scale with Spark and Great Expectations in CDE
Spark Geospatial with Apache Sedona in CDE
CDEPY: A Python Package to work with CDE Virtual Clusters
03-31-2025
06:54 PM
Objective
Deploying Dask CUDA clusters on Kubernetes for distributed GPU workloads can require time, effort, and money, especially at enterprise scale. You would have to set up a Kubernetes cluster with GPU support, manage Docker images, and configure Dask CUDA workers via complex deployment files.
Cloudera AI simplifies distributed compute use cases in the context of Machine Learning and AI. In this article, you will learn how to quickly deploy a Dask CUDA cluster in Cloudera AI using the cmlextensions library.
Background Information
Dask CUDA clusters enable scalable parallel computing on NVIDIA GPUs by leveraging Dask, a flexible parallel computing framework, in conjunction with the power of CUDA for GPU acceleration. These clusters allow users to distribute computation across multiple GPUs, significantly speeding up data processing and machine learning workflows, particularly for tasks involving large datasets or complex algorithms.
Cloudera AI is a suite of artificial intelligence and machine learning solutions designed to help organizations harness the power of their data to drive innovation and optimize decision-making. Built on top of the Cloudera Data Platform, it enables businesses to seamlessly integrate AI and ML models into their existing data workflows, providing advanced analytics capabilities.
Enterprise Data Scientists and Engineers utilize Cloudera AI to launch distributed CPU and GPU sessions with frameworks such as Tensorflow, PyTorch, Spark, and Dask. In this context, Cloudera AI simplifies the installation, configuration, and management of dependencies by providing out-of-the-box, customizable Runtimes. These facilitate the deployment of ML workflows and ensure consistency in model execution, from development to production.
The cmlextensions library is an open-source package maintained by Cloudera AI developers that provides a wrapper around the CAI Workers SDK, making it easy to deploy distributed CPU and GPU sessions. With cmlextensions, CAI developers can deploy Dask CUDA clusters at scale.
Requirements
To reproduce this example you will need:
A CML Workspace on Cloudera on Cloud or on Premises with GPU nodes enabled. Version 2.0.47+ is recommended.
Setup
Create a CAI project and clone the repository located at this GitHub URL:
https://github.com/pdefusco/cmlextensions
Launch a CML Session with the following Resource Profile:
Editor: PBJ Workbench
Kernel: Python 3.10
Edition: Nvidia GPU
Version: 2025.01 or above
Spark Runtime Add-On: disabled
Resource Profile: 4 vCPU / 16 GiB Memory / 0 GPU
In the session, install the Dask and CUDA requirements by running the `install_requirements.py` script.
Then, install the CML extensions package:
pip install git+https://github.com/cloudera/cmlextensions.git
Code
Deploy a Dask Cuda cluster with two worker pods, each with two GPUs, by running the following code.
Shortly after running this, you should notice the Dask Scheduler and Workers on the right side of the screen.
from src.cmlextensions.dask_cuda_cluster.dask_cuda_cluster import DaskCudaCluster
cluster = DaskCudaCluster(num_workers=2, worker_cpu=4, nvidia_gpu=2, worker_memory=12, scheduler_cpu=4, scheduler_memory=12)
cluster.init()
Connect to the cluster via the Client constructor. Also on the right side of the screen, notice the Cluster has started and the Client has connected successfully.
from dask.distributed import Client
client = Client(cluster.get_client_url())
Perform some basic data manipulations:
import dask.array as da
# Create a Dask array from a nested Python list
x = da.from_array([[1, 2, 3], [4, 5, 6], [7, 8, 9]], chunks=(2, 2))
# Perform a computation on the dask array
y = (x + 1) * 2
# Submit the computation to the cluster for execution
future = client.submit(y.compute)
# Wait for the computation to complete and retrieve the result
result = future.result()
print(result)
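The array above is NumPy-backed, so it exercises the cluster but not the GPUs themselves. As a rough sketch, and assuming CuPy is available on the workers (it is typically pulled in with the RAPIDS requirements), you can back the Dask array with CuPy blocks so the computation actually runs on the GPU workers:
import cupy as cp
import dask.array as da

# Build a Dask array, then convert each block to a CuPy array so the work lands on the GPUs
gpu_x = da.random.random((20_000, 20_000), chunks=(4_000, 4_000)).map_blocks(cp.asarray)

# The reduction executes on the GPU workers; only a small scalar returns to the client
total = (gpu_x + 1).sum().compute()
print(total)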
Monitor your work in the Dask Dashboard. The URL is provided in the output when the Dask Cluster is started.
Recommendations
The Dask CUDA Scheduler and Workers run in separate pods. Therefore, when launching the CAI Session for installing the requirements and running the code above, a GPU is not required. However, a higher-than-usual amount of memory is recommended for installing the RAPIDS dependencies, as these can take a few minutes to install in your environment.
This article was written in March 2025. Use the RAPIDS selector tool at this site to locate the right version of CUDA for your environment and update the `install_requirements.py` script accordingly.
When you deploy a CML Session with a GPU, a GPU node rather than a CPU node is deployed. These typically come with higher-than-usual memory resources. Therefore, feel free to request large amounts of memory when instantiating the Dask Cuda cluster object, especially if you are the only Workspace user.
The source code for the cmlextensions package can be found on GitHub. For customizations, you're welcome to fork it or load the source code directly in your environment.
If you have any issues reaching the Dask Dashboard, you can try running the following code and opening the URL provided in the output:
import os
print("https://"+os.environ["CDSW_ENGINE_ID"]+"."+os.environ["CDSW_DOMAIN"])
Summary and Next Steps
In this article, you learned how to easily deploy a distributed GPU Dask Cuda Cluster on Kubernetes in Cloudera AI in just a few steps. For more information, blogs, and documentation please visit the following sites.
From Machine Learning to AI: Simplifying the Path to Enterprise Intelligence: This blog post discusses how Cloudera AI brings together tools like Cloudera AI Workbench and Cloudera AI Registry to operationalize AI at scale.
Cloudera AI overview: This documentation provides an overview of Cloudera AI's capabilities, including its support for Python, R, and Spark-on-Kubernetes, enabling scale-out data engineering and machine learning.
Using Cloudera AI Inference service: This article explains how Cloudera AI Inference service provides a production-grade environment for hosting predictive and generative AI, addressing challenges like high availability and scalability.
Cloudera AI on Cloud: Documentation: This documentation details Cloudera AI's cloud-native machine learning platform, unifying self-service data science and data engineering in a single, portable service.
Cloudera AI Inference Service: This page highlights Cloudera AI Inference Service's features, including one-click deployment, robust security, and unified support for all AI inference needs.
03-16-2025
06:25 PM
Objective
This project provides a quickstart for submitting Spark applications compiled from Scala code in Cloudera Data Engineering. It contains useful information targeted towards Spark developers and users who are migrating to Cloudera Data Engineering, such as:
Context about Scala Jobs and the usage of Jars when running Spark Applications in CDE.
Useful information about the CDE CLI and how it can simplify your migration efforts to CDE.
Links to references and other recommended examples, such as submitting Spark App Code located in S3 and the CDE Spark Migration CLI Tool.
Requirements
To reproduce this example, you will need a CDE Service in Cloudera on Cloud or On Prem and a working installation of the CDE CLI.
The Scala job application has already been compiled into a Jar via sbt and is available in the `target/scala-2.12/cde-scala-example_2.12-0.1.jar` path.
Background Information on Scala, Spark and the Jar File Usage
A JAR (Java ARchive) file is essentially a compressed file that contains compiled Java or Scala code, along with any dependencies or libraries that the code relies on. In the context of Spark, these JARs are used to package and distribute Spark applications.
To develop a Spark application using Scala, the process typically involves writing the application code in Scala in an IDE, compiling the code into bytecode, and packaging it into a JAR file. This JAR file contains all the necessary components for Spark to execute the application on a distributed cluster.
The JAR is then submitted to a Spark cluster using the `spark-submit` command, which handles the distribution of tasks across the cluster, allowing for parallel processing of data.
When submitting a Spark application, the JAR file is provided as part of the job configuration, and Spark automatically loads the necessary code and libraries from the JAR to run the application. This makes Scala JARs an essential component for deploying and executing Spark applications, especially in distributed environments, where scalability and fault tolerance are key requirements.
Steps to Reproduce
The process of submitting a Scala application as a Jar to a CDE Virtual Cluster is very similar to that of a Spark cluster. In CDE, you can use the CDE CLI or UI, which offer syntax nearly identical to that of a spark-submit command, along with enhanced options to manage, monitor, and schedule jobs and resources inside the CDE Virtual Cluster, and more.
Using the CDE CLI, run the following commands:
Spark Submit with App Code in Jar file
You can run the spark submit directly with the CDE CLI:
cde spark submit \
--executor-cores 2 \
--executor-memory "4g" \
<path/to/jar-file.jar>
For example:
% cde spark submit --executor-cores 2 --executor-memory "4g" target/scala-2.12/cde-scala-example_2.12-0.1.jar
3.9KB/3.9KB 100% [==============================================] cde-scala-example_2.12-0.1.jar
Job run 12 submitted
Waiting for job run 12 to start...
+ echo driver --proxy-user pauldefusco --properties-file /opt/spark/conf/spark.properties --class CdeScalaExample spark-internal
driver --proxy-user pauldefusco --properties-file /opt/spark/conf/spark.properties --class CdeScalaExample spark-internal
+ [[ true == \t\r\u\e ]]
+ POSITIONAL=()
+ [[ 8 -gt 0 ]]
+ key=driver
+ case $key in
[...]
25-03-17 00:03:35 INFO S3ADelegationTokens: Creating Delegation Token: duration 0:00.000s
25-03-17 00:03:35 INFO SharedState: Warehouse path is 's3a://pdf-3425-buk-c59557bd/data/warehouse/tablespace/external/hive'.
25-03-17 00:03:35 INFO ServerInfo: Adding filter to /SQL: com.cloudera.cde.security.authentication.server.JWTRedirectAuthenticationFilter
25-03-17 00:03:35 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@7d569c10{/SQL,null,AVAILABLE,@Spark}
25-03-17 00:03:35 INFO ServerInfo: Adding filter to /SQL/json: com.cloudera.cde.security.authentication.server.JWTRedirectAuthenticationFilter
25-03-17 00:03:35 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@6da53709{/SQL/json,null,AVAILABLE,@Spark}
25-03-17 00:03:35 INFO ServerInfo: Adding filter to /SQL/execution: com.cloudera.cde.security.authentication.server.JWTRedirectAuthenticationFilter
25-03-17 00:03:35 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@2b55ac77{/SQL/execution,null,AVAILABLE,@Spark}
25-03-17 00:03:35 INFO ServerInfo: Adding filter to /SQL/execution/json: com.cloudera.cde.security.authentication.server.JWTRedirectAuthenticationFilter
25-03-17 00:03:35 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@12ec5e73{/SQL/execution/json,null,AVAILABLE,@Spark}
25-03-17 00:03:35 INFO ServerInfo: Adding filter to /static/sql: com.cloudera.cde.security.authentication.server.JWTRedirectAuthenticationFilter
25-03-17 00:03:35 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@42130a5c{/static/sql,null,AVAILABLE,@Spark}
25-03-17 00:03:37 INFO CodeGenerator: Code generated in 163.50901 ms
25-03-17 00:03:37 INFO CodeGenerator: Code generated in 15.652979 ms
+-------+-------------+
|Capital| State|
+-------+-------------+
|Olympia| Washington|
| Boise| Idaho|
| Helena| Montana|
|Lincoln| Nebraska|
|Concord|New Hampshire|
| Albany| New York|
+-------+-------------+
For more information on the CDE Spark-Submit, more in-depth examples of the CDE CLI and migrating spark-submits to CDE via the Spark Migration CLI tool, visit the articles and references listed in the Summary and Next Steps section.
CDE Spark Job with App Code in Jar File
You can also create a CDE Resource to host the Jar dependency inside the CDE Virtual Cluster, and then create the CDE Job of type Spark.
This is particularly advantageous when you have to manage complex spark-submits, as it allows you to create reusable job definitions that are automatically tracked across runs.
Job definitions are essentially collections of all spark-submit options, configurations, and dependencies.
Create the CDE Files Resource:
cde resource create \
--type files \
--name my-jars
cde resource upload \
--name my-jars \
--local-path target/scala-2.12/cde-scala-example_2.12-0.1.jar
Finally, create and run the Job. Notice that the executor resource options are the same as those in the spark-submit. With a few exceptions, you can assume that all spark-submit options are compatible with the CDE CLI and the CDE Job definition.
cde job create \
--name cde-scala-job \
--type spark \
--executor-cores 2 \
--executor-memory "4g" \
--mount-1-resource my-jars \
--application-file cde-scala-example_2.12-0.1.jar
cde job run \
--name cde-scala-job
cde job run \
--name cde-scala-job \
--executor-memory "2g"
Notice the `--application-file` option is used to designate the jar file containing the application code. The file is located in the `my-jars` Files Resource we created earlier, which is referenced via the `--mount-1-resource` option. As a way to manage complex spark-submits, you can mount more than one Files Resource when creating or executing your jobs.
Also notice the `--executor-memory` option was set to `4g` during job creation and then overridden to `2g` during execution. Another advantage of CDE Jobs over plain spark-submits is the ability to override properties on a per-run basis. In this case, you first created a job definition in which, unless specified otherwise, all executions default to 4 GB of executor memory, and ran the job. Then, you overrode that default to 2 GB for a second run.
For documentation and more in-depth examples of the CDE CLI and migrating spark-submits to CDE via the Spark Migration CLI tool, visit the articles and references listed in the Summary and Next Steps section.
Monitor CDE Files Resources and Job Executions
As mentioned above, among many things, the CDE CLI provides commands to monitor your jobs and resources. Run the following commands to monitor this use case.
Search for all jobs which use the Jar file as application code:
cde job list \
--filter 'spark.file[eq]<your-jar-file-name>'
For example:
% cde job list --filter 'spark.file[eq]cde-scala-example_2.12-0.1.jar'
[
{
"name": "cde-scala-job",
"type": "spark",
"created": "2025-03-17T00:49:05Z",
"modified": "2025-03-17T00:49:05Z",
"retentionPolicy": "keep_indefinitely",
"mounts": [
{
"dirPrefix": "/",
"resourceName": "my-jars"
}
],
"spark": {
"file": "cde-scala-example_2.12-0.1.jar",
"driverMemory": "1g",
"driverCores": 1,
"executorMemory": "4g",
"executorCores": 2
},
"schedule": {
"enabled": false,
"user": "pauldefusco"
}
}
]
You could also search for all jobs where `executor-memory` was set to `4g`:
% cde job list --filter 'spark.executorMemory[eq]4g'
[
{
"name": "cde-scala-job",
"type": "spark",
"created": "2025-03-17T00:49:05Z",
"modified": "2025-03-17T00:49:05Z",
"retentionPolicy": "keep_indefinitely",
"mounts": [
{
"dirPrefix": "/",
"resourceName": "my-jars"
}
],
"spark": {
"file": "cde-scala-example_2.12-0.1.jar",
"driverMemory": "1g",
"driverCores": 1,
"executorMemory": "4g",
"executorCores": 2
},
"schedule": {
"enabled": false,
"user": "pauldefusco"
}
}
]
For more information on this specific topic, visit the `Monitoring CDE with the CDE CLI` article referenced in the Summary and Next Steps section.
Summary and Next Steps
Cloudera Data Engineering (CDE) provides a command line interface (CLI) client. You can use the CLI to create and update jobs, view job details, manage job resources, run jobs, and so on.
In this article, you learned how to use the CLI to submit Spark applications compiled as Jar files. If you are using the CDE CLI you might also find the following articles and demos interesting:
Installing the CDE CLI
Simple Introduction to the CDE CLI
CDE CLI Demo
CDE Concepts
CDE CLI Command Reference
CDE CLI Spark Flag Reference
CDE CLI Airflow Flag Reference
CDE CLI list command syntax reference
CDE Jobs API Reference
CDE Spark Migration Tool
CDE Spark Migration Tool Example
Spark Scala Version Compatibility Matrix
Monitoring CDE with the CDE CLI
Creating a Spark Application with Code Located in S3
03-14-2025
01:24 PM
Objective
In this brief example, you will learn how to use the CDE CLI to create a CDE Spark Job with PySpark and Scala app code located in an S3 bucket.
Requirements
In order to reproduce these examples in your CDE Virtual Cluster, you will need:
A Spark application in the form of a PySpark script or Scala jar, located in an S3 bucket.
A working installation of the CDE CLI.
A CDE 1.23 Service and a Spark 3 Virtual Cluster running in Cloudera on Cloud or Cloudera on Prem.
Using Dependencies Located in S3 with the Spark-Submit
CDE provides a command-line interface (CLI) client. You can use the CLI to create and update jobs, view job details, manage job resources, run jobs, etc. The Apache Spark spark-submit allows you to run a Spark job with application code located in an S3 bucket. The CDE CLI also provides this functionality. For example, in PySpark:
spark-submit \
--master yarn \
--deploy-mode cluster \
--py-files s3://your-bucket/path/to/dependency_one.zip,s3://your-bucket/path/to/dependency_two.py \
--jars s3://your-bucket/path/to/dependency_one.jar,s3://your-bucket/path/to/dependency_two.jar \
s3://your-bucket/path/to/pyspark_app.py \
--arg1 value_one --arg2 value_two
Or with a Jar compiled from Scala application code:
spark-submit \
--master yarn \
--deploy-mode cluster \
--py-files s3://your-bucket/path/to/dependency_one.zip,s3://your-bucket/path/to/dependency_two.py \
--jars s3://your-bucket/path/to/dependency_one.jar,s3://your-bucket/path/to/dependency_two.jar \
s3://your-bucket/path/to/spark_app.jar \
--arg1 value_one --arg2 value_two
Using Dependencies Located in S3 with the CDE CLI
You can accomplish the same with the CDE CLI, either by creating a CDE CLI spark-submit or a CDE Job. When using the CDE spark-submit, the syntax is nearly identical to that of a regular spark-submit. When creating the CDE Job, the syntax is also similar, but the `--application-file` flag is needed to specify the application py or jar file. In both cases, the Hadoop FileSystem API Spark configurations are needed; these are specified with the `--conf` flag. Notice that the S3 bucket is the default bucket associated with the CDP Environment.
CDE Spark Submit with PySpark application:
cde spark submit \
--conf spark.sql.shuffle.partitions=10 \
--executor-cores 2 \
--executor-memory 2g \
--conf spark.hadoop.fs.s3.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \
--conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \
s3://default-cdp-bucket/data-eng-artifacts/cde_spark_jobs/simple-pyspark-sql.py
CDE Job with PySpark application:
cde job create \
--application-file s3://your-bucket/path/to/pyspark_app.py \
--conf spark.hadoop.fs.s3.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \
--conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \
--py-files s3://your-bucket/path/to/dependency_one.zip,s3://your-bucket/path/to/dependency_two.py \
--jars s3://default-cdp-bucket/path/to/dependency_one.jar,s3://your-bucket/path/to/dependency_two.jar \
--arg1 value_one
CDE Spark Submit with Scala application:
cde spark submit \
--conf spark.sql.shuffle.partitions=10 \
--executor-cores 2 \
--executor-memory 2g \
--conf spark.hadoop.fs.s3.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \
--conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \
s3://your-bucket/path/to/spark_app.jar
CDE Job with Scala application:
cde job create \
--application-file s3://your-bucket/path/to/spark_app.jar \
--py-files s3://your-bucket/path/to/dependency_one.zip,s3://your-bucket/path/to/dependency_two.py \
--conf spark.hadoop.fs.s3.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \
--conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \
--jars s3://default-cdp-bucket/path/to/dependency_one.jar,s3://your-bucket/path/to/dependency_two.jar \
--arg1 value_one
For example, in the case of a sample PySpark application:
CDE Spark Submit:
cde spark submit \
--conf spark.sql.shuffle.partitions=10 \
--executor-cores 2 \
--executor-memory 2g \
--conf spark.hadoop.fs.s3.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \
--conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \
s3://pdf-3425-buk-c59557bd/data-eng-artifacts/cde_spark_jobs/simple-pyspark-sql.py
CDE Job:
cde job create \
--name my-cde-job-from-s3-pyspark \
--type spark \
--application-file s3://pdf-3425-buk-c59557bd/data-eng-artifacts/cde_spark_jobs/simple-pyspark-sql.py \
--conf spark.sql.shuffle.partitions=10 \
--conf spark.hadoop.fs.s3.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \
--conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \
--executor-cores 2 \
--executor-memory 2g
cde job run \
--name my-cde-job-from-s3-pyspark
Or with a Scala Jar.
CDE Spark Submit:
cde spark submit \
--conf spark.sql.shuffle.partitions=10 \
--executor-cores 2 \
--executor-memory 2g \
--conf spark.hadoop.fs.s3.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \
--conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \
s3://pdf-3425-buk-c59557bd/data-eng-artifacts/cde_spark_jobs/cde-scala-example_2.12-0.1.jar
cde job create \
--name my-cde-job-from-s3-scalajar \
--type spark \
--conf spark.hadoop.fs.s3.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \
--conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \
--application-file s3://data-eng-artifacts/cde_spark_jobs/cde-scala-example_2.12-0.1.jar \
--conf spark.sql.shuffle.partitions=10 \
--executor-cores 2 \
--executor-memory 2g
cde job run \
--name my-cde-job-from-s3-scalajar
Using CDE Files Resources
As an alternative to hosting your application code and file dependencies in S3, you can leverage CDE Files Resources. Files Resources are arbitrary collections of files that a job can reference, where application code and any necessary configuration files or supporting libraries can be stored. Files can be uploaded to and removed from a resource as needed. CDE Files Resources offer a few key advantages: They are located inside a CDE Virtual Cluster, where they can be easily monitored. You can easily work with them through the CDE CLI and perform actions such as updating their content or adding them to CDE job definitions. Once the job has run, the resources applied for a specific run are easily traceable in the UI or via the CLI and API. In other words, if you want to dynamically apply file dependencies across multiple runs, CDE Files Resources can be swapped modularly and are shown in the Job Runs UI post-execution.
You can create a CDE Files Resource with the CLI:
cde resource create \
--name my-files-resource \
--type files
You can upload files to the Resource:
cde resource upload \
--name my-files-resource \
--local-path simple-pyspark-sql.py
You can then mount the Files Resource when creating the CDE Job definition:
cde job create \
--type spark \
--name my-job-with-resource \
--mount-1-resource my-files-resource \
--application-file simple-pyspark-sql.py
And finally, run it with:
cde job run \
--name my-job-with-resource \
--conf spark.sql.shuffle.partitions=10 \
--executor-cores 2 \
--executor-memory 2g
For more in-depth information on using CDE Resources, please visit the following publications:
Working with CDE Files Resources
Using CDE Resources in CDE Sessions
Summary and Next Steps
Cloudera Data Engineering (CDE) provides a command line interface (CLI) client. You can use the CLI to create and update jobs, view job details, manage job resources, run jobs, and so on. In this article, we have reviewed some advanced use cases for the CLI. If you are using the CDE CLI, you might also find the following articles and demos interesting:
Installing the CDE CLI
Simple Introduction to the CDE CLI
CDE Concepts
CDE CLI Command Reference
CDE CLI Spark Flag Reference
CDE CLI Airflow Flag Reference
CDE CLI list command syntax reference
CDE Jobs API Reference
03-04-2025
06:15 PM
CREATING A CDE REPOSITORY WITH A PRIVATE GIT REPOSITORY
OBJECTIVE
In this article, you will learn how to create a CDE Repository by cloning a private Git repository using CDE Credentials.
Cloudera Data Engineering (CDE) is a cloud-based service that helps businesses build, manage, and scale data pipelines. It is based on Apache Spark and integrates with other Cloudera services.
Git repositories allow teams to collaborate, manage project artifacts, and promote applications from lower to higher environments. CDE Repositories can be used to clone git repositories into a Virtual Cluster to make the imported files available to CDE Jobs as reusable dependencies. CDE currently supports Git providers such as GitHub, GitLab, and Bitbucket.
CDE Credentials allow you to store secrets such as usernames, passwords, and tokens, in secure, reusable objects associated with CDE Jobs, Custom Runtimes and Repositories.
REQUIREMENTS
This example was built with CDE 1.23. To reproduce the commands in your environment, you need a CDE Service, Virtual Cluster, Private Git repository, and a local installation of the CDE CLI.
CREATE PRIVATE GITHUB REPOSITORY
Log in to your GitHub account and create a private GitHub repository. Save the repository's URL, for example:
https://github.com/pdefusco/sample_private_repo.git
CREATE GITHUB TOKEN
In the Developer Settings, create a new Classic Token.
Keep the token handy.
CREATE CDE CREDENTIALS
Using the CDE CLI, create a Basic CDE Credential using Git username and token.
In the following command, assign an arbitrary credential name and apply your git username as the username parameter.
% cde credential create \
--name my-git-creds \
--type basic \
--username pdefusco
You will now be prompted to enter your password, twice. Paste the token you obtained earlier in the GitHub settings.
Enter password for username "pdefusco":
Retype password for username "pdefusco":
Validate credentials. Here is a sample output:
% cde credential list
[
{
"name": "my-git-creds",
"type": "basic",
"created": "2025-03-05T00:03:28Z",
"modified": "2025-03-05T00:03:28Z"
},
{
"name": "paul-git-creds",
"type": "basic",
"created": "2025-03-05T00:12:58Z",
"modified": "2025-03-05T00:12:58Z"
},
{
"name": "test-git-creds",
"type": "workload-credential",
"created": "2025-03-04T23:56:17Z",
"modified": "2025-03-04T23:56:17Z"
}
]
CREATE CDE REPOSITORY USING PRIVATE GIT CREDENTIALS
You will now use the CLI to create a CDE repository. This will be a clone of your private git repository.
Update the name, credential, url, and branch parameters. Then run this CLI command:
% cde repository create \
--name my-test-repo \
--credential my-git-creds \
--url https://github.com/pdefusco/sample_private_repo.git \
--branch main
Validate the repository via the CLI:
% cde repository list
[
{
"name": "my-code-repo",
"type": "git",
"status": "ready",
"signature": "a2a52f11546ef9d4039b4fd01f504bdd8b498c35",
"created": "2025-03-05T01:08:08Z",
"modified": "2025-03-05T01:08:08Z",
"retentionPolicy": "keep_indefinitely",
"git": {
"repository": "https://github.com/pdefusco/sample_private_repo.git",
"branch": "main",
"credential": "my-git-creds"
}
}
]
Validate the repository in the UI:
Summary
Cloudera Data Engineering (CDE) provides a command line interface (CLI) client. You can use the CLI to create and update jobs, view job details, manage job resources, run jobs, and so on. In this article, you learned how to use the CLI to create reusable CDE credentials for integrating with GitHub, and generated a CDE Repository to import application code from a private GitHub repository.
References
CDE Repositories Documentation
GitHub Documentation
CDE Documentation
How to Manage CDE Repositories in CDEPY
CDE CLI Reference
01-13-2025
03:48 PM
JupyterLab is a powerful, web-based interactive development environment widely used for data science, machine learning, and Spark application development. It extends the classic Jupyter Notebook interface, offering a more flexible and integrated workspace. For Spark application development, JupyterLab provides the following advantages:
1. Interactive Exploration: Run Spark queries and visualize results interactively, making it ideal for data exploration and debugging.
2. Rich Visualization: Seamlessly integrate with Python visualization libraries like Matplotlib, Seaborn, and Plotly to analyze and interpret Spark data.
3. Ease of Integration: Use PySpark or Sparkmagic to connect JupyterLab with Spark clusters, enabling distributed data processing from the notebook environment.
Spark Connect is a feature introduced in Apache Spark that provides a standardized, client-server architecture for connecting to Spark clusters. It decouples the client from the Spark runtime, allowing users to interact with Spark through lightweight, language-specific clients without the need to run a full Spark environment on the client side. With Spark Connect, users can:
1. Access Spark Remotely: Connect to a Spark cluster from various environments, including local machines or web applications.
2. Support Multiple Languages: Use Spark with Python, Scala, Java, SQL, and other languages through dedicated APIs.
3. Simplify Development: Develop and test Spark applications without needing a full Spark installation, making it easier for developers and data scientists to work with distributed data processing.
This architecture enhances usability, scalability, and flexibility, making Spark more accessible to a wider range of users and environments.
In this article, you will use JupyterLab locally to interactively prototype a PySpark and Iceberg application in a dedicated Spark Virtual Cluster running in Cloudera Data Engineering on AWS.
Prerequisites
A CDE Service on version 1.23 or above and a Spark 3.5.1 Virtual Cluster.
A local installation of the CDE CLI on version 1.23 or above.
A local installation of JupyterLab. Version 4.0.7 was used for this demonstration, but other versions should work as well.
A local installation of Python. Version 3.9.12 was used for this demonstration, but other versions will work as well.
1. Launch a CDE Spark Connect Session
Create a CDE Files Resource and upload the csv files:
cde resource create \
--name telcoFiles \
--type files
cde resource upload \
--name telcoFiles \
--local-path resources/cell_towers_1.csv \
--local-path resources/cell_towers_2.csv
Start a CDE Session of type Spark Connect. Edit the Session Name parameter so it doesn't collide with other users' sessions.
cde session create \
--name spark-connect-session-res \
--type spark-connect \
--num-executors 2 \
--driver-cores 2 \
--driver-memory "2g" \
--executor-cores 2 \
--executor-memory "2g" \
--mount-1-resource telcoFiles
In the Sessions UI, validate the Session is Running.
2. Install Spark Connect Prerequisites
From the terminal, install the following Spark Connect prerequisites.
Download the cdeconnect and PySpark packages from the CDE Session Configuration tab and place them in your project home folder.
Create a new Python Virtual Environment:
python -m venv spark_connect_jupyter
source spark_connect_jupyter/bin/activate
Install the following packages. Notice that these exact versions were used with Python 3.9. Numpy, cmake, and PyArrow versions may be subject to change depending on your Python version.
pip install numpy==1.26.4
pip install --upgrade cmake
pip install pyarrow==14.0.0
pip install cdeconnect.tar.gz
pip install pyspark-3.5.1.tar.gz
Launch the JupyterLab server with:
pip install jupyterlab
jupyter lab
3. Run Your First PySpark & Iceberg Application via Spark Connect
You are now ready to connect to the CDE Session from your local JupyterLab instance using Spark Connect. In the first cell, edit the sessionName option and add your session name from the CLI Create Session command above. In the second cell, edit your username. Now run each cell and observe outputs.
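For reference, the notebook cells boil down to something like the following sketch. It assumes a SparkSession named `spark` has already been obtained from the CDE Spark Connect session via the downloaded cdeconnect package, and that the Files Resource is mounted at the default `/app/mount` path; the table name is illustrative.
# Hypothetical notebook cells, assuming `spark` was obtained from the CDE Spark Connect session.
df = spark.read.csv("/app/mount/cell_towers_1.csv", header=True, inferSchema=True)
df.printSchema()

# Write the data as an Iceberg table and query it back.
df.writeTo("default.cell_towers_demo").using("iceberg").createOrReplace()
spark.sql("SELECT COUNT(*) FROM default.cell_towers_demo").show()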
01-13-2025
03:30 PM
PyCharm is a popular integrated development environment (IDE) for Python, developed by JetBrains. It provides a comprehensive set of tools designed to boost productivity and simplify the coding process. When working with Apache Spark, PyCharm can be an excellent choice for managing Python-based Spark applications. Its features include:
Code Editing and Debugging: PyCharm offers intelligent code completion, syntax highlighting, and robust debugging tools that simplify Spark application development.
Virtual Environments and Dependency Management: PyCharm makes it easy to configure Python environments with Spark libraries and manage dependencies.
Notebook Support: With built-in support for Jupyter Notebooks, PyCharm allows you to work interactively with data, making it easier to visualize and debug Spark pipelines.
Version Control: PyCharm integrates with Git and other version control systems, simplifying collaboration and project management.
Spark Connect is a feature introduced in Apache Spark that provides a standardized, client-server architecture for connecting to Spark clusters. It decouples the client from the Spark runtime, allowing users to interact with Spark through lightweight, language-specific clients without the need to run a full Spark environment on the client side. With Spark Connect, users can:
Access Spark Remotely: Connect to a Spark cluster from various environments, including local machines or web applications.
Support Multiple Languages: Use Spark with Python, Scala, Java, SQL, and other languages through dedicated APIs.
Simplify Development: Develop and test Spark applications without needing a full Spark installation, making it easier for developers and data scientists to work with distributed data processing.
This architecture enhances usability, scalability, and flexibility, making Spark more accessible to a wider range of users and environments.
In this article, you will learn how to use PyCharm locally to interactively prototype your code in a dedicated Spark Virtual Cluster running in Cloudera Data Engineering on AWS.
Prerequisites
A CDE Service on version 1.23 or above and a Spark 3.5.1 Virtual Cluster.
A local installation of the CDE CLI on version 1.23 or above.
A local installation of PyCharm. Version 4.0.7 was used for this demonstration, but other versions should work as well.
A local installation of Python. Version 3.9.12 was used for this demonstration, but other versions will work as well.
1. Launch a CDE Spark Connect Session
Start a CDE Session of type Spark Connect. Edit the Session Name parameter so it doesn't collide with other users' sessions.
cde session create \
--name pycharm-session \
--type spark-connect \
--num-executors 2 \
--driver-cores 2 \
--driver-memory "2g" \
--executor-cores 2 \
--executor-memory "2g" In the Sessions UI, validate the Session is Running. 2. Install Spark Connect Prerequisites From the terminal, install the following Spark Connect prerequisites: Create a new Project and Python Virtual Environment in PyCharm: In the terminal, install the following packages. Notice that these exact versions were used with Python 3.9. Numpy, cmake, and PyArrow versions may be subject to change depending on your Python version. pip install numpy==1.26.4
pip install --upgrade cmake
pip install pyarrow==14.0.0
pip install cdeconnect.tar.gz
pip install pyspark-3.5.1.tar.gz
3. Run Your First PySpark & Iceberg Application via Spark Connect
You are now ready to connect to the CDE Session from your local IDE using Spark Connect. Open "prototype.py" and make the following changes:
At line 46, edit the "sessionName" parameter with your Session Name from the above CLI command.
At line 48, edit the "storageLocation" parameter with your S3 bucket prefix.
At line 49, edit the "username" parameter with your assigned username.
Now run "prototype.py" and observe outputs.
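As a rough illustration only (the actual prototype.py ships with the project), the parameters you just edited are typically used along these lines; the DataFrame contents and table name below are made up, and `spark` is assumed to come from the cdeconnect Spark Connect helper.
# Illustrative fragment, not the real prototype.py
storageLocation = "s3a://your-bucket/path"   # your S3 bucket prefix
username = "your_username"                   # your assigned username

df = spark.range(10).toDF("value")

# Write data to cloud storage under a per-user prefix...
df.write.mode("overwrite").parquet(f"{storageLocation}/{username}/prototype_demo")

# ...and register an Iceberg table namespaced by user so runs don't collide.
df.writeTo(f"default.prototype_{username}").using("iceberg").createOrReplace()
spark.sql(f"SELECT COUNT(*) FROM default.prototype_{username}").show()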
01-06-2025
06:01 PM
You can find the docs for the Python client and the API by going to User Settings -> API Keys. From the Python CML APIv2 docs, these are the two methods you need:
Step 1: Create Job
from __future__ import print_function
import time
import cmlapi
from cmlapi.rest import ApiException
from pprint import pprint
# create an instance of the API class
api_instance = cmlapi.CMLServiceApi()
body = cmlapi.CreateJobRequest() # CreateJobRequest
project_id = 'project_id_example' # str | ID of the project containing the job.
try:
# Create a new job.
api_response = api_instance.create_job(body, project_id)
pprint(api_response)
except ApiException as e:
print("Exception when calling CMLServiceApi->create_job: %s\n" % e) Step 2: Run the Job from __future__ import print_function
import time
import cmlapi
from cmlapi.rest import ApiException
from pprint import pprint
# create an instance of the API class
api_instance = cmlapi.CMLServiceApi()
body = cmlapi.CreateJobRunRequest() # CreateJobRunRequest
project_id = 'project_id_example' # str | ID of the project containing the job.
job_id = 'job_id_example' # str | The job ID to create a new job run for.
try:
# Create and start a new job run for a job.
api_response = api_instance.create_job_run(body, project_id, job_id)
pprint(api_response)
except ApiException as e:
print("Exception when calling CMLServiceApi->create_job_run: %s\n" % e) Using the "cmlapi.CreateJobRequest()" or "cmlapi.CreateJobRunRequest()" methods can be tricky. Here's an advanced example: https://github.com/pdefusco/SparkGen/blob/main/autogen/cml_orchestrator.py In particular: sparkgen_1_job_body = cmlapi.CreateJobRequest(
project_id = project_id,
name = "SPARKGEN_1_"+session_id,
script = "autogen/cml_sparkjob_1.py",
cpu = 4.0,
memory = 8.0,
runtime_identifier = "docker.repository.cloudera.com/cloudera/cdsw/ml-runtime-workbench-python3.7-standard:2023.05.1-b4",
runtime_addon_identifiers = ["spark320-18-hf4"],
environment = {
"x":str(x),
"y":str(y),
"z":str(z),
"ROW_COUNT_car_installs":str(ROW_COUNT_car_installs),
"UNIQUE_VALS_car_installs":str(UNIQUE_VALS_car_installs),
"PARTITIONS_NUM_car_installs":str(PARTITIONS_NUM_car_installs),
"ROW_COUNT_car_sales":str(ROW_COUNT_car_sales),
"UNIQUE_VALS_car_sales":str(UNIQUE_VALS_car_sales),
"PARTITIONS_NUM_car_sales":str(PARTITIONS_NUM_car_sales),
"ROW_COUNT_customer_data":str(ROW_COUNT_customer_data),
"UNIQUE_VALS_customer_data":str(UNIQUE_VALS_customer_data),
"PARTITIONS_NUM_customer_data":str(PARTITIONS_NUM_customer_data),
"ROW_COUNT_factory_data":str(ROW_COUNT_factory_data),
"UNIQUE_VALS_factory_data":str(UNIQUE_VALS_factory_data),
"PARTITIONS_NUM_factory_data":str(PARTITIONS_NUM_factory_data),
"ROW_COUNT_geo_data":str(ROW_COUNT_geo_data),
"UNIQUE_VALS_geo_data":str(UNIQUE_VALS_geo_data),
"PARTITIONS_NUM_geo_data":str(PARTITIONS_NUM_geo_data)
}
)
sparkgen_1_job = client.create_job(sparkgen_1_job_body, project_id)
And:
jobrun_body = cmlapi.CreateJobRunRequest(project_id, sparkgen_1_job.id)
job_run = client.create_job_run(jobrun_body, project_id, sparkgen_1_job.id)
Hope this helps,
01-06-2025
02:12 PM
Spark GraphFrames is a package within Apache Spark that allows users to perform graph processing operations on data using a DataFrame-based approach. It enables you to perform various graph operations like finding connected components, calculating shortest paths, identifying triangles, and more. Real-world applications include social network analysis, recommendation systems, web link structures, flight connections, and other scenarios where relationships between data points are crucial.
Cloudera AI (CAI) is a cloud-native service within the Cloudera Data Platform (CDP) that enables enterprise data science teams to collaborate across the full data lifecycle. It provides immediate access to enterprise data pipelines, scalable compute resources, and preferred tools, streamlining the process of moving analytic workloads from research to production.
Using Spark GraphFrames in Cloudera AI requires minimum configuration effort. This quickstart provides a basic example so you can get started quickly.
Requirements
You can use Spark GraphFrames in CAI Workbench with Spark Runtime Add-on versions 3.2 or above. When you create the SparkSession object, make sure to select, from Spark Packages, the package version that matches the Spark Runtime Add-on you chose when launching the CAI Session.
This example was created with the following platform and system versions, but it will also work in other Workbench versions (below or above 2.0.46).
CAI Workbench (a.k.a. "CML Workspace") on version 2.0.46.
Spark Runtime Addon version 3.5.
Steps to Reproduce the Quickstart
Create a CAI Workbench Session with the following settings:
Resource Profile of 2 vCPU, 8 GiB Mem, 0 GPU
Editor: PBJ Workbench, Python 3.10 Kernel, Standard Edition, 2024.10 Version.
In the Session, run the script. Notice the following:
Line 46: The SparkSession object is created. The packages option is used to download the library. If you are not using Spark 3.5, change the argument to the package version compatible with your Spark Runtime Add-on version, as listed in Spark Packages.
spark = SparkSession.builder.appName("MyApp") \
.config("spark.jars.packages", "graphframes:graphframes:0.8.4-spark3.5-s_2.12") \
.getOrCreate()
Line 71: A GraphFrame object is instantiated using the two PySpark Dataframes.
from graphframes import *
# Vertex DataFrame
v = spark.createDataFrame([
("a", "Alice", 34),
("b", "Bob", 36),
("c", "Charlie", 30),
("d", "David", 29),
("e", "Esther", 32),
("f", "Fanny", 36),
("g", "Gabby", 60)
], ["id", "name", "age"])
# Edge DataFrame
e = spark.createDataFrame([
("a", "b", "friend"),
("b", "c", "follow"),
("c", "b", "follow"),
("f", "c", "follow"),
("e", "f", "follow"),
("e", "d", "friend"),
("d", "a", "friend"),
("a", "e", "friend")
], ["src", "dst", "relationship"])
# Create a GraphFrame
g = GraphFrame(v, e)
Lines 73 and below: You can now use GraphFrames methods to traverse and filter the graph based on relationships between data instances.
> g.vertices.show()
+---+-------+---+
| id| name|age|
+---+-------+---+
| a| Alice| 34|
| b| Bob| 36|
| c|Charlie| 30|
| d| David| 29|
| e| Esther| 32|
| f| Fanny| 36|
| g| Gabby| 60|
+---+-------+---+
> g.edges.show()
+---+---+------------+
|src|dst|relationship|
+---+---+------------+
| a| b| friend|
| b| c| follow|
| c| b| follow|
| f| c| follow|
| e| f| follow|
| e| d| friend|
| d| a| friend|
| a| e| friend|
+---+---+------------+
#Find the youngest user’s age in the graph. This queries the vertex DataFrame.
> g.vertices.groupBy().min("age").show()
+--------+
|min(age)|
+--------+
| 29|
+--------+
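A few more queries, assuming the graph `g` from above, further illustrate the relationship-oriented API; these follow the standard GraphFrames examples rather than the project script:
# In-degree of each vertex (how many incoming relationships it has)
g.inDegrees.show()

# Keep only "follow" edges and count them
print(g.edges.filter("relationship = 'follow'").count())

# Motif finding: pairs of vertices that follow each other in both directions
g.find("(a)-[e]->(b); (b)-[e2]->(a)").show()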
Summary & Next Steps
Cloudera AI (CAI) is a cloud-native service within the Cloudera Data Platform (CDP) that enables enterprise data science teams to collaborate across the full data lifecycle. It provides immediate access to enterprise data pipelines, scalable compute resources, and preferred tools, streamlining the process of moving analytic workloads from research to production. In particular:
A CAI Workbench Session allows you to directly access and analyze large datasets, run code (like Python or R), and build and train machine learning models, using the flexibility of Spark Runtime Add-ons to deploy Spark on Kubernetes clusters with minimal configurations, with your Spark version of choice.
The CAI Engineering team maintains and certifies Spark Runtime Add-ons so you don't have to worry about installing and configuring Spark on Kubernetes clusters when using CAI Workbench. You can check if your Spark version is supported in the latest compatibility matrix.
The "packages" option can be used at SparkSession creation to download 3rd party packages such as GraphFrames. For more information on this and other Spark Options, please refer to the Apache Spark Documentation.
GraphFrames, in particular, allows you to query your Spark Dataframes in terms of relationships, thus empowering use cases ranging from Social Network analysis to Recommendation Engines, and more. For more information, please refer to the GraphFrames documentation.
Finally, you can learn more about Cloudera AI Workbench with the following recommended blogs and community articles:
Cloudera AI - What You Should Know An insightful community article that provides an overview of CML's features and capabilities, helping teams deploy machine learning workspaces that auto-scale and auto-suspend to save costs.
Illustrating AI/ML Model Development in Cloudera AI A Medium tutorial that demonstrates how to create and deploy models using CML on the Cloudera Data Platform Private Cloud, offering practical insights into the model development process.
Cloudera Accelerators for ML Projects A catalog of Applied Machine Learning Prototypes (AMPs) that can be deployed with one click directly from CML, designed to jumpstart AI initiatives by providing tailored solutions for specific use cases.
Cloudera AI Overview Official documentation that offers a comprehensive overview of Cloudera AI, detailing its features, benefits, and how it facilitates collaborative machine learning at scale.
11-12-2024
02:07 PM
Followed up via DM