I have a Nifi pipeline (cluster with 3 nodes) that drop postgresql constraints and triggers, transform data in several tables and insert transformed data in a destination database, and recreate constraints and triggers.
I need the pipeline to wait that all the flowfiles have been transformed to release one flowfile to recreate constraints and triggers. I cannot leave the pipeline without waiting all flowfiles because the first flowfile that arrived to the recreate constraint processor will recreate constraints while other flowfiles are still transforming and inserting data to destination database.
Schema of what we need :
What I have tried :
I create a temporary table having a status column that is updated with true when data in the flowfile has been inserted in destination database. Then I have a groovy code that allows me to compare the number of flowfile initially created and the count of the status column equals true. If this comparison is true, the release identifier is created. It means that the last flowfile having the equality passes and recreate the constraints and triggers.
This is not working because the count of several flowfile is the same (I don't know why), so several flowfile are passing the release or none of the flowfile pass the release processor.
Do you an idea to stop the pipeline until all the flowfile arrived to this point (red bar on schema above) ?