Created on ‎08-04-2016 11:03 AM - edited ‎08-17-2019 10:55 AM
This article is for anyone deploying a Kafka cluster for production use. It highlights the main points you should think about before deploying your cluster.
It covers the important configurations for the producer, broker, and consumer, followed by tips for performance tuning.
Important Client / Producer Configurations:
The most important producer configurations:
- compression
The compression type for all data generated by the producer. The default is none (i.e. no compression). Valid values are none, gzip, snappy, or lz4. Compression is applied to full batches of data, so the efficacy of batching will also impact the compression ratio (more batching means better compression).
Name:
compression.type
Default:
None
- sync vs async production
Batching is one of the big drivers of efficiency, and to enable batching the Kafka producer will attempt to accumulate data in memory and to send out larger batches in a single request.
Name:
producer.type
Default:
sync
- batch size (for async producers)
A small batch size will make batching less common and may reduce throughput (a batch size of zero will disable batching entirely). A very large batch size may use memory a bit more wastefully as we will always allocate a buffer of the specified batch size in anticipation of additional records.
Name:
batch.size
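Default:
16384 bytes in the Java producer (the legacy Scala producer instead uses batch.num.messages, which defaults to 200 messages)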
- maximum message size
This is the largest message size Kafka will allow to be appended to this topic. Note that if you increase this size you must also increase your consumers' fetch size so they can fetch messages this large.
Account for this when sizing disks. A rough estimate is average message size * message rate * retention period * replication factor.
Name:
max.message.bytes
Default: 1,000,000
- acks
The number of acknowledgments the producer requires the leader to have received before considering a request complete. This controls the durability of records that are sent. The following settings are common:
acks=0 If set to zero then the producer will not wait for any acknowledgment from the server at all. The record will be immediately added to the socket buffer and considered sent. No guarantee can be made that the server has received the record in this case, and the retries configuration will not take effect (as the client won't generally know of any failures). The offset given back for each record will always be set to -1.
acks=1 This will mean the leader will write the record to its local log but will respond without awaiting full acknowledgement from all followers. In this case should the leader fail immediately after acknowledging the record but before the followers have replicated it then the record will be lost.
acks=all This means the leader will wait for the full set of in-sync replicas to acknowledge the record. This guarantees that the record will not be lost as long as at least one in-sync replica remains alive. This is the strongest available guarantee.
Name:
acks
Default:
1
To see the full list of producer configs:
http://kafka.apache.org/documentation.html#producerconfigs
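As a rough illustration, the producer settings discussed above could be collected into a properties file along the following lines. This is a minimal sketch that only renders text and does not talk to Kafka. The helper name render_properties and the chosen values (snappy, 64 KB batches, acks=all) are assumptions made for the example, not recommendations from this article.

```python
# Hypothetical helper: renders a dict of settings as Java-style .properties text.
def render_properties(settings):
    return "\n".join(f"{key}={value}" for key, value in sorted(settings.items()))


# Illustrative values only; tune them for your own workload.
producer_settings = {
    "compression.type": "snappy",  # one of: none, gzip, snappy, lz4
    "batch.size": 65536,           # bytes per batch; 0 disables batching
    "acks": "all",                 # wait for the full set of in-sync replicas
}

print(render_properties(producer_settings))
```

The rendered text can be saved as a producer properties file. Choosing acks=all trades some latency for the strongest durability guarantee described above.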
Brokers:
The important configurations are the following:
- compression.type
Specify the final compression type for a given topic. This configuration accepts the standard compression codecs ('gzip', 'snappy', 'lz4'). It additionally accepts 'uncompressed' which is equivalent to no compression; and 'producer' which means retain the original compression codec set by the producer.
Name:
compression.type
Default:
producer
- num.replica.fetchers
Number of fetcher threads used to replicate messages from a source broker. Increasing this value can increase the degree of I/O parallelism in the follower broker.
Name:
num.replica.fetchers
Default:
1
- offsets.topic.replication.factor
The replication factor for the offsets topic (set higher to ensure availability). To ensure that the effective replication factor of the offsets topic is the configured value, the number of alive brokers has to be at least the replication factor at the time of the first request for the offsets topic. If not, either the offsets topic creation will fail or it will get a replication factor of min(alive brokers, configured replication factor).
Name:
offsets.topic.replication.factor
Default:
3
Number of partitions for a topic
- The number of topics and partitions impacts how much can be stored in the page cache
- The topic partition is the unit of parallelism in Kafka
- Partitions in Kafka drive the parallelism of consumers
- Throughput requirements drive the number of partitions on a topic.
- Formula:
- P = Throughput from a producer to a single partition
- C = Throughput from a single partition to a consumer
- T = Target throughput
- Required # of partitions = max(T/P, T/C)
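The partition-count formula above can be worked through with a small calculation. This is a sketch under stated assumptions: the function name required_partitions and the throughput figures are hypothetical, and the result is rounded up because a topic cannot have a fractional number of partitions.

```python
import math


def required_partitions(target, per_partition_produce, per_partition_consume):
    """Return max(T/P, T/C), rounded up to a whole number of partitions."""
    if per_partition_produce <= 0 or per_partition_consume <= 0:
        raise ValueError("per-partition throughput must be positive")
    return max(
        math.ceil(target / per_partition_produce),
        math.ceil(target / per_partition_consume),
    )


# Hypothetical numbers, all in MB/sec: T = 200, P = 20, C = 50.
# T/P = 10 and T/C = 4, so the producer side is the bottleneck.
print(required_partitions(200, 20, 50))  # 10
```

P and C should be measured on your own hardware and message sizes. The values here are placeholders.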
CONSUMERS:
max.partition.fetch.bytes
The maximum amount of data per partition that the server will return. The maximum total memory used for a fetch request is #partitions * max.partition.fetch.bytes. This size must be at least as large as the broker's message.max.bytes, or else it is possible for the producer to send messages larger than the consumer can fetch. If that happens, the consumer can get stuck trying to fetch a large message on a certain partition.
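The two consumer-side checks described above (the memory bound and the fetch size versus the maximum message size) can be sketched as follows. The function names and the numbers are hypothetical and only illustrate the arithmetic.

```python
def max_fetch_memory_bytes(num_partitions, max_partition_fetch_bytes):
    """Upper bound on memory for one fetch: #partitions * max.partition.fetch.bytes."""
    return num_partitions * max_partition_fetch_bytes


def fetch_size_is_safe(max_partition_fetch_bytes, message_max_bytes):
    """True when the consumer can fetch the largest message the broker accepts."""
    return max_partition_fetch_bytes >= message_max_bytes


# Hypothetical consumer: 50 assigned partitions, 1 MiB per-partition fetch size.
print(max_fetch_memory_bytes(50, 1048576))   # 52428800 bytes (50 MiB)
print(fetch_size_is_safe(1048576, 1000000))  # True
print(fetch_size_is_safe(1048576, 5000000))  # False: the consumer could get stuck
```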
Performance Tuning
General
- num.io.threads should be at least the number of disks dedicated to Kafka. I strongly recommend starting with the same number as disks.
- num.network.threads should be adjusted based on the number of producers + number of consumers + replication factor.
- Message size affects network bandwidth. For a higher-performance Kafka cluster, use 10GbE cards.
Kernel tuning
- While most Kafka deployments contain a lot of RAM, you can get comparable performance with spinning disks depending on the type of IOPS you need.
- Provide a large page cache and avoid going to disk if you expect random I/O operations.
- The Kafka broker always writes to the OS page cache first.
- Messages are flushed to disk based on several configurations.
- Flushing messages to disk is controlled by log.flush.interval.messages: the number of messages written to a log partition before an fsync is forced on the log. It defaults to Long.MAX_VALUE, so in practice Kafka does not force an fsync based on message count.
Name:
log.flush.interval.messages
- The per-topic override for log.flush.interval.messages
Name:
flush.messages (topic-level config)
- The OS will flush (pdflush) regardless of the Kafka parameters.
- The default flush configurations will cover most use cases, as durability is provided by replication.
Disk throughput & durability
- If you enable auto commit (enable.auto.commit), use longer intervals for auto.commit.interval.ms (default is 5000 ms).
- Since Kafka has replication, the redundancy provided by RAID can also be provided at the application level.
- Isolate the disks and use them only to store the application data.
- Using multiple disks with multiple directories often results in better performance than using RAID.
- RAID can potentially do better load balancing of the data at a low level. But the major downside of RAID is usually a big performance hit for write throughput, and it reduces the available disk space. Another potential benefit of RAID is the ability to tolerate disk failures. However, rebuilding the RAID array is so I/O intensive that it effectively disables the server, so this does not provide much real availability improvement.
Java/JVM tuning
- Minimize GC pauses by using the G1 (garbage-first) collector available in the Oracle JDK.
- Kafka Heap Size
- By default the kafka-broker JVM heap is set to 1 GB; this can be increased using the Ambari kafka-env template. When you are sending large messages, JVM garbage collection can be an issue. Try to keep the Kafka heap size below 4GB.
- Example: In kafka-env.sh add the following settings.
- export KAFKA_HEAP_OPTS="-Xmx16g -Xms16g"
- export KAFKA_JVM_PERFORMANCE_OPTS="-XX:MetaspaceSize=96m -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:G1HeapRegionSize=16M -XX:MinMetaspaceFreeRatio=50 -XX:MaxMetaspaceFreeRatio=80"
Kafka Deployment Example
- 8-core dual socket (16 CPUs)
- 64-128 GB of RAM
- 8-16GB Kafka JVM
- Rest for OS + Page Cache
- 8x2TB disk
- EXT4
- XFS may handle locking during fsync better. EXT4 is mostly used in the field
- SAS or SSD preferred
- Based on Retention period
- Account for Broker and # of partitions it will handle
- JBOD – Assuming you will use replication
- Optional
- RAID10
- The primary downside of RAID is that it is usually a big performance hit for write throughput and reduces the available disk space.
- 10GigE Bonded NICs for extreme performance
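The disk-sizing factors mentioned in this article (message size, retention period, replication factor, and the number of brokers) can be combined into a back-of-the-envelope estimate. This is a sketch, not a sizing tool: it assumes a steady message rate (which the article does not give), ignores compression, index files, and free-space headroom, and uses hypothetical function names and workload numbers.

```python
def cluster_disk_bytes(avg_message_bytes, messages_per_sec, retention_days, replication_factor):
    """Raw log bytes kept across the cluster for the whole retention period."""
    retention_seconds = retention_days * 24 * 60 * 60
    return avg_message_bytes * messages_per_sec * retention_seconds * replication_factor


def per_broker_disk_bytes(total_bytes, num_brokers):
    """Even split across brokers; real partition placement is rarely perfectly even."""
    return total_bytes / num_brokers


# Hypothetical workload: 1,000-byte messages at 10,000 msgs/sec,
# 7 days of retention, replication factor 3, spread over 6 brokers.
total = cluster_disk_bytes(1000, 10_000, 7, 3)
print(total)                            # 18144000000000 bytes, about 18.1 TB
print(per_broker_disk_bytes(total, 6))  # about 3.0 TB per broker
```

Under these assumed numbers each broker would need roughly 3 TB of log storage before headroom, which fits within the 8x2TB layout above.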
LinkedIn Published Benchmarks
SOURCE: https://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap...
- Single producer thread, no replication
- 821,557 records/sec (78.3 MB/sec)
- Single producer thread, 3x asynchronous replication
- 786,980 records/sec (75.1 MB/sec)
Created on ‎02-27-2017 11:54 AM
Some configs should be explained explicitly, such as unclean.leader.election.enable, min.insync.replicas, and num.replica.fetchers.
Created on ‎04-16-2018 05:23 PM
Nice Article.
I am trying to access the full list of producer configurations but am not able to. Please update the article.
Created on ‎10-02-2018 01:14 PM
The Kafka heap size section says to keep the heap below 4 GB, but the example sets it to 16 GB. Please see below:
Try to keep the Kafka Heap size below 4GB.
- export KAFKA_HEAP_OPTS="-Xmx16g -Xms16g"
Created on ‎05-01-2019 08:25 PM
Hi Vedant!
You state:
num.io.threads should be greater than the number of disks dedicated for Kafka. I strongly recommend to start with same number of disks first.
Is num.io.threads to be calculated from the number of disks per node allocated to Kafka, or from the total number of disks for Kafka across the entire cluster?
I'm guessing disks per node dedicated for Kafka, but I wanted to confirm.
Thanks,
Jeff G.