Created 12-12-2018 10:29 AM
Hi,
I have a flow that puts 50 files at a destination using the PutSFTP processor. I need to make sure that all 50 files have reached the destination. I tried using MergeContent, but since this is a clustered NiFi with 10 nodes, that does not work. Can you please suggest a solution?
Created 12-12-2018 10:30 AM
@Matt Clarke can you please help?
Created 12-12-2018 01:06 PM
@Matt Clarke Can you please help me out?
Created 12-12-2018 06:30 PM
Perhaps the load distribution capability added to connections can help you here:
https://blogs.apache.org/nifi/entry/load-balancing-across-the-cluster
-
You would want to use either the "Partition by Attribute" or the "Single Node" strategy. This would allow you to move all 50 files to a single node for your MergeContent processor.
-
Thank you,
Matt
-
If you found this answer addressed your question, please take a moment to log in and click the "ACCEPT" link.
Created 12-13-2018 04:53 AM
@Matt Clarke We are using NiFi 1.7, and the configuration options mentioned above are not present in that version. I tried using the following Groovy script:
def flowFiles = session.get(50)
if (!flowFiles || flowFiles.size() < 50) {
    // Fewer than 50 flowfiles are queued on this node: put them back and wait
    session.rollback()
} else {
    // Create one child flowfile that represents the whole batch
    def flowFile = session.create(flowFiles)
    session.provenanceReporter.join(flowFiles, flowFile)
    session.remove(flowFiles)
    session.transfer(flowFile, REL_SUCCESS)
}
But the files are not getting processed; they are stuck in the queue. Can you please suggest a solution?
Created 01-16-2019 01:55 PM
Have you considered upgrading to NiFi 1.8 to take advantage of the load distribution capability of connections?
I am assuming your script is executing on each node in your cluster? If so, the script is looking for 50 flowfiles on each node, which would explain why it just sits there.
I am not a Groovy script writer, so I am of little help there.
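A rough standalone illustration of the per-node behaviour described above. This is plain Groovy, not NiFi code, and it assumes the 50 files are spread round-robin across the 10 nodes; the real distribution depends on the flow. The variable names are made up for the example.

```groovy
// Standalone sketch, not NiFi code.
// Assumption: 50 files are spread round-robin over 10 nodes.
int nodeCount = 10
int totalFiles = 50
int batchSize = 50   // the count the script's session.get(50) check waits for

// Queue depth on each node after the files are distributed
def queued = [0] * nodeCount
(0..<totalFiles).each { i -> queued[i % nodeCount]++ }

// A node can only build its batch if it holds all 50 files itself
def nodesAbleToMerge = (0..<nodeCount).findAll { queued[it] >= batchSize }

println "Per-node queue sizes: ${queued}"
println "Nodes that can complete a batch: ${nodesAbleToMerge}"
```

Under that assumption every node holds 5 flowfiles, no node ever sees 50, and each run of the script rolls back.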
---
The only other option that comes to mind is incrementing a per-node value in a DistributedMapCacheServer. Then have a side flow that constantly checks the sum of those cache values until it equals 50. That flow then sends a notification that all 50 files were written and resets the per-node cache values back to zero.
Processors
Flow 1: --> PutSFTP ---> FetchDistributedMapCache (get current stored value for node) --> ReplaceText (replace content with retrieved value +1) ---> PutDistributedMapCache (write new value to cache)
Flow 2: GenerateFlowFile (primary node only) ---> FetchDistributedMapCache (x3 to retrieve stored cache value for each node) --> RouteOnAttribute (add relationship for when sum of all cache values equals 50, terminate unmatched) --> PutEmail (notification)
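The counting logic behind the two flows can be sketched in plain Groovy. This is only a model: an ordinary Map stands in for the DistributedMapCache, and the key names ("node-1" and so on) and the helpers recordTransfer and checkAndReset are hypothetical, not NiFi APIs.

```groovy
// Standalone sketch: a plain Map stands in for the DistributedMapCache.
// Key names and helper names are hypothetical.
def cache = [:]
int expected = 50

// Flow 1: after a successful PutSFTP, a node reads its own counter,
// adds one, and writes it back (fetch -> +1 -> put)
def recordTransfer = { String nodeKey ->
    cache[nodeKey] = (cache[nodeKey] ?: 0) + 1
}

// Flow 2: sum every node's counter; once the sum reaches the target,
// reset all counters to zero and report success
def checkAndReset = { List<String> nodeKeys ->
    int total = nodeKeys.sum { cache[it] ?: 0 }
    if (total < expected) {
        return false
    }
    nodeKeys.each { cache[it] = 0 }
    return true
}

// Example run with 10 nodes
def nodeKeys = (1..10).collect { 'node-' + it }
49.times { i -> recordTransfer(nodeKeys[i % 10]) }
println "After 49 files, complete? ${checkAndReset(nodeKeys)}"
recordTransfer(nodeKeys[9])
println "After 50 files, complete? ${checkAndReset(nodeKeys)}"
```

Because each node writes only its own key, one node's read-then-write should not overwrite another node's count. Concurrent tasks on the same node could still race, so this sketch assumes one update at a time per node.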
---
Thanks,
Matt