
Help optimizing a query

Super Collaborator

I'm having some trouble optimizing a query and hoping someone can see something I'm missing.

Basically, I have a series of statements that follow the pattern below.

I create a table, then populate it using a UDF.

Then I create a partitioned copy of the table and copy the data into it (TEZ won't allow me to combine the UDF step and the partitioning step).

Here's my big issue: On my big insert statement

(INSERT INTO TABLE ${hiveconf:dbName}.${hiveconf:prod_table_name}_keyed)

I see all the heavy lifting happen (242 mappers, then 1 mapper, then 1 reducer).

But then the actual writing of the data to disk (the final 10 reducers for the 10 buckets) takes longer than everything else combined.

The reason I added the buckets is that the next step, where I partition the results, has a similar issue - a very slow, single reducer. So I was hoping that by forcing the data into 10 buckets, I could get 10 reducers and it would run faster.
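For context, here's a sketch of the session properties that typically tie the reducer count to the bucket count (assuming Hive 1.x on Tez; property names and defaults can vary by version):

-- Use one reducer per bucket when writing a bucketed table (Hive 2.x enforces this by default)
set hive.enforce.bucketing=true;

-- Bounds that otherwise limit reducer parallelism (example values)
set hive.exec.reducers.max=128;
set hive.exec.reducers.bytes.per.reducer=134217728;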

But regardless, I'm getting a horrible bottleneck at the end of the query that has to do with IO latency.

Can anyone suggest a way to improve this?

Thanks!

CREATE external TABLE ${hiveconf:dbName}.${hiveconf:prod_table_name}_keyed
(
   source_file_name string,
   ingest_timestamp timestamp,
   ...
   20 other columns
   ...
)
CLUSTERED BY (sample_point) INTO 10 BUCKETS
stored as parquet
LOCATION '${hiveconf:incremental_data_path}_keyed_temp'
TBLPROPERTIES ('PARQUET.COMPRESSION'='SNAPPY');


CREATE TEMPORARY FUNCTION func_1 as 'com.do.stuff' USING JAR '/home/hadoop/my-jar';
CREATE TEMPORARY FUNCTION func_2 as 'com.do.other.stuff' USING JAR '/home/hadoop/may-jar';


INSERT INTO TABLE ${hiveconf:dbName}.${hiveconf:prod_table_name}_keyed
  select
   x.source_file_name,
   x.ingest_timestamp,
   ...
   other columns
   ...
  from
  (
    select
      s.source_file_name,
      s.ingest_timestamp,
      ...
      other columns
      ...
      func_1(input_column) as unit_of_measure,
      func_2(input_columns2) as something_else
    from
      (
         select 
          er.source_file_name,
          er.ingest_timestamp,
          ...
          other columns
          ...
        from ${hiveconf:dbName}.${hiveconf:prod_table_name} er 
             inner join ${hiveconf:db_name}.${hiveconf:table_name} edd
            where <clause>
        distribute by something
         sort by something_else asc
      ) s
   ) x ;
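Side note: since the final write is spread across only 10 reducers (one per bucket), it's worth checking whether the rows are actually balanced across the bucket key. Here's a rough check that mimics how Hive assigns buckets (hash of the clustering column modulo the bucket count) - a sketch only, assuming sample_point is the clustering column as in the DDL above:

select pmod(hash(sample_point), 10) as bucket_id,
       count(*) as rows_in_bucket
from ${hiveconf:dbName}.${hiveconf:prod_table_name}_keyed
group by pmod(hash(sample_point), 10);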


CREATE external TABLE ${hiveconf:dbName}.${hiveconf:prod_table_name}_keyed_partitioned
 (
   source_file_name string,
   ingest_timestamp timestamp,
   ...
   other columns
   ...
)
PARTITIONED BY (sample_point_day date)
stored as parquet
LOCATION '${hiveconf:incremental_data_path}_keyed'
TBLPROPERTIES ('PARQUET.COMPRESSION'='SNAPPY');


insert into table ${hiveconf:dbName}.${hiveconf:prod_table_name}_keyed_partitioned
  PARTITION (sample_point_day)
  select 
   r.source_file_name,
   r.ingest_timestamp,
   r.other_columns
   from ${hiveconf:dbName}.${hiveconf:prod_table_name}_keyed r;
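For completeness, a dynamic-partition insert like the one above normally needs the session settings below, and the partition column has to be the last column in the SELECT list. This is a sketch; the properties are standard Hive, but the defaults differ by distribution:

set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
-- optional caps, in case sample_point_day spans many days
set hive.exec.max.dynamic.partitions=1000;
set hive.exec.max.dynamic.partitions.pernode=100;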
1 REPLY

Re: Help optimizing a query

Expert Contributor

Hi @Zack Riesland,

Indeed, increasing the number of buckets will increase the parallelism of the write to HDFS (and then to the disks).
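For instance, the bucket count on the keyed table could simply be raised (a sketch only: ALTER changes the table metadata, so it only affects data written afterwards, and the best count depends on your data volume and the number of disks per node):

ALTER TABLE ${hiveconf:dbName}.${hiveconf:prod_table_name}_keyed
  CLUSTERED BY (sample_point) INTO 32 BUCKETS;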
If I were you, I would have a look at the disk/IOPS usage: if you are loading a lot of data and have only one disk, it can take a long time. Generally it's recommended to have multiple disks per node to avoid IOPS congestion.
What is the exact query you are using to insert the data? Does it contain any casting? What is the size of your data?

Also, a good optimisation is to use ORC tables rather than Avro. For the loading phase it should not change much, but when you query your data it will make the difference.
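For example, the keyed table definition could keep everything else and just swap the storage clause (a sketch; 'orc.compress' is the standard ORC compression table property):

CREATE external TABLE ${hiveconf:dbName}.${hiveconf:prod_table_name}_keyed
(
   source_file_name string,
   ingest_timestamp timestamp,
   ...
)
CLUSTERED BY (sample_point) INTO 10 BUCKETS
stored as ORC
LOCATION '${hiveconf:incremental_data_path}_keyed_temp'
TBLPROPERTIES ('orc.compress'='SNAPPY');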

Michel