Consume all records from Kafka using ConsumeKafkaRecord_2_6 in one shot

Contributor

Hi,

My requirement is to consume all records from a Kafka topic (e.g. 1 lakh, i.e. 100,000) in one go using the ConsumeKafkaRecord processor, scheduled to run once every 4 hours, because I need to merge all the data into a Parquet file and store it in an HDFS path using the PutHDFS processor.

If I consume all incoming data continuously and then run a MergeContent processor once every 4 hours, the problem is that all the data stays queued in NiFi for 4 hours, taking up memory. To avoid that, I wanted to use ConsumeKafkaRecord on a 4-hour schedule, but it is not able to consume all the records.

How can I solve this issue?

3 REPLIES

Master Mentor

@Rohit1997jio 

The content of a NiFi FlowFile does not live in NiFi heap memory space. Only the FlowFile metadata/attributes are held in NiFi heap memory, and even then there are per-connection thresholds beyond which swap files are created to reduce that heap usage.
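
For reference, that per-connection swap threshold is set in nifi.properties (the value shown is the stock default; your install may differ):

    nifi.queue.swap.threshold=20000

Once a single connection queues more FlowFiles than this, NiFi swaps the excess FlowFile attributes out to disk to keep heap usage bounded.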

Some processors may need to load content into heap memory when they execute against a FlowFile.

Before making recommendations on your ConsumeKafkaRecord processor configuration, more information about your NiFi setup and your Kafka topic is needed:

  1. Are you running a multi-node NiFi cluster or a single instance of NiFi? If a cluster, how many nodes make up your NiFi cluster?
  2. How many partitions are set up on the target Kafka topic?

Kafka assigns partitions to the different consumers in a consumer group. So let's say you have 10 partitions on your Kafka topic, 1 NiFi instance, and a ConsumeKafkaRecord processor configured with 1 concurrent task: all 10 partitions would be assigned to that one consumer. When ConsumeKafkaRecord executes, it consumes from one of those partitions, the next execution consumes from the next partition, and so on. This is likely why you are not seeing all the Kafka messages consumed when you schedule the processor to execute only once every 4 hours. Even if you were to set concurrent tasks to 10 on the ConsumeKafkaRecord processor, the scheduler would still only allow one execution every 4 hours. So in this case you would be best suited to set 10 concurrent tasks and adjust your Quartz cron schedule so the processor is scheduled every second for 10 seconds every 4 hours, as in the example below.
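
For example (one possible expression; this assumes the 4-hour windows should start at midnight), the Quartz cron

    0-9 0 0/4 * * ?

schedules the processor every second for the first 10 seconds (seconds 0 through 9) of minute 0 of hours 0, 4, 8, 12, 16, and 20.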

Also keep in mind the "Max Poll Records" setting, as it controls the maximum number of records (messages) added to the single record FlowFile created during each execution. If you have a lot of records, you may consider increasing how many times the processor gets scheduled within each 4-hour window, to maybe 30 seconds, to make sure you get all messages from every partition.
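
Under the covers the NiFi Kafka processors use the standard Kafka consumer client, so the effect of this setting can be seen in a minimal standalone Java sketch (not NiFi code; the broker address, group id, and topic name below are placeholders):

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class MaxPollDemo {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");  // placeholder
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-nifi-group");         // placeholder
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringDeserializer");
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringDeserializer");
            // One poll() returns at most this many records, no matter how many
            // messages are waiting on the assigned partitions.
            props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "10000");

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("my-topic"));      // placeholder
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
                System.out.println("Fetched " + records.count() + " records in one poll");
            }
        }
    }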

Now assume you have a multi-node NiFi cluster with, for example, 5 nodes, your ConsumeKafkaRecord processor is configured with a group.id, and the topic has 10 partitions. You would set concurrent tasks to 2 (2 consumers x 5 nodes = 10 consumers in the consumer group), and Kafka will assign one partition to each of those 10 consumers; a quick worked example of this sizing rule follows below.
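
As a sanity check (plain Java arithmetic, not a NiFi API; the partition and node counts are just this example's numbers):

    public class ConsumerSizing {
        public static void main(String[] args) {
            int partitions = 10;  // partitions on the Kafka topic
            int nodes = 5;        // NiFi cluster nodes
            // Aim for one consumer per partition across the whole cluster.
            int concurrentTasks = (int) Math.ceil((double) partitions / nodes);
            int consumers = concurrentTasks * nodes;
            System.out.println(concurrentTasks + " concurrent tasks x " + nodes
                    + " nodes = " + consumers + " consumers for " + partitions + " partitions");
        }
    }

Any consumers beyond the partition count would simply sit idle, so there is no benefit to setting concurrent tasks higher than partitions divided by nodes.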

Hope this helps you configure your ConsumeKafkaRecord processor so you can be successful with your requirement.


Please help our community grow. If any of the suggestions/solutions provided helped you solve your issue or answer your question, please take a moment to log in and click "Accept as Solution" on the one(s) that helped.

Thank you,
Matt

Contributor

Hi Matt,

The Quartz cron scheduler method worked, but one issue has come up related to the time at which I want the data to be consumed. I used the cron expression 0-30 */6 * * * ?, and based on it I expected the data to be consumed after, or at around, 6 minutes. 50K records were published, but all the data was consumed in under 3 minutes, at around 2 min 50 sec. How can I configure this timing accurately? Please help with this.


Master Mentor

@Rohit1997jio 

https://www.quartz-scheduler.org/documentation/quartz-2.3.0/tutorials/crontrigger.html

Your Quartz cron "0-30 */6 * * * ?" translates to:

Execute every second from second 0 through 30, every 6 minutes, of every hour.

In the minutes field, */6 starts its increments at minute 0, so it behaves the same as 0/6: the processor gets scheduled at minutes 0/6/12/18/24/30/36/42/48/54 of every hour. A message published partway through an interval therefore waits at most 6 minutes, and about 3 minutes on average, for the next window, which matches the roughly 2 min 50 sec you observed. If you want the schedule to start at minute 6 instead, use 6/6, which would schedule the processor at 6/12/18/24/30/36/42/48/54 every hour (you would, however, have a 12 minute gap between the end of each hour and minute 6 of the next hour with this config).
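
If you want to verify an expression before putting it in the processor, here is a minimal standalone sketch (plain Java against the Quartz library, not NiFi code) that prints its next few fire times:

    import java.util.Date;
    import org.quartz.CronExpression;

    public class CronCheck {
        public static void main(String[] args) throws Exception {
            // The expression from this thread; swap in whatever you want to test.
            CronExpression cron = new CronExpression("0-30 */6 * * * ?");
            Date next = new Date();
            for (int i = 0; i < 5; i++) {
                next = cron.getNextValidTimeAfter(next);
                System.out.println(next);
            }
        }
    }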

Also keep in mind that scheduling does not necessarily mean execution at the same moment. NiFi has a Max Timer Driven Thread pool from which threads are given out to scheduled processors. With very large flows, or processors with long-running threads, a scheduled processor may need to wait for a thread to become available before it actually executes.

Please help our community grow. If any of the suggestions/solutions provided helped you solve your issue or answer your question, please take a moment to log in and click "Accept as Solution" on the one(s) that helped.

Thank you,
Matt