Read CSV records, then update the values using a NiFi custom processor

I have data that arrives as records in CSV format.

Sample data:

a,b,c,d,e
1,2,3,4,5
1,2,3,4,5
1,2,3,4,5

I want to read each record, add 3 to 6 columns based on some conditions, process the data, and then export the result in CSV format.

Desired output:

a,a1,b,b1,c,c1,d,d1,e,e2
1,11,2,22,3,33,4,44,5,55
1,11,2,22,3,33,4,44,5,55
1,11,2,22,3,33,4,44,5,55

I can use the standard processors, but I need to build a new one. I don't know how to read the records and process them. I tried to write some code, but it does not work.

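import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.record.PushBackRecordSet;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.RecordSet;

public class readcsv extends AbstractProcessor {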

    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
            .name("Record Reader")
            .description("Specifies the Controller Service to use for reading incoming data")
            .identifiesControllerService(RecordReaderFactory.class)
            .required(true)
            .build();

    static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
            .name("Record Writer")
            .description("Specifies the Controller Service to use for writing out the records")
            .identifiesControllerService(RecordSetWriterFactory.class)
            .required(true)
            .build();

    static final Relationship REL_SUCCESS = new Relationship.Builder()
            .name("SUCCESS")
            .build();
   

    private List<PropertyDescriptor> descriptors;

    private Set<Relationship> relationships;

    @Override
    protected void init(final ProcessorInitializationContext context) {
        final List<PropertyDescriptor> descriptors = new ArrayList<PropertyDescriptor>();
        descriptors.add(RECORD_READER);
        descriptors.add(RECORD_WRITER);
        this.descriptors = Collections.unmodifiableList(descriptors);

        final Set<Relationship> relationships = new HashSet<Relationship>();
        relationships.add(REL_SUCCESS);
        this.relationships = Collections.unmodifiableSet(relationships);
    }

    // Expose the relationships and properties built in init(); without these
    // overrides the framework does not see them.
    @Override
    public Set<Relationship> getRelationships() {
        return relationships;
    }

    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return descriptors;
    }



    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
        FlowFile flowFile = session.get();
        if ( flowFile == null ) {
            return;
        }
        final Map<String, String> originalAttributes = flowFile.getAttributes();
        final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
        final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);


        session.read(flowFile, new InputStreamCallback() {
            @Override
            public void process(InputStream in) throws IOException {
                try (final RecordReader reader = readerFactory.createRecordReader(originalAttributes, in, flowFile.getSize(), getLogger())) {

                    final RecordSchema schema = writerFactory.getSchema(originalAttributes, reader.getSchema());
                    final RecordSet recordSet = reader.createRecordSet();
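                    final PushBackRecordSet pushbackSet = new PushBackRecordSet(recordSet);
                    // The records are only read at this point: nothing modifies them
                    // or hands them to a writer created from writerFactory yet.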

                } catch (final SchemaNotFoundException | MalformedRecordException e) {
                    throw new ProcessException("Failed to parse incoming data", e);
                }
            }
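        });

        // Every FlowFile taken from the session must be transferred or removed
        // before the session is committed.
        session.transfer(flowFile, REL_SUCCESS);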
    }
}

 

1 Reply

The first thing I would recommend is to try to avoid a custom processor.

If you need to add columns to a record, the UpdateRecord processor should enable this. (At first I expected the QueryRecord processor to do it as well, but I am unable to find any references to this.)

https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-standard-nar/1.5.0/org.apache...

 

https://community.cloudera.com/t5/Support-Questions/Adding-columns-to-sql-in-nifi/td-p/224598 
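
If a custom processor does turn out to be necessary, the per-row logic itself is small. The following is a minimal sketch in plain Java (no NiFi classes) of the transformation shown in the question: every column is followed by a derived column. The class name CsvColumnAdder, the helper names, and the derivation rule (repeat the value, so 1 becomes 11) are assumptions made for illustration, since the actual conditions are not stated. The derived header is assumed to be the original name plus "1", although the question's sample shows e2 for the last column. The naive comma split does not handle quoted fields.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.StringJoiner;

public class CsvColumnAdder {

    // Hypothetical rule: the derived value repeats the original ("1" -> "11").
    // Replace this with the real conditions.
    static String derive(String value) {
        return value + value;
    }

    // Expands the header line: each column name x is followed by x1.
    static String expandHeader(String headerLine) {
        StringJoiner out = new StringJoiner(",");
        for (String name : headerLine.split(",", -1)) {
            out.add(name).add(name + "1");
        }
        return out.toString();
    }

    // Expands one data line: each value is followed by its derived value.
    static String expandRow(String rowLine) {
        StringJoiner out = new StringJoiner(",");
        for (String value : rowLine.split(",", -1)) {
            out.add(value).add(derive(value));
        }
        return out.toString();
    }

    // Treats the first line as the header and the remaining lines as data rows.
    static List<String> transform(List<String> lines) {
        List<String> result = new ArrayList<>();
        if (lines.isEmpty()) {
            return result;
        }
        result.add(expandHeader(lines.get(0)));
        for (String row : lines.subList(1, lines.size())) {
            result.add(expandRow(row));
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList("a,b,c,d,e", "1,2,3,4,5");
        // Prints:
        // a,a1,b,b1,c,c1,d,d1,e,e1
        // 1,11,2,22,3,33,4,44,5,55
        transform(input).forEach(System.out::println);
    }
}
```

Inside a record-based processor the same rule would be applied per record instead of per text line, with the configured reader and writer services handling the CSV parsing and formatting.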


- Dennis Jaheruddin

If this answer helped, please mark it as 'solved' and/or if it is valuable for future readers please apply 'kudos'.