Member since
07-27-2023
55
Posts
19
Kudos Received
1
Solution
My Accepted Solutions
Title | Views | Posted |
---|---|---|
1386 | 09-14-2023 04:41 AM |
03-03-2024
09:39 PM
The NiFi docs state: "While the contents and attributes of a FlowFile can change, the FlowFile object is immutable." How can a FlowFile be immutable if its contents and attributes can change?
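One way to read the quoted sentence: in NiFi's API, `ProcessSession.putAttribute(...)` returns a FlowFile, and the processor continues with the returned reference rather than the one it passed in. The sketch below models that pattern in plain Java. `ToyFlowFile` and `withAttribute` are hypothetical names made up for illustration; this is not NiFi's implementation.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Toy stand-in for illustration only; this is NOT NiFi's FlowFile class.
final class ToyFlowFile {
    private final Map<String, String> attributes;

    ToyFlowFile(Map<String, String> attributes) {
        // Defensive copy, wrapped as unmodifiable: state cannot change after construction.
        this.attributes = Collections.unmodifiableMap(new HashMap<>(attributes));
    }

    Map<String, String> getAttributes() {
        return attributes;
    }

    // "Changing" an attribute builds a new object and leaves this one untouched.
    ToyFlowFile withAttribute(String key, String value) {
        Map<String, String> copy = new HashMap<>(attributes);
        copy.put(key, value);
        return new ToyFlowFile(copy);
    }
}

class ImmutabilityDemo {
    public static void main(String[] args) {
        ToyFlowFile original = new ToyFlowFile(new HashMap<>());
        ToyFlowFile updated = original.withAttribute("filename", "a.txt");

        System.out.println(original.getAttributes().containsKey("filename")); // false
        System.out.println(updated.getAttributes().get("filename"));          // a.txt
    }
}
```

In this model the logical FlowFile "changes" because the caller swaps to the new reference, while each individual object stays fixed once created.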
Labels: Apache NiFi
01-28-2024
11:36 PM
I have set the following:
nifi.flowfile.repository.implementation: org.apache.nifi.controller.repository.VolatileFlowFileRepository
nifi.provenance.repository.implementation: org.apache.nifi.provenance.VolatileProvenanceRepository
With this, I expected disk I/O to go down (checked with nmon). But it is just the opposite: disk writes have gone up several times over. Why?
Labels: Apache NiFi
01-11-2024
11:32 PM
Is this the correct way to expose a NiFi property as an environment variable (in start.sh)?
prop_replace 'nifi.web.http.port' "${NIFI_WEB_HTTP_PORT:-8080}"
I expect it to expose nifi.web.http.port as NIFI_WEB_HTTP_PORT and to fall back to a default value of 8080.
Labels: Apache NiFi
10-12-2023
04:38 AM
Here is my unit test code:
runner.setProperty(....);
RecordSchema inputRecordSchema = .... // create schema based on RecordField
final Object[] recordArray = recordArray(inputRecordSchema); // create records (instances of MapRecord)
parser.addRecord((Object) recordArray);
....
Object[] recordArray(RecordSchema schema) {
    Record record = new MapRecord(schema, new HashMap<String, Object>() { { put(...) ... ... } } .....
}
In the main code, I iterate over the records from the reader:
while ((record = reader.nextRecord()) != null) {
    // process record
}
But I see that the record, though a MapRecord instance, is empty. It seems I am missing some step in the unit test. How can I sort this out?
Labels: Apache NiFi
10-08-2023
10:59 PM
It's in Java. It basically needs to split a record based on some input fields, and then also filter the records.
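A minimal sketch of the split-then-filter step in plain Java, with a `Map` standing in for a record. The field names (`id`, `items`, `item`) and the filter rule (drop blank items) are assumptions made up for illustration; the post does not say which fields or conditions are involved.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class SplitAndFilterSketch {
    // Splits one input "record" into one output per element of the hypothetical
    // "items" field, skipping elements that are null or blank.
    static List<Map<String, Object>> splitAndFilter(Map<String, Object> input) {
        List<Map<String, Object>> outputs = new ArrayList<>();
        Object id = input.get("id");
        for (Object item : (List<?>) input.get("items")) {
            if (item == null || item.toString().trim().isEmpty()) {
                continue; // filter step: drop blank items
            }
            Map<String, Object> child = new LinkedHashMap<>();
            child.put("id", id);     // carry the parent key into each child
            child.put("item", item); // one child per surviving element
            outputs.add(child);
        }
        return outputs;
    }

    public static void main(String[] args) {
        Map<String, Object> input = new LinkedHashMap<>();
        input.put("id", "42");
        input.put("items", Arrays.asList("a", "", "b"));
        System.out.println(splitAndFilter(input)); // [{id=42, item=a}, {id=42, item=b}]
    }
}
```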
10-08-2023
10:57 PM
I need some input on relationship definitions in custom processor code:
1. Do I need to define all three relationships (success, failure, original) in the processor class?
2. Do I need to handle all three relationships (using session.transfer(....))?
3. Why do I need to transfer the original FlowFile when it has already been processed?
Labels: Apache NiFi
10-07-2023
04:01 AM
I am trying to implement this for my custom processor:
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException
Basically I do:
FlowFile original = session.get();
...
...
try (final InputStream in = session.read(original);
     final RecordReader reader = readerFactory.createRecordReader(original, in, getLogger())) {
    Record inputRecord = null;
    output = session.create(original);
    while ((inputRecord = reader.nextRecord()) != null) {
        final WriteResult writeResult;
        ...
        ...
In the unit test, I have this arrangement:
List<RecordField> fields = new ArrayList<>();
RecordField idField = new RecordField("id", RecordFieldType.STRING.getDataType());
fields.add(idField);
.... more fields
RecordSchema inputRecordSchema = new SimpleRecordSchema(fields);
RecordField inputRecord = new RecordField("inputRecord", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.RECORD.getRecordDataType(inputRecordSchema)));
parser.addSchemaField(inputRecord);
final Record record1 = new MapRecord(inputRecordSchema, new HashMap<String, Object>() {....});
final Record record2 = new MapRecord(inputRecordSchema, new HashMap<String, Object>() {....});
final Object[] recordArray = new Object[] { record1, record2 };
parser.addRecord((Object) recordArray);
Although I explicitly set inputRecord, I expected reader.nextRecord() in the main code to return the inner records one by one: first record1, then record2. Instead I get the parent record (named inputRecord). As a result, I need to write explicit code to unpack the record, something like:
Arrays.asList(inputRecord.getValues()).get(0)....
I want to get rid of this. Any input on improving this would help.
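The `(Object)` cast in `parser.addRecord((Object) recordArray)` matters if `addRecord` is a varargs method that takes one value per schema field, which is an assumption here and not something the post confirms. The sketch below uses a hypothetical `countValues(Object... values)` method in plain Java to show what the cast does to a varargs call in general.

```java
class VarargsCastDemo {
    // Hypothetical stand-in for a method shaped like addRecord(Object... values).
    static int countValues(Object... values) {
        return values.length;
    }

    public static void main(String[] args) {
        Object[] recordArray = new Object[] { "record1", "record2" };

        // With the cast, the whole array is passed as ONE argument.
        System.out.println(countValues((Object) recordArray)); // 1

        // Without the cast, the array itself becomes the varargs array.
        System.out.println(countValues(recordArray)); // 2
    }
}
```

If that assumption holds, the test as written adds a single record whose only field value is the array of inner records, which would be consistent with getting the parent record back from `reader.nextRecord()`.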
Labels: Apache NiFi
10-06-2023
06:49 AM
My attempt so far:
List<RecordField> fields = new ArrayList<>();
RecordField field1 = new RecordField("field1", RecordFieldType.STRING.getDataType());
fields.add(field1);
....
....
SimpleRecordSchema schema = new SimpleRecordSchema(fields);
final Record mapRecord = new MapRecord(schema, new HashMap<>());
final List<FieldValue> selectedFields = new ArrayList<>();
// TODO how to create corresponding FieldValue instances from raw values
for (final FieldValue selectedField : selectedFields) {
    mapRecord.setValue(selectedField.getField(), selectedField.getValue());
}
10-06-2023
05:47 AM
So my requirement is that I have to take some selected fields from the input record, do some computations with them, create a new Record with new fields populated with the computed values, and write this new record to the output FlowFile. I have to do all of this in a custom processor I am writing.
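The computation step described above can be sketched in plain Java, with a `Map<String, Object>` standing in for the record's values. All field names (`firstName`, `lastName`, `price`, `quantity`, `fullName`, `total`) and the computations are hypothetical. In a processor, a map like the one returned here could presumably be passed to `new MapRecord(schema, values)`, the constructor used elsewhere in these posts, together with a schema that declares the new fields.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class DerivedRecordSketch {
    // Reads selected input fields and returns the values of a new "record"
    // that holds only the computed fields.
    static Map<String, Object> derive(Map<String, Object> input) {
        String fullName = input.get("firstName") + " " + input.get("lastName");
        double total = ((Number) input.get("price")).doubleValue()
                * ((Number) input.get("quantity")).intValue();

        Map<String, Object> output = new LinkedHashMap<>();
        output.put("fullName", fullName);
        output.put("total", total);
        return output;
    }

    public static void main(String[] args) {
        Map<String, Object> input = new LinkedHashMap<>();
        input.put("firstName", "Ada");
        input.put("lastName", "Lovelace");
        input.put("price", 2.5);
        input.put("quantity", 4);
        input.put("ignored", "not selected");

        System.out.println(derive(input)); // {fullName=Ada Lovelace, total=10.0}
    }
}
```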
10-06-2023
04:42 AM
I have a use case that involves creating a new Record (actually several records) from a record in the input FlowFile and writing them to the output FlowFile. PS: I would use an Avro reader to read the input and an Avro writer to write the output. What is the idiomatic way of doing this in NiFi?
Labels: Apache NiFi