Get total count of number of events per minute from consumer kafka

New Contributor

Hi, I am new to NiFi and want to get the total number of events per minute consumed from Kafka.
After searching the internet I found 'UpdateCounter', but there was very little information about it. Can someone suggest how to add a counter? I entered ${filename} in UpdateCounter > Properties > Counter Name, but that creates a separate counter for each file name, each with a value of 1, under hamburger menu > Counters.
Or is there another processor that can give the event count from the Kafka consumer?
Thank you in advance.

1 ACCEPTED SOLUTION

Master Guru

Hello @aie 

Awesome to hear you're getting involved with NiFi.

 

NiFi has both ConsumeKafka and ConsumeKafkaRecord based processors.  These Kafka processor components use the Kafka client library, and with Kafka the client version matters, so make sure you are using the component version that matches the version of your Kafka server.

NiFi produces FlowFiles that move from component to component in your dataflow(s).  A FlowFile consists of two parts: FlowFile attributes/metadata and FlowFile content (the actual physical content of the ingested/created data).  The FlowFile attributes/metadata live in NiFi's JVM heap memory and are persisted to the FlowFile repository, while the content resides in the content repository and is only loaded into heap memory if a specific processor needs it.  I bring this up because more FlowFiles = higher heap usage and more processing resources spent creating and managing those FlowFiles.

The many NiFi "record" based processors (like ConsumeKafkaRecord) are much more efficient to use.  The ConsumeKafkaRecord processor will ingest many Kafka messages into a single FlowFile.  The downside is that this makes calculating things like messages per minute much harder.

 

Now if you are just looking to count the total number of consumed messages, you can do this.  I'll use the ConsumeKafkaRecord_2_0 processor as an example here.  Most processors create or modify FlowFile attributes, and you can look at the embedded usage documentation for each component to see what those attributes are.  All attributes on a FlowFile can be read by other processors using NiFi Expression Language.  If we look at the attributes on the FlowFiles created by the ConsumeKafkaRecord processor, we will find a couple named "record.count" and "kafka.topic" (note that all attribute names are case sensitive).  The "record.count" attribute tells you the number of messages in that one FlowFile.

This "record.count" attribute can now be used to increment a counter using the UpdateCounter processor component.  We use the "kafka.topic" attribute as the counter name so that one counter is created for each unique topic you may consume from.  We use the "record.count" attribute as the delta so that the counter is incremented by the number of messages in each FlowFile.  Configuration would look like this:

MattWho_0-1622230460687.png
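
Since the screenshot is not reproduced here, the following is a minimal Python sketch of what that configuration does, not NiFi code. It assumes the two UpdateCounter properties are labelled "Counter Name" and "Delta" (check the processor's usage documentation for your version); the topic names and record counts are made up for illustration.

```python
from collections import defaultdict

# Assumed UpdateCounter settings described in the answer. The property
# labels are an assumption; verify them in the processor's usage docs.
UPDATE_COUNTER_PROPERTIES = {
    "Counter Name": "${kafka.topic}",
    "Delta": "${record.count}",
}


def update_counters(counters, flowfile_attributes):
    """Mimic the effect of UpdateCounter for one FlowFile.

    The counter is named after the kafka.topic attribute and is
    incremented by record.count (FlowFile attributes are strings).
    """
    name = flowfile_attributes["kafka.topic"]
    delta = int(flowfile_attributes["record.count"])
    counters[name] += delta
    return counters


# Hypothetical FlowFiles, each holding many Kafka messages.
counters = defaultdict(int)
for attrs in [
    {"kafka.topic": "orders", "record.count": "120"},
    {"kafka.topic": "orders", "record.count": "80"},
    {"kafka.topic": "clicks", "record.count": "500"},
]:
    update_counters(counters, attrs)

print(dict(counters))  # {'orders': 200, 'clicks': 500}
```

Using ${filename} as the counter name, as in the question, gives every FlowFile its own counter, which is why each one showed a value of 1.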

You can observe the counters and reset them via the Counters UI, found under the NiFi global menu in the upper right corner of the UI.
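
The counter itself is a running total, so a per-minute figure has to be derived from it. One possible approach, sketched below in Python under the assumption that you note the counter value once a minute (by hand or with any tool of your choice), is to take the difference between consecutive readings. The function name and the sample readings are hypothetical.

```python
def per_minute_counts(readings):
    """Turn cumulative counter readings, taken one minute apart,
    into the number of messages counted during each minute."""
    counts = []
    for previous, current in zip(readings, readings[1:]):
        if current >= previous:
            counts.append(current - previous)
        else:
            # The counter was reset between the two readings, so the
            # current value is what accumulated since the reset.
            counts.append(current)
    return counts


# Hypothetical readings of one counter at minutes 0, 1, 2, 3 and 4.
readings = [0, 200, 450, 450, 120]
print(per_minute_counts(readings))  # [200, 250, 0, 120]
```

If the counter is reset between readings, the value for that minute only covers the time since the reset, so treat it as a lower bound.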

MattWho_1-1622230556990.png


Also keep the following in mind for good performance.  From Kafka's point of view, a NiFi ConsumeKafka processor is a Kafka consumer group.  The processor lets you set a unique "Group ID" for its Kafka consumer group.  When you add any processor component to the NiFi canvas, it starts with the default of 1 concurrent task.  For optimal Kafka performance you want your Kafka consumer group to have as many consumers as there are partitions in the target topic.

Also keep in mind that if you are running a NiFi cluster (recommended), each processor executes on every node as part of the same consumer group.  So on a 3 node cluster, a ConsumeKafkaRecord processor with 1 concurrent task has three consumers in that group.  For best performance you would ideally want the number of partitions on the topic to be a multiple of 3 (3, 6, 9, 12, 15, etc.).  Let's say you have a 3 node cluster and your topic has 15 partitions; then your ConsumeKafkaRecord processor should be set to 5 concurrent tasks (5 x 3 nodes = 15 consumers).

Avoid having more consumers than partitions, or you may see frequent rebalancing.  So even if you had 16 partitions on your topic, you would still set only 5 concurrent tasks.  One of your 15 available consumers would simply be assigned to receive data from two partitions instead of just one.
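
The sizing rule above comes down to simple arithmetic. The Python sketch below works it through under the assumption that every node runs the same number of concurrent tasks; the function and key names are made up for illustration and are not part of NiFi or Kafka.

```python
def consumer_layout(partitions, nodes):
    """Suggest concurrent tasks per node so that the consumer group
    (nodes x concurrent tasks) does not exceed the partition count."""
    if partitions < 1 or nodes < 1:
        raise ValueError("partitions and nodes must be positive")
    # Round down so consumers never outnumber partitions; a processor
    # always has at least 1 concurrent task.
    tasks_per_node = max(1, partitions // nodes)
    consumers = tasks_per_node * nodes
    return {
        "tasks_per_node": tasks_per_node,
        "consumers": consumers,
        # Partitions that must share a consumer with another partition.
        "shared_partitions": max(0, partitions - consumers),
        # Consumers left without a partition (only when nodes > partitions).
        "idle_consumers": max(0, consumers - partitions),
    }


print(consumer_layout(15, 3))
# {'tasks_per_node': 5, 'consumers': 15, 'shared_partitions': 0, 'idle_consumers': 0}
print(consumer_layout(16, 3))
# {'tasks_per_node': 5, 'consumers': 15, 'shared_partitions': 1, 'idle_consumers': 0}
```

With fewer partitions than nodes, the minimum of 1 concurrent task per node still leaves some consumers idle, which the last field reports.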

If you found this information helpful with your query, please take a moment to log in and click "Accept" on this solution.

https://nifi.apache.org/docs.html

Thank you and happy NiFi'ing....
Matt

Community Manager

@aie  Has your issue been resolved? If so, please mark the appropriate reply as the solution, as it will make it easier for others to find the answer in the future. 

 


Cy Jervis, Manager, Community Program