MIT 6.824 Lab2D Raft之日志压缩

书接上文Raft Part C | MIT 6.824 Lab2C Persistence

实验准备

  1. 实验代码: git://g.csail.mit.edu/6.824-golabs-2021/src/raft
  2. 如何测试: go test -run 2D -race
  3. 相关论文:Raft Extended Section 7
  4. 实验指导:6.824 Lab 2: Raft (mit.edu)

实验目标

实现 SnapshotCondInstallSnapshotInstallSnapshot RPC,并修改之前的代码以支持本次实验的内容。

一些提示

  1. 不要使用论文中的偏移机制为数据分片,每个分片作为一个快照。而是每次RPC发送全部数据作为一个快照。
  2. 丢弃旧日志的全部引用,以便GC回收。
  3. 由于保存快照要丢弃部分日志,不能再使用日志长度来作为索引日志的标准。
  4. 考虑是否需要持久化 lastIncludeTermlastIncludeIndex
  5. 使用 rf.persister.SaveStateAndSnapshot()持久化快照。

日志压缩

日志序列不断扩张,是无法全部存储在内存中的,对于已经应用到状态机的部分日志,就不再需要维护在Raft中。

但由于仍可能存在部分Follower的日志序列远远落后于Leader,因此这部分日志不能被Leader丢弃,在同步日志时,若Leader中原应被同步的日志在快照中,则将快照发送给Follower。

lastIncluedTerm & lastIncludeIndex

日志压缩后,Raft需要记录额外的两个信息, lastIncludeIndexlastIncludeTerm表示快照中最后一个log的index和Term。

MIT 6.824 Lab2D Raft之日志压缩

此处设计新的log类型如下。

type Log struct {
    Entries []LogEntry
    Base    int
}

需要注意的是, Log.Entries从1开始存储,因此 Log.Entries[0].Term用于存储 lastIncludeTermLog.Base表示 Log.Entries[0]的逻辑位置,也是 lastIncludeIndex的值。

MIT 6.824 Lab2D Raft之日志压缩

本例中,lastIncludeIndex = 4,lastIncludeTerm = 2,snapshot = [1,1,1,2]。

为Log添加相关成员函数。

func (l *Log) size() {
    return l.Base + len(l.Entries)
}

func (l *Log) get(i int) {
    return l.Entries[i-l.Base]
}

func (l *Log) set(i int, e LogEntry) {
    l.[i-l.Base] = e
}

Snapshot()

Snapshot(index int, snapshot []byte)由状态机调用,传入的 index表示 lastIncludeIndexsnapshot由状态机生成,需要Raft保存,用于发送Follower时需要。

func (rf *Raft) Snapshot(index int, snapshot []byte) {
    if index

index <= rf.log.base< code>&#x8BF4;&#x660E;&#x4F20;&#x5165;&#x7684;snapshot&#x662F;&#x4E00;&#x4E2A;&#x65E7;&#x7684;&#x5FEB;&#x7167;&#x3002;<!--=-->

InstallSnapshot RPC

首先是 heartbeat()应该新增如下逻辑,当Leader中应被同步到Follower的日志在快照中时,将快照发送给Follower。

if next

sendSnapshot()和发送日志序列类似。

func (rf *Raft) sendSnapshot(id int, peer *labrpc.ClientEnd, args InstallSnapshotArgs) {
    reply := InstallSnapshotReply{}
    ok := peer.Call("Raft.InstallSnapshot", &args, &reply)
    if !ok {
        return
    }

    if reply.Term > rf.currentTerm {
        rf.toFollower(reply.Term)
        return
    }

    rf.nextIndex[id] = args.LastIncludedIndex + 1
    rf.matchIndex[id] = args.LastIncludedIndex
}

InstallSnapshot()AppendEntries()类似, args.LastIncludedIndex <= rf.log.base< code>&#x4E5F;&#x662F;&#x4E00;&#x6837;&#x7684;&#xFF0C;&#x8868;&#x793A;&#x4E00;&#x4E2A;&#x65E7;&#x7684;&#x5FEB;&#x7167;&#x3002;<!--=-->

func (rf *Raft) InstallSnapshot(args *InstallSnapshotArgs, reply *InstallSnapshotReply) {
    rf.lastRecv = time.Now()

    if args.Term > rf.currentTerm {
        rf.toFollower(args.Term)
    }
    reply.Term = rf.currentTerm

    if args.Term < rf.currentTerm || args.LastIncludedIndex

注意:快照是状态机中的概念,需要在状态机中加载快照,因此要通过applyCh将快照发送给状态机,但是发送后Raft并不立即保存快照,而是等待状态机调用 CondInstallSnapshot(),如果从收到 InstallSnapshot()后到收到 CondInstallSnapshot()前,没有新的日志提交到状态机,则Raft返回True,Raft和状态机保存快照,否则Raft返回False,两者都不保存快照。

如此保证了Raft和状态机保存快照是一个原子操作。当然在 InstallSnapshot()将快照发送给状态机后再将快照保存到Raft,令 CondInstallSnap()永远返回True,也可以保证原子操作,但是这样做必须等待快照发送给状态机完成,但是 rf.applyCh <- applymsg< code>&#x662F;&#x6709;&#x53EF;&#x80FD;&#x963B;&#x585E;&#x7684;&#xFF0C;&#x7531;&#x4E8E;<code>InstallSnapshot()</code>&#x9700;&#x8981;&#x6301;&#x6709;&#x5168;&#x5C40;&#x7684;&#x4E92;&#x65A5;&#x9501;&#xFF0C;&#x8FD9;&#x53EF;&#x80FD;&#x5BFC;&#x81F4;&#x6574;&#x4E2A;&#x8282;&#x70B9;&#x65E0;&#x6CD5;&#x5DE5;&#x4F5C;&#x3002;<!----->

为什么要保证原子操作?因为负责将commit状态的日志提交到状态机的goroutine不负责快照部分,因此必须是先保存快照,再同步日志。

本系列文章给出的代码为了好读,没有考虑同步问题,正常来讲 applyCh <- applymsg< code>&#x8FD9;&#x4E2A;&#x64CD;&#x4F5C;&#x662F;&#x9700;&#x8981;&#x4EE4;&#x8D77;&#x4E00;&#x4E2A;goroutine&#x53BB;&#x505A;&#x7684;&#x3002;<!----->

如何判断 InstallSnapshot()CondInstallSnapshot()之间没有新的日志提交到状态机呢?这里使用 commitIndex来判断,当 lastIncludeIndex <= commitindex< code>&#x65F6;&#xFF0C;&#x8BF4;&#x660E;&#x8FD9;&#x671F;&#x95F4;&#x539F;&#x672C;&#x6CA1;&#x6709;&#x7684;&#x5FEB;&#x7167;&#x90E8;&#x5206;&#x7684;&#x65E5;&#x5FD7;&#x8865;&#x5168;&#x4E86;&#xFF0C;&#x867D;&#x7136;commit&#x72B6;&#x6001;&#x5E76;&#x4E0D;&#x4E00;&#x5B9A;&#x662F;apply&#x72B6;&#x6001;&#xFF0C;&#x4F46;&#x8FD9;&#x91CC;&#x4EE5;commit&#x4E3A;&#x51C6;&#xFF0C;&#x66F4;&#x5B89;&#x5168;&#x3002;<!--=-->

func (rf *Raft) CondInstallSnapshot(lastIncludedTerm int, lastIncludedIndex int, snapshot []byte) bool {
    if lastIncludedIndex

需要注意的是,这里截断 rf.log.Entries的方式,如果使用 s = s[i:]这样的方式,依然维持对底层数组全部元素的引用,是无法被GC回收的。

还有一点要注意的是,不要忘记在 Make()中读取持久化的 snapshot,并初始化 lastApplied的值。

MIT 6.824 Lab2D Raft之日志压缩

最后,为了证明我不是在乱写,附上我的测试结果。

Original: https://www.cnblogs.com/suqinglee/p/15550004.html
Author: 李素晴
Title: MIT 6.824 Lab2D Raft之日志压缩

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/611818/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球