Kafka Key not populated (PublishKafkaRecord)

New Contributor

I am using PublishKafkaRecord to produce Kafka messages from incoming JSON data. The JSON data contains the field I want to use as the key of the Kafka message, but the Kafka key is not populated. Here's what I did (using NiFi 1.12.1).


1. `GenerateFlowFile` produces a record containing the future Kafka key (i.e. the 'metadata.aggregateId' field)
2. `UpdateAttribute` adds an attribute called 'kafka.key' with 'metadata.aggregateId' as its value
3. `PublishKafkaRecord` has its `Message Key Field` set to ${kafka.key}
4. The produced Kafka message key is null
5. `LogAttribute` shows all the necessary attributes are there (the relevant property values are sketched after this list)
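For reference, here is a sketch of the relevant property settings described above (reader/writer services and Kafka connection details omitted; values taken from the steps and the log below):

    UpdateAttribute
        kafka.key         : metadata.aggregateId

    PublishKafkaRecord
        Message Key Field : ${kafka.key}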

(screenshot: 2022-02-19_10-12-44.png)

And here's the application log:

2022-02-19 17:57:40,401 INFO [Timer-Driven Process Thread-10] o.a.n.processors.standard.LogAttribute LogAttribute[id=e9b431f7-6784-1a90-8110-78386da99314] logging for flow file StandardFlowFileRecord[uuid=f495c59f-fdc6-4762-932e-f478a182039d,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1645226154214-9, container=default, section=9], offset=36854, length=783],offset=0,name=f495c59f-fdc6-4762-932e-f478a182039d,size=783]
--------------------------------------------------
Standard FlowFile Attributes
Key: 'entryDate'
        Value: 'Sat Feb 19 17:57:40 UTC 2022'
Key: 'lineageStartDate'
        Value: 'Sat Feb 19 17:57:40 UTC 2022'
Key: 'fileSize'
        Value: '783'
FlowFile Attribute Map Content
Key: 'filename'
        Value: 'f495c59f-fdc6-4762-932e-f478a182039d'
Key: 'kafka.key'
        Value: 'metadata.aggregateId'
Key: 'msg.count'
        Value: '1'
Key: 'path'
        Value: './'
Key: 'uuid'
        Value: 'f495c59f-fdc6-4762-932e-f478a182039d'
--------------------------------------------------
{
  "metadata": {
    "aggregateId": "ABC123",
    "aggregateEventOrder": 2,
    "aggregateRepositoryId": "mdm@ABC",
    "aggregateType": "foo.bar.v1.BazType",
    "appId": "mdm",
    "businessKey": "ABC123",
    "correlationId": "aaaaaaa-403b-4856-a4d1-f07ff84064a6",
    "eventId": "bbbbbbb-c061-492e-a66a-9e63e2906e39",
    "eventType": "VIS",
    "eventMessageVersion": "1.0",
    "eventRecordedTime": 1519168559357,
    "qualifiedEventName": "foo.bar.v1.BazTypeChangedEvent",
    "userLoginId": 0,
    "userFirstName": "John",
    "userLastName": "Doe",
    "userLoginName": "app-abcd",
    "workBranch": "ABC",
    "changeType": "CREATE",
    "uploadNumber": "U38"
  },
  "data": {
    "_documentation": "simple demo data",
    "code": "ABC123",
    "description": "N/A"
  }
}

 
What am I missing here? @sdairs's explanation on Stack Overflow was helpful, but it did not work for me.

Thanks!


1 Reply

Super Guru

@CloudNaive,

 

It seems that the "Message Key Field" property doesn't work with a nested structure. The property must refer to a top-level field of the record that contains the key value.
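To illustrate (a sketch of how the lookup plays out for the record above, not actual processor output):

    Message Key Field = ${kafka.key}   ->  evaluates to the literal string "metadata.aggregateId"
    Top-level fields in the record     ->  metadata, data
    No top-level field is named "metadata.aggregateId", so the message key ends up null.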

In the example below I used UpdateRecord to create a top-level field called "id" and populate it with the value of "metadata.aggregateId". Then, on PublishKafkaRecord, I was able to use the "id" field as the Kafka message key.
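A minimal sketch of that UpdateRecord configuration (assuming a JSON reader/writer pair; the "/id" dynamic property name is just an example):

    UpdateRecord
        Record Reader              : JsonTreeReader
        Record Writer              : JsonRecordSetWriter
        Replacement Value Strategy : Record Path Value
        /id                        : /metadata/aggregateId

    PublishKafkaRecord
        Message Key Field          : id

With that in place, the writer emits a record that has a top-level "id" field (e.g. "id": "ABC123" for the sample record above), which PublishKafkaRecord can then use as the message key.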

(screenshot: araujo_0-1645348571104.png)

Cheers,

André

 
