跳转到内容
123xiao | 无名键客

《Java 中基于 CompletableFuture 的异步编排实战:提升接口聚合性能与可维护性》

字数: 0 阅读时长: 1 分钟

Java 中基于 CompletableFuture 的异步编排实战:提升接口聚合性能与可维护性

在很多业务系统里,一个“详情页接口”往往不是查一张表那么简单。它可能要同时拿用户信息、订单摘要、优惠信息、库存状态、推荐列表,最后再拼成一个响应对象返回前端。

一开始我们通常会写成串行调用:查完 A,再查 B,再查 C。代码直观,但响应时间也会被一层层叠加。等某天接口从 120ms 涨到 800ms,排查后才发现,问题不一定出在某个单点服务慢,而是编排方式本身就不合理

这篇文章我想从“接口聚合”这个很常见的场景出发,带你用 CompletableFuture 做一次真正可落地的异步编排。重点不是 API 罗列,而是:

  • 什么场景值得异步化
  • 怎么把并行、依赖、兜底、超时、异常处理组织清楚
  • 如何避免线程池被打爆、日志难追、异常悄悄吞掉

如果你已经会 Future、线程池,或者写过一点 CompletableFuture,这篇会比较适合你。


背景与问题

先看一个典型的接口聚合场景:GET /user/profile/{userId}

它需要聚合 4 类信息:

  1. 用户基本信息
  2. 用户积分
  3. 最近订单
  4. 个性化推荐

假设每个下游服务平均耗时如下:

  • 用户服务:80ms
  • 积分服务:120ms
  • 订单服务:150ms
  • 推荐服务:200ms

如果串行执行,总耗时大概就是:

80 + 120 + 150 + 200 = 550ms

但实际上这四个请求里,很多是相互独立的。理论上,完全可以并行发起,总耗时接近:

max(80, 120, 150, 200) = 200ms 左右

这就是异步编排最直接的价值:把“累加等待”变成“最长等待”

不过现实问题没这么简单。接口聚合常常还会遇到这些情况:

  • 某些任务依赖前序结果,比如必须先拿到用户信息再查优惠等级
  • 某些结果失败了也要返回“部分可用”的页面
  • 不同下游服务的超时时间、重要性不同
  • 聚合逻辑越来越长,thenApply/thenCompose/handle 一层套一层,最后可维护性变差

所以真正要解决的,不只是“并发调用”,而是可控的异步编排


前置知识与环境准备

本文示例基于:

  • JDK 8+
  • CompletableFuture
  • ExecutorService

如果你在 JDK 9+,还可以直接用:

  • orTimeout
  • completeOnTimeout

本文为了实战感,代码会直接模拟远程调用耗时,并给出可运行示例。


核心原理

CompletableFuture 可以理解为两个能力的组合:

  1. Future:代表一个“未来会完成”的结果
  2. CompletionStage:支持把多个异步阶段串起来

它比传统 Future 强的地方主要在于:

  • 能链式编排
  • 能组合多个异步任务
  • 能统一处理异常
  • 能在任务完成后继续流转

常见编排方式速览

  • supplyAsync():异步执行并返回结果
  • runAsync():异步执行但不返回结果
  • thenApply():拿上一步结果做同步转换
  • thenCompose():把“异步套异步”拉平
  • thenCombine():合并两个独立异步结果
  • allOf():等待多个任务都完成
  • anyOf():任意一个完成即可继续
  • exceptionally() / handle():异常兜底
  • whenComplete():收尾记录,不改变结果

一个接口聚合的编排视图

下面这张图先帮助你建立整体认知。

flowchart TD
    A[收到用户详情请求] --> B[并行发起用户信息/积分/订单/推荐]
    B --> C1[用户信息]
    B --> C2[积分]
    B --> C3[订单]
    B --> C4[推荐]
    C1 --> D[等待 allOf 完成]
    C2 --> D
    C3 --> D
    C4 --> D
    D --> E[组装 ProfileResponse]
    E --> F[返回结果]

如果有依赖关系,结构会更像下面这样:

flowchart LR
    A[查询用户基本信息] --> B[根据用户等级查询优惠信息]
    A --> C[并行查询订单]
    A --> D[并行查询推荐]
    B --> E[汇总结果]
    C --> E
    D --> E

核心原理:并行、依赖、汇总、兜底

1. 并行任务:适合无依赖接口

如果几个下游服务互相独立,就应该优先并行。

CompletableFuture<UserInfo> userFuture = CompletableFuture.supplyAsync(() -> userService.getUserInfo(userId), executor);
CompletableFuture<PointsInfo> pointsFuture = CompletableFuture.supplyAsync(() -> pointsService.getPoints(userId), executor);

2. 依赖任务:用 thenCompose,而不是嵌套 Future

比如先查用户,再根据用户等级查权益:

CompletableFuture<BenefitInfo> benefitFuture = CompletableFuture
        .supplyAsync(() -> userService.getUserInfo(userId), executor)
        .thenCompose(user -> CompletableFuture.supplyAsync(
                () -> benefitService.getBenefits(user.getLevel()), executor
        ));

这里用 thenCompose 是关键。
如果你用了 thenApply 返回 CompletableFuture<BenefitInfo>,最后会得到 CompletableFuture<CompletableFuture<BenefitInfo>>,阅读和处理都很别扭。

3. 汇总任务:用 allOf 等待,再逐个 join

CompletableFuture<Void> all = CompletableFuture.allOf(userFuture, pointsFuture, orderFuture, recommendFuture);
all.join();

allOf 自身不会直接帮你“组装结果”,它只负责等待全部完成。最终结果要你自己从各个 future 里取:

UserInfo user = userFuture.join();
PointsInfo points = pointsFuture.join();

4. 失败兜底:不是所有失败都该直接炸接口

例如推荐服务失败,不应该影响整个详情页:

CompletableFuture<List<String>> recommendFuture = CompletableFuture
        .supplyAsync(() -> recommendService.getRecommendations(userId), executor)
        .exceptionally(ex -> {
            System.err.println("推荐服务失败: " + ex.getMessage());
            return Collections.emptyList();
        });

这类处理在聚合接口里非常常见:核心数据失败就报错,非核心数据失败就降级


实战代码(可运行)

下面我们实现一个完整的“用户主页聚合接口”示例。

功能目标:

  • 并行调用多个服务
  • 处理一个依赖任务
  • 对非核心服务做异常降级
  • 最终组装响应对象

这段代码可以直接放到一个 Demo.java 里运行。

import java.util.*;
import java.util.concurrent.*;

public class Demo {

    private static final ExecutorService EXECUTOR = new ThreadPoolExecutor(
            8,
            16,
            60L,
            TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(100),
            new ThreadFactory() {
                private int count = 1;
                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, "cf-pool-" + count++);
                }
            },
            new ThreadPoolExecutor.CallerRunsPolicy()
    );

    public static void main(String[] args) {
        ProfileFacade facade = new ProfileFacade(
                new UserService(),
                new PointsService(),
                new OrderService(),
                new RecommendService(),
                new BenefitService(),
                EXECUTOR
        );

        long start = System.currentTimeMillis();
        ProfileResponse response = facade.getProfile(1001L);
        long cost = System.currentTimeMillis() - start;

        System.out.println("响应结果:");
        System.out.println(response);
        System.out.println("总耗时: " + cost + " ms");

        EXECUTOR.shutdown();
    }

    static class ProfileFacade {
        private final UserService userService;
        private final PointsService pointsService;
        private final OrderService orderService;
        private final RecommendService recommendService;
        private final BenefitService benefitService;
        private final Executor executor;

        public ProfileFacade(UserService userService,
                             PointsService pointsService,
                             OrderService orderService,
                             RecommendService recommendService,
                             BenefitService benefitService,
                             Executor executor) {
            this.userService = userService;
            this.pointsService = pointsService;
            this.orderService = orderService;
            this.recommendService = recommendService;
            this.benefitService = benefitService;
            this.executor = executor;
        }

        public ProfileResponse getProfile(Long userId) {
            CompletableFuture<UserInfo> userFuture = CompletableFuture
                    .supplyAsync(() -> userService.getUserInfo(userId), executor);

            CompletableFuture<PointsInfo> pointsFuture = CompletableFuture
                    .supplyAsync(() -> pointsService.getPoints(userId), executor);

            CompletableFuture<List<OrderInfo>> orderFuture = CompletableFuture
                    .supplyAsync(() -> orderService.getRecentOrders(userId), executor)
                    .exceptionally(ex -> {
                        System.err.println("[降级] 订单服务失败: " + ex.getMessage());
                        return Collections.emptyList();
                    });

            CompletableFuture<List<String>> recommendFuture = CompletableFuture
                    .supplyAsync(() -> recommendService.getRecommendations(userId), executor)
                    .exceptionally(ex -> {
                        System.err.println("[降级] 推荐服务失败: " + ex.getMessage());
                        return Collections.emptyList();
                    });

            // 依赖任务:先拿用户等级,再查权益
            CompletableFuture<BenefitInfo> benefitFuture = userFuture.thenCompose(user ->
                    CompletableFuture.supplyAsync(() -> benefitService.getBenefits(user.level), executor)
            ).exceptionally(ex -> {
                System.err.println("[降级] 权益服务失败: " + ex.getMessage());
                return new BenefitInfo("DEFAULT", "基础权益");
            });

            CompletableFuture<Void> all = CompletableFuture.allOf(
                    userFuture, pointsFuture, orderFuture, recommendFuture, benefitFuture
            );

            all.join();

            return new ProfileResponse(
                    userFuture.join(),
                    pointsFuture.join(),
                    orderFuture.join(),
                    recommendFuture.join(),
                    benefitFuture.join()
            );
        }
    }

    static class UserService {
        public UserInfo getUserInfo(Long userId) {
            sleep(80);
            log("查询用户信息");
            return new UserInfo(userId, "Alice", "VIP");
        }
    }

    static class PointsService {
        public PointsInfo getPoints(Long userId) {
            sleep(120);
            log("查询积分");
            return new PointsInfo(5200);
        }
    }

    static class OrderService {
        public List<OrderInfo> getRecentOrders(Long userId) {
            sleep(150);
            log("查询最近订单");
            return Arrays.asList(
                    new OrderInfo("ORD-10001", 199.0),
                    new OrderInfo("ORD-10002", 299.0)
            );
        }
    }

    static class RecommendService {
        public List<String> getRecommendations(Long userId) {
            sleep(200);
            log("查询推荐内容");
            // 你可以取消注释模拟失败
            // throw new RuntimeException("recommend timeout");
            return Arrays.asList("机械键盘", "降噪耳机", "显示器支架");
        }
    }

    static class BenefitService {
        public BenefitInfo getBenefits(String level) {
            sleep(100);
            log("查询用户权益, level=" + level);
            if ("VIP".equals(level)) {
                return new BenefitInfo("VIP_PACK", "免邮 + 专属优惠券");
            }
            return new BenefitInfo("NORMAL_PACK", "普通权益");
        }
    }

    static class UserInfo {
        Long userId;
        String name;
        String level;

        public UserInfo(Long userId, String name, String level) {
            this.userId = userId;
            this.name = name;
            this.level = level;
        }

        @Override
        public String toString() {
            return "UserInfo{userId=" + userId + ", name='" + name + "', level='" + level + "'}";
        }
    }

    static class PointsInfo {
        int points;

        public PointsInfo(int points) {
            this.points = points;
        }

        @Override
        public String toString() {
            return "PointsInfo{points=" + points + "}";
        }
    }

    static class OrderInfo {
        String orderId;
        double amount;

        public OrderInfo(String orderId, double amount) {
            this.orderId = orderId;
            this.amount = amount;
        }

        @Override
        public String toString() {
            return "OrderInfo{orderId='" + orderId + "', amount=" + amount + "}";
        }
    }

    static class BenefitInfo {
        String code;
        String desc;

        public BenefitInfo(String code, String desc) {
            this.code = code;
            this.desc = desc;
        }

        @Override
        public String toString() {
            return "BenefitInfo{code='" + code + "', desc='" + desc + "'}";
        }
    }

    static class ProfileResponse {
        UserInfo userInfo;
        PointsInfo pointsInfo;
        List<OrderInfo> orders;
        List<String> recommendations;
        BenefitInfo benefitInfo;

        public ProfileResponse(UserInfo userInfo,
                               PointsInfo pointsInfo,
                               List<OrderInfo> orders,
                               List<String> recommendations,
                               BenefitInfo benefitInfo) {
            this.userInfo = userInfo;
            this.pointsInfo = pointsInfo;
            this.orders = orders;
            this.recommendations = recommendations;
            this.benefitInfo = benefitInfo;
        }

        @Override
        public String toString() {
            return "ProfileResponse{" +
                    "userInfo=" + userInfo +
                    ", pointsInfo=" + pointsInfo +
                    ", orders=" + orders +
                    ", recommendations=" + recommendations +
                    ", benefitInfo=" + benefitInfo +
                    '}';
        }
    }

    static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("thread interrupted", e);
        }
    }

    static void log(String msg) {
        System.out.println("[" + Thread.currentThread().getName() + "] " + msg);
    }
}

代码拆解:这段实现为什么比较稳

1. 用户、积分、订单、推荐是并行的

因为它们都只依赖 userId,没有先后关系,所以用了多个 supplyAsync() 同时发起。

2. 权益查询依赖用户等级

这个场景天然适合 thenCompose()
逻辑表达很直接:用户信息完成后,再异步查权益

3. 非核心链路做了降级

订单、推荐、权益失败时,都提供了默认值,而不是把整个接口直接打挂。

这在真实业务里非常实用。比如推荐位没有内容,页面通常还能看;但用户基本信息查不到,那就该返回错误。

4. 统一等待,再汇总结果

allOf(...).join() 是等待点。
等待结束后,用各自的 join() 组装响应对象。


时序图:请求在系统里的流转

sequenceDiagram
    participant Client
    participant Facade as ProfileFacade
    participant UserSvc as UserService
    participant PointsSvc as PointsService
    participant OrderSvc as OrderService
    participant RecSvc as RecommendService
    participant BenefitSvc as BenefitService

    Client->>Facade: getProfile(userId)
    Facade->>UserSvc: 异步查询用户信息
    Facade->>PointsSvc: 异步查询积分
    Facade->>OrderSvc: 异步查询订单
    Facade->>RecSvc: 异步查询推荐
    UserSvc-->>Facade: UserInfo
    Facade->>BenefitSvc: 基于用户等级查询权益
    PointsSvc-->>Facade: PointsInfo
    OrderSvc-->>Facade: OrderList
    RecSvc-->>Facade: RecommendList
    BenefitSvc-->>Facade: BenefitInfo
    Facade-->>Client: ProfileResponse

逐步验证清单

如果你准备把这套模式落到生产代码,我建议按这个顺序验证,而不是一上来就全量改造:

第一步:先找“天然可并行”的接口

适合异步改造的接口,通常有这些特征:

  • 存在多个远程 I/O 调用
  • 多个调用之间大多无依赖
  • 下游耗时占大头,CPU 计算不是瓶颈
  • 允许部分数据降级

第二步:先做 2~3 个任务并行,不要一口气铺太大

先对收益最明显的几个下游并发化,看看:

  • 接口 TP99 是否下降
  • 线程池是否平稳
  • 下游服务是否承压异常

第三步:补齐超时、异常、日志

这一步特别关键。
很多异步代码“功能可跑”,但一出问题完全不可观测。

第四步:最后再做抽象封装

不要太早封装一个“万能异步聚合框架”。
我自己的经验是,先让业务链路跑顺,再提炼模式,比一开始追求优雅更靠谱。


常见坑与排查

这部分是实战里最容易踩的地方,我尽量讲得贴近现场一点。

坑 1:默认线程池用错,导致互相抢资源

如果你这样写:

CompletableFuture.supplyAsync(() -> remoteCall());

没有传自定义 executor,默认会用 ForkJoinPool.commonPool()
它更适合 CPU 密集型任务,不太适合大量阻塞型 I/O 调用。

在接口聚合场景里,远程调用、数据库访问、RPC 请求大多是阻塞型的。
如果你直接用默认线程池,很可能出现:

  • 线程数不够
  • 任务堆积
  • 整体吞吐下降
  • 与系统里其他使用 commonPool 的任务相互影响

建议:聚合类异步任务一定配专用线程池。


坑 2:thenApply 和 thenCompose 用反了

很多人刚上手时容易写成这样:

CompletableFuture<CompletableFuture<BenefitInfo>> future =
        userFuture.thenApply(user ->
                CompletableFuture.supplyAsync(() -> benefitService.getBenefits(user.level), executor)
        );

结果套了一层 future,后续处理非常别扭。

如果回调里返回的还是一个异步任务,请优先用:

thenCompose(...)

这点看似小,但能明显提升代码可读性。


坑 3:异常被吞掉,日志里什么都没有

比如下面这段:

CompletableFuture<UserInfo> future = CompletableFuture.supplyAsync(() -> {
    throw new RuntimeException("boom");
});

如果你既没 join/get,也没 exceptionally/handle/whenComplete,这个异常很可能静悄悄地挂着,直到某个时刻才暴露。

排查建议:

  • 所有聚合链路最终一定要有等待点
  • 关键 future 必须加异常日志
  • 降级时要明确记录是哪个下游失败

例如:

future.whenComplete((res, ex) -> {
    if (ex != null) {
        System.err.println("任务失败: " + ex.getMessage());
    }
});

坑 4:allOf 完成了,不代表你能直接拿结果对象

CompletableFuture.allOf() 返回的是 CompletableFuture<Void>
它只是告诉你“都完成了”,不负责帮你把结果组成一个列表或对象。

所以正确姿势一般是:

CompletableFuture<Void> all = CompletableFuture.allOf(f1, f2, f3);
all.join();

Result1 r1 = f1.join();
Result2 r2 = f2.join();
Result3 r3 = f3.join();

坑 5:join() 和 get() 的异常表现不一样

  • get() 抛受检异常:InterruptedExceptionExecutionException
  • join() 抛非受检异常:CompletionException

在业务代码里我更常用 join(),因为链路更简洁。
但你要知道,真正的原始异常通常藏在:

ex.getCause()

里面。


坑 6:线程池看起来没满,接口还是慢

这类问题我也踩过。排查时不要只盯着线程池活跃数,还要看:

  • 队列长度是否持续增长
  • 下游服务 RT 是否上升
  • 是否出现超时重试,导致放大流量
  • 是否所有聚合接口共用了一个线程池

异步不是“免费提速”,它只是把等待并行化。
如果下游本身扛不住,并发越高反而越容易雪崩。


安全/性能最佳实践

这里的“安全”主要是指系统稳定性、资源边界、异常隔离,而不只是传统安全漏洞。

1. 为聚合任务配置专用线程池

建议至少显式设置:

  • 核心线程数
  • 最大线程数
  • 队列大小
  • 拒绝策略
  • 线程名前缀

例如:

ExecutorService executor = new ThreadPoolExecutor(
        8, 16, 60L, TimeUnit.SECONDS,
        new LinkedBlockingQueue<>(100),
        new ThreadFactory() {
            private int count = 1;
            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "cf-agg-" + count++);
            }
        },
        new ThreadPoolExecutor.CallerRunsPolicy()
);

线程名前缀很重要,线上排查日志和线程栈时非常方便。


2. 给每个下游设置超时,而不是无限等待

如果你是 JDK 9+,可以这样:

CompletableFuture<List<String>> recommendFuture = CompletableFuture
        .supplyAsync(() -> recommendService.getRecommendations(userId), executor)
        .completeOnTimeout(Collections.emptyList(), 300, TimeUnit.MILLISECONDS);

或者:

.orTimeout(300, TimeUnit.MILLISECONDS)

区别在于:

  • completeOnTimeout:超时后返回默认值
  • orTimeout:超时后直接抛异常

对聚合接口来说,非核心链路很适合 completeOnTimeout


3. 区分核心数据和可降级数据

这是聚合设计里非常关键的一条边界:

  • 核心数据:失败就整体失败
    例如用户身份、价格、权限
  • 非核心数据:失败可降级
    例如推荐位、标签、埋点扩展信息

不要把所有下游都一视同仁,否则系统要么太脆弱,要么数据质量太差。


4. 避免在异步链中写重 CPU 逻辑

CompletableFuture 非常适合 I/O 编排,但如果你在回调里做:

  • 大对象 JSON 转换
  • 批量复杂计算
  • 大量正则/加解密

那瓶颈就不一定是 I/O 了。
这种场景要么拆到专门计算线程池,要么重新评估是否适合放在接口同步链路中。


5. 做好链路日志与上下文透传

异步场景里,日志最怕“看不出来哪些任务属于同一次请求”。建议至少做到:

  • 请求 ID / traceId 透传
  • 下游服务名打点
  • 开始时间、结束时间、耗时
  • 成功/失败/降级状态

如果你用了 MDC,要注意异步线程默认不会自动继承上下文,需要手动透传或使用包装线程池。


6. 控制并发规模,别把聚合层变成流量放大器

一个请求聚合 5 个下游,如果 QPS 是 200,那么下游视角可能就是 1000 次调用/秒。
如果再叠加重试、超时补偿,请求数会继续放大。

所以你需要:

  • 限制聚合的下游数量
  • 做缓存的尽量缓存
  • 对热点接口做熔断降级
  • 根据下游 SLA 设置不同超时和并发策略

一个更清晰的结构建议

CompletableFuture 容易越写越“链式魔法”。为了可维护性,我更推荐把聚合代码拆成三层:

classDiagram
    class ProfileFacade {
        +getProfile(Long userId) ProfileResponse
    }

    class AsyncTaskFactory {
        +userFuture(Long userId)
        +pointsFuture(Long userId)
        +orderFuture(Long userId)
        +recommendFuture(Long userId)
        +benefitFuture(CompletableFuture~UserInfo~)
    }

    class ProfileAssembler {
        +assemble(...)
    }

    ProfileFacade --> AsyncTaskFactory
    ProfileFacade --> ProfileAssembler

也就是:

  • Facade:负责编排
  • TaskFactory:负责定义每个异步任务
  • Assembler:负责对象组装

这样做的好处是:

  • 编排逻辑和业务逻辑分离
  • 更方便单元测试
  • 降低超长方法带来的阅读负担

如果你现在项目里聚合接口很多,这个拆法很值得试。


什么时候不建议用 CompletableFuture

虽然它很好用,但也不是银弹。下面这些情况,我会更谨慎:

1. 下游只有一个,且耗时本来就很稳定

那异步化收益不大,复杂度反而上升。

2. 强依赖链太长

如果 A 依赖 B,B 依赖 C,C 依赖 D,本质上并行空间有限。
这时候优化重点应放在:

  • 减少依赖层级
  • 合并接口
  • 前移数据准备

3. 团队对异步调试还不熟

如果大家对线程池、超时、异常传播都不熟悉,贸然全面推广,后期维护成本会比较高。
这种情况下,先在关键接口里小范围实践更合适。


总结

CompletableFuture 做接口聚合,真正的价值不只是“把请求并发起来”,而是把以下几件事组织清楚:

  • 哪些任务能并行
  • 哪些任务有依赖
  • 哪些任务允许降级
  • 异常和超时如何处理
  • 线程池和资源边界如何控制

你可以先记住这套落地原则:

  1. 无依赖任务优先并行
  2. 依赖任务用 thenCompose
  3. 统一用 allOf 等待汇总
  4. 非核心链路必须有降级策略
  5. 一定使用自定义线程池
  6. 补齐超时、日志、traceId 透传
  7. 别过度抽象,先跑通一个真实接口

如果你正在优化一个“详情页聚合接口”或者“首页数据拼装接口”,我建议你就拿其中一个接口试一遍:先串行测一版,再异步编排一版,对比 RT、错误率、线程池指标。你会非常直观地看到收益,也更容易理解哪些边界最重要。

从工程经验来看,CompletableFuture 用得好的系统,代码通常不是最“炫”的,而是编排关系清楚、异常边界明确、性能收益可验证。这才是生产环境里真正有价值的异步化。


分享到:

上一篇
《Docker 多阶段构建与镜像瘦身实战:从构建缓存到安全加固的完整优化方案》
下一篇
《微服务架构下分布式事务实战:基于 Saga 模式的订单系统一致性设计与落地》