Clusters are often created with a small number of Kafka brokers. Over time, more brokers may be needed, usually for reasons of load, performance, or high availability. This article covers the considerations and steps to keep in mind after adding new brokers to HDP/HDF clusters.


If a cluster has a single Kafka broker and you plan to add more brokers for high availability, keep in mind that Kafka has an internal topic called __consumer_offsets, which it creates to store the consumers' committed offsets. With only one broker, Kafka will, by default, create this topic with a single replica and 50 partitions. It is therefore highly recommended to increase the number of replicas for this topic after adding more brokers to the cluster. The values used when the topic is first created are controlled by the following properties:

  • offsets.topic.num.partitions: The number of partitions for the offset commit topic (should not change after deployment).
  • offsets.topic.replication.factor: The replication factor for the offsets topic (set higher to ensure availability). Internal topic creation will fail until the cluster size meets this replication factor requirement.
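
For a __consumer_offsets topic that already exists with a single replica, one possible approach is to give the partition reassignment tool (described below) a JSON file that lists every partition with a longer replica list. With 50 partitions, writing that file by hand is tedious, so the following is a minimal sketch of a helper that prints it. The function name gen_plan, the output path in the usage comment, and the broker IDs 1001, 1002 and 1003 are assumptions for illustration; substitute the IDs of your own brokers.

```shell
# gen_plan TOPIC PARTITION_COUNT BROKER_ID...
# Prints a reassignment JSON document in which every partition is replicated
# on all the given brokers. The broker order is rotated by one position per
# partition so that the first replica differs from partition to partition.
# Note: plain POSIX sh has no local variables, so the names below are global.
gen_plan() {
  topic=$1
  partitions=$2
  shift 2
  printf '{"version":1,"partitions":[\n'
  p=0
  while [ "$p" -lt "$partitions" ]; do
    # Join the current broker order with commas, e.g. 1001,1002,1003.
    replicas=$(printf '%s,' "$@")
    replicas=${replicas%,}
    # Every entry except the last one is followed by a comma.
    if [ "$p" -lt $((partitions - 1)) ]; then sep=','; else sep=''; fi
    printf '{"topic":"%s","partition":%d,"replicas":[%s]}%s\n' \
      "$topic" "$p" "$replicas" "$sep"
    # Rotate the broker list: move the first ID to the end.
    first=$1
    shift
    set -- "$@" "$first"
    p=$((p + 1))
  done
  printf ']}\n'
}

# Example (assumed values: 50 partitions, three brokers):
#   gen_plan __consumer_offsets 50 1001 1002 1003 > /tmp/consumer-offsets-replication.json
```

The resulting file can then be passed to kafka-reassign-partitions.sh with --reassignment-json-file and --execute, in the same way as for regular topics.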

One consideration after adding new Kafka brokers to a cluster is that Kafka does not automatically reassign existing topics to the new brokers. This has to be done manually with the partition reassignment tool that ships with Kafka, bin/kafka-reassign-partitions.sh. In other words, topics that already exist keep their original brokers and replica count; after adding brokers, use the reassignment tool to increase the number of replicas. The following command-line example adds partitions and replicas to an existing topic.

Topic using 1 partition and 1 replica:

[kafka@c489-node2 ~]$ /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --describe --topic testTopic2 --zookeeper c489-node2:2181 
Topic:testTopic2 PartitionCount:1 ReplicationFactor:1 Configs:
Topic: testTopic2 Partition: 0 Leader: 1001 Replicas: 1001 Isr: 1001

After adding two new brokers, the topic testTopic2 remains exactly the same. To add partitions and replicas, perform the following steps:

  1. Increase the number of partitions:
    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --zookeeper c489-node2:2181 --alter --topic testTopic2 --partitions 3
    Example:
    [kafka@c489-node2 ~]$ /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --zookeeper c489-node2:2181 --alter --topic testTopic2 --partitions 3
    WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
    Adding partition
    Describing the topic again should show three partitions:
    [kafka@c489-node2 ~]$ /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --describe --topic testTopic2 --zookeeper c489-node2:2181
    Topic:testTopic2 PartitionCount:3 ReplicationFactor:1 Configs:
    Topic: testTopic2 Partition: 0 Leader: 1001 Replicas: 1001 Isr: 1001
    Topic: testTopic2 Partition: 1 Leader: 1002 Replicas: 1002 Isr: 1002
    Topic: testTopic2 Partition: 2 Leader: 1003 Replicas: 1003 Isr: 1003
  2. Increase the number of replicas:
    1. First create a JSON file (here, /tmp/topic-replication.json) listing the partitions of the topic to be modified:
      {
      "version":1,
      "partitions":[
      {"topic":"testTopic2","partition":0,"replicas":[1001,1002,1003]},
      {"topic":"testTopic2","partition":1,"replicas":[1002,1003,1001]},
      {"topic":"testTopic2","partition":2,"replicas":[1003,1001,1002]}
      ]
      }
      Note that the JSON above lists the replicas in a different order for each partition. The first broker ID in each replica list is the preferred leader for that partition, so rotating the order spreads partition leadership across the brokers.
    2. Run the following command to apply the changes:
      [kafka@c489-node2 ~]$ /usr/hdp/current/kafka-broker/bin/kafka-reassign-partitions.sh --zookeeper c489-node2:2181 --reassignment-json-file /tmp/topic-replication.json --execute
      Current partition replica assignment
      
      {"version":1,"partitions":[{"topic":"testTopic2","partition":0,"replicas":[1001],"log_dirs":["any"]},{"topic":"testTopic2","partition":2,"replicas":[1003],"log_dirs":["any"]},{"topic":"testTopic2","partition":1,"replicas":[1002],"log_dirs":["any"]}]}
      
      Save this to use as the --reassignment-json-file option during rollback
      Successfully started reassignment of partitions.
    3. After the reassignment completes, describing the topic should show the new replicas:
      [kafka@c489-node2 ~]$ /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --describe --topic testTopic2 --zookeeper c489-node2:2181
      Topic:testTopic2 PartitionCount:3 ReplicationFactor:3 Configs:
      Topic: testTopic2 Partition: 0 Leader: 1001 Replicas: 1001,1002,1003 Isr: 1001,1002,1003
      Topic: testTopic2 Partition: 1 Leader: 1002 Replicas: 1002,1003,1001 Isr: 1002,1001,1003
      Topic: testTopic2 Partition: 2 Leader: 1003 Replicas: 1003,1001,1002 Isr: 1003,1001,1002
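
Because --execute only starts the reassignment, it can be worth checking that it has finished before relying on the new replicas. The reassignment tool has a --verify mode for this. The sketch below wraps it in a small function; the function name verify_reassignment and the KAFKA_BIN and ZK variables are illustrative, and the host and paths are simply the ones used in the example above.

```shell
# Assumed values taken from the example above; override them for your cluster.
KAFKA_BIN=${KAFKA_BIN:-/usr/hdp/current/kafka-broker/bin}
ZK=${ZK:-c489-node2:2181}

# verify_reassignment JSON_FILE
# Asks the tool to report, per partition, the status of the reassignment
# described in JSON_FILE (the same file that was passed to --execute).
verify_reassignment() {
  "$KAFKA_BIN/kafka-reassign-partitions.sh" \
    --zookeeper "$ZK" \
    --reassignment-json-file "$1" \
    --verify
}

# Example (run on a broker host):
#   verify_reassignment /tmp/topic-replication.json
```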


To modify multiple topics, use the following JSON template:

{
"version":1,
"partitions":[
{"topic":"testTopic3","partition":0,"replicas":[1001,1002,1003]},
{"topic":"testTopic3","partition":1,"replicas":[1002,1003,1001]},
{"topic":"testTopic3","partition":2,"replicas":[1003,1001,1002]},
{"topic":"testTopic4","partition":0,"replicas":[1001,1002,1003]},
{"topic":"testTopic4","partition":1,"replicas":[1002,1003,1001]},
{"topic":"testTopic4","partition":2,"replicas":[1003,1001,1002]}
]
}

In the above JSON template, testTopic3 and testTopic4 are modified. To add more topics, append further entries to the "partitions" list. The important thing to note is that the last "topic" line must not end with a comma.
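
Since a stray trailing comma makes the JSON invalid, one way to sidestep the problem is to write the entries one per line without any commas and let a small helper add the separators. The following is a sketch only; the function name wrap_plan is an illustrative choice, and it assumes the input has exactly one entry per line with no blank lines.

```shell
# wrap_plan: reads one partition entry per line on stdin and prints a complete
# reassignment document. sed appends a comma to every line except the last.
wrap_plan() {
  printf '{\n"version":1,\n"partitions":[\n'
  sed '$!s/$/,/'
  printf ']\n}\n'
}

# Two entries are enough to show the comma handling; add one line per partition.
wrap_plan <<'EOF'
{"topic":"testTopic3","partition":0,"replicas":[1001,1002,1003]}
{"topic":"testTopic4","partition":0,"replicas":[1001,1002,1003]}
EOF
```

Adding a topic then means adding lines to the input, with no need to move the final comma.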


In summary:

  • Add the new brokers to HDP/HDF using the Ambari UI.
  • Kafka doesn't automatically reassign existing topics after new brokers are added. Follow the steps provided above to reassign the topics that already exist. For new topics, use the --replication-factor and --partitions options when creating them.
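
For a topic created after the new brokers are in place, the partition and replica counts can be set up front. The following is a minimal sketch using the same kafka-topics.sh script as the examples above; the function name create_topic, the topic name newTopic in the usage comment, and the counts of 3 are assumptions for illustration.

```shell
# Assumed values taken from the examples above; override them for your cluster.
KAFKA_BIN=${KAFKA_BIN:-/usr/hdp/current/kafka-broker/bin}
ZK=${ZK:-c489-node2:2181}

# create_topic NAME PARTITIONS REPLICATION_FACTOR
# The replication factor cannot exceed the number of available brokers.
create_topic() {
  "$KAFKA_BIN/kafka-topics.sh" --create \
    --zookeeper "$ZK" \
    --topic "$1" \
    --partitions "$2" \
    --replication-factor "$3"
}

# Example (run on a broker host):
#   create_topic newTopic 3 3
```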