Leader for Primary

Some notes on leader election.

In a distributed system, coordinating which process is the primary (or leader) for a given partition (as opposed to a follower) is a classic leader election problem. The best approach depends on your system's requirements (latency, consistency, fault-tolerance, etc.), but here are the most widely used and proven methods:

Use a Consensus Algorithm (e.g., Raft, Paxos, Zab)

Best for: systems requiring strong consistency, such as distributed databases or state machines.

  • Raft is the most popular due to understandability and solid libraries.
  • These protocols elect a leader among a group of replicas managing the same partition.
  • The leader is responsible for processing writes and replicating to followers.
  • Handles failover and leader re-election cleanly.

Examples:

  • Etcd (used by Kubernetes) and Consul use Raft.
  • Zookeeper uses Zab, which is similar to Paxos.
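
To make the election step concrete, here is a heavily simplified Python sketch of Raft-style vote counting. The Node class and the request_vote callback are illustrative stand-ins, not any particular library's API; a real implementation also compares log positions, persists term/vote state, and uses randomized election timeouts.

from dataclasses import dataclass

@dataclass
class Node:
    node_id: str
    peers: list                  # ids of the other replicas in this partition's group
    current_term: int = 0
    voted_for: str | None = None

    def start_election(self, request_vote) -> bool:
        """Run one election round; True means this node won leadership for the term."""
        self.current_term += 1            # new term for this election attempt
        self.voted_for = self.node_id     # vote for ourselves
        votes = 1
        for peer in self.peers:
            # request_vote(peer, term, candidate_id) -> bool is a stand-in RPC
            if request_vote(peer, self.current_term, self.node_id):
                votes += 1
        cluster_size = len(self.peers) + 1
        return votes > cluster_size // 2  # strict majority wins the term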

External Coordination Service (e.g., Zookeeper, Etcd, Consul)

Best for: systems that need distributed coordination but don't want to implement consensus themselves.

  • You register ephemeral nodes for candidates.
  • Whoever gets the “lock” or lowest sequence node becomes the leader.
  • Others watch for changes and take over when the leader fails.

Example flow (Zookeeper):

  1. Each node creates an ephemeral sequential znode (e.g., /election/node_).
  2. Node with the lowest number becomes leader.
  3. Others watch the node one ahead of them to detect failure.
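
A minimal sketch of that flow using the Python kazoo client (connection string, paths, and payload are illustrative; kazoo also ships a ready-made Election recipe that wraps the same pattern):

from kazoo.client import KazooClient

zk = KazooClient(hosts="127.0.0.1:2181")
zk.start()
zk.ensure_path("/election")

# Step 1: create an ephemeral, sequential znode; it vanishes if our session dies.
my_path = zk.create("/election/node_", b"host:port-of-this-candidate",
                    ephemeral=True, sequence=True)
my_name = my_path.rsplit("/", 1)[-1]

def become_leader():
    print("this node is now the primary")    # placeholder for real promotion logic

def check_leadership():
    children = sorted(zk.get_children("/election"))
    if children[0] == my_name:
        become_leader()                       # Step 2: lowest sequence number wins
    else:
        # Step 3: watch only the znode immediately ahead of us (avoids a thundering herd).
        predecessor = children[children.index(my_name) - 1]
        stat = zk.exists("/election/" + predecessor,
                         watch=lambda event: check_leadership())
        if stat is None:
            check_leadership()                # predecessor vanished between calls; re-check

check_leadership()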

Leasing via Distributed Lock (e.g., Dynamo-style)

Best for: systems where eventual consistency is acceptable, and simplicity is valued.

  • A node acquires a lock or lease (via e.g., DynamoDB, Etcd TTL keys).
  • The lease must be periodically renewed.
  • If the lease expires or isn't renewed in time, another node can take over.

Risk: Clock skew and lease expiry can cause split-brain if not carefully handled.
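
A hedged sketch of the lease pattern with the python-etcd3 client; the key name, TTL, and node id are made up, and the transaction guard ("create the key only if it doesn't exist yet") is one reasonable way to keep acquisition atomic:

import time
import etcd3

client = etcd3.client(host="localhost", port=2379)
KEY, NODE_ID, TTL_SECONDS = "/primary/orders-partition-7", "node-a", 5

lease = client.lease(TTL_SECONDS)
# Claim the key atomically, but only if nobody else currently holds it.
acquired, _ = client.transaction(
    compare=[client.transactions.version(KEY) == 0],
    success=[client.transactions.put(KEY, NODE_ID, lease=lease)],
    failure=[],
)

while acquired:
    # ... act as primary for this partition while the lease is held ...
    lease.refresh()                  # renew well before the TTL elapses
    time.sleep(TTL_SECONDS / 3)
# If renewal stops (crash, GC pause, partition), the key expires with the lease
# and another node's transaction will succeed on its next attempt.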

Quorum-based Shard Mapping

Best for: partitioned data systems like Kafka or Cassandra.

  • The system maintains metadata mapping partitions to primary replicas.
  • A controller node (itself elected via Raft/Zookeeper) manages this mapping.
  • Followers consult the controller to find the current leader.

Kafka example:

  • Uses Zookeeper to elect a controller (in pre-KRaft versions; newer Kafka replaces Zookeeper with a built-in Raft quorum).
  • Controller assigns leaders to partitions.
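
A toy, in-memory sketch of the controller's bookkeeping (the data shapes and names are invented for illustration): it owns the partition→leader map and, when the failure detector reports a dead broker, promotes the next replica in the assignment.

class Controller:
    def __init__(self, assignments):
        # assignments: {partition_id: [replica ids, in preference order]}
        self.assignments = assignments
        self.leaders = {p: replicas[0] for p, replicas in assignments.items()}

    def on_broker_failure(self, dead_broker):
        """Promote the next surviving replica for every partition the dead broker led."""
        for partition, leader in self.leaders.items():
            if leader == dead_broker:
                survivors = [r for r in self.assignments[partition] if r != dead_broker]
                if survivors:
                    self.leaders[partition] = survivors[0]

    def leader_for(self, partition):
        return self.leaders[partition]       # followers and clients consult this mapping

controller = Controller({0: ["broker-1", "broker-2"], 1: ["broker-2", "broker-3"]})
controller.on_broker_failure("broker-1")
print(controller.leader_for(0))              # -> broker-2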

Summary

Method                   | Pros                                | Cons
Raft/Paxos/Zab           | Strong consistency, safe failover   | Complex, performance overhead
Zookeeper/Etcd election  | Easier to implement, battle-tested  | External dependency, needs care w/ TTLs
Lease-based election     | Simple, scalable                    | Risk of split-brain, clock sync needed
Centralized controller   | Efficient, widely used              | Single point of failure unless replicated

Low Latency Considerations

If the lowest possible latency is your top priority — particularly for failover detection and primary reassignment — then you want to keep heavyweight consensus protocols like Raft or Paxos out of the hot path. Instead, favor lightweight, local, fast-to-resolve mechanisms, even if that costs some consistency guarantees or adds complexity to failure handling.

Here are the lowest-latency approaches, in order of increasing complexity:

Static Assignment with Gossip-Based Failure Detection

  • Latency: Sub-millisecond reassignment (depending on tuning).
  • Each node knows ahead of time which partitions it's a candidate for.
  • Leadership can be determined locally or via a simple tie-breaking rule (e.g., lowest IP, highest score).
  • Failure detection is done via gossip (e.g., SWIM), which is fast and decentralized.
  • Upon detection of a failure, neighbors promote the next replica.

Trade-off: May lead to brief periods of split-brain if failure detection is inaccurate or delayed.
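
A minimal sketch of the local decision rule, assuming the gossip/SWIM layer supplies a set of currently-alive node ids and the tie-break is "lowest id wins" (both are illustrative choices):

def current_leader(candidates, alive):
    """candidates: static replica list for a partition (known ahead of time).
    alive: node ids the gossip layer currently believes are healthy."""
    live = [node for node in candidates if node in alive]
    return min(live) if live else None       # tie-break rule: lowest id wins

candidates = ["node-b", "node-a", "node-c"]
alive = {"node-b", "node-c"}                 # gossip reports node-a as failed
print(current_leader(candidates, alive))     # -> node-b, promoted locally with no coordination round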

Leases via Shared Store (e.g., Redis, Etcd, DynamoDB TTL keys)

  • Latency: ~10–100ms to acquire/reacquire leadership depending on store latency and lease duration.
  • Nodes race to acquire a lease (e.g., via SET key val NX PX) with a short TTL.
  • Only the holder of the valid lease is the primary.
  • Others retry periodically with backoff.

Tuning tip: Set short TTLs (e.g., 50–200ms) and frequent renewal intervals (e.g., every 25ms).

Trade-off: Requires clock coordination or aggressive timeouts to prevent overlap.
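
A sketch of that race using redis-py; the key name, node id, and timings are illustrative (following the tuning tip above), and the Lua script is one way to make "extend only if I still hold the lease" atomic:

import redis

r = redis.Redis()
KEY, NODE_ID, TTL_MS = "leader:orders-partition-7", "node-a", 200

def try_acquire():
    # SET key value NX PX ttl: succeeds only if the key does not already exist.
    return bool(r.set(KEY, NODE_ID, nx=True, px=TTL_MS))

# Renewal must only extend our own lease, so check-and-expire runs as one Lua script.
RENEW = r.register_script("""
if redis.call('get', KEYS[1]) == ARGV[1] then
  return redis.call('pexpire', KEYS[1], ARGV[2])
end
return 0
""")

def renew():
    # Call roughly every 25 ms while acting as primary; 0 means we have lost the lease.
    return RENEW(keys=[KEY], args=[NODE_ID, TTL_MS]) == 1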

In-memory Coordinator with Fallback

  • Latency: sub-millisecond if centralized and in-process.
  • One process per region or rack is elected as the local coordinator (e.g., using a bootstrapped consensus or config).
  • It manages partition leadership within its scope purely in-memory.
  • If the coordinator dies, nodes fall back to a backup strategy (like leases or static order).

Trade-off: Needs reliable detection of local coordinator failure; otherwise may delay recovery.
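
A small sketch of the fallback path: ask the local coordinator first and, if it is unreachable, fall back to a deterministic static order filtered by liveness. The coordinator_lookup callable and the exception type are assumptions for illustration.

def leader_for(partition, coordinator_lookup, static_order, alive):
    try:
        # Fast path: the in-memory coordinator answers in well under a millisecond.
        return coordinator_lookup(partition)
    except ConnectionError:
        # Fallback: deterministic static preference order, filtered by gossip liveness.
        live = [node for node in static_order[partition] if node in alive]
        return live[0] if live else None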

Deterministic Leader Election (No Coordination)

  • Latency: near-zero (pure function call).
  • Leadership is determined by a deterministic function:
function electLeader(replicas, partitionId, epoch) {
  // hash() is any stable hash function agreed on by all nodes
  return replicas[hash(partitionId + ":" + epoch) % replicas.length];
}
  • Used in systems that prioritize availability over consistency.
  • Epoch must change on failure to trigger re-election.

Trade-off: Hard to handle failure or state loss without a source of epoch advancement.
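
A Python counterpart of the function above, showing how bumping the epoch is what re-runs the pure function and (usually) rotates leadership; the hash choice and node names are arbitrary:

import hashlib

def elect_leader(replicas, partition_id, epoch):
    digest = hashlib.sha256(f"{partition_id}:{epoch}".encode()).digest()
    return replicas[int.from_bytes(digest[:8], "big") % len(replicas)]

replicas = ["node-a", "node-b", "node-c"]
print(elect_leader(replicas, "orders-7", epoch=1))
print(elect_leader(replicas, "orders-7", epoch=2))   # failure detected -> bump epoch, leader may move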

TL;DR Recommendations for Low Latency

Approach                 | Leadership Assignment | Failover Detection | Latency   | Ideal Use Case
Gossip + static mapping  | Local decision        | Gossip heartbeat   | <100ms    | High-scale, AP systems
Leases (Redis/Etcd)      | Shared lease lock     | TTL expiry         | ~50–100ms | Simple low-latency HA
Deterministic election   | Stateless function    | External trigger   | ~0ms      | Ultra-fast election in AP systems
Local in-memory coord.   | In-process map        | Health-check       | ~1ms      | Controlled environments (e.g., same rack/zone)

Fencing Tokens

Fencing tokens are a simple but powerful technique for preventing split-brain behavior in a distributed system, where a stale leader keeps making changes after it has lost leadership.

They are used in systems with leader election or distributed locks where it’s possible for two nodes to believe they are the leader — due to things like clock drift, delayed network packets, GC pauses, or lease expiration.

What is a Fencing Token?

A fencing token is a monotonically increasing number (usually an integer) issued every time a lock is granted or leadership is assigned.

When a node acquires the lock (or becomes leader), it gets a new fencing token, e.g.:

Node A acquires lock → fencing token = 100
Node B acquires lock later → fencing token = 101

All systems that receive commands from the leader (e.g., databases, storage backends, worker queues) are required to:

  • Check the fencing token, and
  • Reject any operation with a stale (lower) token.

Why Do You Need Fencing Tokens?

Without fencing tokens, you might have this failure scenario:

  1. Node A acquires a lock (and becomes leader).
  2. Node A pauses (e.g., GC stop-the-world).
  3. Lock expires; Node B acquires the lock.
  4. Node A wakes up and still thinks it's the leader.
  5. Both A and B perform operations → data corruption or inconsistent state.

If every operation carries a fencing token, then:

  • When Node A tries to act with fencing token 100, but the storage backend has seen 101 from Node B,
  • The system rejects A’s operations, preserving safety.
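
A minimal sketch of that check on the storage side (the FencedStore class is hypothetical): it remembers the highest token it has seen and rejects anything older.

class FencedStore:
    def __init__(self):
        self.highest_token = -1
        self.data = {}

    def write(self, key, value, token):
        if token < self.highest_token:
            raise PermissionError(f"stale fencing token {token} < {self.highest_token}")
        self.highest_token = max(self.highest_token, token)
        self.data[key] = value

store = FencedStore()
store.write("x", "written by B", token=101)   # B is the current leader
store.write("x", "written by A", token=100)   # raises: A's token is stale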

Where Are Fencing Tokens Used?

  • HDFS NameNode failover: fencing tokens prevent the old NameNode from issuing stale writes.
  • Kafka controller: fencing tokens are used to ensure only the active controller can write metadata.
  • Leader leases with etcd or Zookeeper: fencing tokens often accompany them.
  • Stateful distributed tasks: fencing tokens can protect access to S3, databases, or other mutable systems.

How to Implement Fencing Tokens

  1. Use a central authority to assign increasing numbers (e.g., Redis INCR, etcd revision, Zookeeper zxid).
  2. Include the fencing token in every operation request.
  3. The downstream system must track the highest seen token and reject stale ones.

Example (with Redis):

# Acquire the lock with a fencing token
token = redis.incr("lock-seq")
redis.set("lock", my_id, nx=True, px=ttl)

# When making writes, include the token:
some_operation(data, fencing_token=token)

# Storage system side:
if incoming_token < last_seen_token:
    reject_operation()
else:
    last_seen_token = incoming_token   # track the highest token seen so far

Summary

Concept         | Purpose
Fencing Token   | Monotonically increasing number
Prevents        | Old leader writing after expiry
Key Requirement | Downstream must enforce token logic
Common Pattern  | Combine with leases or locks

Gossip Protocol

The Gossip Protocol is a decentralized, probabilistic communication protocol used in distributed systems to share information (like state, membership, or failures) between nodes in a scalable, fault-tolerant, and eventually consistent way.

Think of it like how rumors spread in a social network: each person (node) tells a few others, who then tell a few more, and so on — until eventually everyone knows.

Core Idea

Each node periodically picks one or more random peers and exchanges state information. This small trick allows systems to:

  • Discover new nodes
  • Detect node failures
  • Spread updates efficiently
  • Maintain eventual consistency without central coordination

What Happens in a Gossip Cycle?

  1. Selection: Node A picks a peer (e.g. Node B) at random.
  2. Communication: A sends a small message with what it knows (e.g. "Node C is up", "Node D has version 3").
  3. Merge: B updates its own state and may reply with information that A doesn't know.
  4. Repeat: Both A and B continue gossiping with other peers.

Each cycle is fast (e.g., 1–2 seconds), and over time the cluster converges on a shared view.
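
A toy sketch of one push-pull gossip cycle; the state model (each node keeps {node_id: (version, status)} and the higher version wins on merge) and the send callable are illustrative assumptions:

import random

def merge(mine, theirs):
    for node, (version, status) in theirs.items():
        if node not in mine or version > mine[node][0]:
            mine[node] = (version, status)

def gossip_once(state, peers, send):
    peer = random.choice(peers)        # 1. Selection: pick a random peer
    reply = send(peer, dict(state))    # 2. Communication: push our view, get theirs back
    merge(state, reply)                # 3. Merge: keep the newest entry for every node

# In-memory usage with two nodes exchanging views directly:
a = {"A": (3, "up"), "C": (1, "up")}
b = {"B": (2, "up"), "C": (4, "down")}

def send(_peer, pushed):
    merge(b, pushed)                   # the peer merges what it was pushed...
    return dict(b)                     # ...and replies with its own (updated) view

gossip_once(a, peers=["B"], send=send)
print(a)   # A now knows about B and has C's newer status (version 4, down)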

Types of Gossip Use

  • Membership gossip: "Who's in the cluster?"
  • Failure detection: "Has Node X died?"
  • Data dissemination: "Here’s an update (e.g., config change, write operation)"
  • Anti-entropy: Reconcile differences between replicas over time.

Properties

Property              | Description
Scalable              | Updates reach all N nodes in roughly O(log N) gossip rounds
Fault-tolerant        | Works with partial failure, unreliable networks
Eventually consistent | Doesn't require consensus; data converges over time
Decentralized         | No single point of failure or control
Probabilistic         | No hard guarantees on time to consistency

Real-World Examples

  • Cassandra / ScyllaDB: Uses gossip to maintain cluster membership and replica coordination.
  • Consul: Uses the Serf library (which implements gossip with failure detection) for service discovery.
  • Akka Cluster: Gossip for cluster membership and state propagation.
  • SWIM protocol: A well-known gossip-based failure detector with low overhead.

Gossip + Failure Detection (SWIM)

Many gossip-based systems use SWIM (Scalable Weakly-consistent Infection-style Process Group Membership), which enhances gossip with failure detection:

  • Failure-detection pings (direct, then indirect via a few other members) replace all-to-all heartbeats, and membership updates are piggybacked on that traffic.
  • If a node gets no ack from a peer in time, it marks the peer as suspect.
  • A suspect node is declared dead if the suspicion isn't refuted within a timeout (i.e., others agree).

This is useful for leader election, load balancing, and shard placement.
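
A simplified sketch of one SWIM probe round (fan-out, timeouts, and the ping callables are illustrative; real SWIM picks the indirect helpers at random and disseminates suspicion via the gossip messages themselves):

def probe(target, members, direct_ping, indirect_ping, k=3):
    """Return 'alive' or 'suspect' for target after one probe round."""
    if direct_ping(target):                       # membership updates ride along in real SWIM
        return "alive"
    helpers = [m for m in members if m != target][:k]
    if any(indirect_ping(helper, target) for helper in helpers):
        return "alive"
    return "suspect"   # gossiped to others; declared dead if not refuted within a timeout

# Toy usage with stubbed network calls that always fail:
members = ["n1", "n2", "n3", "n4"]
print(probe("n2", members,
            direct_ping=lambda t: False,
            indirect_ping=lambda helper, t: False))   # -> "suspect"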

Trade-offs

Pro                       | Con
Simple, robust, scalable  | No hard consistency or ordering
No central coordination   | Slow convergence under large churn
Handles partitions well   | Cannot guarantee rapid failover or uniqueness of state

Summary

The Gossip Protocol is like a fast, resilient rumor mill for your distributed system. It doesn't guarantee perfect accuracy or timing, but it's great when you want to scale out, tolerate failures, and eventually converge.

Originally posted:
Filed Under:
architecture