Created 07-12-2018 08:09 PM
I was planning to do a delta loading from staging table to my target table which is partitioned. My script would look like the following
with max_row_id as( select max(row_id) as MAX from target table)
insert into dc_transdetail_orc select * from staging table where row_id > max_row_id
Can the above query be accomplished by any of the NIFI processors?
I don't want to wrap this up in the shell script because if my insert fails for what ever reason, the max_row-id won't get upated in the target table and the insert will fail for the next set of files as well
Please provide any suggestions and i highly appreciate it.
Thanks
Created 07-12-2018 08:31 PM
There currently aren't any processors to do incremental insert from a fetch all in one processor. However you could use QueryDatabaseTable or GenerateTableFetch to do the incremental fetch, then PutDatabaseRecord to insert the rows into the target database. Either of the fetch processors, if you specify a Maximum Value Column, will determine the current maximum value for that column, and the next time it runs, it will only fetch rows as your statement is above, where the column value > current maximum.
The biggest difference between GenerateTableFetch and QueryDatabaseTable is that GTF generates SQL into the flow files, which then usually get executed with the ExecuteSQL processor. This allows you to distribute the SQL statements among nodes in a NiFi cluster so they can be executed in parallel. QueryDatabaseTable generates and executes the SQL each time, and is designed to run on the primary node only (if in a cluster). GTF figures out the current maximum value by doing its own query for MAX(), where QDT iterates through the rows and keeps the current maximum value while it is processing and outputting the rows.
Created 07-12-2018 11:12 PM
Hi Matt,
Thanks for the information. The queries i have written would take care of delta load.
But i have to execute couple of queries at the same time. I want a processor that can run multiple queries at the same time. Below is how my query looks like
----------------------------------------------------------
with max_row_id as( select max(row_id) as MAX from target table)
insert into dc_transdetail_orc select * from staging table join target table where row_id > max_row_id