MIT 6.824: Distributed Systems- 实现Raft Lab3B
接上文《MIT 6.824: Distributed Systems- 实现Raft Lab3A》,Raft Lab3B要求实现snapshot,避免log不断变大无法继续运作。
Lab3B比之前的Lab都要难,因为论文对snapshot部分的描述非常粗略,对个人Raft理解的要求是比较高的,工程上涉及KV层和Raft层的状态联动,牵扯到的代码变动范围覆盖整个Raft实现,因此需要极为仔细。
我在实现的过程中为了逐步逼近正确代码,首先在Raft层引入了LastIncludedIndex和LastIncludedTerm两个持久化状态,然后将原有Raft逻辑逐步适配上来,不断执行Lab3A单元测试来验证兼容性,最终一次性通过了Lab3B测试。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
go test -run 3B Test: InstallSnapshot RPC (3B) ... ... Passed -- 8.5 3 483 63 Test: snapshot size is reasonable (3B) ... ... Passed -- 83.9 3 2570 800 Test: restarts, snapshots, one client (3B) ... labgob warning: Decoding into a non-default variable/field int may not work ... Passed -- 19.0 5 1406 146 Test: restarts, snapshots, many clients (3B) ... ... Passed -- 26.8 5 18655 2960 Test: unreliable net, snapshots, many clients (3B) ... ... Passed -- 18.5 5 1878 366 Test: unreliable net, restarts, snapshots, many clients (3B) ... ... Passed -- 24.0 5 2555 384 Test: unreliable net, restarts, partitions, snapshots, many clients (3B) ... ... Passed -- 32.7 5 2517 218 Test: unreliable net, restarts, partitions, snapshots, many clients, linearizability checks (3B) ... ... Passed -- 25.8 7 5496 440 PASS ok _/Users/smzdm/Documents/github/6.824/src/kvraft 239.765s (base) smzdm@smzdmdeMacBook-Pro kvraft % git status |
至此,整个Raft代码的完整实现已经放在了github中,引入Snapshot后整个代码变得复杂了不少,大家可以参考我的实现思路:https://github.com/owenliang/mit-6.824。
snapshot实现关键
snapshot包含什么
首先,snapshot除了包含KV层的完整KV Map外,还需要包含Client SeqID Map,这样当leader宕机后安装snapshot后,当继续提交后续log时才能继续保证幂等性(与snapshot被压缩的log之间幂等判定)。
我在KV层使用独立的协程检测raft层log长度,一旦到达阈值则产生一个snapshot并下沉状态到raft层进行log截断等变更:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 |
func (kv *KVServer) snapshotLoop() { for !kv.killed() { var snapshot []byte var lastIncludedIndex int // 锁内dump snapshot func() { // 如果raft log超过了maxraftstate大小,那么对kvStore快照下来 if kv.maxraftstate != -1 && kv.rf.ExceedLogSize(kv.maxraftstate) { // 这里调用ExceedLogSize不要加kv锁,否则会死锁 // 锁内快照,离开锁通知raft处理 kv.mu.Lock() w := new(bytes.Buffer) e := labgob.NewEncoder(w) e.Encode(kv.kvStore) // kv键值对 e.Encode(kv.seqMap) // 当前各客户端最大请求编号,也要随着snapshot走 snapshot = w.Bytes() lastIncludedIndex = kv.lastAppliedIndex DPrintf("KVServer[%d] KVServer dump snapshot, snapshotSize[%d] lastAppliedIndex[%d]", kv.me, len(snapshot), kv.lastAppliedIndex) kv.mu.Unlock() } }() // 锁外通知raft层截断,否则有死锁 if snapshot != nil { // 通知raft落地snapshot并截断日志(都是已提交的日志,不会因为主从切换截断,放心操作) kv.rf.TakeSnapshot(snapshot, lastIncludedIndex) } time.Sleep(10 * time.Millisecond) } } |
raft负责将KV传来的snapshot和snapshot的最后index持久化下来,这里注意判断raft log长度时不要加KV锁,否则会出现Raft和KV层死锁。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 |
// 保存snapshot,截断log func (rf *Raft) TakeSnapshot(snapshot []byte, lastIncludedIndex int) { rf.mu.Lock() defer rf.mu.Unlock() // 已经有更大index的snapshot了 if lastIncludedIndex <= rf.lastIncludedIndex { return } // 快照的当前元信息 DPrintf("RafeNode[%d] TakeSnapshot begins, IsLeader[%v] snapshotLastIndex[%d] lastIncludedIndex[%d] lastIncludedTerm[%d]", rf.me, rf.leaderId==rf.me, lastIncludedIndex, rf.lastIncludedIndex, rf.lastIncludedTerm) // 要压缩的日志长度 compactLogLen := lastIncludedIndex - rf.lastIncludedIndex // 更新快照元信息 rf.lastIncludedTerm = rf.log[rf.index2LogPos(lastIncludedIndex)].Term rf.lastIncludedIndex = lastIncludedIndex // 压缩日志 afterLog := make([]LogEntry, len(rf.log) - compactLogLen) copy(afterLog, rf.log[compactLogLen:]) rf.log = afterLog // 把snapshot和raftstate持久化 rf.persister.SaveStateAndSnapshot(rf.raftStateForPersist(), snapshot) DPrintf("RafeNode[%d] TakeSnapshot ends, IsLeader[%v] snapshotLastIndex[%d] lastIncludedIndex[%d] lastIncludedTerm[%d]", rf.me, rf.leaderId==rf.me, lastIncludedIndex, rf.lastIncludedIndex, rf.lastIncludedTerm) } |
在这里就可以对snapshot部分的log进行截断处理了,同时注意新snapshot index一定要比原先的snapshot index更大,否则这个snapshot就没必要做了。
1 2 3 4 5 6 7 8 9 |
// 日志是否需要压缩 func (rf *Raft) ExceedLogSize(logSize int) bool { rf.mu.Lock() defer rf.mu.Unlock() if rf.persister.RaftStateSize() >= logSize { return true } return false } |
判断是否有必要snapshot只需要走Raft层看一下实际序列化后的raftstatesize(也就是log+currentTerm+voteFor加起来的大小),即日志太长就可以snapshot一下了。
raft如何向kv层安装snapshot
当follower收到leader发来的snapshot时,它会保存快照并进行日志截断等处理,然后将日志提交游标重置到snapshot之后,并将将snapshot安装给KV层。
raft层是通过applyChan向kv层安装snapshot的,并且保证此后向applyChan投入的log都是紧随snapshot之后的。
在KV层的apply协程,在原本只能逐条apply log的逻辑基础上,增加一个分支来apply一个snapshot:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 |
func (kv *KVServer) applyLoop() { for !kv.killed() { select { case msg := <-kv.applyCh: // 如果是安装快照 if !msg.CommandValid { func() { kv.mu.Lock() defer kv.mu.Unlock() if len(msg.Snapshot) == 0 { // 空快照,清空数据 kv.kvStore = make(map[string]string) kv.seqMap = make(map[int64]int64) } else { // 反序列化快照, 安装到内存 r := bytes.NewBuffer(msg.Snapshot) d := labgob.NewDecoder(r) d.Decode(&kv.kvStore) d.Decode(&kv.seqMap) } // 已应用到哪个索引 kv.lastAppliedIndex = msg.LastIncludedIndex DPrintf("KVServer[%d] installSnapshot, kvStore[%v], seqMap[%v] lastAppliedIndex[%v]", kv.me, len(kv.kvStore), len(kv.seqMap), kv.lastAppliedIndex) }() } else { // 如果是普通log cmd := msg.Command index := msg.CommandIndex func() { kv.mu.Lock() defer kv.mu.Unlock() // 更新已经应用到的日志 kv.lastAppliedIndex = index // 操作日志 op := cmd.(*Op) opCtx, existOp := kv.reqMap[index] prevSeq, existSeq := kv.seqMap[op.ClientId] kv.seqMap[op.ClientId] = op.SeqId if existOp { // 存在等待结果的RPC, 那么判断状态是否与写入时一致 if opCtx.op.Term != op.Term { opCtx.wrongLeader = true } } // 只处理ID单调递增的客户端写请求 if op.Type == OP_TYPE_PUT || op.Type == OP_TYPE_APPEND { if !existSeq || op.SeqId > prevSeq { // 如果是递增的请求ID,那么接受它的变更 if op.Type == OP_TYPE_PUT { // put操作 kv.kvStore[op.Key] = op.Value } else if op.Type == OP_TYPE_APPEND { // put-append操作 if val, exist := kv.kvStore[op.Key]; exist { kv.kvStore[op.Key] = val + op.Value } else { kv.kvStore[op.Key] = op.Value } } } else if existOp { opCtx.ignored = true } } else { // OP_TYPE_GET if existOp { opCtx.value, opCtx.keyExist = kv.kvStore[op.Key] } } DPrintf("KVServer[%d] applyLoop, kvStore[%v]", kv.me, len(kv.kvStore)) // 唤醒挂起的RPC if existOp { close(opCtx.committed) } }() } } } } |
安装snapshot后,内存状态被重置,此时相当于lastIncludedIndex之前的日志被提交到KV层,后续继续等待Raft层提交的Log即可。(Raft层会保证snapshot之后紧跟着后续log)
首次启动恢复snapshot
当一个node宕机重启后,是由raft层读取持久化的snapshot,通过applyChan向KV层做首次安装的。
虽然KV层读取applyChan也是协程异步处理的,但是因为我们的KV层无论读写操作都是先写Log等提交后再执行,因此任何读写操作在applyChan内都一定会排到snapshot安装之后,所以首次异步安装snapshot是可靠的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
func Make(peers []*labrpc.ClientEnd, me int, persister *Persister, applyCh chan ApplyMsg) *Raft { rf := &Raft{} rf.peers = peers rf.persister = persister rf.me = me // Your initialization code here (2A, 2B, 2C). rf.role = ROLE_FOLLOWER rf.leaderId = -1 rf.votedFor = -1 rf.lastIncludedIndex = 0 rf.lastIncludedTerm = 0 rf.lastActiveTime = time.Now() rf.applyCh = applyCh // initialize from state persisted before a crash rf.readPersist(persister.ReadRaftState()) // 向application层安装快照 rf.installSnapshotToApplication() |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
func (rf *Raft) installSnapshotToApplication() { var applyMsg *ApplyMsg // 同步给application层的快照 applyMsg = &ApplyMsg{ CommandValid: false, Snapshot: rf.persister.ReadSnapshot(), LastIncludedIndex: rf.lastIncludedIndex, LastIncludedTerm: rf.lastIncludedTerm, } // 快照部分就已经提交给application了,所以后续applyLoop提交日志后移 rf.lastApplied = rf.lastIncludedIndex DPrintf("RaftNode[%d] installSnapshotToApplication, snapshotSize[%d] lastIncludedIndex[%d] lastIncludedTerm[%d]", rf.me, len(applyMsg.Snapshot), applyMsg.LastIncludedIndex, applyMsg.LastIncludedTerm) rf.applyCh <- *applyMsg return } |
快照传给KV层后,要更新Raft层的lastApplied索引,强制从snapshot之后的log继续向KV层提交Log。
raft层新增2个snapshot持久化状态
1 2 3 4 5 6 |
// 所有服务器,持久化状态(lab-2A不要求持久化) currentTerm int // 见过的最大任期 votedFor int // 记录在currentTerm任期投票给谁了 log []LogEntry // 操作日志 lastIncludedIndex int // snapshot最后1个logEntry的index,没有snapshot则为0 lastIncludedTerm int // snapthost最后1个logEntry的term,没有snaphost则无意义 |
这样node重启后可以知道snapshot的index范围,以便继续向后工作。
1 2 3 4 5 6 7 8 9 10 11 |
func (rf *Raft) raftStateForPersist() []byte { w := new(bytes.Buffer) e := labgob.NewEncoder(w) e.Encode(rf.currentTerm) e.Encode(rf.votedFor) e.Encode(rf.log) e.Encode(rf.lastIncludedIndex) e.Encode(rf.lastIncludedTerm) data := w.Bytes() return data } |
降低snapshot带来的逻辑复杂度
raft原先很多地方都用到log长度和最后log的term等信息,在引入snapshot情况下的就得考虑最后一个index是snapshot,另外计算log数组下标也变得复杂,因此获取最后的index和最后的term的逻辑进行了封装:
1 2 3 4 5 6 7 8 9 10 11 12 13 |
// 最后的index func (rf *Raft) lastIndex() int { return rf.lastIncludedIndex + len(rf.log) } // 最后的term func (rf *Raft) lastTerm() (lastLogTerm int) { lastLogTerm = rf.lastIncludedTerm // for snapshot if len(rf.log) != 0 { lastLogTerm = rf.log[len(rf.log)-1].Term } return } |
根据index计算log数组下标:
1 2 3 4 |
// 日志index转化成log数组下标 func (rf *Raft) index2LogPos(index int) (pos int) { return index - rf.lastIncludedIndex - 1 } |
有这些小函数的帮忙,才能继续改造其他逻辑。
applyLoop提交协程
提交日志到KV层的协程,修改很小,就是取log时候注意下标改成相对值:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 |
for !rf.killed() { time.Sleep(10 * time.Millisecond) var appliedMsgs = make([]ApplyMsg, 0) func() { rf.mu.Lock() defer rf.mu.Unlock() for rf.commitIndex > rf.lastApplied { rf.lastApplied += 1 appliedIndex := rf.index2LogPos(rf.lastApplied) appliedMsgs = append(appliedMsgs, ApplyMsg{ CommandValid: true, Command: rf.log[appliedIndex].Command, CommandIndex: rf.lastApplied, CommandTerm: rf.log[appliedIndex].Term, }) DPrintf("RaftNode[%d] applyLog, currentTerm[%d] lastApplied[%d] commitIndex[%d]", rf.me, rf.currentTerm, rf.lastApplied, rf.commitIndex) } }() // 锁外提交给应用层 for _, msg := range appliedMsgs { rf.applyCh <- msg } } |
这里不需要担心lastApplied位置是snapshot,因为其他关于snapshot处理的逻辑都会保障当snapshot的范围发生变化后一定会调整lastApplied为snapshot之后的位置。
因为snapshot也会向applyCh投递消息,为了保证安装snapshot到applyCh之后投递的Log是紧随snapshot位置之后的log,因此lastApplied状态修改和Log投递必须都在锁内,锁外投递将导致snapshot和log乱序,导致提交时序混乱。
appendEntriesLoop新跳协程
原先该loop只负责向follower同步log,但现在需要增加一个同步snapshot的逻辑。
当发现某个Follower的nextIndex落入了leader的snapshot索引范围内,那么leader就只能向follower 发送snapshot了。
因此,该函数改造成2分叉逻辑:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 |
// lab-2A只做心跳,不考虑log同步 func (rf *Raft) appendEntriesLoop() { for !rf.killed() { time.Sleep(10 * time.Millisecond) func() { rf.mu.Lock() defer rf.mu.Unlock() // 只有leader才向外广播心跳 if rf.role != ROLE_LEADER { return } // 100ms广播1次 now := time.Now() if now.Sub(rf.lastBroadcastTime) < 100*time.Millisecond { return } rf.lastBroadcastTime = time.Now() // 向所有follower发送心跳 for peerId := 0; peerId < len(rf.peers); peerId++ { if peerId == rf.me { continue } // 如果nextIndex在leader的snapshot内,那么直接同步snapshot if rf.nextIndex[peerId] <= rf.lastIncludedIndex { rf.doInstallSnapshot(peerId) } else { // 否则同步日志 rf.doAppendEntries(peerId) } } }() } } |
改造doAppendEntries逻辑
能够进入该函数,说明要同步的nextIndex一定是Log形态。
复杂之处在于,我们需要考虑prevLogIndex恰好是snapshot最后一条log的情况,因此我在实现的时候比较严谨的列出了所有case,确保代码容易解释。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 |
// 已兼容snapshot func (rf *Raft) doAppendEntries(peerId int) { args := AppendEntriesArgs{} args.Term = rf.currentTerm args.LeaderId = rf.me args.LeaderCommit = rf.commitIndex args.Entries = make([]LogEntry, 0) args.PrevLogIndex = rf.nextIndex[peerId] - 1 // 如果prevLogIndex是leader快照的最后1条log, 那么取快照的最后1个term if args.PrevLogIndex == rf.lastIncludedIndex { args.PrevLogTerm = rf.lastIncludedTerm } else { // 否则一定是log部分 args.PrevLogTerm = rf.log[rf.index2LogPos(args.PrevLogIndex)].Term } args.Entries = append(args.Entries, rf.log[rf.index2LogPos(args.PrevLogIndex+1):]...) DPrintf("RaftNode[%d] appendEntries starts, currentTerm[%d] peer[%d] logIndex=[%d] nextIndex[%d] matchIndex[%d] args.Entries[%d] commitIndex[%d]", rf.me, rf.currentTerm, peerId, rf.lastIndex(), rf.nextIndex[peerId], rf.matchIndex[peerId], len(args.Entries), rf.commitIndex) go func() { // DPrintf("RaftNode[%d] appendEntries starts, myTerm[%d] peerId[%d]", rf.me, args1.Term, id) reply := AppendEntriesReply{} if ok := rf.sendAppendEntries(peerId, &args, &reply); ok { rf.mu.Lock() defer rf.mu.Unlock() defer func() { DPrintf("RaftNode[%d] appendEntries ends, currentTerm[%d] peer[%d] logIndex=[%d] nextIndex[%d] matchIndex[%d] commitIndex[%d]", rf.me, rf.currentTerm, peerId, rf.lastIndex(), rf.nextIndex[peerId], rf.matchIndex[peerId], rf.commitIndex) }() // 如果不是rpc前的leader状态了,那么啥也别做了 if rf.currentTerm != args.Term { return } if reply.Term > rf.currentTerm { // 变成follower rf.role = ROLE_FOLLOWER rf.leaderId = -1 rf.currentTerm = reply.Term rf.votedFor = -1 rf.persist() return } // 因为RPC期间无锁, 可能相关状态被其他RPC修改了 // 因此这里得根据发出RPC请求时的状态做更新,而不要直接对nextIndex和matchIndex做相对加减 if reply.Success { // 同步日志成功 rf.nextIndex[peerId] = args.PrevLogIndex + len(args.Entries) + 1 rf.matchIndex[peerId] = rf.nextIndex[peerId] - 1 rf.updateCommitIndex() // 更新commitIndex } else { // 回退优化,参考:https://thesquareplanet.com/blog/students-guide-to-raft/#an-aside-on-optimizations nextIndexBefore := rf.nextIndex[peerId] // 仅为打印log if reply.ConflictTerm != -1 { // follower的prevLogIndex位置term冲突了 // 我们找leader log中conflictTerm最后出现位置,如果找到了就用它作为nextIndex,否则用follower的conflictIndex conflictTermIndex := -1 for index := args.PrevLogIndex; index > rf.lastIncludedIndex; index-- { if rf.log[rf.index2LogPos(index)].Term == reply.ConflictTerm { conflictTermIndex = index break } } if conflictTermIndex != -1 { // leader log出现了这个term,那么从这里prevLogIndex之前的最晚出现位置尝试同步 rf.nextIndex[peerId] = conflictTermIndex } else { rf.nextIndex[peerId] = reply.ConflictIndex // 用follower首次出现term的index作为同步开始 } } else { // follower没有发现prevLogIndex term冲突, 可能是被snapshot了或者日志长度不够 // 这时候我们将返回的conflictIndex设置为nextIndex即可 rf.nextIndex[peerId] = reply.ConflictIndex } DPrintf("RaftNode[%d] back-off nextIndex, peer[%d] nextIndexBefore[%d] nextIndex[%d]", rf.me, peerId, nextIndexBefore, rf.nextIndex[peerId]) } } }() } |
另外nextIndex回退逻辑也需要兼容snapshot,这块主要靠appendEntries服务端做了处理,我们需要考虑prevLogIndex落在folllower自己的snapshot范围内或者恰好在边界上或者在log部分,三种case需要认真处理:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 |
// 已兼容snapshot func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) { rf.mu.Lock() defer rf.mu.Unlock() DPrintf("RaftNode[%d] Handle AppendEntries, LeaderId[%d] Term[%d] CurrentTerm[%d] role=[%s] logIndex[%d] prevLogIndex[%d] prevLogTerm[%d] commitIndex[%d] Entries[%v]", rf.me, rf.leaderId, args.Term, rf.currentTerm, rf.role, rf.lastIndex(), args.PrevLogIndex, args.PrevLogTerm, rf.commitIndex, args.Entries) reply.Term = rf.currentTerm reply.Success = false reply.ConflictIndex = -1 reply.ConflictTerm = -1 defer func() { DPrintf("RaftNode[%d] Return AppendEntries, LeaderId[%d] Term[%d] CurrentTerm[%d] role=[%s] logIndex[%d] prevLogIndex[%d] prevLogTerm[%d] Success[%v] commitIndex[%d] log[%v] ConflictIndex[%d]", rf.me, rf.leaderId, args.Term, rf.currentTerm, rf.role, rf.lastIndex(), args.PrevLogIndex, args.PrevLogTerm, reply.Success, rf.commitIndex, len(rf.log), reply.ConflictIndex) }() if args.Term < rf.currentTerm { return } // 发现更大的任期,则转为该任期的follower if args.Term > rf.currentTerm { rf.currentTerm = args.Term rf.role = ROLE_FOLLOWER rf.votedFor = -1 rf.persist() // 继续向下走 } // 认识新的leader rf.leaderId = args.LeaderId // 刷新活跃时间 rf.lastActiveTime = time.Now() // 如果prevLogIndex在快照内,且不是快照最后一个log,那么只能从index=1开始同步了 if args.PrevLogIndex < rf.lastIncludedIndex { reply.ConflictIndex = 1 return } else if args.PrevLogIndex == rf.lastIncludedIndex { // prevLogIndex正好等于快照的最后一个log if args.PrevLogTerm != rf.lastIncludedTerm { // 冲突了,那么从index=1开始同步吧 reply.ConflictIndex = 1 return } // 否则继续走后续的日志覆盖逻辑 } else { // prevLogIndex在快照之后,那么进一步判定 if args.PrevLogIndex > rf.lastIndex() { // prevLogIndex位置没有日志的case reply.ConflictIndex = rf.lastIndex() + 1 return } // prevLogIndex位置有日志,那么判断term必须相同,否则false if rf.log[rf.index2LogPos(args.PrevLogIndex)].Term != args.PrevLogTerm { reply.ConflictTerm = rf.log[rf.index2LogPos(args.PrevLogIndex)].Term for index := rf.lastIncludedIndex + 1; index <= args.PrevLogIndex; index++ { // 找到冲突term的首次出现位置,最差就是PrevLogIndex if rf.log[rf.index2LogPos(index)].Term == reply.ConflictTerm { reply.ConflictIndex = index break } } return } // 否则继续走后续的日志覆盖逻辑 } // 保存日志 for i, logEntry := range args.Entries { index := args.PrevLogIndex + 1 + i logPos := rf.index2LogPos(index) if index > rf.lastIndex() { // 超出现有日志长度,继续追加 rf.log = append(rf.log, logEntry) } else { // 重叠部分 if rf.log[logPos].Term != logEntry.Term { rf.log = rf.log[:logPos] // 删除当前以及后续所有log rf.log = append(rf.log, logEntry) // 把新log加入进来 } // term一样啥也不用做,继续向后比对Log } } rf.persist() // 更新提交下标 if args.LeaderCommit > rf.commitIndex { rf.commitIndex = args.LeaderCommit if rf.lastIndex() < rf.commitIndex { rf.commitIndex = rf.lastIndex() } } reply.Success = true } |
如果要宏观的描述的话,就是如果follower没有能力比对prevLogIndex位置的term是否冲突(prevLogIndex位于follower的snapshot范围内),那么就让leader把自己的snapshot发过来吧(也就是让conflictIndex=1)。
doInstallSnapshot逻辑
其实把snapshot发给follower没啥,就是发过去之后对方接收后,leader下一次nextIndex指向哪里呢?
我考虑把nextIndex指向leader的日志末尾就行,继续通过回退来补全follower的log部分,这样处理最简单最保守。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 |
func (rf *Raft) doInstallSnapshot(peerId int) { DPrintf("RaftNode[%d] doInstallSnapshot starts, leaderId[%d] peerId[%d]\n", rf.me, rf.leaderId, peerId) args := InstallSnapshotArgs{} args.Term = rf.currentTerm args.LeaderId = rf.me args.LastIncludedIndex = rf.lastIncludedIndex args.LastIncludedTerm = rf.lastIncludedTerm args.Offset = 0 args.Data = rf.persister.ReadSnapshot() args.Done = true reply := InstallSnapshotReply{} go func() { if rf.sendInstallSnapshot(peerId, &args, &reply) { rf.mu.Lock() defer rf.mu.Unlock() // 如果不是rpc前的leader状态了,那么啥也别做了 if rf.currentTerm != args.Term { return } if reply.Term > rf.currentTerm { // 变成follower rf.role = ROLE_FOLLOWER rf.leaderId = -1 rf.currentTerm = reply.Term rf.votedFor = -1 rf.persist() return } rf.nextIndex[peerId] = rf.lastIndex() + 1 // 重新从末尾同步log(未经优化,但够用) rf.matchIndex[peerId] = args.LastIncludedIndex // 已同步到的位置(未经优化,但够用) rf.updateCommitIndex() // 更新commitIndex DPrintf("RaftNode[%d] doInstallSnapshot ends, leaderId[%d] peerId[%d] nextIndex[%d] matchIndex[%d] commitIndex[%d]\n", rf.me, rf.leaderId, peerId, rf.nextIndex[peerId], rf.matchIndex[peerId], rf.commitIndex) } }() } |
installSnapshot服务端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 |
// 安装快照RPC Handler func (rf *Raft) InstallSnapshot(args *InstallSnapshotArgs, reply *InstallSnapshotReply) { rf.mu.Lock() defer rf.mu.Unlock() DPrintf("RaftNode[%d] installSnapshot starts, rf.lastIncludedIndex[%d] rf.lastIncludedTerm[%d] args.lastIncludedIndex[%d] args.lastIncludedTerm[%d] logSize[%d]", rf.me, rf.lastIncludedIndex, rf.lastIncludedTerm, args.LastIncludedIndex, args.LastIncludedTerm, len(rf.log)) reply.Term = rf.currentTerm if args.Term < rf.currentTerm { return } // 发现更大的任期,则转为该任期的follower if args.Term > rf.currentTerm { rf.currentTerm = args.Term rf.role = ROLE_FOLLOWER rf.votedFor = -1 rf.persist() // 继续向下走 } // 认识新的leader rf.leaderId = args.LeaderId // 刷新活跃时间 rf.lastActiveTime = time.Now() // leader快照不如本地长,那么忽略这个快照 if args.LastIncludedIndex <= rf.lastIncludedIndex { return } else { // leader快照比本地快照长 if args.LastIncludedIndex < rf.lastIndex() { // 快照外还有日志,判断是否需要截断 if rf.log[rf.index2LogPos(args.LastIncludedIndex)].Term != args.LastIncludedTerm { rf.log = make([]LogEntry, 0) // term冲突,扔掉快照外的所有日志 } else { // term没冲突,保留后续日志 leftLog := make([]LogEntry, rf.lastIndex() - args.LastIncludedIndex) copy(leftLog, rf.log[rf.index2LogPos(args.LastIncludedIndex)+1:]) rf.log = leftLog } } else { rf.log = make([]LogEntry, 0) // 快照比本地日志长,日志就清空了 } } // 更新快照位置 rf.lastIncludedIndex = args.LastIncludedIndex rf.lastIncludedTerm = args.LastIncludedTerm // 持久化raft state和snapshot rf.persister.SaveStateAndSnapshot(rf.raftStateForPersist(), args.Data) // snapshot提交给应用层 rf.installSnapshotToApplication() DPrintf("RaftNode[%d] installSnapshot ends, rf.lastIncludedIndex[%d] rf.lastIncludedTerm[%d] args.lastIncludedIndex[%d] args.lastIncludedTerm[%d] logSize[%d]", rf.me, rf.lastIncludedIndex, rf.lastIncludedTerm, args.LastIncludedIndex, args.LastIncludedTerm, len(rf.log)) } |
follower只接收更长的snapshot,否则没啥意义。
另外需要看一下leader的snapshot和follower的log的关系,看一下该怎么截断,这边case需要想清楚关系。其中如果lastIncludedIndex位置的Log term和snapshot term一样,后续的日志是可以保留的,我理解并不是说后续Log是可靠的,只能说明后续log可能有效的,还是需要leader将nextIndex重置到末尾来回退验证。
最后就是更新lastIncludedIndex和lastIncluedTerm并持久化这些数据,再把snapshot安装给KV层。
其他变化
类似于requestVote、Start等方法都需要逐个检查,兼容一下snapshot带来的下标问题,大概就是这样了。
最后提一个死锁的坑点:
raft层持有rf.mu向applyCh写入可能阻塞,此时如果kv层出现一种代码逻辑是先拿到了kv.mu然后再去拿rf.mu的话,此时肯定无法拿到rf.mu(因为raft层持有rf.mu并阻塞在chan),而此刻kv层如果正在处理前一条log并试图加kv.mu,那么也无法拿到kv.mu,就会死锁。
解决办法就是kv层不要拿着kv.mu去请求rf.mu,一定要在kv.mu的锁外操作raft,谨记这一点即可。
如果文章帮助您解决了工作难题,您可以帮我点击屏幕上的任意广告,或者赞助少量费用来支持我的持续创作,谢谢~

求问,TestSnapshotRecover3B可以通过,但是TestSnapshotRecoverManyClients3B就过不了,有可能是什么原因呢?找了很久找不到bug,博主有过类似经验吗?
一般都是性能不够导致的,需要优化一下实现让集群尽快达成一致。
博主写的不错,但是测试的时间是不是过长了。标准的时间应该在2min,你这个到了4min了
顺便求问TestSnapshotSize的测试时间结果
老哥,时间久远,记不起了。。