Archives of Support Questions (Read Only)

This is an archived board for historical reference. Information and links may no longer be available or relevant
Announcements
This board is archived and read-only for historical reference. To ask a new question, please post a new topic on the appropriate active board.

InvokeScriptedProcessor in Python

avatar
New Member

I'm trying to port this InvokeScriptedProcessor - Hello World! into Python. The Groovy example uses Generics in some of the methods like

Set<Relationship> getRelationships() {
	return [REL_SUCCESS] as Set
}

@Override
Collection<ValidationResult> validate(ValidationContext context) { return null }

List<PropertyDescriptor> getPropertyDescriptors() { return null }

How can I port this to Python?

I tried to return Java's HashSet like

def getRelationships(self):
    set = HashSet()
    set.add(self.REL_SUCCESS)
    return set

and then I am getting 2016-12-30 12:04:10,829 ERROR [NiFi Web Server-139] o.a.n.p.script.InvokeScriptedProcessor InvokeScriptedProcessor[id=4eb16182-0159-1000-dc3a-531dc23b13e6] Unable to get relationships from scripted Processor: java.lang.reflect.UndeclaredThrowableException

I also tried Python's Set like

def getRelationships(self):
    return Set([self.REL_SUCCESS,])

In this case I received 2016-12-30 11:04:50,588 ERROR [Timer-Driven Process Thread-8] o.a.n.p.script.InvokeScriptedProcessor InvokeScriptedProcessor[id=4eb16182-0159-1000-dc3a-531dc23b13e6] Unable to get relationships from scripted Processor: java.lang.ClassCastException: org.python.core.PyObjectDerived cannot be cast to java.util.Set

Here's a complete code so far:

import java.util
from org.apache.nifi.processor import Processor, Relationship
from sets import Set


class PythonProcessor(Processor):
    def __init__(self):
        self.REL_SUCCESS = Relationship.Builder().name("success").description("FlowFiles that were successfully processed").build()
        self.log = None

    def initialize(self, context):
        self.log = context.getLogger()

    def getRelationships(self):
        return Set([self.REL_SUCCESS])

    def validate(self, context):
        return None

    def getPropertyDescriptor(self, name):
        return None

    def getPropertyDescriptors(self):
        return None

    def validate(self, context):
        return None

    def onPropertyModified(self, descriptor, oldValue, newValue):
        pass

    def getIdentifier(self):
        return None

processor = PythonProcessor()

I will appreciate any help

1 ACCEPTED SOLUTION

avatar
New Member

I found the solution. Looks like Set is not automatically translated to corresponding PySet class in Jython. Here's the solution in case someone else encounters the same:

from org.apache.nifi.processor import Processor, Relationship
from org.python.core import PySet


class PythonProcessor(Processor):
    def __init__(self):
        self.REL_SUCCESS = Relationship.Builder().name("success").description("FlowFiles that were successfully processed").build()
        self.REL_FAILURE = Relationship.Builder().name("failure").description("FlowFiles that failed to be processed").build()
        self.REL_UNMATCH = Relationship.Builder().name("unmatch").description("FlowFiles that did not match rules").build()
        self.log = None

    def initialize(self, context):
        self.log = context.getLogger()

    def getRelationships(self):
        return PySet([self.REL_SUCCESS, self.REL_FAILURE, self.REL_UNMATCH])

    def validate(self, context):
        return None

    def getPropertyDescriptor(self, name):
        return None

    def getPropertyDescriptors(self):
        return None

    def validate(self, context):
        return None

    def onPropertyModified(self, descriptor, oldValue, newValue):
        pass

    def getIdentifier(self):
        return None

processor = PythonProcessor()

View solution in original post

2 REPLIES 2

avatar
New Member

I found the solution. Looks like Set is not automatically translated to corresponding PySet class in Jython. Here's the solution in case someone else encounters the same:

from org.apache.nifi.processor import Processor, Relationship
from org.python.core import PySet


class PythonProcessor(Processor):
    def __init__(self):
        self.REL_SUCCESS = Relationship.Builder().name("success").description("FlowFiles that were successfully processed").build()
        self.REL_FAILURE = Relationship.Builder().name("failure").description("FlowFiles that failed to be processed").build()
        self.REL_UNMATCH = Relationship.Builder().name("unmatch").description("FlowFiles that did not match rules").build()
        self.log = None

    def initialize(self, context):
        self.log = context.getLogger()

    def getRelationships(self):
        return PySet([self.REL_SUCCESS, self.REL_FAILURE, self.REL_UNMATCH])

    def validate(self, context):
        return None

    def getPropertyDescriptor(self, name):
        return None

    def getPropertyDescriptors(self):
        return None

    def validate(self, context):
        return None

    def onPropertyModified(self, descriptor, oldValue, newValue):
        pass

    def getIdentifier(self):
        return None

processor = PythonProcessor()

avatar
Master Guru

There are some examples of InvokedScriptedProcessor using Jython in the unit tests here. The examples use set() (not Set(), I don't know if there's a difference there between the return types or not)