
Disconnecting from node due to socket connection timeout

New Contributor

Hello, I am a beginner starting to learn Kafka and Cloudera. I am running a simple Java producer and am using Cloudera SMM through Docker Desktop. When I run my Java code, I keep getting this error: "Disconnecting from node 1001 due to socket connection setup timeout". There are no issues in my code or pom. I am using Java with Maven in VSCode. Please help me fix this issue.

1 ACCEPTED SOLUTION

Expert Contributor

The Kafka broker address is specified as "localhost". Is the broker running on the same host as the producer? I guess not.

 

        prop.setProperty("bootstrap.servers""localhost:9092");
 
Please use the IP address or hostname of the host where your Kafka broker is running and try again.

View solution in original post

5 REPLIES

Community Manager

@Andy1989 Welcome to our community! To help you get the best possible answer, I have tagged our Kafka experts @Yuexin Zhang, @ywu, and @Babasaheb, who may be able to assist you further.

Please feel free to provide any additional information or details about your query, and we hope that you will find a satisfactory solution to your question.



Regards,

Vidya Sargur,
Community Manager



Expert Contributor

Hello @Andy1989 ,

"socket connection setup timeout" sounds like some network issue on client side. 

May I know you specify Kafka broker address and port in your code? 

Is the address solvable from client side and is the port number of Kafka broker correct?
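If it helps, here is a minimal sketch you could run from the client machine to check both: whether the broker hostname resolves and whether the port accepts TCP connections. The class name and the host/port values are placeholders of my own; replace them with your broker's actual address.

import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;

public class BrokerReachabilityCheck {
    public static void main(String[] args) throws Exception {
        String host = "localhost"; // placeholder: your Kafka broker host
        int port = 9092;           // placeholder: your Kafka broker port

        // Step 1: can the client resolve the broker hostname?
        InetAddress address = InetAddress.getByName(host);
        System.out.println(host + " resolves to " + address.getHostAddress());

        // Step 2: can the client open a TCP connection to the broker port within 5 seconds?
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(address, port), 5000);
            System.out.println("TCP connection to " + host + ":" + port + " succeeded");
        }
    }
}

If the hostname does not resolve or the connection times out here, the Kafka client will hit the same socket connection setup timeout.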

New Contributor

This is my producer code:

import java.util.Properties;


import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class new_producer {
    public static void main(String[] args) {
        Properties prop = new Properties();
        prop.setProperty("bootstrap.servers", "localhost:9092");
        prop.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        prop.setProperty("value.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");

        KafkaProducer<String, Integer> producer = new KafkaProducer<String, Integer>(prop);
        ProducerRecord<String, Integer> record = new ProducerRecord<String, Integer>("OrderTopic", "tddgfhj laptops", 15);
        //ProducerRecord<String, Integer> recordtwo = new ProducerRecord<String, Integer>("OrderTopic", "Dell desktops", 15);

        try {
            RecordMetadata recordMetadata = producer.send(record).get(); // synchronous call, waiting for the broker's acknowledgement
            //RecordMetadata recordMetadatatwo = producer.send(recordtwo).get();
            System.out.println(recordMetadata.partition());
            System.out.println(recordMetadata.offset());
            //System.out.println(recordMetadatatwo.partition());
            //System.out.println(recordMetadatatwo.offset());

            System.err.println("Message sent");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            producer.close();
        }
    }
}

This is my consumer code:

import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.errors.WakeupException;
public class new_consumer {
   public static void main(String[] args)
    {
        System.out.println("I am a Kafka Consumer");
        String bootstrapServers = "localhost:9092";
        String groupId = "salesOrder-Consumers1";
        String topic = "OrderTopic";
 
        // create consumer configs
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
       
       
 
        // create consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        //System.out.println(consumer.position(tp));
        // get a reference to the current thread
        final Thread mainThread = Thread.currentThread();
 
        // adding the shutdown hook
        Runtime.getRuntime().addShutdownHook(new Thread() {
            public void run() {
                System.out.println("Detected a shutdown, let's exit by calling consumer.wakeup()...");
                consumer.wakeup();
 
                // join the main thread to allow the execution of the code in the main thread
                try {
                    mainThread.join();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
       
        try {
 
            // subscribe consumer to our topic(s)
            consumer.subscribe(Arrays.asList(topic));
            DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy/MM/dd");  
            LocalDateTime now = LocalDateTime.now();  
            System.out.println(dtf.format(now));
           
            while (true) {
                ConsumerRecords<String, String> records =
                        consumer.poll(Duration.ofMillis(100));
 
                for (ConsumerRecord<String, String> record : records) {
                    //FW.write(record.value()+(","));
                    System.out.println("Key: " + record.key() + ", Value: " + record.value());
                    System.out.println("Partition: " + record.partition() + ", Offset:" + record.offset());
                }
            }
        }
       
        catch (WakeupException e) {
            System.out.println("Wake up exception!");
            // we ignore this as this is an expected exception when closing a consumer
        } catch (Exception e) {
            System.out.println("Unexpected exception " + e.getMessage());
        }
        finally {
            consumer.close(); // this will also commit the offsets if need be.
            System.out.println("The consumer is now gracefully closed.");
        }
    }
}

Can you please help me out?

Expert Contributor

The Kafka broker address is specified as "localhost". Is the broker running on the same host as the producer? I guess not.

 

        prop.setProperty("bootstrap.servers""localhost:9092");
 
Please use the IP address or hostname of the host where your Kafka broker is running and try again.
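For example, a minimal sketch of the corrected producer configuration; "kafka-broker-host" and the class name are placeholders of my own, and you would substitute the hostname or IP address at which your broker is actually reachable from the producer machine:

import java.util.Properties;

public class ProducerConfigSketch {
    public static void main(String[] args) {
        Properties prop = new Properties();
        // Placeholder address: replace "kafka-broker-host" with the hostname or IP
        // of the machine (or Docker host) where the Kafka broker is listening.
        prop.setProperty("bootstrap.servers", "kafka-broker-host:9092");
        prop.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        prop.setProperty("value.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
        System.out.println("bootstrap.servers = " + prop.getProperty("bootstrap.servers"));
    }
}

Since the broker here runs through Docker Desktop, it is also worth checking that the broker's advertised listener points to an address reachable from outside the container; otherwise the client will keep trying to connect to an address it cannot reach.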

Community Manager

@Andy1989, did the response assist in resolving your query? If it did, kindly mark the relevant reply as the solution, as it will aid others in locating the answer more easily in the future.



Regards,

Vidya Sargur,
Community Manager

