
Apache Flink is one of the most powerful engines for streaming data into Apache Iceberg. It provides exactly-once semantics, flexible windowing, and native Iceberg integration through both the DataStream API and Flink SQL. But streaming creates a structural problem that batch engines do not: every checkpoint produces a commit, and every commit produces files. At 60-second checkpoints with 32 subtasks writing to 10 partitions, you generate 320 files per minute, or about 460,800 files per day.
This guide covers how to configure Flink for Iceberg, tune the streaming write path, handle the inevitable small files problem, and set up the external maintenance that keeps streaming tables queryable in production.
Flink's Iceberg Integration Architecture
Flink connects to Iceberg through the iceberg-flink-runtime JAR, which provides FlinkSink (legacy DataStream API), IcebergSink (newer SinkV2-based API, recommended for new projects), IcebergSource (unified source), and a full SQL catalog integration. FlinkSink is deprecated as of Iceberg 1.9+ and will be removed in the Flink 2.0 code path — new development should use IcebergSink. The integration supports three catalog backends:
```sql
-- REST catalog (Polaris, Gravitino, Nessie, Lakekeeper)
CREATE CATALOG iceberg_rest WITH (
  'type' = 'iceberg',
  'catalog-type' = 'rest',
  'uri' = 'http://rest-catalog:8181',
  'warehouse' = 's3://lakehouse/warehouse'
);

-- Hive Metastore catalog
CREATE CATALOG iceberg_hive WITH (
  'type' = 'iceberg',
  'catalog-type' = 'hive',
  'uri' = 'thrift://hive-metastore:9083',
  'warehouse' = 's3://lakehouse/warehouse'
);

-- Hadoop catalog (filesystem-based)
CREATE CATALOG iceberg_hadoop WITH (
  'type' = 'iceberg',
  'catalog-type' = 'hadoop',
  'warehouse' = 's3://lakehouse/warehouse'
);
```
For production streaming, REST catalogs are preferred: they support concurrent commits from multiple writers, provide centralized access control, and avoid the single point of failure of a Hive Metastore. The managed Iceberg control plane architecture pairs well with REST catalogs that expose a standardized API surface.
How Streaming Writes Create Commits and Files
Understanding the commit mechanics is essential. Flink's Iceberg sink ties its commit cycle to Flink checkpoints:
1. Each Flink subtask writes Parquet files to the target partition during a checkpoint interval
2. When a checkpoint triggers, each subtask flushes its current file (regardless of size)
3. The Iceberg committer operator collects all flushed file paths and issues a single Iceberg commit
4. The commit atomically appends all new data files to the table's metadata
This means: number of files per commit = number of writer subtasks × number of active partitions. With 16 subtasks and 5 active partitions, each checkpoint produces up to 80 files. At a 1-minute checkpoint interval, that is 115,200 files per day — most far below the optimal 256–512 MB target size.
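The arithmetic above can be sketched as a small estimator. This is only a back-of-the-envelope model: the class and method names are hypothetical, and it assumes one commit per checkpoint, no skipped checkpoints, and that every subtask holds an open file for every active partition (the worst case).

```java
/** Back-of-the-envelope estimate of files produced by checkpoint-driven commits. */
final class FileCountEstimator {
    private static final long SECONDS_PER_DAY = 86_400L;

    /** Upper bound on files per commit: each subtask flushes one file per active partition. */
    static long filesPerCommit(int writerSubtasks, int activePartitions) {
        return (long) writerSubtasks * activePartitions;
    }

    /** Commits per day, assuming exactly one commit per checkpoint interval. */
    static long commitsPerDay(long checkpointIntervalSeconds) {
        if (checkpointIntervalSeconds <= 0) {
            throw new IllegalArgumentException("checkpoint interval must be positive");
        }
        return SECONDS_PER_DAY / checkpointIntervalSeconds;
    }

    /** Upper bound on files per day for the given topology and checkpoint interval. */
    static long filesPerDay(int writerSubtasks, int activePartitions, long checkpointIntervalSeconds) {
        return filesPerCommit(writerSubtasks, activePartitions) * commitsPerDay(checkpointIntervalSeconds);
    }
}
```

For example, `FileCountEstimator.filesPerDay(16, 5, 60)` evaluates to 115,200, matching the figure above, while stretching the interval to 300 seconds drops the estimate to 23,040.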

LakeOps addresses this by running compaction independently — detecting checkpoint-driven file accumulation and optimizing tables without interfering with streaming writes. Rather than coupling maintenance to the same Flink cluster, it operates as an external control plane that responds to actual table state. More on this below.
Tuning Checkpoint Intervals
The checkpoint interval is the single most impactful setting for Flink-to-Iceberg streaming. It controls both data freshness and file accumulation rate:
```java
// StreamExecutionEnvironment configuration
env.enableCheckpointing(300_000); // 5 minutes
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(60_000);
env.getCheckpointConfig().setCheckpointTimeout(600_000);
```
Recommended intervals by workload
- Real-time dashboards (< 1 min latency): 30–60 seconds. Accept heavy file accumulation, requires aggressive external compaction.
- Near-real-time analytics (1–5 min latency): 3–5 minutes. Balances freshness with manageable file counts.
- Streaming ETL (5–15 min latency tolerance): 10–15 minutes. Produces larger files, less compaction pressure.
- Micro-batch simulation (> 15 min tolerance): 15–30 minutes. Approaches batch-like file sizes.
A 5-minute interval with 16 subtasks and 5 partitions produces 80 files per commit, ~23,000 per day. Each file averages 10–50 MB depending on throughput — still below optimal but manageable with regular compaction. Tools like LakeOps detect when file accumulation crosses thresholds and trigger compaction automatically — no cron schedules to manage or manual intervention required.
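To see how throughput and checkpoint interval translate into file size, a rough estimator can help. This is a sketch under simplifying assumptions: data is spread evenly across all files in a commit, throughput is measured as bytes actually written (after compression), and the names below are hypothetical rather than part of any Flink or Iceberg API.

```java
/** Rough estimate of average data file size for checkpoint-driven writes. */
final class FileSizeEstimator {

    /** Average file size in MB, assuming data is spread evenly over all files in a commit. */
    static double averageFileSizeMb(double writtenMbPerSecond, long checkpointIntervalSeconds,
                                    long filesPerCommit) {
        requirePositive(writtenMbPerSecond, checkpointIntervalSeconds, filesPerCommit);
        return writtenMbPerSecond * checkpointIntervalSeconds / filesPerCommit;
    }

    /** Smallest checkpoint interval (seconds) at which the average file reaches the target size. */
    static long intervalSecondsForTarget(double writtenMbPerSecond, double targetFileSizeMb,
                                         long filesPerCommit) {
        requirePositive(writtenMbPerSecond, targetFileSizeMb, filesPerCommit);
        return (long) Math.ceil(targetFileSizeMb * filesPerCommit / writtenMbPerSecond);
    }

    private static void requirePositive(double a, double b, long c) {
        if (a <= 0 || b <= 0 || c <= 0) {
            throw new IllegalArgumentException("all inputs must be positive");
        }
    }
}
```

With an assumed 8 MB/s of written data, a 300-second interval, and 80 files per commit, the average file comes out at 30 MB. Reaching a 256 MB average at the same throughput and file count would need an interval of 2,560 seconds, which is why reducing files per commit is usually more practical than stretching the interval alone.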
Write Distribution Modes
Flink's Iceberg sink supports three write distribution modes that control how records are shuffled before writing. This directly impacts file count and downstream query performance:
```sql
-- Set distribution mode at table level
ALTER TABLE catalog.db.events SET ('write.distribution-mode' = 'hash');
```
Distribution mode comparison
- `none` (default): No shuffle. Each subtask writes whatever records it receives. Fastest throughput but creates the most files (subtasks × partitions). Use for unpartitioned tables or when write latency is critical.
- `hash`: Shuffles records by partition key so each partition is written by a single subtask. Reduces file count to one per partition per checkpoint. Adds network shuffle cost. Use for partitioned tables with moderate partition cardinality (< 100 active partitions).
- `range`: Distributes records by sort key range across subtasks before writing. Produces files with better data clustering for range queries. Most expensive but produces the best physical layout. Currently supported only in `FlinkSink`; `IcebergSink` falls back to `hash` if `range` is specified. Use when downstream queries heavily filter on the sort columns.
For streaming workloads, hash distribution is usually the best trade-off — it cuts file count by the parallelism factor without the full cost of range sorting on every checkpoint. Combined with proper compaction at scale, hash distribution provides the best balance of write throughput and downstream query performance. Even with hash distribution, streaming tables still accumulate files that require autonomous maintenance to stay queryable.
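The effect of `none` versus `hash` on file count can be illustrated with a toy simulation. This is not how Flink routes records internally: `none` is approximated here by round-robin arrival (any subtask may see any partition), `hash` by routing on the partition value's hash code, and `range` is left out. All names are hypothetical.

```java
import java.util.HashSet;
import java.util.Set;

/** Toy model: counts how many files one checkpoint flushes under a distribution mode. */
final class DistributionModeSimulation {

    /** Each distinct (subtask, partition) pair corresponds to one open file at checkpoint time. */
    static int filesPerCheckpoint(String mode, int subtasks, String[] partitions, int recordsPerPartition) {
        Set<String> openFiles = new HashSet<>();
        long recordIndex = 0;
        for (String partition : partitions) {
            for (int r = 0; r < recordsPerPartition; r++) {
                int subtask;
                if (mode.equals("hash")) {
                    // All records of a partition land on the same subtask.
                    subtask = Math.floorMod(partition.hashCode(), subtasks);
                } else if (mode.equals("none")) {
                    // Stand-in for "no shuffle": records arrive at subtasks in round-robin order.
                    subtask = (int) (recordIndex % subtasks);
                } else {
                    throw new IllegalArgumentException("unsupported mode in this sketch: " + mode);
                }
                recordIndex++;
                openFiles.add(subtask + "/" + partition);
            }
        }
        return openFiles.size();
    }
}
```

With 16 subtasks, 5 partitions, and 100 records per partition, the model yields 80 files for `none` and 5 for `hash`, which is the reduction by the parallelism factor described above.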
Flink SQL for Iceberg — Common Patterns
Table creation with write properties
```sql
CREATE TABLE iceberg_rest.analytics.user_events (
  event_id STRING,
  user_id BIGINT,
  event_type STRING,
  event_time TIMESTAMP(3),
  properties MAP<STRING, STRING>,
  dt STRING
) PARTITIONED BY (dt)
WITH (
  'format-version' = '2',
  'write.distribution-mode' = 'hash',
  'write.target-file-size-bytes' = '268435456',
  'write.parquet.compression-codec' = 'zstd'
);
```
Streaming INSERT
```sql
-- Continuous streaming insert from Kafka source
INSERT INTO iceberg_rest.analytics.user_events
SELECT
  event_id,
  user_id,
  event_type,
  event_time,
  properties,
  DATE_FORMAT(event_time, 'yyyy-MM-dd') AS dt
FROM kafka_source.default.raw_events;
```
Upsert mode with equality deletes
```sql
-- Enable upsert mode for CDC streams
CREATE TABLE iceberg_rest.analytics.user_profiles (
  user_id BIGINT,
  email STRING,
  name STRING,
  updated_at TIMESTAMP(3),
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'format-version' = '2',
  'write.upsert.enabled' = 'true'
);

-- Upsert from a CDC source
INSERT INTO iceberg_rest.analytics.user_profiles
SELECT user_id, email, name, updated_at
FROM mysql_cdc.users.user_profiles;
```
Upsert mode generates equality delete files on every commit. These accumulate fast in streaming and must be compacted, because readers pay a reconciliation cost proportional to the delete file count. This makes external maintenance even more critical for upsert tables.
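To make the reconciliation cost concrete, here is a simplified model of how a reader applies equality deletes. It assumes the rule from the Iceberg table spec that an equality delete applies only to data written with a strictly lower sequence number, and it ignores partitions and position deletes. The class and field names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified model of merge-on-read with equality deletes keyed on user_id. */
final class EqualityDeleteSketch {

    /** A data row tagged with the sequence number of the commit that wrote it. */
    static final class Row {
        final long userId;
        final String email;
        final long sequence;

        Row(long userId, String email, long sequence) {
            this.userId = userId;
            this.email = email;
            this.sequence = sequence;
        }
    }

    /** An equality delete for one key, tagged with its commit's sequence number. */
    static final class EqDelete {
        final long userId;
        final long sequence;

        EqDelete(long userId, long sequence) {
            this.userId = userId;
            this.sequence = sequence;
        }
    }

    /** Returns rows that survive the deletes; work grows with rows x deletes in this naive form. */
    static List<Row> liveRows(List<Row> rows, List<EqDelete> deletes) {
        List<Row> live = new ArrayList<>();
        for (Row row : rows) {
            boolean deleted = false;
            for (EqDelete delete : deletes) {
                // A delete only hides rows written by strictly earlier commits.
                if (delete.userId == row.userId && row.sequence < delete.sequence) {
                    deleted = true;
                    break;
                }
            }
            if (!deleted) {
                live.add(row);
            }
        }
        return live;
    }
}
```

An upsert in commit 2 writes both a delete for the key and the new row with sequence number 2, so the old row from commit 1 is hidden while the new row survives. Every additional uncompacted delete adds to the inner loop, which is the read-side cost that compaction removes.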
Read Optimization — Streaming and Incremental Reads
Flink can also consume Iceberg tables as streaming sources, reading new commits incrementally:
```sql
-- Streaming read from a specific snapshot
SELECT * FROM iceberg_rest.analytics.user_events
/*+ OPTIONS(
  'streaming' = 'true',
  'monitor-interval' = '30s',
  'start-snapshot-id' = '3821550127947089987'
) */;
```
Read modes
- Streaming read: Polls for new snapshots at `monitor-interval`, emits only new data files from each commit. Ideal for downstream Flink jobs consuming from Iceberg as a message bus replacement.
- Incremental read: Reads data between two specific snapshots. Useful for backfills and catch-up processing.
- Snapshot-based batch read: Standard batch scan of a specific snapshot. Use for periodic analytics or validation jobs.
For streaming reads, small files directly impact read performance — each file requires a split, task scheduling overhead, and S3 GET request. Tables with 100,000+ files per partition become unreadable in real-time. This reinforces why lakehouse performance optimization through compaction is non-negotiable for streaming tables.
Common Problems with Flink + Iceberg Streaming
Small files from frequent checkpoints
The core issue. A 1-minute checkpoint with 32 subtasks across 20 partitions generates 640 files per minute. Each file is 1–10 MB. Query planning alone (listing and parsing all file metadata) takes longer than the actual scan.
Partition explosion
Streaming to hourly or daily partitions with high-cardinality secondary keys creates long partition lists. Combined with high file counts per partition, metadata operations become the bottleneck. Tables with 10,000+ partitions and millions of files experience multi-second planning times even on simple queries.
Metadata bloat
Every checkpoint produces a new snapshot. With 1-minute checkpoints, a table accumulates 1,440 snapshots per day. Each snapshot references a manifest list, which references manifests, which reference data files. Without snapshot expiration, the metadata tree grows unbounded — eventually the metadata lifecycle itself becomes the performance problem.
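A retention policy for this usually combines two knobs: a maximum age and a minimum number of snapshots to keep. The sketch below models that selection on plain timestamps. It is an illustration of the policy, not Iceberg's implementation, and the names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Models which snapshots an "older than X, but keep the last N" policy would expire. */
final class SnapshotExpirationSketch {

    /**
     * Returns the commit timestamps (ms) of snapshots selected for expiration.
     * The input list must be sorted oldest-first.
     */
    static List<Long> selectExpired(List<Long> snapshotTimestampsMs, long nowMs,
                                    long maxAgeMs, int retainLast) {
        List<Long> expired = new ArrayList<>();
        // The newest retainLast snapshots are never expired, regardless of age.
        int protectedFrom = Math.max(0, snapshotTimestampsMs.size() - retainLast);
        long cutoffMs = nowMs - maxAgeMs;
        for (int i = 0; i < protectedFrom; i++) {
            long timestamp = snapshotTimestampsMs.get(i);
            if (timestamp < cutoffMs) {
                expired.add(timestamp);
            }
        }
        return expired;
    }
}
```

For a day of 1-minute checkpoints (1,440 snapshots) evaluated one minute after the last commit, a 2-hour maximum age with `retainLast = 5` selects 1,320 snapshots for expiration and leaves 120 in place.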
Commit conflicts under concurrency
Multiple Flink jobs writing to the same table (or a Flink job concurrent with compaction) can trigger commit conflicts. Iceberg's optimistic concurrency detects conflicts and retries, but frequent conflicts degrade throughput. Use separate tables or careful partition isolation when running concurrent writers. LakeOps handles this by using Iceberg's snapshot isolation for conflict-free compaction that never blocks streaming writes — a critical property for production streaming architectures.
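The optimistic concurrency pattern behind these conflicts can be sketched with a compare-and-set on a version counter. This is a generic illustration, not Iceberg's commit code: the counter stands in for the catalog's pointer to the current table metadata, and all names are hypothetical.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Generic optimistic-commit loop: commit only if the table has not changed since it was read. */
final class OptimisticCommitSketch {
    /** Stand-in for the catalog's pointer to the current table metadata version. */
    private final AtomicLong currentVersion = new AtomicLong(0);

    long currentVersion() {
        return currentVersion.get();
    }

    /** Atomic swap: succeeds only if nobody has committed since baseVersion was read. */
    boolean tryCommit(long baseVersion) {
        return currentVersion.compareAndSet(baseVersion, baseVersion + 1);
    }

    /** Retries on conflict; returns the number of attempts used, or -1 if retries ran out. */
    int commitWithRetry(long initialBaseVersion, int maxAttempts) {
        long base = initialBaseVersion;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (tryCommit(base)) {
                return attempt;
            }
            // Conflict: refresh to the latest version and re-apply the change on top of it.
            base = currentVersion.get();
        }
        return -1;
    }
}
```

If a second writer commits between a job reading version 0 and attempting its own commit, the first attempt fails and the retry against the refreshed version succeeds. Each such retry costs a metadata refresh, which is why frequent conflicts show up as reduced throughput.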

Flink-Side Tuning Parameters
```java
import java.util.HashMap;
import java.util.Map;
import org.apache.iceberg.flink.sink.FlinkSink;
import org.apache.iceberg.flink.sink.IcebergSink;

// `input` (DataStream<RowData>) and `tableLoader` (TableLoader) are assumed to be defined elsewhere.

// Key Iceberg sink properties
Map<String, String> writeProps = new HashMap<>();
writeProps.put("write.target-file-size-bytes", "268435456"); // 256 MB target
writeProps.put("write.parquet.compression-codec", "zstd");
writeProps.put("write.distribution-mode", "hash");
writeProps.put("write.upsert.enabled", "false");

// FlinkSink (legacy API) with write properties
FlinkSink.forRowData(input)
    .tableLoader(tableLoader)
    .setAll(writeProps)
    .overwrite(false)
    .append();

// IcebergSink (recommended SinkV2 API) — same .set()/.setAll() interface
IcebergSink.forRowData(input)
    .tableLoader(tableLoader)
    .setAll(writeProps)
    .uidSuffix("iceberg-events-sink")
    .append();
```
Critical parameters
- `write.target-file-size-bytes` — Target file size before rolling. Default is 512 MB but streaming rarely hits this; increase checkpoint interval or reduce parallelism to let files grow larger.
- `sink.parallelism` — Controls how many subtasks write concurrently. Lower parallelism = fewer files per checkpoint but potentially higher backpressure. Match to your throughput requirements.
- `write.parquet.row-group-size-bytes` — Row group size within Parquet files. Smaller row groups enable finer-grained predicate pushdown but add overhead for tiny files.
- `write.metadata.delete-after-commit.enabled` — Automatically delete old metadata files after commit. Helps prevent metadata accumulation.
- `write.metadata.previous-versions-max` — Number of previous metadata files to keep. Set to 3–5 for streaming tables.
Note: When using IcebergSink, use uidSuffix (not uidPrefix as in FlinkSink) to identify sink operators in Flink's job graph. Both APIs use .set(key, value) and .setAll(map) for write properties.
Parallelism guidance
For a Kafka topic with 64 partitions, you do not need 64 writer subtasks. Flink can decouple source parallelism from sink parallelism. Setting source at 64 and sink at 8–16 with hash distribution yields far fewer files while maintaining consumption throughput.
Post-Write Maintenance — Why Flink Alone Is Not Enough
Iceberg introduced a TableMaintenance API for Flink (available since Iceberg 1.7) that can run compaction, snapshot expiration, and orphan file cleanup as integrated Flink tasks alongside streaming pipelines. It uses trigger locks to prevent concurrent maintenance and supports partial-progress commits for large rewrites. This is a meaningful step forward — but it does not eliminate the need for external maintenance in production.
The limitations of Flink-native maintenance:
- Policy is manual. You define how often tasks run, how aggressive compaction is, and resource allocation. The system executes policy — it does not derive it from table state or query patterns.
- No query-aware optimization. Flink maintenance can binpack files but cannot reorder data by access patterns observed across your query engines.
- Resource contention with streaming SLAs. Compaction tasks in the same Flink cluster compete for slots and memory with your streaming pipeline. Aggressive compaction can cause backpressure on ingestion.
- No cross-table coordination. Each table's maintenance runs independently — there is no global prioritization based on which tables are degraded most.
- JVM overhead at scale. Rewriting thousands of small files into 256 MB targets through Flink's JVM carries GC pressure and memory overhead that compounds across many tables.
Production streaming tables need an external maintenance system that watches for file accumulation and triggers compaction independently, without interrupting the streaming pipeline. This is where a dedicated optimization layer becomes essential — especially at scale across dozens of streaming tables.
How LakeOps Handles Flink-Generated Tables
Streaming tables are the hardest tables to keep healthy — files accumulate continuously, delete files grow from upserts, snapshots multiply by the minute, and access patterns shift with business hours. LakeOps is a dedicated lakehouse control plane built in Rust on Apache DataFusion. It connects to your existing catalogs and query engines — no pipeline changes, no data movement — and provides event-driven maintenance purpose-built for the challenges Flink streaming creates.

Key capabilities for Flink-generated tables:
- Event-driven compaction triggered by streaming accumulation — continuously monitors per-table and per-partition structural signals (file count, average file size, delete-file ratio). When a streaming table crosses a threshold, compaction fires automatically — proportional to actual need, not fixed cron schedules.
- Rust execution engine (86% faster than Spark) — processes Parquet through Arrow columnar buffers with bounded memory, zero GC, and lock-free parallelism. Critical for streaming tables that need multiple compaction passes per day.
- Coordinated maintenance stack — sequences snapshot expiration (critical at 1,440+ snapshots/day from minute-level checkpoints), orphan cleanup, compaction, and manifest optimization automatically. Each step's output becomes the next step's clean input.
- Conflict-free operation alongside streaming writers — uses Iceberg's snapshot isolation so compaction commits never conflict with Flink's streaming commits. The streaming pipeline is never interrupted or blocked.
- Query-aware sort driven by production telemetry — collects query patterns from all connected engines (including Flink SQL reads) and applies the sort order that maximizes data skipping, without manual ALTER TABLE statements or pipeline restarts.
- Per-table policies — each table gets independent maintenance configuration tuned to its ingestion rate, query patterns, and SLA requirements.
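As a rough illustration of threshold-based triggering on the structural signals listed above (file count, average file size, delete-file ratio), consider the sketch below. It is not LakeOps's actual logic. The 500-file limit echoes the alerting guidance later in this guide, while the 64 MB and 10% values are assumptions chosen for the example, and all names are hypothetical.

```java
/** Hypothetical per-partition check: does this partition's file layout warrant compaction? */
final class CompactionTriggerSketch {
    // Illustrative thresholds only; tune per table.
    static final int MAX_FILES_PER_PARTITION = 500;
    static final double MIN_AVG_FILE_SIZE_MB = 64.0;
    static final double MAX_DELETE_FILE_RATIO = 0.10;

    static boolean needsCompaction(int dataFiles, double totalDataMb, int deleteFiles) {
        if (dataFiles <= 0) {
            return false; // nothing to rewrite
        }
        boolean tooManyFiles = dataFiles > MAX_FILES_PER_PARTITION;
        // A single small file cannot be merged with anything, so require at least two.
        boolean filesTooSmall = dataFiles > 1 && (totalDataMb / dataFiles) < MIN_AVG_FILE_SIZE_MB;
        boolean tooManyDeletes = ((double) deleteFiles / dataFiles) > MAX_DELETE_FILE_RATIO;
        return tooManyFiles || filesTooSmall || tooManyDeletes;
    }
}
```

A partition holding 80 files totalling 2,400 MB (30 MB average) would trigger under these thresholds, while four 256 MB files with no delete files would not.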





Best Practices for Production Flink + Iceberg
1. Use an external maintenance system from day one. Streaming tables accumulate files faster than any other workload. Do not wait until queries degrade: connect a dedicated control plane like LakeOps that detects file accumulation in real time and triggers compaction, snapshot expiration, and manifest optimization automatically. Flink's built-in `TableMaintenance` handles basic binpack but lacks query-aware sorting, cross-table prioritization, and adaptive triggering.
2. Set checkpoint interval to 3–5 minutes minimum for analytics tables. Only use sub-minute checkpoints when downstream consumers genuinely need second-level freshness.
3. Use hash distribution mode for partitioned tables to reduce file count by the parallelism factor.
4. Decouple source and sink parallelism. Read from Kafka at full partition count but write to Iceberg with 8–16 subtasks.
5. Use `IcebergSink` for new projects. It is based on Flink's modern SinkV2 interface and will be the only supported sink in Flink 2.0. Migrate existing `FlinkSink` usage proactively.
6. Enable format-version 2 for all tables. It is required for equality deletes, row-level updates, and streaming reads.
7. Set snapshot expiration aggressively: retain 3–5 snapshots or 1–2 hours for streaming tables. Do not let snapshot count grow unbounded.
8. Isolate partitions between concurrent writers. If multiple Flink jobs write to the same table, ensure each writes to distinct partitions to avoid commit conflicts.
9. Monitor file counts per partition and set alerts at 500+ files. By 1,000 files, query planning alone dominates latency.
10. Use zstd compression, which offers the best ratio-to-speed trade-off for streaming Parquet files.
11. Test upsert tables under load before production. Equality delete accumulation can degrade read performance faster than in append-only tables.
Conclusion
Flink is an excellent streaming engine for Iceberg — it provides exactly-once semantics, flexible SQL and DataStream APIs, and robust checkpoint-based commits. But streaming fundamentally creates a maintenance debt that compounds over time: small files, snapshot proliferation, delete file accumulation, and metadata growth.
The key insight is that Flink optimization is not just about Flink configuration. Checkpoint tuning and distribution modes reduce the rate of file accumulation, but they cannot eliminate it. Production streaming tables need a continuous, event-driven maintenance system that responds to actual table state — not schedules — and coordinates the full stack of compaction, snapshot expiration, orphan cleanup, and manifest optimization.
Whether you manage that maintenance manually, through scheduled Spark jobs, or with a dedicated control plane like LakeOps, the worst outcome is ignoring it. A streaming table left unmaintained for a week can accumulate enough files to make queries 10–50× slower than the same data properly compacted. Explore the solutions overview to see how autonomous maintenance fits into your lakehouse architecture, or dive into related guides on Iceberg table health and partitioning best practices.



