@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException
Basically I do:
FlowFile original = session.get();
...
...
try (final InputStream in = session.read(original);
     final RecordReader reader = readerFactory.createRecordReader(original, in, getLogger())) {
    Record inputRecord = null;
    output = session.create(original);
    while ((inputRecord = reader.nextRecord()) != null) {
        final WriteResult writeResult;
        ...
        ...
In the unit test, I have this arrangement:
List<RecordField> fields = new ArrayList<>();
RecordField idField = new RecordField("id", RecordFieldType.STRING.getDataType());
fields.add(idField);
.... more fields
RecordSchema inputRecordSchema = new SimpleRecordSchema(fields);
RecordField inputRecord = new RecordField("inputRecord",
        RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.RECORD.getRecordDataType(inputRecordSchema)));
parser.addSchemaField(inputRecord);
final Record record1 = new MapRecord(inputRecordSchema, new HashMap<String, Object>(){....});
final Record record2 = new MapRecord(inputRecordSchema, new HashMap<String, Object>() {....});
final Object[] recordArray = new Object[] { record1, record2 };
parser.addRecord((Object) recordArray);
Though I explicitly set inputRecord, I expect that reader.nextRecord() in the main code would fetch the inner records one by one: first record1, then record2.
But instead I get the parent record (named inputRecord). As a result, I need to write explicit code to parse the record, something like:
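A minimal sketch of that kind of unwrapping, not the actual processor code: plain Java maps stand in for NiFi's Record here, and the class name UnwrapSketch and the helper unwrap are hypothetical. In NiFi itself, calling getAsArray("inputRecord") on the parent Record would play the role of the map lookup.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class UnwrapSketch {

    // Hypothetical helper: collects the inner records stored in an array field
    // of a parent record. A Map is an assumed stand-in for NiFi's Record.
    static List<Map<String, Object>> unwrap(Map<String, Object> parent, String arrayField) {
        List<Map<String, Object>> inner = new ArrayList<>();
        Object value = parent.get(arrayField);
        // Only an Object[] is treated as an array of records; anything else yields an empty list.
        if (value instanceof Object[]) {
            for (Object element : (Object[]) value) {
                if (element instanceof Map) {
                    @SuppressWarnings("unchecked")
                    Map<String, Object> record = (Map<String, Object>) element;
                    inner.add(record);
                }
            }
        }
        return inner;
    }

    public static void main(String[] args) {
        // Two inner records, mirroring record1 and record2 from the test setup.
        Map<String, Object> record1 = new LinkedHashMap<>();
        record1.put("id", "first");
        Map<String, Object> record2 = new LinkedHashMap<>();
        record2.put("id", "second");

        // The parent record carries both of them in a single array field.
        Map<String, Object> parent = new LinkedHashMap<>();
        parent.put("inputRecord", new Object[] { record1, record2 });

        // Iterate the inner records one by one instead of handling the parent.
        for (Map<String, Object> record : unwrap(parent, "inputRecord")) {
            System.out.println(record.get("id"));
        }
    }
}
```

The helper returns an empty list when the field is missing or is not an array, so the caller's loop simply does nothing in that case.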
Is this custom processor something you're writing in Java to be packaged as a NAR, or in Groovy? Ultimately, what is this custom processor trying to achieve?