Support Questions

InvokeScriptedProcessor in Python

Contributor

I'm trying to port the "InvokeScriptedProcessor - Hello World!" example to Python. The Groovy example uses generics in the return types of some methods, for example:

Set<Relationship> getRelationships() {
	return [REL_SUCCESS] as Set
}

@Override
Collection<ValidationResult> validate(ValidationContext context) { return null }

List<PropertyDescriptor> getPropertyDescriptors() { return null }

How can I port this to Python?

I tried to return Java's HashSet like

def getRelationships(self):
    set = HashSet()
    set.add(self.REL_SUCCESS)
    return set

and then I get:

2016-12-30 12:04:10,829 ERROR [NiFi Web Server-139] o.a.n.p.script.InvokeScriptedProcessor InvokeScriptedProcessor[id=4eb16182-0159-1000-dc3a-531dc23b13e6] Unable to get relationships from scripted Processor: java.lang.reflect.UndeclaredThrowableException

I also tried Python's Set like

def getRelationships(self):
    return Set([self.REL_SUCCESS,])

In this case I received:

2016-12-30 11:04:50,588 ERROR [Timer-Driven Process Thread-8] o.a.n.p.script.InvokeScriptedProcessor InvokeScriptedProcessor[id=4eb16182-0159-1000-dc3a-531dc23b13e6] Unable to get relationships from scripted Processor: java.lang.ClassCastException: org.python.core.PyObjectDerived cannot be cast to java.util.Set

Here's the complete code so far:

import java.util
from org.apache.nifi.processor import Processor, Relationship
from sets import Set


class PythonProcessor(Processor):
    def __init__(self):
        self.REL_SUCCESS = Relationship.Builder().name("success").description("FlowFiles that were successfully processed").build()
        self.log = None

    def initialize(self, context):
        self.log = context.getLogger()

    def getRelationships(self):
        return Set([self.REL_SUCCESS])

    def validate(self, context):
        return None

    def getPropertyDescriptor(self, name):
        return None

    def getPropertyDescriptors(self):
        return None

    def validate(self, context):
        return None

    def onPropertyModified(self, descriptor, oldValue, newValue):
        pass

    def getIdentifier(self):
        return None

processor = PythonProcessor()

I would appreciate any help.

1 ACCEPTED SOLUTION

Contributor

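I found the solution. It looks like Jython does not automatically translate Set (from the sets module) into the corresponding PySet class, so the returned object cannot be cast to java.util.Set. Returning a PySet directly works. Here's the solution in case someone else runs into the same problem: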

from org.apache.nifi.processor import Processor, Relationship
from org.python.core import PySet


class PythonProcessor(Processor):
    def __init__(self):
        self.REL_SUCCESS = Relationship.Builder().name("success").description("FlowFiles that were successfully processed").build()
        self.REL_FAILURE = Relationship.Builder().name("failure").description("FlowFiles that failed to be processed").build()
        self.REL_UNMATCH = Relationship.Builder().name("unmatch").description("FlowFiles that did not match rules").build()
        self.log = None

    def initialize(self, context):
        self.log = context.getLogger()

    def getRelationships(self):
        return PySet([self.REL_SUCCESS, self.REL_FAILURE, self.REL_UNMATCH])

    def validate(self, context):
        return None

    def getPropertyDescriptor(self, name):
        return None

    def getPropertyDescriptors(self):
        return None

    def onPropertyModified(self, descriptor, oldValue, newValue):
        pass

    def getIdentifier(self):
        return None

processor = PythonProcessor()

2 REPLIES

Master Guru

There are some examples of InvokeScriptedProcessor using Jython in the unit tests here. The examples use the built-in set() rather than Set() from the sets module; I don't know whether the return types differ between the two.
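
For anyone comparing the two approaches, here is a minimal sketch of the set() variant. It assumes that Jython's built-in set type is the same PySet class used in the accepted solution, and that this class implements java.util.Set, which would explain why no explicit org.python.core import is needed. The stand-in Processor and Relationship classes in the except branch are hypothetical and are only there so the script can be tried outside NiFi. Inside InvokeScriptedProcessor the real imports are used.

```python
# Sketch only. Under NiFi's Jython engine the real imports succeed; the
# stand-ins below are hypothetical and exist only for trying this elsewhere.
try:
    from org.apache.nifi.processor import Processor, Relationship
except ImportError:
    class Processor(object):
        """Hypothetical stand-in for the NiFi Processor interface."""

    class Relationship(object):
        """Hypothetical stand-in that mimics Relationship.Builder()."""

        def __init__(self, name, description):
            self.name = name
            self.description = description

        class Builder(object):
            def __init__(self):
                self._name = None
                self._description = None

            def name(self, value):
                self._name = value
                return self

            def description(self, value):
                self._description = value
                return self

            def build(self):
                return Relationship(self._name, self._description)


class PythonProcessor(Processor):
    def __init__(self):
        self.REL_SUCCESS = Relationship.Builder().name("success").description("FlowFiles that were successfully processed").build()
        self.log = None

    def initialize(self, context):
        self.log = context.getLogger()

    def getRelationships(self):
        # Built-in set, not sets.Set. Assumption: Jython backs this with PySet.
        return set([self.REL_SUCCESS])

    def validate(self, context):
        return None

    def getPropertyDescriptor(self, name):
        return None

    def getPropertyDescriptors(self):
        return None

    def onPropertyModified(self, descriptor, oldValue, newValue):
        pass

    def getIdentifier(self):
        return None


processor = PythonProcessor()
```

The only functional difference from the accepted solution is the return statement in getRelationships. Set from the sets module is an ordinary Python class, which matches the PyObjectDerived in the ClassCastException above.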