Created on 05-23-2018 03:58 PM - edited 08-17-2019 07:20 AM
This article extends the great work by @Ali Bajwa, "Sample HDF/NiFi flow to Push Tweets into Solr/Banana, HDFS/Hive", and my earlier article, "Connecting Solr to Spark - Apache Zeppelin Notebook".
The complete notebook is available on my GitHub site.
Step 1 - Follow Ali's tutorial to establish an Apache Solr collection called "tweets"
Step 2 - Verify the version of Apache Spark being used, and visit the Solr-Spark connector site. The key is to match the version of Spark to a compatible version of the Solr-Spark connector. In the example below, the version of Spark is 2.2.0, and the connector version is 3.4.4.
%spark2
sc
sc.version

res0: org.apache.spark.SparkContext = org.apache.spark.SparkContext@617d134a
res1: String = 2.2.0.2.6.4.0-91
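On HDP the string returned by sc.version is the Apache Spark version followed by the distribution build number, so 2.2.0.2.6.4.0-91 corresponds to Spark 2.2.0. The following is a minimal sketch in plain Scala for pulling out the part that matters when choosing a connector version. The names SparkVersionCheck, baseSparkVersion and majorMinor are hypothetical, and the sketch assumes the first three dot-separated fields are always the Spark version.

```scala
object SparkVersionCheck {
  // Assumption: the first three dot-separated fields are the Apache Spark
  // version; anything after them is the distribution's build suffix.
  def baseSparkVersion(full: String): String =
    full.split("\\.").take(3).mkString(".")

  // Major.minor release line, which is what connector compatibility
  // tables are usually keyed on.
  def majorMinor(full: String): String =
    full.split("\\.").take(2).mkString(".")
}
```

In a %spark2 paragraph this could be called as SparkVersionCheck.baseSparkVersion(sc.version).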
Step 3 - Include the Solr-Spark dependency in Zeppelin. Important note: This needs to be run before the Spark Context has been initialized.
%dep
z.load("com.lucidworks.spark:spark-solr:jar:3.4.4")
// Must be used before SparkInterpreter (%spark2) is initialized
// Hint: put this paragraph before any Spark code and restart Zeppelin/Interpreter
Step 4 - Download the Stanford CoreNLP libraries found here: http://nlp.stanford.edu/software/stanford-corenlp-full-2018-02-27.zip . Unzip the download into the /tmp directory. Note: This can be done on the command line, or with the following Zeppelin paragraph.
%sh
wget -O /tmp/stanford-corenlp-full-2018-02-27.zip http://nlp.stanford.edu/software/stanford-corenlp-full-2018-02-27.zip
# -d /tmp extracts into /tmp rather than the current working directory
unzip /tmp/stanford-corenlp-full-2018-02-27.zip -d /tmp
Step 5 - In Zeppelin's interpreter settings for Spark, add the following artifact: /tmp/stanford-corenlp-full-2018-02-27/stanford-corenlp-3.9.1-models.jar
Step 6 - Include the following Spark dependencies for Stanford CoreNLP and Spark CoreNLP. Important note: This needs to be run before the Spark Context has been initialized.
%dep
z.load("edu.stanford.nlp:stanford-corenlp:3.9.1")
// In the Spark interpreter settings, add the following artifact:
// /tmp/stanford-corenlp-full-2018-02-27/stanford-corenlp-3.9.1-models.jar
%dep
z.load("databricks:spark-corenlp:0.2.0-s_2.11")
Step 7 - Run a Solr query and return the results in a Spark DataFrame. Note: The ZooKeeper hosts may need to be given as fully qualified names, for example:
"zkhost" -> "host-1.domain.com:2181,host-2.domain.com:2181,host-3.domain.com:2181/solr",
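%spark2
val options = Map(
  // Must match the name of the Solr collection exactly
  "collection" -> "Tweets",
  "zkhost" -> "localhost:2181/solr"
  // Optional: to restrict the results, add a comma after the line above and uncomment:
  // "query" -> "Keyword, 'More Keywords'"
)
// Load the collection through the Solr-Spark connector and cache it for the following steps
val df = spark.read.format("solr").options(options).load
df.cache()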
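The zkhost value is a comma-separated list of host:port pairs with the Solr chroot appended once at the end. As a sketch of how that string can be assembled from a list of fully qualified host names, here is a small plain Scala helper. ZkHost and connectString are hypothetical names, and the default port 2181 and chroot /solr are taken from the examples in this step rather than from any particular cluster.

```scala
object ZkHost {
  // Joins host names into "host1:port,host2:port,.../chroot".
  // The chroot (for example "/solr") is appended once, after the last host.
  def connectString(hosts: Seq[String],
                    port: Int = 2181,
                    chroot: String = "/solr"): String = {
    require(hosts.nonEmpty, "at least one ZooKeeper host is required")
    hosts.map(h => s"$h:$port").mkString(",") + chroot
  }
}
```

The result can then be used as the value for "zkhost" in the options map, for example ZkHost.connectString(Seq("host-1.domain.com", "host-2.domain.com", "host-3.domain.com")).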
Step 8 - Review results of the Solr query
%spark2
df.count()
df.printSchema()
df.take(1)
Step 9 - Filter the tweets in the Spark DataFrame so that only English tweets with a non-null text and timestamp remain. Once the filter has been applied, add the sentiment value to the tweets.
%spark2
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import com.databricks.spark.corenlp.functions._

// Keep English tweets with text and timestamp, then score each tweet's text
val df_TweetSentiment = df.filter(
    "text_t is not null and language_s = 'en' and timestamp_s is not null"
  ).select(
    $"timestamp_s", $"text_t", $"location",
    sentiment($"text_t").as('sentimentScore)
  )
Step 10 - Correctly cast the timestamp value
%spark2
// Parse the timestamp string into a proper timestamp column and drop the original
val df_TweetSentiment1 = df_TweetSentiment.withColumn(
    "timestampZ",
    unix_timestamp($"timestamp_s", "EEE MMM dd HH:mm:ss ZZZZZ yyyy").cast(TimestampType)
  ).drop($"timestamp_s")
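In Spark 2.x, unix_timestamp interprets its pattern with Java's SimpleDateFormat, so the pattern can be checked outside Spark before running it over the whole DataFrame. The sketch below does that in plain Scala. TweetTimestamp and toEpochSeconds are hypothetical names, and the sample value "Wed May 23 15:58:00 +0000 2018" is an assumption about what timestamp_s looks like, inferred from the pattern.

```scala
import java.text.{ParseException, SimpleDateFormat}
import java.util.Locale

object TweetTimestamp {
  // Same pattern string as the unix_timestamp call in Step 10.
  val Pattern = "EEE MMM dd HH:mm:ss ZZZZZ yyyy"

  // Returns seconds since the Unix epoch, or None if the text does not
  // match the pattern (Spark yields null in that case).
  def toEpochSeconds(text: String): Option[Long] = {
    // Locale.US so that English day and month names parse on any machine.
    val format = new SimpleDateFormat(Pattern, Locale.US)
    try Some(format.parse(text).getTime / 1000L)
    catch { case _: ParseException => None }
  }
}
```

If TweetTimestamp.toEpochSeconds returns None for a value copied from df.take(1), the pattern does not match the data, and the timestampZ column would come back null.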
Step 11 - Validate the results and create the temporary table TweetSentiment
%spark2
df_TweetSentiment1.printSchema()
df_TweetSentiment1.take(1)
df_TweetSentiment1.count()
df_TweetSentiment1.cache()
df_TweetSentiment1.createOrReplaceTempView("TweetSentiment")
Step 12 - Query the table TweetSentiment
%sql
select sentimentScore, count(sentimentScore)
from TweetSentiment
group by sentimentScore
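The query returns numeric scores. According to the spark-corenlp README, sentiment is reported on a scale of 0 (strong negative) to 4 (strong positive). A small plain Scala sketch for turning those numbers into readable labels is shown below. SentimentLabel and label are hypothetical names, and the label wording is my own.

```scala
object SentimentLabel {
  // Assumption: scores follow the 0 to 4 scale described in the
  // spark-corenlp README (0 = strong negative, 4 = strong positive).
  def label(score: Int): String = score match {
    case 0 => "very negative"
    case 1 => "negative"
    case 2 => "neutral"
    case 3 => "positive"
    case 4 => "very positive"
    case _ => "unknown" // anything outside the documented range
  }
}
```

One way to use it would be to wrap it with udf(SentimentLabel.label _) from org.apache.spark.sql.functions and add the result as an extra column before creating the temporary table.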
Created on 10-29-2018 07:19 PM
Ian, thanks so much for such a great article.
I'm getting the following error when trying to run step 9: "error: not found: value sentiment". It happens when trying to set "df_TweetSentiment".
Could you help me with that?
Many thanks!