Performance issue with simple reproducible case

Explorer

Hi,

This is a simple, reproducible case where the performance is surprisingly bad. It is a follow-up to the case under this link, where a stack overflow error initially occurred.

 

The script below ran for 26 hours across 8 cores at full load, as seen in the hardware statistics.

 

Of course the object is "quite large", but similar operations on an object of this size do not take nearly as long. The generated output was 42.5GB across the 8 Parquet files on HDFS.

 

Here is the code:

from pyspark.sql import SparkSession
from pyspark.sql.functions import rand


ss = SparkSession.builder.appName("test_replication") \
    .config("spark.kryoserializer.buffer.max.mb", "2047") \
    .config('spark.sql.execution.arrow.pyspark.enabled', "true") \
    .config("spark.driver.maxResultSize", "16G") \
    .config("spark.driver.memory", "4G") \
    .config("spark.executor.memory", "16G") \
    .config("spark.dynamicAllocation.maxExecutors","8") \
    .config("spark.executor.instances", "2") \
    .config("spark.executor.cores", "4") \
    .config("spark.dynamicAllocation.enabled", "true") \
    .config("spark.driver.extraJavaOptions", "-Xss1024m") \
    .config("spark.executor.extraJavaOptions", "-Xss1024m") \
    .config("spark.yarn.tags","dev") \
    .getOrCreate()

rows=2350000
cols=2500

hdfs_dir="/destination/on/hdfs"


data = ss.range(rows)
for i in range(cols):
    data = data.withColumn(f'col{i}', rand() * 2 - 1)

data.write.format("parquet").mode("overwrite").save(f"{hdfs_dir}/test.parquet")

 

Am I doing something wrong?

 

Edit: I see in the log of the applicationHistory the following element that surprises me - is this normal?

"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1}, 

Edit2: Is this due to no cache() or persist() in place?

1 ACCEPTED SOLUTION

Master Collaborator

Hi @cirrus 

 

Please find the optimized code below.

 

/tmp/test_pyspark.py

from pyspark.sql.functions import expr
from pyspark.sql import SparkSession
from datetime import datetime
import math

spark = SparkSession.builder \
    .appName('Test App') \
    .getOrCreate()

num_rows = 2350000
num_columns = 2500
records_per_file=5000
num_partitions = int(math.ceil(num_rows/records_per_file))

data = spark.range(num_rows).repartition(num_partitions)
print("Number of Partitions: " + str(data.rdd.getNumPartitions()))

start_time = datetime.now()

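# Add all columns in one select() instead of one withColumn() call per column.
# This variant drops the "id" column from range(); the commented-out line keeps it.
data = data.select(*[expr('rand() * 2 - 1 as col'+str(i)) for i in range(num_columns)])
#data = data.select("*",*[expr('rand() * 2 - 1 as col'+str(i)) for i in range(num_columns)])

end_time = datetime.now()
delta = end_time - start_time

# time difference in seconds
# (select() is lazy, so this only measures building the query plan, not computing the data)
print("Time difference to select the columns is "+ str(delta.total_seconds()) +" seconds")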

start_time = datetime.now()
data.write.format("parquet").mode("overwrite").save("/tmp/test")
end_time = datetime.now()
delta = end_time - start_time

# time difference in seconds
print("Time difference for writing the data to HDFS is "+ str(delta.total_seconds()) +" seconds")

spark.stop()
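
Some background on why the single select() matters: the PySpark documentation for withColumn notes that each call introduces a projection internally, so calling it in a loop to add many columns can generate very large plans, with performance problems and even a StackOverflowException as possible results. It recommends a select() with multiple columns instead. That is probably what happens with the 2500 withColumn calls in the question, although I have not profiled it. Below is a minimal sketch of the same idea using selectExpr. The helper names random_column_exprs and add_random_columns are made up for illustration and are not part of PySpark.

```python
def random_column_exprs(num_columns):
    """Build one SQL expression string per random column."""
    return [f"rand() * 2 - 1 AS col{i}" for i in range(num_columns)]


def add_random_columns(df, num_columns):
    """Add all random columns in a single projection.

    `df` is assumed to be a pyspark.sql.DataFrame. The "*" keeps its
    existing columns (for spark.range() that is the `id` column).
    """
    return df.selectExpr("*", *random_column_exprs(num_columns))


# Hypothetical usage, assuming an active SparkSession named `spark`:
#   data = add_random_columns(spark.range(2350000), 2500)
```

The whole column list goes to Spark in one call, so the plan contains one projection rather than one per column.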

 

Spark-submit command:

spark-submit \
--master yarn \
--deploy-mode cluster \
--conf spark.driver.memory=16G \
--conf spark.driver.memoryOverhead=1g \
--conf spark.executor.memory=16G \
--conf spark.executor.memoryOverhead=1g \
--conf spark.memory.fraction=0.8 \
--conf spark.memory.storageFraction=0.4 \
--conf spark.executor.cores=5 \
--conf spark.dynamicAllocation.enabled=true \
--conf spark.driver.extraJavaOptions="-Xss1024m" \
--conf spark.executor.extraJavaOptions="-Xss1024m" /tmp/test_pyspark.py
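
As a rough check on the partitioning used in the script, the arithmetic can be worked out in plain Python. This is only a sketch: it assumes 8 bytes per double value and ignores Parquet encoding, compression, and metadata, so the real file sizes will differ.

```python
import math

num_rows = 2_350_000
num_columns = 2_500
records_per_file = 5_000
bytes_per_double = 8  # assumption: uncompressed 64-bit floats

# Same formula as in the script above
num_partitions = math.ceil(num_rows / records_per_file)

bytes_per_row = num_columns * bytes_per_double
bytes_per_partition = records_per_file * bytes_per_row
total_bytes = num_rows * bytes_per_row

print(f"partitions: {num_partitions}")
print(f"raw size per partition: {bytes_per_partition / 1e6:.0f} MB")
print(f"raw size in total: {total_bytes / 1e9:.0f} GB")
```

Under these assumptions the job writes 470 partitions of roughly 100 MB of raw values each, about 47 GB in total, which is in the same range as the 42.5GB reported in the question.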

 


2 REPLIES

Explorer

I tried adding a cache(), but it still takes just as long:

data = ss.range(rows).cache()

 

I also reduced the following to 32m, thinking that I might have overdone the Java stack size, but the effect is still the same:

.config("spark.driver.extraJavaOptions", "-Xss32m") \
.config("spark.executor.extraJavaOptions", "-Xss32m") \

 

@RangaReddy do you have an idea what I am doing wrong?

 

Edit: I see the following in the logs. Is cache() actually working, or is this only shown at the beginning?

"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},
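
On the log line above: as far as I understand, a storage level with both "Use Disk" and "Use Memory" set to false describes data that is not persisted, which is the default for anything without cache() or persist(). The event log records a storage level for many intermediate datasets, so this entry alone does not show that the cache() call was ignored. Caching only ss.range(rows) would also not be expected to help much, since the cost lies in the columns added afterwards. A small sketch for reading such an entry follows. The function name is_persisted is made up for illustration.

```python
import json


def is_persisted(storage_level):
    """Return True if the storage level keeps data on disk or in memory."""
    return bool(storage_level["Use Disk"] or storage_level["Use Memory"])


# The entry exactly as it appears in the log excerpt
entry = json.loads(
    '{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1}'
)
print(is_persisted(entry))  # False
```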
