Created 07-19-2018 11:51 AM
Hi,
I am reading two files from S3 and taking their union, but the code fails when I run it on YARN. On a single node it runs successfully, but on the cluster, when I specify --master yarn in spark-submit, it fails.
Below is the PySpark Code:
from pyspark import SparkConf, SparkContext, SQLContext
from pyspark.sql.types import *
from pyspark.sql.functions import *
from functools import reduce  # For Python 3.x
from pyspark.sql import DataFrame

# set Spark properties
conf = (SparkConf()
        .setAppName("s3a_test")
        .set("spark.shuffle.compress", "true")
        .set("spark.io.compression.codec", "snappy")
        .set("fs.s3a.server-side-encryption-algorithm", "AES256")
        )
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)

# function to union multiple dataframes
def unionMultiDF(*dfs):
    return reduce(DataFrame.union, dfs)

pfely = "s3a://ics/parquet/salestodist/"
pfely1 = "s3a://ics/parquet/salestodist/"

FCSTEly = sqlContext.read.parquet(pfely)
FCSTEly1 = sqlContext.read.parquet(pfely1)
FCSTEly.registerTempTable('PFEly')
FCSTEly1.registerTempTable('PFEly1')

# Mapping to Forecast
tbl_FCSTEly = sqlContext.sql("select * from PFEly limit 10 ")
tbl_FCSTEly1 = sqlContext.sql("select * from PFEly1 limit 10")
tbl_FCST = unionMultiDF(tbl_FCSTEly, tbl_FCSTEly1)
YARN log of the failed container:
===============================================================================
18/07/19 11:20:15 INFO RMProxy: Connecting to ResourceManager at ip-10-246-5-252.example.com/10.246.5.252:8030
18/07/19 11:20:15 INFO YarnRMClient: Registering the ApplicationMaster
18/07/19 11:20:15 INFO YarnAllocator: Will request 2 executor container(s), each with 1 core(s) and 2432 MB memory (including 384 MB of overhead)
18/07/19 11:20:15 INFO YarnSchedulerBackend$YarnSchedulerEndpoint: ApplicationMaster registered as NettyRpcEndpointRef(spark://YarnAM@10.246.5.186:32769)
18/07/19 11:20:15 INFO YarnAllocator: Submitted 2 unlocalized container requests.
18/07/19 11:20:15 INFO ApplicationMaster: Started progress reporter thread with (heartbeat : 3000, initial allocation : 200) intervals
18/07/19 11:20:15 INFO AMRMClientImpl: Received new token for : ip-10-246-5-216.example.com:45454
18/07/19 11:20:15 INFO YarnAllocator: Launching container container_e15_1531738738949_0081_02_000002 on host ip-10-246-5-216.example.com for executor with ID 1
18/07/19 11:20:15 INFO YarnAllocator: Received 1 containers from YARN, launching executors on 1 of them.
18/07/19 11:20:15 INFO ContainerManagementProtocolProxy: yarn.client.max-cached-nodemanagers-proxies : 0
18/07/19 11:20:15 INFO ContainerManagementProtocolProxy: Opening proxy : ip-10-246-5-216.example.com:45454
18/07/19 11:20:18 INFO YarnSchedulerBackend$YarnDriverEndpoint: Registered executor NettyRpcEndpointRef(spark-client://Executor) (10.246.5.216:52932) with ID 1
18/07/19 11:20:18 INFO BlockManagerMasterEndpoint: Registering block manager ip-10-246-5-216.example.com:34919 with 912.3 MB RAM, BlockManagerId(1, ip-10-246-5-216.example.com, 34919, None)
18/07/19 11:20:44 INFO YarnClusterSchedulerBackend: SchedulerBackend is ready for scheduling beginning after waiting maxRegisteredResourcesWaitingTime: 30000(ms)
18/07/19 11:20:44 INFO YarnClusterScheduler: YarnClusterScheduler.postStartHook done
18/07/19 11:20:44 INFO SharedState: Setting hive.metastore.warehouse.dir ('null') to the value of spark.sql.warehouse.dir ('file:/hadoopfs/fs1/yarn/nodemanager/usercache/spark/appcache/application_1531738738949_0081/container_e15_1531738738949_0081_02_000001/spark-warehouse').
18/07/19 11:20:44 INFO SharedState: Warehouse path is 'file:/hadoopfs/fs1/yarn/nodemanager/usercache/spark/appcache/application_1531738738949_0081/container_e15_1531738738949_0081_02_000001/spark-warehouse'.
18/07/19 11:20:45 INFO StateStoreCoordinatorRef: Registered StateStoreCoordinator endpoint
18/07/19 11:20:46 INFO deprecation: fs.s3a.server-side-encryption-key is deprecated. Instead, use fs.s3a.server-side-encryption.key
18/07/19 11:20:47 INFO SparkContext: Starting job: parquet at NativeMethodAccessorImpl.java:0
18/07/19 11:20:47 INFO DAGScheduler: Got job 0 (parquet at NativeMethodAccessorImpl.java:0) with 1 output partitions
18/07/19 11:20:47 INFO DAGScheduler: Final stage: ResultStage 0 (parquet at NativeMethodAccessorImpl.java:0)
18/07/19 11:20:47 INFO DAGScheduler: Parents of final stage: List()
18/07/19 11:20:47 INFO DAGScheduler: Missing parents: List()
18/07/19 11:20:47 INFO DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[1] at parquet at NativeMethodAccessorImpl.java:0), which has no missing parents
18/07/19 11:20:47 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 92.0 KB, free 912.2 MB)
18/07/19 11:20:47 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 33.5 KB, free 912.2 MB)
18/07/19 11:20:47 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 10.246.5.186:32771 (size: 33.5 KB, free: 912.3 MB)
18/07/19 11:20:47 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:1006
18/07/19 11:20:47 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 0 (MapPartitionsRDD[1] at parquet at NativeMethodAccessorImpl.java:0) (first 15 tasks are for partitions Vector(0))
18/07/19 11:20:47 INFO YarnClusterScheduler: Adding task set 0.0 with 1 tasks
18/07/19 11:20:47 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, ip-10-246-5-216.example.com, executor 1, partition 0, PROCESS_LOCAL, 5003 bytes)
18/07/19 11:20:47 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on ip-10-246-5-216.example.com:34919 (size: 33.5 KB, free: 912.3 MB)
18/07/19 11:20:48 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 1706 ms on ip-10-246-5-216.example.com (executor 1) (1/1)
18/07/19 11:20:48 INFO DAGScheduler: ResultStage 0 (parquet at NativeMethodAccessorImpl.java:0) finished in 1.713 s
18/07/19 11:20:48 INFO YarnClusterScheduler: Removed TaskSet 0.0, whose tasks have all completed, from pool
18/07/19 11:20:49 INFO DAGScheduler: Job 0 finished: parquet at NativeMethodAccessorImpl.java:0, took 1.914374 s
18/07/19 11:20:49 INFO BlockManagerInfo: Removed broadcast_0_piece0 on ip-10-246-5-216.example.com:34919 in memory (size: 33.5 KB, free: 912.3 MB)
18/07/19 11:20:49 INFO BlockManagerInfo: Removed broadcast_0_piece0 on 10.246.5.186:32771 in memory (size: 33.5 KB, free: 912.3 MB)
18/07/19 11:20:50 INFO SparkContext: Starting job: parquet at NativeMethodAccessorImpl.java:0
18/07/19 11:20:50 INFO DAGScheduler: Got job 1 (parquet at NativeMethodAccessorImpl.java:0) with 1 output partitions
18/07/19 11:20:50 INFO DAGScheduler: Final stage: ResultStage 1 (parquet at NativeMethodAccessorImpl.java:0)
18/07/19 11:20:50 INFO DAGScheduler: Parents of final stage: List()
18/07/19 11:20:50 INFO DAGScheduler: Missing parents: List()
18/07/19 11:20:50 INFO DAGScheduler: Submitting ResultStage 1 (MapPartitionsRDD[3] at parquet at NativeMethodAccessorImpl.java:0), which has no missing parents
18/07/19 11:20:50 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 92.0 KB, free 912.2 MB)
18/07/19 11:20:50 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 33.5 KB, free 912.2 MB)
18/07/19 11:20:50 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 10.246.5.186:32771 (size: 33.5 KB, free: 912.3 MB)
18/07/19 11:20:50 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1006
18/07/19 11:20:50 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (MapPartitionsRDD[3] at parquet at NativeMethodAccessorImpl.java:0) (first 15 tasks are for partitions Vector(0))
18/07/19 11:20:50 INFO YarnClusterScheduler: Adding task set 1.0 with 1 tasks
18/07/19 11:20:50 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 1, ip-10-246-5-216.example.com, executor 1, partition 0, PROCESS_LOCAL, 5003 bytes)
18/07/19 11:20:51 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on ip-10-246-5-216.example.com:34919 (size: 33.5 KB, free: 912.3 MB)
18/07/19 11:20:51 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 1) in 176 ms on ip-10-246-5-216.example.com (executor 1) (1/1)
18/07/19 11:20:51 INFO DAGScheduler: ResultStage 1 (parquet at NativeMethodAccessorImpl.java:0) finished in 0.175 s
18/07/19 11:20:51 INFO DAGScheduler: Job 1 finished: parquet at NativeMethodAccessorImpl.java:0, took 0.208283 s
18/07/19 11:20:51 INFO YarnClusterScheduler: Removed TaskSet 1.0, whose tasks have all completed, from pool
18/07/19 11:20:51 ERROR ApplicationMaster: User application exited with status 1
18/07/19 11:20:51 INFO ApplicationMaster: Final app status: FAILED, exitCode: 1, (reason: User application exited with status 1)
18/07/19 11:20:51 INFO SparkContext: Invoking stop() from shutdown hook
18/07/19 11:20:51 INFO SparkUI: Stopped Spark web UI at http://10.112.5.186:40567
18/07/19 11:20:51 INFO YarnAllocator: Driver requested a total number of 0 executor(s).
18/07/19 11:20:51 INFO YarnClusterSchedulerBackend: Shutting down all executors
18/07/19 11:20:51 INFO YarnSchedulerBackend$YarnDriverEndpoint: Asking each executor to shut down
18/07/19 11:20:51 INFO SchedulerExtensionServices: Stopping SchedulerExtensionServices (serviceOption=None, services=List(), started=false)
18/07/19 11:20:51 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
18/07/19 11:20:51 INFO MemoryStore: MemoryStore cleared
18/07/19 11:20:51 INFO BlockManager: BlockManager stopped
18/07/19 11:20:51 INFO BlockManagerMaster: BlockManagerMaster stopped
18/07/19 11:20:51 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
18/07/19 11:20:51 INFO SparkContext: Successfully stopped SparkContext
18/07/19 11:20:51 INFO ApplicationMaster: Unregistering ApplicationMaster with FAILED (diag message: User application exited with status 1)
18/07/19 11:20:51 INFO AMRMClientImpl: Waiting for application to be successfully unregistered.
18/07/19 11:20:51 INFO ApplicationMaster: Deleting staging directory hdfs://ip-10-112-5-252.example.com:8020/user/spark/.sparkStaging/application_1531738738949_0081
18/07/19 11:20:51 INFO ShutdownHookManager: Shutdown hook called
18/07/19 11:20:51 INFO ShutdownHookManager: Deleting directory /hadoopfs/fs1/yarn/nodemanager/usercache/spark/appcache/application_1531738738949_0081/spark-1acac860-733e-4f51-89fb-cb31bf1913ed/pyspark-5dc64ae0-97c1-4c5e-812e-e2931ef7933f
18/07/19 11:20:51 INFO ShutdownHookManager: Deleting directory /hadoopfs/fs1/yarn/nodemanager/usercache/spark/appcache/application_1531738738949_0081/spark-1acac860-733e-4f51-89fb-cb31bf1913ed
End of LogType:stderr
Spark-submit Command:
sudo -u spark /usr/hdp/2.6.4.5-2/spark2/bin/spark-submit --master yarn --executor-memory 2g --executor-cores 1 --num-executors 2 --driver-memory 2g --driver-cores 1 --deploy-mode cluster hdfs:/code/salesforecast_u.py
Created 07-19-2018 12:59 PM
What Python version are you using? One of the imports seems to point to Python 3. If that is the case, you will need to export a few environment settings for this to run correctly; check the example below.
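For instance, a minimal sketch of those settings, assuming Python 3 is installed at /usr/bin/python3 on every node (the path is an assumption, adjust it to your environment). In cluster deploy mode the driver runs inside the ApplicationMaster, so the interpreter has to be set through spark-submit configuration rather than a local export:

sudo -u spark /usr/hdp/2.6.4.5-2/spark2/bin/spark-submit --master yarn --deploy-mode cluster \
  --conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=/usr/bin/python3 \
  --conf spark.executorEnv.PYSPARK_PYTHON=/usr/bin/python3 \
  --executor-memory 2g --executor-cores 1 --num-executors 2 --driver-memory 2g --driver-cores 1 \
  hdfs:/code/salesforecast_u.py

In client deploy mode the equivalent is usually just exporting the variables on the submitting host before running spark-submit:

export PYSPARK_PYTHON=/usr/bin/python3
export PYSPARK_DRIVER_PYTHON=/usr/bin/python3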
When running with --master yarn in client deploy mode, the executors will run on any of the cluster worker nodes (in cluster deploy mode, the driver does too). This means you need to make sure that all the necessary Python libraries you are using, along with the desired Python version, are installed on all cluster worker nodes in advance.
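As a quick way to confirm which interpreter the executors actually pick up, a small diagnostic sketch like the following (submit it the same way as the main job; the app name is arbitrary) prints the Python version seen by the driver and by a worker:

import sys
from pyspark import SparkConf, SparkContext

sc = SparkContext(conf=SparkConf().setAppName("python_version_check"))
# version of the interpreter running the driver process
print("driver python: %s" % sys.version)
# a trivial one-partition job forces the lambda to run inside an executor's Python worker
print("executor python: %s" % sc.parallelize([0], 1).map(lambda _: sys.version).first())
sc.stop()

If the two versions differ, or the executor side fails to start at all, that mismatch is the first thing to fix.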
Finally, it would be good to have both the driver log (which is printed to the stdout of spark-submit) and the complete output of yarn logs -applicationId <appId> for further diagnosis.
HTH
*** If you found this answer addressed your question, please take a moment to login and click the "accept" link on the answer.