
NiFi: How to save failed SQL queries with wrong data to local files after some retries

New Contributor

Hi all,
We are trying to configure a flow in NiFi where failed SQL queries are saved, after some retries, to a local file for further analysis.

I have already seen the following links related to error handling in NiFi:
https://youtu.be/nrxv65J4Ctg by Steven Koon
https://insights.1904labs.com/blog/2020-11-12-creating-an-error-retry-framework-in-nifi-part-2

But those articles do not cover the PutSQL processor, so I'm not sure whether they apply to this case.

 

In our scenario the query that fails in the PutSQL processor is caused by a bad record in the origin database, which produces an error when inserting a text value into a bigint column in the destination SQL Server database.

This is an example of an SQL query used for inserts on the destination database:

INSERT INTO destination (id, name, description, group_id) VALUES (100, 'Bob', 'Example query', '2151')

We have a bad record in the origin database where the group_id field has the value 'null', which produces the following exception in the NiFi logs:


org.apache.nifi.processor.exception.ProcessException: Failed to process StandardFlowFileRecord[uuid=8fb0f61a-dda2-4101-befe-24ca18ced5c0,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1683132909199-67, container=default, section=67], offset=36504, length=104],offset=0,name=56638e31-2c44-4506-bac7-a38a2e612301,size=104] due to java.sql.SQLDataException: The value of the sql.args.4.value is 'null', which cannot be converted into the necessary data type
at org.apache.nifi.processor.util.pattern.ExceptionHandler.lambda$createOnGroupError$2(ExceptionHandler.java:226)
at org.apache.nifi.processor.util.pattern.ExceptionHandler.lambda$createOnError$1(ExceptionHandler.java:179)

 

As the error shows, the cause is that the destination group_id column is of type bigint. We need to handle cases like this (records with wrong data or format that make the query fail).

 

I configured a PutFile processor to save the queries that failed in the PutSQL processor after some retries. This is the content of the local file created by the PutFile processor after running the flow:

-----------------------------------------------------------------------------------------------------------
INSERT INTO topology_node_destination (id, natural_key, description, infotecnica_id) VALUES (?, ?, ?, ?)

-----------------------------------------------------------------------------------------------------------

This is the relevant part of our flow:

[Flow screenshot: danielhg1285_0-1683134892863.png]

 

My questions are the following:

  • Is there a way to save the query, with PutFile or another processor, with the real values used by the PutSQL processor, so that we know the exact query that is causing the error?
  • If that cannot be achieved, what is the recommended practice?


Thank you for reading.

1 ACCEPTED SOLUTION


@danielhg1285,

While the solution provided by @SAMSAL seems to be better for you and more production-ready, you could also try the following. This might work if you always use the same statement and if you do not need to see the exact INSERT statement, but rather the values it tried to insert.


- Shortly after RetryFlowFile, you can add an AttributesToJSON processor and manually define, in the Attributes List property, all the columns that you want to insert. Make sure that you use the attribute names from your FlowFile (sql.args.N.value) in the correct order and that you set Destination = flowfile-content. In this way, you will generate a JSON file with all the columns and all the values that you tried to insert but failed (see the example after this list).
- After AttributesToJSON, you can keep your PutFile to save the file locally on your machine, so you can open it whenever and wherever you want 🙂

PS: This is maybe not the best solution, for the following reasons, but it will get you started:
- You need to know how many columns you have to insert, and each time a new column is added you will have to modify your AttributesToJSON processor.
- You will not get the exact SQL INSERT/UPDATE statement, but a JSON file containing the column-value pairs, which can easily be analyzed by anybody.
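
For illustration, assuming the usual sql.args.N.value attributes and the column values from the example INSERT in the question, an Attributes List of sql.args.1.value,sql.args.2.value,sql.args.3.value,sql.args.4.value with Destination = flowfile-content would produce content roughly like this:

{"sql.args.1.value":"100","sql.args.2.value":"Bob","sql.args.3.value":"Example query","sql.args.4.value":"null"}

The bad record is then immediately visible as the text 'null' in the fourth argument.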


3 REPLIES

Community Manager

@danielhg1285 Welcome to the Cloudera Community!

To help you get the best possible solution, I have tagged our NiFi experts @cotopaul, @SAMSAL, and @MattWho, who may be able to assist you further.

Please keep us updated on your post, and we hope you find a satisfactory solution to your query.


Regards,

Diana Torres,
Community Moderator


Was your question answered? Make sure to mark the answer as the accepted solution.
If you find a reply useful, say thanks by clicking on the thumbs up button.


Hi,

It seems like you are generating your SQL with a ConvertJSONToSQL-type processor and then using PutSQL to execute the statement generated by the earlier processor, is this correct? If that is the case, I don't think there is an easy way to capture the actual values in the SQL statement, because they are carried as flow file attributes in the format "sql.args.N.value", as described in the PutSQL documentation.

The only suggestion I have to overcome this is to write custom code inside an ExecuteScript processor after the "retries-exceeded" relationship, to replace the placeholders (?, ?, ?, ...) in the flow file content with the sql.args.N.value attributes, where N = placeholder index + 1. The logic would be: read the statement, split it on the "?" placeholders, loop through them, look up sql.args.[i+1].value for each one, rebuild the statement with the values substituted, store the result as the new flow file content, and route it to success (a rough sketch follows the links below). For more info on writing custom scripts with ExecuteScript:

https://community.cloudera.com/t5/Community-Articles/ExecuteScript-Cookbook-part-1/ta-p/248922

https://community.cloudera.com/t5/Community-Articles/ExecuteScript-Cookbook-part-2/ta-p/249018
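
Here is a rough, untested Jython sketch of that idea for ExecuteScript (Script Engine set to python). It assumes the statement contains no literal "?" characters and that the sql.args.N.value and sql.args.N.type attributes are present on the flow file; the class name and the list of numeric JDBC types below are only illustrative:

from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback

# JDBC type codes (java.sql.Types) whose values should not be quoted in the rebuilt SQL
NUMERIC_TYPES = ['-6', '-5', '2', '3', '4', '5', '6', '7', '8']

class RebuildSql(StreamCallback):
    def __init__(self, flowFile):
        self.flowFile = flowFile

    def process(self, inputStream, outputStream):
        sql = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        parts = sql.split('?')  # N placeholders give N + 1 parts
        rebuilt = parts[0]
        for i in range(1, len(parts)):
            value = self.flowFile.getAttribute('sql.args.%d.value' % i)
            jdbcType = self.flowFile.getAttribute('sql.args.%d.type' % i)
            if value is None:
                literal = 'NULL'
            elif jdbcType in NUMERIC_TYPES:
                literal = value
            else:
                # quote and escape text values
                literal = "'" + value.replace("'", "''") + "'"
            rebuilt = rebuilt + literal + parts[i]
        outputStream.write(rebuilt.encode('utf-8'))

flowFile = session.get()
if flowFile is not None:
    flowFile = session.write(flowFile, RebuildSql(flowFile))
    session.transfer(flowFile, REL_SUCCESS)

With this in place, the content that reaches PutFile shows the real values instead of the "?" placeholders, so the offending group_id value from the bad record is visible in the saved file.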

If anyone has a better idea, please feel free to provide your input.

If that helps, please accept the solution.

Thanks

 

 

 
