The page Connecting Kafka clients to Data Hub provisioned clusters in the Cloudera documentation explains how to configure the Kafka Console Consumer to connect to a Kafka cluster in Data Hub.

In this article, I will expand on that explanation for a generic Java client and show how to configure the client to use Schema Registry for storing and retrieving schemas.

A full version of the code in this article can be found on GitHub.

Client initialization

The KafkaConsumer or KafkaProducer client is initialized simply by passing a Properties object to its constructor. The Properties object can be initialized programmatically or loaded from a properties file.

Example:

Properties props = new Properties();
props.put("bootstrap.servers", "kafka-1.example.com:9093");
// ... initialize other properties ...
KafkaConsumer<String, Transaction> consumer = new KafkaConsumer<>(props);

The initialization of a KafkaProducer is identical to the above.

This consumer reads a key of type String and a value of type Transaction from each Kafka message. Transaction is an Avro object, which can be transparently serialized and deserialized with the help of the KafkaAvroSerializer and KafkaAvroDeserializer, respectively, as we will see later in this article.
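
Here, Transaction stands for a class generated from an Avro schema. A minimal, hypothetical schema for such a class might look like the following (the namespace and fields are illustrative; the actual schema is in the GitHub repository):

{
  "namespace": "com.example.kafka",
  "type": "record",
  "name": "Transaction",
  "fields": [
    {"name": "id", "type": "string"},
    {"name": "amount", "type": "double"}
  ]
}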

Common properties

The properties listed in this section are common to both the KafkaConsumer and KafkaProducer.

The properties below specify the address of the cluster and the security settings used for authentication.

bootstrap.servers=<kafka-broker-fqdn>:9093
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
ssl.truststore.location=<TRUSTSTORE_PATH>
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<USERNAME>" password="<PASSWORD>";
  • The bootstrap.servers property can take a comma-separated list of broker addresses. Each address should use the broker's fully qualified domain name (FQDN) and include the port number.
  • If you are running the client on one of the nodes of your CDP Public Cloud cluster, the truststore path can be set to /var/lib/cloudera-scm-agent/agent-cert/cm-auto-global_truststore.jks. If you are running the client outside the cluster, copy the truststore from one of the cluster hosts to the machine where the client will run and specify its location in the configuration.
  • The username specified in the sasl.jaas.config property must be a CDP Public Cloud user, and the password is that user's Workload Password, which can be set in the CDP Console.
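
If you keep these settings in a properties file, they can be loaded directly into the Properties object passed to the client constructor. A minimal sketch, assuming a file named client.properties (a hypothetical name):

import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Properties;

// Load the client configuration from a properties file
Properties props = new Properties();
try (InputStream in = Files.newInputStream(Paths.get("client.properties"))) {
    props.load(in);
}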

In order to connect to Schema Registry, you must also set the properties below:

schema.registry.url=<SCHEMA_REGISTRY_ENDPOINT>
schema.registry.auth.username=<USERNAME>
schema.registry.auth.password=<PASSWORD>
  • If schema.registry.url is not set, the client will not try to connect to Schema Registry.
  • If it is set, the schema.registry.auth.username and schema.registry.auth.password properties must also be configured.
  • The Schema Registry endpoint can be found in the CDP Console, under the Endpoints tab of your Data Hub Kafka cluster.
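
Note that these Schema Registry settings go into the same configuration that is passed to the KafkaConsumer or KafkaProducer constructor; the Avro serializer and deserializer described below pick them up from there. The programmatic equivalent is simply:

// Schema Registry settings live alongside the Kafka client properties
props.put("schema.registry.url", "<SCHEMA_REGISTRY_ENDPOINT>");
props.put("schema.registry.auth.username", "<USERNAME>");
props.put("schema.registry.auth.password", "<PASSWORD>");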

Kafka-Avro Serializer and Deserializer

Cloudera provides Avro-specific serializer and deserializer classes for use with Kafka clients. They make it easy to write Avro objects to Kafka and read them back, since serialization and deserialization are handled transparently.

These classes are Schema Registry-aware. When configured with the Schema Registry service details, the serializer ensures that the schema used for serializing the messages sent to Kafka is registered in Schema Registry. It also adds the schema's Schema Registry identifier to the message payload, so that the consumer knows which schema version to fetch.

When the consumer reads those messages, the deserializer fetches the schema using the identifier embedded in the message and uses it to correctly deserialize the payload.

To handle Avro messages correctly using Schema Registry, set the value.serializer and value.deserializer properties of your clients to these classes, as shown below:

Producer properties:

key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=com.hortonworks.registries.schemaregistry.serdes.avro.kafka.KafkaAvroSerializer

Consumer properties:

key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=com.hortonworks.registries.schemaregistry.serdes.avro.kafka.KafkaAvroDeserializer
specific.avro.reader=<true OR false>
  • The specific.avro.reader property is optional (it defaults to false). It specifies whether the deserialized object should be an Avro GenericRecord (false) or an instance of the Avro-generated specific class (true), as illustrated below.
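
The practical difference is in the value type your code receives. A brief sketch of both options (assuming the Transaction class from earlier and a hypothetical id field):

// specific.avro.reader=true: values deserialize to the generated class
KafkaConsumer<String, Transaction> specificConsumer = new KafkaConsumer<>(props);

// specific.avro.reader=false: values deserialize to GenericRecord
// (org.apache.avro.generic.GenericRecord) and fields are accessed by name
KafkaConsumer<String, GenericRecord> genericConsumer = new KafkaConsumer<>(props);
// ... after polling:
// String id = record.value().get("id").toString();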

Other properties

Kafka consumers and producers accept many other properties, which are listed in the Apache Kafka documentation. Complete the configuration of your consumer and producer clients by setting the properties appropriate for your use case.

The KafkaConsumer, for example, requires at least one more property to be set:

group.id=<my-consumer-group-name>
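
For reference, a complete consumer configuration combining the properties discussed above might look like the following sketch (all values are placeholders; auto.offset.reset is just one example of the additional properties you may want to set):

bootstrap.servers=<kafka-broker-fqdn>:9093
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
ssl.truststore.location=<TRUSTSTORE_PATH>
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<USERNAME>" password="<PASSWORD>";
schema.registry.url=<SCHEMA_REGISTRY_ENDPOINT>
schema.registry.auth.username=<USERNAME>
schema.registry.auth.password=<PASSWORD>
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=com.hortonworks.registries.schemaregistry.serdes.avro.kafka.KafkaAvroDeserializer
specific.avro.reader=true
group.id=<my-consumer-group-name>
auto.offset.reset=earliest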

Producing Avro messages to Kafka

Once the clients have been configured as described above, producing data to a topic is as simple as instantiating an Avro object and using the producer to send it to Kafka:

Transaction transaction = new Transaction();
// set transaction properties here if required
ProducerRecord<String, Transaction> record = new ProducerRecord<>("my-topic", transaction);
producer.send(record, new ProducerCallback());
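
The ProducerCallback above is not defined in this article (the full implementation is in the GitHub repository); a minimal sketch could be:

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;

class ProducerCallback implements Callback {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception != null) {
            // The send failed; log or handle the error
            exception.printStackTrace();
        } else {
            System.out.printf("Message sent to partition %d at offset %d%n",
                    metadata.partition(), metadata.offset());
        }
    }
}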

Consuming Avro messages from Kafka

Consuming messages from Kafka is as simple as:

consumer.subscribe(Collections.singletonList("my-topic"));
while (true) {
    ConsumerRecords<String, Transaction> records = consumer.poll(Duration.ofMillis(1000));
    for (ConsumerRecord<String, Transaction> record : records) {
        Transaction transaction = record.value();
        System.out.println(transaction);
    }
}

Interacting with Schema Registry explicitly

Some applications may need to connect directly to Schema Registry to interact with it, for example, to retrieve or store a schema.

Cloudera also provides a Schema Registry API client for these cases. For example, to retrieve the latest version of a schema, you can use the following:

Map<String, Object> config = new HashMap<>();
config.put("schema.registry.url", "<SCHEMA_REGISTRY_ENDPOINT>");
config.put("schema.registry.auth.username", "<USERNAME>");
config.put("schema.registry.auth.password", "<PASSWORD>");

String topicName = "my-topic";
SchemaRegistryClient client = new SchemaRegistryClient(config);
try {
    SchemaVersionInfo latest = client.getLatestSchemaVersionInfo(topicName);
    System.out.println(latest.getSchemaText());
} catch (SchemaNotFoundException e) {
    LOG.info("Schema [{}] not found", topicName);
    throw e;
}
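
To store a schema instead, the same client offers registration methods. A sketch, assuming the SchemaMetadata.Builder and addSchemaVersion APIs of the Hortonworks registries client, with schemaText being a hypothetical variable holding the Avro schema JSON:

// Register metadata and a first schema version under the topic's name.
// addSchemaVersion throws checked exceptions (e.g. for invalid or
// incompatible schemas); handle or declare them as appropriate.
SchemaMetadata metadata = new SchemaMetadata.Builder(topicName)
        .type("avro")
        .schemaGroup("Kafka")
        .description("Transaction schema")
        .compatibility(SchemaCompatibility.BACKWARD)
        .build();
SchemaIdVersion version = client.addSchemaVersion(metadata,
        new SchemaVersion(schemaText, "Initial version"));
System.out.println("Registered schema version: " + version.getVersion());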