08-06-2017
12:03 AM
4 Kudos
Apache NiFi for Ingest of Images and TensorFlow Analysis from the Edge (Raspberry Pi 3)

Technology: Python, TensorFlow, Apache Hive, MiniFi, NiFi, HDFS, WebHDFS, Zeppelin, SQL, Raspberry Pi, Pi Camera, S2S.

The Apache NiFi ingestion flow is straightforward. MiniFi sends us flow files over S2S from the RPI, consisting of two types of messages: a JSON-formatted file of metadata and the TensorFlow analysis of an image, and the actual image captured. We route on the filename attribute to handle each file type appropriately. We send the image to HDFS for storage and retrieval via WebHDFS. For the JSON, I add a schema and a JSON content type and split up the file, since there is some non-JSON junk in there I want pulled out. Then I send my JSON records to QueryRecord to filter out any empty messages. This produces AVRO files from the JSON, which I convert to ORC and store in HDFS. From there it's easy to query my new deep-learning-produced multimedia data via standard SQL.

Flow screenshots: Routing by FileName Attribute; Split Into Lines; Extract Only JSON Data; ORC Configuration; QueryRecord.

MiniFi Flow Installed on Raspberry Pi

The flow on the Pi is simple. We have three processes running. The first executes our classify.sh to activate the PiCamera, take a picture and then feed the picture to TensorFlow. The CleanupLogs process is a shell script that deletes old logs. The GetFile reads any image produced by the first shell execute and sends the image to NiFi.

Hive DDL:

CREATE EXTERNAL TABLE IF NOT EXISTS tfimage (image STRING, ts STRING, host STRING, score STRING, human_string STRING, node_id FLOAT) STORED AS ORC
LOCATION '/tfimage'

Hive SQL:

%jdbc(hive)
select ts, score, human_string,
concat('%html <img width=200 height=200 src="http://princeton10.field.hortonworks.com:50070/webhdfs/v1/tfimagefiles/', SUBSTR(image,18), '?op=OPEN">') as cam_image
from tfimage where image like '%2017%'

Shell (classify.sh):

python -W ignore /opt/demo/classify_image.py

Modified TensorFlow Example (Python):

# Copyright 2015 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Simple image classification with Inception.
Run image classification with Inception trained on ImageNet 2012 Challenge data
set.
This program creates a graph from a saved GraphDef protocol buffer,
and runs inference on an input JPEG image. It outputs human readable
strings of the top 5 predictions along with their probabilities.
Change the --image_file argument to any jpg image to compute a
classification of that image.
Please see the tutorial and website for a detailed description of how
to use this script to perform image recognition.
https://tensorflow.org/tutorials/image_recognition/
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import argparse
import os.path
import re
import sys
import tarfile
import os
import datetime
import math
import random, string
import base64
import json
import time
import picamera
from time import sleep
from time import gmtime, strftime
import numpy as np
from six.moves import urllib
import tensorflow as tf
tf.logging.set_verbosity(tf.logging.ERROR)
FLAGS = None
# pylint: disable=line-too-long
DATA_URL = 'http://download.tensorflow.org/models/image/imagenet/inception-2015-12-05.tgz'
# pylint: enable=line-too-long
# yyyy-mm-dd hh:mm:ss
currenttime= strftime("%Y-%m-%d %H:%M:%S",gmtime())
host = os.uname()[1]
def randomword(length):
  return ''.join(random.choice(string.lowercase) for i in range(length))


class NodeLookup(object):
  """Converts integer node ID's to human readable labels."""

  def __init__(self,
               label_lookup_path=None,
               uid_lookup_path=None):
    if not label_lookup_path:
      label_lookup_path = os.path.join(
          FLAGS.model_dir, 'imagenet_2012_challenge_label_map_proto.pbtxt')
    if not uid_lookup_path:
      uid_lookup_path = os.path.join(
          FLAGS.model_dir, 'imagenet_synset_to_human_label_map.txt')
    self.node_lookup = self.load(label_lookup_path, uid_lookup_path)

  def load(self, label_lookup_path, uid_lookup_path):
    """Loads a human readable English name for each softmax node.
    Args:
      label_lookup_path: string UID to integer node ID.
      uid_lookup_path: string UID to human-readable string.
    Returns:
      dict from integer node ID to human-readable string.
    """
    if not tf.gfile.Exists(uid_lookup_path):
      tf.logging.fatal('File does not exist %s', uid_lookup_path)
    if not tf.gfile.Exists(label_lookup_path):
      tf.logging.fatal('File does not exist %s', label_lookup_path)

    # Loads mapping from string UID to human-readable string
    proto_as_ascii_lines = tf.gfile.GFile(uid_lookup_path).readlines()
    uid_to_human = {}
    p = re.compile(r'[n\d]*[ \S,]*')
    for line in proto_as_ascii_lines:
      parsed_items = p.findall(line)
      uid = parsed_items[0]
      human_string = parsed_items[2]
      uid_to_human[uid] = human_string

    # Loads mapping from string UID to integer node ID.
    node_id_to_uid = {}
    proto_as_ascii = tf.gfile.GFile(label_lookup_path).readlines()
    for line in proto_as_ascii:
      if line.startswith('  target_class:'):
        target_class = int(line.split(': ')[1])
      if line.startswith('  target_class_string:'):
        target_class_string = line.split(': ')[1]
        node_id_to_uid[target_class] = target_class_string[1:-2]

    # Loads the final mapping of integer node ID to human-readable string
    node_id_to_name = {}
    for key, val in node_id_to_uid.items():
      if val not in uid_to_human:
        tf.logging.fatal('Failed to locate: %s', val)
      name = uid_to_human[val]
      node_id_to_name[key] = name

    return node_id_to_name

  def id_to_string(self, node_id):
    if node_id not in self.node_lookup:
      return ''
    return self.node_lookup[node_id]


def create_graph():
  """Creates a graph from saved GraphDef file and returns a saver."""
  # Creates graph from saved graph_def.pb.
  with tf.gfile.FastGFile(os.path.join(
      FLAGS.model_dir, 'classify_image_graph_def.pb'), 'rb') as f:
    graph_def = tf.GraphDef()
    graph_def.ParseFromString(f.read())
    _ = tf.import_graph_def(graph_def, name='')


def run_inference_on_image(image):
  """Runs inference on an image.
  Args:
    image: Image file name.
  Returns:
    Nothing
  """
  if not tf.gfile.Exists(image):
    tf.logging.fatal('File does not exist %s', image)
  image_data = tf.gfile.FastGFile(image, 'rb').read()

  # Creates graph from saved GraphDef.
  create_graph()

  with tf.Session() as sess:
    # Some useful tensors:
    # 'softmax:0': A tensor containing the normalized prediction across
    #   1000 labels.
    # 'pool_3:0': A tensor containing the next-to-last layer containing 2048
    #   float description of the image.
    # 'DecodeJpeg/contents:0': A tensor containing a string providing JPEG
    #   encoding of the image.
    # Runs the softmax tensor by feeding the image_data as input to the graph.
    softmax_tensor = sess.graph.get_tensor_by_name('softmax:0')
    predictions = sess.run(softmax_tensor,
                           {'DecodeJpeg/contents:0': image_data})
    predictions = np.squeeze(predictions)

    # Creates node ID --> English string lookup.
    node_lookup = NodeLookup()

    top_k = predictions.argsort()[-FLAGS.num_top_predictions:][::-1]
    row = []
    for node_id in top_k:
      human_string = node_lookup.id_to_string(node_id)
      score = predictions[node_id]
      row.append({'node_id': node_id, 'image': image, 'host': host,
                  'ts': currenttime, 'human_string': str(human_string),
                  'score': str(score)})
    json_string = json.dumps(row)
    print(json_string)


def maybe_download_and_extract():
  """Download and extract model tar file."""
  dest_directory = FLAGS.model_dir
  if not os.path.exists(dest_directory):
    os.makedirs(dest_directory)
  filename = DATA_URL.split('/')[-1]
  filepath = os.path.join(dest_directory, filename)
  if not os.path.exists(filepath):
    def _progress(count, block_size, total_size):
      sys.stdout.write('\r>> Downloading %s %.1f%%' % (
          filename, float(count * block_size) / float(total_size) * 100.0))
      sys.stdout.flush()
    filepath, _ = urllib.request.urlretrieve(DATA_URL, filepath, _progress)
    print()
    statinfo = os.stat(filepath)
    print('Successfully downloaded', filename, statinfo.st_size, 'bytes.')
  tarfile.open(filepath, 'r:gz').extractall(dest_directory)


def main(_):
  maybe_download_and_extract()
  # Create unique image name
  img_name = '/opt/demo/images/pi_image_{0}_{1}.jpg'.format(
      randomword(3), strftime("%Y%m%d%H%M%S", gmtime()))
  # Capture Image from Pi Camera
  try:
    camera = picamera.PiCamera()
    camera.resolution = (1024, 768)
    camera.annotate_text = " Stored with Apache NiFi "
    camera.capture(img_name, resize=(600, 400))
  finally:
    camera.close()
  # image = (FLAGS.image_file if FLAGS.image_file else
  #          os.path.join(FLAGS.model_dir, 'cropped_panda.jpg'))
  run_inference_on_image(img_name)


if __name__ == '__main__':
  parser = argparse.ArgumentParser()
  # classify_image_graph_def.pb:
  #   Binary representation of the GraphDef protocol buffer.
  # imagenet_synset_to_human_label_map.txt:
  #   Map from synset ID to a human readable string.
  # imagenet_2012_challenge_label_map_proto.pbtxt:
  #   Text representation of a protocol buffer mapping a label to synset ID.
  parser.add_argument(
      '--model_dir',
      type=str,
      default='/tmp/imagenet',
      help="""Path to classify_image_graph_def.pb,
      imagenet_synset_to_human_label_map.txt, and
      imagenet_2012_challenge_label_map_proto.pbtxt.""")
  parser.add_argument(
      '--image_file',
      type=str,
      default='',
      help='Absolute path to image file.')
  parser.add_argument(
      '--num_top_predictions',
      type=int,
      default=5,
      help='Display this many predictions.')
  FLAGS, unparsed = parser.parse_known_args()
  tf.app.run(main=main, argv=[sys.argv[0]] + unparsed)
Key Additions to the Python:

row.append({'node_id': node_id, 'image': image, 'host': host, 'ts': currenttime, 'human_string': str(human_string), 'score': str(score)})
json_string = json.dumps(row)
print(json_string)

Image Captured by Camera (Also Added "Stored with Apache NiFi" via Python)

References: http://regexr.com/
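To make the record handling concrete, here is a minimal Python sketch of what the NiFi side does with the MiniFi output: keep only the lines that parse as JSON (the split/extract steps), drop empty records (the QueryRecord step), and rebuild the WebHDFS image URL that the Hive SQL produces with SUBSTR(image,18). The sample log line and file name below are hypothetical, not from the original flow.

import json

# Hypothetical raw MiniFi output: TensorFlow log noise mixed with the
# one-line JSON array emitted by classify_image.py.
raw = """W tensorflow/core/platform/cpu_feature_guard.cc:45] some startup noise
[{"node_id": 504, "image": "/opt/demo/images/pi_image_abc_20170806.jpg", "host": "rpi3", "ts": "2017-08-06 00:00:00", "human_string": "coffee mug", "score": "0.83"}]
"""

records = []
for line in raw.splitlines():
    line = line.strip()
    # Keep only lines that look like (and parse as) JSON.
    if not line.startswith('['):
        continue
    try:
        rows = json.loads(line)
    except ValueError:
        continue
    # QueryRecord then filters out empty messages.
    records.extend(r for r in rows if r.get('human_string'))

WEBHDFS = 'http://princeton10.field.hortonworks.com:50070/webhdfs/v1/tfimagefiles/'
for r in records:
    # SUBSTR(image, 18) in the Hive SQL strips the 17-character
    # '/opt/demo/images/' prefix, leaving just the file name.
    print(WEBHDFS + r['image'][17:] + '?op=OPEN')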
10-06-2017
06:54 PM
There has been a major upgrade of the processor thanks to @Simon Elliston Ball. Check out the latest version, 2.1. I am also prototyping a DL4J processor based on this and some code from the SkyMind guys.
08-08-2017
02:00 PM
1 Kudo
https://github.com/hortonworks/registry/blob/master/examples/schema-registry/avro/src/main/java/com/hortonworks/registries/schemaregistry/examples/avro/KafkaAvroSerDesApp.java
07-06-2017
08:41 PM
4 Kudos
This is a cool HAT that attaches without soldering to a Raspberry Pi 3. The main use is with Google's Android Things, an IoT OS based on Android that you can run on a Raspberry Pi and other devices. You can also use this HAT with regular Raspbian and Python. This HAT is cool because it has input and output. As part of our Python script, we ingest data from its sensor, but we also allow for input via its A, B and C buttons, clearly marked on the top. By selecting one of the buttons, you can display the current temperature in Celsius or Fahrenheit, or the current altitude. The altitude is some code I found commented out in the Pimoroni Rainbow HAT interface Python script. It estimates altitude from a starting reference value and pressure readings. It looks kind of cool and is relatively accurate; for a point of reference, Princeton, New Jersey is pretty close to sea level. My script is heavily copied and customized from my previous IoT Python scripts and from the Rainbow HAT examples. This HAT packs a four-digit display, seven multicolor LEDs, a BMP280 temperature and pressure sensor and some more goodies into a tiny device. It's available from Adafruit.

We execute the Python code via a simple shell script wrapper:

python /opt/demo/minifi.py

Source Code: https://github.com/tspannhw/rpi-rainbowhat

Prepare the Pi:

sudo apt-get purge libreoffice wolfram-engine sonic-pi scratch
sudo apt-get autoremove

Example Python Debug Output:

CPU: 45.0 C
Corrected Temp: 23.7 C
Room Temp: 74.7 F
Pressure: 101598.9 Pa
Altitude: 33.2
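The altitude trick is the international barometric formula: height is derived from the ratio of the measured pressure to a reference (QNH) pressure. A minimal standalone sketch; the 102000 Pa reference below is a hypothetical local calibration, not a value from the original script:

def altitude_m(pressure_pa, qnh_pa=101325.0):
    # International barometric formula: altitude rises as the measured
    # pressure drops below the reference (QNH) pressure.
    return 44330.0 * (1.0 - (pressure_pa / qnh_pa) ** (1.0 / 5.255))

# With the Pressure reading from the debug output above and a reference
# of 102000 Pa, this reproduces the Altitude line (about 33.2 m).
print(round(altitude_m(101598.9, qnh_pa=102000.0), 1))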
This is the MiniFi flow that I modeled in Apache NiFi and then exported as XML. I then use the shell script below to convert it into config.yml format and transfer it to my device running the Apache MiniFi Java agent. This is a simple flow: add a schema name, tell NiFi it's a JSON stream, and then query it. If the data has temperatures above a threshold (checked via inline SQL), convert the AVRO file to ORC and store it in HDFS (automagically building an external Hive table on top of it).

Schema to put in the Hortonworks Schema Registry or NiFi AVRO Schema Registry:

{"type":"record","namespace":"hortonworks.hdp.refapp.rainbow","name":"rainbow","fields":[{"name": "tempf", "type": "float"},{ "name": "cputemp", "type": "float"},{"name":"ts","type":"string"},{ "name": "pressure","type": "float"},{ "name": "host","type": "string"},{ "name": "tempf2","type": "float"},{"name": "ipaddress","type": "string"},{"name": "temp","type": "float"},{ "name": "altitude","type": "float"}]}

It's pretty easy to define an AVRO schema (in JSON format) like the above; feel free to use it as a starting point. Strings and floats work very well; keep it simple and leave out extra whitespace.

Build Your Configuration:

minifi-toolkit-1.0.3.0.0.0-453/bin/config.sh transform RainbowForMiniFi.xml config.yml

Flows: nifireceiverrainbow.xml rainbowforminifi.xml

Coming Soon: Raspberry Pi 3 running Android Things and Java, connecting to the Rainbow HAT, TensorFlow and MiniFi.
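For the button-driven display described above, here is a minimal sketch of the idea, assuming the Pimoroni rainbowhat library's touch, display and weather helpers as shown in its examples; the helper names and the QNH value are assumptions, not taken from the original post.

import signal
import rainbowhat as rh

QNH = 1020.0  # assumed reference pressure, same units as rh.weather.pressure()

def altitude(pressure, qnh=QNH):
    # Ratio-based barometric formula, so the units cancel as long as
    # pressure and qnh use the same units.
    return 44330.0 * (1.0 - (pressure / qnh) ** (1.0 / 5.255))

@rh.touch.A.press()
def show_celsius(channel):
    rh.display.print_float(rh.weather.temperature())
    rh.display.show()

@rh.touch.B.press()
def show_fahrenheit(channel):
    rh.display.print_float(rh.weather.temperature() * 1.8 + 32.0)
    rh.display.show()

@rh.touch.C.press()
def show_altitude(channel):
    rh.display.print_float(altitude(rh.weather.pressure()))
    rh.display.show()

signal.pause()  # keep the script alive waiting for button presses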
References:
https://github.com/pimoroni/rainbow-hat
https://github.com/androidthings/weatherstation
https://en.wikipedia.org/wiki/QNH
https://en.wikipedia.org/wiki/Pascal_(unit)
https://www.mide.com/pages/air-pressure-at-altitude-calculator
https://hackernoon.com/trying-out-the-android-things-weatherstation-codelab-d3f260b59c2f
https://codelabs.developers.google.com/codelabs/androidthings-weatherstation/index.html
https://shop.pimoroni.com/products/rainbow-hat-for-android-things
https://learn.pimoroni.com/tutorial/sandyj/getting-started-with-rainbow-hat-in-python

Part of My IoT MiniFi Series (Details Installation and Setup):
https://community.hortonworks.com/content/kbentry/107379/minifi-for-image-capture-and-ingestion-from-raspbe.html
https://community.hortonworks.com/content/kbentry/108966/minifi-for-sensor-data-ingest-from-devices.html
https://community.hortonworks.com/articles/108947/minifi-for-ble-bluetooth-low-energy-beacon-data-in.html
07-06-2017
07:10 PM
2 Kudos
I am always ingesting social, IoT and mobile data streams into my HDFS cluster. Unfortunately my cluster is an ephemeral cloud-based Hortonworks HDP 2.6 Hadoop cluster, so I don't have a permanent store for my data. My processes run for a few weeks and then they are destroyed.
I wanted a quick way to save all my ORC files.
Enter NiFi.
Backup
First we list from some top-level directories in HDFS to capture all the files and sub-directories we want to back up. Each processor maintains a timestamp so it knows which files it has already processed; as new files are added, they will be assimilated into the flow.
For massive data migration, we can run this on many nodes and use the Distributed Cache service to maintain the state.
Restore
The flow to restore is very simple: read from the local file system and write to HDFS. For HDFS, I use /${path} as the directory so each file is written to the correct sub-directory for its file group. Easy: it's like rsync, but it's Tim Sync. Make sure you have your Hadoop configuration file set. If you are using Kerberos, make sure you set your principal and keytab, and be very careful about case sensitivity!
For restore it doesn't get any simpler: I use GetFile to read from the local file system. Those files are deleted as part of that read, and since I have a big USB 3.0 drive I want to keep them on, I copy them to a different directory for later storage. I should probably compress those. Once they get large enough, I may invest in a local RPI storage array running HDP 2.6 with some attached Western Digital PiDrives.
The Data Provenance of one of the flow files shows the path and filename. This makes it very easy to move between, say, S3, on-premises HDFS, local file systems, cloud file systems, jump drives or wherever. Your data is yours; take it with you.
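Outside of NiFi, the same backup walk can be approximated in a few lines of Python against WebHDFS. A minimal sketch, assuming the requests library and a placeholder namenode host; unlike the ListHDFS processor, it keeps no timestamp state, so it recopies everything on each run:

import os
import requests

NAMENODE = 'http://namenode.example.com:50070'  # placeholder host

def backup(hdfs_path, local_root):
    # List the directory via WebHDFS, recurse into sub-directories, and
    # copy each file, preserving its HDFS path under local_root.
    listing = requests.get(NAMENODE + '/webhdfs/v1' + hdfs_path,
                           params={'op': 'LISTSTATUS'})
    listing.raise_for_status()
    for status in listing.json()['FileStatuses']['FileStatus']:
        child = hdfs_path.rstrip('/') + '/' + status['pathSuffix']
        if status['type'] == 'DIRECTORY':
            backup(child, local_root)
        else:
            data = requests.get(NAMENODE + '/webhdfs/v1' + child,
                                params={'op': 'OPEN'})
            data.raise_for_status()
            dest = os.path.join(local_root, child.lstrip('/'))
            if not os.path.isdir(os.path.dirname(dest)):
                os.makedirs(os.path.dirname(dest))
            with open(dest, 'wb') as out:
                out.write(data.content)

backup('/sensor', '/media/usb/backup')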
02-01-2019
04:22 PM
I've hit a problem: the data from MySQL is being replaced in Hive. When the Conflict Resolution Strategy property is set to append, I cannot query the count from Hive with the statement "select count(1) from table_xxx". Help!
06-16-2017
04:49 PM
2 Kudos
See Part 1: https://community.hortonworks.com/content/kbentry/107379/minifi-for-image-capture-and-ingestion-from-raspbe.html
See Part 2: https://community.hortonworks.com/articles/108947/minifi-for-ble-bluetooth-low-energy-beacon-data-in.html
We build a flow in Apache NiFi and then export the template. Using the MiniFi toolkit, we convert this into a config.yml file and send it to our device via scp; you can see this in Part 1. This simple flow calls a shell script that runs a Python script to get our sensor data, then sends the data to our NiFi server via S2S over HTTP.
What I have added in this part is use of the new Record and Schema paradigms, plus the ability to run SQL queries against incoming flow files using QueryRecord. This requires building an AVRO schema for our data, which is a dead-easy JSON definition.
We set our port to connect the MiniFi agent to our server.
Data quickly starts coming in.
Receive the JSON Messages in NiFi via S2S
Top Steps to Ingest Sensor Data in a Few Hours:
1. Connect to the port.
2. Set a schema to pull from the registry; also set a MIME type for JSON.
3. Query the flow file and take only readings over 65 degrees Fahrenheit, via Apache Calcite-processed SQL.
4. This produces an AVRO file, using the AvroRecordSetWriter and the schema from the AvroSchemaRegistry.
5. Store the AVRO file produced to HDFS.
6. Store the raw JSON file sent in to HDFS.
7. Convert the AVRO file to ORC.
8. Store the ORC files to HDFS.
9. Grab the autogenerated hive.ddl to create the external tables.
10. Query the sensor data in Zeppelin.
hive.ddl: Automagically Generated Hive Schema
CREATE EXTERNAL TABLE IF NOT EXISTS sensor (tempf FLOAT, cputemp FLOAT, ts STRING, pressure FLOAT, host STRING, pitch FLOAT, ipaddress STRING, temp FLOAT, diskfree STRING, yaw FLOAT, humidity FLOAT, memory FLOAT, y FLOAT, x FLOAT, z FLOAT, roll FLOAT) STORED AS ORC
I grab the HDFS location and add that to the DDL: LOCATION '/sensor'.
For the JSON and AVRO versions of the data, I make similar tables, swapping in the right SerDe and location (fragments shown):

ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'
LOCATION '/jsonsensor';

ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
STORED AS AVRO
LOCATION '/avrosensor';
Install Libraries (See Part 1 for MiniFi install)
pip install --upgrade sense-hat
pip install --upgrade pillow
pip install rtimulib
pip install psutil
sudo apt-get install oracle-java8-jdk
Shell Script
python /opt/demo/rpi-sensehat-mqtt-nifi/sense2.py
Python Script
from sense_hat import SenseHat
import json
import sys, socket
import os
import psutil
import subprocess
import time
import datetime
from time import sleep
from time import gmtime, strftime
# get data
#current_milli_time = lambda: int(round(time.time() * 1000))
# yyyy-mm-dd hh:mm:ss
currenttime= strftime("%Y-%m-%d %H:%M:%S",gmtime())
host = os.uname()[1]
rasp = ('armv' in os.uname()[4])
cpu = psutil.cpu_percent(interval=1)
if rasp:
    f = open('/sys/class/thermal/thermal_zone0/temp', 'r')
    l = f.readline()
    ctemp = 1.0 * float(l) / 1000
usage = psutil.disk_usage("/")
mem = psutil.virtual_memory()
diskrootfree = "{:.1f} MB".format(float(usage.free) / 1024 / 1024)
mempercent = mem.percent
external_IP_and_port = ('198.41.0.4', 53) # a.root-servers.net
socket_family = socket.AF_INET
#p = subprocess.Popen(['/opt/vc/bin/vcgencmd','measure_temp'], stdout=subprocess.PIPE,
# stderr=subprocess.PIPE)
#out, err = p.communicate()
def IP_address():
    try:
        s = socket.socket(socket_family, socket.SOCK_DGRAM)
        s.connect(external_IP_and_port)
        answer = s.getsockname()
        s.close()
        return answer[0] if answer else None
    except socket.error:
        return None
ipaddress = IP_address()
sense = SenseHat()
sense.clear()
temp = sense.get_temperature()
temp = round(temp, 2)
humidity = sense.get_humidity()
humidity = round(humidity, 1)
pressure = sense.get_pressure()
pressure = round(pressure, 1)
orientation = sense.get_orientation()
pitch = orientation['pitch']
roll = orientation['roll']
yaw = orientation['yaw']
acceleration = sense.get_accelerometer_raw()
x = acceleration['x']
y = acceleration['y']
z = acceleration['z']
#cputemp = out
x=round(x, 0)
y=round(y, 0)
z=round(z, 0)
pitch=round(pitch,0)
roll=round(roll,0)
yaw=round(yaw,0)
row = { 'ts': currenttime, 'host': host, 'memory': mempercent, 'diskfree': diskrootfree, 'cputemp': round(ctemp,2), 'ipaddress': ipaddress, 'temp': temp, 'tempf': round(((temp * 1.8) + 12),2), 'humidity': humidity, 'pressure': pressure, 'pitch': pitch, 'roll': roll, 'yaw': yaw, 'x': x, 'y': y, 'z': z }
json_string = json.dumps(row)
print(json_string)
One Record (JSON)
{"tempf": 75.14, "temp": 35.08, "pitch": 1.0, "diskfree": "1211.8 MB", "yaw": 55.0, "cputemp": 52.08, "ts": "2017-06-16 17:39:08", "humidity": 41.5, "pressure": 0.0, "host": "picroft", "memory": 23.0, "y": 0.0, "x": -1.0, "z": 1.0, "ipaddress": "192.168.1.156", "roll": 1.0}
AVRO Schema (JSON Format)
{"type":"record","namespace":"hortonworks.hdp.refapp.sensehat","name":"sensehat","fields":[{"name": "tempf", "type": "float"},{ "name": "cputemp", "type": "float"},{"name":"ts","type":"string"},{ "name": "pressure","type": "float"},{ "name": "host","type": "string"},{ "name": "pitch","type": "float"},{"name": "ipaddress","type": "string"},{"name": "temp","type": "float"},{ "name": "diskfree","type": "string"},{ "name": "yaw","type": "float" },{"name": "humidity","type": "float"},{"name": "memory","type": "float"},{"name": "y", "type": "float"},{"name": "x", "type": "float" },{"name": "z","type": "float"},{"name": "roll", "type": "float"}]}
config.yml
MiNiFi Config Version: 2
Flow Controller:
  name: sense hat
  comment: sense hat 2017
Core Properties:
  flow controller graceful shutdown period: 10 sec
  flow service write delay interval: 500 ms
  administrative yield duration: 30 sec
  bored yield duration: 10 millis
  max concurrent threads: 1
FlowFile Repository:
  partitions: 256
  checkpoint interval: 2 mins
  always sync: false
  Swap:
    threshold: 20000
    in period: 5 sec
    in threads: 1
    out period: 5 sec
    out threads: 4
Content Repository:
  content claim max appendable size: 10 MB
  content claim max flow files: 100
  always sync: false
Provenance Repository:
  provenance rollover time: 1 min
Component Status Repository:
  buffer size: 1440
  snapshot frequency: 1 min
Security Properties:
  keystore: ''
  keystore type: ''
  keystore password: ''
  key password: ''
  truststore: ''
  truststore type: ''
  truststore password: ''
  ssl protocol: ''
  Sensitive Props:
    key:
    algorithm: PBEWITHMD5AND256BITAES-CBC-OPENSSL
    provider: BC
Processors:
- id: db6fbd3b-ddf4-3041-0000-000000000000
  name: ExecuteProcess
  class: org.apache.nifi.processors.standard.ExecuteProcess
  max concurrent tasks: 1
  scheduling strategy: TIMER_DRIVEN
  scheduling period: 60 sec
  penalization period: 30 sec
  yield period: 1 sec
  run duration nanos: 0
  auto-terminated relationships list: []
  Properties:
    Argument Delimiter: ' '
    Batch Duration:
    Command: /opt/demo/rpi-sensehat-mqtt-nifi/sense2.sh
    Command Arguments:
    Redirect Error Stream: 'true'
Process Groups: []
Input Ports: []
Output Ports: []
Funnels: []
Connections:
- id: 5635290a-4cb6-3da7-0000-000000000000
  name: minifiSenseHat
  source id: db6fbd3b-ddf4-3041-0000-000000000000
  source relationship names:
  - success
  destination id: 166616e3-1962-1660-2b7c-2f824584b23a
  max work queue size: 10000
  max work queue data size: 1 GB
  flowfile expiration: 0 sec
  queue prioritizer class: ''
Remote Process Groups:
- id: fdc45649-84be-374b-0000-000000000000
  name: ''
  url: http://hw13125.local:8080/nifi
  comment: ''
  timeout: 30 sec
  yield period: 10 sec
  transport protocol: HTTP
  Input Ports:
  - id: 166616e3-1962-1660-2b7c-2f824584b23a
    name: MiniFi SenseHat
    comment: ''
    max concurrent tasks: 1
    use compression: false
Build our MiniFi configuration file from the sensorminifi.xml:

minifi-toolkit-1.0.2.1.4.0-5/bin/config.sh transform sensorminifi.xml config.yml

Then just SCP config.yml to your device.
Flows
sensornifi.xml sensorminifi.xml
Source Repository
https://github.com/tspannhw/rpi-sensehat-minifi-python

Example MiniFi Log:

dResourceClaim[id=1497645887239-1, container=default, section=1], offset=2501, length=278],offset=0,name=13917785142443,size=278]] (278 bytes) to http://HW13125.local:8080/nifi-api in 116 milliseconds at a rate of 2.32 KB/sec
2017-06-16 20:54:41,827 INFO [Provenance Maintenance Thread-1] o.a.n.p.MiNiFiPersistentProvenanceRepository Created new Provenance Event Writers for events starting with ID 3162
2017-06-16 20:54:41,844 INFO [Provenance Repository Rollover Thread-1] o.a.n.p.MiNiFiPersistentProvenanceRepository Successfully merged 16 journal files (3 records) into single Provenance Log File provenance_repository/3159.prov in 33 milliseconds
2017-06-16 20:54:41,846 INFO [Provenance Repository Rollover Thread-1] o.a.n.p.MiNiFiPersistentProvenanceRepository Successfully Rolled over Provenance Event file containing 5 records
2017-06-16 20:54:43,288 INFO [Write-Ahead Local State Provider Maintenance] org.wali.MinimalLockingWriteAheadLog org.wali.MinimalLockingWriteAheadLog@87eb01 checkpointed with 0 Records and 0 Swap Files in 100 milliseconds (Stop-the-world time = 13 milliseconds, Clear Edit Logs time = 10 millis), max Transaction ID -1
2017-06-16 20:54:48,429 INFO [pool-23-thread-1] o.a.n.c.r.WriteAheadFlowFileRepository Initiating checkpoint of FlowFile Repository
2017-06-16 20:54:48,890 INFO [pool-23-thread-1] org.wali.MinimalLockingWriteAheadLog org.wali.MinimalLockingWriteAheadLog@17461db checkpointed with 0 Records and 0 Swap Files in 460 milliseconds (Stop-the-world time = 190 milliseconds, Clear Edit Logs time = 77 millis), max Transaction ID 2107
2017-06-16 20:54:48,891 INFO [pool-23-thread-1] o.a.n.c.r.WriteAheadFlowFileRepository Successfully checkpointed FlowFile Repository with 0 records in 461 milliseconds
2017-06-16 20:54:51,482 INFO [Http Site-to-Site PeerSelector] o.apache.nifi.remote.client.PeerSelector org.apache.nifi.remote.client.PeerSelector@f69f9d Successfully refreshed Peer Status; remote instance consists of 1 peers
2017-06-16 20:55:07,621 INFO [Timer-Driven Process Thread-9] o.apache.nifi.remote.client.PeerSelector New Weighted Distribution of Nodes:
PeerStatus[hostname=HW13125.local,port=8080,secure=false,flowFileCount=0] will receive 100.0% of data
2017-06-16 20:55:07,957 INFO [Timer-Driven Process Thread-9] o.a.nifi.remote.StandardRemoteGroupPort RemoteGroupPort[name=MiniFi SenseHat,target=http://hw13125.local:8080/nifi] Successfully sent [StandardFlowFileRecord[uuid=b3bcd211-7425-4750-9e4c-ba2d477b9cc1,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1497645887239-1, container=default, section=1], offset=2779, length=278],offset=0,name=13979556432846,size=278]] (278 bytes) to http://HW13125.local:8080/nifi-api in 114 milliseconds at a rate of 2.38 KB/sec
Check the Status of MiniFi:

root@picroft:/opt/demo/minifi-1.0.2.1.4.0-5# bin/minifi.sh flowStatus processor:db6fbd3b-ddf4-3041-0000-000000000000:health,stats,bulletins
minifi.sh: JAVA_HOME not set; results may vary
Bootstrap Classpath: /opt/demo/minifi-1.0.2.1.4.0-5/conf:/opt/demo/minifi-1.0.2.1.4.0-5/lib/bootstrap/*:/opt/demo/minifi-1.0.2.1.4.0-5/lib/*
Java home:
MiNiFi home: /opt/demo/minifi-1.0.2.1.4.0-5
Bootstrap Config File: /opt/demo/minifi-1.0.2.1.4.0-5/conf/bootstrap.conf
FlowStatusReport{controllerServiceStatusList=null, processorStatusList=[{name='ExecuteProcess', processorHealth={runStatus='Running', hasBulletins=false, validationErrorList=[]}, processorStats={activeThreads=0, flowfilesReceived=0, bytesRead=0, bytesWritten=1390, flowfilesSent=0, invocations=5, processingNanos=9290051632}, bulletinList=[]}], connectionStatusList=null, remoteProcessGroupStatusList=null, instanceStatus=null, systemDiagnosticsStatus=null, reportingTaskStatusList=null, errorsGeneratingReport=[]}
Output Displayed in Apache Zeppelin Workbook
Using the DDL generated by Apache NiFi, we can create external Hive tables for the raw JSON data, the ORC cleaned up version of the data and also an AVRO version of the data.
We can then query our datasets.
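The same queries can also be run from Python outside Zeppelin; a minimal sketch, assuming the pyhive client and a placeholder HiveServer2 host (neither is part of the original article):

from pyhive import hive  # assumed client; %jdbc(hive) in Zeppelin does the same job

conn = hive.connect(host='hiveserver2.example.com', port=10000)  # placeholder host
cursor = conn.cursor()
# Same shape of query the article runs over the ORC-backed sensor table.
cursor.execute("SELECT ts, tempf, humidity, pressure FROM sensor WHERE tempf > 65")
for ts, tempf, humidity, pressure in cursor.fetchall():
    print(ts, tempf, humidity, pressure)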
References:
https://cwiki.apache.org/confluence/display/Hive/AvroSerDe
https://json-schema-validator.herokuapp.com/avro.jsp
https://docs.hortonworks.com/HDPDocuments/HDF3/HDF-3.0.0/bk_minifi-administration/content/ch01s01.html
https://community.hortonworks.com/articles/55839/reading-sensor-data-from-remote-sensors-on-raspber.html
https://www.thepolyglotdeveloper.com/2016/08/connect-multiple-wireless-networks-raspberry-pi/
https://community.hortonworks.com/articles/83100/deep-learning-iot-workflows-with-raspberry-pi-mqtt.html
06-16-2017
02:27 PM
5 Kudos
See Part 1: https://community.hortonworks.com/content/kbentry/107379/minifi-for-image-capture-and-ingestion-from-raspbe.html

In this article, we are ingesting beacon data using an ASUS Tinkerboard, which is very similar to a Raspberry Pi in form factor and features. It is newer and has fewer peripherals, but has 2 GB of RAM, an ARM Mali GPU T760 MP4, and a quad-core ARM Cortex-A17 1.8 GHz processor with dual-channel DDR3 memory, plus BLE built in (Bluetooth 4.0 with EDR).

For beacons, I am using the basic Estimote three-beacon collection. They have a really nice solid beacon in a silicone case that looks nice and works well, plus a mobile app to manage, monitor and simulate beacons, as well as a cloud app.

From our Apache NiFi server we have an Input Port that is receiving the messages from the Java MiniFi agent running on the ASUS Tinkerboard. As with most of the data, I want to ingest it every minute.

The ASUS Tinkerboard device has the red light lit up. The Estimote beacons are the colored devices to the right.

For Tinkerboards, the default username is linaro.

Requires Bluetooth Reading Libraries in Linux:

sudo apt-get -y install bluetooth bluez libbluetooth-dev libudev-dev bluez-hcidump python-bluez
sudo apt-get -y update
sudo apt-get -y install python-dev python3-dev
pip install beacontools[scan]
The built-in Bluetooth is hci0. You will need to have Python 2.7 and/or Python 3.5 installed.

Install Oracle JDK 8:

sudo apt-get -y install oracle-java8-jdk

Shell Script for ExecuteProcess (/opt/demo/py-decode-beacon/runble.sh):

python bluez_scan.py

Python Script

See the Python beacon advertisement decoder, Copyright (c) 2015 Patrick Van Oosterwijck: https://github.com/adamf/BLE/blob/master/ble-scanner.py and https://github.com/xorbit/py-decode-beacon

Starting MiniFi on Tinkerboard:

bin/minifi.sh start

Diagnostics on MiniFi:

./bin/minifi.sh flowStatus systemdiagnostics:heap,processorstats,contentrepositoryusage,flowfilerepositoryusage,garbagecollection

Logs From MiniFi:

2017-06-12 17:13:49,873 INFO [Timer-Driven Process Thread-9] o.a.nifi.remote.StandardRemoteGroupPort RemoteGroupPort[name=MiniFi Tinker,target=http://hw13125.local:8080/nifi/] Successfully sent [StandardFlowFileRecord[uuid=98de3195-8be3-4433-840b-c84b9a84fb6f,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1497282405127-2, container=default, section=2], offset=729236, length=16331],offset=0,name=98638611491740,size=16331]] (15.95 KB) to http://HW13125.local:8080/nifi-api in 138 milliseconds at a rate of 114.88 KB/sec
2017-06-12 17:13:57,918 INFO [Provenance Maintenance Thread-3] o.a.n.p.MiNiFiPersistentProvenanceRepository Created new Provenance Event Writers for events starting with ID 2365
2017-06-12 17:13:57,962 INFO [Provenance Repository Rollover Thread-1] o.a.n.p.MiNiFiPersistentProvenanceRepository Successfully merged 16 journal files (3 records) into single Provenance Log File provenance_repository/2362.prov in 243 milliseconds
2017-06-12 17:13:57,971 INFO [Provenance Repository Rollover Thread-1] o.a.n.p.MiNiFiPersistentProvenanceRepository Successfully Rolled over Provenance Event file containing 5 records
2017-06-12 17:14:04,671 INFO [Http Site-to-Site PeerSelector] o.apache.nifi.remote.client.PeerSelector org.apache.nifi.remote.client.PeerSelector@944ade Successfully refreshed Peer Status; remote instance consists of 1 peers
2017-06-12 17:14:21,554 INFO [pool-23-thread-1] o.a.n.c.r.WriteAheadFlowFileRepository Initiating checkpoint of FlowFile Repository
2017-06-12 17:14:26,057 INFO [pool-23-thread-1] org.wali.MinimalLockingWriteAheadLog org.wali.MinimalLockingWriteAheadLog@541c15 checkpointed with 0 Records and 0 Swap Files in 4502 milliseconds (Stop-the-world time = 3887 milliseconds, Clear Edit Logs time = 549 millis), max Transaction ID 985
2017-06-12 17:14:26,059 INFO [pool-23-thread-1] o.a.n.c.r.WriteAheadFlowFileRepository Successfully checkpointed FlowFile Repository with 0 records in 4503 milliseconds
2017-06-12 17:14:42,783 INFO [Write-Ahead Local State Provider Maintenance] org.wali.MinimalLockingWriteAheadLog org.wali.MinimalLockingWriteAheadLog@3e088f checkpointed with 0 Records and 0 Swap Files in 97 milliseconds (Stop-the-world time = 49 milliseconds, Clear Edit Logs time = 33 millis), max Transaction ID -1
2017-06-12 17:15:05,438 INFO [Http Site-to-Site PeerSelector] o.apache.nifi.remote.client.PeerSelector org.apache.nifi.remote.client.PeerSelector@944ade Successfully refreshed Peer Status; remote instance consists of 1 peers
2017-06-12 17:15:49,116 INFO [Timer-Driven Process Thread-5] o.apache.nifi.remote.client.PeerSelector New Weighted Distribution of Nodes:
PeerStatus[hostname=HW13125.local,port=8080,secure=false,flowFileCount=0] will receive 100.0% of data
It's a very simple model: just execute the Python and send the resultant JSON to NiFi for processing and storage. The same code will also work on a Raspberry Pi and most similar Linux devices. The ASUS Tinkerboard is running TinkerOS_Debian V1.8, not Raspbian.
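Since the post pip-installs beacontools[scan] above, here is a minimal scan sketch based on that library's documented callback interface; the UUID is Estimote's commonly cited default proximity UUID and serves as a placeholder, and emitting JSON mirrors what the MiniFi flow ships to NiFi:

import json
import time
from beacontools import BeaconScanner, IBeaconFilter

def callback(bt_addr, rssi, packet, additional_info):
    # Print one JSON record per advertisement, like the bluez_scan.py
    # output that ExecuteProcess picks up.
    print(json.dumps({'bt_addr': bt_addr, 'rssi': rssi,
                      'info': str(additional_info)}))

# Placeholder UUID: substitute the proximity UUID from your Estimote cloud app.
scanner = BeaconScanner(callback,
                        device_filter=IBeaconFilter(uuid='b9407f30-f5f8-466e-aff9-25556b57fe6d'))
scanner.start()
time.sleep(60)
scanner.stop()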
References:
https://community.hortonworks.com/articles/103863/using-an-asus-tinkerboard-with-tensorflow-and-pyth.html
https://dzone.com/articles/using-tinkerboard-with-tensorflow-and-python
https://docs.hortonworks.com/HDPDocuments/HDF3/HDF-3.0.0/bk_minifi-administration/content/ch01s01.html
https://docs.hortonworks.com/HDPDocuments/HDF3/HDF-3.0.0/bk_minifi-quick-start/content/ch_minifi-quick-start.html
https://community.hortonworks.com/articles/56341/getting-started-with-minifi.html
https://github.com/hipol/EstimoteServer
https://github.com/taka-wang/py-beacon
https://learn.adafruit.com/bluefruit-le-python-library
https://github.com/citruz/beacontools
https://pypi.python.org/pypi/beacontools/1.0.1
https://github.com/adamf/BLE/blob/master/ble-scanner.py
https://github.com/xorbit/py-decode-beacon
https://cloud.estimote.com/docs/
https://github.com/estimote/estimote-specs
https://evothings.com/doc/examples/ibeacon-scan.html
https://github.com/switchdoclabs/iBeacon-Scanner-
http://www.switchdoc.com/2014/08/ibeacon-raspberry-pi-scanner-python/
https://github.com/mlwelles/BeaconScanner
https://github.com/dburr/linux-ibeacon
https://github.com/beaconinside/awesome-beacon
https://github.com/adafruit/Adafruit_Python_BluefruitLE
https://www.asus.com/us/Single-Board-Computer/Tinker-Board/
https://www.asus.com/us/Single-Board-Computer/Tinker-Board/HelpDesk_Download/
06-14-2017
07:28 PM
Apache NiFi 1.2, from the Apache download. I'll try it in Apache NiFi 1.3.