Created 07-10-2024 11:23 AM
Hi All,
How can I fetch all files in a directory once one specific file arrives in that directory?
Ex: when a file with the extension .trg is present, I have to fetch all files in that directory (irrespective of their names and extensions).
For example, there are .txt, .xml and .xlsx files in \\server\share\test\
All of these files should be fetched whenever a file with the extension .trg arrives.
I've tried this scenario using
ListSmb --> Wait/Notify --> FetchSmb, but it didn't work.
Does anyone have a suggested solution?
Created 07-11-2024 03:31 AM
Hi,
Without overcomplicating your scenario, and assuming that you get a .trg file only every once in a while so that you don't have to worry about clashes or concurrency issues, I would solve this as follows:
1- Use the ListFile processor and point it to the target directory. This processor needs to run on a schedule so that it does not list the same files again while you are still processing them. You have to figure out how much time between listings is enough to process the files when a .trg file arrives. Also make sure to set the Record Writer property so that you get an array of all the files in one flow file. There won't be any tracking in this case (set Listing Strategy to No Tracking), since we will keep reading the same files again and again as long as no .trg file has arrived yet. The output of this processor is going to be an array of all the files found, where each file object has the following properties (assuming a JSON writer):
{
"filename": "...",
"path": "....",
"directory": false,
"size": 256496,
"lastModified": 1707490322483,
"permissions": null,
"owner": null,
"group": null
}
2- Use QueryRecord and add a dynamic property with the following query:
select * from flowfile where exists (
select 1 from flowfile where filename like '%.trg'
)
This will produce the array from above only if a .trg file is found among the listed files; otherwise nothing happens and we wait for the next listing.
3- If the condition above is met and the .trg file has arrived, use SplitRecord (or SplitJson if you are using a JSON writer) to split the array into one flow file per file object.
4- Use EvaluateJsonPath to get the filename and path for each file object.
5- Use FetchFile with the attributes above to get each file, and then do whatever is needed. Make sure to set the Completion Strategy to Move File or Delete File so that you don't process the same file again.
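To make the logic of steps 2 to 4 easier to follow, here is a minimal Python sketch of the same decision outside NiFi. It is only an illustration, not part of the flow: the function name `files_to_fetch` and the sample paths are made up, and it assumes the "path" field holds the directory of each file.

```python
import json
import posixpath


def files_to_fetch(listing_json):
    """Return the full path of every listed file, but only if a .trg file is listed."""
    records = json.loads(listing_json)

    # Same test as the QueryRecord query: is there any filename ending in .trg?
    if not any(r["filename"].endswith(".trg") for r in records):
        return []  # no trigger yet, wait for the next listing

    # Rough equivalent of SplitJson + EvaluateJsonPath: one path per file object.
    # Assumption: "path" is the directory that contains the file.
    return [
        posixpath.join(r["path"], r["filename"])
        for r in records
        if not r["directory"]
    ]


# Hypothetical listing, shaped like the ListFile record output shown above.
sample = json.dumps([
    {"filename": "a.txt", "path": "/data/in", "directory": False},
    {"filename": "b.xml", "path": "/data/in", "directory": False},
    {"filename": "done.trg", "path": "/data/in", "directory": False},
])
print(files_to_fetch(sample))
```

As in the query, the result is all or nothing: either every listed file is returned (the .trg file included) or none is.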
This is a very simplistic solution that might work if, like I said, you get a .trg file only every once in a while, so that there is enough time to process each .trg batch, and if you are not dealing with a large number of files. If either of those conditions is not met, you definitely have to reconsider.
Another option that would work better is to have two flows. One continuously picks up whatever files arrive, places them in a staging area and logs them in a DB. When the .trg file arrives, you invoke the other flow using the NiFi API to read and process whatever got logged in the DB. The DB table holds the staging area path for each logged file, so you pass that to the FetchFile processor. This way you can manage clashes and concurrency issues better, and you don't have to keep listing all the files as above and querying the dataset for .trg files. The files have already been moved to the staging area, and whenever a .trg file arrives the list is read once and the files are processed.
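The bookkeeping behind the staging-area idea could look roughly like the sketch below. It uses SQLite only to stay self-contained; the table name `staged_files`, its columns, and the function names are all hypothetical, and in NiFi the same statements would more likely be issued through processors such as PutSQL and ExecuteSQL.

```python
import sqlite3


def open_log(db_path=":memory:"):
    """Open the (hypothetical) log table used by both flows."""
    conn = sqlite3.connect(db_path)
    conn.execute(
        "CREATE TABLE IF NOT EXISTS staged_files ("
        " staging_path TEXT PRIMARY KEY,"
        " processed INTEGER NOT NULL DEFAULT 0)"
    )
    return conn


def log_staged_file(conn, staging_path):
    """Flow one: record every file that was moved to the staging area."""
    with conn:  # commits on success
        conn.execute(
            "INSERT OR IGNORE INTO staged_files (staging_path) VALUES (?)",
            (staging_path,),
        )


def claim_batch(conn):
    """Flow two: run once per .trg arrival; return the paths not processed yet."""
    rows = conn.execute(
        "SELECT staging_path FROM staged_files"
        " WHERE processed = 0 ORDER BY staging_path"
    ).fetchall()
    paths = [row[0] for row in rows]
    with conn:
        # Mark only the rows we actually return, so files logged in the
        # meantime stay available for the next trigger.
        conn.executemany(
            "UPDATE staged_files SET processed = 1 WHERE staging_path = ?",
            [(p,) for p in paths],
        )
    return paths
```

Each path returned by `claim_batch` would then be handed to FetchFile. Because a batch is claimed only once, a second .trg file that arrives shortly afterwards does not pick up the same files again.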
If you find this helpful, please accept the solution.
Thanks
Created 07-26-2024 07:36 PM
Thanks @SAMSAL for the solution. I'm using ListSmb and FetchSmb with a similar approach. How can we make sure those files are deleted after the fetch? With No Tracking, the same files will be listed again and again.
2) Can I use ListFile and FetchFile to connect to an SMB share (\\server\share$\path\)?
thanks!!
Created 07-27-2024 03:26 AM
Hi @PradNiFi1236 ,
Regarding the first question: similar to FetchFile, FetchSmb has a Completion Strategy property where you can specify what to do with the file after the fetch.
To delete you can simply select Delete File.
For the second question, I'm not sure as I have never tried it, but you can try it and see if it works.
Hope that helps.
Created 08-05-2024 10:53 AM
@SAMSAL, thanks for the reply, but our current NiFi version is 1.23.2, which doesn't have that Completion Strategy property. So I'm thinking of implementing ExecuteScript or ExecuteStreamCommand in order to delete the files from the SMB share after FetchSmb.
Please let me know if you have any suggestions for this script. NiFi is hosted on Kubernetes clusters, and I have credentials for the SMB share as well.
Created 07-15-2024 09:14 AM
@PradNiFi1236
Another option might be to have two ListFile processors.
ListFile one is configured with a file filter so that it only looks for the .trg file. Once the .trg file is listed, it feeds an InvokeHTTP processor that you use to start the ListFile two processor via a NiFi rest-api call. ListFile two is configured to list all the files, including the .trg file, and feeds FetchFile to get each file's content. Then somewhere in this dataflow you use another InvokeHTTP processor to make a NiFi rest-api call that stops the ListFile two processor.
So you have two different dataflows in the above example, with one watching for the trigger file and using it to start dataflow two.
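For reference, here is a rough Python sketch of the kind of request the InvokeHTTP processors would send. It assumes the `PUT /nifi-api/processors/{id}/run-status` endpoint with a revision and a state in the body, which is how I understand the rest-api to work; check the rest-api documentation for your NiFi version. The base URL, processor id and function name are placeholders, and the sketch only builds the request without sending it.

```python
import json
import urllib.request


def build_run_status_request(base_url, processor_id, state, revision_version, token=None):
    """Build (but do not send) a request that starts or stops one processor.

    Assumptions: base_url ends in /nifi-api, and revision_version is the
    processor's current revision, which a GET on
    {base_url}/processors/{processor_id} should return.
    """
    if state not in ("RUNNING", "STOPPED"):
        raise ValueError("state must be RUNNING or STOPPED")

    body = json.dumps({
        "revision": {"version": revision_version},
        "state": state,
    }).encode("utf-8")

    request = urllib.request.Request(
        url=f"{base_url.rstrip('/')}/processors/{processor_id}/run-status",
        data=body,
        method="PUT",
        headers={"Content-Type": "application/json"},
    )
    if token:
        # Only needed on a secured NiFi instance.
        request.add_header("Authorization", f"Bearer {token}")
    return request


# Placeholder values; sending would be urllib.request.urlopen(start_request).
start_request = build_run_status_request(
    "http://nifi.example:8080/nifi-api", "listfile-two-id", "RUNNING", 0
)
```

The same URL, method and JSON body go into the InvokeHTTP configuration: one processor sends RUNNING and the other sends STOPPED.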
--------
Another option requires you to create a custom processor or use a scripting processor to perform a complete listing when a trigger file is received. The trigger file comes from an upstream processor like ListFile (configured to only consume .trg files). The .trg file's "path" attribute is then used in your custom processor to list all files from that target path.
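The listing logic of such a custom or scripted processor could be sketched as follows. This is plain Python 3 meant to show the idea (it could, for instance, be called from ExecuteStreamCommand); it is not NiFi processor code, and the function name and parameters are made up for the example.

```python
from pathlib import Path


def list_batch_for_trigger(trigger_path, include_trigger=True):
    """Given the full path of a .trg file, list every file in its directory."""
    trigger = Path(trigger_path)
    if trigger.suffix != ".trg":
        raise ValueError(f"not a trigger file: {trigger}")

    # Complete, non-recursive listing of the trigger's directory,
    # sorted so the result is stable between runs.
    files = sorted(p for p in trigger.parent.iterdir() if p.is_file())

    if not include_trigger:
        files = [p for p in files if p.name != trigger.name]
    return files
```

Each returned path would then become one flow file for a downstream FetchFile.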
Please help our community thrive. If you found any of the suggestions/solutions provided helped you with solving your issue or answering your question, please take a moment to login and click "Accept as Solution" on one or more of them that helped.
Thank you,
Matt