
DynamicAllocation doesn't release executors that used cache


Hi,

 

We're starting to use Spark for use cases that rely on Dynamic Allocation.

However, we noticed it doesn't work as expected when a dataset is cached and uncached (persist/unpersist).

The cluster runs with:

CDH 5.15.0

Spark 2.3.0

Oracle Java 8.131

 

The following configs are passed to Spark (and are also set at the cluster level):

# Dynamic Allocation
spark.shuffle.service.enabled                                      true
spark.dynamicAllocation.enabled                                    true

spark.dynamicAllocation.schedulerBacklogTimeout                    1
spark.dynamicAllocation.sustainedSchedulerBacklogTimeout           1
spark.dynamicAllocation.executorIdleTimeout                        90
 
spark.dynamicAllocation.initialExecutors                           1
spark.dynamicAllocation.minExecutors                               1
spark.dynamicAllocation.maxExecutors                               30

 

The cluster also has these configs enabled; the spark_shuffle auxiliary service is set up and the YARN application classpath is populated. The executors' storage is freed when the application finishes (based on: https://spark.apache.org/docs/latest/running-on-yarn.html#configuring-the-external-shuffle-service).
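For context, the job is submitted roughly along these lines (the main class and jar name are placeholders; the property values are the ones listed above):

spark-submit \
    --master yarn \
    --deploy-mode cluster \
    --class com.example.DynamicAllocationTest \
    --conf spark.shuffle.service.enabled=true \
    --conf spark.dynamicAllocation.enabled=true \
    --conf spark.dynamicAllocation.schedulerBacklogTimeout=1 \
    --conf spark.dynamicAllocation.sustainedSchedulerBacklogTimeout=1 \
    --conf spark.dynamicAllocation.executorIdleTimeout=90 \
    --conf spark.dynamicAllocation.initialExecutors=1 \
    --conf spark.dynamicAllocation.minExecutors=1 \
    --conf spark.dynamicAllocation.maxExecutors=30 \
    dynamic-allocation-test.jar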

 

Here is the simplified code that reproduces the issue in our cluster (HA YARN).

When the following code is executed with "cache=false", the executors are created, used, and killed by the idle timeout.

When "cache=true", the executors are created and used, but never killed; they remain hanging.

In both cases the storage was cleaned up.

void run() {
    // Build a small in-memory dataset of O1 beans.
    List<O1> objList = new ArrayList<>();
    for (long i = 0; i < 1000; i++) {
        objList.add(new O1(i, "test"));
    }

    Dataset<O1> ds = sparkSession.createDataset(objList, Encoders.bean(O1.class));
    ds = ds.repartition(4);   // spread the work over several executors

    if (cache) {
        // Cache before the action and unpersist right after it; the executors
        // that held the cached blocks are the ones that never get released.
        ds.persist(StorageLevel.MEMORY_AND_DISK());
        try {
            ds.show(100, false);
        } finally {
            ds.unpersist();
        }
    } else {
        ds.show(100, false);
    }
}

//O1 POJO class:
public class O1 {                               
    private Long transactionDate;
    private String name;

    public O1() {
    }
    public O1(Long transactionDate, String name) {
        this.transactionDate = transactionDate;
        this.name = name;
    }

    public Long getTransactionDate() {
        return transactionDate;
    }
    public void setTransactionDate(Long transactionDate) {
        this.transactionDate = transactionDate;
    }
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
}
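For completeness, the driver setup around run() looks roughly like this (the class name, app name, and the way the cache flag is toggled are placeholders; the allocation properties come from the submit command, not from code):

import org.apache.spark.sql.SparkSession;

public class DynamicAllocationTest {              // placeholder class name
    private final SparkSession sparkSession;
    private final boolean cache;                  // toggled between the two runs

    DynamicAllocationTest(SparkSession sparkSession, boolean cache) {
        this.sparkSession = sparkSession;
        this.cache = cache;
    }

    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("dynamic-allocation-cache-test")
                .getOrCreate();

        // e.g. pass "true" or "false" as the first program argument
        boolean cache = args.length > 0 && Boolean.parseBoolean(args[0]);
        new DynamicAllocationTest(spark, cache).run();

        spark.stop();
    }

    // run() and the O1 bean are as shown above
}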

Moreover, when spark.dynamicAllocation.cachedExecutorIdleTimeout is set to a specific value, the containers are killed successfully, even if they held cached data (this check was inspired by: https://spark.apache.org/docs/latest/job-scheduling.html#graceful-decommission-of-executors).
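For reference, that check only required adding one more property; the 300-second value below is just an example:

spark.dynamicAllocation.cachedExecutorIdleTimeout                  300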

Unfortunately, in the future we will have containers that keep their cache and might live for a long time, as well as containers that free their cache (unpersist) and are expected to be killed (along with idling executors), so a single cachedExecutorIdleTimeout is not a good fit.

 

Is this a bug, or is some configuration missing?
