MIT 6.824: Distributed Systems- 实现Raft Lab2C
接上文《MIT 6.824: Distributed Systems- 实现Raft Lab2B》,Lab2C包含2个主要任务:
- 当currentTerm、voteFor、log[]更新后,调用persister将它们持久化下来,因为这3个状态是要求持久化的。
- 优化nextIndex的回退性能,即appendEntries被拒时,论文默认反复减1回退重试,导致耗费很长时间才能找到同步位置,优化后可以一次性跳过更多的index,减少RPC往复。
已通过MIT单元测试。
Lab2C实现
实现persist持久化函数非常简单,只需要调用作业提供的persister把3个持久化状态写进去就行。
Lab2C坑了半天,有一个Case一直单测不过:
go test -run TestFigure8Unreliable2C
分析是日志同步协调太慢,导致单测超时失败,发现这个Case通过制造网络分区产生了2个leader,然后不断的向2个leader都写入日志,导致产生了2个很长的歧义链。当网络分区恢复后,2个歧义链之间日志同步过程,会因为不断的prevLogTerm冲突导致nextIndex不断回退,因为每次只回退1位,进而耗时非常长。
另外发现Lab2B中的一个case也是同样道理,存在一定几率超时,原因也是歧义链的冲突处理耗时过长:
go test -run TestBackup2B
一开始怀疑是不是从Lab2B就写Bug了,结果发现的确不是bug,而是冲突处理性能问题。
持久化函数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 |
// // save Raft's persistent state to stable storage, // where it can later be retrieved after a crash and restart. // see paper's Figure 2 for a description of what should be persistent. // func (rf *Raft) persist() { // Your code here (2C). // 不用加锁,外层逻辑会锁 w := new(bytes.Buffer) e := labgob.NewEncoder(w) e.Encode(rf.currentTerm) e.Encode(rf.votedFor) e.Encode(rf.log) data := w.Bytes() DPrintf("RaftNode[%d] persist starts, currentTerm[%d] voteFor[%d] log[%v]", rf.me, rf.currentTerm, rf.votedFor, rf.log) rf.persister.SaveRaftState(data) } // // restore previously persisted state. // func (rf *Raft) readPersist(data []byte) { if data == nil || len(data) < 1 { // bootstrap without any state? return } // Your code here (2C). r := bytes.NewBuffer(data) d := labgob.NewDecoder(r) rf.mu.Lock() defer rf.mu.Unlock() d.Decode(&rf.currentTerm) d.Decode(&rf.votedFor) d.Decode(&rf.log) } |
只要在所有修改持久化状态之后调用persist即可。
appendEntries应答
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
type AppendEntriesArgs struct { Term int LeaderId int PrevLogIndex int PrevLogTerm int Entries []LogEntry LeaderCommit int } type AppendEntriesReply struct { Term int Success bool ConflictIndex int ConflictTerm int } |
Reply增加2个字段,用于leader和follower协调同步位置,优化回退性能。
appendEntries服务端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 |
func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) { rf.mu.Lock() defer rf.mu.Unlock() DPrintf("RaftNode[%d] Handle AppendEntries, LeaderId[%d] Term[%d] CurrentTerm[%d] role=[%s] logIndex[%d] prevLogIndex[%d] prevLogTerm[%d] commitIndex[%d] Entries[%v]", rf.me, rf.leaderId, args.Term, rf.currentTerm, rf.role, len(rf.log), args.PrevLogIndex, args.PrevLogTerm, rf.commitIndex, args.Entries) reply.Term = rf.currentTerm reply.Success = false reply.ConflictIndex = -1 reply.ConflictTerm = - 1 defer func() { DPrintf("RaftNode[%d] Return AppendEntries, LeaderId[%d] Term[%d] CurrentTerm[%d] role=[%s] logIndex[%d] prevLogIndex[%d] prevLogTerm[%d] Success[%v] commitIndex[%d] log[%v] ConflictIndex[%d]", rf.me, rf.leaderId, args.Term, rf.currentTerm, rf.role, len(rf.log), args.PrevLogIndex, args.PrevLogTerm, reply.Success, rf.commitIndex, rf.log, reply.ConflictIndex) }() if args.Term < rf.currentTerm { return } // 发现更大的任期,则转为该任期的follower if args.Term > rf.currentTerm { rf.currentTerm = args.Term rf.role = ROLE_FOLLOWER rf.votedFor = -1 rf.leaderId = -1 rf.persist() // 继续向下走 } // 认识新的leader rf.leaderId = args.LeaderId // 刷新活跃时间 rf.lastActiveTime = time.Now() // appendEntries RPC , receiver 2) // 如果本地没有前一个日志的话,那么false if len(rf.log) < args.PrevLogIndex { reply.ConflictIndex = len(rf.log) return } // 如果本地有前一个日志的话,那么term必须相同,否则false if args.PrevLogIndex > 0 && rf.log[args.PrevLogIndex - 1].Term != args.PrevLogTerm { reply.ConflictTerm = rf.log[args.PrevLogIndex - 1].Term for index := 1; index <= args.PrevLogIndex; index++ { // 找到冲突term的首次出现位置,最差就是PrevLogIndex if rf.log[index - 1].Term == reply.ConflictTerm { reply.ConflictIndex = index break } } return } // 保存日志 for i, logEntry := range args.Entries { index := args.PrevLogIndex + i + 1 if index > len(rf.log) { rf.log = append(rf.log, logEntry) } else { // 重叠部分 if rf.log[index - 1].Term != logEntry.Term { rf.log = rf.log[:index - 1] // 删除当前以及后续所有log rf.log = append(rf.log, logEntry) // 把新log加入进来 } // term一样啥也不用做,继续向后比对Log } } rf.persist() // 更新提交下标 if args.LeaderCommit > rf.commitIndex { rf.commitIndex = args.LeaderCommit if len(rf.log) < rf.commitIndex { rf.commitIndex = len(rf.log) } } reply.Success = true } |
重点是conflictIndex和conflictTerm的理解和处理,看这篇MIT的总结:
https://thesquareplanet.com/blog/students-guide-to-raft/#an-aside-on-optimizations、
appendEntries客户端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 |
// lab-2A只做心跳,不考虑log同步 func (rf *Raft) appendEntriesLoop() { for !rf.killed() { time.Sleep(10 * time.Millisecond) func() { rf.mu.Lock() defer rf.mu.Unlock() // 只有leader才向外广播心跳 if rf.role != ROLE_LEADER { return } // 100ms广播1次 now := time.Now() if now.Sub(rf.lastBroadcastTime) < 100*time.Millisecond { return } rf.lastBroadcastTime = time.Now() // 并发RPC心跳 type AppendResult struct { peerId int resp *AppendEntriesReply } for peerId := 0; peerId < len(rf.peers); peerId++ { if peerId == rf.me { continue } args := AppendEntriesArgs{} args.Term = rf.currentTerm args.LeaderId = rf.me args.LeaderCommit = rf.commitIndex args.Entries = make([]LogEntry, 0) args.PrevLogIndex = rf.nextIndex[peerId] - 1 if args.PrevLogIndex > 0 { args.PrevLogTerm = rf.log[args.PrevLogIndex - 1].Term } args.Entries = append(args.Entries, rf.log[rf.nextIndex[peerId]-1:]...) DPrintf("RaftNode[%d] appendEntries starts, currentTerm[%d] peer[%d] logIndex=[%d] nextIndex[%d] matchIndex[%d] args.Entries[%d] commitIndex[%d]", rf.me, rf.currentTerm, peerId, len(rf.log), rf.nextIndex[peerId], rf.matchIndex[peerId], len(args.Entries), rf.commitIndex) // log相关字段在lab-2A不处理 go func(id int, args1 *AppendEntriesArgs) { // DPrintf("RaftNode[%d] appendEntries starts, myTerm[%d] peerId[%d]", rf.me, args1.Term, id) reply := AppendEntriesReply{} if ok := rf.sendAppendEntries(id, args1, &reply); ok { rf.mu.Lock() defer rf.mu.Unlock() defer func() { DPrintf("RaftNode[%d] appendEntries ends, currentTerm[%d] peer[%d] logIndex=[%d] nextIndex[%d] matchIndex[%d] commitIndex[%d]", rf.me, rf.currentTerm, id, len(rf.log), rf.nextIndex[id], rf.matchIndex[id], rf.commitIndex) }() // 如果不是rpc前的leader状态了,那么啥也别做了 if rf.currentTerm != args1.Term { return } if reply.Term > rf.currentTerm { // 变成follower rf.role = ROLE_FOLLOWER rf.leaderId = -1 rf.currentTerm = reply.Term rf.votedFor = -1 rf.persist() return } // 因为RPC期间无锁, 可能相关状态被其他RPC修改了 // 因此这里得根据发出RPC请求时的状态做更新,而不要直接对nextIndex和matchIndex做相对加减 if reply.Success { // 同步日志成功 rf.nextIndex[id] = args1.PrevLogIndex + len(args1.Entries) + 1 rf.matchIndex[id] = rf.nextIndex[id] - 1 // 数字N, 让peer[i]的大多数>=N // peer[0]' index=2 // peer[1]' index=2 // peer[2]' index=1 // 1,2,2 // 更新commitIndex, 就是找中位数 sortedMatchIndex := make([]int, 0) sortedMatchIndex = append(sortedMatchIndex, len(rf.log)) for i := 0; i < len(rf.peers); i++ { if i == rf.me { continue } sortedMatchIndex = append(sortedMatchIndex, rf.matchIndex[i]) } sort.Ints(sortedMatchIndex) newCommitIndex := sortedMatchIndex[len(rf.peers) / 2] if newCommitIndex > rf.commitIndex && rf.log[newCommitIndex - 1].Term == rf.currentTerm { rf.commitIndex = newCommitIndex } } else { // 回退优化,参考:https://thesquareplanet.com/blog/students-guide-to-raft/#an-aside-on-optimizations nextIndexBefore := rf.nextIndex[id] // 仅为打印log if reply.ConflictTerm != -1 { // follower的prevLogIndex位置term不同 conflictTermIndex := -1 for index := args1.PrevLogIndex; index >= 1; index-- { // 找最后一个conflictTerm if rf.log[index - 1].Term == reply.ConflictTerm { conflictTermIndex = index break } } if conflictTermIndex != -1 { // leader也存在冲突term的日志,则从term最后一次出现之后的日志开始尝试同步,因为leader/follower可能在该term的日志有部分相同 rf.nextIndex[id] = conflictTermIndex + 1 } else { // leader并没有term的日志,那么把follower日志中该term首次出现的位置作为尝试同步的位置,即截断follower在此term的所有日志 rf.nextIndex[id] = reply.ConflictIndex } } else { // follower的prevLogIndex位置没有日志 rf.nextIndex[id] = reply.ConflictIndex + 1 } DPrintf("RaftNode[%d] back-off nextIndex, peer[%d] nextIndexBefore[%d] nextIndex[%d]", rf.me, id, nextIndexBefore, rf.nextIndex[id]) } } }(peerId, &args) } }() } } |
在success==false情况下,处理nextIndex回退时遵从了优化后的逻辑。
总结
lab2C只是在lab2B基础上,把持久化状态进行了persist存储,另外对日志同步性能提出了更高要求,因为它会制造网络分区形成2个leader然后向2个leader同时写入大量日志,造成2个很长的歧义日志,然而默认的论文实现每次回退1个下标进行重试是无法通过单测的。
- 回退优化原理:如果follower日志比leader短,那么leader可以直接从follower末尾的index开始尝试传日志,只有这样follower日志才不会出现空槽的情况;否则follower在prevLogIndex位置冲突的term,如果在leader中也有这个term的日志,则从leader日志中该term最后一次出现的位置开始尝试同步,避免给follower错过这个term的任何一条日志;如果冲突term在leader里压根不在,则从follower日志该term首次出现的下标开始同步,因为leader压根没有这个term的日志,相当于对follower截断。
- leader可能收到旧的appendEntries RPC应答,因此leader收到RPC应答时重新加锁后,应该注意检查currentTerm是否和RPC时的term一样,另外也不应该对nextIndex直接做-=1这样的相对计算(因为旧RPC应答之前可能新RPC已经应答并且修改了nextIndex),而是应该用RPC时的prevLogIndex等信息做绝对计算,这样是不会有问题的。
多看看MIT对6.824常见问题的总结:https://thesquareplanet.com/blog/students-guide-to-raft/#an-aside-on-optimizations
如果文章帮助您解决了工作难题,您可以帮我点击屏幕上的任意广告,或者赞助少量费用来支持我的持续创作,谢谢~

这好像不是过了MIT的单元测试。。只是上传成功了,需要有response 才算。。
不过时间已经过了
嗯,本地单测过了。
博主你好,我跑的这个版本的代码,应该是lab2C的完整版吧,https://github.com/owenliang/mit-6.824/blob/b88f6a745f18ef4dd9e4d0f7293cfb81fd49e064/src/raft/raft.go, 我跑了几十次,大概有4 5次过不去Test (2A): election after network failure … 和 Test (2C): Figure 8 (unreliable) …晕死。
另外,话说这篇博客appendEntries客户端的72行,
if reply.Success { // 同步日志成功
rf.nextIndex[id] = args1.PrevLogIndex + len(args1.Entries) + 1
rf.matchIndex[id] = rf.nextIndex[id] – 1
如果收到一个延迟返回的短日志老心跳包的reply.success,会不会把rf.nextIndex[id]给缩小了???
实不相瞒,我已经忘光了-,-
赋值前先判断一下nextIndex[id]有没有被缩小的风险,如果有直接丢弃,这样不知道可不可行
1
老哥,你lab4弃坑了吗= =?代码逻辑很清晰!赞
精力不够了,就那一阵闲点-,-
我想请问一下如何处理这种情况呢?3个服务器,其中一个断线重连回来,followerA断线期间在选举,导致term非常大,然后回来以后leaderB发送心跳信息发现了比自己的term大的情况,然后转换为follower,重新发起选举。原来的followerA的较大的概率先超时(leader发现的时候A已经过了一会),会先发起选举,旧的leader发起选举的时候,A已经头投给自己了,不能投给B,然后B选举不成功,B的lastPrevTerm大于A,肯定不投给A,两个都选举失败。然后还是大概率A先超时,先发起选举。请问这种情况应该怎么优化呢?
补充一下:不是3个服务器,而是5个服务器,但是只有3个存活。
这就是随机化选举超时时间的意义,由于选举超时时间是随机的,所以在几轮内就大概率会出现比A更早超时的其他follower,从而继续运行
总结中的“避免给follower错过这个term的任何一条日志”应该是“避免给follower错过prevLogIndex到这个term的上一条已经同步的日志之间的其他term的日志”吧,因为follower在回退时,直接退回到confictIndex第一次出现的位置,可能会误伤到其他term的日志
1
谢谢大佬!!!TestFigure8Unreliable2C真的很烦,明明程序没有错,排查了半天,看了半天日志,结合大佬说的,懂了!!