MIT 6.824: Distributed Systems- 实现Raft Lab3A
接上文《MIT 6.824: Distributed Systems- 实现Raft Lab2C》,Lab3A要求基于Raft实现分布式kv server,支持Put/Get操作,并且要求设计满足线性一致性。
线性一致性,站在客户端视角应该满足以下特性:
- 对于单个client来说,发起OP1必须等待其结果返回,才能执行OP2,必须是顺序的(上锁或者排队提交)。
- 多个client可以并发请求。
- 一旦有1个client读取到新值,那么后续任意client的读操作都应该返回新值。
我们需要自己实现kv server和kv client来满足线性一致性要求,底层依赖我们在Lab2B实现的raft。
已通过MIT单元测试:
Lab2A实现
Clerk客户端
1 2 3 4 5 6 7 8 9 |
type Clerk struct { mu sync.Mutex servers []*labrpc.ClientEnd // You will have to modify this struct. // 只需要确保clientId每次重启不重复,那么clientId+seqId就是安全的 clientId int64 // 客户端唯一标识 seqId int64 // 该客户端单调递增的请求id leaderId int } |
每个客户端启动后随机分配clientId,确保全世界唯一,此后每个请求由单调递增的seqId标识。
seqId用于实现写请求的幂等性,避免重试的RPC导致KV存储写入多次数据,这块幂等性逻辑最终体现在Raft log提交那块。
leaderId缓存最近通讯的leader节点。
1 2 3 4 5 6 7 |
func MakeClerk(servers []*labrpc.ClientEnd) *Clerk { ck := new(Clerk) ck.servers = servers // You'll have to add code here. ck.clientId = nrand() return ck } |
PutAppend调用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
// Put or Append type PutAppendArgs struct { Key string Value string Op string // "Put" or "Append" // You'll have to add definitions here. // Field names must start with capital letters, // otherwise RPC will break. ClientId int64 SeqId int64 } type PutAppendReply struct { Err Err } |
请求与应答。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 |
func (ck *Clerk) PutAppend(key string, value string, op string) { // You will have to modify this function. args := PutAppendArgs{ Key: key, Value: value, Op: op, ClientId: ck.clientId, SeqId: atomic.AddInt64(&ck.seqId, 1), } DPrintf("Client[%d] PutAppend, Key=%s Value=%s", ck.clientId, key, value) leaderId := ck.currentLeader() for { reply := PutAppendReply{} if ck.servers[leaderId].Call("KVServer.PutAppend", &args, &reply) { if reply.Err == OK { // 成功 break } } leaderId = ck.changeLeader() time.Sleep(1 * time.Millisecond) } } |
该请求被唯一ID标识用于幂等处理,与当前认为的leader通讯,如果返回失败则可能因为超时或者Leader切换导致提交失败,需要重试。
1 2 3 4 5 6 7 8 9 10 11 12 13 |
func (ck *Clerk) currentLeader() (leaderId int) { ck.mu.Lock() defer ck.mu.Unlock() leaderId = ck.leaderId return } func (ck *Clerk) changeLeader() (leaderId int) { ck.mu.Lock() defer ck.mu.Unlock() ck.leaderId = (ck.leaderId + 1) % len(ck.servers) return ck.leaderId } |
Get调用
1 2 3 4 5 6 7 8 9 10 11 |
type GetArgs struct { Key string // You'll have to add definitions here. ClientId int64 SeqId int64 } type GetReply struct { Err Err Value string } |
请求与应答。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 |
func (ck *Clerk) Get(key string) string { // You will have to modify this function. args := GetArgs{ Key: key, ClientId: ck.clientId, SeqId: atomic.AddInt64(&ck.seqId, 1), } DPrintf("Client[%d] Get starts, Key=%s ", ck.clientId, key) leaderId := ck.currentLeader() for { reply := GetReply{} if ck.servers[leaderId].Call("KVServer.Get", &args, &reply) { if reply.Err == OK { // 命中 return reply.Value } else if reply.Err == ErrNoKey { // 不存在 return ""; } } leaderId = ck.changeLeader() time.Sleep(1 * time.Millisecond) } } |
类似PutAppend逻辑,只是会多一个ErrNoKey的特殊错误,除此之外的错误是因为leader切换或者超时而导致失败,需要继续重试来得到最终结果。
启动KV Server
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
func StartKVServer(servers []*labrpc.ClientEnd, me int, persister *raft.Persister, maxraftstate int) *KVServer { // call labgob.Register on structures you want // Go's RPC library to marshall/unmarshall. labgob.Register(&Op{}) kv := new(KVServer) kv.me = me kv.maxraftstate = maxraftstate // You may need initialization code here. kv.applyCh = make(chan raft.ApplyMsg) kv.rf = raft.Make(servers, me, persister, kv.applyCh) // You may need initialization code here. kv.kvStore = make(map[string]string) kv.reqMap = make(map[int]*OpContext) kv.seqMap = make(map[int64]int64) go kv.applyLoop() return kv } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
type KVServer struct { mu sync.Mutex me int rf *raft.Raft applyCh chan raft.ApplyMsg dead int32 // set by Kill() maxraftstate int // snapshot if log grows this big // Your definitions here. kvStore map[string]string // kv存储 reqMap map[int]*OpContext // log index -> 请求上下文 seqMap map[int64]int64 // 客户端id -> 客户端seq } |
利用kvStore存储应用层状态(纯内存,宕机重启基于raft log回放复原),reqMap存储正在进行中的RPC调用,seqMap记录每个clientId已提交的最大请求ID以便做写入幂等性判定。
Op日志
1 2 3 4 5 6 7 8 9 10 11 12 |
type Op struct { // Your definitions here. // Field names must start with capital letters, // otherwise RPC will break. Index int // 写入raft log时的index Term int // 写入raft log时的term Type string // PutAppend, Get Key string Value string SeqId int64 ClientId int64 } |
这是写入raft层的日志,需要有客户端的ClientId,SeqId做提交幂等处理,需要有操作类型与值,需要有写入Leader时的log index和term以便raft提交日志时判定是否发生了leader切换。
OpCtx请求上下文
1 2 3 4 5 6 7 8 9 10 11 12 |
// 等待Raft提交期间的Op上下文, 用于唤醒阻塞的RPC type OpContext struct { op *Op committed chan byte wrongLeader bool // 因为index位置log的term不一致, 说明leader换过了 ignored bool // 因为req id过期, 导致该日志被跳过 // Get操作的结果 keyExist bool value string } |
当raft log提交时,需要找到正在阻塞的RPC调用,唤醒它并返回客户端,因此用OpCtx来做请求上下文。
PutAppend处理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 |
func (kv *KVServer) PutAppend(args *PutAppendArgs, reply *PutAppendReply) { // Your code here. reply.Err = OK op := &Op{ Type: args.Op, Key: args.Key, Value: args.Value, ClientId: args.ClientId, SeqId: args.SeqId, } // 写入raft层 var isLeader bool op.Index, op.Term, isLeader = kv.rf.Start(op) if !isLeader { reply.Err = ErrWrongLeader return } opCtx := newOpContext(op) func() { kv.mu.Lock() defer kv.mu.Unlock() // 保存RPC上下文,等待提交回调,可能会因为Leader变更覆盖同样Index,不过前一个RPC会超时退出并令客户端重试 kv.reqMap[op.Index] = opCtx }() // RPC结束前清理上下文 defer func() { kv.mu.Lock() defer kv.mu.Unlock() if one, ok := kv.reqMap[op.Index]; ok { if one == opCtx { delete(kv.reqMap, op.Index) } } }() timer := time.NewTimer(2000 * time.Millisecond) defer timer.Stop() select { case <- opCtx.committed: // 如果提交了 if opCtx.wrongLeader { // 同样index位置的term不一样了, 说明leader变了,需要client向新leader重新写入 reply.Err = ErrWrongLeader } else if opCtx.ignored { // 说明req id过期了,该请求被忽略,对MIT这个lab来说只需要告知客户端OK跳过即可 } case <- timer.C: // 如果2秒都没提交成功,让client重试 reply.Err = ErrWrongLeader } } |
先把Op写入Raft,保存请求上下文到对应index,然后就是等待index位置的log被raft集群提交了。
如果index位置提交的log,其term与写入leader时的不一样,那么说明期间leader已经切换,需要让客户端重试。
还有一种情况是leader切换导致当前node写入的日志被截断,index位置持续没有提交,那么超时返回客户端,让客户端重试来确认。
Get处理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 |
func (kv *KVServer) Get(args *GetArgs, reply *GetReply) { // Your code here. reply.Err = OK op := &Op{ Type: OP_TYPE_GET, Key: args.Key, ClientId: args.ClientId, SeqId: args.SeqId, } // 写入raft层 var isLeader bool op.Index, op.Term, isLeader = kv.rf.Start(op) if !isLeader { reply.Err = ErrWrongLeader return } opCtx := newOpContext(op) func() { kv.mu.Lock() defer kv.mu.Unlock() // 保存RPC上下文,等待提交回调,可能会因为Leader变更覆盖同样Index,不过前一个RPC会超时退出并令客户端重试 kv.reqMap[op.Index] = opCtx }() // RPC结束前清理上下文 defer func() { kv.mu.Lock() defer kv.mu.Unlock() if one, ok := kv.reqMap[op.Index]; ok { if one == opCtx { delete(kv.reqMap, op.Index) } } }() timer := time.NewTimer(2000 * time.Millisecond) defer timer.Stop() select { case <-opCtx.committed: // 如果提交了 if opCtx.wrongLeader { // 同样index位置的term不一样了, 说明leader变了,需要client向新leader重新写入 reply.Err = ErrWrongLeader } else if !opCtx.keyExist { // key不存在 reply.Err = ErrNoKey } else { reply.Value = opCtx.value // 返回值 } case <- timer.C: // 如果2秒都没提交成功,让client重试 reply.Err = ErrWrongLeader } } |
与PutAppend类似,为了线性一致性,把读操作也作为一条log写入raft等待提交后再响应RPC。
applyLoop日志提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 |
func (kv *KVServer) applyLoop() { for !kv.killed() { select { case msg := <- kv.applyCh: cmd := msg.Command index := msg.CommandIndex func() { kv.mu.Lock() defer kv.mu.Unlock() // 操作日志 op := cmd.(*Op) opCtx, existOp := kv.reqMap[index] prevSeq, existSeq := kv.seqMap[op.ClientId] kv.seqMap[op.ClientId] = op.SeqId if existOp { // 存在等待结果的RPC, 那么判断状态是否与写入时一致 if opCtx.op.Term != op.Term { opCtx.wrongLeader = true } } // 只处理ID单调递增的客户端写请求 if op.Type == OP_TYPE_PUT || op.Type == OP_TYPE_APPEND { if !existSeq || op.SeqId > prevSeq { // 如果是递增的请求ID,那么接受它的变更 if op.Type == OP_TYPE_PUT { // put操作 kv.kvStore[op.Key] = op.Value } else if op.Type == OP_TYPE_APPEND { // put-append操作 if val, exist := kv.kvStore[op.Key]; exist { kv.kvStore[op.Key] = val + op.Value } else { kv.kvStore[op.Key] = op.Value } } } else if existOp { opCtx.ignored = true } } else { // OP_TYPE_GET if existOp { opCtx.value, opCtx.keyExist = kv.kvStore[op.Key] } } DPrintf("RaftNode[%d] applyLoop, kvStore[%v]", kv.me, kv.kvStore) // 唤醒挂起的RPC if existOp { close(opCtx.committed) } }() } } } |
不断监听raft层的已提交日志,如果是写操作则判定Op的sequence id是否过期,过期则跳过,否则生效到kvStore。读操作则将kvStore此时的k=v保存到opCtx,并唤起阻塞的RPC。
总结Lab3A
- 线性一致性要求,客户端必须串行发起OP,并行发起OP服务端无法保证前1个OP生效,实际工程里我认为应该客户端对请求排队编号seqId,串行提交给KV server。
- 只要保证每个client的ID能够唯一,那么seqId无需持久化,保证clientId唯一还是比较简单的,比如用:ip+pid+timestamp就可以。
- 写入幂等性通过比较clientId最后一次提交日志的seqId,来确定是否可以执行当前OP。
- 读取的一致性则是通过向raft写入一条read log实现的,当read log被提交时再把此时此刻kvStore的数据返回给RPC,那么RPC看到的数据一定是符合线性一致性的,即后续的读操作一定可以继续读到这个值。
- RPC服务端要做超时处理,因为很有可能leader写入本地log后发生了选主,那么新leader会截断掉老leader写入的log,导致对应index位置持续得不到提交,所以要通过超时让客户端重新写入log来达成提交,得到最终结果。
如果文章帮助您解决了工作难题,您可以帮我点击屏幕上的任意广告,或者赞助少量费用来支持我的持续创作,谢谢~

博主, 我在LAB4B时有个样例有时过不了,回头重新审视了代码,发现LAB3A可能有个BUG,如下,
函数func (kv *KVServer) PutAppend(args *PutAppendArgs, reply *PutAppendReply) 中
如果发生了如下情况,对于serverRPC的清理上下文,如果在清理上下文之前, select { case <- opCtx.committed: 超时之后,如果applyLoop处理掉了提交的日志,那么服务器依然会返回wrongleader,此时客户端认为请求没有被处理,实际上请求已经在服务器落地了,造成客户端之后的重复请求都不会再返回OK了(seqId已经被记录了)。应该需要在func()清理上下文闭包拿到锁以后中进行一次double check 检查管道opCtx.committed吧?
话说不知道有没有啥好的DEBUG建议= =求教
对,rpc前后是需要考虑如何做活锁的,因为没法死锁住rpc过程,所以如何check是不是当时的上下文是关键。
具体debug的话就是打印log证明才猜想吧,需要磨一磨,坚持下去!
Put和Append有seqId说明已经执行过了,Get的话具有幂等性这里不会特殊判断,超时重发获得新的index即可。只要RPC请求还在就会获得返回值。如果跟博主实现方式相近的话,你问题应该不在这里感觉。你最后发现问题在哪里了吗?
上面老鹅
写入指针是对的吗?不应该写入Op吗,因为其他的follower节点在这个地址上不会是同一个值,求解.
抱歉大意了
1