MIT 6.824 2020 (2) lab2 Raft 实现笔记
MIT 6.824 2020 (2) lab2 Raft 实现笔记
这个lab我是边看论文、边看课边做的,前前后后花了快2周,非常有挑战性。做完后感触颇深,特别是后来再去了解了一下Paxos后,更感慨raft的大道至简,也钦佩这篇论文的作者——来自Stanford的Diego Ongaro,一是钦佩他的严谨的思维,二是钦佩他的文字表达能力,我虽然从没接触过共识算法,但也能较轻松地看懂论文(唯一理解上比较吃力的是 figure 8,看了我一个下午我都没看懂,后来上网查才发现好多人也没看懂这个),并根据figure 2来实现一个这样的系统,真可谓是深入浅出。
基础设施
labrpc是模拟了一个网络,网络除了模拟正常的网络延迟,还模拟了异常的情况,比如可能出现丢包(返回ReplyMsg{false, nil}), 长时间延迟等。
config.go里相当于构建了一个raft集群环境,可以通过connect,、disconnect等模拟这个环境可能出现的状况。并通过checkOneLeader等接口可以查看这个环境当前的运行状况。另外还有one函数,会尝试start一个log,并检测集群内是否达到了共识,是主要的test手段。
lab2A 领导人选举
首先,每个raft服务启动时,会开启一个electionTimeOut的计时器,如果计时结束前收到了RequestVote请求。
RequestVote RPC
代码如下,注意,后面关于有关与log的判断是lab2B加上去的。
func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {
// Your code here (2A, 2B).
rf.mu.Lock()
fmt.Printf("RV from %d to %d received: %+v with curTerm:%d votedFor:%d\n", args.CandidateId, rf.me, args, rf.currentTerm, rf.votedFor)
defer rf.mu.Unlock()
reply.Term = rf.currentTerm
if args.Term < rf.currentTerm {
reply.VoteGranted = false
rf.persist()
return
}
if args.Term == rf.currentTerm && rf.votedFor != None && rf.votedFor != args.CandidateId {
reply.VoteGranted = false
return
}
rf.currentTerm = args.Term
defer rf.persist()
rf.fallFollower()
if rf.log[len(rf.log)-1].Term < args.LastLogTerm || (rf.log[len(rf.log)-1].Term == args.LastLogTerm && len(rf.log) <= args.LastLogIndex+1) {
rf.votedFor = args.CandidateId
reply.VoteGranted = true
return
}
reply.VoteGranted = false
}
如果electionTimeOut的计时器超时,则将自己转为candidate, 并向其余server发送RequestVote请求。如果vote数目大于quorum,那么立即向其余server发送空的AppendEntry的心跳,之后以heartbeat interval的时间间隔来发送心跳。
时间常量的选择
对于时间常量的选择,这是etcd的做法https://www.zhihu.com/question/263684969heartbeat 是将interval设为100ms, 而electionTimeOut可以设为1000~2000ms,,论文里写的是:保证elecion timeout的数量级比heartbeat interval大1;而lab官方hint里的说法好像不一样,推荐使用heartbeat interval = 100ms, electionTimeout=150~300ms,必须保证leader挂了后5s内能选出新leader。
electionTimer, 计时到后自增term并发起下一轮选举,并重置electionTimer(选举也会超时,我后面才知道的)。如果candidate发现自己无法获得更多选票,即已经得到了所有server的回复,且vote数小于quorom,则竞选失败,等待下一轮竞选;如果一旦发现vote数大于等于quorom,则竞选成功,立马启动sendHeartbeat的goroutine开始安抚follower。
在AppendEntry rpc中,如果当前处于candidate状态,检查一下term,如果term小于自身自身term,则拒绝;否则重置electionTimeOut,更新term,最后返回true。
代码很简单,就不贴了,真正修改log的核心逻辑是在lab2B里实现的。
实现时踩的一些坑:
(1)很多地方是不能阻塞的,比如election阶段,作为candidate不需要也不能等所有其余server都给出reply后,才结束选举:这是因为有的server可能出现断连的状态,所以会等待不必要地较长时间,实际上如果得到了多数votes,就可以直接将自己作为follower退出了;另外,AppendEntries rpc的调用是不能阻塞的(这是我花时间找最多的一个bug),需要用goroutine,这是因为心跳要严格按照heartbeat interval的间隔来进行发送,而不是等调用成功(可能超时,用了1s钟以上,在这个时间段内,即使目标从disconnect 情况下重新connect了,也不能及时收到下一个AP请求,这会造成test fail)再sleep heartbeat interval。
(2)选举也会超时,这样面对选票割裂的情况,就不会无限僵持。所以在每轮election开始前,都会重新设置electionTimer, 如果选举失败,第二轮的election会重新由electionTimer来触发。为了第二次election的开始,相应的select <- rf.electionTimer.C的for loop也需要设置成非阻塞的。如下:
go func() {
for {
select {
case <-rf.electionTimer.C:
{
rf.mu.Lock()
if rf.status == Leader {
rf.mu.Unlock()
continue
}
rf.mu.Unlock()
go func() {
is_leader := rf.startElection()
if is_leader {
fmt.Printf("%d becomes the leader!!!\n", rf.me)
rf.sendHeartBeat()
}
}()
}
}
}
}()
做lab2B的时候,才发现官方推荐的electionTimeout检测不是使用time.Timer,而是
have the election timeout goroutine periodically check to see whether the time since then is greater than the timeout period. It’s easiest to use time.Sleep() with a small constant argument to drive the periodic checks. Don’t use time.Ticker and time.Timer; they are tricky to use correctly.
不过我认为使用Timer能够获得更好的性能,并且降低冲突概率(多个server同时超时)。
lab2B 日志复制
要在lab2A上添加的主要有两方面:
(1)RequestVote中,添加根据日志情况进行投票判断的逻辑,具体来说, 除了比较currentTerm,还要比较lastlog的term以及log的总长度。
(2)AppendEntries中,对于leader来说,需要将从nextIndex开始的所有log发送各个给follower,并根据返回结果是否success以及一些*额外的信息(比如follower的对应任期的第一条log)
注意,start函数并不是等大多数server复制了副本后才返回,而是直接返回,并且也不保证一定能够提交。这与真实的raft系统还是不一样的,因为真实的raft只有在大多数server复制了副本(复制副本也是通过AP rpc进行的,所以也会保证只有之前的log全部成功复制后,后面的log才有可能复制)后才会通知server,相当于给server的一个最终一定会提交的承诺。
start的语义实际上是“尽可能的交付”,其实并发起来也不会产生奇怪的问题。因为raft能够保证只有之前的log全都提交了,新的log才能提交,所以不会出现后面的log提交了,而前面的log最终提交失败的问题。
log的index从1和从0开始,都无所谓。
快速backup
可以通过在AP rpc中返回额外的信息来加速backup, Morris提到了一个很好的策略,具体的返回参数如下:
XTerm: 发生confict条目的term(case 1, case 2)
XIndex*: XTerm的第一条entry的index (case 1)
(*2023.8.24 注:我认为这个参数是没有必要的,我的实现中只用到了XTerm和XLen。2023.8.27 更新:好吧,这个参数是一定要的,不然某些情形下无法backup收敛)
XLen: log的长度(case 3)
可以解决以下三种情况。
Q:有没有可能遇到以下情况?
S1: 4 4 5
S2: 4 6 6 6
A:应该是不可能出现的, 因为index=2&&term=4的log一定是提交了的,与S2中的index=2 处的log矛盾了。
AppendEntries RPC
AP rpc的最终代码:
func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {
rf.mu.Lock()
fmt.Printf("AP from %d to %d received with AppendEntriesArgs: %+v\n", args.LeaderId, rf.me, args)
defer rf.mu.Unlock()
reply.Term = rf.currentTerm
if args.Term < rf.currentTerm {
return
}
need_persist := true
defer func() {
if need_persist {
rf.persist()
}
}()
if args.Term > rf.currentTerm {
rf.currentTerm = args.Term
}
reply.XLen = len(rf.log)
rf.fallFollower()
if len(rf.log) <= args.PrevLogIndex {
// log length diff >= 2
reply.Success = false
return
}
target_term := rf.log[args.PrevLogIndex].Term
if target_term != args.PrevLogTerm { // conflict
reply.Success = false
reply.XTerm = target_term
for i := args.PrevLogIndex - 1; i >= 0; i-- {
if rf.log[i].Term != target_term {
reply.XIndex = i + 1
break
}
}
return
}
reply.Success = true
// Tip: golang append slice to slice
rf.log = append(rf.log[:args.PrevLogIndex+1], args.Entries...)
if rf.commitIndex < args.LeaderCommit {
rf.commitIndex = args.LeaderCommit
rf.applyCond.Broadcast()
}
rf.persist()
need_persist = false
fmt.Printf("%d's log: %v\n", rf.me, rf.log)
}
Install Snapshot RPC
对于snapshot, follower在落后leader很多导致leader没有保留需要的旧log的情况下,leader会对follower发送install snapshot 的RPC请求。
Q&A
Q:对于选举超时的机制的问题:在网络发生partition时,数量上占小部分的partition会不断地发生选举超时,任期无限制增加,等到网络分区问题修复后,会使得raft集群崩溃吗?
A:不会,因为任期作为参数和返回值通过RPC在集群中传播,所以分区问题修复后,各个机器会很快将自己的任期更新到最新的任期,所以接下来一切照常运行。
Q:为什么leader只能提交currentTerm的log?
A:见论文的figure 8,会发生已经commit的log被覆盖的问题。
Q:如果follower收到旧于当前日志的snapshot install(可能由于网络的原因),那是应该直接丢还是进行替换呢?
A:丢弃的好处是可以提前终止snapshot的发送(可能占不少网络资源),替换的好处是可以节省掉根据日志构建snapshot的时间,所以选择丢弃还是替换,可能可以作为一个研究点。
lab3C 持久化
figure 2里面写好了需要持久化的state,所以lab3C只需要在需要修改这三个变量的地方,调用持久化的接口就好了,所以实现很简单。真正的难点在于:为什么持久化这3个变量?
Q&A
Q:为什么要持久化currentTerm,我的以下方法规避了持久化currentTerm的问题,是否可行?我的方法:在recover后,将currentTerm设为log中最后一个entry的term值。
A:我这种方法不能防止如下情况:
S1: 5 5 6(作为term = 6 的leader,被disconnect)
S2: 5 5(S1宕机之后重启,此时按照log最后一个term将currentTerm赋值为5)
S3: 5 5 (同S2)
这种情况下,S2和S3中会选出新的term=6的leader,那么term=6就存在两个leader了,也就是说index=3&&term=6的log可能不同,这会最终导致状态机状态不一致。所以currentTerm必须持久化。
Q:votedFor必须持久化?
A:必须,因为一个follower 在vote后crash掉了,recover后它是不知道自己有没有投过票的,这样可能出现brain split的问题。
Q: log必须持久化?
A:必须,因为被提交的log entry会存到大部分servers中,这是raft safety property(被选举出来的leader一定保存有所有被commit的log)的一个很重要的保证。如果不持久化log, 而是在recover后将log设为空,那么就有可能丢失被提交的log,造成错误。
Q:为什么last applied 不用持久化?
Q:在持久化时,必须要像数据库那样采用WAL的策略吗,即先改变磁盘中的值?
A:不需要,只需要在下一次与外界进行rpc交流前进行持久化即可。而RV和AP rpc调用时首先都要获取锁,所以只要在持久化之后释放锁,就能保证这一点。
总结
值得记录的历程
2023.8.25 TestPersist22C这个测试点卡了我大半天,最后发现居然是RV rpc写错了,逻辑想简单了,看来前面lab2A和lab2B的测例不够强。非常感谢raft官网的动画,很助于理解。
2023.8.27 TestFigure82C这个测试点卡了我整整一天,最后发现居然是更新nextIndex的地方自作聪明避免使用使用Xindex,结果造成了nextIndex在特定情况下会僵持不动,无法完成backup的收敛。
感慨与收获
一开始我还有不少地方想着不用完全按照论文里的一模一样来做,所以看了论文感觉大概理解后,就开始动手了,边想边做。结果就是Lab测例里稍微复杂点的测例就通不过——最终的结果就是,大费周折找到的几乎只有四种情况:1.手滑的低级错误(index写成XIndex等);2.原文中我理解错的地方(如选举的timeout以及投票的逻辑);3.原文中我忽视的地方(figure 8的问题);4.论文里说的quick backup部分,我很自信没完全按他的来,自以为“优化”了,其实是不严谨的。
最大的收获是:我开始非常勤于问自己问题,并勤快地记录下自己的思考,应养成习惯!
下一步
学一下TLA+,看看Diego Ongaro给出的证明
再更深入了解下paxos及其变种。
参考的资料
http://nil.csail.mit.edu/6.824/2020/papers/raft-extended.pdf raft-extended 论文
http://nil.csail.mit.edu/6.824/2020/labs/lab-raft.html 6.824 Lab 2: Raft
https://raft.github.io/ 官方动画演示