105382-edgetoaiarch.png

Use Case

IoT Devices with Sensors, Cameras

Overview

In this, the third article of the CDSW series, we build on using CDSW to classify images with a Python Apache MXNet model. In this use case we receive edge data from devices running MiNiFi agents that collect sensor data and images and also run edge analytics with TensorFlow. An Apache NiFi server collects this data, with full data lineage, via HTTP calls from the device(s). We then filter, transform, merge and route the sensor data, image data, deep learning analytics data and metadata to different data stores. As part of the flow we upload our images to a cloud-hosted FTP server (it could be S3 or any other media store) and call a CDSW model from Apache NiFi via REST, getting the model results back as JSON. We also store our sensor data as Parquet files in HDFS. We then trigger a PySpark job in CDSW from Apache NiFi via the API and check its status. We store the status result data in Parquet as well, for PySpark SQL analysis.

For additional steps we can join together the image and sensor data via image name and do additional queries, reports and dashboards.
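As a rough illustration of that join, here is a minimal pure-Python sketch. It assumes the field names that appear in the sample records later in this article: the sensor record carries a full device path in `imgname`, and the image analytics record carries the bare file name in `filename`. The function name and the sample rows are hypothetical; in practice the same join could be done in PySpark SQL.

```python
import posixpath


def join_by_image_name(sensor_rows, image_rows):
    """Inner-join sensor readings to image analytics on the image file name."""
    # Index the image analytics rows by their bare file name.
    images_by_name = {row["filename"]: row for row in image_rows}
    joined = []
    for sensor in sensor_rows:
        # Sensor rows carry a full device path; keep only the file name.
        name = posixpath.basename(sensor["imgname"])
        image = images_by_name.get(name)
        if image is not None:
            joined.append({
                "image": name,
                "bme680_tempf": sensor["bme680_tempf"],
                "label_1": image["label_1"],
                "probability_1": image["probability_1"],
            })
    return joined


# Hypothetical rows shaped like the sample records in this article.
sensor_rows = [
    {"imgname": "/opt/demo/images/bog_image_a.jpg", "bme680_tempf": "75.15"},
    {"imgname": "/opt/demo/images/bog_image_b.jpg", "bme680_tempf": "74.90"},
]
image_rows = [
    {"filename": "bog_image_a.jpg", "label_1": "coffeepot", "probability_1": "3.40%"},
]
result = join_by_image_name(sensor_rows, image_rows)
print(result)
```

Only the sensor row whose image also has analytics survives the join, since this is an inner join.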

We can also route this data to Apache Kafka for downstream analysis in Kafka Streams, Storm, Spark Streaming or SAM.

Part 1: https://community.hortonworks.com/content/kbentry/239858/integrating-machine-learning-models-into-yo...

Part 2: https://community.hortonworks.com/content/kbentry/239961/using-cloudera-data-science-workbench-with-...

Summary

MiNiFi Java agents read sensor values and feed them to Apache NiFi via HTTPS with full data provenance and lineage. Apache NiFi acts as the master orchestrator, conducting, filtering, transforming, converting, querying, aggregating, routing and cleansing the streams. As part of the flow we call Cloudera Data Science Workbench via REST API to classify ingested images with an Apache MXNet Python GluonCV YOLO model. We also call a Spark job to process the ingested Parquet files stored in HDFS, which are loaded from the related sensor data and metadata. The PySpark jobs are triggered from Apache NiFi via REST API calls to Cloudera Data Science Workbench's Jobs API.

For this particular integration I am using a self-built Apache NiFi 1.9, Apache NiFi MiNiFi Java Agent 0.5.0, Cloudera Data Science Workbench 1.5 for HDP, HDFS, Apache Spark 2, Python 3, PySpark and Parquet.

Overall Apache NiFi Flow

105383-nifiiotarch1.png

105386-nififlowarch3.png

105385-nififlow2.png

Workflow Walkthrough

For Images, we transmit the images to an FTP server, run them through an Inception classifier (TensorFlow NiFi Processor) and extract those results plus metadata for future uses.

For Sensor Data, we merge it, convert it to Parquet and store the files. We also store it in HBase and send alerts to a Slack channel. When that is complete we trigger an Apache Spark PySpark SQL job via CDSW. This job can email us a report, and CDSW has nice dashboards for viewing your job runs. We also clean up, filter, flatten and merge the JSON job status and store it as Parquet files for future analysis with PySpark SQL.

For the HTTP POST that starts the job, we must set Content-Type to application/json, send an empty message body and disable chunked encoding. You can also turn on Always Output Response.

105389-httppost.png

We need to clean up the returned status and remove some fields from it. Jolt works magic on JSON.

105391-joltcleanup.png

Setting up FTP is easy.

105395-sendimagestoftp.png

Here is what some of the sensor data looks like while in motion.

105396-sensordata.png

We can set up a job in CDSW very easily from an existing Python file.

105390-jobslist.png

After we have run the job a few times we get a nice graph of run duration for our Job history.

105392-pysparkjobhistory.png

You can see details of the run including the session and the results.

105393-pysparksensorjob.png

While the job is running you can see it in progress alongside all the completed runs.

105394-pysparksqljob.png

We can query our data with PySpark DataFrames for simple output.

105397-sensorpysparksql.png

We can display the schema.

105398-sensorschemaanddata.png

We can use Pandas for a nicer table display of the data.

105399-sensorspysparksession.png

105400-sensorworkbenchexplore.png

Load Data Manually

We can have Apache NiFi push to HDFS directly for us. To load data manually in Cloudera DSW after uploading the files to a directory in CDSW:

# To load data created by NiFi

!hdfs dfs -mkdir /tmp/status
!hdfs dfs -put status/*.parquet /tmp/status
!hdfs dfs -ls /tmp/status
!hdfs dfs -mkdir /tmp/sensors
!hdfs dfs -put sensors/*.parquet /tmp/sensors
!hdfs dfs -ls /tmp/sensors

Source Code

https://github.com/tspannhw/nifi-cdsw-edge

Jolt To Cleanup CDSW Status JSON

[
  {
    "operation": "shift",
    "spec": {
      "*.*": "&(0,1)_&(0,2)",
      "*.*.*": "&(0,1)_&(0,2)_&(0,3)",
      "*.*.*.*": "&(0,1)_&(0,2)_&(0,3)_&(0,4)",
      "*": "&"
    }
  },
  {
    "operation": "remove",
    "spec": {
      "environment": "",
      "environment*": "",
      "latest_k8s": "",
      "report_attachments": ""
    }
  }
]

We remove the arrays, remove some unwanted fields and flatten the data for easy querying. We then convert to Apache Avro and store as Apache Parquet files for querying with PySpark.
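To make the effect of the Jolt spec easier to follow, here is a small Python analogue. It is only a sketch of the outcome, not of Jolt itself: it assumes the incoming status record already has dotted keys such as `creator.username` (which is what the `*.*` patterns appear to match), renames them with underscores, and then drops the same fields the `remove` operation lists. The sample record and its values are hypothetical.

```python
# Fields dropped by the "remove" operation (exact names after renaming).
DROP_EXACT = {"environment", "latest_k8s", "report_attachments"}


def clean_status(record):
    """Rename dotted keys to underscore keys and drop unwanted fields."""
    cleaned = {}
    for key, value in record.items():
        # "creator.username" becomes "creator_username", and so on.
        name = key.replace(".", "_")
        # Mirrors "environment", "environment*", "latest_k8s", "report_attachments".
        if name in DROP_EXACT or name.startswith("environment"):
            continue
        cleaned[name] = value
    return cleaned


# Hypothetical input shaped like a flattened CDSW job status.
raw = {
    "id": 4,
    "creator.username": "tspann",
    "latest.status": "scheduling",
    "latest.k8s": "hypothetical value",
    "environment": "",
    "report.attachments": "",
    "project.owner.username": "tspann",
}
cleaned = clean_status(raw)
print(cleaned)
```

Unlike the Jolt spec, which handles keys of up to four dotted segments, this sketch renames keys of any depth.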

URL to Start a Cloudera Data Science Workbench Job

http://cdsw/api/v1/projects/tspann/future-of-data-meetup-princeton-12-feb-2019/jobs/4/start

This follows the pattern:

http://cdsw.example.com/api/v1/projects/<$USERNAME>/<$PROJECT_NAME>/jobs/<$JOB_ID>/start
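Outside of Apache NiFi, the same call can be sketched in Python with only the standard library. This mirrors the settings described in the walkthrough (JSON content type, empty body). The function name is hypothetical, and passing the CDSW API key as the Basic auth username with an empty password is an assumption; check the documentation linked at the end for how your installation authenticates. The request is only built here, not sent.

```python
import base64
import urllib.request
from urllib.parse import quote


def build_start_job_request(base_url, username, project, job_id, api_key):
    """Build (but do not send) the POST request that starts a CDSW job."""
    url = "{}/api/v1/projects/{}/{}/jobs/{}/start".format(
        base_url.rstrip("/"),
        quote(username, safe=""),
        quote(project, safe=""),
        int(job_id),
    )
    # Assumption: API key as Basic auth user name, empty password.
    token = base64.b64encode(f"{api_key}:".encode("utf-8")).decode("ascii")
    return urllib.request.Request(
        url,
        data=b"",  # empty message body
        method="POST",
        headers={
            "Content-Type": "application/json",
            "Authorization": f"Basic {token}",
        },
    )


request = build_start_job_request(
    "http://cdsw.example.com",
    "tspann",
    "future-of-data-meetup-princeton-12-feb-2019",
    4,
    "HYPOTHETICAL_API_KEY",
)
print(request.get_method(), request.full_url)
# To actually start the job: urllib.request.urlopen(request)
```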

What Does the IoT Data Look Like?

{ "uuid" : "20190213043439_e58bee05-142b-4b7e-a28b-fec0305ab125", "BH1745_clear" : "0.0", "te" : "601.1575453281403", "host" : "piups", "BH1745_blue" : "0.0", "imgname" : "/opt/demo/images/bog_image_20190213043439_e58bee05-142b-4b7e-a28b-fec0305ab125.jpg", "lsm303d_accelerometer" : "+00.08g : -01.01g : +00.09g", "cputemp" : 44, "systemtime" : "02/12/2019 23:34:39", "memory" : 45.7, "bme680_tempc" : "23.97", "imgnamep" : "/opt/demo/images/bog_image_p_20190213043439_e58bee05-142b-4b7e-a28b-fec0305ab125.jpg", "bme680_pressure" : "1000.91", "BH1745_red" : "0.0", "bme680_tempf" : "75.15", "diskusage" : "9622.5", "ltr559_lux" : "000.00", "bme680_humidity" : "24.678", "lsm303d_magnetometer" : "+00.03 : +00.42 : -00.11", "BH1745_green" : "0.0", "ipaddress" : "192.168.1.166", "starttime" : "02/12/2019 23:24:38", "ltr559_prox" : "0000", "VL53L1X_distance_in_mm" : 553, "end" : "1550032479.3900714" }
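Many of these values arrive as strings, so they usually need parsing before analysis. The sketch below shows one way to do that for the three-part accelerometer and magnetometer readings and to cross-check the two temperature fields. The helper names are hypothetical, and the article does not say which axis each of the three parts represents.

```python
def parse_axes(value):
    """Split a ':'-separated triple such as '+00.08g : -01.01g : +00.09g' into floats."""
    # Strip whitespace and an optional trailing 'g' unit from each part.
    return tuple(float(part.strip().rstrip("g")) for part in value.split(":"))


def celsius_to_fahrenheit(celsius):
    """Standard Celsius to Fahrenheit conversion."""
    return celsius * 9.0 / 5.0 + 32.0


# Values copied from the sample record above.
accel = parse_axes("+00.08g : -01.01g : +00.09g")
mag = parse_axes("+00.03 : +00.42 : -00.11")
tempf = round(celsius_to_fahrenheit(float("23.97")), 2)
print(accel, mag, tempf)
```

The converted temperature matches the `bme680_tempf` value of 75.15 in the sample record, which is a handy sanity check on incoming data.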

What Does the TensorFlow Image Analytics Data Look Like?

{"probability_4":"2.00%","file.group":"root", "s2s.address":"192.168.1.166:60966", "probability_5":"1.90%","file.lastModifiedTime":"2019-02-12T18:02:21-0500", "probability_2":"3.14%","probability_3":"2.35%","probability_1":"3.40%", "file.permissions":"rw-r--r--","uuid":"0596aa5f-325b-4bd2-ae80-6c7561c8c056", "absolute.path":"/opt/demo/images/","path":"/","label_5":"fountain", "label_4":"lampshade","filename":"bog_image_20190212230221_00c846a7-b8d2-4192-b8eb-f6f13268483c.jpg", "label_3":"breastplate","s2s.host":"192.168.1.166","file.creationTime":"2019-02-12T18:02:21-0500", "file.lastAccessTime":"2019-02-12T18:02:21-0500", "file.owner":"root", "label_2":"spotlight", "label_1":"coffeepot", "RouteOnAttribute.Route":"isImage"}
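The classifier results are spread across numbered `label_N` and `probability_N` attributes, with the probabilities stored as percentage strings. A minimal sketch for turning them into a ranked list follows; the function name is hypothetical and the sample attributes are copied from the record above.

```python
def ranked_labels(attributes):
    """Pair label_N with probability_N and sort by probability, highest first."""
    ranked = []
    rank = 1
    # Walk label_1, label_2, ... until a numbered pair is missing.
    while f"label_{rank}" in attributes and f"probability_{rank}" in attributes:
        # "3.40%" becomes 0.034
        probability = float(attributes[f"probability_{rank}"].rstrip("%")) / 100.0
        ranked.append((attributes[f"label_{rank}"], probability))
        rank += 1
    return sorted(ranked, key=lambda pair: pair[1], reverse=True)


attributes = {
    "probability_1": "3.40%", "label_1": "coffeepot",
    "probability_2": "3.14%", "label_2": "spotlight",
    "probability_3": "2.35%", "label_3": "breastplate",
    "probability_4": "2.00%", "label_4": "lampshade",
    "probability_5": "1.90%", "label_5": "fountain",
    "filename": "bog_image_20190212230221_00c846a7-b8d2-4192-b8eb-f6f13268483c.jpg",
}
ranked = ranked_labels(attributes)
for label, probability in ranked:
    print(label, probability)
```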

Transformed Job Status Data

{ "id" : 4, "name" : "Pyspark SQL Job", "script" : "pysparksqljob.py", "cpu" : 2, "memory" : 4, "nvidia_gpu" : 0, "engine_image_id" : 7, "kernel" : "python3", "englishSchedule" : "", "timezone" : "America/New_York", "total_runs" : 108, "total_failures" : 0, "paused" : false, "type" : "manual", "creator_id" : 19, "creator_username" : "tspann", "creator_name" : "Timothy Spann", "creator_email" : "tspann@EmailIsland.Space", "creator_url" : "http://cdsw-hdp-3/api/v1/users/tspann", "creator_html_url" : "http://cdsw-hdp-3/tspann", "project_id" : 30, "project_slug" : "tspann/future-of-data-meetup-princeton-12-feb-2019", "project_name" : "Future of Data Meetup Princeton 12 Feb 2019", "project_owner_id" : 19, "project_owner_username" : "tspann", "project_owner_email" : "tspann@email.tu", "project_owner_name" : "Timothy Spann", "project_owner_url" : "http://cdsw-hdp-3/api/v1/users/tspann", "project_owner_html_url" : "http://cdsw-hdp/tspann", "project_url" : "http://cdsw-hdp-3/api/v1/projects/tspann/future-of-data-meetup-princeton-12-feb-2019", "project_html_url" : "http://cdsw-hdp-3/tspann/future-of-data-meetup-princeton-12-feb-2019", "latest_id" : "jq47droa9zv9ou0j", "latest_batch" : true, "latest_job_id" : 4, "latest_status" : "scheduling", "latest_oomKilled" : false, "latest_created_at" : "2019-02-13T13:04:28.961Z", "latest_scheduling_at" : "2019-02-13T13:04:28.961Z", "latest_url" : "http://server/api/v1/projects/tspann/future-of-data-meetup-princeton-12-feb-2019/dashboards/jq47droa...", "latest_html_url" : "http://server/tspann/future-of-data-meetup-princeton-12-feb-2019/engines/jq47droa9zv9ou0j", "latest_shared_view_visibility" : "private", "report_include_logs" : true, "report_send_from_creator" : false, "timeout" : 30, "timeout_kill" : false, "created_at" : "2019-02-13T04:46:26.597Z", "updated_at" : "2019-02-13T04:46:26.597Z", "shared_view_visibility" : "private", "url" : "http://serverapi/v1/projects/tspann/future-of-data-meetup-princeton-12-feb-2019/jobs/4", 
"html_url" : "http://server/tspann/future-of-data-meetup-princeton-12-feb-2019/jobs/4", "engine_id" : "jq47droa9zv9ou0j" }

PySpark Sensor Spark SQL for Data Analysis

import pandas as pd
from pyspark.sql import SparkSession

# Render pandas DataFrames as rich tables in the workbench
pd.options.display.html.table_schema = True

spark = SparkSession.builder.appName("Sensors").getOrCreate()

# Read the sensor Parquet files
sensor = spark.read.parquet("/tmp/sensors/*.parquet")

# Convert to a pandas DataFrame for a nicer table display
data = sensor.toPandas()
data

spark.stop()
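If you prefer plain SQL over the DataFrame API, the same data can be registered as a temporary view and queried. The sketch below is an assumption-laden example: it assumes the sensor values are stored as strings (as they appear in the sample JSON), so it casts them to DOUBLE, and the function names, view name and temperature threshold are all hypothetical. PySpark is imported inside the function so that the query builder can be tried without a Spark installation.

```python
def build_sensor_query(view_name, min_tempf):
    """Build a Spark SQL query over the sensor view, casting string readings to DOUBLE."""
    return (
        "SELECT uuid, host, "
        "CAST(bme680_tempf AS DOUBLE) AS tempf, "
        "CAST(bme680_humidity AS DOUBLE) AS humidity "
        f"FROM {view_name} "
        f"WHERE CAST(bme680_tempf AS DOUBLE) >= {float(min_tempf)} "
        "ORDER BY tempf DESC"
    )


def run_sensor_query(path="/tmp/sensors/*.parquet", min_tempf=70.0):
    """Register the Parquet files as a temporary view and run the query."""
    from pyspark.sql import SparkSession  # imported lazily; requires PySpark

    spark = SparkSession.builder.appName("SensorsSQL").getOrCreate()
    try:
        spark.read.parquet(path).createOrReplaceTempView("sensors")
        spark.sql(build_sensor_query("sensors", min_tempf)).show()
    finally:
        spark.stop()


query = build_sensor_query("sensors", 70)
print(query)
```

Calling `run_sensor_query()` in a CDSW session with access to HDFS would print the matching rows.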

PySpark Status Spark SQL for Data Analysis


import pandas as pd
from pyspark.sql import SparkSession

# Render pandas DataFrames as rich tables in the workbench
pd.options.display.html.table_schema = True

spark = SparkSession \
    .builder \
    .appName("Status") \
    .getOrCreate()

# Read the job status Parquet files
status = spark.read.parquet("/tmp/status/*.parquet")

# Show content
status.show()

# Query a few columns (names taken from the jobstatus schema below)
status.select(status['name'], status['latest_status'], status['total_runs']).show()
status.printSchema()
status.count()

# Convert to a pandas DataFrame for a nicer table display
data = status.toPandas()
data

spark.stop()


Status Schema (jobstatus)

{  
   "type":"record",
   "name":"jobstatus",
   "fields":[  
      {  
         "name":"id",
         "type":["int","null"]
      },
      {  
         "name":"name",
         "type":["string","null"]
      },
      {  
         "name":"script",
         "type":["string","null"]
      },
      {  
         "name":"cpu",
         "type":["int","null"]
      },
      {  
         "name":"memory",
         "type":["int","null"]
      },
      {  
         "name":"nvidia_gpu",
         "type":["int","null"]
      },
      {  
         "name":"engine_image_id",
         "type":["int","null"]
      },
      {  
         "name":"kernel",
         "type":["string","null"]
      },
      {  
         "name":"englishSchedule",
         "type":["string","null"]
      },
      {  
         "name":"timezone",
         "type":["string","null"]
      },
      {  
         "name":"total_runs",
         "type":["int","null"]
      },
      {  
         "name":"total_failures",
         "type":["int","null"],
         "doc":"Type inferred from '0'"
      },
      {  
         "name":"paused",
         "type":["boolean","null"],
         "doc":"Type inferred from 'false'"
      },
      {  
         "name":"type",
         "type":["string","null"],
         "doc":"Type inferred from '\"manual\"'"
      },
      {  
         "name":"creator_id",
         "type":["int","null"],
         "doc":"Type inferred from '19'"
      },
      {  
         "name":"creator_username",
         "type":["string","null"]
      },
      {  
         "name":"creator_name",
         "type":["string","null"]
      },
      {  
         "name":"creator_email",
         "type":["string","null"]
      },
      {  
         "name":"creator_url",
         "type":["string","null"]
      },
      {  
         "name":"creator_html_url",
         "type":["string","null"]
      },
      {  
         "name":"project_id",
         "type":["int","null"]
      },
      {  
         "name":"project_slug",
         "type":["string","null"]
      },
      {  
         "name":"project_name",
         "type":["string","null"]
      },
      {  
         "name":"project_owner_id",
         "type":["int","null"]
      },
      {  
         "name":"project_owner_username",
         "type":["string","null"]
      },
      {  
         "name":"project_owner_email",
         "type":["string","null"]
      },
      {  
         "name":"project_owner_name",
         "type":["string","null"]
      },
      {  
         "name":"project_owner_url",
         "type":["string","null"]
      },
      {  
         "name":"project_owner_html_url",
         "type":["string","null"]
      },
      {  
         "name":"project_url",
         "type":["string","null"]
      },
      {  
         "name":"project_html_url",
         "type":["string","null"]
      },
      {  
         "name":"latest_id",
         "type":["string","null"]
      },
      {  
         "name":"latest_batch",
         "type":["boolean","null"]
      },
      {  
         "name":"latest_job_id",
         "type":["int","null"]
      },
      {  
         "name":"latest_status",
         "type":["string","null"]
      },
      {  
         "name":"latest_oomKilled",
         "type":["boolean","null"]
      },
      {  
         "name":"latest_created_at",
         "type":["string","null"]
      },
      {  
         "name":"latest_scheduling_at",
         "type":["string","null"]
      },
      {  
         "name":"latest_url",
         "type":["string","null"]
      },
      {  
         "name":"latest_html_url",
         "type":["string","null"]
      },
      {  
         "name":"latest_shared_view_visibility",
         "type":["string","null"]
      },
      {  
         "name":"report_include_logs",
         "type":["boolean","null"]
      },
      {  
         "name":"report_send_from_creator",
         "type":["boolean","null"]
      },
      {  
         "name":"timeout",
         "type":["int","null"]
      },
      {  
         "name":"timeout_kill",
         "type":["boolean","null"]
      },
      {  
         "name":"created_at",
         "type":["string","null"]
      },
      {  
         "name":"updated_at",
         "type":["string","null"]
      },
      {  
         "name":"shared_view_visibility",
         "type":["string","null"]
      },
      {  
         "name":"url",
         "type":["string","null"]
      },
      {  
         "name":"html_url",
         "type":["string","null"]
      },
      {  
         "name":"engine_id",
         "type":["string","null"]
      }
   ]
}
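The "Type inferred from" notes in the schema suggest it was generated from a sample record. As a rough illustration of how such a nullable-union schema can be derived, here is a minimal Python sketch. It is not how Apache NiFi infers schemas; it is a simplified stand-in, and the function names and the short sample record are hypothetical. For example, it always maps Python integers to "int", whereas very large values would need "long".

```python
import json


def avro_type(value):
    """Map a Python value to a primitive Avro type name."""
    # bool must be checked before int, because bool is a subclass of int.
    if isinstance(value, bool):
        return "boolean"
    if isinstance(value, int):
        return "int"
    if isinstance(value, float):
        return "double"
    return "string"


def infer_schema(name, record):
    """Build a record schema where every field is a [type, "null"] union."""
    return {
        "type": "record",
        "name": name,
        "fields": [
            {"name": key, "type": [avro_type(value), "null"]}
            for key, value in record.items()
        ],
    }


# A few fields taken from the transformed job status shown earlier.
sample = {"id": 4, "name": "Pyspark SQL Job", "paused": False, "timeout": 30}
schema = infer_schema("jobstatus", sample)
print(json.dumps(schema, indent=2))
```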

Documentation

https://www.cloudera.com/documentation/data-science-workbench/latest/topics/cdsw_rest_apis.html#cdsw...

References

https://community.hortonworks.com/articles/229522/iot-series-sensors-utilizing-breakout-garden-hat-p...

https://community.hortonworks.com/articles/232136/iot-series-sensors-utilizing-breakout-garden-hat-p...

Join Me in March at Data Works Summit in Barcelona.

105384-barcelona1.png

Or In Princeton Monthly.

