MIT 6.824: Distributed Systems- 实现Raft Lab2A
MIT 6.824: Distributed Systems是麻省理工大学的研究生公开课,主讲分布式系统。
该课程一共包含Lab1-4共4个大作业,Lab1是实现Mapreduce原型,Lab2-4是实现Raft以及基于Raft实现分布式KV存储。
我正在实现Lab2,也就是Raft核心算法,它被划分成了Lab2A、Lab2B、Lab2C三个子任务:
- Lab2A:实现leader election、heartbeat。
- Lab2B:实现Log replication。
- Lab2C:实现state persistent。
这个拆解符合Raft算法的描述,也就是选主、日志同步、状态存储,是可以分开实现的,最终凑出一个完整的Raft实现。
学习资料
以MIT课程教案与视频为主:
- 教案:https://pdos.csail.mit.edu/6.824/labs/lab-raft.html
- 中文视频(因盈利原因,缺少Lab2B~Lab2C):https://www.bilibili.com/video/BV1x7411M7Sf
- 英文视频(带字幕,可英可中):https://www.youtube.com/watch?v=cQP8WApzIQQ&list=PLrw6a1wE39_tb2fErI4-WkMbsvGQk9_UB
中英文对照学习Raft论文:
- 中文:https://yuerblog.cc/wp-content/uploads/raft-zh_cn.pdf
- 英文:https://pdos.csail.mit.edu/6.824/papers/raft-extended.pdf
实现Raft的时候基本就盯着Figure2的图片即可:
Lab2A实现
因为课程分为Lab2A、Lab2B、Lab2C,为了能留住每个子任务的代码状态,我决定把每个子Lab的代码实现贴到博客上,方便拆分回看,我不会讲解代码细节。
下面是我的Lab2A实现,已经通过了课程的单元测试:
结构体定义
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 |
// 日志项 type LogEntry struct { Command interface{} Term int } // 当前角色 const ROLE_LEADER = "Leader" const ROLE_FOLLOWER = "Follower" const ROLE_CANDIDATES = "Candidates" // // A Go object implementing a single Raft peer. // type Raft struct { mu sync.Mutex // Lock to protect shared access to this peer's state peers []*labrpc.ClientEnd // RPC end points of all peers persister *Persister // Object to hold this peer's persisted state me int // this peer's index into peers[] dead int32 // set by Kill() // Your data here (2A, 2B, 2C). // Look at the paper's Figure 2 for a description of what // state a Raft server must maintain. // 所有服务器,持久化状态(lab-2A不要求持久化) currentTerm int // 见过的最大任期 votedFor int // 记录在currentTerm任期投票给谁了 log []*LogEntry // 操作日志 // 所有服务器,易失状态 commitIndex int // 已知的最大已提交索引 lastApplied int // 当前应用到状态机的索引 // 仅Leader,易失状态(成为leader时重置) nextIndex []int // 每个follower的log同步起点索引(初始为leader log的最后一项) matchIndex []int // 每个follower的log同步进度(初始为0),和nextIndex强关联 // 所有服务器,选举相关状态 role string // 身份 leaderId int // leader的id lastActiveTime time.Time // 上次活跃时间(刷新时机:收到leader心跳、给其他candidates投票、请求其他节点投票) lastBroadcastTime time.Time // 作为leader,上次的广播时间 } // // example RequestVote RPC arguments structure. // field names must start with capital letters! // type RequestVoteArgs struct { // Your data here (2A, 2B). Term int CandidateId int LastLogIndex int LastLogTerm int } // // example RequestVote RPC reply structure. // field names must start with capital letters! // type RequestVoteReply struct { // Your data here (2A). Term int VoteGranted bool } type AppendEntriesArgs struct { Term int LeaderId int PrevLogIndex int PrevLogTerm int Entries []*LogEntry LeaderCommit int } type AppendEntriesReply struct { Term int Success bool } |
Make 程序入口
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 |
// func Make(peers []*labrpc.ClientEnd, me int, persister *Persister, applyCh chan ApplyMsg) *Raft { rf := &Raft{} rf.peers = peers rf.persister = persister rf.me = me // Your initialization code here (2A, 2B, 2C). rf.role = ROLE_FOLLOWER rf.leaderId = -1 rf.votedFor = -1 rf.lastActiveTime = time.Now() // initialize from state persisted before a crash rf.readPersist(persister.ReadRaftState()) // election逻辑 go rf.electionLoop() // leader逻辑 go rf.appendEntriesLoop() DPrintf("Raftnode[%d]启动", me) return rf } |
GetState(单元测试会调用)
1 2 3 4 5 6 7 8 9 10 11 12 13 |
// return currentTerm and whether this server // believes it is the leader. func (rf *Raft) GetState() (int, bool) { rf.mu.Lock() defer rf.mu.Unlock() var term int var isleader bool // Your code here (2A). term = rf.currentTerm isleader = rf.role == ROLE_LEADER return term, isleader } |
RequestVote相关
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 |
// // example RequestVote RPC handler. // func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) { // Your code here (2A, 2B). rf.mu.Lock() defer rf.mu.Unlock() reply.Term = rf.currentTerm reply.VoteGranted = false DPrintf("RaftNode[%d] Handle RequestVote, CandidatesId[%d] Term[%d] CurrentTerm[%d] LastLogIndex[%d] LastLogTerm[%d] votedFor[%d]", rf.me, args.CandidateId, args.Term, rf.currentTerm, args.LastLogIndex, args.LastLogTerm, rf.votedFor) defer func() { DPrintf("RaftNode[%d] Return RequestVote, CandidatesId[%d] VoteGranted[%v] ", rf.me, args.CandidateId, reply.VoteGranted) }() // 任期不如我大,拒绝投票 if args.Term < rf.currentTerm { return } // 发现更大的任期,则转为该任期的follower if args.Term > rf.currentTerm { rf.currentTerm = args.Term rf.role = ROLE_FOLLOWER rf.votedFor = -1 rf.leaderId = -1 // 继续向下走,进行投票 } // 每个任期,只能投票给1人 if rf.votedFor == -1 || rf.votedFor == args.CandidateId { // candidate的日志必须比我的新 // 1, 最后一条log,任期大的更新 // 2,更长的log则更新 lastLogTerm := 0 if len(rf.log) != 0 { lastLogTerm = rf.log[len(rf.log)-1].Term } if args.LastLogTerm < lastLogTerm || args.LastLogIndex < len(rf.log) { return } rf.votedFor = args.CandidateId reply.VoteGranted = true rf.lastActiveTime = time.Now() // 为其他人投票,那么重置自己的下次投票时间 } rf.persist() } func (rf *Raft) sendRequestVote(server int, args *RequestVoteArgs, reply *RequestVoteReply) bool { ok := rf.peers[server].Call("Raft.RequestVote", args, reply) return ok } func (rf *Raft) electionLoop() { for !rf.killed() { time.Sleep(1 * time.Millisecond) func() { rf.mu.Lock() defer rf.mu.Unlock() now := time.Now() timeout := time.Duration(200+rand.Int31n(150)) * time.Millisecond // 超时随机化 elapses := now.Sub(rf.lastActiveTime) // follower -> candidates if rf.role == ROLE_FOLLOWER { if elapses >= timeout { rf.role = ROLE_CANDIDATES DPrintf("RaftNode[%d] Follower -> Candidate", rf.me) } } // 请求vote if rf.role == ROLE_CANDIDATES && elapses >= timeout { rf.lastActiveTime = now // 重置下次选举时间 rf.currentTerm += 1 // 发起新任期 rf.votedFor = rf.me // 该任期投了自己 rf.persist() // 请求投票req args := RequestVoteArgs{ Term: rf.currentTerm, CandidateId: rf.me, LastLogIndex: len(rf.log), } if len(rf.log) != 0 { args.LastLogTerm = rf.log[len(rf.log)-1].Term } rf.mu.Unlock() DPrintf("RaftNode[%d] RequestVote starts, Term[%d] LastLogIndex[%d] LastLogTerm[%d]", rf.me, args.Term, args.LastLogIndex, args.LastLogTerm) // 并发RPC请求vote type VoteResult struct { peerId int resp *RequestVoteReply } voteCount := 1 // 收到投票个数(先给自己投1票) finishCount := 1 // 收到应答个数 voteResultChan := make(chan *VoteResult, len(rf.peers)) for peerId := 0; peerId < len(rf.peers); peerId++ { go func(id int) { if id == rf.me { return } resp := RequestVoteReply{} if ok := rf.sendRequestVote(id, &args, &resp); ok { voteResultChan <- &VoteResult{peerId: id, resp: &resp} } else { voteResultChan <- &VoteResult{peerId: id, resp: nil} } }(peerId) } maxTerm := 0 for { select { case voteResult := <-voteResultChan: finishCount += 1 if voteResult.resp != nil { if voteResult.resp.VoteGranted { voteCount += 1 } if voteResult.resp.Term > maxTerm { maxTerm = voteResult.resp.Term } } // 得到大多数vote后,立即离开 if finishCount == len(rf.peers) || voteCount > len(rf.peers)/2 { goto VOTE_END } } } VOTE_END: rf.mu.Lock() defer func() { DPrintf("RaftNode[%d] RequestVote ends, finishCount[%d] voteCount[%d] Role[%s] maxTerm[%d] currentTerm[%d]", rf.me, finishCount, voteCount, rf.role, maxTerm, rf.currentTerm) }() // 如果角色改变了,则忽略本轮投票结果 if rf.role != ROLE_CANDIDATES { return } // 发现了更高的任期,切回follower if maxTerm > rf.currentTerm { rf.role = ROLE_FOLLOWER rf.leaderId = -1 rf.currentTerm = maxTerm rf.votedFor = -1 rf.persist() return } // 赢得大多数选票,则成为leader if voteCount > len(rf.peers)/2 { rf.role = ROLE_LEADER rf.leaderId = rf.me rf.lastBroadcastTime = time.Unix(0, 0) // 令appendEntries广播立即执行 return } } }() } } |
AppendEnties(仅心跳)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 |
func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) { rf.mu.Lock() defer rf.mu.Unlock() DPrintf("RaftNode[%d] Handle AppendEntries, LeaderId[%d] Term[%d] CurrentTerm[%d] role=[%s]", rf.me, args.LeaderId, args.Term, rf.currentTerm, rf.role) defer func() { DPrintf("RaftNode[%d] Return AppendEntries, LeaderId[%d] Term[%d] CurrentTerm[%d] role=[%s]", rf.me, args.LeaderId, args.Term, rf.currentTerm, rf.role) }() reply.Term = rf.currentTerm reply.Success = false if args.Term < rf.currentTerm { return } // 发现更大的任期,则转为该任期的follower if args.Term > rf.currentTerm { rf.currentTerm = args.Term rf.role = ROLE_FOLLOWER rf.votedFor = -1 rf.leaderId = -1 // 继续向下走 } // 认识新的leader rf.leaderId = args.LeaderId // 刷新活跃时间 rf.lastActiveTime = time.Now() // 日志操作lab-2A不实现 rf.persist() } func (rf *Raft) sendAppendEntries(server int, args *AppendEntriesArgs, reply *AppendEntriesReply) bool { ok := rf.peers[server].Call("Raft.AppendEntries", args, reply) return ok } // lab-2A只做心跳,不考虑log同步 func (rf *Raft) appendEntriesLoop() { for !rf.killed() { time.Sleep(1 * time.Millisecond) func() { rf.mu.Lock() defer rf.mu.Unlock() // 只有leader才向外广播心跳 if rf.role != ROLE_LEADER { return } // 100ms广播1次 now := time.Now() if now.Sub(rf.lastBroadcastTime) < 100*time.Millisecond { return } rf.lastBroadcastTime = time.Now() // 并发RPC心跳 type AppendResult struct { peerId int resp *AppendEntriesReply } for peerId := 0; peerId < len(rf.peers); peerId++ { if peerId == rf.me { continue } args := AppendEntriesArgs{} args.Term = rf.currentTerm args.LeaderId = rf.me // log相关字段在lab-2A不处理 go func(id int, args1 *AppendEntriesArgs) { DPrintf("RaftNode[%d] appendEntries starts, myTerm[%d] peerId[%d]", rf.me, args1.Term, id) reply := AppendEntriesReply{} if ok := rf.sendAppendEntries(id, args1, &reply); ok { rf.mu.Lock() defer rf.mu.Unlock() if reply.Term > rf.currentTerm { // 变成follower rf.role = ROLE_FOLLOWER rf.leaderId = -1 rf.currentTerm = reply.Term rf.votedFor = -1 rf.persist() } DPrintf("RaftNode[%d] appendEntries ends, peerTerm[%d] myCurrentTerm[%d] myRole[%s]", rf.me, reply.Term, rf.currentTerm, rf.role) } }(peerId, &args) } }() } } |
总结Lab2A
- 一把大锁保护好状态,RPC期间释放锁,RPC结束后注意状态二次判定
- request/response都要先判断term > currentTerm,转换follower
- 一个currentTerm只能voteFor其他节点1次
- 注意candidates请求vote的时间随机性
- 注意requestVote得到大多数投票后立即结束等待剩余RPC
- 注意成为leader后尽快appendEntries心跳,否则其他节点又会成为candidates
- 注意几个刷新选举超时时间的逻辑点
其他了解:
- 虽然Lab2A不要求关注persist状态的落地(currentTerm,voteFor,[]log),但是我稍微扫了一眼代码,发现它的persist其实还是写在内存里,供单元测试去check结果,因此猜测这门课可能不会真的实现raft如何持久化log这个存储部分(从目前来看的确是这样的)。
如果文章帮助您解决了工作难题,您可以帮我点击屏幕上的任意广告,或者赞助少量费用来支持我的持续创作,谢谢~

你好我想請教一下, 為何在 appendEntriesLoop() 這個 function 裡面我們不需要在 78行 之前 Unlock()?
因為,我看到你在 electionLoop() 這個 function 裡面, 有實現提前 Unlock 這功能。
而且,你在最後的總結裡提到我們應該要在RPC期間釋放鎖,照這樣說的話我們在 appendEntriesLoop() 這個 function 裡面實行傳送RPC功能之前應該也要先 Unlock() 對吧?
RPC期间不能lock,否则性能就不行了。
既然不能悲观锁,那就只能活锁,也就是RPC完成后lock住做state的再次检查,确保还是之前的状态。
看这里,这里有讲,会死锁的
http://nil.csail.mit.edu/6.824/2020/labs/raft-locking.txt
您好,我有个问题想请教。我们在做leader选举的时候,各个节点应该是分布在一段时间区域内的不同时间点开始选举。那么为什么在make函数里,我们需要 go electionLoop()呢?这点我有点想不明白,能解答一二吗?
因为随时都可能网络分区,然后其他小团伙就选出新leader,然后网络分区恢复,然后你收到新leader的心跳,然后你臣服为follower,然后又网络分区了,你发现收不到leader心跳了,于是自告奋勇宣告相当leader让大家投票,所以electionLoop一定是要持续监测的呀。
这个定时器思路可以,我是用的timer,感觉bug不断
在选举投票的阶段是不是假设机器不会崩溃,以及rpc的往返时间要小于投票的超时时间,要不然会一直卡在for{}这个循环里面吧
1