2.0.0-M1 Python extension issues with custom processor and custom relationships

Hi,

I'm having a hard time writing a custom processor with the Python extension. Either this feature is not as mature as I thought, or I'm misunderstanding something. I have run into two issues, and both come down to the same error:

1- I was experimenting with a custom processor based on the RecordTransform class to partition a JSON array. The processor is simple: it takes a JSON array of records, where each record is a JSON object with "name" and "age" fields. The processor then tries to return a RecordTransformResult from the transform method as follows:

 

def transform(self, context, record, schema, attributemap):
    return RecordTransformResult(schema=None, record=record, relationship="success", partition={'name': record['name']})

 

I'm trying to use partition to split the records, passing the partition key just as described in the developer guide:

https://nifi.apache.org/documentation/nifi-2.0.0-M1/html/python-developer-guide.html#record-transfor...

When I place the processor in the extensions folder and run NiFi, I get the following error in nifi-python.log:

 

AttributeError: 'dict' object has no attribute '_get_object_id'

 

After a lot of time researching the issue, I found that py4j expects a Java map rather than a Python dict, and that a conversion has to happen, based on this post:

https://stackoverflow.com/questions/57502975/py4j-serialization-attributeerror-dict-object-has-no-at...
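
For readers wondering where this error comes from: py4j encodes every call argument before sending it to the JVM, and anything that is not a primitive it recognizes is treated as a proxy for a Java object and asked for its _get_object_id(). The sketch below is a simplified, pure-Python stand-in for that step, not py4j's real code. The names JavaProxy, encode_argument and try_encode are made up for illustration.

```python
# Simplified stand-in for py4j's argument encoding (illustrative only).


class JavaProxy:
    """Stand-in for a py4j proxy that points at an object living in the JVM."""

    def __init__(self, object_id):
        self._object_id = object_id

    def _get_object_id(self):
        return self._object_id


def encode_argument(arg):
    """Encode one call argument, roughly the way py4j does."""
    if arg is None:
        return "n"
    if isinstance(arg, bool):
        return "b" + str(arg)
    if isinstance(arg, int):
        return "i" + str(arg)
    if isinstance(arg, str):
        return "s" + arg
    # Anything else is assumed to be a proxy for a Java object.
    return "r" + arg._get_object_id()


def try_encode(arg):
    """Return the encoded argument, or the error text if encoding fails."""
    try:
        return encode_argument(arg)
    except AttributeError as exc:
        return "AttributeError: " + str(exc)


ok = try_encode(JavaProxy("o42"))          # a Java-side map would look like this
failed = try_encode({"name": "alice"})     # a plain Python dict does not
print(ok)
print(failed)
```

A plain dict falls through to the last branch and fails, which is why the dict has to be converted to a Java map (or auto-conversion has to be active on the gateway that makes the call) before it crosses over.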

I was not sure how to access the Java gateway object from the processor, but after looking at the code in the framework folder I found how controller.py instantiates it. I copied the same code, and my transform method became:

 

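    # Module-level imports needed by the method below
    import os
    from py4j.java_gateway import JavaGateway, CallbackServerParameters, GatewayParameters
    from py4j.java_collections import MapConverter

    def transform(self, context, record, schema, attributemap):
        # Connection settings, read the same way the framework's controller.py reads them
        java_port = int(os.getenv('JAVA_PORT'))
        auth_token = os.getenv('AUTH_TOKEN')

        gateway = JavaGateway(
            callback_server_parameters=CallbackServerParameters(port=0, auth_token=auth_token),
            gateway_parameters=GatewayParameters(port=java_port, read_timeout=None, enable_memory_management=True, auth_token=auth_token),
            auto_convert=True)

        # Convert the Python dict into a Java map before handing it to the framework
        input_dict = {'name': record['name']}
        mc_run_map_dict = MapConverter().convert(input_dict, gateway._gateway_client)
        return RecordTransformResult(schema=None, record=record, relationship="success", partition=mc_run_map_dict)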
    

 

The code above worked, and the records were split accordingly. My question: is this the correct way, or am I missing something? I thought I had followed the steps in the developer guide.

2- The other scenario is when I tried to override getRelationships to define my own relationships, again as described in the developer guide:

 

    def getRelationships(self):
        failedrel = Relationship(name="MyFailedRel", description="custom failed rel")
        successrel = Relationship(name="success", description="custom success rel")
        return [failedrel, successrel]

 

First I got an error that the list doesn't have the attribute "_get_object_id", so I tried to convert the list using the method above, but now I get the following error:

 

AttributeError: 'Relationship' object has no attribute '_get_object_id'

 

I'm not sure what to do about this. It could be related to the first scenario.
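
The second error does look like the same root cause one level down. Converting the list only replaces the container: each element is still passed across as a call argument, so every element must itself be a primitive or a Java-side object. The sketch below imitates that in plain Python. It is a simplified stand-in, not py4j's or NiFi's code: JavaProxy, FakeJavaList, encode_argument and convert_list are made-up names, and Relationship here is a local plain Python class standing in for whatever class the processor imported.

```python
# Simplified stand-in for converting a Python list to a Java list (illustrative only).


class JavaProxy:
    """Stand-in for a py4j proxy that points at an object living in the JVM."""

    def __init__(self, object_id):
        self._object_id = object_id

    def _get_object_id(self):
        return self._object_id


def encode_argument(arg):
    """Encode one call argument: primitives pass, everything else must be a proxy."""
    if isinstance(arg, (bool, int, float, str)):
        return repr(arg)
    return "r" + arg._get_object_id()


class FakeJavaList(JavaProxy):
    """Stand-in for a Java list; add() encodes its argument like any other call."""

    def __init__(self):
        super().__init__("o0")
        self.encoded = []

    def add(self, element):
        self.encoded.append(encode_argument(element))


def convert_list(py_list):
    """Copy a Python list into a (fake) Java list, one element at a time."""
    java_list = FakeJavaList()
    for element in py_list:
        java_list.add(element)
    return java_list


class Relationship:
    """Plain Python class with no Java counterpart (hypothetical stand-in)."""

    def __init__(self, name, description):
        self.name = name
        self.description = description


# A list of strings converts without trouble.
names_only = convert_list(["MyFailedRel", "success"])

# A list of plain Python objects fails on the first element.
try:
    convert_list([Relationship("MyFailedRel", "custom failed rel")])
    error_message = None
except AttributeError as exc:
    error_message = str(exc)

print(names_only.encoded)
print(error_message)
```

Under these assumptions, the list conversion succeeds only when the elements are already something the Java side understands, which matches the error moving from the list to the Relationship objects.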

Any suggestions would be really appreciated, as I have been struggling with this for a couple of days.

@MattWho, @cotopaul, @steven-matison, @joseomjr

Thanks

 

1 ACCEPTED SOLUTION

8 REPLIES 8

Hi @SAMSAL - thanks for describing your experience with the new Python stuff. We're more than happy to receive pull requests to improve the documentation around this. It'll be my pleasure to review and merge any submission you'd like to make. Here is the file on GitHub that holds the documentation for the Python stuff:

https://github.com/apache/nifi/blob/main/nifi-docs/src/main/asciidoc/python-developer-guide.adoc

Community Manager

@SAMSAL, we can even create a new Community Article for this, if you prefer.

 



Regards,

Vidya Sargur,
Community Manager


@VidyaSargur & @pvillard,

Thank you both for reading my long post and replying with feedback. I think I will try to do both. If I can provide some input to make the guide more helpful, I would be happy to do so. I will also try to come up with a couple of short articles about the Python extension, based on what I have gone through and learned so far.

Thanks

 

 

 

Super Collaborator

Building on @SAMSAL's discovery, I found that you have access to the JVM in your processor's __init__:

    def __init__(self, jvm, **kwargs):
        super().__init__()
        self.jvm = jvm

This then lets you access the JVM gateway and Java data classes like this:

        jvm_gateway = self.jvm.gateway
        # Create a Java Map directly (named java_map to avoid shadowing Python's built-in map)
        java_map = self.jvm.java.util.HashMap()
        java_map.put("name", record['name'])
        # Or convert the Python dict to a Java Map
        data = {"name": record["name"]}
        data = MapConverter().convert(data, jvm_gateway._gateway_client)
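
The first variant above can be wrapped in a small helper. This is only a sketch: to_java_map is a made-up name, and it assumes the jvm object handed to the processor behaves like a py4j JVM view, so that jvm.java.util.HashMap() creates a Java map. To keep the example runnable without NiFi or a JVM, it is exercised here against a fake stand-in (FakeHashMap and fake_jvm are hypothetical too).

```python
from types import SimpleNamespace


def to_java_map(jvm, data):
    """Copy a Python dict into a new java.util.HashMap created through `jvm`."""
    java_map = jvm.java.util.HashMap()
    for key, value in data.items():
        java_map.put(key, value)
    return java_map


# --- Stand-ins so the sketch runs without NiFi or a JVM (hypothetical) ---
class FakeHashMap:
    """Minimal imitation of java.util.HashMap: only put() is provided."""

    def __init__(self):
        self.entries = {}

    def put(self, key, value):
        self.entries[key] = value


fake_jvm = SimpleNamespace(
    java=SimpleNamespace(util=SimpleNamespace(HashMap=FakeHashMap))
)

record = {"name": "alice", "age": 30}
partition = to_java_map(fake_jvm, {"name": record["name"]})
print(partition.entries)
```

Inside a processor the call would presumably be to_java_map(self.jvm, {...}), with the result passed as the partition argument. Whether self.jvm is actually available depends on the NiFi build, as the replies below discuss.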


Where did you find this code? Or did you add it to the __init__ of the processor itself:

    def __init__(self, jvm, **kwargs):
        super().__init__()
        self.jvm = jvm

I could not get it to compile; it kept saying super has no attribute jvm 😞

Super Collaborator

@SAMSAL are you still on M1, or on M2? I'm on M1; I took what you shared and just made those minor tweaks. I'll test it out more tomorrow. Maybe I'll restart NiFi just to clear memory and any funny stuff that might be there.


Apparently the GitHub repo has a folder of basic example processors. It might not cover everything yet, but it is a good place to start for basic issues.

I had issues building a custom relationship as well, then I saw this example, which helped.

Official Python Processor Examples