Created on 12-28-2018 05:15 PM - edited 08-17-2019 05:06 AM
IoT Series: Sensors: Utilizing Breakout Garden Hat: Part 2 - Integrating MQTT, TensorFlow and Kafka Streams
See Part 1: https://community.hortonworks.com/content/kbentry/229522/iot-series-sensors-utilizing-breakout-garde...
In this second part, I have incremented the functionality in the Python capture, MiniFi, NiFi and post-NiFi processing. I have added a Kafka Streams Java application.
With this NiFi flow we are consuming the MQTT and Kafka messages send by the Kafka Streams application.
In one flow, we received MQTT messages, pull out the entire flow as a message and send to a Slack channel.
In another flow we ingest two types of Kafka messages and store the JSON ones that have a schema in an HBase table via the record processor.
In this flow we receive from the local NiFi router that was called by MiniFi over S2S/HTTP(s). We build two types of messages and send them to Kafka 2.0 brokers. One is the full JSON message with a schema, the other is just the temperature. We create a Kafka Key from the UUID. We also process the images sent from MiniFi with my native Java TensorFlow Inception processor.
I decided to try some TensorFlow processing for our infinite sensor loop, it may be too much memory usage, so I may have to pick a different TensorFlow model and switch to TF Lite (https://www.tensorflow.org/lite/devguide). You will not two extra attributes coming from the Python script running on the Raspberry Pi 3B+.
Another thing I wanted to do is try Kafka Streams since in Kafka 2.0 in HDP and HDF we have a fully supported version available. So based on example code I wrote a simple Kafka Streams Java 8 application that reads Kafka JSON messages sent from NiFi 1.8 and check for some conditions and push out data to MQTT and another Kafka topic.
If you don't have an MQTT broker. Here is a quick way to install a Mosquitto MQTT broker on Centos 7.
sudo yum -y install mosquitto /etc/mosquitto/mosquitto.conf mkdir -p /var/log/mosquitto<br>chmod -R 777 /var/log/mosquitto/<br>touch /var/log/mosquitto/mosquitto.log<br>sudo systemctl start mosquitto<br>sudo systemctl enable mosquitto
Now that we have an MQTT broker our Kafka Streams app can send messages to it and NiFi can read messages from it.
In a future version I will use Hortonworks Schema Registry and Avro.
I have updated the Python script to include TensorFlow and to update to Python 3.5. Make sure you run with Python 3.5 and have all the libraries installed on your RPI/Linux device.
Some of the updated code for 3.5, note the message encoding. Python: https://github.com/tspannhw/minifi-breakoutgarden/blob/master/minifi35.py
def send_tcp(s, message): if not message: try: s.sendall(message.encode('utf-8')) <br> except: print("Failed to send message")
For testing IOT values, I have a GenerateFlowFile with this JSON:
{ "systemtime" : "${now():format('MM/dd/yyyy HH:mm:ss')}", "BH1745_green" : "${random():mod(100):plus(1)} ", "ltr559_prox" : "0000", "end" : "${now():format('yyyyMMddHHmmss')}", "uuid" : "${now():format('yyyyMMddHHmmss')}_${UUID()}", "lsm303d_accelerometer" : "+00.06g : -01.01g : +00.04g", "imgnamep" : "images/bog_image_p_${now():format('yyyyMMddHHmmss')}_${UUID()}.jpg", "cputemp" : ${random():mod(100):toNumber()}, "BH1745_blue" : "9.0", "te" : "47.3427119255", "bme680_tempc" : "28.19", "imgname" : "images/bog_image_${now():format('yyyyMMddHHmmss')}_${UUID()}.jpg", "bme680_tempf" : "80.${random():mod(100):toNumber()}", "ltr559_lux" : "006.87", "memory" : 34.9, "VL53L1X_distance_in_mm" : 134, "bme680_humidity" : "${random():mod(100):toNumber()}", "host" : "vid5", "diskusage" : "8732.7", "ipaddress" : "192.168.1.167", "bme680_pressure" : "1017.31", "BH1745_clear" : "10.0", "BH1745_red" : "0.0", "lsm303d_magnetometer" : "+00.04 : +00.34 : -00.10", "starttime" : "${now():format('MM/dd/yyyy HH:mm:ss')}" }
Kafka Streams Source Code:
https://github.com/tspannhw/kstreams
Running the Fat Jar:
java -jar target/kstreams-1.0.jar<br>******************************************* Started <br>**********2018/12/28 16:41:19<br>********** Memory Usage: 28284968
Updated Source Code:
https://github.com/tspannhw/minifi-breakoutgarden
Updated Example Run Output
{ "ltr559_lux" : "033.75", "uuid" : "20181228162321_cbd0cbd3-17f6-4730-ae43-1e7b46a01135", "cputemp" : 51, "host" : "piups", "lsm303d_magnetometer" : "-00.12 : +00.27 : +00.15", "bme680_tempc" : "24.96", "score" : "0.9694475", "lsm303d_accelerometer" : "+00.12g : -01.00g : +00.08g", "ltr559_prox" : "0000", "bme680_humidity" : "28.875", "diskusage" : "10058.7", "human_string" : "electric fan, blower", "bme680_pressure" : "1012.00", "BH1745_green" : "31.0", "imgnamep" : "/opt/demo/images/bog_image_p_20181228162321_cbd0cbd3-17f6-4730-ae43-1e7b46a01135.jpg", "systemtime" : "12/28/2018 11:24:11", "BH1745_red" : "33.0", "starttime" : "12/28/2018 11:16:02", "BH1745_blue" : "19.8", "end" : "1546014251.2879872", "bme680_tempf" : "76.93", "VL53L1X_distance_in_mm" : 455, "te" : "488.33915853500366", "memory" : 70.8, "imgname" : "/opt/demo/images/bog_image_20181228162321_cbd0cbd3-17f6-4730-ae43-1e7b46a01135.jpg", "ipaddress" : "192.168.1.166", "BH1745_clear" : "40.0" }
From Kafka Streams I am sending a warning on temperature to MQTT which NiFi sends to Slack.
Temperature warning 82.74
Using HBase 2.0, we are storing out data as it streams from Kafka Streams to NiFi. We use PutHBaseRecord which utilizes record processing and our schema to stream our JSON into HBase with ease.
Updated Schema with TF Attributes
{ "type": "record", "name": "garden", "fields": [ { "name": "systemtime", "type": "string" }, { "name": "BH1745_green", "type": "string" }, { "name": "human_string", "type": "string", "default": "UNK" }, { "name": "ltr559_prox", "type": "string" }, { "name": "end", "type": "string" }, { "name": "uuid", "type": "string" }, { "name": "lsm303d_accelerometer", "type": "string" }, { "name": "score", "type": "string", "default": "0" }, { "name": "imgnamep", "type": "string" }, { "name": "cputemp", "type": "double", "doc": "Type inferred from '58.0'" }, { "name": "BH1745_blue", "type": "string", "doc": "Type inferred from '\"10.8\"'" }, { "name": "te", "type": "string", "doc": "Type inferred from '\"254.545491934\"'" }, { "name": "bme680_tempc", "type": "string", "doc": "Type inferred from '\"29.13\"'" }, { "name": "imgname", "type": "string" }, { "name": "bme680_tempf", "type": "string", "doc": "Type inferred from '\"84.43\"'" }, { "name": "ltr559_lux", "type": "string", "doc": "Type inferred from '\"077.95\"'" }, { "name": "memory", "type": "double", "doc": "Type inferred from '37.6'" }, { "name": "VL53L1X_distance_in_mm", "type": "int", "doc": "Type inferred from '161'" }, { "name": "bme680_humidity", "type": "string", "doc": "Type inferred from '\"32.359\"'" }, { "name": "host", "type": "string", "doc": "Type inferred from '\"vid5\"'" }, { "name": "diskusage", "type": "string", "doc": "Type inferred from '\"8357.6\"'" }, { "name": "ipaddress", "type": "string", "doc": "Type inferred from '\"192.168.1.167\"'" }, { "name": "bme680_pressure", "type": "string", "doc": "Type inferred from '\"987.86\"'" }, { "name": "BH1745_clear", "type": "string", "doc": "Type inferred from '\"90.0\"'" }, { "name": "BH1745_red", "type": "string", "doc": "Type inferred from '\"33.0\"'" }, { "name": "lsm303d_magnetometer", "type": "string" }, { "name": "starttime", "type": "string" } ] }
HBase table
create 'breakout', 'sensors'
Example Row
1546014251.2879872 column=sensors:BH1745_blue, timestamp=1546020326955, value=19.8 1546014251.2879872 column=sensors:BH1745_clear, timestamp=1546020326955, value=40.0 1546014251.2879872 column=sensors:BH1745_green, timestamp=1546020326955, value=31.0 1546014251.2879872 column=sensors:BH1745_red, timestamp=1546020326955, value=33.0 1546014251.2879872 column=sensors:VL53L1X_distance_in_mm, timestamp=1546020326955, value=455 1546014251.2879872 column=sensors:bme680_humidity, timestamp=1546020326955, value=28.875 1546014251.2879872 column=sensors:bme680_pressure, timestamp=1546020326955, value=1012.00 1546014251.2879872 column=sensors:bme680_tempc, timestamp=1546020326955, value=24.96 1546014251.2879872 column=sensors:bme680_tempf, timestamp=1546020326955, value=76.93 1546014251.2879872 column=sensors:cputemp, timestamp=1546020326955, value=51.0 1546014251.2879872 column=sensors:diskusage, timestamp=1546020326955, value=10058.7 1546014251.2879872 column=sensors:host, timestamp=1546020326955, value=piups 1546014251.2879872 column=sensors:human_string, timestamp=1546020326955, value=electric fan, blower 1546014251.2879872 column=sensors:imgname, timestamp=1546020326955, value=/opt/demo/images/bog_image_201812281 62321_cbd0cbd3-17f6-4730-ae43-1e7b46a01135.jpg 1546014251.2879872 column=sensors:imgnamep, timestamp=1546020326955, value=/opt/demo/images/bog_image_p_201812 28162321_cbd0cbd3-17f6-4730-ae43-1e7b46a01135.jpg 1546014251.2879872 column=sensors:ipaddress, timestamp=1546020326955, value=192.168.1.166 1546014251.2879872 column=sensors:lsm303d_accelerometer, timestamp=1546020326955, value=+00.12g : -01.00g : +0 0.08g 1546014251.2879872 column=sensors:lsm303d_magnetometer, timestamp=1546020326955, value=-00.12 : +00.27 : +00.1 5 1546014251.2879872 column=sensors:ltr559_lux, timestamp=1546020326955, value=033.75 1546014251.2879872 column=sensors:ltr559_prox, timestamp=1546020326955, value=0000 1546014251.2879872 column=sensors:memory, timestamp=1546020326955, value=70.8 1546014251.2879872 column=sensors:score, timestamp=1546020326955, value=0.9694475 1546014251.2879872 column=sensors:starttime, timestamp=1546020326955, value=12/28/2018 11:16:02 1546014251.2879872 column=sensors:systemtime, timestamp=1546020326955, value=12/28/2018 11:24:11 1546014251.2879872 column=sensors:te, timestamp=1546020326955, value=488.33915853500366 1546014251.2879872 column=sensors:uuid, timestamp=1546020326955, value=20181228162321_cbd0cbd3-17f6-4730-ae43- 1e7b46a01135
Created on 12-29-2018 06:15 AM - edited 08-17-2019 05:04 AM
An example of an image.