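MIT 6.824: Distributed Systems is a graduate-level open course at MIT on distributed systems.
The course has four major assignments, Lab1-4: Lab1 implements a MapReduce prototype, and Lab2-4 implement Raft and a distributed KV store built on top of Raft.
I am currently implementing Lab2, the core Raft algorithm, which is split into three subtasks, Lab2A, Lab2B and Lab2C:
- Lab2A: implement leader election and heartbeats.
- Lab2B: implement log replication.
- Lab2C: implement state persistence.
This breakdown matches the Raft paper's description: leader election, log replication and state persistence can be implemented separately and then assembled into a complete Raft implementation.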
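Study materials
Mainly the MIT course handout and lecture videos:
- Handout: https://pdos.csail.mit.edu/6.824/labs/lab-raft.html
- Chinese video (for commercial reasons, Lab2B~Lab2C are missing): https://www.bilibili.com/video/BV1x7411M7Sf
- English video (with subtitles, English or Chinese): https://www.youtube.com/watch?v=cQP8WApzIQQ&list=PLrw6a1wE39_tb2fErI4-WkMbsvGQk9_UB
Read the Raft paper in Chinese and English side by side:
- Chinese: https://yuerblog.cc/wp-content/uploads/raft-zh_cn.pdf
- English: https://pdos.csail.mit.edu/6.824/papers/raft-extended.pdf
When implementing Raft, you can basically just keep your eyes on Figure 2 of the paper.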
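Lab2A implementation
The course is divided into Lab2A, Lab2B and Lab2C, and I want to preserve the state of the code at each subtask, so I decided to post each sub-lab's code on the blog where it is easy to look back at them separately. I won't explain the code details.
Below is my Lab2A implementation, which passes the course's unit tests: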
Struct definitions

    // Standard-library imports used by the code in this post. labrpc, Persister,
    // ApplyMsg, DPrintf and killed() come from the course skeleton.
    import (
        "math/rand"
        "sync"
        "time"
    )

    // A log entry
    type LogEntry struct {
        Command interface{}
        Term    int
    }

    // Roles
    const ROLE_LEADER = "Leader"
    const ROLE_FOLLOWER = "Follower"
    const ROLE_CANDIDATES = "Candidates"

    //
    // A Go object implementing a single Raft peer.
    //
    type Raft struct {
        mu        sync.Mutex          // Lock to protect shared access to this peer's state
        peers     []*labrpc.ClientEnd // RPC end points of all peers
        persister *Persister          // Object to hold this peer's persisted state
        me        int                 // this peer's index into peers[]
        dead      int32               // set by Kill()

        // Your data here (2A, 2B, 2C).
        // Look at the paper's Figure 2 for a description of what
        // state a Raft server must maintain.

        // Persistent state on all servers (Lab 2A does not require persistence)
        currentTerm int         // latest term this server has seen
        votedFor    int         // who this server voted for in currentTerm
        log         []*LogEntry // log entries

        // Volatile state on all servers
        commitIndex int // highest log index known to be committed
        lastApplied int // highest log index applied to the state machine

        // Volatile state on the leader only (reinitialized on becoming leader)
        nextIndex  []int // next log index to send to each follower (initialized to the leader's last log index + 1)
        matchIndex []int // replication progress of each follower (initialized to 0); closely tied to nextIndex

        // Election-related state on all servers
        role              string    // current role
        leaderId          int       // id of the leader
        lastActiveTime    time.Time // last active time (refreshed on: heartbeat from the leader, granting a vote to another candidate, starting to request votes)
        lastBroadcastTime time.Time // as leader, the time of the last broadcast
    }

    //
    // example RequestVote RPC arguments structure.
    // field names must start with capital letters!
    //
    type RequestVoteArgs struct {
        // Your data here (2A, 2B).
        Term         int
        CandidateId  int
        LastLogIndex int
        LastLogTerm  int
    }

    //
    // example RequestVote RPC reply structure.
    // field names must start with capital letters!
    //
    type RequestVoteReply struct {
        // Your data here (2A).
        Term        int
        VoteGranted bool
    }

    type AppendEntriesArgs struct {
        Term         int
        LeaderId     int
        PrevLogIndex int
        PrevLogTerm  int
        Entries      []*LogEntry
        LeaderCommit int
    }

    type AppendEntriesReply struct {
        Term    int
        Success bool
    }
Make, the program entry point

    func Make(peers []*labrpc.ClientEnd, me int,
        persister *Persister, applyCh chan ApplyMsg) *Raft {
        rf := &Raft{}
        rf.peers = peers
        rf.persister = persister
        rf.me = me

        // Your initialization code here (2A, 2B, 2C).
        rf.role = ROLE_FOLLOWER
        rf.leaderId = -1
        rf.votedFor = -1
        rf.lastActiveTime = time.Now()

        // initialize from state persisted before a crash
        rf.readPersist(persister.ReadRaftState())

        // election loop
        go rf.electionLoop()
        // leader loop (heartbeats)
        go rf.appendEntriesLoop()

        DPrintf("Raftnode[%d] started", me)

        return rf
    }
GetState (called by the unit tests)

    // return currentTerm and whether this server
    // believes it is the leader.
    func (rf *Raft) GetState() (int, bool) {
        rf.mu.Lock()
        defer rf.mu.Unlock()

        var term int
        var isleader bool
        // Your code here (2A).
        term = rf.currentTerm
        isleader = rf.role == ROLE_LEADER
        return term, isleader
    }
RequestVote

    //
    // example RequestVote RPC handler.
    //
    func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {
        // Your code here (2A, 2B).
        rf.mu.Lock()
        defer rf.mu.Unlock()

        reply.Term = rf.currentTerm
        reply.VoteGranted = false

        DPrintf("RaftNode[%d] Handle RequestVote, CandidatesId[%d] Term[%d] CurrentTerm[%d] LastLogIndex[%d] LastLogTerm[%d] votedFor[%d]",
            rf.me, args.CandidateId, args.Term, rf.currentTerm, args.LastLogIndex, args.LastLogTerm, rf.votedFor)
        defer func() {
            DPrintf("RaftNode[%d] Return RequestVote, CandidatesId[%d] VoteGranted[%v] ", rf.me, args.CandidateId, reply.VoteGranted)
        }()

        // The candidate's term is smaller than mine: refuse to vote
        if args.Term < rf.currentTerm {
            return
        }

        // Saw a larger term: become a follower of that term
        if args.Term > rf.currentTerm {
            rf.currentTerm = args.Term
            rf.role = ROLE_FOLLOWER
            rf.votedFor = -1
            rf.leaderId = -1
            // fall through and decide whether to vote
        }

        // Only one vote per term
        if rf.votedFor == -1 || rf.votedFor == args.CandidateId {
            // The candidate's log must be at least as up-to-date as mine (paper section 5.4.1):
            // 1. the log whose last entry has the larger term is more up-to-date
            // 2. if the last terms are equal, the longer log is more up-to-date
            lastLogTerm := 0
            if len(rf.log) != 0 {
                lastLogTerm = rf.log[len(rf.log)-1].Term
            }
            if args.LastLogTerm < lastLogTerm ||
                (args.LastLogTerm == lastLogTerm && args.LastLogIndex < len(rf.log)) {
                return
            }
            rf.votedFor = args.CandidateId
            reply.VoteGranted = true
            rf.lastActiveTime = time.Now() // granting a vote resets my own election timer
        }
        rf.persist()
    }

    func (rf *Raft) sendRequestVote(server int, args *RequestVoteArgs, reply *RequestVoteReply) bool {
        ok := rf.peers[server].Call("Raft.RequestVote", args, reply)
        return ok
    }

    func (rf *Raft) electionLoop() {
        for !rf.killed() {
            time.Sleep(1 * time.Millisecond)

            func() {
                rf.mu.Lock()
                defer rf.mu.Unlock()

                now := time.Now()
                timeout := time.Duration(200+rand.Int31n(150)) * time.Millisecond // randomized timeout
                elapses := now.Sub(rf.lastActiveTime)
                // follower -> candidate
                if rf.role == ROLE_FOLLOWER {
                    if elapses >= timeout {
                        rf.role = ROLE_CANDIDATES
                        DPrintf("RaftNode[%d] Follower -> Candidate", rf.me)
                    }
                }
                // request votes
                if rf.role == ROLE_CANDIDATES && elapses >= timeout {
                    rf.lastActiveTime = now // reset the next election time
                    rf.currentTerm += 1     // start a new term
                    rf.votedFor = rf.me     // vote for myself in this term
                    rf.persist()

                    // RequestVote arguments
                    args := RequestVoteArgs{
                        Term:         rf.currentTerm,
                        CandidateId:  rf.me,
                        LastLogIndex: len(rf.log),
                    }
                    if len(rf.log) != 0 {
                        args.LastLogTerm = rf.log[len(rf.log)-1].Term
                    }

                    // release the lock while the RPCs are in flight
                    rf.mu.Unlock()

                    DPrintf("RaftNode[%d] RequestVote starts, Term[%d] LastLogIndex[%d] LastLogTerm[%d]", rf.me, args.Term,
                        args.LastLogIndex, args.LastLogTerm)

                    // send RequestVote RPCs concurrently
                    type VoteResult struct {
                        peerId int
                        resp   *RequestVoteReply
                    }
                    voteCount := 1   // votes received (starting with my own)
                    finishCount := 1 // replies received
                    voteResultChan := make(chan *VoteResult, len(rf.peers))
                    for peerId := 0; peerId < len(rf.peers); peerId++ {
                        go func(id int) {
                            if id == rf.me {
                                return
                            }
                            resp := RequestVoteReply{}
                            if ok := rf.sendRequestVote(id, &args, &resp); ok {
                                voteResultChan <- &VoteResult{peerId: id, resp: &resp}
                            } else {
                                voteResultChan <- &VoteResult{peerId: id, resp: nil}
                            }
                        }(peerId)
                    }

                    maxTerm := 0
                    for {
                        select {
                        case voteResult := <-voteResultChan:
                            finishCount += 1
                            if voteResult.resp != nil {
                                if voteResult.resp.VoteGranted {
                                    voteCount += 1
                                }
                                if voteResult.resp.Term > maxTerm {
                                    maxTerm = voteResult.resp.Term
                                }
                            }
                            // leave as soon as a majority of votes is in
                            if finishCount == len(rf.peers) || voteCount > len(rf.peers)/2 {
                                goto VOTE_END
                            }
                        }
                    }
                VOTE_END:
                    rf.mu.Lock()
                    defer func() {
                        DPrintf("RaftNode[%d] RequestVote ends, finishCount[%d] voteCount[%d] Role[%s] maxTerm[%d] currentTerm[%d]", rf.me, finishCount, voteCount,
                            rf.role, maxTerm, rf.currentTerm)
                    }()
                    // the role changed in the meantime: ignore this round's result
                    if rf.role != ROLE_CANDIDATES {
                        return
                    }
                    // saw a higher term: switch back to follower
                    if maxTerm > rf.currentTerm {
                        rf.role = ROLE_FOLLOWER
                        rf.leaderId = -1
                        rf.currentTerm = maxTerm
                        rf.votedFor = -1
                        rf.persist()
                        return
                    }
                    // won a majority of votes: become leader
                    if voteCount > len(rf.peers)/2 {
                        rf.role = ROLE_LEADER
                        rf.leaderId = rf.me
                        rf.lastBroadcastTime = time.Unix(0, 0) // make the appendEntries broadcast fire immediately
                        return
                    }
                }
            }()
        }
    }
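The log comparison inside the vote handler is easy to get subtly wrong, so here is a minimal standalone sketch of the rule as section 5.4.1 of the Raft paper states it: compare the terms of the last entries first, and fall back to log length only when they tie. The helper name `candidateUpToDate` and its parameter names are hypothetical and are not part of the course skeleton.

```go
package main

import "fmt"

// candidateUpToDate reports whether the candidate's log is at least as
// up-to-date as the voter's log (Raft paper, section 5.4.1).
// Hypothetical helper: indexes are 1-based and 0 means an empty log.
func candidateUpToDate(candLastTerm, candLastIndex, myLastTerm, myLastIndex int) bool {
	if candLastTerm != myLastTerm {
		// Different last terms: the larger term wins regardless of length.
		return candLastTerm > myLastTerm
	}
	// Same last term: the longer (or equally long) log wins.
	return candLastIndex >= myLastIndex
}

func main() {
	fmt.Println(candidateUpToDate(3, 1, 2, 5)) // true: higher last term, shorter log
	fmt.Println(candidateUpToDate(2, 4, 2, 5)) // false: same last term, shorter log
	fmt.Println(candidateUpToDate(0, 0, 0, 0)) // true: both logs empty, as in Lab2A
}
```

In Lab2A every log is empty, so the check always passes; it only starts to matter once Lab2B adds log replication.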
AppendEntries (heartbeat only)

    func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {
        rf.mu.Lock()
        defer rf.mu.Unlock()

        DPrintf("RaftNode[%d] Handle AppendEntries, LeaderId[%d] Term[%d] CurrentTerm[%d] role=[%s]",
            rf.me, args.LeaderId, args.Term, rf.currentTerm, rf.role)
        defer func() {
            DPrintf("RaftNode[%d] Return AppendEntries, LeaderId[%d] Term[%d] CurrentTerm[%d] role=[%s]",
                rf.me, args.LeaderId, args.Term, rf.currentTerm, rf.role)
        }()

        reply.Term = rf.currentTerm
        reply.Success = false

        if args.Term < rf.currentTerm {
            return
        }

        // Saw a larger term: become a follower of that term
        if args.Term > rf.currentTerm {
            rf.currentTerm = args.Term
            rf.role = ROLE_FOLLOWER
            rf.votedFor = -1
            rf.leaderId = -1
            // fall through
        }

        // A candidate in the same term also yields to the current leader (Figure 2)
        rf.role = ROLE_FOLLOWER
        // Recognize the new leader
        rf.leaderId = args.LeaderId
        // Refresh the last active time
        rf.lastActiveTime = time.Now()

        // Log handling is not implemented in Lab 2A
        rf.persist()
    }

    func (rf *Raft) sendAppendEntries(server int, args *AppendEntriesArgs, reply *AppendEntriesReply) bool {
        ok := rf.peers[server].Call("Raft.AppendEntries", args, reply)
        return ok
    }

    // Lab 2A only sends heartbeats and does not replicate the log
    func (rf *Raft) appendEntriesLoop() {
        for !rf.killed() {
            time.Sleep(1 * time.Millisecond)

            func() {
                rf.mu.Lock()
                defer rf.mu.Unlock()

                // only the leader broadcasts heartbeats
                if rf.role != ROLE_LEADER {
                    return
                }

                // broadcast once every 100ms
                now := time.Now()
                if now.Sub(rf.lastBroadcastTime) < 100*time.Millisecond {
                    return
                }
                rf.lastBroadcastTime = time.Now()

                // send heartbeat RPCs concurrently
                for peerId := 0; peerId < len(rf.peers); peerId++ {
                    if peerId == rf.me {
                        continue
                    }

                    args := AppendEntriesArgs{}
                    args.Term = rf.currentTerm
                    args.LeaderId = rf.me
                    // log-related fields are not handled in Lab 2A

                    go func(id int, args1 *AppendEntriesArgs) {
                        DPrintf("RaftNode[%d] appendEntries starts, myTerm[%d] peerId[%d]", rf.me, args1.Term, id)
                        reply := AppendEntriesReply{}
                        if ok := rf.sendAppendEntries(id, args1, &reply); ok {
                            rf.mu.Lock()
                            defer rf.mu.Unlock()
                            if reply.Term > rf.currentTerm { // step down to follower
                                rf.role = ROLE_FOLLOWER
                                rf.leaderId = -1
                                rf.currentTerm = reply.Term
                                rf.votedFor = -1
                                rf.persist()
                            }
                            DPrintf("RaftNode[%d] appendEntries ends, peerTerm[%d] myCurrentTerm[%d] myRole[%s]", rf.me, reply.Term, rf.currentTerm, rf.role)
                        }
                    }(peerId, &args)
                }
            }()
        }
    }
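Both RPC handlers and both reply paths above repeat the same four assignments whenever they see a larger term. As a sketch of how that rule could be factored out, the snippet below uses a stripped-down `node` type and a hypothetical `observeTerm` helper; neither exists in the course skeleton or in my code, and a real version would have to be called with `rf.mu` held.

```go
package main

import "fmt"

// node is a hypothetical stand-in holding only the fields the rule touches.
type node struct {
	currentTerm int
	votedFor    int
	leaderId    int
	role        string
}

// observeTerm applies the "a larger term always wins" rule from Figure 2.
// It returns true when the node adopted the new term and stepped down.
func (n *node) observeTerm(term int) bool {
	if term <= n.currentTerm {
		return false
	}
	n.currentTerm = term
	n.role = "Follower"
	n.votedFor = -1 // no vote cast yet in the new term
	n.leaderId = -1 // leader of the new term is not known yet
	return true
}

func main() {
	n := &node{currentTerm: 3, votedFor: 0, leaderId: 0, role: "Leader"}
	fmt.Println(n.observeTerm(3), n.role) // false Leader
	fmt.Println(n.observeTerm(5), n.role) // true Follower
}
```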
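Lab2A summary
- One big lock protects all state; release it while RPCs are in flight, and re-check the state once the RPCs return.
- When handling either a request or a response, first check whether term > currentTerm and, if so, convert to follower.
- Within one currentTerm, a node can grant its vote (votedFor) only once.
- Mind the randomization of when candidates request votes.
- Once RequestVote has collected a majority of votes, stop waiting for the remaining RPCs immediately.
- After becoming leader, send AppendEntries heartbeats as soon as possible; otherwise other nodes will time out and become candidates again.
- Pay attention to the several places where the election timeout is refreshed.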
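Other notes:
- Although Lab2A does not require persisting state (currentTerm, votedFor, log[]), I skimmed the code and found that its persist actually still writes to memory so that the unit tests can check the result. My guess is that this course may not really implement the storage side of how Raft persists its log (so far that does seem to be the case).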
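To make "persisting to memory" concrete, here is a rough sketch of what such a store could look like. It is not the course's Persister: `memPersister`, `persistentState`, `encodeState` and `decodeState` are hypothetical names, and the standard library's encoding/gob is used here purely for illustration.

```go
package main

import (
	"bytes"
	"encoding/gob"
	"fmt"
	"sync"
)

// memPersister is a hypothetical in-memory store: "persisted" bytes simply
// live in a slice that a test harness could inspect or hand to a restarted node.
type memPersister struct {
	mu    sync.Mutex
	state []byte
}

func (p *memPersister) Save(state []byte) {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.state = append([]byte(nil), state...) // keep a private copy
}

func (p *memPersister) Read() []byte {
	p.mu.Lock()
	defer p.mu.Unlock()
	return append([]byte(nil), p.state...)
}

// persistentState holds the two scalar fields Figure 2 marks as persistent.
type persistentState struct {
	CurrentTerm int
	VotedFor    int
}

func encodeState(s persistentState) ([]byte, error) {
	var buf bytes.Buffer
	if err := gob.NewEncoder(&buf).Encode(s); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

func decodeState(data []byte) (persistentState, error) {
	var s persistentState
	err := gob.NewDecoder(bytes.NewReader(data)).Decode(&s)
	return s, err
}

func main() {
	p := &memPersister{}
	data, err := encodeState(persistentState{CurrentTerm: 7, VotedFor: 2})
	if err != nil {
		panic(err)
	}
	p.Save(data)
	restored, err := decodeState(p.Read())
	fmt.Println(restored, err)
}
```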