Read CSV records, then update the values using a NiFi custom processor

My data arrives as records in CSV format.

Sample data:

a,b,c,d,e
1,2,3,4,5
1,2,3,4,5
1,2,3,4,5

 

 

 

 

I want to read each record, add 3-6 columns based on some conditions, process the data, and then export the result in CSV format, like this:

a,a1,b,b1,c,c1,d,d1,e,e2
1,11,2,22,3,33,4,44,5,55
1,11,2,22,3,33,4,44,5,55
1,11,2,22,3,33,4,44,5,55

 

 

I could use existing processors, but I need to build a new one. I don't know how to read the records and process them. I tried writing some code, but it did not work:

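import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.record.PushBackRecordSet;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.RecordSet;

public class readcsv extends AbstractProcessor {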

    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
            .name("Record Reader")
            .description("Specifies the Controller Service to use for reading incoming data")
            .identifiesControllerService(RecordReaderFactory.class)
            .required(true)
            .build();

    static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
            .name("Record Writer")
            .description("Specifies the Controller Service to use for writing out the records")
            .identifiesControllerService(RecordSetWriterFactory.class)
            .required(true)
            .build();

    static final Relationship REL_SUCCESS = new Relationship.Builder()
            .name("SUCCESS")
            .build();
   

    private List<PropertyDescriptor> descriptors;

    private Set<Relationship> relationships;

    @Override
    protected void init(final ProcessorInitializationContext context) {
        final List<PropertyDescriptor> descriptors = new ArrayList<PropertyDescriptor>();
        descriptors.add(RECORD_READER);
        descriptors.add(RECORD_WRITER);
        this.descriptors = Collections.unmodifiableList(descriptors);

        final Set<Relationship> relationships = new HashSet<Relationship>();
        relationships.add(REL_SUCCESS);
        this.relationships = Collections.unmodifiableSet(relationships);
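    }

    // Expose the relationships and properties built in init() to the framework.
    @Override
    public Set<Relationship> getRelationships() {
        return this.relationships;
    }

    @Override
    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return this.descriptors;
    }
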
    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
        FlowFile flowFile = session.get();
        if ( flowFile == null ) {
            return;
        }
        final Map<String, String> originalAttributes = flowFile.getAttributes();
        final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
        final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);


        session.read(flowFile, new InputStreamCallback() {
            @Override
            public void process(InputStream in) throws IOException {
                try (final RecordReader reader = readerFactory.createRecordReader(originalAttributes, in, flowFile.getSize(), getLogger())) {

                    final RecordSchema schema = writerFactory.getSchema(originalAttributes, reader.getSchema());
                    final RecordSet recordSet = reader.createRecordSet();
                    final PushBackRecordSet pushbackSet = new PushBackRecordSet(recordSet);


                } catch (final SchemaNotFoundException | MalformedRecordException e) {
                    throw new ProcessException("Failed to parse incoming data", e);
                }
            }
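        });

        // The content is only read above, so the unchanged FlowFile is routed onward;
        // every FlowFile taken from the session must be transferred or removed.
        session.transfer(flowFile, REL_SUCCESS);
    }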
}

 

1 REPLY

The first thing I would recommend is to try to avoid writing a custom processor.

 

If you need to add columns to a record, the UpdateRecord processor should let you do this. (At first I expected the QueryRecord processor to do it as well, but I was unable to find any reference to that.)
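
As a rough illustration, an UpdateRecord configuration for the sample data might look like the fragment below. Each user-defined property name is a RecordPath to the field being set, and its value is the replacement. The concat(...) expressions are only a guess at the rule behind the sample (1 becomes 11), and CSVReader / CSVRecordSetWriter stand in for whatever controller services your flow actually uses.

```
Record Reader               = CSVReader
Record Writer               = CSVRecordSetWriter
Replacement Value Strategy  = Record Path Value

# user-defined properties: one per new column
/a1 = concat(/a, /a)
/b1 = concat(/b, /b)
/c1 = concat(/c, /c)
```

Depending on the NiFi version, the writer's schema may need to declare the new fields explicitly for them to appear in the output, and the column order of the output generally follows that schema.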

 

https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-standard-nar/1.5.0/org.apache...

 

https://community.cloudera.com/t5/Support-Questions/Adding-columns-to-sql-in-nifi/td-p/224598 
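
If a custom processor still turns out to be necessary, the per-row logic can be prototyped in plain Java first and moved into onTrigger later. The sketch below is not NiFi code: CsvColumnExpander, expand and derive are hypothetical names, the "repeat the value" rule is only inferred from the sample output, and the comma split is naive (no quoted fields). It also names every new column with a "1" suffix, whereas the sample header ends in e2.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.StringJoiner;

public class CsvColumnExpander {

    // Hypothetical rule inferred from the sample: "1" -> "11".
    static String derive(String value) {
        return value + value;
    }

    // Inserts one derived column after every original column.
    // Assumes the first line is the header and that no field contains a comma.
    static String expand(String csv) {
        String[] lines = csv.trim().split("\\R");
        List<String> out = new ArrayList<>();
        for (int i = 0; i < lines.length; i++) {
            boolean header = (i == 0);
            StringJoiner row = new StringJoiner(",");
            for (String cell : lines[i].split(",", -1)) {
                row.add(cell);
                // Header cells get a "1" suffix; data cells get the derived value.
                row.add(header ? cell + "1" : derive(cell));
            }
            out.add(row.toString());
        }
        return String.join("\n", out);
    }

    public static void main(String[] args) {
        System.out.println(expand("a,b,c,d,e\n1,2,3,4,5\n"));
        // prints:
        // a,a1,b,b1,c,c1,d,d1,e,e1
        // 1,11,2,22,3,33,4,44,5,55
    }
}
```

Inside a real processor the same rule would be applied to each Record obtained from the reader and the result handed to the record writer, instead of splitting strings by hand.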


- Dennis Jaheruddin

If this answer helped, please mark it as 'solved' and/or if it is valuable for future readers please apply 'kudos'.