NiFi, CSV to Avro ValidateRecord - routing flowfiles to failure because of a double quote in a field value

New Contributor

Hi Team,

I am trying to validate CSV to Avro using the ValidateRecord processor. Its Record Reader property is set to a CSVReader controller service, and the Quote Character of that CSVReader is set to the double quote (").

When I validate flowfiles, a few of them are routed to the failure relationship because a field value contains double quotes.

Sample CSV row from the flowfile content:

 

"ICUA","01/22/2019","08:48:18",394846,"HAVE YOU REMOVED THE KEY?","YES---select "Accept Response" and continue with the remove","","","1"
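The problem with this row can be reproduced outside NiFi. The sketch below uses Python's standard `csv` module, which is not the parser behind NiFi's CSVReader, so it only illustrates the general issue: the quotes around Accept Response sit inside a quoted field without being escaped. The variable names are chosen here for illustration.

```python
import csv

# The sample row from the question, unchanged.
row = ('"ICUA","01/22/2019","08:48:18",394846,"HAVE YOU REMOVED THE KEY?",'
       '"YES---select "Accept Response" and continue with the remove","","","1"')

# The value the sixth field is meant to carry.
intended = 'YES---select "Accept Response" and continue with the remove'

# A strict parser rejects the row: after the quote that follows "select ",
# it expects a delimiter or a second quote, not the letter A.
try:
    next(csv.reader([row], strict=True))
    strict_error = None
except csv.Error as exc:
    strict_error = exc

# A lenient parser (Python's default) does not fail, but the field it
# returns is not the intended value.
lenient_fields = next(csv.reader([row]))

print("strict error:", strict_error)
print("lenient field:", lenient_fields[5])
print("matches intended value:", lenient_fields[5] == intended)
```

Either way, the inner quotes have to be escaped before a quote-aware reader can recover the field as written.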

 

I considered using ReplaceText, but that would alter the actual value of the field.

It would be really helpful if somebody could suggest an approach for dealing with this situation.

I really appreciate your support.

Thanks,

Vikram.

 

4 REPLIES


As you seem to understand, this is not a valid CSV file: the double quotes inside the quoted field are not escaped, so custom parsing is required. I have copied the answer you gave here:

 

----

 

To find the unescaped double quotes inside a field and double them, I used an ExecuteScript processor with a Python script such as:

from org.apache.nifi.processor.io import StreamCallback
from org.apache.nifi.processors.script import ExecuteScript
from org.python.core.util.FileUtil import wrap
import re

# Define a subclass of StreamCallback for use in session.write()
class PyStreamCallback(StreamCallback):
    def __init__(self):
        pass

    def process(self, inputStream, outputStream):
        with wrap(inputStream) as f:
            lines = f.readlines()
            outer_new_value_list = []
            for csv_row in lines:
                # Fields are split on '|' here; change this to match the
                # file's delimiter (the sample row above uses ',').
                field_value_list = csv_row.split('|')
                inner_new_value_list = []
                for field in field_value_list:
                    # More than two quotes means the field holds quotes
                    # besides the enclosing pair.
                    if field.count('"') > 2:
                        # Replace every quote except the first and last
                        # character of the field with a doubled quote.
                        # Note: the class ["^] also matches a literal '^'.
                        replaced_field = re.sub(r'(?!^|.$)["^]', '""', field)
                        inner_new_value_list.append(replaced_field)
                    else:
                        inner_new_value_list.append(field)
                row = '|'.join([str(elem) for elem in inner_new_value_list])
                outer_new_value_list.append(row)
            # Each row still carries its original line ending.
            with wrap(outputStream, 'w') as filehandle:
                filehandle.writelines(outer_new_value_list)
# end class
flowFile = session.get()
if flowFile is not None:
    flowFile = session.write(flowFile, PyStreamCallback())
    session.transfer(flowFile, ExecuteScript.REL_SUCCESS)
# implicit return at the end
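
The quote-doubling step can also be tried outside NiFi before wiring it into ExecuteScript. The following is a minimal sketch in plain Python rather than the script above: `double_inner_quotes` is a hypothetical helper name, the delimiter is assumed to be a comma as in the sample row, and it assumes that no field contains the delimiter and that no quote has already been doubled. Python's `csv` module is used only to check the result and is not NiFi's CSVReader.

```python
import csv

def double_inner_quotes(line, delimiter=","):
    """Double every quote that sits strictly inside a quoted field.

    Assumes the delimiter never occurs inside a field and that the
    input holds no already-doubled quotes (both are assumptions).
    """
    fixed = []
    for field in line.split(delimiter):
        if len(field) >= 2 and field.startswith('"') and field.endswith('"'):
            # Keep the enclosing pair, double everything in between.
            inner = field[1:-1].replace('"', '""')
            field = '"' + inner + '"'
        fixed.append(field)
    return delimiter.join(fixed)

row = ('"ICUA","01/22/2019","08:48:18",394846,"HAVE YOU REMOVED THE KEY?",'
       '"YES---select "Accept Response" and continue with the remove","","","1"')

repaired = double_inner_quotes(row)

# A strict parser now accepts the row and returns the original text.
parsed = next(csv.reader([repaired], strict=True))
print(repaired)
print(parsed[5])
```

Because the field value is restored when the doubled quotes are read back, this kind of rewrite does not change the data that ends up in the record.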

 

 

 


- Dennis Jaheruddin


Super Guru

@vikram_shinde can't you just use an escape character in the CSV?

 

"ICUA","01/22/2019","08:48:18",394846,"HAVE YOU REMOVED THE KEY?","YES---select \"Accept Response\" and continue with the remove","","","1"

If you can't change the source, a simple ReplaceText should be able to change "quoted text" to \"quoted text\". Of course, you would need to handle this downstream, but the CSV reader, for example, will drop the escape characters when mapping that CSV column to the schema if it is configured properly.
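
As an illustration of that kind of replacement, here is a rough Python sketch. The pattern and the helper name are assumptions made for this example and are not a tested ReplaceText configuration. It treats a quote as "inner" when it is not at the start of the line, not right after a comma, and not right before a comma or the end of the line, so it would misjudge a value in which a quote sits directly next to a comma.

```python
import re

# Hypothetical pattern: a quote that is neither at a line boundary
# nor adjacent to a comma is taken to be part of the field value.
INNER_QUOTE = re.compile(r'(?<!^)(?<!,)"(?!,|$)')

def backslash_escape_inner_quotes(line):
    """Prefix each inner quote with a backslash."""
    return INNER_QUOTE.sub(lambda match: '\\"', line)

row = ('"ICUA","01/22/2019","08:48:18",394846,"HAVE YOU REMOVED THE KEY?",'
       '"YES---select "Accept Response" and continue with the remove","","","1"')

escaped = backslash_escape_inner_quotes(row)
print(escaped)
```

The enclosing quotes and the empty "" fields are left alone because each of their quotes touches a comma or a line boundary.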

 

 


New Contributor

Thanks @DennisJaheruddi for updating the question here. I'm really sorry that I wasn't able to update this thread; I was caught up in some other ingestion errors.

 

Thanks @stevenmatison for your input. Currently I am using an ExecuteScript processor to handle the double quotes. You mentioned that the "csv reader if configured properly will ignore the escape characters...". Could you please give an idea or example of which CSVReader property needs to be configured to ignore escape characters? (I can't find any CSVReader controller service property that would do this.)

 

Super Guru

@vikram_shinde Escape Character is a property of the CSVReader (CSVReader 1.9.0.3.4.1.1-4; my NiFi version is 1.9).

 

An escape character is a basic part of CSV handling: it is what lets the reader treat quotes inside a quote-enclosed field as data. So if you feed the escaped string to the CSV reader, it will output the correct values without the escape character.
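
The same behaviour can be seen with Python's standard `csv` module, used here only as a stand-in to show the idea; it is not the parser NiFi uses, and the variable names are illustrative. With the backslash declared as the escape character, the reader takes \" as a literal quote and returns the field without the backslashes.

```python
import csv

# The sample row with the inner quotes backslash-escaped.
escaped_row = (r'"ICUA","01/22/2019","08:48:18",394846,"HAVE YOU REMOVED THE KEY?",'
               r'"YES---select \"Accept Response\" and continue with the remove","","","1"')

# Declare the backslash as the escape character; strict mode makes the
# parser raise instead of guessing if the row were still malformed.
fields = next(csv.reader([escaped_row], escapechar="\\", strict=True))

print(len(fields))
print(fields[5])
```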

 

[Attached screenshot: Screen Shot 2019-12-27 at 9.33.58 AM.png]