How to add timestamp to message in kafka

Expert Contributor

Hi Team,

We are trying to add a timestamp to Kafka messages to get more detail on message travel time from the producer to landing. I wanted to check whether we can add any broker properties from Cloudera Manager for this.

 

Regards

Bharad

2 REPLIES

Rising Star

There is no Kafka property that adds a separate tracking timestamp to a message. However, each record in a topic already has a timestamp property. It is populated by the producer (the default, CreateTime) or by the broker (LogAppendTime), depending on how the topic's message.timestamp.type setting is configured (https://kafka.apache.org/25/javadoc/org/apache/kafka/clients/producer/ProducerRecord.html).

At the producer code level, you can add a header to the record to store a timestamp and then compare it at the consumer level. Here are some Java code samples that you might use:

Producer code level:

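// Needs KafkaProducer and ProducerRecord (org.apache.kafka.clients.producer),
// Header (org.apache.kafka.common.header), RecordHeader (org.apache.kafka.common.header.internals),
// ArrayList and List (java.util), and Longs (Guava, com.google.common.primitives)
// Create the producer
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
// Save the current timestamp (epoch milliseconds)
long time = System.currentTimeMillis();
// Create the header list and store the timestamp as 8 big-endian bytes
List<Header> headers = new ArrayList<>();
headers.add(new RecordHeader("producer_timestamp", Longs.toByteArray(time)));
// Build the record: topic, partition, timestamp, key, value, headers
ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topic, null, null, "message key", "Message value", headers);
// Send the record
producer.send(producerRecord);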

Note that the third argument of the ProducerRecord constructor is the timestamp. In this case, we pass null so that the Kafka producer API sets the record timestamp itself.

With this code, we capture two timestamps: a custom one stored as a header before the message is sent, and the record timestamp that the producer API assigns when the record is produced.
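Longs.toByteArray in the producer sample comes from Guava. If Guava is not on the classpath, a JDK-only alternative might look like the sketch below. It relies on ByteBuffer's default big-endian byte order, which is the same layout Longs.toByteArray produces and the same layout the byte-shifting loop on the consumer side expects. The class and method names (TimestampHeaderCodec, encode, decode) are hypothetical and only for illustration.

```java
import java.nio.ByteBuffer;

// Hypothetical helper; the class and method names are illustrative only.
class TimestampHeaderCodec {

    // Encode a long as 8 big-endian bytes (ByteBuffer's default byte order).
    static byte[] encode(long timestampMillis) {
        return ByteBuffer.allocate(Long.BYTES).putLong(timestampMillis).array();
    }

    // Decode 8 big-endian bytes back into a long.
    static long decode(byte[] headerValue) {
        if (headerValue == null || headerValue.length != Long.BYTES) {
            throw new IllegalArgumentException("Expected an 8-byte header value");
        }
        return ByteBuffer.wrap(headerValue).getLong();
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        byte[] headerValue = encode(now);
        System.out.println("Round trip ok: " + (decode(headerValue) == now));
    }
}
```

With a helper like this, the header could be created as new RecordHeader("producer_timestamp", TimestampHeaderCodec.encode(time)), and the consumer could call TimestampHeaderCodec.decode(header.value()) instead of shifting bytes by hand.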

 

On the consumer side, we need to read both timestamps:

ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
    log.info("Key: " + record.key() + ", value: " + record.value());
    long producerTimestamp = 0L;
    Headers consumedHeaders = record.headers();
    for (Header header : consumedHeaders) {
        log.info("Header key: " + header.key());
        // Binary header values (such as the timestamp) are not readable as text
        log.info("Header value as string: " + new String(header.value()));
        if (header.key().equals("producer_timestamp")) {
            for (byte b : header.value()) {
                // Shift the accumulated value 8 bits to the left and
                // add the next byte, masked to its unsigned value
                producerTimestamp = (producerTimestamp << 8) + (b & 255);
            }
            log.info("Record timestamp: " + record.timestamp() + ", Producer timestamp: " + producerTimestamp + ", Consumer timestamp: " + System.currentTimeMillis());
        }
    }
}

The inner loop transforms the header value from byte[] to long, and the consumer then prints all three timestamps:

 

log.info("Record timestamp: " + record.timestamp() + ", Producer timestamp: " + producerTimestamp + ", Consumer timestamp: " + System.currentTimeMillis());

The output shows three different timestamps: the record timestamp, which was added by the Kafka producer API; the producer timestamp, stored by the producer as a header before sending the message; and the consumer timestamp, captured on the consumer side after reading the record.

Let us know if this example helps to answer your question.
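To turn the three timestamps into travel times, the differences can be computed directly. The following is a minimal sketch, assuming all three values are epoch milliseconds and that the clocks on the producer, broker, and consumer hosts are synchronized (for example with NTP); with clock skew the differences can be misleading or even negative. The TravelTime class and its field names are hypothetical. Also keep in mind that the record timestamp is set by the producer client when the topic uses CreateTime and by the broker when it uses LogAppendTime, so the meaning of the intermediate hop depends on the topic configuration.

```java
// Hypothetical helper; names are illustrative and not part of the Kafka API.
class TravelTime {
    final long captureToRecordMillis;   // header timestamp -> record timestamp
    final long recordToConsumerMillis;  // record timestamp -> consumer read
    final long endToEndMillis;          // header timestamp -> consumer read

    TravelTime(long producerTimestamp, long recordTimestamp, long consumerTimestamp) {
        this.captureToRecordMillis = recordTimestamp - producerTimestamp;
        this.recordToConsumerMillis = consumerTimestamp - recordTimestamp;
        this.endToEndMillis = consumerTimestamp - producerTimestamp;
    }

    @Override
    public String toString() {
        return "capture->record: " + captureToRecordMillis + " ms, "
                + "record->consumer: " + recordToConsumerMillis + " ms, "
                + "end-to-end: " + endToEndMillis + " ms";
    }

    public static void main(String[] args) {
        // Sample values standing in for producerTimestamp, record.timestamp(),
        // and System.currentTimeMillis() on the consumer.
        TravelTime sample = new TravelTime(1000L, 1005L, 1042L);
        System.out.println(sample);
    }
}
```

In the consumer loop, this could be used as new TravelTime(producerTimestamp, record.timestamp(), System.currentTimeMillis()) right after the header has been decoded.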

Community Manager

@bhara Has the reply helped resolve your issue? If so, please mark the appropriate reply as the solution, as it will make it easier for others to find the answer in the future. Thanks.


Regards,

Diana Torres,
Community Moderator

