Debug Spark job

We use Cloudera 5.15.0. When we submit a Spark job to the cluster, it fails without any obvious clue. The only messages that relate to the problem are:


ERROR cluster.YarnScheduler: Lost executor 1 on Executor heartbeat timed out after 171854 ms
WARN scheduler.TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, ExecutorLostFailure (executor 1 exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 171854 ms

This does not happen when the job runs on my local laptop: there it works fine and eventually finishes execution. The command used to submit the job (both locally and on the Cloudera cluster):

spark2-submit                              \
      --driver-class-path   ...                \
      --driver-java-options ...    \
      --class ...                                    \
      --conf spark.authenticate=true     \
      --conf ...     \
      --conf spark.executor.extraClassPath=-Dconfig.file= ... \
      --master yarn  \
      --driver-memory 25g \
      --executor-memory 10g \
      --files ... \
      <assembled fat jar file>
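Based on the heartbeat-timeout message, these are the knobs I have been thinking about raising on top of the command above. The values below are guesses I have not yet verified, not settings I have actually tested:

```shell
# Hypothetical extra flags for the same spark2-submit call (values are guesses):

# Extra off-heap headroom per executor container, in MB
# (JDBC fetch buffers and netty buffers live outside the 10g heap):
--conf spark.yarn.executor.memoryOverhead=2048 \

# Raise the network timeout so a busy executor is not declared lost,
# and send heartbeats more defensively than the 10s default:
--conf spark.network.timeout=600s \
--conf spark.executor.heartbeatInterval=60s \
```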


Logs obtained with `yarn logs -applicationId <app id>` show only a warning:


WARN crypto.AuthClientBootstrap: New auth protocol failed, trying SASL.
java.lang.RuntimeException: java.lang.IllegalStateException: Expected SaslMessage, received something else (maybe your client does not have SASL enabled?)

This warning also appears when the same code retrieves fewer SQL fields from the PostgreSQL database, yet that job runs without a problem: it successfully writes Parquet data to HDFS and finishes execution. So I suspect the problem stems from insufficient memory, but I do not know how to debug a Spark job submitted to a YARN cluster.
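One thing I have considered but not yet tried is partitioning the JDBC read, so that no single executor has to pull and buffer the whole result set through one connection. A sketch of what I mean is below; the choice of `oid` as partition column and the bound values are placeholders, not something I have validated against the real table:

```scala
// Sketch only: split the JDBC read into numPartitions range scans on a
// numeric column, so each executor task fetches one slice of the table.
// "oid", the bounds, and the partition count are placeholder guesses.
val partitioned ="jdbc").
    option("driver", "org.postgresql.Driver").
    option("url", s"jdbc:postgresql://db-host:port/database-name").
    option("dbtable", "(select oid, * from table where fieldX <> '') temp_table").
    option("partitionColumn", "oid").   // must be a numeric column
    option("lowerBound", "1").          // min oid value (guess)
    option("upperBound", "10000000").   // max oid value (guess)
    option("numPartitions", "20").      // 20 parallel range scans
    option("fetchSize", "10000").       // smaller per-round-trip buffer
    option("user", username).
    option("password", password).
    load()
```

With the single-partition read I have now, one executor holds everything fetched so far, which may be why the job only dies once all 55 fields are selected.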


The Spark job code


 val df ="jdbc").
    option("driver", "org.postgresql.Driver").
    option("url", s"jdbc:postgresql://db-host:port/database-name").
    option("dbtable", "(select oid, * from table where fieldX <> '') temp_table").
    option("fetchSize", "100000").
    option("user", username).
    option("password", password).
    load()

 df.createOrReplaceTempView("temp_table")
 // Selecting all 55 fields fails when running in the cluster;
 // selecting only 2 fields works fine.
 val filtered = spark.sql(s"""
    SELECT field1, field2, ... field55
      FROM temp_table
  """).as[DBTableCaseClass].filter{...}.map { row =>
    // logic to extract info
  }

Any suggestions? Thanks.


