Support Questions

In the NiFi GetKafka processor we can set Concurrent Tasks = 12 (which is the number of partitions in the Kafka topic). Does that mean NiFi has a consumer group with 12 consumers in it?

Expert Contributor
  1. I think Concurrent Tasks = 12 means 12 threads for this GetKafka processor, i.e. 12 consumers in one consumer group? Please correct me if I am wrong. So if I have 12 partitions in my Kafka topic, then I believe 12 consumers are consuming from the 12 partitions?
  2. I have a 3-node NiFi cluster and set Concurrent Tasks to 4 so that I can split the load across all the NiFi nodes in the cluster. I think each node will be consuming from 4 partitions. What happens to the 4 partitions of a node if it dies or crashes? Normally there would be a rebalance and Kafka would reassign partitions to consumers, so the remaining 2 nodes would consume from 6 partitions each? Is this how it works in NiFi?
  3. What will happen to the messages or offsets that are already in the queue of a node that has died?
  4. Is there any way to store offsets in NiFi to achieve fault tolerance?
1 ACCEPTED SOLUTION

Master Guru

In general, Concurrent Tasks is the number of threads calling onTrigger for an instance of a processor. In a cluster, if you set Concurrent Tasks to 4, then it is 4 threads on each node of your cluster.
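
To illustrate, here is a minimal sketch of a processor from the framework's point of view; the class name and log message are hypothetical, not GetKafka itself.

import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;

// Hypothetical example processor, not GetKafka: the framework may call
// onTrigger from up to "Concurrent Tasks" threads at once on each node.
public class ExampleProcessor extends AbstractProcessor {

    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session)
            throws ProcessException {
        // Each concurrent task runs this body independently; with
        // Concurrent Tasks = 4 on a 3-node cluster, 4 threads per node execute it.
        getLogger().info("onTrigger called by {}", new Object[]{Thread.currentThread().getName()});
    }
}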

I am not as familiar with all the ins and outs of the Kafka processors, but for GetKafka it does something like this:

// Inside GetKafka (context, topic, and consumer are fields of the processor):
// one Kafka 0.8 message stream is requested per concurrent task.
int concurrentTaskToUse = context.getMaxConcurrentTasks();
final Map<String, Integer> topicCountMap = new HashMap<>(1);
topicCountMap.put(topic, concurrentTaskToUse);

final Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);

The consumer is from the Kafka 0.8 client, so it is creating a message-stream for each concurrent task.

Then when the processor is triggered it takes one of those message-streams and consumes a message, and since multiple concurrent tasks are triggering the processor, it is consuming from each of those streams in parallel.
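
As a rough sketch of what reading from one of those streams looks like with the 0.8 high-level consumer (illustrative only; the class and method names here are made up, not the actual GetKafka code):

import java.util.List;

import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.message.MessageAndMetadata;

// Illustrative sketch: each concurrent task takes one of the streams created
// above and reads from it, so the streams are drained in parallel.
public class StreamReadSketch {

    static void readOneMessage(final List<KafkaStream<byte[], byte[]>> streams, final int taskIndex) {
        final KafkaStream<byte[], byte[]> stream = streams.get(taskIndex);
        final ConsumerIterator<byte[], byte[]> iterator = stream.iterator();
        if (iterator.hasNext()) {
            final MessageAndMetadata<byte[], byte[]> record = iterator.next();
            final byte[] payload = record.message();   // the message bytes
            final int partition = record.partition();  // partition it came from
            final long offset = record.offset();       // offset within that partition
            // ...in GetKafka the payload would be written to a FlowFile...
        }
    }
}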

As far as rebalancing, I think the Kafka client does that transparently to NiFi, but I am not totally sure.
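
For what it's worth, the 0.8 high-level consumer coordinates group membership through ZooKeeper based on the group id, which is why the rebalance is transparent to NiFi. A minimal setup looks roughly like this (the host and group id are placeholders, not what GetKafka configures verbatim):

import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.javaapi.consumer.ConsumerConnector;

// Sketch of a 0.8 high-level consumer; property values are placeholders.
public class ConsumerGroupSketch {

    static ConsumerConnector createConsumer() {
        final Properties props = new Properties();
        props.put("zookeeper.connect", "zk-host:2181"); // group membership and offsets are tracked in ZooKeeper
        props.put("group.id", "example-group");         // all consumers sharing this id form one consumer group
        props.put("auto.commit.enable", "true");        // offsets are committed periodically
        // If one member of the group dies, the remaining members are reassigned
        // its partitions automatically (a rebalance), without the application
        // doing anything explicit.
        return Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
    }
}

Because the committed offsets live in ZooKeeper rather than in NiFi, a replacement consumer in the same group should pick up from the last committed offset, which is also relevant to your question 4.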

Messages that have already been pulled into a NiFi node will stay there until the node is back up and processing.
