09-09-2016
08:13 PM
Sensor Reading with Apache NiFi 1.0.0

There are many types of sensors, devices and meters that can be great sources of data. Some can push data, some can be polled, some provide APIs, and some give you access to install your own software.
How To Access Sensors

One option, if you have root access, is to install MiNiFi on the device. This gives you fast access and lets you script and manage your data locally. Another option for bigger devices is to install a full Java-based NiFi node. This becomes harder once you have tens of thousands of devices. In that case you can install an HDF Edge node and communicate from it to your HDF cluster via the Site-to-Site protocol. The Edge node acts as an accumulator for many devices, which is a good idea: instead of sending 10,000 network requests a second from each set of devices, you keep as much traffic local as possible, saving time, avoiding time-outs, and reducing networking and cloud costs. You can also aggregate and send larger batches of data, and compute some summaries and aggregates locally in NiFi. This also lets you populate local databases, dashboards and statistics that may only be of interest to the local source of the sensors (perhaps a plant manager or an automated monitoring system).

Another option is to have devices push to, or be polled by, a local or remote NiFi install via various protocols including TCP/IP, UDP/IP, REST over HTTP, JMS, MQTT, SFTP and email.

Device Push to NiFi

Your device can send messages to NiFi via any of the protocols listed. For my example, I push via MQTT. My local NiFi node consumes these messages via ConsumeMQTT.
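The Edge node accumulator idea from How To Access Sensors can be illustrated in a few lines. This is a hypothetical Python 3 sketch, not a NiFi or MiNiFi API: the class name `EdgeAccumulator` and the batch size are assumptions. It buffers readings locally and only emits a summary (count, min, max, mean rounded to two decimals) once a batch fills, which is the kind of local aggregation that keeps per-reading traffic off the wider network.

```python
from statistics import mean
from typing import Dict, List, Optional


class EdgeAccumulator:
    """Hypothetical edge-side buffer: collects readings locally and emits one
    summary per batch instead of forwarding every reading upstream."""

    def __init__(self, batch_size: int = 60) -> None:
        self.batch_size = batch_size
        self.buffer: List[float] = []

    def add(self, value: float) -> Optional[Dict[str, float]]:
        """Buffer a reading; return a summary once the batch is full, else None."""
        self.buffer.append(value)
        if len(self.buffer) < self.batch_size:
            return None
        summary = {
            "count": len(self.buffer),
            "min": min(self.buffer),
            "max": max(self.buffer),
            "mean": round(mean(self.buffer), 2),
        }
        self.buffer = []  # start the next batch empty
        return summary
```

With one reading per minute and `batch_size=60`, only one summary per hour would leave the device; the raw readings could still be kept locally for dashboards.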
Reference: Paho-MQTT

Your device needs to run Linux (or something similar) and have Python 2.7 or later and pip installed. With pip, you can install the Eclipse Paho library that you need to send MQTT messages:

pip install paho-mqtt

import paho.mqtt.client as paho

# paho-mqtt 1.x style; paho-mqtt 2.x also expects a callback API version,
# e.g. paho.Client(paho.CallbackAPIVersion.VERSION2)
client = paho.Client()
client.connect("servername", 1883, 60)
client.publish("sensor", payload="Test", qos=0, retain=True)
client.disconnect()
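Rather than the literal "Test" payload, a real device would usually publish a structured reading. MQTT payloads are raw bytes, so the reading has to be serialized first. A minimal Python 3 sketch, assuming a JSON envelope of my own choosing: the function name `build_sensor_payload` and the `device` and `ts` fields are hypothetical, not something NiFi or Paho requires.

```python
import json
import time
from typing import Dict, Optional


def build_sensor_payload(readings: Dict[str, float], device_id: str,
                         ts: Optional[float] = None) -> bytes:
    """Serialize one set of readings into UTF-8 JSON bytes for an MQTT payload."""
    envelope = {
        "device": device_id,  # hypothetical device identifier
        "ts": int(ts if ts is not None else time.time()),  # epoch seconds
        "readings": [readings],
    }
    # sort_keys gives a stable byte layout, which makes payloads easy to compare
    return json.dumps(envelope, sort_keys=True).encode("utf-8")
```

It would be passed to the existing call as `client.publish("sensor", payload=build_sensor_payload({...}, "pi-01"), qos=0)`; ConsumeMQTT then receives the JSON as the flow file content.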
Here "servername" is the name of the server you are sending the message to (it could be the NiFi node itself, another server, a bigger device, a central aggregator or a messaging server). I recommend placing it as close on the network as possible. "sensor" is the name of the topic we publish the message to; NiFi consumes messages from this topic. I have a cron job set up to run every minute and publish messages:

* * * * * /opt/demo/sendit.sh

NiFi Poll Device

NiFi can poll your device and consume from various protocols like JMS, MQTT, SFTP, TCP and UDP. For my example, I chose a REST API over HTTP to get past hurdles such as firewalls. I set up a Flask server on the Raspberry Pi to serve my REST API, and I run it from a shell script:

export FLASK_APP=hello.py
flask run --host=0.0.0.0 --port=8888 --no-debugger
To install Flask, you need to run pip install flask
Sensor Reading Code

#!flask/bin/python
import json
import subprocess

import paho.mqtt.client as paho
from flask import Flask, jsonify
from sense_hat import SenseHat

sense = SenseHat()
sense.clear()
app = Flask(__name__)


@app.route('/pi/api/v1.0/sensors', methods=['GET'])
def get_sensors():
    # CPU temperature from the Raspberry Pi firmware tool, e.g. "temp=55.8'C\n"
    p = subprocess.Popen(['/opt/vc/bin/vcgencmd', 'measure_temp'],
                         stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    out, err = p.communicate()
    # Sense HAT readings, rounded to one decimal place
    temp = round(sense.get_temperature(), 1)
    temph = round(sense.get_temperature_from_humidity(), 1)
    tempp = round(sense.get_temperature_from_pressure(), 1)
    humidity = round(sense.get_humidity(), 1)
    pressure = round(sense.get_pressure(), 1)
    tasks = [{'tempp': tempp, 'temph': temph, 'cputemp': out.decode('utf-8'),
              'temp': temp,
              'tempf': (temp * 1.8) + 32,  # Celsius to Fahrenheit
              'humidity': humidity, 'pressure': pressure}]
    # As an option we can push this message when we get called as well.
    # jsonify() returns a Flask Response object, so serialize with json.dumps for MQTT.
    client = paho.Client()
    client.connect("mqttmessageserver", 1883, 60)
    client.publish("sensor", payload=json.dumps({'readings': tasks}), qos=0, retain=True)
    client.disconnect()
    return jsonify({'readings': tasks})


@app.route('/pi/api/v1.0/location', methods=['GET'])
def get_loc():
    # Orientation in degrees
    orientation = sense.get_orientation()
    pitch = orientation['pitch']
    roll = orientation['roll']
    yaw = orientation['yaw']
    # Raw accelerometer values in Gs, rounded to whole numbers
    acceleration = sense.get_accelerometer_raw()
    x = round(acceleration['x'], 0)
    y = round(acceleration['y'], 0)
    z = round(acceleration['z'], 0)
    tasks = [{'pitch': pitch, 'roll': roll, 'yaw': yaw, 'x': x, 'y': y, 'z': z}]
    return jsonify({'readings': tasks})


@app.route('/pi/api/v1.0/show', methods=['GET'])
def get_pi():
    temp = round(sense.get_temperature(), 1)
    humidity = round(sense.get_humidity(), 1)
    pressure = round(sense.get_pressure(), 1)
    # Scroll the readings across the 8x8 RGB LED matrix in red
    sense.clear()
    info = 'T(C): ' + str(temp) + ' H: ' + str(humidity) + ' P: ' + str(pressure)
    sense.show_message(info, text_colour=[255, 0, 0])
    sense.clear()
    tasks = [{'temp': temp, 'tempf': (temp * 1.8) + 32,
              'humidity': humidity, 'pressure': pressure}]
    return jsonify({'readings': tasks})


if __name__ == '__main__':
    app.run(debug=True)
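Before wiring up NiFi, it can help to poll the endpoint by hand and check the response shape. A minimal Python 3 standard-library sketch, assuming the Flask app above is reachable at a hypothetical host `raspberrypi.local` on port 8888 (the port from the flask run command); `parse_readings` and `poll_sensors` are illustrative helpers, not part of Flask or NiFi.

```python
import json
from typing import Any, Dict, List
from urllib.request import urlopen

SENSORS_URL = "http://raspberrypi.local:8888/pi/api/v1.0/sensors"  # hypothetical host


def parse_readings(body: bytes) -> List[Dict[str, Any]]:
    """Decode the {"readings": [...]} envelope returned by the REST API."""
    doc = json.loads(body.decode("utf-8"))
    return doc.get("readings", [])


def poll_sensors(url: str = SENSORS_URL, timeout: float = 10.0) -> List[Dict[str, Any]]:
    """GET the sensor endpoint once and return the list of readings."""
    with urlopen(url, timeout=timeout) as resp:
        return parse_readings(resp.read())
```

In the NiFi flow itself, the equivalent polling is done by an HTTP processor on a schedule; this script is only a quick way to confirm the API responds as expected.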
The device I am testing is a Raspberry Pi 3 Model B with a Sense HAT add-on board. Besides sensors for temperature, humidity and barometric pressure, it has an 8x8 LED matrix for displaying text and simple graphics. We can use this to print messages (sense.show_message) or warnings that we send from NiFi. This enables two-way, highly visible communication with remote devices, for example to notify local personnel of conditions.
NiFi 1.0.0 Flows
JSON File Landed in HDFS in our HDP 2.5 Cluster

[root@myserverhdp sensors]# hdfs dfs -ls /sensor
Found 2 items
-rw-r--r-- 3 root hdfs 202 2016-09-09 17:26 /sensor/181528179026826
drwxr-xr-x - hdfs hdfs 0 2016-09-09 15:43 /sensor/failure
[root@tspanndev13 sensors]# hdfs dfs -cat /sensor/181528179026826
{
"readings": [
{
"cputemp": "temp=55.8'C\n",
"humidity": 40.8,
"pressure": 1014.1,
"temp": 40.0,
"tempf": 84.0,
"temph": 40.0,
"tempp": 39.1
}
]
}
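The cputemp field is the raw vcgencmd string ("temp=55.8'C\n"), which is awkward to chart or query. One way to normalize it before storing, shown as a hypothetical Python 3 helper; the regular expression is an assumption based on the sample output, and `normalize_reading` is an illustrative name.

```python
import re
from typing import Optional

# Matches vcgencmd output such as "temp=55.8'C\n"
_CPU_TEMP_RE = re.compile(r"temp=(-?\d+(?:\.\d+)?)'C")


def parse_cpu_temp(raw: str) -> Optional[float]:
    """Extract the Celsius value from a vcgencmd measure_temp string, or None."""
    match = _CPU_TEMP_RE.search(raw)
    return float(match.group(1)) if match else None


def normalize_reading(reading: dict) -> dict:
    """Return a copy of a reading with cputemp converted to a float."""
    fixed = dict(reading)
    if isinstance(fixed.get("cputemp"), str):
        fixed["cputemp"] = parse_cpu_temp(fixed["cputemp"])
    return fixed
```

The same cleanup could also be done on the device before the JSON is returned, so every downstream consumer sees a numeric value.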
The final result of our flow is a JSON file on HDFS. We could easily send a copy of the data to Phoenix via PutSQL, to Hive via PutHiveQL, or to Spark Streaming for additional processing via Site-to-Site or Kafka.
Resources:
https://github.com/topshed/RPi_8x8GridDraw
https://www.raspberrypi.org/learning/sense-hat-data-logger/worksheet/
https://www.raspberrypi.org/learning/astro-pi-flight-data-analysis/worksheet/
https://www.raspberrypi.org/learning/astro-pi-guide/sensors/temperature.md
https://breadfit.wordpress.com/2015/06/24/installing-mosquitto-under-centos/
09-05-2016
10:52 PM
JSON Batch to Single Row Phoenix

I grabbed open data on crime from Philly's Open Data (https://www.opendataphilly.org/dataset/crime-incidents). After a free sign-up you get access to JSON crime data (https://data.phila.gov/resource/sspu-uyfa.json). You can grab individual dates or ranges covering thousands of records. I wanted to spool each JSON record as a separate HBase row. With the flexibility of Apache NiFi 1.0.0, I can schedule run times via cron or another familiar setup.

This is my master flow. First I use GetHTTP to retrieve the JSON messages over SSL. I split the records up and store them as raw JSON in HDFS, send some of them via email, and format them for Phoenix SQL and store them in Phoenix/HBase. All with no coding and in a simple flow. For extra output, I can send them to a Riemann server for monitoring.

Setting up SSL for accessing HTTPS data like Philly Crime requires a little configuration and knowing which Java JRE you are using to run NiFi. You can run service nifi status to quickly see which JRE it uses.

Split the Records

The Open Data set has many rows of data; let's split them up and pull out the attributes we want from the JSON.

Phoenix

Another part that requires specific formatting is setting up the Phoenix connection. Make sure you point to the correct driver, and if you have security enabled, make sure it is configured.

Load the Data (Upsert)

Once your data is loaded, you can check it quickly with:

/usr/hdp/current/phoenix-client/bin/sqlline.py localhost:2181:/hbase-unsecure

The SQL for this data set is pretty straightforward.

CREATE TABLE phillycrime (
dc_dist varchar,
dc_key varchar not null primary key,
dispatch_date varchar,
dispatch_date_time varchar,
dispatch_time varchar,
hour varchar,
location_block varchar,
psa varchar,
text_general_code varchar,
ucr_general varchar);
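Outside NiFi, the split-and-extract step amounts to turning one JSON array into one record per row and keeping only the attributes the table needs. A Python 3 sketch of that idea, with the column list taken from the CREATE TABLE statement; it illustrates the data shape only and is not how NiFi's processors are implemented.

```python
import json
from typing import Dict, Iterator, List, Optional

# Column names from the phillycrime CREATE TABLE statement, in table order
COLUMNS: List[str] = [
    "dc_dist", "dc_key", "dispatch_date", "dispatch_date_time", "dispatch_time",
    "hour", "location_block", "psa", "text_general_code", "ucr_general",
]


def split_records(batch_json: str) -> Iterator[Dict[str, Optional[str]]]:
    """Yield one dict per element of a JSON array, keeping only table columns.

    Fields missing from a record come back as None; extra fields are dropped.
    """
    for record in json.loads(batch_json):
        yield {col: record.get(col) for col in COLUMNS}
```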
{"dc_dist":"18","dc_key":"200918067518","dispatch_date":"2009-10-02","dispatch_date_time":"2009-10-02T14:24:00.000","dispatch_time":"14:24:00","hour":"14","location_block":"S 38TH ST / MARKETUT ST","psa":"3","text_general_code":"Other Assaults","ucr_general":"800"}
upsert into phillycrime values ('18', '200918067518', '2009-10-02','2009-10-02T14:24:00.000','14:24:00','14', 'S 38TH ST / MARKETUT ST','3','Other Assaults','800');
!tables
!describe phillycrime

The DC_KEY is unique, so I used it as the Phoenix primary key. Now all the data I get will be added, and any repeats will safely update. If we fetch some of the same records again, that's okay: the upsert just overwrites the row with the same values.
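The sample upsert only works because none of its values contain a single quote; a location such as O'NEIL ST would break a naively built statement. Bind parameters are preferable wherever the tool supports them, but when the SQL text is built directly, embedded quotes must be doubled. A hypothetical Python 3 sketch (the helper names are mine) that builds a Phoenix UPSERT with an explicit column list:

```python
from typing import Dict, List, Optional

# Column names from the phillycrime CREATE TABLE statement, in table order
COLUMNS: List[str] = [
    "dc_dist", "dc_key", "dispatch_date", "dispatch_date_time", "dispatch_time",
    "hour", "location_block", "psa", "text_general_code", "ucr_general",
]


def sql_literal(value: Optional[str]) -> str:
    """Render a value as a SQL string literal; embedded single quotes are doubled."""
    if value is None:
        return "NULL"
    return "'" + str(value).replace("'", "''") + "'"


def to_upsert(record: Dict[str, Optional[str]], table: str = "phillycrime") -> str:
    """Build a Phoenix UPSERT with an explicit column list for one record."""
    columns = ", ".join(COLUMNS)
    values = ", ".join(sql_literal(record.get(col)) for col in COLUMNS)
    return "UPSERT INTO {} ({}) VALUES ({})".format(table, columns, values)
```

Because dc_key is the primary key, running the same statement twice leaves one row, which is what makes re-fetching overlapping date ranges safe.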
08-30-2016
12:07 AM
Try double quotes: https://docs.mongodb.com/manual/reference/mongodb-extended-json/
08-27-2016
06:45 PM
I have written a small Java 8 Spring Boot application to view Twitter tweets that I ingested with NiFi and stored as JSON files in HDFS. I have an external Hive table on top of those raw tweets, and from them I ran a Spark Scala job that added Stanford CoreNLP sentiment and saved the result to an ORC Hive table. That is the table I query in my Spring Boot visualization program. A simple AngularJS HTML5 page calls this microservice, which also has a method that uses Spring Social Twitter to get live tweets. For this you will need JDK 1.8 and Maven installed on your machine or VM. I used Eclipse as my IDE, but I usually use IntelliJ; either works fine.

Java Bean

I have specified a few of the fields. This bean holds our Hive data and is serialized to JSON for transport to AngularJS.

public class Twitter2 implements Serializable {
    private static final long serialVersionUID = 7409772495079484269L;
    private String geo;
    private String unixtime;
    private String handle;
    private String location;
    private String tag;
    private String tweet_id;
    // ....
}
Core Spring Boot App

package com.dataflowdeveloper;

import javax.sql.DataSource;
import org.apache.commons.dbcp.BasicDataSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
import org.springframework.social.twitter.api.Twitter;
import org.springframework.social.twitter.api.impl.TwitterTemplate;

@Configuration
@ComponentScan
@EnableAutoConfiguration
@SpringBootApplication
public class HiveApplication {

    public static void main(String[] args) {
        SpringApplication.run(HiveApplication.class, args);
    }

    @Configuration
    @Profile("default")
    static class LocalConfiguration {
        Logger logger = LoggerFactory.getLogger(LocalConfiguration.class);

        // Twitter API credentials, read from application.properties
        @Value("${consumerkey}")
        private String consumerKey;
        @Value("${consumersecret}")
        private String consumerSecret;
        @Value("${accesstoken}")
        private String accessToken;
        @Value("${accesstokensecret}")
        private String accessTokenSecret;

        // Spring Social Twitter client used for live tweets
        @Bean
        public Twitter twitter() {
            Twitter twitter = null;
            try {
                twitter = new TwitterTemplate(consumerKey, consumerSecret, accessToken, accessTokenSecret);
            } catch (Exception e) {
                logger.error("Error:", e);
            }
            return twitter;
        }

        // Hive JDBC connection settings, read from application.properties
        @Value("${hiveuri}")
        private String databaseUri;
        @Value("${hivepassword}")
        private String password;
        @Value("${hiveusername}")
        private String username;

        // Pooled JDBC DataSource backed by the HiveServer2 JDBC driver
        @Bean
        public DataSource dataSource() {
            BasicDataSource dataSource = new BasicDataSource();
            dataSource.setUrl(databaseUri);
            dataSource.setDriverClassName("org.apache.hive.jdbc.HiveDriver");
            dataSource.setUsername(username);
            dataSource.setPassword(password);
            logger.info("Initialized Hive");
            return dataSource;
        }
    }
}

Rest Controller

This is a Spring Boot class annotated with @RestController. It exposes a pretty simple query that can be called from curl or any REST client, such as AngularJS via $http({method: 'GET', url: '/query/' + $query}).success(function(data) { $scope.tweetlist = data; });

...
@RequestMapping("/query/{query}")
public List<Twitter2> query(@PathVariable(value = "query") String query) {
    return dataSourceService.search(query);
}
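Because the query text travels as a path segment in /query/{query}, any spaces or special characters have to be percent-encoded by the client. A minimal Python 3 sketch, assuming the app runs on localhost port 9999 as in the run instructions; `query_url` is a hypothetical helper.

```python
from urllib.parse import quote

BASE_URL = "http://localhost:9999"  # assumed host and port of the Spring Boot app


def query_url(query: str, base: str = BASE_URL) -> str:
    """Build the /query/{query} URL, percent-encoding the path segment."""
    # safe="" so that '/' in the query text is encoded instead of splitting the path
    return "{}/query/{}".format(base, quote(query, safe=""))
```

Some servlet containers reject encoded slashes (%2F) in paths by default, so queries containing '/' may still fail server-side; the ?query= form of the URL avoids that.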
Datasource Service

Just regular plain old JDBC.

package com.dataflowdeveloper;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.ArrayList;
import java.util.List;
import javax.sql.DataSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

@Component("DataSourceService")
public class DataSourceService {
    Logger logger = LoggerFactory.getLogger(DataSourceService.class);

    // Hive DataSource defined in HiveApplication
    @Autowired
    public DataSource dataSource;

    public Twitter2 defaultValue() {
        return new Twitter2();
    }

    // Result limit, read from application.properties
    @Value("${querylimit}")
    private String querylimit;

    public List<Twitter2> search(String query) {
        // ..
    }
}

application.properties

Under src/main/resources I have a properties file (it could also be YAML) with a few name/value pairs like hivepassword=secretstuff.

Maven Build Script

I had some issues with Spring Boot, Hadoop and Hive pulling in multiple copies of log4j, so see the exclusions in my POM that prevent build issues.

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.dataflowdeveloper</groupId>
<artifactId>hive</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>hive</name>
<description>Apache Hive Spring Boot</description>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.4.0.RELEASE</version>
<relativePath /> <!-- lookup parent from repository -->
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-tomcat</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jetty</artifactId>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-jdbc</artifactId>
<version>1.2.1</version>
<exclusions>
<exclusion>
<groupId>org.eclipse.jetty.aggregate</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
<exclusion>
<artifactId>log4j</artifactId>
<groupId>log4j</groupId>
</exclusion>
<exclusion>
<artifactId>servlet-api</artifactId>
<groupId>javax.servlet</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.7.1</version>
<type>jar</type>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
<exclusion>
<artifactId>log4j</artifactId>
<groupId>log4j</groupId>
</exclusion>
<exclusion>
<artifactId>servlet-api</artifactId>
<groupId>javax.servlet</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.7.1</version>
<exclusions>
<exclusion>
<artifactId>servlet-api</artifactId>
<groupId>javax.servlet</groupId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>1.2.1</version>
<type>jar</type>
<exclusions>
<exclusion>
<artifactId>servlet-api</artifactId>
<groupId>javax.servlet</groupId>
</exclusion>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
<exclusion>
<artifactId>log4j</artifactId>
<groupId>log4j</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-social-twitter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-jdbc-core</artifactId>
<version>1.2.1.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
To Build

mvn package -DskipTests

To Run

I have included Jetty in my POM, so the server runs on Jetty.

java -Xms512m -Xmx512m -Dhdp.version=2.4.0.0-169 -Djava.net.preferIPv4Stack=true -jar target/hive-0.0.1-SNAPSHOT.jar

Preferring the IPv4 stack is required in some networking environments, and I set the HDP version to that of the Sandbox I am calling. If you are using the Sandbox, make sure the HiveServer2 Thrift port (10000 by default) is open and available. You may need more RAM depending on what you are doing; a few gigabytes wouldn't hurt if you have it.

[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 2.982 s
[INFO] Finished at: 2016-08-26T16:35:56-04:00
[INFO] Final Memory: 28M/447M
[INFO] ------------------------------------------------------------------------

Spring Boot lets you set an ASCII art banner with src/main/resources/banner.txt. I set the server port to 9999 so as not to collide with Ambari or other HDP services.

08-26 17:11:28.721 INFO 38783 --- [tp1841396611-12] org.apache.hive.jdbc.Utils : Supplied authorities: localhost:10000
2016-08-26 17:11:28.722 INFO 38783 --- [tp1841396611-12] org.apache.hive.jdbc.Utils : Resolved authority: localhost:10000
2016-08-26 17:11:28.722 INFO 38783 --- [tp1841396611-12] org.apache.hive.jdbc.HiveConnection : Will try to open client transport with JDBC Uri: jdbc:hive2://localhost:10000/default
2016-08-26 17:12:24.768 ERROR 38783 --- [tp1841396611-12] com.dataflowdeveloper.DataSourceService : Size=1
2016-08-26 17:12:24.768 ERROR 38783 --- [tp1841396611-12] com.dataflowdeveloper.DataController : Query:hadoop,IP:127.0.0.1 Browser:Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/52.0.2743.116 Safari/537.36

URLs Made Available By Application

http://localhost:9999/timeline/<twitter handle>
http://localhost:9999/profile/<twitter handle>
http://localhost:9999/query/<hive query text>
http://localhost:9999/?query=hadoop

References

https://cwiki.apache.org/confluence/display/Hive/HiveServer2+Clients#HiveServer2Clients-JDBC
08-27-2016
06:45 PM
I am running a free MongoDB instance (DBaaS) on mlab.com. From NiFi, I can read from and write to MongoDB very easily. It is a great way to pull data out of a large collection of MongoDB databases: in some startups and enterprises, a lot of little MEAN apps have been written and hold small silos of data locked in MongoDB. These can be streamed into a data lake very easily with NiFi. Once stored in HDFS, the data can be accessed via Spark SQL, Hive, Zeppelin and other tools.

An example of a Twitter tweet as a JSON document in a MongoDB collection in a MongoDB database stored in an online NoSQL store.

The NiFi flow for storing to MongoDB is trivial. A simple flow reads MongoDB JSON records and lands them as JSON in HDFS. Here is an example of another source to store to HDFS or MongoDB: we use a GetHTTP processor to access an SSL-protected resource.

There are a few options for storing something to MongoDB. You need to format the MongoDB URI correctly: it must include username:password@yoururl. Then set your database and collection name. Insert mode is most common, but you can also do an upsert.

There are also a few options for the write concern. ACKNOWLEDGED means the server (the primary, in a replica set) confirms the write; it does not by itself wait for every node in the cluster. If you need confirmation from other replica-set members, choose a stronger setting such as MAJORITY. https://docs.mongodb.com/v3.0/reference/write-concern/
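If the username or password contains reserved characters such as '@', ':' or '/', they must be percent-encoded or the URI will not parse correctly; the PyMongo documentation recommends `urllib.parse.quote_plus` for this. A Python 3 sketch with placeholder host, port and database names (the function name is hypothetical); the resulting string is what goes into the Mongo URI property.

```python
from urllib.parse import quote_plus


def mongo_uri(user: str, password: str, host: str, port: int, database: str) -> str:
    """Build mongodb://user:password@host:port/database with escaped credentials."""
    return "mongodb://{}:{}@{}:{}/{}".format(
        quote_plus(user), quote_plus(password), host, port, database
    )
```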
08-24-2016
09:40 PM
It's tied to the JDK/JRE used to run NiFi, for example /opt/jdk1.8.0_91/jre/lib/security/cacerts (https://docs.oracle.com/cd/E19957-01/817-3331/6miuccqo3/index.html). The default password is changeit, and the store is a JKS keystore. Any Java application needs this for SSL; the browser does it for you automagically.
08-23-2016
03:56 PM
You can also easily save to ORC, Parquet or another format. I recommend ORC so you can do fast queries from Hive.
08-23-2016
03:54 PM
Put in a full path. For me, running on Hadoop, it is saved to HDFS (see below). If you are running standalone Spark that was not built with Hadoop, it will store to the local file system, probably /<YOURCURRENTUSER>/something or /tmp. Check the Spark History UI.
08-23-2016
03:54 PM
Once you do a write (df1.write.format("orc").mode(org.), it will save to an HDFS or local directory depending on whether you have Hadoop enabled. It will create the output under the running user's directory, like /root/<your name>, or under a Spark user directory if one was created. It doesn't hurt to put in a full path for the write, like /mystuff/awesome. To read an Avro file, use the Avro tools; see here: https://github.com/airisdata/avroparquet
08-19-2016
12:19 AM
Use Case: Process a Media Feed, Store Everything, Run Sentiment Analysis on the Stream, and Act on a Condition

I have my GetTwitter processor watching my Twitter handle and the keyword Hadoop, something I tweet about frequently. I use EvaluateJsonPath to pull out all the attributes I want (message, user name, geo information, etc.). I use an AttributesToJSON processor to build a new, smaller JSON document from just those attributes. I also store the raw JSON data in HDFS in a separate directory.

Sending HTML email is a bit tricky: make sure you don't include extra text, so the message contains nothing but raw HTML, and don't enable Attach Files or Include All Attributes in Message. Make sure you set the Content Type to text/html.

For sentiment analysis I wanted something easy, so I use an ExecuteStreamCommand to run a Python 2.7 script that uses the NLTK VADER SentimentIntensityAnalyzer. The NiFi part is easy: just a command that calls a shell script. The hard part is setting up Python and NLTK on the HDP 2.4 Sandbox. NLTK with the full text corpora for proper analysis is almost 10 gigabytes of data.

NLTK for Sentiment Analysis

How to Analyze Sentiment

If you don't have Python 2.7 or Python 3.4 installed on your box (my VM had Python 2.6), you need to install Python 2.7 while keeping your existing Python 2.6 for existing applications. This is a bit tricky, so I have detailed the steps so you will be able to install and run this great ML tool. Directions on how to install Python 2.7 on CentOS 6.x can be found here. More details can be found here.

sudo yum install -y centos-release-SCL
sudo yum install -y python27
sudo yum groupinstall "Development tools" -y
sudo yum install zlib-devel -y
sudo yum install bzip2-devel -y
sudo yum install openssl-devel -y
sudo yum install ncurses-devel -y
sudo yum install sqlite-devel -y
cd /opt
sudo wget --no-check-certificate https://www.python.org/ftp/python/2.7.6/Python-2.7.6.tar.xz
sudo tar xf Python-2.7.6.tar.xz
cd Python-2.7.6
sudo ./configure --prefix=/usr/local
sudo make && sudo make altinstall
Now we can use /usr/local/bin/python2.7. Download get-pip.py first, then run it with the new interpreter:

wget https://bootstrap.pypa.io/get-pip.py
sudo /usr/local/bin/python2.7 get-pip.py
sudo /usr/local/bin/pip2.7 install -U nltk
sudo /usr/local/bin/pip2.7 install -U numpy
sudo /usr/local/bin/python2.7 -m nltk.downloader -d /usr/local/share/nltk_data all
(almost 10 GB of data)
sudo /usr/local/bin/pip2.7 install vaderSentiment
sudo /usr/local/bin/pip2.7 install twython
The run.sh script is called from ExecuteStreamCommand. I use a Bash shell script because I want to make sure Python 2.7 is used. There are other ways, but this works for me.

#!/bin/bash
/usr/local/bin/python2.7 /opt/demo/sentiment/sentiment.py "$@"

That script calls sentiment.py with the parameters passed from NiFi.

from nltk.sentiment.vader import SentimentIntensityAnalyzer
import sys
sid = SentimentIntensityAnalyzer()
ss = sid.polarity_scores(sys.argv[1])
print('Compound {0} Negative {1} Neutral {2} Positive {3} '.format(ss['compound'],ss['neg'],ss['neu'],ss['pos']))
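VADER's compound score runs from -1 (most negative) to +1 (most positive). To route tweets in NiFi on a single label rather than four numbers, one option is the convention suggested by the VADER authors: compound >= 0.05 is positive, compound <= -0.05 is negative, and anything in between is neutral. A Python 3 sketch of that mapping; the function names and the extra Label field are assumptions, not part of NLTK.

```python
def label_sentiment(compound: float, threshold: float = 0.05) -> str:
    """Map a VADER compound score (-1..1) to a coarse label."""
    if compound >= threshold:
        return "positive"
    if compound <= -threshold:
        return "negative"
    return "neutral"


def format_scores(ss: dict) -> str:
    """Same layout as the sentiment.py output line, plus a derived label."""
    return "Compound {0} Negative {1} Neutral {2} Positive {3} Label {4}".format(
        ss["compound"], ss["neg"], ss["neu"], ss["pos"], label_sentiment(ss["compound"])
    )
```

Printing a fixed word like this makes it straightforward to extract the label into an attribute and branch on it later in the flow.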
Once the data is in Hadoop, I also ran a Scala Spark 1.6 batch job with Spark SQL to apply Stanford CoreNLP sentiment analysis to it. I also tried running the same logic as a Scala Spark 1.6 Spark Streaming job that received the data from Kafka (it could also receive it from NiFi via Site-to-Site). Another option is to write a custom processor in Java or Scala that runs the analysis as part of the flow. With Apache NiFi you have a lot of options depending on your needs, and all of them get the features and benefits that NiFi provides.

Now we have a bunch of data! Hooray, both raw and slimmed down. A select portion was converted to HTML and emailed out. Note: I have used Gmail and Outlook.com/Hotmail to send, but they tend to shut you down after a while over spam concerns. I use my own mail server (Dataflowdeveloper.com) since I have full control; you can use your corporate server as long as you have an SMTP login and permissions. You may need to check with your administrators about firewalls, ports and other security precautions.

What to do with an HDFS directory full of JSON files from Twitter that share the same schema? I also used a Spark batch job to produce an ORC Hive table with an extra column for Stanford sentiment. You can quickly run queries on it via beeline, DbVisualizer or the Ambari Hive View.

beeline
!connect jdbc:hive2://localhost:10000/default;
!set showHeader true;
set hive.vectorized.execution.enabled=true;
set hive.execution.engine=tez;
set hive.vectorized.execution.reduce.enabled=true;
set hive.compute.query.using.stats=true;
set hive.cbo.enable=true;
set hive.stats.fetch.column.stats=true;
set hive.stats.fetch.partition.stats=true;
show tables;
describe sparktwitterorc;
analyze table sparktwitterorc compute statistics;
analyze table sparktwitterorc compute statistics for columns;
I compute statistics once to improve query performance. The table is now ready for high-speed queries. You can also run a fast Hive query from the command line:

beeline -u jdbc:hive2://localhost:10000/default -e "SELECT * FROM rawtwitter where sentiment is not null and time like 'Thu Aug 18%' and lower(msg) like '%hadoop%' LIMIT 100;"
Spark SQL says my data looks like:

|-- coordinates: string (nullable = true)
|-- followers_count: string (nullable = true)
|-- friends_count: string (nullable = true)
|-- geo: string (nullable = true)
|-- handle: string (nullable = true)
|-- hashtags: string (nullable = true)
|-- language: string (nullable = true)
|-- location: string (nullable = true)
|-- msg: string (nullable = true)
|-- place: string (nullable = true)
|-- profile_image_url: string (nullable = true)
|-- retweet_count: string (nullable = true)
|-- sentiment: string (nullable = true)
|-- source: string (nullable = true)
|-- tag: string (nullable = true)
|-- time: string (nullable = true)
|-- time_zone: string (nullable = true)
|-- tweet_id: string (nullable = true)
|-- unixtime: string (nullable = true)
|-- user_name: string (nullable = true)
My external Hive table on this directory of tweets looks like this (external, so dropping the table leaves the raw files in place):

create external table rawtwitter(
handle string,
hashtags string,
msg string,
language string,
time string,
tweet_id string,
unixtime string,
user_name string,
geo string,
coordinates string,
location string,
time_zone string,
retweet_count string,
followers_count string,
friends_count string,
place string,
source string,
profile_image_url string,
tag string,
sentiment string,
stanfordSentiment string
)
ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'
LOCATION '/social/twitter';
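The HCatalog JsonSerDe reads one JSON object per line, and a column with no matching key in a record comes back as NULL. If raw tweets arrive pretty-printed or with many extra fields, one way to prepare them is to rewrite each record as a single line containing only the table's columns. A Python 3 sketch; the column list mirrors the DDL above and `to_json_lines` is a hypothetical name. Values are stringified to match the all-string table.

```python
import json
from typing import Iterable, List

# Column names from the rawtwitter DDL, in table order
HIVE_COLUMNS: List[str] = [
    "handle", "hashtags", "msg", "language", "time", "tweet_id", "unixtime",
    "user_name", "geo", "coordinates", "location", "time_zone", "retweet_count",
    "followers_count", "friends_count", "place", "source", "profile_image_url",
    "tag", "sentiment", "stanfordSentiment",
]


def to_json_lines(records: Iterable[dict]) -> str:
    """Render records as newline-delimited JSON, keeping only table columns.

    Missing or null fields are omitted; the SerDe reads those columns as NULL.
    """
    out = []
    for rec in records:
        row = {k: str(rec[k]) for k in HIVE_COLUMNS if rec.get(k) is not None}
        out.append(json.dumps(row, sort_keys=True) + "\n")
    return "".join(out)
```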
Now you can create charts and graphs from your social data.