
Spark (java) - too many open files


We are trying to run a batch job in Spark 2 that takes a huge list as input and iterates over it to perform the processing. The program runs fine for around 8,000 records of the list and then fails with the exception given below. The input list has 3 lakh (300,000) records. We tried setting "spark.shuffle.consolidateFiles" to "true" in SparkConf, but that didn't help.

Please help me fix the issue.

Exception:

WARN Lost task 0.0 in stage 421079.0 (TID 996338, acusnldlenhww4.cloudapp.net, executor 1): java.io.FileNotFoundException: /data/1/hadoop/yarn/local/usercache/A2159537-MSP01/appcache/application_1497532405817_0072/blockmgr-73dc563c-8ea5-4f2d-adfe-6c60cf3e3968/0d/shuffle_145960_0_0.index.cfb6d5ea-8c7b-41a1-acc3-2c840e7f8998 (Too many open files)
    at java.io.FileOutputStream.open0(Native Method)
    at java.io.FileOutputStream.open(FileOutputStream.java:270)
    at java.io.FileOutputStream.<init>(FileOutputStream.java:213)
    at java.io.FileOutputStream.<init>(FileOutputStream.java:162)
    at org.apache.spark.shuffle.IndexShuffleBlockResolver.writeIndexFileAndCommit(IndexShuffleBlockResolver.scala:144)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:128)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
(org.apache.spark.scheduler.TaskSetManager)

1 REPLY


The best way is definitely just to increase the ulimit (the per-process limit on open files) if possible. Spark more or less assumes that clusters will be able to adjust it.
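Before and after changing the ulimit, it can help to confirm which limit a JVM actually sees. The following is a minimal sketch, not part of Spark: the class name FdLimitCheck is hypothetical, and it assumes a HotSpot/OpenJDK JVM on a Unix-like system, where the platform MXBean exposes file descriptor counts. It reports only on the JVM it runs in, so to inspect an executor the same check would have to run inside a task on that executor.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.OperatingSystemMXBean;
import com.sun.management.UnixOperatingSystemMXBean;

// Hypothetical helper (not a Spark API): reports the file descriptor usage
// and limit of the JVM in which it runs.
public class FdLimitCheck {

    /**
     * Returns {open, max} file descriptor counts for this JVM,
     * or {-1, -1} when the platform bean does not expose them
     * (for example on non-Unix systems).
     */
    public static long[] fdUsage() {
        OperatingSystemMXBean os = ManagementFactory.getOperatingSystemMXBean();
        if (os instanceof UnixOperatingSystemMXBean) {
            UnixOperatingSystemMXBean unix = (UnixOperatingSystemMXBean) os;
            return new long[] {
                unix.getOpenFileDescriptorCount(),
                unix.getMaxFileDescriptorCount()
            };
        }
        return new long[] { -1L, -1L };
    }

    public static void main(String[] args) {
        long[] usage = fdUsage();
        System.out.println("open file descriptors: " + usage[0]
                + ", limit: " + usage[1]);
    }
}
```

If the reported limit does not change after raising the ulimit, the new value is probably not reaching the process that launches the executors (on YARN, the NodeManager), which is an assumption worth checking with your cluster administrator.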

You might be able to hack around this by decreasing the number of reducers (or the number of cores used on each node), but this could have some performance implications for your job.

In general, if a node in your cluster has C assigned cores and you run a job with X reducers, then Spark will open C*X files in parallel and start writing. Shuffle consolidation helps decrease the total number of files created, but the number of file handles open at any one time doesn't change, so it won't help with the ulimit problem.
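To make the C*X rule of thumb concrete, here is a small sketch in plain Java that does the arithmetic. The class and method names are hypothetical, and the numbers in main (8 cores, 200 reducers, a limit of 1024) are illustrative assumptions, not values taken from the cluster in the question. The estimate covers shuffle files only; the same limit also has to cover sockets, jars, and log files, so leave headroom.

```java
// Hypothetical helper (not a Spark API): applies the C*X rule of thumb
// for shuffle files open at the same time on one node.
public class ShuffleHandleEstimate {

    /** Files open in parallel on a node: C assigned cores times X reducers. */
    public static long estimateOpenFiles(int coresPerNode, int reducers) {
        return (long) coresPerNode * reducers;
    }

    /** Largest reducer count for which C*X stays at or below the limit. */
    public static long maxReducers(int coresPerNode, long fdLimit) {
        return fdLimit / coresPerNode;
    }

    public static void main(String[] args) {
        int cores = 8;       // assumed cores assigned per node
        int reducers = 200;  // assumed number of reducers
        long limit = 1024;   // assumed open-file limit

        long needed = estimateOpenFiles(cores, reducers);
        System.out.println("estimated open shuffle files: " + needed);
        System.out.println("exceeds limit: " + (needed > limit));
        System.out.println("max reducers under limit: " + maxReducers(cores, limit));
    }
}
```

With these assumed numbers the estimate is 1600 files against a limit of 1024, so either the limit has to go up or the reducer count has to drop to 128 or fewer on an 8-core node.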