
Apache Iceberg Query Planning Explained: Predicate Pushdown, Manifest Filtering, and Data Skipping

Apache Iceberg query planning is the coordinator-bound bottleneck before any parallel scan starts. This guide covers predicate pushdown, manifest list pruning, file-level data skipping, and what data platform teams do to keep planning fast as tables grow.


Every query against an Apache Iceberg table starts the same way: query planning (scan planning) runs on the coordinator to decide which files might match the query. This phase happens before any parallel execution, before any data is read, before any row is returned. If Iceberg query planning is slow, the query is slow — regardless of how many Spark, Trino, or Flink executors are waiting on the other side.

Planning is where Iceberg's metadata architecture either pays off or falls apart. A well-maintained table with compact manifests, accurate column statistics, and a thoughtful sort order plans in milliseconds — the engine skips 95% of files and launches tasks against a small, precise set of data. A neglected table with thousands of fragmented manifests, missing stats, and randomly ordered files forces the planner to open and evaluate every manifest, consider every file, and hand off far more work than necessary to the execution layer.

This article is a technical deep-dive into how Iceberg query planning actually works — drawing on the official Iceberg performance documentation and the Java implementation internals. We walk through the three-level filtering hierarchy, how predicates bind by field ID, the key evaluator classes in the Java implementation, what makes planning slow, and how to keep it fast.

Why planning is a single-node problem

Distributed query engines — Spark, Trino, Flink — parallelize data processing across hundreds or thousands of tasks. But planning cannot be parallelized in the same way. The planner must read the table's metadata tree, apply the query's filter predicates against partition summaries and column statistics, determine which files survive filtering, and produce a set of scan tasks that the engine distributes to workers.

This entire process runs on a single node: the driver in Spark, the coordinator in Trino. The planner is CPU-bound and I/O-bound against the metadata layer (reading manifests from object storage or cache). If the table has 2,000 manifests, the planner must evaluate the partition summaries of all 2,000 and then open every manifest that survives. Modern Iceberg versions can parallelize manifest reading within the coordinator (via ManifestGroup.planWith(ExecutorService)), but the work is still bounded by that single node's CPU, memory, and network. If each manifest contains column statistics stored as maps that must be fully deserialized, the coordinator pays that cost regardless of how many executors are waiting downstream.

The implication is direct: planning time is bounded by metadata volume, not by cluster size. You cannot throw more executors at a slow planning phase. You can only reduce the amount of metadata the planner must evaluate and increase the effectiveness of each evaluation. That is what Iceberg's hierarchical filtering is designed to do — and that is what breaks down when metadata is not maintained.

The three-level filtering hierarchy

Iceberg's planning uses a hierarchical metadata structure that progressively narrows the set of files the engine must read. Each level acts as a filter, passing only surviving candidates to the next level. The three levels are: manifest list filtering, manifest file evaluation, and Parquet row group skipping.

Level 1: Manifest list → manifest filtering

The entry point to planning is the current snapshot. Every snapshot points to a manifest list — an Avro file that contains one entry per manifest. Each manifest list entry stores metadata about the manifest it references, including the partition spec ID and partition summaries: the min and max bounds of the partition values across all files tracked by that manifest.

The planner's first job is to determine which manifests are worth opening. It does this by projecting the query's filter predicate onto the partition spec and evaluating the projected expression against each manifest's partition summary. If a manifest's partition value range cannot intersect the query's filter, the entire manifest — and every data file it tracks — is skipped without being read.

For example, consider a table partitioned by day(event_timestamp) with a query filtering WHERE event_timestamp > '2026-06-01'. The planner projects this inclusively onto the partition spec: day_event_timestamp >= days_from_epoch('2026-06-01'). The projected operator is >= rather than > because timestamps later on June 1 still satisfy the original predicate, so that day's partition must be kept. A manifest whose partition summary shows max(day_event_timestamp) = days_from_epoch('2026-03-15'), meaning all its files are from March 15 or earlier, is eliminated immediately. The manifest file is never opened, and every file it contains is skipped.

This is the cheapest level of filtering. The manifest list is a single file, typically cached after the first read. Evaluating partition summaries is a comparison of integer or binary values against the projected predicate. For well-partitioned tables, this level alone can eliminate 80–90% of manifests.

In the Java implementation, this logic lives in ManifestGroup.java. The class builds a ManifestEvaluator for each partition spec (cached via Caffeine), projects the data filter inclusively onto the partition spec using Projections.inclusive(), and evaluates each manifest in the list. Manifests that fail evaluation are counted in skippedDataManifests scan metrics and never opened.
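The Level 1 check can be sketched in a few lines of Python. This is a toy model, not the Iceberg API: ManifestSummary, might_match_day_at_least, and prune are hypothetical names, and the three manifests are made-up data. It assumes the query WHERE event_timestamp > '2026-06-01' has already been projected inclusively to a day-level predicate that keeps the boundary day, since later timestamps on that day still match.

```python
from dataclasses import dataclass
from datetime import date

EPOCH = date(1970, 1, 1)


def days_from_epoch(d: date) -> int:
    """Day partition values are counted as days since 1970-01-01."""
    return (d - EPOCH).days


@dataclass(frozen=True)
class ManifestSummary:
    """Toy stand-in for one manifest list entry (hypothetical, not an Iceberg class)."""
    path: str
    min_day: int  # lowest day partition value among the manifest's files
    max_day: int  # highest day partition value among the manifest's files


def might_match_day_at_least(summary: ManifestSummary, first_day: int) -> bool:
    """Inclusive check for the projected predicate `day >= first_day`."""
    return summary.max_day >= first_day


def prune(manifests, first_day):
    """Return only the manifests that still have to be opened."""
    return [m for m in manifests if might_match_day_at_least(m, first_day)]


manifests = [
    ManifestSummary("m-q1.avro", days_from_epoch(date(2026, 1, 1)), days_from_epoch(date(2026, 3, 15))),
    ManifestSummary("m-q2.avro", days_from_epoch(date(2026, 4, 1)), days_from_epoch(date(2026, 6, 1))),
    ManifestSummary("m-q3.avro", days_from_epoch(date(2026, 6, 2)), days_from_epoch(date(2026, 7, 20))),
]

survivors = prune(manifests, days_from_epoch(date(2026, 6, 1)))
print([m.path for m in survivors])  # ['m-q2.avro', 'm-q3.avro']
```

The manifest whose range ends on March 15 is dropped from the summary alone. The one ending exactly on June 1 is kept, because the check is inclusive and cannot rule out a matching timestamp later that day.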

Level 2: Manifest files → data file filtering

Manifests that survive Level 1 are opened and their entries evaluated. Each manifest is an Avro file where every entry represents a data file. Each entry includes the file's specific partition tuple, and — critically — per-column statistics: value counts, null counts, NaN counts, and lower/upper bounds (min/max) for each column.

The planner applies two evaluators against each data file entry.

Partition evaluator. The query's partition filter is evaluated against the file's exact partition tuple using Evaluator.java. This is a tighter check than Level 1 — Level 1 tested against the manifest's partition range, while Level 2 tests against each file's specific partition value. A manifest might have a partition range of January through March, but a query filtering for February will skip January and March files at this level.

Metrics evaluator. The query's column filter is evaluated against the file's column-level statistics using InclusiveMetricsEvaluator. This is where data skipping happens — the evaluator tests whether the file's column value ranges could possibly contain rows matching the predicate. If the query filters WHERE user_id = 42 and a file's user_id upper bound is 30, the file provably cannot contain matching rows and is skipped.

The InclusiveMetricsEvaluator is inclusive: it returns ROWS_MIGHT_MATCH or ROWS_CANNOT_MATCH. It never claims rows definitely exist — only that they might. This is safe because a false positive (reading a file that has no matching rows) wastes I/O but returns correct results. A false negative (skipping a file that does have matching rows) would silently drop data. Inclusive evaluation guarantees correctness at the cost of occasional unnecessary reads.

Its counterpart, StrictMetricsEvaluator, works the opposite direction: it returns true only if all rows in the file must match the expression. This is used for optimization decisions like residual filter elimination — if all rows in a file match the predicate, the engine can skip applying the filter during scan. Strict evaluation is used to avoid redundant work, not to skip files.

For a concrete example: a query WHERE price > 100 evaluated against a file with min(price) = 150 and max(price) = 500. The inclusive evaluator says ROWS_MIGHT_MATCH (trivially true — the range overlaps). The strict evaluator says ROWS_MUST_MATCH — every row has price >= 150 > 100, so the engine can skip applying the price > 100 filter when reading this file.
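The difference between the two evaluators is easier to see side by side. The sketch below is a simplified Python model for the single predicate `col > literal`, not the Java classes themselves: ColumnStats, inclusive_gt, and strict_gt are hypothetical names, and the null handling is an assumption chosen to mirror the idea that a file containing nulls cannot have every row match a comparison.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class ColumnStats:
    """Toy per-file statistics for one column (hypothetical structure)."""
    lower: Optional[float]
    upper: Optional[float]
    null_count: int = 0


def inclusive_gt(stats: ColumnStats, literal: float) -> bool:
    """True = rows might match, False = rows cannot match, for `col > literal`."""
    if stats.upper is None:
        return True  # no bound available: the file must be kept
    return stats.upper > literal


def strict_gt(stats: ColumnStats, literal: float) -> bool:
    """True = every row must match, False = some row might not match."""
    if stats.lower is None or stats.null_count > 0:
        return False  # missing bound or nulls: cannot prove that all rows match
    return stats.lower > literal


files = {
    "f1.parquet": ColumnStats(lower=150.0, upper=500.0),
    "f2.parquet": ColumnStats(lower=50.0, upper=500.0),
    "f3.parquet": ColumnStats(lower=10.0, upper=90.0),
    "f4.parquet": ColumnStats(lower=150.0, upper=500.0, null_count=3),
}

results = {name: (inclusive_gt(s, 100), strict_gt(s, 100)) for name, s in files.items()}
for name, (might_match, must_match) in results.items():
    print(name, might_match, must_match)
```

Only f3 can be skipped (its upper bound is below 100), and only f1 lets the engine drop the residual filter. f2 and f4 must be read with the filter still applied.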

The effectiveness of Level 2 filtering depends entirely on the quality of column statistics. If a column's min/max bounds are very wide (the column is randomly distributed across files), every file's range overlaps with most predicates and nothing is skipped. If the column is sorted or clustered, min/max bounds are tight and most files can be eliminated. This is why sort order is the single most impactful lever for data skipping — and why LakeOps, a control plane for Apache Iceberg lakehouses, captures cross-engine query telemetry to determine optimal sort orders per table, keeping column statistics tight and planning times low without manual intervention.

Level 3: Parquet row groups → within-file skipping

Files that survive Level 2 are handed to the execution layer as scan tasks. But filtering does not stop at the file boundary. Parquet files are internally organized into row groups, and each row group carries its own column statistics in the Parquet footer — min/max values, null counts, row counts per column chunk.

When the engine opens a surviving file, it reads the Parquet footer and evaluates the same predicate against each row group's statistics. Row groups that cannot contain matching rows are skipped without reading their column data. This is transparent to Iceberg's planning layer — it is handled by the Parquet reader — but it provides a meaningful third tier of data elimination.

Row group skipping is most effective on sorted files. If a file's rows are sorted by event_date, each row group covers a narrow date range. A query filtering on a specific date skips all but one or two row groups. On an unsorted file, every row group's date range spans the full range of the file, and nothing is skipped.
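A small simulation makes the sorted versus unsorted contrast concrete. This is an illustrative sketch with synthetic data, not a Parquet reader: the integers stand in for event_date values, the fixed multiplier 37 is just a deterministic way to scatter them, and row_group_bounds and groups_to_read are hypothetical helpers.

```python
def row_group_bounds(values, rows_per_group):
    """Cut a column into row groups and record each group's (min, max)."""
    groups = [values[i:i + rows_per_group] for i in range(0, len(values), rows_per_group)]
    return [(min(g), max(g)) for g in groups]


def groups_to_read(bounds, target):
    """Count the row groups whose range could contain the target value."""
    return sum(1 for lo, hi in bounds if lo <= target <= hi)


# 1,000 distinct "dates", written once in sorted order and once scattered.
sorted_days = list(range(1000))
scattered_days = [(i * 37) % 1000 for i in range(1000)]  # a permutation of 0..999

sorted_reads = groups_to_read(row_group_bounds(sorted_days, 100), 420)
scattered_reads = groups_to_read(row_group_bounds(scattered_days, 100), 420)

print(sorted_reads, scattered_reads)  # 1 10
```

Both files hold exactly the same values. In the sorted file a single row group covers 400 to 499, while in the scattered file every row group spans nearly the full range, so the statistics cannot exclude any of them.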

The three levels compound. A table with 1,000 manifests, 100,000 data files, and 10 row groups per file has 1,000,000 potential row groups. Level 1 might eliminate 800 manifests (skipping 80,000 files). Level 2 might eliminate 15,000 of the remaining 20,000 files. Level 3 might skip 90% of row groups in the surviving 5,000 files. The query ends up reading 5,000 row groups out of 1,000,000 — a 200x reduction before any row-level filtering.
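The arithmetic behind that figure, written out as a short Python calculation. It assumes files are spread evenly across manifests (100 files each), which the example implies but does not state.

```python
manifests = 1_000
data_files = 100_000
row_groups_per_file = 10

files_per_manifest = data_files // manifests        # 100
total_row_groups = data_files * row_groups_per_file  # 1,000,000

# Level 1: 800 manifests pruned from the manifest list.
files_after_level1 = (manifests - 800) * files_per_manifest  # 20,000

# Level 2: 15,000 of the remaining files pruned by column statistics.
files_after_level2 = files_after_level1 - 15_000              # 5,000

# Level 3: 90% of row groups in the surviving files skipped.
row_groups_read = files_after_level2 * row_groups_per_file // 10  # 5,000

reduction = total_row_groups // row_groups_read
print(row_groups_read, reduction)  # 5000 200
```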

How predicates bind by field ID

One of the most important design decisions in Iceberg's planning is how predicates connect to data. When Spark pushes a predicate like WHERE user_id = 42 into the Iceberg scan builder, the predicate arrives as an unbound expression — it has a column name (user_id), a literal value (42), and an operation (equal). It does not yet know which column in the data files corresponds to user_id.

Binding resolves this. The Iceberg library looks up user_id in the current table schema, finds its field ID (say, field ID 7), and binds the expression to that ID. From this point forward, the predicate operates by field ID, not by name. When the planner evaluates column statistics in a manifest entry, it looks up the lower and upper bounds for field ID 7 — not for a column named user_id.

This design is what makes schema evolution work with query planning. Consider a table where user_id was originally added as field ID 7. A year later, someone renames it to customer_id. In an older system that binds predicates by column name, the rename would break filtering — old data files have a column named user_id, the query references customer_id, and the predicate cannot be applied against historical files.

In Iceberg, the rename only changes the name recorded for field ID 7 in the table schema. Field ID 7 remains field ID 7 in every data file ever written. The predicate bound to field ID 7 evaluates correctly against both old and new files, regardless of what the column was called when the file was written. Column statistics in manifests are also keyed by field ID, so manifest-level filtering works across schema versions without any special handling.

The same mechanism handles column drops and re-adds. If user_id (field ID 7) is dropped and a new user_id is added later, the new column gets a new field ID (say, field ID 23). Old predicates bound to field ID 7 will not accidentally match the new column, and new predicates bound to field ID 23 will correctly identify that old data files have no statistics for field ID 23 — because the column did not exist when those files were written.

When a predicate references a column that does not exist in a data file (because it was added via schema evolution after the file was written), the evaluator handles this gracefully. The InclusiveMetricsEvaluator treats the absence of statistics for a field as "cannot determine" and returns ROWS_MIGHT_MATCH — the file is read (safe, not optimal). The StrictMetricsEvaluator returns ROWS_MIGHT_NOT_MATCH — the file is not guaranteed to fully satisfy the predicate. For IS NULL checks on columns added after schema evolution, all values in old files are null by definition, enabling additional optimization opportunities.
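Binding by field ID can be modeled with plain dictionaries. The sketch below is a toy, not Iceberg's expression API: BoundEquals, bind_equals, and inclusive_eval are hypothetical names, schemas are reduced to a name-to-ID mapping, and loyalty_tier is an invented column used to show the missing-statistics case.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class BoundEquals:
    """An equality predicate after binding: it refers to a field ID, not a name."""
    field_id: int
    literal: int


def bind_equals(schema: dict, column_name: str, literal: int) -> BoundEquals:
    """Resolve the column name against the current schema's name -> field ID mapping."""
    return BoundEquals(schema[column_name], literal)


def inclusive_eval(pred: BoundEquals, lower: dict, upper: dict) -> bool:
    """True = rows might match. Statistics are keyed by field ID."""
    if pred.field_id not in lower or pred.field_id not in upper:
        return True  # no statistics for this field in this file: keep the file
    return lower[pred.field_id] <= pred.literal <= upper[pred.field_id]


schema_v1 = {"user_id": 7}
schema_v2 = {"customer_id": 7, "loyalty_tier": 23}  # field 7 renamed, field 23 added

# Statistics as stored per data file, keyed by field ID.
old_file = {"lower": {7: 1}, "upper": {7: 30}}             # written under schema_v1
new_file = {"lower": {7: 40, 23: 5}, "upper": {7: 99, 23: 9}}

before_rename = bind_equals(schema_v1, "user_id", 42)
after_rename = bind_equals(schema_v2, "customer_id", 42)
print(before_rename == after_rename)  # True: both bind to field ID 7

print(inclusive_eval(after_rename, old_file["lower"], old_file["upper"]))  # False: 42 > 30
print(inclusive_eval(after_rename, new_file["lower"], new_file["upper"]))  # True

tier = bind_equals(schema_v2, "loyalty_tier", 7)
print(inclusive_eval(tier, old_file["lower"], old_file["upper"]))  # True: no stats for field 23
```

The renamed column still prunes the old file, because the lookup never touches the name. The old file is kept for the loyalty_tier predicate only because it carries no statistics for field 23, which is the safe default.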

Inside ManifestGroup.java — the core planning class

The entry point for Iceberg's Java planning implementation is ManifestGroup.java. This class coordinates the entire filtering pipeline: it takes the list of data manifests and delete manifests from the current snapshot's manifest list, applies the three-level filtering, and produces the stream of data file entries that survive.

The flow works as follows. The snapshot is read and its manifest list parsed into two collections: data manifests and delete manifests. These are passed to ManifestGroup, which stores them alongside the partition filter, data filter, schema, partition specs, and configuration for residual evaluation and column projection.

When the scan is executed, ManifestGroup enters its entries() method. The first action is building a ManifestEvaluator cache — a LoadingCache keyed by partition spec ID. Each evaluator is constructed by combining the partition filter with the data filter projected inclusively onto the partition spec (Projections.inclusive(spec, caseSensitive).project(dataFilter)). The inclusive projection converts column predicates into partition predicates that are safe for pruning — a predicate that might match in column space is conservatively widened in partition space.

The evaluator cache ensures that for tables with multiple partition specs (common after partition evolution), each spec's evaluator is built only once. The planner then filters the list of data manifests: each manifest is evaluated by looking up its partition spec ID in the cache and calling eval(manifest). Manifests that return false are skipped and counted in skippedDataManifests metrics.

Surviving manifests are opened by ManifestReader, which iterates over entries and applies two per-entry evaluators: the Evaluator (for partition tuple matching) and the InclusiveMetricsEvaluator (for column statistics filtering). Entries that survive both evaluators are emitted as candidate data files. When an ExecutorService is configured via planWith(), manifest reading and entry evaluation are dispatched to a worker pool using ParallelIterable, allowing multiple manifests to be processed concurrently on the coordinator. This reduces wall-clock planning time on tables with many manifests but does not change the fundamental constraint: all manifest data must be read and evaluated on a single node.
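The overall shape of that flow (one cached evaluator per partition spec, manifest pruning, then concurrent reading of the survivors) can be sketched in Python. This is a loose analogy under stated assumptions, not a port of ManifestGroup: the in-memory MANIFESTS tuples, the fixed query constants, and the function names are all hypothetical, with functools.lru_cache standing in for the evaluator cache and a thread pool standing in for the planning executor.

```python
from concurrent.futures import ThreadPoolExecutor
from functools import lru_cache

# Hypothetical in-memory manifests:
# (spec_id, min_day, max_day, [(file_name, day, user_id_lower, user_id_upper), ...])
MANIFESTS = [
    (0, 10, 19, [("a.parquet", 12, 1, 30), ("b.parquet", 18, 35, 60)]),
    (0, 20, 29, [("c.parquet", 21, 1, 50), ("d.parquet", 27, 41, 90)]),
    (1, 30, 39, [("e.parquet", 33, 40, 45)]),
]

FIRST_DAY, USER_ID = 20, 42  # toy query: day >= 20 AND user_id = 42


@lru_cache(maxsize=None)
def manifest_evaluator(spec_id: int):
    """Built once per partition spec ID and reused for every manifest with that spec."""
    return lambda min_day, max_day: max_day >= FIRST_DAY


def read_manifest(manifest):
    """Open one surviving manifest and apply the partition and metrics checks per entry."""
    _, _, _, entries = manifest
    return [
        name
        for name, day, lower, upper in entries
        if day >= FIRST_DAY and lower <= USER_ID <= upper
    ]


def plan_files(manifests, workers: int = 4):
    survivors = [m for m in manifests if manifest_evaluator(m[0])(m[1], m[2])]
    # Manifests are read concurrently, but all of this still runs on one node.
    with ThreadPoolExecutor(max_workers=workers) as pool:
        per_manifest = list(pool.map(read_manifest, survivors))
    return [name for names in per_manifest for name in names]


planned = plan_files(MANIFESTS)
print(planned)  # ['c.parquet', 'd.parquet', 'e.parquet']
print(manifest_evaluator.cache_info().misses)  # 2 evaluators built for 3 manifests
```

The first manifest is rejected from its summary and never read. The thread pool shortens wall-clock time when many manifests survive, but every entry is still evaluated inside the same process.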

Delete manifests go through a parallel filtering pipeline. The same partition and metrics evaluation applies — delete manifests that cannot affect surviving data files are skipped. The surviving delete files are indexed by partition, and during final task assembly, each data file is paired with its applicable delete files (equality deletes and position deletes) based on partition matching and sequence number comparison.

The final step is planTasks in TableScanUtil, which takes the surviving file entries and produces scan tasks. This is where split sizing comes in — controlled by read.split.target-size and read.split.planning-lookback. Files may be split into multiple tasks if they exceed the target size, or combined into a single task if they are small enough. The output is a list of serializable scan tasks that the engine distributes to workers.
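To illustrate the split-and-combine idea, here is a deliberately simplified Python sketch. It is not the algorithm in TableScanUtil: it cuts files at fixed byte offsets and packs splits with a greedy next-fit pass, and it ignores lookback, per-file open cost, and row group boundaries. split_file and combine are hypothetical helpers, and the file sizes are invented.

```python
MB = 1024 * 1024


def split_file(path: str, size: int, target: int):
    """Cut one file into (path, offset, length) splits no larger than target."""
    return [(path, offset, min(target, size - offset)) for offset in range(0, size, target)]


def combine(splits, target: int):
    """Greedy next-fit: start a new task when the next split would overflow the target."""
    tasks, current, current_size = [], [], 0
    for split in splits:
        length = split[2]
        if current and current_size + length > target:
            tasks.append(current)
            current, current_size = [], 0
        current.append(split)
        current_size += length
    if current:
        tasks.append(current)
    return tasks


files = [
    ("big.parquet", 300 * MB),
    ("s1.parquet", 40 * MB),
    ("s2.parquet", 50 * MB),
    ("s3.parquet", 60 * MB),
]
target = 128 * MB  # plays the role of the split target size

splits = [s for path, size in files for s in split_file(path, size, target)]
tasks = combine(splits, target)

task_sizes_mb = [sum(length for _, _, length in task) // MB for task in tasks]
print(len(splits), len(tasks), task_sizes_mb)  # 6 4 [128, 128, 84, 110]
```

The large file is cut into three splits, and its 44 MB tail shares a task with a small file, so four files become four reasonably balanced tasks instead of six uneven ones.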

What makes planning slow

Planning performance degrades from three broad causes: metadata volume, statistics quality, and schema width. The five patterns below are the common ways those causes show up in practice.

Too many manifests. Every manifest in the current snapshot must at minimum have its partition summary evaluated. Manifests that survive Level 1 filtering must be opened, deserialized from Avro, and have every entry evaluated. A table with 5,000 manifests — common for streaming tables with frequent commits and no manifest rewriting — forces the planner to evaluate 5,000 partition summaries and potentially open hundreds of manifest files. Each manifest open is an object storage GET request (or cache lookup), and each entry evaluation is CPU work. Manifest count is the single most common cause of slow planning.

Missing or degraded column statistics. When column statistics are absent — because the writer did not produce them, or because the manifest format does not support them for certain types — the InclusiveMetricsEvaluator cannot eliminate files. Every file becomes a potential match, and the planner passes all of them to the execution layer. This effectively disables Level 2 filtering. The problem compounds with Puffin statistics: without table-level NDV estimates, the engine cannot make informed decisions about join ordering or split sizing, falling back to heuristic defaults.

Wide schemas. Iceberg's V1–V3 manifest format stores column statistics as maps: a map from field ID to lower bound, another map from field ID to upper bound, and so on. To read the statistics for a single column, the Avro reader must deserialize the entire map, including statistics for every other column. For a table with 500 columns, reading one column's bounds requires reading all 500 columns' bounds. This overhead scales linearly with schema width. This is a known pain point: map-typed fields do not lend themselves to columnar projection, so there is no pushdown within the metadata file itself. Iceberg V4 proposes replacing these maps with columnar structures to enable selective statistics reading.
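A back-of-the-envelope cost model shows why schema width matters. This is only a count of decoded values under simplifying assumptions (one bound per column per entry, no fixed per-entry overhead), and both function names are hypothetical.

```python
def values_decoded_map_layout(num_entries: int, num_columns: int) -> int:
    """Map layout: each entry's whole bounds map is decoded before one key is looked up."""
    return num_entries * num_columns


def values_decoded_columnar_layout(num_entries: int, columns_needed: int) -> int:
    """Columnar layout: only the columns the predicate references are decoded."""
    return num_entries * columns_needed


entries, columns = 10_000, 500

map_cost = values_decoded_map_layout(entries, columns)
columnar_cost = values_decoded_columnar_layout(entries, columns_needed=1)

print(map_cost, columnar_cost, map_cost // columnar_cost)  # 5000000 10000 500
```

For a single-column predicate the gap equals the schema width, which is why the cost is negligible on a 10-column table and dominant on a 500-column one.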

Partition spec evolution. Tables that have changed their partition spec over time accumulate manifests with different spec IDs. The planner must maintain a separate evaluator per spec and correctly project predicates for each. If the projection for an old spec cannot express the current query's predicate, manifests under that spec cannot be filtered and must all be opened. This is a subtle but real source of planning slowdown in long-lived tables.

Excessive file count. Even with perfect statistics, evaluating 500,000 data file entries in manifests is expensive CPU work. Each entry requires deserializing its statistics, evaluating the partition, and evaluating column bounds. Reducing total file count through compaction directly reduces the number of entries the planner must process.

How to keep planning fast

Planning performance is a maintenance problem, not a tuning problem. The settings that control planning behavior (read.split.target-size, read.split.planning-lookback) affect task assembly, not the filtering phases that dominate planning time. Keeping planning fast requires keeping the metadata structures that planning depends on in good shape.

Rewrite manifests regularly

Manifest rewriting consolidates fragmented manifests into fewer, larger manifests aligned with partition boundaries. This directly reduces the number of manifests the planner must evaluate and open. A table that goes from 3,000 manifests to 30 after rewriting leaves the planner with 100x fewer partition summaries to evaluate and far fewer manifest files to open.

```sql
CALL catalog.system.rewrite_manifests(
  table => 'db.events'
);
```

Manifest rewriting should follow compaction — compaction changes the file layout, so rewriting manifests before compaction produces manifests that immediately become stale. The correct sequence is: compact data files, then rewrite manifests against the new layout. For details on the full maintenance pipeline and sequencing, see Apache Iceberg Table Health and Maintenance.

Compact data files

Compaction reduces total file count, which reduces the number of manifest entries the planner must evaluate. But the bigger impact is on Level 2 filtering: sort compaction clusters data by query-relevant columns, producing tight min/max bounds per file. This transforms Level 2 filtering from a no-op (wide bounds, everything overlaps) to a powerful eliminator (tight bounds, most files skipped).

```sql
CALL catalog.system.rewrite_data_files(
  table => 'db.events',
  strategy => 'sort',
  sort_order => 'region ASC NULLS LAST, event_date ASC NULLS LAST'
);
```

Choosing the right sort order is critical. Sorting by a column that no query filters on wastes the effort — min/max bounds are tight but never tested. Sorting by the most frequently filtered column maximizes data skipping. For multi-column sort orders, column ordering matters: the first sort key gets the tightest clustering, subsequent keys provide diminishing benefit. For a deeper discussion of partitioning strategies that complement sort order, see Iceberg Partitioning Best Practices.
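The effect of sort key ordering on file-level bounds can be shown with synthetic data. The sketch below is illustrative only: it uses 10 regions and 100 dates in every combination, writes 100 rows per file, and counts how many files a min/max check would keep. file_bounds and files_to_read are hypothetical helpers, and real tables will be far less regular.

```python
from itertools import product

# Every (region, event_date) combination once: 10 regions x 100 dates = 1,000 rows.
rows = list(product(range(10), range(100)))


def file_bounds(sorted_rows, rows_per_file=100):
    """Cut sorted rows into files and record min/max per column for each file."""
    files = [sorted_rows[i:i + rows_per_file] for i in range(0, len(sorted_rows), rows_per_file)]
    return [
        {
            "region": (min(r for r, _ in f), max(r for r, _ in f)),
            "event_date": (min(d for _, d in f), max(d for _, d in f)),
        }
        for f in files
    ]


def files_to_read(bounds, column, value):
    """Count the files whose range for `column` could contain `value`."""
    return sum(1 for b in bounds if b[column][0] <= value <= b[column][1])


region_first = file_bounds(sorted(rows, key=lambda r: (r[0], r[1])))
date_first = file_bounds(sorted(rows, key=lambda r: (r[1], r[0])))

print(files_to_read(region_first, "region", 3), files_to_read(region_first, "event_date", 42))  # 1 10
print(files_to_read(date_first, "region", 3), files_to_read(date_first, "event_date", 42))      # 10 1
```

In this setup the leading sort key prunes 9 of 10 files, while the trailing key prunes none at the file level. Its ordering only helps within a file, at the row group level. Which key should lead therefore depends on which column queries filter on most often.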

LakeOps Layout Simulations
Layout Simulations replay production SQL against candidate sort strategies before data is rewritten — ensuring the first compaction pass produces the optimal layout for actual query patterns.

Maintain column statistics

Column statistics — the min/max bounds, null counts, and value counts stored per data file in manifests — are the input to Level 2 filtering. Without them, the InclusiveMetricsEvaluator has nothing to evaluate and defaults to ROWS_MIGHT_MATCH for every file. Most modern writers (Spark, Trino, Flink) produce these statistics by default, but some configurations disable them for write performance, and certain column types (complex types, very long strings) may not have meaningful bounds.

Beyond file-level statistics, table-level statistics in Puffin files feed cost-based optimization. NDV estimates from Theta sketches inform join ordering, aggregation strategy, and split sizing. These statistics describe the snapshot they were computed for, so collecting and refreshing them, especially after compaction rewrites the table's files into a new snapshot, is part of keeping planning effective.

```sql
ANALYZE TABLE catalog.db.events
  COMPUTE STATISTICS FOR COLUMNS user_id, region, event_type;
```

Use bloom filters for point lookups

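For equality predicates (WHERE order_id = '12345') on high-cardinality columns that are not sorted, column min/max bounds provide weak filtering because the value falls within the range of almost every file. Bloom filters provide much stronger pruning for point lookups. In current Iceberg releases these are Parquet bloom filters written into the data files themselves, enabled per column through table properties such as write.parquet.bloom-filter-enabled.column.order_id. A bloom filter can definitively say a value is not in a row group, enabling the reader to skip data that min/max bounds would have kept.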

Bloom filter support varies by engine and Iceberg version. When available, it adds a fourth filtering layer that is particularly valuable for high-cardinality equality lookups on unsorted columns.
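A toy bloom filter shows why it succeeds where min/max bounds fail. This is a minimal teaching sketch, not the Parquet bloom filter format: the BloomFilter class, its sizing, and the SHA-256 based hashing are assumptions chosen for determinism and readability, and the order IDs are invented.

```python
import hashlib


class BloomFilter:
    """Minimal bloom filter: no false negatives, occasional false positives."""

    def __init__(self, num_bits: int = 1 << 16, num_hashes: int = 3):
        self.num_bits = num_bits
        self.num_hashes = num_hashes
        self.bits = bytearray(num_bits // 8)

    def _positions(self, value: str):
        # Derive several bit positions per value by salting the hash input.
        for seed in range(self.num_hashes):
            digest = hashlib.sha256(f"{seed}:{value}".encode()).digest()
            yield int.from_bytes(digest[:8], "big") % self.num_bits

    def add(self, value: str) -> None:
        for pos in self._positions(value):
            self.bits[pos // 8] |= 1 << (pos % 8)

    def might_contain(self, value: str) -> bool:
        return all(self.bits[pos // 8] & (1 << (pos % 8)) for pos in self._positions(value))


# One file holding only even order IDs between 10000 and 10198.
order_ids = [str(n) for n in range(10000, 10200, 2)]
lower, upper = min(order_ids), max(order_ids)

bloom = BloomFilter()
for order_id in order_ids:
    bloom.add(order_id)

lookup = "10101"  # odd, so not in the file, yet inside the min/max range
print(lower <= lookup <= upper)     # True: min/max bounds cannot exclude the file
print(bloom.might_contain(lookup))  # almost certainly False with this sizing
print(all(bloom.might_contain(o) for o in order_ids))  # True: never a false negative
```

The filter can answer "definitely not here" for most absent values, and "maybe" otherwise. A "maybe" for an absent value only costs an unnecessary read, which is the same safety property the inclusive metrics check relies on.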

How LakeOps keeps planning fast

LakeOps is a dedicated lakehouse control plane for Apache Iceberg that continuously optimizes the metadata structures that query planning depends on. Rather than reacting to slow queries, LakeOps monitors the structural health signals that predict planning degradation — manifest count, manifest fragmentation, missing statistics, sort order alignment — and acts before performance declines.

LakeOps Tables — health classification
Lake-wide table health classification: every table scored as Critical, Warning, or Healthy based on file counts, manifest count, snapshot depth, and delete file ratios — the structural signals that directly affect planning performance.

Manifest optimization. LakeOps monitors manifest count and manifest-to-file ratio per table. When manifests fragment — from frequent streaming commits, compaction passes, or partition evolution — LakeOps rewrites them automatically, consolidating small manifests into larger, partition-aligned manifests. This directly reduces the number of manifests the planner must evaluate at Level 1 and the number of manifest files it must open for Level 2. The result is planning time that stays constant as the table grows, rather than scaling linearly with commit history.

Statistics generation. LakeOps computes and refreshes Puffin statistics — NDV sketches via Apache DataSketches — as part of the maintenance pipeline, sequenced after compaction. This ensures that statistics reflect the current, compacted layout rather than a stale pre-compaction state. Engines consuming these statistics get accurate cardinality estimates for join ordering, aggregation strategy selection, and dynamic filter optimization. Without a control plane managing statistics lifecycle, teams either never collect statistics (degraded CBO) or collect them once and let them go stale (misleading CBO — arguably worse).

Query-aware sort optimization. Sort order is the most impactful lever for Level 2 data skipping, but choosing the right sort order requires knowing which columns production queries actually filter on. LakeOps captures cross-engine query telemetry — which columns appear in WHERE, JOIN, and GROUP BY clauses across Spark, Trino, Flink, Snowflake, and Athena — and uses this to determine optimal sort orders per table. When access patterns shift, sort orders adapt. The effect on planning is indirect but powerful: sorted data produces tight column statistics, tight statistics mean the InclusiveMetricsEvaluator can eliminate more files, and fewer surviving files mean faster task assembly.

LakeOps Partitions
Partition-level detail: file counts, data sizes, and structural health per partition — revealing which partitions are well-maintained and which are dragging planning performance.

Compaction for planning. Compaction is typically discussed in terms of read performance: fewer files, less I/O overhead. But compaction also directly improves planning. Every data file that compaction eliminates is one fewer manifest entry the planner must evaluate. A table compacted from 50,000 files to 5,000 files leaves the planner with 10x fewer entries to evaluate at Level 2, independent of any statistics improvement. LakeOps triggers compaction based on structural signals (file count thresholds, average file size, delete file accumulation) rather than fixed schedules, so tables are compacted when they need it rather than on a cron timer that may be too early or too late.

Observability. LakeOps surfaces the planning-related health signals — manifest count, average file size, statistics freshness, sort order alignment — as Insights with severity classifications (Critical, High, Warning, Low). When a table's manifest count crosses a threshold that will impact planning, the system raises a proactive alert rather than waiting for a user to notice query slowdowns. This observability layer connects directly to lakehouse performance and observability solutions.

LakeOps Insights — proactive alerts for table health
Severity-ranked Insights: CRITICAL for partition file issues, HIGH for excessive manifests, WARNING for partition skew and small files.
LakeOps platform walkthrough — catalog connection, table health, and autonomous optimization.

Planning in Iceberg V4

Several changes planned for Iceberg V4 directly address current planning limitations.

Columnar statistics in manifests. The current map-based storage of column statistics (field ID → bound) forces readers to deserialize all columns' statistics to read one. V4 proposes storing statistics in a columnar layout within manifests, enabling the planner to read only the statistics it needs. For wide tables with hundreds of columns, this change alone could reduce manifest parsing time by an order of magnitude.

Root manifest and adaptive tree. Currently, manifest lists and manifest files use different formats and evaluation logic despite serving a similar purpose — each is a list of references with summary statistics. V4 replaces the manifest list with a single Root Manifest per snapshot that can contain entries at different levels depending on the workload. For small streaming writes, file entries can be inlined directly into the root — achieving O(1) commit overhead. For large batch writes, the root points to leaf manifests as the current architecture does. This adaptive tree unifies the metadata hierarchy and allows the same evaluation optimizations (columnar projection, predicate pushdown) to apply at every level.

Colocated deletion vectors. Iceberg V3 already replaces position delete files with deletion vectors, but those vectors are still tracked through delete manifests that every planner must filter and reconcile against data files. V4 proposes carrying the deletion vector in the data file's content entry within the manifest, so that row-level deletes become a colocated property of the data file entry itself rather than a parallel tree of delete manifests. This eliminates an entire class of metadata objects from the planning path and removes the delete-file-to-data-file reconciliation step that adds latency to task assembly on tables with frequent row-level updates.

These changes will not alter the fundamental three-level planning architecture — the principle of hierarchical metadata filtering is sound. They will make each level faster and more efficient, particularly for wide tables and tables with deep metadata history.

Conclusion

Iceberg query planning is a coordinator-bound phase that determines how much work the rest of the query must do. The three-level filtering hierarchy — manifest list pruning, manifest file evaluation, and Parquet row group skipping — is designed to eliminate as much data as possible before any parallel execution begins. The effectiveness of this filtering depends directly on the quality of the metadata it evaluates: how many manifests exist, how accurate column statistics are, how tightly data is clustered by sort order.

Keeping planning fast is not a one-time configuration — it is ongoing maintenance. Manifests fragment. Statistics go stale. Sort orders drift from current query patterns. Compaction backlogs grow. Each of these degradations erodes planning performance incrementally, often invisibly, until queries that once planned in 200ms take 15 seconds.

For teams managing many tables across multiple engines, a control plane like LakeOps automates the structural maintenance that planning depends on — manifest rewriting, statistics refresh, query-aware sort optimization, and compaction — so planning stays fast without manual intervention. The result is a lakehouse where the serial bottleneck of every query is consistently optimized, and query performance scales with compute rather than degrading with metadata growth.

