[ETCD v3.4.10 Source Code Analysis] 1. The Raft Protocol and the Heartbeat Mechanism

Leader Election

In etcd, only the leader can handle write proposals. So how is a leader produced?
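[Figure: leader-follower sequence diagram. Heartbeat path: clock → n.tickc → tickHeartbeat → MsgBeat → Step → stepLeader → MsgHeartbeat → send → r.msgs → newReady → Ready → readyc → MsgHeartbeat over the network. Election path: tickElection increments the election-elapsed counter; once it exceeds the timeout → MsgHup → Step → Campaign → MsgVote over the network. A follower whose leader is still alive and whose elapsed count has not exceeded the election timeout rejects the request; the answer comes back as MsgVoteResp.]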

1. Ticker

When a raftNode instance is created, a Ticker is created along with it. The heartbeat interval passed in equals cfg.TickMs, which defaults to 100ms.

func newRaftNode(cfg raftNodeConfig) *raftNode {
	// create the logger (omitted)
	r := &raftNode{
		lg:             cfg.lg,
		tickMu:         new(sync.Mutex),
		raftNodeConfig: cfg,
		// set up contention detectors for raft heartbeat message.
		// expect to send a heartbeat within 2 heartbeat intervals.
		td:         contention.NewTimeoutDetector(2 * cfg.heartbeat),
		readStateC: make(chan raft.ReadState, 1),
		msgSnapC:   make(chan raftpb.Message, maxInFlightMsgSnap),
		applyc:     make(chan apply),
		stopped:    make(chan struct{}),
		done:       make(chan struct{}),
	}
	if r.heartbeat == 0 {
		r.ticker = &time.Ticker{}
	} else {
		r.ticker = time.NewTicker(r.heartbeat)
	}
	return r
}

First, a few variables defined in raft and its config.
The raft struct in raft.go:

	// number of ticks since it reached last electionTimeout when it is leader
	// or candidate.
	// number of ticks since it reached last electionTimeout or received a
	// valid message from current leader when it is a follower.
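	// Election-elapsed counter. A follower uses it to judge whether the leader is still working: once it reaches the randomized election timeout (randomizedElectionTimeout), the follower assumes the leader is down and starts campaigning itself.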
	electionElapsed int

	// number of ticks since it reached last heartbeatTimeout.
	// only leader keeps heartbeatElapsed.
	// Heartbeat-elapsed counter. The leader uses it to decide when to send heartbeats: once it reaches the heartbeat timeout (heartbeatTimeout), the leader broadcasts a heartbeat.
	heartbeatElapsed int

	checkQuorum bool
	preVote     bool

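	// Heartbeat timeout in ticks, i.e. the ratio of the heartbeat interval to the tick interval. etcd currently hard-codes it to 1, so a heartbeat is due on every tick. The tick period itself is what --heartbeat-interval configures.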
	heartbeatTimeout int
	electionTimeout  int
	// randomizedElectionTimeout is a random number between
	// [electiontimeout, 2 * electiontimeout - 1]. It gets reset
	// when raft changes its state to follower or candidate.
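	// Randomized election timeout in ticks. It is re-drawn whenever the node resets (for example on each term change) to avoid split votes: with randomized timeouts, candidates send their vote requests at different times instead of all campaigning at once. As the code shows, the value lies in [electiontimeout, 2*electiontimeout-1]. electionTimeout is the ElectionTicks value shown below, i.e. ElectionMs divided by TickMs. ElectionMs is set with --election-timeout, and TickMs with --heartbeat-interval.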
	randomizedElectionTimeout int

The Config struct in raft.go:

	// ElectionTick is the number of Node.Tick invocations that must pass between
	// elections. That is, if a follower does not receive any message from the
	// leader of current term before ElectionTick has elapsed, it will become
	// candidate and start an election. ElectionTick must be greater than
	// HeartbeatTick. We suggest ElectionTick = 10 * HeartbeatTick to avoid
	// unnecessary leader switching.
	ElectionTick int
	// HeartbeatTick is the number of Node.Tick invocations that must pass between
	// heartbeats. That is, a leader sends heartbeat messages to maintain its
	// leadership every HeartbeatTick ticks.
	HeartbeatTick int
func (r *raft) resetRandomizedElectionTimeout() {
	r.randomizedElectionTimeout = r.electionTimeout + globalRand.Intn(r.electionTimeout)
}
func (cfg Config) ElectionTicks() int { return int(cfg.ElectionMs / cfg.TickMs) }
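
The arithmetic behind these two helpers can be tried out in isolation. The following is a minimal sketch, not etcd code: electionTicks and randomizedTimeout are hypothetical names, the random source is passed in as a function (an assumption made so the bounds are easy to check), and the 1000ms/100ms values are example settings.

```go
package main

import (
	"fmt"
	"math/rand"
)

// electionTicks mirrors the ElectionMs / TickMs division shown above.
func electionTicks(electionMs, tickMs uint) int {
	return int(electionMs / tickMs)
}

// randomizedTimeout returns electionTimeout plus a value drawn from
// [0, electionTimeout), so the result lies in
// [electionTimeout, 2*electionTimeout-1].
// intn stands in for raft's globalRand.Intn (an assumption of this sketch).
func randomizedTimeout(electionTimeout int, intn func(int) int) int {
	return electionTimeout + intn(electionTimeout)
}

func main() {
	// Example settings: --election-timeout=1000, --heartbeat-interval=100.
	ticks := electionTicks(1000, 100)
	fmt.Println("election ticks:", ticks)
	for i := 0; i < 3; i++ {
		fmt.Println("randomized timeout:", randomizedTimeout(ticks, rand.Intn))
	}
}
```

With these example settings the election timeout is 10 ticks, so each node waits between 10 and 19 ticks (1s to 1.9s) before campaigning.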

The goroutine started by raftNode's start method listens on the ticker's channel and calls raftNode's tick method, which pushes an empty struct onto the node's tickc channel.

// start prepares and starts raftNode in a new goroutine. It is no longer safe
// to modify the fields after it has been started.
func (r *raftNode) start(rh *raftReadyHandler) {
	internalTimeout := time.Second

	go func() {
		defer r.onStop()
		islead := false

		for {
			select {
			case <-r.ticker.C:
				r.tick()
			case rd := <-r.Ready():
				if rd.SoftState != nil {
					newLeader := rd.SoftState.Lead != raft.None && rh.getLead() != rd.SoftState.Lead
					if newLeader {
						leaderChanges.Inc()
					}

					if rd.SoftState.Lead == raft.None {
						hasLeader.Set(0)
					} else {
						hasLeader.Set(1)
					}

					rh.updateLead(rd.SoftState.Lead)
					islead = rd.RaftState == raft.StateLeader
					if islead {
						isLeader.Set(1)
					} else {
						isLeader.Set(0)
					}
					rh.updateLeadership(newLeader)
					r.td.Reset()
				}

				if len(rd.ReadStates) != 0 {

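2. How the Leader Sends Heartbeats

  1. For a leader, tick is set to tickHeartbeat. tickHeartbeat increments the heartbeat-elapsed counter (heartbeatElapsed); when it reaches the heartbeat timeout (heartbeatTimeout), a MsgBeat message is generated. heartbeatTimeout is hard-coded to 1, so a heartbeat goes out on essentially every tick. Sending a heartbeat starts with a call to the state machine's Step method.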
func (r *raft) becomeLeader() {
	if r.state == StateFollower {
		panic("invalid transition [follower -> leader]")
	}
	r.step = stepLeader
	r.reset(r.Term)
	r.tick = r.tickHeartbeat
	r.lead = r.id
	r.state = StateLeader

	r.prs.Progress[r.id].BecomeReplicate()

	r.pendingConfIndex = r.raftLog.lastIndex()

	emptyEnt := pb.Entry{Data: nil}
	if !r.appendEntry(emptyEnt) {
		r.logger.Panic("empty entry was dropped")
	}

	r.reduceUncommittedSize([]pb.Entry{emptyEnt})
	r.logger.Infof("%x became leader at term %d", r.id, r.Term)
}
// tickHeartbeat is run by leaders to send a MsgBeat after r.heartbeatTimeout.
func (r *raft) tickHeartbeat() {
	r.heartbeatElapsed++
	r.electionElapsed++

	if r.electionElapsed >= r.electionTimeout {
		r.electionElapsed = 0
		if r.checkQuorum {
			r.Step(pb.Message{From: r.id, Type: pb.MsgCheckQuorum})
		}
		
		if r.state == StateLeader && r.leadTransferee != None {
			r.abortLeaderTransfer()
		}
	}

	if r.state != StateLeader {
		return
	}

	if r.heartbeatElapsed >= r.heartbeatTimeout {
		r.heartbeatElapsed = 0
		r.Step(pb.Message{From: r.id, Type: pb.MsgBeat})
	}
}
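
The counter logic in tickHeartbeat can be isolated into a small sketch. leaderTimer and beatsAfter below are hypothetical names, not etcd types; the sketch keeps only the heartbeat counter and leaves out the checkQuorum and leader-transfer handling.

```go
package main

import "fmt"

// leaderTimer is a hypothetical, stripped-down stand-in for the heartbeat
// counter that tickHeartbeat maintains.
type leaderTimer struct {
	heartbeatElapsed int
	heartbeatTimeout int
	beats            int // number of MsgBeat messages that would be stepped
}

// tick advances the logical clock by one and records a beat when
// heartbeatElapsed reaches heartbeatTimeout.
func (t *leaderTimer) tick() {
	t.heartbeatElapsed++
	if t.heartbeatElapsed >= t.heartbeatTimeout {
		t.heartbeatElapsed = 0
		t.beats++
	}
}

// beatsAfter runs n ticks with the given heartbeatTimeout and returns
// how many beats were produced.
func beatsAfter(n, heartbeatTimeout int) int {
	t := &leaderTimer{heartbeatTimeout: heartbeatTimeout}
	for i := 0; i < n; i++ {
		t.tick()
	}
	return t.beats
}

func main() {
	fmt.Println(beatsAfter(10, 1)) // heartbeatTimeout 1: a beat on every tick, prints 10
	fmt.Println(beatsAfter(10, 3)) // a beat on every third tick, prints 3
}
```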
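  2. In the leader state, step is stepLeader(). On MsgBeat it calls bcastHeartbeat() to broadcast MsgHeartbeat messages. When a MsgHeartbeat is built, its Commit field is set to the leader's current commit index; if that index is greater than the index the peer is recorded as having replicated (Match), the peer's replicated index is used instead. The role of the Commit field is explained with the receiving side below.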
func stepLeader(r *raft, m pb.Message) error {
	// These message types do not require any progress for m.From.
	switch m.Type {
	case pb.MsgBeat:
		r.bcastHeartbeat()
		return nil
	case pb.MsgCheckQuorum:
		// omitted
	case pb.MsgProp:
		if len(m.Entries) == 0 {
			r.logger.Panicf("%x stepped empty MsgProp", r.id)
		}
// bcastHeartbeat sends RPC, without entries to all the peers.
func (r *raft) bcastHeartbeat() {
	lastCtx := r.readOnly.lastPendingRequestCtx()
	if len(lastCtx) == 0 {
		r.bcastHeartbeatWithCtx(nil)
	} else {
		r.bcastHeartbeatWithCtx([]byte(lastCtx))
	}
}

func (r *raft) bcastHeartbeatWithCtx(ctx []byte) {
	r.prs.Visit(func(id uint64, _ *tracker.Progress) {
		if id == r.id {
			return
		}
		r.sendHeartbeat(id, ctx)
	})
}
// sendHeartbeat sends a heartbeat RPC to the given peer.
func (r *raft) sendHeartbeat(to uint64, ctx []byte) {
	// Attach the commit as min(to.matched, r.committed).
	// When the leader sends out heartbeat message,
	// the receiver(follower) might not be matched with the leader
	// or it might not have all the committed entries.
	// The leader MUST NOT forward the follower's commit to
	// an unmatched index.
	commit := min(r.prs.Progress[to].Match, r.raftLog.committed)
	m := pb.Message{
		To:      to,
		Type:    pb.MsgHeartbeat,
		Commit:  commit,
		Context: ctx,
	}

	r.send(m)
}
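
The min(Match, committed) rule in sendHeartbeat is easy to check with numbers. This is a minimal sketch with a hypothetical helper, heartbeatCommit, and made-up Match values.

```go
package main

import "fmt"

// heartbeatCommit returns the commit index a leader would attach to a
// heartbeat for one follower: never beyond what that follower has matched.
func heartbeatCommit(match, committed uint64) uint64 {
	if match < committed {
		return match
	}
	return committed
}

func main() {
	committed := uint64(100)
	// Hypothetical Match values: caught up, lagging, and ahead of the commit index.
	for _, match := range []uint64{100, 80, 120} {
		fmt.Printf("match=%d -> heartbeat commit=%d\n", match, heartbeatCommit(match, committed))
	}
}
```

A follower that has only replicated up to index 80 is told to commit up to 80, not 100, so it never commits an entry it does not hold.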
  3. The send method appends the message to the msgs slice.
// send persists state to stable storage and then sends to its mailbox.
func (r *raft) send(m pb.Message) {
	m.From = r.id
	if m.Type == pb.MsgVote || m.Type == pb.MsgVoteResp || m.Type == pb.MsgPreVote || m.Type == pb.MsgPreVoteResp {
		if m.Term == 0 {
			panic(fmt.Sprintf("term should be set when sending %s", m.Type))
		}
	} else {
		if m.Term != 0 {
			panic(fmt.Sprintf("term should not be set when sending %s (was %d)", m.Type, m.Term))
		}
		// do not attach term to MsgProp, MsgReadIndex
		// proposals are a way to forward to the leader and
		// should be treated as local message.
		// MsgReadIndex is also forwarded to leader.
		if m.Type != pb.MsgProp && m.Type != pb.MsgReadIndex {
			m.Term = r.Term
		}
	}
	r.msgs = append(r.msgs, m)
}
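  4. The goroutine started by node collects the messages in msgs and packs them into a Ready struct together with the log entries not yet persisted, the entries known to be committed, any changed SoftState, any changed HardState, and the read states. All of these drive changes to the state machine, so they are bundled into one struct named Ready, meaning that everything in it is settled: persist what must be persisted, and send what must be sent.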
func (n *node) run() {
	var propc chan msgWithResult
	var readyc chan Ready
	var advancec chan struct{}
	var rd Ready

	r := n.rn.raft

	lead := None

	for {
		if advancec != nil {
			readyc = nil
		} else if n.rn.HasReady() {
			// Populate a Ready. Note that this Ready is not guaranteed to
			// actually be handled. We will arm readyc, but there's no guarantee
			// that we will actually send on it. It's possible that we will
			// service another channel instead, loop around, and then populate
			// the Ready again. We could instead force the previous Ready to be
			// handled first, but it's generally good to emit larger Readys plus
			// it simplifies testing (by emitting less frequently and more
			// predictably).
			rd = n.rn.readyWithoutAccept()
			readyc = n.readyc
		}

		if lead != r.lead {
			if r.hasLeader() {
				if lead == None {
					r.logger.Infof("raft.node: %x elected leader %x at term %d", r.id, r.lead, r.Term)
				} else {
					r.logger.Infof("raft.node: %x changed leader from %x to %x at term %d", r.id, lead, r.lead, r.Term)
				}
				propc = n.propc
			} else {
				// ... (the rest of run() continues after newReady below)
func newReady(r *raft, prevSoftSt *SoftState, prevHardSt pb.HardState) Ready {
	rd := Ready{
		Entries:          r.raftLog.unstableEntries(),
		CommittedEntries: r.raftLog.nextEnts(),
		Messages:         r.msgs,
	}
	if softSt := r.softState(); !softSt.equal(prevSoftSt) {
		rd.SoftState = softSt
	}
	if hardSt := r.hardState(); !isHardStateEqual(hardSt, prevHardSt) {
		rd.HardState = hardSt
	}
	if r.raftLog.unstable.snapshot != nil {
		rd.Snapshot = *r.raftLog.unstable.snapshot
	}
	if len(r.readStates) != 0 {
		rd.ReadStates = r.readStates
	}
	rd.MustSync = MustSync(r.hardState(), prevHardSt, len(rd.Entries))
	return rd
}
			// ... (back inside the select loop of node.run)
			select {
			case n.confstatec <- cs:
			case <-n.done:
			}
		case <-n.tickc:
			n.rn.Tick()
		case readyc <- rd:
			n.rn.acceptReady(rd)
			advancec = n.advancec
		case <-advancec:
			n.rn.Advance(rd)
			rd = Ready{}
			advancec = nil
		case c := <-n.status:
			c <- getStatus(r)
		case <-n.stop:
			close(n.done)
			return
		}
	}
}
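  5. Back in the goroutine started by raftNode.start(), which handles the readyc channel: a leader sends the messages before persisting the log, while a non-leader sends them only after the log has been persisted.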
func (r *raftNode) start(rh *raftReadyHandler) {
	// ... (same as the listing in section 1, up to the Ready handling)
			case rd := <-r.Ready():
				// ... (SoftState and ReadStates handling omitted)

				// the leader can write to its disk in parallel with replicating to the followers and them
				// writing to their disks.
				// For more details, check raft thesis 10.2.1
				if islead {
					// gofail: var raftBeforeLeaderSend struct{}
					r.transport.Send(r.processMessages(rd.Messages))
				}

				// Must save the snapshot file and WAL snapshot entry before saving any other entries or hardstate to
				// ensure that recovery after a snapshot restore is possible.
				if !raft.IsEmptySnap(rd.Snapshot) {
					// gofail: var raftBeforeSaveSnap struct{}
					if err := r.storage.SaveSnap(rd.Snapshot); err != nil {
						if r.lg != nil {
							r.lg.Fatal("failed to save Raft snapshot", zap.Error(err))
						} else {
							plog.Fatalf("failed to save Raft snapshot %v", err)
						}
					}
					// gofail: var raftAfterSaveSnap struct{}
				}
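
The ordering difference between leader and non-leader can be written down as a tiny sketch. handleReady is a hypothetical function and its step names are descriptive labels, not etcd identifiers; the reason given for the follower ordering is the usual Raft requirement, stated here as an assumption rather than quoted from the etcd source.

```go
package main

import "fmt"

// handleReady returns the order of operations for one Ready batch.
func handleReady(isLeader bool) []string {
	var steps []string
	if isLeader {
		// The leader may send first: its own disk write then overlaps
		// with replication to the followers.
		steps = append(steps, "send messages")
	}
	steps = append(steps, "persist snapshot, hard state, and entries")
	if !isLeader {
		// A follower sends after persisting, so a reply never
		// acknowledges data that is not yet on disk.
		steps = append(steps, "send messages")
	}
	return steps
}

func main() {
	fmt.Println("leader:  ", handleReady(true))
	fmt.Println("follower:", handleReady(false))
}
```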
  6. In the common case, transport's Send calls the send() method of the corresponding internal peer, which pushes the message onto the msgc channel of a streamWriter.
func (p *peer) send(m raftpb.Message) {
	p.mu.Lock()
	paused := p.paused
	p.mu.Unlock()

	if paused {
		return
	}

	writec, name := p.pick(m)
	select {
	case writec <- m:
	default:
		p.r.ReportUnreachable(m.To)
		if isMsgSnap(m) {
			p.r.ReportSnapshot(m.To, raft.SnapshotFailure)
		}
		if p.status.isActive() {
			// ...
		} else {
			// ...
		}
		sentFailures.WithLabelValues(types.ID(m.To).String()).Inc()
	}
}
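
peer.send never blocks: it uses select with a default branch, so a full channel means the message is dropped and the peer is reported unreachable. A minimal sketch of that pattern, with a hypothetical trySend helper and a deliberately tiny buffer:

```go
package main

import "fmt"

// trySend attempts a non-blocking send and reports whether the message
// was queued. When the buffer is full the message is dropped.
func trySend(ch chan<- string, msg string) bool {
	select {
	case ch <- msg:
		return true
	default:
		return false
	}
}

func main() {
	writec := make(chan string, 2) // small buffer chosen for the demo
	for _, m := range []string{"hb-1", "hb-2", "hb-3"} {
		if trySend(writec, m) {
			fmt.Println("queued", m)
		} else {
			fmt.Println("dropped", m, "(the peer would be reported unreachable)")
		}
	}
}
```

Nothing drains writec in this demo, so the third message finds the buffer full and is dropped. Raft tolerates such drops because heartbeats and appends are resent on later ticks.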
  7. streamWriter has a goroutine that drains the msgc channel. It calls encode, which serializes the Message to bytes with protobuf and writes them to the connection's I/O stream.
func (cw *streamWriter) run() {
	// ...
			case m := <-msgc:
			err := enc.encode(&m)
			if err == nil {
				unflushed += m.Size()

				if len(msgc) == 0 || batched > streamBufSize/2 {
					flusher.Flush()
					sentBytes.WithLabelValues(cw.peerID.String()).Add(float64(unflushed))
					unflushed = 0
					batched = 0
				} else {
					batched++
				}

				continue
			}

			cw.status.deactivate(failureType{source: t.String(), action: "write"}, err.Error())
			cw.close()
			// log ...
			heartbeatc, msgc = nil, nil
			cw.r.ReportUnreachable(m.To)
			sentFailures.WithLabelValues(cw.peerID.String()).Inc()

		case conn := <-cw.connc:
func (enc *messageEncoder) encode(m *raftpb.Message) error {
	if err := binary.Write(enc.w, binary.BigEndian, uint64(m.Size())); err != nil {
		return err
	}
	_, err := enc.w.Write(pbutil.MustMarshal(m))
	return err
}
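
encode writes an 8-byte big-endian length followed by the marshaled message. The same framing can be sketched with plain byte payloads in place of protobuf messages; writeFrame, readFrame, and roundTrip are hypothetical helpers, and the reader side is an assumption about what a matching decoder has to do rather than a copy of etcd's decoder.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
)

// writeFrame writes an 8-byte big-endian length prefix, then the payload.
func writeFrame(w io.Writer, payload []byte) error {
	if err := binary.Write(w, binary.BigEndian, uint64(len(payload))); err != nil {
		return err
	}
	_, err := w.Write(payload)
	return err
}

// readFrame reads one length-prefixed payload. A production decoder would
// also cap n before allocating; this sketch does not.
func readFrame(r io.Reader) ([]byte, error) {
	var n uint64
	if err := binary.Read(r, binary.BigEndian, &n); err != nil {
		return nil, err
	}
	buf := make([]byte, n)
	if _, err := io.ReadFull(r, buf); err != nil {
		return nil, err
	}
	return buf, nil
}

// roundTrip frames every payload into one buffer and reads them back in order.
func roundTrip(payloads ...string) ([]string, error) {
	var buf bytes.Buffer
	for _, p := range payloads {
		if err := writeFrame(&buf, []byte(p)); err != nil {
			return nil, err
		}
	}
	out := make([]string, 0, len(payloads))
	for range payloads {
		b, err := readFrame(&buf)
		if err != nil {
			return nil, err
		}
		out = append(out, string(b))
	}
	return out, nil
}

func main() {
	got, err := roundTrip("heartbeat", "vote")
	fmt.Println(got, err)
}
```

The length prefix is what lets the receiver split a continuous byte stream back into individual messages.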
  8. On the remote node, a streamReader receives the bytes and deserializes them back into a Message, then pushes it onto the peer's recvc or propc channel.
func (cr *streamReader) decodeLoop(rc io.ReadCloser, t streamType) error {
	var dec decoder
	cr.mu.Lock()
	switch t {
	case streamTypeMsgAppV2:
		dec = newMsgAppV2Decoder(rc, cr.tr.ID, cr.peerID)
	case streamTypeMessage:
		dec = &messageDecoder{r: rc}
	default:
		if cr.lg != nil {
			cr.lg.Panic("unknown stream type", zap.String("type", t.String()))
		} else {
			plog.Panicf("unhandled stream type %s", t)
		}
	}
	select {
	case <-cr.ctx.Done():
		cr.mu.Unlock()
		if err := rc.Close(); err != nil {
			return err
		}
		return io.EOF
	default:
		cr.closer = rc
	}
	cr.mu.Unlock()

	// gofail: labelRaftDropHeartbeat:
	for {
		m, err := dec.decode()
		if err != nil {
			cr.mu.Lock()
			cr.close()
			cr.mu.Unlock()
			return err
		}
		
		// gofail-go: var raftDropHeartbeat struct{}
		// continue labelRaftDropHeartbeat
		receivedBytes.WithLabelValues(types.ID(m.From).String()).Add(float64(m.Size()))

		cr.mu.Lock()
		paused := cr.paused
		cr.mu.Unlock()

		if paused {
			continue
		}

		if isLinkHeartbeatMessage(&m) {
			// raft is not interested in link layer
			// heartbeat message, so we should ignore
			// it.
			continue
		}

		recvc := cr.recvc
		if m.Type == raftpb.MsgProp {
			recvc = cr.propc
		}
		
		select {
		case recvc <- m:
		default:
			if cr.status.isActive() {
				// log ...
			} else {
				if cr.lg != nil {
					// log ...
				}
			}
			recvFailures.WithLabelValues(types.ID(m.From).String()).Inc()
		}
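  9. When a peer starts, it launches two goroutines that handle the recvc and propc channels respectively. Both call Raft.Process to handle the message; EtcdServer implements that interface.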
func startPeer(t *Transport, urls types.URLs, peerID types.ID, fs *stats.FollowerStats) *peer {
	// ...
	ctx, cancel := context.WithCancel(context.Background())
	p.cancel = cancel
	go func() {
		for {
			select {
			case mm := <-p.recvc:
				if err := r.Process(ctx, mm); err != nil {
					if t.Logger != nil {
						t.Logger.Warn("failed to process Raft message", zap.Error(err))
					} else {
						plog.Warningf("failed to process raft message (%v)", err)
					}
				}
			case <-p.stopc:
				return
			}
		}
	}()

	// r.Process might block for processing proposal when there is no leader.
	// Thus propc must be put into a separate routine with recvc to avoid blocking
	// processing other raft messages.
	go func() {
		for {
			select {
			case mm := <-p.propc:
				if err := r.Process(ctx, mm); err != nil {
					plog.Warningf("failed to process raft message (%v)", err)
				}
			case <-p.stopc:
				return
			}
		}
	}()
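  10. EtcdServer checks whether the sending member has been removed from the cluster. If not, it calls Step with the message to advance the state machine. A node that receives a heartbeat is normally in the follower state, so the step function invoked is stepFollower.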
// Process takes a raft message and applies it to the server's raft state
// machine, respecting any timeout of the given context.
func (s *EtcdServer) Process(ctx context.Context, m raftpb.Message) error {
	if s.cluster.IsIDRemoved(types.ID(m.From)) {
		if lg := s.getLogger(); lg != nil {
			lg.Warn(
				"rejected Raft message from removed member",
				zap.String("local-member-id", s.ID().String()),
				zap.String("removed-member-id", types.ID(m.From).String()),
			)
		} else {
			plog.Warningf("reject message from removed member %s", types.ID(m.From).String())
		}
		return httptypes.NewHTTPError(http.StatusForbidden, "cannot process message from removed member")
	}
	if m.Type == raftpb.MsgApp {
		s.stats.RecvAppendReq(types.ID(m.From).String(), m.Size())
	}
	return s.r.Step(ctx, m)
}
func stepFollower(r *raft, m pb.Message) error {
	switch m.Type {
	case pb.MsgProp:
		if r.lead == None {
			r.logger.Infof("%x no leader at term %d; dropping proposal", r.id, r.Term)
			return ErrProposalDropped
		} else if r.disableProposalForwarding {
			r.logger.Infof("%x not forwarding to leader %x at term %d; dropping proposal", r.id, r.lead, r.Term)
			return ErrProposalDropped
		}
		m.To = r.lead
		r.send(m)
	case pb.MsgApp:
		r.electionElapsed = 0
		r.lead = m.From
		r.handleAppendEntries(m)
	case pb.MsgHeartbeat:
		r.electionElapsed = 0
		r.lead = m.From
		r.handleHeartbeat(m)
	case pb.MsgSnap:
		r.electionElapsed = 0
		r.lead = m.From
func (r *raft) handleHeartbeat(m pb.Message) {
	r.raftLog.commitTo(m.Commit)
	r.send(pb.Message{To: m.From, Type: pb.MsgHeartbeatResp, Context: m.Context})
}

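A follower handles a heartbeat message as follows. It first resets the election-elapsed counter (electionElapsed) to zero. That counter is incremented on every tickElection call, and a new election starts once it reaches the election timeout. handleHeartbeat also sets the local log's commit index to the Commit carried in the message. This is the purpose of setting Commit in step 2: the heartbeat also propagates the leader's commit index to the follower. Because the leader sets Commit to the smaller of the index the peer has replicated and its own commit index, a slow follower is never told to commit entries it has not yet received.
Finally, the follower replies with a MsgHeartbeatResp message.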

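  11. The reply travels through the same intermediate path as the heartbeat itself, so it is not repeated here. When the leader receives the reply, it ends up in stepLeader.

  12. On receiving the reply, stepLeader checks whether the follower still needs log entries and, if so, sends an append message. It also handles pending read requests; that part is covered in detail in the walkthrough of linearizable reads.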

func stepLeader(r *raft, m pb.Message) error {
	// These message types do not require any progress for m.From.
	switch m.Type {
	case pb.MsgBeat:
		r.bcastHeartbeat()
		return nil
	case pb.MsgCheckQuorum:
		// ...
	case pb.MsgHeartbeatResp:
		pr.RecentActive = true
		pr.ProbeSent = false

		// free one slot for the full inflights window to allow progress.
		if pr.State == tracker.StateReplicate && pr.Inflights.Full() {
			pr.Inflights.FreeFirstOne()
		}
		if pr.Match < r.raftLog.lastIndex() {
			r.sendAppend(m.From)
		}

		if r.readOnly.option != ReadOnlySafe || len(m.Context) == 0 {
			return nil
		}

		if r.prs.Voters.VoteResult(r.readOnly.recvAck(m.From, m.Context)) != quorum.VoteWon {
			return nil
		}

		rss := r.readOnly.advance(m)
		for _, rs := range rss {
			req := rs.req
			if req.From == None || req.From == r.id { // from local member
				r.readStates = append(r.readStates, ReadState{Index: rs.index, RequestCtx: req.Entries[0].Data})
			} else {
				r.send(pb.Message{To: req.From, Type: pb.MsgReadIndexResp, Index: rs.index, Entries: req.Entries})
			}
		}
	case pb.MsgSnapStatus:

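3. Election

A follower that detects an election timeout triggers the election flow, which goes as follows:

  1. It again starts from tick. For a follower (or candidate), tick is tickElection. It first increments the election-elapsed counter (electionElapsed); once the counter reaches the randomized election timeout, it starts an election. Starting an election really means creating a MsgHup message and passing it to the state machine's Step method.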
// tickElection is run by followers and candidates after r.electionTimeout.
func (r *raft) tickElection() {
	r.electionElapsed++

	if r.promotable() && r.pastElectionTimeout() {
		r.electionElapsed = 0
		r.Step(pb.Message{From: r.id, Type: pb.MsgHup})
	}
}
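
How heartbeats keep a follower from campaigning can be simulated with two counters. followerTimer and simulate are hypothetical names; the sketch assumes the timeout check is "elapsed >= randomized timeout" and ignores promotable().

```go
package main

import "fmt"

// followerTimer is a hypothetical stand-in for the election counter
// that tickElection maintains.
type followerTimer struct {
	electionElapsed           int
	randomizedElectionTimeout int
	campaigns                 int // number of MsgHup messages that would be stepped
}

// tick advances the clock and starts a campaign once the timeout is reached.
func (f *followerTimer) tick() {
	f.electionElapsed++
	if f.electionElapsed >= f.randomizedElectionTimeout {
		f.electionElapsed = 0
		f.campaigns++
	}
}

// onHeartbeat models stepFollower resetting electionElapsed on MsgHeartbeat.
func (f *followerTimer) onHeartbeat() { f.electionElapsed = 0 }

// simulate runs n ticks. A heartbeat arrives after every heartbeatEvery-th
// tick; 0 means the leader is silent.
func simulate(n, timeout, heartbeatEvery int) int {
	f := &followerTimer{randomizedElectionTimeout: timeout}
	for i := 1; i <= n; i++ {
		f.tick()
		if heartbeatEvery > 0 && i%heartbeatEvery == 0 {
			f.onHeartbeat()
		}
	}
	return f.campaigns
}

func main() {
	fmt.Println(simulate(30, 10, 1)) // healthy leader: prints 0
	fmt.Println(simulate(30, 10, 0)) // silent leader: prints 3
}
```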
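  2. Step handles MsgHup. It checks whether the local log contains configuration-change entries that are committed but not yet applied to the state machine. If so, the node may not campaign: cluster membership may be changing (nodes added or removed), and the change must take effect before an election can run. As the diagram shows, when PreVote is configured there is a PreElection branch, which is covered at the end. Looking directly at campaign(): it first turns the node into a candidate, and becomeCandidate increments its Term by 1. It then takes the Term and index of its latest log entry, packs them into a MsgVote message, and broadcasts it to all nodes. The latest log Term and index matter a great deal: they guarantee that a newly elected leader already holds every committed entry, so committed entries are never overwritten by a new leader. This comes up again later in the flow.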
	switch m.Type {
	case pb.MsgHup:
		if r.state != StateLeader {
			if !r.promotable() {
				r.logger.Warningf("%x is unpromotable and can not campaign; ignoring MsgHup", r.id)
				return nil
			}
			ents, err := r.raftLog.slice(r.raftLog.applied+1, r.raftLog.committed+1, noLimit)
			if err != nil {
				r.logger.Panicf("unexpected error getting unapplied entries (%v)", err)
			}
			if n := numOfPendingConf(ents); n != 0 && r.raftLog.committed > r.raftLog.applied {
				r.logger.Warningf("%x cannot campaign at term %d since there are still %d pending configuration changes to apply", r.id, r.Term, n)
				return nil
			}

			r.logger.Infof("%x is starting a new election at term %d", r.id, r.Term)
			if r.preVote {
				r.campaign(campaignPreElection)
			} else {
				r.campaign(campaignElection)
			}
		} else {
			r.logger.Debugf("%x ignoring MsgHup because already leader", r.id)
		}

	case pb.MsgVote, pb.MsgPreVote:
func (r *raft) becomeCandidate() {
	// TODO(xiangli) remove the panic when the raft implementation is stable
	if r.state == StateLeader {
		panic("invalid transition [leader -> candidate]")
	}
	r.step = stepCandidate
	r.reset(r.Term + 1)
	r.tick = r.tickElection
	r.Vote = r.id
	r.state = StateCandidate
	r.logger.Infof("%x became candidate at term %d", r.id, r.Term)
}
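  3. The vote message is sent the same way as any other message, so the path is not repeated here.
  4. Once the vote message reaches the remote node it is processed and eventually arrives at Step, which advances the state machine. Step handles MsgVote as follows:
  • First, if checkQuorum is enabled, the node knows of a leader, and its election-elapsed counter has not yet reached the election timeout, the request is ignored. This stops a follower that missed heartbeats for reasons of its own from forcing an election on everyone else.
  • If r.Vote is already set to a different node, i.e. the node has already granted its vote to another candidate in this term, the request is rejected.
  • If the LogTerm and Index in the message, i.e. the candidate's latest log term and index from step 2, show that the candidate's log is older than the local log, the request is rejected.
  • Otherwise the vote is granted and an approving reply is sent.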
	case m.Term > r.Term:
		if m.Type == pb.MsgVote || m.Type == pb.MsgPreVote {
			force := bytes.Equal(m.Context, []byte(campaignTransfer))
			inLease := r.checkQuorum && r.lead != None && r.electionElapsed < r.electionTimeout
			if !force && inLease {
				// If a server receives a RequestVote request within the minimum election timeout
				// of hearing from a current leader, it does not update its term or grant its vote
				r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] ignored %s from %x [logterm: %d, index: %d] at term %d: lease is not expired (remaining ticks: %d)",
					r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term, r.electionTimeout-r.electionElapsed)
				return nil
			}
		}
	case pb.MsgVote, pb.MsgPreVote:
		// We can vote if this is a repeat of a vote we've already cast...
		canVote := r.Vote == m.From ||
			// ...we haven't voted and we don't think there's a leader yet in this term...
			(r.Vote == None && r.lead == None) ||
			// ...or this is a PreVote for a future term...
			(m.Type == pb.MsgPreVote && m.Term > r.Term)
		// ...and we believe the candidate is up to date.
		if canVote && r.raftLog.isUpToDate(m.Index, m.LogTerm) {
			// Note: it turns out that that learners must be allowed to cast votes.
			// This seems counter- intuitive but is necessary in the situation in which
			// a learner has been promoted (i.e. is now a voter) but has not learned
			// about this yet.
			// For example, consider a group in which id=1 is a learner and id=2 and
			// id=3 are voters. A configuration change promoting 1 can be committed on
			// the quorum `{2,3}` without the config change being appended to the
			// learner's log. If the leader (say 2) fails, there are de facto two
			// voters remaining. Only 3 can win an election (due to its log containing
			// all committed entries), but to do so it will need 1 to vote. But 1
			// considers itself a learner and will continue to do so until 3 has
			// stepped up as leader, replicates the conf change to 1, and 1 applies it.
			// Ultimately, by receiving a request to vote, the learner realizes that
			// the candidate believes it to be a voter, and that it should act
			// accordingly. The candidate's config may be stale, too; but in that case
			// it won't win the election, at least in the absence of the bug discussed
			// in:
			// https://github.com/etcd-io/etcd/issues/7625#issuecomment-488798263.
			r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] cast %s for %x [logterm: %d, index: %d] at term %d",
				r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term)
			// When responding to Msg{Pre,}Vote messages we include the term
			// from the message, not the local term. To see why, consider the
			// case where a single node was previously partitioned away and
			// it's local term is now out of date. If we include the local term
			// (recall that for pre-votes we don't update the local term), the
			// (pre-)campaigning node on the other end will proceed to ignore
			// the message (it ignores all out of date messages).
			// The term in the original message and current local term are the
			// same in the case of regular votes, but different for pre-votes.
			r.send(pb.Message{To: m.From, Term: m.Term, Type: voteRespMsgType(m.Type)})
			if m.Type == pb.MsgVote {
				// Only record real votes.
				r.electionElapsed = 0
				r.Vote = m.From
			}
		} else {
			r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] rejected %s from %x [logterm: %d, index: %d] at term %d",
				r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term)
			r.send(pb.Message{To: m.From, Term: r.Term, Type: voteRespMsgType(m.Type), Reject: true})
		}
// isUpToDate determines if the given (lastIndex,term) log is more up-to-date
// by comparing the index and term of the last entries in the existing logs.
// If the logs have last entries with different terms, then the log with the
// later term is more up-to-date. If the logs end with the same term, then
// whichever log has the larger lastIndex is more up-to-date. If the logs are
// the same, the given log is up-to-date.
func (l *raftLog) isUpToDate(lasti, term uint64) bool {
	return term > l.lastTerm() || (term == l.lastTerm() && lasti >= l.lastIndex())
}
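
A few concrete comparisons make the isUpToDate rule easier to read. The sketch below restates the same comparison on a hypothetical logTail type; the term and index values are made up.

```go
package main

import "fmt"

// logTail describes the last entry of a log.
type logTail struct {
	lastTerm  uint64
	lastIndex uint64
}

// upToDate reports whether the candidate's log is at least as up to date
// as the voter's: a later last term wins; with equal terms, the longer
// (or equal) log wins.
func upToDate(voter, candidate logTail) bool {
	return candidate.lastTerm > voter.lastTerm ||
		(candidate.lastTerm == voter.lastTerm && candidate.lastIndex >= voter.lastIndex)
}

func main() {
	voter := logTail{lastTerm: 3, lastIndex: 10}
	fmt.Println(upToDate(voter, logTail{4, 2}))  // later term, shorter log: true
	fmt.Println(upToDate(voter, logTail{3, 10})) // identical tail: true
	fmt.Println(upToDate(voter, logTail{3, 9}))  // same term, shorter log: false
	fmt.Println(upToDate(voter, logTail{2, 50})) // older term, longer log: false
}
```

Note that the term is compared first: a long log that ends in an old term loses to a short log that ends in a newer term.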
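  5. When the candidate receives a MsgVoteResp, stepCandidate handles it. It first records the vote through r.poll (r.prs.RecordVote in the code below), which keeps the ballots. If the grants reach a majority, the candidate becomes leader; if enough rejections have arrived that a majority can no longer be reached, it becomes a follower.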
// stepCandidate is shared by StateCandidate and StatePreCandidate; the difference is
// whether they respond to MsgVoteResp or MsgPreVoteResp.
func stepCandidate(r *raft, m pb.Message) error {
	// ...
	case myVoteRespType:
		gr, rj, res := r.poll(m.From, m.Type, !m.Reject)
		r.logger.Infof("%x has received %d %s votes and %d vote rejections", r.id, gr, m.Type, rj)
		switch res {
		case quorum.VoteWon:
			if r.state == StatePreCandidate {
				r.campaign(campaignElection)
			} else {
				r.becomeLeader()
				r.bcastAppend()
			}
		case quorum.VoteLost:
			// pb.MsgPreVoteResp contains future term of pre-candidate
			// m.Term > r.Term; reuse r.Term
			r.becomeFollower(r.Term, None)
		}
	case pb.MsgTimeoutNow:
func (r *raft) poll(id uint64, t pb.MessageType, v bool) (granted int, rejected int, result quorum.VoteResult) {
	if v {
		r.logger.Infof("%x received %s from %x at term %d", r.id, t, id, r.Term)
	} else {
		r.logger.Infof("%x received %s rejection from %x at term %d", r.id, t, id, r.Term)
	}
	r.prs.RecordVote(id, v)
	return r.prs.TallyVotes()
}
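
The three possible outcomes of a tally (won, lost, still pending) can be sketched without the quorum package. tally and the voteResult constants below are hypothetical names; the sketch assumes a simple majority over a single voter set and ignores joint configurations.

```go
package main

import "fmt"

type voteResult int

const (
	votePending voteResult = iota // not enough answers yet
	voteLost                      // a majority of grants is no longer reachable
	voteWon                       // grants have reached a majority
)

// tally counts the recorded votes of the given voters and decides the outcome.
func tally(voters []uint64, votes map[uint64]bool) (granted, rejected int, res voteResult) {
	for _, id := range voters {
		v, ok := votes[id]
		if !ok {
			continue // this voter has not answered yet
		}
		if v {
			granted++
		} else {
			rejected++
		}
	}
	quorum := len(voters)/2 + 1
	missing := len(voters) - granted - rejected
	switch {
	case granted >= quorum:
		res = voteWon
	case granted+missing >= quorum:
		res = votePending
	default:
		res = voteLost
	}
	return granted, rejected, res
}

func main() {
	voters := []uint64{1, 2, 3, 4, 5}
	fmt.Println(tally(voters, map[uint64]bool{1: true, 2: true}))                     // 2 0 0 (pending)
	fmt.Println(tally(voters, map[uint64]bool{1: true, 2: true, 3: true}))            // 3 0 2 (won)
	fmt.Println(tally(voters, map[uint64]bool{1: true, 2: false, 3: false, 4: false})) // 1 3 1 (lost)
}
```

In the last case one voter has not answered, but even its grant could not bring the candidate to 3 votes, so the election is already lost.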

4. PreVote

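PreVote solves the problem of a node that was cut off by a network partition forcing the cluster to re-elect when it rejoins.

The problem arises like this. Suppose the cluster's current Term is 1 and one node, say A, is partitioned away and stops receiving the leader's heartbeats. After the election timeout it becomes a Candidate, raises its Term to 2, and starts campaigning. It cannot win, of course. But once the network heals, both its vote requests and its replies to other messages carry its Term, 2, while the rest of the cluster is still at Term 1. When the leader sees a node with a higher Term than its own, it obediently steps down to follower, which causes a re-election.

This is uncommon, and when it happens it costs only a single re-election, so the impact on the cluster is small. Still, it can be avoided if desired, and the way to do so is PreVote.

PreVote works like this. When a follower finds that it has not received a leader heartbeat within the election timeout, it does not become a candidate right away, so it does not increment its Term. Instead there is a new stage called PreVote, which we can think of as a pre-election. The node first broadcasts a PreVote message. Nodes that are running normally reply with a rejection; only nodes that have also lost the leader reply with a grant. Only after collecting pre-votes from a majority does the node become a candidate and start a real election. This way, if the network problem is confined to that single node, it never bumps its Term rashly, so when it rejoins it does not threaten the current leader, and the system runs more stably.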
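
The effect on the isolated node's Term can be shown with a toy simulation. termAfterPartition is a hypothetical function; it assumes the node reaches no peer at all during the partition, so no pre-vote ever succeeds.

```go
package main

import "fmt"

// termAfterPartition returns the Term of a fully isolated node after it
// has gone through the given number of election timeouts.
func termAfterPartition(startTerm uint64, timeouts int, preVote bool) uint64 {
	term := startTerm
	for i := 0; i < timeouts; i++ {
		if preVote {
			// The pre-candidate asks for votes for term+1 but keeps its
			// own Term. With no peer reachable it never gets a majority,
			// so it never becomes a real candidate.
			continue
		}
		// Without PreVote, each timeout makes the node a candidate,
		// which increments its Term.
		term++
	}
	return term
}

func main() {
	fmt.Println(termAfterPartition(1, 5, false)) // prints 6
	fmt.Println(termAfterPartition(1, 5, true))  // prints 1
}
```

Without PreVote the node comes back with Term 6 and unseats a leader at Term 1; with PreVote it comes back at Term 1 and simply resumes following.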

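5. How Are Committed Entries Protected from Being Overwritten?

The leader of an etcd cluster continuously replicates its log to the followers. If a follower finds that its log diverges from the leader's, it deletes its conflicting local entries to stay in sync with the leader.

While running, the leader inspects the replies to its append messages. Once an entry has been replicated to a majority of nodes, the leader marks it committed. Apply then follows: the log is persisted and the kv store is updated.

Now imagine this scenario: the leader of a running cluster suddenly dies, and followers start competing to become the new leader. If the new leader's log is stale, the other nodes will delete any entries newer than its log during replication. Worse, if some of those newer entries were already committed, the rule that committed entries are never modified is violated.

How is this prevented?
This is where one detail of the election flow comes in: a candidate attaches the index and term of its latest log entry to its vote request. A follower that receives the request uses these two fields to judge whether the candidate's log is at least as up to date as its own. If so, it grants the vote; otherwise it rejects.

Why does this guarantee that committed entries are not overwritten?
Because it guarantees that the elected leader already has every committed entry locally.

Why does it guarantee that the new leader has every committed entry?
As noted, the leader commits an entry only after a majority of nodes have replicated it, and a candidate needs votes from a majority, so its log must be at least as up to date as the logs of a majority. Any two majorities share at least one node, and that shared node holds every committed entry. So as long as the candidate's log is at least as up to date as those of a majority, the new leader cannot overwrite committed entries.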
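
The claim that any two majorities overlap can be checked by brute force for small clusters. majoritiesIntersect is a hypothetical helper that represents each subset of nodes as a bitmask; it is a sanity check on the counting argument, not part of etcd.

```go
package main

import (
	"fmt"
	"math/bits"
)

// majoritiesIntersect enumerates every subset of n nodes that contains at
// least n/2+1 members and reports whether each pair of such subsets shares
// at least one node.
func majoritiesIntersect(n int) bool {
	quorum := n/2 + 1
	var majorities []uint
	for mask := uint(0); mask < 1<<uint(n); mask++ {
		if bits.OnesCount(mask) >= quorum {
			majorities = append(majorities, mask)
		}
	}
	for _, a := range majorities {
		for _, b := range majorities {
			if a&b == 0 {
				return false // two disjoint majorities would break the argument
			}
		}
	}
	return true
}

func main() {
	for _, n := range []int{3, 4, 5, 7} {
		fmt.Printf("n=%d: every pair of majorities overlaps: %v\n", n, majoritiesIntersect(n))
	}
}
```

The overlap follows from counting: two sets of at least n/2+1 nodes each hold more than n members in total, so they cannot be disjoint.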

Put simply, this mechanism guarantees that nodes b and e in the accompanying figure can never be elected leader.

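6. Frequent Re-elections

If etcd re-elects frequently, the system stays unavailable for long stretches, which greatly reduces availability.
What causes re-elections?

  1. Network latency and congestion: as the heartbeat flow shows, heartbeat messages, like all other messages, are first placed in the msgs slice of the Ready struct and then sent one by one. Sending to different nodes does not block, but for a single node one goroutine drains its msgc channel. So under congestion, other messages can clog the channel and keep heartbeats from going out on time. Even with only heartbeats in flight, if congestion squeezes the link's bandwidth, a heartbeat can take a long time to reach the peer, which leads to a heartbeat timeout. High network latency stretches message delivery time and causes heartbeat timeouts too. In addition, the dial timeout for peer connections is set to 1s + the election timeout, as the function below shows. With an election timeout of 5s, a connection must therefore be established within 6s. On a congested network this also affects normal message delivery.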
func (c *ServerConfig) peerDialTimeout() time.Duration {
	// 1s for queue wait and election timeout
	return time.Second + time.Duration(c.ElectionTicks*int(c.TickMs))*time.Millisecond
}
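  2. I/O latency: as the apply flow shows, after sending messages the leader starts persisting committed entries or a snapshot. This blocks the goroutine. If it blocks for too long, later msgs pile up and cannot be sent in time. According to the explanation on the official site, etcd does this on purpose so that a leader with bad I/O loses leadership automatically and a node with healthy I/O is elected instead. But if every node in the cluster has I/O problems, the whole cluster becomes unstable.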
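
The dial timeout from item 1 can be computed for concrete settings. The sketch below copies the formula from peerDialTimeout into a free function with plain parameters (an adaptation for illustration, not the ServerConfig method itself); the tick values are examples.

```go
package main

import (
	"fmt"
	"time"
)

// peerDialTimeout applies the formula shown above:
// 1s for queue wait plus the election timeout (ElectionTicks * TickMs).
func peerDialTimeout(electionTicks int, tickMs uint) time.Duration {
	return time.Second + time.Duration(electionTicks*int(tickMs))*time.Millisecond
}

func main() {
	fmt.Println(peerDialTimeout(10, 100)) // election timeout 1s, prints 2s
	fmt.Println(peerDialTimeout(50, 100)) // election timeout 5s, prints 6s
}
```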

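References

  1. [etcd in Depth series] 2. Heartbeat and Election (【深入浅出etcd系列】2. 心跳和选举)

Copyright notice: this is an original article by xidianjiapei001, licensed under CC 4.0 BY-SA. Reproductions must include a link to the original source and this notice.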