Created on 09-10-2018 06:57 AM - last edited on 12-16-2019 06:23 AM by cjervis
We use Cloudera 5.15.0. When we submit a Spark job, it fails without any obvious clue. The only messages that relate to the problem are:
ERROR cluster.YarnScheduler: Lost executor 1 on myhost.com: Executor heartbeat timed out after 171854 ms
WARN scheduler.TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, myhost.com): ExecutorLostFailure (executor 1 exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 171854 ms
This does not happen when the job runs on my local laptop, where it works fine and eventually finishes. The command used to submit the job (both locally and on the Cloudera cluster) is:
spark2-submit \
  --driver-class-path ... \
  --driver-java-options -Djava.security.auth.login.config ... \
  --class ... \
  --conf spark.authenticate=true \
  --conf spark.executor.extraJavaOptions=-Djava.security.auth.login.config= ... \
  --conf spark.executor.extraClassPath=-Dconfig.file= ... \
  --master yarn \
  --driver-memory 25g \
  --executor-memory 10g \
  --files ... \
  <assembled fat jar file>
The logs obtained with `yarn logs -applicationId <app id>` show only this warning:
WARN crypto.AuthClientBootstrap: New auth protocol failed, trying SASL.
java.lang.RuntimeException: java.lang.IllegalStateException: Expected SaslMessage, received something else (maybe your client does not have SASL enabled?)
    at org.apache.spark.network.sasl.SaslMessage.decode(SaslMessage.java:69)
    at org.apache.spark.network.sasl.SaslRpcHandler.receive(SaslRpcHandler.java:87)
    at org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:154)
The same warning also appears when the same code retrieves fewer columns from the PostgreSQL database, yet that variant works: it writes the Parquet data to HDFS and finishes. That suggests the warning itself is not the cause. I suspect the real problem is insufficient memory, but I do not know how to debug a Spark job submitted to a YARN cluster.
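To make the memory suspicion concrete, here is a rough back-of-envelope sketch of how much raw data one JDBC fetch batch might hold for the 2-field and the 55-field query. The average bytes-per-field figure is purely an assumption for illustration (the real column widths are not known), the object and method names are hypothetical, and JVM object overhead is ignored, so this is a lower bound and not a measurement.

```scala
// Rough estimate of the raw data held by one JDBC fetch batch.
// ASSUMPTION: avgBytesPerField is a made-up illustrative figure, not measured.
object FetchBatchEstimate {

  /** Approximate bytes buffered for one batch of `fetchSize` rows. */
  def batchBytes(fetchSize: Long, fields: Int, avgBytesPerField: Long): Long =
    fetchSize * fields * avgBytesPerField

  /** Convert a byte count to mebibytes for printing. */
  def toMiB(bytes: Long): Double = bytes.toDouble / (1024 * 1024)

  def main(args: Array[String]): Unit = {
    val fetchSize = 100000L      // the fetchSize option used in the job
    val avgBytesPerField = 100L  // hypothetical average width of one value

    val narrow = batchBytes(fetchSize, 2, avgBytesPerField)   // 2-field query
    val wide = batchBytes(fetchSize, 55, avgBytesPerField)    // 55-field query

    println(f"2 fields:  ${toMiB(narrow)}%.1f MiB per batch")
    println(f"55 fields: ${toMiB(wide)}%.1f MiB per batch")
  }
}
```

Since the JDBC read in the job sets no partitioning options, Spark reads the whole result in a single task, so every batch lands on one executor.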
The Spark job code:
val df = spark.read.format("jdbc").
  option("driver", "org.postgresql.Driver").
  option("url", s"jdbc:postgresql://db-host:port/database-name").
  option("dbtable", "(select oid, * from table where fieldX <> '') temp_table").
  option("fetchSize", "100000").
  option("user", username).
  option("password", password).
  load()

df.createOrReplaceTempView(s"temp_table")

// When selecting 55 fields, the Spark job fails on the cluster,
// but it works fine when selecting only 2 fields.
val filtered = spark.sql(s"""
  SELECT field1, field2, ... field55
  FROM temp_table
""").as[DBTableCaseClass].filter{...}.map { row =>
  // logic to extract info
}.filter{...}

filtered.write.mode("overwrite").parquet(destPath)
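One thing that might be worth trying is splitting the JDBC read across several tasks instead of one. The sketch below only builds the option map. `partitionColumn`, `lowerBound`, `upperBound`, `numPartitions`, and `fetchsize` are standard Spark JDBC options, but using `oid` as the partition column, the bounds, the partition count, and the smaller fetch size are all assumptions that would need to be replaced with real values. The names `PartitionedRead` and `partitionedJdbcOptions` are hypothetical.

```scala
// Sketch: build the options for a partitioned JDBC read.
// ASSUMPTION: the partition column is numeric and its rough range is known.
object PartitionedRead {

  def partitionedJdbcOptions(
      url: String,
      dbtable: String,
      partitionColumn: String,
      lowerBound: Long,
      upperBound: Long,
      numPartitions: Int,
      fetchSize: Int): Map[String, String] = {
    require(lowerBound < upperBound, "lowerBound must be below upperBound")
    require(numPartitions > 0, "numPartitions must be positive")
    Map(
      "driver" -> "org.postgresql.Driver",
      "url" -> url,
      "dbtable" -> dbtable,
      "partitionColumn" -> partitionColumn,
      "lowerBound" -> lowerBound.toString,
      "upperBound" -> upperBound.toString,
      "numPartitions" -> numPartitions.toString,
      "fetchsize" -> fetchSize.toString
    )
  }

  def main(args: Array[String]): Unit = {
    val opts = partitionedJdbcOptions(
      url = "jdbc:postgresql://db-host:port/database-name",
      dbtable = "(select oid, * from table where fieldX <> '') temp_table",
      partitionColumn = "oid",  // assumption: oid is usable for partitioning
      lowerBound = 0L,          // hypothetical
      upperBound = 10000000L,   // hypothetical
      numPartitions = 20,       // hypothetical
      fetchSize = 10000         // hypothetical, smaller than the original 100000
    )
    opts.foreach { case (k, v) => println(s"$k=$v") }
    // With a SparkSession in scope, the map could be passed along as:
    //   spark.read.format("jdbc").options(opts)
    //     .option("user", username).option("password", password).load()
  }
}
```

The bounds only decide how the column range is cut into partitions. They do not filter rows, so values outside the range still end up in the first or last partition.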
Any suggestions? Thanks