Created on 05-22-2017 11:23 PM - edited 08-17-2019 12:52 PM
Ingesting JMS Data into Hive
A company has a lot of data moving around the enterprise asynchronously via Apache ActiveMQ. They want to tap into it, converting the JSON messages coming from web servers and storing them in Hadoop.
I am storing the data in Apache Phoenix / HBase via SQL, and, since it's so easy, also as ORC files in HDFS for Apache Hive access.
Apache NiFi 1.2 generates the DDL for a Hive table for us, in the hive.ddl attribute:
CREATE EXTERNAL TABLE IF NOT EXISTS meetup (id INT, first_name STRING, last_name STRING, email STRING, ip_address STRING, company STRING, macaddress STRING, cell_phone STRING) STORED AS ORC LOCATION '/meetup'
It also builds the parameterized SQL statement:
INSERT INTO meetup (id, first_name, last_name, email, ip_address, company, macaddress, cell_phone) VALUES (?,?,?,?,?,?,?,?)
HDF Flow
ConsumeJMS
Path 1: Store in Hadoop as ORC with a Hive Table
- InferAvroSchema: get a schema from the JSON data
- ConvertJSONtoAVRO: build an AVRO file from JSON data
- MergeContent: build a larger chunk of AVRO data
- ConvertAvroToORC: build ORC files
- PutHDFS: land in your Hadoop data lake
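To make the first two steps of this path concrete, here is a minimal sketch of the kind of schema inference InferAvroSchema performs, done by hand in Python. It is a simplification: the real processor samples many records and handles nulls, unions, and numeric widths more carefully.

```python
import json

# Map JSON value types to Avro primitive types (a simplified version
# of what NiFi's InferAvroSchema does).
def avro_type(value):
    if isinstance(value, bool):
        return "boolean"
    if isinstance(value, int):
        return "long"
    if isinstance(value, float):
        return "double"
    return "string"

def infer_avro_schema(record, name="meetup"):
    # Build an Avro record schema with one field per JSON key.
    return {
        "type": "record",
        "name": name,
        "fields": [{"name": k, "type": avro_type(v)} for k, v in record.items()],
    }

sample = json.loads('{"id": 1, "first_name": "Jane", "email": "jane@example.com"}')
schema = infer_avro_schema(sample)
```

With a schema like this in hand, the downstream ConvertJSONtoAVRO and ConvertAvroToORC steps are pure format conversions.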
Path 2: Upsert into Phoenix (or any SQL database)
- EvaluateJSONPath: extract fields from the JSON file
- UpdateAttribute: Set SQL fields
- ReplaceText: create the SQL statement with ? placeholders
- PutSQL: Send to Phoenix through Connection Pool (JDBC)
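The upsert path boils down to a standard parameterized insert. As a sketch, here sqlite3 stands in for the Phoenix JDBC connection pool that PutSQL would actually use; both accept ? positional placeholders.

```python
import sqlite3

# sqlite3 is a stand-in for the Phoenix JDBC connection pool here.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE meetup (id INTEGER, first_name TEXT, last_name TEXT, "
    "email TEXT, ip_address TEXT, company TEXT, macaddress TEXT, cell_phone TEXT)"
)

# Field values extracted from the JSON FlowFile (EvaluateJSONPath's job).
record = {"id": 1, "first_name": "Jane", "last_name": "Doe",
          "email": "jane@example.com", "ip_address": "10.0.0.1",
          "company": "Acme", "macaddress": "00:11:22:33:44:55",
          "cell_phone": "555-0100"}

# The same parameterized statement NiFi generates for PutSQL.
conn.execute(
    "INSERT INTO meetup (id, first_name, last_name, email, ip_address, "
    "company, macaddress, cell_phone) VALUES (?,?,?,?,?,?,?,?)",
    tuple(record.values()),
)
row = conn.execute("SELECT id, first_name FROM meetup").fetchone()
```

The placeholders keep the statement identical for every FlowFile; only the bound arguments change per message.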
Path 3: Store Raw JSON in Hadoop
- PutHDFS: Store JSON data on ingest
Path 4: Call Original REST API to Obtain Data and Send to JMS
- GetHTTP: call a REST API to retrieve JSON arrays
- SplitJSON: split the JSON file into individual records
- PutJMS or PublishJMS: two ways to push messages to JMS. PublishJMS uses a JMS Connection Factory controller service, while PutJMS configures the JMS client directly without a controller. I should benchmark the two.
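The SplitJSON step in this path turns one API response into many messages. A minimal Python equivalent, assuming the REST API returns a top-level JSON array, looks like this:

```python
import json

# A minimal stand-in for NiFi's SplitJSON: take a JSON array returned
# by the REST API (GetHTTP) and emit one serialized record per message.
def split_json(body):
    return [json.dumps(rec) for rec in json.loads(body)]

payload = '[{"id": 1, "first_name": "Jane"}, {"id": 2, "first_name": "John"}]'
records = split_json(payload)
```

Each element of `records` would then become its own JMS message for PutJMS or PublishJMS.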
Error Message from Failed Load
If I get errors on the JMS send, I send the UUID of the failed FlowFile to Slack for ChatOps.
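The Slack notification amounts to posting a small JSON payload to a webhook. A sketch of building that payload (the webhook URL is a placeholder, and the actual HTTP POST is handled by the flow, so it is omitted here):

```python
import json

# Placeholder webhook URL; in the real flow NiFi posts the message.
SLACK_WEBHOOK = "https://hooks.slack.com/services/XXX/YYY/ZZZ"

def error_message(flowfile_uuid, error):
    # Build the Slack webhook payload for a failed FlowFile.
    return json.dumps({
        "text": f"JMS send failed for FlowFile {flowfile_uuid}: {error}"
    })

msg = error_message("0a1b2c3d-0000-0000-0000-000000000000", "connection refused")
```

Including the FlowFile UUID lets you find the failed file in NiFi's data provenance later.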
Zeppelin Display of SQL Data
To check the tables, I use Apache Zeppelin to query the Phoenix and Hive tables.
Formatting in UpdateAttribute for SQL Arguments
To set the ? properties for the JDBC prepared statement, the arguments are numbered by position, starting from 1. The type is the JDBC type code (12 is VARCHAR/String), and the value is the value of the FlowFile field.
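Concretely, PutSQL reads these from FlowFile attributes named sql.args.N.type and sql.args.N.value. A sketch of building them (the field list here is illustrative; 12 and 4 are the standard java.sql.Types codes for VARCHAR and INTEGER):

```python
# JDBC type codes from java.sql.Types.
JDBC_VARCHAR = 12
JDBC_INTEGER = 4

# Illustrative (field name, value, JDBC type) triples for two columns.
fields = [("id", 1, JDBC_INTEGER), ("first_name", "Jane", JDBC_VARCHAR)]

# Build the sql.args.N.* attributes PutSQL expects, numbered from 1.
attributes = {}
for n, (name, value, jdbc_type) in enumerate(fields, start=1):
    attributes[f"sql.args.{n}.type"] = str(jdbc_type)
    attributes[f"sql.args.{n}.value"] = str(value)
```

These attributes travel with the FlowFile, so the ReplaceText-generated statement and its bound arguments arrive at PutSQL together.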
Yet another message queue ingested with no fuss.