Support Questions

Find answers, ask questions, and share your expertise
Announcements
Celebrating as our community reaches 100,000 members! Thank you!

Is there a WAIT processor in NIFI?

avatar
Contributor

Hi, i'm working on creating a flow like below.

InvokeHttp -> EvaluateJsonPath -> InvokeHttp -> Log

The first InvokeHttp processor will call a url and run a cron job and once the cron gets started the response from server will be a job id.

EvaluateJsonPath will fetch the job id from the response.

Second InvokeHttp processor will take the job id and call the url to check whether the cron job has completed or not.

I want to create a processor just to wait for couple of minutes/seconds before passing on to the next processor as the cron job takes some time to finish.

How can i achieve this in NIFI?

1 ACCEPTED SOLUTION

avatar
Master Mentor
hide-solution

This problem has been solved!

Want to get a detailed solution you have to login/registered on the community

Register/Login
7 REPLIES 7

avatar

Hi, quickly I see two options:

- Schedule your first InvokeHttp with a cron based definition and schedule your second InvokeHttp to be cron based as well and to be executed X minutes after the first one so that you are sure to send the request when the cron job has completed.

- If your second InvokeHttp can return three different results (RUNNING, OK, KO), between your second InvokeHttp and Log, you can do a RouteOnContent (or RouteOnAttribute depending of your specs) to check if the job is still running and if yes go back to your second InvokeHttp, otherwise go to your Log processor. In this case, you would schedule your second InvokeHttp to run every X seconds for example.

Hope this helps.

avatar
Master Mentor
hide-solution

This problem has been solved!

Want to get a detailed solution you have to login/registered on the community

Register/Login

avatar
Contributor

This casues slowdown on processing when we have large set of flowfiles in queues and delay are no more consistent there after. Any other better alternative.

avatar
New Contributor

I did it using a single processor using the penalize function:

This is the executeScript code:

#
# This processor will route the flow into the REL_FAILURE relationship penalizing if for 2h the 1st time
# after 2h the processor will route the flowfile into the success relationship.
#
flowfile = session.get() 
if flowfile is not None: 
        waitState = flowfile.getAttribute( '_wait_state' )

        # this process has no wait_state Set, so set it and penalize it
        if waitState is None: 
                flowfile = session.putAttribute( flowfile, '_wait_state', '1' )
                flowfile = session.penalize(flowfile)
                session.transfer( flowfile, REL_FAILURE )
        # this flowfile has already penalized
        else: 
                flowfile = session.removeAttribute( flowfile, '_wait_state' )
                session.transfer( flowfile, REL_SUCCESS ) 

This is the demo workflow.

12460-waitprocessor1.png

All I have to do is set the penalize value, this is how long the flowfile will stay parked into the failure queue. If this case 120 secs.

12461-screenshot-from-2017-02-14-104810.png

avatar
Explorer

love it, thanks @Alessio Palma

avatar

Thank you @Alessio Palma, this was the solution we decided to use... pretty straight forward!

avatar
New Contributor

Excellent solution, works perfectly and the best part you can copy and paste to any flow. Thanks! @ozw1z5rd1