【Java 多线程并发】 AQS(Abstract Queued Synchronizer)

Metadata

title: 【Java 多线程并发】 AQS(Abstract Queued Synchronizer)
date: 2023-07-04 08:31
tags:
  - 行动阶段/完成
  - 主题场景/程序
  - 笔记空间/KnowladgeSpace/ProgramSpace/BasicsSpace
  - 细化主题/Java/多线程并发
categories:
  - Java
keywords:
  - Java/多线程并发
description: 【Java 多线程并发】 AQS(Abstract Queued Synchronizer)

【Java 多线程并发】 AQS(Abstract Queued Synchronizer)

Java 并发包(JUC)中提供了很多并发工具,这其中,很多我们耳熟能详的并发工具,譬如 ReentrangLock、Semaphore,它们的实现都用到了一个共同的基类 –AbstractQueuedSynchronizer, 简称 AQS(抽象队列同步器)。AQS 是一个用来构建锁和同步器的框架,使用 AQS 能简单且高效地构造出应用广泛的大量的同步器,比如我们提到的 ReentrantLock,Semaphore,其他的诸如 ReentrantReadWriteLock,SynchronousQueue,FutureTask 等等皆是基于 AQS 的。当然,我们自己也能利用 AQS 非常轻松容易地构造出符合我们自己需求的同步器。

基本实现原理

AQS 使用一个 int 成员变量来表示同步状态,通过内置的 FIFO 队列来完成获取资源线程的排队工作。

private volatile int state;//共享变量,使用volatile修饰保证线程可见性

状态信息通过 procted 类型的 getStatesetStatecompareAndSetState 进行操作。 state 表示被占用资源的线程个数,即大于 0 表示被占用状态

AQS 支持两种同步方式:

1. 独占式
2. 共享式

这样方便使用者实现不同类型的同步组件,独占式如 ReentrantLock,共享式如 Semaphore,CountDownLatch,组合式的如 ReentrantReadWriteLock。总之,AQS 为使用提供了底层支撑,如何组装实现,使用者可以自由发挥。

同步器的设计是基于模板方法模式的,一般的使用方式是这样:

  1. 使用者继承 AbstractQueuedSynchronizer 并重写指定的方法。(这些重写方法很简单,无非是对于共享资源 state 的获取和释放
  2. 将 AQS 组合在自定义同步组件的实现中,并调用其模板方法,而这些模板方法会调用使用者重写的方法。

这其实是模板方法模式的一个很经典的应用。

我们来看看 AQS 定义的这些可重写的方法:

  • protected boolean tryAcquire(int arg) : 独占式获取同步状态,试着获取,成功返回 true,反之为 false
  • protected boolean tryRelease(int arg) :独占式释放同步状态,等待中的其他线程此时将有机会获取到同步状态;
  • protected int tryAcquireShared(int arg) :共享式获取同步状态,返回值大于等于 0,代表获取成功;反之获取失败;
  • protected boolean tryReleaseShared(int arg) :共享式释放同步状态,成功为 true,失败为 false
  • protected boolean isHeldExclusively() : 是否在独占模式下被线程占用。

关于 AQS 的使用,我们来简单总结一下:

如何使用

首先,我们需要去继承 AbstractQueuedSynchronizer 这个类,然后我们根据我们的需求去重写相应的方法,比如要实现一个独占锁,那就去重写 tryAcquire,tryRelease 方法,要实现共享锁,就去重写 tryAcquireShared,tryReleaseShared;最后,在我们的组件中调用 AQS 中的模板方法就可以了,而这些模板方法是会自动调用到我们之前重写的那些方法的。也就是说,我们只需要很小的工作量就可以实现自己的同步组件,重写的那些方法,仅仅是一些简单的对于共享资源 state 的获取和释放操作,至于像是获取资源失败,线程需要阻塞之类的操作,自然是 AQS 帮我们完成了。

设计思想

对于使用者来讲,我们无需关心获取资源失败,线程排队,线程阻塞 / 唤醒等一系列复杂的实现,这些都在 AQS 中为我们处理好了。我们只需要负责好自己的那个环节就好,也就是获取 / 释放共享资源 state 的姿势 T_T。很经典的模板方法设计模式的应用,AQS 为我们定义好顶级逻辑的骨架,并提取出公用的线程入队列 / 出队列,阻塞 / 唤醒等一系列复杂逻辑的实现,将部分简单的可由使用者决定的操作逻辑延迟到子类中去实现即可。

自定义同步器

同步器代码实现

上面大概讲了一些关于 AQS 如何使用的理论性的东西,接下来,我们就来看下实际如何使用,直接采用 JDK 官方文档中的小例子来说明问题

import java.util.concurrent.locks.AbstractQueuedSynchronizer;

public class Mutex implements java.io.Serializable {
    //静态内部类,继承AQS
    private static class Sync extends AbstractQueuedSynchronizer {
        //是否处于占用状态
        protected boolean isHeldExclusively() {
            return getState() == 1;
        }
        //当状态为0的时候获取锁,CAS操作成功,则state状态为1,
        public boolean tryAcquire(int acquires) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }
        //释放锁,将同步状态置为0
        protected boolean tryRelease(int releases) {
            if (getState() == 0) throw new IllegalMonitorStateException();
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }
    }
 
    //同步对象完成一系列复杂的操作,我们仅需指向它即可
    private final Sync sync = new Sync();
    //加锁操作,代理到acquire(模板方法)上就行,acquire会调用我们重写的tryAcquire方法
    public void lock() {
        sync.acquire(1);
    }
    public boolean tryLock() {
        return sync.tryAcquire(1);
    }
    //释放锁,代理到release(模板方法)上就行,release会调用我们重写的tryRelease方法。
    public void unlock() {
        sync.release(1);
    }
    public boolean isLocked() {
        return sync.isHeldExclusively();
    }
}

同步器代码测试

测试下这个自定义的同步器,我们使用之前文章中做过的并发环境下 a++ 的例子来说明问题(a++ 的原子性其实最好使用原子类 AtomicInteger 来解决,此处用 Mutex 有点大炮打蚊子的意味,好在能说明问题就好)

import java.util.concurrent.CyclicBarrier;  // 线程栅栏类
public class TestMutex {
    private static CyclicBarrier barrier = new CyclicBarrier(31); // 包括主线程一共31个线程
    private static int a = 0;
    private static  Mutex mutex = new Mutex();
    public static void main(String []args) throws Exception {
        //说明:我们启用30个线程,每个线程对i自加10000次,同步正常的话,最终结果应为300000;
        //未加锁前
        for(int i=0;i<30;i++){
            Thread t = new Thread(new Runnable() {
                @Override
                public void run() {
                    for(int i=0;i<10000;i++){
                        increment1();//没有同步措施的a++;
                    }
                    try {
                        barrier.await();//等30个线程累加完毕
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            });
            t.start();
        }
        barrier.await();
        System.out.println("加锁前,a="+a);
 
        //加锁后
        barrier.reset();//重置CyclicBarrier
        a=0;
        for(int i=0;i<30;i++){
            new Thread(new Runnable() {
                @Override
                public void run() {
                    for(int i=0;i<10000;i++){
                        increment2();//a++采用Mutex进行同步处理
                    }
                    try {
                        barrier.await();//等30个线程累加完毕
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }).start();
        }
        barrier.await();
        System.out.println("加锁后,a="+a);
    }
    /**
     * 没有同步措施的a++
     * @return
     */
    public static void increment1(){
        a++;
    }
    /**
     * 使用自定义的Mutex进行同步处理的a++
     */
    public static void increment2(){
        mutex.lock();
        a++;
        mutex.unlock();
    }
}

测试结果:

加锁前,a=279204
加锁后,a=300000

源码分析

我们先来简单描述下AQS的基本实现,前面我们提到过,AQS维护一个共享资源state,通过内置的FIFO来完成获取资源线程的排队工作。(这个内置的同步队列称为”CLH”队列)。该队列由一个一个的Node结点组成,每个Node结点维护一个prev引用和next引用,分别指向自己的前驱和后继结点。AQS维护两个指针,分别指向队列头部head和尾部tail。

undefined

总结

关于AQS的介绍及源码分析到此为止了。

AQS是JUC中很多同步组件的构建基础,简单来讲,它内部实现主要是状态变量state和一个FIFO队列来完成,同步队列的头结点是当前获取到同步状态的结点,获取同步状态state失败的线程,会被构造成一个结点(或共享式或独占式)加入到同步队列尾部(采用自旋CAS来保证此操作的线程安全),随后线程会阻塞;释放时唤醒头结点的后继结点,使其加入对同步状态的争夺中。

AQS为我们定义好了顶层的处理实现逻辑,我们在使用AQS构建符合我们需求的同步组件时,只需重写tryAcquire,tryAcquireShared,tryRelease,tryReleaseShared几个方法,来决定同步状态的释放和获取即可,至于背后复杂的线程排队,线程阻塞/唤醒,如何保证线程安全,都由AQS为我们完成了,这也是非常典型的模板方法的应用。AQS定义好顶级逻辑的骨架,并提取出公用的线程入队列/出队列,阻塞/唤醒等一系列复杂逻辑的实现,将部分简单的可由使用者决定的操作逻辑延迟到子类中去实现。