
Distributed Systems

// CAP · consistency models · consensus · replication · distributed transactions · fault tolerance · senior → principal

Core Concepts
⚖️ CAP Theorem & PACELC
CAP theorem: a distributed system can guarantee at most two of three properties at once, and the trade-off bites during a network partition:
- Consistency (C): every read returns the most recent write or an error
- Availability (A): every request receives a (non-error) response
- Partition tolerance (P): the system continues operating despite network partitions
Network partitions are not optional in real distributed systems; they happen. So the real choice is CP vs AP: when a partition occurs, do you return an error (preserve consistency) or return potentially stale data (preserve availability)?
PACELC extends CAP for the normal (no-partition) case: even when the network is healthy, there is a trade-off between Latency and Consistency. Synchronous replication is consistent but adds latency; asynchronous replication is faster but eventually consistent.
Example: DynamoDB is AP/EL (available during partitions, low latency over consistency); HBase is CP/EC (consistent during partitions, consistent over latency).
P is not optional · CP = consistency over availability · AP = availability over consistency
🎯 Consistency Models
From strongest to weakest:
- Linearizability (strong consistency): operations appear to execute instantaneously at some point between their invocation and response. A read always returns the value of the most recent write, globally. Requires coordination on every operation. Used by etcd, ZooKeeper (for writes; its reads are sequentially consistent unless preceded by a sync), Google Spanner.
- Sequential consistency: all nodes see operations in the same order, but that order doesn't have to match real time. Stronger than causal, weaker than linearizability.
- Causal consistency: operations that are causally related are seen in causal order by all nodes. Concurrent (causally unrelated) operations may be seen in different orders. Examples: MongoDB with causal sessions; Dynamo-style stores use vector clocks to track causality between versions.
- Eventual consistency: if no new updates are made, all replicas will converge to the same value, eventually. No guarantee on when, or on what intermediate reads return. Examples: DNS, Cassandra (default), DynamoDB (default).
linearizable = globally ordered · causal = respects happens-before · eventual = best effort convergence
🗳️ Consensus: Raft
Consensus is the problem of getting a group of nodes to agree on a value despite failures. It is required for leader election, replicated state machines, and distributed locks.
Raft solves consensus by electing a leader who handles all writes. Followers replicate the leader's log. Terms are monotonically increasing election epochs.
Leader election: nodes start as followers. On timeout without a heartbeat, a follower becomes a candidate and requests votes. A candidate winning a majority becomes leader. Raft guarantees at most one leader per term.
Log replication: the leader appends entries to its log and sends them to followers. An entry is committed once a majority acknowledges it. Committed entries are never lost, even if the leader crashes.
Safety: Raft's leader election ensures the winner always holds every committed entry, because a node votes for a candidate only if the candidate's log is at least as up-to-date as its own.
1. Follower timeout
2. Candidate requests votes
3. Majority votes → Leader
4. Leader replicates log
5. Majority ack → Commit
majority quorum = floor(N/2)+1 · leader has complete log · term = election epoch
🔁 Replication Strategies
Single-leader (leader-follower): all writes go to the leader and are replicated to followers. Simple; strong consistency is possible with synchronous replication. The leader is a write bottleneck and, without automated failover, a single point of failure. Used by PostgreSQL, MySQL, Kafka (partition leader), Redis Sentinel.
Multi-leader: multiple nodes accept writes and sync changes asynchronously. Higher write throughput, lower latency for geo-distributed writes. Write conflicts are possible and must be resolved (last-write-wins, custom merge, CRDTs). Used by multi-region active-active databases (for example, CouchDB replication). Note that CockroachDB is not multi-leader: it runs Raft per range, so each range has a single leader at a time.
Leaderless (Dynamo-style): any node accepts reads/writes. Quorum reads and writes (W + R > N so that read and write sets overlap). Conflicts are resolved at read time via version vectors. Used by Cassandra, Riak, and the original Dynamo design.
Quorum: with N replicas, write to W nodes and read from R nodes. If W + R > N, every read set overlaps every write set in at least one node, so a read contacts at least one replica holding the latest acknowledged write. This holds for strict quorums; sloppy quorums and concurrent writes can still surface stale or conflicting values. Tune W and R based on the read/write ratio.
W + R > N = strong consistency · multi-leader = conflict risk · leaderless = no SPOF
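The overlap rule can be checked by brute force. The sketch below is illustrative only; the function names are made up for this example, and it models strict quorums (no sloppy quorums or hinted handoff).

```python
from itertools import combinations


def quorums_overlap(n: int, w: int, r: int) -> bool:
    """The rule of thumb: read and write sets must intersect when W + R > N."""
    return w + r > n


def brute_force_overlap(n: int, w: int, r: int) -> bool:
    """Enumerate every write set and read set and confirm they share a node."""
    nodes = range(n)
    return all(
        set(write_set) & set(read_set)
        for write_set in combinations(nodes, w)
        for read_set in combinations(nodes, r)
    )


if __name__ == "__main__":
    for n, w, r in [(3, 2, 2), (3, 1, 1), (5, 3, 3), (5, 1, 5)]:
        print(f"N={n} W={w} R={r} overlap={quorums_overlap(n, w, r)}")
```

The brute-force version exists only to show that the inequality and the set-intersection property agree for small clusters.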
💥 Failure Detection & Fault Tolerance
Timeouts are the only way to detect failures in asynchronous networks; there is no reliable "node is dead" signal. A slow response is indistinguishable from a crashed node until the timeout fires. Too short → false positives (healthy nodes marked dead); too long → slow failure detection.
Heartbeats: nodes periodically send "I'm alive" signals. Missing heartbeats trigger failure suspicion. Raft, ZooKeeper, and Kafka all use heartbeats with configurable timeouts.
Phi Accrual Failure Detector (Cassandra, Akka): instead of a binary alive/dead verdict, it outputs a suspicion level (phi) that increases as heartbeats are missed. Callers choose a phi threshold for their use case, which is more nuanced than a hard timeout.
Gossip protocol: nodes periodically exchange state with random peers; information propagates epidemically. Used for membership (Cassandra ring), failure detection, and metadata dissemination. Scales to thousands of nodes, converging in O(log N) gossip rounds.
timeouts = only failure signal · gossip = O(log N) propagation · slow ≠ dead in async networks
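A minimal sketch of the phi accrual idea follows. It assumes heartbeat inter-arrival times are exponentially distributed (a common simplification; the original formulation uses a normal distribution), and the class and method names are hypothetical.

```python
import math
from collections import deque


class PhiAccrualDetector:
    """Suspicion level grows continuously with time since the last heartbeat."""

    def __init__(self, window: int = 100):
        self.intervals = deque(maxlen=window)  # recent inter-arrival times
        self.last_heartbeat = None

    def heartbeat(self, now: float) -> None:
        if self.last_heartbeat is not None:
            self.intervals.append(now - self.last_heartbeat)
        self.last_heartbeat = now

    def phi(self, now: float) -> float:
        if not self.intervals:
            return 0.0  # not enough history to suspect anything
        mean = sum(self.intervals) / len(self.intervals)
        if mean <= 0:
            return 0.0
        elapsed = now - self.last_heartbeat
        # Under the exponential assumption, P(heartbeat still pending) = exp(-elapsed / mean),
        # so phi = -log10(P) = elapsed / (mean * ln 10).
        return elapsed / (mean * math.log(10))
```

A caller would compare `phi(now)` against a threshold it picks for itself: a low threshold detects failures sooner at the cost of more false positives.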
📦 Distributed Transactions & Patterns
Two-Phase Commit (2PC): the coordinator asks all participants to prepare (lock resources, write to WAL), then commits (or aborts) based on all votes. Guarantees atomicity across services. Fatal flaw: if the coordinator crashes after prepare but before commit, participants are blocked, holding locks, until the coordinator recovers. It is a blocking protocol.
SAGA: a long-lived transaction decomposed into a sequence of local transactions, each with a compensating transaction (undo). If step N fails, steps N-1 through 1 are compensated in reverse. No distributed lock is held across steps. Two flavors: choreography (services react to events) and orchestration (a central coordinator drives the sequence).
Outbox pattern: instead of writing to a database AND publishing an event (dual write, risk of inconsistency), write the business record and the event to the database atomically in a single local transaction using an outbox table. A separate relay process reads the outbox and publishes to the message broker in order, with at-least-once delivery, so consumers must be idempotent.
1. Local DB write + Outbox row (atomic)
2. Relay reads outbox
3. Relay publishes event
4. Mark outbox row processed
2PC = blocking on coordinator crash · SAGA = compensations not rollback · outbox = atomic dual write
Gotchas & Failure Modes
Partial failure is harder than total failure
A completely failed node is easy to detect and handle. A node that responds slowly, drops 10% of messages, or returns incorrect results for some keys is far harder. Your system must assume that at any moment, some subset of nodes is in an unknown state: alive, dead, or Byzantine (returning incorrect data). Design for partial failure as the norm, not the exception. Cascading failures are almost always triggered by slow partial failures, not clean crashes.
You cannot rely on wall clocks for ordering in distributed systems
System clocks on different machines can differ by milliseconds to seconds, even with NTP. NTP corrections can move clocks backward. A timestamp-based "latest write wins" conflict resolution will silently discard writes from machines whose clocks run slow, because a write stamped by a fast clock looks newer even when it happened earlier. Use logical clocks (Lamport timestamps) or vector clocks for causal ordering. The notable exception is Google Spanner's TrueTime, which uses GPS and atomic clocks with bounded uncertainty; most systems don't have this.
Idempotency is harder than adding an idempotency key
An idempotent operation produces the same result regardless of how many times it's called. The hard part: at-least-once message delivery means your consumer will process duplicate messages. Simply storing an idempotency key is not enough if checking and storing are not atomic with the operation. A race condition between two concurrent identical requests can result in both being processed. Use a database unique constraint on the idempotency key to make the check-and-store atomic: a second write will fail, not slip through.
Network partitions are routine, not edge cases
Teams design for the happy path (all nodes reachable) and treat partitions as rare catastrophes. In practice, brief network partitions happen multiple times per day in any large system: a switch reboot, a BGP flap, a VM live migration, a cloud AZ hiccup. Design your system to handle partitions gracefully (degrade, queue, or return cached data) rather than assuming they won't happen. A system that works perfectly for 364 days but completely fails for hours on day 365 has an effective availability far below its theoretical uptime.
Leader election does not prevent split-brain without fencing
When a leader crashes and a new leader is elected, the old leader may still be running (GC pause, network partition making it think it's still leader). If both leaders accept writes, you have split-brain: diverged state that may be impossible to reconcile. Fencing tokens (monotonically increasing lease numbers) prevent this: every write operation includes the current fence token, and the storage layer rejects writes with an outdated token. The old leader's writes are rejected because its token is stale.
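The storage-side check behind fencing tokens is small. This is a sketch with a hypothetical in-memory `FencedStore`; a real store would persist the highest token seen and perform the check atomically with the write.

```python
class FencedStore:
    """Rejects writes that carry a token older than the newest one seen."""

    def __init__(self):
        self.highest_token = 0
        self.data = {}

    def write(self, token: int, key: str, value: str) -> bool:
        if token < self.highest_token:
            return False  # stale leader: its lease was superseded
        self.highest_token = token
        self.data[key] = value
        return True


if __name__ == "__main__":
    store = FencedStore()
    print(store.write(33, "config", "from old leader"))     # accepted
    print(store.write(34, "config", "from new leader"))     # accepted, raises the bar
    print(store.write(33, "config", "old leader wakes up"))  # rejected
```

The key design point is that the check lives in the storage layer, not in the leaders: a paused leader cannot know its own lease has expired.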
The thundering herd on cache expiry or leader failover
When a cache entry expires, all requests that were being served from it simultaneously hit the database: the thundering herd. Similarly, when a leader fails over, all clients simultaneously try to reconnect and may overwhelm the new leader. Mitigation: probabilistic early expiration (refresh the cache slightly before TTL, randomly), request coalescing (only one caller, such as a single goroutine, fetches; the others wait for the result), and exponential backoff with jitter on reconnects. Pure exponential backoff without jitter synchronizes retries; adding jitter spreads the load.
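Backoff with jitter can be sketched as below. This uses the "full jitter" variant (each delay is drawn uniformly between zero and the capped exponential ceiling), which is one of several reasonable choices; the function name and parameters are illustrative.

```python
import random


def backoff_delays(base: float, cap: float, attempts: int, rng=None):
    """Return one randomized delay (in seconds) per retry attempt."""
    rng = rng or random.Random()
    delays = []
    for attempt in range(attempts):
        ceiling = min(cap, base * (2 ** attempt))  # exponential growth, capped
        delays.append(rng.uniform(0, ceiling))     # jitter de-synchronizes clients
    return delays


if __name__ == "__main__":
    for i, delay in enumerate(backoff_delays(base=0.1, cap=5.0, attempts=6)):
        print(f"retry {i}: sleep {delay:.3f}s")
```

Because every client draws its own random delay, a fleet that lost its connection at the same instant reconnects spread across the window instead of in lockstep.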
When to Use / When Not To
✓ Use Distributed Systems When
  • When a single machine cannot handle the load, storage, or availability requirements
  • When fault tolerance is required — a single node failure must not cause service outage
  • When data must be geographically distributed to reduce latency for global users
  • When horizontal scaling is needed — adding nodes should linearly increase capacity
✗ Don't Use Distributed Systems When
  • When a single machine is sufficient — distributed systems add complexity without benefit at small scale
  • When strong consistency is required and you cannot tolerate the latency or availability cost of coordination
  • When the team lacks the operational maturity to handle split-brain, partition handling, and clock skew
  • As a first choice — start with a well-understood single-node system and distribute only when you hit real limits
Quick Reference & Comparisons
🎯 Consistency Model Comparison
Linearizability: Every op appears atomic at a single point in real time. Reads always see the latest committed write. Requires global coordination. Highest latency. Used by: etcd, ZooKeeper (writes), Google Spanner.
Sequential Consistency: All nodes agree on a global operation order, but that order doesn't have to match real time. Weaker than linearizability. Used in some CPU memory models.
Causal Consistency: Causally related ops seen in causal order by all nodes. Concurrent ops may be seen in different order. Requires vector clocks or dependency tracking. Used by: MongoDB causal sessions, Amazon Aurora.
Read-your-writes: After a client writes, it always sees that write in subsequent reads (from any replica). Not system-wide; only for the writing client. Often guaranteed via sticky sessions or session tokens.
Monotonic reads: Once a client reads a value, subsequent reads never return an older value. Prevents 'time going backwards' for a single client.
Eventual Consistency: Given no new updates, all replicas converge to the same value eventually. No bound on convergence time. No guarantees on intermediate reads. Used by: Cassandra (default), DynamoDB (default), DNS.
🗳️ Consensus Protocol Comparison
Raft: Designed for understandability. Single leader per term. Log-based. Used by: etcd, CockroachDB, TiKV, Consul. Leader election is simple; log replication is straightforward. Generally considered the easiest to implement correctly.
Multi-Paxos: Paxos is the original consensus algorithm; Multi-Paxos runs it over a sequence of log slots. More complex than Raft, with many variants (Classic, Fast, Cheap). Leader-based after initial leader election. Used by: Google Chubby, Google Spanner, many academic systems.
ZAB (ZooKeeper Atomic Broadcast): ZooKeeper's purpose-built protocol. Similar to Paxos/Raft. Leader-based. Guarantees total order of updates. Primary difference: designed as an atomic broadcast (primary-backup) protocol rather than general-purpose consensus.
PBFT (Practical Byzantine Fault Tolerance): Tolerates f Byzantine (malicious/arbitrary) faults with 3f+1 nodes. Very expensive: O(n²) message complexity. Used in blockchain and financial systems requiring Byzantine fault tolerance.
Viewstamped Replication: Developed independently around the same time as Paxos, and a precursor to Raft's design. View-based leadership. Less well-known but theoretically equivalent to Paxos. Historical significance; rarely used in new systems.
🔁 Replication & Quorum Reference
N (replication factor): Total number of replicas. Typically 3 (tolerates 1 failure) or 5 (tolerates 2 failures). Increasing N improves fault tolerance but increases write cost.
W (write quorum): Number of nodes that must acknowledge a write. W=1 is fastest, least durable. W=N is most durable but slowest, and a single unavailable replica blocks writes.
R (read quorum): Number of nodes queried for a read; the reader returns the most recent value among the responses. R=1 is fastest. R=N is slowest.
W + R > N: Quorum overlap: any read set overlaps any write set, so it is guaranteed to include at least one node with the latest acknowledged write. N=3, W=2, R=2 is common.
W + R ≤ N: Possible to read stale data (no guaranteed overlap between write and read quorum). Trades consistency for availability or latency.
Hinted handoff: If a target replica is unavailable during a write, another node temporarily stores the write (hint) and delivers it when the target recovers. Improves write availability at the cost of temporary inconsistency.
🛡️ Resilience Patterns
Circuit Breaker: After N consecutive failures to a downstream, open the circuit: stop calling the downstream and return a fast error. After a timeout, allow one probe request. If it succeeds, close the circuit. Prevents cascading failures and gives the downstream time to recover.
Bulkhead: Isolate resources (thread pools, connection pools, semaphores) per downstream dependency. One slow dependency fills its own pool, not the shared pool, so other dependencies remain unaffected. Named after ship hull compartments.
Retry with exponential backoff + jitter: Retry on transient failures with doubling intervals. Add random jitter to avoid synchronized retry storms from many clients. Cap at a maximum backoff. Only retry idempotent operations or those with idempotency keys.
Timeout: Every outbound call must have a timeout. Without timeouts, a single slow downstream causes threads/goroutines to accumulate until the calling service is exhausted. Set timeouts derived from SLA requirements, not arbitrary large values.
Backpressure: Signal to producers to slow down when consumers are overwhelmed. TCP flow control is a network-level example. Application-level: bounded queues that block producers, reactive streams with demand signaling. Prefer backpressure over unbounded buffers that hide overload.
Idempotency key: Client-generated unique ID per logical operation. Server stores (idempotency_key, result). On duplicate, return stored result without re-executing. Enforced with a unique DB constraint on idempotency_key to prevent races.
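The circuit breaker entry above can be sketched as a small state machine. This is a single-threaded illustration with hypothetical names; a production breaker would need locking so that only one probe passes in the half-open state, and would usually count failures over a time window.

```python
import time


class CircuitOpenError(Exception):
    """Raised instead of calling the downstream while the circuit is open."""


class CircuitBreaker:
    def __init__(self, failure_threshold=3, reset_timeout=30.0, clock=time.monotonic):
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self.clock = clock            # injectable so tests can control time
        self.failures = 0             # consecutive failures
        self.opened_at = None         # None means the circuit is closed

    def call(self, fn, *args, **kwargs):
        if self.opened_at is not None:
            if self.clock() - self.opened_at < self.reset_timeout:
                raise CircuitOpenError("circuit open; failing fast")
            # Timeout elapsed: half-open, let this call through as a probe.
        try:
            result = fn(*args, **kwargs)
        except Exception:
            self.failures += 1
            # A failed probe, or too many consecutive failures, (re)opens the circuit.
            if self.opened_at is not None or self.failures >= self.failure_threshold:
                self.opened_at = self.clock()
            raise
        self.failures = 0
        self.opened_at = None         # success closes the circuit
        return result
```

Wrapping each downstream in its own breaker instance also gives a crude form of the bulkhead isolation described above.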
💻 CLI Commands
Raft & etcd
etcdctl endpoint health --cluster # check all cluster members
etcdctl endpoint status --cluster -w table # show leader and raft index
etcdctl member list -w table # list cluster members
etcdctl put /key value # write a key
etcdctl get /key # read a key
etcdctl watch /key # watch for changes
etcdctl lease grant 60 # create a 60s TTL lease
etcdctl put /lock 'holder' --lease= # distributed lock via lease (pass the lease ID returned by lease grant)
Distributed Debugging
# Check clock skew across nodes (should be < 100ms; rough, since ssh latency is included)
for h in node1 node2 node3; do ssh $h 'date +%s%3N'; done
# Network impairment simulation with tc (Linux traffic control)
# (only one root qdisc can exist: run the del command before adding a different impairment)
tc qdisc add dev eth0 root netem delay 100ms # add 100ms latency
tc qdisc add dev eth0 root netem loss 10% # simulate 10% packet loss
tc qdisc del dev eth0 root # remove impairment
# Check connection states
ss -s # socket summary
ss -tp state established # established TCP connections
Chaos Engineering
# Chaos Mesh (Kubernetes): inject failures
kubectl apply -f network-chaos.yaml # inject network partition
kubectl apply -f pod-kill.yaml # randomly kill pods
# Toxiproxy: proxy-based fault injection
toxiproxy-cli create mydb -l 0.0.0.0:5433 -u db:5432 # create proxy
toxiproxy-cli toxic add mydb -t latency -a latency=200 # add 200ms latency
toxiproxy-cli toxic add mydb -t bandwidth -a rate=100 # limit to 100 KB/s
toxiproxy-cli toxic remove mydb -n latency_downstream # remove the latency toxic (default name is <type>_<stream>; stream defaults to downstream)
CP vs AP Systems — Popular Databases & Their Trade-offs
System | CAP Classification | Consistency Model | Replication | Best For
etcd / ZooKeeper | CP | Linearizable | Raft / ZAB; single leader | Distributed coordination, leader election, config
PostgreSQL (single) | CP (single node) | Serializable / Read Committed | Streaming (async) or sync | OLTP requiring ACID; primary + replicas
CockroachDB | CP | Serializable (SSI) | Raft per range; multi-region | Geo-distributed SQL requiring strong consistency
Google Spanner | CP | Linearizable + external | Paxos; TrueTime bounded skew | Global ACID transactions with GPS clock bounds
Cassandra | AP (tunable) | Eventual (default) / strong (W+R>N) | Leaderless; gossip; hinted handoff | High-write, high-availability, time-series
DynamoDB | AP (tunable) | Eventual (default) / strong reads opt-in | Multi-AZ; single-digit ms reads | Serverless key-value at massive scale
MongoDB | CP (majority write) | Eventual (default) / causal sessions | Replica set; Raft-based primary election | Document store with flexible schema
Redis (Sentinel) | AP (async replication) | Eventual (replication lag) | Async leader-follower | Cache, sessions, rate limiting; not a primary store
Kafka | CP (per partition) | Linearizable within partition | Leader + ISR replication | Ordered event streaming; replay; fan-out
Interview Q & A
Senior Engineer — Execution Depth
S-01 Explain the CAP theorem. What does it actually mean for practical system design — and what does it not mean? Senior

CAP states that a distributed system can provide at most two of: Consistency (reads return the latest write), Availability (every request gets a non-error response), and Partition tolerance (the system works despite network partitions).
What CAP actually means: since network partitions are unavoidable in real systems, the real choice is what to do when a partition occurs: sacrifice consistency (AP: return possibly stale data) or sacrifice availability (CP: return an error until the partition heals).
What CAP does not mean:
- It's not a permanent choice. A system can be CP during partitions and highly available the rest of the time. Partitions, while routine, occupy a small fraction of operating time; the PACELC model better captures the latency-vs-consistency trade-off under normal operation.
- "Consistency" in CAP means linearizability, not ACID consistency. These are different.
- "Available" in CAP means every non-failing node responds, not that the system is always reachable by clients. A load balancer routing around a downed node provides availability from the client's perspective even if CAP's strict definition isn't met.

Practical examples:
- CP: etcd, ZooKeeper. Return an error rather than stale data during a partition
- AP: Cassandra (default), DynamoDB. Continue serving reads/writes during a partition, reconcile conflicts afterward
- Most SQL databases are effectively CP for single-node deployments

The CAP theorem is often misapplied as a binary framework. In reality, most systems make nuanced trade-offs: Cassandra lets you choose quorum level per operation, so the same cluster can be CP for critical writes (ALL) and AP for non-critical reads (ONE). A more useful mental model is the consistency spectrum: what is the maximum staleness you can tolerate, and what is the cost (in latency, availability, and complexity) to reduce it? Strong consistency across a WAN link costs 100ms+ round trips per write — often unacceptable for user-facing writes. For those cases, design for causal consistency with conflict resolution rather than linearizability.
S-02 What is the difference between linearizability, causal consistency, and eventual consistency? Give a concrete example where the wrong choice causes a bug. Senior
Linearizability: every operation appears to execute atomically at a single point in real time. If a write completes at time T, all reads after T see the new value, globally, on any node. The strongest model; requires coordination.
Causal consistency: operations related by causality are seen in causal order. "If you see my comment on a post, you see the post itself." Concurrent (independent) operations can be seen in different order by different nodes. No global coordination needed.
Eventual consistency: replicas will converge given no new updates, but any individual read may return stale data. No ordering guarantees for concurrent operations.
Bug caused by wrong consistency choice: Consider a social network using eventual consistency for profile reads. User Alice updates her privacy setting to "friends only" for her posts. Immediately after, she posts a private message. Due to replication lag:
1. The privacy setting update has not yet propagated to a read replica.
2. A request to that replica checks Alice's privacy setting and still sees "public."
3. Alice's private post is served to a stranger.
The root cause: the privacy check and the post read must be causally ordered. If you read a post, you must see all privacy settings written before it. Causal consistency (or linearizability) would prevent this. Eventual consistency cannot.
The correct consistency model is determined by the business invariant, not by what's easy to implement. Map your invariants first: "a user can never see data they're not authorized to see" requires consistency between the authorization check and the data read. "Show the user's shopping cart total" can tolerate eventual consistency — a 100ms stale total is acceptable. Many teams default to eventual consistency cluster-wide and then add ad-hoc fixes for violations, resulting in a patchwork of inconsistent workarounds. A deliberate consistency model per data type (using a consistent store for auth/permissions, eventually consistent for counters and feeds) is cleaner and more auditable.
S-03 How does the Raft consensus algorithm work? Walk through leader election and log replication. Senior
Raft nodes are in one of three states: follower, candidate, or leader.
Leader election: Followers wait for heartbeats from the leader. If the election timeout passes (randomized, e.g. 150–300ms) without a heartbeat, the follower becomes a candidate, increments its term, votes for itself, and sends RequestVote RPCs to all other nodes. A candidate wins if it receives votes from a majority (floor(N/2)+1) of nodes. A node only votes for a candidate whose log is at least as up-to-date as its own (this prevents electing a node with a stale log). Randomized timeouts make split votes unlikely, because nodes start elections at slightly different times; when a split vote does occur, the term ends without a leader and a new election starts.
Log replication: The leader receives client writes, appends them to its log as a new entry, and sends AppendEntries RPCs to all followers. An entry is committed when it is stored on a majority of nodes (the leader counts itself). The leader then applies the entry to its state machine and responds to the client. Future AppendEntries (including heartbeats) inform followers of committed entries, which they apply to their state machines.
Safety guarantees:
- At most one leader per term (election majority)
- A committed entry is never lost (only a node with the complete committed log can win an election)
- Entries are applied in log order (state machine safety)
Raft is not Byzantine-fault-tolerant: it assumes nodes are honest but may crash.
Raft's linearizability guarantee requires that every read also goes through the leader (or that the leader confirms it's still the leader via a heartbeat quorum before serving a read). If followers serve reads directly without this check, a partitioned follower may serve stale data while the rest of the cluster has moved on. This is the subtle difference between "Raft cluster" (linearizable) and "Raft cluster with follower reads" (potentially stale). etcd, for example, exposes this on reads: etcdctl get --consistency=l (linearizable, the default) confirms the read through the quorum, while --consistency=s (serializable) lets the member you contact answer from its local state, which is faster but possibly stale. Know which mode your system uses.
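The vote-granting rule from the election walkthrough can be sketched as follows. This covers only the RequestVote handler and the majority arithmetic; names are illustrative, and persistence, timers, and RPC transport are omitted.

```python
from dataclasses import dataclass, field
from typing import List, Optional, Tuple


def majority(n: int) -> int:
    """Smallest number of nodes that forms a majority of n."""
    return n // 2 + 1


@dataclass
class RaftNode:
    current_term: int = 0
    voted_for: Optional[str] = None
    log: List[Tuple[int, str]] = field(default_factory=list)  # (term, command)

    def last_log(self) -> Tuple[int, int]:
        """(term of last entry, index of last entry); (0, 0) for an empty log."""
        if not self.log:
            return (0, 0)
        return (self.log[-1][0], len(self.log))

    def handle_request_vote(self, term, candidate_id, last_log_term, last_log_index) -> bool:
        if term < self.current_term:
            return False                      # stale candidate
        if term > self.current_term:
            self.current_term = term          # newer term: forget any old vote
            self.voted_for = None
        # "At least as up-to-date": higher last term wins; equal terms compare length.
        up_to_date = (last_log_term, last_log_index) >= self.last_log()
        if up_to_date and self.voted_for in (None, candidate_id):
            self.voted_for = candidate_id
            return True
        return False
```

The tuple comparison is the whole safety check: a candidate missing a committed entry cannot collect a majority, because a majority of nodes hold that entry and will refuse it.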
S-04 What is two-phase commit (2PC) and what are its failure modes? Why is it rarely used in microservices? Senior
2PC coordinates an atomic transaction across multiple participants (databases, services):
Phase 1, Prepare: the coordinator asks each participant to prepare (lock resources, write to WAL). Each participant responds with "yes" (ready to commit) or "no" (abort).
Phase 2, Commit or Abort: if all participants voted yes, the coordinator sends "commit"; otherwise it sends "abort." Participants execute the decision.
Failure modes:
- Coordinator crashes after prepare, before commit: participants are in the "prepared" state, with locks held and the transaction neither committed nor aborted. They're blocked until the coordinator recovers. This is the blocking problem; 2PC is a blocking protocol.
- Participant crashes after prepare: when it recovers, it reads its WAL and asks the coordinator for the decision. If the coordinator also crashed, nobody knows the decision.
- Network partition between coordinator and participant: same blocking scenario.
Why microservices avoid 2PC:
- It requires participants to hold locks across a network round-trip, a high-latency window where failures cause indefinite blocking
- It tightly couples all participating services; they must all be available for the transaction to complete
- It does not compose well with HTTP APIs, because 2PC requires a stateful protocol that participants must support
- Modern distributed systems prefer SAGA (compensating transactions) or the outbox pattern for atomicity
2PC is not inherently wrong — it's used extensively within single database systems (most databases use an internal 2PC-like protocol for distributed indexes or partitions). Its problems emerge across service boundaries, where the coordinator is a network-separated orchestrator rather than the same process as the storage engine. The three-phase commit (3PC) was designed to resolve 2PC's blocking problem by adding a pre-commit phase that allows participants to infer the coordinator's decision — but 3PC is almost never used in practice because it's more complex and still not fully non-blocking under partition. The practical answer for cross-service atomicity is SAGA + idempotent compensations.
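The two phases reduce to a short coordinator loop. The sketch below is an in-process toy with hypothetical `Participant` objects; it has no WAL, timeouts, or recovery, which are exactly the parts that make real 2PC hard.

```python
class Participant:
    def __init__(self, name: str, can_commit: bool = True):
        self.name = name
        self.can_commit = can_commit
        self.state = "init"

    def prepare(self) -> bool:
        # A real participant would take locks and flush a prepare record to its WAL here.
        self.state = "prepared" if self.can_commit else "aborted"
        return self.can_commit

    def commit(self) -> None:
        self.state = "committed"

    def abort(self) -> None:
        self.state = "aborted"


def two_phase_commit(participants) -> bool:
    # Phase 1: collect votes from every participant.
    votes = [p.prepare() for p in participants]
    decision = all(votes)
    # A coordinator crash right here leaves prepared participants blocked,
    # holding locks, with no way to learn the decision on their own.
    # Phase 2: broadcast the decision.
    for p in participants:
        if decision:
            p.commit()
        else:
            p.abort()
    return decision
```

The comment between the phases marks the window that makes the protocol blocking: nothing in a participant's local state tells it whether the others voted yes.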
S-05 What is the SAGA pattern? Compare choreography vs orchestration SAGAs. What makes compensations tricky? Senior
A SAGA decomposes a long-lived transaction into a sequence of local transactions, each atomic within one service. If a step fails, previously completed steps are compensated (undone) with dedicated compensating transactions, not rolled back (no distributed lock).
Choreography: each service publishes events and reacts to events from other services. No central coordinator. Loosely coupled: services don't know about each other's internals. Hard to understand the overall flow; debugging requires tracing events across services. Good for simple, linear flows with few participants.
Orchestration: a central saga orchestrator (a dedicated service or workflow engine like Temporal or AWS Step Functions) drives the sequence by calling services in order and deciding what to do on failure. The flow is easier to understand; the orchestrator is a potential bottleneck and single point of failure (mitigated by persisting its state durably and replicating it). Good for complex flows with many branches and error paths.
What makes compensations tricky:
- Compensations must be idempotent: they may be called multiple times if the saga retries.
- Compensations cannot always fully undo: a sent email, a charged card, or a shipped package cannot be "rolled back." Compensations must handle semantic undos (refund, cancel notification).
- Pivot transaction: the step after which it's too late to cancel (e.g., payment captured). Design the saga so the pivot is as late as possible.
- Concurrent modifications: if another saga or user modifies the data between the original step and its compensation, the compensation may conflict. Use versioning or optimistic locking.
Sagas introduce ACI (no Isolation) — other transactions can observe intermediate saga states. A customer's order goes through: pending → payment captured → inventory reserved → shipped. Another query during this sequence may see the order in an in-between state. Mitigate with semantic locks (a status field that marks the record as "in-progress"), countermeasures (fail queries that see in-progress records), or by designing the domain model so partial states are meaningful and visible (pending is a valid, displayable order state). Don't promise isolation the saga pattern cannot deliver — design the domain to tolerate visible intermediate states.
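An orchestrated saga with reverse-order compensation might look like the sketch below. The `run_saga` helper and step names are hypothetical; a real orchestrator would persist progress after each step and retry failed compensations rather than assume they succeed.

```python
def run_saga(steps):
    """steps: list of (name, action, compensation) tuples.

    Runs actions in order. On the first failure, runs the compensations of
    all completed steps in reverse order. Returns (succeeded, event_log).
    """
    completed = []
    log = []
    for name, action, compensate in steps:
        try:
            action()
        except Exception:
            log.append(f"failed:{name}")
            for done_name, done_compensate in reversed(completed):
                done_compensate()  # must be idempotent in a real system
                log.append(f"compensated:{done_name}")
            return False, log
        log.append(f"done:{name}")
        completed.append((name, compensate))
    return True, log


if __name__ == "__main__":
    def decline():
        raise RuntimeError("card declined")

    ok, events = run_saga([
        ("reserve_inventory", lambda: None, lambda: None),
        ("charge_card", decline, lambda: None),
        ("ship_order", lambda: None, lambda: None),
    ])
    print(ok, events)
```

Note that the failed step itself is not compensated: it never completed, so only the steps before it are undone.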
S-06 What is the dual-write problem and how does the outbox pattern solve it? Senior
The dual-write problem: a service needs to write to its database AND publish an event to a message broker atomically. These are two separate systems with no distributed transaction support. Three failure scenarios:
1. DB write succeeds, broker publish fails → state change not propagated → downstream services miss the event.
2. Broker publish succeeds, DB write fails → downstream processes an event for a change that didn't happen.
3. Process crashes between DB write and broker publish → same as scenario 1.
The outbox pattern solves this by making both writes local:
1. Within a single local database transaction, write the business record AND an outbox table row (the event payload).
2. A separate relay process (or CDC via Debezium) reads unprocessed outbox rows and publishes them to the broker.
3. Mark rows processed (or delete them) after successful publish. The relay publishes at-least-once: retries on failure mean the consumer must be idempotent.
The database transaction is the unit of atomicity: either both the business record and the outbox row are written, or neither is. No distributed transaction is needed.
The relay process can be implemented two ways: polling (a loop that queries the outbox table for unprocessed rows at an interval) or CDC (Change Data Capture with Debezium reading the database's transaction log). Polling is simple but adds latency (up to the poll interval) and database load. CDC is real-time and zero-polling-overhead but requires the database to have logical replication enabled (wal_level = logical for PostgreSQL) and adds operational complexity (Kafka Connect + Debezium). For most teams, polling with a short interval (1 s) is sufficient and simpler to operate. CDC is worth the complexity when sub-second event latency is required or when the outbox table grows very large.
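A polling relay can be sketched with SQLite standing in for the service database. The table names, `place_order`, and `relay_once` are assumptions made for this example, and `publish` is whatever callable sends to the broker.

```python
import json
import sqlite3


def create_schema(conn: sqlite3.Connection) -> None:
    conn.executescript("""
        CREATE TABLE orders (id INTEGER PRIMARY KEY, item TEXT NOT NULL);
        CREATE TABLE outbox (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            payload TEXT NOT NULL,
            processed INTEGER NOT NULL DEFAULT 0
        );
    """)


def place_order(conn: sqlite3.Connection, order_id: int, item: str) -> None:
    event = json.dumps({"type": "OrderPlaced", "order_id": order_id})
    with conn:  # one local transaction: both rows commit, or neither does
        conn.execute("INSERT INTO orders (id, item) VALUES (?, ?)", (order_id, item))
        conn.execute("INSERT INTO outbox (payload) VALUES (?)", (event,))


def relay_once(conn: sqlite3.Connection, publish) -> int:
    """Publish unprocessed outbox rows in insertion order; return how many were sent."""
    rows = conn.execute(
        "SELECT id, payload FROM outbox WHERE processed = 0 ORDER BY id"
    ).fetchall()
    sent = 0
    for row_id, payload in rows:
        publish(json.loads(payload))  # if this raises, the row stays unprocessed
        with conn:
            conn.execute("UPDATE outbox SET processed = 1 WHERE id = ?", (row_id,))
        sent += 1
    return sent
```

If the relay crashes after `publish` but before the `UPDATE`, the row is published again on the next pass, which is why delivery is at-least-once and consumers need to deduplicate.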
S-07 How do logical clocks (Lamport timestamps) and vector clocks work? When do you need them? Senior
The problem: wall clocks on different machines can't be trusted for event ordering. Even with NTP, clocks differ by milliseconds, and NTP corrections can jump clocks backward.
Lamport timestamps: each node maintains a counter. On any event, increment the counter. On sending a message, include the counter. On receiving a message, set counter = max(local, received) + 1. Lamport timestamps guarantee: if event A happened before event B (causally), then timestamp(A) < timestamp(B). But the converse is not true: timestamp(A) < timestamp(B) does not mean A happened before B. The timestamps give an ordering that is consistent with causality, but they cannot tell you whether two events are causally related or concurrent.
Vector clocks: each node maintains a vector of counters, one per node ([N1: 3, N2: 1, N3: 2]). On event: increment own entry. On send: include vector. On receive: take element-wise max, then increment own entry. Two events are concurrent if neither vector dominates the other. This detects causality precisely: you can tell if two events are causally related or truly concurrent (and thus may conflict). Used by Dynamo and Riak for conflict detection.
When you need them:
- Detecting write conflicts in leaderless replication (were these two writes causally related or concurrent? If concurrent, they conflict and need resolution)
- Event ordering in distributed logs where wall-clock ordering is unreliable
- Debugging distributed systems (causal trace of which event caused which)
Version vectors (a variant of vector clocks tracked per key rather than per node) are the practical implementation in systems like Dynamo. When two replicas disagree on a value, the version vectors tell you whether one causally supersedes the other (take the newer one) or whether they're concurrent (a conflict that needs application-level resolution — last write wins, merge, or present both to the user). The cost of vector clocks grows with the number of writers — in a system with thousands of clients all writing to the same key, the vector grows unboundedly. Dotted version vectors solve this by tracking writes per server rather than per client. Understanding this trade-off matters when designing high-write leaderless systems.
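The vector clock rules can be sketched as follows. This is a minimal illustration that represents a clock as a plain dict from node name to counter; the function names `vc_increment`, `vc_merge`, and `vc_compare` are made up for this example and do not come from any particular library.

```python
def vc_increment(clock, node):
    """Return a copy of clock with node's own entry advanced by one."""
    updated = dict(clock)
    updated[node] = updated.get(node, 0) + 1
    return updated

def vc_merge(local, received, node):
    """On receive: element-wise max, then increment the receiver's own entry."""
    merged = {
        n: max(local.get(n, 0), received.get(n, 0))
        for n in set(local) | set(received)
    }
    return vc_increment(merged, node)

def vc_compare(a, b):
    """Return 'before', 'after', 'equal', or 'concurrent' for clocks a and b."""
    nodes = set(a) | set(b)
    a_le_b = all(a.get(n, 0) <= b.get(n, 0) for n in nodes)
    b_le_a = all(b.get(n, 0) <= a.get(n, 0) for n in nodes)
    if a_le_b and b_le_a:
        return "equal"
    if a_le_b:
        return "before"
    if b_le_a:
        return "after"
    return "concurrent"

w1 = vc_increment({}, "N1")   # N1 writes
w2 = vc_merge({}, w1, "N2")   # N2 receives w1, so w2 causally follows w1
w3 = vc_increment(w1, "N1")   # N1 writes again without having seen w2
```

Here `vc_compare(w1, w2)` reports a causal relationship, while `vc_compare(w2, w3)` reports concurrency, which is the case that needs conflict resolution.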
Staff Engineer — Design & Cross-System Thinking
ST-01 How do you design an idempotent payment API? What makes idempotency hard in distributed systems? Staff
An idempotent API returns the same result for the same logical request, regardless of how many times it's called. For payments, the client generates a unique idempotency key per payment attempt and includes it in every retry.
Server-side implementation (SQL):
    CREATE TABLE idempotency_keys (
      key VARCHAR PRIMARY KEY,  -- unique constraint is the atomic check
      response JSONB,           -- NULL while the payment is in progress
      created_at TIMESTAMPTZ DEFAULT NOW()
    );
On receipt of a request with key K:
1. INSERT INTO idempotency_keys (key, response) VALUES (K, NULL) ON CONFLICT DO NOTHING. The affected-row count (1 or 0) tells you whether this is a new or duplicate request.
2. If new: process the payment, then update response with the result.
3. If duplicate: return the stored response immediately (or an in-progress status if response is still NULL).
What makes idempotency hard:
- Race condition: two identical requests arrive simultaneously. With a separate "is this key new?" check, both could see "new" and both process the payment. The unique constraint prevents this: only one INSERT succeeds; the other hits the conflict and takes the duplicate path.
- Partial failure: if the key row were written only after charging, a crash between the charge and the write would lose the record, and a retry would insert a new key and charge again. Fix: write the key row before charging (response=NULL), then update after. If the process crashes mid-charge, the key row exists with response=NULL, a sentinel that indicates an in-progress or failed payment requiring reconciliation or manual review.
- Idempotency key scope: a key reused across different operations (different amounts, different merchants) must be rejected. Validate that the key's associated request matches the current request parameters, for example by storing a hash of the request alongside the key.
Idempotency keys have a finite retention window — typically 24 hours to 7 days. After that, the same key may be reused safely (original request is long expired). Don't retain keys forever — it's a slow-growing table that eventually becomes a performance problem. A more subtle issue: idempotency at the API layer doesn't automatically make downstream calls idempotent. If the payment API calls a card processor, the processor call must also be idempotent — pass the idempotency key through to the processor. Stripe's API, for example, accepts Idempotency-Key headers that are propagated to their internal payment engine. Design idempotency end-to-end through every hop, not just at the entry point.
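The insert-first flow can be sketched end to end as below. This is an illustration under stated assumptions: SQLite stands in for PostgreSQL (`INSERT OR IGNORE` plays the role of `ON CONFLICT DO NOTHING`), the `request` column holding a fingerprint of the parameters is an addition for the scope check, and `handle_payment` and `fake_charge` are hypothetical names.

```python
import json
import sqlite3

def handle_payment(conn, key, request, charge):
    """Process a payment at most once per idempotency key."""
    fingerprint = json.dumps(request, sort_keys=True)
    cur = conn.execute(
        "INSERT OR IGNORE INTO idempotency_keys (key, request, response) "
        "VALUES (?, ?, NULL)",
        (key, fingerprint),
    )
    conn.commit()
    if cur.rowcount == 1:
        # We claimed the key before charging, so a retry cannot charge again.
        response = charge(request)
        conn.execute(
            "UPDATE idempotency_keys SET response = ? WHERE key = ?",
            (json.dumps(response), key),
        )
        conn.commit()
        return response
    stored_request, stored_response = conn.execute(
        "SELECT request, response FROM idempotency_keys WHERE key = ?", (key,)
    ).fetchone()
    if stored_request != fingerprint:
        raise ValueError("idempotency key reused with different parameters")
    if stored_response is None:
        raise RuntimeError("original request is in progress or needs review")
    return json.loads(stored_response)

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE idempotency_keys ("
    "key TEXT PRIMARY KEY, request TEXT NOT NULL, response TEXT)"
)

charges = []  # records every call to the (fake) card processor

def fake_charge(request):
    charges.append(request["amount"])
    return {"status": "charged", "amount": request["amount"]}

first = handle_payment(conn, "k-1", {"amount": 500}, fake_charge)
retry = handle_payment(conn, "k-1", {"amount": 500}, fake_charge)

try:
    handle_payment(conn, "k-1", {"amount": 900}, fake_charge)
    mismatch_rejected = False
except ValueError:
    mismatch_rejected = True
```

The retry returns the stored response without calling the processor a second time, and the same key with a different amount is rejected rather than silently replayed.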
ST-02 How do you design a distributed rate limiter that works across multiple service instances? Staff

A single-instance rate limiter (token bucket or fixed window in memory) doesn't work when the service scales horizontally: each instance has its own counter, so the effective rate limit is limit × instance_count.
Redis-based fixed-window counter: use a Redis key per (user_id, window) with an atomic Lua script to check and increment the counter in a single round-trip. Redis runs each script atomically, so the check-and-increment needs no distributed lock. KEYS[1] is the counter key, ARGV[1] the limit, ARGV[2] the window length in seconds (Lua):
    local current = tonumber(redis.call('GET', KEYS[1]) or '0')
    if current >= tonumber(ARGV[1]) then
      return 0 -- rate limit exceeded
    end
    local count = redis.call('INCR', KEYS[1])
    if count == 1 then
      -- set the TTL only on the first request, so later requests do not extend the window
      redis.call('EXPIRE', KEYS[1], ARGV[2])
    end
    return 1 -- allowed
Sliding window log: store timestamps of each request in a sorted set. Count entries in the window [now - window, now]. More accurate than fixed windows but higher memory usage.
Trade-offs:
- Redis becomes a synchronous dependency on every request, so it must be highly available and low-latency (<1ms). Use replication with automatic failover (Redis Sentinel or Redis Cluster) for HA.
- Network round-trip to Redis adds latency. For latency-critical paths, use a local approximate counter (token bucket with periodic Redis sync), which accepts slight over-limit for reduced latency.
- Redis failure: fail-open (allow all requests) or fail-closed (reject all). Fail-open is usually correct, because a rate limiter outage shouldn't take down the service.

Sliding window with approximate local counting: each instance tracks a local counter and periodically reports it to Redis, which aggregates the counts. Each instance's local budget is the rate limit minus the counts most recently reported by the other instances. Most requests are allowed without a Redis round-trip, and the instances sync periodically. This trades exactness for performance.

Rate limiting decisions reveal your consistency-vs-availability trade-offs in microcosm. A strict rate limit (exactly N requests per second, no more) requires linearizable counter updates — expensive. An approximate rate limit (N ± ε requests per second, with ε proportional to instance count and sync frequency) is available and fast. For most rate limiting use cases (bot protection, cost control, fairness between tenants), approximate is entirely acceptable. For billing-critical rate limits (API tiers where overage is charged differently), exact counting with Redis is worth the cost. Make the precision requirement explicit rather than defaulting to exact counting everywhere.
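For reference, the per-instance token bucket mentioned at the start of this answer can be sketched in a few lines. This is an in-memory illustration only; the class name `TokenBucket` and the explicit `now` argument (used instead of a real clock so the behavior is deterministic) are choices made for this example. In the approximate distributed variant, each instance would run something like this locally and reconcile with Redis periodically.

```python
class TokenBucket:
    """Allow bursts up to `capacity`, refilled at `refill_per_sec` tokens per second."""

    def __init__(self, capacity, refill_per_sec, now=0.0):
        self.capacity = capacity
        self.refill_per_sec = refill_per_sec
        self.tokens = float(capacity)
        self.last = now

    def allow(self, now):
        # Refill based on time elapsed since the last call, capped at capacity.
        elapsed = max(0.0, now - self.last)
        self.tokens = min(self.capacity, self.tokens + elapsed * self.refill_per_sec)
        self.last = max(self.last, now)
        if self.tokens >= 1.0:
            self.tokens -= 1.0
            return True
        return False

bucket = TokenBucket(capacity=2, refill_per_sec=1.0)
burst = [bucket.allow(now=0.0) for _ in range(3)]  # third request exceeds the burst
later = bucket.allow(now=1.0)                      # one token has been refilled
```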
ST-03 How do you handle and recover from split-brain in a distributed system with leader election? Staff
Split-brain occurs when a network partition separates the cluster into groups, each of which elects its own leader. Both leaders accept writes — the cluster has two masters with diverging state. Reconciliation is expensive or impossible for some data types. Prevention — quorum-based election: a leader is only valid if it has a majority of votes. In a 3-node cluster partitioned 2+1, only the majority side (2) can elect a leader. The minority side (1) cannot reach quorum and stops accepting writes. This is the CP trade-off — the minority side becomes unavailable. Fencing tokens: each leadership term has a monotonically increasing token. Every write to the storage layer includes the current token. The storage layer rejects writes with an outdated token. If a partitioned old leader recovers and tries to write with token 5 while the new leader has token 6, the writes are rejected. The old leader discovers it's no longer the leader via rejection. Lease-based leadership: the leader holds a time-limited lease. It cannot accept writes after the lease expires. Even if the leader can't communicate with the cluster (partition), it stops writing when the lease expires, giving the cluster time to elect a new leader whose lease starts after the old one expired. etcd's leader leases work this way. Recovery from split-brain (if it occurs despite prevention): compare the two diverged logs. The Raft algorithm's log comparison rejects candidates without the complete committed log — in practice, only the majority side has committed entries, and the minority side's uncommitted writes are discarded on rejoin.
Fencing tokens are not universally implemented — many systems rely only on quorum without storage-layer enforcement. This is a risk: a GC-paused leader that resumes after a new leader is elected may succeed in writing to the storage layer if the storage layer doesn't enforce fencing. The result is silent data corruption that may not be detected until a read compares diverged states. When designing systems on top of distributed databases, verify whether the database provides fencing (etcd's lease mechanism does; a plain Redis SET does not). For systems using Redis for distributed locking, the Redlock algorithm's safety properties are contested — Martin Kleppmann's critique is worth reading before depending on it for critical coordination.
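The storage-layer check behind fencing tokens is small. A minimal sketch, assuming a hypothetical in-memory `FencedStore` that remembers the highest token it has accepted; a real storage system would persist that token together with the data.

```python
class FencedStore:
    """Rejects writes that carry a token older than the newest one seen."""

    def __init__(self):
        self.highest_token = 0
        self.data = {}

    def write(self, token, key, value):
        if token < self.highest_token:
            return False  # stale leader: its term has been superseded
        self.highest_token = token
        self.data[key] = value
        return True

store = FencedStore()
new_leader_ok = store.write(6, "balance", 100)  # current leader, token 6
old_leader_ok = store.write(5, "balance", 90)   # paused old leader resumes with token 5
```

The old leader's write is refused even though it still believes it holds leadership, which is how it learns that it has been replaced.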
ST-04 How does consistent hashing work and why do systems like Cassandra and DynamoDB use it? Staff
The problem with naive sharding: if you assign keys to nodes by hash(key) % N, adding or removing a node changes N, which remaps almost every key — requires massive data redistribution. Consistent hashing: arrange a "ring" of hash values from 0 to 2³²-1. Place each node at one or more points on the ring (by hashing the node's ID). To find which node owns a key, hash the key and walk the ring clockwise to the first node. Adding a node places it on the ring — only the keys between it and its predecessor move to the new node (~1/N of total keys). Removing a node moves only that node's keys to its successor. Virtual nodes (vnodes): instead of placing each physical node once on the ring, place it at V points (vnodes, typically 128–256 per node). This: - Evenly distributes load even with heterogeneous or few nodes - Makes rebalancing more granular — a new node picks up vnodes from many existing nodes, distributing the transfer load - Allows different nodes to hold different numbers of vnodes (weighted by capacity) Replication with consistent hashing: a key is assigned to the N clockwise successor nodes. For Cassandra with replication_factor = 3, each key lives on 3 consecutive nodes on the ring. If one node fails, reads and writes fall to the remaining 2 (or 1, depending on quorum settings).
Consistent hashing elegantly solves data distribution, but it does not protect against skewed access patterns. If many requests hit the same key or key range (a hot partition), the replicas responsible for it are overwhelmed regardless of how many other nodes exist. DynamoDB's adaptive capacity and Cassandra's choice between ByteOrderedPartitioner (BOP) and Murmur3Partitioner are two different answers to this: BOP preserves key order, which allows range scans but causes hot partitions on sequential keys; Murmur3 hashes keys so that sequential keys spread across the ring (at the cost of range scans over partition keys), although a single very hot key still lands on one replica set. Choosing a partition key that distributes load evenly is a data modeling discipline, not just a configuration choice.
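A minimal sketch of a hash ring with virtual nodes, to make the "only about 1/N of keys move" property concrete. The `HashRing` class, the use of SHA-256 truncated to 32 bits, and the `node#i` vnode naming are assumptions for illustration; they are not how Cassandra or DynamoDB implement placement.

```python
import bisect
import hashlib

def ring_hash(value):
    """Map a string to a point on a ring of size 2**32."""
    return int.from_bytes(hashlib.sha256(value.encode()).digest()[:4], "big")

class HashRing:
    def __init__(self, vnodes=128):
        self.vnodes = vnodes
        self._points = []  # sorted list of (ring position, node name)

    def add(self, node):
        for i in range(self.vnodes):
            bisect.insort(self._points, (ring_hash(f"{node}#{i}"), node))

    def remove(self, node):
        self._points = [p for p in self._points if p[1] != node]

    def owner(self, key):
        # Walk clockwise to the first vnode past the key's position, wrapping
        # around at the end. Rebuilding the position list on every lookup keeps
        # the sketch short; a real ring would cache it.
        positions = [pos for pos, _ in self._points]
        idx = bisect.bisect_right(positions, ring_hash(key)) % len(self._points)
        return self._points[idx][1]

ring = HashRing()
for name in ("A", "B", "C"):
    ring.add(name)

keys = [f"user-{i}" for i in range(1000)]
before = {k: ring.owner(k) for k in keys}
ring.add("D")
after = {k: ring.owner(k) for k in keys}
moved = [k for k in keys if before[k] != after[k]]
```

Every key in `moved` now belongs to the new node `D`; no key is shuffled between the existing nodes, which is the difference from `hash(key) % N`.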
Principal Engineer — Architecture & Org-Scale Thinking
P-01 How do you design a globally distributed data system that serves users in multiple regions with low latency reads, strong consistency for financial data, and high availability? Principal

This is a multi-constraint problem: low read latency (local reads), strong consistency (no stale financial data), and high availability (no single region failure takes you down). These constraints are partially in tension: strong consistency requires cross-region coordination, which adds latency. Tiered consistency architecture: Tier 1: Strongly consistent (financial data: balances, transactions): - Use a globally distributed SQL database (Google Spanner or CockroachDB). Spanner provides external consistency via TrueTime; CockroachDB provides serializable isolation using HLC (Hybrid Logical Clocks). - Writes go to a global leader (or are multi-leader with serializable isolation). Write latency is bounded below by cross-region round trips plus protocol overhead (~30–100ms cross-region). - Accept that writes are slower; financial writes are lower frequency than reads. - For reads: use stale reads (bounded staleness) for user-facing balance displays, where data up to 10 seconds stale is acceptable. Use linearizable reads only for the atomic check-then-debit operations.

Tier 2 — Causally consistent (user profile, preferences, non-financial state): - Replicate to all regions with causal ordering. Users always see their own writes. - Read from local region; writes propagate asynchronously. Conflicts resolved by last-writer-wins or CRDTs (counters, sets).

Tier 3 — Eventual consistency (feeds, analytics, leaderboards): - Replicate asynchronously. Seconds of staleness acceptable. Read from local region. Multi-region HA: - Active-active for AP tiers; writes go to the nearest region. - Active-active for the CP tier is possible with multi-region Paxos groups (Spanner's model), with the latency cost paid on each write. - Circuit breakers between regions: if region A can't reach region B's consensus group, fail writes rather than silently accept stale reads that violate consistency guarantees.

The hardest part of this design is not the technology — it's defining the consistency requirement per data type precisely enough to make technology choices. "Financial data must be strongly consistent" sounds clear, but what about the user's transaction history display? Must it be linearizable, or is 10-second bounded staleness acceptable? The answer changes the architecture significantly: linearizable history reads require routing to the Spanner leader; bounded-stale reads can be served from a local replica. Map out every read and write operation, its consistency requirement, and its expected frequency. The resulting matrix drives the data tier placement. Teams that skip this mapping and apply uniform strong consistency to everything pay the latency cost for operations that didn't need it.
P-02 How do you approach chaos engineering for a complex distributed system? What failure modes are most commonly missed? Principal
Chaos engineering is the practice of deliberately injecting failures to discover system weaknesses before they manifest as incidents. Structured approach: 1. Define the steady state: what does "healthy" look like? Success rate, p99 latency, error rate, key business metric (orders per minute). Chaos experiments verify that the steady state holds under failure, not that services stay running. 2. Start with known failure modes: before random chaos, exhaustively test expected failures: single-node crashes, single-AZ failure, network latency injection, dependency timeouts, disk full. These should be in your runbooks; verify the runbooks are correct. 3. Hypothesis-driven experiments: "If Kafka broker 2 goes down, consumer lag should not exceed 30 seconds and no messages should be lost." Run the experiment, measure the outcome, compare against the hypothesis. A passing experiment builds confidence; a failing one reveals a weakness. 4. Increase scope gradually: start in dev/staging, then production during low-traffic periods, then during peak traffic. GameDay events involve the full team practicing response. Commonly missed failure modes: - Slow dependencies (not down, just slow): a 2-second response from a downstream is worse than a crash, because threads accumulate waiting and the slowdown cascades to the caller. Test with latency injection, not just kill injection. - Partial availability: one instance of a downstream is unhealthy while the others are fine. Load balancers may keep routing some requests to the bad instance, so circuit breakers must act per-instance, not per-service. - Clock skew: inject clock drift and verify that lease-based systems, TTLs, and any timestamp-based ordering (such as last-write-wins) still behave correctly. Logical clocks (Lamport/vector) should be unaffected, which is worth confirming. - Resource exhaustion: file descriptors, thread pool, connection pool, not just CPU and memory. - Data corruption at the boundary: malformed messages in Kafka, corrupted DB rows. Does your consumer handle them? Or does one bad message halt all processing?
- Thundering herd after recovery: a dependency comes back online — do all instances simultaneously flood it? Test restart behavior, not just failure behavior.
The organizational challenge of chaos engineering is greater than the technical one. Engineers are reluctant to break production. Leaders are reluctant to authorize it. The path forward is making the risk explicit: "we are going to experience these failures — the question is whether we discover them in a controlled experiment or during an incident at 2 AM." Start with low-risk experiments (kill one pod behind an HPA, inject 50ms latency to a non-critical service). Build confidence gradually. Establish a chaos experiment review process: proposed experiment → hypothesis → approval → execution → results → blameless post-mortem if something unexpected happens. Tools: Chaos Mesh (Kubernetes), Toxiproxy (network faults), Gremlin (managed chaos), AWS Fault Injection Simulator. The tools are secondary — the discipline of forming hypotheses and measuring outcomes is what makes chaos engineering valuable.
System Design Scenarios
Designing an Eventually Consistent Order System with SAGA
Problem
Design the backend for an e-commerce order placement flow that spans three services: Order Service, Inventory Service, and Payment Service. An order must atomically reserve inventory AND charge the customer. Either both succeed or neither does. The system handles 5,000 orders per minute at peak.
Constraints
  • No distributed transactions (2PC) — services are independently deployable
  • A failed payment must release the inventory reservation
  • A customer can never be charged without inventory being confirmed reserved
  • The system must remain available if one service is temporarily down
Key Discussion Points
  • SAGA with orchestration: an Order Saga Orchestrator (implemented with Temporal or a state machine in the Order Service) drives the sequence: (1) Reserve Inventory → (2) Charge Payment → (3) Confirm Order. The orchestrator is a durable workflow — if it crashes, it resumes from the last committed step.
  • Step ordering matters — charge after reserve: the invariant 'never charge without inventory' dictates that inventory reservation must complete before the payment charge. If inventory fails, no charge is attempted. If payment fails after reservation, the compensation is to release the inventory reservation.
  • Compensating transactions: each step has a compensation. Reserve Inventory → Cancel Reservation. Charge Payment → Issue Refund. Compensations must be idempotent — the orchestrator may retry them. The refund must be issued even if the payment processor is temporarily unavailable — queue it with at-least-once delivery.
  • Outbox pattern for event publishing: after the Order Service writes the new order to its database, it publishes an OrderCreated event via the outbox pattern. The Inventory Service and Payment Service are triggered by events, not by synchronous calls from the orchestrator. This decouples availability: if the Inventory Service is down, the event waits in the queue.
  • Idempotency at every step: the orchestrator retries steps on timeout. Every service operation must be idempotent using an idempotency key derived from the order ID + step name. A duplicate ReserveInventory for order-123 returns the cached reservation result, not a double-reservation.
  • Visibility and observability: the order's saga state (pending, inventory-reserved, payment-captured, confirmed, compensating, failed) is stored in the Order Service database and exposed via API. Support teams can inspect why an order is stuck and manually trigger compensations if needed.
🚩 Red Flags
  • Charging before confirming inventory — violates the core invariant; customer charged for unavailable item
  • Non-idempotent compensation — a double-refund issued because the orchestrator retried a timed-out refund call
  • Synchronous calls between services without circuit breakers — one slow service causes the entire order flow to back up
  • No durable saga state — orchestrator crash loses all in-progress orders; they're stuck with no recovery path
  • Using 2PC across Order, Inventory, and Payment services — tightly couples their deployments and blocks on any one service being slow
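The orchestration and compensation logic from the discussion points can be sketched as below. This is an in-memory illustration only: `run_saga` and the step functions are hypothetical, and a real orchestrator (Temporal or a persisted state machine, as the scenario suggests) would durably record each step before moving on and would retry compensations until they succeed.

```python
def run_saga(steps):
    """Run (name, action, compensation) steps in order; undo completed steps on failure."""
    completed = []
    log = []
    for name, action, compensate in steps:
        try:
            action()
        except Exception:
            log.append(f"failed:{name}")
            # Compensate in reverse order so later effects are undone first.
            for done_name, undo in reversed(completed):
                undo()
                log.append(f"compensated:{done_name}")
            return "failed", log
        log.append(f"done:{name}")
        completed.append((name, compensate))
    return "confirmed", log

inventory = {"reserved": 0}

def reserve_inventory():
    inventory["reserved"] += 1

def cancel_reservation():
    inventory["reserved"] -= 1

def charge_payment():
    raise RuntimeError("card declined")  # simulate a payment failure

def issue_refund():
    pass  # not reached here: the charge never succeeded

status, log = run_saga([
    ("reserve_inventory", reserve_inventory, cancel_reservation),
    ("charge_payment", charge_payment, issue_refund),
    ("confirm_order", lambda: None, lambda: None),
])
```

Because the reservation runs before the charge, a declined card only triggers the reservation's compensation and the customer is never charged for unreserved stock.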
Building a Distributed Leaderboard with High Write Throughput
Problem
Design a real-time leaderboard for a mobile game with 10 million active users. User scores are updated on every game completion (average 3 completions per user per hour at peak, which works out to roughly 8,300 updates/second on average). The leaderboard must show the top 100 players globally and each user's own rank. Reads must be under 50ms; write throughput must sustain 30,000 updates/second at peak, which leaves headroom for bursts above that average.
Constraints
  • Global leaderboard rank for any user on demand (not just top 100)
  • Reads: p99 < 50ms; writes: sustained 30K/s
  • Score updates are monotonically increasing (scores never decrease)
  • Eventual consistency acceptable: leaderboard can be up to 10 seconds stale
Key Discussion Points
  • Redis Sorted Set (ZSET) as the leaderboard store: ZADD leaderboard <score> <user_id> for updates; ZREVRANK leaderboard <user_id> for a user's rank; ZREVRANGE leaderboard 0 99 WITHSCORES for the top 100. ZADD is O(log N); ZREVRANK is O(log N). Redis single-threaded throughput handles 30K writes/second on a single instance.
  • Write path — Kafka as a buffer: game servers publish score events to Kafka. A consumer group reads and applies ZADD to Redis. Kafka absorbs burst writes and provides durability. If Redis is temporarily unavailable, events queue in Kafka. The 10-second staleness budget easily accommodates this buffering.
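  • Redis Cluster for scale: one Redis instance handles 30K ops/s but its memory is bounded. The raw payload is small (10M users × 8-byte score ≈ 80 MB), but each ZSET entry also stores the member string plus skip-list and hash-table overhead, so budget on the order of 100 bytes per entry, or roughly 1 GB for 10M users. That still fits on one instance. At 100M users (roughly 10 GB), consider sharding: a single ZSET key lives on one Redis Cluster node, so split the leaderboard into multiple ZSET keys (for example by hash of user_id) and let the cluster distribute them. Rank computation across shards is harder because it requires merging.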
  • Read path — Redis replica for top-100: the top-100 list is read-heavy. Serve from Redis read replicas. Cache in application memory (refresh every 5 s) to further reduce Redis load. User's own rank is served from the primary (or replica with acceptable staleness).
  • Sharded global rank for very large scale: at 1B users, a single ZSET is impractical. Shard by score range (0–1000, 1001–2000, etc.). To find a user's global rank: (1) find which shard their score falls in; (2) count all users in higher-score shards; (3) add local rank within their shard. Pre-compute shard sizes periodically. Approximate rank is acceptable given the 10-second staleness budget.
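  • Monotonic score updates as a simplification: since scores never decrease, ZADD with the new score is correct as long as events are applied in order, so no read-before-write check is needed. If Kafka redelivery or reordering could replay an older event, ZADD with the GT flag (Redis 6.2+) applies an update only when the new score is greater, which keeps the write a single atomic command.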
🚩 Red Flags
  • Using a relational database with ORDER BY score DESC LIMIT 100 — table scans on 10M rows at 30K writes/s will collapse the database
  • Computing rank with SELECT COUNT(*) WHERE score > :user_score on every rank request — O(N) per rank query kills the database
  • Writing directly from game servers to Redis without a queue — a burst of writes during a tournament can overwhelm Redis; no durability if Redis is unavailable
  • Single Redis instance with no replica — Redis failure takes down the entire leaderboard
  • Strong consistency requirement on ranks — a user seeing their rank fluctuate by ±5 positions in real time (due to concurrent updates) is acceptable; designing for exact real-time rank wastes resources
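The three-step rank computation for score-range shards can be sketched as follows. This is a pure-Python illustration: each shard is modeled as a tuple of (low, high, sorted scores), whereas in the design above each shard would be its own ZSET and the shard sizes would be pre-computed. The function name `global_rank` is hypothetical.

```python
import bisect

def global_rank(shards, score):
    """Return the 1-based global rank of `score`; ties share the best rank.

    shards: list of (low, high, ascending_sorted_scores), one per score range.
    """
    rank = 1
    for low, high, scores in shards:
        if score > high:
            continue  # every score in this shard is lower
        if score < low:
            rank += len(scores)  # every score in this shard is higher
        else:
            # Local rank: count the scores in the same shard that are strictly higher.
            rank += len(scores) - bisect.bisect_right(scores, score)
    return rank

shards = [
    (0, 1000, [100, 400, 900]),
    (1001, 2000, [1200, 1500]),
    (2001, 3000, [2500]),
]
top = global_rank(shards, 2500)
middle = global_rank(shards, 1200)
bottom = global_rank(shards, 100)
```

Only the shard containing the score needs a real lookup; the higher shards contribute just their sizes, which is why periodically pre-computed counts are enough for an approximate rank.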
Diagnosing and Recovering from a Cascading Failure
Problem
A production system serving 500K requests/minute begins degrading. The alert fires: p99 latency climbs from 80ms to 8 seconds; error rate goes from 0.1% to 35% over 5 minutes. The system consists of an API gateway → 6 microservices → PostgreSQL + Redis + Kafka. Walk through your diagnosis and recovery approach.
Constraints
  • Cascading failure — unclear which service is the origin
  • User impact is ongoing — recovery speed matters
  • The system has distributed tracing (Jaeger) and structured logs
Key Discussion Points
  • Start with blast radius, not root cause: identify which services and endpoints are failing (the 35% error rate — which endpoints? which services?). Check the service map in Jaeger for error clustering. Cascading failures have an origin; the origin usually shows the longest latency and the highest upstream error rate.
  • Check the saturation signals: CPU, memory, connection pool exhaustion, thread pool saturation, GC pause time. A cascading failure is almost always triggered by resource exhaustion in one service — threads accumulate waiting for a slow downstream, the thread pool fills, new requests fail immediately. Look for thread pool at 100% or connection pool exhausted metrics.
  • Check the dependency that's slow, not dead: dead dependencies trigger circuit breakers; slow ones don't. If PostgreSQL is responding in 5 seconds instead of 10ms, all services that call it have threads blocked waiting. The services themselves look unhealthy, but the root cause is the database. Look for db_query_duration_p99 spikes.
  • Shed load immediately if overloaded: if the API gateway has load shedding configured (rate limit or queue depth limit), activate it. Reducing incoming request rate gives the system headroom to drain its queues. A system under overload cannot recover if new load keeps arriving at the same rate.
  • Isolate the origin service: once identified (e.g., PostgreSQL slow queries due to a missing index after a data growth spike), isolate the blast radius. If one endpoint is causing the database overload (a new slow query deployed in the last hour), kill or rate-limit that endpoint at the gateway. Let the rest of the system recover.
  • Recovery sequence: (1) reduce load; (2) fix or isolate the origin; (3) allow services to drain their backlogs; (4) monitor recovery metrics (thread pool utilization, connection pool, request queue depth) to confirm they're decreasing; (5) gradually restore full traffic. Do not restore full traffic all at once — a recovering system can be re-overwhelmed by sudden load.
  • Post-incident: the cascading failure happened because circuit breakers weren't configured (or thresholds were too high), bulkheads didn't isolate the failing dependency's thread pool, or load shedding wasn't enabled. The fix is not just resolving the immediate root cause — it's adding the missing resilience patterns so the next slow downstream causes a graceful degradation, not a cascade.
🚩 Red Flags
  • Chasing root cause before stopping the bleeding — identify and mitigate the blast radius first, diagnose after load is reduced
  • Restarting services without reducing incoming load — they come back up and immediately fill their thread pools again
  • Restoring full traffic at once after partial recovery — a system that just recovered is fragile; gradual ramp is safer
  • No circuit breakers on downstream calls — a slow dependency blocks threads in every upstream caller indefinitely until they're exhausted
  • No per-dependency thread pool isolation (bulkhead) — one slow downstream fills the shared thread pool and makes the service unresponsive to requests that don't even touch that downstream
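A minimal circuit breaker sketch, to show the fail-fast behavior the post-incident point calls for. It assumes the caller applies a timeout so that a slow dependency surfaces as an exception, and it takes `now` as an argument instead of reading a clock so the example is deterministic. The class and function names are illustrative, not taken from a specific library.

```python
class CircuitBreaker:
    """Open after repeated failures, fail fast while open, retry after a cooldown."""

    def __init__(self, failure_threshold=3, reset_timeout=30.0):
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self.failures = 0
        self.opened_at = None  # None means the circuit is closed

    def call(self, fn, now):
        if self.opened_at is not None and now - self.opened_at < self.reset_timeout:
            raise RuntimeError("circuit open: failing fast")
        # Closed, or half-open after the cooldown: let the call through.
        try:
            result = fn()
        except Exception:
            self.failures += 1
            if self.opened_at is not None or self.failures >= self.failure_threshold:
                self.opened_at = now  # open, or re-open after a failed trial call
            raise
        self.failures = 0
        self.opened_at = None
        return result

calls = []

def slow_dependency():
    calls.append("called")
    raise TimeoutError("downstream too slow")

breaker = CircuitBreaker(failure_threshold=3, reset_timeout=30.0)
for t in range(3):
    try:
        breaker.call(slow_dependency, now=float(t))
    except TimeoutError:
        pass

try:
    breaker.call(slow_dependency, now=3.0)
    fast_failed = False
except RuntimeError:
    fast_failed = True

recovered = breaker.call(lambda: "ok", now=40.0)  # cooldown elapsed, trial call succeeds
```

While the circuit is open the dependency is not called at all, so caller threads are released immediately instead of piling up behind the slow downstream.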