八:异步编程
8.1 FutureTask
8.1.1 介绍
FutureTask是一个可以取消异步计算的类。FutureTask是对Future做的一个基本实现,可以调用方法开始和取消一个任务。一般配合Callable去使用。
异步任务启动后,可以获取一个绑定当前异步任务的FutureTask。
可以基于FutureTask的方法区取消任务、查看任务是否结束、获取任务的返回结果。
FutureTask的整体结构中实现了一个RunnableFuture接口,而RunnableFuture接口继承了Runnanle和Future接口。所以FutureTask也可以直接作为任务交给线程池去处理。
8.1.2 应用
FutureTask有两种有参构造。
public FutureTask(Callable<V> callable) {}
public FutureTask(Runnable runnable, V result) {}
一般使用Callable,因为有返回结果。
FutureTask对任务的控制:
- 任务执行过程中对状态的控制
- 任务执行完毕之后,对结果的控制
FutureTask的任务在执行run方法之后,是无法再次被运行,需要使用protected类型的runAndReset()方法才可以。
package com.xqm.juc.future;
import java.util.concurrent.*;
public class FutureTask01 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 1.构建FutureTask,基于泛型执行返回结果类型
// 两种有参构造
FutureTask futureTask=new FutureTask(new Callable() {
@Override
public String call() throws Exception {
System.out.println("使用Callable有参构造开始任务");
TimeUnit.SECONDS.sleep(2);
System.out.println("任务执行完毕");
return "ok";
}
});
// 2.构建线程池
ExecutorService threadPool=Executors.newFixedThreadPool(2);
// 3.执行任务
threadPool.execute(futureTask);
// run方法,futureTask提供了run方法,
// 但是一般不会自己去调用,而是线程池执行任务时,由线程池去执行run方法
// run方法在执行时,是有任务状态的,任务执行后,再次执行run方法时无效的,任务只能执行一次
Thread.sleep(500);
futureTask.run();
// 如果希望任务被反复执行,需要调用runAndReset()方法
// 获取任务的执行结果,类似阻塞队列的take()方法,会一直死等,直到任务结束,然后执行下面的任务
// System.out.println(futureTask.get());
// 也可以指定阻塞时间,时间到就不等了,会直接抛出TimeoutException异常
try {
Object o = futureTask.get(1, TimeUnit.SECONDS);
System.out.println(o);
} catch (TimeoutException e) {
// e.printStackTrace();
System.out.println("异常终止了");
}
// 判断任务是否结束
System.out.println("任务执行结束了吗?"+futureTask.isDone());
threadPool.shutdown();
}
}
8.1.3 源码-状态
清楚任务的流转状态是什么样的,其次要知道核心属性是干嘛用的。
// FutureTask的核心属性
/**
<!-- 任务状态的流转 -->
* Possible state transitions:
* NEW -> COMPLETING -> NORMAL (任务正常执行,并且返回结果也正常返回)
* NEW -> COMPLETING -> EXCEPTIONAL (任务正常执行,但是结果是异常)
* NEW -> CANCELLED (任务被取消)
* NEW -> INTERRUPTING -> INTERRUPTED (任务被中断)
*/
// 记录任务的状态
private volatile int state;
// 任务被构建之后的初始状态
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
// 需要执行的任务,会被赋值到这个属性
private Callable<V> callable;
// 任务的返回结果,存储到这个属性
private Object outcome; // non-volatile, protected by state reads/writes
// 执行任务的线程
private volatile Thread runner;
// 等待结果的线程Node对象
private volatile WaitNode waiters;
8.1.4 源码-构造函数
一共有两种构造函数。
其实构造函数就是给类的属性赋值,一个是任务,一个是状态赋值为NEW.
public FutureTask(Callable<V> callable) {
// 健壮性校验
if (callable == null)
throw new NullPointerException();
// 给任务赋值
this.callable = callable;
// 设置初始状态为NEW
this.state = NEW; // ensure visibility of callable
}
public FutureTask(Runnable runnable, V result) {
// 将Runnable任务封装成Callable
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
8.1.5 源码-run方法
任务执行前的一些判断,调用方法进行封装结果,以及发生异常做出的后续处理。
// FutureTask的run方法
public void run() {
// 如果当前任务的状态不是NEW,直接Return退出
if (state != NEW ||
// 如果状态正确,是NEW,这里需要基于CAS将runner属性设置为当前线程
// 如果CAS失败,代表有线程已经拿到这个任务执行了,直接return
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
// 将要执行的任务拿到
Callable<V> c = callable;
// 健壮性校验,或者防止某些方式(比如反射)将任务赋值为null
// 再次判断任务的状态(DCL)
if (c != null && state == NEW) {
// 执行任务
// result:任务的返回结果
// ran:如果为true,任务正常结束
// ran:如果为false,任务异常结束
V result;
boolean ran;
try {
// 执行任务
result = c.call();
// 任务正常结束
ran = true;
} catch (Throwable ex) {
// 发生异常
// 结果置为null
result = null;
// 置为false
ran = false;
// 封装异常结果
setException(ex);
}
if (ran)
// 封装正常结果
set(result);
}
//到这任务结束,正常或者异常
} finally {
// 将执行任务的线程置为null
runner = null;
// 拿到任务的状态
int s = state;
// INTERRUPTING:5
// 判断线程是否被中断
if (s >= INTERRUPTING)
// 代表任务中断,做一些处理
handlePossibleCancellationInterrupt(s);
}
}
8.1.6 源码-set&&setException
任务执行完毕后,修改任务的状态以及封装任务的状态。
set
// 任务正常结束返回结果
protected void set(V v) {
// 因为任务执行完毕,需要将任务的状态从NEW,修改为COMPLETING
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
// 将任务的结果赋值给outcome
outcome = v;
// 将任务的状态变为NORMAL,正常结束
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
// 任务结束之后做的操作
finishCompletion();
}
}
setException
// 任务发生异常返回的结果
protected void setException(Throwable t) {
// 因为任务执行完毕,需要将任务的状态从NEW,修改为COMPLETING
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
// 将异常信息赋给outcome
outcome = t;
// 将任务的状态变为EXCEPTIONAL,异常结束
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
finishCompletion();
}
}
8.1.7 源码-cancle方法
任务取消有两种,设置参数为true代表可以中断,设置为false代表不能被中断。
// 任务取消,mayInterruptIfRunning是否需要中断
// NEW -> CANCELLED (任务被取消,不能被中断)
// NEW -> INTERRUPTING -> INTERRUPTED (任务被取消,可以中断)
public boolean cancel(boolean mayInterruptIfRunning) {
// 任务正在执行中,也是NEW状态
// 查看任务的状态是否是NEW,基于传入的参数mayInterruptIfRunning
// 决定任务的状态是直接从NEW修改为INTERRUPTING或者为CANCELLED
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
// 如果为true
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
// 修改失败,返回false
return false;
try {
// mayInterruptIfRunning为true,代表需要中断线程
if (mayInterruptIfRunning) {
try {
// 拿到任务线程
Thread t = runner;
// 如果线程不为null,直接中断
if (t != null)
t.interrupt();
} finally { // final state
// 价格任务状态设为INTERRUPTED
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
finishCompletion();
}
return true;
}
8.1.8 源码-get方法
这是线程获取FutureTask任务执行结果的方法。
get方法
public V get() throws InterruptedException, ExecutionException {
// 获取任务的状态
int s = state;
// 小于COMPLETING只有两种
// 一种是NEW,任务还没执行完
// 一种是COMPLETING,任务执行完,但是结果还没封装好
if (s <= COMPLETING)
// 让当前线程阻塞等待结果
s = awaitDone(false, 0L);
// 最终返回结果
return report(s);
}
awaitDone方法
// 线程等待获取任务结果的方法
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
// 获取任务需要等待的时间
// 如果是get无参方法,就死等
final long deadline = timed ? System.nanoTime() + nanos : 0L;
// 声明Node对象
WaitNode q = null;
// queued代表节点是否放到链表中
boolean queued = false;
// 死循环
for (;;) {
// 查询线程是否中断,如果中断,就从链表中移除,抛出异常
if (Thread.interrupted()) {
// 从链表中移除这个节点
removeWaiter(q);
// 抛出中断异常
throw new InterruptedException();
}
// 如果线程没中断,获取线程的状态
int s = state;
// 到这,说明任务结束了
if (s > COMPLETING) {
// 如果之前封装了waitNode,现在则要清空,如果没有,就返回状态
if (q != null)
q.thread = null;
return s;
// 如果任务状态是COMPLETING,因为从COMPLETING状态到下一个状态速度很快,所以只需要
// 这个线程暂停一下,等待就行
} else if (s == COMPLETING) // cannot time out yet
Thread.yield();
// 如果还没初始化waitNode,就初始化waitNode
else if (q == null)
q = new WaitNode();
// 如果节点没放到队列中,就放到队列的前面
else if (!queued)
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
// 如果timed为true,代表有等待时间
else if (timed) {
// 判断是否还有时间
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
// 如果没有时间,就移除等待
removeWaiter(q);
// 返回任务的状态
return state;
}
// 等待
LockSupport.parkNanos(this, nanos);
} else
// 死等
LockSupport.park(this);
}
}
report方法
// 根据任务的状态,获取任务的结果
private V report(int s) throws ExecutionException {
// 获取结果
Object x = outcome;
if (s == NORMAL)
// 如果是正常结束,返回结果
return (V)x;
// 如果是取消,就抛出取消异常
if (s >= CANCELLED)
throw new CancellationException();
// 否则异常结束,抛出其他异常
throw new ExecutionException((Throwable)x);
}
8.1.9 源码-finishCompletion方法
只要任务结束,无论是正常返回,还是异常返回,还是任务被取消,都会执行这个方法。
这个方法就是唤醒那些执行get方法等待任务结果的线程。
// 任务结束后触发这个方法,添加到队列中是每次添加在头部
private void finishCompletion() {
// 任务结束后,需要循环唤醒队列中的线程
for (WaitNode q; (q = waiters) != null;) {
// 以CAS的方式将waitNode置为null,循环里已经拿到引用
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
// 死循环
for (;;) {
// 拿到Node中的线程
Thread t = q.thread;
// 如果线程不为null
if (t != null) {
// 置为null,帮助gc
q.thread = null;
// 唤醒这个线程
LockSupport.unpark(t);
}
// 拿到当前Node的next
WaitNode next = q.next;
// 如果next为null,代表已经全部唤醒,跳出循环
if (next == null)
break;
// 将next置为null
q.next = null; // unlink to help gc
// q的引用指向next
q = next;
}
break;
}
}
// 任务结束后,预留的接口,可以基于这个扩展方法,记录一些信息
done();
// 任务执行完,将任务置为null
callable = null; // to reduce footprint
}
8.2 CompletableFuture
8.2.1 介绍
平时多线程开发一般就是使用Runnbale、Thread、Callable、FutureTask、ThreadPoolExecutor等。相对来说成本都不高,使用熟悉就行。但是这些有时候很难完成异步编程的任务。
Thread+Runnable:执行异步任务,但是没有返回结果。
Thread+Callable+FutureTask:执行一个有返回结果的异步任务。但是有两个问题:
- 获取返回结果,如果是基于get方法返回,线程需要挂起在waitNode里。
- 获取返回结果,也可以基于isDone判断任务的状态,但是需要不断轮询。
如果业务的逻辑比较复杂,可能需要业务导出阻塞等待,或者需要大量的任务线程去执行一些业务逻辑,开发成本比较高。
CompletableFuture就是帮助处理任务之间的处理逻辑。编排好任务的执行方式后,任务会按照规划好的方式去一步一步执行,不需要让业务线程去频繁的等待。
8.2.2 应用
CompletableFuture提供了三个函数式编程。
Supplier<T>:无参有返回值
Consumer<T>:有参无返回值
Function<T,R>:有参有返回值
8.2.2.1 supplyAsync()
方法:有两个重载方法,一个使用默认线程池,另一个使用自定义线程池。
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
return asyncSupplyStage(asyncPool, supplier);
}
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor) {
return asyncSupplyStage(screenExecutor(executor), supplier);
}
supplyAsync()如果不提供线程池的话,默认使用ForkJoinPool线程池,而ForkJoinPool内部是守护线程,如果main线程结束了,守护线程会跟着一起结束。
获取结果使用的是get()方法和join()方法,两者不同之处在于join()会帮你处理好异常,而get()需要自己处理异常。
package com.xqm.juc.future;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class CompletableFutureTest {
public static void main(String[] args) throws IOException {
// supplyAsync()如果不提供线程池的话,默认使用ForkJoinPool线程池,而ForkJoinPool内部是守护线程
// 如果main线程结束了,守护线程会跟着一起结束
CompletableFuture<String> firstTask = CompletableFuture.supplyAsync(() -> {
System.out.println("异步任务开始执行");
System.out.println("异步任务执行结束");
return "返回结果";
});
// 获取任务结果,不需要处理异常
String result1 = firstTask.join();
// 获取任务结果,但是要自己处理异常
try {
String result2 = firstTask.get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
// main线程进行阻塞,为了给CompletableFuture执行任务
System.in.read();
}
}
8.2.2.2 runAsync()
方法:有两个重载方法,一个使用默认线程池,另一个使用自定义线程池。如果不提供线程池的话,默认使用ForkJoinPool线程池,而ForkJoinPool内部是守护线程,如果main线程结束了,守护线程会跟着一起结束。
public static CompletableFuture<Void> runAsync(Runnable runnable) {
return asyncRunStage(asyncPool, runnable);
}
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor) {
return asyncRunStage(screenExecutor(executor), runnable);
}
runAsync()方法既没有参数,也不会返回结果,是非常基础的任务编排方式。
package com.xqm.juc.future;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class CompletableFutureTest01 {
public static void main(String[] args) throws IOException {
CompletableFuture<Void> runAsync = CompletableFuture.runAsync(() -> {
System.out.println("开始任务");
System.out.println("任务结束");
});
// main线程进行阻塞,为了给CompletableFuture执行任务
System.in.read();
}
}
8.2.2.3 thenApply()
有任务A,然后任务A执行完毕之后执行任务B,并且任务B需要任务A的返回结果,任务B也有自身的返回结果。
一个thenApply和两个重载的thenApplyAsync方法。使用thenApply会继续使用执行任务A的那个线程去执行任务B,使用thenApplyAsync时可能会换个线程去执行任务B。thenApplyAsync也可以指定线程池去运行。
public <U> CompletableFuture<U> thenApply(
Function<? super T,? extends U> fn) {
return uniApplyStage(null, fn);
}
public <U> CompletableFuture<U> thenApplyAsync(
Function<? super T,? extends U> fn) {
return uniApplyStage(asyncPool, fn);
}
public <U> CompletableFuture<U> thenApplyAsync(
Function<? super T,? extends U> fn, Executor executor) {
return uniApplyStage(screenExecutor(executor), fn);
}
测试:
package com.xqm.juc.future;
import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
public class CompletableFutureTest02 {
public static void main(String[] args) throws IOException {
CompletableFuture<String> taskA = CompletableFuture.supplyAsync(() -> {
System.out.println("执行任务A");
String uuid = UUID.randomUUID().toString();
System.out.println("任务A的UUID为:" + uuid);
return uuid;
});
CompletableFuture<String> taskB = taskA.thenApply(result -> {
System.out.println("任务B获取任务A的结果为:" + result);
result = result.replace("-", "");
return result;
});
String taskBResult = taskB.join();
System.out.println("任务B的执行结果为:"+taskBResult);
System.in.read();
}
}
结果:
执行任务A
任务A的UUID为:1d06e480-63c1-424e-b342-766e819b5790
任务B获取任务A的结果为:1d06e480-63c1-424e-b342-766e819b5790
任务B的执行结果为:1d06e48063c1424eb342766e819b5790
如果不需要中间结果的话:
package com.xqm.juc.future;
import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
public class CompletableFutureTest03 {
public static void main(String[] args) throws IOException {
CompletableFuture<String> taskB = CompletableFuture.supplyAsync(() -> {
System.out.println("执行任务A");
String uuid = UUID.randomUUID().toString();
System.out.println("任务A的UUID为:" + uuid);
return uuid;
}).thenApply(result -> {
System.out.println("任务B获取任务A的结果为:" + result);
result = result.replace("-", "");
return result;
});
String taskBResult = taskB.join();
System.out.println("任务B的执行结果为:"+taskBResult);
System.in.read();
}
}
也可以使用自定义线程池:
package com.xqm.juc.future;
import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class CompletableFutureTest04 {
public static void main(String[] args) throws IOException {
ExecutorService executorService = Executors.newFixedThreadPool(3);
CompletableFuture<String> taskB = CompletableFuture.supplyAsync(() -> {
System.out.println("执行任务A");
String uuid = UUID.randomUUID().toString();
System.out.println("任务A的UUID为:" + uuid + "," + Thread.currentThread().getName());
return uuid;
}).thenApplyAsync(result -> {
System.out.println("任务B获取任务A的结果为:" + result+ "," + Thread.currentThread().getName());
result = result.replace("-", "");
return result;
// 自定义线程池
},executorService);
String taskBResult = taskB.join();
System.out.println("任务B的执行结果为:" + taskBResult);
System.in.read();
}
}
结果:
执行任务A
任务A的UUID为:9ae49668-9ac9-4f4f-ba3d-9ab965629a68,ForkJoinPool.commonPool-worker-9
任务B获取任务A的结果为:9ae49668-9ac9-4f4f-ba3d-9ab965629a68,pool-1-thread-1
任务B的执行结果为:9ae496689ac94f4fba3d9ab965629a68
8.2.2.4 thenAccept()
thenAccept()方法和thenApply方法类似,都是拼接任务A到任务B。区别就是thenAccept没有返回结果。
前置任务需要有返回结果,后置任务接收前置任务的结果,后置任务没有返回结果。
thenAccept同样也是有三个方法。
public CompletableFuture<Void> thenAccept(Consumer<? super T> action) {
return uniAcceptStage(null, action);
}
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action) {
return uniAcceptStage(asyncPool, action);
}
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action,
Executor executor) {
return uniAcceptStage(screenExecutor(executor), action);
}
测试
package com.xqm.juc.future;
import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class CompletableFutureTest05 {
public static void main(String[] args) throws IOException {
ExecutorService executorService = Executors.newFixedThreadPool(3);
CompletableFuture.supplyAsync(() -> {
System.out.println("执行任务A");
return "taskA";
}).thenAcceptAsync(result -> {
System.out.println("执行任务B,拿到结果:" + result);
},executorService);
System.in.read();
}
}
结果
执行任务A
执行任务B,拿到结果:taskA
8.2.2.5 thenRun()
thenRun()方法和thenApply方法类似,都是拼接任务A到任务B。区别就是thenRun既没有接收参数,也没有返回结果。
前置任务没有返回结果,后置任务不接受前置任务结果,后置任务也没有返回结果。
同样有三个函数
public CompletableFuture<Void> thenRun(Runnable action) {
return uniRunStage(null, action);
}
public CompletableFuture<Void> thenRunAsync(Runnable action) {
return uniRunStage(asyncPool, action);
}
public CompletableFuture<Void> thenRunAsync(Runnable action,
Executor executor) {
return uniRunStage(screenExecutor(executor), action);
}
测试
package com.xqm.juc.future;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class CompletableFutureTest06 {
public static void main(String[] args) throws IOException {
CompletableFuture.supplyAsync(() -> {
System.out.println("执行任务A");
return "taskA";
}).thenRun(() -> {
System.out.println("执行任务B");
});
CompletableFuture.runAsync(() -> {
System.out.println("执行任务C");
}).thenRun(() -> {
System.out.println("执行任务D");
});
System.in.read();
}
}
结果
执行任务A
执行任务B
执行任务C
执行任务D
8.2.2.6 thenCombine()
三个任务进行拼接。
比如有任务A和任务B并行执行,执行完再执行任务C。
thenCombine和thenApply对齐,只不过thenApply是执行一个任务然后接收一个参数,最后返回结果。而thenCombine执行两个任务接收两个参数,最后返回结果。
public <U,V> CompletableFuture<V> thenCombine(
CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn) {
return biApplyStage(null, other, fn);
}
public <U,V> CompletableFuture<V> thenCombineAsync(
CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn) {
return biApplyStage(asyncPool, other, fn);
}
public <U,V> CompletableFuture<V> thenCombineAsync(
CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn, Executor executor) {
return biApplyStage(screenExecutor(executor), other, fn);
}
测试
package com.xqm.juc.future;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
public class CompletableFutureTest07 {
public static void main(String[] args) throws IOException {
CompletableFuture<String> taskC = CompletableFuture.supplyAsync(() -> {
System.out.println("任务A");
return "任务A";
}).thenCombine(CompletableFuture.supplyAsync(() -> {
System.out.println("任务B");
return "任务B";
}), (resultA, resultB) -> {
System.out.println("处理任务A和任务B");
return "处理完毕," + resultA + "-" + resultB;
});
System.out.println(taskC.join());
System.in.read();
}
}
结果
任务A
任务B
处理任务A和任务B
处理完毕,任务A-任务B
8.2.2.7 thenAcceptBoth()
比如有任务A和任务B并行执行,执行完再执行任务C。
thenAcceptBoth和thenAccept对齐,只不过thenAccept是执行一个任务然后接收一个参数然后没有返回结果,而thenAcceptBoth执行两个任务接收两个参数没有返回结果。
public <U> CompletableFuture<Void> thenAcceptBoth(
CompletionStage<? extends U> other,
BiConsumer<? super T, ? super U> action) {
return biAcceptStage(null, other, action);
}
public <U> CompletableFuture<Void> thenAcceptBothAsync(
CompletionStage<? extends U> other,
BiConsumer<? super T, ? super U> action) {
return biAcceptStage(asyncPool, other, action);
}
public <U> CompletableFuture<Void> thenAcceptBothAsync(
CompletionStage<? extends U> other,
BiConsumer<? super T, ? super U> action, Executor executor) {
return biAcceptStage(screenExecutor(executor), other, action);
}
测试
package com.xqm.juc.future;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
public class CompletableFutureTest08 {
public static void main(String[] args) throws IOException {
CompletableFuture.supplyAsync(() -> {
System.out.println("任务A");
return "任务A";
}).thenAcceptBoth(CompletableFuture.supplyAsync(() -> {
System.out.println("任务B");
return "任务B";
}), (resultA, resultB) -> {
System.out.println("处理任务A和任务B");
});
System.in.read();
}
}
8.2.2.8 runAfterBoth()
比如有任务A和任务B并行执行,执行完再执行任务C。
runAfterBoth和thenRun对齐,只不过thenAccept是执行一个任务然后不接收参数然后没有返回结果,而thenAcceptBoth执行两个任务不接收参数没有返回结果。
public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other,
Runnable action) {
return biRunStage(null, other, action);
}
public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,
Runnable action) {
return biRunStage(asyncPool, other, action);
}
public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,
Runnable action,
Executor executor) {
return biRunStage(screenExecutor(executor), other, action);
}
测试
package com.xqm.juc.future;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
public class CompletableFutureTest09 {
public static void main(String[] args) throws IOException {
CompletableFuture.supplyAsync(() -> {
System.out.println("任务A");
return "任务A";
}).runAfterBoth(CompletableFuture.supplyAsync(() -> {
System.out.println("任务B");
return "任务B";
}), () -> {
System.out.println("处理任务A和任务B");
});
System.in.read();
}
}
8.2.2.9 applyToEither
比如有任务A和任务B并行执行,只要任务A或任务B一个执行完,任务C就会执行。
applyToEither有一个接收参数,同时有返回值。
public <U> CompletableFuture<U> applyToEither(
CompletionStage<? extends T> other, Function<? super T, U> fn) {
return orApplyStage(null, other, fn);
}
public <U> CompletableFuture<U> applyToEitherAsync(
CompletionStage<? extends T> other, Function<? super T, U> fn) {
return orApplyStage(asyncPool, other, fn);
}
public <U> CompletableFuture<U> applyToEitherAsync(
CompletionStage<? extends T> other, Function<? super T, U> fn,
Executor executor) {
return orApplyStage(screenExecutor(executor), other, fn);
}
测试
package com.xqm.juc.future;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
public class CompletableFutureTest10 {
public static void main(String[] args) throws IOException {
CompletableFuture.supplyAsync(() -> {
System.out.println("任务A");
return "任务A";
}).applyToEither(CompletableFuture.supplyAsync(() -> {
System.out.println("任务B");
return "任务B";
}), (result) -> {
System.out.println("处理任务A和任务B,只要有一个执行完即可");
System.out.println(result);
return "执行完的任务是"+result;
});
System.in.read();
}
}
结果
任务A
任务B
处理任务A和任务B,只要有一个执行完即可
任务A
8.2.2.10 acceptEither
比如有任务A和任务B并行执行,只要任务A或任务B一个执行完,任务C就会执行。
acceptEither有一个接收参数,同时没有返回值。
public CompletableFuture<Void> acceptEither(
CompletionStage<? extends T> other, Consumer<? super T> action) {
return orAcceptStage(null, other, action);
}
public CompletableFuture<Void> acceptEitherAsync(
CompletionStage<? extends T> other, Consumer<? super T> action) {
return orAcceptStage(asyncPool, other, action);
}
public CompletableFuture<Void> acceptEitherAsync(
CompletionStage<? extends T> other, Consumer<? super T> action,
Executor executor) {
return orAcceptStage(screenExecutor(executor), other, action);
}
测试
package com.xqm.juc.future;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
public class CompletableFutureTest11 {
public static void main(String[] args) throws IOException {
CompletableFuture.supplyAsync(() -> {
System.out.println("任务A");
return "任务A";
}).acceptEither(CompletableFuture.supplyAsync(() -> {
System.out.println("任务B");
return "任务B";
}), (result) -> {
System.out.println("处理任务A和任务B,只要有一个执行完即可");
System.out.println(result);
});
System.in.read();
}
}
8.2.2.11 runAfterEither
比如有任务A和任务B并行执行,只要任务A或任务B一个执行完,任务C就会执行。
runAfterEither没有接收参数,同时没有返回值。
public CompletableFuture<Void> runAfterEither(CompletionStage<?> other,
Runnable action) {
return orRunStage(null, other, action);
}
public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,
Runnable action) {
return orRunStage(asyncPool, other, action);
}
public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,
Runnable action,
Executor executor) {
return orRunStage(screenExecutor(executor), other, action);
}
测试
package com.xqm.juc.future;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
public class CompletableFutureTest12 {
public static void main(String[] args) throws IOException {
CompletableFuture.supplyAsync(() -> {
System.out.println("任务A");
return "任务A";
}).runAfterEither(CompletableFuture.supplyAsync(() -> {
System.out.println("任务B");
return "任务B";
}), () -> {
System.out.println("处理任务A和任务B,只要有一个执行完即可");
});
System.in.read();
}
}
8.2.2.12 exceptionally
exceptionally这个也是拼接任务的方式。但是只有前面业务出现异常,才会走这个方法。
public CompletableFuture<T> exceptionally(
Function<Throwable, ? extends T> fn) {
return uniExceptionallyStage(fn);
}
测试
package com.xqm.juc.future;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
public class CompletableFutureTest13 {
public static void main(String[] args) throws IOException {
CompletableFuture.supplyAsync(() -> {
System.out.println("任务A");
return "任务A";
}).applyToEither(CompletableFuture.supplyAsync(() -> {
System.out.println("任务B");
return "任务B";
}), (result) -> {
System.out.println("首先执行完的是:"+result);
System.out.println("处理任务A和任务B,只要有一个执行完即可");
int a=1/0;
return "首先执行完的是:"+result;
}).exceptionally((exception)->{
System.out.println("出现的异常为:"+exception.getMessage());
return "-1";
});
System.in.read();
}
}
结果
任务A
任务B
首先执行完的是:任务A
处理任务A和任务B,只要有一个执行完即可
出现的异常为:java.lang.ArithmeticException: / by zero
8.2.2.13 whenComplete
whenComplete也是异常处理的方法,但是比exceptionally要更丰富一些。
whenComplete可以拿到返回结果,也可以拿到异常,但是没有返回结果。无法捕获异常,但是可以拿到异常返回的结果。
public CompletableFuture<T> whenComplete(
BiConsumer<? super T, ? super Throwable> action) {
return uniWhenCompleteStage(null, action);
}
public CompletableFuture<T> whenCompleteAsync(
BiConsumer<? super T, ? super Throwable> action) {
return uniWhenCompleteStage(asyncPool, action);
}
public CompletableFuture<T> whenCompleteAsync(
BiConsumer<? super T, ? super Throwable> action, Executor executor) {
return uniWhenCompleteStage(screenExecutor(executor), action);
}
测试
package com.xqm.juc.future;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
public class CompletableFutureTest14 {
public static void main(String[] args) throws IOException {
CompletableFuture<String> ex = CompletableFuture.supplyAsync(() -> {
System.out.println("任务A");
int a = 1 / 0;
return "任务A";
}).applyToEither(CompletableFuture.supplyAsync(() -> {
System.out.println("任务B");
return "任务B";
}), (result) -> {
System.out.println("首先执行完的是:" + result);
System.out.println("处理任务A和任务B,只要有一个执行完即可");
return "首先执行完的是:" + result;
}).whenComplete((result, exception) -> {
System.out.println("结果为:" + result);
System.out.println("出现的异常为:" + exception);
});
// 这里会抛出异常
System.out.println(ex.join());
System.in.read();
}
}
结果
任务A
任务B
结果为:null
出现的异常为:java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
Exception in thread "main" java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1584)
at java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1574)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1689)
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
Caused by: java.lang.ArithmeticException: / by zero
at com.xqm.juc.future.CompletableFutureTest14.lambda$main$0(CompletableFutureTest14.java:11)
at com.xqm.juc.future.CompletableFutureTest14$$Lambda$1/1921595561.get(Unknown Source)
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1582)
... 5 more
8.2.2.14 handle
handle也是异常处理的方法,但是比exceptionally要更丰富一些。
handle既可以拿到返回结果,也可以拿到异常,也有返回结果。
public <U> CompletableFuture<U> handle(
BiFunction<? super T, Throwable, ? extends U> fn) {
return uniHandleStage(null, fn);
}
public <U> CompletableFuture<U> handleAsync(
BiFunction<? super T, Throwable, ? extends U> fn) {
return uniHandleStage(asyncPool, fn);
}
public <U> CompletableFuture<U> handleAsync(
BiFunction<? super T, Throwable, ? extends U> fn, Executor executor) {
return uniHandleStage(screenExecutor(executor), fn);
}
测试
package com.xqm.juc.future;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
public class CompletableFutureTest15 {
public static void main(String[] args) throws IOException {
CompletableFuture<String> ex = CompletableFuture.supplyAsync(() -> {
System.out.println("任务A");
int a = 1 / 0;
return "任务A";
}).applyToEither(CompletableFuture.supplyAsync(() -> {
System.out.println("任务B");
return "任务B";
}), (result) -> {
System.out.println("首先执行完的是:" + result);
System.out.println("处理任务A和任务B,只要有一个执行完即可");
return "首先执行完的是:" + result;
}).handle((result, exception) -> {
System.out.println("结果为:" + result);
System.out.println("出现的异常为:" + exception);
return "-1";
});
// 这里会抛出异常
System.out.println(ex.join());
System.in.read();
}
}
结果
任务A
任务B
结果为:null
出现的异常为:java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
-1
8.2.2.15 allof
allof里面可以有很多任务。当所有任务执行完,才能继续执行后续的任务。
allof是没有返回结果的。
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {
return andTree(cfs, 0, cfs.length - 1);
}
测试
package com.xqm.juc.future;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
public class CompletableFutureTest16 {
public static void main(String[] args) throws IOException {
CompletableFuture.allOf(
CompletableFuture.runAsync(()->{
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("任务A");
}),
CompletableFuture.runAsync(()->{
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("任务B");
}),
CompletableFuture.runAsync(()->{
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("任务C");
}),
CompletableFuture.runAsync(()->{
try {
Thread.sleep(4000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("任务D");
})
).thenRun(()->{
System.out.println("任务E开始执行");
});
System.in.read();
}
}
8.2.2.16 anyOf
anyOf是可以有多个任务的,只要有一个任务执行完毕,就可以继续执行后续的任务。anyOf是可以拿到一个返回结果的。
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) {
return orTree(cfs, 0, cfs.length - 1);
}
测试
package com.xqm.juc.future;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
public class CompletableFutureTest17 {
public static void main(String[] args) throws IOException {
CompletableFuture.anyOf(
CompletableFuture.supplyAsync(()->{
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("任务A");
return "任务A";
}),
CompletableFuture.supplyAsync(()->{
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("任务B");
return "任务B";
}),
CompletableFuture.supplyAsync(()->{
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("任务C");
return "任务C";
})
).thenApplyAsync(result->{
System.out.println("任务E开始执行;"+"先执行完毕的是"+result);
return "任务E";
});
System.in.read();
}
}
任务A
任务E开始执行;先执行完毕的是任务A
任务B
任务C
8.2.3 源码-runAsync
将任务和CompletableFuture封装到一起,再执行封装好的具体对象的run方法。
runAsync
// CompletableFuture 的runAsync方法
public static CompletableFuture<Void> runAsync(Runnable runnable) {
// asyncPool是执行任务的线程池
// runnable:具体任务
return asyncRunStage(asyncPool, runnable);
}
asyncRunStage
// 内部执行的方法
// e:线程池 f:方法
static CompletableFuture<Void> asyncRunStage(Executor e, Runnable f) {
// 对任务进行非空校验
if (f == null) throw new NullPointerException();
// 构建CompletableFuture作为最后的返回结果
CompletableFuture<Void> d = new CompletableFuture<Void>();
// AsyncRun:将任务和CompletableFuture封装在一起
// 将封装好的任务交给线程池去执行
e.execute(new AsyncRun(d, f));
// 返回结果,一个CompletableFuture
return d;
}
AsyncRun内部类
// 封装任务和CompletableFuture的内部类AsyncRun
static final class AsyncRun extends ForkJoinTask<Void>
implements Runnable, AsynchronousCompletionTask {
// 声明CompletableFuture对象以及任务的成员变量
CompletableFuture<Void> dep; Runnable fn;
AsyncRun(CompletableFuture<Void> dep, Runnable fn) {
// 将传入的属性赋值给成员变量
this.dep = dep; this.fn = fn;
}
public final Void getRawResult() { return null; }
public final void setRawResult(Void v) {}
// 执行AsyncRun的exec方法,实际就是执行run方法
public final boolean exec() { run(); return true; }
// 当前对象作为任务提交给线程池之后,会执行当前的run方法
public void run() {
// 声明局部变量
CompletableFuture<Void> d; Runnable f;
// 将成员变量赋值给局部变量,并判断非空
if ((d = dep) != null && (f = fn) != null) {
// 将成员变量置为null
dep = null; fn = null;
// 如果任务还没执行
if (d.result == null) {
try {
// 任务进行执行
f.run();
// 当前方法是针对Runnable的,不能将结果置为null
// 要给没有返回结果的Runnable设置一个返回结果
d.completeNull();
} catch (Throwable ex) {
// 异常结束
d.completeThrowable(ex);
}
}
d.postComplete();
}
}
}
封装返回结果为null和异常的
// 给Runnable的CompletableFuture设置返回结果的方式,将null设置为NIL
final boolean completeNull() {
return UNSAFE.compareAndSwapObject(this, RESULT, null,
NIL);
}
// Runnable异常结束,要将异常结果封装给CompletableFuture
final boolean completeThrowable(Throwable x) {
return UNSAFE.compareAndSwapObject(this, RESULT, null,
encodeThrowable(x));
}
8.2.4 源码-任务编排方式
如果要在前置任务执行完,执行后置任务的话,有两种方式:
- 前置任务没有执行完,后置任务会放到栈中
- 前置任务执行完,后置任务会直接执行,不会放到栈中
如果单独采用runAsync在一个任务后面指定多个后置任务,CompletableFuture无法保证具体执行的顺序,影响执行顺序的是前置任务的执行时间,以及后置任务的编排方式。
一个任务A如果执行时间比较长,那么使用then之后的任务是使用栈来存储的,等到任务A执行完毕之后,从栈中拿取任务执行。
演示:
package com.xqm.juc.future;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
public class CompletableFutureTest18 {
public static void main(String[] args) throws IOException {
CompletableFuture<Void> taskA = CompletableFuture.runAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("任务A");
});
taskA.thenRun(()->{
System.out.println("任务B");
});
taskA.thenRun(()->{
System.out.println("任务C");
});
taskA.thenRun(()->{
System.out.println("任务D");
});
System.in.read();
}
}
结果
任务A
任务D
任务C
任务B
但是如果任务A很快就执行完了,那么之后使用then执行的任务就会直接拿来执行,不会放入到栈中。
演示:
package com.xqm.juc.future;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
public class CompletableFutureTest18 {
public static void main(String[] args) throws IOException {
CompletableFuture<Void> taskA = CompletableFuture.runAsync(() -> {
System.out.println("任务A");
});
taskA.thenRun(()->{
System.out.println("任务B");
});
taskA.thenRun(()->{
System.out.println("任务C");
});
taskA.thenRun(()->{
System.out.println("任务D");
});
System.in.read();
}
}
结果
任务A
任务B
任务C
任务D
8.2.5 源码-thenRun
// 编排任务,前置任务执行完,执行后置任务
public CompletableFuture<Void> thenRun(Runnable action) {
// null是线程池
// action是后置任务
return uniRunStage(null, action);
}
// 内部任务编排方法
private CompletableFuture<Void> uniRunStage(Executor e, Runnable f) {
// 判断任务非空
if (f == null) throw new NullPointerException();
// 声明CompletableFuture对象,绑定后置任务
CompletableFuture<Void> d = new CompletableFuture<Void>();
// 如果线程池不为null,代表异步执行,尝试将任务压入栈
// 如果线程池为null,代表同步执行,先基于uniRun,看任务能否执行,能执行就直接返回结果
if (e != null || !d.uniRun(this, f, null)) {
// 如果传入线程池,或者没有传入线程池但是不能执行任务,走内部逻辑
// e是线程池
// d是CompletableFuture
// this是前继任务的CompletableFuture
// f是后继任务
UniRun<T> c = new UniRun<T>(e, d, this, f);
// 将封装好的任务push进入栈中
// 只要前置任务没结束,就将后置任务放入栈中
// 放入栈中可能会失败
push(c);
// 无论压栈成功还是失败,都会执行tryFire方法
// 尝试执行一下任务
c.tryFire(SYNC);
}
// 无论任务执行完毕还是没执行,都要返回后置任务的CompletableFuture
return d;
}
8.2.6 源码-前置任务处理完毕,后置任务之间处理
// 同步
static final int SYNC = 0;
// 异步
static final int ASYNC = 1;
// 嵌套
static final int NESTED = -1;
UniRun内部类以及tryFire方法
// 后置任务无论压栈成功还是失败,都会执行tryFire方法
static final class UniRun<T> extends UniCompletion<T, Void> {
Runnable fn;
// executor:线程池
// dep:后置任务的CompletableFuture
// src:前置任务的CompletableFuture
// fn:后置任务
UniRun(Executor executor, CompletableFuture<Void> dep, CompletableFuture<T> src, Runnable fn) {
super(executor, dep, src); this.fn = fn;
}
final CompletableFuture<Void> tryFire(int mode) {
// 声明局部变量
CompletableFuture<Void> d; CompletableFuture<T> a;
// 赋值局部变量,并进行健壮性校验
if ((d = dep) == null ||
// 调用uniRun
// a:前置任务的CompletableFuture
// fn:后置任务
// mode:在这mode为0,代表没有传入线程池,是同步的方式SYNC
// 传入的是this,也就是UniRun
!d.uniRun(a = src, fn, mode > 0 ? null : this))
// 到这,代表任务没结束
return null;
dep = null; src = null; fn = null;
return d.postFire(a, mode);
}
}
uniRun方法
// 是否要主动执行任务
// a:前置任务的CompletableFuture
// f:后置任务
// c:传入封装的UniRun
final boolean uniRun(CompletableFuture<?> a, Runnable f, UniRun<?> c) {
// 方法要么正常结束,要么异常结束的结果的局部变量
Object r; Throwable x;
// 健壮性校验
if (a == null || (r = a.result) == null || f == null)
// 代表任务没结束
return false;
// 查看后置任务的返回结果,如果为null,代表任务没结束
if (result == null) {
// 如果前置任务的结果是异常结束,那么直接封装异常结果
if (r instanceof AltResult && (x = ((AltResult)r).ex) != null)
completeThrowable(x, r);
else
// 如果前置任务正常结束,后置任务正常执行
try {
// 如果是基于tryFire(SYNC)进来,这里的c不为null,执行c.claim
// 如果是没有传递线程池,这里的c就是null
if (c != null && !c.claim())
return false;
// 如果没有基于线程池运行,这里就是同步执行,直接run
f.run();
// 封装结果
completeNull();
} catch (Throwable ex) {
// 封装异常结果
completeThrowable(ex);
}
}
// 如果后置任务执行结束
return true;
}
线程池异步处理任务claim
// 异步线程池处理任务
final boolean claim() {
Executor e = executor;
if (compareAndSetForkJoinTaskTag((short)0, (short)1)) {
// e是线程池,只要线程池对象不为null
if (e == null)
return true;
executor = null; // disable
// 基于线程池的execute去执行任务
e.execute(this);
}
return false;
}
8.2.7 源码-前置任务未处理完毕,后置任务基于栈处理
前置任务结束,遍历栈去执行后置任务。
// 前置任务结束,遍历栈去执行后置任务。
final void postComplete() {
// f:前置任务的CompletableFuture
// h:存储后置任务的栈
CompletableFuture<?> f = this; Completion h;
// 赋值并且进行健壮性校验,确保栈中有数据
while ((h = f.stack) != null ||
// 循环一次后,对后续节点的赋值以及健壮性判断,确保栈中有数据
(f != this && (h = (f = this).stack) != null)) {
// 局部变量
// t:当前栈中任务的后续
CompletableFuture<?> d; Completion t;
// 拿到之前的栈顶h之后,将栈顶切换到下一个数据
if (f.casStack(h, t = h.next)) {
// 如果还有任务
if (t != null) {
if (f != this) {
pushStack(h);
continue;
}
h.next = null; // detach
}
// 执行tryFire
// NESTED:-1
// A嵌套了B+C,B嵌套了D+E,A执行完之后,运行B,B会运行D和E
// 进行递归运行任务
f = (d = h.tryFire(NESTED)) == null ? this : d;
}
}
}