How to schedule a process to fetch only new files from a directory in Apache NiFi?

Explorer

Hi,

 

I want to fetch only the new files added to a directory, each exactly once: once a file has been picked up, it should not be picked up again in Apache NiFi. I want to schedule this process to run every 3 hours. Please provide a solution with screenshots of the properties you used, or tell me which processors you are using. I am a bit confused between ListFile, GetFile and FetchFile, and about which properties to use.

 

Any help with this issue will be greatly appreciated.

Thank You!

2 ACCEPTED SOLUTIONS

Once it has brought a file in, it won't bring it in again, because it saves the file's timestamp and then uses that to get only newer files added afterwards, and so on.

Super Mentor

@CodeLa @SAMSAL 

I want to point out that tracking timestamps does not always guarantee that NiFi will consume every file from the input directory; it depends on how the files are being placed in that directory.

The ListFile processor looks at the last modified timestamp of each file. It then lists all files modified since the last timestamp recorded in the NiFi state manager by the previous processor execution. On the first run there will be no state, and thus everything currently in the directory is listed.
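To make the state mechanism concrete, here is a minimal Python sketch of a timestamp-based listing. It is a toy model under stated assumptions, not NiFi's actual code: the `FileEntry` and `TimestampTracker` names and the sample file names are hypothetical, and the `last_seen` field merely stands in for the value kept in the state manager.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class FileEntry:
    name: str
    last_modified: float  # seconds since the epoch


class TimestampTracker:
    """Toy model of timestamp-based listing (hypothetical, not NiFi code)."""

    def __init__(self) -> None:
        # Stands in for the timestamp kept in processor state; None = no state.
        self.last_seen: Optional[float] = None

    def list_new(self, directory: list[FileEntry]) -> list[str]:
        # With no state everything qualifies; otherwise only newer files do.
        fresh = [f for f in directory
                 if self.last_seen is None or f.last_modified > self.last_seen]
        if fresh:
            self.last_seen = max(f.last_modified for f in fresh)
        return sorted(f.name for f in fresh)


tracker = TimestampTracker()
files = [FileEntry("a.csv", 100.0), FileEntry("b.csv", 200.0)]

first_run = tracker.list_new(files)    # no state yet: both files are listed
second_run = tracker.list_new(files)   # nothing newer than the stored state
files.append(FileEntry("c.csv", 300.0))
third_run = tracker.list_new(files)    # only the newly added file is listed

print(first_run, second_run, third_run)
```

The second run returns nothing because no file is newer than the stored timestamp, which is the "picked up only once" behaviour asked about in the question.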

Now consider the scenario below, which can prevent ListFile from listing all files:

  • The mechanism writing the files to the input directory does not update the last modified timestamp on a file once it is done writing to it. Let's say file 1 starts being written at 12:00:01.000 and file 2 starts being written at 12:00:01.300. File 2 completes first and is listed by ListFile, and the stored state is updated to reflect 12:00:01.300. Now file 1 completes, but it is never listed by ListFile, since its last modified timestamp is older than that of file 2.

If you are in such a scenario, ListFile offers a different "Listing Strategy" called "Tracking Entities", which also tracks filenames in a cache service. This allows it to still list files that have an older timestamp.
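The difference between the two strategies can be sketched in a few lines of Python. This is a simplified model under assumptions: the function names are hypothetical, the timestamps are the seconds portion of the example above, and the real "Tracking Entities" strategy keeps more detail in its cache than the name-to-timestamp map used here.

```python
def list_by_timestamp(visible, state):
    """visible: name -> last modified; state: dict holding 'last_seen'."""
    fresh = {n: t for n, t in visible.items() if t > state["last_seen"]}
    if fresh:
        state["last_seen"] = max(fresh.values())
    return sorted(fresh)


def list_by_entity(visible, seen):
    """seen: name -> last modified of every file already listed."""
    fresh = sorted(n for n, t in visible.items() if seen.get(n) != t)
    seen.update(visible)
    return fresh


# Run 1: only file2 is complete. Run 2: file1 appears with its older timestamp.
run1 = {"file2": 1.300}
run2 = {"file1": 1.000, "file2": 1.300}

ts_state = {"last_seen": float("-inf")}
ts_runs = [list_by_timestamp(run1, ts_state), list_by_timestamp(run2, ts_state)]

entity_seen = {}
entity_runs = [list_by_entity(run1, entity_seen), list_by_entity(run2, entity_seen)]

print(ts_runs)      # file1 is never listed
print(entity_runs)  # file1 is listed on the second run
```

Under timestamp tracking, file1 is skipped because its timestamp is older than the stored one; under the entity-style tracking it is listed because its name has not been seen before.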

Another thing to consider is that ListFile may list the same file more than once. Consider this scenario:

  • You tell ListFile to list files from the directory /nifi/myfiles/. The mechanism writing these files to the target directory does update the last modified timestamp as each file is being written, but does not use a ".<filename>" (dot rename) approach to writing them. With dot rename, a file is initially hidden until the write completes and is then renamed and made unhidden; the default ListFile configuration ignores hidden files. So when ListFile runs, it sees the file with a newer last modified timestamp and lists it. Then on the next execution it sees the same file again, because its last modified timestamp has been updated while the file is still being written.

If you are in such a scenario, you would want to make use of the "Minimum File Age" property. This property tells ListFile to ignore any file whose last modified timestamp, compared to the current time, is not at least the configured amount of time old (meaning the last modified timestamp has not changed for the configured amount of time). The configured time is arbitrary: use whatever length you need to be confident that the file write is complete.
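The age check itself is simple arithmetic. Here is a minimal Python sketch of it, assuming times in seconds; the function names, file names and the 30-second threshold are hypothetical values chosen for illustration, not NiFi defaults.

```python
def old_enough(last_modified: float, now: float, minimum_age: float) -> bool:
    """True when the file has not been modified for at least minimum_age."""
    return now - last_modified >= minimum_age


def eligible(files: dict, now: float, minimum_age: float) -> list:
    """files: name -> last modified time, in seconds."""
    return sorted(name for name, modified in files.items()
                  if old_enough(modified, now, minimum_age))


files = {"done.csv": 900.0, "growing.csv": 995.0}

# At t=1000 the second file was touched 5 seconds ago, so it is skipped.
at_1000 = eligible(files, now=1000.0, minimum_age=30.0)

# At t=1030 it has been untouched for 35 seconds, so it now qualifies.
at_1030 = eligible(files, now=1030.0, minimum_age=30.0)

print(at_1000, at_1030)
```

A file that is still being appended to keeps resetting its own clock, so it only becomes eligible once writes have stopped for the configured period.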

Something else you need to consider applies if both of the following are true:

1. You are using a multi-node NiFi cluster.
2. The configured directory you are listing from is mounted on every node.

Since every node in a NiFi cluster executes the same dataflow, you want to avoid having every node list the same files. In this scenario you would change the "Execution" configuration on ListFile from "All nodes" to "Primary node" and change "Input Directory Location" from "Local" to "Remote". Then you will want to set "Load Balance Strategy" to "Round robin" on the connection between ListFile and FetchFile.

NOTE: Never set the Execution to "Primary node" on any processor that has an inbound connection. Only processors with no inbound connection should be considered for this execution configuration.
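To illustrate the effect of the cluster setup described above, here is a small Python sketch of round-robin distribution: one node produces the listing and the listed items are dealt out in turn to all nodes for fetching. It is only a model of the idea; the function name, the node names and the file names are hypothetical, and it says nothing about how NiFi implements load-balanced connections internally.

```python
from itertools import cycle


def distribute_round_robin(listed, nodes):
    """Deal listed items out to nodes in turn, one at a time."""
    assignment = {node: [] for node in nodes}
    for item, node in zip(listed, cycle(nodes)):
        assignment[node].append(item)
    return assignment


# Listing produced once, by a single node.
listed = ["f1", "f2", "f3", "f4", "f5"]
nodes = ["node-1", "node-2", "node-3"]

assignment = distribute_round_robin(listed, nodes)
print(assignment)
```

Each file appears in exactly one node's list, so no file is fetched twice while the fetch work is still spread across the cluster.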

I know this is a lot to digest, but it is very important to be aware of to ensure success.

If you found that this response assisted with your query, please take a moment to log in and click on "Accept as Solution" below this post.

Thank you,

Matt 

8 REPLIES

Take a look at the NiFi ListFile and FetchFile processors. They work together. ListFile reads file metadata and keeps state based on the last modified date of the most recent file it has listed, so that only newly added files are listed. FetchFile takes the filename attribute from ListFile and fetches the content.

Hope that helps

Explorer

Hi samsal,

Thanks for the reply. Can you please share screenshots? I'm a bit confused about which properties to use in ListFile and FetchFile.

You really don't need a screenshot, because you are not changing many properties:

1- Create a ListFile processor and set the "Input Directory" to whatever directory you want to track.

2- Create a FetchFile processor and connect ListFile to it via the "success" relationship. In the processor properties, keep the "File to Fetch" property set to "${absolute.path}/${filename}", since ListFile sets the path and the file name in those attributes. That is it.

After that, the content of the file is passed along via the success relationship and you can do whatever you want with it, just as if you were using GetFile. The difference is that ListFile keeps state of the latest file timestamp it grabbed, uses that to grab any new files added to the folder, updates the state to the new timestamp, and so on.
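The hand-off between the two processors can be sketched in Python as a list step that emits only attributes and a fetch step that resolves the path and reads the content. This is a rough model under assumptions: `resolve`, `list_files` and `fetch` are hypothetical helpers, `resolve` handles only plain `${name}` references rather than the full NiFi Expression Language, and the sample file is created in a temporary directory.

```python
import re
import tempfile
from pathlib import Path


def resolve(expression: str, attributes: dict) -> str:
    """Replace each ${name} with the matching attribute value."""
    return re.sub(r"\$\{([^}]+)\}",
                  lambda m: attributes[m.group(1)], expression)


def list_files(directory: Path) -> list:
    """Listing step: emit attributes only, no file content."""
    return [{"absolute.path": str(p.parent), "filename": p.name}
            for p in sorted(directory.iterdir()) if p.is_file()]


def fetch(attributes: dict) -> bytes:
    """Fetch step: build the path from the attributes, then read the file."""
    path = resolve("${absolute.path}/${filename}", attributes)
    return Path(path).read_bytes()


example = resolve("${absolute.path}/${filename}",
                  {"absolute.path": "/data/in", "filename": "a.csv"})

with tempfile.TemporaryDirectory() as tmp:
    (Path(tmp) / "report.txt").write_text("hello")
    listed = list_files(Path(tmp))
    contents = [fetch(attrs) for attrs in listed]

print(example, contents)
```

Splitting the work this way means the listing step stays cheap, since it never reads file content, and the fetch step can run wherever the attributes are delivered.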

Explorer

Hi samsal,

Thanks for your help. I have used ListFile followed by FetchFile. There is only one file in my directory, and I've set the Listing Strategy in ListFile to 'Tracking Timestamps'. When I executed the job, it brought the file in only once. I am confused: will it bring the same file in only once, or every time I execute the job?

Once it has brought a file in, it won't bring it in again, because it saves the file's timestamp and then uses that to get only newer files added afterwards, and so on.

Explorer

Got it. Thank you

Explorer

Hi,

Matt, thanks for the explanation.