Community Articles

Find and share helpful community-sourced technical articles.
avatar
Rising Star

Objective

This article provides a Quickstart for using XGBoost with Spark in Cloudera Machine Learning. CML users who are looking to test a distributed version of XGBoost are welcome to use the code in their environments. 

The notebook is available in this Git repository.

 Cloudera Machine Learning

Cloudera Machine Learning (CML) on the Cloudera Data Platform increases data scientists' productivity by providing a single, unified platform that is all-inclusive for powering any AI use case.

CML is purpose-built for agile experimentation and production ML workflows. It is designed to manage everything from data preparation to MLOps and predictive reporting. 

For more information on CML, please visit this page.

XGBoost and Spark

In Machine Learning, Decision Tree methods are a type of Supervised Learning model. They have been used for decades in both classification and regression tasks. There are many types; generally, they are constructed by identifying ways to split data into hierarchical structures. Data is split into leaf nodes from the top of the tree by using features in order of predictive importance. Decision trees are highly interpretable but are known to suffer from a tendency to overfit the data.

Ensemble methods combine multiple learners to increase model performance. Anyone can create one by combining existing models for a given task but typically they are applied to Decision Trees. Ensemble methods such as Random Forests, bagging, and Gradient Boosting iteratively reward good base learners to ultimately yield models with better accuracy and lower variance.

XGBoost stands for Extreme Gradient Boosting and is a scalable, distributed gradient-boosted decision tree (GBDT) machine learning library. It is very popular among data scientists because of its consistent results across many Kaggle competitions and research projects in academia.

Apache Spark is a powerful open-source engine for big data processing and analytics. Combining XGBoost and Spark allows you to leverage the model performance gains provided by the former while distributing the work to the latter. This can dramatically improve the quality and performance of your Machine Learning models.

Using the example in CML

 Before launching the notebook in a CML Session we recommend ensuring that the “Enable CPU Bursting”  workspace option is set in the “Site Administration” -> “Runtime” page (only CML Admins have access to it). This configuration controls if CML pod specifications for CML sessions receive a resource limit. In other words, when enabled the Spark Executor Cores and Memory are not limited by the CML Session Resource Profile.

Next, launch a CML Session with the following Options and Resource Profiles:

 

Editor: JupyterLab

Kernel: Python 3.8 or above

Edition: Standard

Version: any version ok

Enable Spark: Spark 3.2.0 and above ok

Resource Profile: 2vCPU / 4GiB Memory - 0 GPU

 

Then open the terminal and install the project requirements with:

 

pip3 install -r requirements.txt

 

In the notebook ensure to update the SparkSession properties according to your CDP Environment in the second cell. If your CML Workspace is in AWS you will have to set both  "spark.hadoop.fs.s3a.s3guard.ddb.region" and "spark.kerberos.access.hadoopFileSystems".

If your CML Workspace is in Azure, OCP, or Cloudera ECS you only need to set "spark.kerberos.access.hadoopFileSystems" and can remove the other property.

The correct values for both options are available in the CDP Management Console under Environment Configurations. If you have issues finding these please contact your CDP Administrator.

No other changes are required. You can select “Run All Cells” from the “Run” option in the JupyterLab menu and follow along as the code outputs are populated in each cell. 

Code Walk-Through

This section will highlight the most important code in the notebook.

  • Cell 1: The "SparkXGBClassifier" class is imported from the "xgboost.spark" module. XGBoost also provides a "SparkXGBRegressor" class for Regression tasks

 

from pyspark.sql import SparkSession
from xgboost.spark import SparkXGBClassifier
from pyspark.ml.linalg import Vectors

 

  • Cell 2: The SparkSession object is created. The "xgboost.spark" module requires disabling Spark Dynamic Allocation. Therefore we set four Executors with basic Memory and Core configurations.

 

spark = SparkSession\
    .builder\
    .appName("SparkXGBoostClassifier Example")\
    .config("spark.hadoop.fs.s3a.s3guard.ddb.region", "us-east-2")\
    .config("spark.kerberos.access.hadoopFileSystems","s3a://go01-demo")\
    .config("spark.dynamic allocation.enabled", "false")\
    .config("spark.executor.cores", "4")\
    .config("spark.executor.memory", "4g")\
    .config("spark.executor.instances", "2")\
    .config("spark.driver.core","4")\
    .config("spark.driver.memory","4g")\
    .getOrCreate()

 

  • Cell 3: The code to reach the Spark UI in CML. Uncomment and run the cell and open the URL provided in the output to follow along in the Spark UI.

 

import os
print("https://spark-"+os.environ["CDSW_ENGINE_ID"]+"."+os.environ["CDSW_DOMAIN"])

 

  • Cell 4 and 6: Two basic Spark Dataframes are created as training and test data.

 

df_train = spark.createDataFrame([
    (Vectors.dense(1.0, 2.0, 3.0), 0, False, 1.0),
    (Vectors.sparse(3, {1: 1.0, 2: 5.5}), 1, False, 2.0),
    (Vectors.dense(4.0, 5.0, 6.0), 0, True, 1.0),
    (Vectors.sparse(3, {1: 6.0, 2: 7.5}), 1, True, 2.0),
], ["features", "label", "isVal", "weight"])

df_test = spark.createDataFrame([
    (Vectors.dense(1.0, 2.0, 3.0), ),
], ["features"])

 

  • Cell 7: An object of the "SparkXGBClassifier" class is instantiated with default hyperparameters. Notice the "num_workers" option is set to 2. This value should be set to the number of Executors you want to distribute your SparkXGBoost Application across.

 

xgb_classifier = SparkXGBClassifier(max_depth=5, missing=0.0,
    validation_indicator_col='isVal', weight_col='weight',
    early_stopping_rounds=1, eval_metric='logloss', num_workers=2)

 

  • Cell 8: The classifier is trained on the training data

 

xgb_clf_model = xgb_classifier.fit(df_train)

 

  • Cell 9: The classifier is used to run inference on the test data.

 

xgb_clf_model.transform(df_test).show()

 

Classifier Sample Prediction on Test Data:

 

+-------------+-------------+----------+-----------+

| features|rawPrediction|prediction|probability|

+-------------+-------------+----------+-----------+

|[1.0,2.0,3.0]| [-0.0,0.0]| 0.0| [0.5,0.5]|

+-------------+-------------+----------+-----------+

 

Summary and Next Steps

This basic example provided a Quickstart for Distributed XGBoost with PySpark in Cloudera Machine Learning.

Integrating XGBoost and Spark provides data scientists with the opportunity to leverage the strengths of both. XGBoost is used to achieve high model performance. Spark is used to distribute advanced Machine Learning algorithms across large clusters of worker nodes.

If you use CML you may also benefit from the following projects:

  • Telco Churn Demo: Build an End-to-end ML Project in CML and increase ML Explainability with the LIME library.
  • Learn how to use Cloudera Applied ML Prototypes to discover more projects using MLFlow, Streamlit, Tensorflow, PyTorch, and many more.
  • CSA2CML: Build a real-time anomaly detection dashboard with Flink, CML, and Streamlit.
  • SDX2CDE: Explore ML Governance and Security features in SDX to increase legal compliance and enhance ML Ops best practices.
  • APIv2: Familiarize yourself with APIv2, CML's go-to Python library for ML Ops and DevOps.
2,576 Views
0 Kudos