
Pyspark code failing on cluster mode


Hi,

I am reading two files from S3 and taking their union, but the code fails when I run it on YARN. On a single node it runs successfully, but when I specify --master yarn in spark-submit to run on the cluster, it fails.

Below is the PySpark code:

from functools import reduce  # reduce moved to functools in Python 3

from pyspark import SparkConf, SparkContext, SQLContext
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql import DataFrame

# set Spark properties
conf = (SparkConf()
        .setAppName("s3a_test")
        .set("spark.shuffle.compress", "true")
        .set("spark.io.compression.codec", "snappy")
        .set("fs.s3a.server-side-encryption-algorithm", "AES256"))

sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)


# union an arbitrary number of dataframes, pairwise and left to right
def unionMultiDF(*dfs):
    return reduce(DataFrame.union, dfs)


pfely = "s3a://ics/parquet/salestodist/"
pfely1 = "s3a://ics/parquet/salestodist/"

FCSTEly = sqlContext.read.parquet(pfely)
FCSTEly1 = sqlContext.read.parquet(pfely1)

FCSTEly.registerTempTable('PFEly')
FCSTEly1.registerTempTable('PFEly1')

# Mapping to Forecast
tbl_FCSTEly = sqlContext.sql("select * from PFEly limit 10")
tbl_FCSTEly1 = sqlContext.sql("select * from PFEly1 limit 10")

tbl_FCST = unionMultiDF(tbl_FCSTEly, tbl_FCSTEly1)
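For what it's worth, unionMultiDF is just a left-to-right pairwise fold. A pure-Python sketch of the same pattern, with plain lists standing in for DataFrames and list concatenation standing in for DataFrame.union (note that DataFrame.union itself matches columns by position, so all frames must share the same schema and column order):

```python
from functools import reduce

# reduce applies the operation pairwise, left to right:
# reduce(f, [a, b, c]) evaluates as f(f(a, b), c).
def union_multi(*seqs):
    return reduce(lambda acc, s: acc + s, seqs)

print(union_multi([1, 2], [3], [4, 5]))  # [1, 2, 3, 4, 5]
```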


YARN log of the failed container:


===============================================================================
18/07/19 11:20:15 INFO RMProxy: Connecting to ResourceManager at ip-10-246-5-252.example.com/10.246.5.252:8030
18/07/19 11:20:15 INFO YarnRMClient: Registering the ApplicationMaster
18/07/19 11:20:15 INFO YarnAllocator: Will request 2 executor container(s), each with 1 core(s) and 2432 MB memory (including 384 MB of overhead)
18/07/19 11:20:15 INFO YarnSchedulerBackend$YarnSchedulerEndpoint: ApplicationMaster registered as NettyRpcEndpointRef(spark://YarnAM@10.246.5.186:32769)
18/07/19 11:20:15 INFO YarnAllocator: Submitted 2 unlocalized container requests.
18/07/19 11:20:15 INFO ApplicationMaster: Started progress reporter thread with (heartbeat : 3000, initial allocation : 200) intervals
18/07/19 11:20:15 INFO AMRMClientImpl: Received new token for : ip-10-246-5-216.example.com:45454
18/07/19 11:20:15 INFO YarnAllocator: Launching container container_e15_1531738738949_0081_02_000002 on host ip-10-246-5-216.example.com for executor with ID 1
18/07/19 11:20:15 INFO YarnAllocator: Received 1 containers from YARN, launching executors on 1 of them.
18/07/19 11:20:15 INFO ContainerManagementProtocolProxy: yarn.client.max-cached-nodemanagers-proxies : 0
18/07/19 11:20:15 INFO ContainerManagementProtocolProxy: Opening proxy : ip-10-246-5-216.example.com:45454
18/07/19 11:20:18 INFO YarnSchedulerBackend$YarnDriverEndpoint: Registered executor NettyRpcEndpointRef(spark-client://Executor) (10.246.5.216:52932) with ID 1
18/07/19 11:20:18 INFO BlockManagerMasterEndpoint: Registering block manager ip-10-246-5-216.example.com:34919 with 912.3 MB RAM, BlockManagerId(1, ip-10-246-5-216.example.com, 34919, None)
18/07/19 11:20:44 INFO YarnClusterSchedulerBackend: SchedulerBackend is ready for scheduling beginning after waiting maxRegisteredResourcesWaitingTime: 30000(ms)
18/07/19 11:20:44 INFO YarnClusterScheduler: YarnClusterScheduler.postStartHook done
18/07/19 11:20:44 INFO SharedState: Setting hive.metastore.warehouse.dir ('null') to the value of spark.sql.warehouse.dir ('file:/hadoopfs/fs1/yarn/nodemanager/usercache/spark/appcache/application_1531738738949_0081/container_e15_1531738738949_0081_02_000001/spark-warehouse').
18/07/19 11:20:44 INFO SharedState: Warehouse path is 'file:/hadoopfs/fs1/yarn/nodemanager/usercache/spark/appcache/application_1531738738949_0081/container_e15_1531738738949_0081_02_000001/spark-warehouse'.
18/07/19 11:20:45 INFO StateStoreCoordinatorRef: Registered StateStoreCoordinator endpoint
18/07/19 11:20:46 INFO deprecation: fs.s3a.server-side-encryption-key is deprecated. Instead, use fs.s3a.server-side-encryption.key
18/07/19 11:20:47 INFO SparkContext: Starting job: parquet at NativeMethodAccessorImpl.java:0
18/07/19 11:20:47 INFO DAGScheduler: Got job 0 (parquet at NativeMethodAccessorImpl.java:0) with 1 output partitions
18/07/19 11:20:47 INFO DAGScheduler: Final stage: ResultStage 0 (parquet at NativeMethodAccessorImpl.java:0)
18/07/19 11:20:47 INFO DAGScheduler: Parents of final stage: List()
18/07/19 11:20:47 INFO DAGScheduler: Missing parents: List()
18/07/19 11:20:47 INFO DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[1] at parquet at NativeMethodAccessorImpl.java:0), which has no missing parents
18/07/19 11:20:47 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 92.0 KB, free 912.2 MB)
18/07/19 11:20:47 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 33.5 KB, free 912.2 MB)
18/07/19 11:20:47 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 10.246.5.186:32771 (size: 33.5 KB, free: 912.3 MB)
18/07/19 11:20:47 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:1006
18/07/19 11:20:47 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 0 (MapPartitionsRDD[1] at parquet at NativeMethodAccessorImpl.java:0) (first 15 tasks are for partitions Vector(0))
18/07/19 11:20:47 INFO YarnClusterScheduler: Adding task set 0.0 with 1 tasks
18/07/19 11:20:47 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, ip-10-246-5-216.example.com, executor 1, partition 0, PROCESS_LOCAL, 5003 bytes)
18/07/19 11:20:47 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on ip-10-246-5-216.example.com:34919 (size: 33.5 KB, free: 912.3 MB)
18/07/19 11:20:48 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 1706 ms on ip-10-246-5-216.example.com (executor 1) (1/1)
18/07/19 11:20:48 INFO DAGScheduler: ResultStage 0 (parquet at NativeMethodAccessorImpl.java:0) finished in 1.713 s
18/07/19 11:20:48 INFO YarnClusterScheduler: Removed TaskSet 0.0, whose tasks have all completed, from pool 
18/07/19 11:20:49 INFO DAGScheduler: Job 0 finished: parquet at NativeMethodAccessorImpl.java:0, took 1.914374 s
18/07/19 11:20:49 INFO BlockManagerInfo: Removed broadcast_0_piece0 on ip-10-246-5-216.example.com:34919 in memory (size: 33.5 KB, free: 912.3 MB)
18/07/19 11:20:49 INFO BlockManagerInfo: Removed broadcast_0_piece0 on 10.246.5.186:32771 in memory (size: 33.5 KB, free: 912.3 MB)
18/07/19 11:20:50 INFO SparkContext: Starting job: parquet at NativeMethodAccessorImpl.java:0
18/07/19 11:20:50 INFO DAGScheduler: Got job 1 (parquet at NativeMethodAccessorImpl.java:0) with 1 output partitions
18/07/19 11:20:50 INFO DAGScheduler: Final stage: ResultStage 1 (parquet at NativeMethodAccessorImpl.java:0)
18/07/19 11:20:50 INFO DAGScheduler: Parents of final stage: List()
18/07/19 11:20:50 INFO DAGScheduler: Missing parents: List()
18/07/19 11:20:50 INFO DAGScheduler: Submitting ResultStage 1 (MapPartitionsRDD[3] at parquet at NativeMethodAccessorImpl.java:0), which has no missing parents
18/07/19 11:20:50 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 92.0 KB, free 912.2 MB)
18/07/19 11:20:50 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 33.5 KB, free 912.2 MB)
18/07/19 11:20:50 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 10.246.5.186:32771 (size: 33.5 KB, free: 912.3 MB)
18/07/19 11:20:50 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1006
18/07/19 11:20:50 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (MapPartitionsRDD[3] at parquet at NativeMethodAccessorImpl.java:0) (first 15 tasks are for partitions Vector(0))
18/07/19 11:20:50 INFO YarnClusterScheduler: Adding task set 1.0 with 1 tasks
18/07/19 11:20:50 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 1, ip-10-246-5-216.example.com, executor 1, partition 0, PROCESS_LOCAL, 5003 bytes)
18/07/19 11:20:51 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on ip-10-246-5-216.example.com:34919 (size: 33.5 KB, free: 912.3 MB)
18/07/19 11:20:51 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 1) in 176 ms on ip-10-246-5-216.example.com (executor 1) (1/1)
18/07/19 11:20:51 INFO DAGScheduler: ResultStage 1 (parquet at NativeMethodAccessorImpl.java:0) finished in 0.175 s
18/07/19 11:20:51 INFO DAGScheduler: Job 1 finished: parquet at NativeMethodAccessorImpl.java:0, took 0.208283 s
18/07/19 11:20:51 INFO YarnClusterScheduler: Removed TaskSet 1.0, whose tasks have all completed, from pool 
18/07/19 11:20:51 ERROR ApplicationMaster: User application exited with status 1
18/07/19 11:20:51 INFO ApplicationMaster: Final app status: FAILED, exitCode: 1, (reason: User application exited with status 1)
18/07/19 11:20:51 INFO SparkContext: Invoking stop() from shutdown hook
18/07/19 11:20:51 INFO SparkUI: Stopped Spark web UI at http://10.112.5.186:40567
18/07/19 11:20:51 INFO YarnAllocator: Driver requested a total number of 0 executor(s).
18/07/19 11:20:51 INFO YarnClusterSchedulerBackend: Shutting down all executors
18/07/19 11:20:51 INFO YarnSchedulerBackend$YarnDriverEndpoint: Asking each executor to shut down
18/07/19 11:20:51 INFO SchedulerExtensionServices: Stopping SchedulerExtensionServices
(serviceOption=None,
 services=List(),
 started=false)
18/07/19 11:20:51 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
18/07/19 11:20:51 INFO MemoryStore: MemoryStore cleared
18/07/19 11:20:51 INFO BlockManager: BlockManager stopped
18/07/19 11:20:51 INFO BlockManagerMaster: BlockManagerMaster stopped
18/07/19 11:20:51 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
18/07/19 11:20:51 INFO SparkContext: Successfully stopped SparkContext
18/07/19 11:20:51 INFO ApplicationMaster: Unregistering ApplicationMaster with FAILED (diag message: User application exited with status 1)
18/07/19 11:20:51 INFO AMRMClientImpl: Waiting for application to be successfully unregistered.
18/07/19 11:20:51 INFO ApplicationMaster: Deleting staging directory hdfs://ip-10-112-5-252.example.com:8020/user/spark/.sparkStaging/application_1531738738949_0081
18/07/19 11:20:51 INFO ShutdownHookManager: Shutdown hook called
18/07/19 11:20:51 INFO ShutdownHookManager: Deleting directory /hadoopfs/fs1/yarn/nodemanager/usercache/spark/appcache/application_1531738738949_0081/spark-1acac860-733e-4f51-89fb-cb31bf1913ed/pyspark-5dc64ae0-97c1-4c5e-812e-e2931ef7933f
18/07/19 11:20:51 INFO ShutdownHookManager: Deleting directory /hadoopfs/fs1/yarn/nodemanager/usercache/spark/appcache/application_1531738738949_0081/spark-1acac860-733e-4f51-89fb-cb31bf1913ed


End of LogType:stderr


Spark-submit Command:

sudo -u spark /usr/hdp/2.6.4.5-2/spark2/bin/spark-submit --master yarn --executor-memory 2g --executor-cores 1 --num-executors 2 --driver-memory 2g --driver-cores 1  --deploy-mode cluster  hdfs:/code/salesforecast_u.py







Re: Pyspark code failing on cluster mode

@Muhammad Umar

Which Python version are you using? One of the imports seems to point to Python 3. If that is the case, you will need to export a few environment settings for this to run correctly. Check:

https://community.hortonworks.com/questions/138351/how-to-specify-python-version-to-use-with-pyspark...
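For reference, the usual fix from that thread is to point Spark at the same Python 3 interpreter on every node. A sketch, assuming Python 3 lives at /usr/bin/python3 on all nodes (adjust the path to your cluster):

```shell
# Client deploy mode picks these up from the submitting shell:
export PYSPARK_PYTHON=/usr/bin/python3
export PYSPARK_DRIVER_PYTHON=/usr/bin/python3

# In cluster deploy mode the driver runs inside YARN, so pass the
# interpreter through Spark configuration instead:
spark-submit --master yarn --deploy-mode cluster \
  --conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=/usr/bin/python3 \
  --conf spark.executorEnv.PYSPARK_PYTHON=/usr/bin/python3 \
  hdfs:/code/salesforecast_u.py
```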

When running with --master yarn, the executors run on any of the cluster worker nodes (and in cluster deploy mode, so does the driver). This means you need to make sure that all the necessary Python libraries you are using, along with the desired Python version, are installed on all cluster worker nodes in advance.

Finally, it would be good to have both the driver log (which is printed to the stdout of spark-submit) and the complete output of yarn logs -applicationId <appId> for further diagnosis.

HTH

*** If you found this answer addressed your question, please take a moment to login and click the "accept" link on the answer.