Kafka Java Consumer Client returning empty records

I am using the default Kafka service that comes with the HDP 2.4 sandbox, and Eclipse for the Java code that pushes messages to and pulls messages from the topic. I am able to put messages on the topic, but my consumer code is not returning any messages.

Here is the producer code:

    package com.hcsc.kafka.test.hcsckafka;

    import java.util.*;
    import java.io.IOException;
    import java.io.InputStream;

    import org.apache.kafka.clients.producer.*;

    public class HcscKafkaProducer {

        public static void main(String[] args) throws IOException {
            // TODO Auto-generated method stub
            KafkaProducer<String, String> producer;
            Properties props = new Properties();
            props.put("bootstrap.servers", "sandbox.hortonworks.com:6667");
            props.put("acks", "all");
            props.put("retries", 0);
            props.put("batch.size", 16384);
            props.put("linger.ms", 1);
            props.put("buffer.memory", 33554432);
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            producer = new KafkaProducer<String, String>(props);
            ProducerRecord<String,String> record = new ProducerRecord<>("testrole2", "0914msg1");
            try {
                producer.send(record).get();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                producer.close();
                System.exit(0);
            }
        }

    }

Here is my consumer code:

    package com.hcsc.kafka.test.hcsckafka;

    import java.util.Arrays;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class HcscKafkaConsumer {

        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "sandbox.hortonworks.com:6667");
            props.put("group.id", "0914consumer");
            props.put("enable.auto.commit", "true");
            props.put("auto.commit.interval.ms", "1000");
            props.put("auto.offset.reset", "earliest");
            props.put("session.timeout.ms", "30000");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            //props.put("security.protocol", "PLAINTEXTSASL");
            KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);
            kafkaConsumer.subscribe(Arrays.asList("testrole2"));
            while (true) {
                try {
                    ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
                    for (ConsumerRecord<String, String> record : records) {
                        System.out.println("Topic: " + record.topic()
                                + "Partition: " + record.partition()
                                + " Offset: " + record.offset()
                                + " Value: " + record.value()
                                + " ThreadID: " + Thread.currentThread().getId());
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    kafkaConsumer.close();
                    //System.exit(0);
                }
            }
        }

    }

1 ACCEPTED SOLUTION

I figured this out: by default, the Kafka consumer API reads only new messages from the topic, so I had to start my consumer Java program first and then start my producer Java program.

Thanks,

4 REPLIES

Master Guru

Do you have any logs?

Did both compile successfully? Are you running both?

You need to connect to ZooKeeper from the client.

      val kafkaConf = Map(
        "metadata.broker.list" -> "sandbox.hortonworks.com:6667",
        "zookeeper.connect" -> "sandbox.hortonworks.com:2181")

Make sure ports 2181, 6667, and 9092 are open.

https://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.4.0/bk_HDP_Reference_Guide/content/kafka-ports....

Were you able to use the built-in scripts for the console producer and consumer?

http://kafka.apache.org/07/quickstart.html

Which JDK are you using? Where are you running the code: on the sandbox or on your local machine?

It's best to build both projects and run through from the command line.

Also make sure the Kafka service is running in the sandbox; it may not be.

When running from Java, use: -Djava.net.preferIPv4Stack=true
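
One quick way to check the ports mentioned above from the client machine is to try a plain TCP connection to each. The following is a minimal sketch, not part of the original reply: the class name PortCheck, the helper isReachable, and the 2-second timeout are illustrative assumptions. A successful connection only shows that something is listening on the port; it does not prove that Kafka or ZooKeeper is answering correctly.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class PortCheck {

    // Returns true if a TCP connection to host:port succeeds within timeoutMs.
    static boolean isReachable(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Covers refused connections, timeouts, and unresolvable host names.
            return false;
        }
    }

    public static void main(String[] args) {
        // Host and ports are the ones named in this thread; pass another host as the first argument.
        String host = args.length > 0 ? args[0] : "sandbox.hortonworks.com";
        int[] ports = {2181, 6667, 9092};
        for (int port : ports) {
            System.out.println(host + ":" + port + " reachable: " + isReachable(host, port, 2000));
        }
    }
}
```

Run it on the same machine as the Eclipse project, so that it sees the same network path and port forwarding as the Java clients.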

What logs should I look for on the sandbox server?

Both programs compile successfully, and I am running both, i.e. the producer runs, publishes the message, and shuts down. The same goes for the consumer code.

My understanding is that with the new consumer API I need to send only the broker list. I know I am able to connect to sandbox.hortonworks.com:6667 because the producer code is able to publish the message.

Yes - I am able to run the built-in scripts for both producer and consumer. In fact, the consumer script is able to read the messages published by the producer Java client.

I am using JDK 1.8 - I am running on my local machine using the Eclipse IDE, connecting to the sandbox on my local machine.

Super Guru

@srivatsan chakravarti

You can also read all the messages that are still within the retention period for your topic. That way you don't have to run your producer while you test your consumer. You can consume whatever was produced and is still retained (usually 7 days by default) as many times as you want. You would have to use the low-level SimpleConsumer API to write Java code that emulates what you can do from the CLI with:

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
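
With the new consumer API used in the question (KafkaConsumer), a similar effect may be possible without SimpleConsumer. The auto.offset.reset setting applies only when the consumer group has no committed offset, so a group id that has never been used, combined with "earliest", should start from the oldest retained message. The sketch below only builds the configuration. The class name ReplayConsumerConfig, the "replay-" prefix, and disabling auto commit are illustrative assumptions, not something from this thread.

```java
import java.util.Properties;
import java.util.UUID;

public class ReplayConsumerConfig {

    // Builds consumer properties meant for re-reading a topic from its earliest retained offset.
    static Properties replayProps(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        // A fresh group id has no committed offsets, so auto.offset.reset decides where reading starts.
        props.put("group.id", "replay-" + UUID.randomUUID());
        props.put("auto.offset.reset", "earliest");
        // Assumption: skip offset commits, since each test run uses a throwaway group id anyway.
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        // Print the generated settings; the broker address is the one used in the question.
        replayProps("sandbox.hortonworks.com:6667")
                .forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

The returned Properties can be passed to new KafkaConsumer<>(props) in place of the hand-built properties in the question's consumer. The consumer also has to keep polling long enough to join the group and receive its partition assignment before any records come back.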