Created on 08-21-2017 10:02 PM - edited 08-17-2019 11:32 AM
The MiniFi flow executes two scripts. The first calls a TensorFlow Python script that captures an image from the Raspberry Pi Camera with OpenCV and runs Inception on it; the result is formatted as JSON and sent on. The second script reads GPS values from a USB GPS sensor and outputs JSON. Get File reads the Pi Camera image produced by the ClassifyImages process. Cleanup logs is a standalone timed script that cleans up old logs on the Raspberry Pi.
Using the InferAvroSchema processor, I created a schema for the GPS unit and stored it in the Hortonworks Schema Registry.
This is the provenance event for a typical GPS message sent. You can see what shell script we ran and from what host.
In Apache NiFi we process the message: we route it to the correct place, set a schema, and query it for a latitude. Then we convert the Avro record to ORC so it can be saved as a Hive table.
MiniFi requires that we convert the NiFi-created template into a configuration file using the command-line MiniFi Toolkit.
minifi-toolkit-0.2.0/bin/config.sh transform gpstensorflowpiminifi2.xml config.yml
scp config.yml pi@192.168.1.167:/opt/demo/minifi/conf/
./gpsrun.sh
{"ipaddress": "192.168.1.167", "utc": "2017-08-21T20:00:06.000Z", "epx": "10.301", "epv": "50.6", "serialno": "000000002a1f1e34", "altitude": "38.393", "cputemp": 58.0, "eps": "37.16", "longitude": "-74.52923472", "ts": "2017-08-21 20:00:03", "public_ip": "71.168.184.247", "track": "236.6413", "host": "vid5", "mode": "3", "time": "2017-08-21T20:00:06.000Z", "latitude": "40.268194845", "climb": "-0.054", "speed": "0.513", "ept": "0.005"}
2017-08-21 16:20:33,199 INFO [Timer-Driven Process Thread-6] o.apache.nifi.remote.client.PeerSelector New Weighted Distribution of Nodes: PeerStatus[hostname=HW13125.local,port=8080,secure=false,flowFileCount=0] will receive 100.0% of data
2017-08-21 16:20:34,261 INFO [Timer-Driven Process Thread-6] o.a.nifi.remote.StandardRemoteGroupPort RemoteGroupPort[name=MiniFi TensorFlowImage,targets=http://hw13125.local:8080/nifi] Successfully sent [StandardFlowFileRecord[uuid=f84767ec-c627-4b63-9e88-bba1dfb4eb9b,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1503346615133-2, container=default, section=2], offset=2198, length=441],offset=0,name=3460526041973,size=441]] (441 bytes) to http://HW13125.local:8080/nifi-api in 117 milliseconds at a rate of 3.65 KB/sec
{"ipaddress": "192.168.1.167", "utc": "2017-08-21T20:17:21.010Z", "epx": "10.301", "epv": "50.6", "serialno": "000000002a1f1e34", "altitude": "43.009", "cputemp": 52.0, "eps": "1.33", "longitude": "-74.529242206", "ts": "2017-08-21 20:16:55", "public_ip": "71.168.184.247", "track": "190.894", "host": "vid5", "mode": "3", "time": "2017-08-21T20:17:21.010Z", "latitude": "40.268159632", "climb": "0.022", "speed": "0.353", "ept": "0.005"}
To collect our GPS information, below is my script called by MiniFi.
Source:
#! /usr/bin/python
"""Print one JSON record describing the current GPS fix and this Raspberry Pi.

The script starts a background thread that drains reports from the local gpsd
daemon, waits until gpsd reports a position fix, prints a single JSON object
on stdout and exits.  Device metadata (CPU temperature, local and public IP,
serial number, host name) is collected once at start-up and added to the
record.
"""
import os
from gps import *  # provides gps() and WATCH_ENABLE used below
from time import *
import time
import threading
import json
import colorsys
import sys, socket
import subprocess
import datetime
from time import sleep
from time import gmtime, strftime
import signal
import math

try:
    import urllib2  # Python 2
except ImportError:
    # Python 3: urllib.request offers the same urlopen() entry point.
    import urllib.request as urllib2

# Need sudo apt-get install gpsd gpsd-clients python-gps ntp

# Based on
#Author: Callum Pritchard, Joachim Hummel
#Project Name: Flick 3D Gesture
#Project Description: Sending Flick 3D Gesture sensor data to mqtt
#Version Number: 0.1
#Date: 15/6/17
#Release State: Alpha testing
#Changes: Created

# Based on
# Written by Dan Mandle http://dan.mandle.me September 2012
# License: GPL 2.0

# Based on: https://hortonworks.com/tutorial/analyze-iot-weather-station-data-via-connected-data-architecture/se...

#### Initialization

# Lowest gpsd fix mode that carries a position (gpsd: 1 = no fix, 2 = 2D, 3 = 3D).
MIN_FIX_MODE = 2

# Pause between checks for a fix so the main thread does not spin at 100% CPU.
POLL_INTERVAL_SECONDS = 0.2

# yyyy-mm-dd hh:mm:ss
# NOTE: taken once at start-up, so 'ts' is the script start time (UTC), not
# the time of the GPS fix.
currenttime = strftime("%Y-%m-%d %H:%M:%S", gmtime())

external_IP_and_port = ('198.41.0.4', 53)  # a.root-servers.net
socket_family = socket.AF_INET

host = os.uname()[1]


def getCPUtemperature():
    """Return the output of ``vcgencmd measure_temp`` stripped to the number.

    The "temp=" prefix and the trailing "'C" plus newline are removed, so the
    caller gets a numeric string it can pass to float().
    """
    pipe = os.popen('vcgencmd measure_temp')
    try:
        res = pipe.readline()
    finally:
        pipe.close()
    return res.replace("temp=", "").replace("'C\n", "")


def IP_address():
    """Return the local IP address used for outbound traffic, or None.

    Connecting a UDP socket sends no packet; it only makes the OS choose the
    local interface, whose address is then read back with getsockname().
    """
    try:
        s = socket.socket(socket_family, socket.SOCK_DGRAM)
        try:
            s.connect(external_IP_and_port)
            answer = s.getsockname()
        finally:
            s.close()
        return answer[0] if answer else None
    except socket.error:
        return None


# Get Raspberry Pi Serial Number
def get_serial():
    """Return the 'Serial' value from /proc/cpuinfo.

    Returns sixteen zeros when no Serial line exists and "ERROR000000000"
    when the file cannot be read.
    """
    # Extract serial from cpuinfo file
    cpuserial = "0000000000000000"
    try:
        with open('/proc/cpuinfo', 'r') as f:
            for line in f:
                if line[0:6] == 'Serial':
                    cpuserial = line[10:26]
    except (IOError, OSError):
        cpuserial = "ERROR000000000"
    return cpuserial


# Get Raspberry Pi Public IP via IPIFY Rest Call
def get_public_ip(timeout=10):
    """Return the public IP reported by api.ipify.org, or None on failure.

    The lookup is best effort: a missing network connection or an unexpected
    response must not stop the GPS record from being produced, so failures
    yield None (the same convention IP_address() uses).

    timeout -- seconds to wait for the HTTP request before giving up.
    """
    try:
        response = urllib2.urlopen('https://api.ipify.org/?format=json',
                                   timeout=timeout)
        try:
            payload = response.read()
        finally:
            response.close()
        return json.loads(payload.decode('utf-8'))['ip']
    except (IOError, ValueError, KeyError):
        # IOError covers URLError and socket timeouts; ValueError covers a
        # body that is not valid JSON; KeyError a body without an 'ip' field.
        return None


def _has_position_fix(fix):
    """Return True when *fix* holds a usable latitude/longitude.

    The fix mode is tested instead of the sign of the latitude, so positions
    south of the equator (negative latitude) are accepted as well.
    """
    if fix.mode < MIN_FIX_MODE:
        return False
    return not (math.isnan(fix.latitude) or math.isnan(fix.longitude))


# Whole degrees only, as in the records already being published.
cpuTemp = int(float(getCPUtemperature()))
ipaddress = IP_address()

# Attempt to get Public IP
public_ip = get_public_ip()

# Attempt to get Raspberry Pi Serial Number
serial = get_serial()

gpsd = None  # gpsd session shared with the poller thread; set by GpsPoller


class GpsPoller(threading.Thread):
    """Background thread that keeps reading reports from gpsd.

    Reading every report keeps the module-level ``gpsd`` session up to date
    and stops the socket buffer from filling with stale data.  Set
    ``running`` to False to make the thread stop after the next report.
    """

    def __init__(self):
        threading.Thread.__init__(self)
        global gpsd  # bring it in scope
        gpsd = gps(mode=WATCH_ENABLE)  # starting the stream of info
        self.current_value = None
        self.running = True  # setting the thread running to true

    def run(self):
        global gpsd
        # Use this instance's own flag rather than a module-level variable
        # that only exists when the file is run as a script.
        while self.running:
            try:
                # this will continue to loop and grab EACH set of gpsd info
                # to clear the buffer
                gpsd.next()
            except StopIteration:
                # gpsd signals the end of the stream (e.g. lost connection).
                break


if __name__ == '__main__':
    gpsp = GpsPoller()  # create the thread
    try:
        gpsp.start()  # start it up
        # Stop waiting if the poller thread has ended: no further reports
        # can arrive, so a fix will never show up.
        while gpsp.is_alive():
            if _has_position_fix(gpsd.fix):
                row = {
                    'latitude': str(gpsd.fix.latitude),
                    'longitude': str(gpsd.fix.longitude),
                    'utc': str(gpsd.utc),
                    'time': str(gpsd.fix.time),
                    'altitude': str(gpsd.fix.altitude),
                    'eps': str(gpsd.fix.eps),
                    'epx': str(gpsd.fix.epx),
                    'epv': str(gpsd.fix.epv),
                    'ept': str(gpsd.fix.ept),
                    'speed': str(gpsd.fix.speed),
                    'climb': str(gpsd.fix.climb),
                    'track': str(gpsd.fix.track),
                    'ts': currenttime,
                    'public_ip': public_ip,
                    'serialno': serial,
                    'host': host,
                    # float() keeps the value a JSON float (e.g. 58.0) on
                    # both Python 2 and Python 3.
                    'cputemp': round(float(cpuTemp), 2),
                    'ipaddress': ipaddress,
                    'mode': str(gpsd.fix.mode)}
                json_string = json.dumps(row)
                print(json_string)
                break
            sleep(POLL_INTERVAL_SECONDS)
        else:
            sys.stderr.write("GPS poller stopped before a fix was obtained\n")
    except (KeyboardInterrupt, SystemExit):  # when you press ctrl+c
        pass
    finally:
        # Always stop the poller; it is a non-daemon thread and would
        # otherwise keep the process alive after the main thread is done.
        gpsp.running = False
        gpsp.join()  # wait for the thread to finish what it's doing
Link
https://github.com/tspannhw/dws2017sydney