1973 Posts | 1225 Kudos Received | 124 Solutions
02-13-2019
05:22 AM
5 Kudos
Use Case
IoT Devices with Sensors, Cameras

Overview
In this, the third article in the CDSW series, we build on using CDSW to classify images with a Python Apache MXNet model. In this use case we receive edge data from devices running MiniFi agents that collect sensor data and images and also run edge analytics with TensorFlow. An Apache NiFi server collects this data, with full data lineage, using HTTP calls from the device(s). We then filter, transform, merge and route the sensor data, image data, deep learning analytics data and metadata to different data stores. As part of the flow we upload our images to a cloud-hosted FTP server (this could be S3 or any media store anywhere) and call a CDSW model from Apache NiFi via REST, getting the model results back as JSON. We also store our sensor data in Parquet files in HDFS. We then trigger a PySpark job in CDSW via its API from Apache NiFi and check the status of that run. We store the status result data in Parquet as well for PySpark SQL analysis. As additional steps we can join the image and sensor data on image name and build additional queries, reports and dashboards. We can also route this data to Apache Kafka for downstream analysis in Kafka Streams, Storm, Spark Streaming or SAM.

Part 1: https://community.hortonworks.com/content/kbentry/239858/integrating-machine-learning-models-into-your-big.html
Part 2: https://community.hortonworks.com/content/kbentry/239961/using-cloudera-data-science-workbench-with-apache.html

Summary
MiniFi Java agents read sensor values and feed them to Apache NiFi via HTTPS with full data provenance and lineage. Apache NiFi acts as the master orchestrator: conducting, filtering, transforming, converting, querying, aggregating, routing and cleansing the streams. As part of the flow we call Cloudera Data Science Workbench via REST API to classify ingested images with an Apache MXNet Python GluonCV YOLO model. We also call a Spark job to process ingested Parquet files stored in HDFS, loaded from the related sensor data and metadata. The PySpark jobs are triggered from Apache NiFi via REST API calls to Cloudera Data Science Workbench's Jobs API. For this particular integration I am using a self-built Apache NiFi 1.9, Apache NiFi - MiniFi Java Agent 0.5.0, Cloudera Data Science Workbench 1.5 for HDP, HDFS, Apache Spark 2, Python 3, PySpark and Parquet.

Overall Apache NiFi Flow

Workflow Walk-Through
For images, we transmit them to an FTP server, run them through an Inception classifier (the TensorFlow NiFi processor) and extract those results plus metadata for future use. For sensor data, we merge it, convert it to Parquet and store the files. We also store it to HBase and send alerts to a Slack channel. When that is complete we trigger an Apache Spark PySpark SQL job via CDSW. This job can email us a report and has nice dashboards for seeing your job run. We also clean up, filter, flatten and merge the JSON status as Parquet files for future analysis with PySpark SQL. We must set Content-Type to application/json, send an empty message body, use no chunked encoding, and you can turn on Always Output Response. We need to clean up and remove some fields from the status returned; Jolt works magic on JSON. Setting up FTP is easy. Here is what some of the sensor data looks like while in motion. We set up a job in CDSW very easily from an existing Python file. After we have run the job a few times we get a nice graph of run duration in our job history. You can see details of the run including the session and the results.
When the job is running you can see it in progress along with all the completed runs. We can query our data with PySpark DataFrames for simple output. We can display the schema. We can use Pandas for a nicer table display of the data.

Load Data Manually
We can have Apache NiFi push to HDFS directly for us. To load data manually in Cloudera DSW after uploading the files to a directory in CDSW:

# To load data created by NiFi
!hdfs dfs -mkdir /tmp/status
!hdfs dfs -put status/*.parquet /tmp/status
!hdfs dfs -ls /tmp/status
!hdfs dfs -mkdir /tmp/sensors
!hdfs dfs -put sensors/*.parquet /tmp/sensors
!hdfs dfs -ls /tmp/sensors
Source Code
https://github.com/tspannhw/nifi-cdsw-edge

Jolt To Cleanup CDSW Status JSON
[{
"operation": "shift",
"spec": { "*.*": "&(0,1)_&(0,2)",
"*.*.*": "&(0,1)_&(0,2)_&(0,3)",
"*.*.*.*": "&(0,1)_&(0,2)_&(0,3)_&(0,4)", "*": "&" } },
{ "operation": "remove",
"spec": { "environment": "", "environment*": "", "latest_k8s": "",
"report_attachments": "" }}] We remove the arrays, remove some unwanted fields and flatten the data for easy querying. We then convert to Apache Avro and store as Apache Parquet files for querying with Pyspark. URL to Start a Cloudera Data Science Workbench Job http://cdsw/api/v1/projects/tspann/future-of-data-meetup-princeton-12-feb-2019/jobs/4/start as Per: http://cdsw.example.com/api/v1/projects/<$USERNAME>/<$PROJECT_NAME>/jobs/<$JOB_ID>/start What Does the IoT Data Look Like? {
"uuid" : "20190213043439_e58bee05-142b-4b7e-a28b-fec0305ab125",
"BH1745_clear" : "0.0",
"te" : "601.1575453281403",
"host" : "piups",
"BH1745_blue" : "0.0",
"imgname" : "/opt/demo/images/bog_image_20190213043439_e58bee05-142b-4b7e-a28b-fec0305ab125.jpg",
"lsm303d_accelerometer" : "+00.08g : -01.01g : +00.09g",
"cputemp" : 44,
"systemtime" : "02/12/2019 23:34:39",
"memory" : 45.7,
"bme680_tempc" : "23.97",
"imgnamep" : "/opt/demo/images/bog_image_p_20190213043439_e58bee05-142b-4b7e-a28b-fec0305ab125.jpg",
"bme680_pressure" : "1000.91",
"BH1745_red" : "0.0",
"bme680_tempf" : "75.15",
"diskusage" : "9622.5",
"ltr559_lux" : "000.00",
"bme680_humidity" : "24.678",
"lsm303d_magnetometer" : "+00.03 : +00.42 : -00.11",
"BH1745_green" : "0.0",
"ipaddress" : "192.168.1.166",
"starttime" : "02/12/2019 23:24:38",
"ltr559_prox" : "0000",
"VL53L1X_distance_in_mm" : 553,
"end" : "1550032479.3900714"
}

What Does the TensorFlow Image Analytics Data Look Like?
{"probability_4":"2.00%","file.group":"root",
"s2s.address":"192.168.1.166:60966",
"probability_5":"1.90%","file.lastModifiedTime":"2019-02-12T18:02:21-0500",
"probability_2":"3.14%","probability_3":"2.35%","probability_1":"3.40%",
"file.permissions":"rw-r--r--","uuid":"0596aa5f-325b-4bd2-ae80-6c7561c8c056",
"absolute.path":"/opt/demo/images/","path":"/","label_5":"fountain",
"label_4":"lampshade","filename":"bog_image_20190212230221_00c846a7-b8d2-4192-b8eb-f6f13268483c.jpg",
"label_3":"breastplate","s2s.host":"192.168.1.166","file.creationTime":"2019-02-12T18:02:21-0500",
"file.lastAccessTime":"2019-02-12T18:02:21-0500",
"file.owner":"root",
"label_2":"spotlight",
"label_1":"coffeepot",
"RouteOnAttribute.Route":"isImage"} Transformed Job Status Data {
"id" : 4,
"name" : "Pyspark SQL Job",
"script" : "pysparksqljob.py",
"cpu" : 2,
"memory" : 4,
"nvidia_gpu" : 0,
"engine_image_id" : 7,
"kernel" : "python3",
"englishSchedule" : "",
"timezone" : "America/New_York",
"total_runs" : 108,
"total_failures" : 0,
"paused" : false,
"type" : "manual",
"creator_id" : 19,
"creator_username" : "tspann",
"creator_name" : "Timothy Spann",
"creator_email" : "tspann@EmailIsland.Space",
"creator_url" : "http://cdsw-hdp-3/api/v1/users/tspann",
"creator_html_url" : "http://cdsw-hdp-3/tspann",
"project_id" : 30,
"project_slug" : "tspann/future-of-data-meetup-princeton-12-feb-2019",
"project_name" : "Future of Data Meetup Princeton 12 Feb 2019",
"project_owner_id" : 19,
"project_owner_username" : "tspann",
"project_owner_email" : "tspann@email.tu",
"project_owner_name" : "Timothy Spann",
"project_owner_url" : "http://cdsw-hdp-3/api/v1/users/tspann",
"project_owner_html_url" : "http://cdsw-hdp/tspann",
"project_url" : "http://cdsw-hdp-3/api/v1/projects/tspann/future-of-data-meetup-princeton-12-feb-2019",
"project_html_url" : "http://cdsw-hdp-3/tspann/future-of-data-meetup-princeton-12-feb-2019",
"latest_id" : "jq47droa9zv9ou0j",
"latest_batch" : true,
"latest_job_id" : 4,
"latest_status" : "scheduling",
"latest_oomKilled" : false,
"latest_created_at" : "2019-02-13T13:04:28.961Z",
"latest_scheduling_at" : "2019-02-13T13:04:28.961Z",
"latest_url" : "http://server/api/v1/projects/tspann/future-of-data-meetup-princeton-12-feb-2019/dashboards/jq47droa9zv9ou0j",
"latest_html_url" : "http://server/tspann/future-of-data-meetup-princeton-12-feb-2019/engines/jq47droa9zv9ou0j",
"latest_shared_view_visibility" : "private",
"report_include_logs" : true,
"report_send_from_creator" : false,
"timeout" : 30,
"timeout_kill" : false,
"created_at" : "2019-02-13T04:46:26.597Z",
"updated_at" : "2019-02-13T04:46:26.597Z",
"shared_view_visibility" : "private",
"url" : "http://serverapi/v1/projects/tspann/future-of-data-meetup-princeton-12-feb-2019/jobs/4",
"html_url" : "http://server/tspann/future-of-data-meetup-princeton-12-feb-2019/jobs/4",
"engine_id" : "jq47droa9zv9ou0j"
}

PySpark Sensor Spark SQL for Data Analysis
from __future__ import print_function
import pandas as pd
import sys, re
from operator import add
from pyspark.sql import SparkSession
pd.options.display.html.table_schema = True
spark = SparkSession.builder.appName("Sensors").getOrCreate()
# Access the parquet
sensor = spark.read.parquet("/tmp/sensors/*.parquet")
data = sensor.toPandas()
pd.DataFrame(data)
spark.stop()

PySpark Status Spark SQL for Data Analysis
from __future__ import print_function
import pandas as pd
import sys, re
from operator import add
from pyspark.sql import SparkSession
pd.options.display.html.table_schema = True
spark = SparkSession\
.builder\
.appName("Status")\
.getOrCreate()
# Access the parquet
sensor = spark.read.parquet("/tmp/status/*.parquet")
# show content
sensor.show()
# query
#
sensor.select(sensor['name'], sensor['latest_status'], sensor['total_runs'], sensor['total_failures']).show()
sensor.printSchema()
sensor.count()
data = sensor.toPandas()
pd.DataFrame(data)
spark.stop()
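The PySpark job analyzed above is started by Apache NiFi through the Cloudera Data Science Workbench Jobs API (the start URL is shown earlier in this article). A minimal Python sketch of the same call, assuming the requests library, a placeholder hostname and API key, and that the key is passed as the basic-auth username as described in the CDSW REST API documentation linked at the end of this article:

import requests

# Placeholders: substitute your own CDSW host, API key and project/job path.
CDSW_HOST = "http://cdsw.example.com"
API_KEY = "your-cdsw-api-key"
start_url = CDSW_HOST + "/api/v1/projects/tspann/future-of-data-meetup-princeton-12-feb-2019/jobs/4/start"

response = requests.post(start_url, auth=(API_KEY, ""), json={})
print(response.status_code)
print(response.json())  # job status JSON, similar to the Transformed Job Status Data shown above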
Status Schema (jobstatus)
{
"type":"record",
"name":"jobstatus",
"fields":[
{
"name":"id",
"type":["int","null"]
},
{
"name":"name",
"type":["string","null"]
},
{
"name":"script",
"type":["string","null"]
},
{
"name":"cpu",
"type":["int","null"]
},
{
"name":"memory",
"type":["int","null"]
},
{
"name":"nvidia_gpu",
"type":["int","null"]
},
{
"name":"engine_image_id",
"type":["int","null"]
},
{
"name":"kernel",
"type":["string","null"]
},
{
"name":"englishSchedule",
"type":["string","null"]
},
{
"name":"timezone",
"type":["string","null"]
},
{
"name":"total_runs",
"type":["int","null"]
},
{
"name":"total_failures",
"type":["int","null"],
"doc":"Type inferred from '0'"
},
{
"name":"paused",
"type":["boolean","null"],
"doc":"Type inferred from 'false'"
},
{
"name":"type",
"type":["string","null"],
"doc":"Type inferred from '\"manual\"'"
},
{
"name":"creator_id",
"type":["int","null"],
"doc":"Type inferred from '19'"
},
{
"name":"creator_username",
"type":["string","null"]
},
{
"name":"creator_name",
"type":["string","null"]
},
{
"name":"creator_email",
"type":["string","null"]
},
{
"name":"creator_url",
"type":["string","null"]
},
{
"name":"creator_html_url",
"type":["string","null"]
},
{
"name":"project_id",
"type":["int","null"]
},
{
"name":"project_slug",
"type":["string","null"]
},
{
"name":"project_name",
"type":["string","null"]
},
{
"name":"project_owner_id",
"type":["int","null"]
},
{
"name":"project_owner_username",
"type":["string","null"]
},
{
"name":"project_owner_email",
"type":["string","null"]
},
{
"name":"project_owner_name",
"type":["string","null"]
},
{
"name":"project_owner_url",
"type":["string","null"]
},
{
"name":"project_owner_html_url",
"type":["string","null"]
},
{
"name":"project_url",
"type":["string","null"]
},
{
"name":"project_html_url",
"type":["string","null"]
},
{
"name":"latest_id",
"type":["string","null"]
},
{
"name":"latest_batch",
"type":["boolean","null"]
},
{
"name":"latest_job_id",
"type":["int","null"]
},
{
"name":"latest_status",
"type":["string","null"]
},
{
"name":"latest_oomKilled",
"type":["boolean","null"]
},
{
"name":"latest_created_at",
"type":["string","null"]
},
{
"name":"latest_scheduling_at",
"type":["string","null"]
},
{
"name":"latest_url",
"type":["string","null"]
},
{
"name":"latest_html_url",
"type":["string","null"]
},
{
"name":"latest_shared_view_visibility",
"type":["string","null"]
},
{
"name":"report_include_logs",
"type":["boolean","null"]
},
{
"name":"report_send_from_creator",
"type":["boolean","null"]
},
{
"name":"timeout",
"type":["int","null"]
},
{
"name":"timeout_kill",
"type":["boolean","null"]
},
{
"name":"created_at",
"type":["string","null"]
},
{
"name":"updated_at",
"type":["string","null"]
},
{
"name":"shared_view_visibility",
"type":["string","null"]
},
{
"name":"url",
"type":["string","null"]
},
{
"name":"html_url",
"type":["string","null"]
},
{
"name":"engine_id",
"type":["string","null"]
}
]
}

Documentation
https://www.cloudera.com/documentation/data-science-workbench/latest/topics/cdsw_rest_apis.html#cdsw_api

References
https://community.hortonworks.com/articles/229522/iot-series-sensors-utilizing-breakout-garden-hat-p.html
https://community.hortonworks.com/articles/232136/iot-series-sensors-utilizing-breakout-garden-hat-p-1.html

Join me in March at DataWorks Summit in Barcelona, or at the Princeton monthly meetup.
02-10-2019
10:15 PM
This is an update to a previous article on accessing Philadelphia Open Crime Data and storing it in Apache Phoenix on HBase.

Part 1: https://community.hortonworks.com/articles/54947/reading-opendata-json-and-storing-into-phoenix-tab.html
For the NoSQL summit: https://community.hortonworks.com/articles/56642/creating-a-spring-boot-java-8-microservice-to-read.html
Cluster HBase master status: http://princeton0.field.hortonworks.com:16010/master-status
Original article: https://community.hortonworks.com/content/kbentry/54947/reading-opendata-json-and-storing-into-phoenix-tab.html

Philly Crime Data
City of Philly App Token: 76MVJDcTksxeS1uYPf8D0XdUF
Secret Token: WZnlB_YJ5r9rjj_alWVdc_yqnxRpnIk5BHgb

Crime data REST endpoints:
https://data.phila.gov/resource/sspu-uyfa.json?$$app_token=76MVJDcTksxeS1uYPf8D0XdUF&$limit=5000
https://www.opendataphilly.org/dataset/crime-incidents/resource/f6e5998d-9d33-4a45-8397-3a6bb8607d10
https://www.opendataphilly.org/dataset/crime-incidents
https://data.phila.gov/resource/sspu-uyfa.json
https://data.phila.gov/resource/sspu-uyfa.json?$$app_token=76MVJDcTksxeS1uYPf8D0XdUF&$limit=5000&dispatch_date=2019-02-18

Example crime record:
{"dc_dist":"18", "dc_key":"200918067518", "dispatch_date":"2009-10-02", "dispatch_date_time":"2009-10-02T14:24:00.000", "dispatch_time":"14:24:00", "hour":"14", "location_block":"S 38TH ST / MARKETUT ST", "psa":"3", "text_general_code":"Other Assaults", "ucr_general":"800"}

Hive table:
CREATE EXTERNAL TABLE crime (dc_dist STRING, dc_key STRING, dispatch_date STRING, dispatch_date_time STRING, hour STRING, location_block STRING, psa STRING, text_general_code STRING, ucr_general STRING) CLUSTERED BY (psa) INTO 4 BUCKETS ROW FORMAT DELIMITED STORED AS ORC LOCATION '/crime/hive' TBLPROPERTIES('transactional'='true');

Columns: dc_dist, dc_key, dispatch_date, dispatch_date_time, hour, location_block, psa, text_general_code, ucr_general

Today's data:
https://data.phila.gov/resource/sspu-uyfa.json?$$app_token=76MVJDcTksxeS1uYPf8D0XdUF&$limit=5000&dispatch_date=2016-09-12
&dispatch_date=${now():format('yyyy-MM-dd')}

311 service:
https://data.phila.gov/resource/4t9v-rppq.json?$$app_token=76MVJDcTksxeS1uYPf8D0XdUF&$limit=5000

Pedestrian and traffic counts:
http://arcgis.dvrpc.org/arcgis/rest/services/Transportation/PedestrianCounts/FeatureServer/1
http://arcgis.dvrpc.org/arcgis/rest/services/Transportation/TrafficCounts/FeatureServer/0

See: https://community.hortonworks.com/articles/52856/stream-data-into-hive-like-a-king-using-nifi.html
Perhaps adapt this: https://github.com/socrata/soda-java
https://www.opendataphilly.org/

HDFS directories for the crime table:
hdfs dfs -mkdir -p /crime/fail
hdfs dfs -mkdir -p /crime

Phoenix setup (https://phoenix.apache.org/faq.html):
/usr/hdp/current/phoenix-client/bin/sqlline.py localhost:2181:/hbase-unsecure

CREATE TABLE phillycrime (dc_dist varchar, dc_key varchar not null primary key, dispatch_date varchar, dispatch_date_time varchar, dispatch_time varchar, hour varchar, location_block varchar, psa varchar, text_general_code varchar, ucr_general varchar);

Example record:
{"dc_dist":"18","dc_key":"200918067518","dispatch_date":"2009-10-02","dispatch_date_time":"2009-10-02T14:24:00.000","dispatch_time":"14:24:00","hour":"14","location_block":"S 38TH ST / MARKETUT ST","psa":"3","text_general_code":"Other Assaults","ucr_general":"800"}

Example upsert:
upsert into phillycrime values ('18', '200918067518', '2009-10-02','2009-10-02T14:24:00.000','14:24:00','14', 'S 38TH ST / MARKETUT ST','3','Other Assaults','800');

Useful sqlline commands: !tables and !describe phillycrime

JDBC details:
Driver: org.apache.phoenix.jdbc.PhoenixDriver
URL: jdbc:phoenix:localhost:2181:/hbase-unsecure
Jars: /usr/hdp/2.4.0.0-169/phoenix/phoenix-client.jar, /usr/hdp/2.4.0.0-169/hbase/lib/hbase-client.jar, plus /etc/hbase/conf/hbase-site.xml and the Hadoop configuration files
See: https://community.hortonworks.com/articles/19016/connect-to-phoenix-hbase-using-dbvisualizer.html

!describe phillycrime shows the PHILLYCRIME table with VARCHAR columns DC_DIST, DC_KEY, DISPATCH_DATE, DISPATCH_DATE_TIME, HOUR, LOCATION_BLOCK, PSA, TEXT_GENERAL_CODE and UCR_GENERAL.

0: jdbc:phoenix:localhost:2181:/hbase-unsecur> upsert into phillycrime values ('18', '200918067518', '2009-10-02','2009-10-02T14:24:00.000','14', 'S 38TH ST / MARKETUT ST','3','Other Assaults','800');
1 row affected (0.115 seconds)
0: jdbc:phoenix:localhost:2181:/hbase-unsecur> select * from phillycrime;
Returns the single row: DC_DIST=18, DC_KEY=200918067518, DISPATCH_DATE=2009-10-02, DISPATCH_DATE_TIME=2009-10-02T14:24:00.000, ... (1 row selected, 0.197 seconds)
0: jdbc:phoenix:localhost:2181:/hbase-unsecur> select distinct(text_general_code) from phillycrime;

Parameterized upsert used in NiFi:
upsert into phillycrime values ('${'dc_dist'}','${'dc_key'}','${'dispatch_date'}','${'dispatch_date_time'}','${'dispatch_time'}','${'hour'}','${'location_block'}','${'psa'}','${'text_general_code'}','${'ucr_general'}')

NiFi GetHTTP properties:
URL: https://data.phila.gov/resource/sspu-uyfa.json?$$$app_token=76MVJDcTksxeS1uYPf8D0XdUF&$$limit=5000&dispatch_date=${now():format('yyyy-MM-dd')}
Filename: phillycrime${now():toNumber()}.json

NiFi ReplaceText properties:
Regular Expression: (?s:^.*$)
Replacement Value: upsert into phillycrime values ('${'dc_dist'}','${'dc_key'}','${'dispatch_date'}','${'dispatch_date_time'}','${'dispatch_time'}','${'hour'}','${'location_block'}','${'psa'}','${'text_general_code'}','${'ucr_general'}')

NiFi DBCPConnectionPool controller service (org.apache.nifi.dbcp.DBCPConnectionPool, enabled):
Database Connection URL: jdbc:phoenix:localhost:2181:/hbase-unsecure
Database Driver Class Name: org.apache.phoenix.jdbc.PhoenixDriver
Database Driver Jar Url: file:///usr/hdp/2.4.0.0-169/phoenix/phoenix-client.jar

Data sources:
https://data.phila.gov/resource/sspu-uyfa.json
https://www.opendataphilly.org/dataset/crime-incidents

Run the Spring Boot microservice against the same data:
https://www.opendataphilly.org/dataset/crime-incidents
https://community.hortonworks.com/articles/72420/ingesting-remote-sensor-feeds-into-apache-phoenix.html
https://github.com/tspannhw/phoenix

Phoenix connection for the cluster:
Driver: org.apache.phoenix.jdbc.PhoenixDriver
URL: jdbc:phoenix:princeton0.field.hortonworks.com:/hbase-unsecure
User: phoenixuser
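Outside of NiFi, the same pull-and-upsert pattern can be prototyped in a few lines of Python. This is a minimal sketch, assuming only the requests library and the Socrata endpoint and app token above; the generated statements would still need to be executed through a Phoenix client of your choice (sqlline, JDBC or the Phoenix Query Server):

import requests

# Philadelphia crime incidents from the Socrata API (endpoint and app token from above).
url = "https://data.phila.gov/resource/sspu-uyfa.json"
params = {"$$app_token": "76MVJDcTksxeS1uYPf8D0XdUF", "$limit": 5000}
records = requests.get(url, params=params).json()

columns = ["dc_dist", "dc_key", "dispatch_date", "dispatch_date_time", "dispatch_time",
           "hour", "location_block", "psa", "text_general_code", "ucr_general"]

for rec in records:
    # Build the same upsert statement the NiFi ReplaceText processor generates.
    values = ", ".join("'" + str(rec.get(c, "")).replace("'", "''") + "'" for c in columns)
    print("upsert into phillycrime values (" + values + ")")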
02-10-2019
10:08 PM
Tracking Air Quality with HDP and HDF: Part 2 - Indoor Air Quality

Using a few sensors on a MiniFi node we are able to generate air quality sensor readings.

Source: https://github.com/tspannhw/minifi-breakoutgarden/blob/master/aqminifi.py

Data:
row['bme680_tempc'] = '{0:.2f}'.format(sensor.data.temperature)
row['bme680_tempf'] = '{0:.2f}'.format((sensor.data.temperature * 1.8) + 32)
row['bme680_pressure'] = '{0:.2f}'.format(sensor.data.pressure)
row['bme680_gas'] = '{0:.2f}'.format(gas)
row['bme680_humidity'] = '{0:.2f}'.format(hum)
row['bme680_air_quality_score'] = '{0:.2f}'.format(air_quality_score)
row['bme680_gas_baseline'] = '{0:.2f}'.format(gas_baseline)
row['bme680_hum_baseline'] = '{0:.2f}'.format(hum_baseline)

See Part 1: https://community.hortonworks.com/articles/189630/tracking-air-quality-with-hdp-and-hdfi-part-1-apac.html

Newark / NYC Hazecam: https://hazecam.net/images/main/newark.jpg

Example:
{"bme680_air_quality_score": "82.45", "uuid": "20190131191921_59c5441c-47b4-4f6f-a6d6-b3943bc9cf2b", "ipaddress": "192.168.1.166", "bme680_gas_baseline": 367283.28, "bme680_pressure": "1024.51", "bme680_hum_baseline": 40.0, "memory": 11.7, "end": "1548962361.4146328", "cputemp": 47, "host": "piups", "diskusage": "9992.7", "bme680_tempf": "87.53", "te": "761.2184100151062", "starttime": "01/31/2019 14:06:40", "systemtime": "01/31/2019 14:19:21", "bme680_humidity": "13.22", "bme680_tempc": "30.85", "bme680_gas": "363274.92"}

{ "end" : "1548967753.7064438", "host" : "piups", "diskusage" : "9990.4", "cputemp" : 47, "starttime" : "01/31/2019 15:44:11", "bme680_hum_baseline" : "40.00", "bme680_humidity" : "13.23", "ipaddress" : "192.168.1.166", "bme680_tempc" : "30.93", "te" : "301.96490716934204", "bme680_air_quality_score" : "83.27", "systemtime" : "01/31/2019 15:49:13", "bme680_tempf" : "87.67", "bme680_gas_baseline" : "334942.60", "uuid" : "20190131204913_4984a635-8dcd-408a-ba23-c0d225ba2d86", "bme680_pressure" : "1024.69", "memory" : 12.6, "bme680_gas" : "336547.19" }
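The row assignments above come from the linked aqminifi.py script. As a rough illustration of where those readings originate, here is a minimal sketch using the Pimoroni bme680 Python library; the heater settings are illustrative placeholders and the air quality score calculation is left to the real script:

import bme680

# Assumes a BME680 breakout wired over I2C (address and settings are illustrative).
sensor = bme680.BME680(bme680.I2C_ADDR_PRIMARY)
sensor.set_gas_heater_temperature(320)
sensor.set_gas_heater_duration(150)
sensor.select_gas_heater_profile(0)

row = {}
if sensor.get_sensor_data():
    row['bme680_tempc'] = '{0:.2f}'.format(sensor.data.temperature)
    row['bme680_tempf'] = '{0:.2f}'.format((sensor.data.temperature * 1.8) + 32)
    row['bme680_pressure'] = '{0:.2f}'.format(sensor.data.pressure)
    row['bme680_humidity'] = '{0:.2f}'.format(sensor.data.humidity)
    if sensor.data.heat_stable:
        row['bme680_gas'] = '{0:.2f}'.format(sensor.data.gas_resistance)
print(row)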
02-09-2019
06:30 PM
1 Kudo
Use Case:
We have data stored in a MongoDB from a third party application in Amazon.
Export from MongoDB to Parquet.
Moving data from a single-purpose data silo to your Enterprise Data Lake is a common use case. Using Apache NiFi we can easily pull your data from this remote silo and bring it streaming into your analytics store for machine learning and deep analytics with Impala, Hive and Spark. It doesn't matter which cloud you are coming from or going to, whether you are moving from cloud to on-premises, or any hybrid situation in between. Apache NiFi will work in all of these situations, with full data lineage and provenance on what it did and when.
I have created a mock dataset with Mockaroo. It's all about yummy South Jersey sandwiches.
Our easy MongoDB flows: one to ingest Mongo data into our Data Lake and another to load MongoDB.
In our test, we loaded all the data from our Mock REST API into a MongoDB in the cloud. In the real world an application populated that dataset and now we need to bring it into our central data lake for analytics.
We use Jolt to replace the non-Hadoop friendly built-in MongoDB _id with a friendly name mongo_id.
Storing to Parquet on HDFS is Easy (Let's compress with Snappy)
Connecting to MongoDB is easy: set up a controller service and specify the database and collection.
Our MongoDB Connection Service, just enter your URI with username/password@server.
GetHTTP URL: https://my.api.mockaroo.com/hoagie.json
GetHTTP Filename: ${filename:append('hoagie.'):append(${now():format('yyyyMMddHHmmSS'):append(${md5}):append('.json')})}
JSON Path Expression: $.*
JOLT Chain: [{ "operation": "shift", "spec": { "_id": "mongo_id", "*": "&" } }]
Mongo URI: mongodb://user:userpassword@server.cloud.com:13916/nifi
Many files stored in HDFS as Parquet
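For a quick check outside NiFi, the same silo-to-Parquet export can be sketched in Python. This is a minimal sketch, not the flow itself, assuming the pymongo, pandas and pyarrow packages; the connection URI mirrors the mock one above and the collection name is a hypothetical placeholder:

import pandas as pd
from pymongo import MongoClient

# Placeholder credentials/URI mirroring the NiFi controller service configuration above.
client = MongoClient("mongodb://user:userpassword@server.cloud.com:13916/nifi")
collection = client["nifi"]["hoagie"]  # hypothetical collection holding the Mockaroo data

docs = list(collection.find())
for doc in docs:
    # Same idea as the Jolt shift: rename Mongo's _id to a Hadoop-friendly mongo_id.
    doc["mongo_id"] = str(doc.pop("_id"))

pd.DataFrame(docs).to_parquet("hoagie.snappy.parquet", compression="snappy")  # needs pyarrow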
02-09-2019
05:59 PM
3 Kudos
Series: Integration of Apache NiFi and Cloudera Data Science Workbench
Part 2: Using Cloudera Data Science Workbench with Apache NiFi and Apache MXNet for GluonCV YOLO Workloads

Summary
Now that we have shown it's easy to do standard NLP, next up is Deep Learning. As you can see, NLP, Machine Learning, Deep Learning and more are all within our reach for building your own AI as a Service using tools from Cloudera. These can run in public or private clouds at scale. Now you can run and integrate machine learning services, computer vision APIs and anything you have created in house with your own data scientists or with the help of Cloudera Fast Forward Labs (https://www.cloudera.com/products/fast-forward-labs-research.html).

The YOLO pretrained model will download the image to /tmp from the URL in order to process it. The Python 3 script will also download the GluonCV model for YOLO3.

Using pre-trained model: yolo3_darknet53_voc

Image Sources
https://github.com/tspannhw/images and/or https://picsum.photos/400/600

Example Input
{ "url": "https://raw.githubusercontent.com/tspannhw/images/master/89389-nifimountains.jpg" }

Sample Call to Our REST Service
curl -H "Content-Type: application/json" -X POST http://myurliscoolerthanyours.com/api/altus-ds-1/models/call-model -d '{"accessKey":"longkeyandstuff","request":{"url":"https://raw.githubusercontent.com/tspannhw/images/master/89389-nifimountains.jpg"}}'

Sample JSON Result Set
{"class1": "cat", "pct1": "98.15670800000001", "host": "gluoncv-apache-mxnet-29-49-67dfdf4c86-vcpvr", "shape": "(1, 3, 566, 512)", "end": "1549671127.877511", "te": "10.178656578063965", "systemtime": "02/09/2019 00:12:07", "cpu": 17.0, "memory": 12.8}

Example Deployment Model Resources
Replicas: 1
Total CPU: 1 vCPU <-- An extra vCPU wouldn't hurt.
Total Memory: 8.00 GiB <-- Make sure you have enough RAM.
I recommend that Deep Learning models use more vCPUs and more memory, as you will be manipulating images and large tensors. I also recommend more replicas for production use cases; you can have up to 9 (https://www.cloudera.com/documentation/data-science-workbench/latest/topics/cdsw_models.html). I like the idea of 3, 5 or 7 replicas.

How-To
Step 1: Let's Clean Up and Test Some Python 3 Code
First I take an existing example Python 3 GluonCV Apache MXNet YOLO script that I already have. As you can see, it uses a pretrained model from Apache MXNet's rich model zoo. This started here: https://github.com/tspannhw/nifi-gluoncv-yolo3. I pared down the libraries as I used an interactive Python 3 session to test and refine my code. As before, I set a variable to pass in my data, this time a URL pointing to an image. As you can see in my interactive session, I can run my yolo function and get results. I had a library in there to display the annotated image while I was testing; I took that code out to save time and memory and to reduce libraries, but it was useful during testing. The model seems to be working: it identified me as a person and my cat as a cat.

Step 2: Create, Build and Deploy a Model
I go to Models, point to my new file and the function I used (yolo), and put in a sample input and response. I deploy it, and then it is in the list of available models. It goes through a few steps as the Docker container is deployed to Kubernetes and all the required pips are installed during a build process. Once it is built, you can see the build(s) in the Builds screen.

Step 3: Test the Model
Once it is done building and marked deployed, we can use the built-in tester from the Overview page.
We can see the result in JSON, ready to travel over an HTTP REST API.

Step 4: Monitor the Deployed Model
We can see the standard output and error, how many times we have been called, and how many calls succeeded. You can see it downloaded the model from the Apache MXNet zoo. If you need to stop, rebuild or replace a model, it's easy.

Step 5: Apache NiFi Flow
As you can see, it takes only a few steps to run the flow. I am using GenerateFlowFile to get us started, but I could have a cron scheduler start us, or react to a Kafka/MQTT/JMS message or another trigger. I then build the JSON needed to call the REST API. Example: {"accessKey":"accesskey","request":{"url":"${url}"}}
Then we call the REST API via an HTTP POST (http://myurliscoolerthanyours.com/api/altus-ds-1/models/call-model). We then parse the JSON it returns to give us just the fields we want; we don't really need status. We name our schema so we can run Apache Calcite SQL queries against it. Let's only save cats and people to our Amazon S3 bucket. At this point I can add more queries and destinations. I can store it everywhere or anywhere.

Example Output
{
"success": true,
"response":
{ "class1": "cat", "cpu": 38.3, "end": "1549672761.1262221", "host": "gluoncv-apache-mxnet-29-50-7fb5cfc5b9-sx6dg", "memory": 14.9, "pct1": "98.15670800000001", "shape": "(1, 3, 566, 512)", "systemtime": "02/09/2019 00:39:21", "te": "3.380652666091919" }}
Build a Schema for the Data and store it in the Apache NiFi AVRO Schema Registry or Cloudera Schema Registry
{ "type" : "record", "name" : "gluon", "fields" : [ { "name" : "class1", "type" : ["string","null"] }, { "name" : "cpu", "type" : ["double","null"] }, { "name" : "end", "type" : ["string","null"]}, { "name" : "host", "type" : ["string","null"]}, { "name" : "memory", "type" : ["double","null"]}, { "name" : "pct1", "type" : ["string","null"] }, { "name" : "shape", "type" : ["string","null"] }, { "name" : "systemtime", "type" : ["string","null"] }, { "name" : "te", "type" : ["string","null"] } ] }
I like to allow for nulls in case we have missing data, but that is up to your Data Steward and team. If you want to share a schema and later add a new field in a new version, you must include "null" as an option for that field, since old data won't have it.
Source: https://github.com/tspannhw/nifi-cdsw-gluoncv
Template: cdswmxnet.xml
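For reference, the heart of the deployed yolo function is standard GluonCV model-zoo inference. A minimal sketch under the assumption that the gluoncv and mxnet packages are installed; the real script, including the image download to /tmp and the JSON result formatting, lives in the repository linked above:

from gluoncv import model_zoo, data

# Load the pretrained Pascal VOC YOLOv3 model named in this article.
net = model_zoo.get_model('yolo3_darknet53_voc', pretrained=True)

# Assumes the image was already downloaded to /tmp (the deployed function fetches it from the URL).
x, img = data.transforms.presets.yolo.load_test('/tmp/89389-nifimountains.jpg', short=512)
print('input shape:', x.shape)  # e.g. (1, 3, 566, 512) as in the sample result set above

class_ids, scores, bounding_boxes = net(x)

# Top prediction, roughly what the service returns as class1/pct1.
top = int(class_ids[0][0].asscalar())
print('class1:', net.classes[top], 'pct1:', float(scores[0][0].asscalar()) * 100)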
02-06-2019
05:49 PM
4 Kudos
Using Deployed Models as a Function as a Service
Using Cloudera Data Science Workbench with Apache NiFi, we can easily call functions within our deployed models from Apache NiFi as part of flows. I am working against CDSW on HDP (https://www.cloudera.com/documentation/data-science-workbench/latest/topics/cdsw_hdp.html), but this will work for any CDSW install type.

In my simple example, I built a Python model that uses TextBlob to run sentiment analysis against a passed-in sentence. It returns sentiment polarity and subjectivity, which we can immediately act upon in our flow. CDSW is extremely easy to work with and I was up and running in a few minutes.

For my model, I created a Python 3 script and a shell script with install details. Both of these artifacts are available here: https://github.com/tspannhw/nifi-cdsw. My Apache NiFi 1.8 flow is here (I use no custom processors): cdsw-twitter-sentiment.xml

Deploying a Machine Learning Model as a REST Service
First, log in to CDSW and create a project or choose an existing one (https://www.cloudera.com/documentation/data-science-workbench/latest/topics/cdsw_projects.html). From your project, open the workbench; there you can install some libraries and test some Python. I am using a Python 3 session to download the TextBlob/NLTK corpora for NLP.

Let's pip install some libraries for testing.

Let's create a new model. You choose your file (mine is sentiment.py, see GitHub). The function name is actually sentiment. Notice a typo meant I had to rebuild this and redeploy. You set up an example input (sentence is the input parameter name) and an example output. Input and output will be JSON since this is a REST API.

Let's deploy it (Python 3). The deploy step builds it for deployment. We can see standard output, standard error, status, the number of REST calls received, and successes.

Once a model is deployed we can control it: we can stop it, rebuild it, or replace the files if need be. I had to update things a few times. The amount of resources used for hosting the model REST endpoint is your choice from a drop-down. Since I am doing something small, I picked the smallest option with only 1 virtual CPU and 2 GB of RAM. All of this is running in Docker on Kubernetes!

Once deployed, it's ready to test and use from Apache NiFi. Just click Test, see the JSON results, and we can now call it from an Apache NiFi flow. Once deployed we can also monitor the model.

Let's run the test and see the status and response!

Apache NiFi Example Flow
Step 1: Call Twitter.
Step 2: Extract social attributes of interest.
Step 3: Build our web call with our access key and function parameter.
Step 4: Extract our string as a flow file to send to the HTTP POST.
Step 5: Call our Cloudera Data Science Workbench REST API (see the tester).
Step 6: Extract the two result values.
Step 7: Route on the sentiment. We can have negative (<0), neutral (0), positive (>0) and very positive (1) polarity of the sentiment. See TextBlob for more information on how this works.
Step 8: Send bad sentiment to a Slack channel for human analysis. We send all the related information to the Slack channel, including the message.
Example Message Sent to Slack
Step 9: Store all the results (or some) in Phoenix/HBase, Hive LLAP, Impala, Kudu or HDFS.

Results as Attributes

Slack Message Call
${msg:append(" User:"):append(${user_name}):append(${handle}):append(" Geo:"):append(${coordinates}):append(${geo}):append(${location}):append(${place}):append(" Hashtags:"):append(${hashtags}):append(" Polarity:"):append(${polarity}):append(" Subjectivity:"):append(${subjectivity}):append(" Friends Count:"):append(${friends_count}):append(" Followers Count:"):append(${followers_count}):append(" Retweet Count:"):append(${retweet_count}):append(" Source:"):append(${source}):append(" Time:"):append(${time}):append(" Tweet ID:"):append(${tweet_id})}
REST CALL to Model
{"accessKey":"from your workbench","request":{"sentence":"${msg:replaceAll('\"', ''):replaceAll('\n','')}"}}
Resources
https://textblob.readthedocs.io/en/dev/api_reference.html#textblob.blob.TextBlob.sentiment
https://community.hortonworks.com/articles/222605/converting-powerpoint-presentations-into-french-fr.html
https://community.hortonworks.com/articles/76935/using-sentiment-analysis-and-nlp-tools-with-hdp-25.html
02-04-2019
06:56 PM
4 Kudos
Log, Log, Log
Sudo logs have a lot of useful information on hosts, users and auditable actions that may be useful for cybersecurity, capacity planning, user tracking, data lake population, user management and general security.

Symbol Model 1
Step 1 - Get a File
Step 2 - Split Into Lines
Step 3 - Set the Mime Type to Plain Text
Step 4 - Extract Grok
Step 5 - Action! Checking for More Options (All Named Elements in the Grok Patterns)

Example Sudo Log (could be /auth.log, /var/log/sudo.log, secure, ...)
Jan 31 19:17:20 princeton0 su: pam_unix(su-l:session): session opened for user ambari-qa by (uid=0)
Jan 31 19:17:20 princeton0 su: pam_unix(su-l:session): session closed for user ambari-qa
Jan 31 19:18:19 princeton0 su: pam_unix(su-l:session): session opened for user zeppelin by (uid=0)
Jan 31 19:18:19 princeton0 su: pam_unix(su-l:session): session closed for user zeppelin
Jan 31 19:18:20 princeton0 su: pam_unix(su-l:session): session opened for user ambari-qa by (uid=0)
Jan 31 19:18:20 princeton0 su: pam_unix(su-l:session): session closed for user ambari-qa

Grok Patterns
SUDO_TTY TTY=%{NOTSPACE:sudo_tty}
SUDO_PWD PWD=%{DATA:sudo_pwd}
SUDO_COMMAND COMMAND=%{DATA:sudo_command}
SUDO_USER %{NOTSPACE:sudo_user}
SUDO_RUNAS USER=%{SUDO_USER:sudo_runas}
SUDO_REMOVE_SESSION %{SYSLOGTIMESTAMP:timestamp8} %{NOTSPACE:hostname8} %{NOTSPACE:appcaller} \[%{NOTSPACE:pid7}\]: %{GREEDYDATA:sessionremoval}
SUDO_INFO_COMMAND_SUCCESSFUL %{SUDO_USER:sudo_user2} : %{SUDO_TTY:sudo_tty2} ; %{SUDO_PWD:sudo_pwd2} ; %{SUDO_RUNAS:sudo_runas2} ; %{SUDO_COMMAND:sudo_command2}
SUDO_INFO_PAM_UNIX_SESSION_OPENED pam_unix\(%{NOTSPACE:user1}:session\): session opened for user %{NOTSPACE:sudo_runas3} by %{SUDO_USER:sudo_user3}\(uid=%{NUMBER:uid3}\)
SUDO_INFO_PAM_UNIX_SESSION_CLOSED pam_unix\(%{NOTSPACE:user4}:session\): session closed for user %{NOTSPACE:sudo_runas4}
SUDO_PAM_OPEN2 %{SYSLOGTIMESTAMP:timestamp8} %{NOTSPACE:hostname8} %{NOTSPACE:appcaller}: pam_unix\(%{NOTSPACE:user1}:session\): session opened for user %{NOTSPACE:sudo_runas81} by \(uid=%{NUMBER:uid81}\)
SUDO_SEAT %{SYSLOGTIMESTAMP:timestamp77} %{NOTSPACE:hostname77} %{NOTSPACE:appcaller77}\[%{NOTSPACE:pid77}\]: %{GREEDYDATA:message77}
SUDO_INFO %{SUDO_INFO_COMMAND_SUCCESSFUL:cmdsuccess}|%{SUDO_INFO_PAM_UNIX_SESSION_OPENED:pam_opened}|%{SUDO_INFO_PAM_UNIX_SESSION_CLOSED:pam_closed}
SUDO_ERROR_INCORRECT_PASSWORD_ATTEMPTS %{SUDO_USER} : %{NUMBER} incorrect password attempts ; %{SUDO_TTY:sudo_tty5} ; %{SUDO_PWD:sudo_pwd5} ; %{SUDO_RUNAS:sudo_runas5} ; %{SUDO_COMMAND:sudo_cmd5}
SUDO_ERROR_FAILED_TO_GET_PASSWORD %{NOTSPACE:person6} failed to get password: %{NOTSPACE:autherror6} authentication error
SUDO_PUBLICKEY %{SYSLOGTIMESTAMP:timestamp7} %{NOTSPACE:hostname7} sshd\[%{NOTSPACE:pid7}\]: Accepted publickey for %{NOTSPACE:username} from %{NOTSPACE:sourceip} port %{NOTSPACE:port} ssh2: RSA %{NOTSPACE:rsakey}
SUDO_OPEN_PAM %{SYSLOGTIMESTAMP:timestamp8} %{NOTSPACE:hostname8} %{NOTSPACE:appcaller}\[%{NOTSPACE:pid8}\]: pam_unix\(%{NOTSPACE:user1}:session\): session opened for user %{NOTSPACE:sudo_runas} by \(uid=%{NUMBER:uid}\)
SYSLOGBASE2 (?:%{SYSLOGTIMESTAMP:timestamp9}|%{TIMESTAMP_ISO8601:timestamp8601}) (?:%{SYSLOGFACILITY} )?%{SYSLOGHOST:logsource} %{SYSLOGPROG}:
SYSLOGPAMSESSION %{SYSLOGBASE} (?=%{GREEDYDATA:message})%{WORD:pam_module}\(%{DATA:pam_caller}\): session %{WORD:pam_session_state} for user %{USERNAME:username}(?: by %{NOTSPACE:pam_by})?
CRON_ACTION [A-Z ]+
CRONLOG %{SYSLOGBASE} \(%{USER:user9}\) %{CRON_ACTION:action9} \(%{DATA:message9}\)
SYSLOGLINE %{SYSLOGBASE2} %{GREEDYDATA:message10}
SUDO_ERROR %{SUDO_ERROR_FAILED_TO_GET_PASSWORD}|%{SUDO_ERROR_INCORRECT_PASSWORD_ATTEMPTS}
GREEDYMULTILINE (.|\n)*
AUTH1 %{SYSLOGTIMESTAMP:systemauthtimestamp} %{SYSLOGHOST:systemauthhostname11} sshd(?:\[%{POSINT:systemauthpid11}\])?: %{DATA:systemauthsshevent} %{DATA:systemauthsshmethod} for (invalid user )?%{DATA:systemauthuser} from %{IPORHOST:systemauthsship} port %{NUMBER:systemauthsshport} ssh2(: %{GREEDYDATA:systemauthsshsignature})?
AUTH2 %{SYSLOGTIMESTAMP:systemauthtimestamp} %{SYSLOGHOST:systemauthhostname12} sshd(?:\[%{POSINT:systemauthpid12}\])?: %{DATA:systemauthsshevent} user %{DATA:systemauthuser} from %{IPORHOST:systemauthsship}
AUTH3 %{SYSLOGTIMESTAMP:systemauthtimestamp} %{SYSLOGHOST:systemauthhostname14} sshd(?:\[%{POSINT:systemauthpid13}\])?: Did not receive identification string from %{IPORHOST:systemauthsshdroppedip}
AUTH4 %{SYSLOGTIMESTAMP:systemauthtimestamp} %{SYSLOGHOST:systemauthhostname15} sudo(?:\[%{POSINT:systemauthpid14}\])?: \s*%{DATA:systemauthuser} :( %{DATA:systemauthsudoerror} ;)? TTY=%{DATA:systemauthsudotty} ; PWD=%{DATA:systemauthsudopwd} ; USER=%{DATA:systemauthsudouser} ; COMMAND=%{GREEDYDATA:systemauthosudocmd}
AUTH5 %{SYSLOGTIMESTAMP:systemauthtimestamp} %{SYSLOGHOST:systemauthhostname16} groupadd(?:\[%{POSINT:systemauthpid15}\])?: new group: name=%{DATA:systemauthgroupaddname}, GID=%{NUMBER:systemauthgroupaddgid}
AUTH6 %{SYSLOGTIMESTAMP:systemauthtimestamp} %{SYSLOGHOST:systemauthhostname17} useradd(?:\[%{POSINT:systemauthpid16}\])?: new user: name=%{DATA:systemauthuseraddname}, UID=%{NUMBER:systemauthuseradduid}, GID=%{NUMBER:systemauthuseraddgid}, home=%{DATA:systemauthuseraddhome}, shell=%{DATA:systemauthuseraddshell}$
AUTH7 %{SYSLOGTIMESTAMP:systemauthtimestamp} %{SYSLOGHOST:systemauthhostname18} %{DATA:systemauthprogram17}(?:\[%{POSINT:systemauthpid17}\])?: %{GREEDYMULTILINE:systemauthmessage}"] }
AUTH_LOG %{AUTH1}|%{AUTH2}|%{AUTH3}|%{AUTH4}|%{AUTH5}|%{AUTH6}|%{AUTH7}
SU \+\s+%{DATA:su_tty19}\s+%{USER:su_user19}:%{USER:su_targetuser19}
SSH_AUTHFAIL_WRONGUSER Failed %{WORD:ssh_authmethod} for invalid user %{USERNAME:ssh_user} from %{IP:ssh_client_ip} port %{NUMBER:ssh_client_port} %{GREEDYDATA:message}
SSH_AUTHFAIL_WRONGCREDS Failed %{WORD:ssh_authmethod} for %{USERNAME:ssh_user} from %{IP:ssh_client_ip} port %{NUMBER:ssh_client_port} %{GREEDYDATA:message}
SSH_AUTH_SUCCESS Accepted %{WORD:ssh_authmethod} for %{USERNAME:ssh_user} from %{IP:ssh_client_ip} port %{NUMBER:ssh_client_port} %{WORD:ssh_x} %{WORD:ssh_pubkey_type} %{GREEDYDATA:ssh_pubkey_fingerprint}
SSH_DISCONNECT Received disconnect from %{IP:ssh_client_ip} port %{INT:ssh_client_port}.*?:\s+%{GREEDYDATA:ssh_disconnect_reason}
SSH %{SSH_DISCONNECT}|%{SSH_AUTH_SUCCESS}|%{SSH_AUTHFAIL_WRONGUSER}|%{SSH_AUTHFAIL_WRONGCREDS}
SUDO %{SUDO_INFO}|%{SUDO_ERROR}|%{SUDO_PUBLICKEY}|%{SSH}|%{SUDO_OPEN_PAM}|%{SUDO_REMOVE_SESSION}|%{SUDO_PAM_OPEN2}|%{SUDO_SEAT}
These patterns came from some experimentation with http://grokdebug.herokuapp.com/ and from known patterns found online. You can easily add more patterns to grab a lot of different log types. All of these fields can be pulled out in a single processor, as seen in the diagram above.

Source Code
https://github.com/tspannhw/nifi-logs

Example Template
sudo.xml
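To sanity-check a pattern like SUDO_INFO_PAM_UNIX_SESSION_OPENED outside of NiFi and the Grok debugger, you can prototype an equivalent named-group regex in plain Python. This is a minimal sketch using only the standard re module; it mirrors the Grok pattern above but is not the Grok engine itself:

import re

# Named groups roughly correspond to the timestamp/hostname/appcaller/user/uid fields above.
PAM_SESSION = re.compile(
    r"^(?P<timestamp>\w{3}\s+\d+\s+\d{2}:\d{2}:\d{2})\s+"
    r"(?P<hostname>\S+)\s+(?P<appcaller>\S+?):\s+"
    r"pam_unix\((?P<service>[^:]+):session\):\s+"
    r"session (?P<state>opened|closed) for user (?P<user>\S+)"
    r"(?:\s+by\s+\(uid=(?P<uid>\d+)\))?$"
)

line = "Jan 31 19:17:20 princeton0 su: pam_unix(su-l:session): session opened for user ambari-qa by (uid=0)"
match = PAM_SESSION.match(line)
if match:
    print(match.groupdict())
    # {'timestamp': 'Jan 31 19:17:20', 'hostname': 'princeton0', 'appcaller': 'su',
    #  'service': 'su-l', 'state': 'opened', 'user': 'ambari-qa', 'uid': '0'}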
01-25-2019
09:01 PM
2 Kudos
Introduction
SoChain provides a fast set of public freely available APIs (don't abuse them) to access information on various networks.
If you need this for critical work, please donate: https://chain.so/address/devfund.
One of the things you will see in this simple flow is that NiFi excels at ingesting REST data and working with JSON. As you can see, we split it up, shred it, filter it, manipulate it and extract from it. With the resulting usable objects we build a schema that allows us to do record processing. Once we have a set of records with a schema, I can store them in whatever data stores we need.
I just hosted a Future of Data Princeton Meetup in Woodbridge, New Jersey, with some amazing speakers, sponsored by ChainNinja. While this was all about Blockchain for Enterprise and no cryptocurrency was involved, it made me want to investigate some cryptocurrency data. As you can see, manipulating complex JSON data, filtering, modifying, routing and scripting with its values is trivial in Apache NiFi.
In my next article I will investigate Hyperledger and Ethereum for enterprise solutions and their integration with Apache NiFi, Impala, Hive, Kudu, HBase, Spark, Kafka and other enterprise technologies.

Steps
We read from the URL and I send the original file to immutable HDFS storage. In another branch, I use EvaluateJSONPath to pull out one attribute ($.data.blocks) to use to get detail records. I use that attribute to build a deeper REST call to get the details for the latest block: https://chain.so/api/v2/block/BTC/${block_no}. This is done in InvokeHTTP, which is a scriptable HTTP(S) call; it comes in handy often. In the next EvaluateJSONPath I pull out all the high-level attributes of the JSON file. I want these for all the records as master fields; they are repeated. After that I split the two arrays of data beneath that into two separate branches. I break these down into individual records for parsing. I could also apply a schema and handle them as groups of records. This is an example of reading a REST API and creating a unique name per call. Also notice it's easy to handle HTTPS as well as HTTP.
SoChain Ingest Flow for REST APIs Calls
Example Unique File Name we can script
${filename:append('btc.'):append(${now():format('yyyyMMddHHmmss'):append(${md5}):append('.json')})}
REST URLs
https://chain.so/api/v2/get_info/BTC
https://chain.so/api/v2/get_price/BTC/USD
https://chain.so/api/v2/get_info/DOGE
https://chain.so/api/v2/get_info/LTC
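Outside NiFi, the same two-step pattern (read $.data.blocks from get_info, then fetch that block's detail) can be prototyped in a few lines of Python. A minimal sketch, assuming the requests library and the public endpoints listed above; the field names are taken from the attribute list shown below (please don't abuse the free API):

import requests

# Step 1: network info, the same call the flow starts with.
info = requests.get("https://chain.so/api/v2/get_info/BTC").json()
block_no = info["data"]["blocks"]  # what EvaluateJSONPath extracts as $.data.blocks

# Step 2: the deeper REST call that InvokeHTTP builds with ${block_no}.
block = requests.get("https://chain.so/api/v2/block/BTC/{0}".format(block_no)).json()

data = block["data"]
print(data["blockhash"], data["time"], data["sent_value"])
print(sorted(data.keys()))  # inspect the arrays the flow splits into separate branches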
Example of the Value of the Apache NiFi Provenance. (These are the attributes acquired for one flowfile).
Attribute Values
Access-Control-Allow-Headers: Origin,Accept,Content-Type,X-Requested-With,X-CSRF-Token
Access-Control-Allow-Methods: GET,POST
Access-Control-Allow-Origin: *
CF-RAY: 49e564b17e23923c-EWR
Cache-Control: no-cache, no-store, max-age=0, must-revalidate
Connection: keep-alive
Content-Type: application/json; charset=utf-8
Date: Thu, 24 Jan 2019 20:54:07 GMT
Expect-CT: max-age=604800, report-uri="https://report-uri.cloudflare.com/cdn-cgi/beacon/expect-ct"
Expires: Fri, 01 Jan 1990 00:00:00 GMT
Pragma: no-cache
Server: cloudflare
Set-Cookie: __cfduid=d6f52ee1552c73223442296ff7230e9fd1548363246; expires=Fri, 24-Jan-20 20:54:06 GMT; path=/; domain=.chain.so; HttpOnly, _mkra_ctxt=1a7dafd219c4972a7562f232dc63f524--200; path=/; max-age=5
Status: 200 OK
Strict-Transport-Security: max-age=31536000;includeSubDomains
Transfer-Encoding: chunked
X-Content-Type-Options: nosniff
X-Download-Options: noopen
X-Frame-Options: SAMEORIGIN
X-Request-Id: 20d3f592-50b6-40cf-a496-a6f915eb463b
X-Runtime: 1.018401
X-XSS-Protection: 1; mode=block
bits: 172fd633
block_no: 559950
blockhash: 0000000000000000001c68f61ddcc30568536a583c843a7d0c9606b9582fd7e5
fee: 0.05142179
filename: btc.201949241501759.json
fragment.count: 1
fragment.identifier: cec10691-82e9-402b-84a9-7901b084f10a
fragment.index: 0
gethttp.remote.source: chain.so
invokehttp.remote.dn: CN=ssl371663.cloudflaressl.com,OU=PositiveSSL Multi-Domain,OU=Domain Control Validated
invokehttp.request.url: https://chain.so/api/v2/block/BTC/559950
invokehttp.status.code: 200
invokehttp.status.message: OK
invokehttp.tx.id: bc8a0a18-0685-4a2c-97fa-34541b9ea929
merkleroot: 41eb6f68477e96c9239ae1bbe4e5d4d02529c6f7faebc4ad801730d09609a0ef
mime.type: application/json; charset=utf-8
mining_difficulty: 5883988430955.408
network: BTC
next_blockhash: Empty string set
nonce: 1358814296
path: ./
previous_blockhash: 0000000000000000001b2b3d3b5741462fe31981a6c0ae9335ed8851e936664b
schema: chainsotxinputinfo
schema.name: chainsotxinputinfo
segment.original.filename: btc.201949241501759.json
sent_value: 3977.10078351
size: 470242
time: 1548362873
uuid: 3c1d72b4-e993-4b32-a679-0741a44aeefb
An example input record:
{
"input_no" : 0,
"address" : "3N7Vid17hE1ofGcWR6bWEmtQBQ8kKQ7iKW",
"value" : "0.20993260",
"received_from" : {
"txid" : "4e0f00cddb8e3d98de7f645684dc7526468d1dc33efbbf0bc173ed19c6556896",
"output_no" : 4
}
}
An Example LiteCoin Record
{
"status" : "success",
"data" : {
"name" : "Litecoin",
"acronym" : "LTC",
"network" : "LTC",
"symbol_htmlcode" : "Ł",
"url" : "http://www.litecoin.com/",
"mining_difficulty" : "6399667.35869154",
"unconfirmed_txs" : 8,
"blocks" : 1567929,
"price" : "0.00000000",
"price_base" : "BTC",
"price_update_time" : 1548451214,
"hashrate" : "178582229079753"
}
}
Example NiFi Flow chainso.xml
02-13-2019
11:56 AM
I changed the column type from bigint to integer for topology_test_run_case_source.occurence and it was resolved, and the test cases also started to run! @Timothy Spann @Geoffrey Shelton Okot, thanks for the help!
01-22-2019
09:50 PM
4 Kudos
Working with a Proximity Beacon Network Part 1

Introduction:
In a retail environment, we want to be able to interact with people within the store. Our beacons can provide hyperlocalized information and also help us determine what's going on in the store in terms of traffic patterns. Our beacons also give us temperature and other readings, like any sensor we may have in our IoT retail environment. I have set up three Estimote Proximity beacons in an indoor environment broadcasting both Estimote and iBeacon messages. iBeacon is a Bluetooth advertising protocol by Apple that is built into iPhones. In this article we ingest, filter and route the data based on the beacon IDs. In a following article we will stream our data to one or more data stores and run machine learning and analytics on our streaming time-series data. I tested the BLE library from the command line with a Python script.

Cloud Setup:
Since we are using our own IoT gateways and networks, I am only using the Estimote Cloud to check the beacons and make sure they are registered. Estimote provides an iPhone application (Estimote) that you can download and use to do some basic programming and range testing of the beacons.

MiniFi Setup:
We run a shell script that has the Python BLE scanner run for 40 seconds and append JSON rows to a file. We then continuously tail that file, read new lines and send them to NiFi for processing. As you can see, MiniFi sends a steady stream of data to an Apache NiFi instance via the HTTP S2S API.

NiFi Setup:
The flow is pretty simple. We add a schema name to the data and split the text to get one JSON row per line. We calculate record stats, just to check. The main logic is PartitionRecord, which helps us partition records into different categories based on their Estimote beacon IDs. We then route based on those partitions. We can do different filtering and handling after that if we need to. We partition our JSON records into Avro records and pull out the Estimote id as a new attribute to use for routing. In the route we look at the partition key, which is an address for the device.

Software: Apache NiFi 1.8.0, MiniFi 0.5.0, Java JDK 1.8, Ubuntu 16.04, Apple iPhone SE, Python BLE / beacon libraries.
Beacon Types: iBeacon, Estimote
Networks: BLE and WiFi
Source Code: https://github.com/tspannhw/minifi-estimote

Schema:
{ "type" : "record", "name" : "estimote", "fields" : [ { "name" : "battery", "type" : "int", "doc" : "Type inferred from '80'" }, { "name" : "id", "type" : "string", "doc" : "Type inferred from '\"47a038d5eb032640\"'" }, { "name" : "magnetic_fieldz", "type" : "double", "doc" : "Type inferred from '-0.1484375'" }, { "name" : "magnetic_fieldx", "type" : "double", "doc" : "Type inferred from '-0.3125'" }, { "name" : "magnetic_fieldy", "type" : "double", "doc" : "Type inferred from '0.515625'" }, { "name" : "end", "type" : "string", "doc" : "Type inferred from '\"1547679071.5\"'" }, { "name" : "temperature", "type" : "double", "doc" : "Type inferred from '25.5'" }, { "name" : "cputemp1", "type" : "double", "doc" : "Type inferred from '38.0'" }, { "name" : "memory", "type" : "double", "doc" : "Type inferred from '26.1'" }, { "name" : "protocol_version", "type" : "int", "doc" : "Type inferred from '2'" }, { "name" : "current_motion", "type" : "int", "doc" : "Type inferred from '420'" }, { "name" : "te", "type" : "string", "doc" : "Type inferred from '\"0.362270116806\"'" }, { "name" : "systemtime", "type" : "string", "doc" : "Type inferred from '\"01/16/2019 17:51:11\"'" }, { "name" : "cputemp", "type" : "double", "doc" : "Type inferred from '39.0'" }, { "name" : "uptime", "type" : "int", "doc" : "Type inferred from '4870800'" }, { "name" : "host", "type" : "string", "doc" : "Type inferred from '\"Laptop\"'" }, { "name" : "diskusage", "type" : "string", "doc" : "Type inferred from '\"418487.1\"'" }, { "name" : "ipaddress", "type" : "string", "doc" : "Type inferred from '\"192.168.1.241\"'" }, { "name" : "uuid", "type" : "string", "doc" : "Type inferred from '\"20190116225111_2cbbac13-fed0-4d81-a24a-3aa593b5f674\"'" }, { "name" : "is_moving", "type" : "boolean", "doc" : "Type inferred from 'false'" }, { "name" : "accelerationy", "type" : "double", "doc" : "Type inferred from '0.015748031496062992'" }, { "name" : "accelerationx", "type" : "double", "doc" : "Type inferred from '0.0'" }, { "name" : "accelerationz", "type" : "double", "doc" : "Type inferred from '1.0236220472440944'" }, { "name" : "starttime", "type" : "string", "doc" : "Type inferred from '\"01/16/2019 17:51:11\"'" }, { "name" : "rssi", "type" : "int", "doc" : "Type inferred from '-60'" }, { "name" : "bt_addr", "type" : "string", "doc" : "Type inferred from '\"fa:e2:20:6e:d4:a5\"'" } ] }
Python Snippet:
from beacontools import parse_packet
from beacontools import BeaconScanner, EstimoteTelemetryFrameA, EstimoteTelemetryFrameB, EstimoteFilter

telemetry_b_packet = b"\x02\x01\x04\x03\x03\x9a\xfe\x17\x16\x9a\xfe\x22\x47\xa0\x38\xd5" \
                     b"\xeb\x03\x26\x40\x01\xd8\x42\xed\x73\x49\x25\x66\xbc\x2e\x50"
telemetry_b = parse_packet(telemetry_b_packet)
telemetry_a_packet = b"\x02\x01\x04\x03\x03\x9a\xfe\x17\x16\x9a\xfe\x22\x47\xa0\x38\xd5" b"\xeb\x03\x26\x40\x00\x00\x01\x41\x44\x47\xfa\xff\xff\xff\xff"
telemetry = parse_packet(telemetry_a_packet)
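The parse_packet calls above decode captured advertising frames. For continuous scanning, the same beacontools package provides a BeaconScanner; here is a minimal sketch of how it might be wired up (the identifier value is an illustrative placeholder, and the callback just prints what the MiniFi script would append to its JSON file):

import time
from beacontools import BeaconScanner, EstimoteTelemetryFrameA, EstimoteFilter

def callback(bt_addr, rssi, packet, additional_info):
    # packet is a parsed Estimote telemetry frame; its attributes mirror the schema fields above.
    print(bt_addr, rssi, packet, additional_info)

# Scan only Estimote telemetry A frames from one beacon.
scanner = BeaconScanner(
    callback,
    device_filter=EstimoteFilter(identifier="47a038d5eb032640"),
    packet_filter=EstimoteTelemetryFrameA,
)
scanner.start()
time.sleep(40)  # the MiniFi shell script lets the scanner run for about 40 seconds
scanner.stop()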
Example Data:
{"battery": 80, "id": "47a038d5eb032640", "magnetic_fieldz": -0.1484375, "magnetic_fieldx": -0.3125, "magnetic_fieldy": 0.515625, "end": "1548194024.99", "temperature": 25.5, "cputemp1": 45.0, "memory": 42.6, "protocol_version": 2, "current_motion": 420, "te": "39.767373085", "systemtime": "01/22/2019 16:53:44", "cputemp": 43.0, "uptime": 4870800, "host": "Laptop", "diskusage": "418124.2", "ipaddress": "192.168.1.241", "uuid": "20190122215344_2a41168e-31da-4ae7-bf62-0b300c69cd5b", "is_moving": false, "accelerationy": 0.015748031496062992, "accelerationx": 0.0, "accelerationz": 1.0236220472440944, "starttime": "01/22/2019 16:53:05", "rssi": -63, "bt_addr": "fa:e2:20:6e:d4:a5"}

We have several values from the Ubuntu MiniFi host machine: host, diskusage, ipaddress, cputemp, memory.

We have important values from the three beacons: battery, magnetic_field (x, y, z), current_motion, id, bt_addr, rssi, estimoteid, temperature.

Reference Articles:
https://community.hortonworks.com/articles/99861/ingesting-ibeacon-data-via-ble-to-mqtt-wifi-gatewa.html
https://community.hortonworks.com/articles/108947/minifi-for-ble-bluetooth-low-energy-beacon-data-in.html
https://community.hortonworks.com/articles/131320/using-partitionrecord-grokreaderjsonwriter-to-pars.html

Resources:
https://en.wikipedia.org/wiki/Bluetooth_low_energy_beacon
https://cloud.estimote.com/#/beacons
https://developer.estimote.com/ibeacon/
https://developer.apple.com/ibeacon/Getting-Started-with-iBeacon.pdf
https://pypi.org/project/beacontools/
https://www.instructables.com/id/iBeacon-Entry-System-with-the-Raspberry-Pi-and-Azu/#step0
https://github.com/switchdoclabs/iBeacon-Scanner-
https://developer.estimote.com/android/tutorial/part-1-setting-up/
https://developer.estimote.com/
https://github.com/flyinactor91/RasPi-iBeacons
https://github.com/GillisWerrebrouck/BeaconScanner
https://github.com/emanuele-falzone/pedestrian-gate-automation
https://github.com/biagiobotticelli/SmartTeamTrackingServer
https://github.com/citruz/beacontools/blob/master/examples/parser_example.py
https://github.com/citruz/beacontools/blob/master/beacontools/packet_types/ibeacon.py