<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>question Re: In nifi getkafka processor we can set concurrenttasks = 12 (which is number of partitions in kafka topic) does that mean in nifi we have a consumer group with 12 consumers in it? in Support Questions</title>
    <link>https://community.cloudera.com/t5/Support-Questions/In-nifi-getkafka-processor-we-can-set-concurrenttasks-12/m-p/104017#M66914</link>
    <description>&lt;P&gt;In general, Concurrent Tasks is the number of threads calling onTrigger for an instance of a processor. In a cluster, if you set Concurrent Tasks to 4, then it is 4 threads on each node of your cluster.&lt;/P&gt;&lt;P&gt;I am not as familiar with all the ins and outs of the Kafka processors, but for GetKafka it does something like this:&lt;/P&gt;&lt;PRE&gt;int concurrentTaskToUse = context.getMaxConcurrentTasks();
final Map&amp;lt;String, Integer&amp;gt; topicCountMap = new HashMap&amp;lt;&amp;gt;(1);
topicCountMap.put(topic, concurrentTaskToUse); 

final Map&amp;lt;String, List&amp;lt;KafkaStream&amp;lt;byte[], byte[]&amp;gt;&amp;gt;&amp;gt; consumerMap = consumer.createMessageStreams(topicCountMap);&lt;/PRE&gt;&lt;P&gt;The consumer is from the Kafka 0.8 client, so it is creating a message stream for each concurrent task.&lt;/P&gt;&lt;P&gt;Then, when the processor is triggered, it takes one of those message streams and consumes a message; since multiple concurrent tasks are triggering the processor, it is consuming from each of those streams in parallel.&lt;/P&gt;&lt;P&gt;As far as rebalancing, I think the Kafka client does that transparently to NiFi, but I am not totally sure.&lt;/P&gt;&lt;P&gt;Messages that have already been pulled into a NiFi node will stay there until the node is back up and processing.&lt;/P&gt;</description>
    <pubDate>Tue, 16 Aug 2016 20:59:04 GMT</pubDate>
    <dc:creator>bbende</dc:creator>
    <dc:date>2016-08-16T20:59:04Z</dc:date>
    <item>
      <title>In nifi getkafka processor we can set concurrenttasks = 12 (which is number of partitions in kafka topic) does that mean in nifi we have a consumer group with 12 consumers in it?</title>
      <link>https://community.cloudera.com/t5/Support-Questions/In-nifi-getkafka-processor-we-can-set-concurrenttasks-12/m-p/104016#M66913</link>
      <description>&lt;OL&gt;&lt;LI&gt;I think Concurrent Tasks means having 12 threads for this GetKafka processor, i.e. 12 consumers in one consumer group? Please correct me if I am wrong. So if I have 12 partitions in my Kafka topic, then I believe 12 consumers are consuming from the 12 partitions?&lt;/LI&gt;&lt;LI&gt;I have a 3-node NiFi cluster and set Concurrent Tasks to 4 so that I can split the load between all the NiFi nodes in the cluster. I think each node will be consuming from 4 partitions. What happens to the 4 partitions of that node if it dies or crashes? Normally there would be a rebalance and Kafka would reassign partitions to consumers so that the 2 remaining nodes consume from 6 partitions each? Is this how it works in NiFi?&lt;/LI&gt;&lt;LI&gt;What will happen to the messages or offsets that are already in the queue of a node that has died?&lt;/LI&gt;&lt;LI&gt;Is there any way to store offsets in NiFi to achieve fault tolerance?&lt;/LI&gt;&lt;/OL&gt;</description>
      <pubDate>Tue, 16 Aug 2016 17:13:49 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Support-Questions/In-nifi-getkafka-processor-we-can-set-concurrenttasks-12/m-p/104016#M66913</guid>
      <dc:creator>bigspark</dc:creator>
      <dc:date>2016-08-16T17:13:49Z</dc:date>
    </item>
    <item>
      <title>Re: In nifi getkafka processor we can set concurrenttasks = 12 (which is number of partitions in kafka topic) does that mean in nifi we have a consumer group with 12 consumers in it?</title>
      <link>https://community.cloudera.com/t5/Support-Questions/In-nifi-getkafka-processor-we-can-set-concurrenttasks-12/m-p/104017#M66914</link>
      <description>&lt;P&gt;In general, Concurrent Tasks is the number of threads calling onTrigger for an instance of a processor. In a cluster, if you set Concurrent Tasks to 4, then it is 4 threads on each node of your cluster.&lt;/P&gt;&lt;P&gt;I am not as familiar with all the ins and outs of the Kafka processors, but for GetKafka it does something like this:&lt;/P&gt;&lt;PRE&gt;int concurrentTaskToUse = context.getMaxConcurrentTasks();
final Map&amp;lt;String, Integer&amp;gt; topicCountMap = new HashMap&amp;lt;&amp;gt;(1);
topicCountMap.put(topic, concurrentTaskToUse); 

final Map&amp;lt;String, List&amp;lt;KafkaStream&amp;lt;byte[], byte[]&amp;gt;&amp;gt;&amp;gt; consumerMap = consumer.createMessageStreams(topicCountMap);&lt;/PRE&gt;&lt;P&gt;The consumer is from the Kafka 0.8 client, so it is creating a message stream for each concurrent task.&lt;/P&gt;&lt;P&gt;Then, when the processor is triggered, it takes one of those message streams and consumes a message; since multiple concurrent tasks are triggering the processor, it is consuming from each of those streams in parallel.&lt;/P&gt;&lt;P&gt;As far as rebalancing, I think the Kafka client does that transparently to NiFi, but I am not totally sure.&lt;/P&gt;&lt;P&gt;Messages that have already been pulled into a NiFi node will stay there until the node is back up and processing.&lt;/P&gt;</description>
      <pubDate>Tue, 16 Aug 2016 20:59:04 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Support-Questions/In-nifi-getkafka-processor-we-can-set-concurrenttasks-12/m-p/104017#M66914</guid>
      <dc:creator>bbende</dc:creator>
      <dc:date>2016-08-16T20:59:04Z</dc:date>
    </item>
  </channel>
</rss>