Step 1: Collect HL7 Health Records

Python to Send JSON Data to MQTT (Data Generated by https://www.mockaroo.com/)

import paho.mqtt.client as mqtt
import json
# Connect to the MQTT broker (Mosquitto listening on port 14162 in this setup)
client = mqtt.Client()  # paho-mqtt 2.x may require mqtt.CallbackAPIVersion as the first argument
client.connect("localhost", 14162, 60)
client.loop_start()  # run the network loop so the QoS 1 acknowledgement can be processed
row = [{"PID_SSNNumberPatient":823456789,"email":"ahospital0@census.gov","gender":"Male","ip_address":"138.135.180.206","drug_provider":"OrchidPharma Inc","icd9":"94140","icd9_description":"Deep necrosis of underlying tissues [deep third degree] without mention of loss of a body part, face and head, unspecified site","icd9P_proc":"7942","icd9_proc_description":"Closed reduction of separated epiphysis, radius and ulna","user_agent":"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_5_8) AppleWebKit/535.1 (KHTML, like Gecko) Chrome/14.0.801.0 Safari/535.1","drug_used":"Naratriptan"}]
json_string = json.dumps(row)
info = client.publish("patientdata", payload=json_string, qos=1, retain=False)
info.wait_for_publish()  # block until the broker has acknowledged the message
client.loop_stop()
client.disconnect()
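
Before publishing, it can help to confirm that each record carries the fields the downstream `patientdata` table expects. The following is a minimal sketch using only the standard library. The names `EXPECTED_FIELDS`, `missing_fields`, and `build_payload` are illustrative assumptions and are not part of the original flow.

```python
import json

# Field names taken from the sample record and the patientdata table in this article.
EXPECTED_FIELDS = [
    "PID_SSNNumberPatient", "email", "gender", "ip_address", "drug_provider",
    "icd9", "icd9_description", "icd9P_proc", "icd9_proc_description",
    "user_agent", "drug_used",
]


def missing_fields(record):
    """Return the expected field names that are absent from one record."""
    return [name for name in EXPECTED_FIELDS if name not in record]


def build_payload(rows):
    """Serialize a list of records to the JSON array string sent over MQTT.

    Raises ValueError if any record lacks an expected field.
    """
    for index, record in enumerate(rows):
        missing = missing_fields(record)
        if missing:
            raise ValueError(f"record {index} is missing fields: {missing}")
    return json.dumps(rows)
```

The string returned by `build_payload(row)` could be passed as the `payload` argument of `client.publish` in place of `json_string`.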

Step 2: Get the Data into Apache NiFi via FTP, SFTP, File, Apache Kafka, MQTT, REST API, TCP/IP, or hundreds of other options.

Schemas

{
 "type": "record",
 "name": "hl7oru",
 "fields": [
  {
   "name": "OBX_1_UserDefinedAccessChecks",
   "type": "string",
   "doc": "Type inferred from '\"20150101000100\"'"
  },
  {
   "name": "OBR_1_OrderingProvider_FamilyName",
   "type": "string",
   "doc": "Type inferred from '\"Johnson\"'"
  },
  {
   "name": "MSH_MessageControlID",
   "type": "string",
   "doc": "Type inferred from '\"Q1111111111111111111\"'"
  },
  {
   "name": "OBX_1_ObservationIdentifier_Text",
   "type": "string",
   "doc": "Type inferred from '\"Glucose Lvl\"'"
  },
  {
   "name": "MSH_SendingApplication_NamespaceID",
   "type": "string",
   "doc": "Type inferred from '\"XXXXXX\"'"
  },
  {
   "name": "OBR_1_UniversalServiceIdentifier_Text",
   "type": "string",
   "doc": "Type inferred from '\"Basic Metabolic Panel\"'"
  },
  {
   "name": "MSH_ReceivingApplication_NamespaceID",
   "type": "string",
   "doc": "Type inferred from '\"HealthOrg01\"'"
  },
  {
   "name": "MSH_ProcessingID_ProcessingID",
   "type": "string",
   "doc": "Type inferred from '\"P\"'"
  },
  {
   "name": "PID_SSNNumberPatient",
   "type": "string",
   "doc": "Type inferred from '\"123456789\"'"
  },
  {
   "name": "OBR_1_FillerOrderNumber_EntityIdentifier",
   "type": "string",
   "doc": "Type inferred from '\"000000000000000000\"'"
  },
  {
   "name": "PID_PatientAccountNumber_ID",
   "type": "string",
   "doc": "Type inferred from '\"999999999999\"'"
  },
  {
   "name": "PID_DateOfBirth",
   "type": "string",
   "doc": "Type inferred from '\"19700101\"'"
  },
  {
   "name": "PD1_PatientPrimaryCareProviderNameIDNo_IDNumber",
   "type": "string",
   "doc": "Type inferred from '\"1234567890\"'"
  },
  {
   "name": "PID_Sex",
   "type": "string",
   "doc": "Type inferred from '\"M\"'"
  },
  {
   "name": "MSH_MessageType_MessageType",
   "type": "string",
   "doc": "Type inferred from '\"ORU\"'"
  },
  {
   "name": "OBX_1_ReferencesRange",
   "type": "string",
   "doc": "Type inferred from '\"H\"'"
  },
  {
   "name": "OBR_1_OrderingProvider_IDNumber",
   "type": "string",
   "doc": "Type inferred from '\"1620\"'"
  },
  {
   "name": "PD1_PatientPrimaryCareProviderNameIDNo_FamilyName",
   "type": "string",
   "doc": "Type inferred from '\"LAST\"'"
  },
  {
   "name": "OBX_1_Units_NameOfCodingSystem",
   "type": "string",
   "doc": "Type inferred from '\"99\"'"
  },
  {
   "name": "OBX_1_Units_Identifier",
   "type": "string",
   "doc": "Type inferred from '\"65-99\"'"
  },
  {
   "name": "PID_PatientName_GivenName",
   "type": "string",
   "doc": "Type inferred from '\"JOHN\"'"
  },
  {
   "name": "OBX_1_ObservationSubID",
   "type": "string",
   "doc": "Type inferred from '\"159\"'"
  },
  {
   "name": "PD1_PatientPrimaryCareProviderNameIDNo_GivenName",
   "type": "string",
   "doc": "Type inferred from '\"FIRST\"'"
  },
  {
   "name": "OBR_1_PlacerOrderNumber_NamespaceID",
   "type": "string",
   "doc": "Type inferred from '\"HNAM_ORDERID\"'"
  },
  {
   "name": "MSH_MessageType_TriggerEvent",
   "type": "string",
   "doc": "Type inferred from '\"R01\"'"
  },
  {
   "name": "PD1_PatientPrimaryCareProviderNameIDNo_AssigningAuthority",
   "type": "string",
   "doc": "Type inferred from '\"NPI\"'"
  },
  {
   "name": "OBR_1_ResultStatus",
   "type": "string",
   "doc": "Type inferred from '\"M\"'"
  },
  {
   "name": "PID_PatientName_FamilyName",
   "type": "string",
   "doc": "Type inferred from '\"SMITH\"'"
  },
  {
   "name": "MSH_EncodingCharacters",
   "type": "string",
   "doc": "Type inferred from '\"^~\\&\"'"
  },
  {
   "name": "MSH_VersionID",
   "type": "string",
   "doc": "Type inferred from '\"2.3\"'"
  },
  {
   "name": "OBR_1_UniversalServiceIdentifier_Identifier",
   "type": "string",
   "doc": "Type inferred from '\"648088\"'"
  },
  {
   "name": "OBR_1_ObservationDateTime",
   "type": "string",
   "doc": "Type inferred from '\"20150101000100\"'"
  },
  {
   "name": "OBR_1_ScheduledDateTime",
   "type": "string",
   "doc": "Type inferred from '\"20150101000100\"'"
  },
  {
   "name": "OBX_1_ObservationIdentifier_Identifier",
   "type": "string",
   "doc": "Type inferred from '\"GLU\"'"
  },
  {
   "name": "OBR_1_OrderingProvider_GivenName",
   "type": "string",
   "doc": "Type inferred from '\"John\"'"
  },
  {
   "name": "OBR_1_SetIDObservationRequest",
   "type": "string",
   "doc": "Type inferred from '\"1\"'"
  },
  {
   "name": "OBR_1_ResultsRptStatusChngDateTime",
   "type": "string",
   "doc": "Type inferred from '\"20150101000100\"'"
  },
  {
   "name": "OBR_1_PlacerOrderNumber_EntityIdentifier",
   "type": "string",
   "doc": "Type inferred from '\"341856649\"'"
  },
  {
   "name": "OBX_1_NatureOfAbnormalTest",
   "type": "string",
   "doc": "Type inferred from '\"F\"'"
  },
  {
   "name": "OBX_1_SetIDOBX",
   "type": "string",
   "doc": "Type inferred from '\"1\"'"
  },
  {
   "name": "MSH_FieldSeparator",
   "type": "string",
   "doc": "Type inferred from '\"|\"'"
  },
  {
   "name": "PD1",
   "type": {
    "type": "record",
    "name": "PD1",
    "fields": [
     {
      "name": "PatientPrimaryCareProviderNameIDNo_MiddleInitialOrName",
      "type": "string",
      "doc": "Type inferred from '\"M\"'"
     }
    ]
   },
   "doc": "Type inferred from '{\"PatientPrimaryCareProviderNameIDNo_MiddleInitialOrName\":\"M\"}'"
  },
  {
   "name": "OBX_1_Units_Text",
   "type": "string",
   "doc": "Type inferred from '\"65\"'"
  },
  {
   "name": "OBX_1_ValueType",
   "type": "string",
   "doc": "Type inferred from '\"NM\"'"
  },
  {
   "name": "PID_PatientIDInternalID_ID",
   "type": "string",
   "doc": "Type inferred from '\"000000001\"'"
  },
  {
   "name": "OBX_1_ObservationValue",
   "type": "string",
   "doc": "Type inferred from '\"mg/dL\"'"
  },
  {
   "name": "OBR_1_OrderingProvider_MiddleInitialOrName",
   "type": "string",
   "doc": "Type inferred from '\"R\"'"
  }
 ]
}
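
The `doc` strings in the schema above suggest it was inferred from a sample JSON record rather than written by hand. As a rough illustration of that idea, here is a minimal sketch that builds the same shape of schema from one sample object. It is not the implementation of any NiFi processor. The function names and the choice of `long` for integers are assumptions, and values other than strings, numbers, booleans, and nested objects simply fall back to `string`.

```python
import json


def avro_type(value):
    """Map a Python value decoded from JSON to a simple Avro type name."""
    if isinstance(value, bool):  # bool must be checked before int
        return "boolean"
    if isinstance(value, int):
        return "long"
    if isinstance(value, float):
        return "double"
    return "string"


def infer_schema(name, sample):
    """Build an Avro record schema (as a dict) from one sample JSON object."""
    fields = []
    for key, value in sample.items():
        if isinstance(value, dict):
            # Nested objects become nested records, like PD1 in the schema above.
            field_type = infer_schema(key, value)
        else:
            field_type = avro_type(value)
        doc = "Type inferred from '" + json.dumps(value, separators=(",", ":")) + "'"
        fields.append({"name": key, "type": field_type, "doc": doc})
    return {"type": "record", "name": name, "fields": fields}
```

Calling `infer_schema("hl7oru", record)` on a flattened HL7 record and printing the result with `json.dumps(..., indent=1)` gives a document laid out like the one above.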

patientbothjson.txt

patientdatajs.txt

simple.txt

Jolt Scripts

{
"OBX_1.UserDefinedAccessChecks": "OBX_1_UserDefinedAccessChecks",
"OBR_1.OrderingProvider.FamilyName": "OBR_1_OrderingProvider_FamilyName",
"MSH.MessageControlID": "MSH_MessageControlID",
"OBX_1.ObservationIdentifier.Text": "OBX_1_ObservationIdentifier_Text",
"MSH.SendingApplication.NamespaceID": "MSH_SendingApplication_NamespaceID",
"OBR_1.UniversalServiceIdentifier.Text": "OBR_1_UniversalServiceIdentifier_Text",
"MSH.ReceivingApplication.NamespaceID": "MSH_ReceivingApplication_NamespaceID",
"MSH.ProcessingID.ProcessingID": "MSH_ProcessingID_ProcessingID",
"PID.SSNNumberPatient": "PID_SSNNumberPatient",
"OBR_1.FillerOrderNumber.EntityIdentifier": "OBR_1_FillerOrderNumber_EntityIdentifier",
"PID.PatientAccountNumber.ID": "PID_PatientAccountNumber_ID",
"PID.DateOfBirth": "PID_DateOfBirth",
"PD1.PatientPrimaryCareProviderNameIDNo.IDNumber": "PD1_PatientPrimaryCareProviderNameIDNo_IDNumber",
"PID.Sex": "PID_Sex",
"MSH.MessageType.MessageType": "MSH_MessageType_MessageType",
"OBX_1.ReferencesRange": "OBX_1_ReferencesRange",
"OBR_1.OrderingProvider.IDNumber": "OBR_1_OrderingProvider_IDNumber",
"PD1.PatientPrimaryCareProviderNameIDNo.FamilyName": "PD1_PatientPrimaryCareProviderNameIDNo_FamilyName",
"OBX_1.Units.NameOfCodingSystem": "OBX_1_Units_NameOfCodingSystem",
"OBX_1.Units.Identifier": "OBX_1_Units_Identifier",
"PID.PatientName.GivenName": "PID_PatientName_GivenName",
"OBX_1.ObservationSubID": "OBX_1_ObservationSubID",
"PD1.PatientPrimaryCareProviderNameIDNo.GivenName": "PD1_PatientPrimaryCareProviderNameIDNo_GivenName",
"OBR_1.PlacerOrderNumber.NamespaceID": "OBR_1_PlacerOrderNumber_NamespaceID",
"MSH.MessageType.TriggerEvent": "MSH_MessageType_TriggerEvent",
"PD1.PatientPrimaryCareProviderNameIDNo.AssigningAuthority": "PD1_PatientPrimaryCareProviderNameIDNo_AssigningAuthority",
"OBR_1.ResultStatus": "OBR_1_ResultStatus",
"PID.PatientName.FamilyName": "PID_PatientName_FamilyName",
"MSH.EncodingCharacters": "MSH_EncodingCharacters",
"MSH.VersionID": "MSH_VersionID",
"OBR_1.UniversalServiceIdentifier.Identifier": "OBR_1_UniversalServiceIdentifier_Identifier",
"OBR_1.ObservationDateTime": "OBR_1_ObservationDateTime",
"OBR_1.ScheduledDateTime": "OBR_1_ScheduledDateTime",
"OBX_1.ObservationIdentifier.Identifier": "OBX_1_ObservationIdentifier_Identifier",
"OBR_1.OrderingProvider.GivenName": "OBR_1_OrderingProvider_GivenName",
"OBR_1.SetIDObservationRequest": "OBR_1_SetIDObservationRequest",
"OBR_1.ResultsRptStatusChngDateTime": "OBR_1_ResultsRptStatusChngDateTime",
"OBR_1.PlacerOrderNumber.EntityIdentifier": "OBR_1_PlacerOrderNumber_EntityIdentifier",
"OBX_1.NatureOfAbnormalTest": "OBX_1_NatureOfAbnormalTest",
"OBX_1.SetIDOBX": "OBX_1_SetIDOBX",
"MSH.FieldSeparator": "MSH_FieldSeparator",
"PD1.PatientPrimaryCareProviderNameIDNo.MiddleInitialOrName": "PD1_PatientPrimaryCareProviderNameIDNo_MiddleInitialOrName",
"OBX_1.Units.Text": "OBX_1_Units_Text",
"OBX_1.ValueType": "OBX_1_ValueType",
"PID.PatientIDInternalID.ID": "PID_PatientIDInternalID_ID",
"OBX_1.ObservationValue": "OBX_1_ObservationValue",
"OBR_1.OrderingProvider.MiddleInitialOrName": "OBR_1_OrderingProvider_MiddleInitialOrName"
}

{
"OBX_1.UserDefinedAccessChecks": "OBX_1.UserDefinedAccessChecks",
"OBR_1.OrderingProvider.FamilyName": "OBR_1.OrderingProvider.FamilyName",
"MSH.MessageControlID": "MSH.MessageControlID",
"OBX_1.ObservationIdentifier.Text": "OBX_1.ObservationIdentifier.Text",
"MSH.SendingApplication.NamespaceID": "MSH.SendingApplication.NamespaceID",
"OBR_1.UniversalServiceIdentifier.Text": "OBR_1.UniversalServiceIdentifier.Text",
"MSH.ReceivingApplication.NamespaceID": "MSH.ReceivingApplication.NamespaceID",
"MSH.ProcessingID.ProcessingID": "MSH.ProcessingID.ProcessingID",
"PID.SSNNumberPatient": "PID.SSNNumberPatient",
"OBR_1.FillerOrderNumber.EntityIdentifier": "OBR_1.FillerOrderNumber.EntityIdentifier",
"PID.PatientAccountNumber.ID": "PID.PatientAccountNumber.ID",
"PID.DateOfBirth": "PID.DateOfBirth",
"PD1.PatientPrimaryCareProviderNameIDNo.IDNumber": "PD1.PatientPrimaryCareProviderNameIDNo.IDNumber",
"PID.Sex": "PID.Sex",
"MSH.MessageType.MessageType": "MSH.MessageType.MessageType",
"OBX_1.ReferencesRange": "OBX_1.ReferencesRange",
"OBR_1.OrderingProvider.IDNumber": "OBR_1.OrderingProvider.IDNumber",
"PD1.PatientPrimaryCareProviderNameIDNo.FamilyName": "PD1.PatientPrimaryCareProviderNameIDNo.FamilyName",
"OBX_1.Units.NameOfCodingSystem": "OBX_1.Units.NameOfCodingSystem",
"OBX_1.Units.Identifier": "OBX_1.Units.Identifier",
"PID.PatientName.GivenName": "PID.PatientName.GivenName",
"OBX_1.ObservationSubID": "OBX_1.ObservationSubID",
"PD1.PatientPrimaryCareProviderNameIDNo.GivenName": "PD1.PatientPrimaryCareProviderNameIDNo.GivenName",
"OBR_1.PlacerOrderNumber.NamespaceID": "OBR_1.PlacerOrderNumber.NamespaceID",
"MSH.MessageType.TriggerEvent": "MSH.MessageType.TriggerEvent",
"PD1.PatientPrimaryCareProviderNameIDNo.AssigningAuthority": "PD1.PatientPrimaryCareProviderNameIDNo.AssigningAuthority",
"OBR_1.ResultStatus": "OBR_1.ResultStatus",
"PID.PatientName.FamilyName": "PID.PatientName.FamilyName",
"MSH.EncodingCharacters": "MSH.EncodingCharacters",
"MSH.VersionID": "MSH.VersionID",
"OBR_1.UniversalServiceIdentifier.Identifier": "OBR_1.UniversalServiceIdentifier.Identifier",
"OBR_1.ObservationDateTime": "OBR_1.ObservationDateTime",
"OBR_1.ScheduledDateTime": "OBR_1.ScheduledDateTime",
"OBX_1.ObservationIdentifier.Identifier": "OBX_1.ObservationIdentifier.Identifier",
"OBR_1.OrderingProvider.GivenName": "OBR_1.OrderingProvider.GivenName",
"OBR_1.SetIDObservationRequest": "OBR_1.SetIDObservationRequest",
"OBR_1.ResultsRptStatusChngDateTime": "OBR_1.ResultsRptStatusChngDateTime",
"OBR_1.PlacerOrderNumber.EntityIdentifier": "OBR_1.PlacerOrderNumber.EntityIdentifier",
"OBX_1.NatureOfAbnormalTest": "OBX_1.NatureOfAbnormalTest",
"OBX_1.SetIDOBX": "OBX_1.SetIDOBX",
"MSH.FieldSeparator": "MSH.FieldSeparator",
"PD1.PatientPrimaryCareProviderNameIDNo.MiddleInitialOrName": "PD1.PatientPrimaryCareProviderNameIDNo.MiddleInitialOrName",
"OBX_1.Units.Text": "OBX_1.Units.Text",
"OBX_1.ValueType": "OBX_1.ValueType",
"PID.PatientIDInternalID.ID": "PID.PatientIDInternalID.ID",
"OBX_1.ObservationValue": "OBX_1.ObservationValue",
"OBR_1.OrderingProvider.MiddleInitialOrName": "OBR_1.OrderingProvider.MiddleInitialOrName"
}
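
To make the difference between the two mappings concrete: in a Jolt shift specification the right-hand side is an output path, and dots in that path separate nesting levels. The first mapping therefore yields flat, underscore-joined field names, while the second yields nested objects. The sketch below reproduces both effects in plain Python, assuming the input keys still contain dots as the HL7 attribute names do. The function names are illustrative only.

```python
def flatten_keys(attributes):
    """First mapping: 'PID.PatientName.GivenName' -> 'PID_PatientName_GivenName'."""
    return {key.replace(".", "_"): value for key, value in attributes.items()}


def nest_keys(attributes):
    """Second mapping: each dot-separated key becomes a path of nested objects."""
    root = {}
    for key, value in attributes.items():
        *parents, leaf = key.split(".")
        node = root
        for part in parents:
            # Create the intermediate object on first use, then descend into it.
            node = node.setdefault(part, {})
        node[leaf] = value
    return root
```

The flat form lines up with the `hl7_oru_flat` table and the nested form with the STRUCT-based `hl7oru` table.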

42885-kakfatohl7flow.png

42886-hl7schema.png

42887-consumehl7flow.png

42888-jolt.png

42889-processmqttjson.png

42890-mqtthl7.png


Step 3: Profit

Build a Big Data Environment For Our Data


(Build HDFS Directories)

su hdfs 
hdfs dfs -mkdir -p /hl7/hl7-mdm
hdfs dfs -mkdir -p /hl7/hl7-adt
hdfs dfs -mkdir -p /hl7/hl7-orm
hdfs dfs -mkdir -p /hl7/hl7-oru
hdfs dfs -mkdir -p /hl7/json/hl7-mdm
hdfs dfs -mkdir -p /hl7/json/hl7-adt
hdfs dfs -mkdir -p /hl7/json/hl7-orm
hdfs dfs -mkdir -p /hl7/json/hl7-oru
hdfs dfs -mkdir -p /hl7/flat/oru
hdfs dfs -mkdir -p /patientdata
hdfs dfs -chmod -R 777 /hl7
hdfs dfs -chmod -R 777 /patientdata
hdfs dfs -ls -R /hl7
hdfs dfs -ls -R /patientdata


(Build Hive DDL)

CREATE EXTERNAL TABLE IF NOT EXISTS patientdata (PID_SSNNumberPatient INT, email STRING, gender STRING, ip_address STRING, 
drug_provider STRING, icd9 STRING, icd9_description STRING, icd9P_proc STRING, icd9_proc_description STRING, user_agent STRING, drug_used STRING) 
STORED AS ORC LOCATION '/patientdata';
CREATE EXTERNAL TABLE IF NOT EXISTS hl7oru 
(OBX_1 STRUCT<UserDefinedAccessChecks:STRING, ObservationIdentifier:STRUCT<Text:STRING,
 Identifier:STRING>,
 ReferencesRange:STRING,
 Units:STRUCT<NameOfCodingSystem:STRING,
 Identifier:STRING,
 Text:STRING>,
 ObservationSubID:STRING,
 NatureOfAbnormalTest:STRING,
 SetIDOBX:STRING,
 ValueType:STRING,
 ObservationValue:STRING>,
 OBR_1 STRUCT<OrderingProvider:STRUCT<FamilyName:STRING,
 IDNumber:STRING,
 GivenName:STRING,
 MiddleInitialOrName:STRING>,
 UniversalServiceIdentifier:STRUCT<Text:STRING,
 Identifier:STRING>,
 FillerOrderNumber:STRUCT<EntityIdentifier:STRING>,
 PlacerOrderNumber:STRUCT<NamespaceID:STRING,
 EntityIdentifier:STRING>,
 ResultStatus:STRING,
 ObservationDateTime:STRING,
 ScheduledDateTime:STRING,
 SetIDObservationRequest:STRING,
 ResultsRptStatusChngDateTime:STRING>,
 MSH STRUCT<MessageControlID:STRING,
 SendingApplication:STRUCT<NamespaceID:STRING>,
 ReceivingApplication:STRUCT<NamespaceID:STRING>,
 ProcessingID:STRUCT<ProcessingID:STRING>,
 MessageType:STRUCT<MessageType:STRING,
 TriggerEvent:STRING>,
 EncodingCharacters:STRING,
 VersionID:STRING,
 FieldSeparator:STRING>,
 PID STRUCT<SSNNumberPatient:STRING,
 PatientAccountNumber:STRUCT<ID:STRING>,
 DateOfBirth:STRING,
 Sex:STRING,
 PatientName:STRUCT<GivenName:STRING,
 FamilyName:STRING>,
 PatientIDInternalID:STRUCT<ID:STRING>>,
 PD1 STRUCT<PatientPrimaryCareProviderNameIDNo:STRUCT<IDNumber:STRING,
 FamilyName:STRING,
 GivenName:STRING,
 AssigningAuthority:STRING,
 MiddleInitialOrName:STRING>>) 
 STORED AS ORC
LOCATION '/hl7/hl7-oru';
CREATE EXTERNAL TABLE IF NOT EXISTS hl7_oru_flat 
(OBX_1_UserDefinedAccessChecks STRING,
 OBR_1_OrderingProvider_FamilyName STRING,
 MSH_MessageControlID STRING,
 OBX_1_ObservationIdentifier_Text STRING,
 MSH_SendingApplication_NamespaceID STRING,
 OBR_1_UniversalServiceIdentifier_Text STRING,
 MSH_ReceivingApplication_NamespaceID STRING,
 MSH_ProcessingID_ProcessingID STRING,
 PID_SSNNumberPatient STRING,
 OBR_1_FillerOrderNumber_EntityIdentifier STRING,
 PID_PatientAccountNumber_ID STRING,
 PID_DateOfBirth STRING,
 PD1_PatientPrimaryCareProviderNameIDNo_IDNumber STRING,
 PID_Sex STRING,
 MSH_MessageType_MessageType STRING,
 OBX_1_ReferencesRange STRING,
 OBR_1_OrderingProvider_IDNumber STRING,
 PD1_PatientPrimaryCareProviderNameIDNo_FamilyName STRING,
 OBX_1_Units_NameOfCodingSystem STRING,
 OBX_1_Units_Identifier STRING,
 PID_PatientName_GivenName STRING,
 OBX_1_ObservationSubID STRING,
 PD1_PatientPrimaryCareProviderNameIDNo_GivenName STRING,
 OBR_1_PlacerOrderNumber_NamespaceID STRING,
 MSH_MessageType_TriggerEvent STRING,
 PD1_PatientPrimaryCareProviderNameIDNo_AssigningAuthority STRING,
 OBR_1_ResultStatus STRING,
 PID_PatientName_FamilyName STRING,
 MSH_EncodingCharacters STRING,
 MSH_VersionID STRING,
 OBR_1_UniversalServiceIdentifier_Identifier STRING,
 OBR_1_ObservationDateTime STRING,
 OBR_1_ScheduledDateTime STRING,
 OBX_1_ObservationIdentifier_Identifier STRING,
 OBR_1_OrderingProvider_GivenName STRING,
 OBR_1_SetIDObservationRequest STRING,
 OBR_1_ResultsRptStatusChngDateTime STRING,
 OBR_1_PlacerOrderNumber_EntityIdentifier STRING,
 OBX_1_NatureOfAbnormalTest STRING,
 OBX_1_SetIDOBX STRING,
 MSH_FieldSeparator STRING,
 PD1_PatientPrimaryCareProviderNameIDNo_MiddleInitialOrName STRING,
 OBX_1_Units_Text STRING,
 OBX_1_ValueType STRING,
 PID_PatientIDInternalID_ID STRING,
 OBX_1_ObservationValue STRING,
 OBR_1_OrderingProvider_MiddleInitialOrName STRING) 
STORED AS ORC
LOCATION '/hl7/flat/oru';
CREATE EXTERNAL TABLE IF NOT EXISTS hl7_oru (OBX_1 STRUCT<UserDefinedAccessChecks:STRING, ObservationIdentifier:STRUCT<Text:STRING, Identifier:STRING>,
ReferencesRange:STRING, Units:STRUCT<NameOfCodingSystem:STRING, Identifier:STRING, Text:STRING>, 
ObservationSubID:STRING, NatureOfAbnormalTest:STRING, SetIDOBX:STRING, ValueType:STRING, ObservationValue:STRING>, 
OBR_1 STRUCT<OrderingProvider:STRUCT<FamilyName:STRING, IDNumber:STRING, GivenName:STRING, MiddleInitialOrName:STRING>, 
UniversalServiceIdentifier:STRUCT<Text:STRING, Identifier:STRING>, FillerOrderNumber:STRUCT<EntityIdentifier:STRING>, 
PlacerOrderNumber:STRUCT<NamespaceID:STRING, EntityIdentifier:STRING>, ResultStatus:STRING, ObservationDateTime:STRING, 
ScheduledDateTime:STRING, SetIDObservationRequest:STRING, ResultsRptStatusChngDateTime:STRING>, MSH STRUCT<MessageControlID:STRING, 
SendingApplication:STRUCT<NamespaceID:STRING>, ReceivingApplication:STRUCT<NamespaceID:STRING>, ProcessingID:STRUCT<ProcessingID:STRING>, 
MessageType:STRUCT<MessageType:STRING, TriggerEvent:STRING>, EncodingCharacters:STRING, VersionID:STRING, FieldSeparator:STRING>, 
PID STRUCT<SSNNumberPatient:STRING, PatientAccountNumber:STRUCT<ID:STRING>, DateOfBirth:STRING, Sex:STRING, PatientName:STRUCT<GivenName:STRING,
 FamilyName:STRING>, PatientIDInternalID:STRUCT<ID:STRING>>, PD1 STRUCT<PatientPrimaryCareProviderNameIDNo:STRUCT<IDNumber:STRING, FamilyName:STRING, 
 GivenName:STRING, AssigningAuthority:STRING, MiddleInitialOrName:STRING>>) STORED AS ORC
LOCATION '/hl7/hl7oru';
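
The columns of `hl7_oru_flat` mirror the field names of the Avro schema, with the nested PD1 record flattened by joining names with an underscore and every column typed as STRING. A DDL statement of that shape could be generated from the schema instead of typed by hand. This is only a sketch under those assumptions, and `flat_columns` and `flat_table_ddl` are hypothetical helper names.

```python
def flat_columns(schema, prefix=""):
    """List column names for a flat table, descending into nested records."""
    names = []
    for field in schema["fields"]:
        name = prefix + field["name"]
        if isinstance(field["type"], dict):
            # Nested record such as PD1: prefix its fields with the parent name.
            names.extend(flat_columns(field["type"], name + "_"))
        else:
            names.append(name)
    return names


def flat_table_ddl(table, columns, location):
    """Return a CREATE EXTERNAL TABLE statement with every column typed STRING."""
    column_lines = ",\n ".join(f"{name} STRING" for name in columns)
    return (
        f"CREATE EXTERNAL TABLE IF NOT EXISTS {table}\n"
        f"({column_lines})\n"
        f"STORED AS ORC\n"
        f"LOCATION '{location}';"
    )
```

For example, `flat_table_ddl("hl7_oru_flat", flat_columns(schema), "/hl7/flat/oru")` takes the parsed Avro schema as `schema` and returns the statement as a string.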

(Build Kafka Topics)

/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic patientboth
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic hl7-mdm
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic hl7-adt
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic hl7-orm
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic hl7-oru
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic simple
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic hl7-mdm_avro
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic hl7-adt_avro
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic hl7-orm_avro
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic hl7-oru_avro
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic patientdata

Script to Send a File to Kafka (sendmessage.sh)

/usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list localhost:6667 --topic hl7-oru < hl7sampledata.txt

HBase DDL

hbase shell
create 'patient_observations', 'obs'
list

Running a Mosquitto MQTT Broker (OSX)

/usr/local/Cellar/mosquitto/1.4.14_2/sbin/mosquitto  --daemon --verbose --port 14162

Removing Unneeded HDFS Files

hdfs dfs -rm -r -f -skipTrash $1

Example Data from Internet

MSH|^~\&|XXXXXX||HealthOrg01||||ORU^R01|Q1111111111111111111|P|2.3|<cr>PID|||000000001||SMITH^JOHN||19700101|M||||||||||999999999999|123456789|<cr>PD1||||1234567890^LAST^FIRST^M^^^^^NPI|<cr>OBR|1|341856649^HNAM_ORDERID|000000000000000000|648088^Basic Metabolic Panel|||20150101000100|||||||||1620^Johnson^John^R||||||20150101000100|||M|||||||||||20150101000100|<cr>OBX|1|NM|GLU^Glucose Lvl|159|mg/dL|65-99^65^99|H|||F|||20150101000100|
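
To show how the field names used throughout this article relate to the raw message, here is a minimal sketch that splits part of the sample into segments, fields, and components. It is not a full HL7 parser. It assumes the literal `<cr>` marker shown above as the segment terminator (real HL7 v2 messages use a carriage return), and the function names are illustrative.

```python
# First three segments of the sample message above.
SAMPLE = (
    "MSH|^~\\&|XXXXXX||HealthOrg01||||ORU^R01|Q1111111111111111111|P|2.3|<cr>"
    "PID|||000000001||SMITH^JOHN||19700101|M||||||||||999999999999|123456789|<cr>"
    "PD1||||1234567890^LAST^FIRST^M^^^^^NPI|"
)


def parse_segments(message, terminator="<cr>"):
    """Split a message into {segment_id: [field, ...]}.

    Index 0 of each list is the segment id, so fields[n] is field n for every
    segment except MSH, where the field separator itself counts as MSH-1.
    Repeated segment ids are not handled in this sketch.
    """
    segments = {}
    for raw in message.split(terminator):
        if raw:
            fields = raw.split("|")
            segments[fields[0]] = fields
    return segments


def components(field):
    """Split one field into its '^'-separated components."""
    return field.split("^")
```

With this, `components(parse_segments(SAMPLE)["PID"][5])` yields the family and given name that end up in `PID_PatientName_FamilyName` and `PID_PatientName_GivenName`.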

See: Handling HL7 Records and Storing in Apache Hive for SQL Queries Part 2: - C-CDA Data and Custom Avro...

42874-junit1.png

Code Section

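import java.util.HashMap;
import java.util.Map;

// Rename every FlowFile attribute to a cleaned key: remove the old one, add the new one.
Map<String, String> attributes = flowFile.getAttributes();
Map<String, String> attributesClean = new HashMap<>();
String tempKey = "";
for (Map.Entry<String, String> entry : attributes.entrySet()) {
  // Drop the first character that is not a letter, then every character outside [A-Za-z0-9_]
  tempKey = entry.getKey().replaceFirst("[^A-Za-z]", "");
  tempKey = tempKey.replaceAll("[^A-Za-z0-9_]", "");
  attributesClean.put(tempKey, entry.getValue());
  // ProcessSession methods return the updated FlowFile, so keep the new reference
  flowFile = session.removeAttribute(flowFile, entry.getKey());
}
flowFile = session.putAllAttributes(flowFile, attributesClean);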
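
To see what the two regular expressions in the code above do to typical attribute names, the same logic can be tried outside NiFi. The sketch below mirrors the Java calls with Python's `re` module; `clean_key` is a hypothetical name. Note that the first pattern is not anchored to the start of the string, so it removes the first non-letter character wherever it occurs, including the underscore in `OBX_1`.

```python
import re


def clean_key(key):
    """Mirror the two Java regex calls applied to each attribute name."""
    # replaceFirst: remove the first character that is not an ASCII letter.
    key = re.sub(r"[^A-Za-z]", "", key, count=1)
    # replaceAll: remove every remaining character outside letters, digits, and underscore.
    return re.sub(r"[^A-Za-z0-9_]", "", key)
```

For instance, `clean_key("PID.Sex")` gives `PIDSex` and `clean_key("OBX_1.Units.Text")` gives `OBX1UnitsText`. Two different attribute names can map to the same cleaned key, in which case the later value overwrites the earlier one in the map.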

Apache NiFi FlowFile
hl7new.xml

Next Sections:

https://community.hortonworks.com/articles/149891/handling-hl7-records-and-storing-in-apache-hive-fo...

https://community.hortonworks.com/content/kbentry/150026/hl7-processing-part-3-apache-zeppelin-sql-b...

https://community.hortonworks.com/articles/149982/hl7-ingest-part-4-streaming-analytics-manager-and....

Important Starter Articles

https://community.hortonworks.com/articles/138249/nifi-in-healthcare-ingesting-hl7-data-in-nifi.html

https://community.hortonworks.com/articles/20318/visualize-patients-complaints-to-their-doctors-usi....

https://community.hortonworks.com/articles/149891/handling-hl7-records-and-storing-in-apache-hive-fo...
