Created 01-31-2021 09:27 AM
I have data that arrives as records in CSV format.
Sample data:
a,b,c,d,e
1,2,3,4,5
1,2,3,4,5
1,2,3,4,5
I want to read each record, add 3 to 6 columns based on some conditions, process the data, and then export the result in CSV format. Expected output:
a,a1,b,b1,c,c1,d,d1,e,e2
1,11,2,22,3,33,4,44,5,55
1,11,2,22,3,33,4,44,5,55
1,11,2,22,3,33,4,44,5,55
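To make the desired transformation concrete, here is a minimal plain-Java sketch (no NiFi involved) that turns the sample input into the sample output. It rests on several assumptions: the class and method names are hypothetical, the "condition" is a stand-in rule (repeat the value twice, so 1 becomes 11), the input is simple CSV with `\n` line endings and no quoted fields, and every new column is named `<name>1`, so the sample's last header `e2` comes out as `e1`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper class: a plain-Java sketch of the transformation, not NiFi code.
public class CsvColumnAdder {

    // Hypothetical rule standing in for "some conditions": repeat the value twice ("1" -> "11").
    static String derive(String value) {
        return value + value;
    }

    // For every column "x", inserts a new column "x1" directly after it.
    // Assumes simple CSV: "\n" line endings, no quoted fields, no embedded commas.
    static String addColumns(String csv) {
        String[] lines = csv.split("\n");
        StringBuilder out = new StringBuilder();

        // Header row: a,b,... becomes a,a1,b,b1,...
        List<String> header = new ArrayList<>();
        for (String name : lines[0].split(",", -1)) {
            header.add(name);
            header.add(name + "1");
        }
        out.append(String.join(",", header)).append('\n');

        // Data rows: each value is followed by its derived value
        for (int i = 1; i < lines.length; i++) {
            if (lines[i].isEmpty()) {
                continue; // skip blank lines
            }
            List<String> row = new ArrayList<>();
            for (String value : lines[i].split(",", -1)) {
                row.add(value);
                row.add(derive(value));
            }
            out.append(String.join(",", row)).append('\n');
        }
        return out.toString();
    }

    public static void main(String[] args) {
        String input = "a,b,c,d,e\n1,2,3,4,5\n1,2,3,4,5\n1,2,3,4,5\n";
        System.out.print(addColumns(input));
    }
}
```

Interleaving the header and the values in the same loop shape keeps each new column next to its source column, which is the layout shown in the expected output.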
I can use the existing processors, but I need to build a new processor. I don't know how to read the records and process them. I tried to write some code, but it doesn't work:
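import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSchema;

public class ReadCsv extends AbstractProcessor {
    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
            .name("Record Reader")
            .description("Specifies the Controller Service to use for reading incoming data")
            .identifiesControllerService(RecordReaderFactory.class)
            .required(true)
            .build();
    static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
            .name("Record Writer")
            .description("Specifies the Controller Service to use for writing out the records")
            .identifiesControllerService(RecordSetWriterFactory.class)
            .required(true)
            .build();
    static final Relationship REL_SUCCESS = new Relationship.Builder()
            .name("success")
            .description("FlowFiles whose records were processed successfully")
            .build();
    static final Relationship REL_FAILURE = new Relationship.Builder()
            .name("failure")
            .description("FlowFiles that could not be parsed or written")
            .build();
    private List<PropertyDescriptor> descriptors;
    private Set<Relationship> relationships;
    @Override
    protected void init(final ProcessorInitializationContext context) {
        final List<PropertyDescriptor> descriptors = new ArrayList<>();
        descriptors.add(RECORD_READER);
        descriptors.add(RECORD_WRITER);
        this.descriptors = Collections.unmodifiableList(descriptors);
        final Set<Relationship> relationships = new HashSet<>();
        relationships.add(REL_SUCCESS);
        relationships.add(REL_FAILURE);
        this.relationships = Collections.unmodifiableSet(relationships);
    }
    // Without these two overrides NiFi shows no properties and no relationships
    @Override
    public Set<Relationship> getRelationships() { return relationships; }
    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { return descriptors; }
    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
        FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;
        }
        final Map<String, String> originalAttributes = flowFile.getAttributes();
        final long inputSize = flowFile.getSize();
        final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
        final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
        try {
            // session.write (not session.read) reads the content and replaces it in a single pass
            flowFile = session.write(flowFile, (in, out) -> {
                try (final RecordReader reader = readerFactory.createRecordReader(originalAttributes, in, inputSize, getLogger())) {
                    // Any fields you add must also be in this schema, otherwise the writer may not output them
                    final RecordSchema schema = writerFactory.getSchema(originalAttributes, reader.getSchema());
                    try (final RecordSetWriter writer = writerFactory.createWriter(getLogger(), schema, out, originalAttributes)) {
                        writer.beginRecordSet();
                        Record record;
                        while ((record = reader.nextRecord()) != null) {
                            // Add or modify fields of 'record' here before writing it
                            writer.write(record);
                        }
                        writer.finishRecordSet();
                    }
                } catch (final SchemaNotFoundException | MalformedRecordException e) {
                    throw new ProcessException("Failed to parse incoming data", e);
                }
            });
            // Every FlowFile taken from the session must be transferred (or removed) before the session commits
            session.transfer(flowFile, REL_SUCCESS);
        } catch (final ProcessException e) {
            getLogger().error("Failed to process records", e);
            session.transfer(flowFile, REL_FAILURE);
        }
    }
}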
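The per-record step (what runs for each `Record` returned by `RecordReader.nextRecord()`) can be sketched without NiFi by treating a record as an ordered field-name-to-value map. This is a hypothetical illustration: the class and method names are made up, and the condition (numeric values get `value * 11`, anything else gets an empty column) is only a placeholder for the real rules. In NiFi's record API one would then, as an assumption about the approach rather than a tested implementation, build the writer schema from the extended field names (for example a `SimpleRecordSchema` made of `RecordField`s) and wrap each extended map in a `MapRecord` before writing it, because the writer generally emits only the fields in its schema.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper class: models one record as an ordered name -> value map (plain Java, no NiFi).
public class RecordFieldAdder {

    // Hypothetical condition + derivation: numeric values become value * 11, others get null.
    static Object derive(Object value) {
        if (value == null) {
            return null;
        }
        try {
            return Long.parseLong(value.toString().trim()) * 11;
        } catch (NumberFormatException e) {
            return null; // condition not met: leave the new column empty
        }
    }

    // Output field order a, a1, b, b1, ...; a writer schema would need these names in this order.
    static List<String> extendedFieldNames(List<String> names) {
        List<String> result = new ArrayList<>();
        for (String name : names) {
            result.add(name);
            result.add(name + "1");
        }
        return result;
    }

    // Builds the values of one output record, inserting each derived field after its source field.
    static Map<String, Object> extendRecord(Map<String, Object> record) {
        Map<String, Object> out = new LinkedHashMap<>();
        for (Map.Entry<String, Object> field : record.entrySet()) {
            out.put(field.getKey(), field.getValue());
            out.put(field.getKey() + "1", derive(field.getValue()));
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, Object> record = new LinkedHashMap<>();
        record.put("a", "1");
        record.put("b", "2");
        System.out.println(extendedFieldNames(new ArrayList<>(record.keySet())));
        System.out.println(extendRecord(record));
    }
}
```

A `LinkedHashMap` is used so the new fields keep the interleaved order from the expected output; a plain `HashMap` would not preserve it.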
Created 02-01-2021 06:08 AM
The first thing I would recommend is trying to avoid a custom processor.
If you need to add columns to a record, the UpdateRecord processor should be able to do this. (At first I expected the QueryRecord processor to do it as well, but I was unable to find any references to it.)
https://community.cloudera.com/t5/Support-Questions/Adding-columns-to-sql-in-nifi/td-p/224598