概述 CompletableFuture 是 JDK8 中新增的多线程任务执行类,通过它我们可以方便地进行串行、并行、组合和转换异步任务,它能够以一种非常灵活的方式处理异步操作的结果,包括成功的结果、异常和取消等情况。它是对Futurer接口增强,在异步计算中,Future确实是个非常优秀的接口。但也存在着诸多限制:
并发执行多任务:Future只提供了get()方法来获取结果,并且是阻塞的。所以,除了等待你别无他法;
无法对多个任务进行链式调用:如果你希望在计算任务完成后执行特定动作,比如发邮件,但Future却没有提供这样的能力;
无法组合多个任务:如果你运行了10个任务,并期望在它们全部执行结束后执行特定动作,那么在Future中这是无能为力的;
没有异常处理:Future接口中没有关于异常处理的方法;
CompletableFuture的核心特性:
基于事件驱动 :CompletableFuture的核心思想是基于事件驱动和回调。当一个任务完成时,它会触发一个或多个依赖它的后续操作,而不是让调用方线程一直阻塞等待。
链式调用与组合 :CompletableFuture通过提供丰富的API(如thenApply、thenAccept、thenCompose等),允许以链式调用的方式将多个异步任务连接起来,形成一个完整的异步处理流水线。这使得处理复杂的异步流程变得非常简单,避免了“回调地狱”。
状态机管理 :CompletableFuture内部通过一个状态机来管理任务的生命周期,例如NEW、COMPLETING、COMPLETED_NORMALLY、COMPLETED_EXCEPTIONALLY等。当任务完成时,其结果或异常会存储在内部字段中,并更新状态,通知所有依赖它的后续任务。
默认线程池 :CompletableFuture的异步方法(如supplyAsync、runAsync等)默认使用ForkJoinPool.commonPool()来执行任务,也可以指定自定义的Executor来控制任务的执行线程。
无锁设计与原子操作 :为了确保高并发下的性能,CompletableFuture的内部实现大量依赖无锁原子操作(如CAS),以高效地管理任务状态和回调链,减少锁竞争。
核心方法 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 public <U> CompletableFuture<U> thenApply (Function<? super T,? extends U> fn) ;public <U> CompletableFuture<U> thenApplyAsync (Function<? super T,? extends U> fn) ;public CompletableFuture<Void> thenAccept (Consumer<? super T> action) ;public CompletableFuture<Void> thenAcceptAsync (Consumer<? super T> action) ;public static <U> CompletableFuture<U> supplyAsync (Supplier<U> supplier) ;public static <U> CompletableFuture<U> supplyAsync (Supplier<U> supplier, Executor executor) ;public CompletableFuture<Void> thenRun (Runnable action) ;public CompletableFuture<Void> thenRunAsync (Runnable action) ;public <U,V> CompletableFuture<V> thenCombine (CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn) ;public <U,V> CompletableFuture<V> thenCombineAsync (CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn) ;public <U> CompletableFuture<U> handle (BiFunction<? super T, Throwable, ? extends U> fn) ;public <U> CompletableFuture<U> handleAsync (BiFunction<? super T, Throwable, ? extends U> fn) ;public CompletionStage<T> whenComplete (BiConsumer<? super T, ? super Throwable> action) ;public CompletionStage<T> whenCompleteAsync (BiConsumer<? super T, ? super Throwable> action) ;public CompletionStage<T> exceptionally (Function<Throwable, ? extends T> fn) ;
使用
1 2 3 4 5 6 7 8 public static void main (String[] args) { CompletableFuture<String> cf = CompletableFuture .supplyAsync(() -> "Hello CompletableFuture" ) .thenApply(String::toUpperCase) .thenApply(String::trim); cf.thenAccept(System.out::println); }
1 2 3 4 5 6 7 CompletableFuture.supplyAsync(() -> { double s = 3 / 0 ; return s; }).exceptionally(ex -> { System.out.println(ex.getMessage()); return 0d ; }).thenAccept(System.out::println);
1 2 3 4 5 6 7 8 9 CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> "cf1" ); CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> "cf2" ); CompletableFuture<String> cf3 = CompletableFuture.supplyAsync(() -> "cf3" ); CompletableFuture.allOf(cf1, cf2, cf3).thenRun(() -> { String res1 = cf1.join(); String res2 = cf2.join(); String res3 = cf3.join(); System.out.println("result=" + res1 + "," + res2 + "," + res3); });
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 @Service public class AggService { @Autowired private AsyncRestTemplate restTemplate; public CompletableFuture<Response> getAggResponse () { CompletableFuture<User[]> ucf = CompletableFuture.supplyAsync(() -> { return restTemplate.getForObject("http://domain/users" , User[].class); }); CompletableFuture<Product[]> pcf = CompletableFuture.supplyAsync(() -> { return restTemplate.getForObject("http://domain/products" , Product[].class); }); CompletableFuture<Order[]> ocf = CompletableFuture.supplyAsync(() -> { return restTemplate.getForObject("http://domain/orders" , Order[].class); }); return CompletableFuture .allOf(usersFuture, productsFuture, ordersFuture) .thenApply(v -> new AggResponse (ucf.join(), pcf.join(), ocf.join())); } }
核心原理 CompletableFuture实现了两个接口:Future、CompletionStage 。
Future表示异步计算的结果。
CompletionStage 是 Java 8 引入的一个核心接口,它定义了支持异步操作编排和组合所需的所有基本方法。它是 CompletableFuture 实现强大异步任务编排能力的关键,提供了一套标准、声明式的 API,使得复杂的异步逻辑能够以流畅、非阻塞、易于管理的方式编写。
1 2 3 public class CompletableFuture <T> implements Future <T>, CompletionStage<T> { }
CompletableFuture中包含两个字段:result和stack :
result用于存储当前CF的结果;
stack(Completion)表示当前CF完成后需要触发的依赖动作(Dependency Actions),去触发依赖它的CF的计算,依赖动作可以有多个,以栈(Treiber stack)的形式存储,stack表示栈顶元素,依赖的动作(Dependency Action)都封装在一个单独Completion子类中。
1 2 volatile Object result; volatile Completion stack;
Completion 在 CompletableFuture 中,Completion 并不是一个公共 API 接口或类,而是 CompletableFuture 类的内部抽象基类以及一系列相关的内部具体子类。它是实现链式调用(Chaining) 和 非阻塞回调(Non-blocking Callbacks) 机制的核心数据结构。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 abstract static class Completion extends ForkJoinTask <Void> implements Runnable , AsynchronousCompletionTask { volatile Completion next; abstract CompletableFuture<?> tryFire(int mode); abstract boolean isLive () ; public final void run () { tryFire(ASYNC); } public final boolean exec () { tryFire(ASYNC); return false ; } public final Void getRawResult () { return null ; } public final void setRawResult (Void v) {} }
Completion主要作用是充当链表节点和回调任务的封装器,用于管理任务之间的依赖关系:
表示依赖关系: 每一个 thenApply, thenAccept, thenRun 等方法调用都会创建一个新的 CompletableFuture,并将一个实现了 Completion 抽象类的实例(例如 UniCompletion、BiCompletion 等)作为回调节点附加到前一个 CompletableFuture 上。
存储回调逻辑: Completion 对象内部封装了需要在前一个任务(前驱)完成后执行的具体逻辑(例如 Function, Consumer, Runnable 等)。
实现任务编排: 当一个 CompletableFuture 完成时,它会遍历其内部的 stack 链表(由 Completion 节点组成),并触发执行所有依赖它的后续任务。
传播结果与异常: Completion 节点负责将前驱任务的结果传递给后续任务,或者将异常信息向下游传播,直到被处理。
在上图中,CompletableFuture 内部实现,CompletableFuture.stack 和 Completion.next 共同构成了一个无锁(lock-free)的单向链表结构,用于管理依赖关系和回调任务。CompletableFuture.stack是一个 volatile 字段,类型是抽象内部类 Completion。它充当链表的头部(Head of the list),指向最近被添加到该 CompletableFuture 实例的那个 Completion 节点。而 Completion.next,充当链表节点的指针(Pointer to the next node),它指向下一个(即在此节点之前添加的)Completion 节点。
以thenApply()为例分析 在上图中,这个链表采用 头插法(Prepend/Push-style) 进行维护,并且操作是线程安全的,利用了 volatile 和底层的 CAS (Compare-And-Swap) 原子操作。
创建新的CompletableFuture,CompletableFuture<V> d = newIncompleteFuture()
添加依赖 :
一个新的 Completion 节点(比如 UniApply)被创建。
新节点的 next 字段被设置为当前 CompletableFuture 的旧 stack 头指针。
使用 CAS 操作,尝试将 CompletableFuture 的 stack 指针更新为这个新节点。
如果 CAS 失败(说明有其他线程同时添加了节点),则重试,直到成功。
stack 指针总是指向链表最前面的那个节点,而链表的顺序是 后进先出(LIFO) 的。
任务完成时触发 (调用 complete() 或任务执行结束):
CompletableFuture 会调用内部方法 postComplete()。
postComplete() 会从 stack 头开始,遍历并弹出链表中的每一个 Completion 节点。
对于每个节点,它都会调用 tryFire() 方法来执行实际的回调逻辑。
在触发执行时,通过 Completion.next 指针可以依次访问到所有后续的依赖任务。
1 CompletableFuture.completedFuture("Hello CompletableFuture" ).thenApply(String::toUpperCase);
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 public <U> CompletableFuture<U> thenApply ( Function<? super T,? extends U> fn) { return uniApplyStage(null , fn); }private <V> CompletableFuture<V> uniApplyStage ( Executor e, Function<? super T,? extends V> f) { if (f == null ) throw new NullPointerException (); Object r; if ((r = result) != null ) return uniApplyNow(r, e, f); CompletableFuture<V> d = newIncompleteFuture(); unipush(new UniApply <T,V>(e, d, this , f)); return d; }final void unipush (Completion c) { if (c != null ) { while (!tryPushStack(c)) { if (result != null ) { NEXT.set(c, null ); break ; } } if (result != null ) c.tryFire(SYNC); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 abstract static class UniCompletion <T,V> extends Completion { Executor executor; CompletableFuture<V> dep; CompletableFuture<T> src; UniCompletion(Executor executor, CompletableFuture<V> dep, CompletableFuture<T> src) { this .executor = executor; this .dep = dep; this .src = src; } final boolean claim () { Executor e = executor; if (compareAndSetForkJoinTaskTag((short )0 , (short )1 )) { if (e == null ) return true ; executor = null ; e.execute(this ); } return false ; } }static final class UniApply <T,V> extends UniCompletion <T,V> { Function<? super T,? extends V > fn; UniApply(Executor executor, CompletableFuture<V> dep, CompletableFuture<T> src, Function<? super T,? extends V > fn) { super (executor, dep, src); this .fn = fn; } final CompletableFuture<V> tryFire (int mode) { CompletableFuture<V> d; CompletableFuture<T> a; Object r; Throwable x; Function<? super T,? extends V > f; if ((a = src) == null || (r = a.result) == null || (d = dep) == null || (f = fn) == null ) return null ; tryComplete: if (d.result == null ) { if (r instanceof AltResult) { if ((x = ((AltResult)r).ex) != null ) { d.completeThrowable(x, r); break tryComplete; } r = null ; } try { if (mode <= 0 && !claim()) return null ; else { @SuppressWarnings("unchecked") T t = (T) r; d.completeValue(f.apply(t)); } } catch (Throwable ex) { d.completeThrowable(ex); } } src = null ; dep = null ; fn = null ; return d.postFire(a, mode); } final CompletableFuture<T> postFire (CompletableFuture<?> a, int mode) { if (a != null && a.stack != null ) { Object r; if ((r = a.result) == null ) a.cleanStack(); if (mode >= 0 && (r != null || a.result != null )) a.postComplete(); } if (result != null && stack != null ) { if (mode < 0 ) return this ; else postComplete(); } return null ; } }
CompletableFuture 通过维护任务完成状态和回调列表,实现了基于事件驱动和回调的非阻塞异步编程模型。