Created on 05-23-2018 03:58 PM - edited 08-17-2019 07:20 AM
This article extends the great work by @Ali Bajwa, "Sample HDF/NiFi flow to Push Tweets into Solr/Banana, HDFS/Hive", and my own article, "Connecting Solr to Spark - Apache Zeppelin Notebook".
The complete notebook is available on my GitHub site.
Step 1 - Follow Ali's tutorial to establish an Apache Solr collection called "tweets"
Step 2 - Verify the version of Apache Spark being used, and visit the Solr-Spark connector site. The key is to match the version of Spark to the version of the Solr-Spark connector. In the example below, the Spark version is 2.2.0 and the connector version is 3.4.4.
%spark2
sc
sc.version
res0: org.apache.spark.SparkContext = org.apache.spark.SparkContext@617d134a
res1: String = 2.2.0.2.6.4.0-91
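The version string above carries a vendor build suffix after the upstream Spark version. As a minimal sketch, assuming the first three dot-separated fields are the upstream release (the names `reported` and `upstream` are illustrative only), the part to compare against the connector's compatibility notes can be extracted like this:

```scala
// Version string as reported by sc.version in the example above.
// In a notebook you could use sc.version directly instead of a literal.
val reported = "2.2.0.2.6.4.0-91"

// Assumption: the first three dot-separated fields are the upstream
// Spark release (major.minor.patch); the rest is the vendor build number.
val upstream = reported.split('.').take(3).mkString(".")

println(s"Upstream Spark version: $upstream")
```

The snippet uses only the Scala standard library, so it should also run inside a %spark2 paragraph.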
Step 3 - Include the Solr-Spark dependency in Zeppelin. Important note: This needs to be run before the Spark Context has been initialized.
%dep
z.load("com.lucidworks.spark:spark-solr:jar:3.4.4")
//Must be used before SparkInterpreter (%spark2) initialized
//Hint: put this paragraph before any Spark code and restart Zeppelin/Interpreter
Step 4 - Download the Stanford CoreNLP libraries from http://nlp.stanford.edu/software/stanford-corenlp-full-2018-02-27.zip , then unzip the download into the /tmp directory. Note: This can be done on the command line, or with the following Zeppelin paragraph.
%sh
wget -O /tmp/stanford-corenlp-full-2018-02-27.zip http://nlp.stanford.edu/software/stanford-corenlp-full-2018-02-27.zip
unzip /tmp/stanford-corenlp-full-2018-02-27.zip -d /tmp
Step 5 - In Zeppelin's interpreter settings for Spark, add the following artifact: /tmp/stanford-corenlp-full-2018-02-27/stanford-corenlp-3.9.1-models.jar
Step 6 - Include the following Spark dependencies for Stanford CoreNLP and Spark CoreNLP. Important note: This needs to be run before the Spark Context has been initialized.
%dep
z.load("edu.stanford.nlp:stanford-corenlp:3.9.1")
//In the Spark interpreter settings, add the following artifact
// /tmp/stanford-corenlp-full-2018-02-27/stanford-corenlp-3.9.1-models.jar
%dep
z.load("databricks:spark-corenlp:0.2.0-s_2.11")
Step 7 - Run a Solr query and return the results as a Spark DataFrame. Note: The ZooKeeper hosts might need to be given as fully qualified host names, for example:
"zkhost" -> "host-1.domain.com:2181,host-2.domain.com:2181,host-3.domain.com:2181/solr",
%spark2
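val options = Map(
  "collection" -> "Tweets", // must match the name of the collection created in Step 1 exactly
  "zkhost" -> "localhost:2181/solr"
  // To restrict the results, add a comma after the zkhost entry and uncomment:
  // "query" -> "Keyword, 'More Keywords'"
)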
val df = spark.read.format("solr").options(options).load
df.cache()
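For a multi-node ZooKeeper ensemble, the zkhost value can be assembled from a list of hosts rather than typed by hand. The following is a minimal sketch, assuming the placeholder host names from the note above, the default port 2181, and the /solr chroot used in this article; `zkNodes`, `zkPort`, and `zkChroot` are hypothetical names introduced only for illustration.

```scala
// Placeholder ZooKeeper hosts; replace with your own fully qualified names.
val zkNodes = Seq("host-1.domain.com", "host-2.domain.com", "host-3.domain.com")
val zkPort = 2181       // assumed client port
val zkChroot = "/solr"  // chroot under which Solr stores its state in this article

// Join host:port pairs with commas and append the chroot once at the end.
val zkhost = zkNodes.map(node => s"$node:$zkPort").mkString(",") + zkChroot

// Options map in the same shape as the one passed to spark.read.format("solr").
val solrOptions = Map(
  "collection" -> "Tweets", // use the exact name of your collection
  "zkhost" -> zkhost
)

println(solrOptions("zkhost"))
```

The resulting map could then be passed to `.options(...)` in place of the hand-written one.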
Step 8 - Review results of the Solr query
%spark2
df.count()
df.printSchema()
df.take(1)
Step 9 - Filter the Tweets in the Spark DataFrame to keep only English-language tweets whose text and timestamp aren't null. Once the filter has been applied, add the sentiment score to each tweet.
%spark2
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import com.databricks.spark.corenlp.functions._
val df_TweetSentiment = df.filter("text_t is not null and language_s = 'en' and timestamp_s is not null").select(
  $"timestamp_s",
  $"text_t",
  $"location",
  sentiment($"text_t").as('sentimentScore))
Step 10 - Correctly cast the timestamp value
%spark2
val df_TweetSentiment1 = df_TweetSentiment.withColumn("timestampZ", unix_timestamp($"timestamp_s", "EEE MMM dd HH:mm:ss ZZZZZ yyyy").cast(TimestampType)).drop($"timestamp_s")
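The pattern string can be checked outside Spark before running the cast on the whole DataFrame. This is a minimal sketch, assuming that timestamp_s holds values in Twitter's created_at layout and that Spark 2.x interprets the pattern with java.text.SimpleDateFormat semantics; the sample value and the names `sample` and `twitterFormat` are made up for illustration.

```scala
import java.text.SimpleDateFormat
import java.time.Instant
import java.util.Locale

// Made-up sample in the layout the pattern expects (not taken from real data).
val sample = "Wed May 23 15:58:00 +0000 2018"

// Same pattern as in Step 10; Locale.US so that day and month names
// are parsed as English regardless of the JVM's default locale.
val twitterFormat = new SimpleDateFormat("EEE MMM dd HH:mm:ss ZZZZZ yyyy", Locale.US)

// unix_timestamp returns seconds since the epoch, so convert from milliseconds.
val epochSeconds = twitterFormat.parse(sample).getTime / 1000

println(Instant.ofEpochSecond(epochSeconds))
```

If the pattern did not match the data, `parse` would throw a ParseException here, whereas inside Spark a mismatch would typically surface as null values in the timestampZ column.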
Step 11 - Validate the results and create the temporary table TweetSentiment
%spark2
df_TweetSentiment1.printSchema()
df_TweetSentiment1.take(1)
df_TweetSentiment1.count()
df_TweetSentiment1.cache()
df_TweetSentiment1.createOrReplaceTempView("TweetSentiment")
Step 12 - Query the table TweetSentiment
%sql select sentimentScore, count(sentimentScore) from TweetSentiment group by sentimentScore
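The query returns counts per numeric score. To my knowledge, the spark-corenlp `sentiment` function reports a value from 0 (strong negative) to 4 (strong positive), matching the five Stanford CoreNLP sentiment classes. Under that assumption, a small helper (the name `sentimentLabel` is hypothetical) can make the scores easier to read:

```scala
// Assumption: scores follow the 0-4 scale of the CoreNLP sentiment classes.
def sentimentLabel(score: Int): String = score match {
  case 0 => "Very negative"
  case 1 => "Negative"
  case 2 => "Neutral"
  case 3 => "Positive"
  case 4 => "Very positive"
  case other => s"Unknown ($other)" // guard against unexpected values
}

// Print the full mapping for reference.
(0 to 4).foreach(score => println(s"$score -> ${sentimentLabel(score)}"))
```

Inside a %spark2 paragraph, the same function could be wrapped with `udf(sentimentLabel _)` from org.apache.spark.sql.functions and applied to the sentimentScore column.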
Created on 10-29-2018 07:19 PM
Ian, thanks so much for such a great article.
I'm getting the following error when trying to run Step 9: "error: not found: value sentiment". It happens when setting "df_TweetSentiment".
Could you help me with that?
Many thanks!