Java进阶之深入理解队列同步器AbstractQueuedSynchronizer(AQS)

1 AQS概述

1.1 介绍

AQS的全称为(AbstractQueuedSynchronizer),这个类在java.util.concurrent.locks包下面:
在这里插入图片描述
AQS是一个用来构建锁和同步器的框架,使用AQS能构造出应用广泛的同步器,比如:Semaphore\CountDownLatch\ReentrantLock\ReentrantReadWriteLock\SynchronousQueue\FutureTask。我们也能利用AQS构造出符合我们自己需求的同步器。

1.2 基本原理

AQS通过内部实现的FIFO等待队列来完成资源获取线程的等待工作,如果当前线程获取资源失败,AQS则会将当前线程以及等待状态等信息构造成一个Node结构的节点,并将其加入等待队列中进行自旋,同时会阻塞当前线程;当其它获取到资源的线程释放持有的资源时,则会把等待队列节点(头节点的后继节点)中的线程唤醒,使其再次尝试获取对应资源。

2 AQS的学习链接

Java并发编程的艺术:P121-136/P189-199

Java并发之AQS详解

并发编程面试必备:AQS 原理以及 AQS 同步组件总结

高并发编程-AQS深入解析

3 AQS框架

3.1 基本架构图

在这里插入图片描述
CLH锁是一种基于链表的可扩展、高性能、公平的自旋锁,提供先来先服务的公平性。申请线程只在本地变量上自旋,它不断轮询前驱的状态,如果发现前驱释放了锁就结束自旋。

3.2 资源state

它维护了一个volatile int state(代表共享资源)和一个FIFO线程等待队列(多线程争用资源被阻塞时会进入此队列),state的访问方式有三种:

// 共享变量,使用volatile修饰保证线程可见性
private volatile int state;
// 返回同步状态的当前值
protected final int getState() {  
        return state;
}
// 设置同步状态的值
protected final void setState(int newState) { 
        state = newState;
}
// 原子地(CAS操作)将同步状态值设置为给定值update如果当前同步状态的值等于expect(期望值)
protected final boolean compareAndSetState(int expect, int update) {
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

3.3 AQS定义两种资源共享方式

(1)Exclusive(独占):只有一个线程能执行,如ReentrantLock。又可分为公平锁和非公平锁:
①公平锁:按照线程在队列中的排队顺序,先到者先拿到锁;
②非公平锁:当线程要获取锁时,无视队列顺序直接去抢锁,谁抢到就是谁的。

以ReentrantLock为例,state初始化为0,表示未锁定状态。A线程lock()时,会调用tryAcquire()独占该锁并将state+1。此后,其他线程再tryAcquire()时就会失败,直到A线程unlock()到state=0(即释放锁)为止,其它线程才有机会获取该锁。当然,释放锁之前,A线程自己是可以重复获取此锁的(state会累加),这就是可重入的概念。但要注意,获取多少次就要释放多么次,这样才能保证state是能回到零态的。

(2)Share(共享):多个线程可同时执行,如Semaphore\CountDownLatch\CyclicBarrier\ReadWriteLock。

3.4 如何自定义同步器

(1)不同的自定义同步器争用共享资源的方式也不同。自定义同步器在实现时只需要实现共享资源state的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS已经在顶层实现好了。自定义同步器实现时主要实现以下几种方法:

isHeldExclusively() // 该线程是否正在独占资源。只有用到condition才需要去实现它。
tryAcquire(int) // 独占方式。尝试获取资源,成功则返回true,失败则返回false。
tryRelease(int) // 独占方式。尝试释放资源,成功则返回true,失败则返回false。
tryAcquireShared(int) // 共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
tryReleaseShared(int) // 共享方式。尝试释放资源,成功则返回true,失败则返回false。

(2)一般来说,自定义同步器要么是独占方法,要么是共享方式,他们也只需实现tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared中的一种即可。但AQS也支持自定义同步器同时实现独占和共享两种方式,如ReentrantReadWriteLock。

4 AQS源码详解

本节开始讲解AQS的源码实现。依照内部定义的Node类、acquire-release、acquireShared-releaseShared的次序来。

4.1 内部定义的Node类

(1)Node类构造函数

static final class Node {
        //声明共享模式下的等待节点
        static final Node SHARED = new Node();
        //声明独占模式下的等待节点
        static final Node EXCLUSIVE = null;

        //waitStatus的一常量值,表示线程已取消
        static final int CANCELLED =  1;
        //waitStatus的一常量值,表示后继线程需要取消挂起
        static final int SIGNAL    = -1;
        //waitStatus的一常量值,表示线程正在等待条件
        static final int CONDITION = -2;
        //waitStatus的一常量值,表示下一个acquireShared应无条件传播
        static final int PROPAGATE = -3;

        //waitStatus,其值只能为CANCELLED、SIGNAL、CONDITION、PROPAGATE或0
        //初始值为0
        volatile int waitStatus;

        //前驱节点
        volatile Node prev;

        //后继节点
        volatile Node next;

        //当前节点的线程,在节点初始化时赋值,使用后为null
        volatile Thread thread;

        //下一个等待节点
        Node nextWaiter;

        Node() { 
        }

        Node(Thread thread, Node mode) {     // Used by addWaiter
            this.nextWaiter = mode;
            this.thread = thread;
        }

        Node(Thread thread, int waitStatus) { // Used by Condition
            this.waitStatus = waitStatus;
            this.thread = thread;
        }
    }

(2)AbstractQueuedSynchronizer的三个重要属性:

// 等待队列的头结点
private transient volatile Node head;
// 等待队列的尾节点
private transient volatile Node tail;
// 同步状态,这个很重要
private volatile int state;

(3)得到同步队列的基本结构:
在这里插入图片描述

4.2 acquire(int)独占方式同步状态获取流程

4.2.1 总调用流程

在这里插入图片描述

4.2.3

在这里插入图片描述

4.2.4

在这里插入图片描述
在这里插入图片描述

4.2.5

(1)unparkSuccessor()
用unpark()唤醒等待队列中最前边的那个未放弃线程,这里我们也用s来表示吧。此时,再和acquireQueued()联系起来,s被唤醒后,进入if (p == head && tryAcquire(arg))的判断(即使tryAcquire(arg)==false也没关系,它会再进入shouldParkAfterFailedAcquire()寻找一个安全点。这里既然s已经是等待队列中最前边的那个未放弃线程了,那么通过shouldParkAfterFailedAcquire()的调整,s也必然会跑到head的next结点,下一次自旋tryAcquire(arg)就为true啦),然后s把自己设置成head标杆结点,表示自己已经获取到资源了,acquire()也返回了。
(2)acquireQueued()
进入等待状态休息,直到其他线程彻底释放资源后唤醒自己,自己再拿到资源,然后就可以去干自己想干的事了。跟医院排队拿号有点相似,在等待队列中排队拿号(中间没其它事干可以休息),直到拿到号后再返回。

版权声明:本文为chenliguan原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/chenliguan/article/details/102764558