Created on 03-27-2024 04:42 PM
CML Model Deployment with MLFlow and APIv2
Cloudera Machine Learning (CML) on Cloudera Data Platform (CDP) accelerates time-to-value by enabling data scientists to collaborate in a single, unified, all-inclusive platform for powering any AI use case. Purpose-built for agile experimentation and production ML workflows, Cloudera Machine Learning manages everything from data preparation to MLOps to predictive reporting.
CML exposes a REST API that you can use to perform operations related to projects, jobs, and runs. You can use API commands to integrate CML with third-party workflow tools or to control CML from the command line.
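As a rough illustration of what a direct call to the REST API might look like, the sketch below builds (but does not send) an authenticated request using only Python's standard library. The workspace domain, the API key, the `/api/v2/projects` route, and the bearer-token header are assumptions made for illustration; check your workspace's API documentation for the actual routes and authentication scheme.

```python
import urllib.request

# Hypothetical values: replace with your workspace domain and an APIv2 key.
CML_DOMAIN = "https://ml-workspace.example.com"
API_KEY = "my-api-key"


def build_list_projects_request(domain, api_key):
    """Build a GET request for the (assumed) APIv2 projects route."""
    url = domain.rstrip("/") + "/api/v2/projects"
    request = urllib.request.Request(url, method="GET")
    # Assumption: the API key is passed as a bearer token.
    request.add_header("Authorization", "Bearer " + api_key)
    return request


request = build_list_projects_request(CML_DOMAIN, API_KEY)
print(request.get_method(), request.full_url)
```

The request object could then be sent with `urllib.request.urlopen(request)`. The rest of this article uses the `cmlapi` Python client instead, which wraps these calls.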
In this example we will showcase how to use APIv2 to programmatically track an XGBoost experiment with MLFlow Tracking, register it in the MLFlow Registry, and deploy it as a model endpoint in CML.
MLOps in CML
CML has extensive MLOps features and a set of model and lifecycle management capabilities to enable the repeatable, transparent, and governed approaches necessary for scaling model deployments and ML use cases. It's built to support open source standards and is fully integrated with CDP, enabling customers to integrate into existing and future tooling while not being locked into a single vendor.
CML enables enterprises to proactively monitor technical metrics such as service level agreements (SLA) adherence, uptime, and resource use as well as prediction metrics including model distribution, drift, and skew from a single governance interface. Users can set custom alerts and eliminate the model “black box” effect with native tools for visualizing model lineage, performance, and trends. Some of the benefits with CML include:
Increased model security for Model REST endpoints, which allows models to be served in a CML production environment without compromising security.
Use Case
In this example we will create a basic MLOps pipeline to put a credit card fraud classifier into production. We will create a model prototype with XGBoost, register and manage experiments with MLFlow Tracking, and stage the best experiment run in the MLFlow Registry.
Next, we will deploy the model from the Registry into an API Endpoint, and finally redeploy it with additional resources for High Availability and increased serving performance.
The full code is available in this git repository.
Step by Step Guide
Setup
Create a CML Project with Python 3.9 / Workbench Editor Runtime.
Launch a CML Session and install requirements.
Open script "00_datagen.py" and update lines 140 and 141 with your Iceberg database name and Spark Data Connection Name. Then run it.
Script 1: Create the Model Experiment
Run script "01_train_xgboost.py" in order to create an MLFlow Experiment.
Code highlights:
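import mlflow
import pandas as pd
from sklearn.metrics import accuracy_score
from xgboost import XGBClassifier
# USERNAME, DATE, and the train/test splits are assumed to be defined earlier in the script.
EXPERIMENT_NAME = "xgb-cc-fraud-{0}-{1}".format(USERNAME, DATE)
mlflow.set_experiment(EXPERIMENT_NAME)
with mlflow.start_run():
    model = XGBClassifier(use_label_encoder=False, eval_metric="logloss")
    model.fit(X_train, y_train, eval_set=[(X_test, y_test)], verbose=False)
    y_pred = model.predict(X_test)
    accuracy = accuracy_score(y_test, y_pred)
    # Accuracy is a result of the run, so log it as a metric rather than a parameter.
    mlflow.log_metric("accuracy", accuracy)
    mlflow.xgboost.log_model(model, artifact_path="artifacts")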
def getLatestExperimentInfo(experimentName):
    """
    Method to capture the latest Experiment Id and Run ID for the provided experimentName
    """
    experimentId = mlflow.get_experiment_by_name(experimentName).experiment_id
    runsDf = mlflow.search_runs([experimentId], run_view_type=1)
    # Sort explicitly so the last row is the most recent run, whatever order the backend returns.
    runsDf = runsDf.sort_values("start_time")
    experimentId = runsDf.iloc[-1]['experiment_id']
    experimentRunId = runsDf.iloc[-1]['run_id']
    return experimentId, experimentRunId
experimentId, experimentRunId = getLatestExperimentInfo(EXPERIMENT_NAME)
#Replace Experiment Run ID here:
run = mlflow.get_run(experimentRunId)
pd.DataFrame(data=[run.data.params], index=["Value"]).T
pd.DataFrame(data=[run.data.metrics], index=["Value"]).T
client = mlflow.tracking.MlflowClient()
client.list_artifacts(run_id=run.info.run_id)
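The idea behind `getLatestExperimentInfo` can be illustrated without an MLFlow server. The sketch below picks the most recent run from a list of run records by an explicit timestamp, which avoids depending on the order in which runs are returned. The records and their values are made up; only the field names (`experiment_id`, `run_id`, `start_time`) mirror columns of the DataFrame returned by `mlflow.search_runs`.

```python
from datetime import datetime

# Made-up run records standing in for rows of the search_runs DataFrame.
runs = [
    {"experiment_id": "exp-1", "run_id": "run-a", "start_time": datetime(2024, 3, 1, 9, 0)},
    {"experiment_id": "exp-1", "run_id": "run-c", "start_time": datetime(2024, 3, 3, 9, 0)},
    {"experiment_id": "exp-1", "run_id": "run-b", "start_time": datetime(2024, 3, 2, 9, 0)},
]


def latest_run(run_records):
    """Return (experiment_id, run_id) of the run with the newest start_time."""
    if not run_records:
        raise ValueError("no runs found for this experiment")
    newest = max(run_records, key=lambda record: record["start_time"])
    return newest["experiment_id"], newest["run_id"]


experiment_id, run_id = latest_run(runs)
print(experiment_id, run_id)
```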
Script 2: Register the Model Experiment
Run script "02_cml_api_endpoint.py" in order to register an MLFlow Experiment.
Code highlights:
import os
import cmlapi
from cmlapi.rest import ApiException
class ModelRegistration():
    """
    Class to manage the registration of the xgboost model
    """
    def __init__(self, username, experimentName):
        self.client = cmlapi.default_client()
        self.username = username
        self.experimentName = experimentName
    def registerModelFromExperimentRun(self, modelName, experimentId, experimentRunId, modelPath):
        """
        Method to register a model from an Experiment Run
        Input: requires an experiment run
        Output: api_response object
        """
        CreateRegisteredModelRequest = {
            "project_id": os.environ['CDSW_PROJECT_ID'],
            "experiment_id" : experimentId,
            "run_id": experimentRunId,
            "model_name": modelName,
            "model_path": modelPath
        }
        try:
            # Register a model.
            api_response = self.client.create_registered_model(CreateRegisteredModelRequest)
            #pprint(api_response)
        except ApiException as e:
            print("Exception when calling CMLServiceApi->create_registered_model: %s\n" % e)
            # Re-raise so the caller never receives an undefined response.
            raise
        return api_response
modelReg = ModelRegistration(username, experimentName)
modelPath = "artifacts"
modelName = "FraudCLF-" + username
registeredModelResponse = modelReg.registerModelFromExperimentRun(modelName, experimentId, experimentRunId, modelPath)
modelId = registeredModelResponse.model_id
modelVersionId = registeredModelResponse.model_versions[0].model_version_id
registeredModelResponse.model_versions[0].model_version_id
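The request body is the part of the registration step that is easiest to get wrong, so it can help to assemble and check it separately before calling the API. The sketch below does this with the same five fields used in `registerModelFromExperimentRun`; the helper name `build_registered_model_request` and the identifiers passed to it are hypothetical.

```python
import os


def build_registered_model_request(model_name, experiment_id, run_id, model_path, project_id=None):
    """Assemble the request body used to register a model from an experiment run."""
    # Fall back to the project ID that CML exposes in the session environment.
    project_id = project_id or os.environ.get("CDSW_PROJECT_ID")
    request = {
        "project_id": project_id,
        "experiment_id": experiment_id,
        "run_id": run_id,
        "model_name": model_name,
        "model_path": model_path,
    }
    # Fail early if any field is empty instead of sending an incomplete request.
    missing = [key for key, value in request.items() if not value]
    if missing:
        raise ValueError("missing required fields: " + ", ".join(missing))
    return request


# Hypothetical identifiers, for illustration only.
request = build_registered_model_request(
    model_name="FraudCLF-jdoe",
    experiment_id="exp-1",
    run_id="run-c",
    model_path="artifacts",
    project_id="proj-123",
)
print(request)
```

The resulting dictionary has the same shape as `CreateRegisteredModelRequest` above and could be passed to `create_registered_model` in its place.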
Script 3: Deploy Endpoint from Registry Model
Run script "03_api_deployment.py" in order to create a Model Endpoint from the registered model.
Code highlights:
import json
from pprint import pprint
import cmlapi
from cmlapi.rest import ApiException
class ModelDeployment():
    """
    Class to manage the model deployment of the xgboost model
    """
    def __init__(self, projectId, username):
        self.client = cmlapi.default_client()
        self.projectId = projectId
        self.username = username
    def listRegisteredModels(self):
        """
        Method to retrieve registered models by the user
        """
        # Optional search filter, passed as a JSON string.
        # Supported keys include {"model_name": "<model name>"} and {"creator_id": "<sso name or user name>"}.
        search_filter = {"creator_id" : self.username, "model_name": "FraudCLF-"+self.username}
        search = json.dumps(search_filter)
        page_size = 1000
        try:
            # List registered models.
            api_response = self.client.list_registered_models(search_filter=search, page_size=page_size)
            #pprint(api_response)
        except ApiException as e:
            print("Exception when calling CMLServiceApi->list_registered_models: %s\n" % e)
            # Re-raise so the caller never receives an undefined response.
            raise
        return api_response
    def getRegisteredModel(self, modelId):
        """
        Method to return registered model metadata including model version id
        """
        search_filter = {"creator_id" : self.username}
        search = json.dumps(search_filter)
        try:
            # Get a registered model.
            api_response = self.client.get_registered_model(modelId, search_filter=search, page_size=1000)
            #pprint(api_response)
        except ApiException as e:
            print("Exception when calling CMLServiceApi->get_registered_model: %s\n" % e)
            # Re-raise so the caller never receives an undefined response.
            raise
        return api_response
    def createModel(self, projectId, modelName, registeredModelId, description = "Fraud Detection 2024"):
        """
        Method to create a model
        """
        CreateModelRequest = {
            "project_id": projectId,
            "name" : modelName,
            "description": description,
            "registered_model_id": registeredModelId,
            # Turns off authentication for the endpoint; convenient for a demo, but review before production use.
            "disable_authentication": True
        }
        try:
            # Create a model.
            api_response = self.client.create_model(CreateModelRequest, projectId)
            pprint(api_response)
        except ApiException as e:
            print("Exception when calling CMLServiceApi->create_model: %s\n" % e)
            # Re-raise so the caller never receives an undefined response.
            raise
        return api_response
    def createModelBuild(self, projectId, modelVersionId, modelCreationId, runtimeId):
        """
        Method to create a Model build
        """
        # Create Model Build
        CreateModelBuildRequest = {
            "registered_model_version_id": modelVersionId,
            "runtime_identifier": runtimeId,
            "comment": "invoking model build",
            "model_id": modelCreationId
        }
        try:
            # Create a model build.
            api_response = self.client.create_model_build(CreateModelBuildRequest, projectId, modelCreationId)
            #pprint(api_response)
        except ApiException as e:
            print("Exception when calling CMLServiceApi->create_model_build: %s\n" % e)
            # Re-raise so the caller never receives an undefined response.
            raise
        return api_response
    def createModelDeployment(self, modelBuildId, projectId, modelCreationId):
        """
        Method to deploy a model build
        """
        CreateModelDeploymentRequest = {
            "cpu" : "2",
            "memory" : "4"
        }
        try:
            # Create a model deployment.
            api_response = self.client.create_model_deployment(CreateModelDeploymentRequest, projectId, modelCreationId, modelBuildId)
            pprint(api_response)
        except ApiException as e:
            print("Exception when calling CMLServiceApi->create_model_deployment: %s\n" % e)
            # Re-raise so the caller never receives an undefined response.
            raise
        return api_response
    def listRuntimes(self):
        """
        Method to list available runtimes
        """
        # Optional search filter, passed as a JSON string.
        # Supported search filter keys are: "image_identifier", "editor", "kernel", "edition", "description", "full_version".
        # For example: search_filter = {"kernel": "Python 3.7", "editor": "JupyterLab"}
        search_filter = {"kernel": "Python 3.9", "edition": "Standard", "editor": "Workbench"}
        search = json.dumps(search_filter)
        try:
            # List the available runtimes, optionally filtered, sorted, and paginated.
            api_response = self.client.list_runtimes(search_filter=search, page_size=1000)
            #pprint(api_response)
        except ApiException as e:
            print("Exception when calling CMLServiceApi->list_runtimes: %s\n" % e)
            # Re-raise so the caller never receives an undefined response.
            raise
        return api_response
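The methods above are meant to be called in sequence: create a model from the registered model, build it from a registry version, then deploy the build. The sketch below walks through that order with a stand-in object so that it can run outside CML. `FakeDeployment`, `deploy_registered_model`, the identifiers they use, and the assumption that each response exposes its identifier as `.id` are all hypothetical.

```python
from types import SimpleNamespace


class FakeDeployment:
    """Stand-in for ModelDeployment that records calls instead of contacting CML."""

    def __init__(self):
        self.calls = []

    def createModel(self, projectId, modelName, registeredModelId):
        self.calls.append("createModel")
        return SimpleNamespace(id="model-1")

    def createModelBuild(self, projectId, modelVersionId, modelCreationId, runtimeId):
        self.calls.append("createModelBuild")
        return SimpleNamespace(id="build-1")

    def createModelDeployment(self, modelBuildId, projectId, modelCreationId):
        self.calls.append("createModelDeployment")
        return SimpleNamespace(id="deployment-1")


def deploy_registered_model(deployment, projectId, modelName, registeredModelId, modelVersionId, runtimeId):
    """Create a model, build it from a registry version, and deploy the build."""
    model = deployment.createModel(projectId, modelName, registeredModelId)
    build = deployment.createModelBuild(projectId, modelVersionId, model.id, runtimeId)
    return deployment.createModelDeployment(build.id, projectId, model.id)


fake = FakeDeployment()
result = deploy_registered_model(fake, "proj-123", "FraudCLF-jdoe", "reg-1", "ver-1", "runtime-image")
print(fake.calls, result.id)
```

Passing a real `ModelDeployment` instance instead of the stand-in would issue the actual API calls, provided the responses expose `.id` as assumed here.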
Once you have created your model endpoint, give it a minute and then try a test request:
{"dataframe_split":
{"columns": ["age", "credit_card_balance", "bank_account_balance", "mortgage_balance", "sec_bank_account_balance", "savings_account_balance", "sec_savings_account_balance", "total_est_nworth", "primary_loan_balance", "secondary_loan_balance", "uni_loan_balance", "longitude", "latitude", "transaction_amount"],
"data":[[35.5, 20000.5, 3900.5, 14000.5, 2944.5, 3400.5, 12000.5, 29000.5, 1300.5, 15000.5, 10000.5, 2000.5, 90.5, 120.5]]}}
Script 4: Endpoint Redeployment
Run script "04_api_redeployment.py" in order to create a new model deployment with increased resources.
Code highlights:
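deployment = ModelDeployment(projectId, username)
# get_latest_deployment_details is not among the Script 3 highlights above; it is assumed to be defined in the full script.
getLatestDeploymentResponse = deployment.get_latest_deployment_details(modelName)
listRuntimesResponse = deployment.listRuntimes()
listRuntimesResponse
runtimeId = 'docker.repository.cloudera.com/cloudera/cdsw/ml-runtime-workbench-python3.9-standard:2024.02.1-b4' # Copy a runtime ID from previous output
# Resource profile for the new deployment
cpu = 2
mem = 4
replicas = 2
# Unlike the Script 3 excerpt, createModelBuild here also receives cpu, mem, and replicas; modelVersionId and modelCreationId are assumed to be set earlier in the script.
createModelBuildResponse = deployment.createModelBuild(projectId, modelVersionId, modelCreationId, runtimeId, cpu, mem, replicas)
modelBuildId = createModelBuildResponse.id
deployment.createModelDeployment(modelBuildId, projectId, modelCreationId)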
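Rather than copying a runtime ID by hand, the newest matching runtime could be selected from the `listRuntimes()` output. The sketch below does this on a stand-in response, assuming the response exposes a `runtimes` list whose items carry an `image_identifier`, and that tags follow the `<version>-b<build>` pattern of the ID shown above. The first identifier in the list is made up for the comparison.

```python
from types import SimpleNamespace

BASE = "docker.repository.cloudera.com/cloudera/cdsw/ml-runtime-workbench-python3.9-standard"

# Stand-in for the listRuntimes() response; the `runtimes` attribute is an assumption.
response = SimpleNamespace(runtimes=[
    SimpleNamespace(image_identifier=BASE + ":2023.12.1-b8"),  # made-up older tag
    SimpleNamespace(image_identifier=BASE + ":2024.02.1-b4"),
])


def version_key(image_identifier):
    """Turn a tag such as '2024.02.1-b4' into a comparable tuple (2024, 2, 1, 4)."""
    tag = image_identifier.rsplit(":", 1)[1]
    version, _, build = tag.partition("-b")
    return tuple(int(part) for part in version.split(".")) + (int(build or 0),)


def newest_runtime(runtimes_response):
    """Return the image identifier with the highest version and build number."""
    newest = max(runtimes_response.runtimes, key=lambda r: version_key(r.image_identifier))
    return newest.image_identifier


runtimeId = newest_runtime(response)
print(runtimeId)
```

Comparing parsed numbers rather than raw strings keeps the ordering correct when a version component gains a digit, for example build 10 versus build 9.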
Summary and Next Steps
Cloudera Machine Learning exposes a REST API that you can use to perform operations related to projects, jobs, and runs. You can use API commands to integrate CML with third-party workflow tools or to control CML from the command line.
CML's API accelerates your data science projects by allowing you to build end-to-end pipelines programmatically. When coupled with CML MLFlow Tracking and MLFlow Registry, it can be used to manage models from inception to production.