[Image: 10420-pimqttoverview.png (overview of the Pi > MQTT > NiFi flow)]

To send data to my firewall-protected internal Hadoop cluster, my remote Raspberry Pi 3 with an attached Sense HAT uses a cron job to publish MQTT messages to a cloud-hosted MQTT broker. NiFi then subscribes to that topic and pulls down the messages asynchronously. I use JSON as my packaging format because it is very easy to work with and process in NiFi and elsewhere.
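For reference, the cron entry on the Pi might look something like the line below; the script path and one-minute interval are assumptions for illustration, so adjust them for your setup.

# publish the Sense HAT readings once a minute (hypothetical script path)
* * * * * /usr/bin/python /home/pi/sensehat_mqtt.py >> /home/pi/sensehat_mqtt.log 2>&1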

The Sense HAT attaches to the Raspberry Pi's GPIO header (a Pi Zero typically needs a header soldered on first) and provides a number of easy-to-access sensors, including temperature, humidity, pressure, orientation (pitch, roll, yaw) and acceleration (X, Y, Z). It also has a nice 8x8 LED matrix that can be used to display graphical messages. For data capture we can also grab standard Linux readings such as CPU temperature, and you could add memory usage and disk space as well. Since the API for the Sense HAT is Python, it makes sense to keep my access program in Python. So I have a small Python program that reads and formats the Sense HAT sensor values, puts them into a JSON document and sends them up to my cloud MQTT broker. It's a very simple NiFi flow to ingest these values as they arrive. I have previously read these values via a REST API I wrote on the Pi using Flask (sketched below), but that interface requires direct synchronous access from the cluster to the Pi, which is not usually possible. I would love to push from the Pi to NiFi, but again that would require a direct network connection. Having the asynchronous break is great for performance and also allows either party to be offline; the broker queue holds the messages until we return.
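For comparison, a minimal sketch of that earlier Flask approach could look like the following; the route name and port are illustrative assumptions, not the original code.

from flask import Flask, jsonify
from sense_hat import SenseHat

app = Flask(__name__)
sense = SenseHat()

@app.route('/sensors')
def sensors():
    # read a few Sense HAT values on demand and return them as JSON
    return jsonify(temp=round(sense.get_temperature(), 1),
                   humidity=round(sense.get_humidity(), 1),
                   pressure=round(sense.get_pressure(), 1))

if __name__ == '__main__':
    # the cluster would have to reach this port directly, which is the drawback described above
    app.run(host='0.0.0.0', port=5000)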

My cloud MQTT provider has a free plan, and I am using that for this demo. They have a web UI where you can see statistics and information on the broker, messages, topics and queues.

[Image: 10427-cloudmqttdash.png (CloudMQTT dashboard)]

[Image: 10428-cloudmqttmessage.png (CloudMQTT message view)]

Once ingested, I pull out the fields I want from the JSON received from MQTT, format them into a SQL insert and then use the PutSQL processor to upsert those values into HBase through the Phoenix layer. I add NiFi's UUID() expression as the primary key so every ingested row is unique. A sketch of the resulting statement follows.
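For illustration only, the upsert against the sensor table (defined below) would look roughly like this, with the values substituted from the incoming JSON and the key supplied by UUID():

UPSERT INTO sensor (sensorpk, cputemp, humidity, pressure, temp, tempf, temph, tempp, pitch, roll, yaw, x, y, z)
VALUES ('<generated-uuid>', '51.5', 20.3, 1016.5, 35.7, 76.3, 35.7, 34.3, 5.4, 2.1, 23.3, 0.0, 0.0, 1.0);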

[Image: 10425-loadedsensorsphoenix.png (sensor rows loaded into Phoenix)]

Once we have ingested data into Apache Phoenix, it is very easy to display the data for exploration and graphing in Apache Zeppelin notebooks via the jdbc(phoenix) interpreter, as in the example paragraph below.
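A Zeppelin paragraph for this table can be as simple as the following sketch; the interpreter name depends on how your jdbc(phoenix) binding is configured.

%jdbc(phoenix)
select temp, tempf, humidity, pressure, cputemp from sensor limit 100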

[Image: 10424-mqttsensehatzepp2.png (Zeppelin visualization of the sensor data)]

[Image: 10421-mqttsensehatzepp.png (Zeppelin visualization of the sensor data)]

Phoenix DDL:

CREATE TABLE sensor (
    sensorpk varchar not null primary key,
    cputemp varchar,
    humidity decimal(10,1),
    pressure decimal(10,1),
    temp decimal(5,1),
    tempf decimal(5,1),
    temph decimal(5,1),
    tempp decimal(5,1),
    pitch decimal(10,1),
    roll decimal(10,1),
    yaw decimal(10,1),
    x decimal(10,1),
    y decimal(10,1),
    z decimal(10,1)
);

HDFS Raw Copy of JSON

[root@tspanndev13 bin]# hdfs dfs -ls /sensors/raw
Found 44 items
-rw-r--r--   3 root hdfs        232 2016-12-14 21:33 /sensors/raw/6837549415709401
-rw-r--r--   3 root hdfs        231 2016-12-14 21:33 /sensors/raw/6837551583239075
hdfs dfs -cat /sensors/raw/6837549415709401
[{"tempf": 76.26, "temph": 35.7, "pressure": 1016.5, "tempp": 34.3, "pitch": 5.391706934578461, "temp": 35.7, "yaw": 23.348624309736945, "humidity": 20.3, "cputemp": "51.5", "y": 0.0, "x": 0.0, "z": 1.0, "roll": 2.1213289036024343}

Python Code

from sense_hat import SenseHat
import json
import paho.mqtt.client as paho

# read the Sense HAT sensors
sense = SenseHat()
sense.clear()
temp = round(sense.get_temperature(), 1)
temph = round(sense.get_temperature_from_humidity(), 1)   # temperature reported by the humidity sensor
tempp = round(sense.get_temperature_from_pressure(), 1)   # temperature reported by the pressure sensor
humidity = round(sense.get_humidity(), 1)
pressure = round(sense.get_pressure(), 1)

orientation = sense.get_orientation()
pitch = orientation['pitch']
roll = orientation['roll']
yaw = orientation['yaw']

acceleration = sense.get_accelerometer_raw()
x = round(acceleration['x'], 0)
y = round(acceleration['y'], 0)
z = round(acceleration['z'], 0)

# CPU temperature from the standard Linux thermal interface (value is in millidegrees C)
with open('/sys/class/thermal/thermal_zone0/temp') as f:
    cputemp = str(round(int(f.read().strip()) / 1000.0, 1))

# package everything into a JSON document (tempf is the Celsius reading converted to Fahrenheit)
row = [ { 'temp': temp, 'tempf': ((temp * 1.8) + 32), 'temph': temph, 'tempp': tempp,
          'cputemp': cputemp, 'humidity': humidity, 'pressure': pressure,
          'pitch': pitch, 'roll': roll, 'yaw': yaw, 'x': x, 'y': y, 'z': z } ]
json_string = json.dumps(row)

# publish the JSON to the cloud MQTT broker on the "sensor" topic
client = paho.Client()
client.username_pw_set("myuser", "mypassword")
client.connect("cloudserver", 1883, 60)   # use the host and port assigned by your broker
client.publish("sensor", payload=json_string, qos=0, retain=True)
client.loop(2)   # give the client a chance to flush the message before the script exits
client.disconnect()

Please note that the sensors on the Sense HAT are not industrial grade or extremely accurate. For commercial purposes you will want more precise industrial sensors, battery backups, special casing and higher-end devices.

Sense HAT Features:

  • Gyroscope
  • Accelerometer
  • Magnetometer
  • Barometer
  • Temperature sensor
  • Relative Humidity sensor
  • 8x8 LED matrix display
  • 5 button joystick