Created 04-05-2021 03:48 AM
Hi,
We need help with respect to order of records in Nifi.
We have couple of custom processors which work in batch.
There is processor which reads the data from source (Processor A) & another processor which writes the data to SFTP (processor B).
Processor A outputs the data in form of multiple flow files (100 records from source are written in one flow file). Assume if there are 600 records in source, we get 6 FFs out of processor A.
Process A & Processor B are directly connected to each other & entire execution happens in Single node only.
However, we are finding that one some occasions the Process B is processing flow files out of order. For example, third flow gets processed first & then the second flow file.
Understood the flow fill prioritizer needs to be chosen explicitly in the connection between Process A & Process B. However, confused up on FirstInFirstOut prioritizer & OlderFlowFilePrioritizer.
Kindly provide pointers to guarantee the order of flow files
Thanks in advance.
Created 04-12-2021 06:35 AM
@ram_g
With all 100 FlowFiles committed to the success relationship of your custom processor at the same time, how do we want NiFi to determine their priority order?
If you can out put some attributes on each FlowFile that your custom processor is creating, those attribute values could be used set processing order downstream.
Hope this helps,
Matt
Created 04-05-2021 05:48 AM
Hi @ram_g
The oldest flow file processor determines the oldest file using the amount of time the flow file is in the flow. That makes processor B take any file in random if more than one flow file comes out of processor A.
Can you please elaborate the issue you are facing while you go with FirstInFirstOut prioritizer?
Also please see if you can limit the queue's threshold to 1 so there is always one flow file in the queue eliminating the confusion caused by prioritizer.
Created 04-05-2021 06:42 AM
Thanks.
We haven't explicitly setup an prioritizer so far the connection. Now observing it goes out of order at random times. So looking for best prioritizer which will help us to maintain the order, confused between the two FIFO & oldestFlowFile.
Created 04-05-2021 09:10 PM
Hi @ram_g
Can you please let me know how much time it takes processor A to create single flow file of 100 records?
Created 04-06-2021 11:50 PM
Hi ,
This typically varies & we can seen it can be around 100ms(best case) - 2 seconds(worst case) range. Avg is around 200-300ms
Created 04-08-2021 08:35 AM
@ram_g @Magudeswaran
Guaranteeing order in NiFi can be challenging.
As far as the prioritizers on the connection go:
FirstInFirstOutPrioritizer: Given two FlowFiles, the one that reached the connection first will be processed first. This looks at timestamp recorded for FlowFile when the FlowFile entered this connection.
Now you may want to look at the following prioritizer:
Note that an UpdateAttribute processor should be used to add the "priority" attribute to the FlowFiles before they reach a connection that has this prioritizer set.
If only one has that attribute it will go first.
Values for the "priority" attribute can be alphanumeric, where "a" will come before "z" and "1" before "9"
If "priority" attribute cannot be parsed as a long, unicode string ordering will be used. For example: "99" and "100" will be ordered so the flowfile with "99" comes first, but "A-99" and "A-100" will sort so the flowfile with "A-100" comes first.
Assuming your custom processor writes some unique attribute(s) to the FlowFiles it outputs, you may be able to use those attributes to enforce ordering downstream via above prioritizer.
*** Also keep in mind that NiFi connection are "soft" limits. If your ere to set backpressure object threshold on connection outbound from your custom processor to 1 and on execution of your processor it produced 6 FlowFiles, they would all get committed to that connection. Only then does backpressure kick in and prevent your custom processor from being scheduled again until queue drops to below the backpressure threshold again. This is a good way of making sure only one "batch" of FlowFiles lands in the downstream connection at a time, but will not help enforce the order of the FlowFiles in that batch.
Hope this helps,
Matt
Created on 04-09-2021 01:59 AM - edited 04-09-2021 02:00 AM
In your case, you have a custom processor that takes in 1 FlowFile and may output 1 or more FlowFiles. Are all output FlowFiles created as new?
Yes. we are creating new flow files for input flow file. Processor A get triggered with single flow file which comes to it scheduled intervals. After getting this request, the processor reads the data from source like DB & produces output flow files for each 100 records from source which sent to processor B for writing in SFTP. In Processor B we are observing FFs going out of order.
Created 04-12-2021 06:35 AM
@ram_g
With all 100 FlowFiles committed to the success relationship of your custom processor at the same time, how do we want NiFi to determine their priority order?
If you can out put some attributes on each FlowFile that your custom processor is creating, those attribute values could be used set processing order downstream.
Hope this helps,
Matt
Created on 04-13-2021 12:47 AM - edited 04-13-2021 12:47 AM
Thanks. We have decided to introduce Priority attribute & use the corresponding prioritizer for the connection.
Created 04-12-2021 04:34 PM