Created on 06-30-2017 04:07 PM - edited 08-17-2019 12:24 PM
From Database MetaData to Data Ingest in a Few Quick Steps
A company wants to know when new tables are added to a JDBC source (say, an RDBMS). Using the ListDatabaseTables processor we can get a list of TABLEs (as well as views, system tables, and other database objects), but for our purposes we want tables with data. I have used the ngdbc.jar from SAP HANA to connect and query tables with ease.
For today's example I am connecting to MySQL as I have a MySQL database available for use and modification.
Pre-Step
mysql -u root -p test < person.sql

CREATE USER 'nifi'@'%' IDENTIFIED BY 'reallylongDifficultPassDF&^D&F^Dwird';
GRANT ALL PRIVILEGES ON *.* TO 'nifi'@'%' WITH GRANT OPTION;
COMMIT;

mysql> show tables;
+----------------+
| Tables_in_test |
+----------------+
| MOCK_DATA      |
| mock2          |
| personReg      |
| visitor        |
+----------------+
4 rows in set (0.00 sec)
I created a user to use for my JDBC Connection Pool in NiFi to read the metadata and data.
These table names will show up in NiFi in the db.table.name attribute.
Step 1: ListDatabaseTables: Let's get a list of all the tables in MySQL for the database we have chosen.
After it starts running you can check its state and see which tables were listed and the most recent timestamp (Value).
We get back the catalog we read from, the number of tables, and each table's name and its full name.
HDF NiFi supports generic JDBC drivers as well as database-specific handling for Oracle, MS SQL Server 2008, and MS SQL Server 2012+.
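As a rough sketch, the controller service and processor settings for this step could look like the following; the host, port, database name, driver path, and credentials are placeholders for your own environment.

DBCPConnectionPool (controller service)
    Database Connection URL: jdbc:mysql://localhost:3306/test
    Database Driver Class Name: com.mysql.jdbc.Driver
    Database Driver Location(s): /opt/jdbc/mysql-connector-java.jar
    Database User: nifi
    Password: reallylongDifficultPassDF&^D&F^Dwird

ListDatabaseTables
    Database Connection Pooling Service: DBCPConnectionPool
    Table Name Pattern: (blank, to list every table)
    Table Types: TABLE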
Step 2: GenerateTableFetch using the table name returned by ListDatabaseTables (available in the db.table.name attribute).
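For a table such as MOCK_DATA, the statement GenerateTableFetch writes into the flow file would look roughly like the one below; the id column and the page size of 10000 are illustrative and depend on the Maximum-value Columns and Partition Size you configure.

SELECT * FROM MOCK_DATA ORDER BY id LIMIT 10000 OFFSET 0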
Step 3: We use ExtractText to pull the SQL statement created by GenerateTableFetch into an attribute.
We add a new dynamic property, sql, with the value ^(.*), which captures the entire flow file content into the sql attribute.
Step 4: ExecuteSQL with that ${sql} attribute as its query.
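Put differently, the hand-off between the two processors is just this (sql on ExtractText is a dynamic property; SQL select query is a stock ExecuteSQL property):

ExtractText
    sql: ^(.*)

ExecuteSQL
    Database Connection Pooling Service: DBCPConnectionPool
    SQL select query: ${sql}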
Step 5: ConvertAvroToORC: convert the Avro files produced by ExecuteSQL into performant Apache ORC files.
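ConvertAvroToORC also writes a hive.ddl attribute containing a CREATE TABLE statement derived from the Avro schema, which Step 7 builds on. For a hypothetical MOCK_DATA table with a few columns it would look something like:

CREATE EXTERNAL TABLE IF NOT EXISTS MOCK_DATA (id INT, first_name STRING, last_name STRING, email STRING) STORED AS ORC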
Step 6: PutHDFS to store these ORC files in Hadoop
I added the table name as part of the directory structure so a new directory is created for each transferred table. Now we have dynamic HDFS directory structure creation.
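A minimal sketch of the PutHDFS settings, assuming a /data landing zone in HDFS (the paths are placeholders):

PutHDFS
    Hadoop Configuration Resources: /etc/hadoop/conf/core-site.xml,/etc/hadoop/conf/hdfs-site.xml
    Directory: /data/${db.table.name}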
Step 7: ReplaceText to build a SQL statement that will create an external Hive table on top of our new ORC directory.
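One way to do this, assuming the /data/${db.table.name} directory from Step 6 and the hive.ddl attribute from Step 5, is to set ReplaceText's Replacement Strategy to Always Replace and its Replacement Value to:

${hive.ddl} LOCATION '/data/${db.table.name}'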
Step 8: PutHiveQL to execute the statement that was just dynamically created for our new table.
We now have instantly queryable Hadoop data available to Hive, SparkSQL, Zeppelin, ODBC, JDBC, and a ton of BI tools and interfaces.
Step 9: Finally we can look at the data that we have ingested from MySQL into new Hive tables.
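For example, from the Hive shell, Zeppelin, or any JDBC client (MOCK_DATA is just one of the tables listed earlier):

SELECT COUNT(*) FROM MOCK_DATA;
SELECT * FROM MOCK_DATA LIMIT 10;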
That was easy, and the best part is that as new tables are added to MySQL, they will be automatically ingested into HDFS and have Hive tables built for them.
Future Updates:
Use Hive Merge to update changed data (a rough sketch follows this list). We can also ingest to Phoenix/HBase and use the upsert DML.
Test with other databases; so far this has been tested with MySQL.
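As a rough sketch of what that Hive Merge could look like once an ACID target table is in place (the table and column names here are hypothetical):

MERGE INTO person_target t
USING person_updates u
ON t.id = u.id
WHEN MATCHED THEN UPDATE SET email = u.email
WHEN NOT MATCHED THEN INSERT VALUES (u.id, u.first_name, u.last_name, u.email);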
Quick Tip (HANA):
In NiFi, refer to tables with their full name in quotes: "SAP_"."ASDSAD.SDASD.ASDAD"
Created on 11-06-2017 01:19 PM
Hi Timothy, ListDatabaseTables link is broken.
Created on 07-10-2018 09:14 AM - edited 08-17-2019 12:23 PM
Hi,
I could not find the ParseSQL processor in NiFi version 1.5 (HDF version 3.1.0)?
Created on 09-11-2018 01:27 PM
The processor you see named ParseSQL is just an ExtractText processor.
We use ExtractText to get the SQL statement created by GenerateTableFetch.
We add a new attribute, sql, with the value ^(.*).
Created on 02-01-2019 04:22 PM
I've run into a problem: the data from MySQL that lands in Hive is being replaced. When the Conflict Resolution Strategy property is set to append, I cannot query the count from Hive with the statement "select count(1) from table_xxx". Help!!!