Java并发编程之AQS以及源码解析

文章目录

Java并发编程之AQS以及源码解析

; 概览

AQS(AbstractQueuedSynchronizer)是 Doug Lea 大师创作的用来构建锁或者其他同步组件(信号量、事件等)的基础框架类。

JDK中许多并发工具类的内部实现都依赖于AQS,如ReentrantLock, Semaphore, CountDownLatch等等。

AQS本身是一个抽象类,主要的使用方法是继承它作为一个内部类。

AQS设计基于模板方法模式,开发者需要继承同步器并且重写指定的方法,将其组合在并发组件的实现中,调用同步器的模板方法,模板方法会调用使用者重写的方法。

AQS定义了一套多线程访问共享资源的同步器框架,是整个包的基石, LockReadWriteLockCountDowndLatchCyclicBarrierSemaphoreThreadPoolExecutor等都是在AQS的基础上实现的。

实现思路

AQS内部维护一个FIFO队列来管理锁。线程会首先尝试获取锁,如果失败,则将当前线程以及等待状态等信息包成一个Node节点加到同步队列里。

接着会不断循环尝试获取锁(条件是当前节点为head的直接后继才会循环尝试),如果失败或者不是head的后继节点,则会阻塞自己,直至被唤醒;而当持有锁的线程释放锁时(或取消时),会唤醒队列中的后继线程。

更多请移驾。。。
🔽🔽🔽🔽🔽🔽🔽🔽🔽🔽🔽🔽🔽🔽🔽🔽🔽🔽🔽🔽🔽🔽🔽🔽🔽🔽🔽🔽🔽🔽
本文作者:Java技术债务
原文链接:https://www.cuizb.top/myblog/article/1659968359
版权声明: 本博客所有文章除特别声明外,均采用 CC BY 3.0 CN协议进行许可。转载请署名作者且注明文章出处。
🔽🔽🔽🔽🔽🔽🔽🔽🔽🔽🔽🔽🔽🔽🔽🔽🔽🔽🔽🔽🔽🔽🔽🔽🔽🔽🔽🔽🔽🔽

插播一下下嘛,文章肯定是好文章,不想成为我的粉丝,可以点一下上边的链接,是我个人的小网站,当然也可以关注我的公众号:Java技术债务,或者扫下边的二维码

Java并发编程之AQS以及源码解析

下面列举JDK中几种常见使用了AQS的同步组件:

  • ReentrantLock: 使用了AQS的独占获取和释放,用state变量记录线程获取独占锁的次数,获取锁时+1,释放锁时-1,等于0时表示没有线程占用锁,可以尝试获取锁。
  • Semaphore: 使用了AQS的共享获取和释放,用state变量作为计数器,只有在大于0时允许线程进入。获取锁时-1,释放锁时+1。
  • CountDownLatch: 使用了AQS的共享获取和释放,用state变量作为计数器,在初始化时指定(即CountDownLatch的入参count)。只要state还大于0,获取共享锁会因为失败而阻塞,直到计数器的值为0时,共享锁才允许获取,当前等待线程会被唤醒。

AQS主要做了三件事情

  • 同步状态的管理
  • 线程的阻塞和唤醒
  • 同步队列的维护

下面三个protected final方法是AQS中用来访问/修改同步状态的方法:

  • int getState(): 获取同步状态
  • void setState(): 设置同步状态
  • boolean compareAndSetState(int expect, int update):基于CAS,原子设置当前状态

; 实现原理

并发控制的核心是锁的获取与释放,锁的实现方式有很多种,AQS采用的是一种改进的 CLH锁

源自CLH锁

CLH(Craig, Landin, and Hagersten locks)是一种 自旋锁,发明出来的主要原因是为了解决多核cpu体系中全部加锁线程都访问同一内存地址而出现过多内存竞争的问题。能确保无饥饿性,提供先来先服务的公平性。

自旋锁是为实现保护共享资源而提出一种锁机制。

其实,自旋锁与互斥锁比较类似,它们都是为了解决对某项资源的互斥使用。无论是互斥锁,还是自旋锁,在任何时刻,最多只能有一个保持者,也就是说,在任何时刻最多只能有一个执行单元获得锁。

但是两者在调度机制上略有不同。对于互斥锁,如果资源已经被占用,资源申请者只能进入睡眠状态。而自旋锁不会引起调用者睡眠,如果自旋锁已经被别的执行单元保持,调用者就一直循环在那里看是否该自旋锁的保持者已经释放了锁,”自旋”一词就是因此而得名。

CLH锁是一种基于链表的可扩展、高性能、公平的自旋锁,申请线程只在本地变量上自旋,它不断轮询前驱的状态,如果发现前驱释放了锁就结束自旋。

CLH锁引入了线程节点的概念,需要加锁的线程不断的从队尾加入队列,构造出了一个逻辑上的单向链表队列;获取锁的顺序也是从队列头部开始,早加入队列的线程便能更早的获得到CLH锁,实现先来先服务的公平性。

CLH锁结构图

Java并发编程之AQS以及源码解析

CLH锁中加锁的线程不再是统一的监听同一个标识锁状态的内存地址,而是只监听队列中当前线程节点其前驱线程节点的锁状态。如此一来,便分散了不同线程加锁时所要访问的内存变量地址,相比起前面介绍的原始自旋锁和票锁减少了大量的内存访问竞争,减少了底层为了实现线程间内存数据可见性同步时的性能开销。

加锁时,先cas的入队获取前驱节点后,便不断的循环监听前驱节点锁的状态,当发现前驱节点释放了锁时,当前节点便获得了锁。

而解锁时则很简单,将当前线程自己的锁状态更改为已释放即可。标识为已释放时,存在的后继加锁节点便能感知到这一变化,从而获得锁。

; AQS数据模型

AQS维护了一个 volatile int state(代表共享资源)和一个FIFO线程等待队列(多线程争用资源被阻塞时会进入此队列)。

Java并发编程之AQS以及源码解析

AQS的内部队列是CLH同步锁的一种变形。其主要从以下方面进行了改造:

  • 在结构上引入了头节点和尾节点,分别指向队列的头和尾,尝试获取锁、入队列、释放锁等实现都与头尾节点相关,
  • 为了可以处理timeout和cancel操作,每个node维护一个指向前驱的指针。如果一个node的前驱被cancel,这个node可以前向移动使用前驱的状态字段
  • 在每个node里面使用一个状态字段来控制阻塞/唤醒,而不是自旋
  • head节点使用的是傀儡节点

FIFO队列中的节点有AQS的静态内部类 Node定义:

static final class Node {

    static final Node SHARED = new Node();

    static final Node EXCLUSIVE = null;

    static final int CANCELLED = 1;

    static final int SIGNAL = -1;

    static final int CONDITION = -2;

    static final int PROPAGATE = -3;

    volatile int waitStatus;

    volatile Node prev;

    volatile Node next;

    volatile Thread thread;

    Node nextWaiter;

    final boolean isShared() {
        return nextWaiter == SHARED;
    }

    final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null) {
            throw new NullPointerException();
        } else {
            return p;
        }
    }

    Node() {
    }

    Node(Thread thread, Node mode) {
        this.nextWaiter = mode;
        this.thread = thread;
    }

    Node(Thread thread, int waitStatus) {
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}

类中有两个常量 SHAREEXCLUSIVE,顾名思义这两个常量用于表示这个节点支持共享模式还是独占模式。

共享模式指的是允许多个线程获取同一个锁而且可能获取成功。

独占模式指的是一个锁如果被一个线程持有,其他线程必须等待。多个线程读取一个文件可以采用共享模式,而当有一个线程在写文件时不会允许另一个线程写这个文件,这就是独占模式的应用场景。

CAS操作

AQS有三个重要的变量:


private transient volatile Node head;

private transient volatile Node tail;

private volatile int state;

protected final int getState() {
return state;
}

protected final void setState(int newState) {
state = newState;
}

protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

compareAndSetState方法是以乐观锁的方式更新共享资源。

CAS 指的是现代 CPU 广泛支持的一种对内存中的共享数据进行操作的一种特殊指令。这个指令会对内存中的共享数据做原子的读写操作。

简单介绍一下这个指令的操作过程

首先,CPU 会将内存中将要被更改的数据与期望的值做比较。然后,当这两个值相等时,CPU 才会将内存中的数值替换为新的值。否则便不做操作。最后,CPU 会将旧的数值返回。

这一系列的操作是原子的。它们虽然看似复杂,但却是 Java 5 并发机制优于原有锁机制的根本。简单来说,CAS 的含义是”我认为原有的值应该是什么,如果是,则将原有的值更新为新值,否则不做修改,并告诉我原来的值是多少”。

CAS通过调用JNI(Java Native Interface)调用实现的。JNI允许java调用其他语言,而CAS就是借助C语言来调用CPU底层指令实现的。 Unsafe是CAS的核心类,它提供了硬件级别的原子操作。

大神在java同步器中大量使用了CAS技术,鬼斧神工的实现了多线程执行的安全性。CAS不仅在AQS的实现中随处可见,也是整个包的基石。

可以发现, headtailstate三个变量都是 volatile的。是轻量级的,它在多处理器开发中保证了共享变量的”可见性”。

volatile变量也存在一些局限:不能用于构建原子的复合操作,因此当一个变量依赖旧值时就不能使用volatile变量。而CAS呢,恰恰可以提供对共享变量的原子的读写操作。

volatile保证共享变量的可见性,CAS保证更新操作的原子性,简直是绝配!把这些特性整合在一起,就形成了整个concurrent包得以实现的基石。如果仔细分析concurrent包的源代码实现,会发现一个通用化的实现模式:

  • 首先,声明共享变量为volatile;
  • 然后,使用CAS的原子条件更新来实现线程之间的同步;
  • 同时,配合以volatile的读/写和CAS所具有的volatile读和写的内存语义来实现线程之间的通信。

AQS,非阻塞数据结构和原子变量类(java.util.concurrent.atomic包中的类),这些concurrent包中的基础类都是使用这种模式来实现的,而concurrent包中的高层类又是依赖于这些基础类来实现的。从整体来看,concurrent包的实现示意图如下:

Java并发编程之AQS以及源码解析

主要方法

自定义同步器的实现方法

不同的自定义同步器争用共享资源的方式也不同。自定义同步器在实现时只需要实现共享资源的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS已经在顶层实现好了。自定义同步器实现时主要实现以下几种方法:

方法描述boolean tryAcquire(int arg)尝试获取独占锁成功则返回true,失败则返回false。boolean tryRelease(int arg)尝试释放独占锁成功则返回true,失败则返回false。int tryAcquireShared(int arg)尝试获取共享锁负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。boolean tryReleaseShared(int arg)尝试释放共享锁成功则返回true,失败则返回false。boolean isHeldExclusively()当前线程是否获得了独占锁只有用到condition才需要去实现它。

例子:初始化为0,表示未锁定状态。A线程 lock()时,会调用 tryAcquire()独占该锁并将state+1。此后,其他线程再时就会失败,直到A线程 unlock()到state=0(即释放锁)为止,其它线程才有机会获取该锁。当然,释放锁之前,A线程自己是可以重复获取此锁的(state会累加),这就是可重入的概念。但要注意,获取多少次就要释放多么次,这样才能保证state是能回到零态的。

一般来说,自定义同步器要么是独占方法,要么是共享方式,他们也只需实现t ryAcquire-tryReleasetryAcquireShared-tryReleaseShared中的一种即可。但AQS也支持自定义同步器同时实现独占和共享两种方式,如 ReentrantReadWriteLock

AQS定义的模板方法

AQS本身将同步状态的管理用模板方法模式都封装好了,以下列举了AQS中的一些模板方法:

方法描述void acquire(int arg)获取独占锁。会调用tryAcquire方法,如果未获取成功,则会进入同步队列等待void acquireInterruptibly(int arg)响应中断版本的acquireboolean tryAcquireNanos(int arg,long nanos)响应中断+带超时版本的acquirevoid acquireShared(int arg)获取共享锁。会调用tryAcquireShared方法void acquireSharedInterruptibly(int arg)响应中断版本的acquireSharedboolean tryAcquireSharedNanos(int arg,long nanos)响应中断+带超时版本的acquireSharedboolean release(int arg)释放独占锁boolean releaseShared(int arg)释放共享锁Collection getQueuedThreads()获取同步队列上的线程集合

源码解读

等待状态释义

值描述CANCELLED (1)当前线程因为超时或者中断被取消。这是一个终结态,也就是状态到此为止。SIGNAL (-1)当前线程的后继线程被阻塞或者即将被阻塞,当前线程释放锁或者取消后需要唤醒后继线程。这个状态一般都是后继线程来设置前驱节点的。CONDITION (-2)当前线程在condition队列中。PROPAGATE (-3)用于将唤醒后继线程传递下去,这个状态的引入是为了完善和增强共享锁的唤醒机制。在一个节点成为头节点之前,是不会跃迁为此状态的0表示无状态。

Java并发编程之AQS以及源码解析

图为自制的AQS状态的流转图,AQS中0状态和CONDITION状态为始态,CANCELLED状态为终态。0状态同时也可以是节点生命周期的终态。 注意,上图仅表示状态之间流转的可达性,并不代表一定能够从一个状态沿着线随意跃迁。

在AQS中包含了head和tail两个Node引用,其中head在逻辑上的含义是当前持有锁的线程,head节点实际上是一个虚节点,本身并不会存储线程信息。当一个线程无法获取锁而被加入到同步队列时,会用CAS来设置尾节点tail为当前线程对应的Node节点。

head和tail在AQS中是延迟初始化的,也就是在需要的时候才会被初始化,也就意味着在所有线程都能获取到锁的情况下,队列中的head和tail都会是null。

; AQS获取锁的流程图

Java并发编程之AQS以及源码解析

获取独占锁的实现

根据流程图进行源码的剖析,一步一步的向下走。

  • acquire(int)独占模式下线程获取共享资源的顶层入口。

如果获取到资源,线程直接返回,否则进入等待队列,直到获取到资源为止,且整个过程忽略中断的影响。获取到资源后,线程就可以去执行其临界区代码了。

如果线程在等待过程中被中断过,它是不响应的。只是获取资源后才再进行自我中断 selfInterrupt(),将中断补上。


public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
  • tryAcquire(int)此方法尝试去获取独占资源。如果获取成功,则直接返回true,否则直接返回false。
protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}

AQS只是一个框架,在这里定义了一个接口,具体资源的获取交由自定义同步器去实现了(通过state的get/set/CAS),至于能不能重入,能不能加塞,那就看具体的自定义同步器怎么去设计了。

当然,自定义同步器在进行资源访问时要考虑线程安全的影响。

这里之所以没有定义成 abstract,是因为独占模式下只用实现 tryAcquire(int)tryRelease(int),而共享模式下只用实现 tryAcquireShared(int)tryReleaseShared(int)。如果都定义成 abstract,那么每个模式也要去实现另一模式下的接口。

  • addWaiter(node)将该线程加入等待队列的尾部,并标记为独占模式;其中,compareAndSetTail方法也是调用Unsafe类实现CAS操作,更新队尾。

private Node addWaiter(Node mode) {

    Node node = new Node(Thread.currentThread(), mode);
    Node pred = tail;

    if (pred != null) {
        node.prev = pred;

        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }

    enq(node);
    return node;
}
  • enq(node) 通过循环+CAS在队列中成功插入一个节点后返回。

private Node enq(final Node node) {

    for (;;) {
        Node t = tail;

        if (t == null) {
            if (compareAndSetHead(new Node()))

                tail = head;
        } else {

            node.prev = t;

            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}
  • acquireQueued(node, int)使线程在等待队列中获取资源,一直获取到资源后才返回。如果在整个等待过程中被中断过,则返回true,否则返回false。

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {

        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();

            if (p == head && tryAcquire(arg)) {

                setHead(node);
                p.next = null;
                failed = false;
                return interrupted;
            }

            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

如果获取资源失败后,会调用两个函数, shouldParkAfterFailedAcquireparkAndCheckInterrupt

  • shouldParkAfterFailedAcquire(pred, node)根据前驱节点中的waitStatus来判断是否需要阻塞当前线程。

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)

        return true;
    if (ws > 0) {

        do {

            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {

        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

整个流程中,如果前驱节点的状态不是 SIGNAL,那么自己就不能安心去休息,需要去找个安心的休息点,同时可以再尝试下看有没有机会轮到自己拿号。

  • parkAndCheckInterrupt()让线程去休息,真正进入等待状态。
private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
}

park()会让当前线程进入 waiting状态。

在此状态下,有两种途径可以唤醒该线程:被 unpark()或被 interrupt()

  • cancelAcquire(node)实现某个node取消获取锁。

private void cancelAcquire(Node node) {

   if (node == null)
       return;

   node.thread = null;

   Node pred = node.prev;
   while (pred.waitStatus > 0)
       node.prev = pred = pred.prev;

   Node predNext = pred.next;

   node.waitStatus = Node.CANCELLED;

   if (node == tail && compareAndSetTail(node, pred)) {
       compareAndSetNext(pred, predNext, null);
   } else {

       int ws;
       if (pred != head &&
           ((ws = pred.waitStatus) == Node.SIGNAL ||
            (ws  0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
           pred.thread != null) {
           Node next = node.next;

           if (next != null && next.waitStatus  0)
               compareAndSetNext(pred, predNext, next);
       } else {

           unparkSuccessor(node);
       }

       node.next = node;
   }
}
  • unparkSuccessor(node)唤醒后继线程。

一句话概括:用 unpark()唤醒等待队列中最前边的那个未放弃线程。


private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;

    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    Node s = node.next;

    if (s == null || s.waitStatus > 0) {
        s = null;

        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus  0)
                s = t;
    }
    if (s != null)

        LockSupport.unpark(s.thread);
}

总结acquire的流程

  • 调用自定义同步器的 tryAcquire()尝试直接去获取资源,如果成功则直接返回;
  • 没成功,则 addWaiter()将该线程加入等待队列的尾部,并标记为独占模式;
  • acquireQueued()使线程在等待队列中休息,有机会时(轮到自己,会被 unpark())会去尝试获取资源。获取到资源后才返回。如果在整个等待过程中被中断过,则返回true,否则返回false。
  • 如果线程在等待过程中被中断过,它是不响应的。只是获取资源后才再进行自我中断 selfInterrupt(),将中断补上。

释放独占锁的实现

对于释放一个独占锁,首先会调用 tryRelease(int),在完全释放掉独占锁后,这时后继线程是可以获取到独占锁的,因此释放者线程需要做的事情是唤醒一个队列中的后继者线程,让它去尝试获取独占锁。

上述所谓完全释放掉锁的含义,简单来说就是当前锁处于无主状态,等待线程有可能可以获取。

举例:对于可重入锁ReentrantLock, 每次tryAcquire后,state会+1,每次tryRelease后,state会-1,如果state变为0了,则此时称独占锁被完全释放了。

  • release(int) 释放锁的入口

release(int)acquire(int)的逆操作,是独占模式下线程释放共享资源的顶层入口。它会释放指定量的资源,如果彻底释放了(即state=0),它会唤醒等待队列里的其他线程来获取资源。

public final boolean release(int arg) {
    if (tryRelease(arg)) {

        Node h = head;

        if (h != null && h.waitStatus != 0)

            unparkSuccessor(h);
        return true;
    }
    return false;
}

整个release做的事情就是

  1. 调用tryRelease
  2. 如果tryRelease返回true也就是独占锁被完全释放,唤醒后继线程。

这里的唤醒是根据head几点来判断的,上面代码的注释中也分析了head节点的情况,只有在head存在并且等待状态小于零的情况下唤醒。

  • tryRelease(int)tryAcquire()一样,这个方法是需要自定义同步器去实现的。

正常来说, tryRelease()都会成功的,因为这是独占模式,该线程来释放资源,那么它肯定已经拿到独占资源了,直接减掉相应量的资源即可,也不需要考虑线程安全的问题。

protected boolean tryRelease(int arg) {
    throw new UnsupportedOperationException();
}

获取共享锁的实现

与获取独占锁的实现不同的关键在于,共享锁允许多个线程持有。

如果需要使用AQS中共享锁,在实现 tryAcquireShared(int)方法时需要注意,返回负数表示获取失败;返回0表示成功,但是后继争用线程不会成功;返回正数表示获取成功,并且后继争用线程也可能成功。

  • acquireShared(int)此方法是共享模式下线程获取共享资源的顶层入口。

它会获取指定量的资源,获取成功则直接返回,获取失败则进入等待队列,直到获取到资源为止,整个过程忽略中断。

public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}
  • tryAcquireShared(int)需要自定义同步器去实现。但是AQS已经把其返回值的语义定义好了:负值代表获取失败;0代表获取成功,但没有剩余资源;正数表示获取成功,还有剩余资源,其他线程还可以去获取。
protected int tryAcquireShared(int arg) {
    throw new UnsupportedOperationException();
}
  • doAcquireShared(int)功能类似于独占模式下的 acquireQueued()
private void doAcquireShared(int arg) {

    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();

            if (p == head) {

                int r = tryAcquireShared(arg);

                if (r >= 0) {

                    setHeadAndPropagate(node, r);
                    p.next = null;
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }

            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

跟独占模式比,有一点需要注意的是,这里只有线程是 head.next时(”老二”),才会去尝试获取资源,有剩余的话还会唤醒之后的队友。

那么问题就来了,假如老大用完后释放了5个资源,而老二需要6个,老三需要1个,老四需要2个。因为老大先唤醒老二,老二一看资源不够自己用继续park(),也更不会去唤醒老三和老四了。独占模式,同一时刻只有一个线程去执行,这样做未尝不可;但共享模式下,多个线程是可以同时执行的,现在因为老二的资源需求量大,而把后面量小的老三和老四也都卡住了。

  • setHeadAndPropagate(Node, int)函数用来设置新head,并在一定情况下调用 doReleaseShared

private void setHeadAndPropagate(Node node, int propagate) {

    Node h = head;
    setHead(node);

    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        if (s == null || s.isShared())

            doReleaseShared();
    }
}

此方法在 setHead()的基础上多了一步,就是自己苏醒的同时,如果条件符合(比如还有剩余资源),还会去唤醒后继节点,毕竟是共享模式。

释放共享锁的实现

  • releaseShared(int)共享模式下线程释放共享资源的顶层入口。它会释放指定量的资源,如果彻底释放了(即state=0),它会唤醒等待队列里的其他线程来获取资源。
public final boolean releaseShared(int arg) {

    if (tryReleaseShared(arg)) {

        doReleaseShared();
        return true;
    }
    return false;
}
  • tryReleaseShared(int)
protected boolean tryReleaseShared(int arg) {
        throw new UnsupportedOperationException();
}

跟独占模式下的 release()相似,但有一点稍微需要注意:独占模式下的 tryRelease()在完全释放掉资源(state=0)后,才会返回true去唤醒其他线程,这主要是基于可重入的考量;而共享模式下的 releaseShared()则没有这种要求,多线程可并发执行,不适用于可重入。

  • doReleaseShared()

private void doReleaseShared() {

    for (;;) {
        Node h = head;

        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {

                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;
                unparkSuccessor(h);
            }

            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;
        }

        if (h == head)
            break;
    }
}

除了上面分析的核心方法,AQS还有定义了附带超时功能的 tryAcquireNanos()tryAcquireSharedNanos()方法,以及响应中断的 acquireInterruptibly()acquireSharedInterruptibly()方法,其核心流程与通用方法大同小异。

相关问题

Q1: unparkSuccessor(Node) 方法中为什么唤醒后继节点时要从tail向前查找最接近node的非取消节点,而不是直接从node向后找到第一个后break掉?

答:如果读到s == null(node.next),不代表node就为tail。
考虑如下场景:

  • node某时刻为tail
  • 有新线程通过addWaiter中的if分支或者enq方法添加自己
  • compareAndSetTail成功
  • 此时这里的Node s = node.next读出来s == null,但事实上node已经不是tail,它有后继了

反过来,如果从node开始向后找后继,此时node的对象后继node.next为空,找不到后继。

Q2: unparkSuccessor(Node) 方法在被release调用时是否存在这样的一个漏洞?

答:unparkSuccessor方法在被release调用时是否存在漏洞?

  • 时刻1: node -> tail && tail.waitStatus == Node.CANCELLED (node的下一个节点为tail,并且tail处于取消状态)
  • 时刻2: unparkSuccessor读到s.waitStatus > 0
  • 时刻3: unparkSuccessor从tail开始遍历
  • 时刻4: tail节点对应线程执行cancelAcquire方法中的if (node == tail && compareAndSetTail(node, pred)) 返回true,此时tail变为pred(也就是node)
  • 时刻5: 有新线程进队列tail变为新节点
  • 时刻6: unparkSuccessor没有发现需要唤醒的节点

最终新节点阻塞并且前驱节点结束调用,新节点再也无法被unpark
这种情况不会发生,确实可能出现从tail向前扫描,没有读到新入队的节点,但别忘了acquireQueued的思想就是不断循环检测是否能够独占获取锁,
否则再进行判断是否要阻塞自己,而release的第一步就是tryRelease,它的语义为true表示完全释放独占锁,完全释放之后才会执行后面的逻辑,也就是unpark后继线程。在这种情况下,新入队的线程应当能获取到锁。
如果没有获取锁,则必然是在覆盖tryAcquire/tryRelease的实现有问题,导致前驱节点成功释放了独占锁,后继节点获取独占锁仍然失败。也就是说AQS框架的可靠性还在
某些程度上依赖于具体子类的实现,子类实现如果有bug,那AQS再精巧也扛不住。

Q3:AQS如何保证在节点释放的同时又有新节点入队的情况下,不出现原持锁线程释放锁,后继线程被自己阻塞死的情况,保持同步队列的活跃?

答:需要结合 shouldParkAfterFailedAcquire((Node, Node))unparkSuccessor(Node)这两个方法。以独占锁为例,后继争用线程阻塞自己的情况是读到前驱节点的等待状态为SIGNAL,只要不是这种情况都会再试着去争取锁。

假设后继线程读到了前驱状态为SIGNAL,说明之前在tryAcquire的时候,前驱持锁线程还没有tryRelease完全释放掉独占锁。

此时如果前驱线程完全释放掉了独占锁,则在unparkSuccessor中还没执行完置waitStatus为0的操作,也就是还没执行到下面唤醒后继线程的代码,否则后继线程会再去争取锁。

那么就算后继争用线程此时把自己阻塞了,也一定会马上被前驱线程唤醒。

那么是否可能持锁线程执行唤醒后继线程的逻辑时,后继线程读到前驱等待状态为SIGNAL把自己给阻塞,再也无法苏醒呢?

这个问题在上面的Q2中已经有答案了,确实可能在扫描后继需要唤醒线程时读不到新来的线程,但只要tryRelease语义实现正确,在true时表示完全释放独占锁,则后继线程理应能够tryAcquire成功,shouldParkAfterFailedAcquire在读到前驱状态不为SIGNAL会给当前线程再一次获取锁的机会的。

Q4: addWaiter(Node)enq(Node) 方法中新增一个节点时为什么要先将新节点的prev置为tail再尝试CAS,而不是CAS成功后来构造节点之间的双向链接?

答:双向链表目前没有基于CAS原子插入的手段,如果我们将node.prev = t和t.next = node(t为方法执行时读到的tail,引用封闭在栈上)放到compareAndSetTail(t, node)
成功后执行,如下所示:

if (compareAndSetTail(t, node)) {
   node.prev = t;
   t.next = node;
   return t;
}

会导致这一瞬间的tail也就是t的prev为null,这就使得这一瞬间队列处于一种不一致的中间状态。

Q5:PROPAGATE状态存在的意义

答:在setHeadAndPropagate中我们可以看到如下的一段代码

if (propagate > 0 || h == null || h.waitStatus < 0 ||
       (h = head) == null || h.waitStatus < 0) {
       Node s = node.next;
       if (s == null || s.isShared())
           doReleaseShared();
}

在PROPAGATE引入之前,之所以可能会出现线程hang住的情况,就是在于releaseShared有竞争的情况下,可能会有队列中处于等待状态的节点因为第一个线程完成释放唤醒,第二个线程获取到锁,但还没设置好head,又有新线程释放锁,但是读到老的head状态为0导致释放但不唤醒,最终后一个等待线程既没有被释放线程唤醒,也没有被持锁线程唤醒。

if (ws == Node.SIGNAL) {
      if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
          continue;
      unparkSuccessor(h);
    }
    else if (ws == 0 &&
           !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
      continue;
}

Q6:AQS维护了一个FIFO队列,它是如何保证在运行期间不发生内存泄露的?

AQS在无竞争条件下,甚至都不会new出head和tail节点。

线程成功获取锁时设置head节点的方法为 setHead,由于头节点的thread并不重要,此时会置 node的thread和 prev为null,完了之后还会置原先 head也就是线程对应 node的前驱的 next为null,从而实现队首元素的安全移出。

而在取消节点时,也会令 node.thread = null,在 node不为 tail的情况下,会使 node.next = node(之所以这样也是为了isOnSyncQueue实现更加简洁)

更多请移驾。。。

🔽🔽🔽🔽🔽🔽🔽🔽🔽🔽🔽🔽🔽🔽🔽🔽🔽🔽🔽🔽🔽🔽🔽🔽🔽🔽🔽🔽🔽🔽

本文作者:Java技术债务
原文链接:https://www.cuizb.top/myblog/article/1659968359
版权声明: 本博客所有文章除特别声明外,均采用 CC BY 3.0 CN协议进行许可。转载请署名作者且注明文章出处。

🔽🔽🔽🔽🔽🔽🔽🔽🔽🔽🔽🔽🔽🔽🔽🔽🔽🔽🔽🔽🔽🔽🔽🔽🔽🔽🔽🔽🔽🔽

Java并发编程之AQS以及源码解析
JVM内存泄漏和内存溢出的原因
JVM常用监控工具解释以及使用
Redis 常见面试题(一)
ClickHouse之MaterializeMySQL引擎(十)
三种实现分布式锁的实现与区别
线程池的理解以及使用
最近面试BAT,整理一份面试资料,覆盖了Java核心技术、JVM、Java并发、SSM、微服务、数据库、数据结构等等。想获取吗?如果你想提升自己,并且想和优秀的人一起进步,感兴趣的朋友,可以在扫码关注下方公众号。资料在公众号里静静的躺着呢。。。
Java并发编程之AQS以及源码解析
  • 喜欢就收藏
  • 认同就点赞
  • 支持就关注
  • 疑问就评论*
    一键四连,你的offer也四连**

Original: https://www.cnblogs.com/cuizb/p/16709533.html
Author: Java技术债务
Title: Java并发编程之AQS以及源码解析

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/576228/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球