Community Articles

Find and share helpful community-sourced technical articles.
Labels (2)
Super Guru


The MiniFi flow executes two scripts: one to call TensorFlow Python that captures an OpenCV Raspberry Pi Camera and runs Inception on it. That message is formatted as JSON and sent on. The second script reads GPS values from a USB GSP sensor and outputs JSON. Get File reads the Pi Camera image produced by the ClassifyImages process. Cleanup logs is a standalone timed script that cleans up old logs on the Raspberry Pi.


Using InferredAvroSchema I created a schema for the GPS unit and stored it in the Hortonworks Schema Registry.


This is the provenance event for a typical GPS message sent. You can see what shell script we ran and from what host.


In Apache NiFi we process the message, routing to the correct place, setting a schema and querying it for a latitude. Then we convert the AVRO record to ORC to save as a Hive table.

MiniFi requires we change the NiFi created template to a configuration file via the command-line MiniFi Toolkit.

minifi-toolkit-0.2.0/bin/ transform gpstensorflowpiminifi2.xml config.yml 
scp config.yml pi@  

{"ipaddress": "", "utc": "2017-08-21T20:00:06.000Z", "epx": "10.301", "epv": "50.6", "serialno": "000000002a1f1e34", "altitude": "38.393", "cputemp": 58.0, "eps": "37.16", 
"longitude": "-74.52923472", "ts": "2017-08-21 20:00:03", "public_ip": "", "track": "236.6413", "host": "vid5", "mode": "3", "time": "2017-08-21T20:00:06.000Z", 
"latitude": "40.268194845", "climb": "-0.054", "speed": "0.513", "ept": "0.005"}

2017-08-21 16:20:33,199 INFO [Timer-Driven Process Thread-6] o.apache.nifi.remote.client.PeerSelector New Weighted Distribution of Nodes:
PeerStatus[hostname=HW13125.local,port=8080,secure=false,flowFileCount=0] will receive 100.0% of data
2017-08-21 16:20:34,261 INFO [Timer-Driven Process Thread-6] o.a.nifi.remote.StandardRemoteGroupPort RemoteGroupPort[name=MiniFi TensorFlowImage,targets=http://hw13125.local:8080/nifi] 
Successfully sent [StandardFlowFileRecord[uuid=f84767ec-c627-4b63-9e88-bba1dfb4eb9b,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1503346615133-2, container=default, section=2], offset=2198, length=441],offset=0,name=3460526041973,size=441]] (441 bytes) to http://HW13125.local:8080/nifi-api in 117 milliseconds at a rate of 3.65 KB/sec

{"ipaddress": "", "utc": "2017-08-21T20:17:21.010Z", "epx": "10.301", "epv": "50.6", "serialno": "000000002a1f1e34", 
"altitude": "43.009", "cputemp": 52.0, "eps": "1.33", "longitude": "-74.529242206", "ts": "2017-08-21 20:16:55", "public_ip": "", 
"track": "190.894", "host": "vid5", "mode": "3", "time": "2017-08-21T20:17:21.010Z", "latitude": "40.268159632", 
"climb": "0.022", "speed": "0.353", "ept": "0.005"} 

To collect our GPS information, below is my script called by MiniFi.


#! /usr/bin/python

import os
from gps import *
from time import *
import time
import threading
import json
import time
import colorsys
import os
import json
import sys, socket
import subprocess
import time
import datetime
from time import sleep
from time import gmtime, strftime
import signal
import time
import urllib2

# Need sudo apt-get install gpsd gpsd-clients python-gps ntp
# Based on
#Author: Callum Pritchard, Joachim Hummel
#Project Name: Flick 3D Gesture
#Project Description: Sending Flick 3D Gesture sensor data to mqtt
#Version Number: 0.1
#Date: 15/6/17
#Release State: Alpha testing
#Changes: Created
# Based on
# Written by Dan Mandle September 2012
# License: GPL 2.0

# Based on:

#### Initialization
# yyyy-mm-dd hh:mm:ss
currenttime= strftime("%Y-%m-%d %H:%M:%S",gmtime())

external_IP_and_port = ('', 53)  #
socket_family = socket.AF_INET

host = os.uname()[1]

def getCPUtemperature():
    res = os.popen('vcgencmd measure_temp').readline()

def IP_address():
            s = socket.socket(socket_family, socket.SOCK_DGRAM)
            answer = s.getsockname()
            return answer[0] if answer else None
        except socket.error:
            return None

# Get Raspberry Pi Serial Number
def get_serial():
  # Extract serial from cpuinfo file
  cpuserial = "0000000000000000"
    f = open('/proc/cpuinfo','r')
    for line in f:
      if line[0:6]=='Serial':
        cpuserial = line[10:26]
    cpuserial = "ERROR000000000"

  return cpuserial

# Get Raspberry Pi Public IP via IPIFY Rest Call
def get_public_ip():
  ip = json.load(urllib2.urlopen(''))['ip']
  return ip

ipaddress = IP_address()
# Attempt to get Public IP
public_ip = get_public_ip()

# Attempt to get Raspberry Pi Serial Number
serial = get_serial()

gpsd = None

class GpsPoller(threading.Thread):
  def __init__(self):
    global gpsd #bring it in scope
    gpsd = gps(mode=WATCH_ENABLE) #starting the stream of info
    self.current_value = None
    self.running = True #setting the thread running to true

  def run(self):
    global gpsd
    while gpsp.running: #this will continue to loop and grab EACH set of gpsd info to clear the buffer

if __name__ == '__main__':
  gpsp = GpsPoller() # create the thread
  stopthis = False
    gpsp.start() # start it up
    while not stopthis:
      if gpsd.fix.latitude > 0:
        row = { 'latitude': str(gpsd.fix.latitude),
         'longitude': str(gpsd.fix.longitude),
         'utc': str(gpsd.utc),
         'time':   str(gpsd.fix.time),
         'altitude': str(gpsd.fix.altitude),
         'eps': str(gpsd.fix.eps),
         'epx': str(gpsd.fix.epx),
         'epv': str(gpsd.fix.epv),
         'ept': str(gpsd.fix.ept),
         'speed': str(gpsd.fix.speed),
         'climb': str(gpsd.fix.climb),
         'track': str(gpsd.fix.track),
         'ts': currenttime,
         'public_ip': public_ip,
         'serialno': serial,
         'host': host,
         'cputemp': round(cpuTemp,2),
         'ipaddress': ipaddress,
         'mode': str(gpsd.fix.mode)}
        json_string = json.dumps(row)
        print json_string
 gpsp.running = False
        stopthis = True

  except (KeyboardInterrupt, SystemExit): #when you press ctrl+c
    gpsp.running = False
    gpsp.join() # wait for the thread to finish what it's doing


Take a Tour of the Community
Don't have an account?
Your experience may be limited. Sign in to explore more.
Version history
Last update:
‎08-17-2019 11:32 AM
Updated by:
Top Kudoed Authors