01-22-2019
07:40 PM
The Data Load Process

We looked at the performance of these engines in the last article; now it's time to look at how the data got loaded in. There are trade-offs to be aware of when loading data into each of these engines, as they use different mechanisms to accomplish the task.

Hive Load

There is no immediate need for a schema-on-write data load when you are using Hive with your file's native format. Your only "load" operation is copying the files from your local file system to HDFS. With schema-on-read functionality, Hive can access data as soon as its underlying file is loaded into HDFS. In our case, the real data load step was converting this schema-on-read external Hive table data into the optimized ORC format, i.e., loading it from an external table into a Hive-managed table (a sketch of this conversion appears at the end of this article). This was a relatively short process, coming in well under an hour.

HBase Load

Contrast that with HBase, where a bulk data load of our sample data set of 200M rows (around 30GB on disk in CSV format) took 4+ hours using a single-threaded Java application running in the cluster. In this case, HBase went through a process of concatenating several columns of the CSV data to build a composite key. This, along with the fact that the inserts were causing hot-spotting on the Region Servers, slowed things down. One way to improve this performance would be to pre-split the regions so your inserts aren't all going to one region to start with. We could also have parallelized the data load, writing a MapReduce job to distribute the work.

Druid Load

Let's also contrast that with the Druid load, which took about 2 hours. Druid bulk loads data using a MapReduce job; this is a fairly efficient way of doing things since it distributes the work across the cluster, and is why we see a lower time relative to HBase. Druid still has to do the work of adding its own indexes on top of the data and optionally pre-aggregating the data to a user-defined granularity, so it doesn't have a trivial path to getting the data in either. Although we chose not to pre-aggregate this data, pre-aggregation is what allows Druid to save a lot of space: instead of storing the raw data, Druid can roll the data up to, say, minute-level granularity if you think your users will not query deeper than that. But remember - once you aggregate the data, you no longer have the raw data.

Space Considerations

Another interesting way to slice this data is by how much space it takes up in each of the three columnar formats.

Engine | Size on Disk with Replication
Hive - ORC w/ Zlib | 28.4 GB
HBase - Snappy compression | 89.5 GB
Druid | 31.5 GB
Hive and Druid have compressed the data very efficiently considering the initial data size was 90GB with replication, but HBase is sitting right around the raw data size. At this point, we've covered both the relative loading times and the storage requirements of the three engines. These numbers may change as you use different compression formats or load different kinds of data, but this is intended as a general reference for understanding the relative strengths of the three.
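For reference, the Hive-side load described above boils down to something like the following. This is a minimal sketch: the table names, column list, and file location are assumptions for illustration, not the exact DDL used for these tests.

-- Hypothetical sketch of the external-CSV-to-managed-ORC load described above.
-- Table names, columns, and paths are assumptions for illustration only.
CREATE EXTERNAL TABLE transactions_ext (
  trxn_time string,
  trxn_amt double,
  rep_id int,
  qty int
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS TEXTFILE
LOCATION '/data/transactions_csv';

-- The "real" load step: rewrite the external data into a Hive-managed ORC table.
CREATE TABLE transactions STORED AS ORC TBLPROPERTIES ("orc.compress"="ZLIB")
AS SELECT * FROM transactions_ext;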
01-03-2019
04:55 PM
This article series expands on the technical details behind the Big Data Processing Engines blog post: https://hortonworks.com/blog/big-data-processing-engines-which-one-do-i-use-part-1/
Intro to performance analysis
Here, we will dive into the performance side of the three Big Data Processing Engines discussed in the blog post above: Druid, HBase, and Hive. I ran a number of query types to represent the workloads generally executed on these processing engines, and measured their performance in a consistent manner. The tests showcase each engine's strengths and show where some are less effective (but could still fill the gap in a pinch).
We will be executing the following queries:
Simple count of all records in a table, highlighting aggregation capabilities
A select with a where clause, highlighting drill-down and "needle in the haystack" OLTP queries
A join, showcasing ad-hoc analysis across the dataset
Updates, showcasing scenarios in which data is constantly changing and our dataset needs to stay up to date
An aggregation much like an analyst would do, such as summing data on a column
Performance Analysis
A few notes about setup:
Data size: 200 million rows, 30GB on disk (90GB after replication)
Cluster size: 8 nodes, broken down into 2 masters and 6 workers
Node size: 8 core, 16 GB RAM machines in a virtualized environment
Method of querying: Hive on Tez+LLAP was used to query Hive-managed and Druid-managed data. Phoenix was used to query HBase-managed data. A reasonable configuration was used for each of the engines
Cache was taken out of the picture in order to get accurate estimates for initial query execution. Query execution and re-execution times will be much faster with cache in place for each of these engines
Note that this is not an ideal setup for Hive+HBase+Druid. Dedicated nodes for each of these services would yield better numbers, but we decided to keep it approachable so you can reproduce these results on your own small cluster.

As I laid out, the three processing engines performed about how you would expect given their relative strengths and weaknesses. Take a look at the table below.
Query | HBase/Phoenix (seconds) | Hive (seconds) | Druid (seconds)
Count(*) | 281.44 | 4.72 | 0.71
Select with filter | 1.35 | 8.71 | 0.34
Select with join and filter | 365.41 | 9.16 | N/A
Update with filter | 1.52 | 9.75 | N/A
Aggregation with filter | 353.07 | 8.66 | 1.72
Here is that same information in a graph format, with HBase capped at 15s to keep the scale readable.
As expected, HBase outshone the other two when it came to ACID operations, averaging about 1.5 seconds on the updates; Druid does not support updates, and Hive took a bit longer. HBase, however, is not great at aggregation queries, as seen in the ~6 minute query times. Druid is extremely efficient at everything it does, returning nothing above 2 seconds and mostly under 1 second. Lastly, Hive with its latest updates has become a real-time database, servicing every query thrown at it in under 10 seconds.
Queries
Here are all of the queries that were run, multiple times each, to arrive at the results above.
--queries
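-- count(*) across the Hive-, HBase-, and Druid-backed tables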
select count(*) from transactions;
select count(*) from transactions_hbase;
select count(*) from transactions_druid;
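-- select with filter (needle-in-the-haystack lookup on each engine)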
select trxn_amt,rep_id from transactions_partitioned where trxn_date="2018-10-09" and trxn_hour=0 and trxn_time="2018-10-09 00:33:59.07";
select * from transactions_hbase_simple where row_key>="2018-09-11 12:03:05.860" and row_key<"2018-09-11 12:03:05.861";
select * from transactions_druid where `__time`='2018-09-11 12:03:05.85 UTC';
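-- select with join and filter (Hive first, then Phoenix over HBase; not run against Druid)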
select distinct(b.name) from transactions_partitioned a join rep b on a.rep_id=b.id where rep_id in (1,2,3) and trxn_amt > 180;
select distinct b."name" from "transactions_hbase" a join "rep_hbase" b on a."rep_id"=b."ROW_KEY" where b."ROW_KEY" in (1,2,3) and a."trxn_amt">180.0;
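-- update with filter (Hive update; for HBase, an update is a put on an existing row key, expressed here as an insert)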
update transactions_partitioned set qty=10 where trxn_date="2018-10-09" and trxn_hour=0 and trxn_time>="2018-10-09 00:33:59.07" and trxn_time<"2018-10-09 00:33:59.073";
insert into table transactions_hbase_simple values ('2018-09-11 12:03:05.860~xxx-xxx~xxx xxx~1~2017-02-09', null,null,null,10,null,null,null);
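-- aggregation, grouped by rep_id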
select sum(trxn_amt),rep_id from transactions_partitioned group by rep_id;
select sum("trxn_amt"),"rep_id" from "transactions_hbase" group by "rep_id";
select sum(trxn_amt),rep_id from transactions_druid group by rep_id;
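For context, the transactions_druid table above is queried through Hive's Druid integration. A minimal sketch of how an existing Druid datasource can be exposed to Hive is below; the datasource name is an assumption for illustration, not necessarily what was used here.

-- Hypothetical sketch: expose an existing Druid datasource as a Hive external table.
-- The datasource name is an assumption for illustration only.
CREATE EXTERNAL TABLE transactions_druid
STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler'
TBLPROPERTIES ("druid.datasource" = "transactions");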
11-02-2018
10:15 PM
With the arrival of the Hive 3 streaming processors, performance has never been better between NiFi and Hive 3. Below, we'll take a look at the PutHive3Streaming processor (distinct from the older PutHiveStreaming processor) and how it can fit into a basic Change Data Capture workflow. We will be performing only inserts on the source Hive table and then carrying those inserts over into a destination Hive table. Updates are also supported through this process with the addition of a 'last_updated_datetime' timestamp, but that is out of scope for this article. This is meant to simulate copying data from one HDP cluster to another - here, however, we're using the same cluster as both the source and the destination.

Here is the completed flow. Take note that most of this flow is just keeping track of the latest ID we've seen, so that we can periodically pull it back out of HDFS and query Hive for records that were added beyond that latest ID.

The last two processors, SelectHive3QL and PutHive3Streaming, do the heavy lifting: the first retrieves the data from the source table (based on the pre-determined latest ID), and the second inserts that retrieved data into the destination Hive table.

Here's the configuration for the SelectHive3QL processor. Note the ${curr_id} variable used in the HiveQL Select Query field, which keeps the query dynamic (a sketch of such a query appears at the end of this article).

This is the configuration for the PutHive3Streaming processor. Nothing special here - we've configured the Hive Configuration Resources with the hive-site.xml file and used the Avro format (above) to retrieve data and (below) to write it back out.

Here's the state of the destination table as we first check it:

And then we insert a record into the source table:

Here's the new state of the destination table:

Here is the full NiFi flow: puthive3streaming-cdc-flow.xml

In conclusion, we've shown one of the ways you can use the new PutHive3Streaming processor to perform quick inserts into Hive and, at a broader level, perform Change Data Capture.
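For reference, the HiveQL Select Query in the SelectHive3QL processor might look something like the following. The database, table, and column names here are assumptions for illustration; ${curr_id} is the attribute maintained by the rest of the flow and resolved by NiFi Expression Language at runtime.

-- Hypothetical query for the SelectHive3QL processor; names are assumptions.
-- ${curr_id} holds the latest ID already copied to the destination table.
SELECT id, trxn_amt, trxn_time
FROM source_db.transactions
WHERE id > ${curr_id}
ORDER BY id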
05-10-2018
02:44 PM
This article is based on the following Kaggle competition:
https://www.kaggle.com/arjunjoshua/predicting-fraud-in-financial-payment-services

It is a Scala-based implementation of the data science exploration originally written in Python. In addition to training a model, we also have the ability to batch-evaluate a set of data stored in a file through the trained model.
Full configuration, build, and installation instructions can be found at the GitHub repo:
https://github.com/anarasimham/anomaly-detection

When you execute the model training, you'll get various lines of output as the data is cleaned and the model is built. To view this output, use the tracking URL provided in the Spark job's console output, which will look like the following:

18/05/10 14:33:58 INFO Client:
client token: N/A
diagnostics: N/A
ApplicationMaster host: <HOST_IP>
ApplicationMaster RPC port: 0
queue: default
start time: 1525962717635
final status: SUCCEEDED
tracking URL: http://<SPARK_SERVER_HOST_NAME>:8088/proxy/application_1525369563028_0053/
user: root

The last few lines, which show the trained model's predictions on the test data, look like this:

+-----+-------------------------------------------------------------------------------------------------------------+-----------------------------------------+----------+
|label|features |probabilities |prediction|
+-----+-------------------------------------------------------------------------------------------------------------+-----------------------------------------+----------+
|0 |(10,[0,2,5,8,9],[1.0,1950.77,106511.31,-1950.77,104560.54]) |[0.9777384996414185,0.02226151153445244] |0.0 |
|0 |(10,[0,2,5,8,9],[1.0,3942.44,25716.56,-3942.44,21774.120000000003]) |[0.9777384996414185,0.02226151153445244] |0.0 |
|0 |[1.0,0.0,7276.69,93.0,0.0,1463.0,0.0,0.0,-7183.69,-5813.69] |[0.9777384996414185,0.02226151153445244] |0.0 |
|0 |(10,[0,2,5,8,9],[1.0,13614.91,30195.0,-13614.91,16580.09]) |[0.9777384996414185,0.02226151153445244] |0.0 |
|0 |[1.0,0.0,17488.56,14180.0,0.0,182385.22,199873.79,0.0,-3308.5600000000013,-34977.130000000005] |[0.9777384996414185,0.02226151153445244] |0.0 |
|0 |[1.0,0.0,19772.53,0.0,0.0,44486.99,64259.52,0.0,-19772.53,-39545.06] |[0.9777384996414185,0.02226151153445244] |0.0 |
|1 |(10,[0,2,3,7,9],[1.0,20128.0,20128.0,1.0,-20128.0]) |[0.022419333457946777,0.9775806665420532]|1.0 |
|0 |[1.0,0.0,33782.98,0.0,0.0,39134.79,16896.7,0.0,-33782.98,-11544.890000000003] |[0.9777384996414185,0.02226151153445244] |0.0 |
|0 |[1.0,0.0,34115.82,32043.0,0.0,245.56,34361.39,0.0,-2072.8199999999997,-68231.65] |[0.9777384996414185,0.02226151153445244] |0.0 |

The original data is split into training data and test data, and the table above shows the results of running the test data through the model.
The "label" column denotes which label (0=legitimate, 1=fraudulent) the row of data truly falls into
The "features" column is all the data that went into training the model, in vectorized format because that is the way the model understands the data
The "probabilities" column denotes how likely the model thinks each of the labels is (first number being 0, second number being 1), and the "prediction" column is what the model thinks the data falls into. You can add additional print statements and re-run the training to explore When you execute the evaluation portion of this project (instructions in the GitHub repo), you will re-load the model from disk and use test data from a file to see if the model is predicting correctly. Note that it is a bad practice to use test data from the training set (like I have) but for simplicity I have done that. You can go to the Spark UI as above to view the output. And there you have it, a straightforward approach to building a Gradient Boosted Decision Tree Machine Learning model based off of financial data. This approach can be applied not only to Finance but can be used to train a whole variety of use cases in other industries.
02-26-2018
09:32 PM
If you'd like to generate some data to test out the HDP/HDF platforms at a larger scale, you can use the following GitHub repository: https://github.com/anarasimham/data-gen

This will allow you to generate two types of data:

Point-of-sale (POS) transactions, containing data such as transaction amount, time stamp, store ID, employee ID, part SKU, and quantity of product. These are the transactions you make at a store when you check out. For simplicity's sake, this assumes each shopper buys only one product (potentially greater than 1 in quantity).

Automotive manufacturing parts production records, simulating the completion of parts in an assembly line. Imagine a warehouse completing different components of a car, such as the hood, front bumper, etc., at different points in time, with those parts being tested against heat and vibration thresholds. This data contains a timestamp of when the part was produced, thresholds for heat and vibration, tested values for heat and vibration, quantity of the produced part, a "short name" identifier for the part, a notes field, and a part location.

Full details of both schemas are documented in the code in datagen/datagen.py in the repository above.

The application can generate data and insert it into one of two supported locations: Hive or MySQL.

For MySQL, configure the table by running one of the scripts in the mysql folder after connecting to the target server and database as the appropriate user. Once that is done, copy the inserter/mysql.passwd.template file to inserter/mysql.passwd and edit it to provide the correct details. If you'd like to insert into Hive, do the same with the hive.passwd.template file. After editing, you can execute the generator with the following command:

python main_manf.py 10 mysql

This will insert 10 rows of manufacturing data into the configured MySQL database table.

At this point, you're ready to explore your data in greater detail. Possible next steps include using NiFi to pull the data out of MySQL and push it into Druid for a dashboard-style lookup workflow, or pushing it into Hive for ad-hoc analyses (an example query appears below). These activities are out of scope for this article but are suggestions to think about.
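As one illustration of the ad-hoc analysis mentioned above, once the manufacturing records are in Hive you could look for parts whose tested values exceeded their thresholds. The table and column names below are assumptions for illustration only; the authoritative schema is defined in datagen/datagen.py and the scripts in the mysql folder.

-- Hypothetical ad-hoc query; the table and column names are illustrative assumptions.
SELECT part_short_name, count(*) AS out_of_spec_count
FROM manufacturing_parts
WHERE heat_tested > heat_threshold
   OR vibration_tested > vibration_threshold
GROUP BY part_short_name;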
11-02-2017
05:31 PM
Assumptions:
-You have a running HDP cluster with Sqoop installed
-Basic knowledge of Sqoop and its parameters

Ingesting SAP HANA data with Sqoop

To ingest SAP HANA data, all you need is a JDBC driver. To the HDP platform, HANA is just another database - drop the JDBC driver in and you can plug & play.

1. Download the JDBC driver. This driver is not publicly available - it is only available to customers using the SAP HANA product. Find it on their members-only website and download it.

2. Drop the JDBC driver into Sqoop's lib directory. For me, this is located at /usr/hdp/current/sqoop-client/lib

3. Execute a Sqoop import. This command has many variations and many command-line parameters, but the following is one such example.

sqoop import --connect "jdbc:sap://<HANA_SERVER>:30015" --driver com.sap.db.jdbc.Driver --username <YOUR_USERNAME> --password <PASSWORD> --table "<TABLE_NAME>" --target-dir=/path/to/hdfs/dir -m 1 -- --schema "<YOUR_SCHEMA_NAME>"

The '-m 1' argument limits Sqoop to a single mapper, so don't use it if you want parallelism. To parallelize the import work, use the --split-by argument and give it a column name (see the example below). If all goes well, Sqoop should start importing the data into your target directory. Happy Sqooping!
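As a follow-up to the parallelism note above, a multi-mapper import might look something like the following; the --split-by column and the mapper count are assumptions that depend on your table, so adjust them to a reasonably uniform numeric or date column and a mapper count your source can handle.

sqoop import --connect "jdbc:sap://<HANA_SERVER>:30015" --driver com.sap.db.jdbc.Driver --username <YOUR_USERNAME> --password <PASSWORD> --table "<TABLE_NAME>" --target-dir=/path/to/hdfs/dir --split-by <NUMERIC_COLUMN> -m 4 -- --schema "<YOUR_SCHEMA_NAME>"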
11-02-2017
04:57 PM
Pre-requisites
Basic knowledge of NiFi, processors, custom properties, and configuring a NiFi flow
Salesforce developer account with Bulk API (SOAP API) access
Working HDF cluster

General Workflow

You will be performing the following steps for each API interaction after authenticating into Salesforce and getting your SessionID. The following is for an enterprise account. If you have a partner account, these instructions are still mostly valid, but you may need to tweak your endpoints slightly when communicating with SFDC.
Retrieve an XML request body from disk or another accessible location
Modify the XML if necessary based on requirements
Send your request, receive a response from the SFDC API
Parse the XML result and pull out any data that is necessary to continue the interaction

Authenticate into Salesforce

You will first need to authenticate into Salesforce using your username and password. Create a file named login.txt that has the following information in it:

<?xml version="1.0" encoding="utf-8"?>
<env:Envelope xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:env="http://schemas.xmlsoap.org/soap/envelope/">
<env:Body>
<n1:login xmlns:n1="urn:partner.soap.sforce.com">
<n1:username>YOUR_USERNAME@DOMAIN.COM.TEST</n1:username>
<n1:password>YOURPASSWORD_YOURTOKEN</n1:password>
</n1:login>
</env:Body>
</env:Envelope>
If you have an HDP cluster, place this file in HDFS. Otherwise you will need to place it in a location where all of your NiFi nodes can access it. During my testing, my cluster only contained one NiFi node, so I put this file on that node's local file system.

You will need to start your flow with the FetchFile or FetchHDFS processor, specifying the location of the file you created above. Note that if you are using HDFS, you will need to reference your core-site.xml and hdfs-site.xml files; make sure they are somewhere all your nodes can reach.

Then you will need to use the InvokeHTTP processor with the following URL, passing in the file that was just fetched: https://test.salesforce.com/services/Soap/u/41.0. This is the URL for the auth server, but after authentication you will use the URL for your own instance. Configure it as a POST request. This will authenticate with the SFDC API and return XML response data.

Out of this data, you will need to parse out the SessionID using the EvaluateXQuery processor. Set the destination to 'flowfile-attribute' so we can store the sessionid as an attribute. Add a custom property called 'sessionid' and assign it the value '/*:Envelope/*:Body/*:loginResponse/*:result/*:sessionId/text()'. At this point, if you run your NiFi flow, you should be authenticated into Salesforce and have the SessionID stored as an attribute of the flowfile. We will use this attribute in all future communications with SFDC, so it is useful to store it as an attribute and reference it when necessary.

Communicating after authentication

I executed the following steps after authenticating to access the data I was looking for:
Create a job
Create a batch inside of your job
Check batch status
When the batch is complete, retrieve the result ID
Using the result ID, retrieve the result data. I asked SFDC to format this data as a CSV, but you can choose one of several formats (specified when you create the job)

We will go through one API request/response so that you have the template of how to communicate with the API.

1. Retrieve the XML request body

You'll need to create a file for each XML request you plan on sending over to the server. The first one you'll need is for creating a job.

<?xml version="1.0" encoding="UTF-8"?>
<jobInfo xmlns="http://www.force.com/2009/06/asyncapi/dataload">
<operation>query</operation>
<object>OBJECT_NAME_HERE</object>
<concurrencyMode>Parallel</concurrencyMode>
<contentType>CSV</contentType>
</jobInfo>
Once you create the file and place it in a location all your NiFi nodes can access, you can use the FetchFile/FetchHDFS processors to retrieve it.

2. Modify the XML

If you'd like, you can dynamically replace the OBJECT_NAME_HERE text after retrieving the file by using the ReplaceText processor. I did not do this, as I wanted a quick example, but you can.

3. Communicate with the API

Using the InvokeHTTP processor, communicate with the API. In this processor you'll need to set the following:

- The X-SFDC-Session header variable as a custom property, as I have done in the following screenshot
- Content-Type to 'application/xml; charset=UTF-8'
- Remote URL to 'https://YOUR_INSTANCE.salesforce.com/services/async/41.0/job'
- HTTP Method to POST

4. Parse the result and incorporate it into your next request

Now you will need to store the results of your communication in a flowfile attribute so you can use them in subsequent communication. For example, when creating a job you'll need to store the JobID that comes back in the response so you can create a batch under that job. Use EvaluateXQuery for this with the following custom property:

key: jobId
value: /*:jobInfo/*:id/text()

Stringing multiple requests together

Now that you have the basic flow, you'll need to chain this 3/4-processor template together with itself by duplicating it and modifying the parameters. The next step is to create a batch, done with the following endpoint and request body:

Endpoint: https://cs78.salesforce.com/services/async/41.0/job/${jobId}/batch
Request: select id from YOUR_OBJECT_NAME limit 100

From that response, you'll need to store both the batchId and the state in flowfile attributes using EvaluateXQuery. After that, you'll need to poll the server with a scheduled InvokeHTTP processor (mine runs every 5 seconds) until it returns a 'Completed' status. Then you can retrieve the resultId and ultimately the CSV result.

When you're done, here's what it could look like: screen-shot-2017-11-02-at-125859.png

Full NiFi flow here: sfdc.xml
09-22-2017
09:03 PM
Ingesting HL7 Data in NiFi
Pre-requisites
We will assume you have the following:
Basic understanding of Linux CLI
HDF cluster or sandbox with NiFi, Kafka, and HDFS installed and running
Basic knowledge of connecting components in NiFi and configuring settings
Overview
Here's the finished product:
This NiFi flow accomplishes the following:
Read the data from a Kafka stream
Format the data to comply with HL7 specifications
Extract the HL7 attributes from the formatted content block
Transform the attribute list into a JSON block
Impose the data's natural hierarchy onto the JSON rather than leaving it as a flat-mapped structure
Write the data to HDFS
Here's an example of our input data:
MSH|^~\&|XXXXXX||HealthOrg01||||ORU^R01|Q1111111111111111111|P|2.3|<cr>PID|||000000001||SMITH^JOHN||19700101|M||||||||||999999999999|123456789|<cr>PD1||||1234567890^LAST^FIRST^M^^^^^NPI|<cr>OBR|1|341856649^HNAM_ORDERID|000000000000000000|648088^Basic Metabolic Panel|||20150101000100|||||||||1620^Johnson^John^R||||||20150101000100|||M|||||||||||20150101000100|<cr>OBX|1|NM|GLU^Glucose Lvl|159|mg/dL|65-99^65^99|H|||F|||20150101000100|
And the output data after being written to HDFS:
{
"OBX_1": {
"UserDefinedAccessChecks": "20150101000100",
"ObservationIdentifier": {
"Text": "Glucose Lvl",
"Identifier": "GLU"
},
"ReferencesRange": "H",
"Units": {
"NameOfCodingSystem": "99",
"Identifier": "65-99",
"Text": "65"
},
"ObservationSubID": "159",
"NatureOfAbnormalTest": "F",
"SetIDOBX": "1",
"ValueType": "NM",
"ObservationValue": "mg\/dL"
},
"OBR_1": {
"OrderingProvider": {
"FamilyName": "Johnson",
"IDNumber": "1620",
"GivenName": "John",
"MiddleInitialOrName": "R"
},
"UniversalServiceIdentifier": {
"Text": "Basic Metabolic Panel",
"Identifier": "648088"
},
"FillerOrderNumber": {
"EntityIdentifier": "000000000000000000"
},
"PlacerOrderNumber": {
"NamespaceID": "HNAM_ORDERID",
"EntityIdentifier": "341856649"
},
"ResultStatus": "M",
"ObservationDateTime": "20150101000100",
"ScheduledDateTime": "20150101000100",
"SetIDObservationRequest": "1",
"ResultsRptStatusChngDateTime": "20150101000100"
},
"MSH": {
"MessageControlID": "Q1111111111111111111",
"SendingApplication": {
"NamespaceID": "XXXXXX"
},
"ReceivingApplication": {
"NamespaceID": "HealthOrg01"
},
"ProcessingID": {
"ProcessingID": "P"
},
"MessageType": {
"MessageType": "ORU",
"TriggerEvent": "R01"
},
"EncodingCharacters": "^~\&",
"VersionID": "2.3",
"FieldSeparator": "|"
},
"uuid": "de394ca2-cbaf-4703-9e25-4ea280a8c691",
"PID": {
"SSNNumberPatient": "123456789",
"PatientAccountNumber": {
"ID": "999999999999"
},
"DateOfBirth": "19700101",
"Sex": "M",
"PatientName": {
"GivenName": "JOHN",
"FamilyName": "SMITH"
},
"PatientIDInternalID": {
"ID": "000000001"
}
},
"path": ".\/",
"PD1": {
"PatientPrimaryCareProviderNameIDNo": {
"IDNumber": "1234567890",
"FamilyName": "LAST",
"GivenName": "FIRST",
"AssigningAuthority": "NPI",
"MiddleInitialOrName": "M"
}
},
"filename": "279877223850444",
"kafka": {
"partition": "0",
"offset": "221",
"topic": "test"
}
}
Reading from Kafka
To read from Kafka, we will need to first create a topic. Execute the following to create a topic:
./kafka-topics.sh --create --zookeeper <ZOOKEEPER_HOSTNAME>:2181 --replication-factor 1 --partitions 1 --topic test
You should have a topic named test available to you.
We can now create the ConsumeKafka processor with this configuration:
You can test that this topic is functioning correctly by using the command-line kafka-console-producer and kafka-console-consumer tools in two different command prompts.
Formatting to Spec
The text data we are using had to be collapsed into a single line so it could be sent through Kafka (Kafka needs each entry as a single line without any carriage returns), so we now need to expand our FlowFile back into the appropriate number of lines. We will be replacing all '<cr>' strings with a carriage return ('\r', 0x0D). Use the following configuration on the ReplaceText processor to do so:
Extracting HL7 attributes
We will use the built-in ExtractHL7Attributes processor to transform our formatted text into "Attributes" that are native to NiFi and understood by it.
Transforming into JSON
We can now do a simple conversion of those attributes into JSON. Note that our data is destined for the flowfile content, so we will overwrite the formatted data from which we extracted our attributes. Up until this point, the flowfile carried both the content and the attributes, which duplicate that content.
Formatting the JSON
To format the JSON, we will use a Jolt Shift specification. This specification will fully depend on the data you are using and expecting. The sample spec below is based on my sample input data.
{
"OBX_1.UserDefinedAccessChecks": "OBX_1.UserDefinedAccessChecks",
"OBR_1.OrderingProvider.FamilyName": "OBR_1.OrderingProvider.FamilyName",
"MSH.MessageControlID": "MSH.MessageControlID",
"OBX_1.ObservationIdentifier.Text": "OBX_1.ObservationIdentifier.Text",
"MSH.SendingApplication.NamespaceID": "MSH.SendingApplication.NamespaceID",
"OBR_1.UniversalServiceIdentifier.Text": "OBR_1.UniversalServiceIdentifier.Text",
"MSH.ReceivingApplication.NamespaceID": "MSH.ReceivingApplication.NamespaceID",
"MSH.ProcessingID.ProcessingID": "MSH.ProcessingID.ProcessingID",
"uuid": "uuid",
"PID.SSNNumberPatient": "PID.SSNNumberPatient",
"OBR_1.FillerOrderNumber.EntityIdentifier": "OBR_1.FillerOrderNumber.EntityIdentifier",
"path": "path",
"PID.PatientAccountNumber.ID": "PID.PatientAccountNumber.ID",
"PID.DateOfBirth": "PID.DateOfBirth",
"PD1.PatientPrimaryCareProviderNameIDNo.IDNumber": "PD1.PatientPrimaryCareProviderNameIDNo.IDNumber",
"PID.Sex": "PID.Sex",
"MSH.MessageType.MessageType": "MSH.MessageType.MessageType",
"OBX_1.ReferencesRange": "OBX_1.ReferencesRange",
"OBR_1.OrderingProvider.IDNumber": "OBR_1.OrderingProvider.IDNumber",
"PD1.PatientPrimaryCareProviderNameIDNo.FamilyName": "PD1.PatientPrimaryCareProviderNameIDNo.FamilyName",
"OBX_1.Units.NameOfCodingSystem": "OBX_1.Units.NameOfCodingSystem",
"OBX_1.Units.Identifier": "OBX_1.Units.Identifier",
"filename": "filename",
"PID.PatientName.GivenName": "PID.PatientName.GivenName",
"OBX_1.ObservationSubID": "OBX_1.ObservationSubID",
"PD1.PatientPrimaryCareProviderNameIDNo.GivenName": "PD1.PatientPrimaryCareProviderNameIDNo.GivenName",
"OBR_1.PlacerOrderNumber.NamespaceID": "OBR_1.PlacerOrderNumber.NamespaceID",
"MSH.MessageType.TriggerEvent": "MSH.MessageType.TriggerEvent",
"PD1.PatientPrimaryCareProviderNameIDNo.AssigningAuthority": "PD1.PatientPrimaryCareProviderNameIDNo.AssigningAuthority",
"OBR_1.ResultStatus": "OBR_1.ResultStatus",
"PID.PatientName.FamilyName": "PID.PatientName.FamilyName",
"MSH.EncodingCharacters": "MSH.EncodingCharacters",
"MSH.VersionID": "MSH.VersionID",
"kafka.partition": "kafka.partition",
"OBR_1.UniversalServiceIdentifier.Identifier": "OBR_1.UniversalServiceIdentifier.Identifier",
"OBR_1.ObservationDateTime": "OBR_1.ObservationDateTime",
"OBR_1.ScheduledDateTime": "OBR_1.ScheduledDateTime",
"OBX_1.ObservationIdentifier.Identifier": "OBX_1.ObservationIdentifier.Identifier",
"OBR_1.OrderingProvider.GivenName": "OBR_1.OrderingProvider.GivenName",
"OBR_1.SetIDObservationRequest": "OBR_1.SetIDObservationRequest",
"OBR_1.ResultsRptStatusChngDateTime": "OBR_1.ResultsRptStatusChngDateTime",
"OBR_1.PlacerOrderNumber.EntityIdentifier": "OBR_1.PlacerOrderNumber.EntityIdentifier",
"OBX_1.NatureOfAbnormalTest": "OBX_1.NatureOfAbnormalTest",
"OBX_1.SetIDOBX": "OBX_1.SetIDOBX",
"MSH.FieldSeparator": "MSH.FieldSeparator",
"PD1.PatientPrimaryCareProviderNameIDNo.MiddleInitialOrName": "PD1.PatientPrimaryCareProviderNameIDNo.MiddleInitialOrName",
"OBX_1.Units.Text": "OBX_1.Units.Text",
"OBX_1.ValueType": "OBX_1.ValueType",
"kafka.offset": "kafka.offset",
"PID.PatientIDInternalID.ID": "PID.PatientIDInternalID.ID",
"kafka.topic": "kafka.topic",
"OBX_1.ObservationValue": "OBX_1.ObservationValue",
"OBR_1.OrderingProvider.MiddleInitialOrName": "OBR_1.OrderingProvider.MiddleInitialOrName"
}
Here's the associated NiFi configuration:
Writing to HDFS
Finally, we will persist this JSON-transformed data to HDFS for further analysis and storage. Configure your PutHDFS processor as follows
Note that the 'Hadoop Configuration Resources' is a required field for your connection from NiFi to HDFS to work properly, so be sure to fill it in with the locations of your core-site.xml and hdfs-site.xml HDFS configuration files on disk.
Be sure to create the directory /tmp/tst/helloworld in hdfs and change ownership to the nifi user:
hdfs dfs -mkdir -p /tmp/tst/helloworld
hdfs dfs -chown -R nifi /tmp/tst
Putting it all together
Now you should be able to connect all of your processors, start them, and see your data move through.
To send the sample data through our new flow, do the following:
1. SSH into a kafka broker in your cluster and cd to the binaries folder - in my environment, that is located at /usr/hdp/current/kafka-broker/bin.
2. Create a file in your home directory and paste the contents of your sample input data (from the Overview section above). This file should be exactly 1 line, no more.
3. Feed your sample data into the kafka producer with the following command:
/usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list <KAFKA_BROKER_HOSTNAME>:6667 --topic test < hl7data.txt
If all goes well, when your NiFi flow refreshes you should see the data go all the way through. At this point you can check HDFS with the following command:
hdfs dfs -ls /tmp/tst/helloworld
You should see a file there. You can read it with the following:
hdfs dfs -cat /tmp/tst/helloworld/<FILENAME>
Thanks for reading!
Full NiFi template: hl7flow-template.xml