Created on 03-26-2017 09:12 PM - edited 09-16-2022 04:20 AM
Problem Statement: When an asynchronous send fails, how do I get access to the ProducerRecord from inside the Callback that receives the exception?
Other Information
I understand that the Callback can receive a range of retriable and non-retriable exceptions. I also understand that the Callback runs on another thread. That is what makes me wonder: if I reference the ProducerRecord message variable from the Callback, am I guaranteed to get the same message that this exception belongs to, or is there a chance that the main thread has moved on and the variable now holds a different message by the time the Callback reads it?
I don't understand enough about Futures and Callbacks to be 100% confident, and it's hard to validate on my machine with multiple threads involved.
Example snippet of the method, stripped down significantly:
    ProducerRecord<String, byte[]> message = null;
    protected KafkaProducer<String, byte[]> aProducer = null;

    aProducer = createKafkaProducer();
    [...]
    message = new ProducerRecord<String, byte[]>(producerConfig.getKafkaTopic(), byteString);
    send();
    [...]

    public void send() {
        aProducer.send(message, new Callback() {
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception != null) {
                    // How do I get the original message so that I can do something with it if needed?
                    throw new KafkaException("Asynchronous send failure: ", exception);
                } else {
                    // NoOp
                }
            }
        });
    }
Is it as simple as referencing the ProducerRecord message variable or do I have to approach it another way to get access to the message so that I can do something with it should I need to?
Any help is appreciated!
Created 04-05-2017 05:51 PM
I figured out a solution to my problem with the help of a friend and thought I'd share. What I did was implement Callback with a constructor that accepts my message key and value, so that I can choose to do something with them should I need to. A note for anyone reading this: the API states that "this callback will generally execute in the background I/O thread so it should be fast", which means it's advisable to avoid significant processing in the Callback.
The code snippet example follows:
    ProducerRecord<String, byte[]> message = null;
    protected KafkaProducer<String, byte[]> aProducer = null;

    aProducer = createKafkaProducer();
    [...]
    message = new ProducerRecord<String, byte[]>(producerConfig.getKafkaTopic(), byteString);
    send();
    [...]

    public void send() {
        aProducer.send(message, new MyCallback(message.key(), new String(message.value())));
    }
    }

    // Implemented Callback with a constructor that takes the message so that, if needed,
    // I can do something with the message itself on error. Since according to the API,
    // "This callback will generally execute in the background I/O thread so it should be fast.",
    // I should avoid expensive actions in this callback, so I would have to spin off
    // another executor if I want to do much.
    class MyCallback implements Callback {
        private final String messageKey;
        private final String messageValue;

        public MyCallback(String messageKey, String messageValue) {
            this.messageKey = messageKey;
            this.messageValue = messageValue;
        }

        @Override
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            // This is where I will handle exceptions, and I'll have the message
            // should I need to write it out somewhere to deal with later.
            [...]
        }
    }
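For anyone wondering why passing the values into the callback matters: a field that the main thread keeps reassigning can already hold the next message by the time the callback runs, while a value handed to the callback at send time stays tied to that send. Here is a minimal sketch of the difference in plain Java. It does not use Kafka at all; the single-thread executor is only a hypothetical stand-in for the producer's background I/O thread, and the class and variable names are made up for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class CallbackCaptureDemo {

    // Shared mutable field, like the 'message' field in the question.
    private volatile String message;

    // Hypothetical stand-in for the producer's background I/O thread.
    private final ExecutorService ioThread = Executors.newSingleThreadExecutor();

    /** Returns what one callback saw: [0] read from the field, [1] captured at send time. */
    public String[] run() throws InterruptedException {
        CountDownLatch fieldReassigned = new CountDownLatch(1);
        CountDownLatch callbackDone = new CountDownLatch(1);
        AtomicReference<String> seenViaField = new AtomicReference<>();
        AtomicReference<String> seenViaCapture = new AtomicReference<>();

        message = "first";
        final String captured = message; // bound to this send, never reassigned

        // Simulated "callback": it completes only after the main thread has moved on.
        ioThread.execute(() -> {
            try {
                fieldReassigned.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            seenViaField.set(message);    // reads whatever the field holds now
            seenViaCapture.set(captured); // reads the value bound at send time
            callbackDone.countDown();
        });

        // The main thread prepares the next message before the callback has run.
        message = "second";
        fieldReassigned.countDown();

        callbackDone.await(5, TimeUnit.SECONDS);
        ioThread.shutdown();
        return new String[] { seenViaField.get(), seenViaCapture.get() };
    }

    public static void main(String[] args) throws InterruptedException {
        String[] seen = new CallbackCaptureDemo().run();
        System.out.println("field: " + seen[0]);
        System.out.println("captured: " + seen[1]);
    }
}
```

Running main prints "field: second" and "captured: first": only the captured value still matches the send that the callback belongs to, which is the same effect the MyCallback constructor arguments give you.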
Created 04-05-2017 09:11 AM
Created 08-07-2018 09:20 AM
What I did was use a Semaphore to wait for the Callback. Within the Callback I store the Exception in a field of the enclosing class (via setExceptionOnSend), and once the wait is over the calling thread rethrows it to the client:
    @Override
    public int send_(String data) throws Exception {
        logger.trace("Entering KafkaMQDeviceService.send_() data:" + data);
        Semaphore semaphore = new Semaphore(0);
        producer.send(new ProducerRecord<String, String>(topic, data), new Callback() {
            @Override
            public void onCompletion(RecordMetadata meta, Exception e) {
                if (null != e) setExceptionOnSend(e);
                // Wake up the caller whether the send succeeded or failed
                semaphore.release();
            }
        });
        producer.flush();
        try {
            semaphore.acquire();
        } catch (InterruptedException e) {
            // Restore the interrupt flag instead of silently swallowing it
            Thread.currentThread().interrupt();
        }
        if (null != exceptionOnSend) throw exceptionOnSend;
        logger.trace("Leaving KafkaMQDeviceService.send_()");
        return StatusConstants.DEVICE_SEND_SUCCESSFUL;
    }
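A possible alternative to the Semaphore, if blocking on every send is acceptable: KafkaProducer.send() also returns a Future, and calling get() on it waits for the result and throws an ExecutionException wrapping the send failure, so no shared exception field is needed. The sketch below shows only that unwrapping pattern in plain Java. The Function-based sender and the String result are hypothetical stand-ins for producer.send(record) and RecordMetadata, not the Kafka API itself.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.function.Function;

public class BlockingSendSketch {

    // Hypothetical stand-in for producer.send(record), which also returns a Future.
    private final Function<String, Future<String>> sender;

    public BlockingSendSketch(Function<String, Future<String>> sender) {
        this.sender = sender;
    }

    /** Sends, blocks until the outcome is known, and rethrows the send failure if there is one. */
    public String sendAndWait(String data) throws Exception {
        Future<String> pending = sender.apply(data);
        try {
            return pending.get();
        } catch (ExecutionException e) {
            // The real failure is the cause; the wrapper only says "the async task failed".
            Throwable cause = e.getCause();
            if (cause instanceof Exception) {
                throw (Exception) cause;
            }
            throw e;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt visible to callers
            throw e;
        }
    }

    public static void main(String[] args) throws Exception {
        BlockingSendSketch ok =
                new BlockingSendSketch(d -> CompletableFuture.completedFuture("acked:" + d));
        System.out.println(ok.sendAndWait("hello"));

        BlockingSendSketch failing = new BlockingSendSketch(d -> {
            CompletableFuture<String> f = new CompletableFuture<>();
            f.completeExceptionally(new IllegalStateException("broker unavailable"));
            return f;
        });
        try {
            failing.sendAndWait("hello");
        } catch (IllegalStateException e) {
            System.out.println("send failed: " + e.getMessage());
        }
    }
}
```

Because the exception travels through the Future of that one call, concurrent callers cannot overwrite each other's failure the way they could with a single shared exceptionOnSend field. The trade-off is the same as with the Semaphore: each send waits for its acknowledgement, which gives up most of the throughput benefit of asynchronous sends.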
Hope it helps.