
Apache Iceberg with Flink: Streaming Optimization Guide

Flink streaming into Iceberg creates thousands of small files per hour. This guide covers checkpoint tuning, write distribution modes, Flink SQL patterns, and why external maintenance is essential for production streaming tables.


Apache Flink is one of the most powerful engines for streaming data into Apache Iceberg. It provides exactly-once semantics, flexible windowing, and native Iceberg integration through both the DataStream API and Flink SQL. But streaming creates a structural problem that batch engines do not: every checkpoint produces a commit, and every commit produces files. At 60-second checkpoints with 32 subtasks writing to 10 partitions, you generate 320 files per minute — 460,000 files per day.

This guide covers how to configure Flink for Iceberg, tune the streaming write path, handle the inevitable small files problem, and set up the external maintenance that keeps streaming tables queryable in production.

Flink connects to Iceberg through the iceberg-flink-runtime JAR, which provides FlinkSink (legacy DataStream API), IcebergSink (newer SinkV2-based API, recommended for new projects), IcebergSource (unified source), and a full SQL catalog integration. FlinkSink is deprecated as of Iceberg 1.9+ and will be removed in the Flink 2.0 code path, so new development should use IcebergSink. The SQL catalog integration has three built-in catalog types (`rest`, `hive`, and `hadoop`); other catalogs can be plugged in through the `catalog-impl` option:

```sql
-- REST catalog (Polaris, Gravitino, Nessie, Lakekeeper)
CREATE CATALOG iceberg_rest WITH (
  'type' = 'iceberg',
  'catalog-type' = 'rest',
  'uri' = 'http://rest-catalog:8181',
  'warehouse' = 's3://lakehouse/warehouse'
);

-- Hive Metastore catalog
CREATE CATALOG iceberg_hive WITH (
  'type' = 'iceberg',
  'catalog-type' = 'hive',
  'uri' = 'thrift://hive-metastore:9083',
  'warehouse' = 's3://lakehouse/warehouse'
);

-- Hadoop catalog (filesystem-based)
CREATE CATALOG iceberg_hadoop WITH (
  'type' = 'iceberg',
  'catalog-type' = 'hadoop',
  'warehouse' = 's3://lakehouse/warehouse'
);
```

For production streaming, REST catalogs are preferred: they support concurrent commits from multiple writers, provide centralized access control, and remove the Hive Metastore as a single point of failure. The managed Iceberg control plane architecture pairs well with REST catalogs that expose a standardized API surface.

How Streaming Writes Create Commits and Files

Understanding the commit mechanics is essential. Flink's Iceberg sink ties its commit cycle to Flink checkpoints:

  1. Each Flink subtask writes Parquet files to the target partition during a checkpoint interval
  2. When a checkpoint triggers, each subtask flushes its current file (regardless of size)
  3. The Iceberg committer operator collects all flushed file paths and issues a single Iceberg commit
  4. The commit atomically appends all new data files to the table's metadata

This means: number of files per commit = number of writer subtasks × number of active partitions. With 16 subtasks and 5 active partitions, each checkpoint produces up to 80 files. At a 1-minute checkpoint interval, that is 115,200 files per day — most far below the optimal 256–512 MB target size.
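The arithmetic above can be kept at hand as a small helper. This is a plain-Java sketch with no Flink or Iceberg dependency; the class and method names are illustrative, and it assumes the worst case in which every writer subtask receives records for every active partition, ignoring extra files produced when a file rolls over at the target size.

```java
// Back-of-the-envelope model of checkpoint-driven file creation (no Flink or Iceberg dependency).
final class CheckpointFileMath {
    private CheckpointFileMath() {}

    /** Upper bound on data files per commit: each writer subtask may hold one open file per active partition. */
    static long filesPerCheckpoint(int writerSubtasks, int activePartitions) {
        return (long) writerSubtasks * activePartitions;
    }

    /** Whole checkpoints (and therefore Iceberg commits) in 24 hours. */
    static long checkpointsPerDay(long checkpointIntervalSeconds) {
        if (checkpointIntervalSeconds <= 0) {
            throw new IllegalArgumentException("checkpoint interval must be positive");
        }
        return 86_400L / checkpointIntervalSeconds;
    }

    static long filesPerDay(int writerSubtasks, int activePartitions, long checkpointIntervalSeconds) {
        return filesPerCheckpoint(writerSubtasks, activePartitions) * checkpointsPerDay(checkpointIntervalSeconds);
    }

    public static void main(String[] args) {
        // 16 subtasks x 5 partitions at a 60-second interval
        System.out.println(filesPerCheckpoint(16, 5)); // 80
        System.out.println(filesPerDay(16, 5, 60));    // 115200
    }
}
```

With 32 subtasks and 10 partitions the same helper gives 460,800 files per day, in line with the introduction's estimate; at a 5-minute interval the 16 × 5 case drops to 23,040.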

Streaming ingestion creates file proliferation across partitions
Multi-engine and streaming ingestion into Iceberg tables creates rapid file accumulation — every checkpoint, every writer subtask, every active partition produces a new file.

LakeOps addresses this by running compaction independently — detecting checkpoint-driven file accumulation and optimizing tables without interfering with streaming writes. Rather than coupling maintenance to the same Flink cluster, it operates as an external control plane that responds to actual table state. More on this below.

Tuning Checkpoint Intervals

The checkpoint interval is the single most impactful setting for Flink-to-Iceberg streaming. It controls both data freshness and file accumulation rate:

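```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// StreamExecutionEnvironment configuration: each completed checkpoint triggers one Iceberg commit
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(300_000);                                // 5 minutes
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(60_000); // at least 1 minute between checkpoints
env.getCheckpointConfig().setCheckpointTimeout(600_000);         // abort a checkpoint after 10 minutes
```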
  • Real-time dashboards (< 1 min latency): 30–60 seconds. Expect heavy file accumulation and plan for aggressive external compaction.
  • Near-real-time analytics (1–5 min latency): 3–5 minutes. Balances freshness with manageable file counts.
  • Streaming ETL (5–15 min latency tolerance): 10–15 minutes. Produces larger files, less compaction pressure.
  • Micro-batch simulation (> 15 min tolerance): 15–30 minutes. Approaches batch-like file sizes.

A 5-minute interval with 16 subtasks and 5 partitions produces 80 files per commit, ~23,000 per day. Each file averages 10–50 MB depending on throughput — still below optimal but manageable with regular compaction. Tools like LakeOps detect when file accumulation crosses thresholds and trigger compaction automatically — no cron schedules to manage or manual intervention required.
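The 10–50 MB range follows from dividing the bytes written per checkpoint by the number of files that carry them. A minimal sketch, assuming data is spread evenly across writer files and a hypothetical sustained ingest rate measured as compressed Parquet bytes per second; the class name is illustrative.

```java
// Estimates average data file size from ingest rate, checkpoint interval, and writer layout.
final class FileSizeEstimate {
    static final long MIB = 1024L * 1024L;

    private FileSizeEstimate() {}

    /** Average file size when each checkpoint's bytes are split evenly over subtasks x partitions files. */
    static long averageFileBytes(long ingestBytesPerSecond, long checkpointIntervalSeconds,
                                 int writerSubtasks, int activePartitions) {
        long bytesPerCheckpoint = ingestBytesPerSecond * checkpointIntervalSeconds;
        long filesPerCheckpoint = (long) writerSubtasks * activePartitions;
        return bytesPerCheckpoint / filesPerCheckpoint;
    }

    /** Fraction of the target size an average file reaches (1.0 means on target). */
    static double fillRatio(long averageFileBytes, long targetFileBytes) {
        return (double) averageFileBytes / targetFileBytes;
    }

    public static void main(String[] args) {
        long ingest = 10 * MIB;   // hypothetical: 10 MiB/s of compressed Parquet
        long target = 256 * MIB;  // write.target-file-size-bytes = 268435456
        for (long interval : new long[] {60, 300, 900}) {
            long avg = averageFileBytes(ingest, interval, 16, 5);
            System.out.printf("interval=%ds avg=%d MiB fill=%.2f%n",
                    interval, avg / MIB, fillRatio(avg, target));
        }
    }
}
```

With these assumed numbers a 5-minute interval yields files of about 37.5 MiB, inside the range quoted above, and even 15-minute checkpoints stay below a 256 MB target. Doubling the interval or halving the number of writer files doubles the average size.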

Write Distribution Modes

Flink's Iceberg sink supports three write distribution modes that control how records are shuffled before writing. This directly impacts file count and downstream query performance:

```sql
-- Set distribution mode at table level
ALTER TABLE catalog.db.events SET ('write.distribution-mode' = 'hash');
```

Distribution mode comparison

  • `none` (default): No shuffle. Each subtask writes whatever records it receives. Fastest throughput but creates the most files (subtasks × partitions). Use for unpartitioned tables or when write latency is critical.
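  • `hash`: Shuffles records by partition key so each partition is written by a single subtask. Reduces file count to one per partition per checkpoint. Adds network shuffle cost. Use for partitioned tables with moderate partition cardinality (< 100 active partitions). Because each partition maps to one subtask, only as many subtasks as there are active partitions do useful work; with a single active daily partition, all writes funnel through one subtask.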
  • `range`: Distributes records by sort key range across subtasks before writing. Produces files with better data clustering for range queries. Most expensive but produces the best physical layout. Currently supported only in FlinkSink; IcebergSink falls back to hash if range is specified. Use when downstream queries heavily filter on the sort columns.

For streaming workloads, hash distribution is usually the best trade-off — it cuts file count by the parallelism factor without the full cost of range sorting on every checkpoint. Combined with proper compaction at scale, hash distribution provides the best balance of write throughput and downstream query performance. Even with hash distribution, streaming tables still accumulate files that require autonomous maintenance to stay queryable.
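To see why `hash` cuts the file count, and where it can hurt, the sketch below routes a batch of records to writer subtasks and counts the distinct (subtask, partition) pairs, each of which becomes one file at the next checkpoint. It is a simplified model rather than Iceberg's or Flink's code: `none` is modeled as records arriving round-robin at subtasks, and `hash` as `partition.hashCode()` modulo parallelism, whereas Flink actually assigns keys to subtasks through key groups. All names are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;

// Simplified routing model; not Iceberg's or Flink's actual partitioner.
final class DistributionModel {
    private DistributionModel() {}

    enum Mode { NONE, HASH }

    private static int subtaskFor(int recordIndex, String partition, int parallelism, Mode mode) {
        return mode == Mode.NONE
                ? recordIndex % parallelism                          // no shuffle: record stays where it arrived
                : Math.floorMod(partition.hashCode(), parallelism);  // hash: one subtask per partition key
    }

    /** Files committed at one checkpoint: one per distinct (subtask, partition) pair. */
    static int filesAtCheckpoint(List<String> recordPartitions, int parallelism, Mode mode) {
        Set<String> files = new HashSet<>();
        for (int i = 0; i < recordPartitions.size(); i++) {
            String p = recordPartitions.get(i);
            files.add(subtaskFor(i, p, parallelism, mode) + "/" + p);
        }
        return files.size();
    }

    /** Subtasks that receive at least one record, i.e. the write parallelism actually used. */
    static int busySubtasks(List<String> recordPartitions, int parallelism, Mode mode) {
        Set<Integer> busy = new HashSet<>();
        for (int i = 0; i < recordPartitions.size(); i++) {
            busy.add(subtaskFor(i, recordPartitions.get(i), parallelism, mode));
        }
        return busy.size();
    }

    /** n records whose partitions are drawn uniformly at random (seeded for repeatability). */
    static List<String> records(int n, List<String> partitions, long seed) {
        Random rnd = new Random(seed);
        List<String> out = new ArrayList<>(n);
        for (int i = 0; i < n; i++) {
            out.add(partitions.get(rnd.nextInt(partitions.size())));
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> batch = records(10_000, List.of("dt=01", "dt=02", "dt=03", "dt=04", "dt=05"), 42L);
        System.out.println("none: " + filesAtCheckpoint(batch, 16, Mode.NONE) + " files");
        System.out.println("hash: " + filesAtCheckpoint(batch, 16, Mode.HASH) + " files");
        List<String> oneDay = records(10_000, List.of("dt=2024-06-01"), 42L);
        System.out.println("busy subtasks with hash: " + busySubtasks(oneDay, 16, Mode.HASH));
    }
}
```

With 16 subtasks and five active partitions, the `none` model yields 80 files per checkpoint and `hash` yields 5. The last line shows the flip side: with a single active daily partition, `hash` leaves 15 of the 16 writer subtasks idle.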

Table creation with write properties

```sql
CREATE TABLE iceberg_rest.analytics.user_events (
  event_id STRING,
  user_id BIGINT,
  event_type STRING,
  event_time TIMESTAMP(3),
  properties MAP<STRING, STRING>,
  dt STRING
) PARTITIONED BY (dt)
WITH (
  'format-version' = '2',
  'write.distribution-mode' = 'hash',
  'write.target-file-size-bytes' = '268435456',  -- 256 MB
  'write.parquet.compression-codec' = 'zstd'
);
```

Streaming INSERT

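```sql
-- Iceberg commits happen on checkpoints, so checkpointing must be enabled
SET 'execution.checkpointing.interval' = '5min';

-- Continuous streaming insert from Kafka source
INSERT INTO iceberg_rest.analytics.user_events
SELECT
  event_id,
  user_id,
  event_type,
  event_time,
  properties,
  DATE_FORMAT(event_time, 'yyyy-MM-dd') AS dt
FROM kafka_source.`default`.raw_events;  -- DEFAULT is a reserved word, so it is quoted
```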

Upsert mode with equality deletes

```sql
-- Enable upsert mode for CDC streams
CREATE TABLE iceberg_rest.analytics.user_profiles (
  user_id BIGINT,
  email STRING,
  name STRING,
  updated_at TIMESTAMP(3),
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'format-version' = '2',
  'write.upsert.enabled' = 'true'
);

-- Upsert from a CDC source
INSERT INTO iceberg_rest.analytics.user_profiles
SELECT user_id, email, name, updated_at
FROM mysql_cdc.users.user_profiles;
```

Upsert mode generates equality delete files on every commit. These accumulate fast in streaming and must be compacted — readers pay reconciliation cost proportional to delete file count. This makes external maintenance even more critical for upsert tables.
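Why reconciliation cost grows with delete files is easier to see in a toy merge-on-read reader. This plain-Java sketch uses hypothetical `Row` and `EqualityDelete` records and follows the Iceberg spec rule that an equality delete applies only to rows whose data sequence number is strictly lower than the delete's. A real reader works file by file and only applies deletes from the same partition, but it still has to load every applicable equality delete before it can emit a single row.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy merge-on-read model of equality deletes; not Iceberg's reader implementation.
final class MergeOnReadModel {
    private MergeOnReadModel() {}

    /** A data row tagged with the data sequence number of the commit that wrote it. */
    record Row(long key, String value, long seq) {}

    /** An equality delete on the primary key, written by an upsert commit. */
    record EqualityDelete(long key, long seq) {}

    static List<Row> read(List<Row> rows, List<EqualityDelete> deletes) {
        // Load every equality delete first: this work grows with the number of delete files
        Map<Long, Long> maxDeleteSeq = new HashMap<>();
        for (EqualityDelete d : deletes) {
            maxDeleteSeq.merge(d.key(), d.seq(), Long::max);
        }
        List<Row> live = new ArrayList<>();
        for (Row row : rows) {
            Long deleteSeq = maxDeleteSeq.get(row.key());
            // Spec rule: a delete only hides rows with a strictly lower data sequence number
            boolean deleted = deleteSeq != null && row.seq() < deleteSeq;
            if (!deleted) {
                live.add(row);
            }
        }
        return live;
    }

    public static void main(String[] args) {
        // Commit 1 inserts user 7; commit 2 upserts user 7 (equality delete + new row)
        List<Row> rows = List.of(new Row(7, "old@example.com", 1), new Row(7, "new@example.com", 2));
        List<EqualityDelete> deletes = List.of(new EqualityDelete(7, 2));
        System.out.println(read(rows, deletes)); // only the seq=2 row survives
    }
}
```

The row written in the same commit as the delete survives because its sequence number is not lower. Compaction removes this per-read work by rewriting data files with the deletes already applied.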

Read Optimization — Streaming and Incremental Reads

Flink can also consume Iceberg tables as streaming sources, reading new commits incrementally:

```sql
-- Streaming read from a specific snapshot
SELECT * FROM iceberg_rest.analytics.user_events
/*+ OPTIONS(
  'streaming' = 'true',
  'monitor-interval' = '30s',
  'start-snapshot-id' = '3821550127947089987'
) */;
```

Read modes

  • Streaming read: Polls for new snapshots at monitor-interval, emits only new data files from each commit. Ideal for downstream Flink jobs consuming from Iceberg as a message bus replacement.
  • Incremental read: Reads data between two specific snapshots. Useful for backfills and catch-up processing.
  • Snapshot-based batch read: Standard batch scan of a specific snapshot. Use for periodic analytics or validation jobs.
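Both streaming and incremental reads come down to walking the snapshot log and collecting the data files added after a starting point. The sketch below models that with a hypothetical `Snapshot` record holding only append information. It treats the start snapshot as exclusive and the end as inclusive, which is one common convention; check the exact semantics of `start-snapshot-id` and `end-snapshot-id` for your Iceberg version. It is an illustration, not the Iceberg source implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of incremental and streaming reads over an append-only snapshot log.
final class IncrementalReadModel {
    private IncrementalReadModel() {}

    /** One append commit: its id and the data files it added (hypothetical, simplified). */
    record Snapshot(long id, List<String> addedFiles) {}

    /** Files added after startExclusive up to and including endInclusive; log is oldest first. */
    static List<String> filesBetween(List<Snapshot> log, long startExclusive, long endInclusive) {
        List<String> files = new ArrayList<>();
        boolean afterStart = false;
        for (Snapshot s : log) {
            if (afterStart) {
                files.addAll(s.addedFiles());
            }
            if (s.id() == startExclusive) {
                afterStart = true;
            }
            if (s.id() == endInclusive) {
                break;
            }
        }
        return files;
    }

    /** One streaming poll: everything committed after the last snapshot this reader emitted. */
    static List<String> poll(List<Snapshot> log, long lastEmitted) {
        if (log.isEmpty()) {
            return List.of();
        }
        long latest = log.get(log.size() - 1).id();
        return filesBetween(log, lastEmitted, latest);
    }
}
```

A streaming reader would call `poll` once per `monitor-interval` and then advance `lastEmitted` to the newest snapshot it processed; every new commit contributes its own batch of files to the next poll.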

For streaming reads, small files directly impact read performance: each file adds planning work, a file open, and at least one S3 GET request, even when small files are combined into larger splits. Tables with 100,000+ files per partition become impractical to read with low latency. This reinforces why lakehouse performance optimization through compaction is non-negotiable for streaming tables.

Small files from frequent checkpoints

This is the core issue. A 1-minute checkpoint with 32 subtasks across 20 partitions generates 640 files per minute, each only 1–10 MB. Query planning alone (reading manifests and evaluating the metadata of every file) can take longer than the actual scan.

Partition explosion

Streaming to hourly or daily partitions with high-cardinality secondary keys creates long partition lists. Combined with high file counts per partition, metadata operations become the bottleneck. Tables with 10,000+ partitions and millions of files experience multi-second planning times even on simple queries.

Metadata bloat

Every checkpoint produces a new snapshot. With 1-minute checkpoints, a table accumulates 1,440 snapshots per day. Each snapshot references a manifest list, which references manifests, which reference data files. Without snapshot expiration, the metadata tree grows unbounded — eventually the metadata lifecycle itself becomes the performance problem.
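Snapshot expiration in Iceberg combines an age cutoff with a minimum number of snapshots to keep (the `expireOlderThan` and `retainLast` options of the `ExpireSnapshots` action). The plain-Java sketch below models only that selection rule over commit timestamps; it ignores branches, tags, and the cleanup of unreferenced files, and the class name is illustrative.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of the expireOlderThan + retainLast selection rule.
final class SnapshotExpiryModel {
    private SnapshotExpiryModel() {}

    /**
     * Returns commit timestamps (ms) that would be expired. Input is sorted oldest first.
     * A snapshot is expired only if it is older than cutoffMillis and is not one of the
     * retainLast most recent snapshots; the newest snapshot is always kept.
     */
    static List<Long> toExpire(List<Long> commitTimesOldestFirst, long cutoffMillis, int retainLast) {
        int keepAtLeast = Math.max(1, retainLast);
        int firstProtected = Math.max(0, commitTimesOldestFirst.size() - keepAtLeast);
        List<Long> expired = new ArrayList<>();
        for (int i = 0; i < firstProtected; i++) {
            long committedAt = commitTimesOldestFirst.get(i);
            if (committedAt < cutoffMillis) {
                expired.add(committedAt);
            }
        }
        return expired;
    }

    public static void main(String[] args) {
        // One day of 1-minute checkpoints: 1,440 snapshots
        List<Long> times = new ArrayList<>();
        for (int i = 0; i < 1440; i++) {
            times.add(i * 60_000L);
        }
        long now = times.get(times.size() - 1);
        long twoHoursAgo = now - 2 * 60 * 60 * 1000L;
        System.out.println("expired: " + toExpire(times, twoHoursAgo, 5).size()); // expired: 1319
    }
}
```

For a day of 1-minute checkpoints, a 2-hour cutoff with `retainLast = 5` expires 1,319 snapshots and keeps 121. The `retainLast` floor only matters when the age cutoff alone would remove nearly everything.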

Commit conflicts under concurrency

Multiple Flink jobs writing to the same table (or a Flink job concurrent with compaction) can trigger commit conflicts. Iceberg's optimistic concurrency detects conflicts and retries, but frequent conflicts degrade throughput. Use separate tables or careful partition isolation when running concurrent writers. LakeOps handles this by using Iceberg's snapshot isolation for conflict-free compaction that never blocks streaming writes — a critical property for production streaming architectures.
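Iceberg's optimistic concurrency builds new metadata from the version a writer last read and swaps the catalog's pointer only if that version is still current; on a conflict the writer refreshes, reapplies its change, and tries again, with retries bounded by the `commit.retry.num-retries` table property. The sketch below models that loop with an `AtomicReference` standing in for the catalog pointer. `TableVersion` and `commitAppend` are hypothetical names, and real Iceberg additionally validates that a concurrent change does not conflict with the operation before retrying.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

// Toy model of an optimistic commit loop; not Iceberg's implementation.
final class OptimisticCommitModel {
    /** Immutable table state: a version number and the data files it references. */
    record TableVersion(long version, List<String> files) {}

    // Stands in for the catalog's pointer to the current metadata file
    private final AtomicReference<TableVersion> current =
            new AtomicReference<>(new TableVersion(0, List.of()));

    TableVersion current() {
        return current.get();
    }

    /** Appends files; returns the attempt number that succeeded. */
    int commitAppend(List<String> newFiles, int maxRetries) {
        for (int attempt = 1; attempt <= maxRetries + 1; attempt++) {
            TableVersion base = current.get();                 // refresh to the latest version
            List<String> files = new ArrayList<>(base.files());
            files.addAll(newFiles);                            // re-apply this writer's change on top
            TableVersion next = new TableVersion(base.version() + 1, List.copyOf(files));
            if (current.compareAndSet(base, next)) {           // succeeds only if no one committed since the read
                return attempt;
            }
            // Another writer won the race: loop, re-read, and retry
        }
        throw new IllegalStateException("commit failed after " + maxRetries + " retries");
    }
}
```

Two writers committing concurrently always end with every append applied exactly once, but each failed compare-and-swap is discarded work; that wasted work is the throughput loss described above when commits conflict frequently.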

Healthy vs Unhealthy Iceberg tables — the impact of streaming without maintenance
Unmaintained streaming tables accumulate small files, fragmented manifests, and slow queries. Autonomous maintenance restores them to a healthy state — compacted files, optimized metadata, fast reads.
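```java
import java.util.HashMap;
import java.util.Map;
import org.apache.iceberg.flink.sink.FlinkSink;
import org.apache.iceberg.flink.sink.IcebergSink;

// Assumes `input` (a DataStream<RowData>) and `tableLoader` (a TableLoader) are already defined.
// Sink write options use short keys; each overrides the matching write.* table property.
Map<String, String> writeProps = new HashMap<>();
writeProps.put("target-file-size-bytes", "268435456"); // 256 MB target
writeProps.put("compression-codec", "zstd");
writeProps.put("distribution-mode", "hash");
writeProps.put("upsert-enabled", "false");

// Option A: FlinkSink (legacy API) with write options
FlinkSink.forRowData(input)
    .tableLoader(tableLoader)
    .setAll(writeProps)
    .overwrite(false)
    .append();

// Option B: IcebergSink (recommended SinkV2 API), same .set()/.setAll() interface.
// Attach one sink or the other, not both, or every record is written twice.
IcebergSink.forRowData(input)
    .tableLoader(tableLoader)
    .setAll(writeProps)
    .uidSuffix("iceberg-events-sink")
    .append();
```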

Critical parameters

  • `write.target-file-size-bytes` — Target file size before rolling. Default is 512 MB but streaming rarely hits this; increase checkpoint interval or reduce parallelism to let files grow larger.
  • `write-parallelism`: Controls how many writer subtasks run concurrently; this Flink write option defaults to the upstream operator's parallelism. Lower parallelism means fewer files per checkpoint but potentially higher backpressure. Match it to your throughput requirements.
  • `write.parquet.row-group-size-bytes` — Row group size within Parquet files. Smaller row groups enable finer-grained predicate pushdown but add overhead for tiny files.
  • `write.metadata.delete-after-commit.enabled` — Automatically delete old metadata files after commit. Helps prevent metadata accumulation.
  • `write.metadata.previous-versions-max` — Number of previous metadata files to keep. Set to 3–5 for streaming tables.

Note: When using IcebergSink, use uidSuffix (not uidPrefix as in FlinkSink) to identify sink operators in Flink's job graph. Both APIs use .set(key, value) and .setAll(map) for write properties.

Parallelism guidance

For a Kafka topic with 64 partitions, you do not need 64 writer subtasks. Flink can decouple source parallelism from sink parallelism. Setting source at 64 and sink at 8–16 with hash distribution yields far fewer files while maintaining consumption throughput.
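A quick way to choose a sink parallelism is to divide the expected ingest rate by what one writer subtask can sustain, add headroom, and then check the resulting file counts. The per-writer throughput and headroom values below are hypothetical placeholders to replace with measurements from your own jobs; the sketch only does the arithmetic, and the class name is illustrative.

```java
// Sizing arithmetic for sink parallelism; all rates are assumptions to be measured.
final class SinkSizing {
    private SinkSizing() {}

    /** Smallest writer count that sustains the ingest rate plus headroom (0.25 = 25%). */
    static int requiredWriters(double ingestMiBPerSec, double perWriterMiBPerSec, double headroom) {
        if (perWriterMiBPerSec <= 0) {
            throw new IllegalArgumentException("per-writer throughput must be positive");
        }
        return Math.max(1, (int) Math.ceil(ingestMiBPerSec * (1 + headroom) / perWriterMiBPerSec));
    }

    /** Worst-case files per checkpoint without a shuffle (distribution mode none). */
    static long filesWithoutShuffle(int writers, int activePartitions) {
        return (long) writers * activePartitions;
    }

    /** With hash distribution, at most one writer works on each active partition. */
    static int maxBusyWritersWithHash(int writers, int activePartitions) {
        return Math.min(writers, activePartitions);
    }

    public static void main(String[] args) {
        int writers = requiredWriters(200, 20, 0.25); // hypothetical: 200 MiB/s in, 20 MiB/s per writer
        System.out.println("sink parallelism: " + writers);
        System.out.println("files/checkpoint at parallelism 64: " + filesWithoutShuffle(64, 5));
        System.out.println("files/checkpoint at parallelism " + writers + ": " + filesWithoutShuffle(writers, 5));
        System.out.println("max busy writers with hash over 5 partitions: " + maxBusyWritersWithHash(writers, 5));
    }
}
```

With these assumed rates the job needs 13 writers instead of 64, cutting the unshuffled worst case from 320 to 65 files per checkpoint. The last line is the trade-off to watch with hash distribution: when there are fewer active partitions than required writers, only as many subtasks as partitions can work, so throughput rather than file count becomes the limit.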

Post-Write Maintenance — Why Flink Alone Is Not Enough

Iceberg introduced a TableMaintenance API for Flink (available since Iceberg 1.7) that can run compaction, snapshot expiration, and orphan file cleanup as integrated Flink tasks alongside streaming pipelines. It uses trigger locks to prevent concurrent maintenance and supports partial-progress commits for large rewrites. This is a meaningful step forward — but it does not eliminate the need for external maintenance in production.

The limitations of Flink-native maintenance:

  • Policy is manual. You define how often tasks run, how aggressive compaction is, and resource allocation. The system executes policy — it does not derive it from table state or query patterns.
  • No query-aware optimization. Flink maintenance can binpack files but cannot reorder data by access patterns observed across your query engines.
  • Resource contention with streaming SLAs. Compaction tasks in the same Flink cluster compete for slots and memory with your streaming pipeline. Aggressive compaction can cause backpressure on ingestion.
  • No cross-table coordination. Each table's maintenance runs independently — there is no global prioritization based on which tables are degraded most.
  • JVM overhead at scale. Rewriting thousands of small files into 256 MB targets through Flink's JVM carries GC pressure and memory overhead that compounds across many tables.
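Binpack compaction groups small files so that each rewrite produces files close to the target size. The sketch below plans rewrite groups with first-fit decreasing, a standard bin-packing heuristic; it is a simplified stand-in for illustration, not the planner that Iceberg's `RewriteDataFiles` action uses, and it works on file sizes only (in whatever unit you pass, MB here).

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// First-fit-decreasing plan for binpack compaction; illustrative only.
final class BinPackPlanner {
    private BinPackPlanner() {}

    /**
     * Sorts candidate files largest first and puts each into the first group that still
     * has room, opening a new group otherwise. Files at or above the target are skipped.
     */
    static List<List<Long>> plan(List<Long> fileSizes, long targetSize) {
        List<Long> candidates = new ArrayList<>();
        for (long size : fileSizes) {
            if (size < targetSize) {
                candidates.add(size);
            }
        }
        candidates.sort(Comparator.reverseOrder());

        List<List<Long>> groups = new ArrayList<>();
        List<Long> groupTotals = new ArrayList<>();
        for (long size : candidates) {
            int chosen = -1;
            for (int g = 0; g < groups.size(); g++) {
                if (groupTotals.get(g) + size <= targetSize) {
                    chosen = g;
                    break;
                }
            }
            if (chosen < 0) {
                groups.add(new ArrayList<>());
                groupTotals.add(0L);
                chosen = groups.size() - 1;
            }
            groups.get(chosen).add(size);
            groupTotals.set(chosen, groupTotals.get(chosen) + size);
        }
        return groups;
    }

    public static void main(String[] args) {
        List<Long> sizesMb = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            sizesMb.add(10L); // a thousand 10 MB checkpoint files
        }
        System.out.println(plan(sizesMb, 256L).size() + " rewrite groups"); // 40 rewrite groups
    }
}
```

A thousand 10 MB checkpoint files collapse into 40 groups of 25 files, so the rewrite emits about 40 files of roughly 250 MB each.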

Production streaming tables need an external maintenance system that watches for file accumulation and triggers compaction independently, without interrupting the streaming pipeline. This is where a dedicated optimization layer becomes essential — especially at scale across dozens of streaming tables.
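An event-driven trigger of this kind can be reduced to comparing per-partition health signals against thresholds. The sketch below is a hypothetical illustration, not LakeOps' or Iceberg's implementation: `PartitionStats` and all threshold values are assumptions (the 500-file level echoes the alerting guidance in this article), and it returns the partitions that need compaction, those with the most files first.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical threshold-based compaction trigger; tune thresholds per table.
final class CompactionTrigger {
    private CompactionTrigger() {}

    /** Hypothetical per-partition signals gathered from table metadata. */
    record PartitionStats(String partition, int dataFiles, long totalBytes, int deleteFiles) {
        long avgFileBytes() {
            return dataFiles == 0 ? 0 : totalBytes / dataFiles;
        }
    }

    static final int MAX_FILES = 500;                     // assumed file-count threshold
    static final int MIN_FILES_FOR_SIZE_CHECK = 10;       // ignore a handful of small files
    static final long MIN_AVG_BYTES = 64L * 1024 * 1024;  // assumed 64 MiB average-size floor
    static final double MAX_DELETE_RATIO = 0.10;          // delete files per data file

    static boolean needsCompaction(PartitionStats s) {
        if (s.dataFiles() == 0) {
            return false;
        }
        boolean tooManyFiles = s.dataFiles() >= MAX_FILES;
        boolean filesTooSmall = s.dataFiles() >= MIN_FILES_FOR_SIZE_CHECK && s.avgFileBytes() < MIN_AVG_BYTES;
        boolean tooManyDeletes = (double) s.deleteFiles() / s.dataFiles() > MAX_DELETE_RATIO;
        return tooManyFiles || filesTooSmall || tooManyDeletes;
    }

    /** Partitions that crossed a threshold, most data files first. */
    static List<String> partitionsToCompact(List<PartitionStats> stats) {
        List<PartitionStats> hits = new ArrayList<>();
        for (PartitionStats s : stats) {
            if (needsCompaction(s)) {
                hits.add(s);
            }
        }
        hits.sort(Comparator.comparingInt(PartitionStats::dataFiles).reversed());
        List<String> names = new ArrayList<>();
        for (PartitionStats s : hits) {
            names.add(s.partition());
        }
        return names;
    }
}
```

Evaluated after each commit or on a short poll, a rule like this produces a work list proportional to actual need rather than to a fixed schedule.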

Streaming tables are the hardest tables to keep healthy — files accumulate continuously, delete files grow from upserts, snapshots multiply by the minute, and access patterns shift with business hours. LakeOps is a dedicated lakehouse control plane built in Rust on Apache DataFusion. It connects to your existing catalogs and query engines — no pipeline changes, no data movement — and provides event-driven maintenance purpose-built for the challenges Flink streaming creates.

LakeOps autonomous compaction — event-driven triggers detect file accumulation from streaming writers and execute coordinated maintenance without interrupting active pipelines.
LakeOps in action — autonomous compaction and table health monitoring for streaming Iceberg workloads.

Key capabilities for Flink-generated tables:

  • Event-driven compaction triggered by streaming accumulation — continuously monitors per-table and per-partition structural signals (file count, average file size, delete-file ratio). When a streaming table crosses a threshold, compaction fires automatically — proportional to actual need, not fixed cron schedules.
  • Rust execution engine (86% faster than Spark) — processes Parquet through Arrow columnar buffers with bounded memory, zero GC, and lock-free parallelism. Critical for streaming tables that need multiple compaction passes per day.
  • Coordinated maintenance stack — sequences snapshot expiration (critical at 1,440+ snapshots/day from minute-level checkpoints), orphan cleanup, compaction, and manifest optimization automatically. Each step's output becomes the next step's clean input.
  • Conflict-free operation alongside streaming writers — uses Iceberg's snapshot isolation so compaction commits never conflict with Flink's streaming commits. The streaming pipeline is never interrupted or blocked.
  • Query-aware sort driven by production telemetry — collects query patterns from all connected engines (including Flink SQL reads) and applies the sort order that maximizes data skipping, without manual ALTER TABLE statements or pipeline restarts.
  • Per-table policies — each table gets independent maintenance configuration tuned to its ingestion rate, query patterns, and SLA requirements.
LakeOps Optimization tab: compaction strategy, target file sizes, and maintenance configuration for streaming Iceberg tables.
LakeOps Dashboard: real-time visibility into table health across your lakehouse, with streaming tables prioritized by degradation severity and maintenance urgency.
Layout Simulations: field access frequency from production queries across all engines, with candidate sort configurations compared against the baseline so that streaming table compaction produces optimally sorted files.
Table Events showing the sequenced maintenance pipeline (Compact Data Files, Expire Snapshots, Rewrite Manifests) with timing and impact metrics for each operation.
Compaction benchmarks: LakeOps Rust engine vs. Spark across varying file counts, demonstrating 86% faster execution critical for streaming tables requiring multiple compaction passes per day.
Best Practices

  1. Use an external maintenance system from day one. Streaming tables accumulate files faster than any other workload. Do not wait until queries degrade: connect a dedicated control plane like LakeOps that detects file accumulation in real time and triggers compaction, snapshot expiration, and manifest optimization automatically. Flink's built-in TableMaintenance handles basic binpack but lacks query-aware sorting, cross-table prioritization, and adaptive triggering.
  2. Set checkpoint interval to 3–5 minutes minimum for analytics tables. Only use sub-minute checkpoints when downstream consumers genuinely need second-level freshness.
  3. Use hash distribution mode for partitioned tables to reduce file count by the parallelism factor.
  4. Decouple source and sink parallelism. Read from Kafka at full partition count but write to Iceberg with 8–16 subtasks.
  5. Use `IcebergSink` for new projects. It is based on Flink's modern SinkV2 interface and will be the only supported sink in Flink 2.0. Migrate existing FlinkSink usage proactively.
  6. Enable format-version 2 for all tables. It is required for equality deletes and row-level updates, including upsert mode.
  7. Set snapshot expiration aggressively: retain 3–5 snapshots or 1–2 hours for streaming tables. Do not let snapshot count grow unbounded.
  8. Isolate partitions between concurrent writers. If multiple Flink jobs write to the same table, ensure each writes to distinct partitions to avoid commit conflicts.
  9. Monitor file counts per partition and set alerts at 500+ files. By 1,000 files, query planning alone dominates latency.
  10. Use zstd compression, which offers the best ratio-to-speed trade-off for streaming Parquet files.
  11. Test upsert tables under load before production. Accumulating equality deletes can make their read performance degrade faster than that of append-only tables.

Conclusion

Flink is an excellent streaming engine for Iceberg — it provides exactly-once semantics, flexible SQL and DataStream APIs, and robust checkpoint-based commits. But streaming fundamentally creates a maintenance debt that compounds over time: small files, snapshot proliferation, delete file accumulation, and metadata growth.

The key insight is that Flink optimization is not just about Flink configuration. Checkpoint tuning and distribution modes reduce the rate of file accumulation, but they cannot eliminate it. Production streaming tables need a continuous, event-driven maintenance system that responds to actual table state — not schedules — and coordinates the full stack of compaction, snapshot expiration, orphan cleanup, and manifest optimization.

Whether you manage that maintenance manually, through scheduled Spark jobs, or with a dedicated control plane like LakeOps, the worst outcome is ignoring it. A streaming table left unmaintained for a week can accumulate enough files to make queries 10–50× slower than the same data properly compacted. Explore the solutions overview to see how autonomous maintenance fits into your lakehouse architecture, or dive into related guides on Iceberg table health and partitioning best practices.
