百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术文章 > 正文

JUC并发—14.Future模式和异步编程分析一

cac55 2025-03-24 14:17 30 浏览 0 评论

大纲

1.FutureTask(Future/Callable)的使用例子

2.FutureTask(Future/Callable)的实现原理

3.FutureTask(Future/Callable)的源码分析

4.CompletableFuture的基本介绍

5.CompletionStage方法及作用说明

6.CompletableFuture的实现原理分析

7.CompletableFuture的核心源码分析


1.FutureTask(Future/Callable)的使用例子

Future/Callable实现了一个异步执行并带有返回结果的功能。Future表示获取一个异步执行的结果,Callable表示一个异步执行的任务Callable产生一个结果给到Future


Future/Callable的使用例子如下:

public class FutureCallableExample {
    static class CalculationCallable implements Callable {
        private int x;
        private int y;

        public CalculationCallable(int x, int y) {
            this.x = x;
            this.y = y;
        }

        @Override
        public Integer call() throws Exception {
            System.out.println("开始执行:" + new Date());
            TimeUnit.SECONDS.sleep(2);//模拟任务执行的耗时
            return x + y;
        }
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CalculationCallable calculationCallable = new CalculationCallable(1, 2);
        FutureTask futureTask = new FutureTask<>(calculationCallable);
        new Thread(futureTask).start();
        System.out.println("开始执行futureTask:" + new Date());
        Integer rs = futureTask.get();
        System.out.println("执行结果:" + rs);
        System.out.println("结束执行futureTask:" + new Date());
    }
}

首先定义一个CalculationCallable类。该类实现了Callable接口,并重写了call()方法,它的功能就是定义一个具有返回值的任务


然后用FutureTask声明一个带有返回值的任务,把CalculationCallable作为构造参数传递进去。


FutureTask实现了Future接口和Runnable接口。我们知道线程执行完之后是不可能获得一个返回值的。Future之所以能够获得返回值,是因为在线程执行中做了相关处理。FutureTask就是用来获得线程执行结果的。


接着把FutureTask作为一个任务传入Thread的构造方法,让线程去执行。FutureTask既然实现了Runnable接口,创建FutureTask时又把实现了Callable接口的任务传递到其构造方法中,那么FutureTask的run()方法中会调用Callable接口的call()方法的实现,最终在获得返回值之后保存到某个属性中。


最后使用FutureTask.get()方法来获得返回值,这个get()方法是个阻塞方法。当线程还没有执行完FutureTask之前,主线程会阻塞在get()方法中。直到FutureTask执行结束主线程才会被唤醒


2.FutureTask(Future/Callable)的实现原理

(1)FutureTask的类关系

(2)FutureTask的实现核心


(1)FutureTask的类关系

Runnable接口的实现可以被线程执行Future接口提供了获取线程执行结果的方法。RunnableFuture接口同时继承了Runnable接口Future接口,而FutureTask类则实现了RunnableFuture接口。


创建FutureTask类实例时,会传入Callable接口的实现类实例作为构造参数,也就是FutureTask类会封装Callable接口的实现类。这样在启动线程执行FutureTask类重写Runnable接口的run()方法时,FutureTask类实例就会把执行Callable接口call()方法运行结果保存起来,然后通过Future接口提供的get()方法获取运行结果


一.FutureTask的类关系源码

//A cancellable asynchronous computation.
//This class provides a base implementation of Future, with methods to start and cancel a computation, 
//query to see if the computation is complete, and retrieve the result of the computation.
//The result can only be retrieved when the computation has completed; 
//the get methods will block if the computation has not yet completed.
//Once the computation has completed, the computation cannot be restarted or cancelled 
//(unless the computation is invoked using #runAndReset).

//A FutureTask can be used to wrap a Callable or Runnable object.  
//Because FutureTask implements Runnable, 
//a FutureTask can be submitted to an Executor for execution.

//In addition to serving as a standalone class, 
//this class provides protected functionality that may be useful when creating customized task classes.
public class FutureTask implements RunnableFuture {
    ...
    //Creates a FutureTask that will, upon running, execute the given Callable.
    public FutureTask(Callable callable) {
        if (callable == null) {
            throw new NullPointerException();
        }
        this.callable = callable;
        this.state = NEW;//ensure visibility of callable
    }
    ...
}

//A Future that is Runnable.
//Successful execution of the run method causes completion of the Future and allows access to its results.
public interface RunnableFuture extends Runnable, Future {
    //Sets this Future to the result of its computation unless it has been cancelled.
    void run();
}

//A Future represents the result of an asynchronous computation.
//Methods are provided to check if the computation is complete, 
//to wait for its completion, and to retrieve the result of the computation.  
//The result can only be retrieved using method get when the computation has completed, 
//blocking if necessary until it is ready.
//Cancellation is performed by the cancel method.  
//Additional methods are provided to determine if the task completed normally or was cancelled. 
//Once a computation has completed, the computation cannot be cancelled.
//If you would like to use a Future for the sake of cancellability but not provide a usable result, 
//you can declare types of the form Future and return null as a result of the underlying task.
public interface Future {
    //用来取消任务,取消成功则返回true,取消失败则返回false
    //mayInterruptIfRunning参数表示是否允许取消正在执行却没有执行完毕的任务,设为true,则表示可以取消正在执行过程中的任务
    //如果任务已完成,则无论mayInterruptIfRunning为true还是false,此方法都返回false,即如果取消已经完成的任务会返回false
    //如果任务正在执行,若mayInterruptIfRunning设置为true,则返回true,若mayInterruptIfRunning设置为false,则返回false
    //如果任务还没有执行,则无论mayInterruptIfRunning为true还是false,肯定返回true
    boolean cancel(boolean mayInterruptIfRunning);

    //表示任务是否被取消成功,如果在任务正常完成前被取消成功,则返回true
    boolean isCancelled();

    //表示任务是否已经完成,若任务完成,则返回true
    boolean isDone();

    //获取执行结果,如果最终结果还没得出该方法会产生阻塞,直到任务执行完毕返回结果
    V get() throws InterruptedException, ExecutionException;

    //获取执行结果,如果在指定时间内,还没获取到结果,则抛出TimeoutException
    V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}

@FunctionalInterface
public interface Runnable {
    public abstract void run();
}

二.FutureTask的类关系图

(2)FutureTask的实现核心

一.FutureTask本身是一个线程

通过new Thread(new FutureTask(callable)).start()来启动,必然会执行FutureTask实现Runnable接口的run()方法


二.Runnable接口的run()方法是没有返回值的

实际上返回值是由Callable接口的call()方法提供,所以调用FutureTask的run()方法,会触发调用Callable的call()方法


三.通过Future接口的get()方法阻塞式获得返回值

如果在FutureTask的run()方法中调用Callable接口的call()方法执行任务时,需要比较长的时间,那么为了能够正确获得返回值,Future接口的get()方法必须实现阻塞,直到call()方法执行完毕。


四.需要一个队列来保存阻塞的线程

涉及线程阻塞和唤醒,要使用LockSupport来阻塞和唤醒队列中的线程


3.FutureTask(Future/Callable)的源码分析

(1)FutureTask的核心属性

(2)FutureTask的run()方法

(3)FutureTask的get()方法

(4)FutureTask的finishCompletion()方法

(5)FutureTask的实现原理总结


(1)FutureTask的核心属性

一.state

代表任务在运行过程中的状态(7种)。

二.callable

当前要执行的任务。

三.outcome

任务的执行结果,通过Future.get()获取的值。

四.runner

当前执行callable任务的线程。

五.waiter

用来保存所有等待任务执行结束的线程的单向链表。

public class FutureTask implements RunnableFuture {
    //The run state of this task, initially NEW. 
    //The run state transitions to a terminal state only in methods set, setException, and cancel.
    //During completion, state may take on transient values of COMPLETING (while outcome is being set) 
    //or INTERRUPTING (only while interrupting the runner to satisfy a cancel(true)). 
    //Transitions from these intermediate to final states use cheaper ordered/lazy writes 
    //because values are unique and cannot be further modified.
    //Possible state transitions:
    //NEW(初始状态) -> COMPLETING(正在设置任务结果) -> NORMAL,这是任务正常执行完毕时状态的变更流程
    //NEW(初始状态) -> COMPLETING(正在设置任务结果) -> EXCEPTIONAL,这是任务执行异常时状态的变更流程
    //NEW(初始状态) -> CANCELLED(任务被取消),这是调用了Future.cancel()方法
    //NEW(初始状态) -> INTERRUPTING(正在中断执行任务的线程) -> INTERRUPTED(任务被中断)
    //代表任务在运行过程中的状态(7种)
    private volatile int state;
    private static final int NEW          = 0;
    private static final int COMPLETING   = 1;
    private static final int NORMAL       = 2;
    private static final int EXCEPTIONAL  = 3;
    private static final int CANCELLED    = 4;
    private static final int INTERRUPTING = 5;
    private static final int INTERRUPTED  = 6;

    //The underlying callable; nulled out after running
    //当前要执行的任务
    private Callable callable;
    
    //The result to return or exception to throw from get()
    //任务的执行结果,通过Future.get()获取的值
    private Object outcome;
    
    //The thread running the callable; CASed during run()
    //当前执行callable任务的线程
    private volatile Thread runner;
    
    //Treiber stack of waiting threads
    //用来保存所有等待任务执行结束的线程的单向链表
    private volatile WaitNode waiters;
    ...
}

(2)FutureTask的run()方法

使用线程来执行FutureTask任务时,比如new Thread(new FutureTask(callable)).start(),会回调FutureTask的run()方法。


FutureTask的run()方法的执行流程如下:

首先判断当前状态是否为NEW,并使用CAS设置runner属性为当前线程。如果当前状态不是NEW或者CAS设置失败,则说明已经有其他线程正在执行当前任务了,于是直接返回。然后获取通过构造方法传入的Callable接口的实现类实例callable,接着调用Callable接口的实现类实例callable中的call()方法获得执行结果,最后调用FutureTask的set()方法把执行结果保存到outcome属性中。

public class FutureTask implements RunnableFuture {
    ...
    //代表任务在运行过程中的状态(7种)
    private volatile int state;
    
    //当前要执行的任务
    private Callable callable;
    
    //任务的执行结果,通过Future.get()获取的值
    private Object outcome;
    
    //当前执行callable任务的线程
    private volatile Thread runner;
    
    //用来保存所有等待任务执行结束的线程的单向链表
    private volatile WaitNode waiters;
    
    private static final sun.misc.Unsafe UNSAFE;
    private static final long stateOffset;
    private static final long runnerOffset;
    private static final long waitersOffset;
    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class k = FutureTask.class;
            stateOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("state"));
            runnerOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("runner"));
            waitersOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("waiters"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
    
    public void run() {
        //首先判断当前状态是否为NEW,并使用CAS把runner属性设置为当前线程
        //如果当前状态不是NEW或者CAS设置失败,说明已经有其他线程正在执行当前任务了,于是直接返回
        if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) { 
            return;
        }
        try {
            //获取通过构造方法传入的Callable接口的实现类实例callable
            Callable c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    //然后调用callable中的call()方法获得执行结果
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran) {
                    //调用set()方法把执行结果保存到outcome属性中
                    set(result);
                }
            }
        } finally {
            //runner must be non-null until state is settled to prevent concurrent calls to run()
            runner = null;
            //state must be re-read after nulling runner to prevent leaked interrupts
            int s = state;
            if (s >= INTERRUPTING) {
                handlePossibleCancellationInterrupt(s);
            }
        }
    }
    
    //Sets the result of this future to the given value unless this future has already been set or has been cancelled.
    //This method is invoked internally by the run method upon successful completion of the computation.
    protected void set(V v) {
        //CAS修改任务状态为COMPLETING
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            //把调用call()方法获取到的结果保存到outcome
            outcome = v;
            //CAS修改任务状态为NORMAL
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
            finishCompletion();
        }
    }
    ...
}

(3)FutureTask的get()方法

FutureTask的get()方法的逻辑很简单,如果当前状态不是COMPLETING,就调用awaitDone()方法让当前线程阻塞等待,直到任务执行完成。其中awaitDone()方法的返回值表示任务的状态,当任务进入终止状态后,会调用reports()方法,根据状态类型来决定是返回运行结果还是抛异常。


FutureTask的awaitDone()方法中会进行自旋。首先如果检测到线程被中断,则把加入等待队列中的线程移除。然后如果发现任务已经进入终止状态,则直接返回任务状态。如果任务正在设置执行结果,则通过Thread.yield()让出当前线程的CPU资源


当FutureTask.awaitDone()方法第一次调用时,在第一次for循环中会初始化一个WaitNode结点,这个WaitNode结点便保存了调用FutureTask.get()方法的线程。在第二次for循环中会通过CAS按头插法将WaitNode结点插入waiters链表。在之后的for循环中,也就是当前线程已经加入了等待队列后,如果发现任务还没有执行完成,则通过LockSupport的方法阻塞线程


注意,被阻塞的线程在如下两种情况下会被唤醒:

一.任务执行完成后,在set()方法中调用finishCompletion()方法

二.线程被中断,在awaitDone()方法中执行中断检测if(Thread.interrupted())

public class FutureTask implements RunnableFuture {
    ...
    //代表任务在运行过程中的状态(7种)
    private volatile int state;
    //当前要执行的任务
    private Callable callable;
    //任务的执行结果,通过Future.get()获取的值
    private Object outcome;
    //当前执行callable任务的线程
    private volatile Thread runner;
    //用来保存所有等待任务执行结束的线程的单向链表
    private volatile WaitNode waiters;
    
    public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= completing s='awaitDone(false,' 0l return reports awaits completion or aborts interrupt or timeout. param timed true if use timed waits param nanos time to wait if timed return state upon completion private int awaitdoneboolean timed long nanos throws interruptedexception timed final long deadline='timed' system.nanotime nanos : 0l waitnode q='null;' boolean queued='false;' for if thread.interrupted removewaiterq throw new interruptedexception int s='state;' if s> COMPLETING) {
                if (q != null) {
                    q.thread = null;
                }
                return s;
            } else if (s == COMPLETING) {// cannot time out yet
                //如果任务正在设置执行结果,则通过Thread.yield()让出当前线程的CPU资源
                Thread.yield();
            } else if (q == null) {
                //awaitDone()方法被第一次调用时,q == null为true
                //此时会初始化一个WaitNode结点并赋值给q,这个WaitNode结点保存了调用FutureTask.get()的线程 
                q = new WaitNode();
            } else if (!queued) {
                //awaitDone()方法被第一次调用时,进入的第二次for循环
                //便会通过CAS将q结点按头插法插入waiters单向链表中
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q);
            } else if (timed) {
                //如果当前线程加入等待队列后,任务还没有执行完成,则通过LockSupport的方法阻塞线程
                nanos = deadline - System.nanoTime();
                if (nanos <= 0l removewaiterq return state locksupport.parknanosthis nanos else locksupport.parkthis returns result or throws exception for completed task. param s completed state value suppresswarningsunchecked private v reportint s throws executionexception object x='outcome;' if s='= NORMAL)' return vx if s>= CANCELLED) {
            throw new CancellationException();
        }
        throw new ExecutionException((Throwable)x);
    }
    
    static final class WaitNode {
        volatile Thread thread;
        volatile WaitNode next;
        WaitNode() {
            thread = Thread.currentThread();
        }
    }
    
    private void removeWaiter(WaitNode node) {
        if (node != null) {
            node.thread = null;
            retry:
            for (;;) {//restart on removeWaiter race
                for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
                    s = q.next;
                    if (q.thread != null) {
                        pred = q;
                    } else if (pred != null) {
                        pred.next = s;
                        if (pred.thread == null) {// check for race
                            continue retry;
                        }
                    } else if (!UNSAFE.compareAndSwapObject(this, waitersOffset, q, s)) {
                        continue retry;
                    }
                }
                break;
            }
        }
    }
    ...
}

(4)FutureTask的finishCompletion()方法

当Callable任务执行完成后,FutureTask的set()方法会调用finishCompletion()方法唤醒链表中的阻塞线程

public class FutureTask implements RunnableFuture {
    ...
    //代表任务在运行过程中的状态(7种)
    private volatile int state;
    //当前要执行的任务
    private Callable callable;
    //任务的执行结果,通过Future.get()获取的值
    private Object outcome;
    //当前执行callable任务的线程
    private volatile Thread runner;
    //用来保存所有等待任务执行结束的线程的单向链表
    private volatile WaitNode waiters;
    
    ...
    //Removes and signals all waiting threads, invokes done(), and nulls out callable.
    private void finishCompletion() {
        for (WaitNode q; (q = waiters) != null;) {
            if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
                for (;;) {
                    Thread t = q.thread;
                    if (t != null) {
                        q.thread = null;
                        LockSupport.unpark(t);
                    }
                    WaitNode next = q.next;
                    if (next == null) {
                        break;
                    }
                    q.next = null; // unlink to help gc
                    q = next;
                }
                break;
            }
        }
        done();
        callable = null;
    }
    ...
}

(5)FutureTask的实现原理总结

FutureTask实现了Runnable和Future接口,FutureTask表示一个带有状态及执行结果的任务,而任务执行结果的获取是基于阻塞的方式来实现的。


在Callable接口的call()方法没有返回结果之前,其他线程调用FutureTask的get()方法获取结果时,FutureTask会构建一个waiters链表,把当前线程存储到链表中并通过LockSupport进行阻塞,直到call()方法返回后把结果设置到outcome属性以及唤醒阻塞的线程

(6)FutureTask的局限性

局限性一:

在获取异步任务的执行结果时,要么调用get()方法阻塞等待返回结果,要么耗费CPU资源通过轮询调用FutureTask.isDone()方法来判断任务的执行状态,然后再调用get()方法获取返回结果。


局限性二:

FutureTask没有提供通知机制,没有办法知道任务什么时候执行完成。


4.CompletableFuture的基本介绍

(1)CompletableFuture的介绍

(2)CompletableFuture的类关系图

(3)CompletableFuture的方法说明


(1)CompletableFuture的介绍

CompletableFuture针对Future做了改进,也就是在异步任务执行完成后,主线程如果需要依赖该任务的执行结果来继续后面的操作,则可以不用通过等待来实现,只需向CompletableFuture传入一个回调对象。当异步任务执行完毕后,便会自动调用该回调对象(异步回调通知功能)。


CompletableFuture还提供了非常强大的功能。对于回调对象的执行,可以放到非任务线程中,也可以放到任务线程中。CompletableFuture提供了函数式编程能力,简化了异步编程的复杂性。还提供了多个CompletableFuture的组合与转化功能。


(2)CompletableFuture的类关系

CompletableFuture类实现了FutureCompletionStage这两个接口,其中Future接口提供了获取任务执行结果及任务执行状态的功能,CompletionStage接口表示任务执行的一个阶段。CompletionStage接口定义了很多方法,比如thenApply()、thenAccept()等。通过这些方法可以实现多个任务之间的时序关系,比如串行、并行、聚合等。


因此CompletableFuture既提供了Future阻塞式获取结果 + 任务状态的功能,也提供了CompletionStage的任务执行后触发回调 + 多个任务聚合的功能


(3)CompletableFuture的方法说明

一.构建CompletableFuture的静态方法

二.runAsync()和supplyAsync()静态方法

三.allOf()和anyOf()静态方法

四.主动获取任务执行结果的方法


一.构建CompletableFuture的静态方法

CompletableFuture提供了4个静态方法来构建一个异步事件。由于传递进CompletableFuture这4个方法的任务需要异步执行,所以默认会使用ForkJoinPool.commonPool()提供的线程池来执行异步任务,当然也可以自定义一个线程池传入这些静态方法来执行异步任务。

//A Future that may be explicitly completed (setting its value and status), 
//and may be used as a CompletionStage,
//supporting dependent functions and actions that trigger upon its completion.
public class CompletableFuture implements Future, CompletionStage {
    ...
    //Returns a new CompletableFuture that is asynchronously completed 
    //by a task running in the ForkJoinPool#commonPool() 
    //with the value obtained by calling the given Supplier.
    //@param supplier a function returning the value to be used to complete the returned CompletableFuture 
    //@param  the function's return type
    //@return the new CompletableFuture
    //带有返回值的异步执行方法,传入一个函数式接口,返回一个新的CompletableFuture对象
    //默认使用ForkJoinPool.commonPool()作为线程池执行异步任务
    public static  CompletableFuture supplyAsync(Supplier supplier) {
        return asyncSupplyStage(asyncPool, supplier);
    }

    //Returns a new CompletableFuture that is asynchronously completed 
    //by a task running in the given executor with the value obtained by calling the given Supplier.
    //@param supplier a function returning the value to be used to complete the returned CompletableFuture
    //@param executor the executor to use for asynchronous execution
    //@param  the function's return type
    //@return the new CompletableFuture
    //带有返回值的异步执行方法,传入一个函数式接口 + 一个线程池,返回一个新的CompletableFuture对象
    //多了一个Executor参数,表示使用自定义线程池来执行任务
    public static  CompletableFuture supplyAsync(Supplier supplier, Executor executor) {
        return asyncSupplyStage(screenExecutor(executor), supplier);
    }
    
    static  CompletableFuture asyncSupplyStage(Executor e, Supplier f) {
        if (f == null) throw new NullPointerException();
        CompletableFuture d = new CompletableFuture();
        e.execute(new AsyncSupply(d, f));
        return d;
    }

    //Returns a new CompletableFuture that is asynchronously completed
    //by a task running in the ForkJoinPool#commonPool() after it runs the given action.
    //@param runnable the action to run before completing the returned CompletableFuture
    //@return the new CompletableFuture
    //不带返回值的异步执行方法,传入一个Runnable参数,返回一个新的CompletableFuture对象
    //默认使用ForkJoinPool.commonPool()作为线程池执行异步任务
    public static CompletableFuture runAsync(Runnable runnable) {
        return asyncRunStage(asyncPool, runnable);
    }

    //Returns a new CompletableFuture that is asynchronously completed 
    //by a task running in the given executor after it runs the given action.
    //@param runnable the action to run before completing the returned CompletableFuture
    //@param executor the executor to use for asynchronous execution
    //@return the new CompletableFuture
    //不带返回值的异步执行方法,传入一个Runnable参数 + 一个线程池,返回一个新的CompletableFuture对象
    //多了一个Executor参数,表示使用自定义线程池来执行任务
    public static CompletableFuture runAsync(Runnable runnable, Executor executor) {
        return asyncRunStage(screenExecutor(executor), runnable);
    }
    
    static CompletableFuture asyncRunStage(Executor e, Runnable f) {
        if (f == null) throw new NullPointerException();
        CompletableFuture d = new CompletableFuture();
        e.execute(new AsyncRun(d, f));
        return d;
    }
    ...
}

二.runAsync()和supplyAsync()静态方法

下面使用runAsync()方法来构建一个异步执行事件,由于runAsync()方法是没有返回值的,所以get()这个阻塞等待任务执行完成的方法返回的还是null。

CompletableFuture cf = CompletableFuture.runAsync(() -> {
    System.out.println(Thread.currentThread().getName() + ":异步执行一个任务");
});
cf.get();//阻塞等待任务执行完成

下面使用supplyAsync()方法来构建一个异步执行事件,由于supplyAsync()方法具有返回值,所以get()方法会返回"supplyAsync"。

CompletableFuture cf = CompletableFuture.supplyAsync(() -> {
    System.out.println("supplyAsync");
});
cf.get();//阻塞等待任务执行完成

三.allOf()和anyOf()静态方法

allOf()方法接收多个CompletableFuture的无返回值的任务。当所有的任务都执行结束后,返回一个新的CompletableFuture对象。


allOf()方法相当于实现了等待多个任务执行结束后再返回的功能,并且接收的CompletableFuture任务是通过runAsync()方法构建的。之所以无返回值,是因为当多个任务都具有返回值时get()方法不知取哪个。

public class CompletableFutureExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture v1 = CompletableFuture.runAsync(() -> {
            System.out.println("任务v1没有返回值");
        });
        CompletableFuture v2 = CompletableFuture.runAsync(() -> {
            System.out.println("任务v2没有返回值");
        });
        //通过join()方法让主线程阻塞等待allOf()方法中的所有任务都执行完成后再继续执行
        CompletableFuture.allOf(v1, v2).join();
    }
}

anyOf()方法接收多个CompletableFuture的带有返回值的任务。当任何一个任务执行完成后,返回一个新的CompletableFuture对象。


anyOf()方法实现了等待多个任务中任何一个任务执行结束便返回的功能,接收的CompletableFuture任务是通过supplyAsync()方法构建的。

public class CompletableFutureExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture v1 = CompletableFuture.supplyAsync(() -> {
            return "任务v1的返回值";
        });
        CompletableFuture v2 = CompletableFuture.supplyAsync(() -> {
            return "任务v2的返回值";
        });
        //通过join()方法让主线程阻塞等待anyOf()方法中的任何一个任务执行完成后再继续执行
        CompletableFuture.anyOf(v1, v2).thenAccept(value -> System.out.println(value)).join();
    }
}

四.主动获取任务执行结果的方法

由于CompletableFuture实现了Future接口,所以它可以像Future那样主动通过阻塞或轮询的方式来获得执行结果。比如可以通过get()方法阻塞式获取异步任务的执行结果(可中断),比如也可以通过join()方法阻塞式获取异步任务的执行结果(不可中断),此外通过complete()方法可实现线程间的数据传递 + 唤醒被get()阻塞的线程

public class CompleteMethodExample {
    public static class ClientThread implements Runnable {
        private CompletableFuture completableFuture;
        
        public ClientThread(CompletableFuture completableFuture) {
            this.completableFuture = completableFuture;
        }
        
        @Override
        public void run() {
            log.info(Thread.currentThread().getName() + ":" + completableFuture.get());
        }
    }
    
    public static void main(String[] args) {
        //在ClientThread线程中使用completableFuture.get()获取返回值时,
        //由于传入的cf并没有使用runAsync()等方法构建具体的异步任务,
        //所以ClientThread线程中的completableFuture.get()方法必然会阻塞;
        CompletableFuture cf = new CompletableFuture();
        new Thread(new ClientThread(cf)).start();
        new Thread(new ClientThread(cf)).start();
        System.out.println("此时两个客户端线程正在被get()方法阻塞");
        //通过compelete()方法来完成cf任务,并且设置了任务的返回结果为"finish"
        cf.complete("finish");//此时会将值为"finish"传入两个线程,并唤醒这两个线程
    }
}

相关推荐

上网行为管理有什么用,行为审计软件解决方案?

上网行为管理有什么用,行为审计软件解决方案?我们生活在互联网的时代,网络是比较复杂的,有时候经常会出现一些网络违规现象,这时候就可以进行上网行为管理了,现在有很多的公司都会进行上网行为管理,因为这样可...

上网行为管理软件如何监控员工访问网址信息

上网行为管理软件可以通过域之盾软件来监控员工访问的网址信息:主要方式↓1、网页日志记录上网行为管理软件可以通过网页日志记录功能,完整地记录员工在工作中访问的所有网站,包括访问时间、访问页面、访问方式等...

企业如何确保上网行为管理顺利进行?

企业的稳定长久发展离不开对员工的上网行为管理,因为员工的上网行为关乎到工作效率、生产效率、以及重要的数据信息安全问题。一旦有不规范的上网行为,便容易造成企业终端数据泄密事件,对企业造成重大的财产损失以...

员工上网行为监控如何实现?六个妙招!帮你轻松管理员工上网!

员工上网行为不仅关乎个人工作效率,更直接影响到企业的信息安全和整体运营。不当的上网行为,如访问非法网站、泄露公司机密、长时间闲聊等,都可能给企业带来不可估量的损失。因此,监控员工上网行为,不仅是为了提...

终端管理系统规范企业上网行为管理

企业内部经常会遇到不同的上网行为管理问题,如职员在上班时间炒股、打游戏、上网聊天等不正当的上网行为,用U盘、硬盘等移动设备随意拷贝资料,终端资产难以管理等,降低办公效率的同时,也增加了企业内部泄密风...

企业如何进行上网行为管理?_如何管理企业网络

企业如何进行上网行为管理?为了保障网络安全,提高员工工作效率,企业有必要部署上网行为管理。上网行为管理可对企业内部员工的上网行为进行全方位有效管理,保护Web访问安全,降低互联网使用风险,避免机密信息...

演员赵露思的官方微博注销的原因是什么?

根据多方权威媒体报道及平台验证,演员赵露思的微博账号已于2025年8月19日正式注销。目前搜索该账号显示“因用户自行申请关闭,现已无法查看”。这一结果源于她8月13日在直播中宣布的注销决定,当时她直言...

分手传闻仅4月,关晓彤的一张海边亲吻照,撕碎了鹿晗最后的体面

“原来八年真能被四个月的‘海边吻照’一键清空。”热搜上那张模糊的侧脸一贴,评论瞬间爆炸:关晓彤亲的是李昀锐,鹿晗的生日祝福还停留在去年。八年,够让一部剧从开播到大结局,也够让一对顶流情侣把微博背景换成...

计算机基础知识(五)(3)_计算机基础知识100道

四、如何使用计算机4、软件的通用使用方法计算机发展到现在,正常使用时均采用窗口式界面。我们的介绍不涉及苹果机。操作系统从DOS(DOS阶段有两类,一类是微软的,称为MS—DOS;另一类称为PC—DOS...

笔记本的这些基础知识,你再不知道,就真的被社会淘汰了!

请您在阅读前点击上面的“关注”二字,后续会为您提供更多有价值的电脑知识,感谢每一位朋友的支持!这篇文章依然是给大家讲讲关于笔记本电脑的基础知识,新手小白可一定要收藏起来,以后想看的时候可以找出来就看,...

电脑基础知识,引起电脑故障的原因

引起电脑出现故障的原因非常多,概括来说主要包括以下几个方面的问题。操作不当:操作不当是指误删除文件或非法关机等不当操作。操作不当通常会造成电脑程序无法运行或电脑无法启动,修复此类故障,只要将删除或损坏...

笔记本电脑新手使用教程,笔记本电脑使用技巧

学电脑能够快速入门是每个新手梦寐以求的事情,但是不是每个人都能快速入门的。但是如果定制好合理计划,循序渐进,就会收到非常好的效果。今天小编来跟大家说说笔记本电脑新手教程的详细介绍-装机吧,大家一起...

电脑基础知识:(二)电脑主机_电脑主机组成图解

你好!我是麦秋~前言现代生活随着互联网通讯的快速发展,通讯技术日趋完美,应用软件日益普及,电脑的使用与维护则成为人们日常生活中密不可分的、重要的生活内容。电脑恐怕是中老年人的短板,然而又是急需解决的问...

电脑基础知识:(六)电脑应用程序_电脑的应用程序是什么意思

你好!我是麦秋~应用软件是专为解决一些具体问题的软件,是体现电脑用途的部分,种类繁多,如:办公软件、游戏软件、杀毒软件等;自媒体平台上的西瓜、头条、抖音、剪映等,对于上述软件主要是操作和使用上的问题,...

键盘操作方法大全_键盘操作教程

【键盘操作方法大全】键盘可不仅仅能帮我们打字哦,还有很多快捷的操作你都知道吗?除了Ctrl+C、Ctrl+V以外,再多学几种吧,让你用起电脑来十指如飞~别再慢慢用鼠标点了,用开始键+Tab键切换程序让...

取消回复欢迎 发表评论: