Member since
08-07-2018
1
Post
0
Kudos Received
0
Solutions
08-07-2018
09:20 AM
What I did is to use a Semaphore to wait for the Callback and then within the Callback itself I set the Exception to a local variable in the main thread and then it throws it to the client: @Override
public int send_(String data) throws Exception
{
logger.trace("Entering KafkaMQDeviceService.send_() data:" + data);
Semaphore semaphore = new Semaphore(0);
producer.send(new ProducerRecord<String, String>(topic, data), new Callback()
{
@Override
public void onCompletion(RecordMetadata meta, Exception e)
{
if ( null != e )
setExceptionOnSend(e);
semaphore.release();
}
});
producer.flush();
try
{
semaphore.acquire();
} catch (InterruptedException e) {}
if ( null != exceptionOnSend )
throw exceptionOnSend;
logger.trace("Leaving KafkaMQDeviceService.send_()");
return StatusConstants.DEVICE_SEND_SUCCESSFUL;
}
Hope it helps.
... View more