12-13-2023
06:02 PM
CDE is the Cloudera Data Engineering Service, a containerized managed service for the Cloudera Data Platform designed for Large Scale Batch Pipelines with Spark, Airflow, and Iceberg. It allows you to submit batch jobs to auto-scaling virtual clusters. As a Cloud-Native service, CDE enables you to spend more time on your applications and less time on infrastructure.
CDE allows you to create, manage, and schedule Apache Spark jobs without the overhead of creating and maintaining Spark clusters. With CDE, you define virtual clusters with a range of CPU and memory resources, and the cluster scales up and down as needed to run your Spark workloads, helping to control your cloud costs.
Cloudera Data Engineering (CDE) provides a command-line interface (CLI) client. The CDE CLI not only lets you create CDE Spark Jobs with syntax that is nearly identical to that of a Spark Submit; it also offers invaluable advantages for your daily Spark pipeline development and management activities, including better observability, reusability, and overall organization of Spark dependencies and configurations.
Simply put, if you're creating and managing many Spark Submits, the CDE CLI will save you time and dramatically increase your productivity.
In this article, you will learn about the different options for parameterizing a CDE Spark Job, i.e. how to pass Spark configurations, options, files, and arguments.
Requirements
The following are required in order to reproduce these commands in your CDE environment:
A CDE Service on version 1.19.0 or above.
A working installation of the CDE CLI. Please follow these instructions to install the CLI.
CDE Spark Jobs and Spark Configurations
A CDE Spark Job is a reusable definition of a Spark Application, consisting of its code; its file, Docker, and Python dependencies; and its Spark configurations and options.
Once a CDE Spark Job is created, its definition is accessible in the CDE UI or via the CDE CLI.
cde job create --name myJob \
--type spark \
--application-file myScript.py \
--mount-1-resource myFilesResource
Note that creating the job does not run it. The Application is actually executed with a separate command:
cde job run --name myJob
Generally, parameterizing a CDE Spark Job allows you to dynamically set important configurations at runtime, i.e. override default parameters based on changing external input.
Before moving on, let us review the different types of parameters that can be set in a CDE Spark Job:
1. Spark Application File
This is the PySpark script or Jar containing the Application code. In CDE you must use the --application-file flag and you can only set this at CDE Spark Job creation time.
Examples:
cde job create --name myPySparkJob \
--type spark \
--application-file myScript.py \
--mount-1-resource myFilesResource

cde job create --name myScalaJob \
--type spark \
--application-file myJar.jar \
--mount-1-resource myFilesResource
The files can only be found by CDE if they are present in a Files resource. As shown above, the Files resource is then referenced with the --mount-N-resource flag. More on that in the end-to-end example section below.
2. Spark Resources
These include basic job resources such as executor and driver memory and cores, the initial number of executors, and the minimum and maximum executors for Spark Dynamic Allocation.
They can be set both at CDE Spark Job creation time and at runtime. Each of these has a default value that is used unless overridden by your CLI command.
Examples:
cde job create --name myPySparkJob \
--type spark \
--application-file myScript.py \
--mount-1-resource myFilesResource \
--executor-cores 4 \
--executor-memory "2g"

cde job run --name myPySparkJob \
--executor-cores 4 \
--driver-cores 2
The above job will run with the following resource configurations:
--executor-cores 4 --executor-memory "2g" and --driver-cores 2
3. Spark Configurations
Spark offers a wide range of configurations ranging from Python version to memory options, join and dynamic allocation thresholds, and much more.
These can be set via the --conf flag, e.g. --conf spark.executor.memoryOverhead=6g, --conf spark.pyspark.python=python3, or --conf spark.yarn.maxAppAttempts=1.
These can also be overridden at runtime.
Examples:
cde job create --name myPySparkJob \
--type spark \
--application-file myScript.py \
--mount-1-resource myFilesResource \
--conf spark.executor.memoryOverhead=6g

cde job run --name myPySparkJob \
--conf spark.executor.memoryOverhead=2g
In the above example, the "memoryOverhead" setting is overridden to "2g".
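As an optional check (not part of the original article), you can verify from within the application which value actually took effect, since runtime --conf flags override the ones set at job creation. A minimal PySpark sketch:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("ConfCheck").getOrCreate()

# Prints the effective value of the explicitly set configuration, e.g. "2g" if the runtime override won
print(spark.conf.get("spark.executor.memoryOverhead"))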
4. Command Line Arguments
These are specific variable inputs that lend themselves particularly well to being overridden at runtime.
They are defined with the --arg flag in the CDE Spark Job definition and are read in the Spark Application code via Python's sys.argv list.
For example, a CDE Spark Job can include --arg 10 in the CLI command so that the value "10" can be used within the Spark Application code.
Example CDE CLI command to create the CDE Spark Job:
cde job create --name myPySparkJob \
--type spark \
--application-file myScript.py \
--mount-1-resource myFilesResource \
--conf spark.executor.memoryOverhead=6g \
--arg 10
Example CDE Spark Job application code referencing the argument:
import sys
from pyspark.sql import SparkSession
spark = SparkSession\
.builder\
.appName("CLIArguments")\
.getOrCreate()
print("JOB ARGUMENT")
print(sys.argv[1])
You can override CLI arguments at runtime. The following example sets the input to 5 rather than 10:
cde job run --name myPySparkJob --arg 5
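For illustration only (this script is not part of the original example), the argument could drive application logic rather than just being printed, for instance as a row limit:
import sys
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("CLIArguments").getOrCreate()

# sys.argv[1] holds the value passed via --arg; fall back to 10 if no argument was given
row_limit = int(sys.argv[1]) if len(sys.argv) > 1 else 10

# Hypothetical use of the argument: limit a generated DataFrame
df = spark.range(1000).limit(row_limit)
print("Row count:", df.count())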
5. Files
These can be set via the --file or --py-file flags and allow you to specify file dependencies at CDE Spark Job creation.
More importantly, the functionality provided by these flags is enhanced by CDE Files Resources, and using those instead is generally recommended.
CDE Files Resources are unique to CDE and allow you to pass and manage entire files as CDE Spark Job dependencies.
You can reference these files within Spark Application code via the /app/mount prefix.
Example:
cde job create --name penv-udf-test --type spark \
--mount-1-prefix appFiles/ \
--mount-1-resource cde-cli-files-resource \
--python-env-resource-name cde-cli-penv-resource \
--application-file /app/mount/appFiles/deps_udf_dependency.py \
--conf spark.executorEnv.PYTHONPATH=/app/mount/appFiles/deps_udf.zip \
--py-file /app/mount/appFiles/deps_udf.zip
Here, the --py-file flag is used to distribute dependencies that have been compressed and then loaded into a CDE Files Resource.
Notice that, unlike with a Spark Submit, you cannot use --file to specify the application code; you must use the --application-file flag as shown in example 1.
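For illustration only, assuming deps_udf.zip contains a module named udfs.py exposing a function clean_text (both names are hypothetical, not taken from the example above), a dependency shipped via --py-file and spark.executorEnv.PYTHONPATH could be used like this:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# udfs is a hypothetical module packaged inside deps_udf.zip;
# --py-file makes the zip available to the driver and executors
from udfs import clean_text

spark = SparkSession.builder.appName("UDFDeps").getOrCreate()

clean_text_udf = udf(clean_text, StringType())
df = spark.createDataFrame([("  Hello CDE  ",)], ["raw"])
df.select(clean_text_udf("raw").alias("cleaned")).show()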
CDE Files Resources are mounted on the Spark Application pod at CDE Job runtime. Thanks to this, you can also reference a file from within your Spark Application code by using the ConfigParser module and referencing the "/app/mount" directory.
Example properties file "parameters.conf" that has been loaded in the CDE Files Resource referenced by the CDE Spark Job:
[general]
property_1: s3a://go01-demo/
property_2: datalake/
Example PySpark application code referencing the "parameters.conf" file:
from pyspark.sql import SparkSession
import configparser
config = configparser.ConfigParser()
config.read('/app/mount/parameters.conf')
data_lake_name=config.get("general","property_1")
file_path=config.get("general","property_2")
spark = SparkSession.\
builder.\
appName('INGEST').\
config("spark.kubernetes.access.hadoopFileSystems", data_lake_name).getOrCreate()
cloudPath=data_lake_name+file_path
myDf = spark.read.csv(cloudPath + "/myCsv.csv", header=True, inferSchema=True)
myDf.show()
End to End CDE Spark Job Workflow Example
Now that you have been exposed to the different options, we will use them in a simplified CDE Spark Job creation workflow that shows how to put the above into practice:
Create a CDE Files Resource in order to host all Application code and file dependencies.
Upload Application code and file dependencies to the CDE Files Resource in order to make those accessible to CDE Spark Jobs in the future.
Create the CDE Spark Job by referencing the Application code, file dependencies, and other Spark configurations.
Run the CDE Spark Job, either with no additional configurations or by overriding configurations, in order to execute your Spark Application.
1. Create a CDE Files Resource
cde resource create --name myProperties
2. Upload Application Code and File Dependencies
cde resource upload --name myProperties \
--local-path cde_jobs/propertiesFile_1.conf \
--local-path cde_jobs/propertiesFile_2.conf \
--local-path cde_jobs/sparkJob.py
3. Create CDE Spark Job
cde job create --name myPySparkJob \
--type spark \
--application-file sparkJob.py \
--mount-1-resource myProperties \
--executor-cores 2 \
--executor-memory "2g"
4. Run the CDE Spark Job with Different Options
Example 1: Run the job with two CLI arguments and read properties file 1
cde job run --name myPySparkJob \
--arg MY_DB \
--arg CUSTOMER_TABLE \
--arg propertiesFile_1.conf
Example 2: Run the job with two CLI arguments and read properties file 2
cde job run --name myPySparkJob \
--arg MY_DB \
--arg SALES_TABLE \
--arg propertiesFile_2.conf
Application code in sparkJob.py:
import sys
from pyspark.sql import SparkSession
import configparser
spark = SparkSession\
.builder\
.appName("PythonSQL")\
.getOrCreate()
print("JOB ARGUMENTS")
print(sys.argv)
print(sys.argv[0])
print(sys.argv[1])
print(sys.argv[2])
print(sys.argv[3])
dbname = sys.argv[1]
tablename = sys.argv[2]
config = configparser.ConfigParser()
config.read('/app/mount/{}'.format(sys.argv[3]))
property_1=config.get("general","property_1")
property_2=config.get("general","property_2")
def myFunction(dbname, tablename, property_1, property_2):
print("DBNAME\n")
print(dbname)
print("TABLENAME\n")
print(tablename)
print("PROPERTY1\n")
print(property_1)
print("PROPERTY2\n")
print(property_2)
print("COMPLETE!\n")
# Print the job arguments and properties
myFunction(dbname, tablename, property_1, property_2)
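For completeness, here is a minimal sketch (not part of the original sparkJob.py) of how the parsed arguments and properties might be used to actually read data, following the same pattern as the parameters.conf example earlier in this article; the CSV file name is illustrative only:
import sys
import configparser
from pyspark.sql import SparkSession

# Read the properties file passed as the third CLI argument
config = configparser.ConfigParser()
config.read('/app/mount/{}'.format(sys.argv[3]))
data_lake_name = config.get("general", "property_1")
file_path = config.get("general", "property_2")

spark = SparkSession.builder \
    .appName("PythonSQL") \
    .config("spark.kubernetes.access.hadoopFileSystems", data_lake_name) \
    .getOrCreate()

# Build the cloud storage path from the two properties and load a CSV (file name is hypothetical)
cloud_path = data_lake_name + file_path
my_df = spark.read.csv(cloud_path + "myCsv.csv", header=True, inferSchema=True)
my_df.createOrReplaceTempView(sys.argv[2])
spark.sql("SELECT COUNT(*) FROM {}".format(sys.argv[2])).show()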
Summary and Next Steps
Cloudera Data Engineering (CDE) provides a command line interface (CLI) client. You can use the CLI to create and update jobs, view job details, manage job resources, run jobs, and more.
In this article we have reviewed some advanced use cases for the CLI. If you are using the CDE CLI you might also find the following articles and demos interesting:
Installing the CDE CLI
Simple Introduction to the CDE CLI
CDE CLI Demo
CDE Concepts
CDE CLI Command Reference
CDE CLI Spark Flag Reference
CDE CLI Airflow Flag Reference
CDE CLI list command syntax reference
CDE Jobs API Reference
11-29-2023
04:52 PM
Cloudera Data Engineering
CDE is the Cloudera Data Engineering Service, a containerized managed service for Cloudera Data Platform designed for Large Scale Batch Pipelines with Spark, Airflow and Iceberg. It allows you to submit batch jobs to auto-scaling virtual clusters. As a Cloud-Native service, CDE enables you to spend more time on your applications, and less time on infrastructure.
CDE allows you to create, manage, and schedule Apache Spark jobs without the overhead of creating and maintaining Spark clusters. With CDE, you define virtual clusters with a range of CPU and memory resources, and the cluster scales up and down as needed to run your Spark workloads, helping to control your cloud costs.
Cloudera Data Engineering (CDE) provides a command line interface (CLI) client. You can use the CLI to create and update jobs, view job details, manage job resources, run jobs, and more.
Requirements
The following are required in order to reproduce these commands in your CDE environment:
A CDE Service on version 1.19.0 or above.
A working installation of the CDE CLI. Please follow these instructions to install the CLI.
Steps
Clone this git repository and run the following commands in the terminal with the CLI.
You can easily list all jobs and job runs:
% cde job list
% cde run list
However, that is often impractical if you have a large number of jobs/runs in your Virtual Cluster. Therefore, using filters can be very important.
Setup
Prior to running the filtering commands, you must set up some jobs and related dependencies. Run the following commands in bulk. To learn more about these, please visit this Cloudera Community Article.
% cde resource create --name myScripts \
--type files
% cde resource upload --name myScripts \
--local-path cde_jobs/spark_geospatial.py \
--local-path cde_jobs/utils.py
% cde resource describe --name myScripts
% cde resource create --name myData \
--type files
% cde resource upload-archive --name myData \
--local-path data/ne_50m_admin_0_countries_lakes.zip
cde credential create --name my-docker-creds \
--type docker-basic \
--docker-server hub.docker.com \
--docker-username pauldefusco
cde resource create --name dex-spark-runtime-sedona-geospatial \
--image pauldefusco/dex-spark-runtime-3.2.3-7.2.15.8:1.20.0-b15-sedona-geospatial-003 \
--image-engine spark3 \
--type custom-runtime-image
cde job create --name geospatialRdd \
--type spark \
--mount-1-prefix code/ --mount-1-resource myScripts \
--mount-2-prefix data/ --mount-2-resource myData \
--runtime-image-resource-name dex-spark-runtime-sedona-geospatial \
--packages org.apache.sedona:sedona-spark-shaded-3.0_2.12:1.5.0,org.datasyslab:geotools-wrapper:1.5.0-28.2 \
--application-file code/spark_geospatial.py \
--arg myArg \
--max-executors 4 \
--min-executors 2 \
--executor-cores 2
cde job run --name geospatialRdd --executor-cores 4
Monitoring Examples
Now for the monitoring examples. Filter all jobs by name, where name equals "geospatialRdd":
% cde job list --filter 'name[eq]geospatialRdd'
[
{
"name": "geospatialRdd",
"type": "spark",
"created": "2023-11-29T00:59:11Z",
"modified": "2023-11-29T00:59:11Z",
"retentionPolicy": "keep_indefinitely",
"mounts": [
{
"dirPrefix": "data/",
"resourceName": "myData"
},
{
"dirPrefix": "code/",
"resourceName": "myScripts"
}
],
"spark": {
"file": "code/spark_geospatial.py",
"args": [
"myArg"
],
"driverMemory": "1g",
"driverCores": 1,
"executorMemory": "1g",
"executorCores": 2,
"conf": {
"spark.dynamicAllocation.maxExecutors": "4",
"spark.dynamicAllocation.minExecutors": "2",
"spark.jars.packages": "org.apache.sedona:sedona-spark-shaded-3.0_2.12:1.5.0,org.datasyslab:geotools-wrapper:1.5.0-28.2"
}
},
"schedule": {
"enabled": false,
"user": "pauldefusco"
},
"runtimeImageResourceName": "dex-spark-runtime-sedona-geospatial"
}
]
You can also filter on nested fields. For example, filter all jobs where the job application file equals "code/spark_geospatial.py":
% cde job list --filter 'spark.file[eq]code/spark_geospatial.py'
[
{
"name": "geospatialRdd",
"type": "spark",
"created": "2023-11-29T00:59:11Z",
"modified": "2023-11-29T00:59:11Z",
"retentionPolicy": "keep_indefinitely",
"mounts": [
{
"dirPrefix": "data/",
"resourceName": "myData"
},
{
"dirPrefix": "code/",
"resourceName": "myScripts"
}
],
"spark": {
"file": "code/spark_geospatial.py",
"args": [
"myArg"
],
"driverMemory": "1g",
"driverCores": 1,
"executorMemory": "1g",
"executorCores": 2,
"conf": {
"spark.dynamicAllocation.maxExecutors": "4",
"spark.dynamicAllocation.minExecutors": "2",
"spark.jars.packages": "org.apache.sedona:sedona-spark-shaded-3.0_2.12:1.5.0,org.datasyslab:geotools-wrapper:1.5.0-28.2"
}
},
"schedule": {
"enabled": false,
"user": "pauldefusco"
},
"runtimeImageResourceName": "dex-spark-runtime-sedona-geospatial"
}
]
You can use different operators. For example, search all jobs whose name contains "spark":
% cde job list --filter 'name[rlike]spark'
[
{
"name": "sparkxml",
"type": "spark",
"created": "2023-11-23T07:11:41Z",
"modified": "2023-11-23T07:39:32Z",
"retentionPolicy": "keep_indefinitely",
"mounts": [
{
"dirPrefix": "/",
"resourceName": "files"
}
],
"spark": {
"file": "read_xml.py",
"driverMemory": "1g",
"driverCores": 1,
"executorMemory": "1g",
"executorCores": 1,
"conf": {
"dex.safariEnabled": "false",
"spark.jars.packages": "com.databricks:spark-xml_2.12:0.16.0",
"spark.pyspark.python": "python3"
},
"logLevel": "INFO"
},
"schedule": {
"enabled": false,
"user": "pauldefusco"
}
}
]
Search all jobs created on or after 11/23/23:
% cde job list --filter 'created[gte]2023-11-23'
API User Password:
[
{
"name": "asdfsdfsdfsdf",
"type": "airflow",
"created": "2023-11-23T06:56:45Z",
"modified": "2023-11-23T06:56:45Z",
"retentionPolicy": "keep_indefinitely",
"mounts": [
{
"resourceName": "PipelineResource-asdfsdfsdfsdf-1700722602468"
}
],
"airflow": {
"dagID": "asdfsdfsdfsdf",
"dagFile": "dag.py"
},
"schedule": {
"enabled": false,
"user": "dschoberle",
"start": "Thu, 23 Nov 2023 06:56:44 GMT",
"catchup": true
}
},
{
"name": "geospatialRdd",
"type": "spark",
"created": "2023-11-29T00:59:11Z",
"modified": "2023-11-29T00:59:11Z",
"retentionPolicy": "keep_indefinitely",
"mounts": [
{
"dirPrefix": "data/",
"resourceName": "myData"
},
{
"dirPrefix": "code/",
"resourceName": "myScripts"
}
],
"spark": {
"file": "code/spark_geospatial.py",
"args": [
"myArg"
],
"driverMemory": "1g",
"driverCores": 1,
"executorMemory": "1g",
"executorCores": 2,
"conf": {
"spark.dynamicAllocation.maxExecutors": "4",
"spark.dynamicAllocation.minExecutors": "2",
"spark.jars.packages": "org.apache.sedona:sedona-spark-shaded-3.0_2.12:1.5.0,org.datasyslab:geotools-wrapper:1.5.0-28.2"
}
},
"schedule": {
"enabled": false,
"user": "pauldefusco"
},
"runtimeImageResourceName": "dex-spark-runtime-sedona-geospatial"
}
]
Search all jobs with executorCores less than 2:
% cde job list --filter 'spark.executorCores[lt]2'
API User Password:
[
{
"name": "CDEPY_SPARK_JOB",
"type": "spark",
"created": "2023-11-14T23:02:48Z",
"modified": "2023-11-14T23:02:48Z",
"retentionPolicy": "keep_indefinitely",
"mounts": [
{
"resourceName": "CDEPY_DEMO"
}
],
"spark": {
"file": "pysparksql.py",
"driverMemory": "1g",
"driverCores": 1,
"executorMemory": "4g",
"executorCores": 1,
"conf": {
"spark.pyspark.python": "python3"
},
"logLevel": "INFO"
},
"schedule": {
"enabled": false,
"user": "pauldefusco"
}
},
{
"name": "CDEPY_SPARK_JOB_APAC",
"type": "spark",
"created": "2023-11-15T03:33:36Z",
"modified": "2023-11-15T03:33:36Z",
"retentionPolicy": "keep_indefinitely",
"mounts": [
{
"resourceName": "CDEPY_DEMO_APAC"
}
],
"spark": {
"file": "pysparksql.py",
"driverMemory": "1g",
"driverCores": 1,
"executorMemory": "4g",
"executorCores": 1,
"conf": {
"spark.pyspark.python": "python3"
},
"logLevel": "INFO"
},
"schedule": {
"enabled": false,
"user": "pauldefusco"
}
},
]
List all runs for job "geospatialRdd":
% cde run list --filter 'job[eq]geospatialRdd'
[
{
"id": 21815,
"job": "geospatialRdd",
"type": "spark",
"status": "failed",
"user": "pauldefusco",
"started": "2023-11-29T00:32:02Z",
"ended": "2023-11-29T00:32:36Z",
"mounts": [
{
"dirPrefix": "data/",
"resourceName": "myData"
},
{
"dirPrefix": "code/",
"resourceName": "myScripts"
}
],
"runtimeImageResourceName": "dex-spark-runtime-sedona-geospatial",
"spark": {
"sparkAppID": "spark-f542530da24f485da4993338dca81d3c",
"sparkAppURL": "https://58kqsms2.cde-g6hpr9f8.go01-dem.ylcu-atmi.cloudera.site/hs/history/spark-f542530da24f485da4993338dca81d3c/jobs/",
"spec": {
"file": "code/geospatial_rdd.py",
"args": [
"myArg"
],
"driverMemory": "1g",
"driverCores": 1,
"executorMemory": "1g",
"executorCores": 4,
"conf": {
"spark.dynamicAllocation.maxExecutors": "4",
"spark.dynamicAllocation.minExecutors": "2",
"spark.jars.packages": "org.apache.sedona:sedona-spark-shaded-3.0_2.12:1.5.0,org.datasyslab:geotools-wrapper:1.5.0-28.2"
}
}
},
"identity": {
"disableRoleProxy": true,
"role": "instance"
}
},
{
"id": 21825,
"job": "geospatialRdd",
"type": "spark",
"status": "failed",
"user": "pauldefusco",
"started": "2023-11-29T00:48:29Z",
"ended": "2023-11-29T00:49:01Z",
"mounts": [
{
"dirPrefix": "data/",
"resourceName": "myData"
},
{
"dirPrefix": "code/",
"resourceName": "myScripts"
}
],
"runtimeImageResourceName": "dex-spark-runtime-sedona-geospatial",
"spark": {
"sparkAppID": "spark-e5460856fb3a459ba7ee2c748c802d07",
"sparkAppURL": "https://58kqsms2.cde-g6hpr9f8.go01-dem.ylcu-atmi.cloudera.site/hs/history/spark-e5460856fb3a459ba7ee2c748c802d07/jobs/",
"spec": {
"file": "myScripts/geospatial_rdd.py",
"args": [
"myArg"
],
"driverMemory": "1g",
"driverCores": 1,
"executorMemory": "1g",
"executorCores": 2,
"conf": {
"spark.dynamicAllocation.maxExecutors": "4",
"spark.dynamicAllocation.minExecutors": "2",
"spark.jars.packages": "org.apache.sedona:sedona-spark-shaded-3.0_2.12:1.5.0,org.datasyslab:geotools-wrapper:1.5.0-28.2"
}
}
},
"identity": {
"disableRoleProxy": true,
"role": "instance"
}
}
]
You can combine multiple filters. Return all job runs from today (11/29/23), i.e. where the start date is greater than or equal to 11/29 and the end date is less than or equal to 11/30. Notice that all times default to the +00 UTC timezone.
% cde run list --filter 'started[gte]2023-11-29' --filter 'ended[lte]2023-11-30'
[
{
"id": 21907,
"job": "ge_data_quality-pauldefusco-banking",
"type": "spark",
"status": "succeeded",
"user": "pauldefusco",
"started": "2023-11-29T02:56:44Z",
"ended": "2023-11-29T02:57:46Z",
"mounts": [
{
"dirPrefix": "/",
"resourceName": "cde_demo_files-pauldefusco-banking"
}
],
"runtimeImageResourceName": "dex-spark-runtime-ge-data-quality-pauldefusco-banking",
"spark": {
"sparkAppID": "spark-8f9d7999056f4b53a01cc2afc5304cca",
"sparkAppURL": "https://58kqsms2.cde-g6hpr9f8.go01-dem.ylcu-atmi.cloudera.site/hs/history/spark-8f9d7999056f4b53a01cc2afc5304cca/jobs/",
"spec": {
"file": "ge_data_quality.py",
"driverMemory": "1g",
"driverCores": 1,
"executorMemory": "1g",
"executorCores": 1
}
},
"identity": {
"disableRoleProxy": true,
"role": "instance"
}
},
{
"id": 21908,
"job": "data_quality_orchestration-pauldefusco-banking",
"type": "airflow",
"status": "running",
"user": "pauldefusco",
"started": "2023-11-29T03:00:01Z",
"ended": "0001-01-01T00:00:00Z",
"airflow": {
"dagID": "CDE_Demo_pauldefusco-banking",
"dagRunID": "scheduled__2023-11-29T02:55:00+00:00",
"dagFile": "airflow.py",
"executionDate": "2023-11-29T02:55:00Z"
}
},
{
"id": 21909,
"job": "batch_load-pauldefusco-banking",
"type": "spark",
"status": "running",
"user": "pauldefusco",
"started": "2023-11-29T03:00:14Z",
"ended": "0001-01-01T00:00:00Z",
"mounts": [
{
"dirPrefix": "jobCode/",
"resourceName": "cde_demo_files-pauldefusco-banking"
}
],
"runtimeImageResourceName": "dex-spark-runtime-ge-data-quality-pauldefusco-banking",
"spark": {
"sparkAppID": "spark-3d8a4704418841929d325af0e0190a20",
"sparkAppURL": "https://58kqsms2.cde-g6hpr9f8.go01-dem.ylcu-atmi.cloudera.site/livy-batch-14907-dyL7LLeM",
"spec": {
"file": "jobCode/batch_load.py",
"driverMemory": "1g",
"driverCores": 1,
"executorMemory": "1g",
"executorCores": 1
}
},
"identity": {
"disableRoleProxy": true,
"role": "instance"
}
}
]
List all successful Airflow jobs created by user pauldefusco that started after 3 am UTC on 11/29/23:
% cde run list --filter 'type[eq]airflow' --filter 'status[eq]succeeded' --filter 'user[eq]pauldefusco' --filter 'started[gte]2023-11-29T03'
[
{
"id": 21908,
"job": "data_quality_orchestration-pauldefusco-banking",
"type": "airflow",
"status": "succeeded",
"user": "pauldefusco",
"started": "2023-11-29T03:00:01Z",
"ended": "2023-11-29T03:03:01Z",
"airflow": {
"dagID": "CDE_Demo_pauldefusco-banking",
"dagRunID": "scheduled__2023-11-29T02:55:00+00:00",
"dagFile": "airflow.py",
"executionDate": "2023-11-29T02:55:00Z"
}
}
]
Listing all CDE Resources returns resources of all types ("python-env", "files", "custom-runtime-image"):
% cde resource list
[
{
"name": "BankingPyEnv",
"type": "python-env",
"status": "pending-build",
"created": "2023-11-07T21:27:16Z",
"modified": "2023-11-07T21:27:16Z",
"retentionPolicy": "keep_indefinitely",
"pythonEnv": {
"pythonVersion": "python3",
"type": "python-env"
}
},
{
"name": "CDEPY_DEMO_APAC",
"type": "files",
"status": "ready",
"signature": "5d216f3c4a10578ffadba415b13022d9e383bc22",
"created": "2023-11-15T03:33:36Z",
"modified": "2023-11-15T03:33:36Z",
"retentionPolicy": "keep_indefinitely"
},
{
"name": "dex-spark-runtime-sedona-geospatial",
"type": "custom-runtime-image",
"status": "ready",
"created": "2023-11-28T23:51:11Z",
"modified": "2023-11-28T23:51:11Z",
"retentionPolicy": "keep_indefinitely",
"customRuntimeImage": {
"engine": "spark3",
"image": "pauldefusco/dex-spark-runtime-3.2.3-7.2.15.8:1.20.0-b15-sedona-geospatial-003"
}
}
]
List all CDE Resources named "myScripts":
% cde resource list --filter 'name[eq]myScripts'
[
{
"name": "myScripts",
"type": "files",
"status": "ready",
"signature": "17f820aacdad9bbd17a24d78a5b93cd0ec9e467b",
"created": "2023-11-28T23:31:31Z",
"modified": "2023-11-29T01:48:12Z",
"retentionPolicy": "keep_indefinitely"
}
]
List all CDE Resources of type Python Environment:
% cde resource list --filter 'type[eq]python-env'
[
{
"name": "BankingPyEnv",
"type": "python-env",
"status": "pending-build",
"created": "2023-11-07T21:27:16Z",
"modified": "2023-11-07T21:27:16Z",
"retentionPolicy": "keep_indefinitely",
"pythonEnv": {
"pythonVersion": "python3",
"type": "python-env"
}
}
]
Summary and Next Steps
Cloudera Data Engineering (CDE) provides a command line interface (CLI) client. You can use the CLI to create and update jobs, view job details, manage job resources, run jobs, and so on.
In this article we have reviewed some advanced use cases for the CLI. If you are using the CDE CLI, you might also find the following articles and demos interesting:
Installing the CDE CLI
Simple Introduction to the CDE CLI
CDE CLI Demo
CDE Concepts
CDE CLI Command Reference
CDE CLI Spark Flag Reference
CDE CLI Airflow Flag Reference
CDE CLI list command syntax reference
CDE Jobs API Reference
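Since every list command returns JSON, you can also post-process the output programmatically. Below is a minimal Python sketch, assuming the cde CLI is installed and able to authenticate non-interactively (for example via a credentials file); that authentication setup is an assumption and is not covered in this article:
import json
import subprocess
from collections import Counter

# Run a filtered job listing and capture its JSON output
result = subprocess.run(
    ["cde", "job", "list", "--filter", "type[eq]spark"],
    capture_output=True, text=True, check=True
)
jobs = json.loads(result.stdout)

# Count Spark jobs by their executor core setting (field names follow the JSON shown above)
cores = Counter(job["spark"]["executorCores"] for job in jobs)
for num_cores, count in sorted(cores.items()):
    print("{} job(s) configured with {} executor core(s)".format(count, num_cores))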
11-29-2023
04:42 PM
Cloudera Data Engineering
CDE is the Cloudera Data Engineering Service, a containerized managed service for Cloudera Data Platform designed for Large Scale Batch Pipelines with Spark, Airflow and Iceberg. It allows you to submit batch jobs to auto-scaling virtual clusters. As a Cloud-Native service, CDE enables you to spend more time on your applications, and less time on infrastructure.
CDE allows you to create, manage, and schedule Apache Spark jobs without the overhead of creating and maintaining Spark clusters. With CDE, you define virtual clusters with a range of CPU and memory resources, and the cluster scales up and down as needed to run your Spark workloads, helping to control your cloud costs.
Cloudera Data Engineering (CDE) provides a command line interface (CLI) client. You can use the CLI to create and update jobs, view job details, manage job resources, run jobs, and more.
Apache Iceberg
Apache Iceberg is a cloud-native, high-performance open table format for organizing petabyte-scale analytic datasets on a file system or object store. Combined with Cloudera Data Platform (CDP), users can build an open data lakehouse architecture for multi-function analytics and deploy large-scale end-to-end pipelines. The open data lakehouse on CDP simplifies advanced analytics on all data with a unified platform for structured and unstructured data and integrated data services, enabling any analytics use case from ML and BI to stream analytics and real-time analytics. Apache Iceberg is the secret sauce of the open lakehouse.
CDE Sessions
A Cloudera Data Engineering (CDE) Session is an interactive, short-lived development environment for running Spark commands to help you iterate upon and build your Spark workloads. You can use CDE Sessions in CDE Virtual Clusters of type "All Purpose - Tier 2". The following commands illustrate a basic Iceberg Time Travel example.
Requirements
The following are required in order to reproduce these commands in your CDE environment:
A CDE Service on version 1.19.0 or above.
A working installation of the CDE CLI. Please follow these instructions to install the CLI.
Steps
Clone this git repository and run the following commands in your terminal.
Create the Session:
% cde session create --name interactiveSession \
--type pyspark \
--executor-cores 2 \
--executor-memory "2g"
{
"name": "interactiveSession",
"creator": "pauldefusco",
"created": "2023-11-28T22:00:47Z",
"type": "pyspark",
"lastStateUpdated": "2023-11-28T22:00:47Z",
"state": "starting",
"interactiveSpark": {
"id": 5,
"driverCores": 1,
"executorCores": 2,
"driverMemory": "1g",
"executorMemory": "2g",
"numExecutors": 1
}
}
Show session metadata:
% cde session describe --name interactiveSession
{
"name": "interactiveSession",
"creator": "pauldefusco",
"created": "2023-11-28T22:00:47Z",
"type": "pyspark",
"lastStateUpdated": "2023-11-28T22:01:16Z",
"state": "available",
"interactiveSpark": {
"id": 5,
"appId": "spark-3fe3bd8905a04eef8805e6b973ec4289",
"driverCores": 1,
"executorCores": 2,
"driverMemory": "1g",
"executorMemory": "2g",
"numExecutors": 1
}
}
Interact via the PySpark Shell from your terminal (the session is running in CDE):
% cde session interact --name interactiveSession
Starting REPL...
Waiting for the session to go into an available state...
Connected to Cloudera Data Engineering...
Press Ctrl+D (i.e. EOF) to exit
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\
/_/
Type in expressions to have them evaluated.
>>>
Run some basic Spark SQL operations:
from pyspark.sql.types import Row, StructField, StructType, StringType, IntegerType
rows = [Row(name="John", age=19), Row(name="Smith", age=23), Row(name="Sarah", age=18)]
some_df = spark.createDataFrame(rows)
some_df.printSchema()
>>> from pyspark.sql.types import Row, StructField, StructType, StringType, IntegerType
>>> rows = [Row(name="John", age=19), Row(name="Smith", age=23), Row(name="Sarah", age=18)]
>>> some_df = spark.createDataFrame(rows)
>>> some_df.printSchema()
root
|-- name: string (nullable = true)
|-- age: long (nullable = true)
>>>
Notice that we didn't need to create a Spark Session; the Spark Context is already running inside the CDE Session. Here are all the confs the session is running with (confs below is the list returned by spark.sparkContext.getConf().getAll()). Notice that the Iceberg dependencies have already been accounted for:
>>> def printConfs(confs):
for ele1,ele2 in confs:
print("{:<14}{:<11}".format(ele1,ele2))
>>> printConfs(confs)
spark.eventLog.enabledtrue
spark.driver.hostinteractivesession-b7d65d8c1d6005a9-driver-svc.dex-app-58kqsms2.svc
spark.kubernetes.executor.annotation.created-bylivy
spark.kubernetes.memoryOverheadFactor0.1
spark.sql.catalog.spark_catalogorg.apache.iceberg.spark.SparkSessionCatalog
spark.kubernetes.container.imagecontainer.repository.cloudera.com/cloudera/dex/dex-livy-runtime-3.3.0-7.2.16.3:1.19.3-b29
spark.kubernetes.executor.label.nameexecutor
spark.kubernetes.driver.connectionTimeout60000
spark.hadoop.yarn.resourcemanager.principalpauldefusco
...
spark.yarn.isPythontrue
spark.kubernetes.submission.connectionTimeout60000
spark.kryo.registrationRequiredfalse
spark.sql.catalog.spark_catalog.typehive
spark.kubernetes.driver.pod.nameinteractivesession-b7d65d8c1d6005a9-driver
Load a CSV file from Cloud Storage:
>>> cloudPath = "s3a://go01-demo/datalake/pdefusco/cde119_workshop"
>>> car_installs = spark.read.csv(cloudPath + "/car_installs_119.csv", header=True, inferSchema=True)
>>> car_installs.show()
+-----+-----+----------------+--------------------+
| id|model| VIN| serial_no|
+-----+-----+----------------+--------------------+
|16413| D|433248UCGTTV245J|5600942CL3R015666...|
|16414| D|404328UCGTTV965J|204542CL4R0156661...|
|16415| B|647168UCGTTV8Z5J|6302942CL2R015666...|
|16416| B|454608UCGTTV7H5J|4853942CL1R015666...|
|16417| D|529408UCGTTV6R5J|2428342CL9R015666...|
|16418| B|362858UCGTTV7A5J|903142CL2R0156661...|
|16419| E|609158UCGTTV245J|3804142CL7R015666...|
|16420| D| 8478UCGTTV825J|6135442CL7R015666...|
|16421| B|539488UCGTTV4R5J|306642CL6R0156661...|
|16422| B|190928UCGTTV6A5J|5466242CL1R015666...|
|16423| B|316268UCGTTV4M5J|4244342CL5R015666...|
|16424| B|298898UCGTTV3Y5J|3865742CL4R015666...|
|16425| B| 28688UCGTTV9T5J|6328542CL5R015666...|
|16426| D|494858UCGTTV295J|463642CL5R0156661...|
|16427| D|503338UCGTTV5Y5J|4358642CL2R015666...|
|16428| D|167128UCGTTV2H5J|3809342CL1R015666...|
|16429| D|547178UCGTTV7M5J|2768042CL3R015666...|
|16430| B|503998UCGTTV4Q5J|2568142CL6R015666...|
|16431| D|433998UCGTTV9Y5J|6338642CL6R015666...|
|16432| B|378548UCGTTV7V5J|2648942CL1R015666...|
+-----+-----+----------------+--------------------+
Create a Hive Managed Table with Spark:
>>> username = "pauldefusco"
>>> spark.sql("DROP DATABASE IF EXISTS MYDB_{} CASCADE".format(username))
>>> spark.sql("CREATE DATABASE IF NOT EXISTS MYDB_{}".format(username))
>>> car_installs.write.mode("overwrite").saveAsTable('MYDB_{0}.CAR_INSTALLS_{0}'.format(username), format="parquet")
Migrate the table to Iceberg Table Format:
spark.sql("ALTER TABLE MYDB_{0}.CAR_INSTALLS_{0} UNSET TBLPROPERTIES ('TRANSLATED_TO_EXTERNAL')".format(username))
spark.sql("CALL spark_catalog.system.migrate('MYDB_{0}.CAR_INSTALLS_{0}')".format(username))
You can query Iceberg Metadata tables to track Iceberg Snapshots, History, Partitions, etc.:
>>> spark.read.format("iceberg").load("spark_catalog.MYDB_{0}.CAR_INSTALLS_{0}.history".format(username)).show(20, False)
+-----------------------+-------------------+---------+-------------------+
|made_current_at |snapshot_id |parent_id|is_current_ancestor|
+-----------------------+-------------------+---------+-------------------+
|2023-11-29 23:58:43.427|6191572403226489858|null |true |
+-----------------------+-------------------+---------+-------------------+
>>> spark.read.format("iceberg").load("spark_catalog.MYDB_{0}.CAR_INSTALLS_{0}.snapshots".format(username)).show(20, False)
+-----------------------+-------------------+---------+---------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|committed_at |snapshot_id |parent_id|operation|manifest_list |summary |
+-----------------------+-------------------+---------+---------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|2023-11-29 23:58:43.427|6191572403226489858|null |append |s3a://go01-demo/warehouse/tablespace/external/hive/mydb_pauldefusco.db/car_installs_pauldefusco/metadata/snap-6191572403226489858-1-bf191e06-38cd-4d6e-9757-b8762c999177.avro|{added-data-files -> 2, added-records -> 82066, added-files-size -> 1825400, changed-partition-count -> 1, total-records -> 82066, total-files-size -> 1825400, total-data-files -> 2, total-delete-files -> 0, total-position-deletes -> 0, total-equality-deletes -> 0}|
+-----------------------+-------------------+---------+---------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
Insert some data. Notice that Iceberg provides a PySpark API to create, append, and overwrite data in an Iceberg table from a Spark DataFrame. In this case, we will append some data sampled from the same table:
# PRE-INSERT TIMESTAMP
>>> from datetime import datetime
>>> now = datetime.now()
>>> timestamp = datetime.timestamp(now)
>>> print("PRE-INSERT TIMESTAMP: ", timestamp)
PRE-INSERT TIMESTAMP: 1701302029.338524
# PRE-INSERT COUNT
>>> spark.sql("SELECT COUNT(*) FROM spark_catalog.MYDB_{0}.CAR_INSTALLS_{0}".format(username)).show()
+--------+
|count(1)|
+--------+
| 82066|
+--------+
>>> temp_df = spark.sql("SELECT * FROM spark_catalog.MYDB_{0}.CAR_INSTALLS_{0}".format(username)).sample(fraction=0.1, seed=3)
>>> temp_df.writeTo("spark_catalog.MYDB_{0}.CAR_INSTALLS_{0}".format(username)).append()
Check the new count post-insert:
# POST-INSERT COUNT
>>> spark.sql("SELECT COUNT(*) FROM spark_catalog.MYDB_{0}.CAR_INSTALLS_{0}".format(username)).show()
+--------+
|count(1)|
+--------+
| 90276|
+--------+
Notice that the table history and snapshots have been updated:
>>> spark.sql("SELECT * FROM spark_catalog.MYDB_{0}.CAR_INSTALLS_{0}.history".format(username)).show(20, False)
+-----------------------+-------------------+-------------------+-------------------+
|made_current_at |snapshot_id |parent_id |is_current_ancestor|
+-----------------------+-------------------+-------------------+-------------------+
|2023-11-29 23:58:43.427|6191572403226489858|null |true |
|2023-11-30 00:00:15.263|1032812961485886468|6191572403226489858|true |
+-----------------------+-------------------+-------------------+-------------------+
>>> spark.sql("SELECT * FROM spark_catalog.MYDB_{0}.CAR_INSTALLS_{0}.snapshots".format(username)).show(20, False)
+-----------------------+-------------------+-------------------+---------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|committed_at |snapshot_id |parent_id |operation|manifest_list |summary |
+-----------------------+-------------------+-------------------+---------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|2023-11-29 23:58:43.427|6191572403226489858|null |append |s3a://go01-demo/warehouse/tablespace/external/hive/mydb_pauldefusco.db/car_installs_pauldefusco/metadata/snap-6191572403226489858-1-bf191e06-38cd-4d6e-9757-b8762c999177.avro|{added-data-files -> 2, added-records -> 82066, added-files-size -> 1825400, changed-partition-count -> 1, total-records -> 82066, total-files-size -> 1825400, total-data-files -> 2, total-delete-files -> 0, total-position-deletes -> 0, total-equality-deletes -> 0} |
|2023-11-30 00:00:15.263|1032812961485886468|6191572403226489858|append |s3a://go01-demo/warehouse/tablespace/external/hive/mydb_pauldefusco.db/car_installs_pauldefusco/metadata/snap-1032812961485886468-1-142965b8-67ea-4b53-b76d-558ab5e74e1f.avro|{spark.app.id -> spark-93d1909a680948fea5303b55986704ac, added-data-files -> 1, added-records -> 8210, added-files-size -> 183954, changed-partition-count -> 1, total-records -> 90276, total-files-size -> 2009354, total-data-files -> 3, total-delete-files -> 0, total-position-deletes -> 0, total-equality-deletes -> 0}|
+-----------------------+-------------------+-------------------+---------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
Time travel to the pre-insert table state:
# TIME TRAVEL AS OF PREVIOUS TIMESTAMP
>>> df = spark.read.option("as-of-timestamp", int(timestamp*1000)).format("iceberg").load("spark_catalog.MYDB_{0}.CAR_INSTALLS_{0}".format(username))
# POST TIME TRAVEL COUNT
>>> print(df.count())
82066
Finally, drop the database:
>>> spark.sql("DROP DATABASE IF EXISTS MYDB_{} CASCADE".format(username))
Exit the Spark Shell (Ctrl+D).
List the commands that were run in the session. Notice that this could be a lot, so the example below only includes a few initial commands:
% cde session statements --name interactiveSession
+--------------------------------+------------------------------------------+
| CODE | OUTPUT |
+--------------------------------+------------------------------------------+
| print("hello Spark") | hello Spark |
+--------------------------------+------------------------------------------+
| from pyspark.sql.types import | |
| Row, StructField, StructType, | |
| StringType, IntegerType | |
+--------------------------------+------------------------------------------+
| rows = [Row(name="John", | |
| age=19), Row(name="Smith", | |
| age=23), Row(name="Sarah", | |
| age=18)] | |
+--------------------------------+------------------------------------------+
| some_df = | |
| spark.createDataFrame(rows) | |
+--------------------------------+------------------------------------------+
| some_df.printSchema() | root |-- name: string |
| | (nullable = true) |-- age: |
| | long (nullable = true) |
+--------------------------------+------------------------------------------+
List all sessions:
% cde session list
+---------------------------+-----------+---------+-------------+----------------------+----------------------+-------------+
| NAME | STATE | TYPE | DESCRIPTION | CREATED | LAST UPDATED | CREATOR |
+---------------------------+-----------+---------+-------------+----------------------+----------------------+-------------+
| francetemp | killed | pyspark | | 2023-11-16T15:59:35Z | 2023-11-16T16:02:16Z | jmarchand |
| IcebergSession | available | pyspark | | 2023-11-29T21:24:27Z | 2023-11-29T21:56:56Z | pauldefusco |
| interactiveSession | killed | pyspark | | 2023-11-28T22:00:47Z | 2023-11-28T22:01:16Z | pauldefusco |
| interactiveSessionIceberg | available | pyspark | | 2023-11-29T23:17:58Z | 2023-11-29T23:56:06Z | pauldefusco |
| myNewSession | killed | pyspark | | 2023-11-28T21:58:38Z | 2023-11-28T21:59:06Z | pauldefusco |
| mySparkSession | killed | pyspark | | 2023-11-28T21:44:30Z | 2023-11-28T21:45:01Z | pauldefusco |
| TA-demo | killed | pyspark | | 2023-11-13T10:12:12Z | 2023-11-13T10:13:41Z | glivni |
+---------------------------+-----------+---------+-------------+----------------------+----------------------+-------------+
Kill the session:
% cde session kill --name interactiveSession
Summary and Next Steps
Cloudera Data Engineering (CDE) provides a command line interface (CLI) client. You can use the CLI to create and update jobs, view job details, manage job resources, run jobs, and so on.
In this article we have reviewed some advanced use cases for the CLI. If you are using the CDE CLI, you might also find the following articles and demos interesting:
Installing the CDE CLI
Simple Introduction to the CDE CLI
CDE CLI Demo
CDE Concepts
CDE CLI Command Reference
CDE CLI Spark Flag Reference
CDE CLI Airflow Flag Reference
CDE CLI list command syntax reference
CDE Jobs API Reference
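In addition to the timestamp-based read shown above, Iceberg's Spark reader also supports pinning a specific snapshot via the "snapshot-id" option. A minimal sketch, assuming it runs inside the same CDE Session (where spark is already available) and reusing the snapshot ID from the example output:
# Assumes the CDE Session context: `spark` already exists and the table was created above
username = "pauldefusco"
snapshot_id = 6191572403226489858  # first snapshot ID from the .snapshots output above

df = spark.read \
    .option("snapshot-id", snapshot_id) \
    .format("iceberg") \
    .load("spark_catalog.MYDB_{0}.CAR_INSTALLS_{0}".format(username))

# Should match the pre-insert row count (82066 in the example)
print(df.count())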
11-29-2023
04:34 PM
Cloudera Data Engineering
CDE is the Cloudera Data Engineering Service, a containerized managed service for Cloudera Data Platform designed for Large Scale Batch Pipelines with Spark, Airflow and Iceberg. It allows you to submit batch jobs to auto-scaling virtual clusters. As a Cloud-Native service, CDE enables you to spend more time on your applications, and less time on infrastructure.
CDE allows you to create, manage, and schedule Apache Spark jobs without the overhead of creating and maintaining Spark clusters. With CDE, you define virtual clusters with a range of CPU and memory resources, and the cluster scales up and down as needed to run your Spark workloads, helping to control your cloud costs.
Cloudera Data Engineering (CDE) provides a command line interface (CLI) client. You can use the CLI to create and update jobs, view job details, manage job resources, run jobs, and more.
CDE Files Resources
A resource in Cloudera Data Engineering (CDE) is a named collection of files used by a job. Resources can include application code, configuration files, custom Docker images, and Python virtual environment specifications. Resources are associated with virtual clusters. A resource can be used by multiple jobs, and jobs can use multiple resources. The resource types supported by CDE are files, python-env, and custom-runtime-image.
In this article, we will walk through some useful commands for working with CDE Files Resources efficiently.
Requirements
The following are required in order to reproduce these commands in your CDE environment:
A CDE Service on version 1.19.0 or above.
A working installation of the CDE CLI. Please follow these instructions to install the CLI.
Steps
Clone this git repository to your local machine. Then run the following commands:
Create a Files Resource:
% cde resource create --name myScripts \
--type files
Upload multiple files to the same Files Resource:
% cde resource upload --name myScripts \
--local-path cde_jobs/spark_geospatial.py \
--local-path cde_jobs/utils.py
3.5KB/3.5KB 100% [==============================================] spark_geospatial.py
4.0KB/4.0KB 100% [==============================================] utils.py
Describe the Files resource:
% cde resource describe --name myScripts
{
"name": "myScripts",
"type": "files",
"status": "ready",
"signature": "08531cbe538858eb20bda5ff1b7567ae4623d885",
"created": "2023-11-28T23:31:31Z",
"modified": "2023-11-28T23:33:32Z",
"retentionPolicy": "keep_indefinitely",
"files": [
{
"path": "spark_geospatial.py",
"signature": "ec91fb5bddfcd16a0bcbe344f229b5e326b759c5",
"sizeBytes": 3529,
"created": "2023-11-28T23:33:32Z",
"modified": "2023-11-28T23:33:32Z"
},
{
"path": "utils.py",
"signature": "aa5a8ea4b4f240183da8bd2d2b354eeaa58fd97a",
"sizeBytes": 3996,
"created": "2023-11-28T23:33:32Z",
"modified": "2023-11-28T23:33:32Z"
}
]
}
Create a Files Resource for data:
% cde resource create --name myData \
--type files
Upload an archive file to the resource:
% cde resource upload-archive --name myData \
--local-path data/ne_50m_admin_0_countries_lakes.zip
817.5KB/817.5KB 100% [==============================================] ne_50m_admin_0_countries_lakes.zip
Describe the resource metadata. Notice that the archive has been extracted for you:
% cde resource describe --name myData
{
"name": "myData",
"type": "files",
"status": "ready",
"signature": "d552dff8fb80a0c7067afa1c4227b29010cce67b",
"created": "2023-11-28T23:35:43Z",
"modified": "2023-11-28T23:36:56Z",
"retentionPolicy": "keep_indefinitely",
"files": [
{
"path": "ne_50m_admin_0_countries_lakes.cpg",
"signature": "663b90c899fa25a111067be0c22ffc64dcf581c2",
"sizeBytes": 5,
"created": "2023-11-28T23:36:55Z",
"modified": "2023-11-28T23:36:55Z"
},
{
"path": "ne_50m_admin_0_countries_lakes.dbf",
"signature": "eec48a122399782bbef02aa8108e99aeaf52e506",
"sizeBytes": 786828,
"created": "2023-11-28T23:36:56Z",
"modified": "2023-11-28T23:36:56Z"
},
{
"path": "ne_50m_admin_0_countries_lakes.prj",
"signature": "308d6355be935e0f7853161b1adda5bcd48188ff",
"sizeBytes": 143,
"created": "2023-11-28T23:36:56Z",
"modified": "2023-11-28T23:36:56Z"
},
{
"path": "ne_50m_admin_0_countries_lakes.README.html",
"signature": "4bec87fbbe5f4e0e18edb3d6a4f10e9e2a705581",
"sizeBytes": 38988,
"created": "2023-11-28T23:36:55Z",
"modified": "2023-11-28T23:36:55Z"
},
{
"path": "ne_50m_admin_0_countries_lakes.shp",
"signature": "57c38f48c5234db925a9fb1b31785250bd7c8d86",
"sizeBytes": 1652200,
"created": "2023-11-28T23:36:56Z",
"modified": "2023-11-28T23:36:56Z"
},
{
"path": "ne_50m_admin_0_countries_lakes.shx",
"signature": "983eba7b34cf94b0cfe8bda8b2b7d533bd233c49",
"sizeBytes": 2036,
"created": "2023-11-28T23:36:56Z",
"modified": "2023-11-28T23:36:56Z"
},
{
"path": "ne_50m_admin_0_countries_lakes.VERSION.txt",
"signature": "9e3d18e5216a7b4dfd402b00a25ee794842b481b",
"sizeBytes": 7,
"created": "2023-11-28T23:36:55Z",
"modified": "2023-11-28T23:36:55Z"
}
]
}
Create CDE credentials for accessing your Docker repository:
cde credential create --name my-docker-creds \
--type docker-basic \
--docker-server hub.docker.com \
--docker-username pauldefusco
Create a CDE Custom Docker Runtime Resource:
cde resource create --name dex-spark-runtime-sedona-geospatial \
--image pauldefusco/dex-spark-runtime-3.2.3-7.2.15.8:1.20.0-b15-sedona-geospatial-003 \
--image-engine spark3 \
--type custom-runtime-image
Notice that the custom image has already been created. If you want to learn more about how to create Custom Docker Resources, please visit this Cloudera Community Article.
Finally, create a job leveraging all three resources above:
cde job create --name geospatialRdd \
--type spark \
--mount-1-prefix code/ --mount-1-resource myScripts \
--mount-2-prefix data/ --mount-2-resource myData \
--runtime-image-resource-name dex-spark-runtime-sedona-geospatial \
--packages org.apache.sedona:sedona-spark-shaded-3.0_2.12:1.5.0,org.datasyslab:geotools-wrapper:1.5.0-28.2 \
--application-file code/spark_geospatial.py \
--arg myArg \
--max-executors 4 \
--min-executors 2 \
--executor-cores 2
Notice the following:
When using multiple Files resources, you should prefix each one, i.e. "data" and "code". The "data" prefix is used at line 82 ("/app/mount/data") in the "spark_geospatial.py" script in order to access the data from the resource. The "code" prefix is used in the same CLI command in the application file argument.
If you are using any Spark packages, you can set these directly at job creation.
You can pass one or multiple arguments to the Python script via the --arg argument. The arguments are referenced in the script with the "sys.argv" syntax, e.g. line 60 in "geospatial_rdd.py".
Run the job:
cde job run --name geospatialRdd --executor-cores 4
Notice that at runtime you can override Spark configs that were set at job creation. For example, "--executor-cores" was originally set to 2 and is now overridden to 4.
List job runs, filtering by job name:
% cde run list --filter 'job[eq]geospatialRdd'
[
{
"id": 21815,
"job": "geospatialRdd",
"type": "spark",
"status": "failed",
"user": "pauldefusco",
"started": "2023-11-29T00:32:02Z",
"ended": "2023-11-29T00:32:36Z",
"mounts": [
{
"dirPrefix": "data/",
"resourceName": "myData"
},
{
"dirPrefix": "code/",
"resourceName": "myScripts"
}
],
"runtimeImageResourceName": "dex-spark-runtime-sedona-geospatial",
"spark": {
"sparkAppID": "spark-f542530da24f485da4993338dca81d3c",
"sparkAppURL": "https://58kqsms2.cde-g6hpr9f8.go01-dem.ylcu-atmi.cloudera.site/hs/history/spark-f542530da24f485da4993338dca81d3c/jobs/",
"spec": {
"file": "code/geospatial_rdd.py",
"args": [
"myArg"
],
"driverMemory": "1g",
"driverCores": 1,
"executorMemory": "1g",
"executorCores": 4,
"conf": {
"spark.dynamicAllocation.maxExecutors": "4",
"spark.dynamicAllocation.minExecutors": "2",
"spark.jars.packages": "org.apache.sedona:sedona-spark-shaded-3.0_2.12:1.5.0,org.datasyslab:geotools-wrapper:1.5.0-28.2"
}
}
},
"identity": {
"disableRoleProxy": true,
"role": "instance"
}
}
]
Open the Spark UI for the respective run:
% cde run ui --id 21815
You can delete single files from resources:
% cde resource delete-file --name myScripts --resource-path utils.py
You can delete the resource:
% cde resource delete --name myScripts
Summary and Next Steps
Cloudera Data Engineering (CDE) provides a command line interface (CLI) client. You can use the CLI to create and update jobs, view job details, manage job resources, run jobs, and so on.
In this article we have reviewed some advanced use cases for the CLI. If you are using the CDE CLI, you might also find the following articles and demos interesting:
Installing the CDE CLI
Simple Introduction to the CDE CLI
CDE CLI Demo
CDE Concepts
CDE CLI Command Reference
CDE CLI Spark Flag Reference
CDE CLI Airflow Flag Reference
CDE CLI list command syntax reference
CDE Jobs API Reference
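To make the prefix and argument conventions concrete, here is a small hypothetical snippet (not the actual spark_geospatial.py) showing how the mounted "data/" prefix and the --arg value surface inside the application code:
import sys
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("GeospatialExample").getOrCreate()

# The value passed via --arg (e.g. "myArg") is available as the first CLI argument
my_arg = sys.argv[1]
print("Job argument:", my_arg)

# Files uploaded to the "myData" resource are mounted under the "data/" prefix
countries_shapefile = "/app/mount/data/ne_50m_admin_0_countries_lakes.shp"
print("Shapefile available at:", countries_shapefile)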
11-22-2023
11:47 PM
Objective
This article provides a quickstart for the SparkXML package in Cloudera Data Engineering. You can run the following commands to parse XML files with Spark in Cloudera Data Engineering (CDE).
CDE is the only cloud-native service purpose-built for enterprise data engineering teams. Building on Apache Spark, Data Engineering is an all-inclusive data engineering toolset that enables orchestration automation with Apache Airflow, advanced pipeline monitoring, visual troubleshooting, and comprehensive management tools to streamline ETL processes across enterprise analytics teams.
CDE is fully integrated with the Cloudera Data Platform (CDP), enabling end-to-end visibility and security with SDX, as well as seamless integrations with CDP services such as Data Warehouse and Machine Learning. Data Engineering on CDP powers consistent, repeatable, and automated data engineering workflows on a hybrid cloud platform anywhere.
Spark-XML is a library for parsing and querying XML data with Apache Spark, for Spark SQL and DataFrames. Spark Data Engineers use the package in CDE to parse XML files at scale. In the rest of this tutorial, we will run a few commands to demonstrate basic functionality of this package in CDE.
The code is also provided in this git repository.
Requirements
The following are required to reproduce the Demo in your CDE Virtual Cluster:
CDE Service version 1.19 and above with Spark version 3.x (Spark 2 is not compatible with SparkXML).
A Working installation of the CDE CLI. Instructions to install the CLI are provided in Using the Cloudera Data Engineering command line interface.
A working installation of git in your local machine. Please clone this git repository and keep in mind all commands assume they are run in the project's main directory.
No code edits required but familiarity with Python, Spark and XML is recommended.
CDE CLI Steps
Create a CDE Files Resource:
cde resource create --name files --type files
Upload files to the Resource:
cde resource upload --name files --local-path read_xml.py --local-path books.xml
Create a CDE Spark Job:
cde job create --name sparkxml --application-file read_xml.py --mount-1-resource files --type spark --packages com.databricks:spark-xml_2.12:0.16.0
Run the CDE Spark Job:
cde job run --name sparkxml
+-----+--------------------+--------------------+---------------+-----+------------+--------------------+
| _id| author| description| genre|price|publish_date| title|
+-----+--------------------+--------------------+---------------+-----+------------+--------------------+
|bk101|Gambardella, Matthew|\n\n\n An...| Computer|44.95| 2000-10-01|XML Developer's G...|
|bk102| Ralls, Kim|A former architec...| Fantasy| 5.95| 2000-12-16| Midnight Rain|
|bk103| Corets, Eva|After the collaps...| Fantasy| 5.95| 2000-11-17| Maeve Ascendant|
|bk104| Corets, Eva|In post-apocalyps...| Fantasy| 5.95| 2001-03-10| Oberon's Legacy|
|bk105| Corets, Eva|The two daughters...| Fantasy| 5.95| 2001-09-10| The Sundered Grail|
|bk106| Randall, Cynthia|When Carla meets ...| Romance| 4.95| 2000-09-02| Lover Birds|
|bk107| Thurman, Paula|A deep sea diver ...| Romance| 4.95| 2000-11-02| Splish Splash|
|bk108| Knorr, Stefan|An anthology of h...| Horror| 4.95| 2000-12-06| Creepy Crawlies|
|bk109| Kress, Peter|After an inadvert...|Science Fiction| 6.95| 2000-11-02| Paradox Lost|
|bk110| O'Brien, Tim|Microsoft's .NET ...| Computer|36.95| 2000-12-09|Microsoft .NET: T...|
|bk111| O'Brien, Tim|The Microsoft MSX...| Computer|36.95| 2000-12-01|MSXML3: A Compreh...|
|bk112| Galos, Mike|Microsoft Visual ...| Computer|49.95| 2001-04-16|Visual Studio 7: ...|
+-----+--------------------+--------------------+---------------+-----+------------+--------------------+
root
|-- _id: string (nullable = true)
|-- author: string (nullable = true)
|-- description: string (nullable = true)
|-- genre: string (nullable = true)
|-- price: double (nullable = true)
|-- publish_date: string (nullable = true)
|-- title: string (nullable = true)
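For reference, a minimal sketch of the kind of logic read_xml.py likely contains (this is an assumption based on the job definition and output above, not the exact script; the rowTag value "book" is inferred from the standard books.xml sample):
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("SparkXML").getOrCreate()

# books.xml was uploaded to the "files" resource, which CDE mounts under /app/mount
df = spark.read \
    .format("com.databricks.spark.xml") \
    .option("rowTag", "book") \
    .load("/app/mount/books.xml")

df.show()
df.printSchema()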
References
Spark XML Package
Cloudera Data Engineering Documentation
11-18-2023
01:26 PM
Objective
This article contains basic instructions for creating and deploying a Custom Runtime with VS Code and Nvidia libraries to your Cloudera Machine Learning (CML) Workspace. With this runtime, you can increase your productivity when developing your deep learning use cases in CML Sessions. This example is also valid for Cloudera Data Science Workbench (CDSW) clusters.
If you are using CML or CDSW for deep learning, GenAI, or LLM use cases, please scroll down to the bottom of this page to check out more examples.
Using Custom Runtimes
Cloudera ML Runtimes are a set of Docker images created to enable machine learning development and host data applications in the Cloudera Data Platform (CDP) and the Cloudera Machine Learning (CML) service.
ML Runtimes provide a flexible, fully customizable, lightweight development and production machine learning environment for both CPU and GPU processing frameworks while enabling unfettered access to data, on-demand resources, and the ability to install and use any libraries/algorithms without IT assistance.
PBJ Runtimes
Powered by Jupyter (PBJ) Runtimes are the second generation of ML Runtimes. While the original ML Runtimes relied on a custom proprietary integration with CML, PBJ Runtimes rely on Jupyter protocols for ecosystem compatibility and openness.
Open Source
For data scientists who need to fully understand the environment they are working in, Cloudera provides the Dockerfiles and all dependencies in this git repository, enabling the construction of the official Cloudera ML Runtime images.
The open source PBJ Runtime Dockerfiles serve as a blueprint for creating custom ML Runtimes, so data scientists or partners can build ML Runtime images on their selected OS (base image), with the kernel of their choice, or simply integrate their existing ML container images with Cloudera Machine Learning.
In this example, we reuse the Dockerfiles provided in this git repository to create a runtime with both VS Code and Nvidia libraries.
Requirements
In order to use this runtime you need:
A CML Workspace or CDSW Cluster. AWS and Azure Public Cloud, as well as OCP and ECS Private Cloud, are supported.
Workspace Admin rights and access to the Runtime Catalog.
Basic familiarity with Docker and a working installation of Docker on your local machine.
Steps to Reproduce
Clone the git repository
Clone this git repository to obtain all necessary files.
mkdir mydir
cd mydir
git clone https://github.com/pdefusco/Nvidia_VSCode_Runtime_CML.git
Explore Dockerfile
Open the Dockerfile and familiarize yourself with the code. Note the following:
To create a new image we are extending an existing CML image. This image has the CUDA libraries and is available here.
FROM docker.repository.cloudera.com/cloudera/cdsw/ml-runtime-pbj-workbench-python3.10-cuda:2023.08.2-b8
We then install the VS Code Editor on top, as shown in the PBJ VS Code runtime available here. Creating a symlink is necessary in order to ensure the right editor is launched at runtime.
# Install latest version. See https://github.com/coder/code-server/blob/main/install.sh for details
RUN curl -fsSL https://code-server.dev/install.sh | sh -s -- --version 4.16.1
# Create launch script and symlink to the default editor launcher
RUN printf "#!/bin/bash\n/usr/bin/code-server --auth=none --bind-addr=127.0.0.1:8090 --disable-telemetry" > /usr/local/bin/vscode
RUN chmod +x /usr/local/bin/vscode
RUN ln -s /usr/local/bin/vscode /usr/local/bin/ml-runtime-editor
The remaining lines are environment variables that must be overridden in order to distinguish this build from others. In other words, we need to add some unique metadata for this image. Specifically, you must update the values for ML_RUNTIME_SHORT_VERSION and ML_RUNTIME_MAINTENANCE_VERSION and make sure these are congruent with ML_RUNTIME_FULL_VERSION. Adding a unique description is also highly recommended.
# Override Runtime label and environment variables metadata
ENV ML_RUNTIME_EDITOR="VsCode" \
ML_RUNTIME_EDITION="Community" \
ML_RUNTIME_SHORT_VERSION="2023.11" \
ML_RUNTIME_MAINTENANCE_VERSION="1" \
ML_RUNTIME_FULL_VERSION="2023.11.1" \
ML_RUNTIME_DESCRIPTION="This runtime includes VsCode editor v4.16.1 and is based on PBJ Workbench image with Python 3.10 and CUDA"
LABEL com.cloudera.ml.runtime.editor=$ML_RUNTIME_EDITOR \
com.cloudera.ml.runtime.edition=$ML_RUNTIME_EDITION \
com.cloudera.ml.runtime.full-version=$ML_RUNTIME_FULL_VERSION \
com.cloudera.ml.runtime.short-version=$ML_RUNTIME_SHORT_VERSION \
com.cloudera.ml.runtime.maintenance-version=$ML_RUNTIME_MAINTENANCE_VERSION \
com.cloudera.ml.runtime.description=$ML_RUNTIME_DESCRIPTION
Build Dockerfile and Push Image
Run the following command to build the Docker image, editing it with your username and preferred image tag:
docker build . -t pauldefusco/vscode4_cuda11_cml_runtime:latest
Push the image to your preferred Docker repository. In this example, we use a personal repository, but CML and CDSW can also be used with other enterprise solutions. If your CML Workspace resides in an air-gapped environment, you can use your Podman Local Container Registry.
docker push pauldefusco/vscode4_cuda11_cml_runtime
Add the Runtime to your CML Workspace Runtime Catalog and CML Project
This step can only be completed by a CML user with Workspace Admin rights. If you don't have Admin rights please reach out to your CML or CDP Admin.
1. In your CML Workspace navigate to the Runtime Catalog tab (available on the left side of your screen). Then click on "Add Runtime" at the top right corner of your screen.
2. Add the runtime to your project: navigate back to your CML Project, open the Project Settings -> Runtime tab, and add the runtime there.
Congratulations, you are now ready to use VS Code in a CML Session with Nvidia libraries!
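Optionally, once a GPU is attached to your session, you can quickly confirm that the Nvidia drivers are visible from within the runtime. The snippet below is a simple illustrative check and assumes the session was launched with at least one GPU:
import subprocess

# Print the GPUs visible to the session as reported by nvidia-smi
print(subprocess.run(["nvidia-smi"], capture_output=True, text=True).stdout)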
Conclusions and Next Steps
Cloudera ML Runtimes are a set of Docker images created to enable machine learning development in a flexible, fully customizable, lightweight development and production machine learning environment for both CPU and GPU processing frameworks while enabling unfettered access to data, on-demand resources, and the ability to install and use any libraries/algorithms without IT assistance.
You can create custom CML Runtimes by extending the Cloudera Machine Learning base, PBJ, and open runtimes available in this git repository, as we have done in this example, or you can create your own from scratch as shown in this example.
If you are using CML for Deep Learning, GenAI, and LLM use cases, here are some more examples you may find interesting:
CML LLM Hands on Lab
LLM Demo in CML
How to Launch an Applied Machine Learning Prototype (AMP) in CML
AMP: Intelligent QA Chatbot with NiFi, Pinecone, and Llama2
AMP: Text Summarization and more with Amazon Bedrock
AMP: Fine-Tuning a Foundation Model for Multiple Tasks (with QLoRA)
AMP: LLM Chatbot Augmented with Enterprise Data
AMP: Semantic Image Search with Convolutional Neural Networks
AMP: Deep Learning for Anomaly Detection
AMP: Deep Learning for Question Answering
AMP: Automatic Text Summarization
Quickstarts with PyTorch, Tensorflow and MXNet in CML
Distributed PyTorch with Horovod and CML Workers in CML
Distributed Tensorflow with CML Workers in CML
An end to end example of PyTorch and MLFlow in CML
11-16-2023
11:27 AM
The Cloudera Data Platform (CDP) is a hybrid data platform designed to deliver faster and easier data management, analytics, and AI at enterprise scale. Cloudera Machine Learning (CML) is one of CDP's Cloud Native Data Services designed to enable secure, governed Data Science. Data Scientists and Engineers use CML to securely analyze large amounts of data via interactive notebooks. Since 2022, CML has enhanced this capability with the Data Connections feature by providing boilerplate code to access and push down SQL queries to the CDW service. This allows the user to run large-scale queries directly in the Data Warehouse while accessing and visualizing results from a simple notebook with a small consumption footprint.
In 2023, CML doubled down with Custom Data Connections. Custom Data Connections allow the CML user to create their own boilerplate code so they or their collaborators can easily connect to 3rd party tools such as Snowflake, Postgres, legacy on-prem databases (Oracle, MSSQL, MySQL), serverless cloud databases (Redshift, Snowflake, SAP HANA Cloud, BigQuery), APIs, and specialized data stores (Neo4j).
This article provides a tutorial on how to create a MySQL Custom Data Connection in your own CML Workspace. The code referenced is provided in this git repository.
Data Connections Refresher
You can use Data Connections from a CML Session to connect to a CDP environment (CDW or Spark), regardless of the Editor choice, by following the steps below:
Open your CML Session and click on the “Data” tab at the top right of your screen.
Scroll down and select the CDW or Spark Data Lake connection of your choice.
Copy and paste the code in your Editor.
Run the sample SHOW DATABASES command and create your SQL statements.
Learning from the Postgres Custom Data Connection
The CML Engineering Team has created two custom data connection templates that you can reuse to connect to Snowflake and Postgres. In this section, we will use the Postgres template to learn how to deploy an existing custom connection.
Steps to Deploy the Postgres Custom Connection to your Project
Create a CML Project by cloning the provided template with this URL: https://github.com/pdefusco/Using_CustomConn_CML.git
Notice the mysqlconn and postgresconn folders are included in the project. Open the postgres folder and familiarize yourself with the code in pg-conn.py. This is the source code for the Custom Connection.
The PostgresCustomImp class is a child of CustomConnection, which is imported from cml.data_v1.customconnection. You must extend this class in order to create a custom connection.
The get_base_connection method uses the psycopg2 module to establish a connection to a given Postgres source. The connection requires a hostname, a default port, a database name, a user, and a password. When implementing your own custom connection, you can choose which connection parameters to include as needed.
The get_pandas_dataframe method executes the provided SQL via the Pandas read_sql method.
The get_cursor method returns a cursor object. This is optional, as the SQL command is executed in get_pandas_dataframe.
The parameters required to initialize a connection are set manually via the UI (see below). The override_parameters and check_params_or_env methods ensure that these are set correctly. While these methods are also optional, we recommend implementing them.
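To make the structure concrete, here is a minimal, hypothetical sketch of such a connection class. The class name and the environment-variable lookup are assumptions for illustration only; refer to pg-conn.py in the repository for the actual template.
import os
import pandas as pd
import psycopg2
from cml.data_v1.customconnection import CustomConnection

class PostgresCustomSketch(CustomConnection):
    # Illustrative custom connection; not a copy of the repository template

    def get_base_connection(self):
        # Parameters such as PG_HOST are set in the Workspace UI; here we assume
        # they are exposed to the session as environment variables
        self.conn = psycopg2.connect(
            host=os.environ["PG_HOST"],
            port=os.environ.get("PG_PORT", "5432"),
            dbname=os.environ["PG_DB"],
            user=os.environ["PG_USER"],
            password=os.environ.get("PG_PASSWORD", ""),
        )
        return self.conn

    def get_pandas_dataframe(self, query):
        # Execute the SQL statement and return the result as a Pandas DataFrame
        return pd.read_sql(query, self.conn)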
Once you are familiar with the code, navigate to the Workspace Site Administration page and open the Data Connections tab. Click on the “New Connection” icon and fill out the form as shown below:
Name: Custom Postgres
Type: Custom Connection
Type Display: Postgres
Project: select your project
Connection Files: select the “postgresconn” folder
Custom Parameters: create the following four key-value pairs:
PG_HOST : hh-pgsql-public.ebi.ac.uk
PG_PORT : 5432
PG_DB : pfmegrnargs
PG_USER : reader
Note: in this example, we are connecting to the RNAcentral Public Postgres Database. Please visit this URL for more information: https://rnacentral.org/help/public-database
Navigate back to your project. Open the “Project Settings” page and then the “Data Connections” tab. Make the new connection available in the project by clicking on “Sync with Workspace”.
Launch a CML Session with JupyterLab as your Editor. A small resource profile without GPUs is OK, and there is no need to enable a Spark Runtime Add-On. The Data Connections window will load automatically. Notice the new “Custom Postgres” connection with a reusable template code block.
Open the “using_connections” notebook. Notice the code block to connect to the Postgres database has already been prepopulated for you. Normally, you would copy and paste it from the Data Connections pop-up window. Execute the first four cells and validate that query results are output in the notebook (do not run the entire notebook!).
Creating a MySQL Custom Data Connection
Now that we have connected to Postgres with a provided Custom Connection template, we can follow the same steps to create a custom MySQL connection.
Open the mysqlconn folder and familiarize yourself with the code. Notice the source code for this connection is in a separate folder. This is required in order to select a folder in the Custom Data Connection creation form in the Workspace Settings.
As in the previous example, we create a MySQLCustomImp class, which inherits from the CustomConnection interface in the cml.data_v1.customconnection module. This module does not have to be pip installed and is already provided to you in CML by default.
The implemented class methods are similar to the Postgres example. Notice that in this case we don't have a method to return the connection cursor. In addition, we leverage the mysql-connector-python package rather than psycopg2.
To use this custom connection, go through the same steps and set the following Custom Parameters:
MYSQL_HOST : mysql-rfam-public.ebi.ac.uk
MYSQL_PORT : 4497
MYSQL_DB : Rfam
MYSQL_USER : rfamro
Note: in this example, we are connecting to the Rfam MySQL public database. For more information, please visit this URL: https://docs.rfam.org/en/latest/database.html
Conclusions
In this article, we highlighted CML Custom Data Connections. In summary:
CML is a Cloud Native Platform for Enterprise Machine Learning. The built-in integrations with the Cloudera Data Platform (CDP) allow CML Users to operate in a secure, governed Machine Learning environment at scale.
CML Data Connections enhance Data Analysis, Exploration, and Model Experimentation by providing Data Scientists with an easy interface to process large amounts of data from a notebook with minimum compute resources.
Custom Data Connections augment this capability by opening access to 3rd party systems such as external RDBMSs or Cloud Vendor Databases such as Snowflake. CML Users are free to implement their own Custom Connections while giving the CML Admins the ability to approve them at the Workspace level.
What Custom Data Connection are you using in CML? Please don't hesitate to comment with your favorite external data sources.
10-25-2023
08:27 PM
Objective
Great Expectations is a Python-based open-source library for validating, documenting, and profiling your data. It helps you maintain data quality and improve communication about data between teams. Software developers have long known that automated testing is essential for managing complex codebases. Great Expectations brings the same discipline, confidence, and acceleration to data science and data engineering teams. (Source: Great Expectations)
CDP Data Engineering is the only cloud-native service purpose-built for enterprise data engineering teams. Building on Apache Spark, Data Engineering is an all-inclusive data engineering toolset that enables orchestration automation with Apache Airflow, advanced pipeline monitoring, visual troubleshooting, and comprehensive management tools to streamline ETL processes across enterprise analytics teams.
Data Engineering is fully integrated with the Cloudera Data Platform, enabling end-to-end visibility and security with SDX as well as seamless integrations with CDP services such as data warehouses and machine learning. Data engineering on CDP powers consistent, repeatable, and automated data engineering workflows on a hybrid cloud platform anywhere. (Source: Streamline and operationalize data pipelines securely at any scale.)
Spark Data Engineers use Great Expectations in CDE to enforce data quality standards on enterprise datasets and data engineering pipelines at massive scale. In the rest of this tutorial, we will walk through a basic workflow with Great Expectations and Spark.
All code, dependencies, and CLI commands are available for reuse in this git repository.
Requirements
The following are required to reproduce the demo in your CDE virtual cluster:
CDE Service version 1.19 and above
A working installation of the CDE CLI. Instructions to install the CLI are provided here.
A working installation of git on your local machine. Please clone the git repository and keep in mind that all commands assume they are run in the project's main directory.
Custom Docker Runtime as CDE Resource
By default, CDE workloads rely on CDE Resources to mount dependencies into a job container. CDE resources can be of three types: files, Python environment, and custom Docker runtime.
In our case, we will create a custom Docker runtime to ensure Great Expectations and all required packages are correctly installed. Please open the Dockerfile located in the CDE_Runtime folder to explore more details.
The below steps have already been executed, and the image is available for reuse in my public Docker registry. If you want, create your own image by personalizing the value in the --name parameter to reflect your username and replacing the value in the --docker-username parameter with your DockerHub username.
docker build --network=host -t pauldefusco/dex-spark-runtime-3.2.3-7.2.15.8:1.20.0-b15-great-expectations-data-quality . -f Dockerfile
docker run -it pauldefusco/dex-spark-runtime-3.2.3-7.2.15.8:1.20.0-b15-great-expectations-data-quality /bin/bash
docker push pauldefusco/dex-spark-runtime-3.2.3-7.2.15.8:1.20.0-b15-great-expectations-data-quality
Next, create CDE Resource Credentials and the CDE Custom Docker Runtime resource:
cde credential create --name docker-creds-pauldefusco --type docker-basic --docker-server hub.docker.com --docker-username pauldefusco
cde resource create --name dex-spark-runtime-great-expectations-data-quality --image pauldefusco/dex-spark-runtime-3.2.3-7.2.15.8:1.20.0-b15-great-expectations-data-quality --image-engine spark3 --type custom-runtime-image
CDE Files Resources for Job Code
Next, create another CDE Files Resource to store your code and dependencies in your virtual cluster for reuse.
Before running the below command, open properties.conf and replace your Cloud Default Storage location and CDP Workload Username. If you need help finding these, please contact your CDP administrator. Then upload scripts and dependencies.
cde resource create --name job_code_data_quality
cde resource upload --name job_code_data_quality --local-path code/airflow.py --local-path code/batch_load.py --local-path code/great_expectations.py --local-path code/parameters.conf --local-path code/utils.py
Finally, we will create two CDE Spark jobs for the data quality pipeline and one CDE Airflow job to orchestrate it. Before doing so, open code/great_expectations.py and familiarize yourself with the code. Here are some of the most important points about the job:
We create a synthetic banking dataset with the Faker and dbldatagen libraries. This gives us 100,000 banking transactions with transaction amount, currency, account numbers, routing numbers, and much more banking data. The data columns are of different types, including numerical, timestamp, and categorical. Review the utils.py script to learn more details.
FakerTextUS = FakerTextFactory(locale=['en_US'], providers=[bank])

# partition parameters, etc.
self.spark.conf.set("spark.sql.shuffle.partitions", shuffle_partitions_requested)

fakerDataspec = (
    DataGenerator(self.spark, rows=data_rows, partitions=partitions_requested)
    .withColumn("name", percentNulls=0.1, text=FakerTextUS("name"))
    .withColumn("address", text=FakerTextUS("address"))
    .withColumn("email", text=FakerTextUS("ascii_company_email"))
    .withColumn("aba_routing", text=FakerTextUS("aba"))
    .withColumn("bank_country", text=FakerTextUS("bank_country"))
    .withColumn("account_no", text=FakerTextUS("bban"))
    .withColumn("int_account_no", text=FakerTextUS("iban"))
    .withColumn("swift11", text=FakerTextUS("swift11"))
    .withColumn("credit_card_number", text=FakerTextUS("credit_card_number"))
    .withColumn("credit_card_provider", text=FakerTextUS("credit_card_provider"))
    .withColumn("event_type", "string", values=["purchase", "cash_advance"], random=True)
    .withColumn("event_ts", "timestamp", begin="2020-01-01 01:00:00", end="2020-12-31 23:59:00", interval="1 minute", random=True)
    .withColumn("longitude", "float", minValue=-180, maxValue=180, random=True)
    .withColumn("latitude", "float", minValue=-90, maxValue=90, random=True)
    .withColumn("transaction_currency", values=["USD", "EUR", "KWD", "BHD", "GBP", "CHF", "MEX"])
    .withColumn("transaction_amount", "decimal", minValue=0.01, maxValue=30000, random=True)
)

df = fakerDataspec.build()
In ge_data_quality.py, the "SparkDFDataset" class is imported from the "great_expectations.dataset.sparkdf_dataset" module. This allows you to wrap a Spark DataFrame and make it compatible with the Great Expectations API.
gdf = SparkDFDataset(df)
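Once the DataFrame is wrapped, any built-in expectation can be called on it directly, and the returned validation result can be inspected. The following illustrative check uses one of the columns from the synthetic dataset generated above:
# Validate that no account numbers are missing and inspect the outcome
check = gdf.expect_column_values_to_not_be_null("account_no")
print(check.success)  # True if the expectation passed
print(check.result)   # details such as element_count and unexpected_count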
From the GE Documentation, "Expectations are declarative statements and the interface to the Great Expectations library, supporting your validation, profiling, and translation. Expectations are implemented as classes; some are in the core library, with many others contributed by community members." In the script, we implemented a few wrapper methods for a variety of built-in expectations. For example, the "expect_column_to_exist" expectation checks for column existence.
assert gdf.expect_column_to_exist(column).success, f"Column {column} is not found."
The GE Gallery offers a constantly growing number of expectations to test for different conditions on different column value types. For example, in the script, we use expectations to check for nulls for all columns: longitude and latitude min and max for transactions; mean and standard deviation for transaction amounts; email string validity with REGEX; and finally, whether the values in the categorical Currency column match or are contained in a provided test set. Here are some more examples that can be found in the ge_data_quality.py script:
# EXPECTATION ON ALL COLUMNS
# Ensure the existence of all mandatory columns
def run_existance_expactation(gdf, MANDATORY_COLUMNS):
    for column in MANDATORY_COLUMNS:
        try:
            assert gdf.expect_column_to_exist(column).success, f"Column {column} is not found."
            print(f"Column {column} is found")
        except Exception as e:
            print(e)
# EXPECTATION ON NUMERIC COLUMN
# Ensure minimum longitude is between -180 and 180
def run_longitude_min_expectation(gdf):
    try:
        test_result = gdf.expect_column_min_to_be_between(column="longitude", min_value=-180, max_value=180)
        assert test_result.success, "Min for column longitude is not within expected range"
        print("Min for column longitude is within expected range\n")
    except Exception as e:
        print(e)
# EXPECTATION ON STRING COLUMNS
# Use REGEX to ensure email is in correct format
def run_email_match_regex_expectation(gdf):
    try:
        test_result = gdf.expect_column_values_to_match_regex(column="email", regex="^[\w-\.]+@([\w-]+\.)+[\w-]{2,4}$")
        assert test_result.success, "Values for column Email are not within REGEX pattern."
        print("Values for column Email are within REGEX pattern\n")
    except Exception as e:
        print(e)
Please review the ge_data_quality.py script to find more examples.
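As one more illustration, the set-membership check on transaction currencies mentioned above can be written with the built-in expect_column_values_to_be_in_set expectation. The wrapper below is a sketch; the function name and messages are illustrative and may differ from the script.
# EXPECTATION ON CATEGORICAL COLUMNS
# Ensure transaction currencies fall within an expected set
EXPECTED_CURRENCIES = ["USD", "EUR", "KWD", "BHD", "GBP", "CHF", "MEX"]

def run_currency_in_set_expectation(gdf):
    try:
        test_result = gdf.expect_column_values_to_be_in_set(column="transaction_currency", value_set=EXPECTED_CURRENCIES)
        assert test_result.success, "Values for column transaction_currency are outside the expected set"
        print("Values for column transaction_currency are within the expected set\n")
    except Exception as e:
        print(e)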
Again, the idea is to organize tests in a clear, reliable, and reusable manner. When combined with the modularity of the CDE Job and Dependency construct, using Great Expectations and Spark allows you to structure your data pipelines under clear data quality rules.
Finally, the Airflow Job is scheduled to run every five minutes by default. Therefore, there is no need to manually run any of the above jobs. Run the below commands to create the jobs. Then open the CDE Job Runs page to validate the pipeline.
cde job create --name batch_load --type spark --mount-1-prefix jobCode/ --mount-1-resource job_code_data_quality --runtime-image-resource-name dex-spark-runtime-great-expectations-data-quality --application-file jobCode/batch_load.py
cde job create --name data_quality --type spark --mount-1-prefix jobCode/ --mount-1-resource job_code_data_quality --runtime-image-resource-name dex-spark-runtime-great-expectations-data-quality --application-file jobCode/great_expectations.py
cde job create --name data_quality_orchestration --type airflow --mount-1-prefix jobCode/ --mount-1-resource job_code_data_quality --dag-file airflow.py
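For reference, an orchestration DAG such as airflow.py typically chains the two Spark jobs with the CDEJobRunOperator. The sketch below is illustrative only: the job names match the commands above and the five-minute schedule mirrors the default mentioned earlier, but the structure may differ from the repository version.
from datetime import datetime
from airflow import DAG
from cloudera.cdp.airflow.operators.cde_operator import CDEJobRunOperator

with DAG(
    dag_id="data_quality_pipeline",
    start_date=datetime(2023, 1, 1),
    schedule_interval="*/5 * * * *",  # run every five minutes
    catchup=False,
) as dag:
    batch_load = CDEJobRunOperator(task_id="batch_load", job_name="batch_load")
    data_quality = CDEJobRunOperator(task_id="data_quality", job_name="data_quality")

    # Load a new batch of synthetic transactions, then run the data quality checks
    batch_load >> data_quality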
Summary
With CDE Runtimes, you can choose any data quality package of your choice. This article in particular showcased a simple data quality pipeline with Great Expectations, a leading open source package for testing and validating data at scale. You can easily leverage GE at scale with Spark in Cloudera Data Engineering in order to complement Great Expectations' reusable declarative expectations with advanced Spark and Airflow Job observability, monitoring, and development capabilities provided by CDE.
10-23-2023
06:29 PM
Objective
CDP Data Engineering (CDE) is the only cloud-native service purpose-built for enterprise data engineering teams. Building on Apache Spark, Data Engineering is an all-inclusive data engineering toolset that enables orchestration automation with Apache Airflow, advanced pipeline monitoring, visual troubleshooting, and comprehensive management tools to streamline ETL processes across enterprise analytics teams.
Apache Sedona™ (formerly known as "GeoSpark") is a cluster computing system for processing large-scale spatial data. Sedona extends existing cluster computing systems, such as Apache Spark and Apache Flink, with a set of out-of-the-box distributed Spatial Datasets and Spatial SQL that efficiently load, process, and analyze large-scale spatial data across machines.
Sedona jobs can run in CDE with minor configuration changes. This article will present a simple workflow to help you get started with Spark Geospatial use cases in CDE. All code, dependencies, and CLI commands are available for reuse in this git repository.
Requirements
The following are required to reproduce the Demo in your CDE Virtual Cluster:
CDE Service version 1.19 and above
A working installation of the CDE CLI. Instructions to install the CLI are provided here.
A working installation of git on your local machine. Please clone this git repository and keep in mind all commands assume they are run in the project's main directory.
Custom Docker Runtime as CDE Resource
By default, CDE workloads rely on CDE Resources to mount dependencies into a Job container. CDE Resources can be of three types: Files, Python Environment, and Custom Docker Runtime.
In our case we will create a Custom Docker Runtime to ensure Sedona and all required packages are correctly installed. Please open the Dockerfile located in the CDE_Runtime folder to explore more details.
The below steps have already been executed and the image is available for reuse in my public Docker registry. If you want, create your own image by replacing my Docker username and running the following commands from your terminal:
docker build --network=host -t pauldefusco/dex-spark-runtime-3.2.3-7.2.15.8:1.20.0-b15-sedona-geospatial-003 . -f Dockerfile
docker run -it pauldefusco/dex-spark-runtime-3.2.3-7.2.15.8:1.20.0-b15-sedona-geospatial-003 /bin/bash
docker push pauldefusco/dex-spark-runtime-3.2.3-7.2.15.8:1.20.0-b15-sedona-geospatial-003
Next, create CDE Resource Credentials and the CDE Custom Docker Runtime resource:
cde credential create --name docker-creds-pauldefusco --type docker-basic --docker-server hub.docker.com --docker-username pauldefusco
cde resource create --name dex-spark-runtime-sedona-geospatial-pauldefusco --image pauldefusco/dex-spark-runtime-3.2.3-7.2.15.8:1.20.0-b15-sedona-geospatial-003 --image-engine spark3 --type custom-runtime-image
CDE Files Resources for Datasets
Sedona is compatible with many geospatial file formats. In our example, we will load geospatial countries data to a CDE Files Resource so it can be read by the CDE Spark Job. The "cde resource upload-archive" command allows you to bulk upload and extract a zipped folder:
cde resource create --name countries_data
cde resource upload-archive --name countries_data --local-path data/ne_50m_admin_0_countries_lakes.zip
CDE Files Resources for Job Code
Next, create another CDE Files Resource to store your code and dependencies in your Virtual Cluster for reuse. Before running the below command, open properties.conf and replace your Cloud Default Storage location and CDP Workload Username. If you need help finding these, please contact your CDP Administrator.
cde resource create --name job_code
cde resource upload --name job_code --local-path code/geospatial_joins.py --local-path code/geospatial_rdd.py --local-path code/parameters.conf --local-path code/utils.py
Geospatial RDD Spark Job in CDE
The Sedona RDD allows you to store geospatial data with custom data types in a Spark RDD. You can then use these for performing geospatial queries. In this section, we will create a CDE Spark Job to manipulate Sedona RDDs.
First, create the CDE Spark Job with the following command. Notice that the CDE CLI allows you to specify multiple CDE Files Resources along with a prefix. The prefix can be used within the Python script to reference data or files in one or more CDE Files Resources. In this CLI command, we reference the job_code and countries_data Files Resources to respectively locate the script file and access the data in geospatial format. Also notice that the CDE Custom Docker Runtime is referenced with the "runtime-image-resource-name" parameter. Finally, at the time of this writing Sedona 1.5 is the latest version and, similarly to a spark-submit, we use the packages parameter to load the Maven packages into our CDE Spark Job.
cde job create --name geospatial_rdd --type spark --mount-1-prefix jobCode/ --mount-1-resource job_code --mount-2-prefix countriesData/ --mount-2-resource countries_data --runtime-image-resource-name dex-spark-runtime-sedona-geospatial-pauldefusco --packages org.apache.sedona:sedona-spark-shaded-3.0_2.12:1.5.0,org.datasyslab:geotools-wrapper:1.5.0-28.2 --application-file jobCode/geospatial_rdd.py
Next, open "geospatial_rdd.py" in your editor. Familiarize yourself with the code and notice the following:
The SparkSession is created and then passed to the SedonaContext object. From then on, all Spark SQL queries run in the SedonaContext.
spark = SparkSession \
    .builder \
    .appName("IOT DEVICES LOAD") \
    .config("spark.kubernetes.access.hadoopFileSystems", data_lake_name)\
    .getOrCreate()

config = SedonaContext.builder().getOrCreate()
sedona = SedonaContext.create(spark)
sc = sedona.sparkContext
sc.setSystemProperty("sedona.global.charset", "utf8")
Data is read from the CDE Files Resource using the CDE CLI alias.
A SpatialRDD is created with it:
countries_rdd = ShapefileReader.readToGeometryRDD(sc, "/app/mount/countriesData")
Sedona allows you to store data in Cloud Storage in GeoJSON format:
countries_rdd.saveAsGeoJSON(data_lake_name + geoparquetoutputlocation + "/countries.json")
A Spark DataFrame can be created from a SpatialRDD using the Sedona Adapter:
countries_df = Adapter.toDf(countries_rdd, sedona)
You can generate geospatial POINT data with your favorite util as shown in utils.py. The data can then be transformed into Sedona POINT data using Sedona SQL:
dg = DataGen(spark, username)
iot_points_df = dg.iot_points_gen(row_count = 100000, unique_vals=100000)
iot_points_df.createOrReplaceTempView("iot_geo_tmp")
iot_points_geo_df = sedona.sql("SELECT id, device_id, manufacturer, event_type, event_ts, \
    ST_Point(CAST(iot_geo_tmp.latitude as Decimal(24,20)), \
    CAST(iot_geo_tmp.longitude as Decimal(24,20))) as arealandmark \
    FROM iot_geo_tmp")
Sedona allows you to spatially partition and index your data:
iotSpatialRDD.spatialPartitioning(GridType.KDBTREE)
We can use the SpatialRDD to run geospatial queries on our data; for example, we calculate the distance between each POINT object and a predefined point, e.g. (21, 52):
iotSpatialRDD.rawSpatialRDD.map(lambda x: x.geom.distance(Point(21, 52))).take(5)
Finally, we run a KNN query on the same data to obtain the k POINT objects that lie closest to the provided POINT object:
result = KNNQuery.SpatialKnnQuery(iotSpatialRDD, Point(-84.01, 34.01), 5, False)
Run the Job with the CLI:
cde job run --name geospatial_rdd --executor-cores 2 --executor-memory "4g"
Geospatial Join Spark Job in CDE
Similarly to the above, create a "geospatial_joins" CDE Spark Job using the CLI:
cde job create --name geospatial_joins --application-file jobCode/geospatial_joins.py --type spark --mount-1-prefix jobCode/ --mount-1-resource job_code --mount-2-prefix countriesData/ --mount-2-resource countries_data --runtime-image-resource-name dex-spark-runtime-sedona-geospatial-pauldefusco --packages org.apache.sedona:sedona-spark-shaded-3.0_2.12:1.5.0,org.datasyslab:geotools-wrapper:1.5.0-28.2
Then open "geospatial_joins.py". Notice that the synthetic IOT data is joined with the countries data loaded from a CDE Files Resource. The data is loaded from Cloud Storage in GeoJSON format:
from sedona.core.formatMapper import GeoJsonReader
geo_json_file_location = data_lake_name + geoparquetoutputlocation + "/iot_spatial.json"
saved_rdd_iot = GeoJsonReader.readToGeometryRDD(sc, geo_json_file_location)
Using Sedona SQL, the data is joined with the "ST_Contains" spatial predicate. Sedona allows us to join the two tables by matching POINT objects from the IOT dataset located within the boundaries of the POLYGON objects used to define country shapes in the Countries dataset:
GEOSPATIAL_JOIN = """
    SELECT c.geometry as country_geom, c.NAME_EN, a.geometry as iot_device_location, a.device_id
    FROM COUNTRIES_{0} c, IOT_GEO_DEVICES_{0} a
    WHERE ST_Contains(c.geometry, a.geometry)
    """.format(username)
result = sedona.sql(GEOSPATIAL_JOIN)
Run the Job with the CLI:
cde job run --name geospatial_joins --executor-cores 2 --executor-memory "4g"
Summary
This article showcased a simple geospatial use case with Apache Sedona. You can easily run Sedona Spark jobs in Cloudera Data Engineering to run geospatial workloads at scale.
10-20-2023
07:01 PM
1 Kudo
What is Cloudera Data Engineering
CDP Data Engineering (CDE) is the only cloud-native service purpose-built for enterprise data engineering teams. Building on Apache Spark, Data Engineering is an all-inclusive data engineering toolset that enables orchestration automation with Apache Airflow, advanced pipeline monitoring, visual troubleshooting, and comprehensive management tools to streamline ETL processes across enterprise analytics teams.
Typically, CDE users leverage the CLI and API to accomplish tasks such as creating, running, and orchestrating Spark and Airflow Jobs, uploading files such as Python wheel files, Jars, etc. to the Virtual Cluster, and creating reusable Python environments for Spark and Airflow Jobs.
CDEPY is a package that allows you to do all of the above with the convenience of Python. With it, you can remotely connect to a Virtual Cluster from your local machine or a 3rd party tool, as long as it supports Python. It is available on PyPi at this URL and can be easily installed with a "pip install cdepy" command. At the time of this writing, the latest version is 0.1.5.
In the rest of this article, we will walk through some basic steps for getting started with the package.
Package Imports
from cdepy import cdeconnection
from cdepy import cdejob
from cdepy import cdemanager
from cdepy import cderesource
CDECONNECTION Module
The cdeconnection module allows you to establish a connection to a CDE Virtual Cluster. To instantiate a connection object, you will need a JOBS_API_URL (available in the CDE Virtual Cluster Service Details page) as well as your CDP Workload User and Password.
JOBS_API_URL = "https://<YOUR-CLUSTER>.cloudera.site/dex/api/v1"
WORKLOAD_USER = "<Your-CDP-Workload-User>"
WORKLOAD_PASSWORD = "<Your-CDP-Workload-Password>"

myCdeConnection = cdeconnection.CdeConnection(JOBS_API_URL, WORKLOAD_USER, WORKLOAD_PASSWORD)
Next, you will need to obtain a Token to connect to the cluster. This can be achieved with the following method. Once this is run, the Token is applied to the Connection object.
myCdeConnection.setToken()
CDERESOURCE Module
The cderesource module allows you to create CDE Resources. If you are new to CDE, a CDE Resource can be of type "Files", "Python", or "Custom Docker Runtime". Mirroring this, the cderesource module allows you to create two types of objects: CdeFilesResource and CdePythonResource (support for Custom Docker Resources will be available in a future release).
CDE_RESOURCE_NAME = "myFilesCdeResource"
myCdeFilesResource = cderesource.CdeFilesResource(CDE_RESOURCE_NAME)
myCdeFilesResourceDefinition = myCdeFilesResource.createResourceDefinition()
In both cases, the object is just a definition for the CDE Resource; the actual Resource will be created with the cdemanager module. Before we get there, though, we have to create a definition for a CDE Job.
CDEJOB Module
The cdejob module allows you to create Job Definitions of type Spark and Airflow. The definition includes important parameters such as Spark Resource configurations (e.g. driver/executor memory and driver/executor cores) and Airflow DAG configurations (e.g. DAG schedule interval).
Below, we create a definition for a CDE Job of type Spark. The job in the CDE Jobs UI will be named myCdeSparkJob and will use the local pysparksql.py script.
CDE_JOB_NAME = "myCdeSparkJob"
APPLICATION_FILE_NAME = "pysparksql.py"
myCdeSparkJob = cdejob.CdeSparkJob(myCdeConnection)
myCdeSparkJobDefinition = myCdeSparkJob.createJobDefinition(CDE_JOB_NAME, CDE_RESOURCE_NAME, APPLICATION_FILE_NAME)
Just like in the case of the Resource Definition object, the job has not yet been created in CDE. Next, we will use the cdemanager module to create the Resource and Job in CDE.
CDEMANAGER Module
First, the CdeClusterManager object is instantiated with the CdeConnection object:
myCdeClusterManager = cdemanager.CdeClusterManager(myCdeConnection)
Next, the Resource is created with the CdeFilesResourceDefinition object we created above:
myCdeClusterManager.createResource(myCdeFilesResourceDefinition)
Now that the Resource has actually been created in CDE, we can upload our PySpark script to it:
LOCAL_FILE_PATH = "examples"
LOCAL_FILE_NAME = "pysparksql.py"

myCdeClusterManager.uploadFile(CDE_RESOURCE_NAME, LOCAL_FILE_PATH, LOCAL_FILE_NAME)
Finally, we are ready to create the CDE Spark Job. The Spark Job will recognize the CDE Files Resource we just created because we set the name of the Resource in the CdeSparkJobDefinition object earlier.
myCdeClusterManager.createJob(myCdeSparkJobDefinition)
The CdeClusterManager object allows you to run a Job and monitor its results:
myCdeClusterManager.runJob(CDE_JOB_NAME)
myCdeClusterManager.listJobRuns()
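As for the script itself, pysparksql.py can be any PySpark application. The following is a minimal illustrative example, assumed for this walkthrough rather than taken from the package's examples folder:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("pysparksql").getOrCreate()

# Build a tiny DataFrame and run a Spark SQL query against it
df = spark.createDataFrame([("a", 1), ("b", 2), ("c", 3)], ["letter", "number"])
df.createOrReplaceTempView("letters")
spark.sql("SELECT letter, number * 10 AS number_x10 FROM letters").show()

spark.stop()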
SUMMARY
In the above workflow, we used CDEPY and Python to create a CDE Files Resource and Spark Job, run it, and finally monitor its results. Generally, if you use Spark and/or Airflow in Cloudera Data Engineering, you can leverage its API with Python to create your own framework. This allows you to integrate your existing Spark and Airflow applications with CDE and/or build Cluster and Job observability applications based on your business requirements.