Member since: 07-27-2023
Posts: 32
Kudos Received: 1
Solutions: 1
My Accepted Solutions
Title | Views | Posted
---|---|---
 | 358 | 09-14-2023 04:41 AM
10-12-2023 04:38 AM
Here is my unit test code:

```java
runner.setProperty(....);
RecordSchema inputRecordSchema = .... // create schema based on RecordFields
final Object[] recordArray = recordArray(inputRecordSchema); // create records (instances of MapRecord)
parser.addRecord((Object) recordArray);
....

Object[] recordArray(RecordSchema schema) {
    Record record = new MapRecord(schema, new HashMap<String, Object>() {
        {
            put(...);
            ...
        }
    });
    .....
}
```

In the main code, I iterate over the records from the reader:

```java
while ((record = reader.nextRecord()) != null) {
    // process record
}
```

But I see that the record, though a MapRecord instance, is empty. It seems I am missing a step in the unit test. How can I sort this out?
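For reference, a minimal sketch of wiring a MockRecordParser so that the processor's reader sees populated records, assuming the test uses MockRecordParser as the record reader; the processor class MyProcessor, the property name "record-reader", and the field names here are illustrative assumptions, not from the post:

```java
import org.apache.nifi.serialization.record.MockRecordParser;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;

// Inside a JUnit test method that declares `throws Exception`.
final TestRunner runner = TestRunners.newTestRunner(MyProcessor.class); // MyProcessor is hypothetical
final MockRecordParser parser = new MockRecordParser();
runner.addControllerService("reader", parser);
runner.enableControllerService(parser);
runner.setProperty("record-reader", "reader"); // property name is an assumption

// Declare the fields the mock reader exposes ...
parser.addSchemaField("id", RecordFieldType.STRING);
parser.addSchemaField("name", RecordFieldType.STRING);

// ... then pass one value per declared field, in the same order.
// A mismatch between addSchemaField(...) and the values handed to
// addRecord(...) is a common reason records come back as empty MapRecords.
parser.addRecord("1", "alice");
parser.addRecord("2", "bob");

runner.enqueue(new byte[0]); // content is ignored; the mock parser supplies the records
runner.run();
```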
Labels:
- Apache NiFi
10-08-2023 10:59 PM
It's in Java. It basically needs to split a record based on some input fields, and also filter records.
10-08-2023 10:57 PM
I need some input on relationship definitions in a custom processor:

1. Do I need to define all three relationships (success, failure, original) in the processor class?
2. Do I need to handle all three relationships (using session.transfer(....))?
3. Why do I need to transfer the original flowfile when it has already been processed?
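For context, a minimal sketch of the conventional pattern, under the assumption that the processor reads one FlowFile and writes a new one: every relationship the processor may route to must be declared and returned from getRelationships(), and every FlowFile the session touches must be transferred or removed before the session commits — which is why the original also needs a transfer (or an explicit session.remove). The class name and descriptions are illustrative:

```java
import java.util.Set;

import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;

public class SplitAndFilterRecords extends AbstractProcessor { // hypothetical name

    static final Relationship REL_SUCCESS = new Relationship.Builder()
            .name("success")
            .description("Newly written records")
            .build();
    static final Relationship REL_FAILURE = new Relationship.Builder()
            .name("failure")
            .description("FlowFiles that could not be processed")
            .build();
    static final Relationship REL_ORIGINAL = new Relationship.Builder()
            .name("original")
            .description("The unmodified incoming FlowFile")
            .build();

    @Override
    public Set<Relationship> getRelationships() {
        return Set.of(REL_SUCCESS, REL_FAILURE, REL_ORIGINAL);
    }

    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
        final FlowFile original = session.get();
        if (original == null) {
            return;
        }
        final FlowFile output = session.create(original);
        try {
            // ... read records from `original`, write results to `output` ...
            session.transfer(output, REL_SUCCESS);
            // Every FlowFile obtained from or created by the session must be
            // transferred or removed; hence the original goes to "original"
            // (or session.remove(original) if it is not needed downstream).
            session.transfer(original, REL_ORIGINAL);
        } catch (final Exception e) {
            session.remove(output);
            session.transfer(original, REL_FAILURE);
        }
    }
}
```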
Labels:
- Apache NiFi
10-07-2023 04:01 AM
I am trying to implement this for my custom processor:

```java
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException
```

Basically I do:

```java
FlowFile original = session.get();
...
try (final InputStream in = session.read(original);
     final RecordReader reader = readerFactory.createRecordReader(original, in, getLogger())) {
    Record inputRecord = null;
    output = session.create(original);
    while ((inputRecord = reader.nextRecord()) != null) {
        final WriteResult writeResult;
        ...
    }
}
```

In the unit test, I have this arrangement:

```java
List<RecordField> fields = new ArrayList<>();
RecordField idField = new RecordField("id", RecordFieldType.STRING.getDataType());
fields.add(idField);
.... // more fields
RecordSchema inputRecordSchema = new SimpleRecordSchema(fields);
RecordField inputRecord = new RecordField("inputRecord",
        RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.RECORD.getRecordDataType(inputRecordSchema)));
parser.addSchemaField(inputRecord);
final Record record1 = new MapRecord(inputRecordSchema, new HashMap<String, Object>() {....});
final Record record2 = new MapRecord(inputRecordSchema, new HashMap<String, Object>() {....});
final Object[] recordArray = new Object[] { record1, record2 };
parser.addRecord((Object) recordArray);
```

Though I explicitly set inputRecord, I expected reader.nextRecord() in the main code to fetch the inner records one by one: first record1, then record2. Instead I get the parent record (named inputRecord). As a result, I have to write explicit code to unwrap it, something like:

```java
Arrays.asList(inputRecord.getValues()).get(0)....
```

I want to get rid of this. Any input on improving this would help.
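One observation, for what it's worth: since the test schema declares a single top-level array field, the reader returning one parent record is the expected behavior. A sketch of unwrapping it with Record.getAsArray (the field name "inputRecord" is taken from the test setup above); alternatively, declaring the fields of inputRecordSchema directly on the parser and calling addRecord once per record would make nextRecord() yield record1 and record2 one at a time:

```java
import org.apache.nifi.serialization.record.Record;

Record parent;
while ((parent = reader.nextRecord()) != null) {
    // "inputRecord" is the array field declared in the test schema
    final Object[] inner = parent.getAsArray("inputRecord");
    if (inner == null) {
        continue; // no array value on this record
    }
    for (final Object element : inner) {
        final Record innerRecord = (Record) element;
        // process record1, record2, ... here
    }
}
```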
Labels:
- Apache NiFi
10-06-2023 06:49 AM
My attempt so far:

```java
List<RecordField> fields = new ArrayList<>();
RecordField field1 = new RecordField("field1", RecordFieldType.STRING.getDataType());
fields.add(field1);
....

SimpleRecordSchema schema = new SimpleRecordSchema(fields);
final Record mapRecord = new MapRecord(schema, new HashMap<>());

final List<FieldValue> selectedFields = new ArrayList<>();
// TODO how to create corresponding FieldValue instances from raw values
for (final FieldValue selectedField : selectedFields) {
    mapRecord.setValue(selectedField.getField(), selectedField.getValue());
}
```
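A possible simplification, assuming the aim is only to populate the new record with computed raw values: MapRecord accepts a plain Map of field name to value, and Record.setValue(String, Object) also takes raw values, so FieldValue wrappers may not be needed at all. computeField1(...) is a hypothetical stand-in for the computation:

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.Record;

final Map<String, Object> values = new HashMap<>();
values.put("field1", computeField1(inputRecord)); // computeField1 is hypothetical
final Record mapRecord = new MapRecord(schema, values);

// Alternatively, set a value after construction, by field name:
mapRecord.setValue("field1", computeField1(inputRecord));
```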
10-06-2023 05:47 AM
So my requirement is that I have to take some selected fields from the input record, do some computations with them, create a new Record whose new fields are populated with these computed values, and write this new record into the output flowfile. I have to do all of this in a custom processor I am writing.
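A hedged sketch of that flow inside onTrigger, assuming readerFactory, writerFactory, outputSchema, and the relationships are already in place; compute(...) and the field name "total" are illustrative assumptions, not from the original post:

```java
FlowFile output = session.create(original);
try {
    final WriteResult writeResult;
    try (final InputStream in = session.read(original);
         final RecordReader reader = readerFactory.createRecordReader(original, in, getLogger());
         final OutputStream out = session.write(output);
         final RecordSetWriter writer = writerFactory.createWriter(getLogger(), outputSchema, out, output)) {

        writer.beginRecordSet();
        Record inputRecord;
        while ((inputRecord = reader.nextRecord()) != null) {
            // Build the new record from values computed off the selected fields
            final Map<String, Object> computed = new HashMap<>();
            computed.put("total", compute(inputRecord)); // compute(...) is hypothetical
            writer.write(new MapRecord(outputSchema, computed));
        }
        writeResult = writer.finishRecordSet();
    }
    // Attribute updates must happen after the output stream is closed
    output = session.putAttribute(output, "record.count", String.valueOf(writeResult.getRecordCount()));
    session.transfer(output, REL_SUCCESS);
    session.transfer(original, REL_ORIGINAL);
} catch (final Exception e) {
    session.remove(output);
    session.transfer(original, REL_FAILURE);
}
```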
10-06-2023 04:42 AM
I have a usecase which involves creating a new Record (actually several records) from a record in the input flowfile and writing them into the output flowfile. PS: I would use an Avro reader to read the input and an Avro writer to write the output. What is the idiomatic way of doing this in NiFi?
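The common idiom, sketched under the assumption of the standard record-processor pattern: expose Record Reader / Record Writer controller-service properties and configure an AvroReader and an AvroRecordSetWriter on them, so the processor code itself stays format-agnostic. Property names here are illustrative:

```java
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriterFactory;

static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
        .name("record-reader")
        .displayName("Record Reader")
        .description("Controller service for reading the input (e.g. an AvroReader)")
        .identifiesControllerService(RecordReaderFactory.class)
        .required(true)
        .build();

static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
        .name("record-writer")
        .displayName("Record Writer")
        .description("Controller service for writing the output (e.g. an AvroRecordSetWriter)")
        .identifiesControllerService(RecordSetWriterFactory.class)
        .required(true)
        .build();

// In onTrigger:
// RecordReaderFactory readerFactory =
//         context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
// RecordSetWriterFactory writerFactory =
//         context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
```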
Labels:
- Apache NiFi
09-27-2023 07:57 PM
Got it. It's on the Data Provenance dialog box.
09-27-2023 07:52 PM
Where is this graph available in the UI? And does it get updated after every run?
09-27-2023 02:36 AM
I have a NiFi flow. I observe that if the input file is split into smaller files that are fed into the flow one by one, the overall time taken (the sum of the times for the individual files) is considerably lower than when I feed in a single big file. What could cause this performance difference?

Note: the flow has many processors that use Avro readers/writers. I calculate the time in a LogMessage processor using:

```
${now():toNumber():minus(${lineageStartDate}):format("HH:mm:ss", "GMT")}
```
Labels:
- Apache NiFi