NiFi Publish Kafka hashes partition info automatically


Hello everyone,

I want to use the Publish Kafka processors and I need to specify the partition for each flow file. But when I choose the Expression Language Partitioner class and put my partition value in the Partition field, the records are not published to the expected partitions. I realized that the processor instead takes my partition string and hashes it, then publishes the records to the partition given by the hashed value.

I don't understand why it works like this. I want to choose a specific partition for each flow file. How can I achieve this?

Here is the code from the NiFi GitHub repo. As we can see, it hashes the partition value:

private Integer getPartition(final ProcessContext context, final FlowFile flowFile) {
    final String partitionClass = context.getProperty(PARTITION_CLASS).getValue();
    if (EXPRESSION_LANGUAGE_PARTITIONING.getValue().equals(partitionClass)) {
        final String partition = context.getProperty(PARTITION).evaluateAttributeExpressions(flowFile).getValue();
        final int hash = Objects.hashCode(partition);
        return hash;
    }

    return null;
}

https://github.com/apache/nifi/blob/main/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processor... 
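
To make the effect of the `Objects.hashCode(partition)` call concrete, here is a small standalone sketch. The class and method names are made up for illustration and are not part of NiFi. It compares the hash of a numeric string with the number that string spells out; the two are unrelated, which would explain why a Partition value of `5` does not come out as partition 5.

```java
import java.util.Objects;

// Illustration only: PartitionHashDemo and hashOf are hypothetical names, not NiFi code.
public class PartitionHashDemo {

    // Mirrors the hashing step in the snippet above: the evaluated
    // expression is a String, so this is effectively String.hashCode().
    public static int hashOf(String partition) {
        return Objects.hashCode(partition);
    }

    public static void main(String[] args) {
        for (String value : new String[] {"5", "42", "100"}) {
            System.out.println("value=" + value
                    + " parsed=" + Integer.parseInt(value)
                    + " hashed=" + hashOf(value));
        }
    }
}
```

For the single-character string `5`, for instance, the hash is the character code 53 rather than the number 5.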

 


Hi @Yemre,

This is how Kafka partitioners typically work. In general it is a safer approach, since every record is guaranteed to be routed to a valid partition. If the processor used the literal value of the expression as the partition number, some values might not be valid partitions unless proper care is taken.

I don't think there is currently a way to explicitly specify the exact partition in the PublishKafka processors. However, with the method you mentioned, every message that produces the same result for the expression you specified will always be sent to the same partition. Isn't this enough for your use case? If not, could you please explain what you are trying to achieve?

 

Cheers,

André

 


Hi @araujo,

Thank you for your answer. 

At my company, our Kafka setup is always designed with 100 partitions, and each partition holds the data for a single id. I explicitly generate the partition value attribute from the id before the flow file reaches the Publish Kafka processor, so there is no chance of an invalid partition value.

Also, I don't know why, but flow files with the same partition number do not all end up in the same partition. This confused me. Maybe the hash function returns a value greater than 100, and that value is then handled randomly by the Kafka servers since we only have 100 partitions.
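
One way to check both observations is to model the mapping outside NiFi. The sketch below assumes that the hashed value is later reduced to a valid index by taking it modulo the partition count. That reduction step is not shown in the snippet quoted in the question, so treat it as an assumption; the class and method names are hypothetical as well. Under that assumption, the same string always maps to the same partition, but the partition is generally not the number written in the string.

```java
import java.util.Objects;

// Illustration only: hypothetical names, and the modulo step is an assumption
// about what happens to the hash after getPartition returns it.
public class HashedPartitionModel {

    // Hash the partition string, then fold it into [0, partitionCount).
    // Math.floorMod keeps the result non-negative even for negative hashes.
    public static int effectivePartition(String partition, int partitionCount) {
        return Math.floorMod(Objects.hashCode(partition), partitionCount);
    }

    public static void main(String[] args) {
        int partitionCount = 100; // matches the 100 partitions described above
        for (String value : new String[] {"7", "42", "100"}) {
            int first = effectivePartition(value, partitionCount);
            int second = effectivePartition(value, partitionCount);
            // first and second are always equal: hashing is deterministic.
            System.out.println(value + " -> " + first + " (repeat: " + second + ")");
        }
    }
}
```

If flow files with an identical partition string still land in different partitions, the hash itself is unlikely to be the cause; comparing the exact attribute values (stray whitespace, for example) would be a reasonable first check.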

As a last resort, and if it makes sense to the community, I could open a PR that adds a configuration option to the Publish Kafka processor controlling whether the partition value is hashed.
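
A rough sketch of what such an option might look like is below. Everything in it is hypothetical: the `hashValue` flag, the `resolvePartition` method, and the range check are illustrations of the idea, not existing NiFi properties or code.

```java
import java.util.Objects;

// Illustration only: not NiFi code. Sketches a switch between hashing the
// partition string and using it literally as a partition number.
public class PartitionOptionSketch {

    public static int resolvePartition(String value, boolean hashValue, int partitionCount) {
        if (hashValue) {
            // Current behaviour as quoted in the question: hash the string.
            return Objects.hashCode(value);
        }
        // Proposed behaviour: treat the string as the partition number itself.
        // Integer.parseInt throws NumberFormatException for non-numeric input.
        int partition = Integer.parseInt(value.trim());
        if (partition < 0 || partition >= partitionCount) {
            throw new IllegalArgumentException(
                    "Partition " + partition + " is outside 0.." + (partitionCount - 1));
        }
        return partition;
    }

    public static void main(String[] args) {
        System.out.println(resolvePartition("42", false, 100)); // literal partition
        System.out.println(resolvePartition("42", true, 100));  // hashed value
    }
}
```

The range check is there to address the concern raised in the reply: a literal value that is not a valid partition fails loudly instead of being sent to Kafka.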