Support Questions
Find answers, ask questions, and share your expertise

KafkaPublisher.send() does not publishes messages to Kafka even after publisher.flush() and publisher.close()

KafkaPublisher.send() does not publishes messages to Kafka even after publisher.flush() and publisher.close()

Super Guru

Hi

I am writing the most simplest of publishers. The publisher is on my laptop and cluster is remote. I have done telnet and everything works. Except I don't see my messages being published. The program starts by calling "readFile()" from a test main class. I am copying my code below:

public class Publisher {

    private static final String FILE_NAME = "/tmp/device-data.txt" ;
    private static final String TOPIC = "device" ;
    private static final String BOOTSTRAP_SERVERS = "<my host and port>" ;
    private static final String PRODUCER_GROUP = "DeviceDataProducer" ;

    private KafkaProducer<Long, String> producer = null ;

    public Publisher (){

        producer = new KafkaProducer<Long, String>(initializeKafkaConnection());

    }

    public void readFile(){

        BufferedReader bufferedReader = null ;

        try{
            bufferedReader = new BufferedReader(new FileReader(FILE_NAME));

            bufferedReader.lines().forEach(i -> publishToKafka(i));
        }
        catch(IOException exception){
           exception.printStackTrace();
        }

        finally{

            if (bufferedReader != null){
                try{
                    bufferedReader.close();
                }
                catch(IOException exception){
                    System.out.println();
                    exception.printStackTrace();
                }

            }

            producer.flush();
            producer.close();
        }

    }

    private void publishToKafka(String newEvent) {

        long timestamp = System.currentTimeMillis();

        ProducerRecord<Long, String> record = null ;

        record = new ProducerRecord<>(TOPIC, timestamp, newEvent);

        producer.send(record);

        System.out.println(newEvent);

    }

    private Properties initializeKafkaConnection(){

        Properties properties = new Properties();

        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        properties.put(ProducerConfig.CLIENT_ID_CONFIG, PRODUCER_GROUP);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        //properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 0);
        properties.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 0);
        //properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 0);

        return properties ;
    }
}
1 REPLY 1
Highlighted

Re: KafkaPublisher.send() does not publishes messages to Kafka even after publisher.flush() and publisher.close()

Rising Star

@mquershi please note that this API is asynchronous. Here's the method doc:

public java.util.concurrent.Future<RecordMetadata> send(ProducerRecord<K,V> record)

Asynchronously send a record to a topic. Equivalent to send(record, null). See send(ProducerRecord, Callback) for details.

You should call .get() after send to make sure the event's actually sent out.