Created 04-05-2018 06:03 AM
Hi, is there a way to consume a Kafka message along with its timestamp in NiFi using the ConsumeKafka processor?
For example, on the console we can consume the Kafka message and see its timestamp by adding the property print.timestamp=true. The output looks something like this:
CreateTime:1522893745217 test_message
I can't seem to access this 'CreateTime' value. I've tried using ${kafka.CreateTime} in the UpdateAttribute processor, but it doesn't work. Please let me know if there is a way to do this, as adding a custom timestamp (now()) is not an option in our case.
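For reference, the CreateTime value in that console output is an epoch timestamp in milliseconds. A minimal Python sketch of pulling it apart is below; the helper names parse_console_line and millis_to_utc are made up for illustration, and it assumes the prefix and the message are separated by whitespace as in the sample line above.

```python
from datetime import datetime, timedelta, timezone


def parse_console_line(line):
    """Split 'CreateTime:<epoch millis> <message>' into (millis, message)."""
    # Assumption: the timestamp prefix and the message are separated by whitespace.
    prefix, message = line.split(None, 1)
    label, _, millis = prefix.partition(":")
    if label != "CreateTime" or not millis.isdigit():
        raise ValueError(f"unexpected timestamp prefix: {prefix!r}")
    return int(millis), message


def millis_to_utc(millis):
    """Convert epoch milliseconds to a timezone-aware UTC datetime."""
    seconds, remainder = divmod(millis, 1000)
    return datetime.fromtimestamp(seconds, tz=timezone.utc) + timedelta(milliseconds=remainder)


millis, message = parse_console_line("CreateTime:1522893745217 test_message")
print(millis, message, millis_to_utc(millis).isoformat())
```

This only handles the console output format; it does not make the value available inside NiFi.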
Created 04-05-2018 04:59 PM
This is not currently supported, but there is a JIRA for this issue:
https://issues.apache.org/jira/browse/NIFI-4487
Part of the issue is that this would only make sense if you are consuming 1 message per flow file, which is generally poor for performance. So what do you do when you consume 10k messages into a single flow file? For ConsumeKafkaRecord, the timestamp could potentially be put into a field in each record, assuming the schema had a timestamp field, but for regular ConsumeKafka there would be no way to handle it.
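To illustrate the batching problem, here is a rough Python sketch. It is not NiFi code: KafkaMessage, bundle_as_raw, bundle_as_records, and the attribute and field names are all hypothetical. It only shows why a single set of flow file attributes cannot carry one timestamp per message, while a per-record field can.

```python
from dataclasses import dataclass


@dataclass
class KafkaMessage:
    timestamp_ms: int  # Kafka message timestamp, epoch milliseconds
    value: str


def bundle_as_raw(messages, demarcator="\n"):
    """Join many messages into one content blob with ONE attribute map.

    There is no natural place for per-message timestamps here.
    """
    content = demarcator.join(m.value for m in messages)
    attributes = {"message.count": str(len(messages))}  # hypothetical attribute name
    return content, attributes


def bundle_as_records(messages, timestamp_field="kafka_timestamp"):
    """Keep each message as a record carrying its own timestamp field."""
    # timestamp_field is a hypothetical field the schema would have to define.
    return [{"value": m.value, timestamp_field: m.timestamp_ms} for m in messages]


batch = [
    KafkaMessage(1522893745217, "test_message"),
    KafkaMessage(1522893745300, "another_message"),
]
content, attributes = bundle_as_raw(batch)
records = bundle_as_records(batch)
print(attributes)
print(records)
```

With one message per flow file, a single timestamp attribute would be unambiguous, but that is the case described above as poor for performance.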