I have data that arrives as records in CSV format.
Sample data:
a,b,c,d,e
1,2,3,4,5
1,2,3,4,5
1,2,3,4,5
I want to read each record, add 3–6 columns based on some conditions, process the data, and then export the result in CSV format. Expected output:
a,a1,b,b1,c,c1,d,d1,e,e2
1,11,2,22,3,33,4,44,5,55
1,11,2,22,3,33,4,44,5,55
1,11,2,22,3,33,4,44,5,55
I could use the existing processors, but I need to build a custom Apache NiFi processor. I don't know how to read the records and process them; my attempt so far is below, but it does not work.
public class readcsv extends AbstractProcessor {
static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
.name("Record Reader")
.description("Specifies the Controller Service to use for reading incoming data")
.identifiesControllerService(RecordReaderFactory.class)
.required(true)
.build();
static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
.name("Record Writer")
.description("Specifies the Controller Service to use for writing out the records")
.identifiesControllerService(RecordSetWriterFactory.class)
.required(true)
.build();
static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("SUCCESS")
.build();
private List<PropertyDescriptor> descriptors;
private Set<Relationship> relationships;
@Override
protected void init(final ProcessorInitializationContext context) {
final List<PropertyDescriptor> descriptors = new ArrayList<PropertyDescriptor>();
descriptors.add(RECORD_READER);
descriptors.add(RECORD_WRITER);
this.descriptors = Collections.unmodifiableList(descriptors);
final Set<Relationship> relationships = new HashSet<Relationship>();
relationships.add(REL_SUCCESS);
this.relationships = Collections.unmodifiableSet(relationships);
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
FlowFile flowFile = session.get();
if ( flowFile == null ) {
return;
}
final Map<String, String> originalAttributes = flowFile.getAttributes();
final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
session.read(flowFile, new InputStreamCallback() {
@Override
public void process(InputStream in) throws IOException {
try (final RecordReader reader = readerFactory.createRecordReader(originalAttributes, in, flowFile.getSize(), getLogger())) {
final RecordSchema schema = writerFactory.getSchema(originalAttributes, reader.getSchema());
final RecordSet recordSet = reader.createRecordSet();
final PushBackRecordSet pushbackSet = new PushBackRecordSet(recordSet);
} catch (final SchemaNotFoundException | MalformedRecordException e) {
throw new ProcessException("Failed to parse incoming data", e);
}
}
});
}
}