Created 12-19-2019 05:47 AM
I am trying to validate CSV against an Avro schema using the ValidateRecord processor. The Record Reader property of the ValidateRecord processor is set to a CSVReader controller service, and Quote Character is set to the double quote (") for this CSVReader.
When I validate flowfiles, a few of them are routed to the failure relationship because a field value contains double quotes.
Sample CSV row from the flowfile content:
"ICUA","01/22/2019","08:48:18",394846,"HAVE YOU REMOVED THE KEY?","YES---select "Accept Response" and continue with the remove","","","1"
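To see why a row like this is rejected, here is a minimal sketch that uses Python's csv module in strict mode as a stand-in for a strict CSV parser. This is an assumption for illustration only: NiFi's CSVReader uses its own parser and may report the problem differently. The helper name is_valid_csv_row is hypothetical.

```python
import csv
import io

# The sample row from the question, with unescaped quotes inside the sixth field.
ROW = ('"ICUA","01/22/2019","08:48:18",394846,"HAVE YOU REMOVED THE KEY?",'
       '"YES---select "Accept Response" and continue with the remove","","","1"')


def is_valid_csv_row(line):
    """Return True if a strict parser accepts the line (hypothetical helper)."""
    reader = csv.reader(io.StringIO(line), quotechar='"', strict=True)
    try:
        next(reader)
        return True
    except csv.Error:
        # In strict mode, a closing quote must be followed by a delimiter
        # or the end of the line; anything else is an error.
        return False


print(is_valid_csv_row(ROW))                      # the malformed sample row
print(is_valid_csv_row('"a","say ""hi"" now"'))   # inner quotes doubled
```

The parser treats the quote before Accept as the end of the field and then finds ordinary text where it expects a comma, which is why the inner quotes have to be doubled or escaped before the row can be read.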
I thought of using ReplaceText, but that would alter the actual value of the field.
It would be really helpful if somebody could suggest an approach for dealing with this situation.
I really appreciate your support.
Created on 12-24-2019 03:51 AM - edited 12-24-2019 03:52 AM
As you seem to understand, this is not a valid CSV file, hence custom parsing is required. I have copied the answer you gave here:
To find the unescaped inner double quotes and double them, I used an ExecuteScript processor with a Python script such as:
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback
from org.apache.nifi.processors.script import ExecuteScript
from org.python.core.util.FileUtil import wrap
from io import StringIO
import re

# Define a subclass of StreamCallback for use in session.write()
class PyStreamCallback(StreamCallback):
    def __init__(self):
        pass

    def process(self, inputStream, outputStream):
        with wrap(inputStream) as f:
            lines = f.readlines()
        outer_new_value_list = []
        for csv_row in lines:
            # Split each row into fields on the pipe delimiter
            field_value_list = csv_row.split('|')
            inner_new_value_list = []
            for field in field_value_list:
                # More than two quotes means the field has quotes inside it
                if field.count('"') > 2:
                    # Double every quote that is not the first or last character
                    replaced_field = re.sub(r'(?!^|.$)["^]', '""', field)
                    inner_new_value_list.append(replaced_field)
                else:
                    inner_new_value_list.append(field)
            row = '|'.join([str(elem) for elem in inner_new_value_list])
            outer_new_value_list.append(row)
        with wrap(outputStream, 'w') as filehandle:
            filehandle.writelines("%s" % line for line in outer_new_value_list)
# end class

flowFile = session.get()
if flowFile is not None:
    flowFile = session.write(flowFile, PyStreamCallback())
    session.transfer(flowFile, ExecuteScript.REL_SUCCESS)
# implicit return at the end
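The quote-doubling step can be tried outside NiFi with plain Python 3. The following is a rough sketch of the same idea, not the script above: the function name double_inner_quotes and the delimiter parameter are hypothetical, and the pattern only targets the double quote. Like the script, it splits naively on the delimiter, so it assumes the delimiter never appears inside a quoted field.

```python
import re

# A double quote that is neither the first character of the field
# nor its last character (a trailing newline is tolerated).
INNER_QUOTE = re.compile(r'(?!^|.$)"')


def double_inner_quotes(row, delimiter="|"):
    """Double any quotes found inside a quoted field (hypothetical helper)."""
    fixed = []
    for field in row.split(delimiter):
        # More than two quotes means there are quotes besides the enclosing pair.
        if field.count('"') > 2:
            field = INNER_QUOTE.sub('""', field)
        fixed.append(field)
    return delimiter.join(fixed)


print(double_inner_quotes('"A"|"say "hi" now"|"1"\n'), end="")
# The same helper on a comma-delimited row:
print(double_inner_quotes('"A","say "hi" now","1"', delimiter=","))
```

Doubling the inner quotes keeps the field value intact for any reader that treats "" inside a quoted field as a literal quote.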
Created on 12-24-2019 05:00 AM - edited 12-24-2019 05:02 AM
@vikram_shinde can't you just use an escape character in the csv?
"ICUA","01/22/2019","08:48:18",394846,"HAVE YOU REMOVED THE KEY?","YES---select \"Accept Response\" and continue with the remove","","","1"
If you can't change the source, a simple ReplaceText should be enough to change "quoted text" to \"quoted text\". Of course, you would need to handle the escape characters downstream, but a properly configured CSV reader, for example, will ignore them when mapping that CSV column to the schema.
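As a rough illustration of the replacement, here is a Python sketch that escapes only the quotes that are not next to a comma or a line boundary. The pattern and the name escape_inner_quotes are assumptions made for this example, not a tested ReplaceText configuration, and it will misjudge a field whose inner quote sits directly beside a comma.

```python
import re

# A quote that is neither at the start of a line nor right after a comma,
# and neither at the end of a line nor right before a comma.
INNER_QUOTE = re.compile(r'(?<=[^,\r\n])"(?=[^,\r\n])')


def escape_inner_quotes(line):
    """Prefix inner quotes with a backslash (hypothetical helper)."""
    return INNER_QUOTE.sub(lambda m: '\\"', line)


RAW = ('"ICUA","01/22/2019","08:48:18",394846,"HAVE YOU REMOVED THE KEY?",'
       '"YES---select "Accept Response" and continue with the remove","","","1"')

print(escape_inner_quotes(RAW))
```

The enclosing quotes and the empty "" fields are left alone because each of those quotes touches a comma or a line boundary.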
If this answer helps with your issue, please mark it as Accepted Solution.
Created 12-25-2019 09:32 PM
Thanks @DennisJaheruddi for updating the question here. I'm really sorry that I wasn't able to update this thread; I was caught up in some other ingestion errors.
Thanks @stevenmatison for your input. I am currently using the ExecuteScript processor to handle the double quotes. You mentioned that the "csv reader if configured properly will ignore the escape characters...". Could you please give an example of which CSVReader property needs to be configured for this? I can't find any CSVReader controller service property that would ignore escape characters.
Created 12-27-2019 06:37 AM
@vikram_shinde the Escape Character is a setting in the CSVReader: CSVReader 22.214.171.124.4.1.1-4. My NiFi version is 1.9.
An escape character is a basic CSV feature: it is needed so that quotes inside a quote-enclosed field are not treated as the end of the field. So if you feed the escaped string to the CSV reader, it will output the correct values without the escape character.
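The effect of an escape character can be checked with any CSV parser that supports one. Here is a small sketch using Python's csv module as a stand-in (an assumption for illustration; the NiFi CSVReader has its own implementation, and the name parse_escaped is hypothetical), fed with the escaped row from earlier in the thread.

```python
import csv
import io

# The escaped row from the earlier reply (raw string keeps the backslashes).
ESCAPED_ROW = (r'"ICUA","01/22/2019","08:48:18",394846,"HAVE YOU REMOVED THE KEY?",'
               r'"YES---select \"Accept Response\" and continue with the remove","","","1"')


def parse_escaped(line):
    """Parse one CSV line using backslash as the escape character (hypothetical helper)."""
    reader = csv.reader(
        io.StringIO(line),
        delimiter=",",
        quotechar='"',
        escapechar="\\",    # counterpart of the Escape Character setting
        doublequote=False,  # quotes are escaped, not doubled
    )
    return next(reader)


fields = parse_escaped(ESCAPED_ROW)
print(len(fields))
print(fields[5])
```

The parsed sixth field contains the plain quotes and no backslashes, so the original value survives the round trip.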