Interesting NiFi ExecuteStreamCommand issue (2 processors run concurrently using 1 thread)?

I have an interesting issue with NiFi concurrency. I have a dataflow with two connected ExecuteStreamCommand processors (each with Concurrent Tasks = 1), but when I set the number of threads for the whole instance to 1, the two processors still somehow run concurrently, although I expect them to run sequentially. Each processor calls Python code as part of its execution. The first processor takes 2.5 seconds per flowfile on average and the second takes 4.5 seconds per flowfile on average. I gave the dataflow 100 flowfiles and expected them all to finish in around 700 seconds (i.e., sequential execution: 100 × (2.5 + 4.5) seconds), but they finish in 480 seconds, which suggests that each processor is using a separate thread and that they do not wait on each other. Am I missing something here?

Note: The command that I run in the ExecuteStreamCommand processor is a call to Python code that busy-waits. For example, the following snippet busy-waits for 2.5 seconds:

import time

sleep_time = 2.5  # seconds to keep the CPU busy
start_time = time.time()
# Busy-wait: spin until sleep_time seconds have elapsed
while time.time() < start_time + sleep_time:
    pass

3 REPLIES

Master Guru

Correct, each instance of a processor gets at least one thread (up to Max Concurrent Tasks) so they will execute concurrently. The sequential part is just that a flow file will not be sent to the second processor until it has finished processing in the first. But the second processor could be processing the first flow file while the first processor is processing the second flow file (if that makes sense).
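To make the overlap concrete, here is a rough back-of-the-envelope model of the two cases, using the per-flowfile times from the question. It is only a sketch: the function names are made up for illustration, and it ignores NiFi scheduling overhead, queueing, and process start-up cost, so it will not match measured times exactly.

```python
def sequential_time(n, t1, t2):
    """Total time if each flow file passes through both stages before the next one starts."""
    return n * (t1 + t2)


def pipelined_time(n, t1, t2):
    """Total time when each stage has its own thread and flow files are handled in order."""
    finish1 = 0.0  # time at which stage 1 finishes its current flow file
    finish2 = 0.0  # time at which stage 2 finishes its current flow file
    for _ in range(n):
        finish1 += t1                          # stage 1 works back to back
        finish2 = max(finish1, finish2) + t2   # stage 2 needs its input and a free slot
    return finish2


print(sequential_time(100, 2.5, 4.5))  # 700.0
print(pipelined_time(100, 2.5, 4.5))   # 452.5
```

Under these assumptions the pipelined estimate (about 452 seconds) is much closer to the reported 480 seconds than the sequential estimate of 700 seconds, which is consistent with the two processors overlapping.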

If you need pure sequential processing, you may need to put your two commands into a shell script and call it from ExecuteStreamCommand. If you need to process multiple flow files at once, you might try a Jython script in an ExecuteScript processor where you do a session.get(<batch size>), which returns a list of flow files of size somewhere between 0 and your batch size (based on how many were available at the time of the get). Lastly, you might be able to use the Wait/Notify processors to create barrier synchronization.
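As a sketch of the first option, written in Python rather than shell since the commands in question are Python already: a single wrapper that runs the steps one after another, feeding each step's output to the next. This assumes the commands read their input on stdin and write their result to stdout; `run_in_sequence` and the two inline demo commands are hypothetical stand-ins, not anything NiFi provides.

```python
import subprocess
import sys


def run_in_sequence(commands, data):
    """Run each command in turn, passing the output of one as the input of the next."""
    for cmd in commands:
        # check=True raises CalledProcessError if a step exits with a non-zero status
        result = subprocess.run(cmd, input=data, stdout=subprocess.PIPE, check=True)
        data = result.stdout
    return data


# Stand-in commands for demonstration; replace them with the two real commands.
upper = [sys.executable, "-c", "import sys; sys.stdout.write(sys.stdin.read().upper())"]
exclaim = [sys.executable, "-c", "import sys; sys.stdout.write(sys.stdin.read() + '!')"]

print(run_in_sequence([upper, exclaim], b"flow file content"))  # b'FLOW FILE CONTENT!'
```

Called from a single ExecuteStreamCommand processor, a wrapper like this would handle one flow file through both steps before the next flow file starts, at the cost of losing the overlap between the two stages.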

Thanks @Matt Burgess for your response. I observe the behavior you describe when "Maximum Timer Driven Thread Count" is greater than 1. But I explicitly went to the Controller Settings in the global menu and set "Maximum Timer Driven Thread Count" to 1. In this case, I expected each processor to request one thread (Max Concurrent Tasks = 1), but since there is only one thread in the NiFi instance, only one processor could use it at any given time, which would result in something similar to sequential execution. Any insights into the expected behavior when "Maximum Timer Driven Thread Count" = 1?

My main goal is to predict NiFi's execution time when the number of threads is limited relative to the number of concurrent tasks, so that I know how to provision my NiFi cluster and place my processors.

Currently I see the same behavior with thread count = 5 and thread count = 1, so I am not sure how NiFi enforces this parameter. Any insights are much appreciated.

Below is a snapshot of my thread count settings:

48400-screenshot-from-2018-01-11-14-55-57.png

@Matt Burgess

Any insights into the behavior you would expect with "Maximum Timer Driven Thread Count" = 1 and two processors with Max Concurrent Tasks = 1? Any pointers would be greatly appreciated.