Created on 10-30-2016 04:15 PM - edited 08-17-2019 08:37 AM
INGESTING RDBMS DATA
I previously posted an article on ingesting and converting data (https://community.hortonworks.com/articles/64069/converting-a-large-json-file-into-csv.html). Once you have a SQL database loaded, you will eventually need to store your data in your one unified datalake. This is quite simple with NiFi. If you have a specialized tool that reads from your RDBMS logs and sends them to Kafka or JMS, that would be easy to ingest as well. For those wishing to stay open source, NiFi works great. If you don't have a good increasing key to use, you can add an article one that increases on every insert. Almost every database supports this from MariaDB to Oracle.
ALTER TABLE `useraccount` ADD COLUMN `id` INT AUTO_INCREMENT UNIQUE FIRST;
For mine, I just added an autoincrement id column to be my trigger.
For Apache NiFi, you will need connections to all your sources and sinks. So I need a DB Connection Pool for Apache Phoenix and MySQL (DBCPConnectionPool) as well as Hive (HiveConnectionPool).
Tools Required:
To build a SQL database, I needed a source of interesting and plentiful data.
So I used the excellent free API: https://api.randomuser.me/. It's easy to get this URL to return 5,000 formatted JSON results via the extra parameters: ?results=3&format=pretty.
This API returns JSON in this format that requires some basic transformation (easily done in NiFi).
{"results":[ {"gender":"male", "name":{"title":"monsieur","first":"lohan","last":"marchand"}, "location":{"street":"6684 rue jean-baldassini","city":"auboranges","state":"schwyz","postcode":9591}, "email":"lohan.marchand@example.com", "login":{"username":"biggoose202","password":"esther","salt":"QIU1HBsr","md5":"9e60da6d4490cd6d102e8010ac98f283","sha1":"3de3ea419da1afe5c83518f8b46f157895266d17","sha256":"c6750c1a5bd18cac01c63d9e58a57d75520861733666ddb7ea6e767a7460479b"}, "dob":"1965-01-28 03:56:58", "registered":"2014-07-26 11:06:46", "phone":"(849)-890-5523", "cell":"(395)-127-9369", "id":{"name":"AVS","value":"756.OUVK.GFAB.51"}, "picture":{"large":"https://randomuser.me/api/portraits/men/69.jpg","medium":"https://randomuser.me/api/portraits/med/men/69.jpg","thumbnail":"https://randomuser.me/api/portraits/thumb/men/69.jpg"},"nat":"CH"}]
Then I created a MySQL table to populate with JSON data.
drop table useraccount; create table useraccount( gender varchar(200), title varchar(200), first varchar(200), last varchar(200), street varchar(200), city varchar(200), state varchar(200), postcode varchar(200), email varchar(200), username varchar(200), password varchar(200), salt varchar(200), md5 varchar(200), sha1 varchar(200), sha256 varchar(200), dob varchar(200), registered varchar(200), phone varchar(200), cell varchar(200), name varchar(200), value varchar(200), large varchar(200), medium varchar(200), thumbnail varchar(200), nat varchar(200));
I created a Phoenix table ontop of HBase to hold data:
create table useraccount( gender varchar, title varchar, firstname varchar, lastname varchar, street varchar, city varchar, state varchar, postcode varchar, email varchar, username varchar, password varchar, salt varchar, md5 varchar not null primary key, sha1 varchar, sha256 varchar, dob varchar, registered varchar, phone varchar, cell varchar, name varchar, value2 varchar, large varchar, medium varchar, thumbnail varchar, nat varchar);
Step 1: QueryDatabaseTable
Reads from MySQL tables. This processor just needs the MySQL Connection, table name: useraccount and column: id.
With have two forks from this query table.
Fork 1
Step 2: ConvertAvroToJSON
Use Array
You will get arrays of JSON that look like this:
{ "id" : 656949, "gender" : "female", "title" : "madame", "first" : "amandine", "last" : "sanchez", "street" : "8604 place paul-duquaire", "city" : "savigny", "state" : "genève", "postcode" : "5909", "email" : "amandine.sanchez@example.com", "username" : "ticklishmeercat183", "password" : "hillary", "salt" : "Sgq7HHP1", "md5" : "d82d6c3524f3a1118399113e6c43ed31", "sha1" : "23ce2b372f94d39fb949d95e81e82bece1e06a4a", "sha256" : "49d7e92a2815df1d5fd991ce9ebbbcdffee4e0e7fe398bc32f0331894cae1154", "dob" : "1983-05-22 15:16:49", "registered" : "2011-02-06 22:03:37", "phone" : "(518)-683-8709", "cell" : "(816)-306-5232", "name" : "AVS", "value" : "756.IYWK.GJBH.35", "large" : "https://randomuser.me/api/portraits/women/50.jpg", "medium" : "https://randomuser.me/api/portraits/med/women/50.jpg", "thumbnail" : "https://randomuser.me/api/portraits/thumb/women/50.jpg", "nat" : "CH" }
Step 3: SplitJSON
Use: $.* to split all the arrays into individual JSON records.
Step 4: EvaluateJSONPath
You need to pull out each attribute you want and name it, example
cell for $.cell
See the guide to JSONPath with testing tool here.
Step 5: ReplaceText
Here we format the SQL from the attributes we just parsed from JSON:
upsert into useraccount (gender,title,firstname,lastname,street,city,state,postcode,email, username,password,salt,md5,sha1,sha256,dob,registered,phone,cell,name,value2,large,medium,thumbnail,nat) values ('${'gender'}','${'title'}','${'first'}','${'last'}','${'street'}','${'city'}','${'state'}','${'postcode'}', '${'email'}','${'username'}','${'password'}','${'salt'}','${'md5'}','${'sha1'}','${'sha256'}','${'dob'}', '${'registered'}','${'phone'}','${'cell'}','${'name'}','${'value'}','${'large'}','${'medium'}','${'thumbnail'}','${'nat'}' )
Step 6: PutSQL
With an example Batch Size of 100, we connect to our Phoenix DB Connection Pool.
Fork 2
Step 2: UpdateAttribute
We set orc.table to useraccount
Step 3: ConvertAvroToORC
We set our configuration files for Hive: /etc/hive/conf/hive-site.xml, 64MB stripe, and importantly Hive Table Name to ${orc.table}
Step 4: PutHDFS
Set out configuration /etc/hadoop/conf/core-site.xml and a directory you have access to write to for storing the ORC files.
Step 5: ReplaceText
Search Value: (?s:^.*$)
Replacement Value: ${hive.ddl} LOCATION '${absolute.hdfs.path}'
Always replace and entire text.
Step 6: PutHiveQL
You need to connect to your Hive Connection.
You will see the resulting ORC files in your HDFS directory
[root@tspanndev12 demo]# hdfs dfs -ls /orcdata Found 2 items -rw-r--r-- 3 root hdfs 246806 2016-10-29 01:24 /orcdata/2795061363634412.orc -rw-r--r-- 3 root hdfs 246829 2016-10-29 17:25 /orcdata/2852682816977877.orc
After my first few batches of data are ingested, I check them in Apache Zeppelin. Looks good.
The data has also been loaded into Apache Hive.
Created on 10-31-2016 09:47 PM
Thanks for the tutorial. Are there any workflow examples of how to handle the update and delete transactions and how to replicate it into HBase/Phoenix?
Created on 11-01-2016 01:37 PM
For Phoenix, updates and inserts are handled by the Upsert command. If it's the same data it will be updated, if not inserted.
1) For delete, you can call an ExecuteProcess to delete with a command line program.
1b) For delete, you can call ExecuteScript to have Python/Groovy/Javascript delete it.
2) For delete, you can send a message to Spark Streaming to delete it via SparkSQL (using SitetoSite or Kafka)
Follows general SQL rules.
Apache Phoenix Delete Reference
Created on 11-11-2017 04:52 AM
I will be appreciate if you would like to provide the NiFi template of your tutorial, thanks!
Created on 11-16-2017 10:06 PM