
KryoSerializer and toPandas(): What if spark.kryoserializer.buffer.max is already at the maximum of 2047m?

Explorer

Hi,

I am hitting a Kryo serializer buffer issue with the following simple line in PySpark (Spark version 2.4):

df = ss.read.parquet(data_dir).limit(how_many).toPandas()


So I am reading in a partitioned Parquet file, limiting it to 800k rows (still huge, as it has 2,500 columns), and trying to convert it with toPandas().

spark.kryoserializer.buffer.max is already at the maximum possible value:

.config("spark.kryoserializer.buffer.max", "2047m") 
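
For completeness, the session setup looks roughly like this. This is only a sketch: the buffer.max line is the relevant part, while the app name and the explicit Kryo serializer setting are illustrative assumptions.

from pyspark.sql import SparkSession

# Kryo result serialization with the buffer already at its 2047m ceiling
ss = (SparkSession.builder
      .appName("kryo-buffer-demo")  # illustrative name
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")  # assumed
      .config("spark.kryoserializer.buffer.max", "2047m")
      .getOrCreate())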


What other ways are there to make this run (apart from reducing the number of rows even further)? While stage 1 executes many tasks (approx. 157), stage 2 has only a single task, and thus tries to juggle one very large object.


The following is an excerpt of the error message:


Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 1 times, most recent failure: Lost task 0.0 in stage 2.0 (TID 158, localhost, executor driver): org.apache.spark.SparkException: Kryo serialization failed: Buffer overflow. Available: 0, required: 78352248. To avoid this, increase spark.kryoserializer.buffer.max value.
	at org.apache.spark.serializer.KryoSerializerInstance.serialize(KryoSerializer.scala:331)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:461)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: com.esotericsoftware.kryo.KryoException: Buffer overflow. Available: 0, required: 78352248
	at com.esotericsoftware.kryo.io.Output.require(Output.java:167)
	at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:251)
	at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:237)
	at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:49)
	at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:38)
	at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:629)
	at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:332)
	at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:302)
	at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:651)
	at org.apache.spark.serializer.KryoSerializerInstance.serialize(KryoSerializer.scala:328)


Edit 1: After reducing how_many to 400k (from the 800k shown above), it fails with exactly the same error (see below). This confuses me a bit: why is the required amount the same?

Buffer overflow. Available: 0, required: 78352248


Edit 2: What also confuses me: there are three stages in Spark for the above job. The first one is clear: a very quick "parquet at NativeMethodAccessorImpl.java:0". In the third stage it crashes (and there is only one task there, so the work cannot be split across partitions). But my confusion is about the second stage: it reports an "Input Size / Records" of 14.7 GB / 1,922,984 and a "Shuffle Write Size / Records" of 25.4 GB / 1,922,984. In other words, it processed all rows of the file here (approx. 1.9 million). The limit() function had no effect (?)
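
One way to see what limit() actually turns into is to print the physical plan before converting (same names as above; in Spark 2.4 the plan typically shows a per-partition LocalLimit feeding a single-partition GlobalLimit/CollectLimit, which would explain why the final stage has exactly one task):

ss.read.parquet(data_dir).limit(how_many).explain(True)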


3 REPLIES

Explorer

Does anyone have an idea? Otherwise this is a blocker for working with larger datasets.

Rising Star

The spark.kryoserializer.buffer.max limit is fixed at 2 GB. It cannot be extended.
You can try to repartition() the DataFrame in the Spark code.
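
A minimal sketch of that suggestion, applied to the line from the question (the partition count of 200 is only an illustration; pick a value so that each partition's serialized result stays well below the 2 GB ceiling):

# Spread the limited result over more partitions, so each task serializes
# a small chunk instead of one very large object.
df = (ss.read.parquet(data_dir)
      .limit(how_many)
      .repartition(200)  # illustrative value; tune for your data
      .toPandas())

The repartition() adds a shuffle, but it splits the single-partition result of the limit back up before the rows are serialized to the driver.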

Explorer

Thank you @haridjh! It worked! I am even further confused because the underlying Parquet file is already partitioned. But after inserting a repartition() the code works!