Community Articles

Find and share helpful community-sourced technical articles.
Labels (1)
avatar
Master Guru

INGESTING RDBMS DATA

I previously posted an article on ingesting and converting data (https://community.hortonworks.com/articles/64069/converting-a-large-json-file-into-csv.html). Once you have a SQL database loaded, you will eventually need to store your data in your one unified datalake. This is quite simple with NiFi. If you have a specialized tool that reads from your RDBMS logs and sends them to Kafka or JMS, that would be easy to ingest as well. For those wishing to stay open source, NiFi works great. If you don't have a good increasing key to use, you can add an article one that increases on every insert. Almost every database supports this from MariaDB to Oracle.

ALTER TABLE `useraccount` ADD COLUMN `id` INT AUTO_INCREMENT UNIQUE FIRST;  

For mine, I just added an autoincrement id column to be my trigger.

For Apache NiFi, you will need connections to all your sources and sinks. So I need a DB Connection Pool for Apache Phoenix and MySQL (DBCPConnectionPool) as well as Hive (HiveConnectionPool).

9001-datalake1.png

Tools Required:

  • RDMS (I am using MySQL)
  • HDF 2.0 (NiFi 1.0.0+)
  • HDP 2.4+ (I am using HDP 2.5) with HBase and Phoenix enabled and running, HDFS, YARN and Hive running.
  • Optional: Apache Zeppelin for quick data analysis and validation

8990-datalake-flowmain.png

To build a SQL database, I needed a source of interesting and plentiful data.

So I used the excellent free API: https://api.randomuser.me/. It's easy to get this URL to return 5,000 formatted JSON results via the extra parameters: ?results=3&format=pretty.

This API returns JSON in this format that requires some basic transformation (easily done in NiFi).

{"results":[
{"gender":"male",
"name":{"title":"monsieur","first":"lohan","last":"marchand"},
"location":{"street":"6684 rue jean-baldassini","city":"auboranges","state":"schwyz","postcode":9591},
"email":"lohan.marchand@example.com",
"login":{"username":"biggoose202","password":"esther","salt":"QIU1HBsr","md5":"9e60da6d4490cd6d102e8010ac98f283","sha1":"3de3ea419da1afe5c83518f8b46f157895266d17","sha256":"c6750c1a5bd18cac01c63d9e58a57d75520861733666ddb7ea6e767a7460479b"},
"dob":"1965-01-28 03:56:58",
"registered":"2014-07-26 11:06:46",
"phone":"(849)-890-5523",
"cell":"(395)-127-9369",
"id":{"name":"AVS","value":"756.OUVK.GFAB.51"},
"picture":{"large":"https://randomuser.me/api/portraits/men/69.jpg","medium":"https://randomuser.me/api/portraits/med/men/69.jpg","thumbnail":"https://randomuser.me/api/portraits/thumb/men/69.jpg"},"nat":"CH"}]

Then I created a MySQL table to populate with JSON data.

drop table useraccount; create table useraccount(
gender varchar(200),
title varchar(200),
first varchar(200),
last varchar(200),
street varchar(200),
city varchar(200),
state varchar(200),
postcode varchar(200),
email varchar(200),
username varchar(200),
password varchar(200),
salt varchar(200),
md5 varchar(200),
sha1 varchar(200),
sha256 varchar(200),
dob varchar(200),
registered varchar(200),
phone varchar(200),
cell varchar(200),
name varchar(200),
value varchar(200),
large varchar(200),
medium varchar(200),
thumbnail varchar(200),
nat varchar(200));

I created a Phoenix table ontop of HBase to hold data:

create table useraccount(
gender varchar,
title varchar,
firstname varchar,
lastname varchar,
street varchar,
city varchar,
state varchar,
postcode varchar,
email varchar,
username varchar,
password varchar,
salt varchar,
md5 varchar not null primary key,
sha1 varchar,
sha256 varchar,
dob varchar,
registered varchar,
phone varchar,
cell varchar,
name varchar,
value2 varchar,
large varchar,
medium varchar,
thumbnail varchar,
nat varchar);

Step 1: QueryDatabaseTable

Reads from MySQL tables. This processor just needs the MySQL Connection, table name: useraccount and column: id.

With have two forks from this query table.

9004-datalake-queryvalue.png

Fork 1

Step 2: ConvertAvroToJSON

Use Array

You will get arrays of JSON that look like this:

{
  "id" : 656949,
  "gender" : "female",
  "title" : "madame",
  "first" : "amandine",
  "last" : "sanchez",
  "street" : "8604 place paul-duquaire",
  "city" : "savigny",
  "state" : "genève",
  "postcode" : "5909",
  "email" : "amandine.sanchez@example.com",
  "username" : "ticklishmeercat183",
  "password" : "hillary",
  "salt" : "Sgq7HHP1",
  "md5" : "d82d6c3524f3a1118399113e6c43ed31",
  "sha1" : "23ce2b372f94d39fb949d95e81e82bece1e06a4a",
  "sha256" : "49d7e92a2815df1d5fd991ce9ebbbcdffee4e0e7fe398bc32f0331894cae1154",
  "dob" : "1983-05-22 15:16:49",
  "registered" : "2011-02-06 22:03:37",
  "phone" : "(518)-683-8709",
  "cell" : "(816)-306-5232",
  "name" : "AVS",
  "value" : "756.IYWK.GJBH.35",
  "large" : "https://randomuser.me/api/portraits/women/50.jpg",
  "medium" : "https://randomuser.me/api/portraits/med/women/50.jpg",
  "thumbnail" : "https://randomuser.me/api/portraits/thumb/women/50.jpg",
  "nat" : "CH"
}

Step 3: SplitJSON

Use: $.* to split all the arrays into individual JSON records.

Step 4: EvaluateJSONPath

You need to pull out each attribute you want and name it, example

cell for $.cell

See the guide to JSONPath with testing tool here.

Step 5: ReplaceText

Here we format the SQL from the attributes we just parsed from JSON:

upsert into useraccount (gender,title,firstname,lastname,street,city,state,postcode,email,
username,password,salt,md5,sha1,sha256,dob,registered,phone,cell,name,value2,large,medium,thumbnail,nat) 
values ('${'gender'}','${'title'}','${'first'}','${'last'}','${'street'}','${'city'}','${'state'}','${'postcode'}',
'${'email'}','${'username'}','${'password'}','${'salt'}','${'md5'}','${'sha1'}','${'sha256'}','${'dob'}',
'${'registered'}','${'phone'}','${'cell'}','${'name'}','${'value'}','${'large'}','${'medium'}','${'thumbnail'}','${'nat'}' )

9005-datalake-replacetext-event.png

Step 6: PutSQL

With an example Batch Size of 100, we connect to our Phoenix DB Connection Pool.

9006-datalakesql.png

Fork 2

Step 2: UpdateAttribute

We set orc.table to useraccount

Step 3: ConvertAvroToORC

We set our configuration files for Hive: /etc/hive/conf/hive-site.xml, 64MB stripe, and importantly Hive Table Name to ${orc.table}

Step 4: PutHDFS

Set out configuration /etc/hadoop/conf/core-site.xml and a directory you have access to write to for storing the ORC files.

Step 5: ReplaceText

Search Value: (?s:^.*$)

Replacement Value: ${hive.ddl} LOCATION '${absolute.hdfs.path}'

Always replace and entire text.

Step 6: PutHiveQL

You need to connect to your Hive Connection.

You will see the resulting ORC files in your HDFS directory

[root@tspanndev12 demo]# hdfs dfs -ls /orcdata
Found 2 items
-rw-r--r--   3 root hdfs     246806 2016-10-29 01:24 /orcdata/2795061363634412.orc
-rw-r--r--   3 root hdfs     246829 2016-10-29 17:25 /orcdata/2852682816977877.orc

After my first few batches of data are ingested, I check them in Apache Zeppelin. Looks good.

9003-datalakezepp2.png

The data has also been loaded into Apache Hive.

9002-datalakezeppelin.png

15,436 Views
Comments
avatar
Expert Contributor

Thanks for the tutorial. Are there any workflow examples of how to handle the update and delete transactions and how to replicate it into HBase/Phoenix?

avatar
Master Guru

For Phoenix, updates and inserts are handled by the Upsert command. If it's the same data it will be updated, if not inserted.

1) For delete, you can call an ExecuteProcess to delete with a command line program.

1b) For delete, you can call ExecuteScript to have Python/Groovy/Javascript delete it.

2) For delete, you can send a message to Spark Streaming to delete it via SparkSQL (using SitetoSite or Kafka)

Follows general SQL rules.

Apache Phoenix Delete Reference

https://phoenix.apache.org/language/index.html#delete

avatar
Explorer

I will be appreciate if you would like to provide the NiFi template of your tutorial, thanks!