Support Questions

Find answers, ask questions, and share your expertise
Announcements
Celebrating as our community reaches 100,000 members! Thank you!

Read Kafka timestamp as an attribute in Nifi

avatar
Rising Star

Hi, Is there a way to consume a kafka message along with its timestamp in Nifi using the Consumekafka processor?

e.g. we can consume the kafka message and see the time stamp by adding the property print.timestamp=true . The output looks something like this on the console

CreateTime:1522893745217 test_message

I can't seem to access this variable 'CreateTime'. I've tried using ${kafka.CreateTime} in the Updateattribute process but it doesnt work. Please let me know if there is way to do this as adding the custom timestamp (now()) is not an option in our case.

1 ACCEPTED SOLUTION

avatar
Master Guru

This is not currently supported, but there is a JIRA for this issue:

https://issues.apache.org/jira/browse/NIFI-4487

Part of the issue is that this would only make sense if you are consuming 1 message per flow file, which generally is poor for performance. So what do you do when you consume 10k messages into a single flow file? For ConsumeKafkaRecord then the potentially the timestamp could be put into a field in each record, assuming the schema had a timestamp field, but for regular ConsumeKafka there would be no way to handle it.

View solution in original post

1 REPLY 1

avatar
Master Guru

This is not currently supported, but there is a JIRA for this issue:

https://issues.apache.org/jira/browse/NIFI-4487

Part of the issue is that this would only make sense if you are consuming 1 message per flow file, which generally is poor for performance. So what do you do when you consume 10k messages into a single flow file? For ConsumeKafkaRecord then the potentially the timestamp could be put into a field in each record, assuming the schema had a timestamp field, but for regular ConsumeKafka there would be no way to handle it.