Support Questions


How to extract data from MySQL to Oracle using Apache NiFi? Are there any demos?

New Contributor
1 ACCEPTED SOLUTION


@justin zhang

Currently, NiFi does not have dedicated processors for different types of databases (Oracle, MySQL, etc.), and hence no notion of CDC (change data capture). To copy from one DB to another, you would use the ExecuteSQL and PutSQL processors. You would also configure the ExecuteSQL processor to run at appropriate time intervals (depending on your requirements).

  • ExecuteSQL: Executes a user-defined SQL SELECT command, writing the results to a FlowFile in Avro format
  • PutSQL: Updates a database by executing the SQL DML statement defined by the FlowFile’s content

https://nifi.apache.org/docs/nifi-docs/html/getting-started.html#database-access

Documentation for Processors:

https://nifi.apache.org/docs.html
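For illustration, here is a minimal Python sketch of the same SELECT-then-INSERT pattern that the ExecuteSQL and PutSQL flow above performs, run outside NiFi. The connection details, table and column names are hypothetical, and it assumes the mysql-connector-python and cx_Oracle drivers are installed.

  import mysql.connector
  import cx_Oracle

  # Hypothetical connection details; replace with your own.
  src = mysql.connector.connect(host="mysql-host", user="etl",
                                password="secret", database="sales")
  dst = cx_Oracle.connect("etl", "secret", "oracle-host:1521/ORCLPDB1")

  src_cur = src.cursor()
  dst_cur = dst.cursor()

  # The kind of statement you would put in ExecuteSQL's SELECT query property.
  src_cur.execute("SELECT id, name, amount FROM orders")
  rows = src_cur.fetchall()

  # The kind of statement PutSQL would execute: a parameterized INSERT.
  dst_cur.executemany(
      "INSERT INTO orders (id, name, amount) VALUES (:1, :2, :3)", rows)

  dst.commit()
  src.close()
  dst.close()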


5 REPLIES

Master Mentor

I would probably look at the bulk extract options of MySQL: dump to a flat file, then use the ListFile and FetchFile processors to move it to a directory visible to Oracle, and bulk insert from there.
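As a rough illustration of that route outside NiFi, the sketch below shells out to mysqldump for the flat-file export and to Oracle's SQL*Loader for the bulk insert. The paths, credentials and control file are hypothetical, and the staging directory is assumed to be reachable from both the MySQL and Oracle hosts (or moved across by a ListFile/FetchFile/PutFile flow).

  import subprocess

  # Dump the table to tab-delimited flat files (written to /staging on the
  # MySQL host; requires the FILE privilege).
  subprocess.run(["mysqldump", "--tab=/staging", "--user=etl",
                  "--password=secret", "sales", "orders"], check=True)

  # Bulk insert with SQL*Loader, driven by a control file such as:
  #   LOAD DATA INFILE '/staging/orders.txt'
  #   APPEND INTO TABLE orders
  #   FIELDS TERMINATED BY X'09'
  #   (id, name, amount)
  subprocess.run(["sqlldr", "userid=etl/secret@ORCLPDB1",
                  "control=/staging/orders.ctl"], check=True)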

NiFi has just added a QueryDatabaseTable processor for simple change capture: https://cwiki.apache.org/confluence/display/NIFI/Release+Notes
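For context, the sketch below shows the maximum-value-column idea that QueryDatabaseTable automates (NiFi persists that state for you). The table, columns and parameter placeholder style are hypothetical and depend on your driver.

  # Remember the highest value seen for a column such as last_updated and
  # fetch only rows beyond it on each run.
  last_seen = None  # QueryDatabaseTable keeps this in processor state

  def fetch_changes(cursor):
      global last_seen
      if last_seen is None:
          cursor.execute("SELECT id, name, amount, last_updated FROM orders")
      else:
          cursor.execute(
              "SELECT id, name, amount, last_updated FROM orders "
              "WHERE last_updated > %s", (last_seen,))
      rows = cursor.fetchall()
      if rows:
          last_seen = max(row[3] for row in rows)
      return rows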

Guru

I would agree with this approach if NiFi is required. However, loading data to and from databases in bulk should probably be done with something like Sqoop, as it will parallelize the ETL job; a NiFi processor will just run a thread. The SQL processors in NiFi are really meant for persisting or enriching processor output, not for bulk loading.
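To make the Sqoop suggestion concrete, a parallel import/export pair might look roughly like the sketch below (wrapped in Python's subprocess purely for illustration). The JDBC URLs, credentials, staging path and mapper count are hypothetical.

  import subprocess

  # Pull the MySQL table into HDFS with 8 parallel mappers.
  subprocess.run([
      "sqoop", "import",
      "--connect", "jdbc:mysql://mysql-host/sales",
      "--username", "etl", "--password", "secret",
      "--table", "orders",
      "--target-dir", "/staging/orders",
      "--num-mappers", "8",
  ], check=True)

  # Push the staged files into the Oracle table, again in parallel.
  subprocess.run([
      "sqoop", "export",
      "--connect", "jdbc:oracle:thin:@oracle-host:1521/ORCLPDB1",
      "--username", "etl", "--password", "secret",
      "--table", "ORDERS",
      "--export-dir", "/staging/orders",
      "--num-mappers", "8",
  ], check=True)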

Contributor

The following feedback is based on using NiFi for Change Data Capture (CDC) use cases with source data tables managed by Oracle, MS SQL Server, PostgreSQL, and MySQL.

When the RDBMS that manages the source data table supports it, turn on the table's CDC feature. This automatically creates, in the background, a dedicated CDC table that contains all of the columns of the source data table plus additional metadata columns that can be used by downstream ETL logic. The RDBMS automatically detects new and changed records in the source data table and duplicates them into the dedicated CDC table. Against that dedicated CDC table, run a QueryDatabaseTable processor that uses a SQL SELECT query to fetch the latest records written to the CDC table since the last time the processor executed successfully.
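As one concrete example of turning on a table's CDC feature, SQL Server exposes it through system stored procedures; the sketch below runs them via pyodbc. The connection string, schema and table name are hypothetical.

  import pyodbc

  conn = pyodbc.connect(
      "DRIVER={ODBC Driver 17 for SQL Server};SERVER=mssql-host;"
      "DATABASE=sales;UID=etl;PWD=secret", autocommit=True)
  cur = conn.cursor()

  # Enable CDC for the database, then for the source table.
  cur.execute("EXEC sys.sp_cdc_enable_db")
  cur.execute("""
      EXEC sys.sp_cdc_enable_table
           @source_schema = N'dbo',
           @source_name   = N'orders',
           @role_name     = NULL
  """)

  # SQL Server now maintains cdc.dbo_orders_CT, which carries every column
  # of dbo.orders plus metadata columns (operation type, LSN, sequence) that
  # a QueryDatabaseTable processor can poll incrementally.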

If the source data table has columns which hold the time-stamps of when the record was first created or last updated, and you do not have access to a Hadoop environment that supports Sqoop, you can still use NiFi to bulk extract the records of the source data table in parallel streams. First, logically fragment the source data table into windows of time, such as a given month of a given year. For each window of time, create a corresponding QueryDatabaseTable processor. In this way, you can easily execute the extract across N threads of a NiFi node (or on N NiFi nodes). Essentially, you create the first QueryDatabaseTable processor, clone it N-1 times, and simply edit the predicate expression of each SQL SELECT so that it fetches the source data table records that were created within the desired window of time.
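A small sketch of that windowing step: generate one time window per clone and the predicate its SELECT would carry. The year, table and column names are hypothetical.

  from datetime import date

  def month_windows(year):
      # One (start, end) window per month, end exclusive.
      for month in range(1, 13):
          start = date(year, month, 1)
          end = date(year + 1, 1, 1) if month == 12 else date(year, month + 1, 1)
          yield start, end

  for start, end in month_windows(2015):
      print(f"SELECT * FROM orders "
            f"WHERE created_at >= DATE '{start}' AND created_at < DATE '{end}'")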

If the source data table also has a time-stamp for update events, then clones of these bulk-extract QueryDatabaseTable processors can be slightly modified and used to grab on-going updates to records, as well as new records created within those windows of time. These on-going CDC-type QueryDatabaseTable processors can be scheduled to execute based on the probability of update events for a given window of time. The update time-stamp can then be used by NiFi to hand off individual CDC records to specific NiFi processors for routing, mediation, data transformation, data aggregation, and data egress (e.g., PutKafka).