Created on 03-26-2017 09:12 PM - edited 09-16-2022 04:20 AM
Problem Statement: How do I get access to the ProducerRecord when the Callback passed to my asynchronous send() method reports an exception?
Other Information
I understand that the Callback can receive both retriable and non-retriable exceptions. I also understand that the Callback runs on another thread. That is what makes me wonder: if I reference the ProducerRecord message variable from inside the Callback, am I guaranteed to get the message that corresponds to this exception, or is there a chance that the main thread has moved on and the variable now holds a different message by the time the Callback reads it?
I don't understand enough about Futures and Callbacks to be 100% confident, and it's hard to validate on my machine with multiple threads.
Example snippet of the method, stripped down significantly:
ProducerRecord<String, byte[]> message = null;
protected KafkaProducer<String, byte[]> aProducer = null;
aProducer = createKafkaProducer();
[...]
message = new ProducerRecord<String, byte[]>(producerConfig.getKafkaTopic(), byteString);
send();
[...]
public void send() {
    aProducer.send(message, new Callback() {
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            if (exception != null) {
                // How do I get the original message so that I can do something with it if needed?
                throw new KafkaException("Asynchronous send failure: ", exception);
            } else {
                // NoOp
            }
        }
    });
}
Is it as simple as referencing the ProducerRecord message variable, or do I have to approach it another way to get access to the message so that I can do something with it should I need to?
Any help is appreciated!
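The concern in the question can be reproduced without Kafka. The following is a minimal sketch, not the Kafka API: a single background thread stands in for the producer's I/O thread, and the class name SharedFieldRaceDemo and the method run are made up for illustration. It deliberately forces the worst case, where every callback runs only after the caller has already moved on to the last message. With a real producer the timing varies, so the shared-field version may appear to work most of the time.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Stand-in only: no Kafka classes are used. A single background thread plays the
// role of the producer's I/O thread and runs the callbacks after the sends are queued.
class SharedFieldRaceDemo {
    // Shared mutable field, like the `message` field in the snippet above.
    static volatile String message;

    static List<String> run(boolean captureLocal) {
        List<String> seenByCallbacks = new CopyOnWriteArrayList<>();
        List<Runnable> pendingCallbacks = new ArrayList<>();
        for (String value : new String[] {"a", "b", "c"}) {
            message = value;               // the "main thread" moves on to the next message
            final String captured = value; // per-send copy that never changes
            if (captureLocal) {
                pendingCallbacks.add(() -> seenByCallbacks.add(captured));
            } else {
                pendingCallbacks.add(() -> seenByCallbacks.add(message));
            }
        }
        // Run all callbacks late, on another thread, in the order they were queued.
        ExecutorService ioThread = Executors.newSingleThreadExecutor();
        pendingCallbacks.forEach(ioThread::execute);
        ioThread.shutdown();
        try {
            ioThread.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seenByCallbacks;
    }

    public static void main(String[] args) {
        System.out.println("shared field:   " + run(false)); // [c, c, c]
        System.out.println("captured local: " + run(true));  // [a, b, c]
    }
}
```

The callbacks that read the shared field all see the last message, while the callbacks that captured a per-send value each see their own. That suggests the safe pattern is to hand each callback its own reference to the record (or its key and value) at the moment of the send, rather than reading a field that the sending thread keeps reassigning.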
Created 04-05-2017 05:51 PM
I figured out a solution to my problem with the help of a friend and thought I'd share. What I did was implement Callback with a constructor that accepts my message key and value, so that I can choose to do something with them should I need to. A note for anyone reading this: the API documentation states that "this callback will generally execute in the background I/O thread so it should be fast", which means it's advisable to avoid significant processing in the Callback.
The code snippet example follows:
ProducerRecord<String, byte[]> message = null;
protected KafkaProducer<String, byte[]> aProducer = null;
aProducer = createKafkaProducer();
[...]
message = new ProducerRecord<String, byte[]>(producerConfig.getKafkaTopic(), byteString);
send();
[...]
public void send() {
    aProducer.send(message, new MyCallback(message.key(), new String(message.value())));
}
}
// Implements Callback with a constructor that takes the message key and value so that,
// if needed, I can do something with the message itself on error. Since according to the
// API, "This callback will generally execute in the background I/O thread so it should be
// fast.", I should avoid expensive actions in this callback, so I would have to spin off
// another executor if I want to do much.
class MyCallback implements Callback {
    private final String messageKey;
    private final String messageValue;

    public MyCallback(String messageKey, String messageValue) {
        this.messageKey = messageKey;
        this.messageValue = messageValue;
    }

    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        // This is where I will handle exceptions and I'll have the message should I need to write it out somewhere to deal with later.
        [...]
    }
}
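The point about keeping the callback fast and spinning off another executor can be sketched roughly as follows. This is an illustration under stated assumptions, not Kafka code: SendCallback is a hypothetical stand-in for Kafka's Callback interface (the metadata argument is left out), and FailureHandoffCallback, the worker executor and the deadLetters list are invented names. The callback only enqueues the failed message; the slow handling runs on the worker thread.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class CallbackHandoffDemo {
    /** Hypothetical stand-in for Kafka's Callback; the metadata argument is omitted. */
    interface SendCallback {
        void onCompletion(Exception exception);
    }

    /** Carries its own copy of the key and value, like MyCallback above. */
    static final class FailureHandoffCallback implements SendCallback {
        private final String messageKey;
        private final String messageValue;
        private final ExecutorService worker;
        private final List<String> deadLetters;

        FailureHandoffCallback(String messageKey, String messageValue,
                               ExecutorService worker, List<String> deadLetters) {
            this.messageKey = messageKey;
            this.messageValue = messageValue;
            this.worker = worker;
            this.deadLetters = deadLetters;
        }

        @Override
        public void onCompletion(Exception exception) {
            if (exception != null) {
                // Only enqueue here; any slow handling runs on the worker thread.
                worker.execute(() -> deadLetters.add(messageKey + "=" + messageValue));
            }
        }
    }

    /** Simulates one successful and one failed send, then returns the recorded failures. */
    static List<String> run() {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        List<String> deadLetters = new CopyOnWriteArrayList<>();
        new FailureHandoffCallback("k1", "v1", worker, deadLetters).onCompletion(null);
        new FailureHandoffCallback("k2", "v2", worker, deadLetters)
                .onCompletion(new RuntimeException("simulated send failure"));
        worker.shutdown();
        try {
            worker.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return deadLetters;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [k2=v2]
    }
}
```

Because each callback instance holds final copies of its key and value, it does not matter which thread runs it or how much later it runs.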
Created 04-05-2017 09:11 AM
Created 08-07-2018 09:20 AM
What I did was use a Semaphore to wait for the Callback. Within the Callback itself I store the Exception in a field (exceptionOnSend) that the calling thread checks once the Semaphore is released, and the method then throws it to the client:
@Override
public int send_(String data) throws Exception
{
    logger.trace("Entering KafkaMQDeviceService.send_() data:" + data);
    setExceptionOnSend(null); // clear any failure left over from a previous call
    Semaphore semaphore = new Semaphore(0);
    producer.send(new ProducerRecord<String, String>(topic, data), new Callback()
    {
        @Override
        public void onCompletion(RecordMetadata meta, Exception e)
        {
            if ( null != e )
                setExceptionOnSend(e);
            semaphore.release();
        }
    });
    producer.flush();
    try
    {
        semaphore.acquire();
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // restore the interrupt flag rather than swallowing it
    }
    if ( null != exceptionOnSend )
        throw exceptionOnSend;
    logger.trace("Leaving KafkaMQDeviceService.send_()");
    return StatusConstants.DEVICE_SEND_SUCCESSFUL;
}
Hope it helps.
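A possible variation on the blocking approach, sketched here without Kafka: KafkaProducer.send also returns a Future, and waiting on that Future surfaces a failure as an ExecutionException, so the error stays local to the call instead of living in a shared field. In the sketch below, fakeSend is a hypothetical stand-in for producer.send(record) built on CompletableFuture, and the class and method names are invented for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

class BlockingSendDemo {
    // Hypothetical stand-in for producer.send(record): completes on another thread.
    static Future<String> fakeSend(String data, boolean fail) {
        CompletableFuture<String> result = new CompletableFuture<>();
        new Thread(() -> {
            if (fail) {
                result.completeExceptionally(
                        new IllegalStateException("simulated failure for " + data));
            } else {
                result.complete("acked " + data);
            }
        }).start();
        return result;
    }

    // Blocks until the send finishes; returns null on success or the failure cause.
    static Throwable sendAndWait(String data, boolean fail) {
        try {
            fakeSend(data, fail).get();
            return null;
        } catch (ExecutionException e) {
            return e.getCause(); // the exception the asynchronous send failed with
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return e;
        }
    }

    public static void main(String[] args) {
        System.out.println(sendAndWait("x", false)); // null
        System.out.println(sendAndWait("y", true).getMessage()); // simulated failure for y
    }
}
```

As with the Semaphore version, blocking on every send gives up the throughput benefit of asynchronous sending, so this only fits callers that need a synchronous result.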