Created on 10-07-2023 04:01 AM - edited 10-07-2023 04:03 AM
I am trying to implement the following method for my custom processor:
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException
Basically I do:
FlowFile original = session.get();
...
...
try (final InputStream in = session.read(original);
     final RecordReader reader = readerFactory.createRecordReader(original, in, getLogger())) {
    Record inputRecord = null;
    output = session.create(original);
    while ((inputRecord = reader.nextRecord()) != null) {
        final WriteResult writeResult;
...
...
In the unit test, I have this setup:
List<RecordField> fields = new ArrayList<>();
RecordField idField = new RecordField("id", RecordFieldType.STRING.getDataType());
fields.add(idField);
.... more fields
RecordSchema inputRecordSchema = new SimpleRecordSchema(fields);
RecordField inputRecord = new RecordField("inputRecord",
    RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.RECORD.getRecordDataType(inputRecordSchema)));
parser.addSchemaField(inputRecord);
final Record record1 = new MapRecord(inputRecordSchema, new HashMap<String, Object>(){....});
final Record record2 = new MapRecord(inputRecordSchema, new HashMap<String, Object>() {....});
final Object[] recordArray = new Object[] { record1, record2 };
parser.addRecord((Object) recordArray);
Although I explicitly wrap the records in the inputRecord field, I expected reader.nextRecord() in the main code to return the inner records one by one: first record1, then record2.
Instead, I get the parent record (the one holding the inputRecord field). As a result, I need to write explicit code to unpack it, something like:
Arrays.asList(inputRecord.getValues()).get(0)....
I want to get rid of this.
Any suggestions for improving this would help.
Created 10-08-2023 10:09 AM
Are you writing this custom processor in Java, to be packaged as a NAR, or in Groovy? Ultimately, what is the processor trying to achieve?
Created 10-08-2023 10:59 PM
It's in Java. It basically needs to split a record based on some input fields, and then also filter records.
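For illustration, here is a minimal sketch of the unwrap-then-filter step described in this thread. It is not NiFi code: plain Map objects stand in for Record so that it runs without NiFi on the classpath, and the class name NestedRecordSketch, the unwrap helper, and the filter predicate are all hypothetical. In the real processor the same logic would go through whatever accessors you use on the NiFi Record.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Stand-in sketch: a Map<String, Object> plays the role of a record here.
// None of these names come from NiFi; they are illustrative only.
final class NestedRecordSketch {

    // Returns the child records stored as an Object[] under arrayField,
    // keeping only the children accepted by the filter.
    static List<Map<String, Object>> unwrap(Map<String, Object> parent,
                                            String arrayField,
                                            Predicate<Map<String, Object>> keep) {
        List<Map<String, Object>> children = new ArrayList<>();
        Object value = parent.get(arrayField);
        if (!(value instanceof Object[])) {
            return children; // field missing or not an array: nothing to unwrap
        }
        for (Object element : (Object[]) value) {
            if (element instanceof Map) {
                @SuppressWarnings("unchecked")
                Map<String, Object> child = (Map<String, Object>) element;
                if (keep.test(child)) {
                    children.add(child);
                }
            }
        }
        return children;
    }

    public static void main(String[] args) {
        Map<String, Object> record1 = new HashMap<>();
        record1.put("id", "1");
        Map<String, Object> record2 = new HashMap<>();
        record2.put("id", "2");

        // Mirrors the test setup: one parent whose "inputRecord" field
        // holds an array of child records.
        Map<String, Object> parent = new HashMap<>();
        parent.put("inputRecord", new Object[] { record1, record2 });

        // Visit each child in order, skipping children without an id.
        for (Map<String, Object> child : unwrap(parent, "inputRecord", r -> r.get("id") != null)) {
            System.out.println(child.get("id"));
        }
    }
}
```

Keeping the unwrapping in one helper means the main read loop only ever sees individual child records, and the filter condition can be changed without touching the loop.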