How to count the flowfiles from incoming queue in nifi

New Contributor

Hi,

I have a flow that puts 50 files in a destination using the PutSFTP processor. I need to make sure that all 50 files have reached the destination. I tried using MergeContent, but since this is a clustered NiFi with 10 nodes, that does not work. Can you please suggest a solution?

Re: How to count the flowfiles from incoming queue in nifi

New Contributor

@Matt Clarke Can you please help?

Re: How to count the flowfiles from incoming queue in nifi

New Contributor

@Matt Clarke Can you please help me out

Re: How to count the flowfiles from incoming queue in nifi

Master Guru

@Gillu Varghese

Perhaps the load distribution capability added to connections can help you here:

https://blogs.apache.org/nifi/entry/load-balancing-across-the-cluster

-

You would want to use either the "Partition by Attribute" or the "Single Node" strategy. This would allow you to move all 50 files to a single node for your MergeContent processor.

-

Thank you,

Matt

-

If you found that this answer addressed your question, please take a moment to log in and click the "ACCEPT" link.

Re: How to count the flowfiles from incoming queue in nifi

New Contributor

@Matt Clarke We are using NiFi 1.7, and the configuration options mentioned above are not available in that version. I tried using the following Groovy script:

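// Pull up to 50 flowfiles from the incoming queue on this node
def flowFiles = session.get(50)
if (!flowFiles || flowFiles.size() < 50) {
    // Fewer than 50 available: put them back and wait
    session.rollback()
} else {
    // Replace the 50 parents with a single child flowfile
    def flowFile = session.create(flowFiles)
    session.provenanceReporter.join(flowFiles, flowFile)
    session.remove(flowFiles)
    session.transfer(flowFile, REL_SUCCESS)
}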

But the files are not being processed; they are stuck in the queue. Can you please suggest a solution?

Re: How to count the flowfiles from incoming queue in nifi

Master Guru

@Gillu Varghese

Have you considered upgrading to NiFi 1.8 to take advantage of the load distribution capability of connections?

I am assuming your script is executing on each node in your cluster? If so, the script is essentially looking for 50 flowfiles on each node, which would explain why it just sits there.
I am not a Groovy script writer, so I am of little help there.
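
To illustrate why a per-node threshold of 50 may never be met, here is a minimal Python simulation (not NiFi code). It assumes the 50 flowfiles are spread evenly across the 10 nodes; the function names are hypothetical and the real distribution in a cluster may well be uneven.

```python
# Illustrative simulation only; nothing here calls NiFi.
NODES = 10        # cluster size from the question
TOTAL_FILES = 50  # flowfiles expected in total
BATCH_SIZE = 50   # threshold the script waits for on EACH node


def distribute_round_robin(total_files, nodes):
    """Return the queue depth per node, assuming an even round-robin spread."""
    queues = [0] * nodes
    for i in range(total_files):
        queues[i % nodes] += 1
    return queues


def nodes_that_can_merge(queues, batch_size):
    """Return the indexes of nodes whose local queue reaches the batch size."""
    return [n for n, queued in enumerate(queues) if queued >= batch_size]


queues = distribute_round_robin(TOTAL_FILES, NODES)
ready = nodes_that_can_merge(queues, BATCH_SIZE)

print("queued per node:", queues)
print("nodes able to merge:", ready)
```

Under this assumption every node holds only 5 flowfiles, so no node ever sees 50 and each copy of the script keeps rolling back.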

---

The only other option that comes to mind is incrementing a per-node value in a DistributedMapCacheServer. Then have a side flow that constantly checks the sum of those cache values until it equals 50. That flow then sends a notification that all 50 files were written and resets the per-node cache values back to zero.

Processors
Flow 1: --> PutSFTP --> FetchDistributedMapCache (get current stored value for node) --> ReplaceText (replace content with retrieved value + 1) --> PutDistributedMapCache (write new value to cache)
Flow 2: GenerateFlowFile (primary node only) --> FetchDistributedMapCache (x3 to retrieve stored cache value for each node) --> RouteOnAttribute (add relationship for when sum of all cache values equals 50, terminate unmatched) --> PutEmail (notification)
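
The counting logic of the two flows can be sketched in plain Python. This is only a rough model under stated assumptions: `SharedCache` is a hypothetical in-memory stand-in for the cache server, the `count-<node>` key names are invented for illustration, and each node is assumed to update only its own key from a single thread (the read-then-write in Flow 1 is not atomic otherwise).

```python
import threading


class SharedCache:
    """Hypothetical stand-in for a key/value cache shared by all nodes."""

    def __init__(self):
        self._data = {}
        self._lock = threading.Lock()

    def get(self, key, default=0):
        with self._lock:
            return self._data.get(key, default)

    def put(self, key, value):
        with self._lock:
            self._data[key] = value


def record_upload(cache, node_id):
    """Flow 1: after a successful upload, fetch this node's count, add 1, store it."""
    key = f"count-{node_id}"  # assumed key naming scheme
    cache.put(key, cache.get(key) + 1)


def check_and_reset(cache, node_ids, expected):
    """Flow 2: sum the per-node counts; on a match, reset them and report True."""
    total = sum(cache.get(f"count-{n}") for n in node_ids)
    if total != expected:
        return False
    for n in node_ids:
        cache.put(f"count-{n}", 0)
    return True


# Usage: 10 nodes, 50 uploads spread across them.
cache = SharedCache()
node_ids = list(range(10))

for i in range(49):
    record_upload(cache, node_ids[i % 10])
early = check_and_reset(cache, node_ids, 50)   # only 49 counted so far

record_upload(cache, 9)                        # the 50th upload
done = check_and_reset(cache, node_ids, 50)    # sum now matches, counters reset
remaining = sum(cache.get(f"count-{n}") for n in node_ids)

print(early, done, remaining)
```

In the real flow the "notify" step would be the PutEmail processor; here it is reduced to the boolean returned by `check_and_reset`.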

---

Thanks,

Matt
