
Change Data Capture using NiFi

Expert Contributor

We have a number of databases, mostly in Oracle and MS SQL Server, that were designed without timestamp fields; as a result, Sqoop cannot be used for incremental batch loads. In addition, some real-time use-case requirements pushed us to look into streaming solutions for Change Data Capture.

We built a proof of concept for each database. To interface with the databases and capture the changed data (the delta), we used XStream for Oracle (11g and earlier) and enabled CDC tables for MS SQL Server. We wrote custom Java code that transforms the delta units to JSON strings (using Gson) and uses them as the payload for a Kafka producer; the messages are eventually consumed into our HDP cluster as changes to the corresponding Hive databases/tables.
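For illustration, here is a minimal sketch of that delta-to-JSON-to-Kafka step (the ChangeRecord class, table/column values, topic name, and broker address are placeholders, not our actual code):

import com.google.gson.Gson;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;

public class DeltaPublisher {

    // Hypothetical representation of one changed row captured from XStream / CDC tables
    static class ChangeRecord {
        String table;
        String operation;   // INSERT / UPDATE / DELETE
        java.util.Map<String, Object> columns;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka-broker:9092");   // placeholder broker address
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");

        Gson gson = new Gson();
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            ChangeRecord delta = new ChangeRecord();   // would come from the CDC interface
            delta.table = "ORDERS";
            delta.operation = "UPDATE";
            delta.columns = java.util.Map.of("ORDER_ID", 42, "STATUS", "SHIPPED");

            // Serialize the delta unit to a JSON string and use it as the Kafka payload
            String payload = gson.toJson(delta);
            producer.send(new ProducerRecord<>("cdc-topic", delta.table, payload));
        }
    }
}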

To make our solution more maintainable, we are switching to NiFi, but as we are new to this technology, we are still in the research stage. Can anyone propose a NiFi-based solution similar to what we've done above (interface with Oracle's CDC mechanism, convert the delta units to JSON, publish to Kafka, then update the Hive tables)? Which processors should be used?

1 ACCEPTED SOLUTION

Super Collaborator

NiFi does not YET have a true CDC processor, i.e., one that reads the database logs to determine which rows changed in a given time span. However, there is a processor, QueryDatabaseTable, which essentially returns the rows that have been updated since the last retrieval. The drawback is that it scans the whole table to find the changed values, which can become a performance bottleneck if your source table is really big. Here is the documentation for QueryDatabaseTable - https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi.processors.standard.QueryDatabaseT... (pay particular attention to the 'Maximum-value Columns' property).
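To illustrate what 'Maximum-value Columns' does, here is a rough JDBC sketch of the incremental-fetch pattern the processor implements for you, including the state it persists between runs (the connection URL, table, and column names are made-up examples):

import java.sql.*;

public class IncrementalFetchSketch {
    public static void main(String[] args) throws SQLException {
        // State that QueryDatabaseTable keeps for you: the largest value seen so far
        long lastSeenId = 0L;   // would be loaded from persisted state

        try (Connection conn = DriverManager.getConnection(
                 "jdbc:oracle:thin:@//db-host:1521/ORCL", "user", "password");
             PreparedStatement ps = conn.prepareStatement(
                 "SELECT * FROM ORDERS WHERE ORDER_ID > ? ORDER BY ORDER_ID")) {

            ps.setLong(1, lastSeenId);
            try (ResultSet rs = ps.executeQuery()) {
                while (rs.next()) {
                    // ... hand each changed row downstream (e.g. convert to Avro/JSON) ...
                    lastSeenId = Math.max(lastSeenId, rs.getLong("ORDER_ID"));
                }
            }
        }
        // Persist lastSeenId so the next run only picks up newer rows
    }
}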

Here is a blog post that walks you through setting up CDC using QueryDatabaseTable - https://hortonworks.com/blog/change-data-capture-using-nifi/

Lastly, specific to your question, should you go down this route, below are the NiFi processors you will probably need:

  • QueryDatabaseTable
  • ConvertAvroToJSON
  • PublishKafka
  • PutHiveQL / PutHiveStreaming

As an alternative, you may also look into Attunity, which has CDC capability.

Hopefully this helps; if it does, please remember to upvote and 'accept' the answer. Thank you!


3 REPLIES

Master Guru

In addition to QueryDatabaseTable, you may be interested in the GenerateTableFetch processor. It is similar to QueryDatabaseTable except that it does not execute SQL queries; instead, it generates them and sends them out in flow files. This allows you to distribute the fetching in parallel across a NiFi cluster. In an upcoming release, GenerateTableFetch will accept incoming flow files, so you could enhance the workflow with the ListDatabaseTables processor, sending those tables to GenerateTableFetch and thus parallelizing the fetching of multiple pages of multiple tables.
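To make that concrete, here is a toy sketch of the kind of paged SQL statements GenerateTableFetch emits (one per flow file) so that downstream processors on different nodes can execute them in parallel; the table, ordering column, and page size are illustrative only:

import java.util.ArrayList;
import java.util.List;

public class TableFetchPages {
    // Produce one SQL statement per "page" so the actual fetching can be
    // spread across the cluster; GenerateTableFetch does this for you.
    static List<String> generateFetchQueries(String table, String orderColumn,
                                             long rowCount, long pageSize) {
        List<String> queries = new ArrayList<>();
        for (long offset = 0; offset < rowCount; offset += pageSize) {
            queries.add("SELECT * FROM " + table
                    + " ORDER BY " + orderColumn
                    + " OFFSET " + offset + " ROWS FETCH NEXT " + pageSize + " ROWS ONLY");
        }
        return queries;
    }

    public static void main(String[] args) {
        // e.g. a 1,000,000-row table split into 100,000-row pages -> 10 queries
        generateFetchQueries("ORDERS", "ORDER_ID", 1_000_000, 100_000)
                .forEach(System.out::println);
    }
}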

Super Collaborator

This is not streaming, but the SQL MERGE command may be useful here: https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DML#LanguageManualDML-Merge
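For example, applying a staging table of CDC rows to the target could look like the sketch below, submitted here via Hive JDBC (table names, columns, and the connection URL are illustrative only; the target must be a Hive ACID/transactional table):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;

public class HiveMergeExample {
    public static void main(String[] args) throws SQLException {
        // Apply a batch of staged CDC rows to the target table with a single MERGE:
        // deletes remove the row, updates change it, new keys are inserted.
        String merge =
            "MERGE INTO orders AS t " +
            "USING orders_cdc_staging AS s " +
            "ON t.order_id = s.order_id " +
            "WHEN MATCHED AND s.op = 'D' THEN DELETE " +
            "WHEN MATCHED THEN UPDATE SET status = s.status " +
            "WHEN NOT MATCHED AND s.op != 'D' THEN INSERT VALUES (s.order_id, s.status)";

        try (Connection conn = DriverManager.getConnection("jdbc:hive2://hive-host:10000/default");
             Statement stmt = conn.createStatement()) {
            stmt.execute(merge);
        }
    }
}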