How to count the flowfiles from incoming queue in nifi

Hi,

I have a flow that puts 50 files in a destination using the PutSFTP processor. I need to make sure that all 50 files have reached the destination. I have tried using MergeContent, but since this is a clustered NiFi with 10 nodes, that is not possible. Can you please suggest a solution?

5 REPLIES

@Matt Clarke Can you please help?

@Matt Clarke Can you please help me out

Master Mentor

@Gillu Varghese

Perhaps the load distribution capability added to connections can help you here:

https://blogs.apache.org/nifi/entry/load-balancing-across-the-cluster

-

You would want to use either the "Partition by Attribute" or the "Single Node" strategy. Either one would allow you to move all 50 files to a single node for your MergeContent processor.
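
To illustrate why partitioning by an attribute brings all 50 files together, here is a rough Python sketch. It is not NiFi's implementation: the hash-modulo scheme, the `batch.id` attribute name, and the `pick_node` helper are assumptions made purely for illustration. The only point is that flowfiles sharing the same attribute value end up on the same node.

```python
import zlib

NODE_COUNT = 10  # cluster size taken from the question

def pick_node(attribute_value: str, node_count: int = NODE_COUNT) -> int:
    """Map an attribute value to a node index (illustrative hash-modulo scheme)."""
    return zlib.crc32(attribute_value.encode("utf-8")) % node_count

# 50 flowfiles that all carry the same hypothetical attribute batch.id=run-1
flowfiles = [{"filename": f"file-{i}.csv", "batch.id": "run-1"} for i in range(50)]

nodes_used = {pick_node(ff["batch.id"]) for ff in flowfiles}
print(len(nodes_used))  # 1: every flowfile with the same value goes to one node
```

With the "Single Node" strategy no attribute is needed at all, since every flowfile in the connection is sent to the same node.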

-

Thank you,

Matt

-

If you found this answer addressed your question, please take a moment to log in and click the "ACCEPT" link.

@Matt Clarke We are using NiFi 1.7, and the configuration options mentioned above are not present in that version. I tried using this Groovy script instead:

    def flowFiles = session.get(50)
    if (!flowFiles || flowFiles.size() < 50) {
        session.rollback()
    } else {
        def flowFile = session.create(flowFiles)
        session.provenanceReporter.join(flowFiles, flowFile)
        session.remove(flowFiles)
        session.transfer(flowFile, REL_SUCCESS)
    }

But the files are not getting processed; they are stuck in the queue. Can you please suggest a solution?

Master Mentor

@Gillu Varghese

Have you considered upgrading to NiFi 1.8 to take advantage of the load distribution capability of connections?

I am assuming your script is executing on each node in your cluster? If so, the script is essentially looking for 50 flowfiles on each node, which would explain why it just sits there.
I am not a Groovy script writer, so I am of little help there.
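
A small Python simulation of that situation, as a sketch only. It assumes the 50 flowfiles are spread evenly (round-robin) over the 10 nodes mentioned in the question; the real distribution depends on the flow, and the `script_would_proceed` helper is hypothetical. It only mirrors the script's "fewer than 50, so roll back" check.

```python
NODE_COUNT = 10   # cluster size from the question
TOTAL_FILES = 50  # files expected at the destination
BATCH_SIZE = 50   # what the script waits for on each node

# Assumption: flowfiles are spread round-robin across the nodes.
queues = [[] for _ in range(NODE_COUNT)]
for i in range(TOTAL_FILES):
    queues[i % NODE_COUNT].append(f"file-{i}")

def script_would_proceed(local_queue, batch_size=BATCH_SIZE):
    """Mirror of the script's check: proceed only if this node holds a full batch."""
    return len(local_queue) >= batch_size

per_node_counts = [len(q) for q in queues]
ready_nodes = [n for n, q in enumerate(queues) if script_would_proceed(q)]

print(per_node_counts)  # [5, 5, 5, 5, 5, 5, 5, 5, 5, 5]
print(ready_nodes)      # []
```

No node ever sees a full batch of 50 in its local queue, so every node keeps rolling back and the flowfiles stay queued.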

---

The only other option that comes to mind is incrementing a per-node value in a DistributedMapCacheServer. Then have a side flow that constantly checks the sum of those cache values until it equals 50. That flow then sends a notification that all 50 files were written and resets the per-node cache values back to zero.

Processors
Flow 1: --> PutSFTP --> FetchDistributedMapCache (get current stored value for node) --> ReplaceText (replace content with retrieved value + 1) --> PutDistributedMapCache (write new value to cache)
Flow 2: GenerateFlowFile (primary node only) --> FetchDistributedMapCache (x3 to retrieve stored cache value for each node) --> RouteOnAttribute (add relationship for when sum of all cache values equals 50, terminate unmatched) --> PutEmail (notification)
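
The counting logic behind these two flows can be sketched in plain Python. This is only a model of the idea, not NiFi code: `SharedCache`, `check_and_notify`, and the three node names are hypothetical stand-ins, and an in-memory dict plays the role of the map cache.

```python
EXPECTED_TOTAL = 50

class SharedCache:
    """Stand-in for the shared map cache: node name -> files written."""

    def __init__(self, node_names):
        self.values = {name: 0 for name in node_names}

    def increment(self, node_name):
        # Flow 1: fetch the node's current value, add 1, write it back
        self.values[node_name] = self.values[node_name] + 1

    def total(self):
        # Flow 2: fetch every node's value and sum them
        return sum(self.values.values())

    def reset(self):
        for name in self.values:
            self.values[name] = 0

def check_and_notify(cache, expected=EXPECTED_TOTAL):
    """Flow 2 check: True (and counters reset) once the sum equals expected."""
    if cache.total() == expected:
        cache.reset()
        return True
    return False

nodes = ["node1", "node2", "node3"]  # hypothetical three-node example
cache = SharedCache(nodes)
notified_early = False
for i in range(50):
    cache.increment(nodes[i % 3])
    if i == 24:
        notified_early = check_and_notify(cache)  # only 25 counted so far
notified = check_and_notify(cache)
print(notified_early, notified, cache.total())  # False True 0
```

Because each node writes only to its own key, nodes never overwrite each other's counts; the side flow is the only place where the values are combined and reset.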

---

Thanks,

Matt