
Broadcast error in Spark 3


Hi All,

I have been using Spark 2.2 in CDSW for a long time and recently started working with Spark 3 in CDP. One of my queries fails in Spark 3 with the following error:

Py4JJavaError: An error occurred while calling o96.sql.
: org.apache.spark.SparkException: Cannot broadcast the table over 512000000 rows: 1235668051 rows
        at org.apache.spark.sql.errors.QueryExecutionErrors$.cannotBroadcastTableOverMaxTableRowsError(QueryExecutionErrors.scala:1824)

The same query runs fine in Spark 2.2 in CDSW. My Spark session configuration is as follows:

from pyspark.sql import SparkSession

# INITIALISE SPARK SESSION BUILDER #
spark_session_builder = SparkSession.builder
# SET GENERAL SPARK PROPERTIES #
print("Configuring General Spark Properties")
spark_session_builder = spark_session_builder.appName("Wrangler-Routine")
spark_session_builder = spark_session_builder.master("yarn")
spark_session_builder = spark_session_builder.enableHiveSupport()
spark_session_builder = spark_session_builder.config("spark.yarn.queue", "root.project")
spark_session_builder = spark_session_builder.config("spark.kryoserializer.buffer", "128m")
spark_session_builder = spark_session_builder.config("spark.kryoserializer.buffer.max", "2024m")
# SET SPARK DRIVER PROPERTIES #
print("Configuring Spark Driver Properties")
spark_session_builder = spark_session_builder.config("spark.driver.cores", "16")
spark_session_builder = spark_session_builder.config("spark.driver.memory", "64g")
spark_session_builder = spark_session_builder.config("spark.driver.memoryOverhead", "8g")
spark_session_builder = spark_session_builder.config("spark.driver.maxResultSize", "16g")
# SET SPARK EXECUTOR PROPERTIES #
print("Configuring Spark Executor Properties")
spark_session_builder = spark_session_builder.config("spark.executor.instances", "16")
spark_session_builder = spark_session_builder.config("spark.executor.cores", "8")
spark_session_builder = spark_session_builder.config("spark.executor.memory", "8g")
spark_session_builder = spark_session_builder.config("spark.executor.memoryOverhead", "8g")
# SET SPARK SQL PROPERTIES #
print("Configuring Spark SQL Properties")
spark_session_builder = spark_session_builder.config("spark.sql.crossJoin.enabled", "true")
spark_session_builder = spark_session_builder.config("spark.sql.autoBroadcastJoinThreshold", "-1")
spark_session_builder = spark_session_builder.config("spark.sql.adaptive.autoBroadcastJoinThreshold", "-1")
# INSTANTIATE SPARK SESSION #
print("Instantiating Spark Session")
spark_session = spark_session_builder.getOrCreate()

spark_session.sql("""my sql here""")

What am I missing here?

1 REPLY

Rising Star

1. Check whether any join hints (e.g. BROADCAST) are set at the query level; a hint overrides spark.sql.autoBroadcastJoinThreshold, so Spark still attempts the broadcast even with the threshold set to -1.
2. Try increasing spark.sql.shuffle.partitions.
3. You can use SQL hints such as MERGE to request a sort-merge join instead of a broadcast join. A sketch of these suggestions follows below.
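Here is a minimal sketch of the suggestions above, assuming a hypothetical query joining tables big_a and big_b on a column id (the table and column names, app name, and partition count are placeholders, not from the original post):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("join-hint-sketch").getOrCreate()

# Suggestion 2: raise the shuffle partition count used by the sort-merge join.
spark.conf.set("spark.sql.shuffle.partitions", "800")

# Suggestion 1: a leftover BROADCAST hint like the one below overrides
# spark.sql.autoBroadcastJoinThreshold, so Spark 3 still tries to broadcast
# and hits the 512,000,000-row hard limit. Remove such hints on large tables.
# spark.sql("SELECT /*+ BROADCAST(b) */ * FROM big_a a JOIN big_b b ON a.id = b.id")

# Suggestion 3: the MERGE hint asks the planner for a sort-merge join
# instead of a broadcast hash join.
df = spark.sql("SELECT /*+ MERGE(b) */ * FROM big_a a JOIN big_b b ON a.id = b.id")

# Confirm the physical plan shows SortMergeJoin rather than BroadcastHashJoin.
df.explain()

With both autoBroadcastJoinThreshold settings already at -1, a broadcast hint left in the query text is the most likely reason Spark 3 still attempts the broadcast.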