Keep getting "ConnectionRefused" or "OOM" errors when working with PySpark in Cloudera Machine Learning

Frequent Visitor

Hi all,

I'm setting up a CML session with 64GB of RAM and 4 CPUs, then creating a PySpark session with this configuration:

from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder \
    .appName("OptimizedSparkSession") \
    .master("local[*]") \
    .config("spark.sql.execution.arrow.pyspark.enabled", "false") \
    .config("spark.sql.shuffle.partitions", "400") \
    .config("spark.executor.memory", "32g") \
    .config("spark.executor.instances", "10") \
    .config("spark.executor.cores", "4") \
    .config("spark.driver.memory", "32g") \
    .config("spark.network.timeout", "800s") \
    .config("spark.executor.heartbeatInterval", "200s") \
    .config("spark.driver.maxResultSize", "64g") \
    .getOrCreate()

The transformations are quite simple: I load two tables, one with the entities of interest and another with additional features and other entities, then join the second to the first to keep only the records I need.

table1 = spark.sql("""
    SELECT DISTINCT key1, key2
    FROM db1.table1
""")

table2 = spark.sql("""
    SELECT key1, key2, col1, col2, col3, col4, col5, col6, col7
    FROM db2.table2
    WHERE (col1 LIKE 'A%' AND key2 > '2019-01-01')
""")


table2 = table2.select(
    F.col('key1'),
    F.col('key2'),
    F.col('col1').cast('double'),
    F.col('col2').cast('double'),
    F.col('col3').cast('date'),
    F.col('col4'),
    F.col('col5').cast('double'),
    F.col('col6').cast('double'),
    F.col('col7').cast('double')
)

table1_w_table2 = table1.join(table2, ((table1.key1 == table2.key1) & (table1.key2 == table2.key2)))

df = table1_w_table2.toPandas()

The first table is not that large: only around 5 million rows with two columns, amounting to around 0.5GB. The second table, however, is huge: even after filtering it contains over 7 billion rows.

What puzzles me is the cause of the error: is it the resources the join needs combined with a misconfigured PySpark or CML session, or is it simply that the table is too large to fit in the session's working memory?
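
As a rough sanity check on the "sheer size" hypothesis, the arithmetic below estimates the in-memory footprint of the filtered second table. The 8 bytes per value is an assumption (most of the selected columns are cast to double); real sizes depend on string lengths, on Python object overhead, and on how many rows survive the join, so this is an order-of-magnitude sketch only.

```python
def estimated_size_gb(rows, columns, bytes_per_value=8):
    """Lower-bound size estimate in GB, assuming fixed-width values."""
    return rows * columns * bytes_per_value / 1e9


# Figures from the post: over 7 billion filtered rows, 9 selected columns.
filtered_table2_gb = estimated_size_gb(rows=7_000_000_000, columns=9)
session_ram_gb = 64  # size of the CML session

print(f"filtered table2: ~{filtered_table2_gb:.0f} GB, session RAM: {session_ram_gb} GB")
```

Under these assumptions the filtered table alone is several times larger than the session, so whether toPandas() can work at all depends on how much the join reduces it.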

Here is an example error I have received:

Hive Session ID = b31990b9-eff8-4494-a3fa-5d6414b4dd3d
ERROR:root:Exception while sending command.age 6:> (0 + 0) / 1]
Traceback (most recent call last):
File "/home/cdsw/.local/lib/python3.10/site-packages/py4j/clientserver.py", line 480, in send_command
raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
File "/home/cdsw/.local/lib/python3.10/site-packages/py4j/java_gateway.py", line 1038, in send_command
response = connection.send_command(command)
File "/home/cdsw/.local/lib/python3.10/site-packages/py4j/clientserver.py", line 503, in send_command
raise Py4JNetworkError(
py4j.protocol.Py4JNetworkError: Error while sending or receiving
---------------------------------------------------------------------------
Py4JError Traceback (most recent call last)
File ~/.local/lib/python3.10/site-packages/pyspark/sql/dataframe.py:693, in DataFrame.collect(self)
692 with SCCallSiteSync(self._sc) as css:
--> 693 sock_info = self._jdf.collectToPython()
694 return list(_load_from_socket(sock_info, BatchedSerializer(PickleSerializer())))

File ~/.local/lib/python3.10/site-packages/py4j/java_gateway.py:1321, in JavaMember.__call__(self, *args)
1320 answer = self.gateway_client.send_command(command)
-> 1321 return_value = get_return_value(
1322 answer, self.gateway_client, self.target_id, self.name)
1324 for temp_arg in temp_args:

File ~/.local/lib/python3.10/site-packages/pyspark/sql/utils.py:111, in capture_sql_exception.<locals>.deco(*a, **kw)
110 try:
--> 111 return f(*a, **kw)
112 except py4j.protocol.Py4JJavaError as e:

File ~/.local/lib/python3.10/site-packages/py4j/protocol.py:334, in get_return_value(answer, gateway_client, target_id, name)
333 else:
--> 334 raise Py4JError(
335 "An error occurred while calling {0}{1}{2}".
336 format(target_id, ".", name))
337 else:

Py4JError: An error occurred while calling o168.collectToPython

During handling of the above exception, another exception occurred:

ConnectionRefusedError Traceback (most recent call last)
Cell In[1], line 93
76 securities_w_csdb = securities.join(csdb, ((csdb.externalcode_isin == securities.isin) & (csdb.correction_date == securities.lipper_date)))
77 securities_w_csdb = securities_w_csdb.select(
78 F.col('isin'),
79 F.col('originalmaturity'),
(...)
90 F.col('idirdebttype'),
91 F.col('idirclassificationcode_nace') )
---> 93 df = securities_w_csdb.toPandas()

File ~/.local/lib/python3.10/site-packages/pyspark/sql/pandas/conversion.py:157, in PandasConversionMixin.toPandas(self)
154 raise
156 # Below is toPandas without Arrow optimization.
--> 157 pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns)
158 column_counter = Counter(self.columns)
160 dtype = [None] * len(self.schema)

File ~/.local/lib/python3.10/site-packages/pyspark/sql/dataframe.py:692, in DataFrame.collect(self)
682 def collect(self):
683 """Returns all the records as a list of :class:`Row`.
684
685 .. versionadded:: 1.3.0
(...)
690 [Row(age=2, name='Alice'), Row(age=5, name='Bob')]
691 """
--> 692 with SCCallSiteSync(self._sc) as css:
693 sock_info = self._jdf.collectToPython()
694 return list(_load_from_socket(sock_info, BatchedSerializer(PickleSerializer())))

File ~/.local/lib/python3.10/site-packages/pyspark/traceback_utils.py:78, in SCCallSiteSync.__exit__(self, type, value, tb)
76 SCCallSiteSync._spark_stack_depth -= 1
77 if SCCallSiteSync._spark_stack_depth == 0:
---> 78 self._context._jsc.setCallSite(None)

File ~/.local/lib/python3.10/site-packages/py4j/java_gateway.py:1320, in JavaMember.__call__(self, *args)
1313 args_command, temp_args = self._build_args(*args)
1315 command = proto.CALL_COMMAND_NAME +\
1316 self.command_header +\
1317 args_command +\
1318 proto.END_COMMAND_PART
-> 1320 answer = self.gateway_client.send_command(command)
1321 return_value = get_return_value(
1322 answer, self.gateway_client, self.target_id, self.name)
1324 for temp_arg in temp_args:

File ~/.local/lib/python3.10/site-packages/py4j/java_gateway.py:1036, in GatewayClient.send_command(self, command, retry, binary)
1015 def send_command(self, command, retry=True, binary=False):
1016 """Sends a command to the JVM. This method is not intended to be
1017 called directly by Py4J users. It is usually called by
1018 :class:`JavaMember` instances.
(...)
1034 if `binary` is `True`.
1035 """
-> 1036 connection = self._get_connection()
1037 try:
1038 response = connection.send_command(command)

File ~/.local/lib/python3.10/site-packages/py4j/clientserver.py:281, in JavaClient._get_connection(self)
278 pass
280 if connection is None or connection.socket is None:
--> 281 connection = self._create_new_connection()
282 return connection

File ~/.local/lib/python3.10/site-packages/py4j/clientserver.py:288, in JavaClient._create_new_connection(self)
284 def _create_new_connection(self):
285 connection = ClientServerConnection(
286 self.java_parameters, self.python_parameters,
287 self.gateway_property, self)
--> 288 connection.connect_to_java_server()
289 self.set_thread_connection(connection)
290 return connection

File ~/.local/lib/python3.10/site-packages/py4j/clientserver.py:402, in ClientServerConnection.connect_to_java_server(self)
399 if self.ssl_context:
400 self.socket = self.ssl_context.wrap_socket(
401 self.socket, server_hostname=self.java_address)
--> 402 self.socket.connect((self.java_address, self.java_port))
403 self.stream = self.socket.makefile("rb")
404 self.is_connected = True

ConnectionRefusedError: [Errno 111] Connection refused
ERROR:root:Exception while sending command.
Traceback (most recent call last):
File "/home/cdsw/.local/lib/python3.10/site-packages/py4j/clientserver.py", line 480, in send_command
raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
File "/home/cdsw/.local/lib/python3.10/site-packages/py4j/java_gateway.py", line 1038, in send_command
response = connection.send_command(command)
File "/home/cdsw/.local/lib/python3.10/site-packages/py4j/clientserver.py", line 503, in send_command
raise Py4JNetworkError(
py4j.protocol.Py4JNetworkError: Error while sending or receiving


Community Manager

@euklidas Welcome to the Cloudera Community!

To help you get the best possible solution, I have tagged our Spark experts @Bharati and @jagadeesan, who may be able to assist you further.

Please keep us updated on your post, and we hope you find a satisfactory solution to your query.


Regards,

Diana Torres,
Senior Community Moderator



@euklidas It looks like there is not enough memory for the join, especially with billions of rows. toPandas() loads all of that data into the single driver process, which causes these errors. Instead of calling toPandas(), write the output to a file or limit the amount of data you collect, so the driver is not overwhelmed.
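
A minimal sketch of that suggestion, assuming the joined DataFrame from the question and a writable storage location. The function name, `output_path`, and `sample_rows` are hypothetical and only for illustration; the DataFrame calls used (`write.mode(...).parquet(...)`, `limit`, `toPandas`) are standard PySpark.

```python
def persist_and_sample(joined_df, output_path, sample_rows=100_000):
    """Write the full join result to storage and return a bounded pandas sample."""
    # The write is executed by Spark tasks that stream rows to files,
    # so the full result is never collected into the driver.
    joined_df.write.mode("overwrite").parquet(output_path)

    # limit() caps how many rows toPandas() has to materialise in the driver.
    return joined_df.limit(sample_rows).toPandas()
```

For example, `persist_and_sample(table1_w_table2, "/tmp/table1_w_table2", sample_rows=10_000)` would keep the full result on disk and bring only 10,000 rows into pandas. The path here is a placeholder, not a recommended location.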

Frequent Visitor

Writing the output back into the database results in the same error. However, I checked the physical and logical plans of these operations and noticed that Spark does a "relation" operation on the second table, which is over 900GB, reading all of its columns instead of only the subset named in the query. So I translated the whole code into SQL, and it returned the table as a DataFrame perfectly...

Do you have any idea why Spark doesn't push down the filtering and column pruning operations?
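
For anyone who wants to check the same thing, here is a small sketch of how the plan can be inspected programmatically. It assumes Spark 3.0 or later (for `explain(mode=...)`) and a scan node that reports `PushedFilters`, `PartitionFilters`, or `ReadSchema`, as file-based sources usually do; other scan types may print different labels, so an empty result is not proof that pushdown failed. Both helper names are hypothetical.

```python
import contextlib
import io


def capture_plan(df, mode="formatted"):
    """Return the text that df.explain() would print."""
    buf = io.StringIO()
    with contextlib.redirect_stdout(buf):
        df.explain(mode=mode)
    return buf.getvalue()


def scan_details(plan_text, markers=("PushedFilters", "PartitionFilters", "ReadSchema")):
    """Keep only the plan lines that describe what the scan actually reads."""
    return [
        line.strip()
        for line in plan_text.splitlines()
        if any(marker in line for marker in markers)
    ]
```

Calling `scan_details(capture_plan(table2))` for both the DataFrame version and the pure SQL version makes it easier to compare which columns and filters reach the scan in each case.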