<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>question Re: How does PublishKafkaRecord_0_10 use filename as Message key? in Archives of Support Questions (Read Only)</title>
    <link>https://community.cloudera.com/t5/Archives-of-Support-Questions/How-does-PublishKafkaRecord-0-10-use-filename-as-Message-key/m-p/226792#M63680</link>
    <description>&lt;P&gt;What you described is exactly what I need.&lt;/P&gt;&lt;P&gt;The PublishKafka processor doesn't support a Schema Registry, which is why I don't use it. Otherwise, it looks good to me.&lt;/P&gt;&lt;P&gt;Thanks.&lt;/P&gt;</description>
    <pubDate>Wed, 28 Jun 2017 01:49:49 GMT</pubDate>
    <dc:creator>alvinuw</dc:creator>
    <dc:date>2017-06-28T01:49:49Z</dc:date>
    <item>
      <title>How does PublishKafkaRecord_0_10 use filename as Message key?</title>
      <link>https://community.cloudera.com/t5/Archives-of-Support-Questions/How-does-PublishKafkaRecord-0-10-use-filename-as-Message-key/m-p/226788#M63676</link>
      <description>&lt;P&gt;Hello,&lt;/P&gt;&lt;P&gt;Let's say I have a CSV file called User.csv:&lt;/P&gt;&lt;P&gt;id,firstName,lastName,email,gender&lt;/P&gt;&lt;P&gt;1,alvin,jin,aj@gmail.com,m&lt;/P&gt;&lt;P&gt;2,bob,trump,bt@gmail.com,m&lt;/P&gt;&lt;P&gt;3,alice,trump,at@gmail.com,f&lt;/P&gt;&lt;P&gt;I'd like to use each row as a message value and the file name as the message key. All messages from one file would use the same key, which is "User.csv" in the example above.&lt;/P&gt;&lt;P&gt;However, it seems the "Message Key Field" property in PublishKafkaRecord can only name one of the fields in the record.&lt;/P&gt;&lt;P&gt;Is there any way to use one of the flowfile attributes, e.g. filename, as the message key?&lt;/P&gt;&lt;P&gt;Besides, I found that the PublishKafka processor's "Message Key" property accepts flowfile attributes, but that processor has no Schema Registry service. Why is PublishKafkaRecord_0_10 designed this way?&lt;/P&gt;&lt;P&gt;Thanks.&lt;/P&gt;</description>
      <pubDate>Tue, 27 Jun 2017 01:30:44 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Archives-of-Support-Questions/How-does-PublishKafkaRecord-0-10-use-filename-as-Message-key/m-p/226788#M63676</guid>
      <dc:creator>alvinuw</dc:creator>
      <dc:date>2017-06-27T01:30:44Z</dc:date>
    </item>
    <item>
      <title>Re: How does PublishKafkaRecord_0_10 use filename as Message key?</title>
      <link>https://community.cloudera.com/t5/Archives-of-Support-Questions/How-does-PublishKafkaRecord-0-10-use-filename-as-Message-key/m-p/226789#M63677</link>
      <description>&lt;P&gt;I think what we need is a way to control the partitioning independently of the message key...&lt;/P&gt;&lt;P&gt;The message key is used on the broker side during compaction/age-off, and the latest record with a given message key will be retained. This would mean that all the lines of your CSV would be treated as if they were different versions of the same message, and at some point all of the records except the latest one could be aged off.&lt;/P&gt;&lt;P&gt;I think what you would really want is to use the "id" field from your CSV as the message key, and then indicate to NiFi that all of the messages from this flow file should be sent to the same partition. Unfortunately, that option doesn't currently exist, so I created this JIRA to add it:&lt;/P&gt;&lt;P&gt;&lt;A href="https://issues.apache.org/jira/browse/NIFI-4133" target="_blank"&gt;https://issues.apache.org/jira/browse/NIFI-4133&lt;/A&gt;&lt;/P&gt;</description>
      <pubDate>Wed, 28 Jun 2017 00:06:56 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Archives-of-Support-Questions/How-does-PublishKafkaRecord-0-10-use-filename-as-Message-key/m-p/226789#M63677</guid>
      <dc:creator>bbende</dc:creator>
      <dc:date>2017-06-28T00:06:56Z</dc:date>
    </item>
    <item>
      <title>Re: How does PublishKafkaRecord_0_10 use filename as Message key?</title>
      <link>https://community.cloudera.com/t5/Archives-of-Support-Questions/How-does-PublishKafkaRecord-0-10-use-filename-as-Message-key/m-p/226790#M63678</link>
      <description>&lt;P&gt;Hi &lt;A rel="user" href="https://community.cloudera.com/users/363/bbende.html" nodeid="363"&gt;@Bryan Bende&lt;/A&gt;,&lt;/P&gt;&lt;P&gt;Yes, you are right. I agree that in most cases the message key should be chosen from the record columns.&lt;/P&gt;&lt;P&gt;In my case, I want all messages (rows) from the same source (which could be a file, SFTP, etc.) to go to the same partition. I won't enable log compaction on the Kafka broker.&lt;/P&gt;&lt;P&gt;I am a little confused about the term "Message Key Field" used in the NiFi Kafka processor.&lt;/P&gt;&lt;P&gt;Is it the same as the "record key" concept that Kafka uses for partitioning?&lt;/P&gt;&lt;P&gt;Thank you for creating this ticket &lt;A href="https://issues.apache.org/jira/browse/NIFI-4133"&gt;NIFI-4133&lt;/A&gt;.&lt;/P&gt;</description>
      <pubDate>Wed, 28 Jun 2017 00:43:19 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Archives-of-Support-Questions/How-does-PublishKafkaRecord-0-10-use-filename-as-Message-key/m-p/226790#M63678</guid>
      <dc:creator>alvinuw</dc:creator>
      <dc:date>2017-06-28T00:43:19Z</dc:date>
    </item>
    <item>
      <title>Re: How does PublishKafkaRecord_0_10 use filename as Message key?</title>
      <link>https://community.cloudera.com/t5/Archives-of-Support-Questions/How-does-PublishKafkaRecord-0-10-use-filename-as-Message-key/m-p/226791#M63679</link>
      <description>&lt;P&gt;Yes, the message key is the same thing as the record key.&lt;/P&gt;&lt;P&gt;OK, so for your use case, you need more than just the same partition for all messages from a flow file. You want the same partition for all flow files from a given source. In that case, we should have a property in the processor for the partition that supports expression language, so then you could do something like...&lt;/P&gt;&lt;P&gt;GetFile (source1) -&amp;gt; UpdateAttribute (set kafka.partition = 1) -&amp;gt; PublishKafka (partition = ${kafka.partition})&lt;/P&gt;&lt;P&gt;GetFile (source2) -&amp;gt; UpdateAttribute (set kafka.partition = 2) -&amp;gt; PublishKafka (partition = ${kafka.partition})&lt;/P&gt;&lt;P&gt;I'll clarify this on the JIRA.&lt;/P&gt;&lt;P&gt;In the meantime, you probably have a better chance with PublishKafka_0_10 (the non-record version). If you strip off the header before reaching this processor, set the Message Demarcator for PublishKafka_0_10 to a new-line, and set the key to ${filename}, you should get what you are looking for.&lt;/P&gt;</description>
      <pubDate>Wed, 28 Jun 2017 01:27:05 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Archives-of-Support-Questions/How-does-PublishKafkaRecord-0-10-use-filename-as-Message-key/m-p/226791#M63679</guid>
      <dc:creator>bbende</dc:creator>
      <dc:date>2017-06-28T01:27:05Z</dc:date>
    </item>
    <item>
      <title>Re: How does PublishKafkaRecord_0_10 use filename as Message key?</title>
      <link>https://community.cloudera.com/t5/Archives-of-Support-Questions/How-does-PublishKafkaRecord-0-10-use-filename-as-Message-key/m-p/226792#M63680</link>
      <description>&lt;P&gt;What you described is exactly what I need.&lt;/P&gt;&lt;P&gt;The PublishKafka processor doesn't support a Schema Registry, which is why I don't use it. Otherwise, it looks good to me.&lt;/P&gt;&lt;P&gt;Thanks.&lt;/P&gt;</description>
      <pubDate>Wed, 28 Jun 2017 01:49:49 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Archives-of-Support-Questions/How-does-PublishKafkaRecord-0-10-use-filename-as-Message-key/m-p/226792#M63680</guid>
      <dc:creator>alvinuw</dc:creator>
      <dc:date>2017-06-28T01:49:49Z</dc:date>
    </item>
  </channel>
</rss>

