Java 响应式编程基础

概述

响应式编程(Reactive Programming)是 2013 年底由 Netflix、Lightbend 和 Pivotal(Spring 背后的公司)的工程师发起的一项计划,是一种处理异步数据流并传播变化的编程范式,旨在构建更具响应性、弹性、可伸缩性的非阻塞式应用。它主要围绕数据流和变化的自动传播来构建,核心在于对事件作出反应。 2015 年 Reactive Stream(响应式流)规范诞生,为 JVM 上的响应式编程定义了一组接口和交互规则。根据反应式编程规范,反应式系统具有以下特征:

  • 响应迅速:一个响应式系统应该提供快速且一致的响应时间,从而提供一致的服务质量。
  • 高可用:反应式系统应通过复制和隔离,在发生随机故障时保持响应能力。
  • 可扩展:这样的系统应该能够通过经济高效的可扩展性,在不可预测的工作负载下保持响应能力。
  • 消息驱动:应该依赖于系统组件之间的异步消息传递。

响应式模型有两种基本的实现机制:

  • 一种就是传统开发模式下的“拉”模式,即消费者主动从生产者拉取元素;
  • 一种是“推”模式,在这种模式下,生产者将元素推送给消费者。相较于“拉”模式,“推”模式下的数据处理的资源利用率更好。

推模式中数据流的生产者会持续地生成数据并推送给消费者。这里就引出了 流量控制问题 ,即如果数据的生产者和消费者处理数据的速度是不一致的。当生产者和消费者处理数据的速度不一致有如下两种情况:

  • 生产者生产数据的速率小于消费者的场景: 这种场景对于消费者来说没啥压力,正常消费就好了,这里也就不需要所谓的流量控制了。
  • 生产者生产数据的速率大于消费者的场景: 生产者生产数据的速率大于消费者的场景,应该是我们业务中经常遇到的场景了,这种场景由于消费者处理不过来导致崩溃,业界通常的做法是在生产者与消费者之间加一个队列做缓冲。我们知道队列具有存储与转发的功能,所以可以用它来进行一定的流量控制。

控制流量主要基于 Java 的队列实现,主要有以下三种实现方式:

  • 无界队列:无界队列在原则上是拥有无线大小容量的队列,可以存放生产者产生的所有消息。

    • 优势:确保消费者消费到所有的数据
    • 劣势:系统的回弹性降低,任何一个系统不可能拥有无限的资源,一旦内存等资源耗尽,系统就可能会有崩溃的风险。
  • 有界丢弃队列:为了避免上面无界队列的弊端,有界丢弃队列采用的是如果队列满了,就会采用丢弃后面传入的值,这里可以设置一些丢弃策略,比如说按照优先级或先进先出等。

    • 优势:考虑到资源的限制,适合允许丢消息的业务场景。
    • 劣势:消息重要性很高的场景不建议采取这种队列
  • 有界阻塞队列:在数据高度一致性的场景是不允许丢数据的,这时使用有界阻塞队列,当队列消息数量达到上限后阻塞生产者,而不是直接丢弃消息。

    • 优势:解决了不允许丢数据的业务场景
    • 劣势:当队列满了的时候,会阻塞生产者停止生产数据,这种场景不可能实现异步操作的。

在推模式下的数据流量会有很多不可控制的因素,并不能直接应用,而是需要在“推”模式和“拉”模式之间考虑一定的平衡性,从而优雅地实现流量控制。这就需要引出响应式系统中非常重要的一个概念——背压机制(Backpressure)。 背压机制是指下游能够向上游反馈流量请求的机制。采用背压机制,消费者会根据自身的处理能力来请求数据,而生产者也会根据消费者的能力来生产数据,从而在两者之间达成一种动态的平衡,确保系统的即时响应性。

Reactive Stream实现

前面提到,2015 年 Reactive Stream(响应式流)规范正式发布,Java 9 正式引入了响应式编程的 API,将 Reactive Stream 规范定义的四个接口集成到了 java.util.concurrent.Flow 类中,Java 9 Flow 提供了一个标准化的基础接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public final class Flow {
@FunctionalInterface
public static interface Publisher<T> {
public void subscribe(Subscriber<? super T> subscriber);
}
public static interface Subscriber<T> {
public void onSubscribe(Subscription subscription);
public void onNext(T item);
public void onError(Throwable throwable);
public void onComplete();
}

public static interface Subscription {
public void request(long n);
public void cancel();
}

public static interface Processor<T,R> extends Subscriber<T>, Publisher<R> {
}
}

在实际应用中,Java Reactive Stream 有多种具体实现,最常用的是:

  • Java 9 Flow API:Java 9 Flow 提供了一个标准化的基础接口,定义了响应式流应该如何工作,但不提供具体的实现或操作符。它旨在确保不同的响应式库(如 RxJava 和 Reactor)能够相互协作(互操作性)。
  • Project Reactor:是由 Spring 团队维护的,与 Spring 生态系统(特别是 Spring WebFlux)无缝集成,专注于高性能的服务器端应用。
  • RxJava: 是一个更通用的库,提供了更丰富的操作符和类型(如 Single, Completable),在 Android 和各种独立 Java 应用中非常流行。
  • Akka Streams:提供了直观、类型安全且支持背压的流处理 API,其核心功能是使用有限的缓冲区资源高效执行流式计算。

Project Reactor

Project Reactor 的核心功能专注于构建高效、非阻塞的服务器端应用程序,特别强调与函数式编程和响应式流规范的集成。其核心在于使用 Flux 和 Mono 这两个核心类型来处理数据流。核心功能如下:

  • 响应式流规范实现:Reactor 完全实现了 Reactive Streams 规范,确保了良好的互操作性和标准的背压(Backpressure)支持。
  • 函数式与声明式 API:Reactor 的 API 设计高度函数式,利用 Java 8 的 Lambda 表达式,使得数据转换和组合操作非常简洁易读。
  • 与 Spring 生态集成:这是 Reactor 的关键优势。它是 Spring WebFlux 的基础,使得构建完全响应式的 Spring 5+ 应用成为可能。
  • 简化的类型系统:相比 RxJava 复杂的类型(如 Observable, Flowable, Single, 等),Reactor 仅使用 Flux(0到N个元素)和 Mono(0到1个元素)来覆盖所有场景,降低了学习和使用的复杂性。
  • 高效的背压管理:通过 Reactive Streams 接口,Reactor 能够自动管理生产者和消费者之间的需求信号,有效控制资源使用。

Reactor 围绕两个核心发布者(Publisher)实现及其管理订阅和调度的机制展开:

  • Flux<T>:0到N个元素的异步序列:可以发出零个、一个或多个元素,最后可以发出一个完成信号或一个错误信号。用于处理数据流、事件序列、列表数据等,例如网络请求返回的多个结果。
  • Mono<T>: 0到1个元素的异步序列,最多发出一个元素,最后可以发出一个完成信号或一个错误信号。用于处理单个结果的异步操作,例如根据 ID 查找单个用户、执行一个 void 方法(返回 Mono)等。
  • Scheduler:线程调度器,类似于 RxJava 的 Scheduler,用于管理执行上下文和线程切换。
    • Schedulers.elastic(): 动态线程池,适合 IO 任务。
    • Schedulers.parallel(): 固定大小线程池,适合计算任务。
    • publishOn() / subscribeOn(): 用于控制操作符链的执行线程。
  • Subscriber:订阅者/消费者,遵循 Reactive Streams 规范的标准接口,用于消费 Flux 或 Mono 发出的数据。onSubscribe(), onNext(), onError(), onComplete(), request() (背压控制)。
  • Publisher: 发布者/数据源,Flux 和 Mono 实现的标准接口,定义了 subscribe(Subscriber s) 方法。这是响应式流规范的核心接口,代表可以发出元素和信号的数据源。

RxJava

RxJava 的核心功能在于使用可观察序列(observable sequences)来组合异步和基于事件的程序,它将扩展的观察者模式(Observer pattern)应用于数据流和事件流。其核心功能主要围绕四个基本组件及其提供的丰富操作符。

  • 异步编程与事件处理:RxJava 提供了一种优雅的方式来处理异步操作(如网络请求、数据库查询、用户界面事件),避免了传统回调嵌套导致的“回调地狱”(Callback Hell)。
  • 链式调用与操作符:RxJava 提供了数百种强大的操作符(Operators),允许开发者以声明式的方式组合、转换、过滤和合并数据流。这些操作符支持流畅的链式调用,使复杂的业务逻辑清晰易懂。
  • 线程调度(Schedulers):RxJava 抽象了线程管理,通过 Schedulers 组件,开发者可以轻松地指定代码运行的线程(例如,在 IO 线程执行耗时操作,在主线程更新 UI),从而简化并发编程。
  • 背压支持(Backpressure):针对可能产生大量数据的场景(数据生成速度快于消费速度),RxJava 引入了背压机制,允许消费者控制数据源的发送速度,防止系统过载。

RxJava 的核心概念主要由以下四个基本接口(或类)构成:

  • Observable / Flowable:发出数据或事件的序列。Flowable 专用于支持背压的场景,subscribe(Observer)将观察者与被观察者连接起来。

  • Observer / Subscriber:观察者/消费者,接收并处理 Observable 或 Flowable 发出的数据或事件。

    • onNext(T t): 接收下一个数据项。
    • onError(Throwable e): 接收错误通知。
    • onComplete(): 接收完成通知(无更多数据)。
    • onSubscribe(Disposable d): 建立订阅关系时调用。
  • Disposable:可处置对象,表示一个订阅关系,用于在不需要继续接收数据时取消订阅,防止内存泄漏。

    • dispose(): 取消订阅。
    • isDisposed(): 检查是否已取消。
  • Scheduler:线程调度器,管理操作符和订阅执行的线程上下文。

    • Schedulers.io(): 用于 IO 密集型任务。
    • AndroidSchedulers.mainThread(): 在 Android 主线程执行 UI 操作。
  • Single, Maybe, Completable:针对特定场景的简化版数据源。

    • Single 只发射一个数据或一个错误;
    • Completable 只发射完成信号或错误;
    • Maybe 可发射零个或一个数据,以及完成信号或错误。

Akka Streams

Akka Streams建立在强大的 Akka Actor 模型之上,但提供了一个更高级别的抽象,使得开发者无需手动管理底层的背压信号。其核心功能:

  • 非阻塞背压(Non-Blocking Backpressure):Akka Streams 完全实现了 Reactive Streams 规范,这是其最核心的功能。数据发送方会根据接收方的处理能力自动调整发送速率,防止资源耗尽或内存溢出。
  • 有界资源使用(Bounded Resource Usage):流处理的实体只缓冲有限数量的元素。即使在重负载或高流量输入下,也能确保有界的内存使用,这是它与传统 Actor 模型的一个关键区别。
  • 声明式 API 与图形 DSL:Akka Streams 提供了流畅的、声明式的领域特定语言(DSL),允许开发者以非常直观的方式描述数据流的拓扑结构(即流经的路径),类似于构建一个计算图(Graph)。
  • 物化(Materialization):流的定义(蓝图)本身只是一个描述,必须通过“物化”步骤才能真正运行。物化过程将逻辑流描述转换为正在运行的 Actor 链,并在此过程中处理线程调度和优化(如熔合 Fusing)。
  • 与 Akka 生态集成:可以方便地与 Akka Actors、Akka HTTP、Akka Kafka 连接器等其他 Akka 模块集成,用于构建完整的分布式、容错系统。

虽然 Akka Streams 在内部使用 Reactive Streams 接口,但提供给最终用户的 API 是高度抽象的三个核心组件,用于构建任何线性的流处理管道:

  • Source<Out, Mat>:数据源(Publisher),具有一个输出端口,负责产生数据流的起点。数据可以来自集合、文件、Actor 或其他系统。 对应JDK中Publisher。
  • Flow<In, Out, Mat>:数据转换(Processor),具有一个输入端口和一个输出端口,用于在数据流经过程中进行转换、过滤、映射、聚合等操作。 对应JDK中Processor。
  • Sink<In, Mat>:数据终点(Subscriber),具有一个输入端口,负责消费数据流的终点,例如写入文件、发送给 Actor 或返回单个聚合结果。对应JDK中Subscriber。

此外,还有两个核心接口:

  • RunnableGraph:当 Source、Flow 和 Sink 被连接成一个完整的、封闭的拓扑结构时,就形成了一个可运行的图(Runnable Graph)。调用其 .run(materializer) 方法即可启动流处理。
  • Materializer:这是一个至关重要的组件,负责将抽象的 RunnableGraph 转换为实际运行的流处理器实例(由 Akka Actors 支持)。

RxJava、Project Reactor、Java 9 Flow区别

RxJava、Project Reactor 和 Java 9 Flow API 的主要区别在于它们的定位、功能丰富度以及与特定生态系统的集成。Java 9 Flow 提供了一个标准化的基础接口,而 RxJava 和 Project Reactor 则是功能更全面的实现库,提供了丰富的操作符和实用工具。

特性 Java 9 Flow Project Reactor RxJava
定位 JDK 内置的 响应式流(Reactive Streams)规范 接口。 一个功能齐全的响应式编程库 ReactiveX 规范的 Java 实现库
功能 仅包含基础接口 (Publisher, Subscriber, Subscription, Processor),没有具体实现或操作符 拥有丰富的操作符和调试工具,提供 Flux (0..N 个元素) 和 Mono (0..1 个元素) 核心类型。 拥有数百种操作符,提供 Flowable (支持背压), Observable, Single, Completable, Maybe 等多种类型。
背压 (Backpressure) 作为规范的一部分,要求实现支持背压机制。 完全支持背压,是其核心特性之一。 Flowable 类专门用于处理支持背压的数据流。
主要用途/生态 作为其他响应式库互操作性的基础标准。 与 Spring WebFlux 紧密集成,常用于构建响应式微服务和非阻塞 API。 广泛应用于 Android 开发和通用的事件驱动架构。
学习曲线 简单,因为它只是几个接口。 学习曲线中等,操作符丰富,但有良好的文档和调试支持。 学习曲线较陡峭,概念和操作符众多。

jdk Flow

Java Flow API 的设计初衷是提供一个标准化的接口规范,而不是一个完整的终端用户库(如 RxJava 或 Project Reactor)。因此,JDK 默认不包含丰富的实现类或操作符。它的主要作用是作为不同响应式库之间的互操作性标准,使它们能够无缝协作。 Java Flow API 包含在 java.util.concurrent.Flow 类中,由四个静态嵌套接口组成:

  • Flow.Publisher<T>:发布者(数据源),负责生产数据流并将其发送给已订阅的 Subscriber。它只包含一个方法 subscribe(Subscriber<? super T> subscriber)。
  • Flow.Subscriber<T>:订阅者(消费者),负责消费 Publisher 发出的数据。它定义了四个生命周期方法:onSubscribe, onNext, onError, onComplete。
  • Flow.Subscription:上下文,代表 Publisher 和 Subscriber 之间的连接,用于管理背压(backpressure)。它有两个关键方法:request(long n)(订阅者请求数据)和 cancel()(取消订阅)。
  • Flow.Processor<T, R>:处理器,兼具 Subscriber 和 Publisher 的双重角色,用于在流中进行数据转换(例如,从类型 T 转换为类型 R)。

在标准的 JDK 中,Java 9 Flow API 仅提供了一个具体的、开箱即用的 Publisher 实现类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
public class SubmissionPublisher<T> implements Publisher<T>, AutoCloseable {
// 任务队列
BufferedSubscription<T> clients;

/** The largest possible power of two array size. */
static final int BUFFER_CAPACITY_LIMIT = 1 << 30;

/**
* Default executor -- ForkJoinPool.commonPool() unless it cannot
* support parallelism.
*/
private static final Executor ASYNC_POOL =
(ForkJoinPool.getCommonPoolParallelism() > 1) ?
ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();

/** Fallback if ForkJoinPool.commonPool() cannot support parallelism */
private static final class ThreadPerTaskExecutor implements Executor {
ThreadPerTaskExecutor() {} // prevent access constructor creation
public void execute(Runnable r) { new Thread(r).start(); }
}

public SubmissionPublisher(Executor executor, int maxBufferCapacity,
BiConsumer<? super Subscriber<? super T>, ? super Throwable> handler) {
if (executor == null)
throw new NullPointerException();
if (maxBufferCapacity <= 0)
throw new IllegalArgumentException("capacity must be positive");
this.lock = new ReentrantLock();
this.executor = executor;
this.onNextHandler = handler;
this.maxBufferCapacity = roundCapacity(maxBufferCapacity);
}

static final class ConsumerSubscriber<T> implements Subscriber<T> {
final CompletableFuture<Void> status;
final Consumer<? super T> consumer;
Subscription subscription;

ConsumerSubscriber(CompletableFuture<Void> status, Consumer<? super T> consumer) {
this.status = status; this.consumer = consumer;
}

public final void onSubscribe(Subscription subscription) {
this.subscription = subscription;
status.whenComplete((v, e) -> subscription.cancel());
if (!status.isDone())
subscription.request(Long.MAX_VALUE);
}

public final void onError(Throwable ex) {
status.completeExceptionally(ex);
}

public final void onComplete() {
status.complete(null);
}

public final void onNext(T item) {
try {
consumer.accept(item);
} catch (Throwable ex) {
subscription.cancel();
status.completeExceptionally(ex);
}
}
}

@jdk.internal.vm.annotation.Contended
static final class BufferedSubscription<T> implements Subscription, ForkJoinPool.ManagedBlocker {

}
}

JDK Flow使用示例

需要自定义实现:Subscriber与Processor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
public class JdkFlowExample {

public static void main(String[] args) throws InterruptedException {
// 1. 定义发布者, 发布的数据类型是 Integer
// 直接使用jdk自带的SubmissionPublisher, 它实现了 Publisher 接口
try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
// 2. 定义订阅者
DefaultSubscriber subscriber = new DefaultSubscriber();

// 3. 发布者和订阅者 建立订阅关系
publisher.subscribe(subscriber);
// 4. 生产数据, 并发布
for (int i = 0; i < 10; i++) {
// submit是个block方法
publisher.submit(i);
}
}

Thread.sleep(10000L);
}

static class DefaultSubscriber implements Subscriber<Integer> {
private Subscription subscription;
@Override
public void onSubscribe(Subscription subscription) {
// 保存订阅关系, 需要用它来给发布者响应
this.subscription = subscription;
// 请求一个数据
this.subscription.request(1);
}

@Override
public void onNext(Integer item) {
// 接受到一个数据, 处理
System.out.println("接受到数据: " + item);

try {
TimeUnit.MILLISECONDS.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}

// 处理完调用request再请求一个数据
this.subscription.request(1);

// 或者 已经达到了目标, 调用cancel告诉发布者不再接受数据了
// this.subscription.cancel();
}

@Override
public void onError(Throwable throwable) {
// 出现了异常(例如处理数据的时候产生了异常)
throwable.printStackTrace();
// 我们可以告诉发布者, 后面不接受数据了
this.subscription.cancel();
}

@Override
public void onComplete() {
// 全部数据处理完了(发布者关闭了)
System.out.println("处理完了!");
}
}

static class DefaultProcessor extends SubmissionPublisher<String> implements
Processor<Integer, String> {
private Subscription subscription;

@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
// 请求一个数据
this.subscription.request(1);
}

@Override
public void onNext(Integer item) {
// 接受到一个数据, 处理
System.out.println("处理器接受到数据: " + item);
// 过滤掉小于0的, 然后发布出去
if (item > 0) {
this.submit("转换后的数据:" + item);
}
// 处理完调用request再请求一个数据
this.subscription.request(1);
// 或者 已经达到了目标, 调用cancel告诉发布者不再接受数据了
// this.subscription.cancel();
}

@Override
public void onError(Throwable throwable) {
// 出现了异常(例如处理数据的时候产生了异常)
throwable.printStackTrace();
// 告诉发布者, 后面不接受数据了
this.subscription.cancel();
}

@Override
public void onComplete() {
// 全部数据处理完了(发布者关闭了)
System.out.println("处理器处理完了!");
// 关闭发布者
this.close();
}
}
}

Java 响应式编程基础
http://example.com/2025/08/17/Java-响应式-编程基础/
作者
ares
发布于
2025年8月17日
许可协议