03-05-2018 02:06 PM
Sure, here's the complete yml (some paths, USB IDs, image resolution, etc. are specific to my system): https://gist.github.com/achristianson/1dea217e5fcbc88b87e526d919dad2c0. The TensorFlow install process I used is documented here: https://github.com/apache/nifi-minifi-cpp/blob/master/extensions/tensorflow/BUILDING.md. There are multiple ways this could be done, but tensorflow_cc has worked well. The TensorFlow version is whichever one the latest master of tensorflow_cc targeted as of this article's publish date; I believe that was 1.5.0.
02-27-2018 05:57 PM
Keeping track of where a cat is can be a tricky task. In this article, we'll design and prototype a smart IoT cat sensor which detects when a cat is in proximity. This sensor is meant to be part of a larger network of cat sensors covering a target space.

Flow Design

Sensor Input

We start by polling an image sensor (camera) for image data. We use the GetUSBCamera processor configured for the USB camera device attached to our sensor controller. On our system, we had to open up permissions on the USB camera device; otherwise, MiNiFi's GetUSBCamera processor would record an access denied error in the logs:

chown user /dev/bus/usb/001/014
We then configure the sensor processor as follows:

Processors:
- name: Get
- name: Get
class: GetUSBCamera
Properties:
FPS: .5
Format: RAW
USB Vendor ID: 0x045e
USB Product ID: 0x0779
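The USB Vendor ID and USB Product ID above are specific to our camera. If you're unsure which IDs your device uses, lsusb will list them; as a small illustrative sketch (pyusb is our own choice here and is not used by MiNiFi itself), the same information can be pulled from Python:

# Enumerate attached USB devices and print the vendor/product IDs
# that go into the GetUSBCamera processor properties.
# Requires the pyusb package (pip install pyusb).
import usb.core

for dev in usb.core.find(find_all=True):
    print('Vendor ID: 0x%04x  Product ID: 0x%04x' % (dev.idVendor, dev.idProduct))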
Machine Learning Inference on the Edge

We use TensorFlow to perform class inference on the image data. We do this at the sensor rather than in a centralized system in order to significantly reduce inference latency and network bandwidth consumption. This is a three-step process:
1. Convert image data to a tensor using TFConvertImageToTensor
2. Perform inference using a pre-trained NASNet Large model applied via TFApplyGraph
3. Extract inferred classes using TFExtractTopLabels

Preparation of the NASNet Graph

We must perform some preliminary steps to get the NASNet graph into a form that MiNiFi can use. First, we export the inference graph using the export_inference_graph.py script from the TensorFlow models research/slim directory:

python export_inference_graph.py --model_name=nasnet_large --output_file=./nasnet_inf_graph.pb
This will also create a labels.txt file, which we will save for later use. Next, we download and extract the checkpoint nasnet-a_large_04_10_2017.tar.gz. We then use freeze_graph to integrate the pre-trained checkpoint with the inference graph and save the whole thing as a frozen graph:

from tensorflow.python.tools import freeze_graph
freeze_graph.freeze_graph(input_graph='./nasnet_inf_graph.pb',
input_saver='',
input_binary=True,
input_checkpoint='./model.ckpt',
output_node_names='final_layer/predictions',
restore_op_name='save/restore_all',
filename_tensor_name='save/Const:0',
output_graph='./frozen_nasnet.pb',
clear_devices=True,
initializer_nodes='')
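Before wiring the frozen graph into the flow, it's worth sanity-checking that it loads and that the node names we reference in TFApplyGraph below actually exist. A minimal sketch using the TensorFlow 1.x Python API:

# Load the frozen graph and confirm that the 'input' and
# 'final_layer/predictions' nodes referenced by TFApplyGraph exist.
import tensorflow as tf

graph_def = tf.GraphDef()
with tf.gfile.GFile('./frozen_nasnet.pb', 'rb') as f:
    graph_def.ParseFromString(f.read())

node_names = {node.name for node in graph_def.node}
for name in ('input', 'final_layer/predictions'):
    print(name, 'present' if name in node_names else 'MISSING')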
MiNiFi Inference Flow

We use the following processors and connections to perform inference on images provided by our camera:

Processors:
- name: Convert
class: TFConvertImageToTensor
Properties:
Input Format: RAW
Input Width: 1280
Input Height: 800
Crop Offset X: 240
Crop Offset Y: 0
Crop Size X: 800
Crop Size Y: 800
Output Width: 331
Output Height: 331
Channels: 3
- name: Apply
class: TFApplyGraph
Properties:
Input Node: input:0
Output Node: final_layer/predictions:0
- name: Extract
class: TFExtractTopLabels
- name: Log
class: LogAttribute
Connections:
- source name: Get
source relationship name: success
destination name: Convert
- source name: Convert
source relationship name: success
destination name: Apply
- source name: Apply
source relationship name: success
destination name: Extract
- source name: Extract
source relationship name: success
destination name: Log
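To sanity-check the geometry in the Convert processor (an 800x800 crop starting at x offset 240 out of the 1280x800 frame, scaled down to NASNet's 331x331x3 input), here is a rough Python equivalent; Pillow and the sample filename are our own choices for illustration and are not part of the flow:

# Reproduce the TFConvertImageToTensor crop/resize on a sample frame.
# Requires Pillow (pip install Pillow); 'frame.png' is a placeholder.
from PIL import Image

img = Image.open('frame.png')                   # 1280x800 RGB frame
crop = img.crop((240, 0, 240 + 800, 0 + 800))   # (left, upper, right, lower)
nasnet_input = crop.resize((331, 331))          # NASNet Large input size
print(nasnet_input.size, nasnet_input.mode)     # (331, 331) RGB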
We use the following processors and connections to supply TFApplyGraph with the inference graph and TFExtractTopLabels with the labels file:

Processors:
- name: GraphGet
class: GetFile
scheduling strategy: TIMER_DRIVEN
scheduling period: 120 sec
Properties:
Keep Source File: true
Input Directory: .
File Filter: "frozen_nasnet.pb"
- name: GraphUpdate
class: UpdateAttribute
Properties:
tf.type: graph
- name: LabelsGet
class: GetFile
scheduling strategy: TIMER_DRIVEN
scheduling period: 120 sec
Properties:
Keep Source File: true
Input Directory: .
File Filter: "labels.txt"
- name: LabelsUpdate
class: UpdateAttribute
Properties:
tf.type: labels
Connections:
- source name: GraphGet
source relationship name: success
destination name: GraphUpdate
- source name: GraphUpdate
source relationship name: success
destination name: Apply
- source name: LabelsGet
source relationship name: success
destination name: LabelsUpdate
- source name: LabelsUpdate
source relationship name: success
destination name: Extract
Route/Store/Forward Inferences

For the purposes of this prototype, we'll use RouteOnAttribute with a NiFi Expression Language predicate, routing matched flow files to an ExecuteProcess processor that runs notify-send to notify us of a CAT_DETECTED event. In a production system, we may want to use Remote Processing Groups to forward data of interest to a centralized system. Our prototype flow looks like this:

Processors:
- name: Route
class: RouteOnAttribute
Properties:
cat: ${"tf.top_label_0":matches('(282|283|284|285|286|287|288|289|290|291|292|293|294):.*')}
auto-terminated relationships list:
- unmatched
- name: Notify
class: ExecuteProcess
Properties:
Command: notify-send CAT_DETECTED
auto-terminated relationships list:
- success
Connections:
- source name: Log
source relationship name: success
destination name: Route
- source name: Route
source relationship name: cat
destination name: Notify
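The IDs 282-294 in the cat expression are the cat classes from the ImageNet label map we saved earlier. If you need to rebuild such an ID list, a rough sketch (assuming labels.txt lines use the same 'id:label' format that appears in the tf.top_label_* attributes below; review the matches by hand, since a substring scan will also catch entries like 'polecat'):

# Scan labels.txt for cat-related classes to build the ID alternation
# used in the RouteOnAttribute expression.
cat_ids = []
with open('labels.txt') as f:
    for line in f:
        label_id, _, label = line.strip().partition(':')
        if 'cat' in label.lower():
            cat_ids.append(label_id)
print('|'.join(cat_ids))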
Conclusion

We can now hold a cat up to our sensor and confirm that it detects a cat and triggers our notification:

----------
Standard FlowFile Attributes
UUID:0143d35c-1be5-11e8-a6f9-b06ebf2c6de8
EntryDate:2018-02-27 12:38:21.748
lineageStartDate:2018-02-27 12:38:21.748
Size:4020 Offset:0
FlowFile Attributes Map Content
key:filename value:1519753101748318191
key:path value:.
key:tf.top_label_0 value:284:Persian cat
key:tf.top_label_1 value:259:Samoyed, Samoyede
key:tf.top_label_2 value:357:weasel
key:tf.top_label_3 value:360:black-footed ferret, ferret, Mustela nigripes
key:tf.top_label_4 value:158:papillon
key:uuid value:0143d35c-1be5-11e8-a6f9-b06ebf2c6de8
FlowFile Resource Claim Content
Content Claim:/home/achristianson/workspace/minifi-article-2018-02-22/flow/contentrepository/1519753075140-43
----------
[2018-02-27 12:38:23.958] [org::apache::nifi::minifi::core::ProcessSession] [info] Transferring 02951658-1be5-11e8-9218-b06ebf2c6de8 from Route to relationship cat
[2018-02-27 12:38:25.754] [org::apache::nifi::minifi::processors::ExecuteProcess] [info] Execute Command notify-send CAT_DETECTED
MiNiFi - C++ makes it easy to create an IoT cat sensor. To complete our cat tracking system, we simply need to deploy a network of these sensors in the target space and configure the flow to deliver inferences to a centralized NiFi instance for storage and further analysis. We might also consider combining the image data with other data such as GPS sensor data using the GetGPS processor.
09-11-2017 09:17 PM
Apache NiFi allows us to rapidly create and operate very flexible and powerful
dataflows. There are times, however, when the full flexibility and power of NiFi
may not be required for the task at hand. For these times, MiNiFi may be a
good fit. In particular, MiNiFi - C++ is worth considering when resources such
as memory and compute power are constrained to such an extent that it is not
feasible to run a full Java virtual machine. We are going to demonstrate how to deploy a MiNiFi - C++ dataflow to a
cloud compute node that has only 64 megabytes of RAM. These types of nodes may
be useful as a cost-savings measure, because cloud compute services typically
charge based on resource usage.

We'll start by cloning the latest nifi-minifi-cpp src:

$ git clone https://github.com/apache/nifi-minifi-cpp.git
$ cd nifi-minifi-cpp/
Since this demo relies on a few commits which are not yet merged into master,
we'll cherry-pick them:

$ git remote add achristianson https://github.com/achristianson/nifi-minifi-cpp.git
$ git fetch --all
$ git cherry-pick cb9bdf 6800ae0
Next, we'll create a python virtual environment and add some helpful MiNiFi
modules to the PYTHONPATH, which will help us create our dataflow:

$ virtualenv ./env
$ . ./env/bin/activate
$ pip install --upgrade pip
$ pip install --upgrade pyyaml docker
$ export PYTHONPATH="$( pwd )"/docker/test/integration
Next, we'll start python:

$ python

Next, we'll create a dataflow:

>>> from minifi import *
>>> f = flow_yaml(ListenHTTP(8080) >> LogAttribute() >> PutFile('/tmp'))
>>> print(f)
Connections:
- destination id: 65472f6f-d87e-43c7-aec2-208046c028bc
name: c42fa886-b7e0-48ac-843d-6a9eeb66eb56
source id: ad2f2e7d-dde9-4496-bc63-0464e4f52a01
source relationship name: success
- destination id: 15a29d90-7012-44c2-b677-b787166c7426
name: f2f7463e-ed4c-4e8a-8752-01740c60775f
source id: 65472f6f-d87e-43c7-aec2-208046c028bc
source relationship name: success
Controller Services: []
Flow Controller:
name: MiNiFi Flow
Processors:
- Properties:
Listening Port: 8080
auto-terminated relationships list: []
class: org.apache.nifi.processors.standard.ListenHTTP
id: ad2f2e7d-dde9-4496-bc63-0464e4f52a01
name: ad2f2e7d-dde9-4496-bc63-0464e4f52a01
penalization period: 30 sec
run duration nanos: 0
scheduling period: 1 sec
scheduling strategy: EVENT_DRIVEN
yield period: 1 sec
- Properties: {}
auto-terminated relationships list: []
class: org.apache.nifi.processors.standard.LogAttribute
id: 65472f6f-d87e-43c7-aec2-208046c028bc
name: 65472f6f-d87e-43c7-aec2-208046c028bc
penalization period: 30 sec
run duration nanos: 0
scheduling period: 1 sec
scheduling strategy: EVENT_DRIVEN
yield period: 1 sec
- Properties:
Output Directory: /tmp
auto-terminated relationships list:
- success
- failure
class: org.apache.nifi.processors.standard.PutFile
id: 15a29d90-7012-44c2-b677-b787166c7426
name: 15a29d90-7012-44c2-b677-b787166c7426
penalization period: 30 sec
run duration nanos: 0
scheduling period: 1 sec
scheduling strategy: EVENT_DRIVEN
yield period: 1 sec
Remote Processing Groups: []

This flow looks good, so we'll save it to conf/config.yml and exit python:

>>> with open('conf/config.yml', 'w') as cf:
... cf.write(f)
...
>>>
Next, we'll build the docker image:

$ cd docker
$ ./DockerBuild.sh 1000 1000 0.3.0 minificppsource ..
Now we're ready to deploy the image. For this demo, we'll deploy to hyper.sh and
assume that hyper has already been configured. The container size will be s1,
which is an instance with only 64MB of RAM. We'll also allocate and attach a
floating IP (FIP):

$ hyper load -l apacheminificpp:0.3.0
$ hyper run --size s1 -d --name minifi -p 8080:8080 apacheminificpp:0.3.0
$ hyper fip allocate 1
199.245.60.9
$ hyper fip attach 199.245.60.9 minifi
Now our MiNiFi - C++ container is running and has an IP attached to it. Let's
generate and send some data to the new instance:

$ dd if=/dev/urandom of=./testdat bs=1M count=1
$ sha256sum ./testdat
da388a0bd1f69aa94674a20dd285df1b2e553b8cd9425e33498398d25846d692 ./testdat
$ curl -vvv -X POST --data-binary @./testdat http://199.245.60.9:8080/contentListener
* About to connect() to 199.245.60.9 port 8080 (#0)
* Trying 199.245.60.9...
* Connected to 199.245.60.9 (199.245.60.9) port 8080 (#0)
> POST /contentListener HTTP/1.1
> User-Agent: curl/7.29.0
> Host: 199.245.60.9:8080
> Accept: */*
> Content-Length: 1048576
> Content-Type: application/x-www-form-urlencoded
> Expect: 100-continue
>
< HTTP/1.1 100 Continue
< HTTP/1.1 200 OK
< Content-Type: text/html
< Content-Length: 0
<
* Connection #0 to host 199.245.60.9 left intact
The instance successfully received the test data. For good measure, let's verify
the data stored on the instance has the same sha256 sum as the local data:

$ hyper exec minifi ls /tmp
1505163279958208874
$ hyper exec minifi sha256sum /tmp/1505163279958208874
da388a0bd1f69aa94674a20dd285df1b2e553b8cd9425e33498398d25846d692 /tmp/1505163279958208874
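If we'd rather script this check than compare the sums by eye, a small sketch (the expected sum is the one printed by hyper exec above):

# Hash the local test file and compare against the sum reported by
# the MiNiFi container.
import hashlib

remote_sum = 'da388a0bd1f69aa94674a20dd285df1b2e553b8cd9425e33498398d25846d692'
h = hashlib.sha256()
with open('./testdat', 'rb') as f:
    for chunk in iter(lambda: f.read(8192), b''):
        h.update(chunk)
print('match' if h.hexdigest() == remote_sum else 'MISMATCH')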
The sha256 sum matches, so our MiNiFi - C++ instance has successfully received
and stored the generated test data. We can now clean up all the resources if
desired. Although the overall process to deploy a full-blown Apache NiFi instance would be similar, a full JVM-based NiFi would not fit within an s1 (64MB) instance. We can therefore significantly reduce compute service expenses by deploying MiNiFi - C++ when flows are simple enough to fit within its more limited feature scope.