背景与问题
在 Java 后端开发里,一个请求往往不只做一件事。
比如商品详情页,可能要同时查:
- 商品基础信息
- 价格
- 库存
- 营销活动
- 用户个性化推荐
如果这些调用按串行执行,总耗时就是各个依赖耗时相加。只要其中一个慢,整体就慢。很多团队会第一时间想到“上异步”,然后直接把 CompletableFuture.supplyAsync() 用起来,代码看着优雅,线上却慢慢出现这些问题:
- 接口 RT 忽高忽低
- 某个下游抖动时,把整个公共线程池拖死
- 超时不生效,或者超时后任务还在后台偷偷跑
- 异常被包装了好几层,日志看不出根因
- 结果聚合时一处失败导致全链路失败
这些问题我自己都踩过。尤其是“默认线程池 + 没有隔离 + 没有超时治理”这个组合,开发环境跑得挺好,线上一有波峰就容易出事。
这篇文章我们不只讲 CompletableFuture 的 API,而是从实战编排角度,把这几个关键点串起来:
- 如何做并行任务编排
- 为什么必须做线程池隔离
- 如何做超时控制与降级
- 如何把异常治理做得可观测、可恢复
- 怎样避免看起来异步、实际上更慢
前置知识与环境准备
建议你先具备这些基础:
- Java 8+ 基本语法
ExecutorService、线程池参数含义Future与CompletableFuture的基础使用- 对接口 RT、吞吐、超时、降级这些概念不陌生
本文代码以 Java 8 风格为主,尽量保证可直接运行。
说明一点:orTimeout、completeOnTimeout 是 JDK 9+ 才有的能力。考虑很多项目仍在 Java 8,我会给出 Java 8 可运行方案,也会顺手提一句高版本的替代写法。
核心原理
1. CompletableFuture 解决的核心问题
CompletableFuture 不是简单的“异步执行工具”,它更重要的价值是:
- 定义任务依赖关系
- 声明结果如何组合
- 为异常、超时、降级留出处理节点
常见编排模式:
supplyAsync:异步生产结果thenApply:结果转换thenCompose:串联下一个异步任务thenCombine:两个异步结果合并allOf:等待多个任务完成anyOf:谁先完成用谁exceptionally/handle:异常兜底
2. 为什么必须做线程池隔离
很多人图省事,直接这样写:
CompletableFuture.supplyAsync(() -> queryPrice());
如果你没传线程池,它会默认使用 ForkJoinPool.commonPool()。问题是:
- 它是全局共享资源
- 你项目里其他地方可能也在用
- 下游接口阻塞时,会把整个池子占满
- 一类任务的雪崩,可能拖垮另一类任务
所以在线上服务里,我非常建议:
不同性质的任务,使用不同线程池隔离。
例如:
- 商品信息查询池
- 营销服务调用池
- 推荐服务调用池
这样即使推荐服务抖动,也不会把价格、库存查询一起拖死。
3. 为什么光有异步还不够
异步只能解决“并发等待”的问题,不能自动解决慢调用、失败传播和资源耗尽。
一个完整的异步编排方案至少要包含:
- 并发执行
- 线程池隔离
- 超时控制
- 异常兜底
- 结果聚合策略
- 日志与监控
一个典型请求的编排模型
下面这张图展示一个商品详情接口的异步编排流程:
flowchart LR
A[请求进入] --> B[并行发起基础信息任务]
A --> C[并行发起价格任务]
A --> D[并行发起库存任务]
A --> E[并行发起营销任务]
B --> F[聚合结果]
C --> F
D --> F
E --> F
F --> G{是否存在失败/超时}
G -- 否 --> H[返回完整结果]
G -- 是 --> I[部分降级返回]
这就是实践中的核心思想:能并行的并行,能隔离的隔离,能降级的降级。
核心原理:线程池隔离 + 超时 + 异常治理
1. 线程池隔离不是“多建几个池”这么简单
线程池隔离至少要回答三个问题:
CPU 密集还是 IO 密集?
如果是远程调用、DB 查询、HTTP 请求,多半是 IO 密集型。
这类任务线程数一般可以比 CPU 核数更多,但不能无限大。
队列要不要无界?
我强烈建议:不要使用无界队列。
无界队列会隐藏流量问题,最终把风险变成:
- 请求堆积
- 内存上涨
- 超时越来越多
- GC 压力上升
拒绝策略怎么选?
常见策略:
AbortPolicy:直接拒绝,显式失败CallerRunsPolicy:调用线程执行,适合低流量缓冲,但高峰期可能把主线程拖慢- 自定义拒绝:记录日志、打监控、快速失败
在线上业务里,我更倾向于:
关键链路用有界队列 + 显式拒绝 + 兜底降级
这样问题暴露得早,不会默默积压。
2. CompletableFuture 的异常传播规律
这是个很容易绕进去的点。
比如:
join()抛的是CompletionExceptionget()抛的是ExecutionException- 真正根因通常在
getCause()里
如果你日志只打印:
log.error("task failed", ex);
有时看起来只是包了一层,再包一层。排查很费劲。
我一般建议:
- 在每个关键异步任务内部打业务维度日志
- 在聚合点统一做根因解包
- 区分:
- 可降级异常
- 超时异常
- 线程池拒绝异常
- 系统级异常
3. 超时控制的两个层次
超时至少有两层:
调用层超时
比如 HTTP client、RPC client、数据库连接池自带的超时。
这是最底层、最重要的超时,必须配。
Future 层超时
给异步任务一个总等待时间,到点就返回默认值或失败。
这层是为了避免:
- 聚合一直等
- 页面迟迟不返回
- 某个依赖长时间拖住主链路
注意:
CompletableFuture超时完成,不等于底层任务一定停止。
如果底层是阻塞 IO,除非调用客户端本身支持中断或超时,否则后台线程可能还在跑。这是很多人误解的地方。
实战代码(可运行)
下面我们写一个简化版“商品详情聚合服务”。它包含:
- 自定义线程池隔离
- 多个异步任务并行
- Java 8 方式实现超时控制
- 异常兜底
- 聚合返回部分结果
1. 完整示例代码
import java.util.Random;
import java.util.concurrent.*;
import java.util.function.Function;
public class CompletableFutureOrchestrationDemo {
private static final ExecutorService productPool = new ThreadPoolExecutor(
8, 16,
60, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(200),
new NamedThreadFactory("product-pool"),
new ThreadPoolExecutor.AbortPolicy()
);
private static final ExecutorService promotionPool = new ThreadPoolExecutor(
4, 8,
60, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(100),
new NamedThreadFactory("promotion-pool"),
new ThreadPoolExecutor.AbortPolicy()
);
private static final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(
2, new NamedThreadFactory("timeout-scheduler")
);
public static void main(String[] args) {
ProductDetail detail = getProductDetail("sku-1001");
System.out.println(detail);
shutdown();
}
public static ProductDetail getProductDetail(String skuId) {
long start = System.currentTimeMillis();
CompletableFuture<String> infoFuture =
withTimeout(
CompletableFuture.supplyAsync(() -> queryProductInfo(skuId), productPool)
.exceptionally(ex -> {
log("queryProductInfo failed: " + unwrap(ex));
return "默认商品信息";
}),
800, TimeUnit.MILLISECONDS,
"商品信息超时默认值"
);
CompletableFuture<Integer> priceFuture =
withTimeout(
CompletableFuture.supplyAsync(() -> queryPrice(skuId), productPool)
.exceptionally(ex -> {
log("queryPrice failed: " + unwrap(ex));
return -1;
}),
500, TimeUnit.MILLISECONDS,
-1
);
CompletableFuture<Integer> stockFuture =
withTimeout(
CompletableFuture.supplyAsync(() -> queryStock(skuId), productPool)
.exceptionally(ex -> {
log("queryStock failed: " + unwrap(ex));
return 0;
}),
400, TimeUnit.MILLISECONDS,
0
);
CompletableFuture<String> promoFuture =
withTimeout(
CompletableFuture.supplyAsync(() -> queryPromotion(skuId), promotionPool)
.exceptionally(ex -> {
log("queryPromotion failed: " + unwrap(ex));
return "无活动";
}),
300, TimeUnit.MILLISECONDS,
"无活动"
);
CompletableFuture<Void> all = CompletableFuture.allOf(
infoFuture, priceFuture, stockFuture, promoFuture
);
try {
all.join();
} catch (CompletionException ex) {
log("allOf failed: " + unwrap(ex));
}
ProductDetail detail = new ProductDetail();
detail.setSkuId(skuId);
detail.setProductInfo(infoFuture.join());
detail.setPrice(priceFuture.join());
detail.setStock(stockFuture.join());
detail.setPromotion(promoFuture.join());
detail.setCostMs(System.currentTimeMillis() - start);
return detail;
}
public static <T> CompletableFuture<T> withTimeout(
CompletableFuture<T> future,
long timeout,
TimeUnit unit,
T fallback
) {
CompletableFuture<T> timeoutFuture = new CompletableFuture<>();
scheduler.schedule(() -> timeoutFuture.complete(fallback), timeout, unit);
return future.applyToEither(timeoutFuture, Function.identity());
}
private static String queryProductInfo(String skuId) {
sleepRandom(100, 700);
maybeFail(0.1, "商品服务异常");
return "商品信息(" + skuId + ")";
}
private static Integer queryPrice(String skuId) {
sleepRandom(100, 600);
maybeFail(0.15, "价格服务异常");
return 199;
}
private static Integer queryStock(String skuId) {
sleepRandom(50, 500);
maybeFail(0.1, "库存服务异常");
return 88;
}
private static String queryPromotion(String skuId) {
sleepRandom(100, 900);
maybeFail(0.2, "营销服务异常");
return "满200减20";
}
private static void sleepRandom(int min, int max) {
try {
int bound = max - min;
int sleepMs = min + new Random().nextInt(bound);
Thread.sleep(sleepMs);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("线程被中断", e);
}
}
private static void maybeFail(double rate, String message) {
if (Math.random() < rate) {
throw new RuntimeException(message);
}
}
private static Throwable unwrap(Throwable ex) {
Throwable current = ex;
while (current instanceof CompletionException || current instanceof ExecutionException) {
if (current.getCause() == null) {
break;
}
current = current.getCause();
}
return current;
}
private static void log(String msg) {
System.out.println("[" + Thread.currentThread().getName() + "] " + msg);
}
private static void shutdown() {
productPool.shutdown();
promotionPool.shutdown();
scheduler.shutdown();
}
static class ProductDetail {
private String skuId;
private String productInfo;
private Integer price;
private Integer stock;
private String promotion;
private long costMs;
public void setSkuId(String skuId) {
this.skuId = skuId;
}
public void setProductInfo(String productInfo) {
this.productInfo = productInfo;
}
public void setPrice(Integer price) {
this.price = price;
}
public void setStock(Integer stock) {
this.stock = stock;
}
public void setPromotion(String promotion) {
this.promotion = promotion;
}
public void setCostMs(long costMs) {
this.costMs = costMs;
}
@Override
public String toString() {
return "ProductDetail{" +
"skuId='" + skuId + '\'' +
", productInfo='" + productInfo + '\'' +
", price=" + price +
", stock=" + stock +
", promotion='" + promotion + '\'' +
", costMs=" + costMs +
'}';
}
}
static class NamedThreadFactory implements ThreadFactory {
private final String prefix;
private int counter = 1;
NamedThreadFactory(String prefix) {
this.prefix = prefix;
}
@Override
public synchronized Thread newThread(Runnable r) {
Thread t = new Thread(r, prefix + "-" + counter++);
t.setDaemon(false);
return t;
}
}
}
2. 这段代码做对了什么
使用独立线程池
CompletableFuture.supplyAsync(() -> queryPromotion(skuId), promotionPool)
营销服务用了独立池,避免和商品基础查询互相影响。
给每个任务单独超时与默认值
withTimeout(future, 300, TimeUnit.MILLISECONDS, "无活动")
这意味着营销服务慢了,不会拖住整个详情页。
每个任务自己兜底
.exceptionally(ex -> {
log("queryPrice failed: " + unwrap(ex));
return -1;
})
这样聚合阶段就不会因为单点失败而整体失败。
聚合阶段只负责收口
CompletableFuture.allOf(infoFuture, priceFuture, stockFuture, promoFuture)
由于前面都做了兜底,这里 join() 的风险会小很多。
异步编排时序图
把刚才的执行过程画成时序图,会更容易理解:
sequenceDiagram
participant Client as 调用方
participant Service as 聚合服务
participant P1 as 商品线程池
participant P2 as 营销线程池
participant Timer as 超时调度器
Client->>Service: 请求商品详情
Service->>P1: 异步查商品信息
Service->>P1: 异步查价格
Service->>P1: 异步查库存
Service->>P2: 异步查营销
Service->>Timer: 为各任务注册超时
P1-->>Service: 返回商品信息/异常
P1-->>Service: 返回价格/异常
P1-->>Service: 返回库存/异常
P2-->>Service: 返回营销/异常或超时降级
Service->>Service: allOf 聚合结果
Service-->>Client: 返回完整或部分降级结果
逐步验证清单
建议你不要一上来就把完整方案怼进生产代码。可以按这个顺序验证。
第一步:先验证并行是否生效
最简单的方法是打印每个任务开始和结束时间,看总耗时是否接近最长的那个任务,而不是所有任务耗时之和。
如果你发现总耗时还是差不多等于串行,那通常有几个原因:
- 任务其实写成了串行依赖
- 线程池太小
- 某个环节在主线程阻塞了
- 你在异步链中间过早
join()
第二步:故意让某个依赖变慢
把营销查询改成固定 sleep 1000ms,看:
- 是否在 300ms 左右就返回默认值
- 整体接口是否仍能及时返回
第三步:故意让某个依赖抛异常
确认:
- 日志里是否能看到原始异常
- 聚合结果是否按预期降级
- 不会因为一处异常导致全部失败
第四步:压线程池
把线程池核心线程调小、队列调小,再并发发请求。
重点观察:
- 是否触发拒绝策略
- 拒绝后是不是快速失败
- 有没有出现请求堆积和 RT 雪崩
常见坑与排查
这一节很关键。我挑几个最常见、也最容易“看着没问题,线上才出事”的坑。
坑 1:使用默认 commonPool
现象
- 平时没事,高峰期 RT 抖动
- 某个下游卡住后,整个服务都慢
原因
所有异步任务都挤在公共池里,互相抢资源。
排查
- 搜索代码中是否存在未传线程池的
supplyAsync/runAsync - 查看线程名是否是
ForkJoinPool.commonPool-*
解决
- 所有核心链路显式传入自定义线程池
- 按业务类型隔离线程池
坑 2:过早 join,异步写成“伪异步”
比如这样:
CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> queryA(), pool);
String a = f1.join();
CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> queryB(a), pool);
String b = f2.join();
这不是错,但如果你本来希望并行,这种写法会让主线程很早阻塞。
更好的思路
如果真有依赖,用 thenCompose。
如果没有依赖,就先把所有 Future 发出去,最后统一收。
坑 3:allOf 不会直接给你结果
很多初学者以为:
CompletableFuture.allOf(f1, f2, f3)
执行完就能拿到所有结果。其实不会,allOf 返回的是 CompletableFuture<Void>,你还得自己从各个 future 里取值。
通常写法是:
CompletableFuture.allOf(f1, f2, f3).join();
String r1 = f1.join();
String r2 = f2.join();
String r3 = f3.join();
坑 4:超时了,但底层任务没停
这是非常常见的误解。
现象
主流程已经返回默认值,但线程池活跃线程还是很高。
原因
Future 层面只是“我不等你了”,不代表底层阻塞调用真正取消。
解决
- HTTP/RPC/DB 客户端都要配置连接超时、读超时、请求超时
- 避免长期阻塞型调用占满业务池
- 必要时把高风险下游单独隔离到更小线程池
坑 5:异常吞掉了,排查困难
比如:
.exceptionally(ex -> null)
这样短期看代码很“稳”,长期看就是灾难。你只知道结果是 null,不知道为什么。
建议
至少记录:
- 任务名
- 业务主键
- 耗时
- 根因异常
- 是否命中降级
坑 6:线程池参数随手拍
我见过最常见的情况是:
- 核心线程数 200
- 队列长度 50000
- 机器才 4 核
- 还跑一堆阻塞 IO
这样不一定更快,反而更容易:
- 线程上下文切换频繁
- 请求堆积严重
- 超时大量放大
- 内存压力上升
实际建议
先基于业务测出来,而不是拍脑袋。
如果是 IO 密集型,可以从一个适中的线程数起步,然后结合:
- 平均耗时
- P99 耗时
- 并发量
- 拒绝数
- 活跃线程数
逐步调优。
安全/性能最佳实践
这里把我更推荐的落地方式整理成清单,方便你直接对照项目检查。
1. 线程池隔离按“依赖类型”做,不按“方法”做
不要每个方法都 new 一个线程池,也不要一个应用只用一个线程池。
合理分法通常是:
- 核心主链路池
- 非核心扩展信息池
- 高风险慢依赖池
这样管理起来更清晰,监控指标也更有意义。
2. 所有线程池必须命名
线程名是排查问题时最便宜、最有效的信息之一。
没有命名的线程,线上看堆栈和日志会非常痛苦。
3. 有界队列 + 拒绝策略 + 降级兜底
这是我认为最务实的组合。
原因很简单:
- 无界队列会把问题延后爆炸
- 显式拒绝可以尽早暴露容量问题
- 降级保证用户有可接受结果
4. 超时控制要双层配置
一定要同时做:
- 客户端超时
- Future 编排层超时
只做一层通常不够。
5. 结果聚合要允许“部分成功”
不是所有业务都要求“全有或全无”。
像详情页、推荐页、画像补充信息这些场景,很多时候:
- 少一个营销标签,页面还能看
- 少一个推荐模块,也能返回主信息
把“哪些字段允许降级”提前设计好,整体可用性会明显更高。
6. 异常治理要分层
推荐分成三层:
任务内日志
记录具体任务失败细节。
编排层统计
统计哪个任务超时、哪个任务失败率高。
接口层兜底
决定最终返回默认值、部分成功还是整体失败。
7. 避免在异步任务中使用 ThreadLocal 上下文却不透传
如果你依赖:
- traceId
- 用户信息
- 租户上下文
异步线程切换后可能拿不到。
这会导致日志链路断裂、权限信息丢失。
解决方式包括:
- 显式参数透传
- 自定义包装 Executor
- 使用支持上下文传播的框架能力
8. 注意数据安全与幂等
异步编排中如果某些任务会触发:
- 写库
- 发消息
- 调第三方扣费接口
那就不能只盯着性能,还要关注:
- 幂等控制
- 重试副作用
- 超时后是否实际成功
- 异常补偿机制
也就是说,查询类编排和写操作编排的治理重点不一样,别直接套模板。
一个简单的线程池设计参考
下面给一个比较实用的思考框架:
flowchart TD
A[识别任务类型] --> B{是否阻塞IO}
B -- 是 --> C[使用业务隔离线程池]
B -- 否 --> D[考虑较小线程池或CPU池]
C --> E[设置核心线程数与最大线程数]
E --> F[配置有界队列]
F --> G[配置拒绝策略]
G --> H[埋点监控: 活跃线程/队列长度/拒绝数]
H --> I[结合压测逐步调优]
这个流程看起来朴素,但在大多数业务系统里足够有效。
进阶补充:thenCompose 与 thenCombine 怎么选
这两个 API 很容易混。
thenCompose:前一个结果决定下一个异步任务
比如先查用户,再根据用户等级查权益:
CompletableFuture<String> future =
CompletableFuture.supplyAsync(() -> "VIP", productPool)
.thenCompose(level ->
CompletableFuture.supplyAsync(() -> "权益包-" + level, promotionPool)
);
它更像“异步版 flatMap”。
thenCombine:两个独立任务并行后合并
CompletableFuture<String> infoFuture =
CompletableFuture.supplyAsync(() -> "商品A", productPool);
CompletableFuture<Integer> priceFuture =
CompletableFuture.supplyAsync(() -> 199, productPool);
CompletableFuture<String> result =
infoFuture.thenCombine(priceFuture,
(info, price) -> info + " 价格:" + price);
选择标准很简单:
- 有前后依赖:
thenCompose - 互相独立,最后合并:
thenCombine
常见排查思路
如果你的异步编排线上慢、偶发失败、定位困难,可以按这个顺序查。
1. 先看线程池指标
重点看:
- 活跃线程数
- 队列积压
- 拒绝次数
- 最大线程是否长期打满
如果线程池已经满了,继续看业务代码意义不大,先止血。
2. 再看下游耗时分布
不要只看平均值,要看:
- P95
- P99
- 超时占比
很多系统平均 50ms,看起来很好,但 P99 可能 2s,这种尾延迟会直接放大到聚合接口。
3. 再看异常分类
把失败区分成:
- 超时
- 拒绝执行
- 下游业务异常
- 参数问题
- 线程中断
分类后你会发现,治理手段完全不同。
4. 最后看编排方式是否合理
常见问题有:
- 本可并行却写成串行
- 一个慢任务被放在公共池
- 过早阻塞
- 降级边界没设计好
总结
CompletableFuture 真正难的,不是 API 会不会写,而是能不能把它放进真实业务里稳定运行。
你可以把这篇文章记成一句话:
异步编排不是“把代码改成并发”这么简单,而是“在线程池隔离、超时控制、异常治理下,稳定地并发”。
落地时我建议优先做这 5 件事:
- 所有核心异步任务显式指定线程池
- 不同依赖类型做线程池隔离
- 每个任务设置超时和降级默认值
- 异常统一解包并分类记录
- 线程池指标、超时率、拒绝率必须可观测
最后提醒一个边界条件:
- 如果你的任务是纯 CPU 计算,重点是控制线程数,避免过度并发。
- 如果你的任务是阻塞 IO 调用,重点是线程池隔离、客户端超时和降级。
- 如果你的任务涉及写操作和副作用,重点还要加上幂等、重试与补偿设计。
当你把这些基础打牢,CompletableFuture 就不只是“语法糖”,而是一个非常好用的异步任务编排工具。