大纲
1.FutureTask(Future/Callable)的使用例子
2.FutureTask(Future/Callable)的实现原理
3.FutureTask(Future/Callable)的源码分析
4.CompletableFuture的基本介绍
5.CompletionStage方法及作用说明
6.CompletableFuture的实现原理分析
7.CompletableFuture的核心源码分析
1.FutureTask(Future/Callable)的使用例子
Future/Callable实现了一个异步执行并带有返回结果的功能。Future表示获取一个异步执行的结果,Callable表示一个异步执行的任务,Callable会产生一个结果并给到Future。
Future/Callable的使用例子如下:
public class FutureCallableExample {
static class CalculationCallable implements Callable {
private int x;
private int y;
public CalculationCallable(int x, int y) {
this.x = x;
this.y = y;
}
@Override
public Integer call() throws Exception {
System.out.println("开始执行:" + new Date());
TimeUnit.SECONDS.sleep(2);//模拟任务执行的耗时
return x + y;
}
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
CalculationCallable calculationCallable = new CalculationCallable(1, 2);
FutureTask futureTask = new FutureTask<>(calculationCallable);
new Thread(futureTask).start();
System.out.println("开始执行futureTask:" + new Date());
Integer rs = futureTask.get();
System.out.println("执行结果:" + rs);
System.out.println("结束执行futureTask:" + new Date());
}
}
首先定义一个CalculationCallable类。该类实现了Callable接口,并重写了call()方法,它的功能就是定义一个具有返回值的任务。
然后用FutureTask声明一个带有返回值的任务,把CalculationCallable作为构造参数传递进去。
FutureTask实现了Future接口和Runnable接口。我们知道线程执行完之后是不可能获得一个返回值的。Future之所以能够获得返回值,是因为在线程执行中做了相关处理。FutureTask就是用来获得线程执行结果的。
接着把FutureTask作为一个任务传入Thread的构造方法,让线程去执行。FutureTask既然实现了Runnable接口,创建FutureTask时又把实现了Callable接口的任务传递到其构造方法中,那么FutureTask的run()方法中会调用Callable接口的call()方法的实现,最终在获得返回值之后保存到某个属性中。
最后使用FutureTask.get()方法来获得返回值,这个get()方法是个阻塞方法。当线程还没有执行完FutureTask之前,主线程会阻塞在get()方法中。直到FutureTask执行结束,主线程才会被唤醒。
2.FutureTask(Future/Callable)的实现原理
(1)FutureTask的类关系
(2)FutureTask的实现核心
(1)FutureTask的类关系
Runnable接口的实现可以被线程执行,Future接口提供了获取线程执行结果的方法。RunnableFuture接口同时继承了Runnable接口和Future接口,而FutureTask类则实现了RunnableFuture接口。
创建FutureTask类实例时,会传入Callable接口的实现类实例作为构造参数,也就是FutureTask类会封装Callable接口的实现类。这样在启动线程后执行FutureTask类重写Runnable接口的run()方法时,FutureTask类实例就会把执行Callable接口call()方法的运行结果保存起来,然后通过Future接口提供的get()方法来获取运行结果。
一.FutureTask的类关系源码
//A cancellable asynchronous computation.
//This class provides a base implementation of Future, with methods to start and cancel a computation,
//query to see if the computation is complete, and retrieve the result of the computation.
//The result can only be retrieved when the computation has completed;
//the get methods will block if the computation has not yet completed.
//Once the computation has completed, the computation cannot be restarted or cancelled
//(unless the computation is invoked using #runAndReset).
//A FutureTask can be used to wrap a Callable or Runnable object.
//Because FutureTask implements Runnable,
//a FutureTask can be submitted to an Executor for execution.
//In addition to serving as a standalone class,
//this class provides protected functionality that may be useful when creating customized task classes.
public class FutureTask implements RunnableFuture {
...
//Creates a FutureTask that will, upon running, execute the given Callable.
public FutureTask(Callable callable) {
if (callable == null) {
throw new NullPointerException();
}
this.callable = callable;
this.state = NEW;//ensure visibility of callable
}
...
}
//A Future that is Runnable.
//Successful execution of the run method causes completion of the Future and allows access to its results.
public interface RunnableFuture extends Runnable, Future {
//Sets this Future to the result of its computation unless it has been cancelled.
void run();
}
//A Future represents the result of an asynchronous computation.
//Methods are provided to check if the computation is complete,
//to wait for its completion, and to retrieve the result of the computation.
//The result can only be retrieved using method get when the computation has completed,
//blocking if necessary until it is ready.
//Cancellation is performed by the cancel method.
//Additional methods are provided to determine if the task completed normally or was cancelled.
//Once a computation has completed, the computation cannot be cancelled.
//If you would like to use a Future for the sake of cancellability but not provide a usable result,
//you can declare types of the form Future> and return null as a result of the underlying task.
public interface Future {
//用来取消任务,取消成功则返回true,取消失败则返回false
//mayInterruptIfRunning参数表示是否允许取消正在执行却没有执行完毕的任务,设为true,则表示可以取消正在执行过程中的任务
//如果任务已完成,则无论mayInterruptIfRunning为true还是false,此方法都返回false,即如果取消已经完成的任务会返回false
//如果任务正在执行,若mayInterruptIfRunning设置为true,则返回true,若mayInterruptIfRunning设置为false,则返回false
//如果任务还没有执行,则无论mayInterruptIfRunning为true还是false,肯定返回true
boolean cancel(boolean mayInterruptIfRunning);
//表示任务是否被取消成功,如果在任务正常完成前被取消成功,则返回true
boolean isCancelled();
//表示任务是否已经完成,若任务完成,则返回true
boolean isDone();
//获取执行结果,如果最终结果还没得出该方法会产生阻塞,直到任务执行完毕返回结果
V get() throws InterruptedException, ExecutionException;
//获取执行结果,如果在指定时间内,还没获取到结果,则抛出TimeoutException
V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}
@FunctionalInterface
public interface Runnable {
public abstract void run();
}
二.FutureTask的类关系图
(2)FutureTask的实现核心
一.FutureTask本身是一个线程
通过new Thread(new FutureTask(callable)).start()来启动,必然会执行FutureTask实现Runnable接口的run()方法。
二.Runnable接口的run()方法是没有返回值的
实际上返回值是由Callable接口的call()方法提供,所以调用FutureTask的run()方法,会触发调用Callable的call()方法。
三.通过Future接口的get()方法阻塞式获得返回值
如果在FutureTask的run()方法中调用Callable接口的call()方法执行任务时,需要比较长的时间,那么为了能够正确获得返回值,Future接口的get()方法必须实现阻塞,直到call()方法执行完毕。
四.需要一个队列来保存阻塞的线程
涉及线程阻塞和唤醒,要使用LockSupport来阻塞和唤醒队列中的线程。
3.FutureTask(Future/Callable)的源码分析
(1)FutureTask的核心属性
(2)FutureTask的run()方法
(3)FutureTask的get()方法
(4)FutureTask的finishCompletion()方法
(5)FutureTask的实现原理总结
(1)FutureTask的核心属性
一.state
代表任务在运行过程中的状态(7种)。
二.callable
当前要执行的任务。
三.outcome
任务的执行结果,通过Future.get()获取的值。
四.runner
当前执行callable任务的线程。
五.waiter
用来保存所有等待任务执行结束的线程的单向链表。
public class FutureTask implements RunnableFuture {
//The run state of this task, initially NEW.
//The run state transitions to a terminal state only in methods set, setException, and cancel.
//During completion, state may take on transient values of COMPLETING (while outcome is being set)
//or INTERRUPTING (only while interrupting the runner to satisfy a cancel(true)).
//Transitions from these intermediate to final states use cheaper ordered/lazy writes
//because values are unique and cannot be further modified.
//Possible state transitions:
//NEW(初始状态) -> COMPLETING(正在设置任务结果) -> NORMAL,这是任务正常执行完毕时状态的变更流程
//NEW(初始状态) -> COMPLETING(正在设置任务结果) -> EXCEPTIONAL,这是任务执行异常时状态的变更流程
//NEW(初始状态) -> CANCELLED(任务被取消),这是调用了Future.cancel()方法
//NEW(初始状态) -> INTERRUPTING(正在中断执行任务的线程) -> INTERRUPTED(任务被中断)
//代表任务在运行过程中的状态(7种)
private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
//The underlying callable; nulled out after running
//当前要执行的任务
private Callable callable;
//The result to return or exception to throw from get()
//任务的执行结果,通过Future.get()获取的值
private Object outcome;
//The thread running the callable; CASed during run()
//当前执行callable任务的线程
private volatile Thread runner;
//Treiber stack of waiting threads
//用来保存所有等待任务执行结束的线程的单向链表
private volatile WaitNode waiters;
...
}
(2)FutureTask的run()方法
使用线程来执行FutureTask任务时,比如new Thread(new FutureTask(callable)).start(),会回调FutureTask的run()方法。
FutureTask的run()方法的执行流程如下:
首先判断当前状态是否为NEW,并使用CAS设置runner属性为当前线程。如果当前状态不是NEW或者CAS设置失败,则说明已经有其他线程正在执行当前任务了,于是直接返回。然后获取通过构造方法传入的Callable接口的实现类实例callable,接着调用Callable接口的实现类实例callable中的call()方法获得执行结果,最后调用FutureTask的set()方法把执行结果保存到outcome属性中。
public class FutureTask implements RunnableFuture {
...
//代表任务在运行过程中的状态(7种)
private volatile int state;
//当前要执行的任务
private Callable callable;
//任务的执行结果,通过Future.get()获取的值
private Object outcome;
//当前执行callable任务的线程
private volatile Thread runner;
//用来保存所有等待任务执行结束的线程的单向链表
private volatile WaitNode waiters;
private static final sun.misc.Unsafe UNSAFE;
private static final long stateOffset;
private static final long runnerOffset;
private static final long waitersOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class> k = FutureTask.class;
stateOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("state"));
runnerOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("runner"));
waitersOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("waiters"));
} catch (Exception e) {
throw new Error(e);
}
}
public void run() {
//首先判断当前状态是否为NEW,并使用CAS把runner属性设置为当前线程
//如果当前状态不是NEW或者CAS设置失败,说明已经有其他线程正在执行当前任务了,于是直接返回
if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) {
return;
}
try {
//获取通过构造方法传入的Callable接口的实现类实例callable
Callable c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
//然后调用callable中的call()方法获得执行结果
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran) {
//调用set()方法把执行结果保存到outcome属性中
set(result);
}
}
} finally {
//runner must be non-null until state is settled to prevent concurrent calls to run()
runner = null;
//state must be re-read after nulling runner to prevent leaked interrupts
int s = state;
if (s >= INTERRUPTING) {
handlePossibleCancellationInterrupt(s);
}
}
}
//Sets the result of this future to the given value unless this future has already been set or has been cancelled.
//This method is invoked internally by the run method upon successful completion of the computation.
protected void set(V v) {
//CAS修改任务状态为COMPLETING
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
//把调用call()方法获取到的结果保存到outcome
outcome = v;
//CAS修改任务状态为NORMAL
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();
}
}
...
}
(3)FutureTask的get()方法
FutureTask的get()方法的逻辑很简单,如果当前状态不是COMPLETING,就调用awaitDone()方法让当前线程阻塞等待,直到任务执行完成。其中awaitDone()方法的返回值表示任务的状态,当任务进入终止状态后,会调用reports()方法,根据状态类型来决定是返回运行结果还是抛异常。
在FutureTask的awaitDone()方法中会进行自旋。首先如果检测到线程被中断,则把加入等待队列中的线程移除。然后如果发现任务已经进入终止状态,则直接返回任务状态。如果任务正在设置执行结果,则通过Thread.yield()让出当前线程的CPU资源。
当FutureTask.awaitDone()方法第一次调用时,在第一次for循环中会初始化一个WaitNode结点,这个WaitNode结点便保存了调用FutureTask.get()方法的线程。在第二次for循环中会通过CAS按头插法将WaitNode结点插入waiters链表。在之后的for循环中,也就是当前线程已经加入了等待队列后,如果发现任务还没有执行完成,则通过LockSupport的方法阻塞线程。
注意,被阻塞的线程在如下两种情况下会被唤醒:
一.任务执行完成后,在set()方法中调用finishCompletion()方法
二.线程被中断,在awaitDone()方法中执行中断检测if(Thread.interrupted())
public class FutureTask implements RunnableFuture {
...
//代表任务在运行过程中的状态(7种)
private volatile int state;
//当前要执行的任务
private Callable callable;
//任务的执行结果,通过Future.get()获取的值
private Object outcome;
//当前执行callable任务的线程
private volatile Thread runner;
//用来保存所有等待任务执行结束的线程的单向链表
private volatile WaitNode waiters;
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= completing s='awaitDone(false,' 0l return reports awaits completion or aborts interrupt or timeout. param timed true if use timed waits param nanos time to wait if timed return state upon completion private int awaitdoneboolean timed long nanos throws interruptedexception timed final long deadline='timed' system.nanotime nanos : 0l waitnode q='null;' boolean queued='false;' for if thread.interrupted removewaiterq throw new interruptedexception int s='state;' if s> COMPLETING) {
if (q != null) {
q.thread = null;
}
return s;
} else if (s == COMPLETING) {// cannot time out yet
//如果任务正在设置执行结果,则通过Thread.yield()让出当前线程的CPU资源
Thread.yield();
} else if (q == null) {
//awaitDone()方法被第一次调用时,q == null为true
//此时会初始化一个WaitNode结点并赋值给q,这个WaitNode结点保存了调用FutureTask.get()的线程
q = new WaitNode();
} else if (!queued) {
//awaitDone()方法被第一次调用时,进入的第二次for循环
//便会通过CAS将q结点按头插法插入waiters单向链表中
queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q);
} else if (timed) {
//如果当前线程加入等待队列后,任务还没有执行完成,则通过LockSupport的方法阻塞线程
nanos = deadline - System.nanoTime();
if (nanos <= 0l removewaiterq return state locksupport.parknanosthis nanos else locksupport.parkthis returns result or throws exception for completed task. param s completed state value suppresswarningsunchecked private v reportint s throws executionexception object x='outcome;' if s='= NORMAL)' return vx if s>= CANCELLED) {
throw new CancellationException();
}
throw new ExecutionException((Throwable)x);
}
static final class WaitNode {
volatile Thread thread;
volatile WaitNode next;
WaitNode() {
thread = Thread.currentThread();
}
}
private void removeWaiter(WaitNode node) {
if (node != null) {
node.thread = null;
retry:
for (;;) {//restart on removeWaiter race
for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
s = q.next;
if (q.thread != null) {
pred = q;
} else if (pred != null) {
pred.next = s;
if (pred.thread == null) {// check for race
continue retry;
}
} else if (!UNSAFE.compareAndSwapObject(this, waitersOffset, q, s)) {
continue retry;
}
}
break;
}
}
}
...
}
(4)FutureTask的finishCompletion()方法
当Callable任务执行完成后,FutureTask的set()方法会调用finishCompletion()方法唤醒链表中的阻塞线程。
public class FutureTask implements RunnableFuture {
...
//代表任务在运行过程中的状态(7种)
private volatile int state;
//当前要执行的任务
private Callable callable;
//任务的执行结果,通过Future.get()获取的值
private Object outcome;
//当前执行callable任务的线程
private volatile Thread runner;
//用来保存所有等待任务执行结束的线程的单向链表
private volatile WaitNode waiters;
...
//Removes and signals all waiting threads, invokes done(), and nulls out callable.
private void finishCompletion() {
for (WaitNode q; (q = waiters) != null;) {
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null) {
break;
}
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
done();
callable = null;
}
...
}
(5)FutureTask的实现原理总结
FutureTask实现了Runnable和Future接口,FutureTask表示一个带有状态及执行结果的任务,而任务执行结果的获取是基于阻塞的方式来实现的。
在Callable接口的call()方法没有返回结果之前,其他线程调用FutureTask的get()方法获取结果时,FutureTask会构建一个waiters链表,把当前线程存储到链表中并通过LockSupport进行阻塞,直到call()方法返回后把结果设置到outcome属性以及唤醒阻塞的线程。
(6)FutureTask的局限性
局限性一:
在获取异步任务的执行结果时,要么调用get()方法阻塞等待返回结果,要么耗费CPU资源通过轮询调用FutureTask.isDone()方法来判断任务的执行状态,然后再调用get()方法获取返回结果。
局限性二:
FutureTask没有提供通知机制,没有办法知道任务什么时候执行完成。
4.CompletableFuture的基本介绍
(1)CompletableFuture的介绍
(2)CompletableFuture的类关系图
(3)CompletableFuture的方法说明
(1)CompletableFuture的介绍
CompletableFuture针对Future做了改进,也就是在异步任务执行完成后,主线程如果需要依赖该任务的执行结果来继续后面的操作,则可以不用通过等待来实现,只需向CompletableFuture传入一个回调对象。当异步任务执行完毕后,便会自动调用该回调对象(异步回调通知功能)。
CompletableFuture还提供了非常强大的功能。对于回调对象的执行,可以放到非任务线程中,也可以放到任务线程中。CompletableFuture提供了函数式编程能力,简化了异步编程的复杂性。还提供了多个CompletableFuture的组合与转化功能。
(2)CompletableFuture的类关系
CompletableFuture类实现了Future和CompletionStage这两个接口,其中Future接口提供了获取任务执行结果及任务执行状态的功能,CompletionStage接口表示任务执行的一个阶段。CompletionStage接口定义了很多方法,比如thenApply()、thenAccept()等。通过这些方法可以实现多个任务之间的时序关系,比如串行、并行、聚合等。
因此CompletableFuture既提供了Future阻塞式获取结果 + 任务状态的功能,也提供了CompletionStage的任务执行后触发回调 + 多个任务聚合的功能。
(3)CompletableFuture的方法说明
一.构建CompletableFuture的静态方法
二.runAsync()和supplyAsync()静态方法
三.allOf()和anyOf()静态方法
四.主动获取任务执行结果的方法
一.构建CompletableFuture的静态方法
CompletableFuture提供了4个静态方法来构建一个异步事件。由于传递进CompletableFuture这4个方法的任务需要异步执行,所以默认会使用ForkJoinPool.commonPool()提供的线程池来执行异步任务,当然也可以自定义一个线程池传入这些静态方法来执行异步任务。
//A Future that may be explicitly completed (setting its value and status),
//and may be used as a CompletionStage,
//supporting dependent functions and actions that trigger upon its completion.
public class CompletableFuture implements Future, CompletionStage {
...
//Returns a new CompletableFuture that is asynchronously completed
//by a task running in the ForkJoinPool#commonPool()
//with the value obtained by calling the given Supplier.
//@param supplier a function returning the value to be used to complete the returned CompletableFuture
//@param the function's return type
//@return the new CompletableFuture
//带有返回值的异步执行方法,传入一个函数式接口,返回一个新的CompletableFuture对象
//默认使用ForkJoinPool.commonPool()作为线程池执行异步任务
public static CompletableFuture supplyAsync(Supplier supplier) {
return asyncSupplyStage(asyncPool, supplier);
}
//Returns a new CompletableFuture that is asynchronously completed
//by a task running in the given executor with the value obtained by calling the given Supplier.
//@param supplier a function returning the value to be used to complete the returned CompletableFuture
//@param executor the executor to use for asynchronous execution
//@param the function's return type
//@return the new CompletableFuture
//带有返回值的异步执行方法,传入一个函数式接口 + 一个线程池,返回一个新的CompletableFuture对象
//多了一个Executor参数,表示使用自定义线程池来执行任务
public static CompletableFuture supplyAsync(Supplier supplier, Executor executor) {
return asyncSupplyStage(screenExecutor(executor), supplier);
}
static CompletableFuture asyncSupplyStage(Executor e, Supplier f) {
if (f == null) throw new NullPointerException();
CompletableFuture d = new CompletableFuture();
e.execute(new AsyncSupply(d, f));
return d;
}
//Returns a new CompletableFuture that is asynchronously completed
//by a task running in the ForkJoinPool#commonPool() after it runs the given action.
//@param runnable the action to run before completing the returned CompletableFuture
//@return the new CompletableFuture
//不带返回值的异步执行方法,传入一个Runnable参数,返回一个新的CompletableFuture对象
//默认使用ForkJoinPool.commonPool()作为线程池执行异步任务
public static CompletableFuture runAsync(Runnable runnable) {
return asyncRunStage(asyncPool, runnable);
}
//Returns a new CompletableFuture that is asynchronously completed
//by a task running in the given executor after it runs the given action.
//@param runnable the action to run before completing the returned CompletableFuture
//@param executor the executor to use for asynchronous execution
//@return the new CompletableFuture
//不带返回值的异步执行方法,传入一个Runnable参数 + 一个线程池,返回一个新的CompletableFuture对象
//多了一个Executor参数,表示使用自定义线程池来执行任务
public static CompletableFuture runAsync(Runnable runnable, Executor executor) {
return asyncRunStage(screenExecutor(executor), runnable);
}
static CompletableFuture asyncRunStage(Executor e, Runnable f) {
if (f == null) throw new NullPointerException();
CompletableFuture d = new CompletableFuture();
e.execute(new AsyncRun(d, f));
return d;
}
...
}
二.runAsync()和supplyAsync()静态方法
下面使用runAsync()方法来构建一个异步执行事件,由于runAsync()方法是没有返回值的,所以get()这个阻塞等待任务执行完成的方法返回的还是null。
CompletableFuture cf = CompletableFuture.runAsync(() -> {
System.out.println(Thread.currentThread().getName() + ":异步执行一个任务");
});
cf.get();//阻塞等待任务执行完成
下面使用supplyAsync()方法来构建一个异步执行事件,由于supplyAsync()方法具有返回值,所以get()方法会返回"supplyAsync"。
CompletableFuture cf = CompletableFuture.supplyAsync(() -> {
System.out.println("supplyAsync");
});
cf.get();//阻塞等待任务执行完成
三.allOf()和anyOf()静态方法
allOf()方法接收多个CompletableFuture的无返回值的任务。当所有的任务都执行结束后,返回一个新的CompletableFuture对象。
allOf()方法相当于实现了等待多个任务执行结束后再返回的功能,并且接收的CompletableFuture任务是通过runAsync()方法构建的。之所以无返回值,是因为当多个任务都具有返回值时get()方法不知取哪个。
public class CompletableFutureExample {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture v1 = CompletableFuture.runAsync(() -> {
System.out.println("任务v1没有返回值");
});
CompletableFuture v2 = CompletableFuture.runAsync(() -> {
System.out.println("任务v2没有返回值");
});
//通过join()方法让主线程阻塞等待allOf()方法中的所有任务都执行完成后再继续执行
CompletableFuture.allOf(v1, v2).join();
}
}
anyOf()方法接收多个CompletableFuture的带有返回值的任务。当任何一个任务执行完成后,返回一个新的CompletableFuture对象。
anyOf()方法实现了等待多个任务中任何一个任务执行结束便返回的功能,接收的CompletableFuture任务是通过supplyAsync()方法构建的。
public class CompletableFutureExample {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture v1 = CompletableFuture.supplyAsync(() -> {
return "任务v1的返回值";
});
CompletableFuture v2 = CompletableFuture.supplyAsync(() -> {
return "任务v2的返回值";
});
//通过join()方法让主线程阻塞等待anyOf()方法中的任何一个任务执行完成后再继续执行
CompletableFuture.anyOf(v1, v2).thenAccept(value -> System.out.println(value)).join();
}
}
四.主动获取任务执行结果的方法
由于CompletableFuture实现了Future接口,所以它可以像Future那样主动通过阻塞或轮询的方式来获得执行结果。比如可以通过get()方法阻塞式获取异步任务的执行结果(可中断),比如也可以通过join()方法阻塞式获取异步任务的执行结果(不可中断),此外通过complete()方法可实现线程间的数据传递 + 唤醒被get()阻塞的线程。
public class CompleteMethodExample {
public static class ClientThread implements Runnable {
private CompletableFuture completableFuture;
public ClientThread(CompletableFuture completableFuture) {
this.completableFuture = completableFuture;
}
@Override
public void run() {
log.info(Thread.currentThread().getName() + ":" + completableFuture.get());
}
}
public static void main(String[] args) {
//在ClientThread线程中使用completableFuture.get()获取返回值时,
//由于传入的cf并没有使用runAsync()等方法构建具体的异步任务,
//所以ClientThread线程中的completableFuture.get()方法必然会阻塞;
CompletableFuture cf = new CompletableFuture();
new Thread(new ClientThread(cf)).start();
new Thread(new ClientThread(cf)).start();
System.out.println("此时两个客户端线程正在被get()方法阻塞");
//通过compelete()方法来完成cf任务,并且设置了任务的返回结果为"finish"
cf.complete("finish");//此时会将值为"finish"传入两个线程,并唤醒这两个线程
}
}