Design a Distributed Message Queue (Kafka-like)¶
What We're Building¶
A distributed message queue (or log-centric streaming platform) decouples producers and consumers with a durable, partitioned append-only log. Unlike a classic work queue that deletes messages after consumption, systems like Apache Kafka retain records for a configurable time or size and allow many independent consumer groups to read the same data at their own pace—with replay for recovery and analytics.
| Capability | Why it matters |
|---|---|
| Durability & replay | Reprocess history after bugs, schema changes, or new consumers |
| Horizontal throughput | Partition the log; scale brokers and consumers independently |
| Backpressure-friendly | Consumers pull; slow readers do not collapse producers as long as retention covers the backlog |
| Ordering where it counts | Per-partition order gives a practical compromise vs global order |
Real-world systems:
| System | Positioning |
|---|---|
| Apache Kafka | Durable partitioned log, consumer groups, strong ecosystem (Connect, Streams) |
| Apache Pulsar | Tiered storage, multi-tenancy, broker–bookkeeper separation |
| RabbitMQ | General-purpose broker, exchanges/routing, classic queues + streams plugin |
| Amazon Kinesis / GCP Pub/Sub | Managed streaming with cloud operational model |
Types of messaging:
| Pattern | Behavior | Typical use |
|---|---|---|
| Publish/subscribe | Many subscribers receive copies (or partitioned subsets) of published data | Event buses, metrics pipelines |
| Point-to-point | Each message consumed by one worker from a queue | Task queues, job dispatch |
| Streaming / log | Ordered, replayable sequence of records; retention is first-class | ETL, CDC, event sourcing |
Note
In interviews, say explicitly whether you are designing a classic queue (delete on ack) or a log (retain + offsets). Kafka-style designs are usually log + consumer offsets; RabbitMQ classic queues are closer to point-to-point with optional pub/sub via exchanges.
Key Concepts Primer¶
Before diving into requirements, align on a few primitives that recur in log-centric systems. They separate a Kafka-like design from a classic broker queue.
Append-only log vs traditional queue¶
A traditional queue removes a message once a consumer acks it; other consumers never see it. An append-only log never deletes on read—it appends records in order, and consumers advance cursors (offsets). The same retained data can be read by many consumer groups, replayed, and processed at different speeds.
Traditional queue (consume destroys or hides for others)
PROD --> [ M1 ][ M2 ][ M3 ] --> CONSUMER_A (ack M1) --> M1 gone for this queue
                                     |
                                     +--> CONSUMER_B may never see M1 if already taken
Append-only partitioned log (retain + independent offsets)
PROD --> PARTITION P2: [o0][o1][o2][o3]...  (immutable past; tail grows)
                        |
                        +-- ConsumerGroup B reads o0.. (its offset)
                        +-- ConsumerGroup A reads o0.. (same bytes, different cursor)
Tip
The mental model is Git for events: commits stay in history until retention or compaction policies trim them—not because someone “finished reading.”
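A minimal in-memory sketch makes the retain-plus-offsets model concrete: the log never deletes on read, and each consumer group advances its own cursor over the same bytes. The `Log` class and its method names here are illustrative, not any real client API.

```python
from collections import defaultdict

class Log:
    """Append-only log: reads never remove records; each group owns a cursor."""
    def __init__(self) -> None:
        self.records: list[bytes] = []
        self.offsets: dict[str, int] = defaultdict(int)  # group -> next offset to read

    def append(self, value: bytes) -> int:
        self.records.append(value)
        return len(self.records) - 1  # offset of the appended record

    def poll(self, group: str, max_records: int = 10) -> list[bytes]:
        start = self.offsets[group]
        batch = self.records[start:start + max_records]
        self.offsets[group] += len(batch)  # only this group's cursor advances
        return batch

log = Log()
for v in (b"m1", b"m2", b"m3"):
    log.append(v)

assert log.poll("group-a") == [b"m1", b"m2", b"m3"]
assert log.poll("group-b") == [b"m1", b"m2", b"m3"]  # same data, independent cursor
```

Contrast with the traditional queue above: once CONSUMER_A took M1, no second reader could see it; here both groups replay identical history.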
Partition as the unit of parallelism¶
Topics are split into partitions. Each partition is a totally ordered log; ordering is not guaranteed across partitions. Throughput scales by adding partitions and parallel consumers (one consumer thread/process typically owns a partition in a group, up to #partitions members).
Python: stable partition choice from key (illustrative hash)
import zlib

def partition_for_key(key: bytes | None, num_partitions: int) -> int:
    """Kafka uses murmur2 for keys; any stable hash works for demos.
    Python's built-in hash() is salted per process, so use crc32 instead."""
    if num_partitions <= 0:
        return 0
    if key is None:
        # Round-robin or random when no key — no per-key order guarantee
        return 0
    return zlib.crc32(key) % num_partitions

def assign_partitions_round_robin(member_ids: list[str], partitions: list[int]) -> dict[str, list[int]]:
    """Greedy assignment: P consumers, N partitions — each gets ~N/P partitions."""
    out: dict[str, list[int]] = {m: [] for m in member_ids}
    for i, p in enumerate(sorted(partitions)):
        out[member_ids[i % len(member_ids)]].append(p)
    return out
Hot keys that map to one partition become hot partitions—the unit of parallelism is also the unit of skew.
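One common mitigation is key salting: spread a known hot key over a small number of sub-streams, accepting that per-key order now holds only within each salted sub-stream. This is a hypothetical helper for illustration (`salted_partition` and its parameters are not a real client API), using crc32 as a stand-in for a stable hash.

```python
import zlib

def salted_partition(key: bytes, num_partitions: int, salt_buckets: int, seq: int) -> int:
    """Spread one hot key across up to `salt_buckets` partitions.
    Ordering is preserved per salted sub-stream, NOT across the whole key."""
    salted = key + b"#" + str(seq % salt_buckets).encode()
    return zlib.crc32(salted) % num_partitions

# The same hot key now lands on up to `salt_buckets` distinct partitions:
parts = {salted_partition(b"hot-user", 12, 4, i) for i in range(100)}
assert 1 <= len(parts) <= 4
```

Consumers that need the full per-key history must merge the salted sub-streams, which is why salting is a targeted fix for measured skew, not a default.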
Consumer group model: rebalancing and offsets¶
A consumer group is a logical subscriber name. The cluster tracks which group member reads which partition; on join/leave/crash, the group rebalances—partitions may move, in-flight work must be handled carefully.
| Idea | Meaning |
|---|---|
| Committed offset | Persisted checkpoint (“we processed up to o”). Survives restarts. |
| Current position / uncommitted | In-memory fetch position; lost on crash unless committed. |
| Rebalance | Revoke partitions, reassign—stop-the-world (eager) or incremental (cooperative). |
At-least-once is common: process records, then commit the offset. If you crash after processing but before commit, you reprocess (duplicates). If you commit before processing, you can skip work on crash (at-most-once).
Member A crashes
Coordinator detects missed heartbeat
-> rebalance
-> partitions from A reassigned to B
-> B may re-read from last COMMITTED offset (duplicates possible)
Warning
Uncommitted offsets are not durable progress—only committed offsets define restart behavior unless you use an external store for idempotency keys.
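The commit-ordering choice above is the whole difference between the two weaker semantics, and it fits in a few lines. The loop shapes are illustrative (no real client library is assumed):

```python
from dataclasses import dataclass

@dataclass
class Rec:
    offset: int
    value: bytes

def run_at_least_once(records, process, commit):
    """Commit AFTER processing: a crash between the two redelivers (duplicates)."""
    for r in records:
        process(r)
        commit(r.offset + 1)   # committed offset = next record to resume from

def run_at_most_once(records, process, commit):
    """Commit BEFORE processing: a crash after commit skips the record (loss)."""
    for r in records:
        commit(r.offset + 1)
        process(r)

events: list[str] = []
run_at_least_once(
    [Rec(0, b"a"), Rec(1, b"b")],
    process=lambda r: events.append(f"proc:{r.offset}"),
    commit=lambda off: events.append(f"commit:{off}"),
)
assert events == ["proc:0", "commit:1", "proc:1", "commit:2"]
```

Reading the event trace makes the failure windows explicit: any crash point between a `proc` and its `commit` is a duplicate on restart; flip the order and the same crash point becomes a lost record.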
ISR, LEO, and high watermark (HW)¶
Each partition has a leader and followers. Each replica has a log end offset (LEO)—the next offset it would assign. The high watermark (HW) is, in the simplified model, the smallest LEO across the in-sync replicas: the boundary below which every ISR member holds the data. Consumers reading “committed” data typically do not read past HW (Kafka’s exact rules evolved with transactions and leader epochs; interviews use this sketch).
Replica R0 (leader)   LEO=10   log: ... [8][9]
Replica R1 (follower) LEO=10   replicated through 9, caught up
Replica R2 (follower) LEO=8    lagging

HW = min(LEO across ISR): only records replicated by every ISR member are "committed"
(conceptually: followers must fetch and append before HW advances past them)

offsets: 0 1 2 3 4 5 6 7 8 9
         |<--- readable by consumers up to HW-1 (committed) --->|
Note
LEO is per-replica progress; HW is the replication barrier for visibility—they differ on every broker during catch-up.
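The min-over-ISR rule is two lines of code, which makes it a good whiteboard anchor. This is the simplified mental model only (real Kafka adds leader epochs and fetch-session details); the function name is illustrative.

```python
def high_watermark(leo_by_replica: dict[int, int], isr: set[int]) -> int:
    """Simplified model: HW = min(LEO) over in-sync replicas only.
    A lagging replica dropped from the ISR no longer holds the HW back."""
    return min(leo_by_replica[r] for r in isr)

leo = {0: 10, 1: 10, 2: 8}                      # leader 0; follower 2 lagging
assert high_watermark(leo, isr={0, 1, 2}) == 8  # lagging R2 in ISR pins HW at 8
assert high_watermark(leo, isr={0, 1}) == 10    # R2 evicted: HW advances to 10
```

This also shows why ISR eviction is a durability decision: advancing HW past an evicted replica means that replica can no longer become leader without data loss.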
Exactly-once semantics: decomposition¶
Brokers cannot magically make your database exactly-once. EOS is usually decomposed as:
- Idempotent producer — Producer ID + per-partition sequence numbers; retries dedupe at the log.
- Transactions — atomic write to multiple partitions and/or commit consumer offsets with produced output (read-process-write).
- Idempotent consumer / sink — downstream upsert by id, dedupe table, or effectively-once handling of duplicates.
Note
In interviews, say exactly-once effects require all three layers in the path of side effects—not just broker flags.
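The third layer, the idempotent sink, is often the easiest to sketch: dedupe by event id so a redelivered record produces no second side effect. This is a minimal in-memory sketch; in production the seen-set must live in the same transactional store as the write itself, and `IdempotentSink` is a hypothetical name.

```python
class IdempotentSink:
    """Dedupe-by-id sink: at-least-once delivery, at-most-once side effects."""
    def __init__(self) -> None:
        self.applied: dict[str, int] = {}   # event_id -> applied amount
        self._seen: set[str] = set()

    def apply(self, event_id: str, amount: int) -> bool:
        if event_id in self._seen:
            return False                    # duplicate delivery: no second effect
        self._seen.add(event_id)
        self.applied[event_id] = amount
        return True

sink = IdempotentSink()
assert sink.apply("evt-1", 100) is True
assert sink.apply("evt-1", 100) is False   # at-least-once redelivery absorbed
assert sink.applied == {"evt-1": 100}
```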
Step 1: Requirements Clarification¶
Questions to Ask¶
| Question | Why it matters |
|---|---|
| Delivery semantics: at-most-once, at-least-once, exactly-once? | Drives producer acks, idempotency, transactions, and consumer dedupe |
| Ordering: per-key, per-partition, or global? | Global order rarely scales; partition count and key routing |
| Retention: time-based, size-based, compaction? | Storage sizing, tiered storage, changelog topics |
| Multi-tenancy / quotas? | Isolation, fairness, noisy-neighbor limits |
| Geo: single region vs multi-region replication? | Latency, RPO/RTO, offset semantics, MirrorMaker-style tools |
| Payload size distribution? | Large messages need chunking or separate blob store |
| Who runs consumers: your team vs external teams? | API stability, schema evolution (Avro/Protobuf), SLAs |
Functional Requirements¶
| Requirement | Priority | Notes |
|---|---|---|
| Publish records to named topics | Must have | Key, value, headers, timestamp |
| Subscribe via consumer groups | Must have | Each group gets a copy of the stream; partitions assigned to members |
| At-least-once delivery (default) | Must have | Retries + idempotent application or dedupe |
| Per-partition ordering | Must have | Total order only within a partition |
| Persistence & replay | Must have | Seek to offset; retention window |
| Partitioning by key | Must have | hash(key) % num_partitions for affinity |
| Admin: create topic, alter replicas, ACLs | Should have | Production operability |
Non-Functional Requirements¶
| Requirement | Target (illustrative) | Rationale |
|---|---|---|
| Throughput | > 1M messages/sec cluster-wide | Horizontal scale via partitions and brokers |
| Latency (p99) | < 10 ms produce/fetch on healthy path | Page cache, batching, zero-copy; cross-AZ hurts |
| Durability | No acknowledged write lost with acks=all + min ISR | Replication + fsync policy |
| Availability | 99.99% | RF=3, leader election, rack awareness |
| Elastic scale | Add brokers; expand partitions (key compatibility) | Operational story |
Warning
Exactly-once in distributed systems usually means exactly-once semantics for side effects, composed from an idempotent producer, transactions where needed, and idempotent consumers or sinks—not “the network delivered exactly once.” Say that clearly in Staff interviews.
API Design¶
Illustrative REST for control plane; data plane is binary TCP (Kafka protocol) or gRPC in modern variants.
Topic & produce (sketch):
POST /v1/clusters/{cluster}/topics/{topic}/records
Content-Type: application/json

{
  "records": [
    { "key": "user-42", "value": "eyJldmVudCI6InB1cmNoYXNlIn0=", "partition": null },
    { "key": "user-42", "value": "...", "partition": null }
  ],
  "acks": "all",
  "idempotence": true
}
Consumer group — fetch:
GET /v1/clusters/{cluster}/topics/{topic}/partitions/{p}/records?offset=100&max_bytes=1048576
X-Consumer-Group: payments-workers
Python client-style API (conceptual):
from dataclasses import dataclass
from typing import Optional

@dataclass
class Record:
    offset: int
    key: bytes
    value: bytes
    timestamp_ms: int

@dataclass
class RecordMetadata:
    topic: str
    partition: int
    offset: int

class Producer:
    def send(
        self,
        topic: str,
        key: Optional[bytes],
        value: bytes,
        *,
        partition: Optional[int] = None,
        acks: str = "all",
    ) -> RecordMetadata: ...

class Consumer:
    def subscribe(self, topics: list[str]) -> None: ...
    def poll(self, timeout_ms: int) -> list[Record]: ...
    def commit(self, offsets: dict[tuple[str, int], int]) -> None: ...
Tip
Mention metadata service (topic → partition leaders), authentication (SASL), and compression (gzip, lz4, zstd) at the boundary—they matter for production.
Technology Selection & Tradeoffs¶
Choosing a message platform is choosing durability semantics, operational model, and client protocol together. Interviewers expect you to name two realistic options and justify one for the stated workload (throughput, ordering, multi-tenancy, cloud vs self-managed).
Broker architecture¶
| Style | Examples | How it works | Strengths | Weaknesses |
|---|---|---|---|---|
| Log / partition-centric | Apache Kafka, Redpanda | Append-only partitions on brokers; offsets; consumer groups pull | Massive throughput, replay, many subscribers per topic | Ops-heavy; hot partitions; fewer built-in routing patterns than AMQP |
| Traditional broker / queue-centric | RabbitMQ, ActiveMQ | Exchanges, queues, optional streams plugin | Flexible routing, classic task queues, per-queue DLQ patterns | Classic queues are not infinite replay logs; throughput often broker-bound |
| Hybrid / layered | Apache Pulsar | Broker serves clients; BookKeeper or cloud journal for persistence; tiered offload | Strong isolation, geo, tiered storage story | More moving parts; steeper learning curve |
| Cloud-native managed | Amazon SQS + SNS, GCP Pub/Sub, Azure Event Hubs | Fully managed APIs, quotas, IAM | Fast to production, no broker fleet to run | Vendor semantics, egress/ingress costs, less control over internals |
Note
Why this matters in interviews: “Kafka vs RabbitMQ” is not a beauty contest—it is log + replay + fan-out vs routing + competing consumers + delete-on-ack. Pick the model that matches retention, ordering, and who owns consumer scale-out.
Storage backend (for log-style systems)¶
| Option | Idea | Strengths | Weaknesses |
|---|---|---|---|
| Append-only log on local disk | Segmented files per partition; sequential writes | Predictable throughput; easy replication (byte-identical followers) | Disk full, single-AZ failure without replication |
| Page cache + OS read path | Writes hit page cache; sendfile/mmap for reads | Zero-copy-friendly fan-out; low CPU | Tail latency spikes if cache cold or GC pauses |
| Tiered storage (e.g. older segments → S3) | Hot data local; cold in object store | Long retention without linear disk spend | Read latency for cold segments; ops complexity |
Tip
Tie storage to SLO: local SSD + page cache wins p99 produce/fetch; tiered wins $/GB-month for compliance retention. Say both when the interviewer mentions “7 years of audit logs.”
Delivery semantics¶
| Semantics | Meaning | Duplicate / loss risk | Typical mechanism |
|---|---|---|---|
| At-most-once | Message may be lost, never duplicated | Loss on crash before ack | Ack before processing; or fire-and-forget producer |
| At-least-once | Message never lost if committed; may be processed twice | Duplicates on retry | Ack after processing + idempotent sinks |
| Exactly-once (broker + app) | Effects happen once | Duplicates absorbed and loss prevented only if all layers cooperate | Idempotent producer (sequence numbers), transaction boundaries, consumer/sink dedupe keys |
Warning
Exactly-once is a system property, not a single checkbox: the broker can dedupe retries and support transactions, but your database still needs keys or upserts so duplicate delivery does not double-charge.
Protocol¶
| Protocol | Shape | Strengths | Weaknesses |
|---|---|---|---|
| Custom binary (Kafka protocol) | Length-prefixed RPC frames, versioned APIs | Mature clients; batching; zero-copy path | Not human-debuggable with curl |
| AMQP 0.9.1 / 1.0 | Broker exchanges, queues, acknowledgements | Great for routing and per-queue semantics | Heavier than minimal log RPC for huge firehose |
| MQTT | Pub/sub over TCP; lightweight | IoT, intermittent networks; small footprint | Not the default for multi-MB payloads or huge intra-DC throughput |
| gRPC / HTTP/2 | Request streams, flow control | Uniform with microservices; easy gateways | Often extra framing vs highly tuned binary log pipeline |
Our choice (for this Kafka-like guide): a partitioned append-only log with custom or Kafka-compatible binary protocol, local segmented storage with page-cache-friendly I/O, optional tiered offload for cost, at-least-once as the default product semantics with idempotent producer + idempotent application for duplicate control, and transactions only where read-process-write atomicity is required. This maximizes throughput and replay while keeping a clear path to production-grade durability via ISR and acks=all.
CAP Theorem Analysis¶
Distributed queues sit on the CP vs AP frontier: network partitions force a choice between accepting writes (availability to producers, risk duplicates or divergent state) and rejecting writes (consistency of what is “committed,” risk unavailability or producer-side backlog).
| During a partition… | If the broker favors… | What happens | Interview framing |
|---|---|---|---|
| Subset of replicas isolated | CP (quorum / min ISR not met) | Reject or fail produces that need replication | No silent loss of acknowledged writes; producers retry or fail visibly |
| Subset of replicas isolated | AP (accept leader writes without ISR) | Higher availability of the write API | Risk data loss on failover (unclean leader election)—must be explicit |
| Producer cannot reach leader | Either | Timeouts / retries | At-least-once with duplicate risk unless idempotent producer |
Note
CAP is about network partition, not “disk died.” Replication and leader election are how you map partition tolerance to concrete broker behavior.
ISR (in-sync replica) model (sketch): Each partition has a leader and followers. The ISR is the set of replicas caught up within lag bounds. A produce with acks=all is acknowledged only after all ISR replicas have the record (Kafka’s exact rules involve HW and leader epochs—interviews use this story). If a follower falls behind, it is dropped from ISR; min.insync.replicas blocks commits if the ISR is too small—CP over taking writes without enough copies.
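The CP behavior of acks=all can be shown as a single commit decision: refuse the ack when the ISR is below min.insync.replicas, and otherwise ack only once every ISR member has replicated the record. This is an illustrative sketch of the simplified model above, not Kafka's actual code path; names are hypothetical.

```python
def can_ack_produce(isr_leo: dict[int, int], record_offset: int, min_insync: int) -> bool:
    """acks=all sketch: ack iff the ISR is large enough AND every ISR member's
    log end offset (LEO) is past the record. Refusing is the CP choice."""
    if len(isr_leo) < min_insync:
        return False                       # too few in-sync copies: fail the produce
    return all(leo > record_offset for leo in isr_leo.values())

# RF=3 with min.insync.replicas=2: losing one follower is fine, losing two is not.
assert can_ack_produce({0: 5, 1: 5}, record_offset=4, min_insync=2) is True
assert can_ack_produce({0: 5}, record_offset=4, min_insync=2) is False       # ISR too small
assert can_ack_produce({0: 5, 1: 3}, record_offset=4, min_insync=2) is False # follower behind
```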
Tip
Staff sound bite: Under partition, Kafka-style systems with acks=all + min ISR behave CP on the commit set (you may not get an ack). Unclean election trades C for A—say that only for explicit disaster policies.
SLA and SLO Definitions¶
SLIs are measurable; SLOs are targets over a window; error budgets decide when to freeze features vs fix reliability.
| Area | SLI (what we measure) | Example SLO (monthly) | Why candidates care |
|---|---|---|---|
| Publish latency | Time from send return to broker ack | p99 < 10 ms, p999 < 50 ms (same region) | Batching vs linger.ms trade-off; cross-AZ blows p99 |
| End-to-end delivery latency | Produce timestamp → consumer processing complete | p99 < 100 ms for pipeline class (tuned) | Includes consumer lag, poll interval, downstream DB |
| Message durability | Fraction of acked acks=all writes lost | < 0.001% annual (often expressed via RPO) | Ties to RF=3, ISR, rack awareness |
| Availability | Successful produce+fetch API / total attempts | 99.95%–99.99% API monthly | Controller quorum, rolling restarts, AZ spread |
| Ordering guarantee | Violations of per-partition total order | 100% within partition for keyed traffic | Misconfigured parallelism or retries break order |
Note
Publish SLO is broker-local; end-to-end SLO is product-owned—always separate them in interviews.
Error budget policy (example):
| Budget state | Action |
|---|---|
| > 50% remaining | Normal feature velocity; optional latency tuning |
| 25–50% | Freeze non-critical launches; prioritize ISR, lag, p99 work |
| < 25% | Incident review; no partition increases without review; paging on SLO burn |
| Exhausted | Freeze releases until root cause; exec review |
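The budget states above follow from simple arithmetic: an availability SLO over a window implies a fixed allowance of downtime, and the remaining fraction drives the policy row you are in. A quick sanity check (30-day window assumed for illustration):

```python
def error_budget_minutes(slo: float, window_days: int = 30) -> float:
    """Allowed downtime in minutes for an availability SLO over the window."""
    return (1.0 - slo) * window_days * 24 * 60

assert round(error_budget_minutes(0.9995), 1) == 21.6   # 99.95%: ~21.6 min/month
assert round(error_budget_minutes(0.9999), 2) == 4.32   # 99.99%: ~4.3 min/month
```

The 99.99% budget of roughly four minutes a month is why rolling restarts and controller failover times dominate availability discussions at that tier.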
Warning
Consumer lag is not always an SLO violation: define whether max lag (e.g. < 5 minutes at p99) is a product SLO or an internal alerting threshold.
Data Model¶
Clear entities help you explain metadata, storage layout, and failure recovery on the whiteboard.
| Entity | Definition | Interview hooks |
|---|---|---|
| Topic | Named logical stream; configured with partition count, retention, replication factor, compaction | Control plane stores config; clients resolve metadata to leaders |
| Partition | Ordered append-only sub-log; unit of parallelism and per-partition order | Hot key → one partition → skew |
| Offset | Monotonic logical position within a partition (0-based or long sequence) | Consumer progress; not portable across clusters after geo-replication |
| Message / record envelope | On-wire or stored blob with fields below | Serialization (Avro/Protobuf/JSON) is application concern |
Record envelope (typical):
| Field | Role |
|---|---|
| Key | Partition routing (hash(key) % P); optional null for round-robin |
| Value | Payload bytes (often serialized record) |
| Headers | Metadata (trace id, content-type, schema id) without parsing value |
| Timestamp | Create time (producer or broker LogAppendTime); used for retention and timeindex |
Tip
Say headers carry observability and routing hints; value carries business data—helps in schema-evolution questions.
Consumer group state (logical):
| Piece | Stored as |
|---|---|
| Group id | String; coordinator shard chosen by hashing the group name |
| Generation / epoch | Increments on rebalance; guards stale commits |
| Member id → partitions | Assignment vector; sticky assignors reduce churn |
| Committed offsets | (topic, partition) → offset map; internal compacted topic or external store |
Dead letter queue (DLQ) structure (pattern):
| Field | Purpose |
|---|---|
| Original topic, partition, offset | Replay or inspect source record |
| Failure reason / stack | Ops triage |
| Payload copy or reference | Idempotency; large bodies may be blob store pointer |
| Retry count, first/last failure time | Backoff and alert thresholds |
Note
DLQ is often a separate topic (or queue product) with short retention + alerting—not infinite storage of bad messages.
Step 2: Back-of-Envelope Estimation¶
Assumptions¶
- Peak produce: 1M messages/sec (aggregate cluster)
- Average message size: 1 KB (key + value + headers overhead included)
- Replication factor: 3 (3× disk for committed data if all replicas hold full copy)
- Retention: 7 days time-based
Messages per Second per Partition (Sanity)¶
If a topic uses 2,048 partitions at peak:
1e6 / 2048 ≈ 488 msg/s per partition leader (average)
Skew: hot keys can drive 10–100× average on a single partition — always
provision headroom and monitor per-partition throughput in dashboards.
Traffic & Bandwidth¶
Ingress (producers): 1e6 msg/s × 1 KB ≈ 1 GB/s
With replication factor 3, bytes written to follower replicas
(order-of-magnitude): ~2 GB/s additional inter-broker traffic
(leader → followers), depending on rack layout and ack policy.
Egress to consumers is workload-dependent; often similar order as
ingress for fan-out pipelines (multiple consumer groups multiply reads).
Interview sound bite: “I’d separate client-facing throughput from replication fan-out—they’re different bottlenecks (NIC vs disk vs ISR).”
Storage (7-Day Retention)¶
Per second: 1e6 × 1 KB = 1 GB/s
Per day: 1 GB/s × 86,400 s ≈ 86.4 TB/day raw (single copy logical)
Seven days logical (single copy): 86.4 TB × 7 ≈ 605 TB ≈ 0.6 PB
With replication factor 3 on disk for durability: ~1.8 PB physical
(ignoring compression; compression often 2–5× for text-like payloads)
Note
Real clusters use compression and tiered storage (older segments to object store). Your number should move order-of-magnitude, not three decimals.
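The storage figures above can be reproduced in a few lines, which is also a good interview move: write the chain down so the interviewer can challenge one input at a time. Decimal units (1 KB = 1,000 bytes) are assumed for back-of-envelope simplicity.

```python
msg_per_sec = 1_000_000
msg_bytes = 1_000          # ~1 KB average, decimal units
rf = 3                     # replication factor
retention_days = 7

ingress_gb_s = msg_per_sec * msg_bytes / 1e9             # client-facing write rate
logical_tb = ingress_gb_s * 86_400 * retention_days / 1e3  # GB/day summed -> TB
physical_pb = logical_tb * rf / 1e3                        # TB -> PB, all replicas

assert ingress_gb_s == 1.0
assert round(logical_tb, 1) == 604.8    # ~0.6 PB single-copy, matches above
assert round(physical_pb, 2) == 1.81    # ~1.8 PB on disk at RF=3
```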
Partitions & Brokers (Rough Cut)¶
Goal: each partition leader handles ~10–50 MB/s of produce (SSD and
page-cache friendly — tune in real benchmarks).
1 GB/s aggregate → order 20–100 partition leaders hot at peak
(if perfectly balanced — real workloads need headroom for skew).
If each broker hosts ~2 GB/s NIC sustained budget with replication,
~3–5 brokers minimum for network alone at 1 GB/s ingress — in practice
many more for failure domains, GC pauses, and burst.
Illustrative: 50 brokers × 12 partitions/broker → 600 partitions
(illustrative shard count; Kafka clusters often run thousands of partitions)
Warning
Hot partitions (popular keys) blow up this math. Always mention skew and splitting topics or salt keys when estimation is challenged.
Broker Count (Order-of-Magnitude)¶
| Sizing input | Example |
|---|---|
| Target ingress | 1 GB/s |
| Per-broker sustainable produce + replicate | ~200–400 MB/s (depends on SSD, NIC, replication) |
| Failure headroom | 2× for broker loss during peak |
Rough cluster size: ~10–30 brokers for this hypothetical (not a law—validate with load tests). Add brokers for rack/AZ spread and noisy neighbors on multi-tenant clouds.
Step 3: High-Level Design¶
Architecture (Mermaid)¶
Modern Kafka uses an internal metadata quorum (KRaft — Kafka Raft) in place of ZooKeeper in new deployments; interviews still often mention ZooKeeper for historical reasons. Either way, the control plane stores topic/partition leadership and cluster membership; the data plane is brokers serving partitions.
Produce → Fetch Flow (Sequence)¶
Component Responsibilities¶
| Component | Responsibility |
|---|---|
| Producer | Partition key hashing, batching, retries, acks, idempotence sequence |
| Broker / partition leader | Append to local log; replicate to followers; serve fetch requests |
| Follower | Truncate to leader epoch; replicate in-sync; join ISR when caught up |
| Consumer group coordinator | Group membership, partition assignment, generation/epoch |
| Offset storage | __consumer_offsets topic or external store (rare) |
| Metadata service (ZK/KRaft) | Topic config, leader/ISR, controlled shutdown, ACL metadata |
Tip
Draw one partition’s leader + two followers on the whiteboard before talking about ISR—interviewers map mental models faster.
Step 4: Deep Dive¶
4.1 Append-Only Log and Segment Storage¶
Core idea: Each partition is an ordered append-only log on disk. The log is split into segments (files) to bound recovery time and retention deletion.
| Piece | Role |
|---|---|
| Active segment | Only the tail segment accepts appends; roll on size or time |
| Immutable segments | Older files are read-only; cheap to cache and replicate |
| Index | Maps logical offset → file position (sparse offset index) |
| Timeindex | Timestamp → offset for time-based retention |
Write path: producer → leader appends the batch to the page cache, flushed/fsynced per durability settings; followers pull from the leader via fetch requests (Kafka-style pull replication).
Segment file, sparse index, and naming
Each record is length-prefixed payload bytes. The sparse index stores (relative_offset → physical_position) every INDEX_EVERY records; lookup uses binary search for the floor entry, then linear scan within the segment (same idea as Kafka’s .index + scan in .log).
import os
import struct
from bisect import bisect_right
from dataclasses import dataclass

_LEN = struct.Struct(">I")

@dataclass(frozen=True)
class IndexEntry:
    relative_offset: int
    physical_position: int

class Segment:
    """One on-disk segment: append-only log + sidecar sparse offset index."""

    INDEX_EVERY = 4  # demo: index every N records; production uses byte stride

    def __init__(self, base_offset: int, log_path: str, index_path: str):
        self.base_offset = base_offset
        self.log_path = log_path
        self.index_path = index_path
        self._entries: list[IndexEntry] = []
        self._record_count = 0
        self._rebuild_meta_from_log()

    def _rebuild_meta_from_log(self) -> None:
        self._entries = []
        if not os.path.exists(self.log_path):
            self._load_index_file()
            return
        with open(self.log_path, "rb") as log:
            pos = 0
            rel = 0
            while True:
                hdr = log.read(4)
                if len(hdr) < 4:
                    break
                (ln,) = _LEN.unpack(hdr)
                if rel % self.INDEX_EVERY == 0:
                    self._entries.append(IndexEntry(rel, pos))
                log.read(ln)
                pos += 4 + ln
                rel += 1
        self._record_count = rel

    def _load_index_file(self) -> None:
        if not os.path.exists(self.index_path) or self._entries:
            return
        with open(self.index_path, "rb") as f:
            raw = f.read()
        self._entries = []
        for i in range(0, len(raw), 8):
            rel, pos = struct.unpack_from(">ii", raw, i)
            self._entries.append(IndexEntry(rel, pos))

    def _persist_index(self) -> None:
        with open(self.index_path, "wb") as f:
            for e in self._entries:
                f.write(struct.pack(">ii", e.relative_offset, e.physical_position))

    def append(self, record_bytes: bytes) -> int:
        physical = os.path.getsize(self.log_path) if os.path.exists(self.log_path) else 0
        rel = self._record_count
        if rel % self.INDEX_EVERY == 0:
            self._entries.append(IndexEntry(rel, physical))
        with open(self.log_path, "ab") as log:
            log.write(_LEN.pack(len(record_bytes)))
            log.write(record_bytes)
        self._record_count += 1
        self._persist_index()
        return self.base_offset + rel

    def _floor_entry(self, target_rel: int) -> tuple[int, int]:
        if not self._entries:
            return (0, 0)
        keys = [e.relative_offset for e in self._entries]
        i = bisect_right(keys, target_rel) - 1
        if i < 0:
            return (0, 0)
        e = self._entries[i]
        return (e.relative_offset, e.physical_position)

    def read_at_relative(self, target_rel: int) -> bytes | None:
        if target_rel < 0 or target_rel >= self._record_count:
            return None
        cur_rel, pos = self._floor_entry(target_rel)
        with open(self.log_path, "rb") as log:
            log.seek(pos)
            while cur_rel < target_rel:
                (ln,) = _LEN.unpack(log.read(4))
                log.read(ln)
                cur_rel += 1
            (ln,) = _LEN.unpack(log.read(4))
            return log.read(ln)
import java.nio.MappedByteBuffer;

// Illustrative: sparse offset index — map relative offset -> physical position
public final class OffsetIndex {
    private final MappedByteBuffer mmap;

    public OffsetIndex(MappedByteBuffer mmap) {
        this.mmap = mmap;
    }

    public void append(int relativeOffset, int physicalPosition) {
        // each entry: 4 bytes relative offset + 4 bytes position
        mmap.putInt(relativeOffset);
        mmap.putInt(physicalPosition);
    }

    public int lookup(long targetOffset) {
        // binary search for the floor of targetOffset (stub for illustration)
        return 0;
    }
}
Log compaction preview: tombstones and key-based deduplication¶
Log compaction (for changelog topics) keeps the latest value per key and drops older versions in the background. A tombstone is a record with null/empty value meaning “delete this key”; it must be retained until the compaction policy removes it (Kafka: delete.retention.ms after compaction).
Python: in-memory simulation of compaction — walk ordered records, keep last write wins per key; tombstones participate in deduplication so an older value cannot “come back.”
from dataclasses import dataclass
from typing import Iterator

# Sentinel: producer writes empty payload to mean "delete key" (tombstone)
TOMBSTONE = b""

@dataclass(frozen=True)
class LogRecord:
    offset: int
    key: bytes
    value: bytes  # TOMBSTONE means delete marker

def compact_records_by_key(records: list[LogRecord]) -> list[LogRecord]:
    """
    Single pass: for each key, only the last record survives.
    Tombstone last => key absent from output (Kafka still retains the tombstone
    until delete retention expires; this models the key map after compaction).
    """
    last_by_key: dict[bytes, LogRecord] = {}
    for r in records:
        last_by_key[r.key] = r
    # Emit in offset order of surviving records (by each key's last offset)
    survivors = sorted(last_by_key.values(), key=lambda x: x.offset)
    out: list[LogRecord] = []
    for r in survivors:
        if r.value == TOMBSTONE:
            continue  # key removed from log; tombstone elided after retention window
        out.append(r)
    return out

def stream_compact_segment(
    records: Iterator[LogRecord],
) -> Iterator[LogRecord]:
    """Streaming variant for large segments: bounded memory if keys shard per cleaner pass."""
    last_by_key: dict[bytes, LogRecord] = {}
    for r in records:
        last_by_key[r.key] = r
    for r in sorted(last_by_key.values(), key=lambda x: x.offset):
        if r.value != TOMBSTONE:
            yield r
Tip
Interview line: compaction is not “delete random rows”—it is key-ordered merging of segments so the tail of the log for each key wins; tombstones suppress older values until garbage-collected.
Note
Why segments? Limit file size, bound recovery, allow delete by whole file on retention, and enable compacted topics (Kafka log compaction) to rewrite segments in the background.
High watermark (HW) vs committed: The leader maintains the log end offset (LEO) per replica; the high watermark is the smallest LEO across ISR (simplified mental model). Consumers only read up to HW for committed data under the usual replication story—interviewers may probe edge cases around leader election and unclean failover.
Sparse offset index: Not every offset maps to disk—typically every few KB you record (relative_offset → physical_position) so seek is O(log n) in index entries, then scan within the segment.
4.2 Partitioning and Replication¶
Partition assignment: each topic has P partitions; brokers host a subset of partition leaders and replicas subject to rack-awareness rules.
ISR (In-Sync Replicas): followers that are caught up within a bounded lag; only ISR members are eligible for leader election on failure.
| Term | Meaning |
|---|---|
| Leader | Handles produce/fetch for the partition |
| Follower | Copies the leader’s log |
| AR (Assigned Replicas) | All replicas assigned to the partition |
| ISR | Subset that is sufficiently up-to-date |
Replication protocol (simplified):
- Producer sends to leader.
- Leader writes to its local log; followers send fetch requests (replication in Kafka is pull-based).
- Followers append in order; the leader advances the high watermark once all ISR members have replicated.
Leader election: the metadata controller selects a new leader from the ISR (allow unclean leader election—promoting a non-ISR replica—only if availability trumps consistency; it is an explicit config flag).
Python: ISR membership (illustrative)
from dataclasses import dataclass, field

@dataclass
class ReplicaState:
    broker_id: int
    log_end_offset: int = 0   # LEO: next offset this replica would write
    last_fetch_ms: int = 0    # when the follower last fetched from the leader

@dataclass
class Partition:
    leader: int
    replicas: dict[int, ReplicaState] = field(default_factory=dict)

    def isr(self, max_lag: int, now_ms: int, fetch_timeout_ms: int = 10_000) -> list[int]:
        """A follower is in-sync if it is within max_lag offsets of the leader
        and has fetched recently; the leader is always an ISR member."""
        leader_leo = self.replicas[self.leader].log_end_offset
        in_sync = []
        for bid, st in self.replicas.items():
            if bid == self.leader:
                continue
            if leader_leo - st.log_end_offset <= max_lag and now_ms - st.last_fetch_ms < fetch_timeout_ms:
                in_sync.append(bid)
        return [self.leader] + in_sync
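Tying the replica model to the primer’s high-watermark definition, a toy helper under the simplified assumption that HW = min LEO across the ISR (names here are illustrative):

```python
def high_watermark(leos: dict[int, int], isr: list[int]) -> int:
    """Simplified model: HW is the smallest log-end-offset among ISR members;
    consumers read committed data only up to HW."""
    return min(leos[broker_id] for broker_id in isr)

leos = {1: 120, 2: 118, 3: 90}                       # broker 3 is lagging badly
hw_with_lagger = high_watermark(leos, isr=[1, 2, 3])  # lagging replica holds HW back
hw_without = high_watermark(leos, isr=[1, 2])         # after dropping it from ISR
```

This makes the ISR trade-off concrete: keeping a slow follower in the ISR stalls the HW (and thus consumer visibility); dropping it lets HW advance but shrinks the durability set.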
Warning
If min.insync.replicas is not met, a producer using acks=all fails the write—you trade availability for avoiding silent data loss. Explain this trade-off.
Rack awareness: place replicas on different racks/AZs so a single failure domain does not wipe ISR. The controller tries to honor replica placement constraints when assigning partitions.
4.3 Producer Guarantees¶
acks (Kafka producer):
| acks | Behavior | Durability vs latency |
|---|---|---|
| 0 | Fire-and-forget | Lowest latency; can lose on crash |
| 1 | Leader ack | Balanced; loss if leader dies before replication |
| all | All ISR ack | Strongest; higher latency |
Idempotent producer: broker tracks Producer ID + sequence number per partition; duplicates from retries are dropped.
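A minimal in-memory sketch of the broker-side check (simplified from Kafka’s PID + sequence-number logic; class and field names are illustrative):

```python
class PartitionLeader:
    """Toy idempotence check: per producer id, accept a batch only when its
    sequence number is the next expected one; retried duplicates are dropped."""

    def __init__(self) -> None:
        self.records: list[str] = []
        self.last_seq: dict[int, int] = {}   # producer_id -> last accepted seq

    def append(self, producer_id: int, seq: int, payload: str) -> bool:
        last = self.last_seq.get(producer_id, -1)
        if seq <= last:
            return False                     # duplicate from a retry: drop silently
        if seq != last + 1:
            raise ValueError("sequence gap: a batch was lost in between")
        self.records.append(payload)
        self.last_seq[producer_id] = seq
        return True

leader = PartitionLeader()
leader.append(producer_id=7, seq=0, payload="debit")
leader.append(producer_id=7, seq=0, payload="debit")   # retry of the same batch
leader.append(producer_id=7, seq=1, payload="credit")
```

The gap check matters: out-of-sequence batches are rejected rather than appended, which is what restores ordering under retries.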
Exactly-once (stream processing): combine idempotent producer, transactions (atomic write to multiple partitions), and read-process-write with correct isolation—or use external idempotent sinks.
Producer configuration and retry semantics
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;
Properties p = new Properties();
p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
p.put(ProducerConfig.ACKS_CONFIG, "all");
p.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
p.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "payments-tx-1");
Tip
State clearly: EOS across services usually needs idempotent consumers or dedupe store—the broker does not magically make downstream DB writes exactly-once without application design.
Transactional producer (two-phase style): initTransactions → beginTransaction → send batches → sendOffsetsToTransaction → commitTransaction. Aborts are visible to consumers per isolation.level (read_committed skips open transactions).
Python (pseudocode for transaction boundary):
def process_batch_txn(producer, consumer, records, output_topic):
    """Read-process-write inside one transaction (pseudocode; next_offset and
    transform are application-level helpers)."""
    producer.begin_transaction()
    try:
        for r in records:
            producer.send(output_topic, key=r.key, value=transform(r.value))
        # Commit consumed offsets atomically with the produced records.
        offsets = {tp: next_offset(tp) for tp in consumer.assignment()}
        producer.send_offsets_to_transaction(offsets, consumer.consumer_group_metadata())
        producer.commit_transaction()
    except Exception:
        producer.abort_transaction()
        raise
4.4 Consumer Groups and Offset Management¶
Group coordinator: one broker acts as coordinator for the group; handles JoinGroup, SyncGroup, Heartbeats.
Rebalancing: when members join/leave, partitions are revoked and reassigned (protocol: range, round-robin, sticky, cooperative sticky).
| Strategy | Behavior |
|---|---|
| Eager | Stop all, reassign everything — simple, bursty |
| Cooperative | Revoke subset — less stop-the-world |
Offsets: stored in internal topic __consumer_offsets (or ZooKeeper legacy). Commit after processing = at-least-once; commit before = at-most-once.
Offset commit strategies:
| Strategy | Semantics | Risk |
|---|---|---|
| Auto commit (interval) | Simple | Can commit before processing → at-most-once processing with confusing failure modes |
| Sync commit after work | At-least-once | Duplicates if process dies after work but before commit |
| Transactional read-process-write | EOS within Kafka transaction | Operational complexity; broker version support |
At-least-once consume and commit
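A minimal in-memory sketch of commit-after-processing; all names are illustrative, and the "crash" is simulated by skipping the offset commit:

```python
def poll_and_commit(log, committed, handler, crash_before_commit=False):
    """Commit-after-processing: side effects run first, the offset commit
    last. A crash between the two redelivers the batch (at-least-once)."""
    batch = log[committed:]
    for record in batch:
        handler(record)                      # may run more than once per record
    if crash_before_commit:
        return committed                     # commit lost: next poll starts over
    return committed + len(batch)            # new committed offset

log = ["a", "b", "c"]
seen: list[str] = []
off = poll_and_commit(log, 0, seen.append, crash_before_commit=True)  # crash!
off = poll_and_commit(log, off, seen.append)                          # redelivery
```

After the simulated crash, `seen` holds every record twice: that duplicate window is exactly what idempotent handlers or dedupe stores must absorb.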
Note
Rebalance storm during GC pauses or slow processing is a classic production issue—tune session.timeout.ms, max.poll.interval.ms, and use cooperative rebalancing where possible.
Static membership (optional): group.instance.id pins a consumer identity to reduce churn on rolling deploys—fewer unnecessary rebalances when the same container comes back.
Consumer group state machine (conceptual):
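The coordinator’s group states can be sketched as a small transition table (simplified from the broker’s actual state machine; the transition set here is a conceptual approximation):

```python
from enum import Enum

class GroupState(Enum):
    EMPTY = "Empty"                              # no members; offsets may remain
    PREPARING_REBALANCE = "PreparingRebalance"   # waiting for members to (re)join
    COMPLETING_REBALANCE = "CompletingRebalance" # leader computes assignment (SyncGroup)
    STABLE = "Stable"                            # assignment distributed; heartbeats flow
    DEAD = "Dead"                                # group removed from coordinator

# Allowed transitions (simplified): any membership change kicks a stable
# group back into PreparingRebalance.
TRANSITIONS = {
    GroupState.EMPTY: {GroupState.PREPARING_REBALANCE, GroupState.DEAD},
    GroupState.PREPARING_REBALANCE: {GroupState.COMPLETING_REBALANCE, GroupState.EMPTY, GroupState.DEAD},
    GroupState.COMPLETING_REBALANCE: {GroupState.STABLE, GroupState.PREPARING_REBALANCE, GroupState.DEAD},
    GroupState.STABLE: {GroupState.PREPARING_REBALANCE, GroupState.DEAD},
    GroupState.DEAD: set(),
}

def can_transition(src: GroupState, dst: GroupState) -> bool:
    return dst in TRANSITIONS[src]
```

The key interview point: Stable → PreparingRebalance is triggered by joins, leaves, or missed heartbeats, which is why heartbeat and poll-interval tuning directly controls rebalance frequency.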
4.5 Zero-Copy and High-Performance I/O¶
Goal: move bytes from disk/socket to socket with minimal CPU copies.
| Mechanism | Role |
|---|---|
| `sendfile` / `transferTo` | Kernel copies from page cache to NIC without user-space buffers |
| Page cache | Log reads/writes hit OS cache; sequential access is fast |
| Batching | Amortize metadata and round trips (linger.ms, batch.size) |
| Compression | Trade CPU for network/disk (lz4/zstd popular for low CPU) |
Zero-copy send path (sketches)
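A minimal, Linux-only sketch of the zero-copy send path using `os.sendfile()`: the kernel moves bytes from the page cache straight to the socket without a user-space copy (Kafka’s JVM equivalent is `FileChannel.transferTo`). The socketpair stands in for a consumer connection.

```python
import os
import socket
import tempfile

payload = b"log-segment-bytes" * 64

with tempfile.TemporaryFile() as segment:     # stand-in for a log segment file
    segment.write(payload)
    segment.flush()
    rx, tx = socket.socketpair()              # stand-in for a consumer connection
    sent = 0
    while sent < len(payload):
        # Kernel-to-kernel transfer: file (page cache) -> socket buffer.
        sent += os.sendfile(tx.fileno(), segment.fileno(), sent, len(payload) - sent)
    tx.close()
    received = b"".join(iter(lambda: rx.recv(4096), b""))
    rx.close()
```

Note TLS breaks naive zero-copy (bytes must be encrypted in user space or via kernel TLS), which is a common follow-up.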
Tip
Mention batching trade-off: higher linger.ms improves throughput and compression ratio but hurts p99 latency—tie to SLO.
Batching knobs (Kafka producer):
| Config | Effect |
|---|---|
| `batch.size` | Larger batches → fewer requests |
| `linger.ms` | Wait to fill batches; trades latency for throughput |
| `compression.type` | lz4/zstd for CPU vs bytes on wire |
| `buffer.memory` | Backpressure when producer buffer is full |
End-to-end latency budget: client serialization + RTT to leader + ISR replication + optional follower fetch delay + consumer poll interval. Sub-ms p99 often requires same-AZ clients, tuned fetch, and avoiding large GC pauses.
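The budget can be made concrete with illustrative (made-up) per-hop numbers; real values depend on deployment, fetch tuning, and GC behavior:

```python
# Hypothetical per-hop latency budget in milliseconds (illustrative only).
budget_ms = {
    "client_serialization": 0.1,
    "rtt_to_leader": 0.5,
    "isr_replication": 1.0,      # dominated by the follower fetch interval
    "consumer_poll_wait": 2.0,   # fetch.min.bytes / fetch.max.wait.ms style delay
}
total = sum(budget_ms.values())  # end-to-end produce-to-consume estimate
```

The exercise shows why sub-millisecond p99 is hard: replication and consumer polling dominate, so the tuning levers are fetch settings and same-AZ placement, not serialization.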
4.6 Ordering Guarantees¶
| Guarantee | What you get | Cost |
|---|---|---|
| Per-partition total order | All consumers see the same order in one partition | Must route related events to same partition via key |
| Per-key order | Same as partition if key maps to partition | Hot keys |
| Global order | Single partition (or external sequencing service) | No horizontal scale for writes |
Key-based routing:
Partition helper and idempotent producer settings
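A dependency-free sketch of key-based routing. Kafka’s default partitioner hashes keys with murmur2; md5 here is an assumption purely so the example is self-contained:

```python
import hashlib

def partition_for(key: bytes, num_partitions: int) -> int:
    """Same key -> same partition, so per-key order holds as long as the
    partition count stays fixed (md5 stands in for Kafka's murmur2)."""
    digest = hashlib.md5(key).digest()
    return int.from_bytes(digest[:4], "big") % num_partitions

# All events for one entity land on one partition:
p1 = partition_for(b"order-42", 12)
p2 = partition_for(b"order-42", 12)
```

Note the corollary worth stating in interviews: adding partitions remaps keys (the modulus changes), so per-key ordering across the resize boundary is lost unless you migrate carefully.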
Warning
Global ordering across all events usually implies one partition or a central sequencer—say you’d only do it for rare audit streams, not the main firehose.
Retries vs ordering: With max.in.flight.requests.per.connection > 1 and retries, ordering can break if batches complete out of order—idempotent producer + careful in-flight settings restore predictable behavior; interviewers often expect you to mention this interaction.
4.7 Log Compaction¶
Why compaction matters: Default time/size retention keeps an append-only history (great for replay). Compaction is for changelog topics where each key represents entity state: you want “latest value per key” to fit on disk, not infinite history. Kafka Streams KTable semantics assume a compacted changelog backing store—materialized state is key → latest value.
| Use case | Retention story |
|---|---|
| CDC / config / state topics | Compact so brokers store current key set, not every revision forever |
| Event sourcing with snapshots | Often combine compaction + snapshots in the app layer |
Compaction algorithm (conceptual):
- Split the log into segments; classify segments as clean (already compacted) vs dirty (high ratio of obsolete records).
- Dirty-ratio / size thresholds trigger cleaner threads (Kafka: `log.cleaner.*`, `min.cleanable.dirty.ratio`).
- Cleaners merge segments: for each key, only the last record in offset order is kept in the output segment; tombstones (null value) delete the key from the compacted view but may be retained briefly so replicas/consumers observe the delete.
- Inter-segment order is preserved by merge-sort style iteration by offset.
Note
Real brokers schedule cleaner I/O to avoid starving produce/fetch; throttle and buffer settings matter on SSD-heavy clusters.
Python: compact one segment’s records by key (same idea as a cleaner pass merging in-order iterators):
from dataclasses import dataclass
from heapq import merge
@dataclass(frozen=True)
class KV:
offset: int
key: bytes
value: bytes | None # None = tombstone
def merge_sorted_segments(*segments: list[KV]) -> list[KV]:
    """
    Each segment list is sorted by offset. K-way merge preserves global offset order,
    then we collapse to last-write-wins per key (cleaner merge pass).
    """
    # KV does not define __lt__, so heapq.merge needs an explicit key.
    ordered = merge(*segments, key=lambda r: r.offset)
    last: dict[bytes, KV] = {}
    for r in ordered:
        last[r.key] = r
    return sorted(last.values(), key=lambda r: r.offset)
def compact_dirty_segment(records: list[KV]) -> list[KV]:
"""Single segment: keep last KV per key; drop superseded rows."""
last: dict[bytes, KV] = {}
for r in sorted(records, key=lambda x: x.offset):
last[r.key] = r
return sorted(last.values(), key=lambda r: r.offset)
Tombstone records and delete markers: A tombstone is a record with null value (Kafka: null payload). It tells compaction and consumers “this key is deleted.” Compaction eventually removes the tombstone after delete.retention.ms so the key does not stay in the log forever—until then, replicas must replicate tombstones for correctness.
Warning
If you never emit tombstones for deleted keys, old values can reappear after failover or when a consumer reads from an uncompacted tail—design deletes explicitly in compacted topics.
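The second half of the tombstone lifecycle—removal after the retention window—can be sketched like this (record shape and names are hypothetical; `value is None` marks a tombstone as in the compaction example above):

```python
def gc_tombstones(compacted, now_ms, delete_retention_ms):
    """Drop tombstones older than delete.retention.ms; by then every replica
    and read_committed consumer should have observed the delete."""
    kept = []
    for offset, key, value, ts_ms in compacted:
        if value is None and now_ms - ts_ms > delete_retention_ms:
            continue  # stale tombstone: safe to remove from the compacted view
        kept.append((offset, key, value, ts_ms))
    return kept

records = [
    (10, b"a", b"v1", 1_000),
    (11, b"b", None, 2_000),   # fresh tombstone: keep so readers see the delete
    (12, b"c", None, 100),     # stale tombstone: remove
]
survivors = gc_tombstones(records, now_ms=100_000, delete_retention_ms=99_000)
```

The retention window exists precisely to close the failure mode in the warning above: a consumer that bootstraps from the compacted log must still see the delete marker.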
Trade-offs:
| Factor | Effect |
|---|---|
| Compaction I/O | Background read+rewrite of segments; competes with produce/fetch |
| Storage savings | Large for high-churn keys with small latest state |
| Read path | Consumers of changelog see key-level truth faster than scanning full history |
| Ordering | Still per-partition total order in the uncompacted log; compacted file is per-key latest view |
Tip
Mention "dirty ratio" in interviews: the cleaner prioritizes segments where obsolete records are a large fraction of bytes—maximizing reclaim per I/O dollar.
4.8 Multi-Region Replication¶
Problem: One cluster lives in us-east; disaster recovery, low-latency local reads, or geo routing need copies of streams elsewhere. Offsets and metadata are cluster-local—clients cannot assume same numeric offset means the same record in another cluster.
MirrorMaker 2 (MM2) / Cluster Linking (Confluent) / geo-replication products: run connectors that consume from a source cluster and produce to a target, preserving topic names (optionally prefixed), partitioning (best-effort), and ordering per partition on the target. Cluster Linking is often broker-native with stronger offset/ACL semantics—position as product-specific in interviews.
Offset translation across clusters: There is no universal offset mapping—a record at offset 9001 in cluster A is not the same byte offset in cluster B. Replication tracks progress via consumer offsets in the replication flow (MM2 internal topics) or byte checkpoints. Application-level idempotency keys in the payload beat trying to sync raw offsets for business logic.
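An MM2-style checkpoint mapping can be sketched as follows (the checkpoint list and function names are illustrative, not a real MM2 API): periodic checkpoints record that source offset S corresponds to target offset T, and translation rewinds to the newest checkpoint at or below the requested offset.

```python
import bisect

# (source_offset, target_offset) pairs, sorted by source offset.
checkpoints = [(0, 0), (1_000, 940), (2_000, 1_895)]

def translate(source_offset: int) -> int:
    """Find the newest checkpoint at or below source_offset. Conservative:
    a failed-over consumer may re-read a few records (at-least-once),
    but never skips any."""
    idx = bisect.bisect_right([s for s, _ in checkpoints], source_offset) - 1
    _, target = checkpoints[idx]
    return target

t = translate(1_500)   # falls back to the (1_000, 940) checkpoint
```

This is why application-level idempotency keys beat raw offsets for business logic: translation is approximate by design.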
| Pattern | Behavior | Typical use |
|---|---|---|
| Active-passive | One cluster primary for writes; standby consumes replica for DR readiness | RPO via replication lag; failover promotes passive |
| Active-active | Both regions produce; replication bidirectional | Low-latency local writes; conflicts must be designed |
Conflict resolution (active-active): If the same key can be written in both regions, you need deterministic merge rules: last-write-wins (LWW) with vector clocks / version fields, CRDTs, or region-owned key prefixes (us-east-{id} vs eu-west-{id}) so keys don’t collide. The log does not magically merge business conflicts.
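A minimal LWW merge sketch, assuming each record carries a `(version, region, value)` triple (these field names are assumptions, not a Kafka feature): the higher version wins, and ties break deterministically by region id so both regions converge to the same answer.

```python
def lww_merge(a, b):
    """a, b: (version, region, value). Tuple comparison gives us
    version-then-region ordering; the tiebreak rule is identical
    everywhere, so merges are deterministic across regions."""
    return max(a, b)

east = (7, "us-east", b"addr-v7")
west = (7, "eu-west", b"addr-v7b")   # concurrent write with the same version
winner = lww_merge(east, west)       # tie on version: region id breaks it
```

LWW silently discards the loser, which is acceptable for idempotent state but not for counters or carts; that distinction (LWW vs CRDT vs region-owned keys) is the point to make in interviews.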
RPO / RTO:
| Metric | Meaning for replication |
|---|---|
| RPO | Data lost on disaster = replication lag + unreplicated in-flight produce; sync cross-region is rare (latency) |
| RTO | Time to redirect clients / promote secondary; includes DNS, metadata, and consumer restart |
Note
State RPO ≠ 0 for async geo replication unless you pay cross-region sync latency and durability costs on every write.
Warning
Exactly-once across regions is harder: idempotence and transactions are per cluster—end-to-end pipelines need dedupe at sinks or external coordination.
Step 5: Scaling & Production¶
Failure Handling¶
| Failure | Detection | Mitigation |
|---|---|---|
| Broker crash | Controller + metadata | Elect new leader from ISR; clients refresh metadata |
| Slow follower | Lag metrics | Drop from ISR; alert before data loss risk |
| ZooKeeper/KRaft loss | Quorum health | Metadata availability; avoid split-brain with epochs |
| Disk full | Metrics | Retention, tiering, expand volumes |
| Consumer slow | `max.poll` exceeded | Scale consumers, increase partitions, tune timeouts |
Monitoring Dashboard (Examples)¶
| Metric | Why |
|---|---|
| Bytes in / out per broker | Capacity; hot brokers |
| Under-replicated partitions | ISR health; replication backlog |
| Request latency p99 (produce/fetch) | SLO tracking |
| Consumer lag | Backlog; scaling signal |
| ISR shrink/expand rate | Stability incidents |
Trade-offs¶
| Choice | Upside | Downside |
|---|---|---|
| More partitions | Parallelism | More metadata, rebalance cost, file handles |
| `acks=all` | Durability | Higher latency |
| Long retention | Replay, debugging | Storage cost |
| Log compaction | Key-value changelog semantics | Compaction I/O; tombstones |
| Large messages | Simplicity | Broker limits; use reference + blob store |
Operational Runbooks (Staff-Level)¶
| Incident | First actions |
|---|---|
| Growing consumer lag | Scale consumers; check downstream DB; add partitions only if key space allows |
| Under-replicated partitions | Broker disk/NIC; slow follower; ISR shrink |
| Broker disk 90%+ | Lower retention; tiered storage; expand volumes |
| Controller flapping | JVM GC; metadata quorum health; network partition |
Security & Multi-Tenancy¶
| Layer | Control |
|---|---|
| Transport | TLS for client–broker and inter-broker |
| AuthN | SASL (SCRAM, OAuth, mutual TLS) |
| AuthZ | ACLs on topic/resource |
| Quotas | Produce/fetch byte rate per principal |
Note
Production answers tie SLO (lag p99, uptime) to alerting and runbooks (unclean election, broker replacement).
Interview Tips¶
Note
Common Google-style follow-ups:
- How does leader failover change ordering? What happens to uncommitted messages?
- How do you avoid split-brain? Explain high watermark and committed offset vs LEO.
- Compare Kafka vs RabbitMQ for task queues.
- How would you implement priority (usually separate topics or weighted consumers)?
- Multi-region: active-active consumers and offset mapping?
- How does Kafka Streams achieve exactly-once? What is the cooperative sticky assignor solving?
Quick checklist:
| Theme | Be ready to explain |
|---|---|
| Replication | ISR, min ISR, unclean election |
| Producer | Idempotence, retries, ordering with retries |
| Consumer | Rebalance, partition assignment, commit points |
| Storage | Segments, indexes, retention, compaction |
| Performance | Page cache, sendfile, batching, compression |
Summary¶
| Concept | One-liner |
|---|---|
| Partitioned log | Scale throughput; order within partition only |
| ISR + leader | Durability and failover without arbitrary data loss |
| Offsets | Replay and consumer progress; at-least-once by default |
| Idempotent producer | Safe retries; duplicates dropped via per-partition sequence numbers |
| Zero-copy | Cheap fan-out from page cache to network |
| Hot partitions | Key skew is the enemy of estimation and latency |
| Log compaction | Latest value per key; tombstones + cleaner I/O trade-offs |
| Multi-region | MM2 / linking; offsets not portable; active-active needs merge rules |
This walkthrough aligns with Kafka-style log systems; adapt naming if the interviewer prefers Pulsar (broker + BookKeeper) or cloud-managed streaming.