Archives of Support Questions (Read Only)

This is an archived board for historical reference. Information and links may no longer be available or relevant

NiFi RouteOnAttribute Processor: How to know when all sub-processes are done?

Super Collaborator

Suppose I have a SplitJson processor that splits based on Customer ID, and that eventually flows to a RouteOnAttribute processor, which routes to 3 different processors "A", "B", and "C" based on the attributes "IsA", "IsB", and "IsC".

And suppose these are all true for a given Customer ID.

Is there a processor that I can connect them all to that will not begin until A, B, and C are done for a given customer?

In other words, for customer X, I want to run processors A, B, and C simultaneously, and when they are all done, I want to run processor D.

Can someone point me in the right direction there?

1 ACCEPTED SOLUTION

Hi,

This sounds like a fairly specific use case, so I would try something custom using InvokeScriptedProcessor. For example, if you know that you always have to wait for exactly three flow files, and that only one "batch execution" is ever in progress at a time, you could connect A to D, B to D, and C to D, with D being an InvokeScriptedProcessor, and use code like:

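// Ask the session for up to three flow files (A, B, and C all feed D)
def flowFiles = session.get(3)
// Stop here unless all three have arrived
if (flowFiles.size() != 3) return

// my code here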

This way, D does nothing until all three flow files have arrived, and once they have, you can do whatever you want with them.

Otherwise it may even be simpler to go for a custom processor.
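
If more than one customer can be in progress at the same time, counting to three is not enough on its own, because the three flow files could belong to different customers. The waiting logic would then have to be keyed by customer. Below is a minimal sketch of that bookkeeping in plain Java. It does not use the NiFi API; the class name CustomerGate and the method arrive are hypothetical, and it assumes all three routes (A, B, and C) apply to every customer, as in the question.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical helper: tracks which branches have finished for each customer. */
class CustomerGate {
    private final Set<String> requiredBranches;
    // customer ID -> branches that have reported completion so far
    private final Map<String, Set<String>> finished = new HashMap<>();

    CustomerGate(Set<String> requiredBranches) {
        this.requiredBranches = Set.copyOf(requiredBranches);
    }

    /**
     * Records that a branch finished for a customer.
     * Returns true when every required branch has now reported for that
     * customer; the customer's state is then cleared.
     */
    boolean arrive(String customerId, String branch) {
        if (!requiredBranches.contains(branch)) {
            throw new IllegalArgumentException("Unknown branch: " + branch);
        }
        Set<String> done = finished.computeIfAbsent(customerId, id -> new HashSet<>());
        done.add(branch); // a repeated report from the same branch is ignored
        if (done.containsAll(requiredBranches)) {
            finished.remove(customerId); // reset so the customer can come through again
            return true;
        }
        return false;
    }
}
```

With a gate built for the branches "A", "B", and "C", calling arrive("X", "A") and arrive("X", "B") returns false, and the following arrive("X", "C") returns true, which is the point at which D's work for customer X could start. In a real processor, the customer ID and branch name would presumably come from flow file attributes, and the state would need to survive between runs.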

2 REPLIES

Super Collaborator

Thanks. I'll give this a try!