Golang 实现 Redis(4): AOF 持久化与AOF重写

AOF 持久化是典型的异步任务,主协程(goroutine) 可以使用 channel 将数据发送到异步协程由异步协程执行持久化操作。

在 DB 中定义相关字段:

type DB struct {
    // 主线程使用此channel将要持久化的命令发送到异步协程
    aofChan  chan *reply.MultiBulkReply
    // append file 文件描述符
    aofFile  *os.File
    // append file 路径
    aofFilename string

    // aof 重写需要的缓冲区,将在AOF重写一节详细介绍
    aofRewriteChan chan *reply.MultiBulkReply
    // 在必要的时候使用此字段暂停持久化操作
    pausingAof   sync.RWMutex
}

在进行持久化时需要注意两个细节:

我们在命令处理方法中返回 AOF 需要的额外信息:

type extra struct {
    // 表示该命令是否需要持久化
    toPersist  bool
    // 如上文所述 expire 之类的命令不能直接持久化
    // 若 specialAof == nil 则将命令原样持久化,否则持久化 specialAof 中的指令
   specialAof []*reply.MultiBulkReply
}

type CmdFunc func(db *DB, args [][]byte) (redis.Reply, *extra)

以 SET 命令为例:

func Set(db *DB, args [][]byte) (redis.Reply, *extra) {
    //....
    var result int
    switch policy {
    case upsertPolicy:
        result = db.Put(key, entity)
    case insertPolicy:
        result = db.PutIfAbsent(key, entity)
    case updatePolicy:
        result = db.PutIfExists(key, entity)
    }
    extra := &extra{toPersist: result > 0} // 若实际写入了数据则toPresist=true, 若因为XX或NX选项没有实际写入数据则toPresist=false
    if result > 0 {
        if ttl != unlimitedTTL { // 使用了 EX 或 NX 选项
            expireTime := time.Now().Add(time.Duration(ttl) * time.Millisecond)
            db.Expire(key, expireTime)
            // 持久化时使用 set key value 和 pexpireat 命令代替 set key value EX ttl 命令
            extra.specialAof = []*reply.MultiBulkReply{
                reply.MakeMultiBulkReply([][]byte{
                    []byte("SET"),
                    args[0],
                    args[1],
                }),
                makeExpireCmd(key, expireTime),
            }
        } else {
            db.Persist(key) // override ttl
        }
    }
    return &reply.OkReply{}, extra
}

var pExpireAtCmd = []byte("PEXPIREAT")

func makeExpireCmd(key string, expireAt time.Time) *reply.MultiBulkReply {
  args := make([][]byte, 3)
  args[0] = pExpireAtCmd
  args[1] = []byte(key)
  args[2] = []byte(strconv.FormatInt(expireAt.UnixNano()/1e6, 10))
  return reply.MakeMultiBulkReply(args)
}

在异步协程中写入命令:

func (handler *Handler) handleAof() {
    handler.currentDB = 0
    for p := range handler.aofChan {
        // 使用锁保证每次都会写入一条完整的命令
        handler.pausingAof.RLock()
        // 每个客户端都可以选择自己的数据库,所以 payload 中要保存客户端选择的数据库
        // 选择的数据库与 aof 文件中最新的数据库不一致时写入一条 Select 命令
        if p.dbIndex != handler.currentDB {
            // select db
            data := reply.MakeMultiBulkReply(utils.ToCmdLine("SELECT", strconv.Itoa(p.dbIndex))).ToBytes()
            _, err := handler.aofFile.Write(data)
            if err != nil {
                logger.Warn(err)
                continue // skip this command
            }
            handler.currentDB = p.dbIndex
        }
        // 写入命令内容
        data := reply.MakeMultiBulkReply(p.cmdLine).ToBytes()
        _, err := handler.aofFile.Write(data)
        if err != nil {
            logger.Warn(err)
        }
        handler.pausingAof.RUnlock()
    }
    // 关闭过程中主协程会先关闭 handler.aofChan,然后使用
func (db *DB) loadAof(maxBytes int) {
    // delete aofChan to prevent write again
    aofChan := db.aofChan
    db.aofChan = nil
    defer func(aofChan chan *reply.MultiBulkReply) {
        db.aofChan = aofChan
    }(aofChan)

    file, err := os.Open(db.aofFilename)
    if err != nil {
        if _, ok := err.(*os.PathError); ok {
            return
        }
        logger.Warn(err)
        return
    }
    defer file.Close()

    reader := utils.NewLimitedReader(file, maxBytes)
    ch := parser.ParseStream(reader)
    for p := range ch {
        if p.Err != nil {
            if p.Err == io.EOF {
                break
            }
            logger.Error("parse error: " + p.Err.Error())
            continue
        }
        if p.Data == nil {
            logger.Error("empty payload")
            continue
        }
        r, ok := p.Data.(*reply.MultiBulkReply)
        if !ok {
            logger.Error("require multi bulk reply")
            continue
        }
        cmd := strings.ToLower(string(r.Args[0]))
        command, ok := cmdTable[cmd]
        if ok {
            handler := command.executor
            handler(db, r.Args[1:])
        }
    }
}

若我们对键a赋值100次会在AOF文件中产生100条指令但只有最后一条指令是有效的,为了减少持久化文件的大小需要进行AOF重写以删除无用的指令。

重写必须在固定不变的数据集上进行,不能直接使用内存中的数据。Redis 重写的实现方式是进行 fork 并在子进程中遍历数据库内的数据重新生成AOF文件。由于 golang 不支持 fork 操作,我们只能采用读取AOF文件生成副本的方式来代替fork。

在进行AOF重写操作时需要满足两个要求:

因此我们设计了一套比较复杂的流程:

首先准备开始重写操作:

func (handler *Handler) StartRewrite() (*rewriteCtx, error) {
    // 暂停 aof 写入, 数据会在 aofChan 中暂时堆积
    handler.pausingAof.Lock() // pausing aof
    defer handler.pausingAof.Unlock()

    // 调用 fsync 将缓冲区中的数据落盘,防止 aof 文件不完整造成错误
    err := handler.aofFile.Sync()
    if err != nil {
        logger.Warn("fsync failed")
        return nil, err
    }

    // 获得当前 aof 文件大小,用于判断哪些数据是 aof 重写过程中产生的
    // handleAof 会保证每次写入完整的一条指令
    fileInfo, _ := os.Stat(handler.aofFilename)
    filesize := fileInfo.Size()

    // 创建临时文件供重写使用
    file, err := ioutil.TempFile("", "*.aof")
    if err != nil {
        logger.Warn("tmp file create failed")
        return nil, err
    }
    return &rewriteCtx{
        tmpFile:  file,
        fileSize: filesize,
        dbIdx:  handler.currentDB, // 重写开始时 aof 文件选中的数据库
    }, nil
}

执行重写:

func (handler *Handler) DoRewrite(ctx *rewriteCtx) error {
    tmpFile := ctx.tmpFile

    // 将重写开始前的数据加载到内存
    tmpAof := handler.newRewriteHandler()
    tmpAof.LoadAof(int(ctx.fileSize))

    // 将内存中的数据写入临时文件
    for i := 0; i < config.Properties.Databases; i++ {
        // select db
        data := reply.MakeMultiBulkReply(utils.ToCmdLine("SELECT", strconv.Itoa(i))).ToBytes()
        _, err := tmpFile.Write(data)
        if err != nil {
            return err
        }
        // dump db
        tmpAof.db.ForEach(i, func(key string, entity *database.DataEntity, expiration *time.Time) bool {
            cmd := EntityToCmd(key, entity)
            if cmd != nil {
                _, _ = tmpFile.Write(cmd.ToBytes())
            }
            if expiration != nil {
                cmd := MakeExpireCmd(key, *expiration)
                if cmd != nil {
                    _, _ = tmpFile.Write(cmd.ToBytes())
                }
            }
            return true
        })
    }
    return nil
}

结束重写的过程最为复杂:

func (handler *Handler) FinishRewrite(ctx *rewriteCtx) {
    // 同样暂停 handleAof 的写入
    handler.pausingAof.Lock()
    defer handler.pausingAof.Unlock()

    // 打开线上 aof 文件并 seek 到重写开始的位置
    tmpFile := ctx.tmpFile
    src, err := os.Open(handler.aofFilename)
    if err != nil {
        logger.Error("open aofFilename failed: " + err.Error())
        return
    }
    defer func() {
        _ = src.Close()
    }()
    _, err = src.Seek(ctx.fileSize, 0)
    if err != nil {
        logger.Error("seek failed: " + err.Error())
        return
    }

    // 写入一条 Select 命令,使 tmpAof 选中重写开始时刻线上 aof 文件选中的数据库
    data := reply.MakeMultiBulkReply(utils.ToCmdLine("SELECT", strconv.Itoa(ctx.dbIdx))).ToBytes()
    _, err = tmpFile.Write(data)
    if err != nil {
        logger.Error("tmp file rewrite failed: " + err.Error())
        return
    }
    // 对齐数据库后就可以把重写过程中产生的数据复制到 tmpAof 文件了
    _, err = io.Copy(tmpFile, src)
    if err != nil {
        logger.Error("copy aof filed failed: " + err.Error())
        return
    }

    // 使用 mv 命令用 tmpAof 代替线上 aof 文件
    _ = handler.aofFile.Close()
    _ = os.Rename(tmpFile.Name(), handler.aofFilename)

    // 重新打开线上 aof
    aofFile, err := os.OpenFile(handler.aofFilename, os.O_APPEND|os.O_CREATE|os.O_RDWR, 0600)
    if err != nil {
        panic(err)
    }
    handler.aofFile = aofFile

    // 重新写入一次 select 指令保证 aof 中的数据库与 handler.currentDB 一致
    data = reply.MakeMultiBulkReply(utils.ToCmdLine("SELECT", strconv.Itoa(handler.currentDB))).ToBytes()
    _, err = handler.aofFile.Write(data)
    if err != nil {
        panic(err)
    }
}

Original: https://www.cnblogs.com/Finley/p/12663636.html
Author: -Finley-
Title: Golang 实现 Redis(4): AOF 持久化与AOF重写

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/529396/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

  • 常用命令记录

    npm仓库查看和修改 npm config set registry https://registry.npm.taobao.org #设置使用淘宝提供的npm仓库 npm con…

    Linux 2023年6月14日
    060
  • source insight4.0最常用到的设置

    1、常用功能 1.1:全局查找 1.2:当前文件查找 1.3:高亮设置 1.4:配置字体以及其他 1.5:配置自动缩进 1.6:其他 1. 常用功能 全局查找 Ctl+/ 查找到的…

    Linux 2023年5月27日
    0111
  • MIT6.828(Step0)——实验环境配置

    实验环境配置 VirtualBox虚拟机为载体,安装Ubuntu $ uname -a Linux eliot-VirtualBox 5.11.0-36-generic #40~2…

    Linux 2023年5月27日
    083
  • Linux系统僵尸进程详解

    大安好,我是良许。 在本文中,我们将讨论什么是僵尸进程,如何创建僵尸进程,以及如何终止僵尸进程。 [En] In this article, we will discuss wha…

    Linux 2023年5月27日
    094
  • 3.20 什么是环境变量,Linux环境变量有哪些?

    变量是计算机系统用于保存可变值的数据类型,我们可以直接通过变量名称来提取到对应的变量值。在 Linux 系统中,环境变量是用来定义系统运行环境的一些参数,比如每个用户不同的家目录(…

    Linux 2023年6月7日
    082
  • 2-第一个Django程序

    第一个Django程序 从本章节开始将通过实现一个投票应用程序,来让用户逐步的了解Django。这个程序由两步分组成: 公共站点,允许用户访问进行投票,和查看投票。 站点管理,允许…

    Linux 2023年6月7日
    097
  • CANoe的安装和使用

    CANoe的简介 CANoe是德国Vector公司为汽车总线的开发而设计的一款总线开发环境,全称叫CAN open environment。CANoe集合了网络监控、数据获取/记录…

    Linux 2023年6月13日
    0186
  • bochs(2.6.11)配置安装

    下载:https://bochs.sourceforge.io/ 建议下载2.6.11,下文一开始安装的2.7,但运行时有无法解决的错误。但是大致安装过程一致。 linux 提前安…

    Linux 2023年5月27日
    0119
  • git-config 配置多用户环境以及 includeIf用法

    方法一: 直接在 $path文件中添加 &#x7528;&#x6237;&#x540D;和 &#x90AE;&#x7BB1;,如: [use…

    Linux 2023年5月27日
    0116
  • 笔记本 vmware 搭建的k8s 集群出现磁盘io爆满,node无法运行pod

    问题描述: k8s集群只有一个master节点和一个node节点,运行了一些java pod 之后出现磁盘io 爆满,pod无法调度和新建,但是节点的内存和磁盘空间都是充足的。 问…

    Linux 2023年6月14日
    098
  • 云笔记本:一个Laxcus应用软件

    给大家展示一个第三方开发的应用软件:云笔记本。 这个作品来自一位Laxcus分布式应用软件开发者,目前已经通过Laxcus集群操作系统的兼容性测试。云笔记本的界面和功能,类似Win…

    Linux 2023年6月6日
    0121
  • redis

    PHP-redis:http://pecl.php.net/package/redis PHP-redis中文文档(redis各种方法介绍):http://www.cnblogs….

    Linux 2023年5月28日
    073
  • 最新超详细VMware下CentOS系统安装

    一、了解CentOS系统 CentOS是免费的、开源的、可以重新分发的开源操作系统,CentOS(Community Enterprise Operating System,中文意…

    Linux 2023年5月27日
    086
  • Redis之事务

    一.是什么 可以一次执行多个命令,本质是一组命令的集合。一个事务中的所有命令都会序列化,按顺序地串行化执行而不会被其它命令插入,不许加塞二.能干嘛 一个队列中,一次性、顺序性、排他…

    Linux 2023年5月28日
    086
  • 022.常见硬盘检测方式

    硬盘监测概述 硬盘异常损坏日常相对概率较高,同时不同的文件系统(xfs,reiserfs,ext3)其检测方式不同。建议使用dmesag查看有没有硬件I/O故障的日志,也可使用用f…

    Linux 2023年6月7日
    080
  • 中土批量运维神器《ps1屠龙刀》 pk 西域批量运维圣器《ansible圣火令》

    据故老相传,运维界有句话:”脚林至尊,宝刀【ps1屠龙】,号令被控,莫敢不从”。 https://gitee.com/chuanjiao10/kasini3…

    Linux 2023年5月27日
    0101
亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球