Member since 06-09-2018
12 Posts | 0 Kudos Received | 0 Solutions
10-12-2018
02:51 AM
We did fix it. The driver memory settings weren't configured the way we thought. We run in client mode and assumed spark.yarn.am.memory would set the driver's max heap. It turns out it doesn't, so the driver heap was still at its default value. Our app's driver doesn't use much memory, but it does use more than 384 MB 😕 We only figured it out by looking at the Executors page in the Spark UI, which shows the driver/executor memory limits actually in effect. So now we set both spark.driver.memory and spark.yarn.am.memory.
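For anyone who hits the same confusion, here is a minimal sketch of the shape we ended up with - the memory values, class name, and jar below are placeholders, not our real settings:

# Sketch only: values and names are placeholders.
# In client mode the driver heap has to be fixed before the JVM starts,
# i.e. via spark-submit (or spark-defaults.conf), not from the app's own SparkConf.
spark-submit \
  --master yarn \
  --deploy-mode client \
  --driver-memory 4g \
  --conf spark.yarn.am.memory=1g \
  --class com.example.MyApp \
  my-app.jar

The Executors page in the Spark UI is still the quickest way to confirm the driver value actually took effect.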
06-21-2018
04:24 PM
Hi @Sandeep Nemuri. I'm running client mode, so I believe I've followed his instructions correctly. For the driver and executors in client mode, @Felix Albani suggested the following:
--driver-java-options "-Dlog4j.configuration=file:/path/to/log4j/log4j.properties"
--conf "spark.executor.extraJavaOptions='-Dlog4j.configuration=log4j.properties'"
That's in addition to the --files option. I can confirm in the logs that log4j.properties does get uploaded.
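Putting those pieces together with --files, the full submit command I'm running looks roughly like this (the local path, class name, and jar are placeholders):

# Sketch only: the local path, class name, and jar are placeholders.
spark-submit \
  --master yarn \
  --deploy-mode client \
  --files /local/path/to/log4j.properties \
  --driver-java-options "-Dlog4j.configuration=file:/local/path/to/log4j.properties" \
  --conf spark.executor.extraJavaOptions=-Dlog4j.configuration=log4j.properties \
  --class com.example.MyApp \
  my-app.jar

As I understand it, the driver reads the file straight off the local filesystem (hence the file: URL), while the executors reference the copy shipped by --files by its bare name.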
06-20-2018
09:11 PM
Unfortunately this hasn't resolved the issue. We are still getting huge logs. Is the "file:/..." prefix necessary in the driver Java options? This is what I'm passing:
--conf spark.executor.extraJavaOptions='-Dlog4j.configuration=config/log4j.properties'
--driver-java-options -Dlog4j.configuration=config/log4j.properties
--files config/log4j.properties
06-18-2018
01:39 PM
Our Hortonworks cluster is down at the moment. Once it's back up and I can test that this works, I will 🙂
06-17-2018
04:37 AM
You know, I had added the log4j.properties file to --files, but I don't think I had added it to both the driver and executor Java options at the same time. I'll give that a shot. Thanks.
06-15-2018
07:15 PM
I've searched this forum and elsewhere, and there seem to be plenty of ways to do this but none seem to have worked for me. We recently started using Spark 2.2/Yarn at work and have been having mixed success with it. One troublesome thing is the incredible verbosity at the INFO level, which is where our driver's logs generally are. This kind of thing:

18/06/15 15:05:03 INFO TaskSetManager: Starting task 39.0 in stage 0.0 (TID 39, company02.mycomp.server.com, executor 1, partition 39, PROCESS_LOCAL, 5311 bytes)
18/06/15 15:05:12 INFO ServerChannelGroup: Connection to /49.70.7.85:48820 accepted at Fri Jun 15 15:05:12 EDT 2018.
18/06/15 15:05:21 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on company02.mycomp.server.com:43587 (size: 60.6 KB, free: 4.1 GB)

I've tried to use this log4j.properties file and pass it to both the driver and the executor, but nothing seems to work:

# Set everything to be logged to the console
log4j.rootCategory=WARN, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.out
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n
# Set the default spark-shell log level to WARN. When running the spark-shell, the
# log level for this class is used to overwrite the root logger's log level, so that
# the user can have different defaults for the shell and regular Spark apps.
log4j.logger.org.apache.spark.repl.Main=WARN
# Settings to quiet third party logs that are too verbose
log4j.logger.com.mycompany=INFO
log4j.logger.org.http4s=INFO
log4j.logger.io.javalin=INFO
log4j.logger.org.spark_project=WARN
log4j.logger.org.spark_project.jetty=WARN
log4j.logger.org.spark_project.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
log4j.logger.org.apache.parquet=WARN
log4j.logger.parquet=WARN
# SPARK-9183: Settings to avoid annoying messages when looking up nonexistent UDFs in SparkSQL with Hive support
log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL
log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR
Labels:
- Apache Spark
- Apache YARN
06-15-2018
01:30 AM
We're using Spark at work to do some batch jobs, but now that we're loading a larger set of data, Spark is throwing java.lang.OutOfMemoryError. We're running with YARN as the resource manager, but in client mode.
- Driver memory = 64gb
- Driver cores = 8
- Executors = 8
- Executor memory = 20gb
- Executor cores = 5
- Deploy mode = client
The driver first makes many network queries to other services to get data, which it writes in chunks of compressed parquet (using Spark) to many temporary sub-directories of the main batch directory. In aggregate we're talking about 100,000,000 case classes with ~40 properties written to compressed parquet using Spark -> dataframe -> parquet. This part works fine.
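For context, the chunked write itself is just the usual dataframe-to-parquet pattern, roughly like the sketch below (the case class, app name, and paths are made up for illustration):

// Illustrative sketch only: 'Person' stands in for our real ~40-field case classes,
// and the app name and directory layout are placeholders.
import org.apache.spark.sql.SparkSession

case class Person(id: Long, name: String)

val spark = SparkSession.builder().appName("batch-write-sketch").getOrCreate()
import spark.implicits._

def writeChunk(chunk: Seq[Person], batchDir: String, chunkId: Int): Unit =
  chunk.toDF()
    .write
    .option("compression", "snappy")  // snappy-compressed parquet
    .parquet(s"$batchDir/chunk_$chunkId")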
Once that work is done, we have two sets of data: people and animals. We need to join the animals on the id of the people they belong to. But first, we clean up a bunch of animal date columns to make sure they look right (birthdays, vaccination dates, etc.).
The amount of data housed in each animal or people sub-directory is not massive. It's actually quite small - maybe 100 KB per directory, across ~800 directories. Again, it's snappy-compressed parquet.
Previously, the final size of the joined directory's parquet was about 130 **mega**bytes. I think with our new data we'll have about double that, but I just don't understand how we hit memory issues during this process.
I would greatly appreciate any wisdom anyone has to offer.
Here are some methods used in this joining process:

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{unix_timestamp, when}

def fromHdfs(path: String): DataFrame = {
  // Construct Seq of sub-directories with data...
  val dataframes = dirs.map(dir =>
    spark.read.format("parquet").option("header", "true").load(s"$root/$dir"))
  // Concatenate each dataframe into one resulting df
  dataframes.reduce(_ union _)
}

private def convertDates(df: DataFrame, dateCols: Seq[String]): DataFrame = {
  // Clean up dates from a pre-determined list of 'dateCol' strings
  df.columns.intersect(dateCols).foldLeft(df)((newDf, col) =>
    newDf.withColumn(col, unix_timestamp(df(col).cast("string"), "yyyyMMdd")
      .cast("timestamp")
      .cast("date")
    ))
}

// Join the people and animals dataframes on 'id'
def peopleWithAnimals(people: DataFrame, animals: DataFrame)(implicit spark: SparkSession): DataFrame = {
  import spark.implicits._
  // The only collect in here, just to get the column names to foldLeft over
  val cols = animals.select("animal").distinct.select("animal").rdd.map(r => r(0)).collect().map(_.toString)
  val animalsReshaped = cols.foldLeft(animals) { (newDf, colName) =>
    newDf.withColumn(colName, when($"animal" === colName, animals("value")).otherwise(null))
  }
  val peopleNoDups = people.dropDuplicates()
  val animalsNoDups = animalsReshaped.dropDuplicates()
  convertDates(peopleNoDups.join(animalsNoDups, "id"), dateCols)
}
The above methods are used in the final method, the entirety of which is as follows:
def writeJoined(....) = {
  // Read all of the animal and people data subdirectories into dataframes
  val animals = Read.animalsFromHdfs(sourcePath)
  val people = Read.ownersFromHdfs(sourcePath)
  // Join the dataframes on id
  val joined = Join.peopleWithAnimals(animals, people)
  // Write the result to HDFS.
  joined.write.option("header", "true").parquet(destinationPath)
}
Our application now gets as far as creating the temporary directory that the joined data will be written to, but runs out of memory somewhere in the process after that.

18/06/14 20:37:39 INFO scheduler.TaskSetManager: Finished task 1196.0 in stage 1320.0 (TID 61383) in 1388 ms on machine-name (executor 8) (1242/24967)
JVMDUMP039I Processing dump event "systhrow", detail "java/lang/OutOfMemoryError" at 2018/06/14 20:38:39 - please wait.
JVMDUMP032I JVM requested System dump using '/appfolder/core.20180614.203839.27070.0001.dmp' in response to an event java.lang.OutOfMemoryError
Exception in thread "Spark Context Cleaner"
Exception in thread "DataStreamer for file /spark2-history/application_1527888960577_0084.inprogress block BP-864704807-49.70.7.82-1525204078667:blk_1073896985_156181"
Exception in thread "SparkListenerBus"
Exception in thread "dag-scheduler-event-loop" java.lang.OutOfMemoryError:
Exception in thread "RequestSenderProxy-ts" Java heap space
java.lang.OutOfMemoryError:
Exception in thread "task-result-getter-0"
Exception in thread "task-result-getter-3"
Exception in thread "heartbeat-receiver-event-loop-thread"
Exception in thread "LeaseRenewer:valrs_dev_usr@companyhost:companyport" java/lang/OutOfMemoryError: Java heap space
Labels:
- Apache Spark
- Apache YARN
06-09-2018
07:06 PM
I'll give these a shot! I was reading about shading. Does it somehow change all the references in the dependency code that imports the newer protobuf? I can't change the code in the other APIs my company has, but if shading changes the name of the import and all the underlying (i.e., not in my project) references to it, I guess that'll work.
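To make sure I understand it, here's my rough sketch of what a relocation would look like in the maven-shade-plugin config - the plugin version and the shaded package prefix are just example values, not from our build:

<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <version>3.1.1</version> <!-- example version -->
  <executions>
    <execution>
      <phase>package</phase>
      <goals>
        <goal>shade</goal>
      </goals>
      <configuration>
        <relocations>
          <relocation>
            <!-- Rewrite com.google.protobuf references in the classes bundled
                 into my shaded jar, so they stop colliding with the
                 protobuf-java 2.5.0 that ships under spark2/jars. -->
            <pattern>com.google.protobuf</pattern>
            <shadedPattern>shaded.com.google.protobuf</shadedPattern>
          </relocation>
        </relocations>
      </configuration>
    </execution>
  </executions>
</plugin>

If my reading is right, the relocation rewrites the bytecode of whatever ends up inside the shaded jar (my code plus the bundled internal dependency), which is why it can cover references I can't edit directly.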
06-09-2018
05:58 PM
In my Scala work project, I use spark-submit to launch my application into a yarn cluster. I am quite new to Maven projects and pom.xml, but the problem I seem to be having is that hadoop's spark2 jars use an older version of google protobuf (2.5.0) than the internal dependencies I'm importing at work (2.6.1).
The error is here:
java.lang.NoSuchMethodError:
com/google/protobuf/LazyStringList.getUnmodifiableView()Lcom/google/protobuf/LazyStringList;
(loaded from file:/usr/hdp/2.6.4.0-91/spark2/jars/protobuf-java-2.5.0.jar
by sun.misc.Launcher$AppClassLoader@8b6f2bf7)
called from class protobuf.com.mycompany.group.otherproject.api.JobProto$Query
Since I'm not quite sure how to approach dependency issues like this, and I can't change the code of the internal dependency that uses 2.6.1, I added the required protobuf version as a dependency to my project, as well:
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>2.6.1</version>
</dependency>
Unfortunately, this hasn't resolved the issue. When the internal dependency (which does import 2.6.1 on its own) tries to use its proto, the conflict occurs. Any suggestions on how I could force the usage of the newer, correct version would be greatly appreciated.
Labels:
- Apache Hadoop
- Apache Spark