Created on 06-05-2023 06:26 PM - edited 10-12-2023 04:11 PM
In the last few years, Spark has become one of the default choices for Data Analysis, Engineering, and Machine Learning in Cloudera Machine Learning (CML). CML is the Cloud-Native autoscaling service in the Cloudera Data Platform (CDP) for secure and governed Machine Learning and interactive Data Analysis at Enterprise Scale.
In this article, we will share important considerations when using Spark in CML in order to help Developers and Data Scientists in their daily usage.
CML Sessions
A CML Workspace provides an autoscaling Kubernetes cluster for running workloads such as interactive notebook Sessions, as well as Batch Jobs, Experiments, Applications, and Models. These are available in the CML Project, a construct to isolate users in their own environments.
Workspace implementation details are outside of this article’s scope. Luckily neither the CML User nor Admin is required to know these or operate Kubernetes at all. Under the covers, CDP provides the necessary infrastructure and integrations with the Shared Data Experience so that the Workspace is as easy to deploy as the click of a button.
When the CML User launches Sessions, Jobs, or other types of workloads, the user is prompted to select a Resource Profile, a Runtime, and an Editor.
The Resource Profile is the amount of CPU, Memory, and optionally GPU resources that are assigned to the Session when it is launched. The Runtime consists of all the dependencies (.e.g Pandas, Numpy, Tensorflow, etc.) that are loaded into the Session Pod by default. Finally, the user can select from predefined editors such as RStudio, JupyterLabs, and VS Code, or optionally select a custom editor if one has been installed. As an alternative, the user can deploy a local IDE such as IntelliJ or VSCode and connect to CML remotely.
CML Sessions and Spark
CML beginners often ask how CML Sessions relate to Spark Sessions and Applications. The two are fundamentally different concepts.
As we saw in the previous section when a CML Session is launched a Pod is deployed with the allotted Resource Profile. Due to CML’s flexibility, Sessions can execute workloads with any open-source framework in languages such as Scala, Python, R, Julia, or Java.
This is due to the fact that Sessions rely on CML Runtimes which can be thought of as a virtual machine customized to have all the necessary dependencies to access the computing cluster while keeping each project’s environment entirely isolated.
Under the hood, CML deploys a container in the Kubernetes cluster with the assigned Resource Profile CPU-Memory and mounts Runtime dependencies.
Spark is just one of the different frameworks that one could use within a Session. For example, a CML User could create a Spark Application from his or her Session Notebook. In this event, CML will deploy the Spark Application in Client Mode thus requesting additional Pods to run Spark Executors in from CML and the Kubernetes cluster.
Picking a Resource Profile
It’s very important to notice that only the Spark Driver will run within the Session while the Spark Executors will have their own resources. This implies that there is no need to launch a Session with a large Resource Profile when trying to run Spark Applications at scale. The Session Resource Profile should be picked primarily on the basis of the Spark Driver’s necessary profile.
If the user wishes to deploy more than one Spark Application from the same CML Session, these will be managed by the same Spark Driver. There is at most only one Spark Driver per Session.
Overridden Spark Configurations
It’s important to note that most Spark Configurations are left untouched by CML. The Spark on Kubernetes public documentation should thus serve as a primary reference when setting Spark Session Configs. However, the following Spark options are overridden by CML and should never be modified by the Spark Developer:
So in general what important Spark Configurations should be set when using Spark in CML?
For an introduction to tuning Spark jobs and tuning resource allocation visit this guide.
spark = SparkSession\
.builder\
.appName("DistributedSession")\
.config("spark.dynamicAllocation.minExecutors", 1)\
.config("spark.dynamicAllocation.maxExecutors", 4)\
.config("spark.executor.memory","2g")\
.config("spark.executor.cores","8")\
.config("spark.driver.memory","2g")\
.getOrCreate()
spark = SparkSession\
.builder\
.appName("PythonSQL")\
.config("spark.yarn.access.hadoopFileSystems", "s3a://gd-uat/")\
.getOrCreate()
spark = SparkSession\
.builder\
.master("local")\
.appName("SimpleSession")\
.getOrCreate()
spark = SparkSession\
.builder\
.appName("PythonSQL")\
.config("spark.sql.extensions","org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
.config("spark.sql.catalog.spark_catalog","org.apache.iceberg.spark.SparkSessionCatalog") \
.config("spark.sql.catalog.local","org.apache.iceberg.spark.SparkCatalog") \
.config("spark.sql.catalog.local.type","hadoop") \
.config("spark.sql.catalog.spark_catalog.type","hive") \
.getOrCreate()
import cml.data_v1 as cmldata
from pyspark import SparkContext
#Optional Spark Configs
SparkContext.setSystemProperty('spark.executor.cores', '4')
SparkContext.setSystemProperty('spark.executor.memory', '8g')
#Boilerplate Code provided to you by CML Data Connections
CONNECTION_NAME = "go01-dl"
conn = cmldata.get_connection(CONNECTION_NAME)
spark = conn.get_spark_session()
# Sample usage to run query through spark
EXAMPLE_SQL_QUERY = "show databases"
spark.sql(EXAMPLE_SQL_QUERY).show()
Using Different Spark Versions
When running distributed workloads at scale in CML Spark is generally easier to use than others. First of all, other frameworks run in a single Session and require using the Workers API to distribute runs across multiple nodes. Instead, Spark on Kubernetes runs in client mode and deploys as many Executor containers as needed by default.
In addition, Spark dependencies can be injected into the Session via CML Runtime Add-Ons. In simple terms, this means that you can launch a Session with or without Spark at the click of a button in a form. Behind the scenes, the Spark Add-On bits are loaded on the fly.
An additional benefit of this is that the end user can pick between Spark versions when launching the CML Session using a form rather than having to install or customize any project dependencies (or even run multiple sessions with different versions of Spark at the same time). As of time of this writing, CML supports Spark 2.4.8 and 3.2.0. The versions are managed at the Workspace level and depend on the Workspace version.
Deploying a SparkML Model as an API Endpoint
To support Production Machine Learning CML features “Models” i.e. a Model Management Tool to deploy models in API Endpoints, track requests and responses, and much more.
Typically a CML Model deploys a Script in a running Pod and requires loading all necessary model artifacts such as “pkl” or “joblib” files from Local or Cloud Storage. The model is wrapped by an arbitrary function that takes the Request JSON as input, processes it via the model’s “predict” method (or equivalents), and finally returns a Response.
While this is fairly straightforward with frameworks such as SciKit-Learn and Dask, SparkML models are more challenging as they require running a SparkSession which does not lend itself well to the latency required by an API Endpoint. To avoid this, a number of projects and formats have been developed such as MLeap, PMML, and more.
In CML, we recommend using the mlflow.spark module to create MLFlow Experiments. If the Developer chooses to log Model Artifacts via the MLFlow Tracking API these are stored in each Experiment Run for later access. If the developer then opts to register an Experiment Run to the MLFlow Registry, CML will provide an option to deploy the model from the Registry to the CML API Endpoint without having to craft a script for it. MLFlow will automatically convert the SparkML Model to the pyfunc flavor. As of the time of this writing, MLFlow Registry is in Tech Preview and we recommend reaching out to your Cloudera Account Team if you need help deploying it.
As an alternative, we recommend using the ONNX format to serialize the SparkML Model. Once converted, the model can be loaded into the CML Model Pod and wrapped into a function as explained above. This GitHub repository provides an example for doing so. First, convert the model from SparkML to ONNX as shown in this notebook. Then, deploy it to a CML Model with a script similar to this.
Conclusions
In this article, we highlighted some of the most frequently asked questions about using Spark in CML. In summary: