Engineering · 11 min read

Apache Iceberg Optimizations for Real-Time Data Loading and Processing

How we optimized Apache Iceberg equality-delete caching in Spark to cut allocated memory by ~1,222 GB and accelerate maintenance from hours to minutes.

Alphyn.ai Engineering Team


Our engineering team at Alphyn.ai leads Apache Spark development for the Alphyn Lakehouse platform. We solve non-trivial problems in Spark computing, and some of our solutions become part of the final product.

In this article we describe the problems you can encounter when implementing Upsert Streaming on Iceberg, what equality deletes are, why they increase the read load on Apache Iceberg tables, and how we optimized Apache Spark to reduce memory consumption and accelerate data reads.

Context

In recent years, streaming data processing architectures have gained momentum rapidly, especially where data freshness and timeliness are critical. We followed this path in our product. To reduce data delivery time to the warehouse, we implemented Flink Upsert Streaming on top of Apache Iceberg. Iceberg is a popular format for building Data Lakehouses that supports transactions and handles large data volumes efficiently. It has effectively become the standard, winning the "open formats battle."

One of Apache Iceberg's key advantages is support for UPDATE, DELETE, and MERGE operations without full table rewrites (in Merge-on-Read mode), starting from version 2 of the format specification.

In the early stages everything looked promising. Apache Flink streaming jobs wrote data to Iceberg tables reliably, and the data maintenance processes we had built in Spark on Iceberg's built-in procedures worked. On test environments with synthetic loads, the system behaved stably and predictably.

However, reality in the client's production environment turned out to be less rosy. Some time after launch we encountered a critical problem: tables became practically unreadable. Spark maintenance processes began failing en masse with OutOfMemory (OOM) errors. Attempts to find a "sufficient" amount of resources experimentally proved futile, because the required memory and execution time grew non-linearly and unpredictably. Moreover, the table's state kept deteriorating while maintenance was running. New commits from Flink accumulated ever more equality-delete files, which worsened fragmentation and slowed reads.

This article describes how we ran into the limitations of the equality-delete implementation in Apache Iceberg, and why timely maintenance of Iceberg tables is critically important. We explain the problem and describe our solution.

Why We Chose Spark for Maintenance Processes

We perform Iceberg maintenance on Spark for the following reasons:

  • The Iceberg implementation on Spark includes the full set of maintenance procedures and can perform all the necessary tasks, unlike the other engines available in our platform;
  • Only Spark has the functionality for fine-grained configuration of maintenance procedures;
  • Spark allows reliably constraining maintenance processes to allocated resources without impacting other processes.
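The Python helper below illustrates that configurability. It assembles a Spark SQL call to Iceberg's `rewrite_data_files` procedure. The catalog name, table name, filter and option values are hypothetical placeholders, not our production settings. The option keys are standard `rewrite_data_files` options, but check them against the documentation for the Iceberg version you use.

```python
from __future__ import annotations


def rewrite_data_files_sql(
    catalog: str,
    table: str,
    options: dict[str, str],
    where: str | None = None,
) -> str:
    """Build a Spark SQL CALL statement for Iceberg's rewrite_data_files procedure."""
    args = [f"table => '{table}'"]
    if where is not None:
        # Limit the rewrite to matching rows/partitions.
        args.append(f"where => '{where}'")
    if options:
        # Spark SQL map literal: map('key1', 'value1', 'key2', 'value2', ...)
        pairs = ", ".join(f"'{k}', '{v}'" for k, v in options.items())
        args.append(f"options => map({pairs})")
    return f"CALL {catalog}.system.rewrite_data_files({', '.join(args)})"


sql = rewrite_data_files_sql(
    catalog="my_catalog",  # hypothetical catalog name
    table="db.events",  # hypothetical table name
    options={
        # Upper bound on file groups rewritten in parallel.
        "max-concurrent-file-group-rewrites": "4",
        # Consider a data file for rewriting once it has at least this many deletes.
        "delete-file-threshold": "1",
        # Commit finished file groups incrementally instead of all at once.
        "partial-progress.enabled": "true",
    },
    where='event_date = "2024-01-01"',  # hypothetical partition filter
)
print(sql)
# With an active SparkSession this would be executed as: spark.sql(sql)
```

You can also restrict each call to a subset of partitions with `where`. This is one way to keep an individual maintenance run within a fixed resource budget.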

The data file maintenance process has three stages: reading, applying deletes and writing. The main problems in Spark (and in other engines as well) arise in the reading and delete-application stages. The rest of this article focuses on those two stages.

What Is an Equality Delete and Why It Occurs

Iceberg supports two row-level deletion methods:

  • Positional delete -- deletion of a specific row by position (file + index within file). Suitable when the application knows the physical location of the row;
  • Equality delete -- deletion by matching values of one or more columns (for example, customer_id = 123). Equality deletes are convenient to use during streaming writes: we don't know in which file the old versions of rows reside, so we simply record the values that need to be excluded.
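A toy Python model shows the difference between the two methods. The row layout and file names are hypothetical, and sequence numbers are ignored here for brevity. A positional delete identifies a row by its (file, position) pair. An equality delete identifies a row only by its key values.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Row:
    file: str  # data file holding the row
    pos: int  # row position within that file
    customer_id: int
    name: str


rows = [
    Row("data-1.parquet", 0, 123, "alice"),
    Row("data-1.parquet", 1, 456, "bob"),
    Row("data-2.parquet", 0, 789, "carol"),
]

# Positional delete: the writer must know exactly where the row is stored.
positional_deletes = {("data-1.parquet", 1)}

# Equality delete: the writer records only key values (customer_id = 123).
equality_columns = ("customer_id",)
equality_deletes = {(123,)}


def apply_positional(rows, deletes):
    return [r for r in rows if (r.file, r.pos) not in deletes]


def apply_equality(rows, deletes, columns):
    return [r for r in rows if tuple(getattr(r, c) for c in columns) not in deletes]


print([r.customer_id for r in apply_positional(rows, positional_deletes)])  # [123, 789]
print([r.customer_id for r in apply_equality(rows, equality_deletes, equality_columns)])  # [456, 789]
```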

When reading a table, Spark must merge data files with delete files to return the current data snapshot. For this purpose Iceberg reads equality delete files, builds sets (filters) of deleted values, and applies them to data files by partition.

Table Update Semantics: CoW vs MoR

  • Copy-on-Write (CoW): when a table is modified, the affected data files are rewritten into new files containing all current data, and the old files become obsolete.
  • Merge-on-Read (MoR): on additions, a new data file is created; on deletions/updates, delete files are created (positional or equality). The current state is obtained by "merging" data files with delete files during reading.

It is precisely in MoR mode that the number and size of delete files grow, increasing the load on the mechanisms that process them.
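A toy Python model makes the contrast concrete. It simplifies under three assumptions: files are in-memory lists, the key is the only equality column, and each upsert is one commit with its own sequence number. The CoW path rewrites the affected files and never produces delete files. The MoR path appends one equality-delete file per upsert, so delete files accumulate with every write.

```python
from dataclasses import dataclass, field


@dataclass
class Table:
    seq: int = 0  # last committed sequence number
    data_files: list = field(default_factory=list)  # (seq, [(key, value), ...])
    delete_files: list = field(default_factory=list)  # (seq, {deleted keys})

    def next_seq(self) -> int:
        self.seq += 1
        return self.seq


def cow_upsert(t: Table, key, value) -> None:
    """Copy-on-Write: rewrite every data file that contains the key."""
    s = t.next_seq()
    files = []
    for fseq, rows in t.data_files:
        if any(k == key for k, _ in rows):
            # The whole file is rewritten without the old version of the row.
            rows = [(k, v) for k, v in rows if k != key]
            fseq = s
        if rows:
            files.append((fseq, rows))
    t.data_files = files + [(s, [(key, value)])]


def mor_upsert(t: Table, key, value) -> None:
    """Merge-on-Read: append an equality delete and a new data file; rewrite nothing."""
    s = t.next_seq()
    t.delete_files.append((s, {key}))
    t.data_files.append((s, [(key, value)]))


def read(t: Table) -> dict:
    result = {}
    for fseq, rows in t.data_files:
        # Equality deletes apply only to data files with a lower sequence number.
        deleted = set().union(*(keys for dseq, keys in t.delete_files if dseq > fseq))
        for k, v in rows:
            if k not in deleted:
                result[k] = v
    return result


cow, mor = Table(), Table()
for table, upsert in ((cow, cow_upsert), (mor, mor_upsert)):
    upsert(table, 1, "a")
    upsert(table, 1, "b")
    upsert(table, 2, "c")

print(read(cow), len(cow.delete_files))  # {1: 'b', 2: 'c'} 0
print(read(mor), len(mor.delete_files))  # {1: 'b', 2: 'c'} 3
```

The MoR read stays correct because each equality delete applies only to data files with a lower sequence number. As a result, the new row written in the same commit survives.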

How Iceberg Caching Works on Spark

In Iceberg on Spark, deletes are applied at the moment each data file is read. To speed up reads of MoR tables, Iceberg implements a delete-file caching mechanism. It reduces the load on S3 and avoids fetching the same files repeatedly.

The process of applying equality-delete files has 4 abstraction levels:

  1. Compressed equality-delete files on S3;
  2. The contents of those files held in the in-memory cache;
  3. A set of unique values from the relevant equality-delete files for reading a specific data file;
  4. Filters for reading data files based on the constructed set of unique values.
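A few lines of Python can model the four levels. This is an illustrative sketch, not Iceberg's code: a dict stands in for S3, JSON compressed with `zlib` stands in for Parquet, and the file paths are hypothetical. Level 2 keeps each file's decoded rows, duplicates included. Levels 3 and 4 derive a deduplicated set and a predicate from those rows.

```python
from __future__ import annotations

import json
import zlib

# Level 1: compressed equality-delete files in object storage (a dict standing in for S3).
object_store = {
    "s3://bucket/tbl/p=1/eq-del-1": zlib.compress(json.dumps([[123], [456]]).encode()),
    "s3://bucket/tbl/p=1/eq-del-2": zlib.compress(json.dumps([[456], [789]]).encode()),
}

# Level 2: in-memory cache holding each file's decoded rows, duplicates included.
record_cache: dict[str, list[tuple]] = {}


def load_records(path: str) -> list[tuple]:
    if path not in record_cache:
        raw = zlib.decompress(object_store[path])  # stands in for an S3 GET + decode
        record_cache[path] = [tuple(r) for r in json.loads(raw)]
    return record_cache[path]


# Level 3: set of unique deleted key values relevant to one data file.
def build_value_set(paths: list[str]) -> set[tuple]:
    values: set[tuple] = set()
    for p in paths:
        values.update(load_records(p))
    return values


# Level 4: a predicate applied to each row while scanning the data file.
def build_filter(values: set[tuple]):
    return lambda key: key not in values


keep = build_filter(build_value_set(list(object_store)))
print([k for k in [(123,), (456,), (999,)] if keep(k)])  # [(999,)]

cached_rows = sum(len(v) for v in record_cache.values())
print(cached_rows, len(build_value_set(list(object_store))))  # 4 3
```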

The current architectural design requires keeping both the raw data and the data prepared as filters in memory. As a result, the more CPU cores and Spark tasks we have, the more memory we need, because each thread builds its own set of values and its own filter.

For example, if we are reading one large data-file.parquet that has multiple row groups, Spark can read each row group in parallel in separate tasks. These parallel threads will read the same equality-delete files and build approximately identical sets of values and filters -- meaning the data is duplicated in memory.
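The following toy Python model shows the duplication. Threads stand in for Spark tasks that read different row groups of the same data file. `DELETE_VALUES` is a hypothetical list of rows from the relevant equality-delete files. The model only shows how the number of in-memory copies scales with the number of tasks. It does not model Iceberg's actual memory layout.

```python
from concurrent.futures import ThreadPoolExecutor

# Hypothetical rows from the equality-delete files relevant to one data file.
DELETE_VALUES = [(i,) for i in range(10_000)]
TASKS = 4  # e.g. four row groups of the same data file read in parallel


def task_builds_own_set(_row_group: int) -> set:
    # Behaviour modelled here: every task builds a private copy.
    return set(DELETE_VALUES)


with ThreadPoolExecutor(max_workers=TASKS) as pool:
    private_sets = list(pool.map(task_builds_own_set, range(TASKS)))

# Alternative: one shared, immutable set handed to every task.
shared = frozenset(DELETE_VALUES)
with ThreadPoolExecutor(max_workers=TASKS) as pool:
    shared_refs = list(pool.map(lambda _: shared, range(TASKS)))

print(len({id(s) for s in private_sets}), sum(len(s) for s in private_sets))  # 4 40000
print(len({id(s) for s in shared_refs}), len(shared))  # 1 10000
```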

The Equality-Delete Creation and Application Process

When actions are performed on a table with Merge-on-Read semantics, each action creates its own data file and its own equality-delete file containing that action's data. Each action is also assigned the next SequenceNumber value. The final state is obtained by merging all of these files according to a specific rule.

To obtain the current data, each data file is read and every equality-delete file with a SequenceNumber greater than that of the data file is applied to it.
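A small Python function can express this selection rule. It is a simplified sketch that assumes every delete file shares the data file's partition spec; global equality deletes from unpartitioned specs are not modelled. The file names are hypothetical. Per the Iceberg specification, an equality delete applies to a data file only when the data file's sequence number is strictly lower.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class DataFile:
    path: str
    partition: str
    seq: int


@dataclass(frozen=True)
class EqDeleteFile:
    path: str
    partition: str
    seq: int


def applicable_eq_deletes(data_file: DataFile, deletes: list[EqDeleteFile]) -> list[str]:
    return [
        d.path
        for d in deletes
        if d.partition == data_file.partition  # same partition only
        and d.seq > data_file.seq  # committed after the data file
    ]


deletes = [
    EqDeleteFile("eq-1", "day=01", 1),
    EqDeleteFile("eq-2", "day=01", 2),
    EqDeleteFile("eq-3", "day=01", 3),
    EqDeleteFile("eq-4", "day=02", 3),
]
print(applicable_eq_deletes(DataFile("data-a", "day=01", 2), deletes))  # ['eq-3']
print(applicable_eq_deletes(DataFile("data-b", "day=01", 1), deletes))  # ['eq-2', 'eq-3']
```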

Data files and equality-delete files are stored in Parquet format, and per-file metadata (for example, column value ranges) is available for each of them. As a result, a data file does not need every equality-delete file with a higher SequenceNumber applied to it. Only the files that could contain values relevant to that specific data file are applied.
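Here is a minimal sketch of this kind of pruning. It assumes a single integer equality column and uses each file's minimum and maximum values; Iceberg tracks per-column lower and upper bounds for files in its manifests. The file names and bounds are hypothetical. A delete file whose value range does not intersect the data file's range cannot delete any of its rows, so it can be skipped.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class FileStats:
    path: str
    lower: int  # minimum value of the equality column in the file
    upper: int  # maximum value of the equality column in the file


def may_overlap(data: FileStats, delete: FileStats) -> bool:
    # Disjoint ranges mean no deleted value can match any row of the data file.
    return data.lower <= delete.upper and delete.lower <= data.upper


data_file = FileStats("data-1.parquet", lower=100, upper=199)
delete_files = [
    FileStats("eq-1.parquet", lower=150, upper=160),
    FileStats("eq-2.parquet", lower=200, upper=300),
    FileStats("eq-3.parquet", lower=10, upper=100),
]
print([d.path for d in delete_files if may_overlap(data_file, d)])  # ['eq-1.parquet', 'eq-3.parquet']
```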

Cache Lifecycle

In Apache Iceberg's table storage structure, the table data directory is divided into independent partition directories. When reading, each partition can be treated as an entity independent of other partitions. Equality-delete files in a specific partition are relevant only to that partition.

The Spark cache lifecycle, on the other hand, is scoped to the entire table rather than to individual partitions. The cache key is formed by concatenating the table name and the path to the equality-delete file, and the value is the file's contents in raw form. The file path reveals which partition the file belongs to, but the caching logic does not use this. The cache is invalidated only when processing of the entire table is complete. At that point, invalidate is called to find all keys belonging to the table by name and remove them from the cache.
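The following code models the table-scoped behaviour described above. It is a simplified Python illustration, not Iceberg's Java implementation. The key is a (table, path) tuple rather than a concatenated string, and the loader and paths are hypothetical. The model shows that delete files from already finished partitions stay in memory until the whole table is invalidated.

```python
from __future__ import annotations

from typing import Callable


class TableScopedDeleteCache:
    """Toy model: key = (table name, delete-file path); invalidation per whole table."""

    def __init__(self) -> None:
        self._entries: dict[tuple[str, str], list] = {}

    def get_or_load(self, table: str, path: str, loader: Callable[[str], list]) -> list:
        key = (table, path)
        if key not in self._entries:
            self._entries[key] = loader(path)
        return self._entries[key]

    def invalidate(self, table: str) -> None:
        # Called only once processing of the entire table is complete.
        for key in [k for k in self._entries if k[0] == table]:
            del self._entries[key]

    def __len__(self) -> int:
        return len(self._entries)


def load(path: str) -> list:
    return [path]  # hypothetical loader standing in for reading the file's rows


cache = TableScopedDeleteCache()
for partition in ("p=1", "p=2", "p=3"):
    cache.get_or_load("db.events", f"s3://bucket/events/{partition}/eq-del.parquet", load)
    # The partition is finished here, but its delete file stays cached.
print(len(cache))  # 3
cache.invalidate("db.events")
print(len(cache))  # 0
```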

Another characteristic of the cache lifecycle is the window between starting to read a file and the file's arrival in the cache. A file enters the cache only after it has been fully read from S3. Consequently, several Spark tasks may request the same file at the same time, and each of them loads its own copy into memory.
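One general technique for avoiding duplicate concurrent loads is a "single-flight" cache. The first request for a key performs the load, and concurrent requests for the same key wait for its result. The Python sketch below illustrates that technique under simplifying assumptions: there is no eviction, and failures are cached. It is not the Iceberg or Alphyn implementation. `slow_load` is a hypothetical stand-in for reading a file from S3.

```python
from __future__ import annotations

import threading
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable


class _Slot:
    def __init__(self) -> None:
        self.ready = threading.Event()
        self.value: Any = None
        self.error: Exception | None = None


class SingleFlightCache:
    """The first caller for a key loads it; concurrent callers wait for that result."""

    def __init__(self, loader: Callable[[str], Any]) -> None:
        self._loader = loader
        self._lock = threading.Lock()
        self._slots: dict[str, _Slot] = {}

    def get(self, key: str) -> Any:
        with self._lock:
            slot = self._slots.get(key)
            is_owner = slot is None
            if is_owner:
                slot = self._slots[key] = _Slot()
        if is_owner:
            try:
                slot.value = self._loader(key)
            except Exception as exc:  # failures are cached too (simplification)
                slot.error = exc
            finally:
                slot.ready.set()  # wake up every waiting caller
        slot.ready.wait()
        if slot.error is not None:
            raise slot.error
        return slot.value


loads: list[str] = []


def slow_load(path: str) -> str:
    loads.append(path)  # count real reads from storage
    time.sleep(0.05)  # simulate S3 latency
    return f"rows of {path}"


cache = SingleFlightCache(slow_load)
with ThreadPoolExecutor(max_workers=8) as pool:
    results = list(pool.map(cache.get, ["eq-del-1.parquet"] * 8))
print(len(loads), len(set(results)))  # 1 1
```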

Problems with Standard Equality Delete Caching

By analyzing how a large partitioned table was processed, we identified three main problems with the standard read implementation:

  1. Excessive memory consumption during parallel reads. When many tasks simultaneously request the same delete file, each task may load its own copy into memory. Simultaneously, each task builds its own value set to create a deletion filter. This leads to a multiplicative increase in memory usage on the executor and prevents using executors with high CPU counts.
  2. Data duplication and unnecessary CPU work. The cache stores the equality delete files themselves, which contain stale and duplicate values. Since files are stored in memory in their original uncompressed form, they take up more space. Each access to these equality delete files requires re-converting them into the structure Iceberg needs, creating unnecessary CPU overhead.
  3. Non-optimal cache lifecycle. Equality delete files belong to specific partitions. Nevertheless, the standard cache retains them for the entire duration of table processing, even after a specific partition has been fully processed. As a result, memory holds stale data.
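Our implementation is not published, so the following Python sketch is only a hypothetical illustration of two of the ideas named above: deduplication and a partition-scoped lifetime. It should not be read as that implementation. Each partition gets one immutable, deduplicated set shared by all tasks reading it. The set is dropped as soon as the last task releases it. The builder function and partition names are assumptions made for the example.

```python
from __future__ import annotations

import threading
from typing import Callable, Iterable


class PartitionDeleteSets:
    """Hypothetical: one deduplicated, immutable set of deleted keys per partition,
    shared by all tasks reading that partition and dropped when the last one finishes."""

    def __init__(self, build: Callable[[str], Iterable[tuple]]) -> None:
        self._build = build  # partition -> raw deleted keys (duplicates allowed)
        self._lock = threading.Lock()
        self._sets: dict[str, frozenset] = {}
        self._refs: dict[str, int] = {}

    def acquire(self, partition: str) -> frozenset:
        with self._lock:  # building under the lock keeps the sketch simple, not fast
            if partition not in self._sets:
                self._sets[partition] = frozenset(self._build(partition))
                self._refs[partition] = 0
            self._refs[partition] += 1
            return self._sets[partition]

    def release(self, partition: str) -> None:
        with self._lock:
            self._refs[partition] -= 1
            if self._refs[partition] == 0:
                # Partition fully processed: free its memory immediately.
                del self._sets[partition]
                del self._refs[partition]

    def cached_partitions(self) -> list[str]:
        with self._lock:
            return sorted(self._sets)


raw_keys = {"p=1": [(1,), (1,), (2,)], "p=2": [(3,)]}  # hypothetical raw delete rows
sets = PartitionDeleteSets(lambda p: raw_keys[p])

a = sets.acquire("p=1")  # task 1
b = sets.acquire("p=1")  # task 2 reuses the same object
print(a is b, len(a))  # True 2
sets.release("p=1")
sets.release("p=1")
print(sets.cached_partitions())  # []
```

This simple reference-counting scheme has a trade-off. If a partition's tasks do not overlap in time, its set is rebuilt for each group of tasks.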

Having analyzed these problems, our engineering team rewrote the standard mechanism for reading and applying equality-delete files in the Alphyn Lakehouse platform to eliminate the deficiencies described above.

Note: Our implementation of equality-delete file reading and application functionality is not published as open source and is delivered as part of the Managed Iceberg Tables functionality in the Alphyn Lakehouse platform. Therefore we cannot share the implementation source code, but we have outlined the problems and approaches to solving them.

Benchmark Data on the Test Stand

Test table and environment parameters:

  • Table file size: ~22 GB;
  • Compression: zstd, level 3;
  • Number of partitions: 6;
  • Each subsequent partition contains more deletions than the previous;
  • Spark 3.5.4, Iceberg 1.8.1.

Executor parameters (variants): CPU from 12 to 32 cores; memory (RAM) from 140 to 197 GB.

Experiment design: start from the optimal settings found for our optimized Alphyn Lakehouse Cache. Then determine the parameters with which the same maintenance command can run on the same table using the original open-source Apache Spark Cache.

Results (comparison):

| # | Cache | CPU | Memory | Time | Status |
|---|-------|-----|--------|------|--------|
| 1 | Alphyn Lakehouse Cache | 32 | 140 GB | 8 min | Success |
| 2 | Apache Spark Cache | 32 | 197 GB | ~2 hours | Error (OOM) |
| 3 | Apache Spark Cache | 20 | 197 GB | ~2 hours | Error (OOM) |
| 4 | Apache Spark Cache | 12 | 197 GB | ~1 hour | Success |
| 5 | Apache Spark Cache | 12 | 140 GB | ~3 hours | Error (OOM) |

Key findings from results:

  • Our optimized cache completed the task at high parallelism (32 cores) with reduced memory (140 GB) in 8 minutes, while the standard cache either failed with OOM or required heavy reduction in parallelism to complete successfully;
  • This confirms that optimization through deduplication and partition lifecycle management provides significant gains in both memory and execution time;
  • The metrics also show that the optimized cache can use more CPU because it runs more threads. CPU time usage improved as well: the gap between the most time-consuming method and the others narrowed.

Application in Production Environment

To verify the functionality in production, we created duplicate copies of three data streams. We then performed a full initial load into the target tables, so that performance and maintenance time could be compared on identical data.

We simultaneously reproduced the maintenance schedule covering typical modes (nightly and periodic runs with different loads) and demonstrated a sustained reduction in memory consumption and acceleration of processing while maintaining or increasing parallelism in a number of scenarios.

Test Table 1: Partitioned by Date

Daily new partition; partition volume grows throughout the day. Runs: nightly (full cycle), hourly.

| Parameter | Apache Spark Cache | Alphyn Lakehouse Cache |
|-----------|-------------------|----------------------|
| Executors | 5 | 1 |
| Cores (per executor) | 10 | 9 |
| Memory (per executor) | 100 GB | 71 GB |
| Peak actual consumption | 100 GB | 65 GB |
| Processing parallelism | 10 | 3 |
| Average execution time | ~60 min | ~45 min |
| Total cluster resources | 50 cores, 500 GB | 9 cores, 71 GB |

Test Table 2: Partitioned by Bucket

Equality deletes apply to files in each bucket -- maintenance requires rewriting a large number of files.

| Parameter | Apache Spark Cache | Alphyn Lakehouse Cache |
|-----------|-------------------|----------------------|
| Executors | 5 | 1 |
| Cores (per executor) | 5 | 27 |
| Memory (per executor) | 100 GB | 81 GB |
| Peak actual consumption | 90 GB | 70 GB |
| Processing parallelism | 10 | 7 |
| Average execution time | ~21 min | ~23 min |
| Total cluster resources | 25 cores, 500 GB | 27 cores, 81 GB |

Test Table 3: No Partitioning

Equality deletes potentially affect all data files, increasing the rewrite volume.

| Parameter | Apache Spark Cache | Alphyn Lakehouse Cache |
|-----------|-------------------|----------------------|
| Executors | 5 | 1 |
| Cores (per executor) | 5 | 32 |
| Memory (per executor) | 100 GB | 188 GB |
| Peak actual consumption | 95 GB | 170 GB |
| Processing parallelism | 10 | 8 |
| Average execution time | ~20 min | ~30 min |
| Total cluster resources | 25 cores, 500 GB | 32 cores, 188 GB |

Total Effect Across Three Tables

  • Potentially freed memory: ~1,222 GB;
  • Reduction in pod count: ~12 pods;
  • Released CPU cores: ~38 cores.

Conclusion

Equality delete is a practical and viable option for implementing deletes and updates in Apache Iceberg in streaming scenarios. Our optimization (deduplication, partition-aware lifecycle management, accelerated membership checks) resolves the key problems. It has also demonstrated practical benefits in production tests:

  • Significant reduction in total allocated memory (~1,222 GB potentially freed across three production tables);
  • Reduction in the number of pods in the Kubernetes cluster;
  • Ability to increase CPU density per executor;
  • Execution times preserved or improved in most scenarios.

Our solution is currently used in client production environments to maintain all Iceberg tables. It is delivered as part of the Alphyn Lakehouse data platform.