Member since: 04-11-2016
Posts: 471
Kudos Received: 325
Solutions: 118
04-29-2016
02:56 PM
Hi Chakra, In fact the identifier attribute only requires the name of the attribute you want to reference, not the value of the attribute itself. I think that if you replace ${filename} with filename, it will work as you expect. Hope that helps.
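To illustrate (hypothetical values; "filename" stands for whatever FlowFile attribute you want to reference):
Identifier Attribute = ${filename}   <- Expression Language, evaluates to the attribute's value
Identifier Attribute = filename      <- the attribute name, which is what this property expects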
04-29-2016
12:41 PM
The installation guide is well documented and should be the first thing you read: https://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.4.0/bk_installing_manually_book/content/ch_getting_ready_chapter.html In terms of version, you should consider the latest release. And if you only want to install HDP on a single machine, depending on what you are expecting, you should consider downloading the Sandbox: http://hortonworks.com/products/sandbox/ Hope that helps.
04-29-2016
09:37 AM
2 Kudos
Hi, You will find an article here: https://community.hortonworks.com/articles/25726/spark-streaming-explained-kafka-to-phoenix.html Writing to HDFS can be easily achieved with RDD.saveAsTextFile(), for example. HTH
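If it helps, here is a minimal sketch of that approach in a streaming job (assumptions: stream is an existing DStream[String] and the HDFS path is an example value):
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.Time
// Write each non-empty micro-batch to its own timestamped HDFS directory
stream.foreachRDD { (rdd: RDD[String], time: Time) =>
    if (!rdd.isEmpty()) rdd.saveAsTextFile(s"hdfs:///tmp/tweets/batch-${time.milliseconds}")
}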
04-28-2016
07:46 PM
14 Kudos
HDF/NiFi is a key element in modern data architectures. It is a great tool to manage data flows, but it also provides a means to access data in motion as a stream source. This feature offers an easy way to expose data endpoints in data flows, allowing users to access the data as soon as possible and perform the desired analytics, online machine learning, etc.
Use case description
In this article, I will describe how to easily implement the following scenario:
- Use Apache NiFi to gather tweets filtered on specific terms,
- Use Apache Spark Streaming to retrieve the data, transform it and store it into an Apache Hive table,
- Use Apache Zeppelin to access the data and display real-time analytics.
This example will use the US Presidential Election as a subject and will show:
- The frequency of tweets mentioning the Twitter account of one of the main candidates: Ted Cruz, Donald Trump, Hillary Clinton and Bernie Sanders.
- A sentiment analysis of the tweets, with the percentage of negative, positive and neutral tweets for each of the candidates.
Note: there are similar examples where tweets are directly retrieved from Spark using a specific connector. The point of this use case is to demonstrate how easy it is to connect NiFi with Spark and, from there, to perform stream analytics on a large variety of data.
Prerequisites
To implement this use case, I downloaded the HDP 2.4 Sandbox and installed it following the recommendations. Then, I used Ambari to add and install NiFi. Before running the installation, I changed a few parameters in the configuration to allow site-to-site communication between NiFi and Spark:
# Site to Site properties
nifi.remote.input.socket.host=127.0.0.1
nifi.remote.input.socket.port=9999
nifi.remote.input.secure=false
Once NiFi is installed, the NiFi GUI is accessible on port 9090 (depending on your installation, you may need to add a port forwarding rule to allow this access).
NiFi data flow to retrieve tweets
To retrieve tweets in NiFi, I just drag and drop a GetTwitter processor and set the following parameters:
- Endpoint set to Filter Endpoint
- My Twitter App credentials provided by Twitter
- The terms to filter on: "@tedcruz,@realDonaldTrump,@HillaryClinton,@BernieSanders"
Then I drag and drop an Output Port named "toSpark" and connect the two with the success relationship. That's all on the NiFi side!
Zeppelin notebook
Then I access Zeppelin on port 9995 and create a new notebook to run my analytics.
The first thing to do is to add the dependencies to the NiFi code we are going to use (at the time of writing, the latest released version is 0.6.1):
%dep
z.reset
z.load("org.apache.nifi:nifi-spark-receiver:0.6.1")
z.load("org.apache.nifi:nifi-site-to-site-client:0.6.1")
Note: if this is not the first thing you do when launching the notebook, the cell will not be executed correctly. In that case, you must go to the Interpreter page and restart the Spark interpreter before running your notebook again.
I am going to present the frequency analysis and the sentiment analysis independently. The two parts could easily be combined, but this way you can pick the use case you are most interested in.
1. Frequency analysis
To ensure we start with a clean environment, I first drop the Hive table I am going to use later, to avoid errors in case the table already exists:
%sql
DROP TABLE IF EXISTS tweets
Then I create the table with the following schema:
%sql
CREATE TABLE tweets (
time struct<millis:bigint>,
candidate string,
count int
)
At this point, we are ready to write our Scala code. Here is the full code; I will give explanations below.
%spark
import java.nio.charset.StandardCharsets
import scala.collection.mutable.ListBuffer
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext, Time}
import org.apache.nifi.remote.client.SiteToSiteClient
import org.apache.nifi.remote.client.SiteToSiteClientConfig
import org.apache.nifi.spark.NiFiReceiver
import org.apache.nifi.spark.NiFiDataPacket

// Record to store in Hive: batch time, candidate name, tweet count
case class TweetRecord(time: Time, candidate: String, count: Integer)

// Site-to-site configuration pointing to the "toSpark" output port defined in NiFi
val clientConfig = new SiteToSiteClient.Builder()
    .url("http://127.0.0.1:9090/nifi")
    .portName("toSpark")
    .buildConfig()

// Streaming context with a 60-second batch window
val ssc = new StreamingContext(sc, Seconds(60))

val tweetStream = ssc.receiverStream(new NiFiReceiver(clientConfig, StorageLevel.MEMORY_ONLY))
    // Extract the content of each FlowFile as a UTF-8 string
    .map( (packet: NiFiDataPacket) => new String(packet.getContent(), StandardCharsets.UTF_8) )
    // Emit one (candidate, 1) pair per candidate mentioned in the tweet
    .map { (tweet: String) =>
        var myList = new ListBuffer[(String, Int)]()
        if (tweet.contains("@tedcruz"))
            myList += (("@tedcruz", 1))
        if (tweet.contains("@realDonaldTrump"))
            myList += (("@realDonaldTrump", 1))
        if (tweet.contains("@HillaryClinton"))
            myList += (("@HillaryClinton", 1))
        if (tweet.contains("@BernieSanders"))
            myList += (("@BernieSanders", 1))
        myList.toList
    }
    .flatMap( identity )
    // Sum the counts per candidate over the batch window
    .reduceByKey( (x, y) => x + y )

// Store each batch in the Hive table
tweetStream.foreachRDD { (rdd: RDD[(String, Int)], time: Time) =>
    rdd.map( t => TweetRecord(time, t._1, t._2) )
        .toDF()
        .write
        .insertInto("tweets")
}

ssc.start()
The first thing is to define a case class TweetRecord that will contain the record to store in Hive. In this use case, I just want the date, the candidate name and the number of tweets received during the time window.
Then I create my configuration to talk to NiFi, where I define the NiFi endpoint and the output port name set previously. I also create my streaming context with a time window of 60 seconds.
At this point, I initialize my RDD stream and retrieve each FlowFile coming from NiFi as a NiFiDataPacket, from which I only extract the content (not the attributes) as a string. Then, I determine the candidate(s) related to the tweet and perform a reduce-by-key to get the number of tweets per candidate.
Then I transform my stream into a data frame of TweetRecord objects and insert it into the Hive table named "tweets".
Note: in Spark Streaming, when calling foreachRDD, you have an org.apache.spark.streaming.Time object (a Unix timestamp in milliseconds) that will be stored as a struct<millis:bigint> in Hive if you don't perform any preliminary conversion.
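If you would rather store a plain epoch value instead of the struct, here is a minimal sketch of such a preliminary conversion, reusing the stream from the code above (hypothetical variant: it assumes the Hive column is declared as time bigint instead of struct<millis:bigint>):
// Hypothetical variant: convert the batch time to epoch milliseconds before writing
case class TweetRecordMillis(time: Long, candidate: String, count: Int)
tweetStream.foreachRDD { (rdd: RDD[(String, Int)], time: Time) =>
    rdd.map( t => TweetRecordMillis(time.milliseconds, t._1, t._2) )
        .toDF()
        .write
        .insertInto("tweets")
}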
In the end, I just need to query my Hive table to get the frequency information:
%sql
select from_unixtime(time.millis / 1000) as d, candidate, count from tweets
And here is the result after a few minutes (you may rerun the query whenever you want to get updated data):
2. Sentiment analysis
Sentiment analysis is a wide subject and can be addressed in a lot of ways. In this case, I'll keep it simple. I'll use data from this website to get two lists of words: one list of negative words, and one list of positive words. For a given tweet, I will increment a counter each time a positive word is found, and decrement it each time a negative word is found. In the end, the tweet is declared positive when the counter is positive, negative when the counter is negative and neutral when the counter is equal to 0 (for example, a tweet containing two positive words and one negative word scores +1 and is labeled positive).
To ensure we start with a clean environment, I first drop the Hive table I am going to use later, to avoid errors in case the table already exists:
%sql
DROP TABLE IF EXISTS tweets_sentiment
Then I create the table with the following schema:
%sql
CREATE TABLE tweets_sentiment (
time struct<millis:bigint>,
candidate string,
sentiment string
)
Here is the code I used to define my Spark streaming job:
Note: to keep it short, I don't display the full lists of words.
%spark
import java.nio.charset.StandardCharsets
import scala.collection.mutable.ListBuffer
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext, Time}
import org.apache.nifi.remote.client.SiteToSiteClient
import org.apache.nifi.remote.client.SiteToSiteClientConfig
import org.apache.nifi.spark.NiFiReceiver
import org.apache.nifi.spark.NiFiDataPacket

// Record to store in Hive: batch time, candidate name, sentiment label
case class TweetRecord(time: Time, candidate: String, sentiment: String)

val clientConfig = new SiteToSiteClient.Builder()
    .url("http://127.0.0.1:9090/nifi")
    .portName("toSpark")
    .buildConfig()

val ssc = new StreamingContext(sc, Seconds(60))

val tweetStream = ssc.receiverStream(new NiFiReceiver(clientConfig, StorageLevel.MEMORY_ONLY))
    // Extract the content of each FlowFile as a UTF-8 string
    .map( (packet: NiFiDataPacket) => new String(packet.getContent(), StandardCharsets.UTF_8) )
    // Emit one (candidate, tweet) pair per candidate mentioned in the tweet
    .map { (tweet: String) =>
        var myList = new ListBuffer[(String, String)]()
        if (tweet.contains("@tedcruz"))
            myList += (("@tedcruz", tweet))
        if (tweet.contains("@realDonaldTrump"))
            myList += (("@realDonaldTrump", tweet))
        if (tweet.contains("@HillaryClinton"))
            myList += (("@HillaryClinton", tweet))
        if (tweet.contains("@BernieSanders"))
            myList += (("@BernieSanders", tweet))
        myList.toList
    }
    .flatMap( identity )
    .filter( (tuple) => tuple._1 != "" )
    // Score each tweet: +1 per positive word, -1 per negative word
    .map { (tuple) =>
        val positive = List("abundance", "accessible", "acclaim", "acclaimed", "...")
        val negative = List("2-faces", "abnormal", "abominable", "abomination", "...")
        val tweet = tuple._2
        var st = 0
        val words = tweet.split(" ")
        positive.foreach( p =>
            words.foreach( w =>
                if (p == w) st = st + 1
            )
        )
        negative.foreach( p =>
            words.foreach( w =>
                if (p == w) st = st - 1
            )
        )
        // Label the tweet according to the sign of the counter
        if (st > 0)
            (tuple._1, "positive")
        else if (st < 0)
            (tuple._1, "negative")
        else
            (tuple._1, "neutral")
    }

// Store each batch in the Hive table
tweetStream.foreachRDD { (rdd: RDD[(String, String)], time: Time) =>
    rdd.map( t => TweetRecord(time, t._1, t._2) )
        .toDF()
        .write
        .insertInto("tweets_sentiment")
}

ssc.start()
The code is quite similar to the frequency one, except that we compute the sentiment of each tweet.
The stored data gives us the possibility to display the percentage of negative/neutral/positive tweets for each candidate:
Or to display the trend over time for a given candidate (over a short period, there is not much to see though):
As you can imagine, there are lots of things we could monitor this way over the course of a political campaign.
Conclusion
That's all for this use case. This example shows how easy it is to use NiFi as a streaming source inside a complex architectural model. Multiple NiFi instances can be deployed to ensure data flows between different locations and to ingest data into HDP data lakes, but NiFi also provides endpoints for users who need to access data as a stream at a specific point of the flow. This is really powerful and gives a lot of flexibility.
04-28-2016
08:56 AM
You should run the command where the jar is, which is probably in the target directory under storm-twitter. There is no host=... argument. This argument is either "local" or "cluster" depending on how you want to run your topology with Storm (locally to ease debugging, for example, or in a distributed way). A final word: you should not post your Twitter credentials; they are private data and could be used by someone else to act on your behalf. I strongly encourage you to regenerate new credentials from the Twitter apps page.
04-26-2016
07:05 AM
When running a Maven command, the best practice is to run it from within the directory containing the pom.xml file (the file containing all the Maven instructions for packaging the product). In this case, you should run the command in your clone of the GitHub project, where the pom file is:
cd /path/to/storm-twitter
/opt/apache-maven-3.3.9/bin/mvn clean package
You may also want to add Maven to your PATH to be able to call the mvn command directly.
04-25-2016
07:42 AM
To build the jar, you must use Maven and run the command "clean package", which will create an uber jar (with all dependencies included). Then you will be able to run your topology using:
storm jar storm-twitter-0.0.1-SNAPSHOT.jar fr.pvillard.storm.topology.Topology <local|cluster> <consumer key> <consumer key secret> <access token key> <access token secret>
where the Topology class is the class where I defined the topology and where the main method is.
04-24-2016
04:24 PM
Regarding the Twitter API, I used Twitter4j (doc and examples here: http://twitter4j.org/en/code-examples.html). You will need to create a Twitter App from your Twitter account to create the access tokens needed by the API (https://apps.twitter.com/). Regarding the topology itself, you can have a look at the README on GitHub, which explains the idea (in short: it only gets tweets containing specific keywords, aggregates the number of tweets for a given keyword over a given time period using ticks, and stores this information in both HDFS and Hive).
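As a starting point, here is a minimal Twitter4j sketch (illustrative only: the credentials are placeholders and the keywords are example values, not the project's actual code):
import twitter4j._
import twitter4j.conf.ConfigurationBuilder
// OAuth credentials generated for your Twitter App
val conf = new ConfigurationBuilder()
    .setOAuthConsumerKey("<consumer key>")
    .setOAuthConsumerSecret("<consumer key secret>")
    .setOAuthAccessToken("<access token key>")
    .setOAuthAccessTokenSecret("<access token secret>")
    .build()
val twitterStream = new TwitterStreamFactory(conf).getInstance()
// Print the text of each incoming tweet
twitterStream.addListener(new StatusAdapter {
    override def onStatus(status: Status): Unit = println(status.getText)
})
// Only receive tweets matching the given keywords
twitterStream.filter(new FilterQuery().track("hadoop", "storm"))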
Note: if one of the provided answers in this thread answers your initial question, could you mark it as accepted? It helps other users when they are looking for information 😉 Thanks.
04-23-2016
10:52 AM
Hi Wellington, It sounds like Storm is not in good shape. Is everything green for Storm in Ambari?
04-23-2016
10:43 AM
Do you use an SSL connection? Do you have a full stack trace in the NiFi log files?