
How to Use Nifi to Incrementally Ingest Data from RDBMS?

Expert Contributor

Hi,

Is there a template for using NiFi to incrementally ingest data from an RDBMS into Hadoop?

I know Sqoop is the better choice for this, but for some reason I must use NiFi this time. I think the idea is to store the timestamp of the last successful ingestion in an RDB table and compare each row's update time against it in the SELECT that ExecuteSQL runs. The questions I have right now are:

  1. How do I store the timestamp in an RDB table using NiFi? PutSQL seems to be the right processor, but how do I pass the SQL statement to it?
  2. How do I handle an empty result set from ExecuteSQL? Is there a simple way to say "if the result set is empty, do nothing; otherwise update the last ingestion timestamp and store the result set in HDFS"? (A rough sketch of the pattern I have in mind is below.)
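A minimal sketch of the watermark-table pattern I am thinking of (the INGEST_STATE table and its column names are hypothetical, just for illustration):

-- Hypothetical watermark table holding the last successful ingestion time
CREATE TABLE ingest_state (
  source_table VARCHAR2(64) PRIMARY KEY,
  last_success TIMESTAMP
);

-- The incremental SELECT that ExecuteSQL would run
SELECT *
  FROM logs
 WHERE update_time > (SELECT last_success
                        FROM ingest_state
                       WHERE source_table = 'LOGS');

-- The statement PutSQL would issue after a successful load
UPDATE ingest_state
   SET last_success = CURRENT_TIMESTAMP
 WHERE source_table = 'LOGS';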
1 ACCEPTED SOLUTION

Guru

The best way to do this is with the new QueryDatabaseTable processor. It has a property that lets you list maximum-value columns (usually an auto-increment id, a sequence, or something like a timestamp). NiFi then builds the queries in much the same way as Sqoop's incremental import.

QueryDatabaseTable also maintains the processor's state, so there is no need to store the values yourself in the flow logic. This state is kept either locally or, if you are running a NiFi cluster, in ZooKeeper.
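A rough sketch of the configuration (the two property names are from QueryDatabaseTable; the table and column names are just for illustration, and the generated query only shows the general shape, with the placeholder filled from the stored state):

Table Name            = LOGS
Maximum-value Columns = UPDATE_TIME

-- On each run the processor issues something along the lines of:
SELECT * FROM LOGS WHERE UPDATE_TIME > <last value recorded in state>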

The processor is usually used directly against a table, but can also be applied against a view if you have a more complex query.

If you use this method, you won't have to worry about the empty result set problem. However, one interesting way of dealing with 'missing' data in a NiFi flow is the MonitorActivity processor, which can trigger part of the flow in the absence of data over a given time window. Used with scheduling, this could provide the logic for your point 2. That said, for this particular use case it is moot, since QueryDatabaseTable does everything for you.
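For reference, a sketch of the MonitorActivity idea (the property and relationship names are those of the standard processor; the threshold value is just an example):

MonitorActivity
  Threshold Duration = 5 min
  -- route 'inactive'                          -> the "no new data arrived" branch (do nothing, or alert)
  -- route 'success' and 'activity.restored'   -> the normal path that updates the
  --                                              ingestion timestamp and writes the result set to HDFS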


9 REPLIES


Expert Contributor

@Simon Elliston Ball

The QueryDatabaseTable processor looks like exactly what I need. However, it fails to execute:

08:56:45 UTC  ERROR  a4ba040c-7a7e-4068-b7f4-f7d1e1f3823c
QueryDatabaseTable[id=a4ba040c-7a7e-4068-b7f4-f7d1e1f3823c] Unable to execute SQL select query SELECT * FROM LOGS WHERE UPDATE_TIME > '2016-05-30 16:48:00.373' due to org.apache.nifi.processor.exception.ProcessException: Error during database query or conversion of records to Avro: org.apache.nifi.processor.exception.ProcessException: Error during database query or conversion of records to Avro

Executing the generated SQL directly in SQL*Plus gave me the same error, so it looks like a timestamp format issue.
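Presumably the implicit conversion of the string literal to a TIMESTAMP is what fails; a quick SQL*Plus check along these lines illustrates the difference (the explicit conversion is only a diagnostic, not a fix inside the processor):

-- Fails when the session's NLS_TIMESTAMP_FORMAT does not match the literal,
-- because the string is compared to a TIMESTAMP column implicitly:
SELECT * FROM logs WHERE update_time > '2016-05-30 16:48:00.373';

-- Works when the literal is converted explicitly:
SELECT * FROM logs
 WHERE update_time > TO_TIMESTAMP('2016-05-30 16:48:00.373', 'YYYY-MM-DD HH24:MI:SS.FF3');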


Can you see the root cause of the error in nifi-app.log? It should tell you why it failed to convert the data type.

Master Guru

What are the data types of the fields in the LOGS table? Perhaps there is a type not accounted for by Avro or by the QueryDatabaseTable processor. If it is a timestamp format issue (specifically, if the literal can only contain the date and not the time of day), try setting the SQL Preprocessing Strategy property to "Oracle". That will strip the time off and may fix the problem, although you will then not get new data until the following day.
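A quick way to check those column types in SQL*Plus (USER_TAB_COLUMNS is the standard Oracle data dictionary view; adjust the table name if needed):

SELECT column_name, data_type
  FROM user_tab_columns
 WHERE table_name = 'LOGS';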

Expert Contributor

This is just a very simple table, created as below:

SQL> create table logs (log varchar2(255), update_time timestamp); 
SQL> insert into logs values ('hello', CURRENT_TIMESTAMP); 
SQL> insert into logs values ('beijing', CURRENT_TIMESTAMP); 
SQL> insert into logs values ('this is a nice day today', CURRENT_TIMESTAMP);

Contributor

Another option to consider is a transactional replication tool like Oracle GoldenGate or DbVisit. Both tools capture changes made to a database by monitoring the transaction logs, so the capture happens completely under the covers and does not require changing a schema or adding triggers. With GoldenGate, once a transaction is captured it can write the change out to Kafka; from there NiFi can pick the transaction up and replicate it to the cluster. DbVisit has a similar capability and may integrate directly with NiFi.
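A rough sketch of the NiFi side of such a flow (ConsumeKafka and PutHDFS are standard processors; the topic name and target directory are hypothetical):

ConsumeKafka   Topic Name(s) = oracle.logs.changes
      |  success
      v
PutHDFS        Directory     = /data/logs/incremental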

Expert Contributor

@yjiang Did you find a solution to your issue? I'm running into the same error.

Expert Contributor

No, I didn't... It looks to me like a small bug in the QueryDatabaseTable processor, but I haven't had a chance to dig into it.

Expert Contributor

Can I use a QueryDatabaseTable processor for multiple tables? I have a scenario where I am extracting data by joining two tables. I could use the ExecuteSQL processor, but that doesn't store state, which is very helpful for incremental processing.
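One workaround consistent with the view suggestion in the accepted solution (a sketch only; the second table, its columns, and the join key are hypothetical):

-- Wrap the join in a view and point QueryDatabaseTable at the view,
-- keeping a monotonically increasing column as the maximum-value column.
CREATE VIEW logs_joined AS
SELECT l.log, l.update_time, d.detail
  FROM logs l
  JOIN log_details d ON d.log_ref = l.log;

-- QueryDatabaseTable: Table Name = LOGS_JOINED, Maximum-value Columns = UPDATE_TIME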