Merge Content processors with dependency from route attribute

Expert Contributor

Hello,

Is it possible to trigger the MergeContent processor from a RouteOnAttribute match?

For example, I have one flow that constantly pulls in data from a SQL Server, merges the files, and puts them into HDFS. The merge condition is 5 minutes or 100 MB.

ExecuteSQL -> MergeContent -> PutHDFS

Additionally, I have another flow that loops through a list and, once it reaches a certain number, updates a file. This is done through RouteOnAttribute.

ExecuteSQL (same one as above) -> RouteOnAttribute -> IncrementCounter, or UpdateFile if the condition is met

Here's a screenshot for better understanding of flow:

5612-screen-shot-2016-07-07-at-22833-pm.png

In the bottom corner you'll see that the output of UpdateAttribute is merged and put into HDFS. At the same time, it also goes to RouteOnAttribute.

What I'm looking to do is have MergeContent and PutHDFS execute when RouteOnAttribute is matched. I can't place MergeContent after the "Matched" relationship of RouteOnAttribute, since I need to store the results of each query execution.

My temporary solution is to set the maximum number of files in MergeContent equal to the RouteOnAttribute "Matched" count, but I'm looking to see if there are other ways to do this.

The reason I'm doing this is that I want to ensure all the files are in HDFS before moving on to the next cycle (RouteOnAttribute - Matched).

Thanks,

1 ACCEPTED SOLUTION

Super Mentor

@mliem NiFi components (processors, RPGs, input/output ports, etc.) are designed to run asynchronously. There is no mechanism built into NiFi for triggering one processor to run as a result of another processor completing its job. That being said, everything you can do via the UI can also be done through calls directly to the NiFi API. You may consider using the InvokeHTTP processor to make calls to the NiFi API to start and stop specific processors at specific points in your dataflow.

Once a processor is started, it runs by retrieving a thread from the controller. Stopping that processor will not kill that thread; the processor will simply not be scheduled to run again and will remain in a "stopping" state until the thread completes. You cannot start a processor that is still "stopping", so be careful where you invoke your start and stop actions. (For example, following your "matched" criteria you start the MergeContent, and after the MergeContent you invoke the stop of the MergeContent.)
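To make the start/stop idea concrete, here is a minimal Python sketch of the kind of request InvokeHTTP would need to send. It rests on assumptions that are not from this thread: the `/processors/{id}/run-status` endpoint and the `revision`/`state` body are taken from the NiFi 1.x REST API (older 0.x releases used different paths), and the entity field names checked in `is_safe_to_start` (`component.state`, `status.aggregateSnapshot.activeThreadCount`) should be verified against your NiFi version. The function names are hypothetical.

```python
import json
import urllib.request


def build_run_status_request(base_url, processor_id, revision_version, state):
    """Build (but do not send) a PUT request that changes a processor's run state.

    Assumption: NiFi 1.x style endpoint /processors/{id}/run-status.
    """
    if state not in ("RUNNING", "STOPPED"):
        raise ValueError("state must be 'RUNNING' or 'STOPPED'")
    # NiFi rejects updates whose revision version does not match the
    # current one, so it must be read from the processor entity first.
    body = {"revision": {"version": revision_version}, "state": state}
    url = f"{base_url.rstrip('/')}/processors/{processor_id}/run-status"
    return urllib.request.Request(
        url,
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="PUT",
    )


def is_safe_to_start(processor_entity):
    """Return True only if the processor is stopped AND has no active threads.

    A processor that still holds a thread is in the "stopping" condition
    described above and cannot be started yet. Field names are assumed
    from the NiFi 1.x processor entity.
    """
    component = processor_entity.get("component", {})
    snapshot = processor_entity.get("status", {}).get("aggregateSnapshot", {})
    return (
        component.get("state") == "STOPPED"
        and snapshot.get("activeThreadCount", 0) == 0
    )
```

Sending the request is then a matter of passing it to `urllib.request.urlopen`, or of configuring InvokeHTTP with the same method, URL, and JSON body. Checking for active threads before issuing a start is one way to avoid hitting a processor that is still "stopping".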

For speed and efficiency's sake, I would look for ways to keep your flow asynchronous in design.

If you do choose to go this route, I would also build some monitoring into your flow using the MonitorActivity processor. This processor can be used to monitor that data continues to flow based upon some configured threshold. If that threshold is exceeded, it generates a FlowFile that can be routed to a PutEmail processor (as an example) to alert someone that the dataflow is down. This is a safety net, so to speak, in the event one of your API calls fails for some reason (a network hiccup, for example).
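The threshold behavior described above can be illustrated with a small Python sketch. This is not MonitorActivity's implementation, only a model of the idea under stated assumptions: the class name `InactivityMonitor`, its methods, and the alert-once behavior are all hypothetical choices made for illustration.

```python
import time


class InactivityMonitor:
    """Toy model of an inactivity threshold (hypothetical, not NiFi code)."""

    def __init__(self, threshold_seconds, clock=time.monotonic):
        self.threshold_seconds = threshold_seconds
        self._clock = clock            # injectable so the logic is testable
        self._last_seen = clock()      # time of the most recent activity
        self._alerted = False          # whether an alert was already raised

    def record_activity(self):
        """Call whenever data passes through; this also re-arms the alert."""
        self._last_seen = self._clock()
        self._alerted = False

    def check(self):
        """Return True once each time the idle period exceeds the threshold."""
        idle = self._clock() - self._last_seen
        if idle > self.threshold_seconds and not self._alerted:
            self._alerted = True
            return True
        return False
```

In the flow itself, a `True` result corresponds to the FlowFile that would be routed on to the alerting step; here it would simply be the point at which to send the notification.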

Thanks,

Matt


4 REPLIES

Super Mentor

It may be helpful to understand your dataflow better if you can paste a screenshot of the second dataflow you want to alter.

Expert Contributor

@mclark Thanks, added.

Expert Contributor

@mclark Great suggestion, thanks! I will definitely take a look at incorporating InvokeHTTP.