@NagendraKumar
This is a resource-expensive use case for NiFi.
Your goal here seems simple, but really isn't because of how NiFi is designed: to process lots of data in a concurrent fashion.
The PutFile processor does not support an append option, just like other similar processors don't. The reason is concurrency. Consider that the typical deployment of NiFi is a multi-node cluster. Each node loads its own copy of the dataflows and executes against only the FlowFiles queued on that specific node, with no awareness of what queued data may exist on other nodes.
Now let's look at your use case and how typical data consumption would happen in a multi-node cluster. The data may be available to all nodes locally as a mounted disk, or only available on one node (not sure of your setup here).
- You want to consume a file (that may or may not have been consumed earlier in the day with the same filename?) and append any new data for the same filename to an existing file if it already exists in the target directory?
- OR your source directory does not have a consistent filename each day, and you just want to consume any file from the source directory regardless of filename and append it to a file with the current day as its filename?
The strategy is a little different for each of these use cases.
You would typically have a ListFile processor (configured to execute on the primary node only) that lists new files in the source directory, generating a zero-byte FlowFile with various metadata/attributes about each file. This would then feed into a FetchFile processor that retrieves the content for that file and adds it to the listed FlowFile. This is a common setup for a multi-node cluster where the source is reachable from all nodes, as it allows you to distribute the zero-byte FlowFiles listed by the primary node across all your nodes so each node can fetch content for a unique FlowFile (spreading resource usage across all nodes). Even if you are using a single instance of NiFi, it is better to design flows with a multi-node cluster in mind should you ever need to scale out to a cluster later. The challenge here is that you can't have two nodes, or even a higher concurrency setting locally on PutFile, because two threads cannot append to the same file at the same time. This is why append is not an option.
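As a rough sketch, assuming the source is reachable from every node (the directory path here is a placeholder for your environment), the list/fetch pair would be configured along these lines:

  ListFile
    Input Directory: /path/to/source        <-- placeholder
    Execution: Primary node
    "success" connection to FetchFile: Load Balance Strategy = Round robin

  FetchFile
    File to Fetch: ${absolute.path}/${filename}   <-- the default, built from attributes ListFile writes

The round-robin load balance on the connection is what spreads the zero-byte listed FlowFiles across the cluster so each node fetches its own share of the content.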
Now, as far as designing a dataflow that would work on a single NiFi instance, this might be possible through some creative design. My design is much like the one provided by @SAMSAL. I just try to take into account some controls over concurrency to avoid multiple concurrent transactions possibly resulting in lost data, and to design a dataflow that handles both when things go as planned and when they do not.
You start with:
1. A ListFile processor configured to list files from the source directory.
2. Add a new process group. Configure this process group with "Single FlowFile Per Node" in the "Process Group FlowFile Concurrency" setting.
3. Enter the process group, where you will have a blank canvas. Add an input port. Add an UpdateAttribute processor. Connect the input port to this UpdateAttribute processor.
4. In this UpdateAttribute processor we are going to create a custom property with the name "fragment.identifier" and a value of "${UUID()}" (this creates a unique ID for the fragment identifier). Add a second dynamic property with the name "fragment.count" and a value of "2", since the merge later in the flow will combine exactly two fragments (see the property sketch after this list).
5. We will now add two more UpdateAttribute processors. Drag a connection from the first UpdateAttribute processor twice (once to each of these newly added UpdateAttribute processors).
6. Go back to the parent process group and connect your ListFile to the process group's input port.
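Here is a minimal sketch of what is configured so far (values taken from the steps above):

  Child process group
    Process Group FlowFile Concurrency: Single FlowFile Per Node

  UpdateAttribute 1 (dynamic properties)
    fragment.identifier = ${UUID()}
    fragment.count      = 2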
The flow should look like this at this point in time (I numbered the UpdateAttribute processors to avoid confusion moving forward):
and inside the child process group you should have:
Navigate back into the child process group to continue building this dataflow.
Since NiFi does not support appending to an existing target file, the goal here is to fetch both the new content from the source directory (UpdateAttribute 2) and the existing file from the target directory (UpdateAttribute 3).
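For context on why the fragment attributes matter: MergeContent's "Defragment" strategy (added later in this flow) reassembles FlowFiles using three standard attributes:

  fragment.identifier - groups the fragments that belong in the same merged output
  fragment.index      - the order in which fragments are stitched together
  fragment.count      - how many fragments must arrive before the merge is performed

So by stamping the same identifier and a count of 2 up front, then giving the existing file index 1 and the new content index 2, we get "existing content followed by new content" in a single merged FlowFile, which simulates an append.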
7. Configure UpdateAttribute 2 with one new custom property with the name "fragment.index" and a value of "2", since we want the new content added after the original content.
8. Configure UpdateAttribute 3 with three new dynamic properties. Set one with the name "absolute.path" and a value of the absolute path of the target directory. Set another with the name "fragment.index" and a value of "1", since we want this content before the new content. Create a third with the name "target.filename" and a value of "${now():format('ddMMyyyy')}.txt" (this attribute is referenced again in steps 10 and 13).
9. Add a FetchFile processor and connect "success" from UpdateAttribute 2 to it. Don't change the default configured properties (I named mine "FetchFile - fetch new data").
10. Add another FetchFile processor and connect "success" from UpdateAttribute 3 to it. In only this FetchFile, edit the "File to Fetch" property with the value "${absolute.path}/${target.filename}" so that this processor fetches the content of the existing daily file from the target directory. (I named this FetchFile "FetchFile - fetch existing data".)
11. Add a funnel. Connect "success" from "FetchFile - fetch new data" to it. Connect both "success" and "not.found" from "FetchFile - fetch existing data" to the same funnel. ("not.found" needs to be routed to the funnel to handle the case where the newly ingested file is the first of the day, so the target directory does not yet have that day's file.)
12. Add a MergeContent processor and connect the funnel to it. Configure it with "Merge Strategy" set to "Defragment" and "Attribute Strategy" set to "Keep All Unique Attributes".
13. Add another UpdateAttribute processor. Add a dynamic property with the name "filename" and a value of "${target.filename}". This is necessary to make sure we keep writing to the same dated file we have been working with since ingestion. Connect the "merged" relationship from MergeContent to this UpdateAttribute. If you were to dynamically set the target filename in PutFile instead, you would run the risk that a file ingested on day 27 crosses into day 28 before the PutFile executes.
14. Add your PutFile processor and connect "success" from the above UpdateAttribute to it. Configure your PutFile with the target directory path and a "Conflict Resolution Strategy" of "replace" so it overwrites the existing file (unless you had FetchFile delete it earlier in your flow). A consolidated property sketch follows this list.
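To recap, a sketch of the key properties inside the child process group (the target directory path is a placeholder for your environment):

  UpdateAttribute 2
    fragment.index = 2

  UpdateAttribute 3
    absolute.path   = /path/to/target        <-- placeholder
    fragment.index  = 1
    target.filename = ${now():format('ddMMyyyy')}.txt

  FetchFile - fetch new data
    File to Fetch = ${absolute.path}/${filename}          (default)

  FetchFile - fetch existing data
    File to Fetch = ${absolute.path}/${target.filename}

  MergeContent
    Merge Strategy     = Defragment
    Attribute Strategy = Keep All Unique Attributes

  UpdateAttribute 4
    filename = ${target.filename}

  PutFile
    Directory                    = /path/to/target        <-- placeholder
    Conflict Resolution Strategy = replace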
The entire flow inside the child process group should look something like this:
NOTE: You'll see in the above flow some "failure", "permission.denied", and a single "not.found" relationship that you need to deal with for unexpected conditions that may result in a FlowFile routing to one of them. I would not expect any FlowFiles to route to these under normal execution.
The concurrency rules on the child process group will make sure this child process group flow completes before allowing another FlowFile to enter for processing.
So you can see how complicated this use case is for NiFi.
I do not know how often your ListFile will be polling for new source files, or how large you expect your target file to grow. If you are trying to use NiFi like a logger that is constantly appending to a file, you can see how expensive this flow would get in CPU and disk I/O, since it needs to re-ingest the latest target file to append to each time. If your source file is being appended to constantly throughout the day, maybe configure your ListFile to run only once an hour. Then you limit your source and target files being fetched to only 24 times per day. As the day goes on and these files get larger, there will be more disk I/O impact.
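If you go that route, the hourly cadence is just the ListFile scheduling settings (a minimal sketch; tune the interval to your needs):

  ListFile (Scheduling tab)
    Scheduling Strategy = Timer driven
    Run Schedule        = 1 hour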
Please help our community thrive. If you found any of the suggestions/solutions provided helped you with solving your issue or answering your question, please take a moment to login and click "Accept as Solution" on one or more of them that helped.
Thank you,
Matt