Ensuring the order of FlowFiles in NiFi
- Labels: Apache NiFi
Created 04-05-2021 03:48 AM
Hi,
We need help with the ordering of records in NiFi.
We have a couple of custom processors that work in batches.
One processor reads the data from the source (Processor A) and another processor writes the data to SFTP (Processor B).
Processor A outputs the data as multiple FlowFiles (100 records from the source are written to one FlowFile). For example, if there are 600 records in the source, we get 6 FlowFiles out of Processor A.
Processor A and Processor B are directly connected to each other, and the entire execution happens on a single node only.
However, we are finding that on some occasions Processor B processes FlowFiles out of order. For example, the third FlowFile gets processed first and then the second FlowFile.
We understand that a FlowFile prioritizer needs to be chosen explicitly on the connection between Processor A and Processor B. However, we are confused about the difference between FirstInFirstOutPrioritizer and OldestFlowFileFirstPrioritizer.
Kindly provide pointers on how to guarantee the order of FlowFiles.
Thanks in advance.
Created 04-12-2021 06:35 AM
@ram_g
With all 100 FlowFiles committed to the success relationship of your custom processor at the same time, how do we want NiFi to determine their priority order?
If you can output some attributes on each FlowFile that your custom processor is creating, those attribute values could be used to set the processing order downstream.
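As a rough illustration of that idea, here is a minimal Python sketch that models the logic only. It is not NiFi code: a real custom processor would be written in Java and would set the attribute through the NiFi session API. The function name `batch_with_priority` and the dictionary layout are hypothetical; the attribute name `priority` is chosen to match what NiFi's PriorityAttributePrioritizer reads.

```python
from typing import Dict, Iterator, Sequence


def batch_with_priority(records: Sequence[str], batch_size: int = 100) -> Iterator[Dict]:
    """Yield one dict per batch, each standing in for one FlowFile.

    Every batch carries a numeric "priority" attribute equal to its
    position in the source, so downstream ordering does not have to
    rely on timestamps.
    """
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    for index, start in enumerate(range(0, len(records), batch_size)):
        yield {
            "attributes": {"priority": str(index)},
            "content": list(records[start:start + batch_size]),
        }


# 600 source records become 6 batches of 100 records each.
records = [f"record-{n}" for n in range(600)]
batches = list(batch_with_priority(records))
```

Because the position is written at creation time, it survives even when all batches are committed to the connection in the same instant.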
Hope this helps,
Matt
Created 04-05-2021 05:48 AM
Hi @ram_g
The OldestFlowFileFirstPrioritizer determines the oldest FlowFile by how long the FlowFile has been in the flow. As a result, Processor B may pick the files in arbitrary order if more than one FlowFile comes out of Processor A.
Can you please elaborate on the issue you are facing when you use the FirstInFirstOutPrioritizer?
Also, please see if you can limit the queue's threshold to 1 so that there is always only one FlowFile in the queue, which would eliminate the confusion caused by the prioritizer.
Created 04-05-2021 06:42 AM
Thanks.
We haven't explicitly set up a prioritizer on the connection so far. We now observe that FlowFiles go out of order at random times. So we are looking for the best prioritizer to help us maintain the order, and we are confused between the two: FirstInFirstOutPrioritizer and OldestFlowFileFirstPrioritizer.
Created 04-05-2021 09:10 PM
Hi @ram_g
Can you please let me know how much time it takes Processor A to create a single FlowFile of 100 records?
Created 04-06-2021 11:50 PM
Hi,
This typically varies. We have seen it range from about 100 ms (best case) to 2 seconds (worst case). The average is around 200-300 ms.
Created 04-08-2021 08:35 AM
@ram_g @Magudeswaran
Guaranteeing order in NiFi can be challenging.
As far as the prioritizers on the connection go:
- FirstInFirstOutPrioritizer: Given two FlowFiles, the one that reached the connection first will be processed first. This looks at the timestamp recorded for the FlowFile when it entered this connection.
  - In your case, you have a custom processor that takes in 1 FlowFile and may output 1 or more FlowFiles. Such processors typically commit all output FlowFiles to the downstream connection at the same time, which makes this prioritizer hard to use if that is the case. However, processors that produce multiple FlowFiles from a single FlowFile generally also set FlowFile attributes that identify the fragments. Take a look at the attributes written by the SplitRecord processor as an example.
- OldestFlowFileFirstPrioritizer: Given two FlowFiles, the one that is oldest in the dataflow will be processed first. 'This is the default scheme that is used if no prioritizers are selected'. This looks at the FlowFile creation timestamp.
  - In your case, you have a custom processor that takes in 1 FlowFile and may output 1 or more FlowFiles. Are all output FlowFiles created as new?
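The difficulty with both timestamp-based prioritizers can be illustrated with a small Python model (not NiFi code). It assumes that all six output FlowFiles are committed together and therefore share one timestamp. The field names `queued_at` and `fragment_index` are hypothetical stand-ins, the latter modeled on the `fragment.index` attribute that SplitRecord writes.

```python
import random

# Six FlowFiles committed together: identical timestamps (milliseconds),
# but each one carries its position in the batch as an attribute.
flowfiles = [
    {"name": f"ff-{i}", "queued_at": 1617600000000, "fragment_index": i}
    for i in range(6)
]

# Simulate the FlowFiles sitting in the queue in arbitrary order.
arrived = flowfiles[:]
random.Random(42).shuffle(arrived)

# A timestamp-only key cannot tell the FlowFiles apart: one distinct value.
timestamp_keys = {ff["queued_at"] for ff in arrived}

# An explicit index restores the intended order regardless of arrival order.
by_index = sorted(arrived, key=lambda ff: ff["fragment_index"])
```

With only one distinct timestamp, any order among the six is equally valid from the prioritizer's point of view, which is why an explicit per-FlowFile attribute is the more dependable signal.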
Now you may want to look at the following prioritizer:
- PriorityAttributePrioritizer: Given two FlowFiles, an attribute called "priority" will be extracted. The one that has the lowest priority value will be processed first.
  - Note that an UpdateAttribute processor should be used to add the "priority" attribute to the FlowFiles before they reach a connection that has this prioritizer set.
  - If only one has that attribute, it will go first.
  - Values for the "priority" attribute can be alphanumeric, where "a" will come before "z" and "1" before "9".
  - If the "priority" attribute cannot be parsed as a long, Unicode string ordering will be used. For example: "99" and "100" will be ordered so the FlowFile with "99" comes first, but "A-99" and "A-100" will sort so the FlowFile with "A-100" comes first.
Assuming your custom processor writes some unique attribute(s) to the FlowFiles it outputs, you may be able to use those attributes to enforce ordering downstream via above prioritizer.
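The ordering rules quoted above can be modeled with a short Python sort key. This is a sketch of the documented behavior, not NiFi's implementation. The helper names `priority_sort_key` and `order` are hypothetical, and two points are assumptions of this model: numeric values are placed ahead of non-numeric ones when the two kinds are mixed, and the 64-bit range of a Java long is not enforced.

```python
import re
from typing import Dict, List, Optional, Tuple

# Matches values that look like a plain signed integer.
_LONG_PATTERN = re.compile(r"-?\d+")


def priority_sort_key(attributes: Dict[str, str]) -> Tuple[int, int, str]:
    """Sort key modeling the documented 'priority' attribute rules."""
    value: Optional[str] = attributes.get("priority")
    if value is None:
        return (2, 0, "")            # no attribute: after FlowFiles that have one
    if _LONG_PATTERN.fullmatch(value):
        return (0, int(value), "")   # numeric: lowest value first
    return (1, 0, value)             # otherwise: code-point string ordering


def order(values: List[Optional[str]]) -> List[Optional[str]]:
    """Return priority values in the order this model would process them."""
    flowfiles = [{} if v is None else {"priority": v} for v in values]
    ranked = sorted(flowfiles, key=priority_sort_key)
    return [ff.get("priority") for ff in ranked]
```

For example, `order(["100", "99"])` puts "99" first, while `order(["A-99", "A-100"])` puts "A-100" first because the comparison falls back to string ordering. If a non-numeric prefix is unavoidable, zero-padding the numeric part (for example "A-099" and "A-100") keeps string ordering aligned with numeric ordering.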
*** Also keep in mind that NiFi connection back pressure thresholds are "soft" limits. If you were to set the back pressure object threshold on the connection outbound from your custom processor to 1, and an execution of your processor produced 6 FlowFiles, they would all get committed to that connection. Only then does back pressure kick in and prevent your custom processor from being scheduled again until the queue drops below the back pressure threshold. This is a good way of making sure only one "batch" of FlowFiles lands in the downstream connection at a time, but it will not help enforce the order of the FlowFiles in that batch.
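The "soft" limit behavior can be pictured with a toy Python model. It is a simplification under stated assumptions, not NiFi code: the class `Connection` and the function `run_upstream_once` are hypothetical, and the threshold is assumed to be checked only when deciding whether to schedule the upstream processor.

```python
from collections import deque


class Connection:
    """Toy model of a queue with a soft back pressure object threshold."""

    def __init__(self, object_threshold: int):
        self.object_threshold = object_threshold
        self.queue = deque()

    def back_pressure_applied(self) -> bool:
        # Back pressure is active once the queue has reached the threshold.
        return len(self.queue) >= self.object_threshold


def run_upstream_once(connection: Connection, batch) -> bool:
    """Run the upstream processor once unless back pressure blocks it.

    The threshold is consulted only before scheduling. Once the processor
    runs, its whole batch is committed, even if that exceeds the threshold.
    """
    if connection.back_pressure_applied():
        return False
    connection.queue.extend(batch)
    return True


conn = Connection(object_threshold=1)
first_run = run_upstream_once(conn, [f"ff-{i}" for i in range(6)])  # runs
queued_after_first = len(conn.queue)                                # 6, above the threshold
second_run = run_upstream_once(conn, ["ff-6"])                      # blocked by back pressure
conn.queue.clear()                                                  # downstream drains the queue
third_run = run_upstream_once(conn, ["ff-6"])                       # allowed to run again
```

The model shows one batch landing at a time, and it also shows that nothing in the threshold check says anything about the order of the six FlowFiles inside that batch.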
Hope this helps,
Matt
Created on 04-09-2021 01:59 AM - edited 04-09-2021 02:00 AM
In your case, you have a custom processor that takes in 1 FlowFile and may output 1 or more FlowFiles. Are all output FlowFiles created as new?
Created on 04-13-2021 12:47 AM - edited 04-13-2021 12:47 AM
Thanks. We have decided to introduce a "priority" attribute and use the corresponding prioritizer (PriorityAttributePrioritizer) on the connection.
Created 04-12-2021 04:34 PM
