Created 12-13-2023 04:47 AM
Hello,
I'm using NiFi to replicate 250 million messages between Kafka topics.
The problem is that NiFi replicates messages in a non-sequential order, so the destination topic stores messages in a different order than the source topic.
For example:
source topic - partition 0
offset:5 key:a value:v1
offset:6 key:a value:v2
offset:7 key:a value:v3
destination topic - partition 0
offset:5 key:a value:v2
offset:6 key:a value:v1
offset:7 key:a value:v3
The topics are configured with a cleanup policy: compact.
I'm using ConsumeKafka and PublishKafka processors to replicate topics.
Created 01-31-2024 02:21 AM
Upon examination, setting the connection's prioritizer to FirstInFirstOutPrioritizer and configuring the load balancing strategy to "Partition by attribute" with the kafka.partition attribute has proven effective in maintaining order.
Created 12-13-2023 05:46 AM
@edim2525 I have never needed to replicate Kafka with NiFi while preserving exact order, so perhaps others can respond with more detail in a NiFi context.
That said, my customers use Streams Replication Manager (SRM) for heavy-duty Kafka replication.
https://docs.cloudera.com/cdf-datahub/7.2.16/howto-streams-replication-manager.html
https://docs.cloudera.com/runtime/7.2.16/srm-overview/topics/srm-replication-overview.html
There are several different form factors of SRM across private and public cloud, but the doc links above should give you a good idea of what it is.
Created 12-14-2023 12:12 AM
Thank you for your insights @steven-matison, I will check this out.
Created 12-13-2023 07:46 AM
@edim2525
Apache NiFi is designed to move data quickly and efficiently. Each processor consumes FlowFiles from an incoming connection with no guaranteed priority/order, and depending on the processors used and their concurrency settings, the order of FlowFile execution is not guaranteed. Nor can order be guaranteed when FlowFiles are spread across nodes in a multi-node NiFi cluster.
You did not mention whether you are running a single standalone instance of NiFi or a NiFi cluster. You also did not mention whether you have other NiFi processors between your ConsumeKafka and PublishKafka processors.
The ConsumeKafka processor consumes messages from a topic and produces one FlowFile for each message. For the most efficient processing, the ConsumeKafka processor would typically have its concurrent tasks configured to be equal to or less than the number of partitions on the topic (never more, as that causes constant rebalancing). This creates a consumer group, allowing each concurrent thread execution to pull data from the specific partitions assigned to it.
As an example:
If you had a 3-partition topic and a 3-node NiFi cluster, you would configure your ConsumeKafka with 1 concurrent task (a total of 3 concurrent tasks across your 3 NiFi nodes). Each NiFi node would be assigned a different topic partition to consume from.
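The consumer-count math above can be sketched in plain Python (this is an illustrative simulation, not a Kafka client; `assign_partitions` is a hypothetical helper, and Kafka's real assignors may distribute partitions differently):

```python
def assign_partitions(num_partitions, num_consumers):
    """Round-robin sketch of how a consumer group spreads topic partitions
    across its members. Consumers beyond the partition count receive
    nothing and sit idle, which is why configuring more concurrent tasks
    than partitions buys no extra throughput."""
    assignment = {c: [] for c in range(num_consumers)}
    for p in range(num_partitions):
        assignment[p % num_consumers].append(p)
    return assignment

# 3-partition topic, 3-node cluster with 1 concurrent task per node:
three_by_three = assign_partitions(3, 3)   # each consumer gets exactly one partition

# Same topic with 4 consumers: the fourth consumer is idle
over_provisioned = assign_partitions(3, 4)
```

This also illustrates why the advice is "equal to or less than the number of partitions": any consumer past that count is pure overhead.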
Each node in a NiFi cluster executes its own copy of the flow with no knowledge of, or regard for, what FlowFiles are being processed on other nodes. This is why guaranteed order in a NiFi cluster is very difficult. There are options to run some dataflows on the primary node only, keeping that dataflow running on only one node. But even then the primary node election can change, and NiFi has no way to force or guarantee that a specific node is assigned or retains the primary node role.
Some things you may try that might work for you:
Suggestion 1:
1. Use a standalone NiFi instance (this removes the issues a cluster can introduce when order is important).
2. When using ConsumeKafka, try placing an EnforceOrder processor after ConsumeKafka, configured to order FlowFiles by the offset. Then make sure that all connections downstream of the EnforceOrder processor to PublishKafka use the "FirstInFirstOutPrioritizer" (by default, no prioritizer is set on a connection).
3. Make sure all processors in this dataflow are configured with only 1 concurrent task to avoid any concurrency that could lead to some FlowFiles being processed faster than others, impacting queued order in downstream connections.
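The reorder-by-offset idea in step 2 can be illustrated with a small sketch (plain Python, not NiFi code; `release_in_offset_order` is a hypothetical function, shown only to make the buffering behavior concrete): messages are held until the next expected offset arrives, so downstream they are released strictly in offset order.

```python
def release_in_offset_order(messages, first_offset):
    """Buffer out-of-order (offset, value) pairs and yield them strictly
    by ascending offset, mimicking an order-enforcing step followed by a
    FIFO-prioritized connection (illustrative sketch only)."""
    pending = {}
    expected = first_offset
    for offset, value in messages:
        pending[offset] = value
        # Flush every consecutive offset we are now able to release.
        while expected in pending:
            yield expected, pending.pop(expected)
            expected += 1

# Offsets 5..7 arrive out of order, as in the question's example:
arrived = [(5, "v1"), (7, "v3"), (6, "v2")]
ordered = list(release_in_offset_order(arrived, first_offset=5))
# ordered == [(5, "v1"), (6, "v2"), (7, "v3")]
```

Note that this only works within a single partition's offset sequence, which is why the FIFO prioritizer on every downstream connection matters: any reordering after the enforcement step would undo it.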
Suggestion 2:
1. Use a standalone NiFi instance, for the same reason as above.
2. Use ConsumeKafkaRecord instead of ConsumeKafka. This produces fewer FlowFiles, with each FlowFile containing multiple messages.
3. Use PublishKafkaRecord instead of PublishKafka.
4. Make sure that all connections between the ConsumeKafkaRecord processor and the PublishKafkaRecord processor use the "FirstInFirstOutPrioritizer" (by default, no prioritizer is set on a connection).
5. Still use only one concurrent task on all processors to avoid concurrent record production.
--- This option produces fewer FlowFiles (meaning less overhead and a lower chance of messages becoming out of order) and likely faster throughput.
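Why fewer FlowFiles helps can be sketched as follows (hypothetical Python, not NiFi internals; `batch_messages` is an illustrative helper): messages grouped into record-style batches keep their relative order inside each batch by construction, so only the batches themselves, rather than every individual message, must stay in sequence.

```python
def batch_messages(messages, batch_size):
    """Group messages into record-style batches; the order of messages
    inside each batch is preserved by construction (illustrative sketch)."""
    return [messages[i:i + batch_size] for i in range(0, len(messages), batch_size)]

msgs = [f"v{i}" for i in range(1, 11)]   # v1..v10
batches = batch_messages(msgs, 4)        # 3 batch FlowFiles instead of 10 single-message ones

# Replaying the batches in order reproduces the original sequence exactly:
flattened = [m for b in batches for m in b]
# flattened == msgs
```

With 10 single-message FlowFiles there are 10 units that can be reordered in queues; with 3 batches there are only 3, and the intra-batch order can never be disturbed.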
Suggestion 3:
If your dataflow consists of nothing more than consume and publish, and you are doing nothing with the data between these processors, then, as @steven-matison mentioned, NiFi may not be the best choice here. NiFi is designed around fast and efficient movement of data using multi-node clusters and concurrent thread execution. Additionally, NiFi needs to consume all the messages into FlowFiles (writing content to the content_repository and metadata to the flowfile_repository) and records data provenance, which adds overhead. So a purpose-built tool may be better for a simple replication task like this.
If you found any of the suggestions/solutions provided helped you with your issue, please take a moment to login and click "Accept as Solution" on one or more of them that helped.
Thank you,
Matt
Created 12-14-2023 12:53 AM
Thank you for your comprehensive response.
We are using a NiFi cluster topology and only two processors, ConsumeKafka and PublishKafka.
Our topics are configured with 36 partitions and we have 12 nodes in our NiFi cluster, so we configured the concurrent tasks to 3.
As mentioned, our topics are defined with cleanup policy: compact, which requires a Kafka key for each record. Can we use ConsumeKafkaRecord and PublishKafkaRecord when a Kafka key is involved?