Java 中基于 CompletableFuture 的异步编排实战:并行调用、超时控制与异常处理优化
在很多 Java 服务里,真正拖慢接口响应时间的,往往不是某一段复杂计算,而是多个远程调用串行执行:查用户、查订单、查库存、查营销信息……每一步都不算慢,但堆起来就慢了。
这篇文章我想从**“一个接口怎么从串行改成异步编排”**这个角度,带你把 CompletableFuture 用到实战里。重点放在三件事:
- 并行调用:把可同时执行的任务一起跑
- 超时控制:别让慢依赖拖垮主流程
- 异常处理优化:失败时能降级、能定位、能收口
如果你会基本的 Java 并发,这篇内容应该能直接上手。
背景与问题
先看一个典型需求:组装“用户首页”数据。
接口要聚合这些来源:
- 用户基本信息
- 订单统计
- 优惠券信息
如果写成串行,大概像这样:
public HomePageVO getHomePage(Long userId) {
UserInfo userInfo = userService.getUserInfo(userId);
OrderSummary orderSummary = orderService.getOrderSummary(userId);
CouponInfo couponInfo = couponService.getCouponInfo(userId);
return new HomePageVO(userInfo, orderSummary, couponInfo);
}
问题很明显:
- 三个调用彼此独立,却被串行执行
- 任意一个依赖慢,整体都慢
- 任意一个依赖报错,整个接口直接失败
- 超时、重试、降级逻辑散落在业务代码里
假设三个接口耗时分别是:
- 用户服务:80ms
- 订单服务:120ms
- 优惠券服务:100ms
串行总耗时接近 300ms+;并行后理想耗时接近 120ms+少量调度开销。
这就是异步编排最直接的价值。
前置知识与环境准备
你需要知道什么
建议你先熟悉这些概念:
ExecutorService- 线程池基础参数
- Lambda 表达式
Future和CompletableFuture的区别- Java 8+ 语法
运行环境
本文示例基于:
- JDK 9+ 更方便(因为有
orTimeout/completeOnTimeout) - JDK 8 也能做,后面会补一个兼容思路
核心原理
CompletableFuture 的好处,不只是“异步执行”,而是它支持声明式编排:
supplyAsync:异步执行有返回值任务runAsync:异步执行无返回值任务thenApply:转换结果thenCompose:串联异步任务thenCombine:合并两个独立结果allOf:等待多个任务全部完成anyOf:等待任一任务完成exceptionally/handle/whenComplete:异常处理与收尾orTimeout/completeOnTimeout:超时控制
一张图先看整体
flowchart LR
A[请求进入] --> B[并行发起用户服务]
A --> C[并行发起订单服务]
A --> D[并行发起优惠券服务]
B --> E[汇总结果]
C --> E
D --> E
E --> F[返回首页数据]
串行依赖和并行依赖的区别
- 并行依赖:彼此无前后关系,越早一起发起越好
- 串行依赖:后一步依赖前一步结果,比如先查用户等级,再决定查哪类权益
很多人第一次用 CompletableFuture,容易把所有步骤都写成链式调用,结果本来能并行的任务被不小心串行了。这个坑我自己也踩过。
一个最小认知模型
你可以把 CompletableFuture 理解成两层:
- 任务层:任务在哪个线程池执行
- 编排层:多个任务之间怎么组合、异常怎么传播、超时怎么控制
光把任务丢到线程池里不难,难的是把编排关系写清楚。
编排关系示意图
sequenceDiagram
participant Client as 调用方
participant CF1 as 用户任务
participant CF2 as 订单任务
participant CF3 as 优惠券任务
participant Agg as 聚合逻辑
Client->>CF1: supplyAsync()
Client->>CF2: supplyAsync()
Client->>CF3: supplyAsync()
CF1-->>Agg: UserInfo
CF2-->>Agg: OrderSummary
CF3-->>Agg: CouponInfo
Agg-->>Client: HomePageVO
实战代码(可运行)
下面我给一个完整可运行示例。为了方便演示,我用 sleep 模拟远程调用。
示例目标
实现一个首页聚合接口,要求:
- 三个依赖并行调用
- 订单服务超时后返回默认值
- 优惠券服务异常时降级
- 最终统一汇总
- 使用自定义线程池,避免误用公共线程池
完整代码
import java.time.LocalTime;
import java.util.concurrent.*;
import java.util.function.Supplier;
public class CompletableFutureDemo {
private static final ExecutorService BIZ_POOL = new ThreadPoolExecutor(
8,
16,
60,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(200),
new ThreadFactory() {
private int index = 1;
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "cf-biz-" + index++);
}
},
new ThreadPoolExecutor.CallerRunsPolicy()
);
public static void main(String[] args) {
try {
HomePageVO result = getHomePage(1001L);
log("最终结果:" + result);
} finally {
BIZ_POOL.shutdown();
}
}
public static HomePageVO getHomePage(Long userId) {
CompletableFuture<UserInfo> userFuture = CompletableFuture
.supplyAsync(() -> getUserInfo(userId), BIZ_POOL)
.whenComplete((r, ex) -> {
if (ex == null) {
log("用户信息查询完成");
} else {
log("用户信息查询失败: " + ex.getMessage());
}
});
CompletableFuture<OrderSummary> orderFuture = CompletableFuture
.supplyAsync(() -> getOrderSummary(userId), BIZ_POOL)
.completeOnTimeout(OrderSummary.defaultValue(), 150, TimeUnit.MILLISECONDS)
.exceptionally(ex -> {
log("订单服务异常,降级返回默认值: " + ex.getMessage());
return OrderSummary.defaultValue();
});
CompletableFuture<CouponInfo> couponFuture = CompletableFuture
.supplyAsync(() -> getCouponInfo(userId), BIZ_POOL)
.exceptionally(ex -> {
log("优惠券服务异常,降级返回空优惠券: " + ex.getMessage());
return CouponInfo.empty();
});
CompletableFuture<Void> allDone = CompletableFuture.allOf(userFuture, orderFuture, couponFuture);
return allDone.thenApply(v -> {
UserInfo userInfo = userFuture.join();
OrderSummary orderSummary = orderFuture.join();
CouponInfo couponInfo = couponFuture.join();
return new HomePageVO(userInfo, orderSummary, couponInfo);
}).join();
}
private static UserInfo getUserInfo(Long userId) {
sleep(80);
return new UserInfo(userId, "Alice");
}
private static OrderSummary getOrderSummary(Long userId) {
sleep(200); // 故意模拟超时
return new OrderSummary(12, 999.50);
}
private static CouponInfo getCouponInfo(Long userId) {
sleep(60);
throw new RuntimeException("coupon service unavailable");
}
private static void sleep(long millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("thread interrupted", e);
}
}
private static void log(String msg) {
System.out.println(LocalTime.now() + " [" + Thread.currentThread().getName() + "] " + msg);
}
static class HomePageVO {
private final UserInfo userInfo;
private final OrderSummary orderSummary;
private final CouponInfo couponInfo;
public HomePageVO(UserInfo userInfo, OrderSummary orderSummary, CouponInfo couponInfo) {
this.userInfo = userInfo;
this.orderSummary = orderSummary;
this.couponInfo = couponInfo;
}
@Override
public String toString() {
return "HomePageVO{" +
"userInfo=" + userInfo +
", orderSummary=" + orderSummary +
", couponInfo=" + couponInfo +
'}';
}
}
static class UserInfo {
private final Long userId;
private final String name;
public UserInfo(Long userId, String name) {
this.userId = userId;
this.name = name;
}
@Override
public String toString() {
return "UserInfo{" +
"userId=" + userId +
", name='" + name + '\'' +
'}';
}
}
static class OrderSummary {
private final int orderCount;
private final double totalAmount;
public OrderSummary(int orderCount, double totalAmount) {
this.orderCount = orderCount;
this.totalAmount = totalAmount;
}
public static OrderSummary defaultValue() {
return new OrderSummary(0, 0.0);
}
@Override
public String toString() {
return "OrderSummary{" +
"orderCount=" + orderCount +
", totalAmount=" + totalAmount +
'}';
}
}
static class CouponInfo {
private final int availableCount;
public CouponInfo(int availableCount) {
this.availableCount = availableCount;
}
public static CouponInfo empty() {
return new CouponInfo(0);
}
@Override
public String toString() {
return "CouponInfo{" +
"availableCount=" + availableCount +
'}';
}
}
}
代码拆解:一步步看懂
1. 用 supplyAsync 发起并行任务
CompletableFuture<UserInfo> userFuture =
CompletableFuture.supplyAsync(() -> getUserInfo(userId), BIZ_POOL);
这一步只是“提交任务”,不会阻塞当前线程。
如果你连续写三个 supplyAsync,它们会尽快被线程池调度执行,因此能并行。
2. 使用 completeOnTimeout 做超时兜底
CompletableFuture<OrderSummary> orderFuture = CompletableFuture
.supplyAsync(() -> getOrderSummary(userId), BIZ_POOL)
.completeOnTimeout(OrderSummary.defaultValue(), 150, TimeUnit.MILLISECONDS);
意思是:
- 如果 150ms 内没完成
- 就直接用默认值补上这个 Future 的结果
这在“非核心字段可接受降级”的场景特别好用。
它和 orTimeout 的区别
orTimeout:超时后让任务异常完成completeOnTimeout:超时后让任务正常完成,但结果是默认值
如果你的业务希望“超时算失败,统一走异常逻辑”,用 orTimeout;
如果你希望“超时自动降级,不影响主流程”,用 completeOnTimeout。
3. 使用 exceptionally 兜底异常
.exceptionally(ex -> {
log("优惠券服务异常,降级返回空优惠券: " + ex.getMessage());
return CouponInfo.empty();
});
这里很实用,但要注意一个点:
exceptionally会吞掉异常并转换为正常结果- 后续链路拿到的是“降级后的值”,而不是异常
这意味着如果你还想记录原始异常,一定要在这里打日志,或者更早用 whenComplete 观测。
4. 用 allOf 等待全部完成,再 join
CompletableFuture<Void> allDone = CompletableFuture.allOf(userFuture, orderFuture, couponFuture);
return allDone.thenApply(v -> {
UserInfo userInfo = userFuture.join();
OrderSummary orderSummary = orderFuture.join();
CouponInfo couponInfo = couponFuture.join();
return new HomePageVO(userInfo, orderSummary, couponInfo);
}).join();
这段写法非常常见,也比较稳妥:
allOf等待全部 Future 完成- 再从各个 Future 中取结果
为什么这里用 join() 而不是 get()?
join()不需要处理受检异常,代码更干净- 它抛的是
CompletionException - 如果你在业务层统一封装异常,
join()常常更顺手
再进一层:串联依赖怎么写
并不是所有场景都适合 allOf。有些任务是有依赖关系的。
比如:
- 先查用户信息
- 根据用户等级决定权益查询策略
- 再查权益信息
这种场景应该用 thenCompose,而不是把第二步也提前并行发出去。
CompletableFuture<UserInfo> userFuture =
CompletableFuture.supplyAsync(() -> getUserInfo(1001L), BIZ_POOL);
CompletableFuture<String> benefitFuture = userFuture.thenCompose(user ->
CompletableFuture.supplyAsync(() -> "权益信息: level=" + user.name, BIZ_POOL)
);
依赖编排示意
flowchart TD
A[查询用户信息] --> B{根据用户结果判断}
B --> C[查询权益信息]
C --> D[组装返回值]
异常处理:不是会写就够了,要知道传播规则
这是 CompletableFuture 最容易绕的部分。
常用方法怎么选
exceptionally
只在异常时执行,返回一个兜底值。
future.exceptionally(ex -> defaultValue);
适合:
- 降级
- 给默认值
- 不想中断主流程
handle
不管成功还是失败都会执行,可以统一转换结果。
future.handle((result, ex) -> {
if (ex != null) {
return defaultValue;
}
return result;
});
适合:
- 成功失败统一映射
- 想把异常结果也转成某种领域对象
whenComplete
更像“观察者”,适合记录日志、打指标,但不改变结果。
future.whenComplete((result, ex) -> {
if (ex != null) {
log("任务失败");
}
});
适合:
- 埋点
- 日志
- tracing
我个人的习惯是:
- 记录日志:
whenComplete - 业务兜底:
exceptionally或handle
这样职责比较清晰。
常见坑与排查
这一部分很重要,很多线上问题不是不会写,而是“写得像对的,但跑起来不对”。
坑 1:默认线程池用得太随意
如果你写:
CompletableFuture.supplyAsync(() -> doSomething());
没有传线程池时,默认使用 ForkJoinPool.commonPool()。
问题在于:
- 这个公共线程池会被全局共享
- 一旦任务里有阻塞操作(比如 HTTP、数据库、RPC)
- 很容易把公共线程池拖慢
建议
- 业务异步编排,尽量传入专用线程池
- CPU 密集和 IO 密集任务尽量拆开线程池
坑 2:明明写了异步,结果还是串行
比如这样:
UserInfo user = CompletableFuture.supplyAsync(() -> getUserInfo(userId), BIZ_POOL).join();
OrderSummary order = CompletableFuture.supplyAsync(() -> getOrderSummary(userId), BIZ_POOL).join();
CouponInfo coupon = CompletableFuture.supplyAsync(() -> getCouponInfo(userId), BIZ_POOL).join();
看着用了异步,其实每次都马上 join(),本质上还是串行。
正确思路
先都发起,再统一等待:
CompletableFuture<UserInfo> f1 = CompletableFuture.supplyAsync(() -> getUserInfo(userId), BIZ_POOL);
CompletableFuture<OrderSummary> f2 = CompletableFuture.supplyAsync(() -> getOrderSummary(userId), BIZ_POOL);
CompletableFuture<CouponInfo> f3 = CompletableFuture.supplyAsync(() -> getCouponInfo(userId), BIZ_POOL);
CompletableFuture.allOf(f1, f2, f3).join();
坑 3:异常被包装,看不清真实原因
join() 抛的是 CompletionException,get() 抛的是 ExecutionException。真实异常通常藏在 getCause() 里。
排查方式
try {
future.join();
} catch (CompletionException ex) {
Throwable cause = ex.getCause();
cause.printStackTrace();
}
线上排障时,不要只打印外层异常信息,不然你很可能只看到一句:
java.util.concurrent.CompletionException
这几乎没法定位。
坑 4:超时只“超了 Future”,底层任务可能还在跑
这是很多人忽略的一点。
completeOnTimeout / orTimeout 控制的是 CompletableFuture 的完成状态,不等于底层 IO 调用真的被取消了。
也就是说:
- 你的主流程可能已经返回默认值了
- 但底层 HTTP/RPC 线程可能还在继续执行
这意味着什么
真正要止损,还要结合:
- HTTP 客户端超时
- RPC 框架超时
- 数据库查询超时
- 线程中断响应
只靠 CompletableFuture 不足以彻底“停止慢调用”。
坑 5:线程池队列太大,问题被隐藏
很多项目喜欢把队列设成超大,比如几千几万。表面上不报错,实际上:
- 请求堆积在线程池里
- 延迟越来越高
- 最后变成“慢性雪崩”
建议
线程池参数要结合业务压测:
- 核心线程数
- 最大线程数
- 队列大小
- 拒绝策略
对聚合接口来说,宁可更早暴露容量问题,也不要无限排队。
安全/性能最佳实践
这里我把最实用的建议收成一组,你可以直接拿去对照项目。
1. 为异步编排准备专用线程池
不要默认混用。
ExecutorService ioPool = new ThreadPoolExecutor(
16, 32, 60, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(500),
new ThreadPoolExecutor.CallerRunsPolicy()
);
如果你的任务主要是远程调用、网络等待,线程数通常可以比 CPU 核数大一些,但不要拍脑袋定值,最好压测。
2. 区分核心结果和可降级结果
不是所有字段都值得“死等”。
比如首页接口里:
- 用户身份、登录态:核心字段
- 优惠券角标、推荐内容:可降级字段
可降级字段建议:
- 更短超时
- 失败兜底默认值
- 单独打监控
核心字段建议:
- 明确失败策略
- 不要无脑吞异常
3. 为每个异步任务打上名字和日志
工程里最怕“有 Future,没观测”。
你至少应该能知道:
- 哪个子任务开始了
- 耗时多少
- 是超时还是业务异常
- 降级是否发生
- 降级比例是否上升
一个简单包装思路:
public static <T> Supplier<T> timed(String taskName, Supplier<T> supplier) {
return () -> {
long start = System.currentTimeMillis();
try {
return supplier.get();
} finally {
long cost = System.currentTimeMillis() - start;
System.out.println(taskName + " cost=" + cost + "ms");
}
};
}
使用方式:
CompletableFuture<UserInfo> future =
CompletableFuture.supplyAsync(
timed("queryUserInfo", () -> getUserInfo(1001L)),
BIZ_POOL
);
4. 尽量避免在异步链里做阻塞嵌套
例如在一个 supplyAsync 里面又去 join 另一个 Future,这很容易形成:
- 线程池占满
- 线程互相等待
- 吞吐下降
尽量让编排写在 Future 链外层,而不是在任务内部互等。
5. 结合限流、熔断、超时一起看
CompletableFuture 解决的是编排问题,不是全部的稳定性问题。
如果下游经常抖动,还应配合:
- 限流
- 熔断
- 隔离
- 重试(谨慎)
- 舱壁模式
尤其是重试,别直接在异步聚合层无脑重试,不然可能把下游放大打挂。
6. 对线程上下文传播保持警惕
在异步线程里,下面这些上下文可能会丢失:
ThreadLocal- MDC 日志上下文
- 链路追踪上下文
- 用户登录上下文
如果项目依赖这些信息,需要:
- 手动透传
- 使用支持上下文传播的封装线程池
- 或引入统一的任务装饰器
这个问题在本地测试时不明显,但线上日志一旦串不起来,排查会非常痛苦。
逐步验证清单
如果你准备在项目里落地,我建议按这个顺序验证。
第一步:先验证并行是否真的生效
方法很简单:
- 打开始时间
- 发起多个 Future
- 最后统计总耗时
如果总耗时仍接近串行总和,说明你很可能哪里提前 join 了。
第二步:验证超时降级是否符合预期
重点看两个问题:
- 主流程是否能按预期快速返回默认值
- 超时后的底层调用是否还在消耗资源
第二点经常被忽略。
第三步:验证异常传播路径
建议分别模拟:
- 单个依赖抛异常
- 多个依赖同时异常
- 核心任务异常、非核心任务异常
确认:
- 哪些异常被吞掉
- 哪些异常会向上抛出
- 日志中是否能定位真实原因
第四步:压测线程池
压测时重点观察:
- 活跃线程数
- 队列长度
- 平均耗时 / TP99
- 拒绝次数
- 降级次数
如果线程池队列一直积压,说明线程池大小或下游能力有问题,不是把队列再调大就能解决的。
JDK 8 怎么办?
如果你还在 JDK 8,orTimeout 和 completeOnTimeout 不可用。
常见做法有两种:
- 使用
ScheduledExecutorService自己实现超时补偿 - 借助成熟工具库封装超时 Future
一个简化思路如下:
import java.util.concurrent.*;
public class Jdk8TimeoutDemo {
private static final ScheduledExecutorService SCHEDULER =
Executors.newScheduledThreadPool(1);
public static <T> CompletableFuture<T> completeOnTimeout(
CompletableFuture<T> future, T defaultValue, long timeout, TimeUnit unit) {
SCHEDULER.schedule(() -> future.complete(defaultValue), timeout, unit);
return future;
}
}
注意,这只是演示思路。生产里要考虑:
- 定时任务线程池资源
- 原 Future 已完成时的竞争
- 底层任务取消
- 指标埋点
一个更贴近工程的落地建议
如果你的接口里异步任务越来越多,我不建议把所有 CompletableFuture 逻辑都堆在 Controller 或 Service 主函数里。
更好的做法是分层:
- 任务层:每个依赖调用一个独立方法
- 编排层:负责并行、串行、超时、异常兜底
- 聚合层:只做结果组装
例如:
public CompletableFuture<UserInfo> queryUser(Long userId) { ... }
public CompletableFuture<OrderSummary> queryOrder(Long userId) { ... }
public CompletableFuture<CouponInfo> queryCoupon(Long userId) { ... }
public HomePageVO aggregate(Long userId) { ... }
这样做的好处:
- 更容易单测
- 更容易替换降级策略
- 更容易查哪个依赖出了问题
总结
CompletableFuture 真正强的地方,不是“能异步”,而是它让你把任务之间的关系写清楚:
- 能并行的就并行,降低总耗时
- 可降级的加超时和默认值,避免拖垮主流程
- 异常统一收口,既能定位也能兜底
- 配合专用线程池和观测手段,才能真正在线上可用
最后给几个我认为最实用的可执行建议:
- 先发起,再统一等待,不要边创建边
join - 一定使用自定义线程池,别把业务阻塞任务丢进公共池
- 区分核心链路和可降级链路,超时策略不要一刀切
- 异常日志要打印真实 cause,别只看
CompletionException - 别把超时当取消,底层调用的超时仍需单独配置
如果你刚开始落地,可以先从一个简单聚合接口改造起:把 3 个独立远程调用并行化,再加上 1 个超时降级和 1 个异常兜底。先跑通,再逐步抽象。这个节奏通常最稳。