Process Group variables per node in cluster mode

Hi,

I'm designing a system that does some work triggered by HTTP requests.

Requests are distributed to the different nodes via a Remote Process Group, and each node runs the "FetchAndSaveFromHDFS" process group.

I have two questions:

1. What is the best way to define dynamic configuration per process group, per node? Each HTTP request contains some configuration in addition to the data, and each process group should run according to this configuration. (I know I can add the configuration to the flowfile as an attribute, but is there a better way?)

2. How can I guarantee that the process group handles only one flowfile at a time (i.e., it does not take a flowfile from the input queue until all processors in the group have finished)?

Thanks

2 replies

@rom weinstein

1. You can use Process Group variables, introduced in NiFi 1.4, instead of adding flowfile attributes, and reference the defined variables in your flow.
Configuration cannot be made dynamic per node, but it can be per process group, by using Process Group variables.

If you define variables on a particular process group (like FetchAndSaveFromHDFS), their scope is that process group.

Please refer to this link for more details regarding process group variables.
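As a rough illustration of the scoping, the Python sketch below models how a `${...}` reference could be resolved, assuming a simplified form of the precedence described in the NiFi Expression Language Guide: a flowfile attribute with the same name wins over a Process Group variable, and a variable on an inner (child) group wins over one on its parent. The group names, variable names and the `resolve` helper are hypothetical, and the sketch leaves out the further fallbacks NiFi checks (file-based variable registry, JVM system properties, environment variables). Because the flow definition is the same on every node of a cluster, each group's variables have a single, cluster-wide value in this model.

```python
from collections import ChainMap

# Hypothetical variables defined on two nested Process Groups.
# In a cluster the flow (and therefore these variables) is identical on every node.
root_pg_vars = {"hdfs.dir": "/data/default", "batch.size": "100"}
fetch_pg_vars = {"hdfs.dir": "/data/fetch"}  # e.g. set on FetchAndSaveFromHDFS


def resolve(name, flowfile_attrs, *pg_chain):
    """Resolve ${name} in a simplified model: flowfile attributes first,
    then the innermost Process Group's variables, then each enclosing group.
    pg_chain is ordered innermost -> outermost. Returns None if not found."""
    lookup = ChainMap(flowfile_attrs, *pg_chain)
    return lookup.get(name)


# A request that carries its own configuration as a flowfile attribute.
ff = {"hdfs.dir": "/data/request-42"}

print(resolve("hdfs.dir", ff, fetch_pg_vars, root_pg_vars))    # /data/request-42
print(resolve("hdfs.dir", {}, fetch_pg_vars, root_pg_vars))    # /data/fetch
print(resolve("batch.size", {}, fetch_pg_vars, root_pg_vars))  # 100
```

Under this model, Process Group variables give one value per group, while configuration that changes per request still has to travel as flowfile attributes, which would override a same-named variable.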

2. You can use the ControlRate processor to release one flowfile per fixed amount of time (for example, one per minute), but it does not check whether the previous flowfile has finished processing; it simply releases a flowfile every minute. Alternatively, take a look at the Wait and Notify processors: they let you hold flowfiles until processing is done and only then release the next one.

This link demonstrates how to use Wait and Notify processors.
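The difference between the two options can be shown with a small worked example. This Python sketch models timing only, not NiFi itself, and assumes (hypothetically) that every flowfile takes 90 seconds to process while a ControlRate-style schedule releases one flowfile every 60 seconds. It counts the largest number of flowfiles inside the group at the same moment.

```python
def time_based_starts(n, interval):
    """ControlRate-style: release one flowfile every `interval` seconds,
    regardless of whether the previous one has finished."""
    return [i * interval for i in range(n)]


def completion_gated_starts(durations):
    """Wait/Notify-style: the next flowfile starts only when the previous one finishes."""
    starts, t = [], 0
    for d in durations:
        starts.append(t)
        t += d
    return starts


def max_in_flight(starts, durations):
    """Largest number of flowfiles being processed at the same instant."""
    events = []
    for s, d in zip(starts, durations):
        events.append((s, 1))       # processing starts
        events.append((s + d, -1))  # processing ends
    # At equal timestamps, ends (-1) sort before starts (+1),
    # so back-to-back flowfiles do not count as overlapping.
    events.sort()
    current = peak = 0
    for _, delta in events:
        current += delta
        peak = max(peak, current)
    return peak


durations = [90, 90, 90]  # hypothetical processing time per flowfile, in seconds
print(max_in_flight(time_based_starts(3, 60), durations))      # 2
print(max_in_flight(completion_gated_starts(durations), durations))  # 1
```

Under these assumptions the time-based schedule has two flowfiles in the group at once, while the completion-gated schedule never exceeds one, which is the guarantee question 2 asks for.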

@rom weinstein

2. Let's take the second question first! What you are looking for needs a workaround. We can think of your requirement, processing one flowfile at a time, as an iteration of a loop: control starts at the first line, and the next record is not picked up until all the lines inside the loop have been processed. So we need to plan the NiFi flow accordingly. In this implementation, the three major parts are:

  • Wait - Waits for a signal to release the flowfile
  • Notify - Notifies the flow to take the next step
  • DistributedMapCache - Stores the "cache" through which the Wait and Notify processors communicate

Have a look at the snapshot below. This is roughly how your flow will look.

[Screenshot: the Wait/Notify template flow, with the "Processing data" process group]

The process group named "Processing data" holds your logic; the rest is the template. Here is a quick overview of the logic:

  • UpdateAttribute assigns each flowfile an attribute holding a number that is incremented by 1 for every flowfile that passes through it, so every flowfile is uniquely identified.
  • All flowfiles then go to Wait, where they stay until they are summoned.
  • Initially, we send a flowfile marked with token 1 to the Notify processor. Notify, in turn, populates the map cache with that value under a key called "tokenCounter".
  • The Wait processor keeps reading the value of "tokenCounter" from the map cache and releases the flowfile whose token equals that value.
  • Once a flowfile has been processed, we increment its token value by 1 and send it to Notify. Notify repeats the third step, updating the value of "tokenCounter", which in turn triggers the Wait processor to release the flowfile with that token.
  • Rinse and repeat.
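The token loop described above can be modeled in a few lines of Python. This is a toy, single-threaded sketch that follows the description in this answer rather than NiFi's actual Wait/Notify implementation: a plain dict stands in for the DistributedMapCache, `run_token_flow` and `process` are hypothetical names (`process` stands in for the "Processing data" group), and the flowfiles parked in Wait are deliberately stored in reverse order to show that release is driven by the token, not by queue order.

```python
def run_token_flow(payloads, process):
    """Toy model of the Wait/Notify token loop.

    payloads: incoming flowfile contents, in arrival order.
    process:  hypothetical stand-in for the "Processing data" group.
    Returns the results in the order the flowfiles were processed.
    """
    cache = {}  # stand-in for the DistributedMapCache

    # UpdateAttribute: tag each flowfile with an incrementing token (1, 2, 3, ...).
    waiting = [{"token": t, "payload": p} for t, p in enumerate(payloads, start=1)]
    # Flowfiles parked in Wait; reversed to show queue order does not matter.
    waiting.reverse()

    # Initial Notify: seed the cache so the flowfile with token 1 is released first.
    cache["tokenCounter"] = 1

    processed = []
    while waiting:
        # Wait: release only the flowfile whose token equals tokenCounter.
        wanted = cache["tokenCounter"]
        ff = next(f for f in waiting if f["token"] == wanted)
        waiting.remove(ff)

        # The whole group runs for this single flowfile before anything else is released.
        processed.append(process(ff["payload"]))

        # Increment the token and Notify: update the cache so Wait releases the next one.
        cache["tokenCounter"] = ff["token"] + 1
    return processed


print(run_token_flow(["a", "b", "c"], str.upper))  # ['A', 'B', 'C']
```

Because only the flowfile matching "tokenCounter" can leave Wait, and the counter only moves after processing finishes, at most one flowfile is inside the group at any time in this model.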

I am attaching the template for your reference. Let me know if that helps!

1. Now the first part! What exactly does "per process group per Node" mean? Do you want the same variables to have different values on different NiFi nodes? I am not sure that different nodes can have different values for the same variables. If some variables are used by multiple processors/controller services, you can define them as Process Group variables. A clearer description of your requirement would help in answering this part better.

Let me know if the answer helps to fix your issue!
