1. 背景与简介
Future是Java执行异步任务时的常用接口。我们通常会往ExecutorService
中提交一个Callable/Runnable
并得到一个Future对象,Future对象表示异步计算的结果,支持获取结果,取消计算等操作。在Java提供的Executor框架中,Future的默认实现为java.util.concurrent.FutureTask
。本文针对FutureTask
的源码进行分析与解读。
可以看到,FutureTask实现了RunnableFuture, 而RunnableFuture的JavaDoc对Runnable接口的run方法有了更精确的描述:run方法将该Future设置为计算的结果,除非计算被取消。
2. 源码解读
下面开始针对FutureTask的实现源码进行解读。
2.1 生命周期状态
FutureTask内置一个被volatile修饰的state变量。
按照生命周期的阶段可以分为:
- NEW 初始状态
- COMPLETING 任务已经执行完(正常或者异常),准备赋值结果
- NORMAL 任务已经正常执行完,并已将任务返回值赋值到结果
- EXCEPTIONAL 任务执行失败,并将异常赋值到结果
- CANCELLED 取消
- INTERRUPTING 准备尝试中断执行任务的线程
- INTERRUPTED 对执行任务的线程进行中断(未必中断到)
这里先给出自制的状态流转图。
可以看到NEW为起始状态,而NORMAL, EXCEPTIONAL, CANCELLED, INTERRUPTED这些状态为终止状态,而COMPLETING和INTERRUPTING为中间暂时状态。
2.2 内部结构
Callable callable`
内部封装的Callable对象。如果通过构造函数传的是Runnable对象,FutureTask会通过调用Executors#callable
,把Runnable对象封装成一个callable。Object outcome
用于保存计算结果或者异常信息。volatile Thread runner
用来运行callable的线程。volatile WaitNode waiters
FutureTask中用了Trieber Stack来保存等待的线程。
2.3 run方法
1 | public void run() { |
2.4 get方法
1 | public V get() throws InterruptedException, ExecutionException { |
2.5 cancel方法
1 | public boolean cancel(boolean mayInterruptIfRunning) { |
3. FutureTask存在的问题
至此已经将FutureTask的源码解读分析完毕,在读过源码之后,我个人认为JDK8u111的FutureTask源码存在两个问题,目前还需要进一步确认。
3.1 cancel(true)调用interrupt的线程对象
FutureTask的run方法的进入条件是
1 | state == NEW && UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) |
假设有两个线程A和B调用run方法,线程C调用cancel方法。
时刻1: 线程A和B同时读到state == NEW。
时刻2: 线程A成功对runner变量CAS进入run方法主体。
时刻3: 线程C调用cancel方法,成功将状态CAS为CANCELLED。
时刻4: 线程A调用finally中的runner = null。
时刻5: 线程B开始执行run方法第一句if的后半句,成功将runner变量CAS到线程B。
时刻6: 线程C读到runner为线程B,准备对线程B进行interrupt()
时刻7: 线程A调用handlePossibleCancellationInterrupt等待状态从INTERRUPTING流转至INTERRUPTED。
时刻8: 线程B被中断。
这里的问题是,调用cancel方法的线程C中断的是实质上没有对callable进行call调用的线程B,而线程A还试图防止中断操作逃逸出run方法。
这个东西在Future的JavaDoc上说了很含糊,如下所示:
1 | * true} if the thread executing this mayInterruptIfRunning { |
上面的情况到底线程A和B哪个算是the thread executing this task
说不清。
3.2 内存占用问题
通过阅读源码,发现FutureTask还是存在一个隐形的内存占用问题的,或者按照《Effective Java》上说的应该叫无意识的对象保留。
这个问题就是在FutureTask计算完成后,可能内部用于保存等待线程的栈留有一些已经无用的等待节点。
时刻1: 某线程调用get,已经入等待栈,此时waiters为该线程对应节点。
时刻2: 有大量线程通过调用get试图获取计算结果,get -> awaitDone方法中,经过两轮循环都读到状态是NEW的话,此时它们节点已经被初始化过了,但还没开始入队。
时刻3: 有线程调用run方法,通过run -> set -> finishCompletion,将waiters置为null,并唤醒了已经入栈的那个线程。
时刻4: 调用awaitDone方法的那些线程再试图入队的话,后面循环会发现状态已经是NORMAL了,但是waiters栈此时不为空,而且再也没法被清掉了。
这样下来,该FutureTask内部可能会留有一些的无效节点。具体会留多少实际上取决于那个瞬间有多少线程准备执行以及多少能够成功CAS。
1 | queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); |