
Append the data to the text file

Rising Star

We have a requirement to collect all the data we receive in one day into a single text file. The data arrives at different time intervals, and we need to collect the whole day's data in a text file whose filename is the current date, e.g. currentdate.txt.

Let's say that on 24th July 2024 we receive some data like "This is data 1" in the morning; we then need to create a new file named "24072024.txt" and add the data to it. Later, in the afternoon, we receive data like "This is data 2", and this data needs to be appended to the same text file, "24072024.txt". Also, if the file does not exist, we need to create a new file for that date.

So please help us with a NiFi flow: how to append the text to the file if it exists, and how to handle creating a new text file if it does not.


6 REPLIES

Super Guru

Hi @NagendraKumar ,

It's hard to design something like this here since we don't know the specifics. However, there are a lot of processors that can help with file manipulation, such as GetFile, ListFile, FetchFile, PutFile, and MergeContent. I'm also not sure whether you will run into concurrency issues when multiple files arrive around the same time. My advice is to start experimenting with those processors, and if you run into a specific issue you can post about it here and hopefully someone will be able to assist.

 

Rising Star

Hi @SAMSAL ,

Thanks a lot for your valuable input!

Yes, we are using GetFile to read the content and PutFile to write the content to the text file. Please let us know if there is any option to update the text file with the content from the FlowFile instead of replacing it or creating a new text file. MergeContent will be used to merge the content of the different FlowFiles and write them to the file. However, we have a requirement to update or append to the existing text file in the folder. Thanks!

Super Guru

Hi,

If you search the internet you might find something that would help in your case. Please refer to this: https://stackoverflow.com/questions/37530121/putfile-append-file
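The approaches in that thread rely on custom scripting. Purely as an illustrative sketch (the target directory below is an assumption, and the ddMMyyyy date format comes from the question), an ExecuteScript processor using the Jython engine could append each FlowFile's content to the day's file along these lines:

```python
# Hypothetical ExecuteScript (Jython) sketch - appends FlowFile content to a
# date-named file, creating it on first write. The target directory is assumed.
from org.apache.nifi.processor.io import InputStreamCallback
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from datetime import datetime

class ReadContent(InputStreamCallback):
    def __init__(self):
        self.text = None
    def process(self, inputStream):
        # Read the incoming FlowFile content as a UTF-8 string
        self.text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)

flowFile = session.get()
if flowFile is not None:
    reader = ReadContent()
    session.read(flowFile, reader)
    # e.g. /data/daily/24072024.txt for 24 July 2024 (directory is an assumption)
    target = "/data/daily/" + datetime.now().strftime("%d%m%Y") + ".txt"
    with open(target, "a") as out:   # mode "a" creates the file if it does not exist
        out.write(reader.text + "\n")
    session.transfer(flowFile, REL_SUCCESS)
```

Note that this writes to the file directly, bypassing PutFile, so it should be limited to a single concurrent task on a single node so that two threads never append to the same file at once.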

Here is my suggestion if you don't want to write or use custom code.

SAMSAL_0-1721998065442.png

Basically, think of the New Content (GenerateFlowFile) as the new data you received. The idea is to always do a merge; the trick is that when you try to fetch the file and it exists, you end up merging the old content with the new, and when it doesn't exist (the not.found relationship), you merge the new content with an empty string (using ReplaceText on the not.found relationship). However, there are a few caveats with this design that you might need to address:

1- Since MergeContent uses the Text merge strategy (see config below) with a newline as the Demarcator (delimiter), the first time through, when the file doesn't exist, it will add an empty newline at the end. You can solve this with ReplaceText or other methods, so it's not a big deal.

2- The merge order is not predictable, so you might get the new content at the top instead of appended to the bottom. That is because MergeContent receives the new content before the old content. If the order is important, you can use something like the EnforceOrder processor, setting an integer attribute as the order priority on each piece of content. If the order is not important, you can ignore this.

3- You need to preserve the filename, since MergeContent will produce a new filename. However, MergeContent reads the attribute "segment.original.filename"; if it finds it, it will use whatever filename is specified there.

With that, here are the different processor configurations:

1 - GenerateFlowFile (New Content):

SAMSAL_1-1721999166241.png

- filename: simulates having file content with a filename (you might have that already)

- segment.original.filename: used by MergeContent to set the correct filename after the merge.

2- FetchFile:

SAMSAL_2-1721999356244.png

Make sure to set the marked properties as such.

Also important: under the Settings tab, make sure to set the Penalty Duration to 0 sec. For some reason, when the file is not found and the FlowFile is directed to the not.found relationship, it gets penalized; I'm not sure why, even when I set the Log Level for Not Found to None.

3- ReplaceText (Replace Orig with Empty String):

SAMSAL_3-1721999735220.png

 

4- Merge Content:

SAMSAL_9-1722000355297.png

SAMSAL_7-1721999974756.png

For the Demarcator (delimiter), use Shift+Enter to enter the newline character.

5- PutFile:

SAMSAL_8-1722000010709.png

 

Hope that helps. If it does, please accept the solution, and feel free to add your input if you were able to make it work for your case so others can benefit.

Thanks

 

 

Master Mentor

@NagendraKumar 

This is a resource-expensive use case for NiFi.

Your goal here seems simple, but it really isn't, because of how NiFi is designed to be used: to process lots of data in a concurrent fashion.

The PutFile processor does not support an append option, just as other similar processors don't. The reason is concurrency. Consider that the typical deployment of NiFi is a multi-node cluster. Each node loads its own copy of the dataflows and executes against only the FlowFiles queued on that specific node, with no awareness of what queued data may exist on other nodes.

Now let's look at your use case and how typical data consumption would happen in a multi-node cluster. The data may be available to all nodes locally as a mounted disk, or only available on one node (not sure of your setup here).
- Do you want to consume a file (which may or may not have been consumed earlier in the day with the same filename) and append any new data for that filename to an existing file if it already exists in the target directory?
- OR does your source directory not have a consistent filename each day, so you just want to consume any file from the source directory, regardless of filename, and append it to a file with the current day as its filename?
The strategy is a little different for each of these use cases.

You would typically have a ListFile processor (configured to execute on the primary node only) that lists new files in the source directory (generating a zero-byte FlowFile with various metadata/attributes about each file). This would then feed into a FetchFile processor that retrieves the content for that file and adds it to the listed FlowFile. This is a common setup for a multi-node cluster where the source is reachable from all nodes. It lets you distribute the zero-byte FlowFiles listed by the primary node across all your nodes, so each node can fetch content for a unique FlowFile (spreading resource usage across all nodes). Even if you are using a single instance of NiFi, it is better to design flows with a multi-node cluster in mind should you ever need to scale out to a cluster later. The challenge here is that you really can't have two nodes, or even higher local concurrency on PutFile, because two threads cannot be appending to the same file at the same time. This is why append is not an option.

Now, as far as designing a dataflow that would work on a single NiFi instance, this might be possible through some creative design. My design is much like the one provided by @SAMSAL; I just try to take into account some controls over concurrency, to avoid multiple concurrent transactions possibly resulting in lost data, and to design a dataflow that handles both when things go as planned and when they do not.

You start with:
1. A ListFile processor configured to consume from the source directory.
2. Add a new process group. Configure this process group with "Single FlowFile Per Node" in the "Process Group FlowFile Concurrency" property.
3. Enter the process group, where you will have a blank canvas. Add an input port. Add an UpdateAttribute processor. Connect the input port to this UpdateAttribute processor.
4. In this UpdateAttribute processor, create a custom property named "fragment.identifier" with a value of "${UUID()}" (this creates a unique ID for the fragment identifier). Add a second dynamic property named "fragment.count" with a value of "2".

MattWho_6-1722006909672.png

5. Now add two more UpdateAttribute processors. Drag a connection from the first UpdateAttribute processor twice (once to each of the newly added UpdateAttribute processors).
6. Go back to the parent process group and connect your ListFile to the process group's input port.
The flow should look like this at this point in time (I numbered the UpdateAttribute processors to avoid confusion moving forward):

MattWho_0-1722005331133.png
and inside the child process group you should have:

MattWho_2-1722005771249.png

Navigate back into the child process group to continue building this dataflow.
Since NiFi does not support appending to an existing target file, the goal here is to fetch both the new content from the source directory (UpdateAttribute 2) and the existing file from the target directory (UpdateAttribute 3).
7. Configure UpdateAttribute 2 with one new custom property named "fragment.index" with a value of "2", since we want the new content added after the original content.

MattWho_4-1722006332456.png

8. Configure UpdateAttribute 3 with three new dynamic properties. One named "absolute.path", with its value set to the absolute path of the target directory. Another named "fragment.index" with a value of "1", since we want this content before the new content. A third named "target.filename" with a value of "${now():format('ddMMyyyy')}.txt" (this is the attribute referenced by the second FetchFile and after the merge).

MattWho_7-1722006949780.png

9. Add a FetchFile processor and connect success from UpdateAttribute 2 to it. Don't change the default configured properties. (I named mine "FetchFile - fetch new data".)
10. Add another FetchFile processor and connect success from UpdateAttribute 3 to it. In this FetchFile only, edit the "File to Fetch" property with the value "${absolute.path}/${target.filename}" so that this processor fetches the content of the existing daily file from the target directory. (I named this FetchFile "FetchFile - fetch existing data".)

MattWho_8-1722007222180.png

11. Add a funnel. Connect "success" from "FetchFile - fetch new data" to it. Connect both "success" and "not.found" from "FetchFile - fetch existing data" to the same funnel. (not.found needs to be routed to the funnel to handle the case where the newly ingested file is the first of the day, so the target directory does not yet have that day's file.)
12. Add a MergeContent processor (configured with "Merge Strategy" set to "Defragment" and "Attribute Strategy" set to "Keep All Unique Attributes").

MattWho_9-1722007449477.png

13. Add another UpdateAttribute processor. Add a dynamic property named "filename" with its value set to "${target.filename}". This is necessary to make sure we keep writing out the same file date we have been working with since ingestion. Connect the "merged" relationship from MergeContent to this UpdateAttribute. If you were to dynamically set the target filename in PutFile instead, you would run the risk that a file ingested on day 27 crosses into day 28 before the PutFile runs.

MattWho_10-1722007828586.png

14. Add your PutFile processor and connect success from the above UpdateAttribute to it. Configure your PutFile with the target directory path and a conflict resolution strategy that replaces (overwrites) the existing file, unless you had FetchFile delete it earlier in your flow.

MattWho_12-1722008916904.png

 



The Entire flow inside the child process group should look something like this:

MattWho_11-1722008150370.png

NOTE: You'll see in the above flow some failure, permission-denied, and a single not.found relationship that you need to deal with for unexpected conditions that may route a FlowFile to one of them. Under normal execution I would not expect any FlowFiles to route to these.

The concurrency setting on the child process group will make sure this child process group's flow completes before another FlowFile is allowed to enter for processing.

So you can see how complicated this use case is for NiFi.
I do not know how often your ListFile will be polling for new source files, or how large you expect your target file to grow. If you are trying to use NiFi like a logger that is constantly appending to the file, you can see how expensive this flow would get in CPU and disk I/O, since it needs to re-ingest the latest target file each time in order to append to it. If your source file is being appended to constantly throughout the day, consider configuring your NiFi ListFile to run only once an hour; then you limit the source and target file fetches to 24 times per day. As the day goes on and these files get larger, there will be more disk I/O impact.
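To get a rough feel for that overhead, here is a back-of-the-envelope sketch; the 1 MB hourly volume is purely an assumed figure for illustration. Because each run has to fetch everything written so far before appending, the bytes re-fetched over a day grow roughly quadratically with the number of runs:

```python
# Illustrative estimate only: assume ~1 MB of new data per hourly ListFile run.
new_data_mb_per_run = 1.0   # assumed hourly volume (not from the original post)
runs_per_day = 24

total_refetched_mb = 0.0    # bytes fetched back from the target file over the day
daily_file_mb = 0.0         # current size of the daily target file
for run in range(runs_per_day):
    total_refetched_mb += daily_file_mb   # existing daily file is fetched to merge
    daily_file_mb += new_data_mb_per_run  # merged file written back, 1 MB larger

print(daily_file_mb)        # 24.0  -> MB of real data produced for the day
print(total_refetched_mb)   # 276.0 -> MB re-fetched just to emulate append
```

Under a fixed daily data volume, that re-fetch total scales roughly in proportion to the number of runs, which is why polling less frequently (hourly rather than every few minutes) makes such a difference.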

Please help our community thrive. If you found any of the suggestions/solutions provided helped you with solving your issue or answering your question, please take a moment to login and click "Accept as Solution" on one or more of them that helped.

Thank you,
Matt

Rising Star

Thanks a lot @MattWho for your expertise!

Rising Star

Thanks a lot @SAMSAL! This solution works, and thanks once again for the detailed explanation.