01-02-2018 04:58 PM
This is pretty close to a Raspberry Pi Zero. This inexpensive box is another useful IoT device for ingesting data on the cheap.
Machine Browsing
root@NanoPi-Duo:/home/pi# cpu_freq
INFO: HARDWARE=sun8i
CPU0 online=1 temp=48092 governor=ondemand cur_freq=312000
CPU1 online=1 temp=48092 governor=ondemand cur_freq=312000
CPU2 online=1 temp=48092 governor=ondemand cur_freq=312000
CPU3 online=1 temp=48092 governor=ondemand cur_freq=312000
Source Code for Python and Shell Script
https://github.com/tspannhw/nifi-nanopi-duo
Hardware
The FA-CAM202 is a 200M USB camera.
512MB RAM
Ubuntu 16.04.3 LTS, kernel 4.11.2
Similar SBC to Raspberry Pi (ARM)
Software Setup
sudo apt-get install fswebcam -y
sudo apt-get install libv4l-dev -y
sudo apt-get install python-opencv -y
sudo npi-config
sudo apt-get update
sudo apt-get install libcv-dev libopencv-dev -y
pip install psutil
pip2 install psutil
curl http://192.168.1.193:8080/nifi-api/site-to-site/ -v
inferred.avro.schema
{ "type" : "record", "name" : "NANO", "fields" : [
  { "name" : "diskfree", "type" : "string", "doc" : "Type inferred from '\"23329.1 MB\"'" },
  { "name" : "cputemp", "type" : "double", "doc" : "Type inferred from '55.0'" },
  { "name" : "host", "type" : "string", "doc" : "Type inferred from '\"NanoPi-Duo\"'" },
  { "name" : "endtime", "type" : "string", "doc" : "Type inferred from '\"2018-01-04 20:23:56\"'" },
  { "name" : "ipaddress", "type" : "string", "doc" : "Type inferred from '\"192.168.1.191\"'" },
  { "name" : "h", "type" : "int", "doc" : "Type inferred from '342'" },
  { "name" : "ts", "type" : "string", "doc" : "Type inferred from '\"2018-01-04 20:23:43\"'" },
  { "name" : "filename", "type" : "string", "doc" : "Type inferred from '\"/opt/demo/images/2018-01-04_2023.jpg.faces.jpg\"'" },
  { "name" : "w", "type" : "int", "doc" : "Type inferred from '342'" },
  { "name" : "memory", "type" : "double", "doc" : "Type inferred from '60.1'" },
  { "name" : "y", "type" : "int", "doc" : "Type inferred from '264'" },
  { "name" : "x", "type" : "int", "doc" : "Type inferred from '877'" }
] }
hive.ddl
CREATE EXTERNAL TABLE IF NOT EXISTS nanopi (diskfree STRING, cputemp DOUBLE, host STRING, endtime STRING, ipaddress STRING, h INT, ts STRING, filename STRING, w INT, memory DOUBLE, y INT, x INT) STORED AS ORC
LOCATION '/nano'
Apache NiFi
The flow in Apache NiFi is pretty simple. We receive the flowfiles from the remote Apache MiniFi box.
1. RouteOnAttribute: Send images to the file system, continue processing the JSON
2. AttributeCleanerProcessor: Clean up the attributes (not really needed for this dataset)
3. UpdateAttribute: Set the schema name to reference the registry
4. SplitJSON: Split the JSON array into one record per flowfile
5. ConvertRecord: Convert JSON to Avro with an embedded schema
6. ConvertAvroToORC: Build an Apache ORC file (we could add a MergeContent step before this)
7. PutHDFS: Store in the Hadoop file system forever
Apache MiniFi
We have a simple flow in Apache MiniFi.
1. ExecuteProcess: Run a shell script that grabs a timestamp-named image from the USB webcam, then calls a Python script that does OpenCV face detection and adds some local variables to a JSON array along with the facial bounding boxes.
2. GetFile: Retrieve all the images from the box and send them to Apache NiFi.
Example Data
[{"diskfree": "23445.5 MB", "cputemp": 56.7, "host": "NanoPi-Duo", "endtime": "2018-01-04 17:40:30", "ipaddress": "192.168.1.191", "h": 55, "ts": "2018-01-04 17:40:20", "filename": "/opt/demo/images/2018-01-04_1740.jpg.faces.jpg", "w": 55, "memory": 22.6, "y": 471, "x": 270}, {"diskfree": "23445.5 MB", "cputemp": 56.7, "host": "NanoPi-Duo", "endtime": "2018-01-04 17:40:30", "ipaddress": "192.168.1.191", "h": 67, "ts": "2018-01-04 17:40:20", "filename": "/opt/demo/images/2018-01-04_1740.jpg.faces.jpg", "w": 67, "memory": 22.6, "y": 625, "x": 464}]
Resources
http://wiki.friendlyarm.com/wiki/index.php/NanoPi_Duo
https://diyprojects.io/orange-pi-armbian-control-camera-python-opencv/#.Wku2kFQ-f1J
https://www.armbian.com/nanopi-duo/
https://github.com/opencv/opencv.git
https://github.com/informramiz/Face-Detection-OpenCV
https://realpython.com/blog/python/face-detection-in-python-using-a-webcam/
https://realpython.com/blog/python/face-recognition-with-python/
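The shape of the JSON records the MiniFi flow ships can be sketched as follows. This is a hypothetical helper, not the actual script from the repo above; `rects` stands in for the (x, y, w, h) rectangles that OpenCV's detectMultiScale returns:

```python
import json
from time import gmtime, strftime

def faces_to_rows(rects, filename, host, ipaddress, diskfree, cputemp, memory):
    # One record per detected face, matching the Example Data fields above.
    ts = strftime("%Y-%m-%d %H:%M:%S", gmtime())
    return json.dumps([
        {"diskfree": diskfree, "cputemp": cputemp, "host": host,
         "endtime": ts, "ipaddress": ipaddress, "h": int(h), "ts": ts,
         "filename": filename, "w": int(w), "memory": memory,
         "y": int(y), "x": int(x)}
        for (x, y, w, h) in rects])
```

InferAvroSchema then derives the schema shown above directly from one of these records.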
12-30-2017 12:05 AM
Oracle -> GoldenGate -> Apache Kafka -> Apache NiFi / Hortonworks Schema Registry -> JDBC Database
Sometimes you need to process any number of table changes sent from tools like Oracle GoldenGate via Apache Kafka. As long as the messages have proper header data and records in JSON, it's really easy in Apache NiFi.
Requirements:
Process each partition separately.
Process records in order, as each message is an insert, update, or delete to an existing table in our receiving JDBC store.
Re-process if data is lost.
The main routing processor must run only on the Primary Node.
Enforcing Order
We use kafka.offset to order the records, which makes sense for Apache Kafka topics.
After the insert, update, and delete queries are built, let's confirm and enforce that strict ordering.
To further ensure in-order processing, we set every connection in the flow to use the FirstInFirstOutPrioritizer.
For routing, we send each partition to a different process group (one local, the other remote).
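What the per-partition routing plus FIFO prioritizer achieves can be illustrated in a few lines. This is a hypothetical sketch (flowfiles modeled as attribute dicts), not NiFi code:

```python
def order_for_replay(flowfiles):
    # Group flowfiles by kafka.partition, then sort each group by kafka.offset,
    # so every partition's changes are applied in the order Kafka delivered them.
    partitions = {}
    for ff in flowfiles:
        partitions.setdefault(int(ff["kafka.partition"]), []).append(ff)
    for records in partitions.values():
        records.sort(key=lambda ff: int(ff["kafka.offset"]))
    return partitions
```

Ordering only holds within a partition, which is exactly why each partition gets its own process group.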
Let's Store Some Data in HDFS for each Table
Connect To Kafka and Grab From our Topic
Let's Connect to our JDBC Store
Let's do an Update (Table Name is Dynamic)
The Jolt Processor has an awesome tester for trying out Jolt
Make sure we connect our remote partitions
Routing From Routing Server (Primary Node)
For Processing Partition 0 (Run on the Routing Server)
We infer the schema with InferAvroSchema, so we don't need to know the embedded table layouts before a record arrives. In production it makes sense to know all of these in advance, run integration tests, and version the schemas. This is where the Hortonworks Schema Registry is awesome. We name the Avro record after the table dynamically, and we can store a permanent schema in the Hortonworks Schema Registry.
Process The Next Partition 1 .. (We can have one server or cluster per partition)
Process the Partition 1 Kafka Records from the Topic
This Flow Will Convert Our Embedded JSON Table Record into New SQL
Input: {"ID":2001,"GD":"F","DPTID":2,"FIRSTNAME":"Tim","LAST":"Spann"}
Output: INSERT INTO THETABLE (ID, GD, DPTID, FIRSTNAME, LAST) VALUES (?, ?, ?, ?, ?)
sql.args.5.value Spann
sql.table THETABLE
All of the fields become bind parameters, giving us a SQL-injection-safe, parameterized insert, update, or delete driven by the control fields sent.
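The conversion can be sketched like this. It is a hypothetical stand-in for what ConvertJSONtoSQL produces (the SQL statement plus sql.args.N.value attributes), using the input/output example above:

```python
import json

def json_to_insert(table, record_json):
    # Build a parameterized INSERT so the values travel as bind parameters,
    # which is what keeps the statement SQL-injection safe.
    record = json.loads(record_json)
    cols = list(record.keys())
    sql = "INSERT INTO {} ({}) VALUES ({})".format(
        table, ", ".join(cols), ", ".join("?" * len(cols)))
    # NiFi exposes the values as flowfile attributes named sql.args.N.value
    args = {"sql.args.%d.value" % (i + 1): record[c] for i, c in enumerate(cols)}
    return sql, args
```

PutSQL then binds the attributes to the placeholders when executing against the destination database.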
GoldenGate Messages
{"table": "SCHEMA1.TABLE7","op_type": "I","op_ts": "2017-11-01 04:31:56.000000","current_ts": "2017-11-01T04:32:04.754000","pos": "00000000310000020884","after": {"ID":1,"CODE": "B","NAME":"STUFF","DESCR" :"Department","ACTIVE":1}}
Using a simple EvaluateJsonPath we pull out these control fields, example: $.before.
The table name for ConvertJSONtoSQL is ${table:substringAfter('.')}, which strips the leading schema / tablespace name. From the drop-down we pick UPDATE, INSERT, or DELETE for each of the three paths based on the op_type.
We follow this with a PutSQL which will execute on our destination JDBC database sink.
After that, I collect all the attributes, convert them to a JSON flowfile, and save that to HDFS for logging and reporting. This step could be skipped, use another format, or send the data elsewhere.
Control Fields
pos: position
table: table to update in the data warehouse
current_ts: time stamp
op_ts: time stamp
op_type: operation type (I = insert, U = update, D = delete)
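Pulling the control fields out of a GoldenGate message and deciding which operation to build can be sketched like this. It is a hypothetical illustration of the EvaluateJsonPath + RouteOnAttribute steps, using the sample message above:

```python
import json

OPS = {"I": "INSERT", "U": "UPDATE", "D": "DELETE"}

def route_golden_gate(message):
    # Extract the control fields and pick the SQL operation from op_type.
    msg = json.loads(message)
    op = OPS[msg["op_type"]]
    # Drop the leading schema name, as ${table:substringAfter('.')} does.
    table = msg["table"].split(".", 1)[-1]
    # Deletes carry the row image in "before"; inserts and updates in "after".
    row = msg.get("before") if msg["op_type"] == "D" else msg.get("after")
    return op, table, row
```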
Important Apache NiFi System Fields
kafka.offset
kafka.partition
kafka.topic
We can Route and process these for special handling.
To Create HDFS Directories for Changes
su hdfs
hdfs dfs -mkdir -p /new/T1
hdfs dfs -mkdir -p /new/T2
hdfs dfs -mkdir -p /poc/T3
hdfs dfs -chmod -R 777 /new
hdfs dfs -ls -R /new
To Create a Test Apache Kafka Topic
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic goldengate
Creating a MySQL Database as the Recipient JDBC Server
wget https://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-5.1.45.tar.gz
mysql
create database mydw;
CREATE USER 'nifi'@'%' IDENTIFIED BY 'MyPassWordIsSoAwesome!!!!';
GRANT ALL PRIVILEGES ON *.* TO 'nifi'@'%' WITH GRANT OPTION;
commit;
SHOW GRANTS FOR 'nifi'@'%';
#Create some tables in the database for your records.
create table ALOG (
AID VARCHAR(1),
TIMESEC INT,
SOMEVAL VARCHAR(255),
PRIMARY KEY (AID, TIMESEC)
);
Jolt Filter
Attribute: afterJolt
${op_type:equalsIgnoreCase("D"):ifElse("none", "after")}
Attribute: beforeJolt
${op_type:equalsIgnoreCase("D"):ifElse("before", "none")}
Jolt Script to Transform JSON
[ {
"operation": "shift",
"spec": {
"${beforeJolt}": {
"*": "&"
},
"${afterJolt}": {
"*": "&"
}
}
}, {
"operation": "shift",
"spec": {
"*": "&"
}
} ]
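To see what the attribute substitution does: for an insert or update (op_type I or U), afterJolt resolves to "after" and beforeJolt to "none", so the first shift lifts the fields of the "after" object to the top level (the "none" key matches nothing), giving the flat record that ConvertJSONtoSQL expects. The resolved spec for that case:

[ {
  "operation": "shift",
  "spec": {
    "none": { "*": "&" },
    "after": { "*": "&" }
  }
}, {
  "operation": "shift",
  "spec": { "*": "&" }
} ]

For a delete, the attributes flip and the "before" image is lifted instead.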
Primary Node Flow Template
primarynode.xml
Partition X Node Flow Template
remotenode.xml
References:
https://www.xenonstack.com/blog/data-ingestion-using-apache-nifi-for-building-data-lakes-twitter-data
https://community.hortonworks.com/questions/107536/nifi-uneven-distribution-consumekafka.html
https://community.hortonworks.com/articles/57262/integrating-apache-nifi-and-apache-kafka.html
https://community.hortonworks.com/articles/80284/hdf-2x-adding-a-new-nifi-node-to-an-existing-secur.html
https://dev.mysql.com/downloads/connector/j/
12-29-2017 07:23 PM
Processing voice data on a Raspberry Pi 3 with the Google AIY Voice Kit is an easy way to control data flows in an organization. The AIY Voice Kit is a good way to prototype voice ingest. It is not perfect, but for low-end, inexpensive hardware it is a good solution for those who speak clearly and are willing to repeat a phrase a few times for activation. I was able to easily add a word for it to react to. I also have it waiting for someone to press the button to activate.
Setup and Run Steps
1. Install the kit
2. Set up a GCP account
3. As the pi user:
~/bin/AIY-voice-kit-shell.sh
src/assistant_library_demo.py
4. Download and unzip Apache MiniFi
5. Custom action: controlx
6. Run it:
pi@raspberrypi:~/AIY-voice-kit-python/src $ ./controlx.py
7. Start MiniFi
8. Apache MiniFi will tail the command file and look for audio files
Code Added to Google Example
elif event.type == EventType.ON_RECOGNIZING_SPEECH_FINISHED and event.args:
    print('You said:', event.args['text'])
    text = event.args['text'].lower()
    f = open("/opt/demo/commands.txt", "a+")
    f.write(text)
    f.close()
    if 'process' in text:
        self._assistant.stop_conversation()
        self._say_ip()
    elif 'computer' in text:
        self._assistant.stop_conversation()
        self._say_ip()
    elif text == 'ip address':
        self._assistant.stop_conversation()
        self._say_ip()

def _say_ip(self):
    ip_address = subprocess.check_output("hostname -I | cut -d' ' -f1", shell=True)
    f = open("/opt/demo/commands.txt", "a+")
    f.write('My IP address is %s' % ip_address.decode('utf-8'))
    f.close()
    # aiy.audio.say('My IP address is %s' % ip_address.decode('utf-8'))
    # aiy.audio.say('Turning on LED')
    # voice_name = '/opt/demo/voices/voice_{0}.wav'.format(strftime("%Y%m%d%H%M%S", gmtime()))
    # aiy.audio.record_to_wave(voice_name, 5)
    aiy.voicehat.get_led().set_state(aiy.voicehat.LED.BLINK)
    aiy.audio.say('MiniFy')
    print('My IP address is %s' % ip_address.decode('utf-8'))
Apache NiFi Ingest Flow
Flow File for a Sound File
Flow File For Commands
Apache MiniFi Ingest
Apache NiFi Overview
Text Command Processing
AIYVOICE Schema in Hortonworks Schema Registry
Source Code: https://github.com/tspannhw/nifi-googleaiy
Results: An example of text deciphered from my speaking into the AIY Voice Kit.
what was the process that we could try that a different way right now when was text put it in the file My IP address is 192.168.1.199
The Recorded Audio Files Sent
../images/voice_20171229195641.wav
../images/voice_20171229202456.wav
../images/voice_20171229203028.wav
Inferred Schema
inferred.avro.schema
{ "type" : "record", "name" : "aiyvoice", "fields" : [
  { "name" : "schema", "type" : "string", "doc" : "Type inferred from '\"aiyvoice\"'" },
  { "name" : "path", "type" : "string", "doc" : "Type inferred from '\"./\"'" },
  { "name" : "schemaname", "type" : "string", "doc" : "Type inferred from '\"aiyvoice\"'" },
  { "name" : "commands0", "type" : "string", "doc" : "Type inferred from '\"process process process process process process process process processMy IP address is 192.168.1.199\n\"'" },
  { "name" : "filename", "type" : "string", "doc" : "Type inferred from '\"commands.3088-3190.txt\"'" },
  { "name" : "ssaddress", "type" : "string", "doc" : "Type inferred from '\"192.168.1.199:60896\"'" },
  { "name" : "receivedFrom", "type" : "string", "doc" : "Type inferred from '\"AIY_GOOGLE_VOICE_PI\"'" },
  { "name" : "sshost", "type" : "string", "doc" : "Type inferred from '\"192.168.1.199\"'" },
  { "name" : "mimetype", "type" : "string", "doc" : "Type inferred from '\"text/plain\"'" },
  { "name" : "tailfileoriginalpath", "type" : "string", "doc" : "Type inferred from '\"/opt/demo/commands.txt\"'" },
  { "name" : "uuid", "type" : "string", "doc" : "Type inferred from '\"76232a9a-baf1-4f6b-9707-6d532653bbe8\"'" },
  { "name" : "RouteOnAttributeRoute", "type" : "string", "doc" : "Type inferred from '\"commands\"'" }
] }
Apache NiFi Created Apache Hive DDL
hive.ddl
CREATE EXTERNAL TABLE IF NOT EXISTS aiyvoice (path STRING, filename STRING, commands0 STRING, ssaddress STRING, receivedFrom STRING, sshost STRING, mimetype STRING, tailfileoriginalpath STRING, uuid STRING, RouteOnAttributeRoute STRING) STORED AS ORC
Resources:
https://github.com/google/aiyprojects-raspbian/blob/master/src/action.py
https://github.com/google/aiyprojects-raspbian/blob/aiyprojects/HACKING.md
https://aiyprojects.withgoogle.com/voice/#project-overview
https://projects.raspberrypi.org/en/projects/google-voice-aiy
https://github.com/googlesamples/assistant-sdk-python
https://developers.google.com/assistant/sdk/guides/library/python/embed/run-sample
http://ktinkerer.co.uk/play-bbc-radio-raspberry-pi-aiy-project/
https://github.com/ktinkerer/aiyprojects-raspbian/tree/radio_player/src
https://github.com/google/aiyprojects-raspbian
https://github.com/n8kowald/raspi-voice-actions/blob/master/action.py
https://github.com/n8kowald/raspi-voice-actions
https://medium.com/@aallan/a-magic-mirror-powered-by-aiy-projects-and-the-raspberry-pi-e6a0fea3b4d6
12-28-2017 06:51 PM
Adding a Movidius NCS to a Raspberry Pi is a great way to augment your deep learning edge programming.
Hardware Setup
Raspberry Pi 3
Movidius NCS
PS3 Eye Camera
128GB USB Stick
Software Setup
Standard NCS Setup
Python 3
sudo apt-get update
sudo apt-get install python-pip python-dev
sudo apt-get install python3-pip python3-dev
sudo apt-get install fswebcam
sudo apt-get install curl wget unzip
sudo apt-get install oracle-java8-jdk
sudo apt-get install sense-hat
sudo apt-get install octave -y
sudo apt install python3-gst-1.0
pip3 install psutil
Apache MiniFi:
https://www.apache.org/dyn/closer.lua?path=/nifi/minifi/0.3.0/minifi-0.3.0-bin.zip
git clone https://github.com/movidius/ncsdk.git
git clone https://github.com/movidius/ncappzoo.git
Run Example
~/workspace/ncappzoo/apps/image-classifier
Shell Script
/root/workspace/ncappzoo/apps/image-classifier/run.sh
#!/bin/bash
DATE=$(date +"%Y-%m-%d_%H%M")
fswebcam -r 1280x720 --no-banner /opt/demo/images/$DATE.jpg
python3 -W ignore image-classifier2.py /opt/demo/images/$DATE.jpg
Forked Python Code
I added some extra variables I like to send from IoT devices (time, ip address, CPU temp) for flows and formatted as JSON.
#!/usr/bin/python3
# ****************************************************************************
# Copyright(c) 2017 Intel Corporation.
# License: MIT See LICENSE file in root directory.
# ****************************************************************************
# How to classify images using DNNs on Intel Neural Compute Stick (NCS)
import mvnc.mvncapi as mvnc
import skimage
from skimage import io, transform
import numpy
import os
import sys
import json
import time
import sys, socket
import subprocess
import datetime
from time import sleep
from time import gmtime, strftime
starttime= strftime("%Y-%m-%d %H:%M:%S",gmtime())
# User modifiable input parameters
NCAPPZOO_PATH = os.path.expanduser( '~/workspace/ncappzoo' )
GRAPH_PATH = NCAPPZOO_PATH + '/caffe/GoogLeNet/graph'
IMAGE_PATH = sys.argv[1]
LABELS_FILE_PATH = NCAPPZOO_PATH + '/data/ilsvrc12/synset_words.txt'
IMAGE_MEAN = [ 104.00698793, 116.66876762, 122.67891434]
IMAGE_STDDEV = 1
IMAGE_DIM = ( 224, 224 )
# ---- Step 1: Open the enumerated device and get a handle to it -------------
# Look for enumerated NCS device(s); quit program if none found.
devices = mvnc.EnumerateDevices()
if len( devices ) == 0:
print( 'No devices found' )
quit()
# Get a handle to the first enumerated device and open it
device = mvnc.Device( devices[0] )
device.OpenDevice()
# ---- Step 2: Load a graph file onto the NCS device -------------------------
# Read the graph file into a buffer
with open( GRAPH_PATH, mode='rb' ) as f:
blob = f.read()
# Load the graph buffer into the NCS
graph = device.AllocateGraph( blob )
# ---- Step 3: Offload image onto the NCS to run inference -------------------
# Read & resize image [Image size is defined during training]
img = print_img = skimage.io.imread( IMAGE_PATH )
img = skimage.transform.resize( img, IMAGE_DIM, preserve_range=True )
# Convert RGB to BGR [skimage reads image in RGB, but Caffe uses BGR]
img = img[:, :, ::-1]
# Mean subtraction & scaling [A common technique used to center the data]
img = img.astype( numpy.float32 )
img = ( img - IMAGE_MEAN ) * IMAGE_STDDEV
# Load the image as a half-precision floating point array
graph.LoadTensor( img.astype( numpy.float16 ), 'user object' )
# ---- Step 4: Read & print inference results from the NCS -------------------
# Get the results from NCS
output, userobj = graph.GetResult()
labels = numpy.loadtxt( LABELS_FILE_PATH, str, delimiter = '\t' )
order = output.argsort()[::-1][:6]
#### Initialization
external_IP_and_port = ('198.41.0.4', 53) # a.root-servers.net
socket_family = socket.AF_INET
host = os.uname()[1]
def getCPUtemperature():
res = os.popen('vcgencmd measure_temp').readline()
return(res.replace("temp=","").replace("'C\n",""))
def IP_address():
try:
s = socket.socket(socket_family, socket.SOCK_DGRAM)
s.connect(external_IP_and_port)
answer = s.getsockname()
s.close()
return answer[0] if answer else None
except socket.error:
return None
cpuTemp=int(float(getCPUtemperature()))
ipaddress = IP_address()
# yyyy-mm-dd hh:mm:ss
currenttime= strftime("%Y-%m-%d %H:%M:%S",gmtime())
row = { 'label1': labels[order[0]], 'label2': labels[order[1]], 'label3': labels[order[2]], 'label4': labels[order[3]], 'label5': labels[order[4]], 'currenttime': currenttime, 'host': host, 'cputemp': round(cpuTemp,2), 'ipaddress': ipaddress, 'starttime': starttime }
json_string = json.dumps(row)
print(json_string)
# ---- Step 5: Unload the graph and close the device -------------------------
graph.DeallocateGraph()
device.CloseDevice()
# ==== End of file ===========================================================
This same device also has a Sense-Hat, so we grab those as well.
Then I combined the two sets of code to produce a row of Sense-Hat sensor data and NCS AI results.
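The merge of the two data sets can be sketched as follows. The actual combined script is in the repo below; here `enrich_with_sensehat` is a hypothetical helper, with `sense` standing in for a sense_hat.SenseHat() instance on the Pi:

```python
def enrich_with_sensehat(row, sense):
    # Add Sense-HAT environment and orientation readings to the NCS
    # classifier row, producing one combined JSON-ready record.
    o = sense.get_orientation()  # dict with pitch, roll, yaw in degrees
    row.update({
        "humidity": round(sense.get_humidity(), 1),
        "temp": round(sense.get_temperature(), 1),                 # Celsius
        "tempf": round(sense.get_temperature() * 1.8 + 32, 2),     # Fahrenheit
        "pressure": round(sense.get_pressure(), 1),                # millibars
        "pitch": round(o["pitch"], 1),
        "roll": round(o["roll"], 1),
        "yaw": round(o["yaw"], 1),
    })
    return row
```

Note the on-board temperature sensor reads high because of CPU heat, so the real script may correct it before converting.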
Source Code
https://github.com/tspannhw/rpi-minifi-movidius-sensehat/tree/master
Example Output JSON
{"label1": "b'n03794056 mousetrap'", "humidity": 17.2, "roll": 94.0, "yaw": 225.0, "label4": "b'n03127925 crate'", "cputemp2": 49.39, "label3": "b'n02091134 whippet'", "memory": 16.2, "z": -0.0, "diskfree": "15866.3 MB", "cputemp": 50, "pitch": 3.0, "temp": 34.9, "y": 1.0, "currenttime": "2017-12-28 19:09:32", "starttime": "2017-12-28 19:09:29", "tempf": 74.82, "label5": "b'n04265275 space heater'", "host": "sensehatmovidius", "ipaddress": "192.168.1.156", "pressure": 1032.7, "label2": "b'n02091032 Italian greyhound'", "x": -0.0}
The current version of Apache MiniFi is 0.3.0.
Make sure you download the 0.3.0 version of the Toolkit to convert the template created in your Apache NiFi screen to a proper config.yml. This is compatible with Apache NiFi 1.4.
MiNiFi (Java) Version 0.3.0 Release Date: 2017 December 22 (https://cwiki.apache.org/confluence/display/MINIFI/Release+Notes#ReleaseNotes-Version0.3.0)
To Build Your MiniFi YAML
buildconfig.sh YourConfigFileName.xml
minifi-toolkit-0.3.0/bin/config.sh transform $1 config.yml
scp config.yml pi@192.168.1.156:/opt/demo/minifi-0.3.0/conf/
MiNiFi Config Version: 3
Flow Controller:
name: Movidius MiniFi
comment: ''
Core Properties:
flow controller graceful shutdown period: 10 sec
flow service write delay interval: 500 ms
administrative yield duration: 30 sec
bored yield duration: 10 millis
max concurrent threads: 1
variable registry properties: ''
FlowFile Repository:
partitions: 256
checkpoint interval: 2 mins
always sync: false
Swap:
threshold: 20000
in period: 5 sec
in threads: 1
out period: 5 sec
out threads: 4
Content Repository:
content claim max appendable size: 10 MB
content claim max flow files: 100
always sync: false
Provenance Repository:
provenance rollover time: 1 min
implementation: org.apache.nifi.provenance.MiNiFiPersistentProvenanceRepository
Component Status Repository:
buffer size: 1440
snapshot frequency: 1 min
Security Properties:
keystore: ''
keystore type: ''
keystore password: ''
key password: ''
truststore: ''
truststore type: ''
truststore password: ''
ssl protocol: ''
Sensitive Props:
key:
algorithm: PBEWITHMD5AND256BITAES-CBC-OPENSSL
provider: BC
Processors: []
Controller Services: []
Process Groups:
- id: 9fdb6f99-2e9f-3d86-0000-000000000000
name: Movidius MiniFi
Processors:
- id: ceab4537-43cb-31c8-0000-000000000000
name: Capture Photo and NCS Image Classifier
class: org.apache.nifi.processors.standard.ExecuteProcess
max concurrent tasks: 1
scheduling strategy: TIMER_DRIVEN
scheduling period: 60 sec
penalization period: 30 sec
yield period: 1 sec
run duration nanos: 0
auto-terminated relationships list: []
Properties:
Argument Delimiter: ' '
Batch Duration:
Command: /root/workspace/ncappzoo/apps/image-classifier/run.sh
Command Arguments:
Redirect Error Stream: 'false'
Working Directory:
- id: 9fbeac0f-1c3f-3535-0000-000000000000
name: GetFile
class: org.apache.nifi.processors.standard.GetFile
max concurrent tasks: 1
scheduling strategy: TIMER_DRIVEN
scheduling period: 0 sec
penalization period: 30 sec
yield period: 1 sec
run duration nanos: 0
auto-terminated relationships list: []
Properties:
Batch Size: '10'
File Filter: '[^\.].*'
Ignore Hidden Files: 'false'
Input Directory: /opt/demo/images/
Keep Source File: 'false'
Maximum File Age:
Maximum File Size:
Minimum File Age: 5 sec
Minimum File Size: 10 B
Path Filter:
Polling Interval: 0 sec
Recurse Subdirectories: 'true'
Controller Services: []
Process Groups: []
Input Ports: []
Output Ports: []
Funnels: []
Connections:
- id: 1bd103a9-09dc-3121-0000-000000000000
name: Capture Photo and NCS Image Classifier/success/0d6447ee-be36-3ec4-b896-1d57e374580f
source id: ceab4537-43cb-31c8-0000-000000000000
source relationship names:
- success
destination id: 0d6447ee-be36-3ec4-b896-1d57e374580f
max work queue size: 10000
max work queue data size: 1 GB
flowfile expiration: 0 sec
queue prioritizer class: ''
- id: 3cdac0a3-aabf-3a15-0000-000000000000
name: GetFile/success/0d6447ee-be36-3ec4-b896-1d57e374580f
source id: 9fbeac0f-1c3f-3535-0000-000000000000
source relationship names:
- success
destination id: 0d6447ee-be36-3ec4-b896-1d57e374580f
max work queue size: 10000
max work queue data size: 1 GB
flowfile expiration: 0 sec
queue prioritizer class: ''
Remote Process Groups:
- id: c8b85dc1-0dfb-3f01-0000-000000000000
name: ''
url: http://hostname.local:8080/nifi
comment: ''
timeout: 60 sec
yield period: 10 sec
transport protocol: HTTP
proxy host: ''
proxy port: ''
proxy user: ''
proxy password: ''
local network interface: ''
Input Ports:
- id: befe3bd8-b4ac-326c-becf-fc448e4a8ff6
name: MiniFi From TX1 Jetson
comment: ''
max concurrent tasks: 1
use compression: false
- id: 0d6447ee-be36-3ec4-b896-1d57e374580f
name: Movidius Input
comment: ''
max concurrent tasks: 1
use compression: false
- id: e43fd055-eb0c-37b2-ad38-91aeea9380cd
name: ChristmasTreeInput
comment: ''
max concurrent tasks: 1
use compression: false
- id: 4689a182-d7e1-3371-bef2-24e102ed9350
name: From ADP Remote Partition 1
comment: ''
max concurrent tasks: 1
use compression: false
Output Ports: []
Input Ports: []
Output Ports: []
Funnels: []
Connections: []
Remote Process Groups: []
NiFi Properties Overrides: {}
Start Apache MiniFi on Your RPI Box
root@sensehatmovidius:/opt/demo/minifi-0.3.0/logs# ../bin/minifi.sh start
...1686],offset=0,name=2017-12-28_1452.jpg,size=531686], StandardFlowFileRecord[uuid=ca84bb5f-b9ff-40f2-b5f3-ece5278df9c2,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1514490624975-1, container=default, section=1], offset=1592409, length=531879],offset=0,name=2017-12-28_1453.jpg,size=531879], StandardFlowFileRecord[uuid=6201c682-967b-4922-89fb-d25abbc2fe82,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1514490624975-1, container=default, section=1], offset=2124819, length=527188],offset=0,name=2017-12-28_1455.jpg,size=527188]] (2.53 MB) to http://hostname.local:8080/nifi-api in 639 milliseconds at a rate of 3.96 MB/sec
2017-12-28 15:08:35,588 INFO [Timer-Driven Process Thread-8] o.a.nifi.remote.StandardRemoteGroupPort RemoteGroupPort[name=Movidius Input,targets=http://hostname.local:8080/nifi] Successfully sent [StandardFlowFileRecord[uuid=670a2494-5c60-4551-9a59-a1859e794cfe,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1514490883824-2, container=default, section=2], offset=530145, length=531289],offset=0,name=2017-12-28_1456.jpg,size=531289], StandardFlowFileRecord[uuid=39cad0fd-ccba-4386-840d-758b39daf48e,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1514491306665-2, container=default, section=2], offset=0, length=504600],offset=0,name=2017-12-28_1501.jpg,size=504600], StandardFlowFileRecord[uuid=3c2d4a2b-a10b-4a92-8c8b-ab73e6960670,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1514491298789-1, container=default, section=1], offset=0, length=527543],offset=0,name=2017-12-28_1502.jpg,size=527543], StandardFlowFileRecord[uuid=92cbfd2b-e94a-4854-b874-ef575dcf3905,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1514490883824-2, container=default, section=2], offset=0, length=529618],offset=0,name=2017-12-28_1454.jpg,size=529618]] (2 MB) to http://hostname.local:8080/nifi-api in 439 milliseconds at a rate of 4.54 MB/sec
2017-12-28 15:08:41,341 INFO [Timer-Driven Process Thread-2] o.a.nifi.remote.StandardRemoteGroupPort RemoteGroupPort[name=Movidius Input,targets=http://hostname.local:8080/nifi] Successfully sent [StandardFlowFileRecord[uuid=3fb7ca68-3e10-40c6-8f37-49532134689d,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1514491710195-1, container=default, section=1], offset=0, length=567],offset=0,name=21532511724407,size=567]] (567 bytes) to http://hostname.local:8080/nifi-api in 113 milliseconds at a rate of 4.88 KB/sec
Our Apache MiniFi flow sends that data via HTTP to our Apache NiFi server for processing and converting into a standard Apache ORC file in HDFS for Apache Hive queries.
Using InferAvroSchema we build a schema and upload it to the Hortonworks Schema Registry for processing.
Schema
{
"type": "record",
"name": "movidiussense",
"fields": [
{
"name": "cputemp2",
"type": "double",
"doc": "Type inferred from '54.77'"
},
{
"name": "pitch",
"type": "double",
"doc": "Type inferred from '4.0'"
},
{
"name": "label5",
"type": "string",
"doc": "Type inferred from '\"b'n04238763 slide rule, slipstick'\"'"
},
{
"name": "starttime",
"type": "string",
"doc": "Type inferred from '\"2017-12-28 20:08:37\"'"
},
{
"name": "label2",
"type": "string",
"doc": "Type inferred from '\"b'n02871525 bookshop, bookstore, bookstall'\"'"
},
{
"name": "y",
"type": "double",
"doc": "Type inferred from '1.0'"
},
{
"name": "currenttime",
"type": "string",
"doc": "Type inferred from '\"2017-12-28 20:08:39\"'"
},
{
"name": "x",
"type": "double",
"doc": "Type inferred from '-0.0'"
},
{
"name": "label4",
"type": "string",
"doc": "Type inferred from '\"b'n03482405 hamper'\"'"
},
{
"name": "roll",
"type": "double",
"doc": "Type inferred from '94.0'"
},
{
"name": "temp",
"type": "double",
"doc": "Type inferred from '35.31'"
},
{
"name": "host",
"type": "string",
"doc": "Type inferred from '\"sensehatmovidius\"'"
},
{
"name": "diskfree",
"type": "string",
"doc": "Type inferred from '\"15817.0 MB\"'"
},
{
"name": "memory",
"type": "double",
"doc": "Type inferred from '39.2'"
},
{
"name": "label3",
"type": "string",
"doc": "Type inferred from '\"b'n02666196 abacus'\"'"
},
{
"name": "label1",
"type": "string",
"doc": "Type inferred from '\"b'n02727426 apiary, bee house'\"'"
},
{
"name": "yaw",
"type": "double",
"doc": "Type inferred from '225.0'"
},
{
"name": "cputemp",
"type": "int",
"doc": "Type inferred from '55'"
},
{
"name": "humidity",
"type": "double",
"doc": "Type inferred from '17.2'"
},
{
"name": "tempf",
"type": "double",
"doc": "Type inferred from '75.56'"
},
{
"name": "z",
"type": "double",
"doc": "Type inferred from '-0.0'"
},
{
"name": "pressure",
"type": "double",
"doc": "Type inferred from '1033.2'"
},
{
"name": "ipaddress",
"type": "string",
"doc": "Type inferred from '\"192.168.1.156\"'"
}
]
}
Our data is now in HDFS with an Apache Hive table that Apache NiFi has generated the Create Table DDL for as follows:
CREATE EXTERNAL TABLE IF NOT EXISTS movidiussense (cputemp2 DOUBLE, pitch DOUBLE, label5 STRING, starttime STRING, label2 STRING, y DOUBLE, currenttime STRING, x DOUBLE, label4 STRING, roll DOUBLE, temp DOUBLE, host STRING, diskfree STRING, memory DOUBLE, label3 STRING, label1 STRING, yaw DOUBLE, cputemp INT, humidity DOUBLE, tempf DOUBLE, z DOUBLE, pressure DOUBLE, ipaddress STRING) STORED AS ORC LOCATION '/movidiussense'
Resources
https://developer.movidius.com/
https://github.com/tspannhw/rpi-minifi-movidius-sensehat
http://chris.gg/2012/07/using-a-ps3-eyetoy-with-the-raspberry-pi/
https://developer.movidius.com/start
https://github.com/movidius/ncsdk/blob/master/README.md
12-27-2017 10:53 PM
There is an open source model server for Apache MXNet that I recently tried. It's very easy to install and use. You must have Apache MXNet and Python installed.
Installation and Setup
Installing the Model Server is simple. I am using pip3 to make sure I install to Python 3, as I also have Python 2.7 installed on my laptop.
pip3 install mxnet --pre --user
pip3 install mxnet-model-server
pip3 install imdbpy
pip3 install dataset
http://127.0.0.1:9999/api-description
{
"description": {
"host": "127.0.0.1:9999",
"info": {
"title": "Model Serving Apis",
"version": "1.0.0"
},
"paths": {
"/api-description": {
"get": {
"operationId": "api-description",
"produces": [
"application/json"
],
"responses": {
"200": {
"description": "OK",
"schema": {
"properties": {
"description": {
"type": "string"
}
},
"type": "object"
}
}
}
}
},
"/ping": {
"get": {
"operationId": "ping",
"produces": [
"application/json"
],
"responses": {
"200": {
"description": "OK",
"schema": {
"properties": {
"health": {
"type": "string"
}
},
"type": "object"
}
}
}
}
},
"/squeezenet/predict": {
"post": {
"consumes": [
"multipart/form-data"
],
"operationId": "squeezenet_predict",
"parameters": [
{
"description": "data should be image which will be resized to: [3, 224, 224]",
"in": "formData",
"name": "data",
"required": "true",
"type": "file"
}
],
"produces": [
"application/json"
],
"responses": {
"200": {
"description": "OK",
"schema": {
"properties": {
"prediction": {
"type": "string"
}
},
"type": "object"
}
}
}
}
}
},
"schemes": [
"http"
],
"swagger": "2.0"
}
}
http://127.0.0.1:9999/ping
{
"health": "healthy!"
}
Because each server can specify its own port, you can run many at once. I am running two: one for SSD and one for SqueezeNet. The MXNet Model Server GitHub repository includes a Model Zoo containing many image-processing models and examples.
mxnet-model-server --models squeezenet=https://s3.amazonaws.com/model-server/models/squeezenet_v1.1/squeezenet_v1.1.model --service mms/model_service/mxnet_vision_service.py --port 9999
/usr/local/lib/python3.6/site-packages/mms/service_manager.py:14: DeprecationWarning: the imp module is deprecated in favour of importlib; see the module's documentation for alternative uses
import imp
[INFO 2017-12-27 08:50:23,195 PID:50443 /usr/local/lib/python3.6/site-packages/mms/mxnet_model_server.py:__init__:87] Initialized model serving.
Downloading squeezenet_v1.1.model from https://s3.amazonaws.com/model-server/models/squeezenet_v1.1/squeezenet_v1.1.model.
[08:50:26] src/nnvm/legacy_json_util.cc:190: Loading symbol saved by previous version v0.8.0. Attempting to upgrade...
[08:50:26] src/nnvm/legacy_json_util.cc:198: Symbol successfully upgraded!
[INFO 2017-12-27 08:50:26,701 PID:50443 /usr/local/lib/python3.6/site-packages/mms/serving_frontend.py:add_endpoint:182] Adding endpoint: squeezenet_predict to Flask
[INFO 2017-12-27 08:50:26,701 PID:50443 /usr/local/lib/python3.6/site-packages/mms/serving_frontend.py:add_endpoint:182] Adding endpoint: ping to Flask
[INFO 2017-12-27 08:50:26,702 PID:50443 /usr/local/lib/python3.6/site-packages/mms/serving_frontend.py:add_endpoint:182] Adding endpoint: api-description to Flask
[INFO 2017-12-27 08:50:26,703 PID:50443 /usr/local/lib/python3.6/site-packages/mms/metric.py:start_recording:118] Metric errors for last 30 seconds is 0.000000
[INFO 2017-12-27 08:50:26,703 PID:50443 /usr/local/lib/python3.6/site-packages/mms/metric.py:start_recording:118] Metric requests for last 30 seconds is 0.000000
[INFO 2017-12-27 08:50:26,703 PID:50443 /usr/local/lib/python3.6/site-packages/mms/metric.py:start_recording:118] Metric cpu for last 30 seconds is 0.335000
[INFO 2017-12-27 08:50:26,704 PID:50443 /usr/local/lib/python3.6/site-packages/mms/metric.py:start_recording:118] Metric memory for last 30 seconds is 0.005696
[INFO 2017-12-27 08:50:26,704 PID:50443 /usr/local/lib/python3.6/site-packages/mms/metric.py:start_recording:118] Metric disk for last 30 seconds is 0.656000
[INFO 2017-12-27 08:50:26,704 PID:50443 /usr/local/lib/python3.6/site-packages/mms/metric.py:start_recording:118] Metric overall_latency for last 30 seconds is 0.000000
[INFO 2017-12-27 08:50:26,705 PID:50443 /usr/local/lib/python3.6/site-packages/mms/metric.py:start_recording:118] Metric inference_latency for last 30 seconds is 0.000000
[INFO 2017-12-27 08:50:26,705 PID:50443 /usr/local/lib/python3.6/site-packages/mms/metric.py:start_recording:118] Metric preprocess_latency for last 30 seconds is 0.000000
[INFO 2017-12-27 08:50:26,720 PID:50443 /usr/local/lib/python3.6/site-packages/mms/mxnet_model_server.py:start_model_serving:101] Service started successfully.
[INFO 2017-12-27 08:50:26,720 PID:50443 /usr/local/lib/python3.6/site-packages/mms/mxnet_model_server.py:start_model_serving:102] Service description endpoint: 127.0.0.1:9999/api-description
[INFO 2017-12-27 08:50:26,720 PID:50443 /usr/local/lib/python3.6/site-packages/mms/mxnet_model_server.py:start_model_serving:103] Service health endpoint: 127.0.0.1:9999/ping
[INFO 2017-12-27 08:50:26,730 PID:50443 /usr/local/lib/python3.6/site-packages/werkzeug/_internal.py:_log:87] * Running on http://127.0.0.1:9999/ (Press CTRL+C to quit)
For the SSD example, I fork the AWS GitHub repository (https://github.com/awslabs/mxnet-model-server.git), change to the examples/ssd directory, and follow the setup instructions to prepare the model.
mxnet-model-server --models SSD=resnet50_ssd_model.model --service ssd_service.py --port 9998
/usr/local/lib/python3.6/site-packages/mms/service_manager.py:14: DeprecationWarning: the imp module is deprecated in favour of importlib; see the module's documentation for alternative uses
import imp
[INFO 2017-12-27 09:02:22,800 PID:55345 /usr/local/lib/python3.6/site-packages/mms/mxnet_model_server.py:__init__:87] Initialized model serving.
[INFO 2017-12-27 09:02:24,510 PID:55345 /usr/local/lib/python3.6/site-packages/mms/serving_frontend.py:add_endpoint:182] Adding endpoint: SSD_predict to Flask
[INFO 2017-12-27 09:02:24,510 PID:55345 /usr/local/lib/python3.6/site-packages/mms/serving_frontend.py:add_endpoint:182] Adding endpoint: ping to Flask
[INFO 2017-12-27 09:02:24,511 PID:55345 /usr/local/lib/python3.6/site-packages/mms/serving_frontend.py:add_endpoint:182] Adding endpoint: api-description to Flask
[INFO 2017-12-27 09:02:24,511 PID:55345 /usr/local/lib/python3.6/site-packages/mms/metric.py:start_recording:118] Metric errors for last 30 seconds is 0.000000
[INFO 2017-12-27 09:02:24,512 PID:55345 /usr/local/lib/python3.6/site-packages/mms/metric.py:start_recording:118] Metric requests for last 30 seconds is 0.000000
[INFO 2017-12-27 09:02:24,512 PID:55345 /usr/local/lib/python3.6/site-packages/mms/metric.py:start_recording:118] Metric cpu for last 30 seconds is 0.290000
[INFO 2017-12-27 09:02:24,513 PID:55345 /usr/local/lib/python3.6/site-packages/mms/metric.py:start_recording:118] Metric memory for last 30 seconds is 0.014777
[INFO 2017-12-27 09:02:24,513 PID:55345 /usr/local/lib/python3.6/site-packages/mms/metric.py:start_recording:118] Metric disk for last 30 seconds is 0.656000
[INFO 2017-12-27 09:02:24,513 PID:55345 /usr/local/lib/python3.6/site-packages/mms/metric.py:start_recording:118] Metric overall_latency for last 30 seconds is 0.000000
[INFO 2017-12-27 09:02:24,514 PID:55345 /usr/local/lib/python3.6/site-packages/mms/metric.py:start_recording:118] Metric inference_latency for last 30 seconds is 0.000000
[INFO 2017-12-27 09:02:24,514 PID:55345 /usr/local/lib/python3.6/site-packages/mms/metric.py:start_recording:118] Metric preprocess_latency for last 30 seconds is 0.000000
[INFO 2017-12-27 09:02:24,514 PID:55345 /usr/local/lib/python3.6/site-packages/mms/mxnet_model_server.py:start_model_serving:101] Service started successfully.
[INFO 2017-12-27 09:02:24,514 PID:55345 /usr/local/lib/python3.6/site-packages/mms/mxnet_model_server.py:start_model_serving:102] Service description endpoint: 127.0.0.1:9998/api-description
[INFO 2017-12-27 09:02:24,514 PID:55345 /usr/local/lib/python3.6/site-packages/mms/mxnet_model_server.py:start_model_serving:103] Service health endpoint: 127.0.0.1:9998/ping
[INFO 2017-12-27 09:02:24,524 PID:55345 /usr/local/lib/python3.6/site-packages/werkzeug/_internal.py:_log:87] * Running on http://127.0.0.1:9998/ (Press CTRL+C to quit)
http://127.0.0.1:9998/api-description
{
"description": {
"host": "127.0.0.1:9998",
"info": {
"title": "Model Serving Apis",
"version": "1.0.0"
},
"paths": {
"/SSD/predict": {
"post": {
"consumes": [
"multipart/form-data"
],
"operationId": "SSD_predict",
"parameters": [
{
"description": "data should be image which will be resized to: [3, 512, 512]",
"in": "formData",
"name": "data",
"required": "true",
"type": "file"
}
],
"produces": [
"application/json"
],
"responses": {
"200": {
"description": "OK",
"schema": {
"properties": {
"prediction": {
"type": "string"
}
},
"type": "object"
}
}
}
}
},
"/api-description": {
"get": {
"operationId": "api-description",
"produces": [
"application/json"
],
"responses": {
"200": {
"description": "OK",
"schema": {
"properties": {
"description": {
"type": "string"
}
},
"type": "object"
}
}
}
}
},
"/ping": {
"get": {
"operationId": "ping",
"produces": [
"application/json"
],
"responses": {
"200": {
"description": "OK",
"schema": {
"properties": {
"health": {
"type": "string"
}
},
"type": "object"
}
}
}
}
}
},
"schemes": [
"http"
],
"swagger": "2.0"
}
}
http://127.0.0.1:9998/ping
{
"health": "healthy!"
}
With this call to SqueezeNet we get back several class guesses with probabilities (0.50 -> 50%).
curl -X POST http://127.0.0.1:9999/squeezenet/predict -F "data=@TimSpann2.jpg"
{
"prediction": [
[
{
"class": "n02877765 bottlecap",
"probability": 0.5077430009841919
},
{
"class": "n03196217 digital clock",
"probability": 0.35705313086509705
},
{
"class": "n03706229 magnetic compass",
"probability": 0.02305465377867222
},
{
"class": "n02708093 analog clock",
"probability": 0.018635360524058342
},
{
"class": "n04328186 stopwatch, stop watch",
"probability": 0.015588048845529556
}
]
]
}
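Downstream of the REST call, the response is plain JSON and is easy to work with. Here is a small sketch that picks the top-scoring class out of a SqueezeNet response; the response shape is taken from the sample above (trimmed to two entries):

```python
import json

# Sample response body from the SqueezeNet /predict endpoint (trimmed to two entries).
response_body = '''
{"prediction": [[
  {"class": "n02877765 bottlecap", "probability": 0.5077430009841919},
  {"class": "n03196217 digital clock", "probability": 0.35705313086509705}
]]}
'''

def top_prediction(body):
    """Return (label, probability) for the highest-scoring class."""
    guesses = json.loads(body)["prediction"][0]
    best = max(guesses, key=lambda g: g["probability"])
    # Drop the WordNet synset id (e.g. "n02877765") and keep the readable label.
    label = best["class"].split(" ", 1)[1]
    return label, best["probability"]

label, prob = top_prediction(response_body)
print(label, round(prob, 2))  # bottlecap 0.51
```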
With this test call to SSD, you will see it identifies a person (me) and provides coordinates of a box around me.
curl -X POST http://127.0.0.1:9998/SSD/predict -F "data=@TimSpann2.jpg"
{
"prediction": [
[
"person",
405,
139,
614,
467
],
[
"boat",
26,
0,
459,
481
]
]
}
/opt/demo/curl.sh
curl -X POST http://127.0.0.1:9998/SSD/predict -F "data=@$1"
/opt/demo/curl2.sh
curl -X POST http://127.0.0.1:9999/squeezenet/predict -F "data=@$1"
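The same multipart POST the curl scripts perform can be built with nothing but the Python standard library. This is a sketch, not part of the original flow; it assumes a model server is running on the port shown above:

```python
import json
import urllib.request
import uuid

def encode_multipart(field, filename, payload):
    """Build a multipart/form-data body equivalent to curl's -F "field=@file"."""
    boundary = uuid.uuid4().hex
    body = (
        f"--{boundary}\r\n"
        f'Content-Disposition: form-data; name="{field}"; filename="{filename}"\r\n'
        f"Content-Type: application/octet-stream\r\n\r\n"
    ).encode() + payload + f"\r\n--{boundary}--\r\n".encode()
    return body, f"multipart/form-data; boundary={boundary}"

def predict(image_path, url="http://127.0.0.1:9999/squeezenet/predict"):
    """POST an image to a running mxnet-model-server endpoint and return its JSON."""
    with open(image_path, "rb") as f:
        body, content_type = encode_multipart("data", image_path, f.read())
    req = urllib.request.Request(url, data=body, headers={"Content-Type": content_type})
    with urllib.request.urlopen(req) as resp:
        return json.loads(resp.read())
```

Calling `predict("TimSpann2.jpg")` against a running server returns the same JSON the curl examples show.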
The Apache NiFi flow is easy: I call the REST URL and pass it an image. This can be done with a Groovy script or by executing a curl shell script.
Logs From Run
[INFO 2017-12-27 17:41:33,447 PID:90860 /usr/local/lib/python3.6/site-packages/werkzeug/_internal.py:_log:87] 127.0.0.1 - - [27/Dec/2017 17:41:33] "POST /SSD/predict HTTP/1.1" 400 -
[INFO 2017-12-27 17:41:36,289 PID:90860 /usr/local/lib/python3.6/site-packages/mms/serving_frontend.py:predict_callback:440] Request input: data should be image with jpeg format.
[INFO 2017-12-27 17:41:36,289 PID:90860 /usr/local/lib/python3.6/site-packages/mms/request_handler/flask_handler.py:get_file_data:133] Getting file data from request.
[INFO 2017-12-27 17:41:37,035 PID:90860 /usr/local/lib/python3.6/site-packages/mms/serving_frontend.py:predict_callback:475] Response is text.
[INFO 2017-12-27 17:41:37,035 PID:90860 /usr/local/lib/python3.6/site-packages/mms/request_handler/flask_handler.py:jsonify:156] Jsonifying the response: {'prediction': [('motorbike', 270, 877, 1944, 3214), ('car', 77, 763, 2113, 3193)]}
Apache NiFi Results of the Run
One of the Images Processed
Apache NiFi Flow Template
mxnetserver.xml
Resources
https://github.com/awslabs/mxnet-model-server
https://github.com/awslabs/mxnet-model-server/blob/master/docs/README.md
https://github.com/awslabs/mxnet-model-server/blob/master/docs/server.md
https://github.com/awslabs/mxnet-model-server/blob/master/examples/ssd/README.md
http://gluon.mxnet.io/
https://github.com/awslabs/mxnet-model-server/blob/master/docs/model_zoo.md
https://mxnet.incubator.apache.org/gluon/index.html
http://mxnet.incubator.apache.org/tutorials/index.html
https://github.com/apache/incubator-mxnet/tree/master/example
https://github.com/apache/incubator-mxnet/tree/master/example#deep-learning-examples
http://mxnet.incubator.apache.org/model_zoo/index.html
https://github.com/apache/incubator-mxnet/releases/tag/1.0.0
https://github.com/gluon-api/gluon-api
http://mxnet.incubator.apache.org/tutorials/r/classifyRealImageWithPretrainedModel.html
https://www.slideshare.net/JulienSIMON5/an-introduction-to-deep-learning-with-apache-mxnet
https://mxnet.incubator.apache.org/get_started/why_mxnet.html
https://www.slideshare.net/JulienSIMON5/deep-learning-for-developers-december-2017
https://aws.amazon.com/blogs/aws/aws-contributes-to-milestone-1-0-release-and-adds-model-serving-capability-for-apache-mxnet/
12-26-2017
10:05 PM
3 Kudos
I wanted to explore alternatives to running Deep Learning and Machine Learning locally and take advantage of some free hours of cloud time.
It turns out to be trivially easy to integrate calls to the IBM Watson APIs for all your microservice needs. Apache NiFi makes it super easy and fun.
Using out-of-the-box processors, we can use InvokeHTTP to call the POST and GET REST APIs of the IBM Watson Platform for Natural Language, Visual Recognition, Personality Analysis, Language Translation, and others. These APIs use SSL, so you have to set up a simple StandardSSLContextService in Apache NiFi. Once you know which JVM Apache NiFi is running with, it's trivial to point the service at that JVM's default truststore.
By default the truststore password is the famously secure changeit.
A Get REST CALL to NLP
A Post to a Watson REST URL
Some URLs and Calls
curl -X POST --form "images_file=@mypic.jpg" "https://gateway-a.watsonplatform.net/visual-recognition/api/v3/classify?api_key={api-key}&version=2016-05-20"
curl -X POST --form "images_file=@myotherpic.jpg" "https://gateway-a.watsonplatform.net/visual-recognition/api/v3/detect_faces?api_key={api-key}&version=2016-05-20"
curl --user "{username}":"{password}" "https://gateway.watsonplatform.net/natural-language-understanding/api/v1/analyze?version=2017-02-27&text=This+is+a+test&features=sentiment,keywords"
curl --user "{username}":"{password}" "https://gateway.watsonplatform.net/natural-language-understanding/api/v1/analyze?version=2017-02-27&text=Test&features=sentiment,keywords&keywords.sentiment=true"
curl -X POST --user "{username}":"{password}" --header "Content-Type: application/json" --data-binary @{path_to_file}tone.json "https://gateway.watsonplatform.net/tone-analyzer/api/v3/tone?version=2017-09-21"
curl -X POST --user "{username}":"{password}" --header "Content-Type: application/json" --data-binary @{path_to_file}tone.json "https://gateway.watsonplatform.net/tone-analyzer/api/v3/tone?version=2017-09-21&sentences=false"
curl -X GET --user "{username}":"{password}" "https://gateway.watsonplatform.net/tone-analyzer/api/v3/tone?version=2017-09-21
&text=Team%2C%20I%20know%20that%20times%20are%20tough%21%20Product%20sales%20have
%20been%20disappointing%20for%20the%20past%20three%20quarters.%20We%20have%20a%20
competitive%20product%2C%20but%20we%20need%20to%20do%20a%20better%20job%20of%20
selling%20it%21"
curl -X POST --user "{username}":"{password}" --header "Content-Type: application/json" --data-binary @{path_to_file}tone-chat.json "https://gateway.watsonplatform.net/tone-analyzer/api/v3/tone_chat?version=2017-09-21"
Personality Insights
curl -X POST --user {username}:{password} --header "Content-Type: text/plain;charset=utf-8" --data-binary "@{path_to_file}profile.txt" "https://gateway.watsonplatform.net/personality-insights/api/v3/profile?version=2016-10-20"
Conversation
{
"url": "https://gateway.watsonplatform.net/conversation/api",
"username": "user",
"password": "pass"
}
Discovery
{
"url": "https://gateway.watsonplatform.net/discovery/api",
"username": "u",
"password": "p"
}
curl -X POST -u "{username}":"{password}" -H "Content-Type: application/json" -d '{ "name":"my-first-environment", "description":"exploring environments"}' "api/v1/environments?version=2017-09-01"
Language Translator
{
"url": "https://gateway.watsonplatform.net/language-translator/api",
"username": "u",
"password": "p"
}
curl -X POST --user "{username}":"{password}" --header "Content-Type: application/json" --header "Accept: application/json" --data '{"text":"Hello, world!","source":"en","target":"es"}' "https://gateway.watsonplatform.net/language-translator/api/v2/translate"
Natural Language
{
"url": "https://gateway.watsonplatform.net/natural-language-understanding/api",
"username": "u",
"password": "p"
}
curl --user "{username}":"{password}" "https://gateway.watsonplatform.net/natural-language-understanding/api/v1/analyze?version=2017-02-27&text=SomeText&features=sentiment,keywords"
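The same Natural Language Understanding call can be made from the Python standard library. This is a sketch, not part of the original flow; the username and password are placeholders for your own Watson service credentials:

```python
import base64
import json
import urllib.parse
import urllib.request

def analyze(text, username, password):
    """GET sentiment and keywords from Watson NLU, mirroring the curl call above."""
    params = urllib.parse.urlencode({
        "version": "2017-02-27",
        "text": text,
        "features": "sentiment,keywords",
    })
    url = ("https://gateway.watsonplatform.net/"
           "natural-language-understanding/api/v1/analyze?" + params)
    # HTTP Basic auth header built by hand: base64("username:password").
    token = base64.b64encode(f"{username}:{password}".encode()).decode()
    req = urllib.request.Request(url, headers={"Authorization": "Basic " + token})
    with urllib.request.urlopen(req) as resp:
        return json.loads(resp.read())
```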
If you can call it with curl or wget, you can call it with Apache NiFi.
An Overview of Incorporating IBM Watson in Apache NiFi Flows
With Apache NiFi we can substitute any message you want NLP to analyze; see ${msg}, which is NiFi Expression Language.
As you can see, just plug your key in there.
IBM returns some nice clean JSON.
We get our JSON probabilities and can use them as we see fit. As a next step, I would use a schema to convert to Apache Avro, then to Apache ORC, and store the results in Apache Hive LLAP for queries and analytics.
Resources:
https://console.bluemix.net/docs/services/visual-recognition/getting-started.html#getting-started-tutorial
https://console.bluemix.net/services/natural-language-understanding/
https://console.bluemix.net/services/personality_insights/
https://www.ibm.com/watson/developercloud/visual-recognition/api/v3/#introduction
pip install --upgrade watson-developer-cloud
https://github.com/watson-developer-cloud/
https://www.ibm.com/watson/webinars/
https://github.com/watson-developer-cloud/retrieve-and-rank-java
https://github.com/watson-developer-cloud/java-sdk
https://github.com/watson-developer-cloud/java-sdk/tree/develop/speech-to-text
https://github.com/watson-developer-cloud/cognitive-client-java
brew tap watson-developer-cloud/tools
https://github.com/watson-developer-cloud/java-sdk/tree/master/examples/src/main/java/com/ibm/watson/developer_cloud
https://console.bluemix.net/dashboard/apps
https://console.bluemix.net/catalog/services/tone-analyzer/
12-24-2017
03:17 PM
4 Kudos
Happy Holidays!
Apache NiFi makes it easy to build your own integration tests, so I am generating tests for turning my Christmas Tree Hat on and off. I am also testing taking a picture.
My use case is to send an HTTP message to trigger a Raspberry Pi to turn on a physical device like a camera or a light, which is straightforward and secure with Apache MiniFi and Apache NiFi. A little Python script is all the code required, and it's basic example code.
This code is a modified TensorFlow classify.py that adds turning on the Christmas tree. We turn on the tree, take a picture with the PiCamera, and then run it through the TensorFlow Inception classifier.
root@vid5:/opt/demo# cat classifytree.py
# Copyright 2015 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Simple image classification with Inception.
Run image classification with Inception trained on ImageNet 2012 Challenge data
set.
This program creates a graph from a saved GraphDef protocol buffer,
and runs inference on an input JPEG image. It outputs human readable
strings of the top 5 predictions along with their probabilities.
Change the --image_file argument to any jpg image to compute a
classification of that image.
Please see the tutorial and website for a detailed description of how
to use this script to perform image recognition.
https://tensorflow.org/tutorials/image_recognition/
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import argparse
import os.path
import re
import sys
import tarfile
import os
import datetime
import math
import random, string
import base64
import json
import time
import picamera
from time import sleep
from time import gmtime, strftime
import numpy as np
from six.moves import urllib
import tensorflow as tf
from gpiozero import LEDBoard
from gpiozero.tools import random_values
from signal import pause
tree = LEDBoard(*range(2,28),pwm=True)
for led in tree:
led.source_delay = 0.1
led.source = random_values()
tf.logging.set_verbosity(tf.logging.ERROR)
FLAGS = None
# pylint: disable=line-too-long
DATA_URL = 'http://download.tensorflow.org/models/image/imagenet/inception-2015-12-05.tgz'
# pylint: enable=line-too-long
# yyyy-mm-dd hh:mm:ss
currenttime= strftime("%Y-%m-%d %H:%M:%S",gmtime())
host = os.uname()[1]
def randomword(length):
return ''.join(random.choice(string.lowercase) for i in range(length))
class NodeLookup(object):
"""Converts integer node ID's to human readable labels."""
def __init__(self,
label_lookup_path=None,
uid_lookup_path=None):
if not label_lookup_path:
label_lookup_path = os.path.join(
FLAGS.model_dir, 'imagenet_2012_challenge_label_map_proto.pbtxt')
if not uid_lookup_path:
uid_lookup_path = os.path.join(
FLAGS.model_dir, 'imagenet_synset_to_human_label_map.txt')
self.node_lookup = self.load(label_lookup_path, uid_lookup_path)
def load(self, label_lookup_path, uid_lookup_path):
"""Loads a human readable English name for each softmax node.
Args:
label_lookup_path: string UID to integer node ID.
uid_lookup_path: string UID to human-readable string.
Returns:
dict from integer node ID to human-readable string.
"""
if not tf.gfile.Exists(uid_lookup_path):
tf.logging.fatal('File does not exist %s', uid_lookup_path)
if not tf.gfile.Exists(label_lookup_path):
tf.logging.fatal('File does not exist %s', label_lookup_path)
# Loads mapping from string UID to human-readable string
proto_as_ascii_lines = tf.gfile.GFile(uid_lookup_path).readlines()
uid_to_human = {}
p = re.compile(r'[n\d]*[ \S,]*')
for line in proto_as_ascii_lines:
parsed_items = p.findall(line)
uid = parsed_items[0]
human_string = parsed_items[2]
uid_to_human[uid] = human_string
# Loads mapping from string UID to integer node ID.
node_id_to_uid = {}
proto_as_ascii = tf.gfile.GFile(label_lookup_path).readlines()
for line in proto_as_ascii:
if line.startswith(' target_class:'):
target_class = int(line.split(': ')[1])
if line.startswith(' target_class_string:'):
target_class_string = line.split(': ')[1]
node_id_to_uid[target_class] = target_class_string[1:-2]
# Loads the final mapping of integer node ID to human-readable string
node_id_to_name = {}
for key, val in node_id_to_uid.items():
if val not in uid_to_human:
tf.logging.fatal('Failed to locate: %s', val)
name = uid_to_human[val]
node_id_to_name[key] = name
return node_id_to_name
def id_to_string(self, node_id):
if node_id not in self.node_lookup:
return ''
return self.node_lookup[node_id]
def create_graph():
"""Creates a graph from saved GraphDef file and returns a saver."""
# Creates graph from saved graph_def.pb.
with tf.gfile.FastGFile(os.path.join(
FLAGS.model_dir, 'classify_image_graph_def.pb'), 'rb') as f:
graph_def = tf.GraphDef()
graph_def.ParseFromString(f.read())
_ = tf.import_graph_def(graph_def, name='')
def run_inference_on_image(image):
"""Runs inference on an image.
Args:
image: Image file name.
Returns:
Nothing
"""
if not tf.gfile.Exists(image):
tf.logging.fatal('File does not exist %s', image)
image_data = tf.gfile.FastGFile(image, 'rb').read()
# Creates graph from saved GraphDef.
create_graph()
with tf.Session() as sess:
# Some useful tensors:
# 'softmax:0': A tensor containing the normalized prediction across
# 1000 labels.
# 'pool_3:0': A tensor containing the next-to-last layer containing 2048
# float description of the image.
# 'DecodeJpeg/contents:0': A tensor containing a string providing JPEG
# encoding of the image.
# Runs the softmax tensor by feeding the image_data as input to the graph.
softmax_tensor = sess.graph.get_tensor_by_name('softmax:0')
predictions = sess.run(softmax_tensor,
{'DecodeJpeg/contents:0': image_data})
predictions = np.squeeze(predictions)
# Creates node ID --> English string lookup.
node_lookup = NodeLookup()
top_k = predictions.argsort()[-FLAGS.num_top_predictions:][::-1]
row = []
for node_id in top_k:
human_string = node_lookup.id_to_string(node_id)
score = predictions[node_id]
row.append( { 'node_id': node_id, 'image': image, 'host': host, 'ts': currenttime, 'human_string': str(human_string), 'score': str(score)} )
json_string = json.dumps(row)
print( json_string )
def maybe_download_and_extract():
"""Download and extract model tar file."""
dest_directory = FLAGS.model_dir
if not os.path.exists(dest_directory):
os.makedirs(dest_directory)
filename = DATA_URL.split('/')[-1]
filepath = os.path.join(dest_directory, filename)
if not os.path.exists(filepath):
def _progress(count, block_size, total_size):
sys.stdout.write('\r>> Downloading %s %.1f%%' % (
filename, float(count * block_size) / float(total_size) * 100.0))
sys.stdout.flush()
filepath, _ = urllib.request.urlretrieve(DATA_URL, filepath, _progress)
print()
statinfo = os.stat(filepath)
print('Successfully downloaded', filename, statinfo.st_size, 'bytes.')
tarfile.open(filepath, 'r:gz').extractall(dest_directory)
def main(_):
maybe_download_and_extract()
# Create unique image name
img_name = '/opt/demo/images/pi_image_{0}_{1}.jpg'.format(randomword(3),strftime("%Y%m%d%H%M%S",gmtime()))
# Capture Image from Pi Camera
try:
camera = picamera.PiCamera()
camera.resolution = (1024,768)
camera.annotate_text = " Stored with Apache NiFi "
camera.capture(img_name, resize=(600,400))
pass
finally:
camera.close()
# image = (FLAGS.image_file if FLAGS.image_file else
# os.path.join(FLAGS.model_dir, 'cropped_panda.jpg'))
run_inference_on_image(img_name)
if __name__ == '__main__':
parser = argparse.ArgumentParser()
# classify_image_graph_def.pb:
# Binary representation of the GraphDef protocol buffer.
# imagenet_synset_to_human_label_map.txt:
# Map from synset ID to a human readable string.
# imagenet_2012_challenge_label_map_proto.pbtxt:
# Text representation of a protocol buffer mapping a label to synset ID.
parser.add_argument(
'--model_dir',
type=str,
default='/tmp/imagenet',
help=""" Path to classify_image_graph_def.pb,
imagenet_synset_to_human_label_map.txt, and
imagenet_2012_challenge_label_map_proto.pbtxt. """
)
parser.add_argument(
'--image_file',
type=str,
default='',
help='Absolute path to image file.'
)
parser.add_argument(
'--num_top_predictions',
type=int,
default=5,
help='Display this many predictions.'
)
FLAGS, unparsed = parser.parse_known_args()
tf.app.run(main=main, argv=[sys.argv[0]] + unparsed)
Here is the information for getting your own Christmas Tree Hat for your Raspberry Pi.
https://thepihut.com/products/3d-xmas-tree-for-raspberry-pi
https://thepihut.com/blogs/raspberry-pi-tutorials/3d-xmas-tree-for-raspberry-pi-assembly-instructions
sudo apt-get install python-gpiozero python3-gpiozero
Results of Running
[{"image": "/opt/demo/images/pi_image_bey_20171218140347.jpg", "ts": "2017-12-18 14:03:32", "host": "vid5", "score": "0.175653", "human_string": "pay-phone, pay-station", "node_id": 843}, {"image": "/opt/demo/images/pi_image_bey_20171218140347.jpg", "ts": "2017-12-18 14:03:32", "host": "vid5", "score": "0.0890657", "human_string": "cellular telephone, cellular phone, cellphone, cell, mobile phone", "node_id": 914}, {"image": "/opt/demo/images/pi_image_bey_20171218140347.jpg", "ts": "2017-12-18 14:03:32", "host": "vid5", "score": "0.0631831", "human_string": "vending machine", "node_id": 558}, {"image": "/opt/demo/images/pi_image_bey_20171218140347.jpg", "ts": "2017-12-18 14:03:32", "host": "vid5", "score": "0.0541551", "human_string": "abacus", "node_id": 547}, {"image": "/opt/demo/images/pi_image_bey_20171218140347.jpg", "ts": "2017-12-18 14:03:32", "host": "vid5", "score": "0.0417486", "human_string": "rotisserie", "node_id": 663}]
To Remotely Activate the Tree
curl -X POST http://192.168.1.167:8033/contentListener --data-ascii "tree-on" -v
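The same trigger can be sent from Python's standard library. This is a hedged sketch, not part of the original flow: the host and port come from the curl example above, and the "tree-off" message is an assumption based on the article's on/off use case:

```python
import urllib.request

def set_tree(state, host="192.168.1.167", port=8033):
    """POST a tree-on / tree-off message to the MiniFi HTTP listener,
    mirroring the curl call above. "tree-off" is assumed to be supported."""
    if state not in ("tree-on", "tree-off"):
        raise ValueError("state must be 'tree-on' or 'tree-off'")
    req = urllib.request.Request(
        f"http://{host}:{port}/contentListener",
        data=state.encode("ascii"),
        headers={"Content-Type": "text/plain"},
    )
    with urllib.request.urlopen(req) as resp:
        return resp.status
```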
It's so easy to enable Apache MiniFi to be controlled by any remote HTTP request.
Other Apache MiniFi Requests
root@vid5:/opt/demo/minifi-0.2.0/logs# curl -v http://HW13125.local:8080/nifi-api/system-diagnostics
* Hostname was NOT found in DNS cache
* Trying 192.168.1.193...
* Connected to HW13125.local (192.168.1.193) port 8080 (#0)
> GET /nifi-api/system-diagnostics HTTP/1.1
> User-Agent: curl/7.38.0
> Host: HW13125.local:8080
> Accept: */*
>
< HTTP/1.1 200 OK
< Date: Mon, 18 Dec 2017 13:42:07 GMT
< X-Frame-Options: SAMEORIGIN
< Cache-Control: private, no-cache, no-store, no-transform
< Content-Type: application/json
< Vary: Accept-Encoding
< Vary: User-Agent
< Content-Length: 1852
* Server Jetty(9.4.3.v20170317) is not blacklisted
< Server: Jetty(9.4.3.v20170317)
<
{"systemDiagnostics":{"aggregateSnapshot":{"totalNonHeap":"390.23 MB","totalNonHeapBytes":409190400,"usedNonHeap":"370.09 MB","usedNonHeapBytes":388065576,"freeNonHeap":"20.15 MB","freeNonHeapBytes":21124824,"maxNonHeap":"-1 bytes","maxNonHeapBytes":-1,"totalHeap":"2 GB","totalHeapBytes":2147483648,"usedHeap":"1.77 GB","usedHeapBytes":1904638968,"freeHeap":"231.59 MB","freeHeapBytes":242844680,"maxHeap":"2 GB","maxHeapBytes":2147483648,"heapUtilization":"89.0%","availableProcessors":8,"processorLoadAverage":2.794921875,"totalThreads":105,"daemonThreads":41,"uptime":"43:00:01.210","flowFileRepositoryStorageUsage":{"freeSpace":"55.76 GB","totalSpace":"931.19 GB","usedSpace":"875.43 GB","freeSpaceBytes":59870846976,"totalSpaceBytes":999860912128,"usedSpaceBytes":939990065152,"utilization":"94.0%"},"contentRepositoryStorageUsage":[{"identifier":"default","freeSpace":"55.76 GB","totalSpace":"931.19 GB","usedSpace":"875.43 GB","freeSpaceBytes":59870846976,"totalSpaceBytes":999860912128,"usedSpaceBytes":939990065152* Connection #0 to host HW13125.local left intact
,"utilization":"94.0%"}],"provenanceRepositoryStorageUsage":[{"identifier":"default","freeSpace":"55.76 GB","totalSpace":"931.19 GB","usedSpace":"875.43 GB","freeSpaceBytes":59870846976,"totalSpaceBytes":999860912128,"usedSpaceBytes":939990065152,"utilization":"94.0%"}],"garbageCollection":[{"name":"G1 Young Generation","collectionCount":742,"collectionTime":"00:00:17.754","collectionMillis":17754},{"name":"G1 Old Generation","collectionCount":0,"collectionTime":"00:00:00.000","collectionMillis":0}],"statsLastRefreshed":"08:42:07 EST","versionInfo":{"niFiVersion":"1.5.0-SNAPSHOT","javaVendor":"Oracle Corporation","javaVersion":"1.8.0_121","osName":"Mac OS X","osVersion":"10.13.2","osArchitecture":"x86_64","buildTag":"HEAD","buildRevision":"a774f1d","buildBranch":"master","buildTimestamp":"12/07/2017 13:37:07 EST"}}}}root@vid5:/opt/d
Resources
https://community.hortonworks.com/articles/118132/minifi-capturing-converting-tensorflow-inception-t.html
https://github.com/tspannhw/rpi-minifi-movidius-sensehat
https://github.com/tspannhw/rpi-sensehat-minifi-python
I will keep my eyes out for Raspberry Pi add-ons for other holidays.
For the second Christmas tree, it's a Sense HAT!
1. Setup Raspian Stretch
https://www.raspberrypi.org/documentation/configuration/wireless/wireless-cli.md
2. Sense-Hat
sudo apt-get install sense-hat
sudo apt-get install octave -y
pip install --upgrade sense-hat
pip install --upgrade pillow
pip install rtimulib
pip install psutil
sudo apt-get install oracle-java8-jdk
sudo apt install gstreamer-1.0
sudo apt install python3-gst-1.0
sudo apt-get install gir1.2-gstreamer-1.0
sudo apt-get install gir1.2-gst-plugins-base-1.0
For the Sense Hat
Just run this: https://github.com/PixelNoob/sensehat/blob/master/xmas_tree.py
12-22-2017
08:49 PM
5 Kudos
During the holidays, it's nice to know how much energy you are using. As a small step, I bought a low-end, inexpensive TP-Link energy-monitoring plug for one device. I have been monitoring phone charging and my Apple monitor. Let's read the data and do some queries in Apache Hive and Apache Spark 2 SQL.
Processing Live Energy Feeds in the Cloud
Monitor Energy From a Local OSX Machine
If your local instance does not have access to Apache Hive, you will need to send the data via Site-to-Site to a remote Apache NiFi / HDF server or cluster that does.
For Apache Hive Usage, Please Convert to Apache ORC Files
To Create Your New Table, Grab the hive.ddl
Inside of Apache Zeppelin, we can create our table based on the above DDL. We could have also let Apache NiFi create the table for us. I like to keep my DDL with my notebook; just a personal choice. We can then query our table in Apache Zeppelin utilizing Apache Spark 2 SQL and Apache Hive QL.
Overview
Step 1: Purchase an inexpensive energy-monitoring plug
Step 2: Connect it to a phone app via WiFi
Step 3: Once configured, you can access it via Python
Step 4: Install the HS100 Python library in Python 3.x
Step 5: Fork my GitHub repository and use my shell script and Python script
Step 6: Add the local Apache NiFi flow which will call that script
Step 7: Add a remote Apache NiFi flow for processing into Apache Hadoop
Step 8: Create your table
Step 9: Query with Apache Hive and Apache Spark SQL via Apache Zeppelin or another UI
Step 10: Turn that extra stuff off and save money!
The Open Source Code and Artifacts
Shell Script (smartreader.sh)
python3 meterreader.py
Python Code (meterreader.py)
from pyHS100 import SmartPlug, SmartBulb
#from pprint import pformat as pf
import json
import datetime
plug = SmartPlug("192.168.1.200")
row = { }
emeterdaily = plug.get_emeter_daily(year=2017, month=12)
for k, v in emeterdaily.items():
row["hour%s" % k] = v
hwinfo = plug.hw_info
for k, v in hwinfo.items():
row["%s" % k] = v
sysinfo = plug.get_sysinfo()
for k, v in sysinfo.items():
row["%s" % k] = v
timezone = plug.timezone
for k, v in timezone.items():
row["%s" % k] = v
emetermonthly = plug.get_emeter_monthly(year=2017)
for k, v in emetermonthly.items():
row["day%s" % k] = v
realtime = plug.get_emeter_realtime()
for k, v in realtime.items():
row["%s" % k] = v
row['alias'] = plug.alias
row['time'] = plug.time.strftime('%m/%d/%Y %H:%M:%S')
row['ledon'] = plug.led
row['systemtime'] = datetime.datetime.now().strftime('%m/%d/%Y %H:%M:%S')
json_string = json.dumps(row)
print(json_string) The code is basically a small tweak on the example code provided with the pyHS100 code. This code allows you to access the HS110 that I have. My PC and my smart meter are on the same WiFi which can't be 5G. Example Data {"hour19": 0.036, "hour20": 0.021, "hour21": 0.017, "sw_ver": "1.1.1 Build 160725 Rel.164033", "hw_ver": "1.0", "mac": "50:C7:BF:B1:95:D5", "type": "IOT.SMARTPLUGSWITCH", "hwId": "60FF6B258734EA6880E186F8C96DDC61", "fwId": "060BFEA28A8CD1E67146EB5B2B599CC8", "oemId": "FFF22CFF774A0B89F7624BFC6F50D5DE", "dev_name": "Wi-Fi Smart Plug With Energy Monitoring", "model": "HS110(US)", "deviceId": "8006ECB1D454C4428953CB2B34D9292D18A6DB0E", "alias": "Tim Spann's MiniFi Controller SmartPlug - Desk1", "icon_hash": "", "relay_state": 1, "on_time": 161599, "active_mode": "schedule", "feature": "TIM:ENE", "updating": 0, "rssi": -32, "led_off": 0, "latitude": 40.268216, "longitude": -74.529088, "index": 18, "zone_str": "(UTC-05:00) Eastern Daylight Time (US & Canada)", "tz_str": "EST5EDT,M3.2.0,M11.1.0", "dst_offset": 60, "day12": 0.074, "current": 0.04011, "voltage": 122.460974, "power": 1.8772, "total": 0.074, "time": "12/21/2017 13:21:52", "ledon": true, "systemtime": "12/21/2017 13:21:53"} As you can see we only get the hours and days where we had usage. Since this is new, I don't have them all. I created my schema to handle all the days of a month and all the hours of a day. We are going to have a sparse table. If I was monitoring millions of devices, I would put this in Apache HBase. I may do that later. Let's create an HDFS directory for Loading Apache ORC Files hdfs dfs -mkdir -p /smartPlug
hdfs dfs -chmod -R 777 /smartPlug

Table DDL

CREATE EXTERNAL TABLE IF NOT EXISTS smartPlug (hour19 DOUBLE, hour20 DOUBLE, hour21 DOUBLE, hour22 DOUBLE, hour23 DOUBLE, hour18 DOUBLE, hour17 DOUBLE, hour16 DOUBLE, hour15 DOUBLE, hour14 DOUBLE, hour13 DOUBLE, hour12 DOUBLE, hour11 DOUBLE, hour10 DOUBLE, hour9 DOUBLE, hour8 DOUBLE, hour7 DOUBLE, hour6 DOUBLE, hour5 DOUBLE, hour4 DOUBLE, hour3 DOUBLE, hour2 DOUBLE, hour1 DOUBLE, hour0 DOUBLE, sw_ver STRING, hw_ver STRING, mac STRING, type STRING, hwId STRING, fwId STRING, oemId STRING, dev_name STRING, model STRING, deviceId STRING, alias STRING, icon_hash STRING, relay_state INT, on_time INT, feature STRING, updating INT, rssi INT, led_off INT, latitude DOUBLE, longitude DOUBLE, index INT, zone_str STRING, tz_str STRING, dst_offset INT, day31 DOUBLE, day30 DOUBLE, day29 DOUBLE, day28 DOUBLE, day27 DOUBLE, day26 DOUBLE, day25 DOUBLE, day24 DOUBLE, day23 DOUBLE, day22 DOUBLE, day21 DOUBLE, day20 DOUBLE, day19 DOUBLE, day18 DOUBLE, day17 DOUBLE, day16 DOUBLE, day15 DOUBLE, day14 DOUBLE, day13 DOUBLE, day12 DOUBLE, day11 DOUBLE, day10 DOUBLE, day9 DOUBLE, day8 DOUBLE, day7 DOUBLE, day6 DOUBLE, day5 DOUBLE, day4 DOUBLE, day3 DOUBLE, day2 DOUBLE, day1 DOUBLE, current DOUBLE, voltage DOUBLE, power DOUBLE, total DOUBLE, time STRING, ledon BOOLEAN, systemtime STRING) STORED AS ORC
LOCATION '/smartPlug'

A Simple Query on Some of the Variables

select `current`, voltage, power, total, time, systemtime, on_time, rssi, latitude, longitude
from smartPlug

Note that current is a reserved word in SQL, so we backtick it.

An Apache Calcite Query Inside Apache NiFi

SELECT * FROM FLOWFILE WHERE "current" > 0

With the Python API I can turn the plug off, so don't monitor it then. In an updated article I will add a few smart plugs and turn them on and off based on events occurring, perhaps turning off a light when no motion is detected. We can do anything with Apache NiFi, Apache MiNiFi and Python. The API also allows for turning the green LED light on the plug on and off.

The screen prints above are from the iOS version of the TP-Link Kasa app, which lets you configure and monitor your plug. For many people that's good enough, but not for me.

Resources

smartplugprocessing.xml
monitorpowerlocal.xml
https://github.com/GadgetReactor/pyHS100
https://pypi.python.org/pypi/pyHS100
pip3 install pyhs100
https://github.com/tspannhw/nifi-smartplug/tree/master
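Since the meter only reports the hours and days that had usage, each JSON row is sparse relative to the full smartPlug table. A minimal sketch of padding the row with zeros before conversion, so every hour0-hour23 and day1-day31 column gets a value (the `densify` helper is hypothetical, not part of the original scripts):

```python
import json

def densify(row):
    """Fill in any missing hourN/dayN keys with 0.0 so the row
    lines up with the full smartPlug table schema."""
    for h in range(24):
        row.setdefault("hour%s" % h, 0.0)
    for d in range(1, 32):
        row.setdefault("day%s" % d, 0.0)
    return row

# A trimmed-down sparse reading like the example data above
sparse = json.loads('{"hour19": 0.036, "hour20": 0.021, "day12": 0.074}')
dense = densify(sparse)
print(dense["hour0"], dense["hour19"], dense["day12"])  # 0.0 0.036 0.074
```

This keeps the Hive DDL simple at the cost of storing zeros; at millions of devices the sparse-friendly Apache HBase route mentioned above would make more sense.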
12-14-2017
09:57 PM
2 Kudos
I needed to build a quick SQL table from a JSON document. There are some online tools, but I'd rather Java this process. It works well enough; now I am wondering if this would make a good Apache NiFi processor.

package com.dataflowdeveloper.processors.process;

import java.util.Iterator;
import java.util.Map;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class JsonToDDL {

    /**
     * @param tableName
     * @param json
     * @return String DDL SQL
     */
    public String parse(String tableName, String json) {
        JsonFactory factory = new JsonFactory();
        StringBuilder sql = new StringBuilder(256);
        sql.append("CREATE TABLE ").append(tableName).append(" ( ");
        ObjectMapper mapper = new ObjectMapper(factory);
        JsonNode rootNode = null;
        try {
            rootNode = mapper.readTree(json);
        } catch (Exception e) {
            e.printStackTrace();
        }
        Iterator<Map.Entry<String, JsonNode>> fieldsIterator = rootNode.fields();
        while (fieldsIterator.hasNext()) {
            Map.Entry<String, JsonNode> field = fieldsIterator.next();
            System.out.println("Key: " + field.getKey() + "\tValue:" + field.getValue());
            sql.append(field.getKey());
            if (field.getValue().canConvertToInt()) {
                sql.append(" INT, ");
            } else if (field.getValue().canConvertToLong()) {
                sql.append(" LONG, ");
            } else if (field.getValue().asText().contains("/")) {
                sql.append(" DATE, ");
            } else if (field.getValue().asText().contains("-")) {
                sql.append(" DATE, ");
            } else if (field.getValue().asText().length() > 25) {
                sql.append(" VARCHAR(").append(field.getValue().asText().length() + 25).append("), ");
            } else {
                sql.append(" VARCHAR(25), ");
            }
        }
        // trim the trailing comma and close the statement
        sql.deleteCharAt(sql.length() - 2);
        sql.append(" ) ");
        return sql.toString();
    }

    public static void main(String[] args) {
        JsonToDDL ddl = new JsonToDDL();
        String json = "{\"EMP_ID\":3001,\"DURATION_SEC\":288000,\"LOG_DATE\":\"2017-11-07 10:00:00\"}";
        String ddlSQL = ddl.parse("TIME_LOG", json);
        System.out.println("DDL=" + ddlSQL);
        json = " {\"EMP_ID\":4001,\"GENDER\": \"M\",\"DEPT_ID\":4, \"FIRST_NAME\":\"Brett\",\"LAST_NAME\" :\"Lee\"}";
        ddlSQL = ddl.parse("EMPLOYEE", json);
        System.out.println("DDL=" + ddlSQL);
        json = "{\"DEPT_ID\":1,\"CODE\": \"FN\",\"NAME\":\"Finance\",\"DESCRIPTION\" :\"Finance Department\",\"ACTIVE\":1}";
        ddlSQL = ddl.parse("DEPARTMENT", json);
        System.out.println("DDL=" + ddlSQL);
    }
}
In tests it looks okay. I do some guessing of what type something should be from the JSON value. I was also thinking I could hook this up to Avro Tools to do some other type investigation. So should I make this a processor, or just keep it as a little script?

Reference:

https://github.com/kite-sdk/kite
http://kitesdk.org/docs/1.1.0/Inferring-a-Schema-from-a-Java-Class.html
https://avro.apache.org/docs/1.8.1/api/java/org/apache/avro/reflect/package-summary.html
https://avro.apache.org/docs/1.8.2/gettingstartedjava.html#Serializing+and+deserializing+with+code+generation
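For comparison, the same type-guessing idea fits in a few lines of Python. This is a rough sketch of the approach (the `json_to_ddl` function is my own illustration, not the processor code), with the same crude heuristics, such as treating any string containing "-" or "/" as a DATE:

```python
import json

def json_to_ddl(table_name, doc):
    """Guess a SQL type per field from a flat JSON document,
    mirroring the type-guessing heuristics in the Java class above."""
    cols = []
    for key, value in json.loads(doc).items():
        if isinstance(value, bool):
            sql_type = "BOOLEAN"
        elif isinstance(value, int):
            sql_type = "INT"
        elif isinstance(value, str) and ("/" in value or "-" in value):
            sql_type = "DATE"  # same rough date guess as the Java version
        elif isinstance(value, str) and len(value) > 25:
            sql_type = "VARCHAR(%d)" % (len(value) + 25)
        else:
            sql_type = "VARCHAR(25)"
        cols.append("%s %s" % (key, sql_type))
    return "CREATE TABLE %s ( %s )" % (table_name, ", ".join(cols))

print(json_to_ddl("TIME_LOG",
    '{"EMP_ID":3001,"DURATION_SEC":288000,"LOG_DATE":"2017-11-07 10:00:00"}'))
# CREATE TABLE TIME_LOG ( EMP_ID INT, DURATION_SEC INT, LOG_DATE DATE )
```

Either way, the date and length heuristics will misfire on things like phone numbers or hyphenated names, which is exactly why a schema-inference library like Kite or Avro Tools is worth a look.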
12-11-2017
03:02 PM
2 Kudos
Building schemas is tedious work and fraught with errors. The InferAvroSchema processor can get you started: it generates a compliant schema for use. There is one caveat: you have to make sure you are using Apache Avro safe field names. I have a custom processor that will clean your attributes if you need them Avro-safe (see the processor listed below).

Example Flow Utilizing InferAvroSchema

InferAvroSchema Details

Step 0: Use Apache NiFi to convert data to JSON or CSV.
Step 1: Send the JSON or CSV data to InferAvroSchema. I recommend setting the output destination to flowfile-attribute, the input content type to json, and pretty avro output to true.
Step 2: The new schema is now in the attribute inferred.avro.schema.

inferred.avro.schema

{ "type" : "record", "name" : "schema1", "fields" : [ { "name" : "table", "type" : "string", "doc" : "Type inferred from '\"schema1.tableName\"'" } ] }

This schema can then be used for conversions directly or stored in the Hortonworks Schema Registry or Apache NiFi's built-in Avro registry. Now you can use it for ConvertRecord, QueryRecord and other record processing.

Example Generated Schema in Avro-JSON Format Stored in Hortonworks Schema Registry:

Source: https://github.com/tspannhw/nifi-attributecleaner-processor
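For flat JSON records, the kind of inference InferAvroSchema performs is easy to approximate outside NiFi. A rough Python sketch (my own illustration, not the processor's actual implementation) that maps each field to an Avro primitive type:

```python
import json

def infer_avro_schema(name, doc):
    """Build an Avro record schema by mapping each flat JSON field's
    Python type to an Avro primitive type."""
    type_map = {bool: "boolean", int: "long", float: "double", str: "string"}
    fields = [{"name": k, "type": type_map.get(type(v), "string")}
              for k, v in json.loads(doc).items()]
    return {"type": "record", "name": name, "fields": fields}

schema = infer_avro_schema("schema1", '{"table": "schema1.tableName"}')
print(json.dumps(schema))
```

A real inferrer also has to handle nulls, nested records, and unions, which is where the processor (and a registry to version the result) earns its keep.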