Created 03-14-2017 08:53 AM
We have a number of databases, mostly in Oracle and MS SQL Server, which were designed prior without timestamp fields; as a result, Sqoop cannot be used for incremental batch load. In addition, some real-time use-case requirements forced us to look into streaming solutions for Change Data Capture.
We POC'd a database for each. To interface with the databases and get the changed data (or delta), for Oracle, we used XStream (11g and before) and for MS SQL Server, we enabled CDC tables. We wrote custom Java code and transform the delta units (using GSON) to JSON String and use that as payload to Kafka Producer, which eventually will be consumed by/into our HDP cluster as changes to corresponding Hive database/tables.
To make our solution maintainable, we are switching to NiFi, but as we are new to this technology, we are still in research stage. Can anyone propose a NiFi version solution something similar to what we've done above (interface with CDC mechanism of Oracle, then delta units to JSON, then produce in Kafka, then update Hive tables)? What are the processors to be used?
Created 03-14-2017 03:56 PM
NiFi doesnot YET have a CDC kind of processor - as-in the processor that would look into logs to determine the changed rows in a given time span. However, there is a processor "QueryDatabaseTable" which essentially returns the rows that have been updated since last retrieval - but the problem with this processor is that it scans the whole table to find the changes values, and this could pose a performance bottle neck if your source table is really big. Here is the documentation for QueryDatabaseTable - https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi.processors.standard.QueryDatabaseT... (esp pay attention to the property 'Maximum value columns')
Here is the blog that walks you through setting up a CDC using QueryDatabaseTable - https://hortonworks.com/blog/change-data-capture-using-nifi/
Lastly, specific to your question, should you go down this route, below are the nifi processors that you probably need:
As an alternate to this you may also look into Attunity which has a CDC capability
Hopefully this helps, if it does, please remember to upvote and 'accept' the answer. Thank you!
Created 03-14-2017 03:56 PM
NiFi doesnot YET have a CDC kind of processor - as-in the processor that would look into logs to determine the changed rows in a given time span. However, there is a processor "QueryDatabaseTable" which essentially returns the rows that have been updated since last retrieval - but the problem with this processor is that it scans the whole table to find the changes values, and this could pose a performance bottle neck if your source table is really big. Here is the documentation for QueryDatabaseTable - https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi.processors.standard.QueryDatabaseT... (esp pay attention to the property 'Maximum value columns')
Here is the blog that walks you through setting up a CDC using QueryDatabaseTable - https://hortonworks.com/blog/change-data-capture-using-nifi/
Lastly, specific to your question, should you go down this route, below are the nifi processors that you probably need:
As an alternate to this you may also look into Attunity which has a CDC capability
Hopefully this helps, if it does, please remember to upvote and 'accept' the answer. Thank you!
Created 03-15-2017 01:11 PM
In addition to QueryDatabaseTable, you may be interested in the GenerateTableFetch processor. It is similar to QueryDatabaseTable except that it does not execute SQL queries, it generates them and sends out flow files with SQL queries. This allows you to distribute to the fetching in parallel over a NiFi cluster. In an upcoming release, GenerateTableFetch will accept incoming flow files, so you could enhance the workflow with the ListDatabaseTables processor, sending those tables to GenerateTableFetch, thus parallelizing the fetching of multiple pages of multiple tables.
Created 03-14-2017 06:29 PM
This is not streaming, but SQL merge command may be useful here: https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DML#LanguageManualDML-Merge