跳转到内容
123xiao | 无名键客

《Java 中基于 CompletableFuture 与线程池的异步编排实战:性能优化、异常处理与最佳实践》

字数: 0 阅读时长: 1 分钟

Java 中基于 CompletableFuture 与线程池的异步编排实战:性能优化、异常处理与最佳实践

在 Java 里做并发开发,很多人一开始会从 FutureExecutorService 入手,但很快就会遇到一个现实问题:任务能提交,但不好编排
比如“先查用户,再并行查订单和优惠券,最后组装结果”,用传统 Future 写起来容易变成一堆 get(),不仅阻塞,还难处理异常和超时。

CompletableFuture 的价值,就在于它把“异步执行”和“结果编排”放到了一套统一模型里。再配合自定义线程池,基本可以覆盖绝大多数业务系统里的异步场景。

这篇文章我会从为什么要用、底层怎么想、怎么写、怎么避坑、怎么调优几个维度,带你完整走一遍。


背景与问题

先看一个常见业务场景:构建用户首页数据。

需要做这些事:

  1. 查询用户基础信息
  2. 查询用户订单列表
  3. 查询用户优惠券
  4. 查询推荐商品
  5. 聚合返回

其中:

  • 用户信息是后续部分的前置条件
  • 订单、优惠券、推荐商品可以并行
  • 任一子任务可能失败或超时
  • 主流程不能因为一个慢接口被拖死

如果用同步串行方式,耗时大约是:

总耗时 = 用户信息 + 订单 + 优惠券 + 推荐

如果合理并行,理论上可以接近:

总耗时 = 用户信息 + max(订单, 优惠券, 推荐)

这就是异步编排最直接的价值:降低尾延迟,提高吞吐

但问题也随之而来:

  • 默认线程池到底能不能直接用?
  • thenApplythenComposethenCombine 有什么差别?
  • 子任务异常后主流程怎么兜底?
  • join()get() 到底怎么选?
  • 线程池怎么配才不把服务拖垮?

这些,都是实战里绕不过去的。


前置知识与环境准备

本文示例基于:

  • JDK 8+
  • Maven/Gradle 均可
  • 仅使用 JDK 标准库

建议你至少了解:

  • ExecutorService
  • Callable / Future
  • Java 基本并发概念

核心原理

1. CompletableFuture 解决了什么

它本质上做了两件事:

  1. 异步执行任务
  2. 在任务完成后继续编排后续动作

你可以把它理解成一个“未来会完成的结果”,并且这个结果上可以继续挂回调。


2. 常见编排方式怎么理解

supplyAsync / runAsync

  • supplyAsync:有返回值
  • runAsync:无返回值
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "hello");

thenApply

对上一步结果做同步转换

future.thenApply(String::toUpperCase);

如果只是“拿到结果加工一下”,通常用它。


thenCompose

把“返回另一个 CompletableFuture 的操作”接起来,避免嵌套。

CompletableFuture<User> userFuture = getUserFuture();
CompletableFuture<Order> orderFuture =
    userFuture.thenCompose(user -> getOrderFuture(user.getId()));

如果你的下一步还是异步任务,优先考虑 thenCompose


thenCombine

两个独立任务并行执行,最后合并结果。

futureA.thenCombine(futureB, (a, b) -> a + b);

适合“各查各的,最后汇总”。


allOf / anyOf

  • allOf:全部完成再继续
  • anyOf:任意一个完成就继续

allOf 很适合“并行批量查数据再统一汇总”。


3. 默认线程池不是银弹

很多例子都直接写:

CompletableFuture.supplyAsync(() -> doSomething());

这会默认使用 ForkJoinPool.commonPool()
它不是不能用,但在服务端业务里,我一般不建议直接依赖默认线程池,原因很简单:

  • 线程数不一定适合你的业务
  • 里面混入 CPU 密集和 IO 密集任务时容易相互影响
  • 排查问题时不容易隔离
  • 线程命名和监控都不友好

更稳妥的方式是:按业务类型创建自定义线程池


一张图看懂异步编排流程

flowchart LR
    A[查询用户信息] --> B[并行查询订单]
    A --> C[并行查询优惠券]
    A --> D[并行查询推荐商品]
    B --> E[聚合首页结果]
    C --> E
    D --> E

CompletableFuture 与线程池的关系

sequenceDiagram
    participant Client as 调用方
    participant CF as CompletableFuture
    participant TP as 业务线程池
    participant S1 as 用户服务
    participant S2 as 订单服务
    participant S3 as 优惠券服务

    Client->>CF: supplyAsync(查询用户, TP)
    CF->>TP: 提交任务
    TP->>S1: 查询用户
    S1-->>CF: 返回用户结果
    CF->>TP: thenCompose/thenCombine 编排
    TP->>S2: 查询订单
    TP->>S3: 查询优惠券
    S2-->>CF: 订单结果
    S3-->>CF: 优惠券结果
    CF-->>Client: 聚合结果

实战代码(可运行)

下面我们写一个完整示例:模拟“构建用户首页”。

1. 示例目标

  • 自定义线程池
  • 使用 supplyAsync 发起异步任务
  • thenComposethenCombineallOf 编排流程
  • 做异常兜底
  • 打印耗时

2. 完整代码

import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

public class CompletableFutureDemo {

    public static void main(String[] args) {
        ExecutorService ioPool = new ThreadPoolExecutor(
                8,
                16,
                60,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(200),
                new NamedThreadFactory("io-pool"),
                new ThreadPoolExecutor.CallerRunsPolicy()
        );

        try {
            long start = System.currentTimeMillis();

            HomePageService service = new HomePageService(ioPool);
            HomePage page = service.buildHomePage(1001L);

            long cost = System.currentTimeMillis() - start;
            System.out.println("最终结果: " + page);
            System.out.println("总耗时: " + cost + " ms");
        } finally {
            ioPool.shutdown();
        }
    }

    static class HomePageService {
        private final Executor executor;

        HomePageService(Executor executor) {
            this.executor = executor;
        }

        public HomePage buildHomePage(Long userId) {
            CompletableFuture<User> userFuture = CompletableFuture
                    .supplyAsync(() -> getUser(userId), executor);

            CompletableFuture<List<Order>> orderFuture = userFuture
                    .thenCompose(user ->
                            CompletableFuture.supplyAsync(() -> getOrders(user.getId()), executor))
                    .exceptionally(ex -> {
                        log("订单查询失败: " + ex.getMessage());
                        return Collections.emptyList();
                    });

            CompletableFuture<List<Coupon>> couponFuture = userFuture
                    .thenCompose(user ->
                            CompletableFuture.supplyAsync(() -> getCoupons(user.getId()), executor))
                    .exceptionally(ex -> {
                        log("优惠券查询失败: " + ex.getMessage());
                        return Collections.emptyList();
                    });

            CompletableFuture<List<String>> recommendFuture = userFuture
                    .thenCompose(user ->
                            CompletableFuture.supplyAsync(() -> getRecommendations(user), executor))
                    .exceptionally(ex -> {
                        log("推荐查询失败: " + ex.getMessage());
                        return Arrays.asList("默认推荐商品A", "默认推荐商品B");
                    });

            CompletableFuture<Void> allDone = CompletableFuture.allOf(
                    userFuture, orderFuture, couponFuture, recommendFuture
            );

            return allDone.thenApply(v -> new HomePage(
                    userFuture.join(),
                    orderFuture.join(),
                    couponFuture.join(),
                    recommendFuture.join()
            )).join();
        }

        private User getUser(Long userId) {
            sleep(300);
            log("查询用户信息");
            return new User(userId, "张三", 3);
        }

        private List<Order> getOrders(Long userId) {
            sleep(500);
            log("查询订单列表");
            return Arrays.asList(
                    new Order("O-10001", 199.0),
                    new Order("O-10002", 299.0)
            );
        }

        private List<Coupon> getCoupons(Long userId) {
            sleep(400);
            log("查询优惠券");
            return Arrays.asList(
                    new Coupon("C-10", 10),
                    new Coupon("C-20", 20)
            );
        }

        private List<String> getRecommendations(User user) {
            sleep(350);
            log("查询推荐商品");
            if (user.getLevel() < 1) {
                throw new RuntimeException("用户等级异常");
            }
            return Arrays.asList("机械键盘", "显示器", "人体工学椅");
        }
    }

    static class User {
        private final Long id;
        private final String name;
        private final int level;

        public User(Long id, String name, int level) {
            this.id = id;
            this.name = name;
            this.level = level;
        }

        public Long getId() {
            return id;
        }

        public int getLevel() {
            return level;
        }

        @Override
        public String toString() {
            return "User{id=" + id + ", name='" + name + "', level=" + level + "}";
        }
    }

    static class Order {
        private final String orderNo;
        private final double amount;

        public Order(String orderNo, double amount) {
            this.orderNo = orderNo;
            this.amount = amount;
        }

        @Override
        public String toString() {
            return "Order{orderNo='" + orderNo + "', amount=" + amount + "}";
        }
    }

    static class Coupon {
        private final String code;
        private final int discount;

        public Coupon(String code, int discount) {
            this.code = code;
            this.discount = discount;
        }

        @Override
        public String toString() {
            return "Coupon{code='" + code + "', discount=" + discount + "}";
        }
    }

    static class HomePage {
        private final User user;
        private final List<Order> orders;
        private final List<Coupon> coupons;
        private final List<String> recommendations;

        public HomePage(User user, List<Order> orders, List<Coupon> coupons, List<String> recommendations) {
            this.user = user;
            this.orders = orders;
            this.coupons = coupons;
            this.recommendations = recommendations;
        }

        @Override
        public String toString() {
            return "HomePage{" +
                    "user=" + user +
                    ", orders=" + orders +
                    ", coupons=" + coupons +
                    ", recommendations=" + recommendations +
                    '}';
        }
    }

    static class NamedThreadFactory implements ThreadFactory {
        private final String prefix;
        private final AtomicInteger count = new AtomicInteger(1);

        NamedThreadFactory(String prefix) {
            this.prefix = prefix;
        }

        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r);
            t.setName(prefix + "-" + count.getAndIncrement());
            return t;
        }
    }

    static void sleep(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("线程被中断", e);
        }
    }

    static void log(String msg) {
        System.out.println("[" + Thread.currentThread().getName() + "] " + msg);
    }
}

3. 代码里最值得注意的点

thenCompose 串联依赖任务

订单、优惠券、推荐都依赖用户信息,所以先拿到 user,再继续发起异步任务:

userFuture.thenCompose(user ->
    CompletableFuture.supplyAsync(() -> getOrders(user.getId()), executor))

如果这里用 thenApply,会得到嵌套结构:

CompletableFuture<CompletableFuture<List<Order>>>

这通常不是你想要的。


exceptionally 做降级

比如推荐系统挂了,并不一定要让整个首页失败:

.exceptionally(ex -> Arrays.asList("默认推荐商品A", "默认推荐商品B"))

这在真实项目里非常常见:
主链路成功优先,非核心数据允许降级。


allOf 等所有任务完成

CompletableFuture<Void> allDone = CompletableFuture.allOf(
    userFuture, orderFuture, couponFuture, recommendFuture
);

然后再统一 join() 取结果。


逐步验证清单

如果你想自己边学边验证,我建议按这个顺序:

  1. 先只保留 userFuture,确认异步任务能执行
  2. 加入 orderFuture,理解 thenCompose
  3. 再加 couponFuturerecommendFuture,观察并行效果
  4. 给某个任务手动抛异常,确认降级逻辑生效
  5. 调小线程池核心线程数,看整体耗时变化
  6. 将队列长度改小,观察拒绝策略行为

这样学得最扎实,不会只停留在“会抄代码”。


常见坑与排查

这部分我建议认真看,很多问题不是 API 不会用,而是运行行为和预期不一致


坑 1:把 join() 写早了,导致“伪异步”

错误示例:

CompletableFuture<User> userFuture = CompletableFuture.supplyAsync(() -> getUser(1L), executor);
User user = userFuture.join();

CompletableFuture<List<Order>> orderFuture =
        CompletableFuture.supplyAsync(() -> getOrders(user.getId()), executor);

这会先阻塞等待用户信息,后续并行能力就没了。

更合理的写法是继续编排:

CompletableFuture<List<Order>> orderFuture =
        userFuture.thenCompose(user ->
                CompletableFuture.supplyAsync(() -> getOrders(user.getId()), executor));

坑 2:默认线程池被阻塞任务拖慢

ForkJoinPool.commonPool() 更适合短小、CPU 型任务。
如果你在里面跑大量网络 IO、数据库 IO、远程调用,线程可能长期阻塞,导致整体吞吐下降。

排查思路:

  • 看线程 dump,是否大量线程阻塞
  • 看任务队列积压
  • 看接口 RT 是否周期性抖动
  • 看线程名是否全部落在 ForkJoinPool.commonPool-worker-*

只要是业务服务,我一般建议关键异步任务都显式传入线程池。


坑 3:异常被吞掉,日志看不见

有些同学会写:

future.thenApply(data -> process(data));

但既没有 exceptionally,也没有 handle,最终异常可能只在 join() 时才暴露,排查起来比较晚。

建议对关键链路统一补异常处理:

future.handle((result, ex) -> {
    if (ex != null) {
        log("任务失败: " + ex.getMessage());
        return defaultValue();
    }
    return result;
});

坑 4:线程池参数拍脑袋配置

线程池不是“线程越多越快”。

CPU 密集型

比如加密、计算、规则匹配:

线程数 ≈ CPU 核心数 或 CPU 核心数 + 1

IO 密集型

比如查库、调接口、读文件:

线程数可以适当放大,但必须结合:

  • 平均 RT
  • 峰值并发
  • 下游容量
  • JVM 内存
  • 队列长度

如果盲目拉高线程数,结果往往是:

  • 上下文切换增加
  • 内存涨
  • 下游服务被打爆
  • 超时更多

坑 5:任务里用了 ThreadLocal,却没考虑线程复用

线程池会复用线程。
如果异步任务依赖 ThreadLocal 上下文,比如用户信息、traceId、租户信息,可能出现:

  • 取不到值
  • 取到旧值
  • 日志链路断裂

这类问题非常隐蔽,我自己也踩过。
如果有上下文传递需求,要么显式传参,要么引入专门的上下文传递方案。


常见 API 选择建议

很多人不是不会用,而是总在几个 API 之间犹豫。下面给一个实用版判断表。

场景推荐 API
开启一个有返回值异步任务supplyAsync
开启一个无返回值异步任务runAsync
对结果做同步加工thenApply
下一步还是异步任务thenCompose
两个独立任务合并结果thenCombine
全部完成后统一处理allOf
任一完成就继续anyOf
异常降级exceptionally
无论成功失败都处理handle
只做结果消费thenAccept

异常传播与恢复流程图

flowchart TD
    A[异步任务开始] --> B{执行成功?}
    B -- 是 --> C[进入 thenApply / thenCompose]
    B -- 否 --> D[异常传播]
    D --> E{是否有 exceptionally / handle?}
    E -- 有 --> F[返回兜底结果或记录日志]
    E -- 无 --> G[join/get 时抛出异常]
    F --> H[主流程继续]
    C --> H

安全/性能最佳实践

这一节尽量讲“能直接落地”的。


1. 线程池按职责隔离

不要把所有异步任务都丢进一个线程池。
比较合理的做法:

  • 用户请求链路线程池
  • IO 调用线程池
  • 计算任务线程池
  • 定时/批处理线程池

这样做的好处是故障隔离
例如推荐服务抖了,不至于把订单查询线程也卡死。


2. 给线程起名字

别小看这件事。
线上排查时,线程名是最便宜也最有效的信息之一。

new NamedThreadFactory("io-pool")

看日志、看 dump、看监控时都会轻松很多。


3. 设置合理的队列和拒绝策略

一个常见误区是把队列设成无限大,比如:

new LinkedBlockingQueue<>()

这会隐藏问题。任务堆积时你不一定马上发现,最后可能演变成:

  • 内存上涨
  • 请求超时
  • Full GC
  • 服务雪崩

更建议:

  • 队列设置成有界
  • 明确拒绝策略
  • 结合业务做削峰或降级

例如:

new LinkedBlockingQueue<>(200)

搭配:

new ThreadPoolExecutor.CallerRunsPolicy()

适合在压力升高时让调用方感知背压。


4. 区分核心链路和非核心链路

不是所有任务都必须成功。

例如首页场景中:

  • 用户信息:核心,不可缺失
  • 订单:较核心
  • 优惠券:可降级
  • 推荐商品:可降级

所以设计时就应该分层:

  • 核心任务失败:快速失败或明确返回错误
  • 非核心任务失败:记录日志 + 返回默认值

这会直接影响用户体验和系统稳定性。


5. 避免在异步回调里做重阻塞操作

CompletableFuture 链路本来是为了提升并行效率。
如果你在回调里又调用了大量阻塞逻辑,比如:

  • 慢 SQL
  • 长时间网络请求
  • 大对象序列化

那就容易把线程占住,异步优势被抵消。

经验上可以记一句:
异步编排不是把阻塞藏起来,而是把依赖关系组织清楚。


6. 尽量加超时控制

JDK 8 原生对 CompletableFuture 超时支持比较弱,很多项目会自己封装。
如果你的调用可能卡住,建议至少在外围做超时控制,避免主流程无限等待。

一个简单思路是配合线程池、业务超时、远程调用超时一起设置。
不要只在最外层接口超时,内部异步任务却一直跑。


7. 监控比“理论正确”更重要

线上异步问题,最后都要靠监控说话。建议至少有:

  • 线程池活跃线程数
  • 队列长度
  • 任务拒绝次数
  • 各子任务 RT
  • 异常率
  • 超时次数

如果没有这些指标,异步越复杂,排查越痛苦。


一个更实用的线程池配置思路

下面给一个偏业务服务的参考模板:

ExecutorService executor = new ThreadPoolExecutor(
        8,
        16,
        60L,
        TimeUnit.SECONDS,
        new ArrayBlockingQueue<>(500),
        new NamedThreadFactory("biz-async"),
        new ThreadPoolExecutor.CallerRunsPolicy()
);

什么时候适合这种配置?

  • 任务主要是 IO 型
  • 希望有一定弹性线程
  • 队列希望有界
  • 希望压力过高时出现背压

但边界也要明确:

  • 如果任务强 CPU 密集,线程数别配太大
  • 如果下游容量很弱,线程数和队列都要更保守
  • 如果任务不能在调用线程执行,就别用 CallerRunsPolicy

没有万能参数,只有结合业务压测后的参数。


常见排查路径

如果线上出现“异步后反而更慢”,我通常按这个顺序看:

  1. 线程池是否打满

    • 活跃线程数是否接近最大值
    • 队列是否积压
  2. 是否有阻塞任务混入

    • 线程 dump 看 WAITING / TIMED_WAITING
    • 看是否卡在 HTTP、数据库、锁等待
  3. 是否过早 join/get

    • 代码里有没有把异步链路提前阻塞
  4. 异常是否被吞

    • 是否统一打印了异常日志
    • 是否用了不合理的默认值掩盖问题
  5. 下游是否成为瓶颈

    • 你快了,不代表下游扛得住
    • 并发提升后,下游 RT 可能恶化

这个顺序很实用,能先排掉大部分问题。


什么时候不建议用 CompletableFuture

虽然它很好用,但也不是所有场景都适合。

不太适合的情况

  1. 简单到只有一个异步任务

    • 直接线程池提交可能更直接
  2. 链路极其复杂,存在大量动态分支

    • 代码容易变得难读,建议封装或引入更高层抽象
  3. 需要强响应式流处理

    • 比如大量事件流、背压控制,这时 Reactor/RxJava 可能更合适

所以选择标准很简单:
如果你的核心需求是“少量异步任务之间的依赖与聚合”,CompletableFuture 非常合适。


总结

如果你只记住这篇文章里的几件事,我建议是下面这几条:

  1. CompletableFuture 的核心价值是编排,不只是异步
  2. 业务服务里尽量使用自定义线程池,不要过度依赖默认线程池
  3. 依赖任务用 thenCompose,并行汇总用 thenCombine / allOf
  4. 异常处理要前置设计,非核心链路要能降级
  5. 线程池参数、超时、监控、拒绝策略必须一起考虑
  6. 异步不是越多越好,目标是降低总耗时而不是制造复杂度

我自己的经验是:
异步编排真正难的,不是把代码“写成异步”,而是把依赖关系、资源边界、失败策略想清楚。想清楚了,CompletableFuture 会非常顺手;没想清楚,再优雅的 API 也可能把系统搞复杂。

如果你正准备把同步流程改造成异步,建议先从一条核心链路开始,小步验证线程池、超时、异常降级是否符合预期,再逐步推广。这样最稳。


分享到:

上一篇
《从源码到部署:基于开源项目 MinIO 搭建高可用对象存储服务的实战指南-343》
下一篇
《Spring Boot 中基于 Spring Security 与 JWT 的权限认证实战:从登录鉴权到接口级访问控制》