
How to improve performance when creating Parquet files with Hive (for Impala)?

We generate Parquet files (to be used by Impala) daily with Hive, because the source file format is proprietary and not supported by Impala. The process works, but each conversion job takes a long time to finish. Writing the Parquet files seems to be the time-consuming part: the job uses very few map tasks, and each map task can take several hours to complete. We would like a Parquet file layout (i.e., file size and page size) that performs well when queried with Impala.

 

Each conversion job generates three tables. The most time-consuming table to generate has approximately 200 columns, of which 30 have scalar types and 170 have complex (nested) data types. The data in the 170 complex columns can be very skewed: some column values are only a few bytes, others up to 1 MB. Many column values can also be NULL, so it's fair to say that the table is wide and sparse. The total daily size of the Parquet files generated for this table varies between 5 and 10 GB (using GZIP compression).

 

The Hive MR job we use to generate the files comprises two map-only stages. The last stage is only used to even out the resulting file sizes (hive.merge.mapfiles=true) so that they average 1 GB. I am not sure this stage is needed; I guess it depends on how well Impala handles smaller files. The last stage doubles the total job time. I think the reason is that the job has to write Parquet files twice (once per stage). I have not found a way of controlling the intermediate file format when using hive.merge.mapfiles. I suspect that another intermediate file format would speed things up a lot, but it does not seem to be configurable.

 

Is there anybody out there with Parquet generation knowledge who can help us look at the parameters we use (input size, buffers, heap size, etc.), or who has an opinion on whether the second stage can be skipped?

 

Also, it seems that the dfs.blocksize and parquet.block.size parameters are not respected. We set the Parquet block size to 256 MB, but the resulting files contain a larger number of smaller blocks. Perhaps this is a result of the skew in the data.

 

We are using CDH 5.7.1.

 

Destination table:

CREATE EXTERNAL TABLE IF NOT EXISTS Destination (
  col1 STRING,
  col2 STRING,
  .
  .
  -- Example of one of the simpler structs
  col200 struct<field1:string,field2:string,field3:string,field4:int>)
PARTITIONED BY (import_date STRING)
STORED AS PARQUET
LOCATION '/path/to/destination';

 

Conversion job:

SET parquet.compression=GZIP;
SET hive.merge.mapfiles=true;
SET hive.merge.smallfiles.avgsize=1073741824;
SET mapred.max.split.size=1073741824;
SET dfs.blocksize=268435456;
SET parquet.block.size=268435456;
SET mapreduce.map.memory.mb=4096;
SET mapreduce.reduce.memory.mb=2048;
SET mapreduce.map.java.opts.max.heap=3277;
SET mapreduce.reduce.java.opts.max.heap=1638;

SET mapreduce.task.io.sort.mb=1000;
SET mapreduce.task.io.sort.factor=100;
SET mapred.compress.map.output=true;
SET mapred.map.output.compression.codec=org.apache.hadoop.io.compress.SnappyCodec;


INSERT OVERWRITE TABLE Destination PARTITION (import_date='2016-08-10')
SELECT  col1, col2, ..., col200 
FROM Source
WHERE import_date='2016-08-10';
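
To test whether the second stage can be skipped, a variant of the job above could be run with the merge turned off, comparing total job time and Impala query time against the merged output. This is only a sketch of what we have in mind, not something we have measured; all other settings and the INSERT statement are assumed to stay exactly as in the conversion job.

```sql
-- Sketch only: overrides for a comparison run without the merge stage.
-- Everything else (compression, split size, memory settings, INSERT) is unchanged.
SET hive.merge.mapfiles=false;     -- do not add the second, merge-only map stage
SET hive.merge.mapredfiles=false;  -- Hive's default is already false; set explicitly for clarity
-- hive.merge.smallfiles.avgsize is not used when merging is disabled.
```

Without the merge, the number and size of the output files would follow from the number of map tasks in the first stage, so the result should show how well Impala copes with the unmerged files.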

Thanks in advance,

Petter

 

 
