Created on 12-02-2017 08:07 PM - edited 08-17-2019 09:49 AM
Step 1: Collect HL7 Health Records
Python to Send JSON Data to MQTT (Data Generated by https://www.mockaroo.com/)
import paho.mqtt.client as mqtt
import json
# MQTT: publish one mock patient record (as a JSON array) to the "patientdata" topic.
client = mqtt.Client()
client.connect("localhost", 14162, 60)
# Run the network loop in a background thread; without it the QoS 1
# handshake (PUBLISH/PUBACK) is never processed and the message may be
# dropped when we disconnect.
client.loop_start()
row = [{"PID_SSNNumberPatient":823456789,"email":"ahospital0@census.gov","gender":"Male","ip_address":"138.135.180.206","drug_provider":"OrchidPharma Inc","icd9":"94140","icd9_description":"Deep necrosis of underlying tissues [deep third degree] without mention of loss of a body part, face and head, unspecified site","icd9P_proc":"7942","icd9_proc_description":"Closed reduction of separated epiphysis, radius and ulna","user_agent":"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_5_8) AppleWebKit/535.1 (KHTML, like Gecko) Chrome/14.0.801.0 Safari/535.1","drug_used":"Naratriptan"}]
json_string = json.dumps(row)
publish_info = client.publish("patientdata",payload=json_string,qos=1,retain=False)
# Block until the broker has acknowledged the message before tearing down.
publish_info.wait_for_publish()
client.disconnect()
client.loop_stop()
Step 2: Send the data to Apache NiFi via FTP, SFTP, file, Apache Kafka, MQTT, REST API, TCP/IP, or one of hundreds of other options.
Schemas
{
"type": "record",
"name": "hl7oru",
"fields": [
{
"name": "OBX_1_UserDefinedAccessChecks",
"type": "string",
"doc": "Type inferred from '\"20150101000100\"'"
},
{
"name": "OBR_1_OrderingProvider_FamilyName",
"type": "string",
"doc": "Type inferred from '\"Johnson\"'"
},
{
"name": "MSH_MessageControlID",
"type": "string",
"doc": "Type inferred from '\"Q1111111111111111111\"'"
},
{
"name": "OBX_1_ObservationIdentifier_Text",
"type": "string",
"doc": "Type inferred from '\"Glucose Lvl\"'"
},
{
"name": "MSH_SendingApplication_NamespaceID",
"type": "string",
"doc": "Type inferred from '\"XXXXXX\"'"
},
{
"name": "OBR_1_UniversalServiceIdentifier_Text",
"type": "string",
"doc": "Type inferred from '\"Basic Metabolic Panel\"'"
},
{
"name": "MSH_ReceivingApplication_NamespaceID",
"type": "string",
"doc": "Type inferred from '\"HealthOrg01\"'"
},
{
"name": "MSH_ProcessingID_ProcessingID",
"type": "string",
"doc": "Type inferred from '\"P\"'"
},
{
"name": "PID_SSNNumberPatient",
"type": "string",
"doc": "Type inferred from '\"123456789\"'"
},
{
"name": "OBR_1_FillerOrderNumber_EntityIdentifier",
"type": "string",
"doc": "Type inferred from '\"000000000000000000\"'"
},
{
"name": "PID_PatientAccountNumber_ID",
"type": "string",
"doc": "Type inferred from '\"999999999999\"'"
},
{
"name": "PID_DateOfBirth",
"type": "string",
"doc": "Type inferred from '\"19700101\"'"
},
{
"name": "PD1_PatientPrimaryCareProviderNameIDNo_IDNumber",
"type": "string",
"doc": "Type inferred from '\"1234567890\"'"
},
{
"name": "PID_Sex",
"type": "string",
"doc": "Type inferred from '\"M\"'"
},
{
"name": "MSH_MessageType_MessageType",
"type": "string",
"doc": "Type inferred from '\"ORU\"'"
},
{
"name": "OBX_1_ReferencesRange",
"type": "string",
"doc": "Type inferred from '\"H\"'"
},
{
"name": "OBR_1_OrderingProvider_IDNumber",
"type": "string",
"doc": "Type inferred from '\"1620\"'"
},
{
"name": "PD1_PatientPrimaryCareProviderNameIDNo_FamilyName",
"type": "string",
"doc": "Type inferred from '\"LAST\"'"
},
{
"name": "OBX_1_Units_NameOfCodingSystem",
"type": "string",
"doc": "Type inferred from '\"99\"'"
},
{
"name": "OBX_1_Units_Identifier",
"type": "string",
"doc": "Type inferred from '\"65-99\"'"
},
{
"name": "PID_PatientName_GivenName",
"type": "string",
"doc": "Type inferred from '\"JOHN\"'"
},
{
"name": "OBX_1_ObservationSubID",
"type": "string",
"doc": "Type inferred from '\"159\"'"
},
{
"name": "PD1_PatientPrimaryCareProviderNameIDNo_GivenName",
"type": "string",
"doc": "Type inferred from '\"FIRST\"'"
},
{
"name": "OBR_1_PlacerOrderNumber_NamespaceID",
"type": "string",
"doc": "Type inferred from '\"HNAM_ORDERID\"'"
},
{
"name": "MSH_MessageType_TriggerEvent",
"type": "string",
"doc": "Type inferred from '\"R01\"'"
},
{
"name": "PD1_PatientPrimaryCareProviderNameIDNo_AssigningAuthority",
"type": "string",
"doc": "Type inferred from '\"NPI\"'"
},
{
"name": "OBR_1_ResultStatus",
"type": "string",
"doc": "Type inferred from '\"M\"'"
},
{
"name": "PID_PatientName_FamilyName",
"type": "string",
"doc": "Type inferred from '\"SMITH\"'"
},
{
"name": "MSH_EncodingCharacters",
"type": "string",
"doc": "Type inferred from '\"^~\\&\"'"
},
{
"name": "MSH_VersionID",
"type": "string",
"doc": "Type inferred from '\"2.3\"'"
},
{
"name": "OBR_1_UniversalServiceIdentifier_Identifier",
"type": "string",
"doc": "Type inferred from '\"648088\"'"
},
{
"name": "OBR_1_ObservationDateTime",
"type": "string",
"doc": "Type inferred from '\"20150101000100\"'"
},
{
"name": "OBR_1_ScheduledDateTime",
"type": "string",
"doc": "Type inferred from '\"20150101000100\"'"
},
{
"name": "OBX_1_ObservationIdentifier_Identifier",
"type": "string",
"doc": "Type inferred from '\"GLU\"'"
},
{
"name": "OBR_1_OrderingProvider_GivenName",
"type": "string",
"doc": "Type inferred from '\"John\"'"
},
{
"name": "OBR_1_SetIDObservationRequest",
"type": "string",
"doc": "Type inferred from '\"1\"'"
},
{
"name": "OBR_1_ResultsRptStatusChngDateTime",
"type": "string",
"doc": "Type inferred from '\"20150101000100\"'"
},
{
"name": "OBR_1_PlacerOrderNumber_EntityIdentifier",
"type": "string",
"doc": "Type inferred from '\"341856649\"'"
},
{
"name": "OBX_1_NatureOfAbnormalTest",
"type": "string",
"doc": "Type inferred from '\"F\"'"
},
{
"name": "OBX_1_SetIDOBX",
"type": "string",
"doc": "Type inferred from '\"1\"'"
},
{
"name": "MSH_FieldSeparator",
"type": "string",
"doc": "Type inferred from '\"|\"'"
},
{
"name": "PD1",
"type": {
"type": "record",
"name": "PD1",
"fields": [
{
"name": "PatientPrimaryCareProviderNameIDNo_MiddleInitialOrName",
"type": "string",
"doc": "Type inferred from '\"M\"'"
}
]
},
"doc": "Type inferred from '{\"PatientPrimaryCareProviderNameIDNo_MiddleInitialOrName\":\"M\"}'"
},
{
"name": "OBX_1_Units_Text",
"type": "string",
"doc": "Type inferred from '\"65\"'"
},
{
"name": "OBX_1_ValueType",
"type": "string",
"doc": "Type inferred from '\"NM\"'"
},
{
"name": "PID_PatientIDInternalID_ID",
"type": "string",
"doc": "Type inferred from '\"000000001\"'"
},
{
"name": "OBX_1_ObservationValue",
"type": "string",
"doc": "Type inferred from '\"mg/dL\"'"
},
{
"name": "OBR_1_OrderingProvider_MiddleInitialOrName",
"type": "string",
"doc": "Type inferred from '\"R\"'"
}
]
}
Jolt Scripts
{
"OBX_1.UserDefinedAccessChecks": "OBX_1_UserDefinedAccessChecks",
"OBR_1.OrderingProvider.FamilyName": "OBR_1_OrderingProvider_FamilyName",
"MSH.MessageControlID": "MSH_MessageControlID",
"OBX_1.ObservationIdentifier.Text": "OBX_1_ObservationIdentifier_Text",
"MSH.SendingApplication.NamespaceID": "MSH_SendingApplication_NamespaceID",
"OBR_1.UniversalServiceIdentifier.Text": "OBR_1_UniversalServiceIdentifier_Text",
"MSH.ReceivingApplication.NamespaceID": "MSH_ReceivingApplication_NamespaceID",
"MSH.ProcessingID.ProcessingID": "MSH_ProcessingID_ProcessingID",
"PID.SSNNumberPatient": "PID_SSNNumberPatient",
"OBR_1.FillerOrderNumber.EntityIdentifier": "OBR_1_FillerOrderNumber_EntityIdentifier",
"PID.PatientAccountNumber.ID": "PID_PatientAccountNumber_ID",
"PID.DateOfBirth": "PID_DateOfBirth",
"PD1.PatientPrimaryCareProviderNameIDNo.IDNumber": "PD1_PatientPrimaryCareProviderNameIDNo_IDNumber",
"PID.Sex": "PID_Sex",
"MSH.MessageType.MessageType": "MSH_MessageType_MessageType",
"OBX_1.ReferencesRange": "OBX_1_ReferencesRange",
"OBR_1.OrderingProvider.IDNumber": "OBR_1_OrderingProvider_IDNumber",
"PD1.PatientPrimaryCareProviderNameIDNo.FamilyName": "PD1_PatientPrimaryCareProviderNameIDNo_FamilyName",
"OBX_1.Units.NameOfCodingSystem": "OBX_1_Units_NameOfCodingSystem",
"OBX_1.Units.Identifier": "OBX_1_Units_Identifier",
"PID.PatientName.GivenName": "PID_PatientName_GivenName",
"OBX_1.ObservationSubID": "OBX_1_ObservationSubID",
"PD1.PatientPrimaryCareProviderNameIDNo.GivenName": "PD1_PatientPrimaryCareProviderNameIDNo_GivenName",
"OBR_1.PlacerOrderNumber.NamespaceID": "OBR_1_PlacerOrderNumber_NamespaceID",
"MSH.MessageType.TriggerEvent": "MSH_MessageType_TriggerEvent",
"PD1.PatientPrimaryCareProviderNameIDNo.AssigningAuthority": "PD1_PatientPrimaryCareProviderNameIDNo_AssigningAuthority",
"OBR_1.ResultStatus": "OBR_1_ResultStatus",
"PID.PatientName.FamilyName": "PID_PatientName_FamilyName",
"MSH.EncodingCharacters": "MSH_EncodingCharacters",
"MSH.VersionID": "MSH_VersionID",
"OBR_1.UniversalServiceIdentifier.Identifier": "OBR_1_UniversalServiceIdentifier_Identifier",
"OBR_1.ObservationDateTime": "OBR_1_ObservationDateTime",
"OBR_1.ScheduledDateTime": "OBR_1_ScheduledDateTime",
"OBX_1.ObservationIdentifier.Identifier": "OBX_1_ObservationIdentifier_Identifier",
"OBR_1.OrderingProvider.GivenName": "OBR_1_OrderingProvider_GivenName",
"OBR_1.SetIDObservationRequest": "OBR_1_SetIDObservationRequest",
"OBR_1.ResultsRptStatusChngDateTime": "OBR_1_ResultsRptStatusChngDateTime",
"OBR_1.PlacerOrderNumber.EntityIdentifier": "OBR_1_PlacerOrderNumber_EntityIdentifier",
"OBX_1.NatureOfAbnormalTest": "OBX_1_NatureOfAbnormalTest",
"OBX_1.SetIDOBX": "OBX_1_SetIDOBX",
"MSH.FieldSeparator": "MSH_FieldSeparator",
"PD1.PatientPrimaryCareProviderNameIDNo.MiddleInitialOrName": "PD1_PatientPrimaryCareProviderNameIDNo_MiddleInitialOrName",
"OBX_1.Units.Text": "OBX_1_Units_Text",
"OBX_1.ValueType": "OBX_1_ValueType",
"PID.PatientIDInternalID.ID": "PID_PatientIDInternalID_ID",
"OBX_1.ObservationValue": "OBX_1_ObservationValue",
"OBR_1.OrderingProvider.MiddleInitialOrName": "OBR_1_OrderingProvider_MiddleInitialOrName"
}
{
"OBX_1.UserDefinedAccessChecks": "OBX_1.UserDefinedAccessChecks",
"OBR_1.OrderingProvider.FamilyName": "OBR_1.OrderingProvider.FamilyName",
"MSH.MessageControlID": "MSH.MessageControlID",
"OBX_1.ObservationIdentifier.Text": "OBX_1.ObservationIdentifier.Text",
"MSH.SendingApplication.NamespaceID": "MSH.SendingApplication.NamespaceID",
"OBR_1.UniversalServiceIdentifier.Text": "OBR_1.UniversalServiceIdentifier.Text",
"MSH.ReceivingApplication.NamespaceID": "MSH.ReceivingApplication.NamespaceID",
"MSH.ProcessingID.ProcessingID": "MSH.ProcessingID.ProcessingID",
"PID.SSNNumberPatient": "PID.SSNNumberPatient",
"OBR_1.FillerOrderNumber.EntityIdentifier": "OBR_1.FillerOrderNumber.EntityIdentifier",
"PID.PatientAccountNumber.ID": "PID.PatientAccountNumber.ID",
"PID.DateOfBirth": "PID.DateOfBirth",
"PD1.PatientPrimaryCareProviderNameIDNo.IDNumber": "PD1.PatientPrimaryCareProviderNameIDNo.IDNumber",
"PID.Sex": "PID.Sex",
"MSH.MessageType.MessageType": "MSH.MessageType.MessageType",
"OBX_1.ReferencesRange": "OBX_1.ReferencesRange",
"OBR_1.OrderingProvider.IDNumber": "OBR_1.OrderingProvider.IDNumber",
"PD1.PatientPrimaryCareProviderNameIDNo.FamilyName": "PD1.PatientPrimaryCareProviderNameIDNo.FamilyName",
"OBX_1.Units.NameOfCodingSystem": "OBX_1.Units.NameOfCodingSystem",
"OBX_1.Units.Identifier": "OBX_1.Units.Identifier",
"PID.PatientName.GivenName": "PID.PatientName.GivenName",
"OBX_1.ObservationSubID": "OBX_1.ObservationSubID",
"PD1.PatientPrimaryCareProviderNameIDNo.GivenName": "PD1.PatientPrimaryCareProviderNameIDNo.GivenName",
"OBR_1.PlacerOrderNumber.NamespaceID": "OBR_1.PlacerOrderNumber.NamespaceID",
"MSH.MessageType.TriggerEvent": "MSH.MessageType.TriggerEvent",
"PD1.PatientPrimaryCareProviderNameIDNo.AssigningAuthority": "PD1.PatientPrimaryCareProviderNameIDNo.AssigningAuthority",
"OBR_1.ResultStatus": "OBR_1.ResultStatus",
"PID.PatientName.FamilyName": "PID.PatientName.FamilyName",
"MSH.EncodingCharacters": "MSH.EncodingCharacters",
"MSH.VersionID": "MSH.VersionID",
"OBR_1.UniversalServiceIdentifier.Identifier": "OBR_1.UniversalServiceIdentifier.Identifier",
"OBR_1.ObservationDateTime": "OBR_1.ObservationDateTime",
"OBR_1.ScheduledDateTime": "OBR_1.ScheduledDateTime",
"OBX_1.ObservationIdentifier.Identifier": "OBX_1.ObservationIdentifier.Identifier",
"OBR_1.OrderingProvider.GivenName": "OBR_1.OrderingProvider.GivenName",
"OBR_1.SetIDObservationRequest": "OBR_1.SetIDObservationRequest",
"OBR_1.ResultsRptStatusChngDateTime": "OBR_1.ResultsRptStatusChngDateTime",
"OBR_1.PlacerOrderNumber.EntityIdentifier": "OBR_1.PlacerOrderNumber.EntityIdentifier",
"OBX_1.NatureOfAbnormalTest": "OBX_1.NatureOfAbnormalTest",
"OBX_1.SetIDOBX": "OBX_1.SetIDOBX",
"MSH.FieldSeparator": "MSH.FieldSeparator",
"PD1.PatientPrimaryCareProviderNameIDNo.MiddleInitialOrName": "PD1.PatientPrimaryCareProviderNameIDNo.MiddleInitialOrName",
"OBX_1.Units.Text": "OBX_1.Units.Text",
"OBX_1.ValueType": "OBX_1.ValueType",
"PID.PatientIDInternalID.ID": "PID.PatientIDInternalID.ID",
"OBX_1.ObservationValue": "OBX_1.ObservationValue",
"OBR_1.OrderingProvider.MiddleInitialOrName": "OBR_1.OrderingProvider.MiddleInitialOrName"
}
Step 3: Profit
Build a Big Data Environment For Our Data
(Build HDFS Directories)
# Switch to the hdfs user first; the remaining commands are run in that shell.
su hdfs

# Landing directories for raw HL7 messages, one per message type.
hdfs dfs -mkdir -p /hl7/hl7-mdm
hdfs dfs -mkdir -p /hl7/hl7-adt
hdfs dfs -mkdir -p /hl7/hl7-orm
hdfs dfs -mkdir -p /hl7/hl7-oru

# Directories for the JSON-converted messages.
hdfs dfs -mkdir -p /hl7/json/hl7-mdm
hdfs dfs -mkdir -p /hl7/json/hl7-adt
hdfs dfs -mkdir -p /hl7/json/hl7-orm
hdfs dfs -mkdir -p /hl7/json/hl7-oru

# Directories for the flattened ORU records and the mock patient data.
hdfs dfs -mkdir -p /hl7/flat/oru
hdfs dfs -mkdir -p /patientdata

# Open up permissions (demo setup only) and list the result.
hdfs dfs -chmod -R 777 /hl7
hdfs dfs -chmod -R 777 /patientdata
hdfs dfs -ls -R /hl7
hdfs dfs -ls -R /patientdata
(Build Hive DDL)
-- Mock patient records published over MQTT.
CREATE EXTERNAL TABLE IF NOT EXISTS patientdata (
  PID_SSNNumberPatient INT, email STRING, gender STRING, ip_address STRING, drug_provider STRING,
  icd9 STRING, icd9_description STRING, icd9P_proc STRING, icd9_proc_description STRING,
  user_agent STRING, drug_used STRING
)
STORED AS ORC
LOCATION '/patientdata';

-- HL7 ORU messages kept in their nested (segment -> field -> component) shape.
CREATE EXTERNAL TABLE IF NOT EXISTS hl7oru (
  OBX_1 STRUCT<UserDefinedAccessChecks:STRING, ObservationIdentifier:STRUCT<Text:STRING, Identifier:STRING>, ReferencesRange:STRING, Units:STRUCT<NameOfCodingSystem:STRING, Identifier:STRING, Text:STRING>, ObservationSubID:STRING, NatureOfAbnormalTest:STRING, SetIDOBX:STRING, ValueType:STRING, ObservationValue:STRING>,
  OBR_1 STRUCT<OrderingProvider:STRUCT<FamilyName:STRING, IDNumber:STRING, GivenName:STRING, MiddleInitialOrName:STRING>, UniversalServiceIdentifier:STRUCT<Text:STRING, Identifier:STRING>, FillerOrderNumber:STRUCT<EntityIdentifier:STRING>, PlacerOrderNumber:STRUCT<NamespaceID:STRING, EntityIdentifier:STRING>, ResultStatus:STRING, ObservationDateTime:STRING, ScheduledDateTime:STRING, SetIDObservationRequest:STRING, ResultsRptStatusChngDateTime:STRING>,
  MSH STRUCT<MessageControlID:STRING, SendingApplication:STRUCT<NamespaceID:STRING>, ReceivingApplication:STRUCT<NamespaceID:STRING>, ProcessingID:STRUCT<ProcessingID:STRING>, MessageType:STRUCT<MessageType:STRING, TriggerEvent:STRING>, EncodingCharacters:STRING, VersionID:STRING, FieldSeparator:STRING>,
  PID STRUCT<SSNNumberPatient:STRING, PatientAccountNumber:STRUCT<ID:STRING>, DateOfBirth:STRING, Sex:STRING, PatientName:STRUCT<GivenName:STRING, FamilyName:STRING>, PatientIDInternalID:STRUCT<ID:STRING>>,
  PD1 STRUCT<PatientPrimaryCareProviderNameIDNo:STRUCT<IDNumber:STRING, FamilyName:STRING, GivenName:STRING, AssigningAuthority:STRING, MiddleInitialOrName:STRING>>
)
STORED AS ORC
LOCATION '/hl7/hl7-oru';

-- The same ORU fields flattened to one STRING column per leaf.
CREATE EXTERNAL TABLE IF NOT EXISTS hl7_oru_flat (
  OBX_1_UserDefinedAccessChecks STRING, OBR_1_OrderingProvider_FamilyName STRING, MSH_MessageControlID STRING,
  OBX_1_ObservationIdentifier_Text STRING, MSH_SendingApplication_NamespaceID STRING, OBR_1_UniversalServiceIdentifier_Text STRING,
  MSH_ReceivingApplication_NamespaceID STRING, MSH_ProcessingID_ProcessingID STRING, PID_SSNNumberPatient STRING,
  OBR_1_FillerOrderNumber_EntityIdentifier STRING, PID_PatientAccountNumber_ID STRING, PID_DateOfBirth STRING,
  PD1_PatientPrimaryCareProviderNameIDNo_IDNumber STRING, PID_Sex STRING, MSH_MessageType_MessageType STRING,
  OBX_1_ReferencesRange STRING, OBR_1_OrderingProvider_IDNumber STRING, PD1_PatientPrimaryCareProviderNameIDNo_FamilyName STRING,
  OBX_1_Units_NameOfCodingSystem STRING, OBX_1_Units_Identifier STRING, PID_PatientName_GivenName STRING,
  OBX_1_ObservationSubID STRING, PD1_PatientPrimaryCareProviderNameIDNo_GivenName STRING, OBR_1_PlacerOrderNumber_NamespaceID STRING,
  MSH_MessageType_TriggerEvent STRING, PD1_PatientPrimaryCareProviderNameIDNo_AssigningAuthority STRING, OBR_1_ResultStatus STRING,
  PID_PatientName_FamilyName STRING, MSH_EncodingCharacters STRING, MSH_VersionID STRING,
  OBR_1_UniversalServiceIdentifier_Identifier STRING, OBR_1_ObservationDateTime STRING, OBR_1_ScheduledDateTime STRING,
  OBX_1_ObservationIdentifier_Identifier STRING, OBR_1_OrderingProvider_GivenName STRING, OBR_1_SetIDObservationRequest STRING,
  OBR_1_ResultsRptStatusChngDateTime STRING, OBR_1_PlacerOrderNumber_EntityIdentifier STRING, OBX_1_NatureOfAbnormalTest STRING,
  OBX_1_SetIDOBX STRING, MSH_FieldSeparator STRING, PD1_PatientPrimaryCareProviderNameIDNo_MiddleInitialOrName STRING,
  OBX_1_Units_Text STRING, OBX_1_ValueType STRING, PID_PatientIDInternalID_ID STRING,
  OBX_1_ObservationValue STRING, OBR_1_OrderingProvider_MiddleInitialOrName STRING
)
STORED AS ORC
LOCATION '/hl7/flat/oru';

-- Repeat of the patientdata definition above; a no-op thanks to IF NOT EXISTS.
CREATE EXTERNAL TABLE IF NOT EXISTS patientdata (
  PID_SSNNumberPatient INT, email STRING, gender STRING, ip_address STRING, drug_provider STRING,
  icd9 STRING, icd9_description STRING, icd9P_proc STRING, icd9_proc_description STRING,
  user_agent STRING, drug_used STRING
)
STORED AS ORC
LOCATION '/patientdata';

-- Second nested ORU table. It was originally named hl7-oru, which is not a
-- valid Hive table name (only letters, digits and underscores are allowed),
-- so it is created as hl7_oru instead.
-- NOTE(review): same schema as hl7oru but at a different location
-- ('/hl7/hl7oru' vs '/hl7/hl7-oru'); confirm whether both tables are needed.
CREATE EXTERNAL TABLE IF NOT EXISTS hl7_oru (
  OBX_1 STRUCT<UserDefinedAccessChecks:STRING, ObservationIdentifier:STRUCT<Text:STRING, Identifier:STRING>, ReferencesRange:STRING, Units:STRUCT<NameOfCodingSystem:STRING, Identifier:STRING, Text:STRING>, ObservationSubID:STRING, NatureOfAbnormalTest:STRING, SetIDOBX:STRING, ValueType:STRING, ObservationValue:STRING>,
  OBR_1 STRUCT<OrderingProvider:STRUCT<FamilyName:STRING, IDNumber:STRING, GivenName:STRING, MiddleInitialOrName:STRING>, UniversalServiceIdentifier:STRUCT<Text:STRING, Identifier:STRING>, FillerOrderNumber:STRUCT<EntityIdentifier:STRING>, PlacerOrderNumber:STRUCT<NamespaceID:STRING, EntityIdentifier:STRING>, ResultStatus:STRING, ObservationDateTime:STRING, ScheduledDateTime:STRING, SetIDObservationRequest:STRING, ResultsRptStatusChngDateTime:STRING>,
  MSH STRUCT<MessageControlID:STRING, SendingApplication:STRUCT<NamespaceID:STRING>, ReceivingApplication:STRUCT<NamespaceID:STRING>, ProcessingID:STRUCT<ProcessingID:STRING>, MessageType:STRUCT<MessageType:STRING, TriggerEvent:STRING>, EncodingCharacters:STRING, VersionID:STRING, FieldSeparator:STRING>,
  PID STRUCT<SSNNumberPatient:STRING, PatientAccountNumber:STRUCT<ID:STRING>, DateOfBirth:STRING, Sex:STRING, PatientName:STRUCT<GivenName:STRING, FamilyName:STRING>, PatientIDInternalID:STRUCT<ID:STRING>>,
  PD1 STRUCT<PatientPrimaryCareProviderNameIDNo:STRUCT<IDNumber:STRING, FamilyName:STRING, GivenName:STRING, AssigningAuthority:STRING, MiddleInitialOrName:STRING>>
)
STORED AS ORC
LOCATION '/hl7/hl7oru';
(Build Kafka Topics)
# Create every topic used by the flow with a single partition and a
# replication factor of 1 (single-broker sandbox settings).
for topic in \
    patientboth \
    hl7-mdm hl7-adt hl7-orm hl7-oru \
    simple \
    hl7-mdm_avro hl7-adt_avro hl7-orm_avro hl7-oru_avro \
    patientdata
do
    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic "$topic"
done
Script to send a File to Kafka
# sendmessage.sh
# Feed the contents of hl7sampledata.txt to the hl7-oru topic, one message per line.
/usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list localhost:6667 --topic hl7-oru < hl7sampledata.txt
HBase DDL
hbase shell
# Inside the HBase shell: create the table with a single column family
# 'obs', then list tables to confirm it exists.
create 'patient_observations', 'obs'
list
Running a Mosquitto MQTT Broker (OSX)
# Start the Mosquitto broker as a background daemon with verbose logging,
# listening on port 14162 (the port the Python publisher connects to).
/usr/local/Cellar/mosquitto/1.4.14_2/sbin/mosquitto --daemon --verbose --port 14162
Removing Unneeded HDFS Files
# Permanently delete the HDFS path given as the first argument.
# -skipTrash bypasses the trash, so the delete cannot be undone: refuse to
# run without an argument, and quote it so the path is passed as one word.
hdfs dfs -rm -r -f -skipTrash "${1:?usage: pass the HDFS path to delete}"
Example Data from Internet
MSH|^~\&|XXXXXX||HealthOrg01||||ORU^R01|Q1111111111111111111|P|2.3|<cr>PID|||000000001||SMITH^JOHN||19700101|M||||||||||999999999999|123456789|<cr>PD1||||1234567890^LAST^FIRST^M^^^^^NPI|<cr>OBR|1|341856649^HNAM_ORDERID|000000000000000000|648088^Basic Metabolic Panel|||20150101000100|||||||||1620^Johnson^John^R||||||20150101000100|||M|||||||||||20150101000100|<cr>OBX|1|NM|GLU^Glucose Lvl|159|mg/dL|65-99^65^99|H|||F|||20150101000100|
Code Section
// Rewrite every FlowFile attribute under a sanitized name that contains only
// ASCII letters, digits and underscores, removing the original attribute.
Map<String, String> attributes = flowFile.getAttributes();
Map<String, String> attributesClean = new HashMap<>();
String tempKey = "";
for (Map.Entry<String, String> entry : attributes.entrySet()) {
    // Drop the first character that is not an ASCII letter.
    // NOTE(review): the pattern is not anchored, so this removes the first
    // non-letter anywhere in the key, not only a leading one - confirm intent.
    tempKey = entry.getKey().replaceFirst("[^A-Za-z]", "");
    // Then strip every remaining character outside [A-Za-z0-9_].
    tempKey = tempKey.replaceAll("[^A-Za-z0-9_]", "");
    attributesClean.put(tempKey, entry.getValue());
    // Session methods return the updated FlowFile; keep that reference
    // instead of continuing to use the stale one.
    flowFile = session.removeAttribute(flowFile, entry.getKey());
}
flowFile = session.putAllAttributes(flowFile, attributesClean);
Apache NiFi FlowFile
hl7new.xml
Next Sections:
Important Starter Articles
https://community.hortonworks.com/articles/138249/nifi-in-healthcare-ingesting-hl7-data-in-nifi.html
References: