08-06-2017 01:40 PM - 5 Kudos
Python Word Cloud

Integrating existing Python libraries and scripts is very easy in Apache NiFi. I install the library for both versions of Python I have on my system, while moving all new scripts to the 3.x branch.

Install the library for both Python 2.7 and 3.5:

pip install wordcloud
pip3 install wordcloud

Example usage:

echo "NiFi\nHadoop\nSpark\n" | wordcloud_cli.py --imagefile wordcloud.png

For use in NiFi, I wrap my call in a shell script, wc.sh:

echo $1 | tr " " "\n" | wordcloud_cli.py

This builds a PNG that I can store in a file system or in HDFS; I update the filename to add .png at the end. The script takes one parameter (our tweet) and converts it into words usable for a word cloud. You can use other sources or other methods of splitting words. I am pulling Twitter messages, so I use ReplaceText to replace the flow file content with ${msg}, which is just the tweet. Then I execute the Python WordCloud CLI.
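The word splitting that wc.sh does with tr can also be done in pure Python; here is a minimal stdlib sketch (the function name is my own, not part of the flow):

```python
# Minimal stdlib equivalent of wc.sh's word splitting. wordcloud_cli.py
# would consume the newline-separated output this produces.
def to_word_lines(tweet):
    # tr " " "\n": one word per line for the word cloud CLI
    return "\n".join(tweet.split())

print(to_word_lines("NiFi Hadoop Spark"))
```

Splitting on whitespace with split() also collapses runs of spaces, which the tr version does not; for tweets that difference is harmless.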
References:

https://amueller.github.io/word_cloud/auto_examples/a_new_hope.html
https://amueller.github.io/word_cloud/auto_examples/simple.html#sphx-glr-auto-examples-simple-py
https://github.com/amueller/word_cloud
08-06-2017 12:03 AM - 4 Kudos
Technology: Python, TensorFlow, Apache Hive, MiniFi, NiFi, HDFS, WebHDFS, Zeppelin, SQL, Raspberry Pi, Pi Camera, S2S.

Apache NiFi for Ingest of Images and TensorFlow Analysis from the Edge (Raspberry Pi 3)

The Apache NiFi ingestion flow is straightforward. MiniFi sends us flow files over S2S from the RPi, consisting of two types of messages. One is a JSON-formatted file of metadata and TensorFlow analysis of an image. The second is the actual image captured. We route on the filename attribute to handle each file type appropriately. We send the image to HDFS for storage and retrieval via WebHDFS. For the JSON, I add a schema and a JSON content type and split up the file. You can see I have some non-JSON junk in there that I want pulled out. Then I send my JSON record to QueryRecord to filter out any empty messages. This produces AVRO files from the JSON, which I convert to ORC and store in HDFS. From there it's easy to query my new deep-learning-produced multimedia data via standard SQL.

Flow steps: Routing by Filename Attribute, Split Into Lines, Extract Only JSON Data, ORC Configuration, Query Record.

MiniFi Flow Installed on Raspberry Pi

The flow on the Pi is simple. We have three processes running. The first executes our classify.sh to activate the PiCamera, take a picture, and then feed the picture to TensorFlow. The CleanupLogs process is a shell script that deletes old logs. The GetFile reads any image produced by the first shell execute and sends the image to NiFi.

Hive DDL:

CREATE EXTERNAL TABLE IF NOT EXISTS tfimage (image STRING, ts STRING, host STRING, score STRING, human_string STRING, node_id FLOAT) STORED AS ORC
LOCATION '/tfimage'

Hive SQL:

%jdbc(hive)
select ts, score, human_string,
concat('%html <img width=200 height=200 src="http://princeton10.field.hortonworks.com:50070/webhdfs/v1/tfimagefiles/', SUBSTR(image,18), '?op=OPEN">') as cam_image
from tfimage where image like '%2017%'

Shell classify.sh:

python -W ignore /opt/demo/classify_image.py

Modified TensorFlow Example Python:

# Copyright 2015 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Simple image classification with Inception.
Run image classification with Inception trained on ImageNet 2012 Challenge data
set.
This program creates a graph from a saved GraphDef protocol buffer,
and runs inference on an input JPEG image. It outputs human readable
strings of the top 5 predictions along with their probabilities.
Change the --image_file argument to any jpg image to compute a
classification of that image.
Please see the tutorial and website for a detailed description of how
to use this script to perform image recognition.
https://tensorflow.org/tutorials/image_recognition/
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import argparse
import os.path
import re
import sys
import tarfile
import os
import datetime
import math
import random, string
import base64
import json
import time
import picamera
from time import sleep
from time import gmtime, strftime
import numpy as np
from six.moves import urllib
import tensorflow as tf
tf.logging.set_verbosity(tf.logging.ERROR)
FLAGS = None
# pylint: disable=line-too-long
DATA_URL = 'http://download.tensorflow.org/models/image/imagenet/inception-2015-12-05.tgz'
# pylint: enable=line-too-long
# yyyy-mm-dd hh:mm:ss
currenttime= strftime("%Y-%m-%d %H:%M:%S",gmtime())
host = os.uname()[1]
def randomword(length):
  # string.lowercase is Python 2 only; use string.ascii_lowercase on Python 3
  return ''.join(random.choice(string.lowercase) for i in range(length))
class NodeLookup(object):
  """Converts integer node ID's to human readable labels."""

  def __init__(self,
               label_lookup_path=None,
               uid_lookup_path=None):
    if not label_lookup_path:
      label_lookup_path = os.path.join(
          FLAGS.model_dir, 'imagenet_2012_challenge_label_map_proto.pbtxt')
    if not uid_lookup_path:
      uid_lookup_path = os.path.join(
          FLAGS.model_dir, 'imagenet_synset_to_human_label_map.txt')
    self.node_lookup = self.load(label_lookup_path, uid_lookup_path)
  def load(self, label_lookup_path, uid_lookup_path):
    """Loads a human readable English name for each softmax node.

    Args:
      label_lookup_path: string UID to integer node ID.
      uid_lookup_path: string UID to human-readable string.

    Returns:
      dict from integer node ID to human-readable string.
    """
    if not tf.gfile.Exists(uid_lookup_path):
      tf.logging.fatal('File does not exist %s', uid_lookup_path)
    if not tf.gfile.Exists(label_lookup_path):
      tf.logging.fatal('File does not exist %s', label_lookup_path)

    # Loads mapping from string UID to human-readable string
    proto_as_ascii_lines = tf.gfile.GFile(uid_lookup_path).readlines()
    uid_to_human = {}
    p = re.compile(r'[n\d]*[ \S,]*')
    for line in proto_as_ascii_lines:
      parsed_items = p.findall(line)
      uid = parsed_items[0]
      human_string = parsed_items[2]
      uid_to_human[uid] = human_string

    # Loads mapping from string UID to integer node ID.
    node_id_to_uid = {}
    proto_as_ascii = tf.gfile.GFile(label_lookup_path).readlines()
    for line in proto_as_ascii:
      if line.startswith('  target_class:'):
        target_class = int(line.split(': ')[1])
      if line.startswith('  target_class_string:'):
        target_class_string = line.split(': ')[1]
        node_id_to_uid[target_class] = target_class_string[1:-2]

    # Loads the final mapping of integer node ID to human-readable string
    node_id_to_name = {}
    for key, val in node_id_to_uid.items():
      if val not in uid_to_human:
        tf.logging.fatal('Failed to locate: %s', val)
      name = uid_to_human[val]
      node_id_to_name[key] = name

    return node_id_to_name

  def id_to_string(self, node_id):
    if node_id not in self.node_lookup:
      return ''
    return self.node_lookup[node_id]
def create_graph():
  """Creates a graph from saved GraphDef file and returns a saver."""
  # Creates graph from saved graph_def.pb.
  with tf.gfile.FastGFile(os.path.join(
      FLAGS.model_dir, 'classify_image_graph_def.pb'), 'rb') as f:
    graph_def = tf.GraphDef()
    graph_def.ParseFromString(f.read())
    _ = tf.import_graph_def(graph_def, name='')
def run_inference_on_image(image):
  """Runs inference on an image.

  Args:
    image: Image file name.

  Returns:
    Nothing
  """
  if not tf.gfile.Exists(image):
    tf.logging.fatal('File does not exist %s', image)
  image_data = tf.gfile.FastGFile(image, 'rb').read()

  # Creates graph from saved GraphDef.
  create_graph()

  with tf.Session() as sess:
    # Some useful tensors:
    # 'softmax:0': A tensor containing the normalized prediction across
    #   1000 labels.
    # 'pool_3:0': A tensor containing the next-to-last layer containing 2048
    #   float description of the image.
    # 'DecodeJpeg/contents:0': A tensor containing a string providing JPEG
    #   encoding of the image.
    # Runs the softmax tensor by feeding the image_data as input to the graph.
    softmax_tensor = sess.graph.get_tensor_by_name('softmax:0')
    predictions = sess.run(softmax_tensor,
                           {'DecodeJpeg/contents:0': image_data})
    predictions = np.squeeze(predictions)

    # Creates node ID --> English string lookup.
    node_lookup = NodeLookup()

    top_k = predictions.argsort()[-FLAGS.num_top_predictions:][::-1]
    row = []
    for node_id in top_k:
      human_string = node_lookup.id_to_string(node_id)
      score = predictions[node_id]
      row.append( { 'node_id': node_id, 'image': image, 'host': host, 'ts': currenttime, 'human_string': str(human_string), 'score': str(score)} )
    json_string = json.dumps(row)
    print( json_string )
def maybe_download_and_extract():
  """Download and extract model tar file."""
  dest_directory = FLAGS.model_dir
  if not os.path.exists(dest_directory):
    os.makedirs(dest_directory)
  filename = DATA_URL.split('/')[-1]
  filepath = os.path.join(dest_directory, filename)
  if not os.path.exists(filepath):
    def _progress(count, block_size, total_size):
      sys.stdout.write('\r>> Downloading %s %.1f%%' % (
          filename, float(count * block_size) / float(total_size) * 100.0))
      sys.stdout.flush()
    filepath, _ = urllib.request.urlretrieve(DATA_URL, filepath, _progress)
    print()
    statinfo = os.stat(filepath)
    print('Successfully downloaded', filename, statinfo.st_size, 'bytes.')
  tarfile.open(filepath, 'r:gz').extractall(dest_directory)
def main(_):
  maybe_download_and_extract()
  # Create unique image name
  img_name = '/opt/demo/images/pi_image_{0}_{1}.jpg'.format(randomword(3),strftime("%Y%m%d%H%M%S",gmtime()))
  # Capture Image from Pi Camera
  try:
    camera = picamera.PiCamera()
    camera.resolution = (1024,768)
    camera.annotate_text = " Stored with Apache NiFi "
    camera.capture(img_name, resize=(600,400))
  finally:
    camera.close()
  # image = (FLAGS.image_file if FLAGS.image_file else
  #          os.path.join(FLAGS.model_dir, 'cropped_panda.jpg'))
  run_inference_on_image(img_name)
if __name__ == '__main__':
  parser = argparse.ArgumentParser()
  # classify_image_graph_def.pb:
  #   Binary representation of the GraphDef protocol buffer.
  # imagenet_synset_to_human_label_map.txt:
  #   Map from synset ID to a human readable string.
  # imagenet_2012_challenge_label_map_proto.pbtxt:
  #   Text representation of a protocol buffer mapping a label to synset ID.
  parser.add_argument(
      '--model_dir',
      type=str,
      default='/tmp/imagenet',
      help=""" Path to classify_image_graph_def.pb,
      imagenet_synset_to_human_label_map.txt, and
      imagenet_2012_challenge_label_map_proto.pbtxt. """
  )
  parser.add_argument(
      '--image_file',
      type=str,
      default='',
      help='Absolute path to image file.'
  )
  parser.add_argument(
      '--num_top_predictions',
      type=int,
      default=5,
      help='Display this many predictions.'
  )
  FLAGS, unparsed = parser.parse_known_args()
  tf.app.run(main=main, argv=[sys.argv[0]] + unparsed)
Key Additions to Python:

row.append( { 'node_id': node_id, 'image': image, 'host': host, 'ts': currenttime, 'human_string': str(human_string), 'score': str(score)} )
json_string = json.dumps(row)
print( json_string )

Image Captured by Camera (Also Added "Stored with Apache NiFi" via Python)

References:

http://regexr.com/
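The script emits one JSON list per image, with one record per prediction; a minimal stdlib sketch of what NiFi receives (the field values below are illustrative only, not from a real run):

```python
import json

# Illustrative per-prediction record; a real run fills these values from
# TensorFlow output and the Pi's hostname/timestamp.
row = [{'node_id': 409, 'image': '/opt/demo/images/pi_image_abc_20170806120000.jpg',
        'host': 'picroft', 'ts': '2017-08-06 12:00:00',
        'human_string': 'recreational vehicle, RV', 'score': '0.91'}]
json_string = json.dumps(row)
print(json_string)
```

These are exactly the columns the tfimage Hive table above declares, which is why the ORC conversion needs no mapping step.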
07-31-2017 08:10 PM - 8 Kudos
In preparing for my talk at DataWorks Summit in Australia, I wanted to try yet another way to integrate Apache NiFi with TensorFlow. The common ways are: calling a Python TensorFlow script from Execute Command, calling a TensorFlow Serving server via gRPC, calling a C++ TensorFlow executable via Execute Command, running TensorFlow on the edge and having MiniFi send the results to NiFi, or calling a TensorFlowOnSpark job via Kafka or Site-to-Site.
TensorFlow has released a Java API (https://www.tensorflow.org/install/install_java), so I decided to write a quick custom processor to run TensorFlow Inception v3. It's a simple set of dependencies for Maven: <dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-processor-utils</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.tensorflow</groupId>
<artifactId>tensorflow</artifactId>
<version>1.2.1</version>
</dependency>
</dependencies>
It's easy to add the new processor to NiFi. First build it with mvn install (see my build script), then deploy it:

cp nifi-tensorflow-nar/target/nifi-tensorflow-nar-1.0.nar /Volumes/Transcend/Apps/nifi-1.2.0/lib

Once you restart NiFi, you can add the TensorFlow processor.
An example flow uses the very smart ListFile processor, which iterates through a directory of files and keeps track of the last timestamp of the files it accessed. So I point it at a directory, and the NiFi processor gets fed a ton of images to process very quickly. This is much faster than calling out to a script. The result of the run is a new attribute, probabilities, which is a text description of what the image could be, along with a confidence percentage.
This was the guess for the picture of my RV.

Source: https://github.com/tspannhw/nifi-tensorflow-processor

Resources:

https://community.hortonworks.com/articles/83872/data-lake-30-containerization-erasure-coding-gpu-p.html
https://community.hortonworks.com/articles/59349/hdf-20-flow-for-ingesting-real-time-tweets-from-st.html
https://community.hortonworks.com/articles/103863/using-an-asus-tinkerboard-with-tensorflow-and-pyth.html
https://community.hortonworks.com/articles/54954/setting-up-gpu-enabled-tensorflow-to-work-with-zep.html
https://community.hortonworks.com/articles/80339/iot-capturing-photos-and-analyzing-the-image-with.html
https://community.hortonworks.com/articles/58265/analyzing-images-in-hdf-20-using-tensorflow.html
https://community.hortonworks.com/articles/83100/deep-learning-iot-workflows-with-raspberry-pi-mqtt.html
07-06-2017 08:41 PM - 4 Kudos
This is a cool HAT that attaches without soldering to a Raspberry Pi 3. The main use is with Google's Android Things, an IoT OS based on Android that you can run on a Raspberry Pi and other devices. You can also use this HAT with regular Raspbian and Python. This HAT is cool because it has both input and output. As part of our Python script we ingest data from its sensor, but we also allow for input from its A, B, and C buttons, clearly marked on the top. By selecting one of the buttons you can display the current temperature in Celsius or Fahrenheit, or the current altitude. The altitude is some code I found commented out in the Pimoroni Rainbow HAT interface Python script. It determines altitude based on a starting value and pressure readings. It looks kind of cool and is relatively accurate. For a point of reference, Princeton, New Jersey is pretty close to sea level. My script is heavily copied and customized from my previous IoT Python scripts and from the Rainbow HAT examples. This HAT has a four-digit display, seven multi-color LEDs, a BMP280 temperature and pressure sensor, and some more goodies in this tiny device. It's available from Adafruit.

We execute the Python code via a simple shell script wrapper:

python /opt/demo/minifi.py

Source Code: https://github.com/tspannhw/rpi-rainbowhat

Prepare the Pi:

sudo apt-get purge libreoffice wolfram-engine sonic-pi scratch
sudo apt-get autoremove

Example Python Debug Output:

CPU: 45.0 C
Corrected Temp: 23.7 C
Room Temp: 74.7 F
Pressure: 101598.9 Pa
Altitude: 33.2
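The pressure-to-altitude conversion mentioned above can be sketched with the international barometric formula. This is my own minimal version, using an assumed standard sea-level reference pressure as the starting value rather than whatever constant the Pimoroni script uses:

```python
def altitude_m(pressure_pa, qnh_pa=101325.0):
    # International barometric formula: height in meters above the
    # reference pressure level qnh_pa (standard sea level by default).
    return 44330.0 * (1.0 - (pressure_pa / qnh_pa) ** (1.0 / 5.255))

print(round(altitude_m(100000.0), 1))
```

With the default reference, the Pressure reading from the debug output above (101598.9 Pa, slightly above standard sea level) would come out slightly negative, which is why a locally calibrated starting value matters for a sensible altitude display.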
This is the MiniFi flow that I modeled in Apache NiFi and then exported as a template (XML). I then use the shell command below to convert it into config.yml format and transfer it to my device running the Apache MiniFi Java agent. This is a simple flow: add a schema name, tell NiFi it's a JSON stream, and then query it. If the data has temperatures above a threshold (checked via inline SQL), convert the AVRO file to ORC and store it in HDFS (automagically building an external Hive table on top of it).

Schema to put in the Hortonworks Schema Registry or NiFi AVRO Schema Registry:

{"type":"record","namespace":"hortonworks.hdp.refapp.rainbow","name":"rainbow","fields":[{"name": "tempf", "type": "float"},{ "name": "cputemp", "type": "float"},{"name":"ts","type":"string"},{ "name": "pressure","type": "float"},{ "name": "host","type": "string"},{ "name": "tempf2","type": "float"},{"name": "ipaddress","type": "string"},{"name": "temp","type": "float"},{ "name": "altitude","type": "float"}]}

It's pretty easy to define an AVRO schema (in JSON format), like the one above. Feel free to use it as a starting point. Strings and floats work very well; keep it simple and leave out extra whitespace.

Build Your Configuration:

minifi-toolkit-1.0.3.0.0.0-453/bin/config.sh transform RainbowForMiniFi.xml config.yml

Flows: nifireceiverrainbow.xml, rainbowforminifi.xml

Coming Soon: Raspberry Pi 3 running Android Things and Java, connecting to the Rainbow HAT, TensorFlow, and MiniFi.

References:
https://github.com/pimoroni/rainbow-hat
https://github.com/androidthings/weatherstation
https://en.wikipedia.org/wiki/QNH
https://en.wikipedia.org/wiki/Pascal_(unit)
https://www.mide.com/pages/air-pressure-at-altitude-calculator
https://hackernoon.com/trying-out-the-android-things-weatherstation-codelab-d3f260b59c2f
https://codelabs.developers.google.com/codelabs/androidthings-weatherstation/index.html
https://shop.pimoroni.com/products/rainbow-hat-for-android-things
https://learn.pimoroni.com/tutorial/sandyj/getting-started-with-rainbow-hat-in-python

Part of My IoT MiniFi Series (Details Installation and Setup):
https://community.hortonworks.com/content/kbentry/107379/minifi-for-image-capture-and-ingestion-from-raspbe.html
https://community.hortonworks.com/content/kbentry/108966/minifi-for-sensor-data-ingest-from-devices.html
https://community.hortonworks.com/articles/108947/minifi-for-ble-bluetooth-low-energy-beacon-data-in.html
07-06-2017 07:10 PM - 2 Kudos
I am always ingesting social data, IoT and mobile streams into my HDFS cluster. Unfortunately my cluster is an ephemeral, cloud-based Hortonworks HDP 2.6 Hadoop cluster, so I don't have a permanent store for my data. My processes run for a few weeks and then they are destroyed.
I wanted a quick way to save all my ORC files.
Enter NiFi.
Backup
First we list from some top-level directories in HDFS to capture all the files and sub-directories we want to back up. Each processor maintains a timestamp to know which files it has already processed; as new files are added, they will be assimilated into the flow.
For massive data migration, we can run this on many nodes and use the Distributed Cache service to maintain the state.
Restore
The flow to restore is very simple: read from the local file system and write to HDFS. For HDFS, I use /${path} as the directory so each file is written to the correct sub-directory for its file group. Easy: it's like rsync, but it's Tim Sync. Make sure you have your Hadoop configuration files set. If you are using Kerberos, make sure you set your principal and keytab, and be very careful about case sensitivity!
For restore, it doesn't get any simpler: I use GetFile to read from the local file system. Those files are deleted as part of the pickup, and I have a big USB 3.0 drive where I want to keep them, so I copy them to a different directory for later storage. I should probably compress those. Once they get large enough I may invest in a local RPi storage array running HDP 2.6 with some attached Western Digital PiDrives.
The Data Provenance of one of the flow files shows the path and filename. This makes it very easy to move between, say, S3, on-premise HDFS, local file systems, cloud file systems, jump drives, or wherever. Your data is yours; take it with you.
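The restore idea (write each file back under its ${path} sub-directory) can be sketched locally with Python's stdlib; this is my own illustration using temp directories, not the HDFS processors in the flow:

```python
import os
import shutil
import tempfile

def restore(src_root, dest_root):
    # Walk the backup tree and recreate each file under dest_root,
    # preserving its relative sub-directory (the ${path} attribute idea).
    for dirpath, _dirs, files in os.walk(src_root):
        rel = os.path.relpath(dirpath, src_root)
        target = dest_root if rel == "." else os.path.join(dest_root, rel)
        os.makedirs(target, exist_ok=True)
        for name in files:
            shutil.copy2(os.path.join(dirpath, name),
                         os.path.join(target, name))

# Demo with made-up temp paths
src = tempfile.mkdtemp()
dst = tempfile.mkdtemp()
os.makedirs(os.path.join(src, "tweets"))
with open(os.path.join(src, "tweets", "a.orc"), "w") as f:
    f.write("x")
restore(src, dst)
print(os.path.exists(os.path.join(dst, "tweets", "a.orc")))
```

In the NiFi flow, ListHDFS/GetFile and PutHDFS play the roles of the walk and the copy, with the path attribute carrying the sub-directory.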
06-30-2017 04:07 PM - 8 Kudos
From Database MetaData to Data Ingest in a Few Quick Steps
A company wants to know when new tables are added to a JDBC source (say, an RDBMS). Using the ListDatabaseTables processor we can get a list of TABLEs (also views, system tables, and other database objects), but for our purposes we want tables with data. I have used the ngdbc.jar from SAP HANA to connect and query tables with ease.
For today's example I am connecting to MySQL as I have a MySQL database available for use and modification.
Pre-Step
mysql -u root -p test < person.sql
CREATE USER 'nifi'@'%' IDENTIFIED BY 'reallylongDifficultPassDF&^D&F^Dwird';
GRANT ALL PRIVILEGES ON *.* TO 'nifi'@'%' WITH GRANT OPTION;
COMMIT;
mysql> show tables;
+----------------+
| Tables_in_test |
+----------------+
| MOCK_DATA |
| mock2 |
| personReg |
| visitor |
+----------------+
4 rows in set (0.00 sec)
I created a user to use for my JDBC Connection Pool in NiFi to read the metadata and data.
These table names will show up in NiFi in the
db.table.name attribute.
Step 1: ListDatabaseTables: Let's get a list of all the tables in MySQL for the database we have chosen.
After it starts running you can check its state and see what tables were ingested and the most recent timestamp (Value).
We will get back which catalog we read from, how many tables there are, and each table name along with its full name.
HDF NiFi supports generic JDBC drivers and specific coding for Oracle, MS SQL Server 2008 and MS SQL Server 2012+.
Step 2: GenerateTableFetch, using the table names returned by the listing.
Step 3: We use ExtractText to get the SQL statement created by GenerateTableFetch. We add a new attribute, sql, with the value ^(.*).
Step 4: ExecuteSQL with that ${sql} attribute.
Step 5: Convert AVRO files produced by ExecuteSQL into performant Apache ORC Files
Step 6: PutHDFS to store these ORC files in Hadoop
I added the table name as part of the directory structure so a new directory is created for each transferred table. Now we have dynamic HDFS directory structure creation.
Step 7: Replace Text to build a SQL statement that will generate an external Hive table on our new ORC directory
Step 8: PutHiveQL to execute the statement that was just dynamically created for our new table.
We now have instantly queryable Hadoop data available to Hive, SparkSQL, Zeppelin, ODBC, JDBC, and a ton of BI tools and interfaces.
Step 9: Finally we can look at the data that we have ingested from MySQL into new Hive tables.
That was easy. The best part is that as new tables are added to MySQL, they will be auto-ingested into HDFS with Hive tables built on top of them.
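The end-to-end pattern (list the tables, then generate and execute a fetch per table) can be sketched with Python's stdlib. This is my own illustration: sqlite3 stands in for the MySQL JDBC source, and the table contents are made up:

```python
import sqlite3

# In-memory database standing in for the MySQL source (illustrative data)
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE MOCK_DATA (id INTEGER, name TEXT)")
conn.execute("INSERT INTO MOCK_DATA VALUES (1, 'tim')")

# Step 1 equivalent: list tables (what ListDatabaseTables does)
tables = [r[0] for r in conn.execute(
    "SELECT name FROM sqlite_master WHERE type='table'")]

# Steps 2-4 equivalent: generate a fetch statement per table and run it
for t in tables:
    rows = conn.execute("SELECT * FROM {0}".format(t)).fetchall()
    print(t, rows)
```

In the NiFi flow the same loop is implicit: each listed table name becomes a flow file that drives GenerateTableFetch and ExecuteSQL downstream.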
Future Updates:
Use Hive Merge to update changed data. We can also ingest to Phoenix/HBase and use the upsert DML.
Test with other databases. Tested with MySQL.
Quick Tip (HANA):
In NiFi, refer to tables with their full name in quotes:
"SAP_"."ASDSAD.SDASD.ASDAD"
References:
https://blogs.sap.com/2017/04/20/how-to-send-data-from-apache-nifi-to-hana/
https://community.hortonworks.com/questions/45934/how-to-connect-nifi-with-hana.html
https://community.hortonworks.com/articles/81153/basic-cdc-using-apache-nifi-and-sap-hana.html
https://community.hortonworks.com/articles/92496/qadcdc-our-how-to-ingest-some-database-tables-to-h.html
https://community.hortonworks.com/articles/64122/incrementally-streaming-rdbms-data-to-your-hadoop.html
06-16-2017 04:49 PM - 2 Kudos
See Part 1: https://community.hortonworks.com/content/kbentry/107379/minifi-for-image-capture-and-ingestion-from-raspbe.html
See Part 2: https://community.hortonworks.com/articles/108947/minifi-for-ble-bluetooth-low-energy-beacon-data-in.html
We build a flow in Apache NiFi and then export the template as XML. Using the MiniFi toolkit we convert this into a config.yml file and send it to our device via scp. You can see this in Part 1. This simple flow calls a shell script that runs a Python script to get our sensor data. The flow then sends the data to our NiFi server via S2S over HTTP.
What I have added in this part is use of the new Record and Schema paradigms, and also the ability to run SQL queries against incoming flow files using QueryRecord. This requires building an AVRO schema for our data, which is a dead-easy JSON definition.
We set our port to connect the MiniFi agent to our server.
Data quickly starts coming in.
Receive the JSON Messages in NiFi via S2S
Top Steps to Ingest Sensor Data in a Few Hours

1. Connect to the port.
2. Set a schema to pull from the registry; also set a MIME type for JSON.
3. Query the flow file and take only readings over 65 degrees Fahrenheit via Apache Calcite-processed SQL. This produces an AVRO file using the AvroRecordSetWriter and the schema from the AvroSchemaRegistry.
4. Store the AVRO file produced to HDFS.
5. Store the raw JSON file sent in to HDFS.
6. Convert the AVRO file to ORC.
7. Store the ORC files to HDFS.
8. Grab the autogenerated hive.ddl to create the external tables.
9. Query my sensor data in Zeppelin.
hive.ddl Automagically Generated Hive Schema
CREATE EXTERNAL TABLE IF NOT EXISTS sensor (tempf FLOAT, cputemp FLOAT, ts STRING, pressure FLOAT, host STRING, pitch FLOAT, ipaddress STRING, temp FLOAT, diskfree STRING, yaw FLOAT, humidity FLOAT, memory FLOAT, y FLOAT, x FLOAT, z FLOAT, roll FLOAT) STORED AS ORC
I grab the HDFS location and add that to the DDL: LOCATION '/sensor'.
For the AVRO and JSON versions of the data, I make similar tables.
ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'
LOCATION '/jsonsensor';
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
STORED AS AVRO
LOCATION '/avrosensor';
Install Libraries (See Part 1 for MiniFi install)
pip install --upgrade sense-hat
pip install --upgrade pillow
pip install rtimulib
pip install psutil
sudo apt-get install oracle-java8-jdk
Shell Script
python /opt/demo/rpi-sensehat-mqtt-nifi/sense2.py
Python Script
from sense_hat import SenseHat
import json
import sys, socket
import os
import psutil
import subprocess
import time
import datetime
from time import sleep
from time import gmtime, strftime
# get data
#current_milli_time = lambda: int(round(time.time() * 1000))
# yyyy-mm-dd hh:mm:ss
currenttime= strftime("%Y-%m-%d %H:%M:%S",gmtime())
host = os.uname()[1]
rasp = ('armv' in os.uname()[4])
cpu = psutil.cpu_percent(interval=1)
ctemp = 0.0  # default when not on a Raspberry Pi (avoids NameError below)
if rasp:
    f = open('/sys/class/thermal/thermal_zone0/temp', 'r')
    l = f.readline()
    ctemp = 1.0 * float(l)/1000
usage = psutil.disk_usage("/")
mem = psutil.virtual_memory()
diskrootfree = "{:.1f} MB".format(float(usage.free) / 1024 / 1024)
mempercent = mem.percent
external_IP_and_port = ('198.41.0.4', 53) # a.root-servers.net
socket_family = socket.AF_INET
#p = subprocess.Popen(['/opt/vc/bin/vcgencmd','measure_temp'], stdout=subprocess.PIPE,
# stderr=subprocess.PIPE)
#out, err = p.communicate()
def IP_address():
    try:
        s = socket.socket(socket_family, socket.SOCK_DGRAM)
        s.connect(external_IP_and_port)
        answer = s.getsockname()
        s.close()
        return answer[0] if answer else None
    except socket.error:
        return None
ipaddress = IP_address()
sense = SenseHat()
sense.clear()
temp = sense.get_temperature()
temp = round(temp, 2)
humidity = sense.get_humidity()
humidity = round(humidity, 1)
pressure = sense.get_pressure()
pressure = round(pressure, 1)
orientation = sense.get_orientation()
pitch = orientation['pitch']
roll = orientation['roll']
yaw = orientation['yaw']
acceleration = sense.get_accelerometer_raw()
x = acceleration['x']
y = acceleration['y']
z = acceleration['z']
#cputemp = out
x=round(x, 0)
y=round(y, 0)
z=round(z, 0)
pitch=round(pitch,0)
roll=round(roll,0)
yaw=round(yaw,0)
row = { 'ts': currenttime, 'host': host, 'memory': mempercent, 'diskfree': diskrootfree, 'cputemp': round(ctemp,2), 'ipaddress': ipaddress, 'temp': temp, 'tempf': round(((temp * 1.8) + 12),2), 'humidity': humidity, 'pressure': pressure, 'pitch': pitch, 'roll': roll, 'yaw': yaw, 'x': x, 'y': y, 'z': z }
json_string = json.dumps(row)
print(json_string)
One Record (JSON)
{"tempf": 75.14, "temp": 35.08, "pitch": 1.0, "diskfree": "1211.8 MB", "yaw": 55.0, "cputemp": 52.08, "ts": "2017-06-16 17:39:08", "humidity": 41.5, "pressure": 0.0, "host": "picroft", "memory": 23.0, "y": 0.0, "x": -1.0, "z": 1.0, "ipaddress": "192.168.1.156", "roll": 1.0}
AVRO Schema (JSON Format)
{"type":"record","namespace":"hortonworks.hdp.refapp.sensehat","name":"sensehat","fields":[{"name": "tempf", "type": "float"},{ "name": "cputemp", "type": "float"},{"name":"ts","type":"string"},{ "name": "pressure","type": "float"},{ "name": "host","type": "string"},{ "name": "pitch","type": "float"},{"name": "ipaddress","type": "string"},{"name": "temp","type": "float"},{ "name": "diskfree","type": "string"},{ "name": "yaw","type": "float" },{"name": "humidity","type": "float"},{"name": "memory","type": "float"},{"name": "y", "type": "float"},{"name": "x", "type": "float" },{"name": "z","type": "float"},{"name": "roll", "type": "float"}]}
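A quick stdlib sanity check (my own sketch, not part of the flow): verify that the sample sensor record above carries exactly the field names the AVRO schema declares, so the AvroRecordSetWriter will not reject it:

```python
import json

# Field names copied from the sensehat AVRO schema above
schema_fields = {"tempf", "cputemp", "ts", "pressure", "host", "pitch",
                 "ipaddress", "temp", "diskfree", "yaw", "humidity",
                 "memory", "y", "x", "z", "roll"}

# The sample record from "One Record (JSON)" above
record = json.loads('{"tempf": 75.14, "temp": 35.08, "pitch": 1.0, '
                    '"diskfree": "1211.8 MB", "yaw": 55.0, "cputemp": 52.08, '
                    '"ts": "2017-06-16 17:39:08", "humidity": 41.5, '
                    '"pressure": 0.0, "host": "picroft", "memory": 23.0, '
                    '"y": 0.0, "x": -1.0, "z": 1.0, '
                    '"ipaddress": "192.168.1.156", "roll": 1.0}')
print(set(record) == schema_fields)
```

Checks like this catch the common failure mode where a renamed or missing field makes the record reader silently route flow files to failure.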
config.yml
MiNiFi Config Version: 2
Flow Controller:
  name: sense hat
  comment: sense hat 2017
Core Properties:
  flow controller graceful shutdown period: 10 sec
  flow service write delay interval: 500 ms
  administrative yield duration: 30 sec
  bored yield duration: 10 millis
  max concurrent threads: 1
FlowFile Repository:
  partitions: 256
  checkpoint interval: 2 mins
  always sync: false
  Swap:
    threshold: 20000
    in period: 5 sec
    in threads: 1
    out period: 5 sec
    out threads: 4
Content Repository:
  content claim max appendable size: 10 MB
  content claim max flow files: 100
  always sync: false
Provenance Repository:
  provenance rollover time: 1 min
Component Status Repository:
  buffer size: 1440
  snapshot frequency: 1 min
Security Properties:
  keystore: ''
  keystore type: ''
  keystore password: ''
  key password: ''
  truststore: ''
  truststore type: ''
  truststore password: ''
  ssl protocol: ''
  Sensitive Props:
    key:
    algorithm: PBEWITHMD5AND256BITAES-CBC-OPENSSL
    provider: BC
Processors:
- id: db6fbd3b-ddf4-3041-0000-000000000000
  name: ExecuteProcess
  class: org.apache.nifi.processors.standard.ExecuteProcess
  max concurrent tasks: 1
  scheduling strategy: TIMER_DRIVEN
  scheduling period: 60 sec
  penalization period: 30 sec
  yield period: 1 sec
  run duration nanos: 0
  auto-terminated relationships list: []
  Properties:
    Argument Delimiter: ' '
    Batch Duration:
    Command: /opt/demo/rpi-sensehat-mqtt-nifi/sense2.sh
    Command Arguments:
    Redirect Error Stream: 'true'
Process Groups: []
Input Ports: []
Output Ports: []
Funnels: []
Connections:
- id: 5635290a-4cb6-3da7-0000-000000000000
  name: minifiSenseHat
  source id: db6fbd3b-ddf4-3041-0000-000000000000
  source relationship names:
  - success
  destination id: 166616e3-1962-1660-2b7c-2f824584b23a
  max work queue size: 10000
  max work queue data size: 1 GB
  flowfile expiration: 0 sec
  queue prioritizer class: ''
Remote Process Groups:
- id: fdc45649-84be-374b-0000-000000000000
  name: ''
  url: http://hw13125.local:8080/nifi
  comment: ''
  timeout: 30 sec
  yield period: 10 sec
  transport protocol: HTTP
  Input Ports:
  - id: 166616e3-1962-1660-2b7c-2f824584b23a
    name: MiniFi SenseHat
    comment: ''
    max concurrent tasks: 1
    use compression: false
Build our MiniFi configuration file from sensorminifi.xml:

minifi-toolkit-1.0.2.1.4.0-5/bin/config.sh transform sensorminifi.xml config.yml

Then just scp it to your device.
Flows
sensornifi.xml sensorminifi.xml
Source Repository
https://github.com/tspannhw/rpi-sensehat-minifi-python

Example MiniFi Log

dResourceClaim[id=1497645887239-1, container=default, section=1], offset=2501, length=278],offset=0,name=13917785142443,size=278]] (278 bytes) to http://HW13125.local:8080/nifi-api in 116 milliseconds at a rate of 2.32 KB/sec
2017-06-16 20:54:41,827 INFO [Provenance Maintenance Thread-1] o.a.n.p.MiNiFiPersistentProvenanceRepository Created new Provenance Event Writers for events starting with ID 3162
2017-06-16 20:54:41,844 INFO [Provenance Repository Rollover Thread-1] o.a.n.p.MiNiFiPersistentProvenanceRepository Successfully merged 16 journal files (3 records) into single Provenance Log File provenance_repository/3159.prov in 33 milliseconds
2017-06-16 20:54:41,846 INFO [Provenance Repository Rollover Thread-1] o.a.n.p.MiNiFiPersistentProvenanceRepository Successfully Rolled over Provenance Event file containing 5 records
2017-06-16 20:54:43,288 INFO [Write-Ahead Local State Provider Maintenance] org.wali.MinimalLockingWriteAheadLog org.wali.MinimalLockingWriteAheadLog@87eb01 checkpointed with 0 Records and 0 Swap Files in 100 milliseconds (Stop-the-world time = 13 milliseconds, Clear Edit Logs time = 10 millis), max Transaction ID -1
2017-06-16 20:54:48,429 INFO [pool-23-thread-1] o.a.n.c.r.WriteAheadFlowFileRepository Initiating checkpoint of FlowFile Repository
2017-06-16 20:54:48,890 INFO [pool-23-thread-1] org.wali.MinimalLockingWriteAheadLog org.wali.MinimalLockingWriteAheadLog@17461db checkpointed with 0 Records and 0 Swap Files in 460 milliseconds (Stop-the-world time = 190 milliseconds, Clear Edit Logs time = 77 millis), max Transaction ID 2107
2017-06-16 20:54:48,891 INFO [pool-23-thread-1] o.a.n.c.r.WriteAheadFlowFileRepository Successfully checkpointed FlowFile Repository with 0 records in 461 milliseconds
2017-06-16 20:54:51,482 INFO [Http Site-to-Site PeerSelector] o.apache.nifi.remote.client.PeerSelector org.apache.nifi.remote.client.PeerSelector@f69f9d Successfully refreshed Peer Status; remote instance consists of 1 peers
2017-06-16 20:55:07,621 INFO [Timer-Driven Process Thread-9] o.apache.nifi.remote.client.PeerSelector New Weighted Distribution of Nodes:
PeerStatus[hostname=HW13125.local,port=8080,secure=false,flowFileCount=0] will receive 100.0% of data
2017-06-16 20:55:07,957 INFO [Timer-Driven Process Thread-9] o.a.nifi.remote.StandardRemoteGroupPort RemoteGroupPort[name=MiniFi SenseHat,target=http://hw13125.local:8080/nifi] Successfully sent [StandardFlowFileRecord[uuid=b3bcd211-7425-4750-9e4c-ba2d477b9cc1,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1497645887239-1, container=default, section=1], offset=2779, length=278],offset=0,name=13979556432846,size=278]] (278 bytes) to http://HW13125.local:8080/nifi-api in 114 milliseconds at a rate of 2.38 KB/sec
Check the Status of MiniFi
root@picroft:/opt/demo/minifi-1.0.2.1.4.0-5# bin/minifi.sh flowStatus processor:db6fbd3b-ddf4-3041-0000-000000000000:health,stats,bulletins
minifi.sh: JAVA_HOME not set; results may vary
Bootstrap Classpath: /opt/demo/minifi-1.0.2.1.4.0-5/conf:/opt/demo/minifi-1.0.2.1.4.0-5/lib/bootstrap/*:/opt/demo/minifi-1.0.2.1.4.0-5/lib/*
Java home:
MiNiFi home: /opt/demo/minifi-1.0.2.1.4.0-5
Bootstrap Config File: /opt/demo/minifi-1.0.2.1.4.0-5/conf/bootstrap.conf
FlowStatusReport{controllerServiceStatusList=null, processorStatusList=[{name='ExecuteProcess', processorHealth={runStatus='Running', hasBulletins=false, validationErrorList=[]}, processorStats={activeThreads=0, flowfilesReceived=0, bytesRead=0, bytesWritten=1390, flowfilesSent=0, invocations=5, processingNanos=9290051632}, bulletinList=[]}], connectionStatusList=null, remoteProcessGroupStatusList=null, instanceStatus=null, systemDiagnosticsStatus=null, reportingTaskStatusList=null, errorsGeneratingReport=[]}
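The flowStatus output above is a Java toString rendering rather than JSON, so it cannot be fed straight to a JSON parser, but the fields worth monitoring can be pulled out with regular expressions. A small sketch (the report string below is abbreviated from the output above):

```python
import re

# Abbreviated flowStatus output as printed by minifi.sh (Java toString, not JSON)
report = ("FlowStatusReport{controllerServiceStatusList=null, "
          "processorStatusList=[{name='ExecuteProcess', "
          "processorHealth={runStatus='Running', hasBulletins=false, "
          "validationErrorList=[]}, processorStats={activeThreads=0, "
          "flowfilesReceived=0, bytesRead=0, bytesWritten=1390, "
          "flowfilesSent=0, invocations=5, processingNanos=9290051632}, "
          "bulletinList=[]}], connectionStatusList=null}")

# Pull out the processor's run status and a few counters
run_status = re.search(r"runStatus='(\w+)'", report).group(1)
bytes_written = int(re.search(r"bytesWritten=(\d+)", report).group(1))
invocations = int(re.search(r"invocations=(\d+)", report).group(1))

print(run_status, bytes_written, invocations)  # Running 1390 5
```

This is handy for a cron job that alerts when runStatus is anything other than Running, or when invocations stops increasing between checks.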
Output Displayed in Apache Zeppelin Workbook
Using the DDL generated by Apache NiFi, we can create external Hive tables for the raw JSON data, for a cleaned-up ORC version of the data, and for an Avro version of the data.
We can then query our datasets.
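From Zeppelin this is plain SQL against the external tables. A hypothetical example; the actual table and column names come from the DDL NiFi generated for the SenseHat records, so sensehat_orc, host, and temp here are placeholders:

```sql
-- Hypothetical query; substitute the table/column names from your generated DDL
SELECT host, AVG(temp) AS avg_temp, COUNT(*) AS readings
FROM sensehat_orc
GROUP BY host;
```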
References:
https://cwiki.apache.org/confluence/display/Hive/AvroSerDe https://json-schema-validator.herokuapp.com/avro.jsp https://docs.hortonworks.com/HDPDocuments/HDF3/HDF-3.0.0/bk_minifi-administration/content/ch01s01.html https://community.hortonworks.com/articles/55839/reading-sensor-data-from-remote-sensors-on-raspber.html https://www.thepolyglotdeveloper.com/2016/08/connect-multiple-wireless-networks-raspberry-pi/ https://community.hortonworks.com/articles/83100/deep-learning-iot-workflows-with-raspberry-pi-mqtt.html
... View more
Labels:
06-16-2017
02:27 PM
5 Kudos
See Part 1: https://community.hortonworks.com/content/kbentry/107379/minifi-for-image-capture-and-ingestion-from-raspbe.html In this article, we are ingesting beacon data using an ASUS Tinkerboard, which is very similar to a Raspberry Pi in size, form factor and features. It is newer and has fewer peripherals, but has 2 GB of RAM, an ARM Mali T760 MP4 GPU, and a quad-core ARM Cortex-A17 1.8 GHz processor with dual-channel DDR3 memory and built-in BLE (Bluetooth 4.0 with EDR). For beacons, I am using the basic Estimote three-beacon collection. They have a really nice solid beacon in a silicone case that looks nice and works well. They have a mobile app to manage, monitor and simulate beacons, as well as a cloud app. On our Apache NiFi server we have an Input Port that receives the messages from the Java MiniFi agent running on the ASUS Tinkerboard. As with most of the data, I want to ingest it every minute. The ASUS Tinkerboard device has the red light lit up. The Estimote beacons are the colored devices to the right. For Tinkerboards, the default username is linaro.
Requires Bluetooth Reading Libraries in Linux
sudo apt-get -y install bluetooth bluez libbluetooth-dev libudev-dev bluez-hcidump python-bluez
sudo apt-get -y update
sudo apt-get -y install python-dev python3-dev
pip install beacontools[scan]
The built-in Bluetooth adapter is hci0. You will need to have Python 2.7 and/or Python 3.5 installed.
Install Oracle JDK 8
sudo apt-get -y install oracle-java8-jdk
Shell Script for ExecuteProcess (/opt/demo/py-decode-beacon/runble.sh)
python bluez_scan.py
Python Script
See: Python beacon advertisement decoder
Copyright (c) 2015 Patrick Van Oosterwijck
https://github.com/adamf/BLE/blob/master/ble-scanner.py and https://github.com/xorbit/py-decode-beacon
Starting MiniFi on Tinkerboard
bin/minifi.sh start
Diagnostics on MiniFi
./bin/minifi.sh flowStatus systemdiagnostics:heap,processorstats,contentrepositoryusage,flowfilerepositoryusage,garbagecollection
Logs From MiniFi
2017-06-12 17:13:49,873 INFO [Timer-Driven Process Thread-9] o.a.nifi.remote.StandardRemoteGroupPort RemoteGroupPort[name=MiniFi Tinker,target=http://hw13125.local:8080/nifi/] Successfully sent [StandardFlowFileRecord[uuid=98de3195-8be3-4433-840b-c84b9a84fb6f,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1497282405127-2, container=default, section=2], offset=729236, length=16331],offset=0,name=98638611491740,size=16331]] (15.95 KB) to http://HW13125.local:8080/nifi-api in 138 milliseconds at a rate of 114.88 KB/sec
2017-06-12 17:13:57,918 INFO [Provenance Maintenance Thread-3] o.a.n.p.MiNiFiPersistentProvenanceRepository Created new Provenance Event Writers for events starting with ID 2365
2017-06-12 17:13:57,962 INFO [Provenance Repository Rollover Thread-1] o.a.n.p.MiNiFiPersistentProvenanceRepository Successfully merged 16 journal files (3 records) into single Provenance Log File provenance_repository/2362.prov in 243 milliseconds
2017-06-12 17:13:57,971 INFO [Provenance Repository Rollover Thread-1] o.a.n.p.MiNiFiPersistentProvenanceRepository Successfully Rolled over Provenance Event file containing 5 records
2017-06-12 17:14:04,671 INFO [Http Site-to-Site PeerSelector] o.apache.nifi.remote.client.PeerSelector org.apache.nifi.remote.client.PeerSelector@944ade Successfully refreshed Peer Status; remote instance consists of 1 peers
2017-06-12 17:14:21,554 INFO [pool-23-thread-1] o.a.n.c.r.WriteAheadFlowFileRepository Initiating checkpoint of FlowFile Repository
2017-06-12 17:14:26,057 INFO [pool-23-thread-1] org.wali.MinimalLockingWriteAheadLog org.wali.MinimalLockingWriteAheadLog@541c15 checkpointed with 0 Records and 0 Swap Files in 4502 milliseconds (Stop-the-world time = 3887 milliseconds, Clear Edit Logs time = 549 millis), max Transaction ID 985
2017-06-12 17:14:26,059 INFO [pool-23-thread-1] o.a.n.c.r.WriteAheadFlowFileRepository Successfully checkpointed FlowFile Repository with 0 records in 4503 milliseconds
2017-06-12 17:14:42,783 INFO [Write-Ahead Local State Provider Maintenance] org.wali.MinimalLockingWriteAheadLog org.wali.MinimalLockingWriteAheadLog@3e088f checkpointed with 0 Records and 0 Swap Files in 97 milliseconds (Stop-the-world time = 49 milliseconds, Clear Edit Logs time = 33 millis), max Transaction ID -1
2017-06-12 17:15:05,438 INFO [Http Site-to-Site PeerSelector] o.apache.nifi.remote.client.PeerSelector org.apache.nifi.remote.client.PeerSelector@944ade Successfully refreshed Peer Status; remote instance consists of 1 peers
2017-06-12 17:15:49,116 INFO [Timer-Driven Process Thread-5] o.apache.nifi.remote.client.PeerSelector New Weighted Distribution of Nodes:
PeerStatus[hostname=HW13125.local,port=8080,secure=false,flowFileCount=0] will receive 100.0% of data
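The decoder scripts referenced above parse raw BLE advertising data; for iBeacon frames the heart of that is a fixed-layout struct unpack: 2-byte company id, a 0x02/0x15 marker, 16-byte proximity UUID, big-endian major and minor, and a signed calibrated TX power byte. A self-contained sketch (the sample frame is fabricated for illustration, using the well-known Estimote default UUID):

```python
import struct
import uuid

def decode_ibeacon(mfg_data):
    """Decode Apple iBeacon manufacturer-specific data:
    2-byte little-endian company id, 0x02 0x15 marker,
    16-byte proximity UUID, big-endian major and minor,
    then the signed calibrated TX power at 1 m."""
    company, subtype, length = struct.unpack_from('<HBB', mfg_data, 0)
    if company != 0x004C or subtype != 0x02 or length != 0x15:
        raise ValueError('not an iBeacon frame')
    proximity_uuid = uuid.UUID(bytes=mfg_data[4:20])
    major, minor, tx_power = struct.unpack_from('>HHb', mfg_data, 20)
    return {'uuid': str(proximity_uuid), 'major': major,
            'minor': minor, 'tx_power': tx_power}

# Fabricated sample frame for illustration
sample = (b'\x4c\x00\x02\x15'   # Apple company id (little-endian) + iBeacon marker
          + uuid.UUID('b9407f30-f5f8-466e-aff9-25556b57fe6d').bytes  # Estimote default UUID
          + b'\x00\x2a'         # major = 42
          + b'\x00\x07'         # minor = 7
          + b'\xc5')            # tx power = -59 dBm
print(decode_ibeacon(sample))
```

The linked bluez_scan.py script does the extra work of reading raw HCI events before reaching bytes in this shape; this sketch is only the final decode step.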
It's a very simple model: just execute the Python script and send the resulting JSON to NiFi for processing and storage. The same code will also work on a Raspberry Pi and on most similar Linux devices. The ASUS Tinkerboard is running TinkerOS_Debian V1.8, not Raspbian. References:
https://community.hortonworks.com/articles/103863/using-an-asus-tinkerboard-with-tensorflow-and-pyth.html https://dzone.com/articles/using-tinkerboard-with-tensorflow-and-python https://docs.hortonworks.com/HDPDocuments/HDF3/HDF-3.0.0/bk_minifi-administration/content/ch01s01.html https://docs.hortonworks.com/HDPDocuments/HDF3/HDF-3.0.0/bk_minifi-quick-start/content/ch_minifi-quick-start.html https://community.hortonworks.com/articles/56341/getting-started-with-minifi.html https://github.com/hipol/EstimoteServer https://github.com/taka-wang/py-beacon https://learn.adafruit.com/bluefruit-le-python-library https://github.com/citruz/beacontools https://pypi.python.org/pypi/beacontools/1.0.1 https://github.com/adamf/BLE/blob/master/ble-scanner.py https://github.com/xorbit/py-decode-beacon https://cloud.estimote.com/docs/ https://github.com/estimote/estimote-specs https://evothings.com/doc/examples/ibeacon-scan.html https://github.com/switchdoclabs/iBeacon-Scanner- http://www.switchdoc.com/2014/08/ibeacon-raspberry-pi-scanner-python/ https://github.com/mlwelles/BeaconScanner https://github.com/dburr/linux-ibeacon https://github.com/beaconinside/awesome-beacon https://github.com/adafruit/Adafruit_Python_BluefruitLE https://www.asus.com/us/Single-Board-Computer/Tinker-Board/ https://www.asus.com/us/Single-Board-Computer/Tinker-Board/HelpDesk_Download/
... View more
Labels:
06-12-2017
08:45 PM
6 Kudos
MiniFi For Image Capture and Ingestion from Raspberry Pi
Our Apache NiFi 1.2 instance is waiting for data to be pushed from the MiniFi instance. We export this to an XML template and convert it to YAML:
minifi-toolkit-1.0.2.1.4.0-5/bin/config.sh transform MinifiImage.xml config.yml
Then I scp the YAML file over to my MiniFi RPI 3, which gives a clean running MiniFi instance. Note there will be some gzipped logs showing up in there. Grab all the images that are available (make sure they are complete, at least 30 seconds old, and bigger than 4096 bytes). Schedule when you want things to run; our use case is one picture a minute. Make sure you map your remote process group to the correct Input Port!
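The file-readiness rule above (complete, at least 30 seconds old, bigger than 4096 bytes) is what GetFile's Minimum File Age and Minimum File Size properties express; in plain Python the same filter amounts to roughly this sketch:

```python
import os
import time

MIN_AGE_SECONDS = 30   # let the camera finish writing before shipping
MIN_SIZE_BYTES = 4096  # skip empty or truncated captures

def ready_images(directory, now=None):
    """Return image paths old enough and large enough to ship safely,
    mirroring GetFile's Minimum File Age / Minimum File Size checks."""
    now = time.time() if now is None else now
    ready = []
    for name in sorted(os.listdir(directory)):
        if not name.lower().endswith('.jpg'):
            continue
        path = os.path.join(directory, name)
        st = os.stat(path)
        if now - st.st_mtime >= MIN_AGE_SECONDS and st.st_size > MIN_SIZE_BYTES:
            ready.append(path)
    return ready
```

In the flow itself GetFile applies these checks for you; the sketch is only to show what the age/size thresholds are protecting against.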
MiniFi Setup
Make sure you have Oracle's JDK 8 installed and in the path.
export JAVA_HOME=/usr/lib/jvm/jdk-8-oracle-arm32-vfp-hflt/
bin/minifi.sh start
Check the Logs
tail -f log/minifi-app.log
Check the Status
bin/minifi.sh flowStatus controllerservices:health,bulletins
Test your Camera
raspistill -o cam4.jpg
Python Code
#!/usr/bin/python
# heavily remixed existing code
import random, string
import picamera
from time import gmtime, strftime

# Random lowercase suffix keeps image names unique even within one second
def randomword(length):
    return ''.join(random.choice(string.ascii_lowercase) for i in range(length))

# Create unique image name
img_name = '/opt/demo/images/pi_image_{0}_{1}.jpg'.format(randomword(3), strftime("%Y%m%d%H%M%S", gmtime()))

# Capture Image from Pi Camera
camera = None
try:
    camera = picamera.PiCamera()
    camera.annotate_text = " Stored with Apache NiFi "
    camera.capture(img_name, resize=(800, 600))
finally:
    # Guard so a failed PiCamera() constructor does not raise NameError here
    if camera is not None:
        camera.close()
Create an image using the official Raspberry Pi camera (I am using the NoIR). Add annotation text to each image. I place that script in a shell script, /opt/demo/runimage.sh, and call it from MiniFi. I could use an ExecuteScript processor; I'll do that in the next one. Using MiniFi to send over my images is easier (no weird conversions of binary data), safer, includes more provenance, and is preferred over sending with MQTT. If my machine is too tiny (Onion Omega) or locked down, I would use MQTT. References https://docs.hortonworks.com/HDPDocuments/HDF2/HDF-2.1.4/bk_minifi-quick-start/content/ch_minifi-quick-start.html https://docs.hortonworks.com/HDPDocuments/HDF2/HDF-2.1.4/bk_release-notes/content/index.html https://docs.hortonworks.com/HDPDocuments/HDF2/HDF-2.1.2/bk_dataflow-user-guide/content/configure-site-to-site-client-nifi-instance.html https://community.hortonworks.com/articles/16461/nifi-understanding-how-to-use-process-groups-and-r.html https://www.ncdc.noaa.gov/cdo-web/datasets https://github.com/tspannhw/rpi-picamera-mqtt-nifi https://community.hortonworks.com/articles/56341/getting-started-with-minifi.html http://www.pyimagesearch.com/2015/03/30/accessing-the-raspberry-pi-camera-with-opencv-and-python/ https://community.hortonworks.com/content/kbentry/77988/ingest-remote-camera-images-from-raspberry-pi-via.html https://community.hortonworks.com/articles/83100/deep-learning-iot-workflows-with-raspberry-pi-mqtt.html
... View more
Labels:
05-30-2017
01:42 PM
What are your settings for Minio? You must be running a Minio server and have permissions to it. You need to set your access and secret keys, host base, and host bucket.
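If the client is s3cmd, the relevant ~/.s3cfg entries look something like this (the endpoint and keys below are placeholders for your own Minio server):

```ini
[default]
access_key = YOUR_MINIO_ACCESS_KEY
secret_key = YOUR_MINIO_SECRET_KEY
host_base = minio.example.com:9000
host_bucket = minio.example.com:9000
use_https = False
signature_v2 = False
```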
... View more