Read Kafka timestamp as an attribute in NiFi
Labels: Apache Kafka, Apache NiFi
Created 04-05-2018 06:03 AM
Hi, is there a way to consume a Kafka message along with its timestamp in NiFi using the ConsumeKafka processor?
For example, when consuming from the console we can see the timestamp by adding the property print.timestamp=true. The output on the console looks something like this:
CreateTime:1522893745217 test_message
I can't seem to access this 'CreateTime' value. I've tried using ${kafka.CreateTime} in the UpdateAttribute processor, but it doesn't work. Please let me know if there is a way to do this, as adding a custom timestamp (now()) is not an option in our case.
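For reference, the console line above can be decoded outside NiFi. This is only a minimal Python sketch: it assumes the prefix has the form `<TimestampType>:<epoch milliseconds>` and is separated from the payload by whitespace, and `parse_console_line` is a hypothetical helper name, not part of Kafka or NiFi.

```python
from datetime import datetime, timedelta, timezone

# Kafka timestamps are milliseconds since the Unix epoch (UTC).
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def parse_console_line(line):
    """Split '<Type>:<epoch_ms> <payload>' into (type, datetime, payload).

    Assumption: the timestamp prefix and the payload are separated by
    whitespace, as in the sample line quoted in the question.
    """
    prefix, payload = line.split(None, 1)
    timestamp_type, sep, millis = prefix.partition(":")
    if not sep or not millis.isdigit():
        raise ValueError("unexpected timestamp prefix: %r" % prefix)
    # timedelta keeps the millisecond value exact (no float rounding).
    created = EPOCH + timedelta(milliseconds=int(millis))
    return timestamp_type, created, payload


timestamp_type, created, message = parse_console_line(
    "CreateTime:1522893745217 test_message"
)
print(timestamp_type, created.isoformat(), message)
```

This only interprets the console output; it does not make the value available as a NiFi attribute.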
Created 04-05-2018 04:59 PM
This is not currently supported, but there is a JIRA for this issue:
https://issues.apache.org/jira/browse/NIFI-4487
Part of the issue is that this would only make sense if you are consuming one message per flow file, which is generally poor for performance. What would you do when you consume 10k messages into a single flow file? For ConsumeKafkaRecord, the timestamp could potentially be put into a field in each record, assuming the schema had a timestamp field, but for regular ConsumeKafka there would be no way to handle it.
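To make the trade-off concrete, here is a toy Python model of the three cases. It is not NiFi code: `Message`, `FlowFile`, the three functions, and the attribute names (`kafka.timestamp`, `message.count`, `record.count`) are all hypothetical and chosen only for illustration.

```python
import json
from dataclasses import dataclass, field


@dataclass
class Message:
    """Toy stand-in for a Kafka message with its timestamp."""
    timestamp_ms: int
    value: str


@dataclass
class FlowFile:
    """Toy stand-in for a flow file: one content body, one attribute map."""
    content: str
    attributes: dict = field(default_factory=dict)


def one_message_per_flow_file(messages):
    # Each flow file holds one message, so a single timestamp attribute
    # is unambiguous (hypothetical attribute name).
    return [
        FlowFile(m.value, {"kafka.timestamp": str(m.timestamp_ms)})
        for m in messages
    ]


def batched_flow_file(messages):
    # Many messages share one attribute map; there is no single timestamp
    # that describes all of them, so none is set.
    content = "\n".join(m.value for m in messages)
    return FlowFile(content, {"message.count": str(len(messages))})


def batched_records(messages):
    # Record-oriented variant: the timestamp travels inside each record,
    # assuming the schema has a field for it.
    lines = [
        json.dumps({"timestamp": m.timestamp_ms, "value": m.value})
        for m in messages
    ]
    return FlowFile("\n".join(lines), {"record.count": str(len(messages))})


messages = [Message(1522893745217, "test_message"),
            Message(1522893745300, "another_message")]

singles = one_message_per_flow_file(messages)
batch = batched_flow_file(messages)
records = batched_records(messages)
```

In the batched case the per-message timestamps are simply lost, while the record-oriented case keeps them at the cost of requiring a schema with a timestamp field.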
