跳转到内容
123xiao | 无名键客

《Java 中基于 CompletableFuture 的异步编排实战:从并行调用、超时控制到异常兜底的落地方案》

字数: 0 阅读时长: 1 分钟

Java 中基于 CompletableFuture 的异步编排实战:从并行调用、超时控制到异常兜底的落地方案

在 Java 服务端开发里,一个接口同时依赖多个下游服务几乎是家常便饭:查用户信息、查订单统计、查优惠券、查推荐结果……如果这些调用串行执行,整体耗时往往会被最慢的那个接口拖垮。

这时候,CompletableFuture 就很适合上场。它不只是“异步执行一个任务”,更关键的是:它能把多个异步任务编排起来,让并行调用、超时控制、异常降级、结果合并都变成可读、可维护的代码。

这篇文章我想从“能上线的写法”来讲,不停留在 API 罗列,而是直接带你做一个可运行的小案例。


背景与问题

先看一个典型聚合接口场景:

前端请求“用户首页数据”,服务端需要同时调用:

  1. 用户基础信息服务
  2. 账户余额服务
  3. 优惠券服务
  4. 推荐服务

如果你用串行方式写,大概是这样:

UserInfo userInfo = userService.getUserInfo(userId);
Balance balance = accountService.getBalance(userId);
List<Coupon> coupons = couponService.queryCoupons(userId);
List<String> recommends = recommendService.getRecommends(userId);

问题很直接:

  • 总耗时高:四个调用的耗时累加
  • 局部失败拖全局:一个下游报错,整个接口可能直接失败
  • 超时不可控:某个依赖卡住,主线程也跟着等
  • 代码膨胀:加降级、重试、日志后会越来越难读

而聚合接口真正需要的是:

  • 可以并行执行
  • 可以给每个任务设置超时
  • 某些任务失败时可以兜底返回默认值
  • 最后统一合并结果
  • 尽量避免把线程池打爆

这正是 CompletableFuture 擅长的方向。


前置知识与环境准备

本文示例基于:

  • JDK 9+(因为会用到 orTimeout / completeOnTimeout
  • 任意 Java 工程均可运行
  • 示例用 Executors.newFixedThreadPool 演示,生产环境建议自己封装线程池

如果你是 JDK 8,也不是不能做,只是超时控制会稍微麻烦一点,需要配合 ScheduledExecutorService 手工实现。


核心原理

1. CompletableFuture 到底解决什么问题

你可以把 CompletableFuture 理解成:

  • 一个“未来会完成”的结果容器
  • 也是一条“异步处理流水线”

它的价值不是单个异步任务,而是任务之间的编排能力

  • supplyAsync:异步提交有返回值任务
  • runAsync:异步提交无返回值任务
  • thenApply:拿上一步结果继续转换
  • thenCompose:把嵌套异步拍平
  • thenCombine:合并两个独立异步结果
  • allOf:等待一组任务都完成
  • anyOf:谁先完成用谁
  • exceptionally / handle:异常处理
  • orTimeout / completeOnTimeout:超时控制

2. 一张图看懂异步编排链路

flowchart LR
    A[请求进入聚合接口] --> B[并行发起用户信息]
    A --> C[并行发起余额查询]
    A --> D[并行发起优惠券查询]
    A --> E[并行发起推荐服务]

    B --> F[结果收集]
    C --> F
    D --> F
    E --> F

    F --> G[异常兜底/默认值填充]
    G --> H[组装首页 DTO]
    H --> I[返回响应]

3. CompletableFuture 常见编排模式

模式一:并行执行后统一汇总

适合多个服务互不依赖的场景。

sequenceDiagram
    participant Client as 调用方
    participant Agg as 聚合服务
    participant U as 用户服务
    participant B as 余额服务
    participant C as 优惠券服务

    Client->>Agg: 请求首页数据
    Agg->>U: 异步请求
    Agg->>B: 异步请求
    Agg->>C: 异步请求

    U-->>Agg: 用户信息
    B-->>Agg: 余额
    C-->>Agg: 优惠券

    Agg-->>Client: 聚合结果

模式二:依赖式串联

比如先查用户,再根据用户等级查推荐配置,这时适合用 thenCompose

模式三:失败不影响主流程

例如推荐服务只是“锦上添花”,超时或失败时给默认推荐即可,这时用 exceptionallycompleteOnTimeout 很顺手。


实战代码(可运行)

下面做一个完整示例:聚合“用户首页数据”。

目标:

  • 用户信息、余额、优惠券、推荐并行查询
  • 推荐服务超时时返回默认推荐
  • 优惠券服务异常时返回空列表
  • 最后统一汇总结果

1. 完整示例代码

import java.math.BigDecimal;
import java.time.LocalDateTime;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.*;
import java.util.function.Supplier;

public class CompletableFutureOrchestrationDemo {

    public static void main(String[] args) {
        ExecutorService executor = new ThreadPoolExecutor(
                8,
                16,
                60,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(200),
                new ThreadFactory() {
                    private int index = 1;
                    @Override
                    public Thread newThread(Runnable r) {
                        Thread t = new Thread(r, "biz-cf-" + index++);
                        t.setDaemon(false);
                        return t;
                    }
                },
                new ThreadPoolExecutor.CallerRunsPolicy()
        );

        HomeService homeService = new HomeService(executor);

        try {
            HomePageDTO dto = homeService.queryHomePage(1001L);
            System.out.println("最终结果:");
            System.out.println(dto);
        } finally {
            executor.shutdown();
        }
    }

    static class HomeService {
        private final Executor executor;
        private final MockRemoteService remoteService = new MockRemoteService();

        public HomeService(Executor executor) {
            this.executor = executor;
        }

        public HomePageDTO queryHomePage(Long userId) {
            long start = System.currentTimeMillis();

            CompletableFuture<UserInfo> userFuture =
                    CompletableFuture.supplyAsync(logCost("用户信息", () -> remoteService.getUserInfo(userId)), executor)
                            .orTimeout(800, TimeUnit.MILLISECONDS);

            CompletableFuture<Balance> balanceFuture =
                    CompletableFuture.supplyAsync(logCost("余额信息", () -> remoteService.getBalance(userId)), executor)
                            .orTimeout(800, TimeUnit.MILLISECONDS)
                            .exceptionally(ex -> {
                                System.out.println("[降级] 余额查询失败: " + ex.getMessage());
                                return new Balance(BigDecimal.ZERO);
                            });

            CompletableFuture<List<Coupon>> couponFuture =
                    CompletableFuture.supplyAsync(logCost("优惠券", () -> remoteService.getCoupons(userId)), executor)
                            .orTimeout(500, TimeUnit.MILLISECONDS)
                            .exceptionally(ex -> {
                                System.out.println("[降级] 优惠券查询失败: " + ex.getMessage());
                                return Collections.emptyList();
                            });

            CompletableFuture<List<String>> recommendFuture =
                    CompletableFuture.supplyAsync(logCost("推荐信息", () -> remoteService.getRecommendations(userId)), executor)
                            .completeOnTimeout(
                                    Arrays.asList("默认推荐A", "默认推荐B"),
                                    300,
                                    TimeUnit.MILLISECONDS
                            )
                            .exceptionally(ex -> {
                                System.out.println("[降级] 推荐服务异常: " + ex.getMessage());
                                return Arrays.asList("兜底推荐X");
                            });

            CompletableFuture<Void> all = CompletableFuture.allOf(
                    userFuture, balanceFuture, couponFuture, recommendFuture
            );

            try {
                all.join();
            } catch (CompletionException ex) {
                System.out.println("[告警] 聚合阶段出现异常: " + ex.getMessage());
            }

            UserInfo userInfo = userFuture.join();
            Balance balance = balanceFuture.join();
            List<Coupon> coupons = couponFuture.join();
            List<String> recommendations = recommendFuture.join();

            HomePageDTO dto = new HomePageDTO();
            dto.setUserId(userId);
            dto.setUserName(userInfo.getName());
            dto.setLevel(userInfo.getLevel());
            dto.setBalance(balance.getAmount());
            dto.setCoupons(coupons);
            dto.setRecommendations(recommendations);
            dto.setQueryTime(LocalDateTime.now());

            long cost = System.currentTimeMillis() - start;
            System.out.println("queryHomePage 总耗时: " + cost + "ms");
            return dto;
        }

        private <T> Supplier<T> logCost(String taskName, Supplier<T> supplier) {
            return () -> {
                long start = System.currentTimeMillis();
                try {
                    return supplier.get();
                } finally {
                    long cost = System.currentTimeMillis() - start;
                    System.out.println(taskName + " 耗时: " + cost + "ms, thread=" + Thread.currentThread().getName());
                }
            };
        }
    }

    static class MockRemoteService {

        public UserInfo getUserInfo(Long userId) {
            sleep(200);
            return new UserInfo(userId, "张三", "VIP");
        }

        public Balance getBalance(Long userId) {
            sleep(250);
            return new Balance(new BigDecimal("1024.88"));
        }

        public List<Coupon> getCoupons(Long userId) {
            sleep(600);
            throw new RuntimeException("优惠券服务不可用");
        }

        public List<String> getRecommendations(Long userId) {
            sleep(1000);
            return Arrays.asList("商品A", "商品B", "商品C");
        }

        private void sleep(long ms) {
            try {
                Thread.sleep(ms);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException("线程被中断", e);
            }
        }
    }

    static class HomePageDTO {
        private Long userId;
        private String userName;
        private String level;
        private BigDecimal balance;
        private List<Coupon> coupons;
        private List<String> recommendations;
        private LocalDateTime queryTime;

        public void setUserId(Long userId) { this.userId = userId; }
        public void setUserName(String userName) { this.userName = userName; }
        public void setLevel(String level) { this.level = level; }
        public void setBalance(BigDecimal balance) { this.balance = balance; }
        public void setCoupons(List<Coupon> coupons) { this.coupons = coupons; }
        public void setRecommendations(List<String> recommendations) { this.recommendations = recommendations; }
        public void setQueryTime(LocalDateTime queryTime) { this.queryTime = queryTime; }

        @Override
        public String toString() {
            return "HomePageDTO{" +
                    "userId=" + userId +
                    ", userName='" + userName + '\'' +
                    ", level='" + level + '\'' +
                    ", balance=" + balance +
                    ", coupons=" + coupons +
                    ", recommendations=" + recommendations +
                    ", queryTime=" + queryTime +
                    '}';
        }
    }

    static class UserInfo {
        private final Long userId;
        private final String name;
        private final String level;

        public UserInfo(Long userId, String name, String level) {
            this.userId = userId;
            this.name = name;
            this.level = level;
        }

        public Long getUserId() { return userId; }
        public String getName() { return name; }
        public String getLevel() { return level; }
    }

    static class Balance {
        private final BigDecimal amount;

        public Balance(BigDecimal amount) {
            this.amount = amount;
        }

        public BigDecimal getAmount() { return amount; }
    }

    static class Coupon {
        private final String code;

        public Coupon(String code) {
            this.code = code;
        }

        @Override
        public String toString() {
            return "Coupon{" +
                    "code='" + code + '\'' +
                    '}';
        }
    }
}

逐步拆解这段代码

1. 为什么不用默认线程池

很多人一上来就这样写:

CompletableFuture.supplyAsync(() -> remoteCall());

这会默认使用 ForkJoinPool.commonPool()。它不是不能用,但在服务端项目里我一般不建议直接依赖默认线程池,原因有两个:

  • 线程数和业务隔离不可控
  • 一旦里面有阻塞型 IO,请求多时很容易互相影响

所以示例里我显式传入了业务线程池:

CompletableFuture.supplyAsync(task, executor)

这是生产可控性的第一步。

2. 并行调用的关键是“同时提交”

下面四个 Future 在创建时就已经提交到线程池,因此能并行执行:

CompletableFuture<UserInfo> userFuture = ...
CompletableFuture<Balance> balanceFuture = ...
CompletableFuture<List<Coupon>> couponFuture = ...
CompletableFuture<List<String>> recommendFuture = ...

最后再统一等待:

CompletableFuture.allOf(userFuture, balanceFuture, couponFuture, recommendFuture)

3. orTimeoutcompleteOnTimeout 的区别

这是实战里经常混淆的点。

orTimeout

超时后让 Future 异常完成

future.orTimeout(800, TimeUnit.MILLISECONDS)

适合你想明确知道“这个任务超时了”的场景。

completeOnTimeout

超时后直接给一个默认值,Future 正常完成。

future.completeOnTimeout(defaultValue, 300, TimeUnit.MILLISECONDS)

适合推荐、画像标签、营销位之类“可降级”的场景。

我个人的经验是:

  • 核心链路:优先 orTimeout
  • 非核心链路:优先 completeOnTimeout

4. 为什么 allOf 后还要 join

allOf 的返回值是 CompletableFuture<Void>,它只表示“大家都结束了”,但不会帮你收集结果。所以还得逐个 join

all.join();

UserInfo userInfo = userFuture.join();
Balance balance = balanceFuture.join();
List<Coupon> coupons = couponFuture.join();
List<String> recommendations = recommendFuture.join();

一个更贴近业务的串联示例

有些场景不是简单并行,而是“先 A 后 B”。比如:

  1. 先查用户信息
  2. 再根据用户等级查权益信息

这时应该用 thenCompose,而不是在 thenApply 里再套一个 Future。

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThenComposeDemo {

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(4);

        CompletableFuture<String> result =
                CompletableFuture.supplyAsync(() -> getUserLevel(1001L), executor)
                        .thenCompose(level ->
                                CompletableFuture.supplyAsync(() -> getBenefitByLevel(level), executor)
                        )
                        .exceptionally(ex -> "默认权益");

        System.out.println(result.join());
        executor.shutdown();
    }

    static String getUserLevel(Long userId) {
        return "VIP";
    }

    static String getBenefitByLevel(String level) {
        return level + " 专属权益包";
    }
}

如果你看到代码里出现 CompletableFuture<CompletableFuture<T>>,十有八九就是该用 thenCompose 了。


常见坑与排查

这一部分非常重要,我自己项目里踩过不少。

1. 误用 join() / get() 导致线程阻塞

虽然写的是异步,但如果你在中间步骤过早调用:

UserInfo userInfo = userFuture.join();
Balance balance = balanceFuture.join();

那就可能把原本能并行的链路重新写回串行。

建议:

  • 先把所有异步任务提交出去
  • 最后统一 allOf(...).join()
  • 再集中取结果

2. 忘记指定线程池

如果异步任务内部是 RPC、数据库、HTTP 调用,都是阻塞型操作,却用了公共线程池,高峰期很容易出现:

  • 任务排队
  • 延迟抖动
  • 线程争抢
  • 其他模块也被拖慢

排查手段:

  • 看线程名是否落到 ForkJoinPool.commonPool-worker-*
  • 看线程池活跃线程数、队列积压
  • 看接口 RT 是否在流量高峰显著上升

3. 异常被“吞掉”了

很多人发现任务失败了,但日志里什么都没有。原因通常是没有显式处理异常:

future.exceptionally(ex -> defaultValue)

或者:

future.handle((result, ex) -> {
    if (ex != null) {
        log.error("task failed", ex);
        return defaultValue;
    }
    return result;
});

经验建议:

  • 兜底可以做,但日志不能省
  • 降级时最好带上用户 ID、traceId、接口名

4. allOf 不是“自动成功”

如果参与 allOf 的任何一个 Future 异常完成,那么 allOf.join() 也会抛异常。
所以如果你希望整体尽量成功,应该在每个子任务上先做异常兜底。

比如:

CompletableFuture<List<Coupon>> couponFuture =
        CompletableFuture.supplyAsync(() -> queryCoupons(), executor)
                .exceptionally(ex -> Collections.emptyList());

这比等到 allOf 再统一处理更稳妥。

5. 超时后任务不一定真的停止

这是个特别容易误解的点。

orTimeout / completeOnTimeout 控制的是 Future 的完成状态,不一定能中断底层正在执行的远程调用。
也就是说:

  • 你的主流程可能已经返回了默认值
  • 但底层那个慢请求线程可能还在跑

这在 IO 调用里很常见。

解决思路:

  • HTTP/RPC 客户端自身要配置连接超时、读超时
  • 数据库查询要有超时配置
  • 不要只依赖 CompletableFuture 层面的超时

排查思路:接口变慢时先看什么

我一般按下面这条线排:

flowchart TD
    A[接口 RT 变慢] --> B{是否有线程池积压}
    B -- 是 --> C[检查核心线程数/队列长度/拒绝策略]
    B -- 否 --> D{是否有下游超时}
    D -- 是 --> E[检查 RPC/HTTP/DB 超时配置]
    D -- 否 --> F{是否提前 join 导致伪异步}
    F -- 是 --> G[调整为先提交后汇总]
    F -- 否 --> H[检查异常兜底与日志缺失]

这个流程不花哨,但很实用。很多“异步没提速”的问题,最后不是 API 用错,而是线程池、超时、阻塞调用这些基础设施没配好。


安全/性能最佳实践

这里说的“安全”主要指服务稳定性,而不只是代码层面的异常处理。

1. 为不同业务隔离线程池

不要把所有异步任务都扔进一个线程池。至少按业务类型分:

  • 用户中心类任务
  • 推荐/画像类任务
  • 营销活动类任务

这样某个模块抖动时,不会把整个聚合接口拖死。

2. 线程池参数要和任务类型匹配

如果是 IO 密集型任务,线程数可以适当高一些;如果是 CPU 密集型,不要盲目加线程。

一个简单原则:

  • CPU 密集:线程数接近 CPU 核数
  • IO 密集:线程数可以大于 CPU 核数,但要结合压测定

另外,队列不要无限大。无限队列看起来“稳”,其实容易把问题延后到内存和 RT 上。

3. 给非核心依赖设默认值

不是所有下游都值得“死等”。例如:

  • 推荐失败:给默认推荐
  • 优惠券失败:返回空列表
  • 用户画像失败:展示基础版页面

核心信息增强信息要分层对待。
这一点做对了,用户体验和系统可用性会明显提升。

4. 日志、监控、指标要成套

至少要有:

  • 每个异步任务耗时日志
  • 超时次数统计
  • 降级次数统计
  • 线程池活跃线程数、队列长度
  • 聚合接口总耗时

否则出了问题,你只能猜。

5. 小心上下文丢失

在异步线程里,像 ThreadLocal、MDC、traceId 这些上下文可能丢失。
如果你依赖链路追踪或日志关联,一定要考虑上下文传递。

常见方案:

  • 自定义线程池包装 Runnable/Callable
  • 使用支持上下文传递的框架或工具库
  • 在任务提交前显式复制必要上下文

6. 不要把 CompletableFuture 当“万能药”

有些场景就不适合上复杂异步编排:

  • 本身只有一个下游调用
  • 接口 RT 已经很低
  • 团队对异步调试经验不足
  • 下游服务并不稳定,异步只会放大并发压力

这时候,先优化下游、缓存、批量接口,往往比盲目异步更有效。


可执行的落地建议

如果你准备把它用到项目里,我建议按下面顺序落地:

第一步:只改“独立下游调用”

优先挑这种接口:

  • 一个聚合接口依赖 3~5 个互不相关的服务
  • 串行耗时明显
  • 某些依赖可以接受降级

这类场景最适合 CompletableFuture,收益也最直接。

第二步:先建专用线程池

别急着改业务代码,先把线程池定好:

  • 核心线程数
  • 最大线程数
  • 队列容量
  • 拒绝策略
  • 线程名前缀
  • 监控指标

第三步:明确依赖分级

把下游依赖分成:

  • 强依赖:失败就整体失败
  • 弱依赖:失败可以默认值兜底

这样你才能决定哪里用 orTimeout,哪里用 completeOnTimeout

第四步:压测验证

重点看:

  • P95 / P99 延迟
  • 线程池队列长度
  • 超时比例
  • 降级比例
  • 下游 QPS 是否被放大

异步编排通常会降低单请求等待时间,但也可能提高系统整体并发量,这一点一定要通过压测确认。


总结

CompletableFuture 真正的价值,不是“让代码异步一点”,而是帮你把复杂接口做成可编排、可降级、可控时延的结构。

你可以抓住这几个落地要点:

  1. 先并行提交,再统一汇总
  2. 核心链路用 orTimeout,非核心链路用 completeOnTimeout
  3. 每个子任务就地做异常兜底,不要把问题留到 allOf
  4. 一定要使用业务线程池,不要默认依赖 commonPool
  5. 超时控制要下沉到 RPC/HTTP/DB 客户端,不能只靠 Future 层
  6. 日志、监控、压测必须跟上

如果你是第一次在项目里用它,我建议别一口气重构所有接口。先拿一个典型聚合场景试点,把并行、超时、降级、监控跑顺,再慢慢复制到其他接口上。这样最稳,也最容易看到收益。

如果只记住一句话,那就是:

CompletableFuture 不是为了“炫技式异步”,而是为了在复杂依赖下,把响应时间和失败影响都控制在可接受范围内。

这才是它在生产环境里的真正意义。


分享到:

上一篇
《面向中型业务的集群架构实战:从服务拆分、负载均衡到高可用容灾设计》
下一篇
《自动化测试中的稳定性治理实战:从脆弱用例定位到持续集成中的误报收敛》