Support Questions

Find answers, ask questions, and share your expertise

Retain the previous NiFi Flow value in the current execution flow

avatar
Rising Star

We have a streaming requirement where the data is received continuously and we store them in HDFS by merging 10,000 records to avoid multiple IO operations.
We generate the batchID using the below calculation,
${now():format('yyyyMMdd')}${now():format("HH"):toNumber():multiply(60):plus(${now():format('mm'):toNumber()}):divide(15)}
So this batch will change every 15 minutes and in Nifi sometimes there are 4 flows or 5 flows based on the number of records received during 15-minute time intervals. For example, if we receive 50,000 records in 15 minutes then this flow will be executed 5 times but if there are 40,000 records in 15 minutes then the entire NiFi flow will be executed 4 times.
Once the batch execution is completed fully (after 4 flows or 5 flows as per the incoming data volume as explained in the above example) we want this batchID to be stored in the next table for further processing.
So, we are working on some logic/condition in NiFi to invoke the "RouteOnAttribute" to check the batchId of the current run and batchId of the previous run. If the current batchID is the same as the previous batchID then no action is taken but if both the values are different then the previous batchID needs to be passed to the next NiFi processor for further processing. So please help us here with how we can store the value of batchId in the previous run and use it in NiFi flow. I have tried using the UpdateAttribute processor with "Store state locally" but it is not helping here to store the batchId it always gives a blank. Please help here with the right approach to achieve this requirement.

2 ACCEPTED SOLUTIONS

avatar
Rising Star

Thanks, @SAMSAL for your reply!

Below is the POC created for implementing this requirement on my Windows laptop.

NagendraKumar_0-1724673757001.png

UpdateAttribute_previousBatchIDThe first Update attribute is used to record the batchID from the previous run

previousBatchID - ${getStateValue("batchID")}

NagendraKumar_1-1724674007181.png

 
UpdateAttribute_batchID - The second update attribute is used to generate the new batchID .
 
batchID - ${now():format('yyyyMMdd')}${now():format("HH"):toNumber():multiply(60):plus(${now():format('mm'):toNumber()}):divide(15)}
 
NagendraKumar_2-1724674119587.png
 
RouteOnAttribute - Compare the batchId with PreviousBatchID. 
 
NagendraKumar_3-1724674264224.png

 

UpdateAttribute_processedBatchID - The last update attribute after the RouteOnAttribute is used to get the previous BatchID
 
BatchID_Process - ${previousBatchID}
 
NagendraKumar_4-1724674631438.png 

 

As u can see below, the variables have entry strings in the last step for both the variables highlighted in yellow.
 
NagendraKumar_5-1724674851887.png

Please help to guide me with the right way to use the update attribute to achieve my requirement. Thanks!

View solution in original post

avatar
Super Guru

Hi @NagendraKumar ,

I think you misunderstood how the Stateful UpdateAtrribute works which is OK since a lot of people would think this way too if you have not used before which happened to me as well  :).

Basically when you make an UpdateAttribute Stateful by setting the Store State Property, as the value says it will "Store state locally" meaning you only can access the previous state of a given attribute within the processor itself. As I can see,  you are trying to store the Batch Id in the Second Update Attribute and then access it from the first UpdateAttribute and that is why you are getting empty string because it doesnt exist there.  You dont need two UpdateAttributes to manage that and one should do the job.

Lets assume we have the following flow which consist of the following processors:

1- GenerateFlowFile: This will simulate setting new BatchId attribute by adding dynamic property BatchID and set it to some value.

2- UpdateAttribute: This will be a stateful and it will have two attributes: One to get the last saved BatchId value and another to set the last saved batch ID to the Current.

3- RouteOnAttribute: This is basically where you compare previous to current and route accordingly.

Here is how the flow looks like

SAMSAL_0-1724714235887.png

Here is the config for each processor:

GenerateFlowFile:

SAMSAL_1-1724714261473.png

 

UpdateAttribute:

SAMSAL_2-1724714292912.png

RouteOnAttribute:

SAMSAL_3-1724714332147.png

 

Basically, if you run once for the first time you will get the flowfile routed to the unmatched relationship of the RouteOnAttribute (since no previous value was set), however if you run it again without changing anything the result will be routed to the Match relationship since the previous saved value will equal the new one. Change the value in the GenerateFlowFile and it will go to unmatched and so on.

I know you probably wondering how this works since in the UpdateAttribute Im referencing the LastSavedStateBatchID while at the same time its being set to the CurrentBatchID?! which comes first ? well the answer is simple: If you refer to the stateful documentation   where its talking about you will find the following line: " If stateful properties reference other stateful properties then the value for the other stateful properties will be an iteration behind" . Which means PreviousBatchID will be set to the lastSavedStateBatchID before resetting the later to the current . if that makes any sense 🙂

There is a cleaner way of doing it which help eliminate this confusion and the circular reference  by defining Rules under the Advanced feature which you can play with but I feel this way is much shorter.

Hope that helps, if it does please accept solution.

Thanks

 

View solution in original post

4 REPLIES 4

avatar
Super Guru

Hi,

I was going to recommend the Update Attribute since it has the ability to store state. Can you please share screenshots of how it was configured and where did it return blank?

avatar
Rising Star

Thanks, @SAMSAL for your reply!

Below is the POC created for implementing this requirement on my Windows laptop.

NagendraKumar_0-1724673757001.png

UpdateAttribute_previousBatchIDThe first Update attribute is used to record the batchID from the previous run

previousBatchID - ${getStateValue("batchID")}

NagendraKumar_1-1724674007181.png

 
UpdateAttribute_batchID - The second update attribute is used to generate the new batchID .
 
batchID - ${now():format('yyyyMMdd')}${now():format("HH"):toNumber():multiply(60):plus(${now():format('mm'):toNumber()}):divide(15)}
 
NagendraKumar_2-1724674119587.png
 
RouteOnAttribute - Compare the batchId with PreviousBatchID. 
 
NagendraKumar_3-1724674264224.png

 

UpdateAttribute_processedBatchID - The last update attribute after the RouteOnAttribute is used to get the previous BatchID
 
BatchID_Process - ${previousBatchID}
 
NagendraKumar_4-1724674631438.png 

 

As u can see below, the variables have entry strings in the last step for both the variables highlighted in yellow.
 
NagendraKumar_5-1724674851887.png

Please help to guide me with the right way to use the update attribute to achieve my requirement. Thanks!

avatar
Super Guru

Hi @NagendraKumar ,

I think you misunderstood how the Stateful UpdateAtrribute works which is OK since a lot of people would think this way too if you have not used before which happened to me as well  :).

Basically when you make an UpdateAttribute Stateful by setting the Store State Property, as the value says it will "Store state locally" meaning you only can access the previous state of a given attribute within the processor itself. As I can see,  you are trying to store the Batch Id in the Second Update Attribute and then access it from the first UpdateAttribute and that is why you are getting empty string because it doesnt exist there.  You dont need two UpdateAttributes to manage that and one should do the job.

Lets assume we have the following flow which consist of the following processors:

1- GenerateFlowFile: This will simulate setting new BatchId attribute by adding dynamic property BatchID and set it to some value.

2- UpdateAttribute: This will be a stateful and it will have two attributes: One to get the last saved BatchId value and another to set the last saved batch ID to the Current.

3- RouteOnAttribute: This is basically where you compare previous to current and route accordingly.

Here is how the flow looks like

SAMSAL_0-1724714235887.png

Here is the config for each processor:

GenerateFlowFile:

SAMSAL_1-1724714261473.png

 

UpdateAttribute:

SAMSAL_2-1724714292912.png

RouteOnAttribute:

SAMSAL_3-1724714332147.png

 

Basically, if you run once for the first time you will get the flowfile routed to the unmatched relationship of the RouteOnAttribute (since no previous value was set), however if you run it again without changing anything the result will be routed to the Match relationship since the previous saved value will equal the new one. Change the value in the GenerateFlowFile and it will go to unmatched and so on.

I know you probably wondering how this works since in the UpdateAttribute Im referencing the LastSavedStateBatchID while at the same time its being set to the CurrentBatchID?! which comes first ? well the answer is simple: If you refer to the stateful documentation   where its talking about you will find the following line: " If stateful properties reference other stateful properties then the value for the other stateful properties will be an iteration behind" . Which means PreviousBatchID will be set to the lastSavedStateBatchID before resetting the later to the current . if that makes any sense 🙂

There is a cleaner way of doing it which help eliminate this confusion and the circular reference  by defining Rules under the Advanced feature which you can play with but I feel this way is much shorter.

Hope that helps, if it does please accept solution.

Thanks

 

avatar
Rising Star

Thanks a lot, @SAMSAL  for your detailed explanations. Appreciate your expertise on the NiFi. I am a beginner to the NiFi world and not much documentation on this topic is available on the internet. Thanks once again for your support!