Ingesting JMS Data into Hive

A company has a lot of data moving asynchronously around the enterprise through Apache ActiveMQ. They want to tap into it, convert the JSON messages coming from web servers, and store them in Hadoop.

I am storing the data in Apache Phoenix / HBase via SQL. Since it's so easy, I am also storing the data as ORC files in HDFS for Apache Hive access.


Apache NiFi 1.2 generates the DDL for a Hive table for us:

(id INT, first_name STRING, last_name STRING, email STRING, ip_address STRING, company STRING, macaddress STRING, cell_phone STRING) STORED AS ORC
LOCATION '/meetup'

insert into meetup
(id, first_name, last_name, email, ip_address, company, macaddress, cell_phone)
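The column list, storage clause, and location shown above can be assembled into a complete statement. The following is only a minimal sketch: the opening clause is not shown in the article, so the `CREATE EXTERNAL TABLE IF NOT EXISTS meetup` prefix is an assumption, and `hive_ddl` is a hypothetical helper, not something NiFi provides.

```python
def hive_ddl(table, columns, location):
    """Return a CREATE EXTERNAL TABLE statement for ORC files under `location`."""
    column_list = ", ".join(f"{name} {hive_type}" for name, hive_type in columns)
    # Assumption: the statement opens with CREATE EXTERNAL TABLE IF NOT EXISTS.
    return (
        f"CREATE EXTERNAL TABLE IF NOT EXISTS {table} "
        f"({column_list}) STORED AS ORC "
        f"LOCATION '{location}'"
    )


# Column names and types copied from the DDL fragment in the article.
MEETUP_COLUMNS = [
    ("id", "INT"),
    ("first_name", "STRING"),
    ("last_name", "STRING"),
    ("email", "STRING"),
    ("ip_address", "STRING"),
    ("company", "STRING"),
    ("macaddress", "STRING"),
    ("cell_phone", "STRING"),
]

ddl = hive_ddl("meetup", MEETUP_COLUMNS, "/meetup")
print(ddl)
```

The resulting string can be pasted into Zeppelin or the Hive CLI once the ORC files have landed in `/meetup`.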

HDF Flow



Path 1: Store in Hadoop as ORC with a Hive Table

  • InferAvroSchema: get a schema from the JSON data
  • ConvertJSONToAvro: build an Avro file from the JSON data
  • MergeContent: build a larger chunk of Avro data
  • ConvertAvroToORC: build ORC files
  • PutHDFS: land in your Hadoop data lake
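To illustrate the first step of this path, here is a rough sketch of deriving an Avro-style record schema from one JSON message. It is not how InferAvroSchema is implemented and may not match its output (for example, it ignores nulls and nested types). The sample record values and the `infer_avro_schema` helper are hypothetical.

```python
import json

# Simplified mapping from Python/JSON scalar types to Avro primitive types.
AVRO_TYPES = {bool: "boolean", int: "int", float: "double", str: "string"}


def infer_avro_schema(record, name):
    """Build an Avro record schema (as a dict) from a flat JSON object."""
    fields = []
    for key, value in record.items():
        avro_type = AVRO_TYPES.get(type(value))
        if avro_type is None:
            raise ValueError(f"unsupported type for field {key!r}")
        fields.append({"name": key, "type": avro_type})
    return {"type": "record", "name": name, "fields": fields}


# Hypothetical message using a few of the fields from the Hive table.
message = '{"id": 1, "first_name": "Ada", "email": "ada@example.com"}'
schema = infer_avro_schema(json.loads(message), "meetup")
print(json.dumps(schema, indent=2))
```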

Path 2: Upsert into Phoenix (or any SQL database)

  • EvaluateJsonPath: extract fields from the JSON file
  • UpdateAttribute: set the SQL fields
  • ReplaceText: create the SQL statement with ? placeholders
  • PutSQL: send to Phoenix through a JDBC connection pool
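The statement that ReplaceText produces in this path can be sketched as a parameterized upsert. This is a minimal illustration, assuming the Phoenix table is also named `meetup` and uses the same column names as the Hive table. `upsert_statement` is a hypothetical helper.

```python
def upsert_statement(table, columns):
    """Return a Phoenix UPSERT with one ? placeholder per column."""
    placeholders = ", ".join("?" for _ in columns)
    return f"UPSERT INTO {table} ({', '.join(columns)}) VALUES ({placeholders})"


# Assumption: the Phoenix table mirrors the first columns of the Hive table.
columns = ["id", "first_name", "last_name"]
statement = upsert_statement("meetup", columns)
print(statement)
```

Keeping the values out of the statement text lets the JDBC driver bind them as prepared-statement parameters, so the same statement is reused for every record.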

Path 3: Store Raw JSON in Hadoop

  • PutHDFS: Store JSON data on ingest

Path 4: Call Original REST API to Obtain Data and Send to JMS

  • GetHTTP: call a REST API to retrieve JSON arrays
  • SplitJson: split the JSON array into individual records
  • PutJMS or PublishJMS: two ways to push messages to JMS. One uses a JMS controller service and the other uses a JMS client without one. I should benchmark them.
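The split step in this path amounts to turning one JSON array into one message per element. The sketch below shows only that logic in plain Python. It does not call the REST API or a JMS broker, and the payload and the `split_json_array` helper are hypothetical.

```python
import json


def split_json_array(payload):
    """Split a JSON array into a list of single-record JSON strings."""
    records = json.loads(payload)
    if not isinstance(records, list):
        raise ValueError("expected a JSON array at the top level")
    return [json.dumps(record) for record in records]


# Hypothetical REST response containing two records.
payload = '[{"id": 1, "company": "Acme"}, {"id": 2, "company": "Globex"}]'
messages = split_json_array(payload)
for message in messages:
    print(message)  # each string would become the body of one JMS message
```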

Error Message from Failed Load


If I get errors on JMS send, I send the UUID of the file to Slack for ChatOps.

Zeppelin Display of SQL Data


To check the tables, I query both Phoenix and Hive from Apache Zeppelin.

Formatting in UpdateAttribute for SQL Arguments


To set the ? parameters for the JDBC prepared statement, we number them by position, starting from 1. The type is the JDBC type code (12 is VARCHAR, a string), and the value is the value of the corresponding FlowFile field.
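The numbering scheme can be sketched as a function that produces the attribute map for one record. The attribute names `sql.args.N.type` and `sql.args.N.value` are the ones PutSQL reads, although the article does not spell them out. The sample values and the `sql_args` helper are hypothetical.

```python
# JDBC type codes from java.sql.Types.
JDBC_INTEGER = 4
JDBC_VARCHAR = 12


def sql_args(typed_values):
    """Map (jdbc_type, value) pairs to sql.args.N.* attributes, N starting at 1."""
    attributes = {}
    for position, (jdbc_type, value) in enumerate(typed_values, start=1):
        attributes[f"sql.args.{position}.type"] = str(jdbc_type)
        attributes[f"sql.args.{position}.value"] = str(value)
    return attributes


# Hypothetical values for the first three ? placeholders.
attributes = sql_args([
    (JDBC_INTEGER, 1),
    (JDBC_VARCHAR, "Ada"),
    (JDBC_VARCHAR, "Lovelace"),
])
for name, value in attributes.items():
    print(f"{name} = {value}")
```

In the flow itself, UpdateAttribute would set the same keys, with each value taken from the attribute that EvaluateJsonPath extracted.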

Yet another message queue ingested with no fuss.

Last update: 08-17-2019 12:52 PM