Created on 08-31-2016 01:50 AM - edited 08-17-2019 10:32 AM
With the release of Spark 2.0, the machine learning library of Spark has moved from mllib to ml. One of the biggest changes in the new ml library is the introduction of the machine learning pipeline, which provides a high-level abstraction of the machine learning workflow and greatly simplifies the creation of machine learning processes. In this tutorial, we will walk through the steps to create a machine learning pipeline and also explain what happens under the hood. (Note: the code in this article is based on the technical-preview ml library from Spark 1.6; with minor changes it should run on Spark 2.0 as well.)
In this tutorial, we will demonstrate how to create a pipeline in Spark to predict airline flight delay. The dataset we use contains airline flight information from 2007 and 2008. We use the 2007 data as the training dataset and the 2008 data as the testing dataset. We will predict flight delay using two classification models, logistic regression and decision tree. The tutorial walks step by step through using the new ml library transformers for data munging and then chaining them together with a machine learning algorithm to create a pipeline that completes the whole process in one step. The tutorial is not meant to be a data science guideline on how to process data and choose a machine learning model; it is only meant to show how to build a Spark ml pipeline in Scala. The data munging decisions as well as the model choices are not optimal from a data science perspective.
We will use the Hortonworks HDP 2.4 sandbox for this tutorial. Please prepare the Sandbox following these instructions and go to the Zeppelin UI at http://127.0.0.1:9995. All the code examples are executed inside Zeppelin notebook paragraphs.
First, create a new Zeppelin notebook and name it whatever you like
Next, use the code below to download the datasets from the internet and upload them to HDFS:
%sh
wget http://stat-computing.org/dataexpo/2009/2007.csv.bz2 -O /tmp/flights_2007.csv.bz2
wget http://stat-computing.org/dataexpo/2009/2008.csv.bz2 -O /tmp/flights_2008.csv.bz2
hdfs dfs -mkdir /tmp/airflightsdelays
hdfs dfs -put /tmp/flights_2007.csv.bz2 /tmp/flights_2008.csv.bz2 /tmp/airflightsdelays/
Next, we load the data from HDFS into Spark dataframes so it can be processed by the ml pipeline. The new ml pipeline only processes data inside dataframes, not in RDDs like the old mllib. Two dataframes are created, one for the training data and one for the testing data. A helper function converts the military-format time into an integer, the number of minutes from midnight, so it can be used as a numeric feature. The details about the dataset can be found here.
%spark
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.ml.attribute.NominalAttribute
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, StringType}

//calculate minutes from midnight, input is military time format
def getMinuteOfDay(depTime: String) : Int = (depTime.toInt / 100).toInt * 60 + (depTime.toInt % 100)

//define schema for raw data
case class Flight(Month: String, DayofMonth: String, DayOfWeek: String, DepTime: Int, CRSDepTime: Int, ArrTime: Int, CRSArrTime: Int, UniqueCarrier: String, ActualElapsedTime: Int, CRSElapsedTime: Int, AirTime: Int, ArrDelay: Double, DepDelay: Int, Origin: String, Distance: Int)

//load 2007 data as training data: keep flights that were not cancelled (col 21),
//with destination ORD (col 17) and a known arrival delay (col 14)
val flight2007 = sc.textFile("/tmp/airflightsdelays/flights_2007.csv.bz2")
val header = flight2007.first
val trainingData = flight2007
  .filter(x => x != header)
  .map(x => x.split(","))
  .filter(x => x(21) == "0")
  .filter(x => x(17) == "ORD")
  .filter(x => x(14) != "NA")
  .map(p => Flight(p(1), p(2), p(3), getMinuteOfDay(p(4)), getMinuteOfDay(p(5)), getMinuteOfDay(p(6)), getMinuteOfDay(p(7)), p(8), p(11).toInt, p(12).toInt, p(13).toInt, p(14).toDouble, p(15).toInt, p(16), p(18).toInt))
  .toDF
trainingData.cache

//load 2008 data as testing data with the same filters
val flight2008 = sc.textFile("/tmp/airflightsdelays/flights_2008.csv.bz2")
val testingData = flight2008
  .filter(x => x != header)
  .map(x => x.split(","))
  .filter(x => x(21) == "0")
  .filter(x => x(17) == "ORD")
  .filter(x => x(14) != "NA")
  .map(p => Flight(p(1), p(2), p(3), getMinuteOfDay(p(4)), getMinuteOfDay(p(5)), getMinuteOfDay(p(6)), getMinuteOfDay(p(7)), p(8), p(11).toInt, p(12).toInt, p(13).toInt, p(14).toDouble, p(15).toInt, p(16), p(18).toInt))
  .toDF
testingData.cache
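As a quick sanity check (an optional step, not part of the original walkthrough), you can run the helper on a couple of sample values and look at the resulting schema in a follow-up notebook paragraph:

// 13:30 military time is 13 * 60 + 30 = 810 minutes after midnight
println(getMinuteOfDay("1330"))
// a raw value without leading zeros such as "5" (00:05) yields 5
println(getMinuteOfDay("5"))
// confirm the columns and types produced by the Flight case class
trainingData.printSchema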
The building blocks of an ml pipeline are transformers and estimators; a minimal sketch of this contract follows below.
Transformers: a transformer converts one dataframe into another, typically by appending one or more derived columns (for example, a feature transformer, or a fitted model that appends predictions).
Estimators: an estimator is a learning algorithm that is fit on a dataframe to produce a transformer (for example, a classifier that is trained to produce a model).
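To make the distinction concrete, here is a minimal sketch (not from the original article) using the StringIndexer that is introduced in more detail below; the variable and column names are just for this demo. An estimator's fit() learns from a dataframe and returns a model, and that model is itself a transformer whose transform() produces a new dataframe.

import org.apache.spark.ml.feature.StringIndexer

// Estimator: fit() learns the string-to-index mapping from the data
val carrierIndexerDemo = new StringIndexer().setInputCol("UniqueCarrier").setOutputCol("UniqueCarrierIdx")
val carrierIndexerModel = carrierIndexerDemo.fit(trainingData)
// Model (a transformer): transform() appends the new column to a dataframe
carrierIndexerModel.transform(trainingData).select("UniqueCarrier", "UniqueCarrierIdx").show(5)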
Most transformers are in the org.apache.spark.ml.feature package, and the code below shows two of them, StringIndexer and VectorAssembler.
StringIndexer converts a column of string values into a column of categorical indices that can be used by the machine learning algorithms in the ml library. In our dataset, we index month, day of month, day of week, carrier, and origin airport code with this transformer. Notice that we provide the input column name and the output column name as parameters when initializing each StringIndexer.
VectorAssembler constructs a single vector from the raw feature columns; most ml machine learning algorithms take features in the form of a vector.
import org.apache.spark.ml.feature.StringIndexer
import org.apache.spark.ml.feature.VectorAssembler

//transformers to convert strings to category values
val monthIndexer = new StringIndexer().setInputCol("Month").setOutputCol("MonthCat")
val dayofMonthIndexer = new StringIndexer().setInputCol("DayofMonth").setOutputCol("DayofMonthCat")
val dayOfWeekIndexer = new StringIndexer().setInputCol("DayOfWeek").setOutputCol("DayOfWeekCat")
val uniqueCarrierIndexer = new StringIndexer().setInputCol("UniqueCarrier").setOutputCol("UniqueCarrierCat")
val originIndexer = new StringIndexer().setInputCol("Origin").setOutputCol("OriginCat")

//assemble raw features into a single vector column
val assembler = new VectorAssembler()
  .setInputCols(Array("MonthCat", "DayofMonthCat", "DayOfWeekCat", "UniqueCarrierCat", "OriginCat", "DepTime", "CRSDepTime", "ArrTime", "CRSArrTime", "ActualElapsedTime", "CRSElapsedTime", "AirTime", "DepDelay", "Distance"))
  .setOutputCol("rawFeatures")
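If you want to see what these stages actually produce before wiring them into a pipeline, one optional way (not in the original flow; the pipeline defined later runs the same stages for you automatically) is to apply the indexers and the assembler by hand:

// fit and apply each StringIndexer in turn, then assemble the raw feature vector
val demoIndexers = Seq(monthIndexer, dayofMonthIndexer, dayOfWeekIndexer, uniqueCarrierIndexer, originIndexer)
val demoIndexed = demoIndexers.foldLeft(trainingData)((df, stringIndexer) => stringIndexer.fit(df).transform(df))
assembler.transform(demoIndexed).select("rawFeatures").show(3, false)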
In the last section, we created a few transformers and defined the parameters for each of them so that they operate on the input dataframe and produce the desired output columns. Below we introduce a few more transformers for the first pipeline.
VectorSlicer takes a vector as its input column and creates a new vector that contains only a subset of the attributes of the original vector.
Binarizer creates a binary value, 0 or 1, from an input column of double values using a threshold (a quick stand-alone peek follows below).
StandardScaler scales a vector into a new vector whose values are on a similar scale.
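For instance, here is a quick stand-alone peek (a hypothetical inspection step, not in the original flow; the output column name is just for this demo) at what the Binarizer will turn ArrDelay into: 1.0 when the flight arrived more than 15 minutes late, 0.0 otherwise.

import org.apache.spark.ml.feature.Binarizer

// Binarizer is a pure transformer: no fit() needed, just transform()
val lateLabelDemo = new Binarizer().setInputCol("ArrDelay").setOutputCol("lateLabel").setThreshold(15.0)
lateLabelDemo.transform(trainingData).select("ArrDelay", "lateLabel").show(5)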
Next we initialize an estimator, the logistic regression classifier, and chain all the stages together into a machine learning pipeline. When the pipeline is used to fit the training data, the transformers and the estimator are applied to the dataframe in the sequence defined in the pipeline. The pipeline model produced by fitting the training data contains almost exactly the same sequence of stages as the pipeline estimator: the testing data goes through the same data munging process, except that at the last step the original estimator is replaced by the model it produced, which is itself a transformer.
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.Binarizer
import org.apache.spark.ml.feature.VectorSlicer
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.StandardScaler

//vector slicer
val slicer = new VectorSlicer().setInputCol("rawFeatures").setOutputCol("slicedfeatures").setNames(Array("MonthCat", "DayofMonthCat", "DayOfWeekCat", "UniqueCarrierCat", "DepTime", "ArrTime", "ActualElapsedTime", "AirTime", "DepDelay", "Distance"))
//scale the features
val scaler = new StandardScaler().setInputCol("slicedfeatures").setOutputCol("features").setWithStd(true).setWithMean(true)
//labels for binary classifier
val binarizerClassifier = new Binarizer().setInputCol("ArrDelay").setOutputCol("binaryLabel").setThreshold(15.0)
//logistic regression
val lr = new LogisticRegression().setMaxIter(10).setRegParam(0.3).setElasticNetParam(0.8).setLabelCol("binaryLabel").setFeaturesCol("features")
// Chain indexers, feature stages and logistic regression in a Pipeline
val lrPipeline = new Pipeline().setStages(Array(monthIndexer, dayofMonthIndexer, dayOfWeekIndexer, uniqueCarrierIndexer, originIndexer, assembler, slicer, scaler, binarizerClassifier, lr))
// Train model.
val lrModel = lrPipeline.fit(trainingData)
// Make predictions on the testing data.
val lrPredictions = lrModel.transform(testingData)
// Select example rows to display.
lrPredictions.select("prediction", "binaryLabel", "features").show(20)
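Because the fitted pipeline model keeps one fitted stage per pipeline stage, you can also pull the trained logistic regression back out and score the predictions. This is an optional add-on, not part of the original article; it uses the standard BinaryClassificationEvaluator, whose default metric is area under the ROC curve.

import org.apache.spark.ml.classification.LogisticRegressionModel
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator

// the last stage of the fitted pipeline is the trained LogisticRegressionModel
val lrStage = lrModel.stages.last.asInstanceOf[LogisticRegressionModel]
println(s"intercept: ${lrStage.intercept}")
println(s"coefficients: ${lrStage.coefficients}")

// area under ROC on the 2008 testing data
val lrEvaluator = new BinaryClassificationEvaluator().setLabelCol("binaryLabel").setRawPredictionCol("rawPrediction")
println(s"areaUnderROC = ${lrEvaluator.evaluate(lrPredictions)}")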
The code below uses another estimator, the decision tree classifier. This pipeline is made up of a different chain of transformers, but the concept is the same as the previous pipeline. You can see how easy it is to create a different pipeline from existing transformers.
PCA is a dimensionality reduction transformer that projects the feature vector onto a lower-dimensional space.
VectorIndexer automatically identifies which features inside a vector are categorical, based on the number of distinct values, and converts them to category indices.
Bucketizer converts continuous values into categorical indices based on the provided splits (thresholds).
import org.apache.spark.ml.feature.Bucketizer
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.DecisionTreeClassifier
import org.apache.spark.ml.classification.DecisionTreeClassificationModel
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.feature.VectorIndexer
import org.apache.spark.ml.feature.PCA

//index categorical features in the raw feature vector
val indexer = new VectorIndexer().setInputCol("rawFeatures").setOutputCol("rawFeaturesIndexed").setMaxCategories(10)
//PCA
val pca = new PCA().setInputCol("rawFeaturesIndexed").setOutputCol("features").setK(10)
//label for multi-class classifier
val bucketizer = new Bucketizer().setInputCol("ArrDelay").setOutputCol("multiClassLabel").setSplits(Array(Double.NegativeInfinity, 0.0, 15.0, Double.PositiveInfinity))
// Train a DecisionTree model.
val dt = new DecisionTreeClassifier().setLabelCol("multiClassLabel").setFeaturesCol("features")
// Chain all stages into a Pipeline
val dtPipeline = new Pipeline().setStages(Array(monthIndexer, dayofMonthIndexer, dayOfWeekIndexer, uniqueCarrierIndexer, originIndexer, assembler, indexer, pca, bucketizer, dt))
// Train model.
val dtModel = dtPipeline.fit(trainingData)
// Make predictions.
val dtPredictions = dtModel.transform(testingData)
// Select example rows to display.
dtPredictions.select("prediction", "multiClassLabel", "features").show(20)
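The MulticlassClassificationEvaluator and DecisionTreeClassificationModel imported above are not actually used in the code; one possible way to use them (an add-on sketch, not part of the original article) is to print the learned tree and compute a test metric. Note that the "accuracy" metric name is the Spark 2.x spelling; the 1.6 preview used "precision" for the equivalent overall metric.

// pull the trained tree out of the fitted pipeline and inspect it
val dtStage = dtModel.stages.last.asInstanceOf[DecisionTreeClassificationModel]
println(dtStage.toDebugString)

// score the multi-class predictions on the 2008 testing data
val dtEvaluator = new MulticlassClassificationEvaluator().setLabelCol("multiClassLabel").setPredictionCol("prediction").setMetricName("accuracy")
println(s"accuracy = ${dtEvaluator.evaluate(dtPredictions)}")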
The Spark machine learning pipeline is a very efficient way of creating a machine learning workflow. It eliminates the need to write a lot of boilerplate code during the data munging process, and it guarantees that the training data and testing data go through exactly the same processing without any additional effort. The same pipeline concept has been implemented in many other popular machine learning libraries, and it is good to see it finally available in Spark.
Since this tutorial was first published, Spark 2.x has gained a lot of popularity and many users have asked how to run it on Spark 2.x. With the HDP 2.5 Sandbox, Spark 2.0 is included as a Tech Preview. Because the version of Zeppelin in HDP 2.5 is not yet compatible with Spark 2.0, the only way to run the sample code is to use spark-shell in the console.
First, switch the Spark version and launch spark-shell:
export SPARK_MAJOR_VERSION=2
spark-shell
Once inside spark-shell, confirm that Spark 2.x is the version being used; you should see the following with the HDP 2.5 Sandbox:
scala> sc.version
res5: String = 2.0.0.2.5.0.0-1245
The only code that needs to be modified for the tutorial code above to work inside Spark 2.0 is one import line. org.apache.spark.mllib.linalg.Vectors is the package for Spark 1.x; just change it to org.apache.spark.ml.linalg.Vectors and all the code will work in Spark 2.0:
import org.apache.spark.ml.linalg.Vectors
For reference, here is the code that works in the Spark 2.0 spark-shell:
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.attribute.NominalAttribute
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, StringType}

def getMinuteOfDay(depTime: String) : Int = (depTime.toInt / 100).toInt * 60 + (depTime.toInt % 100)

case class Flight(Month: String, DayofMonth: String, DayOfWeek: String, DepTime: Int, CRSDepTime: Int, ArrTime: Int, CRSArrTime: Int, UniqueCarrier: String, ActualElapsedTime: Int, CRSElapsedTime: Int, AirTime: Int, ArrDelay: Double, DepDelay: Int, Origin: String, Distance: Int)

val flight2007 = sc.textFile("/tmp/airflightsdelays/flights_2007.csv.bz2")
val header = flight2007.first
val trainingData = flight2007.filter(x => x != header).map(x => x.split(",")).filter(x => x(21) == "0").filter(x => x(17) == "ORD").filter(x => x(14) != "NA").map(p => Flight(p(1), p(2), p(3), getMinuteOfDay(p(4)), getMinuteOfDay(p(5)), getMinuteOfDay(p(6)), getMinuteOfDay(p(7)), p(8), p(11).toInt, p(12).toInt, p(13).toInt, p(14).toDouble, p(15).toInt, p(16), p(18).toInt)).toDF
trainingData.cache

val flight2008 = sc.textFile("/tmp/airflightsdelays/flights_2008.csv.bz2")
val testingData = flight2008.filter(x => x != header).map(x => x.split(",")).filter(x => x(21) == "0").filter(x => x(17) == "ORD").filter(x => x(14) != "NA").map(p => Flight(p(1), p(2), p(3), getMinuteOfDay(p(4)), getMinuteOfDay(p(5)), getMinuteOfDay(p(6)), getMinuteOfDay(p(7)), p(8), p(11).toInt, p(12).toInt, p(13).toInt, p(14).toDouble, p(15).toInt, p(16), p(18).toInt)).toDF
testingData.cache

import org.apache.spark.ml.feature.StringIndexer
import org.apache.spark.ml.feature.VectorAssembler

val monthIndexer = new StringIndexer().setInputCol("Month").setOutputCol("MonthCat")
val dayofMonthIndexer = new StringIndexer().setInputCol("DayofMonth").setOutputCol("DayofMonthCat")
val dayOfWeekIndexer = new StringIndexer().setInputCol("DayOfWeek").setOutputCol("DayOfWeekCat")
val uniqueCarrierIndexer = new StringIndexer().setInputCol("UniqueCarrier").setOutputCol("UniqueCarrierCat")
val originIndexer = new StringIndexer().setInputCol("Origin").setOutputCol("OriginCat")
val assembler = new VectorAssembler().setInputCols(Array("MonthCat", "DayofMonthCat", "DayOfWeekCat", "UniqueCarrierCat", "OriginCat", "DepTime", "CRSDepTime", "ArrTime", "CRSArrTime", "ActualElapsedTime", "CRSElapsedTime", "AirTime", "DepDelay", "Distance")).setOutputCol("rawFeatures")

import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.Binarizer
import org.apache.spark.ml.feature.VectorSlicer
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.StandardScaler

val slicer = new VectorSlicer().setInputCol("rawFeatures").setOutputCol("slicedfeatures").setNames(Array("MonthCat", "DayofMonthCat", "DayOfWeekCat", "UniqueCarrierCat", "DepTime", "ArrTime", "ActualElapsedTime", "AirTime", "DepDelay", "Distance"))
val scaler = new StandardScaler().setInputCol("slicedfeatures").setOutputCol("features").setWithStd(true).setWithMean(true)
val binarizerClassifier = new Binarizer().setInputCol("ArrDelay").setOutputCol("binaryLabel").setThreshold(15.0)
val lr = new LogisticRegression().setMaxIter(10).setRegParam(0.3).setElasticNetParam(0.8).setLabelCol("binaryLabel").setFeaturesCol("features")
val lrPipeline = new Pipeline().setStages(Array(monthIndexer, dayofMonthIndexer, dayOfWeekIndexer, uniqueCarrierIndexer, originIndexer, assembler, slicer, scaler, binarizerClassifier, lr))
val lrModel = lrPipeline.fit(trainingData)
val lrPredictions = lrModel.transform(testingData)
lrPredictions.select("prediction", "binaryLabel", "features").show(20)

import org.apache.spark.ml.feature.Bucketizer
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.DecisionTreeClassifier
import org.apache.spark.ml.classification.DecisionTreeClassificationModel
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.feature.VectorIndexer
import org.apache.spark.ml.feature.PCA

val indexer = new VectorIndexer().setInputCol("rawFeatures").setOutputCol("rawFeaturesIndexed").setMaxCategories(10)
val pca = new PCA().setInputCol("rawFeaturesIndexed").setOutputCol("features").setK(10)
val bucketizer = new Bucketizer().setInputCol("ArrDelay").setOutputCol("multiClassLabel").setSplits(Array(Double.NegativeInfinity, 0.0, 15.0, Double.PositiveInfinity))
val dt = new DecisionTreeClassifier().setLabelCol("multiClassLabel").setFeaturesCol("features")
val dtPipeline = new Pipeline().setStages(Array(monthIndexer, dayofMonthIndexer, dayOfWeekIndexer, uniqueCarrierIndexer, originIndexer, assembler, indexer, pca, bucketizer, dt))
val dtModel = dtPipeline.fit(trainingData)
val dtPredictions = dtModel.transform(testingData)
dtPredictions.select("prediction", "multiClassLabel", "features").show(20)
Created on 01-24-2019 08:01 PM
Hi, this example is good! Could you please show more about the next steps: how to do the evaluation (MulticlassMetrics) and hyperparameter tuning for the example above? Thanks!