How can I ingest 500 tables automatically in Apache NiFi?
- Labels: Apache NiFi
Created 07-24-2018 04:04 PM
I'm trying to ingest 500 tables from Oracle into HDFS.
My workflow is QueryDatabaseTable -> ConvertAvroToORC -> PutHDFS -> ReplaceText -> PutHiveQL.
This works well, but since I have to ingest 500 tables, configuring them one by one doesn't make sense. How can I supply a list of tables and have them ingested automatically, like a schema-level ingestion?
I tried reading a file (which contains all the schema.table names), splitting it, building a NiFi expression, and passing it to QueryDatabaseTable, but QueryDatabaseTable doesn't accept an incoming flow file.
Any help will be appreciated.
Thanks
Created 07-24-2018 04:12 PM
Try ListDatabaseTables -> GenerateTableFetch -> RemoteProcessGroup -> Input Port -> ExecuteSQL in place of QueryDatabaseTable.
ListDatabaseTables will give you an empty flow file per table, with attributes set on the flow file such as table name. QueryDatabaseTable (QDT) doesn't allow incoming connections, but GenerateTableFetch (GTF) does. The biggest difference between QDT and GTF is that QDT generates and executes the SQL statements, whereas GTF just generates the SQL statements.
This allows you to send to a Remote Process Group that points back to an Input Port on your NiFi cluster/instance. The RPG->Input Port pattern distributes the flow files among the nodes in your cluster. If you have a standalone NiFi instance, you won't need the RPG->Input Port part, but note that since there is only one node, the tables will not be fetched in parallel (but perhaps concurrently if you set Max Concurrent Tasks for ExecuteSQL).
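To make the "GTF just generates the SQL statements" point concrete, here is a minimal Python sketch of the idea, not NiFi's actual implementation. It turns a list of tables into one paged SELECT per partition, which is roughly what each downstream ExecuteSQL task would then run. The table names, row counts, the `ID` ordering column, and the OFFSET/FETCH syntax are all assumptions for illustration; the SQL that NiFi really emits depends on the configured database adapter.

```python
from math import ceil


def generate_page_queries(table, row_count, partition_size, order_by):
    """Return one SELECT statement per page of `partition_size` rows.

    This only builds SQL text; nothing is executed here, which mirrors the
    split between generating statements and running them.
    """
    if partition_size <= 0:
        raise ValueError("partition_size must be positive")
    pages = ceil(row_count / partition_size)
    return [
        f"SELECT * FROM {table} ORDER BY {order_by} "
        f"OFFSET {page * partition_size} ROWS "
        f"FETCH NEXT {partition_size} ROWS ONLY"
        for page in range(pages)
    ]


# Hypothetical table list with row counts (stand-in for what a table
# listing step plus a COUNT(*) per table would provide).
tables = {"HR.EMPLOYEES": 25000, "HR.DEPARTMENTS": 27}

statements = []
for table_name, row_count in tables.items():
    statements.extend(
        generate_page_queries(table_name, row_count, 10000, order_by="ID")
    )

for statement in statements:
    print(statement)
```

Each generated statement is independent, so the statements can be spread across nodes or concurrent tasks and executed in any order.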
Created 07-24-2018 04:22 PM
Thanks, @Matt Burgess. I'll try the solution and keep everyone updated.
Created 07-25-2018 11:15 AM
Hi @Matt Burgess, I applied your solution and it works. But for some reason it keeps executing the same SELECT statement against a table, which causes a "file already exists" error in PutHDFS. Is that because I have not supplied a Maximum Value Column? What if we don't have a primary key that can be used as the Maximum Value Column? I hope that makes sense; if not, please let me know.
Created 07-25-2018 01:03 PM
Without a Maximum Value Column, both GTF and QDT will continue to pull the same data. Without a column for which we could figure out the "last" value, how would they know to pull the rows "since the last time"? Often an increasing ID or a "last updated" timestamp column is used as the Maximum Value Column.
In NiFi 1.7.0 (via NIFI-5143) and in the upcoming HDF 3.2 release, you can use the values in a column for GenerateTableFetch (versus the concept of "row number"), but without a Maximum Value Column you still can't do incremental fetch.
The alternative would be a Change Data Capture solution (CDC), but these are vendor-specific and currently only CaptureChangeMySQL exists (although there is a proposed addition of a MS SQL Server version).
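As a rough illustration of why a Maximum Value Column is needed for incremental fetch, here is a small Python sketch using an in-memory SQLite database. It is not how NiFi stores its state; the `orders` table, the `id` column, and the `state` dictionary are hypothetical stand-ins for a source table, its Maximum Value Column, and the processor's saved state.

```python
import sqlite3


def fetch_new_rows(conn, table, max_value_column, state):
    """Fetch only rows whose max-value column exceeds the last value seen.

    `state` maps table name -> last observed maximum. Without such a column
    there is nothing to compare against, so every run would return all rows.
    """
    last_seen = state.get(table)
    if last_seen is None:
        # First run: no remembered value yet, so pull everything.
        sql = f"SELECT * FROM {table} ORDER BY {max_value_column}"
        params = ()
    else:
        sql = (
            f"SELECT * FROM {table} WHERE {max_value_column} > ? "
            f"ORDER BY {max_value_column}"
        )
        params = (last_seen,)
    cursor = conn.execute(sql, params)
    columns = [description[0] for description in cursor.description]
    rows = cursor.fetchall()
    if rows:
        # Rows are ordered by the max-value column, so the last row holds
        # the new maximum to remember for the next run.
        state[table] = rows[-1][columns.index(max_value_column)]
    return rows


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (id INTEGER PRIMARY KEY, item TEXT)")
conn.executemany(
    "INSERT INTO orders (id, item) VALUES (?, ?)", [(1, "a"), (2, "b")]
)

state = {}
first_run = fetch_new_rows(conn, "orders", "id", state)   # both rows
second_run = fetch_new_rows(conn, "orders", "id", state)  # nothing new
conn.execute("INSERT INTO orders (id, item) VALUES (3, 'c')")
third_run = fetch_new_rows(conn, "orders", "id", state)   # only the new row
```

If the `state` lookup is removed, every run returns the full table again, which matches the repeated-SELECT behaviour described in the question above.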
Created 07-24-2018 04:21 PM
I would recommend an HDF cluster of at least 5 nodes, following Matt's flow with additional concurrent tasks.
I do have a design with 100+ small MiNiFi Java agents running in Docker containers, each handling one table. I did this for Kafka, one per topic, but you could do the same here.
My worry is that you would kill Oracle. It usually isn't used to reads as fast as NiFi can pull, especially if you are doing 50 or 100+ concurrent reads.
Have you looked at Sqoop? It can be triggered by NiFi and is good for one-time bulk extracts. You can use your entire Hadoop cluster to pull the data. It's pretty fast and easy, and it's a simple command-line tool.
You can even import all tables:
https://sqoop.apache.org/docs/1.4.7/SqoopUserGuide.html#_literal_sqoop_import_all_tables_literal
If it's a one-time export, do that.
Then use Apache NiFi to keep Hive in sync with Oracle.
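For the one-time bulk extract, a Sqoop `import-all-tables` invocation could be assembled along these lines. This is only a sketch: the JDBC URL, user name, password file, and target directory are placeholders, and the flags shown are the ones we understand the Sqoop 1.4.7 user guide linked above to document, so check them against your Sqoop version before running anything. The Python code just builds and prints the command; it does not execute it.

```python
import shlex


def build_import_all_tables_command(
    jdbc_url, username, password_file, warehouse_dir, num_mappers=4
):
    """Build the argument list for a `sqoop import-all-tables` run."""
    return [
        "sqoop", "import-all-tables",
        "--connect", jdbc_url,
        "--username", username,
        "--password-file", password_file,
        "--warehouse-dir", warehouse_dir,
        "--num-mappers", str(num_mappers),
        # Fall back to a single mapper for tables without a primary key.
        "--autoreset-to-one-mapper",
    ]


# All values below are hypothetical placeholders.
command = build_import_all_tables_command(
    jdbc_url="jdbc:oracle:thin:@//dbhost.example.com:1521/ORCL",
    username="etl_user",
    password_file="/user/etl/.oracle_password",
    warehouse_dir="/data/raw/oracle",
)

# Shell-quoted form, suitable for logging or pasting into a terminal.
printable_command = shlex.join(command)
print(printable_command)
```

Keeping the mapper count low is one way to limit the concurrent read load on Oracle that is raised as a concern above.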
Created 07-24-2018 05:18 PM
Thanks @Timothy Spann. I have worked with Sqoop, but Sqoop takes a lot of time to import data, so right now I'm looking for an alternative. With Sqoop we have to maintain logs separately, whereas NiFi has built-in logging and I don't have to worry about scheduling. So yes, Sqoop is the last option, but NiFi is what I want to try, without any dependency on Sqoop.
Created 07-24-2018 05:45 PM
Apache NiFi is great and the scheduling, UI, logging and provenance are great.
500 tables is going to take some resources. If you have them, Matt's process will work.
Make sure you have 5-10 or more HDF 3.1 NiFi Nodes with 128GB RAM, 16-32 core CPUs. Also make sure Hive has enough resources and you have enough beefy Data Nodes. Are you using LLAP with dedicated LLAP nodes?
Are you using Apache Hive ACID tables yet? Those should update quicker.
Created 05-30-2023 06:13 AM
https://dev.to/tspannhw/simple-change-data-capture-cdc-with-sql-selects-via-apache-nifi-flank-19m4
You can use the database metadata processors to list all the tables in a database and then read all the values from those tables.