
This article walks through the steps needed to convert an existing Spark pipeline model into an MLeap bundle. To follow the steps required to build the pipeline model used in the tutorial, please download the complete Apache Zeppelin notebook from my GitHub site.

Tools Used In Tutorial

Spark 2.3

MLeap Spark 0.13

Step 1 - Load MLeap Spark Dependencies

This dependency is required to convert Spark models into MLeap bundles. In the following section of code, the dependency files are added to my Zeppelin notebook. Keep in mind that this artifact needs to be available on Maven, and that the paragraph must run before the Spark context is initialized.

%dep
//In Zeppelin

//Run this before initializing Spark Context
z.load("ml.combust.mleap:mleap-spark_2.11:0.13.0")
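
Outside Zeppelin, the same Maven coordinates could be declared in a build file instead. The following is a minimal sketch for sbt, assuming a Scala 2.11 project; the build file is not part of the original notebook and the exact Scala patch version is an assumption.

```scala
// build.sbt (assumed; the tutorial itself only uses Zeppelin's %dep loader)
scalaVersion := "2.11.12"

// %% appends the Scala binary version, so this resolves to mleap-spark_2.11:0.13.0
libraryDependencies += "ml.combust.mleap" %% "mleap-spark" % "0.13.0"
```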


Step 2 - Build Pipeline Model

The Spark pipeline model needs to be built before it can be converted. In this section of code, a Spark pipeline is assembled from previously defined stages. To view the complete code listing, please download the notebook from my GitHub site. The pipeline model is produced when the pipeline is fitted against a Spark dataframe.

%spark2
//In Zeppelin

import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.StructType

// Build pipeline from previously defined stages
val pipeline = new Pipeline().setStages(Array(TFtokenizer, hashingTF, TFIDF))

// Fit pipeline on a dataframe
val PipeLineModel = pipeline.fit(df_abstracts)

// Save Pipeline to Disk (Optional)
PipeLineModel.write.overwrite().save("/tmp/spark-pipelineModel-tfidf")

// Save the dataframe's schema
val schema = df_abstracts.schema
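
The three stages passed to setStages are defined earlier in the notebook and are not reproduced in this article. As a rough sketch only, they could look like the following, assuming a standard Tokenizer, HashingTF and IDF chain. The column names ("abstract", "words", "rawFeatures", "features") are hypothetical, and the notebook's actual definitions may differ. These definitions would need to run before the pipeline is built.

```scala
import org.apache.spark.ml.feature.{HashingTF, IDF, Tokenizer}

// Hypothetical column names; check the notebook for the real ones.
// Split the raw text column into lower-cased word tokens.
val TFtokenizer = new Tokenizer()
  .setInputCol("abstract")
  .setOutputCol("words")

// Hash each token list into a fixed-length term-frequency vector.
val hashingTF = new HashingTF()
  .setInputCol("words")
  .setOutputCol("rawFeatures")

// Rescale term frequencies by inverse document frequency.
// IDF is an estimator, so it is only trained when pipeline.fit(...) is called.
val TFIDF = new IDF()
  .setInputCol("rawFeatures")
  .setOutputCol("features")
```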


Step 3 - Export Spark Pipeline Model Into MLeap Bundle (Zip and Directory)

Using the MLeap libraries, the Spark pipeline model can be converted into an MLeap bundle. The following section of code performs the conversion twice: once serialized as a zip file and once as a directory. Afterwards, check the file system to confirm that the files were created.

%spark2
//In Zeppelin 

import ml.combust.bundle.BundleFile
import ml.combust.bundle.serializer.SerializationFormat
import org.apache.spark.ml.mleap.SparkUtil
import ml.combust.mleap.spark.SparkSupport._
import org.apache.spark.ml.bundle._
import resource._

// Init Spark Bundle Context to MLeap
// The transformed dataset lets MLeap record the output schema of each stage
val sbc = SparkBundleContext().withDataset(PipeLineModel.transform(df_abstracts))

// Serialize Pipeline to Zip
// *** This will fail if the file already exists ***
(for (bundlefile <- managed(BundleFile("jar:file:/tmp/MLeapModels/spark-nlp-tfidf-pipeline.zip"))) yield {
  PipeLineModel.writeBundle.save(bundlefile)(sbc).get
}).tried.get

// Serialize Pipeline to Directory (JSON format)
for (bundle <- managed(BundleFile("file:/tmp/MLeapModels/spark-nlp-tfidf-pipeline-dir"))) {
  PipeLineModel.writeBundle.format(SerializationFormat.Json).save(bundle)(sbc).get
}
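
Because the zip export fails when the target file already exists, re-running the paragraph needs a cleanup step first. One possible approach, shown here as a sketch, is a small helper that deletes a stale bundle and makes sure the parent directory is present. The helper name prepareBundlePath is hypothetical and not part of MLeap; it uses only java.nio.file from the JDK.

```scala
import java.nio.file.{Files, Paths}

// Hypothetical helper (not part of MLeap): clears the way for a fresh export.
// Returns true when an old bundle file was found and deleted.
def prepareBundlePath(path: String): Boolean = {
  val target = Paths.get(path)
  // Create the parent directory if it is missing (no-op when it already exists).
  Option(target.getParent).foreach(parent => Files.createDirectories(parent))
  // Delete the previous bundle file, if any.
  Files.deleteIfExists(target)
}

// Run this before the zip serialization step.
prepareBundlePath("/tmp/MLeapModels/spark-nlp-tfidf-pipeline.zip")
```

Note that the helper takes a plain file system path, not the jar:file: URI that is passed to BundleFile.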


Step 4 - Import MLeap Bundle (Optional)

An optional step is to load the converted MLeap bundle back in to verify the export. In the following section of code, the MLeap bundle is loaded from the zip file using the MLeap runtime.

%spark2
//In Zeppelin

import ml.combust.bundle.BundleFile
import ml.combust.mleap.runtime.MleapSupport._
import resource._

// Deserialize a zip bundle
val bundle = (for(bundleFile <- managed(BundleFile("jar:file:/tmp/MLeapModels/spark-nlp-tfidf-pipeline.zip"))) yield { bundleFile.loadMleapBundle().get}).opt.get
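
Once loaded, the bundle can score data through the MLeap runtime without a Spark context. The sketch below follows the pattern in the MLeap runtime documentation. It assumes the pipeline's input is a single string column named "abstract", which is a hypothetical name; the LeapFrame schema has to match whatever input column the original Spark pipeline used. The sample text is likewise made up for illustration.

```scala
import ml.combust.mleap.core.types._
import ml.combust.mleap.runtime.frame.{DefaultLeapFrame, Row}

// Assumed input schema: one string column. "abstract" is a hypothetical name.
val leapSchema = StructType(StructField("abstract", ScalarType.String)).get

// A single made-up row to score.
val leapData = Seq(Row("MLeap serializes Spark pipelines for use outside Spark"))
val leapFrame = DefaultLeapFrame(leapSchema, leapData)

// The root of the bundle is the deserialized pipeline transformer.
val mleapPipeline = bundle.root

// transform returns a Try; .get rethrows any failure (for example, a missing column).
val scored = mleapPipeline.transform(leapFrame).get

// Inspect the output columns and rows.
println(scored.schema.fields.map(_.name).mkString(", "))
scored.dataset.foreach(println)
```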
