Member since
04-12-2023
8
Posts
2
Kudos Received
0
Solutions
07-29-2024
01:45 AM
Problem Overview As organizations mature in implementing AI systems, they are looking to standardize workflows across different machine learning use cases. Here is one key challenge to address in the ML lifecycle. ML teams spend considerable time in the development lifecycle on data exploration and data analysis activities. These tasks are crucial because the type and quality of the datasets used as inputs for model training directly affect how much time is spent training the model and how well it performs afterward. Before being used for model training, attributes in a dataset known as features go through a lot of changes. These transformations can be simple, e.g., changing a categorical variable such as “opt for email subscriptions” from yes and no to boolean values 1 and 0. Or, it can be complex, including merging multiple input fields into a new field, e.g., labeling a song with a combination of attributes such as music genre, decade of origin, country, and band size to come up with a unique “musicality” feature. Since the feature development process is demanding, ML teams want to share common features and also establish common internal standards in terms of how new features in a dataset are created or updated. Discovering available features and reusing them if available would save significant time for an ML Engineer before she decides to build one from scratch. Feature stores address this need by Making features consistently available for training and serving Decoupling the feature access from the underlying data infrastructure. The “Decoupling” aspect mentioned earlier is important. Datasets used for developing features in an ML lifecycle could be stored on multiple data stores in different formats, requiring different access mechanisms. ML Engineers need to have a way to consistently discover and access the features they need to develop solutions for their business use cases without really worrying about the infrastructure that stores the data or the data access mechanisms. In this article, we discuss how we can set up a Feature store such as FEAST for developers to take advantage of Feature consistency and low-latency feature availability. We will integrate Feast on CDP on Cloudera Machine Learning. Architecture Since Cloudera Machine Learning is a highly extensible ML platform, integrating feature stores is quite easy. The below picture shows an example of integrating a feature store such as FEAST on the Cloudera Data Platform. Some important aspects of this implementation are provided below: Component Name Description Hosting Platform Cloudera Machine Learning The Feature store is set up in a containerized instance of Cloudera machine learning. This allows the store to scale up and down as needed. Feast Offline Store Spark on Hive We use Spark on Hive as the offline store Online Store SQLite This is a low-latency database that is also hosted on the Cloudera Machine Learning platform for demo purposes. In a real-life use case, this can be a separate database ( e.g. Postgres). See Feast documentation for support online databases Catalog Feast Registry / Catalog Contains the feature services and views that are then used to fetch features Feast Application Application on Cloudera Machine Learning A web-based, read-only front-end application using Streamlit that provides an easy-to-use UI for feature discovery. Implementation: Prerequisite Steps : The following prerequisites are needed for setting up Feast on CML: A Cloudera Machine Learning service instance access Access to the S3 bucket in the Datalake environment from Cloudera Machine Learning Launch a new Cloudera Machine Learning project and clone the Github repo in resources directly at project creation or by starting a new terminal session. Follow the instructions step by step to setup feast on Cloudera Machine learning as a part of the README.md in the GitHub repo Run the DataLoad.ipynb jupyter notebook to load the dataset. Setting up Elements of the FEAST using Feast CLI: Start a new Session in Cloudera Machine Learning with the “Enable Spark” option and an editor like Jupyter Notebook. Once the session is launched, start a new Terminal window and run the following commands in sequence. You may run into some warnings from Feast; we will ignore them for now. $ cd feast_project/feature_repo
$ feast apply If the setup is complete, you should be able to see the setup of the feature service as follows: Let us take a moment to understand what all this means. This demo Feast setup uses a driver dataset for a ride aggregator. The instructions in the GitHub repo help you set up this dataset for consumption by a Feature store such as Feast. Feast messages tell us that it has created an entity called a driver with different feature views and services for driver activity. We can also see that it has set up our online store in a SQLite table. Understanding Feature Store Configurations |- feast_project
|- data : Online feature database
|- feature_repo
|- feature_store.yaml : Feature store configuration file for offline and online stores
|- example_repo.py : Feature definition file
|- DataLoad.ipynb : Validate Spark's ability to fetch the offline store data
|- feast-ux.py : used to load the Feast UI
|- feature-store-dev.ipynb : Interactive notebook for Feast historical and online store access The above folder structure gets installed when you clone the GitHub repository in the resources. A Feature store has to be configured in 2 places : The feature_store.yaml is used to define the location of the offline and online stores. This needs to include the location of the S3 bucket, which will be used by Spark to load the offline features. Similarly, the online feature store configurations are also added here. Specifically, you will need to change the lines here to your specific spark configurations: spark.sql.warehouse.dir: "s3a://your-path-to-hive-folder"
spark.hadoop.fs.s2a.s3guard.ddb.region: "us-east-1"
spark.kerberos.access.hadoopFileSystems: "s3a://your-bucket/" The example_repo.py shows the Feast configurations for setting up the feature store, including source, views, and services. While no changes are required to be made here, it is recommended that you review Feast documentation to understand the configuration details. Use Case Architecture Diagram Feature Use case Architecture As mentioned earlier, the above architecture shows the different components of a driver statistics dataset for a ride aggregator that can be used to serve both historical and on-demand features. Feature Discovery Feast provides a very intuitive interface besides good CLI capabilities to help with feature discovery. Here is an example of using the feastcli to get the list of features for a specific feature view driver-hourly-stats cdsw@tpwutuac3d78s486:~/feast_project/feature_repo$ feast feature-views describe driver_hourly_stats
spec:
name: driver_hourly_stats
entities:
- driver
features:
- name: conv_rate
valueType: FLOAT
- name: acc_rate
valueType: FLOAT
- name: avg_daily_trips
valueType: INT64\
description: Average daily trips
tags:
team: driver_performance
ttl: 259200000s
batchSource:
type: BATCH_SPARK
timestampField: event_timestamp
createdTimestampColumn: created
dataSourceClassType: feast.infra.offline_stores.contrib.spark_offline_store.spark_source.SparkSource
name: driver_hourly_stats_source
sparkOptions:
table: driver_stats
online: true
entityColumns:
- name: driver_id
valueType: INT64
meta:
createdTimestamp: '2024-07-19T05:35:52.471076Z'
lastUpdatedTimestamp: '2024-07-19T05:35:52.471076Z' Similarly, FEAST UI could also be used for feature discovery. Perform the following steps to launch Feast UI as an application in Cloudera Machine Learning Create a New Application in Cloudera Machine Learning using the following configuration parameters: Name : Feast UI File : feast-ux.py Resources : 1 vCPU , 2 GB Memory You should now be able to launch the FEAST UI application as shown below: Use the menu items on the left for feature service discovery, including feature views and feature services available. Feature Consumption Now that we have set up the feature store, we will need to use Feast libraries to access these features. Launch a new session with feature-store-dev.ipynb to understand how to consume offline and online features from the feature store. Given below is an example code for using the online store to get some pre-computed features with low latency # Fetching Feature vectors for inference
from pprint import pprint
from feast import FeatureStore
store = FeatureStore(repo_path="./feast_project/feature_repo")
feature_vector = store.get_online_features(
features=[
"driver_hourly_stats:conv_rate",
"driver_hourly_stats:acc_rate",
"driver_hourly_stats:avg_daily_trips",
],
entity_rows=[
# {join_key: entity_value}
{"driver_id": 1001},
{"driver_id": 1002},
],
).to_dict()
pprint(feature_vector)
##OUTPUT
{'acc_rate': [0.011025987565517426, 0.3090273141860962],
'avg_daily_trips': [711, 44],
'conv_rate': [0.8127095699310303, 0.13138850033283234],
'driver_id': [1001, 1002]} Resources and References Github Repo: Feast on CML: Repository to use to set up the above feature store use case https://feast.dev/: API calls and build an understanding how Feast works.
... View more
Labels:
06-27-2024
10:32 PM
Purpose Building Custom Operators helps Data engineers abstract and reuse code across workloads. A typical use case could be that some custom logic wrapped around Airflow tasks (e.g. config setups) needs to be executed before business functionality is run in a workload. In this example, we will create a custom arithmetic operator that will do some arithmetic operations as a part of our pipeline Instructions There are 2 tasks in this example, in part 1 we will understand how to create custom operators and upload them to a PyPI mirror. In part 2 we will use these operators in CDE. Pre-requisites : Create an account at https://test.pypi.org/ Create and use API tokens to authenticate and upload your package. Refer to pypi.org documentation on using API tokens here Clone / Download the source code from the link here Part 1: Creating a Python package with a new Custom Airflow Operator Clone this repository in your local folder Change the current directory to the folder cde_custom_airflowoperator: cd your-path/cde_custom_airflowoperator Next, we need to create a custom Python library with our operator. Create a Python package out of our custom DAG. Review the Airflow DAG in src/arithmetic_operator.py. This DAG extends the Airflow baseoperator to create a new custom operator called ArithmeticOperator. The operator performs arithmetic operations on 2 numbers based on the type of operation specified in the parameter. Finally, let us create our Python package and deploy it to a test Pypi mirror. The easiest way to do so is to follow step-by-step instructions in the link provided by python.org here. Do not forget to create a testpypi account and authentication with API tokens as mentioned in the pre-requisites. Once this operator is uploaded and the instructions mentioned earlier are followed, you should be able to see the package such as below: Part 2: Using our Custom Airflow Operator in an Airflow Job CDE SETUP: First, set up our airflow environment in CDE to fetch the custom operator build file. Select the Airflow configuration tab by first clicking on the cluster details as shown below Next, use the airflow configuration tab shown below and set up the CDE Airflow environment with your custom operator as follows: change the Pypi repository URL to https://test.pypi.org ( or any other pypi mirror repo that you have used for uploading your python package). if you have used testpypi for your custom airflow operator keep other fields blank and click on validate configutations now you need to add a single requirements.txt in step 2 i.e. the Build stage. Modify the requirements.txt in the CDE_setup folder to use the package name that you have used for your custom operator you should see the package getting built as shown below. once succesfully built you should be able to see the package installed as shown below. Your version number may vary based on what you have used during package build Finally click on the Activate button to activate the package in the airflow experience. Using Custom Operators Now let us create a new CDE Job and run it. Let us use the CDE User Interface to create a new airflow job which uses this operator. Use the dag file Dag1.py in the folder CDE_Setup to upload for this job. Click on create and run to execute the job Let us validate by checking the Airflow UI within CDE. As shown below the job has executed succesfully Finally, let us open one of the logs to check the execution of the addition task of the job Summary Here is a summary of the learning goals we have reached through this example: Building Custom Airflow Operators Setup the airflow environment in Cloudera Data Engineering Deployed an airflow DAG that uses our custom operator
... View more
Labels:
04-25-2024
09:04 PM
Overview
Once Data Engineers complete the development and testing of their workloads, moving to deployment can be a challenging task. This process often involves a lengthy checklist, including the modification of multiple configurations that differ across environments. For instance, the configuration spark.yarn.access.hadoopFileSystems must now point to the Data Lake or Data Store appropriate for the production environment. Data Engineers may also opt to use a "staging" environment to test job performance, necessitating changes to these configurations twice: initially from development to staging, and subsequently for production deployment.
A recommended practice in such cases is to use templating the configurations within the workload. This approach allows for easier management of deployments across different environments. In this article, we demonstrate a pattern of templating Data Engineering workloads in Cloudera Data Engineering in Airflow. This best practice allows workload deployment to be environment agnostic and minimizes the deployment effort when moving workloads across environments for example from development to staging to production.
Pre-requisites:
Familiarity with Cloudera Data Engineering service ( see reference for further details)
Familiarity with Airflow and Python
Methodology
The diagram below demonstrates the pattern of templating for a workload orchestrated by Airflow in Cloudera Data Engineering ( CDE). Using this pattern allows all the jobs to get the correct value of the variables. For example, the purpose we template the hdfs_location, which could vary across deployment environments. Our goal is to ensure that by changing the hdfs_location in the Airflow settings of the specific environment, the jobs get automatically updated.
The three major steps in terms of setup:
Step 1: Airflow Configuration Setup: we set the variables in Airflow Admin settings.
Step 2: CDEJobRUNOperator modification: We use Jinja templating format to set up these variables inside the CDEJobRun operator to pass these variables to the CDE Job via the CDEJobRunOperator
Step 3: CDE Job modification: we apply the variables in the workload e.g. a pyspark or scala job.
Let us work with a real-life example to understand how to execute these three steps in Cloudera Data Engineering(CDE).
Step 1: Airflow Configuration Setup
In Cloudera Data Engineering, the airflow configuration for the virtual cluster is accessed as follows:
Click your CDE Service and select the Virtual Cluster you would like to use
Click on the Cluster Details icon clicking on the cluster details as shown below:
Next, click on the Airflow UI as shown below to launch the Airflow application user interface:
In the Airflow User interface, click on the Admin menu and variable sub-menu to launch the variables menu page as below. This shows the variables configured in Airflow currently. To add a new variable, simply select the blue + Sign below to open the add variable page.
Add a new variable called datafile_location to provide the location of the datafile for your spark application to read. You can specify the actual path of the variable and a description in the appropriate fields.
You can use this same screen to edit the variable path when you want to make modifications by selecting the row and the edit option in front of the key.
This concludes the action in the Airflow configuration. Next, let us see how we should make a change to the Airflow Operator which calls the CDE Job in the next section
Step 2: CDEJobRunOperator modification
Important Note: You can modify the CDEJobRunOperator in two ways based on how you have generated your Airflow DAG.
Option 1: If you have created the Airflow DAG through code, use the Manual CDEJobRunOperator Modification section.
Option 2: If you have used the CDE Airflow UI Editor for creating the Airflow job, then Modifying in Pipeline Editor Option ( Refer picture below for the Airflow UI Editor in CDE)
Option 1: Manual CDEJobRunOperator Modification
Note: Use this option only if you have coded your Airflow job manually and have NOT used the CDE pipeline Editor. If you have used the AirflowUI use option 2
In this step, we modify the CDEJobRunOperator to access the variables from the Airflow configurations and then pass them on to the CDE Job. To access variables from Airflow we need
Use Jinja templating structure so variables must be encapsulated in {{<full-variable-name}}
Use airflow object structure. So variables are always accessed as var.value.variable-name. Check references for more details on airflow variable naming and accessing methods
So a sample way to access the variables defined in Airflow above inside the CDEJobRunOperator is provided below: cde_job_1 = CDEJobRunOperator(
job_name='some_data_exploration_job',
variables={'datafile_location': '{{ var.value.datafile_location}}'},
overrides={'spark': {'conf': {'spark.yarn.access.hadoopFileSystems': '{{var.value.hdfs_location}}'}}},
trigger_rule='all_success',
task_id='cde_job_1',
dag=dag,
)
This concludes the changes we need to make in a Job to access the variables set up in our Airflow configuration.
Option 2: Modifying in the Pipeline Editor
Note: Use this option ONLY if you have used the CDE pipeline Editor. If you have created an Airflow DAG code, then use Option 1 earlier.
CDE provides a visual pipeline editor for generating simple workflows for those who prefer a drag-and-drop interface for creating simple Airflow pipelines. Please note that not all the operators from Airflow are supported here. If the Airflow Job and the CDEJobRunOperator were created using the pipeline, then we can set up the variables inside the pipeline. To set the variables select the Airflow Job that was created with Airflow UI and select Editor as shown below
In the Editor, click on the CDEJobRunOperator and add the variable as shown below in the name and value
In the Name text box enter: datafile_location
In the Value text box enter: {{var.value.datafile_location}}
Important Note: The curly braces {{ and }} are mandatory to ensure the variable has the right template format.
Step 3: CDE Job Modification
To access the variables inside the DAG we need to set up the CDE Job with the same variable names. There are two ways to do this. We can use the User Interface or the Command Line Interface. Here is a way to set up the variables in the User interface for datafile_location.
The picture above shows the changes that you need to make to your Job to accept arguments.
First, click on the airflow job ( or create a new one)
Add the argument and use the same variable name that you used in Airflow and CDEJobRunOperator
Important Note: Do not forget to add (three opening curly braces) {{{ variable-name}}} (closing curly braces) otherwise the variable will not get the value you need.
Finally, after these configurations, you can access the variables inside your CDE job by using the sys package. These variables are passed as arguments, and can therefore be accessed with the sys package. Here I have printed the Spark configuration and the variables inside a pyspark job that is called by the CDEJobRunOperator we defined earlier
import sys
print(f"Argument passed to this Spark Job is : {sys.argv}")
We can use this simple test to check if the variable value is captured as arguments. As you can see below the first value contains the data_location value we have set in airflow. We can now use this to get the variable value.
This concludes our 3 step process for templating our Cloudera Data Engineering Workloads. Using such a templating approach, we can deploy the code outside our development environment easily without code modifications. We only need to add or edit the variables in Airflow to enable the code to work in the new environment such as staging or production.
Reference:
CDE Documentation
Airflow Variable Documentation
... View more
Labels:
01-30-2024
10:03 PM
Summary In this article, we learn how to use CML to automate Machine Learning workflows between development and production environments using an automation toolchain such as Gitlab. This solution can be extended to other platforms such as Github( using Github actions) or Atlassian (using Bitbucket pipelines). Some prior understanding of how the DevOps pipeline works and how it can be applied in Machine learning is assumed here. There are some good resources online that can be referred for a deep-dive, but just enough information is provided below. A (very) brief Introduction to CI-CD for Machine Learning A DevOps pipeline is a set of automated processes and tools that allows software development teams to build and deploy code to a production environment. A few of the important components of the DevOps pipeline include Continuous Integration (CI) and Continuous Delivery and Deployment (CD), where: Continuous Integration is the practice or ability to make frequent commits to a common code repository Continuous Delivery & Deployments are the practice of the main or trunk branch of an application's source code to be in a releasable state. Integrating the practices of DevOps to machine learning, popularly called MLOps allows ML teams to automate the lifecycle of a machine learning workflow. Below is an explanation of a simple pattern for setting up CI-CD in Cloudera Machine Learning: The CDP Data Lake is the storage layer for all formats of data that will be processed by the Machine Learning workloads. SDX layer in the Cloudera Platform provides fine-grained access security, as well as establishes the framework in the data lake for auditability and lineage. The workspace is a managed K8s layer (can be a Cluster or namespace) that segregates the computed profile across the development and production workloads. Projects are generally the compilation of artifacts including models, datasets, folders, and applications that will be developed to address one or many business use cases. In this case, we want to automate the CI/CD pipeline between the projects in the Development and Production workspace. The Building Blocks of a Simple CI-CD Pipeline in CML Overview: The picture above demonstrates a simple CI-CD pipeline flow in CML, which expands the ideas established earlier on a development and production workspace with automated deployments between the two using Gitlab as a DevOps toolchain. Three activity groups labelled in the picture are integral to this automation workflow as explained below: Activity Group Activity Description Details 1 ML Development Includes all Data Science iterative development activities including ingestion, data munging, model training, evaluation, and testing deployments in Development to ensure satisfactory performance 2 Continuous Integration In this step, the developer intends to commit the code to the branch of the repository. – If the commit is permitted to the main branch then this acts as a trigger for Production deployments – If the commit is made on a separate branch, a pull request/ merge request that is approved by the main branch acts as a trigger for deployment 3 Continuous Deployment The committed branch then uses a DevOps pipeline toolchain to deploy the workload into the production workspace target project. To demonstrate automation, we will use a simple Machine learning workflow in the repository here. In this workflow, we will copy two jobs in a source project in CML. Setup CML Project Create 2 CML Projects as shown below, using a Python Template, in the workspaces for Development and Production. Create two CML jobs as follows : Job1 : Job Name : bootstrap Compute Profile : 1 vCPU, 2GB Memory Type : Manual File : bootstrap.py Job 2: Job Name : train Compute Profile : 1 vCPU, 2GB Memory Type : Dependent File : fit.py On successfully saving these two files, you should see the Jobs in our project as follows. (Ensure that the jobs can run successfully before proceeding further) Setup Automation in Gitlab: To build our deployment automation, we will use the following components in CML and Gitlab CML Target Workspace Domain name: starts from http://xxx.cloudera.site ( see below). You need to save this somewhere CDP Access Keys for remotely accessing Workloads: Create a new access key and save the keys: Create Variables in Gitlab ( Available at Project → Settings → CI-CD): The API Key and CDSW_DOMAIN name thatwere obtained earlier. The other details are provided below: PROJ_NAME < target project name in production> RUN_TIME The runtime image, the default runtime at the time of writing this article is ( docker.repository.cloudera.com/cloudera/cdsw/ml-runtime-workbench-python3.9-standard:2023.08.2-b8). Use the one per your need. RUN_TIME_ADDON The runtime addon that is needed for setting up the docker container in production:” hadoop-cli-7.2.16-hf3” CURL commands to execute the REST APIs from Gitlab. The CML API v2 details can be found at http://your-cml-domainname.cloudera.site/api/v2/swagger.html as shown in the image below: (you can see many different API calls as you scroll through the page) Pipeline in GitLab: Go to Gitlab Project → Build → Pipeline Editor to enter the pipeline code: # This file is a template, and might need editing before it works on your project.
deploy-to-prod-cluster:
stage: deploy
image: docker:stable
before_script:
- apk add --update curl && rm -rf /var/cache/apk/*
- apk add jq
script:
# Search for the specific Target Deployment Project
- 'echo "$CDSW_DOMAIN/api/v2/projects?search_filter=%7B%22name%22%3A%22$PROJECT_NAME%22%7D&sort=%2Bname"'
- 'echo $API_KEY'
- 'curl -X GET "$CDSW_DOMAIN/api/v2/projects?search_filter=%7B%22name%22%3A%22$PROJ_NAME%22%7D" -H "accept: application/json" -H "Authorization: Bearer $API_KEY" | jq'
- 'PROJECT_ID=$(curl -X GET "$CDSW_DOMAIN/api/v2/projects?search_filter=%7B%22name%22%3A%22$PROJ_NAME%22%7D" -H "accept: application/json" -H "Authorization: Bearer $API_KEY" | jq -r ".projects|.[0]|.id")'
- 'echo $PROJECT_ID'
- 'pwd'
# - 'ls -l'
# - 'RUN_TIME="docker.repository.cloudera.com/cloudera/cdsw/ml-runtime-workbench-python3.7-standard:2022.04.1-b6"'
# create a Job in Target Project folder
- 'JOB_ID=$(curl -X POST "$CDSW_DOMAIN/api/v2/projects/$PROJECT_ID/jobs" -H "accept: application/json" -H "Authorization: Bearer $API_KEY" -H "Content-Type: application/json" -d "{ \"arguments\": \"\", \"attachments\": [ \"\" ], \"cpu\": 2, \"environment\": { \"additionalProp1\": \"\", \"additionalProp2\": \"string\", \"additionalProp3\": \"string\" }, \"kernel\": \"\", \"kill_on_timeout\": true, \"memory\": 4, \"name\": \"bootstrap\", \"nvidia_gpu\": 0, \"parent_job_id\": \"\", \"project_id\": \"string\", \"recipients\": [ { \"email\": \"\", \"notify_on_failure\": true, \"notify_on_stop\": true, \"notify_on_success\": true, \"notify_on_timeout\": true } ], \"runtime_addon_identifiers\": [ \"$RUN_TIME_ADDON\" ], \"runtime_identifier\": \"$RUN_TIME\", \"schedule\": \"\", \"script\": \"0_bootstrap.py\", \"timeout\": 0}" | jq -r ".id")'
- 'echo $JOB_ID'
# Create a dependent Job in the target deployment folder
- 'CHILD_JOB_ID=$(curl -X POST "$CDSW_DOMAIN/api/v2/projects/$PROJECT_ID/jobs" -H "accept: application/json" -H "Authorization: Bearer $API_KEY" -H "Content-Type: application/json" -d "{ \"arguments\": \"\", \"attachments\": [ \"\" ], \"cpu\": 2, \"environment\": { \"additionalProp1\": \"\", \"additionalProp2\": \"string\", \"additionalProp3\": \"string\" }, \"kernel\": \"\", \"kill_on_timeout\": true, \"memory\": 4, \"name\": \"train_model\", \"nvidia_gpu\": 0, \"parent_job_id\": \"$JOB_ID\", \"project_id\": \"string\", \"recipients\": [ { \"email\": \"\", \"notify_on_failure\": true, \"notify_on_stop\": true, \"notify_on_success\": true, \"notify_on_timeout\": true } ], \"runtime_addon_identifiers\": [ \"$RUN_TIME_ADDON\" ], \"runtime_identifier\": \"$RUN_TIME\", \"schedule\": \"\", \"script\": \"fit.py\", \"timeout\": 0}" | jq -r ".id")'
# Execute the job run
- 'curl -X POST "$CDSW_DOMAIN/api/v2/projects/$PROJECT_ID/jobs/$JOB_ID/runs" -H "accept: application/json" -H "Authorization: Bearer $API_KEY" -H "Content-Type: application/json" -d "{ \"arguments\": \"string\", \"environment\": { \"additionalProp1\": \"string\", \"additionalProp2\": \"string\", \"additionalProp3\": \"string\" }, \"job_id\": \"string\", \"project_id\": \"string\"}" | jq' The above pipeline copies two batch jobs simulating a data ingestion and training workload to the Target Workspace. You may need to edit when you want to copy additional artifacts to the target workspace. Run the Automation Pipeline A code commit in the source project will trigger a deployment to the Target. Your deployment pipeline should execute as below: To see the details of the actual pipeline execution, click on the pipeline as shown below: Clicking on Deploy shows the actual pipeline deployment in a Gitlab container, which can be used to check pipeline execution details and errors (if any). References: Simple DevOps CML Project: here Pipeline Editor: here CML API: here
... View more
11-30-2023
12:42 AM
1 Kudo
Introduction
This article demonstrates how a Machine Learning(ML) engineer can use the Model Registry Service in Cloudera Machine Learning for cataloging, versioning, and deploying models. The Model Registry can serve as a catalog for models in the DataLake, besides helping provide model lineage information for deployed models for Data and Administrators. For details on how to use the model registry and additional documentation, review the references section below.
Creating the Model Registry for the Data Lake
Model Registry is one of the core building blocks toward MLOps or Devops for Data Science workflows. It is important to note that a single model registry is created for a CDP DataLake and serves as a model catalog for all the models in the DataLake. To create a model registry, click on CML Control Plane and create a new model registry. If there is an existing model registry for that DataLake / CDP environment, you will not be allowed to create a new one. The screen below shows the creation of the Model Registry. Please note that there may be certain differences to creation of Model Registry based on the chosen Cloud Provider (e.g. In Azure you may be asked to provide NFS Details. Refer to Cloudera documentation here for more details based on the form factor chosen)
Once the creation process is initiated, you should be able see the details of the registry creation process by clicking on the registry name and looking at the Event history logs as below:
Setting up access to the Model Registry
As mentioned earlier, setting up Model Registry access differs slightly based on the type of CDP environment. Here, use a RAZ enabled environment ( CDP DataLake with access control mechanisms configured through Apache Ranger). First, copy the machine User Workload User Name in the Model Registry Details page below: Here, non-RAZ enabled development environment is used. As a first step, identify the model user and use the same to set up the access permissions for my user.
This concludes the one time setup needed for the model registry for the DataLake. To understand how to store models in model registry and deploy them in Cloudera Machine Learning service, refer to this article.
References:
Cloudera Reference Docs
Using Model Registry in Cloudera Machine Learning service
... View more
Labels:
11-29-2023
09:21 PM
Introduction:
This article demonstrates how a Machine Learning(ML) engineer can use the Model Registry Service in Cloudera Machine Learning for cataloging, versioning, and deploying models. ML Engineers work iteratively in an effort to improve model performance using one or more target metrics. This process step is called Experimentation, where the engineer works through a series of trial experiments by changing different parameters of the underlying model, called hyper-parameters, to improve the target metric. In CML, the Experiments feature allows users to compare experiment runs and visualize the impact of parameters on the target metric (for more information, refer to the article on hyper-parameter tuning here). Once a chosen experiment run is deemed satisfactory, use the CML Model Registry to register the model. The Model Registry can serve as a centralized catalog for all models that are deemed ready for deployment. Subsequent improvements can then be registered in the catalog as new versions.
This article assumes that a model registry has already been set up in the Cloudera machine learning service. To learn more about how to set up a model registry, check the References section at the end of the article for resources.
The Value of Model Registry in Machine Learning Workflows
Including model registry in machine learning workflows provides a number of benefits for data science teams:
It serves as a catalog, helps model reuse and controls proliferation of models since teams can use the catalog to discover existing model artifacts
It supports Model versioning and provides lineage information as new models are deployed after retraining with addition of new data
It helps provide a foundation for DevOps automation using stage transitions of models from development to staging to production.
It is now possible to understand exactly which experiment led to the model being deployed in production. This helps in improving the auditability and traceability of ML models.
Using the Model Registry to store and deploy models
Only models that have been created with MLflow experiments can be stored in the Model Registry. Hence, as a first step, ensure that the MLFlow APIs are used to create an experiment and logged experiments as a part of this run. Creating and logging experiment runs in CML is outside the scope of this blog. For more details, refer to the blog here or refer to the Cloudera documentation. Now, you will see how to deploy models to the model registry. In this case, I will use the model in the experiment run in the blog Hyperparameter Tuning with MLFlow Experiments to add the model to the model registry and subsequently deploy the model as an API endpoint.
Registering a CML Experiment in Model Registry
Click on the Experiments menu item in CML Menus to review the experiment runs, and click on a specific experiment run to see the details of all the experiments in the run.
Let us choose one specific model from an experiment run based on the performance of the target metric. The highlighted row is the chosen model, and click on the model to bring up the Model Registration page.
The Model Registration page shows us the model artifacts as well as provides us with starter code to load the model as an API endpoint for inference. Click on Register Model, which allows registering the model into created model registry.
At this point, enter the name, description, visibility scope of the model, and version information, such as below. Once entered and submitted, the model registry confirms the registration of the model.
As a final step, check the Model Registry to look for the model that is registered and validate the availability of the model
Deploying Models from Model Registry
Data and System Administrators are responsible for managing the data artifacts—tables, files, jobs, and models—in the production systems. To manage a controlled deployment process, they are typically responsible for transitioning these artifacts from development to staging and production systems. The model registry provides a single pane of glass that can be used for model management purposes. For example, administrators can enforce policies to ensure only models from the Model Registry are deployed to production, which allows lineage traceability of the deployed model, i.e., which experiment was used to deploy the model.
To deploy the model, identify the experiment and the model run name that is used to deploy the model. Save the name of the Experiment ID and the Run name as shown below:
Tip: If you have used the log_model() API function in creating these experiment runs, then the input example is available as a json file by launching a new terminal window in a CML session.
cd ~/<experiment name>/<experiment run>/
Now, use this process to deploy a model from the Model Registry. Click the menu Model Deployments to bring up the Model Deployments page in Cloudera Machine Learning. This shows all the existing models that are deployed for the project. Click on the New Model button on the top right, as shown below.
This brings up the Deploy Model User Interface, where you can choose the model to deploy. Since the plan is to use a registered model for deployment, select the options as shown below and choose the registered model from the drop-down. The model version gets pre-populated from the prior deployment history of the model. Return to the Model Deployments page to check the build status of the model deployed from the Model Registry, as shown below.
Summary
This blog demonstrates how machine learning engineers and administrators can build model management workflows by using the CML Model Registry. Adding the Model Registry in deployment workflows provides data science teams a number of advantages
It provides a Centralized Model Management catalog and controls the proliferation of models since teams can use the catalog to discover existing model artifacts
It allows versioning of models and therefore helps traceability and lineage as new models are deployed after retraining with addition of new data
It provides a centralized gateway for stage transitions of models from development to staging to production.
It supports model auditability and traceability. It is now possible to understand exactly which experiment led to the model being deployed in production.
References:
Cloudera Reference Docs: here
Setting up Model Registry : here
Article on using Experiments with CML: here
... View more
04-21-2023
12:00 PM
1 Kudo
Introduction:
Cloudera Data Engineering (CDE) service is a robust and flexible platform for managing Data Engineering workloads.CDE service allows you to manage workload configurations such as choosing the spark version to use, orchestrate your DE pipelines using Airflow as well as provides a rich API to manage and automate pipelines remotely from the command line. At times, however, one is unable to use the standard CDE configurations for any of the following reasons,
The workload requires a very customized set of packages and libraries with version specificity that needs to be baked together and is unique from other workloads in the CDE Service.
Additional access requirements (e.g. root access) for package installations are necessary, which are not provided by default CDE runtime libraries
Such unique requirements are addressed by building custom runtimes and pushing these runtimes as docker images in an external docker registry. CDE services pull these images from these remote registries and create the context by setting up the software packages and versions required for running such “special” workloads.
This article provides a step-by-step guide to creating a custom runtime and then pulling this runtime image from a custom docker registry for running a sample CDE Workload.
Runtime Configuration / Requirement:
Spark 3.x
Python 3.9
DBT(https://www.getdbt.com/)
Cloudera Documentation
Reference Documentation link: here
Sample code in the repo: here
Pre-requisites
Before getting started, make sure that you have the following prerequisites setup:
Access to Cloudera Data Engineering ( CDE) Service and Virtual Cluster(VC), including read and write access to the VC ( reference)
Access to Cloudera Data Engineering ( CDE) Service using the Command line tool( reference)
Docker Client installed on a local machine/laptop.
Solution Overview
The figure above describes the high-level steps for the Custom Runtime Build solution. It is mandatory that you start with a Cloudera-provided runtime image as a first step in building your custom docker image. To access the Cloudera docker registry contact your Cloudera account administrator, as this requires a license. Refer here for some details on the process if you are having a private cloud CDP installation.
Step 0: Clone the sample code from git
Create a folder on your local machine where you have the docker client installed and clone the sample code and change to the newly created directory:
$ git clone git@github.com:SuperEllipse/CDE-runtime.git
$ cd CDE-runtime
Steps 1 & 2: Fetch the image and build a custom docker image
In this step, we will fetch the base Image from the Cloudera repository and then customize the image to include packages and Python versions per our requirements.
Note: This requires access to the Cloudera repository. Contact your Cloudera account team contact to enable this if you do not have access. Once access to the repository is established, you should be able to build this Dockerfile into a Docker image. Change the dockerfile to add the name of the user per your requirement i.e. substitute vishrajagopalan to <<my-user-name >>.
Name : Dockerfile
FROM container.repository.cloudera.com/cloudera/dex/dex-spark-runtime-3.2.3-7.2.15.8:1.20.0-b15
USER root
RUN groupadd -r vishrajagopalan && useradd -r -g vishrajagopalan vishrajagopalan
RUN yum install ${YUM_OPTIONS} gcc openssl-devel libffi-devel bzip2-devel wget python39 python39-devel && yum clean all && rm -rf /var/cache/yum
RUN update-alternatives --remove-all python
RUN update-alternatives --install /usr/bin/python3 python3 /usr/bin/python3.9 1
RUN rm /usr/bin/python3
RUN ln -s /usr/bin/python3.9 /usr/bin/python3
RUN yum -y install python39-pip
RUN /usr/bin/python3.9 -m pip install --upgrade pip
RUN /usr/bin/python3.9 -m pip install pandas==2.0.0 impyla==0.18.0 dbt-core==1.3.1 dbt-impala==1.3.1 dbt-hive==1.3.1 impyla==0.18.0 confluent-kafka[avro,json,protobuf]==1.9.2
ENV PYTHONPATH="${PYTHONPATH}:/usr/local/lib64/python3.9/site-packages:/usr/local/lib/python3.9/site-packages"
#RUN echo $PYTHONPATH
RUN dbt --version
RUN /usr/bin/python3.9 - -c "import pandas; from impala.dbapi import connect "
USER vishrajagopalan
Steps To Build Dockerfile:
The following steps are needed to be executed in sequence for you to build and push your docker image.
Recommended changes: As you can see I have used my initials/ username to tag the docker image (the -t option in the docker build command is used to tag a docker image). You can use any tag name you like but I recommend using the <<my-user-name >> you used earlier.
The docker build step could take up to 20 minutes based on network bandwidth for the very first step. Subsequent builds will be much faster because Docker will cache the layers and will only rebuild those layers that have been changed.
docker build --network=host -t vishrajagopalan/dex-spark-runtime-3.2.3-7.2.15.8:1.20.0-b15-custom . -f Dockerfile
Step 3: Push docker image to remote Registry
Before you upload a Docker Image you need to log in to the Docker registry you intend to use. I have used a docker hub and have executed the command below to log in to the docker hub. This requires an existing docker hub account to be created if you have not done so already. The image below is how you could use the public docker hub ( hub.docker.io) to store your image. This could vary depending on the Docker registry that you plan to use. But ensuring that you are able to login to your docker registry is critical for you to push the custom image you built above to this registry. After logging into the docker registry, execute the docker push command shown below, making the changes to the tag ( i.e. replace vishrajagopalan with the tag name you want to use).
Execute the command below only after you have successfully logged in to the registry, else this will not work. docker push vishrajagopalan/dex-spark-runtime-3.2.3-7.2.15.8:1.20.0-b15-custom
After executing the above command successfully you should be able to see the pushed Docker Image in your Docker registry. Here I can see that my image with the right tag above has been pushed to the docker hub.
Once the Docker Image has been successfully pushed to a remote registry, we need to enable our CDE cluster to pull this image. Note: you need to have the CDE command line tool setup with the cluster endpoint on which you want to run the commands/jobs below. Refer to the additional reads section for help on this topic
Run the command below and enter the password to your docker registry. This creates a docker secret and is saved by CDE in the Kubernetes cluster, in order to be used during workload execution.
Steps 4 & 5: CDE Job runs with the custom runtime image
Create a CDE credential that saves the docker login information as a secret. You will need to enter your password after you enter this command. Change the URL from hub.docker.com to the private registry that you plan to use. cde credential create --name docker-creds --type docker-basic --docker-server hub.docker.com --docker-username vishrajagopalan
Run the command below to create a resource type of custom-runtime-image in CDE referring to the Runtime Image information to be used. cde resource create --name dex-spark-runtime-custom --image vishrajagopalan/dex-spark-runtime-3.2.3-7.2.15.8:1.20.0-b15-custom --image-engine spark3 --type custom-runtime-image
Run the first command below to execute the sample spark script provided in the GitHub repo. This executes a SPARK job in an ephemeral context, without saving any resources in the CDE Cluster. If you have not cloned the GitHub, you could also copy the sample Spark code provided in an editor( refer section at the end called Sample Spark Code). cde spark submit --user=vishrajagopalan spark-sql.py --runtime-image-resource-name=dex-spark-runtime-custom
On submission, you should be able to note the job id as shown below, which can then be used to fetch details in the CDE User Interface.
We need to validate that CDE has indeed pulled the DockerImage to run as a container. In order to do so, we can check Spark Submitter Kubernetes logs and Jobs logs ( see images below) respectively. The specific image Tag name should be changed to the Tag you used when building the image.
A new job for scheduled Runs
As an alternative to doing ad hoc job runs, we can create a new job and schedule a job run. Using the code example below, you may need to change the path of the spark-sql.py file to the location on your local machine, as well as the user name to your CDP account name.
The images show the execution of the job in the CDE user interface: cde resource create --name sparkfiles-resource
cde resource upload --name sparkfiles-resource --local-path $HOME/Work/sandbox/CDE-runtime/spark-sql.py
cde job create --name custom_runtime_job_spark3 --type spark --mount-1-resource dex-spark-runtime-custom --mount-2-resource sparkfiles-resource --application-file spark-sql.py --user vishrajagopalan --runtime-image-resource-name dex-spark-runtime-custom
cde job run --name custom_runtime_job_spark3
Summary
This article demonstrates the steps needed to build a custom runtime image, push the image in a private docker registry and use this custom runtime in a CDE workload. While this example demonstrates usage with a Docker hub, you can also use the private registry in your organization for this purpose.
Additional References
Creating Custom Runtime help doc: here
Creating and Updating Docker Tokens in CDE: here
Using CLI API to automate access to CDE: here
Paul de Fusco’s CDE Tour: here
Sample Spark Code
Filename: spark-sql.py
Use this sample spark code to copy into your favorite editor and save it as spark-sql.py. This code needs to be only used if you have not cloned the GitHub repo. from __future__ import print_function
import os
import sys
from pyspark.sql import SparkSession
from pyspark.sql.types import Row, StructField, StructType, StringType, IntegerType
import sys
spark = SparkSession\
.builder\
.appName("PythonSQL")\
.getOrCreate()
# A list of Rows. Infer schema from the first row, create a DataFrame and print the schema
rows = [Row(name="John", age=19), Row(name="Smith", age=23), Row(name="Sarah", age=18)]
some_df = spark.createDataFrame(rows)
some_df.printSchema()
# A list of tuples
tuples = [("John", 19), ("Smith", 23), ("Sarah", 18)]
# Schema with two fields - person_name and person_age
schema = StructType([StructField("person_name", StringType(), False),
StructField("person_age", IntegerType(), False)])
# Create a DataFrame by applying the schema to the RDD and print the schema
another_df = spark.createDataFrame(tuples, schema)
another_df.printSchema()
for each in another_df.collect():
print(each[0])
print("Python Version")
print(sys.version)
spark.stop()
... View more
04-20-2023
09:00 AM
Introduction Data Scientists work iteratively in a stochastic space to build models for addressing business use cases. In a typical Machine Learning workflow, Data Scientists iterate by combining their training data with an array of algorithms. In this activity, called model training and evaluation, there are two decisions being made. The first decision involves identifying the dataset for training and testing. Identifying a dataset that is representative of the actual data that the ML system is likely to encounter in a real-life scenario is a key success factor for the system. This helps a trained model to make better predictions when applied to real data. The second decision involves comparing different algorithms around a set of performance metrics. Since both of these decisions tend to be more heuristic-based rather than rule-based, this process is also called experimentation in machine learning parlance. Cloudera Machine Learning (CML) has simplified experimentation during the model training step of a Data Science workflow with the integration of MLflow Tracking. This managed MLflow component, together with Model Registry and Feature Store (planned for future CML releases) will greatly aid in improving the productivity of data science teams in scaling their training workflows and deploying their models. Here is a quick look at what MLflow Experiment Tracking looks like in CML. Later we will learn how to analyze experiment runs in detail. This article demonstrates how to get started with the new and improved experiments, powered by MLFlow, in Cloudera Machine Learning using a use case of flight cancellation prediction. This use case is available as an Applied Machine Learning Prototype (AMP) on the CML platform. For new users of the Cloudera Machine Learning platform, AMPs provide end-to-end example machine learning projects in CML (refer to image below). To know more about AMPs you can check the documentation here. AMPs provide end-to-end example machine learning projects in CML (refer to the image below). To know more about AMPs you can check the documentation here. Pre-requisites Access to a Cloudera Machine Learning environment A CML Workspace with a minimum compute resources of 16 vCPU / 32 GB RAM Credits and Resources for further reading Hyperopt: http://hyperopt.github.io/hyperopt/ XGboost: https://xgboost.readthedocs.io/en/stable/ Getting Started with CML Experiments Feature Setup up the CML Project : Note: Optional. Skip this section if you know how to launch a CML AMP by directly selecting the Flight Cancellation Prediction AMP This section provides a step-by-step explanation of how to set up a CML project that will be subsequently used for demonstrating the Experiments feature in the next section. If you are familiar with setting up a project using a Cloudera Machine Learning AMP, you can directly launch the CML AMP for Flight Cancellation Prediction from the AMPs menu item and skip this section. Let us now start From the Control Pane click on Machine Learning and choose a workspace where you want to create a project ( refer to the images below) To start using the Experiments feature in CML, let us first set up a CML Project. As mentioned earlier, a convenient way to quickstart use cases in CML is the AMPs and we will use one such AMP to work with the MLflow Experiments feature. There are more than one ways to create AMPs within the CML platform. We will use the intuitive option of creating the AMP by choosing the AMP menu item and selecting the cancellation prediction AMP. On the subsequent pop-up, click on Configure Project. Next, you need to go back to your Projects and select the newly created Project called Canceled Flight Predictions ( see below) As a final step to project creation, you need to now click Launch Project, which builds the project files as well as creates project artifacts. Use the default Engine and Runtime parameters and launch the project. This Kicks off the AMP setup Steps as below and you should wait for all the 10 Steps to be completed to ensure that the project has been installed in your workspace ( Refer to figure below). “Just Enough” MLflow Library Explanation This section provides some basic information on MLflow experiments. You are encouraged to use this as a starting point and then dive deeper into official documentation on your own. Set up an Experiment: When you are starting to train model Hyperparameters, use a new experiment name as a best practice with a call to mlflow.set_experiment (“uniqueName”) function. If you do not use this function, a default experiment is created for you. However, this may lead to headaches later when trying to track results, so best to pick a name. Runs of an Experiment: Each combination of parameter values that one would like to change in an experiment is tracked as a Run. At the end of an Experiment, you are comparing the outcomes across multiple runs. To start a new run, call the function mlflow.start_run() function Tracking the metric: This function tracks the metric of interest with a call to function mlflow.track_metric(). In the example stated above it could be any or all of the parameters described ( e.g. learning rate, or Optimizer type). Finally, you also need to track the outcome metric (e.g. accuracy or error). The way to track the outcome metrics is also to call the function mlflow.track_metric(). Launching Experiments in CML. Download the Ipython notebook t called 5A_Model_Experiments.ipynb from the GitHub repo here. This is the Ipython notebook that you will use for the Hyperparameter search using the Experiments feature in CML. Upload this Jupyter notebook into the code folder. You should now be able to see the uploaded file below. If you are not sure how to upload, just click the Files section, navigate to the code folder, and click the upload option seen in the top right corner. Next, click on Open in “Workbench” and select Start a new Session. Here choose at least the resource Profile below or higher as shown below: The Python notebook provides a step-by-step explanation of the different steps in MLflow Experiments creation. I recommend a code walkthrough and reading the code comments and documentation to understand the steps performed. Here is a summary of all the key steps in this notebook: Setup of necessary packages such as scikitlearn, mlflow, and XGBoost for model development Ingestion of preprocessed data: Here we are ingesting a preprocessed file created by the AMP Installing Hyperopt - We will use this open-source library for “smart hyperparameter search” Hyperparameter search space and search strategy Comparison of model performance between baseline and the best Hyperparameters Go ahead and execute all the steps by clicking “Run All” for the Notebook from the Run menu. This executes all of the above steps and, in approximately 10-15 minutes, the first set of Experiment runs will be logged. Next, come back to the Project work area and click on the Experiments menu. You should see the experiment name “HPsearch” below: Click on the “HP Search” above to bring the Experiment Details Page below ( Figure XX: Image - ExperimentDetails.jpeg). This shows the Information captured by MLflow tracking. The Metrics column is of interest to us because we are interested in tracking the accuracy of our flight prediction model, as measured by metrics “test-auc” (Area Under Curve). We can observe that the test-auc values are increasing, which means that our search for Hyperparameters is going to lead to improvement in the model. The higher our test-auc value the more likely our flight cancellation prediction service is likely to predict accurately in real-world scenarios. One of the nicest features of MLflow tracking is the multiple options available for us to compare our experiment runs. As a next step, select the checkboxes against the completed runs and then click on the compare button. This allows us to compare the Runs in tabular and graphical formats, as shown below. Please note, you may need to change the timeout to increase the number of runs in the code below: # This will take a considerable time and computing power to tune. For the purpose of efficiency, we are stopping the run at 10 minutes
with mlflow.start_run(run_name='xgb_loss_threshold'):
best_params = fmin(
fn=train_model,
space=search_space,
algo=tpe.suggest,
loss_threshold=-0.7, # stop the grid search once we've reached an AUC of 0.92 or higher
timeout=60*10 # stop after 10 minutes regardless if we reach an AUC of 0.92
) The “Run Comparison” page demonstrates the Hyperparameters for every experiment run and the outcomes from that run in the Parameters and Metrics section as shown in the images below. Notice that you can now see improvements in accuracy metric graphically by just clicking on “test-auc”. Finally, what should we do, if we would like to handpick the Hyperparameters combinations across different runs? We can do this by going back to the comparisons page (follow the same approach described earlier of selecting the runs and clicking on the compare button). Now, scroll down to the choice of graphs provided by MLflow. There are multiple choices here, but the Parallel Coordinates Plot provides a great way for you to hand pick Hyperparameters that influence the best outcomes ( refer to Figure XX - Image: ParalleCoordinatesPlot.jpeg). The chart explains the best way to pick the best hyperparameters among runs compared. Until now, we have used the Experiments feature to compare the different runs. Finally, let us make a comparison between our baseline model and a model built with optimal Hyperparameters obtained from our experiments. We need to check if it is worthwhile training our model with these chosen parameters. If we go back to the 5A_Experiments.ipynb notebook and compare the outcomes of the baseline model and the tuned model, we can notice that there is an increase in the accuracy of cancellation predictions that we have achieved through Hyperparameter tuning. Summary: This article demonstrates how the Experiments feature in Cloudera Machine Learning can be used for empirical experimentation in Machine Learning workflows. We have taken a typical Data Science workflow uses the case of tuning the Hyperparameters of a prediction model to show how this can be achieved and the different options available to us in the Experiments feature to track our experiments and compare the outcomes of the experiment runs for improved model outcomes. The MLflow package is available as a part of the Cloudera Machine Learning experience (no pip install needed) and by using a set of API calls, we are able to quickly set up experiments in our ML projects. This feature, using managed MLflow tracking, eases the life of the Data Science team during the critical training phase of model development and results in improved productivity through accurate and easy logging of training runs on ML models.
... View more
Labels: