Member since: 07-21-2016
Posts: 10
Kudos Received: 2
Solutions: 1

My Accepted Solutions
| Title | Views | Posted |
| --- | --- | --- |
|  | 24380 | 07-25-2016 10:23 AM |
07-31-2016 07:13 PM · 1 Kudo
I want to scale my Spark Streaming application manually by adding new executors to the SparkContext. The problem is that the number of executors that are actually used is limited by the number of partitions I set for the Kafka topic. Example: I create a topic with 5 partitions and a replication factor of 3. My application can then only use 5 executors. If I allocate more executors, they just idle for that batch, and on the next batch again only 5 executors are randomly picked from the pool. Is it possible to scale a Spark Streaming application independently of the number of partitions set on the Kafka topic?

This is my application. The map function is its core and it needs a few milliseconds of computation time per event.

final JavaPairInputDStream<String, String> directPairStream = KafkaUtils.createDirectStream(ssc,
String.class,
String.class,
StringDecoder.class,
StringDecoder.class,
kafkaParams,
Sets.newHashSet(TOPIC));
final JavaDStream<String> directStream =
directPairStream.map(new Function<Tuple2<String, String>, String>() {
@Override
public String call(final Tuple2<String, String> v1) throws Exception {
return v1._2();
}
});
final JavaDStream<TaxiCsvBean> map = directStream.map(...);
map.foreachRDD(new VoidFunction<JavaRDD<TaxiCsvBean>>() {
@Override
public void call(final JavaRDD<TaxiCsvBean> rdd) throws Exception {
rdd.collect();
}
});

I use HDP 2.4 to set up my 3 node cluster with:
Zookeeper 3.4, HDFS/YARN/MapReduce2 2.7, Spark 1.6, Kafka 0.9. Each node has 4 cores and 8 GB RAM. Some properties: spark.dynamicAllocation.enabled=false. Thank you for helping me 🙂
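One idea I am considering (just a sketch, not tested, and the 12 is only a guessed value): repartition each micro-batch right after the direct stream, at the cost of a shuffle, so the expensive map stage is no longer limited to one task per Kafka partition:

// Sketch only (continuing the code above): spread each batch over more partitions
// than the topic has, so the map can use more executors. This adds a shuffle.
final JavaDStream<String> rebalanced = directStream.repartition(12); // 12 is only a guess

final JavaDStream<TaxiCsvBean> map = rebalanced.map(...); // the same map as above, now not limited to 5 tasks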
Labels:
- Apache Kafka
- Apache Spark
07-28-2016 05:15 PM
Oh no... I didn't see this before. Is this only an HDP 2.4 problem? In the CDH documentation it seems to be possible.
07-28-2016 04:10 PM
Each node contributes 4 GB RAM and 4 cores to YARN. I submit the application with:
--driver-memory 640mb
--executor-memory 640mb
With the overhead, each container uses 1 GB RAM, so altogether 12 containers are possible.
yarn.scheduler.minimum-allocation-mb=512mb
yarn.nodemanager.resource.memory-mb=4096mb
mapreduce.map.memory.mb=1.5GB
The following properties are not defined:
mapreduce.map.java.opts.max.heap
mapreduce.map.cpu.vcores
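For illustration, a small standalone sketch of the arithmetic behind the 1 GB figure, assuming Spark 1.6's default executor memory overhead of max(384 MB, 10% of the executor memory) and YARN rounding each request up to a multiple of yarn.scheduler.minimum-allocation-mb (both are assumptions about the defaults, not values taken from my config):

// Hypothetical helper showing how a 640 MB request becomes a 1 GB container.
public final class ContainerSizeSketch {
    public static void main(String[] args) {
        long executorMemoryMb = 640;                              // --executor-memory 640mb
        long overheadMb = Math.max(384, executorMemoryMb / 10);   // assumed Spark 1.6 default overhead
        long minAllocationMb = 512;                               // yarn.scheduler.minimum-allocation-mb

        long requestedMb = executorMemoryMb + overheadMb;         // 640 + 384 = 1024 MB
        long containerMb = ((requestedMb + minAllocationMb - 1) / minAllocationMb) * minAllocationMb; // round up

        long clusterMb = 3 * 4096;                                // 3 nodes x 4 GB handed to YARN
        System.out.println(containerMb + " MB per container, "
                + (clusterMb / containerMb) + " containers possible");
        // Prints: 1024 MB per container, 12 containers possible
    }
}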
07-28-2016 03:39 PM
I want to set up a Spark Streaming cluster with dynamic allocation. I tested dynamic allocation by submitting the SparkPi application, and it works fine. Then I tried my own application: it gets its input from another server, and I use socketTextStream to receive the input data. The application is very simple:

final JavaReceiverInputDStream<String> stream =
ssc.socketTextStream(host, port, StorageLevels.MEMORY_AND_DISK_SER);
final JavaDStream<MyObject> mapStream = stream.map(...);
mapStream.foreachRDD(new VoidFunction<JavaRDD<MyObject>>() {
@Override
public void call(final JavaRDD<MyObject> stringJavaRDD) throws Exception {
stringJavaRDD.collect();
}
});
The map function is the core of my application and it needs a few milliseconds of computation time per event. When I increase the number of events, the cluster allocates new containers, and when the event rate slows down, the number of containers decreases, but the newly allocated containers are never used. I checked the number of computed tasks per executor; it is always 0. Because the containers are never used, Spark deallocates them after executorIdleTimeout and immediately allocates new ones because the workload is still very high. Only the first 2 containers from the application start do the work. I thought it might help to use Kafka so that the received events are distributed to all containers (I don't know if that is the right way to solve my problem). With Kafka I got another problem: Spark did not allocate more containers than the number of partitions set for my topic.

I use HDP 2.4 to set up my 3 node cluster with:
Zookeeper 3.4, HDFS/YARN/MapReduce2 2.7, Spark 1.6, Kafka 0.9. Each node has 4 cores and 8 GB RAM. Can you tell me which option is the right one to solve my problem and how I can fix it? Thank you so much for helping me 🙂
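One thing I could try (just a sketch on my side, not tested; the 12 partitions are only a guess): repartition the received batches before the expensive map, so the work is not tied to the executor that hosts the single socket receiver:

// Sketch only (continuing the code above): force a shuffle so the map tasks can run
// on every executor, not only on the receiver's executor.
final JavaReceiverInputDStream<String> stream =
        ssc.socketTextStream(host, port, StorageLevels.MEMORY_AND_DISK_SER);

final JavaDStream<String> rebalanced = stream.repartition(12); // 12 is only a guess

final JavaDStream<MyObject> mapStream = rebalanced.map(...); // the same map as above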
07-25-2016 10:23 AM
I think it was my problem, sorry. I had only checked the configs in spark-thrift-sparkconf. After adding the config to spark-defaults, it works. Thank you for your help.
07-21-2016 07:34 AM
With an input value of 100000, SparkPi needed about 3 minutes. I checked the number of cores repeatedly, but nothing changed. I have never used the Spark Thrift Server before. Can you tell me how to run a demo job?
07-21-2016 05:09 AM
Using the first command, only 3 cores are used. With the second command, 8 cores are used.
07-21-2016 04:35 AM · 1 Kudo
I want to use Spark's dynamic allocation feature for my submitted applications, but the applications do not scale. My cluster consists of 3 nodes, each with 4 cores and 8 GB RAM, running Spark 1.6 and YARN + MapReduce2 2.7. I use HDP 2.4 and set up all the required dynamic-allocation properties as follows (they were preconfigured in HDP, but I verified them against the docs):

spark.dynamicAllocation.enabled=true
spark.dynamicAllocation.initialExecutors=5
spark.dynamicAllocation.maxExecutors=10
spark.dynamicAllocation.minExecutors=1
spark.shuffle.service.enabled=true
yarn.nodemanager.aux-services=mapreduce_shuffle,spark_shuffle
yarn.nodemanager.aux-services.spark_shuffle.class=org.apache.spark.network.yarn.YarnShuffleService

I use the YARN ResourceManager UI to look up the used cores. When I submit the following Pi example, only 3 containers with 1 core each are used. There were enough resources to allocate, but no more resources were used.

./bin/spark-submit --class org.apache.spark.examples.SparkPi \
--master yarn-cluster \
--deploy-mode cluster \
--driver-memory 512m \
--executor-memory 512m \
lib/spark-examples*.jar 10000
When I submit the Pi example with a defined number of executors, a lot more resources are allocated. In this example it statically allocates 8 containers with 1 core each (1 driver + 7 executors):

./bin/spark-submit --class org.apache.spark.examples.SparkPi \
--master yarn-cluster \
--num-executors 7 \
--driver-memory 512m \
--executor-memory 512m \
lib/spark-examples*.jar 100000

What did I do wrong, so that Spark does not automatically allocate the maximum of the available resources? Thank you for your help 🙂
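As a side note, one way I could double-check which settings an application actually picks up at runtime is to dump the effective configuration from the driver. A minimal sketch (my own diagnostic idea, not something taken from the docs), meant to be launched with spark-submit like the examples above:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

// Hedged sketch: print the configuration the driver actually sees, e.g. to verify
// that the spark.dynamicAllocation.* settings really reach the application.
public final class PrintEffectiveConf {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("print-effective-conf");
        JavaSparkContext sc = new JavaSparkContext(conf); // master/deploy-mode come from spark-submit
        System.out.println(sc.getConf().toDebugString()); // lists all non-default settings
        sc.stop();
    }
}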
Labels:
- Apache Spark