With the release of Spark 2.0, the DataFrame-based ml package became Spark's primary machine learning API, and the older RDD-based mllib package entered maintenance mode. One of the biggest changes in the ml library is the introduction of the machine learning pipeline. It provides a high-level abstraction of the machine learning flow and greatly simplifies building a machine learning process. In this tutorial, we walk through the steps to create a machine learning pipeline and explain what happens under the hood. (Note: the code in this article is based on the technical preview ml library from Spark 1.6; with minor changes, it should run on Spark 2.0 as well.)

In this tutorial, we will demonstrate how to create a pipeline in Spark to predict airline flight delays. The dataset contains airline flight information from 2007 and 2008. We use the 2007 data as the training dataset and the 2008 data as the testing dataset. We will predict flight delay using 2 classification models, logistic regression and decision tree. The tutorial walks step by step through using the new ml library transformers to do data munging and then chaining them together with a machine learning algorithm to create a pipeline that completes the whole process in one step. The tutorial is not meant to be a data science guideline on how to process data and choose a machine learning model. It is only meant to guide you on how to build a Spark ml pipeline in Scala. The data munging decisions and the choice of models are not optimal from a data science perspective.

Environment

We will use the Hortonworks HDP 2.4 Sandbox for this tutorial. Please prepare the Sandbox following this instruction and go to the Zeppelin UI at http://127.0.0.1:9995. All the code examples are executed inside a Zeppelin notebook.

Get the dataset

First, create a new Zeppelin notebook and name it whatever you like.

Next, use the code below to download the dataset from the internet and upload it to HDFS.

%sh
wget http://stat-computing.org/dataexpo/2009/2007.csv.bz2 -O /tmp/flights_2007.csv.bz2
wget http://stat-computing.org/dataexpo/2009/2008.csv.bz2 -O /tmp/flights_2008.csv.bz2

hdfs dfs -mkdir /tmp/airflightsdelays

hdfs dfs -put /tmp/flights_2007.csv.bz2 /tmp/flights_2008.csv.bz2 /tmp/airflightsdelays/

Load data to DataFrame

Next, we will load the data from HDFS into Spark dataframes so it can be processed by the ml pipeline. The new ml pipeline only processes data inside a dataframe, not in an RDD like the old mllib. Two dataframes are created, one for training data and one for testing data. A helper function converts the military-format time into an integer, the number of minutes from midnight, so we can use it as a numeric feature. The details about the dataset can be found here.
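As a rough worked example of the conversion described above: a time such as 1430 splits into 14 hours and 30 minutes, giving 14 * 60 + 30 = 870 minutes after midnight. The sketch below is plain Scala and needs no Spark. `safeMinuteOfDay` is a hypothetical defensive variant that is not part of the tutorial's code; it returns None for non-numeric fields such as NA.

```scala
import scala.util.Try

// Convert an hhmm string (for example "1430") to minutes after midnight.
def getMinuteOfDay(depTime: String): Int = {
  val t = depTime.toInt
  (t / 100) * 60 + (t % 100)
}

// Hypothetical defensive variant: None when the field is not a number (for example "NA").
def safeMinuteOfDay(field: String): Option[Int] =
  Try(getMinuteOfDay(field.trim)).toOption

println(getMinuteOfDay("1430")) // 14 * 60 + 30 = 870
println(getMinuteOfDay("5"))    // five minutes past midnight = 5
println(safeMinuteOfDay("NA"))  // None
```

The tutorial's own loading code avoids the NA case for the rows it keeps by filtering out cancelled flights and rows without an arrival delay before calling the helper.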

%spark
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.ml.attribute.NominalAttribute
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType,StructField,StringType}
//calculate minutes from midnight; input is military (hhmm) time format
def getMinuteOfDay(depTime: String) : Int = (depTime.toInt / 100) * 60 + (depTime.toInt % 100)
//define schema for raw data
case class Flight(Month: String, DayofMonth: String, DayOfWeek: String, DepTime: Int, CRSDepTime: Int, ArrTime: Int, CRSArrTime: Int, UniqueCarrier: String, ActualElapsedTime: Int, CRSElapsedTime: Int, AirTime: Int, ArrDelay: Double, DepDelay: Int, Origin: String, Distance: Int)
val flight2007 = sc.textFile("/tmp/airflightsdelays/flights_2007.csv.bz2")
val header = flight2007.first
val trainingData = flight2007
                    .filter(x => x != header)
                    .map(x => x.split(","))
                    .filter(x => x(21) == "0")
                    .filter(x => x(17) == "ORD")
                    .filter(x => x(14) != "NA")
                    .map(p => Flight(p(1), p(2), p(3), getMinuteOfDay(p(4)), getMinuteOfDay(p(5)), getMinuteOfDay(p(6)), getMinuteOfDay(p(7)), p(8), p(11).toInt, p(12).toInt, p(13).toInt, p(14).toDouble, p(15).toInt, p(16), p(18).toInt))
                    .toDF
trainingData.cache
val flight2008 = sc.textFile("/tmp/airflightsdelays/flights_2008.csv.bz2")
val testingData = flight2008
                    .filter(x => x != header)
                    .map(x => x.split(","))
                    .filter(x => x(21) == "0")
                    .filter(x => x(17) == "ORD")
                    .filter(x => x(14) != "NA")
                    .map(p => Flight(p(1), p(2), p(3), getMinuteOfDay(p(4)), getMinuteOfDay(p(5)), getMinuteOfDay(p(6)), getMinuteOfDay(p(7)), p(8), p(11).toInt, p(12).toInt, p(13).toInt, p(14).toDouble, p(15).toInt, p(16), p(18).toInt))
                    .toDF
testingData.cache
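The loading code addresses fields by position, which is hard to read. As a reading aid, the sketch below maps column names to positions, assuming the files carry the standard Data Expo 2009 header (an assumption; compare it with the `header` value in your own session). It is plain Scala and does not need Spark.

```scala
// Assumed header of the Data Expo 2009 airline files; verify against `header` in your session.
val columns: Array[String] = ("Year,Month,DayofMonth,DayOfWeek,DepTime,CRSDepTime,ArrTime,CRSArrTime," +
  "UniqueCarrier,FlightNum,TailNum,ActualElapsedTime,CRSElapsedTime,AirTime,ArrDelay,DepDelay," +
  "Origin,Dest,Distance,TaxiIn,TaxiOut,Cancelled,CancellationCode,Diverted," +
  "CarrierDelay,WeatherDelay,NASDelay,SecurityDelay,LateAircraftDelay").split(",")

// Column name -> zero-based position in a split CSV row.
val indexOf: Map[String, Int] = columns.zipWithIndex.toMap

// Positions used by the three filters in the loading code.
println(indexOf("Cancelled")) // 21
println(indexOf("Dest"))      // 17
println(indexOf("ArrDelay"))  // 14
```

Under that assumed header, the three filters keep flights that were not cancelled, that arrived at ORD, and that have a recorded arrival delay.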

Create feature transformers

The building blocks of ml pipeline are transformers and estimators.

Transformers

  • Transform one dataframe to another dataframe.

Estimators

  • Fit one dataframe and produce a model, which is a transformer.
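The two roles can be sketched in plain Scala. This is a toy model for illustration only: `Table`, `ToyTransformer`, `ToyEstimator`, `AddSpeed` and `MeanCenterer` are hypothetical names, not Spark classes, and a Seq of Maps stands in for a dataframe.

```scala
// Toy stand-in for a dataframe: one Map per row, column name -> value.
type Table = Seq[Map[String, Double]]

trait ToyTransformer { def transform(data: Table): Table }
trait ToyEstimator { def fit(data: Table): ToyTransformer }

// A transformer needs no training: it derives a new column from existing ones.
class AddSpeed extends ToyTransformer {
  def transform(data: Table): Table =
    data.map(r => r + ("speed" -> (r("distance") / r("airTime"))))
}

// An estimator learns something from the data (here, a column mean)
// and returns a model, which is itself a transformer.
class MeanCenterer(col: String) extends ToyEstimator {
  def fit(data: Table): ToyTransformer = {
    val mean = data.map(_(col)).sum / data.size
    new ToyTransformer {
      def transform(d: Table): Table =
        d.map(r => r + (s"${col}Centered" -> (r(col) - mean)))
    }
  }
}

// Made-up rows for illustration.
val training: Table = Seq(
  Map("distance" -> 600.0, "airTime" -> 100.0),
  Map("distance" -> 1200.0, "airTime" -> 150.0))

val model = new MeanCenterer("distance").fit(training) // learned mean = 900.0
val out = model.transform(new AddSpeed().transform(training))
println(out.map(_("distanceCentered"))) // List(-300.0, 300.0)
```

The point of the split is that the mean is learned once from the training rows and then reused unchanged on any later data passed to `model.transform`.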

Most transformers are in the org.apache.spark.ml.feature package, and the code below shows 2 of them, StringIndexer and VectorAssembler.

StringIndexer converts string values into categorical indices that the machine learning algorithms in the ml library can use. Strictly speaking, StringIndexer is an estimator: fitting it learns the mapping from string to index and produces a StringIndexerModel, which is the transformer. Inside a pipeline this distinction is handled for you. In our dataset, we will transform month, day of month, day of week, carrier and origin airport code this way. Notice that we provide the input column name and the output column name as parameters when initializing the StringIndexer.

VectorAssembler combines the raw feature columns into a single Vector column. Most machine learning algorithms in the ml library take features in the form of a vector.

import org.apache.spark.ml.feature.StringIndexer
import org.apache.spark.ml.feature.VectorAssembler


//transformers to convert string columns to category index values
val monthIndexer = new StringIndexer().setInputCol("Month").setOutputCol("MonthCat")
val dayofMonthIndexer = new StringIndexer().setInputCol("DayofMonth").setOutputCol("DayofMonthCat")
val dayOfWeekIndexer = new StringIndexer().setInputCol("DayOfWeek").setOutputCol("DayOfWeekCat")
val uniqueCarrierIndexer = new StringIndexer().setInputCol("UniqueCarrier").setOutputCol("UniqueCarrierCat")
val originIndexer = new StringIndexer().setInputCol("Origin").setOutputCol("OriginCat")


//assemble raw feature
val assembler = new VectorAssembler()
                    .setInputCols(Array("MonthCat", "DayofMonthCat", "DayOfWeekCat", "UniqueCarrierCat", "OriginCat", "DepTime", "CRSDepTime", "ArrTime", "CRSArrTime", "ActualElapsedTime", "CRSElapsedTime", "AirTime","DepDelay", "Distance"))
                    .setOutputCol("rawFeatures")
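To make the indexing step more concrete, here is a plain-Scala illustration of a frequency-ordered label index, which is how StringIndexer assigns indices by default (the most frequent label gets index 0). It is not Spark code, the carrier values are made up, and the example avoids ties because how ties are ordered is not something this sketch tries to reproduce.

```scala
// Made-up carrier column for illustration.
val carriers = Seq("AA", "UA", "AA", "MQ", "AA", "UA")

// Count each label, then order labels by descending count.
val labelsByFrequency: Seq[String] =
  carriers.groupBy(identity).toSeq
    .map { case (label, occurrences) => (label, occurrences.size) }
    .sortBy { case (_, count) => -count }
    .map { case (label, _) => label }

// Position in that ordering becomes the category index (as a Double).
val index: Map[String, Double] =
  labelsByFrequency.zipWithIndex.map { case (label, i) => label -> i.toDouble }.toMap

println(carriers.map(index)) // List(0.0, 1.0, 0.0, 2.0, 0.0, 1.0)
```

VectorAssembler then simply places such index columns next to the numeric columns in one vector per row.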

Create machine learning pipeline

In the last section, we created a few transformers and defined their parameters so that they operate on the input dataframe and produce the desired output dataframe. In this section, we add three more.

VectorSlicer takes a vector column as input and creates a new vector that contains only some of the attributes of the original vector.

Binarizer creates a binary value, 0 or 1, from an input column of double values by comparing each value with a threshold.

StandardScaler scales a vector into a new vector whose values are on a similar scale.
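A plain-Scala illustration of what the last two compute may help. It is not Spark code: `binarize` and `standardize` are hypothetical helper names, the values are made up, and the standardization works on a single feature and assumes the sample standard deviation (dividing by n - 1).

```scala
// Binarizer: values strictly greater than the threshold become 1.0, all others 0.0.
def binarize(x: Double, threshold: Double): Double =
  if (x > threshold) 1.0 else 0.0

// Standardization of one feature: subtract the mean, divide by the sample standard deviation.
def standardize(xs: Seq[Double]): Seq[Double] = {
  val mean = xs.sum / xs.size
  val std = math.sqrt(xs.map(x => math.pow(x - mean, 2)).sum / (xs.size - 1))
  xs.map(x => (x - mean) / std)
}

val arrDelays = Seq(-3.0, 15.0, 42.0)          // made-up arrival delays in minutes
println(arrDelays.map(binarize(_, 15.0)))      // List(0.0, 0.0, 1.0)
println(standardize(Seq(100.0, 200.0, 300.0))) // List(-1.0, 0.0, 1.0)
```

With the threshold of 15.0 used in this tutorial, a flight that arrives exactly 15 minutes late is still labeled 0.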

We will then initialize an estimator, the logistic regression classifier, and chain it together with the transformers in a machine learning pipeline. When the pipeline is fit to the training data, the transformers and the estimator are applied to the dataframe in the sequence defined in the pipeline. The pipeline model produced by fitting contains almost exactly the same stages as the pipeline estimator. The testing data goes through the same data munging process; the only difference is the last step, where the estimator in the pipeline is replaced by the fitted model, which is a transformer. (Any other estimators among the stages, such as StandardScaler, are replaced by their fitted models in the same way.)

import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.Binarizer
import org.apache.spark.ml.feature.VectorSlicer
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.StandardScaler
//vector slicer
val slicer = new VectorSlicer().setInputCol("rawFeatures").setOutputCol("slicedfeatures").setNames(Array("MonthCat", "DayofMonthCat", "DayOfWeekCat", "UniqueCarrierCat", "DepTime", "ArrTime", "ActualElapsedTime", "AirTime", "DepDelay", "Distance"))
//scale the features
val scaler = new StandardScaler().setInputCol("slicedfeatures").setOutputCol("features").setWithStd(true).setWithMean(true)
//labels for binary classifier
val binarizerClassifier = new Binarizer().setInputCol("ArrDelay").setOutputCol("binaryLabel").setThreshold(15.0)
//logistic regression
val lr = new LogisticRegression().setMaxIter(10).setRegParam(0.3).setElasticNetParam(0.8).setLabelCol("binaryLabel").setFeaturesCol("features")
// Chain indexers, feature transformers and logistic regression in a Pipeline
val lrPipeline = new Pipeline().setStages(Array(monthIndexer, dayofMonthIndexer, dayOfWeekIndexer, uniqueCarrierIndexer, originIndexer, assembler, slicer, scaler, binarizerClassifier, lr))
// Train model. 
val lrModel = lrPipeline.fit(trainingData)
// Make predictions.
val lrPredictions = lrModel.transform(testingData)
// Select example rows to display.
lrPredictions.select("prediction", "binaryLabel", "features").show(20)
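To see the replacement of estimators by fitted models for yourself, you can inspect the stages of the fitted pipeline. The snippet below is a sketch that is not self-contained: it assumes `lrPipeline` and `lrModel` from the code above already exist in the same Zeppelin or spark-shell session.

```scala
import org.apache.spark.ml.classification.LogisticRegressionModel

// Assumes lrPipeline and lrModel from the tutorial code exist in this session.
// The fitted PipelineModel holds one transformer per stage of the original pipeline.
lrModel.stages.foreach(stage => println(stage.getClass.getSimpleName))

// The final stage is the fitted model that took the place of the LogisticRegression estimator.
val fittedLr = lrModel.stages.last.asInstanceOf[LogisticRegressionModel]
println(s"intercept = ${fittedLr.intercept}")
println(s"coefficients = ${fittedLr.coefficients}")
```

The printed class names should show model classes where the pipeline definition had estimators, while plain transformers such as VectorAssembler appear unchanged.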

The code below uses another estimator, the decision tree classifier. The pipeline is made up of a different chain of transformers, but the concept is the same as in the previous pipeline. You can see how easy it is to create a different pipeline from existing transformers.

PCA is a dimensionality reduction transformer.

VectorIndexer identifies the features inside a vector that look categorical (those with at most maxCategories distinct values) and re-encodes their values as category indices.

Bucketizer converts continuous values into categorical indices based on the provided split points.
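As a plain-Scala illustration of the bucketing rule (not Spark code; `bucketOf` is a hypothetical helper), the sketch below uses the same split points as this tutorial. Bucket i covers the half-open range from splits(i) up to, but not including, splits(i + 1). The helper is only meant for finite inputs.

```scala
// Same split points as the Bucketizer in this tutorial.
val splits = Array(Double.NegativeInfinity, 0.0, 15.0, Double.PositiveInfinity)

// Index of the first [lo, hi) range that contains x, for finite x.
def bucketOf(x: Double): Double =
  splits.sliding(2).indexWhere { case Array(lo, hi) => x >= lo && x < hi }.toDouble

val arrDelays = Seq(-5.0, 0.0, 14.0, 15.0, 42.0) // made-up arrival delays in minutes
println(arrDelays.map(bucketOf))                 // List(0.0, 1.0, 1.0, 2.0, 2.0)
```

So the three classes are: arrived early (0), arrived 0 to under 15 minutes late (1), and arrived 15 or more minutes late (2).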

import org.apache.spark.ml.feature.Bucketizer
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.DecisionTreeClassifier
import org.apache.spark.ml.classification.DecisionTreeClassificationModel
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.feature.VectorIndexer
import org.apache.spark.ml.feature.PCA
//index categorical features in the raw feature vector
val indexer = new VectorIndexer().setInputCol("rawFeatures").setOutputCol("rawFeaturesIndexed").setMaxCategories(10)
//PCA
val pca = new PCA().setInputCol("rawFeaturesIndexed").setOutputCol("features").setK(10)
//label for multi class classifier
val bucketizer = new Bucketizer().setInputCol("ArrDelay").setOutputCol("multiClassLabel").setSplits(Array(Double.NegativeInfinity, 0.0, 15.0, Double.PositiveInfinity))
// Train a DecisionTree model.
val dt = new DecisionTreeClassifier().setLabelCol("multiClassLabel").setFeaturesCol("features")
// Chain all into a Pipeline
val dtPipeline = new Pipeline().setStages(Array(monthIndexer, dayofMonthIndexer, dayOfWeekIndexer, uniqueCarrierIndexer, originIndexer, assembler, indexer, pca, bucketizer, dt))
// Train model. 
val dtModel = dtPipeline.fit(trainingData)
// Make predictions.
val dtPredictions = dtModel.transform(testingData)
// Select example rows to display.
dtPredictions.select("prediction", "multiClassLabel", "features").show(20)
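The imports above include MulticlassClassificationEvaluator, although the tutorial code never uses it. One possible way to score the predictions with it is sketched below. This is not self-contained: it assumes `dtPredictions` from the code above exists in the same session, and the choice of the f1 metric is only an example.

```scala
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator

// Assumes dtPredictions from the tutorial code exists in this session.
val evaluator = new MulticlassClassificationEvaluator().setLabelCol("multiClassLabel").setPredictionCol("prediction").setMetricName("f1")

// F1 score of the decision tree pipeline on the 2008 testing data.
val f1 = evaluator.evaluate(dtPredictions)
println(s"F1 on testing data = $f1")

// Count rows per (label, prediction) pair as a simple confusion table.
dtPredictions.groupBy("multiClassLabel", "prediction").count().show()
```

The statements are kept on single lines so they can also be pasted into spark-shell without paste mode.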

Conclusion

The Spark machine learning pipeline is a very efficient way of creating a machine learning flow. It eliminates the need to write a lot of boilerplate code during the data munging process. It also guarantees that the training data and testing data go through exactly the same data processing without any additional effort. The same pipeline concept has been implemented in many other popular machine learning libraries, and it is good to see it finally available in Spark.

How to run this tutorial with Spark 2.0

Since the tutorial was first published, Spark 2.x has gained a lot of popularity and many users have asked how to run this tutorial in Spark 2.x. The HDP 2.5 Sandbox includes Spark 2.0 as a Tech Preview. Because the version of Zeppelin in HDP 2.5 is not yet compatible with Spark 2.0, the only way to run the sample code is to use spark-shell in the console.

First you need to switch the version of Spark and launch spark-shell

export SPARK_MAJOR_VERSION=2
spark-shell

Once inside spark-shell, confirm that Spark 2.x is the version in use. You should see the following with the HDP 2.5 Sandbox:

scala> sc.version
res5: String = 2.0.0.2.5.0.0-1245

The only change needed for the tutorial code above to work in Spark 2.0 is one import line. The vector classes under org.apache.spark.mllib.linalg belong to the Spark 1.x library; change the import to org.apache.spark.ml.linalg and all the code will work in Spark 2.0.

import org.apache.spark.ml.linalg.Vectors

For reference, here is the code that works in the Spark 2.0 spark-shell:

import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.attribute.NominalAttribute
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType,StructField,StringType}

def getMinuteOfDay(depTime: String) : Int = (depTime.toInt / 100).toInt * 60 + (depTime.toInt % 100)
case class Flight(Month: String, DayofMonth: String, DayOfWeek: String, DepTime: Int, CRSDepTime: Int, ArrTime: Int, CRSArrTime: Int, UniqueCarrier: String, ActualElapsedTime: Int, CRSElapsedTime: Int, AirTime: Int, ArrDelay: Double, DepDelay: Int, Origin: String, Distance: Int)
val flight2007 = sc.textFile("/tmp/airflightsdelays/flights_2007.csv.bz2")
val header = flight2007.first
val trainingData = flight2007.filter(x => x != header).map(x => x.split(",")).filter(x => x(21) == "0").filter(x => x(17) == "ORD").filter(x => x(14) != "NA").map(p => Flight(p(1), p(2), p(3), getMinuteOfDay(p(4)), getMinuteOfDay(p(5)), getMinuteOfDay(p(6)), getMinuteOfDay(p(7)), p(8), p(11).toInt, p(12).toInt, p(13).toInt, p(14).toDouble, p(15).toInt, p(16), p(18).toInt)).toDF
trainingData.cache
val flight2008 = sc.textFile("/tmp/airflightsdelays/flights_2008.csv.bz2")
val testingData = flight2008.filter(x => x != header).map(x => x.split(",")).filter(x => x(21) == "0").filter(x => x(17) == "ORD").filter(x => x(14) != "NA").map(p => Flight(p(1), p(2), p(3), getMinuteOfDay(p(4)), getMinuteOfDay(p(5)), getMinuteOfDay(p(6)), getMinuteOfDay(p(7)), p(8), p(11).toInt, p(12).toInt, p(13).toInt, p(14).toDouble, p(15).toInt, p(16), p(18).toInt)).toDF
testingData.cache

import org.apache.spark.ml.feature.StringIndexer
import org.apache.spark.ml.feature.VectorAssembler

val monthIndexer = new StringIndexer().setInputCol("Month").setOutputCol("MonthCat")
val dayofMonthIndexer = new StringIndexer().setInputCol("DayofMonth").setOutputCol("DayofMonthCat")
val dayOfWeekIndexer = new StringIndexer().setInputCol("DayOfWeek").setOutputCol("DayOfWeekCat")
val uniqueCarrierIndexer = new StringIndexer().setInputCol("UniqueCarrier").setOutputCol("UniqueCarrierCat")
val originIndexer = new StringIndexer().setInputCol("Origin").setOutputCol("OriginCat")

val assembler = new VectorAssembler().setInputCols(Array("MonthCat", "DayofMonthCat", "DayOfWeekCat", "UniqueCarrierCat", "OriginCat", "DepTime", "CRSDepTime", "ArrTime", "CRSArrTime", "ActualElapsedTime", "CRSElapsedTime", "AirTime","DepDelay", "Distance")).setOutputCol("rawFeatures")

import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.Binarizer
import org.apache.spark.ml.feature.VectorSlicer
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.StandardScaler

val slicer = new VectorSlicer().setInputCol("rawFeatures").setOutputCol("slicedfeatures").setNames(Array("MonthCat", "DayofMonthCat", "DayOfWeekCat", "UniqueCarrierCat", "DepTime", "ArrTime", "ActualElapsedTime", "AirTime", "DepDelay", "Distance"))
val scaler = new StandardScaler().setInputCol("slicedfeatures").setOutputCol("features").setWithStd(true).setWithMean(true)
val binarizerClassifier = new Binarizer().setInputCol("ArrDelay").setOutputCol("binaryLabel").setThreshold(15.0)
val lr = new LogisticRegression().setMaxIter(10).setRegParam(0.3).setElasticNetParam(0.8).setLabelCol("binaryLabel").setFeaturesCol("features")
val lrPipeline = new Pipeline().setStages(Array(monthIndexer, dayofMonthIndexer, dayOfWeekIndexer, uniqueCarrierIndexer, originIndexer, assembler, slicer, scaler, binarizerClassifier, lr))
val lrModel = lrPipeline.fit(trainingData)
val lrPredictions = lrModel.transform(testingData)
lrPredictions.select("prediction", "binaryLabel", "features").show(20)

import org.apache.spark.ml.feature.Bucketizer
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.DecisionTreeClassifier
import org.apache.spark.ml.classification.DecisionTreeClassificationModel
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.feature.VectorIndexer
import org.apache.spark.ml.feature.PCA

val indexer = new VectorIndexer().setInputCol("rawFeatures").setOutputCol("rawFeaturesIndexed").setMaxCategories(10)
val pca = new PCA().setInputCol("rawFeaturesIndexed").setOutputCol("features").setK(10)
val bucketizer = new Bucketizer().setInputCol("ArrDelay").setOutputCol("multiClassLabel").setSplits(Array(Double.NegativeInfinity, 0.0, 15.0, Double.PositiveInfinity))
val dt = new DecisionTreeClassifier().setLabelCol("multiClassLabel").setFeaturesCol("features")
val dtPipeline = new Pipeline().setStages(Array(monthIndexer, dayofMonthIndexer, dayOfWeekIndexer, uniqueCarrierIndexer, originIndexer, assembler, indexer, pca, bucketizer, dt))
val dtModel = dtPipeline.fit(trainingData)
val dtPredictions = dtModel.transform(testingData)
dtPredictions.select("prediction", "multiClassLabel", "features").show(20)
Comments

Hi, this example is good! Could you please show the next steps: how to do the evaluation (MulticlassMetrics) and hyperparameter tuning for the example above? Thanks!
