Member since
11-22-2019
7
Posts
2
Kudos Received
0
Solutions
09-07-2023
12:06 PM
Pandas is a fast, powerful and flexible Python data analysis and manipulation tool. It has gained incredible popularity in the Data Science community as one of the most important packages for data wrangling and preparation. With the Pandas DataFrame at the core of its API, it is very simple to use and performant on relatively small datasets.
On the other hand, Apache Spark is an open-source, distributed processing system used for big data workloads. It has also gained extreme popularity as the go-to engine for prototyping and deploying Data Engineering and Machine Learning pipelines at scale. Due to the maturity and reliability that the Spark project has achieved in recent years, it is widely used for production use cases in the enterprise. Like Pandas, Spark features an API with the DataFrame as a foundational data structure for analyzing data at scale. However, while it is dramatically more performant at scale than Pandas, it has never been quite as easy and intuitive to use.
The Koalas project was created to address this gap. The idea was simple: provide users with the ability to run their existing Pandas code on Spark. With Spark 3.2 the project has been fully incorporated in PySpark as the "Pandas API on Spark".
Cloudera Machine Learning (CML) is Cloudera’s new cloud-native machine learning service, built for CDP. The CML service provisions clusters, also known as ML workspaces, that run natively on Kubernetes. ML workspaces support fully-containerized execution of Python, R, Scala, and Spark workloads through flexible and extensible engines.
The Pandas on Spark API is a common choice among CML users. The attached notebook provides a simple Quickstart for a CML Session. To use it in CML you must have:
A CML Session with the Python Kernel and the Spark Add on enabled (Spark Version 3.2 or above only).
Pandas and PyArrow installed.
The notebook in this Git repository contains the full CML Quickstart.
Here are some important recommendations:
Even though Pandas on Spark does not require a SparkSession or SparkContext object, use the CML Spark Data Connection to launch a SparkSession object and set custom configurations. For example, this will allow you to read Iceberg tables into a Pandas On Spark DataFrame with a single line of code.
import cml.data_v1 as cmldata
import pyspark.pandas as ps
CONNECTION_NAME = "go01-aw-dl"
conn = cmldata.get_connection(CONNECTION_NAME)
spark = conn.get_spark_session()
df_from_sql = ps.read_table('default.car_sales')
When you load a table from the Catalog, set the 'compute.default_index_type' option to 'distributed' to ensure your data is not folded into a single Spark partition:
ps.set_option('compute.default_index_type', 'distributed')
df_from_sql_distributed = ps.read_table('default.car_sales')
The same Spark performance concepts, such as trying to avoid shuffles, apply to Pandas on Spark. For example, you should leverage checkpointing when utilizing the same DataFrame repeatedly, shuffle with caution, and review the Spark Plan when needed:
df_from_sql_distributed.spark.explain()
== Physical Plan ==
*(1) Project [monotonically_increasing_id() AS __index_level_0__#1002L, customer_id#992L, model#993, saleprice#994, sale_date#995, vin#996]
+- *(1) Scan HiveAcidRelation(org.apache.spark.sql.SparkSession@46e4ace2,default.car_sales,Map(transactional -> true, numFilesErasureCoded -> 0, bucketing_version -> 2, transient_lastDdlTime -> 1679572954, serialization.format -> 1, transactional_properties -> insert_only, table -> default.car_sales)) [customer_id#992L,model#993,saleprice#994,sale_date#995,vin#996] PushedAggregates: [], PushedFilters: [], PushedGroupby: [], ReadSchema: struct<customer_id:bigint,model:string,saleprice:double,sale_date:string,vin:string>
df_from_sql_distributed = df_from_sql_distributed.spark.local_checkpoint()
df_from_sql_distributed.spark.explain()
== Physical Plan ==
*(1) Project [__index_level_0__#1002L, customer_id#992L, model#993, saleprice#994, sale_date#995, vin#996]
+- *(1) Scan ExistingRDD[__index_level_0__#1002L,customer_id#992L,model#993,saleprice#994,sale_date#995,vin#996,__natural_order__#1009L]
Plotting with a PySpark DataFrame can be a challenge and often requires collecting into a Pandas DataFrame. With Pandas On Spark you can call the "plot" accessor directly on a DataFrame; Plotly is the default plotting backend:
psdf[['Fee','Discount']].plot.area()
In summary, the Pandas API on Spark offers the following benefits:
You can run your Pandas code faster.
You can use the Pandas API with the distributed horsepower of Spark.
You can have a single codebase for everything: small data and big data.
The same Pandas syntax is compatible with Dask so you can even more easily choose the engine to run it on.
When deployed on CML, the Pandas on Spark API is easy to use:
CML Sessions allow you to easily deploy resources in Kubernetes to execute ML workloads.
CML Runtimes allow you to switch between programming languages, editors, and preloaded libraries with agility. Among these benefits, you can pick which version of Spark to use on the fly. In this example we used Spark 3.2.
08-30-2023
05:07 PM
Objective
This article provides a Quickstart for using XGBoost with Spark in Cloudera Machine Learning. CML users who are looking to test a distributed version of XGBoost are welcome to use the code in their environments. The notebook is available in this Git repository.
Cloudera Machine Learning
Cloudera Machine Learning (CML) on the Cloudera Data Platform increases data scientists' productivity by providing a single, unified platform that is all-inclusive for powering any AI use case. CML is purpose-built for agile experimentation and production ML workflows. It is designed to manage everything from data preparation to MLOps and predictive reporting. For more information on CML, please visit this page.
XGBoost and Spark
In Machine Learning, Decision Tree methods are a type of Supervised Learning model. They have been used for decades in both classification and regression tasks. There are many types; generally, they are constructed by identifying ways to split data into hierarchical structures. Data is split into leaf nodes from the top of the tree by using features in order of predictive importance. Decision trees are highly interpretable but are known to suffer from a tendency to overfit the data.
Ensemble methods combine multiple learners to increase model performance. Anyone can create one by combining existing models for a given task but typically they are applied to Decision Trees. Ensemble methods such as Random Forests, bagging, and Gradient Boosting iteratively reward good base learners to ultimately yield models with better accuracy and lower variance.
XGBoost stands for Extreme Gradient Boosting and is a scalable, distributed gradient-boosted decision tree (GBDT) machine learning library. It is very popular among data scientists because of its consistent results across many Kaggle competitions and research projects in academia.
Apache Spark is a powerful open-source engine for big data processing and analytics. Combining XGBoost and Spark allows you to leverage the model performance gains provided by the former while distributing the work to the latter. This can dramatically improve the quality and performance of your Machine Learning models.
Using the example in CML
Before launching the notebook in a CML Session we recommend ensuring that the “Enable CPU Bursting” workspace option is set in the “Site Administration” -> “Runtime” page (only CML Admins have access to it). This configuration controls if CML pod specifications for CML sessions receive a resource limit. In other words, when enabled the Spark Executor Cores and Memory are not limited by the CML Session Resource Profile.
Next, launch a CML Session with the following Options and Resource Profiles:
Editor: JupyterLab
Kernel: Python 3.8 or above
Edition: Standard
Version: any version ok
Enable Spark: Spark 3.2.0 and above ok
Resource Profile: 2vCPU / 4GiB Memory - 0 GPU
Then open the terminal and install the project requirements with:
pip3 install -r requirements.txt
In the notebook, make sure to update the SparkSession properties in the second cell according to your CDP Environment. If your CML Workspace is in AWS you will have to set both "spark.hadoop.fs.s3a.s3guard.ddb.region" and "spark.kerberos.access.hadoopFileSystems". If your CML Workspace is in Azure, OCP, or Cloudera ECS you only need to set "spark.kerberos.access.hadoopFileSystems" and can remove the other property. The correct values for both options are available in the CDP Management Console under Environment Configurations. If you have issues finding these please contact your CDP Administrator. No other changes are required. You can select “Run All Cells” from the “Run” option in the JupyterLab menu and follow along as the code outputs are populated in each cell.
Code Walk-Through
This section will highlight the most important code in the notebook.
Cell 1: The "SparkXGBClassifier" class is imported from the "xgboost.spark" module. XGBoost also provides a "SparkXGBRegressor" class for Regression tasks.
from pyspark.sql import SparkSession
from xgboost.spark import SparkXGBClassifier
from pyspark.ml.linalg import Vectors
Cell 2: The SparkSession object is created. The "xgboost.spark" module requires disabling Spark Dynamic Allocation. Therefore we set two Executors with basic Memory and Core configurations.
spark = SparkSession\
.builder\
.appName("SparkXGBoostClassifier Example")\
.config("spark.hadoop.fs.s3a.s3guard.ddb.region", "us-east-2")\
.config("spark.kerberos.access.hadoopFileSystems","s3a://go01-demo")\
.config("spark.dynamicAllocation.enabled", "false")\
.config("spark.executor.cores", "4")\
.config("spark.executor.memory", "4g")\
.config("spark.executor.instances", "2")\
.config("spark.driver.cores","4")\
.config("spark.driver.memory","4g")\
.getOrCreate()
Cell 3: The code to reach the Spark UI in CML. Uncomment and run the cell and open the URL provided in the output to follow along in the Spark UI.
import os
print("https://spark-"+os.environ["CDSW_ENGINE_ID"]+"."+os.environ["CDSW_DOMAIN"]) Cell 4 and 6: Two basic Spark Dataframes are created as training and test data. df_train = spark.createDataFrame([
(Vectors.dense(1.0, 2.0, 3.0), 0, False, 1.0),
(Vectors.sparse(3, {1: 1.0, 2: 5.5}), 1, False, 2.0),
(Vectors.dense(4.0, 5.0, 6.0), 0, True, 1.0),
(Vectors.sparse(3, {1: 6.0, 2: 7.5}), 1, True, 2.0),
], ["features", "label", "isVal", "weight"])
df_test = spark.createDataFrame([
(Vectors.dense(1.0, 2.0, 3.0), ),
], ["features"])
Cell 7: An object of the "SparkXGBClassifier" class is instantiated with a few basic hyperparameters. Notice the "num_workers" option is set to 2. This value should be set to the number of Executors you want to distribute your SparkXGBoost Application across.
xgb_classifier = SparkXGBClassifier(max_depth=5, missing=0.0,
validation_indicator_col='isVal', weight_col='weight',
early_stopping_rounds=1, eval_metric='logloss', num_workers=2)
Cell 8: The classifier is trained on the training data.
xgb_clf_model = xgb_classifier.fit(df_train)
Cell 9: The classifier is used to run inference on the test data.
xgb_clf_model.transform(df_test).show()
Classifier Sample Prediction on Test Data:
+-------------+-------------+----------+-----------+
| features|rawPrediction|prediction|probability|
+-------------+-------------+----------+-----------+
|[1.0,2.0,3.0]| [-0.0,0.0]| 0.0| [0.5,0.5]|
+-------------+-------------+----------+-----------+
Summary and Next Steps
This basic example provided a Quickstart for Distributed XGBoost with PySpark in Cloudera Machine Learning. Integrating XGBoost and Spark provides data scientists with the opportunity to leverage the strengths of both. XGBoost is used to achieve high model performance. Spark is used to distribute advanced Machine Learning algorithms across large clusters of worker nodes.
If you use CML you may also benefit from the following projects:
Telco Churn Demo: Build an End-to-end ML Project in CML and increase ML Explainability with the LIME library.
Learn how to use Cloudera Applied ML Prototypes to discover more projects using MLFlow, Streamlit, Tensorflow, PyTorch, and many more.
CSA2CML: Build a real-time anomaly detection dashboard with Flink, CML, and Streamlit.
SDX2CDE: Explore ML Governance and Security features in SDX to increase legal compliance and enhance ML Ops best practices.
APIv2: Familiarize yourself with APIv2, CML's go-to Python library for ML Ops and DevOps.
06-05-2023
08:45 PM
1 Kudo
The Cloudera Data Platform (CDP) is a hybrid data platform designed to deliver faster and easier data management, analytics and AI at enterprise scale. Cloudera Machine Learning (CML) is one of CDP’s Cloud Native Data Services designed to enable secure, governed Data Science.
With immediate access to enterprise data pipelines, scalable compute resources, and access to preferred tools, Data Scientists and Engineers use CML to streamline the process of getting analytic workloads into production and intelligently manage machine learning use cases and MLOps processes.
While CML Data Scientists spend most of their time prototyping and productionizing models in CML Projects, the CML Admin needs to be familiar with the CML Workspace, its basic architecture and how it allocates Cloud resources.
With this article we will share some foundational concepts in order to help CML Administrators better understand how to size Workspaces.
What is a Workspace?
CML workloads are executed within Workspaces and in turn within Projects and Teams. To the CML User, the Workspace is a high-level construct to create CML Projects, store CML Runtimes, and perform other administrative tasks such as creating Resource Profiles.
However, under the covers, the Workspace is better defined as an auto-scaling service for Data Science leveraging Kubernetes. The Kubernetes cluster runs in Cloud Virtual Machines using AKS and EKS in the Public Cloud or OCP and Cloudera ECS in the Private Cloud. The CML Administrator or Data Scientist is not required to know or handle Kubernetes in any way. CML automatically deploys and manages the infrastructure resources for you in your CDP Environment of choice.
When a Workspace is created for the first time a node is deployed to the underlying infrastructure. This is a fixed resource that is required to run at all times for a small cost.
Subsequently, when a CML User runs a workload such as a Notebook, a Model API Endpoint, or a Batch Job, the CML Workspace provisions the necessary Pod(s) thus requesting a second node from the underlying infrastructure.
As mentioned above, the auto-scaling process is fully automated and does not require any supervision. Auto-scaling events are fast and designed so that CML Users are not aware of them. Running workloads are not affected by the auto-scaling event e.g. running Sessions will continue undisturbed. If needed, any pending workloads such as new CML Sessions or previously scheduled CML Jobs will be queued automatically until new resources are deployed.
At a high level, the pods carve out resources from the node(s), and those resources are released when the workload is complete. Thus, the CML Customer is only charged as cloud resources are consumed and then discarded.
The CML User explicitly picks the amount of CPU, Memory and optionally GPU resources when launching the workload. This amount is called a Resource Profile (e.g. 1 CPU / 2 GiB Mem) and it is predefined by the CML Admin at the Workspace level in order to provide an approval process and prevent Data Scientists from consuming too many resources without control.
Sizing Considerations
When deploying the Workspace for the first time, the user is prompted to select an instance type and an Autoscale Range (see image below). In the Public Cloud, these are AWS or Azure instances. The Autoscale Range is simply a min and max boundary of the instances that can be deployed by the Service.
Typically, the more CPU, Memory, and GPU resources available per instance, the higher the hourly cost to run them but the more CML workloads can be deployed per instance without requiring the autoscaler to deploy an additional node.
Because a typical workload such as a Data Exploration Notebook only requires a small Resource Profile, it is not uncommon to have multiple users working concurrently within the same node and thus at a fairly limited hourly cost. This means that instance types of relatively small size can be chosen when deploying a workspace. In the event of more horsepower being required, the Workspace will simply autoscale by adding as many instances as required and allowed by the Workspace Autoscale Range.
However, if you plan on running workloads that cannot horizontally scale in a distributed fashion with frameworks such as Spark, TensorFlow, etc., then it may make sense to choose a more powerful instance type. This could be the case in Time Series Machine Learning where algorithms cannot always be distributed.
Finally, it’s important to note that CML Instance Types and autoscale ranges can be changed even after a Workspace has been deployed.
Cost Management Considerations
Instance hourly rates are publicly available on the Cloudera Pricing Site. In addition, your Cloudera Account Team can provide additional recommendations to plan and size your Workspace according to your use cases.
CML is designed to allow the Administrator to closely monitor and limit usage in order to prevent runaway cloud charges. As mentioned above, Resource Profiles are whitelisted by the CML Admin in order to prevent CML Users from requesting resources without supervision. To be specific, the CML User will only be able to launch Jobs, Sessions, Applications, etc. with the CPU/Mem/GPU profiles designated in the Runtime menu as shown below.
Furthermore, CML Users are also users at the CDP Environment level. In other words, each Workspace can grant or deny access to a particular CDP User.
Finally, within each Workspace, the CML Admin can create Quotas to directly limit a User’s maximum amount of CPU, Memory, and GPU use across all workloads at any given time. Quota consumption is only a subset of the Workspace Autoscale ranges which can be viewed as a second option for managing costs at the global level.
Using Multiple Workspaces
It is common practice to create multiple CML Workspaces as each additional Workspace can provide workload isolation and a quick second option in case of failure. CML Customers typically deploy them based on scope such as Use Case, Business Organization, or function e.g. DEV vs QA vs PROD.
The additional workspace(s) can be created in the same CDP Environment or in a separate CDP Environment. In the former case, the Workspaces will share the same SDX Data Lake and thus their users will be able to access and transform the same datasets while being governed and secured by the same Atlas and Ranger services. In the latter case, creating Workspaces in different CDP Environments will guarantee that they won’t be adversely affected in case of a failure at the CDP Environment level.
For example, the below image shows two workspaces deployed in the same CDP Environment while a third one is in a separate one. Notice the first Workspace is undergoing a change in instance types and autoscale range.
Additionally, CML supports MLFlow Registry which allows you to deploy models from one Workspace to another. As a result, multiple workspaces can support DevOps pipelines across multiple CDP Environments and even allow you to deploy models from Public to Private Cloud and vice versa (Hybrid Machine Learning).
Although each Workspace comes with a small fixed hourly charge, another advantage is that you will be able to select different instance types and autoscale ranges for each deployment which in turn could allow you to save money by enforcing stricter limitations on particular business functions or user groups.
A Sizing Exercise Example
With all these considerations in mind, we recommend you go through a similar exercise as below when planning your Workspace deployment.
Step 1: Estimate the number of CML Users and optionally whether these will be working within the same or different Teams, Use Cases, and CDP Data Lakes.
Step 2: Estimate average and peak CPU, Memory, and optionally GPU consumption per User. If planning on more than one Team, determine whether the average and peak vary dramatically between them.
Step 3: Decide if you need more than one workspace. Try to group users into Teams and Use Cases as much as reasonably possible based on similarities in Data Lake Access, average and peak consumption. Other factors may include whether users need GPUs, special Ranger ACLs, and types of workloads (e.g. primarily hosting API Model Endpoints vs Exploratory Data Science in Notebooks vs Spark ETL in CML Jobs).
Step 4: Sum up all CPU, Memory, and GPU required per workspace at peak and average, then add 20%.
Step 5: Look up CPU, Memory, and GPU resources per AWS or Azure Instance types and estimate how many instances would be required to fit the sum from Step 4. Pick an Instance Type that will fit most of your average workloads with a reasonable instance count (i.e. within the 3-6 range) and your peak workloads with no more than 10 instances. If this is not possible, divide the workload further into two separate workspaces where one has the same or smaller instance types and the other has larger instance types.
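Steps 4 and 5 can be sketched in a few lines of Python. This is only an illustration under stated assumptions: the instance types, user counts, and per-user estimates below are hypothetical placeholders rather than real cloud SKUs or measured figures, and the sketch treats the full instance capacity as available to workloads, which is optimistic because some capacity is reserved for system components.

```python
import math
from dataclasses import dataclass


@dataclass(frozen=True)
class InstanceType:
    name: str          # hypothetical label, not a real cloud SKU
    vcpu: int
    memory_gib: int


def with_headroom(per_user: int, users: int, headroom_pct: int = 20) -> float:
    """Step 4: sum demand across users, then add headroom (20% by default)."""
    return per_user * users * (100 + headroom_pct) / 100


def instances_needed(cpu: float, memory_gib: float, inst: InstanceType) -> int:
    """Step 5: nodes required so that both CPU and memory fit."""
    return max(math.ceil(cpu / inst.vcpu), math.ceil(memory_gib / inst.memory_gib))


def evaluate(inst: InstanceType, users: int, avg: tuple, peak: tuple) -> tuple:
    """Return (avg_nodes, peak_nodes, ok) for per-user (vCPU, GiB) estimates."""
    avg_nodes = instances_needed(
        with_headroom(avg[0], users), with_headroom(avg[1], users), inst)
    peak_nodes = instances_needed(
        with_headroom(peak[0], users), with_headroom(peak[1], users), inst)
    # Rule of thumb from Step 5: 3-6 nodes on average, at most 10 at peak.
    ok = 3 <= avg_nodes <= 6 and peak_nodes <= 10
    return avg_nodes, peak_nodes, ok


# Hypothetical inputs: 20 users, 2 vCPU / 8 GiB on average, 4 vCPU / 16 GiB at peak.
medium = InstanceType("medium-16x64", vcpu=16, memory_gib=64)
small = InstanceType("small-4x16", vcpu=4, memory_gib=16)
plan_medium = evaluate(medium, users=20, avg=(2, 8), peak=(4, 16))
plan_small = evaluate(small, users=20, avg=(2, 8), peak=(4, 16))
print(plan_medium, plan_small)
```

With these made-up numbers the larger instance type lands inside the recommended ranges while the smaller one does not, which is the signal to either pick the larger type or split the users across two Workspaces.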
Conclusions
In this article, we highlighted some of the most fundamental considerations for sizing a CML Workspace. In summary:
CML Workspaces are autoscaling Kubernetes clusters providing Workload Isolation. CML automatically deploys and manages the infrastructure resources for you and requires no knowledge or interaction with the Kubernetes resources under the hood.
When planning for the deployment of Workspaces it is important to keep in mind that multiple Workspaces can and should be deployed based on Use Case, Team, Function, and Resource Consumption estimates.
Generally, sizing a Workspace consists of an exercise of estimating average and peak consumption in terms of CML Resource Profiles and mapping the estimates to AWS or Azure Instance Types. Additional considerations such as workload type, Data Lake access and SLAs should be prioritized as decision factors.
06-05-2023
06:26 PM
In the last few years, Spark has become one of the default choices for Data Analysis, Engineering, and Machine Learning in Cloudera Machine Learning (CML). CML is the Cloud-Native autoscaling service in the Cloudera Data Platform (CDP) for secure and governed Machine Learning and interactive Data Analysis at Enterprise Scale.
In this article, we will share important considerations when using Spark in CML in order to help Developers and Data Scientists in their daily usage.
CML Sessions
A CML Workspace provides an autoscaling Kubernetes cluster for running workloads such as interactive notebook Sessions, as well as Batch Jobs, Experiments, Applications, and Models. These are available in the CML Project, a construct to isolate users in their own environments.
Workspace implementation details are outside of this article’s scope. Luckily neither the CML User nor Admin is required to know these or operate Kubernetes at all. Under the covers, CDP provides the necessary infrastructure and integrations with the Shared Data Experience so that the Workspace is as easy to deploy as the click of a button.
When the CML User launches Sessions, Jobs, or other types of workloads, the user is prompted to select a Resource Profile, a Runtime, and an Editor.
The Resource Profile is the amount of CPU, Memory, and optionally GPU resources that are assigned to the Session when it is launched. The Runtime consists of all the dependencies (e.g. Pandas, Numpy, Tensorflow, etc.) that are loaded into the Session Pod by default. Finally, the user can select from predefined editors such as RStudio, JupyterLab, and VS Code, or optionally select a custom editor if one has been installed. As an alternative, the user can deploy a local IDE such as IntelliJ or VS Code and connect to CML remotely.
CML Sessions and Spark
CML beginners often ask how CML Sessions relate to Spark Sessions and Applications. The two are fundamentally different concepts.
As we saw in the previous section when a CML Session is launched a Pod is deployed with the allotted Resource Profile. Due to CML’s flexibility, Sessions can execute workloads with any open-source framework in languages such as Scala, Python, R, Julia, or Java.
This is due to the fact that Sessions rely on CML Runtimes which can be thought of as a virtual machine customized to have all the necessary dependencies to access the computing cluster while keeping each project’s environment entirely isolated.
Under the hood, CML deploys a container in the Kubernetes cluster with the assigned Resource Profile CPU-Memory and mounts Runtime dependencies.
Spark is just one of the different frameworks that one could use within a Session. For example, a CML User could create a Spark Application from his or her Session Notebook. In this event, CML will deploy the Spark Application in Client Mode and request additional Pods from the Kubernetes cluster to run the Spark Executors.
Picking a Resource Profile
It’s very important to notice that only the Spark Driver will run within the Session while the Spark Executors will have their own resources. This implies that there is no need to launch a Session with a large Resource Profile when trying to run Spark Applications at scale. The Session Resource Profile should be picked primarily on the basis of the Spark Driver’s necessary profile.
If the user wishes to deploy more than one Spark Application from the same CML Session, these will be managed by the same Spark Driver. There is at most only one Spark Driver per Session.
Overridden Spark Configurations
It’s important to note that most Spark Configurations are left untouched by CML. The Spark on Kubernetes public documentation should thus serve as a primary reference when setting Spark Session Configs. However, the following Spark options are overridden by CML and should never be modified by the Spark Developer:
spark.driver.host
spark.driver.port
spark.blockmanager.port
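One way to guard against setting these options by accident is to filter a configuration dictionary before passing it to the SparkSession builder. The sketch below is not part of CML; the function name and the sample configuration are hypothetical. Keys are compared case-insensitively on purpose, because upstream Spark spells the last property "spark.blockManager.port".

```python
from typing import Dict, List, Tuple

# The three options listed above, lower-cased for case-insensitive matching.
RESERVED_BY_CML = frozenset({
    "spark.driver.host",
    "spark.driver.port",
    "spark.blockmanager.port",
})


def split_reserved(conf: Dict[str, str]) -> Tuple[Dict[str, str], List[str]]:
    """Return (safe_conf, rejected_keys); rejected keys are the reserved ones."""
    rejected = sorted(k for k in conf if k.lower() in RESERVED_BY_CML)
    safe = {k: v for k, v in conf.items() if k.lower() not in RESERVED_BY_CML}
    return safe, rejected


# Hypothetical user-supplied configuration.
requested = {
    "spark.executor.memory": "2g",
    "spark.driver.port": "4040",
    "spark.blockManager.port": "4041",
}
safe_conf, rejected_keys = split_reserved(requested)
print(safe_conf, rejected_keys)
```

The entries in `safe_conf` can then be applied one by one with the builder's `.config(key, value)` calls, while `rejected_keys` can be logged so the developer knows which settings were dropped.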
So in general what important Spark Configurations should be set when using Spark in CML?
Spark Dynamic Allocation is enabled by default in CML. We recommend using it for exploring datasets in interactive sessions or running large scale Data Engineering Spark jobs while keeping an eye on the “spark.dynamicAllocation.maxExecutors” property in order to prevent runaway charges.
The "spark.executor.memory" , "spark.executor.instances" , "spark.executor.cores" , "spark.driver.memory" properties are also very important as they allow you to predefine the amount of resources allocated to your Spark jobs. However, unlike Spark on Yarn clusters, CML provides dramatically more flexibility when it comes to deploying cloud resources. Because Spark clusters in CML are ephemeral and run in isolated environments, you will have less resource contention and generally you won't have to carefully evaluate the impact of your jobs on someone else's workloads.
For an introduction to tuning Spark jobs and tuning resource allocation visit this guide.
spark = SparkSession\
.builder\
.appName("DistributedSession")\
.config("spark.dynamicAllocation.minExecutors", 1)\
.config("spark.dynamicAllocation.maxExecutors", 4)\
.config("spark.executor.memory","2g")\
.config("spark.executor.instances","4")\
.config("spark.executor.cores","8")\
.config("spark.driver.memory","2g")\
.getOrCreate()
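To keep an eye on the cost ceiling implied by a configuration like the one above, the maximum executor footprint can be estimated before the Session is created. This is a rough sketch with a hypothetical helper: it counts only executor cores and heap memory, ignores the driver and any per-pod memory overhead, and assumes "spark.dynamicAllocation.maxExecutors" is the effective cap when it is set.

```python
def memory_to_gib(value: str) -> float:
    """Convert a Spark memory string such as "2g" or "512m" to GiB."""
    number, unit = float(value[:-1]), value[-1].lower()
    if unit == "g":
        return number
    if unit == "m":
        return number / 1024
    raise ValueError(f"unsupported memory unit in {value!r}")


def executor_ceiling(conf: dict) -> dict:
    """Upper bound on executor resources requested by a Spark configuration."""
    # With Dynamic Allocation the cap is maxExecutors; otherwise use the
    # fixed instance count (falling back to 1 when neither is present).
    max_executors = int(conf.get("spark.dynamicAllocation.maxExecutors",
                                 conf.get("spark.executor.instances", 1)))
    cores = int(conf.get("spark.executor.cores", 1))
    memory = memory_to_gib(str(conf.get("spark.executor.memory", "1g")))
    return {
        "executors": max_executors,
        "vcpu": max_executors * cores,
        "memory_gib": max_executors * memory,
    }


# Same values as the SparkSession example above.
conf = {
    "spark.dynamicAllocation.minExecutors": 1,
    "spark.dynamicAllocation.maxExecutors": 4,
    "spark.executor.memory": "2g",
    "spark.executor.instances": "4",
    "spark.executor.cores": "8",
    "spark.driver.memory": "2g",
}
ceiling = executor_ceiling(conf)
print(ceiling)
```

For the example configuration this yields at most 4 executors, 32 vCPUs, and 8 GiB of executor memory, which can be compared against the Workspace instance type and autoscale range.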
To access data in Cloud Storage you should set the “spark.yarn.access.hadoopFileSystems” config to your Storage location. This value is shown in the CDP Management Console Summary tab of the CDP Environment tied to your CML Workspace. Notice that the "spark.hadoop.fs.s3a.s3guard.ddb.region" option is no longer necessary in recent CML versions.
spark = SparkSession\
.builder\
.appName("PythonSQL")\
.config("spark.yarn.access.hadoopFileSystems", "s3a://gd-uat/")\
.getOrCreate()
Spark in CML runs in Client mode by default. Generally, local mode is not recommended unless you are prototyping a small use case in a notebook.
spark = SparkSession\
.builder\
.master("local")\
.appName("SimpleSession")\
.getOrCreate()
If using Apache Iceberg, you must select an Iceberg compatible Runtime and set "spark.sql.extensions", "spark.sql.catalog.spark_catalog", "spark.sql.catalog.local", "spark.sql.catalog.local.type", "spark.sql.catalog.spark_catalog.type" as shown below:
spark = SparkSession\
.builder\
.appName("PythonSQL")\
.config("spark.sql.extensions","org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
.config("spark.sql.catalog.spark_catalog","org.apache.iceberg.spark.SparkSessionCatalog") \
.config("spark.sql.catalog.local","org.apache.iceberg.spark.SparkCatalog") \
.config("spark.sql.catalog.local.type","hadoop") \
.config("spark.sql.catalog.spark_catalog.type","hive") \
.getOrCreate()
As an alternative to the above, we recommend using Spark Data Connections to launch a Spark Session and directly connect to your Data Lake. If you use an Iceberg compatible Spark Runtime Add-On the Iceberg configs will be set automatically. Notice the Cloud Storage config is not required as it is set by default.
import cml.data_v1 as cmldata
from pyspark import SparkContext
#Optional Spark Configs
SparkContext.setSystemProperty('spark.executor.cores', '4')
SparkContext.setSystemProperty('spark.executor.memory', '8g')
#Boilerplate Code provided to you by CML Data Connections
CONNECTION_NAME = "go01-dl"
conn = cmldata.get_connection(CONNECTION_NAME)
spark = conn.get_spark_session()
# Sample usage to run query through spark
EXAMPLE_SQL_QUERY = "show databases"
spark.sql(EXAMPLE_SQL_QUERY).show()
Using Different Spark Versions
When running distributed workloads at scale in CML, Spark is generally easier to use than other frameworks. First of all, other frameworks run in a single Session and require using the Workers API to distribute runs across multiple nodes. Instead, Spark on Kubernetes runs in client mode and deploys as many Executor containers as needed by default.
In addition, Spark dependencies can be injected into the Session via CML Runtime Add-Ons. In simple terms, this means that you can launch a Session with or without Spark at the click of a button in a form. Behind the scenes, the Spark Add-On bits are loaded on the fly.
An additional benefit of this is that the end user can pick between Spark versions when launching the CML Session using a form rather than having to install or customize any project dependencies (or even run multiple sessions with different versions of Spark at the same time). As of the time of this writing, CML supports Spark 2.4.8 and 3.2.0. The versions are managed at the Workspace level and depend on the Workspace version.
Deploying a SparkML Model as an API Endpoint
To support Production Machine Learning, CML features “Models”, i.e. a Model Management Tool to deploy models in API Endpoints, track requests and responses, and much more.
Typically a CML Model deploys a Script in a running Pod and requires loading all necessary model artifacts such as “pkl” or “joblib” files from Local or Cloud Storage. The model is wrapped by an arbitrary function that takes the Request JSON as input, processes it via the model’s “predict” method (or equivalents), and finally returns a Response.
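The wrapping pattern described above can be sketched as follows. This is a minimal illustration, not CML's actual scaffolding: the model class is a stand-in for an artifact that would normally be loaded from a "pkl" or "joblib" file, and the request key "features" and response key "prediction" are assumed names.

```python
class ThresholdModel:
    """Stand-in for a deserialized model exposing a predict() method."""

    def predict(self, rows):
        # Toy rule: class 1 when the feature sum exceeds 10, else class 0.
        return [1 if sum(row) > 10 else 0 for row in rows]


# Load the artifact once at import time so each request only pays for inference.
MODEL = ThresholdModel()


def predict(args: dict) -> dict:
    """Take the request JSON (already parsed to a dict) and return a response."""
    features = args["features"]               # assumed request schema
    prediction = MODEL.predict([features])[0]
    return {"prediction": prediction}


response_high = predict({"features": [4.0, 5.0, 6.0]})
response_low = predict({"features": [1.0, 2.0, 3.0]})
print(response_high, response_low)
```

Keeping the model load outside the function is the main design choice here: the artifact is read when the Pod starts rather than on every request.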
While this is fairly straightforward with frameworks such as SciKit-Learn and Dask, SparkML models are more challenging as they require running a SparkSession which does not lend itself well to the latency required by an API Endpoint. To avoid this, a number of projects and formats have been developed such as MLeap, PMML, and more.
In CML, we recommend using the mlflow.spark module to create MLFlow Experiments. If the Developer chooses to log Model Artifacts via the MLFlow Tracking API these are stored in each Experiment Run for later access. If the developer then opts to register an Experiment Run to the MLFlow Registry, CML will provide an option to deploy the model from the Registry to the CML API Endpoint without having to craft a script for it. MLFlow will automatically convert the SparkML Model to the pyfunc flavor. As of the time of this writing, MLFlow Registry is in Tech Preview and we recommend reaching out to your Cloudera Account Team if you need help deploying it.
As an alternative, we recommend using the ONNX format to serialize the SparkML Model. Once converted, the model can be loaded into the CML Model Pod and wrapped into a function as explained above. This GitHub repository provides an example for doing so. First, convert the model from SparkML to ONNX as shown in this notebook. Then, deploy it to a CML Model with a script similar to this.
Conclusions
In this article, we highlighted some of the most frequently asked questions about using Spark in CML. In summary:
CML allows you to easily deploy small and large Spark Applications via CML Sessions.
CML supports Spark on Kubernetes with Spark Versions 2.4.8 and 3.2.0. Spark version support is subject to change and depends on CML Workspace versions.
CML Sessions are different from Spark Applications. The Spark Driver runs in the CML Session while the Spark Executors run in separate Kubernetes Pods.
Most Spark Configurations in CML are not overridden except for the three above which should never be edited by the CML User.
If you want to deploy a SparkML Model to an API Endpoint do not launch a Spark Application in it. Instead, default to MLFlow’s Model Registry to deploy models. If you prefer not to use MLFlow convert the SparkML model to a format such as ONNX.
08-19-2021
06:24 AM
Hello, I am running this from the company network and I believe we have some sort of certificate for using cloudera-impala. When I copy the URL from the impala_prod, it also gives me at the end a uid (which is my ID) and a password, which is a standard password (not given by me at any point in time). So when I run this script, this is the error I receive: java.sql.SQLException: java.sql.SQLException: [Cloudera][ImpalaJDBCDriver](500170) Error occurred while setting up ALTUS Dynamic Discovery: Unable to load credentials from provider files. Do you have any ideas how I can fix this?
04-29-2020
07:35 PM
1 Kudo
I think this answer may have been edited since I originally looked at it. I don't recall seeing that link to the Scala tutorial and the suggestion that importing should work. This still does not work for me. If I follow the code in that tutorial, I get:
import bobsdelights.Fruit
Name: Unknown Error
Message: <console>:80: error: not found: value bobsdelights
import bobsdelights.Fruit
^
Have you confirmed that you are able to import local Scala code from one file to another without first executing that first file in the shell? The intention is to be able to run an experiment and I can only submit one script via the experiment UI.
04-15-2020
05:01 PM
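Hi @ChineduLB, You can use `.groupBy` with `concat_ws(",", collect_list(...))` to build the department list, and the `row_number` window function to generate `ID`.
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
// toDF on a Seq relies on spark.implicits._, which spark-shell imports automatically
val df = Seq(("1","User1","Admin"),("2","User1","Accounts"),("3","User2","Finance"),("4","User3","Sales"),("5","User3","Finance")).toDF("ID","USER","DEPT")
// Window used to number the grouped rows; ordering by USER is an assumption
val w = Window.orderBy("USER")
df.groupBy("USER").
  agg(concat_ws(",", collect_list("DEPT")).alias("DEPARTMENT")).
  withColumn("ID", row_number().over(w)).
  select("ID","USER","DEPARTMENT").show()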
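To make the intended result concrete, here is the same aggregation in plain Python, with no Spark involved. It is only an illustration of the expected output shape: it assumes IDs are assigned in USER order and that departments keep their input order, which Spark's `collect_list` does not guarantee after a shuffle.

```python
rows = [
    ("1", "User1", "Admin"),
    ("2", "User1", "Accounts"),
    ("3", "User2", "Finance"),
    ("4", "User3", "Sales"),
    ("5", "User3", "Finance"),
]


def group_departments(records):
    """Group DEPT values per USER and assign a new sequential ID per user."""
    grouped = {}
    for _old_id, user, dept in records:
        grouped.setdefault(user, []).append(dept)
    # Number the users in sorted order, starting at 1 like row_number().
    return [
        (new_id, user, ",".join(depts))
        for new_id, (user, depts) in enumerate(sorted(grouped.items()), start=1)
    ]


result = group_departments(rows)
for row in result:
    print(row)
```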