The page Connecting Kafka clients to Data Hub provisioned clusters in the Cloudera documentation explains how to configure the Kafka Console Consumer to connect to a Kafka cluster in Data Hub.
In this article, I expand on that explanation for a generic Java client and show how to configure the client to use Schema Registry for the storage and retrieval of schemas.
A full version of the code in this article can be found on GitHub.
The initialization of the KafkaConsumer or KafkaProducer client is done simply by passing a Properties object to its constructor. The Properties object can be either initialized programmatically or loaded from a properties file.
Example:
Properties props = new Properties();
props.put("bootstrap.servers", "kafka-1.example.com:9093");
// ... initialize other properties ...
KafkaConsumer<String, Transaction> consumer = new KafkaConsumer<>(props);
The initialization of a KafkaProducer is identical to the above.
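As mentioned above, the same Properties object can also be loaded from a properties file instead of being populated in code. Here is a minimal sketch, assuming the settings are saved in a file named consumer.properties (a hypothetical name):
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Properties;

Properties props = new Properties();
try (InputStream input = Files.newInputStream(Paths.get("consumer.properties"))) {
    props.load(input);  // reads bootstrap.servers, security and serde settings from the file
}
KafkaConsumer<String, Transaction> consumer = new KafkaConsumer<>(props);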
This consumer reads a key of type String and a value of type Transaction from each Kafka message. Transaction is an Avro object, which can be transparently serialized and deserialized with the help of the KafkaAvroSerializer and KafkaAvroDeserializer, respectively, as we will see later in this article.
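For illustration, Transaction could be a class generated from an Avro schema along the following lines. The fields and namespace shown here are hypothetical; the actual schema is in the GitHub repository:
{
  "type": "record",
  "name": "Transaction",
  "namespace": "com.cloudera.example",
  "fields": [
    { "name": "transaction_id", "type": "string" },
    { "name": "amount", "type": "double" }
  ]
}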
The properties listed in this section are common to both the KafkaConsumer and KafkaProducer.
The properties below specify the address of the cluster and the security settings used for authentication.
bootstrap.servers=<kafka-broker-fqdn>:9093
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
ssl.truststore.location=<TRUSTSTORE_PATH>
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<USERNAME>" password="<PASSWORD>";
To connect to Schema Registry, you must also set the following properties:
schema.registry.url=<SCHEMA_REGISTRY_ENDPOINT>
schema.registry.auth.username=<USERNAME>
schema.registry.auth.password=<PASSWORD>
Cloudera provides Avro-specific serializer and deserializer classes for use with Kafka clients. These make it straightforward to write Avro objects to Kafka and read them back later, since serialization and deserialization are handled transparently.
These classes are Schema Registry-aware. When configured with the Schema Registry service details, the serializer ensures that the schema used for serializing the messages sent to Kafka is registered in Schema Registry. It also adds the schema's Schema Registry identifier to the message payload, so that the consumer knows which schema version to fetch.
When the consumer reads those messages, the deserializer will fetch that schema using the identifier from the message and will use the schema to correctly deserialize the payload.
To handle Avro messages correctly using Schema Registry, you must set the serializer and deserializer properties of your clients to use these classes, as shown below:
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=com.hortonworks.registries.schemaregistry.serdes.avro.kafka.KafkaAvroSerializer
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=com.hortonworks.registries.schemaregistry.serdes.avro.kafka.KafkaAvroDeserializer
specific.avro.reader=<true OR false>
Set specific.avro.reader to true if the consumer should deserialize message values into generated Avro classes (Transaction, in this example); with false, values are deserialized into generic Avro records.
Kafka consumers and producers accept many other properties, as listed in the Apache Kafka documentation. You should complete the configuration of your consumer and producer clients by setting the properties appropriate for your use case.
The KafkaConsumer, for example, requires at least one more property to be set:
group.id=<my-consumer-group-name>
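Putting the pieces together, a complete consumer configuration file might look like the following sketch, which consolidates the settings discussed above (replace the placeholders with the values for your environment):
bootstrap.servers=<kafka-broker-fqdn>:9093
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
ssl.truststore.location=<TRUSTSTORE_PATH>
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<USERNAME>" password="<PASSWORD>";
schema.registry.url=<SCHEMA_REGISTRY_ENDPOINT>
schema.registry.auth.username=<USERNAME>
schema.registry.auth.password=<PASSWORD>
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=com.hortonworks.registries.schemaregistry.serdes.avro.kafka.KafkaAvroDeserializer
specific.avro.reader=true
group.id=<my-consumer-group-name>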
Once the clients have been configured as described above, producing data to a topic is as simple as instantiating an Avro object and using the producer to send it to Kafka:
Transaction transaction = new Transaction();
// set transaction properties here if required
ProducerRecord<String, Transaction> record = new ProducerRecord<>("my-topic", transaction);
producer.send(record, new ProducerCallback());
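The ProducerCallback used above is not a Kafka class; it is an application-provided implementation of Kafka's org.apache.kafka.clients.producer.Callback interface, which is invoked when the send completes or fails. A minimal sketch could look like this:
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;

public class ProducerCallback implements Callback {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception != null) {
            // The send failed; log the error (or retry/alert, as appropriate)
            exception.printStackTrace();
        } else {
            // The send succeeded; metadata carries the topic, partition and offset
            System.out.printf("Record sent to %s-%d at offset %d%n",
                    metadata.topic(), metadata.partition(), metadata.offset());
        }
    }
}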
Consuming messages from Kafka is as simple as:
consumer.subscribe(Collections.singletonList("my-topic"));
while (true) {
    ConsumerRecords<String, Transaction> records = consumer.poll(Duration.ofMillis(1000));
    for (ConsumerRecord<String, Transaction> record : records) {
        Transaction transaction = record.value();
        System.out.println(transaction);
    }
}
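The infinite loop above is only for illustration. In a real application, you should close the consumer on shutdown so that it leaves the consumer group cleanly. One possible pattern, using a hypothetical keepRunning flag cleared by a shutdown hook:
try {
    while (keepRunning) {  // keepRunning is a hypothetical flag, not part of the Kafka API
        ConsumerRecords<String, Transaction> records = consumer.poll(Duration.ofMillis(1000));
        for (ConsumerRecord<String, Transaction> record : records) {
            System.out.println(record.value());
        }
    }
} finally {
    // Commits offsets (when auto-commit is enabled) and leaves the group cleanly
    consumer.close();
}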
Some applications may need to connect directly to Schema Registry to interact with it, for example, to retrieve or store a schema.
Cloudera also provides a Schema Registry API client for these cases. For example, to retrieve the latest version of a schema, you can use the following:
Map<String, Object> config = new HashMap<>();
config.put("schema.registry.url", "<SCHEMA_REGISTRY_ENDPOINT>");
config.put("schema.registry.auth.username", "<USERNAME>");
config.put("schema.registry.auth.password", "<PASSWORD>");

String topicName = "my-topic";
SchemaRegistryClient client = new SchemaRegistryClient(config);
try {
    SchemaVersionInfo latest = client.getLatestSchemaVersionInfo(topicName);
    System.out.println(latest.getSchemaText());
} catch (SchemaNotFoundException e) {
    LOG.info("Schema [{}] not found", topicName);
    throw e;
}
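Storing a schema uses the same client. The sketch below follows the com.hortonworks.registries.schemaregistry API; the builder and method names come from that library, but check the signatures against the client version you use (the calls also throw checked exceptions, such as IncompatibleSchemaException, that the application must handle). The schema text placeholder is hypothetical:
SchemaMetadata metadata = new SchemaMetadata.Builder(topicName)
        .type("avro")                      // schema type
        .schemaGroup("Kafka")              // logical group for Kafka topic schemas
        .description("Transaction schema")
        .build();

String schemaText = "<AVRO_SCHEMA_JSON>";  // the Avro schema definition
SchemaIdVersion version = client.addSchemaVersion(metadata,
        new SchemaVersion(schemaText, "Initial version"));
System.out.println("Registered: " + version);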