Member since
02-19-2022
1
Post
0
Kudos Received
0
Solutions
02-19-2022
12:05 PM
I am using PublishKafkaRecord to produce Kafka messages from the incoming Json data. The json data has the right field to be used as the key to the Kafka message but the kafka key is not populated. Here's what I did (using Nifi 1.12.1). 1. `GenerateFlowFile` produces a record with the future kafka key (i.e. 'metadata.aggregateId' field) 2. Used `UpdateAttribute` processor to add an attribute called 'kafka.key' with 'metadata.aggregateId' as its value 3. `PublishKafkaRecord` has its `Message Key Field` as ${kafka.key} 4. Produced Kafka message key is null 5. `LogAttribute` shows all necessary hooks are there And, here's the application log: 2022-02-19 17:57:40,401 INFO [Timer-Driven Process Thread-10] o.a.n.processors.standard.LogAttribute LogAttribute[id=e9b431f7-6784-1a90-8110-78386da99314] logging for flow file StandardFlowFileRecord[uuid=f495c59f-fdc6-4762-932e-f478a182039d,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1645226154214-9, container=default, section=9], offset=36854, length=783],offset=0,name=f495c59f-fdc6-4762-932e-f478a182039d,size=783]
--------------------------------------------------
Standard FlowFile Attributes
Key: 'entryDate'
Value: 'Sat Feb 19 17:57:40 UTC 2022'
Key: 'lineageStartDate'
Value: 'Sat Feb 19 17:57:40 UTC 2022'
Key: 'fileSize'
Value: '783'
FlowFile Attribute Map Content
Key: 'filename'
Value: 'f495c59f-fdc6-4762-932e-f478a182039d'
Key: 'kafka.key'
Value: 'metadata.aggregateId'
Key: 'msg.count'
Value: '1'
Key: 'path'
Value: './'
Key: 'uuid'
Value: 'f495c59f-fdc6-4762-932e-f478a182039d'
--------------------------------------------------
{
"metadata": {
"aggregateId": "ABC123",
"aggregateEventOrder": 2,
"aggregateRepositoryId": "mdm@ABC",
"aggregateType": "foo.bar.v1.BazType",
"appId": "mdm",
"businessKey": "ABC123",
"correlationId": "aaaaaaa-403b-4856-a4d1-f07ff84064a6",
"eventId": "bbbbbbb-c061-492e-a66a-9e63e2906e39",
"eventType": "VIS",
"eventMessageVersion": "1.0",
"eventRecordedTime": 1519168559357,
"qualifiedEventName": "foo.bar.v1.BazTypeChangedEvent",
"userLoginId": 0,
"userFirstName": "John",
"userLastName": "Doe",
"userLoginName": "app-abcd",
"workBranch": "ABC",
"changeType": "CREATE",
"uploadNumber": "U38"
},
"data": {
"_documentation": "simple demo data",
"code": "ABC123",
"description": "N/A"
}
} What am I missing here? @sdairs 's explanation was helpful in stackoverflow but did not work for me. Thanks!
... View more
Labels:
- Labels:
-
Apache Kafka
-
Apache NiFi