Created on 05-03-2023 10:43 AM - edited 05-03-2023 01:08 PM
Hi all,
We are trying to configure a flow in NiFi where failed SQL queries are saved to a local file, after a number of retries, for further analysis.
I have already seen the following links related to error handling in NiFi:
https://youtu.be/nrxv65J4Ctg by Steven Koon
https://insights.1904labs.com/blog/2020-11-12-creating-an-error-retry-framework-in-nifi-part-2
However, those resources do not cover the PutSQL processor, so I'm not sure whether they apply to our case.
The failed query in our scenario, using the PutSQL processor, is caused by bad record data in the origin database: it attempts to insert a text value into a bigint column in the destination SQL Server database.
This is an example SQL query used for inserts on the destination database:
INSERT INTO destination (id, name, description, group_id) VALUES (100, 'Bob', 'Example query', '2151')
We have a bad record in the origin database where the group_id field has the value 'null', which produces the following exception in the NiFi logs:
org.apache.nifi.processor.exception.ProcessException: Failed to process StandardFlowFileRecord[uuid=8fb0f61a-dda2-4101-befe-24ca18ced5c0,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1683132909199-67, container=default, section=67], offset=36504, length=104],offset=0,name=56638e31-2c44-4506-bac7-a38a2e612301,size=104] due to java.sql.SQLDataException: The value of the sql.args.4.value is 'null', which cannot be converted into the necessary data type
at org.apache.nifi.processor.util.pattern.ExceptionHandler.lambda$createOnGroupError$2(ExceptionHandler.java:226)
at org.apache.nifi.processor.util.pattern.ExceptionHandler.lambda$createOnError$1(ExceptionHandler.java:179)
As the error shows, the reason is that the destination group_id column is of type bigint. We need to handle cases like this (records with wrong data or format that make the query fail).
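In other words, with the bad record the flow effectively attempts the equivalent of the following statement, where the text 'null' cannot be converted to bigint:
-----------------------------------------------------------------------------------------------------------
INSERT INTO destination (id, name, description, group_id) VALUES (100, 'Bob', 'Example query', 'null')
-----------------------------------------------------------------------------------------------------------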
I configured a PutFile processor to save the queries that fail in the PutSQL processor after some retries. This is the content of the local file created by the PutFile processor after running the flow:
-----------------------------------------------------------------------------------------------------------
INSERT INTO topology_node_destination (id, natural_key, description, infotecnica_id) VALUES (?, ?, ?, ?)
-----------------------------------------------------------------------------------------------------------
This is the relevant part of our flow.
My questions are the following:
Thank you for reading.
Created 05-04-2023 12:33 AM
@danielhg1285,
While the solution provided by @SAMSAL is probably better for you and more production-ready, you could also try the following. This works if your statement stays stable over time and if you don't need the exact INSERT statement, only the values that were being inserted.
- Right after RetryFlowFile, you can add an AttributesToJSON processor and manually define all the columns you want to capture in the Attributes List property. Make sure you use the attribute names from your FlowFile (sql.args.N.value) in the correct order, and set Destination = flowfile-content. This way you generate a JSON file with all the columns and all the values you tried, and failed, to insert (see the sketch after this list).
- After AttributesToJSON, you can keep your PutFile to save the file locally on your machine, so you can open it whenever and wherever you want 🙂
PS: This is maybe not the best solution, for the following reasons, but it will get you started:
- You need to know how many columns you insert, and each time a new column is added you will have to modify your AttributesToJSON processor.
- You will not get the exact SQL INSERT/UPDATE statement, but a JSON file containing the column-value pairs, which can easily be analyzed by anybody.
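For illustration, with the four-parameter statement from the original post, a hypothetical AttributesToJSON configuration and the resulting file content might look like this (the attribute values are assumptions taken from the failing FlowFile):
-----------------------------------------------------------------------------------------------------------
Attributes List = sql.args.1.value,sql.args.2.value,sql.args.3.value,sql.args.4.value
Destination     = flowfile-content

Resulting FlowFile content:
{"sql.args.1.value":"100","sql.args.2.value":"Bob","sql.args.3.value":"Example query","sql.args.4.value":"null"}
-----------------------------------------------------------------------------------------------------------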
Created 05-03-2023 10:46 AM
@danielhg1285 Welcome to the Cloudera Community!
To help you get the best possible solution, I have tagged our NiFi experts @cotopaul @SAMSAL @MattWho who may be able to assist you further.
Please keep us updated on your post, and we hope you find a satisfactory solution to your query.
Regards,
Diana Torres
Created 05-03-2023 06:52 PM
Hi,
It seems like you are generating your SQL with a ConvertJSONToSQL-type processor and then using PutSQL to execute the statement generated by the earlier processor, is that correct? If so, I don't think there is an easy way to capture the actual values of the SQL statement, because, per the PutSQL documentation, they travel as FlowFile attributes in the format "sql.args.N.value" rather than in the content. The only suggestion I have to overcome this is to write custom code in an ExecuteScript processor after the "retries-exceeded" relationship, replacing the placeholders (?, ?, ?, ...) in the FlowFile content with the sql.args.N.value attributes. The logic: extract the placeholders, loop through them, and for the i-th placeholder (0-based) look up the sql.args.[i+1].value attribute; substitute each value into the statement, store the result as the new FlowFile content, and send it to success (a sketch of such a script follows the links below). For more info on writing custom scripts with ExecuteScript:
https://community.cloudera.com/t5/Community-Articles/ExecuteScript-Cookbook-part-1/ta-p/248922
https://community.cloudera.com/t5/Community-Articles/ExecuteScript-Cookbook-part-2/ta-p/249018
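For reference, here is a minimal, untested Groovy sketch of such an ExecuteScript body. It assumes the standard sql.args.N.value attributes set by the upstream processor; the quoting is deliberately naive (every value becomes a quoted string, NULL for missing attributes), which is fine for offline analysis but not for re-executing the statement:
-----------------------------------------------------------------------------------------------------------
import org.apache.commons.io.IOUtils
import org.apache.nifi.processor.io.InputStreamCallback
import org.apache.nifi.processor.io.OutputStreamCallback
import java.nio.charset.StandardCharsets

def flowFile = session.get()
if (!flowFile) return

try {
    // Read the parameterized statement from the FlowFile content.
    def sql = ''
    session.read(flowFile, { inputStream ->
        sql = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
    } as InputStreamCallback)

    // Replace the i-th '?' with the sql.args.(i+1).value attribute
    // (sql.args attributes are 1-based).
    def index = 0
    def resolved = sql.replaceAll(/\?/) { match ->
        index++
        def value = flowFile.getAttribute("sql.args.${index}.value")
        // Naive quoting for analysis purposes only.
        value == null ? 'NULL' : "'" + value.replace("'", "''") + "'"
    }

    // Overwrite the content with the materialized statement.
    flowFile = session.write(flowFile, { outputStream ->
        outputStream.write(resolved.getBytes(StandardCharsets.UTF_8))
    } as OutputStreamCallback)

    session.transfer(flowFile, REL_SUCCESS)
} catch (e) {
    log.error('Failed to materialize SQL statement', e)
    session.transfer(flowFile, REL_FAILURE)
}
-----------------------------------------------------------------------------------------------------------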
If anyone has a better idea please feel free to provide your input.
If that helps, please accept the solution.
Thanks