<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>question Kafka  broker disconnecting from node due to socket connection timeout in Support Questions</title>
    <link>https://community.cloudera.com/t5/Support-Questions/Kafka-broker-disconnecting-from-node-due-to-socket/m-p/384217#M245274</link>
    <description>&lt;P&gt;Hello,I am a beginner starting to learn Kafka and Cloudera. I am running a simple java code for producer,and am using Cloudera SMM through Docker Desktop. On running my java code,I keep getting this error-"Disconnecting from node 1001 due to socket connection setup timeout". There are no issues in my code or pom. I am using Java Maven in VSCode. Please help me to fix this issue&lt;/P&gt;</description>
    <pubDate>Mon, 23 Mar 2026 12:55:21 GMT</pubDate>
    <dc:creator>Andy1989</dc:creator>
    <dc:date>2026-03-23T12:55:21Z</dc:date>
    <item>
      <title>Kafka  broker disconnecting from node due to socket connection timeout</title>
      <link>https://community.cloudera.com/t5/Support-Questions/Kafka-broker-disconnecting-from-node-due-to-socket/m-p/384217#M245274</link>
      <description>&lt;P&gt;Hello,I am a beginner starting to learn Kafka and Cloudera. I am running a simple java code for producer,and am using Cloudera SMM through Docker Desktop. On running my java code,I keep getting this error-"Disconnecting from node 1001 due to socket connection setup timeout". There are no issues in my code or pom. I am using Java Maven in VSCode. Please help me to fix this issue&lt;/P&gt;</description>
      <pubDate>Mon, 23 Mar 2026 12:55:21 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Support-Questions/Kafka-broker-disconnecting-from-node-due-to-socket/m-p/384217#M245274</guid>
      <dc:creator>Andy1989</dc:creator>
      <dc:date>2026-03-23T12:55:21Z</dc:date>
    </item>
    <item>
      <title>Re: Disconnecting from node due to socket connection timeout</title>
      <link>https://community.cloudera.com/t5/Support-Questions/Kafka-broker-disconnecting-from-node-due-to-socket/m-p/384221#M245276</link>
      <description>&lt;P&gt;&lt;a href="https://community.cloudera.com/t5/user/viewprofilepage/user-id/109411"&gt;@Andy1989&lt;/a&gt;&amp;nbsp;Welcome to our community! To help you get the best possible answer, I have tagged in our Kafka experts&amp;nbsp; &lt;a href="https://community.cloudera.com/t5/user/viewprofilepage/user-id/15085"&gt;@Yuexin Zhang&lt;/a&gt;&amp;nbsp;&lt;a href="https://community.cloudera.com/t5/user/viewprofilepage/user-id/83812"&gt;@ywu&lt;/a&gt;&amp;nbsp;&lt;a href="https://community.cloudera.com/t5/user/viewprofilepage/user-id/81193"&gt;@Babasaheb&lt;/a&gt;&amp;nbsp;@ who may be able to assist you further&lt;BR /&gt;&lt;BR /&gt;Please feel free to provide any additional information or details about your query, and we hope that you will find a satisfactory solution to your question.&lt;/P&gt;</description>
      <pubDate>Thu, 29 Feb 2024 07:14:08 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Support-Questions/Kafka-broker-disconnecting-from-node-due-to-socket/m-p/384221#M245276</guid>
      <dc:creator>VidyaSargur</dc:creator>
      <dc:date>2024-02-29T07:14:08Z</dc:date>
    </item>
    <item>
      <title>Re: Disconnecting from node due to socket connection timeout</title>
      <link>https://community.cloudera.com/t5/Support-Questions/Kafka-broker-disconnecting-from-node-due-to-socket/m-p/384223#M245277</link>
      <description>&lt;P&gt;Hello&amp;nbsp;&lt;a href="https://community.cloudera.com/t5/user/viewprofilepage/user-id/109411"&gt;@Andy1989&lt;/a&gt;&amp;nbsp;,&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;"socket connection setup timeout" sounds like some network issue on client side.&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;May I know you specify Kafka broker address and port in your code?&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;Is the address solvable from client side and is the port number of Kafka broker correct?&lt;/P&gt;</description>
      <pubDate>Thu, 29 Feb 2024 07:24:48 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Support-Questions/Kafka-broker-disconnecting-from-node-due-to-socket/m-p/384223#M245277</guid>
      <dc:creator>ywu</dc:creator>
      <dc:date>2024-02-29T07:24:48Z</dc:date>
    </item>
    <item>
      <title>Re: Disconnecting from node due to socket connection timeout</title>
      <link>https://community.cloudera.com/t5/Support-Questions/Kafka-broker-disconnecting-from-node-due-to-socket/m-p/384226#M245278</link>
      <description>&lt;P&gt;This is my producer code-&lt;/P&gt;&lt;DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;import&lt;/SPAN&gt;&lt;SPAN&gt; java.util.Properties;&lt;/SPAN&gt;&lt;/DIV&gt;&lt;BR /&gt;&lt;BR /&gt;&lt;DIV&gt;&lt;SPAN&gt;import&lt;/SPAN&gt;&lt;SPAN&gt; org.apache.kafka.clients.producer.KafkaProducer;&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;import&lt;/SPAN&gt;&lt;SPAN&gt; org.apache.kafka.clients.producer.ProducerRecord;&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;import&lt;/SPAN&gt;&lt;SPAN&gt; org.apache.kafka.clients.producer.RecordMetadata;&lt;/SPAN&gt;&lt;/DIV&gt;&lt;BR /&gt;&lt;DIV&gt;&lt;SPAN&gt;public&lt;/SPAN&gt; &lt;SPAN&gt;class&lt;/SPAN&gt;&lt;SPAN&gt; new_producer {&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;public&lt;/SPAN&gt; &lt;SPAN&gt;static&lt;/SPAN&gt; &lt;SPAN&gt;void&lt;/SPAN&gt;&lt;SPAN&gt; main(&lt;/SPAN&gt;&lt;SPAN&gt;String&lt;/SPAN&gt;&lt;SPAN&gt;[] args) {&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;Properties&lt;/SPAN&gt;&lt;SPAN&gt; prop = &lt;/SPAN&gt;&lt;SPAN&gt;new&lt;/SPAN&gt;&lt;SPAN&gt; Properties();&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; prop.setProperty(&lt;/SPAN&gt;&lt;SPAN&gt;"bootstrap.servers"&lt;/SPAN&gt;&lt;SPAN&gt;, &lt;/SPAN&gt;&lt;SPAN&gt;"localhost:9092"&lt;/SPAN&gt;&lt;SPAN&gt;);&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; prop.setProperty(&lt;/SPAN&gt;&lt;SPAN&gt;"key.serializer"&lt;/SPAN&gt;&lt;SPAN&gt;, &lt;/SPAN&gt;&lt;SPAN&gt;"org.apache.kafka.common.serialization.StringSerializer"&lt;/SPAN&gt;&lt;SPAN&gt;);&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; prop.setProperty(&lt;/SPAN&gt;&lt;SPAN&gt;"value.serializer"&lt;/SPAN&gt;&lt;SPAN&gt;, &lt;/SPAN&gt;&lt;SPAN&gt;"org.apache.kafka.common.serialization.IntegerSerializer"&lt;/SPAN&gt;&lt;SPAN&gt;);&lt;/SPAN&gt;&lt;/DIV&gt;&lt;BR /&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;KafkaProducer&lt;/SPAN&gt;&lt;SPAN&gt;&amp;lt;&lt;/SPAN&gt;&lt;SPAN&gt;String&lt;/SPAN&gt;&lt;SPAN&gt;, &lt;/SPAN&gt;&lt;SPAN&gt;Integer&lt;/SPAN&gt;&lt;SPAN&gt;&amp;gt; producer = &lt;/SPAN&gt;&lt;SPAN&gt;new&lt;/SPAN&gt; &lt;SPAN&gt;KafkaProducer&lt;/SPAN&gt;&lt;SPAN&gt;&amp;lt;&lt;/SPAN&gt;&lt;SPAN&gt;String&lt;/SPAN&gt;&lt;SPAN&gt;, &lt;/SPAN&gt;&lt;SPAN&gt;Integer&lt;/SPAN&gt;&lt;SPAN&gt;&amp;gt;(prop);&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;ProducerRecord&lt;/SPAN&gt;&lt;SPAN&gt;&amp;lt;&lt;/SPAN&gt;&lt;SPAN&gt;String&lt;/SPAN&gt;&lt;SPAN&gt;, &lt;/SPAN&gt;&lt;SPAN&gt;Integer&lt;/SPAN&gt;&lt;SPAN&gt;&amp;gt; record = &lt;/SPAN&gt;&lt;SPAN&gt;new&lt;/SPAN&gt; &lt;SPAN&gt;ProducerRecord&lt;/SPAN&gt;&lt;SPAN&gt;&amp;lt;&lt;/SPAN&gt;&lt;SPAN&gt;String&lt;/SPAN&gt;&lt;SPAN&gt;, &lt;/SPAN&gt;&lt;SPAN&gt;Integer&lt;/SPAN&gt;&lt;SPAN&gt;&amp;gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"OrderTopic"&lt;/SPAN&gt;&lt;SPAN&gt;, &lt;/SPAN&gt;&lt;SPAN&gt;"tddgfhj laptops"&lt;/SPAN&gt;&lt;SPAN&gt;, &lt;/SPAN&gt;&lt;SPAN&gt;15&lt;/SPAN&gt;&lt;SPAN&gt;);&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;//ProducerRecord&amp;lt;String, Integer&amp;gt; recordtwo = new ProducerRecord&amp;lt;String, Integer&amp;gt;("OrderTopic", "Dell desktops", 15);&lt;/SPAN&gt;&lt;/DIV&gt;&lt;BR /&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;try&lt;/SPAN&gt;&lt;SPAN&gt; {&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;RecordMetadata&lt;/SPAN&gt;&lt;SPAN&gt; recordMetadata = producer.send(record).get();&lt;/SPAN&gt;&lt;SPAN&gt;// synchronous call,waiting&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;//RecordMetadata recordMetadatatwo = producer.send(recordtwo).get();&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; System.out.println(recordMetadata.partition());&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; System.out.println(recordMetadata.offset());&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;//System.out.println(recordMetadatatwo.partition());&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;//System.out.println(recordMetadatatwo.offset());&lt;/SPAN&gt;&lt;/DIV&gt;&lt;BR /&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; System.err.println(&lt;/SPAN&gt;&lt;SPAN&gt;"Message sent"&lt;/SPAN&gt;&lt;SPAN&gt;);&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; } &lt;/SPAN&gt;&lt;SPAN&gt;catch&lt;/SPAN&gt;&lt;SPAN&gt; (&lt;/SPAN&gt;&lt;SPAN&gt;Exception&lt;/SPAN&gt;&lt;SPAN&gt; e) {&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; e.printStackTrace();&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; } &lt;/SPAN&gt;&lt;SPAN&gt;finally&lt;/SPAN&gt;&lt;SPAN&gt; {&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; producer.close();&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; }&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; }&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&lt;SPAN&gt;} This is my consumer code-&amp;nbsp;&lt;/SPAN&gt;&lt;/SPAN&gt;&lt;DIV&gt;&lt;BR /&gt;&lt;DIV&gt;&lt;SPAN&gt;import&lt;/SPAN&gt;&lt;SPAN&gt; java.time.Duration;&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;import&lt;/SPAN&gt;&lt;SPAN&gt; java.time.LocalDateTime;&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;import&lt;/SPAN&gt;&lt;SPAN&gt; java.time.format.DateTimeFormatter;&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;import&lt;/SPAN&gt;&lt;SPAN&gt; java.util.Arrays;&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;import&lt;/SPAN&gt;&lt;SPAN&gt; java.util.Properties;&lt;/SPAN&gt;&lt;/DIV&gt;&lt;BR /&gt;&lt;DIV&gt;&lt;SPAN&gt;import&lt;/SPAN&gt;&lt;SPAN&gt; org.apache.kafka.clients.consumer.ConsumerRecords;&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;import&lt;/SPAN&gt;&lt;SPAN&gt; org.apache.kafka.clients.consumer.ConsumerConfig;&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;import&lt;/SPAN&gt;&lt;SPAN&gt; org.apache.kafka.clients.consumer.ConsumerRecord;&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;import&lt;/SPAN&gt;&lt;SPAN&gt; org.apache.kafka.clients.consumer.KafkaConsumer;&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;import&lt;/SPAN&gt;&lt;SPAN&gt; org.apache.kafka.common.serialization.StringDeserializer;&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;import&lt;/SPAN&gt;&lt;SPAN&gt; org.apache.kafka.common.errors.WakeupException;&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;public&lt;/SPAN&gt; &lt;SPAN&gt;class&lt;/SPAN&gt;&lt;SPAN&gt; new_consumer {&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp;&lt;/SPAN&gt;&lt;SPAN&gt;public&lt;/SPAN&gt; &lt;SPAN&gt;static&lt;/SPAN&gt; &lt;SPAN&gt;void&lt;/SPAN&gt;&lt;SPAN&gt; main(&lt;/SPAN&gt;&lt;SPAN&gt;String&lt;/SPAN&gt;&lt;SPAN&gt;[] args)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; {&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; System.out.println(&lt;/SPAN&gt;&lt;SPAN&gt;"I am a Kafka Consumer"&lt;/SPAN&gt;&lt;SPAN&gt;);&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;String&lt;/SPAN&gt;&lt;SPAN&gt; bootstrapServers = &lt;/SPAN&gt;&lt;SPAN&gt;"localhost:9092"&lt;/SPAN&gt;&lt;SPAN&gt;;&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;String&lt;/SPAN&gt;&lt;SPAN&gt; groupId = &lt;/SPAN&gt;&lt;SPAN&gt;"salesOrder-Consumers1"&lt;/SPAN&gt;&lt;SPAN&gt;;&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;String&lt;/SPAN&gt;&lt;SPAN&gt; topic = &lt;/SPAN&gt;&lt;SPAN&gt;"OrderTopic"&lt;/SPAN&gt;&lt;SPAN&gt;;&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;// create consumer configs&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;Properties&lt;/SPAN&gt;&lt;SPAN&gt; properties = &lt;/SPAN&gt;&lt;SPAN&gt;new&lt;/SPAN&gt;&lt;SPAN&gt; Properties();&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.&lt;/SPAN&gt;&lt;SPAN&gt;class&lt;/SPAN&gt;&lt;SPAN&gt;.getName());&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.&lt;/SPAN&gt;&lt;SPAN&gt;class&lt;/SPAN&gt;&lt;SPAN&gt;.getName());&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, &lt;/SPAN&gt;&lt;SPAN&gt;"earliest"&lt;/SPAN&gt;&lt;SPAN&gt;);&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp;&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;// create consumer&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;KafkaConsumer&lt;/SPAN&gt;&lt;SPAN&gt;&amp;lt;&lt;/SPAN&gt;&lt;SPAN&gt;String&lt;/SPAN&gt;&lt;SPAN&gt;, &lt;/SPAN&gt;&lt;SPAN&gt;String&lt;/SPAN&gt;&lt;SPAN&gt;&amp;gt; consumer = &lt;/SPAN&gt;&lt;SPAN&gt;new&lt;/SPAN&gt; &lt;SPAN&gt;KafkaConsumer&lt;/SPAN&gt;&lt;SPAN&gt;&amp;lt;&amp;gt;(properties);&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;//System.out.println(consumer.position(tp));&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;// get a reference to the current thread&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;final&lt;/SPAN&gt; &lt;SPAN&gt;Thread&lt;/SPAN&gt;&lt;SPAN&gt; mainThread = Thread.currentThread();&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;// adding the shutdown hook&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; Runtime.getRuntime().addShutdownHook(&lt;/SPAN&gt;&lt;SPAN&gt;new&lt;/SPAN&gt;&lt;SPAN&gt; Thread() {&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;public&lt;/SPAN&gt; &lt;SPAN&gt;void&lt;/SPAN&gt;&lt;SPAN&gt; run() {&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; System.out.println(&lt;/SPAN&gt;&lt;SPAN&gt;"Detected a shutdown, let's exit by calling consumer.wakeup()..."&lt;/SPAN&gt;&lt;SPAN&gt;);&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; consumer.wakeup();&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;// join the main thread to allow the execution of the code in the main thread&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;try&lt;/SPAN&gt;&lt;SPAN&gt; {&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; mainThread.join();&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; } &lt;/SPAN&gt;&lt;SPAN&gt;catch&lt;/SPAN&gt;&lt;SPAN&gt; (&lt;/SPAN&gt;&lt;SPAN&gt;InterruptedException&lt;/SPAN&gt;&lt;SPAN&gt; e) {&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; e.printStackTrace();&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; }&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; }&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; });&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;try&lt;/SPAN&gt;&lt;SPAN&gt; {&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;// subscribe consumer to our topic(s)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; consumer.subscribe(Arrays.asList(topic));&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;DateTimeFormatter&lt;/SPAN&gt;&lt;SPAN&gt; dtf = DateTimeFormatter.ofPattern(&lt;/SPAN&gt;&lt;SPAN&gt;"yyyy/MM/dd"&lt;/SPAN&gt;&lt;SPAN&gt;); &amp;nbsp;&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;LocalDateTime&lt;/SPAN&gt;&lt;SPAN&gt; now = LocalDateTime.now(); &amp;nbsp;&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; System.out.println(dtf.format(now));&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp;&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;while&lt;/SPAN&gt;&lt;SPAN&gt; (&lt;/SPAN&gt;&lt;SPAN&gt;true&lt;/SPAN&gt;&lt;SPAN&gt;) {&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;ConsumerRecords&lt;/SPAN&gt;&lt;SPAN&gt;&amp;lt;&lt;/SPAN&gt;&lt;SPAN&gt;String&lt;/SPAN&gt;&lt;SPAN&gt;, &lt;/SPAN&gt;&lt;SPAN&gt;String&lt;/SPAN&gt;&lt;SPAN&gt;&amp;gt; records =&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; consumer.poll(Duration.ofMillis(&lt;/SPAN&gt;&lt;SPAN&gt;100&lt;/SPAN&gt;&lt;SPAN&gt;));&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;for&lt;/SPAN&gt;&lt;SPAN&gt; (&lt;/SPAN&gt;&lt;SPAN&gt;ConsumerRecord&lt;/SPAN&gt;&lt;SPAN&gt;&amp;lt;&lt;/SPAN&gt;&lt;SPAN&gt;String&lt;/SPAN&gt;&lt;SPAN&gt;, &lt;/SPAN&gt;&lt;SPAN&gt;String&lt;/SPAN&gt;&lt;SPAN&gt;&amp;gt; record &lt;/SPAN&gt;&lt;SPAN&gt;:&lt;/SPAN&gt;&lt;SPAN&gt; records) {&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;//FW.write(record.value()+(","));&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; System.out.println(&lt;/SPAN&gt;&lt;SPAN&gt;"Key: "&lt;/SPAN&gt;&lt;SPAN&gt; + record.key() + &lt;/SPAN&gt;&lt;SPAN&gt;", Value: "&lt;/SPAN&gt;&lt;SPAN&gt; + record.value());&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; System.out.println(&lt;/SPAN&gt;&lt;SPAN&gt;"Partition: "&lt;/SPAN&gt;&lt;SPAN&gt; + record.partition() + &lt;/SPAN&gt;&lt;SPAN&gt;", Offset:"&lt;/SPAN&gt;&lt;SPAN&gt; + record.offset());&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; }&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; }&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; }&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;catch&lt;/SPAN&gt;&lt;SPAN&gt; (&lt;/SPAN&gt;&lt;SPAN&gt;WakeupException&lt;/SPAN&gt;&lt;SPAN&gt; e) {&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; System.out.println(&lt;/SPAN&gt;&lt;SPAN&gt;"Wake up exception!"&lt;/SPAN&gt;&lt;SPAN&gt;);&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;// we ignore this as this is an expected exception when closing a consumer&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; } &lt;/SPAN&gt;&lt;SPAN&gt;catch&lt;/SPAN&gt;&lt;SPAN&gt; (&lt;/SPAN&gt;&lt;SPAN&gt;Exception&lt;/SPAN&gt;&lt;SPAN&gt; e) {&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; System.out.println(&lt;/SPAN&gt;&lt;SPAN&gt;"Unexpected exception "&lt;/SPAN&gt;&lt;SPAN&gt; + e.getMessage());&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; }&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;finally&lt;/SPAN&gt;&lt;SPAN&gt; {&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; consumer.close(); &lt;/SPAN&gt;&lt;SPAN&gt;// this will also commit the offsets if need be.&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; System.out.println(&lt;/SPAN&gt;&lt;SPAN&gt;"The consumer is now gracefully closed."&lt;/SPAN&gt;&lt;SPAN&gt;);&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; }&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; }&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;} ....can you please help me out?&lt;/SPAN&gt;&lt;/DIV&gt;&lt;/DIV&gt;&lt;/DIV&gt;&lt;/DIV&gt;</description>
      <pubDate>Thu, 29 Feb 2024 08:50:55 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Support-Questions/Kafka-broker-disconnecting-from-node-due-to-socket/m-p/384226#M245278</guid>
      <dc:creator>Andy1989</dc:creator>
      <dc:date>2024-02-29T08:50:55Z</dc:date>
    </item>
    <item>
      <title>Re: Disconnecting from node due to socket connection timeout</title>
      <link>https://community.cloudera.com/t5/Support-Questions/Kafka-broker-disconnecting-from-node-due-to-socket/m-p/384409#M245363</link>
      <description>&lt;P&gt;The Kafka broker address is specified as "localhost". Is the broker running on the same host as the producer？I guess not.&amp;nbsp;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; prop.setProperty(&lt;/SPAN&gt;&lt;SPAN&gt;"bootstrap.servers"&lt;/SPAN&gt;&lt;SPAN&gt;,&amp;nbsp;&lt;/SPAN&gt;&lt;SPAN&gt;"localhost:9092"&lt;/SPAN&gt;&lt;SPAN&gt;);&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;Please use IP address or hostname of the host where your Kafka broker is running and try again.&lt;/DIV&gt;</description>
      <pubDate>Fri, 01 Mar 2024 05:08:20 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Support-Questions/Kafka-broker-disconnecting-from-node-due-to-socket/m-p/384409#M245363</guid>
      <dc:creator>ywu</dc:creator>
      <dc:date>2024-03-01T05:08:20Z</dc:date>
    </item>
    <item>
      <title>Re: Disconnecting from node due to socket connection timeout</title>
      <link>https://community.cloudera.com/t5/Support-Questions/Kafka-broker-disconnecting-from-node-due-to-socket/m-p/384886#M245540</link>
      <description>&lt;P&gt;&lt;a href="https://community.cloudera.com/t5/user/viewprofilepage/user-id/109411"&gt;@Andy1989&lt;/a&gt;,&amp;nbsp;Did the response assist in resolving your query? If it did, kindly mark the relevant reply as the solution, as it will aid others in locating the answer more easily in the future.&amp;nbsp;&lt;/P&gt;</description>
      <pubDate>Tue, 12 Mar 2024 09:43:05 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Support-Questions/Kafka-broker-disconnecting-from-node-due-to-socket/m-p/384886#M245540</guid>
      <dc:creator>VidyaSargur</dc:creator>
      <dc:date>2024-03-12T09:43:05Z</dc:date>
    </item>
  </channel>
</rss>

