跳转到内容
123xiao | 无名键客

《Java 中 CompletableFuture 异步编排实战:从并行任务聚合到超时控制与异常处理》

字数: 0 阅读时长: 1 分钟

Java 中 CompletableFuture 异步编排实战:从并行任务聚合到超时控制与异常处理

在 Java 后端开发里,很多接口慢,并不一定是“单个 SQL 太慢”,更多时候是一个请求要串好几个下游服务:查用户信息、查订单、查库存、查优惠、算风控……如果全都同步串行,响应时间很容易被拉长。

这类场景里,CompletableFuture 很适合做一件事:把多个互相独立的任务并行化,再把结果编排起来。但真正落地时,往往不是 allOf() 一把梭那么简单,问题会很快变成:

  • 并行任务怎么聚合结果?
  • 某个任务超时了怎么办?
  • 一个任务失败,是整体失败还是局部降级?
  • 线程池怎么配,才不至于“异步写了个寂寞”?
  • 为什么异常看起来被“吞掉”了?

这篇文章我会用一个完整的示例,把这些问题串起来讲清楚。重点不是 API 罗列,而是如何把它用成生产可控的异步编排工具


背景与问题

假设我们要实现一个“用户首页聚合接口”,需要同时拿到:

  1. 用户基本信息
  2. 最近订单
  3. 推荐商品

这三项彼此独立,很适合并行执行。目标大概是:

  • 总耗时接近三者中的最大值,而不是总和
  • 某些非核心模块超时后允许降级
  • 核心模块失败时能明确返回错误
  • 日志里能看见真正的异常来源

如果直接用同步写法,通常是这样:

UserProfile profile = userService.getProfile(userId);
List<Order> orders = orderService.getRecentOrders(userId);
List<Product> recommendations = recommendationService.recommend(userId);
return new HomePage(profile, orders, recommendations);

问题很明显:三个调用顺序执行,总耗时 = 3 段耗时之和。

而用 CompletableFuture 之后,理想流程应该是下面这样。

flowchart LR
    A[请求进入] --> B[并行查询用户信息]
    A --> C[并行查询最近订单]
    A --> D[并行查询推荐商品]
    B --> E[聚合结果]
    C --> E
    D --> E
    E --> F[返回首页数据]

前置知识与环境准备

本文示例基于:

  • JDK 8+
  • 推荐你至少熟悉:
    • Java Lambda
    • ExecutorService
    • 基本异常处理

如果你是 JDK 9+,可以直接使用 orTimeout()completeOnTimeout()
如果你还在 JDK 8,我后面也会给一个通用超时封装思路。


核心原理

CompletableFuture 本质上做了两件事:

  1. 表示一个“未来会完成”的结果
  2. 支持对结果进行链式编排

最常用的几组能力可以这样理解:

1. 创建异步任务

  • supplyAsync(Supplier):有返回值
  • runAsync(Runnable):无返回值
CompletableFuture<String> future =
    CompletableFuture.supplyAsync(() -> "hello");

2. 串联后续处理

  • thenApply:拿到上一步结果,做转换
  • thenAccept:消费结果,无返回值
  • thenCompose:把“Future 里再套 Future”拍平
  • thenCombine:合并两个独立任务结果

3. 聚合多个任务

  • allOf(f1, f2, f3):全部完成后继续
  • anyOf(f1, f2, f3):谁先完成用谁

4. 处理异常

  • exceptionally:出错时兜底
  • handle:不管成功失败都能处理
  • whenComplete:更偏向记录日志、埋点,不改变结果

5. 指定线程池

这是实战里的重点之一。
如果你不传线程池,默认使用 ForkJoinPool.commonPool()。看起来省事,但在服务端代码里,我通常不建议直接依赖默认线程池,因为:

  • 不容易做隔离
  • 不方便观测
  • 阻塞任务可能拖垮公共线程池

一个完整的编排模型

先看一下这类聚合接口的典型执行路径:

sequenceDiagram
    participant Client as 调用方
    participant API as 聚合接口
    participant TP as 业务线程池
    participant U as 用户服务
    participant O as 订单服务
    participant R as 推荐服务

    Client->>API: 请求首页数据
    API->>TP: 提交用户信息任务
    API->>TP: 提交订单任务
    API->>TP: 提交推荐任务
    TP->>U: 查询用户信息
    TP->>O: 查询最近订单
    TP->>R: 查询推荐商品
    U-->>TP: 返回结果
    O-->>TP: 返回结果
    R-->>TP: 超时/成功
    TP-->>API: 聚合结果
    API-->>Client: 返回响应

这里面最关键的是:不是所有任务都必须同等对待

举个很实用的策略:

  • 用户基本信息:核心数据,失败就整体失败
  • 最近订单:重要但可兜底,失败后返回空列表
  • 推荐商品:非核心,超时后直接降级为空

这就不是简单的“并行执行”了,而是带业务语义的异步编排


实战代码(可运行)

下面这个例子可以直接运行。它模拟三个远程调用,并演示:

  • 并行聚合
  • 超时控制
  • 异常兜底
  • 自定义线程池
import java.util.*;
import java.util.concurrent.*;
import java.util.stream.Collectors;

public class CompletableFutureDemo {

    private static final ExecutorService BIZ_POOL = new ThreadPoolExecutor(
            8,
            16,
            60L,
            TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(100),
            r -> {
                Thread t = new Thread(r);
                t.setName("biz-cf-" + t.getId());
                return t;
            },
            new ThreadPoolExecutor.CallerRunsPolicy()
    );

    public static void main(String[] args) {
        CompletableFutureDemo demo = new CompletableFutureDemo();
        HomePage page = demo.buildHomePage("u1001");
        System.out.println(page);
        BIZ_POOL.shutdown();
    }

    public HomePage buildHomePage(String userId) {
        long start = System.currentTimeMillis();

        CompletableFuture<UserProfile> profileFuture =
                CompletableFuture.supplyAsync(() -> getProfile(userId), BIZ_POOL)
                        .orTimeout(800, TimeUnit.MILLISECONDS);

        CompletableFuture<List<Order>> ordersFuture =
                CompletableFuture.supplyAsync(() -> getRecentOrders(userId), BIZ_POOL)
                        .orTimeout(700, TimeUnit.MILLISECONDS)
                        .exceptionally(ex -> {
                            log("orders fallback: " + unwrap(ex).getMessage());
                            return Collections.emptyList();
                        });

        CompletableFuture<List<Product>> recommendFuture =
                CompletableFuture.supplyAsync(() -> getRecommendations(userId), BIZ_POOL)
                        .completeOnTimeout(Collections.emptyList(), 500, TimeUnit.MILLISECONDS)
                        .exceptionally(ex -> {
                            log("recommend fallback: " + unwrap(ex).getMessage());
                            return Collections.emptyList();
                        });

        CompletableFuture<Void> all =
                CompletableFuture.allOf(profileFuture, ordersFuture, recommendFuture);

        try {
            all.join();

            HomePage result = new HomePage(
                    profileFuture.join(),
                    ordersFuture.join(),
                    recommendFuture.join()
            );

            log("buildHomePage success, cost=" + (System.currentTimeMillis() - start) + "ms");
            return result;

        } catch (CompletionException ex) {
            Throwable root = unwrap(ex);
            log("buildHomePage failed: " + root.getClass().getSimpleName() + ", " + root.getMessage());
            throw new RuntimeException("首页聚合失败", root);
        }
    }

    private UserProfile getProfile(String userId) {
        sleep(300);
        return new UserProfile(userId, "Alice", 28);
    }

    private List<Order> getRecentOrders(String userId) {
        sleep(600);
        if (Math.random() > 0.7) {
            throw new RuntimeException("订单服务异常");
        }
        return Arrays.asList(
                new Order("O100", 199.0),
                new Order("O101", 299.0)
        );
    }

    private List<Product> getRecommendations(String userId) {
        sleep(900); // 故意比 500ms 超时更长
        return Arrays.asList(
                new Product("P100", "Mechanical Keyboard"),
                new Product("P101", "Noise Cancelling Headphones")
        );
    }

    private static void sleep(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("线程被中断", e);
        }
    }

    private static Throwable unwrap(Throwable ex) {
        if (ex instanceof CompletionException || ex instanceof ExecutionException) {
            if (ex.getCause() != null) {
                return ex.getCause();
            }
        }
        return ex;
    }

    private static void log(String msg) {
        System.out.println("[" + Thread.currentThread().getName() + "] " + msg);
    }

    static class HomePage {
        private final UserProfile profile;
        private final List<Order> orders;
        private final List<Product> recommendations;

        public HomePage(UserProfile profile, List<Order> orders, List<Product> recommendations) {
            this.profile = profile;
            this.orders = orders;
            this.recommendations = recommendations;
        }

        @Override
        public String toString() {
            return "HomePage{" +
                    "profile=" + profile +
                    ", orders=" + orders +
                    ", recommendations=" + recommendations +
                    '}';
        }
    }

    static class UserProfile {
        private final String userId;
        private final String name;
        private final int age;

        public UserProfile(String userId, String name, int age) {
            this.userId = userId;
            this.name = name;
            this.age = age;
        }

        @Override
        public String toString() {
            return "UserProfile{" +
                    "userId='" + userId + '\'' +
                    ", name='" + name + '\'' +
                    ", age=" + age +
                    '}';
        }
    }

    static class Order {
        private final String orderId;
        private final double amount;

        public Order(String orderId, double amount) {
            this.orderId = orderId;
            this.amount = amount;
        }

        @Override
        public String toString() {
            return "Order{" +
                    "orderId='" + orderId + '\'' +
                    ", amount=" + amount +
                    '}';
        }
    }

    static class Product {
        private final String productId;
        private final String title;

        public Product(String productId, String title) {
            this.productId = productId;
            this.title = title;
        }

        @Override
        public String toString() {
            return "Product{" +
                    "productId='" + productId + '\'' +
                    ", title='" + title + '\'' +
                    '}';
        }
    }
}

逐步拆解这段代码

1. 核心任务直接失败,非核心任务降级

CompletableFuture<UserProfile> profileFuture =
        CompletableFuture.supplyAsync(() -> getProfile(userId), BIZ_POOL)
                .orTimeout(800, TimeUnit.MILLISECONDS);

用户信息是核心数据,所以这里只做超时限制,不做兜底。超时就让它失败。

而订单、推荐则允许降级:

.exceptionally(ex -> Collections.emptyList())

这类写法很适合“可空、可缺省”的模块。


2. allOf() 只负责“等全部完成”,不帮你收集结果

这是一个很容易误解的点。很多人第一次用时以为:

CompletableFuture.allOf(f1, f2, f3)

会直接返回所有结果。实际上不是,它返回的是 CompletableFuture<Void>,意思只是:这些任务都结束了

所以你后面还得自己取:

all.join();
HomePage result = new HomePage(
    profileFuture.join(),
    ordersFuture.join(),
    recommendFuture.join()
);

如果你嫌这个写法啰嗦,可以自己封装聚合方法。


3. join()get() 怎么选?

两者都能取结果,但区别是:

  • get() 抛受检异常:InterruptedExceptionExecutionException
  • join() 抛非受检异常:CompletionException

在业务代码里,我一般更常用 join(),因为链式代码更顺手,不用到处 try-catch。但也要记住:异常会被包一层,所以你常常要 unwrap


超时控制的几种方式

超时是异步编排里最容易被忽略,却又最关键的点。
因为“异步”不等于“不会卡住”,如果下游一直慢,你只是把阻塞换了个地方。

1. orTimeout()

超时后直接失败:

future.orTimeout(500, TimeUnit.MILLISECONDS);

适合核心流程,超时就快速报错。

2. completeOnTimeout()

超时后返回默认值:

future.completeOnTimeout(Collections.emptyList(), 500, TimeUnit.MILLISECONDS);

适合推荐列表、标签列表、埋点信息这类非核心数据。

3. JDK 8 的通用超时封装思路

如果你没有 orTimeout(),可以用一个定时任务去完成异常。

import java.util.concurrent.*;

public class TimeoutHelper {
    private static final ScheduledExecutorService SCHEDULER =
            Executors.newScheduledThreadPool(1);

    public static <T> CompletableFuture<T> failAfter(long timeout, TimeUnit unit) {
        CompletableFuture<T> promise = new CompletableFuture<>();
        SCHEDULER.schedule(() ->
                promise.completeExceptionally(new TimeoutException("超时")),
                timeout, unit);
        return promise;
    }

    public static <T> CompletableFuture<T> withTimeout(
            CompletableFuture<T> future, long timeout, TimeUnit unit) {
        return future.applyToEither(failAfter(timeout, unit), t -> t);
    }
}

使用方式:

CompletableFuture<String> task =
        CompletableFuture.supplyAsync(() -> "result");

CompletableFuture<String> withTimeout =
        TimeoutHelper.withTimeout(task, 300, TimeUnit.MILLISECONDS);

异常处理:别只会用 exceptionally

很多团队在 CompletableFuture 里只会一个 exceptionally,然后慢慢把异常处理写成一团。更稳妥的做法,是区分场景。

exceptionally:失败后兜底

future.exceptionally(ex -> defaultValue);

特点:

  • 只在异常时执行
  • 会把异常转换成正常结果

适合降级返回默认值。


handle:成功失败都能改写结果

future.handle((result, ex) -> {
    if (ex != null) {
        return defaultValue;
    }
    return transform(result);
});

适合统一收口,尤其是你想把“成功”和“失败”都映射成一种输出结构时。


whenComplete:更适合记录,不适合兜底

future.whenComplete((result, ex) -> {
    if (ex != null) {
        log("error: " + ex.getMessage());
    }
});

这个方法不会吞掉异常,更适合做:

  • 日志
  • 指标
  • tracing
  • 审计

如果你以为 whenComplete 能把异常变成正常结果,通常会踩坑。


多任务编排的几种常见写法

1. 并行后聚合:allOf

最常见,也最适合聚合接口。

2. 两个结果合并:thenCombine

比如先并行查“用户信息”和“会员等级”,再组合成一个对象:

CompletableFuture<UserProfile> f1 = CompletableFuture.supplyAsync(() -> getProfile("u1"));
CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> getLevel("u1"));

CompletableFuture<String> combined =
        f1.thenCombine(f2, (profile, level) -> profile + ", level=" + level);

3. 依赖上一步结果继续异步:thenCompose

这是避免 Future 嵌套的关键。

错误示例:

CompletableFuture<CompletableFuture<String>> nested =
        CompletableFuture.supplyAsync(() -> "u1")
                .thenApply(userId -> CompletableFuture.supplyAsync(() -> "detail-" + userId));

正确写法:

CompletableFuture<String> flat =
        CompletableFuture.supplyAsync(() -> "u1")
                .thenCompose(userId -> CompletableFuture.supplyAsync(() -> "detail-" + userId));

状态变化怎么理解

如果你总觉得 CompletableFuture 的执行过程有点抽象,可以把它看成下面几个状态:

stateDiagram-v2
    [*] --> Pending
    Pending --> Running: 提交任务
    Running --> Success: 正常完成
    Running --> Failed: 抛出异常
    Running --> TimedOut: 超时完成异常
    Failed --> Recovered: exceptionally/handle 降级
    TimedOut --> Recovered: completeOnTimeout/兜底
    Success --> [*]
    Recovered --> [*]
    Failed --> [*]
    TimedOut --> [*]

理解这个状态图后,很多 API 的职责就更清楚了:

  • orTimeout:让 Running 进入 TimedOut
  • exceptionally:把 Failed 转成 Recovered
  • handle:Success / Failed 都能收口
  • allOf:等所有分支都到终态

常见坑与排查

下面这些问题,我自己和团队里同学都踩过。

坑 1:默认线程池被阻塞,异步反而更慢

如果异步任务里包含:

  • HTTP 调用
  • JDBC 查询
  • Redis 阻塞操作
  • 大量 Thread.sleep

那它本质上是阻塞型任务。这时使用 ForkJoinPool.commonPool() 往往不合适。

排查方式

  • 看线程名是不是 ForkJoinPool.commonPool-worker-*
  • 看任务是否存在长时间阻塞
  • 看线程池是否出现排队、CPU 利用率不高但 RT 很高

建议

给业务异步任务使用独立线程池,并区分:

  • IO 密集型线程池
  • CPU 密集型线程池

坑 2:allOf() 之后只知道失败了,不知道谁失败了

allOf() 聚合失败时,异常信息常常不够直观。尤其多任务并行时,你可能只看到一个包装后的 CompletionException

排查建议

给每个分支都加带上下文的日志:

future.whenComplete((r, ex) -> {
    if (ex != null) {
        log("profileFuture failed, userId=" + userId + ", ex=" + unwrap(ex).getMessage());
    }
});

如果系统里有 traceId,也要一并打印。


坑 3:异常被“吃掉”了

最常见的原因是你在中间用了 exceptionally 返回了默认值,导致链路后面再也看不到失败。

这不一定是错,但你要明确:
一旦兜底,后面拿到的是“正常结果”而不是异常。

如果你既想降级,又想保留可观测性,建议这样写:

future.exceptionally(ex -> {
    log("recommend degrade, ex=" + unwrap(ex).getMessage());
    return Collections.emptyList();
});

坑 4:任务超时了,但底层调用没停

这个坑非常常见。
orTimeout()completeOnTimeout() 控制的,是 Future 的完成状态,不一定能真正中断底层 IO 调用。

也就是说:

  • 上层看起来已经超时返回了
  • 但底层线程可能还在跑

这意味着什么?

如果下游调用本身没有超时配置,你的线程资源仍然可能被耗住。

建议

  • HTTP 客户端设置连接超时、读超时
  • 数据库连接设置查询超时
  • 不要把 Future 超时当成唯一超时控制手段

坑 5:链式调用里混用同步/异步方法,线程切换失控

例如:

  • thenApply():通常在前一个任务完成的线程上继续执行
  • thenApplyAsync():会切到线程池执行

如果你在复杂链路里乱用 Async 后缀,线程切换次数会明显增加,排查时也更乱。

我的经验建议

  • 纯计算、轻转换:优先 thenApply
  • 明确要切线程池隔离时:再用 thenApplyAsync(..., executor)

安全/性能最佳实践

这一节是最偏“生产化”的部分。

1. 一定要用自定义线程池

建议最少做到:

  • 核心线程数、最大线程数可配置
  • 队列长度可控
  • 拒绝策略明确
  • 线程名可识别

示例:

ExecutorService pool = new ThreadPoolExecutor(
        16, 32,
        60, TimeUnit.SECONDS,
        new LinkedBlockingQueue<>(200),
        r -> {
            Thread t = new Thread(r);
            t.setName("order-agg-" + t.getId());
            return t;
        },
        new ThreadPoolExecutor.CallerRunsPolicy()
);

2. 为每个下游调用设置“双超时”

我通常会强调两个层面的超时:

  • 客户端调用超时:真正约束 IO
  • CompletableFuture 编排超时:约束聚合链路

两层都要有,缺一不可。


3. 区分核心链路和非核心链路

不要所有任务都“失败即整体失败”,也不要所有任务都“一律降级”。

更实用的思路是:

  • 核心:用户身份、库存锁定、支付结果
  • 可降级:推荐、标签、画像补充、埋点扩展字段

这其实是业务策略,不只是技术写法。


4. 避免在异步任务里放重 CPU 运算

如果任务本身是 CPU 密集型,比如:

  • 大量 JSON 序列化/反序列化
  • 大批量加密解密
  • 复杂规则计算

那就不要和 IO 任务混在同一个线程池里。否则一边跑计算,一边等网络,很容易互相拖慢。


5. 保留上下文信息

异步之后最难受的事情之一,是日志断链。
建议至少保留:

  • userId
  • requestId / traceId
  • 任务名称
  • 开始时间 / 耗时
  • 失败原因

如果你的项目用了 MDC,要注意线程切换后上下文可能丢失,需要做传递封装。


6. 不要滥用异步

这是我最后特别想提醒的一点。
不是所有逻辑都值得上 CompletableFuture

如果只有一个下游调用,或者任务之间强依赖、总耗时本来就短,那为了“显得高级”硬上异步,通常只会:

  • 增加复杂度
  • 增加排障成本
  • 让异常栈更难读

异步编排最适合:多个独立、耗时较高、可并行、可降级的任务。


逐步验证清单

如果你准备把本文的模式搬到自己项目里,可以按这个清单验证:

功能层

  • 多个独立任务是否确实已经并行
  • allOf() 后是否正确收集了每个结果
  • 核心任务失败是否会整体失败
  • 非核心任务失败是否能按预期降级

超时层

  • 每个下游客户端本身是否设置了超时
  • Future 层是否设置了编排超时
  • 超时后是否仍有线程长期占用

观测层

  • 是否能从日志看出哪个分支失败
  • 是否打印了 traceId / requestId
  • 是否上报了任务耗时和异常指标

资源层

  • 是否使用了独立线程池
  • 线程池队列是否可能打满
  • 拒绝策略是否符合业务预期

总结

CompletableFuture 真正的价值,不只是“把同步代码改成异步”,而是把原本串行的、互相独立的调用,组织成一个可并行、可超时、可降级、可观测的执行流。

你可以重点记住这几条:

  1. 并行聚合:用 supplyAsync + allOf
  2. 结果收集allOf() 不返回结果,要自己 join()
  3. 超时控制:核心链路用 orTimeout(),非核心链路可用 completeOnTimeout()
  4. 异常处理exceptionally 兜底,handle 收口,whenComplete 做日志
  5. 线程池隔离:服务端不要过度依赖默认公共线程池
  6. 生产关键点:Future 超时不等于底层 IO 真停,客户端超时也必须配

如果你现在正准备重构一个“聚合查询接口”,我的建议是先从一个简单场景开始:

  • 先挑 2~3 个独立下游
  • 先做并行化
  • 再补超时和降级
  • 最后补日志、指标和线程池治理

这样最稳,也最容易看见收益。
很多时候,CompletableFuture 的难点不在 API,而在于你是否把业务优先级、失败策略、资源边界想清楚了。只要这三件事明确,异步编排就会真正变成你的加速器,而不是新的故障源。


分享到:

上一篇
《大模型推理性能优化实战:从量化部署到 KV Cache 调优的完整方案》
下一篇
《从源码到部署:基于开源项目 MinIO 搭建高可用对象存储服务的实战指南》