InvokeScriptedProcessor in Python

Contributor

I'm trying to port the "InvokeScriptedProcessor - Hello World!" Groovy example to Python. The Groovy example uses generics in some of its method signatures, for example:

Set<Relationship> getRelationships() {
	return [REL_SUCCESS] as Set
}

@Override
Collection<ValidationResult> validate(ValidationContext context) { return null }

List<PropertyDescriptor> getPropertyDescriptors() { return null }

How can I port this to Python?

I tried returning Java's HashSet:

def getRelationships(self):
    set = HashSet()
    set.add(self.REL_SUCCESS)
    return set

but then I get: 2016-12-30 12:04:10,829 ERROR [NiFi Web Server-139] o.a.n.p.script.InvokeScriptedProcessor InvokeScriptedProcessor[id=4eb16182-0159-1000-dc3a-531dc23b13e6] Unable to get relationships from scripted Processor: java.lang.reflect.UndeclaredThrowableException

I also tried Python's Set (from the sets module):

def getRelationships(self):
    return Set([self.REL_SUCCESS,])

In this case I received 2016-12-30 11:04:50,588 ERROR [Timer-Driven Process Thread-8] o.a.n.p.script.InvokeScriptedProcessor InvokeScriptedProcessor[id=4eb16182-0159-1000-dc3a-531dc23b13e6] Unable to get relationships from scripted Processor: java.lang.ClassCastException: org.python.core.PyObjectDerived cannot be cast to java.util.Set

Here's the complete code so far:

import java.util
from org.apache.nifi.processor import Processor, Relationship
from sets import Set


class PythonProcessor(Processor):
    def __init__(self):
        self.REL_SUCCESS = Relationship.Builder().name("success").description("FlowFiles that were successfully processed").build()
        self.log = None

    def initialize(self, context):
        self.log = context.getLogger()

    def getRelationships(self):
        return Set([self.REL_SUCCESS])

    def validate(self, context):
        return None

    def getPropertyDescriptor(self, name):
        return None

    def getPropertyDescriptors(self):
        return None

    def onPropertyModified(self, descriptor, oldValue, newValue):
        pass

    def getIdentifier(self):
        return None

processor = PythonProcessor()

I would appreciate any help.

1 ACCEPTED SOLUTION

Contributor

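I found the solution. It looks like Jython does not automatically convert a sets.Set instance to java.util.Set (hence the ClassCastException above), whereas org.python.core.PySet can be returned directly. Here's the solution in case someone else encounters the same problem: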

from org.apache.nifi.processor import Processor, Relationship
from org.python.core import PySet


class PythonProcessor(Processor):
    def __init__(self):
        self.REL_SUCCESS = Relationship.Builder().name("success").description("FlowFiles that were successfully processed").build()
        self.REL_FAILURE = Relationship.Builder().name("failure").description("FlowFiles that failed to be processed").build()
        self.REL_UNMATCH = Relationship.Builder().name("unmatch").description("FlowFiles that did not match rules").build()
        self.log = None

    def initialize(self, context):
        self.log = context.getLogger()

    def getRelationships(self):
        # Return a PySet rather than sets.Set so NiFi receives something it can use as a java.util.Set
        return PySet([self.REL_SUCCESS, self.REL_FAILURE, self.REL_UNMATCH])

    def validate(self, context):
        return None

    def getPropertyDescriptor(self, name):
        return None

    def getPropertyDescriptors(self):
        return None

    def onPropertyModified(self, descriptor, oldValue, newValue):
        pass

    def getIdentifier(self):
        return None

processor = PythonProcessor()

2 REPLIES

Master Guru

There are some examples of InvokeScriptedProcessor using Jython in the NiFi unit tests. Those examples use the built-in set() rather than Set(); I don't know whether the two differ in the type they return.
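
For comparison, here is a minimal sketch of the built-in set() variant, assuming Jython hands its built-in set to Java as a java.util.Set (this thread does not confirm that for any particular NiFi version). The try/except ImportError fallback and the _StubBuilder class are hypothetical stand-ins, not NiFi API; they exist only so the snippet can be run outside NiFi, and inside InvokeScriptedProcessor the real import is used. The remaining Processor methods from the accepted solution are left out for brevity.

```python
try:
    # Available only inside NiFi's Jython script engine.
    from org.apache.nifi.processor import Processor, Relationship
except ImportError:
    # Hypothetical stand-ins (NOT NiFi API) so the sketch runs outside NiFi.
    Processor = object

    class _StubBuilder(object):
        """Mimics the chained Relationship.Builder() calls used below."""

        def __init__(self):
            self._name = None
            self._description = None

        def name(self, value):
            self._name = value
            return self

        def description(self, value):
            self._description = value
            return self

        def build(self):
            # A hashable placeholder standing in for a Relationship object.
            return (self._name, self._description)

    class Relationship(object):
        Builder = _StubBuilder


class PythonProcessor(Processor):
    def __init__(self):
        self.REL_SUCCESS = (Relationship.Builder()
                            .name("success")
                            .description("FlowFiles that were successfully processed")
                            .build())

    def getRelationships(self):
        # Built-in set(), not sets.Set and not an explicit PySet import.
        return set([self.REL_SUCCESS])


processor = PythonProcessor()
```

If this variant works in your NiFi version, it avoids importing from org.python.core; if it does not, the PySet approach from the accepted solution remains the confirmed fix.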