When converting CSV to AVRO I would like to output all the rejections to a file (let's say error.csv).
A rejection is usually caused by a wrong data type - e.g. when a "string" value appears in a "long" field.
I am trying to do this using the 'incompatible' output. However, instead of saving only the rows that failed to convert (2 in the example below), it saves the whole CSV file. Is it possible to filter out only the records that failed to convert? (Does NiFi add some kind of marker to these records?)
Hello @Michal R
I haven't tried it myself, but judging from the ConvertCSVToAvro code, it seems that:
ConvertCSVToAvro transfers a clone of the incoming CSV file to the 'incompatible' relationship. The clone contains all of the CSV records, plus an additional 'error' attribute. You can use the 'error' attribute to diagnose the type of problem and to see an example error record for each problem. But it doesn't filter records.
Hope this helps,
Thanks Koji for your answer. That might be a good hint. The thing is, I cannot find any good example that shows how to filter data based on such an additional attribute. I even looked at the source code, and it adds something like "errors" in the case of incompatibles, but I am still looking for a transformation that will do the filtering I need. Any further hints will be appreciated.
@Matt Burgess let's assume that myfile.csv has 10 rows, 8 of which converted successfully and went to the "success" leg. How can I filter the "incompatible" leg so that errors.csv stores only the 2 records that failed to convert? RouteOnAttribute works on full files, which means that "error" is an attribute of a file, not of a single record.
@kkawamura but does the "incompatible" leg of the flow somehow mark single records with something like an "error" attribute that is available after splitting the file into rows? I was trying to use RouteText, but it seems that the "error" attribute applies only to a flow file, not to the individual records.
Hi @Michal R
I tested the ConvertCSVToAvro processor myself.
If I pass a CSV file containing 3 lines, and the 2nd line is incompatible (e.g. it doesn't have the required number of columns), then two Avro records are routed to 'success', and the original flow file containing all 3 CSV lines is routed to 'incompatible' with an 'errors' attribute like this:
Field v3: cannot make "string" value: 'null': Field v3 type:STRING pos:2 not set and has no default value
It's hard to extract the 2nd line from this information.
So, if you really need to extract the failed records and do something with them, you need to split the file into lines in advance, like below. In this case, the 2nd line was routed to the 'failure' relationship instead of 'incompatible', and it has the same 'error' attribute. This behavior was different from what I expected: I expected it to be routed to 'incompatible'. But I don't have a strong opinion on whether we need to fix this or not.
So, overall, my recommendation for now is to split the lines, pass each line to ConvertCSVToAvro, and then route both the 'incompatible' and 'failure' relationships to the recovery route. However, this makes the resulting Avro files that are routed to 'success' less efficient from a performance point of view, since each one holds a single record.
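To illustrate the split-then-convert idea outside NiFi, here is a minimal Python sketch. It is not NiFi code and does not use any NiFi API: the `SCHEMA` list, the column names, and the `split_and_route` function are all hypothetical, and Python's `int`/`str` merely stand in for Avro's "long"/"string" types. Each line is validated on its own, so a failure can be tied to a specific record rather than to the whole file.

```python
import csv
import io

# Hypothetical schema: (column name, Python type standing in for the Avro type).
SCHEMA = [("id", int), ("name", str), ("amount", int)]


def split_and_route(csv_text, schema=SCHEMA):
    """Validate each CSV line separately and route it to one of two lists.

    Returns (success, rejected): converted records as dicts, and
    (original_row, error_message) pairs for the rows that failed.
    """
    success, rejected = [], []
    for row in csv.reader(io.StringIO(csv_text)):
        if not row:
            continue  # skip blank lines
        try:
            if len(row) != len(schema):
                raise ValueError(
                    f"expected {len(schema)} columns, got {len(row)}"
                )
            # int("abc") raises ValueError, which marks the row as rejected.
            record = {name: cast(value) for (name, cast), value in zip(schema, row)}
            success.append(record)
        except ValueError as exc:
            rejected.append((row, str(exc)))
    return success, rejected


if __name__ == "__main__":
    ok, bad = split_and_route("1,alice,10\n2,bob\n3,carol,abc\n")
    print(len(ok), "converted;", len(bad), "rejected")
```

In a real flow the `rejected` list corresponds to what would arrive on the recovery route when both 'incompatible' and 'failure' are connected to it.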
I think a better improvement would be to add a new property to ConvertCSVToAvro, something like 'Incompatible records', which defaults to 'Summarize in error attribute' (the existing behavior) and lets the user select 'Copy to flow file content' instead. What do you think?
@Michal R. There are two different parts to the CSV-to-Avro converter. The first is type validation and the second is conversion. The error that you have mentioned falls under type validation. ConvertCSVToAvro transfers a clone of the incoming CSV file to the 'incompatible' relationship, and that clone contains all of the CSV records.
You can add your own custom code here to append the error message to the end of each incompatible record. (This is possible because those records have failed type validation and are therefore not converted to Avro.) Hope this helps.
@Balakrishnan Ramasamy thanks for the answer, but how am I supposed to know which record is incompatible? Comparing the input file with the Avro output file to look for these records does not seem to be an effective way of doing that...
Type validation is a record-level operation. You don't have to use the existing 'incompatible' relationship. You can create a new relationship, say 'invalid', and plug in logic that sends only the records that failed to it. The flow file is then transferred to 'invalid' only when the error count is greater than 0.
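As a rough illustration of that record-level logic, the Python sketch below appends the error message to each failed record and produces error output only when the error count is greater than 0. It is a standalone sketch, not the ConvertCSVToAvro source and not a custom NiFi processor: `validate_row`, `build_error_csv`, and the list of per-column cast functions are assumptions made for the example, and the error text is invented rather than the message NiFi produces.

```python
import csv
import io


def validate_row(row, casts):
    """Return an error message for the row, or None if every field converts."""
    if len(row) != len(casts):
        return f"expected {len(casts)} columns but got {len(row)}"
    for position, (value, cast) in enumerate(zip(row, casts)):
        try:
            cast(value)
        except ValueError:
            return f"field at pos {position}: cannot convert {value!r}"
    return None


def build_error_csv(csv_text, casts):
    """Collect only the failed rows, each with its error message appended.

    Returns the CSV text for the hypothetical 'invalid' route, or None when
    no row failed (error count is 0), so nothing would be transferred.
    """
    out = io.StringIO()
    writer = csv.writer(out, lineterminator="\n")
    error_count = 0
    for row in csv.reader(io.StringIO(csv_text)):
        if not row:
            continue  # skip blank lines
        error = validate_row(row, casts)
        if error is not None:
            writer.writerow(row + [error])  # error message becomes the last column
            error_count += 1
    return out.getvalue() if error_count > 0 else None


if __name__ == "__main__":
    # Column 0 is expected to be a "long", column 1 a "string".
    errors = build_error_csv("1,a\nx,b\n", [int, str])
    if errors is not None:
        with open("errors.csv", "w", newline="") as handle:
            handle.write(errors)
```

Because the error is attached to the row itself, there is no need to compare the input file with the Avro output to find out which records were rejected.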