Support Questions

Find answers, ask questions, and share your expertise
Announcements
Celebrating as our community reaches 100,000 members! Thank you!

Nifi Custom Processor - Can I load an object once and use it for consecutive flow files ?

avatar

Hi, I am building a Nifi custom processor that does prediction from a machine learning model for every flow file. I need to load the model once in memory when I start the processor and I use it in the onTrigger function for every flow file. Is there an @onStart function that I can override?

1 ACCEPTED SOLUTION

avatar
Contributor

@Tarek

@onScheduled is executed once per run. It is not executed for every flow file. So i think you can use this annotation.

View solution in original post

8 REPLIES 8

avatar
Contributor

Hi Tarek

I think you can make use of init method in your custom processor. you can override this method to load your model in memory

protected void init(final ProcessorInitializationContext context)

avatar

Thanks @ashok.kumar

The problem is that one of the properties of my custom processor is "Model directory" .. so in the init method I will have no access to this property because it is still being initialized. I am considering loading the model in the

@onScheduled method but I do not know if @onScheduled is executed once perflow file or once per multiple flow files


avatar
Contributor

@Tarek

@onScheduled is executed once per run. It is not executed for every flow file. So i think you can use this annotation.

avatar

Thanks @ashok.kumar

I just want to make sure I understand the behavior of onScheduled. Lets say that I have 200 flow files that come in 2 bursts (i.e., 100 flow files per burst) and there is only on thread available to Nifi. Lets assume started the custom processor once before the arrival of the first burst so I assume the model will not be loaded yet. Now when the first burst comes, the onScheduled method will be called and the model is now loaded once for the entire execution of the 100 flowfiles. But then lets assume another processor needs to execute before the second burst comes. At this point I believe that the memory used by the model will be deallocated and then reallocated again when the second burst comes.

Is that the correct behavior of onScheduled method ?

avatar
Contributor

@Tarek

I didn't get the point where you say when second brust comes then memory used by model will get deallocated and reallocated again.

"But then lets assume another processor needs to execute before the second burst comes. At this point I believe that the memory used by the model will be deallocated and then reallocated again when the second burst comes."

Let me put an example,

Say we have GenerateFlowFile processor and i have schedule it run every 10 second. In each run it generates 100 files.

So say it has run for 10 times on an interval of 10s and generated 1000 files. Now here @onScheduled will be called only once at entire time.

Does this help.

avatar

Thanks @ashok.kumar that really clarifies it. Just to make sure I understand. In your example, the function will be called once for the entire 1000 file or it will be called 10 times (i.e., once for each run) ? Is there any difference if my processor's run schedule is 0 seconds ?

I understood from your answer that @OnScheduled runs once for the 1000 flow files but if thats the case then when does the @@onUnScheduled gets called.

Thanks again ashok. This discussion is really helpful for me

avatar
Contributor

@Tarek

It will be called once for entire 1000 file. There is no difference if processor is schedule for 0 second.

@onUnScheduled will get called when we stop the processor.

@stopped gets called when all concurrent task of processor is finished.

avatar

Thanks @ashok.kumar . I accepted the @onScheduled answer