
Spark executor blocks on last task


Hello and good morning,

we have a problem when submitting Spark jobs.

The last two tasks are not processed and the system is blocked. The only thing that helps is to kill the application.

Executor ID: driver (192.168.1.134:37341), Status: Active
RDD Blocks: 2, Storage Memory: 48.2 KB / 4.4 GB, Disk Used: 0.0 B, Cores: 0
Active Tasks: 0, Failed Tasks: 0, Complete Tasks: 0, Total Tasks: 0, Task Time (GC Time): 0 ms (0 ms)
Input: 0.0 B, Shuffle Read: 0.0 B, Shuffle Write: 0.0 B

Executor ID: 1 (worker_12:43376), Status: Active
RDD Blocks: 2, Storage Memory: 48.2 KB / 9 GB, Disk Used: 0.0 B, Cores: 2
Active Tasks: 2, Failed Tasks: 0, Complete Tasks: 20, Total Tasks: 22, Task Time (GC Time): 3 s (58 ms)
Input: 920 KB, Shuffle Read: 0.0 B, Shuffle Write: 152 B

In the thread dump I found the following inconsistency: the thread with ID 63 seems to be waiting for the one with ID 71. Can you see why the thread can't finish its work? (A small illustrative sketch of the pattern follows after the dump.)

Thread ID: 71
Thread Name: Executor task launch worker for task 502
Thread State: RUNNABLE
Thread Locks: Lock(java.util.concurrent.ThreadPoolExecutor$Worker@1010216063}), Monitor(java.lang.UNIXProcess$ProcessPipeInputStream@752994465}), Monitor(org.apache.spark.api.python.PythonWorkerFactory@143199750}), Monitor(org.apache.spark.SparkEnv@687340918})
java.io.FileInputStream.readBytes(Native Method)
java.io.FileInputStream.read(FileInputStream.java:255)
java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
java.io.BufferedInputStream.read(BufferedInputStream.java:265) => holding Monitor(java.lang.UNIXProcess$ProcessPipeInputStream@752994465})
java.io.DataInputStream.readInt(DataInputStream.java:387)
org.apache.spark.api.python.PythonWorkerFactory.startDaemon(PythonWorkerFactory.scala:181) => holding Monitor(org.apache.spark.api.python.PythonWorkerFactory@143199750})
org.apache.spark.api.python.PythonWorkerFactory.createThroughDaemon(PythonWorkerFactory.scala:103) => holding Monitor(org.apache.spark.api.python.PythonWorkerFactory@143199750})
org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:79)
org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:118) => holding Monitor(org.apache.spark.SparkEnv@687340918})
org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:128)
org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
org.apache.spark.scheduler.Task.run(Task.scala:108)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
java.lang.Thread.run(Thread.java:748)
Thread ID: 63
Thread Name: Executor task launch worker for task 524
Thread State: BLOCKED (Blocked by Thread Some(71), Lock(org.apache.spark.SparkEnv@687340918}))
Thread Locks: Lock(java.util.concurrent.ThreadPoolExecutor$Worker@2128788466})
org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:116)
org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:128)
org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
org.apache.spark.scheduler.Task.run(Task.scala:108)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
java.lang.Thread.run(Thread.java:748)
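
For illustration, here is a minimal, self-contained sketch (plain Scala, not Spark's actual code) of the pattern I believe the dump shows: one thread holds an object's monitor while it blocks on I/O, so a second thread that tries to enter the same synchronized section is reported as BLOCKED. The object name env and the sleep that stands in for the pipe read are just placeholders.

    // Toy illustration only -- not Spark's actual code.
    // "Thread 71" enters a synchronized block and then blocks (here: sleeps,
    // standing in for the read on the Python daemon's stdout). "Thread 63"
    // tries to enter the same block and shows up as BLOCKED in a thread dump.
    object MonitorContentionDemo {
      private val env = new Object // stand-in for the object whose monitor both tasks need

      def createWorker(taskId: Int): Unit = env.synchronized {
        println(s"task $taskId entered the monitor")
        Thread.sleep(60000) // placeholder for the blocking read seen in the dump
      }

      def main(args: Array[String]): Unit = {
        val t71 = new Thread(() => createWorker(502))
        val t63 = new Thread(() => createWorker(524)) // appears BLOCKED while t71 holds the monitor
        t71.start(); Thread.sleep(500); t63.start()
        t71.join(); t63.join()
      }
    }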

The file spark-003.txt contains the last ~200 lines of the job log.

If any further logs / dumps etc. are needed, I will try to provide and post them.

Many thanks in advance for your support.

Best regards,
René


Attachments: spark-001.png, spark-002.png

1 REPLY

Expert Contributor

This could be a data skew issue. Check whether any single partition holds a huge chunk of the data compared to the rest.

https://github.com/adnanalvee/spark-assist/blob/master/spark-assist.scala

From the link above, copy the function "partitionStats" and pass in your data as a DataFrame.

It will show the maximum, minimum, and average amount of data across your partitions, as shown below.

    +------+-----+------------------+
    |MAX   |MIN  |AVERAGE           |
    +------+-----+------------------+
    |135695|87694|100338.61149653122|
    +------+-----+------------------+
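
If you'd rather not pull in the whole helper file, here is a rough inline sketch of the same idea. It assumes an existing DataFrame named df and simply counts records per partition via spark_partition_id; the actual partitionStats in the linked repo may differ in detail.

    // Rough sketch of a per-partition record-count summary (paste into spark-shell).
    // Assumes an existing DataFrame `df`; not necessarily identical to the linked partitionStats.
    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.functions.{avg, count, max, min, spark_partition_id}

    def partitionStats(df: DataFrame): DataFrame = {
      df.groupBy(spark_partition_id().alias("partition_id")) // one row per partition
        .agg(count("*").alias("record_count"))               // records per partition
        .agg(                                                 // collapse to one summary row
          max("record_count").alias("MAX"),
          min("record_count").alias("MIN"),
          avg("record_count").alias("AVERAGE"))
    }

    // Usage: partitionStats(df).show(false)

A large gap between MAX and AVERAGE usually points to one or a few oversized partitions, which would explain why the last tasks run much longer than the rest.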