Created on 02-23-2023 11:51 AM - edited 03-07-2023 12:33 PM
What is Apache Iceberg?
Apache Iceberg is an open table format, developed by the open source community, for high-performance analytics on petabyte-scale data sets. Iceberg is engine-agnostic and supports SQL commands, so engines such as Hive, Spark, and Impala can all work with Iceberg tables. It is quickly becoming the format of choice across the data industry for data sets ranging from hundreds of terabytes to petabytes.
What are the benefits of Apache Iceberg?
Iceberg maintains table metadata and state in manifest files and snapshots on the object store. This lets it avoid expensive calls to a separate metastore such as the Hive Metastore (HMS) to retrieve partition information, which results in 10x faster query planning. Its main features include hidden partitioning, in-place partition evolution, time travel, out-of-the-box data compaction, and row-level UPDATE, DELETE, and MERGE operations in format v2.
What is Apache Ozone?
Apache Ozone is a highly scalable, highly available, and secure object store that can handle billions of files without sacrificing performance. Ozone is a fully replicated system that uses Apache Ratis, an optimized Raft implementation, for consistency. Ozone supports both object store and file system semantics and is compatible with both cloud (S3) APIs and Hadoop FileSystem interfaces. It integrates out of the box with YARN, Hive, Impala, and Spark, and is a popular choice for on-premises storage at large enterprises.
Let us follow the data journey of a global financial technology company, Global FinTech. Alice, its Chief Data Officer, is facing challenges typical of any company with a large data footprint.
Alice is looking for a data platform that can accommodate all of Global FinTech's data and workloads, and that is secure, scalable, and performant without blowing the budget. She decided to deploy a CDP Private Cloud environment to take advantage of the benefits it offers.
Fig 1. CDP Private Cloud Architecture
Fig 2. Benefits of Iceberg and Ozone
With Iceberg on Ozone, Alice is building a modern data platform at Global FinTech. The platform can handle large data volumes and multi-tenant workloads, and it is compatible with cloud APIs, so the company can pursue a hybrid cloud strategy. It is also future-proofed against the impending data explosion, all while providing high-performance, exabyte-scale storage and petabyte-scale analytics.
Bob is the Head of Data Analytics at Global FinTech. Bob and his team want to use the Apache Iceberg table format for data sets stored in Ozone to gain performance benefits for their workloads. As demonstrated below, they can do so easily with no additional setup.
Ozone provides ofs, a Hadoop-compatible file system interface, which can be used to define table locations on Ozone.
CREATE TABLE iceberg_table (
  id int,
  value string)
STORED BY ICEBERG
LOCATION 'ofs://ozone1/tenant1/warehouse/iceberg/default/iceberg_table';
Cloud-native applications or BI tools that use the S3 connector can continue to use the S3A protocol to define table locations on Ozone. Simply create a bucket under the S3 volume (s3v) in Ozone, or link existing buckets to the S3 volume, and start creating tables.
# Link an existing bucket to the S3 volume
$ ozone sh bucket link /tenant1/warehouse /s3v/tenant1-warehouse
$ beeline
CREATE TABLE iceberg_s3table (
  id int,
  value string)
STORED BY ICEBERG
LOCATION 's3a://tenant1-warehouse/iceberg/default/iceberg_s3table';
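The bucket link is what lets the same data be addressed through both schemes. As a rough illustration, the following Python sketch resolves an s3a:// table location to the equivalent ofs:// path. It assumes the ofs layout ofs://<om-service-id>/<volume>/<bucket>/<key> and the default s3v volume behind the S3 gateway. A hypothetical in-memory table stands in for the link created by `ozone sh bucket link`. Ozone performs this resolution itself, so the sketch only models the mapping.

```python
from urllib.parse import urlparse

S3_VOLUME = "s3v"  # volume whose buckets are exposed through Ozone's S3 gateway

# Hypothetical stand-in for the link created above:
# /s3v/tenant1-warehouse -> /tenant1/warehouse
BUCKET_LINKS = {(S3_VOLUME, "tenant1-warehouse"): ("tenant1", "warehouse")}


def parse_ofs(uri: str) -> tuple[str, str, str, str]:
    """Split ofs://<om-service-id>/<volume>/<bucket>/<key> into its parts."""
    parsed = urlparse(uri)
    if parsed.scheme != "ofs":
        raise ValueError(f"not an ofs URI: {uri}")
    volume, bucket, key = parsed.path.lstrip("/").split("/", 2)
    return parsed.netloc, volume, bucket, key


def s3a_to_ofs(uri: str, om_service: str) -> str:
    """Map s3a://<bucket>/<key> to the ofs path where the data actually lives."""
    parsed = urlparse(uri)
    if parsed.scheme != "s3a":
        raise ValueError(f"not an s3a URI: {uri}")
    bucket, key = parsed.netloc, parsed.path.lstrip("/")
    # Follow the link if this S3 bucket points at a bucket in another volume.
    volume, bucket = BUCKET_LINKS.get((S3_VOLUME, bucket), (S3_VOLUME, bucket))
    return f"ofs://{om_service}/{volume}/{bucket}/{key}"
```

Under these assumptions, `s3a_to_ofs("s3a://tenant1-warehouse/iceberg/default/iceberg_s3table", "ozone1")` points into the tenant1/warehouse bucket used by the ofs example, so both tables share one storage namespace.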
Existing tables can be upgraded to Iceberg with a single ALTER TABLE statement. This is an instant operation that requires no expensive data rewrites.
ALTER TABLE existing_table SET TBLPROPERTIES ('storage_handler'='org.apache.iceberg.mr.hive.HiveIcebergStorageHandler');
Fig 3. Existing Parquet table before the upgrade
Fig 4. Existing table after the upgrade
Iceberg stores the table metadata shown in Fig 5 (JSON table metadata files plus Avro manifest lists and manifests) on the object store. This metadata enables faster query planning, hidden partitioning, in-place partition evolution, time travel, and more.
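To make the planning benefit concrete, here is a minimal Python sketch of how a reader can choose data files from metadata alone, without listing directories or asking a metastore for partitions. It assumes a simplified, hypothetical model in which a snapshot points to manifests, and each manifest lists data files with the minimum and maximum value of the `time` column. Real Iceberg metadata carries many more fields. Its manifest list also summarizes partition values, so whole manifests can be skipped. The class and function names are illustrative, not Iceberg's API.

```python
from dataclasses import dataclass
from datetime import datetime


@dataclass(frozen=True)
class DataFile:
    path: str
    record_count: int
    lower: datetime  # smallest `time` value in the file (column stats)
    upper: datetime  # largest `time` value in the file (column stats)


@dataclass(frozen=True)
class Manifest:
    path: str
    files: tuple[DataFile, ...]


@dataclass(frozen=True)
class Snapshot:
    snapshot_id: int
    manifests: tuple[Manifest, ...]  # stands in for the snapshot's manifest list


def plan_files(snapshot: Snapshot, start: datetime, end: datetime) -> list[str]:
    """Return the data files whose [lower, upper] range can overlap [start, end]."""
    selected = []
    for manifest in snapshot.manifests:
        for f in manifest.files:
            # Skip the file when its whole value range lies outside the filter.
            if f.upper < start or f.lower > end:
                continue
            selected.append(f.path)
    return selected
```

The point of the design is that the stats are read from a handful of small metadata files, so the cost of planning grows with the amount of metadata rather than with the number of directories on the object store.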
Fig 5. Iceberg table directory/file structure on Ozone
After deploying their workloads, Bob and his team want to implement a housekeeping job that ingests logs from multiple web apps for compliance and forensic analysis. The log data contains a timestamp field. Since users query logs by timestamp, it makes sense to partition the data by date to improve scan times.
With Hive partitioning, they would first have to extract the date from the timestamp values into a separate partition column. Users would also have to add an extra date filter to their queries to benefit from partitioning.
With Iceberg hidden partitioning, a partition spec is created by applying transforms to existing columns. The partitioning is hidden from users and maintained in metadata. This means the team no longer has to create a separate partition column, and users need not add extra non-data fields to their queries. Users do not even need to know the underlying partitioning scheme. They can write queries with timestamp filters alone, without the extra date filter, and still get the performance advantage of partitioning.
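The Iceberg table spec defines the day transform as the number of days since 1970-01-01. Hidden partitioning works because the engine can project a filter on `time` onto that derived partition value. The minimal Python sketch below models this projection with hypothetical function names and a toy mapping from partition value to file names. In practice, Iceberg's scan planning does the projection inside the engine.

```python
from datetime import date, datetime

EPOCH = date(1970, 1, 1)


def day_transform(ts: datetime) -> int:
    """Day transform: whole days between 1970-01-01 and the timestamp's date."""
    return (ts.date() - EPOCH).days


def project_range(start: datetime, end: datetime) -> range:
    """Partition values that can hold rows with start <= time <= end."""
    return range(day_transform(start), day_transform(end) + 1)


def prune(partitions: dict[int, list[str]], start: datetime, end: datetime) -> list[str]:
    """Files to scan for a filter on `time` alone; the query never names the day."""
    wanted = project_range(start, end)
    return [f for day, files in sorted(partitions.items()) if day in wanted for f in files]
```

A query on `time` between 19:00 and 19:15 on 2023-01-19 would touch only that day's partition under this model. A range that crosses midnight would touch both days.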
CREATE TABLE iceberg_weblogs (
  `time` timestamp,
  app string,
  request string,
  response_code int)
PARTITIONED BY SPEC(day(`time`))
STORED BY ICEBERG
LOCATION 'ofs://ozone1/tenant1/warehouse/iceberg/default/iceberg_weblogs';
INSERT INTO iceberg_weblogs VALUES('2023-01-17 18:35:49', 'metastore', 'GET /metastore/table/default/sample_07 HTTP/1.1', 200);
INSERT INTO iceberg_weblogs VALUES('2023-01-17 18:50:12', 'search', 'GET /search/?collection=10000001 HTTP/1.1', 200);
INSERT INTO iceberg_weblogs VALUES('2023-01-17 19:10:30', 'metastore', 'GET /metastore/table/default/sample_07 HTTP/1.1', 200);
Fig 6. Partition folder structure in Iceberg table location
Bob and his team have been running the housekeeping job for a while now. They notice that, along with a time interval, users usually also query for web logs from a specific application, so they decide to add the app column as an additional partition field for the web logs data.
With Hive partitioning, in-place partition evolution is not possible. Changing the partition columns means recreating the table and rewriting its data.
With Iceberg, this is as simple as modifying the existing partition spec with a single ALTER TABLE command, with no table rewrite.
ALTER TABLE iceberg_weblogs SET PARTITION SPEC(day(`time`), app);
INSERT INTO iceberg_weblogs VALUES('2023-01-18 18:35:49', 'metastore', 'GET /metastore/table/default/sample_07 HTTP/1.1', 200);
INSERT INTO iceberg_weblogs VALUES('2023-01-18 18:50:12', 'search', 'GET /search/?collection=10000001 HTTP/1.1', 200);
INSERT INTO iceberg_weblogs VALUES('2023-01-18 19:10:30', 'metastore', 'GET /metastore/table/default/sample_07 HTTP/1.1', 200);
INSERT INTO iceberg_weblogs VALUES('2023-01-19 18:35:49', 'metastore', 'GET /metastore/table/default/sample_07 HTTP/1.1', 200);
INSERT INTO iceberg_weblogs VALUES('2023-01-19 18:50:12', 'search', 'GET /search/?collection=10000001 HTTP/1.1', 200);
INSERT INTO iceberg_weblogs VALUES('2023-01-19 19:10:30', 'metastore', 'GET /metastore/table/default/sample_07 HTTP/1.1', 200);
Fig 7. Evolving partition folder structure in Iceberg table location
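Iceberg can change the spec in place because the table metadata keeps every partition spec, and each manifest records the ID of the spec its data files were written with. Files written before the ALTER TABLE therefore keep their day-only layout, and a filter is evaluated against each file under that file's own spec. The following minimal Python sketch models this idea with hypothetical names. For simplicity it stores the spec ID on each file rather than on a manifest, and it uses calendar dates instead of Iceberg's integer day values for readability.

```python
from dataclasses import dataclass
from datetime import date


@dataclass(frozen=True)
class PartitionSpec:
    spec_id: int
    fields: tuple[str, ...]  # "time" stands for day(`time`); "app" is an identity field


@dataclass(frozen=True)
class DataFile:
    path: str
    spec_id: int     # spec the file was written with; it never changes
    partition: dict  # partition values for that spec, keyed by field name


# Both specs remain in table metadata after SET PARTITION SPEC.
SPECS = {
    0: PartitionSpec(0, ("time",)),        # PARTITIONED BY SPEC(day(`time`))
    1: PartitionSpec(1, ("time", "app")),  # SET PARTITION SPEC(day(`time`), app)
}


def may_contain(f: DataFile, day: date, app: str) -> bool:
    """Check a file against `day(time) = day AND app = app` using the file's own spec."""
    spec = SPECS[f.spec_id]
    if "time" in spec.fields and f.partition["time"] != day:
        return False
    # Files written under spec 0 have no app value, so they cannot be pruned on app.
    # The engine keeps such files and filters their rows instead.
    if "app" in spec.fields and f.partition["app"] != app:
        return False
    return True
```

Because old files are never rewritten, the evolution is a metadata-only change. Only data written after the ALTER TABLE benefits from the finer-grained layout.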
Bob and his team have developed a downstream application, app_alert, that scans the web logs in 15-minute batches and alerts app owners whenever it sees error codes, so potential issues can be resolved quickly. On 2023-01-19, users notified the owners of the search app about an issue. The app owners checked the web logs table and found error logs from around 7:11 PM, which helped them resolve the issue. However, they were perplexed: even though the error log was in the table, the app_alert job had not alerted them about it.
Since Iceberg supports time travel, they compared the results of the SQL query that the app_alert job ran when it processed the corresponding batch with the latest results. Lo and behold, they found that the app_alert job had missed the error log because it arrived late for an unexpected reason.
-- On 2023-01-19 19:18:17 the following late-arriving record was inserted
INSERT INTO iceberg_weblogs VALUES('2023-01-19 19:11:17', 'search', 'GET /search/?collection=10000001 HTTP/1.1', 302);
SELECT * FROM iceberg_weblogs FOR SYSTEM_TIME AS OF '2023-01-19 19:15:00'
WHERE `time` BETWEEN '2023-01-19 19:00:00' AND '2023-01-19 19:15:00' AND app='search';
Fig 8. Search app web logs from Iceberg table as of '2023-01-19 19:15:00'
SELECT * FROM iceberg_weblogs
WHERE `time` BETWEEN '2023-01-19 19:00:00' AND '2023-01-19 19:15:00' AND app='search';
Fig 9. Search app web logs from Iceberg table as of now
Iceberg supports time travel not only by timestamp but also by snapshot ID, using the FOR SYSTEM_VERSION AS OF <snapshot ID> clause.
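Both forms of time travel boil down to choosing a snapshot and reading the table as it was in that snapshot. The minimal Python sketch below models an append-only table as a list of snapshots. An as-of timestamp resolves to the latest snapshot committed at or before that time, and a snapshot ID is looked up directly. The class, functions, snapshot IDs, and the commit times of the earlier rows are hypothetical. Only the 19:18:17 commit of the late row comes from the example above.

```python
from bisect import bisect_right
from dataclasses import dataclass
from datetime import datetime

Row = tuple[str, str, int]  # (time, app, response_code)


@dataclass(frozen=True)
class Snapshot:
    snapshot_id: int
    committed_at: datetime
    added_rows: tuple[Row, ...]  # rows appended by this commit (append-only table)


def snapshot_as_of_time(history: list[Snapshot], as_of: datetime) -> Snapshot:
    """Latest snapshot committed at or before `as_of`; history is ordered by commit time."""
    i = bisect_right([s.committed_at for s in history], as_of)
    if i == 0:
        raise ValueError(f"no snapshot exists as of {as_of}")
    return history[i - 1]


def snapshot_by_id(history: list[Snapshot], snapshot_id: int) -> Snapshot:
    """Pick a snapshot directly by its ID, mirroring FOR SYSTEM_VERSION AS OF."""
    return next(s for s in history if s.snapshot_id == snapshot_id)


def read(history: list[Snapshot], snapshot: Snapshot) -> list[Row]:
    """Rows visible in `snapshot`: everything appended up to and including it."""
    end = history.index(snapshot)
    return [row for s in history[: end + 1] for row in s.added_rows]
```

Reading as of 19:15:00 resolves to the snapshot committed before the late insert, so the late row is absent. A read of the latest snapshot includes it, which matches the difference between Fig 8 and Fig 9.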
The CREATE TABLE syntax in Impala is similar to Hive's. Tables created with Hive or Impala can be queried from either engine because both use the same metastore.
CREATE EXTERNAL TABLE iceberg_impalaweblogs (
  `time` timestamp,
  app string,
  request string,
  response_code int)
PARTITIONED BY SPEC(day(`time`), app)
STORED BY ICEBERG
LOCATION 'ofs://ozone1/tenant1/warehouse/iceberg/default/iceberg_impalaweblogs';
Querying data with Impala is straightforward. As an example, consider the Iceberg hidden partitioning feature. It lets users write logical queries without embellishing them with extra partition fields to benefit from partitioning.
EXPLAIN SELECT * FROM iceberg_weblogs WHERE `time` BETWEEN '2023-01-19 18:30:00' AND '2023-01-19 19:00:00' AND app='metastore';
Fig 10. Explain plan for the SELECT on the Iceberg table: only 1 file is scanned
EXPLAIN SELECT * FROM hive_weblogs WHERE `time` BETWEEN '2023-01-19 18:30:00' AND '2023-01-19 19:00:00' AND `date`='2023-01-19' AND app='metastore';
Fig 11. Explain plan for the SELECT on the Hive table: 3 files are scanned
Setting up Data Engineering Spark jobs to use Iceberg is just as easy as using Hive or Impala. Spark can also use the Hive catalog to access tables created with Hive or Impala.
$ spark3-shell \
  --packages org.apache.iceberg:iceberg-spark-runtime-3.2_2.12:1.1.0 \
  --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
  --conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog \
  --conf spark.sql.catalog.spark_catalog.type=hive \
  --conf spark.sql.iceberg.handle-timestamp-without-timezone=true \
  --conf spark.yarn.access.hadoopFileSystems=ofs://ozone1/
scala> val df = spark.sql("""CREATE TABLE spark_catalog.default.iceberg_sparkweblogs (
  `time` timestamp,
  app string,
  request string,
  response_code int)
USING iceberg
PARTITIONED BY (date(`time`), app)
LOCATION 'ofs://ozone1/tenant1/warehouse/iceberg/default/iceberg_sparkweblogs'""")
Iceberg tables can be queried using Spark SQL or the DataFrame API.
Consider the time travel scenario again. To handle late-arriving logs, the app_alert job can track snapshot IDs and read the data added since the last processed snapshot, rather than the data since the last timestamp, so that every log is processed.
// Incremental read: start-snapshot-id is exclusive, end-snapshot-id is inclusive
val df = spark.read.
  format("iceberg").
  option("start-snapshot-id", 1064347137204200399L).
  option("end-snapshot-id", 6813555884815767956L).
  load("iceberg_weblogs")
Fig 12. Read delta of Iceberg table between snapshots
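The following minimal Python sketch shows why tracking snapshots catches the late record that timestamp windows miss. It models a hypothetical append-only table and compares two batch strategies. The first filters on event time within a window, using half-open windows so consecutive batches do not overlap. The second reads everything appended between two snapshots, mirroring Iceberg's incremental read semantics (start exclusive, end inclusive). The names, snapshot IDs, and the 19:10:40 commit time are illustrative assumptions. Iceberg's incremental reads only cover append snapshots, which the append-only model reflects.

```python
from dataclasses import dataclass
from datetime import datetime

Row = tuple[datetime, str, int]  # (time, app, response_code)


@dataclass(frozen=True)
class Snapshot:
    snapshot_id: int
    committed_at: datetime
    added_rows: tuple[Row, ...]  # rows appended by this commit


def window_batch(history: list[Snapshot], run_at: datetime,
                 start: datetime, end: datetime) -> list[Row]:
    """Timestamp batch: rows already committed at `run_at` with start <= time < end."""
    visible = [r for s in history if s.committed_at <= run_at for r in s.added_rows]
    return [r for r in visible if start <= r[0] < end]


def snapshot_batch(history: list[Snapshot], start_id: int, end_id: int) -> list[Row]:
    """Snapshot batch: rows appended after `start_id` (exclusive) up to `end_id` (inclusive)."""
    ids = [s.snapshot_id for s in history]
    first, last = ids.index(start_id) + 1, ids.index(end_id)
    return [r for s in history[first:last + 1] for r in s.added_rows]
```

With the late row committed at 19:18:17, the 19:00 to 19:15 window runs before the row exists, and the 19:15 to 19:30 window excludes it because its event time is 19:11:17. The snapshot-based batch picks it up regardless of its event time.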
This article described how to create, upgrade, and use Iceberg tables on Apache Ozone storage in CDP Private Cloud with minimal setup. This lets you reap the benefits of both a powerful storage system and an optimized table format to store huge data volumes and run high-performance analytics at scale.
If you are interested in chatting about Apache Iceberg in CDP, let your account team know or contact us directly. As always, please provide your feedback in the comments section below.
 If you are interested in learning more about how to use Apache Ozone to power data science, read the Apache Ozone Powers Data Science in CDP Private Cloud article.
 If you want to know more about CDP Private Cloud Data Services, read the CDP Private Cloud Data Services documentation.
 If you would like to know more about Iceberg in CDP Platform, you can read the Introducing Apache Iceberg in Cloudera Data Platform blog.