<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>How to scale Spark-Streaming with kafka? in Archives of Support Questions (Read Only)</title>
    <link>https://community.cloudera.com/t5/Archives-of-Support-Questions/How-to-scale-Spark-Streaming-with-kafka/m-p/158383#M36502</link>
    <description>&lt;P&gt;I want to scale my Spark Streaming application manually by adding new executors to the SparkContext. The problem is that the number of executors actually used is limited by the number of partitions I set for the Kafka topic.&lt;/P&gt;&lt;P&gt;Example: I create a topic with 5 partitions and a replication factor of 3. Then my application can only use 5 executors. If I allocate more executors, the extra ones idle during the batch. On the next batch, again only 5 executors are picked at random from the pool.&lt;/P&gt;&lt;P&gt;Is it possible to scale a Spark Streaming application independently of the number of partitions set in the Kafka topic?&lt;/P&gt;&lt;P&gt;This is my application. The map function is the core of my application, and it needs a few milliseconds of computation time for each event.&lt;/P&gt;&lt;PRE&gt;final JavaPairInputDStream&amp;lt;String, String&amp;gt; directPairStream = KafkaUtils.createDirectStream(ssc,
        String.class,
        String.class,
        StringDecoder.class,
        StringDecoder.class,
        kafkaParams,
        Sets.newHashSet(TOPIC));

final JavaDStream&amp;lt;String&amp;gt; directStream =
    directPairStream.map(new Function&amp;lt;Tuple2&amp;lt;String, String&amp;gt;, String&amp;gt;() {
        @Override
        public String call(final Tuple2&amp;lt;String, String&amp;gt; v1) throws Exception {
            return v1._2();
        }
    });

final JavaDStream&amp;lt;TaxiCsvBean&amp;gt; map = directStream.map(...);
map.foreachRDD(new VoidFunction&amp;lt;JavaRDD&amp;lt;TaxiCsvBean&amp;gt;&amp;gt;() {
    @Override
    public void call(final JavaRDD&amp;lt;TaxiCsvBean&amp;gt; rdd) throws Exception {
        rdd.collect();
    }
});&lt;/PRE&gt;&lt;P&gt;I use &lt;STRONG&gt;HDP 2.4&lt;/STRONG&gt; to set up my &lt;STRONG&gt;3-node&lt;/STRONG&gt; cluster with:&lt;/P&gt;&lt;UL&gt;
&lt;LI&gt;ZooKeeper: 3.4&lt;/LI&gt;&lt;LI&gt;HDFS, YARN, MapReduce2: 2.7&lt;/LI&gt;&lt;LI&gt;Spark: 1.6&lt;/LI&gt;&lt;LI&gt;Kafka: 0.9&lt;/LI&gt;&lt;/UL&gt;&lt;P&gt;Each node has 4 cores and 8 GB RAM.&lt;/P&gt;&lt;P&gt;Some properties:&lt;/P&gt;&lt;PRE&gt;spark.dynamicAllocation.enabled=false&lt;/PRE&gt;&lt;P&gt;Thank you for helping me &lt;span class="lia-unicode-emoji" title=":slightly_smiling_face:"&gt;🙂&lt;/span&gt;&lt;/P&gt;</description>
    <pubDate>Mon, 01 Aug 2016 02:13:02 GMT</pubDate>
    <dc:creator>retricia1</dc:creator>
    <dc:date>2016-08-01T02:13:02Z</dc:date>
    <item>
      <title>How to scale Spark-Streaming with kafka?</title>
      <link>https://community.cloudera.com/t5/Archives-of-Support-Questions/How-to-scale-Spark-Streaming-with-kafka/m-p/158383#M36502</link>
      <description>&lt;P&gt;I want to scale my Spark Streaming application manually by adding new executors to the SparkContext. The problem is that the number of executors actually used is limited by the number of partitions I set for the Kafka topic.&lt;/P&gt;&lt;P&gt;Example: I create a topic with 5 partitions and a replication factor of 3. Then my application can only use 5 executors. If I allocate more executors, the extra ones idle during the batch. On the next batch, again only 5 executors are picked at random from the pool.&lt;/P&gt;&lt;P&gt;Is it possible to scale a Spark Streaming application independently of the number of partitions set in the Kafka topic?&lt;/P&gt;&lt;P&gt;This is my application. The map function is the core of my application, and it needs a few milliseconds of computation time for each event.&lt;/P&gt;&lt;PRE&gt;final JavaPairInputDStream&amp;lt;String, String&amp;gt; directPairStream = KafkaUtils.createDirectStream(ssc,
        String.class,
        String.class,
        StringDecoder.class,
        StringDecoder.class,
        kafkaParams,
        Sets.newHashSet(TOPIC));

final JavaDStream&amp;lt;String&amp;gt; directStream =
    directPairStream.map(new Function&amp;lt;Tuple2&amp;lt;String, String&amp;gt;, String&amp;gt;() {
        @Override
        public String call(final Tuple2&amp;lt;String, String&amp;gt; v1) throws Exception {
            return v1._2();
        }
    });

final JavaDStream&amp;lt;TaxiCsvBean&amp;gt; map = directStream.map(...);
map.foreachRDD(new VoidFunction&amp;lt;JavaRDD&amp;lt;TaxiCsvBean&amp;gt;&amp;gt;() {
    @Override
    public void call(final JavaRDD&amp;lt;TaxiCsvBean&amp;gt; rdd) throws Exception {
        rdd.collect();
    }
});&lt;/PRE&gt;&lt;P&gt;I use &lt;STRONG&gt;HDP 2.4&lt;/STRONG&gt; to set up my &lt;STRONG&gt;3-node&lt;/STRONG&gt; cluster with:&lt;/P&gt;&lt;UL&gt;
&lt;LI&gt;ZooKeeper: 3.4&lt;/LI&gt;&lt;LI&gt;HDFS, YARN, MapReduce2: 2.7&lt;/LI&gt;&lt;LI&gt;Spark: 1.6&lt;/LI&gt;&lt;LI&gt;Kafka: 0.9&lt;/LI&gt;&lt;/UL&gt;&lt;P&gt;Each node has 4 cores and 8 GB RAM.&lt;/P&gt;&lt;P&gt;Some properties:&lt;/P&gt;&lt;PRE&gt;spark.dynamicAllocation.enabled=false&lt;/PRE&gt;&lt;P&gt;Thank you for helping me &lt;span class="lia-unicode-emoji" title=":slightly_smiling_face:"&gt;🙂&lt;/span&gt;&lt;/P&gt;</description>
      <pubDate>Mon, 01 Aug 2016 02:13:02 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Archives-of-Support-Questions/How-to-scale-Spark-Streaming-with-kafka/m-p/158383#M36502</guid>
      <dc:creator>retricia1</dc:creator>
      <dc:date>2016-08-01T02:13:02Z</dc:date>
    </item>
    <item>
      <title>Re: How to scale Spark-Streaming with kafka?</title>
      <link>https://community.cloudera.com/t5/Archives-of-Support-Questions/How-to-scale-Spark-Streaming-with-kafka/m-p/158384#M36503</link>
      <description>&lt;P&gt;With the direct stream you are using, Spark creates one RDD partition per Kafka topic partition, so at most one task per topic partition consumes the data. You can definitely repartition the data after receiving it from Kafka. That distributes the work across all of your workers, as opposed to only having 5 executors do it.&lt;/P&gt;</description>
      <pubDate>Mon, 01 Aug 2016 04:19:29 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Archives-of-Support-Questions/How-to-scale-Spark-Streaming-with-kafka/m-p/158384#M36503</guid>
      <dc:creator>jwiden</dc:creator>
      <dc:date>2016-08-01T04:19:29Z</dc:date>
    </item>
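A sketch of the repartition advice above, in the thread's own Java. The stream name from the question is only referenced in comments; the RepartitionSizing class, the targetPartitions helper, and its "2 tasks per core" rule of thumb are illustrative assumptions, not part of either post.

```java
// Illustrative helper (not from the thread): choose a repartition count
// so that every executor core gets tasks, independent of how many
// partitions the Kafka topic has. The 3 executors x 4 cores figures
// match the cluster described in the question; 2 tasks per core is
// just a common rule of thumb.
public class RepartitionSizing {

    static int targetPartitions(int numExecutors, int coresPerExecutor, int tasksPerCore) {
        return numExecutors * coresPerExecutor * tasksPerCore;
    }

    public static void main(String[] args) {
        int n = targetPartitions(3, 4, 2);
        System.out.println(n); // prints 24

        // In the streaming job from the question, this count would be
        // applied right after the direct stream is created, e.g.:
        //   directStream.repartition(n).map(...);
        // The repartition shuffles each batch across n partitions, so
        // more than 5 executors can share the map work.
    }
}
```

The trade-off is the shuffle itself: repartitioning moves every batch over the network, which only pays off when the per-event work (here, a few milliseconds per event) outweighs the shuffle cost.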
    <item>
      <title>Re: How to scale Spark-Streaming with kafka?</title>
      <link>https://community.cloudera.com/t5/Archives-of-Support-Questions/How-to-scale-Spark-Streaming-with-kafka/m-p/158385#M36504</link>
      <description>&lt;P&gt;Just to support that answer: a repartition is not a bad idea in any case, because I have seen some surprising behavior from Kafka producers. With round-robin partitioning, a producer can send its data to one randomly chosen partition and only switch to another every 10 minutes or so. So it is possible that a single Kafka partition randomly receives ALL the data and blows up your Spark application (the Flume Kafka connector was my example).&lt;/P&gt;&lt;P&gt;A repartition right after the Kafka stream fixed that. You can parameterize the partition count based on the number of executors, etc.&lt;/P&gt;</description>
      <pubDate>Mon, 01 Aug 2016 17:44:22 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Archives-of-Support-Questions/How-to-scale-Spark-Streaming-with-kafka/m-p/158385#M36504</guid>
      <dc:creator>bleonhardi</dc:creator>
      <dc:date>2016-08-01T17:44:22Z</dc:date>
    </item>
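To make the producer-skew point in the reply above concrete, here is a simplified sketch of keyed partition assignment. All names are assumptions for illustration, and the real Kafka producer hashes keys with murmur2, not String.hashCode; the idea shown is only that keyed sends spread records deterministically across partitions, instead of the sticky round-robin behavior described in the reply.

```java
// Simplified sketch of keyed partition assignment (the real Kafka
// producer uses murmur2 hashing, not String.hashCode). Records with
// the same key always land in the same partition, and distinct keys
// spread across all partitions on every send, so no single partition
// silently collects all the data.
public class KeyedPartitioning {

    static int partitionFor(String key, int numPartitions) {
        // floorMod keeps the result non-negative even for negative hash codes
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        // 5 partitions, as in the topic from the original question
        for (String key : new String[] {"taxi-1", "taxi-2", "taxi-3"}) {
            System.out.println(key + " maps to partition " + partitionFor(key, 5));
        }
    }
}
```

With keyed sends on the producer side, the input partitions stay balanced and the repartition on the Spark side only has to even out executor load, not repair producer skew.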
  </channel>
</rss>