
Single-task bottleneck in reducer during sort operation in merge join. Hive on Tez, HDP 2.5


Dear community members,

There are two tables with about 1.5 billion records each, so it is not possible to execute a MapJoin; Hive (on Tez) performs a MergeJoin instead. It creates 1009 tasks in one reducer stage, 1008 of which finish fairly fast, in about 10 minutes. However, the last task in the reducer runs for about 3 hours. It performs a sort operation with PipelinedSorter. We checked the Java process related to that task: it runs on only one node and occupies only one core out of 48! Here is the log:

2017-04-27 10:59:49,107 [INFO] [TezChild] |tez.ReduceRecordProcessor|:

<SEL>Id =10

<Children>

<PTF>Id =11

<Children>

<SEL>Id =12

<Children>

<RS>Id =13

<Children>

<\Children>

<Parent>Id = 12 null<\Parent>

<\RS>

<\Children>

<Parent>Id = 11 null<\Parent>

<\SEL>

<\Children>

<Parent>Id = 10 null<\Parent>

<\PTF>

<\Children>

<Parent><\Parent>

<\SEL>

2017-04-27 10:59:49,107 [INFO] [TezChild] |exec.OperatorUtils|: Setting output collector: RS[13] --> Reducer 12

2017-04-27 10:59:49,107 [INFO] [TezChild] |log.PerfLogger|: </PERFLOG method=TezInitializeOperators start=1493283422145 end=1493283589107 duration=166962 from=org.apache.hadoop.hive.ql.exec.tez.RecordProcessor>

2017-04-27 10:59:49,107 [INFO] [TezChild] |tez.ReduceRecordProcessor|: Starting Output: Reducer 12

2017-04-27 10:59:49,108 [INFO] [TezChild] |impl.ExternalSorter|: Reducer 12 using: memoryMb=1503, keySerializerClass=class org.apache.hadoop.hive.ql.io.HiveKey, valueSerializerClass=org.apache.tez.runtime.library.common.serializer.TezBytesWritableSerialization$TezBytesWritableSerializer@5fc4158a, comparator=org.apache.tez.runtime.library.common.comparator.TezBytesComparator@426e5588, partitioner=org.apache.tez.mapreduce.partition.MRPartitioner, serialization=org.apache.tez.runtime.library.common.serializer.TezBytesWritableSerialization,org.apache.tez.runtime.library.common.serializer.TezBytesWritableSerialization,org.apache.hadoop.io.serializer.WritableSerialization

2017-04-27 10:59:49,108 [INFO] [TezChild] |partition.MRPartitioner|: Using oldApi, MRpartitionerClass=org.apache.hadoop.hive.ql.io.DefaultHivePartitioner

2017-04-27 10:59:49,108 [INFO] [TezChild] |impl.PipelinedSorter|: Setting up PipelinedSorter for Reducer 12: #blocks=1, maxMemUsage=1576009728, BLOCK_SIZE=1576009728, finalMergeEnabled=true, pipelinedShuffle=false, sendEmptyPartitions=true, tez.runtime.io.sort.mb=1503, UsingHashComparator=true

2017-04-27 10:59:49,293 [INFO] [TezChild] |impl.PipelinedSorter|: Reducer 12: reserved.remaining()=1576009728, reserved.metasize=16777216

2017-04-27 10:59:49,297 [INFO] [TezChild] |exec.ReduceSinkOperator|: keys are [reducesinkkey0, reducesinkkey1] num distributions: 2

2017-04-27 10:59:49,298 [INFO] [TezChild] |exec.ReduceSinkOperator|: RS[13]: records written - 1

2017-04-27 10:59:49,299 [INFO] [TezChild] |exec.ReduceSinkOperator|: RS[13]: records written - 10

2017-04-27 10:59:49,309 [INFO] [TezChild] |exec.ReduceSinkOperator|: RS[13]: records written - 100

2017-04-27 10:59:49,409 [INFO] [TezChild] |exec.ReduceSinkOperator|: RS[13]: records written - 1000

2017-04-27 10:59:49,715 [INFO] [TezChild] |exec.ReduceSinkOperator|: RS[13]: records written - 10000

2017-04-27 10:59:50,400 [INFO] [TezChild] |exec.ReduceSinkOperator|: RS[13]: records written - 100000

2017-04-27 10:59:58,317 [INFO] [TezChild] |exec.ReduceSinkOperator|: RS[13]: records written - 1000000

2017-04-27 10:59:58,653 [INFO] [TezChild] |impl.PipelinedSorter|: Reducer 12: Span0.length = 1048573, perItem = 84

2017-04-27 10:59:58,653 [INFO] [TezChild] |impl.PipelinedSorter|: Reducer 12: reserved.remaining()=1470418701, reserved.metasize=16777168

2017-04-27 10:59:58,653 [INFO] [TezChild] |impl.PipelinedSorter|: Reducer 12: New Span1.length = 1048573, perItem = 84, counter:1048573

2017-04-27 11:00:00,247 [INFO] [Sorter {Reducer_12} #0] |impl.PipelinedSorter|: Reducer 12: done sorting span=0, length=1048573, time=1594

2017-04-27 11:00:05,885 [INFO] [TezChild] |impl.PipelinedSorter|: Reducer 12: Span1.length = 1048570, perItem = 84

2017-04-27 11:00:05,885 [INFO] [TezChild] |impl.PipelinedSorter|: Reducer 12: reserved.remaining()=1365112709, reserved.metasize=16777120

2017-04-27 11:00:05,886 [INFO] [TezChild] |impl.PipelinedSorter|: Reducer 12: New Span2.length = 1048570, perItem = 84, counter:2097143

2017-04-27 11:00:07,031 [INFO] [Sorter {Reducer_12} #1] |impl.PipelinedSorter|: Reducer 12: done sorting span=1, length=1048570, time=1145

... and so on until counter=1683809058.

2017-04-27 14:18:41,837 [INFO] [TezChild] |impl.PipelinedSorter|: Reducer 12: reserved.remaining()=1023994123, reserved.metasize=16776976

2017-04-27 14:18:41,837 [INFO] [TezChild] |impl.PipelinedSorter|: Reducer 12: New Span5.length = 1048561, perItem = 115, counter:1682760500

2017-04-27 14:18:43,855 [INFO] [Sorter {Reducer_12} #0] |impl.PipelinedSorter|: Reducer 12: done sorting span=4, length=1048561, time=2018

2017-04-27 14:18:49,900 [INFO] [TezChild] |impl.PipelinedSorter|: Reducer 12: Span5.length = 1048558, perItem = 123

2017-04-27 14:18:49,900 [INFO] [TezChild] |impl.PipelinedSorter|: Reducer 12: reserved.remaining()=878208819, reserved.metasize=16776928

2017-04-27 14:18:49,900 [INFO] [TezChild] |impl.PipelinedSorter|: Reducer 12: New Span6.length = 1048558, perItem = 123, counter:1683809058

2017-04-27 14:18:51,800 [INFO] [Sorter {Reducer_12} #1] |impl.PipelinedSorter|: Reducer 12: done sorting span=5, length=1048558, time=1900

2017-04-27 14:18:59,632 [INFO] [TezChild] |log.PerfLogger|: </PERFLOG method=TezRunProcessor start=1493283422095 end=1493295539632 duration=12117537 from=org.apache.hadoop.hive.ql.exec.tez.TezProcessor>

2017-04-27 14:18:59,632 [INFO] [TezChild] |exec.ReduceSinkOperator|: RS[13]: records written - 1684736059

2017-04-27 14:18:59,632 [INFO] [TezChild] |exec.ReduceSinkOperator|: RECORDS_OUT_INTERMEDIATE_Reducer_11:1684736059,

2017-04-27 14:18:59,633 [INFO] [TezChild] |task.TezTaskRunner|: Closing task, taskAttemptId=attempt_1491408859379_2878_2_17_000000_0

2017-04-27 14:18:59,633 [INFO] [TezChild] |orderedgrouped.Shuffle|: Shutting down Shuffle for source: Reducer_10

...
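For reference, one way to check whether a single join key dominates (which would explain one reducer receiving almost all rows) is to count rows per key in each input table. The table and column names below are placeholders, not our real schema:

```sql
-- Hypothetical skew check: count rows per join key and show the heaviest keys.
-- big_table_a and join_key stand in for the real table/column names.
SELECT join_key, COUNT(*) AS cnt
FROM big_table_a
GROUP BY join_key
ORDER BY cnt DESC
LIMIT 20;
```

If the top key accounts for a large fraction of the 1.68 billion rows seen by Reducer 12, the bottleneck is data skew rather than a cluster misconfiguration.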

Question: why is this single bottleneck created? Is there a way to distribute the sort operation over multiple data nodes? If it is not possible in Hive on Tez, would it be possible to join such large tables in parallel in Spark?
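If the cause turns out to be key skew, would Hive's skew-join optimization be applicable here? A minimal sketch of the relevant session settings, with illustrative (not tuned) values:

```sql
-- Enable runtime skew-join handling.
SET hive.optimize.skewjoin=true;
-- Row count per key above which a key is treated as skewed (illustrative value).
SET hive.skewjoin.key=100000;
-- Number of map tasks used for the follow-up map join on the skewed keys.
SET hive.skewjoin.mapjoin.map.tasks=10000;
```

We have not yet tried these on this query; any guidance on whether they apply to a PTF/windowing stage like Reducer 12 would be appreciated.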

Thank you,

Pavel
