1973 Posts · 1225 Kudos Received · 124 Solutions

My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
| | 1914 | 04-03-2024 06:39 AM |
| | 3011 | 01-12-2024 08:19 AM |
| | 1643 | 12-07-2023 01:49 PM |
| | 2420 | 08-02-2023 07:30 AM |
| | 3361 | 03-29-2023 01:22 PM |
12-06-2017
04:57 PM
2 Kudos
Some Apache NiFi examples that may be helpful:
- https://community.hortonworks.com/articles/47854/accessing-facebook-page-data-from-apache-nifi.html
- https://community.hortonworks.com/articles/86570/hosting-and-ingesting-data-from-web-pages-desktop.html
- https://community.hortonworks.com/content/kbentry/65239/mp3-jukebox-with-nifi-1x.html
- https://community.hortonworks.com/articles/65154/nifi-1x-for-automatic-music-playing-pipelines.html
- https://community.hortonworks.com/articles/73355/adding-a-custom-processor-to-nifi-linkprocessor.html
12-06-2017
04:51 PM
Apache NiFi can read from sFTP and then use the PutHDFS processor to put that raw file in an HDFS directory. Two boxes, one line, no code. 5 minutes of work.
12-06-2017
04:07 PM
- https://community.hortonworks.com/articles/149910/handling-hl7-records-part-1-hl7-ingest.html
- https://community.hortonworks.com/articles/149891/handling-hl7-records-and-storing-in-apache-hive-fo.html
- https://community.hortonworks.com/articles/149982/hl7-ingest-part-4-streaming-analytics-manager-and.html
- https://community.hortonworks.com/articles/150026/hl7-processing-part-3-apache-zeppelin-sql-bi-and-a.html

Attribute Name Cleaner (needed for messy C-CDA and HL7 attribute names): https://github.com/tspannhw/nifi-attributecleaner-processor
12-06-2017
12:50 PM
1 Kudo
A few things to check:
- Did the OpenSSL version change on the server? https://superuser.com/questions/972996/create-a-sftp-connection-in-netbeans
- Did directory permissions change on the FTP server? Try a different directory.
- Did the NiFi user change?
- Increase the timeout.
- Definitely try command-line sftp from a NiFi node.
- Did any IPs change? https://access.redhat.com/solutions/2071143
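Since the checklist above starts at the transport layer, a quick TCP reachability probe of the SFTP port can rule out network or IP changes before digging into NiFi itself. A minimal sketch (the helper name is hypothetical; adjust host and port to your server):

```python
import socket

def check_sftp_reachable(host, port=22, timeout=5):
    """Return True if a TCP connection to the SFTP port succeeds.

    A False here points at the network, firewall, or server rather
    than at the NiFi processor configuration.
    """
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False
```

Run this from the same NiFi node that hosts the failing processor, since routing and firewall rules can differ per host.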
12-05-2017
12:30 AM
1 Kudo
Overview

We create Apache Hive tables for analytics automagically from DDL generated by Apache NiFi.

Hortonworks SAM Run Time

This flow produces HBase and Druid analytics. SAM has produced two Druid datasources that we can slice up and explore in Superset. In Apache Zeppelin, we can run our Hive DDL (listed below) and all of our queries, including a join of our two patient data tables. We can sort and create some basic graphs. All of this data is available to you in your favorite BI tools.

Generated Apache Hive Tables from ORC Files (DDL)

CREATE EXTERNAL TABLE IF NOT EXISTS patientdata (PID_SSNNumberPatient INT, email STRING, gender STRING, ip_address STRING,
drug_provider STRING, icd9 STRING, icd9_description STRING, icd9P_proc STRING, icd9_proc_description STRING, user_agent STRING, drug_used STRING)
STORED AS ORC LOCATION '/patientdata';
CREATE EXTERNAL TABLE IF NOT EXISTS ccda (problemSectionact_02observationproblemStatuscodecodeSystemName STRING, vitalSignsSectionorganizerobservations_05idsroot STRING, problemSectionact_02observationproblemStatusstatusCodecode STRING, vitalSignsSectionorganizerobservations_01texttext_01value STRING, vitalSignsSectionorganizerobservations_04texttext_01value STRING, vitalSignsSectionorganizercodecodeSystem STRING, vitalSignsSectionorganizerobservations_01valuesvalue STRING, vitalSignsSectionorganizerobservations_04valuesvalue STRING, problemSectionact_03observationidroot STRING, problemSectionact_02codecodeSystem STRING, vitalSignsSectionorganizerobservations_05effectiveTimevalue STRING, RouteOnAttributeRoute STRING, vitalSignsSectionorganizerobservations_03codecode STRING, vitalSignsSectionorganizerobservations_04statusCodecode STRING, problemSectionidroot STRING, codecode STRING, problemSectionact_01effectiveTimelow STRING, problemSectioncodecodeSystemName STRING, codedisplayName STRING, problemSectionact_01observationstatusCodecode STRING, vitalSignsSectionorganizerobservations_01idsroot STRING, vitalSignsSectionorganizerobservations_02idsroot STRING, vitalSignsSectionorganizerobservations_04idsroot STRING, vitalSignsSectionorganizerobservations_03idsroot STRING, problemSectionact_01observationeffectiveTimelow STRING, filecreationTime STRING, problemSectionact_01observationproblemStatusvaluesdisplayName STRING, problemSectionact_02observationvaluestranslationscode STRING, problemSectionact_03statusCodecode STRING, problemSectionact_03observationvaluestranslationscodeSystem STRING, problemSectionact_02idroot STRING, problemSectionact_03codecode STRING, problemSectionact_01observationidroot STRING, problemSectionact_02observationvaluestranslationscodeSystem STRING, problemSectionact_03observationproblemStatuscodecodeSystem STRING, problemSectionact_03observationproblemStatusvaluescodeSystemName STRING, vitalSignsSectionorganizercodecode STRING, 
vitalSignsSectionorganizerobservations_02statusCodecode STRING, problemSectionact_03observationvaluestranslationsdisplayName STRING, vitalSignsSectionorganizerobservations_03codecodeSystem STRING, problemSectionact_03observationproblemStatuscodecode STRING, problemSectionact_01observationproblemStatusstatusCodecode STRING, vitalSignsSectionorganizerobservations_04textreferencevalue STRING, filelastAccessTime STRING, vitalSignsSectionorganizerobservations_01codedisplayName STRING, filesize STRING, problemSectioncodecodeSystem STRING, vitalSignsSectionorganizerobservations_01valuesunit STRING, vitalSignsSectionorganizerobservations_02effectiveTimevalue STRING, vitalSignsSectionorganizerobservations_05idsextension STRING, vitalSignsSectionorganizerobservations_04codecode STRING, vitalSignsSectionorganizerobservations_05valuesvalue STRING, vitalSignsSectionorganizerobservations_04idsextension STRING, vitalSignsSectionorganizerobservations_02valuesvalue STRING, problemSectionact_02observationproblemStatusvaluescodeSystem STRING, vitalSignsSectionorganizerobservations_02idsextension STRING, vitalSignsSectionorganizerobservations_03idsextension STRING, vitalSignsSectionorganizerobservations_01idsextension STRING, problemSectiontitle STRING, vitalSignsSectionorganizerobservations_01codecodeSystemName STRING, problemSectionact_03observationvaluestranslationsoriginalTextreferencevalue STRING, vitalSignsSectionorganizerobservations_04valuesunit STRING, problemSectionact_02idextension STRING, vitalSignsSectionorganizerobservations_05statusCodecode STRING, vitalSignsSectionorganizerobservations_04effectiveTimevalue STRING, problemSectionact_02observationvaluestranslationscodeSystemName STRING, vitalSignsSectionorganizerobservations_03valuesvalue STRING, vitalSignsSectionorganizereffectiveTimevalue STRING, problemSectionact_03observationvaluestranslationscode STRING, vitalSignsSectionorganizerobservations_03codedisplayName STRING, 
vitalSignsSectionorganizerobservations_02texttext_01value STRING, vitalSignsSectionorganizerobservations_05texttext_01value STRING, absolutepath STRING, vitalSignsSectioncodedisplayName STRING, problemSectionact_03idextension STRING, problemSectionact_01observationvaluestranslationsoriginalTextreferencevalue STRING, problemSectionact_02observationvaluestranslationsoriginalTextreferencevalue STRING, filelastModifiedTime STRING, problemSectioncodecode STRING, vitalSignsSectionorganizeridsroot STRING, problemSectionact_02observationproblemStatuscodecodeSystem STRING, vitalSignsSectionorganizerobservations_05codecodeSystem STRING, filegroup STRING, problemSectionact_01observationproblemStatusvaluescode STRING, problemSectionact_01observationvaluestranslationsdisplayName STRING, problemSectionact_02codecode STRING, idextension STRING, vitalSignsSectioncodecode STRING, problemSectionact_03observationproblemStatuscodecodeSystemName STRING, problemSectionact_01idroot STRING, vitalSignsSectiontitle STRING, problemSectionact_01observationproblemStatuscodecodeSystemName STRING, vitalSignsSectionorganizerobservations_03valuesunit STRING, vitalSignsSectionorganizerobservations_01textreferencevalue STRING, effectiveTime STRING, vitalSignsSectionorganizerobservations_03codecodeSystemName STRING, problemSectionact_03observationstatusCodecode STRING, problemSectionact_02statusCodecode STRING, problemSectionact_02observationidextension STRING, problemSectionact_01idextension STRING, vitalSignsSectionorganizerstatusCodecode STRING, vitalSignsSectionorganizerobservations_05codedisplayName STRING, vitalSignsSectionorganizerobservations_04codecodeSystem STRING, vitalSignsSectionorganizerobservations_02codedisplayName STRING, problemSectionact_01observationvaluestranslationscodeSystemName STRING, vitalSignsSectionorganizerobservations_05codecode STRING, vitalSignsSectionorganizerobservations_04codecodeSystemName STRING, problemSectionact_02observationvaluestranslationsdisplayName STRING, 
idroot STRING, vitalSignsSectionorganizerobservations_02textreferencevalue STRING, problemSectionact_01observationidextension STRING, problemSectionact_01observationvaluestranslationscodeSystem STRING, problemSectionact_01codecode STRING, problemSectionact_02observationproblemStatusvaluesdisplayName STRING, problemSectionact_01codecodeSystem STRING, codecodeSystemName STRING, vitalSignsSectionorganizerobservations_01effectiveTimevalue STRING, vitalSignsSectionorganizercodedisplayName STRING, vitalSignsSectionorganizerobservations_02codecodeSystemName STRING, vitalSignsSectionorganizerobservations_03textreferencevalue STRING, vitalSignsSectionorganizerobservations_02valuesunit STRING, problemSectionact_03observationproblemStatusvaluesdisplayName STRING, problemSectionact_02observationproblemStatuscodecode STRING, vitalSignsSectionorganizerobservations_03statusCodecode STRING, problemSectionact_03observationproblemStatusvaluescode STRING, problemSectionact_02observationeffectiveTimelow STRING, problemSectionact_03observationvaluestranslationscodeSystemName STRING, vitalSignsSectionorganizerobservations_03texttext_01value STRING, problemSectionact_01observationproblemStatuscodecodeSystem STRING, problemSectionact_03observationproblemStatusvaluescodeSystem STRING, vitalSignsSectionorganizercodecodeSystemName STRING, problemSectionact_03observationidextension STRING, vitalSignsSectionorganizerobservations_01codecode STRING, codecodeSystem STRING, problemSectionact_02effectiveTimelow STRING, problemSectioncodedisplayName STRING, problemSectionact_02observationproblemStatusvaluescode STRING, vitalSignsSectionorganizeridsextension STRING, problemSectionact_02observationstatusCodecode STRING, vitalSignsSectionorganizerobservations_02codecode STRING, title STRING, problemSectionact_03idroot STRING, problemSectionidextension STRING, problemSectionact_03observationproblemStatusstatusCodecode STRING, problemSectionact_03effectiveTimelow STRING, 
problemSectionact_02observationproblemStatusvaluescodeSystemName STRING, fileowner STRING, vitalSignsSectionorganizerobservations_01statusCodecode STRING, vitalSignsSectionorganizerobservations_05textreferencevalue STRING, filepermissions STRING, vitalSignsSectionorganizerobservations_02codecodeSystem STRING, vitalSignsSectionorganizerobservations_05valuesunit STRING, problemSectionact_01observationvaluestranslationscode STRING, problemSectionact_01statusCodecode STRING, vitalSignsSectionorganizerobservations_05codecodeSystemName STRING, problemSectionact_03codecodeSystem STRING, vitalSignsSectioncodecodeSystem STRING, problemSectionact_01observationproblemStatusvaluescodeSystemName STRING, vitalSignsSectioncodecodeSystemName STRING, problemSectionact_01observationproblemStatuscodecode STRING, problemSectionact_02observationidroot STRING, vitalSignsSectionorganizerobservations_01codecodeSystem STRING, problemSectionact_01observationproblemStatusvaluescodeSystem STRING, vitalSignsSectionorganizerobservations_03effectiveTimevalue STRING, vitalSignsSectionorganizerobservations_04codedisplayName STRING, problemSectionact_03observationeffectiveTimelow STRING) STORED AS ORC LOCATION '/ccda';
CREATE EXTERNAL TABLE IF NOT EXISTS hl7_oru_flat (OBX_1_UserDefinedAccessChecks STRING, OBR_1_OrderingProvider_FamilyName STRING, MSH_MessageControlID STRING, OBX_1_ObservationIdentifier_Text STRING, MSH_SendingApplication_NamespaceID STRING, OBR_1_UniversalServiceIdentifier_Text STRING, MSH_ReceivingApplication_NamespaceID STRING, MSH_ProcessingID_ProcessingID STRING, PID_SSNNumberPatient STRING, OBR_1_FillerOrderNumber_EntityIdentifier STRING, PID_PatientAccountNumber_ID STRING, PID_DateOfBirth STRING, PD1_PatientPrimaryCareProviderNameIDNo_IDNumber STRING, PID_Sex STRING, MSH_MessageType_MessageType STRING, OBX_1_ReferencesRange STRING, OBR_1_OrderingProvider_IDNumber STRING, PD1_PatientPrimaryCareProviderNameIDNo_FamilyName STRING, OBX_1_Units_NameOfCodingSystem STRING, OBX_1_Units_Identifier STRING, PID_PatientName_GivenName STRING, OBX_1_ObservationSubID STRING, PD1_PatientPrimaryCareProviderNameIDNo_GivenName STRING, OBR_1_PlacerOrderNumber_NamespaceID STRING, MSH_MessageType_TriggerEvent STRING, PD1_PatientPrimaryCareProviderNameIDNo_AssigningAuthority STRING, OBR_1_ResultStatus STRING, PID_PatientName_FamilyName STRING, MSH_EncodingCharacters STRING, MSH_VersionID STRING, OBR_1_UniversalServiceIdentifier_Identifier STRING, OBR_1_ObservationDateTime STRING, OBR_1_ScheduledDateTime STRING, OBX_1_ObservationIdentifier_Identifier STRING, OBR_1_OrderingProvider_GivenName STRING, OBR_1_SetIDObservationRequest STRING, OBR_1_ResultsRptStatusChngDateTime STRING, OBR_1_PlacerOrderNumber_EntityIdentifier STRING, OBX_1_NatureOfAbnormalTest STRING, OBX_1_SetIDOBX STRING, MSH_FieldSeparator STRING, PD1_PatientPrimaryCareProviderNameIDNo_MiddleInitialOrName STRING, OBX_1_Units_Text STRING, OBX_1_ValueType STRING, PID_PatientIDInternalID_ID STRING, OBX_1_ObservationValue STRING, OBR_1_OrderingProvider_MiddleInitialOrName STRING) STORED AS ORC LOCATION '/hl7/flat/oru';
Apache Hive Queries to Run in Apache Zeppelin or Anywhere via JDBC/ODBC

select * from patientdata;
select * from ccda;
select vitalsignssectionorganizerobservations_01valuesvalue, vitalsignssectionorganizerobservations_04valuesvalue, vitalsignssectionorganizerobservations_01texttext_01value
from ccda;
select * from hl7_oru_flat;
select * from hl7_oru_flat h join patientdata p on h.PID_SSNNumberPatient = p.PID_SSNNumberPatient;
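Note that the join key PID_SSNNumberPatient is typed INT in patientdata but STRING in hl7_oru_flat; Hive will implicitly cast, but the key handling is worth being explicit about. The same join logic, sketched in plain Python with hypothetical sample records and function name:

```python
def join_on_ssn(hl7_rows, patient_rows):
    """Inner join of flat HL7 records to patient records on SSN.

    hl7_oru_flat stores the SSN as a STRING while patientdata uses INT,
    so normalize both sides to str before matching (Hive casts implicitly).
    """
    index = {str(p["PID_SSNNumberPatient"]): p for p in patient_rows}
    joined = []
    for h in hl7_rows:
        p = index.get(str(h["PID_SSNNumberPatient"]))
        if p is not None:
            joined.append({**h, **p})
    return joined
```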
Zeppelin Notebook medicalzeppelinjs.txt
12-04-2017
04:12 PM
2 Kudos
Overview

In this section of HL7 processing, we look at what to do next with HL7 data now that it has been converted into Big Data friendly Avro with a schema attached. We can easily combine that data with our other patient data that came from MQTT. We do a simple JOIN on SSN and send the combined data to Kafka with a new schema and to Druid for analytics. We also route some of the data to HDFS and HBase for storage, and another feed to Druid for analytics.

S.A.M. Flow Overview

I have a simple Hortonworks Streaming Analytics Manager application that ingests two Kafka feeds (one for patient data and one for HL7 converted to Avro by Apache NiFi). I join those two records together on a key and push them to Kafka and to Druid.

Our Environment for Running SAM: Just Click What You Need

I added Druid, Apache HBase, HDFS, Apache Hive, Apache Kafka, Apache Storm and Apache ZooKeeper. I am using all except Apache Hive. We can merge the Kafka feeds sent from Apache NiFi. One feed is patient data that started as JSON sent via MQTT from a remote node. The other feed is from another system that sent it via Kafka to Apache NiFi to be converted from HL7 to Avro. In Hortonworks Schema Registry we can see our schemas displayed with version information. To set up a Kafka source, you just follow the dropdowns and name it; no manual effort. The other feed grabs the schema associated with its Kafka topic and displays all the fields and types. For the join, we pick the field to join on, the join type (LEFT), a type of windowing, and the fields we want. Again, no manual effort or typing. Druid is just as easy to add: put in the name for your new datasource and SAM will create it. Finally, we do an aggregate on one of the numeric values: we send the SSN number and the maximum observation value to the existing HBase patient observations table with the column family obs.
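The aggregate step can also be sketched outside SAM. Here is a minimal Python version of a per-SSN max over a batch of flat HL7 records, assuming the observation value parses as a number (field names follow the hl7_oru_flat schema; the function name is hypothetical):

```python
def max_observation_by_ssn(records):
    """Compute the per-SSN maximum observation value, as the SAM
    aggregate does, over a batch of flat HL7 records."""
    maxima = {}
    for r in records:
        ssn = r["PID_SSNNumberPatient"]
        value = float(r["OBX_1_ObservationValue"])  # assumes a numeric value
        maxima[ssn] = max(value, maxima.get(ssn, float("-inf")))
    return maxima
```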
HBase DDL:

create 'patient_observations', 'obs'

The results of this flow: data is added to HBase and to HDFS, and two Druid datasources are populated for Superset analysis.

hbase shell
HBase Shell; enter 'help<RETURN>' for list of supported commands.
Type "exit<RETURN>" to leave the HBase Shell
Version 1.1.2.2.6.2.0-205, r5210d2ed88d7e241646beab51e9ac147a973bdcc, Sat Aug 26 09:33:50 UTC 2017
hbase(main):001:0> list
TABLE
ATLAS_ENTITY_AUDIT_EVENTS
atlas_titan
patient_observations
3 row(s) in 0.3220 seconds
=> ["ATLAS_ENTITY_AUDIT_EVENTS", "atlas_titan", "patient_observations"]
hbase(main):002:0> scan 'patient_observations'
ROW COLUMN+CELL
2ba61288-3c45-4eb4-a0fa-e92e44ef210f column=obs:OBX_1_ObservationValue_MAX, timestamp=1512503796419, value=mg/dL
2ba61288-3c45-4eb4-a0fa-e92e44ef210f column=obs:PID_SSNNumberPatient, timestamp=1512503796419, value=123456789
1 row(s) in 0.2110 seconds
hbase(main):003:0>
hdfs dfs -cat /hl7sam/7-HDFS-8-999-1512490296249.txt
XXXXXX,000000000000000000,999999999999,ORU,M,99,LAST,JOHN,65-99,R,000000001,NM,65,|,1,341856649,1,John,20150101000100,mg/dL,F,20150101000100,20150101000100,2.3,^~\&,M,NPI,R01,HNAM_ORDERID,FIRST,1620,M,SMITH,159,1234567890,19700101,P,HealthOrg01,Basic Metabolic Panel,Glucose Lvl,GLU,H,123456789,Q1111111111111111111,20150101000100,648088,Johnson
See: https://community.hortonworks.com/articles/149910/handling-hl7-records-part-1-hl7-ingest.html

SAM Flow: sam-hl7json.txt

References:
- http://www.openhealthtools.org/
- https://community.hortonworks.com/articles/122077/ingesting-csv-data-and-pushing-it-as-avro-to-kafka.html
12-02-2017
08:07 PM
2 Kudos
Step 1: Collect HL7 Health Records
Python to Send JSON Data to MQTT (Data Generated by https://www.mockaroo.com/)
import paho.mqtt.client as mqtt
import json

# Connect to the MQTT broker (non-standard port 14162 used in this setup)
client = mqtt.Client()
client.connect("localhost", 14162, 60)

# Sample patient record generated by https://www.mockaroo.com/
row = [{"PID_SSNNumberPatient":823456789,"email":"ahospital0@census.gov","gender":"Male","ip_address":"138.135.180.206","drug_provider":"OrchidPharma Inc","icd9":"94140","icd9_description":"Deep necrosis of underlying tissues [deep third degree] without mention of loss of a body part, face and head, unspecified site","icd9P_proc":"7942","icd9_proc_description":"Closed reduction of separated epiphysis, radius and ulna","user_agent":"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_5_8) AppleWebKit/535.1 (KHTML, like Gecko) Chrome/14.0.801.0 Safari/535.1","drug_used":"Naratriptan"}]

# Publish the record as JSON to the patientdata topic
json_string = json.dumps(row)
client.publish("patientdata", payload=json_string, qos=1, retain=False)
client.disconnect()
Step 2: Get the data to Apache NiFi via FTP, sFTP, file, Apache Kafka, MQTT, REST API, TCP/IP, or hundreds of other options.
Schemas
{
"type": "record",
"name": "hl7oru",
"fields": [
{
"name": "OBX_1_UserDefinedAccessChecks",
"type": "string",
"doc": "Type inferred from '\"20150101000100\"'"
},
{
"name": "OBR_1_OrderingProvider_FamilyName",
"type": "string",
"doc": "Type inferred from '\"Johnson\"'"
},
{
"name": "MSH_MessageControlID",
"type": "string",
"doc": "Type inferred from '\"Q1111111111111111111\"'"
},
{
"name": "OBX_1_ObservationIdentifier_Text",
"type": "string",
"doc": "Type inferred from '\"Glucose Lvl\"'"
},
{
"name": "MSH_SendingApplication_NamespaceID",
"type": "string",
"doc": "Type inferred from '\"XXXXXX\"'"
},
{
"name": "OBR_1_UniversalServiceIdentifier_Text",
"type": "string",
"doc": "Type inferred from '\"Basic Metabolic Panel\"'"
},
{
"name": "MSH_ReceivingApplication_NamespaceID",
"type": "string",
"doc": "Type inferred from '\"HealthOrg01\"'"
},
{
"name": "MSH_ProcessingID_ProcessingID",
"type": "string",
"doc": "Type inferred from '\"P\"'"
},
{
"name": "PID_SSNNumberPatient",
"type": "string",
"doc": "Type inferred from '\"123456789\"'"
},
{
"name": "OBR_1_FillerOrderNumber_EntityIdentifier",
"type": "string",
"doc": "Type inferred from '\"000000000000000000\"'"
},
{
"name": "PID_PatientAccountNumber_ID",
"type": "string",
"doc": "Type inferred from '\"999999999999\"'"
},
{
"name": "PID_DateOfBirth",
"type": "string",
"doc": "Type inferred from '\"19700101\"'"
},
{
"name": "PD1_PatientPrimaryCareProviderNameIDNo_IDNumber",
"type": "string",
"doc": "Type inferred from '\"1234567890\"'"
},
{
"name": "PID_Sex",
"type": "string",
"doc": "Type inferred from '\"M\"'"
},
{
"name": "MSH_MessageType_MessageType",
"type": "string",
"doc": "Type inferred from '\"ORU\"'"
},
{
"name": "OBX_1_ReferencesRange",
"type": "string",
"doc": "Type inferred from '\"H\"'"
},
{
"name": "OBR_1_OrderingProvider_IDNumber",
"type": "string",
"doc": "Type inferred from '\"1620\"'"
},
{
"name": "PD1_PatientPrimaryCareProviderNameIDNo_FamilyName",
"type": "string",
"doc": "Type inferred from '\"LAST\"'"
},
{
"name": "OBX_1_Units_NameOfCodingSystem",
"type": "string",
"doc": "Type inferred from '\"99\"'"
},
{
"name": "OBX_1_Units_Identifier",
"type": "string",
"doc": "Type inferred from '\"65-99\"'"
},
{
"name": "PID_PatientName_GivenName",
"type": "string",
"doc": "Type inferred from '\"JOHN\"'"
},
{
"name": "OBX_1_ObservationSubID",
"type": "string",
"doc": "Type inferred from '\"159\"'"
},
{
"name": "PD1_PatientPrimaryCareProviderNameIDNo_GivenName",
"type": "string",
"doc": "Type inferred from '\"FIRST\"'"
},
{
"name": "OBR_1_PlacerOrderNumber_NamespaceID",
"type": "string",
"doc": "Type inferred from '\"HNAM_ORDERID\"'"
},
{
"name": "MSH_MessageType_TriggerEvent",
"type": "string",
"doc": "Type inferred from '\"R01\"'"
},
{
"name": "PD1_PatientPrimaryCareProviderNameIDNo_AssigningAuthority",
"type": "string",
"doc": "Type inferred from '\"NPI\"'"
},
{
"name": "OBR_1_ResultStatus",
"type": "string",
"doc": "Type inferred from '\"M\"'"
},
{
"name": "PID_PatientName_FamilyName",
"type": "string",
"doc": "Type inferred from '\"SMITH\"'"
},
{
"name": "MSH_EncodingCharacters",
"type": "string",
"doc": "Type inferred from '\"^~\\&\"'"
},
{
"name": "MSH_VersionID",
"type": "string",
"doc": "Type inferred from '\"2.3\"'"
},
{
"name": "OBR_1_UniversalServiceIdentifier_Identifier",
"type": "string",
"doc": "Type inferred from '\"648088\"'"
},
{
"name": "OBR_1_ObservationDateTime",
"type": "string",
"doc": "Type inferred from '\"20150101000100\"'"
},
{
"name": "OBR_1_ScheduledDateTime",
"type": "string",
"doc": "Type inferred from '\"20150101000100\"'"
},
{
"name": "OBX_1_ObservationIdentifier_Identifier",
"type": "string",
"doc": "Type inferred from '\"GLU\"'"
},
{
"name": "OBR_1_OrderingProvider_GivenName",
"type": "string",
"doc": "Type inferred from '\"John\"'"
},
{
"name": "OBR_1_SetIDObservationRequest",
"type": "string",
"doc": "Type inferred from '\"1\"'"
},
{
"name": "OBR_1_ResultsRptStatusChngDateTime",
"type": "string",
"doc": "Type inferred from '\"20150101000100\"'"
},
{
"name": "OBR_1_PlacerOrderNumber_EntityIdentifier",
"type": "string",
"doc": "Type inferred from '\"341856649\"'"
},
{
"name": "OBX_1_NatureOfAbnormalTest",
"type": "string",
"doc": "Type inferred from '\"F\"'"
},
{
"name": "OBX_1_SetIDOBX",
"type": "string",
"doc": "Type inferred from '\"1\"'"
},
{
"name": "MSH_FieldSeparator",
"type": "string",
"doc": "Type inferred from '\"|\"'"
},
{
"name": "PD1",
"type": {
"type": "record",
"name": "PD1",
"fields": [
{
"name": "PatientPrimaryCareProviderNameIDNo_MiddleInitialOrName",
"type": "string",
"doc": "Type inferred from '\"M\"'"
}
]
},
"doc": "Type inferred from '{\"PatientPrimaryCareProviderNameIDNo_MiddleInitialOrName\":\"M\"}'"
},
{
"name": "OBX_1_Units_Text",
"type": "string",
"doc": "Type inferred from '\"65\"'"
},
{
"name": "OBX_1_ValueType",
"type": "string",
"doc": "Type inferred from '\"NM\"'"
},
{
"name": "PID_PatientIDInternalID_ID",
"type": "string",
"doc": "Type inferred from '\"000000001\"'"
},
{
"name": "OBX_1_ObservationValue",
"type": "string",
"doc": "Type inferred from '\"mg/dL\"'"
},
{
"name": "OBR_1_OrderingProvider_MiddleInitialOrName",
"type": "string",
"doc": "Type inferred from '\"R\"'"
}
]
}
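Since every field in this schema is a string, a lightweight pre-publish check can catch missing or mistyped fields before a record reaches Kafka. A sketch using a trimmed copy of the hl7oru schema above (only three of its fields; the function name is hypothetical):

```python
# Trimmed copy of the hl7oru Avro schema above, enough to show how the
# field list can drive a record check.
SCHEMA = {
    "type": "record",
    "name": "hl7oru",
    "fields": [
        {"name": "MSH_MessageControlID", "type": "string"},
        {"name": "PID_SSNNumberPatient", "type": "string"},
        {"name": "OBX_1_ObservationValue", "type": "string"},
    ],
}

def validate(record):
    """Return the names of fields that are missing or not strings.

    Works because this schema declares every field as "type": "string";
    an empty list means the record matches the trimmed schema.
    """
    problems = []
    for field in SCHEMA["fields"]:
        if not isinstance(record.get(field["name"]), str):
            problems.append(field["name"])
    return problems
```

For production use, a full Avro library would also enforce field order and encoding; this only checks presence and type.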
Attachments: patientbothjson.txt, patientdatajs.txt, simple.txt
Jolt Scripts
{
"OBX_1.UserDefinedAccessChecks": "OBX_1_UserDefinedAccessChecks",
"OBR_1.OrderingProvider.FamilyName": "OBR_1_OrderingProvider_FamilyName",
"MSH.MessageControlID": "MSH_MessageControlID",
"OBX_1.ObservationIdentifier.Text": "OBX_1_ObservationIdentifier_Text",
"MSH.SendingApplication.NamespaceID": "MSH_SendingApplication_NamespaceID",
"OBR_1.UniversalServiceIdentifier.Text": "OBR_1_UniversalServiceIdentifier_Text",
"MSH.ReceivingApplication.NamespaceID": "MSH_ReceivingApplication_NamespaceID",
"MSH.ProcessingID.ProcessingID": "MSH_ProcessingID_ProcessingID",
"PID.SSNNumberPatient": "PID_SSNNumberPatient",
"OBR_1.FillerOrderNumber.EntityIdentifier": "OBR_1_FillerOrderNumber_EntityIdentifier",
"PID.PatientAccountNumber.ID": "PID_PatientAccountNumber_ID",
"PID.DateOfBirth": "PID_DateOfBirth",
"PD1.PatientPrimaryCareProviderNameIDNo.IDNumber": "PD1_PatientPrimaryCareProviderNameIDNo_IDNumber",
"PID.Sex": "PID_Sex",
"MSH.MessageType.MessageType": "MSH_MessageType_MessageType",
"OBX_1.ReferencesRange": "OBX_1_ReferencesRange",
"OBR_1.OrderingProvider.IDNumber": "OBR_1_OrderingProvider_IDNumber",
"PD1.PatientPrimaryCareProviderNameIDNo.FamilyName": "PD1_PatientPrimaryCareProviderNameIDNo_FamilyName",
"OBX_1.Units.NameOfCodingSystem": "OBX_1_Units_NameOfCodingSystem",
"OBX_1.Units.Identifier": "OBX_1_Units_Identifier",
"PID.PatientName.GivenName": "PID_PatientName_GivenName",
"OBX_1.ObservationSubID": "OBX_1_ObservationSubID",
"PD1.PatientPrimaryCareProviderNameIDNo.GivenName": "PD1_PatientPrimaryCareProviderNameIDNo_GivenName",
"OBR_1.PlacerOrderNumber.NamespaceID": "OBR_1_PlacerOrderNumber_NamespaceID",
"MSH.MessageType.TriggerEvent": "MSH_MessageType_TriggerEvent",
"PD1.PatientPrimaryCareProviderNameIDNo.AssigningAuthority": "PD1_PatientPrimaryCareProviderNameIDNo_AssigningAuthority",
"OBR_1.ResultStatus": "OBR_1_ResultStatus",
"PID.PatientName.FamilyName": "PID_PatientName_FamilyName",
"MSH.EncodingCharacters": "MSH_EncodingCharacters",
"MSH.VersionID": "MSH_VersionID",
"OBR_1.UniversalServiceIdentifier.Identifier": "OBR_1_UniversalServiceIdentifier_Identifier",
"OBR_1.ObservationDateTime": "OBR_1_ObservationDateTime",
"OBR_1.ScheduledDateTime": "OBR_1_ScheduledDateTime",
"OBX_1.ObservationIdentifier.Identifier": "OBX_1_ObservationIdentifier_Identifier",
"OBR_1.OrderingProvider.GivenName": "OBR_1_OrderingProvider_GivenName",
"OBR_1.SetIDObservationRequest": "OBR_1_SetIDObservationRequest",
"OBR_1.ResultsRptStatusChngDateTime": "OBR_1_ResultsRptStatusChngDateTime",
"OBR_1.PlacerOrderNumber.EntityIdentifier": "OBR_1_PlacerOrderNumber_EntityIdentifier",
"OBX_1.NatureOfAbnormalTest": "OBX_1_NatureOfAbnormalTest",
"OBX_1.SetIDOBX": "OBX_1_SetIDOBX",
"MSH.FieldSeparator": "MSH_FieldSeparator",
"PD1.PatientPrimaryCareProviderNameIDNo.MiddleInitialOrName": "PD1_PatientPrimaryCareProviderNameIDNo_MiddleInitialOrName",
"OBX_1.Units.Text": "OBX_1_Units_Text",
"OBX_1.ValueType": "OBX_1_ValueType",
"PID.PatientIDInternalID.ID": "PID_PatientIDInternalID_ID",
"OBX_1.ObservationValue": "OBX_1_ObservationValue",
"OBR_1.OrderingProvider.MiddleInitialOrName": "OBR_1_OrderingProvider_MiddleInitialOrName"
}
{
"OBX_1.UserDefinedAccessChecks": "OBX_1.UserDefinedAccessChecks",
"OBR_1.OrderingProvider.FamilyName": "OBR_1.OrderingProvider.FamilyName",
"MSH.MessageControlID": "MSH.MessageControlID",
"OBX_1.ObservationIdentifier.Text": "OBX_1.ObservationIdentifier.Text",
"MSH.SendingApplication.NamespaceID": "MSH.SendingApplication.NamespaceID",
"OBR_1.UniversalServiceIdentifier.Text": "OBR_1.UniversalServiceIdentifier.Text",
"MSH.ReceivingApplication.NamespaceID": "MSH.ReceivingApplication.NamespaceID",
"MSH.ProcessingID.ProcessingID": "MSH.ProcessingID.ProcessingID",
"PID.SSNNumberPatient": "PID.SSNNumberPatient",
"OBR_1.FillerOrderNumber.EntityIdentifier": "OBR_1.FillerOrderNumber.EntityIdentifier",
"PID.PatientAccountNumber.ID": "PID.PatientAccountNumber.ID",
"PID.DateOfBirth": "PID.DateOfBirth",
"PD1.PatientPrimaryCareProviderNameIDNo.IDNumber": "PD1.PatientPrimaryCareProviderNameIDNo.IDNumber",
"PID.Sex": "PID.Sex",
"MSH.MessageType.MessageType": "MSH.MessageType.MessageType",
"OBX_1.ReferencesRange": "OBX_1.ReferencesRange",
"OBR_1.OrderingProvider.IDNumber": "OBR_1.OrderingProvider.IDNumber",
"PD1.PatientPrimaryCareProviderNameIDNo.FamilyName": "PD1.PatientPrimaryCareProviderNameIDNo.FamilyName",
"OBX_1.Units.NameOfCodingSystem": "OBX_1.Units.NameOfCodingSystem",
"OBX_1.Units.Identifier": "OBX_1.Units.Identifier",
"PID.PatientName.GivenName": "PID.PatientName.GivenName",
"OBX_1.ObservationSubID": "OBX_1.ObservationSubID",
"PD1.PatientPrimaryCareProviderNameIDNo.GivenName": "PD1.PatientPrimaryCareProviderNameIDNo.GivenName",
"OBR_1.PlacerOrderNumber.NamespaceID": "OBR_1.PlacerOrderNumber.NamespaceID",
"MSH.MessageType.TriggerEvent": "MSH.MessageType.TriggerEvent",
"PD1.PatientPrimaryCareProviderNameIDNo.AssigningAuthority": "PD1.PatientPrimaryCareProviderNameIDNo.AssigningAuthority",
"OBR_1.ResultStatus": "OBR_1.ResultStatus",
"PID.PatientName.FamilyName": "PID.PatientName.FamilyName",
"MSH.EncodingCharacters": "MSH.EncodingCharacters",
"MSH.VersionID": "MSH.VersionID",
"OBR_1.UniversalServiceIdentifier.Identifier": "OBR_1.UniversalServiceIdentifier.Identifier",
"OBR_1.ObservationDateTime": "OBR_1.ObservationDateTime",
"OBR_1.ScheduledDateTime": "OBR_1.ScheduledDateTime",
"OBX_1.ObservationIdentifier.Identifier": "OBX_1.ObservationIdentifier.Identifier",
"OBR_1.OrderingProvider.GivenName": "OBR_1.OrderingProvider.GivenName",
"OBR_1.SetIDObservationRequest": "OBR_1.SetIDObservationRequest",
"OBR_1.ResultsRptStatusChngDateTime": "OBR_1.ResultsRptStatusChngDateTime",
"OBR_1.PlacerOrderNumber.EntityIdentifier": "OBR_1.PlacerOrderNumber.EntityIdentifier",
"OBX_1.NatureOfAbnormalTest": "OBX_1.NatureOfAbnormalTest",
"OBX_1.SetIDOBX": "OBX_1.SetIDOBX",
"MSH.FieldSeparator": "MSH.FieldSeparator",
"PD1.PatientPrimaryCareProviderNameIDNo.MiddleInitialOrName": "PD1.PatientPrimaryCareProviderNameIDNo.MiddleInitialOrName",
"OBX_1.Units.Text": "OBX_1.Units.Text",
"OBX_1.ValueType": "OBX_1.ValueType",
"PID.PatientIDInternalID.ID": "PID.PatientIDInternalID.ID",
"OBX_1.ObservationValue": "OBX_1.ObservationValue",
"OBR_1.OrderingProvider.MiddleInitialOrName": "OBR_1.OrderingProvider.MiddleInitialOrName"
}
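The first Jolt spec above simply renames dotted HL7 paths to underscore-separated flat names (matching the hl7_oru_flat columns), while the second passes the keys through unchanged. Jolt runs inside NiFi's JoltTransformJSON processor; the equivalent flattening logic in plain Python, for reference (hypothetical function name):

```python
def flatten_keys(record):
    """Rename dotted HL7 keys (e.g. "OBR_1.OrderingProvider.FamilyName")
    to the underscore-separated flat names used by the hl7_oru_flat table."""
    return {key.replace(".", "_"): value for key, value in record.items()}
```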
Step 3: Profit
Build a Big Data Environment For Our Data
(Build HDFS Directories)
su hdfs
hdfs dfs -mkdir -p /hl7/hl7-mdm
hdfs dfs -mkdir -p /hl7/hl7-adt
hdfs dfs -mkdir -p /hl7/hl7-orm
hdfs dfs -mkdir -p /hl7/hl7-oru
hdfs dfs -mkdir -p /hl7/json/hl7-mdm
hdfs dfs -mkdir -p /hl7/json/hl7-adt
hdfs dfs -mkdir -p /hl7/json/hl7-orm
hdfs dfs -mkdir -p /hl7/json/hl7-oru
hdfs dfs -mkdir -p /hl7/flat/oru
hdfs dfs -mkdir -p /patientdata
hdfs dfs -chmod -R 777 /hl7
hdfs dfs -chmod -R 777 /patientdata
hdfs dfs -ls -R /hl7
hdfs dfs -ls -R /patientdata
(Build Hive DDL)
CREATE EXTERNAL TABLE IF NOT EXISTS patientdata (PID_SSNNumberPatient INT, email STRING, gender STRING, ip_address STRING,
drug_provider STRING, icd9 STRING, icd9_description STRING, icd9P_proc STRING, icd9_proc_description STRING, user_agent STRING, drug_used STRING)
STORED AS ORC LOCATION '/patientdata';
CREATE EXTERNAL TABLE IF NOT EXISTS hl7oru
(OBX_1 STRUCT<UserDefinedAccessChecks:STRING, ObservationIdentifier:STRUCT<Text:STRING,
Identifier:STRING>,
ReferencesRange:STRING,
Units:STRUCT<NameOfCodingSystem:STRING,
Identifier:STRING,
Text:STRING>,
ObservationSubID:STRING,
NatureOfAbnormalTest:STRING,
SetIDOBX:STRING,
ValueType:STRING,
ObservationValue:STRING>,
OBR_1 STRUCT<OrderingProvider:STRUCT<FamilyName:STRING,
IDNumber:STRING,
GivenName:STRING,
MiddleInitialOrName:STRING>,
UniversalServiceIdentifier:STRUCT<Text:STRING,
Identifier:STRING>,
FillerOrderNumber:STRUCT<EntityIdentifier:STRING>,
PlacerOrderNumber:STRUCT<NamespaceID:STRING,
EntityIdentifier:STRING>,
ResultStatus:STRING,
ObservationDateTime:STRING,
ScheduledDateTime:STRING,
SetIDObservationRequest:STRING,
ResultsRptStatusChngDateTime:STRING>,
MSH STRUCT<MessageControlID:STRING,
SendingApplication:STRUCT<NamespaceID:STRING>,
ReceivingApplication:STRUCT<NamespaceID:STRING>,
ProcessingID:STRUCT<ProcessingID:STRING>,
MessageType:STRUCT<MessageType:STRING,
TriggerEvent:STRING>,
EncodingCharacters:STRING,
VersionID:STRING,
FieldSeparator:STRING>,
PID STRUCT<SSNNumberPatient:STRING,
PatientAccountNumber:STRUCT<ID:STRING>,
DateOfBirth:STRING,
Sex:STRING,
PatientName:STRUCT<GivenName:STRING,
FamilyName:STRING>,
PatientIDInternalID:STRUCT<ID:STRING>>,
PD1 STRUCT<PatientPrimaryCareProviderNameIDNo:STRUCT<IDNumber:STRING,
FamilyName:STRING,
GivenName:STRING,
AssigningAuthority:STRING,
MiddleInitialOrName:STRING>>)
STORED AS ORC
LOCATION '/hl7/hl7-oru'
CREATE EXTERNAL TABLE IF NOT EXISTS hl7_oru_flat
(OBX_1_UserDefinedAccessChecks STRING,
OBR_1_OrderingProvider_FamilyName STRING,
MSH_MessageControlID STRING,
OBX_1_ObservationIdentifier_Text STRING,
MSH_SendingApplication_NamespaceID STRING,
OBR_1_UniversalServiceIdentifier_Text STRING,
MSH_ReceivingApplication_NamespaceID STRING,
MSH_ProcessingID_ProcessingID STRING,
PID_SSNNumberPatient STRING,
OBR_1_FillerOrderNumber_EntityIdentifier STRING,
PID_PatientAccountNumber_ID STRING,
PID_DateOfBirth STRING,
PD1_PatientPrimaryCareProviderNameIDNo_IDNumber STRING,
PID_Sex STRING,
MSH_MessageType_MessageType STRING,
OBX_1_ReferencesRange STRING,
OBR_1_OrderingProvider_IDNumber STRING,
PD1_PatientPrimaryCareProviderNameIDNo_FamilyName STRING,
OBX_1_Units_NameOfCodingSystem STRING,
OBX_1_Units_Identifier STRING,
PID_PatientName_GivenName STRING,
OBX_1_ObservationSubID STRING,
PD1_PatientPrimaryCareProviderNameIDNo_GivenName STRING,
OBR_1_PlacerOrderNumber_NamespaceID STRING,
MSH_MessageType_TriggerEvent STRING,
PD1_PatientPrimaryCareProviderNameIDNo_AssigningAuthority STRING,
OBR_1_ResultStatus STRING,
PID_PatientName_FamilyName STRING,
MSH_EncodingCharacters STRING,
MSH_VersionID STRING,
OBR_1_UniversalServiceIdentifier_Identifier STRING,
OBR_1_ObservationDateTime STRING,
OBR_1_ScheduledDateTime STRING,
OBX_1_ObservationIdentifier_Identifier STRING,
OBR_1_OrderingProvider_GivenName STRING,
OBR_1_SetIDObservationRequest STRING,
OBR_1_ResultsRptStatusChngDateTime STRING,
OBR_1_PlacerOrderNumber_EntityIdentifier STRING,
OBX_1_NatureOfAbnormalTest STRING,
OBX_1_SetIDOBX STRING,
MSH_FieldSeparator STRING,
PD1_PatientPrimaryCareProviderNameIDNo_MiddleInitialOrName STRING,
OBX_1_Units_Text STRING,
OBX_1_ValueType STRING,
PID_PatientIDInternalID_ID STRING,
OBX_1_ObservationValue STRING,
OBR_1_OrderingProvider_MiddleInitialOrName STRING)
STORED AS ORC
LOCATION '/hl7/flat/oru'
(Build Kafka Topics)
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic patientboth
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic hl7-mdm
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic hl7-adt
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic hl7-orm
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic hl7-oru
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic simple
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic hl7-mdm_avro
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic hl7-adt_avro
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic hl7-orm_avro
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic hl7-oru_avro
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic patientdata
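The near-identical topic-creation commands can be generated from a topic list. A small sketch assuming the HDP script path and single-node ZooKeeper settings shown above:

```python
# HDP script path taken from the commands above.
KAFKA_TOPICS_SH = "/usr/hdp/current/kafka-broker/bin/kafka-topics.sh"

base_topics = ["patientboth", "hl7-mdm", "hl7-adt", "hl7-orm", "hl7-oru",
               "simple", "patientdata"]
# Each HL7 topic also gets an Avro twin, as in the listing above.
topics = base_topics + [t + "_avro" for t in base_topics if t.startswith("hl7-")]

def create_cmd(topic, zookeeper="localhost:2181"):
    """Build one kafka-topics.sh --create command line."""
    return (f"{KAFKA_TOPICS_SH} --create --zookeeper {zookeeper} "
            f"--replication-factor 1 --partitions 1 --topic {topic}")

commands = [create_cmd(t) for t in topics]
```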
Script to send a File to Kafka
sendmessage.sh:
/usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list localhost:6667 --topic hl7-oru < hl7sampledata.txt
HBase DDL
hbase shell
create 'patient_observations', 'obs'
list
Running a Mosquitto MQTT Broker (OSX)
/usr/local/Cellar/mosquitto/1.4.14_2/sbin/mosquitto --daemon --verbose --port 14162
Removing Unneeded HDFS Files
hdfs dfs -rm -r -f -skipTrash $1
Example Data from Internet
MSH|^~\&|XXXXXX||HealthOrg01||||ORU^R01|Q1111111111111111111|P|2.3|<cr>PID|||000000001||SMITH^JOHN||19700101|M||||||||||999999999999|123456789|<cr>PD1||||1234567890^LAST^FIRST^M^^^^^NPI|<cr>OBR|1|341856649^HNAM_ORDERID|000000000000000000|648088^Basic Metabolic Panel|||20150101000100|||||||||1620^Johnson^John^R||||||20150101000100|||M|||||||||||20150101000100|<cr>OBX|1|NM|GLU^Glucose Lvl|159|mg/dL|65-99^65^99|H|||F|||20150101000100|
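For a quick look at the sample above without a cluster, the segments can be split by hand. Real flows should use NiFi's HL7 processors (backed by the HAPI library) rather than hand parsing; this sketch uses a trimmed copy of the sample message, with "<cr>" marking segment boundaries as above:

```python
# Trimmed copy of the sample message above (PD1 and OBR segments omitted).
sample = (
    "MSH|^~\\&|XXXXXX||HealthOrg01||||ORU^R01|Q1111111111111111111|P|2.3|<cr>"
    "PID|||000000001||SMITH^JOHN||19700101|M||||||||||999999999999|123456789|<cr>"
    "OBX|1|NM|GLU^Glucose Lvl|159|mg/dL|65-99^65^99|H|||F|||20150101000100|"
)

def parse_segments(message):
    """Index each segment by its ID; fields are pipe separated."""
    segments = {}
    for seg in message.split("<cr>"):
        fields = seg.split("|")
        segments[fields[0]] = fields
    return segments

segs = parse_segments(sample)
patient_name = segs["PID"][5]   # PID-5 is the patient name
observation = segs["OBX"][4]    # the glucose value lands in field index 4 in this sample
```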
See: Handling HL7 Records and Storing in Apache Hive for SQL Queries, Part 2 - C-CDA Data and Custom Avro Processor
Code Section
Map<String, String> attributes = flowFile.getAttributes();
Map<String, String> attributesClean = new HashMap<>();
String tempKey = "";
for (Map.Entry<String, String> entry : attributes.entrySet()){
    tempKey = entry.getKey().replaceFirst("[^A-Za-z]", "");
    tempKey = tempKey.replaceAll("[^A-Za-z0-9_]", "");
    attributesClean.put(tempKey, entry.getValue());
    // FlowFiles are immutable; keep the FlowFile returned by each mutation
    flowFile = session.removeAttribute(flowFile, entry.getKey());
}
flowFile = session.putAllAttributes(flowFile, attributesClean);
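For reference, here is the same two-pass cleaning logic in Python, handy for testing attribute names outside NiFi (a sketch mirroring the Java above; the function name is mine):

```python
import re

def clean_attribute_name(key):
    # Mirrors Java replaceFirst("[^A-Za-z]", ""): drop the first non-letter character.
    key = re.sub(r"[^A-Za-z]", "", key, count=1)
    # Mirrors Java replaceAll("[^A-Za-z0-9_]", ""): drop every remaining illegal character.
    return re.sub(r"[^A-Za-z0-9_]", "", key)
```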
Apache NiFi FlowFile
hl7new.xml
Next Sections:
https://community.hortonworks.com/articles/149891/handling-hl7-records-and-storing-in-apache-hive-fo.html
https://community.hortonworks.com/content/kbentry/150026/hl7-processing-part-3-apache-zeppelin-sql-bi-and-a.html
https://community.hortonworks.com/articles/149982/hl7-ingest-part-4-streaming-analytics-manager-and.html
Important Starter Articles
https://community.hortonworks.com/articles/138249/nifi-in-healthcare-ingesting-hl7-data-in-nifi.html
https://community.hortonworks.com/articles/20318/visualize-patients-complaints-to-their-doctors-usi.html
https://community.hortonworks.com/articles/149891/handling-hl7-records-and-storing-in-apache-hive-fo.html
References:
https://hortonworks.com/tutorial/real-time-event-processing-in-nifi-sam-schema-registry-and-superset/
https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-hl7-nar/1.4.0/org.apache.nifi.processors.hl7.RouteHL7/index.html
https://community.hortonworks.com/articles/20318/visualize-patients-complaints-to-their-doctors-usi.html
https://github.com/cqframework/clinical_quality_language
https://github.com/apache/nifi/blob/master/nifi-commons/nifi-hl7-query-language/src/test/java/org/apache/nifi/hl7/query/TestHL7Query.java
https://stackoverflow.com/questions/43251660/apache-nifi-routehl7-issue/43263583
https://gist.github.com/alopresto/9b46011185efd0380e6c5da0a3412e8f
12-01-2017
09:33 PM
2 Kudos
We need to ingest C-CDA files, which often contain awkward element names. Compounding the naming issues, the Extract C-CDA Attributes processor flattens the XML into names with periods, which are not allowed in Apache Avro names.
Apache NiFi DataFlow
Ingest C-CDA XML Files
Extract C-CDA Attributes
Route on Attribute
AttributeCleaner - my new custom one
AttributeToJSON
SetSchema
QueryRecord
ConvertAvroToORC
PutHDFS
AttributeCleanerProcessor is my new processor to rename all the attributes. This is a very simple version. (Screenshots: the converted field names, and the JSON fields with values.) Route when the code field is not null; if code is active, then convert to Apache ORC for Hive.
SQL Table
CREATE EXTERNAL TABLE IF NOT EXISTS ccda (problemSectionact_02observationproblemStatuscodecodeSystemName STRING, vitalSignsSectionorganizerobservations_05idsroot STRING, problemSectionact_02observationproblemStatusstatusCodecode STRING, vitalSignsSectionorganizerobservations_01texttext_01value STRING, vitalSignsSectionorganizerobservations_04texttext_01value STRING, vitalSignsSectionorganizercodecodeSystem STRING, vitalSignsSectionorganizerobservations_01valuesvalue STRING, vitalSignsSectionorganizerobservations_04valuesvalue STRING, problemSectionact_03observationidroot STRING, problemSectionact_02codecodeSystem STRING, vitalSignsSectionorganizerobservations_05effectiveTimevalue STRING, RouteOnAttributeRoute STRING, vitalSignsSectionorganizerobservations_03codecode STRING, vitalSignsSectionorganizerobservations_04statusCodecode STRING, problemSectionidroot STRING, codecode STRING, problemSectionact_01effectiveTimelow STRING, problemSectioncodecodeSystemName STRING, codedisplayName STRING, problemSectionact_01observationstatusCodecode STRING, vitalSignsSectionorganizerobservations_01idsroot STRING, vitalSignsSectionorganizerobservations_02idsroot STRING, vitalSignsSectionorganizerobservations_04idsroot STRING, vitalSignsSectionorganizerobservations_03idsroot STRING, problemSectionact_01observationeffectiveTimelow STRING, filecreationTime STRING, problemSectionact_01observationproblemStatusvaluesdisplayName STRING, problemSectionact_02observationvaluestranslationscode STRING, problemSectionact_03statusCodecode STRING,
problemSectionact_03observationvaluestranslationscodeSystem STRING, problemSectionact_02idroot STRING, problemSectionact_03codecode STRING, problemSectionact_01observationidroot STRING, problemSectionact_02observationvaluestranslationscodeSystem STRING, problemSectionact_03observationproblemStatuscodecodeSystem STRING, problemSectionact_03observationproblemStatusvaluescodeSystemName STRING, vitalSignsSectionorganizercodecode STRING, vitalSignsSectionorganizerobservations_02statusCodecode STRING, problemSectionact_03observationvaluestranslationsdisplayName STRING, vitalSignsSectionorganizerobservations_03codecodeSystem STRING, problemSectionact_03observationproblemStatuscodecode STRING, problemSectionact_01observationproblemStatusstatusCodecode STRING, vitalSignsSectionorganizerobservations_04textreferencevalue STRING, filelastAccessTime STRING, vitalSignsSectionorganizerobservations_01codedisplayName STRING, filesize STRING, problemSectioncodecodeSystem STRING, vitalSignsSectionorganizerobservations_01valuesunit STRING, vitalSignsSectionorganizerobservations_02effectiveTimevalue STRING, vitalSignsSectionorganizerobservations_05idsextension STRING, vitalSignsSectionorganizerobservations_04codecode STRING, vitalSignsSectionorganizerobservations_05valuesvalue STRING, vitalSignsSectionorganizerobservations_04idsextension STRING, vitalSignsSectionorganizerobservations_02valuesvalue STRING, problemSectionact_02observationproblemStatusvaluescodeSystem STRING, vitalSignsSectionorganizerobservations_02idsextension STRING, vitalSignsSectionorganizerobservations_03idsextension STRING, vitalSignsSectionorganizerobservations_01idsextension STRING, problemSectiontitle STRING, vitalSignsSectionorganizerobservations_01codecodeSystemName STRING, problemSectionact_03observationvaluestranslationsoriginalTextreferencevalue STRING, vitalSignsSectionorganizerobservations_04valuesunit STRING, problemSectionact_02idextension STRING, vitalSignsSectionorganizerobservations_05statusCodecode 
STRING, vitalSignsSectionorganizerobservations_04effectiveTimevalue STRING, problemSectionact_02observationvaluestranslationscodeSystemName STRING, vitalSignsSectionorganizerobservations_03valuesvalue STRING, vitalSignsSectionorganizereffectiveTimevalue STRING, problemSectionact_03observationvaluestranslationscode STRING, vitalSignsSectionorganizerobservations_03codedisplayName STRING, vitalSignsSectionorganizerobservations_02texttext_01value STRING, vitalSignsSectionorganizerobservations_05texttext_01value STRING, absolutepath STRING, vitalSignsSectioncodedisplayName STRING, problemSectionact_03idextension STRING, problemSectionact_01observationvaluestranslationsoriginalTextreferencevalue STRING, problemSectionact_02observationvaluestranslationsoriginalTextreferencevalue STRING, filelastModifiedTime STRING, problemSectioncodecode STRING, vitalSignsSectionorganizeridsroot STRING, problemSectionact_02observationproblemStatuscodecodeSystem STRING, vitalSignsSectionorganizerobservations_05codecodeSystem STRING, filegroup STRING, problemSectionact_01observationproblemStatusvaluescode STRING, problemSectionact_01observationvaluestranslationsdisplayName STRING, problemSectionact_02codecode STRING, idextension STRING, vitalSignsSectioncodecode STRING, problemSectionact_03observationproblemStatuscodecodeSystemName STRING, problemSectionact_01idroot STRING, vitalSignsSectiontitle STRING, problemSectionact_01observationproblemStatuscodecodeSystemName STRING, vitalSignsSectionorganizerobservations_03valuesunit STRING, vitalSignsSectionorganizerobservations_01textreferencevalue STRING, effectiveTime STRING, vitalSignsSectionorganizerobservations_03codecodeSystemName STRING, problemSectionact_03observationstatusCodecode STRING, problemSectionact_02statusCodecode STRING, problemSectionact_02observationidextension STRING, problemSectionact_01idextension STRING, vitalSignsSectionorganizerstatusCodecode STRING, vitalSignsSectionorganizerobservations_05codedisplayName STRING, 
vitalSignsSectionorganizerobservations_04codecodeSystem STRING, vitalSignsSectionorganizerobservations_02codedisplayName STRING, problemSectionact_01observationvaluestranslationscodeSystemName STRING, vitalSignsSectionorganizerobservations_05codecode STRING, vitalSignsSectionorganizerobservations_04codecodeSystemName STRING, problemSectionact_02observationvaluestranslationsdisplayName STRING, idroot STRING, vitalSignsSectionorganizerobservations_02textreferencevalue STRING, problemSectionact_01observationidextension STRING, problemSectionact_01observationvaluestranslationscodeSystem STRING, problemSectionact_01codecode STRING, problemSectionact_02observationproblemStatusvaluesdisplayName STRING, problemSectionact_01codecodeSystem STRING, codecodeSystemName STRING, vitalSignsSectionorganizerobservations_01effectiveTimevalue STRING, vitalSignsSectionorganizercodedisplayName STRING, vitalSignsSectionorganizerobservations_02codecodeSystemName STRING, vitalSignsSectionorganizerobservations_03textreferencevalue STRING, vitalSignsSectionorganizerobservations_02valuesunit STRING, problemSectionact_03observationproblemStatusvaluesdisplayName STRING, problemSectionact_02observationproblemStatuscodecode STRING, vitalSignsSectionorganizerobservations_03statusCodecode STRING, problemSectionact_03observationproblemStatusvaluescode STRING, problemSectionact_02observationeffectiveTimelow STRING, problemSectionact_03observationvaluestranslationscodeSystemName STRING, vitalSignsSectionorganizerobservations_03texttext_01value STRING, problemSectionact_01observationproblemStatuscodecodeSystem STRING, problemSectionact_03observationproblemStatusvaluescodeSystem STRING, vitalSignsSectionorganizercodecodeSystemName STRING, problemSectionact_03observationidextension STRING, vitalSignsSectionorganizerobservations_01codecode STRING, codecodeSystem STRING, problemSectionact_02effectiveTimelow STRING, problemSectioncodedisplayName STRING, problemSectionact_02observationproblemStatusvaluescode 
STRING, vitalSignsSectionorganizeridsextension STRING, problemSectionact_02observationstatusCodecode STRING, vitalSignsSectionorganizerobservations_02codecode STRING, title STRING, problemSectionact_03idroot STRING, problemSectionidextension STRING, problemSectionact_03observationproblemStatusstatusCodecode STRING, problemSectionact_03effectiveTimelow STRING, problemSectionact_02observationproblemStatusvaluescodeSystemName STRING, fileowner STRING, vitalSignsSectionorganizerobservations_01statusCodecode STRING, vitalSignsSectionorganizerobservations_05textreferencevalue STRING, filepermissions STRING, vitalSignsSectionorganizerobservations_02codecodeSystem STRING, vitalSignsSectionorganizerobservations_05valuesunit STRING, problemSectionact_01observationvaluestranslationscode STRING, problemSectionact_01statusCodecode STRING, vitalSignsSectionorganizerobservations_05codecodeSystemName STRING, problemSectionact_03codecodeSystem STRING, vitalSignsSectioncodecodeSystem STRING, problemSectionact_01observationproblemStatusvaluescodeSystemName STRING, vitalSignsSectioncodecodeSystemName STRING, problemSectionact_01observationproblemStatuscodecode STRING, problemSectionact_02observationidroot STRING, vitalSignsSectionorganizerobservations_01codecodeSystem STRING, problemSectionact_01observationproblemStatusvaluescodeSystem STRING, vitalSignsSectionorganizerobservations_03effectiveTimevalue STRING, vitalSignsSectionorganizerobservations_04codedisplayName STRING, problemSectionact_03observationeffectiveTimelow STRING) STORED AS ORC LOCATION '/ccda'
AttributeCleaner
Apache Avro names can't have spaces, dots, dashes or weird symbols, so we remove them, along with the periods created when the processor flattens out the XML.
Dirty name: problem.section.act_01...
Clean safe name: problemSectionact_01observationvaluestranslationsoriginalTextreferencevalue
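Per the Avro specification, a name must match `[A-Za-z_][A-Za-z0-9_]*`, so validity can be checked mechanically. A small sketch for validating and sanitizing candidate names (the helper names are mine, not from the custom processor):

```python
import re

# Name grammar from the Avro specification's "Names" section.
AVRO_NAME = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")

def is_valid_avro_name(name):
    return bool(AVRO_NAME.match(name))

def to_avro_safe(name):
    """Strip characters Avro forbids; prefix an underscore if it starts with a digit."""
    cleaned = re.sub(r"[^A-Za-z0-9_]", "", name)
    if cleaned and cleaned[0].isdigit():
        cleaned = "_" + cleaned
    return cleaned
```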
Source Code for Custom Processor
https://github.com/tspannhw/nifi-attributecleaner-processor
References
http://calcite.apache.org/docs/reference.html
Apache NiFi Flow
c-cda-ingest-with-custom-processor.xml
11-29-2017
04:59 PM
3 Kudos
There are many ways to integrate Apache NiFi and Apache Spark.
We can call Apache Spark Streaming via S2S (Apache NiFi's Site-to-Site protocol) or Kafka. If you want to execute a regular Apache Spark job, you can do that via Apache Livy, which is included in HDP 2.6+. This is how Apache Zeppelin integrates with Apache Spark, so it's secure and a reasonable approach. I use this approach when I want Spark to handle part of the processing in the middle of an Apache NiFi flow.
Syntax for Calling a Job
This job is stored in HDFS as /apps/logs*jar with the class name com.dataflowdeveloper.logs.Logs.
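Livy's batch endpoint (POST /batches) takes a small JSON body with the jar location and class name. A hedged sketch of building that body; the jar path below is a placeholder, and only the class name comes from this article:

```python
import json

def livy_batch_payload(jar_hdfs_path, class_name, args=None):
    """Body for POST http://<livy-host>:8999/batches (Livy batch REST API)."""
    payload = {"file": jar_hdfs_path, "className": class_name}
    if args:
        payload["args"] = list(args)
    return payload

# The jar path is a placeholder, not the article's actual jar name.
payload = livy_batch_payload("/apps/example-logs.jar",
                             "com.dataflowdeveloper.logs.Logs")
body = json.dumps(payload)
# Submitting requires a running Livy server, e.g.:
#   urllib.request.urlopen(urllib.request.Request(
#       "http://yourlivyapi:8999/batches", data=body.encode(),
#       headers={"Content-Type": "application/json"}))
```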
Schema For Apache Livy Status Messages
{
"type": "record",
"name": "livystatus",
"fields": [
{
"name": "id",
"type": [
"null",
"int"
]
},
{
"name": "state",
"type": [
"null",
"string"
]
},
{
"name": "appId",
"type": [
"null",
"string"
]
},
{
"name": "driverLogUrl",
"type": [
"null",
"string"
]
},
{
"name": "sparkUiUrl",
"type": [
"null",
"string"
]
}
]
}
Example Apache Spark Apache Livy Status Message Reformatted For Usage
{
"sparkUiUrl" : "http://princeton-14-2-1.field.hortonworks.com:8088/proxy/application_1511839325046_0022/",
"id" : "19",
"state" : "success",
"driverLogUrl" : "http://princeton-14-2-1.field.hortonworks.com:8188/applicationhistory/logs/princeton-14-2-1.field.hortonworks.com:45454/container_e02_1511839325046_0022_01_000001/container_e02_1511839325046_0022_01_000001/livy",
"appId" : "application_1511839325046_0022"
}
(Screenshots:)
Apache Livy Status Monitoring Flow
QueryRecord Processor for Querying and Determining What to Do with the Result
Apache Ambari Screen for Turning off CSRF Protection for Apache Livy
Results in JSON from the Apache Livy REST Call
Hortonworks Schema Registry
Apache Spark Job Submitted and Running
The Apache Spark Job Environment During the Run
Results from the Apache Spark Job Shown in YARN Logs
Apache YARN Run Information on the Apache Spark Job
This is the status message for a completed job. As you can see, we get some really useful information here. The state field is the important one: once it reads success, you can kick off whatever downstream processing you need.
The Driver Log URL points to the job logs, which you could ingest with Apache NiFi.
The Spark UI URL points to the running Spark application's UI.
Raw REST JSON Message
{
"from" : 0,
"total" : 1,
"sessions" : [ {
"id" : 12,
"state" : "running",
"appId" : "application_1511839325046_0015",
"appInfo" : {
"driverLogUrl" : "http://princeton-14-2-1.field.hortonworks.com:8042/node/containerlogs/container_e02_1511839325046_0015_01_000001/livy",
"sparkUiUrl" : "http://princeton-14-2-1.field.hortonworks.com:8088/proxy/application_1511839325046_0015/"
},
"log" : [ "\t diagnostics: [Tue Nov 28 19:24:09 +0000 2017] Scheduler has assigned a container for AM, waiting for AM container to be launched", "\t ApplicationMaster host: N/A", "\t ApplicationMaster RPC port: -1", "\t queue: default", "\t start time: 1511897049505", "\t final status: UNDEFINED", "\t tracking URL: http://princeton-14-2-1.field.hortonworks.com:8088/proxy/application_1511839325046_0015/", "\t user: livy", "17/11/28 19:24:09 INFO ShutdownHookManager: Shutdown hook called", "17/11/28 19:24:09 INFO ShutdownHookManager: Deleting directory /tmp/spark-e8ada7f2-43d0-4823-8816-6e930101f2f1" ]
} ]
}
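Pulling the five fields of the Avro schema above out of a raw /batches response can be sketched as follows; the message here is a trimmed version of the one above, with placeholder host names:

```python
import json

# Trimmed Livy response; "host" stands in for the real broker/host names.
raw = """
{
  "from": 0,
  "total": 1,
  "sessions": [{
    "id": 12,
    "state": "running",
    "appId": "application_1511839325046_0015",
    "appInfo": {
      "driverLogUrl": "http://host:8042/node/containerlogs/container_x/livy",
      "sparkUiUrl": "http://host:8088/proxy/application_1511839325046_0015/"
    }
  }]
}
"""

def flatten_status(message):
    """Reshape the first Livy session entry to match the status schema fields."""
    s = json.loads(message)["sessions"][0]
    return {"id": s["id"], "state": s["state"], "appId": s["appId"],
            "driverLogUrl": s["appInfo"]["driverLogUrl"],
            "sparkUiUrl": s["appInfo"]["sparkUiUrl"]}

status = flatten_status(raw)
```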
REST URL: http://yourlivyapi:8999/batches/
You can see this in Ambari.
My Apache Spark Job Configuration
My Apache Spark job needs some data:
hdfs dfs -mkdir -p /user/livy/data/
hdfs dfs -put access3.log /user/livy/data
hdfs dfs -chmod -R 777 /user/livy/data
The Run of the chosen Apache Spark Job
Log Type: stdout
Log Upload Time: Tue Nov 28 17:59:02 +0000 2017
Log Length: 34968
===== Log Count: 206857
LogRecord(194.187.168.230,-,-,20/Feb/2016:00:00:45 -0500,GET,200,187,-,Mozilla/5.0 (compatible; Qwantify/2.2w; +https://www.qwant.com/)/*)
LogRecord(194.187.168.230,-,-,20/Feb/2016:00:00:46 -0500,GET,200,24810,-,Mozilla/5.0 (compatible; Qwantify/2.2w; +https://www.qwant.com/)/*)
LogRecord(192.0.84.33,-,-,20/Feb/2016:00:01:56 -0500,HEAD,200,0,-,jetmon/1.0 (Jetpack Site Uptime Monitor by WordPress.com))
LogRecord(66.249.64.8,-,-,20/Feb/2016:00:02:13 -0500,GET,200,28486,-,Mozilla/5.0 (compatible; Googlebot/2.1; +http://www.google.com/bot.html))
LogRecord(192.0.84.33,-,-,20/Feb/2016:00:02:15 -0500,HEAD,200,0,-,jetmon/1.0 (Jetpack Site Uptime Monitor by WordPress.com))
root
|-- clientIp: string (nullable = true)
|-- clientIdentity: string (nullable = true)
|-- user: string (nullable = true)
|-- dateTime: string (nullable = true)
|-- request: string (nullable = true)
|-- statusCode: integer (nullable = true)
|-- bytesSent: long (nullable = true)
|-- referer: string (nullable = true)
|-- userAgent: string (nullable = true)
+-------+-----------------+
|summary| bytesSent|
+-------+-----------------+
| count| 206857|
| mean|28017.72503226867|
| stddev|137716.9426060656|
| min| 0|
| max| 15379053|
+-------+-----------------+
== Physical Plan ==
*SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, com.dataflowdeveloper.logs.LogRecord, true]).clientIp, true) AS clientIp#10, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, com.dataflowdeveloper.logs.LogRecord, true]).clientIdentity, true) AS clientIdentity#11, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, com.dataflowdeveloper.logs.LogRecord, true]).user, true) AS user#12, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, com.dataflowdeveloper.logs.LogRecord, true]).dateTime, true) AS dateTime#13, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, com.dataflowdeveloper.logs.LogRecord, true]).request, true) AS request#14, assertnotnull(input[0, com.dataflowdeveloper.logs.LogRecord, true]).statusCode AS statusCode#15, assertnotnull(input[0, com.dataflowdeveloper.logs.LogRecord, true]).bytesSent AS bytesSent#16L, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, com.dataflowdeveloper.logs.LogRecord, true]).referer, true) AS referer#17, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, com.dataflowdeveloper.logs.LogRecord, true]).userAgent, true) AS userAgent#18]
+- Scan ExternalRDDScan[obj#9]
After writing results
===== Number of Log Records: 206857 Content Size Total: 5795662547, Avg: 28017, Min: 0, Max: 15379053
=====Status Code counts: [(404,21611),(200,170127),(302,1467),(206,87),(304,1260),(406,3242),(500,1106),(409,28),(301,4968),(403,2601),(407,123),(429,1),(405,236)]
=====IP Addresses Accessed > 10 times: [51.255.65.87,146.127.253.45,201.239.138.159,157.55.39.157,1.22.196.230,54.211.201.215,180.179.40.44,....]
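The per-line parsing the Spark job performs can be sketched without a cluster. The regex follows the combined log format, and the sample line is illustrative rather than taken from the actual dataset:

```python
import re

# Combined Log Format: ip ident user [timestamp] "request" status size "referer" "agent"
LOG_PATTERN = re.compile(
    r'^(\S+) (\S+) (\S+) \[([^\]]+)\] "(\S+) \S+ \S+" (\d{3}) (\d+|-) "([^"]*)" "([^"]*)"$'
)

def parse_line(line):
    """Parse one access-log line into a dict, or None if it doesn't match."""
    m = LOG_PATTERN.match(line)
    if m is None:
        return None
    ip, ident, user, ts, method, status, size, referer, agent = m.groups()
    return {
        "clientIp": ip,
        "dateTime": ts,
        "request": method,
        "statusCode": int(status),
        "bytesSent": 0 if size == "-" else int(size),
        "referer": referer,
        "userAgent": agent,
    }

sample = ('194.187.168.230 - - [20/Feb/2016:00:00:45 -0500] "GET / HTTP/1.1" '
          '200 187 "-" "Mozilla/5.0 (compatible; Qwantify/2.2w; +https://www.qwant.com/)"')
record = parse_line(sample)
```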
My example simple Apache Spark job to parse Apache logs is included in the GitHub repository referenced below.
Gotcha
You may need to disable this (set it to false) in Ambari under the Apache Spark section: livy.server.csrf_protection.enabled
Directories
/var/log/livy2/livy-livy-server.out
Flow File
livynifiintegration.xml
References:
https://calcite.apache.org/docs/reference.html
http://livy.incubator.apache.org/
https://blogs.apache.org/nifi/entry/stream_processing_nifi_and_spark
https://github.com/apache/nifi/tree/master/nifi-external
https://community.hortonworks.com/articles/73828/submitting-spark-jobs-from-apache-nifi-using-livy.html
https://github.com/diegobaez/PUBLIC/tree/master/NiFi-SnapSpark
http://livy.incubator.apache.org/docs/latest/rest-api.html
https://hortonworks.com/blog/livy-a-rest-interface-for-apache-spark/
https://docs.microsoft.com/en-us/azure/hdinsight/spark/apache-spark-livy-rest-interface
https://github.com/tspannhw/apachelivy-nifi-spark2-integration
11-28-2017
04:46 PM
You have to URL encode it. See:
http://livy.incubator.apache.org/docs/latest/rest-api.html
https://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.6.2/bk_command-line-installation/content/configure_livy.html