Redis分布式锁的使用与实现原理

模拟一个电商里面下单减库存的场景。
1.首先在redis里加入商品库存数量。

Redis分布式锁的使用与实现原理

2.新建一个Spring Boot项目,在pom里面引入相关的依赖。

  <dependency>
            <groupid>org.springframework.boot</groupid>
            <artifactid>spring-boot-starter-web</artifactid>
        </dependency>

        <dependency>
            <groupid>org.springframework.boot</groupid>
            <artifactid>spring-boot-starter-data-redis</artifactid>
        </dependency>

3.接下来,在application.yml配置redis属性和指定应用的端口号:

server:
  port: 8090

spring:
  redis:
    host: 192.168.0.60
    port: 6379

4.新建一个Controller类,扣减库存第一版代码:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;
import java.util.Objects;

@RestController
public class StockController {

    private static final Logger logger = LoggerFactory.getLogger(StockController.class);

    @Resource
    private StringRedisTemplate stringRedisTemplate;

    @RequestMapping("/reduceStock")
    public String reduceStock() {
        // &#x4ECE;redis&#x4E2D;&#x83B7;&#x53D6;&#x5E93;&#x5B58;&#x6570;&#x91CF;
        int stock = Integer.parseInt(Objects.requireNonNull(stringRedisTemplate.opsForValue().get("stockCount")));
        if (stock > 0) {
            // &#x51CF;&#x5E93;&#x5B58;
            int restStock = stock - 1;
            // &#x5269;&#x4F59;&#x5E93;&#x5B58;&#x518D;&#x91CD;&#x65B0;&#x8BBE;&#x7F6E;&#x5230;redis&#x4E2D;
            stringRedisTemplate.opsForValue().set("stockCount", String.valueOf(restStock));
            logger.info("&#x6263;&#x51CF;&#x6210;&#x529F;&#xFF0C;&#x5269;&#x4F59;&#x5E93;&#x5B58;&#xFF1A;{}", restStock);
        } else {
            logger.info("&#x5E93;&#x5B58;&#x4E0D;&#x8DB3;&#xFF0C;&#x6263;&#x51CF;&#x5931;&#x8D25;&#x3002;");
        }

        return "success";
    }
}

上面第一版的代码存在什么问题:超卖。假如多个线程同时调用获取库存数量的代码,那么每个线程拿到的都是100,判断库存都大于0,都可以执行减库存的操作。假如两个线程都做减库存更新缓存,那么缓存的库存变成99,但实际上,应该是减掉2个库存。
那么很多人的第一个想法是加synchronized同步代码块,因为获取数量和减库存不是原子性操作,有多个线程来执行代码的时候,只允许一个线程执行代码块里的代码。那么改完的第二版的代码如下:

 @RequestMapping("/reduceStock")
    public String reduceStock() {
        synchronized (this) {
            // &#x4ECE;redis&#x4E2D;&#x83B7;&#x53D6;&#x5E93;&#x5B58;&#x6570;&#x91CF;
            int stock = Integer.parseInt(Objects.requireNonNull(stringRedisTemplate.opsForValue().get("stockCount")));
            if (stock > 0) {
                // &#x51CF;&#x5E93;&#x5B58;
                int restStock = stock - 1;
                // &#x5269;&#x4F59;&#x5E93;&#x5B58;&#x518D;&#x91CD;&#x65B0;&#x8BBE;&#x7F6E;&#x5230;redis&#x4E2D;
                stringRedisTemplate.opsForValue().set("stockCount", String.valueOf(restStock));
                logger.info("&#x6263;&#x51CF;&#x6210;&#x529F;&#xFF0C;&#x5269;&#x4F59;&#x5E93;&#x5B58;&#xFF1A;{}", restStock);
            } else {
                logger.info("&#x5E93;&#x5B58;&#x4E0D;&#x8DB3;&#xFF0C;&#x6263;&#x51CF;&#x5931;&#x8D25;&#x3002;");
            }
        }

        return "success";
    }

但使用synchronize存在的问题,就是只能保证单机环境运行时没有问题的。但现在的软件公司里,基本上都是集群架构,是多实例,前面使用Nginx做负载均衡,大概架构如下:

Redis分布式锁的使用与实现原理

Nginx分发请求,把请求发送到不同的Tomcat容器,而synchronize只能保证一个应用是没有问题的。

那么代码改进第三版,就是引入redis分布式锁,具体代码如下:

 @RequestMapping("/reduceStock")
    public String reduceStock() {
        String lockKey = "stockKey";
        try {
            boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "1");
            if (!result) {
                return "errorCode";
            }
            // &#x4ECE;redis&#x4E2D;&#x83B7;&#x53D6;&#x5E93;&#x5B58;&#x6570;&#x91CF;
            int stock = Integer.parseInt(Objects.requireNonNull(stringRedisTemplate.opsForValue().get("stockCount")));
            if (stock > 0) {
                // &#x51CF;&#x5E93;&#x5B58;
                int restStock = stock - 1;
                // &#x5269;&#x4F59;&#x5E93;&#x5B58;&#x518D;&#x91CD;&#x65B0;&#x8BBE;&#x7F6E;&#x5230;redis&#x4E2D;
                stringRedisTemplate.opsForValue().set("stockCount", String.valueOf(restStock));
                logger.info("&#x6263;&#x51CF;&#x6210;&#x529F;&#xFF0C;&#x5269;&#x4F59;&#x5E93;&#x5B58;&#xFF1A;{}", restStock);
            } else {
                logger.info("&#x5E93;&#x5B58;&#x4E0D;&#x8DB3;&#xFF0C;&#x6263;&#x51CF;&#x5931;&#x8D25;&#x3002;");
            }
        } finally {
            stringRedisTemplate.delete(lockKey)
        }
        return "success";
    }

如果有一个线程拿到锁,那么其他的线程就会等待。一定要记得在finally里面把使用完的锁要删除掉。否则一旦抛出异常,只有一个线程会一直持有锁,其他线程没有机会获取。
但如果在执行 if (stock > 0) {代码块里的代码,因为宕机或重启没有执行完,也会一直持有锁,所以,这里需要把锁加一个超时时间:

   boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "1");
   stringRedisTemplate.expire(lockKey, 10, TimeUnit.SECONDS);

但如果上面两行代码在中间执行出问题了,设置超时时间的代码还没执行,也会出现锁不能释放的问题。好在有对应的方法:就是把上面两行代码设置成一个原子操作:

   // &#x8FD9;&#x91CC;&#x9ED8;&#x8BA4;&#x8BBE;&#x7F6E;&#x8D85;&#x65F6;&#x65F6;&#x95F4;&#x4E3A;10&#x79D2;
   boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "1", 10, TimeUnit.SECONDS);

到此为止,如果并发量不是很大的话,基本上是没有问题的。

但是,如果请求的并发量很大,就会出现新的问题:有种比较特殊的情况,第一个线程执行了15秒,但是执行到10秒钟的时候,锁已经失效释放了,那么在高并发场景下,第二个线程发现锁已经失效,那么它就可以拿到这把锁进行加锁,
假设第二个线程执行需要8秒,它执行到5秒钟后,此时第一个线程已经执行完了,执行完那一刻,进行了删除key的操作,但是此时的锁是第二个线程加的,这样第一个线程把第二个线程加的锁删掉了。
那意味着第三个线程又可以拿到锁,第三个线程执行了3秒钟,此时第二个线程执行完毕,那么第二个线程把第三个线程的锁又删除了。导致锁失效。
那么解决的思路就是,我自己加的锁,不要被别人删掉。那么可以为每个进来的请求生成一个唯一的id,作为分布式锁的值,然后在释放时,判断一下当前线程的id,是不是和缓存里的id是否相等。

 @RequestMapping("/reduceStock")
    public String reduceStock() {
        String lockKey = "stockKey";
        String id = UUID.randomUUID().toString();
        try {
            // &#x8FD9;&#x91CC;&#x9ED8;&#x8BA4;&#x8BBE;&#x7F6E;&#x8D85;&#x65F6;&#x65F6;&#x95F4;&#x4E3A;30&#x79D2;
            boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, id, 30, TimeUnit.SECONDS);
            if (!result) {
                return "errorCode";
            }
            // &#x4ECE;redis&#x4E2D;&#x83B7;&#x53D6;&#x5E93;&#x5B58;&#x6570;&#x91CF;
            int stock = Integer.parseInt(Objects.requireNonNull(stringRedisTemplate.opsForValue().get("stockCount")));
            if (stock > 0) {
                // &#x51CF;&#x5E93;&#x5B58;
                int restStock = stock - 1;
                // &#x5269;&#x4F59;&#x5E93;&#x5B58;&#x518D;&#x91CD;&#x65B0;&#x8BBE;&#x7F6E;&#x5230;redis&#x4E2D;
                stringRedisTemplate.opsForValue().set("stockCount", String.valueOf(restStock));
                logger.info("&#x6263;&#x51CF;&#x6210;&#x529F;&#xFF0C;&#x5269;&#x4F59;&#x5E93;&#x5B58;&#xFF1A;{}", restStock);
            } else {
                logger.info("&#x5E93;&#x5B58;&#x4E0D;&#x8DB3;&#xFF0C;&#x6263;&#x51CF;&#x5931;&#x8D25;&#x3002;");
            }
        } finally {
            if (id.contentEquals(Objects.requireNonNull(stringRedisTemplate.opsForValue().get(lockKey)))) {
                stringRedisTemplate.delete(lockKey);
            }
        }
        return "success";
    }

到此为止,一个比较完善的锁就实现了,可以应付大部分场景。
当然,上面的代码还有一个问题,就是一个线程执行时间超过了过期时间,后面的代码还没有执行完,锁就已经删除了,还是会有些bug存在。解决的方法是给锁续命的操作。
在当前主线程获取到锁以后,可以fork出一个线程,执行Timer定时器操作,假如默认超时时间为30秒,那么定时器每隔10秒去看下这把锁还是否存在,存在就说明这个锁里的逻辑还没有执行完,那么就可以把当前主线程的超时时间重新设置为30秒;如果不存在,就直接结束掉。

但是上面的逻辑,在高并发场景下,实现比较完善还是比较困难的。好在现在已经有比较成熟的框架,那就是Redisson。官方地址https://redisson.org。
下面用Redisson来实现分布式锁。
首先引入依赖包:

       <dependency>
            <groupid>org.redisson</groupid>
            <artifactid>redisson</artifactid>
            <version>3.6.5</version>
        </dependency>

配置类:

@Configuration
public class RedissonConfig {
    @Bean
    public Redisson redisson() {
        // &#x5355;&#x673A;&#x6A21;&#x5F0F;
        Config config = new Config();
        config.useSingleServer().setAddress("redis://192.168.0.60:6379").setDatabase(0);
        return (Redisson) Redisson.create(config);
    }
}

接下来用redisson重写上面的减库存操作:

 @Resource
    private Redisson redisson;

    @RequestMapping("/reduceStock")
    public String reduceStock() {
        String lockKey = "stockKey";
        RLock redissonLock = redisson.getLock(lockKey);
        try {
            // &#x52A0;&#x9501;&#xFF0C;&#x9501;&#x7EED;&#x547D;
            redissonLock.lock();
            // &#x4ECE;redis&#x4E2D;&#x83B7;&#x53D6;&#x5E93;&#x5B58;&#x6570;&#x91CF;
            int stock = Integer.parseInt(Objects.requireNonNull(stringRedisTemplate.opsForValue().get("stockCount")));
            if (stock > 0) {
                // &#x51CF;&#x5E93;&#x5B58;
                int restStock = stock - 1;
                // &#x5269;&#x4F59;&#x5E93;&#x5B58;&#x518D;&#x91CD;&#x65B0;&#x8BBE;&#x7F6E;&#x5230;redis&#x4E2D;
                stringRedisTemplate.opsForValue().set("stockCount", String.valueOf(restStock));
                logger.info("&#x6263;&#x51CF;&#x6210;&#x529F;&#xFF0C;&#x5269;&#x4F59;&#x5E93;&#x5B58;&#xFF1A;{}", restStock);
            } else {
                logger.info("&#x5E93;&#x5B58;&#x4E0D;&#x8DB3;&#xFF0C;&#x6263;&#x51CF;&#x5931;&#x8D25;&#x3002;");
            }
        } finally {
           redissonLock.unlock();
        }
        return "success";
    }

其实就是三个步骤:获取锁,加锁,释放锁。

先简单看下Redisson的实现原理:

Redis分布式锁的使用与实现原理

这里先说一下Redis很多操作使用Lua脚本来实现原子性操作,关于Lua语法,可以去网上找下相关教程。
使用Lua脚本的好处有:
1.减少网络开销,多个命令可以使用一次请求完成;
2.实现了原子性操作,Redis会把Lua脚本作为一个整体去执行;
3.实现事务,Redis自带的事务功能有限,而Lua脚本实现了事务的常规操作,而且还支持回滚。

但是Lua实际上不会使用很多,如果Lua脚本执行时间过长,因为Redis是单线程,因此会导致堵塞。

最后,说下Redisson分布式锁的代码实现,
找到上面的redissonLock.lock();
lock方法点进去,一直点到RedissonLock类里面的lockInterruptibly方法:

    @Override
    public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
        // &#x83B7;&#x53D6;&#x7EBF;&#x7A0B;id
        long threadId = Thread.currentThread().getId();
        Long ttl = tryAcquire(leaseTime, unit, threadId);
        // lock acquired
        if (ttl == null) {
            return;
        }

        RFuture<redissonlockentry> future = subscribe(threadId);
        commandExecutor.syncSubscription(future);

        try {
            while (true) {
                ttl = tryAcquire(leaseTime, unit, threadId);
                // lock acquired
                if (ttl == null) {
                    break;
                }

                // waiting for message
                if (ttl >= 0) {
                    getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                } else {
                    getEntry(threadId).getLatch().acquire();
                }
            }
        } finally {
            unsubscribe(future, threadId);
        }
//        get(lockAsync(leaseTime, unit));
    }
</redissonlockentry>

重点看下tryAcquire方法,把线程id作为一个参数传递进来,在这个方法里面,找到tryLockInnerAsync方法点进去,

  <t> RFuture<t> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<t> command) {
        internalLockLeaseTime = unit.toMillis(leaseTime);

        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
                  "if (redis.call('exists', KEYS[1]) == 0) then " +
                      "redis.call('hset', KEYS[1], ARGV[2], 1); " +
                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                      "return nil; " +
                  "end; " +
                  "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                      "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                      "return nil; " +
                  "end; " +
                  "return redis.call('pttl', KEYS[1]);",
                    Collections.<object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
    }
</object></t></t></t>

这里就是一堆Lua脚本,先看第一个if命令,先去判断 KEYS[1](就是对应的锁key的名字),如果不存在,在hashmap里,设置一个属性为线程id,值为1,再把map的过期时间设置为internalLockLeaseTime,这个值默认是30秒,

Redis分布式锁的使用与实现原理
上面的操作对应的命令是:
hset keyname id:thread 1
pexpire keyname 30

然后返回nil,相当于null,那程序return了。
另外,Redisson还支持重入锁,那第二个if就是执行重入锁的操作,会判断锁是否存在,并且传入的线程id是否是当前线程的id,若果是,支持重复加锁进行自增操作;
如果是其他线程调用lock方法,上面两个if判断不会走,会返回锁剩余过期时间。

接着返回到tryAcquireAsync方法里面往下看:
实际上是加了一个监听器,在监听器里面有个很重要的方法scheduleExpirationRenewal,一看这个名字就能大概猜出是什么功能,
里面有个定时任务的轮询,

 private void scheduleExpirationRenewal(final long threadId) {
        if (expirationRenewalMap.containsKey(getEntryName())) {
            return;
        }

        Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
            @Override
            public void run(Timeout timeout) throws Exception {
                // &#x5224;&#x65AD;&#x4F20;&#x9012;&#x8FDB;&#x6765;&#x7684;&#x7EBF;&#x7A0B;id&#x662F;&#x5426;&#x662F;&#x6211;&#x4EEC;&#x4E4B;&#x524D;&#x4E3B;&#x7EBF;&#x7A0B;&#x8BBE;&#x7F6E;&#x7684;id&#xFF0C;&#x5982;&#x679C;&#x662F;&#xFF0C;&#x5219;&#x589E;&#x52A0;&#x7EED;&#x547D;&#xFF0C;&#x589E;&#x52A0;30&#x79D2;&#x3002;
                RFuture<boolean> future = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                        "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                            "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                            "return 1; " +
                        "end; " +
                        "return 0;",
                          Collections.<object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));

                future.addListener(new FutureListener<boolean>() {
                    @Override
                    public void operationComplete(Future<boolean> future) throws Exception {
                        expirationRenewalMap.remove(getEntryName());
                        if (!future.isSuccess()) {
                            log.error("Can't update lock " + getName() + " expiration", future.cause());
                            return;
                        }

                        if (future.getNow()) {
                            // reschedule itself
                            scheduleExpirationRenewal(threadId);
                        }
                    }
                });
            }
        }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);

        if (expirationRenewalMap.putIfAbsent(getEntryName(), task) != null) {
            task.cancel();
        }
    }
</boolean></boolean></object></boolean>

接着推迟10秒钟(internalLockLeaseTime / 3),再执行续命操作逻辑。

到最后,再回到lockInterruptibly方法,
如果ttl 为null,说明加锁成功了,就返回null,那如果其他线程的话,就会返回剩余过期时间,那么就会进入到while死循环里,一直尝试加锁,调用tryAcquire方法,在琐失效以后,再会尝试获取加锁。

到此为止,分析完毕。

Original: https://www.cnblogs.com/IcanFixIt/p/14012661.html
Author: 林本托
Title: Redis分布式锁的使用与实现原理

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/529086/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

  • OSPF之Default-router-advertise 解析

    1、关于default-route-advertise命令 Ospf是可以通过import-route命令引入外部路由的,但很少有人会注意到,在默认情况下,ospf是不会引入来自外…

    Linux 2023年6月14日
    098
  • 爱前端公开课学习笔记——JS03 运算符

    运算符的概述和加减运算课程丢失 乘法* 除法/ 直接在控制台中演示 主要是不要省略乘号 字符串不能进行乘除法运算 布尔型可以进行乘除法运算 false: 0 true: 1 乘除法…

    Linux 2023年6月14日
    081
  • 灵感来袭,基于Redis的分布式延迟队列(续)

    背景 上一篇(灵感来袭,基于Redis的分布式延迟队列)讲述了基于Java DelayQueue和Redis实现了分布式延迟队列,这种方案实现比较简单,应用于延迟小,消息量不大的场…

    Linux 2023年5月28日
    078
  • 【证券从业】金融基础知识-第二章 中国金融体系与多层次资本市场02

    注1:后续学习并整理到第八章,全书完结后再合并成一个笔记进行源文件分享 注2:本章内容巨多,大约分为两篇文章记录消化 posted @2022-05-31 22:14 陈景中 阅读…

    Linux 2023年6月13日
    088
  • jQuery的操作方式以及动画效果

    jQuery的操作标签 文本操作 $("选择符").html() // 读取指定元素的内容,如果$()函数获取了有多个元素,则提取第一个元素 $("选…

    Linux 2023年6月7日
    0111
  • CSAPP 之 CacheLab 详解

    前言 本篇博客将会介绍 CSAPP 之 CacheLab 的解题过程,分为 Part A 和 Part B 两个部分,其中 Part A 要求使用代码模拟一个高速缓存存储器,Par…

    Linux 2023年6月7日
    0137
  • redis 订阅与发布

    Reference: https://redisbook.readthedocs.io/en/latest/feature/pubsub.html Redis 的 SUBSCRIB…

    Linux 2023年5月28日
    0112
  • WPF 界面打不开提示 System.ArithmeticException Overflow or underflow in the arithmetic operation 异常

    本文告诉大家如何解决界面打不开,抛出 System.ArithmeticException: Overflow or underflow in the arithmetic ope…

    Linux 2023年6月6日
    087
  • 数据库的灾备

    数据是企业重要的生产资料,关键数据的丢失可能会给企业致命一击,因为数据是计算机系统存在的原因和基础。数据往往是不可再生的,一旦发生数据丢失,企业就会陷入困境:客户资料、技术文件、财…

    Linux 2023年6月6日
    0110
  • Java基础系列–05_面向对象

    1、概述:(1)面向过程:将问题一步一步的解决的过程(详细步骤),在C语言中所有的代码都是基于过程化的代码。(2)面向对象:面向对象是基于面向过程的编程思想,所有的事情都交由创建出…

    Linux 2023年6月7日
    0109
  • 通过shell命令在MAC安装证书

    Macmini打包要需要更新苹果证书,又不想连接显示器,鼠标点点,如果可以通过shell命令,直接远程安装证书就好了。 #双击证书文件 open #输入密码 security un…

    Linux 2023年5月28日
    0132
  • 实现邮箱发送验证码功能

    实现注册和忘记密码界面的邮箱发送验证码功能 邮箱验证步骤 本文使用了redis数据库做验证码的缓存,不想用redis数据库也可以直接将验证码放入mysql中,设置验证码有效期即可。…

    Linux 2023年6月7日
    0135
  • 博客园装饰——(二)滚动到页面顶部或底部

    功能描述: 1. 当页面向下滚动一定距离时,向下滚动到底部的按钮以淡入的效果出现,并以固定定位显示。且滚动到一定距离(快接近所设置的底部)时,该按钮又会以淡出效果消失。 2. 当页…

    Linux 2023年6月14日
    089
  • GIT合并部分文件的CLI

    | 0.24分钟 | 399.2字符 | 1、引言&背景 2、解决方案 3、声明与参考资料 | SCscHero | 2022/5/2 PM10:16 | 系列 | 已完成…

    Linux 2023年6月13日
    085
  • docker安装mysql

    -p 3306:3306 –name mysql -v /root/apply/docker/apply/mysql5.7/log:/var/log/mysql(日志文…

    Linux 2023年6月7日
    0123
  • 分享四款H5怀旧小游戏魔塔+伏魔记+三国霸业+寻仙纪

    前言 还记得中学时,用步步高学习机玩的魔塔、伏魔记、三国霸业吗?还记得3g网时,用Nokia或是杂牌机在3GQQ家园里玩的精武堂、纵横四海吗?没错,说的就是你! 那时的游戏很粗糙,…

    Linux 2023年6月7日
    0145
亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球