Abstract

The objective of this article is to cover key design and deployment considerations for a highly available (HA) Apache Kafka service.

Introduction

Kafka’s distributed design allows a large number of permanent or ad-hoc consumers, is highly available and resilient to node failures, and supports automatic recovery. These characteristics make Kafka an ideal fit for communication and integration between components of large-scale data systems.

Kafka application resilience alone is not enough for a truly HA system. Consumers, producers, and the network design also need to be HA. ZooKeeper nodes and brokers need to be able to communicate among themselves, and producers and consumers need to be able to access the Kafka API. A fast car can reach its maximum speed only on a good track; a swamp is a swamp for a fast car or a slow one.

Kafka Design

Kafka utilizes Zookeeper for storing metadata information about the brokers, topics, and partitions. Writes to Zookeeper are only performed on changes to the membership of consumer groups or on changes to the Kafka cluster itself. This amount of traffic is minimal, and it does not justify the use of a dedicated Zookeeper ensemble for a single Kafka cluster. Many deployments will use a single Zookeeper ensemble for multiple Kafka clusters.

Prior to Apache Kafka 0.9.0.0, consumers, in addition to the brokers, utilized Zookeeper to directly store information about the composition of the consumer group, what topics it was consuming, and to periodically commit offsets for each partition being consumed (to enable failover between consumers in the group). With version 0.9.0.0, a new consumer interface was introduced which allows this to be managed directly with the Kafka brokers.

However, there is a concern with consumers and Zookeeper under certain configurations. Consumers have a configurable choice to use either Zookeeper or Kafka brokers for committing offsets, and they can also configure the interval between commits. If the consumer uses Zookeeper for offsets, each consumer will perform a Zookeeper write at every interval for every partition it consumes. A reasonable interval for offset commits is 1 minute, as this is the period of time over which a consumer group will read duplicate messages in the case of a consumer failure. These commits can be a significant amount of Zookeeper traffic, especially in a cluster with many consumers, and will need to be taken into account. It may be necessary to use a longer commit interval if the Zookeeper ensemble is not able to handle the traffic.

However, it is recommended that consumers using the latest Kafka libraries use Kafka brokers for committing offsets, removing the dependency on Zookeeper. Outside of using a single ensemble for multiple Kafka clusters, it is not recommended to share the ensemble with other applications, if it can be avoided.
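To get a feel for how much Zookeeper traffic offset commits can generate, the arithmetic above (one write per consumer, per partition, per commit interval) can be sketched in a few lines of Python. The function name and the example numbers are illustrative assumptions, not measurements from a real cluster.

```python
def zk_offset_writes_per_second(consumers: int,
                                partitions_per_consumer: int,
                                commit_interval_s: float) -> float:
    """Rough estimate of Zookeeper writes/s caused by offset commits.

    Assumes every consumer writes one offset per consumed partition
    at every commit interval, as described in the text.
    """
    if commit_interval_s <= 0:
        raise ValueError("commit_interval_s must be positive")
    return consumers * partitions_per_consumer / commit_interval_s


if __name__ == "__main__":
    # Hypothetical load: 200 consumers, each reading 30 partitions.
    for interval_s in (10, 60, 300):
        rate = zk_offset_writes_per_second(200, 30, interval_s)
        print(f"commit every {interval_s:>3}s -> {rate:.0f} Zookeeper writes/s")
```

Lengthening the commit interval lowers the write rate proportionally, at the cost of a longer window of duplicate messages after a consumer failure.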

Kafka is sensitive to Zookeeper latency and timeouts, and an interruption in communications with the ensemble will cause the brokers to behave unpredictably. This can easily cause multiple brokers to go offline at the same time, should they lose Zookeeper connections, which will result in offline partitions. It also puts stress on the cluster controller, which can show up as subtle errors long after the interruption has passed, such as when trying to perform a controlled shutdown of a broker. Other applications that can put stress on the Zookeeper ensemble, either through heavy usage or improper operations, should be segregated to their own ensemble.

Infrastructure and Network Design Challenges

The application is the top layer of the OSI stack, sitting above six other layers, including Network, Data Link and Physical. A power source that is not redundant can take out the rack switch, leaving none of the servers in the rack accessible. There are at least two issues with that implementation: a non-redundant power source for the switch, and a lack of redundancy for the rack switch itself. Add that there is a single communication path from consumers and producers, and that exactly this path is down, and your mission-critical system no longer delivers the service that may be your main stream of revenue. Add also that rack-awareness is not implemented, and that all topic partitions reside on infrastructure hosted by the very rack that failed due to a bad switch or a bad power source.

Has this happened to you? What price did you pay when that bad day came?

Network Design and Deployment Considerations

Implementing a resilient Kafka cluster is similar to implementing a resilient HDFS cluster. Kafka and HDFS reliability mechanisms protect data and applications, at most compensating for a limited number of server failures, but not for network failure, especially when the infrastructure and network have many single points of failure coupled with a poor deployment of ZooKeeper nodes, brokers or replicated topics.

Dual NICs, dedicated core switches, redundant top-of-rack switches, and replicas balanced across racks are common elements of a good network design for HA. Kafka, like HDFS, supports rack awareness. A single point of failure should not impact more than one ZooKeeper node, one broker, or one replica of any topic partition. ZooKeeper nodes and brokers should have HA communication: if one path is down, another path is used. Ideally, they should be distributed across different racks.

Network redundancy needs to provide an alternate access path to the brokers for producers and consumers when a failure occurs. Also, as a good practice, brokers should be distributed across multiple racks.

Configure Network Correctly

Network configuration with Kafka is similar to other distributed systems, with a few caveats mentioned below. Infrastructure, whether on-premises or cloud, offers a variety of different IP and DNS options. Choose an option that keeps inter-broker network traffic on the private subnet and allows clients to connect to the brokers. Inter-broker and client communication use the same network interface and port.

When a broker is started, it registers its hostname with ZooKeeper. The producer since Kafka 0.8.1 and the consumer since 0.9.0 are configured with a bootstrapped (or “starting”) list of Kafka brokers. Prior versions were configured with ZooKeeper. In both cases, the client makes a request (either to a broker in the bootstrap list, or to ZooKeeper) to fetch all broker hostnames and begin interacting with the cluster.

Depending on how the operating system’s hostname and network are configured, brokers on server instances may register hostnames with ZooKeeper that aren’t reachable by clients. The purpose of advertised.listeners is to address exactly this problem: the protocol, hostname, and port configured in advertised.listeners are registered with ZooKeeper instead of the operating system’s hostname.
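As a rough illustration of the listener settings involved, the following Python sketch renders the relevant lines of a broker's server.properties. The keys broker.id, listeners, and advertised.listeners are standard Kafka broker settings; the helper function, hostnames, and port are hypothetical placeholders chosen for this example.

```python
def broker_listener_properties(broker_id: int,
                               bind_host: str,
                               advertised_host: str,
                               port: int = 9092,
                               protocol: str = "PLAINTEXT") -> str:
    """Render the listener-related lines of a broker's server.properties.

    bind_host is the address the broker listens on; advertised_host is
    the name that clients are told to connect to (it must be resolvable
    and reachable from the client side).
    """
    lines = [
        f"broker.id={broker_id}",
        f"listeners={protocol}://{bind_host}:{port}",
        f"advertised.listeners={protocol}://{advertised_host}:{port}",
    ]
    return "\n".join(lines) + "\n"


if __name__ == "__main__":
    # Hypothetical broker: listens on all interfaces, advertises a DNS
    # name that resolves on the private subnet used by clients.
    print(broker_listener_properties(1, "0.0.0.0", "kafka1.example.internal"))
```

Generating these lines from one template per broker makes it harder for a single broker to advertise an address that clients cannot reach.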

In a multi-datacenter architecture, careful consideration has to be given to MirrorMaker. Under the covers, MirrorMaker is simply a consumer and a producer joined together. If MirrorMaker is configured to consume from a static IP, the single broker tied to that static IP will be reachable, but the other brokers in the source cluster won’t be. MirrorMaker needs access to all brokers in the source and destination data centers, which in most cases is best implemented with a VPN between data centers.

Client service discovery can be implemented in a number of different ways. One option is to use HAProxy on each client machine, proxying localhost requests to an available broker. Synapse works well for this. Another option is to use a Load Balancer appliance. In this configuration, ensure the Load Balancer is not public to the internet. Sessions and stickiness do not need to be configured because Kafka clients only make a request to the load balancer at startup. A health check can be a ping or a telnet.
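The "ping or telnet" style of health check mentioned above amounts to testing whether a TCP connection to the broker port can be opened. A minimal sketch in Python follows; the function name and the host names in the demo are assumptions for illustration, and a successful TCP connect only shows that the port is open, not that the broker is healthy.

```python
import socket


def broker_reachable(host: str, port: int, timeout_s: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port can be opened.

    This is the programmatic equivalent of a telnet check: it verifies
    network reachability of the listener, nothing more.
    """
    try:
        with socket.create_connection((host, port), timeout=timeout_s):
            return True
    except OSError:
        # Covers refused connections, timeouts, and DNS failures.
        return False


if __name__ == "__main__":
    # Hypothetical broker addresses; replace with real ones.
    for host in ("kafka1.example.internal", "kafka2.example.internal"):
        state = "up" if broker_reachable(host, 9092) else "DOWN"
        print(f"{host}:9092 is {state}")
```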

Distribute Kafka brokers across multiple racks

Kafka was designed to run within a single data center. As such, we discourage distributing brokers in a single cluster across multiple data centers. However, we recommend “stretching” brokers in a single cluster across multiple racks. These racks can be in the same private network or spread over multiple private networks within the same data center. Each enterprise remains responsible for weighing these considerations when implementing resilient systems.

A multi-rack cluster offers stronger fault tolerance because a failed rack won’t cause Kafka downtime.

However, in this configuration, prior to Kafka 0.10, you must assign partition replicas manually to ensure that the replicas for each partition are spread across racks. Replicas can be assigned manually either when a topic is created, or by using the kafka-reassign-partitions command line tool. Kafka 0.10 or later supports rack awareness, which makes spreading replicas across racks much easier to configure.
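For the manual case, the following Python sketch shows one way to spread each partition's replicas across racks and to emit the result in the JSON layout accepted by kafka-reassign-partitions. It is a simplified illustration and not Kafka's own assignment algorithm; the function names, broker-to-rack map, and topic name are assumptions. On Kafka 0.10 or later, setting broker.rack on each broker lets Kafka do this itself for newly created topics.

```python
import json
from typing import Dict, List


def assign_replicas(broker_racks: Dict[int, str],
                    partitions: int,
                    replication_factor: int) -> Dict[int, List[int]]:
    """Pick replicas per partition, preferring one broker per rack."""
    if not 1 <= replication_factor <= len(broker_racks):
        raise ValueError("replication factor must be between 1 and the broker count")

    # Group brokers by rack, then interleave racks so that neighbouring
    # brokers in the ordering sit in different racks where possible.
    by_rack: Dict[str, List[int]] = {}
    for broker in sorted(broker_racks):
        by_rack.setdefault(broker_racks[broker], []).append(broker)
    depth = max(len(members) for members in by_rack.values())
    ordered = [by_rack[rack][i]
               for i in range(depth)
               for rack in sorted(by_rack)
               if i < len(by_rack[rack])]

    assignment: Dict[int, List[int]] = {}
    for partition in range(partitions):
        shift = partition % len(ordered)
        rotated = ordered[shift:] + ordered[:shift]  # rotate the leader
        replicas: List[int] = []
        used_racks = set()
        # First pass: at most one broker from each rack.
        for broker in rotated:
            if len(replicas) < replication_factor and broker_racks[broker] not in used_racks:
                replicas.append(broker)
                used_racks.add(broker_racks[broker])
        # Second pass: only needed when there are fewer racks than replicas.
        for broker in rotated:
            if len(replicas) < replication_factor and broker not in replicas:
                replicas.append(broker)
        assignment[partition] = replicas
    return assignment


def reassignment_json(topic: str, assignment: Dict[int, List[int]]) -> str:
    """Format the assignment as a kafka-reassign-partitions JSON document."""
    plan = {"version": 1,
            "partitions": [{"topic": topic, "partition": p, "replicas": r}
                           for p, r in sorted(assignment.items())]}
    return json.dumps(plan, indent=2)


if __name__ == "__main__":
    # Hypothetical cluster: six brokers in three racks.
    racks = {1: "rack1", 2: "rack1", 3: "rack2", 4: "rack2", 5: "rack3", 6: "rack3"}
    print(reassignment_json("events", assign_replicas(racks, 6, 3)))
```

The generated plan should be reviewed before it is applied, since moving replicas copies data between brokers and adds network load.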

At the minimum, for the HA of your Kafka-based service:

  • Use dedicated “Top of Rack” (TOR) switches (can be shared with Hadoop cluster).
  • Use dedicated core switching blades or switches.
  • If deployed to a physical environment, make certain to place the cluster in a VLAN.
  • For the switch communicating between the racks you will want to establish HA connections.
  • Implement Kafka rack-awareness configuration. It does not apply retroactively to existing ZooKeeper nodes, brokers or topics.
  • Periodically test your infrastructure for resiliency and for its impact on Kafka service availability, for example by disconnecting one rack switch that serves a single broker. If the network and topic replication were designed correctly, producers and consumers should be able to work as usual. The only acceptable impact is reduced capacity: producers may accumulate a lag in processing and consumers may see some performance impact, but everything else should be 100% functional.
  • Implement continuous monitoring of producers and consumers to detect failure events, with alerts and, where possible, automated corrective actions.

Below is a simplified example of a logical architecture for HA.

[Figure: 69489-kafka-cluster-ha.png, logical architecture of a Kafka cluster deployed for HA]

The above diagram shows an HA implementation for an enterprise that uses 5 racks, with 5 ZooKeeper nodes distributed across multiple racks, multiple brokers distributed across those 5 racks, replicated topics, and HA producers and consumers. This is similar to an HDFS HA network design.

Distribute ZooKeeper nodes across multiple racks

ZooKeeper should be distributed across multiple racks as well, to increase fault tolerance. To tolerate a rack failure, ZooKeeper must be running in at least three different racks. In a configuration where three ZooKeeper nodes are running in two racks, if the rack with two ZooKeeper nodes fails, ZooKeeper will not have quorum and will not be available. The private network is a failure domain as well. An enterprise can place ZooKeeper nodes in different networks, but that comes at the price of latency, so it is a conscious trade-off between performance and reliability. Separating the nodes into different racks is a good compromise.
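The quorum reasoning above can be checked mechanically: an ensemble of N servers needs a majority (more than half of N) to stay available, so a placement survives a rack failure only if the servers outside any one rack still form a majority. A small Python sketch, with a function name and rack labels that are purely illustrative:

```python
from collections import Counter
from typing import List


def survives_any_single_rack_failure(zk_racks: List[str]) -> bool:
    """zk_racks holds one entry per ZooKeeper server, naming its rack.

    Returns True if losing any one rack still leaves a majority of the
    ensemble running.
    """
    if not zk_racks:
        raise ValueError("the ensemble must have at least one server")
    total = len(zk_racks)
    quorum = total // 2 + 1  # strict majority
    servers_per_rack = Counter(zk_racks)
    return all(total - lost >= quorum for lost in servers_per_rack.values())


if __name__ == "__main__":
    layouts = {
        "3 servers in 2 racks": ["r1", "r1", "r2"],
        "3 servers in 3 racks": ["r1", "r2", "r3"],
        "5 servers in 5 racks": ["r1", "r2", "r3", "r4", "r5"],
    }
    for name, layout in layouts.items():
        print(f"{name}: survives rack loss = {survives_any_single_rack_failure(layout)}")
```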

Monitor broker performance and terminate poorly performing brokers

Kafka broker performance can decrease unexpectedly over time for unknown reasons. It is a good practice to terminate and replace a broker if, for example, the 99th percentile of produce/fetch request latency is higher than is tolerable for your application.
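As an illustration of the replacement rule above, the following Python sketch computes a nearest-rank 99th percentile over latency samples per broker and lists the brokers that exceed a tolerance. How the samples are collected (for example from broker metrics) is outside the scope of this sketch; the function names and the threshold are assumptions.

```python
import math
from typing import Dict, List, Sequence


def percentile(samples: Sequence[float], pct: int) -> float:
    """Nearest-rank percentile: smallest value with at least pct% of samples at or below it."""
    if not samples:
        raise ValueError("no samples")
    ordered = sorted(samples)
    rank = math.ceil(pct * len(ordered) / 100)
    return ordered[max(rank, 1) - 1]


def brokers_to_replace(latencies_ms: Dict[int, List[float]],
                       p99_limit_ms: float) -> List[int]:
    """Return the IDs of brokers whose p99 request latency exceeds the limit."""
    return sorted(broker for broker, samples in latencies_ms.items()
                  if percentile(samples, 99) > p99_limit_ms)


if __name__ == "__main__":
    # Hypothetical samples: broker 2 has a slow tail.
    samples = {
        1: [4.0] * 100,
        2: [4.0] * 95 + [250.0] * 5,
    }
    print("replace brokers:", brokers_to_replace(samples, p99_limit_ms=100.0))
```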

Datacenter Layout

For development systems, the physical location of the Kafka brokers within a datacenter is not as much of a concern, as there is not as severe an impact if the cluster is partially or completely unavailable for short periods of time.

When serving production traffic, however, downtime means dollars lost, whether through loss of services to users or loss of telemetry on what the users are doing. This is when it becomes critical to configure replication within the Kafka cluster, which is also when it is important to consider the physical location of brokers in their racks in the datacenter.

If a multi-rack deployment model and rack-awareness configuration were not implemented prior to deploying Kafka, expensive maintenance to move servers around may be needed. Without rack-awareness configured, Kafka does not consider racks when assigning new partitions to brokers, so everything created before rack-awareness was implemented could be vulnerable to failure. Kafka cannot take into account that two brokers may be located in the same physical rack, or in the same network, and can therefore easily assign all replicas for a partition to brokers that share the same power and network connections in the same rack. Should that rack have a failure, these partitions would be offline and inaccessible to clients. In addition, it can result in additional lost data on recovery due to an unclean leader election.
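To find partitions that were placed before rack-awareness was in effect, the current replica assignment can be audited against a broker-to-rack map. The Python sketch below assumes both are already available as plain dictionaries (how they are obtained, for example from topic descriptions, is left out); all names are illustrative.

```python
from typing import Dict, List, Tuple

TopicPartition = Tuple[str, int]


def single_rack_partitions(assignment: Dict[TopicPartition, List[int]],
                           broker_racks: Dict[int, str]) -> List[TopicPartition]:
    """Return partitions whose replicas all live in one rack.

    Such partitions go offline if that rack loses power or network.
    """
    at_risk = []
    for topic_partition, replicas in sorted(assignment.items()):
        racks = {broker_racks[broker] for broker in replicas}
        if len(racks) < 2:
            at_risk.append(topic_partition)
    return at_risk


if __name__ == "__main__":
    # Hypothetical cluster: brokers 1 and 2 share a rack.
    racks = {1: "rack1", 2: "rack1", 3: "rack2"}
    current = {
        ("events", 0): [1, 2],  # both replicas in rack1
        ("events", 1): [2, 3],
    }
    print("at risk:", single_rack_partitions(current, racks))
```

Partitions reported by such an audit are candidates for reassignment so that at least one replica lands in a different rack.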

The best practice is to have each Kafka broker in a cluster installed in a different rack, or at the very least not share single points of failure for infrastructure services such as power and network. This typically means at least deploying the servers that will run brokers with dual power connections (to two different circuits) and dual network switches (with a bonded interface on the servers themselves to failover seamlessly). Even with dual connections, there is a benefit to having brokers in completely separate racks.

From time to time, it may be necessary to perform physical maintenance on a rack or cabinet that requires it to be offline (such as moving servers around, or rewiring power connections).

Other Good Practices

I am sure I missed other good practices, so here are a few more:

  • Produce to a replicated topic.
  • Consume from a replicated topic (consumers must be in the same consumer group).
  • Each partition is assigned to exactly one consumer in a group, so the topic needs more partitions than consumers (consumers beyond the partition count sit idle).
  • Resilient producers – spill data to local disk until Kafka is available.
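The "spill to local disk" practice in the list above can be sketched as a thin wrapper around whatever send function the producer uses. In this Python sketch the send callable is a hypothetical stand-in for a real Kafka client call, assumed to raise an exception when Kafka is unavailable; the class, method names, and file format are assumptions as well.

```python
import json
import os
import tempfile
from typing import Callable


class SpillingProducer:
    """Wraps a send function and spills records to a local file on failure."""

    def __init__(self, send: Callable[[str, str], None], spill_path: str):
        self._send = send              # hypothetical: raises when Kafka is down
        self._spill_path = spill_path  # JSON-lines file holding undelivered records

    def produce(self, topic: str, value: str) -> bool:
        """Return True if delivered, False if the record was spilled to disk."""
        try:
            self._send(topic, value)
            return True
        except Exception:  # a real client would catch its specific error types
            with open(self._spill_path, "a", encoding="utf-8") as spill:
                spill.write(json.dumps({"topic": topic, "value": value}) + "\n")
            return False

    def replay(self) -> int:
        """Resend spilled records in order; stop at the first failure.

        Returns the number of records delivered. Undelivered records
        stay in the spill file for the next attempt.
        """
        if not os.path.exists(self._spill_path):
            return 0
        with open(self._spill_path, encoding="utf-8") as spill:
            records = [json.loads(line) for line in spill if line.strip()]
        delivered = 0
        for record in records:
            try:
                self._send(record["topic"], record["value"])
            except Exception:
                break
            delivered += 1
        with open(self._spill_path, "w", encoding="utf-8") as spill:
            for record in records[delivered:]:
                spill.write(json.dumps(record) + "\n")
        return delivered


if __name__ == "__main__":
    def always_down(topic: str, value: str) -> None:
        raise ConnectionError("Kafka unavailable")  # simulated outage

    path = os.path.join(tempfile.mkdtemp(), "spill.jsonl")
    producer = SpillingProducer(always_down, path)
    print("delivered:", producer.produce("events", "hello"))
    print("spill file size:", os.path.getsize(path), "bytes")
```

Note that records produced after Kafka recovers but before replay() runs would overtake the spilled ones, so a producer that needs strict ordering should replay the spill file first.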

Other Useful References

Mirroring data between clusters: http://kafka.apache.org/documentation.html#basic_ops_mirror_maker

Data centers: http://kafka.apache.org/documentation.html#datacenters

Thanks

I'd like to thank all Apache Kafka contributors and the community that drives the innovation. I also want to thank everyone who contributes to improving the documentation and publications on how to design and deploy Kafka as a Service in HA mode.

Comments

A very informative post, thanks for sharing! From my observation, Kafka is a network-intensive application, and with that in mind I have a question about active-active network bond configuration with Kafka. Is this recommended, and what are the considerations if I decide to go for it? Thanks again!