Performance issue with simple reproducible case

Explorer

Hi,

This is a simple, reproducible case where the performance is surprisingly bad. It is a follow-up to the case under this link, where initially a StackOverflow error occurred.

 

The script below ran for 26 hours, keeping more than 8 cores at full load, as can be seen in the hardware statistics.

 

Of course the object is "quite large", but similar operations on an object of this size do not normally take anywhere near this long. The generated output was 42.5 GB across the 8 Parquet files on HDFS.

 

Here is the code:

from pyspark.sql import SparkSession
from pyspark.sql.functions import rand


ss = SparkSession.builder.appName("test_replication") \
    .config("spark.kryoserializer.buffer.max.mb", "2047") \
    .config("spark.sql.execution.arrow.pyspark.enabled", "true") \
    .config("spark.driver.maxResultSize", "16G") \
    .config("spark.driver.memory", "4G") \
    .config("spark.executor.memory", "16G") \
    .config("spark.dynamicAllocation.maxExecutors", "8") \
    .config("spark.executor.instances", "2") \
    .config("spark.executor.cores", "4") \
    .config("spark.dynamicAllocation.enabled", "true") \
    .config("spark.driver.extraJavaOptions", "-Xss1024m") \
    .config("spark.executor.extraJavaOptions", "-Xss1024m") \
    .config("spark.yarn.tags", "dev") \
    .getOrCreate()

rows=2350000
cols=2500

hdfs_dir="/destination/on/hdfs"


data = ss.range(rows)
for i in range(cols):
    data = data.withColumn(f'col{i}', rand() * 2 - 1)

data.write.format("parquet").mode("overwrite").save(f"{hdfs_dir}/test.parquet")

 

Am I doing something wrong?

 

Edit: In the applicationHistory log I see the following element, which surprises me. Is this normal?

"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1}, 

Edit 2: Is this because there is no cache() or persist() in place?
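
For reference, this is how I would check from the PySpark side whether data is actually persisted. As far as I understand, the all-false storage level above is just StorageLevel.NONE, i.e. nothing is persisted (a minimal sketch against the data DataFrame from the script above):

# Sketch: inspect the persistence state of the DataFrame built above
print(data.is_cached)     # False here, since neither cache() nor persist() was called
print(data.storageLevel)  # StorageLevel(False, False, False, False, 1), i.e. not persisted

# Requesting persistence changes what is reported (memory-and-disk by default for
# DataFrames); the data itself is only materialized by the first action that runs.
data = data.persist()
print(data.storageLevel)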

1 ACCEPTED SOLUTION

Master Collaborator

Hi @cirrus 

 

You can find the optimized code below.

 

/tmp/test_pyspark.py

from pyspark.sql.functions import col, expr
from pyspark.sql import SparkSession
from datetime import datetime
import math

spark = SparkSession.builder \
    .appName('Test App') \
    .getOrCreate()

num_rows = 2350000
num_columns = 2500
records_per_file = 5000  # target number of rows per partition (and per output file)
num_partitions = int(math.ceil(num_rows / records_per_file))

# spread the rows over many partitions before the columns are generated
data = spark.range(num_rows).repartition(num_partitions)
print("Number of Partitions: " + str(data.rdd.getNumPartitions()))

start_time = datetime.now()

data = data.select(*[expr('rand() * 2 - 1 as col'+str(i)) for i in range(num_columns)])
#data = data.select("*",*[expr('rand() * 2 - 1 as col'+str(i)) for i in range(num_columns)])

end_time = datetime.now()
delta = end_time - start_time

# time difference in seconds
print("Time difference to select the columns is "+ str(delta.total_seconds()) +" seconds")

start_time = datetime.now()
data.write.format("parquet").mode("overwrite").save("/tmp/test")
end_time = datetime.now()
delta = end_time - start_time

# time difference in seconds
print("Time difference for writing the data to HDFS is "+ str(delta.total_seconds()) +" seconds")

spark.stop()

 

Spark-submit command:

spark-submit \
--master yarn \
--deploy-mode cluster \
--conf spark.driver.memory=16G \
--conf spark.driver.memoryOverhead=1g \
--conf spark.executor.memory=16G \
--conf spark.executor.memoryOverhead=1g \
--conf spark.memory.fraction=0.8 \
--conf spark.memory.storageFraction=0.4 \
--conf spark.executor.cores=5 \
--conf spark.dynamicAllocation.enabled=true \
--conf spark.driver.extraJavaOptions="-Xss1024m" \
--conf spark.executor.extraJavaOptions="-Xss1024m" /tmp/test_pyspark.py
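
A note on why this helps, as I understand it: every withColumn call adds another projection on top of the logical plan, so building 2500 columns in a loop produces a very deep plan that the analyzer and optimizer have to re-process, which is presumably also why the original case needed the large -Xss setting to avoid the StackOverflow error. A single select with a list of expressions builds the same schema in one projection. You can see the difference on a small column count with a sketch like the following (the app name and the column count of 50 are just for illustration):

from pyspark.sql import SparkSession
from pyspark.sql.functions import rand, expr

spark = SparkSession.builder.appName("plan_growth_demo").getOrCreate()

n_cols = 50  # kept small on purpose; the effect grows with the column count

# Variant 1: one projection per withColumn call -> deeply nested logical plan
df_loop = spark.range(1000)
for i in range(n_cols):
    df_loop = df_loop.withColumn(f"col{i}", rand() * 2 - 1)

# Variant 2: a single projection containing all expressions -> flat logical plan
df_select = spark.range(1000).select(
    "*", *[expr(f"rand() * 2 - 1 as col{i}") for i in range(n_cols)]
)

# Same schema either way; compare the parsed/analyzed plans that get printed
df_loop.explain(extended=True)
df_select.explain(extended=True)

spark.stop()

The repartition to roughly 5000 records per partition additionally spreads the write over many small tasks instead of a handful of large ones, at the cost of more (smaller) output files.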

 


2 REPLIES

Explorer

I tried to include a cache(), but it still takes just as long:

data = ss.range(rows).cache()
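
For completeness: as far as I understand it, this cache() only covers the plain one-column range, all 2500 rand() columns are still computed after it, and nothing is materialized until an action runs anyway. If caching were going to help at all, I assume it would have to be the final wide DataFrame, roughly like this (a sketch, not re-run at full scale):

data = ss.range(rows)
for i in range(cols):
    data = data.withColumn(f'col{i}', rand() * 2 - 1)

# cache the wide result instead of the bare range; it is only materialized by
# the first action, and it only pays off if the cached result is reused later
data = data.cache()
data.write.format("parquet").mode("overwrite").save(f"{hdfs_dir}/test.parquet")
data.unpersist()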

 

I also reduced the following to 32m, thinking that I might have overdone the Java stack size, but the effect is still the same:

.config("spark.driver.extraJavaOptions", "-Xss32m") \
.config("spark.executor.extraJavaOptions", "-Xss32m") \

 

@RangaReddy do you have an idea what I am doing wrong?

 

Edit: I see the following in the logs. Is the cache actually working, or does this only show the state at the beginning?

"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},
