Golang 实现 Redis(8): TCC分布式事务

本文是使用 golang 实现 redis 系列的第八篇, 将介绍如何在分布式缓存中使用 Try-Commit-Catch 方式来解决分布式一致性问题。

在上一篇文章中我们使用一致性 hash 算法将缓存中的 key 分散到不同的服务器节点中,从而实现了分布式缓存。随之而来的问题是:一条指令(比如 MSET)可能需要多个节点同时执行,可能有些节点成功而另一部分节点失败。

对于使用者而言这种部分成功部分失败的情况非常难以处理,所以我们需要保证 MSET 操作要么全部成功要么全部失败。

于是问题来了 DEL、MSET 等命令所涉及的 key 可能分布在不同的节点中,在集群模式下实现这类涉及多个 key 的命令最简单的方式当然是 For-Each 遍历 key 并向它们所在的节点发送相应的操作指令。 以 MGET 命令的实现为例:

func MGet(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply {
    if len(args) < 2 {
        return reply.MakeErrReply("ERR wrong number of arguments for 'mget' command")
    }
    // 从参数列表中取出要读取的 key
    keys := make([]string, len(args)-1)
    for i := 1; i < len(args); i++ {
        keys[i-1] = string(args[i])
    }

    resultMap := make(map[string][]byte)
    // 计算每个 key 所在的节点,并按照节点分组
    groupMap := cluster.groupBy(keys)
    // groupMap 的类型为 map[string][]string,key 是节点的地址,value 是 keys 中属于该节点的 key 列表
    for peer, group := range groupMap {
        // 向每个节点发送 mget 指令,读取分布在它上面的 key
        resp := cluster.Relay(peer, c, makeArgs("MGET", group...))
        if reply.IsErrorReply(resp) {
            errReply := resp.(reply.ErrorReply)
            return reply.MakeErrReply(fmt.Sprintf("ERR during get %s occurs: %v", group[0], errReply.Error()))
        }
        arrReply, _ := resp.(*reply.MultiBulkReply)
        // 将每个节点上的结果 merge 到 map 中
        for i, v := range arrReply.Args {
            key := group[i]
            resultMap[key] = v
        }
    }
    result := make([][]byte, len(keys))
    for i, k := range keys {
        result[i] = resultMap[k]
    }
    return reply.MakeMultiBulkReply(result)
}

// 计算 key 所属的节点,并按节点分组
func (cluster *Cluster) groupBy(keys []string) map[string][]string {
    result := make(map[string][]string)
    for _, key := range keys {
        // 使用一致性 hash 计算所属节点
        peer := cluster.peerPicker.Get(key)
        // 将 key 加入到相应节点的分组中
        group, ok := result[peer]
        if !ok {
            group = make([]string, 0)
        }
        group = append(group, key)
        result[peer] = group
    }
    return result
}

若在执行 MSET 指令时遇到部分节点失败或超时,则会出现部分 key 设置成功而另一份设置失败的情况。对于缓存使用者而言这种部分成功部分失败的情况非常难以处理,所以我们需要保证 MSET 操作要么全部成功要么全部失败。

两阶段提交(2-Phase Commit, 2PC)算法是解决我们遇到的一致性问题最简单的算法。在 2PC 算法中写操作被分为两个阶段来执行:

2PC是一种简单的一致性协议,它存在一些问题:

  • 单点服务: 若协调者突然崩溃则事务流程无法继续进行或者造成状态不一致
  • 无法保证一致性: 若协调者第二阶段发送提交请求时崩溃,可能部分参与者受到COMMIT请求提交了事务,而另一部分参与者未受到请求而放弃事务造成不一致现象。
  • 阻塞: 为了保证事务完成提交,各参与者在完成第一阶段事务执行后必须锁定相关资源直到正式提交,影响系统的吞吐量。

首先我们定义事务的描述结构:

type Transaction struct {
    id      string   // 事务 ID, 由协调者使用 snowflake 算法生成
    cmdLine CmdLine // 事务要执行命令行
    cluster *Cluster
    conn    redis.Connection
    dbIndex int

    writeKeys  []string // 事务要进行写入的 Key
    readKeys   []string // 事务要进行读取的 Key
    keysLocked bool
    undoLog    []CmdLine // 回滚命令

    status int8
    mu     *sync.Mutex
}

协调者

先从协调者的角度看一下整个过程:

func MSet(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply {
    //  解析参数
    argCount := len(args) - 1
    if argCount%2 != 0 || argCount < 1 {
        return reply.MakeErrReply("ERR wrong number of arguments for 'mset' command")
    }

    size := argCount / 2
    keys := make([]string, size)
    valueMap := make(map[string]string)
    for i := 0; i < size; i++ {
        keys[i] = string(args[2*i+1])
        valueMap[keys[i]] = string(args[2*i+2])
    }

    // 找到相关 key 所属的节点
    groupMap := cluster.groupBy(keys)
    if len(groupMap) == 1 { // do fast
        // 若所有的 key 都在同一个节点直接执行,不使用较慢的 2pc 算法
        for peer := range groupMap {
            return cluster.Relay(peer, c, args)
        }
    }

    // 开始准备阶段
    var errReply redis.Reply
    txId := cluster.idGenerator.NextId() // 使用 snowflake 算法决定事务 ID
    txIdStr := strconv.FormatInt(txId, 10)
    rollback := false
    // 向所有参与者发送 prepare 请求
    for peer, group := range groupMap {
        peerArgs := []string{txIdStr}
        for _, k := range group {
            peerArgs = append(peerArgs, k, valueMap[k])
        }
        var resp redis.Reply
        if peer == cluster.self {
            resp = PrepareMSet(cluster, c, makeArgs("PrepareMSet", peerArgs...))
        } else {
            resp = cluster.Relay(peer, c, makeArgs("PrepareMSet", peerArgs...))
        }
        if reply.IsErrorReply(resp) {
            errReply = resp
            rollback = true
            break
        }
    }
    if rollback {
        // 若 prepare 过程出错则执行回滚
        RequestRollback(cluster, c, txId, groupMap)
    } else {
        // prepare 成功,要求所有节点提交
        _, errReply = RequestCommit(cluster, c, txId, groupMap)
        rollback = errReply != nil
    }
    if !rollback {
        return &reply.OkReply{}
    }
    return errReply
}

func requestCommit(cluster *Cluster, c redis.Connection, txID int64, peers map[string][]string) ([]redis.Reply, reply.ErrorReply) {
    var errReply reply.ErrorReply
    txIDStr := strconv.FormatInt(txID, 10)
    respList := make([]redis.Reply, 0, len(peers))
    // 要求每个节点进行提交
    for peer := range peers {
        var resp redis.Reply
        if peer == cluster.self {
            resp = execCommit(cluster, c, makeArgs("commit", txIDStr))
        } else {
            resp = cluster.relay(peer, c, makeArgs("commit", txIDStr))
        }
        if reply.IsErrorReply(resp) {
            errReply = resp.(reply.ErrorReply)
            break
        }
        respList = append(respList, resp)
    }
    // 若有节点提交失败则要求所有节点回滚
    if errReply != nil {
        requestRollback(cluster, c, txID, peers)
        return nil, errReply
    }
    return respList, nil
}

参与者

godis 的每个命令的实现都提供了一个用于分析相关key的 PrepareFunc 和一个生成 undoLog 的 UndoFunc, 这两个功能极大方便了 TCC 事务实现。
参与者可以通过 GetRelatedKeys 和 GetUndoLogs 函数来使用对应功能。

我们的 undoLog 是一系列命令,比如可以通过 SET 或者 HMSET 命令来恢复被 DEL 命令删除的 string 或 hash 数据。

type command struct {
    executor ExecFunc
    prepare  PreFunc // 返回命令相关key
    undo     UndoFunc // 生成 undoLog
    arity    int
    flags    int
}

参与者的代码是通用的,只要底层数据库提供了相应的 command 实现,在使用 TCC 支持新命令时就不需要修改参与者代码,只实现相应的协调者即可。

接下来具体看一下事务参与者在 prepare 阶段做了什么:

// prepare 命令的格式是: Prepare txID, command, key1, key2 ...

// TxID 是事务 ID, 由协调者决定. command 是 tcc 要执行的命令, 比如这里的 MSet
func execPrepare(cluster *Cluster, c redis.Connection, cmdLine CmdLine) redis.Reply {
    if len(cmdLine) < 3 {
        return reply.MakeErrReply("ERR wrong number of arguments for 'preparedel' command")
    }
    txID := string(cmdLine[1])
    tx := NewTransaction(cluster, c, txID, cmdLine[2:])
    cluster.transactions.Put(txID, tx)
    err := tx.prepare()
    if err != nil {
        return reply.MakeErrReply(err.Error())
    }
    return &reply.OkReply{}
}

实际的准备操作在 tx.prepare() 中:

func (tx *Transaction) prepare() error {
    tx.mu.Lock()
    defer tx.mu.Unlock()

    // 锁定相关 key 避免并发问题
    tx.writeKeys, tx.readKeys = godis.GetRelatedKeys(tx.cmdLine)
    tx.lockKeys()

    // 准备 undoLog
    tx.undoLog = tx.cluster.db.GetUndoLogs(tx.dbIndex, tx.cmdLine)
    tx.status = preparedStatus
    // 在时间轮中添加任务, 自动回滚超时未提交的事务
    taskKey := genTaskKey(tx.id)
    timewheel.Delay(maxLockTime, taskKey, func() {
        if tx.status == preparedStatus {
            logger.Info("abort transaction: " + tx.id)
            _ = tx.rollback()
        }
    })
    return nil
}

事务参与者提交本地事务:

func execCommit(cluster *Cluster, c redis.Connection, cmdLine CmdLine) redis.Reply {
    if len(cmdLine) != 2 {
        return reply.MakeErrReply("ERR wrong number of arguments for 'commit' command")
    }
    txID := string(cmdLine[1])
    raw, ok := cluster.transactions.Get(txID)
    if !ok {
        return reply.MakeIntReply(0)
    }
    tx, _ := raw.(*Transaction)

    // 锁定事务
    // 执行者在 commit 阶段可能收到协调者发来的回滚命令,需要避免一个协程在提交另一个协程在回滚造成异常
    tx.mu.Lock()
    defer tx.mu.Unlock()

    // ExecWithLock 自己不会锁定相关 key, 需要调用方提供锁
    // 由于在 prepare 阶段相关 key 已经被锁定,所以使用 ExecWithLock 即可
    result := cluster.db.ExecWithLock(c, tx.cmdLine)

    if reply.IsErrorReply(result) {
        // 提交失败本地回滚并向协调者返回错误
        err2 := tx.rollback()
        return reply.MakeErrReply(fmt.Sprintf("err occurs when rollback: %v, origin err: %s", err2, result))
    }
    // 提交完成,解锁相关key
    tx.unLockKeys()
    tx.status = committedStatus
    // 通过时间轮延时清理事务上下文
    // 由于协调者可能在提交完成后要求回滚事务,所以不能立即进行清理
    timewheel.Delay(waitBeforeCleanTx, "", func() {
        cluster.transactions.Remove(tx.id)
    })
    return result
}

回滚过程只需要执行 undoLog 中的命令即可

func (tx *Transaction) rollback() error {
    curStatus := tx.status
    tx.mu.Lock()
    defer tx.mu.Unlock()

    // 由于调用回滚的地方比较多,需要使用 check-lock-check 模式避免等待锁的过程中事务状态已被其它协程改变
    if tx.status != curStatus {
        return fmt.Errorf("tx %s status changed", tx.id)
    }
    if tx.status == rolledBackStatus {
        return nil
    }
    tx.lockKeys()
    for _, cmdLine := range tx.undoLog {
        tx.cluster.db.ExecWithLock(tx.conn, cmdLine)
    }
    tx.unLockKeys()
    tx.status = rolledBackStatus
    return nil
}

Original: https://www.cnblogs.com/Finley/p/14079108.html
Author: -Finley-
Title: Golang 实现 Redis(8): TCC分布式事务

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/529386/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

  • Centos8 设置中文

    1、一般情况 1.1 进入设置选择 Region&Language 1.2 点击 加号 1.3 点击 汉语(中国) 1.4 选择 汉语(智能拼音) 2、特殊情况 有些虚拟机…

    Linux 2023年5月27日
    0123
  • CSS中content属性的妙用

    前言 本文讲解CSS中使用频率并不高的content属性,通过多个实用的案例,带你由浅入深的掌握content的用法,让代码变得更加简洁、高效。 定义 W3school中这样定义:…

    Linux 2023年6月7日
    0136
  • Python 获取字典中的第一个键

    提供两种方法: 使用 list 将字典的 key 转换成列表,然后取第一个元素 [0]。如果想要最后一个 key 的话,就取最后一个元素 [-1]。 >>> my…

    Linux 2023年6月7日
    077
  • 【转载】人才成长攻略

    本文转载自知乎《前些天在知乎回复了一个帖子:怎么劝大四室友不要考计算机研?- 曹政的回答》,原作者曹政 评论里有一堆阴阳怪气的说法,什么没天赋怎么办,程序员也不是终身可靠的职业云云…

    Linux 2023年6月13日
    090
  • 正则表达式 8. 特殊限制(环视否定)

    https://www.zybuluo.com/Zjmainstay/note/709093 特殊限制(环视否定) (8.1)使用\d{1,3}匹配1-999的数据,不能以0开头 …

    Linux 2023年6月13日
    0109
  • webshell 免杀

    https://xz.aliyun.com/t/11391 Original: https://www.cnblogs.com/cute/p/16356651.htmlAuthor…

    Linux 2023年5月28日
    0116
  • 编程入门之日志聚合系统

    (关心具体部署的同学,可以移步我的另外一篇《Centos部署Loki日志聚合系统 》https://www.cnblogs.com/uncleguo/p/15975647.html…

    Linux 2023年6月13日
    084
  • Kafka 配置文件详情

    kafka的配置分为 broker、producter、consumer三个不同的配置 一 、BROKER 的全局配置 最为核心的三个配置 broker.id、log.dir、zo…

    Linux 2023年6月8日
    091
  • fastdfs集群部署

    fastdfs集群部署 参考链接:https://www.cnblogs.com/penngke/p/15396701.html部署架构如下: 部署规划 2台主机,数据存储节点共1…

    Linux 2023年6月8日
    0104
  • Mobaxterm使用(类似xshell)

    网盘保存(链接:https://pan.baidu.com/s/1r_tx_eZ7zSUslLNNl5oOFw 提取码:fcc8) 主要功能 支持各种连接SSH,X11,RDP,V…

    Linux 2023年5月28日
    0103
  • 博客怎么写才能更安全和简洁

    前言 博客实现本地存储 Markdown语法的介绍 博客对于我们普通人来说就是为了更好的去实现个人知识的一个整理融合然后把知识共享可以帮助其他去实现自己的一些工作或者学习中的一些疑…

    Linux 2023年6月14日
    092
  • 01-MySQL连接查询、聚合函数

    1、连接查询 1.1、左连接 以左表为基准进行查询,左表数据回全部显示出来 右表中如果匹配连接条件的数据则显示相应字段的数据,如果不匹配,则显示为NULL 1.2、右连接 以右表为…

    Linux 2023年6月7日
    0128
  • Linux FastDFS安装

    1.0、 fastDFS fastDFS介绍 FastDFS是用c语言编写的一款开源的分布式文件系统,它是由淘宝资深架构师余庆编写并开源。FastDFS专为互联网量身定制,充分考虑…

    Linux 2023年6月7日
    087
  • 操作系统虚拟内存发展史

    404. 抱歉,您访问的资源不存在。 可能是URL不正确,或者对应的内容已经被删除,或者处于隐私状态。 [En] It may be that the URL is incorre…

    Linux 2023年5月27日
    0116
  • 祖传代码如何优化性能?

    hello大家好呀,我是小楼~ 今天又带来一次性能优化的分享,这是我刚进公司时接手的祖传(坏笑)项目,这个项目在我的文章中屡次被提及,我在它上面做了很多的性能优化,比如《记一次提升…

    Linux 2023年6月8日
    0125
  • 使用 shell 脚本自动对比两个安装目录并生成差异补丁包

    问题的提出 公司各个业务线的安装包小则几十兆、大则几百兆,使用自建的升级系统向全国百万级用户下发新版本时,流量耗费相当惊人。有时新版本仅仅改了几个 dll ,总变更量不过几十 K …

    Linux 2023年6月6日
    085
亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球