Regular compaction is essential for Apache Iceberg tables to address the problem of small files, which accumulate as transaction volume increases and degrade query performance. Iceberg offers the rewrite_data_files Spark procedure as a mechanism for performing this compaction.

What is Cloudera Lakehouse Optimizer?

The Cloudera Lakehouse Optimizer (CLO) is an automated management service designed to optimize Apache Iceberg tables, improving performance and reducing operational costs. It allows administrators to define maintenance policies for tables. The system evaluates these policies by gathering statistics from Iceberg metadata and, based on the result, triggers maintenance actions such as data compaction, snapshot expiration, and the removal of orphan files. It offers execution flexibility, supporting both schedule-based execution for predictable workloads and event-based triggers for real-time data changes. Lakehouse Optimizer maintenance actions are executed on Spark.

This blog focuses on the key parameters within a Lakehouse Optimizer policy that should be configured to improve compaction performance.

The ClouderaAdaptive JEXL script is the default template provided by Lakehouse Optimizer. It uses Iceberg metadata to gather table statistics and determine whether an action should be initiated. To leverage this template for decision-making, a new policy must be created; arguments for each action can then be supplied using a policy constants JSON. For comprehensive details on policy configuration, please refer to this documentation.

File groups

Consider an Apache Iceberg table with the following specifications:

Number of Partitions: 10
Number of Files: 363,775
Average File Size: 0.5 MB
Total Size: 180.85 GB
Number of Rows: 1.4 billion

Calculated metrics:

Average Partition Size: approx. 18 GB per partition
Files per Partition: approx. 36,377 files per partition
File Size Observation: the small average file size (0.5 MB) is suboptimal and is expected to negatively impact query performance.

The rewrite data files action is executed on this table using a Spark environment configured with a 16 GB executor size. We will now analyze two distinct parameter configurations.

Case 1: High max-file-group-size-bytes

Parameters: max-concurrent-file-group-rewrites = 5 and max-file-group-size-bytes = 100 GB
File Grouping Behavior: Since the partition size (18 GB) is significantly smaller than the maximum group size (100 GB), Iceberg attempts to consolidate each entire partition into a single file group.
Concurrent Workload: Spark concurrently attempts to process 5 file groups (corresponding to 5 partitions).
Risk Analysis: Each file group contains approximately 36,000 small files. With 5 concurrent groups, Spark must manage the metadata, open file handles, and read buffers for approximately 180,000 files simultaneously.
Outcome: This configuration carries a high risk of executor Out-Of-Memory (OOM) errors. Despite a 16 GB executor RAM allocation, the overhead of tracking 36,000 file metadata objects per group can cause the JVM overhead memory to spike, frequently resulting in container termination by YARN for exceeding configured memory limits.

Case 2: Low max-file-group-size-bytes

Parameters: max-concurrent-file-group-rewrites = 5 and max-file-group-size-bytes = 1 GB
File Grouping Behavior: Iceberg splits each 18 GB partition into approximately 18 smaller file groups (each approximately 1 GB).
Concurrent Workload: Spark concurrently processes 5 of these smaller file groups.
File Count per Group: Each file group now contains only about 2,000 files (1 GB / 0.5 MB).
Total Concurrent Files: The total number of files managed simultaneously is reduced to about 10,000 (5 x 2,000).
Outcome: This approach is significantly safer. The memory footprint per concurrent task is minimized, substantially mitigating the risk of memory-related failures.

Hybrid Approach (Modified Case 2)

Case 2 is considerably safer; however, using only five concurrent 1 GB groups may under-utilize the cluster, resulting in slow execution. Conversely, when a file group is excessively large, the Spark job that rewrites it generates numerous tasks, which can consume a significant portion of cluster resources and delay other file groups waiting for compaction. It is therefore crucial to strike a balance between the size of the file groups and the number of file groups being rewritten concurrently. Based on the available YARN node instance types, adjust the spark.executor.cores setting; increasing it allows a greater number of concurrent Spark tasks to run within the cluster.

Recommendations for the table above:

Max File Group Size: 3 GB. Why? 1 GB is too small (too many commits); 3 GB creates groups of roughly 6,000 files, which is safe for a 16 GB executor yet still efficient.
Max Concurrent Groups: 5-10. Why? With smaller 3 GB groups, concurrency can be safely increased to keep all the Spark cores busy.
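At the level of the underlying Spark procedure, these recommendations map onto the rewrite_data_files options discussed above. The following is a minimal sketch only; the catalog name spark_catalog and the table default.orders are illustrative placeholders, and the values mirror the recommendations for the example table.

-- Illustrative only: balanced compaction settings for the example table
-- (3 GB file groups, up to 10 concurrent group rewrites, 512 MB target files).
CALL spark_catalog.system.rewrite_data_files(
  table => 'default.orders',
  options => map(
    'target-file-size-bytes', '536870912',
    'max-file-group-size-bytes', '3221225472',
    'max-concurrent-file-group-rewrites', '10'
  )
);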
Partial Progress

The rewrite_data_files action incorporates a feature called partial progress, which can be activated by setting the option partial-progress.enabled to true. When enabled, this feature commits changes incrementally rather than deferring the commit until the entire job is complete. The fundamental operational unit in a rewrite process is the file group, which comprises a set of small files intended for consolidation.

Standard Behaviour (Default)

Under the standard configuration, the job processes all file groups before executing a single, final, large-scale commit.
Advantages: Complete atomicity is maintained (the entire optimization succeeds or fails as a whole).
Disadvantages: This presents a significant risk for large tables. A failure occurring late in the process (e.g., in the 59th minute of a 60-minute job) results in zero progress being retained.

Partial Progress Behaviour

When partial progress is enabled, the job processes a defined batch of file groups, commits these changes immediately, and then proceeds to the next batch. partial-progress.max-commits determines the maximum number of commits the job will attempt; if there are 100 file groups and this is set to 10, each commit will contain roughly 10 file groups.
Advantages: Enhanced fault tolerance. Should the job terminate unexpectedly, only the batch currently being processed is lost.
Disadvantages: The table may end up only partially optimized. For instance, partitions A to M might be compacted, leaving partitions N to Z in an unoptimized state.

Filtering the data files

The rewrite data files action supports filtering data files based on an expression. For larger tables with many small files, compacting everything in one run can be time-consuming and may cause OOM errors. The work can instead be split into multiple maintenance tasks by using the filter expression efficiently.
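Both controls also exist on the underlying Spark procedure: partial progress is driven by the partial-progress.* options and the filter is supplied through the where argument. The sketch below is illustrative only; it assumes the same placeholder catalog spark_catalog and a table default.orders partitioned by day_of_week, matching the filter used in the CLO example that follows.

-- Illustrative only: incremental commits plus a filter that restricts the rewrite
-- to a slice of the table, leaving the remaining slices for later maintenance runs.
CALL spark_catalog.system.rewrite_data_files(
  table => 'default.orders',
  where => "day_of_week IN ('Mon', 'Fri')",
  options => map(
    'partial-progress.enabled', 'true',
    'partial-progress.max-commits', '10'
  )
);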
CLO supports setting the filter expression using table properties. For example, for an Iceberg table 'orders', the filter expression can be set with:

ALTER TABLE default.orders SET TBLPROPERTIES (
  'dlm.rewriteDataFiles.where' = "day_of_week IN ('Mon', 'Fri')"
);

Creating a Lakehouse Optimizer policy to perform rewrite data files

To perform the rewriting of data files on a table using the Lakehouse Optimizer, a new policy must be created. This is done by utilizing the default ClouderaAdaptive template. The Lakehouse Optimizer provides REST APIs for uploading policy definitions, with detailed information available here. The policy's action arguments are defined using a JSON structure. The goal here is to create a policy that:

Evaluates the need for data file rewriting every 30 minutes.
Triggers the rewrite data files action using the provided arguments.

It is worth noting that the ClouderaAdaptive template supports configuring the maximum rewrite file group concurrency and enabling partial progress.

{
"script": "dlm://tps:default/ClouderaAdaptive",
"cron" : "0 0/30 * * * ?",
"rewriteDataFiles" : {
"enabled": true,
"targetFileSize": 536870912,
"maxConcurrentRewriteFileGroups": 10,
"partialProgressEnabled": true,
"partialProgressMaxCommits": 10
}
}

To configure the file group size, which is not currently supported by the ClouderaAdaptive template, a new policy template must be created. A policy template is a JEXL file that contains the logic for generating maintenance actions. Comprehensive information regarding policy resources is accessible here. The template below simply generates a rewrite data files action builder based on the arguments supplied. It can be uploaded using the REST APIs, during which a URI must be provided; further specifics on the required Lakehouse Optimizer URI format are documented here.

let partialProgressEnabled = $constants.rewriteDataFiles.partialProgressEnabled ?? true;
let partialProgressMaxCommits = $constants.rewriteDataFiles.partialProgressMaxCommits ?? 10;
let maxConcurrentRewriteFileGroups = $constants.rewriteDataFiles.maxConcurrentRewriteFileGroups ?? 5;
// Default max file group size of 3GB
let maxFileGroupSize = $constants.rewriteDataFiles.maxFileGroupSize ?? 3221225472;
let targetFileSize = $constants.rewriteDataFiles.targetFileSize ?? 512*1024*1024;
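// Build and return the rewrite data files action using the values gathered above;
// the file group size is passed through as a raw Iceberg option ('max-file-group-size-bytes') below.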
return dlm:rewriteDataFiles($table)
.targetFileSize(targetFileSize)
.partialProgressEnabled(partialProgressEnabled)
.partialProgressMaxCommits(partialProgressMaxCommits)
.maxConcurrentRewriteFileGroups(maxConcurrentRewriteFileGroups)
.option('max-file-group-size-bytes', maxFileGroupSize)

You should create a new policy that utilizes this template and includes the necessary arguments for rewriting data files.

{
"script": "dlm://tps/hive/ClouderaAdaptive",
"cron" : "0 0/30 * * * ?",
"rewriteDataFiles" : {
"enabled": true,
"targetFileSize": 536870912,
"maxConcurrentRewriteFileGroups": 10,
"maxFileGroupSize" : 3221225472,
"partialProgressEnabled": true,
"partialProgressMaxCommits": 10
}
}

Conclusion

Compaction is a critical maintenance operation for optimizing Apache Iceberg query performance, particularly for tables where high transaction volumes produce many small files. As demonstrated, blindly increasing concurrency or file group size can lead to resource exhaustion, specifically Out-Of-Memory errors in Spark executors, when dealing with a large number of small files. The key to successful and robust compaction lies in a balanced configuration:

File Group Size (max-file-group-size-bytes): This must be sized carefully relative to the average file size so that the number of files processed concurrently per task remains manageable and does not overwhelm the executor's metadata and overhead memory.
Concurrency (max-concurrent-file-group-rewrites): Once the file group size is set to a safe limit, concurrency can be increased to maximize cluster utilization and reduce overall job duration.
Partial Progress: Enabling partial-progress.enabled is highly recommended for large tables, as it significantly improves fault tolerance by allowing incremental commits and ensures that work is not lost to a late-stage failure.
Filtering: For extremely large tables, filter expressions (dlm.rewriteDataFiles.where) can split the compaction effort into smaller, more manageable maintenance tasks, further enhancing reliability and control.

By thoughtfully applying these parameter optimizations, users can transform the rewrite_data_files procedure from a potential source of errors into an efficient, reliable, and performance-enhancing maintenance routine.

Reviewed by - Naveen Gangam, Adam Benlemlih, Dipankar Mazumdar, Henri Biestro, Prabhat Mishra