
Kafka to Iceberg: Ingestion Guide

A practical guide to streaming data from Apache Kafka into Apache Iceberg tables — covering Kafka Connect, Apache Flink, Spark Structured Streaming, and CDC with Debezium. Includes configuration examples, schema management, partitioning strategies, production pitfalls, and how to keep streaming tables healthy at scale.


Apache Kafka is the default event backbone for most data platforms. Apache Iceberg is becoming the default table format for analytical storage. The question every streaming team faces is: how do you connect the two in production — reliably, efficiently, and without creating a maintenance nightmare?

This guide covers the four primary ingestion paths from Kafka to Iceberg — Kafka Connect, Apache Flink, Spark Structured Streaming, and CDC via Debezium — with configuration examples, schema management patterns, partitioning strategies, and the operational reality of keeping streaming Iceberg tables healthy after data lands.

Why Kafka to Iceberg

The combination of Kafka and Iceberg solves a structural problem in data architecture. Kafka provides durable event delivery, ordered within each partition, with exactly-once semantics available through idempotent producers and transactions. Iceberg provides ACID-compliant analytical storage with schema evolution, partition evolution, and time travel, readable by every major query engine.

Before Iceberg, streaming data from Kafka typically landed in raw Parquet or JSON files on S3 with no transactional guarantees, no schema enforcement, and no ability to query mid-write data safely. Downstream consumers — Spark, Trino, Athena, Snowflake — each needed custom logic to handle partial files, schema drift, and late-arriving data.

Iceberg eliminates these problems. A Kafka-to-Iceberg pipeline writes data into proper Iceberg snapshots with atomic commits, column-level statistics, and partition metadata that every engine understands. The result is a real-time lakehouse where data is queryable within minutes of production, using the same tables your batch pipelines and BI tools already read.

The tradeoff is operational: streaming writes create structural debt — small files, snapshot accumulation, and metadata growth — that must be managed continuously. Ingestion gets you to production; maintenance keeps you there. This guide also covers how a dedicated control plane like LakeOps automates the compaction, snapshot expiration, and orphan cleanup that streaming tables require — so you can focus on the pipeline, not the plumbing.

Four ingestion paths compared

| | Kafka Connect | Apache Flink | Spark Structured Streaming | Debezium CDC (via Flink/Spark) |
|---|---|---|---|---|
| Best for | Direct topic-to-table, no transformation | Complex joins, aggregations, enrichment | Micro-batch ETL with Spark ecosystem | Database replication to Iceberg |
| Latency | Minutes (batched commits) | Seconds to minutes (checkpoint-driven) | Minutes (micro-batch trigger) | Seconds to minutes (depends on processor) |
| Exactly-once | Yes (KIP-447, Kafka 2.5+) | Yes (checkpoint + Iceberg atomic commit) | Yes (write-ahead log + Iceberg commit) | Yes (via Flink checkpoint or Spark WAL) |
| Operational complexity | Low: managed framework | High: stateful distributed system | Medium: Spark cluster management | High: Debezium + processor + Iceberg |
| Transformations | Minimal (SMTs only) | Full: stateful SQL, joins, windows | Full: DataFrame/SQL API | Full (in processor layer) |
| CDC support | Append-only (no upserts) | Full upserts (equality deletes) | Full MERGE INTO via foreachBatch | Native (the primary use case) |
| Small file impact | Lower (less frequent commits) | High (per-checkpoint commits) | Medium (per-trigger commits) | High (frequent upserts + delete files) |
| Schema evolution | Automatic (with Schema Registry) | Manual or via Flink SQL DDL | Manual or via Spark SQL DDL | Automatic (Debezium captures DDL) |

The right path depends on what happens between the Kafka topic and the Iceberg table. If the answer is "nothing" — just move data as-is — Kafka Connect is the simplest option. If you need to transform, join, or apply CDC upserts, Flink or Spark is required. If you are replicating a database to your lakehouse, Debezium CDC is the standard pattern.

Path 1: Kafka Connect with the Iceberg Sink Connector

The Apache Iceberg Sink Connector is a Kafka Connect plugin, developed as part of the Apache Iceberg project, that reads from Kafka topics and writes data files (Parquet by default) to Iceberg tables. It is the lowest-overhead path for teams that already run Kafka Connect and need straightforward append workloads.

Setup

Build the connector from the Iceberg source repository or download the distribution archive. Copy the archive into your Kafka Connect plugins directory on all worker nodes. The connector ships with support for REST, Glue, DynamoDB, Hadoop, Nessie, JDBC, Hive, and BigQuery Metastore catalogs out of the box. JDBC drivers are not included and must be added separately if needed.

Minimal configuration

```json
{
  "name": "iceberg-events-sink",
  "config": {
    "connector.class": "org.apache.iceberg.connect.IcebergSinkConnector",
    "tasks.max": "2",
    "topics": "ecommerce.clickstream",
    "iceberg.tables": "analytics.clickstream",
    "iceberg.catalog.type": "rest",
    "iceberg.catalog.uri": "https://catalog.example.com",
    "iceberg.catalog.warehouse": "s3://lakehouse/warehouse",
    "iceberg.control.commit.interval-ms": "300000",
    "iceberg.tables.evolve-schema-enabled": "true",
    "iceberg.tables.auto-create-enabled": "true"
  }
}
```

This example maps one topic to one table. If you list several tables in iceberg.tables without setting iceberg.tables.route-field, the connector writes every record to every listed table, so either run one connector per topic-table pair or configure routing as described in Multi-table fan-out below.

Key parameters

  • `iceberg.control.commit.interval-ms` — how often the connector commits to Iceberg. Default is 5 minutes (300,000 ms). Shorter intervals give fresher data but create more files. For most analytical workloads, 5–10 minutes is the right range
  • `iceberg.tables.evolve-schema-enabled` — set to true if your Kafka schema may change (new fields added). The connector will add columns to the Iceberg table automatically, respecting the schema from Confluent Schema Registry or the embedded schema in your records
  • `iceberg.tables.auto-create-enabled` — set to true to have the connector create the Iceberg table if it does not exist. The table schema is inferred from the first batch of records
  • `iceberg.tables.schema-force-optional` — when true, all columns are set as optional during table creation and evolution. Useful when source schemas have strict nullability that does not match your analytics use case
  • `iceberg.tables.dynamic-enabled` — enables routing records to different tables based on a record field (see Multi-table fan-out below)
  • `tasks.max` — parallelism. Each task consumes from a subset of topic partitions. Set to the number of topic partitions for maximum throughput

Exactly-once semantics

The connector relies on KIP-447 for exactly-once delivery, which requires Kafka 2.5 or later. Under the hood, the connector coordinates commits across tasks using a control topic — all tasks agree on which offsets to commit before writing the Iceberg snapshot. This means no duplicate records and no partial commits, even if a worker crashes mid-batch.
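To make the coordination concrete, here is a toy model in plain Python. It is a simplified sketch under stated assumptions, not the connector's actual control-topic protocol: the `Table` class and `run_commit_round` function are hypothetical, and each task is assumed to report its data files and next offsets for a commit round. The coordinator commits only when every expected task has reported, so files and offsets become visible together or not at all.

```python
from dataclasses import dataclass, field


@dataclass
class Table:
    """Hypothetical stand-in for an Iceberg table plus its committed offsets."""
    files: list = field(default_factory=list)
    committed_offsets: dict = field(default_factory=dict)  # partition -> next offset
    snapshots: int = 0


def run_commit_round(table, task_reports, expected_tasks):
    """Commit one snapshot only if every expected task reported; otherwise commit nothing.

    task_reports maps task id -> (data_files, {partition: next_offset}).
    """
    if set(task_reports) != set(expected_tasks):
        return False  # a task is missing (e.g. it crashed): no partial commit
    for data_files, offsets in task_reports.values():
        table.files.extend(data_files)
        table.committed_offsets.update(offsets)
    table.snapshots += 1  # files and offsets land together in one atomic step
    return True


table = Table()
# Task 1 crashed before reporting, so this round commits nothing
partial = run_commit_round(table, {0: (["a.parquet"], {0: 100})}, expected_tasks={0, 1})
# After recovery both tasks report and the round commits once
complete = run_commit_round(
    table,
    {0: (["a.parquet"], {0: 100}), 1: (["b.parquet"], {1: 80})},
    expected_tasks={0, 1},
)
```

Because a failed round leaves both files and offsets untouched, tasks resume from the last committed offsets and reprocess the uncommitted records instead of duplicating them.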

Multi-table fan-out

A single Kafka topic often carries multiple event types — clickstream, purchases, page views, errors — all from the same application. The Iceberg Sink Connector can route records to different Iceberg tables based on a field value in each record:

```json
{
  "name": "iceberg-fanout-sink",
  "config": {
    "connector.class": "org.apache.iceberg.connect.IcebergSinkConnector",
    "topics": "ecommerce.events",
    "iceberg.tables.dynamic-enabled": "true",
    "iceberg.tables.route-field": "event_type",
    "iceberg.tables.default-partition-by": "day(event_timestamp)",
    "iceberg.tables.auto-create-enabled": "true",
    "iceberg.tables.evolve-schema-enabled": "true",
    "iceberg.catalog.type": "rest",
    "iceberg.catalog.uri": "https://catalog.example.com",
    "iceberg.catalog.warehouse": "s3://lakehouse/warehouse"
  }
}
```

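With dynamic-enabled set to true, the connector uses the value of the route field as the destination table name, so the producer must populate event_type with a fully qualified identifier: a record whose event_type is analytics.clickstream goes to analytics.clickstream, and one whose event_type is analytics.purchase goes to analytics.purchase. A bare value such as clickstream carries no namespace, so it would not resolve to analytics.clickstream. Tables are created automatically if they do not exist. This eliminates the need for separate Kafka topics per event type and simplifies the upstream producer.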
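The routing rule can be modeled as a group-by on the route field. This sketch assumes that the route field's value is used verbatim as the destination table identifier; `route_dynamically` is a hypothetical helper, and setting aside records that lack the field is a choice of this sketch rather than documented connector behavior.

```python
from collections import defaultdict


def route_dynamically(records, route_field):
    """Group records by the value of route_field, treated as the table identifier."""
    tables = defaultdict(list)
    unrouted = []  # sketch choice: keep records without the route field aside
    for record in records:
        table_name = record.get(route_field)
        if table_name is None:
            unrouted.append(record)
        else:
            tables[table_name].append(record)
    return dict(tables), unrouted


events = [
    {"event_type": "analytics.clickstream", "user_id": "u1"},
    {"event_type": "analytics.purchase", "user_id": "u2"},
    {"event_type": "analytics.clickstream", "user_id": "u3"},
    {"user_id": "u4"},  # no route field
]
tables, unrouted = route_dynamically(events, "event_type")
```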

Limitations

The Iceberg Sink Connector is append-only. It does not support MERGE INTO, row-level updates, or deletes. If your use case requires upserts (CDC replication, slowly changing dimensions, or deduplication), you need Flink or Spark downstream. The connector also does not perform any transformation beyond Kafka Connect's built-in Single Message Transforms (SMTs), which are limited to field-level operations like renaming, casting, and routing.

Be aware of dependency conflicts: the Avro converter may expect a different Avro library version than the one bundled with Iceberg. If you see ClassNotFoundException at startup, check for version mismatches between the Schema Registry client and the Iceberg distribution.

When to use it

Kafka Connect is the right choice when your pipeline is "Kafka topic → Iceberg table, no changes needed" and you already operate Kafka Connect for other sinks. Adding an Iceberg sink connector is minimal additional work — no new infrastructure, no custom application code, no Flink or Spark cluster to manage.

Path 2: Apache Flink

Apache Flink is the most capable engine for streaming into Iceberg. It provides exactly-once semantics via checkpoint alignment, supports full SQL and DataStream API transformations, and handles CDC upserts natively. The cost is operational complexity: Flink is a stateful distributed system that requires cluster management, checkpoint tuning, and monitoring.

For a detailed guide to Flink-specific tuning, see Apache Iceberg with Flink: Streaming Optimization Guide.

Flink connects to Iceberg through the iceberg-flink-runtime JAR, which provides FlinkSink / IcebergSink, IcebergSource, and a full SQL catalog integration. The integration supports REST, Hive, and Hadoop catalogs through the catalog-type option, and other catalog implementations (such as Glue or Nessie) through catalog-impl.

```sql
-- Iceberg commits happen on Flink checkpoints, so checkpointing must be enabled
SET 'execution.checkpointing.interval' = '5min';

-- Register the Iceberg catalog
CREATE CATALOG iceberg_catalog WITH (
  'type' = 'iceberg',
  'catalog-type' = 'rest',
  'uri' = 'https://catalog.example.com',
  'warehouse' = 's3://lakehouse/warehouse'
);

-- Create the target database and table if they do not exist yet
CREATE DATABASE IF NOT EXISTS iceberg_catalog.analytics;
CREATE TABLE IF NOT EXISTS iceberg_catalog.analytics.clickstream (
  event_id STRING,
  user_id STRING,
  event_type STRING,
  page_url STRING,
  event_timestamp TIMESTAMP(3),
  session_id STRING
);

-- Define a Kafka source table (in Flink's default catalog)
CREATE TABLE kafka_clickstream (
  event_id STRING,
  user_id STRING,
  event_type STRING,
  page_url STRING,
  event_timestamp TIMESTAMP(3),
  session_id STRING,
  WATERMARK FOR event_timestamp AS event_timestamp - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'ecommerce.clickstream',
  'properties.bootstrap.servers' = 'kafka:9092',
  'properties.group.id' = 'flink-iceberg-sink',
  'format' = 'json',
  'scan.startup.mode' = 'latest-offset'
);

-- Stream into Iceberg
INSERT INTO iceberg_catalog.analytics.clickstream
SELECT
  event_id,
  user_id,
  event_type,
  page_url,
  event_timestamp,
  session_id
FROM kafka_clickstream;
```

DataStream API

For programmatic control — custom serialization, complex routing, or multi-table writes — use the DataStream API with FlinkSink:

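```java
// Assumes: env is a StreamExecutionEnvironment, inputStream is a DataStream<RowData>
// built from a Kafka source, and catalogLoader is a CatalogLoader for your catalog.

// Configure checkpointing: each completed checkpoint commits one Iceberg snapshot
env.enableCheckpointing(300_000); // 5 minutes
env.getCheckpointConfig()
   .setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig()
   .setMinPauseBetweenCheckpoints(60_000);

// Build the Iceberg sink
FlinkSink.forRowData(inputStream)
    .tableLoader(
        TableLoader.fromCatalog(catalogLoader,
            TableIdentifier.of("analytics", "clickstream")))
    .distributionMode(DistributionMode.HASH)    // shuffle rows by partition key
    .writeParallelism(8)
    // 256 MB; Flink write-option key (or set the table property write.target-file-size-bytes)
    .set("target-file-size-bytes", "268435456")
    .append();

env.execute("kafka-to-iceberg-clickstream");
```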

Flink also supports upsert(true) on the sink builder for CDC workloads, along with equalityFieldColumns to specify the primary key columns for equality-based deduplication.

Checkpoint interval and file creation

Flink's checkpoint interval directly controls how often data is committed to Iceberg. Every checkpoint produces a new Iceberg snapshot with whatever data has accumulated since the last one. This is where the fundamental tension lives: shorter intervals give fresher data but create more files.

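| Checkpoint interval | Data freshness | Files per day (50 partitions, one file per partition per checkpoint) | Small file risk |
|---|---|---|---|
| 30 seconds | ~30 s | 144,000 | Very high |
| 1 minute | ~1 min | 72,000 | High |
| 5 minutes | ~5 min | 14,400 | Moderate |
| 15 minutes | ~15 min | 4,800 | Low |
| 1 hour | ~1 hour | 1,200 | Very low |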

For most analytical workloads, 1–5 minute checkpoint intervals balance latency against file management overhead. Sub-minute freshness is rarely necessary and significantly increases operational complexity and maintenance costs.
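The arithmetic behind the table is easy to reproduce for your own topology. A minimal sketch, assuming one new data file per active partition per checkpoint (the same assumption the table makes); `files_per_day` is an illustrative helper, and real counts grow with the number of writer subtasks that touch each partition.

```python
SECONDS_PER_DAY = 86_400


def files_per_day(checkpoint_interval_s, active_partitions, files_per_partition_per_commit=1):
    """Estimate data files created per day by a streaming Iceberg writer."""
    commits_per_day = SECONDS_PER_DAY // checkpoint_interval_s
    return commits_per_day * active_partitions * files_per_partition_per_commit


for label, seconds in [("30 seconds", 30), ("1 minute", 60), ("5 minutes", 300),
                       ("15 minutes", 900), ("1 hour", 3600)]:
    print(f"{label:>10}: {files_per_day(seconds, 50):>7,} files/day")
```

If several writer subtasks each write to every partition, pass that count as files_per_partition_per_commit to estimate the worst case.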

Write distribution mode

Flink's write distribution mode controls how records are routed to writer subtasks before they are written to Iceberg files. The choice directly impacts file size and partition alignment:

  • `none` (default) — records are not shuffled: each writer subtask writes whatever it receives from the upstream operator. Fastest write throughput, but every subtask can produce files in every partition, resulting in many small files per partition
  • `hash` — records are hashed by partition key so each subtask writes to a consistent set of partitions. Better file sizes but adds shuffle overhead
  • `range` — records are range-partitioned, producing the fewest files but with the highest shuffle cost. Best for sort-optimized tables

For streaming workloads, hash is usually the right choice. It keeps file counts manageable without the latency penalty of full range partitioning.
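A toy model shows why `hash` produces fewer files. It assumes each writer subtask opens one data file per partition it receives records for within a checkpoint, and it models `none` as round-robin assignment, which is a simplification: in real Flink jobs `none` means no shuffle, so records stay on whichever subtask received them upstream. The function name is hypothetical.

```python
def files_per_checkpoint(num_partitions, records_per_partition, writers, mode):
    """Count files in one checkpoint: one file per (writer subtask, partition) pair."""
    open_files = set()
    k = 0
    for partition in range(num_partitions):
        for _ in range(records_per_partition):
            if mode == "none":
                writer = k % writers  # partition-agnostic assignment
            elif mode == "hash":
                writer = hash(partition) % writers  # same partition -> same writer
            else:
                raise ValueError(f"unknown mode: {mode}")
            open_files.add((writer, partition))
            k += 1
    return len(open_files)


for mode in ("none", "hash"):
    print(mode, files_per_checkpoint(num_partitions=50, records_per_partition=100,
                                     writers=8, mode=mode))
```

With 8 writers and 50 partitions, the partition-agnostic assignment yields one file per writer per partition, while hashing yields one file per partition; that gap is the shuffle's payoff.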

When to use Flink

Flink is the right engine when you need to transform, join, or aggregate data before it reaches Iceberg, or when you need CDC upsert support with exactly-once guarantees and sub-minute latency. If your pipeline is a simple topic-to-table copy, the operational overhead of running a Flink cluster is not justified; use Kafka Connect instead.

Path 3: Spark Structured Streaming

Spark Structured Streaming provides a micro-batch processing model for writing Kafka data to Iceberg. It is the natural choice for teams already running Spark for batch processing who want to add streaming without introducing a new engine.

Append-only streaming

For straightforward event ingestion without transformations, use the Iceberg DataSource writer directly:

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType, TimestampType

# Requires the iceberg-spark-runtime and spark-sql-kafka-0-10 packages on the classpath
spark = SparkSession.builder \
    .config("spark.sql.extensions",
            "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.catalog.lakehouse",
            "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.lakehouse.type", "rest") \
    .config("spark.sql.catalog.lakehouse.uri",
            "https://catalog.example.com") \
    .config("spark.sql.catalog.lakehouse.warehouse",
            "s3://lakehouse/warehouse") \
    .getOrCreate()

schema = StructType([
    StructField("event_id", StringType()),
    StructField("user_id", StringType()),
    StructField("event_type", StringType()),
    StructField("page_url", StringType()),
    StructField("event_timestamp", TimestampType()),
    StructField("session_id", StringType()),
])

df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("subscribe", "ecommerce.clickstream") \
    .option("startingOffsets", "latest") \
    .load()

# Kafka delivers the payload as bytes: cast to string, then parse the JSON
events = df.selectExpr("CAST(value AS STRING) as json_str") \
    .select(from_json(col("json_str"), schema).alias("data")) \
    .select("data.*")

query = events.writeStream \
    .format("iceberg") \
    .outputMode("append") \
    .option("checkpointLocation", "s3://lakehouse/checkpoints/clickstream") \
    .trigger(processingTime="5 minutes") \
    .toTable("lakehouse.analytics.clickstream")

query.awaitTermination()
```

CDC upserts with foreachBatch

For CDC workloads that require MERGE INTO semantics, use foreachBatch to execute upsert logic on each micro-batch:

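```python
from pyspark.sql import Window
from pyspark.sql.functions import col, row_number

# Assumes cdc_events is a streaming DataFrame parsed from the Debezium topic, with the
# customer columns plus `op` (Debezium codes c/u/d/r) and `ts_ms` (change timestamp).

def upsert_to_iceberg(batch_df, batch_id):
    # MERGE rejects several source rows matching one target row,
    # so keep only the latest change per key within this micro-batch
    latest = (batch_df
              .withColumn("rn", row_number().over(
                  Window.partitionBy("customer_id").orderBy(col("ts_ms").desc())))
              .filter("rn = 1")
              .drop("rn"))
    latest.createOrReplaceTempView("updates")
    latest.sparkSession.sql("""
        MERGE INTO lakehouse.production.customers AS target
        USING updates AS source
        ON target.customer_id = source.customer_id
        WHEN MATCHED AND source.op = 'd' THEN DELETE
        WHEN MATCHED THEN UPDATE SET *
        WHEN NOT MATCHED AND source.op != 'd' THEN INSERT *
    """)

query = cdc_events.writeStream \
    .foreachBatch(upsert_to_iceberg) \
    .option("checkpointLocation", "s3://lakehouse/checkpoints/customers") \
    .trigger(processingTime="5 minutes") \
    .start()
query.awaitTermination()
```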

The foreachBatch pattern receives the micro-batch DataFrame and a unique batch ID. On failure and retry, the batch ID stays the same, which you can use for idempotency checks. Copy-on-write MERGE works on any Iceberg format version; merge-on-read, which writes delete files instead of rewriting data files, requires format version 2 or 3.
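The batch-ID idempotency check can be sketched in plain Python. This toy model assumes you persist the last committed batch ID atomically with the data (here an in-memory object stands in for the table and its metadata), so a retried batch with the same ID becomes a no-op. `IdempotentSink` and `write_batch` are hypothetical names.

```python
class IdempotentSink:
    """Toy sink that applies each micro-batch at most once, keyed by batch_id."""

    def __init__(self):
        self.rows = []
        self.last_batch_id = -1  # stands in for state committed alongside the data

    def write_batch(self, batch_rows, batch_id):
        if batch_id <= self.last_batch_id:
            return False  # replay of an already committed batch: skip it
        self.rows.extend(batch_rows)
        self.last_batch_id = batch_id  # must be committed atomically with the rows
        return True


sink = IdempotentSink()
first = sink.write_batch([{"customer_id": 1}], batch_id=0)
retry = sink.write_batch([{"customer_id": 1}], batch_id=0)  # same ID after a failure
```

A keyed MERGE is already largely idempotent for upserts; the explicit check matters more for append-style side effects inside foreachBatch.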

Tuning Spark Structured Streaming for Iceberg

  • Trigger interval: processingTime="5 minutes" balances freshness against file creation. Setting it to "1 minute" creates roughly 5x more commits and files, and proportionally more maintenance work
  • `write.target-file-size-bytes` — set as a table property to guide Spark toward optimal output file sizes (256–512 MB)
  • `write.distribution-mode` — set to hash to align output files with partition boundaries, reducing cross-partition file scatter
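  • Shuffle partitions — the default spark.sql.shuffle.partitions is 200, so a micro-batch that shuffles data can write up to 200 tasks' worth of output files. Set this to match your actual partition count or use Spark's adaptive query execution (AQE) to coalesce automatically
  • `maxOffsetsPerTrigger` — a Kafka source option that caps how many offsets each trigger consumes, preventing oversized batches and memory pressure when a restarted query works through a backlog. (The similarly named `streaming-max-files-per-micro-batch` applies only when reading from an Iceberg table as a stream.)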

When to use Spark

Spark Structured Streaming is the right choice when your team already operates Spark for batch ETL and wants to add streaming without introducing Flink. It is also the natural path for foreachBatch CDC upserts in environments where Flink expertise is limited. The tradeoff is higher latency (minutes, not seconds) and micro-batch semantics rather than true continuous processing.

Path 4: CDC with Debezium

Change Data Capture is a specific variant of Kafka-to-Iceberg ingestion where the Kafka topic contains row-level change events from a source database — typically captured by Debezium. The goal is to maintain a continuously updated mirror of the source database in Iceberg, with full insert, update, and delete semantics.

Architecture

```text
Source DB (PostgreSQL / MySQL / MongoDB)
  │ (WAL / binlog / oplog)
  ▼
Debezium CDC Connector → Kafka
  │ (change events: insert, update, delete)
  ▼
Apache Flink or Spark (MERGE INTO / upsert)
  │ (writes data files + delete files)
  ▼
Iceberg Table (V2/V3, merge-on-read)
  │ (queryable by any engine)
  ▼
Trino / Spark / Athena / Snowflake
```

Debezium reads the source database's write-ahead log (PostgreSQL), binary log (MySQL), or oplog via change streams (MongoDB) and publishes structured change events to Kafka topics, one per table. Each event contains the operation type (c for create, u for update, d for delete, and r for rows read during the initial snapshot), the before and after row state, and source metadata like transaction ID, timestamp, and LSN/binlog position.

Flink handles Debezium's envelope format natively. Define the Kafka source table with 'format' = 'debezium-json' or 'debezium-avro-confluent', and Flink will extract the operation type and row data automatically:

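```sql
-- Debezium change events; earliest-offset includes the initial snapshot.
-- schema-include = true only if Debezium's JsonConverter runs with schemas.enable=true.
CREATE TABLE debezium_customers (
  customer_id INT,
  name STRING,
  email STRING,
  updated_at TIMESTAMP(3),
  PRIMARY KEY (customer_id) NOT ENFORCED
) WITH (
  'connector' = 'kafka',
  'topic' = 'dbserver1.public.customers',
  'properties.bootstrap.servers' = 'kafka:9092',
  'properties.group.id' = 'flink-cdc-customers',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'debezium-json',
  'debezium-json.schema-include' = 'true'
);

-- Target table: format version 2 with a primary key, stored as Iceberg
-- identifier fields and used by the Flink sink as the equality-delete key
CREATE DATABASE IF NOT EXISTS iceberg_catalog.production;
CREATE TABLE IF NOT EXISTS iceberg_catalog.production.customers (
  customer_id INT,
  name STRING,
  email STRING,
  updated_at TIMESTAMP(3),
  PRIMARY KEY (customer_id) NOT ENFORCED
) WITH ('format-version' = '2');

-- Upsert into Iceberg
INSERT INTO iceberg_catalog.production.customers
SELECT customer_id, name, email, updated_at
FROM debezium_customers;
```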

When the target Iceberg table is format version 2 or later and declares a primary key (stored as Iceberg identifier fields), and the source uses a CDC format, the Flink sink applies the changelog row by row: inserts become appends, updates become equality deletes plus inserts, and deletes become equality deletes.

Table format requirements

For CDC to work, the Iceberg table must use format version 2 or 3 to support row-level deletes. Use merge-on-read (MoR) mode for the write path: it keeps streaming writes lightweight by creating delete files alongside data files rather than rewriting entire files on every update. Flink's CDC writes are always merge-on-read; for Spark MERGE, set the table properties write.merge.mode, write.update.mode, and write.delete.mode to merge-on-read. Compaction then periodically merges delete files into the base data.
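A toy model can show what merge-on-read defers to read time and what compaction folds back in. It follows Iceberg's rule for equality deletes (a delete applies only to rows with a lower data sequence number) but is otherwise simplified: files are lists of `(sequence_number, row)` pairs, positional deletes and deletion vectors are not modeled, and both function names are hypothetical.

```python
def read_merge_on_read(data_files, delete_files, key="customer_id"):
    """Reconcile data files with equality-delete files at read time (toy model)."""
    deleted_at = {}  # highest delete sequence number seen per key
    for delete_file in delete_files:
        for seq, row in delete_file:
            deleted_at[row[key]] = max(seq, deleted_at.get(row[key], -1))
    live = []
    for data_file in data_files:
        for seq, row in data_file:
            # An equality delete removes only rows written at a lower sequence number
            if deleted_at.get(row[key], -1) <= seq:
                live.append(row)
    return live


def compact(data_files, delete_files, key="customer_id"):
    """Fold deletes into the base data: one rewritten data file and no delete files."""
    live = read_merge_on_read(data_files, delete_files, key)
    max_seq = max((seq for f in data_files + delete_files for seq, _ in f), default=0)
    return [[(max_seq, row) for row in live]], []


data_files = [
    [(1, {"customer_id": 1, "name": "alice"}), (1, {"customer_id": 2, "name": "bob"})],
    [(2, {"customer_id": 1, "name": "alice-updated"})],  # re-insert after update
]
delete_files = [
    [(2, {"customer_id": 1})],  # update of customer 1 deletes the old row
    [(3, {"customer_id": 2})],  # delete of customer 2
]
rows = read_merge_on_read(data_files, delete_files)
compacted_data, compacted_deletes = compact(data_files, delete_files)
```

Every read pays for the reconciliation loop until compaction runs; afterwards the same result comes from a single file with nothing to reconcile.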

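With format version 3 (for example, on Iceberg 1.11.0), deletion vectors replace positional delete files with compressed Roaring bitmaps stored in Puffin files, significantly reducing both the storage footprint and the read-time overhead of delete file accumulation on CDC tables. Equality deletes, which the Flink CDC path writes, are not replaced by deletion vectors.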

CDC-specific considerations

  • Initial snapshot — Debezium takes a consistent snapshot of the entire source table before switching to streaming from the WAL. Plan for the initial load volume: a 100 GB PostgreSQL table pushes on the order of 100 GB of snapshot events through Kafka before CDC streaming begins
  • Schema evolution — Debezium captures DDL changes from the source database and emits subsequent events with the new schema. With Flink SQL or Spark, update the Kafka source table definition and the Iceberg table schema (via DDL) when the source schema changes; iceberg.tables.evolve-schema-enabled applies only to the Kafka Connect sink, which cannot apply CDC upserts
  • Delete file accumulation — CDC tables accumulate delete files proportional to update frequency. Without compaction, read performance degrades linearly. See Iceberg delete files guide for details on monitoring and resolution
  • Ordering guarantees — Kafka preserves per-partition ordering. Ensure the primary key maps to the Kafka partition key so updates to the same row are processed in order. Debezium uses the table's primary key as the message key by default, so the default partitioner already does this unless you override the key or the partitioner
  • Tombstones — by default (tombstones.on.delete=true), Debezium emits a tombstone record (null value) after each delete event so that Kafka log compaction can discard earlier records for that key. Ensure your consumer handles or filters tombstones correctly
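The considerations above can be combined into a toy apply loop: events for a key are applied in order, deletes remove the row, and tombstones are skipped. The event shape mirrors the `op`, `before`, and `after` fields of Debezium's change-event envelope; `apply_change_events` is a hypothetical helper, not part of Debezium or any sink.

```python
def apply_change_events(events, key="customer_id"):
    """Apply Debezium-style change events, in order, to a keyed mirror (toy model)."""
    mirror = {}
    for event in events:
        if event is None:
            continue  # tombstone: exists for Kafka log compaction, carries no row
        op = event["op"]
        if op in ("c", "r", "u"):  # create, snapshot read, update
            row = event["after"]
            mirror[row[key]] = row
        elif op == "d":
            mirror.pop(event["before"][key], None)
    return mirror


events = [
    {"op": "r", "before": None, "after": {"customer_id": 1, "name": "alice"}},
    {"op": "c", "before": None, "after": {"customer_id": 2, "name": "bob"}},
    {"op": "u", "before": {"customer_id": 1, "name": "alice"},
     "after": {"customer_id": 1, "name": "alice-updated"}},
    {"op": "d", "before": {"customer_id": 2, "name": "bob"}, "after": None},
    None,  # tombstone following the delete
]
mirror = apply_change_events(events)
# Processing customer 1's events out of order leaves a stale row
stale = apply_change_events([events[2], events[0]])
```

The stale result is exactly what happens when changes for one key land on different Kafka partitions and are consumed out of order.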

Schema management: Kafka to Iceberg

Schema drift between producers and the Iceberg table is one of the most common causes of pipeline failures. A field is renamed upstream, a new column is added, a type is widened from int to long — and the ingestion pipeline breaks or silently drops data.

Schema Registry

Use Confluent Schema Registry (or a compatible alternative) to enforce schema contracts between producers and consumers. Every schema version gets a unique ID and incremented version number. Schema Registry validates new versions against configurable compatibility rules before accepting them.

The most common compatibility modes for Kafka-to-Iceberg pipelines:

  • BACKWARD (default) — new schema can read data written with the previous schema. Safe for adding optional fields
  • FORWARD — old consumers can read data written with the new schema. Safe for removing optional fields
  • FULL — both backward and forward compatible. The strictest practical option
  • BACKWARD_TRANSITIVE — recommended for Protobuf, where adding new message types breaks forward compatibility
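A simplified version of the BACKWARD rule for Avro fits in a few lines: every field in the new (reader) schema must either exist in the old (writer) schema with the same or a promotable type, or carry a default. This sketch assumes flat records described as plain dicts and ignores unions, nested records, and aliases; `is_backward_compatible` is a hypothetical helper, and Schema Registry performs the real, complete check.

```python
# Avro type promotions allowed when reading: (writer type, reader type)
AVRO_PROMOTIONS = {
    ("int", "long"), ("int", "float"), ("int", "double"),
    ("long", "float"), ("long", "double"), ("float", "double"),
    ("string", "bytes"), ("bytes", "string"),
}


def is_backward_compatible(new_fields, old_fields):
    """Can a reader using new_fields decode data written with old_fields? (simplified)"""
    for name, spec in new_fields.items():
        if name in old_fields:
            old_type, new_type = old_fields[name]["type"], spec["type"]
            if old_type != new_type and (old_type, new_type) not in AVRO_PROMOTIONS:
                return False
        elif "default" not in spec:
            return False  # a new field without a default cannot be filled for old data
    return True  # fields removed from the new schema are simply ignored


old = {"id": {"type": "long"}, "name": {"type": "string"}}
add_optional = {**old, "email": {"type": ["null", "string"], "default": None}}
add_required = {**old, "email": {"type": "string"}}
drop_field = {"id": {"type": "long"}}
narrow_type = {"id": {"type": "int"}, "name": {"type": "string"}}
```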

Use Avro or Protobuf serialization with Schema Registry rather than plain JSON. Schema Registry serializers prefix every record with the writer schema ID, so the consumer always knows which schema version produced each record. Plain JSON requires either embedding the schema inline (bloating record size) or inferring the schema from the data (fragile and slow).

Iceberg schema evolution

Iceberg supports adding, renaming, reordering, widening, and dropping columns without rewriting existing data files. When a new column is added to the schema, existing data files return null for that column. When a column is widened (e.g., int to long), Iceberg handles the type promotion at read time.
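The widening rules are narrow and easy to check before issuing DDL. A minimal sketch of the promotions the Iceberg spec allows for format versions 1 and 2 (format version 3 extends the list, which is not modeled here); `can_promote` is a hypothetical helper, and types are written as Iceberg type strings.

```python
import re

DECIMAL = re.compile(r"decimal\((\d+),\s*(\d+)\)")


def can_promote(old, new):
    """True if Iceberg (format v1/v2) can widen column type old to new without a rewrite."""
    if old == new:
        return True
    if (old, new) in {("int", "long"), ("float", "double")}:
        return True
    old_dec, new_dec = DECIMAL.fullmatch(old), DECIMAL.fullmatch(new)
    if old_dec and new_dec:
        p1, s1 = map(int, old_dec.groups())
        p2, s2 = map(int, new_dec.groups())
        return s1 == s2 and p2 > p1  # precision may grow; scale must stay fixed
    return False
```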

The Kafka Connect Iceberg Sink Connector propagates schema changes automatically when iceberg.tables.evolve-schema-enabled is true. For Flink and Spark, schema changes require DDL updates to the Iceberg table — either manually or through a schema management automation layer.

Best practice: Align Schema Registry compatibility mode with Iceberg's schema evolution capabilities. BACKWARD compatibility (the default) maps naturally to Iceberg's column-addition model — new fields are added, old data files return null for them. Avoid breaking changes (type narrowing, column removal with data loss) that neither Schema Registry nor Iceberg can handle automatically.

Partitioning streaming tables

Partition strategy directly impacts both query performance and compaction efficiency on streaming tables. Iceberg's hidden partitioning separates logical and physical layout — you define transforms on existing columns, and Iceberg handles the mapping transparently. Queries filter on the original column; Iceberg prunes partitions automatically.

Choosing the right transform

| Transform | Example | Use case | Streaming tradeoff |
|---|---|---|---|
| day(timestamp) | day(event_time) | Daily reporting, dashboards | Best default: concentrates files for compaction |
| hour(timestamp) | hour(event_time) | Real-time monitoring, high-volume (>5 GB/hr) | More partitions = more small files = more compaction |
| month(timestamp) | month(event_time) | Low-volume tables, monthly rollups | Very few files per partition: minimal compaction needed |
| bucket(N, col) | bucket(16, user_id) | Even distribution for joins, high-cardinality lookups | Good for non-time queries; combines with time transforms |
| truncate(N, col) | truncate(3, region) | String prefix grouping | Useful for geo or categorical data |

Rules for streaming tables

Start coarse, add granularity later. Begin with day(event_time) partitioning. If data volume exceeds 5 GB per hour per partition and queries need hour-level filtering, evolve to hour(event_time) using Iceberg's partition evolution — no data rewrite required. Iceberg tracks versioned partition specs and prunes queries across both old and new layouts transparently.

Never partition by raw timestamps. Partitioning by the raw event_time column (identity transform) creates a unique partition for every distinct timestamp value — millions of partitions, each with a single tiny file. Always use a time-based transform (day, hour, month).

Avoid high-cardinality identity partitions. Partitioning by user_id, device_id, or session_id with an identity transform creates as many partitions as there are distinct values. Use bucket(N, col) instead — bucket(16, user_id) distributes data across 16 buckets regardless of cardinality.

Compound partitions for multi-dimensional queries. If queries frequently filter on both time and a categorical column, combine transforms, for example in Spark SQL: PARTITIONED BY (days(event_time), bucket(8, customer_id)). When a query filters on event_time, Iceberg prunes by day. When it filters on both, both dimensions prune. The tradeoff is more partitions and more small files per commit.
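Partition cardinality under each strategy can be checked with a quick simulation. This sketch computes the day and hour transform values as Iceberg defines them (whole days and hours since 1970-01-01 UTC), but it substitutes CRC32 for Iceberg's 32-bit Murmur3 bucket hash, so the bucket numbers differ from what Iceberg would assign; only the bounded cardinality carries over. The one-event-per-second workload is hypothetical.

```python
import zlib
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def day_transform(ts):
    """Whole days since 1970-01-01, the value Iceberg's day transform produces."""
    return (ts - EPOCH) // timedelta(days=1)


def hour_transform(ts):
    """Whole hours since 1970-01-01 00:00 UTC, as produced by the hour transform."""
    return (ts - EPOCH) // timedelta(hours=1)


def bucket_stand_in(value, n):
    """Stand-in for bucket(n, col): Iceberg uses 32-bit Murmur3, not CRC32."""
    return zlib.crc32(value.encode("utf-8")) % n


# One day of events, one per second, from 10,000 distinct users
start = datetime(2024, 6, 1, tzinfo=timezone.utc)
events = [(start + timedelta(seconds=i), f"user-{i % 10_000}") for i in range(86_400)]

partitions = {
    "identity(event_time)": {ts for ts, _ in events},
    "day(event_time)": {day_transform(ts) for ts, _ in events},
    "hour(event_time)": {hour_transform(ts) for ts, _ in events},
    "identity(user_id)": {user for _, user in events},
    "bucket(16, user_id)": {bucket_stand_in(user, 16) for _, user in events},
}
for spec, values in partitions.items():
    print(f"{spec:<22} {len(values):>6} partitions")
```

Real event timestamps usually carry millisecond or microsecond precision, so an identity partition on them fares even worse than this second-resolution example.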

For detailed guidance with benchmarks, see Iceberg Partitioning Best Practices.

What streaming does to your Iceberg tables

Getting data from Kafka into Iceberg is the first half of the problem. The second half — the one that bites teams in production — is what happens to the table after months of continuous streaming writes.

Figure: Multiple engines and streaming jobs writing to the same Iceberg tables, each producing different file sizes and commit frequencies. Every small file, orphan, and retained snapshot inflates the storage bill independently.

Small files

A Flink job with 5-minute checkpoints writing to a table with 50 active partitions creates 14,400 files per day, assuming one file per partition per checkpoint. Each file contains only the records that arrived in that 5-minute window for that partition, typically 1–10 MB. Within a week, the table carries over 100,000 tiny files that every query must plan against. S3 GET costs multiply (S3 Standard charges $0.0004 per 1,000 GET requests regardless of object size), query planning stretches from milliseconds to seconds, and manifests bloat. For a detailed breakdown of the mechanics and fixes, see Fixing Small Files in Apache Iceberg.

Snapshot accumulation

Every streaming commit creates a new snapshot. A table with 5-minute commits accumulates over 8,600 snapshots per month. Each snapshot pins references to data files at that point in time, preventing storage reclamation of superseded bytes. Without snapshot expiration, every byte ever committed remains billable in S3 indefinitely.

Orphan files

Failed checkpoints, aborted writes, and concurrent writer conflicts leave data files in object storage that are never referenced in any Iceberg snapshot. These orphan files are invisible to Iceberg metadata but billable by S3. On mature streaming lakes, orphan files routinely account for 25–40% of billable storage on affected prefixes.

Metadata growth

Frequent commits create many manifest files, at least one new manifest per commit. Unless manifests are merged (Iceberg can merge them on append commits, controlled by commit.manifest-merge.enabled, or you run rewrite_manifests), a streaming table with 5-minute commits over 30 days accumulates roughly 8,600 manifests. Query planning must traverse all of them. Without manifest consolidation, the metadata layer that enables Iceberg's fast scan planning becomes the bottleneck.

Delete file overhead (CDC tables)

CDC tables accumulate delete files proportional to their update frequency. A table receiving 10,000 updates per hour creates delete files on every commit. At read time, the engine must reconcile each delete file against the corresponding data files — a cost that grows linearly. Compaction merges delete files into the base data, restoring read performance to baseline. For a dedicated deep dive on compaction for streaming tables, see Kafka to Iceberg Compaction — Done Right.

Keeping streaming tables healthy

The structural problems above are not a reason to avoid streaming into Iceberg — they are a reason to pair ingestion with automated maintenance. The correct maintenance sequence for streaming tables is:

  1. Expire snapshots — remove old snapshots beyond the retention window (3–7 days for streaming tables), freeing metadata references and enabling garbage collection
  2. Remove orphan files — clean up unreferenced files from failed writes, with a safety window (7+ days) to avoid touching in-progress writes
  3. Compact data files — merge small files into optimal 256–512 MB targets with query-aware sort ordering
  4. Rewrite manifests — consolidate fragmented manifests against the final compacted file set (and, if you maintain Puffin column statistics, recompute them afterward)
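Step 3's bin-packing can be sketched as a first-fit-decreasing planner. This is a simplified stand-in, not Iceberg's rewrite_data_files implementation: it only groups file sizes into rewrite groups under a 512 MB target and ignores sort order, partition boundaries, and delete files. The names are hypothetical.

```python
MB = 1024 * 1024
TARGET_BYTES = 512 * MB  # upper end of the 256–512 MB range above


def plan_bin_pack(file_sizes, target=TARGET_BYTES):
    """Group file sizes into rewrite groups of at most `target` bytes (first-fit decreasing)."""
    groups = []  # each entry: [total_bytes, [sizes]]
    for size in sorted(file_sizes, reverse=True):
        for group in groups:
            if group[0] + size <= target:
                group[0] += size
                group[1].append(size)
                break
        else:  # no existing group has room: open a new one
            groups.append([size, [size]])
    return [sizes for _, sizes in groups]


small_files = [4 * MB] * 300  # e.g. 300 streaming files of 4 MB each
plan = plan_bin_pack(small_files)
print(len(small_files), "files ->", len(plan), "output files")
```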

The order matters. Expiring snapshots first allows orphan cleanup to identify more unreferenced files. Compacting after cleanup avoids rewriting files that are about to be deleted. Rewriting manifests last ensures the final metadata reflects the compacted file set. Running these as independent cron jobs produces conflicts and redundant work. For a detailed breakdown of the correct maintenance pipeline, see Iceberg Table Health & Maintenance.
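The selection logic for the first two steps can be sketched as follows. Both helpers are hypothetical and work on in-memory collections: `snapshots_to_expire` drops snapshots older than the retention window while always keeping the newest ones (loosely mirroring the older_than and retain_last arguments of Iceberg's expire_snapshots procedure), and `orphan_candidates` flags only unreferenced files older than the safety window, so files from in-progress writes are left alone.

```python
from datetime import datetime, timedelta


def snapshots_to_expire(snapshots, now, retention=timedelta(days=5), retain_last=1):
    """Snapshot ids older than the retention window, never including the newest N.

    snapshots: list of (snapshot_id, committed_at) pairs in any order.
    """
    ordered = sorted(snapshots, key=lambda s: s[1], reverse=True)
    protected = {sid for sid, _ in ordered[:retain_last]}
    cutoff = now - retention
    return {sid for sid, ts in ordered if ts < cutoff and sid not in protected}


def orphan_candidates(listed_files, referenced_files, now, safety=timedelta(days=7)):
    """Files present in storage, referenced by no snapshot, and older than the safety window."""
    cutoff = now - safety
    return {path for path, modified in listed_files.items()
            if path not in referenced_files and modified < cutoff}


now = datetime(2024, 6, 10)
snapshots = [(1, datetime(2024, 6, 1)), (2, datetime(2024, 6, 3)), (3, datetime(2024, 6, 9))]
expired = snapshots_to_expire(snapshots, now)

listed = {
    "a.parquet": datetime(2024, 6, 1),  # still referenced
    "b.parquet": datetime(2024, 6, 9),  # unreferenced but recent: maybe an in-progress write
    "c.parquet": datetime(2024, 6, 1),  # unreferenced and old: orphan
}
orphans = orphan_candidates(listed, referenced_files={"a.parquet"}, now=now)
```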

Figure: LakeOps compaction, merging hundreds of small files from streaming writes into properly sized, query-optimized file blocks to restore read performance and cut storage costs.

Why cron-based compaction fails for streaming tables

Nightly Spark compaction jobs — the standard approach for batch tables — do not work for streaming:

  • 23 hours of degradation — between nightly runs, queries pay the full small-file penalty. A streaming table can accumulate 100,000 new files before the next compaction window
  • Conflicts with active writers — running compaction on the same partition a streaming job is writing to triggers Iceberg's optimistic concurrency control: commits must retry, and a compaction that collides with new row-level deletes on the files it is rewriting fails validation and discards its work. The workaround is partition-aware compaction that only touches cold partitions, which requires custom scheduling logic
  • No priority ordering — all tables are treated equally. A critical customer-facing table with 200,000 small files waits in the same queue as a low-priority staging table

Streaming tables need maintenance that runs continuously, reacts to table health signals, and coordinates with active writers — not maintenance that fires on a fixed schedule and hopes for the best.

Autonomous maintenance with LakeOps

LakeOps is a dedicated lakehouse control plane for Apache Iceberg that connects to your existing catalogs and runs the full maintenance lifecycle autonomously — including on streaming tables receiving continuous Kafka ingestion.

LakeOps sits as a control plane between your catalogs, engines, and Iceberg storage — orchestrating maintenance across the entire multi-engine lakehouse. Streaming writes from Kafka flow into the same tables that Trino, Spark, and Athena query.

How it works for streaming tables

Once connected to your catalog (Glue, REST, Polaris, Nessie, S3 Tables), LakeOps discovers every table automatically and begins collecting structural telemetry — file counts, partition distributions, snapshot velocity, manifest depth, and delete file ratios.
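To gather comparable signals without a control plane, you can use Iceberg's metadata tables, which expose the raw inputs: the `files` table has `content` and `file_size_in_bytes` columns, and the `snapshots` table has `committed_at`. The sketch below assumes those rows have already been fetched (for example with Spark SQL) into plain dictionaries and datetimes. It does not reflect how LakeOps computes its telemetry, and it leaves out manifest depth.

```python
from datetime import datetime
from typing import Dict, Iterable, List

# Values of the `content` column in Iceberg's `files` metadata table.
DATA, POSITION_DELETES, EQUALITY_DELETES = 0, 1, 2
MIB = 1024 * 1024


def table_telemetry(file_rows: Iterable[Dict], snapshot_times: List[datetime]) -> Dict[str, float]:
    """Summarize structural health signals for one table.

    file_rows: rows with `content` and `file_size_in_bytes` (as in the `files` table)
    snapshot_times: `committed_at` values (as in the `snapshots` table)
    """
    data_sizes: List[int] = []
    delete_files = 0
    for row in file_rows:
        if row["content"] == DATA:
            data_sizes.append(row["file_size_in_bytes"])
        else:  # position or equality delete files
            delete_files += 1

    n_data = len(data_sizes)
    # Snapshot velocity: commits per hour across the observed window.
    if len(snapshot_times) >= 2:
        hours = (max(snapshot_times) - min(snapshot_times)).total_seconds() / 3600
        velocity = (len(snapshot_times) - 1) / hours if hours > 0 else 0.0
    else:
        velocity = 0.0

    return {
        "data_files": n_data,
        "avg_data_file_mb": sum(data_sizes) / n_data / MIB if n_data else 0.0,
        "delete_file_ratio": delete_files / n_data if n_data else 0.0,
        "snapshots_per_hour": velocity,
    }
```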

For streaming tables, this telemetry typically triggers:

  • Event-driven compaction — fires when file count or average file size crosses a threshold, not on a fixed schedule. A high-throughput streaming table may compact multiple times per hour; a low-volume table compacts once a day. The Rust-based compaction engine executes at $5/TB versus $50/TB for Spark, with 95% faster throughput
  • Query-aware sort — analyzes cross-engine query telemetry to determine which columns your workloads filter on. Compaction reorganizes data files by those columns so predicate pushdown skips entire file groups. This turns compaction from a pure file-merge operation into a query optimization
  • Coordinated maintenance — runs snapshot expiration → orphan cleanup → compaction → manifest rewrite as a sequenced pipeline. Each step's output feeds the next
  • Conflict-aware execution — compaction targets cold partitions while active streaming partitions continue receiving writes, avoiding optimistic concurrency conflicts
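As a rough illustration of query-aware sorting, the following sketch ranks columns by how many queries filter on them. It renders the result in the `sort_order` syntax that Iceberg's Spark `rewrite_data_files` procedure accepts with `strategy => 'sort'`. Frequency ranking is a deliberately simple stand-in chosen for this example, not LakeOps' method. It also ignores column cardinality, which matters in practice. When several filtered columns should carry equal weight, a `zorder(...)` sort order is the usual alternative.

```python
from collections import Counter
from typing import Iterable, List


def sort_order_from_filters(filter_columns_per_query: Iterable[List[str]], max_columns: int = 3) -> str:
    """Rank columns by how many queries filter on them and render the ranking
    in the sort_order syntax accepted by Iceberg's rewrite_data_files procedure."""
    counts: Counter = Counter()
    for columns in filter_columns_per_query:
        counts.update(set(columns))  # count a column at most once per query
    # Most frequently filtered first; ties broken alphabetically so output is stable.
    ranked = sorted(counts.items(), key=lambda kv: (-kv[1], kv[0]))[:max_columns]
    return ",".join(f"{column} ASC NULLS LAST" for column, _ in ranked)
```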

Policies at scale

When you operate dozens or hundreds of streaming tables, per-table configuration does not scale. LakeOps policies apply maintenance rules at the catalog or namespace level — every table in scope inherits the policy automatically, including new tables created after the policy is defined. Adaptive Maintenance bundles all four operations into a single data-driven policy that reacts to table activity without separate cron schedules.
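Scope-based inheritance of this kind can be sketched as a lookup that merges settings from the broadest scope to the narrowest. This is a generic illustration, not the LakeOps policy model. The policy keys and the single-level catalog/namespace/table hierarchy are assumptions.

```python
from typing import Any, Dict, Tuple

Scope = Tuple[str, ...]
Policy = Dict[str, Any]


def effective_policy(policies: Dict[Scope, Policy], catalog: str, namespace: str, table: str) -> Policy:
    """Merge policies from broadest to narrowest scope; narrower settings win.

    Resolution happens at lookup time, so a table created after a namespace
    policy was defined picks it up without any per-table configuration.
    """
    merged: Policy = {}
    for scope in [(catalog,), (catalog, namespace), (catalog, namespace, table)]:
        merged.update(policies.get(scope, {}))
    return merged
```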

Observability for streaming health

Every streaming table is continuously classified as Healthy, Warning, or Critical based on structural signals. Critical tables — the ones with 200,000 small files and query times that have tripled — surface immediately in the LakeOps dashboard. You see which tables need attention before your analysts report slow queries.
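A three-level classification like this can be sketched as a worst-signal-wins rule over a few structural metrics. The thresholds below are hypothetical placeholders rather than LakeOps' values. `query_time_ratio` compares current query time with a baseline, so a value of 3.0 corresponds to tripled query times.

```python
from enum import Enum
from typing import Dict, Tuple


class Health(Enum):
    HEALTHY = "Healthy"
    WARNING = "Warning"
    CRITICAL = "Critical"


# Hypothetical (warning, critical) thresholds per signal; tune for your lake.
THRESHOLDS: Dict[str, Tuple[float, float]] = {
    "small_file_count": (10_000, 100_000),
    "delete_file_ratio": (0.2, 0.5),
    "query_time_ratio": (1.5, 3.0),  # current query time / baseline query time
}


def classify(metrics: Dict[str, float]) -> Health:
    """The worst signal determines the table's status; missing signals count as healthy."""
    status = Health.HEALTHY
    for name, (warning, critical) in THRESHOLDS.items():
        value = metrics.get(name, 0.0)
        if value >= critical:
            return Health.CRITICAL
        if value >= warning:
            status = Health.WARNING
    return status
```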

Lake-wide table health monitoring — every table classified by health status with record count, size, and last modified timestamp. Streaming tables with small-file accumulation surface as Critical immediately.
LakeOps in action — from catalog connection to autonomous optimization of streaming and batch Iceberg tables.
Lake-wide event log showing autonomous maintenance across streaming tables — raw_clickstream compacted from 970 files to 87, snapshots expired, orphans reclaimed. Every operation is logged with before/after metrics.

Production checklist

Before deploying a Kafka-to-Iceberg pipeline to production, validate each layer:

Catalog

  • Use a REST catalog (Polaris, Nessie, Lakekeeper, Gravitino) for multi-engine interoperability. REST catalogs support concurrent commits from multiple writers and provide centralized access control
  • If using AWS Glue, ensure the Glue table version and Iceberg library version are aligned — version mismatches are a common source of metadata errors
  • Register your catalog with LakeOps to get immediate visibility into table health across your streaming tables
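In Spark, registering a REST catalog comes down to a handful of configuration properties. The sketch assembles them as a dictionary. The catalog name, URI, and warehouse value are placeholders, and the meaning of `warehouse` varies by catalog implementation. It also assumes the Iceberg Spark runtime jar is already on the classpath.

```python
from typing import Dict


def iceberg_rest_catalog_conf(name: str, uri: str, warehouse: str) -> Dict[str, str]:
    """Spark properties that register an Iceberg REST catalog under `name`."""
    prefix = f"spark.sql.catalog.{name}"
    return {
        # Enables Iceberg SQL extensions such as CALL <catalog>.system.* procedures.
        "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
        prefix: "org.apache.iceberg.spark.SparkCatalog",
        f"{prefix}.type": "rest",
        f"{prefix}.uri": uri,
        f"{prefix}.warehouse": warehouse,
    }


if __name__ == "__main__":
    # Hypothetical endpoint and warehouse name; printed as spark-submit flags.
    conf = iceberg_rest_catalog_conf("lake", "https://catalog.example.internal", "streaming")
    for key, value in conf.items():
        print(f"--conf {key}={value}")
```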

Schema

  • Deploy Schema Registry with BACKWARD compatibility for Avro, BACKWARD_TRANSITIVE for Protobuf
  • Enable iceberg.tables.evolve-schema-enabled on Kafka Connect sinks
  • Test schema changes (add column, widen type) in a staging environment before rolling to production
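  • Never remove or rename columns on the producer side without coordinating with the Iceberg table schema. Schema Registry's BACKWARD mode will not catch this for you: it permits deleting fields, and an Avro rename registers as a delete plus an add, which passes whenever the new field has a default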
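Here is a sketch of the schema settings above, assuming the Apache Iceberg Kafka Connect sink and Confluent Schema Registry with Avro values. One function builds the connector payload for the Kafka Connect REST API. The other builds the `PUT /config/<subject>` request that sets a subject's compatibility level. Hostnames, topic, table, and connector name are hypothetical, and nothing is sent over the network.

```python
import json
import urllib.request
from typing import Any, Dict


def iceberg_sink_connector(topic: str, table: str, catalog_uri: str, registry_url: str) -> Dict[str, Any]:
    """Payload for POST /connectors on the Kafka Connect REST API."""
    return {
        "name": f"iceberg-sink-{topic}",
        "config": {
            "connector.class": "org.apache.iceberg.connect.IcebergSinkConnector",
            "topics": topic,
            "iceberg.tables": table,
            # Let the sink add new record fields to the Iceberg table schema.
            "iceberg.tables.evolve-schema-enabled": "true",
            "iceberg.catalog.type": "rest",
            "iceberg.catalog.uri": catalog_uri,
            "value.converter": "io.confluent.connect.avro.AvroConverter",
            "value.converter.schema.registry.url": registry_url,
        },
    }


def compatibility_request(registry_url: str, subject: str, level: str = "BACKWARD") -> urllib.request.Request:
    """Build (without sending) a Schema Registry PUT /config/<subject> request."""
    return urllib.request.Request(
        f"{registry_url}/config/{subject}",
        data=json.dumps({"compatibility": level}).encode("utf-8"),
        headers={"Content-Type": "application/vnd.schemaregistry.v1+json"},
        method="PUT",
    )


if __name__ == "__main__":
    registry = "http://schema-registry.example.internal:8081"  # hypothetical host
    payload = iceberg_sink_connector("orders", "sales.orders", "https://catalog.example.internal", registry)
    print(json.dumps(payload, indent=2))
    req = compatibility_request(registry, "orders-value")  # TopicNameStrategy subject name
    print(req.get_method(), req.full_url)  # send with urllib.request.urlopen(req)
```

For Protobuf subjects, pass `level="BACKWARD_TRANSITIVE"` instead.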

Partitioning

  • Start with day(timestamp) for most streaming tables. Evolve to hour(timestamp) only when volume exceeds 5 GB/hour per partition
  • Avoid high-cardinality identity partitions (user_id, device_id). Use bucket(N) transforms instead
  • For compound partition strategies, validate that the additional partition dimensions actually improve query pruning for your workload
  • See Iceberg Partitioning Best Practices for detailed guidance with benchmarks
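To see why `bucket(N)` keeps partition counts bounded, here is a pure-Python sketch of the bucket transform as the Iceberg spec defines it. It applies a 32-bit Murmur3 hash (x86 variant, seed 0) to the value's bytes (8-byte little-endian for int and long, UTF-8 for strings), masks the result to a non-negative integer, and takes it modulo N. It is written for illustration only. Engines ship their own implementations, and other types (decimal, date, UUID) are not covered here.

```python
import struct

MASK32 = 0xFFFFFFFF


def _rotl32(x: int, r: int) -> int:
    return ((x << r) | (x >> (32 - r))) & MASK32


def murmur3_x86_32(data: bytes, seed: int = 0) -> int:
    """32-bit Murmur3 (x86 variant), returned as an unsigned 32-bit integer."""
    c1, c2 = 0xCC9E2D51, 0x1B873593
    h = seed & MASK32
    n_blocks = len(data) // 4
    for i in range(n_blocks):  # body: 4-byte little-endian blocks
        k = int.from_bytes(data[4 * i:4 * i + 4], "little")
        k = _rotl32((k * c1) & MASK32, 15)
        h ^= (k * c2) & MASK32
        h = (_rotl32(h, 13) * 5 + 0xE6546B64) & MASK32
    tail = data[4 * n_blocks:]  # tail: remaining 0-3 bytes
    k = 0
    if len(tail) == 3:
        k ^= tail[2] << 16
    if len(tail) >= 2:
        k ^= tail[1] << 8
    if len(tail) >= 1:
        k ^= tail[0]
        k = _rotl32((k * c1) & MASK32, 15)
        h ^= (k * c2) & MASK32
    h ^= len(data)  # finalization mix
    h ^= h >> 16
    h = (h * 0x85EBCA6B) & MASK32
    h ^= h >> 13
    h = (h * 0xC2B2AE35) & MASK32
    h ^= h >> 16
    return h


def bucket_long(value: int, n: int) -> int:
    """bucket(N) for int/long columns: hash the 8-byte little-endian value."""
    return (murmur3_x86_32(struct.pack("<q", value)) & 0x7FFFFFFF) % n


def bucket_string(value: str, n: int) -> int:
    """bucket(N) for string columns: hash the UTF-8 bytes."""
    return (murmur3_x86_32(value.encode("utf-8")) & 0x7FFFFFFF) % n
```

No matter how many distinct `user_id` values arrive, `bucket(16, user_id)` can only produce 16 partition values, while an identity partition would grow with every new user.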

Writer tuning

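  • Set checkpoint/commit intervals to 5 minutes for analytical workloads. Sub-minute freshness is rarely necessary and multiplies maintenance burden. Every commit adds a snapshot and a new batch of small files, so a 30-second interval produces 10 times as many commits as a 5-minute interval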
  • Use hash write distribution mode in Flink to align records with partitions before writing
  • Set write.target-file-size-bytes to 536870912 (512 MB) for analytical tables, 268435456 (256 MB) for BI/low-latency tables
  • For Spark, set spark.sql.shuffle.partitions to match your actual partition count or enable AQE
  • For CDC tables, use format version 3 with merge-on-read mode and deletion vectors enabled
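The table-level settings above can be applied with Spark SQL's `ALTER TABLE ... SET TBLPROPERTIES`, and the sketch below generates that statement. The workload labels are assumptions. Format v3 requires a recent Iceberg release and engines that can read v3 tables. In v3, merge-on-read position deletes are written as deletion vectors, so the sketch sets no separate flag for them; check your engine's v3 support before relying on this.

```python
from typing import Dict

TARGET_FILE_SIZE = {"analytical": 536870912, "bi": 268435456}  # 512 MiB, 256 MiB


def writer_tuning_ddl(table: str, workload: str, cdc: bool = False) -> str:
    """Build an ALTER TABLE statement with the writer settings from the checklist."""
    props: Dict[str, str] = {
        "write.target-file-size-bytes": str(TARGET_FILE_SIZE[workload]),
        # Cluster rows by partition before writing so each task writes fewer, larger files.
        "write.distribution-mode": "hash",
    }
    if cdc:
        props.update({
            # Upgrading format-version is one-way; confirm every engine can read v3 first.
            "format-version": "3",
            "write.delete.mode": "merge-on-read",
            "write.update.mode": "merge-on-read",
            "write.merge.mode": "merge-on-read",
        })
    assignments = ", ".join(f"'{key}' = '{value}'" for key, value in props.items())
    return f"ALTER TABLE {table} SET TBLPROPERTIES ({assignments})"
```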

Maintenance

  • Never rely on nightly Spark cron jobs for streaming tables. Implement continuous, health-driven maintenance
  • Snapshot retention: 3–7 days for streaming tables (balancing time travel needs against metadata growth)
  • Orphan cleanup: run after snapshot expiration with a safety window (7+ days) to avoid touching in-progress writes
  • Manifest rewrite: consolidate after compaction, not before
  • Use LakeOps Adaptive Maintenance to automate the full lifecycle across all streaming tables without per-table configuration

Monitoring

  • Track file count per partition over time — the leading indicator of small-file accumulation
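  • Monitor snapshot count against the ceiling your retention window implies. At one commit every 5 minutes, a table gains 288 snapshots per day, so 3 days of retention should hold roughly 864 snapshots and 7 days roughly 2,016. A count well above that ceiling means expiration is not running or not aggressive enough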
  • Watch S3 GET request costs — a sudden increase often indicates small-file proliferation before query latency degrades visibly
  • Track delete file count on CDC tables — rising delete-to-data ratios signal that compaction is not keeping pace with updates
  • Monitor consumer lag on the Kafka side — if the ingestion pipeline falls behind, backlog processing can create bursts of small files during recovery
  • Set up alerts on query planning time — when planning exceeds 5 seconds on a table that previously planned in milliseconds, small files or manifest bloat is the likely cause
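Three of these checks can be sketched as a function over metrics you already collect. The metric names, the 20% slack, and the 0.3 delete-ratio threshold are assumptions; the 5-second planning threshold comes from the list above. The snapshot check derives its ceiling from commit interval and retention instead of using a fixed number: a single committer on a 5-minute interval produces 288 snapshots per day. Consumer lag and S3 request costs are left out because they come from Kafka and cloud billing rather than table metadata.

```python
from typing import Dict, List


def expected_snapshot_ceiling(commit_interval_s: int, retention_days: int, committers: int = 1) -> int:
    """Snapshots a table should hold if expiration keeps up: commits per day x retention."""
    return (86_400 // commit_interval_s) * committers * retention_days


def monitoring_alerts(
    m: Dict[str, float],
    commit_interval_s: int = 300,
    retention_days: int = 3,
    slack: float = 1.2,  # hypothetical tolerance before alerting
    max_delete_ratio: float = 0.3,  # hypothetical
) -> List[str]:
    """Return human-readable alerts for snapshot growth, planning time, and delete buildup."""
    alerts: List[str] = []
    ceiling = expected_snapshot_ceiling(commit_interval_s, retention_days)
    if m["snapshot_count"] > ceiling * slack:
        alerts.append(f"snapshot_count {m['snapshot_count']:.0f} > expected ~{ceiling}: check expiration")
    if m["planning_time_s"] > 5.0:
        alerts.append("query planning > 5 s: check small files and manifest bloat")
    if m["data_file_count"] and m["delete_file_count"] / m["data_file_count"] > max_delete_ratio:
        alerts.append("delete-to-data ratio rising: compaction is not keeping pace")
    return alerts
```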

Summary

Streaming from Kafka to Iceberg is a solved problem at the ingestion layer. Kafka Connect handles simple topic-to-table copies with multi-table fan-out and automatic schema evolution. Flink handles complex transformations and CDC with exactly-once guarantees. Spark Structured Streaming provides a familiar micro-batch path for teams already in the Spark ecosystem. Debezium captures database changes and feeds them through Flink or Spark for continuous replication.

The unsolved problem — for most teams — is what happens after ingestion. Streaming writes create small files, snapshots, orphans, and metadata growth that degrade query performance and inflate storage costs continuously. Manual maintenance does not scale; nightly cron jobs leave 23 hours of degradation between runs.

The production-ready architecture pairs streaming ingestion with autonomous maintenance: data flows from Kafka through your ingestion engine of choice, and a dedicated control plane like LakeOps keeps every table compacted, lean, and queryable — continuously, without scripts, cron jobs, or 2 AM pages.

How streaming ingestion inflates storage and compute costs — and how autonomous maintenance recovers them.
