
IoT Series: Sensors: Utilizing Breakout Garden Hat: Part 2 - Integrating MQTT, TensorFlow and Kafka Streams

See Part 1: https://community.hortonworks.com/content/kbentry/229522/iot-series-sensors-utilizing-breakout-garde...

In this second part, I have extended the functionality of the Python capture, MiniFi, NiFi and post-NiFi processing. I have also added a Kafka Streams Java application.

96410-breakoutgardenarchitecture.jpg

With this NiFi flow we are consuming the MQTT and Kafka messages sent by the Kafka Streams application.

96513-cloudnififlow.png

96499-cloudnififlowmqtt.png

In one flow, we receive MQTT messages, extract the entire flow file content as a message and send it to a Slack channel.

In another flow we ingest two types of Kafka messages and store the JSON ones that have a schema in an HBase table via the record processor.

96514-flow1.png

In this flow we receive from the local NiFi router that was called by MiniFi over S2S/HTTP(s). We build two types of messages and send them to Kafka 2.0 brokers. One is the full JSON message with a schema, the other is just the temperature. We create a Kafka Key from the UUID. We also process the images sent from MiniFi with my native Java TensorFlow Inception processor.
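As a rough illustration of the two message types described above, here is a minimal Python sketch. It is not the NiFi flow itself: the function name build_kafka_messages is hypothetical, and the choice of bme680_tempf as "the temperature" is an assumption, since the article does not say which temperature field is sent.

```python
import json


def build_kafka_messages(reading):
    """Return (key, full_json, temperature_only) as bytes for one reading.

    Hypothetical helper: the real flow does this with NiFi processors.
    """
    # The Kafka key is derived from the record's uuid attribute.
    key = reading["uuid"].encode("utf-8")
    # Message type 1: the full JSON record.
    full = json.dumps(reading).encode("utf-8")
    # Message type 2: just the temperature (field choice is an assumption).
    temperature = str(reading["bme680_tempf"]).encode("utf-8")
    return key, full, temperature


reading = {
    "uuid": "20181228162321_cbd0cbd3-17f6-4730-ae43-1e7b46a01135",
    "bme680_tempf": "76.93",
    "host": "piups",
}
key, full, temperature = build_kafka_messages(reading)
print(key.decode("utf-8"), temperature.decode("utf-8"))
```

Keying both messages by the same UUID keeps the full record and its temperature-only companion on the same partition when the default partitioner is used.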

96503-setkafkakey.png

96504-puthbaserecord.png

96505-hbaseclient.png

96506-jsontree.png

96508-extracttextformsg.png

96509-putslack.png

96510-consumemqttprops.png

96511-consumekafka2.png

I decided to try some TensorFlow processing in our infinite sensor loop. It may use too much memory, so I may have to pick a different TensorFlow model and switch to TF Lite (https://www.tensorflow.org/lite/devguide). You will note two extra attributes coming from the Python script running on the Raspberry Pi 3B+.

Another thing I wanted to try is Kafka Streams, since Kafka 2.0 in HDP and HDF gives us a fully supported version. Based on example code, I wrote a simple Kafka Streams Java 8 application that reads the Kafka JSON messages sent from NiFi 1.8, checks for some conditions and pushes data out to MQTT and another Kafka topic.

96496-runninginij.png

If you don't have an MQTT broker, here is a quick way to install a Mosquitto MQTT broker on CentOS 7.

sudo yum -y install mosquitto
# Configuration file: /etc/mosquitto/mosquitto.conf
mkdir -p /var/log/mosquitto
chmod -R 777 /var/log/mosquitto/
touch /var/log/mosquitto/mosquitto.log
sudo systemctl start mosquitto
sudo systemctl enable mosquitto

Now that we have an MQTT broker our Kafka Streams app can send messages to it and NiFi can read messages from it.
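Before pointing Kafka Streams or NiFi at the broker, it can help to confirm that something is listening. The sketch below only checks that a TCP connection can be opened; it does not speak MQTT. It assumes the broker runs on localhost with Mosquitto's default port 1883, and broker_reachable is a hypothetical helper name.

```python
import socket


def broker_reachable(host="localhost", port=1883, timeout=2.0):
    """Return True if a TCP connection to host:port can be opened."""
    try:
        # create_connection raises OSError on refusal or timeout.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


if __name__ == "__main__":
    print("broker reachable:", broker_reachable())
```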

In a future version I will use Hortonworks Schema Registry and Avro.

I have updated the Python script to include TensorFlow and to run on Python 3.5. Make sure you run it with Python 3.5 and have all the libraries installed on your RPI/Linux device.

Here is some of the updated code for Python 3.5; note the message encoding. Python: https://github.com/tspannhw/minifi-breakoutgarden/blob/master/minifi35.py

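def send_tcp(s, message):
  # Nothing to send for an empty or missing message
  if not message:
    return
  try:
    # Python 3 sockets send bytes, so encode the string first
    s.sendall(message.encode('utf-8'))
  except Exception:
    print("Failed to send message")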


96500-receivedmessageminifi.png

96498-receivedkafkamessage.png

96502-imagesthroughtfnifi.png

96512-kafkameta.png

For testing IoT values, I have a GenerateFlowFile with this JSON:

{
  "systemtime" : "${now():format('MM/dd/yyyy HH:mm:ss')}",
  "BH1745_green" : "${random():mod(100):plus(1)} ",
  "ltr559_prox" : "0000",
  "end" : "${now():format('yyyyMMddHHmmss')}",
  "uuid" : "${now():format('yyyyMMddHHmmss')}_${UUID()}",
  "lsm303d_accelerometer" : "+00.06g : -01.01g : +00.04g",
  "imgnamep" : "images/bog_image_p_${now():format('yyyyMMddHHmmss')}_${UUID()}.jpg",
  "cputemp" : ${random():mod(100):toNumber()},
  "BH1745_blue" : "9.0",
  "te" : "47.3427119255",
  "bme680_tempc" : "28.19",
  "imgname" : "images/bog_image_${now():format('yyyyMMddHHmmss')}_${UUID()}.jpg",
  "bme680_tempf" : "80.${random():mod(100):toNumber()}",
  "ltr559_lux" : "006.87",
  "memory" : 34.9,
  "VL53L1X_distance_in_mm" : 134,
  "bme680_humidity" : "${random():mod(100):toNumber()}",
  "host" : "vid5",
  "diskusage" : "8732.7",
  "ipaddress" : "192.168.1.167",
  "bme680_pressure" : "1017.31",
  "BH1745_clear" : "10.0",
  "BH1745_red" : "0.0",
  "lsm303d_magnetometer" : "+00.04 : +00.34 : -00.10",
  "starttime" : "${now():format('MM/dd/yyyy HH:mm:ss')}"
}

Kafka Streams Source Code:

https://github.com/tspannhw/kstreams

Running the Fat Jar:

java -jar target/kstreams-1.0.jar
******************************************* Started
**********2018/12/28 16:41:19
**********
Memory Usage: 28284968

Updated Source Code:

https://github.com/tspannhw/minifi-breakoutgarden

Updated Example Run Output

{
  "ltr559_lux" : "033.75",
  "uuid" : "20181228162321_cbd0cbd3-17f6-4730-ae43-1e7b46a01135",
  "cputemp" : 51,
  "host" : "piups",
  "lsm303d_magnetometer" : "-00.12 : +00.27 : +00.15",
  "bme680_tempc" : "24.96",
  "score" : "0.9694475",
  "lsm303d_accelerometer" : "+00.12g : -01.00g : +00.08g",
  "ltr559_prox" : "0000",
  "bme680_humidity" : "28.875",
  "diskusage" : "10058.7",
  "human_string" : "electric fan, blower",
  "bme680_pressure" : "1012.00",
  "BH1745_green" : "31.0",
  "imgnamep" : "/opt/demo/images/bog_image_p_20181228162321_cbd0cbd3-17f6-4730-ae43-1e7b46a01135.jpg",
  "systemtime" : "12/28/2018 11:24:11",
  "BH1745_red" : "33.0",
  "starttime" : "12/28/2018 11:16:02",
  "BH1745_blue" : "19.8",
  "end" : "1546014251.2879872",
  "bme680_tempf" : "76.93",
  "VL53L1X_distance_in_mm" : 455,
  "te" : "488.33915853500366",
  "memory" : 70.8,
  "imgname" : "/opt/demo/images/bog_image_20181228162321_cbd0cbd3-17f6-4730-ae43-1e7b46a01135.jpg",
  "ipaddress" : "192.168.1.166",
  "BH1745_clear" : "40.0"
}

From Kafka Streams I am sending a temperature warning to MQTT, which NiFi forwards to Slack.

Temperature warning 82.74
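The condition check behind a warning like this can be sketched in a few lines. The actual Kafka Streams application is Java and lives in the linked kstreams repository; the Python below is only an illustration of the idea. The 80.0 threshold, the use of the bme680_tempf field and the function name temperature_warning are all assumptions.

```python
import json

# Assumed threshold in degrees Fahrenheit; the article does not state one.
TEMP_THRESHOLD_F = 80.0


def temperature_warning(raw_message, threshold=TEMP_THRESHOLD_F):
    """Return a warning string for a hot reading, or None otherwise."""
    record = json.loads(raw_message)
    try:
        # Sensor values arrive as strings in the JSON, so convert first.
        temp_f = float(record["bme680_tempf"])
    except (KeyError, TypeError, ValueError):
        # Missing or non-numeric temperature: nothing to warn about.
        return None
    if temp_f > threshold:
        return "Temperature warning {}".format(record["bme680_tempf"])
    return None


print(temperature_warning('{"bme680_tempf": "82.74"}'))
```

Returning None for readings that are missing or below the threshold lets the caller decide whether to publish anything to MQTT at all.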

96497-slackmessages.png

Using HBase 2.0, we are storing our data as it streams from Kafka Streams to NiFi. We use PutHBaseRecord, which utilizes record processing and our schema to stream our JSON into HBase with ease.

Updated Schema with TF Attributes

{
 "type": "record",
 "name": "garden",
 "fields": [
  {
   "name": "systemtime",
   "type": "string"
  },
  {
   "name": "BH1745_green",
   "type": "string"
  },
  {
   "name": "human_string",
   "type": "string",
   "default": "UNK"
  },
  {
   "name": "ltr559_prox",
   "type": "string"
  },
  {
   "name": "end",
   "type": "string"
  },
  {
   "name": "uuid",
   "type": "string"
  },
  {
   "name": "lsm303d_accelerometer",
   "type": "string"
  },
  {
   "name": "score",
   "type": "string",
   "default": "0"
  },
  {
   "name": "imgnamep",
   "type": "string"
  },
  {
   "name": "cputemp",
   "type": "double",
   "doc": "Type inferred from '58.0'"
  },
  {
   "name": "BH1745_blue",
   "type": "string",
   "doc": "Type inferred from '\"10.8\"'"
  },
  {
   "name": "te",
   "type": "string",
   "doc": "Type inferred from '\"254.545491934\"'"
  },
  {
   "name": "bme680_tempc",
   "type": "string",
   "doc": "Type inferred from '\"29.13\"'"
  },
  {
   "name": "imgname",
   "type": "string"
  },
  {
   "name": "bme680_tempf",
   "type": "string",
   "doc": "Type inferred from '\"84.43\"'"
  },
  {
   "name": "ltr559_lux",
   "type": "string",
   "doc": "Type inferred from '\"077.95\"'"
  },
  {
   "name": "memory",
   "type": "double",
   "doc": "Type inferred from '37.6'"
  },
  {
   "name": "VL53L1X_distance_in_mm",
   "type": "int",
   "doc": "Type inferred from '161'"
  },
  {
   "name": "bme680_humidity",
   "type": "string",
   "doc": "Type inferred from '\"32.359\"'"
  },
  {
   "name": "host",
   "type": "string",
   "doc": "Type inferred from '\"vid5\"'"
  },
  {
   "name": "diskusage",
   "type": "string",
   "doc": "Type inferred from '\"8357.6\"'"
  },
  {
   "name": "ipaddress",
   "type": "string",
   "doc": "Type inferred from '\"192.168.1.167\"'"
  },
  {
   "name": "bme680_pressure",
   "type": "string",
   "doc": "Type inferred from '\"987.86\"'"
  },
  {
   "name": "BH1745_clear",
   "type": "string",
   "doc": "Type inferred from '\"90.0\"'"
  },
  {
   "name": "BH1745_red",
   "type": "string",
   "doc": "Type inferred from '\"33.0\"'"
  },
  {
   "name": "lsm303d_magnetometer",
   "type": "string"
  },
  {
   "name": "starttime",
   "type": "string"
  }
 ]
}
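The two TensorFlow fields carry defaults because older records may not include them. The sketch below shows, under simplifying assumptions, how such defaults fill in missing fields. It uses only a trimmed subset of the schema above, it is not how NiFi's record readers are implemented, and apply_defaults is a hypothetical helper.

```python
import json

# Trimmed subset of the schema above, for illustration only.
SCHEMA_JSON = """
{"type": "record", "name": "garden", "fields": [
  {"name": "uuid", "type": "string"},
  {"name": "human_string", "type": "string", "default": "UNK"},
  {"name": "score", "type": "string", "default": "0"},
  {"name": "cputemp", "type": "double"}
]}
"""


def apply_defaults(record, schema):
    """Fill missing fields from schema defaults.

    Returns (filled_record, names_of_fields_still_missing).
    """
    filled = dict(record)
    missing = []
    for field in schema["fields"]:
        name = field["name"]
        if name in filled:
            continue
        if "default" in field:
            filled[name] = field["default"]
        else:
            missing.append(name)
    return filled, missing


schema = json.loads(SCHEMA_JSON)
filled, missing = apply_defaults({"uuid": "abc", "cputemp": 51}, schema)
print(filled)
print(missing)
```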

96507-schemawithtf.png

HBase table

create 'breakout', 'sensors'

96501-hbasetable.png

Example Row

 1546014251.2879872              column=sensors:BH1745_blue, timestamp=1546020326955, value=19.8
 1546014251.2879872              column=sensors:BH1745_clear, timestamp=1546020326955, value=40.0
 1546014251.2879872              column=sensors:BH1745_green, timestamp=1546020326955, value=31.0
 1546014251.2879872              column=sensors:BH1745_red, timestamp=1546020326955, value=33.0
 1546014251.2879872              column=sensors:VL53L1X_distance_in_mm, timestamp=1546020326955, value=455
 1546014251.2879872              column=sensors:bme680_humidity, timestamp=1546020326955, value=28.875
 1546014251.2879872              column=sensors:bme680_pressure, timestamp=1546020326955, value=1012.00
 1546014251.2879872              column=sensors:bme680_tempc, timestamp=1546020326955, value=24.96
 1546014251.2879872              column=sensors:bme680_tempf, timestamp=1546020326955, value=76.93
 1546014251.2879872              column=sensors:cputemp, timestamp=1546020326955, value=51.0
 1546014251.2879872              column=sensors:diskusage, timestamp=1546020326955, value=10058.7
 1546014251.2879872              column=sensors:host, timestamp=1546020326955, value=piups
 1546014251.2879872              column=sensors:human_string, timestamp=1546020326955, value=electric fan, blower
 1546014251.2879872              column=sensors:imgname, timestamp=1546020326955, value=/opt/demo/images/bog_image_201812281
                                 62321_cbd0cbd3-17f6-4730-ae43-1e7b46a01135.jpg
 1546014251.2879872              column=sensors:imgnamep, timestamp=1546020326955, value=/opt/demo/images/bog_image_p_201812
                                 28162321_cbd0cbd3-17f6-4730-ae43-1e7b46a01135.jpg
 1546014251.2879872              column=sensors:ipaddress, timestamp=1546020326955, value=192.168.1.166
 1546014251.2879872              column=sensors:lsm303d_accelerometer, timestamp=1546020326955, value=+00.12g : -01.00g : +0
                                 0.08g
 1546014251.2879872              column=sensors:lsm303d_magnetometer, timestamp=1546020326955, value=-00.12 : +00.27 : +00.1
                                 5
 1546014251.2879872              column=sensors:ltr559_lux, timestamp=1546020326955, value=033.75
 1546014251.2879872              column=sensors:ltr559_prox, timestamp=1546020326955, value=0000
 1546014251.2879872              column=sensors:memory, timestamp=1546020326955, value=70.8
 1546014251.2879872              column=sensors:score, timestamp=1546020326955, value=0.9694475
 1546014251.2879872              column=sensors:starttime, timestamp=1546020326955, value=12/28/2018 11:16:02
 1546014251.2879872              column=sensors:systemtime, timestamp=1546020326955, value=12/28/2018 11:24:11
 1546014251.2879872              column=sensors:te, timestamp=1546020326955, value=488.33915853500366
 1546014251.2879872              column=sensors:uuid, timestamp=1546020326955, value=20181228162321_cbd0cbd3-17f6-4730-ae43-
                                 1e7b46a01135
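
In the example row, the row key matches the record's end value and every other field becomes a column in the sensors family. The mapping can be sketched as below. This is an illustration inferred from the scan output, not PutHBaseRecord's code; to_hbase_cells is a hypothetical helper, and values are simply converted with str(), so numeric formatting may differ from what HBase stores.

```python
def to_hbase_cells(record, row_key_field="end", family="sensors"):
    """Map a flat record to (row_key, {"family:qualifier": value})."""
    row_key = str(record[row_key_field])
    cells = {
        "{}:{}".format(family, name): str(value)
        for name, value in record.items()
        # The row key field is not stored as a column in the example row.
        if name != row_key_field
    }
    return row_key, cells


row_key, cells = to_hbase_cells(
    {"end": "1546014251.2879872", "host": "piups", "memory": 70.8}
)
print(row_key)
for column, value in sorted(cells.items()):
    print(column, value)
```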

96518-bog-image-20181221012613-883be3d9-18c5-466c-9f7e-c.jpeg

An example of an image.
