Member since
07-29-2020
574
Posts
323
Kudos Received
176
Solutions
My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
|  | 2115 | 12-20-2024 05:49 AM |
|  | 2413 | 12-19-2024 08:33 PM |
|  | 2161 | 12-19-2024 06:48 AM |
|  | 1445 | 12-17-2024 12:56 PM |
|  | 2062 | 12-16-2024 04:38 AM |
03-02-2024
06:18 AM
@saquibsk, It seems you are using "Literal Value" as the Replacement Value Strategy of the second UpdateRecord. I think it needs to be "Record Path Value", since you are referencing another field from the flowfile content itself. If that helps, please accept the solution. Thanks
03-01-2024
05:35 AM
1 Kudo
Hi @piyush7829 , Glad it worked and you were able to do the post. If I understood your question correctly, you need to pass the series_id to the post:form:series_id multipart form property dynamically, right? If that is the case then it's easy, since the InvokeHttp multipart dynamic properties allow expression language. You can reference the series_id attribute you extracted with EvaluateJsonPath and assign it as follows: post:form:series_id: ${series_id} Every time InvokeHttp is triggered, it evaluates the series_id expression against the incoming flowfile and assigns its value before making the API invocation. Hope that helps.
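To illustrate the per-flowfile evaluation, here is a toy Python sketch (resolve_el is a made-up helper, not NiFi code) of the substitution InvokeHttp performs on the dynamic property:

```python
import re

def resolve_el(template, attributes):
    # Replace each ${attr} reference with the matching flowfile attribute
    # value, the way NiFi expression language resolves a dynamic property
    # against each incoming flowfile.
    return re.sub(r"\$\{(\w+)\}", lambda m: attributes.get(m.group(1), ""), template)

# post:form:series_id is set to ${series_id}; with an extracted
# attribute series_id=12345 the form value becomes:
print(resolve_el("${series_id}", {"series_id": "12345"}))  # -> 12345
```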
03-01-2024
05:27 AM
2 Kudos
@VidyaSargur & @pvillard , Thank you both for reading my long post and replying with feedback. I think I will try to do both. If I can provide some input to make the Guide more helpful, I would be happy to do so. I will also try to come up with a couple of short articles about the Python extension based on what I have gone through and what I have learned so far. Thanks
02-29-2024
07:02 PM
2 Kudos
hi @piyush7829 , Can you provide more info on what the issue is? Are you getting error messages in the failure or retry relationships? Are you not getting any response at all? Are you sure that your API takes the data as a request body and not as POST form-data key/value pairs, as the processor description indicates? Also, I don't see that you are setting the Request Content-Type property to multipart/form-data. Here are some posts that might help get it working: https://community.cloudera.com/t5/Support-Questions/How-to-send-an-http-POST-multipart-form-data-request-with-a/m-p/240035 https://palindromicity.blogspot.com/2020/04/sending-multipart-form-data-with.html https://stackoverflow.com/questions/51929208/how-to-send-multipart-form-data-via-invokehttp-nifi If you find this helpful, please accept the solution. Thanks
02-29-2024
04:47 PM
3 Kudos
Greetings, So I have been experimenting with the Python extension (2.0.0-M1) for some time now, and I have learned quite a lot since I wrote the post above. I think I now know how to resolve those issues, and I will share the fixes with the community. But before doing so, I just wanted to kindly ask - if I may - the NiFi leadership and the committers out there to do a better job documenting the different features and capabilities of NiFi, especially newly released ones. They have done a great job on some of the documentation, like the general NiFi User Guide, the System Administrator's Guide, getting started with NiFi, etc., but when it comes to others like the "NiFi Python Developer's Guide" (https://nifi.apache.org/documentation/nifi-2.0.0-M1/html/python-developer-guide.html), I find it extremely lacking, and anyone who is new to this and to Python-to-Java communication will struggle quite a bit and spend a lot of time figuring out how things should be implemented from what the guide highlights. I'm hoping this can reach someone who can address this concern @cjervis , @VidyaSargur , @pvillard. Anyway, enough talking about boring documentation; let's get to code. First, if you are planning to use the Python extension, I recommend you get familiar with Java/Python communication, especially the py4j library. It seems everything written in Python gets passed/translated to Java, which makes sense because NiFi is a Java-based application. The Python extension guide referenced above talks briefly about it, but you can find more info here: https://www.py4j.org/ Second, as I mentioned in the post above, both issues seem to share the same cause, since both produce the same error even though they address totally different development problems.
To summarize the issue: when you are dealing with complex types, like Python collections in the first case or external classes like the nifiapi Relationship (nifiapi.relationship) in the second, those complex types get passed to the JVM through py4j, and if they don't adhere to the Java gateway interface you end up with this confusing error:

AttributeError: '...' object has no attribute '_get_object_id'

If you get familiar with how py4j works, you understand that in order to have the right class types, everything has to go through/from the java_gateway(): either you convert your complex types to Java-compatible ones, or you instantiate them through the gateway. In the first case, with the Python dict type, I managed to resolve it using the java_gateway MapConverter(), but it felt like a hack, copying some code from the "controller.py" file in the Python framework folder just to get access to the java_gateway from my Python extension processor. It looked like this:

```python
import os
from py4j.java_gateway import JavaGateway, CallbackServerParameters, GatewayParameters

java_port = int(os.getenv('JAVA_PORT'))
auth_token = os.getenv('AUTH_TOKEN')
gateway = JavaGateway(
    callback_server_parameters=CallbackServerParameters(port=0, auth_token=auth_token),
    gateway_parameters=GatewayParameters(port=java_port, read_timeout=None,
                                         enable_memory_management=True, auth_token=auth_token),
    auto_convert=True)
# ...
```

However, in the second case I got stuck, because I did not know how to convert or pass the nifiapi Relationship object. Well, after giving up for days/weeks where there was no help even from the community, my stubbornness paid off 🙂. It turns out the following code is buried somewhere in the same "Controller.py":

```python
# Initialize the JvmHolder class with the gateway jvm.
# This must be done before executing the module to ensure that the nifiapi module
# is able to access the JvmHolder.jvm variable. This enables the
# nifiapi.properties.StandardValidators, etc. to be used.
# However, we have to delay the import until this point, rather than adding it to the
# top of the ExtensionManager class, because we need to ensure that we've fetched the
# appropriate dependencies for the pyenv environment for the extension point.
from nifiapi.__jvm__ import JvmHolder
JvmHolder.jvm = gateway.jvm
JvmHolder.gateway = gateway
```

Voilà! I raise my hat to whoever wrote the comments above that code, and I wish they could have put those comments in the user guide, as it would have saved me a lot of time. It turns out the nifiapi has a static class called JvmHolder that has access to the java_gateway(). I know it sounds silly, but I'm going to say it: it felt like I had found buried treasure! That would have simplified the code above in two ways:

1- I could have used the gateway from the JvmHolder and done the conversion using MapConverter as follows:

```python
from py4j.java_collections import MapConverter
from nifiapi.__jvm__ import JvmHolder

input_dict = {'name': record['name']}
mc_run_map_dict = MapConverter().convert(input_dict, JvmHolder.gateway._gateway_client)
```

2- Or even better: once you have access to the java_gateway, you can instantiate a Java-compatible map directly using gateway.jvm.java.util.HashMap(). Thanks to https://www.py4j.org/advanced_topics.html#map :

```python
from nifiapi.__jvm__ import JvmHolder

map = JvmHolder.gateway.jvm.java.util.HashMap()
map.put("name", record['name'])
return RecordTransformResult(schema=None, record=record, relationship="success", partition=map)
```

It seems we are making progress without having to do more treasure hunting, but what about the custom Relationship? The Python developer guide promised that we can define our own relationships by implementing getRelationships(self) (https://nifi.apache.org/documentation/nifi-2.0.0-M1/html/python-developer-guide.html#relationships) and simply returning a list of nifiapi.relationship.Relationship objects, and it should magically work - or at least that's what I thought - but it did not! So I had to go on another treasure hunt, and what do you know, buried in "PythonProcessorAdapter.py" I found the following code:

```python
self.relationships = gateway.jvm.java.util.HashSet()
success = gateway.jvm.org.apache.nifi.processor.Relationship.Builder() \
    .name("success") \
    .description("All FlowFiles will go to this relationship") \
    .build()
self.relationships.add(success)
```

Voilà again! That solved my second issue, and I was able to define and return my own custom relationship, to which I shouted "freeeeeeeeeeeeedoooooooooooooooooooooooooom". I hope this helps someone out there and saves them the headache, the frustration, and whatever hair is left on their head. If you know of any other buried treasures, please share and don't be stingy. By the way, if you love treasure hunting, I still have an unsolved issue here that I would love to find an answer for: https://community.cloudera.com/t5/Support-Questions/python-extension-generate-multiple-flowfiles-from-bytes/m-p/383095

I hope the community accepts that I'm accepting my own solution. Sorry if I kept it long or got quite emotional 😉 it's just been quite the struggle. Thank you all for reading, and keep learning. S
02-28-2024
11:10 AM
1 Kudo
Hi, It depends on how you are setting up the UpdateRecord processor. If you can provide a screenshot, it will help us figure out the issue. It seems you are trying to overwrite the field SPRINT_LIST, and it's complaining that it cannot convert an Array to a String because it is already an array in the first record. May I suggest a different approach? With UpdateRecord you have to worry about setting the Record Writer/Reader, and you also have to define the Avro schema accordingly if you are planning to add new fields. Instead, you can just use the JoltTransformJSON processor with the following spec:

```json
[
  {
    "operation": "shift",
    "spec": {
      "*": {
        "SPRINT_LIST": {
          "0": {
            "#true": "[&3].is_sprint_array"
          },
          "@": "[&2].SPRINT_LIST"
        },
        "*": "[&1].&"
      }
    }
  },
  {
    "operation": "default",
    "spec": {
      "*": {
        "is_sprint_array": "false"
      }
    }
  }
]
```

No need to set a record reader/writer, as long as you know the input is always valid JSON with the same schema. This will give you the following output:

```json
[ {
  "INTEGRATION_ID" : "1",
  "ISSUE_STATUS_CAT" : "Done",
  "SPRINT_LIST" : [ "24-w1-2", "24-w1-3", "24-w1-4" ],
  "is_sprint_array" : "true"
}, {
  "INTEGRATION_ID" : "2",
  "ISSUE_STATUS_CAT" : "Done",
  "SPRINT_LIST" : "24-w1-2",
  "is_sprint_array" : "false"
} ]
```

If that helps, please accept the solution. Thanks
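As a sanity check, the effect of the two-pass spec can be replicated in plain Python (an illustrative equivalent, not how NiFi executes Jolt):

```python
def flag_sprint_arrays(records):
    # Mark each record with is_sprint_array depending on whether
    # SPRINT_LIST is an array, matching the shift + default passes above.
    out = []
    for rec in records:
        rec = dict(rec)
        rec["is_sprint_array"] = "true" if isinstance(rec.get("SPRINT_LIST"), list) else "false"
        out.append(rec)
    return out

records = [
    {"INTEGRATION_ID": "1", "SPRINT_LIST": ["24-w1-2", "24-w1-3", "24-w1-4"]},
    {"INTEGRATION_ID": "2", "SPRINT_LIST": "24-w1-2"},
]
print(flag_sprint_arrays(records))
```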
02-28-2024
10:35 AM
Hi @iriszhuhao , I think you are trying to address two issues in one processor, and I don't think the PutDatabaseRecord processor can do that: 1- Insert/update the records in Table1. 2- Add a change log to Table2.

Let's address Problem 1 first, with Table1. PutDatabaseRecord can handle multiple records per flowfile if you have a valid JSON containing all the records as an array. In this case you can try setting the Data Record Path property, and depending on what version you are using, you can also set the "Statement Type Record Path" to determine the type of SQL statement for each record (UPDATE, INSERT, etc.). This means you have to know beforehand which statement applies to which record, and that information has to be set alongside each record as part of the whole JSON flowfile. If you don't have the statement type attached to each record, or you don't have the records as a JSON array (i.e. each JSON record is separated by a new line), then you have to split those records using a SplitText processor and then do a lookup to find whether the record exists in the database, to decide the proper SQL statement. For the lookup you can use something like the LookupRecord processor with a SimpleDatabaseLookupService.

Regarding Problem 2, adding a change log to Table2, you have a few options:

1- After you execute PutDatabaseRecord, with the statement type provided as an attribute from the LookupRecord that checked whether the record exists (update) or not (insert), you can use the same attribute to write the proper SQL script (or create a SQL stored proc) to add the change log, and then pass that SQL to the PutSQL processor in its SQL Statement property. In this case you have to extract the record fields into attributes as well, to provide the SQL statement with the field_id, old_value & new_value so it can check which values have changed, or pass the whole JSON record to SQL and do the parsing there instead. This can be cumbersome if you have too many fields.

2- Instead of handling this in NiFi, you can define an Update/Insert trigger on Table1, then put the logic in the trigger to add the proper log information depending on the old vs. new values captured by the trigger. Again, this might get cumbersome if you have too many fields to check, and it might impact performance when updating data in Table1, but it's cleaner than the first option.

3- Use CDC (Change Data Capture). I know this comes out of the box with SQL Server, but I think for MySQL you need a third-party tool to set it up. I can't give you much detail on the how-to because I don't know it myself, but you can research it. The idea here is to let CDC capture what has changed, been inserted, or been deleted, and then read that log info and store it in whatever format/table you desire. Here you don't have to check which field has changed, because the CDC will do that for you.

Hope that helps. If it does, please let me know if you have any questions; otherwise please accept the solution. Thanks
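To make the statement-type idea in option 1 concrete, here is a rough Python sketch (add_statement_type, statement_type, and the id field are hypothetical names) of the per-record decision that the LookupRecord + Statement Type Record Path combination encodes:

```python
def add_statement_type(records, existing_ids):
    # Attach a statement type per record: UPDATE when the key already
    # exists in the target table, INSERT otherwise. PutDatabaseRecord can
    # then read such a field via its Statement Type Record Path property.
    enriched = []
    for rec in records:
        rec = dict(rec)
        rec["statement_type"] = "UPDATE" if rec["id"] in existing_ids else "INSERT"
        enriched.append(rec)
    return enriched

print(add_statement_type([{"id": 1}, {"id": 2}], existing_ids={1}))
```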
02-27-2024
05:15 AM
2 Kudos
Hi @cotopaul , Thanks for your help. Actually, you are right. I'm not sure what I was thinking when I posted this or under what circumstances I was getting this error, but I retried what I posted and it worked, so I apologize if I wasted your time and the community's. However, what if I want to store the schema in the ValidateRecord processor itself, which I would think is the proper way to use it? Notice how that processor has no field for setting the Date/Time format, and if I provide my schema to it by setting (Use 'Schema Text' Property) I don't get an error, but my flowfile goes to the "invalid" relationship, complaining that the dob field doesn't have a valid type! In this case I did not use any schema definition in the record reader/writer, but I kept the date format set to MM/dd/yyyy. Is there a way to fix this, or am I using the processor the wrong way? Thanks again for your help.
02-23-2024
07:08 AM
Hi Everyone, I'm trying to use the ValidateRecord processor, where I have a JSON input with a date field as follows:

```json
{
  "name": "sam",
  "age": "20",
  "dob": "01/01/2004",
  "id": "001"
}
```

In the ValidateRecord Record Writer I'm using a JsonRecordSetWriter, where I'm using the following Avro schema in the Schema Text property:

```json
{
  "name": "person_rec",
  "type": "record",
  "namespace": "nifi.com",
  "fields": [
    {
      "name": "name",
      "type": "string"
    },
    {
      "name": "age",
      "type": "int"
    },
    {
      "name": "dob",
      "type": "int",
      "logicalType": "date"
    },
    {
      "name": "id",
      "type": "string"
    }
  ]
}
```

Also, I'm setting the Date Format to MM/dd/yyyy. I thought, based on the description, that when I set the Date Format it would be able to parse the date when reading/writing date fields. I took "date field" to mean one using logicalType "date", which means the type must be "int" according to the Avro schema spec. My expectation was that ValidateRecord would validate the date field as a string with the specified format and write it as such; however, I keep getting this error:

Successfully parsed a JSON object from input but failed to convert into a Record object with the given schema
- Caused by: java.lang.NumberFormatException: For input string: "01/01/2004"

Not sure if I'm misunderstanding or missing something here, but how is this supposed to work if it's expecting a string? How does the specified Date Format know which field is the date field if it's not defined as such in the Avro schema? Can someone help please? Thanks
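For context on that NumberFormatException: Avro's date logical type is physically an int holding days since the Unix epoch, so the writer tries to coerce "01/01/2004" straight to an int. A small Python sketch (to_avro_date is a hypothetical helper, not NiFi code) of the conversion the Date Format is supposed to drive:

```python
from datetime import date, datetime

def to_avro_date(value, fmt="%m/%d/%Y"):
    # Parse the string with the MM/dd/yyyy format first, then convert to
    # the int (days since 1970-01-01) that Avro's date logical type expects.
    parsed = datetime.strptime(value, fmt).date()
    return (parsed - date(1970, 1, 1)).days

print(to_avro_date("01/01/2004"))  # -> 12418
```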
Labels:
- Apache NiFi
02-22-2024
06:18 AM
If you have your NiFi instance secured with a truststore & keystore, you can use the same ones to create an SSL Context Service using StandardSSLContextService or StandardRestrictedSSLContextService. For either service you need to populate the truststore & keystore information that you used to secure NiFi, but you also have to pass the SSL information to the code in your ExecuteScript processor. There is a way to access controller services from ExecuteScript; refer to part 3 of: https://community.cloudera.com/t5/Community-Articles/ExecuteScript-Cookbook-part-2/ta-p/249018 But then again, why not use the out-of-the-box InvokeHttp processor and pass the created service to its SSL Context Service property?
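For anyone scripting the request themselves instead of using the controller service, the truststore/keystore pair maps onto a CA bundle and a client cert/key. A plain-Python sketch of the analogous setup (the file names are hypothetical placeholders):

```python
import ssl

# Client-side context roughly analogous to what StandardSSLContextService
# provides: verify the server against a truststore and present a client cert.
ctx = ssl.create_default_context(ssl.Purpose.SERVER_AUTH)
# ctx.load_verify_locations("truststore.pem")      # truststore equivalent
# ctx.load_cert_chain("client.pem", "client.key")  # keystore equivalent
print(ctx.verify_mode == ssl.CERT_REQUIRED)  # -> True
```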