I created an ORC table in Hive, but an INSERT that uses distribute by + sort by is very slow and the resulting file is large.
Example SQL below. Hive is 1.2.1, HDP 2.2.
CREATE TABLE `weblog`(
  `a1` string, `a2` string, `a3` string, `a4` int, `a5` int,
  `a6` array<string>, `a7` array<string>,
  `a8` string, `a9` string, `a10` string)
PARTITIONED BY (`p` string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' NULL DEFINED AS ''
STORED AS ORC
TBLPROPERTIES ('orc.compress'='SNAPPY', 'orc.create.index'='true', 'orc.stripe.size'='10485760', 'orc.row.index.stride'='10000')
insert overwrite table weblog partition (p = '20170110') select a1,a2,a3,a4,a5,a6,a7,a8,a9,a10 from old where p = 20170110 distribute by a1 sort by a1
It takes 30 minutes and the file is 10 GB.
But when I don't use distribute by / sort by:
insert overwrite table weblog partition (p = '20170110') select a1,a2,a3,a4,a5,a6,a7,a8,a9,a10 from old where p = 20170110
it is very fast and the file is small: it takes 4 minutes and the file is 4 GB.
If I use sort by without distribute by, the file is larger than with distribute by and sort by.
If this is the expected behavior, the efficiency is too poor. Or am I using it incorrectly?
How can I set the correct parameters?
In order to better understand your problem, could you please provide some more details:
- how many containers do you have in total in your queue to perform this kind of job?
- how much memory is allocated per reducer?
- how many reducers are being allocated in each of the cases (sort by only, and cluster by)?
- what are the values of a1 (a normally distributed sequence like an ID, or something else)?
- does the a1 column have a lot of repeating values, or is it mostly unique?
Alright, since there has been no response to the comment, I’ll try to answer in a more abstract way.
I’ll assume the execution engine being used is Tez and the block sizes (HDFS and ORC) are the defaults.
Compression ratio depends on the data presented in each compression block.
When you insert-as-select with no sort by/cluster by, the job takes the data from the source table as is; chunks are read based on the input split size defined for the job. There is no shuffling, no reduce stage, etc. It looks like the data in the ORIGINAL table is well suited for good compression, so when you insert into the target table, it simply keeps that good compression ratio.
When you use “sort by” or “distribute by + sort by” (which together are “cluster by”), the data is shuffled and redistributed. Each reducer gets different blocks of data, which are not necessarily the best match for compression. A 2.5x difference can be caused by large text field(s) with repeating values being clustered together in the original table, letting the compression codec do its best work on them. When the data is re-sorted by another field, the values of those large text fields are no longer clustered nicely, so the compression ratio decreases.
Also, when you use “distribute by”, you create better clusters of the a1 field, and the compression ratio will be somewhat higher.
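For reference, the two forms are equivalent in HiveQL: CLUSTER BY a1 is shorthand for DISTRIBUTE BY a1 SORT BY a1 (table and column names taken from the question):

```sql
-- Explicit form used in the question:
insert overwrite table weblog partition (p = '20170110')
select a1,a2,a3,a4,a5,a6,a7,a8,a9,a10 from old where p = 20170110
distribute by a1 sort by a1;

-- Equivalent shorthand:
insert overwrite table weblog partition (p = '20170110')
select a1,a2,a3,a4,a5,a6,a7,a8,a9,a10 from old where p = 20170110
cluster by a1;
```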
If compression ratio is important to you in general, I would rather go with the ZLIB compression codec instead of SNAPPY.
Now, to the point of why the observed performance is so different.
As I already mentioned, when you don’t have any sort by operation, you also don’t have reducers, and of course there is no computing; it’s just a copy. So it makes perfect sense to see different times for the INSERT statement in these two cases.
SORT BY is a very expensive operation, and its performance depends on your data, the distribute by function, and the available resources (number of containers, memory per container, etc.). The shuffle stage can also be impacted by the network, as data travels between data nodes (caused by distribute by).
Assuming the default block size, change the stripe size to 64 MB (10 MB will give you too many stripes, and that can hurt your SELECT operations).
Use ZLIB instead of SNAPPY for better compression.
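Putting those two recommendations together, the DDL from the question could be adjusted like this (a sketch; 67108864 bytes = 64 MB, and only the compression and stripe-size properties change):

```sql
CREATE TABLE `weblog`(
  `a1` string, `a2` string, `a3` string, `a4` int, `a5` int,
  `a6` array<string>, `a7` array<string>,
  `a8` string, `a9` string, `a10` string)
PARTITIONED BY (`p` string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' NULL DEFINED AS ''
STORED AS ORC
TBLPROPERTIES (
  'orc.compress'='ZLIB',          -- better ratio than SNAPPY, at some CPU cost
  'orc.create.index'='true',
  'orc.stripe.size'='67108864',   -- 64 MB stripes instead of 10 MB
  'orc.row.index.stride'='10000');
```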
There can be more recommendations, but they depend on a cluster architecture/configuration.
I use cluster by or sort by in order to have a reduce stage; if I don’t use it, there is no aggregation and I get a lot of small files.
But with it, the job takes much more time.
ZLIB and SNAPPY produce almost the same size.
So my question is: how can I merge small files using Hive SQL and ORC?
How do the new file sizes (with a simple insert, no aggregation) compare to the files under the source table? And what block sizes have you defined at the HDFS and ORC levels? Anyway, you should be able to control the file size produced by mappers when you run without reduce functions.
try (assuming the Tez engine):
set hive.hadoop.supports.splittable.combineinputformat=true;
set hive.merge.tezfiles=true; -- this is to merge files as part of the job
set tez.grouping.max-size=268435456;
set tez.grouping.min-size=134217728; -- this is to minimize the number of mappers created for the job
If you have UPDATE/DELETE transactions that are causing small files, then compaction should work:
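A sketch, assuming the table was created as transactional (compaction only applies to ACID tables; partition spec taken from the question):

```sql
ALTER TABLE weblog PARTITION (p = '20170110') COMPACT 'major';
```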
If small files are being created as part of INSERT statements, you can use CONCATENATE:
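For the table from the question, that would look like this (CONCATENATE merges the small ORC files of the partition at the stripe level):

```sql
ALTER TABLE weblog PARTITION (p = '20170110') CONCATENATE;
```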
But CONCATENATE won't change the stripe size, so in order to get the best read (SELECT) performance on your data, you would probably need to rebuild the table/partition that has small files, using insert-as-select with a temp table or partition, for example. There are a couple of techniques for that, and the best approach depends on your usage/availability requirements.
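One possible rebuild sketch using a temp table (weblog_tmp is an illustrative name, not something from the thread):

```sql
-- Stage the partition's data in a temp table with the same layout:
CREATE TABLE weblog_tmp LIKE weblog;
INSERT OVERWRITE TABLE weblog_tmp PARTITION (p = '20170110')
SELECT a1,a2,a3,a4,a5,a6,a7,a8,a9,a10 FROM weblog WHERE p = '20170110';

-- Rewrite the original partition from the staged copy, then drop the temp table:
INSERT OVERWRITE TABLE weblog PARTITION (p = '20170110')
SELECT a1,a2,a3,a4,a5,a6,a7,a8,a9,a10 FROM weblog_tmp WHERE p = '20170110';
DROP TABLE weblog_tmp;
```

Rewriting the partition this way lets the new ORC table properties (stripe size, compression codec) take effect, which CONCATENATE alone cannot do.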