Community Articles

Find and share helpful community-sourced technical articles.
avatar

HDF/NiFi is a key element in the architectural transformations with the future of data. It is a great tool to manage data flows but it also provides means to access data in motion as a stream source. This feature gives an easy way to expose data endpoints in data flows allowing users to access the data as soon as possible and perform desired analytics, online machine learning, etc.

3763-screen-shot-2016-04-28-at-120311-pm.png

Use case description

In this article, I will describe how to easily implement the following scenario:

  • Use Apache NiFi to gather tweets filtered on specifics terms,
  • Use Apache Spark streaming to retrieve the data, transform it and store it into an Apache Hive table,
  • Use Apache Zeppelin to access the data and display real-time analytics.

This example will use the US Presidential Election as a subject and will show:

  • The frequency of tweets mentioning twitter accounts of one of the main candidates: Ted Cruz, Donald Trump, Hillary Clinton and Bernie Sanders.
  • A sentiment analysis of the tweets with the percentage of negative, positive and neutral tweets for each one of the candidates.

Note: it exists similar examples where tweets are directly retrieved from Spark using a specific connector. This use case is to demonstrate how easy it is to connect NiFi with Spark and, from this point, to perform stream analytics on a large variety of data.

Prerequisites

To implement this use case, I downloaded the HDP 2.4 Sandbox and installed it following the recommendations. Then, I used Ambari to add and install NiFi. Before running installation, I changed few parameters in the configuration to allow site-to-site communications between NiFi and Spark:

# Site to Site properties
nifi.remote.input.socket.host=127.0.0.1
nifi.remote.input.socket.port=9999
nifi.remote.input.secure=false

Once NiFi is installed, it is possible to access the NiFi GUI on port 9090 (depending of your installation, you may need to add a port forwarding rule to allow this access).

NiFi data flow to retrieve tweets

To retrieve tweets in NiFi, I just drag and drop a GetTwitter processor where I set the following parameters:

  • Endpoint set to Filter Endpoint
  • I set my Twitter App credentials provided by Twitter
  • I set the terms to filter on to : "@tedcruz,@realDonaldTrump,@HillaryClinton,@BernieSanders"

Then I drag and drop an Output port named "toSpark" and I connect the two with the success relationship. That's all on NiFi side!

3764-nifispark.png

Zeppelin notebook

Then I access Zeppelin notebook on port 9995 and I create a new notebook to run my analytics.

The first thing to do is to add the dependencies to the NiFi code we are going to use (at the time I write this article, the latest released version is 0.6.1):

%dep
z.reset
z.load("org.apache.nifi:nifi-spark-receiver:0.6.1")
z.load("org.apache.nifi:nifi-site-to-site-client:0.6.1")

Note: it it is not the first thing you do when launching the notebook, the cell will not be executed correctly. In this case, you must go to the Interpreter page and restart Spark interpreter before running again your notebook.

I am going to present the frequency analysis and the sentiment analysis in an independent manner. The two parts could be easily combined but this way you can pick the use case you are more interested in.

1. Frequency analysis

To ensure we start with a clean environment, I first delete the Hive table I am gonna to use later to avoid errors in case the table already exists:

%sql
DROP TABLE IF EXISTS tweets

Then I create the table with the following schema:

%sql
CREATE TABLE tweets (
  time   struct<millis:bigint>,
  candidate   string,
  count   int
)

At this point, we are ready to go and to write our Scala code. Here is the full code, I will give explanations below.

%spark
import java.nio.charset.StandardCharsets
import scala.collection.mutable.ListBuffer
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext, Time}
import org.apache.nifi.remote.client.SiteToSiteClient
import org.apache.nifi.remote.client.SiteToSiteClientConfig
import org.apache.nifi.spark.NiFiReceiver
import org.apache.nifi.spark.NiFiDataPacket
case class TweetRecord(time: Time, candidate: String, count: Integer)
val clientConfig = new SiteToSiteClient.Builder()
                        .url("http://127.0.0.1:9090/nifi")
                        .portName("toSpark")
                        .buildConfig();
val ssc = new StreamingContext(sc, Seconds(60))
val tweetStream = ssc.receiverStream(new NiFiReceiver(clientConfig, StorageLevel.MEMORY_ONLY))
                        .map( (packet: NiFiDataPacket) => new String(packet.getContent(), StandardCharsets.UTF_8) )
                        .map {  (tweet: String) =>  var myList = new ListBuffer[(String, Int)]()
                                                    if (tweet.contains("@tedcruz"))
                                                        myList += (("@tedcruz", 1))
                                                    if (tweet.contains("@realDonaldTrump"))
                                                        myList += (("@realDonaldTrump", 1))
                                                    if (tweet.contains("@HillaryClinton"))
                                                        myList += (("@HillaryClinton", 1))
                                                    if (tweet.contains("@BernieSanders"))
                                                        myList += (("@BernieSanders", 1))
                                                    myList.toList
                        }
                        .flatMap( identity )
                        .reduceByKey ( (x,y) => x + y )
                        
tweetStream.foreachRDD { (rdd: RDD[(String, Int)], time: Time) => rdd.map( t => TweetRecord(time, t._1, t._2) )
                                                                        .toDF()
                                                                        .write
                                                                        .insertInto("tweets") 
                        }
ssc.start()

The first thing is to create an object TweetRecord that will contain my record to store in Hive. In this use case, I just want the date, the candidate name and the number of tweets received during the time window.

Then I create my configuration to talk to NiFi where I defined the NiFi endpoint and the output port name set previously. I also create my streaming context with a time window of 60 seconds.

At this point, I initialize my RDD stream and I retrieve each FlowFile from NiFi as a NiFiDataPacket from which I only extract the content (not the attributes) as a string. Then, I determine the candidate(s) related to the tweet and perform a reduce by key function to get the number of tweets by candidate.

Then I transform my stream into a data frame containing TweetRecord objects and I insert it into a Hive table named "tweets".

Note: in Spark RDD, when calling foreachRDD, you have a org.apache.spark.streaming.Time object (unix timestamp in milliseconds) that will be stored as a struct<millis:bigint> into Hive if you don't perform any preliminary conversion.

In the end, I just need to request my Hive table to get the information about the frequency:

%sql
select from_unixtime(time.millis / 1000) as d, candidate, count from tweets

And here is the result after few minutes (you may rerun the query when you want to get real-time updated data):

3837-uspreszeppelin.png

2. Sentiment analysis

Sentiment analysis is a wide subject and can be addressed in a lot of ways. In this case, I'll keep it simple. I'll use data from this website to get two lists of words. One list of negative words, and one list of positive words. For a given tweet, I will increment a counter each time a positive word is found, and decrement it each time there is negative word. At the end, the tweet is declared positive when the counter is positive, negative when the counter is negative and neutral when the counter is equal to 0.

To ensure we start with a clean environment, I first delete the Hive table I am gonna to use later to avoid errors in case the table already exists:

%sql
DROP TABLE IF EXISTS tweets_sentiment

Then I create the table with the following schema:

%sql
CREATE TABLE tweets_sentiment (
  time   struct<millis:bigint>,
  candidate   string,
  sentiment string
)

Here is the code I used to define my spark streaming job:

Note: to keep it short, I don't display the full lists of words.

%spark
import java.nio.charset.StandardCharsets
import scala.collection.mutable.ListBuffer
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext, Time}
import org.apache.nifi.remote.client.SiteToSiteClient
import org.apache.nifi.remote.client.SiteToSiteClientConfig
import org.apache.nifi.spark.NiFiReceiver
import org.apache.nifi.spark.NiFiDataPacket
case class TweetRecord(time: Time, candidate: String, tweet: String)
val clientConfig = new SiteToSiteClient.Builder()
                        .url("http://127.0.0.1:9090/nifi")
                        .portName("toSpark")
                        .buildConfig();
val ssc = new StreamingContext(sc, Seconds(60))
val tweetStream = ssc.receiverStream(new NiFiReceiver(clientConfig, StorageLevel.MEMORY_ONLY))
                        .map( (packet: NiFiDataPacket) => new String(packet.getContent(), StandardCharsets.UTF_8) )
                        .map {  (tweet: String) =>  var myList = new ListBuffer[(String, String)]()
                                                    if (tweet.contains("@tedcruz"))
                                                        myList += (("@tedcruz", tweet))
                                                    if (tweet.contains("@realDonaldTrump"))
                                                        myList += (("@realDonaldTrump", tweet))
                                                    if (tweet.contains("@HillaryClinton"))
                                                        myList += (("@HillaryClinton", tweet))
                                                    if (tweet.contains("@BernieSanders"))
                                                        myList += (("@BernieSanders", tweet))
                                                    myList.toList
                        }
                        .flatMap( identity )
                        .filter( (tuple) => tuple._1 != "" )
                        .map { (tuple) =>   
                                    val positive = List("abundance", "accessible", "acclaim", "acclaimed", "...")
                                    val negative = List("2-faces", "abnormal", "abominable", "abomination", "...")
                                    val tweet = tuple._2
                                    var st = 0;
                                    val words = tweet.split(" ")    
                                    positive.foreach(p =>
                                        words.foreach(w =>
                                            if(p==w) st = st+1
                                        )
                                    )
                                    negative.foreach(p=>
                                        words.foreach(w=>
                                            if(p==w) st = st-1
                                        )
                                    )
                                    
                                    if(st>0)
                                        (tuple._1, "positive")
                                    else if(st<0)
                                        (tuple._1, "negative")
                                    else
                                        (tuple._1, "neutral")
                        }
                        
tweetStream.foreachRDD { (rdd: RDD[(String, String)], time: Time) => rdd.map( t => TweetRecord(time, t._1, t._2) )
                                                                        .toDF()
                                                                        .write
                                                                        .insertInto("tweets_sentiment") 
                        }
ssc.start()

The code is quite similar to the frequency one, except we compute the sentimental value of each tweet.

The stored data gives us the possibility to display the percentage of negative/neutral/positive tweets for each candidate:

3804-sentimentpres.png

Or to display the trend along the time for a given candidate (on a short period, there is not so much to say though):

3836-sentimenttrend.png

As you can imagine, there are lots of things we can monitor this way along a political campaign.

Conclusion

That's all for this use case. This example shows how easy it is to use NiFi as a streaming source inside a complex architectural model. Multiple NiFi instances can be deployed to ensure data flows between different locations and to ingest data in HDP data lakes, but it also provides endpoints for particular users that would require accessing data as a stream at a specific point of the flow. This is really powerful and it gives a lot of flexibility.

10,999 Views
Comments

This is awesome. Very nice combination of tools.

Do you have the Notebook and NIFI file in Github?

Hi! Nice example of Nifi integration

I encountered a problem doing the same thing with HDP sandox 2.5

I get this error message on zeppelin

<console>:11: error: identifier expected but ',' found. time struct<millis:bigint>,

Do you import more librairies before ?