
From Database MetaData to Data Ingest in a Few Quick Steps

A company wants to know when new tables are added to a JDBC source (say, an RDBMS). Using the ListDatabaseTables processor we can get a list of tables (and also views, system tables and other database objects), but for our purposes we want tables with data. I have also used the ngdbc.jar driver from SAP HANA to connect and query tables with ease.

For today's example I am connecting to MySQL as I have a MySQL database available for use and modification.

17595-ingestalltablesflow.png

Pre-Step

mysql -u root -p test < person.sql
CREATE USER 'nifi'@'%' IDENTIFIED BY 'reallylongDifficultPassDF&^D&F^Dwird';
GRANT ALL PRIVILEGES ON *.* TO 'nifi'@'%' WITH GRANT OPTION;
COMMIT;
mysql> show tables;
+----------------+
| Tables_in_test |
+----------------+
| MOCK_DATA      |
| mock2          |
| personReg      |
| visitor        |
+----------------+
4 rows in set (0.00 sec)

I created a user to use for my JDBC Connection Pool in NiFi to read the metadata and data.

These table names will show up in NiFi in the db.table.name attribute.

Step 1: ListDatabaseTables: Let's get a list of all the tables in MySQL for the database we have chosen.

17600-listdatabasetables.png

After it starts running, you can check its state to see which tables were ingested and the most recent timestamp (Value).

17597-listdatabasetablestate.png

17607-listtablesprovenance.png

17596-tableresults.png

We get back the catalog we read from, the number of tables, and each table's name and its full name.
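The state tracking described in Step 1 can be pictured with a small Python sketch. This is not NiFi's implementation, only an illustration of the idea: remember each table name with a timestamp and emit only the names that have not been seen before. The function name, the timestamps and the extra table "orders" are hypothetical.

```python
from typing import Dict, Iterable, List


def find_new_tables(listed: Iterable[str], state: Dict[str, int], now_ms: int) -> List[str]:
    """Return table names not seen before and record them in state with a timestamp."""
    new_tables = []
    for name in listed:
        if name not in state:
            state[name] = now_ms  # remember when this table was first listed
            new_tables.append(name)
    return new_tables


# First run: every table in the test database is new.
state: Dict[str, int] = {}
first_run = find_new_tables(["MOCK_DATA", "mock2", "personReg", "visitor"], state, 1000)

# Second run: only the (hypothetical) newly created table is emitted.
second_run = find_new_tables(
    ["MOCK_DATA", "mock2", "personReg", "visitor", "orders"], state, 2000
)

print(first_run)
print(second_run)
```

Each emitted name corresponds to one flow file carrying the db.table.name attribute, which is what the rest of the flow works from.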

HDF NiFi supports generic JDBC drivers and has database-specific handling for Oracle, MS SQL Server 2008 and MS SQL Server 2012+.

17598-databasetypes.png

Step 2: GenerateTableFetch, using the table name returned by ListDatabaseTables.

17601-generatetablefetch.png

17606-listdbprove.png
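To give a feel for what Step 2 produces, here is a rough Python sketch of splitting a table read into pages of SQL. It assumes MySQL-style LIMIT/OFFSET syntax and a hypothetical ordering column named id; the exact statements the real processor emits depend on its configured database type and properties, so treat this as an illustration only.

```python
from typing import List


def generate_fetch_statements(
    table: str, row_count: int, partition_size: int, order_by: str
) -> List[str]:
    """Split a full-table read into LIMIT/OFFSET pages (MySQL-style syntax)."""
    if partition_size <= 0:
        raise ValueError("partition_size must be positive")
    statements = []
    # One statement per page; the last page may return fewer rows.
    for offset in range(0, row_count, partition_size):
        statements.append(
            f"SELECT * FROM {table} ORDER BY {order_by} "
            f"LIMIT {partition_size} OFFSET {offset}"
        )
    return statements


# 2,500 rows with a page size of 1,000 yields three statements.
pages = generate_fetch_statements("MOCK_DATA", row_count=2500, partition_size=1000, order_by="id")
for statement in pages:
    print(statement)
```

Each generated statement becomes the content of its own flow file, which is why the next step extracts the SQL text from the flow file content.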

Step 3: We use ExtractText to get the SQL statement created by GenerateTableFetch.

We add a new attribute, sql, with value, ^(.*).
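The effect of that regular expression can be checked outside NiFi. The sketch below uses Python's re module as a stand-in for the processor; the flow file content and the attribute dictionary are made-up examples. Note that without a "dot matches newline" mode, ^(.*) captures only up to the first line break, which is fine when the statement is on a single line.

```python
import re

# Hypothetical flow file content: one SQL statement on a single line.
flowfile_content = "SELECT * FROM mock2 ORDER BY id LIMIT 10000 OFFSET 0"

# Attributes already on the flow file from the listing step.
attributes = {"db.table.name": "mock2"}

# Capture the first line of the content into the "sql" attribute.
match = re.search(r"^(.*)", flowfile_content)
if match:
    attributes["sql"] = match.group(1)

print(attributes["sql"])

# With multi-line content only the first line is captured.
first_line_only = re.search(r"^(.*)", "SELECT 1\nSELECT 2").group(1)
print(first_line_only)
```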

Step 4: ExecuteSQL with that sql attribute, referenced as ${sql}.

17602-executesql.png

Step 5: Convert the Avro files produced by ExecuteSQL into performant Apache ORC files

17603-orc.png

Step 6: PutHDFS to store these ORC files in Hadoop

17604-puthdfssetttings.png

I added the table name as part of the directory structure so a new directory is created for each transferred table. Now we have dynamic HDFS directory structure creation.
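The per-table directory layout amounts to appending the table name to a base path. In NiFi this is usually expressed with an Expression Language reference such as ${db.table.name} in the directory setting; the Python sketch below only mirrors that idea, and the base directory /data/mysql is a hypothetical value, not the one used in the flow.

```python
import posixpath


def hdfs_directory_for(base_dir: str, table_name: str) -> str:
    """Build one HDFS directory per source table under a shared base path."""
    return posixpath.join(base_dir, table_name)


# Every table from the test database gets its own directory.
for table in ["MOCK_DATA", "mock2", "personReg", "visitor"]:
    print(hdfs_directory_for("/data/mysql", table))
```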

Step 7: ReplaceText to build a SQL statement that will create an external Hive table on our new ORC directory

17605-buildhivetable.png
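As an illustration of the kind of statement Step 7 builds, here is a minimal Python sketch that assembles a Hive external-table DDL over an ORC directory. The column list, the helper name and the location are assumptions made for the example; in the flow itself the statement is assembled from flow file attributes rather than hard-coded values.

```python
from typing import List, Tuple


def build_external_table_ddl(
    table: str, columns: List[Tuple[str, str]], location: str
) -> str:
    """Assemble a CREATE EXTERNAL TABLE statement for ORC files at a given location."""
    column_list = ", ".join(f"`{name}` {hive_type}" for name, hive_type in columns)
    return (
        f"CREATE EXTERNAL TABLE IF NOT EXISTS `{table}` ({column_list}) "
        f"STORED AS ORC LOCATION '{location}'"
    )


# Hypothetical columns for the mock2 table.
ddl = build_external_table_ddl(
    "mock2",
    [("id", "INT"), ("first_name", "STRING")],
    "/data/mysql/mock2",
)
print(ddl)
```

Using IF NOT EXISTS keeps the statement safe to run again when more files for the same table arrive later.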

Step 8: PutHiveQL to execute the statement that was just dynamically created for our new table.

We now have instantly queryable Hadoop data available to Hive, SparkSQL, Zeppelin, ODBC, JDBC and a ton of BI tools and interfaces.

17599-puthiveql.png

Step 9: Finally we can look at the data that we have ingested from MySQL into new Hive tables.

17608-ingestalltableszeppelin.png

That was easy. The best part is that as new tables are added to MySQL, they will be automatically ingested into HDFS and Hive tables will be built for them.

Future Updates:

Use Hive Merge to update changed data. We can also ingest to Phoenix/HBase and use the upsert DML.

Test with other databases. Tested with MySQL.

Quick Tip (HANA):

In NiFi, refer to tables with their full name in quotes: "SAP_"."ASDSAD.SDASD.ASDAD"
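A small helper can produce that quoted form consistently. This is a sketch assuming standard SQL double-quote identifier quoting, where an embedded double quote is doubled; the function name is hypothetical.

```python
def quote_full_name(schema: str, table: str) -> str:
    """Quote schema and table separately so dots inside a name are kept literally."""
    def quote(identifier: str) -> str:
        # Double any embedded quote characters, then wrap in double quotes.
        return '"' + identifier.replace('"', '""') + '"'

    return f"{quote(schema)}.{quote(table)}"


print(quote_full_name("SAP_", "ASDSAD.SDASD.ASDAD"))
```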

Comments
Expert Contributor

Hi Timothy, ListDatabaseTables link is broken.

New Contributor

Hi,

I could not find the ParseSQL processor in NiFi version 1.5 (HDF version 3.1.0).

79434-parsesql-missing.png

79435-nifi-version.png

If you look closely, ParseSQL is an ExtractText processor.

We use ExtractText to get the SQL statement created by GenerateTableFetch.

We add a new attribute, sql, with value, ^(.*).

New Contributor

I ran into a problem: the data from MySQL in Hive is replaced. When the Conflict Resolution Strategy property is set to append, I cannot query the count from Hive with the statement "select count(1) from table_xxx". Help!