这篇,我们的关注点是 AQS 最后的部分,AQS 共享模式的使用。有前两篇文章的铺垫,剩下的源码分析将会简单很多。
本文先用 CountDownLatch 将共享模式说清楚,然后顺着把其他 AQS 相关的类 CyclicBarrier、Semaphore 的源码一起过一下。
相对来说,如果读者有前面两篇文章的基础,这篇文章是简单很多。
CountDownLatch
CountDownLatch 这个类是比较典型的 AQS 的共享模式的使用,这是一个高频使用的类。latch 的中文意思是门栓、栅栏。
使用
我们看下 Doug Lea 在 java doc 中给出的例子,这个例子非常实用,我经常会写到这个代码。
假设我们有 N ( N > 0 ) 个任务,那么我们会用 N 来初始化一个 CountDownLatch,然后将这个 latch 的引用传递到各个线程中,在每个线程完成了任务后,调用 latch.countDown() 代表完成了一个任务。
调用 latch.await() 的方法的线程会阻塞,直到所有的任务完成。
1 | class Driver2 { |
所以说 CountDownLatch 非常实用,我们常常会将一个比较大的任务进行拆分,然后开启多个线程来执行,等所有线程都执行完了以后,再往下执行其他操作。这里例子中,只有 main 线程调用了 await 方法。
我们再来看另一个例子,这个例子很典型,用了两个 CountDownLatch:
1 | class Driver { |
这个例子中,doneSignal 同第一个例子的使用,我们说说这里的 startSignal。N 个新开启的线程都调用了startSignal.await() 进行阻塞等待,它们阻塞在栅栏上,只有当条件满足的时候(startSignal.countDown()),它们才能同时通过这个栅栏,目的是让所有的线程站在一个起跑线上。
"如果始终只有一个线程调用 await 方法等待任务完成,那么 CountDownLatch 就会简单很多,所以之后的源码分析读者一定要在脑海中构建出这么一个场景:有 m 个线程是做任务的,有 n 个线程在某个栅栏上等待这 m 个线程做完任务,直到所有 m 个任务完成后,n 个线程同时通过栅栏。
源码分析
"Talk is cheap, show me the code.
构造方法,需要传入一个不小于 0 的整数:
1 | public CountDownLatch(int count) { |
先分析套路:AQS 里面的 state 是一个整数值,这边用一个 int count 参数其实初始化就是设置了这个值,所有调用了 await 方法的等待线程会挂起,然后有其他一些线程会做 state = state - 1 操作,当 state 减到 0 的同时,那个将 state 减为 0 的线程会负责唤醒 所有调用了 await 方法的线程。都是套路啊,只是 Doug Lea 的套路很深,代码很巧妙,不然我们也没有要分析源码的必要。
对于 CountDownLatch,我们仅仅需要关心两个方法,一个是 countDown() 方法,另一个是 await() 方法。
countDown() 方法每次调用都会将 state 减 1,直到 state 的值为 0;而 await 是一个阻塞方法,当 state 减为 0 的时候,await 方法才会返回。await 可以被多个线程调用,读者这个时候脑子里要有个图:所有调用了 await 方法的线程阻塞在 AQS 的阻塞队列中,等待条件满足(state == 0),将线程从队列中一个个唤醒过来。
我们用以下程序来分析源码,t1 和 t2 负责调用 countDown() 方法,t3 和 t4 调用 await 方法阻塞:
1 | public class CountDownLatchDemo { |
上述程序,大概在过了 10 秒左右的时候,会输出:
1 | 线程 t3 从 await 中返回了 |
这两条输出,顺序不是绝对的
后面的分析,我们假设 t3 先进入阻塞队列
接下来,我们按照流程一步一步走:先 await 等待,然后被唤醒,await 方法返回。
首先,我们来看 await() 方法,它代表线程阻塞,等待 state 的值减为 0。
1 | public void await() throws InterruptedException { |
从方法名我们就可以看出,这个方法是获取共享锁,并且此方法是可中断的(中断的时候抛出 InterruptedException 退出这个方法)。
1 | private void doAcquireSharedInterruptibly(int arg) |
我们来仔细分析这个方法,线程 t3 经过第 1 步 addWaiter 入队以后,我们应该可以得到这个:
"由于 tryAcquireShared 这个方法会返回 -1,所以 if (r >= 0) 这个分支不会进去。到 shouldParkAfterFailedAcquire 的时候,t3 将 head 的 waitStatus 值设置为 -1,如下:
"然后进入到 parkAndCheckInterrupt 的时候,t3 挂起。
我们再分析 t4 入队,t4 会将前驱节点 t3 所在节点的 waitStatus 设置为 -1,t4 入队后,应该是这样的:
"然后,t4 也挂起。接下来,t3 和 t4 就等待唤醒了。
接下来,我们来看唤醒的流程。为了让下面的示意图更丰富些,我们假设用 10 初始化 CountDownLatch。
"当然,我们的例子中,其实没有 10 个线程,只有 2 个线程 t1 和 t2,只是为了让图好看些罢了。
我们再一步步看具体的流程。首先,我们看 countDown() 方法:
1 | public void countDown() { |
countDown 方法就是每次调用都将 state 值减 1,如果 state 减到 0 了,那么就调用下面的方法进行唤醒阻塞队列中的线程:
1 | // 调用这个方法的时候,state == 0 |
一旦 t3 被唤醒后,我们继续回到 await 的这段代码,parkAndCheckInterrupt 返回,我们先不考虑中断的情况:
1 | private void doAcquireSharedInterruptibly(int arg) |
接下来,t3 会进到 setHeadAndPropagate(node, r) 这个方法,先把 head 给占了,然后唤醒队列中其他的线程:
1 | private void setHeadAndPropagate(Node node, int propagate) { |
又回到这个方法了,那么接下来,我们好好分析 doReleaseShared 这个方法,我们根据流程,头节点 head 此时是 t3 节点了:
1 | /** |
为什么h==head
退出循环?
- 在head没有变化的情况下,再次循环就没有意义,而且会导致死循环
- 如果head发生了变化,那么就可以唤醒新head的后继节点
我们分析下最后一个 if 语句,然后才能解释第一个 CAS 为什么可能会失败:
- h == head:说明头节点还没有被刚刚用 unparkSuccessor 唤醒的线程(这里可以理解为 t4)占有,此时 break 退出循环。
- h != head:头节点被刚刚唤醒的线程(这里可以理解为 t4)占有,那么这里重新进入下一轮循环,唤醒下一个节点(这里是 t4 )。我们知道,等到 t4 被唤醒后,其实是会主动唤醒 t5、t6、t7…,那为什么这里要进行下一个循环来唤醒 t5 呢?我觉得是出于吞吐量的考虑。
满足上面的 2 的场景,那么我们就能知道为什么上面的 CAS 操作 compareAndSetWaitStatus(h, Node.SIGNAL, 0) 会失败了?
因为当前进行 for 循环的线程到这里的时候,可能刚刚唤醒的线程 t4 也刚刚好到这里了,那么就有可能 CAS 失败了。
for 循环第一轮的时候会唤醒 t4,t4 醒后会将自己设置为头节点,如果在 t4 设置头节点后,for 循环才跑到 if (h == head),那么此时会返回 false,for 循环会进入下一轮。t4 唤醒后也会进入到这个方法里面,那么 for 循环第二轮和 t4 就有可能在这个 CAS 相遇,那么就只会有一个成功了。
PROPAGATE状态存在的意义
CyclicBarrier
字面意思是“可重复使用的栅栏”或“周期性的栅栏”,总之不是用了一次就没用了的,CyclicBarrier 相比 CountDownLatch 来说,要简单很多,其源码没有什么高深的地方,它是 ReentrantLock 和 Condition 的组合使用。看如下示意图,CyclicBarrier 和 CountDownLatch 是不是很像,只是 CyclicBarrier 可以有不止一个栅栏,因为它的栅栏(Barrier)可以重复使用(Cyclic)。
"首先,CyclicBarrier 的源码实现和 CountDownLatch 大相径庭,CountDownLatch 基于 AQS 的共享模式的使用,而 CyclicBarrier 基于 Condition 来实现。
因为 CyclicBarrier 的源码相对来说简单许多,读者只要熟悉了前面关于 Condition 的分析,那么这里的源码是毫无压力的,就是几个特殊概念罢了。
先用一张图来描绘下 CyclicBarrier 里面的一些概念,和它的基本使用流程:
"看图我们也知道了,CyclicBarrier 的源码最重要的就是 await() 方法了。
大家先把图看完,然后我们开始源码分析:
1 | public class CyclicBarrier { |
首先,先看怎么开启新的一代:
1 | // 开启新的一代,当最后一个线程到达栅栏上的时候,调用这个方法来唤醒其他线程,同时初始化“下一代” |
开启新的一代,类似于重新实例化一个 CyclicBarrier 实例
看看怎么打破一个栅栏:
1 | private void breakBarrier() { |
这两个方法之后用得到,现在开始分析最重要的等待通过栅栏方法 await 方法:
1 | // 不带超时机制 |
继续往里看:
1 | private int dowait(boolean timed, long nanos) |
好了,我想我应该讲清楚了吧,我好像几乎没有漏掉任何一行代码吧?
下面开始收尾工作。
首先,我们看看怎么得到有多少个线程到了栅栏上,处于等待状态:
1 | public int getNumberWaiting() { |
判断一个栅栏是否被打破了,这个很简单,直接看 broken 的值即可:
1 | public boolean isBroken() { |
前面我们在说 await 的时候也几乎说清楚了,什么时候栅栏会被打破,总结如下:
- 中断,我们说了,如果某个等待的线程发生了中断,那么会打破栅栏,同时抛出 InterruptedException 异常;
- 超时,打破栅栏,同时抛出 TimeoutException 异常;
- 指定执行的操作抛出了异常,这个我们前面也说过。
最后,我们来看看怎么重置一个栅栏:
1 | public void reset() { |
我们设想一下,如果初始化时,指定了线程 parties = 4,前面有 3 个线程调用了 await 等待,在第 4 个线程调用 await 之前,我们调用 reset 方法,那么会发生什么?
首先,打破栅栏,那意味着所有等待的线程(3个等待的线程)会唤醒,await 方法会通过抛出 BrokenBarrierException 异常返回。然后开启新的一代,重置了 count 和 generation,相当于一切归零了。
怎么样,CyclicBarrier 源码很简单吧。
Semaphore
有了 CountDownLatch 的基础后,分析 Semaphore 会简单很多。Semaphore 是什么呢?它类似一个资源池(读者可以类比线程池),每个线程需要调用 acquire() 方法获取资源,然后才能执行,执行完后,需要 release 资源,让给其他的线程用。
大概大家也可以猜到,Semaphore 其实也是 AQS 中共享锁的使用,因为每个线程共享一个池嘛。
套路解读:创建 Semaphore 实例的时候,需要一个参数 permits,这个基本上可以确定是设置给 AQS 的 state 的,然后每个线程调用 acquire 的时候,执行 state = state - 1,release 的时候执行 state = state + 1,当然,acquire 的时候,如果 state = 0,说明没有资源了,需要等待其他线程 release。
构造方法:
1 | public Semaphore(int permits) { |
这里和 ReentrantLock 类似,用了公平策略和非公平策略。
看 acquire 方法:
1 | public void acquire() throws InterruptedException { |
这几个方法也是老套路了,大家基本都懂了吧,这边多了两个可以传参的 acquire 方法,不过大家也都懂的吧,如果我们需要一次获取超过一个的资源,会用得着这个的。
我们接下来看不抛出 InterruptedException 异常的 acquireUninterruptibly() 方法吧:
1 | public void acquireUninterruptibly() { |
前面说了,Semaphore 分公平策略和非公平策略,我们对比一下两个 tryAcquireShared 方法:
1 | // 公平策略: |
也是老套路了,所以从源码分析角度的话,我们其实不太需要关心是不是公平策略还是非公平策略,它们的区别往往就那么一两行。
我们再回到 acquireShared 方法,
1 | public final void acquireShared(int arg) { |
由于 tryAcquireShared(arg) 返回小于 0 的时候,说明 state 已经小于 0 了(没资源了),此时 acquire 不能立马拿到资源,需要进入到阻塞队列等待,虽然贴了很多代码,不在乎多这点了:
1 | private void doAcquireShared(int arg) { |
这个方法我就不介绍了,线程挂起后等待有资源被 release 出来。接下来,我们就要看 release 的方法了:
1 | // 任务介绍,释放一个资源 |
tryReleaseShared 方法总是会返回 true,然后是 doReleaseShared,这个也是我们熟悉的方法了,我就贴下代码,不分析了,这个方法用于唤醒所有的等待线程:
1 | private void doReleaseShared() { |
Semphore 的源码确实很简单,基本上都是分析过的老代码的组合使用了。
参考
- 分析的彻底:AbstractQueuedSynchronizer源码解读,涉及到PROPAGATE状态存在的意义
- 分析的详细: 逐行分析AQS源码(3)——共享锁的获取与释放
- 原文:一行一行源码分析清楚 AbstractQueuedSynchronizer (三)