CompletableFuture 基础与原理

概述

CompletableFuture 是 JDK8 中新增的多线程任务执行类,通过它我们可以方便地进行串行、并行、组合和转换异步任务,它能够以一种非常灵活的方式处理异步操作的结果,包括成功的结果、异常和取消等情况。它是对Futurer接口增强,在异步计算中,Future确实是个非常优秀的接口。但也存在着诸多限制:

  • 并发执行多任务:Future只提供了get()方法来获取结果,并且是阻塞的。所以,除了等待你别无他法;
  • 无法对多个任务进行链式调用:如果你希望在计算任务完成后执行特定动作,比如发邮件,但Future却没有提供这样的能力;
  • 无法组合多个任务:如果你运行了10个任务,并期望在它们全部执行结束后执行特定动作,那么在Future中这是无能为力的;
  • 没有异常处理:Future接口中没有关于异常处理的方法;

CompletableFuture的核心特性:

  • 基于事件驱动:CompletableFuture的核心思想是基于事件驱动和回调。当一个任务完成时,它会触发一个或多个依赖它的后续操作,而不是让调用方线程一直阻塞等待。
  • 链式调用与组合:CompletableFuture通过提供丰富的API(如thenApply、thenAccept、thenCompose等),允许以链式调用的方式将多个异步任务连接起来,形成一个完整的异步处理流水线。这使得处理复杂的异步流程变得非常简单,避免了“回调地狱”。
  • 状态机管理:CompletableFuture内部通过一个状态机来管理任务的生命周期,例如NEW、COMPLETING、COMPLETED_NORMALLY、COMPLETED_EXCEPTIONALLY等。当任务完成时,其结果或异常会存储在内部字段中,并更新状态,通知所有依赖它的后续任务。
  • 默认线程池:CompletableFuture的异步方法(如supplyAsync、runAsync等)默认使用ForkJoinPool.commonPool()来执行任务,也可以指定自定义的Executor来控制任务的执行线程。
  • 无锁设计与原子操作:为了确保高并发下的性能,CompletableFuture的内部实现大量依赖无锁原子操作(如CAS),以高效地管理任务状态和回调链,减少锁竞争。

核心方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// 可自定义二元操作的函数式
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn);
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn);

// 自定义consumer实现,不返回结果
public CompletableFuture<Void> thenAccept(Consumer<? super T> action);
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action);

// 自定义consumer实现,返回结果
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier);
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor);

// 直接执行任务
public CompletableFuture<Void> thenRun(Runnable action);
public CompletableFuture<Void> thenRunAsync(Runnable action);

// 组合任务
public <U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn);

// 任务完成回调事件,返回结果
public <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn);

// 任务完成事件,不返回结果
public CompletionStage<T> whenComplete(BiConsumer<? super T, ? super Throwable> action);
public CompletionStage<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action);

// 异常处理
public CompletionStage<T> exceptionally(Function<Throwable, ? extends T> fn);

使用

  • 基础示例
1
2
3
4
5
6
7
8
public static void main(String[] args) {
CompletableFuture<String> cf = CompletableFuture
.supplyAsync(() -> "Hello CompletableFuture")
.thenApply(String::toUpperCase)
.thenApply(String::trim);

cf.thenAccept(System.out::println);
}
  • 异常处理
1
2
3
4
5
6
7
CompletableFuture.supplyAsync(() -> {
double s = 3 / 0;
return s;
}).exceptionally(ex -> {
System.out.println(ex.getMessage());
return 0d;
}).thenAccept(System.out::println);
  • 多任务并行
1
2
3
4
5
6
7
8
9
CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> "cf1");
CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> "cf2");
CompletableFuture<String> cf3 = CompletableFuture.supplyAsync(() -> "cf3");
CompletableFuture.allOf(cf1, cf2, cf3).thenRun(() -> {
String res1 = cf1.join();
String res2 = cf2.join();
String res3 = cf3.join();
System.out.println("result=" + res1 + "," + res2 + "," + res3);
});
  • 一个异步调用的聚合示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Service
public class AggService {
@Autowired
private AsyncRestTemplate restTemplate;

public CompletableFuture<Response> getAggResponse() {
CompletableFuture<User[]> ucf = CompletableFuture.supplyAsync(() -> {
return restTemplate.getForObject("http://domain/users", User[].class);
});

CompletableFuture<Product[]> pcf = CompletableFuture.supplyAsync(() -> {
return restTemplate.getForObject("http://domain/products", Product[].class);
});

CompletableFuture<Order[]> ocf = CompletableFuture.supplyAsync(() -> {
return restTemplate.getForObject("http://domain/orders", Order[].class);
});

return CompletableFuture
.allOf(usersFuture, productsFuture, ordersFuture)
.thenApply(v -> new AggResponse(ucf.join(), pcf.join(), ocf.join()));
}
}

核心原理

CompletableFuture实现了两个接口:Future、CompletionStage

  • Future表示异步计算的结果。
  • CompletionStage 是 Java 8 引入的一个核心接口,它定义了支持异步操作编排和组合所需的所有基本方法。它是 CompletableFuture 实现强大异步任务编排能力的关键,提供了一套标准、声明式的 API,使得复杂的异步逻辑能够以流畅、非阻塞、易于管理的方式编写。
1
2
3
public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {

}

CompletableFuture中包含两个字段:result和stack

  • result用于存储当前CF的结果;
  • stack(Completion)表示当前CF完成后需要触发的依赖动作(Dependency Actions),去触发依赖它的CF的计算,依赖动作可以有多个,以栈(Treiber stack)的形式存储,stack表示栈顶元素,依赖的动作(Dependency Action)都封装在一个单独Completion子类中。
1
2
volatile Object result;       // Either the result or boxed AltResult
volatile Completion stack; // Top of Treiber stack of dependent actions

Completion

在 CompletableFuture 中,Completion 并不是一个公共 API 接口或类,而是 CompletableFuture 类的内部抽象基类以及一系列相关的内部具体子类。它是实现链式调用(Chaining) 和 非阻塞回调(Non-blocking Callbacks) 机制的核心数据结构。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
abstract static class Completion extends ForkJoinTask<Void>
implements Runnable, AsynchronousCompletionTask {

// 存储下一个任务链式调用栈。
volatile Completion next; // Treiber stack link
abstract CompletableFuture<?> tryFire(int mode);

abstract boolean isLive();

public final void run() { tryFire(ASYNC); }
public final boolean exec() { tryFire(ASYNC); return false; }
public final Void getRawResult() { return null; }
public final void setRawResult(Void v) {}
}

Completion主要作用是充当链表节点和回调任务的封装器,用于管理任务之间的依赖关系:

  • 表示依赖关系: 每一个 thenApply, thenAccept, thenRun 等方法调用都会创建一个新的 CompletableFuture,并将一个实现了 Completion 抽象类的实例(例如 UniCompletion、BiCompletion 等)作为回调节点附加到前一个 CompletableFuture 上。
  • 存储回调逻辑: Completion 对象内部封装了需要在前一个任务(前驱)完成后执行的具体逻辑(例如 Function, Consumer, Runnable 等)。
  • 实现任务编排: 当一个 CompletableFuture 完成时,它会遍历其内部的 stack 链表(由 Completion 节点组成),并触发执行所有依赖它的后续任务。
  • 传播结果与异常: Completion 节点负责将前驱任务的结果传递给后续任务,或者将异常信息向下游传播,直到被处理。

在上图中,CompletableFuture 内部实现,CompletableFuture.stack 和 Completion.next 共同构成了一个无锁(lock-free)的单向链表结构,用于管理依赖关系和回调任务。CompletableFuture.stack是一个 volatile 字段,类型是抽象内部类 Completion。它充当链表的头部(Head of the list),指向最近被添加到该 CompletableFuture 实例的那个 Completion 节点。而 Completion.next,充当链表节点的指针(Pointer to the next node),它指向下一个(即在此节点之前添加的)Completion 节点。

以thenApply()为例分析

在上图中,这个链表采用 头插法(Prepend/Push-style) 进行维护,并且操作是线程安全的,利用了 volatile 和底层的 CAS (Compare-And-Swap) 原子操作。

  • 创建新的CompletableFuture,CompletableFuture<V> d = newIncompleteFuture()
  • 添加依赖
    • 一个新的 Completion 节点(比如 UniApply)被创建。
    • 新节点的 next 字段被设置为当前 CompletableFuture 的旧 stack 头指针。
      • 使用 CAS 操作,尝试将 CompletableFuture 的 stack 指针更新为这个新节点。
      • 如果 CAS 失败(说明有其他线程同时添加了节点),则重试,直到成功。
    • stack 指针总是指向链表最前面的那个节点,而链表的顺序是 后进先出(LIFO) 的。
  • 任务完成时触发(调用 complete() 或任务执行结束):
    • CompletableFuture 会调用内部方法 postComplete()。
    • postComplete() 会从 stack 头开始,遍历并弹出链表中的每一个 Completion 节点。
    • 对于每个节点,它都会调用 tryFire() 方法来执行实际的回调逻辑。
    • 在触发执行时,通过 Completion.next 指针可以依次访问到所有后续的依赖任务。
1
CompletableFuture.completedFuture("Hello CompletableFuture").thenApply(String::toUpperCase);
  • thenApply()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public <U> CompletableFuture<U> thenApply(
Function<? super T,? extends U> fn) {
return uniApplyStage(null, fn);
}

private <V> CompletableFuture<V> uniApplyStage(
Executor e, Function<? super T,? extends V> f) {
if (f == null) throw new NullPointerException();
Object r;
if ((r = result) != null)
return uniApplyNow(r, e, f);
// 创建新的CompletableFuture
CompletableFuture<V> d = newIncompleteFuture();
// 添加依赖
unipush(new UniApply<T,V>(e, d, this, f));
return d;
}

final void unipush(Completion c) {
if (c != null) {
while (!tryPushStack(c)) {
if (result != null) {
NEXT.set(c, null);
break;
}
}
if (result != null)
// 重点完成通知
c.tryFire(SYNC);
}
}
  • UniApply
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
abstract static class UniCompletion<T,V> extends Completion {
Executor executor; // executor to use (null if none)
CompletableFuture<V> dep; // the dependent to complete
CompletableFuture<T> src; // source for action

UniCompletion(Executor executor,
CompletableFuture<V> dep,
CompletableFuture<T> src) {
this.executor = executor;
this.dep = dep;
this.src = src;
}

// 返回是否 action 可以执行
final boolean claim() {
Executor e = executor;
if (compareAndSetForkJoinTaskTag((short)0, (short)1)) {
if (e == null)
return true;
executor = null; // disable
e.execute(this);
}
return false;
}
}

static final class UniApply<T,V> extends UniCompletion<T,V> {
Function<? super T,? extends V> fn;
UniApply(Executor executor, CompletableFuture<V> dep,
CompletableFuture<T> src,
Function<? super T,? extends V> fn) {
super(executor, dep, src);
this.fn = fn;
}
// 尝试去调用当前任务。uniApply()方法为核心逻辑。
final CompletableFuture<V> tryFire(int mode) {
CompletableFuture<V> d; CompletableFuture<T> a;
Object r;
Throwable x;
Function<? super T,? extends V> f;
//判断源任务是否已经完成了,a表示的就是源任务,a.result就代表的是原任务的结果。
if ((a = src) == null || (r = a.result) == null
|| (d = dep) == null || (f = fn) == null)
return null;
// 验证是否出现异常结果,如有则任务执行结束
tryComplete: if (d.result == null) {
if (r instanceof AltResult) {
if ((x = ((AltResult)r).ex) != null) {
d.completeThrowable(x, r);
break tryComplete;
}
r = null;
}
try {
// 异步执行任务
if (mode <= 0 && !claim())
// 任务未执行返回false
return null;
else {
@SuppressWarnings("unchecked")
T t = (T) r;
// 任务执行完成将结果写入result
d.completeValue(f.apply(t));
}
} catch (Throwable ex) {
d.completeThrowable(ex);
}
}
src = null; dep = null; fn = null;
// 当前线程执行了该任务,返回结果继续执行前一个任务
return d.postFire(a, mode);
}

final CompletableFuture<T> postFire(CompletableFuture<?> a, int mode) {
if (a != null && a.stack != null) {
Object r;
if ((r = a.result) == null)
a.cleanStack();
if (mode >= 0 && (r != null || a.result != null))
// 完成任务执行并进行出栈
a.postComplete();
}
if (result != null && stack != null) {
// postComplete调用过来的任务已完成
if (mode < 0)
return this;
else
// 完成任务执行并进行出栈
postComplete();
}
return null;
}
}

CompletableFuture 通过维护任务完成状态和回调列表,实现了基于事件驱动和回调的非阻塞异步编程模型。


CompletableFuture 基础与原理
http://example.com/2025/07/17/java-juc-CompletableFuture基础与原理/
作者
ares
发布于
2025年7月17日
许可协议