Created on 08-30-2023 05:07 PM - edited 09-06-2023 07:43 PM
Objective
This article provides a Quickstart for using XGBoost with Spark in Cloudera Machine Learning. CML users who are looking to test a distributed version of XGBoost are welcome to use the code in their environments.
The notebook is available in this Git repository.
Cloudera Machine Learning
Cloudera Machine Learning (CML) on the Cloudera Data Platform increases data scientists' productivity by providing a single, unified platform that is all-inclusive for powering any AI use case.
CML is purpose-built for agile experimentation and production ML workflows. It is designed to manage everything from data preparation to MLOps and predictive reporting.
For more information on CML, please visit this page.
XGBoost and Spark
In Machine Learning, Decision Tree methods are a type of Supervised Learning model. They have been used for decades in both classification and regression tasks. There are many types; generally, they are constructed by identifying ways to split data into hierarchical structures. Data is split into leaf nodes from the top of the tree by using features in order of predictive importance. Decision trees are highly interpretable but are known to suffer from a tendency to overfit the data.
Ensemble methods combine multiple learners to increase model performance. Anyone can create one by combining existing models for a given task but typically they are applied to Decision Trees. Ensemble methods such as Random Forests, bagging, and Gradient Boosting iteratively reward good base learners to ultimately yield models with better accuracy and lower variance.
XGBoost stands for Extreme Gradient Boosting and is a scalable, distributed gradient-boosted decision tree (GBDT) machine learning library. It is very popular among data scientists because of its consistent results across many Kaggle competitions and research projects in academia.
Apache Spark is a powerful open-source engine for big data processing and analytics. Combining XGBoost and Spark allows you to leverage the model performance gains provided by the former while distributing the work to the latter. This can dramatically improve the quality and performance of your Machine Learning models.
Using the example in CML
Before launching the notebook in a CML Session we recommend ensuring that the “Enable CPU Bursting” workspace option is set in the “Site Administration” -> “Runtime” page (only CML Admins have access to it). This configuration controls if CML pod specifications for CML sessions receive a resource limit. In other words, when enabled the Spark Executor Cores and Memory are not limited by the CML Session Resource Profile.
Next, launch a CML Session with the following Options and Resource Profiles:
Editor: JupyterLab
Kernel: Python 3.8 or above
Edition: Standard
Version: any version ok
Enable Spark: Spark 3.2.0 and above ok
Resource Profile: 2vCPU / 4GiB Memory - 0 GPU
Then open the terminal and install the project requirements with:
pip3 install -r requirements.txt
In the notebook ensure to update the SparkSession properties according to your CDP Environment in the second cell. If your CML Workspace is in AWS you will have to set both "spark.hadoop.fs.s3a.s3guard.ddb.region" and "spark.kerberos.access.hadoopFileSystems".
If your CML Workspace is in Azure, OCP, or Cloudera ECS you only need to set "spark.kerberos.access.hadoopFileSystems" and can remove the other property.
The correct values for both options are available in the CDP Management Console under Environment Configurations. If you have issues finding these please contact your CDP Administrator.
No other changes are required. You can select “Run All Cells” from the “Run” option in the JupyterLab menu and follow along as the code outputs are populated in each cell.
Code Walk-Through
This section will highlight the most important code in the notebook.
from pyspark.sql import SparkSession
from xgboost.spark import SparkXGBClassifier
from pyspark.ml.linalg import Vectors
spark = SparkSession\
.builder\
.appName("SparkXGBoostClassifier Example")\
.config("spark.hadoop.fs.s3a.s3guard.ddb.region", "us-east-2")\
.config("spark.kerberos.access.hadoopFileSystems","s3a://go01-demo")\
.config("spark.dynamic allocation.enabled", "false")\
.config("spark.executor.cores", "4")\
.config("spark.executor.memory", "4g")\
.config("spark.executor.instances", "2")\
.config("spark.driver.core","4")\
.config("spark.driver.memory","4g")\
.getOrCreate()
import os
print("https://spark-"+os.environ["CDSW_ENGINE_ID"]+"."+os.environ["CDSW_DOMAIN"])
df_train = spark.createDataFrame([
(Vectors.dense(1.0, 2.0, 3.0), 0, False, 1.0),
(Vectors.sparse(3, {1: 1.0, 2: 5.5}), 1, False, 2.0),
(Vectors.dense(4.0, 5.0, 6.0), 0, True, 1.0),
(Vectors.sparse(3, {1: 6.0, 2: 7.5}), 1, True, 2.0),
], ["features", "label", "isVal", "weight"])
df_test = spark.createDataFrame([
(Vectors.dense(1.0, 2.0, 3.0), ),
], ["features"])
xgb_classifier = SparkXGBClassifier(max_depth=5, missing=0.0,
validation_indicator_col='isVal', weight_col='weight',
early_stopping_rounds=1, eval_metric='logloss', num_workers=2)
xgb_clf_model = xgb_classifier.fit(df_train)
xgb_clf_model.transform(df_test).show()
Classifier Sample Prediction on Test Data:
+-------------+-------------+----------+-----------+
| features|rawPrediction|prediction|probability|
+-------------+-------------+----------+-----------+
|[1.0,2.0,3.0]| [-0.0,0.0]| 0.0| [0.5,0.5]|
+-------------+-------------+----------+-----------+
Summary and Next Steps
This basic example provided a Quickstart for Distributed XGBoost with PySpark in Cloudera Machine Learning.
Integrating XGBoost and Spark provides data scientists with the opportunity to leverage the strengths of both. XGBoost is used to achieve high model performance. Spark is used to distribute advanced Machine Learning algorithms across large clusters of worker nodes.
If you use CML you may also benefit from the following projects: