Created on 03-08-2017 06:19 PM - edited 08-17-2019 01:55 PM
Many technologists ask "What is the easiest way to import data into Hive?" It's true, you can write a Sqoop job (making sure YARN containers are tuned properly in regards to # of mappers, the size of the mappers, and the sort allocated memory). And you'll deal with a two step process of moving the RDBMS tables into plain text and then creating an ORC table with an INSERT INTO statement.... I don't call this easy.
What if there were a tool where you could simply drag-and-drop processes (processors) on a canvas and connect them together creating a workflow? There is a tool for this called NiFi! Before we go any further, I'll assume you know and understand NiFi - what is a processor, scheduling a processor, what is a connection (relationship), what are different relationship types, etc:.
Small-Medium RDBMS tables
This article will cover small to medium sized RDBMS tables. This means I can run a single
select * from myrdbmstable;
without returning millions of rows (where we're taxing the RDBMS), I'll write a second article on how-to use the processor GenerateTableFetch that generates select queries that fetch "pages" of rows from a table. Using pages of rows from a table will distribute the select queries amongst multiple NiFi nodes, similar to what Sqoop does with the #mappers where each mapper pages through results.
Prerequisites for Hive
In order to stream data into Hive, we'll utilize Hive's transactional capabilities which require the following:
1) Enable ACID in Hive
2) Use bucketing on your Hive table
3) Store the Hive table as ORC
4) Set the following property on your Hive table TBLPROPERTIES ("transactional"="true")
1-4 will be followed below in Step 2
As described above, choose a small-medium sized RDBMS table (we'll tackle large database tables in another article).
a) Add the QueryDatabaseTable processor to your canvas
You'll see you we need to choose a Database Connection Pooling Service (which we'll define below in step 2), add a table name and finally create a successful relationship
b) Create an RDBMS Connection Pooling Service - Right click on the processor and go to Configure
c) Under the "Properties" tab, click the property "Database Connection Pooling Service" and click "Create new service..." on the drop-down
d) Choose the "DBCPConnectionPool" and click on Create
e) Click on the arrow to go to configure the DBCPConnectionPool
f) Click on the pencil to Edit the DBCPConnectionPool
g) Change the name in Settings - to something that is easily identifiable for your database connection pool
h) Finally go to Properties and define your connection. I'm creating a connection for MySQL, but the general rule of thumb is if a JDBC driver exists, you'll be able to connect (for example I wrote this article to connect to Teradata from within NiFi)
Database Connection URL: jdbc:mysql://localhost:3306/hortonworks Database Driver Class Name: com.mysql.jdbc.Driver Database Driver Location(s): /Users/rcicak/Desktop/mysql-connector-java-5.0.8-bin.jar
Click Apply - and you've created your RDBMS connection pool
i) Enable your connection pool
j) Define a table name - in our case we'll choose "people3", where the table people3 is described in MySQL as the following:
*Note: Don't forget the Maximum-value columns, QueryDatabaseTable will keep track of the last row that was fetched
mysql> DESCRIBE people3;
+-------+--------------+------+-----+---------+-------+ | Field | Type | Null | Key | Default | Extra | +-------+--------------+------+-----+---------+-------+ | id | datetime | NO | PRI | NULL | | | name | varchar(255) | YES | | NULL | | | age | int(11) | YES | | NULL | | +-------+--------------+------+-----+---------+-------+ 3 rows in set (0.00 sec)
k) Change the schedule of the QueryDatabaseProcessor to run every 10 minutes (or whatever your requirement calls for), therefore the select statement is not being execute multiple times per second
a) Add the PutHiveStreamingProcessor to your canvas
b) Create a successful relationship between QueryDatabaseTable (in step 1) and PutHiveStreaming (step 2)
c) The caution went away on QueryDatabaseTable after we added the relationship, now we'll need to resolve the caution on the PutHiveStreaming processor
d) Right click on the PutHiveStreaming processor and choose Configure, going to the "Properties" tab - Adding the following Hive configurations:
Hive Metastore URI: thrift://cregion-hdpmaster2.field.hortonworks.com:9083 Database Name: default Table Name: people3
*Adjust the "Transactions per Batch" accordingly to your SLA requirements
e) Verify your Hive table "people3" exists - and as explained in the Hive prerequisites above, you'll need ACID enabled, store the table as ORC, table properties set to transactional = true and also bucketing in order for PutHiveStreaming to work
create table people3 (id timestamp, name varchar(255), age int) CLUSTERED BY(id) INTO 3 BUCKETS STORED AS ORC tblproperties("transactional"="true");
You've successfully imported your RDBMS table into Hive with two processors - wasn't that easy?
Created on 03-09-2017 08:03 PM
Thanks @Ryan Cicak; I'm getting an error with PutHiveStreaming processor - "Unable to instantiate org.apache.hive.hcatalog.common.HiveClientCache$CacheableHiveMetaStoreClient"; I posted the question already in HCC, but no takers yet.
Created on 06-07-2018 12:42 AM
Nice one. Is there any variation for Large tables as you have mentioned specifically about large tables. Could you help in providing link about that article.
Created on 07-10-2018 09:18 AM
Hello,
Thank you for the post.
The data flow is working fine. however, I am getting duplicate records into hive table. Am I missing something here?
I would like to import entire table records only one time and followed by incremental records only.
Created on 10-04-2021 09:39 PM
Hello @RyanCicak
Im trying. this flow but it doesn't work for me. This is my flow
What should I do?
thanks