NiFi Execute Script - Reload Classes

Super Collaborator

Hi All,

We use the ExecuteScript processor to run some Python code. The Python file configured in the processor is just a wrapper, which invokes the actual Python process.

My problem is that when I change a file with Python code on the file system, it is not reloaded in the NiFi flows until I fully restart the NiFi cluster. I understand this happens because the classes are loaded into the JVM classloader on first use (since it is actually Jython).
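The caching described here can be reproduced outside NiFi with plain Python. The sketch below is only an illustration: the module name `demo_reload_mod` and the helper names are made up, and it uses a temporary directory rather than a NiFi module directory. It shows that a second `import` is served from `sys.modules`, and that only `reload` re-reads the edited file.

```python
import os
import sys
import tempfile

try:
    reload  # builtin on Python 2 / Jython
except NameError:
    from importlib import reload  # Python 3

# Keep the demo free of stale compiled files
sys.dont_write_bytecode = True


def write_module(directory, value):
    """Write a one-line module (hypothetical name) defining VALUE."""
    path = os.path.join(directory, "demo_reload_mod.py")
    with open(path, "w") as f:
        f.write("VALUE = %r\n" % value)


def demo():
    tmp = tempfile.mkdtemp()
    sys.path.insert(0, tmp)
    try:
        write_module(tmp, "first")
        import demo_reload_mod
        before = demo_reload_mod.VALUE

        # Edit the file on disk, then import again
        write_module(tmp, "second edit")
        import demo_reload_mod  # no re-read: served from sys.modules
        cached = demo_reload_mod.VALUE

        # Only an explicit reload picks up the edit
        reload(demo_reload_mod)
        after = demo_reload_mod.VALUE
        return before, cached, after
    finally:
        sys.path.remove(tmp)
        sys.modules.pop("demo_reload_mod", None)
```

Calling `demo()` returns the value seen after the first import, after the repeated import, and after the reload, in that order.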

Question: is there a workaround to reload the classes after the Python code is changed on the file system, instead of restarting the NiFi cluster?

Thanks!

1 ACCEPTED SOLUTION

Super Collaborator

Alright, so I ended up with a simple script and one processor in NiFi. The modules to reload should be listed in the "modules_list" property of the processor (comma-delimited).

Script body:

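import sys

def class_reloader(modules_to_reload):
    """Reload every loaded module that matches, or is a submodule of, a given name.

    Returns (0, names) on success or (1, names) on the first failed reload,
    where names is a "|"-delimited string of the modules reloaded so far.
    """
    reload_msg = ""
    # Iterate in sorted order, so a package name comes before its submodules
    for mn in sorted(sys.modules.keys()):
        m = sys.modules[mn]
        # sys.modules can hold None placeholders; reload() would fail on them
        if m is None:
            continue
        # -- find full match of names with given modules
        is_match = mn in modules_to_reload
        if not is_match:
            # -- find if mn is submodule of any given one
            for mtr in modules_to_reload:
                if mn.startswith(mtr + '.'):
                    is_match = True
                    break
        if is_match:
            try:
                reload(m)  # builtin in Jython (Python 2)
                reload_msg = reload_msg + mn + "|"
            except:
                # bare except kept on purpose: stop on any error raised by the reload
                return 1, reload_msg

    return 0, reload_msg

#-------------------------------#
# session, REL_SUCCESS and modules_list (the processor property) are provided by ExecuteScript
flowFile = session.create()
if flowFile is not None:
    modules_prop = modules_list.getValue()
    ml = []
    if modules_prop:
        # tolerate spaces around the commas and skip empty entries
        ml = [name.strip() for name in modules_prop.split(',') if name.strip()]
    cr = class_reloader(ml)

    flowFile = session.putAttribute(flowFile, 'class_reload_result', str(cr[0]))
    flowFile = session.putAttribute(flowFile, 'class_reload_found', str(cr[1]))
    # transfer only when a flow file was actually created
    session.transfer(flowFile, REL_SUCCESS)
# implicit return at the end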

The code can be improved to route to the FAILURE relationship when the method returns a non-zero response code.
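One way that improvement might look is sketched below. The helper name `route_reload_result` is hypothetical, and it assumes the script passes in the `session` object and the `REL_SUCCESS` / `REL_FAILURE` relationships that ExecuteScript binds for scripts; it has not been tested inside NiFi.

```python
def route_reload_result(session, flow_file, result, rel_success, rel_failure):
    """Attach the reload result to the flow file and transfer it.

    result is the (code, names) tuple returned by class_reloader.
    Returns the relationship the flow file was sent to.
    """
    code, found = result
    flow_file = session.putAttribute(flow_file, 'class_reload_result', str(code))
    flow_file = session.putAttribute(flow_file, 'class_reload_found', str(found))
    # A non-zero code means at least one reload raised an error
    target = rel_success if code == 0 else rel_failure
    session.transfer(flow_file, target)
    return target
```

In the script body this would replace the two putAttribute calls and the transfer, for example `route_reload_result(session, flowFile, cr, REL_SUCCESS, REL_FAILURE)`.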

It's not a perfect solution, but it will work in most cases. If you have a better one, please share! 🙂

REPLIES

Master Guru

I think you're running into this issue; perhaps try an explicit module unload at the end of your script? That will probably have a performance impact, but if it works, we can file an improvement Jira to add this to the Jython support in NiFi, so that the modules are unloaded when the Module Directory property (or the files it points to) changes.
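For readers unfamiliar with the idea, an explicit unload in plain Python usually means removing entries from `sys.modules` so the next `import` loads the file again. The sketch below is an assumption about what that could look like, not the code from the linked issue; the helper name `unload_modules` is made up. Objects that already hold references to the old module keep using it, and the follow-up reply reports that this approach caused problems under Jython in NiFi.

```python
import sys


def unload_modules(prefixes):
    """Drop matching entries from sys.modules so the next import re-reads them.

    A module matches if its name equals a prefix or is a submodule of it.
    Returns the sorted list of removed module names.
    """
    removed = []
    # Copy the keys first: the dict is modified while looping
    for name in list(sys.modules.keys()):
        for prefix in prefixes:
            if name == prefix or name.startswith(prefix + '.'):
                del sys.modules[name]
                removed.append(name)
                break
    return sorted(removed)
```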

Super Collaborator

Hi Matt, thanks for your response. The solution from your link won't work for Jython in NiFi (it actually created a lot of issues, and I had to reboot the NiFi services). But it gave me some ideas on what I can do and how. Once I complete all the tests, I'll post an answer with recommendations for others.

As for the permanent solution, I think the performance impact would be too big if we checked whether a file has changed every time the processor is triggered by an incoming flow file. Instead, the check could be done on "Start" only. But this still won't resolve issues with classes (modules) that have the same name but are deployed under different locations (paths), which makes environment sharing or versioning impossible (dev and QA, for example, or different builds/versions during the dev stages). I would suggest having a custom class loader (with its modules defined) at the processor level instead of a global one.
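A change check of this kind could be as cheap as comparing file modification times. The sketch below is a plain-Python illustration under that assumption; the helper name `find_changed_files` is hypothetical, and how it would be hooked into the processor's start is not shown.

```python
import os


def find_changed_files(paths, last_seen):
    """Return the paths whose modification time differs from the last check.

    last_seen maps path -> mtime from the previous call and is updated in
    place, so the first call reports every file as changed.
    """
    changed = []
    for path in paths:
        mtime = os.path.getmtime(path)
        if last_seen.get(path) != mtime:
            changed.append(path)
            last_seen[path] = mtime
    return changed
```

Running it once per start keeps the cost off the per-flow-file path, which is the concern raised above.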
