<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>question Re: NiFi Publish Kafka hashes partition info automatically in Support Questions</title>
    <link>https://community.cloudera.com/t5/Support-Questions/NiFi-Publish-Kafka-hashes-partition-info-automatically/m-p/336973#M232448</link>
    <description>&lt;P&gt;Hi&amp;nbsp;&lt;a href="https://community.cloudera.com/t5/user/viewprofilepage/user-id/11191"&gt;@araujo&lt;/a&gt;,&lt;/P&gt;&lt;P&gt;Thank you for your answer.&lt;/P&gt;&lt;P&gt;&lt;SPAN class="Y2IQFc"&gt;At my company, our Kafka servers are always set up with 100 partitions, and each partition holds the data for one id. I explicitly generate the partition attribute from the id before the flow file enters the Publish Kafka processor.&lt;/SPAN&gt;&amp;nbsp;So there is no chance of a value being an invalid partition.&lt;/P&gt;&lt;P&gt;Also, I don't know why, but flow files with the same partition number do not all end up in the same partition. This confused me. Maybe the hash function returns a value greater than 100, and the Kafka servers handle that value unpredictably because we have only 100 partitions.&lt;/P&gt;&lt;P&gt;As a last-ditch solution, and if it makes sense for the community, I may open a PR that adds a config option to the Publish Kafka processor to control whether the partition value is hashed.&lt;/P&gt;</description>
    <pubDate>Thu, 24 Feb 2022 07:59:23 GMT</pubDate>
    <dc:creator>Yemre</dc:creator>
    <dc:date>2022-02-24T07:59:23Z</dc:date>
    <item>
      <title>NiFi Publish Kafka hashes partition info automatically</title>
      <link>https://community.cloudera.com/t5/Support-Questions/NiFi-Publish-Kafka-hashes-partition-info-automatically/m-p/336904#M232421</link>
      <description>&lt;P&gt;Hello everyone,&lt;/P&gt;&lt;P&gt;I want to use the Publish Kafka processors, and I need to specify the partition for each flow file. But when I select the&amp;nbsp;&lt;SPAN&gt;&lt;STRONG&gt;Expression Language Partitioner&lt;/STRONG&gt; class and put my partition value in the &lt;STRONG&gt;Partition&lt;/STRONG&gt; field, the records are not published to the partitions I specified. Instead, the processor hashes my partition string and publishes the records to the partition derived from the hashed value.&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;I don't understand why it works like this. I want to choose a specific partition for each flow file. How can I achieve this?&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;&lt;span class="lia-inline-image-display-wrapper lia-image-align-inline" image-alt="Capture.PNG" style="width: 794px;"&gt;&lt;img src="https://community.cloudera.com/t5/image/serverpage/image-id/33665iB73F9EC599B6C15E/image-size/large?v=v2&amp;amp;px=999" role="button" title="Capture.PNG" alt="Capture.PNG" /&gt;&lt;/span&gt;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;Here is the code from the NiFi GitHub repo. As we can see, it hashes the partition value:&lt;/SPAN&gt;&lt;/P&gt;&lt;LI-CODE lang="java"&gt;private Integer getPartition(final ProcessContext context, final FlowFile flowFile) {
    final String partitionClass = context.getProperty(PARTITION_CLASS).getValue();
    if (EXPRESSION_LANGUAGE_PARTITIONING.getValue().equals(partitionClass)) {
        final String partition = context.getProperty(PARTITION).evaluateAttributeExpressions(flowFile).getValue();
        final int hash = Objects.hashCode(partition);
        return hash;
    }

    return null;
}&lt;/LI-CODE&gt;&lt;P&gt;&lt;A href="https://github.com/apache/nifi/blob/main/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_2_0.java#L529" target="_self"&gt;https://github.com/apache/nifi/blob/main/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_2_0.java&lt;/A&gt;&lt;/P&gt;</description>
      <pubDate>Wed, 23 Feb 2022 13:23:01 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Support-Questions/NiFi-Publish-Kafka-hashes-partition-info-automatically/m-p/336904#M232421</guid>
      <dc:creator>Yemre</dc:creator>
      <dc:date>2022-02-23T13:23:01Z</dc:date>
    </item>
    <item>
      <title>Re: NiFi Publish Kafka hashes partition info automatically</title>
      <link>https://community.cloudera.com/t5/Support-Questions/NiFi-Publish-Kafka-hashes-partition-info-automatically/m-p/336941#M232433</link>
      <description>&lt;P&gt;Hi&amp;nbsp;&lt;a href="https://community.cloudera.com/t5/user/viewprofilepage/user-id/93216"&gt;@Yemre&lt;/a&gt;,&lt;/P&gt;&lt;P&gt;This is the way the Kafka Partitioners typically work. In general, it's a safer approach, since every record is guaranteed to be routed to a valid partition. If the processor used the literal value of the expression as the partition number, some values might not be valid partitions unless proper care is taken.&lt;/P&gt;&lt;P&gt;I don't think there's currently a way to explicitly specify the exact partition in the PublishKafka processors. However, with the method you mentioned, every message that produces the same result for the expression you specified will always be sent to the same partition. Isn't this enough for your use case? If not, could you please explain what you are trying to achieve?&lt;/P&gt;&lt;P&gt;Cheers,&lt;/P&gt;&lt;P&gt;André&lt;/P&gt;</description>
      <pubDate>Wed, 23 Feb 2022 23:12:11 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Support-Questions/NiFi-Publish-Kafka-hashes-partition-info-automatically/m-p/336941#M232433</guid>
      <dc:creator>araujo</dc:creator>
      <dc:date>2022-02-23T23:12:11Z</dc:date>
    </item>
    <item>
      <title>Re: NiFi Publish Kafka hashes partition info automatically</title>
      <link>https://community.cloudera.com/t5/Support-Questions/NiFi-Publish-Kafka-hashes-partition-info-automatically/m-p/336973#M232448</link>
      <description>&lt;P&gt;Hi&amp;nbsp;&lt;a href="https://community.cloudera.com/t5/user/viewprofilepage/user-id/11191"&gt;@araujo&lt;/a&gt;,&lt;/P&gt;&lt;P&gt;Thank you for your answer.&lt;/P&gt;&lt;P&gt;&lt;SPAN class="Y2IQFc"&gt;At my company, our Kafka servers are always set up with 100 partitions, and each partition holds the data for one id. I explicitly generate the partition attribute from the id before the flow file enters the Publish Kafka processor.&lt;/SPAN&gt;&amp;nbsp;So there is no chance of a value being an invalid partition.&lt;/P&gt;&lt;P&gt;Also, I don't know why, but flow files with the same partition number do not all end up in the same partition. This confused me. Maybe the hash function returns a value greater than 100, and the Kafka servers handle that value unpredictably because we have only 100 partitions.&lt;/P&gt;&lt;P&gt;As a last-ditch solution, and if it makes sense for the community, I may open a PR that adds a config option to the Publish Kafka processor to control whether the partition value is hashed.&lt;/P&gt;</description>
      <pubDate>Thu, 24 Feb 2022 07:59:23 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Support-Questions/NiFi-Publish-Kafka-hashes-partition-info-automatically/m-p/336973#M232448</guid>
      <dc:creator>Yemre</dc:creator>
      <dc:date>2022-02-24T07:59:23Z</dc:date>
    </item>
  </channel>
</rss>

