Member since 02-11-2014
29 Posts
6 Kudos Received
2 Solutions
My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
| | 33270 | 04-06-2017 06:59 AM |
| | 23461 | 01-02-2015 12:14 AM |
04-06-2017
06:59 AM
1 Kudo
Hi, We have now been running this batch job successfully for a week using Hive-on-Spark, configured with single-core executors. As you wrote, Sumit, the performance of the query is definitely not great due to the reduced parallelism, but it is still slightly faster than running on MapReduce. I believe that the effectiveness of this workaround strongly indicates that our problem is indeed caused by a lack of thread safety in the Avro deserializer, as described in an earlier post. Since limiting the executor cores to one is not an ideal solution, to say the least, it would be great to see a patch for this make its way into a not too distant CDH release. Cheers! \Knut
03-30-2017
05:42 AM
1 Kudo
Hi Sumit, We are issuing all queries from Oozie workflows, as part of our batch ETL jobs. Are you also using Avro as the storage format for your tables? I am currently running a test using single-threaded executors (spark.executor.cores = 1) on the same queries. If the problem is in fact a race condition in the Avro deserializer, this should hopefully avoid it by preventing concurrent access to the unprotected HashMap. We will let this version run for a few days to see if the problem reappears. I'll make sure to post an update here when we know more. Thanks! \Knut
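To illustrate why a single-core executor should sidestep a race like this, here is a minimal Java sketch. It is not Spark code: the thread pool is a hypothetical stand-in for one executor JVM, with the pool size playing the role of spark.executor.cores, and the class and method names are made up for the example. It only measures how many tasks are ever inside a shared section at the same time.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for an executor JVM; the pool size plays the role
// of spark.executor.cores. This is an illustration, not Spark code.
final class ExecutorCoresSketch {

    // Runs `tasks` short tasks on a pool of `cores` threads and returns the
    // highest number of tasks observed inside the shared section at once.
    static int maxConcurrentTasks(int cores, int tasks) {
        AtomicInteger inside = new AtomicInteger();
        AtomicInteger maxSeen = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(cores);
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                int now = inside.incrementAndGet();
                maxSeen.accumulateAndGet(now, Math::max);
                try {
                    Thread.sleep(5); // pretend to touch shared static state
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    inside.decrementAndGet();
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return maxSeen.get();
    }

    public static void main(String[] args) {
        // With one worker thread the tasks run strictly one after another.
        System.out.println("cores=1 -> max concurrent tasks: " + maxConcurrentTasks(1, 20));
        // With four workers several tasks can overlap (exact value varies).
        System.out.println("cores=4 -> max concurrent tasks: " + maxConcurrentTasks(4, 20));
    }
}
```

With a pool of one thread the shared section is never entered by two tasks at once, which is the property the workaround relies on. The price is that each executor processes one task at a time.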
03-29-2017
12:56 AM
1 Kudo
So, the same thing happened again, and this time I was smart enough to have a closer look at some thread dumps before killing the application. As before, the hang seems to happen on one executor rather than in individual tasks. In this case one executor has three tasks running that never seem to make any progress. Looking at the stack for the threads called "Executor task launch worker-n" reveals the following:
java.util.HashMap.getEntry(HashMap.java:465)
java.util.HashMap.containsKey(HashMap.java:449)
org.apache.hadoop.hive.serde2.avro.InstanceCache.retrieve(InstanceCache.java:57)
org.apache.hadoop.hive.serde2.avro.SchemaToTypeInfo.generateTypeInfo(SchemaToTypeInfo.java:181)
org.apache.hadoop.hive.serde2.avro.SchemaToTypeInfo.generateTypeInfoWorker(SchemaToTypeInfo.java:189)
org.apache.hadoop.hive.serde2.avro.SchemaToTypeInfo.access$000(SchemaToTypeInfo.java:46)
org.apache.hadoop.hive.serde2.avro.SchemaToTypeInfo$1.makeInstance(SchemaToTypeInfo.java:119)
org.apache.hadoop.hive.serde2.avro.SchemaToTypeInfo$1.makeInstance(SchemaToTypeInfo.java:114)
org.apache.hadoop.hive.serde2.avro.InstanceCache.retrieve(InstanceCache.java:64)
org.apache.hadoop.hive.serde2.avro.SchemaToTypeInfo.generateTypeInfo(SchemaToTypeInfo.java:181)
org.apache.hadoop.hive.serde2.avro.SchemaToTypeInfo.generateRecordTypeInfo(SchemaToTypeInfo.java:225)
org.apache.hadoop.hive.serde2.avro.SchemaToTypeInfo.generateTypeInfoWorker(SchemaToTypeInfo.java:199)
org.apache.hadoop.hive.serde2.avro.SchemaToTypeInfo.access$000(SchemaToTypeInfo.java:46)
org.apache.hadoop.hive.serde2.avro.SchemaToTypeInfo$1.makeInstance(SchemaToTypeInfo.java:119)
org.apache.hadoop.hive.serde2.avro.SchemaToTypeInfo$1.makeInstance(SchemaToTypeInfo.java:114)
org.apache.hadoop.hive.serde2.avro.InstanceCache.retrieve(InstanceCache.java:64)
org.apache.hadoop.hive.serde2.avro.SchemaToTypeInfo.generateTypeInfo(SchemaToTypeInfo.java:181)
org.apache.hadoop.hive.serde2.avro.SchemaToTypeInfo.generateArrayTypeInfo(SchemaToTypeInfo.java:248)
org.apache.hadoop.hive.serde2.avro.SchemaToTypeInfo.generateTypeInfoWorker(SchemaToTypeInfo.java:201)
org.apache.hadoop.hive.serde2.avro.SchemaToTypeInfo.access$000(SchemaToTypeInfo.java:46)
org.apache.hadoop.hive.serde2.avro.SchemaToTypeInfo$1.makeInstance(SchemaToTypeInfo.java:119)
org.apache.hadoop.hive.serde2.avro.SchemaToTypeInfo$1.makeInstance(SchemaToTypeInfo.java:114)
org.apache.hadoop.hive.serde2.avro.InstanceCache.retrieve(InstanceCache.java:64)
org.apache.hadoop.hive.serde2.avro.SchemaToTypeInfo.generateTypeInfo(SchemaToTypeInfo.java:181)
org.apache.hadoop.hive.serde2.avro.SchemaToTypeInfo.generateTypeInfoWorker(SchemaToTypeInfo.java:189)
org.apache.hadoop.hive.serde2.avro.SchemaToTypeInfo.access$000(SchemaToTypeInfo.java:46)
org.apache.hadoop.hive.serde2.avro.SchemaToTypeInfo$1.makeInstance(SchemaToTypeInfo.java:119)
org.apache.hadoop.hive.serde2.avro.SchemaToTypeInfo$1.makeInstance(SchemaToTypeInfo.java:114)
org.apache.hadoop.hive.serde2.avro.InstanceCache.retrieve(InstanceCache.java:64)
org.apache.hadoop.hive.serde2.avro.SchemaToTypeInfo.generateTypeInfo(SchemaToTypeInfo.java:181)
org.apache.hadoop.hive.serde2.avro.SchemaToTypeInfo.generateRecordTypeInfo(SchemaToTypeInfo.java:225)
org.apache.hadoop.hive.serde2.avro.SchemaToTypeInfo.generateTypeInfoWorker(SchemaToTypeInfo.java:199)
org.apache.hadoop.hive.serde2.avro.SchemaToTypeInfo.access$000(SchemaToTypeInfo.java:46)
org.apache.hadoop.hive.serde2.avro.SchemaToTypeInfo$1.makeInstance(SchemaToTypeInfo.java:119)
org.apache.hadoop.hive.serde2.avro.SchemaToTypeInfo$1.makeInstance(SchemaToTypeInfo.java:114)
org.apache.hadoop.hive.serde2.avro.InstanceCache.retrieve(InstanceCache.java:64)
org.apache.hadoop.hive.serde2.avro.SchemaToTypeInfo.generateTypeInfo(SchemaToTypeInfo.java:181)
org.apache.hadoop.hive.serde2.avro.SchemaToTypeInfo.generateTypeInfoWorker(SchemaToTypeInfo.java:189)
org.apache.hadoop.hive.serde2.avro.SchemaToTypeInfo.access$000(SchemaToTypeInfo.java:46)
org.apache.hadoop.hive.serde2.avro.SchemaToTypeInfo$1.makeInstance(SchemaToTypeInfo.java:119)
org.apache.hadoop.hive.serde2.avro.SchemaToTypeInfo$1.makeInstance(SchemaToTypeInfo.java:114)
org.apache.hadoop.hive.serde2.avro.InstanceCache.retrieve(InstanceCache.java:64)
org.apache.hadoop.hive.serde2.avro.SchemaToTypeInfo.generateTypeInfo(SchemaToTypeInfo.java:181)
org.apache.hadoop.hive.serde2.avro.SchemaToTypeInfo.generateColumnTypes(SchemaToTypeInfo.java:108)
org.apache.hadoop.hive.serde2.avro.SchemaToTypeInfo.generateColumnTypes(SchemaToTypeInfo.java:87)
org.apache.hadoop.hive.serde2.avro.AvroObjectInspectorGenerator.<init>(AvroObjectInspectorGenerator.java:53)
org.apache.hadoop.hive.serde2.avro.AvroSerDe.initialize(AvroSerDe.java:126)
org.apache.hadoop.hive.ql.exec.FileSinkOperator.initializeOp(FileSinkOperator.java:337)
org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:385)
org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:469)
org.apache.hadoop.hive.ql.exec.Operator.initializeChildren(Operator.java:425)
org.apache.hadoop.hive.ql.exec.SelectOperator.initializeOp(SelectOperator.java:65)
org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:385)
org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:469)
org.apache.hadoop.hive.ql.exec.Operator.initializeChildren(Operator.java:425)
org.apache.hadoop.hive.ql.exec.FilterOperator.initializeOp(FilterOperator.java:66)
org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:385)
org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:469)
org.apache.hadoop.hive.ql.exec.Operator.initializeChildren(Operator.java:425)
org.apache.hadoop.hive.ql.exec.SelectOperator.initializeOp(SelectOperator.java:65)
org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:385)
org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:469)
org.apache.hadoop.hive.ql.exec.Operator.initializeChildren(Operator.java:425)
org.apache.hadoop.hive.ql.exec.TableScanOperator.initializeOp(TableScanOperator.java:195)
org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:385)
org.apache.hadoop.hive.ql.exec.MapOperator.initializeOp(MapOperator.java:431)
org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:385)
org.apache.hadoop.hive.ql.exec.spark.SparkMapRecordHandler.init(SparkMapRecordHandler.java:103)
org.apache.hadoop.hive.ql.exec.spark.HiveMapFunction.call(HiveMapFunction.java:55)
org.apache.hadoop.hive.ql.exec.spark.HiveMapFunction.call(HiveMapFunction.java:30)
org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$7$1.apply(JavaRDDLike.scala:192)
org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$7$1.apply(JavaRDDLike.scala:192)
org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
org.apache.spark.scheduler.Task.run(Task.scala:89)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:229)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:745)
And if I follow this to what I believe is the most relevant version of the source code, I find a call to a static instance of 'InstanceCache' and further to a HashMap that seems to lack any sort of thread safety:
https://github.com/apache/hive/blob/branch-1.1/serde/src/java/org/apache/hadoop/hive/serde2/avro/SchemaToTypeInfo.java#L96
https://github.com/apache/hive/blob/branch-1.1/serde/src/java/org/apache/hadoop/hive/serde2/avro/InstanceCache.java#L48
Some further Googling seems to indicate that this part of the code is indeed vulnerable to concurrency issues, and that a fix for it does not appear until Hive version 2.2.0:
https://issues.apache.org/jira/browse/HIVE-16175
Now, as this line of thought is heavily based on assumptions, I might be mistaken on a number of points here. If someone would be able to fill me in on the following points, that would be super appreciated:
- Am I looking in the right place in the right thread dump?
- Is it correct that multiple tasks within an Executor run as threads inside the same JVM, i.e. have access to the same static object instances?
- Is the version of the code that I linked to representative of what is used in CDH 5.9.1, or have patches for this been included even though the base version is Hive 1.1?
Cheers! \Knut
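For reference, a plain java.util.HashMap that is modified by several threads without synchronization is known to be able to end up in a state where lookups never return on older JDKs, which would be consistent with threads parked in HashMap.getEntry. Below is a minimal Java sketch of one way to guard such a cache. It is a hypothetical class that only mirrors the retrieve/makeInstance shape seen in the stack trace; it is not the Hive source, and I am not claiming it is what the HIVE-16175 patch does.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch, not the Hive class: same general shape
// (retrieve -> makeInstance on a miss), but with retrieve synchronized.
abstract class SynchronizedInstanceCache<K, V> {
    private final Map<K, V> cache = new HashMap<>();

    // synchronized: only one thread at a time reads or modifies the HashMap.
    // Java monitors are reentrant, so makeInstance may call retrieve again
    // on the same thread (as the recursive stack trace does) without deadlock.
    public synchronized V retrieve(K key) {
        if (cache.containsKey(key)) {
            return cache.get(key);
        }
        V created = makeInstance(key);
        cache.put(key, created);
        return created;
    }

    protected abstract V makeInstance(K key);
}

final class InstanceCacheDemo {
    // Hammers one shared cache from several threads and returns how many
    // times makeInstance ran; with the lock it runs once per distinct key.
    static int countCreations(int threads, int keys) {
        AtomicInteger creations = new AtomicInteger();
        SynchronizedInstanceCache<Integer, String> cache =
            new SynchronizedInstanceCache<Integer, String>() {
                @Override
                protected String makeInstance(Integer key) {
                    creations.incrementAndGet();
                    // Recursive lookup, mimicking nested schema types.
                    return key == 0 ? "leaf" : "wrap(" + retrieve(key - 1) + ")";
                }
            };
        CountDownLatch start = new CountDownLatch(1);
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                try {
                    start.await(); // release all workers at the same moment
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                for (int k = 0; k < keys; k++) {
                    cache.retrieve(k);
                }
            });
            workers[t].start();
        }
        start.countDown();
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return creations.get();
    }

    public static void main(String[] args) {
        System.out.println("makeInstance calls: " + countCreations(8, 100));
    }
}
```

A synchronized method is used here rather than ConcurrentHashMap.computeIfAbsent because the factory calls back into the cache recursively, and computeIfAbsent does not support a mapping function that updates the same map.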
03-23-2017
03:35 AM
1 Kudo
Hi, We are currently evaluating the possibility of migrating some of our batch ETL jobs from old-school Hive to Hive-on-Spark. The first candidate is a pretty straightforward step that splits incoming data into groups based on their type, using a multiple insert Hive query. The execution is triggered from an Oozie workflow. Most of the time this works out of the box without any problems. The amount of input data to be processed in each cycle is capped in order to keep resource requirements somewhat predictable, and the query normally finishes within 15 minutes.
Now the issue: In a few rare cases, this query never stops running. At least not within the 24 hours we have (kind of accidentally) allowed it to keep running. The cause seems to be a few seemingly ordinary Spark tasks that just stop making any visible progress.
Some observations:
- The tasks in question are all foreachAsync calls
- All of the stalled tasks are running in the same executor
- Even after the application has been killed, the tasks are shown as RUNNING, and the associated executor is listed as Active in the Spark UI
- stdout and stderr of the executor contain no information, or have been removed. At least the links in the UI give nothing useful
- The input sizes do not seem significantly skewed between tasks. I have yet to figure out how to see the input size of the running tasks though, which should be the interesting bit here.
Log contents
Below is a log snippet from the "application master" (Spark driver?)
...
[Stage 10:=>(62 + 4) / 66][Stage 12:=>(17 + 7) / 24][Stage 13:=>(19 + 1) / 20]
[Stage 10:=>(62 + 4) / 66][Stage 12:=>(18 + 6) / 24][Stage 13:=>(19 + 1) / 20]17/03/21 19:49:50 WARN yarn.YarnAllocator: Expected to find pending requests, but found none.
[Stage 10:=>(62 + 4) / 66][Stage 12:=>(19 + 5) / 24][Stage 13:=>(19 + 1) / 20]17/03/21 19:49:53 WARN yarn.YarnAllocator: Expected to find pending requests, but found none.
[Stage 10:=>(63 + 3) / 66][Stage 12:=>(19 + 5) / 24][Stage 13:=>(19 + 1) / 20]17/03/21 19:49:56 WARN yarn.YarnAllocator: Expected to find pending requests, but found none.
17/03/21 19:49:59 WARN yarn.YarnAllocator: Expected to find pending requests, but found none.
17/03/21 19:50:02 WARN yarn.YarnAllocator: Expected to find pending requests, but found none.
17/03/21 19:50:06 WARN yarn.YarnAllocator: Expected to find pending requests, but found none.
[Stage 10:=>(64 + 2) / 66][Stage 12:=>(19 + 5) / 24][Stage 13:=>(19 + 1) / 20]17/03/21 19:50:07 WARN yarn.YarnAllocator: Expected to find pending requests, but found none.
17/03/21 19:50:10 WARN yarn.YarnAllocator: Expected to find pending requests, but found none.
17/03/21 19:50:13 WARN yarn.YarnAllocator: Expected to find pending requests, but found none.
...
17/03/21 19:51:02 WARN yarn.YarnAllocator: Expected to find pending requests, but found none.
17/03/21 19:51:06 WARN yarn.YarnAllocator: Expected to find pending requests, but found none.
[Stage 10:=>(64 + 2) / 66][Stage 12:=>(19 + 5) / 24][Stage 13:=>(19 + 1) / 20]
[Stage 10:=>(64 + 2) / 66][Stage 12:=>(20 + 4) / 24][Stage 13:=>(19 + 1) / 20]
[Stage 10:=>(65 + 1) / 66][Stage 12:=>(20 + 4) / 24][Stage 13:=>(19 + 1) / 20]
[Stage 12:=============> (20 + 4) / 24][Stage 13:===============>(19 + 1) / 20]17/03/21 19:52:15 WARN yarn.YarnAllocator: Expected to find pending requests, but found none.
17/03/21 19:53:14 WARN yarn.YarnAllocator: Expected to find pending requests, but found none.
[Stage 12:=============> (20 + 4) / 24][Stage 13:===============>(19 + 1) / 20]17/03/21 19:54:03 WARN yarn.YarnAllocator: Expected to find pending requests, but found none.
[Stage 12:==============> (21 + 3) / 24][Stage 13:===============>(19 + 1) / 20]17/03/21 19:54:06 WARN yarn.YarnAllocator: Expected to find pending requests, but found none.
17/03/21 19:54:09 WARN yarn.YarnAllocator: Expected to find pending requests, but found none.
17/03/21 19:54:12 WARN yarn.YarnAllocator: Expected to find pending requests, but found none.
...
17/03/21 19:54:57 WARN yarn.YarnAllocator: Expected to find pending requests, but found none.
17/03/21 19:55:00 WARN yarn.YarnAllocator: Expected to find pending requests, but found none.
[Stage 12:==============> (21 + 3) / 24][Stage 13:===============>(19 + 1) / 20]
[Stage 12:==============> (21 + 3) / 24][Stage 13:===============>(19 + 1) / 20]
[Stage 12:==============> (21 + 3) / 24][Stage 13:===============>(19 + 1) / 20]
[Stage 12:==============> (21 + 3) / 24][Stage 13:===============>(19 + 1) / 20]
[Stage 12:==============> (21 + 3) / 24][Stage 13:===============>(19 + 1) / 20]
...
[Stage 12:==============> (21 + 3) / 24][Stage 13:===============>(19 + 1) / 20]
[Stage 12:==============> (21 + 3) / 24][Stage 13:===============>(19 + 1) / 20]
[Stage 12:==============> (21 + 3) / 24][Stage 13:===============>(19 + 1) / 20]
[Stage 12:==============> (21 + 3) / 24][Stage 13:===============>(19 + 1) / 20]
[Stage 12:==============> (21 + 3) / 24][Stage 13:===============>(19 + 1) / 20]17/03/22 19:37:12 WARN spark.ExecutorAllocationManager: No stages are running, but numRunningTasks != 0
17/03/22 19:37:13 WARN yarn.YarnAllocator: Expected to find pending requests, but found none.
17/03/22 19:37:13 WARN client.RemoteDriver: Shutting down driver because RPC channel was closed.
17/03/22 19:37:13 WARN rpc.Rpc: Failed to send RPC, closing connection.
java.nio.channels.ClosedChannelException
17/03/22 19:37:13 WARN rpc.Rpc: Failed to send RPC, closing connection.
java.nio.channels.ClosedChannelException
Triple dots ("...") were inserted by me where repeated log data has been cut away. Not really sure what all those warnings are about, but to me they do not appear to be directly related to this issue.
Screenshot
And here are some screenshots from the Spark UI. (Sorry about the font size. Couldn't get them any bigger here)
- Failed stages (after the application is killed)
- Sample tasks of a failed stage. Note the tasks still running after the application is killed
Environment:
- CDH 5.9.1, Parcels
- CentOS 6.7
- Spark 1.6.1 used as execution engine for Hive
- Spark 2.0.0 also installed on the cluster
- 22 data nodes (24-32 cores, 128 GB total RAM)
- 72 GB allocated to YARN containers
- spark.executor.cores=4
- spark.executor.memory=15g
- spark.executor.memoryOverhead=3g
- spark.yarn.executor.memoryOverhead=3072
Some questions:
- Does this issue seem familiar to anyone else? If so, do you know what might be the root cause and/or solution?
- If not, are there any more sources of information that we could use to troubleshoot this? (Log files, UI pages etc.)
- Is the seemingly orphaned zombie executor actually still running somewhere, or is that a monitoring glitch? (Checking processes on the appointed host indicates nothing from this application is alive)
Any hints or help to let us move forward would be highly appreciated. Cheers! \Knut
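As far as I understand Spark's console progress bar, each entry in the log snippet above reads "(completed + running) / total" tasks for a stage, so "(21 + 3) / 24" means three tasks are still running and none are waiting. A minimal Java sketch for pulling those numbers out of a log line could look like this. The class and field names are made up for the example, and the pattern is only fitted to the lines shown in this post.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper for reading "[Stage N:===> (a + b) / c]" entries.
final class StageProgressSketch {

    // One progress entry: stage id, completed, running and total tasks.
    static final class StageProgress {
        final int stage;
        final int completed;
        final int running;
        final int total;

        StageProgress(int stage, int completed, int running, int total) {
            this.stage = stage;
            this.completed = completed;
            this.running = running;
            this.total = total;
        }

        @Override
        public String toString() {
            return "stage " + stage + ": " + completed + " done, "
                + running + " running, " + total + " total";
        }
    }

    // Matches one bracketed entry; the bar itself ('=', '>', spaces) is skipped.
    private static final Pattern ENTRY =
        Pattern.compile("\\[Stage (\\d+):[=>\\s]*\\((\\d+) \\+ (\\d+)\\) / (\\d+)\\]");

    // Returns every progress entry found in the line, in order of appearance.
    // Any log text that follows the entries on the same line is ignored.
    static List<StageProgress> parse(String line) {
        List<StageProgress> entries = new ArrayList<>();
        Matcher m = ENTRY.matcher(line);
        while (m.find()) {
            entries.add(new StageProgress(
                Integer.parseInt(m.group(1)), Integer.parseInt(m.group(2)),
                Integer.parseInt(m.group(3)), Integer.parseInt(m.group(4))));
        }
        return entries;
    }

    public static void main(String[] args) {
        String line = "[Stage 12:==============> (21 + 3) / 24]"
            + "[Stage 13:===============>(19 + 1) / 20]";
        for (StageProgress p : parse(line)) {
            System.out.println(p);
        }
    }
}
```

Comparing the parsed values between consecutive lines makes the stall visible: in the snippet, stage 12 stays at 21 completed and 3 running, and stage 13 at 19 completed and 1 running, until the driver shuts down.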
Labels: Apache Spark
01-02-2015
12:14 AM
1 Kudo
The source of this error has been found. It turned out that /etc/fstab on this node was badly configured, so that one of the disks was mounted twice as two separate data directories. Interestingly, this has not been causing any visible errors until upgrading to CDH 5.2.1. Nice that it was pointed out to us by this version though.
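For anyone wanting to check for the same mistake, here is a minimal Java sketch that flags devices listed more than once in fstab-style lines. The device names and mount points in the example are invented, not our actual configuration, and the check only compares the first field literally, so a disk referenced once by device path and once by UUID or label would not be caught.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: finds devices that appear on more than one fstab line.
final class DuplicateMountSketch {

    // Returns device -> mount points for every device listed at least twice.
    static Map<String, List<String>> duplicateDevices(List<String> fstabLines) {
        Map<String, List<String>> byDevice = new LinkedHashMap<>();
        for (String raw : fstabLines) {
            String line = raw.trim();
            if (line.isEmpty() || line.startsWith("#")) {
                continue; // skip blank lines and comments
            }
            String[] fields = line.split("\\s+");
            if (fields.length < 2) {
                continue; // not a usable entry
            }
            // fields[0] is the device, fields[1] the mount point
            byDevice.computeIfAbsent(fields[0], k -> new ArrayList<>()).add(fields[1]);
        }
        byDevice.values().removeIf(mounts -> mounts.size() < 2);
        return byDevice;
    }

    public static void main(String[] args) {
        // Invented sample entries, for illustration only.
        List<String> lines = Arrays.asList(
            "# data disks",
            "/dev/sdb1 /data/1 ext4 defaults,noatime 0 0",
            "/dev/sdc1 /data/2 ext4 defaults,noatime 0 0",
            "/dev/sdc1 /data/3 ext4 defaults,noatime 0 0");
        System.out.println(duplicateDevices(lines));
    }
}
```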
12-18-2014
11:53 PM
I can confirm that CM is running version 5.2.1, so that should not be an issue. When searching for critical log messages I came across these:
It appears that another namenode 56559@datanode_hostname has already locked the storage directory
datanode_hostname:50010:DataXceiver error processing WRITE_BLOCK operation src: /10.8.19.28:37029 dst: /10.8.19.14:50010
We have only ever had one namenode, and none on the datanode host, so I am a bit confused by what the first message really means. The second one I suspect might be a secondary failure of the first one? Regards Knut
12-18-2014
02:44 AM
Not sure if this is helpful, but here are some charts of volume failures and disk capacity for the faulty node, and for a healthy one as reference. The strange thing is that all disks seem to be in use on both nodes, but the total configured capacity is different. Knut
12-17-2014
12:49 AM
Thank you for the suggestions! Host Inspector reports that everything is OK. I have checked the agent logs and also the log for the DataNode, but cannot really find anything that gives any clues about this particular error. Is there a command for making the DataNode report which physical directories it is using and what it thinks their status is?
12-16-2014
05:00 AM
I just upgraded our cluster from CDH 5.0.1 to 5.2.1, using parcels and following the provided instructions. After the upgrade finished, the health test "Data Directory Status" is critical for one of the data nodes. The reported error message is "The DataNode has 1 volume failure(s)". By running 'hdfs dfsadmin -report' I can also confirm that the available HDFS space on that node is approximately 4 TB less than on the other nodes, indicating that one of the disks is not being used. However, when checking the status of the actual disks and regular file system we cannot find anything that seems wrong. All disks are mounted and seem to be working as they should. There is also an in_use.lock file in the dfs/nn directory on all of the disks. How can I get more detailed information about which volume the DataNode is complaining about, and what the issue might be? Best Regards \Knut Nordin
Labels: HDFS
04-07-2014
04:28 AM
Thanks! Upgrading CM helped. For some reason I did not get any new packages when I first tried running yum upgrade for CM, so I figured the beta2 one was still supposed to be used. Not sure what I did there really, but at least things worked better this time. \Knut