In the last few years, Spark has become one of the default choices for Data Analysis, Engineering, and Machine Learning in Cloudera Machine Learning (CML). CML is the Cloud-Native autoscaling service in the Cloudera Data Platform (CDP) for secure and governed Machine Learning and interactive Data Analysis at Enterprise Scale.

In this article, we will share important considerations when using Spark in CML in order to help Developers and Data Scientists in their daily usage.  

CML Sessions

A CML Workspace provides an autoscaling Kubernetes cluster for running workloads such as interactive notebook Sessions, as well as Batch Jobs, Experiments, Applications, and Models. These are available in the CML Project, a construct to isolate users in their own environments.  


Workspace implementation details are outside this article's scope. Fortunately, neither the CML User nor the Admin needs to know them or operate Kubernetes at all. Under the covers, CDP provides the necessary infrastructure and integrations with the Shared Data Experience so that deploying a Workspace is as easy as clicking a button.


When the CML User launches Sessions, Jobs, or other types of workloads, the user is prompted to select a Resource Profile, a Runtime, and an Editor.

 

The Resource Profile is the amount of CPU, Memory, and optionally GPU resources that are assigned to the Session when it is launched. The Runtime consists of all the dependencies (e.g. Pandas, NumPy, TensorFlow, etc.) that are loaded into the Session Pod by default. Finally, the user can select from predefined editors such as RStudio, JupyterLab, and VS Code, or optionally select a custom editor if one has been installed. As an alternative, the user can run a local IDE such as IntelliJ or VS Code and connect to CML remotely.

 

CML Sessions and Spark 

CML beginners often ask how CML Sessions relate to Spark Sessions and Applications. The two are fundamentally different concepts. 

As we saw in the previous section, when a CML Session is launched, a Pod is deployed with the allotted Resource Profile. Thanks to CML's flexibility, Sessions can execute workloads with any open-source framework in languages such as Scala, Python, R, Julia, or Java.

 

This is because Sessions rely on CML Runtimes, which can be thought of as virtual machines customized with all the dependencies needed to access the computing cluster, while keeping each project's environment entirely isolated.

 

Under the hood, CML deploys a container in the Kubernetes cluster with the CPU and Memory of the assigned Resource Profile and mounts the Runtime dependencies.

 

Spark is just one of the frameworks that can be used within a Session. For example, a CML User could create a Spark Application from a Session Notebook. In this case, CML deploys the Spark Application in Client Mode and requests additional Pods from the Kubernetes cluster to run the Spark Executors.

 

Picking a Resource Profile

 

It is important to note that only the Spark Driver runs within the Session, while the Spark Executors have their own resources. This means there is no need to launch a Session with a large Resource Profile in order to run Spark Applications at scale. The Session Resource Profile should be picked primarily on the basis of the Spark Driver's requirements.

 

If the user wishes to deploy more than one Spark Application from the same CML Session, these will be managed by the same Spark Driver. There is at most one Spark Driver per Session.

 

Overridden Spark Configurations

 

It’s important to note that most Spark Configurations are left untouched by CML. The Spark on Kubernetes public documentation should thus serve as a primary reference when setting Spark Session Configs. However, the following Spark options are overridden by CML and should never be modified by the Spark Developer:

  • spark.driver.host
  • spark.driver.port
  • spark.blockManager.port
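
One way to guard against setting these by accident is to filter a configuration dictionary before passing it to the Spark Session builder. The following is a minimal sketch in plain Python; the helper name drop_cml_managed is hypothetical and is not part of CML or Spark. Keys are compared case-insensitively.

```python
# Keys that CML sets itself (taken from the list above), stored in lower case.
CML_MANAGED_KEYS = {
    "spark.driver.host",
    "spark.driver.port",
    "spark.blockmanager.port",
}


def drop_cml_managed(conf):
    """Return (safe_conf, dropped_keys) without mutating the input dict.

    Hypothetical helper: not part of CML or Spark.
    """
    safe, dropped = {}, []
    for key, value in conf.items():
        if key.lower() in CML_MANAGED_KEYS:
            dropped.append(key)
        else:
            safe[key] = value
    return safe, dropped


requested = {
    "spark.executor.memory": "2g",
    "spark.driver.port": "7077",  # would clash with the value CML sets
}
safe_conf, dropped = drop_cml_managed(requested)
print(safe_conf)  # only the entries that are safe to pass to the builder
print(dropped)    # the entries that were removed
```

The entries in safe_conf can then be passed one by one to the builder's config method.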

 

So, in general, which Spark Configurations should you set when using Spark in CML?

 

  • Spark Dynamic Allocation is enabled by default in CML. We recommend using it for exploring datasets in interactive sessions or running large scale Data Engineering Spark jobs while keeping an eye on the “spark.dynamicAllocation.maxExecutors” property in order to prevent runaway charges. 
  • The "spark.executor.memory", "spark.executor.instances", "spark.executor.cores", and "spark.driver.memory" properties are also very important, as they let you predefine the amount of resources allocated to your Spark jobs. However, unlike Spark on YARN clusters, CML provides dramatically more flexibility when it comes to deploying cloud resources. Because Spark clusters in CML are ephemeral and run in isolated environments, you will have less resource contention, and generally you won't have to carefully evaluate the impact of your jobs on someone else's workloads.

For an introduction to tuning Spark jobs and tuning resource allocation visit this guide.

 

 

from pyspark.sql import SparkSession

# Dynamic Allocation scales between 1 and 4 Executors as the load changes
spark = SparkSession\
    .builder\
    .appName("DistributedSession")\
    .config("spark.dynamicAllocation.minExecutors", 1)\
    .config("spark.dynamicAllocation.maxExecutors", 4)\
    .config("spark.executor.memory","2g")\
    .config("spark.executor.cores","8")\
    .config("spark.driver.memory","2g")\
    .getOrCreate()
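
To see why "spark.dynamicAllocation.maxExecutors" matters for cost, it can help to work out the worst-case Executor footprint of a configuration like the one above. The sketch below is plain Python arithmetic, not a CML or Spark API, and the function names are hypothetical. It assumes a per-Executor memory overhead of max(384 MiB, 10% of Executor memory), which is Spark's documented default for JVM jobs; PySpark workloads on Kubernetes may reserve more, so treat the result as a rough lower bound.

```python
def parse_gb(value):
    """Convert a Spark memory string such as '2g' or '512m' to gigabytes."""
    units = {"m": 1 / 1024, "g": 1.0}
    return float(value[:-1]) * units[value[-1].lower()]


def worst_case_executors(max_executors, executor_cores, executor_memory,
                         overhead_factor=0.10, min_overhead_gb=0.375):
    """Estimate cores and memory requested when every Executor is running.

    overhead_factor and min_overhead_gb are assumptions (see the text above).
    """
    memory = parse_gb(executor_memory)
    overhead = max(min_overhead_gb, overhead_factor * memory)
    return {
        "cores": max_executors * executor_cores,
        "memory_gb": max_executors * (memory + overhead),
    }


# Values from the example above: up to 4 Executors with 8 cores and 2g each.
print(worst_case_executors(4, 8, "2g"))  # {'cores': 32, 'memory_gb': 9.5}
```

The Driver is not included here because it runs inside the Session and is covered by the Session's Resource Profile.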

 

 

 

 

  • To access data in Cloud Storage you should set the “spark.yarn.access.hadoopFileSystems” config to your Storage location. This value is shown in the CDP Management Console Summary tab of the CDP Environment tied to your CML Workspace. Notice that the "spark.hadoop.fs.s3a.s3guard.ddb.region" option is no longer necessary in recent CML versions.

 

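from pyspark.sql import SparkSession

# Replace the bucket with the Storage location of your own CDP Environment
spark = SparkSession\
    .builder\
    .appName("PythonSQL")\
    .config("spark.yarn.access.hadoopFileSystems", "s3a://gd-uat/")\
    .getOrCreate()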

 

 

 

  • Spark in CML runs in Client mode by default. Local mode is generally not recommended unless you are prototyping a small use case in a notebook.

 

from pyspark.sql import SparkSession

# Local mode: Driver and Executors run inside the Session Pod
spark = SparkSession\
    .builder\
    .master("local")\
    .appName("SimpleSession")\
    .getOrCreate()

 

 

 

  • If using Apache Iceberg, you must select an Iceberg compatible Runtime and set "spark.sql.extensions", "spark.sql.catalog.spark_catalog", "spark.sql.catalog.local", "spark.sql.catalog.local.type", and "spark.sql.catalog.spark_catalog.type" as shown below:

 

from pyspark.sql import SparkSession

spark = SparkSession\
    .builder\
    .appName("PythonSQL")\
    .config("spark.sql.extensions","org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")\
    .config("spark.sql.catalog.spark_catalog","org.apache.iceberg.spark.SparkSessionCatalog")\
    .config("spark.sql.catalog.local","org.apache.iceberg.spark.SparkCatalog")\
    .config("spark.sql.catalog.local.type","hadoop")\
    .config("spark.sql.catalog.spark_catalog.type","hive")\
    .getOrCreate()
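
When several projects share the same Iceberg settings, one option is to keep them in a dictionary and apply them in a loop rather than repeating the chained calls. This is only a sketch: apply_conf and build_iceberg_session are hypothetical helper names, and PySpark is imported lazily so the dictionary can be inspected or tested without a Spark Runtime.

```python
# The five Iceberg settings from the example above, kept in one place.
ICEBERG_CONF = {
    "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
    "spark.sql.catalog.spark_catalog": "org.apache.iceberg.spark.SparkSessionCatalog",
    "spark.sql.catalog.local": "org.apache.iceberg.spark.SparkCatalog",
    "spark.sql.catalog.local.type": "hadoop",
    "spark.sql.catalog.spark_catalog.type": "hive",
}


def apply_conf(builder, conf):
    """Call builder.config(key, value) for every entry and return the builder."""
    for key, value in conf.items():
        builder = builder.config(key, value)
    return builder


def build_iceberg_session(app_name="PythonSQL"):
    """Create a Spark Session with the Iceberg settings applied."""
    # Imported here so the helpers above work without PySpark installed.
    from pyspark.sql import SparkSession

    builder = SparkSession.builder.appName(app_name)
    return apply_conf(builder, ICEBERG_CONF).getOrCreate()
```

In a Session with an Iceberg compatible Runtime, calling build_iceberg_session() would return a Spark Session equivalent to the one built above.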

 

 

 

  • As an alternative to the above, we recommend using Spark Data Connections to launch a Spark Session and connect directly to your Data Lake. If you use an Iceberg compatible Spark Runtime Add-On, the Iceberg configs will be set automatically. Note that the Cloud Storage config is not required, as it is set by default.

 

import cml.data_v1 as cmldata
from pyspark import SparkContext

# Optional Spark Configs (set before the Spark Session is created)
SparkContext.setSystemProperty('spark.executor.cores', '4')
SparkContext.setSystemProperty('spark.executor.memory', '8g')

# Boilerplate code provided to you by CML Data Connections
CONNECTION_NAME = "go01-dl"
conn = cmldata.get_connection(CONNECTION_NAME)
spark = conn.get_spark_session()

# Sample usage to run query through spark
EXAMPLE_SQL_QUERY = "show databases"
spark.sql(EXAMPLE_SQL_QUERY).show()

 

 

 

 

Using Different Spark Versions

When running distributed workloads at scale in CML, Spark is generally easier to use than other frameworks. Those frameworks run in a single Session and require the Workers API to distribute runs across multiple nodes. Spark on Kubernetes, by contrast, runs in client mode and deploys as many Executor containers as needed by default.

 

In addition, Spark dependencies can be injected into the Session via CML Runtime Add-Ons. In simple terms, this means that you can launch a Session with or without Spark at the click of a button in a form. Behind the scenes, the Spark Add-On bits are loaded on the fly.

 

An additional benefit is that the end user can pick between Spark versions in the form used to launch the CML Session, rather than having to install or customize any project dependencies. It is even possible to run multiple Sessions with different versions of Spark at the same time. As of this writing, CML supports Spark 2.4.8 and 3.2.0. The versions are managed at the Workspace level and depend on the Workspace version.
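
Because the same project code may end up running under either Spark version, it can be useful to branch on the version at runtime. The sketch below parses a version string such as the one exposed by the version attribute of a Spark Session; the helper names are hypothetical. Only the first two components are read, so longer build strings are also handled.

```python
def major_minor(version):
    """Return (major, minor) from a version string such as '3.2.0'."""
    major, minor = version.split(".")[:2]
    return int(major), int(minor)


def is_spark3(version):
    """True when the version string belongs to the Spark 3 line."""
    return major_minor(version)[0] == 3


# In a Session you would pass spark.version instead of a literal string.
for version in ("2.4.8", "3.2.0"):
    print(version, major_minor(version), is_spark3(version))
```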

 

Deploying a SparkML Model as an API Endpoint

To support Production Machine Learning, CML features “Models”, a Model Management Tool for deploying models as API Endpoints, tracking requests and responses, and much more.

 

Typically a CML Model deploys a Script in a running Pod and requires loading all necessary model artifacts such as “pkl” or “joblib” files from Local or Cloud Storage. The model is wrapped by an arbitrary function that takes the Request JSON as input, processes it via the model’s “predict” method (or equivalents), and finally returns a Response.
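
The pattern described above can be sketched as follows. To keep the example self-contained, ThresholdModel is a hypothetical stand-in for an artifact that would normally be loaded from a “pkl” or “joblib” file, and the "features" request field is likewise an assumption made for illustration.

```python
import json


class ThresholdModel:
    """Hypothetical stand-in for a model loaded from a pkl or joblib file."""

    def __init__(self, threshold):
        self.threshold = threshold

    def predict(self, rows):
        # Label a row 1 when the sum of its features exceeds the threshold.
        return [1 if sum(row) > self.threshold else 0 for row in rows]


# Created once at start-up so each request only pays for the prediction.
model = ThresholdModel(threshold=10.0)


def predict(args):
    """Take the request payload as a dict and return a JSON-serializable dict."""
    features = args["features"]  # assumed request field name
    prediction = model.predict([features])[0]
    return {"prediction": prediction}


print(json.dumps(predict({"features": [4.0, 7.5]})))  # {"prediction": 1}
```

Loading the model outside the function is the important design choice here: the artifact is read once when the Pod starts rather than on every request.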

 

While this is fairly straightforward with frameworks such as scikit-learn and Dask, SparkML models are more challenging because they require a running SparkSession, which does not lend itself well to the low latency expected of an API Endpoint. To avoid this, a number of projects and formats have been developed, such as MLeap, PMML, and more.

 

In CML, we recommend using the mlflow.spark module to create MLflow Experiments. If the Developer chooses to log Model Artifacts via the MLflow Tracking API, these are stored in each Experiment Run for later access. If the Developer then opts to register an Experiment Run to the MLflow Registry, CML will provide an option to deploy the model from the Registry to a CML API Endpoint without having to craft a script for it. MLflow will automatically convert the SparkML Model to the pyfunc flavor. As of this writing, the MLflow Registry is in Tech Preview, and we recommend reaching out to your Cloudera Account Team if you need help deploying it.

 

As an alternative, we recommend using the ONNX format to serialize the SparkML Model. Once converted, the model can be loaded into the CML Model Pod and wrapped into a function as explained above. This GitHub repository provides an example for doing so. First, convert the model from SparkML to ONNX as shown in this notebook. Then, deploy it to a CML Model with a script similar to this.

 

Conclusions

In this article, we highlighted some of the most frequently asked questions about using Spark in CML. In summary:

  • CML allows you to easily deploy small and large Spark Applications via CML Sessions. 
  • CML supports Spark on Kubernetes with Spark Versions 2.4.8 and 3.2.0. Spark version support is subject to change and depends on CML Workspace versions.
  • CML Sessions are different from Spark Applications. The Spark Driver runs in the CML Session while the Spark Executors run in separate Kubernetes Pods. 
  • Most Spark Configurations in CML are not overridden, except for the three listed in the Overridden Spark Configurations section, which should never be edited by the CML User.
  • If you want to deploy a SparkML Model to an API Endpoint, do not launch a Spark Application in it. Instead, default to MLflow's Model Registry to deploy models. If you prefer not to use MLflow, convert the SparkML model to a format such as ONNX.