Correct implementation of onTrigger to get actual records

Expert Contributor

I am trying to implement the following method for my custom processor:

@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException

Basically I do:

FlowFile original = session.get();
...
...
try (final InputStream in = session.read(original);
     final RecordReader reader = readerFactory.createRecordReader(original, in, getLogger())) {

    Record inputRecord = null;
    output = session.create(original);
    while ((inputRecord = reader.nextRecord()) != null) {
        final WriteResult writeResult;
        ...
        ...

In the unit test, I have this arrangement:

List<RecordField> fields = new ArrayList<>();
RecordField idField = new RecordField("id", RecordFieldType.STRING.getDataType());
fields.add(idField);
.... more fields
RecordSchema inputRecordSchema = new SimpleRecordSchema(fields);
RecordField inputRecord = new RecordField("inputRecord",
RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.RECORD.getRecordDataType(inputRecordSchema)));
parser.addSchemaField(inputRecord);
final Record record1 = new MapRecord(inputRecordSchema, new HashMap<String, Object>(){....});
final Record record2 = new MapRecord(inputRecordSchema, new HashMap<String, Object>() {....});
final Object[] recordArray = new Object[] { record1, record2 };
parser.addRecord((Object) recordArray);

Although I explicitly add inputRecord as a schema field, I expected reader.nextRecord() in the main code to return the inner records one by one: first record1, then record2.

Instead, I get the parent record (named inputRecord). As a result, I have to write explicit code to unpack the record, something like:

Arrays.asList(inputRecord.getValues()).get(0)....

I want to get rid of this.

Any input on improving this would help.
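
One possible reading of the behaviour described above, sketched with stand-in types only: if the mock parser treats each addRecord(...) call as exactly one record whose values line up with the registered schema fields, then a single call carrying an array yields a single parent record. SketchParser and MockArrangementSketch below are hypothetical classes written for illustration, not NiFi classes, and the positional-matching rule is an assumption.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a mock record parser; NOT the NiFi class.
// Assumption: every addRecord(...) call produces exactly one record, and its
// values are matched positionally to the registered schema fields.
class SketchParser {
    private final List<String> fieldNames = new ArrayList<>();
    private final List<Object[]> rows = new ArrayList<>();
    private int cursor = 0;

    void addSchemaField(String name) {
        fieldNames.add(name);
    }

    void addRecord(Object... values) {
        rows.add(values);
    }

    // Returns the next record as a field-name -> value map, or null when exhausted.
    Map<String, Object> nextRecord() {
        if (cursor >= rows.size()) {
            return null;
        }
        Object[] values = rows.get(cursor++);
        Map<String, Object> record = new LinkedHashMap<>();
        for (int i = 0; i < fieldNames.size() && i < values.length; i++) {
            record.put(fieldNames.get(i), values[i]);
        }
        return record;
    }
}

class MockArrangementSketch {
    // Counts how many records a while-loop over nextRecord() would see.
    static int countRecords(SketchParser parser) {
        int count = 0;
        while (parser.nextRecord() != null) {
            count++;
        }
        return count;
    }

    // Arrangement from the question: one array-valued field holding both records.
    static SketchParser wrapped() {
        SketchParser parser = new SketchParser();
        parser.addSchemaField("inputRecord");
        Object[] recordArray = { Map.of("id", "1"), Map.of("id", "2") };
        parser.addRecord((Object) recordArray); // one call, so one parent record
        return parser;
    }

    // Alternative: register the inner fields directly and add one row per record.
    static SketchParser flat() {
        SketchParser parser = new SketchParser();
        parser.addSchemaField("id");
        parser.addRecord("1");
        parser.addRecord("2");
        return parser;
    }

    public static void main(String[] args) {
        System.out.println("wrapped: " + countRecords(wrapped()));
        System.out.println("flat: " + countRecords(flat()));
    }
}
```

Under that assumption, the analogous change in the real test would be to register the fields of inputRecordSchema on the parser and call addRecord once per record, rather than wrapping both records in one array field. Whether this applies depends on the mock parser actually in use.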

2 REPLIES

Super Collaborator

Is this custom processor something you're writing in Java (to be packaged as a NAR), or in Groovy? Ultimately, what is this custom processor trying to achieve?

Expert Contributor

It's in Java. It basically needs to split a record based on some input fields, and also filter records.