Created 06-20-2017 05:30 PM
Hi All,
We use the ExecuteScript processor to run some Python code. The Python file used in the processor is just a wrapper that invokes the actual Python process.
My problem is that when I change a file with Python code on the file system, it is not reloaded in NiFi flows until I fully restart the NiFi cluster. I understand that this happens because the classes are loaded into the JVM classloader after first use (since it is actually Jython).
Question: is there a workaround to reload classes after the Python code is changed on the file system, instead of restarting the NiFi cluster?
Thanks!
Created on 06-26-2017 05:05 PM - edited 08-17-2019 08:37 PM
Alright, so I ended up with a simple script and one processor in NiFi. The modules to reload should be provided in the "modules_list" property of the processor (comma-delimited).
Script body:
import sys, json

def class_reloader(modules_to_reload):
    reload_msg = ""
    all_module_names = sys.modules.keys()
    all_module_names.sort()
    for mn in all_module_names:
        m = sys.modules[mn]
        # -- find full match of names with given modules
        if mn in modules_to_reload:
            try:
                reload(m)
                reload_msg = reload_msg + mn + "|"
            except:
                return 1, reload_msg
            continue
        # -- find if mn is submodule of any given one
        for mtr in modules_to_reload:
            if mn.startswith(mtr+'.'):
                try:
                    reload(m)
                    reload_msg = reload_msg + mn + "|"
                    break
                except:
                    return 1, reload_msg
    return 0, reload_msg

#-------------------------------#
flowFile = session.create()
if(flowFile != None):
    modules_prop = modules_list.getValue()
    ml = []
    if modules_prop:
        ml = modules_prop.split(',')
    cr = class_reloader(ml)
    flowFile = session.putAttribute(flowFile, 'class_reload_result', str(cr[0]))
    flowFile = session.putAttribute(flowFile, 'class_reload_found', str(cr[1]))
    session.transfer(flowFile, REL_SUCCESS)
# implicit return at the end
The code can be improved to route to the FAILURE relationship when the method returns a non-zero response code.
It's not a perfect solution, but it will work in most cases. If you have a better one, please share! 🙂
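The FAILURE routing mentioned above could look roughly like the sketch below. The helper name transfer_reload_result is hypothetical; it assumes the (code, message) tuple returned by class_reloader and the session, REL_SUCCESS and REL_FAILURE variables that ExecuteScript binds into the script. I have not run it inside NiFi.

```python
def transfer_reload_result(session, flow_file, result, rel_success, rel_failure):
    """Attach the reload outcome to the flow file and route it by return code.

    `result` is assumed to be the (code, message) tuple from class_reloader.
    Hypothetical helper, not part of the NiFi API.
    """
    code, found = result
    flow_file = session.putAttribute(flow_file, 'class_reload_result', str(code))
    flow_file = session.putAttribute(flow_file, 'class_reload_found', str(found))
    # A non-zero code means at least one reload raised an exception.
    target = rel_success if code == 0 else rel_failure
    session.transfer(flow_file, target)
    return target
```

In the script body, the two putAttribute calls and the final transfer would then be replaced by a single call such as transfer_reload_result(session, flowFile, cr, REL_SUCCESS, REL_FAILURE).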
Created 06-20-2017 05:37 PM
I think you're running into this issue; perhaps try an explicit module unload at the end of your script? That will probably have a performance impact, but if it works, we can file an improvement Jira to look at adding this to the Jython support in NiFi, so that the modules are unloaded if the Module Directory property (or the files it points to) changes.
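For readers wondering what an explicit module unload might look like, here is a minimal sketch in plain Python. It is an assumption about the general technique (dropping entries from sys.modules so the next import re-reads the source), not the linked solution, and unload_modules is a hypothetical name. How it behaves inside NiFi's Jython engine is untested, and any code that already holds a reference to the old module object keeps using the old code.

```python
import sys

def unload_modules(prefixes):
    """Remove the named modules and their submodules from sys.modules.

    Returns the sorted list of module names that were removed.
    Hypothetical helper for illustration only.
    """
    removed = []
    # Copy the keys first because we delete entries while iterating.
    for name in list(sys.modules.keys()):
        for prefix in prefixes:
            # Match the module itself or anything below it ("pkg.sub").
            if name == prefix or name.startswith(prefix + '.'):
                del sys.modules[name]
                removed.append(name)
                break
    return sorted(removed)
```

Called at the end of the wrapper script with the names of your own modules, this would force a fresh import on the next run, at the cost of re-importing every time.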
Created 06-22-2017 07:48 PM
Hi Matt, thanks for your response. The solution from your link won't work for Jython in NiFi (actually, it created a lot of issues and I had to restart the NiFi services). But it gave me some ideas on what I can do and how. Once I complete all the tests, I'll post an answer with recommendations for others.
As for a permanent solution, I think the performance impact of checking whether the files have changed every time the processor is triggered by an incoming flow file would be too big. Instead, the check could be done on "Start" only. But this still won't resolve issues with classes (modules) that have the same name but are deployed under different locations (paths), which makes environment sharing or versioning impossible (dev and QA, for example, or different builds/versions during development). I would suggest having a custom class loader (with its modules defined) at the processor level instead of a global one.