Member since 11-28-2016 | 15 Posts | 43 Kudos Received | 0 Solutions
08-07-2023
07:09 AM
@ibrooks Excellent article here. I will definitely be using this in my CML demos going forward. I just sent a PR with some flow improvements you may like to use as well.
12-20-2018
07:21 PM
4 Kudos
This article walks through the steps needed to convert an existing Spark pipeline model into an MLeap bundle. To follow the steps required to build the pipeline model used in the tutorial, please download the complete Apache Zeppelin notebook here on my GitHub site.
Tools Used In Tutorial
Spark 2.3
MLeap Spark 0.13
Step 1 - Load MLeap Spark Dependencies This dependency is required to convert Spark models into MLeap bundles. In the following section of code, the dependency is added to my Zeppelin notebook. Keep in mind this artifact needs to be available in a Maven repository. %dep
//In Zeppelin
//Run this before initializing Spark Context
z.load("ml.combust.mleap:mleap-spark_2.11:0.13.0")
Step 2 - Build Pipeline Model The Spark pipeline model needs to be built before it can be converted. In this section of code, a Spark pipeline model is built from previously defined stages (a sketch of those stage definitions follows the code below). To view the complete code listing, please download the notebook from my GitHub site. The pipeline model is produced once the pipeline is fitted against a Spark dataframe. %spark2
//In Zeppelin
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.StructType
// Build pipeline from previously defined stages
val pipeline = new Pipeline().setStages(Array(TFtokenizer, hashingTF, TFIDF))
// Fit pipeline on a dataframe
val PipeLineModel = pipeline.fit(df_abstracts)
// Save Pipeline to Disk (Optional)
PipeLineModel.write.overwrite().save("/tmp/spark-pipelineModel-tfidf")
// Save the dataframe's schema
val schema = df_abstracts.schema
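For reference, the stages used above (TFtokenizer, hashingTF, TFIDF) are defined earlier in the full notebook. A minimal sketch of what they might look like is below; the column names and feature count here are assumptions for illustration, not the notebook's exact values. %spark2
//In Zeppelin
//Hypothetical stage definitions -- see the notebook for the actual ones
import org.apache.spark.ml.feature.{Tokenizer, HashingTF, IDF}
// Split the abstract text into tokens (assumes an input column named "abstract")
val TFtokenizer = new Tokenizer().setInputCol("abstract").setOutputCol("words")
// Hash the tokens into a fixed-length term-frequency vector
val hashingTF = new HashingTF().setInputCol("words").setOutputCol("rawFeatures").setNumFeatures(20000)
// Rescale term frequencies by inverse document frequency
val TFIDF = new IDF().setInputCol("rawFeatures").setOutputCol("features")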
Step 3 - Export Spark Pipeline Model Into MLeap Bundle (Zip and Directory) Using the MLeap libraries, the Spark pipeline model can be converted into an MLeap bundle. In the following section of code, the Spark pipeline model is converted into an MLeap bundle. I have included code for both zip and directory serialization. Check the file system to see if the files were created (a quick check is sketched after the code below). %spark2
//In Zeppelin
import ml.combust.bundle.BundleFile
import ml.combust.bundle.serializer.SerializationFormat
import org.apache.spark.ml.mleap.SparkUtil
import ml.combust.mleap.spark.SparkSupport._
import org.apache.spark.ml.bundle._
import resource._
//Init Spark Bundle Context to MLeap
val sbc = SparkBundleContext().withDataset(PipeLineModel.transform(df_abstracts))
// Serialize Pipeline to Zip
// *** This will fail if the file already exists ***
(for(bundlefile <- managed(BundleFile("jar:file:/tmp/MLeapModels/spark-nlp-tfidf-pipeline.zip"))) yield {PipeLineModel.writeBundle.save(bundlefile)(sbc).get}).tried.get
//Serialize Pipeline to Directory
for(bundle <- managed(BundleFile("file:/tmp/MLeapModels/spark-nlp-tfidf-pipeline-dir"))) {PipeLineModel.writeBundle.format(SerializationFormat.Json).save(bundle)(sbc)}
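To confirm the serialization worked, a quick sanity check on the file system might look like the following; the paths simply mirror the locations used above. %spark2
//In Zeppelin
//Optional: verify the MLeap artifacts were written to /tmp/MLeapModels
import java.nio.file.{Files, Paths}
Files.exists(Paths.get("/tmp/MLeapModels/spark-nlp-tfidf-pipeline.zip"))
Files.exists(Paths.get("/tmp/MLeapModels/spark-nlp-tfidf-pipeline-dir"))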
Step 4 - Import MLeap Bundle (Optional) An optional step is to load the converted MLeap bundle back in. In the following section of code, the MLeap bundle is loaded from file (a sketch of scoring with the loaded bundle follows below). %spark2
//In Zeppelin
import ml.combust.bundle.BundleFile
import ml.combust.mleap.runtime.MleapSupport._
import resource._
// Deserialize a zip bundle
val bundle = (for(bundleFile <- managed(BundleFile("jar:file:/tmp/MLeapModels/spark-nlp-tfidf-pipeline.zip"))) yield { bundleFile.loadMleapBundle().get}).opt.get
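Once the bundle is deserialized, its root transformer can score data with the MLeap runtime, outside of Spark. The sketch below assumes the pipeline's input column is named "abstract"; adjust it to match whatever column your tokenizer was actually trained on. %spark2
//In Zeppelin
//Score a single record with the MLeap runtime (no SparkContext required)
import ml.combust.mleap.core.types.{ScalarType, StructField, StructType}
import ml.combust.mleap.runtime.frame.{DefaultLeapFrame, Row}
// Input schema must match the pipeline's expected input column ("abstract" is an assumption)
val leapSchema = StructType(StructField("abstract", ScalarType.String)).get
val leapFrame = DefaultLeapFrame(leapSchema, Seq(Row("sample abstract text to score")))
// bundle.root is the deserialized pipeline; transform returns a Try[DefaultLeapFrame]
val scored = bundle.root.transform(leapFrame).get
scored.dataset.foreach(println)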
08-03-2018
06:54 PM
6 Kudos
This tutorial is designed to extend my other work, and the goal is to provide the reader a framework for a complete end-to-end solution for sentiment analysis on streaming Twitter data. The solution uses many different components of the HDF/HDP stack, including NiFi, Solr, Spark, and Zeppelin, and it also utilizes libraries available in the open source community. I have included the Reference Architecture, which depicts the order of events for the solution.
*Note: Before starting this tutorial, the reader would be best served by reading my other tutorials listed below:
Twitter Sentiment using Spark Core NLP in Apache Zeppelin
Connecting Solr to Spark - Apache Zeppelin Notebook
*Prerequisites
Established Apache Solr collections called "Tweets" and "TweetSentiment"
Downloaded and installed OpenScoring
Completed this tutorial: Build and Convert a Spark NLP Pipeline into PMML in Apache Zeppelin
Step 1 - Download and deploy the NiFi flow. The NiFi flow can be found here: tweetsentimentflow.xml
Step 2 - In the NiFi flow, configure the "Get Garden Hose" NiFi processor with your valid Twitter API credentials.
Consumer Key
Consumer Secret
Access Token
Access Token Secret
Step 3 - In the NiFi flow, configure both of the "PutSolrContentStream" NiFi processors with the location of your Solr instance. In this example, the Solr instance is located on the same host.
Step 4 - From the command line, start the OpenScoring server. Please note, OpenScoring was downloaded to the /tmp directory in this example. $ java -jar /tmp/openscoring/openscoring-server/target/openscoring-server-executable-1.4-SNAPSHOT.jar --port 3333
Step 5 - Start building a corpus of Tweets, which will be used to train the Spark pipeline model in the following steps. In the NiFi flow, turn on the processors that will move the raw Tweets into Solr.
Step 6 - In the NiFi flow, make sure the "Get File" processor remains turned off until the model has been trained and deployed.
Step 7 - Validate the flow is working as expected by querying the Tweets Solr collection in the Solr UI.
Step 8 - Follow the tutorial Build and Convert a Spark NLP Pipeline into PMML in Apache Zeppelin and save the PMML object as TweetPipeline.pmml to the same host that is running OpenScoring.
Step 9 - Use OpenScoring to deploy the PMML model based on the Spark pipeline. $ curl -X PUT --data-binary @TweetPipeline.pmml -H "Content-type: text/xml" http://localhost:3333/openscoring/model/TweetPipe
Step 10 - In the NiFi flow, configure the "InvokeHTTP" NiFi processor with the host and location of the OpenScoring API endpoint.
Step 11 - In the NiFi flow, enable all of the processors, and validate the flow is working as expected by querying the TweetSentiment Solr collection in the Solr UI.
10-31-2018
12:13 AM
Hi Ian! Thanks for posting this article. At the "pmml.buildFile" I'm getting the following error: java.lang.IllegalArgumentException: iperbole_ Any ideas? Thank you very much!
10-29-2018
07:19 PM
Ian, thanks so much for such a great article. I'm getting the following error when trying to run step 9 and set "df_TweetSentiment": "error: not found: value sentiment". Could you help me with that? Many thanks!
05-23-2018
03:52 PM
4 Kudos
This article is designed to extend the great work by @Ali Bajwa: Sample HDF/NiFi flow to Push Tweets into Solr/Banana, HDFS/Hive
I have included the complete notebook on my Github site, which can be found here
Step 1 - Follow Ali's tutorial to establish an Apache Solr collection called "tweets"
Step 2 - Verify the version of Apache Spark being used, and visit the Solr-Spark connector site. The key is to match the version of Spark with the version of the Solr-Spark connector. In the example below, the version of Spark is 2.2.0, and the connector version is 3.4.4
%spark2
sc
sc.version
res0: org.apache.spark.SparkContext = org.apache.spark.SparkContext@617d134a
res1: String = 2.2.0.2.6.4.0-91
Step 3 - Include the Solr-Spark dependency in Zeppelin. Important note: This needs to be run before the Spark Context has been initialized. %dep
z.load("com.lucidworks.spark:spark-solr:jar:3.4.4")
//Must be used before SparkInterpreter (%spark2) initialized
//Hint: put this paragraph before any Spark code and restart Zeppelin/Interpreter
Step 4 - Run a Solr query and return the results into a Spark DataFrame. Note: The ZooKeeper host might need to use full names: "zkhost" -> "host-1.domain.com:2181,host-2.domain.com:2181,host-3.domain.com:2181/solr", %spark2
val options = Map(
  "collection" -> "tweets",
  // "query" -> "Keyword, 'More Keywords'",
  "zkhost" -> "localhost:2181/solr"
)
val df = spark.read.format("solr").options(options).load
df.cache()
Step 5 - Review results of the Solr query %spark2
df.count()
df.printSchema()
df.take(1)
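Once the DataFrame is loaded, it can also be explored with Spark SQL. A minimal sketch follows; the view name is arbitrary, and any column names you query will depend on your Solr schema. %spark2
//In Zeppelin
//Register the Solr results as a temp view and query with Spark SQL
df.createOrReplaceTempView("tweets_view")
spark.sql("SELECT COUNT(*) AS tweet_count FROM tweets_view").show()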
03-29-2018
09:06 PM
This is awesome.
06-21-2017
09:02 AM
I've also found that a Zeppelin style hides any geoJSON boundary-type overlays. The offending rule seems to be the following; removing it in Chrome dev tools makes the overlay appear. YMMV... ..\zeppelin-web\src\app\notebook\paragraph\paragraph.css
.paragraph div svg {
width: 100%;
}
03-22-2017
07:48 PM
1 Kudo
Varun, I do not believe this functionality exists for Zeppelin at this time. You would need an additional query to complete this task.