What is Apache Iceberg?

Apache Iceberg is an open table format developed by the open source community for high-performance analytics on petabyte-scale data sets. Iceberg is engine agnostic and supports SQL: Hive, Spark, Impala, and other engines can all work with the same Iceberg tables. It is quickly becoming the format of choice across the data industry for large data sets ranging from hundreds of terabytes to petabytes.

 

What are the benefits of Apache Iceberg?

By maintaining table metadata and state in manifest files and snapshots on the object store, Iceberg avoids expensive calls to a separate metastore such as HMS to retrieve partition information, which can make query planning up to 10x faster. Its main features include hidden partitioning, in-place partition evolution, time travel, out-of-the-box data compaction, and update, delete, and merge operations in the v2 format.
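
For example, once a table is on the v2 format, row-level changes become plain SQL. The following is a minimal sketch against a hypothetical iceberg_table; exact support for these statements varies by engine and version:

-- Switch an existing Iceberg table to the v2 spec
ALTER TABLE iceberg_table SET TBLPROPERTIES ('format-version'='2');

-- Row-level operations available with v2 tables
DELETE FROM iceberg_table WHERE id = 42;
UPDATE iceberg_table SET value = 'corrected' WHERE id = 7;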

 

What is Apache Ozone?

Apache Ozone is a highly scalable, highly available, and secure object store that can handle billions of files without sacrificing performance. It is a fully replicated system that uses Apache Ratis, an optimized Raft implementation, for consistency. Ozone supports both object store and file system semantics and is compatible with both cloud APIs and Hadoop FileSystem interfaces. It integrates with YARN, Hive, Impala, and Spark engines out of the box and is a popular choice for on-premises storage at large enterprises.

 

Large Data Challenges

 

Let us follow the data journey of Global FinTech, a global financial technology company. Alice, their Chief Data Officer, currently faces the following challenges, typical of any company with a large data footprint:

  • They are experiencing, or expect, data volumes growing to multiple petabytes across thousands of tables and billions of objects, with individual data set sizes ranging from hundreds of terabytes to petabytes and room for further growth every year.
  • They want to use their storage efficiently by avoiding data replication as much as possible while remaining highly available and tolerant to hardware failures.
  • They want their data to be encrypted and want to implement uniform data access models across all organizations to reduce management overhead.
  • They have multiple organizations and sub organizations within their enterprise that want to run Data Warehouse, Data Engineering, Machine Learning, ad hoc data exploration, and analytic workloads on these large data sets without stepping on each other’s toes.
  • They have long-running streaming applications that generate a large volume of small files, and they do not want these objects to overwhelm their storage system's metadata management service.
  • They want to migrate cloud native applications on-premises as part of a hybrid cloud strategy.

 

Alice is looking for a data platform solution that can accommodate all of Global FinTech's data and workloads; is secure, scalable, and performant; and will not blow their budget. She decided to deploy a CDP Private Cloud environment to take advantage of the following benefits:

 


Fig 1. CDP Private Cloud Architecture

 

  • Ozone Object Store as Storage – A highly scalable and performant data store that handles growing data volumes, as well as large volumes of small files, without saturating metadata space. It supports data encryption for security and Erasure Coding for storage efficiency and high availability, and it exposes both an Amazon S3-compatible API and the Hadoop-compatible file system protocol ofs.
  • Separation of storage and compute, with isolation of resources across compute clusters, enables multi-tenancy and makes it easier for platform admins to manage or avoid resource contention, manage workloads at scale, and implement effective chargeback strategies as necessary.
  • A unified security, governance, and metadata management layer that allows sharing of data contexts and data access models across multiple tenants and is easy to manage.
  • Replication Manager – For Backup and Disaster recovery of data.

 


Fig 2. Benefits of Iceberg on Ozone

 

With Iceberg on Ozone, Alice is building a modern data platform at Global FinTech that can handle large data and multi-tenant workload demands, is compatible with cloud APIs to enable a hybrid cloud strategy, and is future-proofed for the impending data explosion, all while providing high-performance exabyte-scale storage and petabyte-scale analytics.

 

Iceberg Operations on Ozone

 

Bob is the Head of Data Analytics at Global FinTech. Bob and his team want to leverage Apache Iceberg table format for data sets stored in Ozone to get performance benefits for their workloads. They can do so easily with no additional setup as demonstrated below.

 

Hive

 

Create Table

 

Ozone provides a Hadoop-compatible file system interface, ofs, which can be used to define table locations on Ozone.

 

CREATE TABLE iceberg_table(
id int,
value string)
STORED BY ICEBERG
LOCATION 'ofs://ozone1/tenant1/warehouse/iceberg/default/iceberg_table';

 

Cloud native applications or BI tools that use an S3 connector can continue to use the S3A protocol to define table locations on Ozone. Simply create a bucket under the S3 volume in Ozone, or link existing buckets to the S3 volume, and start creating tables.

 

# Link existing buckets to S3 volume
$ ozone sh bucket link /tenant1/warehouse /s3v/tenant1-warehouse

$ beeline
CREATE TABLE iceberg_s3table(
id int,
value string)
STORED BY ICEBERG
LOCATION 's3a://tenant1-warehouse/iceberg/default/iceberg_s3table';

 

Upgrade Existing Tables

 

Upgrade existing tables to Iceberg with a single ALTER TABLE statement. This is an instant operation that involves no expensive data rewrites.

 

ALTER TABLE existing_table
SET TBLPROPERTIES ('storage_handler'='org.apache.iceberg.mr.hive.HiveIcebergStorageHandler');
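
To verify the upgrade, inspect the table definition. A minimal sketch (the exact properties shown vary by Hive version):

-- Confirm the Iceberg storage handler and table type after the upgrade
DESCRIBE FORMATTED existing_table;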

 


Fig 3. Before upgrading the existing Parquet table

 


Fig 4. After upgrading existing table

 

Iceberg Metadata

 

Iceberg stores the following table metadata on the object store, which enables faster query planning, hidden partitioning, in-place partition evolution, time travel, and so on; a query sketch after the list below shows one way to inspect it.

 


Fig 5. Iceberg table directory/file structure on Ozone

 

  • Metadata file – Contains the table state, including the table schema, partitioning spec, and list of snapshots.
  • Manifest list – A snapshot of the table state is taken whenever the state is modified by a table operation. A manifest list is generated for each snapshot to keep track of the manifest files associated with that snapshot.
  • Manifest file – Contains the list of data files and the range of partition values in those data files. Data files tracked by multiple manifest files can be part of a single snapshot.
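
Engines can expose this metadata directly. As a sketch, assuming the Spark SQL session configured in the Spark section below, Iceberg's metadata tables can be queried like ordinary tables:

-- Snapshot lineage of the table (Iceberg's 'history' metadata table)
SELECT * FROM spark_catalog.default.iceberg_table.history;

-- Manifest files behind each snapshot, with data file counts
SELECT path, added_data_files_count
FROM spark_catalog.default.iceberg_table.manifests;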

 

Hidden Partitioning

 

After deploying their workloads, Bob & team want to implement a housekeeping job. They want to ingest logs from multiple web apps for compliance and forensic analysis. The log data contains a timestamp field, and since users query logs based on timestamps, it makes sense to partition the data by date to improve scan times.

With Hive partitioning, they would first have to extract the date from the timestamp values into a separate partition column, and users would also have to add an extra date filter to their queries to take advantage of partitioning.

With Iceberg hidden partitioning, a partition spec can be created by applying transformations to existing columns; the spec is hidden from users and maintained in metadata. This means the team no longer has to create a separate partition column, and users need not add non-data fields to their queries. Users do not even need to know about the underlying partitioning scheme: they can simply write queries with timestamp filters, without the extra date filter, and still get the performance advantage of partitioning.

 

CREATE TABLE iceberg_weblogs (
`time` timestamp,
app string,
request string,
response_code int)
PARTITIONED BY SPEC(day(`time`))
STORED BY ICEBERG
LOCATION 'ofs://ozone1/tenant1/warehouse/iceberg/default/iceberg_weblogs';

INSERT INTO iceberg_weblogs VALUES('2023-01-17 18:35:49', 'metastore', 'GET /metastore/table/default/sample_07 HTTP/1.1', 200);
INSERT INTO iceberg_weblogs VALUES('2023-01-17 18:50:12', 'search', 'GET /search/?collection=10000001 HTTP/1.1', 200);
INSERT INTO iceberg_weblogs VALUES('2023-01-17 19:10:30', 'metastore', 'GET /metastore/table/default/sample_07 HTTP/1.1', 200);

 


Fig 6. Partition folder structure in Iceberg table location

 

Partition evolution

 

Bob & team have been running the housekeeping job for a while now, and they notice that, along with a time interval, users also usually query for web logs from a specific application. They decide to add the app column as an additional partition for the web logs data.

With Hive partitioning, in-place partition evolution is not possible.

With Iceberg, this becomes as simple as modifying the existing partition spec with a single ALTER TABLE command and no table rewrites.

 

ALTER TABLE iceberg_weblogs
SET PARTITION SPEC(day(`time`), app);

INSERT INTO iceberg_weblogs VALUES('2023-01-18 18:35:49', 'metastore', 'GET /metastore/table/default/sample_07 HTTP/1.1', 200);
INSERT INTO iceberg_weblogs VALUES('2023-01-18 18:50:12', 'search', 'GET /search/?collection=10000001 HTTP/1.1', 200);
INSERT INTO iceberg_weblogs VALUES('2023-01-18 19:10:30', 'metastore', 'GET /metastore/table/default/sample_07 HTTP/1.1', 200);
INSERT INTO iceberg_weblogs VALUES('2023-01-19 18:35:49', 'metastore', 'GET /metastore/table/default/sample_07 HTTP/1.1', 200);
INSERT INTO iceberg_weblogs VALUES('2023-01-19 18:50:12', 'search', 'GET /search/?collection=10000001 HTTP/1.1', 200);
INSERT INTO iceberg_weblogs VALUES('2023-01-19 19:10:30', 'metastore', 'GET /metastore/table/default/sample_07 HTTP/1.1', 200);

 


Fig 7. Evolving partition folder structure in Iceberg table location

 

Time Travel

 

Bob & team have developed a downstream application, app_alert, that scans the web logs in batches of 15-minute intervals and sends alerts to app owners whenever it sees error codes, for fast resolution of potential issues. On 2023-01-19, the owners of the search app were notified of an issue by users. The app owners checked the web logs table and found error logs around 7:11 PM, which helped them resolve the issue. However, they were perplexed: even though they could see the error log in the table, the app_alert job had never alerted them about it.

Since Iceberg supports time travel, they decided to compare the latest results against the results of the SQL that the app_alert job had run when it processed the corresponding batch. And lo and behold, they found that the app_alert job had missed the error log because it arrived late for an unexpected reason.

 

-- On 2023-01-19 19:18:17 the following late arriving record was inserted
INSERT INTO iceberg_weblogs VALUES('2023-01-19 19:11:17', 'search', 'GET /search/?collection=10000001 HTTP/1.1', 302);

SELECT * FROM iceberg_weblogs FOR SYSTEM_TIME AS OF '2023-01-19 19:15:00'
WHERE `time` between '2023-01-19 19:00:00' and '2023-01-19 19:15:00' and app='search';

 


Fig 8. Search app web logs from Iceberg table as of '2023-01-19 19:15:00'

 

SELECT * FROM iceberg_weblogs
WHERE `time` between '2023-01-19 19:00:00' and '2023-01-19 19:15:00' and app='search';

 


Fig 9. Search app web logs from Iceberg table as of now

 

Iceberg supports traveling back in time not only by timestamp but also by snapshot version, using the FOR SYSTEM_VERSION AS OF <snapshot ID> clause.
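
A minimal sketch, using a hypothetical snapshot ID:

-- Query the table as of a specific snapshot version (the ID here is hypothetical)
SELECT * FROM iceberg_weblogs FOR SYSTEM_VERSION AS OF 1064347137204200399
WHERE app = 'search';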

 

Impala

 

Getting Started

 

The CREATE TABLE syntax in Impala is similar to Hive's. Tables created using Hive or Impala can be queried from either engine, since both use the same metastore.

 

CREATE EXTERNAL TABLE iceberg_impalaweblogs (
`time` timestamp,
app string,
request string,
response_code int)
PARTITIONED BY SPEC(day(`time`), app)
STORED BY ICEBERG
LOCATION 'ofs://ozone1/tenant1/warehouse/iceberg/default/iceberg_impalaweblogs';

 

Query Data

 

Querying data with Impala is straightforward. As an example, consider the Iceberg hidden partitioning feature, which lets users write logical queries without embellishing them with additional partition fields to take advantage of partitioning.

 

EXPLAIN SELECT * FROM iceberg_weblogs
WHERE `time` BETWEEN '2023-01-19 18:30:00' AND '2023-01-19 19:00:00' AND
app='metastore';

 


Fig 10. Explain plan for select on Iceberg table only scans 1 file

 

EXPLAIN SELECT * FROM hive_weblogs
WHERE `time` BETWEEN '2023-01-19 18:30:00' AND '2023-01-19 19:00:00' AND
`date`='2023-01-19' AND app='metastore';

 


Fig 11. Explain plan for select on Hive table scans 3 files

 

Spark

 

Getting Started

 

Setting up Data Engineering Spark jobs to use Iceberg is just as easy as with Hive or Impala. Spark can also use the Hive catalog to access tables created using Hive or Impala.

 

$ spark3-shell --packages org.apache.iceberg:iceberg-spark-runtime-3.2_2.12:1.1.0 \
  --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
  --conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog \
  --conf spark.sql.catalog.spark_catalog.type=hive \
  --conf spark.sql.iceberg.handle-timestamp-without-timezone=true \
  --conf spark.yarn.access.hadoopFileSystems=ofs://ozone1/

> val df = spark.sql("""CREATE TABLE spark_catalog.default.iceberg_sparkweblogs (
`time` timestamp,
app string,
request string,
response_code int)
USING iceberg
PARTITIONED BY (date(`time`), app)
LOCATION 'ofs://ozone1/tenant1/warehouse/iceberg/default/iceberg_sparkweblogs'""")

 

Query Data

 

Iceberg tables can be queried using Spark SQL or the DataFrame API.
Consider the time travel scenario: to handle late-arriving logs, the app_alert job can track snapshot IDs and read the data since the last snapshot, instead of since the last timestamp, so that all logs are processed.

 

val df = spark.
read.
format("iceberg").
option("start-snapshot-id", 1064347137204200399L).
option("end-snapshot-id", 6813555884815767956L).
load("iceberg_weblogs")

 


Fig 12. Read delta of Iceberg table between snapshots
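
The snapshot IDs passed to start-snapshot-id and end-snapshot-id above can be discovered through Iceberg's snapshots metadata table. A sketch in Spark SQL:

-- List snapshots with commit times to pick incremental read boundaries
SELECT committed_at, snapshot_id, operation
FROM spark_catalog.default.iceberg_weblogs.snapshots;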

 

Conclusion

 

This article has described how to create, upgrade, and use Iceberg tables on Apache Ozone storage in CDP Private Cloud with minimal setup, allowing you to reap the benefits of both a powerful storage system and an optimized table format to store huge data sets and run high-performance analytics at scale.

 

If you are interested in chatting about Apache Iceberg in CDP, let your account team know or contact us directly. As always, please provide your feedback in the comments section below. 

 

References:

 

[1] Apache Iceberg documentation: https://iceberg.apache.org/docs/latest/

[2] To learn how Apache Ozone powers data science, read the Apache Ozone Powers Data Science in CDP Private Cloud article.

[3] To learn more about CDP Private Cloud Data Services, read the CDP Private Cloud Data Services documentation.

[4] To learn more about Iceberg in the CDP Platform, read the Introducing Apache Iceberg in Cloudera Data Platform blog.
