Java 中基于 CompletableFuture 的异步编排实战:并行调用、超时控制与异常处理
在 Java 后端开发里,“把几个慢接口并行调一下,再兜住超时和异常” 是非常常见的需求。
比如一个商品详情页,可能要同时查:
- 商品基础信息
- 库存
- 价格
- 营销活动
- 用户画像或推荐数据
如果这些调用串行执行,整体耗时往往是所有接口耗时之和;而如果并行执行,总耗时更接近最慢的那个接口。这也是 CompletableFuture 特别适合登场的地方。
这篇文章我会从**“为什么要用”讲到“怎么写能跑”,再到“线上容易踩什么坑”**。你可以把它当成一篇偏实战的上手指南。
背景与问题
先看一个很典型的同步写法:
public ProductPage getProductPage(String productId) {
Product product = productService.getProduct(productId);
Inventory inventory = inventoryService.getInventory(productId);
Price price = priceService.getPrice(productId);
Promotion promotion = promotionService.getPromotion(productId);
return new ProductPage(product, inventory, price, promotion);
}
这个写法的问题不复杂,但很致命:
-
耗时叠加
如果 4 个远程调用分别要 200ms、150ms、180ms、300ms,总耗时可能接近 830ms。 -
异常处理分散
任意一个服务失败,整个页面可能直接报错,或者需要一堆try-catch。 -
超时不可控
某个下游接口卡住,会把整个请求链路拖死。 -
扩展性差
以后再加“推荐”、“评价摘要”、“物流承诺”,同步代码会越来越难维护。
这时就需要一种编排方式:
- 能并行发起任务
- 能等待全部完成或部分完成
- 能做超时控制
- 能做异常兜底
- 能把多个结果最终聚合
CompletableFuture 正是为这类场景准备的。
前置知识与环境准备
适用版本
本文示例建议使用:
- JDK 9+:可直接使用
orTimeout、completeOnTimeout - 如果你是 JDK 8,也能做,但超时控制要稍微绕一点
你需要知道的基础
如果你已经会这些,读起来会更顺:
- Java 线程池基础
- Lambda 表达式
Future的基本概念- 异常传播机制
一个重要提醒
CompletableFuture 不是银弹。它更适合:
- I/O 密集型任务并发
- 多个相互独立的远程调用编排
- 需要阶段式回调和结果聚合的场景
如果你拿它去做:
- 大量 CPU 密集计算
- 不受控的阻塞任务堆积
- 无限并发的接口风暴
那很容易把线程池打爆。
核心原理
先别急着背 API,我更建议先理解它的三个核心角色。
1. 任务发起:异步执行
最常见的是:
supplyAsync:有返回值runAsync:无返回值
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "hello");
2. 阶段编排:把后续动作串起来
你可以把它理解为:前一个任务完成后,接下来做什么。
常见方法:
thenApply:转换结果thenAccept:消费结果thenRun:不关心结果,只执行后续动作thenCompose:扁平化串联异步任务thenCombine:合并两个异步结果
3. 结果汇总:等待多个任务
常用两个:
allOf:全部完成再继续anyOf:任意一个完成就继续
一张图看懂 CompletableFuture 编排路径
flowchart LR
A[收到请求] --> B[并行查询商品信息]
B --> C1[商品基础信息]
B --> C2[库存]
B --> C3[价格]
B --> C4[营销活动]
C1 --> D[结果聚合]
C2 --> D
C3 --> D
C4 --> D
D --> E[返回商品详情页]
这张图背后的关键点是:
- 每个子任务互相独立,可以并行
- 聚合点统一收口
- 异常和超时最好在每个子任务内部就处理掉,而不是最后一把梭
CompletableFuture 的常用编排方式
并行:多个任务同时发起
CompletableFuture<Product> productFuture =
CompletableFuture.supplyAsync(() -> productService.getProduct(productId), executor);
CompletableFuture<Inventory> inventoryFuture =
CompletableFuture.supplyAsync(() -> inventoryService.getInventory(productId), executor);
聚合:等待全部任务完成
CompletableFuture.allOf(productFuture, inventoryFuture).join();
转换:处理结果
CompletableFuture<String> nameFuture =
productFuture.thenApply(Product::getName);
串联:依赖前一个异步结果
比如先查商品,再根据商品分类查推荐:
CompletableFuture<Recommendation> recFuture =
productFuture.thenCompose(product ->
CompletableFuture.supplyAsync(
() -> recommendationService.getRecommendation(product.getCategoryId()),
executor
)
);
这里很多人第一次会把 thenApply 和 thenCompose 搞混。
thenApply:返回普通值thenCompose:返回另一个CompletableFuture
你可以简单记忆:
“如果回调里已经是异步了,就用 thenCompose。”
实战代码(可运行)
下面我们做一个完整示例:模拟一个商品聚合接口,包含:
- 并行调用多个服务
- 每个子任务设置超时
- 异常自动降级
- 最终聚合结果返回
示例目标
构造一个 ProductPageService,查询:
- 商品信息
- 库存
- 价格
- 营销活动
其中:
- 库存服务偶发异常
- 营销服务可能超时
- 即使某个服务失败,页面仍然尽量返回可用信息
完整示例代码
import java.math.BigDecimal;
import java.time.LocalDateTime;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
public class CompletableFutureDemo {
public static void main(String[] args) {
ExecutorService executor = new ThreadPoolExecutor(
8,
16,
60,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(100),
new NamedThreadFactory("cf-demo-"),
new ThreadPoolExecutor.CallerRunsPolicy()
);
ProductService productService = new ProductService();
InventoryService inventoryService = new InventoryService();
PriceService priceService = new PriceService();
PromotionService promotionService = new PromotionService();
ProductPageService pageService = new ProductPageService(
executor, productService, inventoryService, priceService, promotionService
);
long start = System.currentTimeMillis();
ProductPage page = pageService.getProductPage("SKU-1001");
long cost = System.currentTimeMillis() - start;
System.out.println("结果:" + page);
System.out.println("耗时:" + cost + " ms");
executor.shutdown();
}
static class ProductPageService {
private final Executor executor;
private final ProductService productService;
private final InventoryService inventoryService;
private final PriceService priceService;
private final PromotionService promotionService;
public ProductPageService(
Executor executor,
ProductService productService,
InventoryService inventoryService,
PriceService priceService,
PromotionService promotionService
) {
this.executor = executor;
this.productService = productService;
this.inventoryService = inventoryService;
this.priceService = priceService;
this.promotionService = promotionService;
}
public ProductPage getProductPage(String productId) {
CompletableFuture<Product> productFuture = CompletableFuture
.supplyAsync(() -> productService.getProduct(productId), executor)
.orTimeout(500, TimeUnit.MILLISECONDS)
.exceptionally(ex -> {
log("product service failed: " + ex.getMessage());
return Product.defaultProduct(productId);
});
CompletableFuture<Inventory> inventoryFuture = CompletableFuture
.supplyAsync(() -> inventoryService.getInventory(productId), executor)
.orTimeout(300, TimeUnit.MILLISECONDS)
.exceptionally(ex -> {
log("inventory service failed: " + ex.getMessage());
return Inventory.unknown(productId);
});
CompletableFuture<Price> priceFuture = CompletableFuture
.supplyAsync(() -> priceService.getPrice(productId), executor)
.orTimeout(300, TimeUnit.MILLISECONDS)
.exceptionally(ex -> {
log("price service failed: " + ex.getMessage());
return Price.defaultPrice(productId);
});
CompletableFuture<Promotion> promotionFuture = CompletableFuture
.supplyAsync(() -> promotionService.getPromotion(productId), executor)
.completeOnTimeout(Promotion.noPromotion(productId), 200, TimeUnit.MILLISECONDS)
.exceptionally(ex -> {
log("promotion service failed: " + ex.getMessage());
return Promotion.noPromotion(productId);
});
CompletableFuture<Void> all = CompletableFuture.allOf(
productFuture, inventoryFuture, priceFuture, promotionFuture
);
return all.thenApply(v -> new ProductPage(
productFuture.join(),
inventoryFuture.join(),
priceFuture.join(),
promotionFuture.join(),
LocalDateTime.now()
)).join();
}
private void log(String msg) {
System.out.println("[" + Thread.currentThread().getName() + "] " + msg);
}
}
static class ProductService {
public Product getProduct(String productId) {
sleep(120);
return new Product(productId, "机械键盘", "外设");
}
}
static class InventoryService {
public Inventory getInventory(String productId) {
sleep(180);
if (ThreadLocalRandom.current().nextBoolean()) {
throw new RuntimeException("库存服务偶发异常");
}
return new Inventory(productId, 99, true);
}
}
static class PriceService {
public Price getPrice(String productId) {
sleep(150);
return new Price(productId, new BigDecimal("399.00"), "CNY");
}
}
static class PromotionService {
public Promotion getPromotion(String productId) {
sleep(400);
return new Promotion(productId, "满 300 减 20");
}
}
static class Product {
private final String productId;
private final String name;
private final String category;
public Product(String productId, String name, String category) {
this.productId = productId;
this.name = name;
this.category = category;
}
public static Product defaultProduct(String productId) {
return new Product(productId, "未知商品", "默认分类");
}
@Override
public String toString() {
return "Product{productId='" + productId + "', name='" + name + "', category='" + category + "'}";
}
}
static class Inventory {
private final String productId;
private final int stock;
private final boolean available;
public Inventory(String productId, int stock, boolean available) {
this.productId = productId;
this.stock = stock;
this.available = available;
}
public static Inventory unknown(String productId) {
return new Inventory(productId, -1, false);
}
@Override
public String toString() {
return "Inventory{productId='" + productId + "', stock=" + stock + ", available=" + available + "}";
}
}
static class Price {
private final String productId;
private final BigDecimal amount;
private final String currency;
public Price(String productId, BigDecimal amount, String currency) {
this.productId = productId;
this.amount = amount;
this.currency = currency;
}
public static Price defaultPrice(String productId) {
return new Price(productId, BigDecimal.ZERO, "CNY");
}
@Override
public String toString() {
return "Price{productId='" + productId + "', amount=" + amount + ", currency='" + currency + "'}";
}
}
static class Promotion {
private final String productId;
private final String promotionText;
public Promotion(String productId, String promotionText) {
this.productId = productId;
this.promotionText = promotionText;
}
public static Promotion noPromotion(String productId) {
return new Promotion(productId, "无活动");
}
@Override
public String toString() {
return "Promotion{productId='" + productId + "', promotionText='" + promotionText + "'}";
}
}
static class ProductPage {
private final Product product;
private final Inventory inventory;
private final Price price;
private final Promotion promotion;
private final LocalDateTime queryTime;
public ProductPage(Product product, Inventory inventory, Price price, Promotion promotion, LocalDateTime queryTime) {
this.product = product;
this.inventory = inventory;
this.price = price;
this.promotion = promotion;
this.queryTime = queryTime;
}
@Override
public String toString() {
return "ProductPage{" +
"product=" + product +
", inventory=" + inventory +
", price=" + price +
", promotion=" + promotion +
", queryTime=" + queryTime +
'}';
}
}
static class NamedThreadFactory implements ThreadFactory {
private final String prefix;
private final AtomicInteger counter = new AtomicInteger(1);
public NamedThreadFactory(String prefix) {
this.prefix = prefix;
}
@Override
public Thread newThread(Runnable r) {
return new Thread(r, prefix + counter.getAndIncrement());
}
}
static void sleep(long millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("线程被中断", e);
}
}
}
这段代码里最值得关注的点
1. 每个子任务单独兜底
这是我非常推荐的做法。
.exceptionally(ex -> Inventory.unknown(productId))
不要把所有异常都留到 allOf(...).join() 再统一处理。
原因很简单:一旦你希望“部分失败、整体可用”,就必须让每个任务自己降级。
2. orTimeout 和 completeOnTimeout 的区别
这两个 API 很像,但语义完全不同。
orTimeout
超时后让任务异常完成
future.orTimeout(300, TimeUnit.MILLISECONDS)
适合:
- 你希望明确感知超时
- 想走异常处理逻辑
- 想统计超时率
completeOnTimeout
超时后返回一个默认值
future.completeOnTimeout(defaultValue, 300, TimeUnit.MILLISECONDS)
适合:
- 这个字段不是强依赖
- 你愿意降级展示
- 不希望因为超时把主流程打断
比如“营销活动”、“推荐文案”这类信息,通常就适合 completeOnTimeout。
3. join() 和 get() 的区别
很多人第一次用时会问:为什么示例里用 join(),不用 get()?
get()
- 会抛受检异常:
InterruptedException、ExecutionException - 更偏传统
Future风格
join()
- 抛运行时异常:
CompletionException - 写链式编排时代码更简洁
在业务编排里,我通常更常用 join()。
但前提是:你知道异常会怎么传播,并且做好了日志与兜底。
再用一张图理解异常与超时流转
sequenceDiagram
participant Client as 调用方
participant Page as ProductPageService
participant P as ProductService
participant I as InventoryService
participant R as PriceService
participant M as PromotionService
Client->>Page: getProductPage(productId)
Page->>P: 异步查询商品
Page->>I: 异步查询库存
Page->>R: 异步查询价格
Page->>M: 异步查询营销
P-->>Page: 正常返回
I-->>Page: 抛出异常
R-->>Page: 正常返回
M-->>Page: 超时
Page->>Page: inventory exceptionally 降级
Page->>Page: promotion completeOnTimeout 降级
Page->>Page: allOf 聚合
Page-->>Client: 返回可用页面结果
这就是线上非常常见的一类策略:
- 核心字段不能缺
- 非核心字段允许失败或超时
- 聚合层不要轻易被单点拖垮
逐步验证清单
如果你准备把代码放到自己的项目里,我建议按下面顺序验证。
第一步:先验证并行是否生效
方法很简单:打印每个任务开始和结束时间,看总耗时是否接近最慢任务,而不是总和。
第二步:故意制造一个异常
比如让库存服务抛异常,观察:
- 主流程是否仍然返回
- 降级值是否正确
- 日志里是否能定位到失败原因
第三步:故意制造超时
把营销服务睡眠时间调大,观察:
orTimeout是否抛异常completeOnTimeout是否返回默认值- 主线程是否被长时间拖住
第四步:压测线程池
重点看:
- 活跃线程数
- 队列长度
- 拒绝策略是否触发
- RT 是否抖动明显
这是很多 demo 看不出来、线上却非常致命的部分。
常见坑与排查
这一节我想讲得稍微接地气一点,因为这些坑我自己也踩过。
坑 1:默认线程池用得太随意
如果你不传 executor,很多异步任务会跑到 ForkJoinPool.commonPool()。
CompletableFuture.supplyAsync(() -> query());
这在 demo 里没问题,但在线上可能有风险:
- 线程池和别的任务共享
- 阻塞任务会拖累整个公共线程池
- 性能问题不好定位
建议:业务异步编排尽量使用自定义线程池。
坑 2:把阻塞 I/O 当成轻量异步
CompletableFuture 本质上并不会让阻塞 I/O 自动变快。
它做的是:把多个阻塞任务并行化。
如果你并发量大、每个任务都在阻塞等待远程结果,那么线程池容量、队列长度、下游承载能力都必须一起考虑。
否则你只是把“串行慢”变成了“并发更容易雪崩”。
坑 3:allOf 不会直接给你结果
这是初学者很容易误解的一点。
CompletableFuture<Void> all = CompletableFuture.allOf(f1, f2, f3);
allOf 返回的是 CompletableFuture<Void>,它只是告诉你:这些任务都完成了。
你还得自己去取每个任务的结果:
all.join();
A a = f1.join();
B b = f2.join();
C c = f3.join();
坑 4:异常被包装,看日志看不懂
join() 抛出来的常常是 CompletionException,真正的业务异常在 getCause() 里。
排查时注意:
- 打完整堆栈
- 展开 root cause
- 不要只打印
ex.getMessage()
更推荐这样记录:
.exceptionally(ex -> {
Throwable cause = ex instanceof CompletionException ? ex.getCause() : ex;
cause.printStackTrace();
return fallbackValue;
})
坑 5:超时了,但任务未必真的停了
这点很关键。
orTimeout / completeOnTimeout 影响的是 CompletableFuture 的完成状态,
不等于一定中断底层正在执行的任务。
也就是说:
- 调用方看到“超时了”
- 但底层线程里的远程调用可能还在继续跑
如果你特别关心资源回收,就要结合:
- 下游 HTTP 客户端超时
- 数据库查询超时
- 可中断任务设计
- 限流和熔断
不能只靠 CompletableFuture 一层。
安全/性能最佳实践
这一节给你一组更偏落地的建议。
1. 为异步编排单独配置线程池
不要偷懒用公共池。建议按业务隔离线程池,例如:
- 商品详情聚合池
- 推荐系统聚合池
- 报表导出池
这样做的好处:
- 故障隔离更强
- 容量规划更清晰
- 监控更容易做
示例配置思路:
ThreadPoolExecutor executor = new ThreadPoolExecutor(
corePoolSize,
maxPoolSize,
60,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(queueSize),
new NamedThreadFactory("biz-async-"),
new ThreadPoolExecutor.CallerRunsPolicy()
);
2. 区分强依赖与弱依赖
不是所有字段都值得“死等”。
可以大致这样分:
强依赖
- 商品基础信息
- 价格
- 库存
这类失败时,可能应该:
- 快速失败
- 返回明确错误
- 或者严格降级
弱依赖
- 推荐
- 营销标签
- 扩展信息
这类失败时,适合:
- 超时默认值
- 异常吞掉后记录日志
- 继续返回主结果
这一步其实比“会不会写 API”更重要,它决定了你的系统韧性。
3. 限制并发扇出数量
假设一个请求一口气并行发 20 个下游调用,请求量一大,很容易把整个系统放大成“流量倍增器”。
建议:
- 能合并的请求尽量合并
- 非必要字段延迟加载
- 给下游接口做并发上限控制
- 用缓存挡掉热点数据
4. 日志要带链路信息
异步任务最烦的一点就是:日志容易碎。
至少建议记录:
- 请求 ID
- 商品 ID / 用户 ID
- 子任务名称
- 开始时间、结束时间、耗时
- 异常类型、根因
- 是否走了降级
否则线上排查会非常痛苦。
5. 监控比代码更重要
如果系统已经在线上跑了,建议重点监控:
- 线程池活跃线程数
- 队列积压
- 任务拒绝次数
- 各子任务 RT / 超时率 / 异常率
- 降级触发比例
很多“异步写法看着很优雅”的问题,最后都要靠监控来救命。
6. 避免在回调里做重阻塞操作
比如:
future.thenApply(result -> {
// 这里又发起一个阻塞数据库查询
return slowQuery(result);
});
如果这样写,表面上是“异步回调”,实际上可能把线程池又堵住了。
更合理的方式:
- 纯计算转换用
thenApply - 新的异步 I/O 用
thenCompose + supplyAsync
一个更稳妥的编排模板
如果你想在项目里快速落地,我建议采用下面这种模式:
- 每个子任务独立超时
- 每个子任务独立异常兜底
- 最后统一
allOf - 聚合时只做对象组装,不做复杂逻辑
示例模板:
CompletableFuture<A> fa = CompletableFuture
.supplyAsync(() -> serviceA.query(id), executor)
.orTimeout(200, TimeUnit.MILLISECONDS)
.exceptionally(ex -> A.defaultValue());
CompletableFuture<B> fb = CompletableFuture
.supplyAsync(() -> serviceB.query(id), executor)
.completeOnTimeout(B.defaultValue(), 100, TimeUnit.MILLISECONDS)
.exceptionally(ex -> B.defaultValue());
CompletableFuture<Result> resultFuture = CompletableFuture
.allOf(fa, fb)
.thenApply(v -> new Result(fa.join(), fb.join()));
Result result = resultFuture.join();
这套结构的优点是:
- 代码可读性比较稳定
- 异常边界清晰
- 聚合逻辑简单
- 适合逐步扩展
适合用 CompletableFuture 的场景边界
最后再讲一下边界,避免“学会了锤子,看啥都像钉子”。
适合
- BFF/聚合层接口
- 多个微服务结果拼装
- 非阻塞要求没那么极致,但需要较好并发表现
- 对部分失败可容忍的业务场景
不太适合
- 超高并发、对线程利用率极端敏感的系统
- 大量事件流处理
- 需要结构化并发、取消传播非常严格的场景
- 复杂响应式数据流
这类场景可能更适合:
- Reactor / WebFlux
- Kotlin 协程
- 更完整的响应式方案
总结
如果你只记住三件事,我建议记这三条:
- 并行调用能显著降低聚合接口耗时
- 超时和异常要在子任务级别处理,而不是最后统一补锅
- 一定要自定义线程池,并做好监控和容量控制
CompletableFuture 真正的价值,不只是“异步”两个字,而是它让你可以把一个复杂的聚合流程拆成:
- 独立任务
- 明确依赖
- 可控超时
- 可观测失败
- 最终可聚合输出
这套思路一旦建立起来,你再看商品页、首页卡片流、用户画像拼装、订单确认页这些场景,都会清晰很多。
附:一张实战决策图
flowchart TD
A[需要聚合多个下游调用] --> B{任务是否相互独立?}
B -- 是 --> C[使用 supplyAsync 并行发起]
B -- 否 --> D[使用 thenCompose 串联依赖任务]
C --> E{是否允许部分失败?}
D --> E
E -- 允许 --> F[每个子任务设置 fallback]
E -- 不允许 --> G[关键任务异常快速失败]
F --> H{是否允许超时降级?}
G --> I[统一聚合并返回]
H -- 是 --> J[completeOnTimeout]
H -- 否 --> K[orTimeout + 异常处理]
J --> I
K --> I
希望这篇文章能帮你把 CompletableFuture 从“会用几个 API”,真正推进到“能写上线代码”。