<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>Re: Performance issue with simple reproducible case in Support Questions</title>
    <link>https://community.cloudera.com/t5/Support-Questions/Performance-issue-with-simple-reproducible-case/m-p/372590#M241298</link>
    <description>Reply from RangaReddy with optimized PySpark code and a spark-submit command for the thread "Performance issue with simple reproducible case" in Support Questions.</description>
    <pubDate>Wed, 14 Jun 2023 08:25:05 GMT</pubDate>
    <dc:creator>RangaReddy</dc:creator>
    <dc:date>2023-06-14T08:25:05Z</dc:date>
    <item>
      <title>Performance issue with simple reproducible case</title>
      <link>https://community.cloudera.com/t5/Support-Questions/Performance-issue-with-simple-reproducible-case/m-p/372548#M241273</link>
      <description>&lt;P&gt;Hi,&lt;/P&gt;&lt;P&gt;This is a simple, reproducible issue where the performance is surprisingly bad. It is a follow-up to the case &lt;A href="https://community.cloudera.com/t5/Support-Questions/OOM-issues-when-writing-into-parquet/td-p/372375" target="_self"&gt;under this link&lt;/A&gt;, where a StackOverflowError initially occurred.&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;The script below ran for 26 hours, with 8 cores at full load the whole time, as seen in the hardware statistics.&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;Of course the object is "quite large", but similar operations on an object of this size do not take nearly this long. The generated output was 42.5GB across the 8 parquet files on HDFS.&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;Here is the code:&lt;/P&gt;&lt;P&gt;from pyspark.sql import SparkSession&lt;BR /&gt;from pyspark.sql.functions import rand&lt;/P&gt;&lt;P&gt;&lt;BR /&gt;ss = SparkSession.builder.appName("test_replication") \&lt;BR /&gt;.config("spark.kryoserializer.buffer.max.mb", "2047") \&lt;BR /&gt;.config('spark.sql.execution.arrow.pyspark.enabled', "true") \&lt;BR /&gt;.config("spark.driver.maxResultSize", "16G") \&lt;BR /&gt;.config("spark.driver.memory", "4G") \&lt;BR /&gt;.config("spark.executor.memory", "16G") \&lt;BR /&gt;.config("spark.dynamicAllocation.maxExecutors","8") \&lt;BR /&gt;.config("spark.executor.instances", "2") \&lt;BR /&gt;.config("spark.executor.cores", "4") \&lt;BR /&gt;.config("spark.dynamicAllocation.enabled", "true") \&lt;BR /&gt;.config("spark.driver.extraJavaOptions", "-Xss1024m") \&lt;BR /&gt;.config("spark.executor.extraJavaOptions", "-Xss1024m") \&lt;BR /&gt;.config("spark.yarn.tags","dev") \&lt;BR /&gt;.getOrCreate()&lt;/P&gt;&lt;P&gt;rows = 2350000&lt;BR /&gt;cols = 2500&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;hdfs_dir = "/destination/on/hdfs"&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;BR /&gt;data = ss.range(rows)&lt;BR /&gt;for i in range(cols):&lt;BR /&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;data = data.withColumn(f'col{i}', rand() * 2 - 1)&lt;/P&gt;&lt;P&gt;data.write.format("parquet").mode("overwrite").save(f"{hdfs_dir}/test.parquet")&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;Am I doing something wrong?&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;Edit: In the applicationHistory log I see the following element, which surprises me - is this normal?&lt;/P&gt;&lt;P&gt;"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},&amp;nbsp;&lt;/P&gt;&lt;P&gt;Edit2: Is this because there is no cache() or persist() in place?&lt;/P&gt;
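&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;For anyone reproducing this, a minimal sketch (sizes and the app name are illustrative, reduced from the real job) that times DataFrame construction alone. Each withColumn call returns a new DataFrame with an additional projection in the logical plan, so building the plan itself grows expensive with the column count, before any data is written:&lt;/P&gt;&lt;LI-CODE lang="python"&gt;from datetime import datetime

from pyspark.sql import SparkSession
from pyspark.sql.functions import rand

ss = SparkSession.builder.appName("plan_build_timing").getOrCreate()

start = datetime.now()
data = ss.range(10000)
for i in range(500):  # far fewer columns than the real job
    data = data.withColumn(f'col{i}', rand() * 2 - 1)

# no action has run yet: this time is spent purely on plan construction
print("Building the plan took", (datetime.now() - start).total_seconds(), "seconds")
&lt;/LI-CODE&gt;</description>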
      <pubDate>Tue, 13 Jun 2023 13:19:43 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Support-Questions/Performance-issue-with-simple-reproducible-case/m-p/372548#M241273</guid>
      <dc:creator>cirrus</dc:creator>
      <dc:date>2023-06-13T13:19:43Z</dc:date>
    </item>
    <item>
      <title>Re: Performance issue with simple reproducible case</title>
      <link>https://community.cloudera.com/t5/Support-Questions/Performance-issue-with-simple-reproducible-case/m-p/372582#M241292</link>
      <description>&lt;P&gt;I tried adding a cache(), but it still takes that long:&lt;/P&gt;&lt;P&gt;data = ss.range(rows).cache()&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;I also reduced the following to 32m, thinking I may have overdone the Java stack size, but the effect is still the same:&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;.config("spark.driver.extraJavaOptions", "-Xss32m") \&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;.config("spark.executor.extraJavaOptions", "-Xss32m") \&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&lt;a href="https://community.cloudera.com/t5/user/viewprofilepage/user-id/78612"&gt;@RangaReddy&lt;/a&gt;&amp;nbsp;do you have an idea what I am doing wrong?&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;Edit: I see the following in the logs - is the cache actually working, or is this only what it shows at the beginning?&lt;/P&gt;&lt;P&gt;"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},&lt;/P&gt;
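&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;One way to check (a sketch, reusing the session and variables from the question): cache() only marks the DataFrame for persistence; nothing is stored until an action materializes it:&lt;/P&gt;&lt;LI-CODE lang="python"&gt;data = ss.range(rows).cache()  # marks the DataFrame for caching; stores nothing yet

data.count()  # the first action actually fills the cache

print(data.storageLevel)  # reports the persistence level now in effect
&lt;/LI-CODE&gt;</description>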
      <pubDate>Wed, 14 Jun 2023 06:37:13 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Support-Questions/Performance-issue-with-simple-reproducible-case/m-p/372582#M241292</guid>
      <dc:creator>cirrus</dc:creator>
      <dc:date>2023-06-14T06:37:13Z</dc:date>
    </item>
    <item>
      <title>Re: Performance issue with simple reproducible case</title>
      <link>https://community.cloudera.com/t5/Support-Questions/Performance-issue-with-simple-reproducible-case/m-p/372590#M241298</link>
      <description>&lt;P&gt;Hi&amp;nbsp;&lt;a href="https://community.cloudera.com/t5/user/viewprofilepage/user-id/105372"&gt;@cirrus&lt;/a&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;You can find the optimized code below. It builds all of the columns in a single select instead of adding them one withColumn at a time.&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&lt;STRONG&gt;/tmp/test_pyspark.py&lt;/STRONG&gt;&lt;/P&gt;&lt;LI-CODE lang="python"&gt;from pyspark.sql.functions import expr
from pyspark.sql import SparkSession
from datetime import datetime
import math

spark = SparkSession.builder \
    .appName('Test App') \
    .getOrCreate()

num_rows = 2350000
num_columns = 2500
records_per_file = 5000
num_partitions = int(math.ceil(num_rows / records_per_file))

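# repartition up front so each task holds roughly records_per_file rows;
# the partition count also determines the number of output files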
data = spark.range(num_rows).repartition(num_partitions)
print(f"Number of Partitions: {data.rdd.getNumPartitions()}")

start_time = datetime.now()

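# build all the columns in one select: the logical plan stays a single
# projection, whereas a withColumn loop adds one plan node per column
# and makes plan analysis dramatically slower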
data = data.select(*[expr(f'rand() * 2 - 1 as col{i}') for i in range(num_columns)])
#data = data.select("*", *[expr(f'rand() * 2 - 1 as col{i}') for i in range(num_columns)])
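# (the commented variant would also keep the original id column from range())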

end_time = datetime.now()
delta = end_time - start_time

# time difference in seconds
print(f"Time difference to select the columns is {delta.total_seconds()} seconds")

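# select is lazy, so the timing above covers plan construction only;
# the parquet write below is what triggers the actual computation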
start_time = datetime.now()
data.write.format("parquet").mode("overwrite").save("/tmp/test")
end_time = datetime.now()
delta = end_time - start_time

# time difference in seconds
print(f"Time difference for writing the data to HDFS is {delta.total_seconds()} seconds")

spark.stop()&lt;/LI-CODE&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&lt;STRONG&gt;Spark-submit command:&lt;/STRONG&gt;&lt;/P&gt;&lt;LI-CODE lang="markup"&gt;spark-submit \
--master yarn \
--deploy-mode cluster \
--conf spark.driver.memory=16G \
--conf spark.driver.memoryOverhead=1g \
--conf spark.executor.memory=16G \
--conf spark.executor.memoryOverhead=1g \
--conf spark.memory.fraction=0.8 \
--conf spark.memory.storageFraction=0.4 \
--conf spark.executor.cores=5 \
--conf spark.dynamicAllocation.enabled=true \
--conf spark.driver.extraJavaOptions="-Xss1024m" \
--conf spark.executor.extraJavaOptions="-Xss1024m" /tmp/test_pyspark.py&lt;/LI-CODE&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;
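&lt;P&gt;With these numbers, num_partitions = ceil(2350000 / 5000) = 470, so the write produces 470 parquet part files of roughly 5000 rows each; increasing records_per_file gives fewer, larger files.&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;</description>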
      <pubDate>Wed, 14 Jun 2023 08:25:05 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Support-Questions/Performance-issue-with-simple-reproducible-case/m-p/372590#M241298</guid>
      <dc:creator>RangaReddy</dc:creator>
      <dc:date>2023-06-14T08:25:05Z</dc:date>
    </item>
  </channel>
</rss>