Spark executor hangs on binaryFiles read

We use Spark 2.1.0 on YARN for batch processing of multiline records.
Our job is written in PySpark and runs once a day. The input folder contains ~45000 very small files (1 kB to 100 kB each), for a total of ~2 GB.

Every file contains a different number of multiline records. The first line of each record follows a standard pattern: a timestamp followed by a Greek µ and some other information. For example:

28/09/2018 08:54:22µfirst record metadata
first record content with
number of
28/09/2018 08:57:12µsecond record metadata
second record content
with a different
number of lines

This is how we read the files into our DataFrame:

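from pyspark.sql.functions import explode, split
df = spark.sparkContext.binaryFiles(input_path).toDF(['filename', 'content'])
# Split each file's content just before every record header (timestamp followed by µ)
raw = df.select('filename', explode(split(df.content, r'(?=\d{2}\/\d{2}\/\d{4} \d{2}:\d{2}:\d{2}µ)'))).cache()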

The binaryFiles call produces a DataFrame with one row per file; the explode/split call produces a DataFrame with one row per record. That DataFrame is then cached and further operations are performed on it.
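To make the splitting step concrete, here is a minimal pure-Python sketch of what the lookahead pattern does to the contents of a single file. It uses Python's re module rather than Spark's split, so it only illustrates the idea; the helper name split_records is made up for this example, and the sample text is the one shown above.

```python
import re

# Same lookahead idea as in the Spark job: split *before* each record header
# (timestamp followed by µ) without consuming the header itself.
RECORD_START = re.compile(r'(?=\d{2}/\d{2}/\d{4} \d{2}:\d{2}:\d{2}µ)')

def split_records(content):
    """Return the multiline records found in one file's text (hypothetical helper)."""
    # Splitting on a zero-width pattern requires Python 3.7+.
    # The split can yield an empty leading piece, so empty pieces are dropped.
    return [part for part in RECORD_START.split(content) if part]

sample = (
    "28/09/2018 08:54:22µfirst record metadata\n"
    "first record content with\n"
    "number of\n"
    "28/09/2018 08:57:12µsecond record metadata\n"
    "second record content\n"
    "with a different\n"
    "number of lines\n"
)

records = split_records(sample)
for record in records:
    print(repr(record))
```

Each element of records corresponds to one row of the exploded DataFrame for that file.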

We are currently testing the solution, and this is the submit command we use for the job (the memory settings are deliberately oversized):

spark2-submit --master yarn \
  --conf spark.kryoserializer.buffer.max=1g \
  --deploy-mode cluster \
  --driver-memory 16g \
  --driver-cores 1 \
  --conf spark.yarn.driver.memoryOverhead=1g \
  --num-executors 20 \
  --executor-memory 16g \
  --executor-cores 1 \
  --conf spark.yarn.executor.memoryOverhead=1g \

The job runs fine almost every day and it performs all its operations in 10-15 minutes, writing results to HDFS.

The problem is that once every 7-10 days, one of the ~45000 input files is far larger than the others: 100 MB to 1 GB (always less than 2 GB). When that happens, the job hangs: one executor in particular appears to do nothing the whole time, and no new log lines appear after the first few minutes. The job runs for hours, and we have never seen one of these runs finish because we always have to kill it after a few hours. We suspect the "big" file is the cause, because the job runs fine if we remove it from the input folder.
These are screenshots taken from our last run:


The PySpark documentation for binaryFiles notes: "Small files are preferred, large file is also allowable, but may cause bad performance." We can accept worse performance, but we do not think that is what is happening here, because the job appears to do nothing at all the whole time.

Is a 200 MB file really a "large file" from Spark's point of view? If so, how can we improve the performance of our job, or at least find out whether it is actually doing something?
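One check that could be made outside Spark is to stream the big file line by line and count its records, to confirm that the file itself can be parsed in reasonable time. The following is only a sketch under assumptions, not what our job does: iter_records and count_records are hypothetical helpers, and unlike the regex split in the job, this version only recognizes a record header at the start of a line.

```python
import io
import re

# A record header: timestamp followed by µ, expected at the start of a line.
HEADER = re.compile(r'\d{2}/\d{2}/\d{4} \d{2}:\d{2}:\d{2}µ')

def iter_records(lines):
    """Yield records one at a time from an iterable of text lines."""
    current = []
    for line in lines:
        # A new header closes the record collected so far.
        if HEADER.match(line) and current:
            yield ''.join(current)
            current = []
        current.append(line)
    if current:
        yield ''.join(current)

def count_records(lines):
    """Count records without keeping the whole file in memory."""
    return sum(1 for _ in iter_records(lines))

# Small in-memory stand-in for a file; an open text file object works the same way.
demo = io.StringIO(
    "28/09/2018 08:54:22µfirst record metadata\n"
    "first record content\n"
    "28/09/2018 08:57:12µsecond record metadata\n"
    "second record content\n"
)
print(count_records(demo))
```

Because only one record is held in memory at a time, this kind of check does not depend on the file size the way a single split over the whole content does.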


Thank you,