书接上文Raft Part C | MIT 6.824 Lab2C Persistence。
实验准备
- 实验代码:
git://g.csail.mit.edu/6.824-golabs-2021/src/raft
- 如何测试:
go test -run 2D -race
- 相关论文:Raft Extended Section 7
- 实验指导:6.824 Lab 2: Raft (mit.edu)
实验目标
实现 Snapshot
、 CondInstallSnapshot
、 InstallSnapshot RPC
,并修改之前的代码以支持本次实验的内容。
一些提示
- 不要使用论文中的偏移机制为数据分片,每个分片作为一个快照。而是每次RPC发送全部数据作为一个快照。
- 丢弃旧日志的全部引用,以便GC回收。
- 由于保存快照要丢弃部分日志,不能再使用日志长度来作为索引日志的标准。
- 考虑是否需要持久化
lastIncludeTerm
和lastIncludeIndex
。 - 使用
rf.persister.SaveStateAndSnapshot()
持久化快照。
日志压缩
日志序列不断扩张,是无法全部存储在内存中的,对于已经应用到状态机的部分日志,就不再需要维护在Raft中。
但由于仍可能存在部分Follower的日志序列远远落后于Leader,因此这部分日志不能被Leader丢弃,在同步日志时,若Leader中原应被同步的日志在快照中,则将快照发送给Follower。
lastIncluedTerm & lastIncludeIndex
日志压缩后,Raft需要记录额外的两个信息, lastIncludeIndex
、 lastIncludeTerm
表示快照中最后一个log的index和Term。
此处设计新的log类型如下。
type Log struct {
Entries []LogEntry
Base int
}
需要注意的是, Log.Entries
从1开始存储,因此 Log.Entries[0].Term
用于存储 lastIncludeTerm
, Log.Base
表示 Log.Entries[0]
的逻辑位置,也是 lastIncludeIndex
的值。
本例中,lastIncludeIndex = 4,lastIncludeTerm = 2,snapshot = [1,1,1,2]。
为Log添加相关成员函数。
func (l *Log) size() {
return l.Base + len(l.Entries)
}
func (l *Log) get(i int) {
return l.Entries[i-l.Base]
}
func (l *Log) set(i int, e LogEntry) {
l.[i-l.Base] = e
}
Snapshot()
Snapshot(index int, snapshot []byte)
由状态机调用,传入的 index
表示 lastIncludeIndex
, snapshot
由状态机生成,需要Raft保存,用于发送Follower时需要。
func (rf *Raft) Snapshot(index int, snapshot []byte) {
if index
index <= rf.log.base< code>说明传入的snapshot是一个旧的快照。<!--=-->
InstallSnapshot RPC
首先是 heartbeat()
应该新增如下逻辑,当Leader中应被同步到Follower的日志在快照中时,将快照发送给Follower。
if next
sendSnapshot()
和发送日志序列类似。
func (rf *Raft) sendSnapshot(id int, peer *labrpc.ClientEnd, args InstallSnapshotArgs) {
reply := InstallSnapshotReply{}
ok := peer.Call("Raft.InstallSnapshot", &args, &reply)
if !ok {
return
}
if reply.Term > rf.currentTerm {
rf.toFollower(reply.Term)
return
}
rf.nextIndex[id] = args.LastIncludedIndex + 1
rf.matchIndex[id] = args.LastIncludedIndex
}
InstallSnapshot()
和 AppendEntries()
类似, args.LastIncludedIndex <= rf.log.base< code>也是一样的,表示一个旧的快照。<!--=-->
func (rf *Raft) InstallSnapshot(args *InstallSnapshotArgs, reply *InstallSnapshotReply) {
rf.lastRecv = time.Now()
if args.Term > rf.currentTerm {
rf.toFollower(args.Term)
}
reply.Term = rf.currentTerm
if args.Term < rf.currentTerm || args.LastIncludedIndex
注意:快照是状态机中的概念,需要在状态机中加载快照,因此要通过applyCh将快照发送给状态机,但是发送后Raft并不立即保存快照,而是等待状态机调用 CondInstallSnapshot()
,如果从收到 InstallSnapshot()
后到收到 CondInstallSnapshot()
前,没有新的日志提交到状态机,则Raft返回True,Raft和状态机保存快照,否则Raft返回False,两者都不保存快照。
如此保证了Raft和状态机保存快照是一个原子操作。当然在 InstallSnapshot()
将快照发送给状态机后再将快照保存到Raft,令 CondInstallSnap()
永远返回True,也可以保证原子操作,但是这样做必须等待快照发送给状态机完成,但是 rf.applyCh <- applymsg< code>是有可能阻塞的,由于<code>InstallSnapshot()</code>需要持有全局的互斥锁,这可能导致整个节点无法工作。<!----->
为什么要保证原子操作?因为负责将commit状态的日志提交到状态机的goroutine不负责快照部分,因此必须是先保存快照,再同步日志。
本系列文章给出的代码为了好读,没有考虑同步问题,正常来讲
applyCh <- applymsg< code>这个操作是需要令起一个goroutine去做的。<!----->
如何判断 InstallSnapshot()
到 CondInstallSnapshot()
之间没有新的日志提交到状态机呢?这里使用 commitIndex
来判断,当 lastIncludeIndex <= commitindex< code>时,说明这期间原本没有的快照部分的日志补全了,虽然commit状态并不一定是apply状态,但这里以commit为准,更安全。<!--=-->
func (rf *Raft) CondInstallSnapshot(lastIncludedTerm int, lastIncludedIndex int, snapshot []byte) bool {
if lastIncludedIndex
需要注意的是,这里截断
rf.log.Entries
的方式,如果使用s = s[i:]
这样的方式,依然维持对底层数组全部元素的引用,是无法被GC回收的。
还有一点要注意的是,不要忘记在 Make()
中读取持久化的 snapshot
,并初始化 lastApplied
的值。
最后,为了证明我不是在乱写,附上我的测试结果。
Original: https://www.cnblogs.com/suqinglee/p/15550004.html
Author: 李素晴
Title: MIT 6.824 Lab2D Raft之日志压缩
原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/611818/
转载文章受原作者版权保护。转载请注明原作者出处!