Created 05-30-2016 05:41 PM
Hi,
Is there a template for using NiFi to incrementally ingest data from an RDBMS to Hadoop?
I know Sqoop is a better choice for this, but for some reason I must use NiFi this time. I think the idea is to store the timestamp of the last successful ingestion in an RDBMS table, and compare each row's update time against it when using ExecuteSQL to do the select. The questions I have right now are:
Created 05-31-2016 09:28 AM
The best way to do this is with the new QueryDatabaseTable processor. This has a property which lets you list maximum-value columns (usually an auto-increment id, a sequence, or something like a timestamp). NiFi then builds queries in much the same way as Sqoop's incremental mode.
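For illustration, the queries it builds look roughly like this (a sketch only, based on the generated query quoted later in this thread; the exact SQL may vary by NiFi version and database):
-- First run, with no state recorded yet: fetch everything
SELECT * FROM LOGS;
-- Subsequent runs: fetch only rows past the stored maximum value
SELECT * FROM LOGS WHERE UPDATE_TIME > '2016-05-30 16:48:00.373';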
QueryDatabaseTable also maintains the processor's state, so there is no need to store the values yourself in the flow logic. This state is stored either locally or, if you are using a NiFi cluster, in ZooKeeper.
The processor is usually used directly against a table, but can also be applied against a view if you have a more complex query.
If you use this method, you won't have to worry about the empty result set problem. However, one interesting way of dealing with 'missing' data in a NiFi flow is the MonitorActivity processor, which can be used to trigger a flow in the absence of data over a given time window. Used with scheduling, this could provide the logic to serve your point 2. That said, for this particular use case it is moot, as QueryDatabaseTable does everything for you.
Created 06-01-2016 09:00 AM
The QueryDatabaseTable processor looks like exactly what I need. However, it failed to execute:
08:56:45 UTC ERROR a4ba040c-7a7e-4068-b7f4-f7d1e1f3823c QueryDatabaseTable[id=a4ba040c-7a7e-4068-b7f4-f7d1e1f3823c] Unable to execute SQL select query SELECT * FROM LOGS WHERE UPDATE_TIME > '2016-05-30 16:48:00.373' due to org.apache.nifi.processor.exception.ProcessException: Error during database query or conversion of records to Avro
Executing the generated SQL directly in SQL*Plus gave me the same error. It looks like a timestamp format issue.
Created 06-01-2016 01:48 PM
Can you see the root cause of the error in nifi-app.log? It will tell you why it failed to convert the data type.
Created 06-01-2016 06:44 PM
What are the data types of the fields in the LOGS table? Perhaps there is a type not accounted for by either Avro or the QueryDatabaseTable processor. If it is a timestamp format issue (specifically, if only the date and not the time of day can appear in a literal), try setting the SQL Preprocessing Strategy property to "Oracle". That will strip the time off and may fix the problem, although you will then not get new data until the following day.
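As a quick check (a sketch; the table and column names come from the query above), you can run the generated query in SQL*Plus with an explicit conversion. Oracle only parses a bare string literal like '2016-05-30 16:48:00.373' if it happens to match the session's NLS_TIMESTAMP_FORMAT, so an explicit TO_TIMESTAMP avoids that dependency:
-- Explicit format mask instead of relying on NLS_TIMESTAMP_FORMAT
SELECT * FROM logs
WHERE update_time > TO_TIMESTAMP('2016-05-30 16:48:00.373', 'YYYY-MM-DD HH24:MI:SS.FF3');
If this version succeeds where the bare literal fails, the problem is the literal format rather than the data itself.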
Created 06-13-2016 09:07 AM
This is just a very simple table created using the below:
SQL> create table logs (log varchar2(255), update_time timestamp);
SQL> insert into logs values ('hello', CURRENT_TIMESTAMP);
SQL> insert into logs values ('beijing', CURRENT_TIMESTAMP);
SQL> insert into logs values ('this is a nice day today', CURRENT_TIMESTAMP);
Created 06-17-2016 12:04 AM
Another option to consider is a transactional replication tool like Oracle GoldenGate or DbVisit. Both tools capture changes made to a database by monitoring the transaction logs, so it's completely under the covers and does not require changing a schema or adding triggers. With GoldenGate, once a transaction is captured it can write the change out to Kafka; from there NiFi could pick the transaction up and replicate it to the cluster. DbVisit has a similar capability and may integrate directly with NiFi.
Created 06-22-2016 01:18 PM
@yjiang did you find a solution to your issue? I'm running into the same error.
Created 06-25-2016 02:28 PM
No, I didn't... It looks to me like a tiny bug in the QueryDatabaseTable processor, but I didn't get a chance to dig into it.
Created 04-21-2018 03:20 AM
Can I use a QueryDatabaseTable processor for multiple tables? I have a scenario where I am extracting data by joining two tables. I could use the ExecuteSQL processor, but that doesn't store state, which is very helpful for incremental processing.
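One possible workaround, following the view suggestion earlier in this thread (a sketch with hypothetical table, column, and view names): wrap the join in a view and point QueryDatabaseTable at the view, using its timestamp column as the Maximum-value Column so state is still kept across runs.
-- Hypothetical tables and columns, for illustration only
CREATE VIEW orders_joined_v AS
SELECT o.order_id, c.customer_name, o.update_time
FROM orders o
JOIN customers c ON o.customer_id = c.customer_id;
QueryDatabaseTable can then treat orders_joined_v like an ordinary table. Whether the join performs well when queried incrementally depends on your database and indexing.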