Support Questions

Find answers, ask questions, and share your expertise
Announcements
Celebrating as our community reaches 100,000 members! Thank you!

Either insert or update to Postgres via Apache NiFi flow

avatar
New Contributor

I am trying to transfer data between two databases with similar structure of tables using NiFi. Example of data structure:

User:{varchar name, integer id}.

There are no "Maximum-value Columns" so it is impossible to determine if there is new data or not. So each time I create "snapshot" of the full table content. The problem is that it is unclear either particular record should be inserted or updated in the target database.

I created two branches of processors: with inserts and with updates. Only insert works for new records and only update for existing. But (!) PutSQL processor works with bunch of flow files. For example batch size is 100 and processors work once a day. Assume there was 98 records yesterday. They will be inserted. Today there are 200 records (98 from yesterday and 102 new). In this flow if NiFi tries to update first 100 records and insert them then both actions will fail: first 98 records should be updated while last 2 should be inserted.

How to solve this issue? I know it is possible to use batch size 1 but it work too slow.

1 ACCEPTED SOLUTION

avatar
Master Guru

I assume you are using a PostgreSQL version before 9.5. If not, you could try INSERT ... ON CONFLICT DO UPDATE. If you're using ConvertJSONToSQL that won't be an option, but you may be able to use EvaluateJsonPath -> ReplaceText to generate your own SQL statement instead.

If you do have a PG version < 9.5, then "upsert" is a known and common issue with PostgreSQL (and many/most/all relational database systems). You could try the example cited by the article, but that likely involves a stored function.

This is a kind of replication use case, where "change data" is coming from a source system and needs to be applied at a target system. Work is underway in NiFi for a Change Data Capture (CDC) processor, the first one will support only MySQL (NIFI-3413) but should provide APIs and/or a mini-framework such that other CDC processors could be written for various databases such as PostgreSQL.

View solution in original post

1 REPLY 1

avatar
Master Guru

I assume you are using a PostgreSQL version before 9.5. If not, you could try INSERT ... ON CONFLICT DO UPDATE. If you're using ConvertJSONToSQL that won't be an option, but you may be able to use EvaluateJsonPath -> ReplaceText to generate your own SQL statement instead.

If you do have a PG version < 9.5, then "upsert" is a known and common issue with PostgreSQL (and many/most/all relational database systems). You could try the example cited by the article, but that likely involves a stored function.

This is a kind of replication use case, where "change data" is coming from a source system and needs to be applied at a target system. Work is underway in NiFi for a Change Data Capture (CDC) processor, the first one will support only MySQL (NIFI-3413) but should provide APIs and/or a mini-framework such that other CDC processors could be written for various databases such as PostgreSQL.