Member since
11-22-2019
29
Posts
10
Kudos Received
0
Solutions
03-27-2024
05:39 PM
Logging Iceberg Metrics with MLFlow Tracking in CML Cloudera Machine Learning (CML) on Cloudera Data Platform (CDP) accelerates time-to-value by enabling data scientists to collaborate in a single unified platform that is all inclusive for powering any AI use case. Purpose-built for agile experimentation and production ML workflows, Cloudera Machine Learning manages everything from data preparation to MLOps, to predictive reporting. CML is compatible with the MLflow Tracking API and makes use of the MLflow client library as the default method to log experiments. Existing projects with existing experiments are still available and usable. CML’s experiment tracking features allow you to use the MLflow client library for logging parameters, code versions, metrics, and output files when running your machine learning code. Apache Iceberg is a table format for huge analytics datasets in the cloud that defines how metadata is stored and data files are organized. Iceberg is also a library that compute engines can use to read/write a table. CML offers Data Connections to connect to Data Sources available within the CDP Environment including Iceberg Open Lakehouses. In this example we will create an experiment with MLFlow Tracking and log Iceberg metadata in order to enhance machine learning reproducibility in the context of MLOps. Step by Step Guide The code samples provided below are extracts from the accompanying notebook. The full code can be found in this git repository. Setup Create a CML Project with Python 3.9 / JupyterLab Editor Runtime. Launch a CML Session and install requirements. Run Notebook Run each cell in the notebook. Code highlights: MLFlow Tracking supports modules built specifically for some of the most popular open source frameworks. In this case we will import "mlflow.spark" You can leverage CML Spark Data Connections to launch a SparkSession object with the recommended Iceberg Spark configurations. Spark Data Connections make connecting to your Iceberg data effortless. import mlflow.spark
import cml.data_v1 as cmldata
#Edit your connection name here:
CONNECTION_NAME = "se-aw-mdl"
conn = cmldata.get_connection(CONNECTION_NAME)
spark = conn.get_spark_session() The exp1 method acts as a wrapper to your first MLFlow experiment. The experiment name is set with the mlflow.set_exeperiment method. Data is written from a PySpark dataframe to an Iceberg table via a simple routine: "df.writeTo().createOrReplace()" Iceberg History and Snapshots tables are available for you to monitor Iceberg metadata. In this example we save the latest snapshot ID along with its timestamp and parent snapshot ID into Python variables. Within the context of this experiment run, a Spark ML Pipeline is trained to tokenize and classify text. MLFlow Tracking allows you to set custom tags. These tags can be used to search your experiments using the MLFlow client. MLFlow Tracking allows you to create a run context to track metrics according to a specific run. In this particular case we use log_metric method to track the Iceberg variables corresponding to snaphot and write operation timestamp. Once the experiment completes you can retrieve its ID and more metadata using the MLFlow client. def exp1(df):
mlflow.set_experiment("sparkml-experiment")
##EXPERIMENT 1
df.writeTo("spark_catalog.default.training").using("iceberg").createOrReplace()
spark.sql("SELECT * FROM spark_catalog.default.training").show()
### SHOW TABLE HISTORY AND SNAPSHOTS
spark.read.format("iceberg").load("spark_catalog.default.training.history").show(20, False)
spark.read.format("iceberg").load("spark_catalog.default.training.snapshots").show(20, False)
snapshot_id = spark.read.format("iceberg").load("spark_catalog.default.training.snapshots").select("snapshot_id").tail(1)[0][0]
committed_at = spark.read.format("iceberg").load("spark_catalog.default.training.snapshots").select("committed_at").tail(1)[0][0].strftime('%m/%d/%Y')
parent_id = spark.read.format("iceberg").load("spark_catalog.default.training.snapshots").select("parent_id").tail(1)[0][0]
tags = {
"iceberg_snapshot_id": snapshot_id,
"iceberg_snapshot_committed_at": committed_at,
"iceberg_parent_id": parent_id,
"row_count": training_df.count()
}
### MLFLOW EXPERIMENT RUN
with mlflow.start_run() as run:
maxIter=8
regParam=0.01
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=maxIter, regParam=regParam)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
model = pipeline.fit(training_df)
mlflow.log_param("maxIter", maxIter)
mlflow.log_param("regParam", regParam)
#prediction = model.transform(test)
mlflow.set_tags(tags)
mlflow.end_run()
experiment_id = mlflow.get_experiment_by_name("sparkml-experiment").experiment_id
runs_df = mlflow.search_runs(experiment_id, run_view_type=1)
return runs_df The second experiment is very similar to the first, except data is appended to the Iceberg table via "df.writeTo().append()" As data is inserted into the table, new Iceberg metadata is generated in the Iceberg Metadata Layer and becomes available in the Snapshots and History tables. This metadata is tracked into new Python variables. In this particular example we again use the "log_metric" method to track the Iceberg Snapshot ID and Timestamp for this append operation. Within the context of this experiment run, the Spark ML Pipeline is retrained for the same purpose of tokenizing and classifying text, but using the new version of the data after the append operation. def exp2(df):
mlflow.set_experiment("sparkml-experiment")
##EXPERIMENT 2
### ICEBERG INSERT DATA - APPEND FROM DATAFRAME
# PRE-INSERT
spark.sql("SELECT * FROM spark_catalog.default.training").show()
temp_df = spark.sql("SELECT * FROM spark_catalog.default.training")
temp_df.writeTo("spark_catalog.default.training").append()
df = spark.sql("SELECT * FROM spark_catalog.default.training")
# PROST-INSERT
spark.sql("SELECT * FROM spark_catalog.default.training").show()
spark.read.format("iceberg").load("spark_catalog.default.training.history").show(20, False)
spark.read.format("iceberg").load("spark_catalog.default.training.snapshots").show(20, False)
snapshot_id = spark.read.format("iceberg").load("spark_catalog.default.training.snapshots").select("snapshot_id").tail(1)[0][0]
committed_at = spark.read.format("iceberg").load("spark_catalog.default.training.snapshots").select("committed_at").tail(1)[0][0].strftime('%m/%d/%Y')
parent_id = spark.read.format("iceberg").load("spark_catalog.default.training.snapshots").select("parent_id").tail(1)[0][0]
tags = {
"iceberg_snapshot_id": snapshot_id,
"iceberg_snapshot_committed_at": committed_at,
"iceberg_parent_id": parent_id,
"row_count": df.count()
}
### MLFLOW EXPERIMENT RUN
with mlflow.start_run() as run:
maxIter=10
regParam=0.002
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=maxIter, regParam=regParam)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
model = pipeline.fit(training_df)
mlflow.log_param("maxIter", maxIter)
mlflow.log_param("regParam", regParam)
#prediction = model.transform(test)
mlflow.set_tags(tags)
mlflow.end_run()
experiment_id = mlflow.get_experiment_by_name("sparkml-experiment").experiment_id
runs_df = mlflow.search_runs(experiment_id, run_view_type=1)
return runs_df Finally, in the third experiment we retrain the Spark ML Pipeline but first we retrieve the data as it was prior to the append operation by applying the provided Iceberg Snapshot ID in the "spark.read.table" method. def exp3(df, snapshot_id):
##EXPERIMENT 3
df = spark.read.option("snapshot-id", snapshot_id).table("spark_catalog.default.training")
committed_at = spark.sql("SELECT committed_at FROM spark_catalog.default.training.snapshots WHERE snapshot_id = {};".format(snapshot_id)).collect()[0][0].strftime('%m/%d/%Y')
parent_id = str(spark.sql("SELECT parent_id FROM spark_catalog.default.training.snapshots WHERE snapshot_id = {};".format(snapshot_id)).tail(1)[0][0])
tags = {
"iceberg_snapshot_id": snapshot_id,
"iceberg_snapshot_committed_at": committed_at,
"iceberg_parent_id": parent_id,
"row_count": training_df.count()
}
### MLFLOW EXPERIMENT RUN
with mlflow.start_run() as run:
maxIter=7
regParam=0.005
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=maxIter, regParam=regParam)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
model = pipeline.fit(training_df)
mlflow.log_param("maxIter", maxIter)
mlflow.log_param("regParam", regParam)
#prediction = model.transform(test)
mlflow.set_tags(tags)
mlflow.end_run()
experiment_id = mlflow.get_experiment_by_name("sparkml-experiment").experiment_id
runs_df = mlflow.search_runs(experiment_id, run_view_type=1)
#spark.stop()
return runs_df Summary and Next Steps Large ML organizations require standardized best practices such as tracking models and respective dependencies, model developers, and matching those with datasets in order to keep a consistent view of all MLOps practices. MLFlow Tracking in CML allows you to achieve this goal by allowing you to specify datasets and other custom metadata when tracking experiment runs. In the above example we tracked Iceberg metadata in order to allow data scientists to retrain an existing pipeline with datasets as of arbitrary points in time. In the process, we used tags in order to implement a consistent taxonomy across all experiment runs. CML Model Deployment with MLFlow and APIv2 Spark in CML: Recommendatons for using Spark Experiments with MLFlow Registering and Deploying Models with Model Registry Apache Iceberg Documentation Iceberg Time Travel Introducing MLOps and SDX for Models in CML
... View more
03-27-2024
04:42 PM
CML Model Deployment with MLFlow and APIv2 Cloudera Machine Learning (CML) on Cloudera Data Platform (CDP) accelerates time-to-value by enabling data scientists to collaborate in a single unified platform that is all inclusive for powering any AI use case. Purpose-built for agile experimentation and production ML workflows, Cloudera Machine Learning manages everything from data preparation to MLOps, to predictive reporting. CML exposes a REST API that you can use to perform operations related to projects, jobs, and runs. You can use API commands to integrate CML with third-party workflow tools or to control CML from the command line. In this example we will showcase how to use APIv2 to programmatically register an XGBoost experiment via MLFlow Tracking, Registry, and deploy it as a model endpoint in CML. MLOps in CML CML has extensive MLOps features and a set of model and lifecycle management capabilities to enable the repeatable, transparent, and governed approaches necessary for scaling model deployments and ML use cases. It´s built to support open source standards and is fully integrated with CDP, enabling customers to integrate into existing and future tooling while not being locked into a single vendor. CML enables enterprises to proactively monitor technical metrics such as service level agreements (SLA) adherence, uptime, and resource use as well as prediction metrics including model distribution, drift, and skew from a single governance interface. Users can set custom alerts and eliminate the model “black box” effect with native tools for visualizing model lineage, performance, and trends. Some of the benefits with CML include: Model cataloging and lineage capabilities to allow visibility into the entire ML lifecycle, which eliminates silos and blind spots for full lifecycle transparency, explainability, and accountability. Full end-to-end machine learning lifecycle management that includes everything required to securely deploy machine learning models to production, ensure accuracy, and scale use cases. An extensive model monitoring service designed to track and monitor both technical aspects and accuracy of predictions in a repeatable, secure, and scalable way. New MLOps features for monitoring the functional and business performance of machine learning models such as detecting model performance and drift over time with native storage and access to custom and arbitrary model metrics; measuring and tracking individual prediction accuracy, ensuring models are compliant and performing optimally. The ability to track, manage, and understand large numbers of ML models deployed across the enterprise with model cataloging, full lifecycle lineage, and custom metadata in Apache Atlas. The ability to view the lineage of data tied to the models built and deployed in a single system to help manage and govern the ML lifecycle. Increased model security for Model REST endpoints, which allows models to be served in a CML production environment without compromising security. Use Case In this example we will create a basic MLOps pipeline to put a credit card fraud classifier into production. We will create a model prototype with XGBoost, register and manage experiments with MLFlow Tracking, and stage the best experiment run in the MLFlow Registry. Next, we will deploy the model from the Registry into an API Endpoint, and finally redeploy it with additional resources for High Availability and increased serving performance. The full code is available in this git repository. Step by Step Guide Setup Create a CML Project with Python 3.9 / Workbench Editor Runtime. Launch a CML Session and install requirements. Open script "00_datagen.py" and update lines 140 and 141 with your Iceberg database name and Spark Data Connection Name. Then run it. Script 1: Create the Model Experiment Run script "01_train_xgboost.py" in order to create an MLFlow Experiment. Code highlights: MLFlow is installed in CML by default. You must import mlflow in order to use it in your script. The experiment run is determined by the "mlflow tracking run context". When this executes for the first time an experiment is created. If the same code runs again without changing EXPERIMENT_NAME, a new Experiment Run is logged for the same experiment. Else, a new experiment is created. You can log one or multiple metrics for the specific run with the "mlflow.log_param()" method. Model artifacts such as useful metadata and dependencies are logged with the "mlflow.log_model()" method. import mlflow
EXPERIMENT_NAME = "xgb-cc-fraud-{0}-{1}".format(USERNAME, DATE)
mlflow.set_experiment(EXPERIMENT_NAME)
with mlflow.start_run():
model = XGBClassifier(use_label_encoder=False, eval_metric="logloss")
model.fit(X_train, y_train, eval_set=[(X_test, y_test)], verbose=False)
y_pred = model.predict(X_test)
accuracy = accuracy_score(y_test, y_pred)
mlflow.log_param("accuracy", accuracy)
mlflow.xgboost.log_model(model, artifact_path="artifacts") You can use the mlflow library or instantiate an mlflow client to manage experiments. In this example we use the "mlflow.get_experiment_by_name()", "mlflow.search_runs()" and "mlflow.get_run()" methods. In this example we also instantiate the client to list artifacts for a specific run. def getLatestExperimentInfo(experimentName):
"""
Method to capture the latest Experiment Id and Run ID for the provided experimentName
"""
experimentId = mlflow.get_experiment_by_name(experimentName).experiment_id
runsDf = mlflow.search_runs(experimentId, run_view_type=1)
experimentId = runsDf.iloc[-1]['experiment_id']
experimentRunId = runsDf.iloc[-1]['run_id']
return experimentId, experimentRunId
experimentId, experimentRunId = getLatestExperimentInfo(EXPERIMENT_NAME)
#Replace Experiment Run ID here:
run = mlflow.get_run(experimentRunId)
pd.DataFrame(data=[run.data.params], index=["Value"]).T
pd.DataFrame(data=[run.data.metrics], index=["Value"]).T
client = mlflow.tracking.MlflowClient()
client.list_artifacts(run_id=run.info.run_id) Script 2: Register the Model Experiment Run script "02_cml_api_endpoint.py" in order to register an MLFlow Experiment. Code highlights: CML APIv2 is installed in your workspace by default. You must import cmlapi in order to use it in your script. The API provides about 100 python methods for MLOps. In this example, we created a "registerModelFromExperimentRun" method as a wrapper to the API's create_registered_model() method. In this example, we created a ModelRegistration class including the "registerModelFromExperimentRun" method to register the model. Creating your own Python classes and methods to implement the API methods in the context of your project is highly recommended. import cmlapi
from cmlapi.rest import ApiException
class ModelRegistration():
"""
Class to manage the model deployment of the xgboost model
"""
def __init__(self, username, experimentName):
self.client = cmlapi.default_client()
self.username = username
self.experimentName = experimentName
def registerModelFromExperimentRun(self, modelName, experimentId, experimentRunId, modelPath):
"""
Method to register a model from an Experiment Run
Input: requires an experiment run
Output: api_response object
"""
model_name = 'xgb-cc-' + username
CreateRegisteredModelRequest = {
"project_id": os.environ['CDSW_PROJECT_ID'],
"experiment_id" : experimentId,
"run_id": experimentRunId,
"model_name": modelName,
"model_path": modelPath
}
try:
# Register a model.
api_response = self.client.create_registered_model(CreateRegisteredModelRequest)
#pprint(api_response)
except ApiException as e:
print("Exception when calling CMLServiceApi->create_registered_model: %s\n" % e)
return api_response The response from the method request contains very useful information. In this example, the registeredModelResponse response includes modelId and modelVersionId variables which are in turn used by other API methods. modelReg = ModelRegistration(username, experimentName)
modelPath = "artifacts"
modelName = "FraudCLF-" + username
registeredModelResponse = modelReg.registerModelFromExperimentRun(modelName, experimentId, experimentRunId, modelPath)
modelId = registeredModelResponse.model_id
modelVersionId = registeredModelResponse.model_versions[0].model_version_id
registeredModelResponse.model_versions[0].model_version_id Script 3: Deploy Endpoint from Registry Model Run script "03_api_deployment.py" in order to create a Model Endpoint from the registered model. Code highlights: In this example we created a ModelDeployment class to manage multiple API wrapper methods. The "listRegisteredModels()" method is a wrapper to the API's "list_registered_models()" method. Notice it is arbitrarily preconfigured to list models corresponding to your user and model name. In the context of a broader MLOps pipeline, these values can obviously be parameterized. This method is necessary for obtaining the "registeredModelId" variable needed for model deployment. The "getRegisteredModel()" method is a wrapper to the API's "get_registered_model()" method. This method is necessary for obtaining the "modelVersionId" variable needed for model deployment. Once registeredModelId and modelVersionId are obtained, you can begin the deployment. The deployment consists of three phases: model creation, model build, and model deployment. The model creation corresponds to the creation of an API Endpoint. Once you run this, you will see a new entry in the Model Deployments tab. The model build corresponds to the creation of the model's container. Thanks to MLFlow Registry, CML automatically packages all dependencies used to train the Experiment into the model endpoint for you. The model deployment corresponds to the activation of the model endpoint. This is when the container with its associated resource profile and endpoint is actually deployed so inference can start. The "listRuntimes()" method is an example of querying the Workspace for all available runtimes in order to select the most appropriate for model build. class ModelDeployment():
"""
Class to manage the model deployment of the xgboost model
"""
def __init__(self, projectId, username):
self.client = cmlapi.default_client()
self.projectId = projectId
self.username = username
def listRegisteredModels(self):
"""
Method to retrieve registered models by the user
"""
#str | Search filter is an optional HTTP parameter to filter results by. Supported search_filter = {\"model_name\": \"model_name\"} search_filter = {\"creator_id\": \"<sso name or user name>\"}. (optional)
search_filter = {"creator_id" : self.username, "model_name": "FraudCLF-"+self.username}
search = json.dumps(search_filter)
page_size = 1000
try:
# List registered models.
api_response = self.client.list_registered_models(search_filter=search, page_size=page_size)
#pprint(api_response)
except ApiException as e:
print("Exception when calling CMLServiceApi->list_registered_models: %s\n" % e)
return api_response
def getRegisteredModel(self, modelId):
"""
Method to return registered model metadata including model version id
"""
search_filter = {"creator_id" : self.username}
search = json.dumps(search_filter)
try:
# Get a registered model.
api_response = self.client.get_registered_model(modelId, search_filter=search, page_size=1000)
#pprint(api_response)
except ApiException as e:
print("Exception when calling CMLServiceApi->get_registered_model: %s\n" % e)
return api_response
def createModel(self, projectId, modelName, registeredModelId, description = "Fraud Detection 2024"):
"""
Method to create a model
"""
CreateModelRequest = {
"project_id": projectId,
"name" : modelName,
"description": description,
"registered_model_id": registeredModelId,
"disable_authentication": True
}
try:
# Create a model.
api_response = self.client.create_model(CreateModelRequest, projectId)
pprint(api_response)
except ApiException as e:
print("Exception when calling CMLServiceApi->create_model: %s\n" % e)
return api_response
def createModelBuild(self, projectId, modelVersionId, modelCreationId, runtimeId):
"""
Method to create a Model build
"""
# Create Model Build
CreateModelBuildRequest = {
"registered_model_version_id": modelVersionId,
"runtime_identifier": runtimeId,
"comment": "invoking model build",
"model_id": modelCreationId
}
try:
# Create a model build.
api_response = self.client.create_model_build(CreateModelBuildRequest, projectId, modelCreationId)
#pprint(api_response)
except ApiException as e:
print("Exception when calling CMLServiceApi->create_model_build: %s\n" % e)
return api_response
def createModelDeployment(self, modelBuildId, projectId, modelCreationId):
"""
Method to deploy a model build
"""
CreateModelDeploymentRequest = {
"cpu" : "2",
"memory" : "4"
}
try:
# Create a model deployment.
api_response = self.client.create_model_deployment(CreateModelDeploymentRequest, projectId, modelCreationId, modelBuildId)
pprint(api_response)
except ApiException as e:
print("Exception when calling CMLServiceApi->create_model_deployment: %s\n" % e)
return api_response
def listRuntimes(self):
"""
Method to list available runtimes
"""
search_filter = {"kernel": "Python 3.9", "edition": "Standard", "editor": "Workbench"}
# str | Search filter is an optional HTTP parameter to filter results by.
# Supported search filter keys are: [\"image_identifier\", \"editor\", \"kernel\", \"edition\", \"description\", \"full_version\"].
# For example: search_filter = {\"kernel\":\"Python 3.7\",\"editor\":\"JupyterLab\"},. (optional)
search = json.dumps(search_filter)
try:
# List the available runtimes, optionally filtered, sorted, and paginated.
api_response = self.client.list_runtimes(search_filter=search, page_size=1000)
#pprint(api_response)
except ApiException as e:
print("Exception when calling CMLServiceApi->list_runtimes: %s\n" % e)
return api_response Once you have created your model endpoint, give it a minute and then try a test request: {"dataframe_split":
{"columns": ["age", "credit_card_balance", "bank_account_balance", "mortgage_balance", "sec_bank_account_balance", "savings_account_balance", "sec_savings_account_balance", "total_est_nworth", "primary_loan_balance", "secondary_loan_balance", "uni_loan_balance", "longitude", "latitude", "transaction_amount"],
"data":[[35.5, 20000.5, 3900.5, 14000.5, 2944.5, 3400.5, 12000.5, 29000.5, 1300.5, 15000.5, 10000.5, 2000.5, 90.5, 120.5]]}} Script 4: Endpoint Redeployment Run script "04_api_redeployment.py" in order to create a new model deployment with increased resources. Code highlights: A slightly different version of the ModelDeployment class is implemented. This includes the "get_latest_deployment_details()" as an example of creating a wrapper method to the API's "list_models()" and "list_model_deployments()" methods all in one. You can implement your own methods in a similar fashion as best needed in the context of your MLOps pipeline. Once the latest model deployment's metadata has been obtained in one go, a new model build is created with additional CPU, Memory and Replicas. Notice that in the process you also have the ability to switch to a different runtime as needed. deployment = ModelDeployment(projectId, username)
getLatestDeploymentResponse = deployment.get_latest_deployment_details(modelName)
listRuntimesResponse = deployment.listRuntimes()
listRuntimesResponse
runtimeId = 'docker.repository.cloudera.com/cloudera/cdsw/ml-runtime-workbench-python3.9-standard:2024.02.1-b4' # Copy a runtime ID from previous output
cpu = 2
mem = 4
replicas = 2
createModelBuildResponse = deployment.createModelBuild(projectId, modelVersionId, modelCreationId, runtimeId, cpu, mem, replicas)
modelBuildId = createModelBuildResponse.id
deployment.createModelDeployment(modelBuildId, projectId, modelCreationId) Summary and Next Steps Cloudera Machine Learning exposes a REST API that you can use to perform operations related to projects, jobs, and runs. You can use API commands to integrate CML with third-party workflow tools or to control CML from the command line. CML's API accelerates your data science projects by allowing you to build end to end pipelines programmatically. When coupled with CML MLFlow Tracking and MLFlow Registry, it can be used to manage models from inception to production. APIv2 Documentation APIv2 Examples APIv2 AMP Registering and Deploying Models with Model Registry Securing Models CML Projects CML Runtimes Introducing MLOps and SDX for Models in CML Model Registry GA in CML
... View more
Labels:
02-28-2024
01:56 PM
1 Kudo
Objective CDE is the Cloudera Data Engineering Service, a containerized managed service for Cloudera Data Platform designed for Large Scale Batch and Streaming Pipelines with Spark, Airflow and Iceberg. It allows you to submit batch jobs to auto-scaling virtual clusters. As a Cloud-Native service, CDE enables you to spend more time on your applications, and less time on infrastructure. CDE allows you to create, manage, and schedule Apache Spark jobs without the overhead of creating and maintaining Spark clusters. With CDE, you define virtual clusters with a range of CPU and memory resources, and the cluster scales up and down as needed to run your Spark workloads, helping to control your cloud costs. CDE Sessions are short-lived development environment used by Data Engineers and Analysts in order to iterate upon and build Spark workloads, prototype pipelines, or simply explore data. You can use CDE Sessions in CDE Virtual Clusters of type "All Purpose - Tier 2". In this example we will use a CDE Session to perform basic Cloud Storage File System operations with the Hadoop API. The API was originally used to perform file system operations in HDFS but is still used to interact with S3 and ADLS in the Public Cloud. Requirements The only requirement for this exercise is a CDE Virtual Cluster in a CDE Service on version 1.19+. Version 1.20+ is recommended. Copying the following commands in a CDE Session is recommended, although you can also run the same code in a CDE Job of type Spark. Step by Step Instructions Preload some data in a cloud storage location of your choice. Then, specify the path as a Python variable. In this example we use two locations to read and write data with AWS S3. >>> readlocation = "s3a://go01-demo/mycsvfile.csv"
>>> writelocation = "s3a://go01-demo/newdir/myfile" Read the file in a Spark Dataframe: >>> df=spark.read.csv(readlocation) Obtain the configured FileSystem implementation: >>> myPath = "s3a://go01-demo/newdir"
>>> sc=df.rdd.context
>>> hadoop = sc._jvm.org.apache.hadoop
>>> hdfs = hadoop.fs.FileSystem.get(sc._jvm.java.net.URI.create(myPath), sc._jsc.hadoopConfiguration()) Write the dataframe as a file in Cloud Storage: >>> df.write.mode("overwrite").parquet(writelocation) List the contents of the directory: >>> status = hdfs.listStatus(hadoop.fs.Path("s3a://go01-demo/newdir"))
>>> for fileStatus in status:
print(fileStatus.getPath())
s3a://go01-demo/newdir/myfile List the contents of a directory with filter pattern: >>> status = hdfs.globStatus(hadoop.fs.Path("s3a://go01-demo/*.csv"))
>>> for fileStatus in status:
print(fileStatus.getPath())
s3a://go01-demo/test_file.csv Rename file: >>> hdfs.rename(hadoop.fs.Path("s3a://go01-demo/test_file.csv"), hadoop.fs.Path("s3a://go01-demo/test_file_2.csv"))
True
>>> status = hdfs.globStatus(hadoop.fs.Path("s3a://go01-demo/*.csv"))
>>> for fileStatus in status:
print(fileStatus.getPath())
s3a://go01-demo/test_file_2.csv Delete file: >>> hdfs.delete(hadoop.fs.Path("s3a://go01-demo/test_file_2.csv"), True)
True
>>> status = hdfs.globStatus(hadoop.fs.Path("s3a://go01-demo/*.csv"))
>>> for fileStatus in status:
print(fileStatus.getPath()) Create a subdirectory: >>> hdfs.mkdirs(hadoop.fs.Path("s3a://go01-demo/newdir/newsubdir"))
True References The full API Method list is located at this link. Summary and Next Steps Cloudera Data Engineering (CDE) provides a command line interface (CLI) client. You can use the CLI to create and update jobs, view job details, manage job resources, run jobs, and so on. In this article we have reviewed some advanced use cases for the CLI. If you are using the CDE CLI you might also find the following articles and demos interesting: Installing the CDE CLI Simple Introduction to the CDE CLI CDE CLI Demo CDE Concepts CDE CLI Command Reference CDE CLI Spark Flag Reference CDE CLI Airflow Flag Reference CDE CLI list command syntax reference CDE Jobs API Reference
... View more
12-13-2023
06:02 PM
CDE is the Cloudera Data Engineering Service, a containerized managed service for the Cloudera Data Platform designed for Large Scale Batch Pipelines with Spark, Airflow, and Iceberg. It allows you to submit batch jobs to auto-scaling virtual clusters. As a Cloud-Native service, CDE enables you to spend more time on your applications and less time on infrastructure.
CDE allows you to create, manage, and schedule Apache Spark jobs without the overhead of creating and maintaining Spark clusters. With CDE, you define virtual clusters with a range of CPU and memory resources, and the cluster scales up and down as needed to run your Spark workloads, helping to control your cloud costs.
Cloudera Data Engineering (CDE) provides a command-line interface (CLI) client. The CDE CLI doesn't just allow you to create CDE Spark Jobs with syntax that is nearly identical to that of a Spark Submit.
The CDE CLI also offers you invaluable advantages as part of your daily Spark Pipeline development and management activities, including enhancing observability, reusability, and overall organization of Spark dependencies and configurations.
Simply put, if you're creating and managing many Spark Submits, the CDE CLI will save you time and dramatically increase your productivity.
In this article, you will learn about different options for parameterizing a CDE Spark Job i.e. to pass Spark Configurations, Options, Files and Arguments.
Requirements
The following are required in order to reproduce these commands in your CDE environment:
A CDE Service on version 1.19.0 or above.
A working installation of the CDE CLI. Please follow these instructions to install the CLI.
CDE Spark Jobs and Spark Configurations
The CDE Spark Job is a reusable definition of a Spark Application consisting of its code, and file, Docker, and Python dependencies, and Spark configurations and options.
Once a CDE Spark Job is created, its definition is accessible in the CDE UI or via the CDE CLI.
cde job create --name myJob\
--type Spark\
--application-file myScript.py\
--mount-1-resource myFilesResource
More importantly, it is not yet run. The Application is actually executed with a separate command.
cde job run --name myJob
Generally, parameterizing a CDE Spark Jobs allows you to dynamically set important configurations at runtime i.e. override default parameters based on some changing external input.
Before moving on, let us review the different types of parameters that can be set in a CDE Spark Job:
1. Spark Application File
This is the PySpark script or Jar containing the Application code. In CDE you must use the --application-file flag and you can only set this at CDE Spark Job creation time.
Examples:
cde job create --name myPySparkJob\
--type spark\
--application-file myScript.py\
--mount-1-resource myFilesResource cde job create --name myScalaJob
--type spark
--application-file myJar.jar
--mount-1-resource myFilesResource
The files can only be found by CDE if they are present in a Files resource. As shown above, the Files resource is then set with the --mount-N-resource flag. More on that in the end to end example section.
2. Spark Resources
These include basic job resources such as executor and driver memory and cores, initial number of executors, min and max executors for Spark Dynamic Allocation.
They can be set both at CDE Spark Job creation as well as runtime. Each of these has a default value which is chosen unless specified by your CLI command.
Examples:
cde job create --name myPySparkJob\
--type spark\
--application-file myScript.py\
--mount-1-resource myFilesResource\
--executor-cores 4\
--executor-memory "2g" cde job run --name myPySparkJob
--executor-cores 4
--driver-cores 2
The above job will run with the following resource configurations:
--executor-cores 4 --executor-memory "2g" and --driver-cores 2
3. Spark Configurations
Spark offers a wide range of configurations ranging from Python version to memory options, join and dynamic allocation thresholds, and much more.
These can be set via
--conf flag e.g. --conf spark.executor.memoryOverhead=6g or --conf spark.pyspark.python=python3 or --conf spark.yarn.maxAppAttempts=1.
These can also be ovverriden at runtime.
Examples:
cde job create --name myPySparkJob\
--type spark\
--application-file myScript.py\
--mount-1-resource myFilesResource\
--conf spark.executor.memoryOverhead=6g cde job run --name myPySparkJob\
--conf spark.executor.memoryOverhead=2g
In the above example, the "memoryOverhead" setting is overriden to "2g".
4. Command Line Arguments
These are specific variable inputs that lend themselves particularly well to being overridden at runtime.
They are defined with the --arg flag in the CDE Spark Job definition, and require being read in Spark Application code via the Python sys.argv module.
For example, a CDE Spark Job will include the --arg 10 argument from the CLI in order for the value of "10" to be utilized as part of the Spark Application code.
Example CDE CLI command to create the CDE Spark Job:
cde job create --name myPySparkJob\
--type spark\
--application-file myScript.py\
--mount-1-resource myFilesResource\
--conf spark.executor.memoryOverhead=6g\
--arg 10
Example CDE Spark Job application code rerefencing the argument:
import sys
from pyspark.sql import SparkSession
spark = SparkSession\
.builder\
.appName("CLIArguments")\
.getOrCreate()
print("JOB ARGUMENT")
print(sys.argv[1])
You can override CLI arguments at runtime. The following example sets the input to 5 rather than 10:
cde job run --name myPySparkJob --arg 5
5. Files
These can be set via the --file or --py-files options and allow you to specify file dependencies at CDE Spark Job creation.
More importantly, the funtionality provided by these flags is enhanced by CDE Files Resources and we using them instead is generally recommended.
CDE Files Resources are unique to CDE and allow you to pass and manage entire files as CDE Spark Job dependencies.
You can reference these files within Spark Application code via the /app/mount prefix.
Example:
cde job create --name penv-udf-test --type spark \
--mount-1-prefix appFiles/ \
--mount-1-resource cde-cli-files-resource \
--python-env-resource-name cde-cli-penv-resource \
--application-file /app/mount/appFiles/deps_udf_dependency.py \
--conf spark.executorEnv.PYTHONPATH=/app/mount/appFiles/deps_udf.zip \
--py-file /app/mount/appFiles/deps_udf.zip
Here, the "--py-file" flag is used to set in order to distribute dependencies that have been compressed and then loaded in a CDE Files Resource.
Notice that unlike in a Spark Submit, you cannot use --file to specify the application code and you must use the --application-file flag as shown in example 1.
CDE Files Resources are mounted on the Spark Application pod at CDE Job runtime. Thanks to this, you can also reference a file from within your Spark Application code by using the ConfigParser module and referencing the "/app/mount" directory.
Example properties file "parameters.conf" that has been loaded in the CDE Files Resource referenced by the CDE Spark Job:
[general]
property_1: s3a://go01-demo/
property_2: datalake/
Example PySpark application code referencing the "parameters.conf" file:
from pyspark.sql import SparkSession
import configparser
config = configparser.ConfigParser()
config.read('/app/mount/parameters.conf')
data_lake_name=config.get("general","property_1")
file_path=config.get("general","property_2")
spark = SparkSession.\
builder.\
appName('INGEST').\
config("spark.kubernetes.access.hadoopFileSystems", data_lake_name).getOrCreate()
cloudPath=data_lake_name+file_path
myDf = spark.read.csv(cloudPath + "/myCsv.csv", header=True, inferSchema=True)
myDf.show()
End to End CDE Spark Job Workflow Example
Now that you have gained exposure to the different options we will utilize these in the context of a simplified CDE Spark Job creation workflow that will show how to actually implement the above:
Create a CDE Files Resource in order to host all Application code and file dependencies.
Upload Application code and file dependencies to the CDE Files Resource in order to make those accessible to CDE Spark Jobs in the future.
Create the CDE Spark Job by referencing Application code, file dependencies and other Spark configurations in order to then:
Run the CDE Spark Job with either no additional configurations or by overriding configurations in order to execute your Spark Application.
1. Create a CDE Files Resource
cde resource create --name myProperties
2. Upload Application Code and File Dependencies
cde resource upload --name myProperties\
--local-path cde_jobs/propertiesFile_1.conf\
--local-path cde_jobs/propertiesFile_2.conf\
--local-path cde_jobs/sparkJob.py
3. Create CDE Spark Job
cde job create --name myPySparkJob\
--type spark\
--application-file sparkJob.py\
--mount-1-resource myProperties\
--executor-cores 2\
--executor-memory "2g"
4. Run the CDE Spark Job with Different Options
Example 1: Run the job with two CLI arguments and read properties file 1
cde job run --name myPySparkJob\
--arg MY_DB\
--arg CUSTOMER_TABLE\
--arg propertiesFile_1.conf
Example 2: Run the job with two CLI arguments and read properties file 2
cde job run --name myPySparkJob\
--arg MY_DB\
--arg SALES_TABLE\
--arg propertiesFile_2.conf
Application code in sparkJob.py:
import sys
from pyspark.sql import SparkSession
import configparser
spark = SparkSession\
.builder\
.appName("PythonSQL")\
.getOrCreate()
print("JOB ARGUMENTS")
print(sys.argv)
print(sys.argv[0])
print(sys.argv[1])
print(sys.argv[2])
print(sys.argv[3])
dbname = sys.argv[1]
tablename = sys.argv[2]
config = configparser.ConfigParser()
config.read('/app/mount/{}.properties'.format(sys.argv[3]))
property_1=config.get("general","property_1")
property_2=config.get("general","property_2")
def myFunction(dbname, tablename, property_1, property_2):
print("DBNAME\n")
print(dbname)
print("TABLNAME\n")
print(tablename)
print("PROPERTY1\n")
print(property_1)
print("PROPERTY2\n")
print(property_2)
print("COMPLETE!\n")
# A list of Rows. Infer schema from the first row, create a DataFrame and print the schema
myFunction(dbname, tablename, property_1, property_2)
Summary and Next Steps
Cloudera Data Engineering (CDE) provides a command line interface (CLI) client. You can use the CLI to create and update jobs, view job details, manage job resources, run jobs, and more.
In this article we have reviewed some advanced use cases for the CLI. If you are using the CDE CLI you might also find the following articles and demos interesting:
Installing the CDE CLI
Simple Introduction to the CDE CLI
CDE CLI Demo
CDE Concepts
CDE CLI Command Reference
CDE CLI Spark Flag Reference
CDE CLI Airflow Flag Reference
CDE CLI list command syntax reference
CDE Jobs API Reference
... View more
11-29-2023
04:52 PM
Cloudera Data Engineering CDE is the Cloudera Data Engineering Service, a containerized managed service for Cloudera Data Platform designed for Large Scale Batch Pipelines with Spark, Airflow and Iceberg. It allows you to submit batch jobs to auto-scaling virtual clusters. As a Cloud-Native service, CDE enables you to spend more time on your applications, and less time on infrastructure. CDE allows you to create, manage, and schedule Apache Spark jobs without the overhead of creating and maintaining Spark clusters. With CDE, you define virtual clusters with a range of CPU and memory resources, and the cluster scales up and down as needed to run your Spark workloads, helping to control your cloud costs. Cloudera Data Engineering (CDE) provides a command line interface (CLI) client. You can use the CLI to create and update jobs, view job details, manage job resources, run jobs, and more. Requirements The following are required in order to reproduce these commands in your CDE environment: A CDE Service on version 1.19.0 or above. A working installation of the CDE CLI. Please follow these instructions to install the CLI. Steps Clone this git repository and run the following commands in the terminal with the CLI: You can easily list all jobs and job runs. % cde job list
% cde run list However, that is often impossible if you have a large number of jobs/runs in your Virtual Cluster. Therefore, using filters can be very important. Setup Prior to running the filtering commands you must set up some jobs and related dependencies. Run the following commands in bulk. To learn more about these please visit this Cloudera Community Article. % cde resource create --name myScripts \
--type files
% cde resource upload --name myScripts \
--local-path cde_jobs/spark_geospatial.py \
--local-path cde_jobs/utils.py
% cde resource describe --name myScripts
% cde resource create --name myData \
--type files
% cde resource upload-archive --name myData \
--local-path data/ne_50m_admin_0_countries_lakes.zip
cde credential create --name my-docker-creds \
--type docker-basic \
--docker-server hub.docker.com \
cde resource create --name dex-spark-runtime-sedona-geospatial \
--image pauldefusco/dex-spark-runtime-3.2.3-7.2.15.8:1.20.0-b15-sedona-geospatial-003 \
--image-engine spark3 \
--type custom-runtime-image
cde job create --name geospatialRdd \
--type spark \
--mount-1-prefix code/ --mount-1-resource myScripts \
--mount-2-prefix data/ --mount-2-resource myData \
--runtime-image-resource-name dex-spark-runtime-sedona-geospatial \
--packages org.apache.sedona:sedona-spark-shaded-3.0_2.12:1.5.0,org.datasyslab:geotools-wrapper:1.5.0-28.2 \
--application-file code/spark_geospatial.py \
--arg myArg \
--max-executors 4 \
--min-executors 2 \
--executor-cores 2
cde job run --name geospatialRdd --executor-cores 4 Monitoring Examples Now the monitoring examples: Filter all jobs by name where name equals "geospatialRdd" % cde job list --filter 'name[eq]geospatialRdd'
[
{
"name": "geospatialRdd",
"type": "spark",
"created": "2023-11-29T00:59:11Z",
"modified": "2023-11-29T00:59:11Z",
"retentionPolicy": "keep_indefinitely",
"mounts": [
{
"dirPrefix": "data/",
"resourceName": "myData"
},
{
"dirPrefix": "code/",
"resourceName": "myScripts"
}
],
"spark": {
"file": "code/spark_geospatial.py",
"args": [
"myArg"
],
"driverMemory": "1g",
"driverCores": 1,
"executorMemory": "1g",
"executorCores": 2,
"conf": {
"spark.dynamicAllocation.maxExecutors": "4",
"spark.dynamicAllocation.minExecutors": "2",
"spark.jars.packages": "org.apache.sedona:sedona-spark-shaded-3.0_2.12:1.5.0,org.datasyslab:geotools-wrapper:1.5.0-28.2"
}
},
"schedule": {
"enabled": false,
"user": "pauldefusco"
},
"runtimeImageResourceName": "dex-spark-runtime-sedona-geospatial"
}
] You can nest filters. For example, filter all jobs where job application file equals "code/spark_geospatial.py": % cde job list --filter 'spark.file[eq]code/spark_geospatial.py'
[
{
"name": "geospatialRdd",
"type": "spark",
"created": "2023-11-29T00:59:11Z",
"modified": "2023-11-29T00:59:11Z",
"retentionPolicy": "keep_indefinitely",
"mounts": [
{
"dirPrefix": "data/",
"resourceName": "myData"
},
{
"dirPrefix": "code/",
"resourceName": "myScripts"
}
],
"spark": {
"file": "code/spark_geospatial.py",
"args": [
"myArg"
],
"driverMemory": "1g",
"driverCores": 1,
"executorMemory": "1g",
"executorCores": 2,
"conf": {
"spark.dynamicAllocation.maxExecutors": "4",
"spark.dynamicAllocation.minExecutors": "2",
"spark.jars.packages": "org.apache.sedona:sedona-spark-shaded-3.0_2.12:1.5.0,org.datasyslab:geotools-wrapper:1.5.0-28.2"
}
},
"schedule": {
"enabled": false,
"user": "pauldefusco"
},
"runtimeImageResourceName": "dex-spark-runtime-sedona-geospatial"
}
] You can use different operators. For example, search all jobs whose name contains "spark": % cde job list --filter 'name[rlike]spark'
[
{
"name": "sparkxml",
"type": "spark",
"created": "2023-11-23T07:11:41Z",
"modified": "2023-11-23T07:39:32Z",
"retentionPolicy": "keep_indefinitely",
"mounts": [
{
"dirPrefix": "/",
"resourceName": "files"
}
],
"spark": {
"file": "read_xml.py",
"driverMemory": "1g",
"driverCores": 1,
"executorMemory": "1g",
"executorCores": 1,
"conf": {
"dex.safariEnabled": "false",
"spark.jars.packages": "com.databricks:spark-xml_2.12:0.16.0",
"spark.pyspark.python": "python3"
},
"logLevel": "INFO"
},
"schedule": {
"enabled": false,
"user": "pauldefusco"
}
}
] Search all jobs created on or after 11/23/23: % cde job list --filter 'created[gte]2023-11-23'
API User Password: [
{
"name": "asdfsdfsdfsdf",
"type": "airflow",
"created": "2023-11-23T06:56:45Z",
"modified": "2023-11-23T06:56:45Z",
"retentionPolicy": "keep_indefinitely",
"mounts": [
{
"resourceName": "PipelineResource-asdfsdfsdfsdf-1700722602468"
}
],
"airflow": {
"dagID": "asdfsdfsdfsdf",
"dagFile": "dag.py"
},
"schedule": {
"enabled": false,
"user": "dschoberle",
"start": "Thu, 23 Nov 2023 06:56:44 GMT",
"catchup": true
}
},
{
"name": "geospatialRdd",
"type": "spark",
"created": "2023-11-29T00:59:11Z",
"modified": "2023-11-29T00:59:11Z",
"retentionPolicy": "keep_indefinitely",
"mounts": [
{
"dirPrefix": "data/",
"resourceName": "myData"
},
{
"dirPrefix": "code/",
"resourceName": "myScripts"
}
],
"spark": {
"file": "code/spark_geospatial.py",
"args": [
"myArg"
],
"driverMemory": "1g",
"driverCores": 1,
"executorMemory": "1g",
"executorCores": 2,
"conf": {
"spark.dynamicAllocation.maxExecutors": "4",
"spark.dynamicAllocation.minExecutors": "2",
"spark.jars.packages": "org.apache.sedona:sedona-spark-shaded-3.0_2.12:1.5.0,org.datasyslab:geotools-wrapper:1.5.0-28.2"
}
},
"schedule": {
"enabled": false,
"user": "pauldefusco"
},
"runtimeImageResourceName": "dex-spark-runtime-sedona-geospatial"
}
] Search all jobs with executorCores less than 2: % cde job list --filter 'spark.executorCores[lt]2'
API User Password: [
{
"name": "CDEPY_SPARK_JOB",
"type": "spark",
"created": "2023-11-14T23:02:48Z",
"modified": "2023-11-14T23:02:48Z",
"retentionPolicy": "keep_indefinitely",
"mounts": [
{
"resourceName": "CDEPY_DEMO"
}
],
"spark": {
"file": "pysparksql.py",
"driverMemory": "1g",
"driverCores": 1,
"executorMemory": "4g",
"executorCores": 1,
"conf": {
"spark.pyspark.python": "python3"
},
"logLevel": "INFO"
},
"schedule": {
"enabled": false,
"user": "pauldefusco"
}
},
{
"name": "CDEPY_SPARK_JOB_APAC",
"type": "spark",
"created": "2023-11-15T03:33:36Z",
"modified": "2023-11-15T03:33:36Z",
"retentionPolicy": "keep_indefinitely",
"mounts": [
{
"resourceName": "CDEPY_DEMO_APAC"
}
],
"spark": {
"file": "pysparksql.py",
"driverMemory": "1g",
"driverCores": 1,
"executorMemory": "4g",
"executorCores": 1,
"conf": {
"spark.pyspark.python": "python3"
},
"logLevel": "INFO"
},
"schedule": {
"enabled": false,
"user": "pauldefusco"
}
},
] List all runs for job "geospatialRdd": % cde run list --filter 'job[eq]geospatialRdd'
[
{
"id": 21815,
"job": "geospatialRdd",
"type": "spark",
"status": "failed",
"user": "pauldefusco",
"started": "2023-11-29T00:32:02Z",
"ended": "2023-11-29T00:32:36Z",
"mounts": [
{
"dirPrefix": "data/",
"resourceName": "myData"
},
{
"dirPrefix": "code/",
"resourceName": "myScripts"
}
],
"runtimeImageResourceName": "dex-spark-runtime-sedona-geospatial",
"spark": {
"sparkAppID": "spark-f542530da24f485da4993338dca81d3c",
"sparkAppURL": "https://58kqsms2.cde-g6hpr9f8.go01-dem.ylcu-atmi.cloudera.site/hs/history/spark-f542530da24f485da4993338dca81d3c/jobs/",
"spec": {
"file": "code/geospatial_rdd.py",
"args": [
"myArg"
],
"driverMemory": "1g",
"driverCores": 1,
"executorMemory": "1g",
"executorCores": 4,
"conf": {
"spark.dynamicAllocation.maxExecutors": "4",
"spark.dynamicAllocation.minExecutors": "2",
"spark.jars.packages": "org.apache.sedona:sedona-spark-shaded-3.0_2.12:1.5.0,org.datasyslab:geotools-wrapper:1.5.0-28.2"
}
}
},
"identity": {
"disableRoleProxy": true,
"role": "instance"
}
},
{
"id": 21825,
"job": "geospatialRdd",
"type": "spark",
"status": "failed",
"user": "pauldefusco",
"started": "2023-11-29T00:48:29Z",
"ended": "2023-11-29T00:49:01Z",
"mounts": [
{
"dirPrefix": "data/",
"resourceName": "myData"
},
{
"dirPrefix": "code/",
"resourceName": "myScripts"
}
],
"runtimeImageResourceName": "dex-spark-runtime-sedona-geospatial",
"spark": {
"sparkAppID": "spark-e5460856fb3a459ba7ee2c748c802d07",
"sparkAppURL": "https://58kqsms2.cde-g6hpr9f8.go01-dem.ylcu-atmi.cloudera.site/hs/history/spark-e5460856fb3a459ba7ee2c748c802d07/jobs/",
"spec": {
"file": "myScripts/geospatial_rdd.py",
"args": [
"myArg"
],
"driverMemory": "1g",
"driverCores": 1,
"executorMemory": "1g",
"executorCores": 2,
"conf": {
"spark.dynamicAllocation.maxExecutors": "4",
"spark.dynamicAllocation.minExecutors": "2",
"spark.jars.packages": "org.apache.sedona:sedona-spark-shaded-3.0_2.12:1.5.0,org.datasyslab:geotools-wrapper:1.5.0-28.2"
}
}
},
"identity": {
"disableRoleProxy": true,
"role": "instance"
}
}
] You can combine multiple filters. Return all job runs from today (11/29/23) i.e. where the start date is greater than or equal to 11/29 and the end date is less than or equal to 11/30. Notice all times default to +00 UTC timezone. % cde run list --filter 'started[gte]2023-11-29' --filter 'ended[lte]2023-11-30'
[
{
"id": 21907,
"job": "ge_data_quality-pauldefusco-banking",
"type": "spark",
"status": "succeeded",
"user": "pauldefusco",
"started": "2023-11-29T02:56:44Z",
"ended": "2023-11-29T02:57:46Z",
"mounts": [
{
"dirPrefix": "/",
"resourceName": "cde_demo_files-pauldefusco-banking"
}
],
"runtimeImageResourceName": "dex-spark-runtime-ge-data-quality-pauldefusco-banking",
"spark": {
"sparkAppID": "spark-8f9d7999056f4b53a01cc2afc5304cca",
"sparkAppURL": "https://58kqsms2.cde-g6hpr9f8.go01-dem.ylcu-atmi.cloudera.site/hs/history/spark-8f9d7999056f4b53a01cc2afc5304cca/jobs/",
"spec": {
"file": "ge_data_quality.py",
"driverMemory": "1g",
"driverCores": 1,
"executorMemory": "1g",
"executorCores": 1
}
},
"identity": {
"disableRoleProxy": true,
"role": "instance"
}
},
{
"id": 21908,
"job": "data_quality_orchestration-pauldefusco-banking",
"type": "airflow",
"status": "running",
"user": "pauldefusco",
"started": "2023-11-29T03:00:01Z",
"ended": "0001-01-01T00:00:00Z",
"airflow": {
"dagID": "CDE_Demo_pauldefusco-banking",
"dagRunID": "scheduled__2023-11-29T02:55:00+00:00",
"dagFile": "airflow.py",
"executionDate": "2023-11-29T02:55:00Z"
}
},
{
"id": 21909,
"job": "batch_load-pauldefusco-banking",
"type": "spark",
"status": "running",
"user": "pauldefusco",
"started": "2023-11-29T03:00:14Z",
"ended": "0001-01-01T00:00:00Z",
"mounts": [
{
"dirPrefix": "jobCode/",
"resourceName": "cde_demo_files-pauldefusco-banking"
}
],
"runtimeImageResourceName": "dex-spark-runtime-ge-data-quality-pauldefusco-banking",
"spark": {
"sparkAppID": "spark-3d8a4704418841929d325af0e0190a20",
"sparkAppURL": "https://58kqsms2.cde-g6hpr9f8.go01-dem.ylcu-atmi.cloudera.site/livy-batch-14907-dyL7LLeM",
"spec": {
"file": "jobCode/batch_load.py",
"driverMemory": "1g",
"driverCores": 1,
"executorMemory": "1g",
"executorCores": 1
}
},
"identity": {
"disableRoleProxy": true,
"role": "instance"
}
}
] List all successful airflow jobs created by user pauldefusco that started after 3 am UTC on 11/29/23: % cde run list --filter 'type[eq]airflow' --filter 'status[eq]succeeded' --filter 'user[eq]pauldefusco' --filter 'started[gte]2023-11-29T03'
[
{
"id": 21908,
"job": "data_quality_orchestration-pauldefusco-banking",
"type": "airflow",
"status": "succeeded",
"user": "pauldefusco",
"started": "2023-11-29T03:00:01Z",
"ended": "2023-11-29T03:03:01Z",
"airflow": {
"dagID": "CDE_Demo_pauldefusco-banking",
"dagRunID": "scheduled__2023-11-29T02:55:00+00:00",
"dagFile": "airflow.py",
"executionDate": "2023-11-29T02:55:00Z"
}
}
] List all CDE Resources will return all types ("python-env", "files", "custom-runtime-image"): % cde resource list
[
{
"name": "BankingPyEnv",
"type": "python-env",
"status": "pending-build",
"created": "2023-11-07T21:27:16Z",
"modified": "2023-11-07T21:27:16Z",
"retentionPolicy": "keep_indefinitely",
"pythonEnv": {
"pythonVersion": "python3",
"type": "python-env"
}
},
{
"name": "CDEPY_DEMO_APAC",
"type": "files",
"status": "ready",
"signature": "5d216f3c4a10578ffadba415b13022d9e383bc22",
"created": "2023-11-15T03:33:36Z",
"modified": "2023-11-15T03:33:36Z",
"retentionPolicy": "keep_indefinitely"
},
{
"name": "dex-spark-runtime-sedona-geospatial",
"type": "custom-runtime-image",
"status": "ready",
"created": "2023-11-28T23:51:11Z",
"modified": "2023-11-28T23:51:11Z",
"retentionPolicy": "keep_indefinitely",
"customRuntimeImage": {
"engine": "spark3",
"image": "pauldefusco/dex-spark-runtime-3.2.3-7.2.15.8:1.20.0-b15-sedona-geospatial-003"
}
}
] List all CDE Resources named "myScripts": % cde resource list --filter 'name[eq]myScripts'
[
{
"name": "myScripts",
"type": "files",
"status": "ready",
"signature": "17f820aacdad9bbd17a24d78a5b93cd0ec9e467b",
"created": "2023-11-28T23:31:31Z",
"modified": "2023-11-29T01:48:12Z",
"retentionPolicy": "keep_indefinitely"
}
] List all CDE Resources of type Python Environment: % cde resource list --filter 'type[eq]python-env'
[
{
"name": "BankingPyEnv",
"type": "python-env",
"status": "pending-build",
"created": "2023-11-07T21:27:16Z",
"modified": "2023-11-07T21:27:16Z",
"retentionPolicy": "keep_indefinitely",
"pythonEnv": {
"pythonVersion": "python3",
"type": "python-env"
}
}
] Summary and Next Steps Cloudera Data Engineering (CDE) provides a command line interface (CLI) client. You can use the CLI to create and update jobs, view job details, manage job resources, run jobs, and so on. In this article we have reviewed some advanced use cases for the CLI. If you are using the CDE CLI you might also find the following articles and demos interesting: Installing the CDE CLI Simple Introduction to the CDE CLI CDE CLI Demo CDE Concepts CDE CLI Command Reference CDE CLI Spark Flag Reference CDE CLI Airflow Flag Reference CDE CLI list command syntax reference CDE Jobs API Reference
... View more
11-29-2023
04:34 PM
Cloudera Data Engineering CDE is the Cloudera Data Engineering Service, a containerized managed service for Cloudera Data Platform designed for Large Scale Batch Pipelines with Spark, Airflow and Iceberg. It allows you to submit batch jobs to auto-scaling virtual clusters. As a Cloud-Native service, CDE enables you to spend more time on your applications, and less time on infrastructure. CDE allows you to create, manage, and schedule Apache Spark jobs without the overhead of creating and maintaining Spark clusters. With CDE, you define virtual clusters with a range of CPU and memory resources, and the cluster scales up and down as needed to run your Spark workloads, helping to control your cloud costs. Cloudera Data Engineering (CDE) provides a command line interface (CLI) client. You can use the CLI to create and update jobs, view job details, manage job resources, run jobs, and more. CDE Files Resources A resource in Cloudera Data Engineering (CDE) is a named collection of files used by a job. Resources can include application code, configuration files, custom Docker images, and Python virtual environment specifications. Resources are associated with virtual clusters. A resource can be used by multiple jobs, and jobs can use multiple resources.The resource types supported by CDE are files, python-env, and custom-runtime-image. In this article we will walk through some useful commands for operating with CDE Files Resources efficiently. Requirements The following are required in order to reproduce these commands in your CDE environment: A CDE Service on version 1.19.0 or above. A working installation of the CDE CLI. Please follow these instructions to install the CLI. Steps Clone this git repository in your local machine. Then run the following commands: Create a Files Resource: % cde resource create --name myScripts \
--type files Uplaod multiple files to the same Files Resource: % cde resource upload --name myScripts \
--local-path cde_jobs/spark_geospatial.py \
--local-path cde_jobs/utils.py
3.5KB/3.5KB 100% [==============================================] spark_geospatial.py
4.0KB/4.0KB 100% [==============================================] utils.py Describe Files resource: % cde resource describe --name myScripts
{
"name": "myScripts",
"type": "files",
"status": "ready",
"signature": "08531cbe538858eb20bda5ff1b7567ae4623d885",
"created": "2023-11-28T23:31:31Z",
"modified": "2023-11-28T23:33:32Z",
"retentionPolicy": "keep_indefinitely",
"files": [
{
"path": "spark_geospatial.py",
"signature": "ec91fb5bddfcd16a0bcbe344f229b5e326b759c5",
"sizeBytes": 3529,
"created": "2023-11-28T23:33:32Z",
"modified": "2023-11-28T23:33:32Z"
},
{
"path": "utils.py",
"signature": "aa5a8ea4b4f240183da8bd2d2b354eeaa58fd97a",
"sizeBytes": 3996,
"created": "2023-11-28T23:33:32Z",
"modified": "2023-11-28T23:33:32Z"
}
]
} Create a Files Resource for data: % cde resource create --name myData \
--type files Upload an archive file to the resource: % cde resource upload-archive --name myData \
--local-path data/ne_50m_admin_0_countries_lakes.zip
817.5KB/817.5KB 100% [==============================================] ne_50m_admin_0_countries_lakes.zip Describe resource metadata. Notice that the archive has been unarchived for you: % cde resource describe --name myData
{
"name": "myData",
"type": "files",
"status": "ready",
"signature": "d552dff8fb80a0c7067afa1c4227b29010cce67b",
"created": "2023-11-28T23:35:43Z",
"modified": "2023-11-28T23:36:56Z",
"retentionPolicy": "keep_indefinitely",
"files": [
{
"path": "ne_50m_admin_0_countries_lakes.cpg",
"signature": "663b90c899fa25a111067be0c22ffc64dcf581c2",
"sizeBytes": 5,
"created": "2023-11-28T23:36:55Z",
"modified": "2023-11-28T23:36:55Z"
},
{
"path": "ne_50m_admin_0_countries_lakes.dbf",
"signature": "eec48a122399782bbef02aa8108e99aeaf52e506",
"sizeBytes": 786828,
"created": "2023-11-28T23:36:56Z",
"modified": "2023-11-28T23:36:56Z"
},
{
"path": "ne_50m_admin_0_countries_lakes.prj",
"signature": "308d6355be935e0f7853161b1adda5bcd48188ff",
"sizeBytes": 143,
"created": "2023-11-28T23:36:56Z",
"modified": "2023-11-28T23:36:56Z"
},
{
"path": "ne_50m_admin_0_countries_lakes.README.html",
"signature": "4bec87fbbe5f4e0e18edb3d6a4f10e9e2a705581",
"sizeBytes": 38988,
"created": "2023-11-28T23:36:55Z",
"modified": "2023-11-28T23:36:55Z"
},
{
"path": "ne_50m_admin_0_countries_lakes.shp",
"signature": "57c38f48c5234db925a9fb1b31785250bd7c8d86",
"sizeBytes": 1652200,
"created": "2023-11-28T23:36:56Z",
"modified": "2023-11-28T23:36:56Z"
},
{
"path": "ne_50m_admin_0_countries_lakes.shx",
"signature": "983eba7b34cf94b0cfe8bda8b2b7d533bd233c49",
"sizeBytes": 2036,
"created": "2023-11-28T23:36:56Z",
"modified": "2023-11-28T23:36:56Z"
},
{
"path": "ne_50m_admin_0_countries_lakes.VERSION.txt",
"signature": "9e3d18e5216a7b4dfd402b00a25ee794842b481b",
"sizeBytes": 7,
"created": "2023-11-28T23:36:55Z",
"modified": "2023-11-28T23:36:55Z"
}
]
} Create CDE Credentials for accessing your Docker repository: cde credential create --name my-docker-creds \
--type docker-basic \
--docker-server hub.docker.com \
--docker-username pauldefusco Create a CDE Custom Docker Runtime Resource: cde resource create --name dex-spark-runtime-sedona-geospatial \
--image pauldefusco/dex-spark-runtime-3.2.3-7.2.15.8:1.20.0-b15-sedona-geospatial-003 \
--image-engine spark3 \
--type custom-runtime-image Notice that the custom image has already been created. If you want to learn more about how to create Custom Docker Resources please visit this Cloudera Community Article Finally, create a job leveraging all three resources above: cde job create --name geospatialRdd \
--type spark \
--mount-1-prefix code/ --mount-1-resource myScripts \
--mount-2-prefix data/ --mount-2-resource myData \
--runtime-image-resource-name dex-spark-runtime-sedona-geospatial \
--packages org.apache.sedona:sedona-spark-shaded-3.0_2.12:1.5.0,org.datasyslab:geotools-wrapper:1.5.0-28.2 \
--application-file code/spark_geospatial.py \
--arg myArg \
--max-executors 4 \
--min-executors 2 \
--executor-cores 2 Notice the following: When using multiple Files resources you should prefix each i.e. "data" and "code". The "data" prefix is used at line 82 ("/app/mount/data") in the "spark_geospatial.py" script in order to access the data from the resource. The "code" prefix is usded in the same CLI command in the application file argument. If you are using any Spark packages you can set these directly at job creation. You can pass one or multiple arguments to the Python script via the --arg argument. The arguments are referenced in the script with the "sys.argv" syntax e.g. line 60 in "geospatial_rdd.py". Run the job: cde job run --name geospatialRdd --executor-cores 4 Notice that at runtime, you can override spark configs that were set at job creation. For example, the "--executor-cores" was originally set to 2 and is now overridden to 4. List job runs filtering by job name: % cde run list --filter 'job[eq]geospatialRdd'
[
{
"id": 21815,
"job": "geospatialRdd",
"type": "spark",
"status": "failed",
"user": "pauldefusco",
"started": "2023-11-29T00:32:02Z",
"ended": "2023-11-29T00:32:36Z",
"mounts": [
{
"dirPrefix": "data/",
"resourceName": "myData"
},
{
"dirPrefix": "code/",
"resourceName": "myScripts"
}
],
"runtimeImageResourceName": "dex-spark-runtime-sedona-geospatial",
"spark": {
"sparkAppID": "spark-f542530da24f485da4993338dca81d3c",
"sparkAppURL": "https://58kqsms2.cde-g6hpr9f8.go01-dem.ylcu-atmi.cloudera.site/hs/history/spark-f542530da24f485da4993338dca81d3c/jobs/",
"spec": {
"file": "code/geospatial_rdd.py",
"args": [
"myArg"
],
"driverMemory": "1g",
"driverCores": 1,
"executorMemory": "1g",
"executorCores": 4,
"conf": {
"spark.dynamicAllocation.maxExecutors": "4",
"spark.dynamicAllocation.minExecutors": "2",
"spark.jars.packages": "org.apache.sedona:sedona-spark-shaded-3.0_2.12:1.5.0,org.datasyslab:geotools-wrapper:1.5.0-28.2"
}
}
},
"identity": {
"disableRoleProxy": true,
"role": "instance"
}
}
] Open Spark UI for respective run: % cde run ui --id 21815 You can delete single files from resources: % cde resource delete-file --name myScripts --resource-path utils.py You can delete the resource: % cde resource delete --name myScripts Summary and Next Steps Cloudera Data Engineering (CDE) provides a command line interface (CLI) client. You can use the CLI to create and update jobs, view job details, manage job resources, run jobs, and so on. In this article we have reviewed some advanced use cases for the CLI. If you are using the CDE CLI you might also find the following articles and demos interesting: Installing the CDE CLI Simple Introduction to the CDE CLI CDE CLI Demo CDE Concepts CDE CLI Command Reference CDE CLI Spark Flag Reference CDE CLI Airflow Flag Reference CDE CLI list command syntax reference CDE Jobs API Reference
... View more
11-22-2023
11:47 PM
Objective
This article provides a quickstart for the SparkXML package in Cloudera Data Engineering. You can run the following commands to parse XML files with Spark in Cloudera Data Engineering (CDE).
CDE is the only cloud-native service purpose-built for enterprise data engineering teams. Building on Apache Spark, Data Engineering is an all-inclusive data engineering toolset that enables orchestration automation with Apache Airflow, advanced pipeline monitoring, visual troubleshooting, and comprehensive management tools to streamline ETL processes across enterprise analytics teams.
The CDE is fully integrated with the Cloudera Data Platform (CDP), enabling end-to-end visibility and security with SDX as well as seamless integrations with CDP services such as Data Warehouse and Machine Learning. Data Engineering on CDP powers consistent, repeatable, and automated data engineering workflows on a hybrid cloud platform anywhere
Spark-XML is a library for parsing and querying XML data with Apache Spark, for Spark SQL and DataFrames. Spark Data Engineers use the package in CDE to parse XML files at scale. In the rest of this tutorial, we will run a few commands to demonstrate basic functionality of this package in CDE.
The code is also provided in this git repository.
Requirements
The following are required to reproduce the Demo in your CDE Virtual Cluster:
CDE Service version 1.19 and above with Spark version 3.x (Spark 2 is not compatible with SparkXML).
A Working installation of the CDE CLI. Instructions to install the CLI are provided in Using the Cloudera Data Engineering command line interface.
A working installation of git in your local machine. Please clone this git repository and keep in mind all commands assume they are run in the project's main directory.
No code edits required but familiarity with Python, Spark and XML is recommended.
CDE CLI Steps
Create a CDE Files Resource cde resource create --name files --type files
Upload Files to the Resource cde resource upload --name files --local-path read_xml.py --local-path books.xml
Create a CDE Spark Job cde job create --name sparkxml --application-file read_xml.py --mount-1-resource files --type spark --packages com.databricks:spark-xml_2.12:0.16.0
Run the CDE Spark Job cde job run --name sparkxml
+-----+--------------------+--------------------+---------------+-----+------------+--------------------+
| _id| author| description| genre|price|publish_date| title|
+-----+--------------------+--------------------+---------------+-----+------------+--------------------+
|bk101|Gambardella, Matthew|\n\n\n An...| Computer|44.95| 2000-10-01|XML Developer's G...|
|bk102| Ralls, Kim|A former architec...| Fantasy| 5.95| 2000-12-16| Midnight Rain|
|bk103| Corets, Eva|After the collaps...| Fantasy| 5.95| 2000-11-17| Maeve Ascendant|
|bk104| Corets, Eva|In post-apocalyps...| Fantasy| 5.95| 2001-03-10| Oberon's Legacy|
|bk105| Corets, Eva|The two daughters...| Fantasy| 5.95| 2001-09-10| The Sundered Grail|
|bk106| Randall, Cynthia|When Carla meets ...| Romance| 4.95| 2000-09-02| Lover Birds|
|bk107| Thurman, Paula|A deep sea diver ...| Romance| 4.95| 2000-11-02| Splish Splash|
|bk108| Knorr, Stefan|An anthology of h...| Horror| 4.95| 2000-12-06| Creepy Crawlies|
|bk109| Kress, Peter|After an inadvert...|Science Fiction| 6.95| 2000-11-02| Paradox Lost|
|bk110| O'Brien, Tim|Microsoft's .NET ...| Computer|36.95| 2000-12-09|Microsoft .NET: T...|
|bk111| O'Brien, Tim|The Microsoft MSX...| Computer|36.95| 2000-12-01|MSXML3: A Compreh...|
|bk112| Galos, Mike|Microsoft Visual ...| Computer|49.95| 2001-04-16|Visual Studio 7: ...|
+-----+--------------------+--------------------+---------------+-----+------------+--------------------+
root
|-- _id: string (nullable = true)
|-- author: string (nullable = true)
|-- description: string (nullable = true)
|-- genre: string (nullable = true)
|-- price: double (nullable = true)
|-- publish_date: string (nullable = true)
|-- title: string (nullable = true)
References
Spark XML Package
Cloudera Data Engineering Documentation
... View more
11-16-2023
11:27 AM
The Cloudera Data Platform (CDP) is a hybrid data platform designed to deliver faster and easier data management, analytics and AI at enterprise scale. Cloudera Machine Learning (CML) is one of CDP’s Cloud Native Data Services designed to enable secure, governed Data Science. Data Scientists and Engineers use CML to securely analyze large amounts of data via interactive notebooks. Since 2022 CML enhances this capability with the Data Connections feature by providing boiler template code to access and push down SQL queries to the CDW service. This allows the user to run large scale queries directly in the Data Warehouse while accessing and visualizing results from a simple notebook with a small consumption footprint. In 2023 CML doubled down with CML Custom Data Connections. Custom Data Connections allow the CML user to create their own boiler template code so they or their collaborators can easily connect to 3rd party tools such as Snowflake, Postgres, legacy on-prem databases (Oracle, MSSQL, MySQL), serverless cloud databases (Redshift, Snowflake, SAP HANA Cloud, BigQuery), APIs, and specialized data stores (Neo4j). This article provides a tutorial on how to create a MySQL Custom Data Connection in your own CML Workspace. The code referenced is provided in this git repository. Data Connections Refresher You can use Data Connections from a CML Session to connect to a CDP environment (CDW or Spark) regardless of the Editor choice by following the below steps: Open your CML Session and click on the “Data” tab at the top right of your screen. Scroll down and select the CDW or Spark Data Lake connection of your choice. Copy and paste the code in your Editor. Run the sample SHOW DATABASES command and create your SQL statements. Learning from the Postgres Custom Data Connection The CML Engineering Team has created two custom data connection templates that you can reuse to connect to Snowflake and Postgres. In this section we will use the Postgres template to learn how to deploy an existing custom connection. Steps to Deploy the Postgres Custom Connection to your Project Create a CML Project by cloning the provided template with this URL: https://github.com/pdefusco/Using_CustomConn_CML.git Notice the mysqlconn and postgresconn folders are included in the project. Open the postgres folder and familiarize yourself with the code in pg-conn.py. This is the source code for the Custom Connection. The PostgresCustomImp class is a child of CustomConnection which is imported from cml.data_v1.customconnection. You must extend this class in order to create a custom connection. The get_base_connection method uses the psycopg2 module to establish a connection to a given Postgres source. The connection requires a hostname, a default port, a database name, a user and password. When implementing your own custom connection you can choose which connection parameters to include as needed. The get_pandas_dataframe method executes the provided SQL via the Pandas read_sql method. The get_cursor method returns a cursor object. This is optional as the SQL command is executed in get_pandas_dataframe. The parameters required to initialize a connection are set manually via the UI (see below). The override_parameters and check_params_or_env methods ensure that these are set correctly. While these methods are also optional, we recommend implementing them. Navigate to the Workspace Site Administration page and open the Data Connections Tab. Click on the “New Connection” icon. Fill out the form as shown below: Name: Custom Postgres Type: Custom Connection Type Display: Postgres Project: select your project Connection Files: select the “postgresconn” folder Custom Parameters: create the following four key value pairs: PG_HOST : hh-pgsql-public.ebi.ac.uk PG_PORT : 5432 PG_DB : pfmegrnargs PG_USER : reader Note: in this example we are connecting to the RNAcentral Public Postgres Database. Please visit this URL for more information: https://rnacentral.org/help/public-database Navigate back to your project. Open the “Project Settings” page and then the “Data Connections” tab. Make the new connection available in the project by clicking on “Sync with Workspace”. Launch a CML Session with JupyterLab as your Editor. A small resource profile without GPUs is ok. There is no need to enable a Spark Runtime Add-On. The Data Connections window will load automatically. Notice the new “Custom Postgres” connection with a reusable template code block. Open the “using_connections” notebook. Notice the code block to connect to the Postgres database has already been prepopulated for you. Normally you would copy and paste from the Data Connections pop up window. Execute the first four cells and validate that query results are output in the notebook (do not run the entire notebook!). Creating a MySQL Custom Data Connection Now that we have connected to Postgres with a provided Custom Connection template we can follow the same steps to create a custom MySQL connection. Open the mysqlconn folder and familiarize yourself with the code. Notice the source code for this connection is in a separate folder. This is required in order to select a folder in the Custom Data Connection creation form in the Workspace Settings. As in the previous example, we create a MySQLCustomImp class which inherits from the CustomConnection interface in the cml.data_v1.customconnection module. This module does not have to be pip installed and is already provided to you in CML by default. The implemented class methods are similar to the Postgres example. Notice that in this case we don’t have a method to return the connection cursor. In addition, we leverage the mysql-connector-python package rather than psycopg2. To use this custom connection, go through the same steps and set the following Custom Parameters: MYSQL_HOST : mysql-rfam-public.ebi.ac.uk MYSQL_PORT : 4497 MYSQL_DB : Rfam MYSQL_USER: rfamro Note: in this example we are connecting to the Rfam MySQL public database. For more information please visit this URL: https://docs.rfam.org/en/latest/database.html Conclusions In this article we highlighted CML Custom Data Connections. In summary: CML is a Cloud Native Platform for Enterprise Machine Learning. The built-in integrations with the Cloudera Data Platform (CDP) allow CML Users to operate in a secure, governed Machine Learning environment at scale. CML Data Connections enhance Data Analysis, Exploration, and Model Experimentation by providing Data Scientists with an easy interface to process large amounts of data from a notebook with minimum compute resources. Custom Data Connections augment this capability by opening access to 3rd party systems such as external RDBMSs or Cloud Vendor Databases such as Snowflake. CML Users are free to implement their own Custom Connections while giving the CML Admins the ability to approve them at the Workspace level. What Custom Data Connection are you using in CML? Please don’t hesitate to comment with your favorite external data sources.
... View more
Labels:
10-25-2023
08:27 PM
Objective
Great Expectations is a Python-based open-source library for validating, documenting, and profiling your data. It helps you maintain data quality and improve communication about data between teams. Software developers have long known that automated testing is essential for managing complex codebases. Great Expectations brings the same discipline, confidence, and acceleration to data science and data engineering teams. (Source: Great Expectations)
CDP Data Engineering is the only cloud-native service purpose-built for enterprise data engineering teams. Building on Apache Spark, Data Engineering is an all-inclusive data engineering toolset that enables orchestration automation with Apache Airflow, advanced pipeline monitoring, visual troubleshooting, and comprehensive management tools to streamline ETL processes across enterprise analytics teams.
Data Engineering is fully integrated with the Cloudera Data Platform, enabling end-to-end visibility and security with SDX as well as seamless integrations with CDP services such as data warehouses and machine learning. Data engineering on CDP powers consistent, repeatable, and automated data engineering workflows on a hybrid cloud platform anywhere. (Source: Streamline and operationalize data pipelines securely at any scale.)
Spark Data Engineers use Great Expectations in CDE to enforce data quality standards on enterprise datasets and data engineering pipelines at massive scale. In the rest of this tutorial, we will walk through a basic workflow with Great Expectations and Spark.
All code, dependencies, and CLI commands are available for reuse in this git repository.
Requirements
The following are required to reproduce the demo in your CDE virtual cluster:
CDE Service version 1.19 and above
A working installation of the CDE CLI Instructions to install the CLI are provided here.
A working installation of git on your local machine Please clone the git repository and keep in mind that all commands assume they are run in the project's main directory.
Custom Docker Runtime as CDE Resource
By default, CDE workloads rely on CDE Resources to mount dependencies into a job container. CDE resources can be of three types:"files,, Python environment, and custom Docker untime.
In our case, we will create a custom Docker runtime to ensure Great Expectations and all required packages are correctly installed. Please open the Dockerfile located in the CDE_Runtime folder to explore more details.
The below steps have already been executed, and the image is available for reuse in my public Docker registry. If you want, create your own image by personalizing the value in the --name parameter to reflect your username and replacing the value in the --docker-username parameter with your DockerHub username.
docker build --network=host -t pauldefusco/dex-spark-runtime-3.2.3-7.2.15.8:1.20.0-b15-great-expectations-data-quality Dockerfile docker run it pauldefusco/dex-spark-runtime-3.2.3-7.2.15.8:1.20.0-b15-great-expectations-data-quality Dockerfile /bin/bash docker push pauldefusco/dex-spark-runtime-3.2.3-7.2.15.8:1.20.0-b15-great-expectations-data-quality
Next, create CDE Resource Credentials and the CDE Custom Docker Runtime resource:
cde credential create --name docker-creds-pauldefusco --type docker-basic --docker-server hub.docker.com --docker-username pauldefusco cde resource create --name dex-spark-runtime-great-expectations-data-quality --image pauldefusco/dex-spark-runtime-3.2.3-7.2.15.8:1.20.0-b15-great-expectations-data-quality --image-engine spark3 --type custom-runtime-image
CDE FIles Resources for Job Code
Next, create another CDE Files Resource to store your code and dependencies in your virtual cluster for reuse.
Before running the below command, open properties.conf and replace your Cloud Default Storage location and CDP Workload Username. If you need help finding these, please contact your CDP administrator. Then upload scripts and dependencies.
cde resource create --name job_code_data_quality cde resource upload --name job_code_data_quality --local-path code/airflow.py --local-path code/batch_load.py --local-path code/great_expectations.py --local-path code/parameters.conf --local-path code/utils.py
Finally, we will create two CDE Spark jobs for the data quality pipeline and one CDE Airflow job to orchestrate it. Before doing so, open code/great_expectations.py and familiarize yourself with the code. Here are some of the most important points about the job:
We create a synthetic banking dataset with the Faker and dbldatagen libraries. This gives us 100,000 banking transactions with transaction amount, currency, account numbers, routing numbers, and much more banking data. The data columns are of different types, including numerical, timestamp, and categorical. Review the utils.py script to learn more details.
FakerTextUS = FakerTextFactory(locale=['en_US'], providers=[bank]) partition parameters, etc. self.spark.conf.set("spark.sql.shuffle.partitions", shuffle_partitions_requested) fakerDataspec = (DataGenerator(self.spark, rows=data_rows, partitions=partitions_requested)) .withColumn("name", percentNulls=0.1, text=FakerTextUS("name") ) .withColumn("address", text=FakerTextUS("address" )) .withColumn("email", text=FakerTextUS("ascii_company_email") ) .withColumn("aba_routing", text=FakerTextUS("aba" )) .withColumn("bank_country", text=FakerTextUS("bank_country") ) .withColumn("account_no", text=FakerTextUS("bban" )) .withColumn("int_account_no", text=FakerTextUS("iban") ) .withColumn("swift11", text=FakerTextUS("swift11" )) .withColumn("credit_card_number", text=FakerTextUS("credit_card_number") ) .withColumn("credit_card_provider", text=FakerTextUS("credit_card_provider") ) .withColumn("event_type", "string", values=["purchase", "cash_advance"],random=True) .withColumn("event_ts", "timestamp", begin="2020-01-01 01:00:00, end="2020-12-31 23:59:00",interval="1 minute", random=True ) .withColumn("longitude", "float", minValue=-180, maxValue=180, random=True) .withColumn("latitude", "float", minValue=-90, maxValue=90, random=True) .withColumn("transaction_currency", values=["USD", "EUR", "KWD", "BHD", "GBP", "CHF", "MEX"]) .withColumn("transaction_amount", "decimal", minValue=0.01, maxValue=30000, random=True) ) df = fakerDataspec.build()
In ge_data_quality.py, the "SparkDFDataset" class is imported from the "great_expectations.dataset.sparkdf_dataset" module. This allows you to wrap a Spark Dataframe and make it compatible with the Great Expectations API
gdf = SparkDFDataset(df)
From the GE Documentation, "Expectations are declarative statements and the interface to the Great Expectations library, supporting your validation, profiling, and translation. Expectations are implemented as classes; some are in the core library, with many others contributed by community members." In the script, we implemented a few wrapper methods for a variety of built-in expectations. For example, the "expect_column_to_exist" expectation checks for column existence.
assert gdf.expect_column_to_exist(column).success, Column {column} is not found."
The GE Gallery offers a constantly growing number of expectations to test for different conditions on different column value types. For example, in the script, we use expectations to check for nulls for all columns: longitude and latitude min and max for transactions; mean and standard deviation for transaction amounts; email string validity with REGEX; and finally, whether the values in the categorical Currency column match or are contained in a provided test set. Here are some more examples that can be found in the ge_data_quality.py script:
# EXPECTATION ON ALL COLUMNS Ensure the existence of all mandatory columns. def run_existance_expactation(gdf, MANDATORY_COLUMNS): for the column in MANDATORY_COLUMNS: try: assert gdf.expect_column_to_exist(column).success, Column {column} is not found." print(f"Column {column} is found") except Exception as e: print(e)
# EXPECTATION ON NUMERIC COLUMN # Ensure minimum longitude is between -180 and 180 def run_longitude_min_expectation(gdf): try: test_result = gdf.expect_column_min_to_be_between(column="longitude", min_value=-180, max_value=180).success, f"Min for column longitude is not within expected range\n" assert test_result.success, f"Min for column longitude is within expected range\n" except Exception as e: print(e)
# EXPECTATION ON STRING COLUMNS Use REGEX to ensure email is in correct format def run_email_match_regex_expectation(gdf): try: test_result = gdf.expect_column_values_to_match_regex(column="email", regex="^[\w-\.]+@([\w-]+\.)+[\w-]{2,4}$").success, f"Values for column Email are not within REGEX pattern." assert test_result.success, f"Values for column Email are within REGEX Pattern\n" except Exception as e: print(e)
Please review the ge_data_quality.py script to find more examples.
Again, the idea is to organize tests in a clear, reliable, and reusable manner. When combined with the modularity of the CDE Job and Dependency construct, using Great Expectations and Spark allows you to structure your data pipelines under clear data quality rules.
Finally, the Airflow Job is scheduled to run every five minutes by default. Therefore, there is no need to manually run any of the above jobs. Run the below commands to create the jobs. Then open the CDE Job Runs page to validate the pipeline.
cde job create --name batch_load --type spark --mount-1-prefix jobCode/ --mount-1-resource job_code_data_quality --runtime-image-resource-name dex-spark-runtime-great-expectations-data-quality --application-file jobCode/batch_load.py cde job create --name data_quality --type spark --mount-1-prefix jobCode/ --mount-1-resource job_code_data_quality --runtime-image-resource-name dex-spark-runtime-great-expectations-data-quality --application-file jobCode/great_expectations.py cde job create --name data_quality_orchestration --type airflow --mount-1-prefix jobCode/ --mount-1-resource job_code_data_quality --dag-file airflow.py
Summary
With CDE Runtimes, you can choose any data quality package of your choice. This article in particular showcased a simple data quality pipeline with Great Expectations, a leading open source package for testing and validating data at scale. You can easily leverage GE at scale with Spark in Cloudera Data Engineering in order to complement Great Expectations' reusable declarative expectations with advanced Spark and Airflow Job observability, monitoring, and development capabilities provided by CDE.
... View more
10-23-2023
06:29 PM
Objective CDP Data Engineering (CDE) is the only cloud-native service purpose-built for enterprise data engineering teams. Building on Apache Spark, Data Engineering is an all-inclusive data engineering toolset that enables orchestration automation with Apache Airflow, advanced pipeline monitoring, visual troubleshooting, and comprehensive management tools to streamline ETL processes across enterprise analytics teams. Apache Sedona™ (formerly known as "GeoSpark") is a cluster computing system for processing large-scale spatial data. Sedona extends existing cluster computing systems, such as Apache Spark and Apache Flink, with a set of out-of-the-box distributed Spatial Datasets and Spatial SQL that efficiently load, process, and analyze large-scale spatial data across machines. Sedona jobs can run in CDE with minor configuration changes. This article will present a simple workflow to help you get started with Spark Geospatial use cases in CDE. All code, dependencies and CLI commands are available for reuse in this git repository. Requirements The following are required to reproduce the Demo in your CDE Virtual Cluster: CDE Service version 1.19 and above A Working installation of the CDE CLI. Instructions to install the CLI are provided here. A working installation of git in your local machine. Please clone this git repository and keep in mind all commands assume they are run in the project's main directory. Custom Docker Runtime as CDE Resource By default, CDE workloads rely on CDE Resources to mount dependencies into a Job container. CDE Resources can be of three types" Files, Python Environment, and Custom Docker Runtime. In our case we will create a Custom Docker Runtime to ensure Sedona and all required packages are correctly installed. Please open the Dockerfile located in the CDE_Runtime folder to explore more details. The below steps have already been executed and the image is available for reuse in my public Docker registry. If you want, create your own image by replacing my Docker username and running the following commands from your terminal: docker build --network=host -t pauldefusco/dex-spark-runtime-3.2.3-7.2.15.8:1.20.0-b15-sedona-geospatial-002 . -f Dockerfile
docker run -it docker build --network=host -t pauldefusco/dex-spark-runtime-3.2.3-7.2.15.8:1.20.0-b15-sedona-geospatial-003 . -f Dockerfile /bin/bash
docker push pauldefusco/dex-spark-runtime-3.2.3-7.2.15.8:1.20.0-b15-sedona-geospatial-003 Next, create CDE Resource Credentials and the CDE Custom Docker Runtime resource: cde credential create --name docker-creds-pauldefusco --type docker-basic --docker-server hub.docker.com --docker-username pauldefusco cde resource create --name dex-spark-runtime-sedona-geospatial-pauldefusco --image pauldefusco/dex-spark-runtime-3.2.3-7.2.15.8:1.20.0-b15-sedona-geospatial-003 --image-engine spark3 --type custom-runtime-image CDE FIles Resources for Datasets Sedona is compatible with many geospatial file formats. In our example we will load geospatial countries data to a CDE Files Resource so it can be read by the CDE Spark Job. The "cde resource upload-archive" command allows you to bulk upload and extract a zipped folder: cde resource create --name countries_data cde resource upload-archive --name countries_data --local-path data/ne_50m_admin_0_countries_lakes.zip CDE Files Resources for Job Code Next, create another CDE Files Resource to store your code and dependencies in your Virtual Cluster for reuse. Before running the below command, open properties.conf and replace your Cloud Default Storage location and CDP Workload Username. If you need help finding these please contact your CDP Administrator. cde resource create --name job_code cde resource upload --name job_code --local-path code/geospatial_joins.py --local-path code/geospatial_rdd.py --local-path code/parameters.conf --local-path code/utils.py Geospatial RDD Spark Job in CDE The Sedona RDD allows you to store geospatial data with custom data types in a Spark RDD. You can then use these for performing geospatial queries. In this section we will create a CDE Spark Job to manipulate Sedona RDD's. First create the CDE Spark Job with the following command. Notice that the CDE CLI allows you to specify multiple CDE Files Resources along with a prefix. The prefix can be used within the Python script to reference data or files in one or more CDE Files Resources. In this CLI command we reference the job_code and countries_data Files Resources to respectively locate the script file and access the data in geospatial format: Also notice that the CDE Custom Docker Runtime is referenced with the "runtime-image-resource-name" parameter. Finally, at time of this writing Sedona 1.5 is the latest version and, similarly to a spark-submit, we use the packages parameter to load the Maven packages into our CDE Spark Job. cde job create --name geospatial_rdd --type spark --mount-1-prefix jobCode/ --mount-1-resource job_code --mount-2-prefix countriesData/ --mount-2-resource countries_data --runtime-image-resource-name dex-spark-runtime-sedona-geospatial-pauldefusco --packages org.apache.sedona:sedona-spark-shaded-3.0_2.12:1.5.0,org.datasyslab:geotools-wrapper:1.5.0-28.2 --application-file jobCode/geospatial_rdd.py Next, open "geospatial_rdd.py" in your editor. Familiarize yourself with the code and notice the following: The SparkSession is created and then passed to the SedonaContext object. From then on, all Spark SQL queries are running in the SedonaContext. spark = SparkSession \ .builder \ .appName("IOT DEVICES LOAD") \ .config("spark.kubernetes.access.hadoopFileSystems", data_lake_name)\ .getOrCreate() config = SedonaContext.builder().getOrCreate() sedona = SedonaContext.create(spark) sc = sedona.sparkContext sc.setSystemProperty("sedona.global.charset", "utf8") Data is read from the CDE Files Resource using the CDE CLI Alias. A SpatialRDD is created with it: countries_rdd = ShapefileReader.readToGeometryRDD(sc, "/app/mount/countriesData") Sedona allows you to store data in Cloud Storage in GeoJSON format: countries_rdd.saveAsGeoJSON(data_lake_name + geoparquetoutputlocation + "/countries.json") A Spark Dataframe can be created from a SpatialRDD using the Sedona Adapter: countries_df = Adapter.toDf(countries_rdd, sedona) You can generate Geospatial POINT data with your favorite util as shown in util.py. The data can then be transformed into Sedona POINT data using Sedona SQL: dg = DataGen(spark, username) iot_points_df = dg.iot_points_gen(row_count = 100000, unique_vals=100000) iot_points_df.createOrReplaceTempView("iot_geo_tmp") iot_points_geo_df = sedona.sql("SELECT id, device_id, manufacturer, event_type, event_ts, \ ST_Point(CAST(iot_geo_tmp.latitude as Decimal(24,20)), \ CAST(iot_geo_tmp.longitude as Decimal(24,20))) as arealandmark \ FROM iot_geo_tmp") Sedona allows you to spatially partition and index your data: iotSpatialRDD.spatialPartitioning(GridType.KDBTREE) We can use the SpatialRDD to run geospatial queries on our data; for example, we calculate the distance between each POINT object and a predefined point in our query e.g. 52-21: iotSpatialRDD.rawSpatialRDD.map(lambda x: x.geom.distance(Point(21, 52))).take(5) Finally, we run a KNN query on the same data to obtain the k number of POINT objects that lay closes to the provided POINT object: result = KNNQuery.SpatialKnnQuery(iotSpatialRDD, Point(-84.01, 34.01), 5, False) Run the Job with the CLI: cde job run --name geospatial_rdd --executor-cores 2 --executor-memory "4g" Geospatial Join Spark Job in CDE Similarly to above, create a "geospatial_joins" CDE Spark Job using the CLI: cde job create --name geospatial_joins --application-file jobCode/geospatial_joins.py --type spark --mount-1-prefix jobCode/ --mount-1-resource job_code --mount-2-prefix countriesData/ --mount-2-resource countries_data --runtime-image-resource-name dex-spark-runtime-sedona-geospatial-pauldefusco --packages org.apache.sedona:sedona-spark-shaded-3.0_2.12:1.5.0,org.datasyslab:geotools-wrapper:1.5.0-28.2 Then open "geospatial_joins.py"; notice that the synthetic IOT data is joined with the countries data loaded from a CDE Files resource. The data is loaded from Cloud Storage in GeoJSON format: from sedona.core.formatMapper import GeoJsonReader geo_json_file_location = data_lake_name + geoparquetoutputlocation + "/iot_spatial.json" saved_rdd_iot = GeoJsonReader.readToGeometryRDD(sc, geo_json_file_location) Using Sedona SQL the data is joined using the "ST_Contains" spatial predicate. Sedona allows us to join the two tables by matching POINT objects from the IOT dataset located within the boundaries of the POLYGON objects used to define country shapes in the Countries dataset: GEOSPATIAL_JOIN = """ SELECT c.geometry as country_geom, c.NAME_EN, a.geometry as iot_device_location, a.device_id FROM COUNTRIES_{0} c, IOT_GEO_DEVICES_{0} a WHERE ST_Contains(c.geometry, a.geometry) """.format(username) result = sedona.sql(GEOSPATIAL_JOIN) Run the Job with the CLI: cde job run --name geospatial_joins --executor-cores 2 --executor-memory "4g" Summary This article showcased a simple geospatial use case with Apache Sedona. You can easily run Sedona Spark jobs in Cloudera Data Engineering to run Geospatial Spark Jobs at Scale.
... View more
- « Previous
-
- 1
- 2
- Next »