Created 07-11-2016 10:16 AM
Hi
I am trying to use the new Kafka producer (org.apache.kafka.clients.producer.KafkaProducer) on Kafka 0.9.0.2.4 on the HDP 2.4.0.0-169 sandbox for a very basic producer/consumer scenario: I send some data to a Kafka topic with 1 partition and replication factor 1, and read it back with kafka-console-consumer. However, some of the data is getting lost on the consumer side. It's random — it doesn't happen every run, and the percentage of lost data varies. I have done the same thing with the old API (kafka.javaapi.producer.Producer), which works fine on the same sandbox every time. I have reproduced the same behavior on Docker with similar results: sometimes the consumer gets all the records and sometimes it doesn't. It's not consistent. I am not sure if anyone else has faced a similar situation; it would be nice to know if I am missing something.
PS - I have tried using a callback as well; the behavior is the same. The consumer I am running is "/usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test_input --from-beginning"
Here is my code which I am testing -
import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object AsyncKafkaProducerWithoutCallBack {
  def main(args: Array[String]) = {
    val properties = new Properties()
    properties.put("bootstrap.servers", "sandbox:6667")
    properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    properties.put("acks", "all")
    // how many times to retry when a produce request fails
    properties.put("retries", "3")
    properties.put("linger.ms", "5")

    val producer = new KafkaProducer[String, String](properties)
    val startTime = System.currentTimeMillis()
    for (i <- 1 to 2000) {
      val kafkaRecord = new ProducerRecord[String, String]("test_input", s"$i")
      val start = System.currentTimeMillis()
      println(producer.send(kafkaRecord))
      println(s"Time taken for - $i -- " + (System.currentTimeMillis() - start) + "ms")
    }
    println("Total Time taken -- " + (System.currentTimeMillis() - startTime))
  }
}
Created 07-18-2016 03:47 PM
Solved the mystery. The issue was that "producer.close()" was missing, so main was exiting before Kafka finished writing. Strange that this is not required in the older API. Anyway, for reference, from the javadocs:
The producer consists of a pool of buffer space that holds records that haven't yet been transmitted to the server, as well as a background I/O thread that is responsible for turning these records into requests and transmitting them to the cluster. Failure to close the producer after use will leak these resources.
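The failure mode described above can be reproduced without Kafka at all: any writer that buffers records and ships them from a background daemon thread will silently drop the buffered tail if main returns without closing it, because daemon threads die with the JVM. A minimal sketch of that idea — BufferedAsyncWriter is a hypothetical stand-in for KafkaProducer, not part of the Kafka API:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for KafkaProducer: buffers records and ships them
// from a background daemon thread, like the real producer's I/O thread.
class BufferedAsyncWriter {
    private final BlockingQueue<String> buffer = new LinkedBlockingQueue<>();
    final List<String> delivered = new ArrayList<>();   // plays the role of "the cluster"
    private volatile boolean closed = false;
    private final Thread ioThread;

    BufferedAsyncWriter() {
        ioThread = new Thread(() -> {
            try {
                // keep draining until close() is called AND the buffer is empty
                while (!closed || !buffer.isEmpty()) {
                    String record = buffer.poll(5, TimeUnit.MILLISECONDS);
                    if (record != null) {
                        synchronized (delivered) { delivered.add(record); }
                    }
                }
            } catch (InterruptedException ignored) { }
        });
        ioThread.setDaemon(true);  // dies with the JVM, like the producer's network thread
        ioThread.start();
    }

    void send(String record) { buffer.add(record); }  // returns immediately

    // Without this call the daemon thread is killed when main exits,
    // and whatever is still buffered is lost.
    void close() throws InterruptedException {
        closed = true;
        ioThread.join();
    }
}

public class CloseDemo {
    public static void main(String[] args) throws Exception {
        BufferedAsyncWriter writer = new BufferedAsyncWriter();
        for (int i = 1; i <= 2000; i++) writer.send(String.valueOf(i));
        writer.close();  // drains the buffer before main returns
        System.out.println("delivered " + writer.delivered.size() + " records");
    }
}
```

With the close() call, all 2000 records are delivered every run; comment it out and the delivered count becomes whatever the background thread happened to drain before the JVM exited — the same random loss seen in the original question.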
Created 07-14-2016 11:37 AM
Can you try adding a callback to the producer to check whether all sends are succeeding or not?
ProducerRecord<byte[], byte[]> record =
    new ProducerRecord<byte[], byte[]>("the-topic", "key".getBytes(), "value".getBytes());
producer.send(record,
    new Callback() {
        public void onCompletion(RecordMetadata metadata, Exception e) {
            if (e != null)
                e.printStackTrace();
            System.out.println("The offset of the record we just sent is: " + metadata.offset());
        }
    });
Created 07-14-2016 12:19 PM
Thanks Ankit for the reply. But as I mentioned in my original post, the behavior is the same with a callback as well. Also, this is random: if I run my code 10 times in a row, I see it happen once or twice with the new producer API, while the same thing with the old API runs fine every time. Not sure what the issue is; I am not finding anything weird in the logs either.
Created 07-15-2016 01:37 PM
Oh, you are saying that no exception is seen in the callback either. So what about sending the records to Kafka immediately? The purpose of the properties below is to avoid buffering, so the records should show up at the consumer right away:
batch.size=0
linger.ms=0
block.on.buffer.full=true  // just as a precaution
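For reference, these are producer-level settings, so with the new API they would go on the same Properties object the KafkaProducer is built from. A sketch of how that might look (note that block.on.buffer.full is deprecated in the 0.9 producer in favor of max.block.ms, so treat that line as a legacy option):

```java
import java.util.Properties;

public class NoBatchingConfig {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Disable batching: each record is sent as its own request.
        props.put("batch.size", "0");
        // Don't wait for more records before sending.
        props.put("linger.ms", "0");
        // Block send() when the buffer is full instead of throwing.
        // (Deprecated in 0.9; max.block.ms is the replacement.)
        props.put("block.on.buffer.full", "true");
        props.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```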
Created 07-18-2016 12:45 PM
Hi Ankit
I tried the settings you suggested; they don't help. The only way I can get consistent results is by using the old API or by making my calls synchronous, i.e. producer.send(record).get(). Not sure what is happening, or why it works for one and not the other. I am using Kafka 0.9.0.1, which I hope should be stable.
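The reason send(record).get() restores consistency is that send() only hands the record to the background I/O thread and returns a Future immediately; calling get() blocks until the write has actually completed, so main can never exit with records still buffered. The same pattern with a plain java.util.concurrent future — a generic illustration of the blocking semantics, not Kafka code:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SyncSendDemo {
    public static void main(String[] args) throws Exception {
        ExecutorService ioThread = Executors.newSingleThreadExecutor();

        // Asynchronous "send": returns a future immediately;
        // the actual work happens later on the background thread.
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            try { Thread.sleep(50); } catch (InterruptedException ignored) { }
            return "record written";
        }, ioThread);

        // get() makes the call synchronous: we wait for the write to
        // finish before moving on, just like producer.send(record).get().
        String ack = future.get();
        System.out.println(ack);

        ioThread.shutdown();
    }
}
```

The trade-off is throughput: blocking on every record removes the pipelining the new producer was designed for, which is why flushing/closing the producer at the end is the better fix.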