Community Articles

Find and share helpful community-sourced technical articles.
Labels (1)
avatar
Master Guru

Ingesting JMS Data into Hive

A company has a lot of data transmitting around the enterprise asynchronously with Apache ActiveMQ, they want to tap into it and convert JSON messages coming from web servers and store into Hadoop.

I am storing the data into Apache Phoenix / HBase via SQL. And also since it's so easy, I am storing the data into ORC files in HDFS for Apache Hive access.

15670-activemqmessages.png

Apache NiFi 1.2 generates DDL for a Hive Table for us

hive.ddl
CREATE EXTERNAL TABLE IF NOT EXISTS meetup 
(id INT, first_name STRING, last_name STRING, email STRING, ip_address STRING, company STRING, macaddress STRING, cell_phone STRING) STORED AS ORC
LOCATION '/meetup'
insert into meetup
(id , first_name , last_name , email , ip_address , company , macaddress , cell_phone)
values
(?,?,?,?,?,?,?,?)

HDF Flow

15706-jmshdfflow.png

ConsumeJMS

Path 1: Store in Hadoop as ORC with a Hive Table

  • InferAvroSchema: get a schema from the JSON data
  • ConvertJSONtoAVRO: build an AVRO file from JSON data
  • MergeContent: build a larger chunk of AVRO data
  • ConvertAvroToORC: build ORC files
  • PutHDFS: land in your Hadoop data lake

Path 2: Upsert into Phoenix (or any SQL database)

  • EvaluateJSONPath: extract fields from the JSON file
  • UpdateAttribute: Set SQL fields
  • ReplaceText: Create SQL statement with ?.
  • PutSQL: Send to Phoenix through Connection Pool (JDBC)

Path 3: Store Raw JSON in Hadoop

  • PutHDFS: Store JSON data on ingest

Path 4: Call Original REST API to Obtain Data and Send to JMS

  • GetHTTP: call a REST API to retrieve JSON arrays
  • SplitJSON: split the JSON file into individual records
  • PutJMS <or> PublishJMS: two ways to push messages to JMS. One uses a JMS controller and another uses a JMS client without a controller. I should benchmark this.

Error Message from Failed Load

15701-jmsslackerrormessages.png

If I get errors on JMS send, I send the UUID of the file to Slack for ChatOps.

Zeppelin Display of SQL Data

15703-jmsdatainzeppelin.png

To check the tables I use Apache Zeppelin to query Phoenix and Hive tables.

Formatting in UpdateAttribute for SQL Arguments

15704-jmsmeeetupsql.png

To set the ? properties for the JDBC prepared statements, we go by #, starting from 1. The type is the JDBC Type, 12 is String; and the value is the value of the FlowFile fields.

Yet another message queue ingested with no fuss.


consumejmsnififlow.png
3,636 Views