I have a simple job that I'm executing with spark-shell:
spark-shell --master yarn-client --num-executors 4 --executor-memory 4G
val rdd = sc.textFile("some file")
rdd.count()
File size: 20G, already on HDFS (141 blocks of 128M)
Cluster: 4 nodes with 7G each
The count job took ~20min.
That doesn't seem like a normal time to me. Should it be quicker?
Screenshots from the Spark UI show the number of partitions, how the tasks are distributed across the nodes, and the time taken by each task.
The file is CSV file.
I don't understand why the job takes that long. Is that expected, or is there an issue somewhere?
What is spark.default.parallelism in the job run? Did you observe any data skew? Look at the input size of each task to detect skew; from the UI you can also check for scheduler delay at the task level. If everything looks normal, try increasing --num-executors, executor memory, and spark.default.parallelism.
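As a sketch, those knobs could be turned like this (the executor count and parallelism values here are illustrative guesses, not tuned for this particular cluster):

```shell
# Illustrative values only -- tune to your own cluster
spark-shell --master yarn-client \
  --num-executors 8 \
  --executor-memory 2G \
  --conf spark.default.parallelism=64
```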
CSV is a text-based format, and reading it can be slower than optimized columnar formats such as ORC or Parquet. You could try the spark-csv library from Databricks:
./bin/spark-shell --packages com.databricks:spark-csv_2.11:1.0.3
You can use the csv library to load a CSV file like this:
val df = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .load("../Downloads/2008.csv")
Note: This loads the data into a DataFrame rather than an RDD, but you can access the lower-level RDD API from there.
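A minimal sketch of dropping down to the RDD API, continuing from the df loaded above (this assumes a Spark shell session, so it isn't runnable standalone):

```scala
// df.rdd exposes the underlying RDD[Row] of the DataFrame
val rowRdd = df.rdd
rowRdd.count()
```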
I am assuming you have 141 partitions by default (one per HDFS block), but only 4 or maybe 8 executors. See if you can increase this to 16 executors with 1G each. I would also try coalesce to reduce the number of partitions so it isn't too high relative to the number of executors, and assign more cores using --executor-cores. I hate to say it, but in the end, counting a 20G file on your hardware might just take 20 minutes.
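A sketch of the coalesce suggestion, assuming a Spark shell session with sc available (the target of 32 partitions is a guess, not a measured optimum):

```scala
// Coalesce the 141 input partitions closer to the executor count;
// coalesce avoids a full shuffle, unlike repartition
val rdd = sc.textFile("some file").coalesce(32)
rdd.count()
```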
This is going to be one of two issues: disk I/O or executors. With count() you will not be doing any swapping. Set the number of partitions to (cores * nodes) - 2; assuming 8 cores per node and 4 nodes, that's 30 for you:
val rdd = sc.textFile("some file",30)
That being said, I don't see how a shuffle is going to help a simple count: it executes on each partition without a shuffle and returns the result to the driver. You can run a test by changing count() to saveAsTextFile(), but I suspect you are bound by disk I/O. Are you in a cloud environment?
Try a reduceByKey() followed by count(). If the processing time is still about the same, that further points to disk I/O.
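That diagnostic could be sketched like this, assuming a Spark shell session with sc available (keying on the first CSV column is an arbitrary choice; any cheap key works, since the point is only to introduce a shuffle stage):

```scala
// Key each line by its first CSV field (arbitrary), then reduce:
// the reduceByKey forces a shuffle before the final count
val keyCount = sc.textFile("some file")
  .map(line => (line.split(",")(0), 1))
  .reduceByKey(_ + _)
  .count()
```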