Archives of Support Questions (Read Only)

This is an archived board for historical reference. Information and links may no longer be available or relevant. The board is read-only; to ask a new question, please post a new topic on the appropriate active board.

In the NiFi GetKafka processor we can set Concurrent Tasks = 12 (the number of partitions in the Kafka topic). Does that mean NiFi has a consumer group with 12 consumers in it?

Expert Contributor
  1. I think Concurrent Tasks = 12 means 12 threads for this GetKafka processor, i.e. 12 consumers in one consumer group? Please correct me if I am wrong. So if I have 12 partitions in my Kafka topic, then I believe 12 consumers are consuming from the 12 partitions?
  2. I have a 3-node NiFi cluster and set Concurrent Tasks to 4 so that I can split the load between all the NiFi nodes in the cluster. I think each node will be consuming from 4 partitions. What happens to the 4 partitions of a node if it dies or crashes? Normally there would be a rebalance and Kafka would reassign partitions so that the remaining 2 nodes consume from 6 partitions each. Is this how it works in NiFi?
  3. What will happen to the messages or offsets that are already in the queue of a node that has died?
  4. Is there any way to store offsets in NiFi to achieve fault tolerance?
1 ACCEPTED SOLUTION

Master Guru

In general, Concurrent Tasks is the number of threads calling onTrigger for an instance of a processor. In a cluster, if you set Concurrent Tasks to 4, you get 4 threads on each node of your cluster.
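As a quick worked example of that multiplication (the cluster size and task count here are the ones from the question, used purely for illustration):

```java
public class ConcurrentTaskMath {
    public static void main(String[] args) {
        int concurrentTasks = 4; // Concurrent Tasks setting on the processor
        int nodes = 3;           // NiFi cluster size (assumed, per the question)

        // The setting is per node, so the cluster-wide thread count multiplies
        int totalThreads = concurrentTasks * nodes;
        System.out.println("Total consumer threads across the cluster: " + totalThreads);
    }
}
```

So with 12 topic partitions, 4 concurrent tasks on a 3-node cluster lines up one cluster-wide thread per partition.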

I am not as familiar with all the ins and outs of the Kafka processors, but GetKafka does something like this:

// Ask Kafka for one message stream per concurrent task on this topic
int concurrentTaskToUse = context.getMaxConcurrentTasks();
final Map<String, Integer> topicCountMap = new HashMap<>(1);
topicCountMap.put(topic, concurrentTaskToUse);

final Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
        consumer.createMessageStreams(topicCountMap);

The consumer here is from the Kafka 0.8 high-level client, so this creates one message stream per concurrent task.

Then, when the processor is triggered, it takes one of those message streams and consumes messages from it; since multiple concurrent tasks are triggering the processor, it consumes from each of those streams in parallel.
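The borrow-a-stream-per-trigger behavior described above can be sketched without a real Kafka broker. This is a simplified illustration, not the actual GetKafka code: integer ids stand in for the KafkaStream objects, and a shared queue plays the role of the processor's stream pool.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class StreamPoolSketch {
    public static void main(String[] args) throws Exception {
        int concurrentTasks = 4;

        // Setup: one "message stream" per concurrent task, held in a shared
        // queue (ids stand in for KafkaStream<byte[], byte[]> objects).
        BlockingQueue<Integer> streams = new LinkedBlockingQueue<>();
        for (int i = 0; i < concurrentTasks; i++) {
            streams.add(i);
        }

        // Each onTrigger invocation borrows a stream, consumes from it,
        // then returns it so a later trigger can reuse it.
        AtomicInteger consumed = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(concurrentTasks);
        for (int t = 0; t < concurrentTasks; t++) {
            pool.submit(() -> {
                Integer stream = streams.poll();
                if (stream != null) {
                    consumed.incrementAndGet(); // a real processor reads a message here
                    streams.add(stream);        // return the stream to the pool
                }
            });
        }
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        System.out.println("Consume calls completed: " + consumed.get());
    }
}
```

With 4 streams and 4 threads, every trigger finds a stream available, so all tasks consume in parallel without stepping on each other.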

As far as rebalancing goes, I think the Kafka client handles that transparently to NiFi, but I am not totally sure.

Messages that have already been pulled into a NiFi node will stay on that node until it is back up and processing.

