Created on 04-28-2016 07:46 PM - edited 08-17-2019 12:34 PM
HDF/NiFi is a key element in the architectural transformations that data platforms are going through. It is a great tool for managing data flows, but it also provides a way to access data in motion as a stream source. This makes it easy to expose data endpoints within data flows, so that users can access the data as soon as possible and perform the analytics they need, online machine learning, etc.
Use case description
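In this article, I will describe how to easily implement the following scenario: retrieve tweets with NiFi, expose them through a NiFi output port, consume them with Spark Streaming from a Zeppelin notebook, store the results in Hive tables, and query those tables from Zeppelin.
This example uses the US presidential election as its subject and shows how to count, in near real time, the tweets mentioning each candidate (frequency analysis) and how to compute a simple sentiment for the tweets about each candidate (sentiment analysis).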
Note: there are similar examples where tweets are retrieved directly from Spark using a dedicated connector. The purpose of this use case is to demonstrate how easy it is to connect NiFi with Spark and, from there, to perform stream analytics on a wide variety of data.
Prerequisites
To implement this use case, I downloaded the HDP 2.4 Sandbox and installed it following the recommendations. Then I used Ambari to add and install NiFi. Before running the installation, I changed a few configuration parameters to allow site-to-site communication between NiFi and Spark:
```properties
# Site to Site properties
nifi.remote.input.socket.host=127.0.0.1
nifi.remote.input.socket.port=9999
nifi.remote.input.secure=false
```
Once NiFi is installed, the NiFi GUI is available on port 9090 (depending on your installation, you may need to add a port-forwarding rule to allow this access).
NiFi data flow to retrieve tweets
To retrieve tweets in NiFi, I just drag and drop a GetTwitter processor where I set the following parameters:
Then I drag and drop an output port named "toSpark" and connect the GetTwitter processor to it with the success relationship. That's all on the NiFi side!
Zeppelin notebook
Then I open Zeppelin on port 9995 and create a new notebook to run my analytics.
The first thing to do is to add the dependencies on the NiFi libraries we are going to use (at the time of writing, the latest released version is 0.6.1):
```scala
%dep
z.reset()
z.load("org.apache.nifi:nifi-spark-receiver:0.6.1")
z.load("org.apache.nifi:nifi-site-to-site-client:0.6.1")
```
Note: if this is not the first paragraph you run after launching the notebook, it will not be executed correctly. In that case, go to the Interpreter page, restart the Spark interpreter, and run your notebook again.
I am going to present the frequency analysis and the sentiment analysis independently. The two parts could easily be combined, but this way you can pick the use case you are most interested in.
1. Frequency analysis
To start with a clean environment, I first drop the Hive table I am going to use later, to avoid errors in case the table already exists:
```sql
%sql
DROP TABLE IF EXISTS tweets
```
Then I create the table with the following schema:
```sql
%sql
CREATE TABLE tweets (
  time struct<millis:bigint>,
  candidate string,
  count int
)
```
At this point, we are ready to write the Scala code. Here is the full code; the explanations follow.
```scala
%spark
import java.nio.charset.StandardCharsets
import scala.collection.mutable.ListBuffer
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext, Time}
import org.apache.nifi.remote.client.SiteToSiteClient
import org.apache.nifi.remote.client.SiteToSiteClientConfig
import org.apache.nifi.spark.NiFiReceiver
import org.apache.nifi.spark.NiFiDataPacket

// Record written to Hive: batch time, candidate handle, number of tweets in the batch
case class TweetRecord(time: Time, candidate: String, count: Integer)

// Site-to-site configuration pointing at the NiFi instance and its "toSpark" output port
val clientConfig = new SiteToSiteClient.Builder()
  .url("http://127.0.0.1:9090/nifi")
  .portName("toSpark")
  .buildConfig()

// Streaming context with 60-second batches, reusing Zeppelin's SparkContext (sc)
val ssc = new StreamingContext(sc, Seconds(60))

// Pipeline: FlowFile content as a UTF-8 string -> (candidate, 1) pairs -> count per candidate
val tweetStream = ssc.receiverStream(new NiFiReceiver(clientConfig, StorageLevel.MEMORY_ONLY))
  .map( (packet: NiFiDataPacket) => new String(packet.getContent(), StandardCharsets.UTF_8) )
  .map { (tweet: String) =>
    val myList = new ListBuffer[(String, Int)]()
    if (tweet.contains("@tedcruz")) myList += (("@tedcruz", 1))
    if (tweet.contains("@realDonaldTrump")) myList += (("@realDonaldTrump", 1))
    if (tweet.contains("@HillaryClinton")) myList += (("@HillaryClinton", 1))
    if (tweet.contains("@BernieSanders")) myList += (("@BernieSanders", 1))
    myList.toList
  }
  .flatMap( identity )
  .reduceByKey( (x, y) => x + y )

// For each batch, convert the counts to a DataFrame and append them to the Hive table
tweetStream.foreachRDD { (rdd: RDD[(String, Int)], time: Time) =>
  rdd.map( t => TweetRecord(time, t._1, t._2) )
    .toDF()
    .write
    .insertInto("tweets")
}

ssc.start()
```
The first step is to define a case class, TweetRecord, for the records to store in Hive. In this use case, I just want the time, the candidate name, and the number of tweets received during the time window.
Then I create the site-to-site client configuration used to talk to NiFi, in which I define the NiFi endpoint and the name of the output port created previously. I also create the streaming context with a batch interval (time window) of 60 seconds.
Next, I create the input DStream and retrieve each FlowFile from NiFi as a NiFiDataPacket, from which I only extract the content (not the attributes) as a string. Then I determine which candidate(s) the tweet mentions and apply reduceByKey to get the number of tweets per candidate.
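The candidate matching and the per-batch counting can be tried out without Spark or NiFi. The following is a minimal plain-Scala sketch, assuming a batch is simply a `Seq[String]` of tweet contents; the names `CandidateCounts`, `mentions`, and `countByCandidate` are hypothetical. A `groupBy` followed by a sum stands in for Spark's `reduceByKey`, so this only illustrates what one 60-second batch produces, not the distributed execution.

```scala
object CandidateCounts {
  // Candidate handles searched for in each tweet (same four as the streaming job)
  val candidates: Seq[String] =
    Seq("@tedcruz", "@realDonaldTrump", "@HillaryClinton", "@BernieSanders")

  // One (candidate, 1) pair per handle found in the tweet, like the first map step
  def mentions(tweet: String): List[(String, Int)] =
    candidates.filter(c => tweet.contains(c)).map(c => (c, 1)).toList

  // Flatten the pairs of a whole batch and sum the counts per candidate,
  // mirroring flatMap(identity) followed by reduceByKey(_ + _)
  def countByCandidate(batch: Seq[String]): Map[String, Int] =
    batch
      .flatMap(mentions)
      .groupBy(_._1)
      .map { case (candidate, pairs) => (candidate, pairs.map(_._2).sum) }

  def main(args: Array[String]): Unit = {
    val batch = Seq(
      "Watching @tedcruz and @realDonaldTrump debate",
      "@HillaryClinton rally tonight",
      "Go @realDonaldTrump",
      "No candidate mentioned here"
    )
    println(countByCandidate(batch))
  }
}
```

As in the streaming job, a tweet mentioning two candidates counts once for each of them, and matching is case-sensitive because it relies on `String.contains`.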
Finally, for each batch, I convert the RDD into a DataFrame of TweetRecord objects and insert it into the Hive table named "tweets".
Note: in Spark Streaming, foreachRDD gives you an org.apache.spark.streaming.Time object (the batch time, a Unix timestamp in milliseconds) that is stored in Hive as a struct<millis:bigint> if you don't convert it first.
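If you would rather store a plain Hive timestamp, one option (an assumption, not what this article does) is to convert the batch time before building the records. The untested sketch below uses a hypothetical `TweetRecordTs` case class with a `java.sql.Timestamp` field, which Spark SQL maps to `TimestampType`; in the streaming job, `batchMillis` would be `time.milliseconds`. The Hive table would then need a `time timestamp` column instead of `time struct<millis:bigint>`.

```scala
import java.sql.Timestamp

// Hypothetical variant of TweetRecord: Spark SQL maps java.sql.Timestamp to
// TimestampType, which is written to Hive as a TIMESTAMP column
case class TweetRecordTs(time: Timestamp, candidate: String, count: Int)

object TimeConversion {
  // batchMillis stands for time.milliseconds, where time is the
  // org.apache.spark.streaming.Time passed to foreachRDD
  def toRecord(batchMillis: Long, candidate: String, count: Int): TweetRecordTs =
    TweetRecordTs(new Timestamp(batchMillis), candidate, count)

  def main(args: Array[String]): Unit = {
    // Arbitrary epoch value in milliseconds used as an example batch time
    val record = toRecord(1461872760000L, "@tedcruz", 3)
    println(record)
  }
}
```

Inside `foreachRDD`, the mapping would read `rdd.map(t => TweetRecordTs(new Timestamp(time.milliseconds), t._1, t._2))`, and the frequency query could then select `time` directly instead of calling `from_unixtime`.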
In the end, I just need to query the Hive table to get the frequency information:
```sql
%sql
select from_unixtime(time.millis / 1000) as d, candidate, count from tweets
```
And here is the result after a few minutes (rerun the query whenever you want up-to-date data):
2. Sentiment analysis
Sentiment analysis is a broad subject and can be approached in many ways. In this case, I'll keep it simple: I'll use data from this website to get two lists of words, one of negative words and one of positive words. For a given tweet, I increment a counter each time a positive word is found and decrement it each time a negative word is found. At the end, the tweet is labeled positive when the counter is positive, negative when it is negative, and neutral when it is equal to 0.
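The scoring rule can be written as a small pure function. This is a sketch, assuming the two lists are held as `Set[String]` (only the first words of each list are included here) and that tweets are split on single spaces as in the streaming job; `SentimentScore`, `score`, and `label` are hypothetical names.

```scala
object SentimentScore {
  // First words of each list only; the complete lists are much longer
  val positive: Set[String] = Set("abundance", "accessible", "acclaim", "acclaimed")
  val negative: Set[String] = Set("2-faces", "abnormal", "abominable", "abomination")

  // Counter starts at 0: +1 for every positive token, -1 for every negative token
  def score(tweet: String): Int =
    tweet.split(" ").foldLeft(0) { (st, word) =>
      val plus = if (positive.contains(word)) 1 else 0
      val minus = if (negative.contains(word)) 1 else 0
      st + plus - minus
    }

  // The sign of the counter decides the label
  def label(tweet: String): String = {
    val st = score(tweet)
    if (st > 0) "positive" else if (st < 0) "negative" else "neutral"
  }

  def main(args: Array[String]): Unit =
    Seq("an acclaimed and accessible plan", "an abnormal plan", "acclaimed but abnormal")
      .foreach(t => println(s"$t -> ${label(t)}"))
}
```

Because tokens are compared verbatim, a word followed by punctuation (for example `acclaimed!`) or written with different capitalization does not match; lower-casing and stripping punctuation before the lookup would be a natural refinement.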
To start with a clean environment, I first drop the Hive table I am going to use later, to avoid errors in case the table already exists:
```sql
%sql
DROP TABLE IF EXISTS tweets_sentiment
```
Then I create the table with the following schema:
```sql
%sql
CREATE TABLE tweets_sentiment (
  time struct<millis:bigint>,
  candidate string,
  sentiment string
)
```
Here is the code I used to define my Spark Streaming job:
Note: to keep it short, I don't display the full lists of words.
```scala
%spark
import java.nio.charset.StandardCharsets
import scala.collection.mutable.ListBuffer
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext, Time}
import org.apache.nifi.remote.client.SiteToSiteClient
import org.apache.nifi.remote.client.SiteToSiteClientConfig
import org.apache.nifi.spark.NiFiReceiver
import org.apache.nifi.spark.NiFiDataPacket

// Record written to Hive: batch time, candidate handle, sentiment label
case class TweetRecord(time: Time, candidate: String, sentiment: String)

val clientConfig = new SiteToSiteClient.Builder()
  .url("http://127.0.0.1:9090/nifi")
  .portName("toSpark")
  .buildConfig()

val ssc = new StreamingContext(sc, Seconds(60))

// Pipeline: FlowFile content -> (candidate, tweet) pairs -> (candidate, sentiment) pairs
val tweetStream = ssc.receiverStream(new NiFiReceiver(clientConfig, StorageLevel.MEMORY_ONLY))
  .map( (packet: NiFiDataPacket) => new String(packet.getContent(), StandardCharsets.UTF_8) )
  .map { (tweet: String) =>
    val myList = new ListBuffer[(String, String)]()
    if (tweet.contains("@tedcruz")) myList += (("@tedcruz", tweet))
    if (tweet.contains("@realDonaldTrump")) myList += (("@realDonaldTrump", tweet))
    if (tweet.contains("@HillaryClinton")) myList += (("@HillaryClinton", tweet))
    if (tweet.contains("@BernieSanders")) myList += (("@BernieSanders", tweet))
    myList.toList
  }
  .flatMap( identity )
  .filter( (tuple) => tuple._1 != "" )
  .map { (tuple) =>
    // Full word lists omitted for brevity
    val positive = List("abundance", "accessible", "acclaim", "acclaimed", "...")
    val negative = List("2-faces", "abnormal", "abominable", "abomination", "...")
    val tweet = tuple._2
    var st = 0
    val words = tweet.split(" ")
    // +1 for each positive word found, -1 for each negative word found
    positive.foreach(p => words.foreach(w => if (p == w) st = st + 1))
    negative.foreach(p => words.foreach(w => if (p == w) st = st - 1))
    if (st > 0) (tuple._1, "positive")
    else if (st < 0) (tuple._1, "negative")
    else (tuple._1, "neutral")
  }

// For each batch, write (time, candidate, sentiment) rows to the Hive table
tweetStream.foreachRDD { (rdd: RDD[(String, String)], time: Time) =>
  rdd.map( t => TweetRecord(time, t._1, t._2) )
    .toDF()
    .write
    .insertInto("tweets_sentiment")
}

ssc.start()
```
The code is quite similar to the frequency analysis, except that we compute the sentiment of each tweet.
The stored data lets us display the percentage of negative/neutral/positive tweets for each candidate:
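In Zeppelin, such a chart would typically come from a `%sql` query grouping `tweets_sentiment` by candidate and sentiment. To make the aggregation explicit, here is a plain-Scala sketch of the same computation, assuming the rows are available as `(candidate, sentiment)` pairs; `SentimentShare` and `percentages` are hypothetical names.

```scala
object SentimentShare {
  // rows mirrors tweets_sentiment without the time column: (candidate, sentiment)
  // Result: candidate -> (sentiment -> percentage of that candidate's tweets)
  def percentages(rows: Seq[(String, String)]): Map[String, Map[String, Double]] =
    rows.groupBy(_._1).map { case (candidate, candidateRows) =>
      val total = candidateRows.size.toDouble
      val shares = candidateRows.groupBy(_._2).map { case (sentiment, rs) =>
        (sentiment, 100.0 * rs.size / total)
      }
      (candidate, shares)
    }

  def main(args: Array[String]): Unit = {
    val rows = Seq(
      ("@tedcruz", "positive"), ("@tedcruz", "negative"),
      ("@tedcruz", "neutral"), ("@tedcruz", "neutral"),
      ("@BernieSanders", "positive")
    )
    println(percentages(rows))
  }
}
```

Each candidate's percentages sum to 100, since they are computed against that candidate's own tweet count rather than the global total.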
Or display the trend over time for a given candidate (over such a short period, there is not much to say, though):
As you can imagine, there are lots of things we could monitor this way throughout a political campaign.
Conclusion
That's all for this use case. This example shows how easy it is to use NiFi as a streaming source inside a complex architecture. Multiple NiFi instances can be deployed to move data between different locations and to ingest it into HDP data lakes, and NiFi can also provide endpoints for users who need to access data as a stream at a specific point of the flow. This is really powerful and gives a lot of flexibility.
Created on 04-29-2016 02:32 PM
This is awesome. Very nice combination of tools.
Do you have the notebook and the NiFi flow file on GitHub?
Created on 02-27-2017 02:59 PM
Hi! Nice example of Nifi integration
I encountered a problem doing the same thing with the HDP 2.5 sandbox.
I get this error message in Zeppelin:
<console>:11: error: identifier expected but ',' found. time struct<millis:bigint>,
Do you import any other libraries beforehand?