Hive Insert Query Optimization

Business users who analyze their data profile closely, especially skew across partitions, often want finer control over how inserts are executed. Inserts can be tuned through several parameters, such as Tez parallelism or manually setting the number of reduce tasks (not recommended). This article instead focuses on tuning the insert query itself, which gives more control over how partitions are handled without tweaking any of these properties.

Consider a scenario where we are inserting 3 million records across 200 partitions. The file format is ORC, and the table is also bucketed and sorted for quicker retrieval during select. In addition to writing the data, bucketing and sorting add work for the reducers.
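
Before tuning, it can help to see how the rows are spread across the target partitions. The query below is a minimal sketch that counts rows per partition in the staging data. It assumes the staging.staging_data table and the floor(creation_timestamp/1000000000) partition expression used by the insert query in this article; the row_count alias is illustrative.

```sql
-- Row count per target partition, largest partitions first.
-- Assumes the same partition expression as the insert query in this article.
select floor(creation_timestamp/1000000000) as range_partition,
       count(*) as row_count
from staging.staging_data
group by floor(creation_timestamp/1000000000)
order by row_count desc;
```

A large gap between the biggest and smallest row_count values indicates skew that is worth accounting for when splitting the insert.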

Target Table

create table if not exists final.final_data_2 (
creation_timestamp bigint,
creator string,
deletion_timestamp bigint,
deletor string,
subject string,
predicate string,
object string,
language_code string )
partitioned by(range_partition bigint)
CLUSTERED BY(creation_timestamp) SORTED BY(creation_timestamp) INTO 8 BUCKETS
stored as ORC;

Simple Insert Query

hive> set hive.exec.dynamic.partition=true;
hive> set hive.exec.dynamic.partition.mode=nonstrict;
hive> insert overwrite table final_data_1 partition (range_partition)
    > select creation_timestamp, creator, deletion_timestamp, deletor,
    > subject, predicate, object, language_code,
    > floor(creation_timestamp/1000000000) as range_partition
    > from staging.staging_data;

Query ID = hdfs_20160629110841_fd2ee9ed-b36f-417e-ad6d-d76b45cda15d
Total jobs = 1
Launching Job 1 out of 1
Status: Running (Executing on YARN cluster with App id
--------------------------------------------------------------------------------
VERTICES  STATUS TOTAL  COMPLETED  RUNNING PENDING  FAILED  KILLED
--------------------------------------------------------------------------------
Map 1 ..........   SUCCEEDED     15         15        0        0       0       0
Reducer 2 ......   SUCCEEDED      8          8        0        0       0       0
--------------------------------------------------------------------------------
VERTICES: 02/02  [==========================>>] 100%  ELAPSED TIME: 138.84 s   
--------------------------------------------------------------------------------
Loading data to table final.final_data_1 partition (range_partition=null)
	 Time taken for load dynamic partitions : 100940
	Loading partition {range_partition=1299}
	Loading partition {range_partition=1247}
	Loading partition {range_partition=1304}
	Loading partition {range_partition=1343}
	Loading partition {range_partition=1235}
	Loading partition {range_partition=1170}
.
.
.
.
.
Partition final.final_data_1{range_partition=1360} stats: [numFiles=8, numRows=95800, totalSize=2168211, rawDataSize=60300725]
Partition final.final_data_1{range_partition=1361} stats: [numFiles=8, numRows=5916, totalSize=173888, rawDataSize=3611249]
Partition final.final_data_1{range_partition=1362} stats: [numFiles=8, numRows=20602, totalSize=819304, rawDataSize=13403465]
Partition final.final_data_1{range_partition=1363} stats: [numFiles=8, numRows=25376, totalSize=767015, rawDataSize=16242356]
Partition final.final_data_1{range_partition=1364} stats: [numFiles=8, numRows=33810, totalSize=901617, rawDataSize=21328693]
OK
Time taken: 298.047 seconds
  • The query above required 15 mappers, based on the splits identified while reading from the source.
  • One reducer is assigned per bucket, and each reducer writes the corresponding bucket file in every one of the 200 partitions, which is about 200 files per reducer.
  • The reducers are clearly the bottleneck in such cases. This example is relatively small: 3 million records and around 300 MB of data on a 3-node cluster. The problem becomes more pronounced with more rows and larger data volumes.

Optimization

Please refer to the modified query below, which distributes the work across a larger number of reducers.

  • The partitions for each insert query should be chosen so that the rows are distributed roughly equally across the queries. For example, if the table is partitioned by date and the number of records grows every year:
    • query 1 : complete years 2000 to 2005
    • query 2 : complete years 2006 to 2007
    • query 3 : years 2008 to 2009
    • query 4 : year 2010
    • query 5 : 6 months, and so on.
  • The number of buckets can be chosen so that each bucket file is marginally smaller than one split size.
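
One possible way to find balanced partition ranges is sketched below. It is not taken from the original workflow: it counts rows per partition, computes a running total in partition order, and cuts the partitions into contiguous groups of roughly equal row counts. The group count of 11 is an assumption chosen to mirror the 11 insert clauses of the optimized query in this article, and the aliases (counts, totals, grouped, grp) are illustrative.

```sql
-- Sketch: split the partitions into up to 11 contiguous groups of similar size.
-- Assumption: 11 groups, one per insert clause; adjust to the workload.
select grp,
       min(range_partition) as first_partition,
       max(range_partition) as last_partition,
       sum(row_count)       as rows_in_group
from (
  select range_partition,
         row_count,
         -- rows that precede this partition, scaled to a group number 0..10
         cast(floor(11 * (running_rows - row_count) / total_rows) as int) as grp
  from (
    select range_partition,
           row_count,
           sum(row_count) over (order by range_partition
                                rows between unbounded preceding and current row) as running_rows,
           sum(row_count) over () as total_rows
    from (
      select floor(creation_timestamp/1000000000) as range_partition,
             count(*) as row_count
      from staging.staging_data
      group by floor(creation_timestamp/1000000000)
    ) counts
  ) totals
) grouped
group by grp
order by grp;
```

Each output row gives a candidate "between first_partition and last_partition" range for one insert clause. Heavily skewed data can leave some group numbers unused, so fewer than 11 rows may come back.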

Optimized Insert Query

hive> set hive.exec.dynamic.partition=true;
hive> set hive.exec.dynamic.partition.mode=nonstrict;
hive> from staging.staging_view stg 
    > insert into final_data_2 partition(range_partition) select stg.creation_timestamp, stg.creator,
    > stg.deletion_timestamp, stg.deletor, stg.subject, 
    > stg.predicate, stg.object, stg.language_code, stg.range_partition
    > where range_partition between 1161 and 1180
    > insert into final_data_2 partition(range_partition) select stg.creation_timestamp, stg.creator,
    > stg.deletion_timestamp, stg.deletor, stg.subject, 
    > stg.predicate, stg.object, stg.language_code, stg.range_partition
    > where range_partition between 1181 and 1200
    > insert into final_data_2 partition(range_partition) select stg.creation_timestamp, stg.creator,
    > stg.deletion_timestamp, stg.deletor, stg.subject, 
    > stg.predicate, stg.object, stg.language_code, stg.range_partition
    > where range_partition between 1201 and 1220
    > insert into final_data_2 partition(range_partition) select stg.creation_timestamp, stg.creator,
    > stg.deletion_timestamp, stg.deletor, stg.subject, 
    > stg.predicate, stg.object, stg.language_code, stg.range_partition
    > where range_partition between 1221 and 1240
    > insert into final_data_2 partition(range_partition) select stg.creation_timestamp, stg.creator,
    > stg.deletion_timestamp, stg.deletor, stg.subject, 
    > stg.predicate, stg.object, stg.language_code, stg.range_partition
    > where range_partition between 1241 and 1260
    > insert into final_data_2 partition(range_partition) select stg.creation_timestamp, stg.creator,
    > stg.deletion_timestamp, stg.deletor, stg.subject, 
    > stg.predicate, stg.object, stg.language_code, stg.range_partition
    > where range_partition between 1261 and 1280
    > insert into final_data_2 partition(range_partition) select stg.creation_timestamp, stg.creator,
    > stg.deletion_timestamp, stg.deletor, stg.subject, 
    > stg.predicate, stg.object, stg.language_code, stg.range_partition
    > where range_partition between 1281 and 1300
    > insert into final_data_2 partition(range_partition) select stg.creation_timestamp, stg.creator,
    > stg.deletion_timestamp, stg.deletor, stg.subject, 
    > stg.predicate, stg.object, stg.language_code, stg.range_partition
    > where range_partition between 1301 and 1320
    > insert into final_data_2 partition(range_partition) select stg.creation_timestamp, stg.creator,
    > stg.deletion_timestamp, stg.deletor, stg.subject, 
    > stg.predicate, stg.object, stg.language_code, stg.range_partition
    > where range_partition between 1321 and 1340
    > insert into final_data_2 partition(range_partition) select stg.creation_timestamp, stg.creator,
    > stg.deletion_timestamp, stg.deletor, stg.subject, 
    > stg.predicate, stg.object, stg.language_code, stg.range_partition
    > where range_partition between 1341 and 1360
    > insert into final_data_2 partition(range_partition) select stg.creation_timestamp, stg.creator,
    > stg.deletion_timestamp, stg.deletor, stg.subject, 
    > stg.predicate, stg.object, stg.language_code, stg.range_partition
    > where range_partition between 1361 and 1380;

Query ID = hdfs_20160629130923_ae533e87-3621-4773-8ed9-9d53a1cc857a
Total jobs = 1
Launching Job 1 out of 1

Status: Running (Executing on YARN cluster with App id application_1466522743023_0015)


--------------------------------------------------------------------------------
        VERTICES      STATUS  TOTAL  COMPLETED  RUNNING  PENDING  FAILED  KILLED
--------------------------------------------------------------------------------
Map 1 ..........   SUCCEEDED     15         15        0        0       0       0
Reducer 10 .         RUNNING      8          2        0        6       0       0
Reducer 11 ...       RUNNING      8          5        0        3       0       0
Reducer 12 .         RUNNING      8          2        0        6       0       0
Reducer 2 .          RUNNING      8          2        0        6       0       0
Reducer 3 ...        RUNNING      8          5        0        3       0       0
Reducer 4 ...        RUNNING      8          5        0        3       0       0
Reducer 5 ...        RUNNING      8          5        0        3       0       0
Reducer 6 ..         RUNNING      8          3        2        3       0       0
Reducer 7 ...        RUNNING      8          5        0        3       0       0
Reducer 8 .          RUNNING      8          2        3        3       0       0
Reducer 9 .          RUNNING      8          2        3        3       0       0
--------------------------------------------------------------------------------
VERTICES: 01/12  [=============>>-------------] 51%   ELAPSED TIME: 94.33 s    
--------------------------------------------------------------------------------


Here 8 reducer tasks are assigned to each insert clause. Since each clause covers at most 20 partitions, each reducer task sorts and writes only up to 20 files.

Status: Running (Executing on YARN cluster with App id application_1466522743023_0015)


--------------------------------------------------------------------------------
        VERTICES      STATUS  TOTAL  COMPLETED  RUNNING  PENDING  FAILED  KILLED
--------------------------------------------------------------------------------
Map 1 ..........   SUCCEEDED     15         15        0        0       0       0
Reducer 10 .....   SUCCEEDED      8          8        0        0       0       0
Reducer 11 .....   SUCCEEDED      8          8        0        0       0       0
Reducer 12 .....   SUCCEEDED      8          8        0        0       0       0
Reducer 2 ......   SUCCEEDED      8          8        0        0       0       0
Reducer 3 ......   SUCCEEDED      8          8        0        0       0       0
Reducer 4 ......   SUCCEEDED      8          8        0        0       0       0
Reducer 5 ......   SUCCEEDED      8          8        0        0       0       0
Reducer 6 ......   SUCCEEDED      8          8        0        0       0       0
Reducer 7 ......   SUCCEEDED      8          8        0        0       0       0
Reducer 8 ......   SUCCEEDED      8          8        0        0       0       0
Reducer 9 ......   SUCCEEDED      8          8        0        0       0       0
--------------------------------------------------------------------------------
VERTICES: 12/12  [==========================>>] 100%  ELAPSED TIME: 129.25 s   
--------------------------------------------------------------------------------
Loading data to table final.final_data_2 partition (range_partition=null)
	 Time taken for load dynamic partitions : 9291
	Loading partition {range_partition=1178}
	Loading partition {range_partition=1174}
	Loading partition {range_partition=1163}
	Loading partition {range_partition=1165}
	Loading partition {range_partition=1172}
	Loading partition {range_partition=1176}
	Loading partition {range_partition=1179}
	Loading partition {range_partition=1166}
	Loading partition {range_partition=1175}
	Loading partition {range_partition=1177}
	Loading partition {range_partition=1167}
	Loading partition {range_partition=1180}
	 Time taken for adding to write entity : 8
Loading data to table final.final_data_2 partition (range_partition=null)
	 Time taken for load dynamic partitions : 9718
.
.
.
.
.
.
.
.
.
.
.
.
OK
Time taken: 269.01 seconds
hive> 
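
The optimized query reads from staging.staging_view, which this article does not define. A plausible definition, assuming the view simply adds the range_partition column computed in the simple insert query, might look like this:

```sql
-- Hypothetical definition: the article does not show how staging.staging_view is built.
-- It exposes the staging columns plus the computed partition column.
create view if not exists staging.staging_view as
select creation_timestamp,
       creator,
       deletion_timestamp,
       deletor,
       subject,
       predicate,
       object,
       language_code,
       floor(creation_timestamp/1000000000) as range_partition
from staging.staging_data;
```

Computing range_partition once in a view keeps each insert clause short and lets every where clause filter on the same column.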

Here the elapsed time improves from 298 seconds to 269 seconds, but users will have to try this on their own workload to measure the actual impact. Parallelizing inserts this way is not new; multi-insert statements are usually used to insert into multiple tables. The same construct, however, gives more control over reducers without explicitly setting the reducer task number property. The number of reducer tasks increases, but each task runs for much less time than each of the original 8 reducers.

Note: Increasing the number of reducers does not always improve performance. It helps only if the reducers are the bottleneck. Task slot utilization analysis will show whether enough task slots are available to benefit from splitting the work across more reducers.

Comments
Hi,

Thanks a lot for this tip! It works properly for ACID tables.

Could we use this kind of query with a delete statement? It does not seem to work.