Member since: 09-14-2016
Posts: 5
Kudos Received: 4
Solutions: 1
My Accepted Solutions
Title | Views | Posted |
---|---|---|
 | 8123 | 09-15-2016 06:58 PM |
09-15-2016
06:58 PM
2 Kudos
I figured this out. By default, the Kafka consumer API reads only new messages from the topic, so I had to start my consumer Java program first and then start my producer Java program. Thanks.
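For anyone who wants the consumer to pick up messages that were published before it started, here is a minimal sketch of the settings involved. It assumes the standard consumer property `auto.offset.reset`, which only takes effect when the consumer group has no committed offset yet, so the sketch also generates a fresh `group.id`. The class and method names (`ReplayConsumerConfig`, `replayFromStart`, `freshGroupId`) are made up for illustration, and I have not verified this against the sandbox.

```java
import java.util.Properties;
import java.util.UUID;

// Hypothetical helper, not part of the Kafka client library.
final class ReplayConsumerConfig {

    // Builds consumer settings that start from the oldest available message
    // when the group has no committed offset.
    public static Properties replayFromStart(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        // "latest" is the default; "earliest" applies only if this group
        // has never committed an offset for the partition.
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    // Returns a group id that has not been used before, so no committed
    // offsets exist for it (assumption: the prefix is only for readability).
    public static String freshGroupId(String prefix) {
        return prefix + "-" + UUID.randomUUID();
    }

    public static void main(String[] args) {
        Properties props = replayFromStart(
                "sandbox.hortonworks.com:6667", freshGroupId("0914consumer"));
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

The resulting `Properties` object would be passed to `new KafkaConsumer<>(props)` in place of the hand-built one. Reusing an old group id that already has committed offsets would resume from those offsets instead of the beginning.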
09-15-2016
01:45 PM
What logs should I look for on the sandbox server? Both programs compile successfully and both run: the producer publishes its message and shuts down, and the same goes for the consumer code. My understanding is that with the new consumer API I only need to pass the broker list. I know I can connect to sandbox.hortonworks.com:6667 because the producer code is able to publish the message. Yes, I have run the built-in scripts for both producer and consumer. In fact, the consumer script is able to read the messages published by the producer Java client. I am using JDK 1.8, running on my local machine from the Eclipse IDE and connecting to the sandbox on my local machine.
09-14-2016
10:08 PM
2 Kudos
I am using the default Kafka service that comes with the HDP 2.4 sandbox, and Eclipse for the Java code that pushes messages to and pulls messages from the topic. I am able to put messages on the topic, but my consumer code is not returning any messages. Here is the producer code:

package com.hcsc.kafka.test.hcsckafka;

import java.util.*;
import java.io.IOException;
import java.io.InputStream;
import org.apache.kafka.clients.producer.*;

public class HcscKafkaProducer {
    public static void main(String[] args) throws IOException {
        // TODO Auto-generated method stub
        KafkaProducer<String, String> producer;
        Properties props = new Properties();
        props.put("bootstrap.servers", "sandbox.hortonworks.com:6667");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producer = new KafkaProducer<String, String>(props);
        ProducerRecord<String,String> record = new ProducerRecord<>("testrole2", "0914msg1");
        try {
            // Block until the send has completed
            producer.send(record).get();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            producer.close();
            System.exit(0);
        }
    }
}

Here is my consumer code:

package com.hcsc.kafka.test.hcsckafka;

import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class HcscKafkaConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "sandbox.hortonworks.com:6667");
        props.put("group.id", "0914consumer");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("auto.offset.reset", "earliest");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        //props.put("security.protocol", "PLAINTEXTSASL");
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);
        kafkaConsumer.subscribe(Arrays.asList("testrole2"));
        try {
            while (true) {
                ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("Topic: " + record.topic() + " Partition: " + record.partition() + " Offset: " + record.offset()
                            + " Value: " + record.value() + " ThreadID: " + Thread.currentThread().getId());
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Close the consumer once, after the poll loop exits.
            // Closing it inside the loop would make every later poll() fail.
            kafkaConsumer.close();
            //System.exit(0);
        }
    }
}
Labels:
- Apache Kafka