Is there a WAIT processor in NiFi?

Contributor

Hi, I'm working on creating a flow like the one below.

InvokeHttp -> EvaluateJsonPath -> InvokeHttp -> Log

The first InvokeHttp processor calls a URL that starts a cron job; once the job has started, the server responds with a job ID.

EvaluateJsonPath extracts the job ID from the response.

The second InvokeHttp processor takes the job ID and calls a URL to check whether the cron job has completed.

Because the cron job takes some time to finish, I want a processor that simply waits for a couple of minutes/seconds before passing the FlowFile on to the next processor.

How can I achieve this in NiFi?

1 ACCEPTED SOLUTION

Master Mentor
@Pravin Battula

As a third option you could also build a flow to create the delay you are looking for.

This can be done using the UpdateAttribute and RouteOnAttribute processors.

Here is an example that causes a 5 minute delay to all FlowFiles that pass through these two processors:

[Screenshot: UpdateAttribute and RouteOnAttribute configuration for the 5-minute delay]

The value returned by the now() function is the current epoch time in milliseconds. To add 5 minutes, we add 300,000 milliseconds (5 × 60 × 1000) to the current time and store that as a new attribute on the FlowFile. We then check that new attribute against the current time in the RouteOnAttribute processor. If the current time is not greater than the delayed time, the FlowFile is routed to unmatched, which loops back to RouteOnAttribute. So here the FlowFile would be stuck in this loop for ~5 minutes. You can adjust the run schedule on the RouteOnAttribute processor to the interval at which you want to re-check the file(s). 0 sec is the default, but I would recommend changing that to at least 1 sec.
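To make the timing logic concrete, here is a small Python model of the two-processor loop. It is only a sketch that mimics the decisions the processors make; the attribute name `delayed` and the label `matched` are hypothetical, and the Expression Language strings in the comments are examples of what such properties might look like, not copied from the screenshot.

```python
import time

DELAY_MS = 300_000  # 5 minutes = 5 * 60 * 1000 ms, as in the example above


def now_ms() -> int:
    """Current epoch time in milliseconds (what ${now():toNumber()} yields)."""
    return int(time.time() * 1000)


def update_attribute(flowfile: dict, current_ms: int) -> dict:
    # UpdateAttribute step: store the release time on the FlowFile,
    # e.g. a hypothetical property delayed = ${now():toNumber():plus(300000)}
    stamped = dict(flowfile)
    stamped["delayed"] = current_ms + DELAY_MS
    return stamped


def route_on_attribute(flowfile: dict, current_ms: int) -> str:
    # RouteOnAttribute step: a property such as ${now():toNumber():gt(${delayed})}
    # lets the FlowFile continue ("matched" here); otherwise it goes to
    # "unmatched", which is connected back to RouteOnAttribute.
    return "matched" if current_ms > flowfile["delayed"] else "unmatched"


if __name__ == "__main__":
    t0 = now_ms()
    ff = update_attribute({"job_id": "42"}, t0)
    print(route_on_attribute(ff, t0 + 60_000))   # still inside the 5 minutes
    print(route_on_attribute(ff, t0 + 300_001))  # delay has elapsed
```

Every trip around the unmatched loop re-evaluates the expression, which is why the run schedule on RouteOnAttribute matters: a longer interval means fewer re-checks per FlowFile.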

Thanks,

Matt


Hi, I quickly see two options:

- Schedule your first InvokeHttp with a cron-based definition, and schedule your second InvokeHttp to be cron-based as well, executing X minutes after the first one, so that you are sure to send the request after the cron job has completed.

- If your second InvokeHttp can return three different results (RUNNING, OK, KO), add a RouteOnContent (or RouteOnAttribute, depending on your specs) between the second InvokeHttp and Log to check whether the job is still running: if it is, route back to the second InvokeHttp; otherwise, go on to your Log processor. In this case, you would schedule your second InvokeHttp to run every X seconds, for example.
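The second option is a polling loop. The Python sketch below shows the same control flow outside NiFi: `check_status` is a hypothetical stand-in for the second InvokeHttp call, `interval_s` plays the role of its run schedule, and `max_checks` is an assumed safety limit that the flow described above does not have.

```python
import time
from typing import Callable


def poll_until_done(check_status: Callable[[str], str], job_id: str,
                    interval_s: float = 5.0, max_checks: int = 60) -> str:
    """Re-check a job until it leaves the RUNNING state.

    Each RUNNING result corresponds to routing the FlowFile back to the
    second InvokeHttp; OK or KO corresponds to moving on to Log.
    """
    for _ in range(max_checks):
        status = check_status(job_id)
        if status != "RUNNING":
            return status  # OK or KO: hand off to the Log step
        time.sleep(interval_s)  # wait before the next status request
    raise TimeoutError(f"job {job_id} still RUNNING after {max_checks} checks")


if __name__ == "__main__":
    # Fake status endpoint: RUNNING twice, then OK
    responses = iter(["RUNNING", "RUNNING", "OK"])
    print(poll_until_done(lambda _id: next(responses), "job-1", interval_s=0.1))
```

Bounding the number of checks is a design choice worth considering in the NiFi flow too, for example by counting loop iterations in an attribute, so a job stuck in RUNNING does not circulate forever.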

Hope this helps.

Contributor

This causes processing to slow down when there is a large set of FlowFiles in the queues, and the delays are no longer consistent after that. Is there a better alternative?

New Contributor

I did it with a single processor, using the penalize function.

This is the ExecuteScript code (Python/Jython):

#
# Wait processor for ExecuteScript.
# 1st pass: tag the FlowFile, penalize it and route it to REL_FAILURE
#           (connect failure back to this same processor).
# 2nd pass: the penalty has expired, so remove the tag and route to REL_SUCCESS.
# The wait time is the processor's Penalty Duration setting.
#
flowfile = session.get()
if flowfile is not None:
    waitState = flowfile.getAttribute('_wait_state')

    # No _wait_state set yet: mark the FlowFile and penalize it
    if waitState is None:
        flowfile = session.putAttribute(flowfile, '_wait_state', '1')
        flowfile = session.penalize(flowfile)
        session.transfer(flowfile, REL_FAILURE)
    # This FlowFile has already been penalized once: clear the mark and release it
    else:
        flowfile = session.removeAttribute(flowfile, '_wait_state')
        session.transfer(flowfile, REL_SUCCESS)

This is the demo workflow.

[Screenshot: demo workflow]

All I have to do is set the Penalty Duration (in the processor's Settings tab); this is how long the FlowFile stays parked in the failure queue. In this case, 120 secs.
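One way to picture why this works: a penalized FlowFile sits in the failure connection and is not handed back to the processor until its penalty expires. The Python sketch below is a toy model of that behavior (it is not how NiFi implements its queues); `PenalizingQueue` and `wait_step` are hypothetical names, and `wait_step` mirrors the decision made by the script above.

```python
import heapq
from typing import Optional


class PenalizingQueue:
    """Toy model of a connection in which a penalized FlowFile stays parked
    until its penalty expires. Not NiFi's actual implementation."""

    def __init__(self) -> None:
        self._heap = []  # entries: (eligible_at_s, insertion_order, flowfile)
        self._order = 0

    def put(self, flowfile: dict, now_s: float, penalty_s: float = 0.0) -> None:
        heapq.heappush(self._heap, (now_s + penalty_s, self._order, flowfile))
        self._order += 1

    def poll(self, now_s: float) -> Optional[dict]:
        # Like session.get(): only returns a FlowFile whose penalty is over
        if self._heap and self._heap[0][0] <= now_s:
            return heapq.heappop(self._heap)[2]
        return None


def wait_step(flowfile: dict):
    """Same decision as the script: (flowfile, relationship, penalize?)."""
    if "_wait_state" not in flowfile:
        return dict(flowfile, _wait_state="1"), "failure", True
    released = {k: v for k, v in flowfile.items() if k != "_wait_state"}
    return released, "success", False


if __name__ == "__main__":
    PENALTY_S = 120  # the Penalty Duration used above
    queue = PenalizingQueue()
    ff, _rel, penalize = wait_step({"job_id": "42"})  # first pass
    queue.put(ff, now_s=0, penalty_s=PENALTY_S if penalize else 0)
    print(queue.poll(now_s=60))   # None: still penalized
    ff = queue.poll(now_s=120)    # penalty over, back into the processor
    print(wait_step(ff)[:2])      # ({'job_id': '42'}, 'success')
```

In this model the parked FlowFile is not re-evaluated at all during its penalty, in contrast to the UpdateAttribute/RouteOnAttribute loop, which re-checks every FlowFile on each run.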

[Screenshot: processor settings showing the penalty duration]

Explorer

Love it, thanks @Alessio Palma


Thank you @Alessio Palma, this was the solution we decided to use... pretty straightforward!

New Contributor

Excellent solution. It works perfectly, and the best part is that you can copy and paste it into any flow. Thanks! @ozw1z5rd1