12-22-2017
08:49 PM
5 Kudos
Over the holidays, it's nice to know how much energy you are using. As one small step, I bought a low-end, inexpensive TP-Link energy-monitoring plug for one device. I have been monitoring phone charging and my Apple monitor. Let's read the data and run some queries in Apache Hive and Apache Spark 2 SQL.

Processing Live Energy Feeds in the Cloud

Monitor Energy From a Local OSX Machine

If your local instance does not have access to Apache Hive, you will need to send the data via Site-to-Site to a remote Apache NiFi / HDF server or cluster that does.

For Apache Hive usage, convert the data to Apache ORC files. To create your new table, grab the hive.ddl. Inside of Apache Zeppelin, we can create our table based on that DDL. We could also have let Apache NiFi create the table for us, but I like to keep my DDL with my notebook - just a personal choice. We can then query our table in Apache Zeppelin using Apache Spark 2 SQL and Apache Hive QL.

Overview

Step 1: Purchase an inexpensive energy-monitoring plug.
Step 2: Connect it to a phone app via WiFi.
Step 3: Once configured, you can access it via Python.
Step 4: Install the HS100 Python library for Python 3.x.
Step 5: Fork my GitHub repository and use my shell script and Python script.
Step 6: Add the local Apache NiFi flow which calls that script.
Step 7: Add a remote Apache NiFi flow for processing into Apache Hadoop.
Step 8: Create your table.
Step 9: Query with Apache Hive and Apache Spark SQL via Apache Zeppelin or another UI.
Step 10: Turn that extra stuff off and save money!

The Open Source Code and Artefacts

Shell Script (smartreader.sh)

python3 meterreader.py

Python Code (meterreader.py)

from pyHS100 import SmartPlug, SmartBulb
#from pprint import pformat as pf
import json
import datetime
plug = SmartPlug("192.168.1.200")
row = { }
emeterdaily = plug.get_emeter_daily(year=2017, month=12)
for k, v in emeterdaily.items():
    row["hour%s" % k] = v
hwinfo = plug.hw_info
for k, v in hwinfo.items():
    row["%s" % k] = v
sysinfo = plug.get_sysinfo()
for k, v in sysinfo.items():
    row["%s" % k] = v
timezone = plug.timezone
for k, v in timezone.items():
    row["%s" % k] = v
emetermonthly = plug.get_emeter_monthly(year=2017)
for k, v in emetermonthly.items():
    row["day%s" % k] = v
realtime = plug.get_emeter_realtime()
for k, v in realtime.items():
    row["%s" % k] = v
row['alias'] = plug.alias
row['time'] = plug.time.strftime('%m/%d/%Y %H:%M:%S')
row['ledon'] = plug.led
row['systemtime'] = datetime.datetime.now().strftime('%m/%d/%Y %H:%M:%S')
json_string = json.dumps(row)
print(json_string)

The code is basically a small tweak on the example code provided with pyHS100, and it allows you to access the HS110 plug that I have. My PC and my smart plug are on the same WiFi network, which can't be 5GHz since the plug only supports 2.4GHz.

Example Data

{"hour19": 0.036, "hour20": 0.021, "hour21": 0.017, "sw_ver": "1.1.1 Build 160725 Rel.164033", "hw_ver": "1.0", "mac": "50:C7:BF:B1:95:D5", "type": "IOT.SMARTPLUGSWITCH", "hwId": "60FF6B258734EA6880E186F8C96DDC61", "fwId": "060BFEA28A8CD1E67146EB5B2B599CC8", "oemId": "FFF22CFF774A0B89F7624BFC6F50D5DE", "dev_name": "Wi-Fi Smart Plug With Energy Monitoring", "model": "HS110(US)", "deviceId": "8006ECB1D454C4428953CB2B34D9292D18A6DB0E", "alias": "Tim Spann's MiniFi Controller SmartPlug - Desk1", "icon_hash": "", "relay_state": 1, "on_time": 161599, "active_mode": "schedule", "feature": "TIM:ENE", "updating": 0, "rssi": -32, "led_off": 0, "latitude": 40.268216, "longitude": -74.529088, "index": 18, "zone_str": "(UTC-05:00) Eastern Daylight Time (US & Canada)", "tz_str": "EST5EDT,M3.2.0,M11.1.0", "dst_offset": 60, "day12": 0.074, "current": 0.04011, "voltage": 122.460974, "power": 1.8772, "total": 0.074, "time": "12/21/2017 13:21:52", "ledon": true, "systemtime": "12/21/2017 13:21:53"}

As you can see, we only get the hours and days where we had usage. Since this is new, I don't have them all. I created my schema to handle all the days of a month and all the hours of a day, so we are going to have a sparse table. If I were monitoring millions of devices, I would put this in Apache HBase; I may do that later.

Let's create an HDFS directory for loading Apache ORC files:

hdfs dfs -mkdir -p /smartPlug
hdfs dfs -chmod -R 777 /smartPlug

Table DDL

CREATE EXTERNAL TABLE IF NOT EXISTS smartPlug (hour19 DOUBLE, hour20 DOUBLE, hour21 DOUBLE, hour22 DOUBLE, hour23 DOUBLE, hour18 DOUBLE, hour17 DOUBLE, hour16 DOUBLE, hour15 DOUBLE, hour14 DOUBLE, hour13 DOUBLE, hour12 DOUBLE, hour11 DOUBLE, hour10 DOUBLE, hour9 DOUBLE, hour8 DOUBLE, hour7 DOUBLE, hour6 DOUBLE, hour5 DOUBLE, hour4 DOUBLE, hour3 DOUBLE, hour2 DOUBLE, hour1 DOUBLE, hour0 DOUBLE, sw_ver STRING, hw_ver STRING, mac STRING, type STRING, hwId STRING, fwId STRING, oemId STRING, dev_name STRING, model STRING, deviceId STRING, alias STRING, icon_hash STRING, relay_state INT, on_time INT, feature STRING, updating INT, rssi INT, led_off INT, latitude DOUBLE, longitude DOUBLE, index INT, zone_str STRING, tz_str STRING, dst_offset INT, day31 DOUBLE, day30 DOUBLE, day29 DOUBLE, day28 DOUBLE, day27 DOUBLE, day26 DOUBLE, day25 DOUBLE, day24 DOUBLE, day23 DOUBLE, day22 DOUBLE, day21 DOUBLE, day20 DOUBLE, day19 DOUBLE, day18 DOUBLE, day17 DOUBLE, day16 DOUBLE, day15 DOUBLE, day14 DOUBLE, day13 DOUBLE, day12 DOUBLE, day11 DOUBLE, day10 DOUBLE, day9 DOUBLE, day8 DOUBLE, day7 DOUBLE, day6 DOUBLE, day5 DOUBLE, day4 DOUBLE, day3 DOUBLE, day2 DOUBLE, day1 DOUBLE, current DOUBLE, voltage DOUBLE, power DOUBLE, total DOUBLE, time STRING, ledon BOOLEAN, systemtime STRING) STORED AS ORC
LOCATION '/smartPlug'

A Simple Query on Some of the Variables

select `current`, voltage, power, total, time, systemtime, on_time, rssi, latitude, longitude
from smartPlug

Note that current is a reserved word in SQL, so we enclose it in backticks.

An Apache Calcite Query Inside Apache NiFi

SELECT * FROM FLOWFILE WHERE "current" > 0

With the Python API I can also turn the plug off, so don't monitor it then. In an updated article I will add a few smart plugs and turn them on and off based on events occurring - perhaps turn off a light when no motion is detected. We can do anything with Apache NiFi, Apache MiNiFi and Python. The API also allows turning the green LED light on the plug on and off. The screenshots above are from the iOS version of the TP-Link Kasa app, which lets you configure and monitor your plug. For many people that's good enough, but not for me.

Resources

smartplugprocessing.xml
monitorpowerlocal.xml
https://github.com/GadgetReactor/pyHS100
https://pypi.python.org/pypi/pyHS100
pip3 install pyhs100
https://github.com/tspannhw/nifi-smartplug/tree/master
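Because the plug only reports the hours and days that actually had usage, the JSON is sparse relative to the fixed table schema. A minimal Python sketch (a hypothetical helper, not part of the original scripts) of filling in the missing hour and day columns with zeros before loading:

```python
import json

def densify(row, hours=24, days=31):
    """Return a copy of the sparse meter row with every hourN/dayN key present.

    Hours and days with no recorded usage default to 0.0 so the record
    matches the fixed smartPlug table schema."""
    full = dict(row)
    for h in range(hours):
        full.setdefault("hour%s" % h, 0.0)
    for d in range(1, days + 1):
        full.setdefault("day%s" % d, 0.0)
    return full

sparse = json.loads('{"hour19": 0.036, "hour20": 0.021, "day12": 0.074, "power": 1.8772}')
dense = densify(sparse)
print(dense["hour19"], dense["hour0"], dense["day12"], dense["day1"])  # 0.036 0.0 0.074 0.0
```

In production the same densification could be done in NiFi before the ORC conversion; keeping the schema fixed is what makes the table queryable with plain SQL.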
12-11-2017
03:02 PM
2 Kudos
Building schemas is tedious work and fraught with errors. The InferAvroSchema processor can get you started by generating a compliant Avro schema from sample data. There is one caveat: you have to make sure you are using Avro-safe field names. I have a custom processor (listed below) that will clean your attributes if you need them Avro-safe.

Example Flow Utilizing InferAvroSchema

InferAvroSchema Details

Step 0: Use Apache NiFi to convert the data to JSON or CSV.
Step 1: Send the JSON or CSV data to InferAvroSchema. I recommend setting the output destination to flowfile-attribute, the input content type to json, and pretty Avro output to true.
Step 2: The new schema is now in the attribute inferred.avro.schema.

inferred.avro.schema
{ "type" : "record", "name" : "schema1", "fields" : [ { "name" : "table", "type" : "string", "doc" : "Type inferred from '\"schema1.tableName\"'" } ] }

This schema can then be used for conversions directly, or stored in the Hortonworks Schema Registry or Apache NiFi's built-in Avro schema registry. You can then use it with ConvertRecord, QueryRecord and other record processors.

Example Generated Schema in Avro-JSON Format Stored in Hortonworks Schema Registry:

Source: https://github.com/tspannhw/nifi-attributecleaner-processor
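Avro field names must start with a letter or underscore and contain only letters, digits and underscores. A minimal Python sketch (a hypothetical stand-in for the attribute-cleaner processor linked above, not its actual code) of making a name Avro-safe:

```python
import re

def avro_safe(name):
    """Make a field name legal for Avro: names must match [A-Za-z_][A-Za-z0-9_]*."""
    # Replace every illegal character (dots, dashes, spaces, ...) with an underscore.
    cleaned = re.sub(r"[^A-Za-z0-9_]", "_", name)
    # Prefix names that start with a digit.
    if not re.match(r"[A-Za-z_]", cleaned):
        cleaned = "_" + cleaned
    return cleaned

print(avro_safe("OBX_1.ObservationIdentifier.Text"))  # OBX_1_ObservationIdentifier_Text
print(avro_safe("2nd-reading"))                       # _2nd_reading
```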
12-05-2017
12:30 AM
1 Kudo
Overview

We create Apache Hive tables for analytics automagically from DDL generated by Apache NiFi.

Hortonworks SAM Run Time

This flow produces HBase and Druid analytics. SAM has produced two Druid datasources that we can slice up and explore in Superset. In Apache Zeppelin, we can run our Hive DDL (listed below) and all of our queries, including joining our two patient data tables. We can sort the results and create some basic graphs. All of this data is available to you in your favorite BI tools.

Generated Apache Hive Tables from ORC Files (DDL)

CREATE EXTERNAL TABLE IF NOT EXISTS patientdata (PID_SSNNumberPatient INT, email STRING, gender STRING, ip_address STRING,
drug_provider STRING, icd9 STRING, icd9_description STRING, icd9P_proc STRING, icd9_proc_description STRING, user_agent STRING, drug_used STRING)
STORED AS ORC LOCATION '/patientdata';
CREATE EXTERNAL TABLE IF NOT EXISTS ccda (problemSectionact_02observationproblemStatuscodecodeSystemName STRING, vitalSignsSectionorganizerobservations_05idsroot STRING, problemSectionact_02observationproblemStatusstatusCodecode STRING, vitalSignsSectionorganizerobservations_01texttext_01value STRING, vitalSignsSectionorganizerobservations_04texttext_01value STRING, vitalSignsSectionorganizercodecodeSystem STRING, vitalSignsSectionorganizerobservations_01valuesvalue STRING, vitalSignsSectionorganizerobservations_04valuesvalue STRING, problemSectionact_03observationidroot STRING, problemSectionact_02codecodeSystem STRING, vitalSignsSectionorganizerobservations_05effectiveTimevalue STRING, RouteOnAttributeRoute STRING, vitalSignsSectionorganizerobservations_03codecode STRING, vitalSignsSectionorganizerobservations_04statusCodecode STRING, problemSectionidroot STRING, codecode STRING, problemSectionact_01effectiveTimelow STRING, problemSectioncodecodeSystemName STRING, codedisplayName STRING, problemSectionact_01observationstatusCodecode STRING, vitalSignsSectionorganizerobservations_01idsroot STRING, vitalSignsSectionorganizerobservations_02idsroot STRING, vitalSignsSectionorganizerobservations_04idsroot STRING, vitalSignsSectionorganizerobservations_03idsroot STRING, problemSectionact_01observationeffectiveTimelow STRING, filecreationTime STRING, problemSectionact_01observationproblemStatusvaluesdisplayName STRING, problemSectionact_02observationvaluestranslationscode STRING, problemSectionact_03statusCodecode STRING, problemSectionact_03observationvaluestranslationscodeSystem STRING, problemSectionact_02idroot STRING, problemSectionact_03codecode STRING, problemSectionact_01observationidroot STRING, problemSectionact_02observationvaluestranslationscodeSystem STRING, problemSectionact_03observationproblemStatuscodecodeSystem STRING, problemSectionact_03observationproblemStatusvaluescodeSystemName STRING, vitalSignsSectionorganizercodecode STRING, 
vitalSignsSectionorganizerobservations_02statusCodecode STRING, problemSectionact_03observationvaluestranslationsdisplayName STRING, vitalSignsSectionorganizerobservations_03codecodeSystem STRING, problemSectionact_03observationproblemStatuscodecode STRING, problemSectionact_01observationproblemStatusstatusCodecode STRING, vitalSignsSectionorganizerobservations_04textreferencevalue STRING, filelastAccessTime STRING, vitalSignsSectionorganizerobservations_01codedisplayName STRING, filesize STRING, problemSectioncodecodeSystem STRING, vitalSignsSectionorganizerobservations_01valuesunit STRING, vitalSignsSectionorganizerobservations_02effectiveTimevalue STRING, vitalSignsSectionorganizerobservations_05idsextension STRING, vitalSignsSectionorganizerobservations_04codecode STRING, vitalSignsSectionorganizerobservations_05valuesvalue STRING, vitalSignsSectionorganizerobservations_04idsextension STRING, vitalSignsSectionorganizerobservations_02valuesvalue STRING, problemSectionact_02observationproblemStatusvaluescodeSystem STRING, vitalSignsSectionorganizerobservations_02idsextension STRING, vitalSignsSectionorganizerobservations_03idsextension STRING, vitalSignsSectionorganizerobservations_01idsextension STRING, problemSectiontitle STRING, vitalSignsSectionorganizerobservations_01codecodeSystemName STRING, problemSectionact_03observationvaluestranslationsoriginalTextreferencevalue STRING, vitalSignsSectionorganizerobservations_04valuesunit STRING, problemSectionact_02idextension STRING, vitalSignsSectionorganizerobservations_05statusCodecode STRING, vitalSignsSectionorganizerobservations_04effectiveTimevalue STRING, problemSectionact_02observationvaluestranslationscodeSystemName STRING, vitalSignsSectionorganizerobservations_03valuesvalue STRING, vitalSignsSectionorganizereffectiveTimevalue STRING, problemSectionact_03observationvaluestranslationscode STRING, vitalSignsSectionorganizerobservations_03codedisplayName STRING, 
vitalSignsSectionorganizerobservations_02texttext_01value STRING, vitalSignsSectionorganizerobservations_05texttext_01value STRING, absolutepath STRING, vitalSignsSectioncodedisplayName STRING, problemSectionact_03idextension STRING, problemSectionact_01observationvaluestranslationsoriginalTextreferencevalue STRING, problemSectionact_02observationvaluestranslationsoriginalTextreferencevalue STRING, filelastModifiedTime STRING, problemSectioncodecode STRING, vitalSignsSectionorganizeridsroot STRING, problemSectionact_02observationproblemStatuscodecodeSystem STRING, vitalSignsSectionorganizerobservations_05codecodeSystem STRING, filegroup STRING, problemSectionact_01observationproblemStatusvaluescode STRING, problemSectionact_01observationvaluestranslationsdisplayName STRING, problemSectionact_02codecode STRING, idextension STRING, vitalSignsSectioncodecode STRING, problemSectionact_03observationproblemStatuscodecodeSystemName STRING, problemSectionact_01idroot STRING, vitalSignsSectiontitle STRING, problemSectionact_01observationproblemStatuscodecodeSystemName STRING, vitalSignsSectionorganizerobservations_03valuesunit STRING, vitalSignsSectionorganizerobservations_01textreferencevalue STRING, effectiveTime STRING, vitalSignsSectionorganizerobservations_03codecodeSystemName STRING, problemSectionact_03observationstatusCodecode STRING, problemSectionact_02statusCodecode STRING, problemSectionact_02observationidextension STRING, problemSectionact_01idextension STRING, vitalSignsSectionorganizerstatusCodecode STRING, vitalSignsSectionorganizerobservations_05codedisplayName STRING, vitalSignsSectionorganizerobservations_04codecodeSystem STRING, vitalSignsSectionorganizerobservations_02codedisplayName STRING, problemSectionact_01observationvaluestranslationscodeSystemName STRING, vitalSignsSectionorganizerobservations_05codecode STRING, vitalSignsSectionorganizerobservations_04codecodeSystemName STRING, problemSectionact_02observationvaluestranslationsdisplayName STRING, 
idroot STRING, vitalSignsSectionorganizerobservations_02textreferencevalue STRING, problemSectionact_01observationidextension STRING, problemSectionact_01observationvaluestranslationscodeSystem STRING, problemSectionact_01codecode STRING, problemSectionact_02observationproblemStatusvaluesdisplayName STRING, problemSectionact_01codecodeSystem STRING, codecodeSystemName STRING, vitalSignsSectionorganizerobservations_01effectiveTimevalue STRING, vitalSignsSectionorganizercodedisplayName STRING, vitalSignsSectionorganizerobservations_02codecodeSystemName STRING, vitalSignsSectionorganizerobservations_03textreferencevalue STRING, vitalSignsSectionorganizerobservations_02valuesunit STRING, problemSectionact_03observationproblemStatusvaluesdisplayName STRING, problemSectionact_02observationproblemStatuscodecode STRING, vitalSignsSectionorganizerobservations_03statusCodecode STRING, problemSectionact_03observationproblemStatusvaluescode STRING, problemSectionact_02observationeffectiveTimelow STRING, problemSectionact_03observationvaluestranslationscodeSystemName STRING, vitalSignsSectionorganizerobservations_03texttext_01value STRING, problemSectionact_01observationproblemStatuscodecodeSystem STRING, problemSectionact_03observationproblemStatusvaluescodeSystem STRING, vitalSignsSectionorganizercodecodeSystemName STRING, problemSectionact_03observationidextension STRING, vitalSignsSectionorganizerobservations_01codecode STRING, codecodeSystem STRING, problemSectionact_02effectiveTimelow STRING, problemSectioncodedisplayName STRING, problemSectionact_02observationproblemStatusvaluescode STRING, vitalSignsSectionorganizeridsextension STRING, problemSectionact_02observationstatusCodecode STRING, vitalSignsSectionorganizerobservations_02codecode STRING, title STRING, problemSectionact_03idroot STRING, problemSectionidextension STRING, problemSectionact_03observationproblemStatusstatusCodecode STRING, problemSectionact_03effectiveTimelow STRING, 
problemSectionact_02observationproblemStatusvaluescodeSystemName STRING, fileowner STRING, vitalSignsSectionorganizerobservations_01statusCodecode STRING, vitalSignsSectionorganizerobservations_05textreferencevalue STRING, filepermissions STRING, vitalSignsSectionorganizerobservations_02codecodeSystem STRING, vitalSignsSectionorganizerobservations_05valuesunit STRING, problemSectionact_01observationvaluestranslationscode STRING, problemSectionact_01statusCodecode STRING, vitalSignsSectionorganizerobservations_05codecodeSystemName STRING, problemSectionact_03codecodeSystem STRING, vitalSignsSectioncodecodeSystem STRING, problemSectionact_01observationproblemStatusvaluescodeSystemName STRING, vitalSignsSectioncodecodeSystemName STRING, problemSectionact_01observationproblemStatuscodecode STRING, problemSectionact_02observationidroot STRING, vitalSignsSectionorganizerobservations_01codecodeSystem STRING, problemSectionact_01observationproblemStatusvaluescodeSystem STRING, vitalSignsSectionorganizerobservations_03effectiveTimevalue STRING, vitalSignsSectionorganizerobservations_04codedisplayName STRING, problemSectionact_03observationeffectiveTimelow STRING) STORED AS ORC LOCATION '/ccda';
CREATE EXTERNAL TABLE IF NOT EXISTS hl7_oru_flat (OBX_1_UserDefinedAccessChecks STRING, OBR_1_OrderingProvider_FamilyName STRING, MSH_MessageControlID STRING, OBX_1_ObservationIdentifier_Text STRING, MSH_SendingApplication_NamespaceID STRING, OBR_1_UniversalServiceIdentifier_Text STRING, MSH_ReceivingApplication_NamespaceID STRING, MSH_ProcessingID_ProcessingID STRING, PID_SSNNumberPatient STRING, OBR_1_FillerOrderNumber_EntityIdentifier STRING, PID_PatientAccountNumber_ID STRING, PID_DateOfBirth STRING, PD1_PatientPrimaryCareProviderNameIDNo_IDNumber STRING, PID_Sex STRING, MSH_MessageType_MessageType STRING, OBX_1_ReferencesRange STRING, OBR_1_OrderingProvider_IDNumber STRING, PD1_PatientPrimaryCareProviderNameIDNo_FamilyName STRING, OBX_1_Units_NameOfCodingSystem STRING, OBX_1_Units_Identifier STRING, PID_PatientName_GivenName STRING, OBX_1_ObservationSubID STRING, PD1_PatientPrimaryCareProviderNameIDNo_GivenName STRING, OBR_1_PlacerOrderNumber_NamespaceID STRING, MSH_MessageType_TriggerEvent STRING, PD1_PatientPrimaryCareProviderNameIDNo_AssigningAuthority STRING, OBR_1_ResultStatus STRING, PID_PatientName_FamilyName STRING, MSH_EncodingCharacters STRING, MSH_VersionID STRING, OBR_1_UniversalServiceIdentifier_Identifier STRING, OBR_1_ObservationDateTime STRING, OBR_1_ScheduledDateTime STRING, OBX_1_ObservationIdentifier_Identifier STRING, OBR_1_OrderingProvider_GivenName STRING, OBR_1_SetIDObservationRequest STRING, OBR_1_ResultsRptStatusChngDateTime STRING, OBR_1_PlacerOrderNumber_EntityIdentifier STRING, OBX_1_NatureOfAbnormalTest STRING, OBX_1_SetIDOBX STRING, MSH_FieldSeparator STRING, PD1_PatientPrimaryCareProviderNameIDNo_MiddleInitialOrName STRING, OBX_1_Units_Text STRING, OBX_1_ValueType STRING, PID_PatientIDInternalID_ID STRING, OBX_1_ObservationValue STRING, OBR_1_OrderingProvider_MiddleInitialOrName STRING) STORED AS ORC LOCATION '/hl7/flat/oru';
Apache Hive Queries to Run in Apache Zeppelin or Anywhere via JDBC/ODBC

select * from patientdata;
select * from ccda;
select vitalsignssectionorganizerobservations_01valuesvalue, vitalsignssectionorganizerobservations_04valuesvalue,vitalsignssectionorganizerobservations_01texttext_01value
from ccda;
select * from hl7_oru_flat;
select * from hl7_oru_flat h join patientdata p on h.PID_SSNNumberPatient = p.PID_SSNNumberPatient;
Zeppelin Notebook medicalzeppelinjs.txt
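The SSN join above can be sketched locally with Python's built-in sqlite3 standing in for Hive (hypothetical sample rows and trimmed-down tables; the Hive syntax for this query is nearly identical):

```python
import sqlite3

# In-memory stand-ins for the two Hive tables, with a couple of hypothetical rows.
con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE hl7_oru_flat (PID_SSNNumberPatient TEXT, OBX_1_ObservationValue TEXT)")
con.execute("CREATE TABLE patientdata (PID_SSNNumberPatient TEXT, drug_used TEXT)")
con.execute("INSERT INTO hl7_oru_flat VALUES ('123456789', 'mg/dL')")
con.execute("INSERT INTO patientdata VALUES ('123456789', 'Naratriptan')")

# Same shape as the Hive query: join the HL7 feed to patient data on SSN.
rows = con.execute(
    "SELECT h.PID_SSNNumberPatient, h.OBX_1_ObservationValue, p.drug_used "
    "FROM hl7_oru_flat h JOIN patientdata p "
    "ON h.PID_SSNNumberPatient = p.PID_SSNNumberPatient"
).fetchall()
print(rows)  # [('123456789', 'mg/dL', 'Naratriptan')]
```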
12-04-2017
04:12 PM
2 Kudos
Overview

In this section of HL7 processing, we look at what to do next with HL7 data now that it has been converted into Big Data-friendly Avro with a schema attached. We can easily combine that data with our other patient data that came from MQTT. We do a simple JOIN on SSN and send the combined data to Kafka with a new schema and to Druid for analytics. We also route some of the data to HDFS and HBase for storage, and another feed to Druid for analytics.

S.A.M. Flow Overview

I have a simple Hortonworks Streaming Analytics Manager application that ingests two Kafka feeds (one for patient data and one for HL7 converted to Avro by Apache NiFi). I join those two records together on a key and push them to Kafka and to Druid.

Our Environment for Running SAM - Just Click What You Need

I added Druid, Apache HBase, HDFS, Apache Hive, Apache Kafka, Apache Storm and Apache ZooKeeper. I am using all of them except Apache Hive.

We can merge together the Kafka feeds sent from Apache NiFi. One feed is patient data that started as JSON sent via MQTT from a remote node. The other feed is from another system that sent it via Kafka to Apache NiFi for conversion from HL7 to Avro.

Above, in the Hortonworks Schema Registry, we can see our schemas beautifully displayed with version information. To set up a Kafka source, you just follow the dropdowns and name it - no manual effort. This is the other feed; as you can see, it grabs the schema associated with the Kafka topic and displays all the fields and types.

Here is the join: we pick the field to join on, the join type (LEFT), a type of windowing, and the fields we want. Again, no manual effort or typing. Druid is just as easy to pick up - put in the name for your new data source and SAM will create it. Here is how we do an aggregate on one of the numeric values. We send the SSN number and the observation value max to the existing HBase patient observations table with the column family obs.

HBase DDL:

create 'patient_observations', 'obs'

The results of this flow are that we have data added to HBase and to HDFS, plus two Druid data sources populated for Superset analysis.

hbase shell
HBase Shell; enter 'help<RETURN>' for list of supported commands.
Type "exit<RETURN>" to leave the HBase Shell
Version 1.1.2.2.6.2.0-205, r5210d2ed88d7e241646beab51e9ac147a973bdcc, Sat Aug 26 09:33:50 UTC 2017
hbase(main):001:0> list
TABLE
ATLAS_ENTITY_AUDIT_EVENTS
atlas_titan
patient_observations
3 row(s) in 0.3220 seconds
=> ["ATLAS_ENTITY_AUDIT_EVENTS", "atlas_titan", "patient_observations"]
hbase(main):002:0> scan 'patient_observations'
ROW COLUMN+CELL
2ba61288-3c45-4eb4-a0fa-e92e44ef210f column=obs:OBX_1_ObservationValue_MAX, timestamp=1512503796419, value=mg/dL
2ba61288-3c45-4eb4-a0fa-e92e44ef210f column=obs:PID_SSNNumberPatient, timestamp=1512503796419, value=123456789
1 row(s) in 0.2110 seconds
hbase(main):003:0>
hdfs dfs -cat /hl7sam/7-HDFS-8-999-1512490296249.txt
XXXXXX,000000000000000000,999999999999,ORU,M,99,LAST,JOHN,65-99,R,000000001,NM,65,|,1,341856649,1,John,20150101000100,mg/dL,F,20150101000100,20150101000100,2.3,^~\&,M,NPI,R01,HNAM_ORDERID,FIRST,1620,M,SMITH,159,1234567890,19700101,P,HealthOrg01,Basic Metabolic Panel,Glucose Lvl,GLU,H,123456789,Q1111111111111111111,20150101000100,648088,Johnson
See: https://community.hortonworks.com/articles/149910/handling-hl7-records-part-1-hl7-ingest.html

SAM Flow: sam-hl7json.txt

References:

http://www.openhealthtools.org/
https://community.hortonworks.com/articles/122077/ingesting-csv-data-and-pushing-it-as-avro-to-kafka.html
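The join-plus-aggregate that SAM performs can be sketched in plain Python (a hypothetical in-memory stand-in for one window of the two Kafka feeds, not SAM's actual implementation):

```python
from collections import defaultdict

# Hypothetical stand-ins for one window of the two Kafka feeds.
hl7_feed = [
    {"PID_SSNNumberPatient": "123456789", "OBX_1_ObservationValue": 2.3},
    {"PID_SSNNumberPatient": "123456789", "OBX_1_ObservationValue": 5.1},
]
patient_feed = [{"PID_SSNNumberPatient": "123456789", "drug_used": "Naratriptan"}]

# Join on SSN, then aggregate the max observation value per patient,
# mirroring what the SAM flow writes to the HBase 'obs' column family.
patients = {p["PID_SSNNumberPatient"]: p for p in patient_feed}
max_obs = defaultdict(float)
for rec in hl7_feed:
    ssn = rec["PID_SSNNumberPatient"]
    if ssn in patients:
        max_obs[ssn] = max(max_obs[ssn], rec["OBX_1_ObservationValue"])

print(dict(max_obs))  # {'123456789': 5.1}
```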
12-02-2017
08:07 PM
2 Kudos
Step 1: Collect HL7 Health Records
Python to Send JSON Data to MQTT (Data Generated by https://www.mockaroo.com/)
import paho.mqtt.client as mqtt
import json
# MQTT
client = mqtt.Client()
client.connect("localhost", 14162, 60)
row = [{"PID_SSNNumberPatient":823456789,"email":"ahospital0@census.gov","gender":"Male","ip_address":"138.135.180.206","drug_provider":"OrchidPharma Inc","icd9":"94140","icd9_description":"Deep necrosis of underlying tissues [deep third degree] without mention of loss of a body part, face and head, unspecified site","icd9P_proc":"7942","icd9_proc_description":"Closed reduction of separated epiphysis, radius and ulna","user_agent":"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_5_8) AppleWebKit/535.1 (KHTML, like Gecko) Chrome/14.0.801.0 Safari/535.1","drug_used":"Naratriptan"}]
json_string = json.dumps(row)
client.publish("patientdata",payload=json_string,qos=1,retain=False)
client.disconnect()
Step 2: Get to Apache NiFi via FTP, sFTP, File, Apache Kafka, MQTT, REST API, TCP/IP or hundreds more options.
Schemas
{
"type": "record",
"name": "hl7oru",
"fields": [
{
"name": "OBX_1_UserDefinedAccessChecks",
"type": "string",
"doc": "Type inferred from '\"20150101000100\"'"
},
{
"name": "OBR_1_OrderingProvider_FamilyName",
"type": "string",
"doc": "Type inferred from '\"Johnson\"'"
},
{
"name": "MSH_MessageControlID",
"type": "string",
"doc": "Type inferred from '\"Q1111111111111111111\"'"
},
{
"name": "OBX_1_ObservationIdentifier_Text",
"type": "string",
"doc": "Type inferred from '\"Glucose Lvl\"'"
},
{
"name": "MSH_SendingApplication_NamespaceID",
"type": "string",
"doc": "Type inferred from '\"XXXXXX\"'"
},
{
"name": "OBR_1_UniversalServiceIdentifier_Text",
"type": "string",
"doc": "Type inferred from '\"Basic Metabolic Panel\"'"
},
{
"name": "MSH_ReceivingApplication_NamespaceID",
"type": "string",
"doc": "Type inferred from '\"HealthOrg01\"'"
},
{
"name": "MSH_ProcessingID_ProcessingID",
"type": "string",
"doc": "Type inferred from '\"P\"'"
},
{
"name": "PID_SSNNumberPatient",
"type": "string",
"doc": "Type inferred from '\"123456789\"'"
},
{
"name": "OBR_1_FillerOrderNumber_EntityIdentifier",
"type": "string",
"doc": "Type inferred from '\"000000000000000000\"'"
},
{
"name": "PID_PatientAccountNumber_ID",
"type": "string",
"doc": "Type inferred from '\"999999999999\"'"
},
{
"name": "PID_DateOfBirth",
"type": "string",
"doc": "Type inferred from '\"19700101\"'"
},
{
"name": "PD1_PatientPrimaryCareProviderNameIDNo_IDNumber",
"type": "string",
"doc": "Type inferred from '\"1234567890\"'"
},
{
"name": "PID_Sex",
"type": "string",
"doc": "Type inferred from '\"M\"'"
},
{
"name": "MSH_MessageType_MessageType",
"type": "string",
"doc": "Type inferred from '\"ORU\"'"
},
{
"name": "OBX_1_ReferencesRange",
"type": "string",
"doc": "Type inferred from '\"H\"'"
},
{
"name": "OBR_1_OrderingProvider_IDNumber",
"type": "string",
"doc": "Type inferred from '\"1620\"'"
},
{
"name": "PD1_PatientPrimaryCareProviderNameIDNo_FamilyName",
"type": "string",
"doc": "Type inferred from '\"LAST\"'"
},
{
"name": "OBX_1_Units_NameOfCodingSystem",
"type": "string",
"doc": "Type inferred from '\"99\"'"
},
{
"name": "OBX_1_Units_Identifier",
"type": "string",
"doc": "Type inferred from '\"65-99\"'"
},
{
"name": "PID_PatientName_GivenName",
"type": "string",
"doc": "Type inferred from '\"JOHN\"'"
},
{
"name": "OBX_1_ObservationSubID",
"type": "string",
"doc": "Type inferred from '\"159\"'"
},
{
"name": "PD1_PatientPrimaryCareProviderNameIDNo_GivenName",
"type": "string",
"doc": "Type inferred from '\"FIRST\"'"
},
{
"name": "OBR_1_PlacerOrderNumber_NamespaceID",
"type": "string",
"doc": "Type inferred from '\"HNAM_ORDERID\"'"
},
{
"name": "MSH_MessageType_TriggerEvent",
"type": "string",
"doc": "Type inferred from '\"R01\"'"
},
{
"name": "PD1_PatientPrimaryCareProviderNameIDNo_AssigningAuthority",
"type": "string",
"doc": "Type inferred from '\"NPI\"'"
},
{
"name": "OBR_1_ResultStatus",
"type": "string",
"doc": "Type inferred from '\"M\"'"
},
{
"name": "PID_PatientName_FamilyName",
"type": "string",
"doc": "Type inferred from '\"SMITH\"'"
},
{
"name": "MSH_EncodingCharacters",
"type": "string",
"doc": "Type inferred from '\"^~\\&\"'"
},
{
"name": "MSH_VersionID",
"type": "string",
"doc": "Type inferred from '\"2.3\"'"
},
{
"name": "OBR_1_UniversalServiceIdentifier_Identifier",
"type": "string",
"doc": "Type inferred from '\"648088\"'"
},
{
"name": "OBR_1_ObservationDateTime",
"type": "string",
"doc": "Type inferred from '\"20150101000100\"'"
},
{
"name": "OBR_1_ScheduledDateTime",
"type": "string",
"doc": "Type inferred from '\"20150101000100\"'"
},
{
"name": "OBX_1_ObservationIdentifier_Identifier",
"type": "string",
"doc": "Type inferred from '\"GLU\"'"
},
{
"name": "OBR_1_OrderingProvider_GivenName",
"type": "string",
"doc": "Type inferred from '\"John\"'"
},
{
"name": "OBR_1_SetIDObservationRequest",
"type": "string",
"doc": "Type inferred from '\"1\"'"
},
{
"name": "OBR_1_ResultsRptStatusChngDateTime",
"type": "string",
"doc": "Type inferred from '\"20150101000100\"'"
},
{
"name": "OBR_1_PlacerOrderNumber_EntityIdentifier",
"type": "string",
"doc": "Type inferred from '\"341856649\"'"
},
{
"name": "OBX_1_NatureOfAbnormalTest",
"type": "string",
"doc": "Type inferred from '\"F\"'"
},
{
"name": "OBX_1_SetIDOBX",
"type": "string",
"doc": "Type inferred from '\"1\"'"
},
{
"name": "MSH_FieldSeparator",
"type": "string",
"doc": "Type inferred from '\"|\"'"
},
{
"name": "PD1",
"type": {
"type": "record",
"name": "PD1",
"fields": [
{
"name": "PatientPrimaryCareProviderNameIDNo_MiddleInitialOrName",
"type": "string",
"doc": "Type inferred from '\"M\"'"
}
]
},
"doc": "Type inferred from '{\"PatientPrimaryCareProviderNameIDNo_MiddleInitialOrName\":\"M\"}'"
},
{
"name": "OBX_1_Units_Text",
"type": "string",
"doc": "Type inferred from '\"65\"'"
},
{
"name": "OBX_1_ValueType",
"type": "string",
"doc": "Type inferred from '\"NM\"'"
},
{
"name": "PID_PatientIDInternalID_ID",
"type": "string",
"doc": "Type inferred from '\"000000001\"'"
},
{
"name": "OBX_1_ObservationValue",
"type": "string",
"doc": "Type inferred from '\"mg/dL\"'"
},
{
"name": "OBR_1_OrderingProvider_MiddleInitialOrName",
"type": "string",
"doc": "Type inferred from '\"R\"'"
}
]
}
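A record headed for a schema like the one above can be sanity-checked in plain Python before it reaches the record processors (a minimal sketch with an abbreviated, hypothetical field list; real validation would use the avro library):

```python
import json

# Abbreviated for illustration - the full hl7oru schema is listed above.
schema = json.loads("""{"type": "record", "name": "hl7oru", "fields": [
    {"name": "MSH_MessageControlID", "type": "string"},
    {"name": "PID_SSNNumberPatient", "type": "string"}]}""")

def missing_fields(record, schema):
    """Return the schema field names that are absent from the record."""
    return [f["name"] for f in schema["fields"] if f["name"] not in record]

rec = {"MSH_MessageControlID": "Q1111111111111111111"}
print(missing_fields(rec, schema))  # ['PID_SSNNumberPatient']
```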
patientbothjson.txt
patientdatajs.txt
simple.txt
Jolt Scripts
{
"OBX_1.UserDefinedAccessChecks": "OBX_1_UserDefinedAccessChecks",
"OBR_1.OrderingProvider.FamilyName": "OBR_1_OrderingProvider_FamilyName",
"MSH.MessageControlID": "MSH_MessageControlID",
"OBX_1.ObservationIdentifier.Text": "OBX_1_ObservationIdentifier_Text",
"MSH.SendingApplication.NamespaceID": "MSH_SendingApplication_NamespaceID",
"OBR_1.UniversalServiceIdentifier.Text": "OBR_1_UniversalServiceIdentifier_Text",
"MSH.ReceivingApplication.NamespaceID": "MSH_ReceivingApplication_NamespaceID",
"MSH.ProcessingID.ProcessingID": "MSH_ProcessingID_ProcessingID",
"PID.SSNNumberPatient": "PID_SSNNumberPatient",
"OBR_1.FillerOrderNumber.EntityIdentifier": "OBR_1_FillerOrderNumber_EntityIdentifier",
"PID.PatientAccountNumber.ID": "PID_PatientAccountNumber_ID",
"PID.DateOfBirth": "PID_DateOfBirth",
"PD1.PatientPrimaryCareProviderNameIDNo.IDNumber": "PD1_PatientPrimaryCareProviderNameIDNo_IDNumber",
"PID.Sex": "PID_Sex",
"MSH.MessageType.MessageType": "MSH_MessageType_MessageType",
"OBX_1.ReferencesRange": "OBX_1_ReferencesRange",
"OBR_1.OrderingProvider.IDNumber": "OBR_1_OrderingProvider_IDNumber",
"PD1.PatientPrimaryCareProviderNameIDNo.FamilyName": "PD1_PatientPrimaryCareProviderNameIDNo_FamilyName",
"OBX_1.Units.NameOfCodingSystem": "OBX_1_Units_NameOfCodingSystem",
"OBX_1.Units.Identifier": "OBX_1_Units_Identifier",
"PID.PatientName.GivenName": "PID_PatientName_GivenName",
"OBX_1.ObservationSubID": "OBX_1_ObservationSubID",
"PD1.PatientPrimaryCareProviderNameIDNo.GivenName": "PD1_PatientPrimaryCareProviderNameIDNo_GivenName",
"OBR_1.PlacerOrderNumber.NamespaceID": "OBR_1_PlacerOrderNumber_NamespaceID",
"MSH.MessageType.TriggerEvent": "MSH_MessageType_TriggerEvent",
"PD1.PatientPrimaryCareProviderNameIDNo.AssigningAuthority": "PD1_PatientPrimaryCareProviderNameIDNo_AssigningAuthority",
"OBR_1.ResultStatus": "OBR_1_ResultStatus",
"PID.PatientName.FamilyName": "PID_PatientName_FamilyName",
"MSH.EncodingCharacters": "MSH_EncodingCharacters",
"MSH.VersionID": "MSH_VersionID",
"OBR_1.UniversalServiceIdentifier.Identifier": "OBR_1_UniversalServiceIdentifier_Identifier",
"OBR_1.ObservationDateTime": "OBR_1_ObservationDateTime",
"OBR_1.ScheduledDateTime": "OBR_1_ScheduledDateTime",
"OBX_1.ObservationIdentifier.Identifier": "OBX_1_ObservationIdentifier_Identifier",
"OBR_1.OrderingProvider.GivenName": "OBR_1_OrderingProvider_GivenName",
"OBR_1.SetIDObservationRequest": "OBR_1_SetIDObservationRequest",
"OBR_1.ResultsRptStatusChngDateTime": "OBR_1_ResultsRptStatusChngDateTime",
"OBR_1.PlacerOrderNumber.EntityIdentifier": "OBR_1_PlacerOrderNumber_EntityIdentifier",
"OBX_1.NatureOfAbnormalTest": "OBX_1_NatureOfAbnormalTest",
"OBX_1.SetIDOBX": "OBX_1_SetIDOBX",
"MSH.FieldSeparator": "MSH_FieldSeparator",
"PD1.PatientPrimaryCareProviderNameIDNo.MiddleInitialOrName": "PD1_PatientPrimaryCareProviderNameIDNo_MiddleInitialOrName",
"OBX_1.Units.Text": "OBX_1_Units_Text",
"OBX_1.ValueType": "OBX_1_ValueType",
"PID.PatientIDInternalID.ID": "PID_PatientIDInternalID_ID",
"OBX_1.ObservationValue": "OBX_1_ObservationValue",
"OBR_1.OrderingProvider.MiddleInitialOrName": "OBR_1_OrderingProvider_MiddleInitialOrName"
}
{
"OBX_1.UserDefinedAccessChecks": "OBX_1.UserDefinedAccessChecks",
"OBR_1.OrderingProvider.FamilyName": "OBR_1.OrderingProvider.FamilyName",
"MSH.MessageControlID": "MSH.MessageControlID",
"OBX_1.ObservationIdentifier.Text": "OBX_1.ObservationIdentifier.Text",
"MSH.SendingApplication.NamespaceID": "MSH.SendingApplication.NamespaceID",
"OBR_1.UniversalServiceIdentifier.Text": "OBR_1.UniversalServiceIdentifier.Text",
"MSH.ReceivingApplication.NamespaceID": "MSH.ReceivingApplication.NamespaceID",
"MSH.ProcessingID.ProcessingID": "MSH.ProcessingID.ProcessingID",
"PID.SSNNumberPatient": "PID.SSNNumberPatient",
"OBR_1.FillerOrderNumber.EntityIdentifier": "OBR_1.FillerOrderNumber.EntityIdentifier",
"PID.PatientAccountNumber.ID": "PID.PatientAccountNumber.ID",
"PID.DateOfBirth": "PID.DateOfBirth",
"PD1.PatientPrimaryCareProviderNameIDNo.IDNumber": "PD1.PatientPrimaryCareProviderNameIDNo.IDNumber",
"PID.Sex": "PID.Sex",
"MSH.MessageType.MessageType": "MSH.MessageType.MessageType",
"OBX_1.ReferencesRange": "OBX_1.ReferencesRange",
"OBR_1.OrderingProvider.IDNumber": "OBR_1.OrderingProvider.IDNumber",
"PD1.PatientPrimaryCareProviderNameIDNo.FamilyName": "PD1.PatientPrimaryCareProviderNameIDNo.FamilyName",
"OBX_1.Units.NameOfCodingSystem": "OBX_1.Units.NameOfCodingSystem",
"OBX_1.Units.Identifier": "OBX_1.Units.Identifier",
"PID.PatientName.GivenName": "PID.PatientName.GivenName",
"OBX_1.ObservationSubID": "OBX_1.ObservationSubID",
"PD1.PatientPrimaryCareProviderNameIDNo.GivenName": "PD1.PatientPrimaryCareProviderNameIDNo.GivenName",
"OBR_1.PlacerOrderNumber.NamespaceID": "OBR_1.PlacerOrderNumber.NamespaceID",
"MSH.MessageType.TriggerEvent": "MSH.MessageType.TriggerEvent",
"PD1.PatientPrimaryCareProviderNameIDNo.AssigningAuthority": "PD1.PatientPrimaryCareProviderNameIDNo.AssigningAuthority",
"OBR_1.ResultStatus": "OBR_1.ResultStatus",
"PID.PatientName.FamilyName": "PID.PatientName.FamilyName",
"MSH.EncodingCharacters": "MSH.EncodingCharacters",
"MSH.VersionID": "MSH.VersionID",
"OBR_1.UniversalServiceIdentifier.Identifier": "OBR_1.UniversalServiceIdentifier.Identifier",
"OBR_1.ObservationDateTime": "OBR_1.ObservationDateTime",
"OBR_1.ScheduledDateTime": "OBR_1.ScheduledDateTime",
"OBX_1.ObservationIdentifier.Identifier": "OBX_1.ObservationIdentifier.Identifier",
"OBR_1.OrderingProvider.GivenName": "OBR_1.OrderingProvider.GivenName",
"OBR_1.SetIDObservationRequest": "OBR_1.SetIDObservationRequest",
"OBR_1.ResultsRptStatusChngDateTime": "OBR_1.ResultsRptStatusChngDateTime",
"OBR_1.PlacerOrderNumber.EntityIdentifier": "OBR_1.PlacerOrderNumber.EntityIdentifier",
"OBX_1.NatureOfAbnormalTest": "OBX_1.NatureOfAbnormalTest",
"OBX_1.SetIDOBX": "OBX_1.SetIDOBX",
"MSH.FieldSeparator": "MSH.FieldSeparator",
"PD1.PatientPrimaryCareProviderNameIDNo.MiddleInitialOrName": "PD1.PatientPrimaryCareProviderNameIDNo.MiddleInitialOrName",
"OBX_1.Units.Text": "OBX_1.Units.Text",
"OBX_1.ValueType": "OBX_1.ValueType",
"PID.PatientIDInternalID.ID": "PID.PatientIDInternalID.ID",
"OBX_1.ObservationValue": "OBX_1.ObservationValue",
"OBR_1.OrderingProvider.MiddleInitialOrName": "OBR_1.OrderingProvider.MiddleInitialOrName"
}
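The first Jolt mapping above renames the dotted HL7 attribute names to underscore-delimited, Avro-safe names. A minimal Python sketch of that renaming (the mapping table is abbreviated to a few entries here, and the dot-to-underscore fallback for unmapped keys is my own assumption, not part of the Jolt spec):

```python
# Sketch: flatten dotted HL7 attribute names into Avro-safe,
# underscore-delimited names, mirroring the first Jolt mapping above.
# FLATTEN_MAP is abbreviated; the fallback for unmapped keys is a
# hypothetical convenience, not something the article's flow does.
FLATTEN_MAP = {
    "OBX_1.UserDefinedAccessChecks": "OBX_1_UserDefinedAccessChecks",
    "PID.PatientName.GivenName": "PID_PatientName_GivenName",
    "MSH.MessageControlID": "MSH_MessageControlID",
}

def flatten_record(record):
    """Rename keys per the mapping; unmapped keys fall back to
    replacing dots with underscores."""
    return {FLATTEN_MAP.get(k, k.replace(".", "_")): v
            for k, v in record.items()}

sample = {"PID.PatientName.GivenName": "JOHN",
          "MSH.MessageControlID": "Q1111111111111111111"}
print(flatten_record(sample))
```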
Step 3: Profit
Build a Big Data Environment For Our Data
(Build HDFS Directories)
su hdfs
hdfs dfs -mkdir -p /hl7/hl7-mdm
hdfs dfs -mkdir -p /hl7/hl7-adt
hdfs dfs -mkdir -p /hl7/hl7-orm
hdfs dfs -mkdir -p /hl7/hl7-oru
hdfs dfs -mkdir -p /hl7/json/hl7-mdm
hdfs dfs -mkdir -p /hl7/json/hl7-adt
hdfs dfs -mkdir -p /hl7/json/hl7-orm
hdfs dfs -mkdir -p /hl7/json/hl7-oru
hdfs dfs -mkdir -p /hl7/flat/oru
hdfs dfs -mkdir -p /patientdata
hdfs dfs -chmod -R 777 /hl7
hdfs dfs -chmod -R 777 /patientdata
hdfs dfs -ls -R /hl7
hdfs dfs -ls -R /patientdata
(Build Hive DDL)
CREATE EXTERNAL TABLE IF NOT EXISTS patientdata (PID_SSNNumberPatient INT, email STRING, gender STRING, ip_address STRING,
drug_provider STRING, icd9 STRING, icd9_description STRING, icd9P_proc STRING, icd9_proc_description STRING, user_agent STRING, drug_used STRING)
STORED AS ORC LOCATION '/patientdata'
CREATE EXTERNAL TABLE IF NOT EXISTS hl7oru
(OBX_1 STRUCT<UserDefinedAccessChecks:STRING, ObservationIdentifier:STRUCT<Text:STRING,
Identifier:STRING>,
ReferencesRange:STRING,
Units:STRUCT<NameOfCodingSystem:STRING,
Identifier:STRING,
Text:STRING>,
ObservationSubID:STRING,
NatureOfAbnormalTest:STRING,
SetIDOBX:STRING,
ValueType:STRING,
ObservationValue:STRING>,
OBR_1 STRUCT<OrderingProvider:STRUCT<FamilyName:STRING,
IDNumber:STRING,
GivenName:STRING,
MiddleInitialOrName:STRING>,
UniversalServiceIdentifier:STRUCT<Text:STRING,
Identifier:STRING>,
FillerOrderNumber:STRUCT<EntityIdentifier:STRING>,
PlacerOrderNumber:STRUCT<NamespaceID:STRING,
EntityIdentifier:STRING>,
ResultStatus:STRING,
ObservationDateTime:STRING,
ScheduledDateTime:STRING,
SetIDObservationRequest:STRING,
ResultsRptStatusChngDateTime:STRING>,
MSH STRUCT<MessageControlID:STRING,
SendingApplication:STRUCT<NamespaceID:STRING>,
ReceivingApplication:STRUCT<NamespaceID:STRING>,
ProcessingID:STRUCT<ProcessingID:STRING>,
MessageType:STRUCT<MessageType:STRING,
TriggerEvent:STRING>,
EncodingCharacters:STRING,
VersionID:STRING,
FieldSeparator:STRING>,
PID STRUCT<SSNNumberPatient:STRING,
PatientAccountNumber:STRUCT<ID:STRING>,
DateOfBirth:STRING,
Sex:STRING,
PatientName:STRUCT<GivenName:STRING,
FamilyName:STRING>,
PatientIDInternalID:STRUCT<ID:STRING>>,
PD1 STRUCT<PatientPrimaryCareProviderNameIDNo:STRUCT<IDNumber:STRING,
FamilyName:STRING,
GivenName:STRING,
AssigningAuthority:STRING,
MiddleInitialOrName:STRING>>)
STORED AS ORC
LOCATION '/hl7/hl7-oru'
CREATE EXTERNAL TABLE IF NOT EXISTS hl7_oru_flat
(OBX_1_UserDefinedAccessChecks STRING,
OBR_1_OrderingProvider_FamilyName STRING,
MSH_MessageControlID STRING,
OBX_1_ObservationIdentifier_Text STRING,
MSH_SendingApplication_NamespaceID STRING,
OBR_1_UniversalServiceIdentifier_Text STRING,
MSH_ReceivingApplication_NamespaceID STRING,
MSH_ProcessingID_ProcessingID STRING,
PID_SSNNumberPatient STRING,
OBR_1_FillerOrderNumber_EntityIdentifier STRING,
PID_PatientAccountNumber_ID STRING,
PID_DateOfBirth STRING,
PD1_PatientPrimaryCareProviderNameIDNo_IDNumber STRING,
PID_Sex STRING,
MSH_MessageType_MessageType STRING,
OBX_1_ReferencesRange STRING,
OBR_1_OrderingProvider_IDNumber STRING,
PD1_PatientPrimaryCareProviderNameIDNo_FamilyName STRING,
OBX_1_Units_NameOfCodingSystem STRING,
OBX_1_Units_Identifier STRING,
PID_PatientName_GivenName STRING,
OBX_1_ObservationSubID STRING,
PD1_PatientPrimaryCareProviderNameIDNo_GivenName STRING,
OBR_1_PlacerOrderNumber_NamespaceID STRING,
MSH_MessageType_TriggerEvent STRING,
PD1_PatientPrimaryCareProviderNameIDNo_AssigningAuthority STRING,
OBR_1_ResultStatus STRING,
PID_PatientName_FamilyName STRING,
MSH_EncodingCharacters STRING,
MSH_VersionID STRING,
OBR_1_UniversalServiceIdentifier_Identifier STRING,
OBR_1_ObservationDateTime STRING,
OBR_1_ScheduledDateTime STRING,
OBX_1_ObservationIdentifier_Identifier STRING,
OBR_1_OrderingProvider_GivenName STRING,
OBR_1_SetIDObservationRequest STRING,
OBR_1_ResultsRptStatusChngDateTime STRING,
OBR_1_PlacerOrderNumber_EntityIdentifier STRING,
OBX_1_NatureOfAbnormalTest STRING,
OBX_1_SetIDOBX STRING,
MSH_FieldSeparator STRING,
PD1_PatientPrimaryCareProviderNameIDNo_MiddleInitialOrName STRING,
OBX_1_Units_Text STRING,
OBX_1_ValueType STRING,
PID_PatientIDInternalID_ID STRING,
OBX_1_ObservationValue STRING,
OBR_1_OrderingProvider_MiddleInitialOrName STRING)
STORED AS ORC
LOCATION '/hl7/flat/oru'
(Build Kafka Topics)
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic patientboth
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic hl7-mdm
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic hl7-adt
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic hl7-orm
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic hl7-oru
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic simple
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic hl7-mdm_avro
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic hl7-adt_avro
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic hl7-orm_avro
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic hl7-oru_avro
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic patientdata
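Every topic is created with the same flags, so the per-topic command lines can be generated rather than typed. A small Python sketch that builds the exact invocations above (the broker path and ZooKeeper host are the ones used in this article and may differ on your cluster):

```python
# Sketch: build the kafka-topics.sh create commands for all topics in
# this article. Paths and hosts match the HDP commands above; adjust
# for your own cluster.
KAFKA_TOPICS = "/usr/hdp/current/kafka-broker/bin/kafka-topics.sh"
TOPICS = ["patientboth", "hl7-mdm", "hl7-adt", "hl7-orm", "hl7-oru",
          "simple", "hl7-mdm_avro", "hl7-adt_avro", "hl7-orm_avro",
          "hl7-oru_avro", "patientdata"]

def create_topic_cmd(topic, zookeeper="localhost:2181"):
    return [KAFKA_TOPICS, "--create", "--zookeeper", zookeeper,
            "--replication-factor", "1", "--partitions", "1",
            "--topic", topic]

for t in TOPICS:
    print(" ".join(create_topic_cmd(t)))
```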
Script to send a File to Kafka
sendmessage.sh
/usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list localhost:6667 --topic hl7-oru < hl7sampledata.txt
HBase DDL
hbase shell
create 'patient_observations', 'obs'
list
Running a Mosquitto MQTT Broker (OSX)
/usr/local/Cellar/mosquitto/1.4.14_2/sbin/mosquitto --daemon --verbose --port 14162
Removing Unneeded HDFS Files
hdfs dfs -rm -r -f -skipTrash $1
Example Data from Internet
MSH|^~\&|XXXXXX||HealthOrg01||||ORU^R01|Q1111111111111111111|P|2.3|<cr>PID|||000000001||SMITH^JOHN||19700101|M||||||||||999999999999|123456789|<cr>PD1||||1234567890^LAST^FIRST^M^^^^^NPI|<cr>OBR|1|341856649^HNAM_ORDERID|000000000000000000|648088^Basic Metabolic Panel|||20150101000100|||||||||1620^Johnson^John^R||||||20150101000100|||M|||||||||||20150101000100|<cr>OBX|1|NM|GLU^Glucose Lvl|159|mg/dL|65-99^65^99|H|||F|||20150101000100|
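The pipe-delimited structure of that sample is easy to verify outside NiFi. A quick Python sketch that splits the message into segments (the literal `<cr>` markers in the sample stand in for carriage returns; only three of the segments are reproduced here):

```python
# Sketch: split the sample HL7 v2.3 ORU message into segments and
# fields. The literal "<cr>" string marks the segment separator that
# would normally be a carriage return. Abbreviated to MSH/PID/OBX.
msg = ('MSH|^~\\&|XXXXXX||HealthOrg01||||ORU^R01|Q1111111111111111111|P|2.3|<cr>'
       'PID|||000000001||SMITH^JOHN||19700101|M||||||||||999999999999|123456789|<cr>'
       'OBX|1|NM|GLU^Glucose Lvl|159|mg/dL|65-99^65^99|H|||F|||20150101000100|')

segments = {}
for seg in msg.split('<cr>'):
    fields = seg.split('|')
    segments[fields[0]] = fields

print(segments['PID'][5])   # patient name components: SMITH^JOHN
print(segments['OBX'][4])   # observation value: 159
```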
See: Handling HL7 Records and Storing in Apache Hive for SQL Queries, Part 2: C-CDA Data and a Custom Avro Processor
Code Section
Map<String, String> attributes = flowFile.getAttributes();
Map<String, String> attributesClean = new HashMap<>();
String tempKey = "";
for (Map.Entry<String, String> entry : attributes.entrySet()) {
    tempKey = entry.getKey().replaceFirst("[^A-Za-z]", "");
    tempKey = tempKey.replaceAll("[^A-Za-z0-9_]", "");
    attributesClean.put(tempKey, entry.getValue());
    session.removeAttribute(flowFile, entry.getKey());
}
session.putAllAttributes(flowFile, attributesClean);
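The same cleaning logic, expressed in Python so it can be tested quickly outside NiFi (this mirrors the two regex passes in the Java snippet above: drop the first non-letter character, then strip anything that is not alphanumeric or underscore):

```python
import re

def clean_attribute_name(name):
    # Mirror the AttributeCleaner logic above: replaceFirst removes
    # the first non-letter character, replaceAll strips everything
    # that is not alphanumeric or underscore (Avro-safe).
    name = re.sub(r'[^A-Za-z]', '', name, count=1)
    return re.sub(r'[^A-Za-z0-9_]', '', name)

print(clean_attribute_name('problemSection.act_01.observation.code'))
```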
Apache NiFi FlowFile
hl7new.xml
Next Sections:
https://community.hortonworks.com/articles/149891/handling-hl7-records-and-storing-in-apache-hive-fo.html
https://community.hortonworks.com/content/kbentry/150026/hl7-processing-part-3-apache-zeppelin-sql-bi-and-a.html
https://community.hortonworks.com/articles/149982/hl7-ingest-part-4-streaming-analytics-manager-and.html
Important Starter Articles:
https://community.hortonworks.com/articles/138249/nifi-in-healthcare-ingesting-hl7-data-in-nifi.html
https://community.hortonworks.com/articles/20318/visualize-patients-complaints-to-their-doctors-usi.html
https://community.hortonworks.com/articles/149891/handling-hl7-records-and-storing-in-apache-hive-fo.html
References:
https://hortonworks.com/tutorial/real-time-event-processing-in-nifi-sam-schema-registry-and-superset/
https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-hl7-nar/1.4.0/org.apache.nifi.processors.hl7.RouteHL7/index.html
https://community.hortonworks.com/articles/20318/visualize-patients-complaints-to-their-doctors-usi.html
https://github.com/cqframework/clinical_quality_language
https://github.com/apache/nifi/blob/master/nifi-commons/nifi-hl7-query-language/src/test/java/org/apache/nifi/hl7/query/TestHL7Query.java
https://stackoverflow.com/questions/43251660/apache-nifi-routehl7-issue/43263583
https://gist.github.com/alopresto/9b46011185efd0380e6c5da0a3412e8f
12-01-2017
09:33 PM
2 Kudos
We need to ingest C-CDA files, which often contain awkward field names. Compounding the naming issues, the Extract C-CDA processor flattens the XML into attribute names with periods, which are not allowed in Apache Avro names. Apache NiFi DataFlow
Ingest C-CDA XML Files
Extract C-CDA Attributes
Route on Attribute
AttributeCleaner - my new custom one
AttributeToJSON
SetSchema
QueryRecord
ConvertAvroToORC
PutHDFS
AttributeCleanerProcessor is my new processor to rename all the attributes. This is a very simple version. These are the converted attribute names. Above are the JSON fields with values. Route when the code field is not null; if the code is active, then convert to Apache ORC for Hive.
SQL Table
CREATE EXTERNAL TABLE IF NOT EXISTS ccda (problemSectionact_02observationproblemStatuscodecodeSystemName STRING, vitalSignsSectionorganizerobservations_05idsroot STRING, problemSectionact_02observationproblemStatusstatusCodecode STRING, vitalSignsSectionorganizerobservations_01texttext_01value STRING, vitalSignsSectionorganizerobservations_04texttext_01value STRING, vitalSignsSectionorganizercodecodeSystem STRING, vitalSignsSectionorganizerobservations_01valuesvalue STRING, vitalSignsSectionorganizerobservations_04valuesvalue STRING, problemSectionact_03observationidroot STRING, problemSectionact_02codecodeSystem STRING, vitalSignsSectionorganizerobservations_05effectiveTimevalue STRING, RouteOnAttributeRoute STRING, vitalSignsSectionorganizerobservations_03codecode STRING, vitalSignsSectionorganizerobservations_04statusCodecode STRING, problemSectionidroot STRING, codecode STRING, problemSectionact_01effectiveTimelow STRING, problemSectioncodecodeSystemName STRING, codedisplayName STRING, problemSectionact_01observationstatusCodecode STRING, vitalSignsSectionorganizerobservations_01idsroot STRING, vitalSignsSectionorganizerobservations_02idsroot STRING, vitalSignsSectionorganizerobservations_04idsroot STRING, vitalSignsSectionorganizerobservations_03idsroot STRING, problemSectionact_01observationeffectiveTimelow STRING, filecreationTime STRING, problemSectionact_01observationproblemStatusvaluesdisplayName STRING, problemSectionact_02observationvaluestranslationscode STRING, problemSectionact_03statusCodecode STRING,
problemSectionact_03observationvaluestranslationscodeSystem STRING, problemSectionact_02idroot STRING, problemSectionact_03codecode STRING, problemSectionact_01observationidroot STRING, problemSectionact_02observationvaluestranslationscodeSystem STRING, problemSectionact_03observationproblemStatuscodecodeSystem STRING, problemSectionact_03observationproblemStatusvaluescodeSystemName STRING, vitalSignsSectionorganizercodecode STRING, vitalSignsSectionorganizerobservations_02statusCodecode STRING, problemSectionact_03observationvaluestranslationsdisplayName STRING, vitalSignsSectionorganizerobservations_03codecodeSystem STRING, problemSectionact_03observationproblemStatuscodecode STRING, problemSectionact_01observationproblemStatusstatusCodecode STRING, vitalSignsSectionorganizerobservations_04textreferencevalue STRING, filelastAccessTime STRING, vitalSignsSectionorganizerobservations_01codedisplayName STRING, filesize STRING, problemSectioncodecodeSystem STRING, vitalSignsSectionorganizerobservations_01valuesunit STRING, vitalSignsSectionorganizerobservations_02effectiveTimevalue STRING, vitalSignsSectionorganizerobservations_05idsextension STRING, vitalSignsSectionorganizerobservations_04codecode STRING, vitalSignsSectionorganizerobservations_05valuesvalue STRING, vitalSignsSectionorganizerobservations_04idsextension STRING, vitalSignsSectionorganizerobservations_02valuesvalue STRING, problemSectionact_02observationproblemStatusvaluescodeSystem STRING, vitalSignsSectionorganizerobservations_02idsextension STRING, vitalSignsSectionorganizerobservations_03idsextension STRING, vitalSignsSectionorganizerobservations_01idsextension STRING, problemSectiontitle STRING, vitalSignsSectionorganizerobservations_01codecodeSystemName STRING, problemSectionact_03observationvaluestranslationsoriginalTextreferencevalue STRING, vitalSignsSectionorganizerobservations_04valuesunit STRING, problemSectionact_02idextension STRING, vitalSignsSectionorganizerobservations_05statusCodecode 
STRING, vitalSignsSectionorganizerobservations_04effectiveTimevalue STRING, problemSectionact_02observationvaluestranslationscodeSystemName STRING, vitalSignsSectionorganizerobservations_03valuesvalue STRING, vitalSignsSectionorganizereffectiveTimevalue STRING, problemSectionact_03observationvaluestranslationscode STRING, vitalSignsSectionorganizerobservations_03codedisplayName STRING, vitalSignsSectionorganizerobservations_02texttext_01value STRING, vitalSignsSectionorganizerobservations_05texttext_01value STRING, absolutepath STRING, vitalSignsSectioncodedisplayName STRING, problemSectionact_03idextension STRING, problemSectionact_01observationvaluestranslationsoriginalTextreferencevalue STRING, problemSectionact_02observationvaluestranslationsoriginalTextreferencevalue STRING, filelastModifiedTime STRING, problemSectioncodecode STRING, vitalSignsSectionorganizeridsroot STRING, problemSectionact_02observationproblemStatuscodecodeSystem STRING, vitalSignsSectionorganizerobservations_05codecodeSystem STRING, filegroup STRING, problemSectionact_01observationproblemStatusvaluescode STRING, problemSectionact_01observationvaluestranslationsdisplayName STRING, problemSectionact_02codecode STRING, idextension STRING, vitalSignsSectioncodecode STRING, problemSectionact_03observationproblemStatuscodecodeSystemName STRING, problemSectionact_01idroot STRING, vitalSignsSectiontitle STRING, problemSectionact_01observationproblemStatuscodecodeSystemName STRING, vitalSignsSectionorganizerobservations_03valuesunit STRING, vitalSignsSectionorganizerobservations_01textreferencevalue STRING, effectiveTime STRING, vitalSignsSectionorganizerobservations_03codecodeSystemName STRING, problemSectionact_03observationstatusCodecode STRING, problemSectionact_02statusCodecode STRING, problemSectionact_02observationidextension STRING, problemSectionact_01idextension STRING, vitalSignsSectionorganizerstatusCodecode STRING, vitalSignsSectionorganizerobservations_05codedisplayName STRING, 
vitalSignsSectionorganizerobservations_04codecodeSystem STRING, vitalSignsSectionorganizerobservations_02codedisplayName STRING, problemSectionact_01observationvaluestranslationscodeSystemName STRING, vitalSignsSectionorganizerobservations_05codecode STRING, vitalSignsSectionorganizerobservations_04codecodeSystemName STRING, problemSectionact_02observationvaluestranslationsdisplayName STRING, idroot STRING, vitalSignsSectionorganizerobservations_02textreferencevalue STRING, problemSectionact_01observationidextension STRING, problemSectionact_01observationvaluestranslationscodeSystem STRING, problemSectionact_01codecode STRING, problemSectionact_02observationproblemStatusvaluesdisplayName STRING, problemSectionact_01codecodeSystem STRING, codecodeSystemName STRING, vitalSignsSectionorganizerobservations_01effectiveTimevalue STRING, vitalSignsSectionorganizercodedisplayName STRING, vitalSignsSectionorganizerobservations_02codecodeSystemName STRING, vitalSignsSectionorganizerobservations_03textreferencevalue STRING, vitalSignsSectionorganizerobservations_02valuesunit STRING, problemSectionact_03observationproblemStatusvaluesdisplayName STRING, problemSectionact_02observationproblemStatuscodecode STRING, vitalSignsSectionorganizerobservations_03statusCodecode STRING, problemSectionact_03observationproblemStatusvaluescode STRING, problemSectionact_02observationeffectiveTimelow STRING, problemSectionact_03observationvaluestranslationscodeSystemName STRING, vitalSignsSectionorganizerobservations_03texttext_01value STRING, problemSectionact_01observationproblemStatuscodecodeSystem STRING, problemSectionact_03observationproblemStatusvaluescodeSystem STRING, vitalSignsSectionorganizercodecodeSystemName STRING, problemSectionact_03observationidextension STRING, vitalSignsSectionorganizerobservations_01codecode STRING, codecodeSystem STRING, problemSectionact_02effectiveTimelow STRING, problemSectioncodedisplayName STRING, problemSectionact_02observationproblemStatusvaluescode 
STRING, vitalSignsSectionorganizeridsextension STRING, problemSectionact_02observationstatusCodecode STRING, vitalSignsSectionorganizerobservations_02codecode STRING, title STRING, problemSectionact_03idroot STRING, problemSectionidextension STRING, problemSectionact_03observationproblemStatusstatusCodecode STRING, problemSectionact_03effectiveTimelow STRING, problemSectionact_02observationproblemStatusvaluescodeSystemName STRING, fileowner STRING, vitalSignsSectionorganizerobservations_01statusCodecode STRING, vitalSignsSectionorganizerobservations_05textreferencevalue STRING, filepermissions STRING, vitalSignsSectionorganizerobservations_02codecodeSystem STRING, vitalSignsSectionorganizerobservations_05valuesunit STRING, problemSectionact_01observationvaluestranslationscode STRING, problemSectionact_01statusCodecode STRING, vitalSignsSectionorganizerobservations_05codecodeSystemName STRING, problemSectionact_03codecodeSystem STRING, vitalSignsSectioncodecodeSystem STRING, problemSectionact_01observationproblemStatusvaluescodeSystemName STRING, vitalSignsSectioncodecodeSystemName STRING, problemSectionact_01observationproblemStatuscodecode STRING, problemSectionact_02observationidroot STRING, vitalSignsSectionorganizerobservations_01codecodeSystem STRING, problemSectionact_01observationproblemStatusvaluescodeSystem STRING, vitalSignsSectionorganizerobservations_03effectiveTimevalue STRING, vitalSignsSectionorganizerobservations_04codedisplayName STRING, problemSectionact_03observationeffectiveTimelow STRING) STORED AS ORC LOCATION '/ccda'
AttributeCleaner
Apache Avro names can't have spaces, dots, dashes, or other odd symbols, so we remove them, along with the periods created when the processor flattens out the XML.
Dirty Name: problem.section.act_01...
Clean Safe Name: problemSectionact_01observationvaluestranslationsoriginalTextreferencevalue
Source Code for Custom Processor
https://github.com/tspannhw/nifi-attributecleaner-processor
References
http://calcite.apache.org/docs/reference.html
Apache NiFi Flow
c-cda-ingest-with-custom-processor.xml
03-26-2018
02:00 PM
Is a Kerberized server supported by the LivySessionController?
11-17-2017
03:28 PM
4 Kudos
For this edge use case we are using NVIDIA's TensorRT as well as Apache MXNet. From TensorRT I am using imageNet for image recognition and detectNet for object localization.
For Apache MXNet, I am using its image classifier, so we have multiple deep learning frameworks running on the same capture from an attached USB webcam. For this example I am using a Logitech HD1080; the Jetson TX1 supports six or more concurrent high-end cameras for more demanding use cases. NVIDIA also offers the more powerful Jetson TX2, with more RAM and a better GPU, for more intense workloads.
Quick Hardware Breakdown
NVIDIA Maxwell™ GPU with 256 NVIDIA® CUDA® Cores; 4 GB LPDDR4 memory
Python Script
# 2017 load pictures and analyze
# https://github.com/tspannhw/mxnet_rpi/blob/master/analyze.py
import time
import sys
import datetime
import subprocess
import urllib2
import os
import traceback
import math
import random, string
import base64
import json
from time import gmtime, strftime
import mxnet as mx
import inception_predict
import numpy as np
import cv2
start = time.time()
cap = cv2.VideoCapture(0)
packet_size=3000
def randomword(length):
    return ''.join(random.choice(string.lowercase) for i in range(length))
#while True:
# Create unique image name
uniqueid = 'mxnet_uuid_{0}_{1}'.format(randomword(3),strftime("%Y%m%d%H%M%S",gmtime()))
ret, frame = cap.read()
imgdir = 'images/'
filename = 'tx1_image_{0}_{1}.jpg'.format(randomword(3),strftime("%Y%m%d%H%M%S",gmtime()))
cv2.imwrite(imgdir + filename, frame)
# Run inception prediction on image
try:
    topn = inception_predict.predict_from_local_file(imgdir + filename, N=5)
except:
    errorcondition = "true"
# CPU Temp
f = open("/sys/devices/virtual/thermal/thermal_zone1/temp","r")
cputemp = str( f.readline() )
cputemp = cputemp.replace('\n','')
cputemp = cputemp.strip()
cputemp = str(round(float(cputemp)) / 1000)
cputempf = str(round(9.0/5.0 * float(cputemp) + 32))
f.close()
# GPU Temp
f = open("/sys/devices/virtual/thermal/thermal_zone2/temp","r")
gputemp = str( f.readline() )
gputemp = gputemp.replace('\n','')
gputemp = gputemp.strip()
gputemp = str(round(float(gputemp)) / 1000)
gputempf = str(round(9.0/5.0 * float(gputemp) + 32))
f.close()
# Face Detect
p = os.popen('/media/nvidia/96ed93f9-7c40-4999-85ba-3eb24262d0a5/jetson-inference-master/build/aarch64/bin/facedetect.sh ' + filename).read()
face = p.replace('\n','|')
face = face.strip()
# NVidia Image Net Classify
p2 = os.popen('/media/nvidia/96ed93f9-7c40-4999-85ba-3eb24262d0a5/jetson-inference-master/build/aarch64/bin/runclassify.sh ' + filename).read()
imagenet = p2.replace('\n','|')
imagenet = imagenet.strip()
# 5 MXNET Analysis
top1 = str(topn[0][1])
top1pct = str(round(topn[0][0],3) * 100)
top2 = str(topn[1][1])
top2pct = str(round(topn[1][0],3) * 100)
top3 = str(topn[2][1])
top3pct = str(round(topn[2][0],3) * 100)
top4 = str(topn[3][1])
top4pct = str(round(topn[3][0],3) * 100)
top5 = str(topn[4][1])
top5pct = str(round(topn[4][0],3) * 100)
end = time.time()
# face[-4096:]
row = { 'uuid': uniqueid, 'top1pct': top1pct, 'top1': top1, 'top2pct': top2pct, 'top2': top2,'top3pct': top3pct, 'top3': top3,'top4pct': top4pct,'top4': top4, 'top5pct': top5pct,'top5': top5, 'cputemp': cputemp, 'gputemp': gputemp, 'imagefilename': filename, 'gputempf': gputempf, 'cputempf': cputempf, 'runtime': str(round(end - start)), 'facedetect': face, 'imagenet': imagenet }
json_string = json.dumps(row)
print (json_string )
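The thermal zone files read above report millidegrees Celsius, so the script divides by 1000 and then converts to Fahrenheit. That conversion can be sanity-checked on its own:

```python
# Sketch: the same millidegree -> Celsius -> Fahrenheit conversion the
# script applies to /sys/devices/virtual/thermal/thermal_zone*/temp.
def to_celsius(raw_millideg):
    # e.g. the kernel reports "21500" for 21.5 C
    return round(float(raw_millideg)) / 1000

def to_fahrenheit(celsius):
    return round(9.0 / 5.0 * float(celsius) + 32)

c = to_celsius("21500\n".strip())
print(c, to_fahrenheit(c))  # prints 21.5 71
```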
Setup Jetson TX1 for Deep Learning and Computer Vision
sudo apt-get update -y
sudo apt-get -y install git build-essential libatlas-base-dev libopencv-dev graphviz python-pip
sudo pip install pip --upgrade
sudo pip install setuptools numpy --upgrade
Apache Hive DDL
CREATE EXTERNAL TABLE IF NOT EXISTS jetsonscan
(top3pct STRING, uuid STRING, top1pct STRING, top5 STRING, top4 STRING, top3 STRING, top2 STRING, top1 STRING, top4pct STRING, facedetect STRING, gputempf STRING, gputemp STRING, top5pct STRING, top2pct STRING, cputemp STRING, imagenet STRING, runtime STRING, imagefilename STRING, cputempf STRING) STORED AS ORC
LOCATION '/jetsonscan'
Build Apache MiNiFi Configuration
minifi-toolkit-0.2.0/bin/config.sh transform $1 config.yml
scp config.yml nvidia@192.168.1.190:/media/nvidia/96ed93f9-7c40-4999-85ba-3eb24262d0a5/minifi-0.2.0/conf/
Example Output JSON
{
"top3pct" : "6.1",
"uuid" : "mxnet_uuid_pgo_20171110193628",
"top1pct" : "8.3",
"top5" : "n03110669 cornet, horn, trumpet, trump",
"top4" : "n03481172 hammer",
"top3" : "n02787622 banjo",
"top2" : "n02791270 barbershop",
"top1" : "n04487394 trombone",
"top4pct" : "4.4",
"facedetect" : "networks/facenet-120/snapshot_iter_24000.caffemodel initialized.|[cuda] cudaAllocMapped 16 bytes, CPU 0x1013a0000 GPU 0x1013a0000|maximum bounding boxes: 3136|[cuda] cudaAllocMapped 50176 bytes, CPU 0x1012a6200 GPU 0x1012a6200|[cuda] cudaAllocMapped 12544 bytes, CPU 0x1011a1a00 GPU 0x1011a1a00|failed to load image /media/nvidia/96ed93f9-7c40-4999-85ba-3eb24262d0a5/images/tx1_image_xmv_20171110193629.jpg|failed to load image '/media/nvidia/96ed93f9-7c40-4999-85ba-3eb24262d0a5/images/tx1_image_xmv_20171110193629.jpg'|",
"gputempf" : "68.0",
"gputemp" : "20.0",
"top5pct" : "3.2",
"top2pct" : "6.4",
"cputemp" : "21.5",
"imagenet" : "imagenet-console| args (3): 0 [/media/nvidia/96ed93f9-7c40-4999-85ba-3eb24262d0a5/jetson-inference-master/build/aarch64/bin/imagenet-console] 1 [/media/nvidia/96ed93f9-7c40-4999-85ba-3eb24262d0a5/images/tx1_image_xmv_20171110193629.jpg] 2 [/media/nvidia/96ed93f9-7c40-4999-85ba-3eb24262d0a5/images/cfout-tx1_image_xmv_20171110193629.jpg] |||imageNet -- loading classification network model from:| -- prototxt networks/googlenet.prototxt| -- model networks/bvlc_googlenet.caffemodel| -- class_labels networks/ilsvrc12_synset_words.txt| -- input_blob 'data'| -- output_blob 'prob'| -- batch_size 2||[GIE] attempting to open cache file networks/bvlc_googlenet.caffemodel.2.tensorcache|[GIE] loading network profile from cache... networks/bvlc_googlenet.caffemodel.2.tensorcache|[GIE] platform has FP16 support.|[GIE] networks/bvlc_googlenet.caffemodel loaded|[GIE] CUDA engine context initialized with 2 bindings|[GIE] networks/bvlc_googlenet.caffemodel input binding index: 0|[GIE] networks/bvlc_googlenet.caffemodel input dims (b=2 c=3 h=224 w=224) size=1204224|[cuda] cudaAllocMapped 1204224 bytes, CPU 0x100ce0000 GPU 0x100ce0000|[GIE] networks/bvlc_googlenet.caffemodel output 0 prob binding index: 1|[GIE] networks/bvlc_googlenet.caffemodel output 0 prob dims (b=2 c=1000 h=1 w=1) size=8000|[cuda] cudaAllocMapped 8000 bytes, CPU 0x100e20000 GPU 0x100e20000|networks/bvlc_googlenet.caffemodel initialized.|[GIE] networks/bvlc_googlenet.caffemodel loaded|imageNet -- loaded 1000 class info entries|networks/bvlc_googlenet.caffemodel initialized.|failed to load image /media/nvidia/96ed93f9-7c40-4999-85ba-3eb24262d0a5/images/tx1_image_xmv_20171110193629.jpg|failed to load image '/media/nvidia/96ed93f9-7c40-4999-85ba-3eb24262d0a5/images/tx1_image_xmv_20171110193629.jpg'|",
"runtime" : "8.0",
"imagefilename" : "tx1_image_xmv_20171110193629.jpg",
"cputempf" : "71.0"
}
Schema (Put this in Hortonworks Schema Registry) - MXRECORD
{ "type" : "record", "name" : "MXRECORD", "fields" :
[ { "name" : "top3pct", "type" : "string", "doc" : "Type inferred from '\"5.0\"'" },
{ "name" : "uuid", "type" : "string", "doc" : "Type inferred from '\"mxnet_uuid_ltu_20171110193847\"'" },
{ "name" : "top1pct", "type" : "string", "doc" : "Type inferred from '\"5.4\"'" },
{ "name" : "top5", "type" : "string", "doc" : "Type inferred from '\"n03970156 plunger, plumber's helper\"'" },
{ "name" : "top4", "type" : "string", "doc" : "Type inferred from '\"n07615774 ice lolly, lolly, lollipop, popsicle\"'" },
{ "name" : "top3", "type" : "string", "doc" : "Type inferred from '\"n04270147 spatula\"'" },
{ "name" : "top2", "type" : "string", "doc" : "Type inferred from '\"n03110669 cornet, horn, trumpet, trump\"'" },
{ "name" : "top1", "type" : "string", "doc" : "Type inferred from '\"n04487394 trombone\"'" },
{ "name" : "top4pct", "type" : "string", "doc" : "Type inferred from '\"4.5\"'" },
{ "name" : "facedetect", "type" : "string" },
{ "name" : "gputempf", "type" : "string", "doc" : "Type inferred from '\"68.0\"'" },
{ "name" : "gputemp", "type" : "string", "doc" : "Type inferred from '\"20.0\"'" },
{ "name" : "top5pct", "type" : "string", "doc" : "Type inferred from '\"4.4\"'" },
{ "name" : "top2pct", "type" : "string", "doc" : "Type inferred from '\"5.3\"'" },
{ "name" : "cputemp", "type" : "string", "doc" : "Type inferred from '\"23.0\"'" },
{ "name" : "imagenet", "type" : "string" },
{ "name" : "runtime", "type" : "string", "doc" : "Type inferred from '\"8.0\"'" },
{ "name" : "imagefilename", "type" : "string", "doc" : "Type inferred from '\"tx1_image_okg_20171110193848.jpg\"'" },
{ "name" : "cputempf", "type" : "string", "doc" : "Type inferred from '\"73.0\"'" }
]
}
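As a quick sanity check outside of NiFi, the schema above can be read with plain Python to confirm that an incoming record carries every declared field. This is a minimal stdlib sketch using a trimmed copy of the schema (only a few of the fields, and the helper name is my own); it is not part of the original flow:

```python
import json

# Trimmed copy of the MXRECORD Avro schema registered in Schema Registry
schema_json = """
{ "type": "record", "name": "MXRECORD", "fields": [
  { "name": "uuid", "type": "string" },
  { "name": "cputemp", "type": "string" },
  { "name": "gputemp", "type": "string" },
  { "name": "imagefilename", "type": "string" }
] }
"""

def missing_fields(record: dict, schema: dict) -> list:
    """Return the names of schema fields absent from the record."""
    return [f["name"] for f in schema["fields"] if f["name"] not in record]

schema = json.loads(schema_json)
sample = {"uuid": "mxnet_uuid_ltu_20171110193847", "cputemp": "21.5",
          "gputemp": "20.0", "imagefilename": "tx1_image_xmv_20171110193629.jpg"}
print(missing_fields(sample, schema))  # → []
```

A record missing a field (say, `gputemp`) would show up immediately, which is handy when the device script changes and the registry schema has not caught up yet.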
Example Apache MiniFi Logs
2017-11-10 15:13:53,061 INFO [Provenance Maintenance Thread-3] o.a.n.p.PersistentProvenanceRepository Created new Provenance Event Writers for events starting with ID 51004
2017-11-10 15:13:53,084 INFO [Provenance Repository Rollover Thread-1] o.a.n.p.lucene.SimpleIndexManager Index Writer for provenance_repository/index-1503524885000 has been returned to Index Manager and is no longer in use. Closing Index Writer
2017-11-10 15:13:53,086 INFO [Provenance Repository Rollover Thread-1] o.a.n.p.PersistentProvenanceRepository Successfully merged 16 journal files (6 records) into single Provenance Log File provenance_repository/50998.prov in 28 milliseconds
2017-11-10 15:13:53,087 INFO [Provenance Repository Rollover Thread-1] o.a.n.p.PersistentProvenanceRepository Successfully Rolled over Provenance Event file containing 70 records. In the past 5 minutes, 29 events have been written to the Provenance Repository, totaling 18.54 KB
2017-11-10 15:14:08,531 INFO [Http Site-to-Site PeerSelector] o.apache.nifi.remote.client.PeerSelector org.apache.nifi.remote.client.PeerSelector@60bcd09e Successfully refreshed Peer Status; remote instance consists of 1 peers
2017-11-10 15:14:38,658 WARN [ExecuteProcess c216f845-1839-3f3c-0000-000000000000 Task] o.a.n.processors.standard.ExecuteProcess ExecuteProcess[id=c216f845-1839-3f3c-0000-000000000000] [15:14:38] src/nnvm/legacy_json_util.cc:190: Loading symbol saved by previous version v0.8.0. Attempting to upgrade...
2017-11-10 15:14:38,665 WARN [ExecuteProcess c216f845-1839-3f3c-0000-000000000000 Task] o.a.n.processors.standard.ExecuteProcess ExecuteProcess[id=c216f845-1839-3f3c-0000-000000000000] [15:14:38] src/nnvm/legacy_json_util.cc:198: Symbol successfully upgraded!
2017-11-10 15:14:38,716 WARN [ExecuteProcess c216f845-1839-3f3c-0000-000000000000 Task] o.a.n.processors.standard.ExecuteProcess ExecuteProcess[id=c216f845-1839-3f3c-0000-000000000000] /media/nvidia/96ed93f9-7c40-4999-85ba-3eb24262d0a5/mxnet/python/mxnet/module/base_module.py:65: UserWarning: Data provided by label_shapes don't match names specified by label_names ([] vs. ['softmax_label'])
2017-11-10 15:14:38,717 WARN [ExecuteProcess c216f845-1839-3f3c-0000-000000000000 Task] o.a.n.processors.standard.ExecuteProcess ExecuteProcess[id=c216f845-1839-3f3c-0000-000000000000] warnings.warn(msg)
2017-11-10 15:14:38,965 WARN [ExecuteProcess c216f845-1839-3f3c-0000-000000000000 Task] o.a.n.processors.standard.ExecuteProcess ExecuteProcess[id=c216f845-1839-3f3c-0000-000000000000] HIGHGUI ERROR: V4L/V4L2: VIDIOC_S_CROP
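The log lines above follow a consistent layout (timestamp, level, [thread], logger, message). If you want to watch the MiniFi log programmatically, a small sketch like this can split them apart; the regex is inferred from the sample lines themselves, not from any official format guarantee:

```python
import re

# timestamp LEVEL [thread] logger message -- pattern inferred from the sample lines
LOG_RE = re.compile(
    r"^(?P<ts>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3}) "
    r"(?P<level>[A-Z]+) \[(?P<thread>[^\]]+)\] "
    r"(?P<logger>\S+) (?P<message>.*)$"
)

def parse_line(line: str):
    """Return the log line's fields as a dict, or None if it doesn't match."""
    m = LOG_RE.match(line)
    return m.groupdict() if m else None

line = ("2017-11-10 15:14:08,531 INFO [Http Site-to-Site PeerSelector] "
        "o.apache.nifi.remote.client.PeerSelector Successfully refreshed Peer Status")
rec = parse_line(line)
print(rec["level"], "|", rec["thread"])  # → INFO | Http Site-to-Site PeerSelector
```

Continuation lines (stack traces, the `warnings.warn(msg)` line above) will return None, which is usually what you want when counting WARN/ERROR events per minute.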
Resources
https://github.com/tspannhw/nvidiajetsontx1-mxnet
https://developer.nvidia.com/embedded/twodaystoademo
https://github.com/dusty-nv/jetson-inference
https://developer.nvidia.com/tensorrt
https://developer.nvidia.com/embedded/buy/jetson-tx1-devkit
Flow Files
storejetsontx1.xml
jetsontx1mx-10nov2017.xml
10-27-2017
06:15 PM
3 Kudos
If you have not attended a DataWorks Summit, I highly recommend it. It is an amazing event held at three locations a year and is a great community experience. The content is deep and highly technical, and you will learn about the current state of the art and what is coming next. It's not just Big Data, but AI, Streaming, Microservices, Containers, Cloud and many other topics that startups and enterprises alike need to know. My topic was a simple talk on using Apache NiFi to ingest and transform various data types. A small group is forming around my quickly released Inception v3 TensorFlow Apache NiFi Processor; I encourage you to try it and provide feedback, pull requests, bug reports, documentation, unit tests, examples and more. The Java API for TensorFlow is new, so this is really basic. Thanks to @Simon Elliston Ball for a major cleanup on it. https://github.com/tspannhw/nifi-tensorflow-processor
What do we want to do?
- MiniFi ingests camera images and sensor data
- Run TensorFlow Inception v3 to recognize objects in the images
- NiFi stores images, metadata and enriched data in Hadoop
- NiFi ingests social data and feeds
- NiFi analyzes sentiment of the textual data
Options for running TensorFlow:
- TensorFlow (C++, Python, Java) via ExecuteStreamCommand
- TensorFlow NiFi Java custom processor
- TensorFlow running on edge nodes (MiniFi)
- TensorFlow Mobile (iOS, Android, RPi)
- TensorFlow on Spark (Yahoo) via Livy, S2S, Kafka
- TensorFlow running in containers in YARN 3.0 on Hadoop
- gRPC call to TensorFlow Serving (NiFi 1.4)
Example classification run:
python classify_image.py --image_file /dir/solarroofpanel.jpg
solar dish, solar collector, solar furnace (score = 0.98316)
window screen (score = 0.00196)
manhole cover (score = 0.00070)
radiator (score = 0.00041)
doormat, welcome mat (score = 0.00041)
Python setup:
pip install -U textblob
python -m textblob.download_corpora
pip install -U spacy
python -m spacy.en.download all
pip install -U nltk
pip install -U numpy
run.sh:
python sentiment.py "$@"
sentiment.py:
from nltk.sentiment.vader import SentimentIntensityAnalyzer
import sys

# Requires the VADER lexicon: python -m nltk.downloader vader_lexicon
sid = SentimentIntensityAnalyzer()
ss = sid.polarity_scores(sys.argv[1])
print('Compound {0} Negative {1} Neutral {2} Positive {3}'.format(ss['compound'], ss['neg'], ss['neu'], ss['pos']))
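Downstream of the ExecuteStreamCommand call, the single line that sentiment.py prints has to be turned back into numbers before it can be routed or stored. A minimal sketch of that parsing step (the regex and helper name are my own illustration, not part of the original flow):

```python
import re

# Matches the line sentiment.py prints, e.g.:
# Compound 0.6249 Negative 0.0 Neutral 0.5 Positive 0.5
SCORE_RE = re.compile(
    r"Compound (?P<compound>-?[\d.]+) Negative (?P<neg>[\d.]+) "
    r"Neutral (?P<neu>[\d.]+) Positive (?P<pos>[\d.]+)"
)

def parse_sentiment(line: str) -> dict:
    """Turn the printed score line back into floats; empty dict if it doesn't match."""
    m = SCORE_RE.search(line)
    return {k: float(v) for k, v in m.groupdict().items()} if m else {}

scores = parse_sentiment("Compound 0.6249 Negative 0.0 Neutral 0.5 Positive 0.5 ")
print(scores["compound"])  # → 0.6249
```

The same pattern can be used as an ExtractText expression in NiFi if you prefer to keep the parsing inside the flow.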
These are some good Python libraries to be using. I recommend using Python 3.x unless you are stuck with 2.6/2.7. I have also created two processors for working with text/NLP; they are listed below for Apache OpenNLP and Stanford CoreNLP. Please comment in HCC (here), check out GitHub and submit pull requests (https://github.com/tspannhw), and come to a meetup (https://www.meetup.com/futureofdata-princeton/).
References:
https://github.com/tspannhw/dws2017sydney
https://dataworkssummit.com/sydney-2017/sessions/real-time-ingesting-and-transforming-sensor-data-and-social-data-with-nifi-and-tensorflow/
https://www.slideshare.net/Hadoop_Summit/realtime-ingesting-and-transforming-sensor-data-and-social-data-with-nifi-and-tensorflow
https://hortonworks.com/blog/7-sessions-dataworks-summit-sydney-see/
https://community.hortonworks.com/articles/58265/analyzing-images-in-hdf-20-using-tensorflow.html
https://community.hortonworks.com/articles/76935/using-sentiment-analysis-and-nlp-tools-with-hdp-25.html
http://www.nltk.org/install.html
https://github.com/tspannhw/nifi-nlp-processor
https://community.hortonworks.com/articles/80418/open-nlp-example-apache-nifi-processor.html
https://community.hortonworks.com/articles/81270/adding-stanford-corenlp-to-big-data-pipelines-apac-1.html
12-20-2017
06:47 AM
I was going through some OCR-based processing of PDFs. It looks like it works better if each page is split out into a monochrome image. Examples online show Ghostscript as an option. I was able to leverage this processor to extract the images and, with a property change, convert them to grayscale if needed. I could then send them to the OCR processor for extraction. @Jeremy Dyer
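The reply above mentions Ghostscript for splitting a PDF into per-page images. A minimal sketch of how that invocation could be assembled from Python; the device names and flags are from the Ghostscript documentation as I recall them (`pngmono` for 1-bit monochrome, `pnggray` for grayscale), and the paths are hypothetical, so treat this as an assumption rather than the exact command used:

```python
def gs_command(pdf_path: str, dpi: int = 300, device: str = "pngmono") -> list:
    """Build a Ghostscript argv that renders each PDF page to an image.

    device: "pngmono" for 1-bit monochrome, "pnggray" for 8-bit grayscale.
    -o expands to -dBATCH -dNOPAUSE -sOutputFile=..., with %03d as the page number.
    """
    return ["gs", f"-sDEVICE={device}", f"-r{dpi}", "-o", "page_%03d.png", pdf_path]

# Hypothetical input path, just to show the shape of the command
print(" ".join(gs_command("scan.pdf")))
```

From there, `subprocess.run(gs_command("scan.pdf"), check=True)` would produce page_001.png, page_002.png, and so on, ready to hand to the OCR processor.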