<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>question Re: Disconnecting from node due to socket connection timeout in Support Questions</title>
    <link>https://community.cloudera.com/t5/Support-Questions/Kafka-broker-disconnecting-from-node-due-to-socket/m-p/384226#M245278</link>
    <description>&lt;P&gt;This is my producer code-&lt;/P&gt;&lt;DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;import&lt;/SPAN&gt;&lt;SPAN&gt; java.util.Properties;&lt;/SPAN&gt;&lt;/DIV&gt;&lt;BR /&gt;&lt;BR /&gt;&lt;DIV&gt;&lt;SPAN&gt;import&lt;/SPAN&gt;&lt;SPAN&gt; org.apache.kafka.clients.producer.KafkaProducer;&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;import&lt;/SPAN&gt;&lt;SPAN&gt; org.apache.kafka.clients.producer.ProducerRecord;&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;import&lt;/SPAN&gt;&lt;SPAN&gt; org.apache.kafka.clients.producer.RecordMetadata;&lt;/SPAN&gt;&lt;/DIV&gt;&lt;BR /&gt;&lt;DIV&gt;&lt;SPAN&gt;public&lt;/SPAN&gt; &lt;SPAN&gt;class&lt;/SPAN&gt;&lt;SPAN&gt; new_producer {&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;public&lt;/SPAN&gt; &lt;SPAN&gt;static&lt;/SPAN&gt; &lt;SPAN&gt;void&lt;/SPAN&gt;&lt;SPAN&gt; main(&lt;/SPAN&gt;&lt;SPAN&gt;String&lt;/SPAN&gt;&lt;SPAN&gt;[] args) {&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;Properties&lt;/SPAN&gt;&lt;SPAN&gt; prop = &lt;/SPAN&gt;&lt;SPAN&gt;new&lt;/SPAN&gt;&lt;SPAN&gt; Properties();&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; prop.setProperty(&lt;/SPAN&gt;&lt;SPAN&gt;"bootstrap.servers"&lt;/SPAN&gt;&lt;SPAN&gt;, &lt;/SPAN&gt;&lt;SPAN&gt;"localhost:9092"&lt;/SPAN&gt;&lt;SPAN&gt;);&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; prop.setProperty(&lt;/SPAN&gt;&lt;SPAN&gt;"key.serializer"&lt;/SPAN&gt;&lt;SPAN&gt;, &lt;/SPAN&gt;&lt;SPAN&gt;"org.apache.kafka.common.serialization.StringSerializer"&lt;/SPAN&gt;&lt;SPAN&gt;);&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; prop.setProperty(&lt;/SPAN&gt;&lt;SPAN&gt;"value.serializer"&lt;/SPAN&gt;&lt;SPAN&gt;, &lt;/SPAN&gt;&lt;SPAN&gt;"org.apache.kafka.common.serialization.IntegerSerializer"&lt;/SPAN&gt;&lt;SPAN&gt;);&lt;/SPAN&gt;&lt;/DIV&gt;&lt;BR /&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;KafkaProducer&lt;/SPAN&gt;&lt;SPAN&gt;&amp;lt;&lt;/SPAN&gt;&lt;SPAN&gt;String&lt;/SPAN&gt;&lt;SPAN&gt;, &lt;/SPAN&gt;&lt;SPAN&gt;Integer&lt;/SPAN&gt;&lt;SPAN&gt;&amp;gt; producer = &lt;/SPAN&gt;&lt;SPAN&gt;new&lt;/SPAN&gt; &lt;SPAN&gt;KafkaProducer&lt;/SPAN&gt;&lt;SPAN&gt;&amp;lt;&lt;/SPAN&gt;&lt;SPAN&gt;String&lt;/SPAN&gt;&lt;SPAN&gt;, &lt;/SPAN&gt;&lt;SPAN&gt;Integer&lt;/SPAN&gt;&lt;SPAN&gt;&amp;gt;(prop);&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;ProducerRecord&lt;/SPAN&gt;&lt;SPAN&gt;&amp;lt;&lt;/SPAN&gt;&lt;SPAN&gt;String&lt;/SPAN&gt;&lt;SPAN&gt;, &lt;/SPAN&gt;&lt;SPAN&gt;Integer&lt;/SPAN&gt;&lt;SPAN&gt;&amp;gt; record = &lt;/SPAN&gt;&lt;SPAN&gt;new&lt;/SPAN&gt; &lt;SPAN&gt;ProducerRecord&lt;/SPAN&gt;&lt;SPAN&gt;&amp;lt;&lt;/SPAN&gt;&lt;SPAN&gt;String&lt;/SPAN&gt;&lt;SPAN&gt;, &lt;/SPAN&gt;&lt;SPAN&gt;Integer&lt;/SPAN&gt;&lt;SPAN&gt;&amp;gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"OrderTopic"&lt;/SPAN&gt;&lt;SPAN&gt;, &lt;/SPAN&gt;&lt;SPAN&gt;"tddgfhj laptops"&lt;/SPAN&gt;&lt;SPAN&gt;, &lt;/SPAN&gt;&lt;SPAN&gt;15&lt;/SPAN&gt;&lt;SPAN&gt;);&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;//ProducerRecord&amp;lt;String, Integer&amp;gt; recordtwo = new ProducerRecord&amp;lt;String, Integer&amp;gt;("OrderTopic", "Dell desktops", 15);&lt;/SPAN&gt;&lt;/DIV&gt;&lt;BR /&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;try&lt;/SPAN&gt;&lt;SPAN&gt; {&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;RecordMetadata&lt;/SPAN&gt;&lt;SPAN&gt; recordMetadata = producer.send(record).get();&lt;/SPAN&gt;&lt;SPAN&gt;// synchronous call,waiting&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;//RecordMetadata recordMetadatatwo = producer.send(recordtwo).get();&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; System.out.println(recordMetadata.partition());&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; System.out.println(recordMetadata.offset());&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;//System.out.println(recordMetadatatwo.partition());&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;//System.out.println(recordMetadatatwo.offset());&lt;/SPAN&gt;&lt;/DIV&gt;&lt;BR /&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; System.err.println(&lt;/SPAN&gt;&lt;SPAN&gt;"Message sent"&lt;/SPAN&gt;&lt;SPAN&gt;);&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; } &lt;/SPAN&gt;&lt;SPAN&gt;catch&lt;/SPAN&gt;&lt;SPAN&gt; (&lt;/SPAN&gt;&lt;SPAN&gt;Exception&lt;/SPAN&gt;&lt;SPAN&gt; e) {&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; e.printStackTrace();&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; } &lt;/SPAN&gt;&lt;SPAN&gt;finally&lt;/SPAN&gt;&lt;SPAN&gt; {&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; producer.close();&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; }&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; }&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&lt;SPAN&gt;} This is my consumer code-&amp;nbsp;&lt;/SPAN&gt;&lt;/SPAN&gt;&lt;DIV&gt;&lt;BR /&gt;&lt;DIV&gt;&lt;SPAN&gt;import&lt;/SPAN&gt;&lt;SPAN&gt; java.time.Duration;&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;import&lt;/SPAN&gt;&lt;SPAN&gt; java.time.LocalDateTime;&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;import&lt;/SPAN&gt;&lt;SPAN&gt; java.time.format.DateTimeFormatter;&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;import&lt;/SPAN&gt;&lt;SPAN&gt; java.util.Arrays;&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;import&lt;/SPAN&gt;&lt;SPAN&gt; java.util.Properties;&lt;/SPAN&gt;&lt;/DIV&gt;&lt;BR /&gt;&lt;DIV&gt;&lt;SPAN&gt;import&lt;/SPAN&gt;&lt;SPAN&gt; org.apache.kafka.clients.consumer.ConsumerRecords;&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;import&lt;/SPAN&gt;&lt;SPAN&gt; org.apache.kafka.clients.consumer.ConsumerConfig;&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;import&lt;/SPAN&gt;&lt;SPAN&gt; org.apache.kafka.clients.consumer.ConsumerRecord;&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;import&lt;/SPAN&gt;&lt;SPAN&gt; org.apache.kafka.clients.consumer.KafkaConsumer;&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;import&lt;/SPAN&gt;&lt;SPAN&gt; org.apache.kafka.common.serialization.StringDeserializer;&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;import&lt;/SPAN&gt;&lt;SPAN&gt; org.apache.kafka.common.errors.WakeupException;&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;public&lt;/SPAN&gt; &lt;SPAN&gt;class&lt;/SPAN&gt;&lt;SPAN&gt; new_consumer {&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp;&lt;/SPAN&gt;&lt;SPAN&gt;public&lt;/SPAN&gt; &lt;SPAN&gt;static&lt;/SPAN&gt; &lt;SPAN&gt;void&lt;/SPAN&gt;&lt;SPAN&gt; main(&lt;/SPAN&gt;&lt;SPAN&gt;String&lt;/SPAN&gt;&lt;SPAN&gt;[] args)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; {&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; System.out.println(&lt;/SPAN&gt;&lt;SPAN&gt;"I am a Kafka Consumer"&lt;/SPAN&gt;&lt;SPAN&gt;);&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;String&lt;/SPAN&gt;&lt;SPAN&gt; bootstrapServers = &lt;/SPAN&gt;&lt;SPAN&gt;"localhost:9092"&lt;/SPAN&gt;&lt;SPAN&gt;;&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;String&lt;/SPAN&gt;&lt;SPAN&gt; groupId = &lt;/SPAN&gt;&lt;SPAN&gt;"salesOrder-Consumers1"&lt;/SPAN&gt;&lt;SPAN&gt;;&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;String&lt;/SPAN&gt;&lt;SPAN&gt; topic = &lt;/SPAN&gt;&lt;SPAN&gt;"OrderTopic"&lt;/SPAN&gt;&lt;SPAN&gt;;&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;// create consumer configs&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;Properties&lt;/SPAN&gt;&lt;SPAN&gt; properties = &lt;/SPAN&gt;&lt;SPAN&gt;new&lt;/SPAN&gt;&lt;SPAN&gt; Properties();&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.&lt;/SPAN&gt;&lt;SPAN&gt;class&lt;/SPAN&gt;&lt;SPAN&gt;.getName());&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.&lt;/SPAN&gt;&lt;SPAN&gt;class&lt;/SPAN&gt;&lt;SPAN&gt;.getName());&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, &lt;/SPAN&gt;&lt;SPAN&gt;"earliest"&lt;/SPAN&gt;&lt;SPAN&gt;);&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp;&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;// create consumer&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;KafkaConsumer&lt;/SPAN&gt;&lt;SPAN&gt;&amp;lt;&lt;/SPAN&gt;&lt;SPAN&gt;String&lt;/SPAN&gt;&lt;SPAN&gt;, &lt;/SPAN&gt;&lt;SPAN&gt;String&lt;/SPAN&gt;&lt;SPAN&gt;&amp;gt; consumer = &lt;/SPAN&gt;&lt;SPAN&gt;new&lt;/SPAN&gt; &lt;SPAN&gt;KafkaConsumer&lt;/SPAN&gt;&lt;SPAN&gt;&amp;lt;&amp;gt;(properties);&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;//System.out.println(consumer.position(tp));&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;// get a reference to the current thread&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;final&lt;/SPAN&gt; &lt;SPAN&gt;Thread&lt;/SPAN&gt;&lt;SPAN&gt; mainThread = Thread.currentThread();&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;// adding the shutdown hook&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; Runtime.getRuntime().addShutdownHook(&lt;/SPAN&gt;&lt;SPAN&gt;new&lt;/SPAN&gt;&lt;SPAN&gt; Thread() {&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;public&lt;/SPAN&gt; &lt;SPAN&gt;void&lt;/SPAN&gt;&lt;SPAN&gt; run() {&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; System.out.println(&lt;/SPAN&gt;&lt;SPAN&gt;"Detected a shutdown, let's exit by calling consumer.wakeup()..."&lt;/SPAN&gt;&lt;SPAN&gt;);&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; consumer.wakeup();&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;// join the main thread to allow the execution of the code in the main thread&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;try&lt;/SPAN&gt;&lt;SPAN&gt; {&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; mainThread.join();&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; } &lt;/SPAN&gt;&lt;SPAN&gt;catch&lt;/SPAN&gt;&lt;SPAN&gt; (&lt;/SPAN&gt;&lt;SPAN&gt;InterruptedException&lt;/SPAN&gt;&lt;SPAN&gt; e) {&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; e.printStackTrace();&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; }&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; }&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; });&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;try&lt;/SPAN&gt;&lt;SPAN&gt; {&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;// subscribe consumer to our topic(s)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; consumer.subscribe(Arrays.asList(topic));&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;DateTimeFormatter&lt;/SPAN&gt;&lt;SPAN&gt; dtf = DateTimeFormatter.ofPattern(&lt;/SPAN&gt;&lt;SPAN&gt;"yyyy/MM/dd"&lt;/SPAN&gt;&lt;SPAN&gt;); &amp;nbsp;&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;LocalDateTime&lt;/SPAN&gt;&lt;SPAN&gt; now = LocalDateTime.now(); &amp;nbsp;&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; System.out.println(dtf.format(now));&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp;&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;while&lt;/SPAN&gt;&lt;SPAN&gt; (&lt;/SPAN&gt;&lt;SPAN&gt;true&lt;/SPAN&gt;&lt;SPAN&gt;) {&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;ConsumerRecords&lt;/SPAN&gt;&lt;SPAN&gt;&amp;lt;&lt;/SPAN&gt;&lt;SPAN&gt;String&lt;/SPAN&gt;&lt;SPAN&gt;, &lt;/SPAN&gt;&lt;SPAN&gt;String&lt;/SPAN&gt;&lt;SPAN&gt;&amp;gt; records =&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; consumer.poll(Duration.ofMillis(&lt;/SPAN&gt;&lt;SPAN&gt;100&lt;/SPAN&gt;&lt;SPAN&gt;));&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;for&lt;/SPAN&gt;&lt;SPAN&gt; (&lt;/SPAN&gt;&lt;SPAN&gt;ConsumerRecord&lt;/SPAN&gt;&lt;SPAN&gt;&amp;lt;&lt;/SPAN&gt;&lt;SPAN&gt;String&lt;/SPAN&gt;&lt;SPAN&gt;, &lt;/SPAN&gt;&lt;SPAN&gt;String&lt;/SPAN&gt;&lt;SPAN&gt;&amp;gt; record &lt;/SPAN&gt;&lt;SPAN&gt;:&lt;/SPAN&gt;&lt;SPAN&gt; records) {&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;//FW.write(record.value()+(","));&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; System.out.println(&lt;/SPAN&gt;&lt;SPAN&gt;"Key: "&lt;/SPAN&gt;&lt;SPAN&gt; + record.key() + &lt;/SPAN&gt;&lt;SPAN&gt;", Value: "&lt;/SPAN&gt;&lt;SPAN&gt; + record.value());&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; System.out.println(&lt;/SPAN&gt;&lt;SPAN&gt;"Partition: "&lt;/SPAN&gt;&lt;SPAN&gt; + record.partition() + &lt;/SPAN&gt;&lt;SPAN&gt;", Offset:"&lt;/SPAN&gt;&lt;SPAN&gt; + record.offset());&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; }&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; }&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; }&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;catch&lt;/SPAN&gt;&lt;SPAN&gt; (&lt;/SPAN&gt;&lt;SPAN&gt;WakeupException&lt;/SPAN&gt;&lt;SPAN&gt; e) {&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; System.out.println(&lt;/SPAN&gt;&lt;SPAN&gt;"Wake up exception!"&lt;/SPAN&gt;&lt;SPAN&gt;);&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;// we ignore this as this is an expected exception when closing a consumer&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; } &lt;/SPAN&gt;&lt;SPAN&gt;catch&lt;/SPAN&gt;&lt;SPAN&gt; (&lt;/SPAN&gt;&lt;SPAN&gt;Exception&lt;/SPAN&gt;&lt;SPAN&gt; e) {&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; System.out.println(&lt;/SPAN&gt;&lt;SPAN&gt;"Unexpected exception "&lt;/SPAN&gt;&lt;SPAN&gt; + e.getMessage());&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; }&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;finally&lt;/SPAN&gt;&lt;SPAN&gt; {&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; consumer.close(); &lt;/SPAN&gt;&lt;SPAN&gt;// this will also commit the offsets if need be.&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; System.out.println(&lt;/SPAN&gt;&lt;SPAN&gt;"The consumer is now gracefully closed."&lt;/SPAN&gt;&lt;SPAN&gt;);&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; }&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; }&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;} ....can you please help me out?&lt;/SPAN&gt;&lt;/DIV&gt;&lt;/DIV&gt;&lt;/DIV&gt;&lt;/DIV&gt;</description>
    <pubDate>Thu, 29 Feb 2024 08:50:55 GMT</pubDate>
    <dc:creator>Andy1989</dc:creator>
    <dc:date>2024-02-29T08:50:55Z</dc:date>
    <item>
      <title>Kafka  broker disconnecting from node due to socket connection timeout</title>
      <link>https://community.cloudera.com/t5/Support-Questions/Kafka-broker-disconnecting-from-node-due-to-socket/m-p/384217#M245274</link>
      <description>&lt;P&gt;Hello,I am a beginner starting to learn Kafka and Cloudera. I am running a simple java code for producer,and am using Cloudera SMM through Docker Desktop. On running my java code,I keep getting this error-"Disconnecting from node 1001 due to socket connection setup timeout". There are no issues in my code or pom. I am using Java Maven in VSCode. Please help me to fix this issue&lt;/P&gt;</description>
      <pubDate>Mon, 23 Mar 2026 12:55:21 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Support-Questions/Kafka-broker-disconnecting-from-node-due-to-socket/m-p/384217#M245274</guid>
      <dc:creator>Andy1989</dc:creator>
      <dc:date>2026-03-23T12:55:21Z</dc:date>
    </item>
    <item>
      <title>Re: Disconnecting from node due to socket connection timeout</title>
      <link>https://community.cloudera.com/t5/Support-Questions/Kafka-broker-disconnecting-from-node-due-to-socket/m-p/384221#M245276</link>
      <description>&lt;P&gt;&lt;a href="https://community.cloudera.com/t5/user/viewprofilepage/user-id/109411"&gt;@Andy1989&lt;/a&gt;&amp;nbsp;Welcome to our community! To help you get the best possible answer, I have tagged in our Kafka experts&amp;nbsp; &lt;a href="https://community.cloudera.com/t5/user/viewprofilepage/user-id/15085"&gt;@Yuexin Zhang&lt;/a&gt;&amp;nbsp;&lt;a href="https://community.cloudera.com/t5/user/viewprofilepage/user-id/83812"&gt;@ywu&lt;/a&gt;&amp;nbsp;&lt;a href="https://community.cloudera.com/t5/user/viewprofilepage/user-id/81193"&gt;@Babasaheb&lt;/a&gt;&amp;nbsp;@ who may be able to assist you further&lt;BR /&gt;&lt;BR /&gt;Please feel free to provide any additional information or details about your query, and we hope that you will find a satisfactory solution to your question.&lt;/P&gt;</description>
      <pubDate>Thu, 29 Feb 2024 07:14:08 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Support-Questions/Kafka-broker-disconnecting-from-node-due-to-socket/m-p/384221#M245276</guid>
      <dc:creator>VidyaSargur</dc:creator>
      <dc:date>2024-02-29T07:14:08Z</dc:date>
    </item>
    <item>
      <title>Re: Disconnecting from node due to socket connection timeout</title>
      <link>https://community.cloudera.com/t5/Support-Questions/Kafka-broker-disconnecting-from-node-due-to-socket/m-p/384223#M245277</link>
      <description>&lt;P&gt;Hello&amp;nbsp;&lt;a href="https://community.cloudera.com/t5/user/viewprofilepage/user-id/109411"&gt;@Andy1989&lt;/a&gt;&amp;nbsp;,&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;"socket connection setup timeout" sounds like some network issue on client side.&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;May I know you specify Kafka broker address and port in your code?&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;Is the address solvable from client side and is the port number of Kafka broker correct?&lt;/P&gt;</description>
      <pubDate>Thu, 29 Feb 2024 07:24:48 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Support-Questions/Kafka-broker-disconnecting-from-node-due-to-socket/m-p/384223#M245277</guid>
      <dc:creator>ywu</dc:creator>
      <dc:date>2024-02-29T07:24:48Z</dc:date>
    </item>
    <item>
      <title>Re: Disconnecting from node due to socket connection timeout</title>
      <link>https://community.cloudera.com/t5/Support-Questions/Kafka-broker-disconnecting-from-node-due-to-socket/m-p/384226#M245278</link>
      <description>&lt;P&gt;This is my producer code-&lt;/P&gt;&lt;DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;import&lt;/SPAN&gt;&lt;SPAN&gt; java.util.Properties;&lt;/SPAN&gt;&lt;/DIV&gt;&lt;BR /&gt;&lt;BR /&gt;&lt;DIV&gt;&lt;SPAN&gt;import&lt;/SPAN&gt;&lt;SPAN&gt; org.apache.kafka.clients.producer.KafkaProducer;&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;import&lt;/SPAN&gt;&lt;SPAN&gt; org.apache.kafka.clients.producer.ProducerRecord;&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;import&lt;/SPAN&gt;&lt;SPAN&gt; org.apache.kafka.clients.producer.RecordMetadata;&lt;/SPAN&gt;&lt;/DIV&gt;&lt;BR /&gt;&lt;DIV&gt;&lt;SPAN&gt;public&lt;/SPAN&gt; &lt;SPAN&gt;class&lt;/SPAN&gt;&lt;SPAN&gt; new_producer {&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;public&lt;/SPAN&gt; &lt;SPAN&gt;static&lt;/SPAN&gt; &lt;SPAN&gt;void&lt;/SPAN&gt;&lt;SPAN&gt; main(&lt;/SPAN&gt;&lt;SPAN&gt;String&lt;/SPAN&gt;&lt;SPAN&gt;[] args) {&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;Properties&lt;/SPAN&gt;&lt;SPAN&gt; prop = &lt;/SPAN&gt;&lt;SPAN&gt;new&lt;/SPAN&gt;&lt;SPAN&gt; Properties();&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; prop.setProperty(&lt;/SPAN&gt;&lt;SPAN&gt;"bootstrap.servers"&lt;/SPAN&gt;&lt;SPAN&gt;, &lt;/SPAN&gt;&lt;SPAN&gt;"localhost:9092"&lt;/SPAN&gt;&lt;SPAN&gt;);&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; prop.setProperty(&lt;/SPAN&gt;&lt;SPAN&gt;"key.serializer"&lt;/SPAN&gt;&lt;SPAN&gt;, &lt;/SPAN&gt;&lt;SPAN&gt;"org.apache.kafka.common.serialization.StringSerializer"&lt;/SPAN&gt;&lt;SPAN&gt;);&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; prop.setProperty(&lt;/SPAN&gt;&lt;SPAN&gt;"value.serializer"&lt;/SPAN&gt;&lt;SPAN&gt;, &lt;/SPAN&gt;&lt;SPAN&gt;"org.apache.kafka.common.serialization.IntegerSerializer"&lt;/SPAN&gt;&lt;SPAN&gt;);&lt;/SPAN&gt;&lt;/DIV&gt;&lt;BR /&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;KafkaProducer&lt;/SPAN&gt;&lt;SPAN&gt;&amp;lt;&lt;/SPAN&gt;&lt;SPAN&gt;String&lt;/SPAN&gt;&lt;SPAN&gt;, &lt;/SPAN&gt;&lt;SPAN&gt;Integer&lt;/SPAN&gt;&lt;SPAN&gt;&amp;gt; producer = &lt;/SPAN&gt;&lt;SPAN&gt;new&lt;/SPAN&gt; &lt;SPAN&gt;KafkaProducer&lt;/SPAN&gt;&lt;SPAN&gt;&amp;lt;&lt;/SPAN&gt;&lt;SPAN&gt;String&lt;/SPAN&gt;&lt;SPAN&gt;, &lt;/SPAN&gt;&lt;SPAN&gt;Integer&lt;/SPAN&gt;&lt;SPAN&gt;&amp;gt;(prop);&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;ProducerRecord&lt;/SPAN&gt;&lt;SPAN&gt;&amp;lt;&lt;/SPAN&gt;&lt;SPAN&gt;String&lt;/SPAN&gt;&lt;SPAN&gt;, &lt;/SPAN&gt;&lt;SPAN&gt;Integer&lt;/SPAN&gt;&lt;SPAN&gt;&amp;gt; record = &lt;/SPAN&gt;&lt;SPAN&gt;new&lt;/SPAN&gt; &lt;SPAN&gt;ProducerRecord&lt;/SPAN&gt;&lt;SPAN&gt;&amp;lt;&lt;/SPAN&gt;&lt;SPAN&gt;String&lt;/SPAN&gt;&lt;SPAN&gt;, &lt;/SPAN&gt;&lt;SPAN&gt;Integer&lt;/SPAN&gt;&lt;SPAN&gt;&amp;gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"OrderTopic"&lt;/SPAN&gt;&lt;SPAN&gt;, &lt;/SPAN&gt;&lt;SPAN&gt;"tddgfhj laptops"&lt;/SPAN&gt;&lt;SPAN&gt;, &lt;/SPAN&gt;&lt;SPAN&gt;15&lt;/SPAN&gt;&lt;SPAN&gt;);&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;//ProducerRecord&amp;lt;String, Integer&amp;gt; recordtwo = new ProducerRecord&amp;lt;String, Integer&amp;gt;("OrderTopic", "Dell desktops", 15);&lt;/SPAN&gt;&lt;/DIV&gt;&lt;BR /&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;try&lt;/SPAN&gt;&lt;SPAN&gt; {&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;RecordMetadata&lt;/SPAN&gt;&lt;SPAN&gt; recordMetadata = producer.send(record).get();&lt;/SPAN&gt;&lt;SPAN&gt;// synchronous call,waiting&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;//RecordMetadata recordMetadatatwo = producer.send(recordtwo).get();&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; System.out.println(recordMetadata.partition());&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; System.out.println(recordMetadata.offset());&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;//System.out.println(recordMetadatatwo.partition());&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;//System.out.println(recordMetadatatwo.offset());&lt;/SPAN&gt;&lt;/DIV&gt;&lt;BR /&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; System.err.println(&lt;/SPAN&gt;&lt;SPAN&gt;"Message sent"&lt;/SPAN&gt;&lt;SPAN&gt;);&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; } &lt;/SPAN&gt;&lt;SPAN&gt;catch&lt;/SPAN&gt;&lt;SPAN&gt; (&lt;/SPAN&gt;&lt;SPAN&gt;Exception&lt;/SPAN&gt;&lt;SPAN&gt; e) {&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; e.printStackTrace();&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; } &lt;/SPAN&gt;&lt;SPAN&gt;finally&lt;/SPAN&gt;&lt;SPAN&gt; {&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; producer.close();&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; }&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; }&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&lt;SPAN&gt;} This is my consumer code-&amp;nbsp;&lt;/SPAN&gt;&lt;/SPAN&gt;&lt;DIV&gt;&lt;BR /&gt;&lt;DIV&gt;&lt;SPAN&gt;import&lt;/SPAN&gt;&lt;SPAN&gt; java.time.Duration;&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;import&lt;/SPAN&gt;&lt;SPAN&gt; java.time.LocalDateTime;&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;import&lt;/SPAN&gt;&lt;SPAN&gt; java.time.format.DateTimeFormatter;&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;import&lt;/SPAN&gt;&lt;SPAN&gt; java.util.Arrays;&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;import&lt;/SPAN&gt;&lt;SPAN&gt; java.util.Properties;&lt;/SPAN&gt;&lt;/DIV&gt;&lt;BR /&gt;&lt;DIV&gt;&lt;SPAN&gt;import&lt;/SPAN&gt;&lt;SPAN&gt; org.apache.kafka.clients.consumer.ConsumerRecords;&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;import&lt;/SPAN&gt;&lt;SPAN&gt; org.apache.kafka.clients.consumer.ConsumerConfig;&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;import&lt;/SPAN&gt;&lt;SPAN&gt; org.apache.kafka.clients.consumer.ConsumerRecord;&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;import&lt;/SPAN&gt;&lt;SPAN&gt; org.apache.kafka.clients.consumer.KafkaConsumer;&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;import&lt;/SPAN&gt;&lt;SPAN&gt; org.apache.kafka.common.serialization.StringDeserializer;&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;import&lt;/SPAN&gt;&lt;SPAN&gt; org.apache.kafka.common.errors.WakeupException;&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;public&lt;/SPAN&gt; &lt;SPAN&gt;class&lt;/SPAN&gt;&lt;SPAN&gt; new_consumer {&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp;&lt;/SPAN&gt;&lt;SPAN&gt;public&lt;/SPAN&gt; &lt;SPAN&gt;static&lt;/SPAN&gt; &lt;SPAN&gt;void&lt;/SPAN&gt;&lt;SPAN&gt; main(&lt;/SPAN&gt;&lt;SPAN&gt;String&lt;/SPAN&gt;&lt;SPAN&gt;[] args)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; {&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; System.out.println(&lt;/SPAN&gt;&lt;SPAN&gt;"I am a Kafka Consumer"&lt;/SPAN&gt;&lt;SPAN&gt;);&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;String&lt;/SPAN&gt;&lt;SPAN&gt; bootstrapServers = &lt;/SPAN&gt;&lt;SPAN&gt;"localhost:9092"&lt;/SPAN&gt;&lt;SPAN&gt;;&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;String&lt;/SPAN&gt;&lt;SPAN&gt; groupId = &lt;/SPAN&gt;&lt;SPAN&gt;"salesOrder-Consumers1"&lt;/SPAN&gt;&lt;SPAN&gt;;&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;String&lt;/SPAN&gt;&lt;SPAN&gt; topic = &lt;/SPAN&gt;&lt;SPAN&gt;"OrderTopic"&lt;/SPAN&gt;&lt;SPAN&gt;;&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;// create consumer configs&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;Properties&lt;/SPAN&gt;&lt;SPAN&gt; properties = &lt;/SPAN&gt;&lt;SPAN&gt;new&lt;/SPAN&gt;&lt;SPAN&gt; Properties();&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.&lt;/SPAN&gt;&lt;SPAN&gt;class&lt;/SPAN&gt;&lt;SPAN&gt;.getName());&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.&lt;/SPAN&gt;&lt;SPAN&gt;class&lt;/SPAN&gt;&lt;SPAN&gt;.getName());&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, &lt;/SPAN&gt;&lt;SPAN&gt;"earliest"&lt;/SPAN&gt;&lt;SPAN&gt;);&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp;&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;// create consumer&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;KafkaConsumer&lt;/SPAN&gt;&lt;SPAN&gt;&amp;lt;&lt;/SPAN&gt;&lt;SPAN&gt;String&lt;/SPAN&gt;&lt;SPAN&gt;, &lt;/SPAN&gt;&lt;SPAN&gt;String&lt;/SPAN&gt;&lt;SPAN&gt;&amp;gt; consumer = &lt;/SPAN&gt;&lt;SPAN&gt;new&lt;/SPAN&gt; &lt;SPAN&gt;KafkaConsumer&lt;/SPAN&gt;&lt;SPAN&gt;&amp;lt;&amp;gt;(properties);&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;//System.out.println(consumer.position(tp));&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;// get a reference to the current thread&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;final&lt;/SPAN&gt; &lt;SPAN&gt;Thread&lt;/SPAN&gt;&lt;SPAN&gt; mainThread = Thread.currentThread();&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;// adding the shutdown hook&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; Runtime.getRuntime().addShutdownHook(&lt;/SPAN&gt;&lt;SPAN&gt;new&lt;/SPAN&gt;&lt;SPAN&gt; Thread() {&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;public&lt;/SPAN&gt; &lt;SPAN&gt;void&lt;/SPAN&gt;&lt;SPAN&gt; run() {&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; System.out.println(&lt;/SPAN&gt;&lt;SPAN&gt;"Detected a shutdown, let's exit by calling consumer.wakeup()..."&lt;/SPAN&gt;&lt;SPAN&gt;);&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; consumer.wakeup();&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;// join the main thread to allow the execution of the code in the main thread&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;try&lt;/SPAN&gt;&lt;SPAN&gt; {&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; mainThread.join();&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; } &lt;/SPAN&gt;&lt;SPAN&gt;catch&lt;/SPAN&gt;&lt;SPAN&gt; (&lt;/SPAN&gt;&lt;SPAN&gt;InterruptedException&lt;/SPAN&gt;&lt;SPAN&gt; e) {&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; e.printStackTrace();&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; }&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; }&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; });&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;try&lt;/SPAN&gt;&lt;SPAN&gt; {&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;// subscribe consumer to our topic(s)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; consumer.subscribe(Arrays.asList(topic));&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;DateTimeFormatter&lt;/SPAN&gt;&lt;SPAN&gt; dtf = DateTimeFormatter.ofPattern(&lt;/SPAN&gt;&lt;SPAN&gt;"yyyy/MM/dd"&lt;/SPAN&gt;&lt;SPAN&gt;); &amp;nbsp;&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;LocalDateTime&lt;/SPAN&gt;&lt;SPAN&gt; now = LocalDateTime.now(); &amp;nbsp;&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; System.out.println(dtf.format(now));&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp;&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;while&lt;/SPAN&gt;&lt;SPAN&gt; (&lt;/SPAN&gt;&lt;SPAN&gt;true&lt;/SPAN&gt;&lt;SPAN&gt;) {&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;ConsumerRecords&lt;/SPAN&gt;&lt;SPAN&gt;&amp;lt;&lt;/SPAN&gt;&lt;SPAN&gt;String&lt;/SPAN&gt;&lt;SPAN&gt;, &lt;/SPAN&gt;&lt;SPAN&gt;String&lt;/SPAN&gt;&lt;SPAN&gt;&amp;gt; records =&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; consumer.poll(Duration.ofMillis(&lt;/SPAN&gt;&lt;SPAN&gt;100&lt;/SPAN&gt;&lt;SPAN&gt;));&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;for&lt;/SPAN&gt;&lt;SPAN&gt; (&lt;/SPAN&gt;&lt;SPAN&gt;ConsumerRecord&lt;/SPAN&gt;&lt;SPAN&gt;&amp;lt;&lt;/SPAN&gt;&lt;SPAN&gt;String&lt;/SPAN&gt;&lt;SPAN&gt;, &lt;/SPAN&gt;&lt;SPAN&gt;String&lt;/SPAN&gt;&lt;SPAN&gt;&amp;gt; record &lt;/SPAN&gt;&lt;SPAN&gt;:&lt;/SPAN&gt;&lt;SPAN&gt; records) {&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;//FW.write(record.value()+(","));&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; System.out.println(&lt;/SPAN&gt;&lt;SPAN&gt;"Key: "&lt;/SPAN&gt;&lt;SPAN&gt; + record.key() + &lt;/SPAN&gt;&lt;SPAN&gt;", Value: "&lt;/SPAN&gt;&lt;SPAN&gt; + record.value());&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; System.out.println(&lt;/SPAN&gt;&lt;SPAN&gt;"Partition: "&lt;/SPAN&gt;&lt;SPAN&gt; + record.partition() + &lt;/SPAN&gt;&lt;SPAN&gt;", Offset:"&lt;/SPAN&gt;&lt;SPAN&gt; + record.offset());&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; }&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; }&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; }&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;catch&lt;/SPAN&gt;&lt;SPAN&gt; (&lt;/SPAN&gt;&lt;SPAN&gt;WakeupException&lt;/SPAN&gt;&lt;SPAN&gt; e) {&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; System.out.println(&lt;/SPAN&gt;&lt;SPAN&gt;"Wake up exception!"&lt;/SPAN&gt;&lt;SPAN&gt;);&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;// we ignore this as this is an expected exception when closing a consumer&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; } &lt;/SPAN&gt;&lt;SPAN&gt;catch&lt;/SPAN&gt;&lt;SPAN&gt; (&lt;/SPAN&gt;&lt;SPAN&gt;Exception&lt;/SPAN&gt;&lt;SPAN&gt; e) {&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; System.out.println(&lt;/SPAN&gt;&lt;SPAN&gt;"Unexpected exception "&lt;/SPAN&gt;&lt;SPAN&gt; + e.getMessage());&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; }&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;finally&lt;/SPAN&gt;&lt;SPAN&gt; {&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; consumer.close(); &lt;/SPAN&gt;&lt;SPAN&gt;// this will also commit the offsets if need be.&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; System.out.println(&lt;/SPAN&gt;&lt;SPAN&gt;"The consumer is now gracefully closed."&lt;/SPAN&gt;&lt;SPAN&gt;);&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; }&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; }&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;} ....can you please help me out?&lt;/SPAN&gt;&lt;/DIV&gt;&lt;/DIV&gt;&lt;/DIV&gt;&lt;/DIV&gt;</description>
      <pubDate>Thu, 29 Feb 2024 08:50:55 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Support-Questions/Kafka-broker-disconnecting-from-node-due-to-socket/m-p/384226#M245278</guid>
      <dc:creator>Andy1989</dc:creator>
      <dc:date>2024-02-29T08:50:55Z</dc:date>
    </item>
    <item>
      <title>Re: Disconnecting from node due to socket connection timeout</title>
      <link>https://community.cloudera.com/t5/Support-Questions/Kafka-broker-disconnecting-from-node-due-to-socket/m-p/384409#M245363</link>
      <description>&lt;P&gt;The Kafka broker address is specified as "localhost". Is the broker running on the same host as the producer？I guess not.&amp;nbsp;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; prop.setProperty(&lt;/SPAN&gt;&lt;SPAN&gt;"bootstrap.servers"&lt;/SPAN&gt;&lt;SPAN&gt;,&amp;nbsp;&lt;/SPAN&gt;&lt;SPAN&gt;"localhost:9092"&lt;/SPAN&gt;&lt;SPAN&gt;);&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;Please use IP address or hostname of the host where your Kafka broker is running and try again.&lt;/DIV&gt;</description>
      <pubDate>Fri, 01 Mar 2024 05:08:20 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Support-Questions/Kafka-broker-disconnecting-from-node-due-to-socket/m-p/384409#M245363</guid>
      <dc:creator>ywu</dc:creator>
      <dc:date>2024-03-01T05:08:20Z</dc:date>
    </item>
    <item>
      <title>Re: Disconnecting from node due to socket connection timeout</title>
      <link>https://community.cloudera.com/t5/Support-Questions/Kafka-broker-disconnecting-from-node-due-to-socket/m-p/384886#M245540</link>
      <description>&lt;P&gt;&lt;a href="https://community.cloudera.com/t5/user/viewprofilepage/user-id/109411"&gt;@Andy1989&lt;/a&gt;,&amp;nbsp;Did the response assist in resolving your query? If it did, kindly mark the relevant reply as the solution, as it will aid others in locating the answer more easily in the future.&amp;nbsp;&lt;/P&gt;</description>
      <pubDate>Tue, 12 Mar 2024 09:43:05 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Support-Questions/Kafka-broker-disconnecting-from-node-due-to-socket/m-p/384886#M245540</guid>
      <dc:creator>VidyaSargur</dc:creator>
      <dc:date>2024-03-12T09:43:05Z</dc:date>
    </item>
  </channel>
</rss>

