Contributor
Posts: 29
Registered: ‎02-11-2014
Accepted Solution

Hive-on-Spark tasks never finish


Hi,

 

We are currently evaluating the possibility of migrating some of our batch ETL jobs from old-school Hive (on MapReduce) to Hive-on-Spark. The first candidate is a pretty straightforward step that splits incoming data into groups based on their type, using a multiple-insert Hive query. The execution is triggered from an Oozie workflow.

 

Most of the time this works out of the box without any problems. The amount of input data to be processed in each cycle is capped in order to keep resource requirements somewhat predictable, and the query normally finishes within 15 minutes.

 

Now the issue: In a few rare cases, this query never stops running. At least not within the 24 hours we have (kind of accidentally) allowed it to keep running. The cause seems to be a few seemingly ordinary Spark tasks that just stop making any visible progress.

 

Some observations:

  • The tasks in question are all foreachAsync calls
  • All of the stalled tasks are running in the same executor
  • Even after the application has been killed, the tasks are shown as RUNNING, and the associated executor is listed as Active in the Spark UI
  • stdout and stderr of the executor contain no information, or have been removed. At least the links in the UI give nothing useful
  • The input sizes of the tasks do not seem significantly skewed. I have yet to figure out how to see the input size of the running tasks though, which should be the interesting bit here.

Log contents

Below is a log snippet from the "application master" (Spark driver?)

 

 

...
[Stage 10:=>(62 + 4) / 66][Stage 12:=>(17 + 7) / 24][Stage 13:=>(19 + 1) / 20]
[Stage 10:=>(62 + 4) / 66][Stage 12:=>(18 + 6) / 24][Stage 13:=>(19 + 1) / 20]17/03/21 19:49:50 WARN yarn.YarnAllocator: Expected to find pending requests, but found none.

[Stage 10:=>(62 + 4) / 66][Stage 12:=>(19 + 5) / 24][Stage 13:=>(19 + 1) / 20]17/03/21 19:49:53 WARN yarn.YarnAllocator: Expected to find pending requests, but found none.

[Stage 10:=>(63 + 3) / 66][Stage 12:=>(19 + 5) / 24][Stage 13:=>(19 + 1) / 20]17/03/21 19:49:56 WARN yarn.YarnAllocator: Expected to find pending requests, but found none.
17/03/21 19:49:59 WARN yarn.YarnAllocator: Expected to find pending requests, but found none.
17/03/21 19:50:02 WARN yarn.YarnAllocator: Expected to find pending requests, but found none.
17/03/21 19:50:06 WARN yarn.YarnAllocator: Expected to find pending requests, but found none.

[Stage 10:=>(64 + 2) / 66][Stage 12:=>(19 + 5) / 24][Stage 13:=>(19 + 1) / 20]17/03/21 19:50:07 WARN yarn.YarnAllocator: Expected to find pending requests, but found none.
17/03/21 19:50:10 WARN yarn.YarnAllocator: Expected to find pending requests, but found none.
17/03/21 19:50:13 WARN yarn.YarnAllocator: Expected to find pending requests, but found none.
...
17/03/21 19:51:02 WARN yarn.YarnAllocator: Expected to find pending requests, but found none.
17/03/21 19:51:06 WARN yarn.YarnAllocator: Expected to find pending requests, but found none.

[Stage 10:=>(64 + 2) / 66][Stage 12:=>(19 + 5) / 24][Stage 13:=>(19 + 1) / 20]
[Stage 10:=>(64 + 2) / 66][Stage 12:=>(20 + 4) / 24][Stage 13:=>(19 + 1) / 20]
[Stage 10:=>(65 + 1) / 66][Stage 12:=>(20 + 4) / 24][Stage 13:=>(19 + 1) / 20]
[Stage 12:=============>  (20 + 4) / 24][Stage 13:===============>(19 + 1) / 20]17/03/21 19:52:15 WARN yarn.YarnAllocator: Expected to find pending requests, but found none.
17/03/21 19:53:14 WARN yarn.YarnAllocator: Expected to find pending requests, but found none.

[Stage 12:=============>  (20 + 4) / 24][Stage 13:===============>(19 + 1) / 20]17/03/21 19:54:03 WARN yarn.YarnAllocator: Expected to find pending requests, but found none.

[Stage 12:==============> (21 + 3) / 24][Stage 13:===============>(19 + 1) / 20]17/03/21 19:54:06 WARN yarn.YarnAllocator: Expected to find pending requests, but found none.
17/03/21 19:54:09 WARN yarn.YarnAllocator: Expected to find pending requests, but found none.
17/03/21 19:54:12 WARN yarn.YarnAllocator: Expected to find pending requests, but found none.
...
17/03/21 19:54:57 WARN yarn.YarnAllocator: Expected to find pending requests, but found none.
17/03/21 19:55:00 WARN yarn.YarnAllocator: Expected to find pending requests, but found none.

[Stage 12:==============> (21 + 3) / 24][Stage 13:===============>(19 + 1) / 20]
[Stage 12:==============> (21 + 3) / 24][Stage 13:===============>(19 + 1) / 20]
[Stage 12:==============> (21 + 3) / 24][Stage 13:===============>(19 + 1) / 20]
[Stage 12:==============> (21 + 3) / 24][Stage 13:===============>(19 + 1) / 20]
[Stage 12:==============> (21 + 3) / 24][Stage 13:===============>(19 + 1) / 20]
...
[Stage 12:==============> (21 + 3) / 24][Stage 13:===============>(19 + 1) / 20]
[Stage 12:==============> (21 + 3) / 24][Stage 13:===============>(19 + 1) / 20]
[Stage 12:==============> (21 + 3) / 24][Stage 13:===============>(19 + 1) / 20]
[Stage 12:==============> (21 + 3) / 24][Stage 13:===============>(19 + 1) / 20]
[Stage 12:==============> (21 + 3) / 24][Stage 13:===============>(19 + 1) / 20]17/03/22 19:37:12 WARN spark.ExecutorAllocationManager: No stages are running, but numRunningTasks != 0
17/03/22 19:37:13 WARN yarn.YarnAllocator: Expected to find pending requests, but found none.
17/03/22 19:37:13 WARN client.RemoteDriver: Shutting down driver because RPC channel was closed.
17/03/22 19:37:13 WARN rpc.Rpc: Failed to send RPC, closing connection.
java.nio.channels.ClosedChannelException
17/03/22 19:37:13 WARN rpc.Rpc: Failed to send RPC, closing connection.
java.nio.channels.ClosedChannelException

 

Triple dots, "..." inserted by me where repeated log data has been cut away. Not really sure what all those warnings are about, but to me they do not appear to be directly related to this issue.
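
A stall like the one in the log above can also be detected mechanically by comparing two progress lines taken some time apart. The following is only a minimal Java sketch: the class name `StageProgress` and the regular expression are assumptions based on the console format shown in this post, not anything provided by Spark or Hive.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not part of Spark or Hive.
final class StageProgress {
    // Matches one "[Stage 12:===> (21 + 3) / 24]" segment:
    // group 1 = stage id, 2 = finished tasks, 3 = running tasks, 4 = total tasks.
    private static final Pattern SEGMENT = Pattern.compile(
        "\\[Stage (\\d+):[=>\\s]*\\((\\d+) \\+ (\\d+)\\) / (\\d+)\\]");

    /** Returns stage id -> {finished, running, total} for every segment in the line. */
    static Map<Integer, int[]> parse(String line) {
        Map<Integer, int[]> out = new LinkedHashMap<>();
        Matcher m = SEGMENT.matcher(line);
        while (m.find()) {
            out.put(Integer.parseInt(m.group(1)), new int[] {
                Integer.parseInt(m.group(2)),
                Integer.parseInt(m.group(3)),
                Integer.parseInt(m.group(4))});
        }
        return out;
    }

    /** True if the stage has running tasks but its finished count did not change. */
    static boolean stalled(String earlier, String later, int stageId) {
        int[] a = parse(earlier).get(stageId);
        int[] b = parse(later).get(stageId);
        if (a == null || b == null) {
            return false; // stage not present in one of the lines
        }
        return b[1] > 0 && a[0] == b[0];
    }

    public static void main(String[] args) {
        String earlier =
            "[Stage 12:==============> (21 + 3) / 24][Stage 13:===============>(19 + 1) / 20]";
        String later = earlier; // same counters, as in the last 24 hours of the log
        System.out.println("Stage 12 stalled: " + stalled(earlier, later, 12));
    }
}
```

The two lines compared should be far enough apart in time (say, longer than the normal 15 minute runtime) for "no change" to mean anything.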

 

 

Screenshot

And here are some screenshots from the Spark UI. (Sorry about the font size. Couldn't get them any bigger here)

 

[Screenshot: Failed stages (after application is killed)]

[Screenshot: Sample tasks of a failed stage. Note tasks still running after application is killed]

Environment:

CDH 5.9.1, Parcels

CentOS 6.7

Spark 1.6.1 used as execution engine for Hive

Spark 2.0.0 also installed on the cluster

22 data nodes (24-32 cores, 128 GB total RAM)

72 GB allocated to YARN containers

spark.executor.cores=4
spark.executor.memory=15g
spark.executor.memoryOverhead=3g
spark.yarn.executor.memoryOverhead=3072
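
As a rough sanity check of these settings, the number of executors that fit on one node can be worked out from the figures above. This is a back-of-the-envelope Java sketch only: it assumes the 72 GB is the YARN allocation per data node, and it ignores YARN's allocation rounding and the application master container. `ExecutorSizing` is a made-up name.

```java
// Hypothetical helper for back-of-the-envelope sizing; not a Spark or YARN API.
final class ExecutorSizing {
    /** Executors that fit in the YARN memory of one node (integer division, no rounding rules). */
    static int executorsPerNode(int yarnMemoryGb, int executorMemoryGb, int overheadGb) {
        return yarnMemoryGb / (executorMemoryGb + overheadGb);
    }

    /** Tasks that can run at the same time on one node. */
    static int concurrentTasksPerNode(int executors, int coresPerExecutor) {
        return executors * coresPerExecutor;
    }

    public static void main(String[] args) {
        // 15g executor memory + 3g overhead = 18 GB per executor container.
        int executors = executorsPerNode(72, 15, 3);
        System.out.println("executors per node: " + executors);
        System.out.println("concurrent tasks per node: " + concurrentTasksPerNode(executors, 4));
    }
}
```

With spark.executor.cores=4, each of those executor JVMs runs up to four tasks at once.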

 

Some questions:

  • Does this issue seem familiar to anyone else? If so, do you know what might be the root cause and/or solution?
  • If not, are there any more sources of information that we could use to troubleshoot this (log files, UI pages etc.)?
  • Is the seemingly orphaned zombie executor actually still running somewhere, or is that a monitoring glitch? (Checking processes on the appointed host indicates that nothing from this application is alive.)

Any hints or help to let us move forward would be highly appreciated.

 

Cheers!

\Knut

Contributor
Posts: 29
Registered: ‎02-11-2014

Re: Hive-on-Spark tasks never finish


So, the same thing happened again, and this time I was smart enough to have a closer look at some thread dumps before killing the application.

 

As previously, the hang seems to happen on one executor rather than in individual tasks. In this case one executor has three tasks running that never seem to make any progress. Looking at the stack for the threads called "Executor task launch worker-n" reveals the following:

 

java.util.HashMap.getEntry(HashMap.java:465)
java.util.HashMap.containsKey(HashMap.java:449)
org.apache.hadoop.hive.serde2.avro.InstanceCache.retrieve(InstanceCache.java:57)
org.apache.hadoop.hive.serde2.avro.SchemaToTypeInfo.generateTypeInfo(SchemaToTypeInfo.java:181)
org.apache.hadoop.hive.serde2.avro.SchemaToTypeInfo.generateTypeInfoWorker(SchemaToTypeInfo.java:189)
org.apache.hadoop.hive.serde2.avro.SchemaToTypeInfo.access$000(SchemaToTypeInfo.java:46)
org.apache.hadoop.hive.serde2.avro.SchemaToTypeInfo$1.makeInstance(SchemaToTypeInfo.java:119)
org.apache.hadoop.hive.serde2.avro.SchemaToTypeInfo$1.makeInstance(SchemaToTypeInfo.java:114)
org.apache.hadoop.hive.serde2.avro.InstanceCache.retrieve(InstanceCache.java:64)
org.apache.hadoop.hive.serde2.avro.SchemaToTypeInfo.generateTypeInfo(SchemaToTypeInfo.java:181)
org.apache.hadoop.hive.serde2.avro.SchemaToTypeInfo.generateRecordTypeInfo(SchemaToTypeInfo.java:225)
org.apache.hadoop.hive.serde2.avro.SchemaToTypeInfo.generateTypeInfoWorker(SchemaToTypeInfo.java:199)
org.apache.hadoop.hive.serde2.avro.SchemaToTypeInfo.access$000(SchemaToTypeInfo.java:46)
org.apache.hadoop.hive.serde2.avro.SchemaToTypeInfo$1.makeInstance(SchemaToTypeInfo.java:119)
org.apache.hadoop.hive.serde2.avro.SchemaToTypeInfo$1.makeInstance(SchemaToTypeInfo.java:114)
org.apache.hadoop.hive.serde2.avro.InstanceCache.retrieve(InstanceCache.java:64)
org.apache.hadoop.hive.serde2.avro.SchemaToTypeInfo.generateTypeInfo(SchemaToTypeInfo.java:181)
org.apache.hadoop.hive.serde2.avro.SchemaToTypeInfo.generateArrayTypeInfo(SchemaToTypeInfo.java:248)
org.apache.hadoop.hive.serde2.avro.SchemaToTypeInfo.generateTypeInfoWorker(SchemaToTypeInfo.java:201)
org.apache.hadoop.hive.serde2.avro.SchemaToTypeInfo.access$000(SchemaToTypeInfo.java:46)
org.apache.hadoop.hive.serde2.avro.SchemaToTypeInfo$1.makeInstance(SchemaToTypeInfo.java:119)
org.apache.hadoop.hive.serde2.avro.SchemaToTypeInfo$1.makeInstance(SchemaToTypeInfo.java:114)
org.apache.hadoop.hive.serde2.avro.InstanceCache.retrieve(InstanceCache.java:64)
org.apache.hadoop.hive.serde2.avro.SchemaToTypeInfo.generateTypeInfo(SchemaToTypeInfo.java:181)
org.apache.hadoop.hive.serde2.avro.SchemaToTypeInfo.generateTypeInfoWorker(SchemaToTypeInfo.java:189)
org.apache.hadoop.hive.serde2.avro.SchemaToTypeInfo.access$000(SchemaToTypeInfo.java:46)
org.apache.hadoop.hive.serde2.avro.SchemaToTypeInfo$1.makeInstance(SchemaToTypeInfo.java:119)
org.apache.hadoop.hive.serde2.avro.SchemaToTypeInfo$1.makeInstance(SchemaToTypeInfo.java:114)
org.apache.hadoop.hive.serde2.avro.InstanceCache.retrieve(InstanceCache.java:64)
org.apache.hadoop.hive.serde2.avro.SchemaToTypeInfo.generateTypeInfo(SchemaToTypeInfo.java:181)
org.apache.hadoop.hive.serde2.avro.SchemaToTypeInfo.generateRecordTypeInfo(SchemaToTypeInfo.java:225)
org.apache.hadoop.hive.serde2.avro.SchemaToTypeInfo.generateTypeInfoWorker(SchemaToTypeInfo.java:199)
org.apache.hadoop.hive.serde2.avro.SchemaToTypeInfo.access$000(SchemaToTypeInfo.java:46)
org.apache.hadoop.hive.serde2.avro.SchemaToTypeInfo$1.makeInstance(SchemaToTypeInfo.java:119)
org.apache.hadoop.hive.serde2.avro.SchemaToTypeInfo$1.makeInstance(SchemaToTypeInfo.java:114)
org.apache.hadoop.hive.serde2.avro.InstanceCache.retrieve(InstanceCache.java:64)
org.apache.hadoop.hive.serde2.avro.SchemaToTypeInfo.generateTypeInfo(SchemaToTypeInfo.java:181)
org.apache.hadoop.hive.serde2.avro.SchemaToTypeInfo.generateTypeInfoWorker(SchemaToTypeInfo.java:189)
org.apache.hadoop.hive.serde2.avro.SchemaToTypeInfo.access$000(SchemaToTypeInfo.java:46)
org.apache.hadoop.hive.serde2.avro.SchemaToTypeInfo$1.makeInstance(SchemaToTypeInfo.java:119)
org.apache.hadoop.hive.serde2.avro.SchemaToTypeInfo$1.makeInstance(SchemaToTypeInfo.java:114)
org.apache.hadoop.hive.serde2.avro.InstanceCache.retrieve(InstanceCache.java:64)
org.apache.hadoop.hive.serde2.avro.SchemaToTypeInfo.generateTypeInfo(SchemaToTypeInfo.java:181)
org.apache.hadoop.hive.serde2.avro.SchemaToTypeInfo.generateColumnTypes(SchemaToTypeInfo.java:108)
org.apache.hadoop.hive.serde2.avro.SchemaToTypeInfo.generateColumnTypes(SchemaToTypeInfo.java:87)
org.apache.hadoop.hive.serde2.avro.AvroObjectInspectorGenerator.<init>(AvroObjectInspectorGenerator.java:53)
org.apache.hadoop.hive.serde2.avro.AvroSerDe.initialize(AvroSerDe.java:126)
org.apache.hadoop.hive.ql.exec.FileSinkOperator.initializeOp(FileSinkOperator.java:337)
org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:385)
org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:469)
org.apache.hadoop.hive.ql.exec.Operator.initializeChildren(Operator.java:425)
org.apache.hadoop.hive.ql.exec.SelectOperator.initializeOp(SelectOperator.java:65)
org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:385)
org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:469)
org.apache.hadoop.hive.ql.exec.Operator.initializeChildren(Operator.java:425)
org.apache.hadoop.hive.ql.exec.FilterOperator.initializeOp(FilterOperator.java:66)
org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:385)
org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:469)
org.apache.hadoop.hive.ql.exec.Operator.initializeChildren(Operator.java:425)
org.apache.hadoop.hive.ql.exec.SelectOperator.initializeOp(SelectOperator.java:65)
org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:385)
org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:469)
org.apache.hadoop.hive.ql.exec.Operator.initializeChildren(Operator.java:425)
org.apache.hadoop.hive.ql.exec.TableScanOperator.initializeOp(TableScanOperator.java:195)
org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:385)
org.apache.hadoop.hive.ql.exec.MapOperator.initializeOp(MapOperator.java:431)
org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:385)
org.apache.hadoop.hive.ql.exec.spark.SparkMapRecordHandler.init(SparkMapRecordHandler.java:103)
org.apache.hadoop.hive.ql.exec.spark.HiveMapFunction.call(HiveMapFunction.java:55)
org.apache.hadoop.hive.ql.exec.spark.HiveMapFunction.call(HiveMapFunction.java:30)
org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$7$1.apply(JavaRDDLike.scala:192)
org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$7$1.apply(JavaRDDLike.scala:192)
org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
org.apache.spark.scheduler.Task.run(Task.scala:89)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:229)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:745)

And if I follow this to what I believe is the most relevant version of the source code, I find a call to a static instance of 'InstanceCache' and further to a HashMap that seems to lack any sort of thread safety:

 

https://github.com/apache/hive/blob/branch-1.1/serde/src/java/org/apache/hadoop/hive/serde2/avro/Sch...

https://github.com/apache/hive/blob/branch-1.1/serde/src/java/org/apache/hadoop/hive/serde2/avro/Ins...

 

Some further Googling seems to indicate that this part of the code is indeed vulnerable to concurrency issues, and that a fix for it does not appear until Hive version 2.2.0:

 

https://issues.apache.org/jira/browse/HIVE-16175
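
To illustrate the kind of problem suspected here: on Java 7 and earlier, an unsynchronized java.util.HashMap that is resized by several threads at once can end up with a cycle in one of its buckets, after which lookups spin forever. That would be consistent with threads sitting in HashMap.getEntry. Below is a minimal Java sketch of a cache that tolerates concurrent callers. `SafeInstanceCache` and `SafeInstanceCacheDemo` are hypothetical stand-ins written for this example; this is neither the Hive InstanceCache code nor the actual HIVE-16175 patch.

```java
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for a shared, lazily filled cache. */
abstract class SafeInstanceCache<K, V> {
    private final ConcurrentMap<K, V> cache = new ConcurrentHashMap<>();

    protected abstract V makeInstance(K seed);

    V retrieve(K seed) {
        V existing = cache.get(seed);
        if (existing != null) {
            return existing;
        }
        // Created outside any lock, so makeInstance() may safely call retrieve() again.
        V created = makeInstance(seed);
        // If another thread won the race, use its instance and drop ours.
        V raced = cache.putIfAbsent(seed, created);
        return raced != null ? raced : created;
    }
}

final class SafeInstanceCacheDemo {
    /** Lets several threads look up the same keys; returns the number of distinct instances seen. */
    static int distinctInstances(int threads, int keys) {
        SafeInstanceCache<Integer, Object> cache = new SafeInstanceCache<Integer, Object>() {
            @Override
            protected Object makeInstance(Integer seed) {
                return new Object();
            }
        };
        Set<Object> seen = Collections.newSetFromMap(new ConcurrentHashMap<Object, Boolean>());
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.execute(() -> {
                for (int k = 0; k < keys; k++) {
                    seen.add(cache.retrieve(k));
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen.size();
    }

    public static void main(String[] args) {
        System.out.println("distinct instances: " + distinctInstances(4, 100));
    }
}
```

The sketch uses get followed by putIfAbsent rather than computeIfAbsent because, judging by the stack trace, instance creation recurses back into the cache, and ConcurrentHashMap does not allow a computeIfAbsent mapping function to modify the same map.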

 

Now, as this line of thought is heavily based on assumptions, I might be mistaken on a number of points here. If someone would be able to fill me in on the following points, that would be super appreciated:

 

  • Am I looking in the right place in the right thread dump?
  • Is it correct that multiple tasks within an Executor run as threads inside the same JVM, i.e. have access to the same static object instances?
  • Is the version of the code that I linked to representative of what is used in CDH 5.9.1, or have patches for this been included even though base version is Hive 1.1?
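
On the second point: the bottom frames of the stack trace (ThreadPoolExecutor$Worker calling Executor$TaskRunner.run) do show the task running on a thread pool thread inside the executor process, and threads in one JVM see the same static fields. A small Java sketch of that general JVM behaviour, with the made-up class name `SharedStaticDemo`:

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class; it only shows plain JVM behaviour, nothing Spark-specific.
final class SharedStaticDemo {
    // One object per JVM (per class loader), like a static cache instance.
    static final Object SHARED = new Object();

    /** Returns how many different objects the pool threads saw behind SHARED. */
    static int distinctIdentities(int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            Callable<Integer> task = () -> System.identityHashCode(SHARED);
            List<Future<Integer>> results = new ArrayList<>();
            for (int i = 0; i < threads; i++) {
                results.add(pool.submit(task));
            }
            Set<Integer> ids = new HashSet<>();
            for (Future<Integer> f : results) {
                ids.add(f.get());
            }
            return ids.size();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("distinct objects seen: " + distinctIdentities(4));
    }
}
```

Every thread reports the same identity, so any mutable state behind a static field needs its own synchronization.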

Cheers!

\Knut

 

New Contributor
Posts: 3
Registered: ‎10-22-2015

Re: Hive-on-Spark tasks never finish


Hi knut N,

We had attempted to connect Tableau to our Cloudera cluster, using Spark as the execution engine for Hive, and faced similar problems. The Spark job/task would never finish. To add some other observations: our Tableau queries would sometimes finish quickly, sometimes take a long time, and sometimes never finish. For the never-ending applications that kept using resources, I was using the yarn application -kill command to kill them. Overall, my experience was not impressive.

 

On the other hand, when we switched back to the default MapReduce execution engine for Hive, our queries would always finish on time (in around 30+ seconds), the results were returned to Tableau properly, and the application master and all other worker (map/reduce) tasks finished successfully every time.

 

Did you issue the SQL from a shell or from outside the cluster? Since we were connecting via Tableau, I suspected Tableau might be part of the problem.

 

Thanks for bringing this up; I would be curious to know more about this issue. We are still planning to adopt Hive-on-Spark for our reporting purposes, but this experience has made us suspicious of it.

 

Thanks

Sumit

Contributor
Posts: 29
Registered: ‎02-11-2014

Re: Hive-on-Spark tasks never finish

Hi Sumit,

 

We are issuing all queries from Oozie workflows, this being part of our batch ETL jobs.

 

Are you too using Avro as the storage format for your tables?

 

I am currently running a test using single threaded executors (spark.executor.cores = 1) on the same queries. If the problem is in fact a race condition in the Avro deserializer, this should hopefully avoid it by preventing concurrent access to the unprotected HashMap.
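
The reasoning behind this workaround can be shown with a plain thread pool: with a single worker thread, no two tasks ever overlap, so code that is not thread safe is never entered concurrently within that JVM. A minimal Java sketch, where `ConcurrencyProbe` is a made-up name and the fixed thread pool merely stands in for the executor's task threads:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo; a fixed thread pool stands in for an executor's task threads.
final class ConcurrencyProbe {
    /** Runs the given number of short tasks and returns the highest overlap observed. */
    static int maxConcurrent(int poolSize, int tasks) {
        AtomicInteger active = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                int now = active.incrementAndGet();
                peak.accumulateAndGet(now, Math::max); // remember the highest overlap
                try {
                    Thread.sleep(5); // simulate a little work
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                active.decrementAndGet();
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return peak.get();
    }

    public static void main(String[] args) {
        System.out.println("pool of 1, peak overlap: " + maxConcurrent(1, 8));
        System.out.println("pool of 4, peak overlap: " + maxConcurrent(4, 8));
    }
}
```

The price is the same as with the real setting: parallelism within each process is gone, so throughput has to come from running more processes.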

 

We will let this version run for a few days to see if the problem reappears. I'll make sure to post an update here when we know more.

 

Thanks!

\Knut

New Contributor
Posts: 3
Registered: ‎10-22-2015

Re: Hive-on-Spark tasks never finish


Thanks for updating Knut,

In our environment, we used Parquet files on which we had hive external tables and queries were issued from Tableau. 

 

I had a question: if you set spark.executor.cores = 1, won't the overall ETL batch job be slow? I mean, won't it lose the benefit of running tasks concurrently on multiple cores? Also, with Spark, wouldn't you expect to see a benefit on data sets that are read repeatedly, but not on data sets that are read only once?

 

All your updates will be greatly appreciated, as another team at my workplace is going to try Hive-on-Spark.

 

Thanks

Sumit

 

Contributor
Posts: 29
Registered: ‎02-11-2014

Re: Hive-on-Spark tasks never finish


Hi,

 

We have now been running this batch job successfully for a week using Hive-on-Spark, configured with single-core executors. As you write, Sumit, the performance of the query is definitely not great due to the reduced parallelism, but it is still slightly faster than running on MapReduce.

 

I believe that the effectiveness of this workaround strongly indicates that our problem is indeed caused by a lack of thread safety in the Avro deserializer, as described in an earlier post. Since limiting the executor cores to one is not an ideal solution, to say the least, it would be great to see a patch for this make its way into a not too distant CDH release.

 

Cheers!

\Knut
