Created on 10-23-2017 07:01 PM - edited 08-17-2019 10:34 AM
Objective
This tutorial demonstrates how to use the QueryDatabaseTable and PutKudu processors to read data from a MySQL database and put it into Kudu. Thanks to @Cam Mach for his assistance with this article.
Note: The PutKudu processor was introduced in NiFi 1.4.0.
Environment
This tutorial was tested using the following environment and components:
- Mac OS X 10.11.6
- Apache NiFi 1.4.0
- Apache Kudu 1.5.0
- MySQL 5.7.13
PutKudu (AvroReader)
Demo Configuration
MySQL Setup
In your MySQL instance, choose a database ("nifi_db" in my instance) and create the table "users":
unix> mysql -u root -p
Enter password:<enter>
mysql> use nifi_db;
mysql> CREATE TABLE `users` (
  `id` mediumint(9) NOT NULL AUTO_INCREMENT,
  `title` text,
  `first_name` text,
  `last_name` text,
  `street` text,
  `city` text,
  `state` text,
  `zip` text,
  `gender` text,
  `email` text,
  `username` text,
  `password` text,
  `phone` text,
  `cell` text,
  `ssn` text,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=103 DEFAULT CHARSET=latin1;
Add data to the "users" table:
mysql> INSERT INTO `users` (`id`, `title`, `first_name`, `last_name`, `street`, `city`, `state`, `zip`, `gender`, `email`, `username`, `password`, `phone`, `cell`, `ssn`)
VALUES
  (1, 'miss', 'marlene', 'shaw', '3450 w belt line rd', 'abilene', 'florida', '31995', 'F', 'marlene.shaw75@example.com', 'goldenpanda70', 'naughty', '(176)-908-6931', '(711)-565-2194', '800-71-1872'),
  (2, 'ms', 'letitia', 'jordan', '2974 mockingbird hill', 'irvine', 'new jersey', '64361', 'F', 'letitia.jordan64@example.com', 'lazytiger614', 'aaaaa1', '(860)-602-3314', '(724)-685-3472', '548-93-7031'),
  (3, 'mr', 'todd', 'graham', '5760 spring hill rd', 'garden grove', 'north carolina', '81790', 'M', 'todd.graham39@example.com', 'purplekoala484', 'paintball', '(230)-874-6532', '(186)-529-4912', '362-31-5248'),
  (4, 'mr', 'seth', 'martinez', '4377 fincher rd', 'chandler', 'south carolina', '73651', 'M', 'seth.martinez82@example.com', 'bigbutterfly149', 'navy', '(122)-782-5822', '(720)-778-8541', '200-80-9087'),
  (5, 'mr', 'guy', 'mckinney', '4524 hogan st', 'iowa park', 'ohio', '24140', 'M', 'guy.mckinney53@example.com', 'blueduck623', 'office', '(309)-556-7859', '(856)-764-9146', '973-37-9077'),
  (6, 'ms', 'anna', 'smith', '5047 cackson st', 'rancho cucamonga', 'pennsylvania', '56486', 'F', 'anna.smith74@example.com', 'goldenfish121', 'albion', '(335)-388-7351', '(485)-150-6348', '680-20-6440'),
  (7, 'mr', 'johnny', 'johnson', '7250 bruce st', 'gresham', 'new mexico', '83973', 'M', 'johnny.johnson73@example.com', 'crazyduck127', 'toast', '(142)-971-3099', '(991)-131-1582', '683-26-4133'),
  (8, 'mrs', 'robin', 'white', '7882 northaven rd', 'orlando', 'connecticut', '40452', 'F', 'robin.white46@example.com', 'whitetiger371', 'elizabeth', '(311)-659-3812', '(689)-468-6420', '960-70-3399'),
  (9, 'miss', 'allison', 'williams', '7648 edwards rd', 'edison', 'louisiana', '52040', 'F', 'allison.williams82@example.com', 'beautifulfish354', 'sanfran', '(328)-592-3520', '(550)-172-4018', '164-78-8160');
Kudu Setup
For my setup, I followed the Apache Kudu Quickstart instructions to easily set up and run a Kudu VM.
To check that your VM is running:
unix> VBoxManage list runningvms
"kudu-demo" {b39279b5-3dd6-478a-ac9d-2204bf88e7b9}
To see what IP Kudu is running on:
unix> VBoxManage guestproperty get kudu-demo /VirtualBox/GuestInfo/Net/0/V4/IP
Value: 192.168.58.100
The Kudu web client runs on port 8051:
Create a table in Kudu by first connecting to Impala in the virtual machine:
unix> ssh demo@quickstart.cloudera -t impala-shell
demo@quickstart.cloudera's password:
[quickstart.cloudera:21000] >
(Note: The username and password for the Quickstart VM are both "demo".)
Create the Kudu table with columns and data types corresponding to those of the MySQL table:
[quickstart.cloudera:21000] > CREATE TABLE users_kudu (
  id BIGINT,
  title STRING,
  first_name STRING,
  last_name STRING,
  street STRING,
  city STRING,
  state STRING,
  zip STRING,
  gender STRING,
  email STRING,
  username STRING,
  password STRING,
  cell STRING,
  ssn STRING,
  PRIMARY KEY(id)
)
PARTITION BY HASH PARTITIONS 16
STORED AS KUDU;
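Because PutKudu writes record fields into the columns of the Kudu table, it can help to compare the two column lists before running the flow. The following is a minimal Python sketch, not part of the original setup: the two lists are copied by hand from the CREATE TABLE statements in this article, and the helper name missing_columns is hypothetical.

```python
# Column names copied from the MySQL and Kudu CREATE TABLE statements in this article.
mysql_columns = [
    "id", "title", "first_name", "last_name", "street", "city", "state", "zip",
    "gender", "email", "username", "password", "phone", "cell", "ssn",
]
kudu_columns = [
    "id", "title", "first_name", "last_name", "street", "city", "state", "zip",
    "gender", "email", "username", "password", "cell", "ssn",
]


def missing_columns(source, target):
    """Return the columns of `source` that have no counterpart in `target`, in order."""
    target_set = set(target)
    return [name for name in source if name not in target_set]


missing_in_kudu = missing_columns(mysql_columns, kudu_columns)
missing_in_mysql = missing_columns(kudu_columns, mysql_columns)
print("In MySQL but not in Kudu:", missing_in_kudu)   # ['phone']
print("In Kudu but not in MySQL:", missing_in_mysql)  # []
```

With the definitions as written, the check reports that "phone" exists only on the MySQL side. Whether that matters for your flow depends on how your version of PutKudu treats record fields without a matching column, so treat the result as a prompt to verify rather than as an error.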
NiFi Flow Setup
Follow the detailed instructions below to set up the flow. Alternatively, a template of the flow can be downloaded here: putkudu-querydatabasetable.xml
1. Start NiFi. Two controller services are needed for the flow. Click the "Configuration" button (gear icon) from the Operate palette:
This opens the NiFi Flow Configuration window. Select the "Controller Services" tab. Click the "+" button and add a DBCPConnectionPool controller service:
Configure the controller service as follows (adjusting the property values to match your own MySQL instance and environment):
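As a rough guide to the values involved, here is a small Python sketch that assembles the DBCPConnectionPool settings as a dictionary. It is only an illustration under assumptions: the property names are the display names as I recall them from NiFi 1.x and should be checked against your version, the host and port (localhost, 3306) and the driver jar path are hypothetical, the driver class assumes MySQL Connector/J 5.x, and only the database name "nifi_db" and the user "root" come from this article.

```python
def mysql_jdbc_url(host, port, database):
    """Build a MySQL JDBC connection URL of the form jdbc:mysql://host:port/database."""
    return f"jdbc:mysql://{host}:{port}/{database}"


# Values are assumptions for a local MySQL instance; adjust them to your environment.
dbcp_properties = {
    "Database Connection URL": mysql_jdbc_url("localhost", 3306, "nifi_db"),
    "Database Driver Class Name": "com.mysql.jdbc.Driver",  # assumes Connector/J 5.x
    "Database Driver Location(s)": "/path/to/mysql-connector-java.jar",  # hypothetical path
    "Database User": "root",
    "Password": "<your MySQL password>",
}

for name, value in dbcp_properties.items():
    print(f"{name}: {value}")
```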
Next, add an AvroReader controller service:
Apply the default configuration:
Select the "lightning bolt" icon for each controller service to enable them:
2. Return to the NiFi canvas. Add a QueryDatabaseTable processor:
Configure the processor as follows:
where:
- The DBCPConnectionPool controller service created earlier is selected for Database Connection Pooling Service
- "users" is entered for the Table Name
- "id" is entered for the Maximum-value Columns
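Setting "id" as the Maximum-value Column is what makes the flow incremental: the processor keeps track of the largest value it has returned for that column and, on later runs, fetches only rows above it. The Python sketch below imitates that idea and is not NiFi's implementation. It uses the built-in sqlite3 module as a stand-in for MySQL, a trimmed-down "users" table, a hypothetical helper named fetch_new_rows, and a hypothetical extra row with id 10.

```python
import sqlite3


def fetch_new_rows(conn, table, max_value_column, state):
    """Return rows whose max_value_column exceeds the last value recorded in `state`."""
    last_seen = state.get(max_value_column)
    # Identifiers are interpolated directly, which is acceptable only for trusted names in a sketch.
    query = f"SELECT * FROM {table}"
    params = ()
    if last_seen is not None:
        query += f" WHERE {max_value_column} > ?"
        params = (last_seen,)
    query += f" ORDER BY {max_value_column}"
    cursor = conn.execute(query, params)
    columns = [description[0] for description in cursor.description]
    rows = cursor.fetchall()
    if rows:
        index = columns.index(max_value_column)
        state[max_value_column] = max(row[index] for row in rows)
    return rows


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, first_name TEXT, last_name TEXT)")
conn.executemany(
    "INSERT INTO users VALUES (?, ?, ?)",
    [(1, "marlene", "shaw"), (2, "letitia", "jordan"), (3, "todd", "graham")],
)

state = {}  # plays the role of the processor's stored maximum value
first_run = fetch_new_rows(conn, "users", "id", state)   # all existing rows
second_run = fetch_new_rows(conn, "users", "id", state)  # nothing new
conn.execute("INSERT INTO users VALUES (?, ?, ?)", (10, "new", "user"))  # hypothetical row
third_run = fetch_new_rows(conn, "users", "id", state)   # only the newly added row
print(len(first_run), len(second_run), third_run)
```

The same pattern explains the behavior seen later in this tutorial, where a row added to MySQL while the flow is running shows up in Kudu without the earlier rows being sent again.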
3. Add a PutKudu processor and connect the two processors:
Configure the PutKudu processor as follows:
where:
- "192.168.58.100:7051" is entered for the Kudu Masters IP and port (7051 is the default port)
- "impala::default.users_kudu" is entered for the Table Name
- Skip head line property is set to "false"
- The AvroReader controller service created earlier is selected for Record Reader
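Before starting the flow, it can be worth confirming that the Kudu master address is reachable from the machine running NiFi. The sketch below is a generic TCP check in Python and makes no Kudu-specific calls; the helper names parse_master and is_port_open are hypothetical, and a successful connection only shows that something is listening on the port, not that it is a healthy Kudu master.

```python
import socket


def parse_master(address):
    """Split a 'host:port' string such as '192.168.58.100:7051' into (host, port)."""
    host, _, port = address.rpartition(":")
    if not host or not port.isdigit():
        raise ValueError(f"expected host:port, got {address!r}")
    return host, int(port)


def is_port_open(host, port, timeout=3.0):
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and unreachable hosts.
        return False
```

For the environment in this article, you could call is_port_open(*parse_master("192.168.58.100:7051")) for the master address entered in PutKudu, and repeat the check with port 8051 for the web client.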
Auto-terminate the Success relationship:
On the canvas, make a "failure" relationship connection from the PutKudu processor to itself:
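Looping the "failure" relationship back into PutKudu means that a FlowFile that could not be written is placed back in the processor's incoming queue and tried again, rather than being dropped. The Python sketch below models that behavior with a plain queue. It is a simplified illustration and not NiFi code: run_with_self_loop, flaky_put, and the max_passes cap are hypothetical, and the cap exists only so the example always terminates.

```python
from collections import deque


def run_with_self_loop(flowfiles, put, max_passes=10):
    """Process a queue, re-queuing failed items the way a failure self-loop would."""
    queue = deque(flowfiles)
    delivered = []
    passes = 0
    while queue and passes < max_passes:
        passes += 1
        for _ in range(len(queue)):
            flowfile = queue.popleft()
            try:
                put(flowfile)
                delivered.append(flowfile)  # "success" is auto-terminated
            except ConnectionError:
                queue.append(flowfile)      # "failure" loops back for another attempt
    return delivered, list(queue)


attempts = {}


def flaky_put(flowfile):
    """Stand-in for a write that fails the first time it sees FlowFile 'b'."""
    attempts[flowfile] = attempts.get(flowfile, 0) + 1
    if flowfile == "b" and attempts[flowfile] == 1:
        raise ConnectionError("Kudu temporarily unavailable")


delivered, remaining = run_with_self_loop(["a", "b", "c"], flaky_put)
print(delivered, remaining)  # ['a', 'c', 'b'] []
```

One consequence of this design is that a FlowFile that can never succeed keeps cycling through the loop, so it is worth watching the failure queue when testing the flow.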
4. The flow is ready to run.
Run Flow
Start the QueryDatabaseTable processor.
Looking at the contents of the FlowFile in the queue, the data from the MySQL table has been ingested and converted to Avro format:
Start the PutKudu processor to put the data into Kudu:
This can be confirmed via a SELECT query:
With the flow still running, add another row of data to the MySQL "users" table:
The flow processes this data and the new row appears in Kudu:
Helpful Links
Here are some links to check out if you are interested in other flows which utilize the record-oriented processors and controller services in NiFi:
- Convert CSV to JSON, Avro, XML using ConvertRecord
- Installing a local Hortonworks Registry to use with Apache NiFi
- Running SQL on FlowFiles using QueryRecord Processor
- Using PublishKafkaRecord_0_10 (CSVReader/JSONWriter) in Apache NiFi 1.2+
- Using PutElasticsearchHttpRecord (CSVReader)
- Using PartitionRecord (GrokReader/JSONWriter) to Parse and Group Log Files
- Geo Enrich NiFi Provenance Event Data using LookupRecord
- Using PutMongoRecord to put CSV into MongoDB