I am reading data from a messaging queue (as text messages), and the data pipeline works fine if I store every message in a separate text file.
The next step is to merge all the messages into a single file (txt), so I placed a MergeContent processor before PutFile. It ends up with the error: file with the same name already exists.
The data pipeline is as follows:
ConsumeMQTT (Reads Source) -- UpdateAttribute (SetsFileName and txt type) -- MergeContent (MergeFile) -- PutFile (Writes File)
ConsumeMQTT (Reads Source) -- MergeContent (MergeFile) -- PutFile (Writes File)
If I set the "Conflict Resolution Strategy" in PutFile to "replace", the flow runs, but only the latest value gets stored in the file. I cannot append data to the same file using the data flow above.
Does your ConsumeMQTT processor produce all output FlowFiles with the same filename?
Your MergeContent processor will merge a bin if both configured min settings are satisfied at the end of an execution. If your MergeContent processor is configured to run as fast as possible (Run Schedule set to the default of 0 sec), it may see only one FlowFile on the incoming connection at that moment, put that single FlowFile in a bin, and merge just that one FlowFile, since you set your min number of entries to "1".
I suggest you edit your MergeContent as follows:
1. Configure "Correlation Attribute Name" to "filename".
2. Increase your min number of entries from 1 to some higher value.
3. Always set "Max Bin Age". (This is your forced-merge property: it forces a bin to merge after the configured amount of time even if the bin has not reached both min values.)
4. Make sure you have enough bins to accommodate the expected number of unique filenames plus 1 extra. (If all bins have allocated FlowFiles to them and the next FlowFile cannot be added to an existing bin, the oldest bin will be forced to merge to free a bin).
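To make the effect of these settings concrete, here is a rough Python sketch of bin-based merging. It is a simplified toy model, not NiFi's actual implementation: the names ToyMerger, offer, run and demo are made up for illustration, only the minimum entry count is modeled (not the minimum group size), and time is passed in by hand.

```python
from dataclasses import dataclass, field


@dataclass
class Bin:
    key: str        # value of the correlation attribute (here: the filename)
    created: float  # time the first entry arrived
    entries: list = field(default_factory=list)


class ToyMerger:
    """Simplified model of bin-based merging (hypothetical, not NiFi code)."""

    def __init__(self, min_entries, max_bin_age, max_bins):
        self.min_entries = min_entries
        self.max_bin_age = max_bin_age
        self.max_bins = max_bins
        self.bins = {}    # correlation key -> Bin, oldest bin first
        self.merged = []  # list of (key, merged text)

    def _flush(self, key):
        b = self.bins.pop(key)
        self.merged.append((key, "\n".join(b.entries)))

    def offer(self, key, content, now):
        """Add one message to the bin that matches its correlation key."""
        if key not in self.bins:
            if len(self.bins) >= self.max_bins:
                # No free bin left: force the oldest bin to merge.
                self._flush(next(iter(self.bins)))
            self.bins[key] = Bin(key, now)
        self.bins[key].entries.append(content)

    def run(self, now):
        """One execution: merge bins that are full enough or too old."""
        for key in list(self.bins):
            b = self.bins[key]
            too_old = now - b.created >= self.max_bin_age
            if len(b.entries) >= self.min_entries or too_old:
                self._flush(key)


def demo(min_entries):
    m = ToyMerger(min_entries=min_entries, max_bin_age=10.0, max_bins=2)
    for t, msg in enumerate(["a", "b", "c"]):
        m.offer("out.txt", msg, now=float(t))
        m.run(now=float(t))  # the processor runs after every arrival
    m.run(now=20.0)          # a later run, past the max bin age
    return m.merged
```

With demo(1) every message is merged on its own, so three outputs all named out.txt reach the next processor, which matches the "file with the same name already exists" symptom. With demo(3) the three messages come out as one merged output, and with demo(5) the bin never fills but the max bin age still forces it out on the later run.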
The NiFi PutFile processor does not support append. Append actions can cause all kinds of issues, especially in a NiFi cluster where the target directory of PutFile is mounted on all the NiFi nodes.
You can't have multiple nodes trying to write or append to the same file at the same time.
My suggestion would be to use the UpdateAttribute processor before your PutFile to modify the filename attribute. Perhaps prepend a UUID to the filename to ensure uniqueness across multiple files or NiFi nodes (if clustered).
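The renaming idea itself is simple. A minimal Python sketch of it follows; the function name unique_filename and the underscore separator are just illustrative choices. In NiFi you would do this in UpdateAttribute with an Expression Language value for filename, probably something along the lines of ${UUID()}_${filename}, but please check that against the Expression Language guide for your version.

```python
import uuid


def unique_filename(filename: str) -> str:
    """Prepend a random UUID so two files never share the same name."""
    return f"{uuid.uuid4()}_{filename}"
```

Every call returns a different name that still ends in the original filename, so PutFile never sees a conflict and the original name is easy to recover.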
Hope this helps you,