NiFi RouteOnAttribute Processor: How to know when all sub-processes are done?

Suppose I have a SplitJson processor that splits based on Customer ID, and that eventually flows to a RouteOnAttribute processor, which routes to 3 different processors "A", "B", and "C" based on the attributes "IsA", "IsB", and "IsC".

And suppose these are all true for a given Customer ID.

Is there a processor that I can connect them all to that will not begin until A, B, and C are done for a given customer?

In other words, for customer X, I want to run processors A, B, and C simultaneously, and when they are all done, I want to run processor D.

Can someone point me in the right direction there?

1 ACCEPTED SOLUTION

Hi,

This sounds like a fairly specific use case, so I would try something custom with InvokeScriptedProcessor. For example, if you know that you always have to wait for three flow files, and that only one "batch execution" is in flight at a time, you could connect A to D, B to D, and C to D, with D being an InvokeScriptedProcessor that runs code like:

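def flowFiles = session.get(3)
if (flowFiles.size() != 3) {
    // Not all three have arrived yet: put them back on their queues and retry on the next run.
    session.rollback()
    return
}

// my code here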

This way D waits until all three flow files have arrived; once it has them, you can do whatever you want with them.

Otherwise it may even be simpler to go for a custom processor.
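If you do go the custom-processor route, the main thing to get right is the bookkeeping: remembering, per customer, which of A, B, and C have already finished. Here is a rough sketch of just that part in plain Java, with no NiFi API involved. The class name `JoinTracker` and the method `arrive` are made up for illustration, and it assumes each flow file can tell you its customer ID and which branch it came from (for example through attributes); how you hold or release the actual flow files is left out.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical helper: tracks which branches have finished for each customer. */
final class JoinTracker {
    private final Set<String> requiredBranches;
    private final Map<String, Set<String>> arrived = new HashMap<>();

    JoinTracker(Set<String> requiredBranches) {
        this.requiredBranches = Set.copyOf(requiredBranches);
    }

    /**
     * Records that a branch finished for a customer.
     * Returns true only when this arrival completes the required set;
     * the customer's entry is then cleared so the ID can be seen again later.
     */
    synchronized boolean arrive(String customerId, String branch) {
        if (!requiredBranches.contains(branch)) {
            throw new IllegalArgumentException("Unknown branch: " + branch);
        }
        Set<String> seen = arrived.computeIfAbsent(customerId, id -> new HashSet<>());
        seen.add(branch); // a repeated arrival from the same branch is ignored
        if (seen.containsAll(requiredBranches)) {
            arrived.remove(customerId);
            return true;
        }
        return false;
    }
}
```

With `new JoinTracker(Set.of("A", "B", "C"))`, calling `arrive("X", "A")` and `arrive("X", "B")` returns false, and `arrive("X", "C")` returns true, which is the point where D would run for customer X. Unlike the fixed count of three above, this keeps customers separate, so it does not depend on having only one batch in flight.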

2 REPLIES

Thanks. I'll give this a try!