分布式锁

非分布式下使用锁

  • 利用版本号来检测数据是否发生变化,从而判断是否能进行更新
  • JAVA 使用比较交换机制-CAS(Compare And Swap)机制实现
  • i++非线程安全,使用原子类AtomicInteger
  • JDK1.5开始提供原子类,使用CAS机制,乐观锁,并发包java.util.concurrent

  • 执行前加锁,执行后解锁

  • JAVA 使用 synchronized 与 ReentrantLock 实现

  • 线程有序获取锁,以队列的方式实现

  • ReentrantLock中的fairSync实现,通过构造方法传入true

  • 线程无序获取锁

  • ReentrantLock中的NonfairSync实现,默认都为非公平锁

  • 通过数据库解决,使用update的行锁解决并发

update product set count = count - 1 where count > 0 and id = '1'

update product set count = count - 1 where id = '1'
后续再针对库存进行校验,若为负数,抛出异常
  • 使用synchronized, 有一种情况为事务未提交锁已释放,需要手动控制事务
// 获取事务
TransactionStatus transaction = platformTransactionManager.getTransaction(transactionDefinition);
// 提交事务
platformTransactionManager.commit(transaction);
// 回滚事务
platformTransactionManager.rollback(transaction);
  • synchronized块
    // 对象锁
    synchronized(this){}
    // 类锁
    synchronized(Product.class){}
  • 使用ReentrantLock
  • 建议使用 try-finally 的方式,防止异常未释放锁
Lock lock = new ReentrantLock();
lock.lock();
try{
    ...

}finally{
    lock.unlock();
}

分布式锁

查询语句后加入 for update

查看mysql是否自动提交
SELECT @@autocommit;
1 自动提交 0 不自动提交
SET @@autocommit = 0;

SELECT * FROM order WHERE id = '1' FOR UPDATE
  • 优点:简单方便,易于理解,易于操作
  • 缺点:并发量大,对数据库压力较大
  • 建议:作为锁的数据库与业务数据库分开

  • 获取锁的命令
    — SET resource_name my_random_value NX PX 30000

  • resource_name:资源名称,可根据不同的业务区分不同的锁(相当于key)
  • my_random_value:随机值,每个线程的随机值都不同(UUID),用于释放锁的校验
  • NX:key不存在时设置成功,key存在则设置不成功,原子性操作
  • PX:自动失效时间,出现异常情况,锁可以过期失效。防止释放锁异常
  • 利用NX的原子性,多个线程并发时,只有一个线程可以设置成功
  • 设置成功即获得锁,可以执行后续的业务处理
  • 如果出现异常,过了锁的有效期,自动释放
  • 释放锁采用Redis的delete命令
  • 释放锁时校验之前设置的随机数,相同才能释放,防止释放了别人的锁
  • 释放锁使用LUA脚本
if redis.call("get",KEYS[1]) == ARGV[1] then
    return redis.call("del",KEYS[1])
else
    return 0
end
  • java实现代码
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.connection.RedisStringCommands;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.RedisScript;
import org.springframework.data.redis.core.types.Expiration;

import java.util.Arrays;
import java.util.List;
import java.util.UUID;

/**
* 实现AutoCloseable接口,可用JDK1.7的try(xx)中的自动关闭特性
*/
@Slf4j
public class RedisLock implements AutoCloseable {

    private RedisTemplate redisTemplate;
    private String key;
    private String value;
    //单位:秒
    private int expireTime;

    public RedisLock(RedisTemplate redisTemplate,String key,int expireTime){
        this.redisTemplate = redisTemplate;
        this.key = key;
        this.expireTime=expireTime;
        this.value = UUID.randomUUID().toString();
    }

    /**
     * 获取分布式锁
     * @return
     */
    public boolean getLock(){
        RedisCallback redisCallback = connection -> {
            //设置NX
            RedisStringCommands.SetOption setOption = RedisStringCommands.SetOption.ifAbsent();
            //设置过期时间
            Expiration expiration = Expiration.seconds(expireTime);
            //序列化key
            byte[] redisKey = redisTemplate.getKeySerializer().serialize(key);
            //序列化value
            byte[] redisValue = redisTemplate.getValueSerializer().serialize(value);
            //执行setnx操作
            Boolean result = connection.set(redisKey, redisValue, expiration, setOption);
            return result;
        };

        //获取分布式锁
        Boolean lock = (Boolean)redisTemplate.execute(redisCallback);
        return lock;
    }

    public boolean unLock() {
        String script = "if redis.call(\"get\",KEYS[1]) == ARGV[1] then\n" +
                "    return redis.call(\"del\",KEYS[1])\n" +
                "else\n" +
                "    return 0\n" +
                "end";
        RedisScript redisScript = RedisScript.of(script,Boolean.class);
        List keys = Arrays.asList(key);

        Boolean result = (Boolean)redisTemplate.execute(redisScript, keys, value);
        log.info("释放锁的结果:"+result);
        return result;
    }

    @Override
    public void close() throws Exception {
        unLock();
    }
}

zk节点

  • 持久节点
  • 瞬时节点:不可有子节点,会话结束自动消失,zk重启也消失

观察器

  • 监听3个方法:getData, getChildren, exists
  • 节点数据变化,通知客户端
  • 只通知一次

分布式锁原理

  • 利用zk的瞬时节点的有序性特点
  • 多线程并发创建有序瞬时节点,得到序列号
  • 序号最小获得锁
  • 其他线程监听前一个序列
  • 前一个线程执行完成,删除序列,后一个线程获得通知,可执行
  • 节点创建时,已决定了线程的执行顺序

maven


            org.apache.zookeeper
            zookeeper
            3.4.14

获取锁与释放锁

import lombok.extern.slf4j.Slf4j;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;
import java.util.Collections;
import java.util.List;

@Slf4j
public class ZkLock implements AutoCloseable, Watcher {

    private ZooKeeper zooKeeper;
    private String znode;

    public ZkLock() throws IOException {
        this.zooKeeper = new ZooKeeper("localhost:2181",
                10000,this);
    }

    public boolean getLock(String businessCode) {
        try {
            //创建业务 根节点
            Stat stat = zooKeeper.exists("/" + businessCode, false);
            if (stat==null){
                zooKeeper.create("/" + businessCode,businessCode.getBytes(),
                        ZooDefs.Ids.OPEN_ACL_UNSAFE,
                        CreateMode.PERSISTENT);
            }

            //创建瞬时有序节点  /order/order_00000001
            znode = zooKeeper.create("/" + businessCode + "/" + businessCode + "_", businessCode.getBytes(),
                    ZooDefs.Ids.OPEN_ACL_UNSAFE,
                    CreateMode.EPHEMERAL_SEQUENTIAL);

            //获取业务节点下 所有的子节点
            List childrenNodes = zooKeeper.getChildren("/" + businessCode, false);
            //子节点排序
            Collections.sort(childrenNodes);
            //获取序号最小的(第一个)子节点
            String firstNode = childrenNodes.get(0);
            //如果创建的节点是第一个子节点,则获得锁
            if (znode.endsWith(firstNode)){
                return true;
            }
            //不是第一个子节点,则监听前一个节点
            String lastNode = firstNode;
            for (String node:childrenNodes){
                if (znode.endsWith(node)){
                zooKeeper.exists("/"+businessCode+"/"+lastNode,true);
                    break;
                }else {
                    lastNode = node;
                }
            }
            synchronized (this){
                wait();
            }
            return true;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }

    @Override
    public void close() throws Exception {
        zooKeeper.delete(znode,-1);
        zooKeeper.close();
    }

    @Override
    public void process(WatchedEvent event) {
        if (event.getType() == Event.EventType.NodeDeleted){
            synchronized (this){
                notify();
            }
        }
    }
}

maven


            org.apache.curator
            curator-recipes
            4.2.0

实现流程:

RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181", retryPolicy);
client.start();
InterProcessMutex lock = new InterProcessMutex(client, "/order");
try {
    if ( lock.acquire(30, TimeUnit.SECONDS) ) {
        try  {
            // 业务
        } finally  {
            lock.release();
        }
    }
} catch (Exception e) {
    e.printStackTrace();
}
client.close();

先声明bean,再进行使用

    /*
     * 在config中声明bean
     * 加入start与close,可在bean初始化与bean销毁时调用相应的方法
     */
    @Bean(initMethod="start",destroyMethod = "close")
    public CuratorFramework getCuratorFramework() {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181", retryPolicy);
        return client;
    }

    // 使用
    @Autowired
    private CuratorFramework client;

    public void test(){
        InterProcessMutex lock = new InterProcessMutex(client, "/order");
        try{
            if (lock.acquire(30, TimeUnit.SECONDS)){
                // 业务
            }
        } catch (IOException e) {
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        }finally {
            try {
                // 释放锁
                lock.release();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
  • maven

        org.redisson
        redisson
        3.11.2

        Config config = new Config();
        config.useSingleServer().setAddress("redis://ip:port");
        RedissonClient redisson = Redisson.create(config);

        RLock rLock = redisson.getLock("order");

        try {
            rLock.lock(30, TimeUnit.SECONDS);
            // 业务
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            // 释放锁
            rLock.unlock();
        }

使用springboot的方式

  • maven

        org.redisson
        redisson-spring-boot-starter
        3.11.2

spring:
  redis:
    database: 1
    host: ip
    port: 6379
    password: 123
    @Autowired
    private RedissonClient redisson;

    public void test() {
        RLock rLock = redisson.getLock("order");
        try {
            rLock.lock(30, TimeUnit.SECONDS);
            // 业务
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            // 解锁
            rLock.unlock();
        }
    }

分布式锁实现方式比较

方式 优点 缺点 数据库 简单 数据库压力大 Redis 麻烦,不支持阻塞 Zk 支持阻塞 原理复杂 Curator 依赖zk,强一致性 Redisson 支持阻塞

Original: https://www.cnblogs.com/maple92/p/14238699.html
Author: Topze
Title: 分布式锁

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/582936/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球