Kafka Key not populated (PublishKafkaRecord)
Labels: Apache Kafka, Apache NiFi
Created on 02-19-2022 12:05 PM - edited 02-19-2022 12:06 PM
I am using PublishKafkaRecord to produce Kafka messages from incoming JSON data. The JSON contains the field I want to use as the Kafka message key, but the key is not populated. Here's what I did (using NiFi 1.12.1):
1. `GenerateFlowFile` produces a record containing the field intended as the Kafka key (i.e. the 'metadata.aggregateId' field)
2. An `UpdateAttribute` processor adds an attribute called 'kafka.key' with 'metadata.aggregateId' as its value
3. `PublishKafkaRecord` has its `Message Key Field` set to ${kafka.key}
4. The key of the produced Kafka message is null
5. `LogAttribute` shows that the expected attributes are present
And, here's the application log:
2022-02-19 17:57:40,401 INFO [Timer-Driven Process Thread-10] o.a.n.processors.standard.LogAttribute LogAttribute[id=e9b431f7-6784-1a90-8110-78386da99314] logging for flow file StandardFlowFileRecord[uuid=f495c59f-fdc6-4762-932e-f478a182039d,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1645226154214-9, container=default, section=9], offset=36854, length=783],offset=0,name=f495c59f-fdc6-4762-932e-f478a182039d,size=783]
--------------------------------------------------
Standard FlowFile Attributes
Key: 'entryDate'
Value: 'Sat Feb 19 17:57:40 UTC 2022'
Key: 'lineageStartDate'
Value: 'Sat Feb 19 17:57:40 UTC 2022'
Key: 'fileSize'
Value: '783'
FlowFile Attribute Map Content
Key: 'filename'
Value: 'f495c59f-fdc6-4762-932e-f478a182039d'
Key: 'kafka.key'
Value: 'metadata.aggregateId'
Key: 'msg.count'
Value: '1'
Key: 'path'
Value: './'
Key: 'uuid'
Value: 'f495c59f-fdc6-4762-932e-f478a182039d'
--------------------------------------------------
{
  "metadata": {
    "aggregateId": "ABC123",
    "aggregateEventOrder": 2,
    "aggregateRepositoryId": "mdm@ABC",
    "aggregateType": "foo.bar.v1.BazType",
    "appId": "mdm",
    "businessKey": "ABC123",
    "correlationId": "aaaaaaa-403b-4856-a4d1-f07ff84064a6",
    "eventId": "bbbbbbb-c061-492e-a66a-9e63e2906e39",
    "eventType": "VIS",
    "eventMessageVersion": "1.0",
    "eventRecordedTime": 1519168559357,
    "qualifiedEventName": "foo.bar.v1.BazTypeChangedEvent",
    "userLoginId": 0,
    "userFirstName": "John",
    "userLastName": "Doe",
    "userLoginName": "app-abcd",
    "workBranch": "ABC",
    "changeType": "CREATE",
    "uploadNumber": "U38"
  },
  "data": {
    "_documentation": "simple demo data",
    "code": "ABC123",
    "description": "N/A"
  }
}
What am I missing here? @sdairs's explanation on Stack Overflow was helpful, but it did not work for me.
Thanks!
Created 02-20-2022 01:16 AM
It seems that the "Message Key Field" property doesn't work with a nested structure. This property must refer to a top level field of the record that contains the key value.
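To illustrate why a dotted name yields a null key, here is a toy Python model of a lookup that only consults top-level fields. It is a sketch of the behaviour described above, not NiFi's actual code; the function name `top_level_key` is hypothetical.

```python
# Toy model of a top-level-only key lookup; NOT NiFi's implementation.
record = {
    "metadata": {"aggregateId": "ABC123"},
    "data": {"code": "ABC123"},
}

def top_level_key(record, field_name):
    """Return the value of a top-level field as a string, or None if absent."""
    value = record.get(field_name)
    return None if value is None else str(value)

# "metadata.aggregateId" is treated as one literal field name, so nothing matches.
nested_lookup = top_level_key(record, "metadata.aggregateId")
print(nested_lookup)
```

In this model the record has no top-level field literally named `metadata.aggregateId`, so the lookup finds nothing, which matches the null key seen in the question.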
In my example, I used UpdateRecord to create a top-level field called "id" and set it to the value of "metadata.aggregateId". Then, in PublishKafkaRecord, I was able to use the "id" field as the Kafka message key.
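The effect of that UpdateRecord step can be sketched outside NiFi in Python. This only models the record transformation; the helper `promote_field` is hypothetical. In NiFi itself, my assumption is that this corresponds to an UpdateRecord property named `/id` with the value `/metadata/aggregateId` and the Replacement Value Strategy set to "Record Path Value", with the writer schema including the new `id` field.

```python
import copy

def promote_field(record, path, new_name):
    """Copy the value at a dotted path into a new top-level field."""
    promoted = copy.deepcopy(record)  # leave the input record untouched
    value = promoted
    for part in path.split("."):
        value = value[part]  # raises KeyError if the path does not exist
    promoted[new_name] = value
    return promoted

record = {
    "metadata": {"aggregateId": "ABC123", "eventType": "VIS"},
    "data": {"code": "ABC123", "description": "N/A"},
}

flat = promote_field(record, "metadata.aggregateId", "id")
message_key = flat["id"]  # a top-level field, usable as the message key
print(message_key)
```

After this step, setting `Message Key Field` to `id` refers to a top-level field of the record, as the workaround requires.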
Cheers,
André