09-03-2024
04:45 PM
Objective
Cloudera Data Engineering (CDE) is a cloud-native service provided by Cloudera. It is designed to simplify and enhance the development, deployment, and management of data engineering workloads at scale. CDE is part of the Cloudera Data Platform (CDP), which is a comprehensive, enterprise-grade platform for managing and analyzing data across hybrid and multi-cloud environments.
Cloudera Data Engineering offers several advantages. With CDE, you can create a "CDE Spark-Submit" using the same syntax as your regular Spark-Submit. Alternatively, you can specify your Spark-Submit as a "CDE Job of type Spark" using a reusable Job Definition, which enhances observability, troubleshooting, and dependency management.
These unique capabilities of CDE are especially useful for Spark Data Engineers who develop and deploy Spark Pipelines at scale. This includes working with different Spark-Submit definitions and dynamic, complex dependencies across multiple clusters.
For example, when packaging a JAR for a Spark Submit, you can include various types of dependencies that your Spark application requires to run properly. These can consist of application code (compiled Scala/Java code), third-party libraries (external dependencies), configuration and resource files (for application configuration or runtime data), and custom JARs (any internal or utility libraries your application needs).
In this article, you will learn how to effectively manage JAR dependencies and simplify Cloudera Data Engineering in various scenarios.
Example 1: CDE Job with Scala Application Code in Spark Jar
Scala Spark applications are typically developed and deployed in the following manner:
Set Up Project in IDE: Use SBT to set up a Scala project in your IDE.
Write Code: Write your Scala application.
Compile & Package: Run ```sbt package``` to compile and package your code into a JAR.
Submit to Spark: Use spark-submit to run your JAR on a Spark cluster.
In this example, you will build a CDE Spark Job with a Scala application that has already been compiled into a JAR. To learn how to complete these steps, please visit this tutorial.
cde resource create --name cde_scala_job_files
cde resource upload --name cde_scala_job_files --local-path jars/cdejobjar_2.12-1.0.jar
cde job create \
--name cde-scala-job \
--type spark \
--mount-1-resource cde_scala_job_files \
--application-file cdejobjar_2.12-1.0.jar \
--conf spark.sql.shuffle.partitions=10 \
--executor-cores 2 \
--executor-memory 2g
cde job run --name cde-scala-job
You can add further JAR dependencies with the ```--jar``` or ```--jars``` options. In this case, you can add the Spark XML library from the same CDE Files Resource:
cde resource upload --name cde_scala_job_files --local-path jars/spark-xml_2.12-0.16.0.jar
cde job create \
--name cde-scala-job-jar-dependency \
--type spark \
--mount-1-resource cde_scala_job_files \
--application-file cdejobjar_2.12-1.0.jar \
--conf spark.sql.shuffle.partitions=10 \
--executor-cores 2 \
--executor-memory 2g \
--jar spark-xml_2.12-0.16.0.jar
cde job run --name cde-scala-job-jar-dependency
Notice that you could achieve the same by using two CDE file resources, each containing one of the JARs. You can create as many CDE file resources as needed for each JAR file.
In the following example, you will reference the application code JAR located in the "cde_scala_job_files" CDE Files Resource that you previously created, as well as an additional JAR for the Spark-XML package from a new CDE Files Resource that you will create as "cde_spark_xml_jar".
Note the use of the new "--mount-N-prefix" option below. When you are using more than one CDE Resource with the same "CDE Job Create" command, you need to assign an alias to each Files Resource so that each command option can correctly reference the files.
cde resource create --name cde_spark_xml_jar
cde resource upload --name cde_spark_xml_jar --local-path jars/spark-xml_2.12-0.16.0.jar
cde job create \
--name cde-scala-job-multiple-jar-resources \
--type spark \
--mount-1-prefix scala_app_code \
--mount-1-resource cde_scala_job_files \
--mount-2-prefix spark_xml_jar \
--mount-2-resource cde_spark_xml_jar \
--application-file scala_app_code/cdejobjar_2.12-1.0.jar \
--conf spark.sql.shuffle.partitions=10 \
--executor-cores 2 \
--executor-memory 2g \
--jar spark_xml_jar/spark-xml_2.12-0.16.0.jar
cde job run --name cde-scala-job-multiple-jar-resources
Example 2: CDE Job with PySpark Application Code and Jar Dependency from Maven
For Maven dependencies, you can use the `--packages` option to automatically download and include dependencies. This is often more convenient than manually managing JAR files. In the following example, the `--packages` option replaces the `--jars` option.
In this example, you will reference the Spark-XML package from Maven so that you can use it to parse the sample "books.xml" file from the CDE Files Resource.
cde resource create --name spark_files --type files
cde resource upload --name spark_files --local-path read_xml.py --local-path books.xml
cde job create --name sparkxml \
--application-file read_xml.py \
--mount-1-resource spark_files \
--type spark \
--packages com.databricks:spark-xml_2.12:0.16.0
cde job run --name sparkxml
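For reference, a minimal version of the read_xml.py script might look like the sketch below. This is an assumption about its structure (the row tag and the printed output are illustrative), and the actual script in the tutorial repository may differ. Files uploaded to the mounted CDE Files Resource are available under /app/mount at runtime.
from pyspark.sql import SparkSession

# Minimal sketch of a PySpark script that parses books.xml with the Spark-XML package
spark = SparkSession.builder.appName("sparkxml").getOrCreate()

# books.xml is mounted from the CDE Files Resource under /app/mount
df = spark.read.format("xml") \
    .option("rowTag", "book") \
    .load("/app/mount/books.xml")

df.printSchema()
df.show()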
As in the previous example, multiple CDE Files Resources can be used to manage the PySpark application code and the sample XML file. Notice that the application code in ```read_xml_multi_resource.py``` is different: at line 67, the ```sample_xml_file``` Files Resource is referenced directly in the application code via its alias ```xml_data```.
cde resource create --name sample_xml_file --type files
cde resource create --name pyspark_script --type files
cde resource upload --name pyspark_script --local-path read_xml_multi_resource.py
cde resource upload --name sample_xml_file --local-path books.xml
cde job create --name sparkxml-multi-deps \
--application-file code/read_xml_multi_resource.py \
--mount-1-prefix code \
--mount-1-resource pyspark_script \
--mount-2-prefix xml_data \
--mount-2-resource sample_xml_file \
--type spark \
--packages com.databricks:spark-xml_2.12:0.16.0
cde job run --name sparkxml-multi-deps
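As a rough sketch of the difference, read_xml_multi_resource.py might reference the sample file through the xml_data alias as shown below. This is an assumed structure, not the exact script from the repository; note how the mount prefix becomes part of the path.
from pyspark.sql import SparkSession

# Minimal sketch: the Files Resource mounted with --mount-2-prefix xml_data
# is exposed under /app/mount/xml_data
spark = SparkSession.builder.appName("sparkxml-multi-deps").getOrCreate()

df = spark.read.format("xml") \
    .option("rowTag", "book") \
    .load("/app/mount/xml_data/books.xml")

df.show()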
Example 3: CDE Job with PySpark Application Code and Jar Dependency from CDE Files Resource
Similar to example 1, you can reference JARs directly uploaded into CDE Files Resources instead of using Maven as in example 2.
The following commands pick up from example 2 but replace the ```packages``` option with the ```jars``` option.
Notice that the ```--jars``` option is used in the ```cde job run``` command rather than in ```cde job create```. The ```--jars``` option can be set either at CDE Job creation or at runtime.
cde resource create --name spark_xml_jar --type files
cde resource upload --name spark_xml_jar --local-path jars/spark-xml_2.12-0.16.0.jar
cde job create --name sparkxml-multi-deps-jar-from-res \
--application-file code/read_xml_multi_resource.py \
--mount-1-prefix code \
--mount-1-resource pyspark_script \
--mount-2-prefix xml_data \
--mount-2-resource sample_xml_file \
--mount-3-prefix deps \
--mount-3-resource spark_xml_jar \
--type spark
cde job run --name sparkxml-multi-deps-jar-from-res \
--jar deps/spark-xml_2.12-0.16.0.jar
Summary
In this article, the CDE CLI was used to simplify Spark JAR management with Cloudera Data Engineering.
You can utilize the CDE CLI to create CDE Job Definitions using Spark JAR dependencies and to create CDE file resources to store and reference one or multiple JARs.
Cloudera Data Engineering offers significant improvements in Spark Dependency Management compared to traditional Spark-Submits outside of CDE.
The Job Runs page in the CDE UI can be used to monitor JAR dependencies applied to each job execution. Cloudera Data Engineering presents substantial advancements in Spark Observability and Troubleshooting compared to traditional Spark-Submits outside of CDE.
References & Useful Articles
CDE Concepts
Using the CDE CLI
CDE CLI Command Reference
08-28-2024
05:40 PM
Objective
Cloudera Machine Learning (CML) is a platform designed to help organizations build, deploy, and manage machine learning models at scale. It is part of Cloudera’s suite of enterprise data platforms and solutions, focusing on providing a robust environment for data scientists, analysts, and engineers to collaborate on end-to-end machine learning workflows.
PyGurobi is a Python interface for the Gurobi Optimizer, a powerful and widely used solver for mathematical optimization problems. Gurobi is known for its high performance in solving a variety of optimization problems, including linear programming (LP), quadratic programming (QP), mixed-integer programming (MIP), and others.
In this tutorial, you will use PyGurobi on CML to optimize product prices and maximize enterprise revenue.
Requirements
The following are required to reproduce this example:
CML Workspace in AWS, Azure, OCP, or ECS.
Basic knowledge of Python for machine learning, including Scikit-Learn, Spark, Iceberg, and XGBoost.
You should have basic familiarity with linear and nonlinear programming. If you are new to mathematical optimization, please visit this link for a quick introduction.
Step by Step Instructions
Supporting code for reproducing the tutorial can be found in this Git repository.
Launch a CML Session with the following runtime and resource profile: Editor: JupyterLab
Kernel: Python 3.10
Edition: Standard
Version: 2024.05
Enable Spark: Spark 3.2 or above
Resource Profile: 2 CPU / 4 GB Mem / 0 GPU
Runtime Image: docker.repository.cloudera.com/cloudera/cdsw/ml-runtime-jupyterlab-python3.10-standard:2024.05.1-b8
Open the terminal and install the requirements: pip3 install -r requirements.txt
Part 0: Data Generation
Run notebook ```00_datagen_iceberg_pyspark.ipynb``` and observe the following:
A Spark dataframe with 10000 synthetic product price transactions is created.
The P1 and P2 columns represent the prices for two products sold, and the N1 column represents the quantity of Product 1 sold.
The dataframe is stored as an Iceberg table.
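For illustration, a minimal sketch of the kind of data generation logic used in the notebook is shown below. The table name, coefficients, and random distributions are assumptions; only the column names (P1, P2, N1) and the row count come from the article.
import numpy as np
import pandas as pd
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("datagen").getOrCreate()

# Generate 10,000 synthetic observations: P1 and P2 are product prices, N1 is the quantity of Product 1 sold
n = 10000
pdf = pd.DataFrame({
    "P1": np.random.uniform(200, 400, n).round(2),
    "P2": np.random.uniform(100, 350, n).round(2),
})
# Assumed linear demand with noise: quantity of Product 1 depends on both prices
pdf["N1"] = (500 - 0.8 * pdf["P1"] + 0.3 * pdf["P2"] + np.random.normal(0, 20, n)).round()

df = spark.createDataFrame(pdf)

# Persist as an Iceberg table (table name is a placeholder)
df.writeTo("spark_catalog.default.price_transactions").using("iceberg").createOrReplace()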
Part 1: Pricing Optimization with Gurobi
Run notebook ```01_price_optimization_with_competing_products.ipynb``` and observe the following:
An MLFlow Experiment Context is created with the name "Price Optimization Experiment".
An initial regressor is built to predict prices using the data stored in the Iceberg table. The data is read using PandasOnSpark which is included in the Spark Runtime AddOn by default.
A Price Optimization model is instantiated with an Objective Function and associated Constraints.
The model is trained on the data. Its outputs include an optimal price recommendation for the two products, with an associated product quantity, and finally, a revenue estimate. In other words, revenue is maximized at 70347.77 when prices are 400 and 300 for the two products, respectively.
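To make the structure of such a model concrete, here is a minimal PyGurobi sketch of a price optimization problem with an objective function and constraints. The demand coefficients, variable bounds, and the constraint are assumptions for illustration and are not the values fitted in the notebook.
import gurobipy as gp
from gurobipy import GRB

m = gp.Model("price_optimization")

# Decision variables: prices for the two products (bounds are illustrative)
p1 = m.addVar(lb=100, ub=400, name="p1")
p2 = m.addVar(lb=100, ub=300, name="p2")

# Assumed linear demand functions (in the tutorial these come from the fitted regressor)
q1 = 500 - 0.8 * p1 + 0.3 * p2
q2 = 400 + 0.2 * p1 - 0.6 * p2

# Objective: maximize total revenue p1*q1 + p2*q2 (a nonconvex quadratic, hence NonConvex=2)
m.setObjective(p1 * q1 + p2 * q2, GRB.MAXIMIZE)
m.Params.NonConvex = 2

# Example business constraint: Product 1 is priced at or above Product 2
m.addConstr(p1 >= p2, name="price_ordering")

m.optimize()
print("Optimal prices:", p1.X, p2.X, "Revenue:", m.ObjVal)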
Part 2: Deploy Optimization Model in an API Endpoint
Run notebook ```02_price_optimization_model_deployment.ipynb``` and observe the following:
CML APIv2 allows you to programmatically execute actions within CML Workspaces. In this example, the API is used to create a small Python Interface to manage model deployments.
In particular, the interface was used to create a separate CML Project to host an API Endpoint. The API Endpoint is used to allocate a dedicated container for the model and provide an entry point for prediction requests.
Navigate back to the CML workspace and notice a new project named ```CML Project for Optimization Model``` has been created. Open it and notice a new Endpoint has been created in the Model Deployments section.
Open the model deployment and, once it has been completed, enter the following sample payload in the Test Request window. Observe the output response.
Test Input:
{"p[1]": [354,353,352,351,354,353,312,311,314,313,352,351], "p[2]": [110,120,320,220,101,100,101,260,355,140,300,299], "n[1]": [54,53,112,151,154,153,52,51,4,53,92,71]}
Sample Test Output:
{
"model_deployment_crn": "crn:cdp:ml:us-west-1:558bc1d2-8867-4357-8524-311d51259233:workspace:f76bd7eb-adde-43eb-9bd9-e16ec2cb0238/c152a438-6449-465e-8685-e1cc0b9988fa",
"prediction": {
"data": {
"n[1]": [
54,
53,
112,
151,
154,
153,
52,
51,
4,
53,
92,
71
],
"p[1]": [
354,
353,
352,
351,
354,
353,
312,
311,
314,
313,
352,
351
],
"p[2]": [
110,
120,
320,
220,
101,
100,
101,
260,
355,
140,
300,
299
]
},
"optimal prices": [
400,
300
],
"optimal product quantities": [
80,
120
],
"total revenue": 68032.83
},
"uuid": "e6700d88-f4e7-4705-988b-89e9c8092194"
}
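The same endpoint can also be called programmatically. The snippet below is a hypothetical sketch using the requests library; the endpoint URL and access key are placeholders, and the exact request envelope may vary across CML versions.
import requests

MODEL_ENDPOINT = "https://modelservice.<your-workspace-domain>/model"  # placeholder
ACCESS_KEY = "<your-model-access-key>"  # placeholder

payload = {
    "p[1]": [354, 353, 352, 351],
    "p[2]": [110, 120, 320, 220],
    "n[1]": [54, 53, 112, 151]
}

response = requests.post(
    MODEL_ENDPOINT,
    json={"accessKey": ACCESS_KEY, "request": payload},
    headers={"Content-Type": "application/json"}
)
print(response.json())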
Summary
In this tutorial, you used PyGurobi in Cloudera Machine Learning to maximize product revenue by identifying optimal prices and sales quantities for two products.
The PyGurobi library allows you to solve complex linear and nonlinear programming problems such as the one above. Cloudera on Cloud provides the tooling necessary to use libraries such as PyGurobi in an enterprise setting. With CML, you can easily leverage Spark on Kubernetes, Runtime Add-Ons, Iceberg, Python, MLFlow, and more, to install and containerize workloads and machine learning models at scale, without any custom installations.
Related Articles and Resources
Here are some useful articles about Cloudera Machine Learning (CML) that can help you better understand its features and capabilities:
Cloudera Machine Learning - What You Should Know: This article on the Cloudera Community provides an overview of CML, explaining how it enables teams to deploy machine learning workspaces that auto-scale to fit their needs using Kubernetes. It highlights key features like cost-saving through auto-suspend capabilities and offers a consistent experience across an organization. The article is a good starting point for understanding CML's role within the Cloudera Data Platform (CDP). Introduction to CML.
How to Use Experiments in Cloudera Machine Learning: This guide walks through using experiments in CML, which allow users to run scripts with different inputs and compare metrics, particularly useful for tasks like hyperparameter optimization. The article includes practical examples that illustrate how experiments can be applied in real-world scenarios. MLFlow Experiments in CML.
Cloudera Machine Learning Documentation: This hands-on guide from Datafloq provides a detailed checklist for managing CML projects effectively, with a focus on optimizing productivity and data quality. It discusses essential components such as data cleansing and how these contribute to improved decision-making, which is crucial for successful machine learning outcomes. CML Documentation.
Getting Started with the Gurobi Python API: This tutorial provides a comprehensive introduction to using PyGurobi, from creating a model, adding variables and constraints, to setting objectives and optimizing. It explains the use of key functions such as `Model.addVar`, `Model.addConstr`, and `Model.setObjective`, making it an excellent starting point for beginners interested in mathematical optimization using Gurobi in Python. Gurobi Help Center.
Python API Overview: This overview explains the various types of models that can be handled by Gurobi, such as Mixed Integer Linear Programs (MILP), Mixed Integer Quadratic Programs (MIQP), and Non-Linear Programs (NLP). It also covers the environments used within the Gurobi Python interface and provides guidance on solving models, managing multiple solutions, and handling infeasible models. Python API Overview.
Gurobi Optimizer Python Environment: This resource outlines how to set up and start using the Gurobi Python environment, including installing the Gurobi package via Anaconda or pip. It also highlights the different types of licenses available, including free academic licenses and evaluation licenses for commercial users. Gurobi Optimizer Python Environment.
08-27-2024
05:01 PM
Objective
Cloudera Data Engineering (CDE) is a cloud-native service for Cloudera Data Platform that allows you to submit batch jobs to auto-scaling virtual clusters. CDE enables you to spend more time on your applications, and less time on infrastructure.
Wheels allow for faster installations and more stability in the package distribution process. In the context of PySpark, Wheels allow you to make dependent Python modules available to executors without having to pip install dependencies on every node, and to use application source code as a package.
In this tutorial, you will create a CDE Spark Job using a Wheel file via the CDE CLI.
Project Requirements
In order to execute the Hands-On Labs you need:
* A Spark 3 and Iceberg-enabled CDE Virtual Cluster (Azure, AWS, and Private Cloud are all supported).
* The CDE CLI installed on your local machine. If you need to install it for the first time, please follow these steps.
* Familiarity with Python, PySpark, and the CDE CLI is highly recommended.
* No script code changes are required.
Project Setup
Clone this GitHub repository to your local machine or the VM where you will be running the script.
mkdir ~/Documents/cde_wheel_jobs
cd ~/Documents/cde_wheel_jobs
git clone https://github.com/pdefusco/CDE_Wheel_Jobs.git
Alternatively, if you don't have git installed on your machine, create a folder on your local computer, navigate to this URL, and manually download the files.
Step by Step Instructions
The Spark Job code can be found in the ```mywheel/__main__.py``` file, but it does not require modifications. For demo purposes, we have chosen a simple Spark SQL job. The Wheel has already been created for you and will automatically download to the ```dist``` directory on your local machine when you clone this project.
Using the Wheel with a CDE Spark Submit
A CDE Spark Submit is the fastest way to prototype a Spark Job. In this example, we will run a CDE Spark Submit with the Wheel file.
Once you have the CDE CLI installed, you can launch a CDE Job from your local terminal via the ```cde spark submit``` command. Copy the following command and execute it in your terminal:
cde spark submit --py-files dist/mywheel-0.0.1-py3-none-any.whl mywheel/__main__.py
In the terminal, validate that the Spark Job has launched successfully and note the Job Run ID. Next, navigate to the CDE Job Runs UI and validate job execution.
Open the Job Configuration tab and notice that the Wheel has been uploaded to a Files Resource for you. However, the Job Configuration tab does not provide a means to edit or reschedule the job definition. In other words, the entries in the Configuration tab are final. To be able to change the definition, we will need to create a CDE Spark Job.
Using the Wheel with a CDE Spark Job
Similar to a CDE Spark Submit, a CDE Spark Job is application code used to execute a Spark Job in a CDE Virtual Cluster. However, the CDE Job allows you to easily define, edit, and reuse configurations and resources in future runs. Jobs can be run on demand or scheduled. An individual job execution is called a job run.
In this example, we will create a CDE Resource of type Files, upload the Spark application code and the Wheel dependency, and then run the Job. Execute the following CDE CLI commands in your local terminal.
Create the Files Resource:
cde resource create --name mywheels
Upload the application code and Wheel to the Files Resource:
cde resource upload --name mywheels --local-path dist/mywheel-0.0.1-py3-none-any.whl
cde resource upload --name mywheels --local-path mywheel/__main__.py
Navigate to the CDE Resources tab and validate that the Resource and the corresponding files are now available.
Create the CDE Spark Job definition:
cde job create --name cde_wheel_job --type spark --py-files mywheel-0.0.1-py3-none-any.whl --application-file __main__.py --mount-1-resource mywheels
Navigate to the CDE Jobs UI and notice that a new CDE Spark Job has been created. The job hasn't run yet, so only the Configuration tab is populated with the Spark Job definition.
Finally, run the job. The Job Runs page will now include a new entry reflecting job execution:
cde job run --name cde_wheel_job
Notice that the CDE Job definition can now be edited. This allows you to make changes to files and dependencies, create or change the job execution schedule, and more. For example, the CDE Job can now be executed again.
Conclusions & Next Steps
CDE is the Cloudera Data Engineering Service, a containerized managed service for Spark and Airflow. If you are exploring CDE, you may find the following tutorials relevant:
Spark 3 & Iceberg: A quick intro to Time Travel capabilities with Spark 3.
Simple Intro to the CDE CLI: An introduction to the CDE CLI for the CDE beginner.
CDE CLI Demo: A more advanced CDE CLI reference with additional details for the CDE user who wants to move beyond the basics.
CDE Resource 2 ADLS: An example integration between ADLS and CDE Resources. This pattern is applicable to AWS S3 as well and can be used to pass execution scripts, dependencies, and virtually any file from CDE to 3rd party systems and vice versa.
Using CDE Airflow: A guide to Airflow in CDE, including examples to integrate with 3rd party systems via Airflow Operators such as BashOperator, HttpOperator, PythonOperator, and more.
GitLab2CDE: A CI/CD pipeline to orchestrate cross-cluster workflows for hybrid/multicloud Data Engineering.
Postman2CDE: An example of using the Postman API to bootstrap CDE Services with the CDE API.
Oozie2CDEAirflow API: An API to programmatically convert Oozie workflows and dependencies into CDE Airflow and CDE Jobs. This API is designed to easily migrate from Oozie to CDE Airflow and not just Open Source Airflow.
For more information on the Cloudera Data Platform and its form factors, please visit this site.
For more information on migrating Spark jobs to CDE, please reference this guide.
If you have any questions about CDE or would like to see a demo, please reach out to your Cloudera Account Team or send a message through this portal and we will be in contact with you soon.
05-30-2024
05:48 PM
2 Kudos
CDP Data Engineering (CDE) is the only cloud-native service purpose-built for enterprise data engineering teams. Building on Apache Spark, Data Engineering is an all-inclusive data engineering toolset that enables orchestration automation with Apache Airflow, advanced pipeline monitoring, visual troubleshooting, and comprehensive management tools to streamline ETL processes across enterprise analytics teams.
Apache Iceberg is an open table format for huge analytic datasets. It provides features that, coupled with Spark as the compute engine, allow you to build data processing pipelines with dramatic gains in terms of scalability, performance, and overall developer productivity.
CDE provides native Iceberg support. With the release of CDE 1.20, the Spark Runtime has been updated with Apache Iceberg 1.3. This version introduces new features that provide great benefits to Data Engineers.
This article will familiarize you with Iceberg Table Branching and Tagging in CDE.
Table Branching: the ability to create independent lineages of snapshots, each with its lifecycle.
Table Tagging: the ability to tag an Iceberg table snapshot.
Requirements
A CDE Virtual Cluster of type "All-Purpose" running in a CDE Service with version 1.20 or above.
A working installation of the CDE CLI.
The supporting code and associated files and resources are available in this git repository.
Step by Step Instructions
Prerequisites
Create CDE Files Resource: cde resource create --name myFiles --type files
cde resource upload --name myFiles --local-path resources/cell_towers_1.csv --local-path resources/cell_towers_2.csv
Launch CDE Session & Run Spark Commands: cde session create --name icebergSession --type pyspark --mount-1-resource myFiles
cde session interact --name icebergSession
Create Iceberg Table: USERNAME = "pauldefusco"
df = spark.read.csv("/app/mount/cell_towers_1.csv", header=True, inferSchema=True)
df.writeTo("CELL_TOWERS_{}".format(USERNAME)).using("iceberg").tableProperty("write.format.default", "parquet").createOrReplace()
Working with Iceberg Table Branches
Insert Data into Branch: # LOAD NEW TRANSACTION BATCH
batchDf = spark.read.csv("/app/mount/cell_towers_2.csv", header=True, inferSchema=True)
batchDf.printSchema()
batchDf.createOrReplaceTempView("BATCH_TEMP_VIEW".format(USERNAME))
# CREATE TABLE BRANCH
spark.sql("ALTER TABLE CELL_TOWERS_{} CREATE BRANCH ingestion_branch".format(USERNAME))
# WRITE DATA OPERATION ON TABLE BRANCH
batchDf.write.format("iceberg").option("branch", "ingestion_branch").mode("append").save("CELL_TOWERS_{}".format(USERNAME))
Notice that a simple SELECT query against the table still returns the original data. spark.sql("SELECT * FROM CELL_TOWERS_{};".format(USERNAME)).show()
If you want to access the data in the branch, you can specify the branch name in your SELECT query. spark.sql("SELECT * FROM CELL_TOWERS_{} VERSION AS OF 'ingestion_branch';".format(USERNAME)).show()
Track table snapshots post Merge Into operation: # QUERY ICEBERG METADATA HISTORY TABLE
spark.sql("SELECT * FROM CELL_TOWERS_{}.snapshots".format(USERNAME)).show(20, False)
Cherrypicking Snapshots
The cherrypick_snapshot procedure creates a new snapshot incorporating the changes from another snapshot in a metadata-only operation (no new data files are created). To run the cherrypick_snapshot procedure you need to provide two parameters: the name of the table you’re updating and the ID of the snapshot the table should be updated based on. This transaction will return the snapshot IDs before and after the cherry-pick operation as source_snapshot_id and current_snapshot_id.
You will use the cherrypick operation to commit the changes to the table that were staged in the 'ingestion_branch' branch up until now.
# SHOW PAST BRANCH SNAPSHOT ID'S
spark.sql("SELECT * FROM SPARK_CATALOG.DEFAULT.CELL_TOWERS_{}.refs;".format(USERNAME)).show()
# SAVE THE SNAPSHOT ID CORRESPONDING TO THE CREATED BRANCH
branchSnapshotId = spark.sql("SELECT snapshot_id FROM SPARK_CATALOG.DEFAULT.CELL_TOWERS_{}.refs WHERE NAME == 'ingestion_branch';".format(USERNAME)).collect()[0][0]
# USE THE PROCEDURE TO CHERRY-PICK THE SNAPSHOT
# THIS IMPLICITLY SETS THE CURRENT TABLE STATE TO THE STATE DEFINED BY THE CHOSEN PRIOR SNAPSHOT ID
spark.sql("CALL spark_catalog.system.cherrypick_snapshot('SPARK_CATALOG.DEFAULT.CELL_TOWERS_{0}',{1})".format(USERNAME, branchSnapshotId))
# VALIDATE THE CHANGES
# THE ROW COUNT IN THE CURRENT TABLE STATE NOW REFLECTS THE APPEND OPERATION - PREVIOUSLY IT ONLY DID SO WHEN SELECTING THE BRANCH
spark.sql("SELECT COUNT(*) FROM CELL_TOWERS_{};".format(USERNAME)).show()
Working with Iceberg Table Tags
Tags are immutable labels for Iceberg Snapshot IDs and can be used to reference a particular table version via a simple tag rather than having to work with Snapshot IDs directly.
Create Table Tag: spark.sql("ALTER TABLE SPARK_CATALOG.DEFAULT.CELL_TOWERS_{} CREATE TAG businessOrg RETAIN 365 DAYS".format(USERNAME)).show()
Select your table snapshot as of a particular tag: spark.sql("SELECT * FROM SPARK_CATALOG.DEFAULT.CELL_TOWERS_{} VERSION AS OF 'businessOrg';".format(USERNAME)).show()
The refs Metadata Table
The refs metadata table helps you understand and manage your table’s snapshot history and retention policy, making it a crucial part of maintaining data versioning and ensuring that your table’s size is under control. Among its many use cases, the table provides a list of all the named references within an Iceberg table such as Branch names and corresponding Snapshot IDs.
spark.sql("SELECT * FROM SPARK_CATALOG.DEFAULT.CELL_TOWERS_{}.refs;".format(USERNAME)).show()
Summary & Next Steps
CDE supports Apache Iceberg which provides a table format for huge analytic datasets in the cloud. Iceberg enables you to work with large tables, especially on object stores and supports concurrent reads and writes on all storage media. You can use Cloudera Data Engineering virtual clusters running Spark 3 to interact with Apache Iceberg tables.
The Iceberg Metadata Layer can track snapshots under different paths or give particular snapshots a name. These features are respectively called table branching and tagging. Thanks to them, Iceberg Data Engineers can implement data pipelines with advanced isolation, reproducibility, and experimentation capabilities.
05-30-2024
05:19 PM
1 Kudo
CDP Data Engineering (CDE) is the only cloud-native service purpose-built for enterprise data engineering teams. Building on Apache Spark, Data Engineering is an all-inclusive data engineering toolset that enables orchestration automation with Apache Airflow, advanced pipeline monitoring, visual troubleshooting, and comprehensive management tools to streamline ETL processes across enterprise analytics teams.
To manage job dependencies, CDE supports creating custom Python environments dedicated to Airflow using the airflow-python-env resource type. With this option, you can install custom libraries for running your Directed Acyclic Graphs (DAGs). The supported version is Python 3.8.
A resource is a named collection of files or other resources referenced by a job. The airflow-python-env resource type allows you to specify a requirements.txt file that defines an environment that you can then activate globally for airflow deployments in a virtual cluster.
You can install and use custom Python packages for Airflow with CDE. Typically this feature is used to install third-party Airflow providers in CDE. However, it can also be used to install any Python package and use it within the DAG logic.
CDEPY is a package that allows you to do all the above with the convenience of Python. With it, you can remotely connect to a Virtual Cluster from your local machine or 3rd party tool as long as it supports Python. It is available on PyPi at this URL and can be easily installed with a "pip install cdepy" command.
In this example, you will use CDEPY to create a CDE Airflow Python environment with the Amazon Provider for Airflow. Then, you will deploy an Airflow DAG that creates an S3 bucket, reads a text file from a CDE Files Resource, writes it to the S3 bucket, launches a CDE Spark Job, and finally deletes the S3 bucket.
Requirements
A CDE Service with Version 1.21 or above.
The code including Airflow DAG, PySpark script, and associated resources are available in this git repository.
A local machine with Python and the latest version of the cdepy Python package installed. pip install cdepy
End-to-End Example
Import cdepy modules and set environment variables: from cdepy import cdeconnection
from cdepy import cdeairflowpython
from cdepy import cderesource
from cdepy import cdejob
from cdepy import cdemanager
import os
import json
Connect via CdeConnection Object
JOBS_API_URL = "<myJobsAPIurl>"
WORKLOAD_USER = "<myusername>"
WORKLOAD_PASSWORD = "<mypwd>"
Instantiate a CdeConnection object to be able to connect to the CDE Virtual Cluster. myCdeConnection = cdeconnection.CdeConnection(JOBS_API_URL, WORKLOAD_USER, WORKLOAD_PASSWORD)
myCdeConnection.setToken()
Instantiate a CdeAirflowPythonEnv object to manage Airflow Python Environments. myAirflowPythonEnvManager = cdeairflowpython.CdeAirflowPythonEnv(myCdeConnection)
Create a Maintenance Session to perform any Airflow Python Environments-related actions. myAirflowPythonEnvManager.createMaintenanceSession()
Register a pip repository in CDE. myAirflowPythonEnvManager.createPipRepository()
Check the status of the Maintenance Session: myAirflowPythonEnvManager.checkAirflowPythonEnvStatus()
The output should be ```{"status":"pip-repos-defined"}```. Load the requirements.txt file: pathToRequirementsTxt = "/resources/requirements.txt"
myAirflowPythonEnvManager.buildAirflowPythonEnv(pathToRequirementsTxt)
Note that the requirements.txt file must be customized before it is uploaded. Check the build status: myAirflowPythonEnvManager.checkAirflowPythonEnvStatus()
The response status should be ```{"status":"building"}```. Repeat the request in a couple of minutes. Eventually, once the response status becomes ```{"status":"built"}``` you will be ready to move on.
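If you prefer not to poll manually, a simple loop such as the sketch below can wait for the build to finish. The assumption that checkAirflowPythonEnvStatus() returns the JSON string shown above is ours; adjust the parsing if the method prints rather than returns the status.
import json
import time

# Poll the build status until the environment reports "built" (assumed return format)
while True:
    status = json.loads(myAirflowPythonEnvManager.checkAirflowPythonEnvStatus())["status"]
    print("Airflow Python environment status:", status)
    if status == "built":
        break
    time.sleep(60)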
Validate the status of the Python environment. myAirflowPythonEnvManager.getAirflowPythonEnvironmentDetails()
Explore Maintenance Session logs. myAirflowPythonEnvManager.viewMaintenanceSessionLogs()
Activate the Python environment. myAirflowPythonEnvManager.activateAirflowPythonEnv()
Check on the Python environment build status. myAirflowPythonEnvManager.checkAirflowPythonEnvStatus()
The response should be ```{"status":"activating"}```. The maintenance session will then end after a couple of minutes. This means that the environment has been activated.
Once the Airflow Python environment has been activated, you can create a CDE Airflow Job. First, create a pipeline resource and upload the DAG to it: CDE_RESOURCE_NAME = "my_pipeline_resource"
myCdeFilesResource = cderesource.CdeFilesResource(CDE_RESOURCE_NAME)
myCdeFilesResourceDefinition = myCdeFilesResource.createResourceDefinition()
LOCAL_FILE_PATH = "resources"
LOCAL_FILE_NAME = "s3BucketDag.py"
myCdeClusterManager = cdemanager.CdeClusterManager(myCdeConnection)
myCdeClusterManager.createResource(myCdeFilesResourceDefinition)
myCdeClusterManager.uploadFileToResource(CDE_RESOURCE_NAME, LOCAL_FILE_PATH, LOCAL_FILE_NAME)
Create a Files Resource for the data file. The Airflow DAG will use the S3BucketOperator and the BashOperator to read the file from the CDE Files Resource and write it to an S3 bucket; a rough sketch of such a DAG follows the commands below. CDE_RESOURCE_NAME = "my_file_resource"
myCdeFilesResource = cderesource.CdeFilesResource(CDE_RESOURCE_NAME)
myCdeFilesResourceDefinition = myCdeFilesResource.createResourceDefinition()
LOCAL_FILE_PATH = "resources"
LOCAL_FILE_NAME = "my_file.txt"
myCdeClusterManager.createResource(myCdeFilesResourceDefinition)
myCdeClusterManager.uploadFileToResource(CDE_RESOURCE_NAME, LOCAL_FILE_PATH, LOCAL_FILE_NAME)
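For orientation, a hypothetical sketch of what s3BucketDag.py could look like is shown below. The bucket name, connection ID, file path, and job name are placeholders, the operator import paths depend on the installed Amazon provider version, and the actual DAG in the repository may differ.
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.providers.amazon.aws.operators.s3 import S3CreateBucketOperator, S3DeleteBucketOperator
from cloudera.cdp.airflow.operators.cde_operator import CDEJobRunOperator

with DAG(
    dag_id="s3_bucket_dag",
    start_date=datetime(2024, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:

    create_bucket = S3CreateBucketOperator(
        task_id="create_bucket",
        bucket_name="my-example-bucket",      # placeholder
        aws_conn_id="aws_default",            # placeholder connection
    )

    # Assumes the file from the CDE Files Resource is mounted under /app/mount at runtime
    copy_file_to_s3 = BashOperator(
        task_id="copy_file_to_s3",
        bash_command="aws s3 cp /app/mount/my_file.txt s3://my-example-bucket/",
    )

    run_spark_job = CDEJobRunOperator(
        task_id="run_spark_job",
        job_name="simple-pyspark",            # the CDE Spark Job created below
    )

    delete_bucket = S3DeleteBucketOperator(
        task_id="delete_bucket",
        bucket_name="my-example-bucket",
        force_delete=True,
    )

    create_bucket >> copy_file_to_s3 >> run_spark_job >> delete_bucket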
Create a CDE Spark Job along with its resources: CDE_RESOURCE_NAME = "my_script_resource"
myCdeFilesResource = cderesource.CdeFilesResource(CDE_RESOURCE_NAME)
myCdeFilesResourceDefinition = myCdeFilesResource.createResourceDefinition()
LOCAL_FILE_PATH = "resources"
LOCAL_FILE_NAME = "pysparksql.py"
myCdeClusterManager.createResource(myCdeFilesResourceDefinition)
myCdeClusterManager.uploadFileToResource(CDE_RESOURCE_NAME, LOCAL_FILE_PATH, LOCAL_FILE_NAME)
CDE_JOB_NAME = "simple-pyspark"
myCdeSparkJob = cdejob.CdeSparkJob(myCdeConnection)
myCdeSparkJobDefinition = myCdeSparkJob.createJobDefinition(CDE_JOB_NAME, CDE_RESOURCE_NAME, APPLICATION_FILE_NAME=LOCAL_FILE_NAME, executorMemory="2g", executorCores=2)
myCdeClusterManager.createJob(myCdeSparkJobDefinition)
Create & Run CDE Airflow Job: CDE_JOB_NAME = "PythonEnvDag"
DAG_FILE = "s3BucketDag.py"
CDE_RESOURCE_NAME = "my_pipeline_resource"
myCdeAirflowJob = cdejob.CdeAirflowJob(myCdeConnection)
myCdeAirflowJobDefinition = myCdeAirflowJob.createJobDefinition(CDE_JOB_NAME, DAG_FILE, CDE_RESOURCE_NAME)
myCdeClusterManager.createJob(myCdeAirflowJobDefinition)
myCdeClusterManager.runJob(CDE_JOB_NAME)
Optional: Create a new Maintenance Session to delete the Python environment myAirflowPythonEnvManager.createMaintenanceSession()
myAirflowPythonEnvManager.deleteAirflowPythonEnv()
Optional: End the Maintenance Session once you have deleted the Python environment: myAirflowPythonEnvManager.deleteMaintenanceSession()
References
Documentation
Introductory Article to CDEPY
CDEPY on PyPi
05-30-2024
05:12 PM
1 Kudo
CDP Data Engineering (CDE) is the only cloud-native service purpose-built for enterprise data engineering teams. Building on Apache Spark, Data Engineering is an all-inclusive data engineering toolset that enables orchestration automation with Apache Airflow, advanced pipeline monitoring, visual troubleshooting, and comprehensive management tools to streamline ETL processes across enterprise analytics teams.
Git repositories allow teams to collaborate, manage project artifacts, and promote applications from lower to higher environments. CDE supports integration with Git providers such as GitHub, GitLab, and Bitbucket to synchronize job runs with different versions of your code.
CDEPY is a package that allows you to do all the above with the convenience of Python. With it, you can remotely connect to a Virtual Cluster from your local machine or 3rd party tool as long as it supports Python.
In this tutorial, you will use CDEPY to create a CDE Repository from a Git repository and create a CDE Spark Job using the PySpark script loaded in the repository.
Requirements
A CDE Service with Version 1.21 or above.
The code supporting this article is available in this git repository.
A local machine with Python and the latest version of the cdepy Python package installed. pip install cdepy
End-to-End Example
Import cdepy modules and set environment variables: from cdepy import cdeconnection
from cdepy import cdeairflowpython
from cdepy import cderepositories
from cdepy import cdejob
from cdepy import cdemanager
import os
import json
#Connect via CdeConnection Object
JOBS_API_URL = "<myJobsAPIurl>"
WORKLOAD_USER = "<myusername>"
WORKLOAD_PASSWORD = "<mypwd>"
Instantiate a CdeConnection object to be able to connect to the CDE Virtual Cluster. myCdeConnection = cdeconnection.CdeConnection(JOBS_API_URL, WORKLOAD_USER, WORKLOAD_PASSWORD)
myCdeConnection.setToken()
Instantiate a CdeRepositoryManager object to be able to interact with CDE repositories. myRepoManager = cderepositories.CdeRepositoryManager(myCdeConnection)
Provide git repository information. Use the provided git repository for testing purposes. repoName = "exampleGitRepository"
repoPath = "https://github.com/pdefusco/cde_git_repo.git"
Create CDE Repository from Git Repository. myRepoManager.createRepository(repoName, repoPath, repoBranch="main")
Show available CDE repositories. json.loads(myRepoManager.listRepositories())
Show CDE Repository Metadata. json.loads(myRepoManager.describeRepository(repoName))
Download the file from the CDE Repository. filePath = "simple-pyspark-sql.py"
myRepoManager.downloadFileFromRepo(repoName, filePath)
Delete CDE Repository. myRepoManager.deleteRepository(repoName)
Validate CDE Repository Deletion. json.loads(myRepoManager.listRepositories())
Create a CDE Spark Job from a CDE Repository: CDE_JOB_NAME = "sparkJobFromRepo"
#Set path of PySpark script inside the CDE Repository:
applicationFilePath = "simple-pyspark-sql.py"
myCdeSparkJob = cdejob.CdeSparkJob(myCdeConnection)
myCdeSparkJobDefinition = myCdeSparkJob.createJobDefinition(CDE_JOB_NAME=CDE_JOB_NAME, \
CDE_RESOURCE_NAME=repoName, \
APPLICATION_FILE_NAME=applicationFilePath, \
executorMemory="2g", \
executorCores=2)
myCdeClusterManager = cdemanager.CdeClusterManager(myCdeConnection)
myCdeClusterManager.createJob(myCdeSparkJobDefinition)
myCdeClusterManager.runJob(CDE_JOB_NAME)
Optional: update code in "simple-pyspark-sql.py" in your git repository.
Then pull from git repo to CDE repo to synchronize code changes. myRepoManager.pullRepository(repoName)
Describe the CDE repository again. Notice changes to metadata. json.loads(myRepoManager.describeRepository(repoName))
Download the file from the CDE Repository. myRepoManager.downloadFileFromRepo(repoName, filePath)
Delete CDE Repository. myRepoManager.deleteRepository(repoName)
Validate CDE Repository Deletion. json.loads(myRepoManager.listRepositories())
References
Documentation
Introductory Article to CDEPY
CDEPY on PyPi
05-13-2024
05:33 PM
CDE is the Cloudera Data Engineering Service, a containerized managed service for the Cloudera Data Platform designed for Large Scale Batch and Streaming Pipelines with Spark, Airflow, and Iceberg. It allows you to submit batch jobs to auto-scaling virtual clusters. As a Cloud-Native service, CDE enables you to spend more time on your applications, and less time on infrastructure.
A CDE Session is an interactive short-lived development environment for running Python, Scala, and Spark commands to help you iterate upon and build your Spark workloads. You can use CDE Sessions in CDE Virtual Clusters of type "All Purpose - Tier 2".
A resource in CDE is a named collection of files used by a job or a session. Resources can include application code, configuration files, custom Docker images, and Python virtual environment specifications.
As of version 1.20, CDE introduces the ability to leverage CDE Resources when running CDE Interactive Sessions. This means that you can leverage files and Python environments when working with your data interactively.
In this example, we will leverage both CDE Files and Python resources to parse a YAML file.
CDE provides a command line interface (CLI) client. You can use the CLI to create and update jobs, view job details, manage job resources, run jobs, sessions, etc. We will use the CDE CLI to run the following example commands:
Create the CDE Files resource and load a YAML file to it.
%cde resource create --type files --name myYamlFiles
%cde resource upload --name myYamlFiles --local-path cdesessionsresources/myyaml.yaml
Create a CDE Python resource and activate the environment.
%cde resource create --type python-env --name py_yaml_resource
%cde resource upload --name py_yaml_resource --local-path cdesessionsresources/requirements.txt
Allow a moment for the Python resource to build. Check the build status in the UI.
Next, launch the CDE Session:
%cde session create --name pyyamlSession --type pyspark --python-env-resource-name py_yaml_resource --mount-1-resource myYamlFiles
{
"name": "pyyamlSession",
"type": "pyspark",
"creator": "pauldefusco",
"created": "2024-05-13T23:47:29Z",
"mounts": [
{
"dirPrefix": "/",
"resourceName": "myYamlFiles"
}
],
"lastStateUpdated": "2024-05-13T23:47:29Z",
"state": "starting",
"interactiveSpark": {
"id": 4,
"driverCores": 1,
"executorCores": 1,
"driverMemory": "1g",
"executorMemory": "1g",
"numExecutors": 1,
"pythonEnvResourceName": "py_yaml_resource"
}
}
Open the PySpark shell and execute the rest of the following code from there:
%cde session interact --name pyyamlSession
Starting REPL...
Waiting for the session to go into an available state...
Connected to Cloudera Data Engineering...
Press Ctrl+D (i.e. EOF) to exit
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\
/_/
Type in expressions to have them evaluated.
>>>
Import the PyYaml Python module:
>>> import yaml
>>> from yaml import load, dump
>>> from yaml import Loader, Dumper
Load the data from the files resource:
>>> myYamlDocument = "/app/mount/myyaml.yaml"
Parse the file with PyYaml:
>>> with open(myYamlDocument) as stream:
...     data = yaml.load(stream, Loader=Loader)
...     print(data)
{'name': "Martin D'vloper", 'job': 'Developer', 'skill': 'Elite', 'employed': True, 'foods': ['Apple', 'Orange', 'Strawberry', 'Mango'], 'languages': {'perl': 'Elite', 'python': 'Elite', 'pascal': 'Lame'}, 'education': '4 GCSEs\n3 A-Levels\nBSc in the Internet of Things\n'}
04-05-2024
02:21 AM
Sharing my memo on setting up the .cde/config.yaml:
user: (my user in CDP, not the email address)
vcluster-endpoint: (find it in the Administration -> Virtual Cluster details -> JOBS API URL)
03-27-2024
05:39 PM
Logging Iceberg Metrics with MLFlow Tracking in CML
Cloudera Machine Learning (CML) on Cloudera Data Platform (CDP) accelerates time-to-value by enabling data scientists to collaborate in a single unified platform that is all inclusive for powering any AI use case. Purpose-built for agile experimentation and production ML workflows, Cloudera Machine Learning manages everything from data preparation to MLOps, to predictive reporting.
CML is compatible with the MLflow Tracking API and makes use of the MLflow client library as the default method to log experiments. Existing projects with existing experiments are still available and usable. CML’s experiment tracking features allow you to use the MLflow client library for logging parameters, code versions, metrics, and output files when running your machine learning code.
Apache Iceberg is a table format for huge analytics datasets in the cloud that defines how metadata is stored and data files are organized. Iceberg is also a library that compute engines can use to read/write a table. CML offers Data Connections to connect to Data Sources available within the CDP Environment, including Iceberg Open Lakehouses.
In this example, we will create an experiment with MLFlow Tracking and log Iceberg metadata in order to enhance machine learning reproducibility in the context of MLOps.
Step by Step Guide
The code samples provided below are extracts from the accompanying notebook. The full code can be found in this git repository.
Setup
Create a CML Project with Python 3.9 / JupyterLab Editor Runtime. Launch a CML Session and install requirements.
Run Notebook
Run each cell in the notebook.
Code highlights:
MLFlow Tracking supports modules built specifically for some of the most popular open source frameworks. In this case, we will import "mlflow.spark".
You can leverage CML Spark Data Connections to launch a SparkSession object with the recommended Iceberg Spark configurations. Spark Data Connections make connecting to your Iceberg data effortless.
import mlflow.spark
import cml.data_v1 as cmldata
#Edit your connection name here:
CONNECTION_NAME = "se-aw-mdl"
conn = cmldata.get_connection(CONNECTION_NAME)
spark = conn.get_spark_session()
The exp1 method acts as a wrapper for your first MLFlow experiment. The experiment name is set with the mlflow.set_experiment method. Data is written from a PySpark dataframe to an Iceberg table via a simple routine: "df.writeTo().createOrReplace()". The Iceberg History and Snapshots tables are available for you to monitor Iceberg metadata; in this example, we save the latest snapshot ID along with its timestamp and parent snapshot ID into Python variables. Within the context of this experiment run, a Spark ML Pipeline is trained to tokenize and classify text. MLFlow Tracking allows you to set custom tags; these tags can be used to search your experiments using the MLFlow client. MLFlow Tracking also allows you to create a run context to track metrics for a specific run. In this particular case, we track the Iceberg variables corresponding to the snapshot ID and the write operation timestamp as tags. Once the experiment completes, you can retrieve its ID and more metadata using the MLFlow client.
def exp1(df):
    mlflow.set_experiment("sparkml-experiment")

    ##EXPERIMENT 1
    df.writeTo("spark_catalog.default.training").using("iceberg").createOrReplace()
    spark.sql("SELECT * FROM spark_catalog.default.training").show()

    ### SHOW TABLE HISTORY AND SNAPSHOTS
    spark.read.format("iceberg").load("spark_catalog.default.training.history").show(20, False)
    spark.read.format("iceberg").load("spark_catalog.default.training.snapshots").show(20, False)

    snapshot_id = spark.read.format("iceberg").load("spark_catalog.default.training.snapshots").select("snapshot_id").tail(1)[0][0]
    committed_at = spark.read.format("iceberg").load("spark_catalog.default.training.snapshots").select("committed_at").tail(1)[0][0].strftime('%m/%d/%Y')
    parent_id = spark.read.format("iceberg").load("spark_catalog.default.training.snapshots").select("parent_id").tail(1)[0][0]

    tags = {
        "iceberg_snapshot_id": snapshot_id,
        "iceberg_snapshot_committed_at": committed_at,
        "iceberg_parent_id": parent_id,
        "row_count": training_df.count()
    }

    ### MLFLOW EXPERIMENT RUN
    with mlflow.start_run() as run:
        maxIter = 8
        regParam = 0.01

        tokenizer = Tokenizer(inputCol="text", outputCol="words")
        hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
        lr = LogisticRegression(maxIter=maxIter, regParam=regParam)
        pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
        model = pipeline.fit(training_df)

        mlflow.log_param("maxIter", maxIter)
        mlflow.log_param("regParam", regParam)
        #prediction = model.transform(test)
        mlflow.set_tags(tags)

    mlflow.end_run()

    experiment_id = mlflow.get_experiment_by_name("sparkml-experiment").experiment_id
    runs_df = mlflow.search_runs(experiment_id, run_view_type=1)
    return runs_df
The second experiment is very similar to the first, except data is appended to the Iceberg table via "df.writeTo().append()". As data is inserted into the table, new Iceberg metadata is generated in the Iceberg Metadata Layer and becomes available in the Snapshots and History tables. This metadata is tracked into new Python variables. In this particular example, we again track the Iceberg Snapshot ID and timestamp for this append operation. Within the context of this experiment run, the Spark ML Pipeline is retrained for the same purpose of tokenizing and classifying text, but using the new version of the data after the append operation.
def exp2(df):
    mlflow.set_experiment("sparkml-experiment")

    ##EXPERIMENT 2
    ### ICEBERG INSERT DATA - APPEND FROM DATAFRAME
    # PRE-INSERT
    spark.sql("SELECT * FROM spark_catalog.default.training").show()

    temp_df = spark.sql("SELECT * FROM spark_catalog.default.training")
    temp_df.writeTo("spark_catalog.default.training").append()
    df = spark.sql("SELECT * FROM spark_catalog.default.training")

    # POST-INSERT
    spark.sql("SELECT * FROM spark_catalog.default.training").show()
    spark.read.format("iceberg").load("spark_catalog.default.training.history").show(20, False)
    spark.read.format("iceberg").load("spark_catalog.default.training.snapshots").show(20, False)

    snapshot_id = spark.read.format("iceberg").load("spark_catalog.default.training.snapshots").select("snapshot_id").tail(1)[0][0]
    committed_at = spark.read.format("iceberg").load("spark_catalog.default.training.snapshots").select("committed_at").tail(1)[0][0].strftime('%m/%d/%Y')
    parent_id = spark.read.format("iceberg").load("spark_catalog.default.training.snapshots").select("parent_id").tail(1)[0][0]

    tags = {
        "iceberg_snapshot_id": snapshot_id,
        "iceberg_snapshot_committed_at": committed_at,
        "iceberg_parent_id": parent_id,
        "row_count": df.count()
    }

    ### MLFLOW EXPERIMENT RUN
    with mlflow.start_run() as run:
        maxIter = 10
        regParam = 0.002

        tokenizer = Tokenizer(inputCol="text", outputCol="words")
        hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
        lr = LogisticRegression(maxIter=maxIter, regParam=regParam)
        pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
        model = pipeline.fit(training_df)

        mlflow.log_param("maxIter", maxIter)
        mlflow.log_param("regParam", regParam)
        #prediction = model.transform(test)
        mlflow.set_tags(tags)

    mlflow.end_run()

    experiment_id = mlflow.get_experiment_by_name("sparkml-experiment").experiment_id
    runs_df = mlflow.search_runs(experiment_id, run_view_type=1)
    return runs_df
Finally, in the third experiment, we retrain the Spark ML Pipeline, but first we retrieve the data as it was prior to the append operation by applying the provided Iceberg Snapshot ID in the "spark.read.table" method.
def exp3(df, snapshot_id):
    ##EXPERIMENT 3
    df = spark.read.option("snapshot-id", snapshot_id).table("spark_catalog.default.training")

    committed_at = spark.sql("SELECT committed_at FROM spark_catalog.default.training.snapshots WHERE snapshot_id = {};".format(snapshot_id)).collect()[0][0].strftime('%m/%d/%Y')
    parent_id = str(spark.sql("SELECT parent_id FROM spark_catalog.default.training.snapshots WHERE snapshot_id = {};".format(snapshot_id)).tail(1)[0][0])

    tags = {
        "iceberg_snapshot_id": snapshot_id,
        "iceberg_snapshot_committed_at": committed_at,
        "iceberg_parent_id": parent_id,
        "row_count": training_df.count()
    }

    ### MLFLOW EXPERIMENT RUN
    with mlflow.start_run() as run:
        maxIter = 7
        regParam = 0.005

        tokenizer = Tokenizer(inputCol="text", outputCol="words")
        hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
        lr = LogisticRegression(maxIter=maxIter, regParam=regParam)
        pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
        model = pipeline.fit(training_df)

        mlflow.log_param("maxIter", maxIter)
        mlflow.log_param("regParam", regParam)
        #prediction = model.transform(test)
        mlflow.set_tags(tags)

    mlflow.end_run()

    experiment_id = mlflow.get_experiment_by_name("sparkml-experiment").experiment_id
    runs_df = mlflow.search_runs(experiment_id, run_view_type=1)
    #spark.stop()
    return runs_df
Summary and Next Steps
Large ML organizations require standardized best practices such as tracking models and respective dependencies, model developers, and matching those with datasets in order to keep a consistent view of all MLOps practices. MLFlow Tracking in CML allows you to achieve this goal by allowing you to specify datasets and other custom metadata when tracking experiment runs. In the above example, we tracked Iceberg metadata in order to allow data scientists to retrain an existing pipeline with datasets as of arbitrary points in time. In the process, we used tags in order to implement a consistent taxonomy across all experiment runs.
CML Model Deployment with MLFlow and APIv2
Spark in CML: Recommendations for using Spark
Experiments with MLFlow
Registering and Deploying Models with Model Registry
Apache Iceberg Documentation
Iceberg Time Travel
Introducing MLOps and SDX for Models in CML
03-27-2024
04:42 PM
CML Model Deployment with MLFlow and APIv2
Cloudera Machine Learning (CML) on Cloudera Data Platform (CDP) accelerates time-to-value by enabling data scientists to collaborate in a single unified platform that is all inclusive for powering any AI use case. Purpose-built for agile experimentation and production ML workflows, Cloudera Machine Learning manages everything from data preparation to MLOps, to predictive reporting.
CML exposes a REST API that you can use to perform operations related to projects, jobs, and runs. You can use API commands to integrate CML with third-party workflow tools or to control CML from the command line.
In this example, we will showcase how to use APIv2 to programmatically register an XGBoost experiment via MLFlow Tracking and Registry, and deploy it as a model endpoint in CML.
MLOps in CML
CML has extensive MLOps features and a set of model and lifecycle management capabilities to enable the repeatable, transparent, and governed approaches necessary for scaling model deployments and ML use cases. It's built to support open source standards and is fully integrated with CDP, enabling customers to integrate into existing and future tooling while not being locked into a single vendor.
CML enables enterprises to proactively monitor technical metrics such as service level agreement (SLA) adherence, uptime, and resource use as well as prediction metrics including model distribution, drift, and skew from a single governance interface. Users can set custom alerts and eliminate the model “black box” effect with native tools for visualizing model lineage, performance, and trends.
Some of the benefits with CML include:
Model cataloging and lineage capabilities to allow visibility into the entire ML lifecycle, which eliminates silos and blind spots for full lifecycle transparency, explainability, and accountability.
Full end-to-end machine learning lifecycle management that includes everything required to securely deploy machine learning models to production, ensure accuracy, and scale use cases.
An extensive model monitoring service designed to track and monitor both technical aspects and accuracy of predictions in a repeatable, secure, and scalable way.
New MLOps features for monitoring the functional and business performance of machine learning models, such as detecting model performance and drift over time with native storage and access to custom and arbitrary model metrics; measuring and tracking individual prediction accuracy, ensuring models are compliant and performing optimally.
The ability to track, manage, and understand large numbers of ML models deployed across the enterprise with model cataloging, full lifecycle lineage, and custom metadata in Apache Atlas.
The ability to view the lineage of data tied to the models built and deployed in a single system to help manage and govern the ML lifecycle.
Increased model security for Model REST endpoints, which allows models to be served in a CML production environment without compromising security.
Use Case
In this example, we will create a basic MLOps pipeline to put a credit card fraud classifier into production. We will create a model prototype with XGBoost, register and manage experiments with MLFlow Tracking, and stage the best experiment run in the MLFlow Registry. Next, we will deploy the model from the Registry into an API Endpoint, and finally redeploy it with additional resources for High Availability and increased serving performance.
The full code is available in this git repository.
Step by Step Guide
Setup
Create a CML Project with Python 3.9 / Workbench Editor Runtime. Launch a CML Session and install requirements.
Open script "00_datagen.py" and update lines 140 and 141 with your Iceberg database name and Spark Data Connection name. Then run it.
Script 1: Create the Model Experiment
Run script "01_train_xgboost.py" in order to create an MLFlow Experiment.
Code highlights:
MLFlow is installed in CML by default. You must import mlflow in order to use it in your script.
The experiment run is determined by the "mlflow tracking run context". When this executes for the first time, an experiment is created. If the same code runs again without changing EXPERIMENT_NAME, a new Experiment Run is logged for the same experiment. Otherwise, a new experiment is created.
You can log one or multiple parameters for the specific run with the "mlflow.log_param()" method. Model artifacts such as useful metadata and dependencies are logged with the "mlflow.log_model()" method.
import mlflow
EXPERIMENT_NAME = "xgb-cc-fraud-{0}-{1}".format(USERNAME, DATE)
mlflow.set_experiment(EXPERIMENT_NAME)
with mlflow.start_run():
model = XGBClassifier(use_label_encoder=False, eval_metric="logloss")
model.fit(X_train, y_train, eval_set=[(X_test, y_test)], verbose=False)
y_pred = model.predict(X_test)
accuracy = accuracy_score(y_test, y_pred)
mlflow.log_param("accuracy", accuracy)
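    # Note: log_param() records accuracy as a run parameter here; mlflow.log_metric("accuracy", accuracy)
    # could be used instead (or in addition) to record it as a numeric metric.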
    mlflow.xgboost.log_model(model, artifact_path="artifacts")
You can use the mlflow library directly or instantiate an MLFlow client to manage experiments. In this example we use the "mlflow.get_experiment_by_name()", "mlflow.search_runs()", and "mlflow.get_run()" methods, and we also instantiate the client to list the artifacts for a specific run.
def getLatestExperimentInfo(experimentName):
"""
Method to capture the latest Experiment Id and Run ID for the provided experimentName
"""
experimentId = mlflow.get_experiment_by_name(experimentName).experiment_id
runsDf = mlflow.search_runs(experimentId, run_view_type=1)
experimentId = runsDf.iloc[-1]['experiment_id']
experimentRunId = runsDf.iloc[-1]['run_id']
return experimentId, experimentRunId
experimentId, experimentRunId = getLatestExperimentInfo(EXPERIMENT_NAME)
#Replace Experiment Run ID here:
run = mlflow.get_run(experimentRunId)
pd.DataFrame(data=[run.data.params], index=["Value"]).T
pd.DataFrame(data=[run.data.metrics], index=["Value"]).T
client = mlflow.tracking.MlflowClient()
client.list_artifacts(run_id=run.info.run_id)
Script 2: Register the Model Experiment
Run script "02_cml_api_endpoint.py" in order to register a model from the MLFlow Experiment Run.
Code highlights:
CML APIv2 is installed in your workspace by default. You must import cmlapi in order to use it in your script. The API provides about 100 Python methods for MLOps.
In this example, we created a ModelRegistration class whose "registerModelFromExperimentRun" method wraps the API's "create_registered_model()" method to register the model. Wrapping the API methods in your own Python classes and methods, tailored to the context of your project, is highly recommended.
import cmlapi
from cmlapi.rest import ApiException
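# Supporting imports assumed to be present in "02_cml_api_endpoint.py" (not shown in this excerpt):
import os                   # used below to read CDSW_PROJECT_ID from the environment
from pprint import pprint   # optional, for the commented-out response inspection below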
class ModelRegistration():
"""
    Class to manage the registration of the xgboost model
"""
def __init__(self, username, experimentName):
self.client = cmlapi.default_client()
self.username = username
self.experimentName = experimentName
def registerModelFromExperimentRun(self, modelName, experimentId, experimentRunId, modelPath):
"""
Method to register a model from an Experiment Run
Input: requires an experiment run
Output: api_response object
"""
        model_name = 'xgb-cc-' + self.username  # note: unused here; the modelName argument is what gets registered
CreateRegisteredModelRequest = {
"project_id": os.environ['CDSW_PROJECT_ID'],
"experiment_id" : experimentId,
"run_id": experimentRunId,
"model_name": modelName,
"model_path": modelPath
}
try:
# Register a model.
api_response = self.client.create_registered_model(CreateRegisteredModelRequest)
#pprint(api_response)
except ApiException as e:
print("Exception when calling CMLServiceApi->create_registered_model: %s\n" % e)
        return api_response
The response from this method contains very useful information. In this example, the registeredModelResponse object carries the fields from which the modelId and modelVersionId variables are extracted, and these are in turn used by other API methods.
modelReg = ModelRegistration(username, experimentName)
modelPath = "artifacts"
modelName = "FraudCLF-" + username
registeredModelResponse = modelReg.registerModelFromExperimentRun(modelName, experimentId, experimentRunId, modelPath)
modelId = registeredModelResponse.model_id
modelVersionId = registeredModelResponse.model_versions[0].model_version_id
registeredModelResponse.model_versions[0].model_version_id
Script 3: Deploy Endpoint from Registry Model
Run script "03_api_deployment.py" in order to create a Model Endpoint from the registered model.
Code highlights:
In this example we created a ModelDeployment class to manage multiple API wrapper methods.
The "listRegisteredModels()" method is a wrapper to the API's "list_registered_models()" method. Notice that it is arbitrarily preconfigured to list models corresponding to your user and model name; in a broader MLOps pipeline, these values can be parameterized. This method is used to obtain the "registeredModelId" variable needed for model deployment.
The "getRegisteredModel()" method is a wrapper to the API's "get_registered_model()" method. It is used to obtain the "modelVersionId" variable needed for model deployment.
Once registeredModelId and modelVersionId are obtained, you can begin the deployment. The deployment consists of three phases: model creation, model build, and model deployment.
Model creation corresponds to the creation of an API Endpoint. Once you run this, you will see a new entry in the Model Deployments tab.
Model build corresponds to the creation of the model's container. Thanks to the MLFlow Registry, CML automatically packages all dependencies used to train the Experiment into the model endpoint for you.
Model deployment corresponds to the activation of the model endpoint. This is when the container, with its associated resource profile and endpoint, is actually deployed so inference can start.
The "listRuntimes()" method is an example of querying the Workspace for all available runtimes in order to select the most appropriate one for the model build.
class ModelDeployment():
"""
Class to manage the model deployment of the xgboost model
"""
def __init__(self, projectId, username):
self.client = cmlapi.default_client()
self.projectId = projectId
self.username = username
def listRegisteredModels(self):
"""
Method to retrieve registered models by the user
"""
#str | Search filter is an optional HTTP parameter to filter results by. Supported search_filter = {\"model_name\": \"model_name\"} search_filter = {\"creator_id\": \"<sso name or user name>\"}. (optional)
search_filter = {"creator_id" : self.username, "model_name": "FraudCLF-"+self.username}
search = json.dumps(search_filter)
page_size = 1000
try:
# List registered models.
api_response = self.client.list_registered_models(search_filter=search, page_size=page_size)
#pprint(api_response)
except ApiException as e:
print("Exception when calling CMLServiceApi->list_registered_models: %s\n" % e)
return api_response
def getRegisteredModel(self, modelId):
"""
Method to return registered model metadata including model version id
"""
search_filter = {"creator_id" : self.username}
search = json.dumps(search_filter)
try:
# Get a registered model.
api_response = self.client.get_registered_model(modelId, search_filter=search, page_size=1000)
#pprint(api_response)
except ApiException as e:
print("Exception when calling CMLServiceApi->get_registered_model: %s\n" % e)
return api_response
def createModel(self, projectId, modelName, registeredModelId, description = "Fraud Detection 2024"):
"""
Method to create a model
"""
CreateModelRequest = {
"project_id": projectId,
"name" : modelName,
"description": description,
"registered_model_id": registeredModelId,
"disable_authentication": True
}
try:
# Create a model.
api_response = self.client.create_model(CreateModelRequest, projectId)
pprint(api_response)
except ApiException as e:
print("Exception when calling CMLServiceApi->create_model: %s\n" % e)
return api_response
def createModelBuild(self, projectId, modelVersionId, modelCreationId, runtimeId):
"""
Method to create a Model build
"""
# Create Model Build
CreateModelBuildRequest = {
"registered_model_version_id": modelVersionId,
"runtime_identifier": runtimeId,
"comment": "invoking model build",
"model_id": modelCreationId
}
try:
# Create a model build.
api_response = self.client.create_model_build(CreateModelBuildRequest, projectId, modelCreationId)
#pprint(api_response)
except ApiException as e:
print("Exception when calling CMLServiceApi->create_model_build: %s\n" % e)
return api_response
def createModelDeployment(self, modelBuildId, projectId, modelCreationId):
"""
Method to deploy a model build
"""
CreateModelDeploymentRequest = {
"cpu" : "2",
"memory" : "4"
}
try:
# Create a model deployment.
api_response = self.client.create_model_deployment(CreateModelDeploymentRequest, projectId, modelCreationId, modelBuildId)
pprint(api_response)
except ApiException as e:
print("Exception when calling CMLServiceApi->create_model_deployment: %s\n" % e)
return api_response
def listRuntimes(self):
"""
Method to list available runtimes
"""
search_filter = {"kernel": "Python 3.9", "edition": "Standard", "editor": "Workbench"}
# str | Search filter is an optional HTTP parameter to filter results by.
# Supported search filter keys are: [\"image_identifier\", \"editor\", \"kernel\", \"edition\", \"description\", \"full_version\"].
# For example: search_filter = {\"kernel\":\"Python 3.7\",\"editor\":\"JupyterLab\"},. (optional)
search = json.dumps(search_filter)
try:
# List the available runtimes, optionally filtered, sorted, and paginated.
api_response = self.client.list_runtimes(search_filter=search, page_size=1000)
#pprint(api_response)
except ApiException as e:
print("Exception when calling CMLServiceApi->list_runtimes: %s\n" % e)
        return api_response
Once you have created your model endpoint, give it a minute and then try a test request:
{"dataframe_split":
{"columns": ["age", "credit_card_balance", "bank_account_balance", "mortgage_balance", "sec_bank_account_balance", "savings_account_balance", "sec_savings_account_balance", "total_est_nworth", "primary_loan_balance", "secondary_loan_balance", "uni_loan_balance", "longitude", "latitude", "transaction_amount"],
"data":[[35.5, 20000.5, 3900.5, 14000.5, 2944.5, 3400.5, 12000.5, 29000.5, 1300.5, 15000.5, 10000.5, 2000.5, 90.5, 120.5]]}} Script 4: Endpoint Redeployment Run script "04_api_redeployment.py" in order to create a new model deployment with increased resources. Code highlights: A slightly different version of the ModelDeployment class is implemented. This includes the "get_latest_deployment_details()" as an example of creating a wrapper method to the API's "list_models()" and "list_model_deployments()" methods all in one. You can implement your own methods in a similar fashion as best needed in the context of your MLOps pipeline. Once the latest model deployment's metadata has been obtained in one go, a new model build is created with additional CPU, Memory and Replicas. Notice that in the process you also have the ability to switch to a different runtime as needed. deployment = ModelDeployment(projectId, username)
getLatestDeploymentResponse = deployment.get_latest_deployment_details(modelName)
listRuntimesResponse = deployment.listRuntimes()
listRuntimesResponse
runtimeId = 'docker.repository.cloudera.com/cloudera/cdsw/ml-runtime-workbench-python3.9-standard:2024.02.1-b4' # Copy a runtime ID from previous output
cpu = 2
mem = 4
replicas = 2
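# projectId, modelVersionId, and modelCreationId are assumed to be carried over from the earlier
# registration and model creation responses (see Scripts 2 and 3).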
createModelBuildResponse = deployment.createModelBuild(projectId, modelVersionId, modelCreationId, runtimeId, cpu, mem, replicas)
modelBuildId = createModelBuildResponse.id
deployment.createModelDeployment(modelBuildId, projectId, modelCreationId)
Summary and Next Steps
Cloudera Machine Learning exposes a REST API that you can use to perform operations related to projects, jobs, and runs. You can use API commands to integrate CML with third-party workflow tools or to control CML from the command line. CML's API accelerates your data science projects by allowing you to build end-to-end pipelines programmatically. When coupled with CML MLFlow Tracking and the MLFlow Registry, it can be used to manage models from inception to production.
For more information, see:
APIv2 Documentation
APIv2 Examples
APIv2 AMP
Registering and Deploying Models with Model Registry
Securing Models
CML Projects
CML Runtimes
Introducing MLOps and SDX for Models in CML
Model Registry GA in CML
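As a closing illustration of building such a pipeline programmatically, the sketch below chains the wrapper methods defined above into a single register-create-build-deploy flow. It is a minimal sketch rather than one of the repository scripts: projectId, username, experimentName, experimentId, experimentRunId, and runtimeId are assumed to be populated as shown earlier, and reading the model creation ID from createModelResponse.id is an assumption that mirrors how modelBuildId is read from createModelBuildResponse.id in Script 4.
# Minimal end-to-end sketch using the ModelRegistration and ModelDeployment classes above.
modelReg = ModelRegistration(username, experimentName)
registered = modelReg.registerModelFromExperimentRun("FraudCLF-" + username, experimentId, experimentRunId, "artifacts")
deployment = ModelDeployment(projectId, username)
createModelResponse = deployment.createModel(projectId, "FraudCLF-" + username, registered.model_id)
modelCreationId = createModelResponse.id  # assumed field, mirroring createModelBuildResponse.id
modelVersionId = registered.model_versions[0].model_version_id
createModelBuildResponse = deployment.createModelBuild(projectId, modelVersionId, modelCreationId, runtimeId)
deployment.createModelDeployment(createModelBuildResponse.id, projectId, modelCreationId)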