03-16-2025 06:25 PM
Objective
This project provides a quickstart for submitting Spark applications compiled from Scala code in Cloudera Data Engineering. It contains useful information targeted towards Spark developers and users who are migrating to Cloudera Data Engineering, such as:
Context about Scala Jobs and the usage of Jars when running Spark Applications in CDE.
Useful information about the CDE CLI and how it can simplify your migration efforts to CDE.
Links to references and other recommended examples, such as submitting Spark App Code located in S3 and the CDE Spark Migration CLI Tool.
Requirements
To reproduce this example, you will need a CDE Service in Cloudera on Cloud or On Prem and a working installation of the CDE CLI.
The Scala job application has already been compiled into a Jar via sbt and is available in the `target/scala-2.12/cde-scala-example_2.12-0.1.jar` path.
Background Information on Scala, Spark and the Jar File Usage
A JAR (Java ARchive) file is essentially a compressed file that contains compiled Java or Scala code, along with any dependencies or libraries that the code relies on. In the context of Spark, these JARs are used to package and distribute Spark applications.
To develop a Spark application using Scala, the process typically involves writing the application code in Scala in an IDE, compiling the code into bytecode, and packaging it into a JAR file. This JAR file contains all the necessary components for Spark to execute the application on a distributed cluster.
The JAR is then submitted to a Spark cluster using the `spark-submit` command, which handles the distribution of tasks across the cluster, allowing for parallel processing of data.
When submitting a Spark application, the JAR file is provided as part of the job configuration, and Spark automatically loads the necessary code and libraries from the JAR to run the application. This makes Scala JARs an essential component for deploying and executing Spark applications, especially in distributed environments, where scalability and fault tolerance are key requirements.
Steps to Reproduce
The process of submitting a Scala application as a Jar to a CDE Virtual Cluster is very similar to that of a Spark Cluster. In CDE, you can use the CDE CLI or UI, both of which offer syntax nearly identical to that of a spark-submit command, along with enhanced options to manage, monitor, and schedule jobs and resources inside the CDE Virtual Cluster, and more.
Using the CDE CLI, run the following commands:
Spark Submit with App Code in Jar file
You can run the spark submit directly with the CDE CLI:
cde spark submit \
--executor-cores 2 \
--executor-memory "4g" \
<path/to/jar-file.jar>
For example:
% cde spark submit --executor-cores 2 --executor-memory "4g" target/scala-2.12/cde-scala-example_2.12-0.1.jar
3.9KB/3.9KB 100% [==============================================] cde-scala-example_2.12-0.1.jar
Job run 12 submitted
Waiting for job run 12 to start...
+ echo driver --proxy-user pauldefusco --properties-file /opt/spark/conf/spark.properties --class CdeScalaExample spark-internal
driver --proxy-user pauldefusco --properties-file /opt/spark/conf/spark.properties --class CdeScalaExample spark-internal
+ [[ true == \t\r\u\e ]]
+ POSITIONAL=()
+ [[ 8 -gt 0 ]]
+ key=driver
+ case $key in
[...]
25-03-17 00:03:35 INFO S3ADelegationTokens: Creating Delegation Token: duration 0:00.000s
25-03-17 00:03:35 INFO SharedState: Warehouse path is 's3a://pdf-3425-buk-c59557bd/data/warehouse/tablespace/external/hive'.
25-03-17 00:03:35 INFO ServerInfo: Adding filter to /SQL: com.cloudera.cde.security.authentication.server.JWTRedirectAuthenticationFilter
25-03-17 00:03:35 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@7d569c10{/SQL,null,AVAILABLE,@Spark}
25-03-17 00:03:35 INFO ServerInfo: Adding filter to /SQL/json: com.cloudera.cde.security.authentication.server.JWTRedirectAuthenticationFilter
25-03-17 00:03:35 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@6da53709{/SQL/json,null,AVAILABLE,@Spark}
25-03-17 00:03:35 INFO ServerInfo: Adding filter to /SQL/execution: com.cloudera.cde.security.authentication.server.JWTRedirectAuthenticationFilter
25-03-17 00:03:35 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@2b55ac77{/SQL/execution,null,AVAILABLE,@Spark}
25-03-17 00:03:35 INFO ServerInfo: Adding filter to /SQL/execution/json: com.cloudera.cde.security.authentication.server.JWTRedirectAuthenticationFilter
25-03-17 00:03:35 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@12ec5e73{/SQL/execution/json,null,AVAILABLE,@Spark}
25-03-17 00:03:35 INFO ServerInfo: Adding filter to /static/sql: com.cloudera.cde.security.authentication.server.JWTRedirectAuthenticationFilter
25-03-17 00:03:35 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@42130a5c{/static/sql,null,AVAILABLE,@Spark}
25-03-17 00:03:37 INFO CodeGenerator: Code generated in 163.50901 ms
25-03-17 00:03:37 INFO CodeGenerator: Code generated in 15.652979 ms
+-------+-------------+
|Capital| State|
+-------+-------------+
|Olympia| Washington|
| Boise| Idaho|
| Helena| Montana|
|Lincoln| Nebraska|
|Concord|New Hampshire|
| Albany| New York|
+-------+-------------+
For more information on the CDE Spark-Submit, more in-depth examples of the CDE CLI and migrating spark-submits to CDE via the Spark Migration CLI tool, visit the articles and references listed in the Summary and Next Steps section.
CDE Spark Job with App Code in Jar File
You can also create a CDE Resource to host the Jar dependency inside the CDE Virtual Cluster, and then create the CDE Job of type Spark.
This is particularly advantageous when you have to manage complex spark-submits, as it allows you to create reusable job definitions that are automatically tracked across runs.
Job definitions are essentially collections of all spark-submit options, configurations, and dependencies.
Create the CDE Files Resource:
cde resource create \
--type files \
--name my-jars
cde resource upload \
--name my-jars \
--local-path target/scala-2.12/cde-scala-example_2.12-0.1.jar
Finally, create and run the job. Notice that the executor resource options are the same as those used in the spark-submit. With a few exceptions, all spark-submit options are compatible with the CDE CLI and the CDE Job definition specifically.
cde job create \
--name cde-scala-job \
--type spark \
--executor-cores 2 \
--executor-memory "4g" \
--mount-1-resource my-jars \
--application-file cde-scala-example_2.12-0.1.jar
cde job run \
--name cde-scala-job
cde job run \
--name cde-scala-job \
--executor-memory "2g"
Notice the `--application-file` option is used to designate the jar file containing the application code. The file is located in the `my-jars` files resource created earlier, which is referenced via the `--mount-1-resource` option. As a way to manage complex spark-submits, you can mount more than one files resource when creating or executing your jobs.
Also notice the `--executor-memory` option was set to `4g` during job creation and then overridden to `2g` during execution. Another advantage of CDE Jobs over spark-submits is the ability to override properties on a per-run basis. In this case, you first created a job definition such that, unless specified otherwise, all executions default to 4 GB executor memory - and ran the job with that default. Then, you overrode the setting to 2 GB for a second run, without changing the job definition.
For documentation and more in-depth examples of the CDE CLI and migrating spark-submits to CDE via the Spark Migration CLI tool, visit the articles and references listed in the Summary and Next Steps section.
Monitor CDE Files Resources and Job Executions
As mentioned above, among other things the CDE CLI provides commands to monitor your jobs and resources. Run the following commands to monitor this use case.
Search for all jobs which use the Jar file as application code:
cde job list \
--filter 'spark.file[eq]<your-jar-file-name>'
For example:
% cde job list --filter 'spark.file[eq]cde-scala-example_2.12-0.1.jar'
[
{
"name": "cde-scala-job",
"type": "spark",
"created": "2025-03-17T00:49:05Z",
"modified": "2025-03-17T00:49:05Z",
"retentionPolicy": "keep_indefinitely",
"mounts": [
{
"dirPrefix": "/",
"resourceName": "my-jars"
}
],
"spark": {
"file": "cde-scala-example_2.12-0.1.jar",
"driverMemory": "1g",
"driverCores": 1,
"executorMemory": "4g",
"executorCores": 2
},
"schedule": {
"enabled": false,
"user": "pauldefusco"
}
}
]
You could also search for all jobs where `executor-memory` was set to `4g`:
% cde job list --filter 'spark.executorMemory[eq]4g'
[
{
"name": "cde-scala-job",
"type": "spark",
"created": "2025-03-17T00:49:05Z",
"modified": "2025-03-17T00:49:05Z",
"retentionPolicy": "keep_indefinitely",
"mounts": [
{
"dirPrefix": "/",
"resourceName": "my-jars"
}
],
"spark": {
"file": "cde-scala-example_2.12-0.1.jar",
"driverMemory": "1g",
"driverCores": 1,
"executorMemory": "4g",
"executorCores": 2
},
"schedule": {
"enabled": false,
"user": "pauldefusco"
}
}
]
For more information on this specific topic, visit the `Monitoring CDE with the CDE CLI` article referenced in the Summary and Next Steps section.
Summary and Next Steps
Cloudera Data Engineering (CDE) provides a command line interface (CLI) client. You can use the CLI to create and update jobs, view job details, manage job resources, run jobs, and so on.
In this article, you learned how to use the CLI to submit Spark applications compiled as Jar files. If you are using the CDE CLI you might also find the following articles and demos interesting:
Installing the CDE CLI
Simple Introduction to the CDE CLI
CDE CLI Demo
CDE Concepts
CDE CLI Command Reference
CDE CLI Spark Flag Reference
CDE CLI Airflow Flag Reference
CDE CLI list command syntax reference
CDE Jobs API Reference
CDE Spark Migration Tool
CDE Spark Migration Tool Example
Spark Scala Version Compatibility Matrix
Monitoring CDE with the CDE CLI
Creating a Spark Application with Code Located in S3
03-14-2025 01:24 PM
Objective

In this brief example you will learn how to use the CDE CLI to create a CDE Spark Job with PySpark and Scala app code located in an S3 bucket.

Requirements

In order to reproduce these examples in your CDE Virtual Cluster you will need:

A Spark application in the form of a PySpark script or Scala jar, located in an S3 bucket.
A working installation of the CDE CLI.
A CDE 1.23 Service and a Spark 3 Virtual Cluster running in Cloudera on Cloud or Cloudera on Prem.

Using Dependencies located in S3 with the Spark-Submit

CDE provides a command line interface (CLI) client. You can use the CLI to create and update jobs, view job details, manage job resources, run jobs, etc. The Apache Spark spark-submit command allows you to run a Spark job with application code located in an S3 bucket. The CDE CLI also provides this functionality. For example, in PySpark:

spark-submit \
--master yarn \
--deploy-mode cluster \
--py-files s3://your-bucket/path/to/dependency_one.zip,s3://your-bucket/path/to/dependency_two.py \
--jars s3://your-bucket/path/to/dependency_one.jar,s3://your-bucket/path/to/dependency_two.jar \
s3://your-bucket/path/to/pyspark_app.py \
--arg1 value_one --arg2 value_two

Or with a Jar compiled from Scala application code:

spark-submit \
--master yarn \
--deploy-mode cluster \
--py-files s3://your-bucket/path/to/dependency_one.zip,s3://your-bucket/path/to/dependency_two.py \
--jars s3://your-bucket/path/to/dependency_one.jar,s3://your-bucket/path/to/dependency_two.jar \
s3://your-bucket/path/to/spark_app.jar \
--arg1 value_one --arg2 value_two

Using Dependencies located in S3 with the CDE CLI

You can accomplish the same with the CDE CLI, either by creating a CDE Spark Submit or a CDE Job. When using the CDE Spark Submit, the syntax is nearly identical to that of a spark-submit. When creating the CDE Job, the syntax is also similar, but the --application-file flag is needed to specify the application .py or .jar file. In both cases the Hadoop FileSystem API Spark configurations are needed; these are specified with the --conf flag. Notice that the S3 bucket is the default bucket associated with the CDP Environment.

CDE Spark Submit with PySpark application:

cde spark submit \
--conf spark.sql.shuffle.partitions=10 \
--executor-cores 2 \
--executor-memory 2g \
--conf spark.hadoop.fs.s3.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \
--conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \
s3://default-cdp-bucket/data-eng-artifacts/cde_spark_jobs/simple-pyspark-sql.py

CDE Job with PySpark application:

cde job create \
--application-file s3://your-bucket/path/to/pyspark_app.py \
--conf spark.hadoop.fs.s3.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \
--conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \
--py-files s3://your-bucket/path/to/dependency_one.zip,s3://your-bucket/path/to/dependency_two.py \
--jars s3://default-cdp-bucket/path/to/dependency_one.jar,s3://your-bucket/path/to/dependency_two.jar \
--arg1 value_one

CDE Spark Submit with Scala application:

cde spark submit \
--conf spark.sql.shuffle.partitions=10 \
--executor-cores 2 \
--executor-memory 2g \
--conf spark.hadoop.fs.s3.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \
--conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \
s3://your-bucket/path/to/spark_app.jar

CDE Job with Scala application:

cde job create \
--application-file s3://your-bucket/path/to/spark_app.jar \
--py-files s3://your-bucket/path/to/dependency_one.zip,s3://your-bucket/path/to/dependency_two.py \
--conf spark.hadoop.fs.s3.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \
--conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \
--jars s3://default-cdp-bucket/path/to/dependency_one.jar,s3://your-bucket/path/to/dependency_two.jar \
--arg1 value_one

For example, in the case of a sample PySpark application:

CDE Spark Submit:

cde spark submit \
--conf spark.sql.shuffle.partitions=10 \
--executor-cores 2 \
--executor-memory 2g \
--conf spark.hadoop.fs.s3.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \
--conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \
s3://pdf-3425-buk-c59557bd/data-eng-artifacts/cde_spark_jobs/simple-pyspark-sql.py

CDE Job:

cde job create \
--name my-cde-job-from-s3-pyspark \
--type spark \
--application-file s3://pdf-3425-buk-c59557bd/data-eng-artifacts/cde_spark_jobs/simple-pyspark-sql.py \
--conf spark.sql.shuffle.partitions=10 \
--conf spark.hadoop.fs.s3.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \
--conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \
--executor-cores 2 \
--executor-memory 2g

cde job run \
--name my-cde-job-from-s3-pyspark

Or with a Scala Jar.

CDE Spark Submit:

cde spark submit \
--conf spark.sql.shuffle.partitions=10 \
--executor-cores 2 \
--executor-memory 2g \
--conf spark.hadoop.fs.s3.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \
--conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \
s3://pdf-3425-buk-c59557bd/data-eng-artifacts/cde_spark_jobs/cde-scala-example_2.12-0.1.jar

cde job create \
--name my-cde-job-from-s3-scalajar \
--type spark \
--conf spark.hadoop.fs.s3.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \
--conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \
--application-file s3://data-eng-artifacts/cde_spark_jobs/cde-scala-example_2.12-0.1.jar \
--conf spark.sql.shuffle.partitions=10 \
--executor-cores 2 \
--executor-memory 2g

cde job run \
--name my-cde-job-from-s3-scalajar

Using CDE Files Resources

As an alternative to hosting your application code and file dependencies in S3, you can leverage CDE Files Resources. Files Resources are arbitrary collections of files that a job can reference, where application code and any necessary configuration files or supporting libraries can be stored. Files can be uploaded to and removed from a resource as needed.

CDE Files Resources offer a few key advantages:

They are located inside a CDE Virtual Cluster where they can be easily monitored.
You can easily work with them through the CDE CLI and perform actions such as updating their content or adding them to CDE job definitions.
Once the job has run, the resources applied for a specific run are easily traceable in the UI or via the CLI and API. In other words, if you want to dynamically apply file dependencies across multiple runs, CDE Files Resources can be swapped modularly and are shown in the Job Runs UI post-execution.

You can create a CDE Files Resource with the CLI:

cde resource create \
--name my-files-resource \
--type files

You can upload files to the Resource:

cde resource upload \
--name my-files-resource \
--local-path simple-pyspark-sql.py

You can then mount the Files Resource when creating the CDE Job Definition:

cde job create \
--type spark \
--name my-job-with-resource \
--mount-1-resource my-files-resource \
--application-file simple-pyspark-sql.py

Finally, run it with:

cde job run \
--name my-job-with-resource \
--conf spark.sql.shuffle.partitions=10 \
--executor-cores 2 \
--executor-memory 2g

For more in-depth information on using CDE Resources please visit the following publications:

Working with CDE Files Resources
Using CDE Resources in CDE Sessions

Summary and Next Steps

Cloudera Data Engineering (CDE) provides a command line interface (CLI) client. You can use the CLI to create and update jobs, view job details, manage job resources, run jobs, and so on. In this article we have reviewed some advanced use cases for the CLI. If you are using the CDE CLI you might also find the following articles and demos interesting:

Installing the CDE CLI
Simple Introduction to the CDE CLI
CDE Concepts
CDE CLI Command Reference
CDE CLI Spark Flag Reference
CDE CLI Airflow Flag Reference
CDE CLI list command syntax reference
CDE Jobs API Reference
03-04-2025 06:15 PM
CREATING A CDE REPOSITORY WITH A PRIVATE GIT REPOSITORY
OBJECTIVE
In this article, you will learn how to create a CDE Repository by cloning a private Git repository using CDE Credentials.
Cloudera Data Engineering (CDE) is a cloud-based service that helps businesses build, manage, and scale data pipelines. It is based on Apache Spark and integrates with other Cloudera services.
Git repositories allow teams to collaborate, manage project artifacts, and promote applications from lower to higher environments. CDE Repositories can be used to clone git repositories into a Virtual Cluster to make the imported files available to CDE Jobs as reusable dependencies. CDE currently supports Git providers such as GitHub, GitLab, and Bitbucket.
CDE Credentials allow you to store secrets such as usernames, passwords, and tokens, in secure, reusable objects associated with CDE Jobs, Custom Runtimes and Repositories.
REQUIREMENTS
This example was built with CDE 1.23. To reproduce the commands in your environment, you need a CDE Service, Virtual Cluster, Private Git repository, and a local installation of the CDE CLI.
CREATE PRIVATE GITHUB REPOSITORY
Log in to your GitHub account and create a private GitHub repository. Save the repository's URL, for example:
https://github.com/pdefusco/sample_private_repo.git
CREATE GITHUB TOKEN
In your GitHub Developer Settings, create a new classic personal access token.
Keep the token handy.
CREATE CDE CREDENTIALS
Using the CDE CLI, create a Basic CDE Credential using Git username and token.
In the following command, assign an arbitrary credential name and apply your git username as the username parameter.
% cde credential create \
--name my-git-creds \
--type basic \
--username pdefusco
You will be prompted to enter your password twice. Paste the token you obtained earlier from the GitHub settings.
Enter password for username "pdefusco":
Retype password for username "pdefusco":
Validate credentials. Here is a sample output:
% cde credential list
[
{
"name": "my-git-creds",
"type": "basic",
"created": "2025-03-05T00:03:28Z",
"modified": "2025-03-05T00:03:28Z"
},
{
"name": "paul-git-creds",
"type": "basic",
"created": "2025-03-05T00:12:58Z",
"modified": "2025-03-05T00:12:58Z"
},
{
"name": "test-git-creds",
"type": "workload-credential",
"created": "2025-03-04T23:56:17Z",
"modified": "2025-03-04T23:56:17Z"
}
]
CREATE CDE REPOSITORY USING PRIVATE GIT CREDENTIALS
You will now use the CLI to create a CDE repository. This will be a clone of your private git repository.
Update the name, credential, url, and branch parameters. Then run this CLI command:
% cde repository create \
--name my-test-repo \
--credential my-git-creds \
--url https://github.com/pdefusco/sample_private_repo.git \
--branch main
Validate the repository via the CLI:
% cde repository list
[
{
"name": "my-code-repo",
"type": "git",
"status": "ready",
"signature": "a2a52f11546ef9d4039b4fd01f504bdd8b498c35",
"created": "2025-03-05T01:08:08Z",
"modified": "2025-03-05T01:08:08Z",
"retentionPolicy": "keep_indefinitely",
"git": {
"repository": "https://github.com/pdefusco/sample_private_repo.git",
"branch": "main",
"credential": "my-git-creds"
}
}
]
Validate the repository in the UI:
Summary
Cloudera Data Engineering (CDE) provides a command line interface (CLI) client. You can use the CLI to create and update jobs, view job details, manage job resources, run jobs, and so on. In this article, you learned how to use the CLI to create reusable CDE credentials for integrating with GitHub, and generated a CDE Repository to import application code from a private GitHub repository.
References
CDE Repositories Documentation
GitHub Documentation
CDE Documentation
How to Manage CDE Repositories in CDEPY
CDE CLI Reference
01-13-2025 03:48 PM
JupyterLab is a powerful, web-based interactive development environment widely used for data science, machine learning, and Spark application development. It extends the classic Jupyter Notebook interface, offering a more flexible and integrated workspace. For Spark application development, JupyterLab provides the following advantages:

1. Interactive Exploration: Run Spark queries and visualize results interactively, making it ideal for data exploration and debugging.
2. Rich Visualization: Seamlessly integrate with Python visualization libraries like Matplotlib, Seaborn, and Plotly to analyze and interpret Spark data.
3. Ease of Integration: Use PySpark or Sparkmagic to connect JupyterLab with Spark clusters, enabling distributed data processing from the notebook environment.

Spark Connect is a feature introduced in Apache Spark that provides a standardized, client-server architecture for connecting to Spark clusters. It decouples the client from the Spark runtime, allowing users to interact with Spark through lightweight, language-specific clients without the need to run a full Spark environment on the client side. With Spark Connect, users can:

1. Access Spark Remotely: Connect to a Spark cluster from various environments, including local machines or web applications.
2. Support Multiple Languages: Use Spark with Python, Scala, Java, SQL, and other languages through dedicated APIs.
3. Simplify Development: Develop and test Spark applications without needing a full Spark installation, making it easier for developers and data scientists to work with distributed data processing.

This architecture enhances usability, scalability, and flexibility, making Spark more accessible to a wider range of users and environments.

In this article you will use JupyterLab locally to interactively prototype a PySpark and Iceberg application in a dedicated Spark Virtual Cluster running in Cloudera Data Engineering on AWS.

Prerequisites

A CDE Service on version 1.23 or above and a Spark 3.5.1 Virtual Cluster.
A local installation of the CDE CLI on version 1.23 or above.
A local installation of JupyterLab. Version 4.0.7 was used for this demonstration, but other versions should work as well.
A local installation of Python. Version 3.9.12 was used for this demonstration, but other versions will work as well.

1. Launch a CDE Spark Connect Session

Create a CDE Files Resource and upload the CSV files:

cde resource create \
--name telcoFiles \
--type files
cde resource upload \
--name telcoFiles \
--local-path resources/cell_towers_1.csv \
--local-path resources/cell_towers_2.csv

Start a CDE Session of type Spark Connect. Edit the Session Name parameter so it doesn't collide with other users' sessions.

cde session create \
--name spark-connect-session-res \
--type spark-connect \
--num-executors 2 \
--driver-cores 2 \
--driver-memory "2g" \
--executor-cores 2 \
--executor-memory "2g" \
--mount-1-resource telcoFiles

In the Sessions UI, validate the Session is Running.

2. Install Spark Connect Prerequisites

From the terminal, install the following Spark Connect prerequisites.

Download the cdeconnect and PySpark packages from the CDE Session Configuration tab and place them in your project home folder.

Create a new Python Virtual Environment:

python -m venv spark_connect_jupyter
source spark_connect_jupyter/bin/activate

Install the following packages. Notice that these exact versions were used with Python 3.9. Numpy, cmake, and PyArrow versions may be subject to change depending on your Python version.

pip install numpy==1.26.4
pip install --upgrade cmake
pip install pyarrow==14.0.0
pip install cdeconnect.tar.gz
pip install pyspark-3.5.1.tar.gz

Launch the JupyterLab server with:

pip install jupyterlab
jupyter lab

3. Run Your First PySpark & Iceberg Application via Spark Connect

You are now ready to connect to the CDE Session from your local JupyterLab instance using Spark Connect.

In the first cell, edit the sessionName option and add your session name from the CLI Create Session command above. In the second cell, edit your username. Now run each cell and observe outputs.
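For orientation, here is a minimal sketch of what the notebook cells might contain. The connection boilerplate is an assumption based on the cdeconnect package downloaded earlier (the exact import and builder API may differ in your CDE version), and the table name built from your username is purely hypothetical:

# Cell 1: connect to the CDE Spark Connect Session created above
# (verify the cdeconnect API in your version; sessionName must match your session)
from cde import CDESparkConnectSession
spark = CDESparkConnectSession.builder.sessionName("spark-connect-session-res").get()

# Cell 2: user-specific values (hypothetical)
username = "your_username"

# Cell 3: read a CSV mounted from the telcoFiles resource and write it as an Iceberg table
df = spark.read.csv("/app/mount/cell_towers_1.csv", header=True, inferSchema=True)
df.writeTo(f"CELL_TOWERS_{username}").using("iceberg").createOrReplace()
spark.sql(f"SELECT * FROM CELL_TOWERS_{username} LIMIT 10").show()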
01-13-2025 03:30 PM
PyCharm is a popular integrated development environment (IDE) for Python, developed by JetBrains. It provides a comprehensive set of tools designed to boost productivity and simplify the coding process. When working with Apache Spark, PyCharm can be an excellent choice for managing Python-based Spark applications. Its features include:

Code Editing and Debugging: PyCharm offers intelligent code completion, syntax highlighting, and robust debugging tools that simplify Spark application development.
Virtual Environments and Dependency Management: PyCharm makes it easy to configure Python environments with Spark libraries and manage dependencies.
Notebook Support: With built-in support for Jupyter Notebooks, PyCharm allows you to work interactively with data, making it easier to visualize and debug Spark pipelines.
Version Control: PyCharm integrates with Git and other version control systems, simplifying collaboration and project management.

Spark Connect is a feature introduced in Apache Spark that provides a standardized, client-server architecture for connecting to Spark clusters. It decouples the client from the Spark runtime, allowing users to interact with Spark through lightweight, language-specific clients without the need to run a full Spark environment on the client side. With Spark Connect, users can:

Access Spark Remotely: Connect to a Spark cluster from various environments, including local machines or web applications.
Support Multiple Languages: Use Spark with Python, Scala, Java, SQL, and other languages through dedicated APIs.
Simplify Development: Develop and test Spark applications without needing a full Spark installation, making it easier for developers and data scientists to work with distributed data processing.

This architecture enhances usability, scalability, and flexibility, making Spark more accessible to a wider range of users and environments.

In this article you will learn how to use PyCharm locally to interactively prototype your code in a dedicated Spark Virtual Cluster running in Cloudera Data Engineering in AWS.

Prerequisites

A CDE Service on version 1.23 or above and a Spark 3.5.1 Virtual Cluster.
A local installation of the CDE CLI on version 1.23 or above.
A local installation of PyCharm. Version 4.0.7 was used for this demonstration, but other versions should work as well.
A local installation of Python. Version 3.9.12 was used for this demonstration, but other versions will work as well.

1. Launch a CDE Spark Connect Session

Start a CDE Session of type Spark Connect. Edit the Session Name parameter so it doesn't collide with other users' sessions.

cde session create \
--name pycharm-session \
--type spark-connect \
--num-executors 2 \
--driver-cores 2 \
--driver-memory "2g" \
--executor-cores 2 \
--executor-memory "2g" In the Sessions UI, validate the Session is Running. 2. Install Spark Connect Prerequisites From the terminal, install the following Spark Connect prerequisites: Create a new Project and Python Virtual Environment in PyCharm: In the terminal, install the following packages. Notice that these exact versions were used with Python 3.9. Numpy, cmake, and PyArrow versions may be subject to change depending on your Python version. pip install numpy==1.26.4
pip install --upgrade cmake
pip install pyarrow==14.0.0
pip install cdeconnect.tar.gz
pip install pyspark-3.5.1.tar.gz

3. Run Your First PySpark & Iceberg Application via Spark Connect

You are now ready to connect to the CDE Session from your local IDE using Spark Connect. Open "prototype.py". Make the following changes:

At line 46, edit the "sessionName" parameter with your Session Name from the above CLI command.
At line 48, edit the "storageLocation" parameter with your S3 bucket prefix.
At line 49, edit the "username" parameter with your assigned username.

Now run "prototype.py" and observe outputs.
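As a reference, a hedged sketch of what those lines in prototype.py might look like is shown below; the variable names, the example values, and the cdeconnect connection API are assumptions and may differ from the actual script:

# Approximate shape of the parameters edited in prototype.py (hypothetical values)
sessionName = "pycharm-session"              # line 46: your CDE Session name
storageLocation = "s3a://your-bucket/data"   # line 48: your S3 bucket prefix
username = "your_username"                   # line 49: your assigned username

# Connection boilerplate from the cdeconnect package (verify in your version)
from cde import CDESparkConnectSession
spark = CDESparkConnectSession.builder.sessionName(sessionName).get()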
01-06-2025 06:01 PM
You can find the Docs for Python and API by going to User Settings -> API Keys.

From the Python CML APIv2 Docs, these are the two methods you need:

Step 1: Create Job

from __future__ import print_function
import time
import cmlapi
from cmlapi.rest import ApiException
from pprint import pprint
# create an instance of the API class
api_instance = cmlapi.CMLServiceApi()
body = cmlapi.CreateJobRequest() # CreateJobRequest |
project_id = 'project_id_example' # str | ID of the project containing the job.
try:
# Create a new job.
api_response = api_instance.create_job(body, project_id)
pprint(api_response)
except ApiException as e:
print("Exception when calling CMLServiceApi->create_job: %s\n" % e) Step 2: Run the Job from __future__ import print_function
import time
import cmlapi
from cmlapi.rest import ApiException
from pprint import pprint
# create an instance of the API class
api_instance = cmlapi.CMLServiceApi()
body = cmlapi.CreateJobRunRequest() # CreateJobRunRequest |
project_id = 'project_id_example' # str | ID of the project containing the job.
job_id = 'job_id_example' # str | The job ID to create a new job run for.
try:
# Create and start a new job run for a job.
api_response = api_instance.create_job_run(body, project_id, job_id)
pprint(api_response)
except ApiException as e:
print("Exception when calling CMLServiceApi->create_job_run: %s\n" % e) Using the "cmlapi.CreateJobRequest()" or "cmlapi.CreateJobRunRequest()" methods can be tricky. Here's an advanced example: https://github.com/pdefusco/SparkGen/blob/main/autogen/cml_orchestrator.py In particular: sparkgen_1_job_body = cmlapi.CreateJobRequest(
project_id = project_id,
name = "SPARKGEN_1_"+session_id,
script = "autogen/cml_sparkjob_1.py",
cpu = 4.0,
memory = 8.0,
runtime_identifier = "docker.repository.cloudera.com/cloudera/cdsw/ml-runtime-workbench-python3.7-standard:2023.05.1-b4",
runtime_addon_identifiers = ["spark320-18-hf4"],
environment = {
"x":str(x),
"y":str(y),
"z":str(z),
"ROW_COUNT_car_installs":str(ROW_COUNT_car_installs),
"UNIQUE_VALS_car_installs":str(UNIQUE_VALS_car_installs),
"PARTITIONS_NUM_car_installs":str(PARTITIONS_NUM_car_installs),
"ROW_COUNT_car_sales":str(ROW_COUNT_car_sales),
"UNIQUE_VALS_car_sales":str(UNIQUE_VALS_car_sales),
"PARTITIONS_NUM_car_sales":str(PARTITIONS_NUM_car_sales),
"ROW_COUNT_customer_data":str(ROW_COUNT_customer_data),
"UNIQUE_VALS_customer_data":str(UNIQUE_VALS_customer_data),
"PARTITIONS_NUM_customer_data":str(PARTITIONS_NUM_customer_data),
"ROW_COUNT_factory_data":str(ROW_COUNT_factory_data),
"UNIQUE_VALS_factory_data":str(UNIQUE_VALS_factory_data),
"PARTITIONS_NUM_factory_data":str(PARTITIONS_NUM_factory_data),
"ROW_COUNT_geo_data":str(ROW_COUNT_geo_data),
"UNIQUE_VALS_geo_data":str(UNIQUE_VALS_geo_data),
"PARTITIONS_NUM_geo_data":str(PARTITIONS_NUM_geo_data)
}
)
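# Note: this advanced example assumes an authenticated API client named "client".
# Inside a CML Session it can typically be obtained as follows (verify against your
# cmlapi version; outside a Session, pass the workspace URL and an API key):
client = cmlapi.default_client()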
sparkgen_1_job = client.create_job(sparkgen_1_job_body, project_id)

And:

jobrun_body = cmlapi.CreateJobRunRequest(project_id, sparkgen_1_job.id)
job_run = client.create_job_run(jobrun_body, project_id, sparkgen_1_job.id)

Hope this helps,
01-06-2025 02:12 PM
Spark GraphFrames is a package for Apache Spark that allows users to perform graph processing operations on data using a DataFrame-based approach. It enables you to perform various graph operations like finding connected components, calculating shortest paths, identifying triangles, and more. Real-world applications include social network analysis, recommendation systems, web link structures, flight connections, and other scenarios where relationships between data points are crucial.
Cloudera AI (CAI) is a cloud-native service within the Cloudera Data Platform (CDP) that enables enterprise data science teams to collaborate across the full data lifecycle. It provides immediate access to enterprise data pipelines, scalable compute resources, and preferred tools, streamlining the process of moving analytic workloads from research to production.
Using Spark GraphFrames in Cloudera AI requires minimum configuration effort. This quickstart provides a basic example so you can get started quickly.
Requirements
You can use Spark GraphFrames in CAI Workbench with Spark Runtime Addon versions 3.2 or above. When you create the SparkSession object, make sure to select the GraphFrames package version from Spark Packages that matches the Spark Runtime Addon you selected when launching the CAI Session.
This example was created with the following platform and system versions, but it will also work in other Workbench versions (below or above 2.0.46).
CAI Workbench (a.k.a. "CML Workspace") on version 2.0.46.
Spark Runtime Addon version 3.5.
Steps to Reproduce the Quickstart
Create a CAI Workbench Session with the following settings:
Resource Profile of 2 vCPU, 8 GiB Mem, 0 GPU
Editor: PBJ Workbench, Python 3.10 Kernel, Standard Edition, 2024.10 Version.
In the Session, run the script. Notice the following:
Line 46: the SparkSession object is created. The packages option is used to download the library. If you are not using Spark 3.5, change the argument to the package version compatible with your Spark Runtime Addon version, as listed on Spark Packages.
spark = SparkSession.builder.appName("MyApp") \
.config("spark.jars.packages", "graphframes:graphframes:0.8.4-spark3.5-s_2.12") \
.getOrCreate()
Line 71: A GraphFrame object is instantiated using the two PySpark Dataframes.
from graphframes import *
# Vertex DataFrame
v = spark.createDataFrame([
("a", "Alice", 34),
("b", "Bob", 36),
("c", "Charlie", 30),
("d", "David", 29),
("e", "Esther", 32),
("f", "Fanny", 36),
("g", "Gabby", 60)
], ["id", "name", "age"])
# Edge DataFrame
e = spark.createDataFrame([
("a", "b", "friend"),
("b", "c", "follow"),
("c", "b", "follow"),
("f", "c", "follow"),
("e", "f", "follow"),
("e", "d", "friend"),
("d", "a", "friend"),
("a", "e", "friend")
], ["src", "dst", "relationship"])
# Create a GraphFrame
g = GraphFrame(v, e)
Lines 73 and below: You can now use GraphFrames methods to traverse and filter the graph based on relationships between data instances.
> g.vertices.show()
+---+-------+---+
| id| name|age|
+---+-------+---+
| a| Alice| 34|
| b| Bob| 36|
| c|Charlie| 30|
| d| David| 29|
| e| Esther| 32|
| f| Fanny| 36|
| g| Gabby| 60|
+---+-------+---+
> g.edges.show()
+---+---+------------+
|src|dst|relationship|
+---+---+------------+
| a| b| friend|
| b| c| follow|
| c| b| follow|
| f| c| follow|
| e| f| follow|
| e| d| friend|
| d| a| friend|
| a| e| friend|
+---+---+------------+
#Find the youngest user’s age in the graph. This queries the vertex DataFrame.
> g.vertices.groupBy().min("age").show()
+--------+
|min(age)|
+--------+
| 29|
+--------+
Summary & Next Steps
Cloudera AI (CAI) is a cloud-native service within the Cloudera Data Platform (CDP) that enables enterprise data science teams to collaborate across the full data lifecycle. It provides immediate access to enterprise data pipelines, scalable compute resources, and preferred tools, streamlining the process of moving analytic workloads from research to production. In particular:
A CAI Workbench Session allows you to directly access and analyze large datasets, run code (like Python or R), and build and train machine learning models, using the flexibility of Spark Runtime Add-ons to deploy Spark on Kubernetes clusters with minimal configurations, with your Spark version of choice.
The CAI Engineering team maintains and certifies Spark Runtime Add-ons so you don't have to worry about installing and configuring Spark on Kubernetes clusters when using CAI Workbench. You can check if your Spark version is supported in the latest compatibility matrix.
The "packages" option can be used at SparkSession creation to download 3rd party packages such as GraphFrames. For more information on this and other Spark Options, please refer to the Apache Spark Documentation.
GraphFrames, in particular, allows you to query your Spark Dataframes in terms of relationships, thus empowering use cases ranging from Social Network analysis to Recommendation Engines, and more. For more information, please refer to the GraphFrames documentation.
Finally, you can learn more about Cloudera AI Workbench with the following recommended blogs and community articles:
Cloudera AI - What You Should Know An insightful community article that provides an overview of CML's features and capabilities, helping teams deploy machine learning workspaces that auto-scale and auto-suspend to save costs.
Illustrating AI/ML Model Development in Cloudera AI A Medium tutorial that demonstrates how to create and deploy models using CML on the Cloudera Data Platform Private Cloud, offering practical insights into the model development process.
Cloudera Accelerators for ML Projects A catalog of Applied Machine Learning Prototypes (AMPs) that can be deployed with one click directly from CML, designed to jumpstart AI initiatives by providing tailored solutions for specific use cases.
Cloudera AI Overview Official documentation that offers a comprehensive overview of Cloudera AI, detailing its features, benefits, and how it facilitates collaborative machine learning at scale.
11-12-2024 02:07 PM
Followed up via DM
10-08-2024 04:24 PM
Objective

This article provides an introduction to Apache Iceberg using Spark SQL in Cloudera Data Engineering (CDE). CDE provides native Apache Iceberg Table Format support in its Spark Runtimes. This means you can create and interact with Iceberg Table format tables without any configurations.

Abstract

CDP Data Engineering (CDE) is the only cloud-native service purpose-built for enterprise data engineering teams. Building on Apache Spark, Data Engineering is an all-inclusive data engineering toolset that enables orchestration automation with Apache Airflow, advanced pipeline monitoring, visual troubleshooting, and comprehensive management tools to streamline ETL processes across enterprise analytics teams.

Apache Iceberg is an open table format for huge analytic datasets. It provides features that, coupled with Spark as the compute engine, allow you to build data processing pipelines with dramatic gains in terms of scalability, performance, and overall developer productivity. Iceberg is natively supported by CDE. Any time a CDE Spark Job or Session is created, Iceberg dependencies are automatically set in the SparkSession without any need for configurations. As a CDP User, the CDE Data Engineer can thus create, read, modify, and interact with Iceberg tables as allowed by Ranger policies, whether these were created in Cloudera Data Warehouse (CDW), DataHub, or Cloudera AI (CML).

In this tutorial you will create a CDE Session and interact with Apache Iceberg tables using PySpark.

Requirements

A CDE Virtual Cluster of type "All-Purpose" running in a CDE Service with version 1.22 or above, and Spark version 3.2 or above.
An installation of the CDE CLI is recommended but optional. In the steps below you will create the CDE Session using the CLI, but you can alternatively launch one using the UI.

Step by Step Guide

Create CDE Files Resource

cde resource create --name myFiles --type files
cde resource upload --name myFiles --local-path resources/cell_towers_1.csv --local-path resources/cell_towers_2.csv

Launch CDE Session & Run Spark Commands

cde session create --name icebergSessionCDE --type pyspark --mount-1-resource myFiles
cde session interact --name icebergSessionCDE

Create Iceberg Tables from Files Resources

In this code snippet two Iceberg tables are created from PySpark dataframes. The dataframes load CSV data from a CDE Files Resource specifying the ```/app/mount``` path.

# PySpark commands:
df1 = spark.read.csv("/app/mount/cell_towers_1.csv", header=True, inferSchema=True)
df1.writeTo("CELL_TOWERS_LEFT").using("iceberg").tableProperty("write.format.default", "parquet").createOrReplace()
df2 = spark.read.csv("/app/mount/cell_towers_2.csv", header=True, inferSchema=True)
df2.writeTo("CELL_TOWERS_RIGHT").using("iceberg").tableProperty("write.format.default", "parquet").createOrReplace() Read Iceberg Tables using PySpark Next, use Spark SQL to access the data from the Iceberg tables: # Spark SQL Commands:
spark.sql("SELECT * FROM CELL_TOWERS_LEFT \
WHERE manufacturer == 'TelecomWorld' \
AND cell_tower_failure == 0").show()
# Expected Output:
+---+---------------+------------+------------------+----------+---------+------------+------------+------------+------------------+
| id| device_id|manufacturer| event_type| longitude| latitude|iot_signal_1|iot_signal_3|iot_signal_4|cell_tower_failure|
+---+---------------+------------+------------------+----------+---------+------------+------------+------------+------------------+
| 1|0x100000000001d|TelecomWorld| battery 10%| -83.04828|51.610226| 9| 52| 103| 0|
| 2|0x1000000000008|TelecomWorld| battery 10%| -83.60245|51.892113| 6| 54| 103| 0|
| 7|0x100000000000b|TelecomWorld| device error| -83.62492|51.891964| 5| 54| 102| 0|
| 12|0x1000000000020|TelecomWorld|system malfunction| -83.36766|51.873108| 8| 53| 106| 0|
| 13|0x1000000000017|TelecomWorld| battery 5%| -83.04949|51.906513| 4| 52| 105| 0|
| 24|0x1000000000026|TelecomWorld| device error| -83.15052|51.605473| 6| 55| 103| 0|
| 30|0x1000000000008|TelecomWorld| battery 10%| -83.44602| 51.60561| 2| 53| 106| 0|
| 35|0x1000000000002|TelecomWorld|system malfunction| -83.62555|51.827686| 2| 54| 102| 0|
| 37|0x100000000001d|TelecomWorld| battery 10%| -83.47665|51.670994| 3| 53| 105| 0|
| 41|0x1000000000017|TelecomWorld| device error| -82.89744| 51.92945| 4| 52| 100| 0|
+---+---------------+------------+------------------+----------+---------+------------+------------+------------+------------------+

Validate Iceberg Table

Use the ```SHOW TBLPROPERTIES``` command to validate Iceberg Table format:

# Spark SQL Command:
spark.sql("SHOW TBLPROPERTIES CELL_TOWERS_LEFT").show()
# Expected Output:
+--------------------+-------------------+
| key| value|
+--------------------+-------------------+
| current-snapshot-id|8073060523561382284|
| format| iceberg/parquet|
| format-version| 1|
|write.format.default| parquet|
+--------------------+-------------------+

As an alternative method to validate Iceberg Table format, investigate Iceberg Metadata with any of the following Spark SQL commands:

# Query Iceberg History Table
spark.sql("SELECT * FROM SPARK_CATALOG.DEFAULT.CELL_TOWERS_LEFT.history").show()
+--------------------+-------------------+---------+-------------------+
| made_current_at| snapshot_id|parent_id|is_current_ancestor|
+--------------------+-------------------+---------+-------------------+
|2024-10-08 20:30:...|8073060523561382284| null| true|
+--------------------+-------------------+---------+-------------------+
# Query Iceberg Partitions Table
spark.sql("SELECT * FROM SPARK_CATALOG.DEFAULT.CELL_TOWERS_LEFT.partitions").show()
+------------+----------+----------------------------+--------------------------+----------------------------+--------------------------+
|record_count|file_count|position_delete_record_count|position_delete_file_count|equality_delete_record_count|equality_delete_file_count|
+------------+----------+----------------------------+--------------------------+----------------------------+--------------------------+
| 1440| 1| 0| 0| 0| 0|
+------------+----------+----------------------------+--------------------------+----------------------------+--------------------------+
# Query Iceberg Snapshots Table
spark.sql("SELECT * FROM SPARK_CATALOG.DEFAULT.CELL_TOWERS_LEFT.snapshots").show()
+--------------------+-------------------+---------+---------+--------------------+--------------------+
| committed_at| snapshot_id|parent_id|operation| manifest_list| summary|
+--------------------+-------------------+---------+---------+--------------------+--------------------+
|2024-10-08 20:30:...|8073060523561382284| null| append|s3a://paul-aug26-...|{spark.app.id -> ...|
+--------------------+-------------------+---------+---------+--------------------+--------------------+
# Query Iceberg Refs Table
spark.sql("SELECT * FROM SPARK_CATALOG.DEFAULT.CELL_TOWERS_LEFT.refs").show()
+----+------+-------------------+-----------------------+---------------------+----------------------+
|name| type| snapshot_id|max_reference_age_in_ms|min_snapshots_to_keep|max_snapshot_age_in_ms|
+----+------+-------------------+-----------------------+---------------------+----------------------+
|main|BRANCH|8073060523561382284| null| null| null|
+----+------+-------------------+-----------------------+---------------------+----------------------+

Create Empty Iceberg Table using Spark SQL

You can also use Spark SQL to create an Iceberg Table. Run a ```SHOW CREATE TABLE``` command on an existing table to investigate its format:

# Spark SQL Command:
print(spark.sql("SHOW CREATE TABLE CELL_TOWERS_LEFT").collect()[0][0])
# Expected Output
CREATE TABLE spark_catalog.default.cell_towers_left (
`id` INT,
`device_id` STRING,
`manufacturer` STRING,
`event_type` STRING,
`longitude` DOUBLE,
`latitude` DOUBLE,
`iot_signal_1` INT,
`iot_signal_3` INT,
`iot_signal_4` INT,
`cell_tower_failure` INT)
USING iceberg
LOCATION 's3a://paul-aug26-buk-a3c2b50a/data/warehouse/tablespace/external/hive/CELL_TOWERS_LEFT'
TBLPROPERTIES(
'current-snapshot-id' = '8073060523561382284',
'format' = 'iceberg/parquet',
'format-version' = '1',
'write.format.default' = 'parquet')

Next, create a new Iceberg table modeled on this one. Notice the ```USING iceberg``` clause:

# Spark SQL Command:
spark.sql("""
CREATE TABLE ICE_TARGET_TABLE (
`id` INT,
`device_id` STRING,
`manufacturer` STRING,
`event_type` STRING,
`longitude` DOUBLE,
`latitude` DOUBLE,
`iot_signal_1` INT,
`iot_signal_3` INT,
`iot_signal_4` INT,
`cell_tower_failure` INT)
USING iceberg;
""") This table is empty. Query Table Files to validate this: # Spark SQL Command:
spark.sql("SELECT * FROM SPARK_CATALOG.DEFAULT.ICE_TARGET_TABLE.files;").show()
# Expected Output:
+-------+---------+-----------+-------+------------+------------------+------------+------------+-----------------+----------------+------------+------------+------------+-------------+------------+-------------+----------------+
|content|file_path|file_format|spec_id|record_count|file_size_in_bytes|column_sizes|value_counts|null_value_counts|nan_value_counts|lower_bounds|upper_bounds|key_metadata|split_offsets|equality_ids|sort_order_id|readable_metrics|
+-------+---------+-----------+-------+------------+------------------+------------+------------+-----------------+----------------+------------+------------+------------+-------------+------------+-------------+----------------+
+-------+---------+-----------+-------+------------+------------------+------------+------------+-----------------+----------------+------------+------------+------------+-------------+------------+-------------+----------------+

Append Data Into Empty Iceberg Table

Append data from a PySpark dataframe into the Iceberg table. Notice the use of the ```append()``` method.

# PySpark command:
df2.writeTo("SPARK_CATALOG.DEFAULT.ICE_TARGET_TABLE").using("iceberg").tableProperty("write.format.default", "parquet").append() Query Iceberg Metadata in order to validate that the append operation completed successfully: # Spark SQL Command:
spark.sql("SELECT * FROM SPARK_CATALOG.DEFAULT.ICE_TARGET_TABLE.files;").show()
# Expected Output:
+-------+--------------------+-----------+-------+------------+------------------+--------------------+--------------------+--------------------+----------------+--------------------+--------------------+------------+-------------+------------+-------------+--------------------+
|content| file_path|file_format|spec_id|record_count|file_size_in_bytes| column_sizes| value_counts| null_value_counts|nan_value_counts| lower_bounds| upper_bounds|key_metadata|split_offsets|equality_ids|sort_order_id| readable_metrics|
+-------+--------------------+-----------+-------+------------+------------------+--------------------+--------------------+--------------------+----------------+--------------------+--------------------+------------+-------------+------------+-------------+--------------------+
| 0|s3a://paul-aug26-...| PARQUET| 0| 1440| 36103|{1 -> 5796, 2 -> ...|{1 -> 1440, 2 -> ...|{1 -> 0, 2 -> 0, ...|{5 -> 0, 6 -> 0}|{1 -> , 2 -> ...|{1 -> �, 2 -> ...| null| [4]| null| 0|{{286, 1440, 0, n...|
+-------+--------------------+-----------+-------+------------+------------------+--------------------+--------------------+--------------------+----------------+--------------------+--------------------+------------+-------------+------------+-------------+--------------------+

Create Iceberg Table from Hive Table

There are a few options to convert Hive tables into Iceberg Tables. The easiest approach is an "inplace-migration" to Iceberg table format.

Create a Hive Table using a PySpark dataframe:

# PySpark Command:
df1.write.mode("overwrite").saveAsTable('HIVE_TO_ICEBERG_TABLE', format="parquet") Now migrate it to Iceberg table format: spark.sql("ALTER TABLE HIVE_TO_ICEBERG_TABLE UNSET TBLPROPERTIES ('TRANSLATED_TO_EXTERNAL')")
spark.sql("CALL spark_catalog.system.migrate('HIVE_TO_ICEBERG_TABLE')") Validate Iceberg Table format: # Spark SQL Command:
spark.sql("SHOW TBLPROPERTIES HIVE_TO_ICEBERG_TABLE").show()
# Expected Output:
+--------------------+--------------------+
| key| value|
+--------------------+--------------------+
| bucketing_version| 2|
| current-snapshot-id| 1440783321004851162|
|external.table.purge| TRUE|
| format| iceberg/parquet|
| format-version| 1|
| migrated| true|
|numFilesErasureCoded| 0|
|schema.name-mappi...|[ {\n "field-id"...|
+--------------------+--------------------+

Summary

Cloudera Data Engineering (CDE) and the broader Cloudera Data Platform (CDP) offer a powerful, scalable solution for building, deploying, and managing data workflows in hybrid and multi-cloud environments. CDE simplifies data engineering with serverless architecture, auto-scaling Spark clusters, and built-in Apache Iceberg support. Unlike competing offerings, each CDE release is certified against one or more Apache Iceberg versions. This ensures full compatibility between the Spark engine and the underlying Open Lakehouse capabilities, such as Apache Ranger for security policies. Whenever you launch a CDE Spark Job or Session, Iceberg dependencies are automatically configured as dictated by the chosen Spark version. With full native Iceberg support, you can leverage CDE Sessions to create or migrate to Iceberg Table format without any special configurations.

Next Steps

Here is a list of helpful articles and blogs related to Cloudera Data Engineering and Apache Iceberg:

Cloudera on Public Cloud 5-Day Free Trial: Experience Cloudera Data Engineering through common use cases that also introduce you to the platform’s fundamentals and key capabilities with predefined code samples and detailed step by step instructions. Try Cloudera on Public Cloud for free.
Cloudera Blog: Supercharge Your Data Lakehouse with Apache Iceberg: Learn how Apache Iceberg integrates with Cloudera Data Platform (CDP) to enable scalable and performant data lakehouse solutions, covering features like in-place table evolution and time travel. Read more on Cloudera Blog.
Cloudera Docs: Using Apache Iceberg in Cloudera Data Engineering: This documentation explains how Apache Iceberg is utilized in Cloudera Data Engineering to handle massive datasets, with detailed steps on managing tables and virtual clusters. Read more in Cloudera Documentation.
Cloudera Blog: Building an Open Data Lakehouse Using Apache Iceberg: This article covers how to build and optimize a data lakehouse architecture using Apache Iceberg in CDP, along with advanced features like partition evolution and time travel queries. Read more on Cloudera Blog.
Compatibility for Cloudera Data Engineering and Runtime Components: Learn about Cloudera Data Engineering (CDE) and compatibility for Runtime components across different versions. This document also includes component version compatibility information for AWS Graviton. Read more in the Cloudera Documentation.
09-03-2024 04:45 PM
Objective
Cloudera Data Engineering (CDE) is a cloud-native service provided by Cloudera. It is designed to simplify and enhance the development, deployment, and management of data engineering workloads at scale. CDE is part of the Cloudera Data Platform (CDP), which is a comprehensive, enterprise-grade platform for managing and analyzing data across hybrid and multi-cloud environments.
Cloudera Data Engineering offers several advantages. With CDE, you can create a "CDE Spark-Submit" using the same syntax as your regular Spark-Submit. Alternatively, you can specify your Spark-Submit as a "CDE Job of type Spark" using a reusable Job Definition, which enhances observability, troubleshooting, and dependency management.
These unique capabilities of CDE are especially useful for Spark Data Engineers who develop and deploy Spark Pipelines at scale. This includes working with different Spark-Submit definitions and dynamic, complex dependencies across multiple clusters.
For example, when packaging a JAR for a Spark Submit, you can include various types of dependencies that your Spark application requires to run properly. These can consist of application code (compiled Scala/Java code), third-party libraries (external dependencies), configuration and resource files (for application configuration or runtime data), and custom JARs (any internal or utility libraries your application needs).
In this article, you will learn how to effectively manage JAR dependencies and simplify Cloudera Data Engineering in various scenarios.
Example 1: CDE Job with Scala Application Code in Spark Jar
Scala Spark applications are typically developed and deployed in the following manner:
Set Up Project in IDE: Use SBT to set up a Scala project in your IDE.
Write Code: Write your Scala application.
Compile & Package: Use ```sbt package``` to compile and package your code into a JAR.
Submit to Spark: Use spark-submit to run your JAR on a Spark cluster.
In this example, you will build a CDE Spark Job with a Scala application that has already been compiled into a JAR. To learn how to complete these steps, please visit this tutorial.
cde resource create --name cde_scala_job_files
cde resource upload --name cde_scala_job_files --local-path jars/cdejobjar_2.12-1.0.jar
cde job create \
--name cde-scala-job \
--type spark \
--mount-1-resource cde_scala_job_files \
--application-file cdejobjar_2.12-1.0.jar \
--conf spark.sql.shuffle.partitions=10 \
--executor-cores 2 \
--executor-memory 2g
cde job run --name cde-scala-job
You can add further JAR dependencies with the ```--jar``` or ```--jars``` options. In this case, you can add the Spark XML library from the same CDE Files Resource:
cde resource upload --name cde_scala_job_files --local-path jars/spark-xml_2.12-0.16.0.jar
cde job create \
--name cde-scala-job-jar-dependency \
--type spark \
--mount-1-resource cde_scala_job_files \
--application-file cdejobjar_2.12-1.0.jar \
--conf spark.sql.shuffle.partitions=10 \
--executor-cores 2 \
--executor-memory 2g \
--jar spark-xml_2.12-0.16.0.jar
cde job run --name cde-scala-job-jar-dependency
Notice that you could achieve the same by using two CDE file resources, each containing one of the JARs. You can create as many CDE file resources as needed for each JAR file.
In the following example, you will be referencing the application code JAR located in the "cde_scala_job_files" CDE Files Resource that you previously created, as well as an additional JAR for the Spark-XML package from a new CDE Files Resource that you will create as "cde_spark_xml_jar".
Note the use of the new "--mount-N-prefix" option below. When you are using more than one CDE Resource with the same "CDE Job Create" command, you need to assign an alias to each Files Resource so that each command option can correctly reference the files.
cde resource create --name cde_spark_xml_jar
cde resource upload --name cde_spark_xml_jar --local-path jars/spark-xml_2.12-0.16.0.jar
cde job create \
--name cde-scala-job-multiple-jar-resources \
--type spark \
--mount-1-prefix scala_app_code \
--mount-1-resource cde_scala_job_files \
--mount-2-prefix spark_xml_jar \
--mount-2-resource cde_spark_xml_jar \
--application-file scala_app_code/cdejobjar_2.12-1.0.jar \
--conf spark.sql.shuffle.partitions=10 \
--executor-cores 2 \
--executor-memory 2g \
--jar spark_xml_jar/spark-xml_2.12-0.16.0.jar
cde job run --name cde-scala-job-multiple-jar-resources
Example 2: CDE Job with PySpark Application Code and Jar Dependency from Maven
For Maven dependencies, you can use the `--packages` option to automatically download and include dependencies. This is often more convenient than manually managing JAR files. In the following example, the `--packages` option replaces the `--jars` option.
In this example, you will reference the Spark-XML package from Maven so that you can use it to parse the sample "books.xml" file from the CDE Files Resource.
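For reference, a hypothetical sketch of what a read_xml.py script could look like is shown below; the actual script used in this example may differ, and the rowTag value is an assumption about the structure of books.xml:

from pyspark.sql import SparkSession

# Hypothetical sketch - the Spark-XML package is supplied at job creation time
# via --packages com.databricks:spark-xml_2.12:0.16.0
spark = SparkSession.builder.appName("read-xml-example").getOrCreate()

# books.xml is mounted from the 'spark_files' CDE Files Resource (created below) under /app/mount
books_df = (
    spark.read.format("xml")      # short name registered by the Spark-XML package
    .option("rowTag", "book")     # assumption: each <book> element becomes one row
    .load("/app/mount/books.xml")
)

books_df.printSchema()
books_df.show()

spark.stop()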
cde resource create --name spark_files --type files
cde resource upload --name spark_files --local-path read_xml.py --local-path books.xml
cde job create --name sparkxml \
--application-file read_xml.py \
--mount-1-resource spark_files \
--type spark \
--packages com.databricks:spark-xml_2.12:0.16.0
cde job run --name sparkxml
As in the previous example, multiple CDE Files Resources can be used to manage the PySpark application code and the sample XML file. Notice that the application code in ```read_xml_multi_resource.py``` is different. At line 67, the ```sample_xml_file``` Files Resource is referenced directly in the application code with its alias ```xml_data```.
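For illustration, a hedged sketch of how that line might reference the mounted resource through its alias (assuming a SparkSession named spark has already been created, as in the earlier sketch; the real read_xml_multi_resource.py may differ):

# When 'sample_xml_file' is mounted with --mount-2-prefix xml_data, its files
# appear under /app/mount/xml_data inside the job containers (assumption based
# on the CDE Files Resource mount convention used in these examples)
books_df = (
    spark.read.format("xml")
    .option("rowTag", "book")
    .load("/app/mount/xml_data/books.xml")
)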
cde resource create --name sample_xml_file --type files
cde resource create --name pyspark_script --type files
cde resource upload --name pyspark_script --local-path read_xml_multi_resource.py
cde resource upload --name sample_xml_file --local-path books.xml
cde job create --name sparkxml-multi-deps \
--application-file code/read_xml_multi_resource.py \
--mount-1-prefix code \
--mount-1-resource pyspark_script \
--mount-2-prefix xml_data \
--mount-2-resource sample_xml_file \
--type spark \
--packages com.databricks:spark-xml_2.12:0.16.0
cde job run --name sparkxml-multi-deps
Example 3: CDE Job with PySpark Application Code and Jar Dependency from CDE Files Resource
Similar to example 1, you can reference JARs directly uploaded into CDE Files Resources instead of using Maven as in example 2.
The following commands pick up from example 2 but replace the ```packages``` option with the ```jars``` option.
Notice that the ```--jar``` option is used in the ```cde job run``` command rather than in ```cde job create```. The option can be set either at CDE Job creation or at runtime.
cde resource create --name spark_xml_jar --type files
cde resource upload --name spark_xml_jar --local-path jars/spark-xml_2.12-0.16.0.jar
cde job create --name sparkxml-multi-deps-jar-from-res \
--application-file code/read_xml_multi_resource.py \
--mount-1-prefix code \
--mount-1-resource pyspark_script \
--mount-2-prefix xml_data \
--mount-2-resource sample_xml_file \
--mount-3-prefix deps \
--mount-3-resource spark_xml_jar \
--type spark
cde job run --name sparkxml-multi-deps-jar-from-res \
--jar deps/spark-xml_2.12-0.16.0.jar
Summary
In this article, the CDE CLI was used to simplify Spark JAR management with Cloudera Data Engineering.
You can utilize the CDE CLI to create CDE Job Definitions using Spark JAR dependencies and to create CDE file resources to store and reference one or multiple JARs.
Cloudera Data Engineering offers significant improvements in Spark Dependency Management compared to traditional Spark-Submits outside of CDE.
The Job Runs page in the CDE UI can be used to monitor JAR dependencies applied to each job execution. Cloudera Data Engineering presents substantial advancements in Spark Observability and Troubleshooting compared to traditional Spark-Submits outside of CDE.
References & Useful Articles
CDE Concepts
Using the CDE CLI
CDE CLI Command Reference