MIT 6.824 2020 (4) Lab 4: Sharded Key/Value Service 实现笔记
花了三周时间把拖了很久的Lab 4给做了,相比于前面3个lab,难度明显提高了很多。难点在于官方没有提供任何参考资料,但是给了很多hint。我认为这就是这个lab设计得最好的一点,不直接告诉你如何去解决这个问题,但是会给出一些可能有价值的hint,描绘出一个模糊的轮廓,启发学生的思考。
不得不说,这是个很难得的锻炼自己系统设计能力的机会,如果去网上搜解决方案那就太浪费了。所以我还是和往常一样,坚持独立思考。采用的方法论还是那一套:问自己问题,然后自己回。不论问题大小,都记录下来,当问题大概都能解决了,整个思路就慢慢浮出水面了,觉得差不多了就可以开始动手了,边动手边和自己Q&A。从结果来看,这个方法论是适合我的。
接下来简单记录一下这个思考、实现、debug的过程,其中涉及到一些早期的废案(有标识),但我还是记下来了。
Part A: The Shard Master
和lab3的key/value service基本一致,要改的地方就是server应用raft层传上来的cmd时的处理逻辑。一共要实现Join、Leave、Move、Query共四种功能,shard master 的作用就是将总数固定的shard平均分配到各个server group中。注意,join/leave都是可以一次上线/下线多个server group的。
我的实现算法很朴素,Join时每次把shard数最多的group分一个shard分给shard数最少的group,直到shard数的极差<=1时,迭代结束。Leave的逻辑也是类似。实现上要注意,golang的map是reference,所以拷贝config时,其中的的Group字段需要手动深拷贝。
2024.3.9 发现shardmaster有问题,同一个Num,能Get出Num相同而Shard不同的Config值,跑TestUnreliable2时偶尔能复现;后来发现这是因为go的map是hash map,for loop遍历的时候go会故意打乱顺序,导致leader和follower运行同一个join或者leave指令,结果可能会不一样(破坏了确定性状态机状态唯一的性质),导致之后Get的结果可能不同。关于go map for loop的随机性详见stack overflow的这个问题。在查找解决方案时,我还知道了ordered map和sorted map原来不是一种东西,ordered map只要求有序就行,不要求按大小顺序,一般是按键的插入顺序;而sorted map就是按key大小排序的map,比如C++里的map。golang里这两种map都是由第三方库提供的。我最终的解决方法还是采用的标准库的map,按官方推荐的方法,将它的key(gid)拿出来,先sort一下,然后按照这个sort的序去遍历,这样在不同实例上的遍历顺序就是确定的了(按gid大小顺序)。
Part B: Sharded Key/Value Server
config 配置更新的设计
为每个shardkv实例维护一个newest_config。
Q: follower需要去shardmaster拉取最新配置吗?
A:两种方法,一种是follower自己去shardmaster拉取最新配置,第二种是leader发现config发生变化后通过raft log告知follower,应该都是可行的。我选的是前者。
shard transfer过程的设计
由于我的目标是实现全异步的configure update的过程,所以一个group不会等自己要传输的shard都集齐了,再一起transfer出去。这要求我们将newest_config的更新逻辑和shard transfer的逻辑分离开来。
func (kv *ShardKV) updateConfig() {
for {
if atomic.LoadInt32(&kv.dead) == 1 {
return
}
kv.mu.Lock()
if !kv.inited { // first-time started
(...)
} else {
kv.mu.Unlock()
config := kv.clerk.Query(-1)
kv.mu.Lock()
if config.Num != kv.newest_config.Num { // discover a newer config
kv.newest_config = config
// if a newer config detected, start a gorountine to manage shard sending
go kv.checkIfNeedToSendShard(&config)
}
kv.mu.Unlock()
}
time.Sleep(time.Duration(FETCH_CONFIG_INTERVAL) * time.Millisecond)
}
}
func (kv *ShardKV) checkIfNeedToSendShard(config *shardmaster.Config) {
for {
kv.mu.Lock()
if kv.needTransfer() { // test if there is any shard to be send
cmd := Command{
Op: SendShardBeginOp,
Config: copyConfig(config),
}
_, _, _ = kv.rf.Start(cmd)
} else { // if no more shard to be send, stop and return
kv.mu.Unlock()
return
}
kv.mu.Unlock()
time.Sleep(time.Duration(CHECK_SEND_INTERVAL) * time.Millisecond)
}
}
Q: shard transfer需要传输哪些内容?
(以下这个是我最初的FetchShard RPC的方案,最后没有采用,但这个方案应该也是可行的,大概思路就是target_group如果发现config变更,那么他就自己去source group来取这个shard(通过FetchShard RPC调用);source group接到FetchShard RPC后,以这个时刻为准,停止该shard的对外服务,并将数据打包通过reply返还给target group。我感觉这个方案难点在于垃圾收集,即source group将数据通过reply给了target group后,由于无法确定该reply有没有成功到达(两军问题),所以何时能安全地做垃圾收集是个问题)
A: 将对应shard的数据(snapshot)传输过去,所以snapshot也应该分shard存储。把raft log中对应shard的log挑出来? 拒绝请求意味着commit index不能变了,由于commit是异步的,所以可以等apply index前进到commit index后,再将最终的snapshot打包好,等待target group的FetchShard RPC来取,log就不用传输了。注意,target group的follower也应该能做FetchShard RPC,避免leader拿到Transfer Shard又挂掉了。target group拿到shard后,需要复制到多数节点上,所以可以将这个shard的内容作为一条log放到raft层去,放进去后就能开始开始对外提供该shard的服务了。
要拿哪些shard呢?
首先,传输shard的工作应当由src group的leader和target group的leader来处理。
需要等到apply index等于commit index,再打包snapshot吗?其实好像也没必要,已经commit但是还没有apply的log可以通过不apply来处理,这样的话有个问题,如果这时候src group的leaer节点挂了,那就需要由src group的新leader来重新打包snapshot,这是完全可以的。所以打包snapshot这个过程可以放在Transfer Shard RPC中。
一个传输的shard里包含的内容是对应shard的数据+proccessed_req,需要proccessed_req是因为需要防止某些请求在target group中重复执行。
Q: shard transfer的过程,即source group和target group分别需要做什么操作?
A:上面说的这个Transfer Shard RPC是个被动的传输过程,其实换成主动的Install Shard RPC好像更好理解。由old group的leader去调用new group leader的Install Shard RPC,在RPC中将这个Shard作为一个log放到raft层中,直到这条log成功apply,该RPC才会返回OK。如果返回OK,说明该shard已经在target group的大多数节点中了,即transfer成功,src group端可以删除该shard了(垃圾收集,chanllenge1)。
**SendShardBeginOp log的必要性: **如果source group内部不对config change发生的时机达到共识的话,那在leader进行install shard时,如果crash了,新leader是无法生成一个相同的快照的(可能新leader又apply了一些新的key/value,处理了一些请求),此时再向target group发送Install snapshot请求时,可能target group已经完成了config change的过程了,甚至已经处理了一些请求了,这意味着有段时间内,两个group都能处理这个shard的请求,这就会违反线性一致性(在其中一个group中进行的请求结果无法及时反映到另外一个group中)。
**RecvShardOp log的必要性: **一方面,target group的leader拿到shard后,需要将数据复制到多数节点上才能开始对外提供服务,所以可以将这个shard的内容作为一条log放到raft层去。另一方面,target group需要对何时开始为新shard向外提供服务达成共识,即提供一个log中的时间点,在这之前收到的请求应当拒绝。
**SendShardCompelete log的必要性: **一方面,是为了做垃圾收集(确定何时才能安全地把某个shard从一个集群中删除),这个丢弃的时间需要在source group集群内达成一个共识;另一方面,这个是为了防止故障恢复后的重复发送InstallShard。
Q: 那什么时候src group才能丢弃这个shard呢(垃圾收集)?
A: target group的多数节点拿到这个shard后,返回InstallShard RPC success的reply, src group接收到reply后,即可安全丢弃。
Q: 故障恢复时,遇到之前发送过的SendShardBegin op的log,还需要再send shard一遍吗?
A: 如果没有ShardSendComplete log,那就无法确认是否真正送达,所以要么引入ShardSendComplete log,要么重新发送(效率很低)。
访问控制与故障恢复
废案(shard版本号)
关于如何访问控制的思考,即一个group面对一个put/append/get请求,如何判断自己是否有能力处理该请求?我最开始的想法是为每个shard提供一个版本号,只有当一个group持有的shard的版本号等于最新的版本号时,才说明自己有能力处理该请求。
Q:上层apply command时,用“当前实际掌控的shard集合“还是”最新的config"来判断是否有能力处理该请求呢?
(以下是废案!因为我当时希望从Put/Append/Get RPC请求上进行完全的访问控制,忘了其实可以将这一步部分推迟到log apply时)
A:应该以“当前实际控制的shard集合”为准,因为可能该实例还没有完成到最新config的转换,没有成功获取该shard的控制权,如果按照newest_config来,由于各个group更新newest_config的时间不一样,所以可能导致同一时刻有多个实例同时有该shard的控制权;另外,由于“当前实际控制的shard集合”和config的更新有滞后性:从source group端来看,发现config更新后,会产生一条TransferSend log,只有这个log提交后,才会对”当前实际控制的shard集合“进行跟更新;从target group端来看,只有等TransferRecv log提交后,才会更新“当前实际控制的shard集合”。
那用“最新config”在Get/Put/PutAppend请求时做访问控制吗?其实也不行,因为如果在一次配置变更的过程中,从target group来看,按照“最新config"的标准自己可能已经是该shard的控制者了,但此时可能target group还没有提交该shard的ShardRecvlog,也就是说没有获得该shard的历史数据,所以是没有能力处理该shard的请求的。
用“当前实际掌控的shard集合”做单层的访问控制这个想法,需要我为每个shard维护一个标识Config Num版本号的,再加上newest_config的版本号,这样就可以进行更精细的访问控制,保证在配置变更的过程中,source group和target group都拒绝要transfer的shard的用户请求。
(发现一个难以解决的问题)
2024.3.7 用以上方案跑TestConcurrent2时发现一个致命问题,问题在于“故障恢复”和“发现config更新导致自己版本号自更新”这两个是有冲突的。故障恢复的过程中,手中的数据是旧数据,但是这时如果恰好最新的config中自己是某个shard的owner,那这些旧数据的版本可能就会被提升为最新,并开始向外提供服务,但此时如果上一个owner还没有将自己的新数据通过ShardInstall RPC传送过来,那就会有问题。(2024.3.8 更新:这个可以通过故障恢复时关闭“版本号自更新”来做,但是如何确定故障恢复完成的时间点又带来了一些复杂性)。
究其根本,该方案的问题在于故障恢复时,具有“版本号自更新”这个外界随机性(fetch config的协程的工作),shard的版本变更并没有体现在raft log中,所以故障恢复时,如何处理SendShardOp、RecvShardOp都是问题。
Q: 在故障恢复时,能否停机以专门应用日志,恢复到故障发生前的状态再开始对外提供服务?
A: 由于raft不会持久化applied index,所以无法恢复到故障发生前的状态。
重构(shard status,确定性状态机)
(以下是最终方案)
重新思考故障恢复正确性的要求,并提取了一下:
- 故障恢复后,如果一个group能够处理某个shard的请求,那他就必须获得故障发生前该shard的data;
- 故障恢复后,每个shard最终都有且仅有一个group来管理
“故障发生前哪些shard是归自己管”这个必须通过依次apply log就能得出,并且不存在两个group同时认为拥有同一个shard的情况。
所以,我们需要一个确定性状态机!也就是保证,一个shard在apply到某一个位置时,一个shard是否属于自己管控要是确定的(想想确定性状态机)。其实,按照以下的方式来做shard status变更就行:
shard transfer的过程中,shard status的变更过程:
source group: Valid -(ShardSendBeginOp)-> Sending -(ShardSendComplete)-> Invalid
target group: Invalid -(ShardRecvOp)-> Valid
没有了shard的版本号,就需要重新考虑访问控制的问题。从状态机的角度考虑,用“newest_config"做访问控制(初步过滤)其实是可行的。面对上面画下划线的问题,其实可以暂时按照”newest_config"来做Get/Put/PutAppend请求时来做初步过滤;然后在apply log时,根据状态机的状态(对应shard的status)来最终判定是否能处理该请求。如果不能,这时候再返回ErrWrongGroup也行。
总结:该方案能解决故障恢复的问题,因为他是个确定性状态机,只要按照规则,依次apply raft log就能让整个group,包括整个系统,恢复到一个一致的状态。
其他细节
Q: server端哪些内容是需要持久化的(snapshot) ?
A: snapshot保存kvdata、processedReq。原因是这三者代表着raft上层状态机的状态。
关于snapshot,其实这里还有个有意思的点:在检测到需要进行compaction时,假设决定将commandIndex=a之前的log进行compaction了,如果a之前有一条SendShardBeginOp的log,并且它对应的这些SendShard goroutine 可能还没有完成任务,如果这时候把a之前的log compaction了,那如果之后发生了故障,那这条SendShardBeginOp log就算是丢失了,故障恢复时也不会继续之前没有完成的传输任务。这是由于snapshot里不会记录“哪些SendShard没有完成SendShard"的任务,所以难以恢复。我对这个问题的解决方案是将snapshot应当作为一个异步的过程,发现需要compaction时,启动一个goroutine来做compaction,这个goroutine会检测a之前的sendShard任务是否都完结了,如果是,才能安全地snapshot。
Q: InstallShard RPC需要保证线性一致性吗?
A: 由于从source group来看,InstallShard RPC的调用是异步的(和Client保证Seq请求处理完后,才会发起Seq+1请求的同步模型不同)如果发生连续两次config变更,一个target group先后两次收到同一个source group的InstallShard请求,并且由于网络延迟的原因,target group先收到的是Num更大的InstallShard请求,如果不从source group中加以控制,那么后来的Num更小的这个InstallShard请求就会被后applied,系统允不允许这种乱序config的applied呢?好像这个lab中并没有对这个做要求,我认为这种乱序也不会影响Shard Kv service的线性一致性保证。
上面说了,InstallShard RPC的调用是异步的,所以shard_install_session的key应该是{gid, Num}这样一个pair,这样才能准确地标识一个session。
Q: apply SendShardOp\RecvShardOp如何去重?
A: 由于shard transfer是异步的过程,所以同一个config update的过程中,一个source group向一个target group传送shard可能得分多个InstallShard RPC完成。所以,这两个不需要刻意地做去重。
对于SendShardOp,由于source group只会将自己status==Valid && belongs to target gourp的shard送出去,已经是Sending状态的不需要再送(之前的goroutine会确保处于Sending的status最终会送到,对此,在故障恢复loadSnapshot时,如果发现一个shard的status是Sending状态,那么需要马上启动一个sendShard goroutine来继续之前没完成的传输工作,当时的InstallShardArgs可以存到Kvdata的shard中一起持久化,要是有rust的和类型就很方面了,可以存在Status::Sending中;最后我没有采用这个方法,而是将snapshot改为了异步snapshot,保证所有sending process结束后,才能安全地进行snapshot)。
对于ShardRecvOp,由于target group只会接收status=Invalid的shard,所以遇到重复的ShardRecvOp也不会影响正确性。2024.3.21 这里好像还有问题,在group A故障恢复时,group A向group B发送了一个旧的InstallShard请求,并成功作为一个raft log放在了group B的raft log队列末尾。group B提交到这条log时,可能这些shard现在并不属于这个group(因为这些shard实际上是旧的数据),如果按照target group接收status=Invalid的shard而不加其他限制的话,那shard B就会接收这些shard,并很可能马上又将其转交给最新的owner group去,这个过程是完全没有必要的。解决方法就是ShardRecvOp还是要去重,方法是为每个shard记录下recv过的最大的版本号,如果遇到之前见到过的版本,那即使status=Invalid也不应该接收。这个recv_max_version[shardmaster.n]数组也应该也应该持久化。另外,从发送端来看,已经收到确认的的SendShardBegin其实不需要再进行发送,这会占掉不需要的网络带宽。
踩坑记录
2024.3.5 config更新后,如果一个shard依然属于自己,那需要更新版本号。struct Config里的Groups是个map,复制的时候需要手动复制!
2024.3.6 如果只是发现config更新后判断一次是否需要send是不行的,因为config更新时可能刚好没有leader!(正在选举中),所以不能向raft层提交一条SendShard的log;所以,这个判断是否需要send的逻辑应当是个定期执行的任务。另外发现一个初始化时的问题:如果第一个join到集群的group在第一次获取config时,已经跳过了几个config,那可能之前本应该由其掌管的shard被他忽略了,之后遇到新config时,自然也不会transfer出去,这个shard就丢失了。这是由于我的设计中,shard是通过ShardRecv来被动接收的,所以整个系统管理的shard集合就是各个group初始化时的shard集合的并集,如果一开始就出现”无主的shard",那这个shard就永远丢失了。解决办法可以试试初始化的时候,强制通过ck.Query(1)去读Num=1的config(选择Num=1是因为它是第一个有意义的config),Num=1的config中永远包含所有shard。
2024.3.7 找了一天一个bug,最后居然是data race了,忘开-race选项了。简单来说,就是有一个map数据要从一个A集群转移到另外一个B集群,这个map是通过下层的raft层的log来传递的,B集群拿到这个log后,把这个log里的map拿出来,作为自己的数据。我当时把这个从log里拿map当所有权转移做的(就没去copy),因为我想着这个log应该没人用了。结果这个log还是被raft层引用着,我之后更新这个map,raft层里面的对应log的map也会改,当log之后落盘持久化的时候,map已经不是当时的值了,之后故障恢复读的值就是错的。这个bug从现象上来看,就是集群宕机后恢复时,有可能Get出来的value中有不应该存在的重复字符串,今天看log少说看了1w行吧,脑子中思考了所有可能出现的情况,最后结论是逻辑上不可能出现这样的结果…最后突然想到没开-race,才发现问题所在。教训是:养成良好的编程debug习惯,用go/c++写这种存在并发的程序,就老老实实把AddressSantizer打开; 2. 当脑子里“所有权转移”的想法时,think twice,是否以后真的没有别的地方引用这个变量了。
2024.3.9 loadsnapshot时”labgob对非默认值字段不一定decode成功“这个规矩…真逆天,一个下午坑了我两回。
2024.3.10 跑TestUnreliable2时,发现一个情况:如果一个group A to group B 的 transfer成功的SendShardComplete久久没有返回到A,而这时候一次新的config变更导致刚才A transfer到B的某个shard这时候又要回传过去,如果这个B回传的ShardRecv先于那个SendShardComplete到A,那B的这个shard的状态变更顺序最终为Send->Valid->Invalid。解决办法就是SendShardComplete的处理逻辑改为:只有发现status=Send时,才设为Invalid。
配置变更,做shard transfer时,如果等shard收集齐了再一起发送,那么如果出现循环依赖,就会导致互相等待并死锁(官方handout里的死锁应该指的就是这个),导致config变更无法完成、相关的shard最终无法提供服务。所以我在一开始就没有选择”等shard收集齐再一起发送“这个设计,而是“有什么shard就发什么shard”,没收到的shard,收到了再发。假设A要发送shard x给B,A收到shard x了,但是InstallShard success的reply在途中丢失了,那B就认为A没有成功收到,这时候又一个更新的config出现了,并且config指出shard x应该属于C,问题来了,那B要不要发shard x给C呢?B如果发的话,如果A其实已经收到shard x了,并且处理了一些shard x的请求了,A是会把他的shard x发给C的,如果B的shard x先于A的shard x到C,那A的shard x就会被拒绝(因为它已经收到过shard x了),这时,他拿到的就是shard x的旧版本,这就有问题了。所以,B不能发shard x给C。问题又来了,那B能什么都不干吗?也不行,因为从B的角度来看,还没收到A的InstallShard success回复,如果什么都不干的话,那shard x就没有group管了。所以,B只能继续将shard x发给A。C收到shard x后,再将其转交给C。
最终,能稳定通过所有测试点了,由于一开始就是将垃圾收集和异步Shard Transfer纳入了设计的(其实这两个方面应该是正常也需要考虑到的),所以两个challenge的Test也很顺利地通过了。
Debug建议
- 打log是一门技术活。首先,raft层的log按照raft组进行分类,输出到各自的文件中(如果能保证raft层绝对正确,那其实可以直接关闭raft的log)。上层ShardKV状态机层则所有实例共用一个logger实例,打印到stderr/stdout即可。log库可以采用结构化日志loggrus,format可以用nested format,便于阅读一点。
- 跑test时开-race选项!
- 看log要有目的地去看,哪个shard有问题,就看shard有关的transfer过程的log。如果难找,就只打印和这个shard有关的log。
茶余饭后
今天和室友讨论POSIX的信号机制,才知道sighandler中是不可以获取锁的,因为sighandler本质不是一个可以调度的执行流,它就是在内核态返回用户态时,先于正常控制流执行的一部分代码。如果主控制流获取了锁,并被信号打断,而sighandler中又尝试获取该锁的话,就会发生死锁。所以从这个角度来说,通过锁实现的线程安全的代码都不是信号安全的,比如内存分配器、printf(其中维护的缓冲区是通过锁来保护的)。POSIX标准中要求这些函数是信号安全的。
相关博客指出,sighandler中可以用reentrylock,但是如果遇到multi-thread+fork这种corner case就可能发生死锁。
另外刷leetcode周赛时碰到个滑动窗口比较典型的题,大意是从长度为d的窗口内维护最小的k个元素,可以用“对顶堆+懒删除”或者2个multiset解决,本质上是一样的。如果是更简单的情况,比如只要维护最小的元素什么的,那就只要用一个堆/multiset就行。可见,这种查询固定区间的问题,用滑动窗口就可以解决;而对于区间修改,很多时候可以用前缀和来解决。线段树这种复杂的数据结构是用来做任意区间的修改和查询。
然后就是golang修改for loop的变量作用域的preview blog,终于要改了:
For Go 1.22, we plan to change for loops to make these variables have per-iteration scope instead of per-loop scope.
我自己写go时,发现这个很容易导致for loop中的goroutine捕捉到的是一个per-loop的的变量(go lamda function只能引用capture,即使是个int),这个问题在非并发的情况下也可能导致bug。