Support Questions

Find answers, ask questions, and share your expertise

Spark 2.2 Broadcast Join fails with huge dataset

avatar

Hello everyone,

I am currently facing issues when trying to join (inner) a huge dataset (654 GB) with a smaller one (535 MB) using Spark DataFrame API.

I am broadcasting the smaller dataset to the worker nodes using the broadcast() function.


I am unable to do the join between those two datasets. Here is a sample of the errors I got :


19/04/26 19:39:07 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 1315
19/04/26 19:39:07 INFO executor.Executor: Running task 25.1 in stage 13.0 (TID 1315)
19/04/26 19:39:07 INFO output.FileOutputCommitter: File Output Committer Algorithm version is 1
19/04/26 19:39:07 INFO datasources.SQLHadoopMapReduceCommitProtocol: Using output committer class org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
19/04/26 19:39:07 INFO datasources.FileScanRDD: Reading File path: SOMEFILEPATH, range: 3087007744-3221225472, partition values: [empty row]
19/04/26 19:39:17 INFO datasources.FileScanRDD: Reading File path: SOMEFILEPATH, range: 15971909632-16106127360, partition values: [empty row]
19/04/26 19:39:24 WARN hdfs.DFSClient: DFSOutputStream ResponseProcessor exception  for block isi_hdfs_pool:blk_4549851005_134218728
java.io.IOException: Connection reset by peer
    at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
    at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
    at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
    at sun.nio.ch.IOUtil.read(IOUtil.java:197)
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
    at org.apache.hadoop.net.SocketInputStream$Reader.performIO(SocketInputStream.java:57)
    at org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:142)
    at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:161)
    at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:131)
    at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:118)
    at java.io.FilterInputStream.read(FilterInputStream.java:83)
    at java.io.FilterInputStream.read(FilterInputStream.java:83)
    at org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed(PBHelper.java:2280)
    at org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck.readFields(PipelineAck.java:244)
    at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer$ResponseProcessor.run(DFSOutputStream.java:733)
19/04/26 19:39:27 ERROR util.Utils: Aborting task
com.univocity.parsers.common.TextWritingException: Error writing row.
Internal state when error was thrown: recordCount=458089, recordData=["SOMEDATA"]
    at com.univocity.parsers.common.AbstractWriter.throwExceptionAndClose(AbstractWriter.java:916)
    at com.univocity.parsers.common.AbstractWriter.writeRow(AbstractWriter.java:706)
    at org.apache.spark.sql.execution.datasources.csv.UnivocityGenerator.write(UnivocityGenerator.scala:82)
    at org.apache.spark.sql.execution.datasources.csv.CsvOutputWriter.write(CSVFileFormat.scala:139)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.execute(FileFormatWriter.scala:327)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:258)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:256)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1375)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:261)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(FileFormatWriter.scala:191)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(FileFormatWriter.scala:190)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalStateException: Error closing the output.
    at com.univocity.parsers.common.AbstractWriter.close(AbstractWriter.java:861)
    at com.univocity.parsers.common.AbstractWriter.throwExceptionAndClose(AbstractWriter.java:903)
    at com.univocity.parsers.common.AbstractWriter.writeRow(AbstractWriter.java:811)
    at com.univocity.parsers.common.AbstractWriter.writeRow(AbstractWriter.java:704)
    ... 15 more
Caused by: java.io.IOException: All datanodes DatanodeInfoWithStorage[10.241.209.34:585,null,DISK] are bad. Aborting...
    at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.setupPipelineForAppendOrRecovery(DFSOutputStream.java:1109)
    at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.processDatanodeError(DFSOutputStream.java:871)
    at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:401)
19/04/26 19:39:27 WARN util.Utils: Suppressing exception in catch: Failed on local exception: java.io.IOException: Connection reset by peer; Host Details : local host is: "SOMENODEHOST"; destination host is: "SOMEDESTINATIONHOST":SOMEPORT; 
java.io.IOException: Failed on local exception: java.io.IOException: Connection reset by peer; Host Details : local host is: "SOMENODEHOST"; destination host is: "SOMEDESTINATIONHOST":SOMEPORT; 
    at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:776)
    at org.apache.hadoop.ipc.Client.call(Client.java:1479)
    at org.apache.hadoop.ipc.Client.call(Client.java:1412)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
    at com.sun.proxy.$Proxy17.delete(Unknown Source)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.delete(ClientNamenodeProtocolTranslatorPB.java:540)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
    at com.sun.proxy.$Proxy18.delete(Unknown Source)
    at org.apache.hadoop.hdfs.DFSClient.delete(DFSClient.java:2044)
    at org.apache.hadoop.hdfs.DistributedFileSystem$14.doCall(DistributedFileSystem.java:707)
    at org.apache.hadoop.hdfs.DistributedFileSystem$14.doCall(DistributedFileSystem.java:703)
    at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
    at org.apache.hadoop.hdfs.DistributedFileSystem.delete(DistributedFileSystem.java:714)
    at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.abortTask(FileOutputCommitter.java:568)
    at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.abortTask(FileOutputCommitter.java:557)
    at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.abortTask(HadoopMapReduceCommitProtocol.scala:159)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$1.apply$mcV$sp(FileFormatWriter.scala:266)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1384)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:261)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(FileFormatWriter.scala:191)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(FileFormatWriter.scala:190)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Connection reset by peer
    at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
    at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
    at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
    at sun.nio.ch.IOUtil.read(IOUtil.java:197)
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
    at org.apache.hadoop.net.SocketInputStream$Reader.performIO(SocketInputStream.java:57)
    at org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:142)
    at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:161)
    at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:131)
    at java.io.FilterInputStream.read(FilterInputStream.java:133)
    at java.io.FilterInputStream.read(FilterInputStream.java:133)
    at org.apache.hadoop.ipc.Client$Connection$PingInputStream.read(Client.java:520)
    at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
    at java.io.BufferedInputStream.read(BufferedInputStream.java:265)
    at java.io.DataInputStream.readInt(DataInputStream.java:387)
    at org.apache.hadoop.ipc.Client$Connection.receiveRpcResponse(Client.java:1084)
    at org.apache.hadoop.ipc.Client$Connection.run(Client.java:979)


Before joining the large dataset with the smaller one, I tried joining 10 000 records of the first one with the entire smaller one (535 MB). I had a "Futures timed out [300 s] error".


I then increased the spark.sql.broadcastTimeout variable to 3600 s. It worked fine. But when I try joining it with the entire dataset (654 GB), it gives me the error you can see up (TextWriting Exception).


My questions are :

- How can I monitor more efficiently my spark jobs ? And how should I proceed ?

- What do you think is causing this error to happen ? How can I solve it ?


You will find below some information on the cluster, the execution and the configuration of the spark job.


Some information/context :

I am working on a production environment (see the cluster configuration below). I cannot upgrade my spark version. I do not have spark UI or yarn UI to monitor my jobs. All I can retrieve are the yarn logs.


Spark Version : 2.2


Cluster configuration:

21 compute nodes (workers)

8 cores each

64 GB RAM per node


Current Spark configuration :

master: yarn

executor-memory: 42G

executor-cores: 5

driver memory: 42G

num-executors: 28

spark.sql.broadcastTimeout=3600

spark.kryoserializer.buffer.max=512

spark.yarn.executor.memoryOverhead=2400

spark.driver.maxResultSize=500m

spark.memory.storageFraction=0.9

spark.memory.fraction=0.9

spark.hadoop.fs.permissions.umask-mode=007


How is the job executed:


We build an artifact (jar) with IntelliJ and then send it to a server. Then a bash script is executed . This script :


- export some environment variables (SPARK_HOME, HADOOP_CONF_DIR, PATH and SPARK_LOCAL_DIRS)

- launch the spark-submit command with all the parameters defined in the spark configuration above.

- retrieves the yarn logs of the application







1 REPLY 1

avatar

Problem solved.

Here is the post where it was solved with more details (code, execution plan) :

https://stackoverflow.com/questions/55919699/spark-2-2-join-fails-with-huge-dataset