Created on 04-28-2017 10:09 PM - edited 09-16-2022 04:32 AM
Can a Spark job running under YARN write a file not to HDFS (that works fine) but to a shared file system? (We use GPFS, but I doubt it matters.) So far I have not been able to make it work.
The command that fails is:
ts.saveAsTextFile("file:///home/me/z11")
Notice that /home/me is mounted on all the nodes of the Hadoop cluster.
The error that I am getting is:
============
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Mkdirs failed to create file:/home/me/z11/_temporary/0/_temporary/attempt_201704290002_0002_m_000000_15 (exists=false, cwd=file:/data/6/yarn/nm/usercache/ivy2/appcache/application_1490816225123_1660/container_e04_1490816225123_1660_01_000002)
at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:447)
============
The empty directory /home/me/z11/_temporary/0/ was created, but that's all.
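As a point of comparison, a driver-side write would only touch the shared mount from the driver node, sidestepping the per-executor Mkdirs call that fails above. A minimal sketch of that workaround, assuming ts is small enough to collect (the output file name below is made up):

# Workaround sketch: bring the data back to the driver and write it with
# plain Python file IO, so only the driver node needs write access to /home/me.
# Assumes `ts` fits in driver memory; the file name is hypothetical.
with open('/home/me/z11-driver-copy.txt', 'w') as out:
    for line in ts.collect():
        out.write('%s\n' % line)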
Created 05-31-2019 10:32 AM
It's a problem with permissions; you need to let Spark know about the local directory. The following code then works:
import time

from pyspark.sql import SparkSession


def xmlConvert(spark):
    etl_time = time.time()
    # Read the raw XML rows, pivot tag values into columns keyed by timestamp
    df = spark.read.format('com.databricks.spark.xml').options(rowTag='HistoricalTextData').load(
        '/home/zangetsu/proj/prometheus-core/demo/demo-1-iot-predictive-maintainance/dataset/train/')
    df = df.withColumn("TimeStamp", df["TimeStamp"].cast("timestamp")).groupBy("TimeStamp").pivot("TagName").sum(
        "TagValue").na.fill(0)
    # Write a single CSV file to the local file system
    df.repartition(1).write.csv(
        path="/home/zangetsu/proj/prometheus-core/demo/demo-1-iot-predictive-maintainance/result/",
        mode="overwrite",
        header=True,
        sep=",")
    print("Time taken to do xml transformation: --- %s seconds ---" % (time.time() - etl_time))


if __name__ == '__main__':
    spark = SparkSession \
        .builder \
        .appName('XML ETL') \
        .master("local[*]") \
        .config('job.local.dir', '/home/zangetsu/proj/prometheus-core/demo/demo-1-iot-predictive-maintainance') \
        .config('spark.driver.memory', '64g') \
        .config('spark.debug.maxToStringFields', '200') \
        .config('spark.jars.packages', 'com.databricks:spark-xml_2.11:0.5.0') \
        .getOrCreate()
    print('Session created')
    try:
        xmlConvert(spark)
    finally:
        spark.stop()
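Why this works (my reading, not stated explicitly above): with master("local[*]") the driver and all executors run on one machine as one user, so a local output path is writable by every task. A minimal sketch distilling the two settings the fix relies on, with hypothetical paths:

# Minimal sketch: local master plus a local scratch dir; paths are hypothetical.
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName('local-fs-write')
         .master('local[*]')                           # all tasks run on this machine
         .config('job.local.dir', '/tmp/spark-local')  # local dir, as in the answer above
         .getOrCreate())

# In local mode a file:// (or bare local) output path behaves like any local write.
spark.range(10).write.mode('overwrite').csv('/tmp/spark-out')
spark.stop()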