Created on 12-04-2017 04:12 PM - edited 08-17-2019 09:53 AM
In this section of HL7 processing, we look at what to do with HL7 data once it has been converted into big-data-friendly Avro with a schema attached.
We can easily combine that data with our other patient data that came from MQTT. We do a simple JOIN on SSN and send the combined data to Kafka with a new schema and to Druid for analytics.
We also route some of the data to HDFS and HBase for storage and another feed to Druid for analytics.
S.A.M. Flow Overview
I have a simple Hortonworks Streaming Analytics Manager (SAM) application that ingests two Kafka feeds: one for PatientData and one for HL7 converted to Avro by Apache NiFi. I join the records from the two feeds on a key and push the result to Kafka and to Druid.
Our Environment for Running SAM: Just Click What You Need
I added Druid, Apache HBase, HDFS, Apache Hive, Apache Kafka, Apache Storm and Apache Zookeeper. I am using all except Apache Hive.
We can merge the Kafka feeds sent from Apache NiFi. One feed carries patient data that started as JSON sent via MQTT from a remote node. The other comes from another system that sent HL7 via Kafka to Apache NiFi, which converted it to Avro.
In Hortonworks Schema Registry we can see our schemas displayed with version information.
To set up a Kafka source, you just follow the dropdowns and name it. No manual effort.
This is the other feed. As you can see, it grabs the schema associated with the Kafka topic and displays all the fields and types.
Here is the join: we pick the field to join on, the type of join (LEFT), a type of windowing, and the fields we want. Again, no manual effort or typing.
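What the join step does conceptually can be sketched in plain Python. This is only an illustration, not how SAM implements it: the function name `left_join_on_key`, the sample records, and the patient-side field names `ssn` and `firstName` are assumptions made for the example. Windowing is simplified to "all records passed in belong to the same window".

```python
from collections import defaultdict


def left_join_on_key(left_records, right_records, left_key, right_key, fields):
    """Left-join two lists of dicts that belong to the same window.

    Every left record is kept; fields from matching right records are merged in.
    Only the names listed in `fields` are emitted, mirroring the field picker.
    """
    # Index the right-hand side by its join key for fast lookups.
    right_index = defaultdict(list)
    for rec in right_records:
        right_index[rec.get(right_key)].append(rec)

    joined = []
    for left in left_records:
        # A left record without a match is still emitted (LEFT join).
        matches = right_index.get(left.get(left_key)) or [None]
        for right in matches:
            merged = dict(right or {})
            merged.update(left)  # left values win on name clashes
            joined.append({name: merged.get(name) for name in fields})
    return joined


# Hypothetical sample records for one window (illustrative only).
hl7_window = [
    {"PID_SSNNumberPatient": "123456789", "OBX_1_ObservationValue": "2.3"},
    {"PID_SSNNumberPatient": "000000000", "OBX_1_ObservationValue": "5.1"},
]
patient_window = [{"ssn": "123456789", "firstName": "John"}]

result = left_join_on_key(
    hl7_window,
    patient_window,
    left_key="PID_SSNNumberPatient",
    right_key="ssn",
    fields=["PID_SSNNumberPatient", "OBX_1_ObservationValue", "firstName"],
)
```

The second HL7 record has no matching patient record, so it is kept with `firstName` set to `None`, which is the behaviour a LEFT join is chosen for.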
Druid is just as easy to set up: enter the name for your new data source and SAM will create it.
Here is how we do an aggregate on one of the numeric values.
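As a rough sketch of what a MAX aggregate over a window amounts to, the snippet below computes the largest observation value per SSN. The function name `max_per_key` and the sample records are hypothetical, and skipping non-numeric values is a choice made for this sketch, not documented SAM behaviour.

```python
def max_per_key(records, key_field, value_field):
    """Return {key: max(value)} over one window, skipping non-numeric values."""
    result = {}
    for rec in records:
        try:
            value = float(rec[value_field])
        except (KeyError, TypeError, ValueError):
            continue  # missing or non-numeric observation, ignore it
        key = rec.get(key_field)
        if key not in result or value > result[key]:
            result[key] = value
    return result


# Hypothetical window of joined records (illustrative only).
window = [
    {"PID_SSNNumberPatient": "123456789", "OBX_1_ObservationValue": "2.3"},
    {"PID_SSNNumberPatient": "123456789", "OBX_1_ObservationValue": "4.1"},
    {"PID_SSNNumberPatient": "123456789", "OBX_1_ObservationValue": "mg/dL"},
]

maxima = max_per_key(window, "PID_SSNNumberPatient", "OBX_1_ObservationValue")
```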
We send the SSN and the maximum observation value to the existing HBase table patient_observations, using the column family obs.
HBase DDL:
create 'patient_observations', 'obs'
As a result of this flow, data is added to HBase and to HDFS. We also have two Druid data sources populated for Superset analysis.
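To make the row layout concrete, here is a minimal sketch of how a flat record maps onto HBase cells in the obs column family. The helper name `to_hbase_put` is hypothetical and no HBase connection is made; the row key and values are the ones visible in the scan output further down. How SAM actually generates the row key is not covered here, so it is simply passed in.

```python
def to_hbase_put(row_key, record, column_family="obs"):
    """Map a flat record to (row_key, {b"family:qualifier": b"value"})."""
    cells = {
        f"{column_family}:{name}".encode("utf-8"): str(value).encode("utf-8")
        for name, value in record.items()
        if value is not None  # HBase stores no cell for an absent value
    }
    return row_key.encode("utf-8"), cells


row, cells = to_hbase_put(
    "2ba61288-3c45-4eb4-a0fa-e92e44ef210f",
    {
        "PID_SSNNumberPatient": "123456789",
        "OBX_1_ObservationValue_MAX": "mg/dL",
    },
)
```

A dictionary of `family:qualifier` byte strings is the shape that Python clients such as happybase expect for a put, so the result could be handed to such a client if you wanted to write the row outside SAM.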
hbase shell
HBase Shell; enter 'help<RETURN>' for list of supported commands.
Type "exit<RETURN>" to leave the HBase Shell
Version 126.96.36.199.6.2.0-205, r5210d2ed88d7e241646beab51e9ac147a973bdcc, Sat Aug 26 09:33:50 UTC 2017

hbase(main):001:0> list
TABLE
ATLAS_ENTITY_AUDIT_EVENTS
atlas_titan
patient_observations
3 row(s) in 0.3220 seconds

=> ["ATLAS_ENTITY_AUDIT_EVENTS", "atlas_titan", "patient_observations"]
hbase(main):002:0> scan 'patient_observations'
ROW COLUMN+CELL
 2ba61288-3c45-4eb4-a0fa-e92e44ef210f column=obs:OBX_1_ObservationValue_MAX, timestamp=1512503796419, value=mg/dL
 2ba61288-3c45-4eb4-a0fa-e92e44ef210f column=obs:PID_SSNNumberPatient, timestamp=1512503796419, value=123456789
1 row(s) in 0.2110 seconds

hbase(main):003:0>

hdfs dfs -cat /hl7sam/7-HDFS-8-999-1512490296249.txt
XXXXXX,000000000000000000,999999999999,ORU,M,99,LAST,JOHN,65-99,R,000000001,NM,65,|,1,341856649,1,John,20150101000100,mg/dL,F,20150101000100,20150101000100,2.3,^~\&,M,NPI,R01,HNAM_ORDERID,FIRST,1620,M,SMITH,159,1234567890,19700101,P,HealthOrg01,Basic Metabolic Panel,Glucose Lvl,GLU,H,123456789,Q1111111111111111111,20150101000100,648088,Johnson