深入理解并发编程同步工具类

大家好,我是陶朱公Boy。

今天跟大家分享一个并发编程领域中的一个知识点—— 同步工具类

我将结合一个真实线上案例作为背景来展开讲解这一知识点。给大家讲清楚什么是同步工具类、适合的场景、解决了什么问题、各个实现方案的对比。希望对大家理解同步工具类这个知识点有所帮助。

我们先看一个案例:

需求描述

图一:逻辑架构图

有一个线上”人脸识别”的应用,应用 首次启动要求多线程并行将存储在DB中的人脸数据(512位的double类型数组)载入到本地应用缓存中,主线程需要等待所有子线程完成任务后,才能继续执行余下的业务逻辑(比如加载dubbo组件)。

拿到这个需求,大家不妨先思考一下,如果让你来实现,你打算怎么做?思考点是什么?

需求分析

让我们一起来分析一下这个需求:

首先这个需求是应用首次启动,需要用多线程并行执行任务的,充分利用CPU的多核机制,加快整体任务的处理速度。

其次大家先可以看下上述图一,多线程并行执行下,主线程需要等待所有子线程完成任务后才能继续执行余下的业务逻辑。

要实现这个需求,我们就要思考一下看有没有一种机制能让主线程等待其他子线程完成任务后,它再继续执行它余下的业务逻辑?

方案实现

★方案一:Thread.join()

什么是join?

join方法是Thread类内部的一个方法,是一种一个线程等待另一个或多个线程完成任务的机制。

基本语义:

如果一个线程A调用了thread.join()方法,那么当前线程A需要等待thread线程完成任务后,才能从thread.join()阻塞处返回。

示例代码:

 public class JoinDemo {
 ​
   public static void main(String[] args) throws InterruptedException {
 ​
     Thread thread0=new Thread(()->{
       System.out.println(Thread.currentThread().getName()+"==start");
       try {
         Thread.sleep((long) (Math.random() * 10000));
       } catch (InterruptedException e) {
         e.printStackTrace();
       }
       System.out.println(Thread.currentThread().getName()+"==end");
 ​
     });
 ​
     Thread thread1=new Thread(()->{
       System.out.println(Thread.currentThread().getName()+"==start");
       try {
         Thread.sleep((long) (Math.random() * 10000));
       } catch (InterruptedException e) {
         e.printStackTrace();
       }
       System.out.println(Thread.currentThread().getName()+"==end");
 ​
     });
     thread0.start();
     thread1.start();
     thread1.join();
     System.out.println("main 1...");
     thread0.join();
     System.out.println("main 0...");
 ​
     System.out.println("====all finish===");
 ​
 ​
   }
 }

结果打印:

原理:

源码解析:

从源码细节来看(为了方便陈述,我们假设有一个线程A调用thread.join()),我们说线程A持有了thread对象的一把锁,while循环判断thread线程是否存活,如果返回false,表示thread线程任务尚未结束,那么线程A就会被挂起,释放锁,线程状态进入等待状态,等待被唤醒。

而唤醒的更多细节是在thread线程退出时,底层调用exit方法,详见hotspot关于thread.cpp文件中JavaThread::exit部分。如下(倒数第二行):

 void JavaThread::exit(bool destroy_vm, ExitType exit_type) {
   assert(this == JavaThread::current(), "thread consistency check");
   ...

   // Notify waiters on thread object. This has to be done after exit() is called
   // on the thread (if the thread is the last thread in a daemon ThreadGroup the
   // group should have the destroyed bit set before waiters are notified).

   ensure_join(this);
   assert(!this->has_pending_exception(), "ensure_join should have cleared");
   ...

 ​
 ​
 static void ensure_join(JavaThread* thread) {
   // We do not need to grap the Threads_lock, since we are operating on ourself.

   Handle threadObj(thread, thread->threadObj());
   assert(threadObj.not_null(), "java thread object must exist");
   ObjectLocker lock(threadObj, thread);
   // Ignore pending exception (ThreadDeath), since we are exiting anyway
   thread->clear_pending_exception();
   // Thread is exiting. So set thread_status field in java.lang.Thread class to TERMINATED.

   java_lang_Thread::set_thread_status(threadObj(), java_lang_Thread::TERMINATED);
   //这里是清除native线程,这个操作会导致isAlive()方法返回false
   java_lang_Thread::set_thread(threadObj(), NULL);
   //唤醒等待在thread对象上的所有线程  lock.notify_all(thread);  // Ignore pending exception (ThreadDeath), since we are exiting anyway
   thread->clear_pending_exception();
 }

方案二:闭锁(CountDownLatch)

什么是闭锁?

闭锁是一种同步工具类,可以延迟线程进度直到其达到终止状态。

闭锁的作用相当于一扇门:在闭锁到达结束状态之前,这扇门一直是关闭的,并且没有任何线程能通过,直到到达结束状态时,这扇门将会永久打开。
闭锁用来确保某些任务直到其他任务都完成后才继续执行。

基本语义:

countDownLatch的构造函数接收一个int类型的参数作为计数器,比如你传入了参数N,那意思就是需要等待N个点完成。当我们调用countDown方法时,这个计数器就会减1,await方法会一直阻塞主线程,直到N变0为止。

原理:

适用场景:

像应用程序首次启动,主线程需要等待其他子线程完成任务后,才能做余下事情,并且是一次性的。 像作者文章开始处提的这个需求,其实比较适合用CountDownLatch这个方案,主线程必须等到子线程的任务完成,才能进一步加载其他组件,比如dubbo。

示例代码:

 public class CountDownLatchDemo {
     public static void main(String[] args) {
         ExecutorService service = Executors.newFixedThreadPool(3);
         final CountDownLatch latch = new CountDownLatch(3);
         for (int i = 0; i < 3; i++) {
             Runnable runnable = new Runnable() {
                 @Override
                 public void run() {
                     try {
                         System.out.println("&#x5B50;&#x7EBF;&#x7A0B;" + Thread.currentThread().getName() + "&#x5F00;&#x59CB;&#x6267;&#x884C;");
                         //&#x7761;&#x7720;&#x4E2A;&#x51E0;&#x5341;&#x6BEB;&#x79D2;
                         Thread.sleep((long) (Math.random() * 10000));
                         System.out.println("&#x5B50;&#x7EBF;&#x7A0B;" + Thread.currentThread().getName() + "&#x6267;&#x884C;&#x5B8C;&#x6210;");
                         latch.countDown();//&#x5F53;&#x524D;&#x7EBF;&#x7A0B;&#x8C03;&#x7528;&#x6B64;&#x65B9;&#x6CD5;&#xFF0C;&#x5219;&#x8BA1;&#x6570;&#x51CF;&#x4E00;
                     } catch (InterruptedException e) {
                         e.printStackTrace();
                     }
                 }
             };
             service.execute(runnable);
         }
         try {
             System.out.println("&#x4E3B;&#x7EBF;&#x7A0B;" + Thread.currentThread().getName() + "&#x7B49;&#x5F85;&#x5B50;&#x7EBF;&#x7A0B;&#x6267;&#x884C;&#x5B8C;&#x6210;...");
             latch.await(5,TimeUnit.SECONDS);//&#x963B;&#x585E;&#x5F53;&#x524D;&#x7EBF;&#x7A0B;&#xFF0C;&#x76F4;&#x5230;&#x8BA1;&#x6570;&#x5668;&#x7684;&#x503C;&#x4E3A;0
             System.out.println("&#x963B;&#x585E;&#x5B8C;&#x6BD5;&#xFF01;&#x4E3B;&#x7EBF;&#x7A0B;" + Thread.currentThread().getName() + "&#x7EE7;&#x7EED;&#x6267;&#x884C;&#x4E1A;&#x52A1;&#x903B;&#x8F91;...");
             service.shutdownNow();
         } catch (InterruptedException e) {
             e.printStackTrace();
         }
 &#x200B;
     }
 }
 &#x7ED3;&#x679C;&#x6253;&#x5370;:&#x5B50;&#x7EBF;&#x7A0B;pool-1-thread-1&#x5F00;&#x59CB;&#x6267;&#x884C;
 &#x5B50;&#x7EBF;&#x7A0B;pool-1-thread-2&#x5F00;&#x59CB;&#x6267;&#x884C;
 &#x5B50;&#x7EBF;&#x7A0B;pool-1-thread-3&#x5F00;&#x59CB;&#x6267;&#x884C;
 &#x4E3B;&#x7EBF;&#x7A0B;main&#x7B49;&#x5F85;&#x5B50;&#x7EBF;&#x7A0B;&#x6267;&#x884C;&#x5B8C;&#x6210;...

 &#x5B50;&#x7EBF;&#x7A0B;pool-1-thread-2&#x6267;&#x884C;&#x5B8C;&#x6210;
 &#x5B50;&#x7EBF;&#x7A0B;pool-1-thread-1&#x6267;&#x884C;&#x5B8C;&#x6210;
 &#x5B50;&#x7EBF;&#x7A0B;pool-1-thread-3&#x6267;&#x884C;&#x5B8C;&#x6210;
 &#x963B;&#x585E;&#x5B8C;&#x6BD5;&#xFF01;&#x4E3B;&#x7EBF;&#x7A0B;main&#x7EE7;&#x7EED;&#x6267;&#x884C;&#x4E1A;&#x52A1;&#x903B;&#x8F91;...

源码解析:

 /**
  * &#x9759;&#x6001;&#x5185;&#x90E8;&#x7C7B;&#xFF0C;&#x81EA;&#x5B9A;&#x4E49;&#x540C;&#x6B65;&#x5668;&#x7EC4;&#x4EF6;
  */
 private final Sync sync;

 /**
  * &#x53EA;&#x6709;&#x4E00;&#x4E2A;&#x6784;&#x9020;&#x65B9;&#x6CD5;&#xFF0C;&#x63A5;&#x6536;&#x4E00;&#x4E2A;count&#x503C;
  */
 public CountDownLatch(int count) {
     // count&#x503C;&#x4E0D;&#x80FD;&#x5C0F;&#x4E8E;0
     if (count < 0) throw new IllegalArgumentException("count < 0");
     // &#x81EA;&#x5B9A;&#x4E49;&#x4E00;&#x4E2A;&#x540C;&#x6B65;&#x7EC4;&#x4EF6;&#xFF1B;&#x901A;&#x8FC7;&#x7EE7;&#x627F;AQS&#x7EC4;&#x4EF6;&#x5B9E;&#x73B0;&#xFF1B;
     this.sync = new Sync(count);
 }
 private static final class Sync extends AbstractQueuedSynchronizer {
     private static final long serialVersionUID = 4982264981922014374L;

     Sync(int count) {
         // &#x4F7F;&#x7528;&#x6784;&#x9020;&#x51FD;&#x4F20;&#x9012;&#x7684;&#x53C2;&#x6570;&#x503C;count&#x4F5C;&#x4E3A;&#x540C;&#x6B65;&#x72B6;&#x6001;&#x503C;&#x3002;
         setState(count);
     }

     /** &#x83B7;&#x53D6;&#x5F53;&#x524D;&#x7684;count&#x503C; */
     int getCount() {
         return getState();
     }

     /**&#x5171;&#x4EAB;&#x5F0F;&#x83B7;&#x53D6;&#x540C;&#x6B65;&#x72B6;&#x6001;
      * &#x8FD9;&#x662F;AQS&#x7684;&#x6A21;&#x677F;&#x65B9;&#x6CD5;acquireShared&#x3001;acquireSharedInterruptibly&#x7B49;&#x65B9;&#x6CD5;&#x5185;&#x90E8;&#x5C06;&#x4F1A;&#x8C03;&#x7528;&#x7684;&#x65B9;&#x6CD5;&#xFF0C;
      * &#x7531;&#x5B50;&#x7C7B;&#x5B9E;&#x73B0;&#xFF0C;&#x8FD9;&#x4E2A;&#x65B9;&#x6CD5;&#x7684;&#x4F5C;&#x7528;&#x662F;&#x5C1D;&#x8BD5;&#x83B7;&#x53D6;&#x4E00;&#x6B21;&#x5171;&#x4EAB;&#x9501;&#xFF0C;&#x5BF9;&#x4E8E;AQS&#x6765;&#x8BF4;&#xFF0C;
      * &#x6B64;&#x65B9;&#x6CD5;&#x8FD4;&#x56DE;&#x503C;&#x5927;&#x4E8E;&#x7B49;&#x4E8E;0&#xFF0C;&#x8868;&#x793A;&#x83B7;&#x53D6;&#x5171;&#x4EAB;&#x9501;&#x6210;&#x529F;&#xFF0C;&#x53CD;&#x4E4B;&#x5219;&#x83B7;&#x53D6;&#x5171;&#x4EAB;&#x9501;&#x5931;&#x8D25;&#xFF0C;
      * &#x800C;&#x5728;&#x8FD9;&#x91CC;&#xFF0C;&#x5B9E;&#x9645;&#x4E0A;&#x5C31;&#x662F;&#x5224;&#x65AD;count&#x662F;&#x5426;&#x7B49;&#x4E8E;0&#xFF0C;&#x7EBF;&#x7A0B;&#x80FD;&#x5426;&#x5411;&#x4E0B;&#x8FD0;&#x884C;
      */
     protected int tryAcquireShared(int acquires) {
         // &#x6B64;&#x5904;&#x5224;&#x65AD;state&#x7684;&#x503C;&#x662F;&#x5426;&#x4E3A;0&#xFF0C;&#x4E5F;&#x5C31;&#x662F;&#x5224;&#x65AD;count&#x662F;&#x5426;&#x4E3A;0&#xFF0C;
         // &#x82E5;count&#x4E3A;0&#xFF0C;&#x8FD4;&#x56DE;1&#xFF0C;&#x8868;&#x793A;&#x83B7;&#x53D6;&#x9501;&#x6210;&#x529F;&#xFF0C;&#x6B64;&#x65F6;&#x7EBF;&#x7A0B;&#x5C06;&#x4E0D;&#x4F1A;&#x963B;&#x585E;&#xFF0C;&#x6B63;&#x5E38;&#x8FD0;&#x884C;
         // &#x82E5;count&#x4E0D;&#x4E3A;0&#xFF0C;&#x5219;&#x8FD4;&#x56DE;-1&#xFF0C;&#x8868;&#x793A;&#x83B7;&#x53D6;&#x9501;&#x5931;&#x8D25;&#xFF0C;&#x7EBF;&#x7A0B;&#x5C06;&#x4F1A;&#x88AB;&#x963B;&#x585E;
         // &#x4ECE;&#x8FD9;&#x91CC;&#x6211;&#x4EEC;&#x5DF2;&#x7ECF;&#x53EF;&#x4EE5;&#x770B;&#x51FA;CountDownLatch&#x7684;&#x5B9E;&#x73B0;&#x65B9;&#x5F0F;&#x4E86;
         return (getState() == 0) ? 1 : -1;
     }

     /**&#x5171;&#x4EAB;&#x5F0F;&#x91CA;&#x653E;&#x540C;&#x6B65;&#x72B6;&#x6001;
      * &#x6B64;&#x65B9;&#x6CD5;&#x7684;&#x4F5C;&#x7528;&#x662F;&#x7528;&#x6765;&#x91CA;&#x653E;AQS&#x7684;&#x5171;&#x4EAB;&#x9501;&#xFF0C;&#x8FD4;&#x56DE;true&#x8868;&#x793A;&#x91CA;&#x653E;&#x6210;&#x529F;&#xFF0C;&#x53CD;&#x4E4B;&#x5219;&#x5931;&#x8D25;
      * &#x6B64;&#x65B9;&#x6CD5;&#x5C06;&#x4F1A;&#x5728;AQS&#x7684;&#x6A21;&#x677F;&#x65B9;&#x6CD5;releaseShared&#x4E2D;&#x88AB;&#x8C03;&#x7528;&#xFF0C;
      * &#x5728;CountDownLatch&#x4E2D;&#xFF0C;&#x8FD9;&#x4E2A;&#x65B9;&#x6CD5;&#x7528;&#x6765;&#x51CF;&#x5C0F;count&#x503C;
      */
     protected boolean tryReleaseShared(int releases) {
         // &#x4F7F;&#x7528;&#x6B7B;&#x5FAA;&#x73AF;&#x4E0D;&#x65AD;&#x5C1D;&#x8BD5;&#x91CA;&#x653E;&#x9501;
         for (;;) {
             // &#x9996;&#x5148;&#x83B7;&#x53D6;&#x5F53;&#x524D;state&#x7684;&#x503C;&#xFF0C;&#x4E5F;&#x5C31;&#x662F;count&#x503C;
             int c = getState();
             /**&#x82E5;count&#x503C;&#x5DF2;&#x7ECF;&#x7B49;&#x4E8E;0&#xFF0C;&#x5219;&#x4E0D;&#x80FD;&#x7EE7;&#x7EED;&#x51CF;&#x5C0F;&#x4E86;&#xFF0C;&#x4E8E;&#x662F;&#x76F4;&#x63A5;&#x8FD4;&#x56DE;false
             /* &#x4E3A;&#x4EC0;&#x4E48;&#x8FD4;&#x56DE;&#x7684;&#x662F;false&#xFF0C;&#x56E0;&#x4E3A;&#x7B49;&#x4E8E;0&#x8868;&#x793A;&#x4E4B;&#x524D;&#x7B49;&#x5F85;&#x7684;&#x90A3;&#x4E9B;&#x7EBF;&#x7A0B;&#x5DF2;&#x7ECF;&#x88AB;&#x5524;&#x9192;&#x4E86;&#xFF0C;            *&#x82E5;&#x8FD4;&#x56DE;true&#xFF0C;AQS&#x4F1A;&#x5C1D;&#x8BD5;&#x5524;&#x9192;&#x7EBF;&#x7A0B;&#xFF0C;&#x82E5;&#x8FD4;&#x56DE;false&#xFF0C;&#x5219;&#x76F4;&#x63A5;&#x7ED3;&#x675F;&#xFF0C;&#x6240;&#x4EE5;
             * &#x5728;&#x6CA1;&#x6709;&#x7EBF;&#x7A0B;&#x7B49;&#x5F85;&#x7684;&#x60C5;&#x51B5;&#x4E0B;&#xFF0C;&#x8FD4;&#x56DE;false&#x76F4;&#x63A5;&#x7ED3;&#x675F;&#x662F;&#x6B63;&#x786E;&#x7684;            */
             if (c == 0)
                 return false;
             // &#x82E5;count&#x4E0D;&#x7B49;&#x4E8E;0&#xFF0C;&#x5219;&#x5C06;&#x5176;-1
             int nextc = c-1;
             // compareAndSetState&#x7684;&#x4F5C;&#x7528;&#x662F;&#x5C06;count&#x503C;&#x4ECE;c&#xFF0C;&#x4FEE;&#x6539;&#x4E3A;&#x65B0;&#x7684;nextc
             // &#x6B64;&#x65B9;&#x6CD5;&#x57FA;&#x4E8E;CAS&#x5B9E;&#x73B0;&#xFF0C;&#x4FDD;&#x8BC1;&#x4E86;&#x64CD;&#x4F5C;&#x7684;&#x539F;&#x5B50;&#x6027;
             if (compareAndSetState(c, nextc))
                 // &#x82E5;nextc == 0&#xFF0C;&#x5219;&#x8FD4;&#x56DE;&#x7684;&#x662F;true&#xFF0C;&#x8868;&#x793A;&#x5DF2;&#x7ECF;&#x6CA1;&#x6709;&#x9501;&#x4E86;&#xFF0C;&#x7EBF;&#x7A0B;&#x53EF;&#x4EE5;&#x8FD0;&#x884C;&#x4E86;&#xFF0C;
                 // &#x82E5;nextc > 0&#xFF0C;&#x5219;&#x8868;&#x793A;&#x7EBF;&#x7A0B;&#x8FD8;&#x9700;&#x8981;&#x7EE7;&#x7EED;&#x963B;&#x585E;&#xFF0C;&#x6B64;&#x5904;&#x5C06;&#x8FD4;&#x56DE;false
                 return nextc == 0;
         }
     }
 &#x200B;
 }

我们看下示例代码中关于latch.countDown()方法源码部分:

 /**
  * &#x6B64;&#x65B9;&#x6CD5;&#x7684;&#x4F5C;&#x7528;&#x5C31;&#x662F;&#x5C06;count&#x7684;&#x503C;-1&#xFF0C;&#x5982;&#x679C;count&#x7B49;&#x4E8E;0&#x4E86;&#xFF0C;&#x5C31;&#x5524;&#x9192;&#x7B49;&#x5F85;&#x7684;&#x7EBF;&#x7A0B;
  */
 public void countDown() {
     // &#x8FD9;&#x91CC;&#x76F4;&#x63A5;&#x8C03;&#x7528;sync&#x7684;releaseShared&#x65B9;&#x6CD5;&#xFF0C;&#x8FD9;&#x4E2A;&#x65B9;&#x6CD5;&#x7684;&#x5B9E;&#x73B0;&#x5728;AQS&#x4E2D;&#xFF0C;&#x4E5F;&#x662F;AQS&#x63D0;&#x4F9B;&#x7684;&#x6A21;&#x677F;&#x65B9;&#x6CD5;&#xFF0C;
     // &#x8FD9;&#x4E2A;&#x65B9;&#x6CD5;&#x7684;&#x4F5C;&#x7528;&#x662F;&#x5F53;&#x524D;&#x7EBF;&#x7A0B;&#x91CA;&#x653E;&#x9501;&#xFF0C;&#x82E5;&#x91CA;&#x653E;&#x5931;&#x8D25;&#xFF0C;&#x8FD4;&#x56DE;false&#xFF0C;&#x82E5;&#x91CA;&#x653E;&#x6210;&#x529F;&#xFF0C;&#x5219;&#x8FD4;&#x56DE;false&#xFF0C;
     // &#x82E5;&#x9501;&#x88AB;&#x91CA;&#x653E;&#x6210;&#x529F;&#xFF0C;&#x5219;&#x5F53;&#x524D;&#x7EBF;&#x7A0B;&#x4F1A;&#x5524;&#x9192;AQS&#x540C;&#x6B65;&#x961F;&#x5217;&#x4E2D;&#x7B2C;&#x4E00;&#x4E2A;&#x88AB;&#x963B;&#x585E;&#x7684;&#x7EBF;&#x7A0B;&#xFF0C;&#x8BA9;&#x4ED6;&#x5C1D;&#x8BD5;&#x83B7;&#x53D6;&#x9501;
     // &#x5BF9;&#x4E8E;CountDownLatch&#x6765;&#x8BF4;&#xFF0C;&#x91CA;&#x653E;&#x9501;&#x5B9E;&#x9645;&#x4E0A;&#x5C31;&#x662F;&#x8BA9;count - 1&#xFF0C;&#x53EA;&#x6709;&#x5F53;count&#x88AB;&#x51CF;&#x5C0F;&#x4E3A;0&#xFF0C;
     // &#x9501;&#x624D;&#x662F;&#x771F;&#x6B63;&#x88AB;&#x91CA;&#x653E;&#xFF0C;&#x7EBF;&#x7A0B;&#x624D;&#x80FD;&#x7EE7;&#x7EED;&#x5411;&#x4E0B;&#x8FD0;&#x884C;
     sync.releaseShared(1);
 }
 /**
 * &#x5171;&#x4EAB;&#x5F0F;&#x7684;&#x91CA;&#x653E;&#x540C;&#x6B65;&#x72B6;&#x6001;
 */
 public final boolean releaseShared(int arg) {
     // &#x8C03;&#x7528;tryReleaseShared&#x5C1D;&#x8BD5;&#x91CA;&#x653E;&#x9501;&#xFF0C;&#x8FD9;&#x4E2A;&#x65B9;&#x6CD5;&#x5DF2;&#x7ECF;&#x7531;Sycn&#x91CD;&#x5199;&#xFF0C;&#x8BF7;&#x56DE;&#x987E;&#x4E0A;&#x9762;&#x5BF9;&#x6B64;&#x65B9;&#x6CD5;&#x7684;&#x5206;&#x6790;
     // &#x82E5;tryReleaseShared&#x8FD4;&#x56DE;true&#xFF0C;&#x8868;&#x793A;count&#x7ECF;&#x8FC7;&#x8FD9;&#x6B21;&#x91CA;&#x653E;&#x540E;&#xFF0C;&#x7B49;&#x4E8E;0&#x4E86;&#xFF0C;&#x4E8E;&#x662F;&#x6267;&#x884C;doReleaseShared
     if (tryReleaseShared(arg)) {
         // &#x8FD9;&#x4E2A;&#x65B9;&#x6CD5;&#x7684;&#x4F5C;&#x7528;&#x662F;&#x5524;&#x9192;AQS&#x7684;&#x540C;&#x6B65;&#x961F;&#x5217;&#x4E2D;&#xFF0C;&#x6B63;&#x5728;&#x7B49;&#x5F85;&#x7684;&#x7B2C;&#x4E00;&#x4E2A;&#x7EBF;&#x7A0B;
         // &#x800C;&#x6211;&#x4EEC;&#x5206;&#x6790;acquireSharedInterruptibly&#x65B9;&#x6CD5;&#x65F6;&#x5DF2;&#x7ECF;&#x8BF4;&#x8FC7;&#xFF0C;
         // &#x82E5;&#x4E00;&#x4E2A;&#x7EBF;&#x7A0B;&#x88AB;&#x5524;&#x9192;&#xFF0C;&#x68C0;&#x6D4B;&#x5230;count == 0&#xFF0C;&#x4F1A;&#x7EE7;&#x7EED;&#x5524;&#x9192;&#x4E0B;&#x4E00;&#x4E2A;&#x7B49;&#x5F85;&#x7684;&#x7EBF;&#x7A0B;
         // &#x4E5F;&#x5C31;&#x662F;&#x8BF4;&#xFF0C;&#x8FD9;&#x4E2A;&#x65B9;&#x6CD5;&#x7684;&#x4F5C;&#x7528;&#x662F;&#xFF0C;&#x5728;count == 0&#x65F6;&#xFF0C;&#x5524;&#x9192;&#x6240;&#x6709;&#x7B49;&#x5F85;&#x7684;&#x7EBF;&#x7A0B;
         doReleaseShared();
         return true;
     }
     return false;
 }

接下来我们看下另一个比较重要的方法即await方法部分源码:

 // &#x6B64;&#x65B9;&#x6CD5;&#x7528;&#x6765;&#x8BA9;&#x5F53;&#x524D;&#x7EBF;&#x7A0B;&#x963B;&#x585E;&#xFF0C;&#x76F4;&#x5230;count&#x51CF;&#x5C0F;&#x4E3A;0&#x624D;&#x6062;&#x590D;&#x6267;&#x884C;
 public void await() throws InterruptedException {
     // &#x8FD9;&#x91CC;&#x76F4;&#x63A5;&#x8C03;&#x7528;sync&#x7684;acquireSharedInterruptibly&#x65B9;&#x6CD5;&#xFF0C;&#x8FD9;&#x4E2A;&#x65B9;&#x6CD5;&#x5B9A;&#x4E49;&#x5728;AQS&#x4E2D;
     // &#x65B9;&#x6CD5;&#x7684;&#x4F5C;&#x7528;&#x662F;&#x5C1D;&#x8BD5;&#x83B7;&#x53D6;&#x5171;&#x4EAB;&#x9501;&#xFF0C;&#x82E5;&#x83B7;&#x53D6;&#x5931;&#x8D25;&#xFF0C;&#x5219;&#x7EBF;&#x7A0B;&#x5C06;&#x4F1A;&#x88AB;&#x52A0;&#x5165;&#x5230;AQS&#x7684;&#x540C;&#x6B65;&#x961F;&#x5217;&#x4E2D;&#x7B49;&#x5F85;
     // &#x76F4;&#x5230;&#x83B7;&#x53D6;&#x6210;&#x529F;&#x4E3A;&#x6B62;&#x3002;&#x4E14;&#x8FD9;&#x4E2A;&#x65B9;&#x6CD5;&#x662F;&#x4F1A;&#x54CD;&#x5E94;&#x4E2D;&#x65AD;&#x7684;&#xFF0C;&#x7EBF;&#x7A0B;&#x5728;&#x963B;&#x585E;&#x7684;&#x8FC7;&#x7A0B;&#x4E2D;&#xFF0C;&#x82E5;&#x88AB;&#x5176;&#x4ED6;&#x7EBF;&#x7A0B;&#x4E2D;&#x65AD;&#xFF0C;
     // &#x5219;&#x6B64;&#x65B9;&#x6CD5;&#x4F1A;&#x901A;&#x8FC7;&#x629B;&#x51FA;&#x5F02;&#x5E38;&#x7684;&#x65B9;&#x5F0F;&#x7ED3;&#x675F;&#x7B49;&#x5F85;&#x3002;
     sync.acquireSharedInterruptibly(1);
 }
 &#x200B;
 /**
 *&#x6B64;&#x65B9;&#x6CD5;&#x662F;AQS&#x4E2D;&#x63D0;&#x4F9B;&#x7684;&#x4E00;&#x4E2A;&#x6A21;&#x677F;&#x65B9;&#x6CD5;&#xFF0C;&#x7528;&#x4EE5;&#x83B7;&#x53D6;&#x5171;&#x4EAB;&#x9501;&#xFF0C;&#x5E76;&#x4E14;&#x4F1A;&#x54CD;&#x5E94;&#x4E2D;&#x65AD; */
 public final void acquireSharedInterruptibly(int arg)
     throws InterruptedException {
     // &#x9996;&#x5148;&#x5224;&#x65AD;&#x5F53;&#x524D;&#x7EBF;&#x7A0B;&#x91CA;&#x653E;&#x88AB;&#x4E2D;&#x65AD;&#xFF0C;&#x82E5;&#x88AB;&#x4E2D;&#x65AD;&#xFF0C;&#x5219;&#x76F4;&#x63A5;&#x629B;&#x51FA;&#x5F02;&#x5E38;&#x7ED3;&#x675F;
     if (Thread.interrupted())
         throw new InterruptedException();

     // &#x8C03;&#x7528;tryAcquireShared&#x65B9;&#x6CD5;&#x5C1D;&#x8BD5;&#x83B7;&#x53D6;&#x9501;&#xFF0C;&#x8FD9;&#x4E2A;&#x65B9;&#x6CD5;&#x88AB;Sycn&#x7C7B;&#x91CD;&#x5199;&#x4E86;&#xFF0C;
     // &#x82E5;count == 0&#xFF0C;&#x5219;&#x8FD9;&#x4E2A;&#x65B9;&#x6CD5;&#x4F1A;&#x8FD4;&#x56DE;1&#xFF0C;&#x8868;&#x793A;&#x83B7;&#x53D6;&#x9501;&#x6210;&#x529F;&#xFF0C;&#x5219;&#x8FD9;&#x91CC;&#x4F1A;&#x76F4;&#x63A5;&#x8FD4;&#x56DE;&#xFF0C;&#x7EBF;&#x7A0B;&#x4E0D;&#x4F1A;&#x88AB;&#x963B;&#x585E;&#xFF1B;&#x5426;&#x5219;&#x8FD4;&#x56DE;-1
     // &#x82E5;count < 0&#xFF0C;&#x5C06;&#x4F1A;&#x6267;&#x884C;&#x4E0B;&#x9762;&#x7684;doAcquireSharedInterruptibly&#x65B9;&#x6CD5;&#xFF0C;
     // &#x6B64;&#x5904;&#x8BF7;&#x53BB;&#x67E5;&#x770B;Sync&#x4E2D;tryAcquireShared&#x65B9;&#x6CD5;&#x7684;&#x5B9E;&#x73B0;
     if (tryAcquireShared(arg) < 0)
         // &#x4E0B;&#x9762;&#x8FD9;&#x4E2A;&#x65B9;&#x6CD5;&#x7684;&#x4F5C;&#x7528;&#x662F;&#xFF0C;&#x7EBF;&#x7A0B;&#x83B7;&#x53D6;&#x9501;&#x5931;&#x8D25;&#xFF0C;&#x5C06;&#x4F1A;&#x52A0;&#x5165;&#x5230;AQS&#x7684;&#x540C;&#x6B65;&#x961F;&#x5217;&#x4E2D;&#x963B;&#x585E;&#x7B49;&#x5F85;&#xFF0C;
         // &#x76F4;&#x5230;&#x6210;&#x529F;&#x83B7;&#x53D6;&#x5230;&#x9501;&#xFF0C;&#x800C;&#x6B64;&#x5904;&#x6210;&#x529F;&#x83B7;&#x53D6;&#x5230;&#x9501;&#x7684;&#x6761;&#x4EF6;&#x5C31;&#x662F;count == 0&#xFF0C;&#x82E5;&#x5F53;&#x524D;&#x7EBF;&#x7A0B;&#x5728;&#x7B49;&#x5F85;&#x7684;&#x8FC7;&#x7A0B;&#x4E2D;&#xFF0C;
         // &#x6210;&#x529F;&#x5730;&#x83B7;&#x53D6;&#x4E86;&#x9501;&#xFF0C;&#x5219;&#x5B83;&#x4F1A;&#x7EE7;&#x7EED;&#x5524;&#x9192;&#x5728;&#x5B83;&#x540E;&#x9762;&#x7B49;&#x5F85;&#x7684;&#x7EBF;&#x7A0B;&#xFF0C;&#x4E5F;&#x5C1D;&#x8BD5;&#x83B7;&#x53D6;&#x9501;&#xFF0C;
         // &#x8FD9;&#x4E5F;&#x5C31;&#x662F;&#x8BF4;&#xFF0C;&#x53EA;&#x8981;count == 0&#x4E86;&#xFF0C;&#x5219;&#x88AB;&#x963B;&#x585E;&#x7684;&#x7EBF;&#x7A0B;&#x90FD;&#x80FD;&#x6062;&#x590D;&#x8FD0;&#x884C;
         doAcquireSharedInterruptibly(arg);
 &#x200B;
 }

从源码细节来看,我们知道CountDownLatch底层是继承了AQS框架,是一个自定义同步组件。

AQS的状态变量被它当做了一个所谓的计数器实现。主线程调用await方法后,发现state的值不等于0,进入同步队列中阻塞等待。子线程每次调用countDown方法后,计数器减一,直到为0。这时会唤醒处于阻塞状态的主线程,然后主线程就会从await方法出返回。

方案三:栅栏(CyclicBarrier)

什么是栅栏?

CyclicBarrier字面意思是可循环(Cyclic)使用的栅栏(Barrier)。它的意思是让一组线程到达一个栅栏时被阻塞,直到最后一个耗时较长的线程完成任务后也到达栅栏时,栅栏才会打开,此时所有被栅栏拦截的线程才会继续执行。

基本语义:

CyclicBarrier有一个默认构造方法:CyclicBarrier(int parties),参数parties表示被栅栏拦截的线程数量。

每个线程调用await()方法告诉栅栏我已经到达栅栏,然后当前线程就会被阻塞,直到以下任一情况发生时,当前线程从await方法处返回。

  • 最后一个线程到达
  • 其他线程中断当前线程
  • 其他线程等待栅栏超时;通过调用await带超时时间的方法。
    await(long timeout, TimeUnit unit)
  • 其他一些线程在此屏障上调用重置

原理:

深入理解并发编程同步工具类

在CyclicBarrier的内部定义了一个Lock对象,每当一个线程调用await方法时,将拦截的线程数减1,然后判断剩余拦截数是否为初始值parties,如果不是,进入Lock对象的条件队列等待。如果是,执行barrierAction对象的Runnable方法,然后将锁的条件队列中的所有线程放入锁等待队列中,这些线程会依次的获取锁、释放锁。

适用场景:

1)实现多人游戏,直到所有玩家都加入才能开始。

2)经典场景:多线程计算数据,然后汇总结算结果场景。(比如一个Excel有多份sheet数据,开启多线程,每个线程处理一个sheet,最终将每个sheet的计算结果进行汇总)

示例代码:

 public class CyclicBarrierTest2 {
     static CyclicBarrier cyclicBarrier = new CyclicBarrier(2, new CalculateResult());
 &#x200B;
     public static void main(String[] args) {
         new Thread(() -> {
             try {
                 System.out.println("&#x7EBF;&#x7A0B;A&#x5904;&#x7406;&#x5B8C;sheet0&#x6570;&#x636E;...&#x603B;&#x8BA1;100");
                 cyclicBarrier.await();
             } catch (Exception e) {
 &#x200B;
             }
 &#x200B;
         }).start();
 &#x200B;
         try {
             System.out.println("&#x7EBF;&#x7A0B;B&#x5904;&#x7406;&#x5B8C;sheet1&#x6570;&#x636E;...&#x603B;&#x8BA1;200");
             cyclicBarrier.await();
         } catch (Exception e) {
 &#x200B;
         }
     }
 &#x200B;
     static class CalculateResult implements Runnable {
 &#x200B;
         @Override
         public void run() {
             System.out.println("&#x3010;&#x6C47;&#x603B;&#x7EBF;&#x7A0B;&#x3011;&#x5F00;&#x59CB;&#x7EDF;&#x8BA1;&#x5404;&#x4E2A;&#x5B50;&#x7EBF;&#x7A0B;&#x7684;&#x8BA1;&#x7B97;&#x7ED3;&#x679C;...&#xFF0C;&#x603B;&#x8BA1;300");
 &#x200B;
         }
     }
 }

响应结果打印:

 &#x7EBF;&#x7A0B;B&#x5904;&#x7406;&#x5B8C;sheet1&#x6570;&#x636E;...&#x603B;&#x8BA1;200
 &#x7EBF;&#x7A0B;A&#x5904;&#x7406;&#x5B8C;sheet0&#x6570;&#x636E;...&#x603B;&#x8BA1;100
 &#x3010;&#x6C47;&#x603B;&#x7EBF;&#x7A0B;&#x3011;&#x5F00;&#x59CB;&#x7EDF;&#x8BA1;&#x5404;&#x4E2A;&#x5B50;&#x7EBF;&#x7A0B;&#x7684;&#x8BA1;&#x7B97;&#x7ED3;&#x679C;...&#x603B;&#x8BA1;300

方案四:信号量(Semaphore)

什么是信号量?

信号量是用来控制 同时访问 特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源。

基本语义:

从Semaphore的构造方法Semaphore(int permits)来看,入参permits表示可用的许可数量。如果我们在方法内部执行操作前先执行了acquire()方法,那么当前线程就会尝试去获取可用的许可,如果获取不到,就会被阻塞(或者中途被其他线程中断),直到有可用的许可为止。

执行release()方法意味着会释放许可给Semaphore。此时许可数量就会加一。

使用场景:

Semaphore在 有限公共资源场景下,应用比较广泛,比如数据库连接池场景。

大家可以想象一下,比如我们平时在用的C3P0、druid等数据库连接池,因为数据库连接数是有限制的,面对突如其来的激增流量,一下子把有限的连接数量给占完了,那没有获取到可用的连接的线程咋办?是直接失败吗?

我们期望的效果是让这些没获取到连接的线程先暂时阻塞一会,而不是立即失败,这样一旦有可用的连接,这些被阻塞的线程就可以获取到连接而继续工作。

示例代码:

 public class BoundedHashSet<t> {
 &#x200B;
     private final Set<t> set;
 &#x200B;
     private final Semaphore sem;
 &#x200B;
     public BoundedHashSet(int bound) {
         this.set = Collections.synchronizedSet(new HashSet<t>());
         this.sem = new Semaphore(bound);
     }
 &#x200B;
     public boolean add(T o) throws InterruptedException {
         sem.acquire();
 &#x200B;
         boolean wasAdded = false;
 &#x200B;
         try {
             wasAdded = set.add(o);//&#x5982;&#x679C;&#x5143;&#x7D20;&#x5DF2;&#x5B58;&#x5728;&#xFF0C;&#x8FD4;&#x56DE;false;&#x5426;&#x5219;true
             return wasAdded;
         } catch (Exception e) {
 &#x200B;
         } finally {
             if (!wasAdded) {//&#x5982;&#x679C;&#x5143;&#x7D20;&#x5DF2;&#x7ECF;&#x5B58;&#x5728;&#xFF0C;&#x5219;&#x91CA;&#x653E;&#x8BB8;&#x53EF;&#x7ED9;&#x4FE1;&#x53F7;&#x91CF;
                 sem.release();
             }
         }
         return wasAdded;
     }
 &#x200B;
     public boolean remove(Object o) {
         boolean wasRemoved = set.remove(o);
         if (wasRemoved) {
             sem.release();//&#x4ECE;&#x5BB9;&#x5668;&#x4E2D;&#x79FB;&#x9664;&#x5143;&#x7D20;&#x540E;&#xFF0C;&#x9700;&#x8981;&#x91CA;&#x653E;&#x8BB8;&#x53EF;&#x7ED9;&#x4FE1;&#x53F7;&#x91CF;&#x3002;
         }
         return wasRemoved;
     }
 &#x200B;
 &#x200B;
 }</t></t></t>

总结

上述需求的实现方案我例举了join、CountDownLatch、CyclicBarrier Semaphore这几种。

期间也介绍了每种方案的实现原理、适用场景、源码解析。它们语意上有一些相似的地方,但差异性也很明显,接下来我们详细对它们进行一下对比。

首先我们说当前线程调用t.join()尽管能达到当前线程 等待线程t完成任务的业务语义。但细致的区别是join方法调用后必须要等到t线程完成它的任务后,当前线程才能从阻塞出返回。而CountDownLatch、CyclicBarrier显然提供了 更细粒度的控制。像CountDownLatch只要主线程将countDownLatch实例对象传递给子线程,子线程在方法内部某个地方执行latch.countDownLatch(),每调用一次计数器就会减1,直到为0,最后主线程就能感知到并从await阻塞出返回,不需要等到任务的完成。

其次我们说在当前线程方法内部,一旦出现超过2个join方法,整体代码就会变的很脏、可读性降低。反观JUC分装的CountDownLatch、CyclicBarrier等组件,通过对共享实例的操作(可以把这个实例传给子线程,然后子线程任务执行的时候调用相应方法,比如latch.countDown()) 显得更加清晰、优雅。

最后比较一下CyclicBarrier和CountDownLatch的差异性。比起CountDownLatch显然CyclicBarrier功能更多,比如支持reset方法。CountDownLatch的计数器只能使用一次,而CyclicBarrier可以多次使用,只要调用reset方法即可。(比如CyclicBarrier典型的数据统计场景,因为中途可能部分线程统计出错或外部数据的订正,可能需要重新再来一次计算,那么这个时候,CountDownLatch无能为力,而CyclicBarrier只要子线程调用reset方法即可)。

而Semaphore往往用来针对多线程并发访问指定有限资源的场景,比如数据库连接池场景。

写到最后

如果这篇文章你看了对你有帮助或启发,麻烦关注、点赞一下作者。你的肯定是作者创作源源不断的动力。

公众号

欢迎大家关注我的公众号:【 陶朱公Boy

深入理解并发编程同步工具类

里面不仅汇集了硬核的干货技术、还汇集了像左耳朵耗子、张朝阳总结的高效学习方法论、职场升迁窍门、软技能。希望能辅助你到达你的梦想之地!

公众号内 回复关键字” 电子书“下载pdf格式的电子书籍(JAVAEE、Spring、JVM、并发编程、Mysql、Linux、kafka、分布式等)、” 开发手册“获取阿里开发手册2本、” 面试“获取面试PDF资料。

加群

回复关键字” 加群“拉你入群,可以跟很多BAT大厂的前辈交流和学习。

Original: https://www.cnblogs.com/StarbucksBoy/p/16778653.html
Author: 陶朱公Boy
Title: 深入理解并发编程同步工具类

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/801002/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

  • 1.什么是pytest

    1.什么是Pytest Pytest是一个非常成熟的python测试用例 &#x6846;&#x67B6;,它可以和很多的工具或框架,selenium、reques…

    Python 2023年9月13日
    042
  • 修改nc文件中的时间序列并给变量赋予新的时间属性

    提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 文章目录 前言 一、pands.DataFrame如何应用与构建时间序列? 二、使用步骤 * 1.代码示例及相关结…

    Python 2023年8月20日
    082
  • Python3教程:Pandas模块删除数据的几种情况

    开始之前,pandas中DataFrame删除对象可能存在几种情况1、删除具体列2、删除具体行3、删除包含某些数值的行或者列4、删除包含某些字符、文字的行或者列本文就针对这四种情况…

    Python 2023年11月2日
    043
  • 用Python制作爆款视频,太绝了

    前言 前几天小编在抖音上刷到一个慢慢变老的视频,播放量居然有 30W+,当时就在想这视频 Python 可不可以做?经过一番搜索,小编找到了 腾讯云的人脸年龄变化 API,上面介绍…

    Python 2023年11月2日
    035
  • NumPy 数组属性

    NumPy 数组属性 一、重要 ndarray 对象属性 属性说明ndarray.ndim秩,即轴的数量或维度的数量ndarray.shape数组的维度,对于矩阵,n 行 m 列n…

    Python 2023年8月24日
    052
  • python面向对象之魔术方法(特定时机自动触发) 魔术属性

    魔术方法(特定时机自动触发) init 构造方法 触发时机:实例化对象,初始化的时候触发功能:为对象添加成员参数:参数不固定,至少一个self参数返回值:无 (1) 基本语法 cl…

    Python 2023年6月10日
    065
  • Django网站设计常见问题处理(整理)

    以下内容来源并整理于网络,并在实际应用过程中进行了汇总,希望能够帮到大家,后面有一段时间没有整理了,也欢迎大家交流、提问,并持续更新。问题一、”Python&#8221…

    Python 2023年8月4日
    058
  • 机器学习保姆级入门案例-波士顿房价预测

    利用scikit-learn进行机器学习入门案例 相信很多人都是知道波士顿房价的数据集,一个非常经典的机器学习入门案例数据集。在这个案例中直接使用sklearn中自带的数据集来进行…

    Python 2023年8月2日
    053
  • 【算法分析与设计】【期中(末)复习题】【2022秋】

    文章目录 一. 单选题 二. 填空题 三. 判断题 四. 多选题 一. 单选题 1.按照渐近阶从低到高的顺序排列下列表达式: 30n,2logn,4,n! A. 4 Origina…

    Python 2023年9月26日
    043
  • 手机进销存网站

    开发工具(eclipse/idea/vscode等):数据库(sqlite/mysql/sqlserver等):功能模块(请用文字描述,至少200字): 功能模块包括:员工模块、手…

    Python 2023年10月10日
    054
  • 树莓派安装Tensorflow2、OpenCV、numpy

    本次安装版本为:Tensorflow == 2.2.0, numpy == 1.21.5,OpenCV == 4.5.5,其版本兼容 建议通读全文,再根据需要安装 (本次安装的路径…

    Python 2023年8月25日
    067
  • 1分钟掌握DataFrame的行标签索引(loc与iloc)

    针对DataFrame标签索引的loc和iloc方法: lociloc行名称标签整数(位置)标签 代码示例: 1、首先建立一个示例数据 –“data&#82…

    Python 2023年8月18日
    040
  • python手机端游戏排行_利用Python开发手机同款游戏:开心消消乐

    手机上面的开心消消乐,我想大部分人都是玩过的吧,今天小编就教大家如何用python开发这款游戏 不过只有十个关卡,不像手机里面那么多的关卡!不过游戏的画面和bgm都是同款的哦~ 效…

    Python 2023年9月23日
    044
  • GANs系列:DCGAN原理简介与基础GAN的区别对比

    本文长期不定时更新最新知识,防止迷路记得收藏哦! 还未了解基础GAN的,可以先看下面两篇文章: GNA笔记–GAN生成式对抗网络原理以及数学表达式解剖 入门GAN实战&…

    Python 2023年9月27日
    041
  • python单元测试之pytest

    pytest pytest的运行方式 . 点号,表示用例通过F 表示失败 FailureE 表示用例中存在异常 Error pytest命名规则命名规则 类名必须是Test开头测试…

    Python 2023年9月10日
    032
  • 【网络安全】——文件上传之安全狗bypass

    作者名:Demo不是emo主页面链接:主页传送门创作初心: 一切为了她座右铭: 不要让时代的悲哀成为你的悲哀专研方向: 网络安全,数据结构每日emo:保持心脏震荡,等有人与我共鸣 …

    Python 2023年9月26日
    048
亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球