1973
Posts
1225
Kudos Received
124
Solutions
My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
| | 1916 | 04-03-2024 06:39 AM |
| | 3015 | 01-12-2024 08:19 AM |
| | 1645 | 12-07-2023 01:49 PM |
| | 2422 | 08-02-2023 07:30 AM |
| | 3367 | 03-29-2023 01:22 PM |
08-07-2017
12:52 PM
1 Kudo
I have written a processor for it. See: https://community.hortonworks.com/content/kbentry/116803/building-a-custom-processor-in-apache-nifi-12-for.html
You can also run it on an edge node: https://community.hortonworks.com/articles/118132/minifi-capturing-converting-tensorflow-inception-t.html
I have run this a few dozen ways and they all work great:
https://community.hortonworks.com/articles/58265/analyzing-images-in-hdf-20-using-tensorflow.html
https://community.hortonworks.com/articles/80339/iot-capturing-photos-and-analyzing-the-image-with.html
https://community.hortonworks.com/articles/103863/using-an-asus-tinkerboard-with-tensorflow-and-pyth.html
https://community.hortonworks.com/articles/59349/hdf-20-flow-for-ingesting-real-time-tweets-from-st.html
https://community.hortonworks.com/articles/108947/minifi-for-ble-bluetooth-low-energy-beacon-data-in.html
https://community.hortonworks.com/articles/83100/deep-learning-iot-workflows-with-raspberry-pi-mqtt.html
If you have any questions, let me know. I have been integrating NiFi and TensorFlow for over a year.
08-06-2017
01:40 PM
5 Kudos
Python Word Cloud

Integrating existing Python libraries and scripts is very easy in Apache NiFi. I install the library for both versions of Python I have on my system, while moving all new scripts to the 3.x branch.

Install the library for both Python 2.7 and 3.5

pip install wordcloud
pip3 install wordcloud

Example Usage

echo "NiFi\nHadoop\nSpark\n" | wordcloud_cli.py --imagefile wordcloud.png

For use in NiFi, I wrap my call with a shell script, wc.sh:

echo "$1" | tr " " "\n" | wordcloud_cli.py

This takes a parameter to the shell script (our tweet) and converts it into words usable for a word cloud. It builds a PNG that I can store in a file system or in HDFS; I updated the filename to add png at the end. You can use other sources or other methods of splitting words.

I am pulling Twitter messages, so I use ReplaceText to replace the flow file content with ${msg}, which is just the tweet. Then I execute the Python WordCloud CLI.

References:
https://amueller.github.io/word_cloud/auto_examples/a_new_hope.html
https://amueller.github.io/word_cloud/auto_examples/simple.html#sphx-glr-auto-examples-simple-py
https://github.com/amueller/word_cloud
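The word-splitting step in wc.sh can also be done in Python if you would rather not shell out to tr. This is only a minimal sketch: the function name tweet_to_words is made up for illustration, and unlike tr it collapses runs of whitespace instead of emitting empty lines.

```python
def tweet_to_words(tweet):
    # Split on any whitespace and put one word per line,
    # roughly what `tr " " "\n"` does in wc.sh.
    return "\n".join(tweet.split())


if __name__ == "__main__":
    # Hypothetical tweet text, used only to show the shape of the output.
    print(tweet_to_words("Apache NiFi  and Spark"))
```

The result can be piped into wordcloud_cli.py the same way the shell version is.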
08-06-2017
12:03 AM
4 Kudos
Technology: Python, TensorFlow, Apache Hive, MiniFi, NiFi, HDFS, WebHDFS, Zeppelin, SQL, Raspberry Pi, Pi Camera, S2S.

Apache NiFi For Ingest of Images and TensorFlow Analysis from the Edge (Raspberry Pi 3)

The Apache NiFi ingestion flow is straightforward. MiniFi sends us flow files over S2S from the RPi, and they come in two types. One is a JSON-formatted file of metadata and TensorFlow analysis of an image. The second is the actual image captured. We route on the filename attribute to handle each file type appropriately. We send the image to HDFS for storage and retrieval via WebHDFS. For the JSON, I add a schema and a JSON content type and split up the file, because there is some non-JSON junk in there that I want pulled out. Then I send my JSON record to QueryRecord to filter out any empty messages. This produces AVRO files from JSON, which I convert to ORC and store in HDFS. From there it's easy to query my new deep-learning-produced multimedia data via standard SQL.

Screenshots: Routing by FileName Attribute, Split Into Lines, Extract Only JSON Data, ORC Configuration, Query Record

MiniFi Flow Installed on Raspberry Pi

The flow on the Pi is simple. We have three processes running. The first executes our classify.sh, which activates the PiCamera to take a picture and then feeds the picture to TensorFlow. The CleanupLogs process is a shell script that deletes old logs. The GetFile reads any image produced by the first shell execution and sends the image to NiFi.

Hive DDL:

CREATE EXTERNAL TABLE IF NOT EXISTS tfimage (image STRING, ts STRING, host STRING, score STRING, human_string STRING, node_id FLOAT) STORED AS ORC
LOCATION '/tfimage'

Hive SQL:

%jdbc(hive)
select ts, score, human_string,
concat('%html <img width=200 height=200 src="http://princeton10.field.hortonworks.com:50070/webhdfs/v1/tfimagefiles/', SUBSTR(image,18), '?op=OPEN">') as cam_image
from tfimage where image like '%2017%'

Shell classify.sh

python -W ignore /opt/demo/classify_image.py

Modified TensorFlow Example Python

# Copyright 2015 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Simple image classification with Inception.
Run image classification with Inception trained on ImageNet 2012 Challenge data
set.
This program creates a graph from a saved GraphDef protocol buffer,
and runs inference on an input JPEG image. It outputs human readable
strings of the top 5 predictions along with their probabilities.
Change the --image_file argument to any jpg image to compute a
classification of that image.
Please see the tutorial and website for a detailed description of how
to use this script to perform image recognition.
https://tensorflow.org/tutorials/image_recognition/
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import argparse
import os.path
import re
import sys
import tarfile
import os
import datetime
import math
import random, string
import base64
import json
import time
import picamera
from time import sleep
from time import gmtime, strftime
import numpy as np
from six.moves import urllib
import tensorflow as tf

tf.logging.set_verbosity(tf.logging.ERROR)

FLAGS = None

# pylint: disable=line-too-long
DATA_URL = 'http://download.tensorflow.org/models/image/imagenet/inception-2015-12-05.tgz'
# pylint: enable=line-too-long

# yyyy-mm-dd hh:mm:ss
currenttime= strftime("%Y-%m-%d %H:%M:%S",gmtime())
host = os.uname()[1]


def randomword(length):
  # string.ascii_lowercase exists on Python 2 and 3; string.lowercase is Python 2 only.
  return ''.join(random.choice(string.ascii_lowercase) for i in range(length))


class NodeLookup(object):
  """Converts integer node ID's to human readable labels."""

  def __init__(self,
               label_lookup_path=None,
               uid_lookup_path=None):
    if not label_lookup_path:
      label_lookup_path = os.path.join(
          FLAGS.model_dir, 'imagenet_2012_challenge_label_map_proto.pbtxt')
    if not uid_lookup_path:
      uid_lookup_path = os.path.join(
          FLAGS.model_dir, 'imagenet_synset_to_human_label_map.txt')
    self.node_lookup = self.load(label_lookup_path, uid_lookup_path)

  def load(self, label_lookup_path, uid_lookup_path):
    """Loads a human readable English name for each softmax node.
    Args:
      label_lookup_path: string UID to integer node ID.
      uid_lookup_path: string UID to human-readable string.
    Returns:
      dict from integer node ID to human-readable string.
    """
    if not tf.gfile.Exists(uid_lookup_path):
      tf.logging.fatal('File does not exist %s', uid_lookup_path)
    if not tf.gfile.Exists(label_lookup_path):
      tf.logging.fatal('File does not exist %s', label_lookup_path)

    # Loads mapping from string UID to human-readable string
    proto_as_ascii_lines = tf.gfile.GFile(uid_lookup_path).readlines()
    uid_to_human = {}
    p = re.compile(r'[n\d]*[ \S,]*')
    for line in proto_as_ascii_lines:
      parsed_items = p.findall(line)
      uid = parsed_items[0]
      human_string = parsed_items[2]
      uid_to_human[uid] = human_string

    # Loads mapping from string UID to integer node ID.
    node_id_to_uid = {}
    proto_as_ascii = tf.gfile.GFile(label_lookup_path).readlines()
    for line in proto_as_ascii:
      if line.startswith('  target_class:'):
        target_class = int(line.split(': ')[1])
      if line.startswith('  target_class_string:'):
        target_class_string = line.split(': ')[1]
        node_id_to_uid[target_class] = target_class_string[1:-2]

    # Loads the final mapping of integer node ID to human-readable string
    node_id_to_name = {}
    for key, val in node_id_to_uid.items():
      if val not in uid_to_human:
        tf.logging.fatal('Failed to locate: %s', val)
      name = uid_to_human[val]
      node_id_to_name[key] = name

    return node_id_to_name

  def id_to_string(self, node_id):
    if node_id not in self.node_lookup:
      return ''
    return self.node_lookup[node_id]


def create_graph():
  """Creates a graph from saved GraphDef file and returns a saver."""
  # Creates graph from saved graph_def.pb.
  with tf.gfile.FastGFile(os.path.join(
      FLAGS.model_dir, 'classify_image_graph_def.pb'), 'rb') as f:
    graph_def = tf.GraphDef()
    graph_def.ParseFromString(f.read())
    _ = tf.import_graph_def(graph_def, name='')


def run_inference_on_image(image):
  """Runs inference on an image.
  Args:
    image: Image file name.
  Returns:
    Nothing
  """
  if not tf.gfile.Exists(image):
    tf.logging.fatal('File does not exist %s', image)
  image_data = tf.gfile.FastGFile(image, 'rb').read()

  # Creates graph from saved GraphDef.
  create_graph()

  with tf.Session() as sess:
    # Some useful tensors:
    # 'softmax:0': A tensor containing the normalized prediction across
    #   1000 labels.
    # 'pool_3:0': A tensor containing the next-to-last layer containing 2048
    #   float description of the image.
    # 'DecodeJpeg/contents:0': A tensor containing a string providing JPEG
    #   encoding of the image.
    # Runs the softmax tensor by feeding the image_data as input to the graph.
    softmax_tensor = sess.graph.get_tensor_by_name('softmax:0')
    predictions = sess.run(softmax_tensor,
                           {'DecodeJpeg/contents:0': image_data})
    predictions = np.squeeze(predictions)

    # Creates node ID --> English string lookup.
    node_lookup = NodeLookup()

    top_k = predictions.argsort()[-FLAGS.num_top_predictions:][::-1]
    row = []
    for node_id in top_k:
      human_string = node_lookup.id_to_string(node_id)
      score = predictions[node_id]
      row.append( { 'node_id': node_id, 'image': image, 'host': host, 'ts': currenttime, 'human_string': str(human_string), 'score': str(score)} )
    json_string = json.dumps(row)
    print( json_string )


def maybe_download_and_extract():
  """Download and extract model tar file."""
  dest_directory = FLAGS.model_dir
  if not os.path.exists(dest_directory):
    os.makedirs(dest_directory)
  filename = DATA_URL.split('/')[-1]
  filepath = os.path.join(dest_directory, filename)
  if not os.path.exists(filepath):
    def _progress(count, block_size, total_size):
      sys.stdout.write('\r>> Downloading %s %.1f%%' % (
          filename, float(count * block_size) / float(total_size) * 100.0))
      sys.stdout.flush()
    filepath, _ = urllib.request.urlretrieve(DATA_URL, filepath, _progress)
    print()
    statinfo = os.stat(filepath)
    print('Successfully downloaded', filename, statinfo.st_size, 'bytes.')
  tarfile.open(filepath, 'r:gz').extractall(dest_directory)


def main(_):
  maybe_download_and_extract()

  # Create unique image name
  img_name = '/opt/demo/images/pi_image_{0}_{1}.jpg'.format(randomword(3),strftime("%Y%m%d%H%M%S",gmtime()))

  # Capture Image from Pi Camera
  # The camera is created before the try block so that the finally clause
  # never refers to an undefined name if PiCamera() itself fails.
  camera = picamera.PiCamera()
  try:
    camera.resolution = (1024,768)
    camera.annotate_text = " Stored with Apache NiFi "
    camera.capture(img_name, resize=(600,400))
  finally:
    camera.close()

  # image = (FLAGS.image_file if FLAGS.image_file else
  #          os.path.join(FLAGS.model_dir, 'cropped_panda.jpg'))
  run_inference_on_image(img_name)


if __name__ == '__main__':
  parser = argparse.ArgumentParser()
  # classify_image_graph_def.pb:
  #   Binary representation of the GraphDef protocol buffer.
  # imagenet_synset_to_human_label_map.txt:
  #   Map from synset ID to a human readable string.
  # imagenet_2012_challenge_label_map_proto.pbtxt:
  #   Text representation of a protocol buffer mapping a label to synset ID.
  parser.add_argument(
      '--model_dir',
      type=str,
      default='/tmp/imagenet',
      help=""" Path to classify_image_graph_def.pb,
      imagenet_synset_to_human_label_map.txt, and
      imagenet_2012_challenge_label_map_proto.pbtxt. """
  )
  parser.add_argument(
      '--image_file',
      type=str,
      default='',
      help='Absolute path to image file.'
  )
  parser.add_argument(
      '--num_top_predictions',
      type=int,
      default=5,
      help='Display this many predictions.'
  )
  FLAGS, unparsed = parser.parse_known_args()
  tf.app.run(main=main, argv=[sys.argv[0]] + unparsed)
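
The script stores the local capture path (/opt/demo/images/...) in the image field, and the Hive query earlier in this post turns that into a WebHDFS link with SUBSTR(image,18). The sketch below shows the same string handling in Python so the offset is easier to see. The function name webhdfs_open_url is made up for illustration, and the sample file name is hypothetical; the host and /tfimagefiles directory are the ones that appear in the query.

```python
LOCAL_PREFIX = "/opt/demo/images/"  # 17 characters, so the file name starts at position 18


def webhdfs_open_url(image_path,
                     namenode="http://princeton10.field.hortonworks.com:50070",
                     hdfs_dir="/tfimagefiles"):
    # Hive's SUBSTR(image, 18) is 1-indexed: it drops the first 17 characters,
    # which is exactly the local directory prefix written by the Pi script.
    if not image_path.startswith(LOCAL_PREFIX):
        raise ValueError("unexpected image path: " + image_path)
    file_name = image_path[len(LOCAL_PREFIX):]
    return "{0}/webhdfs/v1{1}/{2}?op=OPEN".format(namenode, hdfs_dir, file_name)


if __name__ == "__main__":
    # Hypothetical file name in the format produced by img_name in the script.
    print(webhdfs_open_url("/opt/demo/images/pi_image_abc_20170806000000.jpg"))
```

If the capture directory on the Pi changes, the 18 in the Hive query has to change with it, which is the main reason to keep the prefix in one named constant.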

Key Additions to Python

row.append( { 'node_id': node_id, 'image': image, 'host': host, 'ts': currenttime, 'human_string': str(human_string), 'score': str(score)} )
json_string = json.dumps(row)
print( json_string )

Image Captured by Camera (Also Added "Stored with Apache NiFi" via Python)

References: http://regexr.com/
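
In the NiFi flow, the non-JSON junk is stripped with split and extract processors and empty messages are filtered by QueryRecord. If you want to test the same idea outside NiFi, a rough Python equivalent might look like the sketch below. It is not what the flow runs; the function name extract_json_records and the sample lines are invented for illustration.

```python
import json


def extract_json_records(lines):
    """Return classification dicts found in lines, skipping non-JSON junk."""
    records = []
    for line in lines:
        line = line.strip()
        if not line.startswith("["):
            continue  # progress output, warnings, blank lines
        try:
            parsed = json.loads(line)
        except ValueError:
            continue  # started like a JSON array but did not parse
        # Keep only non-empty results, similar in spirit to the QueryRecord filter.
        records.extend(r for r in parsed
                       if isinstance(r, dict) and r.get("human_string"))
    return records


# Made-up sample output: one junk line, one JSON array, one blank line.
sample = [
    ">> Downloading inception-2015-12-05.tgz 100.0%",
    '[{"node_id": 1, "image": "/opt/demo/images/pi_image_abc_20170806000000.jpg", '
    '"host": "rpi", "ts": "2017-08-06 00:00:00", "human_string": "laptop", "score": "0.5"}]',
    "",
]

if __name__ == "__main__":
    print(extract_json_records(sample))
```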
08-04-2017
12:44 PM
1 Kudo
Example https://community.hortonworks.com/articles/69930/posting-data-to-azure-event-hubs-using-nifi-and-ht.html
07-31-2017
08:10 PM
8 Kudos
In preparing for my talk at DataWorks Summit in Australia, I wanted to try yet another way to integrate Apache NiFi with TensorFlow. The common ways are: calling a Python TensorFlow script from Execute Command, calling a TensorFlow Serving server via gRPC, calling a C++ TensorFlow executable via Execute Command, running TensorFlow on the edge and having MiniFi send the results to NiFi, or calling a TensorFlowOnSpark job via Kafka or Site-to-Site.
TensorFlow has released a Java API (https://www.tensorflow.org/install/install_java), so I decided to write a quick custom processor to run TensorFlow Inception v3. It's a simple set of dependencies for Maven:
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-processor-utils</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.tensorflow</groupId>
<artifactId>tensorflow</artifactId>
<version>1.2.1</version>
</dependency>
</dependencies>
It's easy to add the new processor to NiFi. First build it with mvn install (see my build script), then deploy it:
cp nifi-tensorflow-nar/target/nifi-tensorflow-nar-1.0.nar /Volumes/Transcend/Apps/nifi-1.2.0/lib
Once you restart NiFi, you can add the TensorFlow Processor.
An example flow uses the very smart ListFile, which iterates through a list of files and keeps track of the last timestamp of the files it accessed. So I point it at a directory of files and the NiFi processor gets fed a ton of images to process very quickly. This is much faster than calling out to a script. The result of the run is a new attribute, probabilities, which is a string describing what the image could be, with a confidence percentage as text.
This was the guess for the picture of my RV: [screenshot]
Source: https://github.com/tspannhw/nifi-tensorflow-processor
Resources:
https://community.hortonworks.com/articles/83872/data-lake-30-containerization-erasure-coding-gpu-p.html
https://community.hortonworks.com/articles/59349/hdf-20-flow-for-ingesting-real-time-tweets-from-st.html
https://community.hortonworks.com/articles/103863/using-an-asus-tinkerboard-with-tensorflow-and-pyth.html
https://community.hortonworks.com/articles/54954/setting-up-gpu-enabled-tensorflow-to-work-with-zep.html
https://community.hortonworks.com/articles/80339/iot-capturing-photos-and-analyzing-the-image-with.html
https://community.hortonworks.com/articles/58265/analyzing-images-in-hdf-20-using-tensorflow.html
https://community.hortonworks.com/articles/83100/deep-learning-iot-workflows-with-raspberry-pi-mqtt.html
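
To make the ListFile behaviour described above more concrete, here is a small Python sketch of timestamp-based listing: remember the newest modification time seen so far and only emit files newer than that. This is an illustration of the idea only, not NiFi's actual implementation (which handles more edge cases), and the name list_new_files is made up.

```python
import os
import tempfile


def list_new_files(directory, last_seen_mtime):
    """Return (new_files, updated_mtime) for files modified after last_seen_mtime."""
    new_files = []
    newest = last_seen_mtime
    for name in sorted(os.listdir(directory)):
        path = os.path.join(directory, name)
        if not os.path.isfile(path):
            continue
        mtime = os.path.getmtime(path)
        if mtime > last_seen_mtime:
            new_files.append(path)
            newest = max(newest, mtime)
    return new_files, newest


# Demo with two empty files whose timestamps are set by hand.
with tempfile.TemporaryDirectory() as d:
    for name, mtime in (("a.jpg", 100), ("b.jpg", 200)):
        path = os.path.join(d, name)
        open(path, "w").close()
        os.utime(path, (mtime, mtime))
    first, state = list_new_files(d, 0)        # both files are new
    second, state = list_new_files(d, state)   # nothing new on the second pass
    print(len(first), len(second))  # prints "2 0"
```

Because only the saved timestamp is carried between runs, pointing the lister at a large image directory costs one directory scan per run rather than reprocessing every file.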
07-31-2017
07:57 PM
Select * from "_SYS_SAP"."DevOrSomething.SomePlace/ABunchOfStuff"
This is from HANA using JDBC. This works for views and tables that don't have the sub directory setup.
Labels: Apache NiFi
07-11-2017
04:11 PM
You can turn off Phoenix Query Server if you aren't using it, but I can't see there being too many resource issues. Phoenix is awesome.
07-06-2017
08:41 PM
4 Kudos
This is a cool HAT that attaches without soldering to a Raspberry Pi 3. Its main use is with Google's Android Things, an IoT OS based on Android that you can run on the Raspberry Pi and other devices. You can also use this HAT with regular Raspbian and Python.

This HAT is cool because it has input and output. As part of our Python script, we ingest data from its sensors, but we also allow for input from its A, B and C buttons, clearly marked on the top. By selecting one of the buttons you can display the current temperature in Celsius or Fahrenheit, or the current altitude. The altitude comes from some code I found commented out in the Pimoroni Rainbow HAT interface Python script. It determines altitude based on a starting value and pressure readings. It looks kind of cool and is relatively accurate. For point of reference, Princeton, New Jersey is pretty close to sea level.

My script is heavily copied and customized from my previous IoT Python scripts and from the Rainbow HAT examples. This HAT has a 4-digit display, seven multicolor LEDs, a BMP280 temperature and pressure sensor and some more goodies in this tiny device. It's available from Adafruit.

We execute the Python code via a simple shell script wrapper:
python /opt/demo/minifi.py

Source Code: https://github.com/tspannhw/rpi-rainbowhat

Prepare the Pi
sudo apt-get purge libreoffice wolfram-engine sonic-pi scratch
sudo apt-get autoremove

Example Python Debug Output
CPU: 45.0 C
Corrected Temp: 23.7 C
Room Temp: 74.7 F
Pressure: 101598.9 Pa
Altitude: 33.2
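
For anyone curious how an altitude can come out of a pressure reading and a starting value, here is a sketch using the standard international barometric formula. Treat it as an assumption about the approach, not a copy of my script: the function names are made up, and the reference pressure of 1020 hPa is a guess chosen because it reproduces the debug output above.

```python
def altitude_m(pressure_pa, qnh_hpa=1020.0):
    # International barometric formula; qnh_hpa is the assumed reference
    # (sea level) pressure in hPa, pressure_pa is the sensor reading in Pa.
    return 44330.0 * (1.0 - (pressure_pa / (qnh_hpa * 100.0)) ** (1.0 / 5.255))


def c_to_f(temp_c):
    # Plain Celsius to Fahrenheit conversion.
    return temp_c * 9.0 / 5.0 + 32.0


if __name__ == "__main__":
    print(round(altitude_m(101598.9), 1))  # 33.2
    print(round(c_to_f(23.7), 1))          # 74.7
```

The altitude is only as good as the reference pressure, which drifts with the weather, so a value near 33 for a town close to sea level is about what you would expect.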
This is the MiniFi flow that I modeled in Apache NiFi and then exported as XML. I then use the shell script below to convert it into config.yml format to transfer to my device running the Apache MiniFi Java agent.

This is a simple flow to add a schema name, tell NiFi it's a JSON stream and then query it. If the data has temperatures above a threshold (checked via inline SQL), write the AVRO file to ORC and store it in HDFS (automagically building an external Hive table on top of it).

Schema to put in Hortonworks Schema Registry or NiFi AVRO Schema Registry
{"type":"record","namespace":"hortonworks.hdp.refapp.rainbow","name":"rainbow","fields":[{"name": "tempf", "type": "float"},{ "name": "cputemp", "type": "float"},{"name":"ts","type":"string"},{ "name": "pressure","type": "float"},{ "name": "host","type": "string"},{ "name": "tempf2","type": "float"},{"name": "ipaddress","type": "string"},{"name": "temp","type": "float"},{ "name": "altitude","type": "float"}]}

It's pretty easy to define an AVRO schema (in JSON format) like the one above. Feel free to use it as a starting point. Strings and floats work very well; keep it simple and leave out extra whitespace.

Build Your Configuration
minifi-toolkit-1.0.3.0.0.0-453/bin/config.sh transform RainbowForMiniFi.xml config.yml

nifireceiverrainbow.xml rainbowforminifi.xml

Coming Soon
Raspberry Pi 3 running Android Things and Java, connecting to Rainbow HAT, TensorFlow and MiniFi.

References:
https://github.com/pimoroni/rainbow-hat
https://github.com/androidthings/weatherstation
https://en.wikipedia.org/wiki/QNH
https://en.wikipedia.org/wiki/Pascal_(unit)
https://www.mide.com/pages/air-pressure-at-altitude-calculator
https://hackernoon.com/trying-out-the-android-things-weatherstation-codelab-d3f260b59c2f
https://codelabs.developers.google.com/codelabs/androidthings-weatherstation/index.html
https://shop.pimoroni.com/products/rainbow-hat-for-android-things
https://learn.pimoroni.com/tutorial/sandyj/getting-started-with-rainbow-hat-in-python

Part of My IoT MiniFi Series (Details Installation and Setup)
https://community.hortonworks.com/content/kbentry/107379/minifi-for-image-capture-and-ingestion-from-raspbe.html
https://community.hortonworks.com/content/kbentry/108966/minifi-for-sensor-data-ingest-from-devices.html
https://community.hortonworks.com/articles/108947/minifi-for-ble-bluetooth-low-energy-beacon-data-in.html
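
Before sending data from the Pi, it can help to check that a record actually lines up with the schema above. The sketch below is a hand-rolled check using only the json module, not a real Avro validator, and it only understands the two types this schema uses. The function name check_record is made up, and the ts, host, ipaddress and tempf2 values in the sample record are invented; the other readings come from the debug output in this article.

```python
import json

SCHEMA_JSON = (
    '{"type":"record","namespace":"hortonworks.hdp.refapp.rainbow","name":"rainbow",'
    '"fields":[{"name": "tempf", "type": "float"},{ "name": "cputemp", "type": "float"},'
    '{"name":"ts","type":"string"},{ "name": "pressure","type": "float"},'
    '{ "name": "host","type": "string"},{ "name": "tempf2","type": "float"},'
    '{"name": "ipaddress","type": "string"},{"name": "temp","type": "float"},'
    '{ "name": "altitude","type": "float"}]}'
)

# Only the two Avro primitive types used by this schema are mapped here.
PYTHON_TYPES = {"float": float, "string": str}


def check_record(record, schema):
    """Return a list of problems; an empty list means the record fits."""
    problems = []
    for field in schema["fields"]:
        name, avro_type = field["name"], field["type"]
        if name not in record:
            problems.append("missing field: " + name)
        elif not isinstance(record[name], PYTHON_TYPES[avro_type]):
            problems.append("wrong type for " + name)
    return problems


schema = json.loads(SCHEMA_JSON)
record = {
    "tempf": 74.7, "cputemp": 45.0, "ts": "2017-07-06 20:41:00",
    "pressure": 101598.9, "host": "rainbowpi", "tempf2": 74.7,
    "ipaddress": "192.168.1.10", "temp": 23.7, "altitude": 33.2,
}

if __name__ == "__main__":
    print(check_record(record, schema))  # [] when every field is present and typed correctly
```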
07-06-2017
07:10 PM
2 Kudos
I am always ingesting social data, IoT and mobile streams into my HDFS cluster. Unfortunately my cluster is an ephemeral cloud-based Hortonworks HDP 2.6 Hadoop cluster, so I don't have a permanent store for my data. I have my processes run for a few weeks and then they are destroyed.
I wanted a quick way to save all my ORC files.
Enter NiFi.
Backup
First we list from some top-level directories in HDFS to capture all the files and sub-directories we want to back up. Each processor maintains a timestamp to know which files it has already processed; as new files are added, they will be assimilated into the flow.
For massive data migration, we can run this on many nodes and use the Distributed Cache service to maintain the state.
Restore
The flow to restore is very simple: read from the local file system and write to HDFS. For HDFS, I use /${path} as the directory so each file is written to the correct sub-directory for its file group. Easy. It's like rsync, but it's Tim Sync. Make sure you have your Hadoop configuration file set. If you are using Kerberos, make sure you set your principal and keytab, and be very careful with case sensitivity!
For Restore, it doesn't get any simpler: I use GetFile to read from the local file system. As part of that, those files are deleted. I have a big USB 3.0 drive where I want to keep them, so I copy them to a different directory for later storage. I should probably compress those. Once they get large enough I may invest in a local RPi storage array running HDP 2.6 with some attached Western Digital PiDrives.
The Data Provenance of one of the flowfiles shows the path and filename. It makes it very easy to move between say S3, on-premise HDFS, local file systems, cloud file systems, jump drives or wherever. Your data is yours, take it with you.
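To show what the /${path} directory setting amounts to, here is a tiny Python sketch that builds the HDFS target from a flow file's path and filename attributes. It assumes the path attribute holds the sub-directory relative to the backup root; the function names and sample values are made up for illustration, and NiFi itself does this with Expression Language rather than Python.

```python
import posixpath


def hdfs_directory(path_attribute):
    # Same idea as the NiFi expression /${path}: root the relative
    # sub-directory at "/" and tidy up any trailing slash or "./".
    return posixpath.normpath(posixpath.join("/", path_attribute))


def hdfs_target(path_attribute, filename):
    return posixpath.join(hdfs_directory(path_attribute), filename)


if __name__ == "__main__":
    # Hypothetical attribute values for one restored ORC file.
    print(hdfs_target("tfimage/2017/", "part-0001.orc"))  # /tfimage/2017/part-0001.orc
```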