zk系列三:zookeeper实战之分布式锁实现

一、分布式锁的通用实现思路

分布式锁的概念以及常规解决方案可以参考之前的博客:聊聊分布式锁的解决方案;今天我们先分析下分布式锁的实现思路;

  • 首先,需要保证唯一性,即某一时点只能有一个线程访问某一资源;比方说待办短信通知功能,每天早上九点短信提醒所有工单的处理人处理工单,假设服务部署了20个容器,那么早上九点的时候会有20个线程启动准备发送短信,此时我们只能让一个线程执行短信发送,否则用户会收到20条相同的短信;
  • 其次,需要考虑下何时应该释放锁?这又分三种情况,一是拿到锁的线程正常结束,另一种是获取锁的线程异常退出,还有种是获取锁的线程一直阻塞;第一种情况直接释放即可,第二种情况可以通过定义下锁的过期时间然后通过定时任务去释放锁;zk的话直接通过临时节点即可;最后一种阻塞的情况也可以通过定时任务来释放,但是需要根据业务来综合判断,如果业务本身就是长时间耗时的操作那么锁的过期时间就得设置的久一点
  • 最后,当拿到锁的线程释放锁的时候,如何通知其他线程可以抢锁了呢
    这里简单介绍两种解决方案,一种是所有需要锁的线程主动轮询,固定时间去访问下看锁是否释放,但是这种方案无端增加服务器压力并且时效性无法保证;另一种就是zk的watch,监听锁所在的目录,一有变化立马得到通知

二、ZK实现分布式锁的思路

  • zk通过每个线程在同一父目录下创建临时有序节点,然后通过比较节点的id大小来实现分布式锁功能;再通过zk的watch机制实时获取节点的状态,如果被删除立即重新争抢锁;具体流程见线图:
    zk系列三:zookeeper实战之分布式锁实现

提示:需要关注下图里判断自身不是最小节点时的监听情况,为什么不监听父节点?原因图里已有描述,这里就不再赘述

三、ZK实现分布式锁的编码实现

1、核心工具类实现

通过不断的调试,我封装了一个 ZkLockHelper类,里面封装了上锁和释放锁的方法,为了方便我将zk的一些监听和回调机智也融合到一起了,并没有抽出来,下面贴上该类的全部代码

package com.darling.service.zookeeper.lock;

import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.junit.platform.commons.util.StringUtils;

import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;

/**
 * @description:
 * @author: dll
 * @date: Created in 2022/11/4 8:41
 * @version:
 * @modified By:
 */
@Data
@Slf4j
public class ZkLockHelper implements AsyncCallback.StringCallback, AsyncCallback.StatCallback,Watcher, AsyncCallback.ChildrenCallback {

    private final String lockPath = "/lockItem";

    ZooKeeper zkClient;
    String threadName;

    CountDownLatch cd = new CountDownLatch(1);
    private String pathName;

    /**
     * 上锁
     */
    public void tryLock() {
        try {
            log.info("线程:{}正在创建节点",threadName);
            zkClient.create(lockPath,(threadName).getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL,this,"AAA");
            log.info("线程:{}正在阻塞......",threadName);
            // 由于上面是异步创建所以这里需要阻塞住当前线程
            cd.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    /**
     * 释放锁
     */
    public void unLock() {
        try {
            zkClient.delete(pathName,-1);
            System.out.println(threadName + " 工作结束....");
        } catch (Exception e) {
            e.printStackTrace();
        }

    }

    /**
     * create方法的回调,创建成功后在此处获取/DCSLock的子目录,比较节点ID是否最小,是则拿到锁。。。
     * @param rc        状态码
     * @param path      create方法的path入参
     * @param ctx       create方法的上下文入参
     * @param name      创建成功的临时有序节点的名称,即在path的后面加上了zk维护的自增ID;
     *                  注意如果创建的不是有序节点,那么此处的name和path的内容一致
     */
    @Override
    public void processResult(int rc, String path, Object ctx, String name) {
        log.info(">>>>>>>>>>>>>>>>>processResult,rx:{},path:{},ctx:{},name:{}",rc,path,ctx.toString(),name);
        if (StringUtils.isNotBlank(name)) {
            try {
                pathName =  name ;
                // 此处path需注意要写/
                zkClient.getChildren("/", false,this,"123");
//                List<string> children = zkClient.getChildren("/", false);
//                log.info(">>>>>threadName:{},children:{}",threadName,children);
//                // &#x7ED9;children&#x6392;&#x5E8F;
//                Collections.sort(children);
//                int i = children.indexOf(pathName.substring(1));
//                // &#x5224;&#x65AD;&#x81EA;&#x8EAB;&#x662F;&#x5426;&#x7B2C;&#x4E00;&#x4E2A;
//                if (Objects.equals(i,0)) {
//                    // &#x662F;&#x7B2C;&#x4E00;&#x4E2A;&#x5219;&#x8868;&#x793A;&#x62A2;&#x5230;&#x4E86;&#x9501;
//                    log.info("&#x7EBF;&#x7A0B;{}&#x62A2;&#x5230;&#x4E86;&#x9501;",threadName);
//                    cd.countDown();
//                }else {
//                    // &#x8868;&#x793A;&#x6CA1;&#x62A2;&#x5230;&#x9501;
//                    log.info("&#x7EBF;&#x7A0B;{}&#x62A2;&#x9501;&#x5931;&#x8D25;&#xFF0C;&#x91CD;&#x65B0;&#x6CE8;&#x518C;&#x76D1;&#x542C;&#x5668;",threadName);
//                    zkClient.exists("/"+children.get(i-1),this,this,"AAA");
//                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

    }

    /**
     * exists&#x65B9;&#x6CD5;&#x7684;&#x56DE;&#x8C03;&#xFF0C;&#x6B64;&#x5904;&#x6682;&#x4E0D;&#x505A;&#x5904;&#x7406;
     * @param rc
     * @param path
     * @param ctx
     * @param stat
     */
    @Override
    public void processResult(int rc, String path, Object ctx, Stat stat) {

    }

    /**
     * exists&#x7684;watch&#x76D1;&#x542C;
     * @param event
     */
    @Override
    public void process(WatchedEvent event) {
        //&#x5982;&#x679C;&#x7B2C;&#x4E00;&#x4E2A;&#x7EBF;&#x7A0B;&#x9501;&#x91CA;&#x653E;&#x4E86;&#xFF0C;&#x7B49;&#x4EF7;&#x4E8E;&#x7B2C;&#x4E00;&#x4E2A;&#x7EBF;&#x7A0B;&#x5220;&#x9664;&#x4E86;&#x8282;&#x70B9;&#xFF0C;&#x6B64;&#x65F6;&#x53EA;&#x6709;&#x7B2C;&#x4E8C;&#x4E2A;&#x7EBF;&#x7A0B;&#x4F1A;&#x76D1;&#x63A7;&#x7684;&#x5230;
        switch (event.getType()) {
            case None:
                break;
            case NodeCreated:
                break;
            case NodeDeleted:
                zkClient.getChildren("/", false,this,"123");
//                // &#x6B64;&#x5904;path&#x9700;&#x6CE8;&#x610F;&#x8981;&#x5199;"/"
//                List<string> children = null;
//                try {
//                    children = zkClient.getChildren("/", false);
//                } catch (KeeperException e) {
//                    e.printStackTrace();
//                } catch (InterruptedException e) {
//                    e.printStackTrace();
//                }
//                log.info(">>>>>threadName:{},children:{}",threadName,children);
//                // &#x7ED9;children&#x6392;&#x5E8F;
//                Collections.sort(children);
//                int i = children.indexOf(pathName.substring(1));
//                // &#x5224;&#x65AD;&#x81EA;&#x8EAB;&#x662F;&#x5426;&#x7B2C;&#x4E00;&#x4E2A;
//                if (Objects.equals(i,0)) {
//                    // &#x662F;&#x7B2C;&#x4E00;&#x4E2A;&#x5219;&#x8868;&#x793A;&#x62A2;&#x5230;&#x4E86;&#x9501;
//                    log.info("&#x7EBF;&#x7A0B;{}&#x62A2;&#x5230;&#x4E86;&#x9501;",threadName);
//                    cd.countDown();
//                }else {
//                    /**
//                     *  &#x8868;&#x793A;&#x6CA1;&#x62A2;&#x5230;&#x9501;&#xFF1B;&#x9700;&#x8981;&#x5224;&#x65AD;&#x524D;&#x7F6E;&#x8282;&#x70B9;&#x5B58;&#x4E0D;&#x5B58;&#x5728;&#xFF0C;&#x5176;&#x5B9E;&#x8FD9;&#x91CC;&#x5E76;&#x4E0D;&#x662F;&#x7279;&#x522B;&#x5173;&#x5FC3;&#x524D;&#x7F6E;&#x8282;&#x70B9;&#x5B58;&#x4E0D;&#x5B58;&#x5728;&#xFF0C;&#x6240;&#x4EE5;&#x5176;&#x56DE;&#x8C03;&#x53EF;&#x4EE5;&#x4E0D;&#x5904;&#x7406;&#xFF1B;
//                     *  &#x4F46;&#x662F;&#x8FD9;&#x91CC;&#x5173;&#x6CE8;&#x7684;&#x524D;&#x7F6E;&#x8282;&#x70B9;&#x7684;&#x76D1;&#x542C;&#xFF0C;&#x5F53;&#x524D;&#x7F6E;&#x8282;&#x70B9;&#x76D1;&#x542C;&#x5230;&#x88AB;&#x5220;&#x9664;&#x65F6;&#x5C31;&#x662F;&#x5176;&#x4ED6;&#x7EBF;&#x7A0B;&#x62A2;&#x9501;&#x4E4B;&#x65F6;
//                     */
//                    zkClient.exists("/"+children.get(i-1),this,this,"AAA");
//                }
                break;
            case NodeDataChanged:
                break;
            case NodeChildrenChanged:
                break;
        }
    }

    /**
     * getChildren&#x65B9;&#x6CD5;&#x7684;&#x56DE;&#x8C03;
     * @param rc
     * @param path
     * @param ctx
     * @param children
     */
    @Override
    public void processResult(int rc, String path, Object ctx, List<string> children) {
        try {
            log.info(">>>>>threadName:{},children:{}", threadName, children);
            if (Objects.isNull(children)) {
                return;
            }
            // &#x7ED9;children&#x6392;&#x5E8F;
            Collections.sort(children);
            int i = children.indexOf(pathName.substring(1));
            // &#x5224;&#x65AD;&#x81EA;&#x8EAB;&#x662F;&#x5426;&#x7B2C;&#x4E00;&#x4E2A;
            if (Objects.equals(i, 0)) {
                // &#x662F;&#x7B2C;&#x4E00;&#x4E2A;&#x5219;&#x8868;&#x793A;&#x62A2;&#x5230;&#x4E86;&#x9501;
                log.info("&#x7EBF;&#x7A0B;{}&#x62A2;&#x5230;&#x4E86;&#x9501;", threadName);
                cd.countDown();
            } else {
                // &#x8868;&#x793A;&#x6CA1;&#x62A2;&#x5230;&#x9501;
                log.info("&#x7EBF;&#x7A0B;{}&#x62A2;&#x9501;&#x5931;&#x8D25;&#xFF0C;&#x91CD;&#x65B0;&#x6CE8;&#x518C;&#x76D1;&#x542C;&#x5668;", threadName);
                /**
                 *  &#x8868;&#x793A;&#x6CA1;&#x62A2;&#x5230;&#x9501;&#xFF1B;&#x9700;&#x8981;&#x5224;&#x65AD;&#x524D;&#x7F6E;&#x8282;&#x70B9;&#x5B58;&#x4E0D;&#x5B58;&#x5728;&#xFF0C;&#x5176;&#x5B9E;&#x8FD9;&#x91CC;&#x5E76;&#x4E0D;&#x662F;&#x7279;&#x522B;&#x5173;&#x5FC3;&#x524D;&#x7F6E;&#x8282;&#x70B9;&#x5B58;&#x4E0D;&#x5B58;&#x5728;&#xFF0C;&#x6240;&#x4EE5;&#x5176;&#x56DE;&#x8C03;&#x53EF;&#x4EE5;&#x4E0D;&#x5904;&#x7406;&#xFF1B;
                 *  &#x4F46;&#x662F;&#x8FD9;&#x91CC;&#x5173;&#x6CE8;&#x7684;&#x524D;&#x7F6E;&#x8282;&#x70B9;&#x7684;&#x76D1;&#x542C;&#xFF0C;&#x5F53;&#x524D;&#x7F6E;&#x8282;&#x70B9;&#x76D1;&#x542C;&#x5230;&#x88AB;&#x5220;&#x9664;&#x65F6;&#x5C31;&#x662F;&#x5176;&#x4ED6;&#x7EBF;&#x7A0B;&#x62A2;&#x9501;&#x4E4B;&#x65F6;
                 */
                zkClient.exists("/" + children.get(i - 1), this, this, "AAA");
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
</string></string></string>

提示:代码中注释的代码块可以关注下,原本是直接阻塞式编程,将获取所有子节点并释放锁的操作直接写在getChildren方法的回调里,后来发现当节点被删除时我们还要重新抢锁,那么代码就冗余了,于是结合响应式编程的思想,将这段核心代码放到 getChildren&#x65B9;&#x6CD5;&#x7684;&#x56DE;&#x8C03;里,这样代码简洁了并且可以让业务更只关注于 getChildren这件事了

2、测试代码编写

线程安全问题复现

package com.darling.service.zookeeper.lock;

import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;

/**
 * @description:  &#x5F00;&#x542F;&#x662F;&#x4E2A;&#x7EBF;&#x7A0B;&#x7ED9;i&#x505A;&#x9012;&#x51CF;&#x64CD;&#x4F5C;&#xFF0C;&#x672A;&#x52A0;&#x9501;&#x7684;&#x60C5;&#x51B5;&#x4E0B;&#x4F1A;&#x6709;&#x7EBF;&#x7A0B;&#x5B89;&#x5168;&#x95EE;&#x9898;
 * @author: dll
 * @date: Created in 2022/11/8 8:32
 * @version:
 * @modified By:
 */
@Slf4j
public class ZkLockTest02 {

    private int i = 10;

    @Test
    public void test() throws InterruptedException {

        for (int n = 0; n < 10; n++) {
            new Thread(new Runnable() {
                @SneakyThrows
                @Override
                public void run() {
                    Thread.sleep(100);
                    incre();
                }
            }).start();
        }
        Thread.sleep(5000);
        log.info("i = {}",i);
    }

    /**
     * i&#x9012;&#x51CF; &#x7EBF;&#x7A0B;&#x4E0D;&#x5B89;&#x5168;
     */
    public void incre(){
//        i.incrementAndGet();
        log.info("&#x5F53;&#x524D;&#x7EBF;&#x7A0B;&#xFF1A;{},i = {}",Thread.currentThread().getName(),i--);
    }
}
  • 上面代码运行结果如下:
    zk系列三:zookeeper实战之分布式锁实现

使用上面封装的 ZkLockHelper 实现的分布式锁

package com.darling.service.zookeeper.lock;

import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.zookeeper.ZooKeeper;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

/**
 * @description: &#x4F7F;&#x7528;zk&#x5B9E;&#x73B0;&#x7684;&#x5206;&#x5E03;&#x5F0F;&#x9501;&#x89E3;&#x51B3;&#x7EBF;&#x7A0B;&#x5B89;&#x5168;&#x95EE;&#x9898;
 * @author: dll
 * @date: Created in 2022/11/8 8:32
 * @version:
 * @modified By:
 */
@Slf4j
public class ZkLockTest03 {

    ZooKeeper zkClient;

    @Before
    public void conn (){
        zkClient  = ZkUtil.getZkClient();
    }

    @After
    public void close (){
        try {
            zkClient.close();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    private int i = 10;

    @Test
    public void test() throws InterruptedException {

        for (int n = 0; n < 10; n++) {
            new Thread(new Runnable() {
                @SneakyThrows
                @Override
                public void run() {
                    Thread.sleep(100);
                    ZkLockHelper zkHelper = new ZkLockHelper();
                    // &#x8FD9;&#x91CC;&#x7ED9;zkHelper&#x8BBE;&#x7F6E;threadName&#x662F;&#x4E3A;&#x4E86;&#x540E;&#x7EED;&#x8C03;&#x8BD5;&#x7684;&#x65F6;&#x5019;&#x65E5;&#x5FD7;&#x6253;&#x5370;&#xFF0C;&#x4FBF;&#x4E8E;&#x89C2;&#x5BDF;&#x5B58;&#x5728;&#x7684;&#x95EE;&#x9898;
                    String threadName = Thread.currentThread().getName();
                    zkHelper.setThreadName(threadName);
                    zkHelper.setZkClient(zkClient);
                    // tryLock&#x4E0A;&#x9501;
                    zkHelper.tryLock();
                    incre();
                    log.info("&#x7EBF;&#x7A0B;{}&#x6B63;&#x5728;&#x6267;&#x884C;&#x4E1A;&#x52A1;&#x4EE3;&#x7801;...",threadName);
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    // &#x91CA;&#x653E;&#x9501;
                    zkHelper.unLock();
                }
            }).start();
        }
        while (true) {
        }
    }

    /**
     * i&#x9012;&#x51CF; &#x7EBF;&#x7A0B;&#x4E0D;&#x5B89;&#x5168;
     */
    public void incre(){
//        i.incrementAndGet();
        log.info("&#x2606;&#x2606;&#x2606;&#x2606;&#x2606;&#x2606;&#x2606;&#x2606;&#x2606;&#x2606;&#x2606;&#x2606;&#x2606;&#x2606;&#x2606;&#x2606;&#x2606;&#x2606;&#x2606;&#x2606;&#x2606;&#x2606;&#x2606;&#x2606;&#x2606;&#x2606;&#x2606;&#x2606;&#x2606;&#x2606;&#x2606;&#x2606;&#x2606;&#x2606;&#x2606;&#x2606;&#x2606;&#x2606;&#x2606;&#x2606;&#x2606;&#x2606;&#x2606;&#x5F53;&#x524D;&#x7EBF;&#x7A0B;&#xFF1A;{},i = {}",Thread.currentThread().getName(),i--);
    }
}
  • 运行结果如下:
    zk系列三:zookeeper实战之分布式锁实现

由于日志中掺杂着zk的日志所有此处并未截全,但是也能看到i是在按规律递减的,不会出现通过线程拿到相同值的情况

四、zk实现分布式锁的优缺点

优点

  • 集群部署不存在单点故障问题
  • 统一视图
    zk集群每个节点对外提供的数据是一致的,数据一致性有所报障
  • 临时有序节点
    zk提供临时有序节点,这样当客户端失去连接时会自动释放锁,不用像其他方案一样当拿到锁的实例服务不可用时,需要定时任务去删除锁;临时节点的特性就是当客户端失去连接会自动删除
  • watch能力加持
    当获取不到锁时,无需客户端定期轮询争抢,只需watch前一节点即可,当有变化时会及时通知,比普通方案即及时又高效;注意这里最好只watch前一节点,如果watch整个父目录的话,当客户端并发较大时会不断有请求进出zk,给zk性能带来压力

缺点

  • 与单机版redis比较的话性能肯定较差,但是当客户端集群足够庞大且业务量足够多时肯定还是集群更加稳定
  • 极端情况下还是会出现多个线程抢到同一把锁的问题;假设某个线程拿到锁后还没执行业务代码就进入长时间的垃圾收集STW了,此时与zk的连接也会消失;然后此时别的线程的watch会被触发从而抢到锁去执行了,但是当stw的线程恢复过来时继续执行自身的业务代码,此时就会出现不一致的问题了;当然,个人认为这种设想太过极端了,毕竟如果stw时间过长肯定会影响整个集群的性能的,所以我感觉可以不必考虑,真的要解决那么再加上mysql乐观锁吧;

好了,zk实现分布式锁的编码实现就到这了,后续有时间再写偏redis的,其实思路缕清了,编码实现还是相对简单的

Original: https://www.cnblogs.com/darling2047/p/16870751.html
Author: 木木他爹
Title: zk系列三:zookeeper实战之分布式锁实现

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/798951/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球