Lab 2A leader election
Lab 2A要完成的是初始的leader选举过程,而没有日志的参与。主要需要完成三个部分:定时器,RequestVote和heartBeat.
定时器
定时器的逻辑很简单,无非是在随机等待时间超时后将状态切换为Candidate,然后开启一个协程开始选举过程,并继续等待下一次时间超时。在Lab的提示部分,提到”The easiest way to do this is to create a goroutine with a loop that calls time.Sleep(). Don’t use Go’s time.Timer
or time.Ticker
, which are difficult to use correctly.”因此采用了time.Sleep来实现计时,但是Raft要求在接收到新的heartbeat后重置定时器,而time.Sleep并没有重置的功能,因此在这里引入了一个新的变量isTimeout。当接收到新的RequestVote或AppendEntries RPC时,将isTimeout设置为false,表示当前的计时器超时后再等待下一轮计时。只有当time.Sleep返回,并且isTimeout为true时,才是真正的超时,将状态设置为Candidate并启动选举过程。当isTimeout为false并且状态为Follower时,将isTimeout重置为true,继续计时。
startElection和RequestVote
由于Lab 2A中没有引入日志,因此在这里有用的RequestVote的参数只有term和candidateId,分别表示Candidate的任期和id。在startElection中,因为要求Candidate接收到来自新的leader的AppendEntries RPC时将状态转换为Follower,而这一过程可能在startElection的过程中出现,因此首先需要判断一下当前的状态是否为Candidate,如果是,将当前的任期号+1,并设置votedFor = rf.me,从而防止投票给具有相同任期号的其他Candidate。对于集群中的每一个peer,调用sendRequestVote以调用其RequestVote RPC,peer收到RequestVote后,判断任期号是否更新,如果peer发现自己的任期号更大,则返回false和自己的任期号,Candidate在接收到更大的任期号之后将状态立刻设置为Follower,退出选举。否则,如果peer在当前任期内没有给其他的Candidate投票,那么返回true,票数+1.Candidate统计得票数,当票数过半时,将状态转换为Leader,开始发送heartBeat。
如果票数没过半,即发生了brain-split时应该怎么处理?论文中解释说在等待一个随机时间后再开始选举,以尽可能减少brain-split再次发生的概率。事实上,由于我们本身具有定时器机制,并且系统中此时并不存在另一个Leader来发送heartbeat,因此只要结束当前的startElection,等待计时器超时后再次启动startElection就可以解决brain-split问题。
heartBeat和AppendEntries
当前机器成为Leader后,应该一直以固定的时间间隔向系统中所有的peer发送heartBeat,直到发现自己已经不再是Leader为止。因此在循环的开始处,首先判断状态是否为Leader,如果不是Leader直接退出循环。与RequestVote类似,由于Lab 2A不存在日志,因此只需要用到Term和LeaderId两个参数。对于集群中的每一个peer,调用sendAppendEntries以调用AppendEntries RPC。在这里,只有当Leader的任期号小于Follower时才会返回false,因此当reply.Success为false时,直接将状态切换为Follower。对于Follower,当收到有效的AppendEntries,即Leader的任期号不小于自己当前的任期号时,重置isTimeout。
Lab 2B log
Lab 2B中加入了日志,因此相比于2A产生了更多和更复杂的情况。
首先,RequestEntries中引入了两个新的参数:LastLogIndex和LastLogTerm,分别是当前日志的末尾元素的下标和末尾元素的任期号,用于实现选举对Candidate日志长度的限制,并且在Candidate成为Leader时,需要将每个peer的nextIndex设置为当前的日志长度,matchIndex设置为0。
测试程序和基于Raft的应用使用Start这一接口来执行Command并将其记录到日志中。只有Leader的Start可以被调用,Follower对Start直接返回false。Leader首先将Command加入到自己的日志中,然后依次调用每个peer的AppendEntries来讲Command发送并提交给每个Follower。在这里,Leader需要判断每个peer的nextIndex,如果nextIndex比新插入的Command的下标更小,则说明前面还有没插入的Command,因此需要将从nextIndex之后的日志全部发送出去。如果Follower返回false,则有两种情况:1. Leader落后于系统,将状态转换为Follower。 2. Leader除了发送的这部分日志之外,还有Follower没有记录的日志未发送,因此需要减小nextIndex和matchIndex,具体来说,将nextIndex设为matchIndex,将matchIndex-1,直到matchIndex为0为止。如果发送成功,则将该peer的matchIndex指向当前日志的末尾,nextIndex指向matchIndex+1。在所有的AppendEntries都发送完之后,Leader需要更新自己的commitIndex,具体来说,从当前的commitIndex + 1开始,依次判断与每个peer的matchIndex的大小关系,找到最大的n,使大多数peer的matchIndex >= n,则n为新的commitIndex。而旧的commitIndex到新的commitIndex之间的日志视为已提交,并将其依次发送到applyCh,作为applied,随后更新lastApplied。
从上面的描述可以看到,首先Leader的Start过程需要等待所有的发送过程完成,一开始我采用了条件变量,每个用于发送的协程结束后调用wg.Done,而主线程使用wg.Wait阻塞,但是这一机制在某些测试中可能会导致阻塞时间过长,Follower重新选举或在预定的时间内没有成功提交等问题。而如果直接去掉条件变量,又会在另一些测试中由于部分发送协程没有执行完,因此产生不一致或是提交不完全等问题。我在发送结束后加了time.Sleep(10 * time.Millisecond),在大多数情况下可以通过测试(至少我没有遇到失败情况)。
RequestVote的修改比较简单,只需要添加对于Candidate日志长度的比较。对于AppendEntries,如果参数中的Entries长度不为0,就需要从PrevLogIndex开始,找出自己的日志中与Leader对应位置处不一致的日志,将此日志后续的所有日志删除,并将新的日志添加到后面。同时,如果LeaderCommit>commitIndex,将commitIndex更新为LeaderCommit和日志长度二者中的最小值,视为提交部分日志,并将提交后的日志发送给applyCh。
经过上面的修改,我产生了一个疑惑:Leader的CommitIndex是根据Follower的返回值来更新的,而Follower的CommitIndex的更新又依赖于LeaderCommit,那么如果仅有一个Start,那么是否日志将永远无法提交?答案显然是否定的。在外部应用调用Start之外,Leader会周期性的发送heartBeat,同样可以携带CommitIndex和日志信息来帮助进行日志提交。在这里需要对heartBeat作出修改,具体来说,除了不需要向日志中插入新的Command之外,其他部分与Start毫无二致。
实验给出了一个很有意思的Test,假设存在三台机器,0是Leader,1、2是Follower,让2短暂失去连接,过一段时间后再重新接入,在这个过程中要求系统始终保持一致。实质上这一Test测试的是election restriction的正确性。由于2断联,因此会不断startElection,也就是说currentTerm不断+1,但是0是leader,还在不断地向1和2发送AppendEntries,在2重新变成Follower加入之后,因为0的currentTerm < 2因此0会变成Follower,2成为Candidate,但是由于commitIndex的限制,0和1又不会同意2的选举,因此会出现三个Follower,并且0和1的任期被更新为与2的相同,最终还是只有0和1可以赢得选举,并将日志提交到2上,保持整个系统的一致性。
Lab2C: persistence
Lab2C要求完成persist和readPersist两个函数,用于保存和恢复服务器的状态 。persist和readPersist中其实已经给出了具体的写法,只要按相同的顺序对Figure2中所示的persistent state(即currentTerm、votedFor和log[])encode和decode。并在每次persistent state发生变化时调用persist,在Make时调用readPersist恢复。
Lab 2C中还增加了对Figure 8的测试,即leader只能通过计数来提交当前任期的日志,而在之前任期产生的日志通过间接提交来完成。具体来说,只有当前任期的日志已经复制到在大多数服务器上时,leader将该日志提交,而在此前的日志也被一并提交。
Lab 2D: Log compaction
Lab 2D要用snapshot来完成日志压缩。当服务器的日志过长时,需要大量时间来完成重放,因此需要定期生成快照来实现压缩。需要实现Snapshot和InstallSnapshot两个函数。
Snapshot比较简单,仅用于被上层服务调用,两个参数index和snapshot分别是snapshot对应的下标和保存的快照。如果index对应的下标已经生成过快照,即已经不在内存中,则忽略。如果index超出当前最大的日志,则服务器当前的状态已经包含在参数snapshot中了,创建一个新的log数组,将log[0]的index设置为参数index,将term设置为currentTerm。如果index在当前的log中,则丢弃index之前的部分,将后续的日志作为新的日志,并设置lastIncludedIndex为index。
InstallSnapshot用于Leader发现Follower对应的nextIndex已经被丢弃时,将快照发送给Follower。处理过程与AppendEntries比较相似。如果Follower发现Leader的任期落后于自己,则返回自己的Term。否则,判断是否需要修改自己的状态为Follower和重置votedFor。然后,使用与Snapshot相同的规则,丢弃多余的日志,设置lastIncludedIndex。区别在于,Follower在接受了snapshot后,需要通过applyCh来向上层服务发送ApplyMsg。由于的ApplyMsg并不是日志提交,因此并不需要修改lastApplied和commitIndex两个状态,二者只有当leader提交日志后才能够随着LeaderCommit的变化而变化。
与InstallSnapshot相对的,在Leader通过BroadCast向所有的服务器发送heartBeat之前,需要首先判断nextIndex对应的日志是否已经被丢弃,如果被丢弃,则要通过InstallSnapshot使对应的Follower与Leader保持一致,在InstallSnapshot成功时,由于要求系统状态不能后退,因此将matchIndex设置为max(lastIncludedIndex, matchIndex),nextIndex设置为max(matchIndex+1,nextIndex)。
我在Lab2D上花费的时间大约为前面三次时间的总和。一方面,由于之前没有考虑到下标的变化,在Lab2D中由于日志压缩的影响,所有的日志下标都要修改为压缩之后的值。另一方面,在2D的测试中,之前的测试用例没有测出来的代码中的潜在问题在此时全面爆发,因此需要从日志中查找细节错误。例如,在TestSnapshotInstallUnreliable2D中遇到了这么一个问题:系统中存在三台服务器,其中0和1变为Candidate,开始选举过程,但是始终发送的RequestVote一直没有响应,其中2可以收到RequestVote请求,但一直无法真正处理该请求。推测是因为系统中出现了死锁导致RequestVote无法获得锁进行处理。通过对每个Lock和Unlock打log,发现由于InstallSnapshot和AppendEntries使用锁保护了applyCh<-ApplyMsg这条语句,而applyCh容量为1,在上层服务没有将管道中的内容取走时,InstallSnapshot和AppendEntries被阻塞在此处,从而也就无法释放锁,而上层服务也被Lock锁住无法从管道中读取,从而导致了系统的死锁。通过将applyCh<-ApplyMsg从临界区中移出,成功解决了死锁问题。
我还遇到了一个Liveness问题,即系统长时间无法提供读服务。似乎是由于算法本身的缺陷所造成的。考虑如下场景,系统中存在0,1,2三台服务器产生的集群。初始时0赢得选举成为Leader,向1和2发送了AppendEntries,然后由于网络波动,2没有收到0,而1收到了。此时0的commitIndex为1,而在0发送下一次heartBeat之前,2由于长时间没有收到heartbeat,Term+1,成为Candidate,但是由于2中的日志比较旧,因此无法赢得选举,此时012均成为Follower,term均为2,1先于0成为Candidate,由于1的日志与0的一样新,比2的更新,因此赢得选举,term为3.但是,1中的日志是0在term为1时发送的,由于Raft限制只能提交当前任期的日志,因此在服务没有新的日志写入时,1将会始终无法提交日志。在此期间,如果又有读请求到了节点1,如果直接执行了读请求,则不满足线性一致性,因为状态机的状态依然是旧的(因为0已经提交了该日志,而1没有提交,按照线性一致性的要求,在读到了新的状态后就不能再读到旧的状态)。因此需要首先判断当前term是否提交过日志,如果没有提交过日志,则应该等待当前term提交过日志之后才能处理读请求,而client的读请求得不到满足,也就不会写入新的命令,1依然无法提交日志,从而造成了系统陷入了类似死锁的状态。
一种似乎可行的解决方法是在Apply中,如果最后一个日志的term小于当前的term,则写入一个空命令来使leader一并提交先前的日志。我试着实现了这一机制,但是加入之后lab 2D和lab2B的测试都出现了大量的错误。。。。
总结
我的代码实现在clingfei/6.824 (github.com)中,每个实验都测试了200次,没有出现错误。
前前后后花了八九个工作日,最后在11.11完成了Lab 2A~2D,果然单身狗只能写代码(笑)。对并发的理解提高了很多,深刻体会到了Go写并发的优势,debug的耐心也增加了很多。后面争取本学期结束前写完Lab 3和Lab 4。