Kafka new producer not behaving consistently

Contributor

Hi

I am trying to use the new Kafka producer (org.apache.kafka.clients.producer.KafkaProducer) on Kafka 0.9.0.2.4 on the HDP 2.4.0.0-169 sandbox to build a very basic producer/consumer scenario: I send some data to a Kafka topic with 1 partition and a replication factor of 1, and read it back with kafka-console-consumer. However, when I read the data in the consumer, some of it is missing. The loss is random: it does not happen on every run, and the percentage of lost data varies too. I have done the same thing with the old API (kafka.javapi.producer.Producer), which works fine on the same sandbox every time. I have tested the same code on Docker and it gives me similar results. Sometimes the consumer gets all the records and sometimes it doesn't. I am not sure whether anyone else has faced a similar situation. It would be nice to know if I am missing something.

PS - I have also tried using a callback, and the behavior is the same. The consumer I am running is "/usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test_input --from-beginning"

Here is my code which I am testing -

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object AsyncKafkaProducerWithoutCallBack {
  def main(args: Array[String]) = {
    val properties = new Properties()
    properties.put("bootstrap.servers", "sandbox:6667")
    properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    properties.put("acks", "all")
    // how many times to retry when produce request fails?
    properties.put("retries", "3")
    properties.put("linger.ms", "5")

    val producer = new KafkaProducer[String, String](properties)
    val startTime = System.currentTimeMillis()
    for (i <- 1 to 2000) {
      val kafkaRecord = new ProducerRecord[String, String]("test_input", s"$i")
      val start = System.currentTimeMillis()
      println(producer.send(kafkaRecord))
      println(s"Time taken for - $i -- " + (System.currentTimeMillis() - start) + "ms")
    }
    println("Total Time taken -- " + (System.currentTimeMillis() - startTime))
  }
}

1 ACCEPTED SOLUTION

Contributor

Solved the mystery. The issue was that the call to "producer.close()" was missing, so main was exiting before Kafka finished writing. Strange that this is not required in the older API. Anyway, for reference, from the Javadocs:

The producer consists of a pool of buffer space that holds records that haven't yet been transmitted to the server as well as a background I/O thread that is responsible for turning these records into requests and transmitting them to the cluster. Failure to close the producer after use will leak these resources.
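For readers who want to see the fix applied, here is a minimal sketch of the producer from the question with the missing close() added. The object name AsyncKafkaProducerWithClose and the try/finally wrapper are illustrative choices, not part of the original post. The broker address, topic, and settings are copied from the question.

```scala
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

// Hypothetical object name; everything else mirrors the code in the question.
object AsyncKafkaProducerWithClose {
  def main(args: Array[String]): Unit = {
    val properties = new Properties()
    properties.put("bootstrap.servers", "sandbox:6667")
    properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    properties.put("acks", "all")
    properties.put("retries", "3")
    properties.put("linger.ms", "5")

    val producer = new KafkaProducer[String, String](properties)
    try {
      for (i <- 1 to 2000) {
        // send() is asynchronous: it buffers the record and returns a Future immediately.
        producer.send(new ProducerRecord[String, String]("test_input", s"$i"))
      }
    } finally {
      // close() blocks until all previously sent requests complete,
      // so buffered records are written before main returns.
      producer.close()
    }
  }
}
```

Putting close() in a finally block means the buffer is still drained if the loop throws part-way through.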


5 REPLIES 5

avatar

Can you try adding a callback to the producer to check whether all the calls are succeeding?

ProducerRecord<byte[], byte[]> record = new ProducerRecord<byte[], byte[]>("the-topic", "key".getBytes(), "value".getBytes());
producer.send(record,
              new Callback() {
                  public void onCompletion(RecordMetadata metadata, Exception e) {
                      if (e != null) {
                          // metadata is null when the send failed, so only log the error
                          e.printStackTrace();
                      } else {
                          System.out.println("The offset of the record we just sent is: " + metadata.offset());
                      }
                  }
              });

Contributor

Thanks Ankit for the reply. As I mentioned in my original post, the behavior is the same with a callback as well. It is also random: if I run my code 10 times in a row, I see this happen once or twice with the new producer API. If I do the same thing with the old API, it runs fine. I am not sure what the issue is, and I am not finding anything unusual in the logs either.

avatar

Oh, you are saying that no exception is seen in the callback either. So what about sending the records to Kafka immediately? The properties below are meant to avoid buffering, so that the records show up at the consumer right away.

batch.size=0
linger.ms=0
# just as a precaution
block.on.buffer.full=true

Contributor

Hi Ankit

I tried the settings you gave, but they did not help. The only way I can get consistent results is by using the old API or by making my calls synchronous, i.e. producer.send(record).get(). I am not sure why it works in one case and not in the other. I am using Kafka 0.9.0.1, which I would expect to be stable.
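The synchronous variant mentioned above can be sketched as follows. The object name SyncKafkaProducer and the printed message are illustrative. The broker address, topic, and serializers are taken from the original question. Calling get() on the Future returned by send() blocks until the broker has acknowledged that record, so nothing is left in the producer's buffer when main returns. That is one plausible reason this version behaves consistently.

```scala
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata}

// Hypothetical object name; configuration values are copied from the question.
object SyncKafkaProducer {
  def main(args: Array[String]): Unit = {
    val properties = new Properties()
    properties.put("bootstrap.servers", "sandbox:6667")
    properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    properties.put("acks", "all")

    val producer = new KafkaProducer[String, String](properties)
    try {
      for (i <- 1 to 2000) {
        val record = new ProducerRecord[String, String]("test_input", s"$i")
        // get() waits for the acknowledgement and throws if the send failed.
        val metadata: RecordMetadata = producer.send(record).get()
        println(s"Record $i written at offset ${metadata.offset()}")
      }
    } finally {
      producer.close()
    }
  }
}
```

The trade-off is throughput: each record waits for its own round trip to the broker, so batching gives no benefit here.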
