Logging Iceberg Metrics with MLflow Tracking in CML
Cloudera Machine Learning (CML) on Cloudera Data Platform (CDP) accelerates time-to-value by enabling data scientists to collaborate on a single, unified platform that can power any AI use case. Purpose-built for agile experimentation and production ML workflows, CML manages everything from data preparation to MLOps to predictive reporting.
CML is compatible with the MLflow Tracking API and uses the MLflow client library as the default method for logging experiments. Existing projects and their experiments remain available and usable. CML's experiment tracking features let you use the MLflow client library to log parameters, code versions, metrics, and output files when running your machine learning code.
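As a minimal, hedged illustration of the Tracking API (the experiment name, parameter, metric, and file below are arbitrary placeholders):

import mlflow

mlflow.set_experiment("demo-experiment")  # hypothetical experiment name

with mlflow.start_run():
    mlflow.log_param("alpha", 0.5)       # a hyperparameter
    mlflow.log_metric("rmse", 0.82)      # an evaluation metric
    with open("summary.txt", "w") as f:  # create an output file to log as an artifact
        f.write("example run summary")
    mlflow.log_artifact("summary.txt")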
Apache Iceberg is a table format for huge analytics datasets in the cloud that defines how metadata is stored and data files are organized. Iceberg is also a library that compute engines can use to read/write a table. CML offers Data Connections to connect to Data Sources available within the CDP Environment including Iceberg Open Lakehouses.
In this example, we will create an experiment with MLflow Tracking and log Iceberg metadata in order to enhance machine learning reproducibility in the context of MLOps.
Step-by-Step Guide
The code samples provided below are extracts from the accompanying notebook. The full code can be found in this git repository.
Setup
Create a CML Project with a Python 3.9 / JupyterLab Editor Runtime.
Launch a CML Session and install requirements.
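For example, assuming the repository provides a requirements.txt file (check the repository for the actual file name), you can install the dependencies from a notebook cell:

!pip install -r requirements.txt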
Run Notebook
Run each cell in the notebook.
Code highlights:
import mlflow
import mlflow.spark
import cml.data_v1 as cmldata
from pyspark.ml import Pipeline
from pyspark.ml.feature import Tokenizer, HashingTF
from pyspark.ml.classification import LogisticRegression

# Edit your connection name here:
CONNECTION_NAME = "se-aw-mdl"
conn = cmldata.get_connection(CONNECTION_NAME)
spark = conn.get_spark_session()
def exp1(df):
    mlflow.set_experiment("sparkml-experiment")

    ## EXPERIMENT 1
    df.writeTo("spark_catalog.default.training").using("iceberg").createOrReplace()
    spark.sql("SELECT * FROM spark_catalog.default.training").show()

    ### SHOW TABLE HISTORY AND SNAPSHOTS
    spark.read.format("iceberg").load("spark_catalog.default.training.history").show(20, False)
    spark.read.format("iceberg").load("spark_catalog.default.training.snapshots").show(20, False)

    # Capture the latest snapshot's metadata so it can be tagged on the MLflow run
    snapshots_df = spark.read.format("iceberg").load("spark_catalog.default.training.snapshots")
    snapshot_id = snapshots_df.select("snapshot_id").tail(1)[0][0]
    committed_at = snapshots_df.select("committed_at").tail(1)[0][0].strftime('%m/%d/%Y')
    parent_id = snapshots_df.select("parent_id").tail(1)[0][0]

    tags = {
        "iceberg_snapshot_id": snapshot_id,
        "iceberg_snapshot_committed_at": committed_at,
        "iceberg_parent_id": parent_id,
        "row_count": df.count()
    }

    ### MLFLOW EXPERIMENT RUN
    with mlflow.start_run() as run:
        maxIter = 8
        regParam = 0.01

        tokenizer = Tokenizer(inputCol="text", outputCol="words")
        hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
        lr = LogisticRegression(maxIter=maxIter, regParam=regParam)
        pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
        model = pipeline.fit(df)

        mlflow.log_param("maxIter", maxIter)
        mlflow.log_param("regParam", regParam)
        #prediction = model.transform(test)
        mlflow.set_tags(tags)
    # The run ends automatically when the "with" block exits

    experiment_id = mlflow.get_experiment_by_name("sparkml-experiment").experiment_id
    runs_df = mlflow.search_runs([experiment_id], run_view_type=1)

    return runs_df
def exp2(df):
    mlflow.set_experiment("sparkml-experiment")

    ## EXPERIMENT 2
    ### ICEBERG INSERT DATA - APPEND FROM DATAFRAME
    # PRE-INSERT
    spark.sql("SELECT * FROM spark_catalog.default.training").show()

    temp_df = spark.sql("SELECT * FROM spark_catalog.default.training")
    temp_df.writeTo("spark_catalog.default.training").append()
    df = spark.sql("SELECT * FROM spark_catalog.default.training")

    # POST-INSERT
    spark.sql("SELECT * FROM spark_catalog.default.training").show()

    spark.read.format("iceberg").load("spark_catalog.default.training.history").show(20, False)
    spark.read.format("iceberg").load("spark_catalog.default.training.snapshots").show(20, False)

    # Capture the latest snapshot's metadata so it can be tagged on the MLflow run
    snapshots_df = spark.read.format("iceberg").load("spark_catalog.default.training.snapshots")
    snapshot_id = snapshots_df.select("snapshot_id").tail(1)[0][0]
    committed_at = snapshots_df.select("committed_at").tail(1)[0][0].strftime('%m/%d/%Y')
    parent_id = snapshots_df.select("parent_id").tail(1)[0][0]

    tags = {
        "iceberg_snapshot_id": snapshot_id,
        "iceberg_snapshot_committed_at": committed_at,
        "iceberg_parent_id": parent_id,
        "row_count": df.count()
    }

    ### MLFLOW EXPERIMENT RUN
    with mlflow.start_run() as run:
        maxIter = 10
        regParam = 0.002

        tokenizer = Tokenizer(inputCol="text", outputCol="words")
        hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
        lr = LogisticRegression(maxIter=maxIter, regParam=regParam)
        pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
        model = pipeline.fit(df)

        mlflow.log_param("maxIter", maxIter)
        mlflow.log_param("regParam", regParam)
        #prediction = model.transform(test)
        mlflow.set_tags(tags)

    experiment_id = mlflow.get_experiment_by_name("sparkml-experiment").experiment_id
    runs_df = mlflow.search_runs([experiment_id], run_view_type=1)

    return runs_df
def exp3(df, snapshot_id):
    mlflow.set_experiment("sparkml-experiment")

    ## EXPERIMENT 3
    # Time travel: read the table as of the provided Iceberg snapshot
    df = spark.read.option("snapshot-id", snapshot_id).table("spark_catalog.default.training")

    committed_at = spark.sql("SELECT committed_at FROM spark_catalog.default.training.snapshots WHERE snapshot_id = {};".format(snapshot_id)).collect()[0][0].strftime('%m/%d/%Y')
    parent_id = str(spark.sql("SELECT parent_id FROM spark_catalog.default.training.snapshots WHERE snapshot_id = {};".format(snapshot_id)).tail(1)[0][0])

    tags = {
        "iceberg_snapshot_id": snapshot_id,
        "iceberg_snapshot_committed_at": committed_at,
        "iceberg_parent_id": parent_id,
        "row_count": df.count()
    }

    ### MLFLOW EXPERIMENT RUN
    with mlflow.start_run() as run:
        maxIter = 7
        regParam = 0.005

        tokenizer = Tokenizer(inputCol="text", outputCol="words")
        hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
        lr = LogisticRegression(maxIter=maxIter, regParam=regParam)
        pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
        model = pipeline.fit(df)

        mlflow.log_param("maxIter", maxIter)
        mlflow.log_param("regParam", regParam)
        #prediction = model.transform(test)
        mlflow.set_tags(tags)

    experiment_id = mlflow.get_experiment_by_name("sparkml-experiment").experiment_id
    runs_df = mlflow.search_runs([experiment_id], run_view_type=1)
    #spark.stop()

    return runs_df
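For context, a hypothetical driver cell might chain the three functions together, assuming training_df is a Spark DataFrame with the "text" and "label" columns that the pipeline expects (prepared earlier in the notebook):

# Run the first experiment and capture its runs DataFrame
runs_df = exp1(training_df)

# The snapshot ID was logged as a run tag, so it can be recovered
# from the pandas DataFrame returned by mlflow.search_runs
first_snapshot_id = runs_df.sort_values("start_time").iloc[0]["tags.iceberg_snapshot_id"]

# Append data and run the second experiment
runs_df = exp2(training_df)

# Retrain against the table exactly as it was at the first snapshot
runs_df = exp3(training_df, first_snapshot_id)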
Summary and Next Steps
Large ML organizations require standardized best practices, such as tracking models, their dependencies, and their developers, and matching those with the datasets they were trained on, in order to maintain a consistent view of all MLOps practices.
MLflow Tracking in CML helps you achieve this goal by letting you attach datasets and other custom metadata to experiment runs. In the example above, we tracked Iceberg metadata so that data scientists can retrain an existing pipeline with a dataset as of an arbitrary point in time. In the process, we used tags to implement a consistent taxonomy across all experiment runs.
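For instance, because the snapshot ID is stored as a run tag, you can later find every run that was trained on a given table snapshot. A minimal sketch (the snapshot ID below is a placeholder):

experiment_id = mlflow.get_experiment_by_name("sparkml-experiment").experiment_id

# Placeholder snapshot ID; substitute a real value from the snapshots metadata table
matching_runs = mlflow.search_runs(
    [experiment_id],
    filter_string="tags.iceberg_snapshot_id = '1234567890'"
)
print(matching_runs[["run_id", "tags.iceberg_snapshot_id", "tags.row_count"]])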