How to generate a sequence number in NiFi cluster mode?

Rising Star

Hi All,

@mattclarke,@mattburgess,@markpayne

I want to generate a sequence number in my NiFi cluster (3 nodes). I was using the UpdateAttribute processor with the "Store state locally" option, but this does not serve my purpose: each node increments its own value independently, which creates duplicate values when loading data into the target table. I would be grateful for an alternate solution for generating this batch ID in cluster mode.

Thanks in advance!!

1 ACCEPTED SOLUTION

If you need atomic sequencing and still want to use a parallel system, you are going to have to push that sequencing off onto a system capable of atomic sequencing.

Probably the easiest way is to write a stored procedure with a transaction in an RDBMS and call it with ExecuteSQL in your flow.

Don't use a cache. As MC says, caches are not designed for transactional, atomic operations. Only use a cache for actually caching things (you can get the value elsewhere, but that is expensive, so it is cheaper to store it in the cache for a bit).
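To make the idea concrete, here is a minimal sketch of a transactional counter. It is only an illustration: Python's built-in sqlite3 stands in for whatever RDBMS you actually use, and the `sequences` table and the function names are hypothetical. In a real flow the same increment-and-read logic would live in a stored procedure called from ExecuteSQL.

```python
import sqlite3


def init_sequence(conn, name="batch_id"):
    """Create the (hypothetical) sequences table and seed the counter at 0."""
    conn.execute(
        "CREATE TABLE IF NOT EXISTS sequences ("
        "name TEXT PRIMARY KEY, value INTEGER NOT NULL)"
    )
    conn.execute(
        "INSERT OR IGNORE INTO sequences (name, value) VALUES (?, 0)", (name,)
    )
    conn.commit()


def next_value(conn, name="batch_id"):
    """Increment and read the counter inside a single transaction."""
    # "with conn" commits on success and rolls back if an exception is raised.
    with conn:
        conn.execute(
            "UPDATE sequences SET value = value + 1 WHERE name = ?", (name,)
        )
        row = conn.execute(
            "SELECT value FROM sequences WHERE name = ?", (name,)
        ).fetchone()
    return row[0]
```

Because the increment and the read happen in one transaction, the database serializes callers, so two nodes asking at the same moment should not receive the same value.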

7 REPLIES

Master Guru

@sri chaturvedi

Instead of using the UpdateAttribute processor's state, use a DistributedMapCache, so that the stored value can be fetched from any node in the cluster.

Use the PutDistributedMapCache processor to store the most recently assigned value. Then use the FetchDistributedMapCache processor to fetch the stored value, apply your logic (increment, etc.) to assign the new value, and overwrite the stored value in the DistributedMapCache using PutDistributedMapCache again.

Use this and this link as references for configuring the DistributedMapCache processors and controller services.

Rising Star

@shu, @Mattclarke, @markpayne How do I generate the sequence number to be used as the stored value, as you suggested? As far as I know, there is only one processor in NiFi that can generate a sequence number, UpdateAttribute, and in cluster mode it will again produce different values on each node.

Super Mentor

@Shu @sri chaturvedi

The issue with this distributed cache solution is timing. You have a bit of a race condition to consider here. Each node in a NiFi cluster runs its own copy of the flow.xml.gz and processes its own set of FlowFiles. While the distributed cache allows all your NiFi nodes to read from and write to the same Distributed Map Cache server, they do so at their own pace.

This means that multiple nodes may end up pulling the same cache value, incrementing it, and writing it back to the cache server, so you do not end up with a true count.

You may also hit a condition where node 1 fetches value X from the cache server and, for some reason, the flow on node 1 is delayed in reaching PutDistributedMapCache to update the value. In the meantime, some other node has already fetched and updated the cache multiple times. Node 1 then puts its value and overwrites the newer value with an older count.

Dealing with such a "race" condition is going to be difficult here because of how NiFi clustering works.
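Both failure modes can be reproduced with a toy model. The sketch below is not NiFi code: `MapCache` is a hypothetical in-memory stand-in for a shared cache that offers only fetch and put, and the interleavings are scripted by hand to mirror the two scenarios described above.

```python
class MapCache:
    """Toy shared key/value cache with fetch and put but no transactions."""

    def __init__(self):
        self._data = {}

    def fetch(self, key):
        return self._data.get(key, 0)

    def put(self, key, value):
        self._data[key] = value


def simulate_duplicate_ids():
    """Two nodes fetch before either writes, so both assign the same ID."""
    cache = MapCache()
    node1_seen = cache.fetch("seq")
    node2_seen = cache.fetch("seq")
    node1_id = node1_seen + 1
    node2_id = node2_seen + 1
    cache.put("seq", node1_id)
    cache.put("seq", node2_id)
    return node1_id, node2_id, cache.fetch("seq")


def simulate_stale_overwrite():
    """Node 1 stalls after its fetch; its late put rolls the counter back."""
    cache = MapCache()
    node1_seen = cache.fetch("seq")      # node 1 reads 0, then is delayed
    for _ in range(3):                   # node 2 completes three full cycles
        cache.put("seq", cache.fetch("seq") + 1)
    cache.put("seq", node1_seen + 1)     # node 1 resumes and overwrites 3 with 1
    return cache.fetch("seq")
```

In the first function both nodes come away with ID 1; in the second, the counter ends at 1 even though four IDs were handed out.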

Super Mentor

@sri chaturvedi

The question here is: what is the use case for needing a sequence number across the entire cluster? Why not generate a sequential number per node to keep track of per-node batches? Maybe use the NiFi hostname in the sequence number identifier?

Maybe you can share some more details on the full use case for these sequence numbers: why you are generating them and what they are being used for.

If you use the DistributedMapCache, you could keep three different sequential numbers as cached values (each node has its own sequence number stored in a cache entry keyed by hostname).

You could then build a flow that fetches all three values and adds them together for you on an hourly/daily/weekly schedule.

Thank you,

Matt
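A rough sketch of the per-node idea follows, assuming each node only ever touches the counter stored under its own hostname. The class name, the `hostname-number` ID format, and the example hostnames are illustrative assumptions, not anything NiFi provides.

```python
from collections import defaultdict


class PerNodeSequences:
    """One independent counter per hostname; IDs are unique cluster-wide."""

    def __init__(self):
        self._counters = defaultdict(int)

    def next_batch_id(self, hostname):
        # Each node increments only its own entry, so nodes never contend.
        self._counters[hostname] += 1
        return f"{hostname}-{self._counters[hostname]}"

    def total(self):
        # The periodic "add them together" step across all node counters.
        return sum(self._counters.values())


seqs = PerNodeSequences()
ids = [
    seqs.next_batch_id("nifi-node-1"),  # hypothetical hostnames
    seqs.next_batch_id("nifi-node-2"),
    seqs.next_batch_id("nifi-node-1"),
]
```

The IDs stay unique because the hostname is part of each one, but they are not a single gap-free sequence across the cluster, which is the trade-off of this approach.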

Rising Star

Thanks, Matt, for your view on this. The ask is to generate a batch ID, which should be a sequence number. Whenever the querydb processor fetches records from the source DB (SQL Server), a batch ID should be added to the FlowFile so that all of its records carry the same batch ID when loaded into the target table. This will help in auditing the records. In cluster mode, though, this seems difficult to achieve with the UpdateAttribute processor. I liked your idea of appending the node hostname to the sequence, but it would be much better if I could generate atomic values across all nodes.

Rising Star

Thanks, David. The idea looks good; I will try this.