
Tips to increase performance in flafka


Hello, I have flafka running in my cluster, but the performance is too slow (around 1,500 logs/s) and I want to reach about 10,000 logs/s. I have 1 agent, 1 source, 1 sink, and 1 broker. The flume-ng configuration is:


# Sources, channels, and sinks are defined per
# agent name, in this case flume1.
flume1.sources  = kafka-source-1
flume1.channels = hdfs-channel-1  
flume1.sinks    = hdfs-sink-1 
# For each source, channel, and sink, set
# standard properties.
flume1.sources.kafka-source-1.type = org.apache.flume.source.kafka.KafkaSource
flume1.sources.kafka-source-1.zookeeperConnect =
flume1.sources.kafka-source-1.topic = kafkatopic2
flume1.sources.kafka-source-1.batchSize = 2000
flume1.sources.kafka-source-1.channels = hdfs-channel-1
flume1.channels.hdfs-channel-1.type   = memory
#flume1.channels.logChannel.type   = memory

flume1.channels.hdfs-channel-1.capacity = 100000
flume1.channels.hdfs-channel-1.transactionCapacity = 20000

#flume1.channels.logChannel.capacity = 1000000
#flume1.channels.logChannel.transactionCapacity = 100000

#Interceptors setup
flume1.sources.kafka-source-1.interceptors = i1
flume1.sources.kafka-source-1.interceptors.i1.type =$Builder

# sinks configuration
flume1.sinks.hdfs-sink-1.type = hdfs
flume1.sinks.hdfs-sink-1.hdfs.writeFormat = Text
flume1.sinks.hdfs-sink-1.hdfs.fileType = DataStream
flume1.sinks.hdfs-sink-1.hdfs.filePrefix = %{product}
flume1.sinks.hdfs-sink-1.hdfs.useLocalTimeStamp = true
flume1.sinks.hdfs-sink-1.hdfs.path = /user/root/logs/%{product}/%{client}/
flume1.sinks.hdfs-sink-1.hdfs.callTimeout = 1500000

#flume1.sinks.logSink.type = logger

Kafka configuration is:

number of partitions: 1
replication factor: 1
number of replicas in ISR: 1
number of partitions: 50
topic replication factor: 1



Re: Tips to increase performance in flafka

Cloudera Employee

Hi JoaquinS,


I have a few questions for you


1) How are you measuring the number of events per second?

2) What does the interceptor do in your configuration?

3) How many partitions for kafkatopic2 ? 

4) Any reason you are only running 1 broker?





Re: Tips to increase performance in flafka


Hello Jeff,


Answering your questions:


1) With RabbitMQ, the queue system, I'm consuming about 1.5k/s at most and it produces about 5k/s at most, so I have a huge queue. Also, Cloudera Manager has graphs that show the Flume and Kafka events received.


2) The interceptor adds 2 fields to the headers (client, product); it also modifies the body, adding empty fields for the missing ones (to structure the logs), because I want to convert them into Hive tables.


3) 51.


4) Because it's the simplest configuration, and I read it in the Learning Apache Kafka book. Does increasing the number of brokers improve the performance?
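As an aside, the body-padding step described in answer 2 can be sketched standalone like this (Python is used purely for illustration; the real interceptor would be Java, and the field names and delimiter here are hypothetical):

```python
# Sketch of the interceptor logic described above: add client/product
# headers and pad missing fields in a delimited log line so every row
# has the same structure for the later Hive conversion.
# Field names and the "|" delimiter are assumptions, not the real schema.
FIELDS = ["timestamp", "client", "product", "level", "message"]

def intercept(headers, body, delimiter="|"):
    values = body.split(delimiter)
    # Pad the record with empty fields so it matches the full schema
    values += [""] * (len(FIELDS) - len(values))
    record = dict(zip(FIELDS, values))
    # Copy the two routing fields into the event headers
    headers = dict(headers, client=record["client"], product=record["product"])
    return headers, delimiter.join(values)

headers, body = intercept({}, "2015-06-01|acme|webapp|INFO")
```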


Re: Tips to increase performance in flafka

Cloudera Employee

Thanks for those, it is helpful. 


I'll go in reverse order. 


4) It is standard practice to use multiple brokers with Kafka. I typically recommend at least 3 to get started. It is also typical to recommend replication: replication is Kafka's unit of defense against failure. Running without it means that you are at best susceptible to extended downtime and at worst data loss. Additionally, you will ultimately be limited by the capacity of a single machine/NIC/{set of disks}, etc.
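As an illustration, once 3 brokers are running, a replicated topic could be created like this (topic name and ZooKeeper address are placeholders; on CDH the script is typically kafka-topics, elsewhere kafka-topics.sh):

```shell
kafka-topics --create \
  --zookeeper zk1:2181 \
  --topic kafkatopic2-replicated \
  --partitions 51 \
  --replication-factor 3
```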


3) Ok


2) What file format are you using to persist the events into HDFS?  (Avro? Text?)


1) Ok, so mostly the CM graphs.


Currently you have 1 Kafka consumer (the source) reading from 51 partitions and sending events to the memory channel 2k at a time. Your interceptor might be responsible for some delay (I haven't seen your code), but note that each event will be processed sequentially per batch. This may or may not be an issue. If you're comfortable writing an interceptor in the first place and aren't going to be dropping events, I might suggest that you ditch the source / memory channel combination altogether and just use the Kafka Channel -> HDFS Sink.
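A minimal sketch of that Kafka Channel -> HDFS Sink layout, keeping your original sink settings (broker and ZooKeeper addresses are placeholders for your environment):

```properties
# No source needed: the Kafka Channel consumes kafkatopic2 directly
flume1.channels = kafka-channel-1
flume1.sinks    = hdfs-sink-1

flume1.channels.kafka-channel-1.type = org.apache.flume.channel.kafka.KafkaChannel
flume1.channels.kafka-channel-1.brokerList = broker1:9092
flume1.channels.kafka-channel-1.zookeeperConnect = zk1:2181
flume1.channels.kafka-channel-1.topic = kafkatopic2
# The producers are plain Kafka clients, not Flume, so don't expect
# the Flume event wire format
flume1.channels.kafka-channel-1.parseAsFlumeEvent = false

flume1.sinks.hdfs-sink-1.channel = kafka-channel-1
flume1.sinks.hdfs-sink-1.type = hdfs
flume1.sinks.hdfs-sink-1.hdfs.path = /user/root/logs/%{product}/%{client}/
```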


In order to modify the events like you mentioned, you can do this in a custom Serializer. Especially if you are just adding a field, it is pretty straightforward to do. 



There's an example here: Serializer Example (this was created for something unrelated); you can just ignore lines 67 & 68.


If you decide the serializer option isn't for you, you can still use Kafka Source -> Kafka Channel -> HDFS Sink, though given your single-broker config this might not be ideal.


There will ultimately be a max number of events that you can process in a single thread. Kafka's answer for this is partitions: the idea is that you have multiple readers, each reading 1:N partitions. For Flume, the threading model is a bit clunkier; you will likely have to create multiple agents (note you can do this in one config file, but I usually do multiple agents/services in CM). You could test out your current Kafka-only setup by running the producer/consumer performance benchmarks. You can then measure how much overhead the Flume piece is adding for you.
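The Kafka-only baseline mentioned above can be measured with the benchmark tools that ship with Kafka (broker/ZooKeeper addresses, message counts, and sizes below are placeholders; the exact flags vary between Kafka versions, so check --help on yours):

```shell
# Producer baseline: push 1M 300-byte messages as fast as possible
kafka-producer-perf-test --broker-list broker1:9092 \
  --topics kafkatopic2 --messages 1000000 --message-size 300

# Consumer baseline: drain the same topic and report throughput
kafka-consumer-perf-test --zookeeper zk1:2181 \
  --topic kafkatopic2 --messages 1000000
```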


Additionally, you are using ZK for offset storage. I haven't asked about your ZK setup, but it might be worth seeing whether ZK is limiting you (you can check the CM graphs). Most places I've seen don't have ZK configured properly for this type of high-throughput scenario.


Given that you are trying to process 10k msgs/s, that means that if everything was going well, you'd be doing 5 offset commits to ZK per second (2k batches). This might be the bottleneck. You could try enabling auto-commit for a test to see if it gives you a measurable performance benefit and, if so, perhaps switch offset storage to kafka in the Flume Source properties ( = kafka).
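If you try that, the Flafka-era Flume Kafka source passes consumer properties through with a kafka. prefix, so the experiment might look like the fragment below (the pass-through behavior is assumed from the Flume 1.6-era docs; verify against your version):

```properties
# Let the Kafka consumer auto-commit offsets instead of
# committing once per Flume transaction
flume1.sources.kafka-source-1.kafka.auto.commit.enable = true
# Optionally move offset storage from ZooKeeper to Kafka
flume1.sources.kafka-source-1.kafka.offsets.storage = kafka
```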



Hope this helps.







Re: Tips to increase performance in flafka


Hi Jeff, 


I didn't see anything that limits the max event quantity. I also connected the source directly to the sink, without the interceptor stage, added 2 processors to the Flume node, and set auto.commit.enable = true, and it didn't change the performance. Now I'm going to try the Kafka serializer, but I have a question: can the serializer do the same thing as the interceptor, that is, add 2 headers and modify the body?


I also have another problem. I have a 6-node cluster, and on one node I have ZooKeeper, Flume, and Kafka. When Flume writes the data it is sharded between the nodes, but the Flume node seems to keep a copy of the data locally, because its disk usage is growing by the same amount as the input and the disk is getting full. What can I do to avoid keeping that local copy, and what is the cause?


I checked CM and I didn't find anything wrong; here are the charts:


If you can take a look at them I will be very grateful.



flume agent process.png

flume channel.png

flume sink.png

flume source.png

-Kafka topic

kafka topic.png


zookeeper server 1.png

zookeeper server 2.png


Thanks a lot

Re: Tips to increase performance in flafka




Today I found that when I stop Flume, wait a while, and then start Flume again, it writes at 4k msgs/s and the channel gets full. So it is not a problem with Flume performance.


Another thing I found is that Kafka writes the data to a local folder, and then Flume reads it and writes it to HDFS. Is it possible to delete Kafka's data as Flume writes it to HDFS, so the disk doesn't fill up with redundant data?



Re: Tips to increase performance in flafka

Cloudera Employee

Great news that you are seeing better performance. I still didn't get what your final file format is; I guess I'm trying to understand the importance of the headers. Can I assume, since you care about the headers, that you are storing the events as Flume AvroEvents on HDFS?


Anyway, you can configure Kafka to delete messages more aggressively in CM by changing log.retention.hours.


In Kafka terms, the "log" is the actual data.
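Outside of CM, that maps to a plain broker property (the value below is an example, not a recommendation):

```properties
# Broker-wide retention: delete log segments older than 24 hours
# (the default is 168 hours, i.e. 7 days)
log.retention.hours=24
```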





Re: Tips to increase performance in flafka


Hi Jeff,


The file format is JSON, and in the future I want to convert it to Parquet.

The improvement is because I was testing whether Flume is the limitation. I stopped Flume for a while, so Kafka could store data without Flume, and so that on startup Flume could read the backlog. When I started Flume again, the performance stayed high until Flume finished writing the pending data that Kafka had accumulated while Flume was off. Now I have to find out which is responsible for the low performance, ZooKeeper or Kafka.


And thanks for the help with log.retention.hours; it was exactly what I needed.
