<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>question Re: How to add timestamp to message in kafka in Support Questions</title>
    <link>https://community.cloudera.com/t5/Support-Questions/How-to-add-timestamp-to-message-in-kafka/m-p/383111#M244825</link>
    <description>&lt;P&gt;There is no Kafka property that you can add to track the timestamp of the message. Each record in a topic has a creation timestamp property, this property can be populated by the producer (by default) or by the broker, depending on how the topic is configured (&lt;A href="https://kafka.apache.org/25/javadoc/org/apache/kafka/clients/producer/ProducerRecord.html" target="_blank"&gt;https://kafka.apache.org/25/javadoc/org/apache/kafka/clients/producer/ProducerRecord.html&lt;/A&gt;)&lt;/P&gt;&lt;P&gt;At the producer code level,&amp;nbsp; you can add headers to the record to store a timestamp, then compare this time at the consumer level, here are some Java code samples that you might use:&lt;/P&gt;&lt;P&gt;Producer code level:&lt;/P&gt;&lt;PRE&gt;&lt;SPAN&gt;// Create producer&lt;BR /&gt;&lt;/SPAN&gt;KafkaProducer&amp;lt;String&lt;SPAN&gt;, &lt;/SPAN&gt;String&amp;gt; producer = &lt;SPAN&gt;new &lt;/SPAN&gt;KafkaProducer&amp;lt;String&lt;SPAN&gt;, &lt;/SPAN&gt;String&amp;gt;(properties)&lt;SPAN&gt;;&lt;/SPAN&gt;&lt;BR /&gt;//Save the current timstamp&lt;BR /&gt;&lt;SPAN&gt;long &lt;/SPAN&gt;time = System.&lt;SPAN&gt;currentTimeMillis&lt;/SPAN&gt;()&lt;SPAN&gt;;&lt;BR /&gt;&lt;/SPAN&gt;&lt;SPAN&gt;//Create header list&lt;BR /&gt;&lt;/SPAN&gt;List&amp;lt;Header&amp;gt; headers = &lt;SPAN&gt;new &lt;/SPAN&gt;ArrayList&amp;lt;&amp;gt;()&lt;SPAN&gt;;&lt;BR /&gt;&lt;/SPAN&gt;headers.add(&lt;SPAN&gt;new &lt;/SPAN&gt;RecordHeader(&lt;SPAN&gt;"producer_timestamp"&lt;/SPAN&gt;&lt;SPAN&gt;, &lt;/SPAN&gt;Longs.&lt;SPAN&gt;toByteArray&lt;/SPAN&gt;(time)))&lt;SPAN&gt;;&lt;BR /&gt;//produce the record &lt;BR /&gt;&lt;/SPAN&gt;ProducerRecord&amp;lt;String&lt;SPAN&gt;,&lt;/SPAN&gt;String&amp;gt; producerRecord = &lt;SPAN&gt;new &lt;/SPAN&gt;ProducerRecord&amp;lt;&amp;gt;(topic&lt;SPAN&gt;,null, null,&lt;/SPAN&gt;&lt;SPAN&gt;"message key"&lt;/SPAN&gt;&lt;SPAN&gt;,&lt;/SPAN&gt;&lt;SPAN&gt;"Message value"&lt;/SPAN&gt;&lt;SPAN&gt;, &lt;/SPAN&gt;headers)&lt;SPAN&gt;;&lt;BR /&gt;&lt;/SPAN&gt;&lt;/PRE&gt;&lt;P&gt;Note that the 3rd argument to produce the Record is the timestamp, in this case, we are sending null to allow the Kafka producer API to add the timestamp in the record.&lt;/P&gt;&lt;P&gt;With this code, we captured one custom timestamp as a header, before sending the message and the producer timestamp when the record was produced&amp;nbsp;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;On the consumer side, we need to read both timestamps:&lt;/P&gt;&lt;PRE&gt;ConsumerRecords&amp;lt;String&lt;SPAN&gt;, &lt;/SPAN&gt;String&amp;gt; records = consumer.poll(Duration.&lt;SPAN&gt;ofMillis&lt;/SPAN&gt;(&lt;SPAN&gt;1000&lt;/SPAN&gt;))&lt;SPAN&gt;;&lt;BR /&gt;&lt;/SPAN&gt;&lt;SPAN&gt;for &lt;/SPAN&gt;(ConsumerRecord&amp;lt;String&lt;SPAN&gt;, &lt;/SPAN&gt;String&amp;gt; record : records){&lt;BR /&gt;    &lt;SPAN&gt;log&lt;/SPAN&gt;.info(&lt;SPAN&gt;"Key: " &lt;/SPAN&gt;+ record.key() + &lt;SPAN&gt;", value: " &lt;/SPAN&gt;+ record.value())&lt;SPAN&gt;;&lt;BR /&gt;&lt;/SPAN&gt;&lt;SPAN&gt;    long &lt;/SPAN&gt;producerTimestamp = &lt;SPAN&gt;0L&lt;/SPAN&gt;&lt;SPAN&gt;;&lt;BR /&gt;&lt;/SPAN&gt;    Headers consumedHeaders = record.headers()&lt;SPAN&gt;;&lt;BR /&gt;&lt;/SPAN&gt;&lt;SPAN&gt;    for &lt;/SPAN&gt;(Header header : consumedHeaders) {&lt;BR /&gt;        &lt;SPAN&gt;log&lt;/SPAN&gt;.info(&lt;SPAN&gt;"Header Key" &lt;/SPAN&gt;+ header.key())&lt;SPAN&gt;;&lt;BR /&gt;&lt;/SPAN&gt;        &lt;SPAN&gt;log&lt;/SPAN&gt;.info(&lt;SPAN&gt;"Header Value String" &lt;/SPAN&gt;+ &lt;SPAN&gt;new &lt;/SPAN&gt;String(header.value()))&lt;SPAN&gt;;&lt;BR /&gt;&lt;/SPAN&gt;&lt;SPAN&gt;        if &lt;/SPAN&gt;(header.key().equals(&lt;SPAN&gt;"producer_timestamp"&lt;/SPAN&gt;)) {&lt;BR /&gt;            &lt;SPAN&gt;for &lt;/SPAN&gt;(&lt;SPAN&gt;byte &lt;/SPAN&gt;b : header.value()) {&lt;BR /&gt;                &lt;SPAN&gt;// Shifting previous value 8 bits to right and&lt;BR /&gt;&lt;/SPAN&gt;&lt;SPAN&gt;                // add it with next value&lt;BR /&gt;&lt;/SPAN&gt;                producerTimestamp = (producerTimestamp &amp;lt;&amp;lt; &lt;SPAN&gt;8&lt;/SPAN&gt;) + (b &amp;amp; &lt;SPAN&gt;255&lt;/SPAN&gt;)&lt;SPAN&gt;;&lt;BR /&gt;&lt;/SPAN&gt;            }&lt;BR /&gt;            &lt;SPAN&gt;log&lt;/SPAN&gt;.info(&lt;SPAN&gt;"Record timestamp: " &lt;/SPAN&gt;+ record.timestamp() + &lt;SPAN&gt;", Producer timestamp: " &lt;/SPAN&gt;+ producerTimestamp + &lt;SPAN&gt;", Consumer timestamp: " &lt;/SPAN&gt;+ System.&lt;SPAN&gt;currentTimeMillis&lt;/SPAN&gt;())&lt;SPAN&gt;;&lt;BR /&gt;&lt;/SPAN&gt;        }&lt;BR /&gt;    }&lt;/PRE&gt;&lt;P&gt;We need to transform the header from bytes[] to long and then print the content:&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;log&lt;/SPAN&gt;.info(&lt;SPAN&gt;"Record timestamp: " &lt;/SPAN&gt;+ record.timestamp() + &lt;SPAN&gt;", Producer timestamp: " &lt;/SPAN&gt;+ producerTimestamp + &lt;SPAN&gt;", Consumer timestamp: " &lt;/SPAN&gt;+ System.&lt;SPAN&gt;currentTimeMillis&lt;/SPAN&gt;())&lt;SPAN&gt;;&lt;BR /&gt;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;We can see 3 different timestamps in the output, the record timestamp, which was added by the Kafka producer API, the producer timestamp, stored by the producer as a header before sending the message, and the consumer timestamp captured at the consumer side after reading the record.&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;Let us know if this example helps too answer your questions&lt;/SPAN&gt;&lt;/P&gt;</description>
    <pubDate>Sun, 04 Feb 2024 17:32:35 GMT</pubDate>
    <dc:creator>gtorres</dc:creator>
    <dc:date>2024-02-04T17:32:35Z</dc:date>
    <item>
      <title>How to add timestamp to message in kafka</title>
      <link>https://community.cloudera.com/t5/Support-Questions/How-to-add-timestamp-to-message-in-kafka/m-p/382836#M244713</link>
      <description>&lt;P&gt;Hi Team ,&lt;/P&gt;&lt;P&gt;We are trying to add timestamp to Kafka message to get more details of message travel time from producer to landing &amp;nbsp; . I wanted to check if we can add any broker properties&amp;nbsp; from Cloudera manager&amp;nbsp; .&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;Regards&lt;/P&gt;&lt;P&gt;Bharad&lt;/P&gt;</description>
      <pubDate>Tue, 21 Apr 2026 06:43:28 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Support-Questions/How-to-add-timestamp-to-message-in-kafka/m-p/382836#M244713</guid>
      <dc:creator>bhara</dc:creator>
      <dc:date>2026-04-21T06:43:28Z</dc:date>
    </item>
    <item>
      <title>Re: How to add timestamp to message in kafka</title>
      <link>https://community.cloudera.com/t5/Support-Questions/How-to-add-timestamp-to-message-in-kafka/m-p/383111#M244825</link>
      <description>&lt;P&gt;There is no Kafka property that you can add to track the timestamp of the message. Each record in a topic has a creation timestamp property, this property can be populated by the producer (by default) or by the broker, depending on how the topic is configured (&lt;A href="https://kafka.apache.org/25/javadoc/org/apache/kafka/clients/producer/ProducerRecord.html" target="_blank"&gt;https://kafka.apache.org/25/javadoc/org/apache/kafka/clients/producer/ProducerRecord.html&lt;/A&gt;)&lt;/P&gt;&lt;P&gt;At the producer code level,&amp;nbsp; you can add headers to the record to store a timestamp, then compare this time at the consumer level, here are some Java code samples that you might use:&lt;/P&gt;&lt;P&gt;Producer code level:&lt;/P&gt;&lt;PRE&gt;&lt;SPAN&gt;// Create producer&lt;BR /&gt;&lt;/SPAN&gt;KafkaProducer&amp;lt;String&lt;SPAN&gt;, &lt;/SPAN&gt;String&amp;gt; producer = &lt;SPAN&gt;new &lt;/SPAN&gt;KafkaProducer&amp;lt;String&lt;SPAN&gt;, &lt;/SPAN&gt;String&amp;gt;(properties)&lt;SPAN&gt;;&lt;/SPAN&gt;&lt;BR /&gt;//Save the current timstamp&lt;BR /&gt;&lt;SPAN&gt;long &lt;/SPAN&gt;time = System.&lt;SPAN&gt;currentTimeMillis&lt;/SPAN&gt;()&lt;SPAN&gt;;&lt;BR /&gt;&lt;/SPAN&gt;&lt;SPAN&gt;//Create header list&lt;BR /&gt;&lt;/SPAN&gt;List&amp;lt;Header&amp;gt; headers = &lt;SPAN&gt;new &lt;/SPAN&gt;ArrayList&amp;lt;&amp;gt;()&lt;SPAN&gt;;&lt;BR /&gt;&lt;/SPAN&gt;headers.add(&lt;SPAN&gt;new &lt;/SPAN&gt;RecordHeader(&lt;SPAN&gt;"producer_timestamp"&lt;/SPAN&gt;&lt;SPAN&gt;, &lt;/SPAN&gt;Longs.&lt;SPAN&gt;toByteArray&lt;/SPAN&gt;(time)))&lt;SPAN&gt;;&lt;BR /&gt;//produce the record &lt;BR /&gt;&lt;/SPAN&gt;ProducerRecord&amp;lt;String&lt;SPAN&gt;,&lt;/SPAN&gt;String&amp;gt; producerRecord = &lt;SPAN&gt;new &lt;/SPAN&gt;ProducerRecord&amp;lt;&amp;gt;(topic&lt;SPAN&gt;,null, null,&lt;/SPAN&gt;&lt;SPAN&gt;"message key"&lt;/SPAN&gt;&lt;SPAN&gt;,&lt;/SPAN&gt;&lt;SPAN&gt;"Message value"&lt;/SPAN&gt;&lt;SPAN&gt;, &lt;/SPAN&gt;headers)&lt;SPAN&gt;;&lt;BR /&gt;&lt;/SPAN&gt;&lt;/PRE&gt;&lt;P&gt;Note that the 3rd argument to produce the Record is the timestamp, in this case, we are sending null to allow the Kafka producer API to add the timestamp in the record.&lt;/P&gt;&lt;P&gt;With this code, we captured one custom timestamp as a header, before sending the message and the producer timestamp when the record was produced&amp;nbsp;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;On the consumer side, we need to read both timestamps:&lt;/P&gt;&lt;PRE&gt;ConsumerRecords&amp;lt;String&lt;SPAN&gt;, &lt;/SPAN&gt;String&amp;gt; records = consumer.poll(Duration.&lt;SPAN&gt;ofMillis&lt;/SPAN&gt;(&lt;SPAN&gt;1000&lt;/SPAN&gt;))&lt;SPAN&gt;;&lt;BR /&gt;&lt;/SPAN&gt;&lt;SPAN&gt;for &lt;/SPAN&gt;(ConsumerRecord&amp;lt;String&lt;SPAN&gt;, &lt;/SPAN&gt;String&amp;gt; record : records){&lt;BR /&gt;    &lt;SPAN&gt;log&lt;/SPAN&gt;.info(&lt;SPAN&gt;"Key: " &lt;/SPAN&gt;+ record.key() + &lt;SPAN&gt;", value: " &lt;/SPAN&gt;+ record.value())&lt;SPAN&gt;;&lt;BR /&gt;&lt;/SPAN&gt;&lt;SPAN&gt;    long &lt;/SPAN&gt;producerTimestamp = &lt;SPAN&gt;0L&lt;/SPAN&gt;&lt;SPAN&gt;;&lt;BR /&gt;&lt;/SPAN&gt;    Headers consumedHeaders = record.headers()&lt;SPAN&gt;;&lt;BR /&gt;&lt;/SPAN&gt;&lt;SPAN&gt;    for &lt;/SPAN&gt;(Header header : consumedHeaders) {&lt;BR /&gt;        &lt;SPAN&gt;log&lt;/SPAN&gt;.info(&lt;SPAN&gt;"Header Key" &lt;/SPAN&gt;+ header.key())&lt;SPAN&gt;;&lt;BR /&gt;&lt;/SPAN&gt;        &lt;SPAN&gt;log&lt;/SPAN&gt;.info(&lt;SPAN&gt;"Header Value String" &lt;/SPAN&gt;+ &lt;SPAN&gt;new &lt;/SPAN&gt;String(header.value()))&lt;SPAN&gt;;&lt;BR /&gt;&lt;/SPAN&gt;&lt;SPAN&gt;        if &lt;/SPAN&gt;(header.key().equals(&lt;SPAN&gt;"producer_timestamp"&lt;/SPAN&gt;)) {&lt;BR /&gt;            &lt;SPAN&gt;for &lt;/SPAN&gt;(&lt;SPAN&gt;byte &lt;/SPAN&gt;b : header.value()) {&lt;BR /&gt;                &lt;SPAN&gt;// Shifting previous value 8 bits to right and&lt;BR /&gt;&lt;/SPAN&gt;&lt;SPAN&gt;                // add it with next value&lt;BR /&gt;&lt;/SPAN&gt;                producerTimestamp = (producerTimestamp &amp;lt;&amp;lt; &lt;SPAN&gt;8&lt;/SPAN&gt;) + (b &amp;amp; &lt;SPAN&gt;255&lt;/SPAN&gt;)&lt;SPAN&gt;;&lt;BR /&gt;&lt;/SPAN&gt;            }&lt;BR /&gt;            &lt;SPAN&gt;log&lt;/SPAN&gt;.info(&lt;SPAN&gt;"Record timestamp: " &lt;/SPAN&gt;+ record.timestamp() + &lt;SPAN&gt;", Producer timestamp: " &lt;/SPAN&gt;+ producerTimestamp + &lt;SPAN&gt;", Consumer timestamp: " &lt;/SPAN&gt;+ System.&lt;SPAN&gt;currentTimeMillis&lt;/SPAN&gt;())&lt;SPAN&gt;;&lt;BR /&gt;&lt;/SPAN&gt;        }&lt;BR /&gt;    }&lt;/PRE&gt;&lt;P&gt;We need to transform the header from bytes[] to long and then print the content:&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;log&lt;/SPAN&gt;.info(&lt;SPAN&gt;"Record timestamp: " &lt;/SPAN&gt;+ record.timestamp() + &lt;SPAN&gt;", Producer timestamp: " &lt;/SPAN&gt;+ producerTimestamp + &lt;SPAN&gt;", Consumer timestamp: " &lt;/SPAN&gt;+ System.&lt;SPAN&gt;currentTimeMillis&lt;/SPAN&gt;())&lt;SPAN&gt;;&lt;BR /&gt;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;We can see 3 different timestamps in the output, the record timestamp, which was added by the Kafka producer API, the producer timestamp, stored by the producer as a header before sending the message, and the consumer timestamp captured at the consumer side after reading the record.&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;Let us know if this example helps too answer your questions&lt;/SPAN&gt;&lt;/P&gt;</description>
      <pubDate>Sun, 04 Feb 2024 17:32:35 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Support-Questions/How-to-add-timestamp-to-message-in-kafka/m-p/383111#M244825</guid>
      <dc:creator>gtorres</dc:creator>
      <dc:date>2024-02-04T17:32:35Z</dc:date>
    </item>
    <item>
      <title>Re: How to add timestamp to message in kafka</title>
      <link>https://community.cloudera.com/t5/Support-Questions/How-to-add-timestamp-to-message-in-kafka/m-p/383387#M244886</link>
      <description>&lt;P&gt;&lt;a href="https://community.cloudera.com/t5/user/viewprofilepage/user-id/50045"&gt;@bhara&lt;/a&gt;&amp;nbsp;Has the reply helped resolve your issue? If so, please mark the appropriate reply as the solution, as it will make it easier for others to find the answer in the future. Thanks.&lt;/P&gt;</description>
      <pubDate>Fri, 09 Feb 2024 16:14:15 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Support-Questions/How-to-add-timestamp-to-message-in-kafka/m-p/383387#M244886</guid>
      <dc:creator>DianaTorres</dc:creator>
      <dc:date>2024-02-09T16:14:15Z</dc:date>
    </item>
  </channel>
</rss>

