Kafka Producer Async Send with Callback - Error Handling

Expert Contributor

Problem Statement: How do I get access to the ProducerRecord inside the Callback when my asynchronous send fails with an exception?

 

Other Information

I understand that the Callback can receive both retriable and non-retriable exceptions, and that it runs on another thread. Because it runs on another thread, I wonder: if I reference the ProducerRecord message variable inside the Callback, am I guaranteed to get the message that corresponds to this exception, or could the main thread have moved on so that the variable holds a different message by the time the Callback reads it?

 

I don't understand enough about Futures and Callbacks to be 100% confident, and it's hard to validate on my machine with multiple threads in play.

 

 

Example snippet of the method, stripped down significantly:

ProducerRecord<String, byte[]> message = null;
protected KafkaProducer<String, byte[]> aProducer = null;
aProducer = createKafkaProducer();

[...]

message = new ProducerRecord<String, byte[]>(producerConfig.getKafkaTopic(), byteString);
send();

[...]

public void send() {
	aProducer.send(message, new Callback() {
		@Override
		public void onCompletion(RecordMetadata metadata, Exception exception) {
			if (exception != null) {
				// How do I get the original message so that I can do something with it if needed?
				throw new KafkaException("Asynchronous send failure: ", exception);
			} else {
				// NoOp
			}
		}
	});
}

 

 

Is it as simple as referencing the ProducerRecord message variable or do I have to approach it another way to get access to the message so that I can do something with it should I need to?

 

Any help is appreciated!
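The concern about the shared message variable can be illustrated without Kafka at all. The following is only a sketch under stated assumptions: a single-thread executor stands in for the producer's background I/O thread, a String stands in for the ProducerRecord, and the class name CaptureDemo is made up for illustration. It suggests that a callback reading a mutable field sees whatever the field holds when the callback runs, while a value captured at send time stays tied to that send.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Stand-in demo (no Kafka involved): the executor plays the background I/O thread.
final class CaptureDemo {
    // Mutable field, like the `message` field in the question.
    private volatile String message;

    // Returns { value read through the field, value captured at send time }.
    String[] run() {
        ExecutorService ioThread = Executors.newSingleThreadExecutor();
        try {
            message = "first";
            final String captured = message;   // snapshot taken when "sending"
            CountDownLatch mainMovedOn = new CountDownLatch(1);
            Future<String[]> callback = ioThread.submit(() -> {
                mainMovedOn.await();            // run only after the main thread moved on
                return new String[] { message, captured };
            });
            message = "second";                 // main thread prepares the next record
            mainMovedOn.countDown();
            return callback.get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            ioThread.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] seen = new CaptureDemo().run();
        // prints "field: second, captured: first"
        System.out.println("field: " + seen[0] + ", captured: " + seen[1]);
    }
}
```

So reading the field from inside the Callback is only safe if nothing reassigns it before the Callback runs; passing the record (or its key and value) into the Callback avoids relying on that.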

1 ACCEPTED SOLUTION

Expert Contributor

I figured out a solution to my problem with the help of a friend and thought I'd share. What I did was implement Callback in my own class with a constructor that accepts the message key and value, so that I can do something with them if I need to. A note for anyone reading this: the API documentation states that "this callback will generally execute in the background I/O thread so it should be fast", so it's advisable to avoid significant processing in the Callback.

 

The code snippet example follows:

 

ProducerRecord<String, byte[]> message = null;
protected KafkaProducer<String, byte[]> aProducer = null;
aProducer = createKafkaProducer();

[...]

message = new ProducerRecord<String, byte[]>(producerConfig.getKafkaTopic(), byteString);
send();

[...]

public void send() {
	aProducer.send(message, new MyCallback(message.key(), new String(message.value())));
}


		
// Implements Callback with a constructor that takes the message, so that, if needed,
// I can do something with the message itself on error.
// According to the API, "This callback will generally execute in the background I/O thread
// so it should be fast.", so I should avoid expensive actions in this callback and would
// have to spin off another executor if I want to do much.
class MyCallback implements Callback {

	private final String messageKey;
	private final String messageValue;

	public MyCallback(String messageKey, String messageValue) {
		this.messageKey = messageKey;
		this.messageValue = messageValue;
	}

	@Override
	public void onCompletion(RecordMetadata metadata, Exception exception) {
		// This is where I will handle exceptions, and I'll have the message should I need to write it out somewhere to deal with later.
		[...]
	}
}
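To make the "keep the callback fast, hand the work to another executor" idea concrete, here is a minimal sketch. It is deliberately Kafka-free so it runs on its own: CompletionCallback is a hypothetical stand-in with the same shape as Kafka's Callback.onCompletion (Object replaces RecordMetadata), and RecordAwareCallback, HandOffDemo and the deadLetters list are names invented for illustration, not part of any library.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in shaped like Kafka's Callback, so no kafka-clients jar is needed.
interface CompletionCallback {
    void onCompletion(Object metadata, Exception exception);
}

// Carries the record it was created for, so a failure can be tied back to it.
final class RecordAwareCallback<R> implements CompletionCallback {
    private final R record;
    private final ExecutorService failureWorker;
    private final List<R> deadLetters;

    RecordAwareCallback(R record, ExecutorService failureWorker, List<R> deadLetters) {
        this.record = record;
        this.failureWorker = failureWorker;
        this.deadLetters = deadLetters;
    }

    @Override
    public void onCompletion(Object metadata, Exception exception) {
        if (exception != null) {
            // Stay fast on the calling thread: only enqueue; the worker does the slow part.
            failureWorker.execute(() -> deadLetters.add(record));
        }
    }
}

final class HandOffDemo {
    // Simulates one successful and one failed completion; returns the failed records.
    static List<String> run() {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        List<String> deadLetters = new CopyOnWriteArrayList<>();
        new RecordAwareCallback<>("record-1", worker, deadLetters)
                .onCompletion(new Object(), null);
        new RecordAwareCallback<>("record-2", worker, deadLetters)
                .onCompletion(null, new RuntimeException("simulated send failure"));
        worker.shutdown();
        try {
            worker.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return deadLetters;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [record-2]
    }
}
```

With the real client, the same structure would presumably apply with the class implementing Callback and the field holding the ProducerRecord; what the worker does with a failed record (log it, write it to a file, resend it) is left open here.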


3 REPLIES 3

Expert Contributor
Anyone?

New Contributor

What I did was use a Semaphore to wait for the Callback. Within the Callback itself I store the Exception in a variable that the calling thread can read, and the calling thread then throws it to the client:

 

 

	@Override
	public int send_(String data) throws Exception
	{
		logger.trace("Entering KafkaMQDeviceService.send_() data:" + data);
		Semaphore semaphore = new Semaphore(0);
		producer.send(new ProducerRecord<String, String>(topic, data), new Callback()
		{
			@Override
			public void onCompletion(RecordMetadata meta, Exception e)
			{
				if ( null != e )
					setExceptionOnSend(e);
				semaphore.release();
			}
		});
		
		producer.flush();
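		try
		{
			semaphore.acquire();
		} catch (InterruptedException e) {
			// Restore the interrupt flag instead of silently swallowing the interrupt
			Thread.currentThread().interrupt();
		}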
		
		if ( null != exceptionOnSend )
			throw exceptionOnSend;
		
		logger.trace("Leaving KafkaMQDeviceService.send_()");
		return StatusConstants.DEVICE_SEND_SUCCESSFUL;
	}
	

 

Hope it helps.
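One thing to watch in the wait-for-the-callback approach above: if exceptionOnSend is an instance field, as the snippet suggests, a failure from one call could still be set when the next call checks it, and concurrent callers would share it. A possible variation, sketched here under assumptions, keeps the failure in a per-call AtomicReference and waits on a per-call CountDownLatch. The fakeAsyncSend method is a hypothetical stand-in for the producer's asynchronous send, and BlockingSendDemo is an invented name; nothing here is Kafka API.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

final class BlockingSendDemo {
    // Hypothetical stand-in for an async send: completes on another thread and
    // reports null on success or the failure otherwise.
    static void fakeAsyncSend(ExecutorService io, String data, Consumer<Exception> onDone) {
        io.execute(() -> onDone.accept(
                data.isEmpty() ? new IllegalArgumentException("empty payload") : null));
    }

    // Waits for the callback and rethrows its exception on the calling thread.
    static void sendAndWait(ExecutorService io, String data) throws Exception {
        CountDownLatch done = new CountDownLatch(1);
        AtomicReference<Exception> failure = new AtomicReference<>(); // local to this call
        fakeAsyncSend(io, data, e -> {
            failure.set(e);
            done.countDown();
        });
        done.await();
        if (failure.get() != null) {
            throw failure.get();
        }
    }

    // A failing call followed by a successful one: the failure does not leak over.
    static String[] run() {
        ExecutorService io = Executors.newSingleThreadExecutor();
        String[] payloads = { "", "payload" };
        String[] outcomes = new String[payloads.length];
        try {
            for (int i = 0; i < payloads.length; i++) {
                try {
                    sendAndWait(io, payloads[i]);
                    outcomes[i] = "ok";
                } catch (Exception e) {
                    outcomes[i] = "failed: " + e.getMessage();
                }
            }
        } finally {
            io.shutdown();
        }
        return outcomes;
    }

    public static void main(String[] args) {
        for (String outcome : run()) {
            System.out.println(outcome); // prints "failed: empty payload" then "ok"
        }
    }
}
```

With the real client there is also a shorter route: KafkaProducer.send returns a Future, and calling get() on it blocks until the send completes and surfaces a failure as an ExecutionException. Either way, waiting on every record makes the send effectively synchronous, which trades throughput for simpler error handling.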