Kafka Producer Async Send with Callback - Error Handling

Rising Star

Problem Statement: How do I get access to the ProducerRecord inside the Callback when my asynchronous send fails with an exception?

 

Other Information

I understand that the Callback can receive both retriable and non-retriable exceptions, and that it runs on another thread. Because it runs on another thread, I wonder whether referencing the ProducerRecord message variable from inside the Callback is guaranteed to give me the message that corresponds to the exception, or whether the main thread may have moved on so that the variable now holds a different message by the time the Callback reads it.

I don't understand enough about Futures and Callbacks to be 100% confident, and it's hard to validate on my machine with multiple threads.
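
To make the concern above concrete, here is a minimal, Kafka-free sketch. `DeferredSender` and `CaptureDemo` are hypothetical stand-ins, not Kafka classes: the sender only queues callbacks and runs them later, roughly the way a background I/O thread might. Under that assumption, it suggests that a callback reading a shared field sees whatever the field holds when the callback runs, while a per-send local variable stays tied to its own record.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for an asynchronous sender: it queues completion
// callbacks and runs them later, instead of running them inside send().
class DeferredSender {
    private final List<Runnable> pending = new ArrayList<>();

    void send(String record, Runnable onCompletion) {
        pending.add(onCompletion);
    }

    // Simulates the background thread finishing all outstanding sends.
    void completeAll() {
        pending.forEach(Runnable::run);
        pending.clear();
    }
}

class CaptureDemo {
    // Shared mutable field, reassigned before every send.
    private String message;
    final List<String> seenViaField = new ArrayList<>();
    final List<String> seenViaCapture = new ArrayList<>();

    void run(DeferredSender sender, String... records) {
        for (String r : records) {
            message = r;
            final String captured = r; // per-send copy of the reference
            sender.send(r, () -> {
                seenViaField.add(message);    // reads the field when the callback runs
                seenViaCapture.add(captured); // always the record passed to this send
            });
        }
    }

    public static void main(String[] args) {
        DeferredSender sender = new DeferredSender();
        CaptureDemo demo = new CaptureDemo();
        demo.run(sender, "first", "second");
        sender.completeAll();
        System.out.println(demo.seenViaField);   // [second, second]
        System.out.println(demo.seenViaCapture); // [first, second]
    }
}
```

With a real producer the timing is not deterministic, so the field-based version may appear to work most of the time. That is why capturing the record per send is the safer choice.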

 

 

Example snippet of the method, stripped down significantly:

ProducerRecord<String, byte[]> message = null;
protected KafkaProducer<String, byte[]> aProducer = null;
aProducer = createKafkaProducer();

[...]

message = new ProducerRecord<String, byte[]>(producerConfig.getKafkaTopic(), byteString);
send();

[...]

public void send() {
	aProducer.send(message, new Callback() {
		@Override
		public void onCompletion(RecordMetadata metadata, Exception exception) {
			if (exception != null) {
				// How do I get the original message here so that I can do something with it if needed?
				throw new KafkaException("Asynchronous send failure: ", exception);
			} else {
				// No-op
			}
		}
	});
}

 

 

Is it as simple as referencing the ProducerRecord message variable or do I have to approach it another way to get access to the message so that I can do something with it should I need to?

 

Any help is appreciated!

Accepted Solutions

Re: Kafka Producer Async Send with Callback - Error Handling

Rising Star

I figured out a solution to my problem with the help of a friend and thought I'd share. I implemented Callback in a class whose constructor accepts my message key and value, so that I can do something with them should I need to. A note for anyone reading this: the API documentation states that "this callback will generally execute in the background I/O thread so it should be fast", so it's advisable to avoid significant processing in the Callback.

 

The code snippet example follows:

 

ProducerRecord<String, byte[]> message = null;
protected KafkaProducer<String, byte[]> aProducer = null;
aProducer = createKafkaProducer();

[...]

message = new ProducerRecord<String, byte[]>(producerConfig.getKafkaTopic(), byteString);
send();

[...]

public void send() {
	// The key and value are handed to the callback when send() is called, so the callback
	// keeps the record it was created for even if the message field is reassigned later.
	// Note: new String(byte[]) decodes with the platform default charset.
	aProducer.send(message, new MyCallback(message.key(), new String(message.value())));
}

// Callback implementation whose constructor takes the message so that, if needed, I can do something with the message itself on error. Since, according to the API, "This callback will generally execute in the background I/O thread so it should be fast.", I should avoid expensive actions in this callback, so I would have to spin off another executor if I want to do much.
class MyCallback implements Callback {

	private final String messageKey;
	private final String messageValue;

	public MyCallback(String messageKey, String messageValue) {
		this.messageKey = messageKey;
		this.messageValue = messageValue;
	}

	@Override
	public void onCompletion(RecordMetadata metadata, Exception exception) {
		// This is where I will handle exceptions, and I'll have the message should I need to write it out somewhere to deal with later.
		[...]
	}
}
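
One possible way to keep such a callback fast is to have it only enqueue the failed record and let another thread do the heavy work. The sketch below is self-contained and does not use kafka-clients: `SendCallback` is a hypothetical stand-in with the same shape as Kafka's `Callback.onCompletion` (with `Object` in place of `RecordMetadata`), and `FailedSend` and `RecordAwareCallback` are illustrative names, not part of any library.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in with the same shape as Kafka's Callback.onCompletion;
// Object replaces RecordMetadata so the sketch compiles without kafka-clients.
interface SendCallback {
    void onCompletion(Object metadata, Exception exception);
}

// A failed record together with the exception that caused the failure.
final class FailedSend {
    final String key;
    final byte[] value;
    final Exception cause;

    FailedSend(String key, byte[] value, Exception cause) {
        this.key = key;
        this.value = value;
        this.cause = cause;
    }
}

// Keeps the record it was created for and only enqueues on failure,
// so the callback itself does no expensive work.
final class RecordAwareCallback implements SendCallback {
    private final String key;
    private final byte[] value;
    private final BlockingQueue<FailedSend> failures;

    RecordAwareCallback(String key, byte[] value, BlockingQueue<FailedSend> failures) {
        this.key = key;
        this.value = value;
        this.failures = failures;
    }

    @Override
    public void onCompletion(Object metadata, Exception exception) {
        if (exception != null) {
            failures.offer(new FailedSend(key, value, exception));
        }
    }
}

class RecordAwareCallbackDemo {
    public static void main(String[] args) {
        BlockingQueue<FailedSend> failures = new LinkedBlockingQueue<>();
        SendCallback ok = new RecordAwareCallback("k1", new byte[] {1}, failures);
        SendCallback bad = new RecordAwareCallback("k2", new byte[] {2}, failures);

        ok.onCompletion(new Object(), null);                                // success: nothing queued
        bad.onCompletion(null, new RuntimeException("broker unavailable")); // failure: record queued

        FailedSend f = failures.poll();
        System.out.println(f.key + " failed: " + f.cause.getMessage()); // k2 failed: broker unavailable
    }
}
```

A separate worker thread or executor could then drain the queue and retry, log, or persist the failed records without holding up the thread that runs the callbacks.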
Replies

Re: Kafka Producer Async Send with Callback - Error Handling

Rising Star
Anyone?

Re: Kafka Producer Async Send with Callback - Error Handling

New Contributor

What I did was use a Semaphore to wait for the Callback. Within the Callback I store the Exception in a field, and once the Callback has completed the main thread rethrows it to the client:

 

 

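	@Override
	public int send_(String data) throws Exception
	{
		logger.trace("Entering KafkaMQDeviceService.send_() data:" + data);
		// Clear any failure left over from a previous call (assumes the setter accepts null).
		setExceptionOnSend(null);
		Semaphore semaphore = new Semaphore(0);
		producer.send(new ProducerRecord<String, String>(topic, data), new Callback()
		{
			@Override
			public void onCompletion(RecordMetadata meta, Exception e)
			{
				if ( null != e )
					setExceptionOnSend(e);
				semaphore.release();
			}
		});
		
		// flush() blocks until the buffered record has been sent, so this method is effectively synchronous.
		producer.flush();
		try
		{
			// Wait until the callback has run.
			semaphore.acquire();
		} catch (InterruptedException e) {
			// Restore the interrupt flag instead of swallowing the interruption.
			Thread.currentThread().interrupt();
		}
		
		if ( null != exceptionOnSend )
			throw exceptionOnSend;
		
		logger.trace("Leaving KafkaMQDeviceService.send_()");
		return StatusConstants.DEVICE_SEND_SUCCESSFUL;
	}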
	

 

Hope it helps.
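
If blocking per send is acceptable, a similar effect can probably be had without a Semaphore or a shared exception field, because `KafkaProducer.send` returns a `Future<RecordMetadata>` and `Future.get()` waits for the result and rethrows a failure wrapped in an `ExecutionException`. The sketch below only illustrates that `Future` behavior with the standard library: `fakeSend` is a hypothetical stand-in for `producer.send(record)`, and `sendAndWait` is an illustrative name.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

class BlockingSendSketch {
    // Hypothetical stand-in for producer.send(record): completes on another
    // thread, and fails when the payload is empty.
    static Future<String> fakeSend(String data) {
        return CompletableFuture.supplyAsync(() -> {
            if (data.isEmpty()) {
                throw new IllegalStateException("send failed");
            }
            return "acked:" + data;
        });
    }

    // Blocks until the send finishes and rethrows the send's own exception.
    static String sendAndWait(String data) throws Exception {
        try {
            return fakeSend(data).get();
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            if (cause instanceof Exception) {
                throw (Exception) cause; // unwrap so the caller sees the original failure
            }
            throw e;
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(sendAndWait("hello")); // acked:hello
        try {
            sendAndWait("");
        } catch (IllegalStateException e) {
            System.out.println("caught: " + e.getMessage()); // caught: send failed
        }
    }
}
```

Because the exception travels with the Future of each individual send, concurrent callers cannot see each other's failures. The trade-off is the same as with the Semaphore version: waiting on every record gives up most of the throughput benefit of asynchronous sends.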