Created on 09-09-2016 08:13 PM - edited 08-17-2019 10:12 AM
Sensor Reading with Apache NiFi 1.0.0
There are many types of sensors, devices and meters that can be great sources of data. Some can push data, some can pull data, some provide APIs, and some let you install software on them.
How To Access Sensors
One option, if you have root access, is to install MiNiFi on the device. This gives you fast local access for scripting and managing the data on the device itself. For bigger devices, another option is to install a full Java-based NiFi node.
It gets harder once you have tens of thousands of devices. In that case you can install an HDF edge node and have it communicate with your HDF cluster via the Site-to-Site protocol. The edge node acts as an accumulator for many devices. This is a good idea because you avoid sending 10,000 network requests a second from each set of devices, and keeping as much traffic local as possible saves time, time-outs, networking and cloud costs. You can also aggregate and send larger batches of data, and compute some summaries and aggregates locally in NiFi. This also lets you populate local databases, dashboards and statistics that may only be of interest at the source of the sensors (perhaps a plant manager or an automated monitoring system).
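As a rough illustration of the local summarizing an edge node can do before forwarding data upstream, the Python sketch below collapses a batch of readings into a per-field count, min, max and mean. It is plain Python rather than a NiFi processor, and the function name and summary layout are hypothetical; the field names follow the sensor JSON used in this article.

```python
from statistics import mean


def summarize_batch(readings, fields=("temp", "humidity", "pressure")):
    """Collapse a batch of reading dicts into one summary record.

    For each field, report count, min, max and mean (rounded to one
    decimal). Readings that lack a field are skipped for that field.
    """
    summary = {}
    for field in fields:
        values = [r[field] for r in readings if field in r]
        if not values:
            continue  # no data for this field in this batch
        summary[field] = {
            "count": len(values),
            "min": min(values),
            "max": max(values),
            "mean": round(mean(values), 1),
        }
    return summary


if __name__ == "__main__":
    batch = [
        {"temp": 40.0, "humidity": 40.8, "pressure": 1014.1},
        {"temp": 39.6, "humidity": 41.2, "pressure": 1014.0},
        {"temp": 40.4, "humidity": 40.5},
    ]
    print(summarize_batch(batch))
```

Sending one such summary per batch instead of every raw reading is what keeps upstream traffic small, while the raw readings can still feed local dashboards.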
Another option is to have devices push data to a local or remote NiFi install, or have NiFi pull data from them, via various protocols including TCP/IP, UDP/IP, REST over HTTP, JMS, MQTT, SFTP and email.
Device Push to NiFi
Your device can send messages to NiFi via any of the protocols listed above. In my example, I push via MQTT, and my local NiFi node consumes these messages with the ConsumeMQTT processor.
Reference: Paho-MQTT
Your device will need to run Linux (or something similar) and have Python 2.7 or later and pip installed. With pip, you can install the Eclipse Paho library needed to send MQTT messages: pip install paho-mqtt
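import paho.mqtt.client as paho

# Connect to the MQTT broker (host, port, keepalive in seconds)
client = paho.Client()
client.connect("servername", 1883, 60)
# Publish a test message to the "sensor" topic that NiFi consumes
client.publish("sensor", payload="Test", qos=0, retain=True)
client.disconnect()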
Here "servername" is the name of the server you are sending the message to; it could be the NiFi node itself, another server, a bigger device, a central aggregator or a messaging server. I recommend placing it as close on the network as possible. "sensor" is the name of the topic we publish the message to, and NiFi will consume it from there. I have a cron job set up to run every minute and publish messages (* * * * * /opt/demo/sendit.sh).
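A cron-driven publisher like the one described above could look roughly like the following Python sketch, for example as a hypothetical `sendit.py` called from `sendit.sh`. It assumes paho-mqtt is installed and a broker at the placeholder host `servername`; `read_sensors()` is a hypothetical stub you would replace with real sensor reads, and the `ts` timestamp field is an assumed addition. It uses paho's `publish.single()` helper, which connects, publishes one message and disconnects; paho is imported inside `publish()` so the payload logic can be tried without a broker.

```python
import json
import time


def read_sensors():
    """Hypothetical stub: replace with real sensor reads (e.g. SenseHat calls)."""
    return {"temp": 40.0, "humidity": 40.8, "pressure": 1014.1}


def build_payload(reading, ts=None):
    """Wrap one reading in the {"readings": [...]} shape used in this article,
    adding a Unix timestamp field "ts" (an assumed extra field)."""
    record = dict(reading)
    record["ts"] = int(time.time() if ts is None else ts)
    return json.dumps({"readings": [record]}, sort_keys=True)


def publish(payload, host="servername", topic="sensor", port=1883):
    """Connect, publish one retained QoS 0 message, and disconnect."""
    # Imported here so build_payload() works without paho-mqtt installed
    import paho.mqtt.publish as mqtt_publish

    mqtt_publish.single(topic, payload=payload, qos=0, retain=True,
                        hostname=host, port=port)


if __name__ == "__main__":
    # One message per run; cron provides the every-minute schedule
    publish(build_payload(read_sensors()))
```

Keeping each run short-lived like this fits the cron model: no long-running client has to be supervised on the device.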
NiFi Poll Device
NiFi can also poll your device and consume data via various protocols such as JMS, MQTT, SFTP, TCP and UDP. For my example, I chose a REST API over HTTP, since HTTP usually gets past firewall hurdles more easily.
I set up a Flask server on the Raspberry Pi (RPi) to serve my REST API, and I start it from a shell script:
export FLASK_APP=hello.py
flask run --host=0.0.0.0 --port=8888 --no-debugger
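To check the endpoint outside NiFi, the GET that NiFi issues on each scheduled run can be approximated with a short Python 3 client using only the standard library. This is a sketch: the host name `raspberrypi.local` is a placeholder, while port 8888 comes from the `flask run` command and the path from the sensor code's routes.

```python
import json
import urllib.request

# Placeholder host; port and route match the flask run command and app routes
SENSORS_URL = "http://raspberrypi.local:8888/pi/api/v1.0/sensors"


def parse_readings(body):
    """Decode a {"readings": [...]} response body and return the list."""
    doc = json.loads(body.decode("utf-8"))
    readings = doc.get("readings", [])
    if not isinstance(readings, list):
        raise ValueError("'readings' should be a list")
    return readings


def poll(url=SENSORS_URL, timeout=5):
    """Issue one GET, as NiFi would on each scheduled run, and parse it."""
    with urllib.request.urlopen(url, timeout=timeout) as resp:
        return parse_readings(resp.read())


if __name__ == "__main__":
    for reading in poll():
        print(reading)
```

If this returns readings from another machine on the network, the same URL should work from NiFi's HTTP processor.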
To install Flask, you need to run pip install flask
Sensor Reading Code
#!flask/bin/python
import json
import subprocess

import paho.mqtt.client as paho
from flask import Flask, jsonify
from sense_hat import SenseHat

sense = SenseHat()
sense.clear()

app = Flask(__name__)


@app.route('/pi/api/v1.0/sensors', methods=['GET'])
def get_sensors():
    # CPU temperature from the Raspberry Pi firmware tool, e.g. "temp=55.8'C\n"
    p = subprocess.Popen(['/opt/vc/bin/vcgencmd', 'measure_temp'],
                         stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                         universal_newlines=True)
    out, err = p.communicate()
    # Sense HAT readings, rounded to one decimal place
    temp = round(sense.get_temperature(), 1)
    temph = round(sense.get_temperature_from_humidity(), 1)
    tempp = round(sense.get_temperature_from_pressure(), 1)
    humidity = round(sense.get_humidity(), 1)
    pressure = round(sense.get_pressure(), 1)
    tasks = [
        {
            'tempp': tempp,
            'temph': temph,
            'cputemp': out,
            'temp': temp,
            # The standard Celsius-to-Fahrenheit formula is (C * 1.8) + 32;
            # this + 12 offset is what produces the tempf value in the HDFS sample
            'tempf': ((temp * 1.8) + 12),
            'humidity': humidity,
            'pressure': pressure
        }
    ]
    # Optionally also push the readings over MQTT each time we are called.
    # json.dumps is needed here: jsonify() returns a Flask Response, not a payload string.
    client = paho.Client()
    client.connect("mqttmessageserver", 1883, 60)
    client.publish("sensor", payload=json.dumps({'readings': tasks}), qos=0, retain=True)
    client.disconnect()
    return jsonify({'readings': tasks})


@app.route('/pi/api/v1.0/location', methods=['GET'])
def get_loc():
    # Orientation (pitch, roll, yaw) and raw accelerometer x, y, z
    orientation = sense.get_orientation()
    acceleration = sense.get_accelerometer_raw()
    tasks = [
        {
            'pitch': orientation['pitch'],
            'roll': orientation['roll'],
            'yaw': orientation['yaw'],
            'x': round(acceleration['x'], 0),
            'y': round(acceleration['y'], 0),
            'z': round(acceleration['z'], 0)
        }
    ]
    return jsonify({'readings': tasks})


@app.route('/pi/api/v1.0/show', methods=['GET'])
def get_pi():
    temp = round(sense.get_temperature(), 1)
    humidity = round(sense.get_humidity(), 1)
    pressure = round(sense.get_pressure(), 1)
    # Scroll the readings across the 8x8 RGB LED grid in red
    sense.clear()
    info = 'T(C): ' + str(temp) + ' H: ' + str(humidity) + ' P: ' + str(pressure)
    sense.show_message(info, text_colour=[255, 0, 0])
    sense.clear()
    tasks = [
        {
            'temp': temp,
            'tempf': ((temp * 1.8) + 12),  # same + 12 offset as in get_sensors()
            'humidity': humidity,
            'pressure': pressure
        }
    ]
    return jsonify({'readings': tasks})


if __name__ == '__main__':
    app.run(debug=True)
The device I am testing with is a Raspberry Pi 3 Model B with a Sense HAT add-on board. Besides sensors for temperature, humidity and barometric pressure, it also has an 8x8 LED grid for displaying text and simple graphics. We can use this to show messages (sense.show_message) or warnings that we send from NiFi. This allows very visceral two-way communication with remote devices, for example to notify local personnel of conditions.
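The NiFi-to-device direction needs an endpoint on the Pi that accepts text. The sketch below is one hypothetical way to do it with only the Python standard library: NiFi would POST the message body to the Pi, and the handler passes the text to a display function. On the Pi you would pass a wrapper around `sense.show_message`; the port 8889, the 64-character cap and all function names are assumptions for illustration, and the same `handle_message()` logic could equally live in a new route on the Flask app above.

```python
from http.server import BaseHTTPRequestHandler, HTTPServer

MAX_CHARS = 64  # hypothetical cap so long messages don't scroll for minutes


def handle_message(body, display):
    """Decode a POSTed message, trim it, and hand it to display()."""
    text = body.decode("utf-8", errors="replace").strip()
    if not text:
        return 400, "empty message"
    text = text[:MAX_CHARS]
    display(text)
    return 200, text


def make_handler(display):
    """Build a request handler class bound to the given display function."""
    class MessageHandler(BaseHTTPRequestHandler):
        def do_POST(self):
            length = int(self.headers.get("Content-Length", 0))
            status, detail = handle_message(self.rfile.read(length), display)
            self.send_response(status)
            self.send_header("Content-Type", "text/plain; charset=utf-8")
            self.end_headers()
            self.wfile.write(detail.encode("utf-8"))

    return MessageHandler


if __name__ == "__main__":
    # On the Pi, something like:
    #   from sense_hat import SenseHat
    #   sense = SenseHat()
    #   display = lambda t: sense.show_message(t, text_colour=[255, 0, 0])
    display = print  # fallback so the sketch runs without a Sense HAT
    HTTPServer(("0.0.0.0", 8889), make_handler(display)).serve_forever()
```

Keeping the decode-and-trim logic in a plain function makes it easy to test without hardware and to swap the HTTP layer.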
NiFi 1.0.0 Flows
JSON File Landed in HDFS in our HDP 2.5 Cluster
[root@myserverhdp sensors]# hdfs dfs -ls /sensor
Found 2 items
-rw-r--r-- 3 root hdfs 202 2016-09-09 17:26 /sensor/181528179026826
drwxr-xr-x - hdfs hdfs 0 2016-09-09 15:43 /sensor/failure
[root@tspanndev13 sensors]# hdfs dfs -cat /sensor/181528179026826
{
  "readings": [
    {
      "cputemp": "temp=55.8'C\n",
      "humidity": 40.8,
      "pressure": 1014.1,
      "temp": 40.0,
      "tempf": 84.0,
      "temph": 40.0,
      "tempp": 39.1
    }
  ]
}
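Note that `cputemp` arrives as the raw `vcgencmd` string (`temp=55.8'C\n`) rather than a number. If downstream consumers such as Hive or Phoenix need a numeric column, a small normalization step like this Python sketch could be applied on the device before the JSON is built, or in a scripted step in the flow; the function names are hypothetical.

```python
import json
import re

# Matches vcgencmd output such as "temp=55.8'C"
_CPU_TEMP = re.compile(r"temp=(-?\d+(?:\.\d+)?)'C")


def parse_cpu_temp(raw):
    """Return the CPU temperature in Celsius as a float, or None if unparseable."""
    match = _CPU_TEMP.search(raw or "")
    return float(match.group(1)) if match else None


def normalize(doc):
    """Replace each reading's cputemp string with a float, in place."""
    for reading in doc.get("readings", []):
        reading["cputemp"] = parse_cpu_temp(reading.get("cputemp"))
    return doc


if __name__ == "__main__":
    sample = '{"readings": [{"cputemp": "temp=55.8\'C\\n", "temp": 40.0}]}'
    print(json.dumps(normalize(json.loads(sample))))
```

Returning None for unparseable input keeps one bad reading from failing the whole record.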
The final result of our flow is a JSON file on HDFS. We could easily send a copy of the data to Phoenix via PutSQL, to Hive via PutHiveQL, or to Spark Streaming for additional processing via Site-to-Site or Kafka.
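For the Phoenix route, PutSQL expects the SQL statement in the FlowFile content with `?` placeholders, and each parameter's JDBC type code and value in `sql.args.N.type` and `sql.args.N.value` attributes. The Python sketch below only shows the shape of that statement-plus-attributes pair for one reading; the table name `SENSOR_READINGS`, its columns and the DOUBLE/VARCHAR type choice are assumptions, and a real Phoenix table would also need its primary key column (for example a device id plus timestamp) in the statement.

```python
JDBC_DOUBLE = 8    # java.sql.Types.DOUBLE
JDBC_VARCHAR = 12  # java.sql.Types.VARCHAR


def to_putsql(reading, table="SENSOR_READINGS",
              columns=("temp", "tempf", "humidity", "pressure")):
    """Build a parameterized Phoenix UPSERT plus sql.args.N.* attributes.

    Only columns present in the reading are included; numbers are typed
    as DOUBLE and everything else as VARCHAR.
    """
    cols = [c for c in columns if c in reading]
    sql = "UPSERT INTO {} ({}) VALUES ({})".format(
        table,
        ", ".join(c.upper() for c in cols),
        ", ".join("?" for _ in cols),
    )
    attrs = {}
    for n, col in enumerate(cols, start=1):  # PutSQL parameters are 1-based
        value = reading[col]
        is_number = isinstance(value, (int, float)) and not isinstance(value, bool)
        attrs["sql.args.%d.type" % n] = str(JDBC_DOUBLE if is_number else JDBC_VARCHAR)
        attrs["sql.args.%d.value" % n] = str(value)
    return sql, attrs


if __name__ == "__main__":
    print(to_putsql({"temp": 40.0, "tempf": 84.0, "humidity": 40.8, "pressure": 1014.1}))
```

Parameterized values avoid quoting problems that come from pasting sensor values straight into the SQL text.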
Resources:
https://github.com/topshed/RPi_8x8GridDraw
https://www.raspberrypi.org/learning/sense-hat-data-logger/worksheet/
https://www.raspberrypi.org/learning/astro-pi-flight-data-analysis/worksheet/
https://www.raspberrypi.org/learning/astro-pi-guide/sensors/temperature.md
https://breadfit.wordpress.com/2015/06/24/installing-mosquitto-under-centos/
Created on 09-14-2016 11:29 PM
To install a Mosquitto MQTT server on CentOS 7:
yum -y install unzip
Step 1: Add the CentOS 7 mosquitto repository
cd /etc/yum.repos.d
wget http://download.opensuse.org/repositories/home:/oojah:/mqtt/CentOS_CentOS-7/home:oojah:mqtt.repo
sudo yum update
Step 2: Install mosquitto & mosquitto-clients
sudo yum install -y mosquitto mosquitto-clients
Step 3: Run mosquitto
sudo su
/usr/sbin/mosquitto -d -c /etc/mosquitto/mosquitto.conf > /var/log/mosquitto.log 2>&1
Created on 09-15-2016 02:15 AM
Flow File: sensor.xml