Spark: How to simultaneously read from and write to the same parquet file

New Contributor

How can I read a DataFrame from a parquet file, apply transformations to it, and then write the modified DataFrame back to the same parquet file?

 

If I attempt to do so, I get an error, understandably, because Spark reads lazily from the source and cannot write back to the same location at the same time. Let me reproduce the problem:

df = spark.createDataFrame([(1, 10), (2, 20), (3, 30)], ['sex', 'date'])

# Save as parquet
df.repartition(1).write.format('parquet').mode('overwrite').save('.../temp')

# Load it back
df = spark.read.format('parquet').load('.../temp')

# Save it back - this produces the ERROR below
df.repartition(1).write.format('parquet').mode('overwrite').save('.../temp')

 

ERROR:

java.io.FileNotFoundException: Requested file maprfs:///mapr/.../temp/part-00000-f67d5a62-36f2-4dd2-855a-846f422e623f-c000.snappy.parquet does not exist. It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.

 

One workaround is to save the DataFrame to a differently named parquet folder, delete the old parquet folder, and then rename the new folder to the old name. But this is a very inefficient way of doing it, especially for DataFrames with billions of rows.
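For reference, here is a minimal sketch of that rename workaround, assuming a Hadoop-compatible filesystem and going through Spark's internal py4j gateway (spark._jvm); the staging folder name '.../temp_staging' is hypothetical:

fs_pkg = spark._jvm.org.apache.hadoop.fs
fs = fs_pkg.FileSystem.get(spark._jsc.hadoopConfiguration())
target = fs_pkg.Path('.../temp')
staging = fs_pkg.Path('.../temp_staging')  # hypothetical staging folder

# Write to the staging folder first, so the source stays readable
df.repartition(1).write.format('parquet').mode('overwrite').save('.../temp_staging')

# Swap the folders: drop the old data, then rename staging into place
fs.delete(target, True)
fs.rename(staging, target)

On HDFS-like filesystems the rename itself is typically a cheap metadata operation; the main cost is writing the staged copy.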


I did some research and found that people suggest running REFRESH TABLE to refresh the metadata, as discussed in other posts.

Can anyone suggest how to read from and then write back to exactly the same parquet file?

 

2 REPLIES

New Contributor

You can refresh the table or the path before the write statement.

 

spark.catalog.refreshTable(tableName) / spark.catalog.refreshByPath(path)
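A minimal sketch of where that call would go, reusing the elided '.../temp' path from the question; note that refreshByPath is the actual PySpark method name, and that it only invalidates Spark's cached file-listing metadata:

df = spark.read.format('parquet').load('.../temp')
# ... transformations ...
spark.catalog.refreshByPath('.../temp')  # drop cached file-status metadata for this path
df.repartition(1).write.format('parquet').mode('overwrite').save('.../temp')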

New Contributor

Hello, I am using Spark/PySpark 2.4.6 and even after doing

df.createOrReplaceTempView("table")
spark_session.sql("REFRESH TABLE table")
spark_session.catalog.refreshTable("table")
spark_session.catalog.refreshByPath(path)

I still have the same issue. 

Thanks in advance
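A closing note on why the refresh calls alone may not help: DataFrames are evaluated lazily, so with mode('overwrite') Spark deletes the target files before the read that feeds the write has actually run. A commonly suggested pattern, sketched here as an assumption rather than a fix confirmed in this thread, is to fully materialize the data first so the write no longer depends on the source files:

df = spark.read.format('parquet').load('.../temp')
df = df.cache()  # pin the data in executor storage
df.count()       # an action that forces full materialization
# The overwrite below can now be served from the cache instead of the
# source files. Caveat: if cached partitions are evicted, Spark recomputes
# them from the (now deleted) source and fails; setting a checkpoint dir
# via spark.sparkContext.setCheckpointDir(...) and calling df.checkpoint()
# is the more reliable variant.
df.repartition(1).write.format('parquet').mode('overwrite').save('.../temp')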