FetchSFTP and reuse of connection

Rising Star

Is FetchSFTP reusing an established connection across individual fetches?

I have a case where I only want to fetch datafiles if a <datafile>.sha256 checksum file exists as well. The idea is to run ListSFTP with a filter on .sha256, then run FetchSFTP once for the listed filename and once for the same name with .sha256 removed. Will each file fetched with FetchSFTP open and close the SFTP connection, or is NiFi reusing the connection behind the scenes?

Ideas on how to track/notify about datafiles with missing checksum files? Those won't be caught, but someone might want to know about them.
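
The pairing described above can be sketched outside NiFi to make the three outcomes explicit: complete pairs, datafiles without a checksum, and checksum files without a datafile. This is only an illustration in plain Python, not NiFi configuration; the function name `pair_by_checksum` and the sample filenames are hypothetical, and it assumes the full directory listing is available (not only the .sha256 entries).

```python
def pair_by_checksum(listing, suffix=".sha256"):
    """Split a directory listing into complete pairs and orphans.

    Returns (pairs, missing_checksum, missing_datafile), each sorted.
    """
    names = set(listing)
    checksums = {n for n in names if n.endswith(suffix)}
    datafiles = names - checksums

    # A pair exists when stripping the suffix yields a listed datafile.
    pairs = sorted(
        (c[: -len(suffix)], c) for c in checksums if c[: -len(suffix)] in datafiles
    )
    paired = {datafile for datafile, _ in pairs}

    missing_checksum = sorted(datafiles - paired)
    missing_datafile = sorted(
        c for c in checksums if c[: -len(suffix)] not in datafiles
    )
    return pairs, missing_checksum, missing_datafile


listing = ["a.csv", "a.csv.sha256", "b.csv", "c.csv.sha256"]
pairs, missing_checksum, missing_datafile = pair_by_checksum(listing)
print(pairs)             # [('a.csv', 'a.csv.sha256')]
print(missing_checksum)  # ['b.csv']
print(missing_datafile)  # ['c.csv.sha256']
```

Listing only the .sha256 files finds the pairs but can never report `b.csv`, which is why tracking missing checksums needs the unfiltered listing.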

1 ACCEPTED SOLUTION

Master Mentor
@Henrik Olsen

The GetSFTP processor is deprecated in favor of the ListSFTP/FetchSFTP processors. The list/fetch model works better in a NiFi cluster configuration. Both the GetSFTP and ListSFTP processors should only ever be run on the "primary node" when used in a NiFi cluster. FetchSFTP should be configured to run on all nodes.

-

That being said, GetSFTP will retrieve up to the configured "Max Selects" number of files in a single connection. ListSFTP will return the filenames of all files in a single connection. (The 0-byte FlowFiles generated by ListSFTP should be routed to a Remote Process Group that redistributes them to all nodes in the cluster, where FetchSFTP retrieves the actual content.)

-

Regardless of how you retrieve the files, you are looking for a way to only process those files where you also retrieved the corresponding sha256 file. This can be accomplished using the Wait and Notify processors:

77566-screen-shot-2018-06-05-at-103616-am.png

In the above flow, all the retrieved data (both <datafile> and <datafile>.sha256 files) comes into a RouteOnAttribute processor. I route all the <datafile>.sha256 FlowFiles to a Notify processor (in my test I had 20 <datafile> files and only 16 corresponding <datafile>.sha256 files). The Notify processor is configured to write the ${filename} to a DistributedMapCache service that every node in my cluster can access. My Wait processor is then configured to check that same DistributedMapCache service for "${filename}.sha256". If a match is found, the Wait processor releases the <datafile> to the "success" relationship for further processing in your dataflow. The Wait processor is also configured to wait only so long for a match. In my example, after that length of time, the 4 FlowFiles that did not have a matching sha256 filename in the cache were routed to the "expired" relationship. Set the expiration high enough to allow for the time needed to retrieve both files.
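
The routing logic described above can be modeled in a few lines of Python. This is a simplified simulation, not NiFi code or its API: the `SignalCache` class and `wait_step` function are hypothetical stand-ins for the shared cache and the Wait processor's decision, and times are passed in as plain numbers so the behavior is easy to follow.

```python
class SignalCache:
    """Stand-in for the shared map cache that Notify writes to."""

    def __init__(self):
        self._signals = set()

    def notify(self, signal_id):
        # Notify side: record that a checksum file with this name arrived.
        self._signals.add(signal_id)

    def has(self, signal_id):
        return signal_id in self._signals


def wait_step(cache, filename, queued_at, now, expiration_secs):
    """Return the relationship a waiting datafile would be routed to."""
    if cache.has(filename + ".sha256"):
        return "success"
    if now - queued_at >= expiration_secs:
        return "expired"
    return "wait"


cache = SignalCache()
cache.notify("data1.csv.sha256")  # the checksum file for data1.csv arrived

print(wait_step(cache, "data1.csv", queued_at=0.0, now=10.0, expiration_secs=60))  # success
print(wait_step(cache, "data2.csv", queued_at=0.0, now=10.0, expiration_secs=60))  # wait
print(wait_step(cache, "data2.csv", queued_at=0.0, now=61.0, expiration_secs=60))  # expired
```

FlowFiles that end up in "expired" are exactly the datafiles with no checksum, so that relationship is a natural place to hang a notification.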

-

Thank you,

Matt

-

If you found this Answer addressed your original question, please take a moment to login and click "Accept" below the answer.


5 REPLIES

Rising Star

In general, I have some trouble seeing how to best match up file pairs like <datafile> and <datafile>.sha256, as they come in separate flowfiles through NiFi, with no relation at all.

I've tried to use the distributed mapcache (storing checksums given in .sha256 files), but not sure it's the best way, and I don't see how to control time-to-live, size etc for it.
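
Once a datafile and its stored checksum are matched up, the comparison itself is small. The sketch below assumes the .sha256 file holds a hex digest as its first whitespace-separated token (the layout `sha256sum` produces); that format and the helper names are assumptions, since the thread does not say how the checksum files are written.

```python
import hashlib


def parse_sha256_file(text):
    """Return the hex digest: the first whitespace-separated token."""
    return text.split()[0].lower()


def matches_checksum(data, checksum_text):
    """Compare the SHA-256 of the datafile bytes with the stored digest."""
    return hashlib.sha256(data).hexdigest() == parse_sha256_file(checksum_text)


data = b"hello\n"
# Build a checksum file body in the assumed "<digest>  <filename>" layout.
checksum_text = hashlib.sha256(data).hexdigest() + "  hello.txt\n"

print(matches_checksum(data, checksum_text))         # True
print(matches_checksum(b"tampered", checksum_text))  # False
```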

Some wishes: track which files exist without a checksum and vice versa (for notification of possible errors), and allow some time for file pairs (datafile + checksum) to complete (there might be a race condition between the external source uploading them to SFTP and my ListSFTP). Any ideas on this pattern with accompanying checksum files?

Rising Star

When using the DistributedMapCache, how do I know how it behaves? Do I have control over time-to-live, eviction strategy, etc.? Can I see what it contains, or can I only do lookups of single specific keys?

Master Mentor

@Henrik Olsen

FetchSFTP will make a separate connection for each file being retrieved. Concurrent Tasks lets you specify the number of concurrent connections, allowing more than one file to be retrieved per processor execution schedule (still one file per connection).
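
The connection behavior described above can be pictured with a small model: every fetch opens and closes its own connection, and the number of concurrent tasks caps how many are open at once. This is a simulation only, with no real SFTP involved; `ConnectionStats`, `fetch_one`, and `fetch_all` are hypothetical names, and a thread pool stands in for NiFi's concurrent tasks.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class ConnectionStats:
    """Counts simulated connections: total opened and peak open at once."""

    def __init__(self):
        self._lock = threading.Lock()
        self.opened = 0
        self.active = 0
        self.peak = 0

    def open(self):
        with self._lock:
            self.opened += 1
            self.active += 1
            self.peak = max(self.peak, self.active)

    def close(self):
        with self._lock:
            self.active -= 1


def fetch_one(filename, stats):
    stats.open()  # one connection per file
    try:
        return f"content of {filename}"  # placeholder for the transfer
    finally:
        stats.close()  # closed again before the next file


def fetch_all(filenames, concurrent_tasks):
    stats = ConnectionStats()
    with ThreadPoolExecutor(max_workers=concurrent_tasks) as pool:
        results = list(pool.map(lambda name: fetch_one(name, stats), filenames))
    return results, stats


results, stats = fetch_all([f"file{i}.dat" for i in range(10)], concurrent_tasks=3)
print(stats.opened)     # 10
print(stats.peak <= 3)  # True
```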

-

Yes, HDF 3.1 will have all these goodies. I suggest skipping directly to HDF 3.1.2, which was just released, since it has a lot of fixes for some annoying bugs in HDF 3.1 and 3.1.1.

-

You will have the option to use either an external Redis, configured how you like, or an internal NiFi DistributedMapCacheServer with the Wait and Notify processors.

-

The DistributedMapCacheServer provides the following configurations:

77609-screen-shot-2018-06-07-at-92713-am.png

There is no TTL for the DistributedMapCacheServer option.

-

There also isn't a processor that will dump out the current contents of the DistributedMapCacheServer, but you should be able to write a script that can do that for you. Here is an example script that is used to remove a cached entry:
https://gist.github.com/ijokarumawak/14d560fec5a052b3a157b38a11955772

-

I do not know a lot about Redis, but as an externally managed cache service, it will probably give you a lot more options, as well as a cluster capability, so you don't have a single point of failure like you would with the DistributedMapCacheServer.

-

Thank you,

Matt

Rising Star

Thanks a lot, Matt. Good solution and example.

I'm currently restricted to an old NiFi (version 1.1), and wasn't aware of the notify/wait pattern available in newer versions. From which version is that available, and where in general can I look up the version in which a particular feature/processor became available?

Will soon be working on a newer HDF 3.1, so hopefully that will unlock new needed features.

-

I still didn't quite get whether FetchSFTP (used with ListSFTP) will reuse a connection across multiple file fetches, or whether it opens and closes one for every FlowFile processed.

Regards and thanks

Henrik