Java 并发编程实战:用 CompletableFuture 重构中台聚合接口的异步调用链
中台系统里最常见的一类接口,不是“自己算”,而是“到处取,再拼起来”。比如一个商品详情页聚合接口,往往要同时查商品基础信息、库存、价格、营销标签、店铺信息、用户个性化推荐。
如果这些调用串行执行,延迟会非常难看;如果到处手写线程池和 Future.get(),代码又会越来越难维护。
这篇文章我就从一个典型的中台聚合接口出发,带你用 CompletableFuture 把异步调用链重构一遍。重点不是 API 罗列,而是:
- 什么时候并行,什么时候串联
- 如何收敛超时、异常和线程池
- 怎么写出“可运行、可排查、可上线”的代码
前置知识与环境准备
你需要了解
在开始前,建议你已经熟悉:
- Java 8 及以上
- 线程池基础:
ThreadPoolExecutor Future/Callable的基本用法- REST/RPC 调用的超时概念
示例环境
本文代码基于:
- JDK 11+
- 单文件可运行示例
- 不依赖 Spring,也能直接跑通核心逻辑
如果你在 Spring Boot 项目里使用,迁移也很直接。
背景与问题
我们先看一个典型场景:一个中台聚合接口 /product/detail,对外返回完整商品详情。
它内部需要调用多个下游服务:
- 商品服务:查基础信息
- 库存服务:查库存
- 价格服务:查价格
- 营销服务:查活动标签
- 推荐服务:查相关推荐
其中有些调用互相独立,有些调用依赖上一步结果。比如:
- 商品基础信息、库存、价格、营销可以并行
- 推荐服务可能依赖商品类目或品牌,得等商品基础信息返回后再查
如果我们写成串行代码,通常会像这样:
ProductInfo product = productService.getProduct(productId);
Stock stock = stockService.getStock(productId);
Price price = priceService.getPrice(productId);
Promotion promotion = promotionService.getPromotion(productId);
Recommendation recommendation = recommendationService.recommend(product.getCategoryId());
问题很明显:
- 总耗时接近所有下游耗时之和
- 任一调用卡顿,整个接口都被拖慢
- 容错逻辑分散在各行代码里,不好统一治理
- 后面想加监控、超时、降级,会越来越乱
典型耗时对比
假设下游平均耗时如下:
| 服务 | 耗时 |
|---|---|
| 商品服务 | 80ms |
| 库存服务 | 50ms |
| 价格服务 | 60ms |
| 营销服务 | 70ms |
| 推荐服务 | 90ms |
串行总耗时大约:350ms
如果合理并行,理论上可以接近:80ms + 90ms = 170ms 左右
这也是聚合接口最值得优化的一类地方:不是单点算法慢,而是调用编排方式不合理。
核心原理
CompletableFuture 可以把异步任务之间的关系表达得更清楚。你可以把它理解成:
supplyAsync:启动一个异步任务并返回结果thenApply:把上一步结果做同步转换thenCompose:基于上一步结果,再发起一个新的异步任务thenCombine:合并两个异步任务结果allOf:等待一组任务都完成exceptionally/handle:处理异常和降级
1. 并行拆分:独立任务一起发
独立调用可以并行发起,不要互相阻塞。
flowchart LR
A[请求进入聚合接口] --> B[商品服务]
A --> C[库存服务]
A --> D[价格服务]
A --> E[营销服务]
B --> F[推荐服务]
B --> G[结果聚合]
C --> G
D --> G
E --> G
F --> G
2. 串联依赖:有依赖就用 thenCompose
如果 B 依赖 A 的结果,就不要在外部 join() 再继续,那会把异步链打断。
正确姿势是把依赖关系写进链路里。
CompletableFuture<ProductInfo> productFuture = CompletableFuture.supplyAsync(
() -> productService.getProduct(productId), executor);
CompletableFuture<Recommendation> recommendationFuture = productFuture.thenCompose(
product -> CompletableFuture.supplyAsync(
() -> recommendationService.recommend(product.getCategoryId()), executor)
);
3. 聚合收口:allOf 负责等待,join 负责取结果
CompletableFuture.allOf(...) 只负责“等大家都完成”,不负责返回聚合对象。
最终结果仍然需要从各个 future 中 join() 取出。
4. 异常与超时:不能只追求快,还要可控
中台接口最怕两种情况:
- 某个下游偶发超时,把主链路拖死
- 异常直接抛出,导致整个聚合接口失败
所以异步编排不仅是“提速”,更是“把失败约束在边界内”。
一张图看懂重构前后
sequenceDiagram
participant Client as 调用方
participant Agg as 聚合服务
participant P as 商品服务
participant S as 库存服务
participant R as 价格服务
participant M as 营销服务
participant Rec as 推荐服务
Client->>Agg: 请求商品详情
rect rgb(245,245,245)
Note over Agg,P: 重构后:并发发起独立调用
Agg->>P: 查商品
Agg->>S: 查库存
Agg->>R: 查价格
Agg->>M: 查营销
P-->>Agg: 商品信息
Agg->>Rec: 基于类目查推荐
S-->>Agg: 库存
R-->>Agg: 价格
M-->>Agg: 营销
Rec-->>Agg: 推荐
end
Agg-->>Client: 聚合结果
实战代码(可运行)
下面我们写一个完整可运行的示例。
这个例子会模拟多个下游服务的调用延迟,并演示:
- 并发拉取独立数据
- 依赖式异步调用
- 超时与降级
- 统一聚合返回
你可以直接复制运行。
import java.math.BigDecimal;
import java.time.LocalDateTime;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;
import java.util.function.Supplier;
public class CompletableFutureAggregationDemo {
public static void main(String[] args) {
ProductAggregationService aggregationService = new ProductAggregationService();
long start = System.currentTimeMillis();
ProductDetailDTO detail = aggregationService.getProductDetail(1001L);
long cost = System.currentTimeMillis() - start;
System.out.println("聚合结果:");
System.out.println(detail);
System.out.println("总耗时: " + cost + " ms");
aggregationService.shutdown();
}
// ========== 聚合服务 ==========
static class ProductAggregationService {
private final ExecutorService executor = new ThreadPoolExecutor(
8,
16,
60,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(1000),
new NamedThreadFactory("aggregate-pool"),
new ThreadPoolExecutor.CallerRunsPolicy()
);
private final ProductService productService = new ProductService();
private final StockService stockService = new StockService();
private final PriceService priceService = new PriceService();
private final PromotionService promotionService = new PromotionService();
private final RecommendationService recommendationService = new RecommendationService();
public ProductDetailDTO getProductDetail(Long productId) {
CompletableFuture<ProductInfo> productFuture = supplyAsyncWithTimeout(
() -> productService.getProduct(productId),
200,
TimeUnit.MILLISECONDS,
ProductInfo.defaultValue(productId),
"productService"
);
CompletableFuture<Stock> stockFuture = supplyAsyncWithTimeout(
() -> stockService.getStock(productId),
150,
TimeUnit.MILLISECONDS,
Stock.defaultValue(productId),
"stockService"
);
CompletableFuture<Price> priceFuture = supplyAsyncWithTimeout(
() -> priceService.getPrice(productId),
150,
TimeUnit.MILLISECONDS,
Price.defaultValue(productId),
"priceService"
);
CompletableFuture<Promotion> promotionFuture = supplyAsyncWithTimeout(
() -> promotionService.getPromotion(productId),
150,
TimeUnit.MILLISECONDS,
Promotion.defaultValue(productId),
"promotionService"
);
CompletableFuture<Recommendation> recommendationFuture = productFuture.thenCompose(product ->
supplyAsyncWithTimeout(
() -> recommendationService.recommend(product.getCategoryId()),
180,
TimeUnit.MILLISECONDS,
Recommendation.defaultValue(product.getCategoryId()),
"recommendationService"
)
);
CompletableFuture.allOf(
productFuture,
stockFuture,
priceFuture,
promotionFuture,
recommendationFuture
).join();
return new ProductDetailDTO(
productFuture.join(),
stockFuture.join(),
priceFuture.join(),
promotionFuture.join(),
recommendationFuture.join(),
LocalDateTime.now()
);
}
private <T> CompletableFuture<T> supplyAsyncWithTimeout(
Supplier<T> supplier,
long timeout,
TimeUnit unit,
T fallback,
String serviceName
) {
return CompletableFuture.supplyAsync(() -> {
log("调用开始: " + serviceName);
T result = supplier.get();
log("调用成功: " + serviceName);
return result;
}, executor)
.completeOnTimeout(fallback, timeout, unit)
.exceptionally(ex -> {
log("调用失败: " + serviceName + ", ex=" + ex.getMessage());
return fallback;
});
}
public void shutdown() {
executor.shutdown();
}
private void log(String msg) {
System.out.printf("[%s][%s] %s%n",
Thread.currentThread().getName(),
LocalDateTime.now(),
msg);
}
}
// ========== 模拟下游服务 ==========
static class ProductService {
public ProductInfo getProduct(Long productId) {
sleep(80);
return new ProductInfo(productId, "机械键盘", 10L, "KEYBOARD");
}
}
static class StockService {
public Stock getStock(Long productId) {
sleep(50);
return new Stock(productId, 128, false);
}
}
static class PriceService {
public Price getPrice(Long productId) {
sleep(60);
return new Price(productId, new BigDecimal("399.00"), "CNY");
}
}
static class PromotionService {
public Promotion getPromotion(Long productId) {
sleep(70);
return new Promotion(productId, "满300减30");
}
}
static class RecommendationService {
public Recommendation recommend(String categoryId) {
sleep(90);
List<String> items = Arrays.asList("电竞鼠标", "桌垫", "键帽套装");
return new Recommendation(categoryId, items);
}
}
// ========== DTO / Model ==========
static class ProductDetailDTO {
private final ProductInfo productInfo;
private final Stock stock;
private final Price price;
private final Promotion promotion;
private final Recommendation recommendation;
private final LocalDateTime assembledAt;
public ProductDetailDTO(ProductInfo productInfo, Stock stock, Price price,
Promotion promotion, Recommendation recommendation,
LocalDateTime assembledAt) {
this.productInfo = productInfo;
this.stock = stock;
this.price = price;
this.promotion = promotion;
this.recommendation = recommendation;
this.assembledAt = assembledAt;
}
@Override
public String toString() {
return "ProductDetailDTO{" +
"productInfo=" + productInfo +
", stock=" + stock +
", price=" + price +
", promotion=" + promotion +
", recommendation=" + recommendation +
", assembledAt=" + assembledAt +
'}';
}
}
static class ProductInfo {
private final Long productId;
private final String name;
private final Long shopId;
private final String categoryId;
public ProductInfo(Long productId, String name, Long shopId, String categoryId) {
this.productId = productId;
this.name = name;
this.shopId = shopId;
this.categoryId = categoryId;
}
public String getCategoryId() {
return categoryId;
}
public static ProductInfo defaultValue(Long productId) {
return new ProductInfo(productId, "未知商品", -1L, "DEFAULT");
}
@Override
public String toString() {
return "ProductInfo{" +
"productId=" + productId +
", name='" + name + '\'' +
", shopId=" + shopId +
", categoryId='" + categoryId + '\'' +
'}';
}
}
static class Stock {
private final Long productId;
private final int available;
private final boolean degraded;
public Stock(Long productId, int available, boolean degraded) {
this.productId = productId;
this.available = available;
this.degraded = degraded;
}
public static Stock defaultValue(Long productId) {
return new Stock(productId, 0, true);
}
@Override
public String toString() {
return "Stock{" +
"productId=" + productId +
", available=" + available +
", degraded=" + degraded +
'}';
}
}
static class Price {
private final Long productId;
private final BigDecimal amount;
private final String currency;
public Price(Long productId, BigDecimal amount, String currency) {
this.productId = productId;
this.amount = amount;
this.currency = currency;
}
public static Price defaultValue(Long productId) {
return new Price(productId, BigDecimal.ZERO, "CNY");
}
@Override
public String toString() {
return "Price{" +
"productId=" + productId +
", amount=" + amount +
", currency='" + currency + '\'' +
'}';
}
}
static class Promotion {
private final Long productId;
private final String label;
public Promotion(Long productId, String label) {
this.productId = productId;
this.label = label;
}
public static Promotion defaultValue(Long productId) {
return new Promotion(productId, "无活动");
}
@Override
public String toString() {
return "Promotion{" +
"productId=" + productId +
", label='" + label + '\'' +
'}';
}
}
static class Recommendation {
private final String categoryId;
private final List<String> items;
public Recommendation(String categoryId, List<String> items) {
this.categoryId = categoryId;
this.items = items;
}
public static Recommendation defaultValue(String categoryId) {
return new Recommendation(categoryId, List.of());
}
@Override
public String toString() {
return "Recommendation{" +
"categoryId='" + categoryId + '\'' +
", items=" + items +
'}';
}
}
// ========== 工具类 ==========
static class NamedThreadFactory implements ThreadFactory {
private final String prefix;
private int counter = 1;
public NamedThreadFactory(String prefix) {
this.prefix = prefix;
}
@Override
public synchronized Thread newThread(Runnable r) {
Thread t = new Thread(r, prefix + "-" + counter++);
t.setDaemon(false);
return t;
}
}
static void sleep(long millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("线程被中断", e);
}
}
}
代码是怎么一步步工作的
上面这段代码里,有几个点很值得你注意。
1. 独立任务并发启动
CompletableFuture<Stock> stockFuture = supplyAsyncWithTimeout(...);
CompletableFuture<Price> priceFuture = supplyAsyncWithTimeout(...);
CompletableFuture<Promotion> promotionFuture = supplyAsyncWithTimeout(...);
这些服务彼此无依赖,所以应当第一时间并发发起。
2. 依赖任务用 thenCompose 串起来
CompletableFuture<Recommendation> recommendationFuture = productFuture.thenCompose(product ->
supplyAsyncWithTimeout(() -> recommendationService.recommend(product.getCategoryId()), ...)
);
这里如果你写成:
ProductInfo product = productFuture.join();
Recommendation recommendation = recommendationService.recommend(product.getCategoryId());
虽然功能上能跑,但会提早阻塞主线程,异步编排的意义就打折了。
3. allOf 用于“收口”
CompletableFuture.allOf(...).join();
这一步代表:等所有依赖的结果都准备好,再统一组装 DTO。
4. completeOnTimeout 做软超时降级
.completeOnTimeout(fallback, timeout, unit)
这是聚合接口里很好用的能力:
不要让一个下游慢请求拖垮整个接口,而是超时后返回默认值。
我自己在项目里经常这么做,尤其是营销、推荐、画像这种“非核心但影响体验”的字段,非常适合超时降级。
再画一张:异步任务状态流转
stateDiagram-v2
[*] --> Created
Created --> Running: supplyAsync
Running --> Success: 正常返回
Running --> TimeoutFallback: completeOnTimeout
Running --> ExceptionFallback: exceptionally
Success --> Joined
TimeoutFallback --> Joined
ExceptionFallback --> Joined
Joined --> [*]
常见坑与排查
CompletableFuture 很好用,但也特别容易写出“表面异步、实际堵死”的代码。下面这些坑,我基本都见过。
1. 默认线程池乱用,导致互相干扰
如果你直接写:
CompletableFuture.supplyAsync(() -> remoteCall());
默认会使用 ForkJoinPool.commonPool()。
问题在于:
- 这个线程池是全局共享的
- 你项目里别的模块也可能在用
- RPC/HTTP 这种阻塞 IO 任务并不适合直接扔进 commonPool
建议:聚合接口使用独立业务线程池。
排查思路
- 看线程名是否是
ForkJoinPool.commonPool-worker-* - 线上出现偶发慢请求时,检查 commonPool 是否被其他任务打满
2. 在异步链中途 join(),把并发写成串行
错误示例:
CompletableFuture<ProductInfo> productFuture = ...;
ProductInfo product = productFuture.join(); // 过早阻塞
CompletableFuture<Recommendation> recFuture = CompletableFuture.supplyAsync(
() -> recommendationService.recommend(product.getCategoryId()), executor
);
这会让主线程提前等待,后续编排不再自然流动。
正确做法:thenCompose。
3. allOf() 后以为能直接拿结果
很多人第一次用会写成:
CompletableFuture<Object> future = CompletableFuture.allOf(f1, f2);
但 allOf() 返回的是 CompletableFuture<Void>,不携带业务结果。
结果还是要从 f1.join()、f2.join() 里分别取。
4. 异常被吞掉,日志也没打
如果你只写:
future.exceptionally(ex -> fallback);
但没有日志,后面排查时会非常痛苦。因为你只看到了“怎么总是降级”,不知道为什么降级。
建议:
- 记录服务名、请求参数、耗时、异常摘要
- 如果有链路追踪,带上 traceId
- 区分超时降级与业务异常降级
5. 线程池太小或队列太大
线程池配置不是“越大越好”。
中台聚合接口的下游通常是 IO 型调用,线程数要结合机器核数、下游 RT、接口 QPS 来评估。
一个常见误区是:
- 核心线程很小,瞬时流量来时排队严重
- 队列很大,任务看似没拒绝,实际上延迟越来越高
经验建议
- IO 密集型线程池可适当大于 CPU 核数
- 队列不要无限大
- 一定要有拒绝策略和监控
6. 忘记处理中断
如果你在下游调用封装里吞掉 InterruptedException,任务取消和关闭过程会很别扭。
示例代码里我做了这件事:
Thread.currentThread().interrupt();
这是个小细节,但很重要。
常见排查清单
当聚合接口“偶发很慢”时,我一般按这个顺序查:
1. 看总耗时构成
先确认是:
- 单个下游慢
- 线程池排队慢
- 异步依赖链写错,实际串行了
2. 打印每个 future 的开始/结束时间
至少记录:
- 服务名
- 开始时间
- 结束时间
- 是否超时
- 是否异常
- 使用线程名
3. 检查线程池指标
重点看:
- activeCount
- queue size
- reject count
- 最大线程数是否经常打满
4. 检查超时配置是否层层失控
很常见的情况是:
- HTTP 客户端超时 3s
- 聚合接口超时 500ms
completeOnTimeout只是主线程返回了,但底层请求还在跑
这种场景会造成“主链路看似快了,系统整体资源却更紧张”。
安全/性能最佳实践
这一部分很关键。很多文章只讲 CompletableFuture 怎么写,却不讲怎么稳。真正上线时,稳定性比语法重要。
1. 区分核心字段与非核心字段
不是所有字段都值得等待到最后一毫秒。
比如商品详情接口里:
- 核心字段:商品信息、价格、库存
- 非核心字段:营销文案、推荐列表、扩展画像
建议策略:
- 核心字段:失败可重试、超时阈值略高
- 非核心字段:快速降级、宁可缺省也别拖主链路
2. 每个下游设置独立超时
不要只靠接口最外层超时。
应该给每个下游服务设置自己的 timeout,否则一个慢服务会持续占着线程资源。
3. 线程池隔离
不同类型的任务最好隔离:
- 聚合计算线程池
- 下游 HTTP/RPC 调用线程池
- 大对象转换或序列化线程池
这样一个模块雪崩时,不至于拖死全部异步任务。
4. 谨慎使用 join() 和 get()
join():抛出非受检异常,写法简洁get():会抛出受检异常,适合需要显式处理时使用
我的经验是:
业务代码里常用 join(),但要确保异常已经在链路上被处理或包装过。
5. 补齐监控维度
至少监控这些指标:
- 聚合接口总 RT / TP99
- 每个下游服务 RT / 错误率 / 超时率
- 降级命中率
- 线程池活跃线程数、队列长度、拒绝次数
如果没有这些指标,出了问题基本就是“猜”。
6. 防止请求风暴下的级联放大
聚合接口天然会放大下游压力。
例如一次入口请求,要打 5 个下游;入口 QPS 1000,就可能放大成 5000 次下游调用。
建议结合:
- 本地缓存
- 批量接口替代多次单查
- 热点数据预计算
- 熔断、限流、隔离
7. 注意上下文传递
如果你依赖:
- TraceId
- 用户上下文
- MDC 日志上下文
- ThreadLocal 中的租户信息
那在异步线程里可能丢失。
这点在 Spring 项目里特别常见。
解决方式通常包括:
- 显式参数传递
- 包装
Executor - 使用支持上下文透传的工具
我自己踩过这个坑:日志里主线程有 traceId,异步线程全没了,定位问题非常费劲。
方案对比:为什么不用 Future 或手工线程编排?
方案一:串行调用
优点:
- 最简单
- 易读
缺点:
- 延迟高
- 无法发挥并发能力
方案二:Future + 手工 get()
优点:
- 比串行快
- 能实现基础并发
缺点:
- 编排关系不清晰
- 依赖链很丑
- 异常处理分散
方案三:CompletableFuture
优点:
- 同时表达并行、串联、聚合、异常处理
- 更适合复杂聚合接口
- 扩展性好
缺点:
- 一旦写得太“链式魔法”,阅读门槛会上升
- 对线程池、超时、上下文要求更高
我的建议是:
中台聚合接口只要超过 3 个下游、且存在依赖关系,CompletableFuture 通常就是很合适的选择。
逐步验证清单
如果你准备把现网接口从串行重构为异步,建议按下面步骤推进,不要一次改太大。
第一步:先梳理调用依赖图
列清楚:
- 哪些服务独立
- 哪些服务依赖前置结果
- 哪些字段允许降级
- 哪些字段必须强一致
第二步:先并行独立调用
先改最容易的部分,比如库存、价格、营销并行。
第三步:再改依赖链
把“查商品后再查推荐”这类逻辑改成 thenCompose。
第四步:补超时与降级
不要等上线后再补。
异步一旦跑起来,没有超时和降级会更难收拾。
第五步:压测验证
重点观察:
- 平均 RT、P95、P99
- 错误率
- 线程池饱和情况
- 下游调用量是否被放大
第六步:灰度上线
建议按流量比例逐步放开,观察降级率和队列积压情况。
总结
用 CompletableFuture 重构中台聚合接口,本质上是在做三件事:
- 把独立调用并行化,降低总 RT
- 把依赖关系显式化,让调用链更清楚
- 把超时、异常、降级收敛起来,提升稳定性
如果你只记住几个最重要的实践,我建议是这几点:
- 独立调用用
supplyAsync - 有依赖的链路用
thenCompose - 聚合收口用
allOf - 不要滥用默认线程池
- 每个下游都要有超时和 fallback
- 日志、监控、上下文传递必须提前考虑
最后给一个比较务实的边界判断:
- 下游少、延迟低、关系简单:串行可能已经够用
- 下游多、RT 可观、存在依赖链:值得上
CompletableFuture - 再往上,涉及限流、熔断、隔离、批处理编排:就该从“代码级异步”升级到“治理级异步”了
如果你正在维护一个“看起来不复杂,但总是莫名其妙变慢”的中台聚合接口,那它往往不是业务逻辑有多复杂,而是调用链没有被好好组织。
而 CompletableFuture,正好就是把这件事做清楚的一把好工具。