This article extends the great work by @Ali Bajwa, "Sample HDF/NiFi flow to Push Tweets into Solr/Banana, HDFS/Hive", and my own article, "Connecting Solr to Spark - Apache Zeppelin Notebook".

The complete notebook is available on my GitHub site.

Step 1 - Follow Ali's tutorial to establish an Apache Solr collection called "tweets"

Step 2 - Verify the version of Apache Spark being used, and visit the Solr-Spark connector site. The key is to match the version of Spark to a compatible version of the Solr-Spark connector. In the example below, the version of Spark is 2.2.0, and the connector version is 3.4.4.

%spark2
sc
sc.version
res0: org.apache.spark.SparkContext = org.apache.spark.SparkContext@617d134a
res1: String = 2.2.0.2.6.4.0-91
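
The version string reported above carries the HDP build suffix (2.6.4.0-91) after the Apache Spark version, and only the leading three components matter when checking connector compatibility. A minimal sketch of pulling those components out; `sparkBaseVersion` is a hypothetical helper name, not part of Spark or Zeppelin:

```scala
// Hypothetical helper: keep only the leading major.minor.patch components
// of a vendor version string such as "2.2.0.2.6.4.0-91".
def sparkBaseVersion(fullVersion: String): String =
  fullVersion.split("[.-]").take(3).mkString(".")

// In a %spark2 paragraph this could be called as sparkBaseVersion(sc.version).
val base = sparkBaseVersion("2.2.0.2.6.4.0-91")
println(base)
```

The resulting value is the one to compare against the compatibility notes on the connector site.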

Step 3 - Include the Solr-Spark dependency in Zeppelin. Important note: This needs to be run before the Spark Context has been initialized.

%dep
z.load("com.lucidworks.spark:spark-solr:jar:3.4.4")
//Must be used before SparkInterpreter (%spark2) initialized
//Hint: put this paragraph before any Spark code and restart Zeppelin/Interpreter

Step 4 - Download the Stanford CoreNLP libraries from http://nlp.stanford.edu/software/stanford-corenlp-full-2018-02-27.zip . Unzip the download into the /tmp directory. Note: This can be done on the command line, or with the following Zeppelin paragraph.

%sh
wget -O /tmp/stanford-corenlp-full-2018-02-27.zip http://nlp.stanford.edu/software/stanford-corenlp-full-2018-02-27.zip
unzip /tmp/stanford-corenlp-full-2018-02-27.zip -d /tmp

Step 5 - In Zeppelin's interpreter settings for Spark, add the following artifact: /tmp/stanford-corenlp-full-2018-02-27/stanford-corenlp-3.9.1-models.jar

Step 6 - Include the following Spark dependencies for Stanford CoreNLP and Spark CoreNLP. Important note: This needs to be run before the Spark Context has been initialized.

%dep
z.load("edu.stanford.nlp:stanford-corenlp:3.9.1")

// In the Spark interpreter settings, add the following artifact:
//   /tmp/stanford-corenlp-full-2018-02-27/stanford-corenlp-3.9.1-models.jar
%dep
z.load("databricks:spark-corenlp:0.2.0-s_2.11")

Step 7 - Run a Solr query and load the results into a Spark DataFrame. Note: The ZooKeeper hosts might need to be given as fully qualified names, for example:

"zkhost" -> "host-1.domain.com:2181,host-2.domain.com:2181,host-3.domain.com:2181/solr",

%spark2
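val options = Map(
  "collection" -> "tweets",          // collection created in Step 1
  "zkhost" -> "localhost:2181/solr"  // no trailing comma: Scala 2.11 rejects it
  // Optional: restrict the rows returned with a Solr query
  // , "query" -> "Keyword, 'More Keywords'"
)
// Read the collection through the Solr-Spark connector and cache it for reuse
val df = spark.read.format("solr").options(options).load()
df.cache()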
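
When several ZooKeeper hosts are involved, the "zkhost" value is a comma-separated list of host:port pairs followed by the chroot path, as in the note above. A small sketch of assembling that string; `zkConnectString` and its default port and chroot are assumptions for illustration, not part of the connector:

```scala
// Hypothetical helper: join ZooKeeper hosts into the connect string used
// for the "zkhost" option, appending the chroot (for example "/solr") once.
def zkConnectString(hosts: Seq[String], port: Int = 2181, chroot: String = "/solr"): String =
  hosts.map(h => s"$h:$port").mkString(",") + chroot

val zkhost = zkConnectString(
  Seq("host-1.domain.com", "host-2.domain.com", "host-3.domain.com"))
println(zkhost)
```

The resulting value can then be passed as "zkhost" -> zkhost in the options map.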

Step 8 - Review results of the Solr query

%spark2 
df.count()
df.printSchema() 
df.take(1)

Step 9 - Filter the tweets in the Spark DataFrame to keep only English-language tweets whose text and timestamp aren't null. After filtering, add a sentiment value to each tweet.

%spark2
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import com.databricks.spark.corenlp.functions._

val df_TweetSentiment = df.filter("text_t is not null and language_s = 'en' and timestamp_s is not null ").select($"timestamp_s", $"text_t", $"location", sentiment($"text_t").as('sentimentScore))
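
The sentimentScore column holds an integer. As far as I know, the spark-corenlp `sentiment` function reports it on a scale of 0 (strong negative) to 4 (strong positive), matching the five CoreNLP sentiment classes. A hedged sketch of turning the score into a readable label; `sentimentLabel` is a hypothetical helper and the scale is an assumption to verify against the spark-corenlp documentation:

```scala
// Hypothetical helper. Assumes the 0 (strong negative) to 4 (strong positive)
// scale; anything outside that range is reported as "unknown".
def sentimentLabel(score: Int): String = score match {
  case 0 => "very negative"
  case 1 => "negative"
  case 2 => "neutral"
  case 3 => "positive"
  case 4 => "very positive"
  case _ => "unknown"
}

println((0 to 4).map(sentimentLabel).mkString(", "))
```

Inside a %spark2 paragraph, a function like this could be wrapped with `udf` from org.apache.spark.sql.functions and applied to the sentimentScore column.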

Step 10 - Correctly cast the timestamp value

%spark2

val df_TweetSentiment1 = df_TweetSentiment.withColumn("timestampZ",  unix_timestamp($"timestamp_s", "EEE MMM dd HH:mm:ss ZZZZZ yyyy").cast(TimestampType)).drop($"timestamp_s")
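
In Spark 2.x, the pattern passed to `unix_timestamp` follows java.text.SimpleDateFormat, so it can be checked outside Spark before running it on the whole DataFrame. A minimal sketch, assuming timestamp_s holds values in the Twitter created_at style; the sample string and the `parseTweetTimestamp` helper are illustrative assumptions:

```scala
import java.text.SimpleDateFormat
import java.util.{Date, Locale}

// Same pattern as in the unix_timestamp call above.
val tweetPattern = "EEE MMM dd HH:mm:ss ZZZZZ yyyy"

// Hypothetical helper: parse one raw timestamp string with that pattern.
def parseTweetTimestamp(raw: String): Date = {
  // Locale.US so that English day and month abbreviations are recognized
  val fmt = new SimpleDateFormat(tweetPattern, Locale.US)
  fmt.parse(raw)
}

// Assumed sample value; replace it with a real timestamp_s from df.take(1).
val parsed = parseTweetTimestamp("Wed May 23 16:20:25 +0000 2018")
println(parsed.getTime / 1000) // epoch seconds
```

If a real timestamp_s value fails to parse here, the pattern needs adjusting before the cast in Step 10 can produce non-null results.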

Step 11 - Validate the results and create the temporary view TweetSentiment

df_TweetSentiment1.printSchema()
df_TweetSentiment1.take(1)
df_TweetSentiment1.count()
df_TweetSentiment1.cache()
df_TweetSentiment1.createOrReplaceTempView("TweetSentiment")

Step 12 - Query the table TweetSentiment

%sql
select sentimentScore, count(sentimentScore) from TweetSentiment group by sentimentScore

Comments

Ian, thanks so much for such a great article.
I'm getting the following error when trying to run step 9, at the line that sets "df_TweetSentiment": "error: not found: value sentiment".

Could you help me with that?

Many thanks!