Created 02-05-2024 01:18 PM
Hi,
I'm having a hard time writing a custom processor using the Python extension. It seems this feature is not as mature as I thought, or maybe I'm misunderstanding something. To summarize, I have encountered two issues, both related to the same error:
1- I was experimenting with developing a custom processor using the RecordTransform class to partition a JSON array. My processor is simple: it takes a JSON array of records, where each record is a JSON object with "name" and "age" fields. The processor then tries to return a RecordTransformResult from the transform method as follows:
from nifiapi.recordtransform import RecordTransformResult

def transform(self, context, record, schema, attributemap):
    return RecordTransformResult(schema=None, record=record, relationship="success", partition={'name': record['name']})
I'm trying to use partition to split the records, passing the partition key just as described in the developer guide.
When I place the processor in the extensions folder and run NiFi, I get the following error in nifi-python.log:
AttributeError: 'dict' object has no attribute '_get_object_id'
After a lot of time researching the issue, I found a post explaining that py4j expects a Java map rather than a Python dict, so a conversion has to happen.
I was not sure how to access the Java gateway object from the processor, but after looking at the code in the framework folder, I found how it is instantiated in controller.py, so I copied the same code and my transform method became this:
import os
from py4j.java_collections import MapConverter
from py4j.java_gateway import JavaGateway, CallbackServerParameters, GatewayParameters

def transform(self, context, record, schema, attributemap):
    java_port = int(os.getenv('JAVA_PORT'))
    auth_token = os.getenv('AUTH_TOKEN')
    gateway = JavaGateway(
        callback_server_parameters=CallbackServerParameters(port=0, auth_token=auth_token),
        gateway_parameters=GatewayParameters(port=java_port, read_timeout=None, enable_memory_management=True, auth_token=auth_token),
        auto_convert=True)
    input_dict = {'name': record['name']}
    mc_run_map_dict = MapConverter().convert(input_dict, gateway._gateway_client)
    return RecordTransformResult(schema=None, record=record, relationship="success", partition=mc_run_map_dict)
The code above works and the records are split accordingly. My question: is this the correct way, or am I missing something? I thought I had followed the steps in the developer guide.
2- The other scenario is when I tried to override getRelationships to define my own custom relationships, again as described in the developer guide:
from nifiapi.relationship import Relationship

def getRelationships(self):
    failedrel = Relationship(name="MyFailedRel", description="custom failed rel")
    successrel = Relationship(name="success", description="custom success rel")
    return [failedrel, successrel]
First I got the error that list has no attribute "_get_object_id", so I tried to convert the list using the method above, but now I get the following error:
AttributeError: 'Relationship' object has no attribute '_get_object_id'
I'm not sure what to do about this. It could be related to the first scenario.
Any suggestion would be really appreciated, as I have been struggling with this for a couple of days.
@MattWho , @cotopaul , @steven-matison , @joseomjr
Thanks
Created 02-29-2024 04:47 PM
Greetings,
So I have been experimenting with the Python extension (2.0.0-M1) for some time now and have learned quite a lot since I wrote the post above. I think I now know how to resolve those issues, and I will share the solutions with the community. Before doing so, I would kindly ask, if I may, the NiFi leadership and committers to do a better job of documenting NiFi's features and capabilities, especially newly released ones. They have done a great job on some of the documentation, like the general NiFi User Guide, the System Administrator's Guide, Getting Started with NiFi, etc., but others, like the "NiFi Python Developer’s Guide" (https://nifi.apache.org/documentation/nifi-2.0.0-M1/html/python-developer-guide.htm), I find extremely lacking. Anyone who is new to this, and new to Python-to-Java communication, will struggle quite a bit and spend a lot of time figuring out how to implement things based on what the guide describes. I'm hoping this reaches someone who can address this concern @cjervis , @VidyaSargur , @pvillard.
Anyway, enough talk about boring documentation; let's get to the code.
First, if you are planning to use the Python extension, I recommend getting familiar with Java/Python communication, especially the py4j library. It seems everything written in Python is passed to/translated into Java, which makes sense because NiFi is a Java-based application. The Python extension guide referenced above talks briefly about it, but you can find more info here: https://www.py4j.org/
Second, as I mentioned in the post above, both issues seem to have the same cause: they produce the same error even though they address totally different development problems. In short, the issue appears when you are dealing with complex types, such as Python collections in the first case or external classes such as the nifiapi Relationship in the second.
Those complex types get passed to the JVM through py4j, and if they are not objects that py4j knows how to hand over to Java, you end up with this confusing error:
AttributeError: '...' object has no attribute '_get_object_id'
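To see why the error mentions _get_object_id, here is a toy model (not py4j's actual code) of roughly how py4j encodes a value it sends to the JVM: simple values such as None, booleans, numbers and strings are serialized directly, and anything else is assumed to already be a Java object proxy and is sent by reference through its _get_object_id() method. As far as I can tell, the fall-through branch of get_command_part in py4j.protocol behaves this way; the real function handles more types and Python callback proxies, and the names JavaObjectRef and encode_argument are hypothetical.

```python
class JavaObjectRef:
    """Hypothetical stand-in for a py4j Java object proxy (e.g. a java.util.HashMap)."""

    def __init__(self, object_id):
        self._object_id = object_id

    def _get_object_id(self):
        # Real py4j proxies return the id of the object living in the JVM
        return self._object_id


def encode_argument(value):
    """Toy model of how py4j turns a Python value into a protocol command part."""
    if value is None:
        return "null"
    if isinstance(value, bool):  # check bool before int: bool is a subclass of int
        return f"bool:{value}"
    if isinstance(value, (int, float, str)):
        return f"{type(value).__name__}:{value}"
    # Anything else is assumed to be a Java object proxy and is sent by reference.
    # A plain dict, list or custom Python class has no _get_object_id, so this
    # line raises: AttributeError: 'dict' object has no attribute '_get_object_id'
    return "ref:" + value._get_object_id()
```

In this model, a plain dict, list, or nifiapi Relationship falls through to the last line and raises the same AttributeError seen in the logs, while an object created through the gateway (such as a java.util.HashMap) is already a proxy and is passed by reference.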
Once you get familiar with how py4j works, you'll understand that to get the right class types, everything has to go through the Java gateway: either you convert your complex types to Java-compatible ones, or you instantiate them through the gateway. In the first case, with the Python dict type, I managed to resolve it using py4j's MapConverter(), but it felt like a hack: I copied code from the "controller.py" file in the Python framework folder just to get access to the Java gateway from my Python extension processor. It looked like this:
java_port = int(os.getenv('JAVA_PORT'))
auth_token = os.getenv('AUTH_TOKEN')
gateway = JavaGateway(
    callback_server_parameters=CallbackServerParameters(port=0, auth_token=auth_token),
    gateway_parameters=GatewayParameters(port=java_port, read_timeout=None, enable_memory_management=True, auth_token=auth_token),
    auto_convert=True)
....
However, in the second case I got stuck because I did not know how to convert or pass the nifiapi Relationship object. Well, after giving up for days/weeks with no help even from the community, my stubbornness paid off 🙂. It turns out the following lines of code are buried somewhere in the same "controller.py":
# Initialize the JvmHolder class with the gateway jvm.
# This must be done before executing the module to ensure that the nifiapi module
# is able to access the JvmHolder.jvm variable. This enables the nifiapi.properties.StandardValidators, etc. to be used
# However, we have to delay the import until this point, rather than adding it to the top of the ExtensionManager class
# because we need to ensure that we've fetched the appropriate dependencies for the pyenv environment for the extension point.
from nifiapi.__jvm__ import JvmHolder
JvmHolder.jvm = gateway.jvm
JvmHolder.gateway = gateway
Voilà! I raise my hat to whoever wrote the comments above that code, and I wish they could slap those comments into the user guide, as it would have saved me a lot of time.
It seems nifiapi has a class called JvmHolder whose class attributes (JvmHolder.jvm and JvmHolder.gateway) hold the JVM view and the Java gateway. I know it sounds silly, but I'm going to say it: it felt like I had found buried treasure!
That would have simplified the code above in two ways:
1- I could have used the gateway from JvmHolder and done the conversion with MapConverter as follows:
from py4j.java_collections import MapConverter
from nifiapi.__jvm__ import JvmHolder

input_dict = {'name': record['name']}
mc_run_map_dict = MapConverter().convert(input_dict, JvmHolder.gateway._gateway_client)
2- Or, even better: once you have access to the Java gateway, you can create a Java-compatible map directly with gateway.jvm.java.util.HashMap(). Thanks to: https://www.py4j.org/advanced_topics.html#map
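from nifiapi.__jvm__ import JvmHolder

# Create the java.util.HashMap on the JVM side, then fill it from Python
partition_map = JvmHolder.gateway.jvm.java.util.HashMap()
partition_map.put("name", record['name'])
return RecordTransformResult(schema=None, record=record, relationship="success", partition=partition_map)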
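To picture what the partition map is used for: as I understand the developer guide, records whose transform() results carry equal partition values end up in the same output FlowFile. The pure-Python model below only illustrates that grouping for the name/age records from my original question; group_by_partition is a hypothetical helper, and NiFi does the real grouping on the Java side.

```python
def group_by_partition(records, partition_fn):
    """Group records whose partition dicts are equal (simplified model).

    partition_fn plays the role of the partition returned from transform().
    """
    groups = {}
    for record in records:
        partition = partition_fn(record)
        # dicts are unhashable, so use the sorted (key, value) pairs as the key
        key = tuple(sorted(partition.items()))
        groups.setdefault(key, []).append(record)
    return groups


records = [
    {"name": "Ann", "age": 31},
    {"name": "Bob", "age": 25},
    {"name": "Ann", "age": 40},
]
groups = group_by_partition(records, lambda record: {"name": record["name"]})
for key, members in groups.items():
    # in this model, each group stands for one output FlowFile
    print(dict(key), [member["age"] for member in members])
```

This prints two groups: Ann with ages 31 and 40, and Bob with age 25.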
It seems we are making progress without more treasure hunting, but what about the custom Relationship? The Python developer guide promised that we can define our own relationships by implementing getRelationships(self):
https://nifi.apache.org/documentation/nifi-2.0.0-M1/html/python-developer-guide.html#relationships
and that simply returning a list of "nifiapi.relationship.Relationship objects" should magically work, or at least that's what I thought, but it did not! So I went on another treasure hunt and, what do you know, buried in "PythonProcessorAdapter.py" I found the following code:
self.relationships = gateway.jvm.java.util.HashSet()
success = gateway.jvm.org.apache.nifi.processor.Relationship.Builder() \
.name("success") \
.description("All FlowFiles will go to this relationship") \
.build()
self.relationships.add(success)
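Voilà again! Building my relationships the same way, as a java.util.HashSet of Relationship objects created through Relationship.Builder and returned from getRelationships, solved my second issue: I was able to define and return my own custom relationships, at which point I shouted "freeeeeeeeeeeeedoooooooooooooooooooooooooom".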
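Here is a sketch of how the HashSet/Relationship.Builder pattern from PythonProcessorAdapter.py could be wrapped for a custom processor. It assumes jvm is a py4j JVM view such as JvmHolder.jvm; build_java_relationships is a hypothetical helper name, and I have only seen this pattern in the 2.0.0-M1 adapter code, so later milestones may behave differently.

```python
def build_java_relationships(jvm, specs):
    """Build a java.util.HashSet of org.apache.nifi.processor.Relationship objects.

    jvm:   a py4j JVM view, e.g. JvmHolder.jvm (assumption)
    specs: iterable of (name, description) tuples
    """
    relationships = jvm.java.util.HashSet()
    for name, description in specs:
        # Same builder calls as in PythonProcessorAdapter.py
        relationship = (
            jvm.org.apache.nifi.processor.Relationship.Builder()
            .name(name)
            .description(description)
            .build()
        )
        relationships.add(relationship)
    return relationships
```

Inside the processor, getRelationships(self) would then return something like build_java_relationships(JvmHolder.jvm, [("success", "custom success rel"), ("MyFailedRel", "custom failed rel")]).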
I hope that helps someone out there and saves them the headache, the frustration, and whatever hair is left on their head.
If you know of any other buried treasures, please share and don't be stingy. By the way, if you love treasure hunting, I still have an unsolved issue here that I would love to find an answer for: https://community.cloudera.com/t5/Support-Questions/python-extension-generate-multiple-flowfiles-fro...
I hope the community is OK with me accepting my own solution.
Sorry if I went on long or got a bit emotional 😉 it's just been quite the struggle.
Thank you all for reading and keep learning.
S
Created on 03-01-2024 02:30 AM - edited 03-01-2024 02:30 AM
Hi @SAMSAL, thanks for describing your experience with the new Python features. We're more than happy to receive pull requests that improve the documentation around this, and it'll be my pleasure to review and merge any submission you'd like to make. Here is the file on GitHub that contains the documentation for the Python features:
https://github.com/apache/nifi/blob/main/nifi-docs/src/main/asciidoc/python-developer-guide.adoc
Created 03-01-2024 04:03 AM
@SAMSAL, we can even create a new Community Article for this, if you prefer.
Regards,
Vidya Sargur,
Created 03-01-2024 05:27 AM
@VidyaSargur & @pvillard,
Thank you both for reading my long post and replying with feedback. I think I will try to do both. If I can provide some input to make the guide more helpful, I'd be happy to do so. I will also try to write a couple of short articles about the Python extension based on what I have gone through and learned so far.
Thanks
Created 03-05-2024 03:29 PM
Building on @SAMSAL's discovery:
I found that you have access to the JVM in your processor's __init__, through the jvm argument:
def __init__(self, jvm, **kwargs):
    super().__init__()
    self.jvm = jvm
That then lets you reach the gateway client and create Java data classes like this:
from py4j.java_collections import MapConverter

# self.jvm is a py4j JVMView; MapConverter needs its gateway client
gateway_client = self.jvm._gateway_client
# Create a Java Map directly
java_map = self.jvm.java.util.HashMap()
java_map.put("name", record['name'])
# Or convert a Python dict to a Java Map
data = {"name": record["name"]}
data = MapConverter().convert(data, gateway_client)
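The HashMap/put pattern above can be wrapped in a small helper so that any flat Python dict becomes a java.util.HashMap. This is a sketch: to_java_map is a hypothetical name, and jvm is assumed to be a py4j JVM view such as self.jvm from the __init__ above or JvmHolder.jvm.

```python
def to_java_map(jvm, mapping):
    """Copy a flat Python dict into a java.util.HashMap created through py4j.

    jvm: a py4j JVM view, e.g. self.jvm or JvmHolder.jvm (assumption)
    """
    java_map = jvm.java.util.HashMap()
    for key, value in mapping.items():
        # each put() call is forwarded to the HashMap living in the JVM
        java_map.put(key, value)
    return java_map
```

For example, transform() could pass partition=to_java_map(self.jvm, {"name": record["name"]}) to RecordTransformResult.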
Created 03-05-2024 07:09 PM
Where did you find this code, or did you add it to the __init__ of the processor itself?
def __init__(self, jvm, **kwargs):
    super().__init__()
    self.jvm = jvm
I could not get it to work; it kept saying super has no attribute jvm 😞
Created on 03-05-2024 07:13 PM - edited 03-05-2024 07:14 PM
@SAMSAL are you still on M1 or M2? I'm on M1; I took what you shared and just made those minor tweaks. I'll test it out more tomorrow, and maybe I'll restart NiFi just to clear memory and any funny stuff that might be there.
Created 05-28-2024 05:51 AM
Apparently there's a folder of basic processor examples in the GitHub repo. It might not cover everything yet, but it could be a good place to solve basic issues.
I had issues building a custom relationship as well, until I saw this example, which helped.