
Spark Streaming Job submit through Envelope end with OutOfMemory Exception


New Contributor

We are using Envelope to develop a Spark Streaming job. After running for a long time, the job ends with a driver OutOfMemory exception. From the heap dump file we can see that most of the memory is occupied by instances of org$apache$spark$sql$execution$CacheManager$$cachedData, which appears to be the cache of SQL operations. Could anyone help me solve this problem? Thank you so much.
[Attachment: 172599_after_gc.png]

10 REPLIES

Re: Spark Streaming Job submit through Envelope end with OutOfMemory Exception

Master Collaborator

You're simply running out of memory. A large portion of memory is dedicated to caching data in Spark, of course, which explains why so much of the heap is cached data; that's not necessarily the issue here. You may be retaining state for lots of jobs in the driver, and that's eating driver memory (it wasn't clear whether that's the heap you're showing). You can simply increase memory, or look for ways to reduce memory usage.
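
For example, the driver memory can be raised at submit time. This is only a sketch; the 8g value and the <...> placeholders stand in for whatever you actually submit with:

# request a larger driver heap (8g is just an illustrative value)
spark-submit \
  --driver-memory 8g \
  <your envelope jar> <your pipeline .conf>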

Re: Spark Streaming Job submit through Envelope end with OutOfMemory Exception

New Contributor
Hello srowen, thank you for your reply. I had already tried increasing the driver memory from 4g to 7g; however, the extra driver memory only delayed the point at which the job hits the OOM. After discussing this issue with Cloudera Support, the engineer thought the problem may be caused by a design issue in Envelope and suggested that I seek help on this board.

Re: Spark Streaming Job submit through Envelope end with OutOfMemory Exception

Rising Star

For Envelope specifically, it does eagerly cache data, so if you have a streaming job with many steps then that might eventually cause this problem.

 

You can stop a step from being cached by adding "cache = false", e.g.:

 

steps {
  ...
  step_name_here {
    dependencies = ...
    cache = false
    ...
  }
  ...
}

Likely in the next version we will change the default to not cache a step unless it is configured to do so.

 

- Jeremy

 

Re: Spark Streaming Job submit through Envelope end with OutOfMemory Exception

Master Collaborator

Caching is OK in the sense that Spark won't use more memory than it's allowed for caching, and you can turn that fraction down if your app is using memory heavily for other things. Have a look at spark.memory.fraction or spark.memory.storageFraction. However, that is only the issue if you're running out of memory on the executors.
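
For example, both can be lowered at submit time. The values below are only illustrative, not tuned recommendations (the Spark 2.x defaults are 0.6 and 0.5), and the <...> placeholders stand in for your actual submit command:

# shrink the unified execution/storage memory region, and the share of it protected for cached data
spark-submit \
  --conf spark.memory.fraction=0.4 \
  --conf spark.memory.storageFraction=0.3 \
  <your envelope jar> <your pipeline .conf>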

 

If you're running out of driver memory, try retaining a lot fewer job history details. Turn spark.ui.retained{Jobs,Stages,Tasks} way down to reduce that memory consumption.
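
For example (the numbers are just an illustration of "way down", well below the defaults):

# keep far fewer completed jobs/stages/tasks in the driver's UI state
spark-submit \
  --conf spark.ui.retainedJobs=100 \
  --conf spark.ui.retainedStages=100 \
  --conf spark.ui.retainedTasks=1000 \
  <your envelope jar> <your pipeline .conf>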

 

But the answer may simply be that you need more memory. I don't see evidence that 7G is necessarily enough, depending on what you are doing.

Re: Spark Streaming Job submit through Envelope end with OutOfMemory Exception

New Contributor
Thanks, srowen. The job simply consumes from Kafka and outputs to Kudu when a specific filter condition is met.
I think the suggestion to turn down spark.ui.retained{Jobs,Stages,Tasks} will help. I will try it later.

Re: Spark Streaming Job submit through Envelope end with OutOfMemory Exception

New Contributor
Hello Jeremy,
Thank you for your suggestion. I will try it now. If there is any progress, I will post it here.

Re: Spark Streaming Job submit through Envelope end with OutOfMemory Exception

New Contributor
Hello Jeremy, one more thing: after adding 'cache = false' on a step, how can I confirm the setting has taken effect? Thank you for your help.

Re: Spark Streaming Job submit through Envelope end with OutOfMemory Exception

Rising Star

If you put that on every step then you shouldn't see any entries in the Storage tab of the Spark UI for the job.

Re: Spark Streaming Job submit through Envelope end with OutOfMemory Exception

New Contributor
Thank you, Jeremy. After setting "cache = false" on each step, I can still see RDD entries on the Storage tab of the Spark UI for the job. I set the parameter in the following way:

steps {
  step1_input {
    cache = false
    ...
  }
  step2_load {
    dependencies = [step1_input]
    cache = fasle
    ...
  }
}

Is that correct?