This article walks through the steps needed to convert an existing Spark pipeline model into an MLeap bundle. To follow the steps required to build the pipeline model used in this tutorial, please download the complete Apache Zeppelin notebook from my GitHub site.
Step 1 - Add MLeap Dependency
The MLeap Spark dependency is required to convert Spark models into MLeap bundles. In the following section of code, the dependency is added to my Zeppelin notebook. Keep in mind this artifact needs to be available on Maven.
%dep
//In Zeppelin
//Run this before initializing Spark Context
z.load("ml.combust.mleap:mleap-spark_2.11:0.13.0")
Step 2 - Build Pipeline Model
The Spark pipeline model needs to be built before it can be converted. In this section of code, a Spark pipeline model is built from previously defined stages (sketched below). To view the complete code listing, please download the notebook from my GitHub site. The pipeline becomes a pipeline model once it is fitted to a Spark DataFrame.
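For context, the pipeline stages referenced below (TFtokenizer, hashingTF, TFIDF) are defined earlier in the notebook. A minimal sketch of what those definitions look like, assuming df_abstracts has a text column named "abstract" (the notebook's actual column names and feature dimensions may differ):
%spark2
//In Zeppelin
import org.apache.spark.ml.feature.{HashingTF, IDF, Tokenizer}
// Split the abstract text into words
val TFtokenizer = new Tokenizer().setInputCol("abstract").setOutputCol("words")
// Hash words into term-frequency vectors
val hashingTF = new HashingTF().setInputCol("words").setOutputCol("rawFeatures").setNumFeatures(20000)
// Rescale term frequencies by inverse document frequency
val TFIDF = new IDF().setInputCol("rawFeatures").setOutputCol("features")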
%spark2
//In Zeppelin
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.StructType
// Build pipeline from previously defined stages
val pipeline = new Pipeline().setStages(Array(TFtokenizer, hashingTF, TFIDF))
// Fit pipeline on a dataframe
val PipeLineModel = pipeline.fit(df_abstracts)
// Save Pipeline to Disk (Optional)
PipeLineModel.write.overwrite().save("/tmp/spark-pipelineModel-tfidf")
// Save the dataframe's schema
val schema = df_abstracts.schema
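The schema captured above lives only in the notebook session. To persist it alongside the saved pipeline for later use (for example, to rebuild input frames when serving), Spark's StructType serializes to JSON; a minimal sketch with an illustrative output path:
%spark2
//In Zeppelin
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths}
// Write the DataFrame schema as JSON next to the saved pipeline
Files.write(Paths.get("/tmp/spark-pipelineModel-tfidf-schema.json"), schema.json.getBytes(StandardCharsets.UTF_8))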
Step 3 - Export Spark Pipeline Model Into MLeap Bundle (Zip and Directory)
Using the MLeap libraries, the Spark pipeline model can be converted into an MLeap bundle. In the following section of code, the Spark pipeline model is converted into an MLeap bundle; I have included code for both zip and directory serialization. Check the file system to confirm the files were created (a quick check is shown after the code).
%spark2
//In Zeppelin
import ml.combust.bundle.BundleFile
import ml.combust.bundle.serializer.SerializationFormat
import org.apache.spark.ml.mleap.SparkUtil
import ml.combust.mleap.spark.SparkSupport._
import org.apache.spark.ml.bundle._
import resource._
// Initialize the SparkBundleContext with a transformed DataFrame so MLeap can capture the pipeline's schema
val sbc = SparkBundleContext().withDataset(PipeLineModel.transform(df_abstracts))
// Serialize pipeline to zip
// *** This will fail if the file already exists ***
(for(bundleFile <- managed(BundleFile("jar:file:/tmp/MLeapModels/spark-nlp-tfidf-pipeline.zip"))) yield {
  PipeLineModel.writeBundle.save(bundleFile)(sbc).get
}).tried.get
// Serialize pipeline to directory (JSON format)
for(bundle <- managed(BundleFile("file:/tmp/MLeapModels/spark-nlp-tfidf-pipeline-dir"))) {
  PipeLineModel.writeBundle.format(SerializationFormat.Json).save(bundle)(sbc).get
}
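To confirm that serialization succeeded, the output directory can be listed from Zeppelin's shell interpreter (paths assume the examples above):
%sh
# List the serialized MLeap artifacts
ls -l /tmp/MLeapModels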
Step 4 - Import MLeap Bundle (Optional)
An optional step is to load the converted MLeap bundle to verify it deserializes correctly. In the following section of code, the MLeap bundle is loaded from a file.
%spark2
//In Zeppelin
import ml.combust.bundle.BundleFile
import ml.combust.mleap.runtime.MleapSupport._
import resource._
// Deserialize a zip bundle
val bundle = (for(bundleFile <- managed(BundleFile("jar:file:/tmp/MLeapModels/spark-nlp-tfidf-pipeline.zip"))) yield { bundleFile.loadMleapBundle().get}).opt.get
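Once loaded, the bundle's root transformer can score data directly in the MLeap runtime, with no SparkContext required. A minimal sketch, assuming the pipeline's input column is named "abstract" (match this to the schema saved in Step 2):
%spark2
//In Zeppelin
import ml.combust.mleap.core.types.{ScalarType, StructField, StructType}
import ml.combust.mleap.runtime.frame.{DefaultLeapFrame, Row}
// The root of the bundle is the deserialized pipeline transformer
val mleapPipeline = bundle.root
// Build a one-row LeapFrame with the expected input column
val leapSchema = StructType(StructField("abstract", ScalarType.String)).get
val frame = DefaultLeapFrame(leapSchema, Seq(Row("a sample abstract about spark and mleap")))
// transform returns a Try; .get surfaces any schema or deserialization errors
val transformed = mleapPipeline.transform(frame).get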