MIT 6.824: Distributed Systems - Implementing Raft Lab2B
Following up on the previous post, "MIT 6.824: Distributed Systems - Implementing Raft Lab2A", this post implements the Raft Lab2B assignment: log replication.
The implementation passes the MIT unit tests.
Lab2B Implementation
Compared with Lab2A's leader election, Lab2B concentrates on the AppendEntries RPC, and the pitfalls multiply noticeably. I will summarize them at the end; first, the code.
Node startup
```go
func Make(peers []*labrpc.ClientEnd, me int, persister *Persister, applyCh chan ApplyMsg) *Raft {
	rf := &Raft{}
	rf.peers = peers
	rf.persister = persister
	rf.me = me

	// Your initialization code here (2A, 2B, 2C).
	rf.role = ROLE_FOLLOWER
	rf.leaderId = -1
	rf.votedFor = -1
	rf.lastActiveTime = time.Now()

	// initialize from state persisted before a crash
	rf.readPersist(persister.ReadRaftState())

	// election logic
	go rf.electionLoop()
	// leader logic
	go rf.appendEntriesLoop()
	// apply logic
	go rf.applyLogLoop(applyCh)

	DPrintf("RaftNode[%d] started", me)
	return rf
}
```
Start: the write entry point
```go
func (rf *Raft) Start(command interface{}) (int, int, bool) {
	index := -1
	term := -1
	isLeader := true

	// Your code here (2B).
	rf.mu.Lock()
	defer rf.mu.Unlock()

	// Only the leader may accept writes
	if rf.role != ROLE_LEADER {
		return -1, -1, false
	}
	logEntry := LogEntry{
		Command: command,
		Term:    rf.currentTerm,
	}
	rf.log = append(rf.log, logEntry)
	index = len(rf.log)
	term = rf.currentTerm
	rf.persist()

	DPrintf("RaftNode[%d] Add Command, logIndex[%d] currentTerm[%d]", rf.me, index, term)
	return index, term, isLeader
}
```
AppendEntries RPC sender (leader only)
```go
// In lab-2A this loop only sent heartbeats; for lab-2B it also carries log entries.
func (rf *Raft) appendEntriesLoop() {
	for !rf.killed() {
		time.Sleep(10 * time.Millisecond)

		func() {
			rf.mu.Lock()
			defer rf.mu.Unlock()

			// Only the leader broadcasts
			if rf.role != ROLE_LEADER {
				return
			}

			// Broadcast once every 100ms
			now := time.Now()
			if now.Sub(rf.lastBroadcastTime) < 100*time.Millisecond {
				return
			}
			rf.lastBroadcastTime = time.Now()

			// Fire AppendEntries RPCs to all peers concurrently
			for peerId := 0; peerId < len(rf.peers); peerId++ {
				if peerId == rf.me {
					continue
				}
				args := AppendEntriesArgs{}
				args.Term = rf.currentTerm
				args.LeaderId = rf.me
				args.LeaderCommit = rf.commitIndex
				args.Entries = make([]LogEntry, 0)
				args.PrevLogIndex = rf.nextIndex[peerId] - 1
				if args.PrevLogIndex > 0 {
					args.PrevLogTerm = rf.log[args.PrevLogIndex-1].Term
				}
				args.Entries = append(args.Entries, rf.log[rf.nextIndex[peerId]-1:]...)

				DPrintf("RaftNode[%d] appendEntries starts, currentTerm[%d] peer[%d] logIndex=[%d] nextIndex[%d] matchIndex[%d] args.Entries[%d] commitIndex[%d]",
					rf.me, rf.currentTerm, peerId, len(rf.log), rf.nextIndex[peerId], rf.matchIndex[peerId], len(args.Entries), rf.commitIndex)

				go func(id int, args1 *AppendEntriesArgs) {
					reply := AppendEntriesReply{}
					if ok := rf.sendAppendEntries(id, args1, &reply); ok {
						rf.mu.Lock()
						defer rf.mu.Unlock()
						defer func() {
							DPrintf("RaftNode[%d] appendEntries ends, currentTerm[%d] peer[%d] logIndex=[%d] nextIndex[%d] matchIndex[%d] commitIndex[%d]",
								rf.me, rf.currentTerm, id, len(rf.log), rf.nextIndex[id], rf.matchIndex[id], rf.commitIndex)
						}()

						// If our term changed while the RPC was in flight, drop the stale reply
						if rf.currentTerm != args1.Term {
							return
						}
						if reply.Term > rf.currentTerm {
							// A higher term exists: step down to follower
							rf.role = ROLE_FOLLOWER
							rf.leaderId = -1
							rf.currentTerm = reply.Term
							rf.votedFor = -1
							rf.persist()
							return
						}
						if reply.Success { // replication succeeded
							rf.nextIndex[id] += len(args1.Entries)
							rf.matchIndex[id] = rf.nextIndex[id] - 1

							// Find the largest N replicated on a majority: collect every
							// peer's matchIndex (using our own log length for ourselves),
							// sort, and take the median. E.g. with matchIndex values
							// 1, 2, 2 a majority holds index 2.
							sortedMatchIndex := make([]int, 0)
							sortedMatchIndex = append(sortedMatchIndex, len(rf.log))
							for i := 0; i < len(rf.peers); i++ {
								if i == rf.me {
									continue
								}
								sortedMatchIndex = append(sortedMatchIndex, rf.matchIndex[i])
							}
							sort.Ints(sortedMatchIndex)
							newCommitIndex := sortedMatchIndex[len(rf.peers)/2]
							// Only commit entries of the current term (Figure 8 in the paper)
							if newCommitIndex > rf.commitIndex && rf.log[newCommitIndex-1].Term == rf.currentTerm {
								rf.commitIndex = newCommitIndex
							}
						} else {
							// Rejected: back nextIndex off one step, but never below 1
							rf.nextIndex[id] -= 1
							if rf.nextIndex[id] < 1 {
								rf.nextIndex[id] = 1
							}
						}
					}
				}(peerId, &args)
			}
		}()
	}
}
```
AppendEntries RPC receiver
```go
func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {
	rf.mu.Lock()
	defer rf.mu.Unlock()

	DPrintf("RaftNode[%d] Handle AppendEntries, LeaderId[%d] Term[%d] CurrentTerm[%d] role=[%s] logIndex[%d] prevLogIndex[%d] prevLogTerm[%d] commitIndex[%d] Entries[%v]",
		rf.me, rf.leaderId, args.Term, rf.currentTerm, rf.role, len(rf.log), args.PrevLogIndex, args.PrevLogTerm, rf.commitIndex, args.Entries)

	reply.Term = rf.currentTerm
	reply.Success = false

	defer func() {
		DPrintf("RaftNode[%d] Return AppendEntries, LeaderId[%d] Term[%d] CurrentTerm[%d] role=[%s] logIndex[%d] prevLogIndex[%d] prevLogTerm[%d] Success[%v] commitIndex[%d] log[%v]",
			rf.me, rf.leaderId, args.Term, rf.currentTerm, rf.role, len(rf.log), args.PrevLogIndex, args.PrevLogTerm, reply.Success, rf.commitIndex, rf.log)
	}()

	if args.Term < rf.currentTerm {
		return
	}

	// A higher term converts us to a follower in that term
	if args.Term > rf.currentTerm {
		rf.currentTerm = args.Term
		rf.role = ROLE_FOLLOWER
		rf.votedFor = -1
		rf.leaderId = -1
		rf.persist()
		// fall through
	}

	// Recognize the new leader and refresh the election timer
	rf.leaderId = args.LeaderId
	rf.lastActiveTime = time.Now()

	// AppendEntries RPC, receiver rule 2:
	// reply false if our log has no entry at prevLogIndex...
	if len(rf.log) < args.PrevLogIndex {
		return
	}
	// ...or if the entry at prevLogIndex has a different term
	if args.PrevLogIndex > 0 && rf.log[args.PrevLogIndex-1].Term != args.PrevLogTerm {
		return
	}

	for i, logEntry := range args.Entries {
		index := args.PrevLogIndex + i + 1
		if index > len(rf.log) {
			rf.log = append(rf.log, logEntry)
		} else {
			// Overlapping part: on a term conflict, truncate from here and append
			if rf.log[index-1].Term != logEntry.Term {
				rf.log = rf.log[:index-1] // delete this entry and everything after it
				rf.log = append(rf.log, logEntry)
			}
			// Same term: nothing to do, keep comparing the next entry
		}
	}
	rf.persist()

	// Update commitIndex = min(leaderCommit, index of last local entry)
	if args.LeaderCommit > rf.commitIndex {
		rf.commitIndex = args.LeaderCommit
		if len(rf.log) < rf.commitIndex {
			rf.commitIndex = len(rf.log)
		}
	}
	reply.Success = true
}
```
Log apply goroutine
```go
func (rf *Raft) applyLogLoop(applyCh chan ApplyMsg) {
	for !rf.killed() {
		time.Sleep(10 * time.Millisecond)

		var appliedMsgs = make([]ApplyMsg, 0)
		func() {
			rf.mu.Lock()
			defer rf.mu.Unlock()

			for rf.commitIndex > rf.lastApplied {
				rf.lastApplied += 1
				appliedMsgs = append(appliedMsgs, ApplyMsg{
					CommandValid: true,
					Command:      rf.log[rf.lastApplied-1].Command,
					CommandIndex: rf.lastApplied,
					CommandTerm:  rf.log[rf.lastApplied-1].Term,
				})
				DPrintf("RaftNode[%d] applyLog, currentTerm[%d] lastApplied[%d] commitIndex[%d]",
					rf.me, rf.currentTerm, rf.lastApplied, rf.commitIndex)
			}
		}()
		// Hand the messages to the application layer outside the lock
		for _, msg := range appliedMsgs {
			applyCh <- msg
		}
	}
}
```
Lab2B Summary
- nextIndex is the leader's guess at a follower's replication progress, while matchIndex is the progress that has actually been confirmed. The leader keeps issuing AppendEntries to reconcile with each follower until PrevLogIndex and PrevLogTerm satisfy the constraints in the Raft paper.
- To advance commitIndex, the leader must find the highest log index held by a majority: sort the matchIndex values (counting the leader's own log length as its entry) and take the median. That index is held by a majority and becomes the new commitIndex, provided the entry there belongs to the current term (the Figure 8 / §5.4.2 restriction); see the first sketch after this list.
- When a follower handles AppendEntries, it must update commitIndex only after the log writes are finished, because the paper defines the follower's commitIndex as min(leaderCommit, index of the last local entry).
- While the leader keeps adjusting a follower's nextIndex, never let it drop below 1: nextIndex means the index of the next log entry to write on that follower.
- On the RequestVote receiver side, judge the candidate's lastLogIndex and lastLogTerm strictly by the paper; this is easy to get wrong (see the second sketch after this list).
- On the AppendEntries receiver side, strictly follow the paper's prevLogIndex and prevLogTerm checks. First, check whether a local entry exists at prevLogIndex (except when prevLogIndex == 0, which amounts to replicating from the very beginning); if not, the leader has to keep backing off nextIndex until an entry exists at prevLogIndex. If an entry does exist there, additionally check that its term equals prevLogTerm. (The third sketch after this list shows the common accelerated-backoff variant.)
- The AppendEntries receiver may return success = true only after it has finished processing the entries sent by the leader and updated commitIndex.
- The AppendEntries receiver must truncate its entire log suffix starting at the first conflicting entry.
- Whenever any of log, votedFor, or currentTerm changes, be sure to call persist() before replying to the RPC. (Lab2B does not require persistence, but Lab2C does, so doing it here saves retrofitting when you reach Lab2C.)
- Applying committed logs is best handled by a dedicated goroutine that periodically applies whatever entries lie between lastApplied and commitIndex. Mind the efficiency: copy the entries out under the lock, release it, and then push them to the application layer.
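
To make the majority-commit rule concrete, here is a minimal, self-contained sketch of the median trick from appendEntriesLoop above. `computeCommitIndex` is a hypothetical helper name, and `logTerms` stands in for the terms of `rf.log` so the snippet runs on its own:

```go
package main

import (
	"fmt"
	"sort"
)

// computeCommitIndex returns the new commitIndex: the median of all peers'
// matchIndex values (using the leader's own log length for itself) is the
// highest index replicated on a majority. Per Figure 8 in the Raft paper,
// it only advances if the entry at that index is from the current term.
func computeCommitIndex(matchIndex []int, me, logLen, commitIndex, currentTerm int, logTerms []int) int {
	sorted := make([]int, 0, len(matchIndex))
	for i, m := range matchIndex {
		if i == me {
			sorted = append(sorted, logLen) // the leader always has its whole log
		} else {
			sorted = append(sorted, m)
		}
	}
	sort.Ints(sorted)
	n := sorted[len(sorted)/2] // at least a majority of peers hold index n
	if n > commitIndex && logTerms[n-1] == currentTerm {
		return n
	}
	return commitIndex
}

func main() {
	// 5 peers; the leader is peer 0 with a 3-entry log whose terms are 1,2,2.
	// Peers 1 and 2 have replicated up to index 2; peers 3 and 4 only index 1.
	matchIndex := []int{0, 2, 2, 1, 1}
	fmt.Println(computeCommitIndex(matchIndex, 0, 3, 0, 2, []int{1, 2, 2})) // prints 2
}
```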
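The election restriction in the fifth bullet is small but easy to invert. Below is a sketch of the §5.4.1 comparison with illustrative parameter names (in the lab, these values arrive as the RequestVote args and the receiver's own last log entry); the RequestVote handler should grant a vote only if this returns true and votedFor permits it:

```go
package main

import "fmt"

// candidateLogUpToDate reports whether the candidate's log is at least as
// up-to-date as ours (Raft paper §5.4.1): compare the terms of the last
// entries first; only on a tie does the longer log win.
func candidateLogUpToDate(candLastTerm, candLastIndex, myLastTerm, myLastIndex int) bool {
	if candLastTerm != myLastTerm {
		return candLastTerm > myLastTerm
	}
	return candLastIndex >= myLastIndex
}

func main() {
	fmt.Println(candidateLogUpToDate(3, 5, 2, 9)) // true: higher last term wins despite a shorter log
	fmt.Println(candidateLogUpToDate(2, 4, 2, 5)) // false: same last term, shorter log loses
}
```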
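The code above backs nextIndex off one entry per rejection, which is enough for 2B but slow on long divergent logs; the comments below mention the conflictIndex optimization from the MIT students' guide. Here is a hedged sketch of the receiver-side computation, not part of the code above: ConflictIndex and ConflictTerm would be extra, hypothetical fields on AppendEntriesReply, and `logTerms` again stands in for the terms of `rf.log`. On the leader side, nextIndex would then jump straight to the returned conflictIndex instead of decrementing by one.

```go
package main

import "fmt"

// conflictHint computes the accelerated-backtracking hint: on a prevLogIndex /
// prevLogTerm mismatch it tells the leader how far to jump back in one step.
// logTerms[i] is the term of the 1-based log entry i+1.
func conflictHint(logTerms []int, prevLogIndex, prevLogTerm int) (conflictIndex, conflictTerm int, match bool) {
	if len(logTerms) < prevLogIndex {
		// Our log is too short: the leader should retry from just past our end.
		return len(logTerms) + 1, -1, false
	}
	if prevLogIndex > 0 && logTerms[prevLogIndex-1] != prevLogTerm {
		conflictTerm = logTerms[prevLogIndex-1]
		i := prevLogIndex
		for i > 1 && logTerms[i-2] == conflictTerm {
			i-- // walk back to the first entry of the conflicting term
		}
		return i, conflictTerm, false
	}
	return 0, 0, true // consistency check passed
}

func main() {
	// Follower log terms: 1,1,2,2,2; the leader probes prevLogIndex=5, prevLogTerm=3.
	fmt.Println(conflictHint([]int{1, 1, 2, 2, 2}, 5, 3)) // 3 2 false: jump to the first term-2 entry
}
```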
If this article helped you solve a problem at work, you can support my continued writing by clicking any ad on the page or sponsoring a small amount. Thank you!

Comments
Could you clarify whether the Figure 8 scenario from the paper, together with the fix described at the end of §5.4.2, is handled? I couldn't see it reflected in the code.
Never mind, found it: it's the `rf.log[newCommitIndex-1].Term == rf.currentTerm` check in appendEntriesLoop before commitIndex advances.
Heh, keep it up!
For anyone whose code doesn't pass: remember to account for the Figure 8 case. I added a bit of code in requestVote and then it passed.
Same here, I also added conflictIndex and friends. After a long debugging session it finally passed once the Figure 8 case was handled.