Contributor
Posts: 48
Registered: ‎08-05-2015
Accepted Solution

Kafka Producer Async Send with Callback - Error Handling

Problem Statement: When an asynchronous send fails and the exception is delivered to my Callback, how do I get access to the ProducerRecord that the exception belongs to?

Other Information

I understand that the Callback can receive a range of retriable and non-retriable exceptions, and that it runs on another thread. Because it runs on another thread, I wonder whether referencing the ProducerRecord message variable from inside the Callback is guaranteed to give me the message that corresponds to the exception, or whether the main thread may have moved on so that the variable holds a different message by the time I read it.

I don't understand enough about Futures and Callbacks to be 100% confident, and it's hard to validate on my machine with multiple threads in play.

Example snippet of the method, stripped down significantly:

ProducerRecord<String, byte[]> message = null;
protected KafkaProducer<String, byte[]> aProducer = null;
aProducer = createKafkaProducer();

[...]

message = new ProducerRecord<String, byte[]>(producerConfig.getKafkaTopic(), byteString);
send();

[...]

public void send() {
	aProducer.send(message, new Callback() {
		@Override
		public void onCompletion(RecordMetadata metadata, Exception exception) {
			if (exception != null) {
				// How do I get the original message so that I can do something with it if needed?
				throw new KafkaException("Asynchronous send failure: ", exception);
			} else {
				// NoOp
			}
		}
	});
}

Is it as simple as referencing the ProducerRecord message variable, or do I need another approach to get at the message so that I can act on it if necessary?

Any help is appreciated!

Re: Kafka Producer Async Send with Callback - Error Handling

Anyone?

Re: Kafka Producer Async Send with Callback - Error Handling

I figured out a solution to my problem with the help of a friend and thought I'd share. What I did was implement Callback with a constructor that accepts my message key and value, so that I can choose to do something with them should I need to. A note for anyone reading this: the API docs state that "this callback will generally execute in the background I/O thread so it should be fast", which means it's advisable to avoid significant processing in the Callback.
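
For anyone wondering why handing the message to the callback matters, here is a minimal simulation of the race described in the original question. It does not use the Kafka client at all: SendCallback, CaptureDemo and the single-thread executor standing in for the producer's I/O thread are hypothetical names made up for this sketch, and the latches only force the "main thread has already moved on" ordering so that the result is repeatable.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for a send callback; NOT the real Kafka Callback interface.
interface SendCallback {
    void onCompletion(Exception exception);
}

class CaptureDemo {
    // Shared mutable field, playing the role of `message` in the question.
    static volatile String message;

    // Returns { value read from the shared field, value captured at send time }
    // as observed by the callback of the FIRST send.
    static String[] run() {
        ExecutorService ioThread = Executors.newSingleThreadExecutor(); // simulated I/O thread
        CountDownLatch mainThreadMovedOn = new CountDownLatch(1);
        CountDownLatch callbackDone = new CountDownLatch(1);
        AtomicReference<String> seenViaField = new AtomicReference<>();
        AtomicReference<String> seenViaCapture = new AtomicReference<>();

        message = "record-1";
        final String captured = message; // reference fixed at send time
        SendCallback callback = exception -> {
            seenViaField.set(message);    // whatever the field holds when the callback runs
            seenViaCapture.set(captured); // always the record that was actually sent
            callbackDone.countDown();
        };

        // Simulated asynchronous send: the failure is reported later, on another thread.
        ioThread.execute(() -> {
            try {
                mainThreadMovedOn.await();
                callback.onCompletion(new RuntimeException("simulated send failure"));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        message = "record-2"; // the main thread prepares its next message
        mainThreadMovedOn.countDown();
        try {
            callbackDone.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            ioThread.shutdown();
        }
        return new String[] { seenViaField.get(), seenViaCapture.get() };
    }

    public static void main(String[] args) {
        String[] seen = run();
        System.out.println("via field:   " + seen[0]); // prints "via field:   record-2"
        System.out.println("via capture: " + seen[1]); // prints "via capture: record-1"
    }
}
```

In this simulation the callback that reads the shared field reports the wrong record, while the captured reference still points at the record that failed. A callback object that receives the key and value through its constructor gives the same guarantee as the captured local here.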

 

The snippet from my own code follows:

ProducerRecord<String, byte[]> message = null;
protected KafkaProducer<String, byte[]> aProducer = null;
aProducer = createKafkaProducer();

[...]

message = new ProducerRecord<String, byte[]>(producerConfig.getKafkaTopic(), byteString);
send();

[...]

public void send() {
	aProducer.send(message, new MyCallback(message.key(), new String(message.value())));
}


		
// Implemented Callback with a constructor that takes the message so that, if needed,
// I can do something with the message itself on error. According to the API,
// "This callback will generally execute in the background I/O thread so it should be fast.",
// so I should avoid expensive actions in this callback and would have to spin off
// another executor if I want to do much.
class MyCallback implements Callback {

	private final String messageKey;
	private final String messageValue;

	public MyCallback(String messageKey, String messageValue) {
		this.messageKey = messageKey;
		this.messageValue = messageValue;
	}

	@Override
	public void onCompletion(RecordMetadata metadata, Exception exception) {
		// This is where I handle exceptions; I have the message here should I need to write it out somewhere to deal with later.
		[...]
	}
}
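
As a rough sketch of the "spin off another executor" idea mentioned in the comment above: the callback below only enqueues a task and returns, and the slower handling runs on a separate worker thread. To stay dependency-free it uses a hypothetical CompletionCallback interface that merely mirrors the shape of Kafka's Callback.onCompletion (with the metadata reduced to Object). HandOffCallback, HandOffDemo, the deadLetters list and the UTF-8 decoding of the payload are all assumptions made for illustration, not part of my real code.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in mirroring the shape of Kafka's Callback.onCompletion.
interface CompletionCallback {
    void onCompletion(Object metadata, Exception exception);
}

class HandOffCallback implements CompletionCallback {
    private final String messageKey;
    private final byte[] messageValue;
    private final ExecutorService errorWorker; // runs the slow error handling
    private final List<String> deadLetters;    // illustrative sink for failed messages

    HandOffCallback(String messageKey, byte[] messageValue,
                    ExecutorService errorWorker, List<String> deadLetters) {
        this.messageKey = messageKey;
        this.messageValue = messageValue;
        this.errorWorker = errorWorker;
        this.deadLetters = deadLetters;
    }

    @Override
    public void onCompletion(Object metadata, Exception exception) {
        if (exception == null) {
            return; // success: nothing to do
        }
        // Keep the callback fast: only enqueue; the real work runs on errorWorker.
        errorWorker.execute(() -> deadLetters.add(
                messageKey + ":" + new String(messageValue, StandardCharsets.UTF_8)
                        + " failed: " + exception.getMessage()));
    }
}

class HandOffDemo {
    static List<String> run() {
        ExecutorService errorWorker = Executors.newSingleThreadExecutor();
        List<String> deadLetters = new CopyOnWriteArrayList<>();

        // One simulated failure and one simulated success.
        new HandOffCallback("k1", "payload-1".getBytes(StandardCharsets.UTF_8), errorWorker, deadLetters)
                .onCompletion(null, new RuntimeException("simulated timeout"));
        new HandOffCallback("k2", "payload-2".getBytes(StandardCharsets.UTF_8), errorWorker, deadLetters)
                .onCompletion(new Object(), null);

        errorWorker.shutdown(); // already queued tasks still run
        try {
            errorWorker.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return deadLetters;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [k1:payload-1 failed: simulated timeout]
    }
}
```

Keeping the value as byte[] until the worker runs also means the conversion to String happens off the callback thread. Whether a list, a file or a retry topic is the right destination depends on the application.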