Member since
02-28-2024
2
Posts
2
Kudos Received
0
Solutions
02-29-2024
12:50 AM
1 Kudo
This is my producer code-

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

/**
 * Sends a single String-keyed, Integer-valued record to "OrderTopic" and prints
 * the partition and offset the broker assigned to it.
 */
public class new_producer {

    public static void main(String[] args) {
        Properties prop = new Properties();
        prop.setProperty("bootstrap.servers", "localhost:9092");
        prop.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Values are written as 4-byte integers; the consumer must use the matching IntegerDeserializer.
        prop.setProperty("value.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");

        ProducerRecord<String, Integer> record =
                new ProducerRecord<>("OrderTopic", "tddgfhj laptops", 15);
        //ProducerRecord<String, Integer> recordtwo = new ProducerRecord<String, Integer>("OrderTopic", "Dell desktops", 15);

        // try-with-resources closes the producer on every path, including a failed send.
        try (KafkaProducer<String, Integer> producer = new KafkaProducer<>(prop)) {
            // Synchronous call: get() blocks until the send completes or fails.
            RecordMetadata recordMetadata = producer.send(record).get();
            //RecordMetadata recordMetadatatwo = producer.send(recordtwo).get();
            System.out.println(recordMetadata.partition());
            System.out.println(recordMetadata.offset());
            //System.out.println(recordMetadatatwo.partition());
            //System.out.println(recordMetadatatwo.offset());
            System.err.println("Message sent");
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can observe it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

This is my consumer code-

import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

/**
 * Subscribes to "OrderTopic" and prints every record's key, value, partition and
 * offset until the JVM is asked to shut down.
 *
 * <p>Keys are read as Strings and values as Integers, mirroring the serializers
 * configured in {@code new_producer}.
 */
public class new_consumer {

    public static void main(String[] args) {
        System.out.println("I am a Kafka Consumer");

        String bootstrapServers = "localhost:9092";
        String groupId = "salesOrder-Consumers1";
        String topic = "OrderTopic";

        // create consumer configs
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // The producer writes values with IntegerSerializer, so they must be read back with
        // IntegerDeserializer; decoding those bytes with StringDeserializer prints garbage.
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // create consumer
        KafkaConsumer<String, Integer> consumer = new KafkaConsumer<>(properties);
        //System.out.println(consumer.position(tp));

        // get a reference to the current thread
        final Thread mainThread = Thread.currentThread();

        // adding the shutdown hook
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                System.out.println("Detected a shutdown, let's exit by calling consumer.wakeup()...");
                // wakeup() makes the blocked poll() in the main thread throw WakeupException.
                consumer.wakeup();

                // join the main thread to allow the execution of the code in the main thread
                try {
                    mainThread.join();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    e.printStackTrace();
                }
            }
        });

        try {
            // subscribe consumer to our topic(s)
            consumer.subscribe(Arrays.asList(topic));

            DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy/MM/dd");
            LocalDateTime now = LocalDateTime.now();
            System.out.println(dtf.format(now));

            while (true) {
                ConsumerRecords<String, Integer> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, Integer> record : records) {
                    //FW.write(record.value()+(","));
                    System.out.println("Key: " + record.key() + ", Value: " + record.value());
                    System.out.println("Partition: " + record.partition() + ", Offset:" + record.offset());
                }
            }
        } catch (WakeupException e) {
            System.out.println("Wake up exception!");
            // we ignore this as this is an expected exception when closing a consumer
        } catch (Exception e) {
            System.out.println("Unexpected exception " + e.getMessage());
            // Keep the full stack trace; the message alone often hides the root cause.
            e.printStackTrace();
        } finally {
            consumer.close(); // this will also commit the offsets if need be.
            System.out.println("The consumer is now gracefully closed.");
        }
    }
}

....can you please help me out?
... View more
02-28-2024
09:23 PM
1 Kudo
Hello, I am a beginner starting to learn Kafka and Cloudera. I am running a simple Java producer and am using Cloudera SMM through Docker Desktop. When I run my Java code, I keep getting this error: "Disconnecting from node 1001 due to socket connection setup timeout". There are no issues in my code or pom. I am using Java with Maven in VS Code. Please help me fix this issue.
... View more
Labels:
- Labels:
-
Apache Kafka