Created 06-09-2023 05:34 AM
Hello,
For about four weeks now I have been running into unsolvable OOM issues using CDSW, a YARN cluster, PySpark 2.4.7 and Python 3.6. It seems that I am doing something fundamentally wrong. The following is a reproducible code example that mimics one issue in its simplest form: a large Spark dataframe is created and is then supposed to be written to a Parquet file on HDFS. The HDFS disk has more than enough space. Below I took partitioning out; however, coalesce as well as partitionBy lead to the same issues. Generating only a small file works (to rule out other possible issues).
The reproducible code is the following.
from pyspark.sql import SparkSession
from pyspark.sql.functions import rand

ss = SparkSession.builder.appName("test_replication") \
    .config("spark.kryoserializer.buffer.max.mb", "2047") \
    .config("spark.sql.execution.arrow.pyspark.enabled", "true") \
    .config("spark.driver.maxResultSize", "2G") \
    .config("spark.driver.memory", "3G") \
    .config("spark.executor.memory", "16G") \
    .config("spark.dynamicAllocation.maxExecutors", "3") \
    .config("spark.dynamicAllocation.enabled", "false") \
    .config("spark.driver.extraJavaOptions", "-Xss4M") \
    .config("spark.hadoop.fs.hdfs.impl.disable.cache", "true") \
    .config("spark.yarn.tags", "dev") \
    .getOrCreate()

rows = 2350000
cols = 2500
hdfs_dir = "/destination/on/hdfs"

data = ss.range(rows)
for i in range(cols):
    data = data.withColumn(f'col{i}', rand() * 2 - 1)

data.write.format("parquet").mode("overwrite").save(f"{hdfs_dir}/test.parquet")
In this case, the error looks like this:
23/06/09 09:37:45 613 ERROR FileFormatWriter: Aborting job 211fa863-3662-4cbc-8002-8c22f7334061.
org.apache.spark.SparkException: Job aborted due to stage failure: Task serialization failed: java.lang.StackOverflowError
java.lang.StackOverflowError
    at java.base/java.lang.Exception.<init>(Exception.java:102)
    at java.base/java.lang.ReflectiveOperationException.<init>(ReflectiveOperationException.java:89)
    at java.base/java.lang.reflect.InvocationTargetException.<init>(InvocationTargetException.java:73)
    at jdk.internal.reflect.GeneratedMethodAccessor27.invoke(Unknown Source)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at java.base/java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1145)
    at java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1497)
    at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1433)
    at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1179)
    at java.base/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1553)
    at java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1510)
    at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1433)
    at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1179)
    at java.base/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1553)
    at java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1510)
    at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1433)
    at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1179)
    at java.base/java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:349)
    at scala.collection.immutable.List$SerializationProxy.writeObject(List.scala:479)
    at jdk.internal.reflect.GeneratedMethodAccessor27.invoke(Unknown Source)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
If my calculation is correct, the code above produces an object of 16 bytes (per float) * 2.35 million rows * 2500 columns, which would be approx 90 GB. I would expect Spark/YARN to handle this kind of situation in chunks/partitions by spilling to disk while writing. Or am I wrong?
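For reference, my back-of-the-envelope calculation looks like this (the ~16 bytes per value is an assumption that includes some overhead on top of the 8 bytes of a raw double):

# Rough size estimate for the generated dataframe (assumption: ~16 bytes per
# value including overhead; raw doubles alone would be 8 bytes, i.e. half of this).
rows = 2350000
cols = 2500
bytes_per_value = 16

total_bytes = rows * cols * bytes_per_value
print(f"Estimated size: {total_bytes / 1024**3:.1f} GiB")  # ~87.5 GiB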
And my real data example was actually loading a Parquet file from HDFS, randomizing the rows, and then saving the result into two Parquet files (a train/test data split). There, an OOM in the client seemed to be the issue! So not on the driver node or on any worker node, but on the client! There too I would have expected all the Spark operations to run on the cluster. Thus, I hope that the simplified reproducible example above can help to understand the underlying memory issues better. I'm entirely confused for now.
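A simplified sketch of that real pipeline (placeholder paths; randomSplit stands in here for my actual randomization logic, and it reuses the SparkSession from the snippet above):

# Simplified sketch of the real pipeline, not the exact code.
df = ss.read.parquet("/source/on/hdfs/input.parquet")

# Randomized 80/20 train/test split; this runs on the executors, so I would
# not expect the CDSW client to need much memory for it.
train_df, test_df = df.randomSplit([0.8, 0.2], seed=42)

train_df.write.mode("overwrite").parquet(f"{hdfs_dir}/train.parquet")
test_df.write.mode("overwrite").parquet(f"{hdfs_dir}/test.parquet")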
Thank you very much in advance.
Created 06-09-2023 11:40 AM
Hi @cirrus
The application is failing with a StackOverflowError. To resolve the StackOverflowError, you need to increase the stack size. You can refer to the following article on how to increase the stack size.
https://rangareddy.github.io/SparkStackOverflow/
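For example (the exact value depends on your workload; the article walks through the details), the stack size can be raised for both the driver and the executors:

# Example only: -Xss512m is an illustrative value, not a recommendation.
# Note that the driver and the executor JVMs each have their own stack-size setting.
from pyspark.sql import SparkSession

ss = SparkSession.builder.appName("test_replication") \
    .config("spark.driver.extraJavaOptions", "-Xss512m") \
    .config("spark.executor.extraJavaOptions", "-Xss512m") \
    .getOrCreate()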
Created 06-09-2023 11:09 AM
@cirrus Welcome to the Cloudera Community!
To help you get the best possible solution, I have tagged our Spark experts @Bharati and @smdas who may be able to assist you further.
Please keep us updated on your post, and we hope you find a satisfactory solution to your query.
Regards,
Diana Torres,
Created 06-12-2023 05:04 PM
@cirrus Has the reply helped resolve your issue? If so, please mark the appropriate reply as the solution, as it will make it easier for others to find the answer in the future. Thanks.
Regards,
Diana Torres,
Created 06-12-2023 10:27 PM
Dear @RangaReddy , dear @DianaTorres ,
Thank you very much for the help! I didn't answer yet, as the current script has now been running for over 17 hours and hasn't finished yet.
However, the StackOverflowError was the issue, and your link @RangaReddy seems to have solved it, as the job is running now, which is why I'm flagging it as the accepted solution. Thank you very much! For others, as a note on my own confusion: there are TWO settings for the Java stack size, one for the driver and one for the executor. Setting both to 1024m solved that StackOverflowError.
There are two further questions, and if you want I can open new topics on them for procedural clarity.
1) I couldn't yet test this stack size option on my underlying issue, where the error was not explicitly a StackOverflowError. So maybe I will create a separate topic for it, if it persists.
2) Why does it take over 17 hours to save, so far, only 29.5 GB of data? Am I doing something wrong? I'm attaching the newest config below, increased to 8 cores that have been running at full load for 17 hours. But as said: I could also open a new issue if that is procedurally preferred, as this is more related to performance.
ss = SparkSession.builder.appName("test_replication") \
    .config("spark.kryoserializer.buffer.max.mb", "2047") \
    .config("spark.sql.execution.arrow.pyspark.enabled", "true") \
    .config("spark.driver.maxResultSize", "16G") \
    .config("spark.driver.memory", "4G") \
    .config("spark.executor.memory", "16G") \
    .config("spark.dynamicAllocation.maxExecutors", "8") \
    .config("spark.executor.instances", "2") \
    .config("spark.executor.cores", "4") \
    .config("spark.dynamicAllocation.enabled", "true") \
    .config("spark.driver.extraJavaOptions", "-Xss1024m") \
    .config("spark.executor.extraJavaOptions", "-Xss1024m") \
    .getOrCreate()
Created on 06-12-2023 10:48 PM - edited 06-12-2023 10:50 PM
Hi @cirrus
If you have more questions, I suggest opening a Cloudera case so that we can provide all the required information. After creating the Cloudera case, please attach the application logs, event logs and, if possible, the code as well.
Created 06-14-2023 01:06 AM
Fun fact for those interested: in order to have 8 cores running in this example, you need a minimum of 64m for the -Xss options. If you choose 32m, it will not give a StackOverflowError, but only 4 cores will be running 😲
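For reference, that observation comes from a setup along these lines (same as the config above, with only the -Xss value varied):

# Same setup as above, only the stack size is varied: with -Xss64m all
# 2 x 4 = 8 executor cores were busy; with -Xss32m only 4 cores ended up running.
ss = SparkSession.builder.appName("test_replication") \
    .config("spark.executor.instances", "2") \
    .config("spark.executor.cores", "4") \
    .config("spark.executor.memory", "16G") \
    .config("spark.driver.extraJavaOptions", "-Xss64m") \
    .config("spark.executor.extraJavaOptions", "-Xss64m") \
    .getOrCreate()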