Java 中基于 CompletableFuture 的异步编排实战:从并行调用到超时控制与异常兜底
在日常后端开发里,我们经常会遇到这样一类接口:一个请求进来,要同时查多个下游服务,再把结果组装返回。
比如商品详情页:
- 查商品基础信息
- 查库存
- 查价格
- 查营销标签
- 查推荐列表
如果这些调用都串行执行,接口耗时大概率是各个下游耗时的总和;一旦其中某个服务偶尔抖一下,整体 RT 就会被拖垮。这个时候,CompletableFuture 就很适合拿来做异步编排。
但很多同学第一次上手时,往往停留在 supplyAsync() 和 allOf() 这两个 API 上。真正上线后,问题才开始出现:
- 默认线程池被打满
- 某个任务超时导致整体一直挂着
- 某个分支抛异常,整个组合 future 直接失败
- 日志里只看到
CompletionException,定位不到源头 - 任务太多,结果吞吐反而下降
这篇文章我不打算只讲 API,而是带你从一个实际场景走一遍:并行调用、结果汇聚、超时控制、异常兜底、线程池配置以及排查思路。你可以直接拿去改成项目里的模板。
背景与问题
先看一个最典型的业务场景:聚合接口。
假设我们有一个“用户主页”接口,需要返回:
- 用户基本信息
- 账户余额
- 优惠券数量
如果串行写,大概是这样:
UserProfile profile = userService.getProfile(userId);
AccountBalance balance = accountService.getBalance(userId);
CouponInfo couponInfo = couponService.getCouponInfo(userId);
return new UserHomeVO(profile, balance, couponInfo);
问题很明显:
- 整体耗时高:三个调用是串行的。
- 弱依赖也阻塞主流程:比如优惠券信息不是强依赖,却被绑死在主链路上。
- 失败影响面大:其中一个接口异常,整个请求失败。
- 超时不可控:下游卡住时,主线程只能干等。
更合理的目标应该是:
- 能并行的尽量并行
- 核心数据失败时明确报错,非核心数据失败时优雅降级
- 每个异步分支有超时
- 整体线程池可控,不滥用公共线程池
- 异常能被看懂、日志能追踪
前置知识与环境准备
本文示例基于:
- JDK 9+(因为会用到
orTimeout/completeOnTimeout) - 任意 IDE
- 一个普通的 Maven 或 Gradle Java 项目即可
如果你还没系统接触过 CompletableFuture,先记住三类最常用操作:
-
创建异步任务
runAsyncsupplyAsync
-
结果转换与组合
thenApplythenComposethenCombineallOf
-
异常与结束处理
exceptionallyhandlewhenComplete
核心原理
1. CompletableFuture 到底解决了什么问题
CompletableFuture 本质上是:
- 一个可以代表“未来结果”的对象
- 一套可以把多个异步任务串起来、合起来、兜住异常的编排机制
你可以把它理解成“异步版的流水线”:
- 上一步完成后,进入下一步
- 多个分支并行跑
- 最后合并结果
- 某一步失败时,有机会做降级或兜底
2. 常见编排模式
串行依赖:thenCompose
当第二个任务依赖第一个任务的结果时,用 thenCompose。
例如:
- 先查用户
- 再根据用户等级查权益
并行聚合:thenCombine / allOf
当多个任务彼此独立时,用并行方式更合适。
thenCombine:适合两个任务合并allOf:适合多个任务统一等待
异常处理:exceptionally / handle / whenComplete
这三个经常被混淆,我建议这样记:
exceptionally:只在异常时给默认值handle:无论成功失败都能拿到结果并转换whenComplete:更像“回调通知”,适合打日志,不适合改结果
一图看懂异步编排主流程
flowchart TD
A[收到用户主页请求] --> B[并行查询用户信息]
A --> C[并行查询账户余额]
A --> D[并行查询优惠券数量]
B --> E[设置超时与异常兜底]
C --> E
D --> E
E --> F[allOf 等待完成]
F --> G[汇总结果]
G --> H[返回响应]
CompletableFuture 常用关系图
classDiagram
class CompletableFuture {
+supplyAsync()
+runAsync()
+thenApply()
+thenCompose()
+thenCombine()
+allOf()
+orTimeout()
+completeOnTimeout()
+exceptionally()
+handle()
+whenComplete()
}
实战代码(可运行)
下面我写一个完整可运行的示例,模拟用户主页接口的异步编排。
这个例子里我们会演示:
- 自定义线程池
- 并行调用
- 单个任务超时控制
- 非核心依赖异常兜底
- 最终结果组装
完整示例
import java.math.BigDecimal;
import java.time.LocalDateTime;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
public class CompletableFutureOrchestrationDemo {
// ======== 模拟返回对象 ========
static class UserProfile {
Long userId;
String nickname;
UserProfile(Long userId, String nickname) {
this.userId = userId;
this.nickname = nickname;
}
@Override
public String toString() {
return "UserProfile{userId=" + userId + ", nickname='" + nickname + "'}";
}
}
static class AccountBalance {
BigDecimal amount;
AccountBalance(BigDecimal amount) {
this.amount = amount;
}
@Override
public String toString() {
return "AccountBalance{amount=" + amount + "}";
}
}
static class CouponInfo {
int count;
CouponInfo(int count) {
this.count = count;
}
@Override
public String toString() {
return "CouponInfo{count=" + count + "}";
}
}
static class UserHomeVO {
UserProfile profile;
AccountBalance balance;
CouponInfo couponInfo;
UserHomeVO(UserProfile profile, AccountBalance balance, CouponInfo couponInfo) {
this.profile = profile;
this.balance = balance;
this.couponInfo = couponInfo;
}
@Override
public String toString() {
return "UserHomeVO{" +
"profile=" + profile +
", balance=" + balance +
", couponInfo=" + couponInfo +
'}';
}
}
// ======== 自定义线程池 ========
private static final ExecutorService BIZ_EXECUTOR = new ThreadPoolExecutor(
4,
8,
60,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(100),
new ThreadFactory() {
private final AtomicInteger idx = new AtomicInteger(1);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "cf-biz-" + idx.getAndIncrement());
}
},
new ThreadPoolExecutor.CallerRunsPolicy()
);
public static void main(String[] args) {
Long userId = 1001L;
UserHomeVO result = getUserHome(userId);
System.out.println(LocalDateTime.now() + " result = " + result);
BIZ_EXECUTOR.shutdown();
}
public static UserHomeVO getUserHome(Long userId) {
long start = System.currentTimeMillis();
CompletableFuture<UserProfile> profileFuture =
CompletableFuture.supplyAsync(() -> getUserProfile(userId), BIZ_EXECUTOR)
.orTimeout(800, TimeUnit.MILLISECONDS)
.whenComplete((res, ex) -> log("profile", res, ex));
CompletableFuture<AccountBalance> balanceFuture =
CompletableFuture.supplyAsync(() -> getAccountBalance(userId), BIZ_EXECUTOR)
.orTimeout(500, TimeUnit.MILLISECONDS)
.whenComplete((res, ex) -> log("balance", res, ex));
CompletableFuture<CouponInfo> couponFuture =
CompletableFuture.supplyAsync(() -> getCouponInfo(userId), BIZ_EXECUTOR)
.completeOnTimeout(new CouponInfo(0), 300, TimeUnit.MILLISECONDS)
.exceptionally(ex -> {
System.out.println("coupon fallback because of: " + ex.getMessage());
return new CouponInfo(0);
})
.whenComplete((res, ex) -> log("coupon", res, ex));
try {
CompletableFuture.allOf(profileFuture, balanceFuture, couponFuture).join();
UserHomeVO vo = new UserHomeVO(
profileFuture.join(),
balanceFuture.join(),
couponFuture.join()
);
System.out.println("total cost = " + (System.currentTimeMillis() - start) + " ms");
return vo;
} catch (CompletionException e) {
Throwable root = unwrapCompletionException(e);
throw new RuntimeException("getUserHome failed: " + root.getMessage(), root);
}
}
// ======== 模拟下游调用 ========
private static UserProfile getUserProfile(Long userId) {
sleep(200);
return new UserProfile(userId, "Alice");
}
private static AccountBalance getAccountBalance(Long userId) {
sleep(350);
return new AccountBalance(new BigDecimal("1024.88"));
}
private static CouponInfo getCouponInfo(Long userId) {
sleep(600); // 故意慢一点,触发超时默认值
return new CouponInfo(5);
}
// ======== 工具方法 ========
private static void sleep(long millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("thread interrupted", e);
}
}
private static void log(String taskName, Object res, Throwable ex) {
String threadName = Thread.currentThread().getName();
if (ex != null) {
System.out.println("[" + threadName + "] " + taskName + " failed: " + ex.getMessage());
} else {
System.out.println("[" + threadName + "] " + taskName + " success: " + res);
}
}
private static Throwable unwrapCompletionException(Throwable e) {
if (e instanceof CompletionException || e instanceof ExecutionException) {
if (e.getCause() != null) {
return unwrapCompletionException(e.getCause());
}
}
return e;
}
}
运行后你会看到什么
这个例子里:
profile200ms 成功balance350ms 成功coupon600ms,超过 300ms,直接返回默认值CouponInfo(0)
所以整体耗时大约接近最慢的核心任务,而不是三者之和。
逐步拆解这段代码
1. 为什么一定要自定义线程池
很多示例会这样写:
CompletableFuture.supplyAsync(() -> querySomething());
这会默认使用 ForkJoinPool.commonPool()。在 demo 里没问题,在线上我通常不建议这么干,原因有两个:
- 业务线程和公共线程池混用,不可控
- 遇到阻塞型 IO 时,ForkJoinPool 并不理想
像远程 RPC、数据库、HTTP 调用,大多数都是阻塞型任务。更稳妥的方式是:
- 给业务隔离线程池
- 根据接口吞吐、平均 RT、峰值流量配置核心线程数和队列
2. orTimeout 与 completeOnTimeout 的区别
这是实战里很关键的一对 API。
orTimeout
超时后让 Future 进入异常状态。
future.orTimeout(500, TimeUnit.MILLISECONDS)
适合场景:
- 这是核心依赖
- 超时应该明确失败
- 后续要统一走异常链路
completeOnTimeout
超时后返回默认值。
future.completeOnTimeout(defaultValue, 300, TimeUnit.MILLISECONDS)
适合场景:
- 非核心依赖
- 允许降级
- 页面上显示默认内容即可
我自己的经验是:
- 强依赖:
orTimeout - 弱依赖:
completeOnTimeout + exceptionally
这套组合比较实用。
两种编排方式对比
sequenceDiagram
participant Client
participant API
participant ProfileSvc
participant BalanceSvc
participant CouponSvc
Client->>API: 请求用户主页
API->>ProfileSvc: 异步查询用户信息
API->>BalanceSvc: 异步查询账户余额
API->>CouponSvc: 异步查询优惠券
ProfileSvc-->>API: 返回成功
BalanceSvc-->>API: 返回成功
CouponSvc-->>API: 超时/异常
API-->>API: 使用默认 CouponInfo(0)
API-->>Client: 返回聚合结果
进阶:thenCompose 和 thenCombine 怎么用
很多人会把所有场景都写成 allOf(),其实没必要。
场景一:后一个任务依赖前一个任务结果
比如先查用户,再按用户等级查权益:
CompletableFuture<String> future = CompletableFuture
.supplyAsync(() -> "VIP", BIZ_EXECUTOR)
.thenCompose(level ->
CompletableFuture.supplyAsync(() -> "权益包-" + level, BIZ_EXECUTOR)
);
System.out.println(future.join());
这里应该用 thenCompose,因为第二步依赖第一步结果。
场景二:两个独立任务并行,最后合并
CompletableFuture<String> nameFuture =
CompletableFuture.supplyAsync(() -> "Alice", BIZ_EXECUTOR);
CompletableFuture<Integer> scoreFuture =
CompletableFuture.supplyAsync(() -> 95, BIZ_EXECUTOR);
CompletableFuture<String> resultFuture =
nameFuture.thenCombine(scoreFuture, (name, score) -> name + ":" + score);
System.out.println(resultFuture.join());
这里适合 thenCombine。
场景三:多个独立任务统一等待
CompletableFuture<Void> all = CompletableFuture.allOf(f1, f2, f3);
all.join();
适合聚合多个互不依赖的调用。
常见坑与排查
这一节很重要。我自己最早用 CompletableFuture 时,坑基本都踩在这里。
坑 1:allOf 不会自动帮你收集结果
很多人以为:
CompletableFuture.allOf(f1, f2, f3)
会直接返回三个结果。其实不会,它返回的是 CompletableFuture<Void>。
你仍然需要手动取值:
CompletableFuture.allOf(f1, f2, f3).join();
Object r1 = f1.join();
Object r2 = f2.join();
Object r3 = f3.join();
坑 2:join 和 get 的异常包装不同
get()抛受检异常:ExecutionException,InterruptedExceptionjoin()抛非受检异常:CompletionException
线上排查时,经常看到一层层 CompletionException,真正原因藏在 cause 里。
建议统一做异常解包:
private static Throwable unwrap(Throwable e) {
while ((e instanceof CompletionException || e instanceof ExecutionException) && e.getCause() != null) {
e = e.getCause();
}
return e;
}
坑 3:用了 exceptionally 以后,异常被“吃掉”了
比如:
future.exceptionally(ex -> defaultValue);
这样做没错,但要知道:异常链路到这里就变成正常值了。如果后面还想知道它失败过,就必须在这里打日志或打指标。
一个更稳一点的写法是:
future.exceptionally(ex -> {
log.error("task failed", ex);
return defaultValue;
});
坑 4:异步里再做阻塞,线程池很快耗尽
比如你在 supplyAsync 里又调用了慢 SQL、慢 HTTP、文件 IO,这些都是阻塞操作。线程池如果设置过小,很快就会排队;如果设置过大,机器上下文切换压力又会上来。
排查思路:
- 看线程池活跃线程数
- 看队列长度
- 看任务平均执行时间
- 看超时是否集中在某个下游
坑 5:超时不等于真正取消下游执行
这一点特别容易误解。
future.orTimeout(300, TimeUnit.MILLISECONDS)
意思是:CompletableFuture 在 300ms 后超时失败。
但如果你的下游任务已经在线程里跑起来了,它不一定真的停止。
也就是说:
- 上层已经返回超时
- 下游线程可能还在继续跑
所以如果你对资源消耗敏感,还要配合:
- 下游 HTTP 客户端自身超时
- RPC 超时
- 数据库查询超时
- 可中断任务的取消机制
排查一个真实风格的问题:为什么接口已经降级了,机器 CPU 还是高
这个现象很常见。
表面上看,你已经做了:
completeOnTimeoutexceptionally- 默认值兜底
按理说接口应该“很稳”了,但 CPU 还是高、线程池还是满。
原因通常是:
- 应用层 future 超时了,但下游任务没有真正停
- 大量超时任务还在后台继续执行
- 新流量继续进来,形成堆积
所以要分清两层超时:
- 编排层超时:
CompletableFuture的超时 - 执行层超时:HTTP/RPC/DB 客户端的超时
两层都要配,才是真正可控。
安全/性能最佳实践
这里我给一套比较务实的建议,不追求“最炫”,追求“能上线”。
1. 业务线程池一定要隔离
不要把所有业务都扔进一个公共池子里。建议至少按调用类型拆分:
- 聚合查询线程池
- 导出任务线程池
- 消息消费线程池
这样即使某一类任务堆积,也不会把其他核心功能拖死。
2. 给每个下游设置合理超时,不要只设总超时
错误示范:
- 总接口超时 2 秒
- 下游每个任务都不限时
结果就是总超时虽然兜住了,但线程资源已经被长时间占满。
更合理的是:
- 用户信息:800ms
- 余额:500ms
- 优惠券:300ms,超时直接默认值
3. 强弱依赖要分层设计
不是所有字段都值得“死等”。
可以这样分:
- 强依赖:失败直接报错,例如订单创建核心库存校验
- 弱依赖:失败降级,例如推荐、标签、角标、统计信息
别把所有任务都按同一种策略处理。
4. 日志里必须带任务名、线程名、耗时、请求标识
异步场景下最怕日志串不起来。至少要有:
- 请求 ID
- 任务名
- 开始/结束时间
- 线程名
- 是否超时/异常
如果项目里用了 MDC,要注意线程切换后上下文可能丢失,必要时需要手动透传。
5. 避免在回调链中写太复杂的业务逻辑
链式调用一多,很容易写成“异步意大利面”:
f1.thenApply(...)
.thenCompose(...)
.thenCombine(...)
.handle(...)
.thenApply(...)
如果一屏代码看不懂,后面维护的人一定会骂你,包括未来的你自己。
建议做法:
- 一个 future 只负责一个清晰职责
- 中间结果命名清楚
- 聚合逻辑单独抽方法
6. 监控比语法更重要
在生产环境里,真正决定你能不能睡安稳的不是 thenApply 还是 handle,而是这些指标有没有:
- 线程池活跃线程数
- 队列积压长度
- 任务平均耗时
- 超时次数
- 异常次数
- 降级次数
如果没有这些监控,出了问题你会非常被动。
一个推荐的实战模板
如果你想在项目里快速落地,我建议按下面这个套路写:
CompletableFuture<ResultA> aFuture = CompletableFuture
.supplyAsync(() -> callA(), executor)
.orTimeout(300, TimeUnit.MILLISECONDS)
.whenComplete((r, e) -> logResult("A", r, e));
CompletableFuture<ResultB> bFuture = CompletableFuture
.supplyAsync(() -> callB(), executor)
.completeOnTimeout(defaultB(), 200, TimeUnit.MILLISECONDS)
.exceptionally(ex -> {
logError("B", ex);
return defaultB();
});
CompletableFuture.allOf(aFuture, bFuture).join();
return merge(aFuture.join(), bFuture.join());
使用规则:
- 核心依赖:
orTimeout - 非核心依赖:
completeOnTimeout + exceptionally - 全部任务:
whenComplete打日志 - 最外层:统一解包异常
这套模板足够覆盖大多数聚合类接口。
逐步验证清单
你可以按这个顺序自己动手验证一遍。
第一步:先并行化
先把串行调用改成多个 supplyAsync,观察总耗时是否接近“最长那个任务”。
第二步:给弱依赖加默认值
对推荐、标签、统计信息这类弱依赖,加 completeOnTimeout 和 exceptionally。
第三步:给强依赖加失败策略
对库存、价格、账户余额这类核心依赖,用 orTimeout,并在最外层统一失败。
第四步:换掉默认线程池
接入业务专属线程池,压测看活跃线程数和排队情况。
第五步:补齐日志与监控
如果你不能回答下面这些问题,说明还没真正上线准备好:
- 哪个分支最慢?
- 哪个分支最常超时?
- 降级比例是多少?
- 队列有没有堆积?
- 是否存在超时后任务继续执行的问题?
总结
CompletableFuture 最有价值的地方,不只是“能异步”,而是它能帮我们把一个复杂请求拆成:
- 可并行的分支
- 可组合的流程
- 可控制的超时
- 可兜底的异常策略
如果你只记住本文三点,我建议记这三条:
- 别用默认线程池硬上生产
- 强依赖用
orTimeout,弱依赖用completeOnTimeout + exceptionally - 超时控制要区分编排层和下游执行层
最后给一个边界条件提醒:
CompletableFuture 很适合中等复杂度的异步编排。如果你的流程已经涉及大量动态依赖、复杂重试、跨服务补偿、状态持久化,那可能就不是它最擅长的场景了,应该考虑工作流引擎或消息驱动架构。
如果你现在手头正好有一个“聚合查询接口慢、偶发超时、部分字段允许降级”的需求,这篇文章里的示例基本就可以直接套进去。先把线程池、超时、异常兜底三件事做扎实,稳定性会立刻上一个台阶。