跳转到内容
123xiao | 无名键客

《Java中基于CompletableFuture的异步编排实战:并行调用、超时控制与异常兜底》

字数: 0 阅读时长: 1 分钟

Java中基于CompletableFuture的异步编排实战:并行调用、超时控制与异常兜底

在日常后端开发里,我们经常会遇到一种很典型的接口:

  • 先查用户基本信息
  • 再并行查订单、积分、优惠券
  • 其中某个下游慢了不能把整个接口拖死
  • 某个服务挂了,页面也不能直接 500,最好给个兜底结果

如果这些逻辑全用串行写法,代码不但慢,而且很难优雅地处理“部分失败”。
这正是 CompletableFuture 很擅长的场景:把多个异步任务编排起来,控制执行顺序、并行度、超时和异常恢复策略。

这篇文章我会从一个“聚合查询接口”的例子出发,带你完整走一遍:

  • 怎么并行调用多个远程服务
  • 怎么做整体或单任务超时控制
  • 怎么在异常时优雅兜底
  • 怎么避免线程池和阻塞使用不当带来的坑

文章偏实战,中级 Java 开发者读完就能直接上手。


背景与问题

假设我们要做一个“用户主页聚合接口”,一次请求要拼出下面这些数据:

  • 用户信息:UserService
  • 订单摘要:OrderService
  • 积分信息:PointService

串行写法的问题

最直观的写法是这样的:

User user = userService.getUser(userId);
OrderSummary orderSummary = orderService.getOrderSummary(userId);
Points points = pointService.getPoints(userId);
return new UserProfile(user, orderSummary, points);

问题有两个:

  1. :如果三个服务分别耗时 200ms、300ms、150ms,串行总耗时接近 650ms
  2. 脆弱:只要一个服务异常,整个接口就容易失败

而实际上:

  • 订单和积分查询并不依赖彼此,可以并行
  • 积分失败时可以给默认值
  • 订单服务如果超时,可以降级成“暂无订单信息”

这就是异步编排的切入点。


前置知识与环境准备

你需要了解的基础

开始之前,建议你至少熟悉这些概念:

  • Java 8 Lambda 表达式
  • 线程池 ExecutorService
  • 异常处理 try-catch
  • 基本的并发概念

建议 JDK 版本

虽然 CompletableFuture 在 Java 8 就可用,但如果你要更方便地做超时控制,推荐使用:

  • JDK 9+:可以用 orTimeout()completeOnTimeout()

如果你当前是 Java 8,也不是不能做,只是超时控制写法会稍微绕一点。本文主代码按 JDK 9+ 写。


核心原理

CompletableFuture 本质上做了两件事:

  1. 表示一个未来会完成的结果
  2. 允许你声明式地把多个异步任务串起来

它和传统 Future 最大的区别在于:
Future 更像“拿一个异步句柄然后手动等结果”,而 CompletableFuture 则支持“把任务连接起来”。

常见能力可以这样理解

  • supplyAsync():异步执行一个有返回值任务
  • runAsync():异步执行一个无返回值任务
  • thenApply():拿上一步结果继续做转换
  • thenCompose():把“异步套异步”拍平
  • thenCombine():把两个独立异步任务结果合并
  • allOf():等待多个任务全部完成
  • anyOf():谁先完成用谁
  • exceptionally():异常兜底
  • handle():统一处理成功/失败
  • orTimeout():超时则异常结束
  • completeOnTimeout():超时则返回默认值

Mermaid 图:CompletableFuture 编排关系

flowchart LR
    A[接收请求 userId] --> B[异步查询用户信息]
    A --> C[异步查询订单摘要]
    A --> D[异步查询积分信息]
    B --> E[合并结果]
    C --> E
    D --> E
    E --> F[返回用户主页数据]

这张图对应的是最基础的并行聚合流程。


核心原理拆解:并行、依赖、合并、兜底

1. 并行任务

彼此独立的调用,应该尽量同时发出,而不是一个接一个等。

2. 依赖任务

比如先查到用户等级,再根据等级异步查权益,这时就不是 thenApply(),而通常是 thenCompose()

3. 超时控制

超时控制至少分两层思考:

  • 单个任务超时:某个下游太慢就降级
  • 整体接口超时:整个聚合接口不能无限等

4. 异常兜底

不是所有异常都该直接抛给用户。
例如:

  • 优惠券查询失败 → 返回空列表
  • 推荐服务超时 → 返回默认推荐
  • 核心用户信息失败 → 整体失败

也就是说,要区分核心依赖和非核心依赖


Mermaid 图:异常与超时处理流程

sequenceDiagram
    participant Client as 调用方
    participant API as 聚合接口
    participant User as UserService
    participant Order as OrderService
    participant Point as PointService

    Client->>API: 请求用户主页
    API->>User: 异步调用
    API->>Order: 异步调用
    API->>Point: 异步调用

    User-->>API: 返回用户信息
    Order-->>API: 超时/异常
    Point-->>API: 返回积分

    API->>API: 订单结果兜底为默认值
    API->>API: 合并用户/订单/积分
    API-->>Client: 返回聚合结果

实战代码(可运行)

下面用一个完整示例来演示。
为了方便你直接复制运行,我把“远程服务”模拟成了带随机延迟的方法。

示例目标

实现一个 getUserProfile()

  • 用户信息、订单、积分并行查
  • 订单 300ms 超时则降级
  • 积分异常则给默认值
  • 最后组合成一个用户主页对象

第一步:定义数据模型

import java.util.List;

class User {
    private final Long id;
    private final String name;

    public User(Long id, String name) {
        this.id = id;
        this.name = name;
    }

    public Long getId() {
        return id;
    }

    public String getName() {
        return name;
    }
}

class OrderSummary {
    private final int totalOrders;
    private final String lastOrderTime;

    public OrderSummary(int totalOrders, String lastOrderTime) {
        this.totalOrders = totalOrders;
        this.lastOrderTime = lastOrderTime;
    }

    public int getTotalOrders() {
        return totalOrders;
    }

    public String getLastOrderTime() {
        return lastOrderTime;
    }

    @Override
    public String toString() {
        return "OrderSummary{totalOrders=" + totalOrders + ", lastOrderTime='" + lastOrderTime + "'}";
    }
}

class Points {
    private final int value;

    public Points(int value) {
        this.value = value;
    }

    public int getValue() {
        return value;
    }

    @Override
    public String toString() {
        return "Points{value=" + value + "}";
    }
}

class UserProfile {
    private final User user;
    private final OrderSummary orderSummary;
    private final Points points;

    public UserProfile(User user, OrderSummary orderSummary, Points points) {
        this.user = user;
        this.orderSummary = orderSummary;
        this.points = points;
    }

    @Override
    public String toString() {
        return "UserProfile{" +
                "user=" + user.getName() +
                ", orderSummary=" + orderSummary +
                ", points=" + points +
                '}';
    }
}

第二步:模拟下游服务

import java.util.concurrent.ThreadLocalRandom;

class UserService {
    public User getUser(Long userId) {
        sleep(100);
        return new User(userId, "Alice");
    }

    private void sleep(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("UserService interrupted", e);
        }
    }
}

class OrderService {
    public OrderSummary getOrderSummary(Long userId) {
        int delay = ThreadLocalRandom.current().nextInt(100, 500);
        sleep(delay);
        return new OrderSummary(12, "2023-07-01 20:30:00");
    }

    private void sleep(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("OrderService interrupted", e);
        }
    }
}

class PointService {
    public Points getPoints(Long userId) {
        int random = ThreadLocalRandom.current().nextInt(10);
        sleep(150);

        if (random < 3) {
            throw new RuntimeException("PointService error");
        }
        return new Points(888);
    }

    private void sleep(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("PointService interrupted", e);
        }
    }
}

第三步:用 CompletableFuture 做异步编排

import java.util.concurrent.*;

public class CompletableFutureDemo {

    private final UserService userService = new UserService();
    private final OrderService orderService = new OrderService();
    private final PointService pointService = new PointService();

    private final ExecutorService executor = new ThreadPoolExecutor(
            8,
            16,
            60L,
            TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(1000),
            new ThreadFactory() {
                private int index = 1;
                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, "cf-worker-" + index++);
                }
            },
            new ThreadPoolExecutor.CallerRunsPolicy()
    );

    public UserProfile getUserProfile(Long userId) {
        long start = System.currentTimeMillis();

        CompletableFuture<User> userFuture = CompletableFuture.supplyAsync(
                () -> userService.getUser(userId), executor
        );

        CompletableFuture<OrderSummary> orderFuture = CompletableFuture.supplyAsync(
                () -> orderService.getOrderSummary(userId), executor
        ).completeOnTimeout(
                new OrderSummary(0, "timeout-default"),
                300,
                TimeUnit.MILLISECONDS
        ).exceptionally(ex -> {
            System.out.println("order service failed: " + ex.getMessage());
            return new OrderSummary(0, "exception-default");
        });

        CompletableFuture<Points> pointsFuture = CompletableFuture.supplyAsync(
                () -> pointService.getPoints(userId), executor
        ).exceptionally(ex -> {
            System.out.println("point service failed: " + ex.getMessage());
            return new Points(0);
        });

        CompletableFuture<UserProfile> profileFuture =
                userFuture.thenCombine(orderFuture, (user, order) -> new Object[]{user, order})
                          .thenCombine(pointsFuture, (arr, points) ->
                                  new UserProfile((User) arr[0], (OrderSummary) arr[1], points)
                          );

        try {
            UserProfile result = profileFuture.orTimeout(1, TimeUnit.SECONDS).join();
            long cost = System.currentTimeMillis() - start;
            System.out.println("total cost: " + cost + " ms");
            return result;
        } catch (CompletionException e) {
            throw new RuntimeException("getUserProfile failed", e.getCause());
        }
    }

    public void shutdown() {
        executor.shutdown();
    }

    public static void main(String[] args) {
        CompletableFutureDemo demo = new CompletableFutureDemo();
        try {
            UserProfile profile = demo.getUserProfile(1001L);
            System.out.println(profile);
        } finally {
            demo.shutdown();
        }
    }
}

运行后你会看到什么

这个示例每次运行结果可能不一样,因为我故意加了随机延迟和随机异常。
常见情况包括:

  • 订单在 300ms 内返回,正常合并
  • 订单超过 300ms,返回超时默认值
  • 积分服务抛异常,返回 Points(0)
  • 整体超过 1 秒时,整个接口超时失败

这正是线上接口经常遇到的真实情况:不是所有依赖都稳定,也不是所有依赖都值得死等。


再进一步:依赖型任务用 thenCompose

上面的例子主要是“并行”。
但在实际项目里,另一个高频场景是“后一个异步任务依赖前一个结果”。

比如:

  1. 先查用户
  2. 再根据用户等级查权益

这时应该用 thenCompose(),而不是返回嵌套的 CompletableFuture<CompletableFuture<T>>

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;

class BenefitService {
    public String getBenefitsByLevel(String level) {
        return "VIP".equals(level) ? "premium-benefits" : "normal-benefits";
    }
}

class UserWithLevel extends User {
    private final String level;

    public UserWithLevel(Long id, String name, String level) {
        super(id, name);
        this.level = level;
    }

    public String getLevel() {
        return level;
    }
}

class ComposeExample {
    private final ExecutorService executor;
    private final BenefitService benefitService = new BenefitService();

    public ComposeExample(ExecutorService executor) {
        this.executor = executor;
    }

    public CompletableFuture<String> getUserBenefits(Long userId) {
        return CompletableFuture
                .supplyAsync(() -> new UserWithLevel(userId, "Bob", "VIP"), executor)
                .thenCompose(user -> CompletableFuture.supplyAsync(
                        () -> benefitService.getBenefitsByLevel(user.getLevel()), executor
                ));
    }
}

什么时候用 thenApply,什么时候用 thenCompose?

可以记一个很实用的判断法:

  • 如果下一步是同步加工结果,用 thenApply
  • 如果下一步是再发起一个异步任务,用 thenCompose

逐步验证清单

如果你准备把这套模式落到业务代码里,我建议按下面顺序验证,不要一上来就把链路写得很复杂。

验证 1:先确认线程池是否生效

打印线程名,确认不是全跑到主线程或公共线程池里。

CompletableFuture.supplyAsync(() -> {
    System.out.println(Thread.currentThread().getName());
    return "ok";
}, executor).join();

验证 2:确认并行是否真的发生

把两个任务都 sleep(300ms),如果总耗时接近 300ms,说明是并行;
如果接近 600ms,多半写成串行了。

验证 3:确认超时降级是否生效

故意让某个服务 sleep(1000ms),检查:

  • 是不是返回默认值
  • 是不是没有把整个接口拖死

验证 4:确认异常是否被吞掉或被正确记录

很多人第一次用 exceptionally() 时只返回默认值,却忘了打日志,结果排查起来非常痛苦。


常见坑与排查

这一部分我尽量讲得“像线上事故复盘”,因为这些坑我自己和团队里都踩过。

坑 1:默认线程池用得太随意

如果你直接写:

CompletableFuture.supplyAsync(() -> doSomething());

默认会走 ForkJoinPool.commonPool()
这在 demo 里没问题,但业务系统里通常不建议这么干。

为什么不建议?

  • 不方便做隔离
  • 不方便监控
  • 容易和其他异步任务互相影响
  • 遇到阻塞 IO 时,公共线程池表现可能不稳定

建议

核心业务异步任务尽量显式传入自定义线程池。


坑 2:在异步线程里做长时间阻塞

CompletableFuture 很容易让人误以为“写了异步就高性能了”,其实不是。
如果你在线程池里做的都是:

  • HTTP 阻塞调用
  • 数据库阻塞查询
  • 大量 Thread.sleep()

那本质上还是在消耗线程等待。

排查信号

  • 线程池活跃线程数很高
  • 队列堆积明显
  • 接口 RT 抖动严重
  • CPU 不高,但吞吐上不去

建议

  • 给 IO 型任务单独线程池
  • 设置合理队列长度
  • 不要把所有异步任务都塞同一个池子

坑 3:join/get 用错位置,导致“假异步”

比如下面写法:

User user = userFuture.join();
OrderSummary order = orderFuture.join();
Points points = pointsFuture.join();

这本身不是错,但如果你的 future 之间本来可以继续声明式组合,这么早 join() 会让代码变成“手工拼装”,可读性和扩展性都变差。

更糟的是,有些人写成:

CompletableFuture<User> userFuture = CompletableFuture.supplyAsync(() -> userService.getUser(userId), executor);
User user = userFuture.join();

CompletableFuture<OrderSummary> orderFuture = CompletableFuture.supplyAsync(() -> orderService.getOrderSummary(userId), executor);
OrderSummary order = orderFuture.join();

这就退化成串行了。

建议

  • 尽量在链路末端统一 join()get()
  • 中间过程多用 thenCombine / allOf / thenCompose

坑 4:异常被包装,看不清根因

join() 抛的是 CompletionExceptionget() 抛的是 ExecutionException
真正的业务异常通常在 cause 里。

try {
    future.join();
} catch (CompletionException e) {
    Throwable rootCause = e.getCause();
    rootCause.printStackTrace();
}

建议

日志里尽量打印根因,不然排查时全是“CompletionException”,信息很弱。


坑 5:只做单任务超时,不做整体超时

很多系统会给每个下游都设 300ms 超时,然后以为万无一失。
但如果有多个任务、重试、排队、线程竞争,整体 RT 仍然可能超预期。

建议

同时考虑两类超时:

  • 单任务超时:某个依赖别拖死整体
  • 总链路超时:整个接口有最终截止时间

Mermaid 图:推荐的超时与兜底层次

flowchart TD
    A[请求进入] --> B[发起多个异步调用]
    B --> C1[用户服务 核心依赖]
    B --> C2[订单服务 非核心依赖]
    B --> C3[积分服务 非核心依赖]

    C2 --> D2[单任务超时后默认值]
    C3 --> D3[异常后默认值]
    C1 --> E[失败则整体失败]

    D2 --> F[聚合结果]
    D3 --> F
    E --> F
    F --> G[整体超时控制]

安全/性能最佳实践

这一节给你一些可以直接落地的建议。

1. 区分核心依赖与非核心依赖

不是所有服务都值得“一票否决”。

核心依赖

失败就整体失败,例如:

  • 用户身份信息
  • 权限信息
  • 支付状态

非核心依赖

失败可以兜底,例如:

  • 推荐列表
  • 积分角标
  • 营销信息

这一步如果不先想清楚,后面的异常处理很容易乱。


2. 为不同任务类型配置独立线程池

最常见的合理拆分:

  • IO 密集型线程池:远程调用、数据库查询
  • CPU 密集型线程池:计算、加密、压缩

不要把所有任务都塞到一个线程池里。
否则某个慢服务一堆阻塞任务,就可能把整个系统异步能力拖垮。


3. 线程池参数别照抄

一个常见误区是直接复制网上的线程池参数。
实际上线程池大小要结合:

  • 机器核数
  • 下游 RT
  • 峰值并发
  • 任务类型
  • 可接受排队时长

对于大多数聚合接口来说,更重要的是:

  • 队列不能无限大
  • 拒绝策略要明确
  • 线程名要可观测

4. 超时值不要拍脑袋定

超时不是越短越好,也不是越长越稳。

建议按下面思路定:

  • 先看下游 P95 / P99 响应时间
  • 给核心依赖略宽一些预算
  • 给非核心依赖更激进的超时与降级
  • 总超时必须小于上游调用方超时

例如:

  • 用户服务:500ms
  • 订单服务:300ms,超时降级
  • 积分服务:200ms,异常兜底
  • 聚合接口总超时:800ms

5. 日志和监控一定要带上下文

异步代码一旦没有上下文,排查真的会非常痛苦。
至少建议记录:

  • 请求 ID
  • userId
  • 下游服务名
  • 超时时间
  • 实际耗时
  • 降级次数
  • 异常类型

如果有链路追踪系统,记得把异步线程里的上下文透传好。
否则日志看起来就像“凭空冒出的一条异常”。


6. 避免在回调里写过重逻辑

thenApplyhandle 这些回调里,尽量只做轻量处理。
如果你在回调里塞了复杂计算、二次远程调用甚至数据库写入,链路很容易变得难以维护。

更稳妥的方式

  • 回调中只做结果转换与路由判断
  • 复杂逻辑拆成独立方法
  • 明确每一步是否允许失败、是否可降级

7. 注意敏感信息与异常透出

安全上有个容易被忽略的点:
兜底时不要把内部异常信息直接返回给前端。

错误示例:

.exceptionally(ex -> {
    return new Points(-1); // 同时把 ex.getMessage() 原样返回给前端描述
});

更合理的做法是:

  • 对外返回统一、可理解的业务语义
  • 对内保留完整异常日志
  • 避免泄露内部服务名、库表结构、网络信息

一个更稳的写法:allOf 统一等待

如果任务比较多,thenCombine 一层层写下去会变得有点绕。
这时可以考虑 allOf()

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class AllOfExample {

    public static void main(String[] args) {
        CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> "user");
        CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> "order");
        CompletableFuture<String> f3 = CompletableFuture.supplyAsync(() -> "points");

        CompletableFuture<Void> all = CompletableFuture.allOf(f1, f2, f3)
                .orTimeout(1, TimeUnit.SECONDS);

        all.join();

        String result = f1.join() + "-" + f2.join() + "-" + f3.join();
        System.out.println(result);
    }
}

适用场景

  • 任务数量较多
  • 所有任务都要完成后再统一组装
  • 聚合逻辑更关注“等齐再处理”

注意点

allOf() 自己不直接返回组合结果,仍然需要你从各个 future 中取值。


方案选型小结:什么时候适合 CompletableFuture

CompletableFuture 非常适合:

  • 单机场景下的异步编排
  • 聚合接口
  • 多下游并行查询
  • 轻量级依赖流程控制

但它也不是万能的。

比较适合的情况

  • 依赖数量中等
  • 编排逻辑主要在当前进程内
  • 你能接受命令式 + 函数式混合风格
  • 需要快速落地,不想引入更重框架

可能不那么适合的情况

  • 工作流非常复杂,分支特别多
  • 需要跨服务持久化编排状态
  • 需要可视化任务流和补偿机制
  • 团队对函数式回调风格不熟悉

这种时候,也许该考虑更高层的工作流引擎或响应式方案,而不是硬把所有逻辑塞进 CompletableFuture 链里。


总结

我们把一个典型的聚合接口场景,用 CompletableFuture 从头到尾走了一遍。核心要点可以收成 6 句话:

  1. 独立调用尽量并行,不要串行等
  2. 依赖后续异步任务时,用 thenCompose
  3. 单任务要有超时,整体接口也要有总超时
  4. 异常处理要区分核心依赖和非核心依赖
  5. 生产环境尽量使用自定义线程池,不依赖公共线程池
  6. 异步代码一定要配套日志、监控和上下文透传

如果你现在要把它落到项目里,我建议按这个顺序做:

  • 先找一个聚合接口做试点
  • 把独立下游改成并行调用
  • 给非核心依赖加默认值兜底
  • 给每个下游加超时预算
  • 最后补齐线程池、监控和日志

这样改造,风险最可控,收益也通常最明显。

很多人第一次接触 CompletableFuture 时,会觉得 API 有点多、有点杂。其实不用一上来全会。
你先把这几个用熟:

  • supplyAsync
  • thenCombine
  • thenCompose
  • allOf
  • exceptionally
  • completeOnTimeout
  • orTimeout

已经足够覆盖大多数业务异步编排场景了。

如果你在实际项目里只记住一件事,我希望是这句:
异步编排的重点不只是“快”,而是“在不稳定环境下,仍然可控地返回结果”。


分享到:

上一篇
《微服务架构中基于服务网格的灰度发布与流量治理实战指南》
下一篇
《从单体到集群:中级工程师实现高可用服务架构的拆分、负载均衡与故障转移实战》