Member since 11-28-2016 · 15 Posts · 43 Kudos Received · 0 Solutions
08-02-2023 07:47 AM · 4 Kudos
Introduction:
This article is designed to help the reader call a deployed CML model from Apache NiFi or Cloudera DataFlow as quickly as possible. It requires using one of Cloudera's Applied Machine Learning Prototypes (AMPs) on customer churn and the Apache NiFi flow provided in this article. (Git Repo Here)
Prerequisites:
An Apache NiFi instance is running.
The CML Customer Churn AMP is running, with a churn model deployed.
Download the user data file and the Apache NiFi flow from this repo.
The user data file needs to be added to a file location where NiFi's GetFile processor has read permissions.
Launch CML Churn AMP
In the AMP catalog, select Churn Modeling with scikit-learn and configure a new project. This will take 5-7 minutes to complete. You will need to deploy a model inside of CML, which is outside the scope of this article. Once you finish, you will see the following screen, which provides the URL and API key for the model.
NiFi Flow Overview:
This article includes a single customer record in JSON and an Apache NiFi data flow, which needs to be configured to call the model deployed in CML. The following screenshot displays the template once it has been added to the working canvas.
Processor Configuration:
GetFile Processor
In the GetFile processor, the input directory needs to contain the customer JSON file that is included with the article.
EvaluateJsonPath Processor
The included data flow file already has this processor configured, but this step is essential to understand. The processor extracts values from the JSON file and adds them as FlowFile attributes. Once these values are attributes, they can be used to call the model in the next step.
InvokeHTTP Processor
You will need to add the URL of the model deployed in CML, which is pictured above. You will also need to include the API key with the model call, as shown in the image below.
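For readers curious about what InvokeHTTP is doing under the hood, here is a rough, hedged sketch in Scala of an equivalent call. The endpoint URL, access key, and feature names are placeholders rather than values from the AMP; CML model endpoints typically accept a JSON body containing an accessKey and a request object with the model's expected features.
// Hedged sketch only: URL, access key, and feature names below are placeholders.
import java.net.{HttpURLConnection, URL}
import java.nio.charset.StandardCharsets

val endpoint = new URL("https://modelservice.your-cml-domain.example.com/model") // placeholder URL
val payload =
  """{"accessKey": "YOUR_MODEL_ACCESS_KEY",
    | "request": {"MonthlyCharges": 70.35, "PhoneService": "No", "Contract": "Month-to-month"}}""".stripMargin

val conn = endpoint.openConnection().asInstanceOf[HttpURLConnection]
conn.setRequestMethod("POST")
conn.setRequestProperty("Content-Type", "application/json")
conn.setDoOutput(true)
conn.getOutputStream.write(payload.getBytes(StandardCharsets.UTF_8))
println(s"HTTP ${conn.getResponseCode}")
println(scala.io.Source.fromInputStream(conn.getInputStream).mkString)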
Successful Results
Once the processors have been configured, the model can be called and the results can be displayed by opening the queue and selecting the response value.
Here is an example of the deployed model's response.
12-20-2018 07:21 PM · 4 Kudos
This article is designed to walk through the steps needed to convert an existing Spark pipeline model into an MLeap bundle. To follow the steps required to build the pipeline model used in this tutorial, please download the complete Apache Zeppelin notebook here on my GitHub site.
Tools Used In Tutorial
Spark 2.3
MLeap Spark 0.13
Step 1 - Load MLeap Spark Dependencies. This dependency is required to convert Spark models into MLeap bundles. In the following section of code, the dependency is added to my Zeppelin notebook. Keep in mind this artifact needs to be available on Maven.
%dep
//In Zeppelin
//Run this before initializing Spark Context
z.load("ml.combust.mleap:mleap-spark_2.11:0.13.0")
Step 2 - Build the Pipeline Model. The Spark pipeline model needs to be built before it can be converted. In this section of code, a Spark pipeline model is built from previously defined stages (a hedged sketch of how those stages might look follows the code). To view the complete code listing, please download the notebook from my GitHub site. A pipeline model is produced once the pipeline is fitted against a Spark DataFrame.
%spark2
//In Zeppelin
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.StructType
// Build pipeline from previously defined stages
val pipeline = new Pipeline().setStages(Array(TFtokenizer, hashingTF, TFIDF))
// Fit pipeline on a dataframe
val PipeLineModel = pipeline.fit(df_abstracts)
// Save Pipeline to Disk (Optional)
PipeLineModel.write.overwrite().save("/tmp/spark-pipelineModel-tfidf")
// Save the dataframe's schema
val schema = df_abstracts.schema
Step 3 - Export the Spark Pipeline Model into an MLeap Bundle (Zip and Directory). Using the MLeap libraries, the Spark pipeline model can be converted into an MLeap bundle. In the following section of code, the Spark pipeline model is converted into an MLeap bundle; I have included code for both zip and directory serialization. Check the file system to see if the files were created.
%spark2
//In Zeppelin
import ml.combust.bundle.BundleFile
import ml.combust.bundle.serializer.SerializationFormat
import org.apache.spark.ml.mleap.SparkUtil
import ml.combust.mleap.spark.SparkSupport._
import org.apache.spark.ml.bundle._
import resource._
//Init Spark Bundle Context to MLeap
val sbc = SparkBundleContext().withDataset(PipeLineModel.transform(df_abstracts))
// Serialize Pipeline to Zip
// *** This will fail if the file already exists ***
(for(bundlefile <- managed(BundleFile("jar:file:/tmp/MLeapModels/spark-nlp-tfidf-pipeline.zip"))) yield {PipeLineModel.writeBundle.save(bundlefile)(sbc).get}).tried.get
//Serialize Pipeline to Directory
for(bundle <- managed(BundleFile("file:/tmp/MLeapModels/spark-nlp-tfidf-pipeline-dir"))) {PipeLineModel.writeBundle.format(SerializationFormat.Json).sav(bundle)}
Step 4 - Import the MLeap Bundle (Optional). An optional step is to load the converted MLeap bundle back. In the following section of code, the MLeap bundle is loaded from the zip file (a hedged scoring sketch follows the code).
%spark2
//In Zeppelin
import ml.combust.bundle.BundleFile
import ml.combust.mleap.runtime.MleapSupport._
import resource._
// Deserialize a zip bundle
val bundle = (for(bundleFile <- managed(BundleFile("jar:file:/tmp/MLeapModels/spark-nlp-tfidf-pipeline.zip"))) yield { bundleFile.loadMleapBundle().get}).opt.get
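Once the bundle is loaded, it can score data with the MLeap runtime, outside of Spark. The following is a hedged sketch, not from the original notebook: the field name ("abstract") mirrors the assumption above, and the exact runtime frame API can vary between MLeap versions.
%spark2
//In Zeppelin
// Hedged sketch: score one document with the deserialized bundle using the MLeap runtime.
import ml.combust.mleap.core.types.{ScalarType, StructField, StructType}
import ml.combust.mleap.runtime.frame.{DefaultLeapFrame, Row}

val mleapPipeline = bundle.root // the transformer contained in the bundle
val leapSchema = StructType(StructField("abstract", ScalarType.String)).get // assumed field name
val leapFrame = DefaultLeapFrame(leapSchema, Seq(Row("spark pipelines exported to mleap")))
val scored = mleapPipeline.transform(leapFrame).get
scored.dataset.foreach(println)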
08-03-2018 06:54 PM · 6 Kudos
This tutorial is designed to extend my other work, and the goal is to provide the reader a framework for a complete end-to-end solution for sentiment analysis on streaming Twitter data. The solution uses many different components of the HDF/HDP stack, including NiFi, Solr, Spark, and Zeppelin, and it also utilizes libraries available in the open source community. I have included the reference architecture, which depicts the order of events for the solution.
*Note: Before starting this tutorial, the reader would be best served by reading my other tutorials listed below:
Twitter Sentiment using Spark Core NLP in Apache Zeppelin
Connecting Solr to Spark - Apache Zeppelin Notebook
*Prerequisites
Established Apache Solr collections called "Tweets" and "TweetSentiment"
Downloaded and installed OpenScoring
Completed this tutorial: Build and Convert a Spark NLP Pipeline into PMML in Apache Zeppelin
Step 1 - Download and deploy the NiFi flow. The NiFi flow can be found here: tweetsentimentflow.xml
Step 2 - In the NiFi flow, configure the "Get Garden Hose" NiFi processor with your valid Twitter API credentials:
Consumer Key
Consumer Secret
Access Token
Access Token Secret
Step 3 - In the NiFi flow, configure both of the "PutSolrContentStream" NiFi processors with the location of your Solr instance. In this example, the Solr instance is located on the same host.
Step 4 - From the command line, start the OpenScoring server. Please note, OpenScoring was downloaded to the /tmp directory in this example.
$ java -jar /tmp/openscoring/openscoring-server/target/openscoring-server-executable-1.4-SNAPSHOT.jar --port 3333
Step 5 - Start building the corpus of Tweets that will be used to train the Spark pipeline model in the following steps. In the NiFi flow, turn on the processors that move the raw Tweets into Solr.
Step 6 - In the NiFi flow, make sure the "Get File" processor remains turned off until the model has been trained and deployed.
Step 7 - Validate the flow is working as expected by querying the Tweets Solr collection in the Solr UI.
Step 8 - Follow the tutorial Build and Convert a Spark NLP Pipeline into PMML in Apache Zeppelin and save the PMML object as TweetPipeline.pmml on the same host that is running OpenScoring.
Step 9 - Use OpenScoring to deploy the PMML model based on the Spark pipeline.
$ curl -X PUT --data-binary @TweetPipeline.pmml -H "Content-type: text/xml" http://localhost:3333/openscoring/model/TweetPipe
Step 10 - In the NiFi flow, configure the "InvokeHTTP" NiFi processor with the host and location of the OpenScoring API endpoint.
Step 11 - In the NiFi flow, enable all of the processors, and validate the flow is working as expected by querying the TweetSentiment Solr collection in the Solr UI.
08-02-2018 04:13 PM · 7 Kudos
This article is designed to extend my articles Twitter Sentiment using Spark Core NLP in Apache Zeppelin and Connecting Solr to Spark - Apache Zeppelin Notebook.
I have included the complete notebook on my GitHub site.
Step 1 - Follow the tutorials in the articles provided above, and establish an Apache Solr collection called "tweets".
Step 2 - Verify the version of Apache Spark being used, and visit the Solr-Spark connector site. The key is to match the version of Spark to the version of the Solr-Spark connector. In the example below, the version of Spark is 2.2.0, and the connector version is 3.4.4.
%spark2
sc
sc.version
Step 3 - Include the Solr-Spark dependency in Zeppelin. Important note: This needs to be run before the Spark Context has been initialized.
%dep
z.load("com.lucidworks.spark:spark-solr:jar:3.4.4")
//Must be used before SparkInterpreter (%spark2) initialized
//Hint: put this paragraph before any Spark code and restart Zeppelin/Interpreter
Step 4 - Download the Stanford CoreNLP libraries found here: http://nlp.stanford.edu/software/stanford-corenlp-full-2018-02-27.zip. Unzip the download and move it to the /tmp directory. Note: This can be accomplished on the command line, or the following Zeppelin paragraph will work as well.
%sh
wget -O /tmp/stanford-corenlp-full-2018-02-27.zip http://nlp.stanford.edu/software/stanford-corenlp-full-2018-02-27.zip
unzip /tmp/stanford-corenlp-full-2018-02-27.zip -d /tmp
Step 5 - In Zeppelin's interpreter configuration for Spark, include the following artifact:
/tmp/stanford-corenlp-full-2018-02-27/stanford-corenlp-3.9.1-models.jar
Step 6 - Include the following Spark dependencies for Stanford CoreNLP and Spark CoreNLP. Important note: This needs to be run before the Spark Context has been initialized.
%dep
z.load("edu.stanford.nlp:stanford-corenlp:3.9.1")
//In Spark Interpreter settings, add the following artifact
// /tmp/stanford-corenlp-full-2018-02-27/stanford-corenlp-3.9.1-models.jar
%dep
z.load("databricks:spark-corenlp:0.2.0-s_2.11")
Step 7 - Include the following Spark dependencies for JPMML-SparkML and JPMML-Model. Important note: This needs to be run before the Spark Context has been initialized.
%dep
z.load("org.jpmml:jpmml-sparkml:1.4.5")
%dep
z.load("org.jpmml:pmml-model:1.4.5")
Step 8 - Run a Solr query and return the results into a Spark DataFrame. Note: The Zookeeper host might need to use full names, e.g. "zkhost" -> "host-1.domain.com:2181,host-2.domain.com:2181,host-3.domain.com:2181/solr".
%spark2
val options = Map(
  "collection" -> "Tweets",
  "zkhost" -> "localhost:2181/solr"
  // "query" -> "Keyword, 'More Keywords'"
)
val df = spark.read.format("solr").options(options).load
df.cache()
Step 9 - Review the results of the Solr query.
%spark2
df.count()
df.printSchema()
df.take(1)
Step 10 - Filter the Tweets in the Spark DataFrame to ensure the Tweet text isn't null. Once the filter has been completed, add the sentiment value to the Tweets.
%spark2
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import com.databricks.spark.corenlp.functions._
val df_TweetSentiment = df.filter("text_t is not null").select($"text_t", sentiment($"text_t").as('sentimentScore))
Step 11 - Validate the results.
%spark2
df_TweetSentiment.cache()
df_TweetSentiment.printSchema()
df_TweetSentiment.take(1)
df_TweetSentiment.count()
Step 12 - Build the stages that will create the features fed into a Logistic Regression model for classification.
Stage 1 - Regex Tokenizer will be used to separate each word into individual "tokens"
Stage 2 - Count Vectorizer will count the number of times each token occurs in the text corpus
Stage 3 - Term frequency-inverse document frequency (TF-IDF) is a feature vectorization method widely used in text mining to reflect the importance of a term to a document in the corpus
Stage 4 - Logistic Regression for classification to predict the sentiment score
%spark2
import org.apache.spark.ml.feature.{HashingTF, IDF, Tokenizer, RegexTokenizer, CountVectorizer, CountVectorizerModel}
import org.apache.spark.ml.classification.LogisticRegression
val tokenizer = new RegexTokenizer()
.setInputCol("text_t")
.setOutputCol("words")
.setPattern("\\W+")
.setGaps(true)
val wordsData = tokenizer.transform(df_TweetSentiment)
val cvModel = new CountVectorizer()
.setInputCol("words").setOutputCol("rawFeatures")
.setMinDF(4)
.fit(wordsData)
val featurizedData = cvModel.transform(wordsData)
val idf = new IDF()
.setInputCol("rawFeatures")
.setOutputCol("features")
val idfModel = idf.fit(featurizedData)
val rescaledData = idfModel.transform(featurizedData)
rescaledData.select("sentimentScore", "features").show()
val lr = new LogisticRegression()
.setMaxIter(10)
.setRegParam(0.001)
.setLabelCol("sentimentScore")
Step 13 - Build the Spark pipeline from the stages.
%spark2
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.StructType
val pipeline = new Pipeline()
.setStages(Array(tokenizer, cvModel, idfModel, lr))
val PipeLineModel = pipeline.fit(df_TweetSentiment)
//Save Pipeline to Disk (Optional)
PipeLineModel.write.overwrite().save("/tmp/spark-IDF-model")
val schema = df_TweetSentiment.schema
Step 14 - Export the Spark pipeline to PMML using JPMML-SparkML.
%spark2
import org.jpmml.sparkml.PMMLBuilder
import java.io.File
val pmml = new PMMLBuilder(schema, PipeLineModel)
val file = pmml.buildFile(new File("/tmp/TweetPipeline.pmml"))
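As an optional sanity check (not part of the original notebook), the exported file can be read back with JPMML-Model to confirm it parses and contains a model. This is a hedged sketch and assumes the pmml-model dependency loaded in Step 7.
%spark2
// Hedged sketch: re-read the exported PMML and confirm it contains a model.
import java.io.FileInputStream
import org.jpmml.model.PMMLUtil

val pmmlDoc = PMMLUtil.unmarshal(new FileInputStream("/tmp/TweetPipeline.pmml"))
println(s"PMML version: ${pmmlDoc.getVersion}, number of models: ${pmmlDoc.getModels.size}")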
05-23-2018 06:00 PM
Something like this will do the trick, assuming the column name is timestamp_s. It will create a new DataFrame with a new timestamp column added to it. The new timestamp format is defined by this string: "EEE MMM dd HH:mm:ss ZZZZZ yyyy".
import org.apache.spark.sql.functions.unix_timestamp
import org.apache.spark.sql.types.TimestampType
val df_second = df_First.withColumn("timestampZ", unix_timestamp($"timestamp_s", "EEE MMM dd HH:mm:ss ZZZZZ yyyy").cast(TimestampType)).drop($"timestamp_s")
05-23-2018 03:58 PM · 6 Kudos
This article is designed to extend the great work by @Ali Bajwa: Sample HDF/NiFi flow to Push Tweets into Solr/Banana, HDFS/Hive, and my article Connecting Solr to Spark - Apache Zeppelin Notebook.
I have included the complete notebook on my GitHub site.
Step 1 - Follow Ali's tutorial to establish an Apache Solr collection called "tweets"
Step 2 - Verify the version of Apache Spark being used, and visit the Solr-Spark connector site. The key is to match the version of Spark to the version of the Solr-Spark connector. In the example below, the version of Spark is 2.2.0, and the connector version is 3.4.4.
%spark2
sc
sc.version
res0: org.apache.spark.SparkContext = org.apache.spark.SparkContext@617d134a
res1: String = 2.2.0.2.6.4.0-91
Step 3 - Include the Solr-Spark dependency in Zeppelin. Important note: This needs to be run before the Spark Context has been initialized.
%dep
z.load("com.lucidworks.spark:spark-solr:jar:3.4.4")
//Must be used before SparkInterpreter (%spark2) initialized
//Hint: put this paragraph before any Spark code and restart Zeppelin/Interpreter
Step 4 - Download the Stanford CoreNLP libraries found here: http://nlp.stanford.edu/software/stanford-corenlp-full-2018-02-27.zip. Unzip the download and move it to the /tmp directory. Note: This can be accomplished on the command line, or the following Zeppelin paragraph will work as well.
%sh
wget -O /tmp/stanford-corenlp-full-2018-02-27.zip http://nlp.stanford.edu/software/stanford-corenlp-full-2018-02-27.zip
unzip /tmp/stanford-corenlp-full-2018-02-27.zip -d /tmp
Step 5 - In Zeppelin's interpreter configuration for Spark, include the following artifact:
/tmp/stanford-corenlp-full-2018-02-27/stanford-corenlp-3.9.1-models.jar
Step 6 - Include the following Spark dependencies for Stanford CoreNLP and Spark CoreNLP. Important note: This needs to be run before the Spark Context has been initialized.
%dep
z.load("edu.stanford.nlp:stanford-corenlp:3.9.1")
//In Spark Interpreter settings, add the following artifact
// /tmp/stanford-corenlp-full-2018-02-27/stanford-corenlp-3.9.1-models.jar
%dep
z.load("databricks:spark-corenlp:0.2.0-s_2.11")
Step 7 - Run a Solr query and return the results into a Spark DataFrame. Note: The Zookeeper host might need to use full names, e.g. "zkhost" -> "host-1.domain.com:2181,host-2.domain.com:2181,host-3.domain.com:2181/solr".
%spark2
val options = Map(
"collection" -> "Tweets",
"zkhost" -> "localhost:2181/solr",
// "query" -> "Keyword, 'More Keywords'"
)
val df = spark.read.format("solr").options(options).load
df.cache()
Step 8 - Review the results of the Solr query.
%spark2
df.count()
df.printSchema()
df.take(1)
Step 9 - Filter the Tweets in the Spark DataFrame to ensure the Tweet text and timestamp aren't null and the language is English. Once the filter has been completed, add the sentiment value to the Tweets.
%spark2
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import com.databricks.spark.corenlp.functions._
val df_TweetSentiment = df.filter("text_t is not null and language_s = 'en' and timestamp_s is not null ").select($"timestamp_s", $"text_t", $"location", sentiment($"text_t").as('sentimentScore))
Step 10 - Correctly cast the timestamp value.
%spark2
val df_TweetSentiment1 = df_TweetSentiment.withColumn("timestampZ", unix_timestamp($"timestamp_s", "EEE MMM dd HH:mm:ss ZZZZZ yyyy").cast(TimestampType)).drop($"timestamp_s")
Step 11 - Validate the results and create the temporary table TweetSentiment.
%spark2
df_TweetSentiment1.printSchema()
df_TweetSentiment1.take(1)
df_TweetSentiment1.count()
df_TweetSentiment1.cache()
df_TweetSentiment1.createOrReplaceTempView("TweetSentiment")
Step 12 - Query the table TweetSentiment.
%sql
select sentimentScore, count(sentimentScore) from TweetSentiment group by sentimentScore
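For readers who prefer to stay in Scala, the same aggregation can also be expressed with the DataFrame API. This is a small illustrative addition, not part of the original notebook.
%spark2
// Equivalent of the %sql paragraph above, using the DataFrame API
df_TweetSentiment1.groupBy("sentimentScore").count().show()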
05-23-2018 03:52 PM · 4 Kudos
This article is designed to extend the great work by @Ali Bajwa: Sample HDF/NiFi flow to Push Tweets into Solr/Banana, HDFS/Hive.
I have included the complete notebook on my GitHub site, which can be found here.
Step 1 - Follow Ali's tutorial to establish an Apache Solr collection called "tweets"
Step 2 - Verify the version of Apache Spark being used, and visit the Solr-Spark connector site. The key is to match the version of Spark to the version of the Solr-Spark connector. In the example below, the version of Spark is 2.2.0, and the connector version is 3.4.4.
%spark2
sc
sc.version
res0: org.apache.spark.SparkContext = org.apache.spark.SparkContext@617d134a
res1: String = 2.2.0.2.6.4.0-91
Step 3 - Include the Solr-Spark dependency in Zeppelin. Important note: This needs to be run before the Spark Context has been initialized.
%dep
z.load("com.lucidworks.spark:spark-solr:jar:3.4.4")
//Must be used before SparkInterpreter (%spark2) initialized
//Hint: put this paragraph before any Spark code and restart Zeppelin/Interpreter
Step 4 - Run a Solr query and return the results into a Spark DataFrame. Note: The Zookeeper host might need to use full names, e.g. "zkhost" -> "host-1.domain.com:2181,host-2.domain.com:2181,host-3.domain.com:2181/solr".
%spark2
val options = Map(
"collection" -> "Tweets",
"zkhost" -> "localhost:2181/solr",
// "query" -> "Keyword, 'More Keywords'"
)
val df = spark.read.format("solr").options(options).load
df.cache()
Step 5 - Review the results of the Solr query.
%spark2
df.count()
df.printSchema()
df.take(1)
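As a small follow-on illustration (not part of the original article), once the collection is loaded as a DataFrame, standard Spark operations apply. The field names used here (text_t, language_s) are assumptions based on the Tweet schema from the companion tutorials.
%spark2
// Hedged example: field names text_t and language_s are assumptions
val englishTweets = df.filter("language_s = 'en' and text_t is not null").select("text_t", "language_s")
englishTweets.show(5, false)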
03-24-2017 06:00 PM
I haven't tried that yet. At this point, I have markers from an existing file working.