Created 08-26-2024 12:15 AM
We have a streaming requirement where data is received continuously, and we store it in HDFS by merging 10,000 records at a time to avoid multiple IO operations.
We generate the batchID using the calculation below:
${now():format('yyyyMMdd')}${now():format("HH"):toNumber():multiply(60):plus(${now():format('mm'):toNumber()}):divide(15)}
So this batchID changes every 15 minutes, and in NiFi there are sometimes 4 or 5 flow runs depending on the number of records received during a 15-minute interval. For example, if we receive 50,000 records in 15 minutes, the flow is executed 5 times, but if we receive 40,000 records in 15 minutes, the entire NiFi flow is executed 4 times.
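As a sanity check on the formula, here is a rough Python equivalent of the expression above. It is only a sketch: the function name `batch_id` and the `window_minutes` parameter are made up for illustration, and it assumes that `divide(15)` on whole numbers truncates to an integer, as described in the NiFi Expression Language guide.

```python
from datetime import datetime


def batch_id(ts: datetime, window_minutes: int = 15) -> str:
    """Mirror the expression: yyyyMMdd followed by (HH * 60 + mm) / 15."""
    minutes_since_midnight = ts.hour * 60 + ts.minute
    # Integer division, assuming NiFi's divide() truncates for whole numbers.
    window_index = minutes_since_midnight // window_minutes
    # The window index is not zero-padded, matching the original expression.
    return f"{ts:%Y%m%d}{window_index}"


if __name__ == "__main__":
    print(batch_id(datetime(2024, 8, 26, 12, 14)))  # 2024082648
    print(batch_id(datetime(2024, 8, 26, 12, 15)))  # 2024082649
```

Every timestamp inside the same 15-minute window gets the same value, so all 4 or 5 flow runs in that window share one batchID, and the value changes only when the window rolls over.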
Once the batch execution is fully completed (after 4 or 5 flow runs, depending on the incoming data volume as explained in the example above), we want this batchID to be stored in the next table for further processing.
So, we are working on some logic in NiFi that uses "RouteOnAttribute" to compare the batchID of the current run with the batchID of the previous run. If the current batchID is the same as the previous batchID, no action is taken, but if the two values are different, the previous batchID needs to be passed to the next NiFi processor for further processing. So please help us with how we can store the batchID of the previous run and use it in the NiFi flow. I have tried using the UpdateAttribute processor with "Store state locally", but it does not store the batchID; the value always comes back blank. Please suggest the right approach to achieve this requirement.
Created 08-26-2024 01:35 AM
Hi,
I was going to recommend UpdateAttribute since it has the ability to store state. Can you please share screenshots of how it was configured and where it returned blank?
Created 08-26-2024 05:24 AM
Thanks, @SAMSAL for your reply!
Below is the POC created for implementing this requirement on my Windows laptop.
UpdateAttribute_previousBatchID - The first UpdateAttribute is used to read the batchID from the previous run
previousBatchID - ${getStateValue("batchID")}
Please guide me on the right way to use UpdateAttribute to achieve my requirement. Thanks!
Created on 08-26-2024 04:39 PM - edited 08-26-2024 04:41 PM
Hi @NagendraKumar ,
I think you misunderstood how the stateful UpdateAttribute works, which is OK since a lot of people think this way if they have not used it before. It happened to me as well :).
Basically, when you make an UpdateAttribute stateful by setting the Store State property, it will, as the value says, "Store state locally", meaning you can only access the previous state of a given attribute within that same processor. As far as I can see, you are trying to store the batch ID in the second UpdateAttribute and then access it from the first UpdateAttribute, and that is why you are getting an empty string: the value doesn't exist there. You don't need two UpdateAttributes to manage this; one should do the job.
Let's assume we have a flow which consists of the following processors:
1- GenerateFlowFile: This simulates setting a new batch ID attribute by adding a dynamic property BatchID and setting it to some value.
2- UpdateAttribute: This one is stateful and has two attributes: one to get the last saved batch ID value and another to set the last saved batch ID to the current one.
3- RouteOnAttribute: This is basically where you compare the previous value to the current one and route accordingly.
Here is what the flow looks like:
Here is the config for each processor:
GenerateFlowFile:
UpdateAttribute:
RouteOnAttribute:
Basically, if you run it once for the first time, the flowfile will be routed to the unmatched relationship of the RouteOnAttribute (since no previous value was set). However, if you run it again without changing anything, the flowfile will be routed to the Match relationship since the previously saved value will equal the new one. Change the value in the GenerateFlowFile and it will go to unmatched, and so on.
I know you are probably wondering how this works, since in the UpdateAttribute I'm referencing LastSavedStateBatchID while at the same time it's being set to the current batch ID. Which comes first? The answer is simple: if you refer to the UpdateAttribute documentation where it talks about stateful usage, you will find the following line: "If stateful properties reference other stateful properties then the value for the other stateful properties will be an iteration behind". This means PreviousBatchID will be set to the value LastSavedStateBatchID held before the latter is reset to the current batch ID, if that makes any sense 🙂
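To make the ordering concrete, here is a rough Python simulation of that "one iteration behind" behaviour. It is not NiFi configuration. The attribute names BatchID, PreviousBatchID and LastSavedStateBatchID come from the description above, while the class, the `route` function and the exact wiring are assumptions for illustration, since the actual processor settings are not reproduced in this post.

```python
class StatefulUpdateAttribute:
    """Toy model of a single stateful UpdateAttribute processor."""

    def __init__(self):
        # Local state: visible only to this processor instance.
        self.state = {}

    def process(self, attributes):
        # Snapshot the state as it was before this flowfile arrived.
        old_state = dict(self.state)
        out = dict(attributes)
        # PreviousBatchID sees the value saved by the previous flowfile
        # (an empty string on the very first run).
        out["PreviousBatchID"] = old_state.get("LastSavedStateBatchID", "")
        # Only afterwards is the saved value replaced by the current one.
        out["LastSavedStateBatchID"] = attributes["BatchID"]
        self.state["LastSavedStateBatchID"] = attributes["BatchID"]
        return out


def route(attributes):
    """Toy RouteOnAttribute: compare the previous and current batch IDs."""
    same = attributes["PreviousBatchID"] == attributes["BatchID"]
    return "matched" if same else "unmatched"


if __name__ == "__main__":
    processor = StatefulUpdateAttribute()
    for current in ["202408261", "202408261", "202408262"]:
        result = processor.process({"BatchID": current})
        print(current, repr(result["PreviousBatchID"]), route(result))
```

In this model the first flowfile goes to unmatched with an empty previous value, the second goes to matched, and the third goes to unmatched carrying the previous batch ID "202408261", which is the value the original question wants to pass downstream. A real flow would probably also need to ignore the empty previous value on the first run.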
There is a cleaner way of doing this, which helps eliminate the confusion and the circular reference, by defining Rules under the Advanced feature. You can play with that, but I feel this way is much shorter.
Hope that helps. If it does, please accept the solution.
Thanks
Created 08-27-2024 09:30 AM
Thanks a lot, @SAMSAL, for your detailed explanation. I appreciate your expertise in NiFi. I am a beginner in the NiFi world, and not much documentation on this topic is available on the internet. Thanks once again for your support!