Member since
07-30-2019
03-11-2022
04:08 AM
Hi, this isn't working as-is on NiFi 1.14. Can you give me a hand, please? I used a GenerateFlowFile processor with some random text and connected it to ExecuteScript, but I get the following:
ExecuteScript[id=78c5739f-017f-1000-0000-0000016ca301] ExecuteScript[id=78c5739f-017f-1000-0000-0000016ca301] failed to process due to javax.script.ScriptException: java.lang.NullPointerException: java.lang.NullPointerException in <script> at line number 25; rolling back session: java.lang.NullPointerException
↳ causes: Traceback (most recent call last): File "<script>", line 25, in <module> java.lang.NullPointerException java.lang.NullPointerException: java.lang.NullPointerException
↳ causes: javax.script.ScriptException: java.lang.NullPointerException: java.lang.NullPointerException in <script> at line number 25
↳ causes: org.apache.nifi.processor.exception.ProcessException: javax.script.ScriptException: java.lang.NullPointerException: java.lang.NullPointerException in <script> at line number 25
01-18-2021
02:40 AM
@Law serveraddress is the hostname of the host where the NiFi service is installed. For example, if you installed it on samplehost.com, the URL would be "http://samplehost.com:8080/nifi". If you enable SSL for NiFi later, the URL will change to "https://samplehost.com:<secure_port>/nifi". Also check that the host and port are reachable from your browser so that you can access the NiFi UI. If this answer resolves your issue or allows you to move forward, please choose to ACCEPT this solution and close this topic. If you have further dialogue on this topic, please comment here. -Akash
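As a quick way to verify the reachability mentioned above from a script rather than a browser, here is a minimal sketch. The helper name `is_reachable` is hypothetical, and it only confirms that a TCP connection can be opened; it does not check that NiFi itself is answering on that port.

```python
import socket


def is_reachable(host, port, timeout=3.0):
    """Return True if a TCP connection to host:port opens within timeout seconds."""
    try:
        # create_connection resolves the hostname and attempts a TCP handshake
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # covers DNS failures, refused connections, and timeouts
        return False
```

For the example in this reply, you would call `is_reachable("samplehost.com", 8080)`, substituting your own host and port.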
01-30-2020
04:56 AM
Introduction
Image classification is a powerful tool for gathering insight that is not contained in textual information (titles, comments, tags, etc.). The objects in an image posted along with a tweet, for example, may have nothing to do with what was written in the post. In this exercise, we will look at how a real-time stream of tweets on any topic can be broken down and analyzed to classify the main object or subject of a photo, using a deep learning model that has been deployed as a REST endpoint via CDSW.
Requirements
NiFi
CDSW
Twitter Developer Tokens - developer.twitter.com
A topic of interest
Setup
To get started, we need to deploy a model in CDSW that will serve as our endpoint for predictions on incoming images.
In CDSW, create a project and then add the Python code we will use to score any image URL:
import json
import csv
import keras
#import pandas as pandas
from keras.applications import ResNet50
from keras.preprocessing.image import img_to_array
from keras.applications import imagenet_utils
from keras.preprocessing.image import load_img
from PIL import Image
import numpy as np
import io
from io import BytesIO
import os
#import magic
import requests
#!pip install keras tensorflow Pillow requests numpy

def load_model():
    # load the pre-trained Keras model (here we are using a model
    # pre-trained on ImageNet and provided by Keras, but you can
    # substitute in your own networks just as easily)
    global model
    model = ResNet50(weights="imagenet")
    #model2 = VGG
    #model3

def prepare_image(image, target):
    # if the image mode is not RGB, convert it
    if image.mode != "RGB":
        image = image.convert("RGB")
    # resize the input image and preprocess it
    image = image.resize(target)
    image = img_to_array(image)
    image = np.expand_dims(image, axis=0)
    image = imagenet_utils.preprocess_input(image)
    # return the processed image
    return image

def predict(im):
    # initialize the data dictionary that will be returned from the
    # view
    # read the image in PIL format
    data = {}
    filename='temp.jpeg'
    response = requests.get(im["url"])
    with open(filename, 'wb') as f:
        f.write(response.content)
    image = load_img(filename, target_size=(224, 224))
    # preprocess the image and prepare it for classification
    image = prepare_image(image, target=(224, 224))
    # classify the input image and then initialize the list
    # of predictions to return to the client
    preds = model.predict(image)
    results = imagenet_utils.decode_predictions(preds)
    data["predictions"] = []
    # loop over the results and add them to the list of
    # returned predictions
    for (imagenetID, label, prob) in results[0]:
        r = {"label": label, "probability": float(prob)}
        data["predictions"].append(r)
    # return the data dictionary as a JSON response
    jdata = json.dumps(data)
    return jdata

load_model()
Now, CDSW will need some "pip installs" if you are using a clean environment without any dependencies installed.
In the cdsw_build.sh script, add the following:
pip2 install sklearn
pip2 install keras
pip2 install tensorflow
pip2 install Pillow
pip2 install requests
pip2 install numpy
You can also run these commands in your workspace if you want to test code interactively or make adjustments to the model.
Now, navigate back to the Project page and, under Model, create a new model.
Give the model a name and select the Python file you just created.
For the function, enter <predict> (without the brackets).
You need a sample request like the following:
{
"url": "https://miro.medium.com/max/3027/0*kp8rJzqHjagMj22J"
}
And a sample response, like the following:
{
  "predictions": [
    {
      "probability": 0.7351706624031067,
      "label": "airliner"
    },
    {
      "probability": 0.19801650941371918,
      "label": "space_shuttle"
    },
    {
      "probability": 0.05893068388104439,
      "label": "wing"
    },
    {
      "probability": 0.006579721812158823,
      "label": "warplane"
    },
    {
      "probability": 0.0006061011808924377,
      "label": "airship"
    }
  ],
  "success": true
}
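To make the shape of that response concrete, here is a minimal sketch that picks the most probable label from it and applies a cutoff, loosely mirroring the probHigh route in the NiFi flow. The function name `top_prediction` and the 0.5 threshold are assumptions for illustration, not values taken from the flow.

```python
import json

# Shortened copy of the sample response shown above
SAMPLE_RESPONSE = """
{
  "predictions": [
    {"probability": 0.7351706624031067, "label": "airliner"},
    {"probability": 0.19801650941371918, "label": "space_shuttle"},
    {"probability": 0.05893068388104439, "label": "wing"}
  ],
  "success": true
}
"""


def top_prediction(response_text, min_probability=0.5):
    """Return the highest-probability prediction, or None if it is below the cutoff."""
    data = json.loads(response_text)
    predictions = data.get("predictions", [])
    if not predictions:
        return None
    best = max(predictions, key=lambda p: p["probability"])
    # min_probability is a hypothetical threshold; tune it for your own use case
    return best if best["probability"] >= min_probability else None


best = top_prediction(SAMPLE_RESPONSE)
if best is not None:
    print(best["label"], best["probability"])
```

Raising `min_probability` makes the filter stricter, so low-confidence classifications are dropped rather than passed downstream.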
Then you can deploy. After a couple of minutes, the status will show Deployed. You can click Test in the UI and get back a prediction.
We now have an endpoint capable of classifying any image given the image's URL.
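Outside of the UI test button, the endpoint can also be called from a script. The sketch below builds the same request body that the NiFi template's `request` attribute uses (an `accessKey` plus a `request` object holding the image `url`) and reads the `response` field that the flow extracts with `$.response`. The function names are hypothetical, and the endpoint URL and access key are placeholders you would copy from your own model's page in CDSW.

```python
import json
import urllib.request


def build_payload(access_key, image_url):
    """Build the JSON body: the model input wrapped with the model's access key."""
    return json.dumps({"accessKey": access_key, "request": {"url": image_url}})


def extract_predictions(body_text):
    """Pull the predictions list out of the 'response' field of the reply."""
    envelope = json.loads(body_text)
    response = envelope.get("response")
    # predict() returns a JSON string, so the field may arrive as text or as an object
    if isinstance(response, str):
        response = json.loads(response)
    return (response or {}).get("predictions", [])


def score_image(endpoint, access_key, image_url, timeout=30):
    """POST an image URL to the model endpoint (endpoint is a placeholder you supply)."""
    req = urllib.request.Request(
        endpoint,
        data=build_payload(access_key, image_url).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(req, timeout=timeout) as resp:
        return extract_predictions(resp.read().decode("utf-8"))
```

Keeping the payload builder and the response parser separate from the HTTP call makes both easy to check without a deployed model.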
The next step is to set up NiFi.
The following is a flow that reads from Twitter and then scores each image against your endpoint:
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<template encoding-version="1.3">
<description>predict the images posted along with tweets about any topic you specify in the filters of the input processor</description>
<groupId>f0f4f2f6-016f-1000-ffff-ffffd1df01cd</groupId>
<name>twitter_image_prediction</name>
<snippet>
<connections>
<id>219fa01a-56a5-38aa-0000-000000000000</id>
<parentGroupId>74a3848d-1f7a-3866-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>74a3848d-1f7a-3866-0000-000000000000</groupId>
<id>1b0c31a8-ec30-3832-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<loadBalanceCompression>DO_NOT_COMPRESS</loadBalanceCompression>
<loadBalancePartitionAttribute></loadBalancePartitionAttribute>
<loadBalanceStatus>LOAD_BALANCE_NOT_CONFIGURED</loadBalanceStatus>
<loadBalanceStrategy>DO_NOT_LOAD_BALANCE</loadBalanceStrategy>
<name></name>
<selectedRelationships>Response</selectedRelationships>
<source>
<groupId>74a3848d-1f7a-3866-0000-000000000000</groupId>
<id>7f0d4823-8b99-305b-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>5021a311-2a61-30eb-0000-000000000000</id>
<parentGroupId>74a3848d-1f7a-3866-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>74a3848d-1f7a-3866-0000-000000000000</groupId>
<id>55ad95fb-e275-3c64-0000-000000000000</id>
<type>FUNNEL</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<loadBalanceCompression>DO_NOT_COMPRESS</loadBalanceCompression>
<loadBalancePartitionAttribute></loadBalancePartitionAttribute>
<loadBalanceStatus>LOAD_BALANCE_NOT_CONFIGURED</loadBalanceStatus>
<loadBalanceStrategy>DO_NOT_LOAD_BALANCE</loadBalanceStrategy>
<name></name>
<selectedRelationships>Response</selectedRelationships>
<source>
<groupId>74a3848d-1f7a-3866-0000-000000000000</groupId>
<id>7d54a566-45aa-3c17-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>5c06faac-4180-323a-0000-000000000000</id>
<parentGroupId>74a3848d-1f7a-3866-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>74a3848d-1f7a-3866-0000-000000000000</groupId>
<id>c82b45d8-617b-396c-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<loadBalanceCompression>DO_NOT_COMPRESS</loadBalanceCompression>
<loadBalancePartitionAttribute></loadBalancePartitionAttribute>
<loadBalanceStatus>LOAD_BALANCE_NOT_CONFIGURED</loadBalanceStatus>
<loadBalanceStrategy>DO_NOT_LOAD_BALANCE</loadBalanceStrategy>
<name></name>
<selectedRelationships>isImage</selectedRelationships>
<source>
<groupId>74a3848d-1f7a-3866-0000-000000000000</groupId>
<id>f29be6bf-ddbf-3b6d-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>6b6627d4-649b-3070-0000-000000000000</id>
<parentGroupId>74a3848d-1f7a-3866-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>74a3848d-1f7a-3866-0000-000000000000</groupId>
<id>55ad95fb-e275-3c64-0000-000000000000</id>
<type>FUNNEL</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<loadBalanceCompression>DO_NOT_COMPRESS</loadBalanceCompression>
<loadBalancePartitionAttribute></loadBalancePartitionAttribute>
<loadBalanceStatus>LOAD_BALANCE_NOT_CONFIGURED</loadBalanceStatus>
<loadBalanceStrategy>DO_NOT_LOAD_BALANCE</loadBalanceStrategy>
<name></name>
<selectedRelationships>Original</selectedRelationships>
<source>
<groupId>74a3848d-1f7a-3866-0000-000000000000</groupId>
<id>c82b45d8-617b-396c-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>8dcd8171-6516-36b1-0000-000000000000</id>
<parentGroupId>74a3848d-1f7a-3866-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>74a3848d-1f7a-3866-0000-000000000000</groupId>
<id>912a453f-e17e-35a5-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<loadBalanceCompression>DO_NOT_COMPRESS</loadBalanceCompression>
<loadBalancePartitionAttribute></loadBalancePartitionAttribute>
<loadBalanceStatus>LOAD_BALANCE_NOT_CONFIGURED</loadBalanceStatus>
<loadBalanceStrategy>DO_NOT_LOAD_BALANCE</loadBalanceStrategy>
<name></name>
<selectedRelationships>matched</selectedRelationships>
<source>
<groupId>74a3848d-1f7a-3866-0000-000000000000</groupId>
<id>d93241be-c09a-3a4c-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>98eb5d14-1885-3d33-0000-000000000000</id>
<parentGroupId>74a3848d-1f7a-3866-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>74a3848d-1f7a-3866-0000-000000000000</groupId>
<id>55ad95fb-e275-3c64-0000-000000000000</id>
<type>FUNNEL</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<loadBalanceCompression>DO_NOT_COMPRESS</loadBalanceCompression>
<loadBalancePartitionAttribute></loadBalancePartitionAttribute>
<loadBalanceStatus>LOAD_BALANCE_NOT_CONFIGURED</loadBalanceStatus>
<loadBalanceStrategy>DO_NOT_LOAD_BALANCE</loadBalanceStrategy>
<name></name>
<selectedRelationships>probHigh</selectedRelationships>
<source>
<groupId>74a3848d-1f7a-3866-0000-000000000000</groupId>
<id>912a453f-e17e-35a5-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>b36415ac-5cc5-39bb-0000-000000000000</id>
<parentGroupId>74a3848d-1f7a-3866-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>74a3848d-1f7a-3866-0000-000000000000</groupId>
<id>f5dac0c6-3680-3d94-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<loadBalanceCompression>DO_NOT_COMPRESS</loadBalanceCompression>
<loadBalancePartitionAttribute></loadBalancePartitionAttribute>
<loadBalanceStatus>LOAD_BALANCE_NOT_CONFIGURED</loadBalanceStatus>
<loadBalanceStrategy>DO_NOT_LOAD_BALANCE</loadBalanceStrategy>
<name></name>
<selectedRelationships>success</selectedRelationships>
<source>
<groupId>74a3848d-1f7a-3866-0000-000000000000</groupId>
<id>e5fa80e4-9690-38a3-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>babe17fd-025b-3b3f-0000-000000000000</id>
<parentGroupId>74a3848d-1f7a-3866-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>74a3848d-1f7a-3866-0000-000000000000</groupId>
<id>63d78c45-e918-371a-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<loadBalanceCompression>DO_NOT_COMPRESS</loadBalanceCompression>
<loadBalancePartitionAttribute></loadBalancePartitionAttribute>
<loadBalanceStatus>LOAD_BALANCE_NOT_CONFIGURED</loadBalanceStatus>
<loadBalanceStrategy>DO_NOT_LOAD_BALANCE</loadBalanceStrategy>
<name></name>
<selectedRelationships>success</selectedRelationships>
<source>
<groupId>74a3848d-1f7a-3866-0000-000000000000</groupId>
<id>cf25469b-28b9-3f90-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>cd56c28b-94c9-31e9-0000-000000000000</id>
<parentGroupId>74a3848d-1f7a-3866-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>74a3848d-1f7a-3866-0000-000000000000</groupId>
<id>7f0d4823-8b99-305b-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<loadBalanceCompression>DO_NOT_COMPRESS</loadBalanceCompression>
<loadBalancePartitionAttribute></loadBalancePartitionAttribute>
<loadBalanceStatus>LOAD_BALANCE_NOT_CONFIGURED</loadBalanceStatus>
<loadBalanceStrategy>DO_NOT_LOAD_BALANCE</loadBalanceStrategy>
<name></name>
<selectedRelationships>success</selectedRelationships>
<source>
<groupId>74a3848d-1f7a-3866-0000-000000000000</groupId>
<id>63d78c45-e918-371a-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>ce19ac81-e486-30bb-0000-000000000000</id>
<parentGroupId>74a3848d-1f7a-3866-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>74a3848d-1f7a-3866-0000-000000000000</groupId>
<id>f29be6bf-ddbf-3b6d-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<loadBalanceCompression>DO_NOT_COMPRESS</loadBalanceCompression>
<loadBalancePartitionAttribute></loadBalancePartitionAttribute>
<loadBalanceStatus>LOAD_BALANCE_NOT_CONFIGURED</loadBalanceStatus>
<loadBalanceStrategy>DO_NOT_LOAD_BALANCE</loadBalanceStrategy>
<name></name>
<selectedRelationships>matched</selectedRelationships>
<source>
<groupId>74a3848d-1f7a-3866-0000-000000000000</groupId>
<id>f5dac0c6-3680-3d94-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>cfbbc63d-28a4-3888-0000-000000000000</id>
<parentGroupId>74a3848d-1f7a-3866-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>74a3848d-1f7a-3866-0000-000000000000</groupId>
<id>cf25469b-28b9-3f90-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<loadBalanceCompression>DO_NOT_COMPRESS</loadBalanceCompression>
<loadBalancePartitionAttribute></loadBalancePartitionAttribute>
<loadBalanceStatus>LOAD_BALANCE_NOT_CONFIGURED</loadBalanceStatus>
<loadBalanceStrategy>DO_NOT_LOAD_BALANCE</loadBalanceStrategy>
<name></name>
<selectedRelationships>isImage</selectedRelationships>
<source>
<groupId>74a3848d-1f7a-3866-0000-000000000000</groupId>
<id>f29be6bf-ddbf-3b6d-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>dbdf2bf4-a2a3-30cc-0000-000000000000</id>
<parentGroupId>74a3848d-1f7a-3866-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>74a3848d-1f7a-3866-0000-000000000000</groupId>
<id>7d54a566-45aa-3c17-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<loadBalanceCompression>DO_NOT_COMPRESS</loadBalanceCompression>
<loadBalancePartitionAttribute></loadBalancePartitionAttribute>
<loadBalanceStatus>LOAD_BALANCE_NOT_CONFIGURED</loadBalanceStatus>
<loadBalanceStrategy>DO_NOT_LOAD_BALANCE</loadBalanceStrategy>
<name></name>
<selectedRelationships>isImage</selectedRelationships>
<source>
<groupId>74a3848d-1f7a-3866-0000-000000000000</groupId>
<id>f29be6bf-ddbf-3b6d-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>dc2c9d0e-5315-39ae-0000-000000000000</id>
<parentGroupId>74a3848d-1f7a-3866-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>74a3848d-1f7a-3866-0000-000000000000</groupId>
<id>9e48d253-c1a1-3df8-0000-000000000000</id>
<type>FUNNEL</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<loadBalanceCompression>DO_NOT_COMPRESS</loadBalanceCompression>
<loadBalancePartitionAttribute></loadBalancePartitionAttribute>
<loadBalanceStatus>LOAD_BALANCE_NOT_CONFIGURED</loadBalanceStatus>
<loadBalanceStrategy>DO_NOT_LOAD_BALANCE</loadBalanceStrategy>
<name></name>
<selectedRelationships>unmatched</selectedRelationships>
<source>
<groupId>74a3848d-1f7a-3866-0000-000000000000</groupId>
<id>912a453f-e17e-35a5-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>f2d088e5-6458-3ca3-0000-000000000000</id>
<parentGroupId>74a3848d-1f7a-3866-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>74a3848d-1f7a-3866-0000-000000000000</groupId>
<id>d93241be-c09a-3a4c-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<loadBalanceCompression>DO_NOT_COMPRESS</loadBalanceCompression>
<loadBalancePartitionAttribute></loadBalancePartitionAttribute>
<loadBalanceStatus>LOAD_BALANCE_NOT_CONFIGURED</loadBalanceStatus>
<loadBalanceStrategy>DO_NOT_LOAD_BALANCE</loadBalanceStrategy>
<name></name>
<selectedRelationships>matched</selectedRelationships>
<source>
<groupId>74a3848d-1f7a-3866-0000-000000000000</groupId>
<id>1b0c31a8-ec30-3832-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<funnels>
<id>55ad95fb-e275-3c64-0000-000000000000</id>
<parentGroupId>74a3848d-1f7a-3866-0000-000000000000</parentGroupId>
<position>
<x>2090.0000059810604</x>
<y>712.0</y>
</position>
</funnels>
<funnels>
<id>9e48d253-c1a1-3df8-0000-000000000000</id>
<parentGroupId>74a3848d-1f7a-3866-0000-000000000000</parentGroupId>
<position>
<x>2602.0000059810604</x>
<y>776.0</y>
</position>
</funnels>
<processors>
<id>1b0c31a8-ec30-3832-0000-000000000000</id>
<parentGroupId>74a3848d-1f7a-3866-0000-000000000000</parentGroupId>
<position>
<x>1770.0000059810604</x>
<y>0.0</y>
</position>
<bundle>
<artifact>nifi-standard-nar</artifact>
<group>org.apache.nifi</group>
<version>1.10.0.2.0.0.0-106</version>
</bundle>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>Destination</key>
<value>
<name>Destination</name>
</value>
</entry>
<entry>
<key>Return Type</key>
<value>
<name>Return Type</name>
</value>
</entry>
<entry>
<key>Path Not Found Behavior</key>
<value>
<name>Path Not Found Behavior</name>
</value>
</entry>
<entry>
<key>Null Value Representation</key>
<value>
<name>Null Value Representation</name>
</value>
</entry>
<entry>
<key>json_prediction</key>
<value>
<name>json_prediction</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>Destination</key>
<value>flowfile-content</value>
</entry>
<entry>
<key>Return Type</key>
<value>auto-detect</value>
</entry>
<entry>
<key>Path Not Found Behavior</key>
<value>ignore</value>
</entry>
<entry>
<key>Null Value Representation</key>
<value>empty string</value>
</entry>
<entry>
<key>json_prediction</key>
<value>$.response</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<executionNodeRestricted>false</executionNodeRestricted>
<name>EvaluateJsonPath</name>
<relationships>
<autoTerminate>true</autoTerminate>
<name>failure</name>
</relationships>
<relationships>
<autoTerminate>false</autoTerminate>
<name>matched</name>
</relationships>
<relationships>
<autoTerminate>true</autoTerminate>
<name>unmatched</name>
</relationships>
<state>RUNNING</state>
<style/>
<type>org.apache.nifi.processors.standard.EvaluateJsonPath</type>
</processors>
<processors>
<id>63d78c45-e918-371a-0000-000000000000</id>
<parentGroupId>74a3848d-1f7a-3866-0000-000000000000</parentGroupId>
<position>
<x>1282.0000059810604</x>
<y>0.0</y>
</position>
<bundle>
<artifact>nifi-update-attribute-nar</artifact>
<group>org.apache.nifi</group>
<version>1.10.0.2.0.0.0-106</version>
</bundle>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>Delete Attributes Expression</key>
<value>
<name>Delete Attributes Expression</name>
</value>
</entry>
<entry>
<key>Store State</key>
<value>
<name>Store State</name>
</value>
</entry>
<entry>
<key>Stateful Variables Initial Value</key>
<value>
<name>Stateful Variables Initial Value</name>
</value>
</entry>
<entry>
<key>canonical-value-lookup-cache-size</key>
<value>
<name>canonical-value-lookup-cache-size</name>
</value>
</entry>
<entry>
<key>accessKey</key>
<value>
<name>accessKey</name>
</value>
</entry>
<entry>
<key>Authorization</key>
<value>
<name>Authorization</name>
</value>
</entry>
<entry>
<key>request</key>
<value>
<name>request</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>Delete Attributes Expression</key>
</entry>
<entry>
<key>Store State</key>
<value>Do not store state</value>
</entry>
<entry>
<key>Stateful Variables Initial Value</key>
</entry>
<entry>
<key>canonical-value-lookup-cache-size</key>
<value>100</value>
</entry>
<entry>
<key>accessKey</key>
<value>mkvz7qe59mw0jtsybqtg2ooybc5w3v1p</value>
</entry>
<entry>
<key>Authorization</key>
<value>Bearer somevalue</value>
</entry>
<entry>
<key>request</key>
<value>{"accessKey":"your_access_key","request":{
"url": "${media}"
}}</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<executionNodeRestricted>false</executionNodeRestricted>
<name>UpdateAttribute</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>success</name>
</relationships>
<state>RUNNING</state>
<style/>
<type>org.apache.nifi.processors.attributes.UpdateAttribute</type>
</processors>
<processors>
<id>7d54a566-45aa-3c17-0000-000000000000</id>
<parentGroupId>74a3848d-1f7a-3866-0000-000000000000</parentGroupId>
<position>
<x>1210.0000059810604</x>
<y>648.0</y>
</position>
<bundle>
<artifact>nifi-standard-nar</artifact>
<group>org.apache.nifi</group>
<version>1.10.0.2.0.0.0-106</version>
</bundle>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>HTTP Method</key>
<value>
<name>HTTP Method</name>
</value>
</entry>
<entry>
<key>Remote URL</key>
<value>
<name>Remote URL</name>
</value>
</entry>
<entry>
<key>SSL Context Service</key>
<value>
<identifiesControllerService>org.apache.nifi.ssl.SSLContextService</identifiesControllerService>
<name>SSL Context Service</name>
</value>
</entry>
<entry>
<key>Connection Timeout</key>
<value>
<name>Connection Timeout</name>
</value>
</entry>
<entry>
<key>Read Timeout</key>
<value>
<name>Read Timeout</name>
</value>
</entry>
<entry>
<key>Include Date Header</key>
<value>
<name>Include Date Header</name>
</value>
</entry>
<entry>
<key>Follow Redirects</key>
<value>
<name>Follow Redirects</name>
</value>
</entry>
<entry>
<key>Attributes to Send</key>
<value>
<name>Attributes to Send</name>
</value>
</entry>
<entry>
<key>Basic Authentication Username</key>
<value>
<name>Basic Authentication Username</name>
</value>
</entry>
<entry>
<key>Basic Authentication Password</key>
<value>
<name>Basic Authentication Password</name>
</value>
</entry>
<entry>
<key>proxy-configuration-service</key>
<value>
<identifiesControllerService>org.apache.nifi.proxy.ProxyConfigurationService</identifiesControllerService>
<name>proxy-configuration-service</name>
</value>
</entry>
<entry>
<key>Proxy Host</key>
<value>
<name>Proxy Host</name>
</value>
</entry>
<entry>
<key>Proxy Port</key>
<value>
<name>Proxy Port</name>
</value>
</entry>
<entry>
<key>Proxy Type</key>
<value>
<name>Proxy Type</name>
</value>
</entry>
<entry>
<key>invokehttp-proxy-user</key>
<value>
<name>invokehttp-proxy-user</name>
</value>
</entry>
<entry>
<key>invokehttp-proxy-password</key>
<value>
<name>invokehttp-proxy-password</name>
</value>
</entry>
<entry>
<key>Put Response Body In Attribute</key>
<value>
<name>Put Response Body In Attribute</name>
</value>
</entry>
<entry>
<key>Max Length To Put In Attribute</key>
<value>
<name>Max Length To Put In Attribute</name>
</value>
</entry>
<entry>
<key>Digest Authentication</key>
<value>
<name>Digest Authentication</name>
</value>
</entry>
<entry>
<key>Always Output Response</key>
<value>
<name>Always Output Response</name>
</value>
</entry>
<entry>
<key>Add Response Headers to Request</key>
<value>
<name>Add Response Headers to Request</name>
</value>
</entry>
<entry>
<key>Content-Type</key>
<value>
<name>Content-Type</name>
</value>
</entry>
<entry>
<key>send-message-body</key>
<value>
<name>send-message-body</name>
</value>
</entry>
<entry>
<key>Use Chunked Encoding</key>
<value>
<name>Use Chunked Encoding</name>
</value>
</entry>
<entry>
<key>Penalize on "No Retry"</key>
<value>
<name>Penalize on "No Retry"</name>
</value>
</entry>
<entry>
<key>use-etag</key>
<value>
<name>use-etag</name>
</value>
</entry>
<entry>
<key>etag-max-cache-size</key>
<value>
<name>etag-max-cache-size</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>HTTP Method</key>
<value>GET</value>
</entry>
<entry>
<key>Remote URL</key>
<value>${media}</value>
</entry>
<entry>
<key>SSL Context Service</key>
</entry>
<entry>
<key>Connection Timeout</key>
<value>5 secs</value>
</entry>
<entry>
<key>Read Timeout</key>
<value>15 secs</value>
</entry>
<entry>
<key>Include Date Header</key>
<value>True</value>
</entry>
<entry>
<key>Follow Redirects</key>
<value>True</value>
</entry>
<entry>
<key>Attributes to Send</key>
</entry>
<entry>
<key>Basic Authentication Username</key>
</entry>
<entry>
<key>Basic Authentication Password</key>
</entry>
<entry>
<key>proxy-configuration-service</key>
</entry>
<entry>
<key>Proxy Host</key>
</entry>
<entry>
<key>Proxy Port</key>
</entry>
<entry>
<key>Proxy Type</key>
<value>http</value>
</entry>
<entry>
<key>invokehttp-proxy-user</key>
</entry>
<entry>
<key>invokehttp-proxy-password</key>
</entry>
<entry>
<key>Put Response Body In Attribute</key>
</entry>
<entry>
<key>Max Length To Put In Attribute</key>
<value>256</value>
</entry>
<entry>
<key>Digest Authentication</key>
<value>false</value>
</entry>
<entry>
<key>Always Output Response</key>
<value>false</value>
</entry>
<entry>
<key>Add Response Headers to Request</key>
<value>false</value>
</entry>
<entry>
<key>Content-Type</key>
<value>${mime.type}</value>
</entry>
<entry>
<key>send-message-body</key>
<value>true</value>
</entry>
<entry>
<key>Use Chunked Encoding</key>
<value>false</value>
</entry>
<entry>
<key>Penalize on "No Retry"</key>
<value>false</value>
</entry>
<entry>
<key>use-etag</key>
<value>false</value>
</entry>
<entry>
<key>etag-max-cache-size</key>
<value>10MB</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<executionNodeRestricted>false</executionNodeRestricted>
<name>InvokeHTTP</name>
<relationships>
<autoTerminate>true</autoTerminate>
<name>Failure</name>
</relationships>
<relationships>
<autoTerminate>true</autoTerminate>
<name>No Retry</name>
</relationships>
<relationships>
<autoTerminate>true</autoTerminate>
<name>Original</name>
</relationships>
<relationships>
<autoTerminate>false</autoTerminate>
<name>Response</name>
</relationships>
<relationships>
<autoTerminate>true</autoTerminate>
<name>Retry</name>
</relationships>
<state>STOPPED</state>
<style/>
<type>org.apache.nifi.processors.standard.InvokeHTTP</type>
</processors>
<processors>
<id>7f0d4823-8b99-305b-0000-000000000000</id>
<parentGroupId>74a3848d-1f7a-3866-0000-000000000000</parentGroupId>
<position>
<x>1522.0000059810604</x>
<y>360.0</y>
</position>
<bundle>
<artifact>nifi-standard-nar</artifact>
<group>org.apache.nifi</group>
<version>1.10.0.2.0.0.0-106</version>
</bundle>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>HTTP Method</key>
<value>
<name>HTTP Method</name>
</value>
</entry>
<entry>
<key>Remote URL</key>
<value>
<name>Remote URL</name>
</value>
</entry>
<entry>
<key>SSL Context Service</key>
<value>
<identifiesControllerService>org.apache.nifi.ssl.SSLContextService</identifiesControllerService>
<name>SSL Context Service</name>
</value>
</entry>
<entry>
<key>Connection Timeout</key>
<value>
<name>Connection Timeout</name>
</value>
</entry>
<entry>
<key>Read Timeout</key>
<value>
<name>Read Timeout</name>
</value>
</entry>
<entry>
<key>Include Date Header</key>
<value>
<name>Include Date Header</name>
</value>
</entry>
<entry>
<key>Follow Redirects</key>
<value>
<name>Follow Redirects</name>
</value>
</entry>
<entry>
<key>Attributes to Send</key>
<value>
<name>Attributes to Send</name>
</value>
</entry>
<entry>
<key>Basic Authentication Username</key>
<value>
<name>Basic Authentication Username</name>
</value>
</entry>
<entry>
<key>Basic Authentication Password</key>
<value>
<name>Basic Authentication Password</name>
</value>
</entry>
<entry>
<key>proxy-configuration-service</key>
<value>
<identifiesControllerService>org.apache.nifi.proxy.ProxyConfigurationService</identifiesControllerService>
<name>proxy-configuration-service</name>
</value>
</entry>
<entry>
<key>Proxy Host</key>
<value>
<name>Proxy Host</name>
</value>
</entry>
<entry>
<key>Proxy Port</key>
<value>
<name>Proxy Port</name>
</value>
</entry>
<entry>
<key>Proxy Type</key>
<value>
<name>Proxy Type</name>
</value>
</entry>
<entry>
<key>invokehttp-proxy-user</key>
<value>
<name>invokehttp-proxy-user</name>
</value>
</entry>
<entry>
<key>invokehttp-proxy-password</key>
<value>
<name>invokehttp-proxy-password</name>
</value>
</entry>
<entry>
<key>Put Response Body In Attribute</key>
<value>
<name>Put Response Body In Attribute</name>
</value>
</entry>
<entry>
<key>Max Length To Put In Attribute</key>
<value>
<name>Max Length To Put In Attribute</name>
</value>
</entry>
<entry>
<key>Digest Authentication</key>
<value>
<name>Digest Authentication</name>
</value>
</entry>
<entry>
<key>Always Output Response</key>
<value>
<name>Always Output Response</name>
</value>
</entry>
<entry>
<key>Add Response Headers to Request</key>
<value>
<name>Add Response Headers to Request</name>
</value>
</entry>
<entry>
<key>Content-Type</key>
<value>
<name>Content-Type</name>
</value>
</entry>
<entry>
<key>send-message-body</key>
<value>
<name>send-message-body</name>
</value>
</entry>
<entry>
<key>Use Chunked Encoding</key>
<value>
<name>Use Chunked Encoding</name>
</value>
</entry>
<entry>
<key>Penalize on "No Retry"</key>
<value>
<name>Penalize on "No Retry"</name>
</value>
</entry>
<entry>
<key>use-etag</key>
<value>
<name>use-etag</name>
</value>
</entry>
<entry>
<key>etag-max-cache-size</key>
<value>
<name>etag-max-cache-size</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>HTTP Method</key>
<value>POST</value>
</entry>
<entry>
<key>Remote URL</key>
<value>cdsw_endpoint_url</value>
</entry>
<entry>
<key>SSL Context Service</key>
</entry>
<entry>
<key>Connection Timeout</key>
<value>5 secs</value>
</entry>
<entry>
<key>Read Timeout</key>
<value>15 secs</value>
</entry>
<entry>
<key>Include Date Header</key>
<value>True</value>
</entry>
<entry>
<key>Follow Redirects</key>
<value>True</value>
</entry>
<entry>
<key>Attributes to Send</key>
<value>accessKey</value>
</entry>
<entry>
<key>Basic Authentication Username</key>
</entry>
<entry>
<key>Basic Authentication Password</key>
</entry>
<entry>
<key>proxy-configuration-service</key>
</entry>
<entry>
<key>Proxy Host</key>
</entry>
<entry>
<key>Proxy Port</key>
</entry>
<entry>
<key>Proxy Type</key>
<value>http</value>
</entry>
<entry>
<key>invokehttp-proxy-user</key>
</entry>
<entry>
<key>invokehttp-proxy-password</key>
</entry>
<entry>
<key>Put Response Body In Attribute</key>
</entry>
<entry>
<key>Max Length To Put In Attribute</key>
<value>256</value>
</entry>
<entry>
<key>Digest Authentication</key>
<value>false</value>
</entry>
<entry>
<key>Always Output Response</key>
<value>false</value>
</entry>
<entry>
<key>Add Response Headers to Request</key>
<value>false</value>
</entry>
<entry>
<key>Content-Type</key>
<value>${mime.type}</value>
</entry>
<entry>
<key>send-message-body</key>
<value>true</value>
</entry>
<entry>
<key>Use Chunked Encoding</key>
<value>false</value>
</entry>
<entry>
<key>Penalize on "No Retry"</key>
<value>false</value>
</entry>
<entry>
<key>use-etag</key>
<value>false</value>
</entry>
<entry>
<key>etag-max-cache-size</key>
<value>10MB</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<executionNodeRestricted>false</executionNodeRestricted>
<name>InvokeHTTP</name>
<relationships>
<autoTerminate>true</autoTerminate>
<name>Failure</name>
</relationships>
<relationships>
<autoTerminate>true</autoTerminate>
<name>No Retry</name>
</relationships>
<relationships>
<autoTerminate>true</autoTerminate>
<name>Original</name>
</relationships>
<relationships>
<autoTerminate>false</autoTerminate>
<name>Response</name>
</relationships>
<relationships>
<autoTerminate>true</autoTerminate>
<name>Retry</name>
</relationships>
<state>RUNNING</state>
<style/>
<type>org.apache.nifi.processors.standard.InvokeHTTP</type>
</processors>
<processors>
<id>912a453f-e17e-35a5-0000-000000000000</id>
<parentGroupId>74a3848d-1f7a-3866-0000-000000000000</parentGroupId>
<position>
<x>2442.0000059810604</x>
<y>272.0</y>
</position>
<bundle>
<artifact>nifi-standard-nar</artifact>
<group>org.apache.nifi</group>
<version>1.10.0.2.0.0.0-106</version>
</bundle>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>Routing Strategy</key>
<value>
<name>Routing Strategy</name>
</value>
</entry>
<entry>
<key>probHigh</key>
<value>
<name>probHigh</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>Routing Strategy</key>
<value>Route to Property name</value>
</entry>
<entry>
<key>probHigh</key>
<value>${prob1:gt(0.75)}</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<executionNodeRestricted>false</executionNodeRestricted>
<name>RouteOnAttribute</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>probHigh</name>
</relationships>
<relationships>
<autoTerminate>false</autoTerminate>
<name>unmatched</name>
</relationships>
<state>RUNNING</state>
<style/>
<type>org.apache.nifi.processors.standard.RouteOnAttribute</type>
</processors>
<processors>
<id>c82b45d8-617b-396c-0000-000000000000</id>
<parentGroupId>74a3848d-1f7a-3866-0000-000000000000</parentGroupId>
<position>
<x>1034.0000059810604</x>
<y>944.0</y>
</position>
<bundle>
<artifact>nifi-standard-nar</artifact>
<group>org.apache.nifi</group>
<version>1.10.0.2.0.0.0-106</version>
</bundle>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>HTTP Method</key>
<value>
<name>HTTP Method</name>
</value>
</entry>
<entry>
<key>Remote URL</key>
<value>
<name>Remote URL</name>
</value>
</entry>
<entry>
<key>SSL Context Service</key>
<value>
<identifiesControllerService>org.apache.nifi.ssl.SSLContextService</identifiesControllerService>
<name>SSL Context Service</name>
</value>
</entry>
<entry>
<key>Connection Timeout</key>
<value>
<name>Connection Timeout</name>
</value>
</entry>
<entry>
<key>Read Timeout</key>
<value>
<name>Read Timeout</name>
</value>
</entry>
<entry>
<key>Include Date Header</key>
<value>
<name>Include Date Header</name>
</value>
</entry>
<entry>
<key>Follow Redirects</key>
<value>
<name>Follow Redirects</name>
</value>
</entry>
<entry>
<key>Attributes to Send</key>
<value>
<name>Attributes to Send</name>
</value>
</entry>
<entry>
<key>Basic Authentication Username</key>
<value>
<name>Basic Authentication Username</name>
</value>
</entry>
<entry>
<key>Basic Authentication Password</key>
<value>
<name>Basic Authentication Password</name>
</value>
</entry>
<entry>
<key>proxy-configuration-service</key>
<value>
<identifiesControllerService>org.apache.nifi.proxy.ProxyConfigurationService</identifiesControllerService>
<name>proxy-configuration-service</name>
</value>
</entry>
<entry>
<key>Proxy Host</key>
<value>
<name>Proxy Host</name>
</value>
</entry>
<entry>
<key>Proxy Port</key>
<value>
<name>Proxy Port</name>
</value>
</entry>
<entry>
<key>Proxy Type</key>
<value>
<name>Proxy Type</name>
</value>
</entry>
<entry>
<key>invokehttp-proxy-user</key>
<value>
<name>invokehttp-proxy-user</name>
</value>
</entry>
<entry>
<key>invokehttp-proxy-password</key>
<value>
<name>invokehttp-proxy-password</name>
</value>
</entry>
<entry>
<key>Put Response Body In Attribute</key>
<value>
<name>Put Response Body In Attribute</name>
</value>
</entry>
<entry>
<key>Max Length To Put In Attribute</key>
<value>
<name>Max Length To Put In Attribute</name>
</value>
</entry>
<entry>
<key>Digest Authentication</key>
<value>
<name>Digest Authentication</name>
</value>
</entry>
<entry>
<key>Always Output Response</key>
<value>
<name>Always Output Response</name>
</value>
</entry>
<entry>
<key>Add Response Headers to Request</key>
<value>
<name>Add Response Headers to Request</name>
</value>
</entry>
<entry>
<key>Content-Type</key>
<value>
<name>Content-Type</name>
</value>
</entry>
<entry>
<key>send-message-body</key>
<value>
<name>send-message-body</name>
</value>
</entry>
<entry>
<key>Use Chunked Encoding</key>
<value>
<name>Use Chunked Encoding</name>
</value>
</entry>
<entry>
<key>Penalize on "No Retry"</key>
<value>
<name>Penalize on "No Retry"</name>
</value>
</entry>
<entry>
<key>use-etag</key>
<value>
<name>use-etag</name>
</value>
</entry>
<entry>
<key>etag-max-cache-size</key>
<value>
<name>etag-max-cache-size</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>HTTP Method</key>
<value>GET</value>
</entry>
<entry>
<key>Remote URL</key>
<value>${media}</value>
</entry>
<entry>
<key>SSL Context Service</key>
</entry>
<entry>
<key>Connection Timeout</key>
<value>5 secs</value>
</entry>
<entry>
<key>Read Timeout</key>
<value>15 secs</value>
</entry>
<entry>
<key>Include Date Header</key>
<value>True</value>
</entry>
<entry>
<key>Follow Redirects</key>
<value>True</value>
</entry>
<entry>
<key>Attributes to Send</key>
</entry>
<entry>
<key>Basic Authentication Username</key>
</entry>
<entry>
<key>Basic Authentication Password</key>
</entry>
<entry>
<key>proxy-configuration-service</key>
</entry>
<entry>
<key>Proxy Host</key>
</entry>
<entry>
<key>Proxy Port</key>
</entry>
<entry>
<key>Proxy Type</key>
<value>http</value>
</entry>
<entry>
<key>invokehttp-proxy-user</key>
</entry>
<entry>
<key>invokehttp-proxy-password</key>
</entry>
<entry>
<key>Put Response Body In Attribute</key>
<value>pic</value>
</entry>
<entry>
<key>Max Length To Put In Attribute</key>
<value>256</value>
</entry>
<entry>
<key>Digest Authentication</key>
<value>false</value>
</entry>
<entry>
<key>Always Output Response</key>
<value>false</value>
</entry>
<entry>
<key>Add Response Headers to Request</key>
<value>false</value>
</entry>
<entry>
<key>Content-Type</key>
<value>${mime.type}</value>
</entry>
<entry>
<key>send-message-body</key>
<value>true</value>
</entry>
<entry>
<key>Use Chunked Encoding</key>
<value>false</value>
</entry>
<entry>
<key>Penalize on "No Retry"</key>
<value>false</value>
</entry>
<entry>
<key>use-etag</key>
<value>false</value>
</entry>
<entry>
<key>etag-max-cache-size</key>
<value>10MB</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<executionNodeRestricted>false</executionNodeRestricted>
<name>InvokeHTTP</name>
<relationships>
<autoTerminate>true</autoTerminate>
<name>Failure</name>
</relationships>
<relationships>
<autoTerminate>true</autoTerminate>
<name>No Retry</name>
</relationships>
<relationships>
<autoTerminate>false</autoTerminate>
<name>Original</name>
</relationships>
<relationships>
<autoTerminate>true</autoTerminate>
<name>Response</name>
</relationships>
<relationships>
<autoTerminate>true</autoTerminate>
<name>Retry</name>
</relationships>
<state>STOPPED</state>
<style/>
<type>org.apache.nifi.processors.standard.InvokeHTTP</type>
</processors>
<processors>
<id>cf25469b-28b9-3f90-0000-000000000000</id>
<parentGroupId>74a3848d-1f7a-3866-0000-000000000000</parentGroupId>
<position>
<x>1050.0000059810604</x>
<y>200.0</y>
</position>
<bundle>
<artifact>nifi-standard-nar</artifact>
<group>org.apache.nifi</group>
<version>1.10.0.2.0.0.0-106</version>
</bundle>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>Regular Expression</key>
<value>
<name>Regular Expression</name>
</value>
</entry>
<entry>
<key>Replacement Value</key>
<value>
<name>Replacement Value</name>
</value>
</entry>
<entry>
<key>Character Set</key>
<value>
<name>Character Set</name>
</value>
</entry>
<entry>
<key>Maximum Buffer Size</key>
<value>
<name>Maximum Buffer Size</name>
</value>
</entry>
<entry>
<key>Replacement Strategy</key>
<value>
<name>Replacement Strategy</name>
</value>
</entry>
<entry>
<key>Evaluation Mode</key>
<value>
<name>Evaluation Mode</name>
</value>
</entry>
<entry>
<key>Line-by-Line Evaluation Mode</key>
<value>
<name>Line-by-Line Evaluation Mode</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>Regular Expression</key>
<value>(?s)(^.*$)</value>
</entry>
<entry>
<key>Replacement Value</key>
<value>{"accessKey":"mkvz7qe59mw0jtsybqtg2ooybc5w3v1p","request":{
"url": "${media}"
}}</value>
</entry>
<entry>
<key>Character Set</key>
<value>UTF-8</value>
</entry>
<entry>
<key>Maximum Buffer Size</key>
<value>1 MB</value>
</entry>
<entry>
<key>Replacement Strategy</key>
<value>Regex Replace</value>
</entry>
<entry>
<key>Evaluation Mode</key>
<value>Entire text</value>
</entry>
<entry>
<key>Line-by-Line Evaluation Mode</key>
<value>All</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<executionNodeRestricted>false</executionNodeRestricted>
<name>ReplaceText</name>
<relationships>
<autoTerminate>true</autoTerminate>
<name>failure</name>
</relationships>
<relationships>
<autoTerminate>false</autoTerminate>
<name>success</name>
</relationships>
<state>RUNNING</state>
<style/>
<type>org.apache.nifi.processors.standard.ReplaceText</type>
</processors>
<processors>
<id>d93241be-c09a-3a4c-0000-000000000000</id>
<parentGroupId>74a3848d-1f7a-3866-0000-000000000000</parentGroupId>
<position>
<x>2434.0000059810604</x>
<y>0.0</y>
</position>
<bundle>
<artifact>nifi-standard-nar</artifact>
<group>org.apache.nifi</group>
<version>1.10.0.2.0.0.0-106</version>
</bundle>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>Destination</key>
<value>
<name>Destination</name>
</value>
</entry>
<entry>
<key>Return Type</key>
<value>
<name>Return Type</name>
</value>
</entry>
<entry>
<key>Path Not Found Behavior</key>
<value>
<name>Path Not Found Behavior</name>
</value>
</entry>
<entry>
<key>Null Value Representation</key>
<value>
<name>Null Value Representation</name>
</value>
</entry>
<entry>
<key>prob1</key>
<value>
<name>prob1</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>Destination</key>
<value>flowfile-attribute</value>
</entry>
<entry>
<key>Return Type</key>
<value>auto-detect</value>
</entry>
<entry>
<key>Path Not Found Behavior</key>
<value>ignore</value>
</entry>
<entry>
<key>Null Value Representation</key>
<value>empty string</value>
</entry>
<entry>
<key>prob1</key>
<value>$.predictions[0].probability</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<executionNodeRestricted>false</executionNodeRestricted>
<name>EvaluateJsonPath</name>
<relationships>
<autoTerminate>true</autoTerminate>
<name>failure</name>
</relationships>
<relationships>
<autoTerminate>false</autoTerminate>
<name>matched</name>
</relationships>
<relationships>
<autoTerminate>true</autoTerminate>
<name>unmatched</name>
</relationships>
<state>RUNNING</state>
<style/>
<type>org.apache.nifi.processors.standard.EvaluateJsonPath</type>
</processors>
<processors>
<id>e5fa80e4-9690-38a3-0000-000000000000</id>
<parentGroupId>74a3848d-1f7a-3866-0000-000000000000</parentGroupId>
<position>
<x>0.0</x>
<y>33.000009169540704</y>
</position>
<bundle>
<artifact>nifi-social-media-nar</artifact>
<group>org.apache.nifi</group>
<version>1.10.0.2.0.0.0-106</version>
</bundle>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>Twitter Endpoint</key>
<value>
<name>Twitter Endpoint</name>
</value>
</entry>
<entry>
<key>max-client-error-retries</key>
<value>
<name>max-client-error-retries</name>
</value>
</entry>
<entry>
<key>Consumer Key</key>
<value>
<name>Consumer Key</name>
</value>
</entry>
<entry>
<key>Consumer Secret</key>
<value>
<name>Consumer Secret</name>
</value>
</entry>
<entry>
<key>Access Token</key>
<value>
<name>Access Token</name>
</value>
</entry>
<entry>
<key>Access Token Secret</key>
<value>
<name>Access Token Secret</name>
</value>
</entry>
<entry>
<key>Languages</key>
<value>
<name>Languages</name>
</value>
</entry>
<entry>
<key>Terms to Filter On</key>
<value>
<name>Terms to Filter On</name>
</value>
</entry>
<entry>
<key>IDs to Follow</key>
<value>
<name>IDs to Follow</name>
</value>
</entry>
<entry>
<key>Locations to Filter On</key>
<value>
<name>Locations to Filter On</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>Twitter Endpoint</key>
<value>Filter Endpoint</value>
</entry>
<entry>
<key>max-client-error-retries</key>
<value>5</value>
</entry>
<entry>
<key>Consumer Key</key>
</entry>
<entry>
<key>Consumer Secret</key>
</entry>
<entry>
<key>Access Token</key>
</entry>
<entry>
<key>Access Token Secret</key>
</entry>
<entry>
<key>Languages</key>
<value>en</value>
</entry>
<entry>
<key>Terms to Filter On</key>
<value>dog</value>
</entry>
<entry>
<key>IDs to Follow</key>
</entry>
<entry>
<key>Locations to Filter On</key>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<executionNodeRestricted>false</executionNodeRestricted>
<name>GetTwitter</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>success</name>
</relationships>
<state>STOPPED</state>
<style/>
<type>org.apache.nifi.processors.twitter.GetTwitter</type>
</processors>
<processors>
<id>f29be6bf-ddbf-3b6d-0000-000000000000</id>
<parentGroupId>74a3848d-1f7a-3866-0000-000000000000</parentGroupId>
<position>
<x>738.0000059810604</x>
<y>416.0</y>
</position>
<bundle>
<artifact>nifi-standard-nar</artifact>
<group>org.apache.nifi</group>
<version>1.10.0.2.0.0.0-106</version>
</bundle>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>Routing Strategy</key>
<value>
<name>Routing Strategy</name>
</value>
</entry>
<entry>
<key>isImage</key>
<value>
<name>isImage</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>Routing Strategy</key>
<value>Route to Property name</value>
</entry>
<entry>
<key>isImage</key>
<value>${type:equals("photo")}</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<executionNodeRestricted>false</executionNodeRestricted>
<name>RouteOnAttribute</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>isImage</name>
</relationships>
<relationships>
<autoTerminate>true</autoTerminate>
<name>unmatched</name>
</relationships>
<state>RUNNING</state>
<style/>
<type>org.apache.nifi.processors.standard.RouteOnAttribute</type>
</processors>
<processors>
<id>f5dac0c6-3680-3d94-0000-000000000000</id>
<parentGroupId>74a3848d-1f7a-3866-0000-000000000000</parentGroupId>
<position>
<x>738.0000059810604</x>
<y>0.0</y>
</position>
<bundle>
<artifact>nifi-standard-nar</artifact>
<group>org.apache.nifi</group>
<version>1.10.0.2.0.0.0-106</version>
</bundle>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>Destination</key>
<value>
<name>Destination</name>
</value>
</entry>
<entry>
<key>Return Type</key>
<value>
<name>Return Type</name>
</value>
</entry>
<entry>
<key>Path Not Found Behavior</key>
<value>
<name>Path Not Found Behavior</name>
</value>
</entry>
<entry>
<key>Null Value Representation</key>
<value>
<name>Null Value Representation</name>
</value>
</entry>
<entry>
<key>media</key>
<value>
<name>media</name>
</value>
</entry>
<entry>
<key>type</key>
<value>
<name>type</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>Destination</key>
<value>flowfile-attribute</value>
</entry>
<entry>
<key>Return Type</key>
<value>auto-detect</value>
</entry>
<entry>
<key>Path Not Found Behavior</key>
<value>ignore</value>
</entry>
<entry>
<key>Null Value Representation</key>
<value>empty string</value>
</entry>
<entry>
<key>media</key>
<value>$.entities.media[0].media_url</value>
</entry>
<entry>
<key>type</key>
<value>$.entities.media[0].type</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<executionNodeRestricted>false</executionNodeRestricted>
<name>EvaluateJsonPath</name>
<relationships>
<autoTerminate>true</autoTerminate>
<name>failure</name>
</relationships>
<relationships>
<autoTerminate>false</autoTerminate>
<name>matched</name>
</relationships>
<relationships>
<autoTerminate>true</autoTerminate>
<name>unmatched</name>
</relationships>
<state>RUNNING</state>
<style/>
<type>org.apache.nifi.processors.standard.EvaluateJsonPath</type>
</processors>
</snippet>
<timestamp>01/30/2020 12:03:02 UTC</timestamp>
</template>
You need to provide your Twitter credentials, the CDSW token and URL in the processors requiring additional configuration. Then you can start the flow.
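If you want to sanity-check the request body before wiring up the processors, the extraction and templating steps from the template can be approximated outside NiFi. The following is only a minimal Python sketch, not part of the flow itself. It mirrors the two JSONPath expressions used by EvaluateJsonPath (`$.entities.media[0].media_url` and `$.entities.media[0].type`), the `isImage` check, and the body written by ReplaceText. The function names and the sample tweet are made up for illustration, and `YOUR_CDSW_ACCESS_KEY` is a placeholder for your own key.

```python
import json
from typing import Optional, Tuple


def extract_media(tweet: dict) -> Tuple[Optional[str], Optional[str]]:
    """Mirror $.entities.media[0].media_url and $.entities.media[0].type."""
    media = tweet.get("entities", {}).get("media") or []
    if not media:
        return None, None
    first = media[0]
    return first.get("media_url"), first.get("type")


def build_request_body(access_key: str, media_url: str) -> str:
    """Build the JSON body that ReplaceText writes before the POST to CDSW."""
    return json.dumps({"accessKey": access_key, "request": {"url": media_url}})


def tweet_to_request(tweet: dict, access_key: str) -> Optional[str]:
    """Return a request body for photo tweets, or None for everything else."""
    media_url, media_type = extract_media(tweet)
    if media_type != "photo" or not media_url:
        return None
    return build_request_body(access_key, media_url)


# Hypothetical tweet, trimmed down to the fields the flow actually reads.
sample_tweet = {
    "text": "look at this dog",
    "entities": {
        "media": [{"media_url": "http://example.com/dog.jpg", "type": "photo"}]
    },
}
body = tweet_to_request(sample_tweet, "YOUR_CDSW_ACCESS_KEY")
print(body)
```

Tweets without media, or with media that is not a photo, return `None` here, which corresponds to the auto-terminated `unmatched` relationship in the flow.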
example nifi flow
Looking at the queue at the very end of the above flow, there is a split where I am routing images with greater than 75% confidence away from the rest of the predictions.
In this example I am just looking at dogs, so let's see what the results look like...
For this tweet, we get the following prediction:
{"predictions": [{"probability": 0.9260697364807129, "label": "Boston_bull"}, {"probability": 0.06926842778921127, "label": "French_bulldog"}, {"probability": 0.0018951991805806756, "label": "toy_terrier"}, {"probability": 0.0010464458027854562, "label": "boxer"}, {"probability": 0.0003390783385839313, "label": "Staffordshire_bullterrier"}]}
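The routing decision at the end of the flow can be reproduced on a response like this one. The sketch below is a Python approximation, assuming the response shape shown above. It reads the first prediction the way EvaluateJsonPath does (`$.predictions[0].probability`) and applies the same 0.75 cut-off as the `probHigh` rule in RouteOnAttribute. The function name is hypothetical, and the sample response is shortened to the first two predictions.

```python
import json

THRESHOLD = 0.75  # same cut-off as ${prob1:gt(0.75)} in RouteOnAttribute


def route_prediction(response_text: str, threshold: float = THRESHOLD) -> str:
    """Return 'probHigh' or 'unmatched' based on the first prediction."""
    predictions = json.loads(response_text).get("predictions") or []
    if not predictions:
        return "unmatched"
    prob1 = predictions[0]["probability"]  # $.predictions[0].probability
    return "probHigh" if prob1 > threshold else "unmatched"


response = (
    '{"predictions": ['
    '{"probability": 0.9260697364807129, "label": "Boston_bull"}, '
    '{"probability": 0.06926842778921127, "label": "French_bulldog"}]}'
)
print(route_prediction(response))  # probHigh
```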
Following is the image:
boston bull
Not too shabby for a phase one prediction pipeline!
That is all for this walkthrough. Check back for the next article which will build on this to incorporate Flink for streaming analysis of our results as well as the text from the tweet to gain more insights.
Hope you enjoyed this article and see you again soon.
11-19-2019
05:37 AM
INTRODUCTION
With IoT growing at an exponential rate it is imperative that we continuously adapt our streaming architectures to stay ahead of the game. Apache Flink provides many great capabilities for stream processing that can be applied to a modern data architecture to enable advanced stream processing like windowing of streaming data, anomaly detection, and process monitoring to name a few.
Something more important than any one tool, however, is the overall architecture of your solution and how it enables your full data lifecycle. To provide a robust solution, you must select the best tool for each part of your data's journey.
Let's look at an example that will be growing rapidly over the next few years: cellular IoT devices, in particular the IoT devices made by particle.io.
I will walk you through the end to end solution designed to:
Deploy a cellular IoT device at the edge programmed to collect Temperature readings and also accept input to trigger alerts at the edge.
Set up web-hook integrations to push your readings into your Enterprise via Apache NiFi.
Analyze the readings in flight and route temperature spikes intelligently to alerting flows without delay.
Store events in Apache Kafka for downstream consumption in other event driven applications
If you are planning to follow along, code for all stages of the solution can be found on GitHub by following this link: https://github.com/vvagias/IoT_Solutions
You may also find additional tutorials and ask questions here: https://community.cloudera.com
Let's begin!
DEVICE
DEVICE SELECTION
First we need a device to use at the edge. I will be using a "Particle Electron" shown below.
Particle Electron 3G Cellular Board
Green LED
White LED
DHT11 Temperature and Humidity Sensor
Breadboard
LiPo Battery
The device you choose should take into account how it will be deployed and what that environment will offer. For example this device will be used to demonstrate the full solution in several settings and several locations. With no real predictable environment the cellular access combined with the ability to run off a rechargeable battery make this device ideal.
Were this to be deployed here in my office where I am writing this, I could use the exact same architecture but the device I would select would leverage the outlets I have available for power and the local ethernet or WiFi network to communicate.
DEVICE PROGRAMMING
With that said, now that we have selected a device we must provide it with firmware that will allow it to “become” the sensor we need for this solution. As you can see in the image above, I have already attached some components: specifically, a DHT11 temperature and humidity sensor along with a white and a green LED. We can add more to this in later advanced sections, but for now this will be more than enough.
We can see the device and its metrics in the particle console once you have registered it following the instructions included with the device.
To program the device we need an IDE. Every device has IDEs it works with, and some have several to choose from.
You are free to use any IDE you feel comfortable with. The particle devices have access to an online web IDE which I have found useful as well as IDE plugins for Atom and Visual Studio.
Using the IDE you chose, flash the DHT11 application firmware to your device.
If you're following along, this would be the .ino file from the GitHub repo.
DEVICE TESTING
With the firmware flashed you can verify that you are getting readings. Since I chose to use the particle board I am able to call my functions from the console.
I can click “GET” on any variable to grab a current reading from the device, or I can call any of the pre-defined functions to execute the code on the device and examine the results.
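The same reads and function calls can also be made from a script instead of the console. The sketch below is a hedged Python example that only builds the requests and does not send them. It assumes the Particle Cloud REST API shape as I understand it (`/v1/devices/{id}/{name}` with a bearer token, and a form field named `arg` for function calls), so check it against Particle's API reference before relying on it. The device ID, the variable name `temperature`, the function name `alert`, and the token are all placeholders, not names taken from the firmware.

```python
import urllib.parse
import urllib.request

API_BASE = "https://api.particle.io/v1/devices"  # assumed Particle Cloud base URL


def variable_request(device_id: str, variable: str, token: str) -> urllib.request.Request:
    """Build a GET request that reads one cloud variable from the device."""
    url = f"{API_BASE}/{device_id}/{variable}"
    headers = {"Authorization": f"Bearer {token}"}
    return urllib.request.Request(url, headers=headers, method="GET")


def function_request(device_id: str, function: str, arg: str, token: str) -> urllib.request.Request:
    """Build a POST request that calls one cloud function with a string argument."""
    url = f"{API_BASE}/{device_id}/{function}"
    data = urllib.parse.urlencode({"arg": arg}).encode("utf-8")
    headers = {"Authorization": f"Bearer {token}"}
    return urllib.request.Request(url, data=data, headers=headers, method="POST")


# Placeholder identifiers; substitute the values from your own Particle console.
read_req = variable_request("YOUR_DEVICE_ID", "temperature", "YOUR_ACCESS_TOKEN")
call_req = function_request("YOUR_DEVICE_ID", "alert", "on", "YOUR_ACCESS_TOKEN")
print(read_req.get_method(), read_req.full_url)
print(call_req.get_method(), call_req.full_url, call_req.data)

# To actually send a request:
# with urllib.request.urlopen(read_req, timeout=10) as resp:
#     print(resp.read().decode("utf-8"))
```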
Go ahead and test everything out. Once you are satisfied with the functions and variables you have, and with how they accept and transmit their data, you can move on to the next stage in our solution.
DATA INTEGRATION
INTEGRATION POINTS
In an Enterprise setting you don’t always have the luxury of selecting and programming the device/sensor. Often this decision is above your pay-grade or was made by your predecessor and you are now responsible for making those investments work.
This is part of the journey of an IoT developer. In either case there are things we know for certain:
Data will be generated by the sensor.
That data will have a schema or predictable format.
The data needs to be moved into the Enterprise in order to generate value.
The variables, however, are what might create issues for you:
What format is the data in?
How do you access the data?
Are there proprietary drivers required?
What happens to the data if no one is listening?
etc...
As you can see, the devil is always in the details. This is why selecting the right tools for each stage of the solution is a critical point you should take away from this book. Once you become good at a particular tool or programming language, it is easy to start seeing it as the solution to every problem: the proverbial person with a hammer hitting in a screw. Sure, the hammer can get the screw into the board, but then the strengths of both the hammer and the screw have been misused, and the connection made by the screw is weaker than if it had been installed using a screwdriver. The same is true in every enterprise solution deployment. You should strive not only to make the development of the solution efficient (which is why developers lean towards their strengths in language) but also to maximize the maintainability and future expansion of the solution. If the solution you build is any good, it will most certainly drive demand for its expansion. You want to be ready and capable when that demand arises and when opportunities for increased value present themselves.
Far too many organizations struggle to manage the mountain of technical debt that they buried their IoT solutions under. They end up missing out on opportunities to add new data streams or to incorporate ML models and real-time analytics, or they fail to expand to meet the needs of the wider organization.
That’s enough of me trying to caution you as you embark on your journey. Let’s get back to the solution we are examining and look at the integration points.
The device we are working with (Particle Electron) leverages a 3G network to communicate and the particle device cloud allows us to set up integrations. For this solution we will leverage a web hook to send a POST request over HTTP anytime the device publishes a temperature reading.
On a side note there are several integration points, all with their own value.
• Direct Serial Port communication
• Request data over HTTP and receive a response
• Pub/Sub to a MQ
• Pub direct to Target Data Store
I chose the web hook for its elegance and simplicity: the destination of the web hook can be changed easily, and the cluster which processes the data can be created and destroyed at will.
CREATE INTEGRATION
To create the web-hook you simply go to the Integrations tab in the Particle console and enter the required info. You should then see it listed as an integration on the Integrations tab. All you need here is the IP address of your ingestion server. We will be using Apache NiFi, so we send the data to our NiFi URL.
This takes care of our integration point into the organization, and our sensor is fully operational and ready to be deployed anywhere in the field as is. Great work!
Now to add some color for enterprise deployments. To make this ingest highly available (HA), you might point the web-hook URL at a load balancer that redirects to an active instance of NiFi. Should NiFi ever go down, you can continue to process the data with a failover instance. You may also choose to add power backup options such as larger batteries, solar or a direct outlet to ensure the sensor stays on and requires limited human intervention.
For now we are good with the current setup.
DATA FLOW
INGESTION
With our integration setup we now need to be able to handle the HTTP requests that will be sent to our NiFi instance every time a reading event occurs on our device.
There are countless ways to handle such a request; however, this goes back to picking the right tools for each stage in our process. NiFi was not an arbitrary selection but strategic in the way we are able to handle the ingest, movement and ultimate landing of our data.
To illustrate what I mean let’s look at how easy it is to go from nothing to being able to examine the HTTP request sent by our device.
In NiFi the developer can drag a processor onto the canvas and select “HandleHTTPRequest”. Right click and select configure to see the settings view.
As you can see the majority of the properties are pre configured to default values and there are only a few values we need to provide. Namely :
1.Listening Port if different from 80.
2.HTTP Context Map
- This is created for you in the drop down by selecting create new and enabling the service.
3.Any additional values you wish to add in your tailored solution.
That is all. Now close the dialog and drag another processor onto the canvas. This time we use HandleHTTPResponse and specify the return code. Link the two processors together by dragging a connection from Request to Response as seen below:
We also add a “funnel” to direct our messages temporarily as we are developing our flow to allow us to see what our results are.
Now you can publish some data from the Particle console and you will start to see those messages in the queue in NiFi. That’s all: we have a fully functional integration point ready to pull in all messages from our first device, and more in the future.
If you are anything like me you have already seen the huge list of processors available in NiFi and no doubt have already tried writing the data to some endpoint or even created a twitter bot by adding another processor to the canvas. This is what I mean when I say efficiency in design, maintainability AND extendability. Completing our original architecture shouldn’t limit us in what else we can enable.
Back to our flow. If you right click the queue and select list queue you see a list of messages. These are flow files and represent each individual event that entered NiFi through your HandleHTTPRequest processor. You can expand a message and click view to see the data you are dealing with:
So our JSON data package will have the following format:
{
"event": "readings",
"data": "69.80",
"published_at": "2019-11-14T15:59:51.534Z",
"coreid": "330032001247373333353132"
}
We can always modify the firmware to add additional reading values but this is great for our first phase and 1.0 release.
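To exercise the ingestion flow without waiting for the device, a request shaped like the payload above can be sent from any machine. The following is a minimal sketch, assuming the event arrives as a JSON body and using a hypothetical listener address (NIFI_URL); the helper names build_event and build_request are illustrative and not part of Particle or NiFi.

```python
import json
import urllib.request
from datetime import datetime, timezone

# Hypothetical listener address; replace with your HandleHTTPRequest host and port.
NIFI_URL = "http://nifi.example.com:8080/"


def build_event(temperature_f, core_id):
    """Build a dict shaped like the reading event shown above."""
    now = datetime.now(timezone.utc)
    millis = f"{now.microsecond // 1000:03d}"
    return {
        "event": "readings",
        # The device publishes the reading as a string with two decimals.
        "data": f"{temperature_f:.2f}",
        "published_at": now.strftime("%Y-%m-%dT%H:%M:%S.") + millis + "Z",
        "coreid": core_id,
    }


def build_request(event, url=NIFI_URL):
    """Wrap the event in a POST request without sending it."""
    body = json.dumps(event).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        method="POST",
        headers={"Content-Type": "application/json"},
    )


if __name__ == "__main__":
    request = build_request(build_event(69.8, "330032001247373333353132"))
    # Uncomment to actually send once your flow is running:
    # urllib.request.urlopen(request, timeout=5)
    print(request.get_method(), request.full_url)
    print(request.data.decode("utf-8"))
```

Keeping the request construction separate from the send makes it easy to inspect the body before pointing it at a live flow.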
We now have our ingestion in place and can begin work on the processing of the data.
PROCESSING
Now let’s add the remaining processors to complete our flow. Add the following processors to the canvas:
1. EvaluateJSONPath x2
2. RouteOnAttribute
3. InvokeHTTP
4. PublishKafka_2_0
Now connect them so that the data flows as in the image below:
Our first EvaluateJSONPath will simply extract the temperature and store it as an attribute.
[ $.data ] is all we need to enter here; it returns the value of the data element at the root level of the JSON.
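For readers less familiar with JSONPath, the extraction is equivalent to reading the root-level "data" key. This is only a sketch of the idea in Python, not how NiFi implements it, and the attribute name "temperature" is an assumption; use whatever name you configured on the processor.

```python
import json

# Sample flow file content, copied from the payload shown earlier.
RAW_EVENT = (
    '{"event": "readings", "data": "69.80", '
    '"published_at": "2019-11-14T15:59:51.534Z", '
    '"coreid": "330032001247373333353132"}'
)


def extract_temperature(flowfile_content):
    """Mimic the JSONPath $.data: return the root-level "data" value."""
    document = json.loads(flowfile_content)
    return document["data"]


if __name__ == "__main__":
    # "temperature" is an assumed attribute name for illustration.
    attributes = {"temperature": extract_temperature(RAW_EVENT)}
    print(attributes)
```

Note that the value comes back as a string, exactly as the device published it, so any numeric comparison downstream has to convert it first.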
Now in the RouteOnAttribute processor we can examine this attribute and route to highTemp if it exceeds a certain value. This is where you can begin to translate your business logic into realtime analytics and automate the actions taken to enable a multitude of critical and value driving use cases.
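The routing decision itself is a simple comparison. A minimal sketch of the logic, assuming the attribute is named "temperature" and using the 62° F threshold from this walkthrough (both are choices you would set in your own flow, and the function name route is hypothetical):

```python
HIGH_TEMP_THRESHOLD_F = 62.0  # threshold used in this walkthrough


def route(attributes, threshold=HIGH_TEMP_THRESHOLD_F):
    """Return the name of the relationship a reading would follow."""
    try:
        temperature = float(attributes["temperature"])
    except (KeyError, TypeError, ValueError):
        # Missing or non-numeric reading: do not treat it as a high temperature.
        return "unmatched"
    return "highTemp" if temperature > threshold else "unmatched"


if __name__ == "__main__":
    for sample in ({"temperature": "69.80"}, {"temperature": "55.00"}, {}):
        print(sample, "->", route(sample))
```

Treating unparseable readings as unmatched keeps a bad sensor value from triggering an action on the device.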
We will be routing our highTemp values immediately to the InvokeHTTP processor, which will communicate directly with the device. In our example we will simply toggle an LED to show the functionality, but this is easily expanded (turn on a pump, turn on an exhaust fan, sound an alarm in multiple places, etc.). Perhaps we simply add it to a more intense monitoring flow that looks at the readings over a given window and only takes action if the trend continues.
There are many more processors you can explore that help you further process and enrich the data as it is in flight. We can now move to the real time action on the data.
REALTIME ACTION
Here we will set our InvokeHTTP processor to the device endpoint. This allows us to communicate directly with the device and (retrieve variable values, execute functions, trigger events, etc...).
We will use the toggle() function we flashed in the firmware to blink our LED when the temperature is above 62° F. Feel free to change this to a value that is interesting in your environment.
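Outside of NiFi, the same call can be tried from a script to confirm the device responds. The sketch below assumes the Particle Cloud convention of POSTing to /v1/devices/<device id>/<function name> with a bearer token and an "arg" form field; the device ID and token are placeholders, and you should check the endpoint against your own Particle console before relying on it.

```python
import urllib.parse
import urllib.request

# Assumed base URL for the Particle Cloud device API.
PARTICLE_API = "https://api.particle.io/v1/devices"


def build_function_call(device_id, function_name, access_token, argument=""):
    """Build (but do not send) a request that calls a cloud function on a device."""
    url = f"{PARTICLE_API}/{device_id}/{function_name}"
    body = urllib.parse.urlencode({"arg": argument}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        method="POST",
        headers={"Authorization": f"Bearer {access_token}"},
    )


if __name__ == "__main__":
    # Placeholder credentials for illustration only.
    request = build_function_call("YOUR_DEVICE_ID", "toggle", "YOUR_ACCESS_TOKEN")
    # Uncomment to call the device:
    # urllib.request.urlopen(request, timeout=10)
    print(request.get_method(), request.full_url)
```

The same URL, method and header are what you would enter on the InvokeHTTP processor.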
Further down our flow we also push our event data into a Kafka Topic which allows us to decouple our ingest and realtime action portion of the solution from additional downstream applications and storage that depend on this data.
DECOUPLING
In our PublishKafka processor we can specify the Kafka broker and topic we wish to write to. That is all you need to do! Much simpler than having to write java code every time you want to push new data into Kafka, or switch topics...
You may have noticed several PublishKafka processors. There are many available and you can do even more with the others. PublishKafkaRecord allows you to convert formats and publish to Kafka all in a single processor by setting one more property value! Try converting your JSON to Avro for example if you are interested in exploring this further.
We will leave it at that as this checks all our phase 1 boxes and completes the final portion of our solution.
This is the end-to-end solution as is. It takes us from the edge device, through the ingestion and processing of the real-time data, all the way to the AI. In this beginner's example we implemented a simple but powerful AI that can monitor and act on a continuous stream of sensor data without ever taking a break. Great work!
You might be left thinking 🤔 "That’s awesome... but what else can I do? What happens to the data next?" I’m glad you asked.
In fact the theme of the second book in this series will expand on this solution architecture to take into consideration things like Stream Message Management, Stream Replication, Security and Governance. We will also explore the downstream application part of the architecture as we look at Apache Flink.
Hope you enjoyed reading this book and the solution we evaluated along the way.
08-29-2019
09:41 AM
5 Kudos
Apache NiFi is an extremely versatile tool that can be used in just about ANY streaming/real-time architecture to enable rapid development and efficient monitoring and maintenance. Often times it is easiest to learn by doing something hands on and tangible.
With that in mind, let's look at something most of us are familiar with: our irrigation control system. Almost every new home in Texas has one of these systems included, as do commercial establishments such as golf courses, farms, parks and businesses with outdoor landscaping. This can also easily be applied to fire safety systems where a hydrant is triggered remotely to minimize damage caused by a fire.
To get started we will need to set up our irrigation control system to accept commands remotely. You can skin this cat several different ways, including buying a wifi-enabled irrigation system, but where is the fun in that? Let's get our hands dirty and kick off with some embedded programming.
You will need to get out or first acquire the following:
1-8 or more channel 5V relay
Relay
wifi enabled Arduino board ( I am using a Node MCU for this)
2 pack is great here as you can swap them out during your debug/development
NodeMCU
Some wire of decent thickness (working with 24V AC here so insulated.)
Now we can leverage Arduino IDE and the work of Reginald Watson or any of the awesome folks that post similar tutorials on the Arduino community. It doesn't matter what approach you take here as long as the end result is a relay that can be toggled via HTTP request:
Arduino Tutorial
How to flash the arduino is covered really well in the tutorials so I will leave it out of the scope of this article.
Once flashed, you can wire the relays in line with the valves in your control panel. The idea here is to allow the panel to continue to function as it normally would, but now we can bypass panel control via relays.
Here are a couple of pictures of what my modified irrigation control system looks like:
Now let's get our NiFi flow going. I have included a template which you can use to quickly get this project working and then modify it to allow for however many relays you wish to control. I will just do zone2 as it's the one I can see from my office window and since I spend so much time in my office it's the only grass that really matters... 🙂
Get the NiFi Template on GitHub : NiFi_Voice_Irrigation_Control_with_Siri.git
We start by creating our own HTTP Server that will listen for and respond to our requests to turn the system on/off.
HandleHTTPRequest and HandleHTTPResponse will interact with our sending applications. Route on Attribute is then used to parse the request parameters for the specific zone you wish to control and the command you wish to issue.
The incoming request is parsed and the parameters are stored as attributes. For example,
in a terminal, you can execute:
curl -v -X "POST" http://<NiFi-ip_address>/zone2?on
The parameters parsed from the above example we are interested in would then look like this:
http.query.string --- on
http.remote.host --- sender IP
http.request.uri --- /zone2
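The way the zone and command fall out of the request can be sketched in a few lines of Python. This only illustrates the split between path and query string; the function name parse_request and the set of accepted commands are assumptions for the example, not something NiFi provides.

```python
from urllib.parse import urlsplit

# Commands this sketch accepts; extend as needed.
KNOWN_COMMANDS = {"on", "off"}


def parse_request(target):
    """Split a request target such as '/zone2?on' into (zone, command)."""
    parts = urlsplit(target)
    zone = parts.path.strip("/")      # corresponds to http.request.uri
    command = parts.query.lower()     # corresponds to http.query.string
    if not zone or command not in KNOWN_COMMANDS:
        raise ValueError(f"unsupported request: {target!r}")
    return zone, command


if __name__ == "__main__":
    print(parse_request("/zone2?on"))
```

Rejecting unknown commands early mirrors what the flow does when a request matches none of the routes.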
Here is what our Flow should look like...
The first RouteOnAttribute parses and routes on the zone; the following RouteOnAttribute parses and routes on the command, i.e. on/off.
Go ahead and run the curl command again.
You should be getting a response back in NiFi and water should be spraying out of your zone.
<s:Envelope xmlns:s="http://schemas.xmlsoap.org/soap/envelope/" s:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/"><s:Body>
<u:GetBinaryStateResponse xmlns:u="urn:Belkin:service:basicevent:1">
<BinaryState>1</BinaryState>
</u:GetBinaryStateResponse>
</s:Body> </s:Envelope>
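If you want to check the result programmatically rather than by looking out the window, the response can be parsed for its BinaryState value. A minimal sketch, assuming a response shaped like the one above and assuming that a value of 1 means the relay is on (the function name relay_is_on is hypothetical):

```python
import xml.etree.ElementTree as ET

# Sample response text shaped like the SOAP envelope returned above.
SAMPLE_RESPONSE = """<s:Envelope xmlns:s="http://schemas.xmlsoap.org/soap/envelope/" s:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/"><s:Body>
<u:GetBinaryStateResponse xmlns:u="urn:Belkin:service:basicevent:1">
<BinaryState>1</BinaryState>
</u:GetBinaryStateResponse>
</s:Body> </s:Envelope>"""


def relay_is_on(response_text):
    """Return True when the BinaryState element holds 1 (assumed to mean on)."""
    root = ET.fromstring(response_text)
    # BinaryState carries no namespace prefix, so a plain tag search finds it.
    state = root.find(".//BinaryState")
    if state is None or state.text is None:
        raise ValueError("response has no BinaryState element")
    return state.text.strip() == "1"


if __name__ == "__main__":
    print(relay_is_on(SAMPLE_RESPONSE))
```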
At this point you can apply this solution to any design pattern you wish!
Some ideas and what will be covered in future articles:
Use MiNiFi and a moisture sensor to fully automate the process. Allow MiNiFi to monitor ideal moisture levels and only apply water when needed.
Use ML and computer vision to detect unwanted pests(i.e. Rabbits, Dogs, Neighbors) and spray them when on the lawn or in the garden...
Create your own application that interfaces with NiFi to give you GUI controls.
Add parameter parsing to pass a time argument that can turn off after a specified time interval.
For kicks, let's wrap this article up with Siri integration! We will expose our relay control through Apple HomeKit to control the irrigation using our Apple Watch, phone or computer.
On your machine of choice (e.g. your laptop, a server that is always running, a Raspberry Pi) we need to install Homebridge.
Here is a great tutorial to get this set up, Homebridge from GitHub: nfarina/homebridge
Once installed, we need to add an HTTP switch accessory. I use HttpMultiSwitch, available here: /homebridge-http-multiswitch
You will now need to create your config to point to your NiFi HTTP handler:
{
"bridge": {
"name": "Homebridge",
"username": "AF:BC:DE:10:22:16",
"port": 51826,
"pin": "911-93-594"
},
"description": "Configuration for Homebridge accessories.",
"platforms": [{
"platform": "config",
"name": "Config",
"port": 8082,
"sudo": false
}],
"accessories": [
{
"accessory": "HttpMultiswitch",
"switch_type": "Switch",
"name": "Sprinkler",
"http_method": "GET",
"base_url": "http://<NIFI-IP>:8181",
"on_url": "/zone2?on",
"off_url": "/zone2?off"
}
]
}
The example above is mine. You can modify the name of the switch as well as the pin, but the rest is required at a minimum.
Now launch Homebridge and add it to your HomeKit by scanning the QR code that is generated!
Voilà! You can now tell Siri to "Turn on Sprinkler" in this case, or "Turn on WHATEVER YOU CALL YOURS".
I hope you enjoyed this hands on tutorial getting you started in the world of NiFi and integrating with just about any solution you can think of. Have fun and let us know how your project goes!
02-23-2018
03:58 PM
3 Kudos
Moving on to a more advanced application of our basic monitoring flow, we pick up where we left off in the first part of this series: Monitor Temperature & Humidity Sensors with Apache NiFi
Now that we have the ability to collect the sensor data using NiFi, we will want to be able to store and process the data using our HDP cluster. The resources at the sensor level would not be enough to provide deep analytics or long-term storage. For this reason we can leverage NiFi's site-to-site protocol to send the sensor events directly to our HDF cluster for processing.
Start off by adding a remote process group to your NiFi flow at the sensor end and feed the output of ExecuteProcess into this remote process group. Supply the URL of your HDF NiFi instance (the same URL you would type into a browser to get to the design GUI).
Now move to your remote instance of NiFi on your HDF cluster and create a new input port. Replace the ExecuteProcess processor with the input port. Now when you start the processors on both NiFi instances, the instance at the "edge" gathering data directly from the sensor will send the data directly to your new input port.
I will be taking this data and pushing it directly into Solr for indexing and use with a Banana dashboard. You may want to do other things with your data, and later articles in this series will cover other applications, for example pushing to HBase or Kafka and streaming into Druid. You can also write to HDFS or Hive. For now Solr will be what we use.
Create a collection in Solr and provide the name of that collection to the PutSolrContentStream processor. This will start to populate your Solr collection with sensor events. To make this data more useful to us, we will also need to record the timestamp at which each event is collected. I have done this by modifying the Python script to include an extra field in the JSON. You may decide to leverage NiFi for this.
All code and templates can be quickly obtained by cloning the accompanying git repo to your machine: arduino-nifi-dht
# -*- coding: utf-8 -*-
"""
Created on Thu Feb 22 15:54:50 2018
@author: vvagias
"""
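import serial
import json
from time import gmtime, strftime
# Open the serial port the Arduino is attached to (adjust the device path for your machine).
ser = serial.Serial('/dev/cu.usbmodem1411', 9600)
# The Arduino prints one "humidity, temperature" line per reading.
a = ser.readline().decode('utf8').replace("\n","").split(',')
js = {
"temperature" : float(a[1]),
"humidity" : float(a[0]),
# UTC timestamp recorded when the event is collected
"time" : strftime("%Y-%m-%d %H:%M:%S", gmtime())
}
print(json.dumps(js))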
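The script above assumes every serial line is well formed. A partial line at start-up or a failed sensor read would raise an exception or produce a value that is not valid JSON. One possible hardening, sketched here with a hypothetical parse_reading helper that is not part of the repo, is to validate the line before building the event:

```python
import json
import math
from time import gmtime, strftime


def parse_reading(line):
    """Turn one 'humidity, temperature' line into a dict, or None if malformed."""
    fields = [field.strip() for field in line.split(",")]
    if len(fields) != 2:
        return None
    try:
        humidity, temperature = float(fields[0]), float(fields[1])
    except ValueError:
        return None
    # A failed sensor read can show up as "nan", which float() accepts.
    if math.isnan(humidity) or math.isnan(temperature):
        return None
    return {
        "temperature": temperature,
        "humidity": humidity,
        "time": strftime("%Y-%m-%d %H:%M:%S", gmtime()),
    }


if __name__ == "__main__":
    reading = parse_reading("45.00, 72.50\n")
    if reading is not None:
        print(json.dumps(reading))
```

Returning None for bad lines lets the caller skip them instead of sending a broken event downstream.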
Now we have everything we need to put together a solid dashboard to monitor these sensor events in real time. Move to your Banana instance and either create a new time series dashboard and start having fun with whatever you desire, or follow along a bit further and use the template included in the banana directory of the git repo. Upload this template to your instance and edit the dashboard Solr section to point to the collection you specified in the earlier step. Click save and you should have a dashboard that looks like this:
Well... If that doesn't put a smile on your face then you are definitely reading the wrong article 😉
You can modify the dashboard to show what you are interested in, then click save at the top right and set it as default so you always have it when you reload the page.
I hope you enjoyed the article and gained some value from what we did. I also hope you will upvote the article if you found it useful and check out the other articles that follow in the series!
02-22-2018
11:42 PM
4 Kudos
Use commodity hardware (Arduino boards) to monitor temperature and humidity with NiFi. Get the source to follow along on git: REPO
In this tutorial we will need some hardware:
Arduino Board
DHT11 Sensor (DHT22 will work as well)
Jumper wires
In Arduino IDE select your board and port and enter the following code:
#include "DHT.h"
#define DHTPIN 2 // what digital pin we're connected to
// Uncomment whatever type you're using!
#define DHTTYPE DHT11 // DHT 11
//#define DHTTYPE DHT22 // DHT 22 (AM2302), AM2321
//#define DHTTYPE DHT21 // DHT 21 (AM2301)
// Connect pin 1 (on the left) of the sensor to +5V
// NOTE: If using a board with 3.3V logic like an Arduino Due connect pin 1
// to 3.3V instead of 5V!
// Connect pin 2 of the sensor to whatever your DHTPIN is
// Connect pin 4 (on the right) of the sensor to GROUND
// Connect a 10K resistor from pin 2 (data) to pin 1 (power) of the sensor
// Initialize DHT sensor.
// Note that older versions of this library took an optional third parameter to
// tweak the timings for faster processors. This parameter is no longer needed
// as the current DHT reading algorithm adjusts itself to work on faster procs.
DHT dht(DHTPIN, DHTTYPE);
void setup() {
Serial.begin(9600);
dht.begin();
}
void loop() {
// Wait a few seconds between measurements.
delay(2000);
// Reading temperature or humidity takes about 250 milliseconds!
float h = dht.readHumidity();
// Read temperature as Fahrenheit (isFahrenheit = true)
float f = dht.readTemperature(true);
Serial.print(h);
Serial.print(", ");
Serial.print(f);
Serial.print("\n");
}
Flash/upload the code to your Arduino. You should now be able to see the CSV data being printed to the Serial Monitor in your Arduino IDE! If you don't see any data, check that you have the baud rate set to 9600. If you still have issues, verify the connections to your Arduino are correct.
Now we need to set up a Python application that will read the serial data from the Arduino and convert it to a JSON format that we can use for stream processing in NiFi. Open up your favorite Python IDE and create a file called arduino-nifi.py. Enter the following code:
# -*- coding: utf-8 -*-
"""
Created on Thu Feb 22 15:54:50 2018
@author: vvagias
"""
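import serial
import json
# Open the serial port the Arduino is attached to (adjust the device path for your machine).
ser = serial.Serial('/dev/cu.usbmodem1411', 9600)
# The Arduino prints one "humidity, temperature" line per reading.
a = ser.readline().decode('utf8').replace("\n","").split(',')
js = {
"temperature" : float(a[1]),
"humidity" : float(a[0])
}
print(json.dumps(js))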
We now have our data in valid JSON format and have also converted our string values for temperature and humidity to floats. We are now ready to move on to NiFi.
Open NiFi and either create the flow in the image below or import the template included in the zip folder from git.
The ExecuteProcess processor calls our Python script and passes the data on to the EvaluateJSONPath processor, which is set to pull the temperature out and place it on the flow file as an attribute. The RouteOnAttribute processor then routes the data based on the value of temperature being greater than 72 degrees.
This is where you can start using your imagination! Load a database, send alerts, or load a dashboard (Banana) perhaps. Stay tuned for Data Science applications built on top of this foundation! Hope you enjoyed this tutorial 🙂
09-11-2017
04:26 PM
2 Kudos
Apache Atlas offers enterprises the capability to implement Data Governance on their HDP cluster. This is a game changer for enterprises that are looking to find ways to become more agile and effective in how they leverage data assets across the organization. Enterprises no longer have to see their data volumes grow and become unmanageable. With things like Asset Tagging, Data Lineage & Data Impact as well as Taxonomy, organizations can have a Business Catalogue at their fingertips, allowing assets to be searched for and leveraged without any prior knowledge of the asset's existence.
In this Intro video we look at Asset Tagging as well as the Data Lineage and Data Impact functionality that tells you where data came from, how it was transformed along the way and what assets are affected downstream.
09-11-2017
04:09 PM
4 Kudos
IBM DSX (Data Science Experience) is a powerful tool that brings together Jupyter notebooks and RStudio into a single platform where Data Scientists can collaborate and share data assets and notebooks in an effective and streamlined way.
In this Intro video we will cover some great aspects of DSX that everyone should know about when getting started and we will finish with the basics of accessing data assets that you have loaded into DSX.
https://youtu.be/hb7o0NhV87k
05-17-2017
05:11 PM
2 Kudos
If you haven't already, see the first tutorial I made, which guides you through the setup of RapidMiner to read from Hive. We will pick up where it leaves off: Read-from-hive-using-rapidminer
Add a "Set Role" operator next to your "Retrieve from Hive" operator, which is located in the "Radoop Nest". This allows you to select the column you wish to use in the model. In this case I set the name field to the category column in my dataset. You can obtain this dataset here: data
This is just for illustrative purposes, so if you have data that already has labels, feel free to use it in place of this.
Now add a "Split Validation" operator and connect the ports. Then double click the validation operator. Add a "Decision Tree" operator in the left pane, add an "Apply Model" and a "Performance" operator, and connect them all. For performance, select accuracy or whatever you wish to check.
If you are using the sample data provided in this tutorial you will see some errors. Click on the error icons on each operator, select quick fix and apply. Your panes should look like this:
If you have Spark on your cluster, you can modify this to run on Spark by using the "Spark Decision Tree" operator.
That's how you can set up and train a model in RapidMiner. Here we just used a decision tree, but there are several algorithms to choose from.