Support Questions
Find answers, ask questions, and share your expertise
Announcements
Alert: Welcome to the Unified Cloudera Community. Former HCC members be sure to read and learn how to activate your account here.

KafkaPublisher.send() does not publishes messages to Kafka even after publisher.flush() and publisher.close()

Highlighted

KafkaPublisher.send() does not publishes messages to Kafka even after publisher.flush() and publisher.close()

Super Guru

Hi

I am writing the most simplest of publishers. The publisher is on my laptop and cluster is remote. I have done telnet and everything works. Except I don't see my messages being published. The program starts by calling "readFile()" from a test main class. I am copying my code below:

public class Publisher {

    private static final String FILE_NAME = "/tmp/device-data.txt" ;
    private static final String TOPIC = "device" ;
    private static final String BOOTSTRAP_SERVERS = "<my host and port>" ;
    private static final String PRODUCER_GROUP = "DeviceDataProducer" ;

    private KafkaProducer<Long, String> producer = null ;

    public Publisher (){

        producer = new KafkaProducer<Long, String>(initializeKafkaConnection());

    }

    public void readFile(){

        BufferedReader bufferedReader = null ;

        try{
            bufferedReader = new BufferedReader(new FileReader(FILE_NAME));

            bufferedReader.lines().forEach(i -> publishToKafka(i));
        }
        catch(IOException exception){
           exception.printStackTrace();
        }

        finally{

            if (bufferedReader != null){
                try{
                    bufferedReader.close();
                }
                catch(IOException exception){
                    System.out.println();
                    exception.printStackTrace();
                }

            }

            producer.flush();
            producer.close();
        }

    }

    private void publishToKafka(String newEvent) {

        long timestamp = System.currentTimeMillis();

        ProducerRecord<Long, String> record = null ;

        record = new ProducerRecord<>(TOPIC, timestamp, newEvent);

        producer.send(record);

        System.out.println(newEvent);

    }

    private Properties initializeKafkaConnection(){

        Properties properties = new Properties();

        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        properties.put(ProducerConfig.CLIENT_ID_CONFIG, PRODUCER_GROUP);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        //properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 0);
        properties.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 0);
        //properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 0);

        return properties ;
    }
}
1 REPLY 1

Re: KafkaPublisher.send() does not publishes messages to Kafka even after publisher.flush() and publisher.close()

Rising Star

@mquershi please note that this API is asynchronous. Here's the method doc:

public java.util.concurrent.Future<RecordMetadata> send(ProducerRecord<K,V> record)

Asynchronously send a record to a topic. Equivalent to send(record, null). See send(ProducerRecord, Callback) for details.

You should call .get() after send to make sure the event's actually sent out.

Don't have an account?
Coming from Hortonworks? Activate your account here