八:异步编程

8.1 FutureTask

8.1.1 介绍

FutureTask是一个可以取消异步计算的类。FutureTask是对Future做的一个基本实现,可以调用方法开始和取消一个任务。一般配合Callable去使用。

异步任务启动后,可以获取一个绑定当前异步任务的FutureTask。

可以基于FutureTask的方法区取消任务、查看任务是否结束、获取任务的返回结果。

FutureTask的整体结构中实现了一个RunnableFuture接口,而RunnableFuture接口继承了Runnanle和Future接口。所以FutureTask也可以直接作为任务交给线程池去处理。

8.1.2 应用

FutureTask有两种有参构造。

public FutureTask(Callable<V> callable) {}
public FutureTask(Runnable runnable, V result) {}

一般使用Callable,因为有返回结果。

FutureTask对任务的控制:

  • 任务执行过程中对状态的控制
  • 任务执行完毕之后,对结果的控制

FutureTask的任务在执行run方法之后,是无法再次被运行,需要使用protected类型的runAndReset()方法才可以。

package com.xqm.juc.future;

import java.util.concurrent.*;

public class FutureTask01 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 1.构建FutureTask,基于泛型执行返回结果类型
        // 两种有参构造
        FutureTask futureTask=new FutureTask(new Callable() {
            @Override
            public String call() throws Exception {
                System.out.println("使用Callable有参构造开始任务");
                TimeUnit.SECONDS.sleep(2);
                System.out.println("任务执行完毕");
                return "ok";
            }
        });
        // 2.构建线程池
        ExecutorService threadPool=Executors.newFixedThreadPool(2);
        // 3.执行任务
        threadPool.execute(futureTask);


        // run方法,futureTask提供了run方法,
        // 但是一般不会自己去调用,而是线程池执行任务时,由线程池去执行run方法
        // run方法在执行时,是有任务状态的,任务执行后,再次执行run方法时无效的,任务只能执行一次
        Thread.sleep(500);
        futureTask.run();

        // 如果希望任务被反复执行,需要调用runAndReset()方法



        // 获取任务的执行结果,类似阻塞队列的take()方法,会一直死等,直到任务结束,然后执行下面的任务
        // System.out.println(futureTask.get());

        //  也可以指定阻塞时间,时间到就不等了,会直接抛出TimeoutException异常
        try {
            Object o = futureTask.get(1, TimeUnit.SECONDS);
            System.out.println(o);
        } catch (TimeoutException e) {
            // e.printStackTrace();
            System.out.println("异常终止了");
        }

        // 判断任务是否结束
        System.out.println("任务执行结束了吗?"+futureTask.isDone());

        threadPool.shutdown();

    }
}

8.1.3 源码-状态

清楚任务的流转状态是什么样的,其次要知道核心属性是干嘛用的。

// FutureTask的核心属性
/**
<!-- 任务状态的流转 -->
    * Possible state transitions:
    * NEW -> COMPLETING -> NORMAL  (任务正常执行,并且返回结果也正常返回)
    * NEW -> COMPLETING -> EXCEPTIONAL (任务正常执行,但是结果是异常)
    * NEW -> CANCELLED  (任务被取消)
    * NEW -> INTERRUPTING -> INTERRUPTED (任务被中断)
    */
// 记录任务的状态
private volatile int state;
// 任务被构建之后的初始状态
private static final int NEW          = 0;
private static final int COMPLETING   = 1;
private static final int NORMAL       = 2;
private static final int EXCEPTIONAL  = 3;
private static final int CANCELLED    = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED  = 6;

// 需要执行的任务,会被赋值到这个属性
private Callable<V> callable;
// 任务的返回结果,存储到这个属性
private Object outcome; // non-volatile, protected by state reads/writes
// 执行任务的线程
private volatile Thread runner;
// 等待结果的线程Node对象
private volatile WaitNode waiters;

8.1.4 源码-构造函数

一共有两种构造函数。

其实构造函数就是给类的属性赋值,一个是任务,一个是状态赋值为NEW.

public FutureTask(Callable<V> callable) {
        // 健壮性校验
        if (callable == null)
            throw new NullPointerException();
    	// 给任务赋值
        this.callable = callable;
    	// 设置初始状态为NEW
        this.state = NEW;       // ensure visibility of callable
    }
public FutureTask(Runnable runnable, V result) {
      	// 将Runnable任务封装成Callable
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;       // ensure visibility of callable
    }

8.1.5 源码-run方法

任务执行前的一些判断,调用方法进行封装结果,以及发生异常做出的后续处理。

// FutureTask的run方法
public void run() {
    // 如果当前任务的状态不是NEW,直接Return退出
    if (state != NEW ||
            // 如果状态正确,是NEW,这里需要基于CAS将runner属性设置为当前线程
            // 如果CAS失败,代表有线程已经拿到这个任务执行了,直接return
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
        return;
    try {
        // 将要执行的任务拿到
        Callable<V> c = callable;
        // 健壮性校验,或者防止某些方式(比如反射)将任务赋值为null
        // 再次判断任务的状态(DCL)
        if (c != null && state == NEW) {
            // 执行任务
            // result:任务的返回结果
            // ran:如果为true,任务正常结束
            // ran:如果为false,任务异常结束
            V result;
            boolean ran;
            try {
                // 执行任务
                result = c.call();
                // 任务正常结束
                ran = true;
            } catch (Throwable ex) {
                // 发生异常
                // 结果置为null
                result = null;
                // 置为false
                ran = false;
                // 封装异常结果
                setException(ex);
            }
            if (ran)
                // 封装正常结果
                set(result);
        }
        //到这任务结束,正常或者异常
    } finally {
        // 将执行任务的线程置为null
        runner = null;
        // 拿到任务的状态
        int s = state;
        // INTERRUPTING:5
        // 判断线程是否被中断
        if (s >= INTERRUPTING)
            // 代表任务中断,做一些处理
            handlePossibleCancellationInterrupt(s);
    }
}

8.1.6 源码-set&&setException

任务执行完毕后,修改任务的状态以及封装任务的状态。

set

// 任务正常结束返回结果
protected void set(V v) {
    // 因为任务执行完毕,需要将任务的状态从NEW,修改为COMPLETING
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        // 将任务的结果赋值给outcome
        outcome = v;
        // 将任务的状态变为NORMAL,正常结束
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
        // 任务结束之后做的操作
        finishCompletion();
    }
}

setException

// 任务发生异常返回的结果
protected void setException(Throwable t) {
    // 因为任务执行完毕,需要将任务的状态从NEW,修改为COMPLETING
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        // 将异常信息赋给outcome
        outcome = t;
        // 将任务的状态变为EXCEPTIONAL,异常结束
        UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
        finishCompletion();
    }
}

8.1.7 源码-cancle方法

任务取消有两种,设置参数为true代表可以中断,设置为false代表不能被中断。

// 任务取消,mayInterruptIfRunning是否需要中断
//  NEW -> CANCELLED  (任务被取消,不能被中断)
//  NEW -> INTERRUPTING -> INTERRUPTED (任务被取消,可以中断)
public boolean cancel(boolean mayInterruptIfRunning) {
    // 任务正在执行中,也是NEW状态
    // 查看任务的状态是否是NEW,基于传入的参数mayInterruptIfRunning
    // 决定任务的状态是直接从NEW修改为INTERRUPTING或者为CANCELLED
    if (!(state == NEW &&
            UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
                                     // 如果为true
                                     mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
        // 修改失败,返回false
        return false;
    try {
        // mayInterruptIfRunning为true,代表需要中断线程
        if (mayInterruptIfRunning) {
            try {
                // 拿到任务线程
                Thread t = runner;
                // 如果线程不为null,直接中断
                if (t != null)
                    t.interrupt();
            } finally { // final state
                // 价格任务状态设为INTERRUPTED
                UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
            }
        }
    } finally {
        finishCompletion();
    }
    return true;
}

8.1.8 源码-get方法

这是线程获取FutureTask任务执行结果的方法。

get方法

public V get() throws InterruptedException, ExecutionException {
    // 获取任务的状态
    int s = state;
    // 小于COMPLETING只有两种
    // 一种是NEW,任务还没执行完
    // 一种是COMPLETING,任务执行完,但是结果还没封装好
    if (s <= COMPLETING)
        // 让当前线程阻塞等待结果
        s = awaitDone(false, 0L);
    // 最终返回结果
    return report(s);
}

awaitDone方法

// 线程等待获取任务结果的方法
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
    // 获取任务需要等待的时间
    // 如果是get无参方法,就死等
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    // 声明Node对象
    WaitNode q = null;
    // queued代表节点是否放到链表中
    boolean queued = false;
    // 死循环
    for (;;) {
        // 查询线程是否中断,如果中断,就从链表中移除,抛出异常
        if (Thread.interrupted()) {
            // 从链表中移除这个节点
            removeWaiter(q);
            // 抛出中断异常
            throw new InterruptedException();
        }
        // 如果线程没中断,获取线程的状态
        int s = state;
        // 到这,说明任务结束了
        if (s > COMPLETING) {
            // 如果之前封装了waitNode,现在则要清空,如果没有,就返回状态
            if (q != null)
                q.thread = null;
            return s;
            // 如果任务状态是COMPLETING,因为从COMPLETING状态到下一个状态速度很快,所以只需要
            // 这个线程暂停一下,等待就行
        } else if (s == COMPLETING) // cannot time out yet
            Thread.yield();
        // 如果还没初始化waitNode,就初始化waitNode
        else if (q == null)
            q = new WaitNode();
        // 如果节点没放到队列中,就放到队列的前面
        else if (!queued)
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                 q.next = waiters, q);
        // 如果timed为true,代表有等待时间
        else if (timed) {
            // 判断是否还有时间
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                // 如果没有时间,就移除等待
                removeWaiter(q);
                // 返回任务的状态
                return state;
            }
            // 等待
            LockSupport.parkNanos(this, nanos);
        } else
            // 死等
            LockSupport.park(this);
    }
}

report方法

// 根据任务的状态,获取任务的结果
private V report(int s) throws ExecutionException {
    // 获取结果
    Object x = outcome;
    if (s == NORMAL)
        // 如果是正常结束,返回结果
        return (V)x;
    // 如果是取消,就抛出取消异常
    if (s >= CANCELLED)
        throw new CancellationException();
    // 否则异常结束,抛出其他异常
    throw new ExecutionException((Throwable)x);
}

8.1.9 源码-finishCompletion方法

只要任务结束,无论是正常返回,还是异常返回,还是任务被取消,都会执行这个方法。

这个方法就是唤醒那些执行get方法等待任务结果的线程。

// 任务结束后触发这个方法,添加到队列中是每次添加在头部
private void finishCompletion() {
    // 任务结束后,需要循环唤醒队列中的线程
    for (WaitNode q; (q = waiters) != null;) {
        // 以CAS的方式将waitNode置为null,循环里已经拿到引用
        if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
            // 死循环
            for (;;) {
                // 拿到Node中的线程
                Thread t = q.thread;
                // 如果线程不为null
                if (t != null) {
                    // 置为null,帮助gc
                    q.thread = null;
                    // 唤醒这个线程
                    LockSupport.unpark(t);
                }
                // 拿到当前Node的next
                WaitNode next = q.next;
                // 如果next为null,代表已经全部唤醒,跳出循环
                if (next == null)
                    break;
                // 将next置为null
                q.next = null; // unlink to help gc
                // q的引用指向next
                q = next;
            }
            break;
        }
    }
    // 任务结束后,预留的接口,可以基于这个扩展方法,记录一些信息
    done();
    // 任务执行完,将任务置为null
    callable = null;        // to reduce footprint
}

8.2 CompletableFuture

8.2.1 介绍

平时多线程开发一般就是使用Runnbale、Thread、Callable、FutureTask、ThreadPoolExecutor等。相对来说成本都不高,使用熟悉就行。但是这些有时候很难完成异步编程的任务。

Thread+Runnable:执行异步任务,但是没有返回结果。

Thread+Callable+FutureTask:执行一个有返回结果的异步任务。但是有两个问题:

  • 获取返回结果,如果是基于get方法返回,线程需要挂起在waitNode里。
  • 获取返回结果,也可以基于isDone判断任务的状态,但是需要不断轮询。

如果业务的逻辑比较复杂,可能需要业务导出阻塞等待,或者需要大量的任务线程去执行一些业务逻辑,开发成本比较高。

CompletableFuture就是帮助处理任务之间的处理逻辑。编排好任务的执行方式后,任务会按照规划好的方式去一步一步执行,不需要让业务线程去频繁的等待。

8.2.2 应用

CompletableFuture提供了三个函数式编程。

Supplier<T>:无参有返回值
Consumer<T>:有参无返回值
Function<T,R>:有参有返回值

8.2.2.1 supplyAsync()

方法:有两个重载方法,一个使用默认线程池,另一个使用自定义线程池。

 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
        return asyncSupplyStage(asyncPool, supplier);
    }
 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor) {
        return asyncSupplyStage(screenExecutor(executor), supplier);
    }

supplyAsync()如果不提供线程池的话,默认使用ForkJoinPool线程池,而ForkJoinPool内部是守护线程,如果main线程结束了,守护线程会跟着一起结束。

获取结果使用的是get()方法和join()方法,两者不同之处在于join()会帮你处理好异常,而get()需要自己处理异常。

package com.xqm.juc.future;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class CompletableFutureTest {
    public static void main(String[] args) throws IOException {
        // supplyAsync()如果不提供线程池的话,默认使用ForkJoinPool线程池,而ForkJoinPool内部是守护线程
        // 如果main线程结束了,守护线程会跟着一起结束
        CompletableFuture<String> firstTask = CompletableFuture.supplyAsync(() -> {
            System.out.println("异步任务开始执行");
            System.out.println("异步任务执行结束");
            return "返回结果";
        });

        // 获取任务结果,不需要处理异常
        String result1 = firstTask.join();

        // 获取任务结果,但是要自己处理异常
        try {
            String result2 = firstTask.get();
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }

        // main线程进行阻塞,为了给CompletableFuture执行任务
        System.in.read();

    }
}

8.2.2.2 runAsync()

方法:有两个重载方法,一个使用默认线程池,另一个使用自定义线程池。如果不提供线程池的话,默认使用ForkJoinPool线程池,而ForkJoinPool内部是守护线程,如果main线程结束了,守护线程会跟着一起结束。

public static CompletableFuture<Void> runAsync(Runnable runnable) {
        return asyncRunStage(asyncPool, runnable);
    }
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor) {
        return asyncRunStage(screenExecutor(executor), runnable);
    }

runAsync()方法既没有参数,也不会返回结果,是非常基础的任务编排方式。

package com.xqm.juc.future;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class CompletableFutureTest01 {
    public static void main(String[] args) throws IOException {
        CompletableFuture<Void> runAsync = CompletableFuture.runAsync(() -> {
            System.out.println("开始任务");
            System.out.println("任务结束");
        });
      
        // main线程进行阻塞,为了给CompletableFuture执行任务
        System.in.read();

    }
}

8.2.2.3 thenApply()

有任务A,然后任务A执行完毕之后执行任务B,并且任务B需要任务A的返回结果,任务B也有自身的返回结果。

一个thenApply和两个重载的thenApplyAsync方法。使用thenApply会继续使用执行任务A的那个线程去执行任务B,使用thenApplyAsync时可能会换个线程去执行任务B。thenApplyAsync也可以指定线程池去运行。

public <U> CompletableFuture<U> thenApply(
        Function<? super T,? extends U> fn) {
        return uniApplyStage(null, fn);
    }
public <U> CompletableFuture<U> thenApplyAsync(
        Function<? super T,? extends U> fn) {
        return uniApplyStage(asyncPool, fn);
    }
public <U> CompletableFuture<U> thenApplyAsync(
        Function<? super T,? extends U> fn, Executor executor) {
        return uniApplyStage(screenExecutor(executor), fn);
    }

测试:

package com.xqm.juc.future;

import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;

public class CompletableFutureTest02 {
    public static void main(String[] args) throws IOException {
        CompletableFuture<String> taskA = CompletableFuture.supplyAsync(() -> {
            System.out.println("执行任务A");
            String uuid = UUID.randomUUID().toString();
            System.out.println("任务A的UUID为:" + uuid);
            return uuid;
        });
        CompletableFuture<String> taskB = taskA.thenApply(result -> {
            System.out.println("任务B获取任务A的结果为:" + result);
            result = result.replace("-", "");
            return result;
        });

        String taskBResult = taskB.join();
        System.out.println("任务B的执行结果为:"+taskBResult);
        System.in.read();

    }
}

结果:

执行任务A
任务A的UUID为:1d06e480-63c1-424e-b342-766e819b5790
任务B获取任务A的结果为:1d06e480-63c1-424e-b342-766e819b5790
任务B的执行结果为:1d06e48063c1424eb342766e819b5790

如果不需要中间结果的话:

package com.xqm.juc.future;

import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;

public class CompletableFutureTest03 {
    public static void main(String[] args) throws IOException {
        CompletableFuture<String> taskB = CompletableFuture.supplyAsync(() -> {
            System.out.println("执行任务A");
            String uuid = UUID.randomUUID().toString();
            System.out.println("任务A的UUID为:" + uuid);
            return uuid;
        }).thenApply(result -> {
            System.out.println("任务B获取任务A的结果为:" + result);
            result = result.replace("-", "");
            return result;
        });

        String taskBResult = taskB.join();
        System.out.println("任务B的执行结果为:"+taskBResult);
        System.in.read();

    }
}

也可以使用自定义线程池:

package com.xqm.juc.future;

import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CompletableFutureTest04 {
    public static void main(String[] args) throws IOException {
        ExecutorService executorService = Executors.newFixedThreadPool(3);

        CompletableFuture<String> taskB = CompletableFuture.supplyAsync(() -> {
            System.out.println("执行任务A");
            String uuid = UUID.randomUUID().toString();
            System.out.println("任务A的UUID为:" + uuid + "," + Thread.currentThread().getName());
            return uuid;
        }).thenApplyAsync(result -> {
            System.out.println("任务B获取任务A的结果为:" + result+ "," + Thread.currentThread().getName());
            result = result.replace("-", "");
            return result;
        //    自定义线程池
        },executorService);

        String taskBResult = taskB.join();
        System.out.println("任务B的执行结果为:" + taskBResult);
        System.in.read();

    }
}

结果:

执行任务A
任务A的UUID为:9ae49668-9ac9-4f4f-ba3d-9ab965629a68,ForkJoinPool.commonPool-worker-9
任务B获取任务A的结果为:9ae49668-9ac9-4f4f-ba3d-9ab965629a68,pool-1-thread-1
任务B的执行结果为:9ae496689ac94f4fba3d9ab965629a68

8.2.2.4 thenAccept()

thenAccept()方法和thenApply方法类似,都是拼接任务A到任务B。区别就是thenAccept没有返回结果。

前置任务需要有返回结果,后置任务接收前置任务的结果,后置任务没有返回结果。

thenAccept同样也是有三个方法。

public CompletableFuture<Void> thenAccept(Consumer<? super T> action) {
        return uniAcceptStage(null, action);
    }
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action) {
        return uniAcceptStage(asyncPool, action);
    }
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action,
                                                   Executor executor) {
        return uniAcceptStage(screenExecutor(executor), action);
    }

测试

package com.xqm.juc.future;

import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CompletableFutureTest05 {
    public static void main(String[] args) throws IOException {
        ExecutorService executorService = Executors.newFixedThreadPool(3);

        CompletableFuture.supplyAsync(() -> {
            System.out.println("执行任务A");
            return "taskA";
        }).thenAcceptAsync(result -> {
            System.out.println("执行任务B,拿到结果:" + result);
        },executorService);

        System.in.read();

    }
}

结果

执行任务A
执行任务B,拿到结果:taskA

8.2.2.5 thenRun()

thenRun()方法和thenApply方法类似,都是拼接任务A到任务B。区别就是thenRun既没有接收参数,也没有返回结果。

前置任务没有返回结果,后置任务不接受前置任务结果,后置任务也没有返回结果。

同样有三个函数

public CompletableFuture<Void> thenRun(Runnable action) {
        return uniRunStage(null, action);
    }
 public CompletableFuture<Void> thenRunAsync(Runnable action) {
        return uniRunStage(asyncPool, action);
    }
public CompletableFuture<Void> thenRunAsync(Runnable action,
                                                Executor executor) {
        return uniRunStage(screenExecutor(executor), action);
    }

测试

package com.xqm.juc.future;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CompletableFutureTest06 {
    public static void main(String[] args) throws IOException {

        CompletableFuture.supplyAsync(() -> {
            System.out.println("执行任务A");
            return "taskA";
        }).thenRun(() -> {
            System.out.println("执行任务B");
        });


        CompletableFuture.runAsync(() -> {
            System.out.println("执行任务C");
        }).thenRun(() -> {
            System.out.println("执行任务D");
        });

        System.in.read();

    }
}

结果

执行任务A
执行任务B
执行任务C
执行任务D

8.2.2.6 thenCombine()

三个任务进行拼接。

比如有任务A和任务B并行执行,执行完再执行任务C。

thenCombine和thenApply对齐,只不过thenApply是执行一个任务然后接收一个参数,最后返回结果。而thenCombine执行两个任务接收两个参数,最后返回结果。

 public <U,V> CompletableFuture<V> thenCombine(
        CompletionStage<? extends U> other,
        BiFunction<? super T,? super U,? extends V> fn) {
        return biApplyStage(null, other, fn);
    }
public <U,V> CompletableFuture<V> thenCombineAsync(
        CompletionStage<? extends U> other,
        BiFunction<? super T,? super U,? extends V> fn) {
        return biApplyStage(asyncPool, other, fn);
    }
public <U,V> CompletableFuture<V> thenCombineAsync(
        CompletionStage<? extends U> other,
        BiFunction<? super T,? super U,? extends V> fn, Executor executor) {
        return biApplyStage(screenExecutor(executor), other, fn);
    }

测试

package com.xqm.juc.future;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;

public class CompletableFutureTest07 {
    public static void main(String[] args) throws IOException {

        CompletableFuture<String> taskC = CompletableFuture.supplyAsync(() -> {
            System.out.println("任务A");
            return "任务A";
        }).thenCombine(CompletableFuture.supplyAsync(() -> {
            System.out.println("任务B");
            return "任务B";
        }), (resultA, resultB) -> {
            System.out.println("处理任务A和任务B");
            return "处理完毕," + resultA + "-" + resultB;
        });

        System.out.println(taskC.join());

        System.in.read();

    }
}

结果

任务A
任务B
处理任务A和任务B
处理完毕,任务A-任务B

8.2.2.7 thenAcceptBoth()

比如有任务A和任务B并行执行,执行完再执行任务C。

thenAcceptBoth和thenAccept对齐,只不过thenAccept是执行一个任务然后接收一个参数然后没有返回结果,而thenAcceptBoth执行两个任务接收两个参数没有返回结果。

public <U> CompletableFuture<Void> thenAcceptBoth(
        CompletionStage<? extends U> other,
        BiConsumer<? super T, ? super U> action) {
        return biAcceptStage(null, other, action);
    }
public <U> CompletableFuture<Void> thenAcceptBothAsync(
        CompletionStage<? extends U> other,
        BiConsumer<? super T, ? super U> action) {
        return biAcceptStage(asyncPool, other, action);
    }
public <U> CompletableFuture<Void> thenAcceptBothAsync(
        CompletionStage<? extends U> other,
        BiConsumer<? super T, ? super U> action, Executor executor) {
        return biAcceptStage(screenExecutor(executor), other, action);
    }

测试

package com.xqm.juc.future;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;

public class CompletableFutureTest08 {
    public static void main(String[] args) throws IOException {

        CompletableFuture.supplyAsync(() -> {
            System.out.println("任务A");
            return "任务A";
        }).thenAcceptBoth(CompletableFuture.supplyAsync(() -> {
            System.out.println("任务B");
            return "任务B";
        }), (resultA, resultB) -> {
            System.out.println("处理任务A和任务B");
        });
      
        System.in.read();

    }
}

8.2.2.8 runAfterBoth()

比如有任务A和任务B并行执行,执行完再执行任务C。

runAfterBoth和thenRun对齐,只不过thenAccept是执行一个任务然后不接收参数然后没有返回结果,而thenAcceptBoth执行两个任务不接收参数没有返回结果。

public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other,
                                                Runnable action) {
        return biRunStage(null, other, action);
    }
 public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,
                                                     Runnable action) {
        return biRunStage(asyncPool, other, action);
    }
public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,
                                                     Runnable action,
                                                     Executor executor) {
        return biRunStage(screenExecutor(executor), other, action);
    }

测试

package com.xqm.juc.future;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;

public class CompletableFutureTest09 {
    public static void main(String[] args) throws IOException {

        CompletableFuture.supplyAsync(() -> {
            System.out.println("任务A");
            return "任务A";
        }).runAfterBoth(CompletableFuture.supplyAsync(() -> {
            System.out.println("任务B");
            return "任务B";
        }), () -> {
            System.out.println("处理任务A和任务B");
        });

        System.in.read();

    }
}

8.2.2.9 applyToEither

比如有任务A和任务B并行执行,只要任务A或任务B一个执行完,任务C就会执行。

applyToEither有一个接收参数,同时有返回值。

public <U> CompletableFuture<U> applyToEither(
        CompletionStage<? extends T> other, Function<? super T, U> fn) {
        return orApplyStage(null, other, fn);
    }

public <U> CompletableFuture<U> applyToEitherAsync(
        CompletionStage<? extends T> other, Function<? super T, U> fn) {
        return orApplyStage(asyncPool, other, fn);
    }

public <U> CompletableFuture<U> applyToEitherAsync(
        CompletionStage<? extends T> other, Function<? super T, U> fn,
        Executor executor) {
        return orApplyStage(screenExecutor(executor), other, fn);
    }

测试

package com.xqm.juc.future;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;

public class CompletableFutureTest10 {
    public static void main(String[] args) throws IOException {

        CompletableFuture.supplyAsync(() -> {
            System.out.println("任务A");
            return "任务A";
        }).applyToEither(CompletableFuture.supplyAsync(() -> {
            System.out.println("任务B");
            return "任务B";
        }), (result) -> {
            System.out.println("处理任务A和任务B,只要有一个执行完即可");
            System.out.println(result);
            return "执行完的任务是"+result;
        });

        System.in.read();

    }
}

结果

任务A
任务B
处理任务A和任务B,只要有一个执行完即可
任务A

8.2.2.10 acceptEither

比如有任务A和任务B并行执行,只要任务A或任务B一个执行完,任务C就会执行。

acceptEither有一个接收参数,同时没有返回值。

public CompletableFuture<Void> acceptEither(
        CompletionStage<? extends T> other, Consumer<? super T> action) {
        return orAcceptStage(null, other, action);
    }

public CompletableFuture<Void> acceptEitherAsync(
        CompletionStage<? extends T> other, Consumer<? super T> action) {
        return orAcceptStage(asyncPool, other, action);
    }

public CompletableFuture<Void> acceptEitherAsync(
        CompletionStage<? extends T> other, Consumer<? super T> action,
        Executor executor) {
        return orAcceptStage(screenExecutor(executor), other, action);
    }

测试

package com.xqm.juc.future;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;

public class CompletableFutureTest11 {
    public static void main(String[] args) throws IOException {

        CompletableFuture.supplyAsync(() -> {
            System.out.println("任务A");
            return "任务A";
        }).acceptEither(CompletableFuture.supplyAsync(() -> {
            System.out.println("任务B");
            return "任务B";
        }), (result) -> {
            System.out.println("处理任务A和任务B,只要有一个执行完即可");
            System.out.println(result);
        });

        System.in.read();

    }
}

8.2.2.11 runAfterEither

比如有任务A和任务B并行执行,只要任务A或任务B一个执行完,任务C就会执行。

runAfterEither没有接收参数,同时没有返回值。

public CompletableFuture<Void> runAfterEither(CompletionStage<?> other,
                                                  Runnable action) {
        return orRunStage(null, other, action);
    }

public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,
                                                       Runnable action) {
        return orRunStage(asyncPool, other, action);
    }

public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,
                                                       Runnable action,
                                                       Executor executor) {
        return orRunStage(screenExecutor(executor), other, action);
    }

测试

package com.xqm.juc.future;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;

public class CompletableFutureTest12 {
    public static void main(String[] args) throws IOException {

        CompletableFuture.supplyAsync(() -> {
            System.out.println("任务A");
            return "任务A";
        }).runAfterEither(CompletableFuture.supplyAsync(() -> {
            System.out.println("任务B");
            return "任务B";
        }), () -> {
            System.out.println("处理任务A和任务B,只要有一个执行完即可");
        });

        System.in.read();

    }
}

8.2.2.12 exceptionally

exceptionally这个也是拼接任务的方式。但是只有前面业务出现异常,才会走这个方法。

public CompletableFuture<T> exceptionally(
        Function<Throwable, ? extends T> fn) {
        return uniExceptionallyStage(fn);
    }

测试

package com.xqm.juc.future;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;

public class CompletableFutureTest13 {
    public static void main(String[] args) throws IOException {

        CompletableFuture.supplyAsync(() -> {
            System.out.println("任务A");
            return "任务A";
        }).applyToEither(CompletableFuture.supplyAsync(() -> {
            System.out.println("任务B");
            return "任务B";
        }), (result) -> {
            System.out.println("首先执行完的是:"+result);
            System.out.println("处理任务A和任务B,只要有一个执行完即可");
            int a=1/0;
            return "首先执行完的是:"+result;
        }).exceptionally((exception)->{
            System.out.println("出现的异常为:"+exception.getMessage());
            return "-1";
        });
      
        System.in.read();

    }
}

结果

任务A
任务B
首先执行完的是:任务A
处理任务A和任务B,只要有一个执行完即可
出现的异常为:java.lang.ArithmeticException: / by zero

8.2.2.13 whenComplete

whenComplete也是异常处理的方法,但是比exceptionally要更丰富一些。

whenComplete可以拿到返回结果,也可以拿到异常,但是没有返回结果。无法捕获异常,但是可以拿到异常返回的结果。

public CompletableFuture<T> whenComplete(
        BiConsumer<? super T, ? super Throwable> action) {
        return uniWhenCompleteStage(null, action);
    }

public CompletableFuture<T> whenCompleteAsync(
        BiConsumer<? super T, ? super Throwable> action) {
        return uniWhenCompleteStage(asyncPool, action);
    }

public CompletableFuture<T> whenCompleteAsync(
        BiConsumer<? super T, ? super Throwable> action, Executor executor) {
        return uniWhenCompleteStage(screenExecutor(executor), action);
    }

测试

package com.xqm.juc.future;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;

public class CompletableFutureTest14 {
    public static void main(String[] args) throws IOException {

        CompletableFuture<String> ex = CompletableFuture.supplyAsync(() -> {
            System.out.println("任务A");
            int a = 1 / 0;
            return "任务A";
        }).applyToEither(CompletableFuture.supplyAsync(() -> {
            System.out.println("任务B");
            return "任务B";
        }), (result) -> {
            System.out.println("首先执行完的是:" + result);
            System.out.println("处理任务A和任务B,只要有一个执行完即可");
            return "首先执行完的是:" + result;
        }).whenComplete((result, exception) -> {
            System.out.println("结果为:" + result);
            System.out.println("出现的异常为:" + exception);
        });
      
        // 这里会抛出异常
        System.out.println(ex.join());

        System.in.read();

    }
}

结果

任务A
任务B
结果为:null
出现的异常为:java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
Exception in thread "main" java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
	at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
	at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
	at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1584)
	at java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1574)
	at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
	at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
	at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1689)
	at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
Caused by: java.lang.ArithmeticException: / by zero
	at com.xqm.juc.future.CompletableFutureTest14.lambda$main$0(CompletableFutureTest14.java:11)
	at com.xqm.juc.future.CompletableFutureTest14$$Lambda$1/1921595561.get(Unknown Source)
	at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1582)
	... 5 more

8.2.2.14 handle

handle也是异常处理的方法,但是比exceptionally要更丰富一些。

handle既可以拿到返回结果,也可以拿到异常,也有返回结果。

public <U> CompletableFuture<U> handle(
        BiFunction<? super T, Throwable, ? extends U> fn) {
        return uniHandleStage(null, fn);
    }

public <U> CompletableFuture<U> handleAsync(
        BiFunction<? super T, Throwable, ? extends U> fn) {
        return uniHandleStage(asyncPool, fn);
    }

public <U> CompletableFuture<U> handleAsync(
        BiFunction<? super T, Throwable, ? extends U> fn, Executor executor) {
        return uniHandleStage(screenExecutor(executor), fn);
    }

测试

package com.xqm.juc.future;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;

public class CompletableFutureTest15 {
    public static void main(String[] args) throws IOException {

        CompletableFuture<String> ex = CompletableFuture.supplyAsync(() -> {
            System.out.println("任务A");
            int a = 1 / 0;
            return "任务A";
        }).applyToEither(CompletableFuture.supplyAsync(() -> {
            System.out.println("任务B");
            return "任务B";
        }), (result) -> {
            System.out.println("首先执行完的是:" + result);
            System.out.println("处理任务A和任务B,只要有一个执行完即可");
            return "首先执行完的是:" + result;
        }).handle((result, exception) -> {
            System.out.println("结果为:" + result);
            System.out.println("出现的异常为:" + exception);
            return "-1";
        });

        // 这里会抛出异常
        System.out.println(ex.join());

        System.in.read();

    }
}

结果

任务A
任务B
结果为:null
出现的异常为:java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
-1

8.2.2.15 allof

allof里面可以有很多任务。当所有任务执行完,才能继续执行后续的任务。

allof是没有返回结果的。

 public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {
        return andTree(cfs, 0, cfs.length - 1);
    }

测试

package com.xqm.juc.future;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;

public class CompletableFutureTest16 {
    public static void main(String[] args) throws IOException {


        CompletableFuture.allOf(
                CompletableFuture.runAsync(()->{
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("任务A");
                }),
                CompletableFuture.runAsync(()->{
                    try {
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("任务B");
                }),
                CompletableFuture.runAsync(()->{
                    try {
                        Thread.sleep(3000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("任务C");
                }),
                CompletableFuture.runAsync(()->{
                    try {
                        Thread.sleep(4000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("任务D");
                })
        ).thenRun(()->{
            System.out.println("任务E开始执行");
        });

        System.in.read();

    }
}

8.2.2.16 anyOf

anyOf是可以有多个任务的,只要有一个任务执行完毕,就可以继续执行后续的任务。anyOf是可以拿到一个返回结果的。

public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) {
        return orTree(cfs, 0, cfs.length - 1);
    }

测试

package com.xqm.juc.future;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;

public class CompletableFutureTest17 {
    public static void main(String[] args) throws IOException {

        CompletableFuture.anyOf(
                CompletableFuture.supplyAsync(()->{
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("任务A");
                    return "任务A";
                }),
                CompletableFuture.supplyAsync(()->{
                    try {
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("任务B");
                    return "任务B";
                }),
                CompletableFuture.supplyAsync(()->{
                    try {
                        Thread.sleep(3000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("任务C");
                    return "任务C";
                })
        ).thenApplyAsync(result->{
            System.out.println("任务E开始执行;"+"先执行完毕的是"+result);
            return "任务E";
        });

        System.in.read();

    }
}
任务A
任务E开始执行;先执行完毕的是任务A
任务B
任务C

8.2.3 源码-runAsync

将任务和CompletableFuture封装到一起,再执行封装好的具体对象的run方法。

runAsync

// CompletableFuture 的runAsync方法
public static CompletableFuture<Void> runAsync(Runnable runnable) {
    // asyncPool是执行任务的线程池
    // runnable:具体任务
    return asyncRunStage(asyncPool, runnable);
}

asyncRunStage

// 内部执行的方法
// e:线程池 f:方法
static CompletableFuture<Void> asyncRunStage(Executor e, Runnable f) {
    // 对任务进行非空校验
    if (f == null) throw new NullPointerException();
    // 构建CompletableFuture作为最后的返回结果
    CompletableFuture<Void> d = new CompletableFuture<Void>();
    // AsyncRun:将任务和CompletableFuture封装在一起
    // 将封装好的任务交给线程池去执行
    e.execute(new AsyncRun(d, f));
    // 返回结果,一个CompletableFuture
    return d;
}

AsyncRun内部类

// 封装任务和CompletableFuture的内部类AsyncRun
static final class AsyncRun extends ForkJoinTask<Void>
    implements Runnable, AsynchronousCompletionTask {
    // 声明CompletableFuture对象以及任务的成员变量
    CompletableFuture<Void> dep; Runnable fn;
    AsyncRun(CompletableFuture<Void> dep, Runnable fn) {
        // 将传入的属性赋值给成员变量
        this.dep = dep; this.fn = fn;
    }

    public final Void getRawResult() { return null; }
    public final void setRawResult(Void v) {}
    // 执行AsyncRun的exec方法,实际就是执行run方法
    public final boolean exec() { run(); return true; }
    // 当前对象作为任务提交给线程池之后,会执行当前的run方法
    public void run() {
        // 声明局部变量
        CompletableFuture<Void> d; Runnable f;
        // 将成员变量赋值给局部变量,并判断非空
        if ((d = dep) != null && (f = fn) != null) {
            // 将成员变量置为null
            dep = null; fn = null;
            // 如果任务还没执行
            if (d.result == null) {
                try {
                    // 任务进行执行
                    f.run();
                    // 当前方法是针对Runnable的,不能将结果置为null
                    // 要给没有返回结果的Runnable设置一个返回结果
                    d.completeNull();
                } catch (Throwable ex) {
                    // 异常结束
                    d.completeThrowable(ex);
                }
            }
            d.postComplete();
        }
    }
}

封装返回结果为null和异常的

// 给Runnable的CompletableFuture设置返回结果的方式,将null设置为NIL
final boolean completeNull() {
    return UNSAFE.compareAndSwapObject(this, RESULT, null,
                                       NIL);
}

// Runnable异常结束,要将异常结果封装给CompletableFuture
final boolean completeThrowable(Throwable x) {
    return UNSAFE.compareAndSwapObject(this, RESULT, null,
                                       encodeThrowable(x));
}

8.2.4 源码-任务编排方式

如果要在前置任务执行完,执行后置任务的话,有两种方式:

  • 前置任务没有执行完,后置任务会放到栈中
  • 前置任务执行完,后置任务会直接执行,不会放到栈中

如果单独采用runAsync在一个任务后面指定多个后置任务,CompletableFuture无法保证具体执行的顺序,影响执行顺序的是前置任务的执行时间,以及后置任务的编排方式。

一个任务A如果执行时间比较长,那么使用then之后的任务是使用栈来存储的,等到任务A执行完毕之后,从栈中拿取任务执行。

演示:

package com.xqm.juc.future;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;

public class CompletableFutureTest18 {
    public static void main(String[] args) throws IOException {

        CompletableFuture<Void> taskA = CompletableFuture.runAsync(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("任务A");
        });

        taskA.thenRun(()->{
            System.out.println("任务B");
        });
        taskA.thenRun(()->{
            System.out.println("任务C");
        });
        taskA.thenRun(()->{
            System.out.println("任务D");
        });

        System.in.read();

    }
}

结果

任务A
任务D
任务C
任务B

但是如果任务A很快就执行完了,那么之后使用then执行的任务就会直接拿来执行,不会放入到栈中。

演示:

package com.xqm.juc.future;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;

public class CompletableFutureTest18 {
    public static void main(String[] args) throws IOException {

        CompletableFuture<Void> taskA = CompletableFuture.runAsync(() -> {
            System.out.println("任务A");
        });

        taskA.thenRun(()->{
            System.out.println("任务B");
        });
        taskA.thenRun(()->{
            System.out.println("任务C");
        });
        taskA.thenRun(()->{
            System.out.println("任务D");
        });

        System.in.read();

    }
}

结果

任务A
任务B
任务C
任务D

8.2.5 源码-thenRun

// 编排任务,前置任务执行完,执行后置任务
public CompletableFuture<Void> thenRun(Runnable action) {
    // null是线程池
    // action是后置任务
    return uniRunStage(null, action);
}

// 内部任务编排方法
private CompletableFuture<Void> uniRunStage(Executor e, Runnable f) {
    // 判断任务非空
    if (f == null) throw new NullPointerException();
    // 声明CompletableFuture对象,绑定后置任务
    CompletableFuture<Void> d = new CompletableFuture<Void>();
    // 如果线程池不为null,代表异步执行,尝试将任务压入栈
    // 如果线程池为null,代表同步执行,先基于uniRun,看任务能否执行,能执行就直接返回结果
    if (e != null || !d.uniRun(this, f, null)) {
        // 如果传入线程池,或者没有传入线程池但是不能执行任务,走内部逻辑
        // e是线程池
        // d是CompletableFuture
        // this是前继任务的CompletableFuture
        // f是后继任务
        UniRun<T> c = new UniRun<T>(e, d, this, f);
        // 将封装好的任务push进入栈中
        // 只要前置任务没结束,就将后置任务放入栈中
        // 放入栈中可能会失败
        push(c);
        // 无论压栈成功还是失败,都会执行tryFire方法
        // 尝试执行一下任务
        c.tryFire(SYNC);
    }
    // 无论任务执行完毕还是没执行,都要返回后置任务的CompletableFuture
    return d;
}

8.2.6 源码-前置任务处理完毕,后置任务之间处理

// 同步
static final int SYNC   =  0;
// 异步
static final int ASYNC  =  1;
// 嵌套
static final int NESTED = -1;

UniRun内部类以及tryFire方法

// 后置任务无论压栈成功还是失败,都会执行tryFire方法
static final class UniRun<T> extends UniCompletion<T, Void> {
    Runnable fn;
    // executor:线程池
    // dep:后置任务的CompletableFuture
    // src:前置任务的CompletableFuture
    // fn:后置任务
    UniRun(Executor executor, CompletableFuture<Void> dep, CompletableFuture<T> src, Runnable fn) {
        super(executor, dep, src); this.fn = fn;
    }
    final CompletableFuture<Void> tryFire(int mode) {
        // 声明局部变量
        CompletableFuture<Void> d; CompletableFuture<T> a;
        // 赋值局部变量,并进行健壮性校验
        if ((d = dep) == null ||
                // 调用uniRun
                // a:前置任务的CompletableFuture
                // fn:后置任务
                // mode:在这mode为0,代表没有传入线程池,是同步的方式SYNC
                // 传入的是this,也就是UniRun
                !d.uniRun(a = src, fn, mode > 0 ? null : this))
            // 到这,代表任务没结束
            return null;
        dep = null; src = null; fn = null;
        return d.postFire(a, mode);
    }
}

uniRun方法

// 是否要主动执行任务
// a:前置任务的CompletableFuture
// f:后置任务
// c:传入封装的UniRun
final boolean uniRun(CompletableFuture<?> a, Runnable f, UniRun<?> c) {
    // 方法要么正常结束,要么异常结束的结果的局部变量
    Object r; Throwable x;
    // 健壮性校验
    if (a == null || (r = a.result) == null || f == null)
        // 代表任务没结束
        return false;
    // 查看后置任务的返回结果,如果为null,代表任务没结束
    if (result == null) {
        // 如果前置任务的结果是异常结束,那么直接封装异常结果
        if (r instanceof AltResult && (x = ((AltResult)r).ex) != null)
            completeThrowable(x, r);
        else
            // 如果前置任务正常结束,后置任务正常执行
            try {
                // 如果是基于tryFire(SYNC)进来,这里的c不为null,执行c.claim
                // 如果是没有传递线程池,这里的c就是null
                if (c != null && !c.claim())
                    return false;
                // 如果没有基于线程池运行,这里就是同步执行,直接run
                f.run();
                // 封装结果
                completeNull();
            } catch (Throwable ex) {
                // 封装异常结果
                completeThrowable(ex);
            }
    }
    // 如果后置任务执行结束
    return true;
}

线程池异步处理任务claim

// 异步线程池处理任务
final boolean claim() {
    Executor e = executor;
    if (compareAndSetForkJoinTaskTag((short)0, (short)1)) {
        // e是线程池,只要线程池对象不为null
        if (e == null)
            return true;
        executor = null; // disable
        // 基于线程池的execute去执行任务
        e.execute(this);
    }
    return false;
}

8.2.7 源码-前置任务未处理完毕,后置任务基于栈处理

前置任务结束,遍历栈去执行后置任务。

// 前置任务结束,遍历栈去执行后置任务。
final void postComplete() {
    // f:前置任务的CompletableFuture
    // h:存储后置任务的栈
    CompletableFuture<?> f = this; Completion h;
    // 赋值并且进行健壮性校验,确保栈中有数据
    while ((h = f.stack) != null ||
            // 循环一次后,对后续节点的赋值以及健壮性判断,确保栈中有数据
            (f != this && (h = (f = this).stack) != null)) {
        // 局部变量
        // t:当前栈中任务的后续
        CompletableFuture<?> d; Completion t;
        // 拿到之前的栈顶h之后,将栈顶切换到下一个数据
        if (f.casStack(h, t = h.next)) {
            // 如果还有任务
            if (t != null) {
                if (f != this) {
                    pushStack(h);
                    continue;
                }
                h.next = null;    // detach
            }
            // 执行tryFire
            // NESTED:-1
            // A嵌套了B+C,B嵌套了D+E,A执行完之后,运行B,B会运行D和E
            // 进行递归运行任务
            f = (d = h.tryFire(NESTED)) == null ? this : d;
        }
    }
}

8.2.8 任务执行流程

image.png