Created 07-10-2018 05:49 PM
Hi, I am building a custom NiFi processor that runs a prediction from a machine learning model for every flow file. I need to load the model into memory once when I start the processor and then use it in the onTrigger method for every flow file. Is there an @onStart method that I can override?
Created 07-13-2018 06:35 AM
Hi Tarek
I think you can make use of the init method in your custom processor. You can override this method to load your model into memory:
protected void init(final ProcessorInitializationContext context)
Created 07-13-2018 01:20 PM
Thanks @ashok.kumar
The problem is that one of the properties of my custom processor is "Model directory", so in the init method I will have no access to this property because the processor is still being initialized. I am considering loading the model in the @OnScheduled method, but I do not know whether @OnScheduled is executed once per flow file or once for multiple flow files.
Created 07-13-2018 01:38 PM
@Tarek
@OnScheduled is executed once each time the processor is started (scheduled to run). It is not executed for every flow file, so I think you can use this annotation.
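To make the "once per start, not once per flow file" point concrete, here is a minimal plain-Java sketch. It deliberately does not use the NiFi API: the class PredictingProcessor, its fields, and the string standing in for the model are all hypothetical. In a real processor the onScheduled method would carry NiFi's @OnScheduled annotation and would read the "Model directory" property from the ProcessContext it receives.

```java
// Plain-Java model of the behavior described above; nothing here is NiFi API.
// All names (PredictingProcessor, modelDirectory, loadCount) are hypothetical.
class PredictingProcessor {
    private String modelDirectory;   // stands in for the "Model directory" property
    private volatile String model;   // stands in for the loaded ML model
    int loadCount = 0;               // how many times the model was loaded

    void setModelDirectory(String dir) {
        this.modelDirectory = dir;
    }

    // Models a method annotated with @OnScheduled: runs once per processor start,
    // at a point where the configured properties are already available.
    void onScheduled() {
        model = "model-from:" + modelDirectory;   // placeholder for the real load
        loadCount++;
    }

    // Models onTrigger: runs for every flow file and only reuses the loaded model.
    String onTrigger(String flowFileContent) {
        if (model == null) {
            throw new IllegalStateException("model not loaded");
        }
        return model + " -> prediction for " + flowFileContent;
    }
}

class OnScheduledDemo {
    public static void main(String[] args) {
        PredictingProcessor p = new PredictingProcessor();
        p.setModelDirectory("/tmp/model");   // hypothetical path
        p.onScheduled();                     // processor is started once
        for (int i = 0; i < 200; i++) {      // two bursts of 100 flow files
            p.onTrigger("flowfile-" + i);
        }
        System.out.println("loads: " + p.loadCount);   // prints "loads: 1"
    }
}
```

The model is loaded in onScheduled rather than in init because, as noted earlier in the thread, the property value is not yet available during init.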
Created 07-13-2018 03:40 PM
Thanks @ashok.kumar
I just want to make sure I understand the behavior of @OnScheduled. Let's say that I have 200 flow files that come in 2 bursts (i.e., 100 flow files per burst) and there is only one thread available to NiFi. Let's assume I started the custom processor once before the arrival of the first burst, so I assume the model will not be loaded yet. Now when the first burst comes, the @OnScheduled method will be called and the model is loaded once for the entire execution of the 100 flow files. But then let's assume another processor needs to execute before the second burst comes. At this point I believe that the memory used by the model will be deallocated and then reallocated again when the second burst comes.
Is that the correct behavior of the @OnScheduled method?
Created 07-13-2018 04:36 PM
@Tarek
I didn't follow the part where you say that the memory used by the model will be deallocated and then reallocated when the second burst comes:
"But then lets assume another processor needs to execute before the second burst comes. At this point I believe that the memory used by the model will be deallocated and then reallocated again when the second burst comes."
Let me give an example.
Say we have a GenerateFlowFile processor and I have scheduled it to run every 10 seconds. In each run it generates 100 files.
So say it has run 10 times at an interval of 10 s and generated 1000 files. Here @OnScheduled will be called only once during that whole time.
Does this help?
Created 07-13-2018 06:40 PM
Thanks @ashok.kumar, that really clarifies it. Just to make sure I understand: in your example, will the method be called once for the entire 1000 files, or will it be called 10 times (i.e., once for each run)? Is there any difference if my processor's run schedule is 0 seconds?
I understood from your answer that @OnScheduled runs once for the 1000 flow files, but if that's the case, then when does @OnUnscheduled get called?
Thanks again ashok. This discussion is really helpful for me
Created 07-14-2018 01:57 AM
@Tarek
It will be called once for the entire 1000 files. There is no difference if the processor's run schedule is 0 seconds.
@OnUnscheduled gets called when we stop the processor.
@OnStopped gets called after the processor has been stopped and all of its concurrent tasks have finished.
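The start/stop sequence above can be sketched in plain Java as follows. This is only a model of the ordering described in this thread, not NiFi code: the class LifecycleModel and its counters are hypothetical, and the method names merely mirror NiFi's @OnScheduled, @OnUnscheduled and @OnStopped annotations. It assumes the model is released in the stopped hook, which is one reasonable choice rather than a requirement.

```java
// Plain-Java model of the start/stop lifecycle; nothing here is NiFi API.
// All names (LifecycleModel, loads, releases, processed) are hypothetical.
class LifecycleModel {
    private Object model;   // stand-in for the loaded ML model
    int loads = 0;          // number of times the model was loaded
    int releases = 0;       // number of times the model was released
    int processed = 0;      // number of flow files handled

    // Models @OnScheduled: the user starts the processor.
    void onScheduled() {
        model = new Object();
        loads++;
    }

    // Models onTrigger: called for each flow file while the processor is running.
    void onTrigger() {
        if (model == null) {
            throw new IllegalStateException("processor not started");
        }
        processed++;
    }

    // Models @OnUnscheduled: the user stops the processor; tasks may still be running.
    void onUnscheduled() {
        // nothing to release yet in this sketch
    }

    // Models @OnStopped: all concurrent tasks have finished, safe to release the model.
    void onStopped() {
        model = null;
        releases++;
    }

    boolean isLoaded() {
        return model != null;
    }
}

class LifecycleDemo {
    public static void main(String[] args) {
        LifecycleModel p = new LifecycleModel();
        p.onScheduled();                          // processor started once
        for (int run = 0; run < 10; run++) {      // ten runs
            for (int i = 0; i < 100; i++) {       // 100 flow files per run
                p.onTrigger();
            }
        }
        p.onUnscheduled();                        // processor stopped
        p.onStopped();
        // prints "1 load, 1 release, 1000 flow files"
        System.out.println(p.loads + " load, " + p.releases + " release, "
                + p.processed + " flow files");
    }
}
```

In this model the loaded object stays in memory between bursts; it is only released when the processor itself is stopped, and a later start loads it again.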
Created 07-17-2018 10:56 PM
Thanks @ashok.kumar. I accepted the @OnScheduled answer.