Ensuring the order of FlowFiles in NiFi

Explorer

Hi,

We need help with the ordering of records in NiFi.

We have a couple of custom processors that work in batches.

One processor reads the data from the source (Processor A), and another processor writes the data to SFTP (Processor B).

Processor A outputs the data as multiple FlowFiles, with 100 records from the source written to each FlowFile. For example, if there are 600 records in the source, we get 6 FlowFiles out of Processor A.

Processor A and Processor B are directly connected to each other, and the entire execution happens on a single node.

However, we are finding that on some occasions Processor B processes FlowFiles out of order. For example, the third FlowFile gets processed first, and then the second FlowFile.

 

We understand that a FlowFile prioritizer needs to be chosen explicitly on the connection between Processor A and Processor B. However, we are confused about the difference between FirstInFirstOutPrioritizer and OldestFlowFileFirstPrioritizer.

Kindly provide pointers on how to guarantee the order of the FlowFiles.

 

Thanks in advance.

 


9 REPLIES

Contributor

Hi @ram_g 

 

The OldestFlowFileFirstPrioritizer determines the oldest FlowFile by the amount of time the FlowFile has been in the flow. If more than one FlowFile comes out of Processor A at the same time, Processor B may therefore pick them up in an arbitrary order.

 

Can you please elaborate on the issue you face when you use the FirstInFirstOutPrioritizer?

Also, please see whether you can limit the queue's threshold to 1, so that there is only ever one FlowFile in the queue, which eliminates any confusion caused by the prioritizer.

Explorer

Thanks. 

We haven't explicitly set up a prioritizer on the connection so far. We now observe that FlowFiles go out of order at random times. So we are looking for the prioritizer that will best help us maintain the order, and we are confused between the two: FirstInFirstOutPrioritizer and OldestFlowFileFirstPrioritizer.

Contributor

Hi @ram_g 

 

Can you please let me know how much time it takes Processor A to create a single FlowFile of 100 records?

Explorer

Hi,

This typically varies; we have seen it range from around 100 ms (best case) to 2 seconds (worst case). The average is around 200-300 ms.

Master Mentor

@ram_g @Magudeswaran 

Guaranteeing order in NiFi can be challenging.
As far as the prioritizers on the connection go:

  • FirstInFirstOutPrioritizer: Given two FlowFiles, the one that reached the connection first will be processed first. This looks at the timestamp recorded for the FlowFile when it entered this connection.

    • In your case, you have a custom processor that takes in 1 FlowFile and may output 1 or more FlowFiles. Typically, such processors commit all output FlowFiles to the downstream connection at the same time, which makes this prioritizer hard to use if that is the case. But generally, processors that produce multiple FlowFiles from a single FlowFile also set FlowFile attributes that identify the fragments. Take a look at the attributes written by the SplitRecord processor as an example.

  • OldestFlowFileFirstPrioritizer: Given two FlowFiles, the one that is oldest in the dataflow will be processed first. "This is the default scheme that is used if no prioritizers are selected." This looks at the FlowFile creation timestamp.
    • In your case, you have a custom processor that takes in 1 FlowFile and may output 1 or more FlowFiles. Are all output FlowFiles created as new?
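
The difference between the two prioritizers can be sketched in a few lines of Python. This is only an illustration of the two sort keys described above, not NiFi code; the FlowFile class and its created_ms and queued_ms fields are hypothetical names chosen for the example.

```python
from dataclasses import dataclass


@dataclass
class FlowFile:
    name: str
    created_ms: int  # hypothetical: when the FlowFile was created
    queued_ms: int   # hypothetical: when it entered this connection


def fifo_order(flowfiles):
    """Order by the time each FlowFile entered the connection."""
    return sorted(flowfiles, key=lambda ff: ff.queued_ms)


def oldest_first_order(flowfiles):
    """Order by the time each FlowFile was created."""
    return sorted(flowfiles, key=lambda ff: ff.created_ms)


# "a" was created first but reached the connection last.
a = FlowFile("a", created_ms=10, queued_ms=50)
b = FlowFile("b", created_ms=20, queued_ms=40)

fifo_names = [ff.name for ff in fifo_order([a, b])]
oldest_names = [ff.name for ff in oldest_first_order([a, b])]

# A batch committed in one go: every FlowFile carries the same queue
# timestamp, so that key alone cannot tell them apart.
batch = [FlowFile(f"ff{i}", created_ms=100, queued_ms=200) for i in range(6)]
distinct_queue_times = {ff.queued_ms for ff in batch}
```

When several FlowFiles share the same timestamp, neither key gives the prioritizer anything to distinguish them by, so some other piece of information is needed to fix their order.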

Now you may want to look at the following prioritizer:

  • PriorityAttributePrioritizer: Given two FlowFiles, an attribute called “priority” will be extracted. The one that has the lowest priority value will be processed first.
    • Note that an UpdateAttribute processor should be used to add the "priority" attribute to the FlowFiles before they reach a connection that has this prioritizer set.

    • If only one has that attribute it will go first.

    • Values for the "priority" attribute can be alphanumeric, where "a" will come before "z" and "1" before "9"

    • If the "priority" attribute cannot be parsed as a long, Unicode string ordering will be used. For example: "99" and "100" will be ordered so the FlowFile with "99" comes first, but "A-99" and "A-100" will sort so the FlowFile with "A-100" comes first.

Assuming your custom processor writes some unique attribute(s) to the FlowFiles it outputs, you may be able to use those attributes to enforce ordering downstream via the above prioritizer.
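
The ordering rules listed for the "priority" attribute can be mimicked with a small Python comparator. This is a sketch of the rules as they are described above, not NiFi's actual implementation. FlowFiles are represented as plain dictionaries of attributes, and the function names are made up for the example. How a numeric value compares against a non-numeric one is an assumption here (plain string comparison).

```python
from functools import cmp_to_key


def _as_long(value):
    """Return the value as an integer, or None if it does not parse."""
    try:
        return int(value.strip())
    except ValueError:
        return None


def compare_priority(ff_a, ff_b):
    """Negative result means ff_a is processed before ff_b."""
    pa, pb = ff_a.get("priority"), ff_b.get("priority")
    if pa is None and pb is None:
        return 0
    if pb is None:
        return -1  # only ff_a has the attribute, so it goes first
    if pa is None:
        return 1
    na, nb = _as_long(pa), _as_long(pb)
    if na is not None and nb is not None:
        return (na > nb) - (na < nb)  # numeric comparison
    # Assumption: anything else falls back to string ordering.
    return (pa > pb) - (pa < pb)


def order(flowfiles):
    return sorted(flowfiles, key=cmp_to_key(compare_priority))


numeric = [ff["priority"] for ff in order([{"priority": "100"}, {"priority": "99"}])]
text = [ff["priority"] for ff in order([{"priority": "A-99"}, {"priority": "A-100"}])]
with_missing = order([{}, {"priority": "5"}])
```

Here numeric is ["99", "100"] because both values parse as numbers, while text is ["A-100", "A-99"] because the strings are compared character by character and "1" sorts before "9".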

*** Also keep in mind that NiFi connection thresholds are "soft" limits. If you were to set the backpressure object threshold to 1 on the connection outbound from your custom processor, and one execution of your processor produced 6 FlowFiles, they would all get committed to that connection. Only then does backpressure kick in and prevent your custom processor from being scheduled again until the queue drops below the backpressure threshold. This is a good way of making sure only one "batch" of FlowFiles lands in the downstream connection at a time, but it will not help enforce the order of the FlowFiles within that batch.
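
The "soft" limit behaviour can be pictured with a toy simulation. This is an illustrative sketch only and does not use any NiFi API; the function try_run_upstream and the list standing in for the connection queue are hypothetical.

```python
def try_run_upstream(queue, threshold, batch):
    """Run the upstream processor once, if backpressure allows it.

    The threshold is checked only before scheduling. Once the processor
    runs, its whole batch is committed, even if that exceeds the threshold.
    """
    if len(queue) >= threshold:
        return False  # backpressure applied: processor is not scheduled
    queue.extend(batch)
    return True


queue = []
first_run = try_run_upstream(queue, threshold=1, batch=[f"ff{i}" for i in range(6)])
size_after_first = len(queue)   # all 6 FlowFiles landed despite threshold 1
second_run = try_run_upstream(queue, threshold=1, batch=["ff6"])

queue.clear()                   # downstream drains the connection
third_run = try_run_upstream(queue, threshold=1, batch=["ff6"])
```

With a threshold of 1 the upstream processor runs again only after the queue has been emptied, so batches do not mix, but nothing here orders the FlowFiles inside a batch.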

Hope this helps,

Matt

 

Explorer
  • In your case, you have a custom processor that takes in 1 FlowFile and may output 1 or more FlowFiles.  Are all output FlowFiles created as new?

     

    Yes. We are creating new FlowFiles for each input FlowFile. Processor A gets triggered by a single FlowFile that arrives at scheduled intervals. After receiving this request, the processor reads the data from a source such as a DB and produces an output FlowFile for every 100 records from the source; these are sent to Processor B for writing to SFTP. In Processor B we observe FlowFiles going out of order.

Master Mentor

@ram_g 

With all of the FlowFiles committed to the success relationship of your custom processor at the same time, how would you want NiFi to determine their priority order?
If you can output some attributes on each FlowFile that your custom processor creates, those attribute values could be used to set the processing order downstream.

Hope this helps,

Matt

Explorer

Thanks. We have decided to introduce a "priority" attribute and use the corresponding prioritizer (PriorityAttributePrioritizer) on the connection.
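
As a rough illustration of that approach, the Python sketch below splits 600 records into chunks of 100, tags each chunk with a numeric "priority" value, and shows that the original order can be restored from that attribute alone. It is not NiFi processor code; the dictionary layout and the function chunk_with_priority are assumptions made for the example, and it only covers ordering within a single batch.

```python
import random


def chunk_with_priority(records, chunk_size=100):
    """Split records into chunks and tag each with its position."""
    flowfiles = []
    for index, start in enumerate(range(0, len(records), chunk_size)):
        flowfiles.append({
            # Numeric strings, so that "10" would sort after "9".
            "attributes": {"priority": str(index)},
            "records": records[start:start + chunk_size],
        })
    return flowfiles


records = list(range(600))
flowfiles = chunk_with_priority(records)

# Simulate the chunks arriving downstream in an arbitrary order.
random.shuffle(flowfiles)

# Lowest priority value first, compared as numbers.
restored = sorted(flowfiles, key=lambda ff: int(ff["attributes"]["priority"]))
flattened = [record for ff in restored for record in ff["records"]]
```

Using a plain counter keeps the values parseable as numbers, which avoids the string-ordering case where "A-100" would sort before "A-99".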

Master Guru