Posts: 35
Registered: 12-13-2013

Would fully replicating dimensions avoid broadcast/shuffle in joins?

We have some dims growing past 1M records, and they will soon reach 10M or more. I know that as we scale out with more nodes the shuffle or broadcast speed should remain about constant, but can we also effectively keep a full copy on each node?


If I set -Ddfs.replication=30 in my sqoop import (or run dfs -setrep 30 on the table's dir after the load) on a 30-node cluster, will joins then take place against local blocks, whatever portion of the table is involved?


Would it make a difference whether the join is used as a filter or as a lookup? For example:


select fact.x from fact join dim where dim.y=1




select fact.x, dim.y from fact join dim 


Much appreciated!




Re: Would fully replicating dimensions avoid broadcast/shuffle in joins?

I tried this out myself. It appears the replication factor makes no difference, and in fact you probably wouldn't want it to! The exec time of the hash join exchange fragment is trivial; most of the time is spent on the HDFS read. So it's better to have different nodes read different parts of the table and exchange rows than to have each node read the whole table just to save the exchange.
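To put rough numbers on that reasoning, here is a back-of-envelope sketch in Python. It is only a toy model, not how Impala costs a plan: the table size, scan rate, scanner count and exchange time are assumptions copied from the plan and profile pasted further down in this post, and the function names are made up for illustration. It also adds the exchange time serially, which is pessimistic.

```python
# Toy comparison: split the scan and exchange rows, versus every node
# scanning the whole table from local replicas. All constants are
# assumptions taken from the plan/profile output in this thread.

TABLE_MB = 550.81         # dim table size (SCAN HDFS node)
SCAN_MB_PER_SEC = 176.11  # mean per-instance execution rate (fragment F01)
NUM_SCANNERS = 5          # one scanner per block in this test
EXCHANGE_SEC = 0.716      # slowest exchange node time in the profile


def split_scan_seconds(table_mb, rate, scanners, exchange_sec):
    """Each scanner reads 1/scanners of the table, then rows are exchanged."""
    return (table_mb / scanners) / rate + exchange_sec


def full_local_scan_seconds(table_mb, rate):
    """Every node reads the entire table locally; no exchange is needed."""
    return table_mb / rate


split = split_scan_seconds(TABLE_MB, SCAN_MB_PER_SEC, NUM_SCANNERS, EXCHANGE_SEC)
full = full_local_scan_seconds(TABLE_MB, SCAN_MB_PER_SEC)
print(f"split scan + exchange: {split:.2f}s")
print(f"full local scan:       {full:.2f}s")
```

Even with the slowest exchange time charged in full, the split scan comes out well ahead in this model, because the read dominates.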


I tested on an 8-node cluster with a dim table of 1 file and 5 blocks. Querying against the normal dim (repfactor 3) and a copy with repfactor 8 made no difference in the query plan (or execution stats):


| hash predicates: media_dim_id =
| hosts=8 per-host-mem=1.42MB
| tuple-ids=0,1 row-size=20B cardinality=3395855
| hosts=5 per-host-mem=0B
| tuple-ids=1 row-size=8B cardinality=1351085
07:EXCHANGE [HASH(media_dim_id)]
hosts=8 per-host-mem=0B
tuple-ids=0 row-size=12B cardinality=3395855


01:SCAN HDFS [irdw_prod.media_dim_temp_testfullrep md, RANDOM]
partitions=1/1 files=1 size=550.81MB
table stats: 1350954 rows total
column stats: all
hosts=5 per-host-mem=176.00MB
tuple-ids=1 row-size=8B cardinality=1350954
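One way to see why the scan shows hosts=5 at either replication factor: if each block is handed to exactly one scanner, a 5-block table can never be scanned by more than 5 hosts, however many replicas exist. The sketch below is a toy scheduler written for illustration only. The function name, the least-loaded rule and the random replica placement are all assumptions, not Impala's actual scheduling logic.

```python
import random


def assign_scan_ranges(num_blocks, nodes, replication, seed=0):
    """Place each block on `replication` nodes, then pick one replica
    holder to scan it. A toy stand-in for a locality-aware scheduler."""
    rng = random.Random(seed)
    load = {n: 0 for n in nodes}
    assignment = {}
    for block in range(num_blocks):
        # Hypothetical placement: replicas land on random distinct nodes.
        replicas = rng.sample(nodes, replication)
        # Prefer the least-loaded node that holds a local replica.
        chosen = min(replicas, key=lambda n: load[n])
        load[chosen] += 1
        assignment[block] = chosen
    return assignment


nodes = [f"node{i}" for i in range(8)]  # 8-node cluster, as in the test
for rep in (3, 8):
    hosts = set(assign_scan_ranges(5, nodes, rep).values())
    print(f"replication={rep}: {len(hosts)} scanning hosts")
```

Raising the replication factor only widens the choice of which node reads each block; it does not add more blocks to read in parallel.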




EXCHANGE_NODE (id=8) (716ms)

BytesReceived: 1.7 MiB
ConvertRowBatchTime: 4ms
DeserializeRowBatchTimer: 4ms
FirstBatchArrivalWaitTime: 272ms
InactiveTotalTime: 0ns
PeakMemoryUsage: 0 B
RowsReturned: 168,869
RowsReturnedRate: 236174 per second
SendersBlockedTimer: 0ns
SendersBlockedTotalTimer(*): 0ns
TotalTime: 716ms
EXCHANGE_NODE (id=7) (42ms)

BytesReceived: 4.6 MiB
ConvertRowBatchTime: 7ms
DeserializeRowBatchTimer: 12ms
FirstBatchArrivalWaitTime: 32ms
InactiveTotalTime: 0ns
PeakMemoryUsage: 0 B
RowsReturned: 425,054
RowsReturnedRate: 15756869 per second
SendersBlockedTimer: 0ns
SendersBlockedTotalTimer(*): 0ns
TotalTime: 42ms
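For anyone unsure what the HASH(media_dim_id) exchange is doing, here is a minimal Python sketch of a partitioned hash join. It is an illustration under assumptions, not Impala's implementation: Python's built-in hash stands in for the real hash function, the function names are hypothetical, and the sample rows are made up (only the column names come from the query in this post).

```python
from collections import defaultdict

NUM_NODES = 8  # assumption: matches the 8-node test cluster


def hash_exchange(rows, key, num_nodes):
    """Route each row to the node chosen by hashing its join key."""
    per_node = defaultdict(list)
    for row in rows:
        per_node[hash(row[key]) % num_nodes].append(row)
    return per_node


def partitioned_hash_join(fact_rows, dim_rows, key, num_nodes):
    """Join each node's fact partition against its own dim partition only."""
    fact_parts = hash_exchange(fact_rows, key, num_nodes)
    dim_parts = hash_exchange(dim_rows, key, num_nodes)
    joined = []
    for node in range(num_nodes):
        # Build a hash table on this node's share of the dim table.
        build = defaultdict(list)
        for d in dim_parts.get(node, []):
            build[d[key]].append(d)
        # Probe with the fact rows that were routed to the same node.
        for f in fact_parts.get(node, []):
            for d in build.get(f[key], []):
                joined.append({**f, **d})
    return joined


# Made-up sample data for illustration.
dims = [{"media_dim_id": i, "publisher_id": i % 3} for i in range(10)]
facts = [{"media_dim_id": i % 10, "num_actions_good": 1} for i in range(25)]
result = partitioned_hash_join(facts, dims, "media_dim_id", NUM_NODES)
print(len(result), "joined rows")
```

Because both sides are routed by the same key, matching rows always meet on the same node, so no node needs the whole dim table.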


Averaged Fragment F01
split sizes: min: 38.81 MB, max: 128.00 MB, avg: 110.16 MB, stddev: 35.67 MB
completion times: min:395.139ms max:730.221ms mean: 604.864ms stddev:112.090ms
execution rates: min:98.23 MB/sec max:206.98 MB/sec mean:176.11 MB/sec stddev:40.51 MB/sec
num instances: 5
AverageThreadTokens: 1.40
InactiveTotalTime: 0ns
PeakMemoryUsage: 32.4 MiB
PerHostPeakMemUsage: 57.1 MiB
PrepareTime: 214ms
RowsProduced: 270,190
TotalCpuTime: 1.11s
TotalNetworkReceiveTime: 0ns
TotalNetworkSendTime: 2ms
TotalStorageWaitTime: 7ms
TotalTime: 519ms
CodeGen (508ms)

DataStreamSender (dst_id=8) (53ms)

HDFS_SCAN_NODE (id=1) (462ms)



select md.publisher_id, sum(num_actions_good) from agg_daily_activity_cam_mp_performance
inner join media_dim_temp_testfullrep md on = media_dim_id inner join campaign_dim cd on = campaign_dim_id
where cd.campaign_id = 3094 and p_campaign_id_mod = 4 group by 1 having sum(num_actions_good) > 0 order by 1