
Topic: IoT Edge Processing with Apache NiFi and MiniFi and Multiple Deep Learning Libraries

Part 1: Multiple Devices with Data

Keywords: Deep Learning On The Edge, GPS Ingestion, Sense-Hat and Rainbow Hat Sensor Ingest, WebCam Image Ingest

In preparation for my talk at Strata in NYC, I am updating my IoT demos for more devices, more data types and more actions.

I have three streams coming from each device including web camera images.

When sending data from a MiniFi agent, we need to define an input port on an Apache NiFi server or cluster to receive it.

82431-createtemplate.jpg

82433-rainbowminififlow.png

So I design my MiniFi flow in the Apache NiFi UI (pretty soon there will be a special designer for this). You then highlight everything there and hit Create Template. You can then export the template and convert it to config.yml. Again, this process will be automated and connected with the NiFi Registry very shortly, so it takes fewer clicks.

This is an example. In the flow you design in the Apache NiFi UI, you connect to this port on the Remote Process Group. If you are manually editing a config.yml (okay, never do this, but sometimes I have to), you can copy the ID from the Port Details and paste it into the file.

82428-movidius-input-portforminifi.png

Once MiniFi has its config.yml and is started, we will start getting messages on that port.

82446-localiotingest.png

You can see I have two inputs, one for Movidius and one for Rainbow. I could just have one and route to what I want. It's up to you how you want to segment these flows.

Welcome to Apache NiFi Registry v0.2.0. This one works just as well: very stable, but with some new magic. You can now connect to Git and GitHub!

82429-apachenifiregistryv020.png

We have structured JSON, so let's infer a schema, clean it up and store it in the Hortonworks Schema Registry. That will make it versioned and REST enabled. I add one for each of the two JSON file types I am sending from the rainbow device. You can see the schemas in full at the bottom of the article.

82434-rainbowgpsschema.jpg
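
To make the inference step concrete, here is a minimal sketch of what inferring an Avro-style record schema from one sample JSON message can look like. It is not the NiFi processor's implementation; the function names `infer_avro_type` and `infer_schema` and the int/long cutoff are assumptions made for illustration. The `doc` strings follow the same "Type inferred from ..." pattern seen in the schemas at the bottom of the article.

```python
import json


def infer_avro_type(value):
    """Map one parsed JSON value to a primitive Avro type name (illustrative rule set)."""
    # bool must be tested before int, because bool is a subclass of int in Python
    if isinstance(value, bool):
        return "boolean"
    if isinstance(value, int):
        # Assumption: use "int" when the value fits in 32 bits, otherwise "long"
        return "int" if -2**31 <= value < 2**31 else "long"
    if isinstance(value, float):
        return "double"
    if value is None:
        return "null"
    return "string"


def infer_schema(name, sample_json):
    """Build a record schema from a single sample JSON object."""
    record = json.loads(sample_json)
    fields = []
    for key, value in record.items():
        fields.append({
            "name": key,
            "type": infer_avro_type(value),
            # json.dumps keeps the quotes around strings, as in the article's schemas
            "doc": "Type inferred from '%s'" % json.dumps(value),
        })
    return {"type": "record", "name": name, "fields": fields}


# Sample values taken from the doc strings of the rainbow schema
sample = '{"tempf": 84.15, "host": "rainbow", "diskfree": "4831.2 MB"}'
schema = infer_schema("rainbow", sample)
print(json.dumps(schema, indent=1))
```

Note that a quoted number such as "0.066" is inferred as a string, which is why most of the GPS fields end up as strings and why cleaning up the inferred schema by hand is worthwhile.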

The data is received from MiniFi on my local NiFi edge server for simple event processing, filtering and analysis.

82435-minifirainbowingestflowlocal.jpg

I route based on the two types of files, apply their schemas, do a simple filter via SQL and send the converted Apache Avro formatted files to my cloud hosted cluster.
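
The routing decision itself happens inside NiFi, but the idea can be sketched in a few lines of Python. The rule below is an assumption: it keys off the `uniqueid` prefixes that appear in the sample values of the two schemas (`rainbow_uuid_...` and `gps_uuid_...`), and the function name `route` is hypothetical. The actual flow may route on a different attribute.

```python
import json


def route(flowfile_text):
    """Return 'gps', 'rainbow' or 'unmatched' for one JSON record."""
    record = json.loads(flowfile_text)
    uniqueid = str(record.get("uniqueid", ""))
    # Assumed rule: the uniqueid prefix identifies the record type
    if uniqueid.startswith("gps_uuid_"):
        return "gps"
    if uniqueid.startswith("rainbow_uuid_"):
        return "rainbow"
    return "unmatched"


print(route('{"uniqueid": "gps_uuid_20180718234640", "speed": "0.066"}'))
print(route('{"uniqueid": "rainbow_uuid_20180718234222", "tempf": 84.15}'))
```

Keeping an explicit "unmatched" route makes it easy to spot a new message type from a device instead of silently dropping it.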

Once I get the data I send it from my edge server to my cloud HDF 3.2 cluster. For images, I send them to my existing image storage processor group. For my other two types of files I convert them to Apache ORC and store in HDFS as Apache Hive tables.

Server Dashboard

82436-serverrainbowprocessing.png

Rainbow Processing

82447-rainbowserverprocessingflow.png

Routing is Easy
82440-routedataoncommandandsize.png

On High Humidity, Send a Slack Message (Query on humidity value)

82438-alertsonhighhumidity.png

82439-pushiottoslack.png
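
As a rough illustration of the humidity alert, the sketch below runs a SQL filter over a batch of records and builds one Slack-style message per match. It uses an in-memory SQLite table as a stand-in for the SQL that NiFi runs over the flowfile. The threshold of 60.0, the function name `high_humidity_alerts`, the sample records and the message wording are all assumptions; only the field names `host`, `humidity` and `currenttime` come from the movidiussense table at the end of the article. Nothing is posted to Slack here; the function only returns the JSON payloads.

```python
import json
import sqlite3

HUMIDITY_THRESHOLD = 60.0  # hypothetical cutoff, not taken from the article


def high_humidity_alerts(records, threshold=HUMIDITY_THRESHOLD):
    """Return JSON payloads for records whose humidity exceeds the threshold."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE flowfile (host TEXT, humidity REAL, currenttime TEXT)")
    # Named placeholders pull the matching keys out of each record dict
    conn.executemany(
        "INSERT INTO flowfile VALUES (:host, :humidity, :currenttime)", records
    )
    rows = conn.execute(
        "SELECT host, humidity, currenttime FROM flowfile "
        "WHERE humidity > ? ORDER BY currenttime",
        (threshold,),
    ).fetchall()
    conn.close()
    # A simple {"text": ...} body, the shape used by Slack incoming webhooks
    return [
        json.dumps({"text": "High humidity %.1f on %s at %s" % (humidity, host, ts)})
        for host, humidity, ts in rows
    ]


# Made-up sample readings for illustration
readings = [
    {"host": "movidius", "humidity": 72.5, "currenttime": "2018-07-18 23:42:22"},
    {"host": "movidius", "humidity": 41.0, "currenttime": "2018-07-18 23:43:22"},
]
for payload in high_humidity_alerts(readings):
    print(payload)
```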

We can dive into any flowfile as it travels through the system and examine its data and metadata.

82430-movidusflowfile.png

Now that my data is saved in HDFS with Hive tables on top I can use the latest version of Apache Zeppelin to analyze the data.

82442-rainbowdatazepp.png

82443-gpstabulardatazepp.png

I added some maps to Zeppelin via Helium, which is now available in HDP 3.0.

82444-gpsresultszepposmmap.png

I found a bunch of new chart types. This one could be insightful.

82432-newzeppelincharts.png

82445-movidiussenseresultszepp.png


So with the latest NiFi 1.7.1 and HDP 3.0 I can do a lot of interesting things. Next up, let's run a Dockerized TensorFlow application in my HDP 3.0 cluster.

Strata Talk: https://conferences.oreilly.com/strata/strata-ny/public/schedule/detail/68140

Python Scripts

https://github.com/tspannhw/StrataNYC2018/tree/master

Schemas


rainbow

{
 "type": "record",
 "name": "rainbow",
 "fields": [
  {
   "name": "tempf",
   "type": "double",
   "doc": "Type inferred from '84.15'"
  },
  {
   "name": "cputemp",
   "type": "double",
   "doc": "Type inferred from '53.0'"
  },
  {
   "name": "pressure",
   "type": "double",
   "doc": "Type inferred from '101028.56'"
  },
  {
   "name": "host",
   "type": "string",
   "doc": "Type inferred from '\"rainbow\"'"
  },
  {
   "name": "uniqueid",
   "type": "string",
   "doc": "Type inferred from '\"rainbow_uuid_20180718234222\"'"
  },
  {
   "name": "ipaddress",
   "type": "string",
   "doc": "Type inferred from '\"192.168.1.165\"'"
  },
  {
   "name": "temp",
   "type": "double",
   "doc": "Type inferred from '38.58'"
  },
  {
   "name": "diskfree",
   "type": "string",
   "doc": "Type inferred from '\"4831.2 MB\"'"
  },
  {
   "name": "altitude",
   "type": "double",
   "doc": "Type inferred from '80.65'"
  },
  {
   "name": "ts",
   "type": "string",
   "doc": "Type inferred from '\"2018-07-18 23:42:22\"'"
  },
  {
   "name": "tempf2",
   "type": "double",
   "doc": "Type inferred from '28.97'"
  },
  {
   "name": "memory",
   "type": "double",
   "doc": "Type inferred from '32.3'"
  }
 ]
}

gps

{
 "type": "record",
 "name": "gps",
 "fields": [
  {
   "name": "speed",
   "type": "string",
   "doc": "Type inferred from '\"0.066\"'"
  },
  {
   "name": "diskfree",
   "type": "string",
   "doc": "Type inferred from '\"4830.3 MB\"'"
  },
  {
   "name": "altitude",
   "type": "string",
   "doc": "Type inferred from '\"43.0\"'"
  },
  {
   "name": "ts",
   "type": "string",
   "doc": "Type inferred from '\"2018-07-18 23:46:39\"'"
  },
  {
   "name": "cputemp",
   "type": "double",
   "doc": "Type inferred from '54.0'"
  },
  {
   "name": "latitude",
   "type": "string",
   "doc": "Type inferred from '\"40.2681555\"'"
  },
  {
   "name": "track",
   "type": "string",
   "doc": "Type inferred from '\"0.0\"'"
  },
  {
   "name": "memory",
   "type": "double",
   "doc": "Type inferred from '32.3'"
  },
  {
   "name": "host",
   "type": "string",
   "doc": "Type inferred from '\"rainbow\"'"
  },
  {
   "name": "uniqueid",
   "type": "string",
   "doc": "Type inferred from '\"gps_uuid_20180718234640\"'"
  },
  {
   "name": "ipaddress",
   "type": "string",
   "doc": "Type inferred from '\"192.168.1.165\"'"
  },
  {
   "name": "epd",
   "type": "string",
   "doc": "Type inferred from '\"nan\"'"
  },
  {
   "name": "utc",
   "type": "string",
   "doc": "Type inferred from '\"2018-07-18T23:46:40.000Z\"'"
  },
  {
   "name": "epx",
   "type": "string",
   "doc": "Type inferred from '\"40.135\"'"
  },
  {
   "name": "epy",
   "type": "string",
   "doc": "Type inferred from '\"42.783\"'"
  },
  {
   "name": "epv",
   "type": "string",
   "doc": "Type inferred from '\"171.35\"'"
  },
  {
   "name": "ept",
   "type": "string",
   "doc": "Type inferred from '\"0.005\"'"
  },
  {
   "name": "eps",
   "type": "string",
   "doc": "Type inferred from '\"85.57\"'"
  },
  {
   "name": "longitude",
   "type": "string",
   "doc": "Type inferred from '\"-74.529094\"'"
  },
  {
   "name": "mode",
   "type": "string",
   "doc": "Type inferred from '\"3\"'"
  },
  {
   "name": "time",
   "type": "string",
   "doc": "Type inferred from '\"2018-07-18T23:46:40.000Z\"'"
  },
  {
   "name": "climb",
   "type": "string",
   "doc": "Type inferred from '\"0.0\"'"
  },
  {
   "name": "epc",
   "type": "string",
   "doc": "Type inferred from '\"nan\"'"
  }
 ]
}

SQL

%sql


CREATE EXTERNAL TABLE IF NOT EXISTS movidiussense (label5 STRING, runtime STRING, label1 STRING, diskfree STRING, top1 STRING, starttime STRING, label2 STRING, label3 STRING, top3pct STRING, host STRING, top5pct STRING, humidity DOUBLE, currenttime STRING, roll DOUBLE, uuid STRING, label4 STRING, tempf DOUBLE, y DOUBLE, top4pct STRING, cputemp2 DOUBLE, top5 STRING, top2pct STRING, ipaddress STRING, cputemp INT, pitch DOUBLE, x DOUBLE, z DOUBLE, yaw DOUBLE, pressure DOUBLE, top3 STRING, temp DOUBLE, memory DOUBLE, top4 STRING, imagefilename STRING, top1pct STRING, top2 STRING) STORED AS ORC LOCATION '/movidiussense'


%sql


CREATE EXTERNAL TABLE IF NOT EXISTS minitensorflow2 (image STRING, ts STRING, host STRING, score STRING, human_string STRING, node_id INT) STORED AS ORC LOCATION '/minifitensorflow2'

%sql


CREATE EXTERNAL TABLE IF NOT EXISTS gps (speed STRING, diskfree STRING, altitude STRING, ts STRING, cputemp DOUBLE, latitude STRING, track STRING, memory DOUBLE, host STRING, uniqueid STRING, ipaddress STRING, epd STRING, utc STRING, epx STRING, epy STRING, epv STRING, ept STRING, eps STRING, longitude STRING, mode STRING, time STRING, climb STRING, epc STRING) STORED AS ORC LOCATION '/gps'


%sql


CREATE EXTERNAL TABLE IF NOT EXISTS rainbow (tempf DOUBLE, cputemp DOUBLE, pressure DOUBLE, host STRING, uniqueid STRING, ipaddress STRING, temp DOUBLE, diskfree STRING, altitude DOUBLE, ts STRING, tempf2 DOUBLE, memory DOUBLE) STORED AS ORC LOCATION '/rainbow'
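
The column lists in these statements mirror the fields of the schemas above, so a statement of this shape can be derived from a schema. The sketch below shows one way to do that in Python. The helper name `hive_ddl` and the type mapping are assumptions for illustration, it handles only flat records with primitive types, and it is not how the statements in this article were produced.

```python
# Assumed mapping from primitive Avro types to Hive column types
AVRO_TO_HIVE = {
    "string": "STRING",
    "double": "DOUBLE",
    "float": "FLOAT",
    "int": "INT",
    "long": "BIGINT",
    "boolean": "BOOLEAN",
}


def hive_ddl(schema, location):
    """Build a CREATE EXTERNAL TABLE statement for a flat record schema."""
    columns = ", ".join(
        "%s %s" % (field["name"], AVRO_TO_HIVE[field["type"]])
        for field in schema["fields"]
    )
    return (
        "CREATE EXTERNAL TABLE IF NOT EXISTS %s (%s) STORED AS ORC LOCATION '%s'"
        % (schema["name"], columns, location)
    )


# First three fields of the rainbow schema, shortened for the example
rainbow_subset = {
    "type": "record",
    "name": "rainbow",
    "fields": [
        {"name": "tempf", "type": "double"},
        {"name": "cputemp", "type": "double"},
        {"name": "host", "type": "string"},
    ],
}
print(hive_ddl(rainbow_subset, "/rainbow"))
```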





References

NiFi Flows

rainbow-server-processing.xml

rainbow-minifi-ingest-in-nifi.xml


nifirainbowserverflow.png