Created 07-26-2016 08:58 AM
I have an 8-node Amazon EMR cluster and I am trying to optimize my Spark job, but I cannot bring the execution time below 15 minutes.
I have tried running my Spark job with different memory parameters, but they are not honored: the job always runs with 16 executors, even when I ask for 21 or 33.
Please help me understand the possible reasons. My command is below:
nohup hadoop jar /var/lib/aws/emr/myjar.jar spark-submit --deploy-mode cluster --num-executors 17 --executor-cores 5 --driver-cores 2 --driver-memory 4g --class class_name s3:validator.jar -e runtime -v true -t true -r true &
Observation: when I pass 3 executors it defaults to 4 and execution takes longer, but the other parameters have no effect.
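For reference, this is the plain spark-submit form of the sizing I am trying to apply; the jar path is a placeholder, and --executor-memory is an extra flag my current command does not pass (value only illustrative):
nohup spark-submit --master yarn --deploy-mode cluster \
  --num-executors 17 --executor-cores 5 --executor-memory 4g \
  --driver-cores 2 --driver-memory 4g \
  --class class_name <application_jar> -e runtime -v true -t true -r true &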
Created 07-26-2016 11:25 AM
I think applying different memory parameter sizes, relative to the file size, is about the best we can do to optimize Spark performance, unless the underlying program has also been tuned.
As I don't know which operations my team is performing in the program, I have suggested they verify the points below:
We can set parallelism at the RDD level like below:
val rdd = sc.textFile("somefile", 8)
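If an RDD already exists, its partitioning can also be changed later on; a minimal sketch with example numbers:
// widen parallelism ahead of a shuffle-heavy stage (64 is only an example)
val repartitioned = rdd.repartition(64)
// reduce the number of partitions without a full shuffle before writing output
val coalesced = repartitioned.coalesce(16)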
The second major factor in performance is security: wire encryption carries roughly a 2x overhead, and data encryption (Ranger KMS) can cause a 15 to 20% overhead.
Note: Kerberos has no impact.
Another thing to look at is the default queue for your spark-submit job; if it is going to the default queue, redirect it to a more specialized queue with the parameter below:
--queue <if you have queues set up>
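For example, assuming a capacity-scheduler queue named spark_jobs exists (the queue name, class and jar below are placeholders):
spark-submit --master yarn --deploy-mode cluster --queue spark_jobs --class <your_class> <your_jar>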
Please let me know if there is anything else we can check to gain performance.
Created 07-26-2016 02:36 PM
Just received feedback from the developers that, using the above approach, they are able to utilize 61 out of 64 virtual cores.
But performance is still a bottleneck; the file still takes the same time to process. Does anybody have an idea what is going wrong?
Created 07-26-2016 06:49 PM
Thanks @Mukesh Kumar for adding your answer. However, I would like to add a note on the general aspects that can be looked at for improving performance.
As you rightly said, Parallelism is one important aspect.
We can adjust parallelism in code by increasing or decreasing the number of partitions.
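A rough sketch of both directions; pairRdd and the numbers below are only illustrative:
import org.apache.spark.SparkConf
// pairRdd is a placeholder for an RDD of key/value pairs
// request more partitions explicitly on a shuffle operation
val counts = pairRdd.reduceByKey(_ + _, 100)
// or set a default for shuffles before the SparkContext is created
val conf = new SparkConf().set("spark.default.parallelism", "100")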
Other aspects that you can consider are below.
Data Locality
This refers to how close the data and the code that processes it are to each other.
There could be situations where there are no free CPU cycles to start a task on the node where the data is local; Spark can then decide either to wait for a CPU to free up or to start the task on a less-local node and move the data over.
The wait time for a CPU can be configured with the spark.locality.wait* properties. Based on the application, we can decide whether waiting longer saves more time than shuffling the data across the network.
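For example (the values below are illustrative, not recommendations):
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.locality.wait", "6s")       // base wait before falling back to a less-local level
  .set("spark.locality.wait.node", "3s")  // override for the node-local level specifically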
Data Serialization
There are situations where the framework may need to ship data over the network or persist it. In such scenarios, the objects are serialized. Java serialization is used by default; however, serialization frameworks like Kryo have shown better results than the default. The serializer can be set using spark.serializer.
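A minimal sketch of switching to Kryo; MyCaseClass stands in for your own record types:
import org.apache.spark.SparkConf

// MyCaseClass is a placeholder for one of your application's classes
val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .registerKryoClasses(Array(classOf[MyCaseClass]))  // optional, avoids writing full class names into the stream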
Memory Management
Another important aspect that you already stepped onto is memory management. If the application does not need to persist/cache data, you could try reducing the storage memory fraction and increasing the execution memory. This works because execution memory can evict storage memory up to the configured storage threshold, while the reverse is not true. We can adjust these values by changing spark.memory.fraction and spark.memory.storageFraction.
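For instance, to shrink the protected storage region when little caching is needed (values are only examples to tune against your workload):
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.memory.fraction", "0.6")         // heap share available to execution + storage
  .set("spark.memory.storageFraction", "0.3")  // portion of that share protected for cached data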
Also, on a side note, all Java GC tuning methods can be applied to Spark applications as well. We can collect GC statistics using the Java options -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps.
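These options can be passed to the executors at submit time, for example (class and jar are placeholders):
spark-submit --master yarn --deploy-mode cluster \
  --conf "spark.executor.extraJavaOptions=-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps" \
  --class <your_class> <your_jar>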
Also note that serialization helps reduce the GC overhead of managing a large number of small objects.
Finally, and most importantly, we could look at the application code itself.
Created 07-27-2016 10:18 AM
Thanks @Arun A K, I'll verify the suggestions on my test case and let you know if I make progress.
Created 07-27-2016 10:39 AM
Hi @Arun A K, we have observed that most of the time is spent writing data downstream to Cassandra, since a single node is currently serving the Cassandra cluster. We are now planning to add multiple Cassandra nodes inside the Hadoop cluster for faster writes. I'll keep you updated on progress.
Created 07-27-2016 01:17 PM
Thanks for the update @Mukesh Kumar. Is it worth doing a 1-1 write or do you want to explore the BulkLoad option in Cassandra?
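In case the writes go through the DataStax spark-cassandra-connector (an assumption on my part, the client has not been mentioned in this thread), the per-row write path and its main output knobs look roughly like this; host, keyspace, table, column names and the RDD are placeholders:
import org.apache.spark.SparkConf
import com.datastax.spark.connector._

val conf = new SparkConf()
  .set("spark.cassandra.connection.host", "<cassandra_host>")
  .set("spark.cassandra.output.concurrent.writes", "5")    // async writes in flight per task
  .set("spark.cassandra.output.batch.size.bytes", "4096")  // size of the batches sent to Cassandra

// rowsRdd is a placeholder for an RDD of tuples or case classes matching the table schema
rowsRdd.saveToCassandra("<keyspace>", "<table>", SomeColumns("id", "value"))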
Created 07-26-2016 07:07 PM
Just wanted to know if you have dynamic allocation enabled and, if so, what the values are for initial and max executors. Also, could I ask how many nodes you have, and how many cores and how much RAM per node?
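For reference, these are the settings I am asking about; the values, class and jar below are placeholders only:
spark-submit --master yarn --deploy-mode cluster \
  --conf spark.shuffle.service.enabled=true \
  --conf spark.dynamicAllocation.enabled=true \
  --conf spark.dynamicAllocation.initialExecutors=4 \
  --conf spark.dynamicAllocation.minExecutors=2 \
  --conf spark.dynamicAllocation.maxExecutors=16 \
  --class <your_class> <your_jar>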