<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>question Re: Kafka Producer Async Send with Callback - Error Handling in Archives of Support Questions (Read Only)</title>
    <link>https://community.cloudera.com/t5/Archives-of-Support-Questions/Kafka-Producer-Async-Send-with-Callback-Error-Handling/m-p/53228#M58127</link>
    <description>Anyone?</description>
    <pubDate>Wed, 05 Apr 2017 16:11:20 GMT</pubDate>
    <dc:creator>tseader</dc:creator>
    <dc:date>2017-04-05T16:11:20Z</dc:date>
    <item>
      <title>Kafka Producer Async Send with Callback - Error Handling</title>
      <link>https://community.cloudera.com/t5/Archives-of-Support-Questions/Kafka-Producer-Async-Send-with-Callback-Error-Handling/m-p/52676#M58126</link>
      <description>&lt;P&gt;&lt;STRONG&gt;&lt;U&gt;Problem Statement:&lt;/U&gt;&lt;/STRONG&gt;&amp;nbsp;How do&amp;nbsp;I get access to the Producer Record when I encounter an exception from my asynchronous send method returned&amp;nbsp;within the Callback function used?&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&lt;U&gt;&lt;STRONG&gt;Other Information&lt;/STRONG&gt;&lt;/U&gt;&lt;/P&gt;&lt;P&gt;I understand that the Callback can return&amp;nbsp;a series of retriable and non-retriable exceptions.&amp;nbsp; I also get that the Callback is operating on another thread.&amp;nbsp; It is the notion that the Callback is on another thread that makes me wonder if I try to reference the ProducerRecord &lt;EM&gt;message&lt;/EM&gt; variable, if I am guaranteed to get the same message that aligns to &lt;EM&gt;this &lt;/EM&gt;exception, or if there is a chance that the main thread has continued on and the &lt;EM&gt;message&lt;/EM&gt; is not another value by the time I attempt to reference it via the Callback().&amp;nbsp;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;I don't understand enough about Futures and Callbacks to be 100% confident and it's hard to validate when on my machine with multiple threads as well.&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&lt;U&gt;&lt;STRONG&gt;Example snippet of the method, stripped down significantly:&lt;/STRONG&gt;&lt;/U&gt;&lt;/P&gt;&lt;PRE&gt;ProducerRecord&amp;lt;String, byte[]&amp;gt; message = null;
protected KafkaProducer&amp;lt;String, byte[]&amp;gt; aProducer = null;
aProducer = createKafkaProducer();

[...]

message = new ProducerRecord&amp;lt;String, byte[]&amp;gt;(producerConfig.getKafkaTopic(), byteString);
send();

[...]

public void send() {
	aProducer.send(message, new Callback() {
		public void onCompletion(RecordMetadata metadata, Exception exception) {
			if (exception != null) {
				// How do I find get the original message so that I can do something with it if needed?
				throw new KafkaException("Asynchronous send failure: ", exception);
			} else {
				//NoOp
			}
	}
}&lt;/PRE&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;Is it as simple as referencing the ProducerRecord &lt;EM&gt;message&lt;/EM&gt; variable or do I have to approach it another way to get access to the &lt;EM&gt;message&lt;/EM&gt; so that I can do something with it should I need to?&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;appreciated!&lt;/P&gt;</description>
      <pubDate>Fri, 16 Sep 2022 11:20:30 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Archives-of-Support-Questions/Kafka-Producer-Async-Send-with-Callback-Error-Handling/m-p/52676#M58126</guid>
      <dc:creator>tseader</dc:creator>
      <dc:date>2022-09-16T11:20:30Z</dc:date>
    </item>
    <item>
      <title>Re: Kafka Producer Async Send with Callback - Error Handling</title>
      <link>https://community.cloudera.com/t5/Archives-of-Support-Questions/Kafka-Producer-Async-Send-with-Callback-Error-Handling/m-p/53228#M58127</link>
      <description>Anyone?</description>
      <pubDate>Wed, 05 Apr 2017 16:11:20 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Archives-of-Support-Questions/Kafka-Producer-Async-Send-with-Callback-Error-Handling/m-p/53228#M58127</guid>
      <dc:creator>tseader</dc:creator>
      <dc:date>2017-04-05T16:11:20Z</dc:date>
    </item>
    <item>
      <title>Re: Kafka Producer Async Send with Callback - Error Handling</title>
      <link>https://community.cloudera.com/t5/Archives-of-Support-Questions/Kafka-Producer-Async-Send-with-Callback-Error-Handling/m-p/53245#M58128</link>
      <description>&lt;P&gt;I figured out a solution to my problem with the help of a friend&amp;nbsp;and thought I'd share.&amp;nbsp; Pretty much what I did was implement Callback() with an overloaded constructor that accepts the my message key and value so that I can choose to do something with it should I need to.&amp;nbsp;A note for anyone reading this though is that the API it states that "this callback will generally execute in the background I/O thread so it should be fast", which means&amp;nbsp;it's advisable to avoid significant processing in the Callback().&amp;nbsp;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;The code snippet example follows:&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;PRE&gt;ProducerRecord&amp;lt;String, byte[]&amp;gt; message = null;
protected KafkaProducer&amp;lt;String, byte[]&amp;gt; aProducer = null;
aProducer = createKafkaProducer();

[...]

message = new ProducerRecord&amp;lt;String, byte[]&amp;gt;(producerConfig.getKafkaTopic(), byteString);
send();

[...]

public void send() {
       aProducer.send(message, new MyCallback(message.key(),new String(message.value())));
	}
}


		
//Implemented Callback but I overloaded the constructor to pass in the message so that, if needed, I can do something with the message itself on error. Since according to the API, "This callback will generally execute in the background I/O thread so it should be fast.", I should avoid expensive actions in this callback, so I would have to spin of another executor if I want to do much.
class MyCallback implements Callback {

	private final String messageKey;
	private final String messageValue;

	public MyCallback(String messageKey, String messageValue) {
		this.messageKey = messageKey;
		this.messageValue = messageValue;
	}

	@Override
	public void onCompletion(RecordMetadata metadata, Exception exception) {
             // This is where I will handle exceptions and I'll have the message should I need to write it out somewhere to deal with later.
             [...] 
        }&lt;/PRE&gt;</description>
      <pubDate>Thu, 06 Apr 2017 00:51:39 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Archives-of-Support-Questions/Kafka-Producer-Async-Send-with-Callback-Error-Handling/m-p/53245#M58128</guid>
      <dc:creator>tseader</dc:creator>
      <dc:date>2017-04-06T00:51:39Z</dc:date>
    </item>
    <item>
      <title>Re: Kafka Producer Async Send with Callback - Error Handling</title>
      <link>https://community.cloudera.com/t5/Archives-of-Support-Questions/Kafka-Producer-Async-Send-with-Callback-Error-Handling/m-p/78190#M58129</link>
      <description>&lt;P&gt;What I did is to use a Semaphore to wait for the Callback&amp;nbsp;and then within the Callback itself I set the Exception to a local variable in the main thread and then it&amp;nbsp;throws it to the client:&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;PRE&gt;	@Override
	public int send_(String data) throws Exception
	{
		logger.trace("Entering KafkaMQDeviceService.send_() data:" + data);
		Semaphore semaphore = new Semaphore(0);
		producer.send(new ProducerRecord&amp;lt;String, String&amp;gt;(topic, data), new Callback()
		{
			@Override
			public void onCompletion(RecordMetadata meta, Exception e)
			{
				if ( null != e )
					setExceptionOnSend(e);
				semaphore.release();
			}
		});
		
		producer.flush();
		try
		{
			semaphore.acquire();
		} catch (InterruptedException e) {}
		
		if ( null != exceptionOnSend )
			throw exceptionOnSend;
		
		logger.trace("Leaving KafkaMQDeviceService.send_()");
		return StatusConstants.DEVICE_SEND_SUCCESSFUL;
	}
	&lt;/PRE&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;Hope it helps.&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;</description>
      <pubDate>Tue, 07 Aug 2018 16:20:02 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Archives-of-Support-Questions/Kafka-Producer-Async-Send-with-Callback-Error-Handling/m-p/78190#M58129</guid>
      <dc:creator>laurakia</dc:creator>
      <dc:date>2018-08-07T16:20:02Z</dc:date>
    </item>
  </channel>
</rss>

