I have a scenario where my process flow monitors a directory with ListFTP and processes each file as it shows up.
The problem I am trying to solve is that the same file has sometimes been sent to the FTP path multiple times, which resulted in the file being processed twice. The repeat arrived days later (same file name, same contents, everything).
My first thought was to use the DetectDuplicate processor, but that processor relies on a cache, and we would want that cache to persist. We also have active/inactive clustered environments, so the cache would need to stay in sync for when the environments flip. We would also want to make sure the persistent cache doesn't grow without bound.
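To make that requirement concrete, here is a rough sketch in plain Python (not NiFi code) of the behavior I would want from a persisted cache: remember each file key on disk, report repeats, and age off old entries so the store stays bounded. The `SeenCache` class, the SQLite storage, and the retention window are all made up for illustration; they are not anything DetectDuplicate provides.

```python
import sqlite3
import time


class SeenCache:
    """Hypothetical persisted cache of file keys with age-off (illustration only)."""

    def __init__(self, path, max_age_seconds):
        self.max_age_seconds = max_age_seconds
        self.conn = sqlite3.connect(path)
        self.conn.execute(
            "CREATE TABLE IF NOT EXISTS seen (key TEXT PRIMARY KEY, first_seen REAL)"
        )
        self.conn.commit()

    def check_and_add(self, key, now=None):
        """Return True if key was already seen (a duplicate), else record it."""
        now = time.time() if now is None else now
        # Age off old entries first so the store cannot grow without bound.
        self.conn.execute(
            "DELETE FROM seen WHERE first_seen < ?",
            (now - self.max_age_seconds,),
        )
        row = self.conn.execute(
            "SELECT 1 FROM seen WHERE key = ?", (key,)
        ).fetchone()
        if row is None:
            self.conn.execute(
                "INSERT INTO seen (key, first_seen) VALUES (?, ?)", (key, now)
            )
        self.conn.commit()
        return row is not None
```

The key here could be the filename, or the filename plus a content hash if the same name can legitimately arrive with different contents. This sketch does nothing about keeping two environments in sync, which is the part I am least sure how to handle.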
My other thought was to build a separate file during processing that contains a list of previously processed files, read it in whenever a new file arrives, and use the QueryRecord processor to search it for the filename and somehow stop if I get a hit. I have been able to simulate reading that file of filenames and finding a duplicate in it, but what I can't work out is how to stop processing when the duplicate is found. As far as I can tell, I can route the duplicate entry to one processor and the original file to another, but stopping the flow (and writing the duplicate file name or contents to some error flow) has me stuck.
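The logic I am after, sketched in plain Python rather than NiFi, looks roughly like this: look the incoming filename up in the list of processed files, send a hit to an error path, and otherwise record it and carry on. The function name, the one-filename-per-line ledger file, and the result names `duplicate` and `unique` are all made up for illustration.

```python
from pathlib import Path


def route_file(filename, ledger_path):
    """Return 'duplicate' if filename is already in the ledger; otherwise
    record it and return 'unique'. The ledger holds one filename per line."""
    ledger = Path(ledger_path)
    processed = set()
    if ledger.exists():
        processed = set(ledger.read_text().splitlines())
    if filename in processed:
        # In the flow, this is where the file should go to an error path
        # and not be processed any further.
        return "duplicate"
    # First sighting: remember the name so a later resend is caught.
    with ledger.open("a") as handle:
        handle.write(filename + "\n")
    return "unique"
```

The early return on a hit is the piece I can't find an equivalent for in my flow.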
I'm relatively new to NiFi, as are most of my coworkers, so I'd appreciate any feedback on my design, or on whether I should approach this another way.