03-27-2024
04:42 PM
CML Model Deployment with MLFlow and APIv2
Cloudera Machine Learning (CML) on Cloudera Data Platform (CDP) accelerates time-to-value by enabling data scientists to collaborate in a single unified platform that is all inclusive for powering any AI use case. Purpose-built for agile experimentation and production ML workflows, Cloudera Machine Learning manages everything from data preparation to MLOps to predictive reporting. CML exposes a REST API that you can use to perform operations related to projects, jobs, and runs. You can use API commands to integrate CML with third-party workflow tools or to control CML from the command line. In this example we will showcase how to use APIv2 to programmatically register an XGBoost experiment with MLFlow Tracking and the MLFlow Registry, and deploy it as a model endpoint in CML.
MLOps in CML
CML has extensive MLOps features and a set of model and lifecycle management capabilities that enable the repeatable, transparent, and governed approaches necessary for scaling model deployments and ML use cases. It's built to support open source standards and is fully integrated with CDP, enabling customers to integrate with existing and future tooling without being locked into a single vendor. CML enables enterprises to proactively monitor technical metrics such as service level agreement (SLA) adherence, uptime, and resource use, as well as prediction metrics including model distribution, drift, and skew, from a single governance interface. Users can set custom alerts and eliminate the model "black box" effect with native tools for visualizing model lineage, performance, and trends.
Some of the benefits of CML include:
Model cataloging and lineage capabilities that allow visibility into the entire ML lifecycle, eliminating silos and blind spots for full lifecycle transparency, explainability, and accountability.
Full end-to-end machine learning lifecycle management, including everything required to securely deploy machine learning models to production, ensure accuracy, and scale use cases.
An extensive model monitoring service designed to track and monitor both technical aspects and accuracy of predictions in a repeatable, secure, and scalable way.
New MLOps features for monitoring the functional and business performance of machine learning models, such as detecting model performance and drift over time with native storage and access to custom and arbitrary model metrics, and measuring and tracking individual prediction accuracy to ensure models are compliant and performing optimally.
The ability to track, manage, and understand large numbers of ML models deployed across the enterprise with model cataloging, full lifecycle lineage, and custom metadata in Apache Atlas.
The ability to view the lineage of data tied to the models built and deployed in a single system to help manage and govern the ML lifecycle.
Increased model security for Model REST endpoints, which allows models to be served in a CML production environment without compromising security.
Use Case
In this example we will create a basic MLOps pipeline to put a credit card fraud classifier into production. We will create a model prototype with XGBoost, register and manage experiments with MLFlow Tracking, and stage the best experiment run in the MLFlow Registry. Next, we will deploy the model from the Registry into an API endpoint, and finally redeploy it with additional resources for high availability and increased serving performance. The full code is available in this git repository.
Step by Step Guide
Setup
Create a CML Project with Python 3.9 / Workbench Editor Runtime.
Launch a CML Session and install requirements.
Open script "00_datagen.py" and update lines 140 and 141 with your Iceberg database name and Spark Data Connection name. Then run it.
Script 1: Create the Model Experiment
Run script "01_train_xgboost.py" in order to create an MLFlow Experiment.
Code highlights:
MLFlow is installed in CML by default. You must import mlflow in order to use it in your script.
The experiment run is determined by the MLFlow tracking run context. When this executes for the first time, an experiment is created. If the same code runs again without changing EXPERIMENT_NAME, a new experiment run is logged for the same experiment; otherwise, a new experiment is created.
You can log one or more parameters or metrics for the run with the mlflow.log_param() and mlflow.log_metric() methods.
Model artifacts such as useful metadata and dependencies are logged with the mlflow.log_model() method.
import mlflow
EXPERIMENT_NAME = "xgb-cc-fraud-{0}-{1}".format(USERNAME, DATE)
mlflow.set_experiment(EXPERIMENT_NAME)
with mlflow.start_run():
    model = XGBClassifier(use_label_encoder=False, eval_metric="logloss")
    model.fit(X_train, y_train, eval_set=[(X_test, y_test)], verbose=False)
    y_pred = model.predict(X_test)
    accuracy = accuracy_score(y_test, y_pred)
    # accuracy is stored as a run parameter here; mlflow.log_metric() could be used instead
    mlflow.log_param("accuracy", accuracy)
    mlflow.xgboost.log_model(model, artifact_path="artifacts")
You can use the mlflow library or instantiate an mlflow client to manage experiments. In this example we use the mlflow.get_experiment_by_name(), mlflow.search_runs(), and mlflow.get_run() methods, and we also instantiate the client to list artifacts for a specific run.
def getLatestExperimentInfo(experimentName):
"""
Method to capture the latest Experiment Id and Run ID for the provided experimentName
"""
experimentId = mlflow.get_experiment_by_name(experimentName).experiment_id
runsDf = mlflow.search_runs(experimentId, run_view_type=1)
experimentId = runsDf.iloc[-1]['experiment_id']
experimentRunId = runsDf.iloc[-1]['run_id']
return experimentId, experimentRunId
experimentId, experimentRunId = getLatestExperimentInfo(EXPERIMENT_NAME)
#Replace Experiment Run ID here:
run = mlflow.get_run(experimentRunId)
pd.DataFrame(data=[run.data.params], index=["Value"]).T
pd.DataFrame(data=[run.data.metrics], index=["Value"]).T
client = mlflow.tracking.MlflowClient()
client.list_artifacts(run_id=run.info.run_id)
Script 2: Register the Model Experiment
Run script "02_cml_api_endpoint.py" in order to register an MLFlow Experiment.
Code highlights:
CML APIv2 is installed in your workspace by default. You must import cmlapi in order to use it in your script.
The API provides about 100 Python methods for MLOps. In this example, we created a ModelRegistration class whose registerModelFromExperimentRun() method wraps the API's create_registered_model() method to register the model. Creating your own Python classes and methods to implement the API methods in the context of your project is highly recommended.
import cmlapi
from cmlapi.rest import ApiException
class ModelRegistration():
    """
    Class to manage the model deployment of the xgboost model
    """

    def __init__(self, username, experimentName):
        self.client = cmlapi.default_client()
        self.username = username
        self.experimentName = experimentName

    def registerModelFromExperimentRun(self, modelName, experimentId, experimentRunId, modelPath):
        """
        Method to register a model from an Experiment Run
        Input: requires an experiment run
        Output: api_response object
        """
        model_name = 'xgb-cc-' + self.username  # default naming convention; the modelName argument below is what is registered
        CreateRegisteredModelRequest = {
            "project_id": os.environ['CDSW_PROJECT_ID'],
            "experiment_id": experimentId,
            "run_id": experimentRunId,
            "model_name": modelName,
            "model_path": modelPath
        }
        try:
            # Register a model.
            api_response = self.client.create_registered_model(CreateRegisteredModelRequest)
            #pprint(api_response)
        except ApiException as e:
            print("Exception when calling CMLServiceApi->create_registered_model: %s\n" % e)
        return api_response
The response from the method request contains very useful information. In this example, registeredModelResponse includes the modelId and modelVersionId values, which are in turn used by other API methods.
modelReg = ModelRegistration(username, experimentName)
modelPath = "artifacts"
modelName = "FraudCLF-" + username
registeredModelResponse = modelReg.registerModelFromExperimentRun(modelName, experimentId, experimentRunId, modelPath)
modelId = registeredModelResponse.model_id
modelVersionId = registeredModelResponse.model_versions[0].model_version_id
Script 3: Deploy Endpoint from Registry Model
Run script "03_api_deployment.py" in order to create a Model Endpoint from the registered model.
Code highlights:
In this example we created a ModelDeployment class to manage multiple API wrapper methods.
The listRegisteredModels() method is a wrapper for the API's list_registered_models() method. Notice it is preconfigured to list models corresponding to your user and model name; in the context of a broader MLOps pipeline, these values can be parameterized. This method is necessary for obtaining the registeredModelId variable needed for model deployment.
The getRegisteredModel() method is a wrapper for the API's get_registered_model() method. This method is necessary for obtaining the modelVersionId variable needed for model deployment.
Once registeredModelId and modelVersionId are obtained, you can begin the deployment. The deployment consists of three phases: model creation, model build, and model deployment.
The model creation corresponds to the creation of an API Endpoint. Once you run this, you will see a new entry in the Model Deployments tab.
The model build corresponds to the creation of the model's container. Thanks to the MLFlow Registry, CML automatically packages all dependencies used to train the experiment into the model endpoint for you.
The model deployment corresponds to the activation of the model endpoint. This is when the container, with its associated resource profile and endpoint, is actually deployed so inference can start.
The listRuntimes() method is an example of querying the workspace for all available runtimes in order to select the most appropriate one for the model build.
class ModelDeployment():
"""
Class to manage the model deployment of the xgboost model
"""
def __init__(self, projectId, username):
self.client = cmlapi.default_client()
self.projectId = projectId
self.username = username
    def listRegisteredModels(self):
        """
        Method to retrieve registered models by the user
        """
        # Search filter is an optional HTTP parameter to filter results by.
        # Supported: search_filter = {"model_name": "model_name"}, search_filter = {"creator_id": "<sso name or user name>"}. (optional)
        search_filter = {"creator_id": self.username, "model_name": "FraudCLF-" + self.username}
        search = json.dumps(search_filter)
        page_size = 1000
        try:
            # List registered models.
            api_response = self.client.list_registered_models(search_filter=search, page_size=page_size)
            #pprint(api_response)
        except ApiException as e:
            print("Exception when calling CMLServiceApi->list_registered_models: %s\n" % e)
        return api_response
    def getRegisteredModel(self, modelId):
        """
        Method to return registered model metadata including model version id
        """
        search_filter = {"creator_id": self.username}
        search = json.dumps(search_filter)
        try:
            # Get a registered model.
            api_response = self.client.get_registered_model(modelId, search_filter=search, page_size=1000)
            #pprint(api_response)
        except ApiException as e:
            print("Exception when calling CMLServiceApi->get_registered_model: %s\n" % e)
        return api_response
    def createModel(self, projectId, modelName, registeredModelId, description="Fraud Detection 2024"):
        """
        Method to create a model
        """
        CreateModelRequest = {
            "project_id": projectId,
            "name": modelName,
            "description": description,
            "registered_model_id": registeredModelId,
            "disable_authentication": True
        }
        try:
            # Create a model.
            api_response = self.client.create_model(CreateModelRequest, projectId)
            pprint(api_response)
        except ApiException as e:
            print("Exception when calling CMLServiceApi->create_model: %s\n" % e)
        return api_response
    def createModelBuild(self, projectId, modelVersionId, modelCreationId, runtimeId):
        """
        Method to create a Model build
        """
        # Create Model Build
        CreateModelBuildRequest = {
            "registered_model_version_id": modelVersionId,
            "runtime_identifier": runtimeId,
            "comment": "invoking model build",
            "model_id": modelCreationId
        }
        try:
            # Create a model build.
            api_response = self.client.create_model_build(CreateModelBuildRequest, projectId, modelCreationId)
            #pprint(api_response)
        except ApiException as e:
            print("Exception when calling CMLServiceApi->create_model_build: %s\n" % e)
        return api_response
    def createModelDeployment(self, modelBuildId, projectId, modelCreationId):
        """
        Method to deploy a model build
        """
        CreateModelDeploymentRequest = {
            "cpu": "2",
            "memory": "4"
        }
        try:
            # Create a model deployment.
            api_response = self.client.create_model_deployment(CreateModelDeploymentRequest, projectId, modelCreationId, modelBuildId)
            pprint(api_response)
        except ApiException as e:
            print("Exception when calling CMLServiceApi->create_model_deployment: %s\n" % e)
        return api_response
    def listRuntimes(self):
        """
        Method to list available runtimes
        """
        # Search filter is an optional HTTP parameter to filter results by.
        # Supported search filter keys are: ["image_identifier", "editor", "kernel", "edition", "description", "full_version"].
        # For example: search_filter = {"kernel": "Python 3.7", "editor": "JupyterLab"}. (optional)
        search_filter = {"kernel": "Python 3.9", "edition": "Standard", "editor": "Workbench"}
        search = json.dumps(search_filter)
        try:
            # List the available runtimes, optionally filtered, sorted, and paginated.
            api_response = self.client.list_runtimes(search_filter=search, page_size=1000)
            #pprint(api_response)
        except ApiException as e:
            print("Exception when calling CMLServiceApi->list_runtimes: %s\n" % e)
        return api_response
Once you have created your model endpoint, give it a minute and then try a test request:
{"dataframe_split":
{"columns": ["age", "credit_card_balance", "bank_account_balance", "mortgage_balance", "sec_bank_account_balance", "savings_account_balance", "sec_savings_account_balance", "total_est_nworth", "primary_loan_balance", "secondary_loan_balance", "uni_loan_balance", "longitude", "latitude", "transaction_amount"],
"data":[[35.5, 20000.5, 3900.5, 14000.5, 2944.5, 3400.5, 12000.5, 29000.5, 1300.5, 15000.5, 10000.5, 2000.5, 90.5, 120.5]]}} Script 4: Endpoint Redeployment Run script "04_api_redeployment.py" in order to create a new model deployment with increased resources. Code highlights: A slightly different version of the ModelDeployment class is implemented. This includes the "get_latest_deployment_details()" as an example of creating a wrapper method to the API's "list_models()" and "list_model_deployments()" methods all in one. You can implement your own methods in a similar fashion as best needed in the context of your MLOps pipeline. Once the latest model deployment's metadata has been obtained in one go, a new model build is created with additional CPU, Memory and Replicas. Notice that in the process you also have the ability to switch to a different runtime as needed. deployment = ModelDeployment(projectId, username)
getLatestDeploymentResponse = deployment.get_latest_deployment_details(modelName)
listRuntimesResponse = deployment.listRuntimes()
listRuntimesResponse
runtimeId = 'docker.repository.cloudera.com/cloudera/cdsw/ml-runtime-workbench-python3.9-standard:2024.02.1-b4' # Copy a runtime ID from previous output
cpu = 2
mem = 4
replicas = 2
createModelBuildResponse = deployment.createModelBuild(projectId, modelVersionId, modelCreationId, runtimeId, cpu, mem, replicas)
modelBuildId = createModelBuildResponse.id
deployment.createModelDeployment(modelBuildId, projectId, modelCreationId)
Summary and Next Steps
Cloudera Machine Learning exposes a REST API that you can use to perform operations related to projects, jobs, and runs. You can use API commands to integrate CML with third-party workflow tools or to control CML from the command line. CML's API accelerates your data science projects by allowing you to build end-to-end pipelines programmatically. When coupled with CML MLFlow Tracking and MLFlow Registry, it can be used to manage models from inception to production.
APIv2 Documentation
APIv2 Examples
APIv2 AMP
Registering and Deploying Models with Model Registry
Securing Models
CML Projects
CML Runtimes
Introducing MLOps and SDX for Models in CML
Model Registry GA in CML
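As a closing aside, once the endpoint from Script 3 or Script 4 is live, you can also exercise it from outside CML, for example from a plain Python script. The sketch below uses the requests library; the endpoint URL and access key are placeholders for your own deployment, and depending on your CML version the payload may need to be wrapped in a "request" field as shown rather than posted as-is.
import requests

# Placeholder values: substitute your own workspace domain and the model's access key
MODEL_ENDPOINT = "https://modelservice.your-workspace-domain/model"
ACCESS_KEY = "your-model-access-key"

payload = {
    "dataframe_split": {
        "columns": ["age", "credit_card_balance", "bank_account_balance", "mortgage_balance",
                    "sec_bank_account_balance", "savings_account_balance", "sec_savings_account_balance",
                    "total_est_nworth", "primary_loan_balance", "secondary_loan_balance",
                    "uni_loan_balance", "longitude", "latitude", "transaction_amount"],
        "data": [[35.5, 20000.5, 3900.5, 14000.5, 2944.5, 3400.5, 12000.5, 29000.5,
                  1300.5, 15000.5, 10000.5, 2000.5, 90.5, 120.5]]
    }
}

# One common pattern: pass the access key alongside the request body
response = requests.post(MODEL_ENDPOINT, json={"accessKey": ACCESS_KEY, "request": payload})
print(response.status_code, response.json())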
02-28-2024
01:56 PM
Objective
CDE is the Cloudera Data Engineering Service, a containerized managed service for Cloudera Data Platform designed for Large Scale Batch and Streaming Pipelines with Spark, Airflow and Iceberg. It allows you to submit batch jobs to auto-scaling virtual clusters. As a Cloud-Native service, CDE enables you to spend more time on your applications, and less time on infrastructure. CDE allows you to create, manage, and schedule Apache Spark jobs without the overhead of creating and maintaining Spark clusters. With CDE, you define virtual clusters with a range of CPU and memory resources, and the cluster scales up and down as needed to run your Spark workloads, helping to control your cloud costs.
CDE Sessions are short-lived development environments used by Data Engineers and Analysts to iterate upon and build Spark workloads, prototype pipelines, or simply explore data. You can use CDE Sessions in CDE Virtual Clusters of type "All Purpose - Tier 2".
In this example we will use a CDE Session to perform basic cloud storage file system operations with the Hadoop API. The API was originally used to perform file system operations in HDFS, but it is still used to interact with S3 and ADLS in the Public Cloud.
Requirements
The only requirement for this exercise is a CDE Virtual Cluster in a CDE Service on version 1.19 or above. Version 1.20 or above is recommended. Copying the following commands into a CDE Session is recommended, although you can also run the same code in a CDE Job of type Spark.
Step by Step Instructions
Preload some data in a cloud storage location of your choice. Then, specify the path as a Python variable. In this example we use two locations to read and write data with AWS S3.
>>> readlocation = "s3a://go01-demo/mycsvfile.csv"
>>> writelocation = "s3a://go01-demo/newdir/myfile"
Read the file into a Spark Dataframe:
>>> df = spark.read.csv(readlocation)
Obtain the configured FileSystem implementation:
>>> myPath = "s3a://go01-demo/newdir"
>>> sc = df.rdd.context
>>> hadoop = sc._jvm.org.apache.hadoop
>>> hdfs = hadoop.fs.FileSystem.get(sc._jvm.java.net.URI.create(myPath), sc._jsc.hadoopConfiguration())
Write the dataframe as a file in cloud storage:
>>> df.write.mode("overwrite").parquet(writelocation)
List the contents of the directory:
>>> status = hdfs.listStatus(hadoop.fs.Path("s3a://go01-demo/newdir"))
>>> for fileStatus in status:
...     print(fileStatus.getPath())
s3a://go01-demo/newdir/myfile
List the contents of a directory with a filter pattern:
>>> status = hdfs.globStatus(hadoop.fs.Path("s3a://go01-demo/*.csv"))
>>> for fileStatus in status:
...     print(fileStatus.getPath())
s3a://go01-demo/test_file.csv
Rename a file:
>>> hdfs.rename(hadoop.fs.Path("s3a://go01-demo/test_file.csv"), hadoop.fs.Path("s3a://go01-demo/test_file_2.csv"))
True
>>> status = hdfs.globStatus(hadoop.fs.Path("s3a://go01-demo/*.csv"))
>>> for fileStatus in status:
...     print(fileStatus.getPath())
s3a://go01-demo/test_file_2.csv
Delete a file:
>>> hdfs.delete(hadoop.fs.Path("s3a://go01-demo/test_file_2.csv"), True)
True
>>> status = hdfs.globStatus(hadoop.fs.Path("s3a://go01-demo/*.csv"))
>>> for fileStatus in status:
...     print(fileStatus.getPath())
Create a subdirectory:
>>> hdfs.mkdirs(hadoop.fs.Path("s3a://go01-demo/newdir/newsubdir"))
True
References
The full API method list is located at this link.
Summary and Next Steps
Cloudera Data Engineering (CDE) provides a command line interface (CLI) client. You can use the CLI to create and update jobs, view job details, manage job resources, run jobs, and so on. In this article we reviewed how to perform basic cloud storage file system operations with the Hadoop API from a CDE Session. If you are using the CDE CLI you might also find the following articles and demos interesting:
Installing the CDE CLI
Simple Introduction to the CDE CLI
CDE CLI Demo
CDE Concepts
CDE CLI Command Reference
CDE CLI Spark Flag Reference
CDE CLI Airflow Flag Reference
CDE CLI list command syntax reference
CDE Jobs API Reference
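As a final aside, the glob-listing pattern used in the steps above can be wrapped in a small helper so it is easier to reuse within a Session. A minimal sketch, reusing the hdfs and hadoop handles created earlier:
def list_paths(hdfs, hadoop, pattern):
    # Return the paths matching a glob pattern as plain strings
    statuses = hdfs.globStatus(hadoop.fs.Path(pattern)) or []
    return [str(s.getPath()) for s in statuses]

# Example, using the same bucket as above
print(list_paths(hdfs, hadoop, "s3a://go01-demo/*.csv"))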
12-13-2023
06:02 PM
CDE is the Cloudera Data Engineering Service, a containerized managed service for the Cloudera Data Platform designed for Large Scale Batch Pipelines with Spark, Airflow, and Iceberg. It allows you to submit batch jobs to auto-scaling virtual clusters. As a Cloud-Native service, CDE enables you to spend more time on your applications and less time on infrastructure.
CDE allows you to create, manage, and schedule Apache Spark jobs without the overhead of creating and maintaining Spark clusters. With CDE, you define virtual clusters with a range of CPU and memory resources, and the cluster scales up and down as needed to run your Spark workloads, helping to control your cloud costs.
Cloudera Data Engineering (CDE) provides a command-line interface (CLI) client. The CDE CLI not only allows you to create CDE Spark Jobs with syntax that is nearly identical to that of a Spark Submit;
it also offers invaluable advantages as part of your daily Spark pipeline development and management activities, including enhanced observability, reusability, and overall organization of Spark dependencies and configurations.
Simply put, if you're creating and managing many Spark Submits, the CDE CLI will save you time and dramatically increase your productivity.
In this article, you will learn about the different options for parameterizing a CDE Spark Job, i.e. how to pass Spark configurations, options, files, and arguments.
Requirements
The following are required in order to reproduce these commands in your CDE environment:
A CDE Service on version 1.19.0 or above.
A working installation of the CDE CLI. Please follow these instructions to install the CLI.
CDE Spark Jobs and Spark Configurations
The CDE Spark Job is a reusable definition of a Spark Application, consisting of its application code; its file, Docker, and Python dependencies; and its Spark configurations and options.
Once a CDE Spark Job is created, its definition is accessible in the CDE UI or via the CDE CLI.
cde job create --name myJob\
--type Spark\
--application-file myScript.py\
--mount-1-resource myFilesResource
Importantly, creating the job does not run it. The application is actually executed with a separate command.
cde job run --name myJob
Generally, parameterizing a CDE Spark Job allows you to dynamically set important configurations at runtime, i.e. override default parameters based on changing external input.
Before moving on, let us review the different types of parameters that can be set in a CDE Spark Job:
1. Spark Application File
This is the PySpark script or Jar containing the Application code. In CDE you must use the --application-file flag and you can only set this at CDE Spark Job creation time.
Examples:
cde job create --name myPySparkJob\
--type spark\
--application-file myScript.py\
--mount-1-resource myFilesResource
cde job create --name myScalaJob
--type spark
--application-file myJar.jar
--mount-1-resource myFilesResource
The files can only be found by CDE if they are present in a Files resource. As shown above, the Files resource is then set with the --mount-N-resource flag. More on that in the end to end example section.
2. Spark Resources
These include basic job resources such as executor and driver memory and cores, initial number of executors, min and max executors for Spark Dynamic Allocation.
They can be set both at CDE Spark Job creation as well as runtime. Each of these has a default value which is chosen unless specified by your CLI command.
Examples:
cde job create --name myPySparkJob\
--type spark\
--application-file myScript.py\
--mount-1-resource myFilesResource\
--executor-cores 4\
--executor-memory "2g" cde job run --name myPySparkJob
--executor-cores 4
--driver-cores 2
The above job will run with the following resource configurations:
--executor-cores 4 --executor-memory "2g" and --driver-cores 2
3. Spark Configurations
Spark offers a wide range of configurations ranging from Python version to memory options, join and dynamic allocation thresholds, and much more.
These can be set via the --conf flag, e.g. --conf spark.executor.memoryOverhead=6g, --conf spark.pyspark.python=python3, or --conf spark.yarn.maxAppAttempts=1.
They can also be overridden at runtime.
Examples:
cde job create --name myPySparkJob\
--type spark\
--application-file myScript.py\
--mount-1-resource myFilesResource\
--conf spark.executor.memoryOverhead=6g
cde job run --name myPySparkJob\
--conf spark.executor.memoryOverhead=2g
In the above example, the "memoryOverhead" setting is overridden to "2g".
4. Command Line Arguments
These are specific variable inputs that lend themselves particularly well to being overridden at runtime.
They are defined with the --arg flag in the CDE Spark Job definition and are read in the Spark application code via Python's sys.argv list.
For example, a CDE Spark Job created with --arg 10 makes the value "10" available to the Spark application code.
Example CDE CLI command to create the CDE Spark Job:
cde job create --name myPySparkJob\
--type spark\
--application-file myScript.py\
--mount-1-resource myFilesResource\
--conf spark.executor.memoryOverhead=6g\
--arg 10
Example CDE Spark Job application code referencing the argument:
import sys
from pyspark.sql import SparkSession
spark = SparkSession\
.builder\
.appName("CLIArguments")\
.getOrCreate()
print("JOB ARGUMENT")
print(sys.argv[1])
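Because sys.argv is positional, it can also help to guard against a missing argument and fall back to a default. A minimal sketch along the lines of the script above (the default value is illustrative):
import sys

# Use the first CLI argument if present, otherwise fall back to a default
job_arg = sys.argv[1] if len(sys.argv) > 1 else "10"
print("JOB ARGUMENT:", job_arg)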
You can override CLI arguments at runtime. The following example sets the input to 5 rather than 10:
cde job run --name myPySparkJob --arg 5
5. Files
These can be set via the --file or --py-files options and allow you to specify file dependencies at CDE Spark Job creation.
The functionality provided by these flags is enhanced by CDE Files Resources, and using Files Resources instead is generally recommended.
CDE Files Resources are unique to CDE and allow you to pass and manage entire files as CDE Spark Job dependencies.
You can reference these files within Spark Application code via the /app/mount prefix.
Example:
cde job create --name penv-udf-test --type spark \
--mount-1-prefix appFiles/ \
--mount-1-resource cde-cli-files-resource \
--python-env-resource-name cde-cli-penv-resource \
--application-file /app/mount/appFiles/deps_udf_dependency.py \
--conf spark.executorEnv.PYTHONPATH=/app/mount/appFiles/deps_udf.zip \
--py-file /app/mount/appFiles/deps_udf.zip
Here, the "--py-file" flag is used to set in order to distribute dependencies that have been compressed and then loaded in a CDE Files Resource.
Notice that unlike in a Spark Submit, you cannot use --file to specify the application code and you must use the --application-file flag as shown in example 1.
CDE Files Resources are mounted on the Spark Application pod at CDE Job runtime. Thanks to this, you can also reference a file from within your Spark Application code by using the ConfigParser module and referencing the "/app/mount" directory.
Example properties file "parameters.conf" that has been loaded in the CDE Files Resource referenced by the CDE Spark Job:
[general]
property_1: s3a://go01-demo/
property_2: datalake/
Example PySpark application code referencing the "parameters.conf" file:
from pyspark.sql import SparkSession
import configparser
config = configparser.ConfigParser()
config.read('/app/mount/parameters.conf')
data_lake_name=config.get("general","property_1")
file_path=config.get("general","property_2")
spark = SparkSession.\
builder.\
appName('INGEST').\
config("spark.kubernetes.access.hadoopFileSystems", data_lake_name).getOrCreate()
cloudPath=data_lake_name+file_path
myDf = spark.read.csv(cloudPath + "/myCsv.csv", header=True, inferSchema=True)
myDf.show()
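One practical refinement to the pattern above: configparser's read() method returns the list of files it successfully parsed and silently returns an empty list if the path is wrong, so you may want to verify that the mounted properties file was actually found before using it. A minimal sketch, assuming the same /app/mount/parameters.conf path:
import configparser

CONF_PATH = "/app/mount/parameters.conf"
config = configparser.ConfigParser()

# config.read() returns the list of files it managed to parse
if not config.read(CONF_PATH):
    raise FileNotFoundError("Properties file not found at {}; check the CDE Files Resource mount".format(CONF_PATH))

data_lake_name = config.get("general", "property_1")
file_path = config.get("general", "property_2")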
End to End CDE Spark Job Workflow Example
Now that you have seen the different options, let's use them in a simplified CDE Spark Job creation workflow that shows how to implement the above:
Create a CDE Files Resource in order to host all Application code and file dependencies.
Upload Application code and file dependencies to the CDE Files Resource in order to make those accessible to CDE Spark Jobs in the future.
Create the CDE Spark Job by referencing the application code, file dependencies, and other Spark configurations.
Run the CDE Spark Job, either with no additional configurations or by overriding configurations at runtime, in order to execute your Spark application.
1. Create a CDE Files Resource
cde resource create --name myProperties
2. Upload Application Code and File Dependencies
cde resource upload --name myProperties\
--local-path cde_jobs/propertiesFile_1.conf\
--local-path cde_jobs/propertiesFile_2.conf\
--local-path cde_jobs/sparkJob.py
3. Create CDE Spark Job
cde job create --name myPySparkJob\
--type spark\
--application-file sparkJob.py\
--mount-1-resource myProperties\
--executor-cores 2\
--executor-memory "2g"
4. Run the CDE Spark Job with Different Options
Example 1: Run the job with two CLI arguments and read properties file 1
cde job run --name myPySparkJob\
--arg MY_DB\
--arg CUSTOMER_TABLE\
--arg propertiesFile_1.conf
Example 2: Run the job with two CLI arguments and read properties file 2
cde job run --name myPySparkJob\
--arg MY_DB\
--arg SALES_TABLE\
--arg propertiesFile_2.conf
Application code in sparkJob.py:
import sys
from pyspark.sql import SparkSession
import configparser
spark = SparkSession\
.builder\
.appName("PythonSQL")\
.getOrCreate()
print("JOB ARGUMENTS")
print(sys.argv)
print(sys.argv[0])
print(sys.argv[1])
print(sys.argv[2])
print(sys.argv[3])
dbname = sys.argv[1]
tablename = sys.argv[2]
config = configparser.ConfigParser()
config.read('/app/mount/{}'.format(sys.argv[3]))  # the third CLI argument selects which properties file to load
property_1=config.get("general","property_1")
property_2=config.get("general","property_2")
def myFunction(dbname, tablename, property_1, property_2):
    print("DBNAME\n")
    print(dbname)
    print("TABLENAME\n")
    print(tablename)
    print("PROPERTY1\n")
    print(property_1)
    print("PROPERTY2\n")
    print(property_2)
    print("COMPLETE!\n")
# Print the database, table, and property values passed to the job
myFunction(dbname, tablename, property_1, property_2)
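If a job takes many arguments, positional sys.argv indexing can become error-prone. One alternative worth considering is argparse with named flags; the sketch below is illustrative and the flag names are hypothetical rather than part of the job above:
import argparse

parser = argparse.ArgumentParser(description="Example CDE Spark Job arguments")
parser.add_argument("--dbname", required=True, help="Target database name")
parser.add_argument("--tablename", required=True, help="Target table name")
parser.add_argument("--properties-file", default="parameters.conf", help="Properties file name in the Files Resource")
args = parser.parse_args()

print(args.dbname, args.tablename, args.properties_file)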
Summary and Next Steps
Cloudera Data Engineering (CDE) provides a command line interface (CLI) client. You can use the CLI to create and update jobs, view job details, manage job resources, run jobs, and more.
In this article we have reviewed some advanced use cases for the CLI. If you are using the CDE CLI you might also find the following articles and demos interesting:
Installing the CDE CLI
Simple Introduction to the CDE CLI
CDE CLI Demo
CDE Concepts
CDE CLI Command Reference
CDE CLI Spark Flag Reference
CDE CLI Airflow Flag Reference
CDE CLI list command syntax reference
CDE Jobs API Reference
11-29-2023
04:52 PM
Cloudera Data Engineering
CDE is the Cloudera Data Engineering Service, a containerized managed service for Cloudera Data Platform designed for Large Scale Batch Pipelines with Spark, Airflow and Iceberg. It allows you to submit batch jobs to auto-scaling virtual clusters. As a Cloud-Native service, CDE enables you to spend more time on your applications, and less time on infrastructure. CDE allows you to create, manage, and schedule Apache Spark jobs without the overhead of creating and maintaining Spark clusters. With CDE, you define virtual clusters with a range of CPU and memory resources, and the cluster scales up and down as needed to run your Spark workloads, helping to control your cloud costs. Cloudera Data Engineering (CDE) provides a command line interface (CLI) client. You can use the CLI to create and update jobs, view job details, manage job resources, run jobs, and more.
Requirements
The following are required in order to reproduce these commands in your CDE environment:
A CDE Service on version 1.19.0 or above.
A working installation of the CDE CLI. Please follow these instructions to install the CLI.
Steps
Clone this git repository and run the following commands in the terminal with the CLI.
You can easily list all jobs and job runs:
% cde job list
% cde run list
However, sifting through the output is often impractical if you have a large number of jobs or runs in your Virtual Cluster. Therefore, using filters can be very important.
Setup
Prior to running the filtering commands, you must set up some jobs and related dependencies. Run the following commands in bulk. To learn more about these, please visit this Cloudera Community Article.
% cde resource create --name myScripts \
--type files
% cde resource upload --name myScripts \
--local-path cde_jobs/spark_geospatial.py \
--local-path cde_jobs/utils.py
% cde resource describe --name myScripts
% cde resource create --name myData \
--type files
% cde resource upload-archive --name myData \
--local-path data/ne_50m_admin_0_countries_lakes.zip
cde credential create --name my-docker-creds \
--type docker-basic \
--docker-server hub.docker.com
cde resource create --name dex-spark-runtime-sedona-geospatial \
--image pauldefusco/dex-spark-runtime-3.2.3-7.2.15.8:1.20.0-b15-sedona-geospatial-003 \
--image-engine spark3 \
--type custom-runtime-image
cde job create --name geospatialRdd \
--type spark \
--mount-1-prefix code/ --mount-1-resource myScripts \
--mount-2-prefix data/ --mount-2-resource myData \
--runtime-image-resource-name dex-spark-runtime-sedona-geospatial \
--packages org.apache.sedona:sedona-spark-shaded-3.0_2.12:1.5.0,org.datasyslab:geotools-wrapper:1.5.0-28.2 \
--application-file code/spark_geospatial.py \
--arg myArg \
--max-executors 4 \
--min-executors 2 \
--executor-cores 2
cde job run --name geospatialRdd --executor-cores 4
Monitoring Examples
Filter all jobs by name, where name equals "geospatialRdd":
% cde job list --filter 'name[eq]geospatialRdd'
[
{
"name": "geospatialRdd",
"type": "spark",
"created": "2023-11-29T00:59:11Z",
"modified": "2023-11-29T00:59:11Z",
"retentionPolicy": "keep_indefinitely",
"mounts": [
{
"dirPrefix": "data/",
"resourceName": "myData"
},
{
"dirPrefix": "code/",
"resourceName": "myScripts"
}
],
"spark": {
"file": "code/spark_geospatial.py",
"args": [
"myArg"
],
"driverMemory": "1g",
"driverCores": 1,
"executorMemory": "1g",
"executorCores": 2,
"conf": {
"spark.dynamicAllocation.maxExecutors": "4",
"spark.dynamicAllocation.minExecutors": "2",
"spark.jars.packages": "org.apache.sedona:sedona-spark-shaded-3.0_2.12:1.5.0,org.datasyslab:geotools-wrapper:1.5.0-28.2"
}
},
"schedule": {
"enabled": false,
"user": "pauldefusco"
},
"runtimeImageResourceName": "dex-spark-runtime-sedona-geospatial"
}
]
You can also filter on nested fields. For example, filter all jobs where the job application file equals "code/spark_geospatial.py":
% cde job list --filter 'spark.file[eq]code/spark_geospatial.py'
[
{
"name": "geospatialRdd",
"type": "spark",
"created": "2023-11-29T00:59:11Z",
"modified": "2023-11-29T00:59:11Z",
"retentionPolicy": "keep_indefinitely",
"mounts": [
{
"dirPrefix": "data/",
"resourceName": "myData"
},
{
"dirPrefix": "code/",
"resourceName": "myScripts"
}
],
"spark": {
"file": "code/spark_geospatial.py",
"args": [
"myArg"
],
"driverMemory": "1g",
"driverCores": 1,
"executorMemory": "1g",
"executorCores": 2,
"conf": {
"spark.dynamicAllocation.maxExecutors": "4",
"spark.dynamicAllocation.minExecutors": "2",
"spark.jars.packages": "org.apache.sedona:sedona-spark-shaded-3.0_2.12:1.5.0,org.datasyslab:geotools-wrapper:1.5.0-28.2"
}
},
"schedule": {
"enabled": false,
"user": "pauldefusco"
},
"runtimeImageResourceName": "dex-spark-runtime-sedona-geospatial"
}
]
You can use different operators. For example, search all jobs whose name contains "spark":
% cde job list --filter 'name[rlike]spark'
[
{
"name": "sparkxml",
"type": "spark",
"created": "2023-11-23T07:11:41Z",
"modified": "2023-11-23T07:39:32Z",
"retentionPolicy": "keep_indefinitely",
"mounts": [
{
"dirPrefix": "/",
"resourceName": "files"
}
],
"spark": {
"file": "read_xml.py",
"driverMemory": "1g",
"driverCores": 1,
"executorMemory": "1g",
"executorCores": 1,
"conf": {
"dex.safariEnabled": "false",
"spark.jars.packages": "com.databricks:spark-xml_2.12:0.16.0",
"spark.pyspark.python": "python3"
},
"logLevel": "INFO"
},
"schedule": {
"enabled": false,
"user": "pauldefusco"
}
}
]
Search all jobs created on or after 11/23/23:
% cde job list --filter 'created[gte]2023-11-23'
API User Password: [
{
"name": "asdfsdfsdfsdf",
"type": "airflow",
"created": "2023-11-23T06:56:45Z",
"modified": "2023-11-23T06:56:45Z",
"retentionPolicy": "keep_indefinitely",
"mounts": [
{
"resourceName": "PipelineResource-asdfsdfsdfsdf-1700722602468"
}
],
"airflow": {
"dagID": "asdfsdfsdfsdf",
"dagFile": "dag.py"
},
"schedule": {
"enabled": false,
"user": "dschoberle",
"start": "Thu, 23 Nov 2023 06:56:44 GMT",
"catchup": true
}
},
{
"name": "geospatialRdd",
"type": "spark",
"created": "2023-11-29T00:59:11Z",
"modified": "2023-11-29T00:59:11Z",
"retentionPolicy": "keep_indefinitely",
"mounts": [
{
"dirPrefix": "data/",
"resourceName": "myData"
},
{
"dirPrefix": "code/",
"resourceName": "myScripts"
}
],
"spark": {
"file": "code/spark_geospatial.py",
"args": [
"myArg"
],
"driverMemory": "1g",
"driverCores": 1,
"executorMemory": "1g",
"executorCores": 2,
"conf": {
"spark.dynamicAllocation.maxExecutors": "4",
"spark.dynamicAllocation.minExecutors": "2",
"spark.jars.packages": "org.apache.sedona:sedona-spark-shaded-3.0_2.12:1.5.0,org.datasyslab:geotools-wrapper:1.5.0-28.2"
}
},
"schedule": {
"enabled": false,
"user": "pauldefusco"
},
"runtimeImageResourceName": "dex-spark-runtime-sedona-geospatial"
}
]
Search all jobs with executorCores less than 2:
% cde job list --filter 'spark.executorCores[lt]2'
API User Password: [
{
"name": "CDEPY_SPARK_JOB",
"type": "spark",
"created": "2023-11-14T23:02:48Z",
"modified": "2023-11-14T23:02:48Z",
"retentionPolicy": "keep_indefinitely",
"mounts": [
{
"resourceName": "CDEPY_DEMO"
}
],
"spark": {
"file": "pysparksql.py",
"driverMemory": "1g",
"driverCores": 1,
"executorMemory": "4g",
"executorCores": 1,
"conf": {
"spark.pyspark.python": "python3"
},
"logLevel": "INFO"
},
"schedule": {
"enabled": false,
"user": "pauldefusco"
}
},
{
"name": "CDEPY_SPARK_JOB_APAC",
"type": "spark",
"created": "2023-11-15T03:33:36Z",
"modified": "2023-11-15T03:33:36Z",
"retentionPolicy": "keep_indefinitely",
"mounts": [
{
"resourceName": "CDEPY_DEMO_APAC"
}
],
"spark": {
"file": "pysparksql.py",
"driverMemory": "1g",
"driverCores": 1,
"executorMemory": "4g",
"executorCores": 1,
"conf": {
"spark.pyspark.python": "python3"
},
"logLevel": "INFO"
},
"schedule": {
"enabled": false,
"user": "pauldefusco"
}
},
]
List all runs for job "geospatialRdd":
% cde run list --filter 'job[eq]geospatialRdd'
[
{
"id": 21815,
"job": "geospatialRdd",
"type": "spark",
"status": "failed",
"user": "pauldefusco",
"started": "2023-11-29T00:32:02Z",
"ended": "2023-11-29T00:32:36Z",
"mounts": [
{
"dirPrefix": "data/",
"resourceName": "myData"
},
{
"dirPrefix": "code/",
"resourceName": "myScripts"
}
],
"runtimeImageResourceName": "dex-spark-runtime-sedona-geospatial",
"spark": {
"sparkAppID": "spark-f542530da24f485da4993338dca81d3c",
"sparkAppURL": "https://58kqsms2.cde-g6hpr9f8.go01-dem.ylcu-atmi.cloudera.site/hs/history/spark-f542530da24f485da4993338dca81d3c/jobs/",
"spec": {
"file": "code/geospatial_rdd.py",
"args": [
"myArg"
],
"driverMemory": "1g",
"driverCores": 1,
"executorMemory": "1g",
"executorCores": 4,
"conf": {
"spark.dynamicAllocation.maxExecutors": "4",
"spark.dynamicAllocation.minExecutors": "2",
"spark.jars.packages": "org.apache.sedona:sedona-spark-shaded-3.0_2.12:1.5.0,org.datasyslab:geotools-wrapper:1.5.0-28.2"
}
}
},
"identity": {
"disableRoleProxy": true,
"role": "instance"
}
},
{
"id": 21825,
"job": "geospatialRdd",
"type": "spark",
"status": "failed",
"user": "pauldefusco",
"started": "2023-11-29T00:48:29Z",
"ended": "2023-11-29T00:49:01Z",
"mounts": [
{
"dirPrefix": "data/",
"resourceName": "myData"
},
{
"dirPrefix": "code/",
"resourceName": "myScripts"
}
],
"runtimeImageResourceName": "dex-spark-runtime-sedona-geospatial",
"spark": {
"sparkAppID": "spark-e5460856fb3a459ba7ee2c748c802d07",
"sparkAppURL": "https://58kqsms2.cde-g6hpr9f8.go01-dem.ylcu-atmi.cloudera.site/hs/history/spark-e5460856fb3a459ba7ee2c748c802d07/jobs/",
"spec": {
"file": "myScripts/geospatial_rdd.py",
"args": [
"myArg"
],
"driverMemory": "1g",
"driverCores": 1,
"executorMemory": "1g",
"executorCores": 2,
"conf": {
"spark.dynamicAllocation.maxExecutors": "4",
"spark.dynamicAllocation.minExecutors": "2",
"spark.jars.packages": "org.apache.sedona:sedona-spark-shaded-3.0_2.12:1.5.0,org.datasyslab:geotools-wrapper:1.5.0-28.2"
}
}
},
"identity": {
"disableRoleProxy": true,
"role": "instance"
}
}
]
You can combine multiple filters. Return all job runs from today (11/29/23), i.e. where the start date is greater than or equal to 11/29 and the end date is less than or equal to 11/30. Notice that all times default to the +00:00 UTC timezone.
% cde run list --filter 'started[gte]2023-11-29' --filter 'ended[lte]2023-11-30'
[
{
"id": 21907,
"job": "ge_data_quality-pauldefusco-banking",
"type": "spark",
"status": "succeeded",
"user": "pauldefusco",
"started": "2023-11-29T02:56:44Z",
"ended": "2023-11-29T02:57:46Z",
"mounts": [
{
"dirPrefix": "/",
"resourceName": "cde_demo_files-pauldefusco-banking"
}
],
"runtimeImageResourceName": "dex-spark-runtime-ge-data-quality-pauldefusco-banking",
"spark": {
"sparkAppID": "spark-8f9d7999056f4b53a01cc2afc5304cca",
"sparkAppURL": "https://58kqsms2.cde-g6hpr9f8.go01-dem.ylcu-atmi.cloudera.site/hs/history/spark-8f9d7999056f4b53a01cc2afc5304cca/jobs/",
"spec": {
"file": "ge_data_quality.py",
"driverMemory": "1g",
"driverCores": 1,
"executorMemory": "1g",
"executorCores": 1
}
},
"identity": {
"disableRoleProxy": true,
"role": "instance"
}
},
{
"id": 21908,
"job": "data_quality_orchestration-pauldefusco-banking",
"type": "airflow",
"status": "running",
"user": "pauldefusco",
"started": "2023-11-29T03:00:01Z",
"ended": "0001-01-01T00:00:00Z",
"airflow": {
"dagID": "CDE_Demo_pauldefusco-banking",
"dagRunID": "scheduled__2023-11-29T02:55:00+00:00",
"dagFile": "airflow.py",
"executionDate": "2023-11-29T02:55:00Z"
}
},
{
"id": 21909,
"job": "batch_load-pauldefusco-banking",
"type": "spark",
"status": "running",
"user": "pauldefusco",
"started": "2023-11-29T03:00:14Z",
"ended": "0001-01-01T00:00:00Z",
"mounts": [
{
"dirPrefix": "jobCode/",
"resourceName": "cde_demo_files-pauldefusco-banking"
}
],
"runtimeImageResourceName": "dex-spark-runtime-ge-data-quality-pauldefusco-banking",
"spark": {
"sparkAppID": "spark-3d8a4704418841929d325af0e0190a20",
"sparkAppURL": "https://58kqsms2.cde-g6hpr9f8.go01-dem.ylcu-atmi.cloudera.site/livy-batch-14907-dyL7LLeM",
"spec": {
"file": "jobCode/batch_load.py",
"driverMemory": "1g",
"driverCores": 1,
"executorMemory": "1g",
"executorCores": 1
}
},
"identity": {
"disableRoleProxy": true,
"role": "instance"
}
}
]
List all successful Airflow jobs created by user pauldefusco that started after 3 am UTC on 11/29/23:
% cde run list --filter 'type[eq]airflow' --filter 'status[eq]succeeded' --filter 'user[eq]pauldefusco' --filter 'started[gte]2023-11-29T03'
[
{
"id": 21908,
"job": "data_quality_orchestration-pauldefusco-banking",
"type": "airflow",
"status": "succeeded",
"user": "pauldefusco",
"started": "2023-11-29T03:00:01Z",
"ended": "2023-11-29T03:03:01Z",
"airflow": {
"dagID": "CDE_Demo_pauldefusco-banking",
"dagRunID": "scheduled__2023-11-29T02:55:00+00:00",
"dagFile": "airflow.py",
"executionDate": "2023-11-29T02:55:00Z"
}
}
]
Listing all CDE Resources returns all resource types ("python-env", "files", "custom-runtime-image"):
% cde resource list
[
{
"name": "BankingPyEnv",
"type": "python-env",
"status": "pending-build",
"created": "2023-11-07T21:27:16Z",
"modified": "2023-11-07T21:27:16Z",
"retentionPolicy": "keep_indefinitely",
"pythonEnv": {
"pythonVersion": "python3",
"type": "python-env"
}
},
{
"name": "CDEPY_DEMO_APAC",
"type": "files",
"status": "ready",
"signature": "5d216f3c4a10578ffadba415b13022d9e383bc22",
"created": "2023-11-15T03:33:36Z",
"modified": "2023-11-15T03:33:36Z",
"retentionPolicy": "keep_indefinitely"
},
{
"name": "dex-spark-runtime-sedona-geospatial",
"type": "custom-runtime-image",
"status": "ready",
"created": "2023-11-28T23:51:11Z",
"modified": "2023-11-28T23:51:11Z",
"retentionPolicy": "keep_indefinitely",
"customRuntimeImage": {
"engine": "spark3",
"image": "pauldefusco/dex-spark-runtime-3.2.3-7.2.15.8:1.20.0-b15-sedona-geospatial-003"
}
}
]
List all CDE Resources named "myScripts":
% cde resource list --filter 'name[eq]myScripts'
[
{
"name": "myScripts",
"type": "files",
"status": "ready",
"signature": "17f820aacdad9bbd17a24d78a5b93cd0ec9e467b",
"created": "2023-11-28T23:31:31Z",
"modified": "2023-11-29T01:48:12Z",
"retentionPolicy": "keep_indefinitely"
}
]
List all CDE Resources of type Python Environment:
% cde resource list --filter 'type[eq]python-env'
[
{
"name": "BankingPyEnv",
"type": "python-env",
"status": "pending-build",
"created": "2023-11-07T21:27:16Z",
"modified": "2023-11-07T21:27:16Z",
"retentionPolicy": "keep_indefinitely",
"pythonEnv": {
"pythonVersion": "python3",
"type": "python-env"
}
}
]
Summary and Next Steps
Cloudera Data Engineering (CDE) provides a command line interface (CLI) client. You can use the CLI to create and update jobs, view job details, manage job resources, run jobs, and so on. In this article we have reviewed some advanced filtering and monitoring use cases for the CLI. If you are using the CDE CLI you might also find the following articles and demos interesting:
Installing the CDE CLI
Simple Introduction to the CDE CLI
CDE CLI Demo
CDE Concepts
CDE CLI Command Reference
CDE CLI Spark Flag Reference
CDE CLI Airflow Flag Reference
CDE CLI list command syntax reference
CDE Jobs API Reference
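As a closing tip, the JSON that these commands print can also be consumed programmatically, for example by invoking the CLI from a Python script and parsing its output. A minimal sketch, assuming the CLI is on your PATH and configured for non-interactive authentication:
import json
import subprocess

# Run the CDE CLI with a filter (same syntax as above) and capture its JSON output
result = subprocess.run(
    ["cde", "run", "list", "--filter", "status[eq]failed"],
    capture_output=True, text=True, check=True
)

failed_runs = json.loads(result.stdout)
for run in failed_runs:
    print(run["id"], run["job"], run["started"])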
11-29-2023
04:42 PM
Cloudera Data Engineering
CDE is the Cloudera Data Engineering Service, a containerized managed service for Cloudera Data Platform designed for Large Scale Batch Pipelines with Spark, Airflow and Iceberg. It allows you to submit batch jobs to auto-scaling virtual clusters. As a Cloud-Native service, CDE enables you to spend more time on your applications, and less time on infrastructure. CDE allows you to create, manage, and schedule Apache Spark jobs without the overhead of creating and maintaining Spark clusters. With CDE, you define virtual clusters with a range of CPU and memory resources, and the cluster scales up and down as needed to run your Spark workloads, helping to control your cloud costs. Cloudera Data Engineering (CDE) provides a command line interface (CLI) client. You can use the CLI to create and update jobs, view job details, manage job resources, run jobs, and more.
Apache Iceberg
Apache Iceberg is a cloud-native, high-performance open table format for organizing petabyte-scale analytic datasets on a file system or object store. Combined with Cloudera Data Platform (CDP), users can build an open data lakehouse architecture for multi-function analytics and deploy large scale end-to-end pipelines. The open data lakehouse on CDP simplifies advanced analytics on all data with a unified platform for structured and unstructured data and integrated data services to enable any analytics use case, from ML and BI to stream analytics and real-time analytics. Apache Iceberg is the secret sauce of the open lakehouse.
CDE Sessions
A Cloudera Data Engineering (CDE) Session is an interactive, short-lived development environment for running Spark commands to help you iterate upon and build your Spark workloads. You can use CDE Sessions in CDE Virtual Clusters of type "All Purpose - Tier 2". The following commands illustrate a basic Iceberg Time Travel example.
Requirements
The following are required in order to reproduce these commands in your CDE environment:
A CDE Service on version 1.19.0 or above.
A working installation of the CDE CLI. Please follow these instructions to install the CLI.
Steps
Clone this git repository and run the following commands in your terminal.
Create the Session:
% cde session create --name interactiveSession \
--type pyspark \
--executor-cores 2 \
--executor-memory "2g"
{
"name": "interactiveSession",
"creator": "pauldefusco",
"created": "2023-11-28T22:00:47Z",
"type": "pyspark",
"lastStateUpdated": "2023-11-28T22:00:47Z",
"state": "starting",
"interactiveSpark": {
"id": 5,
"driverCores": 1,
"executorCores": 2,
"driverMemory": "1g",
"executorMemory": "2g",
"numExecutors": 1
}
} Show session metadata: % cde session describe --name interactiveSession
{
"name": "interactiveSession",
"creator": "pauldefusco",
"created": "2023-11-28T22:00:47Z",
"type": "pyspark",
"lastStateUpdated": "2023-11-28T22:01:16Z",
"state": "available",
"interactiveSpark": {
"id": 5,
"appId": "spark-3fe3bd8905a04eef8805e6b973ec4289",
"driverCores": 1,
"executorCores": 2,
"driverMemory": "1g",
"executorMemory": "2g",
"numExecutors": 1
}
} Interact via the PySpark Shell from your terminal (the session is running in CDE): % cde session interact --name interactiveSession
Starting REPL...
Waiting for the session to go into an available state...
Connected to Cloudera Data Engineering...
Press Ctrl+D (i.e. EOF) to exit
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\
/_/
Type in expressions to have them evaluated.
>>>
Run some basic Spark SQL operations:
from pyspark.sql.types import Row, StructField, StructType, StringType, IntegerType
rows = [Row(name="John", age=19), Row(name="Smith", age=23), Row(name="Sarah", age=18)]
some_df = spark.createDataFrame(rows)
some_df.printSchema()
>>> from pyspark.sql.types import Row, StructField, StructType, StringType, IntegerType
>>> rows = [Row(name="John", age=19), Row(name="Smith", age=23), Row(name="Sarah", age=18)]
>>> some_df = spark.createDataFrame(rows)
>>> some_df.printSchema()
root
|-- name: string (nullable = true)
|-- age: long (nullable = true)
>>> Notice that we didn't need to create a Spark Session. The Spark Context is already running inside the CDE Session. Here are all the confs the session is running with. Notice that Iceberg dependencies have already been accounted for: >>> def printConfs(confs):
    for ele1, ele2 in confs:
        print("{:<14}{:<11}".format(ele1, ele2))
>>> confs = spark.sparkContext.getConf().getAll()  # collect the session's Spark configuration as (key, value) pairs
>>> printConfs(confs)
spark.eventLog.enabledtrue
spark.driver.hostinteractivesession-b7d65d8c1d6005a9-driver-svc.dex-app-58kqsms2.svc
spark.kubernetes.executor.annotation.created-bylivy
spark.kubernetes.memoryOverheadFactor0.1
spark.sql.catalog.spark_catalogorg.apache.iceberg.spark.SparkSessionCatalog
spark.kubernetes.container.imagecontainer.repository.cloudera.com/cloudera/dex/dex-livy-runtime-3.3.0-7.2.16.3:1.19.3-b29
spark.kubernetes.executor.label.nameexecutor
spark.kubernetes.driver.connectionTimeout60000
spark.hadoop.yarn.resourcemanager.principalpauldefusco
...
spark.yarn.isPythontrue
spark.kubernetes.submission.connectionTimeout60000
spark.kryo.registrationRequiredfalse
spark.sql.catalog.spark_catalog.typehive
spark.kubernetes.driver.pod.nameinteractivesession-b7d65d8c1d6005a9-driver
Load a CSV file from Cloud Storage:
>>> cloudPath = "s3a://go01-demo/datalake/pdefusco/cde119_workshop"
>>> car_installs = spark.read.csv(cloudPath + "/car_installs_119.csv", header=True, inferSchema=True)
>>> car_installs.show()
+-----+-----+----------------+--------------------+
| id|model| VIN| serial_no|
+-----+-----+----------------+--------------------+
|16413| D|433248UCGTTV245J|5600942CL3R015666...|
|16414| D|404328UCGTTV965J|204542CL4R0156661...|
|16415| B|647168UCGTTV8Z5J|6302942CL2R015666...|
|16416| B|454608UCGTTV7H5J|4853942CL1R015666...|
|16417| D|529408UCGTTV6R5J|2428342CL9R015666...|
|16418| B|362858UCGTTV7A5J|903142CL2R0156661...|
|16419| E|609158UCGTTV245J|3804142CL7R015666...|
|16420| D| 8478UCGTTV825J|6135442CL7R015666...|
|16421| B|539488UCGTTV4R5J|306642CL6R0156661...|
|16422| B|190928UCGTTV6A5J|5466242CL1R015666...|
|16423| B|316268UCGTTV4M5J|4244342CL5R015666...|
|16424| B|298898UCGTTV3Y5J|3865742CL4R015666...|
|16425| B| 28688UCGTTV9T5J|6328542CL5R015666...|
|16426| D|494858UCGTTV295J|463642CL5R0156661...|
|16427| D|503338UCGTTV5Y5J|4358642CL2R015666...|
|16428| D|167128UCGTTV2H5J|3809342CL1R015666...|
|16429| D|547178UCGTTV7M5J|2768042CL3R015666...|
|16430| B|503998UCGTTV4Q5J|2568142CL6R015666...|
|16431| D|433998UCGTTV9Y5J|6338642CL6R015666...|
|16432| B|378548UCGTTV7V5J|2648942CL1R015666...|
+-----+-----+----------------+--------------------+ Create a Hive Managed Table with Spark: >>> username = "pauldefusco"
>>> spark.sql("DROP DATABASE IF EXISTS MYDB_{} CASCADE".format(username))
>>> spark.sql("CREATE DATABASE IF NOT EXISTS MYDB_{}".format(username))
>>> car_installs.write.mode("overwrite").saveAsTable('MYDB_{0}.CAR_INSTALLS_{0}'.format(username), format="parquet") Migrate the table to Iceberg Table Format: spark.sql("ALTER TABLE MYDB_{0}.CAR_INSTALLS_{0} UNSET TBLPROPERTIES ('TRANSLATED_TO_EXTERNAL')".format(username))
spark.sql("CALL spark_catalog.system.migrate('MYDB_{0}.CAR_INSTALLS_{0}')".format(username)) You can query Iceberg Metadata tables to track Iceberg Snapshots, History, Partitions, etc: >>> spark.read.format("iceberg").load("spark_catalog.MYDB_{0}.CAR_INSTALLS_{0}.history".format(username)).show(20, False)
+-----------------------+-------------------+---------+-------------------+
|made_current_at |snapshot_id |parent_id|is_current_ancestor|
+-----------------------+-------------------+---------+-------------------+
|2023-11-29 23:58:43.427|6191572403226489858|null |true |
+-----------------------+-------------------+---------+-------------------+
>>> spark.read.format("iceberg").load("spark_catalog.MYDB_{0}.CAR_INSTALLS_{0}.snapshots".format(username)).show(20, False)
+-----------------------+-------------------+---------+---------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|committed_at |snapshot_id |parent_id|operation|manifest_list |summary |
+-----------------------+-------------------+---------+---------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|2023-11-29 23:58:43.427|6191572403226489858|null |append |s3a://go01-demo/warehouse/tablespace/external/hive/mydb_pauldefusco.db/car_installs_pauldefusco/metadata/snap-6191572403226489858-1-bf191e06-38cd-4d6e-9757-b8762c999177.avro|{added-data-files -> 2, added-records -> 82066, added-files-size -> 1825400, changed-partition-count -> 1, total-records -> 82066, total-files-size -> 1825400, total-data-files -> 2, total-delete-files -> 0, total-position-deletes -> 0, total-equality-deletes -> 0}|
+-----------------------+-------------------+---------+---------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ Insert some data. Notice that Iceberg provides a PySpark API to create, append, and overwrite data in an Iceberg table from a Spark Dataframe. In this case we will append some data that we sample from the same table: # PRE-INSERT TIMESTAMP
>>> from datetime import datetime
>>> now = datetime.now()
>>> timestamp = datetime.timestamp(now)
>>> print("PRE-INSERT TIMESTAMP: ", timestamp)
PRE-INSERT TIMESTAMP: 1701302029.338524
# PRE-INSERT COUNT
>>> spark.sql("SELECT COUNT(*) FROM spark_catalog.MYDB_{0}.CAR_INSTALLS_{0}".format(username)).show()
+--------+
|count(1)|
+--------+
| 82066|
+--------+
>>> temp_df = spark.sql("SELECT * FROM spark_catalog.MYDB_{0}.CAR_INSTALLS_{0}".format(username)).sample(fraction=0.1, seed=3)
>>> temp_df.writeTo("spark_catalog.MYDB_{0}.CAR_INSTALLS_{0}".format(username)).append()
Check the new count post-insert:
# POST-INSERT COUNT
>>> spark.sql("SELECT COUNT(*) FROM spark_catalog.MYDB_{0}.CAR_INSTALLS_{0}".format(username)).show()
+--------+
|count(1)|
+--------+
| 90276|
+--------+
Notice that the table history and snapshots have been updated:
>>> spark.sql("SELECT * FROM spark_catalog.MYDB_{0}.CAR_INSTALLS_{0}.history".format(username)).show(20, False)
+-----------------------+-------------------+-------------------+-------------------+
|made_current_at |snapshot_id |parent_id |is_current_ancestor|
+-----------------------+-------------------+-------------------+-------------------+
|2023-11-29 23:58:43.427|6191572403226489858|null |true |
|2023-11-30 00:00:15.263|1032812961485886468|6191572403226489858|true |
+-----------------------+-------------------+-------------------+-------------------+
>>> spark.sql("SELECT * FROM spark_catalog.MYDB_{0}.CAR_INSTALLS_{0}.snapshots".format(username)).show(20, False)
+-----------------------+-------------------+-------------------+---------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|committed_at |snapshot_id |parent_id |operation|manifest_list |summary |
+-----------------------+-------------------+-------------------+---------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|2023-11-29 23:58:43.427|6191572403226489858|null |append |s3a://go01-demo/warehouse/tablespace/external/hive/mydb_pauldefusco.db/car_installs_pauldefusco/metadata/snap-6191572403226489858-1-bf191e06-38cd-4d6e-9757-b8762c999177.avro|{added-data-files -> 2, added-records -> 82066, added-files-size -> 1825400, changed-partition-count -> 1, total-records -> 82066, total-files-size -> 1825400, total-data-files -> 2, total-delete-files -> 0, total-position-deletes -> 0, total-equality-deletes -> 0} |
|2023-11-30 00:00:15.263|1032812961485886468|6191572403226489858|append |s3a://go01-demo/warehouse/tablespace/external/hive/mydb_pauldefusco.db/car_installs_pauldefusco/metadata/snap-1032812961485886468-1-142965b8-67ea-4b53-b76d-558ab5e74e1f.avro|{spark.app.id -> spark-93d1909a680948fea5303b55986704ac, added-data-files -> 1, added-records -> 8210, added-files-size -> 183954, changed-partition-count -> 1, total-records -> 90276, total-files-size -> 2009354, total-data-files -> 3, total-delete-files -> 0, total-position-deletes -> 0, total-equality-deletes -> 0}|
+-----------------------+-------------------+-------------------+---------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ Time travel to pre-insert table state: # TIME TRAVEL AS OF PREVIOUS TIMESTAMP
>>> df = spark.read.option("as-of-timestamp", int(timestamp*1000)).format("iceberg").load("spark_catalog.MYDB_{0}.CAR_INSTALLS_{0}".format(username))
# POST TIME TRAVEL COUNT
>>> print(df.count())
82066
Finally, drop the database:
>>> spark.sql("DROP DATABASE IF EXISTS MYDB_{} CASCADE".format(username))
Exit the Spark Shell (Ctrl+D).
List the commands that were run in the session. Notice that this could be a lot, so the example below only includes a few initial commands:
% cde session statements --name interactiveSession
+--------------------------------+------------------------------------------+
| CODE | OUTPUT |
+--------------------------------+------------------------------------------+
| print("hello Spark") | hello Spark |
+--------------------------------+------------------------------------------+
| from pyspark.sql.types import | |
| Row, StructField, StructType, | |
| StringType, IntegerType | |
+--------------------------------+------------------------------------------+
| rows = [Row(name="John", | |
| age=19), Row(name="Smith", | |
| age=23), Row(name="Sarah", | |
| age=18)] | |
+--------------------------------+------------------------------------------+
| some_df = | |
| spark.createDataFrame(rows) | |
+--------------------------------+------------------------------------------+
| some_df.printSchema() | root |-- name: string |
| | (nullable = true) |-- age: |
| | long (nullable = true) |
+--------------------------------+------------------------------------------+
List all sessions:
% cde session list
+---------------------------+-----------+---------+-------------+----------------------+----------------------+-------------+
| NAME | STATE | TYPE | DESCRIPTION | CREATED | LAST UPDATED | CREATOR |
+---------------------------+-----------+---------+-------------+----------------------+----------------------+-------------+
| francetemp | killed | pyspark | | 2023-11-16T15:59:35Z | 2023-11-16T16:02:16Z | jmarchand |
| IcebergSession | available | pyspark | | 2023-11-29T21:24:27Z | 2023-11-29T21:56:56Z | pauldefusco |
| interactiveSession | killed | pyspark | | 2023-11-28T22:00:47Z | 2023-11-28T22:01:16Z | pauldefusco |
| interactiveSessionIceberg | available | pyspark | | 2023-11-29T23:17:58Z | 2023-11-29T23:56:06Z | pauldefusco |
| myNewSession | killed | pyspark | | 2023-11-28T21:58:38Z | 2023-11-28T21:59:06Z | pauldefusco |
| mySparkSession | killed | pyspark | | 2023-11-28T21:44:30Z | 2023-11-28T21:45:01Z | pauldefusco |
| TA-demo | killed | pyspark | | 2023-11-13T10:12:12Z | 2023-11-13T10:13:41Z | glivni |
+---------------------------+-----------+---------+-------------+----------------------+----------------------+-------------+
Kill session:
% cde session kill --name interactiveSession
Summary and Next Steps
Cloudera Data Engineering (CDE) provides a command line interface (CLI) client. You can use the CLI to create and update jobs, view job details, manage job resources, run jobs, and so on. In this article we have reviewed some advanced use cases for the CLI.
If you are using the CDE CLI you might also find the following articles and demos interesting:
Installing the CDE CLI
Simple Introduction to the CDE CLI
CDE CLI Demo
CDE Concepts
CDE CLI Command Reference
CDE CLI Spark Flag Reference
CDE CLI Airflow Flag Reference
CDE CLI list command syntax reference
CDE Jobs API Reference
11-29-2023
04:34 PM
Cloudera Data Engineering
CDE is the Cloudera Data Engineering Service, a containerized managed service for Cloudera Data Platform designed for Large Scale Batch Pipelines with Spark, Airflow and Iceberg. It allows you to submit batch jobs to auto-scaling virtual clusters. As a Cloud-Native service, CDE enables you to spend more time on your applications, and less time on infrastructure.
CDE allows you to create, manage, and schedule Apache Spark jobs without the overhead of creating and maintaining Spark clusters. With CDE, you define virtual clusters with a range of CPU and memory resources, and the cluster scales up and down as needed to run your Spark workloads, helping to control your cloud costs.
Cloudera Data Engineering (CDE) provides a command line interface (CLI) client. You can use the CLI to create and update jobs, view job details, manage job resources, run jobs, and more.
CDE Files Resources
A resource in Cloudera Data Engineering (CDE) is a named collection of files used by a job. Resources can include application code, configuration files, custom Docker images, and Python virtual environment specifications. Resources are associated with virtual clusters. A resource can be used by multiple jobs, and jobs can use multiple resources. The resource types supported by CDE are files, python-env, and custom-runtime-image.
In this article we will walk through some useful commands for operating with CDE Files Resources efficiently.
Requirements
The following are required in order to reproduce these commands in your CDE environment:
A CDE Service on version 1.19.0 or above.
A working installation of the CDE CLI. Please follow these instructions to install the CLI.
Steps
Clone this git repository in your local machine. Then run the following commands.
Create a Files Resource:
% cde resource create --name myScripts \
--type files
Upload multiple files to the same Files Resource:
% cde resource upload --name myScripts \
--local-path cde_jobs/spark_geospatial.py \
--local-path cde_jobs/utils.py
3.5KB/3.5KB 100% [==============================================] spark_geospatial.py
4.0KB/4.0KB 100% [==============================================] utils.py
Describe the Files resource:
% cde resource describe --name myScripts
{
"name": "myScripts",
"type": "files",
"status": "ready",
"signature": "08531cbe538858eb20bda5ff1b7567ae4623d885",
"created": "2023-11-28T23:31:31Z",
"modified": "2023-11-28T23:33:32Z",
"retentionPolicy": "keep_indefinitely",
"files": [
{
"path": "spark_geospatial.py",
"signature": "ec91fb5bddfcd16a0bcbe344f229b5e326b759c5",
"sizeBytes": 3529,
"created": "2023-11-28T23:33:32Z",
"modified": "2023-11-28T23:33:32Z"
},
{
"path": "utils.py",
"signature": "aa5a8ea4b4f240183da8bd2d2b354eeaa58fd97a",
"sizeBytes": 3996,
"created": "2023-11-28T23:33:32Z",
"modified": "2023-11-28T23:33:32Z"
}
]
}
Create a Files Resource for data:
% cde resource create --name myData \
--type files
Upload an archive file to the resource:
% cde resource upload-archive --name myData \
--local-path data/ne_50m_admin_0_countries_lakes.zip
817.5KB/817.5KB 100% [==============================================] ne_50m_admin_0_countries_lakes.zip
Describe resource metadata. Notice that the archive has been unarchived for you:
% cde resource describe --name myData
{
"name": "myData",
"type": "files",
"status": "ready",
"signature": "d552dff8fb80a0c7067afa1c4227b29010cce67b",
"created": "2023-11-28T23:35:43Z",
"modified": "2023-11-28T23:36:56Z",
"retentionPolicy": "keep_indefinitely",
"files": [
{
"path": "ne_50m_admin_0_countries_lakes.cpg",
"signature": "663b90c899fa25a111067be0c22ffc64dcf581c2",
"sizeBytes": 5,
"created": "2023-11-28T23:36:55Z",
"modified": "2023-11-28T23:36:55Z"
},
{
"path": "ne_50m_admin_0_countries_lakes.dbf",
"signature": "eec48a122399782bbef02aa8108e99aeaf52e506",
"sizeBytes": 786828,
"created": "2023-11-28T23:36:56Z",
"modified": "2023-11-28T23:36:56Z"
},
{
"path": "ne_50m_admin_0_countries_lakes.prj",
"signature": "308d6355be935e0f7853161b1adda5bcd48188ff",
"sizeBytes": 143,
"created": "2023-11-28T23:36:56Z",
"modified": "2023-11-28T23:36:56Z"
},
{
"path": "ne_50m_admin_0_countries_lakes.README.html",
"signature": "4bec87fbbe5f4e0e18edb3d6a4f10e9e2a705581",
"sizeBytes": 38988,
"created": "2023-11-28T23:36:55Z",
"modified": "2023-11-28T23:36:55Z"
},
{
"path": "ne_50m_admin_0_countries_lakes.shp",
"signature": "57c38f48c5234db925a9fb1b31785250bd7c8d86",
"sizeBytes": 1652200,
"created": "2023-11-28T23:36:56Z",
"modified": "2023-11-28T23:36:56Z"
},
{
"path": "ne_50m_admin_0_countries_lakes.shx",
"signature": "983eba7b34cf94b0cfe8bda8b2b7d533bd233c49",
"sizeBytes": 2036,
"created": "2023-11-28T23:36:56Z",
"modified": "2023-11-28T23:36:56Z"
},
{
"path": "ne_50m_admin_0_countries_lakes.VERSION.txt",
"signature": "9e3d18e5216a7b4dfd402b00a25ee794842b481b",
"sizeBytes": 7,
"created": "2023-11-28T23:36:55Z",
"modified": "2023-11-28T23:36:55Z"
}
]
}
Create CDE Credentials for accessing your Docker repository:
cde credential create --name my-docker-creds \
--type docker-basic \
--docker-server hub.docker.com \
--docker-username pauldefusco
Create a CDE Custom Docker Runtime Resource:
cde resource create --name dex-spark-runtime-sedona-geospatial \
--image pauldefusco/dex-spark-runtime-3.2.3-7.2.15.8:1.20.0-b15-sedona-geospatial-003 \
--image-engine spark3 \
--type custom-runtime-image
Notice that the custom image has already been created. If you want to learn more about how to create Custom Docker Resources, please visit this Cloudera Community Article.
Finally, create a job leveraging all three resources above:
cde job create --name geospatialRdd \
--type spark \
--mount-1-prefix code/ --mount-1-resource myScripts \
--mount-2-prefix data/ --mount-2-resource myData \
--runtime-image-resource-name dex-spark-runtime-sedona-geospatial \
--packages org.apache.sedona:sedona-spark-shaded-3.0_2.12:1.5.0,org.datasyslab:geotools-wrapper:1.5.0-28.2 \
--application-file code/spark_geospatial.py \
--arg myArg \
--max-executors 4 \
--min-executors 2 \
--executor-cores 2
Notice the following:
When using multiple Files resources you should prefix each, i.e. "data" and "code". The "data" prefix is used at line 82 ("/app/mount/data") in the "spark_geospatial.py" script in order to access the data from the resource. The "code" prefix is used in the same CLI command in the application file argument.
If you are using any Spark packages you can set these directly at job creation.
You can pass one or multiple arguments to the Python script via the --arg argument. The arguments are referenced in the script with the "sys.argv" syntax, e.g. line 60 in "geospatial_rdd.py".
Run the job:
cde job run --name geospatialRdd --executor-cores 4
Notice that at runtime, you can override Spark configs that were set at job creation. For example, "--executor-cores" was originally set to 2 and is now overridden to 4.
List job runs filtering by job name:
% cde run list --filter 'job[eq]geospatialRdd'
[
{
"id": 21815,
"job": "geospatialRdd",
"type": "spark",
"status": "failed",
"user": "pauldefusco",
"started": "2023-11-29T00:32:02Z",
"ended": "2023-11-29T00:32:36Z",
"mounts": [
{
"dirPrefix": "data/",
"resourceName": "myData"
},
{
"dirPrefix": "code/",
"resourceName": "myScripts"
}
],
"runtimeImageResourceName": "dex-spark-runtime-sedona-geospatial",
"spark": {
"sparkAppID": "spark-f542530da24f485da4993338dca81d3c",
"sparkAppURL": "https://58kqsms2.cde-g6hpr9f8.go01-dem.ylcu-atmi.cloudera.site/hs/history/spark-f542530da24f485da4993338dca81d3c/jobs/",
"spec": {
"file": "code/geospatial_rdd.py",
"args": [
"myArg"
],
"driverMemory": "1g",
"driverCores": 1,
"executorMemory": "1g",
"executorCores": 4,
"conf": {
"spark.dynamicAllocation.maxExecutors": "4",
"spark.dynamicAllocation.minExecutors": "2",
"spark.jars.packages": "org.apache.sedona:sedona-spark-shaded-3.0_2.12:1.5.0,org.datasyslab:geotools-wrapper:1.5.0-28.2"
}
}
},
"identity": {
"disableRoleProxy": true,
"role": "instance"
}
}
]
Open Spark UI for respective run:
% cde run ui --id 21815
You can delete single files from resources:
% cde resource delete-file --name myScripts --resource-path utils.py
You can delete the resource:
% cde resource delete --name myScripts
Summary and Next Steps
Cloudera Data Engineering (CDE) provides a command line interface (CLI) client. You can use the CLI to create and update jobs, view job details, manage job resources, run jobs, and so on. In this article we have reviewed some advanced use cases for the CLI.
If you are using the CDE CLI you might also find the following articles and demos interesting:
Installing the CDE CLI
Simple Introduction to the CDE CLI
CDE CLI Demo
CDE Concepts
CDE CLI Command Reference
CDE CLI Spark Flag Reference
CDE CLI Airflow Flag Reference
CDE CLI list command syntax reference
CDE Jobs API Reference
11-22-2023
11:47 PM
Objective
This article provides a quickstart for the SparkXML package in Cloudera Data Engineering. You can run the following commands to parse XML files with Spark in Cloudera Data Engineering (CDE).
CDE is the only cloud-native service purpose-built for enterprise data engineering teams. Building on Apache Spark, Data Engineering is an all-inclusive data engineering toolset that enables orchestration automation with Apache Airflow, advanced pipeline monitoring, visual troubleshooting, and comprehensive management tools to streamline ETL processes across enterprise analytics teams.
CDE is fully integrated with the Cloudera Data Platform (CDP), enabling end-to-end visibility and security with SDX, as well as seamless integrations with CDP services such as Data Warehouse and Machine Learning. Data Engineering on CDP powers consistent, repeatable, and automated data engineering workflows on a hybrid cloud platform anywhere.
Spark-XML is a library for parsing and querying XML data with Apache Spark, for Spark SQL and DataFrames. Spark Data Engineers use the package in CDE to parse XML files at scale. In the rest of this tutorial, we will run a few commands to demonstrate basic functionality of this package in CDE.
The code is also provided in this git repository.
Requirements
The following are required to reproduce the Demo in your CDE Virtual Cluster:
CDE Service version 1.19 and above with Spark version 3.x (Spark 2 is not compatible with SparkXML).
A working installation of the CDE CLI. Instructions to install the CLI are provided in Using the Cloudera Data Engineering command line interface.
A working installation of git in your local machine. Please clone this git repository and keep in mind all commands assume they are run in the project's main directory.
No code edits required but familiarity with Python, Spark and XML is recommended.
CDE CLI Steps
Create a CDE Files Resource:
cde resource create --name files --type files
Upload files to the Resource:
cde resource upload --name files --local-path read_xml.py --local-path books.xml
Create a CDE Spark Job:
cde job create --name sparkxml --application-file read_xml.py --mount-1-resource files --type spark --packages com.databricks:spark-xml_2.12:0.16.0
Run the CDE Spark Job:
cde job run --name sparkxml
+-----+--------------------+--------------------+---------------+-----+------------+--------------------+
| _id| author| description| genre|price|publish_date| title|
+-----+--------------------+--------------------+---------------+-----+------------+--------------------+
|bk101|Gambardella, Matthew|\n\n\n An...| Computer|44.95| 2000-10-01|XML Developer's G...|
|bk102| Ralls, Kim|A former architec...| Fantasy| 5.95| 2000-12-16| Midnight Rain|
|bk103| Corets, Eva|After the collaps...| Fantasy| 5.95| 2000-11-17| Maeve Ascendant|
|bk104| Corets, Eva|In post-apocalyps...| Fantasy| 5.95| 2001-03-10| Oberon's Legacy|
|bk105| Corets, Eva|The two daughters...| Fantasy| 5.95| 2001-09-10| The Sundered Grail|
|bk106| Randall, Cynthia|When Carla meets ...| Romance| 4.95| 2000-09-02| Lover Birds|
|bk107| Thurman, Paula|A deep sea diver ...| Romance| 4.95| 2000-11-02| Splish Splash|
|bk108| Knorr, Stefan|An anthology of h...| Horror| 4.95| 2000-12-06| Creepy Crawlies|
|bk109| Kress, Peter|After an inadvert...|Science Fiction| 6.95| 2000-11-02| Paradox Lost|
|bk110| O'Brien, Tim|Microsoft's .NET ...| Computer|36.95| 2000-12-09|Microsoft .NET: T...|
|bk111| O'Brien, Tim|The Microsoft MSX...| Computer|36.95| 2000-12-01|MSXML3: A Compreh...|
|bk112| Galos, Mike|Microsoft Visual ...| Computer|49.95| 2001-04-16|Visual Studio 7: ...|
+-----+--------------------+--------------------+---------------+-----+------------+--------------------+
root
|-- _id: string (nullable = true)
|-- author: string (nullable = true)
|-- description: string (nullable = true)
|-- genre: string (nullable = true)
|-- price: double (nullable = true)
|-- publish_date: string (nullable = true)
|-- title: string (nullable = true)
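For reference, a minimal sketch of what a read_xml.py script along these lines might look like is shown below. This is an approximation, not the exact repository code: the rowTag value and the books.xml path under /app/mount are assumptions based on the job definition above.
# Approximate sketch of read_xml.py (the actual script is in the linked repository).
# Assumes books.xml is mounted by the "files" resource under /app/mount.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("sparkxml").getOrCreate()

# The reader is provided by the com.databricks:spark-xml package passed via --packages;
# "book" is assumed to be the row tag of books.xml.
df = (
    spark.read.format("com.databricks.spark.xml")
    .option("rowTag", "book")
    .load("/app/mount/books.xml")
)

df.show()
df.printSchema()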
References
Spark XML Package
Cloudera Data Engineering Documentation
11-18-2023
01:26 PM
Objective
This article contains basic instructions for creating and deploying a Custom Runtime with VS Code and Nvidia libraries to your Cloudera Machine Learning (CML) Workspace. With this runtime, you can increase your productivity when developing your deep learning use cases in CML Sessions. This example is also valid for Cloudera Data Science Workbench (CDSW) clusters.
If you are using CML or CDSW for deep learning, GenAI, or LLM use cases, please scroll down to the bottom of this page to check out more examples.
Using Custom Runtimes
Cloudera ML Runtimes are a set of Docker images created to enable machine learning development and host data applications in the Cloudera Data Platform (CDP) and the Cloudera Machine Learning (CML) service.
ML Runtimes provide a flexible, fully customizable, lightweight development and production machine learning environment for both CPU and GPU processing frameworks while enabling unfettered access to data, on-demand resources, and the ability to install and use any libraries/algorithms without IT assistance.
PBJ Runtimes
Powered by Jupyter (PBJ) Runtimes are the second generation of ML Runtimes. While the original ML Runtimes relied on a custom proprietary integration with CML, PBJ Runtimes rely on Jupyter protocols for ecosystem compatibility and openness.
Open Source
For data scientists who need to fully understand the environment they are working in, Cloudera provides the Dockerfiles and all dependencies in this git repository that enables the construction of the official Cloudera ML Runtime images.
The open source PBJ Runtime Dockerfiles serve as a blueprint to create custom ML Runtimes so data scientists or partners can build ML Runtime images on their selected OS (base image), with the kernel of their choice, or just integrate their existing ML container images with Cloudera Machine Learning.
In this example we reuse the Dockerfiles provided in this git repository to create a runtime with both VS Code and the Nvidia libraries.
Requirements
In order to use this runtime you need:
A CML Workspace or CDSW Cluster. AWS and Azure Public Cloud, as well as OCP and ECS Private Cloud, are all supported.
Workspace Admin rights and access to the Runtime Catalog.
Basic familiarity with Docker and a working installation of Docker on your local machine.
Steps to Reproduce
Clone the git repository
Clone this git repository to obtain all necessary files.
mkdir mydir
cd mydir
git clone https://github.com/pdefusco/Nvidia_VSCode_Runtime_CML.git
Explore Dockerfile
Open the Dockerfile and familiarize yourself with the code. Note the following:
To create a new image we are extending an existing CML image. This image has the CUDA libraries and is available here.
FROM docker.repository.cloudera.com/cloudera/cdsw/ml-runtime-pbj-workbench-python3.10-cuda:2023.08.2-b8
We then install the VS Code Editor on top, as shown in the PBJ VS Code runtime available here. Creating a symlink is necessary in order to ensure the right editor is launched at runtime.
# Install latest version. See https://github.com/coder/code-server/blob/main/install.sh for details
RUN curl -fsSL https://code-server.dev/install.sh | sh -s -- --version 4.16.1
# Create launch script and symlink to the default editor launcher
RUN printf "#!/bin/bash\n/usr/bin/code-server --auth=none --bind-addr=127.0.0.1:8090 --disable-telemetry" > /usr/local/bin/vscode
RUN chmod +x /usr/local/bin/vscode
RUN ln -s /usr/local/bin/vscode /usr/local/bin/ml-runtime-editor
The remaining lines are just environment variables that must be overridden in order to distinguish this build from others. In other words, we need to add some unique metadata for this image. Specifically, you must update the values for ML_RUNTIME_SHORT_VERSION and ML_RUNTIME_MAINTENANCE_VERSION and make sure these are congruent with ML_RUNTIME_FULL_VERSION. Adding a unique description is also highly recommended.
# Override Runtime label and environment variables metadata
ENV ML_RUNTIME_EDITOR="VsCode" \
ML_RUNTIME_EDITION="Community" \
ML_RUNTIME_SHORT_VERSION="2023.11" \
ML_RUNTIME_MAINTENANCE_VERSION="1" \
ML_RUNTIME_FULL_VERSION="2023.11.1" \
ML_RUNTIME_DESCRIPTION="This runtime includes VsCode editor v4.16.1 and is based on PBJ Workbench image with Python 3.10 and CUDA"
LABEL com.cloudera.ml.runtime.editor=$ML_RUNTIME_EDITOR \
com.cloudera.ml.runtime.edition=$ML_RUNTIME_EDITION \
com.cloudera.ml.runtime.full-version=$ML_RUNTIME_FULL_VERSION \
com.cloudera.ml.runtime.short-version=$ML_RUNTIME_SHORT_VERSION \
com.cloudera.ml.runtime.maintenance-version=$ML_RUNTIME_MAINTENANCE_VERSION \
com.cloudera.ml.runtime.description=$ML_RUNTIME_DESCRIPTION
Build Dockerfile and Push Image
Run the following command to build the docker image. Edit the following with your username and preferred image tag:
docker build . -t pauldefusco/vscode4_cuda11_cml_runtime:latest
Push the image to your preferred Docker repository. In this example we will use a personal repository but CML and CDSW can also be used with other enterprise solutions. If your CML Workspace resides in an Airgapped environment you can use your Podman Local Container Registry.
docker push pauldefusco/vscode4_cuda11_cml_runtime
Add the Runtime to your CML Workspace Runtime Catalog and CML Project
This step can only be completed by a CML user with Workspace Admin rights. If you don't have Admin rights please reach out to your CML or CDP Admin.
1. In your CML Workspace, navigate to the Runtime Catalog tab (available on the left side of your screen). Then click on "Add Runtime" at the top right corner of your screen.
2. Add the runtime.
3. Navigate back to your CML Project, open the Project Settings -> Runtime tab, and add the runtime there.
Congratulations, you are now ready to use VS Code in a CML Session with Nvidia libraries!
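As an optional sanity check (not part of the original setup steps), you can confirm from a session running the new runtime that the NVIDIA driver is visible to the container; this assumes the session was launched with a GPU-enabled resource profile.
# Optional sanity check from a CML Session using the new runtime:
# print the nvidia-smi output to confirm the GPU and driver are visible.
# Assumes the session was launched with a GPU attached.
import subprocess

result = subprocess.run(["nvidia-smi"], capture_output=True, text=True)
print(result.stdout or result.stderr)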
Conclusions and Next Steps
Cloudera ML Runtimes are a set of Docker images created to enable machine learning development in a flexible, fully customizable, lightweight development and production machine learning environment for both CPU and GPU processing frameworks while enabling unfettered access to data, on-demand resources, and the ability to install and use any libraries/algorithms without IT assistance.
You can create custom CML Runtimes by extending the Cloudera Machine Learning base runtimes, PBJ and open runtimes available in this git repository as we have done in this example; or create your own from scratch as shown in this example.
If you are using CML for Deep Learning, GenAI and LLM use cases here are some more examples you may find interesting:
CML LLM Hands on Lab
LLM Demo in CML
How to Launch an Applied Machine Learning Prototype (AMP) in CML
AMP: Intelligent QA Chatbot with NiFi, Pinecone, and Llama2
AMP: Text Summarization and more with Amazon Bedrock
AMP: Fine-Tuning a Foundation Model for Multiple Tasks (with QLoRA)
AMP: LLM Chatbot Augmented with Enterprise Data
AMP: Semantic Image Search with Convolutional Neural Networks
AMP: Deep Learning for Anomaly Detection
AMP: Deep Learning for Question Answering
AMP: Automatic Text Summarization
Quickstarts with PyTorch, Tensorflow and MXNet in CML
Distributed PyTorch with Horovod and CML Workers in CML
Distributed Tensorflow with CML Workers in CML
An end to end example of PyTorch and MLFlow in CML
11-16-2023
11:27 AM
The Cloudera Data Platform (CDP) is a hybrid data platform designed to deliver faster and easier data management, analytics and AI at enterprise scale. Cloudera Machine Learning (CML) is one of CDP’s Cloud Native Data Services designed to enable secure, governed Data Science. Data Scientists and Engineers use CML to securely analyze large amounts of data via interactive notebooks. Since 2022, CML has enhanced this capability with the Data Connections feature by providing boilerplate code to access and push down SQL queries to the CDW service. This allows the user to run large-scale queries directly in the Data Warehouse while accessing and visualizing results from a simple notebook with a small consumption footprint.
In 2023 CML doubled down with CML Custom Data Connections. Custom Data Connections allow the CML user to create their own boilerplate code so they or their collaborators can easily connect to 3rd party tools such as Snowflake, Postgres, legacy on-prem databases (Oracle, MSSQL, MySQL), serverless cloud databases (Redshift, Snowflake, SAP HANA Cloud, BigQuery), APIs, and specialized data stores (Neo4j).
This article provides a tutorial on how to create a MySQL Custom Data Connection in your own CML Workspace. The code referenced is provided in this git repository.
Data Connections Refresher
You can use Data Connections from a CML Session to connect to a CDP environment (CDW or Spark) regardless of the Editor choice by following the below steps:
Open your CML Session and click on the “Data” tab at the top right of your screen.
Scroll down and select the CDW or Spark Data Lake connection of your choice.
Copy and paste the code in your Editor.
Run the sample SHOW DATABASES command and create your SQL statements.
Learning from the Postgres Custom Data Connection
The CML Engineering Team has created two custom data connection templates that you can reuse to connect to Snowflake and Postgres. In this section we will use the Postgres template to learn how to deploy an existing custom connection.
Steps to Deploy the Postgres Custom Connection to your Project
Create a CML Project by cloning the provided template with this URL: https://github.com/pdefusco/Using_CustomConn_CML.git. Notice the mysqlconn and postgresconn folders are included in the project.
Open the postgres folder and familiarize yourself with the code in pg-conn.py. This is the source code for the Custom Connection. The PostgresCustomImp class is a child of CustomConnection, which is imported from cml.data_v1.customconnection. You must extend this class in order to create a custom connection.
The get_base_connection method uses the psycopg2 module to establish a connection to a given Postgres source. The connection requires a hostname, a default port, a database name, a user, and a password. When implementing your own custom connection you can choose which connection parameters to include as needed.
The get_pandas_dataframe method executes the provided SQL via the Pandas read_sql method.
The get_cursor method returns a cursor object. This is optional, as the SQL command is executed in get_pandas_dataframe.
The parameters required to initialize a connection are set manually via the UI (see below). The override_parameters and check_params_or_env methods ensure that these are set correctly. While these methods are also optional, we recommend implementing them.
Navigate to the Workspace Site Administration page and open the Data Connections tab. Click on the “New Connection” icon.
Fill out the form as shown below:
Name: Custom Postgres
Type: Custom Connection
Type Display: Postgres
Project: select your project
Connection Files: select the “postgresconn” folder
Custom Parameters: create the following four key value pairs:
PG_HOST : hh-pgsql-public.ebi.ac.uk
PG_PORT : 5432
PG_DB : pfmegrnargs
PG_USER : reader
Note: in this example we are connecting to the RNAcentral Public Postgres Database. Please visit this URL for more information: https://rnacentral.org/help/public-database
Navigate back to your project. Open the “Project Settings” page and then the “Data Connections” tab. Make the new connection available in the project by clicking on “Sync with Workspace”.
Launch a CML Session with JupyterLab as your Editor. A small resource profile without GPUs is ok. There is no need to enable a Spark Runtime Add-On. The Data Connections window will load automatically. Notice the new “Custom Postgres” connection with a reusable template code block.
Open the “using_connections” notebook. Notice the code block to connect to the Postgres database has already been prepopulated for you. Normally you would copy and paste from the Data Connections pop up window. Execute the first four cells and validate that query results are output in the notebook (do not run the entire notebook!).
Creating a MySQL Custom Data Connection
Now that we have connected to Postgres with a provided Custom Connection template, we can follow the same steps to create a custom MySQL connection.
Open the mysqlconn folder and familiarize yourself with the code. Notice the source code for this connection is in a separate folder. This is required in order to select a folder in the Custom Data Connection creation form in the Workspace Settings.
As in the previous example, we create a MySQLCustomImp class which inherits from the CustomConnection interface in the cml.data_v1.customconnection module. This module does not have to be pip installed and is already provided to you in CML by default. The implemented class methods are similar to the Postgres example. Notice that in this case we don’t have a method to return the connection cursor. In addition, we leverage the mysql-connector-python package rather than psycopg2.
To use this custom connection, go through the same steps and set the following Custom Parameters:
MYSQL_HOST : mysql-rfam-public.ebi.ac.uk
MYSQL_PORT : 4497
MYSQL_DB : Rfam
MYSQL_USER : rfamro
Note: in this example we are connecting to the Rfam MySQL public database. For more information please visit this URL: https://docs.rfam.org/en/latest/database.html
Conclusions
In this article we highlighted CML Custom Data Connections. In summary:
CML is a Cloud Native Platform for Enterprise Machine Learning. The built-in integrations with the Cloudera Data Platform (CDP) allow CML Users to operate in a secure, governed Machine Learning environment at scale.
CML Data Connections enhance Data Analysis, Exploration, and Model Experimentation by providing Data Scientists with an easy interface to process large amounts of data from a notebook with minimum compute resources.
Custom Data Connections augment this capability by opening access to 3rd party systems such as external RDBMSs or Cloud Vendor Databases such as Snowflake. CML Users are free to implement their own Custom Connections while giving the CML Admins the ability to approve them at the Workspace level.
What Custom Data Connection are you using in CML? Please don’t hesitate to comment with your favorite external data sources.
10-25-2023
08:27 PM
Objective
Great Expectations is a Python-based open-source library for validating, documenting, and profiling your data. It helps you maintain data quality and improve communication about data between teams. Software developers have long known that automated testing is essential for managing complex codebases. Great Expectations brings the same discipline, confidence, and acceleration to data science and data engineering teams. (Source: Great Expectations)
CDP Data Engineering is the only cloud-native service purpose-built for enterprise data engineering teams. Building on Apache Spark, Data Engineering is an all-inclusive data engineering toolset that enables orchestration automation with Apache Airflow, advanced pipeline monitoring, visual troubleshooting, and comprehensive management tools to streamline ETL processes across enterprise analytics teams.
Data Engineering is fully integrated with the Cloudera Data Platform, enabling end-to-end visibility and security with SDX as well as seamless integrations with CDP services such as data warehouses and machine learning. Data engineering on CDP powers consistent, repeatable, and automated data engineering workflows on a hybrid cloud platform anywhere. (Source: Streamline and operationalize data pipelines securely at any scale.)
Spark Data Engineers use Great Expectations in CDE to enforce data quality standards on enterprise datasets and data engineering pipelines at massive scale. In the rest of this tutorial, we will walk through a basic workflow with Great Expectations and Spark.
All code, dependencies, and CLI commands are available for reuse in this git repository.
Requirements
The following are required to reproduce the demo in your CDE virtual cluster:
CDE Service version 1.19 and above
A working installation of the CDE CLI. Instructions to install the CLI are provided here.
A working installation of git on your local machine. Please clone the git repository and keep in mind that all commands assume they are run in the project's main directory.
Custom Docker Runtime as CDE Resource
By default, CDE workloads rely on CDE Resources to mount dependencies into a job container. CDE resources can be of three types: files, Python environment, and custom Docker runtime.
In our case, we will create a custom Docker runtime to ensure Great Expectations and all required packages are correctly installed. Please open the Dockerfile located in the CDE_Runtime folder to explore more details.
The below steps have already been executed, and the image is available for reuse in my public Docker registry. If you want, create your own image by personalizing the value in the --name parameter to reflect your username and replacing the value in the --docker-username parameter with your DockerHub username.
docker build --network=host -t pauldefusco/dex-spark-runtime-3.2.3-7.2.15.8:1.20.0-b15-great-expectations-data-quality -f Dockerfile .
docker run -it pauldefusco/dex-spark-runtime-3.2.3-7.2.15.8:1.20.0-b15-great-expectations-data-quality /bin/bash
docker push pauldefusco/dex-spark-runtime-3.2.3-7.2.15.8:1.20.0-b15-great-expectations-data-quality
Next, create CDE Resource Credentials and the CDE Custom Docker Runtime resource:
cde credential create --name docker-creds-pauldefusco --type docker-basic --docker-server hub.docker.com --docker-username pauldefusco
cde resource create --name dex-spark-runtime-great-expectations-data-quality --image pauldefusco/dex-spark-runtime-3.2.3-7.2.15.8:1.20.0-b15-great-expectations-data-quality --image-engine spark3 --type custom-runtime-image
CDE Files Resources for Job Code
Next, create another CDE Files Resource to store your code and dependencies in your virtual cluster for reuse.
Before running the below command, open properties.conf and replace your Cloud Default Storage location and CDP Workload Username. If you need help finding these, please contact your CDP administrator. Then upload scripts and dependencies.
cde resource create --name job_code_data_quality
cde resource upload --name job_code_data_quality --local-path code/airflow.py --local-path code/batch_load.py --local-path code/great_expectations.py --local-path code/parameters.conf --local-path code/utils.py
Finally, we will create two CDE Spark jobs for the data quality pipeline and one CDE Airflow job to orchestrate it. Before doing so, open code/great_expectations.py and familiarize yourself with the code. Here are some of the most important points about the job:
We create a synthetic banking dataset with the Faker and dbldatagen libraries. This gives us 100,000 banking transactions with transaction amount, currency, account numbers, routing numbers, and much more banking data. The data columns are of different types, including numerical, timestamp, and categorical. Review the utils.py script to learn more details.
FakerTextUS = FakerTextFactory(locale=['en_US'], providers=[bank])

# partition parameters, etc.
self.spark.conf.set("spark.sql.shuffle.partitions", shuffle_partitions_requested)

fakerDataspec = (
    DataGenerator(self.spark, rows=data_rows, partitions=partitions_requested)
    .withColumn("name", percentNulls=0.1, text=FakerTextUS("name"))
    .withColumn("address", text=FakerTextUS("address"))
    .withColumn("email", text=FakerTextUS("ascii_company_email"))
    .withColumn("aba_routing", text=FakerTextUS("aba"))
    .withColumn("bank_country", text=FakerTextUS("bank_country"))
    .withColumn("account_no", text=FakerTextUS("bban"))
    .withColumn("int_account_no", text=FakerTextUS("iban"))
    .withColumn("swift11", text=FakerTextUS("swift11"))
    .withColumn("credit_card_number", text=FakerTextUS("credit_card_number"))
    .withColumn("credit_card_provider", text=FakerTextUS("credit_card_provider"))
    .withColumn("event_type", "string", values=["purchase", "cash_advance"], random=True)
    .withColumn("event_ts", "timestamp", begin="2020-01-01 01:00:00", end="2020-12-31 23:59:00", interval="1 minute", random=True)
    .withColumn("longitude", "float", minValue=-180, maxValue=180, random=True)
    .withColumn("latitude", "float", minValue=-90, maxValue=90, random=True)
    .withColumn("transaction_currency", values=["USD", "EUR", "KWD", "BHD", "GBP", "CHF", "MEX"])
    .withColumn("transaction_amount", "decimal", minValue=0.01, maxValue=30000, random=True)
)
df = fakerDataspec.build()
In ge_data_quality.py, the "SparkDFDataset" class is imported from the "great_expectations.dataset.sparkdf_dataset" module. This allows you to wrap a Spark Dataframe and make it compatible with the Great Expectations API.
gdf = SparkDFDataset(df)
From the GE Documentation, "Expectations are declarative statements and the interface to the Great Expectations library, supporting your validation, profiling, and translation. Expectations are implemented as classes; some are in the core library, with many others contributed by community members." In the script, we implemented a few wrapper methods for a variety of built-in expectations. For example, the "expect_column_to_exist" expectation checks for column existence.
assert gdf.expect_column_to_exist(column).success, f"Column {column} is not found."
The GE Gallery offers a constantly growing number of expectations to test for different conditions on different column value types. For example, in the script, we use expectations to check for nulls for all columns: longitude and latitude min and max for transactions; mean and standard deviation for transaction amounts; email string validity with REGEX; and finally, whether the values in the categorical Currency column match or are contained in a provided test set. Here are some more examples that can be found in the ge_data_quality.py script:
# EXPECTATION ON ALL COLUMNS
# Ensure the existence of all mandatory columns.
def run_existance_expactation(gdf, MANDATORY_COLUMNS):
    for column in MANDATORY_COLUMNS:
        try:
            assert gdf.expect_column_to_exist(column).success, f"Column {column} is not found."
            print(f"Column {column} is found")
        except Exception as e:
            print(e)
# EXPECTATION ON NUMERIC COLUMN
# Ensure minimum longitude is between -180 and 180
def run_longitude_min_expectation(gdf):
    try:
        test_result = gdf.expect_column_min_to_be_between(column="longitude", min_value=-180, max_value=180)
        assert test_result.success, "Min for column longitude is not within expected range\n"
        print("Min for column longitude is within expected range\n")
    except Exception as e:
        print(e)
# EXPECTATION ON STRING COLUMNS
# Use REGEX to ensure email is in correct format
def run_email_match_regex_expectation(gdf):
    try:
        test_result = gdf.expect_column_values_to_match_regex(column="email", regex="^[\w-\.]+@([\w-]+\.)+[\w-]{2,4}$")
        assert test_result.success, "Values for column Email are not within REGEX pattern."
        print("Values for column Email are within REGEX pattern\n")
    except Exception as e:
        print(e)
Please review the ge_data_quality.py script to find more examples.
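As an illustration of the categorical check mentioned above, a set-membership expectation on the transaction currency column might look like the following sketch (the exact implementation in the script may differ):
# Illustrative set-membership expectation on the categorical currency column
# (a sketch; the exact code in ge_data_quality.py may differ).
def run_currency_in_set_expectation(gdf, allowed_currencies):
    try:
        test_result = gdf.expect_column_values_to_be_in_set(column="transaction_currency", value_set=allowed_currencies)
        assert test_result.success, "Values for column transaction_currency fall outside the allowed set\n"
        print("Values for column transaction_currency are within the allowed set\n")
    except Exception as e:
        print(e)

# Example usage with the currencies generated above:
# run_currency_in_set_expectation(gdf, ["USD", "EUR", "KWD", "BHD", "GBP", "CHF", "MEX"])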
Again, the idea is to organize tests in a clear, reliable, and reusable manner. When combined with the modularity of the CDE Job and Dependency construct, using Great Expectations and Spark allows you to structure your data pipelines under clear data quality rules.
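For example, the wrapper functions above can be composed into a single reusable suite that the CDE Spark job calls on the wrapped Dataframe; the sketch below is illustrative, and the function and column names are assumptions rather than the repository's exact layout.
# Illustrative composition of the expectation wrappers into one reusable suite.
# Function and column names are assumptions, not the repository's exact layout.
MANDATORY_COLUMNS = ["name", "email", "longitude", "latitude", "transaction_currency", "transaction_amount"]

def run_data_quality_suite(gdf):
    run_existance_expactation(gdf, MANDATORY_COLUMNS)   # column presence checks
    run_longitude_min_expectation(gdf)                  # numeric range check
    run_email_match_regex_expectation(gdf)              # string format check

# gdf = SparkDFDataset(df) as shown earlier, then:
# run_data_quality_suite(gdf)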
Finally, the Airflow Job is scheduled to run every five minutes by default. Therefore, there is no need to manually run any of the above jobs. Run the below commands to create the jobs. Then open the CDE Job Runs page to validate the pipeline.
cde job create --name batch_load --type spark --mount-1-prefix jobCode/ --mount-1-resource job_code_data_quality --runtime-image-resource-name dex-spark-runtime-great-expectations-data-quality --application-file jobCode/batch_load.py
cde job create --name data_quality --type spark --mount-1-prefix jobCode/ --mount-1-resource job_code_data_quality --runtime-image-resource-name dex-spark-runtime-great-expectations-data-quality --application-file jobCode/great_expectations.py
cde job create --name data_quality_orchestration --type airflow --mount-1-prefix jobCode/ --mount-1-resource job_code_data_quality --dag-file airflow.py
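For reference, a minimal sketch of what the airflow.py DAG orchestrating the two Spark jobs might look like follows, using the CDEJobRunOperator available in CDE's embedded Airflow; the schedule and default arguments are assumptions rather than the repository's exact code.
# Hypothetical sketch of airflow.py: orchestrate the two CDE Spark jobs created above.
# Schedule and default_args are assumptions; job_name values match the jobs above.
from datetime import datetime, timedelta

from airflow import DAG
from cloudera.cdp.airflow.operators.cde_operator import CDEJobRunOperator

default_args = {
    "owner": "pauldefusco",
    "retry_delay": timedelta(seconds=10),
    "start_date": datetime(2023, 10, 1),
}

dag = DAG(
    "data_quality_orchestration",
    default_args=default_args,
    schedule_interval="*/5 * * * *",  # every five minutes, as described above
    catchup=False,
)

batch_load = CDEJobRunOperator(task_id="batch_load", job_name="batch_load", dag=dag)
data_quality = CDEJobRunOperator(task_id="data_quality", job_name="data_quality", dag=dag)

batch_load >> data_quality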
Summary
With CDE Runtimes, you can choose any data quality package of your choice. This article in particular showcased a simple data quality pipeline with Great Expectations, a leading open source package for testing and validating data at scale. You can easily leverage GE at scale with Spark in Cloudera Data Engineering in order to complement Great Expectations' reusable declarative expectations with advanced Spark and Airflow Job observability, monitoring, and development capabilities provided by CDE.