
NiFi equivalent to Flume spooling directory source


I'm trying to use NiFi to replace a basic Flume agent that uses the Spooling Directory Source and sends content to Kafka.

http://flume.apache.org/FlumeUserGuide.html#spooling-directory-source

The functionality of the Flume Spooling Directory Source is described in the Flume documentation as:

"This source lets you ingest data by placing files to be ingested into a “spooling” directory on disk. This source will watch the specified directory for new files, and will parse events out of new files as they appear. The event parsing logic is pluggable. After a given file has been fully read into the channel, it is renamed to indicate completion (or optionally deleted)."

Looking for hints on how to do it with NiFi.
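
For reference, the agent I'm replacing looks roughly like this (a minimal sketch; the names, paths, and broker list below are placeholders rather than my real config):

    # Minimal spooldir-to-Kafka agent sketch; all names and paths are placeholders
    agent.sources = spool
    agent.channels = mem
    agent.sinks = kafka

    agent.sources.spool.type = spooldir
    agent.sources.spool.spoolDir = /var/spool/ingest
    agent.sources.spool.deletePolicy = immediate
    agent.sources.spool.channels = mem

    agent.channels.mem.type = memory

    agent.sinks.kafka.type = org.apache.flume.sink.kafka.KafkaSink
    agent.sinks.kafka.brokerList = broker1:9092
    agent.sinks.kafka.topic = ingest
    agent.sinks.kafka.channel = mem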


7 REPLIES


To do this, build a pipeline with the GetFile processor, which can pick up files and then delete or move them (just as the spooldir source does). For the batching functionality you can use MergeContent, or other batching mechanisms on the downstream Put processors.
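
As a rough sketch, a flow for the Kafka use case could be configured along these lines (the broker, topic, and batch sizes are illustrative placeholders; PutKafka is the Kafka producer processor in the 0.x line):

    GetFile -> MergeContent -> PutKafka

    MergeContent
      Merge Strategy: Bin-Packing Algorithm
      Minimum Number of Entries: 1000     # illustrative batch size
      Max Bin Age: 30 sec                 # flush partial batches rather than wait indefinitely

    PutKafka
      Known Brokers: broker1:9092         # placeholder
      Topic Name: ingest                  # placeholder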


@Simon Elliston Ball Thank you. I'm just starting with NiFi. Would you have a sample template that does it?


Quite often, people are actually looking to tail a file line by line. For those cases, the new TailFile processor in NiFi 0.4.0 works better, and it includes advanced features like detecting (and avoiding) duplicate entries on restart and understanding log-file rolling patterns (so it is not limited to reading the active log file, and it can even start from the beginning of time).
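
A minimal TailFile configuration might look like this (the log path and rolling pattern are placeholders):

    TailFile
      File to Tail: /var/log/app/app.log       # placeholder
      Rolling Filename Pattern: app.log.*      # so entries in rolled-over files aren't missed
      Initial Start Position: Beginning of File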


I've attached a very rough template of an SFTP pipeline that does what you are looking for, @Guilherme Braccialli. You could replace the initial GetSFTP processor with a GetFile processor and get pretty much the same functionality.

It polls a directory for *.DONE files every 5 seconds. When it finds them, it pushes them through the pipeline, encrypting/compressing them and dropping them off in HDFS and in another SFTP directory. The "Keep Source File" property in the GetFile and GetSFTP processors lets you delete each file after it is picked up so it isn't captured multiple times.
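
For concreteness, the pickup side in the GetFile variant might look something like this (the input directory is a placeholder; the property names come from the standard GetFile processor):

    GetFile
      Input Directory: /data/spool        # placeholder: the directory being watched
      File Filter: .*\.DONE               # regex, so only completed files are picked up
      Keep Source File: false             # delete after pickup so files aren't ingested twice
      Polling Interval: 5 sec
      Recurse Subdirectories: false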


Perfect! Thank you.


Thanks for this valuable post, @Brandon Wilson. I wonder if I can create a folder structure in HDFS that matches my source folder structure.

For example, in my mounted directory there are store folders, and inside each store folder there are day folders: Store1 -> Day1, Store1 -> Day2, Store2 -> Day1, etc. I set Remote Path to my mounted directory. NiFi can read this folder and its subfolders to get the XMLs and write them to HDFS. Can I write these XMLs to HDFS using the same directory structure as in the file system?

Is there a property value I can parameterize for this purpose, or do I have to use another processor in NiFi?

Regards


Hi @Berk Ardıç,

You can achieve this type of functionality by modifying a couple of additional pieces of the flow. First, set GetSFTP to search recursively from your mounted directory. This will traverse the entire tree rooted at your target location, so it will pick up files from the Store1 and Store2 directories. You can then limit what is picked up by using the regex filter properties for the path and the file. This handles the pickup side of the flow.
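
For example (the hostname, remote path, and regexes are placeholders to adapt):

    GetSFTP
      Hostname: sftp.example.com          # placeholder
      Remote Path: /data/stores           # placeholder: the mounted directory
      Search Recursively: true            # walk the StoreN/DayN subdirectories
      Path Filter Regex: Store.*/Day.*    # limit which subdirectories are scanned
      File Filter Regex: .*\.xml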

Then, on the delivery side, you can use the path attribute from the flowfile to construct a destination in HDFS that mirrors the structure of the pickup directory. You can use NiFi Expression Language in the PutHDFS Directory property to build the appropriate path.
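
For instance (the /landing prefix is a placeholder):

    PutHDFS
      Directory: /landing/${path}         # ${path} holds the file's source-relative directory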

Hope this helps.