Any tips on how to optimize Kafka broker performance?

Contributor

I was wondering if you can share some of your experience with the following server.properties parameters:

  • message.max.bytes
  • num.network.threads
  • num.io.threads
  • background.threads
  • queued.max.requests
  • socket.send.buffer.bytes
  • socket.receive.buffer.bytes
  • socket.request.max.bytes
  • num.partitions

1 ACCEPTED SOLUTION

Super Guru

@Boris Demerov

Usually you don't need to modify these settings. However, if you want to extract every last bit of performance from your machines, changing some of them can help. You may have to tweak some of the values, but these worked in about 80% of cases for me:

  • message.max.bytes=1000000
  • num.network.threads=3
  • num.io.threads=8
  • background.threads=10
  • queued.max.requests=500
  • socket.send.buffer.bytes=102400
  • socket.receive.buffer.bytes=102400
  • socket.request.max.bytes=104857600
  • num.partitions=1

Quick explanations of the numbers:

  • message.max.bytes: This sets the maximum size of a message that the server can receive. Set it to prevent any producer from inadvertently sending extra-large messages and swamping the consumers. The default size cited here is 1000000; check the documentation for your Kafka version, as newer releases use a slightly larger default.
  • num.network.threads: This sets the number of threads that handle network requests. If you expect a very high rate of incoming requests, increase this value. Otherwise, you are good to go. Its default value is 3.
  • num.io.threads: This sets the number of threads spawned for I/O operations. Set it to at least the number of disks present. Its default value is 8.
  • background.threads: This sets the number of threads that run various background jobs, such as deleting old log files. Its default value is 10, and you might not need to change it.
  • queued.max.requests: This sets the number of requests that can wait in the queue while others are being processed by the I/O threads. If the queue is full, the network threads stop accepting new requests. If your application has erratic loads, set queued.max.requests high enough that the bursts are not throttled.
  • socket.send.buffer.bytes: This sets the SO_SNDBUF buffer size, which is used for socket connections.
  • socket.receive.buffer.bytes: This sets the SO_RCVBUF buffer size, which is used for socket connections.
  • socket.request.max.bytes: This sets the maximum size of a request that the server can receive. It should be smaller than the Java heap size you have set.
  • num.partitions: This sets the default number of partitions for a topic you create without explicitly specifying a partition count.
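
Two of the rules of thumb above (num.io.threads at least the number of disks, socket.request.max.bytes smaller than the heap) can be checked mechanically. The following is only a minimal sketch in Python: the function names, the sample file contents, the disk count, and the heap size are all assumptions for illustration, and the fallback values are the ones listed in this answer.

```python
def parse_properties(text):
    """Parse simple key=value lines, skipping blank lines and # comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = line.split("=", 1)
        props[key.strip()] = value.strip()
    return props


def check_broker_settings(props, disk_count, heap_bytes):
    """Return a list of warnings for the two rules of thumb described above."""
    warnings = []

    # Rule 1: at least one I/O thread per disk (fallback 8, as listed above).
    io_threads = int(props.get("num.io.threads", 8))
    if io_threads < disk_count:
        warnings.append(
            f"num.io.threads={io_threads} is below the disk count ({disk_count})"
        )

    # Rule 2: the largest request must fit in the Java heap.
    request_max = int(props.get("socket.request.max.bytes", 104857600))
    if request_max >= heap_bytes:
        warnings.append(
            f"socket.request.max.bytes={request_max} is not smaller than "
            f"the heap ({heap_bytes} bytes)"
        )
    return warnings


# Hypothetical server.properties excerpt, checked against 8 disks and a 1 GiB heap.
sample = """
# illustrative values only
num.io.threads=4
socket.request.max.bytes=104857600
"""
for warning in check_broker_settings(parse_properties(sample), 8, 1024 ** 3):
    print(warning)
```

With the sample values, only the num.io.threads rule is reported, because 4 threads is below the assumed 8 disks.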

The number of partitions may have to be higher than 1 for reliability, but for raw performance (even if not realistic :)), 1 is better.

These are no silver bullet :). However, you could test these changes with a test topic at 1,000/10,000/100,000 messages per second to see the difference between the default values and the adjusted values. Vary some of them to see the effect.
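
One way to run such a comparison is the kafka-producer-perf-test.sh script that ships with Kafka. The sketch below only builds the command lines for the three rates mentioned above; it does not run them. The topic name, record size, test duration, and bootstrap address are assumptions, and the flag names should be checked against the script's --help output for your Kafka version, since they have changed between releases.

```python
def perf_test_commands(topic, rates, duration_s=60, record_size=100,
                       bootstrap="localhost:9092"):
    """Build one kafka-producer-perf-test.sh command (as an argv list) per rate."""
    commands = []
    for rate in rates:
        commands.append([
            "kafka-producer-perf-test.sh",
            "--topic", topic,
            # Enough records to sustain the target rate for duration_s seconds.
            "--num-records", str(rate * duration_s),
            "--record-size", str(record_size),
            # Target messages per second.
            "--throughput", str(rate),
            "--producer-props", f"bootstrap.servers={bootstrap}",
        ])
    return commands


# "perf-test" is a hypothetical test topic created just for this comparison.
for command in perf_test_commands("perf-test", [1000, 10000, 100000]):
    print(" ".join(command))
```

Run the same set of commands once with the default broker settings and once with the adjusted ones, and compare the reported throughput and latency.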

You may need to configure your Java installation for maximum performance. This includes the settings for heap, socket size, and so on.

***

Hope it helps. Pls vote/accept best answer

4 REPLIES

Super Guru

@Boris Demerov

Yes. The reference provided by @Randy Gelhausen is awesome, one of the best articles in HCC. It covers Kafka tuning practices beyond the scope of your question, but it is a must-read article.

I'd like to point out a few good rules of thumb related to your question, which Wes in his article covered too (I extracted and commented a few):

Set num.io.threads to at least the number of disks you are going to use. By default it is 8. It can be higher than the number of disks.

A common broker server has 8 disks. That is my current experience; however, this number can be increased.

Set num.network.threads higher based on number of concurrent producers, consumers, and replication factor.

The default value of 3 has been set based on field experience; however, you can take an iterative approach and test different values until you find what is optimal for your case.

Ideally you want to assign the default number of partitions (num.partitions) to at least n-1 servers. This can break up the write workload and it allows for greater parallelism on the consumer side. Remember that Kafka does total ordering within a partition, not over multiple partitions, so make sure you partition intelligently on the producer side to parcel up units of work that might span multiple messages/events.
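
The ordering point can be illustrated with a small sketch: if every event in a unit of work carries the same key, and the partition is chosen from a stable hash of that key, all of those events land in one partition and keep their order. This is only an illustration in Python. The crc32 hash is a stand-in, not Kafka's own partitioner, and the order keys are hypothetical.

```python
import zlib


def partition_for(key, num_partitions):
    """Pick a partition from a stable hash of the key.

    crc32 is only a stand-in here; Kafka's own partitioner uses a
    different hash, so the partition numbers will not match a real cluster.
    """
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def route(events, num_partitions):
    """Group (key, value) events by partition, preserving arrival order."""
    partitions = {p: [] for p in range(num_partitions)}
    for key, value in events:
        partitions[partition_for(key, num_partitions)].append((key, value))
    return partitions


# Hypothetical unit of work: three events that belong to the same order.
events = [
    ("order-1", "created"),
    ("order-2", "created"),
    ("order-1", "paid"),
    ("order-1", "shipped"),
]
for partition, items in route(events, 3).items():
    print(partition, items)
```

All three order-1 events end up in the same partition in their original sequence, while events with other keys may be spread across the remaining partitions.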

Consumers benefit from this approach; on the producer side, careful design is recommended. You need to balance the benefits between producers and consumers based on your business needs.

Kafka is designed for small messages, and I recommend avoiding it for larger ones. If that is not avoidable, there are several ways to send larger messages, such as 1 MB. If the original message is JSON, XML, or text, compression is the best option to reduce its size. Large messages will affect your performance and throughput. Check your topic partitions and the replica fetch size (replica.fetch.max.bytes) to make sure the total doesn't exceed your physical RAM. Another approach is to break the message into smaller chunks and use the same message key so that they all go to the same partition. This way you are sending small messages, and they can be reassembled on the consumer side.
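
The chunking approach can be sketched as follows. This is a minimal illustration, not a Kafka client: the chunk layout (key, index, total, data) and the function names are assumptions, and a real implementation would also need to handle timeouts and lost chunks.

```python
def split_message(message_id, payload, chunk_size):
    """Split payload (bytes) into chunks that all share the same key."""
    total = max(1, (len(payload) + chunk_size - 1) // chunk_size)
    return [
        {
            "key": message_id,  # same key, so every chunk goes to one partition
            "index": i,
            "total": total,
            "data": payload[i * chunk_size:(i + 1) * chunk_size],
        }
        for i in range(total)
    ]


def reassemble(chunks):
    """Rebuild the payload, or return None while chunks are still missing."""
    if not chunks:
        return None
    total = chunks[0]["total"]
    by_index = {chunk["index"]: chunk["data"] for chunk in chunks}
    if len(by_index) != total:
        return None
    return b"".join(by_index[i] for i in range(total))


# Hypothetical 2.5 MB payload, sent as chunks of at most 1,000,000 bytes.
payload = b"x" * 2_500_000
chunks = split_message("doc-42", payload, 1_000_000)
print(len(chunks), reassemble(chunks) == payload)
```

Carrying the index and total in each chunk lets the consumer detect when a message is complete, even if it buffers chunks from several messages at once.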

This complicates your producer and consumer code in the case of very large messages. Design carefully how producers and consumers deal with large messages. There are many ways to implement compression or chunking, as well as decompression and reassembly. Choose after testing your approach. For example, a high compression ratio is usually an advantage, but you pay for it in compression/decompression time. It is sometimes more efficient to use a lower compression ratio with faster compression/decompression, as long as the message size ends up under 1 MB. It all comes down to your SLAs, whether they are in milliseconds or seconds.
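
The trade-off between compression ratio and compression time is easy to measure before committing to a design. The sketch below uses Python's zlib as a stand-in for whichever codec you actually deploy; the sample payload and the 1,000,000-byte limit are assumptions for illustration, and timings will differ on your hardware and data.

```python
import time
import zlib


def compare_levels(payload, levels=(1, 6, 9)):
    """Compress payload at each zlib level and record size and elapsed time."""
    results = []
    for level in levels:
        start = time.perf_counter()
        compressed = zlib.compress(payload, level)
        elapsed = time.perf_counter() - start
        results.append({"level": level, "size": len(compressed), "seconds": elapsed})
    return results


def fastest_level_under(results, limit):
    """Return the fastest level whose output fits the limit, or None."""
    fitting = [r for r in results if r["size"] <= limit]
    if not fitting:
        return None
    return min(fitting, key=lambda r: r["seconds"])["level"]


# Hypothetical JSON-like payload of roughly 2 MB before compression.
payload = b'{"sensor": "a1", "reading": 20.5, "unit": "C"}\n' * 45_000
results = compare_levels(payload)
for result in results:
    print(result)
print("fastest level under limit:", fastest_level_under(results, 1_000_000))
```

Run it against a representative sample of your real messages; repetitive test data like the payload above compresses far better than most production data.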

I am sure I did not cover the domain completely, but hopefully this helps.


Hi @Boris Demerov, please also see @Wes Floyd's Storm & Kafka guide here. There's some overlap between it and @Constantin Stanca's recommendations, but you may find it useful anyway.

Contributor

@Randy Gelhausen, thank you for the link. I added it to my favorites! :)

@Constantin Stanca

Thank you so much for your updated response. It provided valuable reasoning and advice, and it made Wes' article easier for me to read.