How does PublishKafkaRecord_0_10 use filename as Message key?

Expert Contributor

Hello,

Let's say I have a csv file called User.csv:

id,firstName,lastName,email,gender
1,alvin,jin,aj@gmail.com,m
2,bob,trump,bt@gmail.com,m
3,alice,trump,at@gmail.com,f

I'd like to use each row as a message value and the file name as the message key. All messages from one file would use the same key, which is "User.csv" in the example above.

However, it seems the "Message Key Field" property in PublishKafkaRecord can only be specified from one of the fields in the record.

Is there any way to use one of the flowfile attributes, e.g. filename, as the message key?

Besides, I found that the PublishKafka processor's "Message Key" property accepts flowfile attributes, but that processor doesn't support a Schema Registry service. Why was PublishKafkaRecord_0_10 designed this way?

Thanks.

1 ACCEPTED SOLUTION

Master Guru

I think what we need is a way to control the partitioning independently of the message key...

The message key is used on the broker side during compaction/age-off: the latest record with a given message key is the one retained. This would mean that all the lines of your CSV would be treated as if they were different versions of the same message, and at some point all of the records except the latest one could be aged off.

I think what you would really want is to use the "id" field from your CSV as the message key, and then tell NiFi that all of the messages from this flow file should be sent to the same partition. Unfortunately, that option doesn't currently exist, so I created this JIRA to add it:

https://issues.apache.org/jira/browse/NIFI-4133
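
To make the compaction concern concrete, here is a minimal Python sketch. It is only a toy model of "keep the latest record per key", not the broker's actual implementation; the `compact` helper is a hypothetical name introduced for illustration.

```python
def compact(log):
    """Toy model of compaction: keep only the latest value seen for each key.

    `log` is a list of (key, value) pairs in offset order.
    """
    latest = {}
    for offset, (key, value) in enumerate(log):
        # A later record with the same key replaces the earlier one.
        latest[key] = (offset, value)
    # Return the surviving records in their original offset order.
    survivors = sorted(latest.items(), key=lambda item: item[1][0])
    return [(key, value) for key, (_, value) in survivors]


rows = [
    "1,alvin,jin,aj@gmail.com,m",
    "2,bob,trump,bt@gmail.com,m",
    "3,alice,trump,at@gmail.com,f",
]

# Every row keyed by the file name: all rows look like versions of one message.
keyed_by_filename = [("User.csv", row) for row in rows]

# Every row keyed by its own "id" field: each row is a distinct message.
keyed_by_id = [(row.split(",")[0], row) for row in rows]

print(compact(keyed_by_filename))  # only the last row survives
print(compact(keyed_by_id))        # all three rows survive
```

With the file name as the key, only the row for id 3 is left after compaction; with the "id" field as the key, nothing is lost.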

Expert Contributor

Hi @Bryan Bende,

Yes, you are right. I agree that in most cases, the message key should be chosen from the record columns.

In my case, I want all messages (rows) from the same source (which could be a file, SFTP, etc.) to go to the same partition. I won't enable log compaction on the Kafka broker.

I am a little bit confused about the term "message key field" used in the NiFi Kafka processor.

Is it the same as the "record key" concept in Kafka, which is used for partitioning?

Thank you for creating the ticket, NIFI-4133.

Master Guru

Yes, the message key is the same thing as the record key.

OK, so for your use case you need more than just the same partition for all messages from a flow file: you want the same partition for all flow files from a given source. In that case, we should have a property in the processor for the partition that supports expression language, so that you could do something like:

GetFile (source1) -> UpdateAttribute (set kafka.partition = 1) -> PublishKafka (partition = ${kafka.partition})

GetFile (source2) -> UpdateAttribute (set kafka.partition = 2) -> PublishKafka (partition = ${kafka.partition})

I'll clarify this on the JIRA.
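
As a rough illustration of the proposed flow, the Python sketch below models flow files as plain dictionaries of attributes. None of this is NiFi API: `SOURCE_TO_PARTITION`, `update_attribute`, and `publish` are hypothetical names standing in for the UpdateAttribute step and a publish step whose partition property resolves `${kafka.partition}`.

```python
# Hypothetical mapping that plays the role of the UpdateAttribute processors.
SOURCE_TO_PARTITION = {"source1": 1, "source2": 2}


def update_attribute(attributes):
    """Return a copy of the flow file attributes with kafka.partition set."""
    updated = dict(attributes)
    updated["kafka.partition"] = str(SOURCE_TO_PARTITION[updated["source"]])
    return updated


def publish(attributes, records):
    """Pair every record with the partition named by the flow file attribute."""
    partition = int(attributes["kafka.partition"])
    return [(partition, record) for record in records]


flowfile_a = update_attribute({"source": "source1", "filename": "User.csv"})
flowfile_b = update_attribute({"source": "source2", "filename": "Order.csv"})

sent_a = publish(flowfile_a, ["1,alvin,jin", "2,bob,trump"])
sent_b = publish(flowfile_b, ["10,book", "11,pen"])

print(sent_a)  # every record from source1 is assigned partition 1
print(sent_b)  # every record from source2 is assigned partition 2
```

With a real client the explicit partition would be passed at send time; for example, kafka-python's `KafkaProducer.send` accepts a `partition` argument alongside `key` and `value`.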

In the meantime, you probably have a better chance using PublishKafka_0_10 (the non-record version). If you strip off the header before reaching this processor, set the Message Demarcator for PublishKafka_0_10 to a newline, and set the key to ${filename}, you should get what you are looking for.
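
The effect of that workaround can be sketched in a few lines of Python. This only models the outcome (one message per line, all sharing the file name as key) and is not how the processor is implemented; `strip_header` and `demarcate` are hypothetical helper names.

```python
def strip_header(content):
    """Drop the first line (the CSV header) from the flow file content."""
    _, _, body = content.partition("\n")
    return body


def demarcate(filename, content, demarcator="\n"):
    """Split content on the demarcator and key every message by file name."""
    return [(filename, line) for line in content.split(demarcator) if line]


content = (
    "id,firstName,lastName,email,gender\n"
    "1,alvin,jin,aj@gmail.com,m\n"
    "2,bob,trump,bt@gmail.com,m\n"
    "3,alice,trump,at@gmail.com,f\n"
)

messages = demarcate("User.csv", strip_header(content))
for key, value in messages:
    print(key, "->", value)
```

Each of the three data rows becomes its own message, and all of them carry the key "User.csv".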

Expert Contributor

What you described is exactly what I need.

The PublishKafka processor doesn't support a Schema Registry, which is why I don't use it. Otherwise, it looks good to me.

Thanks.