KafkaPublisher.send() does not publish messages to Kafka even after publisher.flush() and publisher.close()

Hi

I am writing the simplest possible publisher. The publisher runs on my laptop and the cluster is remote. I have checked connectivity with telnet and that works, but I don't see my messages being published. The program starts by calling "readFile()" from a test main class. My code is below:

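import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class Publisher {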

    private static final String FILE_NAME = "/tmp/device-data.txt" ;
    private static final String TOPIC = "device" ;
    private static final String BOOTSTRAP_SERVERS = "<my host and port>" ;
    private static final String PRODUCER_GROUP = "DeviceDataProducer" ;

    private KafkaProducer<Long, String> producer = null ;

    public Publisher (){

        producer = new KafkaProducer<Long, String>(initializeKafkaConnection());

    }

    public void readFile(){

        BufferedReader bufferedReader = null ;

        try{
            bufferedReader = new BufferedReader(new FileReader(FILE_NAME));

            bufferedReader.lines().forEach(i -> publishToKafka(i));
        }
        catch(IOException exception){
           exception.printStackTrace();
        }

        finally{

            if (bufferedReader != null){
                try{
                    bufferedReader.close();
                }
                catch(IOException exception){
                    System.out.println();
                    exception.printStackTrace();
                }

            }

            producer.flush();
            producer.close();
        }

    }

    private void publishToKafka(String newEvent) {

        long timestamp = System.currentTimeMillis();

        ProducerRecord<Long, String> record = null ;

        record = new ProducerRecord<>(TOPIC, timestamp, newEvent);

        producer.send(record);

        System.out.println(newEvent);

    }

    private Properties initializeKafkaConnection(){

        Properties properties = new Properties();

        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        properties.put(ProducerConfig.CLIENT_ID_CONFIG, PRODUCER_GROUP);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        //properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 0);
        properties.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 0);
        //properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 0);

        return properties ;
    }
}
1 REPLY

@mquershi please note that this API is asynchronous. Here's the method doc:

public java.util.concurrent.Future<RecordMetadata> send(ProducerRecord<K,V> record)

Asynchronously send a record to a topic. Equivalent to send(record, null). See send(ProducerRecord, Callback) for details.

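You should call .get() on the Future returned by send(). It blocks until the broker has acknowledged the record, and it throws if the send failed, so an error that send() would otherwise hide becomes visible.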
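To illustrate why an ignored Future can hide a failed send, here is a minimal sketch that uses only the JDK. It does not talk to Kafka: fakeSend is a hypothetical stand-in for producer.send() that always returns an already-failed Future, and the class and method names are made up for the example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class AsyncSendDemo {

    // Hypothetical stand-in for producer.send(): returns immediately with a
    // Future that has already failed, instead of throwing to the caller.
    static Future<String> fakeSend(String payload) {
        CompletableFuture<String> future = new CompletableFuture<>();
        future.completeExceptionally(
                new IllegalStateException("simulated send failure for: " + payload));
        return future;
    }

    // Fire-and-forget: the Future is dropped, so the failure is never observed.
    static String sendAndIgnore(String payload) {
        fakeSend(payload);
        return "no error seen";
    }

    // Blocking on get(): the failure comes back wrapped in an ExecutionException.
    static String sendAndWait(String payload) {
        try {
            return fakeSend(payload).get();
        } catch (ExecutionException e) {
            return "failed: " + e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        System.out.println(sendAndIgnore("event-1"));
        System.out.println(sendAndWait("event-1"));
    }
}
```

With the real producer, the equivalent would be wrapping producer.send(record).get() in the same try/catch, or using the send(ProducerRecord, Callback) overload mentioned in the method doc and checking the exception passed to the callback. One thing in the posted code that may be worth checking, offered as an assumption rather than a confirmed diagnosis: MAX_BLOCK_MS_CONFIG is set to 0, and max.block.ms limits how long send() may wait for topic metadata, so the first sends could be failing immediately. Calling get() should show whether that is the case.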