Community Articles
Find and share helpful community-sourced technical articles
Announcements
Alert: Welcome to the Unified Cloudera Community. Former HCC members be sure to read and learn how to activate your account here.
Labels (1)
Guru

This article is applicable for anyone deploying a Kafka cluster for production use. It points out certain high level points that you should be thinking about before deploying your cluster.

It is divided into 3 parts: Important configurations for the Producer, Broker, Consumer and tips for Performance tuning.

Important Client / Producer Configurations:

The most important producer configurations:

  • compression

The compression type for all data generated by the producer. The default is none (i.e. no compression). Valid values arenone, gzip, snappy, or lz4. Compression is of full batches of data, so the efficacy of batching will also impact the compression ratio (more batching means better compression).

Name: compression.type Default: None

  • sync vs async production

Batching is one of the big drivers of efficiency, and to enable batching the Kafka producer will attempt to accumulate data in memory and to send out larger batches in a single request.

Name: producer.type Default: sync

  • batch size (for async producers)

A small batch size will make batching less common and may reduce throughput (a batch size of zero will disable batching entirely). A very large batch size may use memory a bit more wastefully as we will always allocate a buffer of the specified batch size in anticipation of additional records.

Name: batch.size Default: 200

  • maximum message size

This is largest message size Kafka will allow to be appended to this topic. Note that if you increase this size you must also increase your consumer's fetch size so they can fetch messages this large.

Account for this when doing disk sizing. Average Message size+Retention Period * Replication factor

Name: max.message.bytes Default: 1,000,000

  • acks

The number of acknowledgments the producer requires the leader to have received before considering a request complete. This controls the durability of records that are sent. The following settings are common:

acks=0 If set to zero then the producer will not wait for any acknowledgment from the server at all. The record will be immediately added to the socket buffer and considered sent. No guarantee can be made that the server has received the record in this case, and theretries configuration will not take effect (as the client won't generally know of any failures). The offset given back for each record will always be set to -1.

acks=1 This will mean the leader will write the record to its local log but will respond without awaiting full acknowledgement from all followers. In this case should the leader fail immediately after acknowledging the record but before the followers have replicated it then the record will be lost.

acks=all This means the leader will wait for the full set of in-sync replicas to acknowledge the record. This guarantees that the record will not be lost as long as at least one in-sync replica remains alive. This is the strongest available guarantee.

Name: acks Default: 1

To see full list of producer configs:

http://kafka.apache.org/documentation.html#producerconfigs

6383-screen-shot-2016-08-04-at-104106-am.png

Brokers:

The important configurations are the following:

  • compression.type

Specify the final compression type for a given topic. This configuration accepts the standard compression codecs ('gzip', 'snappy', 'lz4'). It additionally accepts 'uncompressed' which is equivalent to no compression; and 'producer' which means retain the original compression codec set by the producer.

Name: compression.type Default: producer

  • num.replica.fetchers

Number of fetcher threads used to replicate messages from a source broker. Increasing this value can increase the degree of I/O parallelism in the follower broker.

Name: num.replica.fetchers Default: 1

  • offsets.topic.replication.factor

The replication factor for the offsets topic (set higher to ensure availability). To ensure that the effective replication factor of the offsets topic is the configured value, the number of alive brokers has to be at least the replication factor at the time of the first request for the offsets topic. If not, either the offsets topic creation will fail or it will get a replication factor of min(alive brokers, configured replication factor).

Name: offsets.topic.replication.factor Default: 3

6384-screen-shot-2016-08-04-at-110656-am.png

Credits: http://www.michael-noll.com/blog/2013/03/13/running-a-multi-broker-apache-kafka-cluster-on-a-single-...

Number of partitions for a topic

CONSUMERS:

    max.partition.fetch.bytes

  • The maximum amount of data per-partition the server will return. The maximum total memory used for a request will be #partitions * max.partition.fetch.bytes. This size must be at least as large as the message.max.bytes or else it is possible for the producer to send messages larger than the consumer can fetch. If that happens, the consumer can get stuck trying to fetch a large message on a certain partition.
  • fetch.message.max.bytes
  • The number of bytes of messages to attempt to fetch for each topic-partition in each fetch request. These bytes will be read into memory for each partition, so this helps control the memory used by the consumer. The fetch request size must be at least as large as message.max.bytes or else it is possible for the producer to send messages larger than the consumer can fetch.

    6370-screen-shot-2016-08-04-at-93747-am.png

    Performance Tuning

    General

    • num.io.threads should be greater than the number of disks dedicated for Kafka. I strongly recommend to start with same number of disks first.
    • num.network.threads adjust based on the number of producers + number of consumers + replication factor
    • Message size will affects network bandwidth. For higher performance kafka cluster, use 10GbEcards

    Kernel tuning

    • While most Kafka deployments contain a lot of RAM, you can get comparable performance with spinning disks depending on the type of IOPS you need.
    • Give huge page cache & avoid disks if you are going to be doing random IO operations.
    • Kafka broker will always write to page cache (OS) first
      • Messages are flushed by based on several configurations
      • Flush message to disk controlled by log.flush.interval.message
        Defaults to Long.MAX_VALUE which is very big
      • The number of messages written to a log partition before we force an fsync on the log
        Name: log.flush.interval.message
      • The per-topic override for log.flush.interval.messages
        Name: log.flush.interval.ms.per.topic
      • OS will flush (pdflush) regardless of kafka params
    • Default flush configurations will cover most use cases as the durability is provided by replication

    6406-jacobs3.jpg

    Source: http://queue.acm.org/detail.cfm?id=1563874

    Disk throughput & durability

    • If you enable auto commit enable.auto.commit, add longer intervals for auto.commit.interval.ms (default is 500 ms)
    • Since Kafka has replication the redundancy provided by RAID can also be provided at theapplication
    • Isolate disk and only use to store the application data
    • Using multi disk with multi-directory often resulted in better performance than using RAID
    • RAID can potentially do a better load balancing of the data at the low-level. But the major downside of RAID is usually a big performance hit for write throughput and reduces the available disk space. Another potential benefit of RAID is the ability to tolerate disk failures. Rebuilding the RAID array is so I/O intensive that it effectively disables the server,so this does not provide much real availability improvement.

    Java/JVM tuning

    • Minimize GC pauses by using Oracle JDK it uses new G1 garbage-first collector
    • Kafka Heap Size
      • From HCC Article, by default kafka-broker jvm is set to 1Gb this can be increased using Ambari kafka-env template. When you are sending large messages JVM garbage collection can be an issue. Try to keep the Kafka Heap size below 4GB.
      • Example: In kafka-env.sh add following settings.
      • export KAFKA_HEAP_OPTS="-Xmx16g -Xms16g"
      • export KAFKA_JVM_PERFORMANCE_OPTS="-XX:MetaspaceSize=96m -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:G1HeapRegionSize=16M -XX:MinMetaspaceFreeRatio=50 -XX:MaxMetaspaceFreeRatio=80"

    Kafka Deployment Example

    • 8 coredual socket (16CPU)
    • 64-128 Gig ofRam
      • 8-16GB Kafka JVM
      • Rest for OS + Page Cache
    • 8x2TB disk
      • EXT4
      • XFS may handle locking during fsync better. EXT4 mostly used in field
    • SAS or SSD preferred
      • Based on Retention period
      • Account for Broker and # of partitions it will handle
      • JBOD – Assuming you will use replication
    • Optional
      • RAID10
      • The primary downside of RAIDis that it is usually a big performance hit for write throughput and reduces the available disk space.
    • 10GigE Bonded NICs for extreme performance

    LinkedIn Published Benchmarks

    SOURCE: https://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap...
    • Single producer thread, no replication
      • 821,557 records/sec(78.3 MB/sec)
    • Single producer thread, 3x asynchronous replication
      • 786,980 records/sec(75.1 MB/sec)

    screen-shot-2016-08-03-at-105906-pm.pngscreen-shot-2016-08-03-at-93056-pm.png
    48,008 Views
    Comments
    Not applicable

    some config you must explain explicitly,such as unclean.leader.election.enable,min.insync.replicas,num.replica.fetchers

    Nice Article.

    I am trying to access full list of producer configurations but not able to access it, please update the article :

    http://kafka.apache.org/documentation.html#producerconfigs

    Not applicable

    In the Kafka heap size section, it is mentioned as 4 GB is the ideal one, but in the example, it is given as 16 GB, please see below:

    Try to keep the Kafka Heap size below 4GB.

  • Example: In kafka-env.sh add following settings.
    • export KAFKA_HEAP_OPTS="-Xmx16g -Xms16g"
  • Could you please confirm if ideal heap size for Kafka is 4 GB or 16 GB?
    Cloudera Employee

    Hi Vedant!


    You state:

    num.io.threads should be greater than the number of disks dedicated for Kafka. I strongly recommend to start with same number of disks first.


    Is num.io.threads to be calculated as the number of disks per node allocated to Kafka or the total number of disk for Kafka for the entire cluster?


    I'm guessing disks per node dedicated for Kafka, but I wanted to confirm.


    Thanks,


    Jeff G.

    Don't have an account?
    Coming from Hortonworks? Activate your account here
    Version history
    Revision #:
    2 of 2
    Last update:
    ‎08-17-2019 10:55 AM
    Updated by:
     
    Contributors
    Top Kudoed Authors