In a distributed system, coordinating which process is the primary (or leader) for a given partition (as opposed to a follower) is a classic leader election problem. The best approach depends on your system's requirements (latency, consistency, fault-tolerance, etc.), but here are the most widely used and proven methods:
Use a Consensus Algorithm (e.g., Raft, Paxos, Zab)
Best for: systems requiring strong consistency, such as distributed databases or state machines.
- Raft is the most popular thanks to its understandability and mature library support.
- These protocols elect a leader among a group of replicas managing the same partition.
- The leader is responsible for processing writes and replicating to followers.
- Handles failover and leader re-election cleanly.
Examples:
- Etcd (used by Kubernetes) and Consul use Raft.
- Zookeeper uses Zab, which is similar to Paxos.
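To make the election step concrete, here is a deliberately simplified sketch of the majority-vote idea at the heart of Raft-style election. The `request_vote` call and peer list are hypothetical; real Raft also compares log freshness, persists the current term, and uses randomized election timeouts.

```python
# Deliberately simplified sketch of a Raft-style majority vote.
# request_vote(peer, candidate, term) is a hypothetical RPC asking `peer`
# to vote for `candidate` in `term`.

def run_election(self_id, peers, current_term, request_vote):
    term = current_term + 1              # become a candidate in a new term
    votes = 1                            # vote for ourselves
    for peer in peers:
        if request_vote(peer, candidate=self_id, term=term):
            votes += 1
    cluster_size = len(peers) + 1
    if votes > cluster_size // 2:        # strict majority wins
        return "leader", term
    return "follower", term
```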
External Coordination Service (e.g., Zookeeper, Etcd, Consul)
Best for: systems that need distributed coordination but don't want to implement consensus themselves.
- You register ephemeral nodes for candidates.
- Whoever gets the “lock” or lowest sequence node becomes the leader.
- Others watch for changes and take over when the leader fails.
Example flow (Zookeeper):
- Each node creates an ephemeral sequential znode (e.g., `/election/node_`).
- Node with the lowest sequence number becomes leader.
- Others watch the node one ahead of them to detect failure.
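Here's a rough sketch of that flow using the kazoo Python client. Hosts, paths, and the print are illustrative; real code would also handle session expiry and re-register watches on connection loss.

```python
from kazoo.client import KazooClient

zk = KazooClient(hosts="127.0.0.1:2181")   # address is illustrative
zk.start()
zk.ensure_path("/election")

# 1. Create an ephemeral sequential znode for this candidate.
my_path = zk.create("/election/node_", ephemeral=True, sequence=True)
my_node = my_path.rsplit("/", 1)[1]

def check_leadership():
    children = sorted(zk.get_children("/election"))
    if children[0] == my_node:
        print("I am the leader")           # 2. Lowest sequence number wins.
    else:
        # 3. Watch the node just ahead of us; when it disappears, re-check.
        predecessor = children[children.index(my_node) - 1]
        zk.exists("/election/" + predecessor,
                  watch=lambda event: check_leadership())

check_leadership()
```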
Leasing via Distributed Lock (e.g., Dynamo-style)
Best for: systems where eventual consistency is acceptable, and simplicity is valued.
- A node acquires a lock or lease (e.g., via DynamoDB or Etcd TTL keys).
- The lease must be periodically renewed.
- If the lease expires or isn't renewed in time, another node can take over.
Risk: Clock skew and lease expiry can cause split-brain if not carefully handled.
Quorum-based Shard Mapping
Best for: partitioned data systems like Kafka or Cassandra.
- The system maintains metadata mapping partitions to primary replicas.
- A controller node (itself elected via Raft/Zookeeper) manages this mapping.
- Followers consult the controller to find the current leader.
Kafka example:
- Uses Zookeeper to elect a controller (newer Kafka versions replace Zookeeper with KRaft, a built-in Raft quorum).
- Controller assigns leaders to partitions.
Summary
Method | Pros | Cons |
---|---|---|
Raft/Paxos/Zab | Strong consistency, safe failover | Complex, performance overhead |
Zookeeper/Etcd election | Easier to implement, battle-tested | External dependency, needs care w/ TTLs |
Lease-based election | Simple, scalable | Risk of split-brain, clock sync needed |
Centralized controller | Efficient, widely used | Single point of failure unless replicated |
Low Latency Considerations
If lowest latency is your top priority — particularly for failover detection and primary election reassignment — then you want to avoid heavyweight consensus protocols like Raft or Paxos in the hot path. Instead, favor lightweight, local, fast-to-resolve mechanisms, even if that comes at the cost of consistency guarantees or requires accepting some complexity in failure handling.
Here are the lowest-latency approaches, in order of increasing complexity:
Static Assignment with Gossip-Based Failure Detection
- Latency: sub-millisecond reassignment once a failure is detected (detection speed depends on gossip tuning).
- Each node knows ahead of time which partitions it's a candidate for.
- Leadership can be determined locally or via a simple tie-breaking rule (e.g., lowest IP, highest score); see the sketch after this section.
- Failure detection is done via gossip (e.g., SWIM), which is fast and decentralized.
- Upon detection of a failure, neighbors promote the next replica.
Trade-off: May lead to brief periods of split-brain if failure detection is inaccurate or delayed.
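For instance, the tie-breaking rule can be a pure local computation over the replicas that gossip currently reports as alive. The data shapes here are illustrative:

```python
# Pick the leader for a partition purely from local knowledge:
# candidates are the replicas we believe are alive (per gossip),
# and the tie-break is the lowest node id. Every node that shares
# the same membership view reaches the same answer.

def local_leader(partition_replicas, alive_nodes):
    live_candidates = [n for n in partition_replicas if n in alive_nodes]
    return min(live_candidates) if live_candidates else None

# Example: replica set for partition 7, with one replica considered dead.
print(local_leader(["10.0.0.3", "10.0.0.1", "10.0.0.9"],
                   alive_nodes={"10.0.0.3", "10.0.0.9"}))  # -> "10.0.0.3"
```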
Leases via Shared Store (e.g., Redis, Etcd, DynamoDB TTL keys)
- Latency: ~10–100ms to acquire/reacquire leadership depending on store latency and lease duration.
- Nodes race to acquire a lease (e.g., via Redis `SET key val NX PX`) with a short TTL (see the sketch below).
- Only the holder of the valid lease is the primary.
- Others retry periodically with backoff.
Tuning tip: Set short TTLs (e.g., 50–200ms) and frequent renewal intervals (e.g., every 25ms).
Trade-off: Requires clock coordination or aggressive timeouts to prevent overlap.
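A minimal sketch of that lease race with the redis-py client. The key name, node id, and timings are illustrative, and the renew step is simplified; in production the check-and-extend should be atomic (e.g., a Lua script).

```python
import time
import redis

r = redis.Redis()                      # connection details are illustrative
MY_ID = "node-a"
LEASE_MS = 200                         # short TTL, per the tuning tip above
RENEW_EVERY = 0.05                     # renew every 50 ms

def try_acquire():
    # SET leader <id> NX PX <ttl>: succeeds only if there is no current holder.
    return bool(r.set("leader", MY_ID, nx=True, px=LEASE_MS))

def renew():
    # Only extend the lease if we still hold it. A production version must
    # make this check-and-extend atomic.
    if r.get("leader") == MY_ID.encode():
        r.pexpire("leader", LEASE_MS)
        return True
    return False

while True:
    if try_acquire() or renew():
        pass                           # we are (still) the primary
    time.sleep(RENEW_EVERY)
```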
In-memory Coordinator with Fallback
- Latency: sub-millisecond if centralized and in-process.
- One process per region or rack is elected as the local coordinator (e.g., using a bootstrapped consensus or config).
- It manages partition leadership within its scope purely in-memory.
- If the coordinator dies, nodes fall back to a backup strategy (like leases or static order).
Trade-off: Needs reliable detection of local coordinator failure; otherwise may delay recovery.
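As a sketch of what "purely in-memory" means here, the coordinator can be little more than a partition-to-leader map with a promotion rule. The class and data shapes are hypothetical:

```python
# Hypothetical local coordinator: partition leadership lives in memory,
# so reassigning a partition is just a dictionary update.

class LocalCoordinator:
    def __init__(self, replicas_by_partition):
        self.replicas = replicas_by_partition       # partition -> [replica ids]
        self.leaders = {p: rs[0] for p, rs in replicas_by_partition.items()}

    def leader_of(self, partition):
        return self.leaders[partition]

    def on_node_failure(self, dead_node):
        # Promote the next healthy replica for every partition the dead
        # node was leading. Purely in-memory, hence sub-millisecond.
        for partition, replicas in self.replicas.items():
            if self.leaders[partition] == dead_node:
                survivors = [r for r in replicas if r != dead_node]
                if survivors:
                    self.leaders[partition] = survivors[0]

coord = LocalCoordinator({"p0": ["n1", "n2"], "p1": ["n2", "n3"]})
coord.on_node_failure("n2")
print(coord.leader_of("p1"))   # -> "n3"
```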
Deterministic Leader Election (No Coordination)
- Latency: near-zero (pure function call).
- Leadership is determined by a deterministic function:
```js
function electLeader(replicas, partitionId, epoch) {
  // hash() is any stable hash shared by all nodes; the epoch must change
  // on failure so that everyone recomputes and picks a new leader.
  return hash(partitionId + epoch) % replicas.length;
}
```
- Used in systems that prioritize availability over consistency.
- Epoch must change on failure to trigger re-election.
Trade-off: Hard to handle failure or state loss without a source of epoch advancement.
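A small Python sketch of the same idea, showing where the epoch advancement plugs in. The hash choice and the failure signal are assumptions; a real system would also drop the failed node from the candidate list.

```python
import hashlib

def elect_leader(replicas, partition_id, epoch):
    # Deterministic: every node with the same inputs picks the same leader.
    digest = hashlib.sha256(f"{partition_id}:{epoch}".encode()).hexdigest()
    return replicas[int(digest, 16) % len(replicas)]

replicas = ["node-a", "node-b", "node-c"]
epoch = 0
leader = elect_leader(replicas, "partition-7", epoch)

# On a failure signal for the current leader (from gossip, health checks,
# or an external monitor), advance the epoch so everyone re-elects.
def on_leader_failure():
    global epoch, leader
    epoch += 1
    leader = elect_leader(replicas, "partition-7", epoch)
```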
TL;DR Recommendations for Low Latency
Approach | Leadership Assignment | Failover Detection | Latency | Ideal Use Case |
---|---|---|---|---|
Gossip + static mapping | Local decision | Gossip heartbeat | <100ms | High-scale, AP systems |
Leases (Redis/Etcd) | Shared lease lock | TTL expiry | ~50–100ms | Simple low-latency HA |
Deterministic election | Stateless function | External trigger | ~0ms | Ultra-fast election in AP systems |
Local in-memory coord. | In-process map | Health-check | ~1ms | Controlled environments (e.g., same rack/zone) |
Fencing Tokens
Fencing tokens are a simple but powerful technique to prevent split-brain behavior or stale leaders from making changes after they’ve lost leadership in a distributed system.
They are used in systems with leader election or distributed locks where it’s possible for two nodes to believe they are the leader — due to things like clock drift, delayed network packets, GC pauses, or lease expiration.
What is a Fencing Token?
A fencing token is a monotonically increasing number (usually an integer) issued every time a lock is granted or leadership is assigned.
When a node acquires the lock (or becomes leader), it gets a new fencing token, e.g.:
Node A acquires lock → fencing token = 100
Node B acquires lock later → fencing token = 101
All systems that receive commands from the leader (e.g., databases, storage backends, worker queues) are required to:
- Check the fencing token, and
- Reject any operation with a stale (lower) token.
Why Do You Need Fencing Tokens?
Without fencing tokens, you might have this failure scenario:
- Node A acquires a lock (and becomes leader).
- Node A pauses (e.g., GC stop-the-world).
- Lock expires; Node B acquires the lock.
- Node A wakes up and still thinks it's the leader.
- Both A and B perform operations → data corruption or inconsistent state.
If every operation carries a fencing token, then:
- Node A tries to act with fencing token `100`, but the storage backend has already seen `101` from Node B.
- The system rejects A's operations, preserving safety.
Where Are Fencing Tokens Used?
- HDFS NameNode failover: fencing tokens prevent the old NameNode from issuing stale writes.
- Kafka controller: fencing tokens are used to ensure only the active controller can write metadata.
- Leader leases with etcd or Zookeeper: fencing tokens often accompany them.
- Stateful distributed tasks: fencing tokens can protect access to S3, databases, or other mutable systems.
How to Implement Fencing Tokens
- Use a central authority to assign increasing numbers (e.g., Redis `INCR`, etcd `revision`, Zookeeper `zxid`).
- Include the fencing token in every operation request.
- The downstream system must track the highest seen token and reject stale ones.
Example (with Redis):
```python
# Acquire the lock along with a fencing token
token = redis.incr("lock-seq")
redis.set("lock", my_id, nx=True, px=ttl)

# When making writes, pass the token along:
some_operation(data, fencing_token=token)

# Storage system:
if incoming_token < last_seen_token:
    reject_operation()
```
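That last check is the part the downstream system must own. Here is a slightly fuller sketch of tracking the highest token per resource (a single writer thread is assumed; a real backend must make the compare-and-update atomic):

```python
# Downstream enforcement: remember the highest fencing token seen per
# resource and reject anything older.

highest_token = {}          # resource id -> highest fencing token seen

def apply_write(resource, data, fencing_token):
    if fencing_token < highest_token.get(resource, 0):
        raise PermissionError("stale fencing token; rejecting write")
    highest_token[resource] = fencing_token
    # ... perform the actual write here ...
```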
Summary
Aspect | Detail |
---|---|
Fencing Token | Monotonically increasing number |
Prevents | Old leader writing after expiry |
Key Requirement | Downstream must enforce token logic |
Common Pattern | Combine with leases or locks |
Gossip Protocol
The Gossip Protocol is a decentralized, probabilistic communication protocol used in distributed systems to share information (like state, membership, or failures) between nodes in a scalable, fault-tolerant, and eventually consistent way.
Think of it like how rumors spread in a social network: each person (node) tells a few others, who then tell a few more, and so on — until eventually everyone knows.
Core Idea
Each node periodically picks one or more random peers and exchanges state information. This small trick allows systems to:
- Discover new nodes
- Detect node failures
- Spread updates efficiently
- Maintain eventual consistency without central coordination
What Happens in a Gossip Cycle?
- Selection: Node A picks a peer (e.g. Node B) at random.
- Communication: A sends a small message with what it knows (e.g. "Node C is up", "Node D has version 3").
- Merge: B updates its own state and may reply with information that A doesn't know.
- Repeat: Both A and B continue gossiping with other peers.
Each cycle is fast (e.g., 1–2 seconds), and over time the cluster converges on a shared view.
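As a toy illustration, each node's view can be a map of node → (version, status), and a gossip exchange is just a versioned merge. The data model is illustrative; real implementations also push state back the other way (push-pull gossip).

```python
# Each node keeps a map of node_id -> (version, status). During a gossip
# exchange, both sides merge, keeping the entry with the higher version.

def merge(local_state, remote_state):
    for node, (version, status) in remote_state.items():
        if node not in local_state or version > local_state[node][0]:
            local_state[node] = (version, status)
    return local_state

a = {"A": (5, "up"), "C": (2, "up")}
b = {"B": (1, "up"), "C": (3, "down")}   # B has fresher news about C

merge(a, b)
print(a)   # {'A': (5, 'up'), 'C': (3, 'down'), 'B': (1, 'up')}
```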
Types of Gossip Use
- Membership gossip: "Who's in the cluster?"
- Failure detection: "Has Node X died?"
- Data dissemination: "Here’s an update (e.g., config change, write operation)"
- Anti-entropy: Reconcile differences between replicas over time.
Properties
Property | Description |
---|---|
Scalable | Updates reach all nodes in roughly O(log N) gossip rounds |
Fault-tolerant | Works with partial failure, unreliable networks |
Eventually consistent | Doesn't require consensus; data converges over time |
Decentralized | No single point of failure or control |
Probabilistic | No hard guarantees on time to consistency |
Real-World Examples
- Cassandra / ScyllaDB: use gossip to maintain cluster membership and share node state (tokens, load, availability).
- Consul: Uses the Serf library (which implements gossip with failure detection) for service discovery.
- Akka Cluster: Gossip for cluster membership and state propagation.
- SWIM protocol: A well-known gossip-based failure detector with low overhead.
Gossip + Failure Detection (SWIM)
Many gossip-based systems use SWIM (Scalable Weakly-consistent Infection-style Process Group Membership), which combines gossip-style dissemination with failure detection:
- Each node periodically probes a random peer; membership updates are piggybacked on these probe messages instead of separate heartbeats.
- If a node gets no ack in time (directly or via indirect probes through other members), it marks the peer as suspect.
- A suspect node is confirmed dead if the suspicion is not refuted within a timeout.
This is useful for leader election, load balancing, and shard placement.
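A highly simplified sketch of one SWIM probe round. Indirect probes and incarnation-number refutation are omitted, and `ping` is a hypothetical network call that returns whether an ack arrived in time.

```python
import random

SUSPECT_LIMIT = 3   # missed probes before a suspect is declared dead

def probe_round(members, statuses, missed, ping):
    """One SWIM-style round: probe one random live member, update suspicion."""
    live = [m for m in members if statuses.get(m) != "dead"]
    if not live:
        return
    target = random.choice(live)
    if ping(target):                      # ack arrived in time
        statuses[target] = "alive"
        missed[target] = 0
    else:
        # No ack: real SWIM would first ask k other members to probe the
        # target indirectly before suspecting it.
        missed[target] = missed.get(target, 0) + 1
        statuses[target] = "suspect"
        if missed[target] >= SUSPECT_LIMIT:
            statuses[target] = "dead"     # confirmed after repeated misses
```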
Trade-offs
Pro | Con |
---|---|
Simple, robust, scalable | No hard consistency or ordering |
No central coordination | Slow convergence under large churn |
Handles partitions well | Cannot guarantee rapid failover or uniqueness of state |
Summary
The Gossip Protocol is like a fast, resilient rumor mill for your distributed system. It doesn't guarantee perfect accuracy or timing, but it's great when you want to scale out, tolerate failures, and eventually converge.