
Writing from Spark to a shared file system

Contributor

Can a Spark job running under YARN write a file not to HDFS (that works fine) but to a shared file system? We use GPFS, but I doubt it matters. So far I could not make it work.

The command that fails is:

ts.saveAsTextFile("file:///home/me/z11")

Notice that /home/me is mounted on all the nodes of the Hadoop cluster.

The error that I am getting is:

============

at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Mkdirs failed to create file:/home/me/z11/_temporary/0/_temporary/attempt_201704290002_0002_m_000000_15 (exists=false, cwd=file:/data/6/yarn/nm/usercache/ivy2/appcache/application_1490816225123_1660/container_e04_1490816225123_1660_01_000002)
at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:447)

============

 

The empty directory /home/me/z11/_temporary/0/ was created, but that's all.
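The Mkdirs failure most likely means that the task, which runs inside a YARN container as the application user on a worker node (note the cwd under .../usercache/... in the trace), cannot create directories under /home/me on that node, even though the path is mounted everywhere. For small results, one workaround is to bring the data back to the driver and write it with plain Python file I/O, so only the driver's node needs the mount to be writable. A minimal sketch, assuming the RDD ts fits in driver memory:

# Collect to the driver and write locally; assumes ts is small enough
# to fit in driver memory.
with open('/home/me/z11.txt', 'w') as out:
    for line in ts.collect():
        out.write(str(line) + '\n')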

 

2 REPLIES

Rising Star
You might have to add your GPFS libraries to your SPARK_CLASSPATH and LD_LIBRARY_PATH.
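If you would rather pass these through Spark configs than environment variables, the equivalent knobs are spark.{driver,executor}.extraClassPath and spark.{driver,executor}.extraLibraryPath. A minimal sketch, assuming a typical GPFS layout (the /usr/lpp/mmfs paths below are placeholders; point them at wherever your GPFS Hadoop connector jars and native libraries live):

from pyspark.sql import SparkSession

# The GPFS paths here are assumptions; adjust to your install.
spark = SparkSession.builder \
    .appName('gpfs-write-test') \
    .config('spark.driver.extraClassPath', '/usr/lpp/mmfs/hadoop/*') \
    .config('spark.executor.extraClassPath', '/usr/lpp/mmfs/hadoop/*') \
    .config('spark.driver.extraLibraryPath', '/usr/lpp/mmfs/lib') \
    .config('spark.executor.extraLibraryPath', '/usr/lpp/mmfs/lib') \
    .getOrCreate()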

Explorer

It's a problem with permissions; you need to let Spark know about the local dir. The following code then works:

import time
from pyspark.sql import SparkSession


def xmlConvert(spark):
    etl_time = time.time()
    # One row per HistoricalTextData element in the XML input
    df = spark.read.format('com.databricks.spark.xml') \
        .options(rowTag='HistoricalTextData') \
        .load('/home/zangetsu/proj/prometheus-core/demo/demo-1-iot-predictive-maintainance/dataset/train/')
    # Pivot tag names into columns, one row per timestamp, filling gaps with 0
    df = df.withColumn("TimeStamp", df["TimeStamp"].cast("timestamp")) \
        .groupBy("TimeStamp").pivot("TagName").sum("TagValue").na.fill(0)
    # Write a single CSV part file to the local file system
    df.repartition(1).write.csv(
        path="/home/zangetsu/proj/prometheus-core/demo/demo-1-iot-predictive-maintainance/result/",
        mode="overwrite",
        header=True,
        sep=",")
    print("Time taken to do xml transformation: --- %s seconds ---" % (time.time() - etl_time))


if __name__ == '__main__':
    spark = SparkSession \
        .builder \
        .appName('XML ETL') \
        .master("local[*]") \
        .config('job.local.dir', '/home/zangetsu/proj/prometheus-core/demo/demo-1-iot-predictive-maintainance') \
        .config('spark.driver.memory', '64g') \
        .config('spark.debug.maxToStringFields', '200') \
        .config('spark.jars.packages', 'com.databricks:spark-xml_2.11:0.5.0') \
        .getOrCreate()
    print('Session created')
    try:
        xmlConvert(spark)
    finally:
        spark.stop()
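Worth noting about the snippet above: it runs with .master("local[*]"), so the driver and executors all live on one machine and the file:// output path only has to be writable there, which sidesteps the multi-node permission problem from the original question. On a YARN cluster the same write would still need the target directory to be writable by the container user on every executor node.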