10-05-2016
01:39 PM
5 Kudos
1. Acquire an EDI file (GetFile, GetFTP, GetHTTP, GetSFTP, Fetch...)
2. Install the open source nifi-edireader bundle on NiFi 1.0.0:
Download https://github.com/BerryWorksSoftware/edireader
Maven install the BerryWorks EDIReader
Download https://github.com/mrcsparker/nifi-edireader-bundle
Maven package nifi-edireader (requires Maven 3.3 or newer; you may have to download and install it separately from the standard Linux package)
cp nifi-edireader-nar/target/nifi-edireader-nar-0.0.1.nar to your NIFI/lib
Restart the NiFi service
3. Add the EdiXML processor and connect it to the EDI file input
4. Add extra processing, conversion or routing (TransformXML with XSLT or EvaluateXPath) to convert to JSON
5. Land the data in HDFS (PutHDFS)
6. Use the web form linked in Resources below to generate a test EDI file:
ISA*00* *00* *ZZ*SENDER ID *ZZ*RECEIVER ID *010101*0101*U*00401*000000001*0*T*!
GS*IN*SENDER ID*APP RECEIVER*01010101*01010101*1*X*004010
ST*810*0001
BIG*20021208*00001**A999
N1*ST*Timothy Spann*9*122334455
N3*115 xxx ave
N4*xxxtown*nj*08520
N1*BT*Hortonworks*9*122334455
N3*5470 GREAT AMERICA PARKWAY
N4*santa clara*CA*95054
ITD*01*3*2**30**30*****60
FOB*PP
IT1**1*EA*200**UA*EAN
PID*F****Lamp
IT1**4*EA*50**UA*EAN
PID*F****Chair
TDS*2000
CAD*****Routing
ISS*30*CA
CTT*50
SE*19*0001
GE*1*1
IEA*1*000000001
7. The EDI file converted to XML:
<?xml version="1.0" encoding="UTF-8"?>
<ediroot>
<interchange Standard="ANSI X.12"
AuthorizationQual="00"
Authorization=" "
SecurityQual="00"
Security=" "
Date="010101"
Time="0101"
StandardsId="U"
Version="00401"
Control="000000001"
AckRequest="0"
TestIndicator="T">
<sender>
<address Id="SENDER ID " Qual="ZZ"/>
</sender>
<receiver>
<address Id="RECEIVER ID " Qual="ZZ"/>
</receiver>
<group GroupType="IN"
ApplSender="SENDER ID"
ApplReceiver="APP RECEIVER"
Date="01010101"
Time="01010101"
Control="1"
StandardCode="X"
StandardVersion="004010">
<transaction DocType="810" Name="Invoice" Control="0001">
<segment Id="BIG">
<element Id="BIG01">20021208</element>
<element Id="BIG02">00001</element>
<element Id="BIG04">A999</element>
</segment>
<loop Id="N1">
<segment Id="N1">
<element Id="N101">ST</element>
<element Id="N102">Timothy Spann</element>
<element Id="N103">9</element>
<element Id="N104">122334455</element>
</segment>
<segment Id="N3">
<element Id="N301">115 xxx ave</element>
</segment>
<segment Id="N4">
<element Id="N401">xxxstown</element>
<element Id="N402">nj</element>
<element Id="N403">08520</element>
</segment>
</loop>
<loop Id="N1">
<segment Id="N1">
<element Id="N101">BT</element>
<element Id="N102">Hortonworks</element>
<element Id="N103">9</element>
<element Id="N104">122334455</element>
</segment>
<segment Id="N3">
<element Id="N301">5470 GREAT AMERICA PARKWAY</element>
</segment>
<segment Id="N4">
<element Id="N401">santa clara</element>
<element Id="N402">CA</element>
<element Id="N403">95054</element>
</segment>
</loop>
<segment Id="ITD">
<element Id="ITD01">01</element>
<element Id="ITD02">3</element>
<element Id="ITD03">2</element>
<element Id="ITD05">30</element>
<element Id="ITD07">30</element>
<element Id="ITD12">60</element>
</segment>
<segment Id="FOB">
<element Id="FOB01">PP</element>
</segment>
<loop Id="IT1">
<segment Id="IT1">
<element Id="IT102">1</element>
<element Id="IT103">EA</element>
<element Id="IT104">200</element>
<element Id="IT106">UA</element>
<element Id="IT107">EAN</element>
</segment>
<loop Id="PID">
<segment Id="PID">
<element Id="PID01">F</element>
<element Id="PID05">Lamp</element>
</segment>
</loop>
</loop>
<loop Id="IT1">
<segment Id="IT1">
<element Id="IT102">4</element>
<element Id="IT103">EA</element>
<element Id="IT104">50</element>
<element Id="IT106">UA</element>
<element Id="IT107">EAN</element>
</segment>
<loop Id="PID">
<segment Id="PID">
<element Id="PID01">F</element>
<element Id="PID05">Chair</element>
</segment>
</loop>
</loop>
<segment Id="TDS">
<element Id="TDS01">2000</element>
</segment>
<segment Id="CAD">
<element Id="CAD05">Routing</element>
</segment>
<loop Id="ISS">
<segment Id="ISS">
<element Id="ISS01">30</element>
<element Id="ISS02">CA</element>
</segment>
</loop>
<segment Id="CTT">
<element Id="CTT01">50</element>
</segment>
</transaction>
</group>
</interchange>
</ediroot>
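Step 4 suggests TransformXML or EvaluateXPath to get JSON out of this XML. As a rough stdlib-only sketch of the same idea outside NiFi (the element Ids come from the EDIReader output above; the JSON field names are my own, for illustration):

```python
import json
import xml.etree.ElementTree as ET

# A trimmed piece of the EDIReader output above: just the two IT1 loops.
xml_snippet = """<transaction DocType="810" Name="Invoice" Control="0001">
  <loop Id="IT1">
    <segment Id="IT1">
      <element Id="IT102">1</element>
      <element Id="IT104">200</element>
    </segment>
    <loop Id="PID">
      <segment Id="PID"><element Id="PID05">Lamp</element></segment>
    </loop>
  </loop>
  <loop Id="IT1">
    <segment Id="IT1">
      <element Id="IT102">4</element>
      <element Id="IT104">50</element>
    </segment>
    <loop Id="PID">
      <segment Id="PID"><element Id="PID05">Chair</element></segment>
    </loop>
  </loop>
</transaction>"""

root = ET.fromstring(xml_snippet)
items = []
for loop in root.iterfind(".//loop[@Id='IT1']"):
    # IT102 = quantity, IT104 = unit price, PID05 = item description
    items.append({
        "description": loop.findtext(".//element[@Id='PID05']"),
        "quantity": loop.findtext("segment[@Id='IT1']/element[@Id='IT102']"),
        "unit_price": loop.findtext("segment[@Id='IT1']/element[@Id='IT104']"),
    })

print(json.dumps(items))
```

In the actual flow, the same extraction would be an EvaluateXPath or TransformXML processor configuration rather than a script.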
Resources
https://github.com/mrcsparker/nifi-edireader-bundle
https://github.com/BerryWorksSoftware/edireader
https://en.wikipedia.org/wiki/Electronic_data_interchange
https://en.wikipedia.org/wiki/EDIFACT
https://en.wikipedia.org/wiki/FORTRAS
http://databene.org/edifatto.html
https://sourceforge.net/projects/edifatto/
https://secure.edidev.net/edidev-ca/samples/vbNetGen/WebFrmNetGen.aspx (Generate example EDI)
10-01-2016
11:13 PM
2 Kudos
I ran the same flow myself and examined the Avro file in HDFS using the Avro CLI (avro-tools). Even though I didn't specify Snappy compression, it was there in the file.
java -jar avro-tools-1.8.0.jar getmeta 23568764174290.avro
log4j:WARN No appenders could be found for logger (org.apache.hadoop.metrics2.lib.MutableMetricsFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
avro.codec snappy
avro.schema {"type":"record","name":"people","doc":"Schema generated by Kite","fields":[{"name":"id","type":"long","doc":"Type inferred from '2'"},{"name":"first_name","type":"string","doc":"Type inferred from 'Gregory'"},{"name":"last_name","type":"string","doc":"Type inferred from 'Vasquez'"},{"name":"email","type":"string","doc":"Type inferred from 'gvasquez1@pcworld.com'"},{"name":"gender","type":"string","doc":"Type inferred from 'Male'"},{"name":"ip_address","type":"string","doc":"Type inferred from '32.8.254.252'"},{"name":"company_name","type":"string","doc":"Type inferred from 'Janyx'"},{"name":"domain_name","type":"string","doc":"Type inferred from 'free.fr'"},{"name":"file_name","type":"string","doc":"Type inferred from 'NonMauris.xls'"},{"name":"mac_address","type":"string","doc":"Type inferred from '03-FB-66-0F-20-A3'"},{"name":"user_agent","type":"string","doc":"Type inferred from '\"Mozilla/5.0 (Macintosh; U; Intel Mac OS X 10_6_7;'"},{"name":"lat","type":"string","doc":"Type inferred from ' like Gecko) Version/5.0.4 Safari/533.20.27\"'"},{"name":"long","type":"double","doc":"Type inferred from '26.98829'"}]}
It's hard coded in NiFi: https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertCSVToAvro.java
It always adds Snappy compression to every Avro file. No options:
writer.setCodec(CodecFactory.snappyCodec());
Make sure you have a schema set:
Record Schema: ${inferred.avro.schema}
If you can make everything strings and convert to other types later, you will be happier.
References:
https://www.linkedin.com/pulse/converting-csv-avro-apache-nifi-jeremy-dyer
https://community.hortonworks.com/questions/44063/nifi-avro-to-csv-or-json-to-csvnifi-convert-avro-t.html
https://community.hortonworks.com/articles/28341/converting-csv-to-avro-with-apache-nifi.html
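Out of curiosity, the codec is easy to verify by hand: an Avro container file starts with the magic bytes Obj\x01 followed by a metadata map that includes avro.codec. Below is a rough, stdlib-only Python sketch that writes a minimal header in memory and reads the codec back. It's an illustration of the container format, not a real reader; for real files, use avro-tools getmeta as above.

```python
import io

# Zig-zag + varint "long" encoding used throughout Avro container files.
def write_long(buf, n):
    z = (n << 1) ^ (n >> 63)
    while z > 0x7F:
        buf.write(bytes([(z & 0x7F) | 0x80]))
        z >>= 7
    buf.write(bytes([z]))

def write_bytes(buf, b):
    write_long(buf, len(b))
    buf.write(b)

def read_long(buf):
    z = shift = 0
    while True:
        b = buf.read(1)[0]
        z |= (b & 0x7F) << shift
        if not b & 0x80:
            break
        shift += 7
    return (z >> 1) ^ -(z & 1)

def read_bytes(buf):
    return buf.read(read_long(buf))

def read_codec(buf):
    # Header: magic, then a metadata map terminated by a zero count.
    assert buf.read(4) == b"Obj\x01", "not an Avro container file"
    meta = {}
    while True:
        count = read_long(buf)
        if count == 0:
            break
        if count < 0:          # negative count: a block byte-size follows
            read_long(buf)
            count = -count
        for _ in range(count):
            key = read_bytes(buf).decode()
            meta[key] = read_bytes(buf)
    return meta.get("avro.codec", b"null").decode()

# Build a minimal header in memory (schema shortened for the example).
hdr = io.BytesIO()
hdr.write(b"Obj\x01")
write_long(hdr, 2)
write_bytes(hdr, b"avro.codec");  write_bytes(hdr, b"snappy")
write_bytes(hdr, b"avro.schema"); write_bytes(hdr, b'"string"')
write_long(hdr, 0)
hdr.write(b"\x00" * 16)  # sync marker (random in real files)
hdr.seek(0)
print(read_codec(hdr))  # snappy
```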
09-30-2016
10:40 PM
Here's the simple Zeppelin notebook file: twitter-from-strata-hadoop-processing.txt
Rename it with a .json extension. For security, upload and download don't work with .js or .json files, hence the .txt.
09-30-2016
09:19 PM
5 Kudos
I had a few hours in the morning before the conference schedule kicked in, so I decided to write a little HDF 2.0 flow to grab all the tweets about the Strata Hadoop conference. First up, I used GetTwitter to read tweets and filtered on these terms: strata,stratahadoop,strataconf,NIFI,FutureOfData,ApacheNiFi,Hortonworks,Hadoop,ApacheHive,HBase,ApacheSpark,ApacheTez,MachineLearning,ApachePhoenix,ApacheCalcite,ApacheStorm,ApacheAtlas,ApacheKnox,Apache Ranger, HDFS, Apache Pig, Accumulo, Apache Flume, Sqoop, Apache Falcon
Input:
InvokeHttp: I used this to download the first image URL from tweets.
GetTwitter: This is our primary source of data and the most important. You must have a Twitter account and a Twitter developer account, and create a Twitter application. Then you can access the keywords and hashtags above. So far I've ingested 14,211 tweets into Phoenix. This includes the many times I've shut the flow down for testing and moving things around. I've had it running live as I've added pieces. I do not recommend this development process, but it's good for exploring data.
Processing:
RouteOnAttribute: To only process tweets with an actual message; sometimes they are damaged or missing. Don't waste our time.
ExecuteStreamCommand: To call shell scripts that call TensorFlow C++ binaries and Python scripts. There are many ways to do this, but this is the easiest.
UpdateAttribute: To change the file name for files I download to HDFS.
Output sinks:
PutHDFS: Saved to HDFS in a few different directories (the first attached image): the raw JSON tweet; a limited set of fields such as handle, message and geolocation; and a fully processed file to which I added TensorFlow Inception v3 image recognition for images attached to Strata tweets and sentiment analysis using VADER on the text of the tweet.
PutSQL: I upserted all tweets that were enriched with the HDF-called TensorFlow and Python sentiment analysis into a Phoenix table.
PutSlack: https://nifi-se.slack.com/messages/general/
Visualization:
There are a ton of ways to look at this data now. I used Apache Zeppelin since it was part of my HDP 2.5 cluster and it's so easy to use. I added a few tables and charts and did quick SQL exploration of the data in Phoenix.
Linux Shell Script
source /usr/local/lib/bazel/bin/bazel-complete.bash
export JAVA_HOME=/opt/jdk1.8.0_101/
/bin/rm -rf /tmp/$@
hdfs dfs -get /twitter/rawimage/$@ /tmp/
/opt/demo/tensorflow/bazel-bin/tensorflow/examples/label_image/label_image --image="/tmp/$@" --output_layer="softmax:0" --input_layer="Mul:0" --input_std=128 --input_mean=128 --graph=/opt/demo/tensorflow/tensorflow/examples/label_image/data/tensorflow_inception_graph.pb --labels=/opt/demo/tensorflow/tensorflow/examples/label_image/data/imagenet_comp_graph_label_strings.txt 2>&1| cut -c48-
/bin/rm -rf /tmp/$@
python /opt/demo/sentiment/sentiment2.py "$@"
Python Script
If you have Python 2.7 installed (in previous articles I have shown how to install pip and NLTK), it's very easy to do some simple sentiment analysis. I also have a version that just returns the polarity_scores (compound, negative, neutral and positive).
from nltk.sentiment.vader import SentimentIntensityAnalyzer
import sys
sid = SentimentIntensityAnalyzer()
ss = sid.polarity_scores(sys.argv[1])
if ss['compound'] == 0.00:
print('Neutral')
elif ss['compound'] < 0.00:
print ('Negative')
else:
print('Positive')
NiFi 1.0.0 Flow Template: tweetnyc.xml
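The compound-score thresholding used by sentiment2.py above can be illustrated without NLTK. Here's a tiny stand-in: the lexicon words and weights below are made up for illustration, not VADER's, but the Neutral/Negative/Positive branching is the same.

```python
# Hypothetical mini-lexicon standing in for VADER's (words and weights
# are made up for illustration only).
LEXICON = {"great": 0.8, "love": 0.7, "bad": -0.6, "broken": -0.7}

def compound(text):
    # Crude stand-in for SentimentIntensityAnalyzer's compound score.
    return sum(LEXICON.get(word, 0.0) for word in text.lower().split())

def label(text):
    score = compound(text)
    if score == 0.0:
        return "Neutral"
    return "Negative" if score < 0.0 else "Positive"

print(label("love this great conference"))  # Positive
print(label("the demo is broken"))          # Negative
```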
Resources:
http://conferences.oreilly.com/strata/hadoop-big-data-ny/public/schedule/grid/public/
http://hortonworks.com/downloads/#dataflow
http://futureofdata.io/
http://www.meetup.com/futureofdata-princeton/
09-25-2016
02:10 PM
5 Kudos
As part of a live drone ingest, I want to identify what is in the image. The metadata provides a ton of information on GPS, altitude and image characteristics, but not what's in the image. IBM, Microsoft and Google all have APIs that do a good job of this, and for the most part they have "free" tiers. I wanted to run something locally using libraries installed on my cluster. For my first option, I used TensorFlow Inception v3 image recognition. In future articles I will cover PaddlePaddle, OpenCV and some other deep learning and non-deep-learning options for image recognition. I will also show the entire drone-to-front-end flow including Phoenix, Spring Boot, Zeppelin, LeafletJS and more. This will be done as part of a meetup presentation with a certified drone pilot.
To Run My TensorFlow Binary From HDF 2.0
I use ExecuteStreamCommand to run a shell script containing the information below:
source /usr/local/lib/bazel/bin/bazel-complete.bash
export JAVA_HOME=/opt/jdk1.8.0_101/
hdfs dfs -get /drone/raw/$@ /tmp/
/opt/demo/tensorflow/bazel-bin/tensorflow/examples/label_image/label_image --image="/tmp/$@" --output_layer="softmax:0" --input_layer="Mul:0" --input_std=128 --input_mean=128 --graph=/opt/demo/tensorflow/tensorflow/examples/label_image/data/tensorflow_inception_graph.pb --labels=/opt/demo/tensorflow/tensorflow/examples/label_image/data/imagenet_comp_graph_label_strings.txt 2>&1| cut -c48-
In my script I pull the file out of HDFS (it was loaded by HDF 2.0) and then run the binary version of TensorFlow that I compiled for CentOS 7. If you can't or don't want to install Bazel and build that, then you can run the Python script; it's a little bit slower and has slightly different output.
python /usr/lib/python2.7/site-packages/tensorflow/models/image/imagenet/classify_image.py --image_file
The C++ version that I compiled is Google's example and you can take a look at it:
https://github.com/tensorflow/tensorflow/blob/master/tensorflow/examples/label_image/README.md
https://github.com/tensorflow/tensorflow/blob/master/tensorflow/examples/label_image/main.cc
It's very clean code if you wish to tweak it.
Installing TensorFlow
You must have JDK 1.8 (and know the path), not just JRE 1.8. You also need Python 2.7 or 3.x and pip, and you need to install Google's Bazel build tool.
sudo yum groupinstall "Development Tools"
sudo yum install gettext-devel openssl-devel perl-CPAN perl-devel zlib-devel
sudo yum -y install epel-release
sudo yum -y install gcc gcc-c++ python-pip python-devel atlas atlas-devel gcc-gfortran openssl-devel libffi-devel
pip install --upgrade numpy scipy wheel cryptography
export TF_BINARY_URL=https://storage.googleapis.com/tensorflow/linux/cpu/tensorflow-0.10.0-cp27-none-linux_x86_64.whl
pip install --upgrade https://storage.googleapis.com/tensorflow/linux/cpu/tensorflow-0.10.0-cp27-none-linux_x86_64.whl
git clone https://github.com/tensorflow/tensorflow
cd tensorflow/
wget https://storage.googleapis.com/download.tensorflow.org/models/inception_dec_2015.zip -O tensorflow/examples/label_image/data/inception_dec_2015.zip
unzip tensorflow/examples/label_image/data/inception_dec_2015.zip -d tensorflow/examples/label_image/data/
cd tensorflow/examples/label_image
/opt/demo/bazel/output/bazel build label_image
Run Results of TensorFlow
python classify_image.py --image_file /opt/demo/dronedataold/Bebop2_20160920083655-0400.jpg
solar dish, solar collector, solar furnace (score = 0.98316)
window screen (score = 0.00196)
manhole cover (score = 0.00070)
radiator (score = 0.00041)
doormat, welcome mat (score = 0.00041)
bazel-bin/tensorflow/examples/label_image/label_image --image=/opt/demo/dronedataold/Bebop2_20160920083655-0400.jpg
solar dish (577): 0.983162
window screen (912): 0.00196204
manhole cover (763): 0.000704005
radiator (571): 0.000408321
doormat (972): 0.000406186
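To use these scores downstream in the flow, the label_image output lines need to be parsed into structured fields. A minimal Python sketch (the regex and field names are my own, not part of the TensorFlow example):

```python
import re

# Sample label_image output: "label (class_index): score"
raw = """solar dish (577): 0.983162
window screen (912): 0.00196204
manhole cover (763): 0.000704005"""

pattern = re.compile(r"^(?P<label>.+?) \((?P<idx>\d+)\): (?P<score>[0-9.]+)$")

results = [
    (m.group("label"), int(m.group("idx")), float(m.group("score")))
    for m in (pattern.match(line) for line in raw.splitlines())
    if m
]

# Pick the highest-confidence label for the enriched tweet/drone record.
best = max(results, key=lambda r: r[2])
print(best[0])  # solar dish
```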
The image is a picture of solar panels on a residential black tar roof.
Resources
https://www.tensorflow.org/versions/r0.10/tutorials/index.html
https://www.tensorflow.org/versions/r0.10/tutorials/image_recognition/index.html
http://hoolihan.net/blog-tim/2016/03/02/installing-tensorflow-on-centos/
https://databricks.com/blog/2016/01/25/deep-learning-with-apache-spark-and-tensorflow.html
http://neuralnetworksanddeeplearning.com/chap1.html
http://colah.github.io/posts/2014-07-Conv-Nets-Modular/
http://neuralnetworksanddeeplearning.com/chap6.html
https://www.tensorflow.org/versions/r0.10/tutorials/deep_cnn/index.html
https://www.tensorflow.org/versions/r0.10/tutorials/mnist/beginners/index.html
http://image-net.org/
https://www.bazel.io/versions/master/docs/install.html
https://github.com/bazelbuild/bazel/releases
http://tecadmin.net/install-java-8-on-centos-rhel-and-fedora/
https://cloud.google.com/vision/
https://www.microsoft.com/cognitive-services/en-us/computer-vision-api
http://www.ibm.com/watson/developercloud/visual-recognition/api/v3/#introduction
09-15-2016
02:15 AM
Flow File: sensor.xml
09-14-2016
11:54 PM
3 Kudos
If you have seen my article on a microservice on Hive, this is the Phoenix version. Phoenix seems to be a better option for a REST microservice. I like having HBase as my main data store; it's very performant and highly scalable for application-style queries.
See: https://community.hortonworks.com/articles/53629/writing-a-spring-boot-microservices-to-access-hive.html
This microservice is a Spring Boot REST service on top of the data loaded by this NiFi data flow.
See: https://community.hortonworks.com/content/kbentry/54947/reading-opendata-json-and-storing-into-phoenix-tab.html
Pom
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.dataflowdeveloper</groupId>
<artifactId>phoenix</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>phoenix</name>
<description>Apache Hbase Phoenix Spring Boot</description>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.4.0.RELEASE</version>
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.apache.phoenix</groupId>
<artifactId>phoenix-core</artifactId>
<version>4.4.0-HBase-1.0</version>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
<exclusion>
<artifactId>log4j</artifactId>
<groupId>log4j</groupId>
</exclusion>
<exclusion>
<artifactId>servlet-api</artifactId>
<groupId>javax.servlet</groupId>
</exclusion>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>servlet-api-2.5</artifactId>
</exclusion>
<exclusion>
<groupId>org.eclipse.jetty.aggregate</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>1.4.0.RELEASE</version>
<exclusions>
<exclusion>
<artifactId>servlet-api</artifactId>
<groupId>javax.servlet</groupId>
</exclusion>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-tomcat</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jetty</artifactId>
<scope>provided</scope>
<version>1.4.0.RELEASE</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.7.1</version>
<type>jar</type>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
<exclusion>
<artifactId>log4j</artifactId>
<groupId>log4j</groupId>
</exclusion>
<exclusion>
<artifactId>servlet-api</artifactId>
<groupId>javax.servlet</groupId>
</exclusion>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>servlet-api-2.5</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-social-twitter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.7.4</version>
<exclusions>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>2.7.4</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<version>2.7.4</version>
</dependency>
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-jdbc-core</artifactId>
<version>1.2.1.RELEASE</version>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
<exclusion>
<artifactId>log4j</artifactId>
<groupId>log4j</groupId>
</exclusion>
<exclusion>
<artifactId>servlet-api</artifactId>
<groupId>javax.servlet</groupId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
Follow this Maven build very carefully; the versions are very specific.
src/main/resources/application.properties
purl=jdbc:phoenix:serverIP:2181:/hbase-unsecure
pdriver=org.apache.phoenix.jdbc.PhoenixDriver
Java Bean to Hold Philly Crime Data
public class PhillyCrime implements Serializable {
private String dcDist;
private String dcKey;
private String dispatchDate;
private String dispatchDateTime;
private String dispatchTime;
private String hour;
private String locationBlock;
private String psa;
private String textGeneralCode;
private String ucrGeneral;
// getters and setters omitted for brevity
}
Application Class to Bootstrap Spring Boot
package com.dataflowdeveloper;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
import org.springframework.social.twitter.api.Twitter;
import org.springframework.social.twitter.api.impl.TwitterTemplate;
@Configuration
@ComponentScan
@EnableAutoConfiguration
@SpringBootApplication
public class HBaseApplication {
public static void main(String[] args) {
SpringApplication.run(HBaseApplication.class, args);
}
@Configuration
@Profile("default")
static class LocalConfiguration {
Logger logger = LoggerFactory.getLogger(LocalConfiguration.class);
@Value("${consumerkey}")
private String consumerKey;
@Value("${consumersecret}")
private String consumerSecret;
@Value("${accesstoken}")
private String accessToken;
@Value("${accesstokensecret}")
private String accessTokenSecret;
@Bean
public Twitter twitter() {
Twitter twitter = null;
try {
twitter = new TwitterTemplate(consumerKey, consumerSecret, accessToken, accessTokenSecret);
} catch (Exception e) {
logger.error("Error:", e);
}
return twitter;
}
@Value("${purl}")
private String databaseUri;
@Bean
public Connection connection() {
Connection con = null;
try {
con = DriverManager.getConnection(databaseUri);
} catch (SQLException e) {
e.printStackTrace();
logger.error("Connection fail: ", e);
}
//dataSource.setDriverClassName("org.apache.phoenix.jdbc.PhoenixDriver");
logger.error("Initialized hbase");
return con;
}
}
}
Phoenix Query Server does not require a connection pool!
DataSourceService
package com.dataflowdeveloper;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.ArrayList;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
@Component("DataSourceService")
public class DataSourceService {
Logger logger = LoggerFactory.getLogger(DataSourceService.class);
@Autowired
public Connection connection;
// default to empty
public PhillyCrime defaultValue() {
return new PhillyCrime();
}
// querylimit
@Value("${querylimit}")
private String querylimit;
/**
 * @param query
 *            - search term
 * @return List of PhillyCrime
 */
public List<PhillyCrime> search(String query) {
List<PhillyCrime> crimes = new ArrayList<>();
String sql = "";
try {
logger.error("Query: " + query);
logger.error("Limit:" + querylimit);
if ( connection == null ) {
logger.error("Null connection");
return crimes;
}
if ( query == null || query.trim().length() <= 0 ) {
query = "";
sql = "select * from phillycrime";
}
else {
query = "%" + query.toUpperCase() + "%";
sql = "select * from phillycrime WHERE upper(text_general_code) like ? LIMIT ?";
}
PreparedStatement ps = connection
.prepareStatement(sql);
if ( query.length() > 1 ) {
ps.setString(1, query);
ps.setInt(2, Integer.parseInt(querylimit));
}
ResultSet res = ps.executeQuery();
PhillyCrime crime = null;
while (res.next()) {
crime = new PhillyCrime();
crime.setDcKey(res.getString("dc_key"));
crime.setDcDist(res.getString("dc_dist"));
crime.setDispatchDate(res.getString("dispatch_date"));
crime.setDispatchDateTime(res.getString("dispatch_date_time"));
crime.setDispatchTime(res.getString("dispatch_time"));
crime.setHour(res.getString("hour"));
crime.setLocationBlock(res.getString("location_block"));
crime.setPsa(res.getString("psa"));
crime.setTextGeneralCode(res.getString("text_general_code"));
crime.setUcrGeneral(res.getString("ucr_general"));
crimes.add(crime);
}
res.close();
ps.close();
connection.close();
res = null;
ps = null;
connection = null;
crime = null;
logger.error("Size=" + crimes.size());
} catch (Exception e) {
e.printStackTrace();
logger.error("Error in search", e);
}
return crimes;
}
}
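The wildcard-and-limit logic in search() can be sketched in a few lines. The snippet below uses an in-memory SQLite table standing in for Phoenix (the real table has more columns, and the sample rows are made up), but the SQL branching mirrors the Java above:

```python
import sqlite3

# In-memory SQLite stands in for Phoenix; table trimmed to two columns
# and populated with made-up rows.
con = sqlite3.connect(":memory:")
con.execute(
    "CREATE TABLE phillycrime (dc_key TEXT PRIMARY KEY, text_general_code TEXT)")
con.executemany("INSERT INTO phillycrime VALUES (?, ?)",
                [("1", "Thefts"), ("2", "Burglary"), ("3", "Theft from Vehicle")])

def search(con, query, querylimit=250):
    # Mirrors DataSourceService.search(): an empty query selects everything,
    # otherwise an upper-cased wildcard match with a LIMIT.
    if query is None or not query.strip():
        return con.execute("SELECT * FROM phillycrime").fetchall()
    pattern = "%" + query.upper() + "%"
    return con.execute(
        "SELECT * FROM phillycrime WHERE upper(text_general_code) LIKE ? LIMIT ?",
        (pattern, querylimit)).fetchall()

print(sorted(row[1] for row in search(con, "theft")))
# ['Theft from Vehicle', 'Thefts']
```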
This class does your basic JDBC SQL queries and returns the results.
Spring Boot REST Controller
import java.util.List;
import javax.servlet.http.HttpServletRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.context.request.RequestAttributes;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;
@RestController
public class DataController {
Logger logger = LoggerFactory.getLogger(DataController.class);
@Autowired
private DataSourceService dataSourceService;
@RequestMapping("/query/{query}")
public List<PhillyCrime> query(
@PathVariable(value="query") String query)
{
List<PhillyCrime> value = dataSourceService.search(query);
return value;
}
}
To Build
mvn package -DskipTests
To Run
java -Xms512m -Xmx2048m -Dhdp.version=2.4.0.0-169 -Djava.net.preferIPv4Stack=true -jar target/phoenix-0.0.1-SNAPSHOT.jar
To Check Your Phoenix Data From the Command Line (2181 is the ZooKeeper port)
/usr/hdp/current/phoenix-client/bin/sqlline.py server:2181:/hbase
Phoenix Table DDL
CREATE TABLE phillycrime (dc_dist varchar,
dc_key varchar not null primary key, dispatch_date varchar, dispatch_date_time varchar, dispatch_time varchar, hour varchar, location_block varchar, psa varchar,
text_general_code varchar, ucr_general varchar);
Results
2016-09-14 20:09:25.135 INFO 11937 --- [ main] o.s.j.e.a.AnnotationMBeanExporter : Registering beans for JMX exposure on startup
2016-09-14 20:09:25.150 INFO 11937 --- [ main] o.s.c.support.DefaultLifecycleProcessor : Starting beans in phase 0
2016-09-14 20:09:25.275 INFO 11937 --- [ main] application : Initializing Spring FrameworkServlet 'dispatcherServlet'
2016-09-14 20:09:25.275 INFO 11937 --- [ main] o.s.web.servlet.DispatcherServlet : FrameworkServlet 'dispatcherServlet': initialization started
2016-09-14 20:09:25.294 INFO 11937 --- [ main] o.s.web.servlet.DispatcherServlet : FrameworkServlet 'dispatcherServlet': initialization completed in 19 ms
2016-09-14 20:09:25.322 INFO 11937 --- [ main] org.mortbay.log : Logging to Logger[org.mortbay.log] via org.mortbay.log.Slf4jLog
2016-09-14 20:09:25.333 INFO 11937 --- [ main] o.e.jetty.server.AbstractConnector : Started ServerConnector@7a55af6b{HTTP/1.1,[http/1.1]}{0.0.0.0:9999}
2016-09-14 20:09:25.335 INFO 11937 --- [ main] .s.b.c.e.j.JettyEmbeddedServletContainer : Jetty started on port(s) 9999 (http/1.1)
2016-09-14 20:09:25.339 INFO 11937 --- [ main] com.dataflowdeveloper.HBaseApplication : Started HBaseApplication in 13.783 seconds (JVM running for 14.405)
2016-09-14 20:09:37.961 ERROR 11937 --- [tp1282287470-17] com.dataflowdeveloper.DataSourceService : Query: Theft
2016-09-14 20:09:37.961 ERROR 11937 --- [tp1282287470-17] com.dataflowdeveloper.DataSourceService : Limit:250
2016-09-14 20:09:39.050 ERROR 11937 --- [tp1282287470-17] com.dataflowdeveloper.DataSourceService : Size=250
2016-09-14 20:09:39.050 ERROR 11937 --- [tp1282287470-17] com.dataflowdeveloper.DataController : Query:Theft,IP:127.0.0.1 Browser:Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/52.0.2743.116 Safari/537.36
Connection Handshake for App
2016-09-14 20:09:18.042 INFO 11937 --- [ main] o.a.h.h.zookeeper.RecoverableZooKeeper : Process identifier=hconnection-0x4116aac9 connecting to ZooKeeper ensemble=server:2181
2016-09-14 20:09:18.042 INFO 11937 --- [ main] org.apache.zookeeper.ZooKeeper : Initiating client connection, connectString=server:2181 sessionTimeout=90000 watcher=hconnection-0x4116aac90x0, quorum=server:2181, baseZNode=/hbase-unsecure
2016-09-14 20:09:18.044 INFO 11937 --- [1.1.1.1:2181)] org.apache.zookeeper.ClientCnxn : Opening socket connection to server 192.11.1.5:2181. Will not attempt to authenticate using SASL (unknown error)
2016-09-14 20:09:18.163 INFO 11937 --- [1.1.1.1:2181)] org.apache.zookeeper.ClientCnxn : Socket connection established to 192.11.1.:2181, initiating session
2016-09-14 20:09:18.577 INFO 11937 --- [26.195.56:2181)] org.apache.zookeeper.ClientCnxn : Session establishment complete on server :2181, sessionid = 0x157063034991d14, negotiated timeout = 40000
2016-09-14 20:09:19.953 INFO 11937 --- [ main] nectionManager$HConnectionImplementation : Closing master protocol: MasterService
2016-09-14 20:09:19.953 INFO 11937 --- [ main] nectionManager$HConnectionImplementation : Closing zookeeper sessionid=0x157063034991d14
2016-09-14 20:09:20.040 INFO 11937 --- [ main] org.apache.zookeeper.ZooKeeper : Session: 0x157063034991d14 closed
2016-09-14 20:09:20.040 INFO 11937 --- [ain-EventThread] org.apache.zookeeper.ClientCnxn : EventThread shut down
2016-09-14 20:09:20.470 INFO 11937 --- [ main] o.a.p.query.ConnectionQueryServicesImpl : Found quorum: server.com:2181
2016-09-14 20:09:20.471 INFO 11937 --- [ main] o.a.h.h.zookeeper.RecoverableZooKeeper : Process identifier=hconnection-0x36b4fe2a connecting to ZooKeeper ensemble=tspanndev10.field.hortonworks.com:2181
2016-09-14 20:09:20.471 INFO 11937 --- [ main] org.apache.zookeeper.ZooKeeper : Initiating client connection, connectString=tspanndev10.field.hortonworks.com:2181 sessionTimeout=90000 watcher=hconnection-0x36b4fe2a0x0, quorum=tspanndev10.field.hortonworks.com:2181, baseZNode=/hbase-unsecure
2016-09-14 20:09:20.472 INFO 11937 --- [222.2.2.2.2.:2181)] org.apache.zookeeper.ClientCnxn : Opening socket connection to server 1:2181. Will not attempt to authenticate using SASL (unknown error)
2016-09-14 20:09:20.555 INFO 11937 --- [26:2181)] org.apache.zookeeper.ClientCnxn : Socket connection established to 172....:2181, initiating session
2016-09-14 20:09:20.641 INFO 11937 --- [22.2.2:2181)] org.apache.zookeeper.ClientCnxn : Session establishment complete on server 2.2.2/2.2.2:2181, sessionid = 0x157063034991d15, negotiated timeout = 40000
Reference:
http://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.5.0/bk_data-access/content/ch_using-phoenix.html
https://community.hortonworks.com/articles/19016/connect-to-phoenix-hbase-using-dbvisualizer.html
09-14-2016
11:29 PM
To install a Mosquitto MQTT server on CentOS 7:
yum -y install unzip
Step 1: Add the CentOS 7 mosquitto repository
cd /etc/yum.repos.d
wget http://download.opensuse.org/repositories/home:/oojah:/mqtt/CentOS_CentOS-7/home:oojah:mqtt.repo
sudo yum update
Step 2: Install mosquitto and mosquitto-clients
sudo yum install -y mosquitto mosquitto-clients
Step 3: Run mosquitto
sudo su
/usr/sbin/mosquitto -d -c /etc/mosquitto/mosquitto.conf > /var/log/mosquitto.log 2>&1
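Once the broker is running, clients subscribe with topic filters that mosquitto matches against published topic names using the '+' (single-level) and '#' (multi-level) wildcards. As a quick illustration of those matching rules, here is a minimal Python sketch; the topic names are made up for the example:

```python
def topic_matches(filter_str, topic):
    """Return True if an MQTT topic filter matches a published topic name.

    Implements the '+' (single level) and '#' (multi level) wildcard
    rules that brokers such as mosquitto apply to subscriptions.
    """
    f_parts = filter_str.split("/")
    t_parts = topic.split("/")
    for i, part in enumerate(f_parts):
        if part == "#":               # '#' matches this level and everything below it
            return True
        if i >= len(t_parts):         # filter is deeper than the topic
            return False
        if part != "+" and part != t_parts[i]:
            return False
    return len(f_parts) == len(t_parts)

print(topic_matches("sensor", "sensor"))                 # True
print(topic_matches("sensor/+", "sensor/kitchen"))       # True
print(topic_matches("sensor/#", "sensor/kitchen/temp"))  # True
print(topic_matches("sensor/+", "sensor/kitchen/temp"))  # False
```

You can verify the same behavior against a live broker with mosquitto_sub -t 'sensor/#' while publishing to nested topics.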
... View more
09-14-2016
02:59 AM
3 Kudos
Running Spark Jobs Through Apache Beam on an HDP 2.5 YARN Cluster
Using the Spark Runner with Apache Beam
Apache Beam is still incubating and is not yet a supported component on HDP 2.5 or other platforms.
sudo yum -y install git
wget http://www.gtlib.gatech.edu/pub/apache/maven/maven-3/3.3.9/binaries/apache-maven-3.3.9-bin.tar.gz
After you download Maven, move it to /opt/demo/maven or somewhere in your path. The Maven download mirror will change over time, so grab a fresh URL from http://maven.apache.org/. Installing Maven through yum will give you an older, unsupported version and may interfere with existing packages, so I recommend a dedicated Maven install just for this build. Make sure you have Java 7 or greater, which you should already have on an HDP node; I recommend Java 8 on your new HDP 2.5 nodes if possible. cd /opt/demo/
git clone https://github.com/apache/incubator-beam
cd incubator-beam
/opt/demo/maven/bin/mvn clean install -DskipTests
If you want to run this on Spark 2.0 rather than Spark 1.6.2, see this guide for changing the environment: https://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.5.0/bk_spark-component-guide/content/spark-choose-version.html
For HDP 2.5, these are the parameters:
spark-submit --class org.apache.beam.runners.spark.examples.WordCount --master yarn-client target/beam-runners-spark-0.3.0-incubating-SNAPSHOT-spark-app.jar --inputFile=kinglear.txt --output=out --runner=SparkRunner --sparkMaster=yarn-client
Note: I had to change the parameters to get this to work in my environment. You may also need to run /opt/demo/maven/bin/mvn package from the /opt/demo/incubator-beam/runners/spark directory. This runs a Java 7 example from the built-in examples: https://github.com/apache/incubator-beam/tree/master/examples/java
These are the results of running our small Spark job:
16/09/14 02:35:08 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 34.0 KB, free 518.7 KB)
16/09/14 02:35:08 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on 172.26.195.58:39575 (size: 34.0 KB, free: 511.1 MB)
16/09/14 02:35:08 INFO SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:1008
16/09/14 02:35:08 INFO DAGScheduler: Submitting 2 missing tasks from ResultStage 1 (MapPartitionsRDD[14] at mapToPair at TransformTranslator.java:568)
16/09/14 02:35:08 INFO YarnScheduler: Adding task set 1.0 with 2 tasks
16/09/14 02:35:08 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 2, tspanndev13.field.hortonworks.com, partition 0,NODE_LOCAL, 1994 bytes)
16/09/14 02:35:08 INFO TaskSetManager: Starting task 1.0 in stage 1.0 (TID 3, tspanndev13.field.hortonworks.com, partition 1,NODE_LOCAL, 1994 bytes)
16/09/14 02:35:08 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on tspanndev13.field.hortonworks.com:36438 (size: 34.0 KB, free: 511.1 MB)
16/09/14 02:35:08 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on tspanndev13.field.hortonworks.com:36301 (size: 34.0 KB, free: 511.1 MB)
16/09/14 02:35:08 INFO MapOutputTrackerMasterEndpoint: Asked to send map output locations for shuffle 0 to tspanndev13.field.hortonworks.com:52646
16/09/14 02:35:08 INFO MapOutputTrackerMaster: Size of output statuses for shuffle 0 is 177 bytes
16/09/14 02:35:08 INFO MapOutputTrackerMasterEndpoint: Asked to send map output locations for shuffle 0 to tspanndev13.field.hortonworks.com:52640
16/09/14 02:35:09 INFO TaskSetManager: Finished task 1.0 in stage 1.0 (TID 3) in 681 ms on tspanndev13.field.hortonworks.com (1/2)
16/09/14 02:35:09 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 2) in 1112 ms on tspanndev13.field.hortonworks.com (2/2)
16/09/14 02:35:09 INFO YarnScheduler: Removed TaskSet 1.0, whose tasks have all completed, from pool
16/09/14 02:35:09 INFO DAGScheduler: ResultStage 1 (saveAsNewAPIHadoopFile at TransformTranslator.java:745) finished in 1.113 s
16/09/14 02:35:09 INFO DAGScheduler: Job 0 finished: saveAsNewAPIHadoopFile at TransformTranslator.java:745, took 5.422285 s
16/09/14 02:35:09 INFO SparkRunner: Pipeline execution complete.
16/09/14 02:35:09 INFO SparkContext: Invoking stop() from shutdown hook
[root@tspanndev13 spark]# hdfs dfs -ls
Found 5 items
drwxr-xr-x - root hdfs 0 2016-09-14 02:35 .sparkStaging
-rw-r--r-- 3 root hdfs 0 2016-09-14 02:35 _SUCCESS
-rw-r--r-- 3 root hdfs 185965 2016-09-14 01:44 kinglear.txt
-rw-r--r-- 3 root hdfs 27304 2016-09-14 02:35 out-00000-of-00002
-rw-r--r-- 3 root hdfs 26515 2016-09-14 02:35 out-00001-of-00002
[root@tspanndev13 spark]# hdfs dfs -cat out-00000-of-00002
oaths: 1
bed: 7
hearted: 5
warranties: 1
Refund: 1
unnaturalness: 1
sea: 7
sham'd: 1
Only: 2
sleep: 8
sister: 29
Another: 2
carbuncle: 1
As expected, the job produced the two-part word count output file in HDFS. Not much configuration is required to run your Apache Beam Java jobs on your HDP 2.5 YARN Spark cluster, so if you have a development cluster, this would be a great place to try it out. Or try it on your own HDP 2.5 sandbox.
Resources:
http://beam.incubator.apache.org/learn/programming-guide/
https://github.com/apache/incubator-beam/tree/master/runners/spark
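The Beam WordCount example boils down to a tokenize, group, and count. A rough plain-Python equivalent of what the pipeline computes (minus the distributed execution) might look like this; the input text is inlined in place of kinglear.txt so the sketch is self-contained:

```python
import re
from collections import Counter

def word_count(text):
    """Tokenize on word characters (keeping apostrophes, as in sham'd)
    and count occurrences, mirroring what the WordCount pipeline emits."""
    words = re.findall(r"[A-Za-z']+", text)
    return Counter(words)

sample = "to be or not to be"
counts = word_count(sample)
for word, n in sorted(counts.items()):
    print("%s: %d" % (word, n))
# be: 2
# not: 1
# or: 1
# to: 2
```

The Spark runner does the same thing, but the map and reduce steps are translated into RDD operations and scheduled across the YARN cluster.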
09-09-2016
08:13 PM
10 Kudos
Sensor Reading with Apache NiFi 1.0.0
There are many types of sensors, devices and meters that can be great sources of data. Some can push data, some can pull data, some provide APIs, some give you access to install software.
How To Access Sensors
One option is to install MiNiFi on the device if you have root access. This provides fast access and lets you script and manage your local data. Another option, for bigger devices, is to install a full Java-based NiFi node. This starts becoming harder once you have tens of thousands of devices.
You can install an HDF edge node and communicate from it to your HDF cluster via the Site-to-Site protocol. This edge node acts as an accumulator for many devices, which is a good idea so that you don't send 10,000 network requests a second from each set of devices; keeping as much traffic as possible local saves time, time-outs, networking and cloud costs. You can then aggregate and send larger batches of data, and compute some summaries and aggregates locally in NiFi. This also lets you populate local databases, dashboards and statistics that may only be of interest near the source of the sensors (perhaps a plant manager or an automated monitoring system).
Another option is to have devices push or pull to a local or remote NiFi install via various protocols including TCP/IP, UDP/IP, REST HTTP, JMS, MQTT, SFTP and email.
Device Push to NiFi
Your device can send messages to NiFi via any of the protocols listed. For my example, I push via MQTT; my local NiFi node consumes these messages via ConsumeMQTT.
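The edge-aggregation idea above (accumulate readings locally, forward them in larger batches) can be sketched in a few lines of Python. This is illustrative only, not part of MiNiFi or NiFi: the batch size and the forward callback are assumptions standing in for whatever transport (Site-to-Site, MQTT, HTTP) the edge node uses.

```python
class EdgeAggregator:
    """Accumulates sensor readings and forwards them in batches,
    so each device is not making one network call per reading."""

    def __init__(self, batch_size, forward):
        self.batch_size = batch_size   # illustrative threshold
        self.forward = forward         # callback, e.g. a site-to-site send
        self.buffer = []

    def add(self, reading):
        self.buffer.append(reading)
        if len(self.buffer) >= self.batch_size:
            self.forward(list(self.buffer))   # hand off a copy of the batch
            del self.buffer[:]                # and start accumulating again

# Simulate 7 readings arriving, forwarded 3 at a time
batches = []
agg = EdgeAggregator(batch_size=3, forward=batches.append)
for i in range(7):
    agg.add({"reading": i})
print(len(batches))      # 2 full batches were forwarded
print(len(agg.buffer))   # 1 reading still waiting for the next batch
```

In a real deployment you would also flush on a timer so a slow sensor does not leave readings stranded in the buffer.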
Reference: Paho-MQTT
Your device will need to run Linux (or something similar) and have Python 2.7 or better and pip installed. With pip, you can install the Eclipse Paho library that you need to send MQTT messages:
pip install paho-mqtt
import paho.mqtt.client as paho
client = paho.Client()
client.connect("servername", 1883, 60)
client.publish("sensor", payload="Test", qos=0, retain=True)
Where "servername" is the name of the server you are sending the message to (it could also be the NiFi node, another server, a bigger device, a central aggregator or a messaging server). I recommend having it as close in the network as possible. "sensor" is the name of the topic that we are publishing the message to; NiFi will consume from this topic. I have a cron job set up to run every minute and publish messages (* * * * * /opt/demo/sendit.sh).
NiFi Poll Device
NiFi can poll your device and consume from various protocols like JMS, MQTT, SFTP, TCP and UDP. For my example, I chose a REST API over HTTP to get past hurdles such as firewalls. I set up a Flask server on the RPi to run my REST API, which I start from a shell script:
export FLASK_APP=hello.py
flask run --host=0.0.0.0 --port=8888 --no-debugger
To install Flask, run:
pip install flask
Sensor Reading Code
#!flask/bin/python
from flask import Flask, jsonify
import sys
import time
import datetime
import subprocess
import urllib2
import json
import paho.mqtt.client as paho
from sense_hat import SenseHat
sense = SenseHat()
sense.clear()
app = Flask(__name__)
@app.route('/pi/api/v1.0/sensors', methods=['GET'])
def get_sensors():
p = subprocess.Popen(['/opt/vc/bin/vcgencmd','measure_temp'], stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
out, err = p.communicate()
temp = sense.get_temperature()
temp = round(temp, 1)
temph = sense.get_temperature_from_humidity()
temph = round(temph, 1)
tempp = sense.get_temperature_from_pressure()
tempp = round(tempp, 1)
humidity = sense.get_humidity()
humidity = round(humidity, 1)
pressure = sense.get_pressure()
pressure = round(pressure, 1)
tasks = [ { 'tempp': tempp, 'temph': temph, 'cputemp': out, 'temp': temp, 'tempf': ((temp * 1.8) + 32), 'humidity': humidity, 'pressure': pressure } ]  # Celsius to Fahrenheit is C * 1.8 + 32
# As an option we can push this message when we get called as well
client = paho.Client()
client.connect("mqttmessageserver", 1883, 60)
client.publish("sensor", payload=json.dumps({'readings': tasks}), qos=0, retain=True)  # json.dumps, not jsonify: jsonify returns a Flask Response, not a string
return jsonify({'readings': tasks})
@app.route('/pi/api/v1.0/location', methods=['GET'])
def get_loc():
orientation = sense.get_orientation()
pitch = orientation['pitch']
roll = orientation['roll']
yaw = orientation['yaw']
acceleration = sense.get_accelerometer_raw()
x = acceleration['x']
y = acceleration['y']
z = acceleration['z']
x=round(x, 0)
y=round(y, 0)
z=round(z, 0)
tasks = [ { 'pitch': pitch, 'roll': roll, 'yaw': yaw, 'x': x, 'y': y, 'z': z } ]
return jsonify({'readings': tasks})
@app.route('/pi/api/v1.0/show', methods=['GET'])
def get_pi():
temp = sense.get_temperature()
temp = round(temp, 1)
humidity = sense.get_humidity()
humidity = round(humidity, 1)
pressure = sense.get_pressure()
pressure = round(pressure, 1)
# 8x8 RGB
sense.clear()
info = 'T(C): ' + str(temp) + ' H: ' + str(humidity) + ' P: ' + str(pressure)
sense.show_message(info, text_colour=[255, 0, 0])
sense.clear()
tasks = [ { 'temp': temp, 'tempf': ((temp * 1.8) + 32), 'humidity': humidity, 'pressure': pressure } ]  # Celsius to Fahrenheit is C * 1.8 + 32
return jsonify({'readings': tasks})
if __name__ == '__main__':
app.run(debug=True)
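One detail worth calling out from the code above: jsonify builds a Flask Response object for HTTP replies, so for the client.publish call a plain json.dumps string is the safer payload. A minimal sketch of building that payload, with hard-coded sensor values standing in for the Sense HAT reads:

```python
import json

def build_payload(temp, humidity, pressure):
    """Serialize one set of readings to the JSON string published over MQTT.
    Values are rounded to one decimal place as in the Flask endpoints."""
    tasks = [{
        "temp": round(temp, 1),
        "tempf": round(temp * 1.8 + 32, 1),   # Celsius to Fahrenheit
        "humidity": round(humidity, 1),
        "pressure": round(pressure, 1),
    }]
    return json.dumps({"readings": tasks})

payload = build_payload(40.04, 40.83, 1014.12)
print(payload)
```

The resulting string can be handed straight to client.publish("sensor", payload=payload), and NiFi's ConsumeMQTT will receive it as the flow file content.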
The device I am testing is a Raspberry Pi 3 Model B with a Sense HAT sensor attachment. Besides having sensors for temperature, humidity and barometric pressure, it also has an 8x8 light grid for displaying text and simple graphics. We can use this to print messages (sense.show_message) or warnings that we send from NiFi. This allows for two-way, very visceral communication with remote devices, which could be used to notify local personnel of conditions.
NiFi 1.0.0 Flows
JSON File Landed in HDFS in our HDP 2.5 Cluster
[root@myserverhdp sensors]# hdfs dfs -ls /sensor
Found 2 items
-rw-r--r-- 3 root hdfs 202 2016-09-09 17:26 /sensor/181528179026826
drwxr-xr-x - hdfs hdfs 0 2016-09-09 15:43 /sensor/failure
[root@tspanndev13 sensors]# hdfs dfs -cat /sensor/181528179026826
{
"readings": [
{
"cputemp": "temp=55.8'C\n",
"humidity": 40.8,
"pressure": 1014.1,
"temp": 40.0,
"tempf": 84.0,
"temph": 40.0,
"tempp": 39.1
}
]
}
The final result of our flow is a JSON file on HDFS. We could easily send a copy of the data to Phoenix via PutSQL, to Hive via PutHiveQL, or to Spark Streaming for additional processing via Site-to-Site or Kafka.
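Downstream consumers such as PutSQL, PutHiveQL or a Spark job will typically need to pull fields back out of this JSON. A minimal Python sketch of parsing the landed record; the file contents are inlined here rather than read from HDFS, and note that cputemp arrives as the raw vcgencmd string rather than a number:

```python
import json

# The record as it landed in HDFS (see the hdfs dfs -cat output above)
record = '''{
  "readings": [
    {
      "cputemp": "temp=55.8'C\\n",
      "humidity": 40.8,
      "pressure": 1014.1,
      "temp": 40.0,
      "tempf": 84.0,
      "temph": 40.0,
      "tempp": 39.1
    }
  ]
}'''

data = json.loads(record)
for reading in data["readings"]:
    # cputemp is the raw vcgencmd output, e.g. "temp=55.8'C\n";
    # strip the label and units to recover a float
    cpu = float(reading["cputemp"].strip().split("=")[1].rstrip("'C"))
    print(reading["temp"], reading["humidity"], cpu)
```

Normalizing the cputemp field like this on the way in (for example with a NiFi ExecuteScript or ReplaceText processor) would keep the HDFS records uniformly numeric.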
Resources:
https://github.com/topshed/RPi_8x8GridDraw
https://www.raspberrypi.org/learning/sense-hat-data-logger/worksheet/
https://www.raspberrypi.org/learning/astro-pi-flight-data-analysis/worksheet/
https://www.raspberrypi.org/learning/astro-pi-guide/sensors/temperature.md
https://breadfit.wordpress.com/2015/06/24/installing-mosquitto-under-centos/