Created on 04-26-2016 07:31 PM - edited 09-16-2022 03:15 AM
Hi:
i use three node with 64 memory to run join operation. But the the memory of one node is exceed no matter i set the mem_limit properties with the values of -1B,60G,50G or 40G. While other nodes only use 10G+ memory. Here is the part of profiles:
Query (id=6043a6bbde2f997a:3b3072b5dcfd1d89): Summary: Session ID: 4141715fae32f3b5:5d20c4f989cea7b8 Session Type: BEESWAX Start Time: 2016-04-26 18:11:31.958572000 End Time: 2016-04-27 04:19:46.109358000 Query Type: DML Query State: EXCEPTION Query Status: Memory limit exceeded Cannot perform hash join at node with id 2. Repartitioning did not reduce the size of a spilled partition. Repartitioning level 8. Number of rows 2052516353. Impala Version: impalad version 2.3.0-cdh5.5.0 RELEASE (build 0c891d79aa38f297d244855a32f1e17280e2129b) User: root Connected User: root Delegated User: Network Address: ::ffff:192.168.55.247:40383 Default Db: default Sql Statement: insert into table result select parquet_bigdatabench_dw_order_300g.buyer_id,sum(parquet_bigdatabench_dw_item_300g.goods_amount) as total from parquet_bigdatabench_dw_order_300g join [shuffle] parquet_bigdatabench_dw_item_300g on parquet_bigdatabench_dw_item_300g.order_id = parquet_bigdatabench_dw_order_300g.order_id group by parquet_bigdatabench_dw_order_300g.buyer_id limit 10 Coordinator: bigdata3:22000 Plan: ---------------- Estimated Per-Host Requirements: Memory=39.27GB VCores=2 F04:PLAN FRAGMENT [UNPARTITIONED] WRITE TO HDFS [default.result, OVERWRITE=false] | partitions=1 | hosts=1 per-host-mem=unavailable | 08:EXCHANGE [UNPARTITIONED] limit: 10 hosts=3 per-host-mem=unavailable tuple-ids=2 row-size=12B cardinality=10 F03:PLAN FRAGMENT [HASH(parquet_bigdatabench_dw_order_300g.buyer_id)] DATASTREAM SINK [FRAGMENT=F04, EXCHANGE=08, UNPARTITIONED] 07:AGGREGATE [FINALIZE] | output: sum:merge(parquet_bigdatabench_dw_item_300g.goods_amount) | group by: parquet_bigdatabench_dw_order_300g.buyer_id | limit: 10 | hosts=3 per-host-mem=9.21GB | tuple-ids=2 row-size=12B cardinality=2247426048 | 06:EXCHANGE [HASH(parquet_bigdatabench_dw_order_300g.buyer_id)] hosts=3 per-host-mem=0B tuple-ids=2 row-size=12B cardinality=2247426048 F02:PLAN FRAGMENT [HASH(parquet_bigdatabench_dw_item_300g.order_id)] DATASTREAM SINK [FRAGMENT=F03, EXCHANGE=06, HASH(parquet_bigdatabench_dw_order_300g.buyer_id)] 03:AGGREGATE | output: sum(parquet_bigdatabench_dw_item_300g.goods_amount) | group by: parquet_bigdatabench_dw_order_300g.buyer_id | hosts=3 per-host-mem=27.63GB | tuple-ids=2 row-size=12B cardinality=2247426048 | 02:HASH JOIN [INNER JOIN, PARTITIONED] | hash predicates: parquet_bigdatabench_dw_item_300g.order_id = parquet_bigdatabench_dw_order_300g.order_id | hosts=3 per-host-mem=11.47GB | tuple-ids=1,0 row-size=20B cardinality=4103971316 | |--05:EXCHANGE [HASH(parquet_bigdatabench_dw_order_300g.order_id)] | hosts=3 per-host-mem=0B | tuple-ids=0 row-size=8B cardinality=4200000000 | 04:EXCHANGE [HASH(parquet_bigdatabench_dw_item_300g.order_id)] hosts=3 per-host-mem=0B tuple-ids=1 row-size=12B cardinality=4200000000 F01:PLAN FRAGMENT [RANDOM] DATASTREAM SINK [FRAGMENT=F02, EXCHANGE=05, HASH(parquet_bigdatabench_dw_order_300g.order_id)] 00:SCAN HDFS [default.parquet_bigdatabench_dw_order_300g, RANDOM] partitions=1/1 files=87 size=21.15GB table stats: 4200000000 rows total column stats: all hosts=3 per-host-mem=176.00MB tuple-ids=0 row-size=8B cardinality=4200000000 F00:PLAN FRAGMENT [RANDOM] DATASTREAM SINK [FRAGMENT=F02, EXCHANGE=04, HASH(parquet_bigdatabench_dw_item_300g.order_id)] 01:SCAN HDFS [default.parquet_bigdatabench_dw_item_300g, RANDOM] partitions=1/1 files=258 size=63.82GB table stats: 4200000000 rows total column stats: all hosts=3 per-host-mem=176.00MB tuple-ids=1 row-size=12B cardinality=4200000000 ---------------- Estimated Per-Host Mem: 42170573209 Estimated Per-Host VCores: 2 Admission result: Admitted immediately Request Pool: root.root ExecSummary: Operator #Hosts Avg Time Max Time #Rows Est. #Rows Peak Mem Est. Peak Mem Detail --------------------------------------------------------------------------------------------------------------------------- 08:EXCHANGE 1 10h8m 10h8m 0 10 0 -1.00 B UNPARTITIONED 07:AGGREGATE 3 135.686ms 407.60ms 0 2.25B 155.03 MB 9.21 GB FINALIZE 06:EXCHANGE 3 15.975ms 24.767ms 1.20M 2.25B 0 0 HASH(parquet_bigdatabench_d... 03:AGGREGATE 3 887.849ms 1s340ms 1.20M 2.25B 155.02 MB 27.63 GB 02:HASH JOIN 3 3h19m 9h53m 1.50M 4.10B 31.46 GB 11.47 GB INNER JOIN, PARTITIONED |--05:EXCHANGE 3 1m2s 2m5s 4.20B 4.20B 0 0 HASH(parquet_bigdatabench_d... | 00:SCAN HDFS 3 12s695ms 16s494ms 4.20B 4.20B 485.76 MB 176.00 MB default.parquet_bigdatabenc... 04:EXCHANGE 3 59s722ms 2m59s 4.20B 4.20B 0 0 HASH(parquet_bigdatabench_d... 01:SCAN HDFS 3 14s341ms 19s831ms 4.20B 4.20B 205.20 MB 176.00 MB default.parquet_bigdatabenc...
Created 04-27-2016 04:00 PM
It looks like you have extreme skew in the key that you're joining on (~2 billion duplicates). The error message is:
"Cannot perform hash join at node with id 2. Repartitioning did not reduce the size of a spilled partition. Repartitioning level 8. Number of rows 2052516353."
In order for the hash join to join on a key all the values for that key on the right side of the join need to be able to fit in memory. Impala has several ways to avoid this problem, but it looks like you defeated them all.
First, it tries to put the smaller input on the right side of the join, but both your inputs are the same size, so that doesn't help.
Second, it will try to spill some of the rows to disk and process a subset of those.
Third, it will try to repeatedly split ("repartition") the right-side input based on the join key to try and get it small enough to fit in memory. Based on the error message, it tried to do that 8 times but still has 2 billion rows in one of the partitions, which probably means there are 2 billion rows with the same key.
Created 04-27-2016 04:00 PM
It looks like you have extreme skew in the key that you're joining on (~2 billion duplicates). The error message is:
"Cannot perform hash join at node with id 2. Repartitioning did not reduce the size of a spilled partition. Repartitioning level 8. Number of rows 2052516353."
In order for the hash join to join on a key all the values for that key on the right side of the join need to be able to fit in memory. Impala has several ways to avoid this problem, but it looks like you defeated them all.
First, it tries to put the smaller input on the right side of the join, but both your inputs are the same size, so that doesn't help.
Second, it will try to spill some of the rows to disk and process a subset of those.
Third, it will try to repeatedly split ("repartition") the right-side input based on the join key to try and get it small enough to fit in memory. Based on the error message, it tried to do that 8 times but still has 2 billion rows in one of the partitions, which probably means there are 2 billion rows with the same key.
Created on 04-28-2016 02:43 AM - edited 04-28-2016 03:14 AM
Tim,thank you for reply, yes ,I see i really have the data skew.But how can i to avoid it? By the way, how can i get the details of the partitioned join?
Created 04-28-2016 08:48 AM
It looks like it's some kind of benchmark data? Is it a publicly available benchmark. It seems strange that they would have such skewed data.
I think even if it did stay in memory it would run for an incredibly long time: if 2 billion keys in one table get matches to 2 billion keys in another table then you will get 10^18 output rows (over a quintillion). So I think either there is something strange with the benchmark data or it doesn't make sense to join the tables.
We don't have the exact join algorithm documented aside from in the Impala code. It's a version of the Hybrid hash join https://en.wikipedia.org/wiki/Hash_join#Hybrid_hash_join
Created 04-28-2016 08:50 AM
The algorithm is described in the Impala source code here if you (or anyone else reading) is interested:
https://github.com/cloudera/Impala/blob/cdh5-trunk/be/src/exec/partitioned-hash-join-node.h
Created 04-28-2016 10:23 PM
tim, thank you very much. I think i should try another benchmark data
Created 04-29-2016 02:02 AM
Created 05-02-2016 09:21 PM
We often use TPC-H and TPC-DS, they're pretty standard for analytical databases. There's a TPC-DS kit for Impala here: https://github.com/cloudera/impala-tpcds-kit
Created 11-09-2016 08:10 PM
@lizhenmxcz @Tim Armstrong OP how did you get that query summary? I would like to be able to analyze my own queries like that.
Created 11-16-2016 03:13 PM
If you're using impala-shell, you can use the "summary;" command. Otherwise it's accessible through the Impala debug web pages (typically http://the-impala-server:25000)