Golang 实现 Redis(10): 本地原子性事务

为了支持多个命令的原子性执行 Redis 提供了事务机制。 Redis 官方文档中称事务带有以下两个重要的保证:

  • 事务是一个单独的隔离操作:事务中的所有命令都会序列化、按顺序地执行。事务在执行的过程中,不会被其他客户端发送来的命令请求所打断。
  • 事务是一个原子操作:事务中的命令要么全部被执行,要么全部都不执行

我们在使用事务的过程中可能会遇到两类错误:

在遇到语法错误时 Redis 会中止命令入队并丢弃事务。在遇到运行时错误时 Redis 仅会报错然后继续执行事务中剩下的命令,不会像大多数数据库那样回滚事务。对此,Redis 官方的解释是:

Redis 命令只会因为错误的语法而失败(并且这些问题不能在入队时发现),或是命令用在了错误类型的键上面:这也就是说,从实用性的角度来说,失败的命令是由编程错误造成的,而这些错误应该在开发的过程中被发现,而不应该出现在生产环境中。
因为不需要对回滚进行支持,所以 Redis 的内部可以保持简单且快速。
有种观点认为 Redis 处理事务的做法会产生 bug , 然而需要注意的是, 在通常情况下, 回滚并不能解决编程错误带来的问题。 举个例子, 如果你本来想通过 INCR 命令将键的值加上 1 , 却不小心加上了 2 , 又或者对错误类型的键执行了 INCR , 回滚是没有办法处理这些情况的。鉴于没有任何机制能避免程序员自己造成的错误, 并且这类错误通常不会在生产环境中出现, 所以 Redis 选择了更简单、更快速的无回滚方式来处理事务。

emmmm, 接下来我们尝试在 Godis 中实现具有原子性、隔离性的事务吧。

事务的原子性具有两个特点:1. 事务执行过程不可被其它事务(线程)插入 2. 事务要么完全成功要么完全不执行,不存在部分成功的状态
事务的隔离性是指事务中操作的结果是否对其它并发事务可见。由于KV数据库不存在幻读问题,因此我们需要避免脏读和不可重复度问题。

与 Redis 的单线程引擎不同 godis 的存储引擎是并行的,因此需要设计锁机制来保证执行多条命令执行时的原子性和隔离性。

实现一个常规命令需要提供3个函数:

  • ExecFunc 是实际执行命令的函数
  • PrepareFunc 在 ExecFunc 前执行,负责分析命令行读写了哪些 key 便于进行加锁
  • UndoFunc 仅在事务中被使用,负责准备 undo logs 以备事务执行过程中遇到错误需要回滚。

其中的 PrepareFunc 会分析命令行返回要读写的 key, 以 prepareMSet 为例:

// return writtenKeys, readKeys
func prepareMSet(args [][]byte) ([]string, []string) {
    size := len(args) / 2
    keys := make([]string, size)
    for i := 0; i < size; i++ {
        keys[i] = string(args[2*i])
    }
    return keys, nil
}

结合实现内存数据库 中提到的 LockMap 即可完成加锁。由于其它协程无法获得相关 key 的锁所以不可能插入到事务中,所以我们实现了原子性中不可被插入的特性。

事务需要把所有 key 一次性完成加锁, 只有在事务提交或回滚时才能解锁。不能用到一个 key 就加一次锁用完就解锁,这种方法可能导致脏读:

时间 事务1 事务2 t1 锁定key A t2 修改key A t3 解锁key A t4 锁定key A t4 读取key A t5 解锁key A t6 提交

如上图所示 t4 时刻, 事务 2 读到了事务 1未提交的数据,出现了脏读异常。

为了在遇到运行时错误时事务可以回滚(原子性),可用的回滚方式有两种:

  • 保存修改前的value, 在回滚时用修改前的value进行覆盖
  • 使用回滚命令来撤销原命令的影响。举例来说:键A原值为1,调用了 Incr A 之后变为了2,我们可以再执行一次 Set A 1命令来撤销 incr 命令。

出于节省内存的考虑我们最终选择了第二种方案。比如 HSet 命令只需要另一条 HSet 将 field 改回原值即可,若采用保存 value 的方法我们则需要保存整个 HashMap。类似情况的还有 LPushRPop 等命令。

有一些命令可能需要多条命令来回滚,比如回滚 Del 时不仅需要恢复对应的 key-value 还需要恢复 TTL 数据。或者 Del 命令删除了多个 key 时,也需要多条命令进行回滚。综上我们给出 UndoFunc 的定义:

// UndoFunc returns undo logs for the given command line
// execute from head to tail when undo
type UndoFunc func(db *DB, args [][]byte) []CmdLine

我们以可以回滚任意操作的 rollbackGivenKeys为例进行说明,当然使用 rollbackGivenKeys的成本较高,在可能的情况下尽量实现针对性的 undo log.

func rollbackGivenKeys(db *DB, keys ...string) []CmdLine {
    var undoCmdLines [][][]byte
    for _, key := range keys {
        entity, ok := db.GetEntity(key)
        if !ok {
            // 原来不存在 key 删掉
            undoCmdLines = append(undoCmdLines,
                utils.ToCmdLine("DEL", key),
            )
        } else {
            undoCmdLines = append(undoCmdLines,
                utils.ToCmdLine("DEL", key), // 先把新 key 删除掉
                aof.EntityToCmd(key, entity).Args, // 把 DataEntity 序列化成命令行
                toTTLCmd(db, key).Args,
            )
        }
    }
    return undoCmdLines
}

接下来看一下 EntityToCmd, 非常简单易懂:

func EntityToCmd(key string, entity *database.DataEntity) *protocol.MultiBulkReply {
    if entity == nil {
        return nil
    }
    var cmd *protocol.MultiBulkReply
    switch val := entity.Data.(type) {
    case []byte:
        cmd = stringToCmd(key, val)
    case *List.LinkedList:
        cmd = listToCmd(key, val)
    case *set.Set:
        cmd = setToCmd(key, val)
    case dict.Dict:
        cmd = hashToCmd(key, val)
    case *SortedSet.SortedSet:
        cmd = zSetToCmd(key, val)
    }
    return cmd
}

var hMSetCmd = []byte("HMSET")

func hashToCmd(key string, hash dict.Dict) *protocol.MultiBulkReply {
    args := make([][]byte, 2+hash.Len()*2)
    args[0] = hMSetCmd
    args[1] = []byte(key)
    i := 0
    hash.ForEach(func(field string, val interface{}) bool {
        bytes, _ := val.([]byte)
        args[2+i*2] = []byte(field)
        args[3+i*2] = bytes
        i++
        return true
    })
    return protocol.MakeMultiBulkReply(args)
}

Watch

Redis Watch 命令用于监视一个(或多个) key ,如果在事务执行之前这个(或这些) key 被其他命令所改动,那么事务将被放弃。

实现 Watch 命令的核心是发现 key 是否被改动,我们使用简单可靠的版本号方案:为每个 key 存储一个版本号,版本号变化说明 key 被修改了:

// database/single_db.go
func (db *DB) GetVersion(key string) uint32 {
    entity, ok := db.versionMap.Get(key)
    if !ok {
        return 0
    }
    return entity.(uint32)
}

// database/transaciton.go
func Watch(db *DB, conn redis.Connection, args [][]byte) redis.Reply {
    watching := conn.GetWatching()
    for _, bkey := range args {
        key := string(bkey)
        watching[key] = db.GetVersion(key) // 将当前版本号存在 conn 对象中
    }
    return protocol.MakeOkReply()
}

在执行事务前比较版本号:

// database/transaciton.go
func isWatchingChanged(db *DB, watching map[string]uint32) bool {
    for key, ver := range watching {
        currentVersion := db.GetVersion(key)
        if ver != currentVersion {
            return true
        }
    }
    return false
}
func (db *DB) ExecMulti(conn redis.Connection, watching map[string]uint32, cmdLines []CmdLine) redis.Reply {
    // 准备阶段
    // 使用 prepareFunc 获取事务要读写的 key
    writeKeys := make([]string, 0) // may contains duplicate
    readKeys := make([]string, 0)
    for _, cmdLine := range cmdLines {
        cmdName := strings.ToLower(string(cmdLine[0]))
        cmd := cmdTable[cmdName]
        prepare := cmd.prepare
        write, read := prepare(cmdLine[1:])
        writeKeys = append(writeKeys, write...)
        readKeys = append(readKeys, read...)
    }
    watchingKeys := make([]string, 0, len(watching))
    for key := range watching {
        watchingKeys = append(watchingKeys, key)
    }
    readKeys = append(readKeys, watchingKeys...)
    // 将要读写的 key 和被 watch 的 key 一起加锁
    db.RWLocks(writeKeys, readKeys)
    defer db.RWUnLocks(writeKeys, readKeys)

    // 检查被 watch 的 key 是否发生了改变
    if isWatchingChanged(db, watching) { // watching keys changed, abort
        return protocol.MakeEmptyMultiBulkReply()
    }

    // 执行阶段
    results := make([]redis.Reply, 0, len(cmdLines))
    aborted := false
    undoCmdLines := make([][]CmdLine, 0, len(cmdLines))
    for _, cmdLine := range cmdLines {
        // 在命令执行前再准备 undo log, 这样才能保证例如用 decr 回滚 incr 命令的实现可以正常工作
        undoCmdLines = append(undoCmdLines, db.GetUndoLogs(cmdLine))
        result := db.execWithLock(cmdLine)
        if protocol.IsErrorReply(result) {
            aborted = true
            // don't rollback failed commands
            undoCmdLines = undoCmdLines[:len(undoCmdLines)-1]
            break
        }
        results = append(results, result)
    }
    // 执行成功
    if !aborted {
        db.addVersion(writeKeys...)
        return protocol.MakeMultiRawReply(results)
    }
    // 事务失败进行回滚
    size := len(undoCmdLines)
    for i := size - 1; i >= 0; i-- {
        curCmdLines := undoCmdLines[i]
        if len(curCmdLines) == 0 {
            continue
        }
        for _, cmdLine := range curCmdLines {
            db.execWithLock(cmdLine)
        }
    }
    return protocol.MakeErrReply("EXECABORT Transaction discarded because of previous errors.")
}

Original: https://www.cnblogs.com/Finley/p/16215757.html
Author: -Finley-
Title: Golang 实现 Redis(10): 本地原子性事务

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/529382/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

  • HCNP Routing&Switching之链路聚合

    注入产生的原理: 数据库设置为GBK编码: 宽字节注入源于程序员设置MySQL连接时错误配置为:set character_set_client=gbk,这样配置会引发编码转换从而…

    Linux 2022年8月26日
    0322
  • JAVA实现微信小程序支付退款功能

    JAVA实现微信小程序支付退款功能 本如亲测有效(代码复制直接可以用的),退款的前提是必须有小程序的appid、商户号、商户密匙、和证书、 这个是微信小程序退款的官网大家可以去看看…

    Linux 2023年6月7日
    0181
  • 2.VMware三种网络模式

    本文参考《Vmware虚拟机三种网络模式详解》、《网络原理,以及对VMware Workstation虚拟网络VMnet0、VMnet1、VMnet8的图解》 一.VMware的网…

    Linux 2023年5月27日
    0100
  • [Linux]定时任务crontab

    注入产生的原理: 数据库设置为GBK编码: 宽字节注入源于程序员设置MySQL连接时错误配置为:set character_set_client=gbk,这样配置会引发编码转换从而…

    Linux 2022年8月26日
    0263
  • redis中setbit的用法

    注入产生的原理: 数据库设置为GBK编码: 宽字节注入源于程序员设置MySQL连接时错误配置为:set character_set_client=gbk,这样配置会引发编码转换从而…

    Linux 2022年9月14日
    0243
  • CentOS7安装MYSQL8.X详细教程

    注入产生的原理: 数据库设置为GBK编码: 宽字节注入源于程序员设置MySQL连接时错误配置为:set character_set_client=gbk,这样配置会引发编码转换从而…

    Linux 2022年9月10日
    0224
  • ssh remote forward 监听 0.0.0.0 端口;How to make SSH remote port forward that listens 0.0.0.0

    今天使用ssh转发内网服务的时候,发现remote forward 转发到远程,监听的端口都是localhost。 之前还没发现这种情况,因为都是在所转发的目的主机使用服务。今天,…

    Linux 2023年5月27日
    079
  • Linux网络编程基础API

    Linux网络API主要可分为: socket地址API socket基础API 网络信息API socket地址API 两种字节序 大端字节序:整数的高位字节存储在内存的低地址处…

    Linux 2023年6月13日
    053
  • linux查看当前文件夹的总硬盘占用量

    注入产生的原理: 数据库设置为GBK编码: 宽字节注入源于程序员设置MySQL连接时错误配置为:set character_set_client=gbk,这样配置会引发编码转换从而…

    Linux 2022年8月24日
    0234
  • Redis在C#中的使用及Redis的封装

    注入产生的原理: 数据库设置为GBK编码: 宽字节注入源于程序员设置MySQL连接时错误配置为:set character_set_client=gbk,这样配置会引发编码转换从而…

    Linux 2022年9月14日
    0304
  • ASP.NET MVC实现POST方式的Redirect

    我们知道,在ASP.NET MVC中,要从一个Action跳转到另一个Action,通常是用一系列以”Redirect”开头的方法 Redirect Red…

    Linux 2023年6月13日
    080
  • Running cells requires Jupyter notebooks to be installed

    注入产生的原理: 数据库设置为GBK编码: 宽字节注入源于程序员设置MySQL连接时错误配置为:set character_set_client=gbk,这样配置会引发编码转换从而…

    Linux 2022年8月30日
    0262
  • JQuery表格操作的常用技巧总结

    注入产生的原理: 数据库设置为GBK编码: 宽字节注入源于程序员设置MySQL连接时错误配置为:set character_set_client=gbk,这样配置会引发编码转换从而…

    Linux 2022年8月20日
    0266
  • IDEA 使用Git图文详解(学好idea操作git大全)

    注入产生的原理: 数据库设置为GBK编码: 宽字节注入源于程序员设置MySQL连接时错误配置为:set character_set_client=gbk,这样配置会引发编码转换从而…

    Linux 2022年8月30日
    0246
  • 玩转redis-简单消息队列

    使用 go语言基于 redis写了一个简单的消息队列源码地址使用demo redis的 list 非常的灵活,可以从左边或者右边添加元素,当然也以从任意一头读取数据 添加数据和获取…

    Linux 2023年5月28日
    075
  • 启动springboot项目很慢的解决方案-InetAddress.getLocalHost().getHostName()

    https://blog.csdn.net/qq_39595769/article/details/119573111 Original: https://www.cnblogs….

    Linux 2023年6月13日
    067
亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球