Created on 12-18-2016 05:51 AM - edited 08-17-2019 07:17 AM
To send data to my firewall-protected internal Hadoop cluster, my remote Raspberry Pi 3 with an attached Sense-Hat runs a cron job that sends MQTT messages to a cloud-hosted MQTT broker. NiFi subscribes to that topic and pulls down the messages asynchronously. I use JSON as the packaging format because it is very easy to work with and process in NiFi and elsewhere.
The Sense-Hat connects to a (non-Zero) Raspberry Pi and provides a number of easy-to-access sensors, including temperature, humidity, pressure, orientation (pitch, roll and yaw) and acceleration (X, Y and Z). It also has an 8x8 RGB LED matrix that can display graphical messages. Beyond the Sense-Hat, the Pi itself reports standard Linux metrics such as CPU temperature, which we can grab as well; memory usage and disk space would also be easy to add. Since the Sense-Hat API is Python, it makes sense to keep my access program in Python too: a small script reads and formats the sensor values, puts them into a JSON document and sends them up to my cloud MQTT broker. The NiFi flow that ingests these values as they arrive is very simple.
I have previously read these values through a REST API I wrote on the Pi using Flask, but that interface requires direct, synchronous access from the cluster to the Pi, which is not usually possible. I would love to push from the Pi directly to NiFi, but that would also require a direct network connection. The asynchronous break provided by the broker is good for performance and also lets either party go offline while the broker holds messages until it returns. (For the broker to actually queue messages for an offline subscriber, MQTT generally requires a persistent subscriber session and QoS 1 or higher; with qos=0 and retain=True, as in the script below, at best only the most recent retained message is kept.)
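The Pi-side metrics mentioned above (CPU temperature, memory, disk space) are not part of the Sense-Hat API, and the script shown later does not collect them. Below is a minimal sketch of reading them with only the Python standard library. It assumes Raspberry Pi OS exposes the SoC temperature in `/sys/class/thermal/thermal_zone0/temp` as an integer number of millidegrees Celsius; the function names are hypothetical, and the parsing is split from the file reads so it can be checked off the Pi.

```python
import shutil

# Assumption: on Raspberry Pi OS the SoC temperature is exposed in this sysfs
# file as an integer number of millidegrees Celsius (e.g. "51540").
CPU_TEMP_PATH = "/sys/class/thermal/thermal_zone0/temp"


def parse_cpu_temp(raw):
    """Convert a sysfs millidegree reading to a Celsius string such as "51.5"."""
    return "{:.1f}".format(int(raw.strip()) / 1000.0)


def read_cpu_temp(path=CPU_TEMP_PATH):
    """Read the CPU temperature from sysfs and convert it."""
    with open(path) as f:
        return parse_cpu_temp(f.read())


def parse_mem_available_kb(meminfo_text):
    """Return the MemAvailable value (in kB) from /proc/meminfo text, or None."""
    for line in meminfo_text.splitlines():
        if line.startswith("MemAvailable:"):
            return int(line.split()[1])
    return None


def read_mem_available_kb(path="/proc/meminfo"):
    """Read /proc/meminfo and return MemAvailable in kB."""
    with open(path) as f:
        return parse_mem_available_kb(f.read())


def disk_free_percent(mount="/"):
    """Free space on a mount point as a percentage, rounded to one decimal."""
    usage = shutil.disk_usage(mount)
    return round(100.0 * usage.free / usage.total, 1)
```

On the Pi you could then add, for example, `'cputemp': read_cpu_temp()` to the row dictionary before calling `json.dumps`. Returning a string such as "51.5" matches the varchar `cputemp` column and the sample record shown later.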
My cloud MQTT provider has a free plan, which I am using for this demo. Its web UI shows statistics and information about the broker, messages, topics and queues.
Once the messages are ingested, I extract the fields I need from the JSON received over MQTT, format them into a Phoenix UPSERT statement (Phoenix uses UPSERT rather than INSERT) and then call the PutSQL processor to write those values to HBase through the Phoenix layer. I use NiFi's UUID() expression language function to generate the primary key, so every ingested row is unique.
Once the data is in Apache Phoenix, it is very easy to explore and graph it in Apache Zeppelin notebooks via the jdbc(phoenix) interpreter.
Phoenix DDL:
CREATE TABLE sensor (
  sensorpk varchar not null primary key,
  cputemp varchar,
  humidity decimal(10,1),
  pressure decimal(10,1),
  temp decimal(5,1),
  tempf decimal(5,1),
  temph decimal(5,1),
  tempp decimal(5,1),
  pitch decimal(10,1),
  roll decimal(10,1),
  yaw decimal(10,1),
  x decimal(10,1),
  y decimal(10,1),
  z decimal(10,1)
);
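The source does not say which NiFi processors sit between the MQTT consumer and PutSQL, so the following is only a Python sketch of the same JSON-to-UPSERT mapping, not the flow itself. It unwraps the one-element JSON array the Pi sends, generates a UUID key (standing in for NiFi's `${UUID()}`), and builds a parameterized UPSERT against the sensor table defined above. `build_upsert` and `SENSOR_COLUMNS` are hypothetical names. The `?` placeholders are ordinary JDBC parameters; PutSQL can fill them from `sql.args.N.type` and `sql.args.N.value` FlowFile attributes, though inlining literal values would also work.

```python
import json
import uuid

# Non-key columns in the order of the Phoenix DDL; sensorpk is generated, not read from JSON.
SENSOR_COLUMNS = ["cputemp", "humidity", "pressure", "temp", "tempf", "temph",
                  "tempp", "pitch", "roll", "yaw", "x", "y", "z"]


def build_upsert(payload, key=None):
    """Turn one MQTT payload (a JSON array holding one reading) into a
    parameterized Phoenix UPSERT statement plus its bind values.

    Fields missing from the payload are bound as None (SQL NULL).
    """
    records = json.loads(payload)
    record = records[0] if isinstance(records, list) else records
    key = key or str(uuid.uuid4())  # stands in for NiFi's ${UUID()}
    columns = ["sensorpk"] + SENSOR_COLUMNS
    placeholders = ", ".join("?" for _ in columns)
    sql = "UPSERT INTO sensor ({}) VALUES ({})".format(", ".join(columns), placeholders)
    values = [key] + [record.get(c) for c in SENSOR_COLUMNS]
    return sql, values
```

Binding parameters instead of splicing values into the SQL text avoids quoting problems, for example with the string-typed `cputemp` value.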
HDFS Raw Copy of JSON
[root@tspanndev13 bin]# hdfs dfs -ls /sensors/raw
Found 44 items
-rw-r--r-- 3 root hdfs 232 2016-12-14 21:33 /sensors/raw/6837549415709401
-rw-r--r-- 3 root hdfs 231 2016-12-14 21:33 /sensors/raw/6837551583239075
hdfs dfs -cat /sensors/raw/6837549415709401
[{"tempf": 76.26, "temph": 35.7, "pressure": 1016.5, "tempp": 34.3, "pitch": 5.391706934578461, "temp": 35.7, "yaw": 23.348624309736945, "humidity": 20.3, "cputemp": "51.5", "y": 0.0, "x": 0.0, "z": 1.0, "roll": 2.1213289036024343}]
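The tempf value in this record is not the textbook Celsius-to-Fahrenheit conversion; it matches the script's formula, which adds 12 instead of 32. Worked through for temp = 35.7:

```latex
\begin{aligned}
T_F^{\text{standard}} &= 1.8\,T_C + 32 = 1.8 \times 35.7 + 32 = 96.26 \\
T_F^{\text{script}}   &= 1.8\,T_C + 12 = 1.8 \times 35.7 + 12 = 76.26
\end{aligned}
```

So the stored tempf is 20 °F below the standard conversion. The reason is not stated; presumably it is a rough correction because the Sense-Hat sits close to the Pi's CPU and its temperature sensors read high.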
Python Code
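from sense_hat import SenseHat
import json
import paho.mqtt.client as paho

sense = SenseHat()
sense.clear()

# Environmental sensors, rounded to one decimal place
temp = round(sense.get_temperature(), 1)
humidity = round(sense.get_humidity(), 1)
pressure = round(sense.get_pressure(), 1)

# Orientation in degrees
orientation = sense.get_orientation()
pitch = orientation['pitch']
roll = orientation['roll']
yaw = orientation['yaw']

# Raw accelerometer readings (in g), rounded to whole numbers
acceleration = sense.get_accelerometer_raw()
x = round(acceleration['x'], 0)
y = round(acceleration['y'], 0)
z = round(acceleration['z'], 0)

row = [{
    'temp': temp,
    # The standard conversion adds 32; the +12 here yields a value 20 F lower,
    # presumably to offset heat from the Pi's CPU.
    'tempf': (temp * 1.8) + 12,
    'humidity': humidity,
    'pressure': pressure,
    'pitch': pitch,
    'roll': roll,
    'yaw': yaw,
    'x': x,
    'y': y,
    'z': z,
}]
json_string = json.dumps(row)

client = paho.Client()
client.username_pw_set("myuser", "mypassword")
# "cloudserver" is a placeholder; use your broker's host and port.
# Ports must be 0-65535; 1883 is the standard unencrypted MQTT port.
client.connect("cloudserver", 1883, 60)
client.loop_start()
info = client.publish("sensor", payload=json_string, qos=0, retain=True)
info.wait_for_publish()  # make sure the message is sent before the cron job exits
client.disconnect()
client.loop_stop()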
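To check the message format without a Sense-Hat attached (for example, on the machine where you build the NiFi flow), the formatting part of the script can be pulled into a plain function. `build_payload` is a hypothetical helper, not part of the sense_hat library; it applies the same rounding and the same +12 tempf formula as the script above and returns the JSON string that would be published.

```python
import json


def build_payload(temp, humidity, pressure, orientation, acceleration):
    """Build the JSON string the publisher sends, from plain sensor values.

    orientation: dict with 'pitch', 'roll', 'yaw' in degrees, shaped like the
    result of SenseHat.get_orientation().
    acceleration: dict with 'x', 'y', 'z' in g, shaped like the result of
    SenseHat.get_accelerometer_raw().
    """
    temp = round(temp, 1)
    row = [{
        "temp": temp,
        # Same +12 offset as the script (the standard conversion adds 32).
        "tempf": (temp * 1.8) + 12,
        "humidity": round(humidity, 1),
        "pressure": round(pressure, 1),
        "pitch": orientation["pitch"],
        "roll": orientation["roll"],
        "yaw": orientation["yaw"],
        "x": round(acceleration["x"], 0),
        "y": round(acceleration["y"], 0),
        "z": round(acceleration["z"], 0),
    }]
    return json.dumps(row)
```

On the Pi, the script would call it as `build_payload(sense.get_temperature(), sense.get_humidity(), sense.get_pressure(), sense.get_orientation(), sense.get_accelerometer_raw())`, keeping the hardware reads separate from the formatting logic.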
Please note that the sensors on the Sense-Hat are not industrial grade or highly accurate. For commercial purposes you will want more precise industrial sensors, battery backups, special casing and higher-end devices.