Ingesting JMS Data into Hive
A company has a lot of data moving around the enterprise asynchronously via Apache ActiveMQ. They want to tap into that stream, converting the JSON messages coming from web servers and storing them in Hadoop.
I am storing the data in Apache Phoenix / HBase via SQL, and since it is so easy, I am also storing it as ORC files in HDFS for Apache Hive access.
Apache NiFi 1.2 generates the DDL for a Hive table for us; ConvertAvroToORC writes it to the hive.ddl attribute of each FlowFile:
hive.ddl
CREATE EXTERNAL TABLE IF NOT EXISTS meetup
(id INT, first_name STRING, last_name STRING, email STRING, ip_address STRING, company STRING, macaddress STRING, cell_phone STRING) STORED AS ORC
LOCATION '/meetup'
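Once the ORC files land in /meetup, the table is immediately queryable; a quick sanity check from Beeline might look like this (table and column names from the DDL above):
select id, first_name, last_name, email from meetup limit 10;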
The matching parameterized SQL statement, built by ReplaceText for PutSQL in Path 2:
insert into meetup
(id , first_name , last_name , email , ip_address , company , macaddress , cell_phone)
values
(?,?,?,?,?,?,?,?)
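One caveat for Phoenix: it does not accept INSERT, only UPSERT, so when PutSQL targets Phoenix the statement and a hypothetical matching table would look more like this (the column types here are my assumption):
CREATE TABLE IF NOT EXISTS meetup
(id BIGINT NOT NULL PRIMARY KEY, first_name VARCHAR, last_name VARCHAR, email VARCHAR, ip_address VARCHAR, company VARCHAR, macaddress VARCHAR, cell_phone VARCHAR)
upsert into meetup
(id, first_name, last_name, email, ip_address, company, macaddress, cell_phone)
values
(?,?,?,?,?,?,?,?)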
HDF Flow
ConsumeJMS: pull the JSON messages off the ActiveMQ queue; everything downstream fans out from here.
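A minimal ConsumeJMS configuration sketch, assuming a JMSConnectionFactoryProvider controller service pointed at the ActiveMQ broker and a queue named meetup (broker URI and queue name are placeholders):
Connection Factory Service: JMSConnectionFactoryProvider (Broker URI tcp://localhost:61616)
Destination Name: meetup
Destination Type: QUEUE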
Path 1: Store in Hadoop as ORC with a Hive Table
InferAvroSchema: infer an Avro schema from the JSON data
ConvertJSONToAvro: build Avro records from the JSON data
MergeContent: merge the Avro records into a larger chunk
ConvertAvroToORC: build ORC files (this step also generates the hive.ddl shown above)
PutHDFS: land in your Hadoop data lake
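For reference, a single message with the table's columns might look like this (a made-up sample record for illustration, not from the original data):
{"id": 1, "first_name": "Jane", "last_name": "Doe", "email": "jdoe@example.com", "ip_address": "10.0.0.1", "company": "Example Corp", "macaddress": "00:1B:44:11:3A:B7", "cell_phone": "555-0100"}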
Path 2: Upsert into Phoenix (or any SQL database)
EvaluateJSONPath: extract fields from the JSON file
UpdateAttribute: set the sql.args attributes for the prepared statement (see the formatting notes at the end; a sketch follows this list)
ReplaceText: create the SQL statement with ? placeholders (the insert shown above)
PutSQL: Send to Phoenix through Connection Pool (JDBC)
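A sketch of the UpdateAttribute settings for the first two columns, assuming EvaluateJSONPath stored each extracted field in an attribute of the same name (JDBC type 4 is INTEGER, 12 is VARCHAR):
sql.args.1.type = 4
sql.args.1.value = ${id}
sql.args.2.type = 12
sql.args.2.value = ${first_name}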
Path 3: Store Raw JSON in Hadoop
PutHDFS: Store JSON data on ingest
Path 4: Call Original REST API to Obtain Data and Send to JMS
GetHTTP: call a REST API to retrieve JSON arrays
SplitJSON: split the JSON array into individual records (a configuration sketch follows this list)
PutJMS <or> PublishJMS: two ways to push messages to JMS. PublishJMS uses a JMS Connection Factory controller service, while the older PutJMS configures the JMS client directly without a controller. I should benchmark the two.
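Assuming the REST API returns a top-level JSON array, SplitJson only needs a JsonPath expression to emit one FlowFile per element:
JsonPath Expression: $.*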
Error Message from Failed Load
If I get errors on the JMS send, I route the failure to Slack with the UUID of the FlowFile for ChatOps.
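A sketch of that Slack message, assuming a PutSlack processor wired to the failure relationship (the webhook URL and channel are placeholders; ${uuid} is the standard FlowFile attribute):
Webhook Text: JMS send failed for FlowFile ${uuid}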
Zeppelin Display of SQL Data
To check the tables, I use Apache Zeppelin to query both the Phoenix and Hive tables.
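A paragraph per interpreter is enough for a sanity check; the interpreter prefixes below are assumptions, since names vary by install:
%jdbc(phoenix)
select * from meetup limit 10
%jdbc(hive)
select * from meetup limit 10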
Formatting in UpdateAttribute for SQL Arguments
To set the ? parameters for the JDBC prepared statement, PutSQL reads numbered FlowFile attributes starting from 1: sql.args.N.type holds the JDBC type of parameter N (12 is String/VARCHAR), and sql.args.N.value holds the value extracted from the FlowFile.
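The type codes come from java.sql.Types; a few common ones:
4 = INTEGER
12 = VARCHAR (String)
-5 = BIGINT
8 = DOUBLE
91 = DATE
93 = TIMESTAMP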