This lab builds a key/value storage service on top of the Raft implementation from Lab 2.
Part A: Key/value service without snapshots
Part A implements the key/value service without snapshots. Each kvserver is paired one-to-one with a Raft peer. A clerk, acting as a client of the storage service, sends Put()/Append()/Get() RPCs to the kvserver whose Raft peer is the leader in order to update or read data. The kvserver submits each Put/Append/Get operation from a clerk to the underlying Raft as a log entry, so the Raft log holds the sequence of clerk operations. All kvservers execute these operations in log order, which keeps the key/value database consistent across servers.
The Raft leader may change over time, and the kvservers do not proactively tell clerks about the change: when the kvserver a clerk contacts is not the leader, or is unreachable, the clerk simply moves on to the next server. When a kvserver receives a request from a clerk, it does not execute it immediately; it first hands the operation to Raft so that it is replicated and applied on all kvservers, waits for the apply to come back, and only then executes the request and replies to the clerk. If the apply fails, i.e. the log entry for the request did not commit, the kvserver returns an error, and the clerk switches to the next kvserver and resends the request.
In client.go, each clerk is modeled as a Clerk struct. The servers array holds all kvservers; curLeader records the index of the server that acted as Raft leader on the last request, and new requests start from curLeader to minimize the number of retries. sequenceId is the per-request sequence number, which lets the server order requests from the same clerk; for two requests carrying the same sequence number the server does not execute the operation twice but simply returns the result of the earlier execution. The definition is as follows:
type Clerk struct {
    servers []*labrpc.ClientEnd
    // You will have to modify this struct.
    curLeader  int
    me         int64
    mu         sync.Mutex
    sequenceId int64
}
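The clerk constructor is not shown in this writeup. A minimal sketch of MakeClerk that initializes the fields above, assuming the lab skeleton's nrand() helper for generating a random client id, might look like:

func MakeClerk(servers []*labrpc.ClientEnd) *Clerk {
    ck := new(Clerk)
    ck.servers = servers
    // You'll have to add code here.
    ck.me = nrand()  // random 64-bit client id (nrand() comes from the skeleton)
    ck.curLeader = 0 // probe from server 0 until a leader responds
    ck.sequenceId = 0
    return ck
}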
Each server is modeled as a KVServer struct, where database is the key/value store kept on each server and lastSequence records the sequence number of the last completed request from each clerk. The code is as follows:
type KVServer struct {
    mu      sync.Mutex
    me      int
    rf      *raft.Raft
    applyCh chan raft.ApplyMsg
    dead    int32 // set by Kill()
    maxraftstate int // snapshot if log grows this big
    // Your definitions here.
    database map[string]string
    // records the sequence number of the last completed request from each clerk
    lastSequence map[int64]int64
    channel      map[int]chan Notify
    indexMap     map[int64]int
    snapshotLastIndex int
}
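The Op and Notify types referenced above are not shown in the snippets. Based on how they are used in the RPC handlers and in apply() below, they could be defined roughly as follows (field order of Op matches the positional literals in the handlers):

// Op is the command replicated through Raft.
type Op struct {
    Operator    string // "Get", "Put" or "Append"
    Key         string
    Value       string
    ClientId    int64
    SequenceNum int64
}

// Notify is what apply() sends back to a waiting RPC handler.
type Notify struct {
    sequenceNum int64
    term        int
}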
The clerk's Get function asks the leader for the value associated with the given key. Each call to Get increments the clerk's sequenceId and keeps resending the request to servers until it receives a response other than ErrWrongLeader. Its definition is as follows:
const (
    OK             = "OK"
    ErrNoKey       = "ErrNoKey"
    ErrWrongLeader = "ErrWrongLeader"
)

type Err string

type GetArgs struct {
    Key string
    // You'll have to add definitions here.
    ClientId    int64
    SequenceNum int64
}

type GetReply struct {
    Err   Err
    Value string
}

func (ck *Clerk) Get(key string) string {
    // You will have to modify this function.
    args := &GetArgs{key, ck.me, ck.sequenceId}
    ck.sequenceId++
    for {
        for _, srv := range ck.servers {
            var reply GetReply
            ok := srv.Call("KVServer.Get", args, &reply)
            if ok && reply.Err != ErrWrongLeader {
                return reply.Value
            }
        }
        time.Sleep(100 * time.Millisecond)
    }
}
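Note that the loop above scans the servers from index 0 on every attempt; the curLeader optimization described earlier (remember the last known leader and start from it) does not appear in this snippet. A hedged sketch of that variant, using a hypothetical helper name and the same argument/reply types, might look like:

// Illustrative sketch only, not the code used above: retry starting
// from the last server known to be the leader.
func (ck *Clerk) getFromCurLeader(key string) string {
    args := &GetArgs{key, ck.me, ck.sequenceId}
    ck.sequenceId++
    for {
        srv := ck.servers[ck.curLeader]
        var reply GetReply
        ok := srv.Call("KVServer.Get", args, &reply)
        if ok && reply.Err != ErrWrongLeader {
            return reply.Value
        }
        // Not the leader or unreachable: remember to try the next server.
        ck.curLeader = (ck.curLeader + 1) % len(ck.servers)
        time.Sleep(100 * time.Millisecond)
    }
}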
The server's Get RPC works as follows. On receiving a request from a clerk, the server first checks its own state and immediately returns ErrWrongLeader if it is not the leader. Otherwise it consults lastSequence: if the request has already been processed, it does not need to be submitted to Raft again, and the server directly returns OK together with the requested value (receiving a duplicate request means the previous reply never reached the clerk). If the request has not been processed, the server builds the corresponding Command, containing the operation type, its arguments, the clerkId of the sender, and the request's sequence number (together these uniquely identify a request), and calls Start to append the Command to the Raft log and replicate it to all Raft peers. Using the log index returned by Start, the handler creates a channel and waits on it in a select until the Command reaches consensus on a majority of servers and the apply loop wakes it up; it then replies to the clerk and removes the channel. The code is as follows:
func (kv *KVServer) Get(args *GetArgs, reply *GetReply) {
    // Your code here.
    if kv.killed() {
        reply.Err = ErrWrongLeader
        return
    }
    if _, isLeader := kv.rf.GetState(); !isLeader {
        reply.Err = ErrWrongLeader
        return
    } else {
        kv.mu.Lock()
        lastSequence, ok := kv.lastSequence[args.ClientId]
        if ok && args.SequenceNum <= lastSequence {
            reply.Err, reply.Value = OK, kv.database[args.Key]
            kv.mu.Unlock()
            return
        }
        Command := Op{
            "Get", args.Key, "", args.ClientId, args.SequenceNum,
        }
        kv.mu.Unlock()
        idx, term, isLeader := kv.rf.Start(Command)
        if !isLeader {
            reply.Err = ErrWrongLeader
            return
        }
        kv.mu.Lock()
        ch, ok := kv.channel[idx]
        if !ok {
            kv.channel[idx] = make(chan Notify, 1)
            ch = kv.channel[idx]
        }
        kv.mu.Unlock()
        select {
        case <-time.After(500 * time.Millisecond):
            {
                kv.mu.Lock()
                _, isLeader = kv.rf.GetState()
                lastSequence, ok := kv.lastSequence[args.ClientId]
                if isLeader && ok && args.SequenceNum <= lastSequence {
                    reply.Err, reply.Value = OK, kv.database[args.Key]
                } else {
                    reply.Err = ErrWrongLeader
                }
                delete(kv.channel, idx)
                kv.mu.Unlock()
                break
            }
        case notify := <-ch:
            {
                kv.mu.Lock()
                if notify.sequenceNum == args.SequenceNum && term == notify.term {
                    if value, ok := kv.database[args.Key]; !ok {
                        reply.Err = ErrNoKey
                        kv.lastSequence[args.ClientId] = args.SequenceNum
                    } else {
                        kv.lastSequence[args.ClientId] = args.SequenceNum
                        reply.Value, reply.Err = value, OK
                    }
                } else {
                    reply.Err = ErrWrongLeader
                }
                delete(kv.channel, idx)
                kv.mu.Unlock()
                break
            }
        }
    }
}
The only difference between the clerk's Put and Append is that Put stores the key-to-value mapping directly, while Append appends the given value to the existing value if the key is already present. That logic lives on the server; in the clerk the two calls differ only in the operation type. The code is as follows:
func (ck *Clerk) PutAppend(key string, value string, op string) {
    // You will have to modify this function.
    args := &PutAppendArgs{key, value, op, ck.me, ck.sequenceId}
    ck.sequenceId++
    for {
        for _, srv := range ck.servers {
            var reply PutAppendReply
            ok := srv.Call("KVServer.PutAppend", args, &reply)
            if ok && reply.Err != ErrWrongLeader {
                return
            }
        }
        time.Sleep(100 * time.Millisecond)
    }
}

func (ck *Clerk) Put(key string, value string) {
    ck.PutAppend(key, value, "Put")
}

func (ck *Clerk) Append(key string, value string) {
    ck.PutAppend(key, value, "Append")
}
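PutAppendArgs and PutAppendReply are not shown; judging from the positional literal in PutAppend above and how the handler below reads the fields, they presumably look like:

type PutAppendArgs struct {
    Key   string
    Value string
    Op    string // "Put" or "Append"
    // You'll have to add definitions here.
    ClientId    int64
    SequenceNum int64
}

type PutAppendReply struct {
    Err Err
}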
The server's PutAppend RPC follows essentially the same logic as Get. The code is as follows:
func (kv *KVServer) PutAppend(args *PutAppendArgs, reply *PutAppendReply) {
    // Your code here.
    if kv.killed() {
        reply.Err = ErrWrongLeader
        return
    }
    kv.mu.Lock()
    if _, isLeader := kv.rf.GetState(); !isLeader {
        reply.Err = ErrWrongLeader
        kv.mu.Unlock()
        return
    } else {
        if lastSequence, ok := kv.lastSequence[args.ClientId]; ok && args.SequenceNum <= lastSequence {
            reply.Err = OK
            kv.mu.Unlock()
            return
        }
        Command := Op{
            args.Op, args.Key, args.Value, args.ClientId, args.SequenceNum,
        }
        kv.mu.Unlock()
        idx, term, isLeader := kv.rf.Start(Command)
        if !isLeader {
            reply.Err = ErrWrongLeader
            return
        }
        kv.mu.Lock()
        ch, ok := kv.channel[idx]
        if !ok {
            kv.channel[idx] = make(chan Notify, 1)
            ch = kv.channel[idx]
        }
        kv.mu.Unlock()
        select {
        case <-time.After(500 * time.Millisecond):
            {
                kv.mu.Lock()
                _, isLeader := kv.rf.GetState()
                lastSequence, ok := kv.lastSequence[args.ClientId]
                if isLeader && ok && args.SequenceNum <= lastSequence {
                    reply.Err = OK
                } else {
                    reply.Err = ErrWrongLeader
                }
                delete(kv.channel, idx)
                kv.mu.Unlock()
                break
            }
        case notify := <-ch:
            {
                kv.mu.Lock()
                if notify.sequenceNum == args.SequenceNum && term == notify.term {
                    reply.Err = OK
                } else {
                    reply.Err = ErrWrongLeader
                }
                delete(kv.channel, idx)
                kv.mu.Unlock()
                break
            }
        }
    }
}
When the kvserver starts, it launches a goroutine running the apply function, which receives the Commands that the underlying Raft protocol has agreed on. When an applyMsg arrives on applyCh and its Command is valid, the server runs the handling logic for the Command's operation type, records the Command's clerk and sequence number in lastSequence, and, using the Command's log index from the applyMsg, looks up the channel created by the earlier RPC call and writes a Notify into it to wake up the RPC handler goroutine waiting on that request. The code is as follows:
func (kv *KVServer) apply() {
    for !kv.killed() {
        applyMsg := <-kv.applyCh
        if applyMsg.CommandValid {
            command := (applyMsg.Command).(Op)
            kv.mu.Lock()
            lastSequence, ok := kv.lastSequence[command.ClientId]
            if command.Operator == "Put" {
                if !ok || lastSequence < command.SequenceNum {
                    kv.database[command.Key] = command.Value
                    kv.lastSequence[command.ClientId] = command.SequenceNum
                }
            } else if command.Operator == "Append" {
                if !ok || lastSequence < command.SequenceNum {
                    value, ok := kv.database[command.Key]
                    if !ok {
                        kv.database[command.Key] = command.Value
                    } else {
                        kv.database[command.Key] = value + command.Value
                    }
                    kv.lastSequence[command.ClientId] = command.SequenceNum
                }
            }
            ch, ok := kv.channel[applyMsg.CommandIndex]
            notify := Notify{command.SequenceNum, applyMsg.CommandTerm}
            kv.mu.Unlock()
            if ok {
                ch <- notify
            }
        }
    }
}
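The StartKVServer constructor, which registers Op with labgob and launches the apply goroutine, is not shown above. A minimal sketch following the lab skeleton's signature might look like this (initialization details are assumptions):

func StartKVServer(servers []*labrpc.ClientEnd, me int, persister *raft.Persister, maxraftstate int) *KVServer {
    // call labgob.Register on structures you want Go's RPC library to marshall/unmarshall.
    labgob.Register(Op{})

    kv := new(KVServer)
    kv.me = me
    kv.maxraftstate = maxraftstate

    kv.applyCh = make(chan raft.ApplyMsg)
    kv.rf = raft.Make(servers, me, persister, kv.applyCh)

    // You may need initialization code here (these maps are assumed).
    kv.database = make(map[string]string)
    kv.lastSequence = make(map[int64]int64)
    kv.channel = make(map[int]chan Notify)
    kv.indexMap = make(map[int64]int)

    // receive agreed-upon commands from Raft in the background
    go kv.apply()

    return kv
}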
Part B: Key/value service with snapshots
The Part A code does not implement snapshots, so a restarted server has to replay the entire log to recover its state. Part B uses the Snapshot() interface from Lab 2D to take snapshots, reducing both log size and state-recovery time.
A parameter maxraftstate gives the maximum allowed size of Raft's persisted state; when persister.RaftStateSize() approaches maxraftstate, the server calls Snapshot to save a snapshot. In apply(), after receiving an applyMsg from Raft: if CommandValid is true, the applyMsg corresponds to a newly committed log entry, so the server checks whether a snapshot should be taken; if CommandValid is false, this kvserver's Raft peer has fallen too far behind the leader and has received a snapshot from it, so the server restores its state directly from that snapshot and discards its previous state. The code is as follows:
func (kv *KVServer) apply() {
    for !kv.killed() {
        applyMsg := <-kv.applyCh
        if applyMsg.CommandValid {
            // ......
            if kv.maxraftstate != -1 {
                kv.mu.Lock()
                if kv.rf.Persister.RaftStateSize() >= kv.maxraftstate {
                    kv.snapshotLastIndex = applyMsg.CommandIndex
                    kv.StartSnapshot()
                }
                kv.mu.Unlock()
            }
        } else {
            snapshot := applyMsg.Snapshot
            kv.mu.Lock()
            kv.RestoreFromSnapshot(snapshot)
            kv.snapshotLastIndex = applyMsg.SnapshotIndex
            kv.CleanChannel()
            kv.mu.Unlock()
        }
    }
}

func (kv *KVServer) RestoreFromSnapshot(snapshot []byte) {
    if snapshot == nil || len(snapshot) < 1 {
        return
    }
    r := bytes.NewBuffer(snapshot)
    d := labgob.NewDecoder(r)
    var database map[string]string
    var lastSequence map[int64]int64
    var snapshotLastIndex int
    if d.Decode(&database) != nil || d.Decode(&lastSequence) != nil || d.Decode(&snapshotLastIndex) != nil {
        DPrintf("ReadSnapshot failed\n")
    } else {
        kv.database = database
        kv.lastSequence = lastSequence
        kv.snapshotLastIndex = snapshotLastIndex
    }
}

func (kv *KVServer) CleanChannel() {
    for idx := range kv.channel {
        if idx <= kv.snapshotLastIndex {
            delete(kv.channel, idx)
        }
    }
}
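StartSnapshot is called above but not shown. A sketch consistent with the three fields RestoreFromSnapshot decodes, handing the encoded bytes to Raft via the Lab 2D Snapshot() interface, might be:

// Assumed helper: called with kv.mu held, after kv.snapshotLastIndex has been set.
func (kv *KVServer) StartSnapshot() {
    w := new(bytes.Buffer)
    e := labgob.NewEncoder(w)
    // encode in the same order that RestoreFromSnapshot decodes
    e.Encode(kv.database)
    e.Encode(kv.lastSequence)
    e.Encode(kv.snapshotLastIndex)
    // tell Raft it can discard log entries up through snapshotLastIndex
    kv.rf.Snapshot(kv.snapshotLastIndex, w.Bytes())
}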
Running the go-test-many.sh script for 1000 iterations passes the tests consistently.