Output a FlowFile in a Process Group, triggered by another FlowFile

Expert Contributor

I am working on a workflow inside a Process Group. The flow inside the Process Group ends with an update through the SalesForce API, but what the Process Group should output is the original input. I want to send the original FlowFile to the Output Port only after I get a successful response from the processor calling the SalesForce API.

What should I do within the Process Group to get this result?

23397-nifi-canvass.png

1 ACCEPTED SOLUTION

Master Mentor
@J. D. Bacolod

The use case you describe is an exact fit for the "Wait" and "Notify" processors introduced in HDF 3.0/Apache NiFi 1.2.0.

Using these processors would work as follows:

23402-screen-shot-2017-08-03-at-91426-am.png

The input (original FlowFile) is routed to both a Wait processor and your existing flow. The "Response" relationship from your InvokeHTTP processor would route to the corresponding Notify processor.

The copy of the FlowFile that was routed to the Wait processor will continuously loop in the "wait" relationship until a release signal identifier for the FlowFile is written to a DistributedMapCache service by the Notify processor.
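
The hand-off described above can be sketched as a toy model in plain Python. This is only an illustration of the pattern, not NiFi's API: the dict stands in for the DistributedMapCache service, and the function names and the signal identifier are hypothetical.

```python
# Toy model of the Wait/Notify pattern; plain Python, not NiFi's API.
# The dict stands in for the DistributedMapCache service.

def notify(cache, signal_id):
    """Record a release signal, as Notify would on the "Response" path."""
    cache[signal_id] = cache.get(signal_id, 0) + 1


def wait(cache, signal_id):
    """Return the relationship a waiting FlowFile would be routed to."""
    return "success" if cache.get(signal_id, 0) > 0 else "wait"


cache = {}
signal_id = "flowfile-123"  # hypothetical release signal identifier

before = wait(cache, signal_id)  # no signal yet, so the copy keeps looping
notify(cache, signal_id)         # the API call returned a response
after = wait(cache, signal_id)   # signal found, so the original is released
```

Both paths have to agree on the same identifier, which is why the signal is derived from something the original FlowFile and its copy share.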

Thanks,

Matt

3 REPLIES

Expert Contributor

Thanks for your answer. We are running a NiFi version older than 1.2.0. Is there a workaround we can use until we upgrade our cluster?

Master Mentor

@J. D. Bacolod

Those processors were added for specific use cases such as yours.

You can accomplish almost the same thing using the PutDistributedMapCache and FetchDistributedMapCache processors along with an UpdateAttribute processor.

23426-screen-shot-2017-08-04-at-101513-am.png

I used the UpdateAttribute processor to set a unique value in a new attribute named "release-value". In my case, the value I assigned was:

The FetchDistributedMapCache processor then acts as the Wait processor did, looping the FlowFile in the "not-found" relationship until the corresponding value is found in the cache.

The "release-value" is written to the cache by the PutDistributedMapCache processor on the other path, after the InvokeHTTP processor. It receives the "Response" relationship from InvokeHTTP.
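
As a rough sketch of how the three processors cooperate, the following plain-Python model may help. It is not NiFi code: the dict stands in for the cache service, the function names simply mirror the processor names, the UUID value and the sample filename are assumptions, and it assumes "release-value" is set before the flow forks so that both copies carry the same key.

```python
import uuid

# Plain-Python stand-ins for the three processors; not NiFi's API.

def update_attribute(flowfile):
    """Tag the FlowFile with a unique "release-value" (a UUID is an assumption)."""
    return {**flowfile, "release-value": str(uuid.uuid4())}


def put_distributed_map_cache(cache, flowfile):
    """Write the release value to the cache once the response has arrived."""
    cache[flowfile["release-value"]] = "released"


def fetch_distributed_map_cache(cache, flowfile):
    """Return the relationship the held FlowFile would be routed to."""
    return "success" if flowfile["release-value"] in cache else "not-found"


cache = {}
original = update_attribute({"filename": "order.json"})  # hypothetical input
held_copy = dict(original)  # loops on "not-found"
api_copy = dict(original)   # goes through InvokeHTTP

first_try = fetch_distributed_map_cache(cache, held_copy)
put_distributed_map_cache(cache, api_copy)
second_try = fetch_distributed_map_cache(cache, held_copy)
```

The held copy only leaves the loop once the other path has written the matching key.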

Keep in mind that the FetchDistributedMapCache processor does not have an "expired" relationship. If a response is never received for some FlowFile, or the cache has expired/evicted the needed value, those FlowFiles will loop forever. You can solve this in two ways:

1. Set FlowFile Expiration on the connection containing the "not-found" relationship. This purges FlowFiles that have not found a matching key value in the cache by the time the FlowFile's age has reached x value. With this option, aged data is simply lost.

2. Build a FlowFile expire loop which kicks these looping not-found FlowFiles out of the loop after x amount of time so they can be handled by other processors. This can be done using the "Advanced" UI of an UpdateAttribute processor and a RouteOnAttribute processor:

23433-screen-shot-2017-08-04-at-111003-am.png

The UpdateAttribute processor sets a new attribute I called "initial-date" if and only if it has not already been set on the FlowFile. This can be done as follows using the "Advanced" UI of the UpdateAttribute processor:

23435-screen-shot-2017-08-04-at-111249-am.png

The RouteOnAttribute processor then compares the current date with that attribute's value to see whether the FlowFile has been looping for more than x milliseconds. Using 6 minutes (360000 ms) as an example, my RouteOnAttribute would have a property/routing rule like this:

23434-screen-shot-2017-08-04-at-111132-am.png

FlowFiles that have been looping for 360000 milliseconds or more will then get routed to the "expired" relationship, where you can choose what you want to do with them.
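
The expire loop boils down to a timestamp that is set once and an age comparison. The sketch below models that logic in plain Python rather than NiFi Expression Language, so it does not reproduce the screenshots; the function names and the "loop" label for FlowFiles that are not yet expired are assumptions made for illustration.

```python
EXPIRE_AFTER_MS = 360000  # 6 minutes, as in the example above


def set_initial_date(attributes, now_ms):
    """Set "initial-date" only if the FlowFile does not already carry it."""
    if "initial-date" in attributes:
        return attributes
    return {**attributes, "initial-date": now_ms}


def route(attributes, now_ms):
    """Route to "expired" once the FlowFile has looped long enough."""
    age_ms = now_ms - attributes["initial-date"]
    return "expired" if age_ms >= EXPIRE_AFTER_MS else "loop"


# First pass through the loop stamps the FlowFile; later passes keep the stamp.
attrs = set_initial_date({}, now_ms=1000)
attrs_again = set_initial_date(attrs, now_ms=5000)

still_looping = route(attrs, now_ms=1000 + 359999)
kicked_out = route(attrs, now_ms=1000 + 360000)
```

Keeping the first timestamp is what makes the age meaningful; if the attribute were overwritten on every pass, the FlowFile would never expire.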

As you can see, the Wait and Notify processors wrap the above flow up in only two processors, versus the five processors you would need in older versions to get the same functionality.

Thanks,

Matt