
Spark performance


Hello,

I have a simple job, executing with spark-shell

spark-shell --master yarn-client --num-executors 4 --executor-memory 4G

val rdd = sc.textFile("some file")

rdd.count()

File size: 20 GB, already on HDFS as 141 blocks of 128 MB.

Cluster: 4 nodes of 7 GB each.

The count job took ~20 minutes.

That doesn't feel like a normal time to me. Should it be quicker?

The number of partitions: (screenshot 5186-spark1.png)

It is distributed: (screenshot 5187-node.png)

And the time for each task: (screenshot 5188-2016-06-23-11-51-42-217115232227-10101-connexion-b.png)

The file is a CSV file.

I don't understand why the job takes that long. Is that expected, or is there an issue somewhere?

Regards

7 REPLIES

Re: Spark performance

Did you try increasing the number of executors, like below? Does it make any difference?

--num-executors 8 --executor-memory 2G


Re: Spark performance

@Arthur GREVIN You can also set the parallelism on the RDD itself.

val rdd = sc.textFile("some file", 8) // here 8 is the parallelism (minimum number of partitions)

Re: Spark performance

@Jitendra Yadav

I increased the number of partitions and it changed nothing. Each task takes around 25 seconds.

Changing the number of executors didn't change anything either.


Re: Spark performance

What is spark.default.parallelism in the job run? Did you observe any data skew in the job? Look at all the task input sizes to detect skew; from the UI you can also check for scheduler delay at the task level. If everything looks normal, try increasing --num-executors, executor memory, and spark.default.parallelism.
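As a quick check from inside spark-shell, something like this minimal sketch (standard SparkContext and RDD calls only) shows the effective default parallelism and how many partitions the file was actually split into; spark.default.parallelism itself can be raised at launch with --conf spark.default.parallelism=&lt;n&gt;:

sc.defaultParallelism // effective default parallelism for this session

rdd.partitions.size // how many partitions the text file was split into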


Re: Spark performance

CSV is a text-based file format, and reading it can be slower than optimized formats such as ORC or Parquet. You could try the spark-csv library from Databricks:

./bin/spark-shell --packages com.databricks:spark-csv_2.11:1.0.3

You can use the csv library to load a CSV file like this:

val df = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").load("../Downloads/2008.csv")

Note: This would load the data into a DataFrame, not an RDD, but you can access the lower-level RDD API from there.
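For example, a minimal sketch of what that looks like once the DataFrame from the line above is loaded (the variable name rows is just illustrative):

df.count() // count rows through the DataFrame API

val rows = df.rdd // RDD[Row], if the lower-level RDD API is needed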


Re: Spark performance


I am assuming you have 141 partitions by default (your number of blocks), but you have only 4 or even 8 executors. See if you can increase this to 16 executors with 1 GB each. I would also try to use coalesce to reduce the number of partitions so it is not too high compared to the number of executors. Also assign more cores using --executor-cores. I hate to give up, but in the end, doing a count on a 20 GB file with your hardware might just take 20 minutes.
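A minimal sketch of the coalesce part of that idea (the target number is purely illustrative; --executor-cores, like --num-executors and --executor-memory, is passed on the spark-shell command line):

val rdd = sc.textFile("some file")

rdd.coalesce(32).count() // 32 is an illustrative target, a small multiple of the executor count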


Re: Spark performance

This is going to be one of two issues: disk I/O or executors. With count() you will not be doing any swapping. Set the number of partitions to (cores * nodes) - 2. Let's assume 8 cores per node; with 4 nodes that's 30 for you, or:

val rdd = sc.textFile("some file",30)

That being said, I don't see how a shuffle is going to help a simple count. It is going to execute on each partition without a shuffle and return to the driver. You can perform a test by changing count() to saveAsTextFile(), but I'm thinking you are bound by disk I/O. Are you in a cloud environment?

Try doing a reduceByKey() and then a count(). If your processing time is still about the same, that further points to disk I/O.
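A rough sketch of both tests (the output path and the choice of the first CSV field as the key are assumptions for illustration, not from the original post):

rdd.saveAsTextFile("/tmp/spark-count-test") // write out instead of counting, to compare wall-clock time (illustrative path)

val keyed = rdd.map(line => (line.split(",")(0), 1)) // reduceByKey needs a pair RDD, so key each line on its first field

keyed.reduceByKey(_ + _).count()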
