Support Questions

How to scale Spark Streaming with Kafka?

Contributor

I want to scale my Spark Streaming application manually by adding new executors to the SparkContext. The problem is that the number of executors that are actually used is limited by the number of partitions I set for the Kafka topic.

Example: I create a topic with 5 partitions and a replication factor of 3. My application can then only use 5 executors. If I allocate more executors, they just sit idle during the batch. On the next batch, again only 5 executors are randomly picked from the pool.

Is it possible to scale a Spark Streaming application independently of the number of partitions set on the Kafka topic?

This is my application. The map function is the core of my application, and it needs a few milliseconds of computation time per event.

final JavaPairInputDStream<String, String> directPairStream = KafkaUtils.createDirectStream(ssc,
        String.class,
        String.class,
        StringDecoder.class,
        StringDecoder.class,
        kafkaParams,
        Sets.newHashSet(TOPIC));

final JavaDStream<String> directStream =
        directPairStream.map(new Function<Tuple2<String, String>, String>() {
            @Override
            public String call(final Tuple2<String, String> v1) throws Exception {
                return v1._2();
            }
        });

final JavaDStream<TaxiCsvBean> map = directStream.map(...);
map.foreachRDD(new VoidFunction<JavaRDD<TaxiCsvBean>>() {
    @Override
    public void call(final JavaRDD<TaxiCsvBean> rdd) throws Exception {
        rdd.collect();
    }
});

I use HDP 2.4 to set up my 3-node cluster with:

  • Zookeeper: 3.4
  • HDFS, YARN, MapReduce2: 2.7
  • Spark: 1.6
  • Kafka: 0.9

Each node has 4 cores and 8 GB of RAM.

Some Properties:

spark.dynamicAllocation.enabled=false

Thank you for helping me 🙂

1 ACCEPTED SOLUTION

Super Collaborator

When creating a Kafka receiver, it's one receiver per topic partition. You can definitely repartition the data after receiving it from Kafka. This should distribute the data to all of your workers instead of having only 5 executors do the work.
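
For example, a minimal sketch against the code from the question (the target of 15 partitions is only an assumed value, e.g. roughly the total number of executor cores, not something prescribed by Spark):

// Minimal sketch, assuming the directPairStream from the question.
// repartition() shuffles the records of each batch across more partitions
// (and therefore more executors) than the 5 Kafka partitions provide.
final int targetPartitions = 15; // assumed value, e.g. executors * cores per executor

final JavaDStream<String> repartitionedStream = directPairStream
        .map(new Function<Tuple2<String, String>, String>() {
            @Override
            public String call(final Tuple2<String, String> v1) throws Exception {
                return v1._2();
            }
        })
        .repartition(targetPartitions);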


2 REPLIES

Master Guru

Just want to support that answer. A repartition is not a bad idea in any case, since I have seen some interesting characteristics with Kafka producers. If you use round-robin partitioning, they send data to one random partition and switch it every 10 minutes or so. So it is possible that a single partition in Kafka randomly gets ALL the data and blows your Spark application up (the Flume Kafka connector was my example).

A repartition after the Kafka stream fixed that. You can parametrize it based on the number of executors, etc., as in the sketch below.
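
For instance, a sketch of that parametrization, assuming the executor count and cores are read from the standard Spark properties spark.executor.instances and spark.executor.cores (the factor of 2 is only an assumed over-partitioning heuristic):

// Sketch: derive the repartition target from the Spark configuration,
// reusing the ssc and directStream variables from the question.
// The defaults (5 and 1) are only fallbacks for this example.
final SparkConf conf = ssc.sparkContext().getConf();
final int executors = conf.getInt("spark.executor.instances", 5);
final int coresPerExecutor = conf.getInt("spark.executor.cores", 1);
final int targetPartitions = executors * coresPerExecutor * 2;

final JavaDStream<String> balanced = directStream.repartition(targetPartitions);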