Raft在etcd中的实现(二)节点发送消息和接收消息流程

发送消息流程 Raft在etcd中的实现(二)节点发送消息和接收消息流程
文章图片
raft中节点发送消息的流程.png 流程简介
上图是raft中发送消息的流程。大致分为以下的过程。

  1. 由raftexample/raft.go部分发起。前文提到过该部分主要是为应用层提供服务。
  2. 经由raft/node.go部分转发到共识模块。前文提到过该部分主要起到应用层和共识模块的衔接作用。
  3. 共识模块raft/raft.go,对消息进行处理后,将消息append到msgs。注意共识模块自身不管是如何将消息通过通讯传输到其他节点的。这边只是放入到msgs中。
  4. raft/node.go部分监测到msgs中有内容了。使用newReady生成一个一个新的ready对象,并发送给readyc通道。
  5. raftexample/raft.go中,监测到readyc通道有内容之后,会进行写wal日志,增加Storage日志条目,判断是否需要打快照等动作。最后通过transoport组件,经http协议将消息发送给对应的节点。
代码详解
  1. 消息的发起部分。raft中有3类RPC,分别为AppendEntries RPCs,RequestVote RPCs,以及InstallSnapshot RPCs。通过发起的形式分成以下两种。
    1.1 raft自行触发的比方说leader通过AppendEntries发送心跳包,以及RequestVote RPC等。图中最左部分的tickerC是发起者,是通过设置时钟来完成的。
// github.com/coreos/etcd/contrib/raftexample/raft.go func (rc *raftNode) serveChannels() { /*其他逻辑*/ for { select { case <-ticker.C: rc.node.Tick() /*其他情况*/ } } }

1.2 首先由客户端发送请求给leader,然后leader向其他节点发送AppendEntires来进行日志备份。图中最左部分的proposeC即是http server接受到客户端请求后,将消息通过proposeC传递到raftNode中。
// github.com/coreos/etcd/contrib/raftexample/raft.go func (rc *raftNode) serveChannels() { /*其他逻辑*/ // send proposals over raft go func() { var confChangeCount uint64 = 0for rc.proposeC != nil && rc.confChangeC != nil { select { case prop, ok := <-rc.proposeC: if !ok { rc.proposeC = nil } else { // blocks until accepted by raft state machine rc.node.Propose(context.TODO(), []byte(prop)) }case cc, ok := <-rc.confChangeC: if !ok { rc.confChangeC = nil } else { confChangeCount += 1 cc.ID = confChangeCount rc.node.ProposeConfChange(context.TODO(), cc) } } } // client closed channel; shutdown raft if not already close(rc.stopc) }() }

  1. 经由node.go转发消息。
    2.1 tick消息,调用node的Tick方法。方法中为向tickc发送消息。后由run中逻辑处理。见2.3
//github.com/coreos/raft/node.go // Tick increments the internal logical clock for this Node. Election timeouts // and heartbeat timeouts are in units of ticks. func (n *node) Tick() { select { case n.tickc <- struct{}{}: case <-n.done: default: n.logger.Warningf("A tick missed to fire. Node blocks too long!") } }

【Raft在etcd中的实现(二)节点发送消息和接收消息流程】2.2 propose消息,调用node的Propose方法。方法中调用node的step对消息进行判断,因为这边消息类型是MsgProp,所以是将消息m发送给propc通道。后由run中逻辑处理。见2.3
//github.com/coreos/raft/node.go func (n *node) Propose(ctx context.Context, data []byte) error { return n.step(ctx, pb.Message{Type: pb.MsgProp, Entries: []pb.Entry{{Data: data}}}) }

// Step advances the state machine using msgs. The ctx.Err() will be returned, // if any. func (n *node) step(ctx context.Context, m pb.Message) error { ch := n.recvc if m.Type == pb.MsgProp { ch = n.propc }select { case ch <- m: return nil case <-ctx.Done(): return ctx.Err() case <-n.done: return ErrStopped } }

2.3 run中处理各种消息。收到tickc时调用raft的tick方法。在收到propc消息时,将m的发送者标记为raft的id,然后调用raft的Step方法。
//github.com/coreos/raft/node.go func (n *node) run(r *raft) { /*其他逻辑暂时忽略*/ for { /*其他逻辑暂时忽略,下文接收消息部分会再讨论*/ select { case m := <-propc: m.From = r.id r.Step(m) case <-n.tickc: r.tick() /*其他情况暂时忽略*/ } } }

  1. 共识模块raft中处理消息。
    3.1 对于tick的情况根据当前节点的状态,有两个tick方法。leader调用的是tickHeartbeat,follower和candidate调用的是tickElection。两个方法都是达到相应的配置的时间的时候调用Step方法进行相应的处理。
//github.com/coreos/etcd/raft/raft.go // tickElection is run by followers and candidates after r.electionTimeout. func (r *raft) tickElection() { r.electionElapsed++if r.promotable() && r.pastElectionTimeout() { r.electionElapsed = 0 r.Step(pb.Message{From: r.id, Type: pb.MsgHup}) } }// tickHeartbeat is run by leaders to send a MsgBeat after r.heartbeatTimeout. func (r *raft) tickHeartbeat() { r.heartbeatElapsed++ r.electionElapsed++if r.electionElapsed >= r.electionTimeout { r.electionElapsed = 0 if r.checkQuorum { r.Step(pb.Message{From: r.id, Type: pb.MsgCheckQuorum}) } // If current leader cannot transfer leadership in electionTimeout, it becomes leader again. if r.state == StateLeader && r.leadTransferee != None { r.abortLeaderTransfer() } }if r.state != StateLeader { return }if r.heartbeatElapsed >= r.heartbeatTimeout { r.heartbeatElapsed = 0 r.Step(pb.Message{From: r.id, Type: pb.MsgBeat}) } }

3.2 Step方法。共识模块中最重要的方法,根据消息的类型和自身的状态进行响应。该部分逻辑留作下文分析。不过可以观察到Step方法在判断出该消息是需要共识给其他节点的时候,会通过调用sendAppend、sendHeartbeat、bcastAppend、bcastHeartbeat、bcastHeartbeatWithCtx几个方法,或者是直接调用send方法。
3.3 send方法,send方法结合term和msg的type对消息进行处理。然后append到msgs中。注意这里只进行了append,而不是真正的发送。
func (r *raft) send(m pb.Message) { m.From = r.id if m.Type == pb.MsgVote || m.Type == pb.MsgVoteResp || m.Type == pb.MsgPreVote || m.Type == pb.MsgPreVoteResp { if m.Term == 0 { panic(fmt.Sprintf("term should be set when sending %s", m.Type)) } } else { if m.Term != 0 { panic(fmt.Sprintf("term should not be set when sending %s (was %d)", m.Type, m.Term)) } if m.Type != pb.MsgProp && m.Type != pb.MsgReadIndex { m.Term = r.Term } } r.msgs = append(r.msgs, m) }

  1. node部分中run方法中调用newReady方法,将新的消息,一些未写入到文件的日志条目等放入生成ready对象rd,如果有更新,则会走select中的case readyc<-rd,然后将各个消息重置为nil。
// github.com/coreos/raft/node.go func (n *node) run(r *raft) { for { if advancec != nil { readyc = nil } else {rd = newReady(r, prevSoftSt, prevHardSt) if rd.containsUpdates() { readyc = n.readyc } else { readyc = nil } } /*其他逻辑*/ select { /*其他情况*/ case readyc <- rd: if rd.SoftState != nil { prevSoftSt = rd.SoftState } if len(rd.Entries) > 0 { prevLastUnstablei = rd.Entries[len(rd.Entries)-1].Index prevLastUnstablet = rd.Entries[len(rd.Entries)-1].Term havePrevLastUnstablei = true } if !IsEmptyHardState(rd.HardState) { prevHardSt = rd.HardState } if !IsEmptySnap(rd.Snapshot) { prevSnapi = rd.Snapshot.Metadata.Index }r.msgs = nil r.readStates = nil advancec = n.advancec} } }

将msgs等信息放入到Ready中
// github.com/coreos/raft/node.go func newReady(r *raft, prevSoftSt *SoftState, prevHardSt pb.HardState) Ready { rd := Ready{ Entries:r.raftLog.unstableEntries(), CommittedEntries: r.raftLog.nextEnts(), Messages:r.msgs, } if softSt := r.softState(); !softSt.equal(prevSoftSt) { rd.SoftState = softSt } if hardSt := r.hardState(); !isHardStateEqual(hardSt, prevHardSt) { rd.HardState = hardSt } if r.raftLog.unstable.snapshot != nil { rd.Snapshot = *r.raftLog.unstable.snapshot } if len(r.readStates) != 0 { rd.ReadStates = r.readStates } rd.MustSync = MustSync(rd.HardState, prevHardSt, len(rd.Entries)) return rd }

判断Ready中是否有新的信息
//github.com/coreos/raft/node.go func (rd Ready) containsUpdates() bool { return rd.SoftState != nil || !IsEmptyHardState(rd.HardState) || !IsEmptySnap(rd.Snapshot) || len(rd.Entries) > 0 || len(rd.CommittedEntries) > 0 || len(rd.Messages) > 0 || len(rd.ReadStates) != 0 }

  1. 有新的消息时,node那边会将消息发送到readyc通道。raftexample中raftNode的serverChannels会收到Readc并进行处理。将日志条目写入到wal文件和memory storage中,如果有快照保存快照并提交给state machine去执行。通过transport将消息发送给其他节点。如果有的可提交的日志条目也提交给state machine去执行。判断是否需要触发snapshot,最后发送advance给node。
    经过该步之后,日志条目才写入到wal文件和memorty storage。消息也才经Transport发送给其他节点。
// github.com/coreos/etcd/contrib/raftexample/raft.go func (rc *raftNode) serveChannels() { /*其他逻辑*/ for { select {// store raft entries to wal, then publish over commit channel //node那边发送readyc后,这边收到消息 case rd := <-rc.node.Ready(): //将日志条目写入到wal文件 rc.wal.Save(rd.HardState, rd.Entries) //如果snap不空,则保存快照文件,并提交 if !raft.IsEmptySnap(rd.Snapshot) { rc.saveSnap(rd.Snapshot) rc.raftStorage.ApplySnapshot(rd.Snapshot) rc.publishSnapshot(rd.Snapshot) } //将日志条目写入到storage,storage是memorystorage rc.raftStorage.Append(rd.Entries) //将消息通过Transport发送给其他节点。 rc.transport.Send(rd.Messages) //如果日志条目中有新的可提交日志,则提交到state machine那边执行 if ok := rc.publishEntries(rc.entriesToApply(rd.CommittedEntries)); !ok { rc.stop() return } rc.maybeTriggerSnapshot() //发送advance给node rc.node.Advance() /*其他情况*/ } } }

node节点收到advance消息,即确定了之前的消息已写入wal文件和storage等。然后更新raftLog信息。并将advancec赋值为空,即保证下一次新消息的执行。
//github.com/coreos/raft/node.go func (n *node) run(r *raft) { for { /*其他逻辑*/ select { /*其他情况*/ case <-advancec: //更新raft中日志的信息 if prevHardSt.Commit != 0 { r.raftLog.appliedTo(prevHardSt.Commit) } if havePrevLastUnstablei { r.raftLog.stableTo(prevLastUnstablei, prevLastUnstablet) havePrevLastUnstablei = false } r.raftLog.stableSnapTo(prevSnapi) advancec = nil} } }

接收消息流程 Raft在etcd中的实现(二)节点发送消息和接收消息流程
文章图片
raft处理接收到消息.png 流程简介
  1. 由transport模块流入,收到其他节点传递过来的消息。
  2. 通过raftexample中的raftNode的Process方法,调用node来处理消息。
  3. node部分对消息进行简单的判断和处理后,将消息转由共识模块处理。
  4. 共识模块处理消息。
代码详解
  1. transport模块接收到消息。调用raftNode中的Process方法。
// github.com/coreos/etcd/rafthttp/peer.go func startPeer(transport *Transport, urls types.URLs, peerID types.ID, fs *stats.FollowerStats) *peer { /*其他逻辑*/ go func() { for { select { //接收到消息后,调用Process方法 case mm := <-p.recvc: if err := r.Process(ctx, mm); err != nil { plog.Warningf("failed to process raft message (%v)", err) } case <-p.stopc: return } } }() }

2.经由raftNode的Process调用node的Step。
// github.com/coreos/etcd/contrib/raftexample/raft.go func (rc *raftNode) Process(ctx context.Context, m raftpb.Message) error { return rc.node.Step(ctx, m) }

  1. Step判断是否为自身的消息,如果不是则通过step处理。step中消息不是MsgProp类型,所以是将消息m发送给recvc通道。后由run 中逻辑处理。
// github.com/coreos/raft/node.go func (n *node) Step(ctx context.Context, m pb.Message) error { // ignore unexpected local messages receiving over network if IsLocalMsg(m.Type) { // TODO: return an error? return nil } return n.step(ctx, m) }

// github.com/coreos/raft/node.go func (n *node) step(ctx context.Context, m pb.Message) error { ch := n.recvc if m.Type == pb.MsgProp { ch = n.propc }select { case ch <- m: return nil case <-ctx.Done(): return ctx.Err() case <-n.done: return ErrStopped } }

3.1 run中将收到的消息发送给共识模块的Step方法。
// github.com/coreos/raft/node.go func (n *node) run(r *raft) { for { /*其他逻辑*/ select { /*其他情况*/ case m := <-n.recvc: // filter out response message from unknown From. if pr := r.getProgress(m.From); pr != nil || !IsResponseMsg(m.Type) { r.Step(m) // raft never returns an error }} } }

  1. 共识模块中的Step根据消息类型和信息以及自身状态进行相应的操作。

    推荐阅读