Streaming Architecture — Flink + Kafka deep-dive.
The round when they say "design a real-time pipeline for X". Event-time, watermarks, exactly-once, backpressure, checkpointing, state backends, schema evolution. Eight sections, every concept reducible to a one-paragraph "what it is + when it bites".
Contents
- The streaming opening — Kafka as the spine, Flink as the engine
- Event-time vs ingest-time vs processing-time
- Watermarks — the late-event contract
- Exactly-once — the 4-link end-to-end chain
- Backpressure, checkpointing & state backends
- Schema evolution in a streaming world
- The 8-step play-event walkthrough (Netflix-flavored)
- The 90-second articulation script
Kafka as the spine, Flink as the engine.
Most modern streaming designs are some flavor of: events into Kafka, transformations in Flink, sink to a serving store. Every interview prompt — fraud detection, real-time recommendations, IoT telemetry, click-stream attribution — fits this skeleton. The contents change; the shape doesn't.
Why Kafka is non-negotiable for the spine
| Property | Why it matters for streaming |
|---|---|
| Durable + ordered | Replay any past window from disk; order preserved within partition |
| Partitioned by key | Same key always lands on same partition → stateful operators stay local |
| Decoupled producer / consumer | Consumers can lag, restart, scale independently of producers |
| Tiered / infinite retention | Replay last 90 days for backfill; Kappa architecture relies on this |
| Cross-language clients | Java/Python/Go/Node producers all write the same wire format |
Why Flink usually beats the alternatives
| | Flink | Spark Structured Streaming | Kafka Streams |
|---|---|---|---|
| Latency | True per-event, sub-100ms | Micro-batch (typically 1-30s) | Per-event, sub-100ms |
| State semantics | Exactly-once with savepoints | Exactly-once via WAL | Exactly-once within Kafka |
| Event-time + watermarks | First-class, mature | Supported, less flexible | Yes |
| Pick when | Latency-critical + complex state | Same team owns batch ETL + streaming | Lightweight, Kafka-only, no extra cluster |
The five questions to ask BEFORE drawing
- What's the latency target? Sub-second changes the architecture; minutes is just batch with smaller windows.
- What's the event volume? 10K/sec vs 10M/sec is a different system entirely.
- Is state involved? Stateless filters are easy; stateful joins / windowed aggregations need RocksDB + checkpoints.
- What's the ordering requirement? Per-key order is cheap (partition key); global order is expensive.
- How do you replay if the job crashes? Kafka offsets + Flink savepoints define the answer.
Event-time vs ingest-time vs processing-time.
Every event has at least three timestamps. Knowing which one to use is the most-asked streaming question after "what's a watermark".
| Time | What it measures | Set by | Use when |
|---|---|---|---|
| Event-time | When the event actually happened in the real world | The producing client | Almost always — it's the truth |
| Ingest-time | When the event entered Kafka / the streaming engine | The broker / engine | When client clocks aren't trustworthy; audit trails |
| Processing-time | When the operator processed the event | The Flink task manager wall clock | Wallclock-driven side effects (rate limiters); never for analytics |
Why event-time is almost always right
Imagine a click on a banner ad at 3:00:00 PM. The phone is offline and only uploads the event at 3:30:00 PM. Three timestamps:
- Event-time: 3:00:00 (when the click happened)
- Ingest-time: 3:30:00 (when Kafka received it)
- Processing-time: 3:30:01 (when Flink processed it)
The "click happened at 3pm" is the truth users care about; the other two are infrastructure timestamps. Always partition / window by event-time.
The store-all-three discipline
// Every event row should carry all three timestamps:
{
"user_id": "U_001",
"event": "click",
"event_ts": "2024-09-12T15:00:00Z", // client-set
"ingest_ts": "2024-09-12T15:30:00Z", // Kafka-set
"processing_ts":"2024-09-12T15:30:01Z" // Flink-set
}
Partition / window by event_ts. Alert / debug on the processing_ts − event_ts latency. Audit by ingest_ts.
When event-time goes wrong
| Failure mode | Mitigation |
|---|---|
| Client clock skew (mobile clocks drift) | Bound event_ts to ±N minutes from ingest_ts; reject implausible |
| Replay attack (forged old timestamps) | Sign event_ts at trusted edge; reject if signature mismatch |
| No client clock (some IoT devices) | Use ingest_ts; document explicitly; downstream knows it's lossy |
| Daylight Saving / TZ confusion | Store UTC always; never local TZ in event timestamps |
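The first row — bound event_ts against ingest_ts — is cheap to enforce at the edge of the pipeline. A minimal sketch; the ±15-minute bound and the EventTimeGuard / isPlausible names are illustrative, not from any library:
// Sketch — bounding event_ts against ingest_ts to defuse client clock skew
import java.time.Duration;
import java.time.Instant;

public final class EventTimeGuard {
    // Assumption: ±15 minutes is the tolerated client-clock skew; tune per device fleet
    private static final Duration MAX_SKEW = Duration.ofMinutes(15);

    /** True if event_ts sits within MAX_SKEW of ingest_ts. */
    public static boolean isPlausible(Instant eventTs, Instant ingestTs) {
        return Duration.between(eventTs, ingestTs).abs().compareTo(MAX_SKEW) <= 0;
    }

    /** Clamp instead of reject: fall back to ingest_ts when the client clock is implausible. */
    public static Instant effectiveEventTs(Instant eventTs, Instant ingestTs) {
        return isPlausible(eventTs, ingestTs) ? eventTs : ingestTs;
    }
}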
Watermarks — the late-event contract.
A watermark is a timestamp the engine commits to: "I've waited long enough; I do not expect any event with event-time earlier than this watermark to arrive from now on." It's the contract that lets windowed aggregations close.
The mental model
Picture the watermark as a line trailing behind the maximum event-time the job has seen, offset by the out-of-orderness bound. Windows whose end falls behind the line are allowed to close; anything that arrives behind the line is, by definition, late.
Watermark generation strategies
| Strategy | How it picks the watermark | Pick when |
|---|---|---|
| Bounded out-of-orderness | watermark = max_seen_event_time − N seconds | Default; pick N as the 99th-percentile lateness |
| Punctuated | Producer emits explicit watermark messages | You control the producer + bounded sources |
| Idle-source aware | If a partition has no events, advance watermark anyway after timeout | Prevents stalled partitions from blocking the whole job |
// Flink — bounded-out-of-orderness watermark, 10s tolerance
WatermarkStrategy
.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(10))
.withTimestampAssigner((event, timestamp) -> event.getEventTs())
.withIdleness(Duration.ofMinutes(1)); // idle-source escape
What happens when an event arrives AFTER its watermark
The event is late. Three options:
- Drop — the simplest; you lose the event. Default in many engines.
- Side-output — route late events to a separate stream for offline processing / alerting.
- Allowed lateness + update — keep the window state for an extra grace period; emit an UPDATE result when a late event arrives.
// Flink — windowed aggregation with allowed lateness + late-event side output
SingleOutputStreamOperator<Result> results = events
.keyBy(Event::getUserId)
.window(TumblingEventTimeWindows.of(Time.minutes(5)))
.allowedLateness(Time.minutes(2)) // grace period
.sideOutputLateData(LATE_EVENTS_TAG) // side output the late ones
.aggregate(new MyAggregator());
DataStream<Event> lateEvents = results.getSideOutput(LATE_EVENTS_TAG);
The trade-off
| Tighter watermark (small lag) | Looser watermark (big lag) |
|---|---|
| Lower output latency | Higher output latency |
| More late events dropped | Fewer late events |
| Risk: emitting incorrect early results | Risk: nobody trusts the freshness |
The right answer is empirical: measure the 99th-percentile event-arrival lag for your pipeline; pick watermark lag = P99 + safety margin. Re-tune quarterly.
Say it in the interview like this: "Our P99 processing_ts − event_ts lag is 6 seconds, so I'd set the watermark lag to 10 seconds. We side-output anything beyond that and run a nightly batch reconciliation against the side-output topic."
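That tuning rule is just a percentile over observed lag. A plain-Java sketch, assuming you've sampled processing_ts − event_ts (in milliseconds) from production; the class and method names are illustrative:
// Sketch — watermark lag = P99(processing_ts − event_ts) + safety margin
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public final class WatermarkLagTuner {
    /** latenessMillis: sampled (processing_ts − event_ts) values, e.g. one day of production events. */
    public static long suggestLagMillis(List<Long> latenessMillis, long safetyMarginMillis) {
        if (latenessMillis.isEmpty()) {
            return safetyMarginMillis;                              // no data — fall back to the margin alone
        }
        List<Long> sorted = new ArrayList<>(latenessMillis);
        Collections.sort(sorted);
        int p99Index = (int) Math.ceil(0.99 * sorted.size()) - 1;   // index of the 99th percentile
        return sorted.get(Math.max(p99Index, 0)) + safetyMarginMillis;
    }
}
// Example: P99 lateness ≈ 6s, margin 4s → suggestLagMillis(samples, 4_000) ≈ 10_000 ms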
Exactly-once — the 4-link end-to-end chain.
"Exactly-once" is not a feature you turn on. It's a chain of four guarantees, and the chain is only as strong as the weakest link. If your sink isn't idempotent, the producer / consumer / engine-level guarantees don't matter.
The four links
| Link | Mechanism | Examples |
|---|---|---|
| 1. Idempotent producer | Producer ID + sequence number; broker dedupes retries | Kafka enable.idempotence=true |
| 2. Idempotent consumer | Commit offset + output in one transaction | Kafka transactions; Flink's two-phase commit |
| 3. Idempotent sink | Natural-key MERGE or unique constraint on source_event_id | Postgres ON CONFLICT, Snowflake MERGE, Iceberg upsert |
| 4. Replay-safe state | Checkpointed state restored on restart | Flink savepoints, Kafka Streams state stores |
Producer-side idempotency (link 1)
# Kafka producer config
enable.idempotence = true
acks = all
max.in.flight.requests.per.connection = 5   # must be ≤ 5 with idempotence enabled
retries = 2147483647                        # Integer.MAX_VALUE — safe to retry forever with idempotence on
# What this gives you:
# - Producer attaches (producer_id, sequence_number) to every batch
# - Broker rejects duplicates with the same (PID, seq) pair
# - Retries are safe — no double-write
Consumer + engine atomicity (links 2 + 4)
Flink's two-phase commit barrier:
- Periodic checkpoint barrier flows through the operator graph alongside data.
- Each operator snapshots its state (RocksDB → S3) when the barrier arrives.
- Sink operator pre-commits — the next batch of output is staged but not visible to consumers.
- When all operators acknowledge, the JobManager commits — pre-committed sink batches become visible.
- Crash before commit → restart from last successful checkpoint, replay from Kafka offset stored in checkpoint.
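On the Flink side, that barrier protocol is switched on through the checkpoint config. A minimal sketch — the interval, timeout, and bucket path are illustrative values, not recommendations:
// Sketch — enable barrier-based, exactly-once checkpointing
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.enableCheckpointing(30_000L, CheckpointingMode.EXACTLY_ONCE); // barriers every 30s, 2PC semantics
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(10_000L); // breathing room between checkpoints
env.getCheckpointConfig().setCheckpointTimeout(600_000L);         // fail a checkpoint after 10 minutes
env.getCheckpointConfig().setCheckpointStorage("s3://my-bucket/flink-checkpoints"); // illustrative path
// Offsets + operator state live in the checkpoint; a transactional sink commits only when it completes.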
Sink-side idempotency (link 3) — the hardest link
| Sink type | Idempotency strategy |
|---|---|
| Postgres / MySQL | INSERT ... ON CONFLICT (source_event_id) DO UPDATE |
| Snowflake / BigQuery | MERGE on natural key with WHEN MATCHED AND ts > existing |
| Iceberg / Delta | Upsert by primary key; engine handles tombstone-and-rewrite |
| Kafka (sink-as-source for next stage) | Kafka transactions — pre-commit + commit atomic with offset commit |
| HTTP API (downstream service) | Pass idempotency_key header; downstream MUST honor |
| S3 / object store | Write to {key}.tmp, atomic rename; or use Iceberg manifest |
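For the relational rows in this table, "idempotent" means a unique constraint on source_event_id plus an upsert, so a replayed event rewrites the same row. A hedged sketch against Postgres — the table and column names are illustrative:
// Sketch — idempotent Postgres sink write keyed on source_event_id
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Timestamp;

public final class UpsertWriter {
    private static final String UPSERT_SQL =
        "INSERT INTO fct_play_event (source_event_id, user_id, event_type, event_ts) " +
        "VALUES (?, ?, ?, ?) " +
        "ON CONFLICT (source_event_id) DO UPDATE SET " +
        "user_id = EXCLUDED.user_id, event_type = EXCLUDED.event_type, event_ts = EXCLUDED.event_ts";

    /** Replaying the same event_id rewrites the same row, so a retry is a no-op. */
    public static void write(Connection conn, String eventId, String userId,
                             String eventType, Timestamp eventTs) throws SQLException {
        try (PreparedStatement ps = conn.prepareStatement(UPSERT_SQL)) {
            ps.setString(1, eventId);
            ps.setString(2, userId);
            ps.setString(3, eventType);
            ps.setTimestamp(4, eventTs);
            ps.executeUpdate();
        }
    }
}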
When at-least-once is OK
Exactly-once is expensive (latency cost of two-phase commit, throughput cost of transactions). Skip it when:
- Downstream is itself idempotent — a distinct count keyed on event_id absorbs duplicates naturally, whereas a raw SUM of increments does not (dedup sketch after this list).
- The metric is approximate anyway (DAU, click-through-rate — small dup counts wash out).
- You can run a daily reconciliation batch that fixes any dup damage.
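The dedup from the first bullet is a keyed filter over TTL'd state. A minimal sketch, assuming the stream is keyed by event_id and 24 hours is an acceptable dedup window (Event is the pipeline's own type):
// Sketch — drop duplicates by event_id; state expires after 24h to bound its size
import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;

public class DedupByEventId extends RichFilterFunction<Event> {
    private transient ValueState<Boolean> seen;

    @Override
    public void open(Configuration parameters) {
        ValueStateDescriptor<Boolean> desc = new ValueStateDescriptor<>("seen", Boolean.class);
        desc.enableTimeToLive(StateTtlConfig.newBuilder(Time.hours(24)).build()); // dedup window
        seen = getRuntimeContext().getState(desc);
    }

    @Override
    public boolean filter(Event e) throws Exception {
        if (Boolean.TRUE.equals(seen.value())) {
            return false;          // duplicate — drop
        }
        seen.update(true);         // first sighting of this event_id
        return true;
    }
}
// Usage: events.keyBy(Event::getEventId).filter(new DedupByEventId())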
Backpressure, checkpointing & state backends.
The interview moves from semantics to operations. "What happens when the job lags?", "How do you handle 200GB of state?" — these are the questions that separate someone who's read the docs from someone who's run Flink in production.
Backpressure — what it is
Backpressure = downstream operator can't keep up with upstream's rate. Flink's transport layer signals upstream to slow down. Symptoms:
- Kafka consumer lag rising — you're consuming slower than producing.
- Operator CPU pinned at 100%, but throughput flat.
- End-to-end event latency growing without bound.
Backpressure causes — the four classics
| Cause | Detection signal | Mitigation |
|---|---|---|
| Skewed key (one user / one product gets 50% of events) | One subtask CPU pinned; others idle | Salt the key for the heavy aggregation; pre-aggregate |
| Slow external call (sync HTTP / RPC in operator) | Operator latency = downstream service latency | Async I/O with bounded queue; cache; circuit-break |
| State grew too big | Checkpoint duration creeping up | Reduce window size; TTL on state; compaction tuning |
| Sink bottleneck | Sink operator is the lagging one in metrics | Batch writes; partition by sink-friendly key; provision sink |
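The skew row's fix — salt the hot key, pre-aggregate, then merge — is a two-stage job. A rough sketch; SaltedEvent, PartialCount, CountAggregator and the 16-bucket salt are hypothetical, not part of any existing pipeline:
// Sketch — two-stage aggregation over a salted key to break up a hot key
int SALT_BUCKETS = 16;

DataStream<PartialCount> partials = events
    .map(e -> new SaltedEvent(e.getUserId() + "#" + ThreadLocalRandom.current().nextInt(SALT_BUCKETS), e))
    .keyBy(SaltedEvent::getSaltedKey)                        // load now spreads across 16 subtasks
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .aggregate(new CountAggregator());                       // stage 1: partial counts per (key, salt)

DataStream<PartialCount> totals = partials
    .keyBy(PartialCount::getOriginalKey)                     // stage 2: merge partials per original key
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .reduce((a, b) -> a.merge(b));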
Async I/O — the #1 fix for slow external calls
// BAD — synchronous; one slow call blocks the whole subtask
DataStream<Enriched> enriched = events.map(event -> {
Profile profile = http.get("/profile/" + event.userId); // blocks!
return enrich(event, profile);
});
// GOOD — async; many in-flight requests, bounded by capacity
DataStream<Enriched> enriched = AsyncDataStream.unorderedWait(
events,
new ProfileLookup(),
/*timeout*/ 1, TimeUnit.SECONDS,
/*capacity*/ 100 // max in-flight async requests
);
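The snippet leaves ProfileLookup undefined; one plausible shape is a RichAsyncFunction over Java's non-blocking HttpClient. A sketch — the endpoint URL and the Enriched.from helper are hypothetical:
// Sketch — a possible ProfileLookup for the AsyncDataStream call above
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.Collections;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

public class ProfileLookup extends RichAsyncFunction<Event, Enriched> {
    private transient HttpClient http;

    @Override
    public void open(Configuration parameters) {
        http = HttpClient.newHttpClient();      // non-blocking client, shared by this subtask
    }

    @Override
    public void asyncInvoke(Event event, ResultFuture<Enriched> resultFuture) {
        HttpRequest request = HttpRequest.newBuilder()
            .uri(URI.create("https://profile-service.internal/profile/" + event.getUserId())) // hypothetical endpoint
            .build();
        http.sendAsync(request, HttpResponse.BodyHandlers.ofString())
            .thenAccept(resp -> resultFuture.complete(
                Collections.singleton(Enriched.from(event, resp.body()))));
    }

    @Override
    public void timeout(Event event, ResultFuture<Enriched> resultFuture) {
        // On timeout, emit the event un-enriched instead of failing the whole job
        resultFuture.complete(Collections.singleton(Enriched.from(event, null)));
    }
}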
Checkpointing — what to know
| Aspect | Default | Tuning |
|---|---|---|
| Interval | 10s-60s typical | Smaller = lower restart-replay window; bigger = less overhead |
| Mode | EXACTLY_ONCE | Switch to AT_LEAST_ONCE if exactly-once latency cost too high |
| Storage | S3 / GCS / HDFS | Use incremental checkpoints (RocksDB) for > 10GB state |
| Timeout | 10 minutes | If checkpoints exceed timeout repeatedly, state is too big OR aligned-checkpoint barriers stuck |
Aligned vs unaligned checkpoints
| Mode | Behavior | Pick when |
|---|---|---|
| Aligned (default) | Operator waits for barrier on ALL input channels before snapshotting | Stable, no backpressure — typical case |
| Unaligned | Snapshot in-flight buffers immediately; faster under backpressure | Backpressure regular; checkpoint duration matters more than overhead |
State backends — the three flavors
| Backend | Storage | Pick when |
|---|---|---|
| HashMapStateBackend | JVM heap | Small state (< 1GB); lowest latency |
| EmbeddedRocksDBStateBackend | Local disk via RocksDB | Large state (10GB-1TB+); spills past heap |
| RocksDB + incremental checkpoints | RocksDB SSTables uploaded to S3 incrementally | Production default for stateful jobs at scale |
The "checkpoint storm" anti-pattern
checkpoint_duration_ms / checkpoint_interval_ms > 0.5 means you're spending more than half of wall-clock time checkpointing. Fix with incremental checkpoints, smaller state, or async/unaligned checkpoints.
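A hedged sketch of those fixes in one job config — incremental RocksDB checkpoints plus unaligned checkpoints, with a relaxed interval. Numbers are illustrative; EmbeddedRocksDBStateBackend ships in the flink-statebackend-rocksdb dependency:
// Sketch — checkpoint-storm mitigations
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.setStateBackend(new EmbeddedRocksDBStateBackend(true));       // true = incremental checkpoints
env.getCheckpointConfig().enableUnalignedCheckpoints();           // barriers overtake in-flight buffers under backpressure
env.enableCheckpointing(60_000L);                                 // relax the interval
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30_000L); // keep duration / interval well under 0.5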
Schema evolution in a streaming world.
Schema changes break streaming jobs catastrophically. A producer adds a field; the consumer's deserializer fails; the entire pipeline stalls. The schema registry pattern (covered in the System Design page) is how you survive change.
The four schema-evolution scenarios
| Change | Backward-compatible? | How to ship safely |
|---|---|---|
| Add optional field with default | Yes | Just deploy producer; consumers ignore unknown fields |
| Add required field | No | Deploy as optional first; backfill; THEN switch consumers; THEN make required |
| Remove field | No | Deprecation flag → wait for consumer migration → drop |
| Rename / change type | Never compatible | Add new field; dual-write; migrate consumers; drop old field — over weeks |
The dual-write deprecation pattern
Rename username → display_name:
- Phase 1 — Add new field: producer writes BOTH username and display_name with the same value.
- Phase 2 — Migrate consumers: each consumer team switches to read display_name; mark username as deprecated in the schema.
- Phase 3 — Drop the old field: after the deprecation window (typically 90 days), the producer stops writing username; the schema removes it.
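Phase 1 in code is just the producer populating both fields with the same value. A tiny sketch with a hypothetical UserEvent builder and a pre-configured KafkaProducer:
// Sketch — dual-write during the rename window (UserEvent and producer are hypothetical)
UserEvent event = UserEvent.newBuilder()
    .setUserId("U_001")
    .setUsername(displayName)        // old field — still written until the deprecation window ends
    .setDisplayName(displayName)     // new field — consumers migrate to this one
    .build();
producer.send(new ProducerRecord<>("user_events", event.getUserId(), event));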
Stream-Table joins with evolving dimensions
If your stream joins to a slowly-changing dim table, schema changes on the dim ripple through:
// Flink SQL — streaming fact joined to versioned dim (event-time temporal join)
SELECT
e.user_id,
e.event_ts,
d.tier,
d.region
FROM events e
JOIN dim_user FOR SYSTEM_TIME AS OF e.event_ts AS d
ON e.user_id = d.user_id;
-- Joins against the dim VERSION current at event time, not now.
-- Evolves cleanly under schema changes, as long as new dim fields have defaults.
CDC streams — schema evolution at the source
Most streaming pipelines today have CDC (Debezium → Kafka) as a source. The OLTP database adds a column → Debezium emits the change with the new schema. Three things to set up:
- Schema registry per CDC topic with backward compatibility
- Tolerant Flink deserializer that skips unknown fields rather than crashing (sketch after this list)
- Alert on schema-version mismatch — when consumer's expected version drifts > 1 from latest, page the team
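A minimal sketch of that tolerant deserializer — Jackson configured to ignore unknown fields, wrapped in Flink's AbstractDeserializationSchema. Event is the pipeline's own type; returning null for unparseable records (to be filtered and counted downstream) is an assumption, not the only option:
// Sketch — deserializer that survives upstream schema additions
import java.io.IOException;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;

public class TolerantEventDeserializer extends AbstractDeserializationSchema<Event> {
    private transient ObjectMapper mapper;

    @Override
    public Event deserialize(byte[] message) throws IOException {
        if (mapper == null) {
            mapper = new ObjectMapper()
                .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); // new columns are skipped
        }
        try {
            return mapper.readValue(message, Event.class);
        } catch (IOException e) {
            return null;   // malformed record — filter nulls downstream and alert on the rate
        }
    }
}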
Say it like this: "The FOR SYSTEM_TIME AS OF temporal join lets dim evolution stay invisible to fact streams, as long as new fields have defaults."
The 8-step play-event walkthrough (Netflix-flavored).
Putting it all together — every concept above shows up in a single canonical walkthrough. The interview prompt: "design Netflix's playback-event pipeline." The 8-step answer:
Step 1 — Producer: client SDK emits event
// On every play-start, pause, resume, complete:
{
"event_id": "evt_8f3a...", // client-generated UUID — idempotency key
"user_id": "U_001",
"profile_id": "P_005",
"title_id": "T_strangerthings_S5E1",
"event_type": "play_start",
"event_ts": "2024-09-12T19:00:00Z", // client clock
"device": "tv",
"country": "US"
}
The event_id is the idempotency key — every retry uses the same id; the broker dedupes.
Step 2 — Kafka: durable spine
Topic play_events, partitioned by profile_id (so one profile's events stay in order), 30 partitions, 7-day retention (shorter than typical because we Iceberg-archive everything).
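On the producer side that means idempotence switched on and profile_id used as the record key, so one profile's events land on one partition. A sketch — the broker address and the playEventJson variable are illustrative:
// Sketch — producing to play_events keyed by profile_id
Properties props = new Properties();
props.put("bootstrap.servers", "kafka:9092");   // illustrative
props.put("enable.idempotence", "true");
props.put("acks", "all");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

KafkaProducer<String, String> producer = new KafkaProducer<>(props);
String profileId = "P_005";                     // the partition key — preserves per-profile order
producer.send(new ProducerRecord<>("play_events", profileId, playEventJson));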
Step 3 — Flink job: enrich + sessionize
// Pseudo-Flink:
events
.keyBy(Event::getProfileId)
.map(new ProfileEnrichment()) // dim_profile lookup (AsyncDataStream.unorderedWait in the real job)
.process(new SessionAssigner()) // 30-min idle → new session_id
.keyBy(e -> e.userId + "|" + e.titleId)
.window(EventTimeSessionWindows.withGap(Time.minutes(30)))
.aggregate(new SessionMetrics()) // total watch_minutes per session
.addSink(new IcebergSink()); // exactly-once via 2PC + Iceberg
Step 4 — Watermarks: 60-second lateness budget
Mobile uploads can lag (subway tunnel = 2-minute spike). Set watermark = max_event_ts − 60s. P99 lateness in production is 8s, so 60s is comfortable. Side-output anything later for the daily reconciliation job.
Step 5 — State: per-profile session window
State backend is RocksDB with 30-second incremental checkpoints to S3. Session state is small (a few hundred bytes per profile) but multiplied by 250M profiles = 50GB working set. RocksDB handles this comfortably; HashMap would OOM.
Step 6 — Sink: Iceberg upsert by event_id
Iceberg fct_play_event table partitioned by DATE(event_ts). Sink writes use Iceberg's two-phase commit; uniqueness on event_id means retries are no-ops. Downstream Flink re-streams and dbt batch jobs both read from this Iceberg table.
Step 7 — Serving: low-latency analytical store
Per-title rolling counts (last 1h, 24h, 7d) shipped to Pinot for sub-100ms queries. The TopN-by-country dashboard reads Pinot directly; the historical reports read the Iceberg table via Spark or Trino.
Step 8 — Operations: the 4 SLOs
| SLO | Target | Alert at |
|---|---|---|
| End-to-end latency (event_ts → Iceberg) | P99 < 90 sec | P99 > 5 min for 10 min |
| Kafka consumer lag | < 60s | > 5 min |
| Checkpoint duration | < 30s | > 5 min |
| Late-event rate (after watermark) | < 0.5% | > 2% |
The 90-second articulation script.
▸ THE 90-SECOND SCRIPT
"For a streaming pipeline with [latency target] and [event volume], I'd shape it as five layers: producer → Kafka → Flink → sink → serving, with schema registry and checkpoint store as cross-cutting infrastructure."
"Time semantics: partition by event-time, store all three timestamps on the row, watermark with bounded out-of-orderness set to P99 of mobile-upload lateness — typically 30-60 seconds. Late events go to a side-output topic; nightly batch reconciles."
"Exactly-once is a four-link chain: idempotent Kafka producer with enable.idempotence, Flink's two-phase commit barriers for atomic consumer+sink, an idempotent sink with unique constraint on source_event_id, and replay-safe state via RocksDB incremental checkpoints. The sink is almost always the weakest link."
"Backpressure diagnostic loop: check Kafka lag → identify lagging operator from Flink UI → classify as skew (salt the key), I/O-bound (async I/O), state-bound (TTL or shrink window), or sink-bound (batch + provision). 90% of incidents are one of these four."
"Schema evolution goes through schema registry with backward compatibility default. Renames are tombstone-then-add over 90 days. Flink temporal joins with FOR SYSTEM_TIME AS OF let dim changes stay invisible to fact streams."
"Four operational SLOs to surface and page on: end-to-end latency P99, Kafka consumer lag, checkpoint duration, late-event rate. Without these, the pipeline is un-runnable in production."
Three sentences that signal seniority — in any streaming round
- "Watermarks are a contract, not a clock — the engine commits to not seeing earlier events, and downstream windows close on that commitment."
- "Exactly-once is end-to-end; if the sink isn't idempotent, the upstream guarantees don't matter."
- "Backpressure is rarely about throughput — it's almost always skew, sync I/O, state size, or sink bottleneck. Diagnose by exclusion."