Java 响应式编程-WebFlux基础

概述

Spring WebFlux 是 Spring 5 中引入的响应式编程框架,其核心概念围绕着构建异步、非阻塞、事件驱动的服务。WebFlux 的基石是实现了 Reactive Streams 规范。该规范定义了四个核心接口:

  • Publisher: 发布者,负责发布一系列数据(Mono 和 Flux 是其具体实现)。
  • Subscriber: 订阅者,消费发布者发出的数据。
  • Subscription: 订阅关系,在发布者和订阅者之间传递,用于管理流(例如,控制背压)。
  • Processor: 处理器,兼具 Subscriber 和 Publisher 的能力。

WebFlux 默认并深度集成了 Project Reactor 作为其响应式编程库。它提供了两个主要的、实现了 Publisher 接口的具体类型:

  • Mono: 表示包含 0 个或 1 个元素(即异步结果)的响应式序列。常用于返回单个对象、空值或错误。
  • Flux: 表示包含 0 到 N 个元素的响应式序列。常用于返回列表、数据流或无限序列。

与传统的每个请求分配一个线程的阻塞式 Servlet 容器(如 Tomcat)不同,WebFlux 运行在支持异步非阻塞 I/O 的服务器上:

  • Netty (默认和首选)
  • Undertow(spring-boot4移除了对其支持)
  • 支持非阻塞模式的 Tomcat 或 Jetty

这些容器使用 事件循环线程(Event Loop Threads) 来处理大量并发连接,提高了资源利用率和可扩展性。WebFlux 提供了两种风格的 API 来定义 HTTP 端点:

  • 基于注解的控制器 (Annotated Controllers): 类似于 Spring MVC 的 @Controller, @GetMapping 等注解,对于熟悉 Spring MVC 的开发者来说迁移成本很低。
  • 函数式端点 (Functional Endpoints / RouterFunctions): 采用函数式编程风格,使用 RouterFunctions 配置路由,使用 HandlerFunctions 处理请求和生成响应。这种模型更轻量级,完全脱离了注解和 Spring MVC 的基础设施。

与Spring MVC对比

Spring WebFlux和Spring MVC都是Spring框架中的Web模块,但它们之间有着显著的差异,主要体现在编程模型、处理方式、性能和适用场景等方面。

特性 Spring MVC (Servlet Stack) Spring WebFlux (Reactive Stack)
I/O 模型 同步阻塞 I/O (Blocking) 异步非阻塞 I/O (Non-Blocking)
并发模型 Thread-per-Request (一请求一线程) Event Loop (事件循环)
线程数量 多 (例如 Tomcat 默认 200+) 少 (通常等于 CPU 核心数)
上下文切换 频繁 (线程多,依赖 OS 调度) 极少 (线程少,用户态调度)
适用场景 计算密集型、传统 CRUD、依赖阻塞库 高并发 IO 密集型、流式处理、网关

SpringBoot WebFlux使用

跟 Spring Boot 大框架一样,Spring Boot Webflux 提供了很多 “开箱即用” 的 Starter 组件。因此在SpringBoot使用只需要引入对应starter依赖即可:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
<version>${spring-boot-start.version}</version>
</dependency>


<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-r2dbc</artifactId>
<version>${spring-boot-start.version}</version>
</dependency>
</dependencies>

实现请求处理器ProductService:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
@Component
public class ProductService {
// 使用一个线程安全的 Map 模拟内存数据库
private final ProductRepository productRepository;

public ProductService(ProductRepository productRepository) {
this.productRepository = productRepository
}

// 处理 GET /products 请求:返回所有产品列表
public Mono<ServerResponse> listProducts() {
Flux<Product> products = productRepository.findAll();

// 使用 ServerResponse.ok().body(...) 构建响应
return ServerResponse.ok()
.contentType(MediaType.APPLICATION_JSON)
.body(products, Product.class); // 返回 Flux<Product>
}

// 处理 GET /products/{id} 请求:根据 ID 返回单个产品
public Mono<ServerResponse> getProductById(ServerRequest request) {
String productId = request.pathVariable("id");
Mono<Product> productMono = productRepository.findById(productId);

// 如果找到产品,返回 200 OK;否则返回 404 Not Found
return productMono.flatMap(product -> ServerResponse.ok()
.contentType(MediaType.APPLICATION_JSON)
.bodyValue(product)) // 返回 Mono<Product>
.switchIfEmpty(ServerResponse.notFound().build());
}
}

Repository:

1
2
3
4
@Repository
public interface ProductRepository extends ReactiveCrudRepository<ProductEntity, Integer> {

}

定义Product领域模型:

1
2
3
4
5
6
7
8
@Getter
@Setter
public class ProductEntity {
private String id;
private String name;
private BigDecimal price;

}

基于函数式端点,定义路由配置 (ProductRouter):

1
2
3
4
5
6
7
8
9
10
@Configuration
public class ProductRouter {

@Bean
public RouterFunction<ServerResponse> routeProduct(ProductService handler) {
return RouterFunctions
.route(GET("/products").and(accept(MediaType.APPLICATION_JSON)), handler::listProducts)
.andRoute(GET("/products/{id}").and(accept(MediaType.APPLICATION_JSON)), handler::getProductById);
}
}

基于注解的控制器:

1
2
3
4
5
6
7
8
9
10
11
12
13
14

@RestController("/product")
public class ProductController {
private final ProductService productService;
@GetMapping("/{id}")
public Mono<ServerResponse> getProductById(ServerRequest request){
return productService.getProductById(request);
}

@GetMapping
public Mono<ServerResponse> listProducts(ServerRequest request) {
return productService.listProducts(request);
}
}

全局异常处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
@RestControllerAdvice
@Order(Ordered.HIGHEST_PRECEDENCE + 1)
public class GlobalExceptionHandler {
private static final Logger logger = LoggerFactory.getLogger(
GlobalExceptionHandler.class
);

@ExceptionHandler(BaseException.class)
public Mono<ResponseEntity<Response<?>>> handleBusinessException(BaseException ex) {
logger.info("Business exception: {}", ex.getMessage());
return Mono.just(ResponseEntity.ok(Response.failed(ex.getCode(), ex.getMessage())));
}

@ExceptionHandler(IllegalArgumentException.class)
public Mono<ResponseEntity<Response<?>>> handleIllegalArgumentException(
IllegalArgumentException ex
) {
logger.info("Validation error: {}", ex.getMessage());
return Mono.just(ResponseEntity.ok(
Response.failed(
BaseErrorCode.PARAM_ERROR.getCode(),
BaseErrorCode.PARAM_ERROR.getMsg()
)
)
);
}

@ExceptionHandler(Exception.class)
public Mono<ResponseEntity<Response<?>>> handleGenericException(
Exception ex
) {
logger.error("Unexpected error, msg: {}, ex: {}", ex.getMessage(), ex.getCause());
return Mono.just(ResponseEntity.ok(
Response.failed(
BaseErrorCode.UNKOWN_ERROR.getCode(),
BaseErrorCode.UNKOWN_ERROR.getMsg()
)
)
);
}
}

基于注解控制器webflux请求处理流程

与 Spring MVC 的 DispatcherServlet 类似,Spring WebFlux 的核心分发器是 DispatcherHandler,但其底层完全基于 Reactive Streams(反应式流),实现了非阻塞的异步处理。

WebFlux 基于注解的基本流程主要由 DispatcherHandler 协调,分为以下几个关键阶段:Request -> DispatcherHandler -> 查找 Mapping -> 适配 Adapter -> 执行 Controller -> 处理 Result -> Response

第一阶段:请求接入与适配 (Request Entry)。底层接入由Netty(默认容器)接收 TCP 连接。底层的请求适配为 ServerWebExchange,这是 WebFlux 的核心交换对象,包含 ServerHttpRequest (非阻塞) 和 ServerHttpResponse。然后请求通过 WebFilter 链(如安全认证、CORS 处理),最后到达 DispatcherHandler。

第二阶段:寻找处理器 (Handler Mapping)。DispatcherHandler 调用 getHandler(exchange),它会遍历所有注册的 HandlerMapping,对于注解控制器,最重要的是 RequestMappingHandlerMapping。其核心是根据 @RequestMapping、@GetMapping 配置的路径,找到对应的 Controller 方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
public class DispatcherHandler implements WebHandler, PreFlightRequestHandler, ApplicationContextAware {

@Nullable
private List<HandlerMapping> handlerMappings;

@Nullable
private List<HandlerAdapter> handlerAdapters;

@Nullable
private List<HandlerResultHandler> resultHandlers;

protected void initStrategies(ApplicationContext context) {

//获取HandlerMapping及其子类型的bean
//HandlerMapping根据请求request获取handler执行链
Map<String, HandlerMapping> mappingBeans = BeanFactoryUtils.beansOfTypeIncludingAncestors(
context, HandlerMapping.class, true, false);

ArrayList<HandlerMapping> mappings = new ArrayList<>(mappingBeans.values());
AnnotationAwareOrderComparator.sort(mappings);
this.handlerMappings = Collections.unmodifiableList(mappings);
//获取HandlerAdapter及其子类型的bean
Map<String, HandlerAdapter> adapterBeans = BeanFactoryUtils.beansOfTypeIncludingAncestors(
context, HandlerAdapter.class, true, false);

this.handlerAdapters = new ArrayList<>(adapterBeans.values());
AnnotationAwareOrderComparator.sort(this.handlerAdapters);
//获取HandlerResultHandler及其子类型的bean
Map<String, HandlerResultHandler> beans = BeanFactoryUtils.beansOfTypeIncludingAncestors(
context, HandlerResultHandler.class, true, false);

this.resultHandlers = new ArrayList<>(beans.values());
AnnotationAwareOrderComparator.sort(this.resultHandlers);
}
}

public class RouterFunctionMapping extends AbstractHandlerMapping implements InitializingBean {

@Nullable
private RouterFunction<?> routerFunction;

//afterPropertiesSet()方法 是组件初始化后回调 必须实现InitializingBean接口
@Override
public void afterPropertiesSet() throws Exception {
if (CollectionUtils.isEmpty(this.messageReaders)) {
ServerCodecConfigurer codecConfigurer = ServerCodecConfigurer.create();
this.messageReaders = codecConfigurer.getReaders();
}

if (this.routerFunction == null) {
initRouterFunctions();
}
if (this.routerFunction != null) {
RouterFunctions.changeParser(this.routerFunction, getPathPatternParser());
}
}

/**
* Initialized the router functions by detecting them in the application context.
* 从应用上下文中查找他们并初始化路由方法
*/
protected void initRouterFunctions() {
List<RouterFunction<?>> routerFunctions = routerFunctions();
this.routerFunction = routerFunctions.stream().reduce(RouterFunction::andOther).orElse(null);
logRouterFunctions(routerFunctions);
}
}

第三阶段:请求适配与执行 (Handler Adapter)。DispatcherHandler 获取到 Handler 后,寻找支持该 Handler 的适配器,通常是 RequestMappingHandlerAdapter。适配器解析 Controller 方法的参数(如 @RequestBody, @PathVariable),@RequestBody 的读取也是非阻塞的(基于 Reactor Flux/Mono)。然后反射调用 Controller 的方法。Controller 返回 Mono<T> Flux<T>,适配器将其封装为 HandlerResult 返回给 Dispatcher。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
@Override
public Mono<Void> handle(ServerWebExchange exchange) {
//handlerMappings在initStrategies()方法中已经构造好了
if (this.handlerMappings == null) {
return createNotFoundError();
}
//构造Flux,数据源为handlerMappings集合
return Flux.fromIterable(this.handlerMappings)
//获取Mono<Handler>对象,通过concatMap保证顺序和handlerMappings顺序一致
//严格保证顺序是因为在一个系统中可能存在一个Url有多个能够处理的HandlerMapping的情况
.concatMap(mapping -> mapping.getHandler(exchange))
.next()
//如果next()娶不到值则抛出错误
.switchIfEmpty(createNotFoundError())
//触发HandlerApter的handle方法
.flatMap(handler -> invokeHandler(exchange, handler))
//触发HandlerResultHandler 的handleResult方法
.flatMap(result -> handleResult(exchange, result));
}

private Mono<Void> handleResult(ServerWebExchange exchange, HandlerResult result) {
return getResultHandler(result).handleResult(exchange, result)
.onErrorResume(ex -> result.applyExceptionHandler(ex).flatMap(exceptionResult ->
getResultHandler(exceptionResult).handleResult(exchange, exceptionResult)));
}

private HandlerResultHandler getResultHandler(HandlerResult handlerResult) {
if (this.resultHandlers != null) {
for (HandlerResultHandler resultHandler : this.resultHandlers) {
if (resultHandler.supports(handlerResult)) {
return resultHandler;
}
}
}
throw new IllegalStateException("No HandlerResultHandler for " + handlerResult.getReturnValue());
}

第四阶段:响应结果处理 (Result Handling)。DispatcherHandler 拿到 HandlerResult,遍历 HandlerResultHandler 列表。对于数据处理使用的是ResponseBodyResultHandler,ResponseBodyResultHandler用于处理 @ResponseBody 或 @RestController,它利用 HttpMessageWriter 将对象序列化为 JSON/XML。对于视图处理使用ViewResolutionResultHandler,即处理返回视图名称的场景(SSR 渲染)。

第五阶段:订阅与触发 (Subscription)。在 Reactive 编程中,”Nothing happens until you subscribe”。上述所有步骤实际上是在构建一个 Reactor 的调用链(Pipeline)。只有当 Netty 服务器最终订阅(Subscribe)了这个处理链的 Publisher 时,数据才会真正开始流动,响应才会发送给客户端。

总结

WebFlux 的注解处理流程在表面上与 Spring MVC 非常相似,但在底层实现上完全不同。它摒弃了 Servlet API,使用 Reactor 库构建了一个从 Socket 读取到业务处理再到 Socket 写入的全异步链路。在实际应用中,如果对性能要求较高,特别是在并发量大、I/O密集的场景下,WebFlux是一个更好的选择。但如果是构建传统的Web应用,或者需要广泛的生态支持和较为稳定的技术栈,Spring MVC会更为适合。


Java 响应式编程-WebFlux基础
http://example.com/2025/08/21/java-响应式-WebFlux基础/
作者
ares
发布于
2025年8月21日
许可协议