1973 Posts · 1225 Kudos Received · 124 Solutions

My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
|  | 1999 | 04-03-2024 06:39 AM |
|  | 3168 | 01-12-2024 08:19 AM |
|  | 1725 | 12-07-2023 01:49 PM |
|  | 2505 | 08-02-2023 07:30 AM |
|  | 3517 | 03-29-2023 01:22 PM |
12-24-2017
03:17 PM
4 Kudos
Happy Holidays!
Apache NiFi makes it easy to build your own integration tests, so I am generating tests that turn my Christmas Tree Hat on and off. I am also testing taking a picture.
My use case is to send an HTTP message that triggers a Raspberry Pi to turn on a physical device such as a camera or a light. With Apache MiniFi and Apache NiFi this is straightforward and secure, and a small Python script is all the code required; it is basic example code.
The code below is a modified version of TensorFlow's classify.py that adds turning on the Christmas tree. We turn on the tree, take a picture with the PiCamera, and then run the image through a TensorFlow classifier.
root@vid5:/opt/demo# cat classifytree.py
# Copyright 2015 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Simple image classification with Inception.
Run image classification with Inception trained on ImageNet 2012 Challenge data
set.
This program creates a graph from a saved GraphDef protocol buffer,
and runs inference on an input JPEG image. It outputs human readable
strings of the top 5 predictions along with their probabilities.
Change the --image_file argument to any jpg image to compute a
classification of that image.
Please see the tutorial and website for a detailed description of how
to use this script to perform image recognition.
https://tensorflow.org/tutorials/image_recognition/
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import argparse
import os.path
import re
import sys
import tarfile
import os
import datetime
import math
import random, string
import base64
import json
import time
import picamera
from time import sleep
from time import gmtime, strftime
import numpy as np
from six.moves import urllib
import tensorflow as tf
from gpiozero import LEDBoard
from gpiozero.tools import random_values
from signal import pause
tree = LEDBoard(*range(2,28),pwm=True)
for led in tree:
    led.source_delay = 0.1
    led.source = random_values()
tf.logging.set_verbosity(tf.logging.ERROR)
FLAGS = None
# pylint: disable=line-too-long
DATA_URL = 'http://download.tensorflow.org/models/image/imagenet/inception-2015-12-05.tgz'
# pylint: enable=line-too-long
# yyyy-mm-dd hh:mm:ss
currenttime= strftime("%Y-%m-%d %H:%M:%S",gmtime())
host = os.uname()[1]
def randomword(length):
  return ''.join(random.choice(string.lowercase) for i in range(length))
class NodeLookup(object):
  """Converts integer node ID's to human readable labels."""
  def __init__(self,
               label_lookup_path=None,
               uid_lookup_path=None):
    if not label_lookup_path:
      label_lookup_path = os.path.join(
          FLAGS.model_dir, 'imagenet_2012_challenge_label_map_proto.pbtxt')
    if not uid_lookup_path:
      uid_lookup_path = os.path.join(
          FLAGS.model_dir, 'imagenet_synset_to_human_label_map.txt')
    self.node_lookup = self.load(label_lookup_path, uid_lookup_path)
  def load(self, label_lookup_path, uid_lookup_path):
    """Loads a human readable English name for each softmax node.
    Args:
      label_lookup_path: string UID to integer node ID.
      uid_lookup_path: string UID to human-readable string.
    Returns:
      dict from integer node ID to human-readable string.
    """
    if not tf.gfile.Exists(uid_lookup_path):
      tf.logging.fatal('File does not exist %s', uid_lookup_path)
    if not tf.gfile.Exists(label_lookup_path):
      tf.logging.fatal('File does not exist %s', label_lookup_path)
    # Loads mapping from string UID to human-readable string
    proto_as_ascii_lines = tf.gfile.GFile(uid_lookup_path).readlines()
    uid_to_human = {}
    p = re.compile(r'[n\d]*[ \S,]*')
    for line in proto_as_ascii_lines:
      parsed_items = p.findall(line)
      uid = parsed_items[0]
      human_string = parsed_items[2]
      uid_to_human[uid] = human_string
    # Loads mapping from string UID to integer node ID.
    node_id_to_uid = {}
    proto_as_ascii = tf.gfile.GFile(label_lookup_path).readlines()
    for line in proto_as_ascii:
      if line.startswith('  target_class:'):
        target_class = int(line.split(': ')[1])
      if line.startswith('  target_class_string:'):
        target_class_string = line.split(': ')[1]
        node_id_to_uid[target_class] = target_class_string[1:-2]
    # Loads the final mapping of integer node ID to human-readable string
    node_id_to_name = {}
    for key, val in node_id_to_uid.items():
      if val not in uid_to_human:
        tf.logging.fatal('Failed to locate: %s', val)
      name = uid_to_human[val]
      node_id_to_name[key] = name
    return node_id_to_name

  def id_to_string(self, node_id):
    if node_id not in self.node_lookup:
      return ''
    return self.node_lookup[node_id]
def create_graph():
  """Creates a graph from saved GraphDef file and returns a saver."""
  # Creates graph from saved graph_def.pb.
  with tf.gfile.FastGFile(os.path.join(
      FLAGS.model_dir, 'classify_image_graph_def.pb'), 'rb') as f:
    graph_def = tf.GraphDef()
    graph_def.ParseFromString(f.read())
    _ = tf.import_graph_def(graph_def, name='')
def run_inference_on_image(image):
  """Runs inference on an image.
  Args:
    image: Image file name.
  Returns:
    Nothing
  """
  if not tf.gfile.Exists(image):
    tf.logging.fatal('File does not exist %s', image)
  image_data = tf.gfile.FastGFile(image, 'rb').read()
  # Creates graph from saved GraphDef.
  create_graph()
  with tf.Session() as sess:
    # Some useful tensors:
    # 'softmax:0': A tensor containing the normalized prediction across
    #   1000 labels.
    # 'pool_3:0': A tensor containing the next-to-last layer containing 2048
    #   float description of the image.
    # 'DecodeJpeg/contents:0': A tensor containing a string providing JPEG
    #   encoding of the image.
    # Runs the softmax tensor by feeding the image_data as input to the graph.
    softmax_tensor = sess.graph.get_tensor_by_name('softmax:0')
    predictions = sess.run(softmax_tensor,
                           {'DecodeJpeg/contents:0': image_data})
    predictions = np.squeeze(predictions)
    # Creates node ID --> English string lookup.
    node_lookup = NodeLookup()
    top_k = predictions.argsort()[-FLAGS.num_top_predictions:][::-1]
    row = []
    for node_id in top_k:
      human_string = node_lookup.id_to_string(node_id)
      score = predictions[node_id]
      row.append( { 'node_id': node_id, 'image': image, 'host': host, 'ts': currenttime, 'human_string': str(human_string), 'score': str(score)} )
    json_string = json.dumps(row)
    print( json_string )
def maybe_download_and_extract():
  """Download and extract model tar file."""
  dest_directory = FLAGS.model_dir
  if not os.path.exists(dest_directory):
    os.makedirs(dest_directory)
  filename = DATA_URL.split('/')[-1]
  filepath = os.path.join(dest_directory, filename)
  if not os.path.exists(filepath):
    def _progress(count, block_size, total_size):
      sys.stdout.write('\r>> Downloading %s %.1f%%' % (
          filename, float(count * block_size) / float(total_size) * 100.0))
      sys.stdout.flush()
    filepath, _ = urllib.request.urlretrieve(DATA_URL, filepath, _progress)
    print()
    statinfo = os.stat(filepath)
    print('Successfully downloaded', filename, statinfo.st_size, 'bytes.')
  tarfile.open(filepath, 'r:gz').extractall(dest_directory)
def main(_):
  maybe_download_and_extract()
  # Create unique image name
  img_name = '/opt/demo/images/pi_image_{0}_{1}.jpg'.format(randomword(3),strftime("%Y%m%d%H%M%S",gmtime()))
  # Capture Image from Pi Camera
  try:
    camera = picamera.PiCamera()
    camera.resolution = (1024,768)
    camera.annotate_text = " Stored with Apache NiFi "
    camera.capture(img_name, resize=(600,400))
    pass
  finally:
    camera.close()
  # image = (FLAGS.image_file if FLAGS.image_file else
  #          os.path.join(FLAGS.model_dir, 'cropped_panda.jpg'))
  run_inference_on_image(img_name)
if __name__ == '__main__':
  parser = argparse.ArgumentParser()
  # classify_image_graph_def.pb:
  #   Binary representation of the GraphDef protocol buffer.
  # imagenet_synset_to_human_label_map.txt:
  #   Map from synset ID to a human readable string.
  # imagenet_2012_challenge_label_map_proto.pbtxt:
  #   Text representation of a protocol buffer mapping a label to synset ID.
  parser.add_argument(
      '--model_dir',
      type=str,
      default='/tmp/imagenet',
      help="""Path to classify_image_graph_def.pb,
      imagenet_synset_to_human_label_map.txt, and
      imagenet_2012_challenge_label_map_proto.pbtxt."""
  )
  parser.add_argument(
      '--image_file',
      type=str,
      default='',
      help='Absolute path to image file.'
  )
  parser.add_argument(
      '--num_top_predictions',
      type=int,
      default=5,
      help='Display this many predictions.'
  )
  FLAGS, unparsed = parser.parse_known_args()
  tf.app.run(main=main, argv=[sys.argv[0]] + unparsed)
Here is the information for getting your own Christmas Tree Hat for your Raspberry Pi.
https://thepihut.com/products/3d-xmas-tree-for-raspberry-pi
https://thepihut.com/blogs/raspberry-pi-tutorials/3d-xmas-tree-for-raspberry-pi-assembly-instructions
sudo apt-get install python-gpiozero python3-gpiozero
Results of Running
[{"image": "/opt/demo/images/pi_image_bey_20171218140347.jpg", "ts": "2017-12-18 14:03:32", "host": "vid5", "score": "0.175653", "human_string": "pay-phone, pay-station", "node_id": 843}, {"image": "/opt/demo/images/pi_image_bey_20171218140347.jpg", "ts": "2017-12-18 14:03:32", "host": "vid5", "score": "0.0890657", "human_string": "cellular telephone, cellular phone, cellphone, cell, mobile phone", "node_id": 914}, {"image": "/opt/demo/images/pi_image_bey_20171218140347.jpg", "ts": "2017-12-18 14:03:32", "host": "vid5", "score": "0.0631831", "human_string": "vending machine", "node_id": 558}, {"image": "/opt/demo/images/pi_image_bey_20171218140347.jpg", "ts": "2017-12-18 14:03:32", "host": "vid5", "score": "0.0541551", "human_string": "abacus", "node_id": 547}, {"image": "/opt/demo/images/pi_image_bey_20171218140347.jpg", "ts": "2017-12-18 14:03:32", "host": "vid5", "score": "0.0417486", "human_string": "rotisserie", "node_id": 663}]
To Remotely Activate the Tree
curl -X POST http://192.168.1.167:8033/contentListener --data-ascii "tree-on" -v
It's easy to enable Apache MiniFi to be controlled by any remote HTTP request.
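The receiving flow just hands the posted message body to a small script on the Pi. As a rough illustration of what such a helper could look like (the file name treecontrol.py is hypothetical; it reuses the gpiozero LEDBoard setup from classifytree.py above), a minimal sketch:

# treecontrol.py - hypothetical helper script, a minimal sketch only.
# Assumes the same 3D Xmas Tree wiring as classifytree.py above (GPIO 2-27 via gpiozero),
# and that the NiFi/MiniFi flow passes the HTTP body ("tree-on" or "tree-off") as an argument.
import sys
from time import sleep
from gpiozero import LEDBoard
from gpiozero.tools import random_values

tree = LEDBoard(*range(2, 28), pwm=True)
command = sys.argv[1] if len(sys.argv) > 1 else "tree-on"

if command == "tree-on":
    # Twinkle every LED with random brightness values for a minute, then exit
    # (gpiozero resets the pins, and the LEDs go dark, when the script ends).
    for led in tree:
        led.source_delay = 0.1
        led.source = random_values()
    sleep(60)
else:
    tree.off()
    sleep(1)

The flow would typically receive the POST with a ListenHTTP-style processor and run something like python treecontrol.py tree-on as a command on the Pi.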
Other Apache MiniFi Requests
root@vid5:/opt/demo/minifi-0.2.0/logs# curl -v http://HW13125.local:8080/nifi-api/system-diagnostics
* Hostname was NOT found in DNS cache
* Trying 192.168.1.193...
* Connected to HW13125.local (192.168.1.193) port 8080 (#0)
> GET /nifi-api/system-diagnostics HTTP/1.1
> User-Agent: curl/7.38.0
> Host: HW13125.local:8080
> Accept: */*
>
< HTTP/1.1 200 OK
< Date: Mon, 18 Dec 2017 13:42:07 GMT
< X-Frame-Options: SAMEORIGIN
< Cache-Control: private, no-cache, no-store, no-transform
< Content-Type: application/json
< Vary: Accept-Encoding
< Vary: User-Agent
< Content-Length: 1852
* Server Jetty(9.4.3.v20170317) is not blacklisted
< Server: Jetty(9.4.3.v20170317)
<
{"systemDiagnostics":{"aggregateSnapshot":{"totalNonHeap":"390.23 MB","totalNonHeapBytes":409190400,"usedNonHeap":"370.09 MB","usedNonHeapBytes":388065576,"freeNonHeap":"20.15 MB","freeNonHeapBytes":21124824,"maxNonHeap":"-1 bytes","maxNonHeapBytes":-1,"totalHeap":"2 GB","totalHeapBytes":2147483648,"usedHeap":"1.77 GB","usedHeapBytes":1904638968,"freeHeap":"231.59 MB","freeHeapBytes":242844680,"maxHeap":"2 GB","maxHeapBytes":2147483648,"heapUtilization":"89.0%","availableProcessors":8,"processorLoadAverage":2.794921875,"totalThreads":105,"daemonThreads":41,"uptime":"43:00:01.210","flowFileRepositoryStorageUsage":{"freeSpace":"55.76 GB","totalSpace":"931.19 GB","usedSpace":"875.43 GB","freeSpaceBytes":59870846976,"totalSpaceBytes":999860912128,"usedSpaceBytes":939990065152,"utilization":"94.0%"},"contentRepositoryStorageUsage":[{"identifier":"default","freeSpace":"55.76 GB","totalSpace":"931.19 GB","usedSpace":"875.43 GB","freeSpaceBytes":59870846976,"totalSpaceBytes":999860912128,"usedSpaceBytes":939990065152* Connection #0 to host HW13125.local left intact
,"utilization":"94.0%"}],"provenanceRepositoryStorageUsage":[{"identifier":"default","freeSpace":"55.76 GB","totalSpace":"931.19 GB","usedSpace":"875.43 GB","freeSpaceBytes":59870846976,"totalSpaceBytes":999860912128,"usedSpaceBytes":939990065152,"utilization":"94.0%"}],"garbageCollection":[{"name":"G1 Young Generation","collectionCount":742,"collectionTime":"00:00:17.754","collectionMillis":17754},{"name":"G1 Old Generation","collectionCount":0,"collectionTime":"00:00:00.000","collectionMillis":0}],"statsLastRefreshed":"08:42:07 EST","versionInfo":{"niFiVersion":"1.5.0-SNAPSHOT","javaVendor":"Oracle Corporation","javaVersion":"1.8.0_121","osName":"Mac OS X","osVersion":"10.13.2","osArchitecture":"x86_64","buildTag":"HEAD","buildRevision":"a774f1d","buildBranch":"master","buildTimestamp":"12/07/2017 13:37:07 EST"}}}}root@vid5:/opt/d
Resources
https://community.hortonworks.com/articles/118132/minifi-capturing-converting-tensorflow-inception-t.html
https://github.com/tspannhw/rpi-minifi-movidius-sensehat
https://github.com/tspannhw/rpi-sensehat-minifi-python
I will keep my eyes out for Raspberry Pi add-ons for other holidays.
For the second Christmas tree, it's a Sense HAT!
1. Set up Raspbian Stretch
https://www.raspberrypi.org/documentation/configuration/wireless/wireless-cli.md
2. Sense-Hat
sudo apt-get install sense-hat
sudo apt-get install octave -y
pip install --upgrade sense-hat
pip install --upgrade pillow
pip install rtimulib
pip install psutil
sudo apt-get install oracle-java8-jdk
sudo apt install gstreamer-1.0
sudo apt install python3-gst-1.0
sudo apt-get install gir1.2-gstreamer-1.0
sudo apt-get install gir1.2-gst-plugins-base-1.0
For the Sense Hat
Just run this: https://github.com/PixelNoob/sensehat/blob/master/xmas_tree.py
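Before running the full xmas_tree.py example, a quick sanity check that the Sense HAT libraries installed above are working (a minimal sketch; the message text and colour are just examples):

# Quick Sense HAT smoke test - a minimal sketch, not part of the linked example.
from sense_hat import SenseHat

sense = SenseHat()
sense.clear()
# Scroll a greeting across the 8x8 LED matrix.
sense.show_message("Happy Holidays", scroll_speed=0.05, text_colour=[0, 255, 0])
sense.clear()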
12-23-2017
06:48 PM
5 Kudos
2017 in Review

First off, this was an amazing year for Big Data, IoT, Streaming, Machine Learning and Deep Learning: so many cool events, updates, new products, new projects, new libraries and community growth. I've seen a lot of people adopt and grow Big Data and streaming projects from nothing. Using the power of Open Source and the tools made available by Apache, companies are growing with the help of trusted partners and a community of engineers and users.

We had three awesome DataWorks Summits (formerly Hadoop Summit, but now covering a lot more, from IoT to AI and Streaming). I attended Munich and spoke at Sydney. I missed California, but all the videos and slides were online and I loved those. I spoke at Oracle Code in NYC, which was a fun little event; I was surprised to learn that many people had never heard of Apache NiFi or how easily you could use it to build real-time dataflows including Deep Learning and Big Data. I got to talk to a lot of interesting people while working the Hortonworks booth at Strata NYC. Such a huge event; fidget spinners and streaming were the main takeaways there.

We had a lot of awesome meetups in Princeton and in the NYC and Philadelphia areas. The Princeton Future of Data Group grew to over 750 members! A great community of data scientists, engineers, students, analysts, techies and business thought leaders. I am really proud to be a part of this amazing group.

Meetups

I got to speak at most of the meetups except when we had special guests. I had some great NY/NJ/Philly teammates co-running the meetup: @milind pandit and @Greg Keys. Greg and I also created a North Jersey meetup.

November 14th - Enterprise Data at Scale. I spoke on IBM DSX, Apache NiFi, Apache Spark, Python, Jupyter and Data Science. Fortunately, we had two excellent IBM resources assisting me.
October 5th - Deep Learning with DeepLearning4J (DL4J). A great talk by my friend from SkyMind. It's nice to see their project get accepted to Eclipse.
August 8th - Deep Dive into HDF 3.0 @ Honeywell
June 20th - Latest Innovation - Schema Registry and More @ TRAC Intermodal
May 16th - Hadoop Tools Overview
March 28th - Apache NiFi: Ingesting Enterprise Data at Scale

Libraries, SDKs, Tools, Frameworks

TensorFlow
Apache MXNet
NLTK
Apache OpenNLP
Apache Tika
Apache NiFi Custom Processors
OpenCV
Apache NiFi 1.4
Apache Zeppelin
IBM DSX
Apache Spark 2.x
Apache Hive LLAP
Apache HBase with Apache Phoenix
Apache ORC
Apache Hadoop
Hortonworks Schema Registry
Hortonworks Streaming Analytics Manager
Druid
Apache Superset - now in Apache
PyTorch
Apache Storm - big updates

Devices

Raspberry Pi Zero Wireless
Raspberry Pi 3B+
Movidius
Nvidia Jetson TX1
Matrix Creator
Google AIY Voice Kit
Kudrone
Christmas Tree Hat
Sense Hat
Many cameras and video cameras
NanoPi Duo
Tinkerboard

There was a lot of big news this year: https://hortonworks.com/blog/top-hortonworks-blogs-2017/. Apache Hive LLAP became a real production thing and brought Apache Hadoop into the world of the EDW, completely Open Source. On the Apache Spark front, we passed version 2.0, and Livy became a production standby and became Apache Livy. The JanusGraph database appeared and is quickly becoming the standard for graphs. Apache Calcite went into so many projects that SQL queries are everywhere, including in Apache NiFi. A huge number of interesting software projects arose, including Hortonworks Data Plane, Hortonworks Schema Registry and Hortonworks Streaming Analytics Manager. This was an awesome year for software.

Presentations From Talks Available
https://www.slideshare.net/bunkertor/enterprise-data-science-at-scale-princeton-nj-14nov2017
https://www.slideshare.net/bunkertor/realtime-ingesting-and-transforming-sensor-data-social-data-w-nifi-tensorflow
https://www.slideshare.net/bunkertor/introduction-to-hdf-30
https://www.slideshare.net/bunkertor/introduction-to-hadoop-76031567
https://www.slideshare.net/bunkertor/apache-nifi-ingesting-enterprise-data-at-scale
https://www.slideshare.net/bunkertor/ingesting-drone-data-into-big-data-platforms

My HCC Articles of 2017
https://community.hortonworks.com/articles/80412/working-with-airbnbs-superset.html https://community.hortonworks.com/articles/116803/building-a-custom-processor-in-apache-nifi-12-for.html https://community.hortonworks.com/articles/79842/ingesting-osquery-into-apache-phoenix-using-apache.html https://community.hortonworks.com/articles/97062/query-hive-using-python.html https://community.hortonworks.com/articles/79008/using-the-hadoop-attack-library-to-check-your-hado.html https://community.hortonworks.com/articles/81222/adding-stanford-corenlp-to-big-data-pipelines-apac.html https://community.hortonworks.com/articles/81270/adding-stanford-corenlp-to-big-data-pipelines-apac-1.html https://community.hortonworks.com/articles/88404/adding-and-using-hplsql-and-hivemall-with-hive-mac.html https://community.hortonworks.com/articles/149891/handling-hl7-records-and-storing-in-apache-hive-fo.html https://community.hortonworks.com/articles/87632/ingesting-sql-server-tables-into-hive-via-apache-n.html https://community.hortonworks.com/articles/73828/submitting-spark-jobs-from-apache-nifi-using-livy.html https://community.hortonworks.com/articles/76240/using-opennlp-for-identifying-names-from-text.html https://community.hortonworks.com/articles/136024/integrating-nvidia-jetson-tx1-running-tensorrt-int.html https://community.hortonworks.com/articles/136026/integrating-nvidia-jetson-tx1-running-tensorrt-int-1.html https://community.hortonworks.com/articles/136028/integrating-nvidia-jetson-tx1-running-tensorrt-int-2.html https://community.hortonworks.com/articles/136039/integrating-nvidia-jetson-tx1-running-tensorrt-int-3.html https://community.hortonworks.com/articles/150026/hl7-processing-part-3-apache-zeppelin-sql-bi-and-a.html https://community.hortonworks.com/articles/104226/simple-backups-of-hadoop-with-apache-nifi-12.html https://community.hortonworks.com/articles/77609/securing-your-clusters-in-the-public-cloud.html https://community.hortonworks.com/articles/92495/monitor-apache-nifi-with-apache-nifi.html https://community.hortonworks.com/articles/77621/creating-an-email-bot-in-apache-nifi.html https://community.hortonworks.com/articles/80418/open-nlp-example-apache-nifi-processor.html https://community.hortonworks.com/articles/76924/data-processing-pipeline-parsing-pdfs-and-identify.html https://community.hortonworks.com/articles/86801/working-with-s3-compatible-data-stores-via-apache.html https://community.hortonworks.com/articles/101904/part-2-iot-augmenting-gps-data-with-weather.html https://community.hortonworks.com/articles/118148/creating-wordclouds-from-dataflows-with-apache-nif.html https://community.hortonworks.com/articles/121916/controlling-big-data-flows-with-gestures-minifi-ni.html https://community.hortonworks.com/articles/76935/using-sentiment-analysis-and-nlp-tools-with-hdp-25.html https://community.hortonworks.com/articles/87397/steganography-with-apache-nifi-1.html https://community.hortonworks.com/articles/83100/deep-learning-iot-workflows-with-raspberry-pi-mqtt.html https://community.hortonworks.com/articles/154957/converting-json-to-sql-ddl.html https://community.hortonworks.com/articles/81694/extracttext-nifi-custom-processor-powered-by-apach.html https://community.hortonworks.com/articles/92345/store-a-flow-to-disk-and-then-reserialize-it-to-co.html https://community.hortonworks.com/articles/92496/qadcdc-our-how-to-ingest-some-database-tables-to-h.html https://community.hortonworks.com/articles/73811/trigger-sonicpi-music-via-apache-nifi.html 
https://community.hortonworks.com/articles/99861/ingesting-ibeacon-data-via-ble-to-mqtt-wifi-gatewa.html https://community.hortonworks.com/articles/101679/iot-ingesting-gps-data-from-raspberry-pi-zero-wire.html https://community.hortonworks.com/articles/104255/ingesting-and-testing-jms-data-with-nifi.html https://community.hortonworks.com/articles/89455/ingesting-gps-data-from-onion-omega2-devices-with.html https://community.hortonworks.com/articles/89547/tracking-phone-location-for-android-and-iot-with-o.html https://community.hortonworks.com/articles/107379/minifi-for-image-capture-and-ingestion-from-raspbe.html https://community.hortonworks.com/articles/108947/minifi-for-ble-bluetooth-low-energy-beacon-data-in.html https://community.hortonworks.com/articles/108966/minifi-for-sensor-data-ingest-from-devices.html https://community.hortonworks.com/articles/110469/simple-backup-and-restore-of-hdfs-data-via-hdf-30.html https://community.hortonworks.com/articles/110475/ingesting-sensor-data-from-raspberry-pis-running-r.html https://community.hortonworks.com/articles/118132/minifi-capturing-converting-tensorflow-inception-t.html https://community.hortonworks.com/articles/122077/ingesting-csv-data-and-pushing-it-as-avro-to-kafka.html https://community.hortonworks.com/articles/130814/sensors-and-image-capture-and-deep-learning-analys.html https://community.hortonworks.com/articles/86570/hosting-and-ingesting-data-from-web-pages-desktop.html https://community.hortonworks.com/articles/142686/real-time-ingesting-and-transforming-sensor-and-so.html https://community.hortonworks.com/articles/77988/ingest-remote-camera-images-from-raspberry-pi-via.html https://community.hortonworks.com/articles/108718/ingesting-rdbms-data-as-new-tables-arrive-automagi.html https://community.hortonworks.com/articles/149982/hl7-ingest-part-4-streaming-analytics-manager-and.html https://community.hortonworks.com/articles/149910/handling-hl7-records-part-1-hl7-ingest.html https://community.hortonworks.com/articles/80339/iot-capturing-photos-and-analyzing-the-image-with.html https://community.hortonworks.com/articles/77403/basic-image-processing-and-linux-utilities-as-part.html https://community.hortonworks.com/articles/103863/using-an-asus-tinkerboard-with-tensorflow-and-pyth.html https://community.hortonworks.com/articles/146704/edge-analytics-with-nvidia-jetson-tx1-running-apac.html https://community.hortonworks.com/articles/148730/integrating-apache-spark-2x-jobs-with-apache-nifi.html https://community.hortonworks.com/articles/154760/generating-avro-schemas-and-ensuring-field-names-m.html https://community.hortonworks.com/articles/155326/monitoring-energy-usage-utilizing-apache-nifi-pyth.html My Articles on DZone
https://dzone.com/articles/generating-avro-schemas-and-ensuring-field-names-m https://dzone.com/articles/favorite-tech-of-the-year-early-edition https://dzone.com/articles/integrating-apache-spark-2x-jobs-with-apache-nifi https://dzone.com/articles/using-jolt-in-big-data-streams-to-remove-nulls https://dzone.com/articles/processing-hl7-records https://dzone.com/articles/big-data-is-growing https://dzone.com/articles/ingesting-rdbms-data-as-new-tables-arrive-automagi https://dzone.com/articles/using-websockets-with-apache-nifi https://dzone.com/articles/using-the-new-flick-hat-for-raspberry-pi https://dzone.com/articles/real-time-ingest-and-ai https://dzone.com/articles/tensorflow-for-real-world-applications https://dzone.com/articles/integrating-nvidia-jetson-tx1-running-tensorrt-int https://dzone.com/articles/real-time-tensorflow-camera-analysis-with-sensors https://dzone.com/articles/tensorflow-and-nifi-big-data-ai-sandwich https://dzone.com/articles/minifi-capturing-converting-tensorflow-inception-t https://dzone.com/articles/creating-wordclouds-from-dataflows-with-apache-nif https://dzone.com/articles/building-a-custom-processor-in-apache-nifi-12-for https://dzone.com/articles/data-engineer-as-dj https://dzone.com/articles/how-to-automatically-migrate-all-tables-from-a-dat https://dzone.com/articles/dataworks-summit-2017-sj-updates https://dzone.com/articles/hdf-30-for-utilities https://dzone.com/articles/hdp-26-what-why-how-and-now https://dzone.com/articles/using-apache-minifi-on-edge-devices-part-1 https://dzone.com/articles/creating-an-email-bot-in-apache-nifi https://dzone.com/articles/this-week-in-hadoop-and-more-deep-deep-learning-an https://dzone.com/articles/using-python-for-big-data-workloads-part-2 https://dzone.com/articles/using-tinkerboard-with-tensorflow-and-python https://dzone.com/articles/using-python-for-big-data-workloads-part-1 https://dzone.com/articles/part-2-iot-augmenting-gps-data-with-weather https://dzone.com/articles/this-week-in-hadoop-and-more-apache-calcite-kylin https://dzone.com/articles/iot-ingesting-gps-data-from-raspberry-pi-zero-wire https://dzone.com/articles/a-new-era-of-open-source-streaming https://dzone.com/articles/day-1-dataworks-summit-munich-report https://dzone.com/articles/this-week-in-hadoop-and-more-dl-conferences-course https://dzone.com/articles/advanced-apache-nifi-flow-techniques https://dzone.com/articles/a-big-data-reference-architecture-for-iot https://dzone.com/articles/ingesting-gps-data-from-onion-omega2-devices-with-apache-nifi https://dzone.com/articles/sentiment-shoot-out https://dzone.com/articles/best-of-dataworks-summit-2017-munich https://dzone.com/articles/deep-learning-on-big-data-platforms https://dzone.com/articles/tensorflow-on-the-edge-part-2-of-5 https://dzone.com/articles/this-week-in-hadoop-and-more-nifi-drones-dataworks https://dzone.com/articles/oracle-code-new-york-report https://dzone.com/articles/deep-learning-for-data-engineers-part-1 https://dzone.com/articles/this-week-in-hadoop-and-more-keras-deep-learning-a https://dzone.com/articles/happy-pi-day-2017 https://dzone.com/articles/deep-learning-and-machine-learning-guide-part-iii https://dzone.com/articles/this-week-in-hadoop-and-more-deep-and-machine-lear https://dzone.com/articles/backup-restore-dr https://dzone.com/articles/big-data-performance-part-1 https://dzone.com/articles/nifi-spark-hbase-kafka-machine-learning-and-deep-l https://dzone.com/articles/hadoop-101-hbase-client-access 
https://dzone.com/articles/deep-learning-and-machine-learning-guide-part-ii https://dzone.com/articles/this-week-in-hadoop-and-more-cloud-visualization-d https://dzone.com/articles/big-data-ml-dl-command-line-tools https://dzone.com/articles/machine-learning-resources https://dzone.com/articles/tensorflow-on-the-edge https://dzone.com/articles/deep-learning-and-machine-learning-killer-tools-li https://dzone.com/articles/cool-projects-big-data-machine-learning-apache-nifi https://dzone.com/articles/protect-your-cloud-big-data-assets https://dzone.com/articles/edge-testing-your-hadoop-environment https://dzone.com/articles/this-week-in-hadoop-and-more-6 https://dzone.com/articles/picamera-ingest-real-time https://dzone.com/articles/this-week-in-hadoop-and-more-nlp-and-dl https://dzone.com/articles/quick-tips-apache-phoenixhbase https://dzone.com/articles/the-physics-of-big-data My RefCard
https://dzone.com/refcardz/introduction-to-tensorflow

My Guide

https://dzone.com/guides/artificial-intelligence-machine-learning-and-predi

My GitHub Source Code

I have some example Apache NiFi custom processors developed in JDK 8, including ones for TensorFlow, OpenNLP, DL4J, Apache Tika, Stanford CoreNLP and more. I also published all the Python scripts, documentation, shell scripts, SQL, Apache NiFi templates and Apache Zeppelin notebooks as Apache-licensed open source on GitHub.
https://github.com/tspannhw/nifi-tensorflow-processor https://github.com/tspannhw/nifi-nlp-processor https://github.com/tspannhw/nifi-attributecleaner-processor https://github.com/tspannhw/apachelivy-nifi-spark2-integration https://github.com/tspannhw/nvidiajetsontx1-mxnet https://github.com/tspannhw/nifi-dl4j-processor
https://github.com/tspannhw/dws2017sydney https://github.com/tspannhw/rpi-flickhat-minifi https://github.com/tspannhw/rpi-rainbowhat https://github.com/tspannhw/rpi-sensehat-minifi-python https://github.com/tspannhw/rpizw-nifi-mqtt-gps https://github.com/tspannhw/EnterpriseNIFI https://github.com/tspannhw/IngestingDroneData https://github.com/tspannhw/spy https://github.com/tspannhw/webdataingest https://github.com/tspannhw/mxnet_rpi https://github.com/tspannhw/nifi-extracttext-processor https://github.com/tspannhw/nifi-corenlp-processor https://github.com/tspannhw/nlp-utilities https://github.com/tspannhw/rpi-sensehat-mqtt-nifi https://github.com/tspannhw/rpi-picamera-mqtt-nifi https://github.com/tspannhw/iot-scripts https://github.com/tspannhw/phoenix https://github.com/tspannhw/hive Next year will be amazing, more libraries, more use cases for Deep Learning, enhancements to all the great projects and tools out there. Another Google AIY Kit, more DataWorks Summits, Hadoop 3, HDF 4, HDP 3, so many things to look forward to. See you at meetups, summits and online next year.
12-22-2017
08:49 PM
5 Kudos
During the holidays, it's nice to know how much energy you are using. So as one small step, I bought a low-end, inexpensive TP-Link energy monitoring plug for one device. I have been monitoring phone charging and my Apple monitor. Let's read the data and do some queries in Apache Hive and Apache Spark 2 SQL.

Processing Live Energy Feeds in The Cloud

Monitor Energy From a Local OSX

If your local instance does not have access to Apache Hive, you will need to send the data via Site-to-Site to a remote Apache NiFi / HDF server or cluster that does.

For Apache Hive Usage, Please Convert to Apache ORC Files

To Create Your New Table, Grab the hive.ddl

Inside of Apache Zeppelin, we can create our table based on the above DDL. We could have also let Apache NiFi create the table for us; I like to keep my DDL with my notebook, just a personal choice. We can then query our table in Apache Zeppelin utilizing Apache Spark 2 SQL and Apache Hive QL.

Overview

Step 1: Purchase an inexpensive energy monitoring plug
Step 2: Connect it to a phone app via WiFi
Step 3: Once configured, you can now access it via Python
Step 4: Install the HS100 Python library in Python 3.x
Step 5: Fork my GitHub repository and use my shell script and Python script
Step 6: Add the local Apache NiFi flow which will call that script
Step 7: Add a remote Apache NiFi flow for processing into Apache Hadoop
Step 8: Create your table
Step 9: Query with Apache Hive and Apache Spark SQL via Apache Zeppelin or another UI
Step 10: Turn that extra stuff off and save money!

The Open Source Code and Artefacts

Shell Script (smartreader.sh)

python3 meterreader.py

Python Code (meterreader.py)

from pyHS100 import SmartPlug, SmartBulb
#from pprint import pformat as pf
import json
import datetime
plug = SmartPlug("192.168.1.200")
row = { }
emeterdaily = plug.get_emeter_daily(year=2017, month=12)
for k, v in emeterdaily.items():
    row["hour%s" % k] = v
hwinfo = plug.hw_info
for k, v in hwinfo.items():
    row["%s" % k] = v
sysinfo = plug.get_sysinfo()
for k, v in sysinfo.items():
    row["%s" % k] = v
timezone = plug.timezone
for k, v in timezone.items():
    row["%s" % k] = v
emetermonthly = plug.get_emeter_monthly(year=2017)
for k, v in emetermonthly.items():
    row["day%s" % k] = v
realtime = plug.get_emeter_realtime()
for k, v in realtime.items():
    row["%s" % k] = v
row['alias'] = plug.alias
row['time'] = plug.time.strftime('%m/%d/%Y %H:%M:%S')
row['ledon'] = plug.led
row['systemtime'] = datetime.datetime.now().strftime('%m/%d/%Y %H:%M:%S')
json_string = json.dumps(row)
print(json_string)

The code is basically a small tweak on the example code provided with the pyHS100 code. This code allows you to access the HS110 that I have. My PC and my smart meter are on the same WiFi, which can't be 5G.

Example Data

{"hour19": 0.036, "hour20": 0.021, "hour21": 0.017, "sw_ver": "1.1.1 Build 160725 Rel.164033", "hw_ver": "1.0", "mac": "50:C7:BF:B1:95:D5", "type": "IOT.SMARTPLUGSWITCH", "hwId": "60FF6B258734EA6880E186F8C96DDC61", "fwId": "060BFEA28A8CD1E67146EB5B2B599CC8", "oemId": "FFF22CFF774A0B89F7624BFC6F50D5DE", "dev_name": "Wi-Fi Smart Plug With Energy Monitoring", "model": "HS110(US)", "deviceId": "8006ECB1D454C4428953CB2B34D9292D18A6DB0E", "alias": "Tim Spann's MiniFi Controller SmartPlug - Desk1", "icon_hash": "", "relay_state": 1, "on_time": 161599, "active_mode": "schedule", "feature": "TIM:ENE", "updating": 0, "rssi": -32, "led_off": 0, "latitude": 40.268216, "longitude": -74.529088, "index": 18, "zone_str": "(UTC-05:00) Eastern Daylight Time (US & Canada)", "tz_str": "EST5EDT,M3.2.0,M11.1.0", "dst_offset": 60, "day12": 0.074, "current": 0.04011, "voltage": 122.460974, "power": 1.8772, "total": 0.074, "time": "12/21/2017 13:21:52", "ledon": true, "systemtime": "12/21/2017 13:21:53"}

As you can see, we only get the hours and days where we had usage. Since this is new, I don't have them all. I created my schema to handle all the days of a month and all the hours of a day. We are going to have a sparse table. If I was monitoring millions of devices, I would put this in Apache HBase. I may do that later.

Let's create an HDFS directory for loading Apache ORC files:

hdfs dfs -mkdir -p /smartPlug
hdfs dfs -chmod -R 777 /smartPlug

Table DDL

CREATE EXTERNAL TABLE IF NOT EXISTS smartPlug (hour19 DOUBLE, hour20 DOUBLE, hour21 DOUBLE, hour22 DOUBLE, hour23 DOUBLE, hour18 DOUBLE, hour17 DOUBLE, hour16 DOUBLE, hour15 DOUBLE, hour14 DOUBLE, hour13 DOUBLE, hour12 DOUBLE, hour11 DOUBLE, hour10 DOUBLE, hour9 DOUBLE, hour8 DOUBLE, hour7 DOUBLE, hour6 DOUBLE, hour5 DOUBLE, hour4 DOUBLE, hour3 DOUBLE, hour2 DOUBLE, hour1 DOUBLE, hour0 DOUBLE, sw_ver STRING, hw_ver STRING, mac STRING, type STRING, hwId STRING, fwId STRING, oemId STRING, dev_name STRING, model STRING, deviceId STRING, alias STRING, icon_hash STRING, relay_state INT, on_time INT, feature STRING, updating INT, rssi INT, led_off INT, latitude DOUBLE, longitude DOUBLE, index INT, zone_str STRING, tz_str STRING, dst_offset INT, day31 DOUBLE, day30 DOUBLE, day29 DOUBLE, day28 DOUBLE, day27 DOUBLE, day26 DOUBLE, day25 DOUBLE, day24 DOUBLE, day23 DOUBLE, day22 DOUBLE, day21 DOUBLE, day20 DOUBLE, day19 DOUBLE, day18 DOUBLE, day17 DOUBLE, day16 DOUBLE, day15 DOUBLE, day14 DOUBLE, day13 DOUBLE, day12 DOUBLE, day11 DOUBLE, day10 DOUBLE, day9 DOUBLE, day8 DOUBLE, day7 DOUBLE, day6 DOUBLE, day5 DOUBLE, day4 DOUBLE, day3 DOUBLE, day2 DOUBLE, day1 DOUBLE, current DOUBLE, voltage DOUBLE, power DOUBLE, total DOUBLE, time STRING, ledon BOOLEAN, systemtime STRING) STORED AS ORC
LOCATION '/smartPlug'

A Simple Query on Some of the Variables

select `current`, voltage, power, total, time, systemtime, on_time, rssi, latitude, longitude
from smartPlug

Note that current is a reserved word in SQL, so we backtick it.

An Apache Calcite Query Inside Apache NiFi

SELECT * FROM FLOWFILE WHERE "current" > 0

With the Python API I can turn the plug off, so there is no need to monitor it then (see the sketch after the resources list below). In an updated article I will add a few smart plugs and turn them on and off based on things occurring, perhaps turning off a light when no motion is detected. We can do anything with Apache NiFi, Apache MiniFi and Python. The API also allows turning the green LED light on the plug on and off. The screen prints above are from the iOS version of the TP-Link Kasa app, which lets you configure and monitor your plug. For many people that's good enough, but not for me.

Resources

smartplugprocessing.xml
monitorpowerlocal.xml
https://github.com/GadgetReactor/pyHS100
https://pypi.python.org/pypi/pyHS100
pip3 install pyhs100
https://github.com/tspannhw/nifi-smartplug/tree/master
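As mentioned above, the same pyHS100 API that reads the meter can also switch the plug and its LED. A minimal sketch (the IP address matches the one used in meterreader.py; adjust it for your plug, and note this helper is not part of the article's repo):

# plugcontrol.py - a minimal sketch of switching the plug with pyHS100.
from pyHS100 import SmartPlug

plug = SmartPlug("192.168.1.200")

# Turn the outlet off (or back on) and toggle the green status LED.
plug.turn_off()
# plug.turn_on()
plug.led = False   # turn the LED off
print(plug.state)  # prints "ON" or "OFF"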
12-11-2017
03:02 PM
2 Kudos
Building schemas is tedious work and fraught with errors. The InferAvroSchema processor can get you started: it generates a compliant schema for you to use. There is one caveat: you have to make sure you are using Apache Avro-safe field names. I have a custom processor that will clean your attributes if you need them Avro-safe; see the processor listed below.

Example Flow Utilizing InferAvroSchema

InferAvroSchema Details

Step 0: Use Apache NiFi to convert data to JSON or CSV
Step 1: Send the JSON or CSV data to InferAvroSchema. I recommend setting the output destination to flowfile-attribute, the input content type to json, and pretty avro output to true.
Step 2: The new schema is now in the attribute inferred.avro.schema.

inferred.avro.schema
{ "type" : "record", "name" : "schema1", "fields" : [ { "name" : "table", "type" : "string", "doc" : "Type inferred from '\"schema1.tableName\"'" } ] } This schema can then be used for conversions directly or stored in Hortonworks Schema Registry or Apache NiFi Built-in Avro Registry. Now you can use it for ConvertRecord, QueryRecord and other Record processing. Example Generated Schema in Avro-JSON Format Stored in Hortonworks Schema Registry: Source: https://github.com/tspannhw/nifi-attributecleaner-processor
12-06-2017
06:29 PM
Thanks @bkosaraju. Strict Host Key Checking is set to False. If by target hosts you mean the SFTP server (our destination), then it is not in the list in /.ssh/known_hosts. I just tried a manual SFTP again and this time received "Connection reset by peer". We are checking the SFTP side in case our node IP is blacklisted (possibly due to a max-failure condition). Update -- 12-07-2017 -- The IP for the NiFi node was blacklisted on the SFTP side. We've whitelisted the IP and the processor is now functional. Thanks.
12-06-2017
05:16 PM
Yes, continuously and automatically. By default it polls for new files every 60 seconds; you can shrink that. You can also convert those files to Apache ORC and automatically build new Hive tables on them if the files are CSV, TSV, Avro, Excel, JSON, XML, EDI, HL7 or C-CDA. Install Apache NiFi on an edge node; there are ways to combine it with HDP 2.6 and HDF 3 using the new Ambari, but it's easiest to start with a separate node for Apache NiFi. You can also just download NiFi, unzip it, and run it on a laptop that has JDK 8 installed: https://www.apache.org/dyn/closer.lua?path=/nifi/1.4.0/nifi-1.4.0-bin.zip
12-05-2017
12:30 AM
1 Kudo
Overview

We create Apache Hive tables for analytics automagically from DDL generated by Apache NiFi.

Hortonworks SAM Run Time

Running this produces HBase and Druid analytics. SAM has produced two Druid datasources that we can slice up and explore in Superset. In Apache Zeppelin, we can run our Hive DDL (listed below) and run all of our queries, including joining our two patient data tables. We can sort and create some basic graphs. All of this data is available to you in your favorite BI tools.

Generated Apache Hive Tables from ORC Files (DDL)

CREATE EXTERNAL TABLE IF NOT EXISTS patientdata (PID_SSNNumberPatient INT, email STRING, gender STRING, ip_address STRING,
drug_provider STRING, icd9 STRING, icd9_description STRING, icd9P_proc STRING, icd9_proc_description STRING, user_agent STRING, drug_used STRING)
STORED AS ORC LOCATION '/patientdata';
CREATE EXTERNAL TABLE IF NOT EXISTS ccda (problemSectionact_02observationproblemStatuscodecodeSystemName STRING, vitalSignsSectionorganizerobservations_05idsroot STRING, problemSectionact_02observationproblemStatusstatusCodecode STRING, vitalSignsSectionorganizerobservations_01texttext_01value STRING, vitalSignsSectionorganizerobservations_04texttext_01value STRING, vitalSignsSectionorganizercodecodeSystem STRING, vitalSignsSectionorganizerobservations_01valuesvalue STRING, vitalSignsSectionorganizerobservations_04valuesvalue STRING, problemSectionact_03observationidroot STRING, problemSectionact_02codecodeSystem STRING, vitalSignsSectionorganizerobservations_05effectiveTimevalue STRING, RouteOnAttributeRoute STRING, vitalSignsSectionorganizerobservations_03codecode STRING, vitalSignsSectionorganizerobservations_04statusCodecode STRING, problemSectionidroot STRING, codecode STRING, problemSectionact_01effectiveTimelow STRING, problemSectioncodecodeSystemName STRING, codedisplayName STRING, problemSectionact_01observationstatusCodecode STRING, vitalSignsSectionorganizerobservations_01idsroot STRING, vitalSignsSectionorganizerobservations_02idsroot STRING, vitalSignsSectionorganizerobservations_04idsroot STRING, vitalSignsSectionorganizerobservations_03idsroot STRING, problemSectionact_01observationeffectiveTimelow STRING, filecreationTime STRING, problemSectionact_01observationproblemStatusvaluesdisplayName STRING, problemSectionact_02observationvaluestranslationscode STRING, problemSectionact_03statusCodecode STRING, problemSectionact_03observationvaluestranslationscodeSystem STRING, problemSectionact_02idroot STRING, problemSectionact_03codecode STRING, problemSectionact_01observationidroot STRING, problemSectionact_02observationvaluestranslationscodeSystem STRING, problemSectionact_03observationproblemStatuscodecodeSystem STRING, problemSectionact_03observationproblemStatusvaluescodeSystemName STRING, vitalSignsSectionorganizercodecode STRING, vitalSignsSectionorganizerobservations_02statusCodecode STRING, problemSectionact_03observationvaluestranslationsdisplayName STRING, vitalSignsSectionorganizerobservations_03codecodeSystem STRING, problemSectionact_03observationproblemStatuscodecode STRING, problemSectionact_01observationproblemStatusstatusCodecode STRING, vitalSignsSectionorganizerobservations_04textreferencevalue STRING, filelastAccessTime STRING, vitalSignsSectionorganizerobservations_01codedisplayName STRING, filesize STRING, problemSectioncodecodeSystem STRING, vitalSignsSectionorganizerobservations_01valuesunit STRING, vitalSignsSectionorganizerobservations_02effectiveTimevalue STRING, vitalSignsSectionorganizerobservations_05idsextension STRING, vitalSignsSectionorganizerobservations_04codecode STRING, vitalSignsSectionorganizerobservations_05valuesvalue STRING, vitalSignsSectionorganizerobservations_04idsextension STRING, vitalSignsSectionorganizerobservations_02valuesvalue STRING, problemSectionact_02observationproblemStatusvaluescodeSystem STRING, vitalSignsSectionorganizerobservations_02idsextension STRING, vitalSignsSectionorganizerobservations_03idsextension STRING, vitalSignsSectionorganizerobservations_01idsextension STRING, problemSectiontitle STRING, vitalSignsSectionorganizerobservations_01codecodeSystemName STRING, problemSectionact_03observationvaluestranslationsoriginalTextreferencevalue STRING, vitalSignsSectionorganizerobservations_04valuesunit STRING, problemSectionact_02idextension STRING, vitalSignsSectionorganizerobservations_05statusCodecode STRING, 
vitalSignsSectionorganizerobservations_04effectiveTimevalue STRING, problemSectionact_02observationvaluestranslationscodeSystemName STRING, vitalSignsSectionorganizerobservations_03valuesvalue STRING, vitalSignsSectionorganizereffectiveTimevalue STRING, problemSectionact_03observationvaluestranslationscode STRING, vitalSignsSectionorganizerobservations_03codedisplayName STRING, vitalSignsSectionorganizerobservations_02texttext_01value STRING, vitalSignsSectionorganizerobservations_05texttext_01value STRING, absolutepath STRING, vitalSignsSectioncodedisplayName STRING, problemSectionact_03idextension STRING, problemSectionact_01observationvaluestranslationsoriginalTextreferencevalue STRING, problemSectionact_02observationvaluestranslationsoriginalTextreferencevalue STRING, filelastModifiedTime STRING, problemSectioncodecode STRING, vitalSignsSectionorganizeridsroot STRING, problemSectionact_02observationproblemStatuscodecodeSystem STRING, vitalSignsSectionorganizerobservations_05codecodeSystem STRING, filegroup STRING, problemSectionact_01observationproblemStatusvaluescode STRING, problemSectionact_01observationvaluestranslationsdisplayName STRING, problemSectionact_02codecode STRING, idextension STRING, vitalSignsSectioncodecode STRING, problemSectionact_03observationproblemStatuscodecodeSystemName STRING, problemSectionact_01idroot STRING, vitalSignsSectiontitle STRING, problemSectionact_01observationproblemStatuscodecodeSystemName STRING, vitalSignsSectionorganizerobservations_03valuesunit STRING, vitalSignsSectionorganizerobservations_01textreferencevalue STRING, effectiveTime STRING, vitalSignsSectionorganizerobservations_03codecodeSystemName STRING, problemSectionact_03observationstatusCodecode STRING, problemSectionact_02statusCodecode STRING, problemSectionact_02observationidextension STRING, problemSectionact_01idextension STRING, vitalSignsSectionorganizerstatusCodecode STRING, vitalSignsSectionorganizerobservations_05codedisplayName STRING, vitalSignsSectionorganizerobservations_04codecodeSystem STRING, vitalSignsSectionorganizerobservations_02codedisplayName STRING, problemSectionact_01observationvaluestranslationscodeSystemName STRING, vitalSignsSectionorganizerobservations_05codecode STRING, vitalSignsSectionorganizerobservations_04codecodeSystemName STRING, problemSectionact_02observationvaluestranslationsdisplayName STRING, idroot STRING, vitalSignsSectionorganizerobservations_02textreferencevalue STRING, problemSectionact_01observationidextension STRING, problemSectionact_01observationvaluestranslationscodeSystem STRING, problemSectionact_01codecode STRING, problemSectionact_02observationproblemStatusvaluesdisplayName STRING, problemSectionact_01codecodeSystem STRING, codecodeSystemName STRING, vitalSignsSectionorganizerobservations_01effectiveTimevalue STRING, vitalSignsSectionorganizercodedisplayName STRING, vitalSignsSectionorganizerobservations_02codecodeSystemName STRING, vitalSignsSectionorganizerobservations_03textreferencevalue STRING, vitalSignsSectionorganizerobservations_02valuesunit STRING, problemSectionact_03observationproblemStatusvaluesdisplayName STRING, problemSectionact_02observationproblemStatuscodecode STRING, vitalSignsSectionorganizerobservations_03statusCodecode STRING, problemSectionact_03observationproblemStatusvaluescode STRING, problemSectionact_02observationeffectiveTimelow STRING, problemSectionact_03observationvaluestranslationscodeSystemName STRING, vitalSignsSectionorganizerobservations_03texttext_01value STRING, 
problemSectionact_01observationproblemStatuscodecodeSystem STRING, problemSectionact_03observationproblemStatusvaluescodeSystem STRING, vitalSignsSectionorganizercodecodeSystemName STRING, problemSectionact_03observationidextension STRING, vitalSignsSectionorganizerobservations_01codecode STRING, codecodeSystem STRING, problemSectionact_02effectiveTimelow STRING, problemSectioncodedisplayName STRING, problemSectionact_02observationproblemStatusvaluescode STRING, vitalSignsSectionorganizeridsextension STRING, problemSectionact_02observationstatusCodecode STRING, vitalSignsSectionorganizerobservations_02codecode STRING, title STRING, problemSectionact_03idroot STRING, problemSectionidextension STRING, problemSectionact_03observationproblemStatusstatusCodecode STRING, problemSectionact_03effectiveTimelow STRING, problemSectionact_02observationproblemStatusvaluescodeSystemName STRING, fileowner STRING, vitalSignsSectionorganizerobservations_01statusCodecode STRING, vitalSignsSectionorganizerobservations_05textreferencevalue STRING, filepermissions STRING, vitalSignsSectionorganizerobservations_02codecodeSystem STRING, vitalSignsSectionorganizerobservations_05valuesunit STRING, problemSectionact_01observationvaluestranslationscode STRING, problemSectionact_01statusCodecode STRING, vitalSignsSectionorganizerobservations_05codecodeSystemName STRING, problemSectionact_03codecodeSystem STRING, vitalSignsSectioncodecodeSystem STRING, problemSectionact_01observationproblemStatusvaluescodeSystemName STRING, vitalSignsSectioncodecodeSystemName STRING, problemSectionact_01observationproblemStatuscodecode STRING, problemSectionact_02observationidroot STRING, vitalSignsSectionorganizerobservations_01codecodeSystem STRING, problemSectionact_01observationproblemStatusvaluescodeSystem STRING, vitalSignsSectionorganizerobservations_03effectiveTimevalue STRING, vitalSignsSectionorganizerobservations_04codedisplayName STRING, problemSectionact_03observationeffectiveTimelow STRING) STORED AS ORC LOCATION '/ccda';
CREATE EXTERNAL TABLE IF NOT EXISTS hl7_oru_flat (OBX_1_UserDefinedAccessChecks STRING, OBR_1_OrderingProvider_FamilyName STRING, MSH_MessageControlID STRING, OBX_1_ObservationIdentifier_Text STRING, MSH_SendingApplication_NamespaceID STRING, OBR_1_UniversalServiceIdentifier_Text STRING, MSH_ReceivingApplication_NamespaceID STRING, MSH_ProcessingID_ProcessingID STRING, PID_SSNNumberPatient STRING, OBR_1_FillerOrderNumber_EntityIdentifier STRING, PID_PatientAccountNumber_ID STRING, PID_DateOfBirth STRING, PD1_PatientPrimaryCareProviderNameIDNo_IDNumber STRING, PID_Sex STRING, MSH_MessageType_MessageType STRING, OBX_1_ReferencesRange STRING, OBR_1_OrderingProvider_IDNumber STRING, PD1_PatientPrimaryCareProviderNameIDNo_FamilyName STRING, OBX_1_Units_NameOfCodingSystem STRING, OBX_1_Units_Identifier STRING, PID_PatientName_GivenName STRING, OBX_1_ObservationSubID STRING, PD1_PatientPrimaryCareProviderNameIDNo_GivenName STRING, OBR_1_PlacerOrderNumber_NamespaceID STRING, MSH_MessageType_TriggerEvent STRING, PD1_PatientPrimaryCareProviderNameIDNo_AssigningAuthority STRING, OBR_1_ResultStatus STRING, PID_PatientName_FamilyName STRING, MSH_EncodingCharacters STRING, MSH_VersionID STRING, OBR_1_UniversalServiceIdentifier_Identifier STRING, OBR_1_ObservationDateTime STRING, OBR_1_ScheduledDateTime STRING, OBX_1_ObservationIdentifier_Identifier STRING, OBR_1_OrderingProvider_GivenName STRING, OBR_1_SetIDObservationRequest STRING, OBR_1_ResultsRptStatusChngDateTime STRING, OBR_1_PlacerOrderNumber_EntityIdentifier STRING, OBX_1_NatureOfAbnormalTest STRING, OBX_1_SetIDOBX STRING, MSH_FieldSeparator STRING, PD1_PatientPrimaryCareProviderNameIDNo_MiddleInitialOrName STRING, OBX_1_Units_Text STRING, OBX_1_ValueType STRING, PID_PatientIDInternalID_ID STRING, OBX_1_ObservationValue STRING, OBR_1_OrderingProvider_MiddleInitialOrName STRING) STORED AS ORC LOCATION '/hl7/flat/oru';
Apache Hive Queries to Run in Apache Zeppelin or Anywhere via JDBC/ODBC

select * from patientdata;
select * from ccda;
select vitalsignssectionorganizerobservations_01valuesvalue, vitalsignssectionorganizerobservations_04valuesvalue,vitalsignssectionorganizerobservations_01texttext_01value
from ccda;
select * from hl7_oru_flat;
select * from hl7_oru_flat h join patientdata p on h.PID_SSNNumberPatient = p.PID_SSNNumberPatient
Zeppelin Notebook medicalzeppelinjs.txt
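The same queries can be run from any JDBC/ODBC client. As a rough illustration, a minimal PyHive sketch (PyHive is not part of this article; the host, port and selected columns below are placeholders/assumptions for your HiveServer2 instance):

# Query Hive from Python over HiveServer2 - a minimal sketch, assuming PyHive is installed
# and HiveServer2 is reachable at the placeholder host/port below.
from pyhive import hive

conn = hive.Connection(host="your-hiveserver2-host", port=10000, database="default")
cursor = conn.cursor()
cursor.execute("select PID_SSNNumberPatient, gender, drug_provider from patientdata limit 10")
for row in cursor.fetchall():
    print(row)
cursor.close()
conn.close()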
12-04-2017
04:12 PM
2 Kudos
Overview

In this section of HL7 processing, we look at what to do next with HL7 data now that it has been converted into Big Data-friendly Avro with a schema attached. We can easily combine that data with our other patient data that came from MQTT. We do a simple JOIN on SSN and send this combined data to Kafka with a new schema and to Druid for analytics. We also route some of the data to HDFS and HBase for storage, and another feed to Druid for analytics.

S.A.M. Flow Overview

I have a simple Hortonworks Streaming Analytics Manager application that ingests two Kafka feeds (one for PatientData and one for HL7 converted to Avro by Apache NiFi). I join those two records together on a key and push them to Kafka and to Druid.

Our Environment For Running SAM: Just Click What You Need

I added Druid, Apache HBase, HDFS, Apache Hive, Apache Kafka, Apache Storm and Apache Zookeeper. I am using all except Apache Hive.

We can merge together the Kafka feeds sent from Apache NiFi. One feed is patient data that started as JSON sent via MQTT from a remote node. The other feed is from another system that sent it via Kafka to Apache NiFi to convert from HL7 to Avro. Above, in Hortonworks Schema Registry, we can see our schemas beautifully displayed with version information.

To set up a Kafka source, you just follow the dropdowns and name it. No manual effort. This is the other feed. As you can see, it grabs the schema associated with the Kafka topic and displays all the fields and types. Here is the join: we pick the field to join on, the type of join (LEFT), a type of windowing, and the fields we want. Again, no manual effort or typing. Druid is just as easy to pick up: put in the name for your new data source and SAM will create it. Here is how we do an aggregate on one of the numeric values. We will send the SSN number and the observation value max to the existing HBase patient observations table with the column family obs.

HBase DDL: create 'patient_observations', 'obs'

The results of this flow are that we have data added to HBase and to HDFS. We also have two Druid data sources populated for Superset analysis.

hbase shell
HBase Shell; enter 'help<RETURN>' for list of supported commands.
Type "exit<RETURN>" to leave the HBase Shell
Version 1.1.2.2.6.2.0-205, r5210d2ed88d7e241646beab51e9ac147a973bdcc, Sat Aug 26 09:33:50 UTC 2017
hbase(main):001:0> list
TABLE
ATLAS_ENTITY_AUDIT_EVENTS
atlas_titan
patient_observations
3 row(s) in 0.3220 seconds
=> ["ATLAS_ENTITY_AUDIT_EVENTS", "atlas_titan", "patient_observations"]
hbase(main):002:0> scan 'patient_observations'
ROW COLUMN+CELL
2ba61288-3c45-4eb4-a0fa-e92e44ef210f column=obs:OBX_1_ObservationValue_MAX, timestamp=1512503796419, value=mg/dL
2ba61288-3c45-4eb4-a0fa-e92e44ef210f column=obs:PID_SSNNumberPatient, timestamp=1512503796419, value=123456789
1 row(s) in 0.2110 seconds
hbase(main):003:0>
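If you would rather read those rows from Python than from the HBase shell, a minimal sketch with the happybase library (assumed installed; it requires the HBase Thrift server, and the host below is a placeholder):

# Scan the patient_observations table from Python - a minimal sketch using happybase
# (assumed installed; requires the HBase Thrift server, host below is a placeholder).
import happybase

connection = happybase.Connection("your-hbase-thrift-host")
table = connection.table("patient_observations")

# Print every row key and the obs: column family values written by the SAM flow.
for key, data in table.scan():
    print(key, data)

connection.close()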
hdfs dfs -cat /hl7sam/7-HDFS-8-999-1512490296249.txt
XXXXXX,000000000000000000,999999999999,ORU,M,99,LAST,JOHN,65-99,R,000000001,NM,65,|,1,341856649,1,John,20150101000100,mg/dL,F,20150101000100,20150101000100,2.3,^~\&,M,NPI,R01,HNAM_ORDERID,FIRST,1620,M,SMITH,159,1234567890,19700101,P,HealthOrg01,Basic Metabolic Panel,Glucose Lvl,GLU,H,123456789,Q1111111111111111111,20150101000100,648088,Johnson
See: https://community.hortonworks.com/articles/149910/handling-hl7-records-part-1-hl7-ingest.html

SAM Flow: sam-hl7json.txt

References:
http://www.openhealthtools.org/
https://community.hortonworks.com/articles/122077/ingesting-csv-data-and-pushing-it-as-avro-to-kafka.html
12-02-2017
08:07 PM
2 Kudos
Step 1: Collect HL7 Health Records
Python to Send JSON Data to MQTT (Data Generated by https://www.mockaroo.com/)
import paho.mqtt.client as mqtt
import json
# MQTT
client = mqtt.Client()
client.connect("localhost", 14162, 60)
row = [{"PID_SSNNumberPatient":823456789,"email":"ahospital0@census.gov","gender":"Male","ip_address":"138.135.180.206","drug_provider":"OrchidPharma Inc","icd9":"94140","icd9_description":"Deep necrosis of underlying tissues [deep third degree] without mention of loss of a body part, face and head, unspecified site","icd9P_proc":"7942","icd9_proc_description":"Closed reduction of separated epiphysis, radius and ulna","user_agent":"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_5_8) AppleWebKit/535.1 (KHTML, like Gecko) Chrome/14.0.801.0 Safari/535.1","drug_used":"Naratriptan"}]
json_string = json.dumps(row)
client.publish("patientdata",payload=json_string,qos=1,retain=False)
client.disconnect()
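Before wiring the topic into Apache NiFi, you can verify the publish with a quick subscriber using the same paho-mqtt library (a minimal sketch; the broker host, port and topic are assumed to match the publisher above):

# Quick check that patientdata messages arrive - a minimal sketch with paho-mqtt,
# assuming the same broker host/port and topic as the publisher above.
import paho.mqtt.client as mqtt

def on_connect(client, userdata, flags, rc):
    client.subscribe("patientdata", qos=1)

def on_message(client, userdata, msg):
    print(msg.topic, msg.payload)

client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
client.connect("localhost", 14162, 60)
client.loop_forever()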
Step 2: Get the data to Apache NiFi via FTP, SFTP, file, Apache Kafka, MQTT, REST API, TCP/IP or hundreds of other options.
Schemas
{
"type": "record",
"name": "hl7oru",
"fields": [
{
"name": "OBX_1_UserDefinedAccessChecks",
"type": "string",
"doc": "Type inferred from '\"20150101000100\"'"
},
{
"name": "OBR_1_OrderingProvider_FamilyName",
"type": "string",
"doc": "Type inferred from '\"Johnson\"'"
},
{
"name": "MSH_MessageControlID",
"type": "string",
"doc": "Type inferred from '\"Q1111111111111111111\"'"
},
{
"name": "OBX_1_ObservationIdentifier_Text",
"type": "string",
"doc": "Type inferred from '\"Glucose Lvl\"'"
},
{
"name": "MSH_SendingApplication_NamespaceID",
"type": "string",
"doc": "Type inferred from '\"XXXXXX\"'"
},
{
"name": "OBR_1_UniversalServiceIdentifier_Text",
"type": "string",
"doc": "Type inferred from '\"Basic Metabolic Panel\"'"
},
{
"name": "MSH_ReceivingApplication_NamespaceID",
"type": "string",
"doc": "Type inferred from '\"HealthOrg01\"'"
},
{
"name": "MSH_ProcessingID_ProcessingID",
"type": "string",
"doc": "Type inferred from '\"P\"'"
},
{
"name": "PID_SSNNumberPatient",
"type": "string",
"doc": "Type inferred from '\"123456789\"'"
},
{
"name": "OBR_1_FillerOrderNumber_EntityIdentifier",
"type": "string",
"doc": "Type inferred from '\"000000000000000000\"'"
},
{
"name": "PID_PatientAccountNumber_ID",
"type": "string",
"doc": "Type inferred from '\"999999999999\"'"
},
{
"name": "PID_DateOfBirth",
"type": "string",
"doc": "Type inferred from '\"19700101\"'"
},
{
"name": "PD1_PatientPrimaryCareProviderNameIDNo_IDNumber",
"type": "string",
"doc": "Type inferred from '\"1234567890\"'"
},
{
"name": "PID_Sex",
"type": "string",
"doc": "Type inferred from '\"M\"'"
},
{
"name": "MSH_MessageType_MessageType",
"type": "string",
"doc": "Type inferred from '\"ORU\"'"
},
{
"name": "OBX_1_ReferencesRange",
"type": "string",
"doc": "Type inferred from '\"H\"'"
},
{
"name": "OBR_1_OrderingProvider_IDNumber",
"type": "string",
"doc": "Type inferred from '\"1620\"'"
},
{
"name": "PD1_PatientPrimaryCareProviderNameIDNo_FamilyName",
"type": "string",
"doc": "Type inferred from '\"LAST\"'"
},
{
"name": "OBX_1_Units_NameOfCodingSystem",
"type": "string",
"doc": "Type inferred from '\"99\"'"
},
{
"name": "OBX_1_Units_Identifier",
"type": "string",
"doc": "Type inferred from '\"65-99\"'"
},
{
"name": "PID_PatientName_GivenName",
"type": "string",
"doc": "Type inferred from '\"JOHN\"'"
},
{
"name": "OBX_1_ObservationSubID",
"type": "string",
"doc": "Type inferred from '\"159\"'"
},
{
"name": "PD1_PatientPrimaryCareProviderNameIDNo_GivenName",
"type": "string",
"doc": "Type inferred from '\"FIRST\"'"
},
{
"name": "OBR_1_PlacerOrderNumber_NamespaceID",
"type": "string",
"doc": "Type inferred from '\"HNAM_ORDERID\"'"
},
{
"name": "MSH_MessageType_TriggerEvent",
"type": "string",
"doc": "Type inferred from '\"R01\"'"
},
{
"name": "PD1_PatientPrimaryCareProviderNameIDNo_AssigningAuthority",
"type": "string",
"doc": "Type inferred from '\"NPI\"'"
},
{
"name": "OBR_1_ResultStatus",
"type": "string",
"doc": "Type inferred from '\"M\"'"
},
{
"name": "PID_PatientName_FamilyName",
"type": "string",
"doc": "Type inferred from '\"SMITH\"'"
},
{
"name": "MSH_EncodingCharacters",
"type": "string",
"doc": "Type inferred from '\"^~\\&\"'"
},
{
"name": "MSH_VersionID",
"type": "string",
"doc": "Type inferred from '\"2.3\"'"
},
{
"name": "OBR_1_UniversalServiceIdentifier_Identifier",
"type": "string",
"doc": "Type inferred from '\"648088\"'"
},
{
"name": "OBR_1_ObservationDateTime",
"type": "string",
"doc": "Type inferred from '\"20150101000100\"'"
},
{
"name": "OBR_1_ScheduledDateTime",
"type": "string",
"doc": "Type inferred from '\"20150101000100\"'"
},
{
"name": "OBX_1_ObservationIdentifier_Identifier",
"type": "string",
"doc": "Type inferred from '\"GLU\"'"
},
{
"name": "OBR_1_OrderingProvider_GivenName",
"type": "string",
"doc": "Type inferred from '\"John\"'"
},
{
"name": "OBR_1_SetIDObservationRequest",
"type": "string",
"doc": "Type inferred from '\"1\"'"
},
{
"name": "OBR_1_ResultsRptStatusChngDateTime",
"type": "string",
"doc": "Type inferred from '\"20150101000100\"'"
},
{
"name": "OBR_1_PlacerOrderNumber_EntityIdentifier",
"type": "string",
"doc": "Type inferred from '\"341856649\"'"
},
{
"name": "OBX_1_NatureOfAbnormalTest",
"type": "string",
"doc": "Type inferred from '\"F\"'"
},
{
"name": "OBX_1_SetIDOBX",
"type": "string",
"doc": "Type inferred from '\"1\"'"
},
{
"name": "MSH_FieldSeparator",
"type": "string",
"doc": "Type inferred from '\"|\"'"
},
{
"name": "PD1",
"type": {
"type": "record",
"name": "PD1",
"fields": [
{
"name": "PatientPrimaryCareProviderNameIDNo_MiddleInitialOrName",
"type": "string",
"doc": "Type inferred from '\"M\"'"
}
]
},
"doc": "Type inferred from '{\"PatientPrimaryCareProviderNameIDNo_MiddleInitialOrName\":\"M\"}'"
},
{
"name": "OBX_1_Units_Text",
"type": "string",
"doc": "Type inferred from '\"65\"'"
},
{
"name": "OBX_1_ValueType",
"type": "string",
"doc": "Type inferred from '\"NM\"'"
},
{
"name": "PID_PatientIDInternalID_ID",
"type": "string",
"doc": "Type inferred from '\"000000001\"'"
},
{
"name": "OBX_1_ObservationValue",
"type": "string",
"doc": "Type inferred from '\"mg/dL\"'"
},
{
"name": "OBR_1_OrderingProvider_MiddleInitialOrName",
"type": "string",
"doc": "Type inferred from '\"R\"'"
}
]
}
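Before registering this schema, it can be useful to check that a flattened record actually conforms to it. Here is a minimal sketch using the fastavro library (my own check, not part of the NiFi flow; the hl7oru.avsc filename is an assumption about where you saved the schema above):
import json
from fastavro import parse_schema
from fastavro.validation import validate

# Load the hl7oru schema shown above (assumed saved locally as hl7oru.avsc)
with open("hl7oru.avsc") as f:
    schema = parse_schema(json.load(f))

# An abbreviated flattened record; every field in the schema is required,
# so this prints False until a complete record is supplied
record = {"OBX_1_ObservationIdentifier_Text": "Glucose Lvl",
          "PID_SSNNumberPatient": "123456789"}

print(validate(record, schema, raise_errors=False))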
patientbothjson.txt
patientdatajs.txt
simple.txt
Jolt Scripts
The first shift mapping renames the dotted HL7 attribute names (for example, OBX_1.ObservationValue) to Avro-safe underscore names; the second is an identity mapping that keeps the dotted names unchanged.
{
"OBX_1.UserDefinedAccessChecks": "OBX_1_UserDefinedAccessChecks",
"OBR_1.OrderingProvider.FamilyName": "OBR_1_OrderingProvider_FamilyName",
"MSH.MessageControlID": "MSH_MessageControlID",
"OBX_1.ObservationIdentifier.Text": "OBX_1_ObservationIdentifier_Text",
"MSH.SendingApplication.NamespaceID": "MSH_SendingApplication_NamespaceID",
"OBR_1.UniversalServiceIdentifier.Text": "OBR_1_UniversalServiceIdentifier_Text",
"MSH.ReceivingApplication.NamespaceID": "MSH_ReceivingApplication_NamespaceID",
"MSH.ProcessingID.ProcessingID": "MSH_ProcessingID_ProcessingID",
"PID.SSNNumberPatient": "PID_SSNNumberPatient",
"OBR_1.FillerOrderNumber.EntityIdentifier": "OBR_1_FillerOrderNumber_EntityIdentifier",
"PID.PatientAccountNumber.ID": "PID_PatientAccountNumber_ID",
"PID.DateOfBirth": "PID_DateOfBirth",
"PD1.PatientPrimaryCareProviderNameIDNo.IDNumber": "PD1_PatientPrimaryCareProviderNameIDNo_IDNumber",
"PID.Sex": "PID_Sex",
"MSH.MessageType.MessageType": "MSH_MessageType_MessageType",
"OBX_1.ReferencesRange": "OBX_1_ReferencesRange",
"OBR_1.OrderingProvider.IDNumber": "OBR_1_OrderingProvider_IDNumber",
"PD1.PatientPrimaryCareProviderNameIDNo.FamilyName": "PD1_PatientPrimaryCareProviderNameIDNo_FamilyName",
"OBX_1.Units.NameOfCodingSystem": "OBX_1_Units_NameOfCodingSystem",
"OBX_1.Units.Identifier": "OBX_1_Units_Identifier",
"PID.PatientName.GivenName": "PID_PatientName_GivenName",
"OBX_1.ObservationSubID": "OBX_1_ObservationSubID",
"PD1.PatientPrimaryCareProviderNameIDNo.GivenName": "PD1_PatientPrimaryCareProviderNameIDNo_GivenName",
"OBR_1.PlacerOrderNumber.NamespaceID": "OBR_1_PlacerOrderNumber_NamespaceID",
"MSH.MessageType.TriggerEvent": "MSH_MessageType_TriggerEvent",
"PD1.PatientPrimaryCareProviderNameIDNo.AssigningAuthority": "PD1_PatientPrimaryCareProviderNameIDNo_AssigningAuthority",
"OBR_1.ResultStatus": "OBR_1_ResultStatus",
"PID.PatientName.FamilyName": "PID_PatientName_FamilyName",
"MSH.EncodingCharacters": "MSH_EncodingCharacters",
"MSH.VersionID": "MSH_VersionID",
"OBR_1.UniversalServiceIdentifier.Identifier": "OBR_1_UniversalServiceIdentifier_Identifier",
"OBR_1.ObservationDateTime": "OBR_1_ObservationDateTime",
"OBR_1.ScheduledDateTime": "OBR_1_ScheduledDateTime",
"OBX_1.ObservationIdentifier.Identifier": "OBX_1_ObservationIdentifier_Identifier",
"OBR_1.OrderingProvider.GivenName": "OBR_1_OrderingProvider_GivenName",
"OBR_1.SetIDObservationRequest": "OBR_1_SetIDObservationRequest",
"OBR_1.ResultsRptStatusChngDateTime": "OBR_1_ResultsRptStatusChngDateTime",
"OBR_1.PlacerOrderNumber.EntityIdentifier": "OBR_1_PlacerOrderNumber_EntityIdentifier",
"OBX_1.NatureOfAbnormalTest": "OBX_1_NatureOfAbnormalTest",
"OBX_1.SetIDOBX": "OBX_1_SetIDOBX",
"MSH.FieldSeparator": "MSH_FieldSeparator",
"PD1.PatientPrimaryCareProviderNameIDNo.MiddleInitialOrName": "PD1_PatientPrimaryCareProviderNameIDNo_MiddleInitialOrName",
"OBX_1.Units.Text": "OBX_1_Units_Text",
"OBX_1.ValueType": "OBX_1_ValueType",
"PID.PatientIDInternalID.ID": "PID_PatientIDInternalID_ID",
"OBX_1.ObservationValue": "OBX_1_ObservationValue",
"OBR_1.OrderingProvider.MiddleInitialOrName": "OBR_1_OrderingProvider_MiddleInitialOrName"
}
{
"OBX_1.UserDefinedAccessChecks": "OBX_1.UserDefinedAccessChecks",
"OBR_1.OrderingProvider.FamilyName": "OBR_1.OrderingProvider.FamilyName",
"MSH.MessageControlID": "MSH.MessageControlID",
"OBX_1.ObservationIdentifier.Text": "OBX_1.ObservationIdentifier.Text",
"MSH.SendingApplication.NamespaceID": "MSH.SendingApplication.NamespaceID",
"OBR_1.UniversalServiceIdentifier.Text": "OBR_1.UniversalServiceIdentifier.Text",
"MSH.ReceivingApplication.NamespaceID": "MSH.ReceivingApplication.NamespaceID",
"MSH.ProcessingID.ProcessingID": "MSH.ProcessingID.ProcessingID",
"PID.SSNNumberPatient": "PID.SSNNumberPatient",
"OBR_1.FillerOrderNumber.EntityIdentifier": "OBR_1.FillerOrderNumber.EntityIdentifier",
"PID.PatientAccountNumber.ID": "PID.PatientAccountNumber.ID",
"PID.DateOfBirth": "PID.DateOfBirth",
"PD1.PatientPrimaryCareProviderNameIDNo.IDNumber": "PD1.PatientPrimaryCareProviderNameIDNo.IDNumber",
"PID.Sex": "PID.Sex",
"MSH.MessageType.MessageType": "MSH.MessageType.MessageType",
"OBX_1.ReferencesRange": "OBX_1.ReferencesRange",
"OBR_1.OrderingProvider.IDNumber": "OBR_1.OrderingProvider.IDNumber",
"PD1.PatientPrimaryCareProviderNameIDNo.FamilyName": "PD1.PatientPrimaryCareProviderNameIDNo.FamilyName",
"OBX_1.Units.NameOfCodingSystem": "OBX_1.Units.NameOfCodingSystem",
"OBX_1.Units.Identifier": "OBX_1.Units.Identifier",
"PID.PatientName.GivenName": "PID.PatientName.GivenName",
"OBX_1.ObservationSubID": "OBX_1.ObservationSubID",
"PD1.PatientPrimaryCareProviderNameIDNo.GivenName": "PD1.PatientPrimaryCareProviderNameIDNo.GivenName",
"OBR_1.PlacerOrderNumber.NamespaceID": "OBR_1.PlacerOrderNumber.NamespaceID",
"MSH.MessageType.TriggerEvent": "MSH.MessageType.TriggerEvent",
"PD1.PatientPrimaryCareProviderNameIDNo.AssigningAuthority": "PD1.PatientPrimaryCareProviderNameIDNo.AssigningAuthority",
"OBR_1.ResultStatus": "OBR_1.ResultStatus",
"PID.PatientName.FamilyName": "PID.PatientName.FamilyName",
"MSH.EncodingCharacters": "MSH.EncodingCharacters",
"MSH.VersionID": "MSH.VersionID",
"OBR_1.UniversalServiceIdentifier.Identifier": "OBR_1.UniversalServiceIdentifier.Identifier",
"OBR_1.ObservationDateTime": "OBR_1.ObservationDateTime",
"OBR_1.ScheduledDateTime": "OBR_1.ScheduledDateTime",
"OBX_1.ObservationIdentifier.Identifier": "OBX_1.ObservationIdentifier.Identifier",
"OBR_1.OrderingProvider.GivenName": "OBR_1.OrderingProvider.GivenName",
"OBR_1.SetIDObservationRequest": "OBR_1.SetIDObservationRequest",
"OBR_1.ResultsRptStatusChngDateTime": "OBR_1.ResultsRptStatusChngDateTime",
"OBR_1.PlacerOrderNumber.EntityIdentifier": "OBR_1.PlacerOrderNumber.EntityIdentifier",
"OBX_1.NatureOfAbnormalTest": "OBX_1.NatureOfAbnormalTest",
"OBX_1.SetIDOBX": "OBX_1.SetIDOBX",
"MSH.FieldSeparator": "MSH.FieldSeparator",
"PD1.PatientPrimaryCareProviderNameIDNo.MiddleInitialOrName": "PD1.PatientPrimaryCareProviderNameIDNo.MiddleInitialOrName",
"OBX_1.Units.Text": "OBX_1.Units.Text",
"OBX_1.ValueType": "OBX_1.ValueType",
"PID.PatientIDInternalID.ID": "PID.PatientIDInternalID.ID",
"OBX_1.ObservationValue": "OBX_1.ObservationValue",
"OBR_1.OrderingProvider.MiddleInitialOrName": "OBR_1.OrderingProvider.MiddleInitialOrName"
}
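In plain Python, the first shift mapping amounts to a dictionary key rename from the dotted HL7 attribute names to the underscore names, which is a handy way to sanity-check it outside of NiFi (a sketch only; the hl7_shift_spec.json filename is an assumption, and JoltTransformJSON does not execute it this way internally):
import json

# The first mapping above is a flat old-name -> new-name dictionary
with open("hl7_shift_spec.json") as f:
    rename = json.load(f)

def flatten_names(record):
    # Rename dotted HL7 attribute names to Avro-safe underscore names
    return {rename.get(k, k): v for k, v in record.items()}

sample = {"OBX_1.ObservationValue": "mg/dL", "PID.SSNNumberPatient": "123456789"}
print(flatten_names(sample))
# {'OBX_1_ObservationValue': 'mg/dL', 'PID_SSNNumberPatient': '123456789'}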
Step 3: Profit
Build a Big Data Environment For Our Data
(Build HDFS Directories)
su hdfs
hdfs dfs -mkdir -p /hl7/hl7-mdm
hdfs dfs -mkdir -p /hl7/hl7-adt
hdfs dfs -mkdir -p /hl7/hl7-orm
hdfs dfs -mkdir -p /hl7/hl7-oru
hdfs dfs -mkdir -p /hl7/json/hl7-mdm
hdfs dfs -mkdir -p /hl7/json/hl7-adt
hdfs dfs -mkdir -p /hl7/json/hl7-orm
hdfs dfs -mkdir -p /hl7/json/hl7-oru
hdfs dfs -mkdir -p /hl7/flat/oru
hdfs dfs -mkdir -p /patientdata
hdfs dfs -chmod -R 777 /hl7
hdfs dfs -chmod -R 777 /patientdata
hdfs dfs -ls -R /hl7
hdfs dfs -ls -R /patientdata
(Build Hive DDL)
CREATE EXTERNAL TABLE IF NOT EXISTS patientdata (PID_SSNNumberPatient INT, email STRING, gender STRING, ip_address STRING,
drug_provider STRING, icd9 STRING, icd9_description STRING, icd9P_proc STRING, icd9_proc_description STRING, user_agent STRING, drug_used STRING)
STORED AS ORC LOCATION '/patientdata'
CREATE EXTERNAL TABLE IF NOT EXISTS hl7oru
(OBX_1 STRUCT<UserDefinedAccessChecks:STRING, ObservationIdentifier:STRUCT<Text:STRING,
Identifier:STRING>,
ReferencesRange:STRING,
Units:STRUCT<NameOfCodingSystem:STRING,
Identifier:STRING,
Text:STRING>,
ObservationSubID:STRING,
NatureOfAbnormalTest:STRING,
SetIDOBX:STRING,
ValueType:STRING,
ObservationValue:STRING>,
OBR_1 STRUCT<OrderingProvider:STRUCT<FamilyName:STRING,
IDNumber:STRING,
GivenName:STRING,
MiddleInitialOrName:STRING>,
UniversalServiceIdentifier:STRUCT<Text:STRING,
Identifier:STRING>,
FillerOrderNumber:STRUCT<EntityIdentifier:STRING>,
PlacerOrderNumber:STRUCT<NamespaceID:STRING,
EntityIdentifier:STRING>,
ResultStatus:STRING,
ObservationDateTime:STRING,
ScheduledDateTime:STRING,
SetIDObservationRequest:STRING,
ResultsRptStatusChngDateTime:STRING>,
MSH STRUCT<MessageControlID:STRING,
SendingApplication:STRUCT<NamespaceID:STRING>,
ReceivingApplication:STRUCT<NamespaceID:STRING>,
ProcessingID:STRUCT<ProcessingID:STRING>,
MessageType:STRUCT<MessageType:STRING,
TriggerEvent:STRING>,
EncodingCharacters:STRING,
VersionID:STRING,
FieldSeparator:STRING>,
PID STRUCT<SSNNumberPatient:STRING,
PatientAccountNumber:STRUCT<ID:STRING>,
DateOfBirth:STRING,
Sex:STRING,
PatientName:STRUCT<GivenName:STRING,
FamilyName:STRING>,
PatientIDInternalID:STRUCT<ID:STRING>>,
PD1 STRUCT<PatientPrimaryCareProviderNameIDNo:STRUCT<IDNumber:STRING,
FamilyName:STRING,
GivenName:STRING,
AssigningAuthority:STRING,
MiddleInitialOrName:STRING>>)
STORED AS ORC
LOCATION '/hl7/hl7-oru'
CREATE EXTERNAL TABLE IF NOT EXISTS hl7_oru_flat
(OBX_1_UserDefinedAccessChecks STRING,
OBR_1_OrderingProvider_FamilyName STRING,
MSH_MessageControlID STRING,
OBX_1_ObservationIdentifier_Text STRING,
MSH_SendingApplication_NamespaceID STRING,
OBR_1_UniversalServiceIdentifier_Text STRING,
MSH_ReceivingApplication_NamespaceID STRING,
MSH_ProcessingID_ProcessingID STRING,
PID_SSNNumberPatient STRING,
OBR_1_FillerOrderNumber_EntityIdentifier STRING,
PID_PatientAccountNumber_ID STRING,
PID_DateOfBirth STRING,
PD1_PatientPrimaryCareProviderNameIDNo_IDNumber STRING,
PID_Sex STRING,
MSH_MessageType_MessageType STRING,
OBX_1_ReferencesRange STRING,
OBR_1_OrderingProvider_IDNumber STRING,
PD1_PatientPrimaryCareProviderNameIDNo_FamilyName STRING,
OBX_1_Units_NameOfCodingSystem STRING,
OBX_1_Units_Identifier STRING,
PID_PatientName_GivenName STRING,
OBX_1_ObservationSubID STRING,
PD1_PatientPrimaryCareProviderNameIDNo_GivenName STRING,
OBR_1_PlacerOrderNumber_NamespaceID STRING,
MSH_MessageType_TriggerEvent STRING,
PD1_PatientPrimaryCareProviderNameIDNo_AssigningAuthority STRING,
OBR_1_ResultStatus STRING,
PID_PatientName_FamilyName STRING,
MSH_EncodingCharacters STRING,
MSH_VersionID STRING,
OBR_1_UniversalServiceIdentifier_Identifier STRING,
OBR_1_ObservationDateTime STRING,
OBR_1_ScheduledDateTime STRING,
OBX_1_ObservationIdentifier_Identifier STRING,
OBR_1_OrderingProvider_GivenName STRING,
OBR_1_SetIDObservationRequest STRING,
OBR_1_ResultsRptStatusChngDateTime STRING,
OBR_1_PlacerOrderNumber_EntityIdentifier STRING,
OBX_1_NatureOfAbnormalTest STRING,
OBX_1_SetIDOBX STRING,
MSH_FieldSeparator STRING,
PD1_PatientPrimaryCareProviderNameIDNo_MiddleInitialOrName STRING,
OBX_1_Units_Text STRING,
OBX_1_ValueType STRING,
PID_PatientIDInternalID_ID STRING,
OBX_1_ObservationValue STRING,
OBR_1_OrderingProvider_MiddleInitialOrName STRING)
STORED AS ORC
LOCATION '/hl7/flat/oru'
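With both external tables in place, the SSN join described earlier can also be checked directly in Hive. A small PyHive sketch follows (the HiveServer2 host, port, and username are assumptions about your cluster):
from pyhive import hive

# Connect to HiveServer2 (adjust host, port and user for your cluster)
conn = hive.connect(host="localhost", port=10000, username="hive")
cursor = conn.cursor()

# Join the MQTT patient data with the flattened HL7 observations on SSN
cursor.execute("""
    SELECT p.PID_SSNNumberPatient, p.drug_used,
           o.OBX_1_ObservationIdentifier_Text, o.OBX_1_ObservationValue
    FROM patientdata p
    JOIN hl7_oru_flat o
      ON CAST(p.PID_SSNNumberPatient AS STRING) = o.PID_SSNNumberPatient
""")
for row in cursor.fetchall():
    print(row)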
(Build Kafka Topics)
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic patientboth
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic hl7-mdm
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic hl7-adt
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic hl7-orm
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic hl7-oru
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic simple
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic hl7-mdm_avro
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic hl7-adt_avro
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic hl7-orm_avro
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic hl7-oru_avro
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic patientdata
Script to send a File to Kafka
sendmessage.sh
/usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list localhost:6667 --topic hl7-oru < hl7sampledata.txt
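After running sendmessage.sh, a quick way to confirm the record landed on the topic is a small kafka-python consumer (my own check, pointed at the same broker address used above):
from kafka import KafkaConsumer

# Read everything currently on the hl7-oru topic, then give up after 10 seconds
consumer = KafkaConsumer(
    "hl7-oru",
    bootstrap_servers="localhost:6667",  # same broker as kafka-console-producer.sh above
    auto_offset_reset="earliest",
    consumer_timeout_ms=10000,
)
for message in consumer:
    print(message.offset, message.value.decode("utf-8", errors="replace"))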
HBase DDL
hbase shell
create 'patient_observations', 'obs'
list
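The patient_observations table can also be read from Python with happybase for quick spot checks (this assumes the HBase Thrift server is running on its default port, which this article does not cover):
import happybase

# Connect through the HBase Thrift server and scan the whole table
connection = happybase.Connection("localhost")
table = connection.table("patient_observations")

for row_key, columns in table.scan():
    print(row_key, columns)  # e.g. b'2ba61288-...' {b'obs:PID_SSNNumberPatient': b'123456789', ...}

connection.close()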
Running a Mosquitto MQTT Broker (OSX)
/usr/local/Cellar/mosquitto/1.4.14_2/sbin/mosquitto --daemon --verbose --port 14162
Removing Unneeded HDFS Files
hdfs dfs -rm -r -f -skipTrash $1
Example Data from Internet
MSH|^~\&|XXXXXX||HealthOrg01||||ORU^R01|Q1111111111111111111|P|2.3|<cr>PID|||000000001||SMITH^JOHN||19700101|M||||||||||999999999999|123456789|<cr>PD1||||1234567890^LAST^FIRST^M^^^^^NPI|<cr>OBR|1|341856649^HNAM_ORDERID|000000000000000000|648088^Basic Metabolic Panel|||20150101000100|||||||||1620^Johnson^John^R||||||20150101000100|||M|||||||||||20150101000100|<cr>OBX|1|NM|GLU^Glucose Lvl|159|mg/dL|65-99^65^99|H|||F|||20150101000100|
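For a quick local look at that sample outside of NiFi, the python-hl7 package can parse it once the <cr> placeholders are turned back into carriage returns (an inspection aid only; the flow itself uses NiFi's HL7 processors, and the message here is abbreviated to three segments):
import hl7

raw = ("MSH|^~\\&|XXXXXX||HealthOrg01||||ORU^R01|Q1111111111111111111|P|2.3|<cr>"
       "PID|||000000001||SMITH^JOHN||19700101|M||||||||||999999999999|123456789|<cr>"
       "OBX|1|NM|GLU^Glucose Lvl|159|mg/dL|65-99^65^99|H|||F|||20150101000100|")

# HL7 segments are separated by carriage returns; the sample uses a <cr> placeholder
message = hl7.parse(raw.replace("<cr>", "\r"))

print(str(message.segment("PID")[5]))  # patient name field: SMITH^JOHN
print(str(message.segment("OBX")[3]))  # observation identifier: GLU^Glucose Lvl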
See: Handling HL7 Records and Storing in Apache Hive for SQL Queries Part 2 - C-CDA Data and Custom Avro Processor
Code Section
// Clean up FlowFile attribute names so they become legal Apache Avro field names
Map<String, String> attributes = flowFile.getAttributes();
Map<String, String> attributesClean = new HashMap<>();
String tempKey = "";
for (Map.Entry<String, String> entry : attributes.entrySet()) {
    // Remove the first non-letter character, then strip anything that is not a letter, digit or underscore
    tempKey = entry.getKey().replaceFirst("[^A-Za-z]", "");
    tempKey = tempKey.replaceAll("[^A-Za-z0-9_]", "");
    attributesClean.put(tempKey, entry.getValue());
    // Attribute changes return a new FlowFile reference, so keep the most recent one
    flowFile = session.removeAttribute(flowFile, entry.getKey());
}
flowFile = session.putAllAttributes(flowFile, attributesClean);
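The same cleanup rule, sketched in Python for readers who want to try out names outside of the processor (not the processor's actual code):
import re

def clean_attribute_name(name):
    # Remove the first non-letter character (mirrors Java's replaceFirst),
    # then drop anything that is not a letter, digit, or underscore
    name = re.sub(r"[^A-Za-z]", "", name, count=1)
    return re.sub(r"[^A-Za-z0-9_]", "", name)

print(clean_attribute_name("problem.section.act_01.observation.id.root"))
# problemsectionact_01observationidroot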
Apache NiFi FlowFile
hl7new.xml
Next Sections:
https://community.hortonworks.com/articles/149891/handling-hl7-records-and-storing-in-apache-hive-fo.html
https://community.hortonworks.com/content/kbentry/150026/hl7-processing-part-3-apache-zeppelin-sql-bi-and-a.html
https://community.hortonworks.com/articles/149982/hl7-ingest-part-4-streaming-analytics-manager-and.html
Important Starter Articles:
https://community.hortonworks.com/articles/138249/nifi-in-healthcare-ingesting-hl7-data-in-nifi.html
https://community.hortonworks.com/articles/20318/visualize-patients-complaints-to-their-doctors-usi.html
https://community.hortonworks.com/articles/149891/handling-hl7-records-and-storing-in-apache-hive-fo.html
References:
https://hortonworks.com/tutorial/real-time-event-processing-in-nifi-sam-schema-registry-and-superset/
https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-hl7-nar/1.4.0/org.apache.nifi.processors.hl7.RouteHL7/index.html
https://community.hortonworks.com/articles/20318/visualize-patients-complaints-to-their-doctors-usi.html
https://github.com/cqframework/clinical_quality_language
https://github.com/apache/nifi/blob/master/nifi-commons/nifi-hl7-query-language/src/test/java/org/apache/nifi/hl7/query/TestHL7Query.java
https://stackoverflow.com/questions/43251660/apache-nifi-routehl7-issue/43263583
https://gist.github.com/alopresto/9b46011185efd0380e6c5da0a3412e8f
12-01-2017
09:33 PM
2 Kudos
We need to ingest C-CDA files, which often contain awkward element names. To compound the naming issues, the Extract C-CDA Attributes processor flattens the XML into attribute names containing periods, which are not allowed in Apache Avro names.
Apache NiFi DataFlow
Ingest C-CDA XML Files
Extract C-CDA Attributes
Route on Attribute
AttributeCleaner - my new custom one
AttributeToJSON
SetSchema
QueryRecord
ConvertAvroToORC
PutHDFS
AttributeCleanerProcessor is my new processor to rename all of the attributes; this is a very simple version that produces the converted field names shown above, along with the JSON fields and their values. We route when the code field is not null; if the code is active, we convert to Apache ORC for Hive.
SQL Table
CREATE EXTERNAL TABLE IF NOT EXISTS ccda (problemSectionact_02observationproblemStatuscodecodeSystemName STRING, vitalSignsSectionorganizerobservations_05idsroot STRING, problemSectionact_02observationproblemStatusstatusCodecode STRING, vitalSignsSectionorganizerobservations_01texttext_01value STRING, vitalSignsSectionorganizerobservations_04texttext_01value STRING, vitalSignsSectionorganizercodecodeSystem STRING, vitalSignsSectionorganizerobservations_01valuesvalue STRING, vitalSignsSectionorganizerobservations_04valuesvalue STRING, problemSectionact_03observationidroot STRING, problemSectionact_02codecodeSystem STRING, vitalSignsSectionorganizerobservations_05effectiveTimevalue STRING, RouteOnAttributeRoute STRING, vitalSignsSectionorganizerobservations_03codecode STRING, vitalSignsSectionorganizerobservations_04statusCodecode STRING, problemSectionidroot STRING, codecode STRING, problemSectionact_01effectiveTimelow STRING, problemSectioncodecodeSystemName STRING, codedisplayName STRING, problemSectionact_01observationstatusCodecode STRING, vitalSignsSectionorganizerobservations_01idsroot STRING, vitalSignsSectionorganizerobservations_02idsroot STRING, vitalSignsSectionorganizerobservations_04idsroot STRING, vitalSignsSectionorganizerobservations_03idsroot STRING, problemSectionact_01observationeffectiveTimelow STRING, filecreationTime STRING, problemSectionact_01observationproblemStatusvaluesdisplayName STRING, problemSectionact_02observationvaluestranslationscode STRING, problemSectionact_03statusCodecode STRING, problemSectionact_03observationvaluestranslationscodeSystem STRING, problemSectionact_02idroot STRING, problemSectionact_03codecode STRING, problemSectionact_01observationidroot STRING, problemSectionact_02observationvaluestranslationscodeSystem STRING, problemSectionact_03observationproblemStatuscodecodeSystem STRING, problemSectionact_03observationproblemStatusvaluescodeSystemName STRING, vitalSignsSectionorganizercodecode STRING, vitalSignsSectionorganizerobservations_02statusCodecode STRING, problemSectionact_03observationvaluestranslationsdisplayName STRING, vitalSignsSectionorganizerobservations_03codecodeSystem STRING, problemSectionact_03observationproblemStatuscodecode STRING, problemSectionact_01observationproblemStatusstatusCodecode STRING, vitalSignsSectionorganizerobservations_04textreferencevalue STRING, filelastAccessTime STRING, vitalSignsSectionorganizerobservations_01codedisplayName STRING, filesize STRING, problemSectioncodecodeSystem STRING, vitalSignsSectionorganizerobservations_01valuesunit STRING, vitalSignsSectionorganizerobservations_02effectiveTimevalue STRING, vitalSignsSectionorganizerobservations_05idsextension STRING, vitalSignsSectionorganizerobservations_04codecode STRING, vitalSignsSectionorganizerobservations_05valuesvalue STRING, vitalSignsSectionorganizerobservations_04idsextension STRING, vitalSignsSectionorganizerobservations_02valuesvalue STRING, problemSectionact_02observationproblemStatusvaluescodeSystem STRING, vitalSignsSectionorganizerobservations_02idsextension STRING,
vitalSignsSectionorganizerobservations_03idsextension STRING, vitalSignsSectionorganizerobservations_01idsextension STRING, problemSectiontitle STRING, vitalSignsSectionorganizerobservations_01codecodeSystemName STRING, problemSectionact_03observationvaluestranslationsoriginalTextreferencevalue STRING, vitalSignsSectionorganizerobservations_04valuesunit STRING, problemSectionact_02idextension STRING, vitalSignsSectionorganizerobservations_05statusCodecode STRING, vitalSignsSectionorganizerobservations_04effectiveTimevalue STRING, problemSectionact_02observationvaluestranslationscodeSystemName STRING, vitalSignsSectionorganizerobservations_03valuesvalue STRING, vitalSignsSectionorganizereffectiveTimevalue STRING, problemSectionact_03observationvaluestranslationscode STRING, vitalSignsSectionorganizerobservations_03codedisplayName STRING, vitalSignsSectionorganizerobservations_02texttext_01value STRING, vitalSignsSectionorganizerobservations_05texttext_01value STRING, absolutepath STRING, vitalSignsSectioncodedisplayName STRING, problemSectionact_03idextension STRING, problemSectionact_01observationvaluestranslationsoriginalTextreferencevalue STRING, problemSectionact_02observationvaluestranslationsoriginalTextreferencevalue STRING, filelastModifiedTime STRING, problemSectioncodecode STRING, vitalSignsSectionorganizeridsroot STRING, problemSectionact_02observationproblemStatuscodecodeSystem STRING, vitalSignsSectionorganizerobservations_05codecodeSystem STRING, filegroup STRING, problemSectionact_01observationproblemStatusvaluescode STRING, problemSectionact_01observationvaluestranslationsdisplayName STRING, problemSectionact_02codecode STRING, idextension STRING, vitalSignsSectioncodecode STRING, problemSectionact_03observationproblemStatuscodecodeSystemName STRING, problemSectionact_01idroot STRING, vitalSignsSectiontitle STRING, problemSectionact_01observationproblemStatuscodecodeSystemName STRING, vitalSignsSectionorganizerobservations_03valuesunit STRING, vitalSignsSectionorganizerobservations_01textreferencevalue STRING, effectiveTime STRING, vitalSignsSectionorganizerobservations_03codecodeSystemName STRING, problemSectionact_03observationstatusCodecode STRING, problemSectionact_02statusCodecode STRING, problemSectionact_02observationidextension STRING, problemSectionact_01idextension STRING, vitalSignsSectionorganizerstatusCodecode STRING, vitalSignsSectionorganizerobservations_05codedisplayName STRING, vitalSignsSectionorganizerobservations_04codecodeSystem STRING, vitalSignsSectionorganizerobservations_02codedisplayName STRING, problemSectionact_01observationvaluestranslationscodeSystemName STRING, vitalSignsSectionorganizerobservations_05codecode STRING, vitalSignsSectionorganizerobservations_04codecodeSystemName STRING, problemSectionact_02observationvaluestranslationsdisplayName STRING, idroot STRING, vitalSignsSectionorganizerobservations_02textreferencevalue STRING, problemSectionact_01observationidextension STRING, problemSectionact_01observationvaluestranslationscodeSystem STRING, problemSectionact_01codecode STRING, problemSectionact_02observationproblemStatusvaluesdisplayName STRING, problemSectionact_01codecodeSystem STRING, codecodeSystemName STRING, vitalSignsSectionorganizerobservations_01effectiveTimevalue STRING, vitalSignsSectionorganizercodedisplayName STRING, vitalSignsSectionorganizerobservations_02codecodeSystemName STRING, vitalSignsSectionorganizerobservations_03textreferencevalue STRING, vitalSignsSectionorganizerobservations_02valuesunit STRING, 
problemSectionact_03observationproblemStatusvaluesdisplayName STRING, problemSectionact_02observationproblemStatuscodecode STRING, vitalSignsSectionorganizerobservations_03statusCodecode STRING, problemSectionact_03observationproblemStatusvaluescode STRING, problemSectionact_02observationeffectiveTimelow STRING, problemSectionact_03observationvaluestranslationscodeSystemName STRING, vitalSignsSectionorganizerobservations_03texttext_01value STRING, problemSectionact_01observationproblemStatuscodecodeSystem STRING, problemSectionact_03observationproblemStatusvaluescodeSystem STRING, vitalSignsSectionorganizercodecodeSystemName STRING, problemSectionact_03observationidextension STRING, vitalSignsSectionorganizerobservations_01codecode STRING, codecodeSystem STRING, problemSectionact_02effectiveTimelow STRING, problemSectioncodedisplayName STRING, problemSectionact_02observationproblemStatusvaluescode STRING, vitalSignsSectionorganizeridsextension STRING, problemSectionact_02observationstatusCodecode STRING, vitalSignsSectionorganizerobservations_02codecode STRING, title STRING, problemSectionact_03idroot STRING, problemSectionidextension STRING, problemSectionact_03observationproblemStatusstatusCodecode STRING, problemSectionact_03effectiveTimelow STRING, problemSectionact_02observationproblemStatusvaluescodeSystemName STRING, fileowner STRING, vitalSignsSectionorganizerobservations_01statusCodecode STRING, vitalSignsSectionorganizerobservations_05textreferencevalue STRING, filepermissions STRING, vitalSignsSectionorganizerobservations_02codecodeSystem STRING, vitalSignsSectionorganizerobservations_05valuesunit STRING, problemSectionact_01observationvaluestranslationscode STRING, problemSectionact_01statusCodecode STRING, vitalSignsSectionorganizerobservations_05codecodeSystemName STRING, problemSectionact_03codecodeSystem STRING, vitalSignsSectioncodecodeSystem STRING, problemSectionact_01observationproblemStatusvaluescodeSystemName STRING, vitalSignsSectioncodecodeSystemName STRING, problemSectionact_01observationproblemStatuscodecode STRING, problemSectionact_02observationidroot STRING, vitalSignsSectionorganizerobservations_01codecodeSystem STRING, problemSectionact_01observationproblemStatusvaluescodeSystem STRING, vitalSignsSectionorganizerobservations_03effectiveTimevalue STRING, vitalSignsSectionorganizerobservations_04codedisplayName STRING, problemSectionact_03observationeffectiveTimelow STRING) STORED AS ORC LOCATION '/ccda'
AttributeCleaner
Apache Avro names can't have spaces, dots, dashes or other odd symbols, so we remove them; the periods are created when the processor flattens out the XML.
Dirty Name: problem.section.act_01...
Clean Safe Name: problemSectionact_01observationvaluestranslationsoriginalTextreferencevalue
Source Code for Custom Processor
https://github.com/tspannhw/nifi-attributecleaner-processor
References
http://calcite.apache.org/docs/reference.html
Apache NiFi Flow
c-cda-ingest-with-custom-processor.xml