Introduction

Impala is a horizontally scalable, distributed database engine that is best suited for fast BI workloads. Apache Iceberg is an open table format that has brought new and exciting capabilities to the big data world with support for deletes, updates, time travel, partition management, and more. 

With Iceberg, users can insert data into tables that have advanced and evolving partition layouts. Data warehouse users often need to comply with regulations like GDPR and CCPA, requiring them to delete records from their warehouses. They might also need to update their data from time to time. These operations often require regular removal of old partitions. Impala and Iceberg can easily handle these use cases. Frequent write operations can potentially lead to performance issues, including the small file problem, or the overhead of handling lots of delete files. To resolve these issues, Impala can also optimize the table, ensuring subsequent queries execute efficiently.

In this blog post, we will showcase all of the write operations that Impala supports on Iceberg tables, including DELETE, UPDATE, and OPTIMIZE, and also share our roadmap for Impala.

CREATE TABLE

First, let’s create an Iceberg table. We can do it easily via the usual CREATE TABLE syntax:

 

CREATE TABLE ice_tbl (i INT, s STRING, ts TIMESTAMP)
STORED BY ICEBERG;

 

By default, Impala creates Iceberg tables with Parquet as the write format. Impala can also read Iceberg tables containing ORC and Avro data files, but it cannot write such files. We can migrate Hive external tables to Iceberg via the following command:

 

ALTER TABLE my_legacy_table CONVERT TO ICEBERG;

 

If you want to create a partitioned table that benefits from Iceberg’s advanced partitioning features, you can use the statement below:

 

CREATE TABLE ice_tbl (i INT, s STRING, ts TIMESTAMP)
PARTITIONED BY SPEC (truncate(1, s), day(ts))
STORED BY ICEBERG;

 

Impala supports all of the partition transforms that Iceberg provides. When you ingest data into partitioned Iceberg tables via Impala, it automatically writes the data records to the right partitions. For example:

  • /path/to/table/data/s_trunc=a/ts_day=2024-07-12/a.parquet
  • /path/to/table/data/s_trunc=a/ts_day=2024-07-13/b.parquet
  • /path/to/table/data/s_trunc=b/ts_day=2024-07-12/c.parquet
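
For illustration only, an insert like the one below (with hypothetical values) would produce the directory layout shown above: the truncate(1, s) transform keeps the first character of s, and day(ts) derives the date portion of ts.

INSERT INTO ice_tbl VALUES
  (1, 'apple',   '2024-07-12 10:00:00'),
  (2, 'apricot', '2024-07-13 11:30:00'),
  (3, 'banana',  '2024-07-12 12:45:00');
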

You can also change the partition layout of an existing table via an ALTER TABLE statement:

 

ALTER TABLE ice_tbl SET PARTITION SPEC (bucket(7, s), hour(ts));

 

The change in partition spec applies from that point forward. New files are written according to the new partition specification without requiring a rewrite of the older data, which improves operational efficiency and processing time, and minimizes disruption to existing workloads.
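
For illustration, files written after this change would land under directories that follow the new spec, for example (hypothetical paths):

  • /path/to/table/data/s_bucket=3/ts_hour=2024-07-14-09/d.parquet
  • /path/to/table/data/s_bucket=5/ts_hour=2024-07-14-10/e.parquet
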

INSERT INTO / OVERWRITE

The most basic DML command to add data to a table is the INSERT statement. INSERT INTO adds new data to the table, while INSERT OVERWRITE overwrites existing data. Both statements can have a VALUES clause, where users write literal expressions they want to add to the table:

 

INSERT INTO tbl VALUES (1, 'John', '1987-08-05'), (2, 'Jane', '2000-09-09');
INSERT OVERWRITE tbl VALUES (1, 'John', '1996-05-08'), (2, 'Jane', '1999-09-09');

 

Users can also use a query as the source for the INSERT statement:

 

INSERT INTO tbl SELECT * FROM source_tbl;
INSERT OVERWRITE tbl SELECT * FROM source_tbl;

 

INSERT OVERWRITE does a dynamic overwrite, which only replaces the partitions that are touched by the source statement:

 

INSERT OVERWRITE tbl SELECT year, id, location, measurement FROM source_tbl WHERE year = 2019;

 

If the target table is partitioned by year, then only the year=2019 partition is replaced. The other partitions remain intact. If a user needs to rewrite the whole table, they can execute a TRUNCATE statement, and then use an INSERT statement.
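
A minimal sketch of such a full rewrite, assuming the tbl and source_tbl tables from the earlier examples:

TRUNCATE TABLE tbl;
INSERT INTO tbl SELECT * FROM source_tbl;
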

LOAD DATA

The LOAD DATA statement can be used to load a single file or a directory into an existing Iceberg table. This operation works differently than it does for legacy Hive tables: the data is not simply copied into the table directory, but instead is inserted into the table via sequentially executed DDL/DML statements. This ensures that all of the data files in the Iceberg table comply with the Iceberg specification; if the source files do not, the load command fails.

Only files that meet the following requirements can be used as sources:

  • File format needs to be Parquet or ORC
  • File schema should match the table schema

 

LOAD DATA INPATH 'path/to/data/file/or/directory' INTO TABLE ice_target;

 

The PARTITION clause is NOT supported for Iceberg tables, as Impala automatically inserts data records into the appropriate partitions. The loaded files will be re-written as Parquet files.

DELETE

Iceberg V2 tables enable the efficient removal of records from large data sets by writing position delete files. Impala supports a merge-on-read strategy and is very efficient at merging position delete files with the actual data files during read operations. To create an Iceberg V2 table, the user needs to specify the format version in the table properties:

 

CREATE TABLE ice_tbl (i INT, s STRING, ts TIMESTAMP)
STORED BY ICEBERG
TBLPROPERTIES ('format-version'='2');

 

If you have an existing Iceberg V1 table, you can migrate it to the Iceberg v2 format:

 

ALTER TABLE ice_tbl SET TBLPROPERTIES('format-version'='2');

 

Note: New Impala versions create Iceberg V2 tables by default.

Users can issue SQL-compliant DELETE statements to remove the records that are not needed anymore:

 

DELETE FROM measurements
WHERE sensor_type = 13
AND station in ('Budapest', 'Paris', 'Zurich', 'Kuala Lumpur');

 

You can also use joins and subqueries in the DELETE statement:

 

DELETE FROM ice_t WHERE s = (SELECT min(s) FROM other_t);

DELETE FROM ice_t
WHERE c5 IN (SELECT DISTINCT other_col FROM other_table);

 

UPDATE

Iceberg V2 tables enable you to modify records. Impala supports the standard UPDATE DML statement, so you can simply use ANSI SQL update statements as shown below.

 

UPDATE measurements
SET measure = CAST(measure - 2 as decimal(5,2))
WHERE station IN ('Budapest', 'Paris', 'Zurich', 'Kuala Lumpur')
      AND sensor_type IN (7, 17, 77);

 

The UPDATE FROM statement is a non-ANSI SQL extension that can be used to update a target Iceberg table based on a source table or view that doesn't need to be an Iceberg table. You can think about it as a simplified version of the MERGE statement:

 

UPDATE ice_t
SET ice_t.k = o.k, ice_t.j = o.j
FROM ice_t, other_table o
WHERE ice_t.id = o.id;

 

DROP PARTITION

Sometimes, data needs to be removed efficiently on a large scale. For this purpose, DROP PARTITION provides an efficient solution by removing the specified partitions. Because the data is dropped along partition boundaries, the operation can be implemented without writing delete files.

The DROP PARTITION statement accepts arbitrary combinations of partition filtering expressions, for example: (identity(a) > 10 and truncate(5,b) = "longs").

This expression filters partitions where the value of a is greater than 10 and the truncated value of b equals "longs".

For transformed partition values, the filtering must contain the explicit partitioning expression. For identity partitioning, it can be omitted.

Consider the following partitions of the table example:

 

{"id":"3","a_trunc":"strin"}, {"id":"4","a_trunc":"longs"},
{"id":"4","a_trunc":"strin"}, {"id":"6","a_trunc":"longs"}

 

Let's execute the following DROP PARTITION statement:

 

ALTER TABLE example DROP PARTITION(id > 3 and truncate(5,a) = 'longs');

 

This statement removed 2 partition entries: {"id":"4","a_trunc":"longs"} and {"id":"6","a_trunc":"longs"}.
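
As a hedged suggestion (not part of the original walkthrough), you can inspect the outcome by listing the table's remaining data files:

SHOW FILES IN example;
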

OPTIMIZE

Tables that experience frequent ingestion, updates, and deletes accumulate lots of small data files or position delete files that result in slower scans. To improve read performance, tables need regular maintenance. The OPTIMIZE TABLE statement provides a convenient way to rewrite Iceberg tables to compact small data files and merge position delete files with the actual data files. The operation creates a new snapshot that contains the table content in a compact form. 

 

OPTIMIZE TABLE ice_table;

 

The user needs ‘ALL’ privileges on the table to execute the OPTIMIZE TABLE command.
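
For example, an administrator could grant the required privilege to a role; the role name maintenance_role below is hypothetical, and the exact setup depends on your authorization provider:

GRANT ALL ON TABLE ice_table TO ROLE maintenance_role;
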

Note that rewriting the entire table can take a while depending on the amount of data and the number of data and position delete files. This will be enhanced in the near future.

Future work

Impala’s Iceberg support is evolving rapidly. The MERGE statement will combine the capabilities of INSERT, UPDATE, and DELETE and provide a simple way to execute complex DML operations in a single snapshot. The MERGE statement’s working set is defined by a right join over the target and the source table, and multiple conditional cases can be defined for the result of the join. If the join condition matches, the target row can be updated or deleted; if it does not match, a new row can be inserted.

 

MERGE INTO target t USING source s ON t.id = s.id
WHEN MATCHED AND t.column1 = 'data' THEN UPDATE SET column1 = s.column1
WHEN NOT MATCHED THEN INSERT (id, column1) VALUES (s.id, s.column1)
WHEN MATCHED THEN DELETE;

 

[Figure: merge_tables.png]

Enhancements for OPTIMIZE

Frequently modified Iceberg tables need regular maintenance. The current implementation of the OPTIMIZE command scans and rewrites the entire table, which can take a long time in the case of large tables. Compacting only the small data files and the delete files makes table maintenance more cost-effective.

In an upcoming release, the OPTIMIZE TABLE command will be extended with an optional file size threshold parameter. Impala will scan the table and select only the small files and the delete files for compaction. This capability also implicitly means that only those files that were created since the last compaction are affected.

For example, to compact files smaller than 100MB, simply issue the following command:

 

OPTIMIZE TABLE ice_table (file_size_threshold_mb=100);

 

Puffin Stats

We are scoping support for both reading and writing Puffin stats. Puffin stat files, which are part of the Iceberg spec, enable stats captured using data sketches like Theta to be used for efficient query planning.

Limitations

Even though the new capabilities mentioned above are intended to satisfy many different uses and needs, there are some general limitations that are worth mentioning. Impala has not introduced new restrictions for Iceberg tables. Everything that worked on Hive tables will work on Iceberg tables. The only exception is that TEXT files - which are rarely used anyway - are not allowed in Iceberg tables.

This is a short list of limitations:

  • The table can contain any file formats that Impala can read, but ‘write.format.default’ has to be ‘parquet’.
  • Files will be written in Parquet.
  • Row-level modifications like DELETE and UPDATE via Impala always use merge-on-read and Impala can only write position delete files.

Summary

Apache Iceberg is a modern open table format that brings flexibility and simplified data management to large analytic tables. In this post, we presented how Impala makes using Iceberg easy. Now, in addition to performing fast analytic queries, you can also easily modify and maintain Iceberg tables with Impala. To try these commands for yourself, sign up for your free 5-day Cloudera Trial and head to the data lakehouse module.
