[etcd v3.4.10 Source Code Analysis] 1. The Raft Protocol and the Heartbeat Mechanism
Leader Election
In etcd, only the leader can handle write proposals. So how is a leader produced?
1. Ticker
When a raftNode instance is created, a Ticker is created along with it; the heartbeat interval passed in is derived from cfg.TickMs, which defaults to 100ms.
func newRaftNode(cfg raftNodeConfig) *raftNode {
	// create logger (omitted)
	r := &raftNode{
		lg:             cfg.lg,
		tickMu:         new(sync.Mutex),
		raftNodeConfig: cfg,
		// set up contention detectors for raft heartbeat message.
		// expect to send a heartbeat within 2 heartbeat intervals.
		td:         contention.NewTimeoutDetector(2 * cfg.heartbeat),
		readStateC: make(chan raft.ReadState, 1),
		msgSnapC:   make(chan raftpb.Message, maxInFlightMsgSnap),
		applyc:     make(chan apply),
		stopped:    make(chan struct{}),
		done:       make(chan struct{}),
	}
	if r.heartbeat == 0 {
		r.ticker = &time.Ticker{}
	} else {
		r.ticker = time.NewTicker(r.heartbeat)
	}
	return r
}
First, let's introduce a few fields defined in the raft struct and the Config struct.
The raft struct in raft.go:
	// number of ticks since it reached last electionTimeout when it is leader
	// or candidate.
	// number of ticks since it reached last electionTimeout or received a
	// valid message from current leader when it is a follower.
	// Election-elapsed tick count, mainly used by a follower to judge whether
	// the leader is still working. If this counter climbs past the randomized
	// election timeout (randomizedElectionTimeout), the follower assumes the
	// leader is dead and starts campaigning itself.
	electionElapsed int

	// number of ticks since it reached last heartbeatTimeout.
	// only leader keeps heartbeatElapsed.
	// Heartbeat-elapsed tick count, used by the leader to decide when to send
	// the next heartbeat. Once it reaches the heartbeat timeout
	// (heartbeatTimeout), the leader broadcasts a heartbeat message.
	heartbeatElapsed int

	checkQuorum bool
	preVote     bool

	// Heartbeat timeout in ticks, i.e. the ratio of the heartbeat interval to
	// the tick interval. In the current code it is hard-coded to 1, so a
	// heartbeat should go out on every tick. The tick period itself is what
	// --heartbeat-interval configures.
	heartbeatTimeout int
	electionTimeout  int
	// randomizedElectionTimeout is a random number between
	// [electiontimeout, 2 * electiontimeout - 1]. It gets reset
	// when raft changes its state to follower or candidate.
	// Randomized election timeout in ticks. It is re-randomized every term;
	// this is the randomization Raft introduces to avoid split votes. With the
	// timeout randomized, candidates start campaigning at staggered times
	// instead of simultaneously. As the code below shows, the value lies in
	// [electiontimeout, 2*electiontimeout-1], where electionTimeout equals
	// ElectionTicks, the ratio of ElectionMs to TickMs. ElectionMs is set by
	// --election-timeout and TickMs by --heartbeat-interval.
	randomizedElectionTimeout int
The Config struct in raft.go:
	// ElectionTick is the number of Node.Tick invocations that must pass between
	// elections. That is, if a follower does not receive any message from the
	// leader of current term before ElectionTick has elapsed, it will become
	// candidate and start an election. ElectionTick must be greater than
	// HeartbeatTick. We suggest ElectionTick = 10 * HeartbeatTick to avoid
	// unnecessary leader switching.
	ElectionTick int
	// HeartbeatTick is the number of Node.Tick invocations that must pass between
	// heartbeats. That is, a leader sends heartbeat messages to maintain its
	// leadership every HeartbeatTick ticks.
	HeartbeatTick int
func (r *raft) resetRandomizedElectionTimeout() {
	r.randomizedElectionTimeout = r.electionTimeout + globalRand.Intn(r.electionTimeout)
}

func (cfg Config) ElectionTicks() int { return int(cfg.ElectionMs / cfg.TickMs) }
In the goroutine launched by raftNode's start method, the ticker's channel is watched: each tick calls raftNode's tick method, which pushes an empty struct into the node's tickc channel.
// start prepares and starts raftNode in a new goroutine. It is no longer safe
// to modify the fields after it has been started.
func (r *raftNode) start(rh *raftReadyHandler) {
	internalTimeout := time.Second

	go func() {
		defer r.onStop()
		islead := false

		for {
			select {
			case <-r.ticker.C:
				r.tick()
			case rd := <-r.Ready():
				if rd.SoftState != nil {
					newLeader := rd.SoftState.Lead != raft.None && rh.getLead() != rd.SoftState.Lead
					if newLeader {
						leaderChanges.Inc()
					}

					if rd.SoftState.Lead == raft.None {
						hasLeader.Set(0)
					} else {
						hasLeader.Set(1)
					}

					rh.updateLead(rd.SoftState.Lead)
					islead = rd.RaftState == raft.StateLeader
					if islead {
						isLeader.Set(1)
					} else {
						isLeader.Set(0)
					}
					rh.updateLeadership(newLeader)
					r.td.Reset()
				}
				// ... (the rest of the Ready handling is covered in section 2)
			}
		}
	}()
}
2. How the Leader Sends Heartbeats
- For the leader, tick is set to tickHeartbeat. tickHeartbeat increments the heartbeat-elapsed counter (heartbeatElapsed); once it reaches the heartbeat timeout (heartbeatTimeout), it produces a MsgBeat message. The heartbeat timeout is hard-coded to 1, so essentially every tick that passes sends a heartbeat. Sending one starts with a call to the state machine's step method.
func (r *raft) becomeLeader() {
	if r.state == StateFollower {
		panic("invalid transition [follower -> leader]")
	}
	r.step = stepLeader
	r.reset(r.Term)
	r.tick = r.tickHeartbeat
	r.lead = r.id
	r.state = StateLeader
	r.prs.Progress[r.id].BecomeReplicate()

	r.pendingConfIndex = r.raftLog.lastIndex()

	emptyEnt := pb.Entry{Data: nil}
	if !r.appendEntry(emptyEnt) {
		r.logger.Panic("empty entry was dropped")
	}
	r.reduceUncommittedSize([]pb.Entry{emptyEnt})
	r.logger.Infof("%x became leader at term %d", r.id, r.Term)
}
// tickHeartbeat is run by leaders to send a MsgBeat after r.heartbeatTimeout.
func (r *raft) tickHeartbeat() {
	r.heartbeatElapsed++
	r.electionElapsed++

	if r.electionElapsed >= r.electionTimeout {
		r.electionElapsed = 0
		if r.checkQuorum {
			r.Step(pb.Message{From: r.id, Type: pb.MsgCheckQuorum})
		}
		if r.state == StateLeader && r.leadTransferee != None {
			r.abortLeaderTransfer()
		}
	}

	if r.state != StateLeader {
		return
	}

	if r.heartbeatElapsed >= r.heartbeatTimeout {
		r.heartbeatElapsed = 0
		r.Step(pb.Message{From: r.id, Type: pb.MsgBeat})
	}
}
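The counting in tickHeartbeat can be sketched with a toy model (my own simplification, not etcd's types): every tick bumps a counter, and reaching the timeout emits a beat and resets it.

```go
package main

import "fmt"

// leaderClock is a minimal stand-in for the leader's tick counters:
// every tick bumps heartbeatElapsed; when it reaches heartbeatTimeout,
// the leader emits a MsgBeat and resets the counter.
type leaderClock struct {
	heartbeatElapsed int
	heartbeatTimeout int
	beats            int
}

func (c *leaderClock) tick() {
	c.heartbeatElapsed++
	if c.heartbeatElapsed >= c.heartbeatTimeout {
		c.heartbeatElapsed = 0
		c.beats++ // stands in for r.Step(pb.Message{Type: pb.MsgBeat})
	}
}

func main() {
	// etcd sets HeartbeatTick to 1, so every tick produces a beat.
	c := &leaderClock{heartbeatTimeout: 1}
	for i := 0; i < 5; i++ {
		c.tick()
	}
	fmt.Println(c.beats) // 5
}
```

With heartbeatTimeout set to 2 instead, the same 5 ticks would produce only 2 beats, which is why etcd's choice of 1 means one heartbeat per tick.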
- In the leader state, step is stepLeader(). On receiving MsgBeat, it calls bcastHeartbeat() to broadcast MsgHeartbeat messages. When constructing a MsgHeartbeat, the Commit field is filled with the index that is currently committable; if that index is greater than the index the peer is known to have replicated, the peer's replicated index is used instead. The purpose of the Commit field is explained in detail where the receiver handles the message.
func stepLeader(r *raft, m pb.Message) error {
	// These message types do not require any progress for m.From.
	switch m.Type {
	case pb.MsgBeat:
		r.bcastHeartbeat()
		return nil
	case pb.MsgCheckQuorum:
		// ... (omitted)
	case pb.MsgProp:
		if len(m.Entries) == 0 {
			r.logger.Panicf("%x stepped empty MsgProp", r.id)
		}
		// ...
	}
	// ...
}
// bcastHeartbeat sends RPC, without entries to all the peers.
func (r *raft) bcastHeartbeat() {
	lastCtx := r.readOnly.lastPendingRequestCtx()
	if len(lastCtx) == 0 {
		r.bcastHeartbeatWithCtx(nil)
	} else {
		r.bcastHeartbeatWithCtx([]byte(lastCtx))
	}
}

func (r *raft) bcastHeartbeatWithCtx(ctx []byte) {
	r.prs.Visit(func(id uint64, _ *tracker.Progress) {
		if id == r.id {
			return
		}
		r.sendHeartbeat(id, ctx)
	})
}

// sendHeartbeat sends a heartbeat RPC to the given peer.
func (r *raft) sendHeartbeat(to uint64, ctx []byte) {
	// Attach the commit as min(to.matched, r.committed).
	// When the leader sends out heartbeat message,
	// the receiver(follower) might not be matched with the leader
	// or it might not have all the committed entries.
	// The leader MUST NOT forward the follower's commit to
	// an unmatched index.
	commit := min(r.prs.Progress[to].Match, r.raftLog.committed)
	m := pb.Message{
		To:      to,
		Type:    pb.MsgHeartbeat,
		Commit:  commit,
		Context: ctx,
	}

	r.send(m)
}
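The min() clamp above is the crux. A standalone sketch of just that rule (`heartbeatCommit` is my own name, not etcd's):

```go
package main

import "fmt"

// heartbeatCommit mirrors the min() in sendHeartbeat: the Commit field
// carried by a heartbeat must never exceed what the peer has replicated.
func heartbeatCommit(peerMatch, leaderCommitted uint64) uint64 {
	if peerMatch < leaderCommitted {
		return peerMatch
	}
	return leaderCommitted
}

func main() {
	// A follower that has only replicated up to index 7 while the leader
	// has committed through index 10 is told Commit=7, not 10.
	fmt.Println(heartbeatCommit(7, 10)) // 7
	// A fully caught-up follower gets the leader's real commit index.
	fmt.Println(heartbeatCommit(12, 10)) // 10
}
```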
- The send method appends the message to the msgs slice.
// send persists state to stable storage and then sends to its mailbox.
func (r *raft) send(m pb.Message) {
	m.From = r.id
	if m.Type == pb.MsgVote || m.Type == pb.MsgVoteResp || m.Type == pb.MsgPreVote || m.Type == pb.MsgPreVoteResp {
		if m.Term == 0 {
			panic(fmt.Sprintf("term should be set when sending %s", m.Type))
		}
	} else {
		if m.Term != 0 {
			panic(fmt.Sprintf("term should not be set when sending %s (was %d)", m.Type, m.Term))
		}
		// do not attach term to MsgProp, MsgReadIndex
		// proposals are a way to forward to the leader and
		// should be treated as local message.
		// MsgReadIndex is also forwarded to leader.
		if m.Type != pb.MsgProp && m.Type != pb.MsgReadIndex {
			m.Term = r.Term
		}
	}
	r.msgs = append(r.msgs, m)
}
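The term-attachment rules in send can be summarized in a small sketch (`attachTerm` and the msgType constants are hypothetical helpers for illustration, not etcd API): vote messages must already carry a term, locally forwarded messages carry none, and everything else is stamped with the node's current term.

```go
package main

import "fmt"

// Message kinds, named after their raftpb counterparts.
type msgType int

const (
	msgVote msgType = iota
	msgPreVote
	msgProp
	msgReadIndex
	msgHeartbeat
)

// attachTerm sketches the rules in raft.send: election messages must
// already carry a term; MsgProp and MsgReadIndex are treated as local
// messages and carry term 0; everything else gets the node's term.
func attachTerm(t msgType, msgTerm, nodeTerm uint64) (uint64, error) {
	switch t {
	case msgVote, msgPreVote:
		if msgTerm == 0 {
			return 0, fmt.Errorf("term should be set when sending vote messages")
		}
		return msgTerm, nil
	case msgProp, msgReadIndex:
		return 0, nil // forwarded to the leader as local messages
	default:
		return nodeTerm, nil
	}
}

func main() {
	term, _ := attachTerm(msgHeartbeat, 0, 5)
	fmt.Println(term) // 5: heartbeats carry the node's current term
	term, _ = attachTerm(msgProp, 0, 5)
	fmt.Println(term) // 0: proposals stay term-less
}
```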
- The goroutine started by node collects the messages in msgs and packs them, together with the not-yet-persisted log entries, the entries confirmed committable, any changed softState, any changed hardState, and the readStates, into one Ready struct. All of these drive state-machine changes, which is why they are bundled into a structure named Ready: everything in it is settled and ready to be persisted or sent.
func (n *node) run() {
	var propc chan msgWithResult
	var readyc chan Ready
	var advancec chan struct{}
	var rd Ready

	r := n.rn.raft

	lead := None

	for {
		if advancec != nil {
			readyc = nil
		} else if n.rn.HasReady() {
			// Populate a Ready. Note that this Ready is not guaranteed to
			// actually be handled. We will arm readyc, but there's no guarantee
			// that we will actually send on it. It's possible that we will
			// service another channel instead, loop around, and then populate
			// the Ready again. We could instead force the previous Ready to be
			// handled first, but it's generally good to emit larger Readys plus
			// it simplifies testing (by emitting less frequently and more
			// predictably).
			rd = n.rn.readyWithoutAccept()
			readyc = n.readyc
		}

		if lead != r.lead {
			if r.hasLeader() {
				if lead == None {
					r.logger.Infof("raft.node: %x elected leader %x at term %d", r.id, r.lead, r.Term)
				} else {
					r.logger.Infof("raft.node: %x changed leader from %x to %x at term %d", r.id, lead, r.lead, r.Term)
				}
				propc = n.propc
			} else {
				// ... (no leader: disable the proposal channel)
			}
			// ...
		}
readyWithoutAccept builds the Ready via newReady:
func newReady(r *raft, prevSoftSt *SoftState, prevHardSt pb.HardState) Ready {
	rd := Ready{
		Entries:          r.raftLog.unstableEntries(),
		CommittedEntries: r.raftLog.nextEnts(),
		Messages:         r.msgs,
	}
	if softSt := r.softState(); !softSt.equal(prevSoftSt) {
		rd.SoftState = softSt
	}
	if hardSt := r.hardState(); !isHardStateEqual(hardSt, prevHardSt) {
		rd.HardState = hardSt
	}
	if r.raftLog.unstable.snapshot != nil {
		rd.Snapshot = *r.raftLog.unstable.snapshot
	}
	if len(r.readStates) != 0 {
		rd.ReadStates = r.readStates
	}
	rd.MustSync = MustSync(r.hardState(), prevHardSt, len(rd.Entries))
	return rd
}
Back in node.run(), the select at the bottom of the loop dispatches these channels, including handing the Ready out on readyc:
		select {
		// ... (propc, recvc, confc cases omitted)
		case <-n.tickc:
			n.rn.Tick()
		case readyc <- rd:
			n.rn.acceptReady(rd)
			advancec = n.advancec
		case <-advancec:
			n.rn.Advance(rd)
			rd = Ready{}
			advancec = nil
		case c := <-n.status:
			c <- getStatus(r)
		case <-n.stop:
			close(n.done)
			return
		}
	}
}
- Back in the goroutine launched by raftNode.start(), which consumes the readyc channel: if the node is the leader, it sends the messages before persisting the log; if not, it sends them only after persistence completes.
// start prepares and starts raftNode in a new goroutine. It is no longer safe
// to modify the fields after it has been started.
func (r *raftNode) start(rh *raftReadyHandler) {
	internalTimeout := time.Second

	go func() {
		defer r.onStop()
		islead := false

		for {
			select {
			case <-r.ticker.C:
				r.tick()
			case rd := <-r.Ready():
				if rd.SoftState != nil {
					// ... (the leader-change and metrics bookkeeping shown in
					// section 1; it also sets islead = rd.RaftState == raft.StateLeader)
				}

				if len(rd.ReadStates) != 0 {
					// ... (forward the latest ReadState to r.readStateC via a select)
				}

				// ...

				// the leader can write to its disk in parallel with replicating to the followers and them
				// writing to their disks.
				// For more details, check raft thesis 10.2.1
				if islead {
					// gofail: var raftBeforeLeaderSend struct{}
					r.transport.Send(r.processMessages(rd.Messages))
				}

				// Must save the snapshot file and WAL snapshot entry before saving any other entries or hardstate to
				// ensure that recovery after a snapshot restore is possible.
				if !raft.IsEmptySnap(rd.Snapshot) {
					// gofail: var raftBeforeSaveSnap struct{}
					if err := r.storage.SaveSnap(rd.Snapshot); err != nil {
						if r.lg != nil {
							r.lg.Fatal("failed to save Raft snapshot", zap.Error(err))
						} else {
							plog.Fatalf("failed to save Raft snapshot %v", err)
						}
					}
					// gofail: var raftAfterSaveSnap struct{}
				}
				// ...
			}
		}
	}()
}
- transport.Send normally ends up calling the peer's send() method, which pushes the message into the streamWriter's msgc channel.
func (p *peer) send(m raftpb.Message) {
	p.mu.Lock()
	paused := p.paused
	p.mu.Unlock()

	if paused {
		return
	}

	writec, name := p.pick(m)
	select {
	case writec <- m:
	default:
		p.r.ReportUnreachable(m.To)
		if isMsgSnap(m) {
			p.r.ReportSnapshot(m.To, raft.SnapshotFailure)
		}
		if p.status.isActive() {
			// ...
		} else {
			// ...
		}
		sentFailures.WithLabelValues(types.ID(m.To).String()).Inc()
	}
}
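Note the select with a default branch: if the stream writer's channel is full, peer.send drops the message rather than blocking the raft loop, and reports the peer unreachable. A minimal demonstration of that pattern (my own toy channel, not etcd's types):

```go
package main

import "fmt"

// dropOnFull shows the select/default pattern peer.send uses: if the
// writer's channel cannot accept the message, drop it and return false
// (etcd would call ReportUnreachable here) instead of blocking.
func dropOnFull(writec chan int, m int) bool {
	select {
	case writec <- m:
		return true
	default:
		return false // message dropped; raft will retry via its own logic
	}
}

func main() {
	writec := make(chan int, 2) // small buffer, in the spirit of streamBufSize
	fmt.Println(dropOnFull(writec, 1)) // true
	fmt.Println(dropOnFull(writec, 2)) // true
	fmt.Println(dropOnFull(writec, 3)) // false: buffer full, message dropped
}
```

Dropping is safe because raft is built to tolerate lost messages; blocking the sender, by contrast, would stall heartbeats to every peer.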
- streamWriter runs a goroutine that drains the msgc channel, calls encode to serialize the Message into bytes with protobuf, and writes them to the connection's I/O stream.
func (cw *streamWriter) run() {
	// ...
		case m := <-msgc:
			err := enc.encode(&m)
			if err == nil {
				unflushed += m.Size()

				if len(msgc) == 0 || batched > streamBufSize/2 {
					flusher.Flush()
					sentBytes.WithLabelValues(cw.peerID.String()).Add(float64(unflushed))
					unflushed = 0
					batched = 0
				} else {
					batched++
				}

				continue
			}

			cw.status.deactivate(failureType{source: t.String(), action: "write"}, err.Error())
			cw.close()
			// log ...
			heartbeatc, msgc = nil, nil
			cw.r.ReportUnreachable(m.To)
			sentFailures.WithLabelValues(cw.peerID.String()).Inc()

		case conn := <-cw.connc:
			// ...
}

func (enc *messageEncoder) encode(m *raftpb.Message) error {
	if err := binary.Write(enc.w, binary.BigEndian, uint64(m.Size())); err != nil {
		return err
	}
	_, err := enc.w.Write(pbutil.MustMarshal(m))
	return err
}
- On the remote node, a streamReader receives the bytes and deserializes them back into a Message object, then pushes the message into the peer's recvc channel (or propc for proposals).
func (cr *streamReader) decodeLoop(rc io.ReadCloser, t streamType) error {
	var dec decoder
	cr.mu.Lock()
	switch t {
	case streamTypeMsgAppV2:
		dec = newMsgAppV2Decoder(rc, cr.tr.ID, cr.peerID)
	case streamTypeMessage:
		dec = &messageDecoder{r: rc}
	default:
		if cr.lg != nil {
			cr.lg.Panic("unknown stream type", zap.String("type", t.String()))
		} else {
			plog.Panicf("unhandled stream type %s", t)
		}
	}
	select {
	case <-cr.ctx.Done():
		cr.mu.Unlock()
		if err := rc.Close(); err != nil {
			return err
		}
		return io.EOF
	default:
		cr.closer = rc
	}
	cr.mu.Unlock()

	// gofail: labelRaftDropHeartbeat:
	for {
		m, err := dec.decode()
		if err != nil {
			cr.mu.Lock()
			cr.close()
			cr.mu.Unlock()
			return err
		}

		// gofail-go: var raftDropHeartbeat struct{}
		// continue labelRaftDropHeartbeat
		receivedBytes.WithLabelValues(types.ID(m.From).String()).Add(float64(m.Size()))

		cr.mu.Lock()
		paused := cr.paused
		cr.mu.Unlock()

		if paused {
			continue
		}

		if isLinkHeartbeatMessage(&m) {
			// raft is not interested in link layer
			// heartbeat message, so we should ignore
			// it.
			continue
		}

		recvc := cr.recvc
		if m.Type == raftpb.MsgProp {
			recvc = cr.propc
		}

		select {
		case recvc <- m:
		default:
			if cr.status.isActive() {
				// log ...
			} else {
				// log ...
			}
			recvFailures.WithLabelValues(types.ID(m.From).String()).Inc()
		}
	}
}
- When a peer is started, it launches two goroutines, one draining recvc and one draining propc. Both call Raft.Process to handle the message; EtcdServer is the implementation of that interface.
func startPeer(t *Transport, urls types.URLs, peerID types.ID, fs *stats.FollowerStats) *peer {
	// ...
	ctx, cancel := context.WithCancel(context.Background())
	p.cancel = cancel
	go func() {
		for {
			select {
			case mm := <-p.recvc:
				if err := r.Process(ctx, mm); err != nil {
					if t.Logger != nil {
						t.Logger.Warn("failed to process Raft message", zap.Error(err))
					} else {
						plog.Warningf("failed to process raft message (%v)", err)
					}
				}
			case <-p.stopc:
				return
			}
		}
	}()

	// r.Process might block for processing proposal when there is no leader.
	// Thus propc must be put into a separate routine with recvc to avoid blocking
	// processing other raft messages.
	go func() {
		for {
			select {
			case mm := <-p.propc:
				if err := r.Process(ctx, mm); err != nil {
					plog.Warningf("failed to process raft message (%v)", err)
				}
			case <-p.stopc:
				return
			}
		}
	}()
	// ...
}
- EtcdServer first checks whether the sending node has been removed from the cluster; if not, it calls Step with the message to advance the state machine. A node receiving heartbeats is normally in the follower state, so the step lands in stepFollower.
// Process takes a raft message and applies it to the server's raft state
// machine, respecting any timeout of the given context.
func (s *EtcdServer) Process(ctx context.Context, m raftpb.Message) error {
	if s.cluster.IsIDRemoved(types.ID(m.From)) {
		if lg := s.getLogger(); lg != nil {
			lg.Warn(
				"rejected Raft message from removed member",
				zap.String("local-member-id", s.ID().String()),
				zap.String("removed-member-id", types.ID(m.From).String()),
			)
		} else {
			plog.Warningf("reject message from removed member %s", types.ID(m.From).String())
		}
		return httptypes.NewHTTPError(http.StatusForbidden, "cannot process message from removed member")
	}
	if m.Type == raftpb.MsgApp {
		s.stats.RecvAppendReq(types.ID(m.From).String(), m.Size())
	}
	return s.r.Step(ctx, m)
}
func stepFollower(r *raft, m pb.Message) error {
	switch m.Type {
	case pb.MsgProp:
		if r.lead == None {
			r.logger.Infof("%x no leader at term %d; dropping proposal", r.id, r.Term)
			return ErrProposalDropped
		} else if r.disableProposalForwarding {
			r.logger.Infof("%x not forwarding to leader %x at term %d; dropping proposal", r.id, r.lead, r.Term)
			return ErrProposalDropped
		}
		m.To = r.lead
		r.send(m)
	case pb.MsgApp:
		r.electionElapsed = 0
		r.lead = m.From
		r.handleAppendEntries(m)
	case pb.MsgHeartbeat:
		r.electionElapsed = 0
		r.lead = m.From
		r.handleHeartbeat(m)
	case pb.MsgSnap:
		r.electionElapsed = 0
		r.lead = m.From
		// ...
	}
	// ...
}

func (r *raft) handleHeartbeat(m pb.Message) {
	r.raftLog.commitTo(m.Commit)
	r.send(pb.Message{To: m.From, Type: pb.MsgHeartbeatResp, Context: m.Context})
}
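The commitTo call above only ever moves the follower's commit index forward (the real raftLog.commitTo also panics if asked to commit past the last index, omitted here). A standalone sketch of that rule:

```go
package main

import "fmt"

// commitTo sketches raftLog.commitTo: the follower's commit index only
// moves forward. The leader already clamped the heartbeat's Commit to
// what this follower has replicated, so advancing it is always safe.
func commitTo(committed, toCommit uint64) uint64 {
	if committed < toCommit {
		return toCommit
	}
	return committed
}

func main() {
	fmt.Println(commitTo(5, 9)) // 9: the heartbeat advances the follower's commit
	fmt.Println(commitTo(9, 5)) // 9: a stale Commit value never moves it back
}
```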
A follower handles a heartbeat by first resetting the election-elapsed counter (electionElapsed) to zero. That counter is incremented on every tickElection call, and if it ever reaches electionTimeout, a new election is started. Note also that handleHeartbeat sets the local log's commit index to the Commit value carried in the message. This is the purpose of setting Commit mentioned in step 2: the heartbeat also propagates the leader's commit index to the follower. And because the leader sets the message's Commit to the smaller of the peer's latest replicated index and its own commit index, a slow follower's commit index is never advanced past entries it has not yet replicated.
Finally, after processing, the follower replies with a MsgHeartbeatResp message.
The response travels through the same pipeline as the heartbeat itself, so we won't repeat it. When the leader receives the response, it is eventually handled by stepLeader.
On receiving the response, stepLeader checks whether more log entries need to be replicated and, if so, sends append messages. It also handles pending read requests; that part is covered in detail in the walkthrough of linearizable reads.
func stepLeader(r *raft, m pb.Message) error {
	// These message types do not require any progress for m.From.
	switch m.Type {
	case pb.MsgBeat:
		r.bcastHeartbeat()
		return nil
	case pb.MsgCheckQuorum:
		// ...
	}

	// All other message types require a progress for m.From (pr).
	pr := r.prs.Progress[m.From]
	// ...
	switch m.Type {
	case pb.MsgHeartbeatResp:
		pr.RecentActive = true
		pr.ProbeSent = false

		// free one slot for the full inflights window to allow progress.
		if pr.State == tracker.StateReplicate && pr.Inflights.Full() {
			pr.Inflights.FreeFirstOne()
		}
		if pr.Match < r.raftLog.lastIndex() {
			r.sendAppend(m.From)
		}

		if r.readOnly.option != ReadOnlySafe || len(m.Context) == 0 {
			return nil
		}

		if r.prs.Voters.VoteResult(r.readOnly.recvAck(m.From, m.Context)) != quorum.VoteWon {
			return nil
		}

		rss := r.readOnly.advance(m)
		for _, rs := range rss {
			req := rs.req
			if req.From == None || req.From == r.id { // from local member
				r.readStates = append(r.readStates, ReadState{Index: rs.index, RequestCtx: req.Entries[0].Data})
			} else {
				r.send(pb.Message{To: req.From, Type: pb.MsgReadIndexResp, Index: rs.index, Entries: req.Entries})
			}
		}
	case pb.MsgSnapStatus:
		// ...
	}
	// ...
}
3. Elections
A follower that detects an election timeout triggers the election flow, which goes like this:
- It again starts from tick. For a follower (or candidate), tick is tickElection: it increments the election-elapsed counter (electionElapsed), and once that exceeds the election timeout, the node kicks off a campaign by constructing a MsgHup message and calling the state machine's Step method.
// tickElection is run by followers and candidates after r.electionTimeout.
func (r *raft) tickElection() {
	r.electionElapsed++

	if r.promotable() && r.pastElectionTimeout() {
		r.electionElapsed = 0
		r.Step(pb.Message{From: r.id, Type: pb.MsgHup})
	}
}
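The interplay between heartbeats and this election clock can be shown with a toy model (my own simplification, not etcd's types): leader messages reset electionElapsed, and only sustained silence lets it reach the randomized timeout.

```go
package main

import "fmt"

// followerClock is a minimal stand-in for a follower's election timer:
// each tick bumps electionElapsed; reaching the randomized election
// timeout triggers a campaign (stands in for Step(MsgHup)).
type followerClock struct {
	electionElapsed           int
	randomizedElectionTimeout int
	campaigns                 int
}

func (c *followerClock) tick() {
	c.electionElapsed++
	if c.electionElapsed >= c.randomizedElectionTimeout {
		c.electionElapsed = 0
		c.campaigns++
	}
}

// heartbeat models stepFollower resetting electionElapsed whenever a
// valid message from the leader arrives.
func (c *followerClock) heartbeat() {
	c.electionElapsed = 0
}

func main() {
	c := &followerClock{randomizedElectionTimeout: 10}
	// A heartbeat every 5 ticks keeps the follower from ever campaigning.
	for i := 1; i <= 30; i++ {
		c.tick()
		if i%5 == 0 {
			c.heartbeat()
		}
	}
	fmt.Println(c.campaigns) // 0
	// With heartbeats stopped, 10 more ticks trigger a campaign.
	for i := 0; i < 10; i++ {
		c.tick()
	}
	fmt.Println(c.campaigns) // 1
}
```

This is why etcd suggests ElectionTick = 10 * HeartbeatTick: a handful of lost or delayed heartbeats should not be enough to start an election.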
- Step handles MsgHup by first checking whether the local log still contains committed-but-unapplied configuration-change entries. If it does, the node must not campaign: config changes may add or remove members, and they need to have taken effect on every node before an election is safe. If PreVote is enabled, a PreElection branch is taken first; we cover that at the end. Looking straight at the campaign() method: it first switches the node into candidate state (becomeCandidate bumps the Term by 1), then takes the term and index of its own latest log entry and broadcasts them to all nodes inside a MsgVote message. That latest log term and index matter a great deal: they guarantee that the newly elected leader already contains every previously committed entry, so committed entries can never be rewritten by a new leader. We will come back to this below.
	switch m.Type {
	case pb.MsgHup:
		if r.state != StateLeader {
			if !r.promotable() {
				r.logger.Warningf("%x is unpromotable and can not campaign; ignoring MsgHup", r.id)
				return nil
			}
			ents, err := r.raftLog.slice(r.raftLog.applied+1, r.raftLog.committed+1, noLimit)
			if err != nil {
				r.logger.Panicf("unexpected error getting unapplied entries (%v)", err)
			}
			if n := numOfPendingConf(ents); n != 0 && r.raftLog.committed > r.raftLog.applied {
				r.logger.Warningf("%x cannot campaign at term %d since there are still %d pending configuration changes to apply", r.id, r.Term, n)
				return nil
			}

			r.logger.Infof("%x is starting a new election at term %d", r.id, r.Term)
			if r.preVote {
				r.campaign(campaignPreElection)
			} else {
				r.campaign(campaignElection)
			}
		} else {
			r.logger.Debugf("%x ignoring MsgHup because already leader", r.id)
		}
	case pb.MsgVote, pb.MsgPreVote:
		// ... (shown below)
	}

func (r *raft) becomeCandidate() {
	// TODO(xiangli) remove the panic when the raft implementation is stable
	if r.state == StateLeader {
		panic("invalid transition [leader -> candidate]")
	}
	r.step = stepCandidate
	r.reset(r.Term + 1)
	r.tick = r.tickElection
	r.Vote = r.id
	r.state = StateCandidate
	r.logger.Infof("%x became candidate at term %d", r.id, r.Term)
}
- The vote message travels through the same sending pipeline as every other message, so we won't repeat it.
- Once the vote message reaches the remote node, it is processed as usual and eventually reaches the Step method to advance the state machine. Step handles MsgVote as follows:
- First, if the node's own election timer has not yet expired, it rejects the request. This guards against followers that merely failed (for local reasons) to receive the leader's heartbeats starting rogue elections.
- If r.Vote is already set, i.e. the node has already granted its vote to some node in this term, it rejects the request.
- If, judging by the LogTerm and Index in the message (the candidate's latest log term and index from step 2), the candidate's log is older than the local log, it rejects the request.
- Otherwise it grants the vote and replies with an approval message.
	case m.Term > r.Term:
		if m.Type == pb.MsgVote || m.Type == pb.MsgPreVote {
			force := bytes.Equal(m.Context, []byte(campaignTransfer))
			inLease := r.checkQuorum && r.lead != None && r.electionElapsed < r.electionTimeout
			if !force && inLease {
				// If a server receives a RequestVote request within the minimum election timeout
				// of hearing from a current leader, it does not update its term or grant its vote
				r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] ignored %s from %x [logterm: %d, index: %d] at term %d: lease is not expired (remaining ticks: %d)",
					r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term, r.electionTimeout-r.electionElapsed)
				return nil
			}
		}
		// ...

	case pb.MsgVote, pb.MsgPreVote:
		// We can vote if this is a repeat of a vote we've already cast...
		canVote := r.Vote == m.From ||
			// ...we haven't voted and we don't think there's a leader yet in this term...
			(r.Vote == None && r.lead == None) ||
			// ...or this is a PreVote for a future term...
			(m.Type == pb.MsgPreVote && m.Term > r.Term)
		// ...and we believe the candidate is up to date.
		if canVote && r.raftLog.isUpToDate(m.Index, m.LogTerm) {
			// Note: it turns out that learners must be allowed to cast votes.
			// This seems counter-intuitive but is necessary in the situation in which
			// a learner has been promoted (i.e. is now a voter) but has not learned
			// about this yet.
			// For example, consider a group in which id=1 is a learner and id=2 and
			// id=3 are voters. A configuration change promoting 1 can be committed on
			// the quorum `{2,3}` without the config change being appended to the
			// learner's log. If the leader (say 2) fails, there are de facto two
			// voters remaining. Only 3 can win an election (due to its log containing
			// all committed entries), but to do so it will need 1 to vote. But 1
			// considers itself a learner and will continue to do so until 3 has
			// stepped up as leader, replicates the conf change to 1, and 1 applies it.
			// Ultimately, by receiving a request to vote, the learner realizes that
			// the candidate believes it to be a voter, and that it should act
			// accordingly. The candidate's config may be stale, too; but in that case
			// it won't win the election, at least in the absence of the bug discussed
			// in:
			// https://github.com/etcd-io/etcd/issues/7625#issuecomment-488798263.
			r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] cast %s for %x [logterm: %d, index: %d] at term %d",
				r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term)
			// When responding to Msg{Pre,}Vote messages we include the term
			// from the message, not the local term. To see why, consider the
			// case where a single node was previously partitioned away and
			// it's local term is now out of date. If we include the local term
			// (recall that for pre-votes we don't update the local term), the
			// (pre-)campaigning node on the other end will proceed to ignore
			// the message (it ignores all out of date messages).
			// The term in the original message and current local term are the
			// same in the case of regular votes, but different for pre-votes.
			r.send(pb.Message{To: m.From, Term: m.Term, Type: voteRespMsgType(m.Type)})
			if m.Type == pb.MsgVote {
				// Only record real votes.
				r.electionElapsed = 0
				r.Vote = m.From
			}
		} else {
			r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] rejected %s from %x [logterm: %d, index: %d] at term %d",
				r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term)
			r.send(pb.Message{To: m.From, Term: r.Term, Type: voteRespMsgType(m.Type), Reject: true})
		}
// isUpToDate determines if the given (lastIndex,term) log is more up-to-date
// by comparing the index and term of the last entries in the existing logs.
// If the logs have last entries with different terms, then the log with the
// later term is more up-to-date. If the logs end with the same term, then
// whichever log has the larger lastIndex is more up-to-date. If the logs are
// the same, the given log is up-to-date.
func (l *raftLog) isUpToDate(lasti, term uint64) bool {
	return term > l.lastTerm() || (term == l.lastTerm() && lasti >= l.lastIndex())
}
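The comparison is simple enough to replicate standalone (a free function here instead of a raftLog method, for illustration):

```go
package main

import "fmt"

// isUpToDate reproduces raftLog.isUpToDate for a local log ending at
// (lasti, term): a higher last term always wins; with equal terms the
// longer log wins; ties count as up-to-date.
func isUpToDate(candLasti, candTerm, lasti, term uint64) bool {
	return candTerm > term || (candTerm == term && candLasti >= lasti)
}

func main() {
	// Local log ends at index 8, term 3.
	fmt.Println(isUpToDate(5, 4, 8, 3)) // true: later term beats a longer log
	fmt.Println(isUpToDate(9, 3, 8, 3)) // true: same term, longer log
	fmt.Println(isUpToDate(9, 2, 8, 3)) // false: a stale term is rejected
}
```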
- When the candidate receives a MsgVoteResp, stepCandidate handles it: it first records the ballot via r.poll (the vote bookkeeping lives in r.prs). If a majority has granted the vote, the candidate promotes itself to leader; if a majority has already rejected, it falls back to follower.
// stepCandidate is shared by StateCandidate and StatePreCandidate; the difference is
// whether they respond to MsgVoteResp or MsgPreVoteResp.
func stepCandidate(r *raft, m pb.Message) error {
	// ...
	case myVoteRespType:
		gr, rj, res := r.poll(m.From, m.Type, !m.Reject)
		r.logger.Infof("%x has received %d %s votes and %d vote rejections", r.id, gr, m.Type, rj)
		switch res {
		case quorum.VoteWon:
			if r.state == StatePreCandidate {
				r.campaign(campaignElection)
			} else {
				r.becomeLeader()
				r.bcastAppend()
			}
		case quorum.VoteLost:
			// pb.MsgPreVoteResp contains future term of pre-candidate
			// m.Term > r.Term; reuse r.Term
			r.becomeFollower(r.Term, None)
		}
	case pb.MsgTimeoutNow:
		// ...
	}
	// ...
}

func (r *raft) poll(id uint64, t pb.MessageType, v bool) (granted int, rejected int, result quorum.VoteResult) {
	if v {
		r.logger.Infof("%x received %s from %x at term %d", r.id, t, id, r.Term)
	} else {
		r.logger.Infof("%x received %s rejection from %x at term %d", r.id, t, id, r.Term)
	}
	r.prs.RecordVote(id, v)
	return r.prs.TallyVotes()
}
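The tallying logic behind RecordVote/TallyVotes reduces to majority counting. A simplified stand-in over a fixed voter set (etcd's quorum package handles joint configurations, which this toy ignores):

```go
package main

import "fmt"

// tally is a simplified stand-in for r.prs.RecordVote/TallyVotes:
// won once a majority grants, lost once a majority rejects, otherwise
// the election is still pending.
func tally(votes map[int]bool, voters int) string {
	granted, rejected := 0, 0
	for _, v := range votes {
		if v {
			granted++
		} else {
			rejected++
		}
	}
	q := voters/2 + 1 // majority size
	switch {
	case granted >= q:
		return "won"
	case rejected >= q:
		return "lost"
	default:
		return "pending"
	}
}

func main() {
	// Five voters; the candidate votes for itself (id 1).
	votes := map[int]bool{1: true}
	fmt.Println(tally(votes, 5)) // pending
	votes[2], votes[3] = true, true
	fmt.Println(tally(votes, 5)) // won: 3 of 5 granted
}
```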
4. PreVote
PreVote solves the problem that a node knocked out by a network partition forces a cluster-wide re-election when it rejoins.
The problem arises like this. Suppose the cluster's current Term is 1, and one node, say A, is partitioned away and stops receiving the leader's heartbeats. Once the election timeout passes, A turns itself into a candidate, bumping its Term to 2, and starts campaigning. It obviously cannot win while partitioned. But after the network heals, every message it sends, campaign messages and responses alike, carries its Term of 2, while the rest of the cluster is still at Term 1. When the leader sees a node with a higher Term, it obediently steps down to follower, and that forces a re-election.
This scenario is not common, and when it does occur it costs only a single re-election, so the impact on the cluster is small. Still, it can be avoided, and the way to avoid it is PreVote.
With PreVote, when a follower notices it has gone a full election timeout without hearing the leader's heartbeat, it does not immediately become a candidate, and therefore does not increment its Term. Instead it enters a new phase, call it a pre-election: it first broadcasts a PreVote message. Nodes that still have a healthy leader reply with a rejection; only nodes that have also lost contact with the leader reply with approval. Only after collecting pre-votes from more than half of the nodes does the node become a candidate and start a real election. This way a single node with a flaky network will not rashly bump its Term, so when it rejoins the cluster it poses no threat to the sitting leader, and the system runs more stably.
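The Term behavior in this story can be shown with a toy model (my own simulation, not etcd code): a partitioned classic candidate bumps its term on every failed campaign, while a PreVote node keeps its term untouched until a majority grants the pre-vote.

```go
package main

import "fmt"

// raftTerm is a toy model of the partition scenario: without PreVote the
// isolated node bumps its term once per failed campaign; with PreVote it
// keeps its term because no majority of pre-votes can be gathered.
type raftTerm struct {
	term    uint64
	preVote bool
}

// campaignTimeout models one election timeout firing while partitioned
// (so no pre-vote can possibly be granted by a majority).
func (n *raftTerm) campaignTimeout() {
	if n.preVote {
		return // pre-election failed; the term is untouched
	}
	n.term++ // becomeCandidate: r.reset(r.Term + 1)
}

func main() {
	a := &raftTerm{term: 1}                // classic candidate
	b := &raftTerm{term: 1, preVote: true} // PreVote enabled
	for i := 0; i < 3; i++ {               // three timeouts while partitioned
		a.campaignTimeout()
		b.campaignTimeout()
	}
	fmt.Println(a.term) // 4: will force the leader to step down on rejoin
	fmt.Println(b.term) // 1: rejoining is harmless
}
```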
5. How Are Committed Entries Protected from Being Overwritten?
The leader of an etcd cluster continuously replicates its own log to the followers. If a follower finds its log inconsistent with the leader's, it deletes the conflicting local entries so that it stays in sync with the leader.
As it runs, the leader inspects the replication responses; once an entry has been replicated on more than half of the nodes, the leader marks it as committed. The entry is subsequently applied: persisted and used to update the KV store.
Now picture this scenario: the leader of a running cluster suddenly dies, and some follower campaigns to replace it. If the new leader's log happens to be stale, then during log synchronization the other nodes will delete whatever entries are newer than the new leader's. Worse, some of those newer entries may already be committed, which would violate the rule that committed entries must never be modified.
How is that prevented?
This is where the election-time detail comes in: when a candidate campaigns, it attaches the index and term of its own latest log entry. A follower receiving the vote request uses those two fields to judge whether the candidate's log is at least as up-to-date as its own, granting the vote if so and rejecting otherwise.
Why does this guarantee that committed entries can't be overwritten?
Because it guarantees that any elected leader already holds every committed entry in its local log.
And why does it guarantee that?
Because, as noted above, an entry is committed only after more than half of the nodes have replicated it, and a candidate can only win with votes from more than half of the nodes, each of whose logs is no newer than its own. Any two such majorities must intersect in at least one node, and that node holds the committed entry. So as long as the winner's log is at least as up-to-date as a majority of the cluster, the new leader can never overwrite committed entries.
Put simply, this mechanism guarantees that nodes like b and e in the figure can never be elected leader.
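The intersection argument is pure pigeonhole counting and can be checked mechanically (a small demonstration of the reasoning, not etcd code):

```go
package main

import "fmt"

// overlap checks the pigeonhole argument in the text: in a cluster of n
// nodes, a commit quorum and a vote quorum must share at least one member
// whenever their sizes sum to more than n. That shared member carries the
// committed entry into the election.
func overlap(n, commitQuorum, voteQuorum int) bool {
	// The two quorums can avoid each other only if they fit side by side.
	return commitQuorum+voteQuorum > n
}

func main() {
	for n := 3; n <= 7; n += 2 {
		q := n/2 + 1 // majority size
		// Two majorities always overlap, for every odd cluster size.
		fmt.Println(n, overlap(n, q, q))
	}
}
```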
6. The Problem of Frequent Re-elections
If etcd keeps re-electing, the system spends long stretches unavailable, which drastically lowers its availability.
So what causes re-elections?
- Network latency and congestion: as the heartbeat path shows, heartbeats, like every other message, are first collected into the msgs slice of the Ready struct and then sent one by one. Sends to different peers do not block one another, but each peer's msgc channel is drained by a single goroutine, so under congestion other messages can clog that channel and delay heartbeats. Even if only heartbeats are in flight, congestion can shrink the effective bandwidth enough that a heartbeat takes too long to reach the peer, again tripping the heartbeat timeout; plain network latency stretches delivery times with the same effect. In addition, the dial timeout for establishing peer connections is 1s plus the election timeout (ElectionTicks * TickMs, per peerDialTimeout below), so in a congested network connection establishment itself can also get in the way of normal message delivery.
func (c *ServerConfig) peerDialTimeout() time.Duration {
	// 1s for queue wait and election timeout
	return time.Second + time.Duration(c.ElectionTicks*int(c.TickMs))*time.Millisecond
}
- I/O latency: as the apply flow shows, after handing off the msgs the leader starts persisting committed entries or snapshots, and that work blocks the same goroutine. If it blocks for too long, subsequent msgs queue up and cannot be sent in time. According to the etcd documentation this is intentional: a leader with ailing I/O automatically loses its leadership, letting a node with healthy I/O get elected instead. But if every node in the cluster has I/O problems, the whole cluster becomes unstable.