
Spark can't join dataframes without using over a hundred GB of ram and going OOM?

Explorer

We're using Spark at work for some batch jobs, but now that we're loading a larger set of data, Spark is throwing java.lang.OutOfMemoryError. We're running with YARN as the resource manager, in client mode.

- Driver memory = 64gb

- Driver cores = 8

- Executors = 8

- Executor memory = 20gb

- Executor cores = 5

- Deploy mode = client

The driver first makes many network queries to other services to get data, which it writes in chunks of compressed parquet (using Spark) to many temporary sub-directories of the main batch directory. In aggregate we're talking about 100,000,000 case classes with ~40 properties written to compressed parquet using Spark -> dataframe -> parquet. This part works fine.
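
Roughly, each chunk is written like this (a simplified sketch only; the case class, field names, and path are placeholders rather than our real schema):

    import org.apache.spark.sql.SparkSession

    // Simplified sketch of the chunked write; the case class and path are placeholders
    case class Animal(id: String, animal: String, value: String) // really ~40 properties

    def writeChunk(chunk: Seq[Animal], chunkPath: String)(implicit spark: SparkSession): Unit = {
      import spark.implicits._
      chunk.toDF()                         // case classes -> dataframe
        .write
        .option("compression", "snappy")   // snappy-compressed parquet
        .parquet(chunkPath)                // a temporary sub-directory of the batch directory
    }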

Once that work is done, we have two sets of data: people and animals. We need to join the animals on the id of the people they belong to. But first, we clean up a bunch of animal date columns to make sure they look right (birthdays, vaccination dates, etc.).

The amount of data housed in each animal or people sub-directory is not massive. It's actually quite small - maybe 100kb per directory and ~800 directories. Again, it's snappy-compressed parquet.

Previously, the final size of the joined directory's parquet was about 130 **mega**bytes. I think with our new data we'll have about double that, but I just don't understand how we hit memory issues during this process.

I would greatly appreciate any wisdom anyone has to offer.

Here are some methods used in this joining process:

    def fromHdfs(path: String): DataFrame = {
      // Construct Seq of sub-directories with data...
      val dataframes = dirs.map(dir => spark.read.format("parquet").option("header", "true").load(s"$root/$dir"))
      // Concatenate each dataframe into one resulting df
      dataframes.reduce(_ union _)
    }
    private def convertDates(df: DataFrame, dateCols: Seq[String]): DataFrame = {
      // Clean up dates from a pre-determined list of 'dateCols' strings
      df.columns.intersect(dateCols).foldLeft(df)((newDf, col) =>
        newDf.withColumn(col, unix_timestamp(df(col).cast("string"), "yyyyMMdd")
          .cast("timestamp")
          .cast("date")))
    }
    // Join the people and animals dataframes on 'id'
    def peopleWithAnimals(people: DataFrame, animals: DataFrame)(implicit spark: SparkSession): DataFrame = {
      import spark.implicits._
      // The only collect in here, just to get the distinct animal names to foldLeft over
      val cols = animals.select("animal").distinct.rdd.map(r => r(0).toString).collect()
      // Spread each animal name into its own column holding that animal's value
      val animalsReshaped = cols.foldLeft(animals) { (newDf, colName) =>
        newDf.withColumn(colName, when($"animal" === colName, animals("value")).otherwise(null))
      }
      val peopleNoDups = people.dropDuplicates()
      val animalsNoDups = animalsReshaped.dropDuplicates()
      convertDates(peopleNoDups.join(animalsNoDups, "id"), dateCols)
    }
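
As an aside on what that foldLeft is doing: it spreads each distinct animal name into its own column. For comparison only, Spark's built-in pivot does something similar in one step, though it also collapses to one row per id, which is not exactly what our code produces:

    // Sketch for comparison only: one column per animal, one row per id
    import org.apache.spark.sql.functions.first
    val animalsPivoted = animals
      .groupBy("id")
      .pivot("animal")
      .agg(first("value"))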

The above methods are used in the final method, the entirety of which is as follows:

    def writeJoined(....) = {
      // Read all of the animal and people data sub-directories into dataframes
      val animals = Read.animalsFromHdfs(sourcePath)
      val people = Read.ownersFromHdfs(sourcePath)
      // Join the dataframes on id
      val joined = Join.peopleWithAnimals(people, animals)
      // Write the result to HDFS
      joined.write.option("header", "true").parquet(destinationPath)
    }

Our application now gets as far as creating the temporary directory that the joined data will be written to, but runs out of memory somewhere in the process after that.

    18/06/14 20:37:39 INFO scheduler.TaskSetManager: Finished task 1196.0 in stage 1320.0 (TID 61383) in 1388 ms on machine-name (executor 8) (1242/24967)
    JVMDUMP039I Processing dump event "systhrow", detail "java/lang/OutOfMemoryError" at 2018/06/14 20:38:39 - please wait.
    JVMDUMP032I JVM requested System dump using '/appfolder/core.20180614.203839.27070.0001.dmp' in response to an event
    java.lang.OutOfMemoryError Exception in thread "Spark Context Cleaner" Exception in thread "DataStreamer for file /spark2-history/application_1527888960577_0084.inprogress block BP-864704807-49.70.7.82-1525204078667:blk_1073896985_156181" Exception in thread "SparkListenerBus" Exception in thread "dag-scheduler-event-loop" java.lang.OutOfMemoryError: Exception in thread "RequestSenderProxy-ts" Java heap space
    java.lang.OutOfMemoryError: Exception in thread "task-result-getter-0"
    Exception in thread "task-result-getter-3" Exception in thread "heartbeat-receiver-event-loop-thread"
    Exception in thread "LeaseRenewer:valrs_dev_usr@companyhost:companyport" java/lang/OutOfMemoryError: Java heap space
2 Replies

New Contributor

@Robert Cornell
We have a similar issue with joining: even after bucketing and pre-sorting both tables (roughly as sketched below), it still shows the same kind of behavior - in theory the join should simply zip the two dataframes together without any additional operations.


The only difference is that we use Spark SQL and an outer join instead of the RDD API, but the symptoms look quite similar. Did you manage to fix it?
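
For reference, our setup is roughly the following (the table names, bucket count, and column names are placeholders, not our real schema):

    // Bucket and pre-sort both tables on the join key when writing them out
    peopleDf.write
      .bucketBy(200, "id")
      .sortBy("id")
      .saveAsTable("people_bucketed")

    animalsDf.write
      .bucketBy(200, "id")
      .sortBy("id")
      .saveAsTable("animals_bucketed")

    // Outer join via Spark SQL; with matching bucketing and sort order this should
    // become a sort-merge join without an extra shuffle or sort
    val joined = spark.sql(
      """
        |SELECT *
        |FROM people_bucketed p
        |FULL OUTER JOIN animals_bucketed a
        |ON p.id = a.id
      """.stripMargin)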

Explorer

We did fix it.

The driver memory settings weren't configured as we thought.

We use client mode, and thought that spark.yarn.am.memory would set the driver max heap. It turns out it doesn't, so the driver heap was left at its default value. Our app's driver doesn't use much memory, but it does use more than 384mb 😕 We only figured it out by looking at the Executors page in the Spark UI, which shows the driver/executor memory maximums actually in effect.

So now we set spark.driver.memory and spark.yarn.am.memory.
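
Concretely, something along these lines at submit time (the values and jar name below are illustrative placeholders, not necessarily what we settled on):

    # Client mode: spark.yarn.am.memory only sizes the YARN Application Master;
    # spark.driver.memory (or --driver-memory) sizes the driver JVM on the submitting host.
    spark-submit \
      --master yarn \
      --deploy-mode client \
      --conf spark.driver.memory=64g \
      --conf spark.yarn.am.memory=4g \
      our-batch-job.jar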