Created on 03-23-2017 03:35 AM - edited 09-16-2022 04:19 AM
Hi,
We are currently evaluating the possibility of migrating some of our batch ETL jobs from old-school Hive (on MapReduce) to Hive-on-Spark. The first candidate is a pretty straightforward step that splits incoming data into groups based on type, using a Hive multi-insert query. The execution is triggered from an Oozie workflow.
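To illustrate what I mean by a multi-insert query, here is a rough sketch of the general shape; the table and column names are invented for illustration and are not our actual schema:

-- Hypothetical multi-insert splitting one scan of the source by record type
-- (table/column names and the ${DATE} variable are placeholders)
FROM source_events e
INSERT OVERWRITE TABLE events_type_a PARTITION (dt='${DATE}')
  SELECT e.id, e.payload WHERE e.event_type = 'A'
INSERT OVERWRITE TABLE events_type_b PARTITION (dt='${DATE}')
  SELECT e.id, e.payload WHERE e.event_type = 'B'
INSERT OVERWRITE TABLE events_other PARTITION (dt='${DATE}')
  SELECT e.id, e.payload WHERE e.event_type NOT IN ('A', 'B');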
Most of the time this works out of the box without any problems. The amount of input data to be processed in each cycle is capped in order to keep resource requirements somewhat predictable, and the query normally finishes within 15 minutes.
Now the issue: in a few rare cases, this query never stops running, at least not within the 24 hours we have (somewhat accidentally) allowed it to keep running. The cause appears to be a handful of seemingly ordinary Spark tasks that simply stop making any visible progress.
Some observations:
Log contents
Below is a log snippet from the "application master" (the Spark driver?):
...
[Stage 10:=>(62 + 4) / 66][Stage 12:=>(17 + 7) / 24][Stage 13:=>(19 + 1) / 20]
[Stage 10:=>(62 + 4) / 66][Stage 12:=>(18 + 6) / 24][Stage 13:=>(19 + 1) / 20]
17/03/21 19:49:50 WARN yarn.YarnAllocator: Expected to find pending requests, but found none.
[Stage 10:=>(62 + 4) / 66][Stage 12:=>(19 + 5) / 24][Stage 13:=>(19 + 1) / 20]
17/03/21 19:49:53 WARN yarn.YarnAllocator: Expected to find pending requests, but found none.
[Stage 10:=>(63 + 3) / 66][Stage 12:=>(19 + 5) / 24][Stage 13:=>(19 + 1) / 20]
17/03/21 19:49:56 WARN yarn.YarnAllocator: Expected to find pending requests, but found none.
17/03/21 19:49:59 WARN yarn.YarnAllocator: Expected to find pending requests, but found none.
17/03/21 19:50:02 WARN yarn.YarnAllocator: Expected to find pending requests, but found none.
17/03/21 19:50:06 WARN yarn.YarnAllocator: Expected to find pending requests, but found none.
[Stage 10:=>(64 + 2) / 66][Stage 12:=>(19 + 5) / 24][Stage 13:=>(19 + 1) / 20]
17/03/21 19:50:07 WARN yarn.YarnAllocator: Expected to find pending requests, but found none.
17/03/21 19:50:10 WARN yarn.YarnAllocator: Expected to find pending requests, but found none.
17/03/21 19:50:13 WARN yarn.YarnAllocator: Expected to find pending requests, but found none.
...
17/03/21 19:51:02 WARN yarn.YarnAllocator: Expected to find pending requests, but found none.
17/03/21 19:51:06 WARN yarn.YarnAllocator: Expected to find pending requests, but found none.
[Stage 10:=>(64 + 2) / 66][Stage 12:=>(19 + 5) / 24][Stage 13:=>(19 + 1) / 20]
[Stage 10:=>(64 + 2) / 66][Stage 12:=>(20 + 4) / 24][Stage 13:=>(19 + 1) / 20]
[Stage 10:=>(65 + 1) / 66][Stage 12:=>(20 + 4) / 24][Stage 13:=>(19 + 1) / 20]
[Stage 12:=============> (20 + 4) / 24][Stage 13:===============>(19 + 1) / 20]
17/03/21 19:52:15 WARN yarn.YarnAllocator: Expected to find pending requests, but found none.
17/03/21 19:53:14 WARN yarn.YarnAllocator: Expected to find pending requests, but found none.
[Stage 12:=============> (20 + 4) / 24][Stage 13:===============>(19 + 1) / 20]
17/03/21 19:54:03 WARN yarn.YarnAllocator: Expected to find pending requests, but found none.
[Stage 12:==============> (21 + 3) / 24][Stage 13:===============>(19 + 1) / 20]
17/03/21 19:54:06 WARN yarn.YarnAllocator: Expected to find pending requests, but found none.
17/03/21 19:54:09 WARN yarn.YarnAllocator: Expected to find pending requests, but found none.
17/03/21 19:54:12 WARN yarn.YarnAllocator: Expected to find pending requests, but found none.
...
17/03/21 19:54:57 WARN yarn.YarnAllocator: Expected to find pending requests, but found none.
17/03/21 19:55:00 WARN yarn.YarnAllocator: Expected to find pending requests, but found none.
[Stage 12:==============> (21 + 3) / 24][Stage 13:===============>(19 + 1) / 20]
[Stage 12:==============> (21 + 3) / 24][Stage 13:===============>(19 + 1) / 20]
[Stage 12:==============> (21 + 3) / 24][Stage 13:===============>(19 + 1) / 20]
[Stage 12:==============> (21 + 3) / 24][Stage 13:===============>(19 + 1) / 20]
[Stage 12:==============> (21 + 3) / 24][Stage 13:===============>(19 + 1) / 20]
...
[Stage 12:==============> (21 + 3) / 24][Stage 13:===============>(19 + 1) / 20]
[Stage 12:==============> (21 + 3) / 24][Stage 13:===============>(19 + 1) / 20]
[Stage 12:==============> (21 + 3) / 24][Stage 13:===============>(19 + 1) / 20]
[Stage 12:==============> (21 + 3) / 24][Stage 13:===============>(19 + 1) / 20]
[Stage 12:==============> (21 + 3) / 24][Stage 13:===============>(19 + 1) / 20]
17/03/22 19:37:12 WARN spark.ExecutorAllocationManager: No stages are running, but numRunningTasks != 0
17/03/22 19:37:13 WARN yarn.YarnAllocator: Expected to find pending requests, but found none.
17/03/22 19:37:13 WARN client.RemoteDriver: Shutting down driver because RPC channel was closed.
17/03/22 19:37:13 WARN rpc.Rpc: Failed to send RPC, closing connection.
java.nio.channels.ClosedChannelException
17/03/22 19:37:13 WARN rpc.Rpc: Failed to send RPC, closing connection.
java.nio.channels.ClosedChannelException
The triple dots ("...") were inserted by me where repeated log data has been cut away. I'm not really sure what all those warnings are about, but they do not appear to be directly related to this issue.
Screenshots
And here are some screenshots from the Spark UI. (Sorry about the font size; I couldn't get them any bigger here.)
Environment:
CDH 5.9.1, Parcels
CentOS 6.7
Spark 1.6.1 used as execution engine for Hive
Spark 2.0.0 also installed on the cluster
22 data nodes (24-32 cores and 128 GB total RAM each)
72 GB per node allocated to YARN containers
Some questions:
Any hints or help that would let us move forward would be highly appreciated.
Cheers!
\Knut
Created on 03-29-2017 12:56 AM - edited 03-29-2017 01:10 AM
So, the same thing happened again, and this time I was smart enough to have a closer look at some thread dumps before killing the application.
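For reference, thread dumps like the one below can be grabbed from the "Thread Dump" link on the Executors tab of the running Spark application's UI, or with jstack directly against the executor JVM on the worker node (placeholder example, PID invented):

# On the NodeManager host running the stuck executor; 12345 is a placeholder PID
sudo -u yarn jstack 12345 > executor-threads.txt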
As before, the hang seems to happen on a single executor rather than in individual tasks. In this case, one executor has three running tasks that never seem to make any progress. Looking at the stacks of the threads named "Executor task launch worker-n" reveals the following:
java.util.HashMap.getEntry(HashMap.java:465)
java.util.HashMap.containsKey(HashMap.java:449)
org.apache.hadoop.hive.serde2.avro.InstanceCache.retrieve(InstanceCache.java:57)
org.apache.hadoop.hive.serde2.avro.SchemaToTypeInfo.generateTypeInfo(SchemaToTypeInfo.java:181)
org.apache.hadoop.hive.serde2.avro.SchemaToTypeInfo.generateTypeInfoWorker(SchemaToTypeInfo.java:189)
org.apache.hadoop.hive.serde2.avro.SchemaToTypeInfo.access$000(SchemaToTypeInfo.java:46)
org.apache.hadoop.hive.serde2.avro.SchemaToTypeInfo$1.makeInstance(SchemaToTypeInfo.java:119)
org.apache.hadoop.hive.serde2.avro.SchemaToTypeInfo$1.makeInstance(SchemaToTypeInfo.java:114)
org.apache.hadoop.hive.serde2.avro.InstanceCache.retrieve(InstanceCache.java:64)
org.apache.hadoop.hive.serde2.avro.SchemaToTypeInfo.generateTypeInfo(SchemaToTypeInfo.java:181)
org.apache.hadoop.hive.serde2.avro.SchemaToTypeInfo.generateRecordTypeInfo(SchemaToTypeInfo.java:225)
org.apache.hadoop.hive.serde2.avro.SchemaToTypeInfo.generateTypeInfoWorker(SchemaToTypeInfo.java:199)
org.apache.hadoop.hive.serde2.avro.SchemaToTypeInfo.access$000(SchemaToTypeInfo.java:46)
org.apache.hadoop.hive.serde2.avro.SchemaToTypeInfo$1.makeInstance(SchemaToTypeInfo.java:119)
org.apache.hadoop.hive.serde2.avro.SchemaToTypeInfo$1.makeInstance(SchemaToTypeInfo.java:114)
org.apache.hadoop.hive.serde2.avro.InstanceCache.retrieve(InstanceCache.java:64)
org.apache.hadoop.hive.serde2.avro.SchemaToTypeInfo.generateTypeInfo(SchemaToTypeInfo.java:181)
org.apache.hadoop.hive.serde2.avro.SchemaToTypeInfo.generateArrayTypeInfo(SchemaToTypeInfo.java:248)
org.apache.hadoop.hive.serde2.avro.SchemaToTypeInfo.generateTypeInfoWorker(SchemaToTypeInfo.java:201)
org.apache.hadoop.hive.serde2.avro.SchemaToTypeInfo.access$000(SchemaToTypeInfo.java:46)
org.apache.hadoop.hive.serde2.avro.SchemaToTypeInfo$1.makeInstance(SchemaToTypeInfo.java:119)
org.apache.hadoop.hive.serde2.avro.SchemaToTypeInfo$1.makeInstance(SchemaToTypeInfo.java:114)
org.apache.hadoop.hive.serde2.avro.InstanceCache.retrieve(InstanceCache.java:64)
org.apache.hadoop.hive.serde2.avro.SchemaToTypeInfo.generateTypeInfo(SchemaToTypeInfo.java:181)
org.apache.hadoop.hive.serde2.avro.SchemaToTypeInfo.generateTypeInfoWorker(SchemaToTypeInfo.java:189)
org.apache.hadoop.hive.serde2.avro.SchemaToTypeInfo.access$000(SchemaToTypeInfo.java:46)
org.apache.hadoop.hive.serde2.avro.SchemaToTypeInfo$1.makeInstance(SchemaToTypeInfo.java:119)
org.apache.hadoop.hive.serde2.avro.SchemaToTypeInfo$1.makeInstance(SchemaToTypeInfo.java:114)
org.apache.hadoop.hive.serde2.avro.InstanceCache.retrieve(InstanceCache.java:64)
org.apache.hadoop.hive.serde2.avro.SchemaToTypeInfo.generateTypeInfo(SchemaToTypeInfo.java:181)
org.apache.hadoop.hive.serde2.avro.SchemaToTypeInfo.generateRecordTypeInfo(SchemaToTypeInfo.java:225)
org.apache.hadoop.hive.serde2.avro.SchemaToTypeInfo.generateTypeInfoWorker(SchemaToTypeInfo.java:199)
org.apache.hadoop.hive.serde2.avro.SchemaToTypeInfo.access$000(SchemaToTypeInfo.java:46)
org.apache.hadoop.hive.serde2.avro.SchemaToTypeInfo$1.makeInstance(SchemaToTypeInfo.java:119)
org.apache.hadoop.hive.serde2.avro.SchemaToTypeInfo$1.makeInstance(SchemaToTypeInfo.java:114)
org.apache.hadoop.hive.serde2.avro.InstanceCache.retrieve(InstanceCache.java:64)
org.apache.hadoop.hive.serde2.avro.SchemaToTypeInfo.generateTypeInfo(SchemaToTypeInfo.java:181)
org.apache.hadoop.hive.serde2.avro.SchemaToTypeInfo.generateTypeInfoWorker(SchemaToTypeInfo.java:189)
org.apache.hadoop.hive.serde2.avro.SchemaToTypeInfo.access$000(SchemaToTypeInfo.java:46)
org.apache.hadoop.hive.serde2.avro.SchemaToTypeInfo$1.makeInstance(SchemaToTypeInfo.java:119)
org.apache.hadoop.hive.serde2.avro.SchemaToTypeInfo$1.makeInstance(SchemaToTypeInfo.java:114)
org.apache.hadoop.hive.serde2.avro.InstanceCache.retrieve(InstanceCache.java:64)
org.apache.hadoop.hive.serde2.avro.SchemaToTypeInfo.generateTypeInfo(SchemaToTypeInfo.java:181)
org.apache.hadoop.hive.serde2.avro.SchemaToTypeInfo.generateColumnTypes(SchemaToTypeInfo.java:108)
org.apache.hadoop.hive.serde2.avro.SchemaToTypeInfo.generateColumnTypes(SchemaToTypeInfo.java:87)
org.apache.hadoop.hive.serde2.avro.AvroObjectInspectorGenerator.<init>(AvroObjectInspectorGenerator.java:53)
org.apache.hadoop.hive.serde2.avro.AvroSerDe.initialize(AvroSerDe.java:126)
org.apache.hadoop.hive.ql.exec.FileSinkOperator.initializeOp(FileSinkOperator.java:337)
org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:385)
org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:469)
org.apache.hadoop.hive.ql.exec.Operator.initializeChildren(Operator.java:425)
org.apache.hadoop.hive.ql.exec.SelectOperator.initializeOp(SelectOperator.java:65)
org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:385)
org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:469)
org.apache.hadoop.hive.ql.exec.Operator.initializeChildren(Operator.java:425)
org.apache.hadoop.hive.ql.exec.FilterOperator.initializeOp(FilterOperator.java:66)
org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:385)
org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:469)
org.apache.hadoop.hive.ql.exec.Operator.initializeChildren(Operator.java:425)
org.apache.hadoop.hive.ql.exec.SelectOperator.initializeOp(SelectOperator.java:65)
org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:385)
org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:469)
org.apache.hadoop.hive.ql.exec.Operator.initializeChildren(Operator.java:425)
org.apache.hadoop.hive.ql.exec.TableScanOperator.initializeOp(TableScanOperator.java:195)
org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:385)
org.apache.hadoop.hive.ql.exec.MapOperator.initializeOp(MapOperator.java:431)
org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:385)
org.apache.hadoop.hive.ql.exec.spark.SparkMapRecordHandler.init(SparkMapRecordHandler.java:103)
org.apache.hadoop.hive.ql.exec.spark.HiveMapFunction.call(HiveMapFunction.java:55)
org.apache.hadoop.hive.ql.exec.spark.HiveMapFunction.call(HiveMapFunction.java:30)
org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$7$1.apply(JavaRDDLike.scala:192)
org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$7$1.apply(JavaRDDLike.scala:192)
org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
org.apache.spark.scheduler.Task.run(Task.scala:89)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:229)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:745)
And if I follow this into what I believe is the most relevant version of the source code, I find a call to a static instance of 'InstanceCache', which in turn is backed by a plain HashMap that seems to lack any sort of thread safety:
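To make the suspected pattern concrete, here is a simplified paraphrase of what InstanceCache looks like to me; this is an illustration only, not the exact Hive source (generics and exception types trimmed):

// Simplified paraphrase of org.apache.hadoop.hive.serde2.avro.InstanceCache,
// for illustration only; see the real Hive source for the actual code.
import java.util.HashMap;
import java.util.Map;

public abstract class InstanceCache<SeedObject, Instance> {
  // Plain HashMap with no synchronization. The cache objects are shared
  // (static fields in SchemaToTypeInfo), so two "Executor task launch worker"
  // threads in the same executor JVM can call retrieve() concurrently.
  private final Map<SeedObject, Instance> cache = new HashMap<SeedObject, Instance>();

  public Instance retrieve(SeedObject hv) throws Exception {
    if (cache.containsKey(hv)) {      // unsynchronized read...
      return cache.get(hv);
    }
    Instance instance = makeInstance(hv);
    cache.put(hv, instance);          // ...racing with unsynchronized writes
    return instance;
  }

  protected abstract Instance makeInstance(SeedObject hv) throws Exception;
}

On Java 7, concurrent unsynchronized puts into a HashMap can corrupt the internal bucket chains during a resize, after which get/containsKey can loop forever inside HashMap.getEntry, which would match the stacks above. As far as I can tell, the fix referenced below essentially makes retrieve() synchronized.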
Some further Googling seems to indicate that this part of the code is indeed vulnerable to concurrency issues, and that a fix for it does not appear until Hive version 2.2.0:
https://issues.apache.org/jira/browse/HIVE-16175
Now, as this line of reasoning is heavily based on assumptions, I might be mistaken on a number of points here. If someone could fill me in on the following, that would be super appreciated:
Cheers!
\Knut
Created on 03-29-2017 12:10 PM - edited 03-29-2017 01:55 PM
Hi knut N,
We had attempted to connect Tableau to our Cloudera cluster with Spark as the execution engine for Hive, and we faced similar problems. The Spark jobs/tasks would never finish, and to add some other observations, our Tableau queries would sometimes finish quickly, sometimes take a long time, and sometimes never finish at all. For these never-ending applications that kept consuming resources, I was using the yarn application -kill command to kill them. Overall, my experience was not impressive.
On the other hand, when we switched back to the default MapReduce execution engine for Hive, our queries would always finish in a reasonable time (around 30+ seconds), the results were returned to Tableau properly, and the ApplicationMaster and all the map/reduce tasks finished successfully every time.
Did you issue the SQL from a shell on the cluster or from outside it? Since we were connecting via Tableau, I suspected Tableau might be part of the problem.
Thanks for bringing this up; I would be curious to know more about this issue. We are still planning to adopt Hive-on-Spark for our reporting purposes, but this experience has made us wary of it.
Thanks
Sumit
Created 03-30-2017 05:42 AM
Hi Sumit,
We are issuing all queries from Oozie workflows, as part of our batch ETL jobs.
Are you too using Avro as the storage format for your tables?
I am currently running a test of the same queries with single-core executors (spark.executor.cores = 1). If the problem is in fact a race condition in the Avro deserializer, this should hopefully avoid it by preventing concurrent access to the unprotected HashMap.
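For anyone who wants to try the same workaround, it is just a couple of standard properties set at the top of the Hive script that Oozie runs; a rough sketch is below (the engine setting is shown only for clarity and may already be the cluster default):

-- Workaround sketch: force single-core executors for this script only
SET hive.execution.engine=spark;   -- may already be set globally
SET spark.executor.cores=1;        -- one task at a time per executor JVM, so no concurrent AvroSerDe initialization

-- ... the multi-insert query follows as before ...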
We will let this version run for a few days to see if the problem reappears. I'll make sure to post an update here when we know more.
Thanks!
\Knut
Created on 03-30-2017 09:55 AM - edited 03-30-2017 09:56 AM
Thanks for the update, Knut.
In our environment, we used Parquet files with Hive external tables on top of them, and queries were issued from Tableau.
I had a question: if you set spark.executor.cores = 1, won't the overall ETL batch job be slow? I mean, won't it lose the per-executor concurrency? Also, if you are using Spark, won't you mostly see its benefit on data sets that are visited often rather than on data sets that are read only once?
All your updates will be greatly appreciated, as another team at my workplace is going to try Hive-on-Spark.
Thanks
Sumit
Created on 04-06-2017 06:59 AM - edited 04-06-2017 07:02 AM
Hi,
We have now been running this batch job successfully for a week using Hive-on-Spark, configured with single-core executors. As you write, Sumit, the performance of the query is definitely not great due to the reduced parallelism, but it is still slightly faster than running on MapReduce.
I believe that the effectiveness of this workaround strongly indicates that our problem is indeed caused by the lack of thread safety in the Avro deserializer, as described in an earlier post. Since limiting the executor cores to one is not an ideal solution, to say the least, it would be great to see a patch for this make its way into a not-too-distant CDH release.
Cheers!
\Knut
Created on 06-13-2018 02:45 AM - edited 06-13-2018 02:45 AM
Can you please share the settings that you used for Hive-on-Spark?
I have set the following for running Hive-on-Spark, but the resources are not released after the job completes.
spark-defaults.conf:
spark.dynamicAllocation.executorIdleTimeout      10
spark.dynamicAllocation.schedulerBacklogTimeout  1
spark.dynamicAllocation.initialExecutors         1
spark.eventLog.enabled                           true
spark.master                                     yarn-client
spark.dynamicAllocation.enabled                  true
spark.dynamicAllocation.minExecutors             1
The Hive-on-Spark job runs successfully, but the application is not removed by the ApplicationMaster after successful completion.
Thanks.
Created 09-07-2018 06:24 AM
For me, all Spark tasks complete successfully, but the YARN application hangs indefinitely with status "RUNNING", and it keeps occupying containers and CPU vcores as well. I looked at the Hive server logs and there are no errors. Even the YARN application logs do not show any errors. We are using the Fair Scheduler with CM 5.11.
I see this issue is marked as resolved, but no solution is posted in this thread. I would appreciate it if anyone could share a fix for this problem.
Attaching some screenshots.