Support Questions


OOM issues when writing into parquet

Explorer

Hello,

For about four weeks now I have been running into OOM issues I cannot solve, using CDSW, a YARN cluster, PySpark 2.4.7 and Python 3.6. It seems I am doing something fundamentally wrong. The following is a reproducible code example that mimics one issue in its simplest form: a large Spark dataframe is created and then written to a Parquet file on HDFS. The HDFS disk has more than enough space. I left partitioning out below; however, coalesce as well as partitionBy lead to the same issues. Generating only a small file works (which rules out other possible issues).

 

The reproducible code is the following. 

from pyspark.sql import SparkSession
from pyspark.sql.functions import rand


ss = SparkSession.builder.appName("test_replication") \
    .config("spark.kryoserializer.buffer.max.mb", "2047") \
    .config("spark.sql.execution.arrow.pyspark.enabled", "true") \
    .config("spark.driver.maxResultSize", "2G") \
    .config("spark.driver.memory", "3G") \
    .config("spark.executor.memory", "16G") \
    .config("spark.dynamicAllocation.maxExecutors", "3") \
    .config("spark.dynamicAllocation.enabled", "false") \
    .config("spark.driver.extraJavaOptions", "-Xss4M") \
    .config("spark.hadoop.fs.hdfs.impl.disable.cache", "true") \
    .config("spark.yarn.tags", "dev") \
    .getOrCreate()

rows = 2350000
cols = 2500

hdfs_dir = "/destination/on/hdfs"

# Build a wide dataframe: an id column plus 2500 random columns in [-1, 1).
data = ss.range(rows)
for i in range(cols):
    data = data.withColumn(f"col{i}", rand() * 2 - 1)

# Write the result to HDFS as Parquet.
data.write.format("parquet").mode("overwrite").save(f"{hdfs_dir}/test.parquet")

 

In this case, the error looks like this:

23/06/09 09:37:45 613 ERROR FileFormatWriter: Aborting job 211fa863-3662-4cbc-8002-8c22f7334061.
org.apache.spark.SparkException: Job aborted due to stage failure: Task serialization failed: java.lang.StackOverflowError
java.lang.StackOverflowError
	at java.base/java.lang.Exception.<init>(Exception.java:102)
	at java.base/java.lang.ReflectiveOperationException.<init>(ReflectiveOperationException.java:89)
	at java.base/java.lang.reflect.InvocationTargetException.<init>(InvocationTargetException.java:73)
	at jdk.internal.reflect.GeneratedMethodAccessor27.invoke(Unknown Source)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at java.base/java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1145)
	at java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1497)
	at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1433)
	at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1179)
	at java.base/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1553)
	at java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1510)
	at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1433)
	at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1179)
	at java.base/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1553)
	at java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1510)
	at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1433)
	at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1179)
	at java.base/java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:349)
	at scala.collection.immutable.List$SerializationProxy.writeObject(List.scala:479)
	at jdk.internal.reflect.GeneratedMethodAccessor27.invoke(Unknown Source)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

 

If my calculation is correct, I expect an object of roughly 16 bytes per float value * 2.35 million rows * 2500 columns, which would be approx. 90 GB. I would expect Spark/YARN to handle this kind of situation in chunks/partitions by spilling to disk. Or am I wrong?
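For transparency, the back-of-the-envelope calculation in code (assuming roughly 16 bytes per value as an in-memory upper bound; raw doubles are 8 bytes, and the Parquet file on disk would be smaller again because of encoding and compression):

rows = 2350000
cols = 2500
bytes_per_value = 16  # assumed in-memory overhead per double value

estimated_bytes = rows * cols * bytes_per_value
print("approx. {:.0f} GiB".format(estimated_bytes / 1024**3))  # roughly 88 GiB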

 

My real-world case was actually loading a Parquet file from HDFS, randomizing the rows, and then saving the result into two Parquet files (a train/test split). There, the OOM seemed to occur in the client! So not on the driver node or on any worker node, but on the client. There, too, I would have expected all the Spark operations to run on the cluster. Thus, I hope the simplified reproducible example above can help to understand the underlying memory issues better. I'm entirely confused for now.

 

Thank you very much in advance.

 

1 ACCEPTED SOLUTION

Master Collaborator

Hi @cirrus 

 

The application failed with a StackOverflowError. To resolve the StackOverflowError you need to increase the stack size. You can refer to the following article on how to increase the stack size:

 

https://rangareddy.github.io/SparkStackOverflow/
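For reference, a minimal sketch of how the stack size can be increased from the Spark config (the app name and the -Xss value below are placeholders and have to be tuned to your workload; note that the driver and the executors are configured separately):

from pyspark.sql import SparkSession

ss = SparkSession.builder.appName("stack_size_example") \
    .config("spark.driver.extraJavaOptions", "-Xss1024m") \
    .config("spark.executor.extraJavaOptions", "-Xss1024m") \
    .getOrCreate()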

View solution in original post

6 REPLIES

Community Manager

@cirrus Welcome to the Cloudera Community!

To help you get the best possible solution, I have tagged our Spark experts @Bharati and @smdas, who may be able to assist you further.

Please keep us updated on your post, and we hope you find a satisfactory solution to your query.


Regards,

Diana Torres,
Community Moderator



Master Collaborator

Hi @cirrus 

 

The application failed with a StackOverflowError. To resolve the StackOverflowError you need to increase the stack size. You can refer to the following article on how to increase the stack size:

 

https://rangareddy.github.io/SparkStackOverflow/
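As a side note, the StackOverflowError during task serialization is most likely caused by the very deep query plan that 2500 chained withColumn calls produce. Below is a rough sketch of building the same columns in a single select, which keeps the plan flat (untested in your environment; it reuses the ss, rows and cols from your example):

from pyspark.sql.functions import rand

# One select with all 2500 columns at once instead of one withColumn per column.
data = ss.range(rows).select(
    "id",
    *[(rand() * 2 - 1).alias(f"col{i}") for i in range(cols)]
)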

Community Manager

@cirrus Has the reply helped resolve your issue? If so, please mark the appropriate reply as the solution, as it will make it easier for others to find the answer in the future. Thanks.


Regards,

Diana Torres,
Community Moderator



Explorer

Dear @RangaReddy, dear @DianaTorres,

 

Thank you very much for the help! I didn't answer earlier because the script has now been running for over 17 hours and still hasn't finished.

However, the StackOverflowError was indeed the issue, and your link, @RangaReddy, seems to have solved it since the job is running now, which is why I'm flagging it as the accepted solution. Thank you very much! For others who share my initial confusion: there are TWO settings for the Java stack size, one for the driver and one for the executor. Setting both to 1024m resolved the StackOverflowError.

 

There are two further questions; if you want, I can open new topics for them for procedural clarity.

1) I haven't yet been able to test the stack-size option on my underlying issue, where the error was not explicitly a StackOverflowError. If it persists, I will probably create a separate topic for it.

2) Why does it take over 17 hours to save what is so far only 29.5 GB of data? Am I doing something wrong? I'm attaching the newest config below, increased to 8 cores, which have been running at full load for 17 hours. But as said, I could also open a new topic if that is procedurally preferred, since this is more of a performance question.

 

ss = SparkSession.builder.appName("test_replication") \
    .config("spark.kryoserializer.buffer.max.mb", "2047") \
    .config("spark.sql.execution.arrow.pyspark.enabled", "true") \
    .config("spark.driver.maxResultSize", "16G") \
    .config("spark.driver.memory", "4G") \
    .config("spark.executor.memory", "16G") \
    .config("spark.dynamicAllocation.maxExecutors", "8") \
    .config("spark.executor.instances", "2") \
    .config("spark.executor.cores", "4") \
    .config("spark.dynamicAllocation.enabled", "true") \
    .config("spark.driver.extraJavaOptions", "-Xss1024m") \
    .config("spark.executor.extraJavaOptions", "-Xss1024m") \
    .getOrCreate()

Master Collaborator

Hi @cirrus 

 

If you have more questions, I suggest opening a Cloudera case so that we can provide all the required information. After creating the case, please attach the application logs, the event logs and, if possible, the code as well.

Explorer

Fun fact for those interested: in order to have 8 cores running, you need in this example a minimum of 64m for the -Xss options. If you choose 32m, it does not throw a StackOverflowError, but only 4 cores will be running 😲