// CAP · consistency models · consensus · replication · distributed transactions · fault tolerance · senior → principal
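Leaderless replication: any replica accepts reads and writes, coordinated by quorums (W + R > N for strong consistency). Conflicts are resolved at read time via version vectors (Riak) or last-write-wins timestamps (Cassandra). Used by Cassandra, DynamoDB, Riak.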
Quorum: with N replicas, each write goes to W nodes and each read queries R nodes. If W + R > N, every read set shares at least one node with every write set, so a read always contacts at least one replica holding the latest acknowledged write. Tune W and R to the read/write ratio.
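The overlap guarantee is pigeonhole counting: a W-node write set and an R-node read set drawn from N nodes must share at least W + R - N nodes. The brute-force Python sketch below checks every pair of sets for small N. The function names are illustrative. It models strict quorums only, with no sloppy quorums and no failures.

```python
from itertools import combinations


def quorums_always_overlap(n: int, w: int, r: int) -> bool:
    """Brute force: does every possible read set share a node with every write set?"""
    nodes = range(n)
    for write_set in combinations(nodes, w):
        for read_set in combinations(nodes, r):
            if not set(write_set) & set(read_set):
                return False  # this read quorum misses this write quorum entirely
    return True


def min_overlap(n: int, w: int, r: int) -> int:
    """Pigeonhole bound: any W-set and R-set out of N nodes share >= W + R - N nodes."""
    return max(0, w + r - n)


if __name__ == "__main__":
    for n, w, r in [(3, 2, 2), (3, 1, 3), (3, 1, 1), (5, 3, 3), (5, 2, 3)]:
        print(f"N={n} W={w} R={r}", quorums_always_overlap(n, w, r), min_overlap(n, w, r))
```

With N=3, W=2, R=2, every pair of sets overlaps in at least one node. With W=1, R=1, a read can land entirely on replicas that never saw the write.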
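
| Model | Guarantee, cost, and examples |
|---|---|
| Linearizability | Every op appears to take effect atomically at a single point in real time between its invocation and completion. Reads always see the latest committed write. Requires global coordination; highest latency. Used by: etcd, ZooKeeper (writes only; reads may be stale unless preceded by sync()), Google Spanner, CockroachDB. |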
| Sequential Consistency | All nodes agree on a global operation order, but that order doesn't have to match real time. Weaker than linearizability. Used in some CPU memory models. |
| Causal Consistency | Causally related ops seen in causal order by all nodes. Concurrent ops may be seen in different order. Requires vector clocks or dependency tracking. Used by: MongoDB causal sessions, Amazon Aurora. |
| Read-your-writes | After a client writes, it always sees that write in subsequent reads (from any replica). Not system-wide — only for the writing client. Often guaranteed via sticky sessions or session tokens. |
| Monotonic reads | Once a client reads a value, subsequent reads never return an older value. Prevents 'time going backwards' for a single client. |
| Eventual Consistency | Given no new updates, all replicas converge to the same value eventually. No bound on convergence time. No guarantees on intermediate reads. Used by: Cassandra (default), DynamoDB (default), DNS. |
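
Read-your-writes and monotonic reads are often provided with a client-side session token, as the table notes. The Python sketch below assumes a toy model in which each replica holds one value tagged with a monotonically increasing version. Replica, Session, and the version scheme are hypothetical, not any database's API. The session remembers the highest version it has written or read and refuses replicas that are behind it.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class Replica:
    """Toy replica holding one versioned value (hypothetical model)."""
    name: str
    version: int = 0
    value: Optional[str] = None

    def apply(self, version: int, value: str) -> None:
        # Replication only moves a replica forward; an older write is ignored.
        if version > self.version:
            self.version, self.value = version, value


@dataclass
class Session:
    """Session token: the highest version this client has written or observed."""
    min_version: int = 0

    def record_write(self, version: int) -> None:
        self.min_version = max(self.min_version, version)

    def read(self, replicas: List[Replica]) -> Optional[str]:
        for replica in replicas:
            # A replica behind the token could hide the client's own write
            # (read-your-writes) or return an older value than before (monotonic reads).
            if replica.version >= self.min_version:
                self.min_version = replica.version
                return replica.value
        raise RuntimeError("no replica is fresh enough for this session; retry or wait")


if __name__ == "__main__":
    fresh, lagging = Replica("a"), Replica("b")
    session = Session()
    fresh.apply(1, "v1")  # the client's write has reached replica a only
    session.record_write(1)
    print(session.read([lagging, fresh]))  # prints v1: replica b is skipped
```

These guarantees are per session only. Another client without the token may still read from the lagging replica.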

| Algorithm | Description |
|---|---|
| Raft | Designed for understandability. Single leader per term. Log-based. Used by: etcd, CockroachDB, TiKV, Consul. Leader election and log replication are simple to reason about; widely considered the easiest consensus algorithm to implement correctly. |
| Multi-Paxos | Extension of Paxos, the original consensus algorithm, to agree on a sequence of values. More complex than Raft; many variants (Classic, Fast, Cheap Paxos). Leader-based after an initial leader election. Used by: Google Chubby, Google Spanner, many academic systems. |
| ZAB (ZooKeeper Atomic Broadcast) | ZooKeeper's purpose-built protocol. Similar to Paxos/Raft. Leader-based. Guarantees total order of updates. Primary difference: designed as primary-backup atomic broadcast (the leader streams state changes to followers in order) rather than as a general-purpose consensus algorithm. |
| PBFT (Practical Byzantine Fault Tolerance) | Tolerates f Byzantine (malicious/arbitrary) faults with 3f+1 nodes. Very expensive — O(n²) message complexity. Used in blockchain and financial systems requiring Byzantine fault tolerance. |
| Viewstamped Replication | Published by Oki and Liskov in 1988, before Paxos appeared in print; Raft's design closely resembles it. View-based leadership. Less well-known but essentially equivalent to Multi-Paxos. Historical significance; rarely used in new systems. |

| Parameter | Meaning |
|---|---|
| N (replication factor) | Total number of replicas. Typically 3 (with majority quorums, tolerates 1 failure) or 5 (tolerates 2 failures). Increasing N improves fault tolerance but increases write cost. |
| W (write quorum) | Number of nodes that must acknowledge a write. W=1 is fastest, least durable. W=N is most durable but slowest. |
| R (read quorum) | Number of nodes queried for a read; returns the most recent value. R=1 is fastest. R=N is slowest. |
| W + R > N | Strong consistency: any read set overlaps any write set, so it includes at least one node with the latest write (strict quorums only; sloppy quorums with hinted handoff can break this overlap). N=3, W=2, R=2 is common. |
| W + R ≤ N | Possible to read stale data (no guaranteed overlap between write and read quorum). Trade consistency for availability or latency. |
| Hinted handoff | If a target replica is unavailable during a write, another node temporarily stores the write (hint) and delivers it when the target recovers. Improves write availability at the cost of temporary inconsistency. |
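
Hinted handoff can be sketched with a toy in-memory model. Node, write, and deliver_hints are hypothetical names. When an intended replica is down, a different node parks the write as a hint and replays it once the replica is back.

```python
from collections import defaultdict
from typing import Dict, List, Tuple


class Node:
    """Toy storage node (hypothetical model)."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.up = True
        self.data: Dict[str, str] = {}
        # Writes held on behalf of unavailable nodes: target name -> [(key, value), ...]
        self.hints: Dict[str, List[Tuple[str, str]]] = defaultdict(list)


def write(key: str, value: str, replicas: List[Node], hint_holder: Node) -> int:
    """Write to every intended replica; for a replica that is down, store a hint instead.

    Returns the acknowledgement count. A hinted write counts toward availability
    but is not a readable copy on the real replica, which is why sloppy quorums
    weaken the W + R > N overlap guarantee.
    """
    acks = 0
    for node in replicas:
        if node.up:
            node.data[key] = value
        else:
            hint_holder.hints[node.name].append((key, value))
        acks += 1
    return acks


def deliver_hints(holder: Node, target: Node) -> int:
    """Once the target is back, replay its hints in order and discard them."""
    if not target.up:
        return 0
    pending = holder.hints.pop(target.name, [])
    for key, value in pending:
        target.data[key] = value
    return len(pending)
```

The sketch replays hints blindly. A real system would compare versions or timestamps so that a replayed hint cannot overwrite a newer value the replica received some other way.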

| Pattern | Description |
|---|---|
| Circuit Breaker | After N consecutive failures to a downstream, open the circuit: stop calling the downstream and return a fast error. After a timeout, move to half-open and allow one probe request. If it succeeds, close the circuit; if it fails, reopen it. Prevents cascading failures and gives the downstream time to recover. |
| Bulkhead | Isolate resources (thread pools, connection pools, semaphores) per downstream dependency. One slow dependency fills its own pool, not the shared pool — other dependencies remain unaffected. Named after ship hull compartments. |
| Retry with exponential backoff + jitter | Retry on transient failures with doubling intervals. Add random jitter to avoid synchronized retry storms from many clients. Cap at a maximum backoff. Only retry on idempotent operations or those with idempotency keys. |
| Timeout | Every outbound call must have a timeout. Without timeouts, a single slow downstream causes threads/goroutines to accumulate until the calling service is exhausted. Set timeouts derived from SLA requirements, not arbitrary large values. |
| Backpressure | Signal to producers to slow down when consumers are overwhelmed. TCP flow control is a network-level example. Application-level: bounded queues that block producers, reactive streams with demand signaling. Prefer backpressure over unbounded buffers that hide overload. |
| Idempotency key | Client-generated unique ID per logical operation. Server stores (idempotency_key, result). On duplicate, return stored result without re-executing. Enforced with a unique DB constraint on idempotency_key to prevent races. |
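
Below is a minimal circuit breaker in Python, sketched from the table row above. CircuitBreaker, CircuitOpenError, and the injectable clock are illustrative names, not a specific library. The breaker is single-threaded, so concurrent callers could all slip through as half-open probes.

```python
import time
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


class CircuitOpenError(Exception):
    """Raised immediately while the circuit is open (the fast error)."""


class CircuitBreaker:
    """Closed -> open -> half-open breaker (illustrative, not thread-safe)."""

    def __init__(self, failure_threshold: int, reset_timeout: float,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self.clock = clock
        self.failures = 0
        self.opened_at: Optional[float] = None  # None means the circuit is closed

    def call(self, fn: Callable[[], T]) -> T:
        if self.opened_at is not None:
            if self.clock() - self.opened_at < self.reset_timeout:
                raise CircuitOpenError("downstream unavailable; failing fast")
            # Timeout elapsed: half-open, so let this call through as the probe.
        try:
            result = fn()
        except Exception:
            self.failures += 1
            # A failed probe, or too many consecutive failures, (re)opens the circuit.
            if self.opened_at is not None or self.failures >= self.failure_threshold:
                self.opened_at = self.clock()
            raise
        # Any success closes the circuit and resets the consecutive-failure count.
        self.failures = 0
        self.opened_at = None
        return result
```

Injecting the clock keeps the state machine testable without sleeping. Each wrapped call should still carry its own timeout, because a breaker only counts failures it actually observes.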

| System | CAP Classification | Consistency Model | Replication | Best For |
|---|---|---|---|---|
| etcd / Zookeeper | CP | Linearizable | Raft / ZAB; single leader | Distributed coordination, leader election, config |
| PostgreSQL (single) | CP (single node) | Read Committed (default) / Serializable | Streaming (async) or sync | OLTP requiring ACID; primary + replicas |
| CockroachDB | CP | Serializable (SSI) | Raft per range; multi-region | Geo-distributed SQL requiring strong consistency |
| Google Spanner | CP | External consistency (strict serializability) | Paxos; TrueTime bounded clock uncertainty | Global ACID transactions using GPS and atomic-clock bounds |
| Cassandra | AP (tunable) | Eventual (default) / strong (W+R>N) | Leaderless; gossip; hinted handoff | High-write, high-availability, time-series |
| DynamoDB | AP (tunable) | Eventual (default) / strong reads opt-in | Multi-AZ; single-digit ms reads | Serverless key-value at massive scale |
| MongoDB | CP (majority write) | Eventual (default) / causal sessions | Replica set; Raft-based primary election | Document store with flexible schema |
| Redis (Sentinel) | AP (async replication) | Eventual (replication lag) | Async leader-follower | Cache, sessions, rate limiting; not a primary store |
| Kafka | CP (per partition) | Linearizable within partition | Leader + ISR replication | Ordered event streaming; replay; fan-out |
CAP states that a distributed system can provide at most two of: Consistency (reads return the latest write), Availability (every request gets a non-error response), and Partition tolerance (the system keeps working despite network partitions).
What CAP actually means: network partitions are unavoidable in real systems, so the real choice is what to do when one occurs. You either sacrifice consistency (AP: return possibly stale data) or sacrifice availability (CP: return an error until the partition heals).
What CAP does not mean:
- It's not a permanent choice. A system can be CP during partitions and highly available the rest of the time. Partitions are rare; the PACELC model better captures the latency-vs-consistency trade-off under normal operation.
- "Consistency" in CAP means linearizability, not ACID consistency. These are different properties.
- "Available" in CAP means every request received by a non-failed node gets a response, not that the system is always reachable by clients. A load balancer that routes clients away from a node cut off by a partition provides availability from the client's perspective, even though that node is unavailable by CAP's strict definition.
Practical examples:
- CP: etcd, ZooKeeper. They return an error rather than stale data during a partition.
- AP: Cassandra (default), DynamoDB. They continue serving reads/writes during a partition and reconcile conflicts afterward.
- Most SQL databases are effectively CP for single-node deployments.
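Tunable systems let you make this choice per operation. Cassandra, for example, gives CP-like behavior for critical reads (consistency level ALL, or QUORUM paired with QUORUM writes) and AP behavior for non-critical reads (ONE). A more useful mental model is the consistency spectrum: what is the maximum staleness you can tolerate, and what is the cost (in latency, availability, and complexity) to reduce it? Strong consistency across a WAN link costs at least one cross-region round trip per write (often 100 ms or more), which is frequently unacceptable for user-facing writes. For those cases, design for causal consistency with conflict resolution rather than linearizability.
Leader election: a follower that hears no heartbeat from a leader before its election timeout expires becomes a candidate. It increments its term, votes for itself, and sends RequestVote RPCs to all other nodes. A candidate wins if it receives votes from a majority (⌊N/2⌋ + 1) of nodes. A node grants at most one vote per term, and only to a candidate whose log is at least as up-to-date as its own, which prevents electing a node with a stale log. Randomized election timeouts make split votes unlikely, because nodes start elections at slightly different times.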
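The voting rules can be sketched as a simplified RequestVote handler in Python. It omits persistence, RPC plumbing, and stepping down to follower. VoterState and the function names are illustrative. "At least as up-to-date" compares the term of the last log entry first and uses log length only as a tie-breaker.

```python
from dataclasses import dataclass
from typing import Optional


def majority(cluster_size: int) -> int:
    """Votes needed to win: floor(N/2) + 1 (2 of 3, 3 of 4, 3 of 5)."""
    return cluster_size // 2 + 1


def candidate_log_is_up_to_date(cand_last_term: int, cand_last_index: int,
                                my_last_term: int, my_last_index: int) -> bool:
    """Raft's election restriction: higher last term wins; equal terms compare length."""
    if cand_last_term != my_last_term:
        return cand_last_term > my_last_term
    return cand_last_index >= my_last_index


@dataclass
class VoterState:
    current_term: int
    voted_for: Optional[str]
    last_log_term: int
    last_log_index: int


def handle_request_vote(state: VoterState, term: int, candidate_id: str,
                        last_log_term: int, last_log_index: int) -> bool:
    if term < state.current_term:
        return False  # candidate is from an old term
    if term > state.current_term:
        state.current_term, state.voted_for = term, None  # new term: previous vote no longer counts
    if state.voted_for not in (None, candidate_id):
        return False  # at most one vote per term
    if not candidate_log_is_up_to_date(last_log_term, last_log_index,
                                       state.last_log_term, state.last_log_index):
        return False  # candidate could be missing committed entries
    state.voted_for = candidate_id
    return True
```

A longer log does not win on its own. A candidate whose last entry has an older term is refused even when its log has more entries.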
Log replication: the leader receives client writes, appends each one to its log as a new entry, and sends AppendEntries RPCs to all followers. An entry is committed once it is stored on a majority of nodes, counting the leader. Raft applies this counting rule only to entries from the leader's current term; earlier entries become committed along with them. The leader then applies the entry to its state machine and responds to the client. Later AppendEntries RPCs (including heartbeats) carry the leader's commit index, so followers learn which entries are committed and apply them to their state machines.
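The leader's commit rule can be written as a small Python function. The names and the 1-based index convention are assumptions for illustration. match_index lists the highest entry known to be stored on each server, leader included. Sorting it in descending order and taking the middle element gives the highest index a majority holds.

```python
from typing import List


def advance_commit_index(match_index: List[int], log_terms: List[int],
                         current_term: int, commit_index: int) -> int:
    """Return the leader's new commit index.

    match_index: highest log index known to be stored on each server (leader included).
    log_terms:   term of each entry in the leader's log; entry 1 is at position 0.
    """
    ranked = sorted(match_index, reverse=True)
    # ranked[k] is held by at least k + 1 servers; k = N // 2 gives a majority.
    candidate = ranked[len(match_index) // 2]
    # Only a current-term entry is committed by counting replicas. Log terms never
    # decrease, so if this entry is from an older term, no lower index qualifies either.
    if candidate > commit_index and log_terms[candidate - 1] == current_term:
        return candidate
    return commit_index
```

In the second check below, entry 2 is on 3 of 5 servers but comes from an older term, so the commit index stays put. This is the case the current-term restriction exists to handle.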
Safety guarantees:
- At most one leader per term (election majority)
- A committed entry is never lost (only a node with the complete committed log can win an election)
- Entries are applied in log order (state machine safety)
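Raft is not Byzantine-fault-tolerant: it assumes nodes are honest but may crash.
Reads need the same care as writes. etcd, for example, lets a read choose linearizable consistency (routed through the leader) or serializable consistency (served locally by any member: fast, but possibly stale). Know which mode your system uses.
Change data capture (CDC) is the alternative to polling the outbox table. It requires logical decoding of the write-ahead log (wal_level = logical for PostgreSQL) and adds operational complexity (Kafka Connect + Debezium). For most teams, polling with a short interval (1 s) is sufficient and simpler to operate. CDC is worth the complexity when sub-second event latency is required or when the outbox table grows very large.
Lamport timestamps: each node keeps a counter and increments it on every local event and send. On receiving a message, it sets counter = max(local, received) + 1. Lamport timestamps guarantee that if event A happened before event B (causally), then timestamp(A) < timestamp(B). The converse is not true: timestamp(A) < timestamp(B) does not mean A happened before B, so Lamport timestamps cannot distinguish causally related events from concurrent ones. They are consistent with the happened-before partial order; breaking ties by node ID turns them into a total order.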
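The Lamport rules above fit in a few lines of Python. The class name is illustrative. Message transport is left out; a sender just passes the value returned by tick().

```python
class LamportClock:
    """One integer counter per process."""

    def __init__(self) -> None:
        self.time = 0

    def tick(self) -> int:
        """Local event or send: increment, and use the new value as the timestamp."""
        self.time += 1
        return self.time

    def receive(self, message_time: int) -> int:
        """On receive: counter = max(local, received) + 1."""
        self.time = max(self.time, message_time) + 1
        return self.time


if __name__ == "__main__":
    a, b = LamportClock(), LamportClock()
    sent = a.tick()        # a sends a message stamped 1
    b.tick()
    b.tick()               # b has two unrelated local events: 1, 2
    print(b.receive(sent))  # max(2, 1) + 1 = 3
```

Comparing (timestamp, node_id) tuples yields the total order mentioned above. Two concurrent events can still carry equal or ordered timestamps, so these clocks do not detect conflicts.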
Vector clocks: each node maintains a vector of counters, one per node ([N1: 3, N2: 1, N3: 2]). On event: increment own entry. On send: include vector. On receive: take element-wise max, then increment own entry. Two events are concurrent if neither vector dominates the other. This detects causality precisely — you can tell if two events are causally related or truly concurrent (and thus may conflict). Used by Dynamo, Riak for conflict detection.
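The vector clock rules and the concurrency test can be sketched in Python with plain dicts. The function names are illustrative, and missing entries count as 0. One clock happens-before another when it is less than or equal in every entry and the two are not equal. If neither clock dominates, the events are concurrent.

```python
from typing import Dict

VectorClock = Dict[str, int]


def increment(clock: VectorClock, node: str) -> VectorClock:
    """Local event or send on `node`: bump its own entry (returns a new clock)."""
    updated = dict(clock)
    updated[node] = updated.get(node, 0) + 1
    return updated


def merge_on_receive(local: VectorClock, received: VectorClock, node: str) -> VectorClock:
    """Element-wise max of both clocks, then increment the receiver's own entry."""
    merged = {n: max(local.get(n, 0), received.get(n, 0)) for n in local.keys() | received.keys()}
    return increment(merged, node)


def compare(a: VectorClock, b: VectorClock) -> str:
    """Return 'before', 'after', 'equal', or 'concurrent'."""
    nodes = a.keys() | b.keys()
    a_le_b = all(a.get(n, 0) <= b.get(n, 0) for n in nodes)
    b_le_a = all(b.get(n, 0) <= a.get(n, 0) for n in nodes)
    if a_le_b and b_le_a:
        return "equal"
    if a_le_b:
        return "before"
    if b_le_a:
        return "after"
    return "concurrent"
```

Suppose two replicas both update a value whose clock is {"N1": 1}. The results {"N1": 2} and {"N1": 1, "N2": 1} compare as "concurrent", so this is a real conflict to resolve.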
When you need them:
- Detecting write conflicts in leaderless replication (were these two writes causally related or concurrent? If concurrent, they conflict and need resolution)
- Event ordering in distributed logs where wall-clock ordering is unreliable
- Debugging distributed systems (causal trace of which event caused which)
A minimal idempotency-key table:
```sql
CREATE TABLE idempotency_keys (
    key        VARCHAR PRIMARY KEY,  -- unique constraint is the atomic check
    response   JSONB,                -- NULL while the request is in progress
    created_at TIMESTAMPTZ DEFAULT NOW()
);
```
On receipt of a request with key K:
1. INSERT INTO idempotency_keys (key, response) VALUES (K, NULL) ON CONFLICT DO NOTHING. The affected row count (1 = inserted, 0 = conflict) tells you whether this is a new or a duplicate request.
2. If new: process the payment, then update response with the result.
3. If duplicate: return the stored response immediately.
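The three steps can be sketched end to end in Python, using the standard-library sqlite3 module and an in-memory database as a stand-in for PostgreSQL. SQLite's INSERT OR IGNORE plays the role of ON CONFLICT DO NOTHING. Several names are hypothetical: the request_hash column (added to support the key-scope check), the handle function, and the charge callback. The sketch is not a specific payment provider's API.

```python
import hashlib
import json
import sqlite3
from typing import Callable

SCHEMA = """
CREATE TABLE IF NOT EXISTS idempotency_keys (
    key          TEXT PRIMARY KEY,  -- uniqueness makes claiming a key atomic
    request_hash TEXT NOT NULL,     -- fingerprint of the parameters first sent with this key
    response     TEXT               -- NULL while in progress (or after a crash mid-charge)
)
"""


class IdempotencyConflict(Exception):
    """Key reused with different parameters, or the original request never finished."""


def handle(conn: sqlite3.Connection, key: str, params: dict,
           charge: Callable[[dict], dict]) -> dict:
    request_hash = hashlib.sha256(json.dumps(params, sort_keys=True).encode()).hexdigest()
    # Step 1: claim the key; rowcount is 1 for a new key, 0 for a duplicate.
    with conn:
        claimed = conn.execute(
            "INSERT OR IGNORE INTO idempotency_keys (key, request_hash) VALUES (?, ?)",
            (key, request_hash),
        ).rowcount == 1
    if claimed:
        # Step 2: new request. Charge first, then record the response.
        response = charge(params)
        with conn:
            conn.execute("UPDATE idempotency_keys SET response = ? WHERE key = ?",
                         (json.dumps(response), key))
        return response
    # Step 3: duplicate. Check the key's scope, then replay the stored response.
    stored_hash, stored = conn.execute(
        "SELECT request_hash, response FROM idempotency_keys WHERE key = ?", (key,)
    ).fetchone()
    if stored_hash != request_hash:
        raise IdempotencyConflict("key reused with different request parameters")
    if stored is None:
        raise IdempotencyConflict("original request in progress or needs manual review")
    return json.loads(stored)


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    conn.execute(SCHEMA)

    def fake_charge(p: dict) -> dict:  # hypothetical stand-in for the payment call
        return {"status": "charged", "amount": p["amount"]}

    print(handle(conn, "key-1", {"amount": 100}, fake_charge))
    print(handle(conn, "key-1", {"amount": 100}, fake_charge))  # replayed, not re-charged
```

If charge raises, the row keeps response = NULL. That is the in-progress/failed sentinel described in the failure cases that follow.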
What makes idempotency hard:
- Race condition: two identical requests arrive simultaneously. If "is this key new?" is a separate SELECT, both requests run it before either inserts, both see "new", and both process the payment. The unique constraint prevents this: only one INSERT succeeds. The other waits for the first transaction to commit, then inserts nothing (0 rows) and is handled as a duplicate.
- Partial failure: the request is processed and the payment charged, but the response is lost before
being stored. If the key row is only written after charging, the retry's key insert succeeds (new key) and the payment is charged again.
Fix: write the key row before charging (response=NULL), then update after.
If the process crashes mid-charge, the key row exists with response=NULL — a sentinel
that indicates an in-progress or failed payment requiring manual review.
- Idempotency key scope: a key reused across different operations (different amounts,
different merchants) must be rejected. Validate that the key's associated request matches
the current request parameters.
Mature payment APIs accept Idempotency-Key headers and propagate them to their internal payment engine. Design idempotency end-to-end through every hop, not just at the entry point.
A single-instance rate limiter (token bucket or fixed window in memory) doesn't work when the service scales horizontally. Each instance has its own counter, so the effective rate limit becomes limit × instance_count.
Redis-based fixed-window counter: use a Redis key per (user_id, window) and an atomic Lua script that checks and increments the counter in a single round trip. Redis executes each script atomically (no other command runs in between), so the check-and-increment needs no distributed lock.
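```lua
-- KEYS[1]: counter key for this (user_id, window); ARGV[1]: limit; ARGV[2]: window length in seconds
local current = tonumber(redis.call('GET', KEYS[1]) or '0')
if current >= tonumber(ARGV[1]) then
  return 0 -- rate limit exceeded
end
local count = redis.call('INCR', KEYS[1])
if count == 1 then
  -- Set the TTL only when the counter is created. Re-arming it on every request
  -- would keep a busy key alive forever, so the window would never reset.
  redis.call('EXPIRE', KEYS[1], ARGV[2])
end
return 1 -- allowed
```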
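Sliding window log: store the timestamp of each request in a sorted set. On each request, drop entries older than now - window and count the rest (the requests in [now - window, now]). This is more accurate than fixed windows, which can admit up to twice the limit around a window boundary, but it uses more memory because it stores one entry per request.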
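Below is an in-memory Python version of the sliding window log, sketched under the assumption of a single process. SlidingWindowLog and the injectable clock are illustrative names. A Redis version would keep the same three steps in an atomic script: ZREMRANGEBYSCORE to drop old entries, ZCARD to count, and ZADD to record the request.

```python
import time
from collections import deque
from typing import Callable, Deque, Dict


class SlidingWindowLog:
    """At most `limit` requests per user in any [now - window, now] interval."""

    def __init__(self, limit: int, window: float,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.limit = limit
        self.window = window
        self.clock = clock
        self.logs: Dict[str, Deque[float]] = {}

    def allow(self, user_id: str) -> bool:
        now = self.clock()
        log = self.logs.setdefault(user_id, deque())
        # Evict timestamps older than the start of the window.
        while log and log[0] < now - self.window:
            log.popleft()
        if len(log) >= self.limit:
            return False  # rejected requests are not logged in this sketch
        log.append(now)
        return True
```

Some implementations also log rejected requests, which penalizes clients that keep hammering the limiter. This sketch records only admitted requests.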
Trade-offs:
- Redis becomes a synchronous dependency on every request, so it must be highly available and low-latency (<1 ms). Use Redis Cluster for HA.
- Network round-trip to Redis adds latency. For latency-critical paths, use a local
approximate counter (token bucket with periodic Redis sync) — accepts slight over-limit
for reduced latency.
- Redis failure: fail-open (allow all requests) or fail-closed (reject all). Fail-open
is usually correct — a rate limiter outage shouldn't take down the service.
Sliding window with approximate local counting: each instance tracks a local counter and periodically reports it to Redis, which aggregates the counts. Each instance's local budget is the rate limit minus the counts most recently reported by the other instances. Most requests are decided without a Redis round trip, at the cost of briefly over-admitting between syncs. This trades exactness for performance.
With naive modulo sharding (node = hash(key) % N), adding or removing a node changes N. That remaps almost every key and requires massive data redistribution.
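The remapping cost can be quantified with a small Python computation. To keep the result deterministic, the integer keys 0..num_keys-1 act as their own hashes, which is an assumption for illustration. The function counts how many keys change owner when the cluster grows.

```python
def fraction_moved_modulo(num_keys: int, old_n: int, new_n: int) -> float:
    """Share of keys whose owner changes when node = key % N goes from old_n to new_n."""
    moved = sum(1 for k in range(num_keys) if k % old_n != k % new_n)
    return moved / num_keys


if __name__ == "__main__":
    print(fraction_moved_modulo(10_000, 4, 5))  # 0.8: four of every five keys move
```

When going from N to N + 1 nodes, key k keeps its owner only when k mod N(N+1) < N, so just 1/(N+1) of the keys stay put. Consistent hashing inverts this and moves only about 1/(N+1) of the keys.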
Consistent hashing: arrange a "ring" of hash values from 0 to 2³²-1. Place each node at one or more points on the ring (by hashing the node's ID). To find which node owns a key, hash the key and walk the ring clockwise to the first node. Adding a node places it on the ring — only the keys between it and its predecessor move to the new node (~1/N of total keys). Removing a node moves only that node's keys to its successor.
Virtual nodes (vnodes): instead of placing each physical node once on the ring, place it at V points (vnodes, typically 128–256 per node). This:
- Evenly distributes load even with heterogeneous or few nodes
- Makes rebalancing more granular: a new node picks up vnodes from many existing nodes,
distributing the transfer load
- Allows different nodes to hold different numbers of vnodes (weighted by capacity)
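Here is a consistent hash ring with virtual nodes, sketched in Python. The choices of MD5 truncated to 32 bits, the "node#i" vnode naming, and 128 vnodes per node are illustrative assumptions, not any particular database's scheme. A key belongs to the first vnode at or after its position, wrapping past 2³²-1. The replicas helper walks clockwise and collects distinct physical nodes, skipping repeat vnodes of the same node.

```python
import bisect
import hashlib
from typing import List, Tuple


def ring_position(value: str) -> int:
    """Position on the 0..2**32-1 ring: first 4 bytes of MD5 (illustrative choice)."""
    return int.from_bytes(hashlib.md5(value.encode()).digest()[:4], "big")


class HashRing:
    def __init__(self, nodes: List[str], vnodes: int = 128) -> None:
        # Each physical node is placed at `vnodes` points, e.g. position("node-a#17").
        self.points: List[Tuple[int, str]] = sorted(
            (ring_position(f"{node}#{i}"), node) for node in nodes for i in range(vnodes)
        )
        self.positions = [pos for pos, _ in self.points]

    def _first_index(self, key: str) -> int:
        # First vnode at or after the key's position; past the end, wrap to index 0.
        return bisect.bisect_left(self.positions, ring_position(key)) % len(self.points)

    def owner(self, key: str) -> str:
        return self.points[self._first_index(key)][1]

    def replicas(self, key: str, n: int) -> List[str]:
        """First n distinct physical nodes clockwise from the key."""
        wanted = min(n, len({node for _, node in self.points}))
        if wanted == 0:
            return []
        result: List[str] = []
        start = self._first_index(key)
        for step in range(len(self.points)):
            node = self.points[(start + step) % len(self.points)][1]
            if node not in result:
                result.append(node)
                if len(result) == wanted:
                    break
        return result


if __name__ == "__main__":
    ring = HashRing(["node-a", "node-b", "node-c"])
    print(ring.owner("user:42"), ring.replicas("user:42", 2))
```

When a node is added, every key that changes owner moves to the new node. With 128 vnodes per node, the moved share lands close to 1/(N+1).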
Replication with consistent hashing: a key is assigned to the first N distinct nodes clockwise from its position. For Cassandra with replication_factor = 3 (SimpleStrategy), each key lives on 3 consecutive distinct nodes on the ring. If one node fails, the remaining 2 replicas still hold the key, and whether reads and writes succeed depends on the consistency level (QUORUM needs 2 of 3; ALL fails).
Cassandra's ByteOrderedPartitioner (BOP) and Murmur3Partitioner represent the two answers to the range-versus-hash partitioning question. BOP allows range scans but creates hot partitions on sequential keys. Murmur3 spreads keys pseudo-randomly by hash, which rules out range scans across partition keys but avoids sequential-key hot spots (a single hot key is still hot). Choosing a partition key that distributes load evenly is a data modeling discipline, not just a configuration choice.
This is a multi-constraint problem: low read latency (local reads), strong consistency (no stale financial data), and high availability (no single region failure takes you down). These constraints are partially in tension, because strong consistency requires cross-region coordination, which adds latency. Tiered consistency architecture:
Tier 1: strongly consistent (financial data: balances, transactions):
- Use a globally distributed SQL database: Google Spanner (external consistency via TrueTime) or CockroachDB (serializable transactions ordered with HLC, Hybrid Logical Clocks).
- Writes go to a global leader (or are multi-leader with serializable isolation). Latency has a floor set by the speed of light plus algorithm overhead (~30–100 ms cross-region).
- Accept that writes are slower; financial writes are lower frequency than reads.
- For reads: use stale reads (bounded staleness) for user-facing balance displays, since data up to 10 seconds stale is acceptable on a balance page. Use linearizable reads only for the atomic check-then-debit operations.
Tier 2: causally consistent (user profile, preferences, non-financial state):
- Replicate to all regions with causal ordering. Users always see their own writes.
- Read from the local region; writes propagate asynchronously. Conflicts are resolved by last-writer-wins or CRDTs (counters, sets).
Tier 3: eventual consistency (feeds, analytics, leaderboards):
- Replicate asynchronously. Seconds of staleness are acceptable. Read from the local region.
Multi-region HA:
- Active-active for the AP tiers; writes go to the nearest region.
- Active-active for the CP tier is possible with multi-region Paxos groups (Spanner's model), with the latency cost paid on each write.
- Circuit breakers between regions: if region A can't reach region B's consensus group, fail the strongly consistent operation rather than silently serve stale data that violates its consistency guarantees.
An OrderCreated event is published via the outbox pattern. The Inventory Service and Payment Service are triggered by events, not by synchronous calls from the orchestrator. This decouples availability: if the Inventory Service is down, the event waits in the queue.
Handlers are idempotent: a redelivered ReserveInventory for order-123 returns the cached reservation result instead of reserving twice.
Redis sorted set: ZADD leaderboard <score> <user_id> for updates; ZREVRANK leaderboard <user_id> for a user's rank; ZREVRANGE leaderboard 0 99 WITHSCORES for the top 100. ZADD and ZREVRANK are O(log N); ZREVRANGE is O(log N + M) for M returned elements. A single Redis instance, despite being single-threaded, handles 30K writes/second.
Score updates go through Kafka, and a consumer applies each one with ZADD to Redis. Kafka absorbs burst writes and provides durability. If Redis is temporarily unavailable, events queue in Kafka. The 10-second staleness budget easily accommodates this buffering.
ZADD with the new score is always correct. There is no need to check whether the new score is higher, because the game server guarantees it. This eliminates a read-before-write race condition.
- ORDER BY score DESC LIMIT 100 against the primary database: without a suitable index this scans 10M rows, and at 30K writes/s it will collapse the database
- SELECT COUNT(*) WHERE score > :user_score on every rank request: O(N) per rank query kills the database
db_query_duration_p99 spikes.
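To reason about the leaderboard access pattern without a Redis server, here is an in-memory Python model of the three sorted-set commands. SortedSetModel and its method names are hypothetical; this is not redis-py. Entries are ordered by (score, member) like a Redis sorted set, and the "rev" methods read that order backwards, so equal scores come out in reverse lexicographic order as ZREVRANGE returns them.

```python
import bisect
from typing import Dict, List, Optional, Tuple


class SortedSetModel:
    """In-memory model of ZADD / ZREVRANK / ZREVRANGE ... WITHSCORES.

    bisect makes lookups O(log N), but list insertion and deletion are O(N);
    Redis itself uses a skip list plus a hash table to get O(log N) updates.
    """

    def __init__(self) -> None:
        self.entries: List[Tuple[float, str]] = []  # ascending (score, member)
        self.scores: Dict[str, float] = {}  # member -> current score

    def zadd(self, member: str, score: float) -> None:
        """Insert or overwrite a member's score (plain ZADD, no GT/LT comparison)."""
        old = self.scores.get(member)
        if old is not None:
            del self.entries[bisect.bisect_left(self.entries, (old, member))]
        bisect.insort(self.entries, (score, member))
        self.scores[member] = score

    def zrevrank(self, member: str) -> Optional[int]:
        """0-based rank with the highest score first, or None if the member is absent."""
        score = self.scores.get(member)
        if score is None:
            return None
        return len(self.entries) - 1 - bisect.bisect_left(self.entries, (score, member))

    def zrevrange_withscores(self, start: int, stop: int) -> List[Tuple[str, float]]:
        """Like ZREVRANGE start stop WITHSCORES, for non-negative inclusive indexes."""
        descending = self.entries[::-1][start:stop + 1]
        return [(member, score) for score, member in descending]


if __name__ == "__main__":
    board = SortedSetModel()
    for user, score in [("alice", 50), ("bob", 80), ("carol", 65), ("alice", 90)]:
        board.zadd(user, score)
    print(board.zrevrank("alice"), board.zrevrange_withscores(0, 1))
```

The model overwrites scores blindly, just as the plain ZADD described above does. That is safe only because the game server never sends a lower score.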