Created on 06-29-2016 07:35 PM
Some business users analyze their data profile in depth, especially skew across partitions. There are many tuning parameters for optimizing inserts, such as Tez parallelism, manually changing the number of reduce tasks (not recommended), and reducer sizing properties. This article focuses on tuning the insert query itself to gain more control over how partitions are handled, without the need to tweak any of these properties.
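For reference, the conventional approaches adjust session properties like the ones below. This is only an illustrative sketch; the values are examples, not recommendations, and none of them are needed for the technique described in this article.

set hive.tez.auto.reducer.parallelism=true;         -- let Tez adjust reducer parallelism at runtime
set hive.exec.reducers.bytes.per.reducer=268435456;  -- target data volume handled per reducer
set hive.exec.reducers.max=99;                       -- upper bound on the number of reducers
set mapred.reduce.tasks=8;                           -- force a fixed reducer count (not recommended)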
Consider a scenario where we are inserting 3 million records across 200 partitions. The file format is ORC, and the table is further bucketed and sorted for quicker retrieval during selects. Apart from writing the data, bucketing and sorting add extra work for the reducers.
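Before tuning, it can help to profile the skew itself. A minimal sketch, assuming the staging table and partition expression used later in this article:

select floor(creation_timestamp/1000000000) as range_partition,
       count(*) as row_count
from staging.staging_data
group by floor(creation_timestamp/1000000000)
order by row_count desc;

Partitions with disproportionately large row counts are the ones most likely to dominate reducer runtime.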
create table if not exists final.final_data_2 (
  creation_timestamp bigint,
  creator string,
  deletion_timestamp bigint,
  deletor string,
  subject string,
  predicate string,
  object string,
  language_code string
)
partitioned by (range_partition bigint)
clustered by (creation_timestamp) sorted by (creation_timestamp) into 8 buckets
stored as ORC;

(final.final_data_1, used for the baseline run below, is assumed to have the same definition.)
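One caveat: on Hive releases before 2.0, inserts honor the bucket definition only when bucketing enforcement is enabled, so the sessions below would typically also set:

set hive.enforce.bucketing=true;  -- ensure 8 bucket files per partition (always on, and removed as a property, in Hive 2.x)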
hive> set hive.exec.dynamic.partition=true;
hive> set hive.exec.dynamic.partition.mode=nonstrict;
hive> insert overwrite table final_data_1 partition (range_partition)
    > select creation_timestamp, creator, deletion_timestamp, deletor,
    >        subject, predicate, object, language_code,
    >        floor(creation_timestamp/1000000000) as range_partition
    > from staging.staging_data;
Query ID = hdfs_20160629110841_fd2ee9ed-b36f-417e-ad6d-d76b45cda15d
Total jobs = 1
Launching Job 1 out of 1
Status: Running (Executing on YARN cluster with App id
--------------------------------------------------------------------------------
        VERTICES      STATUS  TOTAL  COMPLETED  RUNNING  PENDING  FAILED  KILLED
--------------------------------------------------------------------------------
Map 1 ..........   SUCCEEDED     15         15        0        0       0       0
Reducer 2 ......   SUCCEEDED      8          8        0        0       0       0
--------------------------------------------------------------------------------
VERTICES: 02/02  [==========================>>] 100%  ELAPSED TIME: 138.84 s
--------------------------------------------------------------------------------
Loading data to table final.final_data_1 partition (range_partition=null)
Time taken for load dynamic partitions : 100940
Loading partition {range_partition=1299}
Loading partition {range_partition=1247}
Loading partition {range_partition=1304}
Loading partition {range_partition=1343}
Loading partition {range_partition=1235}
Loading partition {range_partition=1170}
. . . . .
Partition final.final_data_1{range_partition=1360} stats: [numFiles=8, numRows=95800, totalSize=2168211, rawDataSize=60300725]
Partition final.final_data_1{range_partition=1361} stats: [numFiles=8, numRows=5916, totalSize=173888, rawDataSize=3611249]
Partition final.final_data_1{range_partition=1362} stats: [numFiles=8, numRows=20602, totalSize=819304, rawDataSize=13403465]
Partition final.final_data_1{range_partition=1363} stats: [numFiles=8, numRows=25376, totalSize=767015, rawDataSize=16242356]
Partition final.final_data_1{range_partition=1364} stats: [numFiles=8, numRows=33810, totalSize=901617, rawDataSize=21328693]
OK
Time taken: 298.047 seconds
In the baseline run above, a single reducer vertex of 8 tasks (one per bucket) wrote all of the partitions, so each reducer task had to sort and write a file for every partition. The modified query below splits the partition range across multiple insert clauses, distributing the same work across many more reducers.
hive> set hive.exec.dynamic.partition=true;
hive> set hive.exec.dynamic.partition.mode=nonstrict;
hive> from staging.staging_view stg
    > insert into final_data_2 partition(range_partition)
    >   select stg.creation_timestamp, stg.creator, stg.deletion_timestamp, stg.deletor,
    >          stg.subject, stg.predicate, stg.object, stg.language_code, stg.range_partition
    >   where range_partition between 1161 and 1180
    > insert into final_data_2 partition(range_partition)
    >   select stg.creation_timestamp, stg.creator, stg.deletion_timestamp, stg.deletor,
    >          stg.subject, stg.predicate, stg.object, stg.language_code, stg.range_partition
    >   where range_partition between 1181 and 1200
    > insert into final_data_2 partition(range_partition)
    >   select stg.creation_timestamp, stg.creator, stg.deletion_timestamp, stg.deletor,
    >          stg.subject, stg.predicate, stg.object, stg.language_code, stg.range_partition
    >   where range_partition between 1201 and 1220
    > insert into final_data_2 partition(range_partition)
    >   select stg.creation_timestamp, stg.creator, stg.deletion_timestamp, stg.deletor,
    >          stg.subject, stg.predicate, stg.object, stg.language_code, stg.range_partition
    >   where range_partition between 1221 and 1240
    > insert into final_data_2 partition(range_partition)
    >   select stg.creation_timestamp, stg.creator, stg.deletion_timestamp, stg.deletor,
    >          stg.subject, stg.predicate, stg.object, stg.language_code, stg.range_partition
    >   where range_partition between 1241 and 1260
    > insert into final_data_2 partition(range_partition)
    >   select stg.creation_timestamp, stg.creator, stg.deletion_timestamp, stg.deletor,
    >          stg.subject, stg.predicate, stg.object, stg.language_code, stg.range_partition
    >   where range_partition between 1261 and 1280
    > insert into final_data_2 partition(range_partition)
    >   select stg.creation_timestamp, stg.creator, stg.deletion_timestamp, stg.deletor,
    >          stg.subject, stg.predicate, stg.object, stg.language_code, stg.range_partition
    >   where range_partition between 1281 and 1300
    > insert into final_data_2 partition(range_partition)
    >   select stg.creation_timestamp, stg.creator, stg.deletion_timestamp, stg.deletor,
    >          stg.subject, stg.predicate, stg.object, stg.language_code, stg.range_partition
    >   where range_partition between 1301 and 1320
    > insert into final_data_2 partition(range_partition)
    >   select stg.creation_timestamp, stg.creator, stg.deletion_timestamp, stg.deletor,
    >          stg.subject, stg.predicate, stg.object, stg.language_code, stg.range_partition
    >   where range_partition between 1321 and 1340
    > insert into final_data_2 partition(range_partition)
    >   select stg.creation_timestamp, stg.creator, stg.deletion_timestamp, stg.deletor,
    >          stg.subject, stg.predicate, stg.object, stg.language_code, stg.range_partition
    >   where range_partition between 1341 and 1360
    > insert into final_data_2 partition(range_partition)
    >   select stg.creation_timestamp, stg.creator, stg.deletion_timestamp, stg.deletor,
    >          stg.subject, stg.predicate, stg.object, stg.language_code, stg.range_partition
    >   where range_partition between 1361 and 1380;
Query ID = hdfs_20160629130923_ae533e87-3621-4773-8ed9-9d53a1cc857a
Total jobs = 1
Launching Job 1 out of 1
Status: Running (Executing on YARN cluster with App id application_1466522743023_0015)
--------------------------------------------------------------------------------
        VERTICES      STATUS  TOTAL  COMPLETED  RUNNING  PENDING  FAILED  KILLED
--------------------------------------------------------------------------------
Map 1 ..........   SUCCEEDED     15         15        0        0       0       0
Reducer 10 .         RUNNING      8          2        0        6       0       0
Reducer 11 ...       RUNNING      8          5        0        3       0       0
Reducer 12 .         RUNNING      8          2        0        6       0       0
Reducer 2 .          RUNNING      8          2        0        6       0       0
Reducer 3 ...        RUNNING      8          5        0        3       0       0
Reducer 4 ...        RUNNING      8          5        0        3       0       0
Reducer 5 ...        RUNNING      8          5        0        3       0       0
Reducer 6 ..         RUNNING      8          3        2        3       0       0
Reducer 7 ...        RUNNING      8          5        0        3       0       0
Reducer 8 .          RUNNING      8          2        3        3       0       0
Reducer 9 .          RUNNING      8          2        3        3       0       0
--------------------------------------------------------------------------------
VERTICES: 01/12  [=============>>-------------] 51%  ELAPSED TIME: 94.33 s
--------------------------------------------------------------------------------
You can see here that 8 reducer tasks are assigned per insert clause, so each reducer task sorts and writes only 20 files (one per partition in its range).
Status: Running (Executing on YARN cluster with App id application_1466522743023_0015)
--------------------------------------------------------------------------------
        VERTICES      STATUS  TOTAL  COMPLETED  RUNNING  PENDING  FAILED  KILLED
--------------------------------------------------------------------------------
Map 1 ..........   SUCCEEDED     15         15        0        0       0       0
Reducer 10 .....   SUCCEEDED      8          8        0        0       0       0
Reducer 11 .....   SUCCEEDED      8          8        0        0       0       0
Reducer 12 .....   SUCCEEDED      8          8        0        0       0       0
Reducer 2 ......   SUCCEEDED      8          8        0        0       0       0
Reducer 3 ......   SUCCEEDED      8          8        0        0       0       0
Reducer 4 ......   SUCCEEDED      8          8        0        0       0       0
Reducer 5 ......   SUCCEEDED      8          8        0        0       0       0
Reducer 6 ......   SUCCEEDED      8          8        0        0       0       0
Reducer 7 ......   SUCCEEDED      8          8        0        0       0       0
Reducer 8 ......   SUCCEEDED      8          8        0        0       0       0
Reducer 9 ......   SUCCEEDED      8          8        0        0       0       0
--------------------------------------------------------------------------------
VERTICES: 12/12  [==========================>>] 100%  ELAPSED TIME: 129.25 s
--------------------------------------------------------------------------------
Loading data to table final.final_data_2 partition (range_partition=null)
Time taken for load dynamic partitions : 9291
Loading partition {range_partition=1178}
Loading partition {range_partition=1174}
Loading partition {range_partition=1163}
Loading partition {range_partition=1165}
Loading partition {range_partition=1172}
Loading partition {range_partition=1176}
Loading partition {range_partition=1179}
Loading partition {range_partition=1166}
Loading partition {range_partition=1175}
Loading partition {range_partition=1177}
Loading partition {range_partition=1167}
Loading partition {range_partition=1180}
Time taken for adding to write entity : 8
Loading data to table final.final_data_2 partition (range_partition=null)
Time taken for load dynamic partitions : 9718
. . . . .
OK
Time taken: 269.01 seconds
hive>
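To verify the result, you can list the partitions and inspect the stats of any one of them; the partition value below is taken from the load output above:

show partitions final.final_data_2;
describe formatted final.final_data_2 partition (range_partition=1178);  -- check numFiles (expected 8) and numRows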
Here we see an improvement from 298 seconds to 269 seconds, though users will have to try it on their own workload to identify the exact impact. This pattern of parallelizing inserts is not new; it is usually used to insert into multiple tables from a single scan of the source. But the same construct gives us more control over the reducers without explicitly setting the reducer task count property. The total number of reducer tasks increases, but each task runs for much less time than when only 8 reducers carry the whole load.
Note: Increasing the number of reducers does not always improve performance; it helps only when the reducers are the bottleneck. Analyzing task slot utilization will show whether enough slots are available to benefit from breaking the work into more reducers.
Created on 04-27-2018 03:40 PM
Hi,
Thanks a lot for this tip!! It works properly for ACID tables.
Could we use this kind of query with a delete statement? It does not seem to work.