Created on 04-02-2017 09:16 PM - edited 08-17-2019 01:26 PM
The QueryDatabaseTable processor can easily ingest data from a table based on an incrementing key. A sequence ID or primary key that is autogenerated, as PostgreSQL and MariaDB can do, is ideal. You can also use an incrementing date or an Oracle sequence ID. As long as the value increases whenever a new row arrives, you can set it as the processor's Maximum-value Column. If your tables don't have such a column, you could write a trigger or procedure in your database that copies each change to a transaction table with an autogenerated ID, and NiFi will grab that.
Clearly, real CDC involves reading write-ahead logs or transaction logs at a deep level and capturing all changes. That is coming, and it can already be done by tools like Attunity + NiFi.
For use cases that I have, I just need to grab new rows when they are added to a table and I control the ID.
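The incrementing-key idea can be sketched outside NiFi. This is only an illustration of the technique (remember the highest ID seen so far, then ask for rows above it), not how QueryDatabaseTable is implemented. It uses an in-memory SQLite table as a stand-in for the real database; the function name `fetch_new_rows` and the sample rows are made up for the example.

```python
import sqlite3

def fetch_new_rows(conn, last_seen_id):
    """Return rows whose trialid is above last_seen_id, plus the new high-water mark."""
    rows = conn.execute(
        "SELECT trialid, trialdescription, filename FROM trials "
        "WHERE trialid > ? ORDER BY trialid",
        (last_seen_id,),
    ).fetchall()
    # If nothing new arrived, keep the previous high-water mark.
    new_max = rows[-1][0] if rows else last_seen_id
    return rows, new_max

# Stand-in database with an autogenerated, incrementing key (assumption: SQLite).
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE trials ("
    "trialid INTEGER PRIMARY KEY AUTOINCREMENT, "
    "trialdescription TEXT, filename TEXT)"
)

# First poll: one row exists, state starts at 0.
conn.execute("INSERT INTO trials (trialdescription, filename) VALUES ('FENTANYL', 'a.json')")
first_batch, state = fetch_new_rows(conn, 0)

# Second poll: only the row added since the last poll comes back.
conn.execute("INSERT INTO trials (trialdescription, filename) VALUES ('ASPIRIN', 'b.json')")
second_batch, state = fetch_new_rows(conn, state)

# Third poll with no new rows returns nothing and leaves the state unchanged.
third_batch, state = fetch_new_rows(conn, state)
```

Note that this approach only sees inserts with a higher key. Updates and deletes to existing rows are missed, which is why it is not full CDC.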
I convert from Avro to JSON so I can extract attributes, since I want to do some routing based on column values. Based on one field in the table, I determine where to land the data. It can be sent to HBase (and Phoenix), HDFS or Hive.
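As a rough sketch of that routing decision: in NiFi this kind of thing is commonly done by pulling a JSON field into an attribute (for example with EvaluateJsonPath) and branching on it (for example with RouteOnAttribute). The Python below only mimics the logic. The routing field `trialtype`, the values in `ROUTES`, and the HDFS default are all assumptions made for illustration, not the values used in my flow.

```python
import json

# Hypothetical mapping from a column value to a destination.
ROUTES = {
    "phoenix": "HBase/Phoenix",
    "hive": "Hive",
}
DEFAULT_DESTINATION = "HDFS"  # assumed fallback when no rule matches

def route(record_json, field="trialtype"):
    """Pick a destination for one JSON record based on a single field."""
    record = json.loads(record_json)
    value = str(record.get(field, "")).lower()
    return ROUTES.get(value, DEFAULT_DESTINATION)

dest_a = route('{"trialid": 1, "trialdescription": "FENTANYL", "trialtype": "phoenix"}')
dest_b = route('{"trialid": 2, "trialdescription": "FENTANYL", "trialtype": "Hive"}')
dest_c = route('{"trialid": 3, "trialdescription": "FENTANYL"}')
```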
I split my records for easy processing.
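One thing I highly recommend you do, for SQL safety and to prevent errors, is to set typed SQL attributes and use parameterized statements rather than building SQL strings by hand.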
Example SQL for CDC:
upsert into trials (trialid, trialdescription, fileName) values (1,'FENTANYL','5ab2d068-dd53-4674-bcf8-17f7d80d0553')
CREATE EXTERNAL TABLE IF NOT EXISTS trials2 (trialid INT, trialdescription STRING, trialtype STRING) STORED AS ORC
CREATE TABLE trials (trialid integer not null primary key, trialdescription varchar, filename varchar);
Set your SQL attributes for SQL safety. The types are the numeric values of the JDBC types: 12 is VARCHAR (a string) and -5 is BIGINT.
Then your SQL is standard JDBC syntax with ?'s as placeholders.
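To make the attribute layout concrete, here is a small sketch that builds the `sql.args.N.type` / `sql.args.N.value` pairs that PutSQL reads for each ? in the statement. The helper `build_sql_args` is a hypothetical name for this example. The type codes are the standard `java.sql.Types` constants (4 is INTEGER, in addition to the two mentioned above), and choosing INTEGER for trialid is an assumption based on the table definition.

```python
# Numeric codes from java.sql.Types.
JDBC_TYPES = {"VARCHAR": 12, "BIGINT": -5, "INTEGER": 4}

def build_sql_args(params):
    """Turn (jdbc_type_name, value) pairs into sql.args.N.* attributes, 1-indexed."""
    attrs = {}
    for position, (type_name, value) in enumerate(params, start=1):
        attrs[f"sql.args.{position}.type"] = str(JDBC_TYPES[type_name])
        attrs[f"sql.args.{position}.value"] = str(value)
    return attrs

# The upsert from the example SQL, with literals replaced by placeholders.
statement = "upsert into trials (trialid, trialdescription, fileName) values (?, ?, ?)"

attrs = build_sql_args([
    ("INTEGER", 1),
    ("VARCHAR", "FENTANYL"),
    ("VARCHAR", "5ab2d068-dd53-4674-bcf8-17f7d80d0553"),
])

for name in sorted(attrs):
    print(name, "=", attrs[name])
```

Because the values travel as typed parameters instead of being concatenated into the statement, a quote or stray character in a column value cannot break or alter the SQL.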
Here is some cool data: I used the Google location API, called from NiFi over REST, to enrich some data and get latitude and longitude from a vague location. This kind of thing happens with Twitter data all the time.