Support Questions

Spark performance parameter num-executors has no performance impact...

I have an 8-node Amazon EMR cluster and I am trying to optimize my Spark job, but I am unable to bring program execution below 15 minutes.

I have tried executing my Spark job with different memory parameters, but they are not accepted: the job always executes with 16 executors, even when I supply 21 or 33.

Please help me understand the possible reasons; my command is below:

nohup hadoop jar /var/lib/aws/emr/myjar.jar spark-submit --deploy-mode cluster --num-executors 17 --executor-cores 5 --driver-cores 2 --driver-memory 4g --class class_name s3:validator.jar -e runtime -v true -t true -r true &

Observation: When I pass 3 executors it defaults to 4 and execution takes longer, but the other parameters have no effect.

1 ACCEPTED SOLUTION

I think tuning the memory parameters relative to the input file size is the best we can do to optimize Spark performance, unless the underlying program itself still needs tuning.

Since I don't know the exact operations my team is performing in the program, I have suggested verifying the following:

We can set parallelism at the RDD level, like below:

val rdd = sc.textFile("somefile", 8)

The second major factor in performance is security: wire encryption can add up to 2x overhead, and data encryption (Ranger KMS) can add 15 to 20% overhead.

Note: Kerberos has no impact.

Another thing to check is which YARN queue your spark-submit job runs in. If it is going to the default queue, override it to a more specialized queue with the following parameter:

--queue <queue name, if you have queues set up>
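As a minimal sketch, the same thing can be done programmatically via spark.yarn.queue (the queue name "analytics" here is hypothetical; use one defined in your YARN scheduler configuration):

  import org.apache.spark.{SparkConf, SparkContext}

  // Route the job to a dedicated YARN queue instead of "default".
  val conf = new SparkConf()
    .setAppName("validator")
    .set("spark.yarn.queue", "analytics")
  val sc = new SparkContext(conf)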

Please let me know if there is anything else we should check to gain performance.

7 REPLIES

Just received feedback from the developers that, using the above approach, they are able to utilize 61 of the 64 virtual cores.

But the performance bottleneck remains: the file still takes the same time to process. Does anybody have an idea what is going wrong?

Super Collaborator

Thanks @Mukesh Kumar for adding your answer. However, I would like to add a note on the general aspects that can be looked at for improving performance.

As you rightly said, Parallelism is one important aspect.

We can adjust parallelism in code by increasing or decreasing the number of partitions -

  • in the case where you have a lot of free cores to run your application and comparatively few partitions, you could increase the number of partitions, and
  • in the case where you derive an RDD from an existing RDD and keep only a very small fraction of the parent, the child RDD may not benefit from having the same number of partitions as its parent, so you can try reducing the number of partitions (see the sketch after this list).
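As a minimal sketch of both cases, reusing the rdd from the textFile example above (the partition counts 64 and 8 are made up for illustration):

  // Case 1: many free cores, few partitions -> widen.
  // repartition() performs a full shuffle to redistribute data evenly.
  val widened = rdd.repartition(64)

  // Case 2: a filter kept only a small fraction of the parent RDD -> shrink.
  // coalesce() merges partitions and avoids a full shuffle where possible.
  val narrowed = rdd.filter(_.nonEmpty).coalesce(8)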

Other aspects that you can consider are - Data Locality

This refers to how close together the data and the code that processes it are.

There could be situations where there are no free CPU cycles to start a task on the node where its data lives – Spark can decide to either

  • WAIT for a CPU to free up - no data movement required
  • move the task over to a free CPU and start it there - the data needs to be moved

The wait time can be configured with the spark.locality.wait* properties. Based on the application, we can decide whether waiting longer saves us time compared to shuffling the data across the network.
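For example, a sketch of relaxing the locality wait (the 10s value is purely illustrative; whether waiting longer pays off depends on the job):

  import org.apache.spark.SparkConf

  val conf = new SparkConf()
    // How long to wait for a data-local slot before falling back to a
    // less local one; the default is 3s and applies to all levels.
    .set("spark.locality.wait", "10s")
    // Each locality level can also be tuned individually:
    .set("spark.locality.wait.node", "10s")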

Data Serialization

There are situations where the framework may need to ship data over the network or persist it. In such scenarios the objects are serialized. Java serialization is used by default; however, serialization frameworks like Kryo have shown better results than the default serialization. The serializer can be set using spark.serializer.
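A minimal sketch of switching to Kryo (MyRecord is a hypothetical class standing in for whatever your job actually ships):

  import org.apache.spark.SparkConf

  // Hypothetical record type used by the job.
  case class MyRecord(id: Long, value: String)

  val conf = new SparkConf()
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    // Registering classes lets Kryo write compact IDs instead of
    // full class names in the serialized stream.
    .registerKryoClasses(Array(classOf[MyRecord]))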

Memory Management

Another important aspect that you already stepped onto is memory management. If the application does not need to persist/cache data, you could try reducing the storage memory fraction and increasing the execution memory. This works because execution memory can evict storage memory up to the configured storage threshold, while the reverse is not true. We can adjust these values by changing spark.memory.fraction and spark.memory.storageFraction.
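A sketch of shifting the balance toward execution memory (the 0.3 value is just an illustration; the defaults are 0.6 and 0.5):

  import org.apache.spark.SparkConf

  val conf = new SparkConf()
    // Fraction of the heap shared by execution and storage (default 0.6).
    .set("spark.memory.fraction", "0.6")
    // Portion of that region protected from eviction for storage
    // (default 0.5); lowering it favours execution-heavy jobs that
    // cache little or nothing.
    .set("spark.memory.storageFraction", "0.3")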

Also, on a side note, all Java GC tuning methods can be applied to Spark applications as well. We can collect GC statistics using the Java options -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps.
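A sketch of passing those options to the executors; extraJavaOptions must be set before the application launches:

  import org.apache.spark.SparkConf

  val conf = new SparkConf()
    // Print GC details to each executor's stdout for later inspection.
    .set("spark.executor.extraJavaOptions",
         "-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps")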

Also note that serialized storage helps reduce the GC overhead of managing a large number of small objects.

Finally, and most importantly, we could look at our code itself; some points to consider (a short sketch follows this list) -

  • Carry forward only data that is worth keeping, i.e. consider filtering and schema validation on structured datasets upfront, before propagating them to downstream logic/aggregations
  • Know the data size - think before calling collect; use a sample/subset for debugging/testing
  • Target the low-hanging fruit - consider reduceByKey over groupByKey; this avoids a lot of data shuffling over the network
  • Consider using broadcast variables for caching large read-only values
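A minimal sketch of the last two points (the key/value data and the lookup map are made up for illustration):

  import org.apache.spark.{SparkConf, SparkContext}

  val sc = new SparkContext(new SparkConf().setAppName("sketch"))
  val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))

  // Prefer reduceByKey: values are combined within each partition
  // before the shuffle, so far less data crosses the network.
  val summed = pairs.reduceByKey(_ + _)
  // Avoid groupByKey here: it ships every value across the network.
  // val summed = pairs.groupByKey().mapValues(_.sum)

  // Broadcast a read-only lookup table once per executor instead of
  // shipping a copy with every task.
  val lookup = sc.broadcast(Map("a" -> "alpha", "b" -> "beta"))
  val labelled = summed.map { case (k, v) => (lookup.value.getOrElse(k, k), v) }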

Thanks @Arun A K, I'll verify these suggestions on my test case and let you know if I make progress.

Hi @Arun A K, we have observed that most of the time is consumed when we write data downstream to Cassandra, as a single node is serving the Cassandra cluster. We are now planning to add more Cassandra nodes inside the Hadoop cluster for faster writes. I'll keep you updated on progress.
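While the cluster is being resized, it may also be worth checking the connector-side write settings. A sketch, assuming the DataStax Spark Cassandra Connector is in use (the host, keyspace/table names, and values here are hypothetical):

  import org.apache.spark.{SparkConf, SparkContext}
  import com.datastax.spark.connector._

  val conf = new SparkConf()
    .set("spark.cassandra.connection.host", "cassandra-node1")
    // Allow more concurrent asynchronous writes per task.
    .set("spark.cassandra.output.concurrent.writes", "10")
  val sc = new SparkContext(conf)

  // Hypothetical keyspace, table and data, purely for illustration.
  val rows = sc.parallelize(Seq((1, "a"), (2, "b")))
  rows.saveToCassandra("my_keyspace", "my_table", SomeColumns("id", "value"))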

Super Collaborator

Thanks for the update @Mukesh Kumar. Is it worth doing one-by-one writes, or do you want to explore the bulk-load option in Cassandra?

Super Collaborator

Just wanted to know if you have dynamic allocation enabled and, if so, what the values for initial and max executors are. Could I also ask how many nodes you have, and how many cores and how much RAM per node?
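For reference, a sketch of the settings in question (all values are placeholders). Note that when dynamic allocation is enabled, --num-executors only seeds the initial executor count, which could explain an executor count that does not match what was passed:

  import org.apache.spark.SparkConf

  val conf = new SparkConf()
    // Dynamic allocation on YARN requires the external shuffle service.
    .set("spark.shuffle.service.enabled", "true")
    .set("spark.dynamicAllocation.enabled", "true")
    .set("spark.dynamicAllocation.initialExecutors", "4")
    .set("spark.dynamicAllocation.minExecutors", "2")
    .set("spark.dynamicAllocation.maxExecutors", "16")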