Created on 10-23-2017 07:01 PM - edited 08-17-2019 10:34 AM
This tutorial demonstrates how to use the QueryDatabaseTable and PutKudu processors to read data from a MySQL database and put into Kudu. Thanks to @Cam Mach for his assistance with this article.
Note: The PutKudu processor was introduced in NiFi 1.4.0.
This tutorial was tested using the following environment and components:
In your MySQL instance, choose a database ("nifi_db" in my instance) and create the table "users":
unix> mysql -u root -p unix> Enter password:<enter> mysql> use nifi_db; mysql>CREATE TABLE `users` ( `id` mediumint(9) NOT NULL AUTO_INCREMENT, `title` text, `first_name` text, `last_name` text, `street` text, `city` text, `state` text, `zip` text, `gender` text, `email` text, `username` text, `password` text, `phone` text, `cell` text, `ssn` text, PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=103 DEFAULT CHARSET=latin1;
Add data to the "users" table:
mysql>INSERT INTO `users` (`id`, `title`, `first_name`, `last_name`, `street`, `city`, `state`, `zip`, `gender`, `email`, `username`, `password`, `phone`, `cell`, `ssn`) VALUES (1, 'miss', 'marlene', 'shaw', '3450 w belt line rd', 'abilene', 'florida', '31995', 'F', 'marlene.shaw75@example.com', 'goldenpanda70', 'naughty', '(176)-908-6931', '(711)-565-2194', '800-71-1872'), (2, 'ms', 'letitia', 'jordan', '2974 mockingbird hill', 'irvine', 'new jersey', '64361', 'F', 'letitia.jordan64@example.com', 'lazytiger614', 'aaaaa1', '(860)-602-3314', '(724)-685-3472', '548-93-7031'), (3, 'mr', 'todd', 'graham', '5760 spring hill rd', 'garden grove', 'north carolina', '81790', 'M', 'todd.graham39@example.com', 'purplekoala484', 'paintball', '(230)-874-6532', '(186)-529-4912', '362-31-5248'), (4, 'mr', 'seth', 'martinez', '4377 fincher rd', 'chandler', 'south carolina', '73651', 'M', 'seth.martinez82@example.com', 'bigbutterfly149', 'navy', '(122)-782-5822', '(720)-778-8541', '200-80-9087'), (5, 'mr', 'guy', 'mckinney', '4524 hogan st', 'iowa park', 'ohio', '24140', 'M', 'guy.mckinney53@example.com', 'blueduck623', 'office', '(309)-556-7859', '(856)-764-9146', '973-37-9077'), (6, 'ms', 'anna', 'smith', '5047 cackson st', 'rancho cucamonga', 'pennsylvania', '56486', 'F', 'anna.smith74@example.com', 'goldenfish121', 'albion', '(335)-388-7351', '(485)-150-6348', '680-20-6440'), (7, 'mr', 'johnny', 'johnson', '7250 bruce st', 'gresham', 'new mexico', '83973', 'M', 'johnny.johnson73@example.com', 'crazyduck127', 'toast', '(142)-971-3099', '(991)-131-1582', '683-26-4133'), (8, 'mrs', 'robin', 'white', '7882 northaven rd', 'orlando', 'connecticut', '40452', 'F', 'robin.white46@example.com', 'whitetiger371', 'elizabeth', '(311)-659-3812', '(689)-468-6420', '960-70-3399'), (9, 'miss', 'allison', 'williams', '7648 edwards rd', 'edison', 'louisiana', '52040', 'F', 'allison.williams82@example.com', 'beautifulfish354', 'sanfran', '(328)-592-3520', '(550)-172-4018', '164-78-8160');
For my setup, I followed the Apache Kudu Quickstart instructions to easily set up and run a Kudu VM.
To check that your VM is running:
unix> VBoxManage list runningvms "kudu-demo" {b39279b5-3dd6-478a-ac9d-2204bf88e7b9}
To see what IP Kudu is running on:
unix> VBoxManage guestproperty get kudu-demo /VirtualBox/GuestInfo/Net/0/V4/IP Value: 192.168.58.100
The Kudu web client runs on port 8051:
Create a table in Kudu by first connecting to Impala in the virtual machine:
unix> ssh demo@quickstart.cloudera -t impala-shell demo@quickstart.cloudera's password: [quickstart.cloudera:21000] >
( Note: The username and password for the Quickstart VM is "demo".)
Create the Kudu table with the same columns and data types as the MySQL table:
[quickstart.cloudera:21000] > CREATE TABLE users_kudu ( id BIGINT, title STRING, first_name STRING, last_name STRING, street STRING, city STRING, state STRING, zip STRING, gender STRING, email STRING, username STRING, password STRING, cell STRING, ssn STRING, PRIMARY KEY(id) ) PARTITION BY HASH PARTITIONS 16 STORED AS KUDU;
Follow the following detailed instructions to set up the flow. Alternatively, a template of the flow can be downloaded here: putkudu-querydatabasetable.xml
1. Start NiFi. Two controller services are needed for the flow. Click the "Configuration" button (gear icon) from the Operate palette:
This opens the NiFi Flow Configuration window. Select the "Controller Services" tab. Click the "+" button and add a DBCPConnectionPool controller service:
Configure the controller service as follows (adjusting the property values to match your own MySQL instance and environment):
Next, add an AvroReader controller service:
Apply the default configuration:
Select the "lightning bolt" icon for each controller service to enable them:
2. Return to the NiFi canvas. Add a QueryDatabaseTable processor:
Configure the processor as follows:
where:
3. Add a PutKudu processor and connect the two processors:
Configure the PuKudu processor as follows:
where:
Auto-terminate the Success relationship:
On the canvas, make a "failure" relationship connection from the PutKudu processor to itself:
4. The flow is ready to run.
Start the QueryDatabaseTable processor.
Looking at the contents of the FlowFile in the queue, the data from the MySQL table has been ingested and converted to Avro format:
Start the PutKudu processor to put the data into Kudu:
This can be confirmed via a Select query:
With the flow still running, add another row of data to the Mysql "users" table:
The flow processes this data and the new row appears in Kudu:
Here are some links to check out if you are interested in other flows which utilize the record-oriented processors and controller services in NiFi: