Created 03-23-2016 12:28 PM
Currently, NiFi does not have dedicated processors for different types of databases (Oracle, MySQL, etc.), and hence no notion of CDC (change data capture). To copy from one DB to another, you would use the ExecuteSQL and PutSQL processors. You would also configure the ExecuteSQL processor to run at appropriate time intervals (depending on your requirements).
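For illustration, here is a minimal Python sketch of what a scheduled ExecuteSQL -> PutSQL flow effectively does on each run; the table name, column list, credentials, and the mysql-connector/cx_Oracle drivers are assumptions for the sketch, not anything NiFi itself requires.

    # Rough equivalent of one scheduled run of an ExecuteSQL -> PutSQL flow (sketch only).
    # Assumes a MySQL source and an Oracle target with an identical "customers" table.
    import mysql.connector   # pip install mysql-connector-python
    import cx_Oracle         # pip install cx_Oracle

    def copy_once():
        src = mysql.connector.connect(host="mysql-host", user="etl", password="...", database="sales")
        dst = cx_Oracle.connect("etl", "...", "oracle-host/ORCLPDB1")
        try:
            src_cur = src.cursor()
            src_cur.execute("SELECT id, name, updated_at FROM customers")   # ExecuteSQL step
            rows = src_cur.fetchall()
            dst_cur = dst.cursor()
            dst_cur.executemany(                                            # PutSQL step
                "INSERT INTO customers (id, name, updated_at) VALUES (:1, :2, :3)",
                rows)
            dst.commit()
        finally:
            src.close()
            dst.close()

    # In NiFi the time interval lives on the processor's Run Schedule; here you would
    # simply call copy_once() on a timer.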
https://nifi.apache.org/docs/nifi-docs/html/getting-started.html#database-access
Documentation for Processors:
Created 03-23-2016 06:46 AM
I would probably look at the bulk extract options of MySQL, dump to a flat file, then use the ListFile and FetchFile processors to move the files to a directory visible to Oracle and bulk insert from there.
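As a rough illustration of the dump-to-flat-file step (the part that ListFile/FetchFile would then pick up), here is a hedged Python sketch; the table name, output path, and mysql-connector driver are assumptions.

    # Sketch: bulk-extract a MySQL table to a CSV flat file that ListFile/FetchFile
    # (or an Oracle external table / SQL*Loader job) can then pick up.
    import csv
    import mysql.connector   # pip install mysql-connector-python

    def dump_table_to_csv(table, out_path):
        conn = mysql.connector.connect(host="mysql-host", user="etl", password="...", database="sales")
        try:
            cur = conn.cursor()
            cur.execute(f"SELECT * FROM {table}")
            with open(out_path, "w", newline="") as f:
                writer = csv.writer(f)
                writer.writerow([col[0] for col in cur.description])  # header row
                for row in cur:
                    writer.writerow(row)
        finally:
            conn.close()

    dump_table_to_csv("customers", "/data/export/customers.csv")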
Created 03-30-2016 09:26 PM
NiFi has just added a QueryDatabaseTable processor for simple change data capture: https://cwiki.apache.org/confluence/display/NIFI/Release+Notes
Created 03-27-2016 11:28 PM
I would agree with this approach if NiFi is required. However, loading data to and from databases in bulk should probably be done with something like Sqoop, as it will parallelize the ETL job. A NiFi processor will just run a single thread. The SQL processors of NiFi are really meant for persisting or enriching processor output, not for bulk loading.
Created 08-12-2016 04:19 PM
The following feedback is based on using NiFi for Change Data Capture (CDC) use cases with source data tables managed by the Oracle, MS SQL Server, PostgreSQL, and MySQL RDBMSs.
When the RDBMS that manages the source data table supports it, turn on the table's CDC feature, which automatically creates a dedicated CDC table in the background. That CDC table contains all of the columns of the source data table, plus additional metadata columns that can be used to support downstream ETL logic. The RDBMS automatically detects new and changed records in the source data table for you and duplicates them into the dedicated CDC table. Against that dedicated CDC table, run a QueryDatabaseTable processor, which issues a SQL SELECT to fetch the records written to the CDC table since the last time the processor executed successfully.
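To make the polling behaviour concrete, here is a small Python sketch of what QueryDatabaseTable does against such a CDC table: it remembers the highest value of a monotonically increasing column and only fetches rows beyond it on each run. The CDC table name, its columns, and the mysql-connector driver are assumptions for the sketch.

    # Sketch of QueryDatabaseTable-style polling against a CDC table (assumed name/columns).
    # NiFi keeps the "maximum value" in processor state; here it is just a variable.
    import mysql.connector

    last_seen_seq = 0   # high-water mark, persisted by NiFi as processor state

    def poll_cdc_table():
        global last_seen_seq
        conn = mysql.connector.connect(host="db-host", user="etl", password="...", database="sales")
        try:
            cur = conn.cursor()
            cur.execute(
                "SELECT change_seq, operation, id, name, updated_at "
                "FROM customers_cdc WHERE change_seq > %s ORDER BY change_seq",
                (last_seen_seq,))
            rows = cur.fetchall()
            if rows:
                last_seen_seq = rows[-1][0]   # advance the high-water mark
            return rows                        # hand off to downstream ETL logic
        finally:
            conn.close()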
If the source data table has columns holding the timestamps of when each record was created (or last updated), and you do not have access to a Hadoop environment that supports Sqoop, you can still use NiFi to bulk extract the records of the source data table in parallel streams. First, logically fragment the source data table into windows of time, such as a given month of a given year. For each window of time, create a corresponding QueryDatabaseTable processor. In this way, you can easily execute the extract across N threads of a NiFi node (or on N NiFi nodes). Essentially, you create the first QueryDatabaseTable processor, clone it N-1 times, and simply edit the predicate of each SQL SELECT so that it fetches the source data table records created within the desired window of time.
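A hedged sketch of that time-window fan-out, using one worker thread per window instead of N QueryDatabaseTable clones; the table, the created_at column, and the 2015 monthly windows are assumptions.

    # Sketch: fragment the source table into monthly windows and extract them in parallel,
    # mimicking N cloned QueryDatabaseTable processors (assumed table/column names).
    from concurrent.futures import ThreadPoolExecutor
    import mysql.connector

    def extract_window(start, end):
        conn = mysql.connector.connect(host="db-host", user="etl", password="...", database="sales")
        try:
            cur = conn.cursor()
            cur.execute(
                "SELECT * FROM customers WHERE created_at >= %s AND created_at < %s",
                (start, end))
            return cur.fetchall()
        finally:
            conn.close()

    # One (start, end) pair per window of time, e.g. each month of 2015.
    months = [f"2015-{m:02d}-01" for m in range(1, 13)] + ["2016-01-01"]
    windows = list(zip(months, months[1:]))
    with ThreadPoolExecutor(max_workers=4) as pool:   # N threads, like N NiFi threads/nodes
        results = list(pool.map(lambda w: extract_window(*w), windows))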
If the source data table also has a timestamp for update events, then clones of these bulk-extract QueryDatabaseTable processors can be slightly modified and used to grab ongoing updates to records, as well as new records created within those windows of time. These ongoing CDC-style QueryDatabaseTable processors can be scheduled to execute based on the probability of update events for a given window of time. The update timestamp can then be used by NiFi to hand off individual CDC records to specific NiFi processors for routing, mediation, data transformation, data aggregation, and data egress (e.g., PutKafka).
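The update-capture variant only changes the predicate: each clone keeps its window of time on created_at but also watches updated_at beyond its last successful run. A minimal sketch of that query construction, with assumed column names and a hypothetical helper:

    # Sketch: the CDC variant of a per-window extract (assumed column names).
    def cdc_query(window_start, window_end, last_run):
        sql = ("SELECT * FROM customers "
               "WHERE created_at >= %s AND created_at < %s "   # the clone's window of time
               "AND updated_at > %s "                          # only rows touched since last run
               "ORDER BY updated_at")
        return sql, (window_start, window_end, last_run)

    # e.g. the January-2015 clone, last run at 2016-08-01 00:00:00:
    sql, params = cdc_query("2015-01-01", "2015-02-01", "2016-08-01 00:00:00")
    # Each row's updated_at can then drive routing to the appropriate downstream
    # processor (transformation, aggregation, PutKafka, ...).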