Support Questions
Find answers, ask questions, and share your expertise

Unable to send GenericRecord data from Kafka Producer in AVRO format

Unable to send GenericRecord data from Kafka Producer in AVRO format

Contributor

Using HDF 2.1.1 Confluent-oss-5.0.0-2.11
My Kafka Producer code is

public class AvroProducer {
 public static void main(String[] args) throws ExecutionException, InterruptedException {

        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("ZOOKEEPER_HOST", "localhost");
        //props.put("acks", "all");
        props.put("retries", 0);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
        props.put("schema.registry.url", "http://localhost:8081");
        String topic = "confluent-new";

        Schema.Parser parser = new Schema.Parser();
// I will get below schema string from SCHEMA REGISTRY
        Schema schema = parser.parse("{\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"userName\",\"type\":\"string\"},{\"name\":\"uID\",\"type\":\"string\"},{\"name\":\"company\",\"type\":\"string\",\"default\":\"ABC\"},{\"name\":\"age\",\"type\":\"int\",\"default\":0},{\"name\":\"location\",\"type\":\"string\",\"default\":\"Noida\"}]}");

        Producer<String, GenericRecord> producer = new KafkaProducer<String, GenericRecord>(props);
        GenericRecord record = new GenericData.Record(schema);
        record.put("uID", "06080000");
        record.put("userName", "User data10");
        record.put("company", "User data10");
        record.put("age", 12);
        record.put("location", "User data10");

        ProducerRecord<String, GenericRecord> recordData = new ProducerRecord<String, GenericRecord>(topic, "ip", record);
        producer.send(recordData);

        System.out.println("Message Sent");
    }

I am able to see Message Sent on the console.

Kafka Consumer code is:

publicclassAvroConsumer{publicstaticvoid main(String[] args)throwsExecutionException,InterruptedException{Properties props =newProperties();
    props.put("bootstrap.servers","localhost:9092");
    props.put("ZOOKEEPER_HOST","localhost");
    props.put("acks","all");
    props.put("retries",0);
    props.put("group.id","consumer1");
    props.put("auto.offset.reset","latest");
    props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer","io.confluent.kafka.serializers.KafkaAvroDeserializer");
    props.put("schema.registry.url","http://localhost:8081");String topic ="confluent-new";KafkaConsumer<String,GenericRecord> consumer =newKafkaConsumer<String,GenericRecord>(props);
    consumer.subscribe(Arrays.asList(topic));while(true){ConsumerRecords<String,GenericRecord> recs = consumer.poll(10000);for(ConsumerRecord<String,GenericRecord> rec : recs){System.out.printf("{AvroUtilsConsumerUser}: Recieved [key= %s, value= %s]\n", rec.key(), rec.value());}}}

I am unable to see message(data) on the Kafka consumer end. Also I checked the offset count/status for confluent_new topic and its not updating. Seems like Producer code is having some problem. Any pointer would be helpful.

Meanwhile below Producer code is working and here POJO i.e. User is avro-tools generated POJO.

publicclassAvroProducer{publicstaticvoid main(String[] args)throwsExecutionException,InterruptedException{Properties props =newProperties();
        kafkaParams.put("auto.offset.reset","smallest");
        kafkaParams.put("ZOOKEEPER_HOST","bihdp01");*/
        props.put("bootstrap.servers","localhost:9092");
        props.put("ZOOKEEPER_HOST","localhost");
        props.put("acks","all");
        props.put("retries",0);
        props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer","io.confluent.kafka.serializers.KafkaAvroSerializer");
        props.put("schema.registry.url","http://localhost:8081");String topic ="confluent-new";Producer<String,User> producer =newKafkaProducer<String,User>(props);User user =newUser();
        user.setUID("0908");
        user.setUserName("User data10");
        user.setCompany("HCL");
        user.setAge(20);
        user.setLocation("Noida");ProducerRecord<String,User> record =newProducerRecord<String,User>(topic,(String) user.getUID(), user);
        producer.send(record).get();System.out.println("Sent");}

P.S. My requirement is to send the received JSON data from source KAFKA topic to destination KAFKA topic in AVRO format. First I am infering AVRO schema from received JSON data using AVRO4S and registering the schema to SCHEMA REGISTRY. Next is to pull data from received JSON and populate in GenericRecord instance and send this GenericRecord instance to Kafka topic using KafkaAvroSerializer. At consumer end I will use KafkaAvroDeserializer to deserialize the received AVRO data.