Created on 12-02-2017 08:07 PM - edited 08-17-2019 09:49 AM
Step 1: Collect HL7 Health Records
Python to Send JSON Data to MQTT (Data Generated by https://www.mockaroo.com/)
import json

import paho.mqtt.client as mqtt

# MQTT: connect to the local broker on port 14162 with a 60-second keepalive.
# (paho-mqtt 1.x constructor; 2.x also requires a callback API version argument.)
client = mqtt.Client()
client.connect("localhost", 14162, 60)

# One mock patient record, sent as a JSON array with a single element.
row = [{
    "PID_SSNNumberPatient": 823456789,
    "email": "ahospital0@census.gov",
    "gender": "Male",
    "ip_address": "138.135.180.206",
    "drug_provider": "OrchidPharma Inc",
    "icd9": "94140",
    "icd9_description": "Deep necrosis of underlying tissues [deep third degree] without mention of loss of a body part, face and head, unspecified site",
    "icd9P_proc": "7942",
    "icd9_proc_description": "Closed reduction of separated epiphysis, radius and ulna",
    "user_agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_5_8) AppleWebKit/535.1 (KHTML, like Gecko) Chrome/14.0.801.0 Safari/535.1",
    "drug_used": "Naratriptan"
}]
json_string = json.dumps(row)

# Publish to the patientdata topic at QoS 1, then close the connection.
client.publish("patientdata", payload=json_string, qos=1, retain=False)
client.disconnect()
Step 2: Get the data into Apache NiFi via FTP, SFTP, file, Apache Kafka, MQTT, REST API, TCP/IP, or one of hundreds of other options.
Schemas
{
  "type": "record",
  "name": "hl7oru",
  "fields": [
    { "name": "OBX_1_UserDefinedAccessChecks", "type": "string", "doc": "Type inferred from '\"20150101000100\"'" },
    { "name": "OBR_1_OrderingProvider_FamilyName", "type": "string", "doc": "Type inferred from '\"Johnson\"'" },
    { "name": "MSH_MessageControlID", "type": "string", "doc": "Type inferred from '\"Q1111111111111111111\"'" },
    { "name": "OBX_1_ObservationIdentifier_Text", "type": "string", "doc": "Type inferred from '\"Glucose Lvl\"'" },
    { "name": "MSH_SendingApplication_NamespaceID", "type": "string", "doc": "Type inferred from '\"XXXXXX\"'" },
    { "name": "OBR_1_UniversalServiceIdentifier_Text", "type": "string", "doc": "Type inferred from '\"Basic Metabolic Panel\"'" },
    { "name": "MSH_ReceivingApplication_NamespaceID", "type": "string", "doc": "Type inferred from '\"HealthOrg01\"'" },
    { "name": "MSH_ProcessingID_ProcessingID", "type": "string", "doc": "Type inferred from '\"P\"'" },
    { "name": "PID_SSNNumberPatient", "type": "string", "doc": "Type inferred from '\"123456789\"'" },
    { "name": "OBR_1_FillerOrderNumber_EntityIdentifier", "type": "string", "doc": "Type inferred from '\"000000000000000000\"'" },
    { "name": "PID_PatientAccountNumber_ID", "type": "string", "doc": "Type inferred from '\"999999999999\"'" },
    { "name": "PID_DateOfBirth", "type": "string", "doc": "Type inferred from '\"19700101\"'" },
    { "name": "PD1_PatientPrimaryCareProviderNameIDNo_IDNumber", "type": "string", "doc": "Type inferred from '\"1234567890\"'" },
    { "name": "PID_Sex", "type": "string", "doc": "Type inferred from '\"M\"'" },
    { "name": "MSH_MessageType_MessageType", "type": "string", "doc": "Type inferred from '\"ORU\"'" },
    { "name": "OBX_1_ReferencesRange", "type": "string", "doc": "Type inferred from '\"H\"'" },
    { "name": "OBR_1_OrderingProvider_IDNumber", "type": "string", "doc": "Type inferred from '\"1620\"'" },
    { "name": "PD1_PatientPrimaryCareProviderNameIDNo_FamilyName", "type": "string", "doc": "Type inferred from '\"LAST\"'" },
    { "name": "OBX_1_Units_NameOfCodingSystem", "type": "string", "doc": "Type inferred from '\"99\"'" },
    { "name": "OBX_1_Units_Identifier", "type": "string", "doc": "Type inferred from '\"65-99\"'" },
    { "name": "PID_PatientName_GivenName", "type": "string", "doc": "Type inferred from '\"JOHN\"'" },
    { "name": "OBX_1_ObservationSubID", "type": "string", "doc": "Type inferred from '\"159\"'" },
    { "name": "PD1_PatientPrimaryCareProviderNameIDNo_GivenName", "type": "string", "doc": "Type inferred from '\"FIRST\"'" },
    { "name": "OBR_1_PlacerOrderNumber_NamespaceID", "type": "string", "doc": "Type inferred from '\"HNAM_ORDERID\"'" },
    { "name": "MSH_MessageType_TriggerEvent", "type": "string", "doc": "Type inferred from '\"R01\"'" },
    { "name": "PD1_PatientPrimaryCareProviderNameIDNo_AssigningAuthority", "type": "string", "doc": "Type inferred from '\"NPI\"'" },
    { "name": "OBR_1_ResultStatus", "type": "string", "doc": "Type inferred from '\"M\"'" },
    { "name": "PID_PatientName_FamilyName", "type": "string", "doc": "Type inferred from '\"SMITH\"'" },
    { "name": "MSH_EncodingCharacters", "type": "string", "doc": "Type inferred from '\"^~\\&\"'" },
    { "name": "MSH_VersionID", "type": "string", "doc": "Type inferred from '\"2.3\"'" },
    { "name": "OBR_1_UniversalServiceIdentifier_Identifier", "type": "string", "doc": "Type inferred from '\"648088\"'" },
    { "name": "OBR_1_ObservationDateTime", "type": "string", "doc": "Type inferred from '\"20150101000100\"'" },
    { "name": "OBR_1_ScheduledDateTime", "type": "string", "doc": "Type inferred from '\"20150101000100\"'" },
    { "name": "OBX_1_ObservationIdentifier_Identifier", "type": "string", "doc": "Type inferred from '\"GLU\"'" },
    { "name": "OBR_1_OrderingProvider_GivenName", "type": "string", "doc": "Type inferred from '\"John\"'" },
    { "name": "OBR_1_SetIDObservationRequest", "type": "string", "doc": "Type inferred from '\"1\"'" },
    { "name": "OBR_1_ResultsRptStatusChngDateTime", "type": "string", "doc": "Type inferred from '\"20150101000100\"'" },
    { "name": "OBR_1_PlacerOrderNumber_EntityIdentifier", "type": "string", "doc": "Type inferred from '\"341856649\"'" },
    { "name": "OBX_1_NatureOfAbnormalTest", "type": "string", "doc": "Type inferred from '\"F\"'" },
    { "name": "OBX_1_SetIDOBX", "type": "string", "doc": "Type inferred from '\"1\"'" },
    { "name": "MSH_FieldSeparator", "type": "string", "doc": "Type inferred from '\"|\"'" },
    { "name": "PD1",
      "type": { "type": "record", "name": "PD1", "fields": [
        { "name": "PatientPrimaryCareProviderNameIDNo_MiddleInitialOrName", "type": "string", "doc": "Type inferred from '\"M\"'" }
      ] },
      "doc": "Type inferred from '{\"PatientPrimaryCareProviderNameIDNo_MiddleInitialOrName\":\"M\"}'" },
    { "name": "OBX_1_Units_Text", "type": "string", "doc": "Type inferred from '\"65\"'" },
    { "name": "OBX_1_ValueType", "type": "string", "doc": "Type inferred from '\"NM\"'" },
    { "name": "PID_PatientIDInternalID_ID", "type": "string", "doc": "Type inferred from '\"000000001\"'" },
    { "name": "OBX_1_ObservationValue", "type": "string", "doc": "Type inferred from '\"mg/dL\"'" },
    { "name": "OBR_1_OrderingProvider_MiddleInitialOrName", "type": "string", "doc": "Type inferred from '\"R\"'" }
  ]
}
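A quick way to sanity-check a flat JSON record against a schema like the one above is to compare field names. The Python sketch below is only an illustration: the schema is cut down to three of the fields above, and `missing_fields` is a hypothetical helper, not part of NiFi or Avro.

```python
import json

# Cut-down copy of the hl7oru schema (three fields only, for brevity).
SCHEMA_JSON = """
{ "type": "record", "name": "hl7oru", "fields": [
  { "name": "MSH_MessageControlID", "type": "string" },
  { "name": "PID_SSNNumberPatient", "type": "string" },
  { "name": "PID_Sex", "type": "string" }
] }
"""


def missing_fields(schema, record):
    """Return the top-level schema field names that are absent from record."""
    return [f["name"] for f in schema["fields"] if f["name"] not in record]


schema = json.loads(SCHEMA_JSON)
record = {"MSH_MessageControlID": "Q1111111111111111111", "PID_Sex": "M"}
print(missing_fields(schema, record))  # ['PID_SSNNumberPatient']
```

None of the fields in the schema declares a default, so a record that is missing any of them would likely be rejected by a schema-aware reader or writer.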
Jolt Scripts
{
  "OBX_1.UserDefinedAccessChecks": "OBX_1_UserDefinedAccessChecks",
  "OBR_1.OrderingProvider.FamilyName": "OBR_1_OrderingProvider_FamilyName",
  "MSH.MessageControlID": "MSH_MessageControlID",
  "OBX_1.ObservationIdentifier.Text": "OBX_1_ObservationIdentifier_Text",
  "MSH.SendingApplication.NamespaceID": "MSH_SendingApplication_NamespaceID",
  "OBR_1.UniversalServiceIdentifier.Text": "OBR_1_UniversalServiceIdentifier_Text",
  "MSH.ReceivingApplication.NamespaceID": "MSH_ReceivingApplication_NamespaceID",
  "MSH.ProcessingID.ProcessingID": "MSH_ProcessingID_ProcessingID",
  "PID.SSNNumberPatient": "PID_SSNNumberPatient",
  "OBR_1.FillerOrderNumber.EntityIdentifier": "OBR_1_FillerOrderNumber_EntityIdentifier",
  "PID.PatientAccountNumber.ID": "PID_PatientAccountNumber_ID",
  "PID.DateOfBirth": "PID_DateOfBirth",
  "PD1.PatientPrimaryCareProviderNameIDNo.IDNumber": "PD1_PatientPrimaryCareProviderNameIDNo_IDNumber",
  "PID.Sex": "PID_Sex",
  "MSH.MessageType.MessageType": "MSH_MessageType_MessageType",
  "OBX_1.ReferencesRange": "OBX_1_ReferencesRange",
  "OBR_1.OrderingProvider.IDNumber": "OBR_1_OrderingProvider_IDNumber",
  "PD1.PatientPrimaryCareProviderNameIDNo.FamilyName": "PD1_PatientPrimaryCareProviderNameIDNo_FamilyName",
  "OBX_1.Units.NameOfCodingSystem": "OBX_1_Units_NameOfCodingSystem",
  "OBX_1.Units.Identifier": "OBX_1_Units_Identifier",
  "PID.PatientName.GivenName": "PID_PatientName_GivenName",
  "OBX_1.ObservationSubID": "OBX_1_ObservationSubID",
  "PD1.PatientPrimaryCareProviderNameIDNo.GivenName": "PD1_PatientPrimaryCareProviderNameIDNo_GivenName",
  "OBR_1.PlacerOrderNumber.NamespaceID": "OBR_1_PlacerOrderNumber_NamespaceID",
  "MSH.MessageType.TriggerEvent": "MSH_MessageType_TriggerEvent",
  "PD1.PatientPrimaryCareProviderNameIDNo.AssigningAuthority": "PD1_PatientPrimaryCareProviderNameIDNo_AssigningAuthority",
  "OBR_1.ResultStatus": "OBR_1_ResultStatus",
  "PID.PatientName.FamilyName": "PID_PatientName_FamilyName",
  "MSH.EncodingCharacters": "MSH_EncodingCharacters",
  "MSH.VersionID": "MSH_VersionID",
  "OBR_1.UniversalServiceIdentifier.Identifier": "OBR_1_UniversalServiceIdentifier_Identifier",
  "OBR_1.ObservationDateTime": "OBR_1_ObservationDateTime",
  "OBR_1.ScheduledDateTime": "OBR_1_ScheduledDateTime",
  "OBX_1.ObservationIdentifier.Identifier": "OBX_1_ObservationIdentifier_Identifier",
  "OBR_1.OrderingProvider.GivenName": "OBR_1_OrderingProvider_GivenName",
  "OBR_1.SetIDObservationRequest": "OBR_1_SetIDObservationRequest",
  "OBR_1.ResultsRptStatusChngDateTime": "OBR_1_ResultsRptStatusChngDateTime",
  "OBR_1.PlacerOrderNumber.EntityIdentifier": "OBR_1_PlacerOrderNumber_EntityIdentifier",
  "OBX_1.NatureOfAbnormalTest": "OBX_1_NatureOfAbnormalTest",
  "OBX_1.SetIDOBX": "OBX_1_SetIDOBX",
  "MSH.FieldSeparator": "MSH_FieldSeparator",
  "PD1.PatientPrimaryCareProviderNameIDNo.MiddleInitialOrName": "PD1_PatientPrimaryCareProviderNameIDNo_MiddleInitialOrName",
  "OBX_1.Units.Text": "OBX_1_Units_Text",
  "OBX_1.ValueType": "OBX_1_ValueType",
  "PID.PatientIDInternalID.ID": "PID_PatientIDInternalID_ID",
  "OBX_1.ObservationValue": "OBX_1_ObservationValue",
  "OBR_1.OrderingProvider.MiddleInitialOrName": "OBR_1_OrderingProvider_MiddleInitialOrName"
}

{
  "OBX_1.UserDefinedAccessChecks": "OBX_1.UserDefinedAccessChecks",
  "OBR_1.OrderingProvider.FamilyName": "OBR_1.OrderingProvider.FamilyName",
  "MSH.MessageControlID": "MSH.MessageControlID",
  "OBX_1.ObservationIdentifier.Text": "OBX_1.ObservationIdentifier.Text",
  "MSH.SendingApplication.NamespaceID": "MSH.SendingApplication.NamespaceID",
  "OBR_1.UniversalServiceIdentifier.Text": "OBR_1.UniversalServiceIdentifier.Text",
  "MSH.ReceivingApplication.NamespaceID": "MSH.ReceivingApplication.NamespaceID",
  "MSH.ProcessingID.ProcessingID": "MSH.ProcessingID.ProcessingID",
  "PID.SSNNumberPatient": "PID.SSNNumberPatient",
  "OBR_1.FillerOrderNumber.EntityIdentifier": "OBR_1.FillerOrderNumber.EntityIdentifier",
  "PID.PatientAccountNumber.ID": "PID.PatientAccountNumber.ID",
  "PID.DateOfBirth": "PID.DateOfBirth",
  "PD1.PatientPrimaryCareProviderNameIDNo.IDNumber": "PD1.PatientPrimaryCareProviderNameIDNo.IDNumber",
  "PID.Sex": "PID.Sex",
  "MSH.MessageType.MessageType": "MSH.MessageType.MessageType",
  "OBX_1.ReferencesRange": "OBX_1.ReferencesRange",
  "OBR_1.OrderingProvider.IDNumber": "OBR_1.OrderingProvider.IDNumber",
  "PD1.PatientPrimaryCareProviderNameIDNo.FamilyName": "PD1.PatientPrimaryCareProviderNameIDNo.FamilyName",
  "OBX_1.Units.NameOfCodingSystem": "OBX_1.Units.NameOfCodingSystem",
  "OBX_1.Units.Identifier": "OBX_1.Units.Identifier",
  "PID.PatientName.GivenName": "PID.PatientName.GivenName",
  "OBX_1.ObservationSubID": "OBX_1.ObservationSubID",
  "PD1.PatientPrimaryCareProviderNameIDNo.GivenName": "PD1.PatientPrimaryCareProviderNameIDNo.GivenName",
  "OBR_1.PlacerOrderNumber.NamespaceID": "OBR_1.PlacerOrderNumber.NamespaceID",
  "MSH.MessageType.TriggerEvent": "MSH.MessageType.TriggerEvent",
  "PD1.PatientPrimaryCareProviderNameIDNo.AssigningAuthority": "PD1.PatientPrimaryCareProviderNameIDNo.AssigningAuthority",
  "OBR_1.ResultStatus": "OBR_1.ResultStatus",
  "PID.PatientName.FamilyName": "PID.PatientName.FamilyName",
  "MSH.EncodingCharacters": "MSH.EncodingCharacters",
  "MSH.VersionID": "MSH.VersionID",
  "OBR_1.UniversalServiceIdentifier.Identifier": "OBR_1.UniversalServiceIdentifier.Identifier",
  "OBR_1.ObservationDateTime": "OBR_1.ObservationDateTime",
  "OBR_1.ScheduledDateTime": "OBR_1.ScheduledDateTime",
  "OBX_1.ObservationIdentifier.Identifier": "OBX_1.ObservationIdentifier.Identifier",
  "OBR_1.OrderingProvider.GivenName": "OBR_1.OrderingProvider.GivenName",
  "OBR_1.SetIDObservationRequest": "OBR_1.SetIDObservationRequest",
  "OBR_1.ResultsRptStatusChngDateTime": "OBR_1.ResultsRptStatusChngDateTime",
  "OBR_1.PlacerOrderNumber.EntityIdentifier": "OBR_1.PlacerOrderNumber.EntityIdentifier",
  "OBX_1.NatureOfAbnormalTest": "OBX_1.NatureOfAbnormalTest",
  "OBX_1.SetIDOBX": "OBX_1.SetIDOBX",
  "MSH.FieldSeparator": "MSH.FieldSeparator",
  "PD1.PatientPrimaryCareProviderNameIDNo.MiddleInitialOrName": "PD1.PatientPrimaryCareProviderNameIDNo.MiddleInitialOrName",
  "OBX_1.Units.Text": "OBX_1.Units.Text",
  "OBX_1.ValueType": "OBX_1.ValueType",
  "PID.PatientIDInternalID.ID": "PID.PatientIDInternalID.ID",
  "OBX_1.ObservationValue": "OBX_1.ObservationValue",
  "OBR_1.OrderingProvider.MiddleInitialOrName": "OBR_1.OrderingProvider.MiddleInitialOrName"
}
Step 3: Profit
Build a Big Data Environment For Our Data
(Build HDFS Directories)
su hdfs
hdfs dfs -mkdir -p /hl7/hl7-mdm
hdfs dfs -mkdir -p /hl7/hl7-adt
hdfs dfs -mkdir -p /hl7/hl7-orm
hdfs dfs -mkdir -p /hl7/hl7-oru
hdfs dfs -mkdir -p /hl7/json/hl7-mdm
hdfs dfs -mkdir -p /hl7/json/hl7-adt
hdfs dfs -mkdir -p /hl7/json/hl7-orm
hdfs dfs -mkdir -p /hl7/json/hl7-oru
hdfs dfs -mkdir -p /hl7/flat/oru
hdfs dfs -mkdir -p /patientdata
hdfs dfs -chmod -R 777 /hl7
hdfs dfs -chmod -R 777 /patientdata
hdfs dfs -ls -R /hl7
hdfs dfs -ls -R /patientdata
(Build Hive DDL)
CREATE EXTERNAL TABLE IF NOT EXISTS patientdata (
  PID_SSNNumberPatient INT,
  email STRING,
  gender STRING,
  ip_address STRING,
  drug_provider STRING,
  icd9 STRING,
  icd9_description STRING,
  icd9P_proc STRING,
  icd9_proc_description STRING,
  user_agent STRING,
  drug_used STRING
)
STORED AS ORC
LOCATION '/patientdata';

CREATE EXTERNAL TABLE IF NOT EXISTS hl7oru (
  OBX_1 STRUCT<UserDefinedAccessChecks:STRING, ObservationIdentifier:STRUCT<Text:STRING, Identifier:STRING>, ReferencesRange:STRING, Units:STRUCT<NameOfCodingSystem:STRING, Identifier:STRING, Text:STRING>, ObservationSubID:STRING, NatureOfAbnormalTest:STRING, SetIDOBX:STRING, ValueType:STRING, ObservationValue:STRING>,
  OBR_1 STRUCT<OrderingProvider:STRUCT<FamilyName:STRING, IDNumber:STRING, GivenName:STRING, MiddleInitialOrName:STRING>, UniversalServiceIdentifier:STRUCT<Text:STRING, Identifier:STRING>, FillerOrderNumber:STRUCT<EntityIdentifier:STRING>, PlacerOrderNumber:STRUCT<NamespaceID:STRING, EntityIdentifier:STRING>, ResultStatus:STRING, ObservationDateTime:STRING, ScheduledDateTime:STRING, SetIDObservationRequest:STRING, ResultsRptStatusChngDateTime:STRING>,
  MSH STRUCT<MessageControlID:STRING, SendingApplication:STRUCT<NamespaceID:STRING>, ReceivingApplication:STRUCT<NamespaceID:STRING>, ProcessingID:STRUCT<ProcessingID:STRING>, MessageType:STRUCT<MessageType:STRING, TriggerEvent:STRING>, EncodingCharacters:STRING, VersionID:STRING, FieldSeparator:STRING>,
  PID STRUCT<SSNNumberPatient:STRING, PatientAccountNumber:STRUCT<ID:STRING>, DateOfBirth:STRING, Sex:STRING, PatientName:STRUCT<GivenName:STRING, FamilyName:STRING>, PatientIDInternalID:STRUCT<ID:STRING>>,
  PD1 STRUCT<PatientPrimaryCareProviderNameIDNo:STRUCT<IDNumber:STRING, FamilyName:STRING, GivenName:STRING, AssigningAuthority:STRING, MiddleInitialOrName:STRING>>
)
STORED AS ORC
LOCATION '/hl7/hl7-oru';

CREATE EXTERNAL TABLE IF NOT EXISTS hl7_oru_flat (
  OBX_1_UserDefinedAccessChecks STRING,
  OBR_1_OrderingProvider_FamilyName STRING,
  MSH_MessageControlID STRING,
  OBX_1_ObservationIdentifier_Text STRING,
  MSH_SendingApplication_NamespaceID STRING,
  OBR_1_UniversalServiceIdentifier_Text STRING,
  MSH_ReceivingApplication_NamespaceID STRING,
  MSH_ProcessingID_ProcessingID STRING,
  PID_SSNNumberPatient STRING,
  OBR_1_FillerOrderNumber_EntityIdentifier STRING,
  PID_PatientAccountNumber_ID STRING,
  PID_DateOfBirth STRING,
  PD1_PatientPrimaryCareProviderNameIDNo_IDNumber STRING,
  PID_Sex STRING,
  MSH_MessageType_MessageType STRING,
  OBX_1_ReferencesRange STRING,
  OBR_1_OrderingProvider_IDNumber STRING,
  PD1_PatientPrimaryCareProviderNameIDNo_FamilyName STRING,
  OBX_1_Units_NameOfCodingSystem STRING,
  OBX_1_Units_Identifier STRING,
  PID_PatientName_GivenName STRING,
  OBX_1_ObservationSubID STRING,
  PD1_PatientPrimaryCareProviderNameIDNo_GivenName STRING,
  OBR_1_PlacerOrderNumber_NamespaceID STRING,
  MSH_MessageType_TriggerEvent STRING,
  PD1_PatientPrimaryCareProviderNameIDNo_AssigningAuthority STRING,
  OBR_1_ResultStatus STRING,
  PID_PatientName_FamilyName STRING,
  MSH_EncodingCharacters STRING,
  MSH_VersionID STRING,
  OBR_1_UniversalServiceIdentifier_Identifier STRING,
  OBR_1_ObservationDateTime STRING,
  OBR_1_ScheduledDateTime STRING,
  OBX_1_ObservationIdentifier_Identifier STRING,
  OBR_1_OrderingProvider_GivenName STRING,
  OBR_1_SetIDObservationRequest STRING,
  OBR_1_ResultsRptStatusChngDateTime STRING,
  OBR_1_PlacerOrderNumber_EntityIdentifier STRING,
  OBX_1_NatureOfAbnormalTest STRING,
  OBX_1_SetIDOBX STRING,
  MSH_FieldSeparator STRING,
  PD1_PatientPrimaryCareProviderNameIDNo_MiddleInitialOrName STRING,
  OBX_1_Units_Text STRING,
  OBX_1_ValueType STRING,
  PID_PatientIDInternalID_ID STRING,
  OBX_1_ObservationValue STRING,
  OBR_1_OrderingProvider_MiddleInitialOrName STRING
)
STORED AS ORC
LOCATION '/hl7/flat/oru';
(Build Kafka Topics)
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic patientboth
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic hl7-mdm
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic hl7-adt
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic hl7-orm
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic hl7-oru
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic simple
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic hl7-mdm_avro
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic hl7-adt_avro
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic hl7-orm_avro
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic hl7-oru_avro
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic patientdata
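Since the topic commands above differ only in the topic name, they can also be generated. The Python sketch below only builds and prints the command lines; it does not run them. The helper name `create_topic_command` is hypothetical, and the path and flags are copied from the commands above.

```python
KAFKA_TOPICS = "/usr/hdp/current/kafka-broker/bin/kafka-topics.sh"
HL7_TYPES = ["mdm", "adt", "orm", "oru"]


def create_topic_command(topic):
    """Build the argument list for creating one single-partition topic."""
    return [
        KAFKA_TOPICS, "--create",
        "--zookeeper", "localhost:2181",
        "--replication-factor", "1",
        "--partitions", "1",
        "--topic", topic,
    ]


# Same eleven topics as the commands above, in the same order.
topics = (
    ["patientboth"]
    + [f"hl7-{t}" for t in HL7_TYPES]
    + ["simple"]
    + [f"hl7-{t}_avro" for t in HL7_TYPES]
    + ["patientdata"]
)

for topic in topics:
    print(" ".join(create_topic_command(topic)))
```

Each argument list could be handed to `subprocess.run` on a host where the Kafka scripts are installed.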
Script to send a File to Kafka
sendmessage.sh
/usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list localhost:6667 --topic hl7-oru < hl7sampledata.txt
HBase DDL
hbase shell
create 'patient_observations', 'obs'
list
Running a Mosquitto MQTT Broker (OSX)
/usr/local/Cellar/mosquitto/1.4.14_2/sbin/mosquitto --daemon --verbose --port 14162
Removing Unneeded HDFS Files
hdfs dfs -rm -r -f -skipTrash $1
Example Data from Internet
MSH|^~\&|XXXXXX||HealthOrg01||||ORU^R01|Q1111111111111111111|P|2.3|<cr>PID|||000000001||SMITH^JOHN||19700101|M||||||||||999999999999|123456789|<cr>PD1||||1234567890^LAST^FIRST^M^^^^^NPI|<cr>OBR|1|341856649^HNAM_ORDERID|000000000000000000|648088^Basic Metabolic Panel|||20150101000100|||||||||1620^Johnson^John^R||||||20150101000100|||M|||||||||||20150101000100|<cr>OBX|1|NM|GLU^Glucose Lvl|159|mg/dL|65-99^65^99|H|||F|||20150101000100|
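To show how the pipe-and-caret layout of the sample message breaks down, here is a minimal Python sketch. It is not how NiFi parses HL7; it is a plain string split, with `<cr>` treated as the segment terminator as in the sample. The helper names `parse_segments` and `components` are made up for this example, and the PD1 and OBR segments are left out of the inline copy for brevity.

```python
SAMPLE = (
    r"MSH|^~\&|XXXXXX||HealthOrg01||||ORU^R01|Q1111111111111111111|P|2.3|<cr>"
    r"PID|||000000001||SMITH^JOHN||19700101|M||||||||||999999999999|123456789|<cr>"
    r"OBX|1|NM|GLU^Glucose Lvl|159|mg/dL|65-99^65^99|H|||F|||20150101000100|"
)


def parse_segments(message):
    """Split a message into (segment_name, fields) pairs.

    fields[0] is the segment name, so fields[n] lines up with HL7 field n
    for every segment except MSH, where the field separator itself counts
    as MSH-1 and the list indices are therefore one lower.
    """
    segments = []
    for line in message.replace("<cr>", "\r").split("\r"):
        if line:
            fields = line.split("|")
            segments.append((fields[0], fields))
    return segments


def components(field):
    """Split one field into its ^-separated components."""
    return field.split("^")


# A dict is fine here because no segment repeats in this sample.
segments = dict(parse_segments(SAMPLE))
pid = segments["PID"]
obx = segments["OBX"]

print(components(pid[5]))  # ['SMITH', 'JOHN']
print(pid[7], pid[8])      # 19700101 M
print(components(obx[3]))  # ['GLU', 'Glucose Lvl']
```

A real message can repeat segments such as OBX, in which case the list returned by `parse_segments` should be used directly instead of a dict.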
Code Section
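// Rebuild the FlowFile attributes under cleaned-up names.
Map<String, String> attributes = flowFile.getAttributes();
Map<String, String> attributesClean = new HashMap<>();
String tempKey = "";
for (Map.Entry<String, String> entry : attributes.entrySet()) {
    // Drop the first non-letter character, then every remaining character
    // that is not a letter, digit, or underscore.
    tempKey = entry.getKey().replaceFirst("[^A-Za-z]", "");
    tempKey = tempKey.replaceAll("[^A-Za-z0-9_]", "");
    attributesClean.put(tempKey, entry.getValue());
    // FlowFile objects are immutable, so keep the reference the session returns.
    flowFile = session.removeAttribute(flowFile, entry.getKey());
}
flowFile = session.putAllAttributes(flowFile, attributesClean);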
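To see what the two regular expressions in the snippet above do to an attribute name, here is the same renaming logic restated in Python. This is only an illustration of the string handling: `clean_attribute_name` is a hypothetical name and nothing here touches the NiFi API.

```python
import re


def clean_attribute_name(key):
    """Apply the same two substitutions as the Java snippet."""
    # Like replaceFirst: drop only the first character that is not a letter,
    # wherever it occurs in the name.
    key = re.sub(r"[^A-Za-z]", "", key, count=1)
    # Like replaceAll: drop everything except letters, digits, and underscores.
    return re.sub(r"[^A-Za-z0-9_]", "", key)


print(clean_attribute_name("PID.PatientName.GivenName"))  # PIDPatientNameGivenName
print(clean_attribute_name("OBX_1.ValueType"))            # OBX1ValueType
```

Note that the first substitution is not anchored to the start of the name, so in `OBX_1.ValueType` it removes the underscore rather than a leading character.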
Apache NiFi FlowFile
hl7new.xml
Next Sections:
Important Starter Articles
https://community.hortonworks.com/articles/138249/nifi-in-healthcare-ingesting-hl7-data-in-nifi.html
References:
- https://hortonworks.com/tutorial/real-time-event-processing-in-nifi-sam-schema-registry-and-superset...
- https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-hl7-nar/1.4.0/org.apache.nifi...
- https://community.hortonworks.com/articles/20318/visualize-patients-complaints-to-their-doctors-usi....
- https://github.com/cqframework/clinical_quality_language
- https://github.com/apache/nifi/blob/master/nifi-commons/nifi-hl7-query-language/src/test/java/org/ap...
- https://stackoverflow.com/questions/43251660/apache-nifi-routehl7-issue/43263583
- https://gist.github.com/alopresto/9b46011185efd0380e6c5da0a3412e8f