跳转到内容
123xiao | 无名键客

《Java 中基于 CompletableFuture 的异步编排实战:从并行调用到超时控制与异常兜底》

字数: 0 阅读时长: 1 分钟

Java 中基于 CompletableFuture 的异步编排实战:从并行调用到超时控制与异常兜底

在日常后端开发里,我们经常会遇到这样一类接口:一个请求进来,要同时查多个下游服务,再把结果组装返回

比如商品详情页:

  • 查商品基础信息
  • 查库存
  • 查价格
  • 查营销标签
  • 查推荐列表

如果这些调用都串行执行,接口耗时大概率是各个下游耗时的总和;一旦其中某个服务偶尔抖一下,整体 RT 就会被拖垮。这个时候,CompletableFuture 就很适合拿来做异步编排

但很多同学第一次上手时,往往停留在 supplyAsync()allOf() 这两个 API 上。真正上线后,问题才开始出现:

  • 默认线程池被打满
  • 某个任务超时导致整体一直挂着
  • 某个分支抛异常,整个组合 future 直接失败
  • 日志里只看到 CompletionException,定位不到源头
  • 任务太多,结果吞吐反而下降

这篇文章我不打算只讲 API,而是带你从一个实际场景走一遍:并行调用、结果汇聚、超时控制、异常兜底、线程池配置以及排查思路。你可以直接拿去改成项目里的模板。


背景与问题

先看一个最典型的业务场景:聚合接口。

假设我们有一个“用户主页”接口,需要返回:

  • 用户基本信息
  • 账户余额
  • 优惠券数量

如果串行写,大概是这样:

UserProfile profile = userService.getProfile(userId);
AccountBalance balance = accountService.getBalance(userId);
CouponInfo couponInfo = couponService.getCouponInfo(userId);
return new UserHomeVO(profile, balance, couponInfo);

问题很明显:

  1. 整体耗时高:三个调用是串行的。
  2. 弱依赖也阻塞主流程:比如优惠券信息不是强依赖,却被绑死在主链路上。
  3. 失败影响面大:其中一个接口异常,整个请求失败。
  4. 超时不可控:下游卡住时,主线程只能干等。

更合理的目标应该是:

  • 能并行的尽量并行
  • 核心数据失败时明确报错,非核心数据失败时优雅降级
  • 每个异步分支有超时
  • 整体线程池可控,不滥用公共线程池
  • 异常能被看懂、日志能追踪

前置知识与环境准备

本文示例基于:

  • JDK 9+(因为会用到 orTimeout / completeOnTimeout
  • 任意 IDE
  • 一个普通的 Maven 或 Gradle Java 项目即可

如果你还没系统接触过 CompletableFuture,先记住三类最常用操作:

  1. 创建异步任务

    • runAsync
    • supplyAsync
  2. 结果转换与组合

    • thenApply
    • thenCompose
    • thenCombine
    • allOf
  3. 异常与结束处理

    • exceptionally
    • handle
    • whenComplete

核心原理

1. CompletableFuture 到底解决了什么问题

CompletableFuture 本质上是:

  • 一个可以代表“未来结果”的对象
  • 一套可以把多个异步任务串起来、合起来、兜住异常的编排机制

你可以把它理解成“异步版的流水线”:

  • 上一步完成后,进入下一步
  • 多个分支并行跑
  • 最后合并结果
  • 某一步失败时,有机会做降级或兜底

2. 常见编排模式

串行依赖:thenCompose

当第二个任务依赖第一个任务的结果时,用 thenCompose

例如:

  • 先查用户
  • 再根据用户等级查权益

并行聚合:thenCombine / allOf

当多个任务彼此独立时,用并行方式更合适。

  • thenCombine:适合两个任务合并
  • allOf:适合多个任务统一等待

异常处理:exceptionally / handle / whenComplete

这三个经常被混淆,我建议这样记:

  • exceptionally:只在异常时给默认值
  • handle:无论成功失败都能拿到结果并转换
  • whenComplete:更像“回调通知”,适合打日志,不适合改结果

一图看懂异步编排主流程

flowchart TD
    A[收到用户主页请求] --> B[并行查询用户信息]
    A --> C[并行查询账户余额]
    A --> D[并行查询优惠券数量]
    B --> E[设置超时与异常兜底]
    C --> E
    D --> E
    E --> F[allOf 等待完成]
    F --> G[汇总结果]
    G --> H[返回响应]

CompletableFuture 常用关系图

classDiagram
    class CompletableFuture {
        +supplyAsync()
        +runAsync()
        +thenApply()
        +thenCompose()
        +thenCombine()
        +allOf()
        +orTimeout()
        +completeOnTimeout()
        +exceptionally()
        +handle()
        +whenComplete()
    }

实战代码(可运行)

下面我写一个完整可运行的示例,模拟用户主页接口的异步编排。

这个例子里我们会演示:

  • 自定义线程池
  • 并行调用
  • 单个任务超时控制
  • 非核心依赖异常兜底
  • 最终结果组装

完整示例

import java.math.BigDecimal;
import java.time.LocalDateTime;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

public class CompletableFutureOrchestrationDemo {

    // ======== 模拟返回对象 ========
    static class UserProfile {
        Long userId;
        String nickname;

        UserProfile(Long userId, String nickname) {
            this.userId = userId;
            this.nickname = nickname;
        }

        @Override
        public String toString() {
            return "UserProfile{userId=" + userId + ", nickname='" + nickname + "'}";
        }
    }

    static class AccountBalance {
        BigDecimal amount;

        AccountBalance(BigDecimal amount) {
            this.amount = amount;
        }

        @Override
        public String toString() {
            return "AccountBalance{amount=" + amount + "}";
        }
    }

    static class CouponInfo {
        int count;

        CouponInfo(int count) {
            this.count = count;
        }

        @Override
        public String toString() {
            return "CouponInfo{count=" + count + "}";
        }
    }

    static class UserHomeVO {
        UserProfile profile;
        AccountBalance balance;
        CouponInfo couponInfo;

        UserHomeVO(UserProfile profile, AccountBalance balance, CouponInfo couponInfo) {
            this.profile = profile;
            this.balance = balance;
            this.couponInfo = couponInfo;
        }

        @Override
        public String toString() {
            return "UserHomeVO{" +
                    "profile=" + profile +
                    ", balance=" + balance +
                    ", couponInfo=" + couponInfo +
                    '}';
        }
    }

    // ======== 自定义线程池 ========
    private static final ExecutorService BIZ_EXECUTOR = new ThreadPoolExecutor(
            4,
            8,
            60,
            TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(100),
            new ThreadFactory() {
                private final AtomicInteger idx = new AtomicInteger(1);

                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, "cf-biz-" + idx.getAndIncrement());
                }
            },
            new ThreadPoolExecutor.CallerRunsPolicy()
    );

    public static void main(String[] args) {
        Long userId = 1001L;
        UserHomeVO result = getUserHome(userId);
        System.out.println(LocalDateTime.now() + " result = " + result);
        BIZ_EXECUTOR.shutdown();
    }

    public static UserHomeVO getUserHome(Long userId) {
        long start = System.currentTimeMillis();

        CompletableFuture<UserProfile> profileFuture =
                CompletableFuture.supplyAsync(() -> getUserProfile(userId), BIZ_EXECUTOR)
                        .orTimeout(800, TimeUnit.MILLISECONDS)
                        .whenComplete((res, ex) -> log("profile", res, ex));

        CompletableFuture<AccountBalance> balanceFuture =
                CompletableFuture.supplyAsync(() -> getAccountBalance(userId), BIZ_EXECUTOR)
                        .orTimeout(500, TimeUnit.MILLISECONDS)
                        .whenComplete((res, ex) -> log("balance", res, ex));

        CompletableFuture<CouponInfo> couponFuture =
                CompletableFuture.supplyAsync(() -> getCouponInfo(userId), BIZ_EXECUTOR)
                        .completeOnTimeout(new CouponInfo(0), 300, TimeUnit.MILLISECONDS)
                        .exceptionally(ex -> {
                            System.out.println("coupon fallback because of: " + ex.getMessage());
                            return new CouponInfo(0);
                        })
                        .whenComplete((res, ex) -> log("coupon", res, ex));

        try {
            CompletableFuture.allOf(profileFuture, balanceFuture, couponFuture).join();

            UserHomeVO vo = new UserHomeVO(
                    profileFuture.join(),
                    balanceFuture.join(),
                    couponFuture.join()
            );

            System.out.println("total cost = " + (System.currentTimeMillis() - start) + " ms");
            return vo;
        } catch (CompletionException e) {
            Throwable root = unwrapCompletionException(e);
            throw new RuntimeException("getUserHome failed: " + root.getMessage(), root);
        }
    }

    // ======== 模拟下游调用 ========

    private static UserProfile getUserProfile(Long userId) {
        sleep(200);
        return new UserProfile(userId, "Alice");
    }

    private static AccountBalance getAccountBalance(Long userId) {
        sleep(350);
        return new AccountBalance(new BigDecimal("1024.88"));
    }

    private static CouponInfo getCouponInfo(Long userId) {
        sleep(600); // 故意慢一点,触发超时默认值
        return new CouponInfo(5);
    }

    // ======== 工具方法 ========

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("thread interrupted", e);
        }
    }

    private static void log(String taskName, Object res, Throwable ex) {
        String threadName = Thread.currentThread().getName();
        if (ex != null) {
            System.out.println("[" + threadName + "] " + taskName + " failed: " + ex.getMessage());
        } else {
            System.out.println("[" + threadName + "] " + taskName + " success: " + res);
        }
    }

    private static Throwable unwrapCompletionException(Throwable e) {
        if (e instanceof CompletionException || e instanceof ExecutionException) {
            if (e.getCause() != null) {
                return unwrapCompletionException(e.getCause());
            }
        }
        return e;
    }
}

运行后你会看到什么

这个例子里:

  • profile 200ms 成功
  • balance 350ms 成功
  • coupon 600ms,超过 300ms,直接返回默认值 CouponInfo(0)

所以整体耗时大约接近最慢的核心任务,而不是三者之和。


逐步拆解这段代码

1. 为什么一定要自定义线程池

很多示例会这样写:

CompletableFuture.supplyAsync(() -> querySomething());

这会默认使用 ForkJoinPool.commonPool()。在 demo 里没问题,在线上我通常不建议这么干,原因有两个:

  1. 业务线程和公共线程池混用,不可控
  2. 遇到阻塞型 IO 时,ForkJoinPool 并不理想

像远程 RPC、数据库、HTTP 调用,大多数都是阻塞型任务。更稳妥的方式是:

  • 给业务隔离线程池
  • 根据接口吞吐、平均 RT、峰值流量配置核心线程数和队列

2. orTimeout 与 completeOnTimeout 的区别

这是实战里很关键的一对 API。

orTimeout

超时后让 Future 进入异常状态。

future.orTimeout(500, TimeUnit.MILLISECONDS)

适合场景:

  • 这是核心依赖
  • 超时应该明确失败
  • 后续要统一走异常链路

completeOnTimeout

超时后返回默认值。

future.completeOnTimeout(defaultValue, 300, TimeUnit.MILLISECONDS)

适合场景:

  • 非核心依赖
  • 允许降级
  • 页面上显示默认内容即可

我自己的经验是:

  • 强依赖orTimeout
  • 弱依赖completeOnTimeout + exceptionally

这套组合比较实用。


两种编排方式对比

sequenceDiagram
    participant Client
    participant API
    participant ProfileSvc
    participant BalanceSvc
    participant CouponSvc

    Client->>API: 请求用户主页
    API->>ProfileSvc: 异步查询用户信息
    API->>BalanceSvc: 异步查询账户余额
    API->>CouponSvc: 异步查询优惠券
    ProfileSvc-->>API: 返回成功
    BalanceSvc-->>API: 返回成功
    CouponSvc-->>API: 超时/异常
    API-->>API: 使用默认 CouponInfo(0)
    API-->>Client: 返回聚合结果

进阶:thenCompose 和 thenCombine 怎么用

很多人会把所有场景都写成 allOf(),其实没必要。

场景一:后一个任务依赖前一个任务结果

比如先查用户,再按用户等级查权益:

CompletableFuture<String> future = CompletableFuture
        .supplyAsync(() -> "VIP", BIZ_EXECUTOR)
        .thenCompose(level ->
                CompletableFuture.supplyAsync(() -> "权益包-" + level, BIZ_EXECUTOR)
        );

System.out.println(future.join());

这里应该用 thenCompose,因为第二步依赖第一步结果。

场景二:两个独立任务并行,最后合并

CompletableFuture<String> nameFuture =
        CompletableFuture.supplyAsync(() -> "Alice", BIZ_EXECUTOR);

CompletableFuture<Integer> scoreFuture =
        CompletableFuture.supplyAsync(() -> 95, BIZ_EXECUTOR);

CompletableFuture<String> resultFuture =
        nameFuture.thenCombine(scoreFuture, (name, score) -> name + ":" + score);

System.out.println(resultFuture.join());

这里适合 thenCombine

场景三:多个独立任务统一等待

CompletableFuture<Void> all = CompletableFuture.allOf(f1, f2, f3);
all.join();

适合聚合多个互不依赖的调用。


常见坑与排查

这一节很重要。我自己最早用 CompletableFuture 时,坑基本都踩在这里。

坑 1:allOf 不会自动帮你收集结果

很多人以为:

CompletableFuture.allOf(f1, f2, f3)

会直接返回三个结果。其实不会,它返回的是 CompletableFuture<Void>

你仍然需要手动取值:

CompletableFuture.allOf(f1, f2, f3).join();
Object r1 = f1.join();
Object r2 = f2.join();
Object r3 = f3.join();

坑 2:join 和 get 的异常包装不同

  • get() 抛受检异常:ExecutionException, InterruptedException
  • join() 抛非受检异常:CompletionException

线上排查时,经常看到一层层 CompletionException,真正原因藏在 cause 里。

建议统一做异常解包:

private static Throwable unwrap(Throwable e) {
    while ((e instanceof CompletionException || e instanceof ExecutionException) && e.getCause() != null) {
        e = e.getCause();
    }
    return e;
}

坑 3:用了 exceptionally 以后,异常被“吃掉”了

比如:

future.exceptionally(ex -> defaultValue);

这样做没错,但要知道:异常链路到这里就变成正常值了。如果后面还想知道它失败过,就必须在这里打日志或打指标。

一个更稳一点的写法是:

future.exceptionally(ex -> {
    log.error("task failed", ex);
    return defaultValue;
});

坑 4:异步里再做阻塞,线程池很快耗尽

比如你在 supplyAsync 里又调用了慢 SQL、慢 HTTP、文件 IO,这些都是阻塞操作。线程池如果设置过小,很快就会排队;如果设置过大,机器上下文切换压力又会上来。

排查思路:

  1. 看线程池活跃线程数
  2. 看队列长度
  3. 看任务平均执行时间
  4. 看超时是否集中在某个下游

坑 5:超时不等于真正取消下游执行

这一点特别容易误解。

future.orTimeout(300, TimeUnit.MILLISECONDS)

意思是:CompletableFuture 在 300ms 后超时失败
但如果你的下游任务已经在线程里跑起来了,它不一定真的停止。

也就是说:

  • 上层已经返回超时
  • 下游线程可能还在继续跑

所以如果你对资源消耗敏感,还要配合:

  • 下游 HTTP 客户端自身超时
  • RPC 超时
  • 数据库查询超时
  • 可中断任务的取消机制

排查一个真实风格的问题:为什么接口已经降级了,机器 CPU 还是高

这个现象很常见。

表面上看,你已经做了:

  • completeOnTimeout
  • exceptionally
  • 默认值兜底

按理说接口应该“很稳”了,但 CPU 还是高、线程池还是满。

原因通常是:

  1. 应用层 future 超时了,但下游任务没有真正停
  2. 大量超时任务还在后台继续执行
  3. 新流量继续进来,形成堆积

所以要分清两层超时:

  • 编排层超时CompletableFuture 的超时
  • 执行层超时:HTTP/RPC/DB 客户端的超时

两层都要配,才是真正可控。


安全/性能最佳实践

这里我给一套比较务实的建议,不追求“最炫”,追求“能上线”。

1. 业务线程池一定要隔离

不要把所有业务都扔进一个公共池子里。建议至少按调用类型拆分:

  • 聚合查询线程池
  • 导出任务线程池
  • 消息消费线程池

这样即使某一类任务堆积,也不会把其他核心功能拖死。

2. 给每个下游设置合理超时,不要只设总超时

错误示范:

  • 总接口超时 2 秒
  • 下游每个任务都不限时

结果就是总超时虽然兜住了,但线程资源已经被长时间占满。

更合理的是:

  • 用户信息:800ms
  • 余额:500ms
  • 优惠券:300ms,超时直接默认值

3. 强弱依赖要分层设计

不是所有字段都值得“死等”。

可以这样分:

  • 强依赖:失败直接报错,例如订单创建核心库存校验
  • 弱依赖:失败降级,例如推荐、标签、角标、统计信息

别把所有任务都按同一种策略处理。

4. 日志里必须带任务名、线程名、耗时、请求标识

异步场景下最怕日志串不起来。至少要有:

  • 请求 ID
  • 任务名
  • 开始/结束时间
  • 线程名
  • 是否超时/异常

如果项目里用了 MDC,要注意线程切换后上下文可能丢失,必要时需要手动透传。

5. 避免在回调链中写太复杂的业务逻辑

链式调用一多,很容易写成“异步意大利面”:

f1.thenApply(...)
  .thenCompose(...)
  .thenCombine(...)
  .handle(...)
  .thenApply(...)

如果一屏代码看不懂,后面维护的人一定会骂你,包括未来的你自己。

建议做法:

  • 一个 future 只负责一个清晰职责
  • 中间结果命名清楚
  • 聚合逻辑单独抽方法

6. 监控比语法更重要

在生产环境里,真正决定你能不能睡安稳的不是 thenApply 还是 handle,而是这些指标有没有:

  • 线程池活跃线程数
  • 队列积压长度
  • 任务平均耗时
  • 超时次数
  • 异常次数
  • 降级次数

如果没有这些监控,出了问题你会非常被动。


一个推荐的实战模板

如果你想在项目里快速落地,我建议按下面这个套路写:

CompletableFuture<ResultA> aFuture = CompletableFuture
        .supplyAsync(() -> callA(), executor)
        .orTimeout(300, TimeUnit.MILLISECONDS)
        .whenComplete((r, e) -> logResult("A", r, e));

CompletableFuture<ResultB> bFuture = CompletableFuture
        .supplyAsync(() -> callB(), executor)
        .completeOnTimeout(defaultB(), 200, TimeUnit.MILLISECONDS)
        .exceptionally(ex -> {
            logError("B", ex);
            return defaultB();
        });

CompletableFuture.allOf(aFuture, bFuture).join();

return merge(aFuture.join(), bFuture.join());

使用规则:

  • 核心依赖:orTimeout
  • 非核心依赖:completeOnTimeout + exceptionally
  • 全部任务:whenComplete 打日志
  • 最外层:统一解包异常

这套模板足够覆盖大多数聚合类接口。


逐步验证清单

你可以按这个顺序自己动手验证一遍。

第一步:先并行化

先把串行调用改成多个 supplyAsync,观察总耗时是否接近“最长那个任务”。

第二步:给弱依赖加默认值

对推荐、标签、统计信息这类弱依赖,加 completeOnTimeoutexceptionally

第三步:给强依赖加失败策略

对库存、价格、账户余额这类核心依赖,用 orTimeout,并在最外层统一失败。

第四步:换掉默认线程池

接入业务专属线程池,压测看活跃线程数和排队情况。

第五步:补齐日志与监控

如果你不能回答下面这些问题,说明还没真正上线准备好:

  • 哪个分支最慢?
  • 哪个分支最常超时?
  • 降级比例是多少?
  • 队列有没有堆积?
  • 是否存在超时后任务继续执行的问题?

总结

CompletableFuture 最有价值的地方,不只是“能异步”,而是它能帮我们把一个复杂请求拆成:

  • 可并行的分支
  • 可组合的流程
  • 可控制的超时
  • 可兜底的异常策略

如果你只记住本文三点,我建议记这三条:

  1. 别用默认线程池硬上生产
  2. 强依赖用 orTimeout,弱依赖用 completeOnTimeout + exceptionally
  3. 超时控制要区分编排层和下游执行层

最后给一个边界条件提醒:
CompletableFuture 很适合中等复杂度的异步编排。如果你的流程已经涉及大量动态依赖、复杂重试、跨服务补偿、状态持久化,那可能就不是它最擅长的场景了,应该考虑工作流引擎或消息驱动架构。

如果你现在手头正好有一个“聚合查询接口慢、偶发超时、部分字段允许降级”的需求,这篇文章里的示例基本就可以直接套进去。先把线程池、超时、异常兜底三件事做扎实,稳定性会立刻上一个台阶。


分享到:

上一篇
《从单体到高可用:基于 Kubernetes 的中小规模集群架构设计与故障切换实战》
下一篇
《从原型到上线:中级开发者如何构建可落地的 RAG 智能问答系统》