
Kafka new producer not behaving consistently


Hi

I am trying to use the new Kafka producer (org.apache.kafka.clients.producer.KafkaProducer) on Kafka 0.9.0.2.4 on the HDP 2.4.0.0-169 sandbox in a very basic producer/consumer scenario: I send some data to a Kafka topic with 1 partition and replication factor 1 and read it back with the kafka-console-consumer. However, when I read the data in the consumer, some of it gets lost. It's random: it doesn't happen every time, and the percentage of data lost also varies. I have done the same thing with the old API (kafka.javaapi.producer.Producer), which works fine on the same sandbox every time. I have also tested this on Docker with similar results: sometimes the consumer gets all the records and sometimes it doesn't. It's not consistent. I am not sure if anyone else has faced a similar situation; it would be nice to know if I am missing something.

PS - I have tried using a callback as well; the behavior is the same. The consumer I am running is "/usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test_input --from-beginning"
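For reference, the topic was created with 1 partition and replication factor 1, along the lines of:

/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic test_input --partitions 1 --replication-factor 1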

Here is the code I am testing with -

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object AsyncKafkaProducerWithoutCallBack {
  def main(args: Array[String]) = {
    val properties = new Properties()
    properties.put("bootstrap.servers", "sandbox:6667")
    properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    properties.put("acks", "all")
    // how many times to retry when a produce request fails
    properties.put("retries", "3")
    properties.put("linger.ms", "5")

    val producer = new KafkaProducer[String, String](properties)
    val startTime = System.currentTimeMillis()
    for (i <- 1 to 2000) {
      val kafkaRecord = new ProducerRecord[String, String]("test_input", s"$i")
      val start = System.currentTimeMillis()
      println(producer.send(kafkaRecord))
      println(s"Time taken for - $i -- " + (System.currentTimeMillis() - start) + "ms")
    }
    println("Total Time taken -- " + (System.currentTimeMillis() - startTime))
  }
}


5 REPLIES

Re: Kafka new producer not behaving consistently

Can you try adding a callback to the producer to check whether all the send calls are succeeding or not?

ProducerRecord<byte[], byte[]> record =
    new ProducerRecord<byte[], byte[]>("the-topic", "key".getBytes(), "value".getBytes());
producer.send(record,
    new Callback() {
        public void onCompletion(RecordMetadata metadata, Exception e) {
            if (e != null)
                e.printStackTrace();
            else
                System.out.println("The offset of the record we just sent is: " + metadata.offset());
        }
    });
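Since the original code is in Scala, the same pattern there would look roughly like this (a sketch, reusing the producer and topic from the original post):

import org.apache.kafka.clients.producer.{Callback, ProducerRecord, RecordMetadata}

// sketch: the same callback pattern applied to the Scala producer above
val record = new ProducerRecord[String, String]("test_input", "value")
producer.send(record, new Callback {
  override def onCompletion(metadata: RecordMetadata, e: Exception): Unit = {
    if (e != null)
      e.printStackTrace()
    else
      println("The offset of the record we just sent is: " + metadata.offset())
  }
})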

Re: Kafka new producer not behaving consistently

Thanks Ankit for the reply, but as I mentioned in my original post, the behavior is the same when using a callback as well. Also, this is random: if I run my code about 10 times in a row, I see it happen once or twice with the new producer API. If I do the same thing with the old API, it runs fine every time. Not sure what the issue is; I am not finding anything weird in the logs either.


Re: Kafka new producer not behaving consistently

Oh, you are saying that no exception is seen in the callback either. So what about sending the calls to Kafka immediately? The purpose of the properties below is to avoid buffering, so the records should show up at the consumer immediately:

batch.size=0
linger.ms=0
block.on.buffer.full=true  // just as a precaution
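In the Properties from the original Scala code, that would be (a sketch):

// sketch: force immediate sends instead of buffering
properties.put("batch.size", "0")              // no batching, each record is sent on its own
properties.put("linger.ms", "0")               // do not wait to fill a batch
properties.put("block.on.buffer.full", "true") // block instead of erroring if the buffer fills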

Re: Kafka new producer not behaving consistently

Hi Ankit

I tried the settings you gave; it's still not working. The only way I get consistent results is by using the old API or making my calls synchronous, i.e. producer.send(record).get(). Not sure what is happening, or why it works for one and not the other. I am using Kafka 0.9.0.1, which I hope should be stable.
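For reference, the synchronous variant of the send loop from the original code looks like this (send() returns a java.util.concurrent.Future[RecordMetadata], and .get() blocks until the broker acknowledges the record):

// synchronous send: .get() blocks until the record is acknowledged
for (i <- 1 to 2000) {
  val kafkaRecord = new ProducerRecord[String, String]("test_input", s"$i")
  val metadata = producer.send(kafkaRecord).get()
  println(s"Record $i written to partition ${metadata.partition()}, offset ${metadata.offset()}")
}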


Re: Kafka new producer not behaving consistently (Accepted Solution)

Solved the mystery. The issue was that "producer.close()" was missing: main was exiting before Kafka finished writing. Strange that this is not required in the older API. Anyway, for reference, from the javadocs:

The producer consists of a pool of buffer space that holds records that haven't yet been transmitted to the server as well as a background I/O thread that is responsible for turning these records into requests and transmitting them to the cluster. Failure to close the producer after use will leak these resources.
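So the fix is to close the producer before main returns, so that buffered records get flushed, e.g. (a sketch of the corrected end of the original code):

// fix: close the producer so buffered records are flushed before main exits
try {
  for (i <- 1 to 2000) {
    val kafkaRecord = new ProducerRecord[String, String]("test_input", s"$i")
    producer.send(kafkaRecord)
  }
} finally {
  producer.close() // blocks until all previously sent records have completed
}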

