跳转到内容
123xiao | 无名键客

《Java 中基于 CompletableFuture 的异步编排实战:并行调用、超时控制与异常处理》

字数: 0 阅读时长: 1 分钟

Java 中基于 CompletableFuture 的异步编排实战:并行调用、超时控制与异常处理

在 Java 后端开发里,“把几个慢接口并行调一下,再兜住超时和异常” 是非常常见的需求。

比如一个商品详情页,可能要同时查:

  • 商品基础信息
  • 库存
  • 价格
  • 营销活动
  • 用户画像或推荐数据

如果这些调用串行执行,整体耗时往往是所有接口耗时之和;而如果并行执行,总耗时更接近最慢的那个接口。这也是 CompletableFuture 特别适合登场的地方。

这篇文章我会从**“为什么要用”讲到“怎么写能跑”,再到“线上容易踩什么坑”**。你可以把它当成一篇偏实战的上手指南。


背景与问题

先看一个很典型的同步写法:

public ProductPage getProductPage(String productId) {
    Product product = productService.getProduct(productId);
    Inventory inventory = inventoryService.getInventory(productId);
    Price price = priceService.getPrice(productId);
    Promotion promotion = promotionService.getPromotion(productId);

    return new ProductPage(product, inventory, price, promotion);
}

这个写法的问题不复杂,但很致命:

  1. 耗时叠加
    如果 4 个远程调用分别要 200ms、150ms、180ms、300ms,总耗时可能接近 830ms。

  2. 异常处理分散
    任意一个服务失败,整个页面可能直接报错,或者需要一堆 try-catch

  3. 超时不可控
    某个下游接口卡住,会把整个请求链路拖死。

  4. 扩展性差
    以后再加“推荐”、“评价摘要”、“物流承诺”,同步代码会越来越难维护。

这时就需要一种编排方式:

  • 并行发起任务
  • 等待全部完成或部分完成
  • 能做超时控制
  • 能做异常兜底
  • 能把多个结果最终聚合

CompletableFuture 正是为这类场景准备的。


前置知识与环境准备

适用版本

本文示例建议使用:

  • JDK 9+:可直接使用 orTimeoutcompleteOnTimeout
  • 如果你是 JDK 8,也能做,但超时控制要稍微绕一点

你需要知道的基础

如果你已经会这些,读起来会更顺:

  • Java 线程池基础
  • Lambda 表达式
  • Future 的基本概念
  • 异常传播机制

一个重要提醒

CompletableFuture 不是银弹。它更适合:

  • I/O 密集型任务并发
  • 多个相互独立的远程调用编排
  • 需要阶段式回调和结果聚合的场景

如果你拿它去做:

  • 大量 CPU 密集计算
  • 不受控的阻塞任务堆积
  • 无限并发的接口风暴

那很容易把线程池打爆。


核心原理

先别急着背 API,我更建议先理解它的三个核心角色。

1. 任务发起:异步执行

最常见的是:

  • supplyAsync:有返回值
  • runAsync:无返回值
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "hello");

2. 阶段编排:把后续动作串起来

你可以把它理解为:前一个任务完成后,接下来做什么

常见方法:

  • thenApply:转换结果
  • thenAccept:消费结果
  • thenRun:不关心结果,只执行后续动作
  • thenCompose:扁平化串联异步任务
  • thenCombine:合并两个异步结果

3. 结果汇总:等待多个任务

常用两个:

  • allOf:全部完成再继续
  • anyOf:任意一个完成就继续

一张图看懂 CompletableFuture 编排路径

flowchart LR
    A[收到请求] --> B[并行查询商品信息]
    B --> C1[商品基础信息]
    B --> C2[库存]
    B --> C3[价格]
    B --> C4[营销活动]
    C1 --> D[结果聚合]
    C2 --> D
    C3 --> D
    C4 --> D
    D --> E[返回商品详情页]

这张图背后的关键点是:

  • 每个子任务互相独立,可以并行
  • 聚合点统一收口
  • 异常和超时最好在每个子任务内部就处理掉,而不是最后一把梭

CompletableFuture 的常用编排方式

并行:多个任务同时发起

CompletableFuture<Product> productFuture =
        CompletableFuture.supplyAsync(() -> productService.getProduct(productId), executor);

CompletableFuture<Inventory> inventoryFuture =
        CompletableFuture.supplyAsync(() -> inventoryService.getInventory(productId), executor);

聚合:等待全部任务完成

CompletableFuture.allOf(productFuture, inventoryFuture).join();

转换:处理结果

CompletableFuture<String> nameFuture =
        productFuture.thenApply(Product::getName);

串联:依赖前一个异步结果

比如先查商品,再根据商品分类查推荐:

CompletableFuture<Recommendation> recFuture =
        productFuture.thenCompose(product ->
                CompletableFuture.supplyAsync(
                        () -> recommendationService.getRecommendation(product.getCategoryId()),
                        executor
                )
        );

这里很多人第一次会把 thenApplythenCompose 搞混。

  • thenApply:返回普通值
  • thenCompose:返回另一个 CompletableFuture

你可以简单记忆:
“如果回调里已经是异步了,就用 thenCompose。”


实战代码(可运行)

下面我们做一个完整示例:模拟一个商品聚合接口,包含:

  • 并行调用多个服务
  • 每个子任务设置超时
  • 异常自动降级
  • 最终聚合结果返回

示例目标

构造一个 ProductPageService,查询:

  • 商品信息
  • 库存
  • 价格
  • 营销活动

其中:

  • 库存服务偶发异常
  • 营销服务可能超时
  • 即使某个服务失败,页面仍然尽量返回可用信息

完整示例代码

import java.math.BigDecimal;
import java.time.LocalDateTime;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

public class CompletableFutureDemo {

    public static void main(String[] args) {
        ExecutorService executor = new ThreadPoolExecutor(
                8,
                16,
                60,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(100),
                new NamedThreadFactory("cf-demo-"),
                new ThreadPoolExecutor.CallerRunsPolicy()
        );

        ProductService productService = new ProductService();
        InventoryService inventoryService = new InventoryService();
        PriceService priceService = new PriceService();
        PromotionService promotionService = new PromotionService();

        ProductPageService pageService = new ProductPageService(
                executor, productService, inventoryService, priceService, promotionService
        );

        long start = System.currentTimeMillis();
        ProductPage page = pageService.getProductPage("SKU-1001");
        long cost = System.currentTimeMillis() - start;

        System.out.println("结果:" + page);
        System.out.println("耗时:" + cost + " ms");

        executor.shutdown();
    }

    static class ProductPageService {
        private final Executor executor;
        private final ProductService productService;
        private final InventoryService inventoryService;
        private final PriceService priceService;
        private final PromotionService promotionService;

        public ProductPageService(
                Executor executor,
                ProductService productService,
                InventoryService inventoryService,
                PriceService priceService,
                PromotionService promotionService
        ) {
            this.executor = executor;
            this.productService = productService;
            this.inventoryService = inventoryService;
            this.priceService = priceService;
            this.promotionService = promotionService;
        }

        public ProductPage getProductPage(String productId) {
            CompletableFuture<Product> productFuture = CompletableFuture
                    .supplyAsync(() -> productService.getProduct(productId), executor)
                    .orTimeout(500, TimeUnit.MILLISECONDS)
                    .exceptionally(ex -> {
                        log("product service failed: " + ex.getMessage());
                        return Product.defaultProduct(productId);
                    });

            CompletableFuture<Inventory> inventoryFuture = CompletableFuture
                    .supplyAsync(() -> inventoryService.getInventory(productId), executor)
                    .orTimeout(300, TimeUnit.MILLISECONDS)
                    .exceptionally(ex -> {
                        log("inventory service failed: " + ex.getMessage());
                        return Inventory.unknown(productId);
                    });

            CompletableFuture<Price> priceFuture = CompletableFuture
                    .supplyAsync(() -> priceService.getPrice(productId), executor)
                    .orTimeout(300, TimeUnit.MILLISECONDS)
                    .exceptionally(ex -> {
                        log("price service failed: " + ex.getMessage());
                        return Price.defaultPrice(productId);
                    });

            CompletableFuture<Promotion> promotionFuture = CompletableFuture
                    .supplyAsync(() -> promotionService.getPromotion(productId), executor)
                    .completeOnTimeout(Promotion.noPromotion(productId), 200, TimeUnit.MILLISECONDS)
                    .exceptionally(ex -> {
                        log("promotion service failed: " + ex.getMessage());
                        return Promotion.noPromotion(productId);
                    });

            CompletableFuture<Void> all = CompletableFuture.allOf(
                    productFuture, inventoryFuture, priceFuture, promotionFuture
            );

            return all.thenApply(v -> new ProductPage(
                    productFuture.join(),
                    inventoryFuture.join(),
                    priceFuture.join(),
                    promotionFuture.join(),
                    LocalDateTime.now()
            )).join();
        }

        private void log(String msg) {
            System.out.println("[" + Thread.currentThread().getName() + "] " + msg);
        }
    }

    static class ProductService {
        public Product getProduct(String productId) {
            sleep(120);
            return new Product(productId, "机械键盘", "外设");
        }
    }

    static class InventoryService {
        public Inventory getInventory(String productId) {
            sleep(180);
            if (ThreadLocalRandom.current().nextBoolean()) {
                throw new RuntimeException("库存服务偶发异常");
            }
            return new Inventory(productId, 99, true);
        }
    }

    static class PriceService {
        public Price getPrice(String productId) {
            sleep(150);
            return new Price(productId, new BigDecimal("399.00"), "CNY");
        }
    }

    static class PromotionService {
        public Promotion getPromotion(String productId) {
            sleep(400);
            return new Promotion(productId, "满 300 减 20");
        }
    }

    static class Product {
        private final String productId;
        private final String name;
        private final String category;

        public Product(String productId, String name, String category) {
            this.productId = productId;
            this.name = name;
            this.category = category;
        }

        public static Product defaultProduct(String productId) {
            return new Product(productId, "未知商品", "默认分类");
        }

        @Override
        public String toString() {
            return "Product{productId='" + productId + "', name='" + name + "', category='" + category + "'}";
        }
    }

    static class Inventory {
        private final String productId;
        private final int stock;
        private final boolean available;

        public Inventory(String productId, int stock, boolean available) {
            this.productId = productId;
            this.stock = stock;
            this.available = available;
        }

        public static Inventory unknown(String productId) {
            return new Inventory(productId, -1, false);
        }

        @Override
        public String toString() {
            return "Inventory{productId='" + productId + "', stock=" + stock + ", available=" + available + "}";
        }
    }

    static class Price {
        private final String productId;
        private final BigDecimal amount;
        private final String currency;

        public Price(String productId, BigDecimal amount, String currency) {
            this.productId = productId;
            this.amount = amount;
            this.currency = currency;
        }

        public static Price defaultPrice(String productId) {
            return new Price(productId, BigDecimal.ZERO, "CNY");
        }

        @Override
        public String toString() {
            return "Price{productId='" + productId + "', amount=" + amount + ", currency='" + currency + "'}";
        }
    }

    static class Promotion {
        private final String productId;
        private final String promotionText;

        public Promotion(String productId, String promotionText) {
            this.productId = productId;
            this.promotionText = promotionText;
        }

        public static Promotion noPromotion(String productId) {
            return new Promotion(productId, "无活动");
        }

        @Override
        public String toString() {
            return "Promotion{productId='" + productId + "', promotionText='" + promotionText + "'}";
        }
    }

    static class ProductPage {
        private final Product product;
        private final Inventory inventory;
        private final Price price;
        private final Promotion promotion;
        private final LocalDateTime queryTime;

        public ProductPage(Product product, Inventory inventory, Price price, Promotion promotion, LocalDateTime queryTime) {
            this.product = product;
            this.inventory = inventory;
            this.price = price;
            this.promotion = promotion;
            this.queryTime = queryTime;
        }

        @Override
        public String toString() {
            return "ProductPage{" +
                    "product=" + product +
                    ", inventory=" + inventory +
                    ", price=" + price +
                    ", promotion=" + promotion +
                    ", queryTime=" + queryTime +
                    '}';
        }
    }

    static class NamedThreadFactory implements ThreadFactory {
        private final String prefix;
        private final AtomicInteger counter = new AtomicInteger(1);

        public NamedThreadFactory(String prefix) {
            this.prefix = prefix;
        }

        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, prefix + counter.getAndIncrement());
        }
    }

    static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("线程被中断", e);
        }
    }
}

这段代码里最值得关注的点

1. 每个子任务单独兜底

这是我非常推荐的做法。

.exceptionally(ex -> Inventory.unknown(productId))

不要把所有异常都留到 allOf(...).join() 再统一处理。
原因很简单:一旦你希望“部分失败、整体可用”,就必须让每个任务自己降级。


2. orTimeoutcompleteOnTimeout 的区别

这两个 API 很像,但语义完全不同。

orTimeout

超时后让任务异常完成

future.orTimeout(300, TimeUnit.MILLISECONDS)

适合:

  • 你希望明确感知超时
  • 想走异常处理逻辑
  • 想统计超时率

completeOnTimeout

超时后返回一个默认值

future.completeOnTimeout(defaultValue, 300, TimeUnit.MILLISECONDS)

适合:

  • 这个字段不是强依赖
  • 你愿意降级展示
  • 不希望因为超时把主流程打断

比如“营销活动”、“推荐文案”这类信息,通常就适合 completeOnTimeout


3. join()get() 的区别

很多人第一次用时会问:为什么示例里用 join(),不用 get()

get()

  • 会抛受检异常:InterruptedExceptionExecutionException
  • 更偏传统 Future 风格

join()

  • 抛运行时异常:CompletionException
  • 写链式编排时代码更简洁

在业务编排里,我通常更常用 join()
但前提是:你知道异常会怎么传播,并且做好了日志与兜底。


再用一张图理解异常与超时流转

sequenceDiagram
    participant Client as 调用方
    participant Page as ProductPageService
    participant P as ProductService
    participant I as InventoryService
    participant R as PriceService
    participant M as PromotionService

    Client->>Page: getProductPage(productId)
    Page->>P: 异步查询商品
    Page->>I: 异步查询库存
    Page->>R: 异步查询价格
    Page->>M: 异步查询营销

    P-->>Page: 正常返回
    I-->>Page: 抛出异常
    R-->>Page: 正常返回
    M-->>Page: 超时

    Page->>Page: inventory exceptionally 降级
    Page->>Page: promotion completeOnTimeout 降级
    Page->>Page: allOf 聚合
    Page-->>Client: 返回可用页面结果

这就是线上非常常见的一类策略:

  • 核心字段不能缺
  • 非核心字段允许失败或超时
  • 聚合层不要轻易被单点拖垮

逐步验证清单

如果你准备把代码放到自己的项目里,我建议按下面顺序验证。

第一步:先验证并行是否生效

方法很简单:打印每个任务开始和结束时间,看总耗时是否接近最慢任务,而不是总和。

第二步:故意制造一个异常

比如让库存服务抛异常,观察:

  • 主流程是否仍然返回
  • 降级值是否正确
  • 日志里是否能定位到失败原因

第三步:故意制造超时

把营销服务睡眠时间调大,观察:

  • orTimeout 是否抛异常
  • completeOnTimeout 是否返回默认值
  • 主线程是否被长时间拖住

第四步:压测线程池

重点看:

  • 活跃线程数
  • 队列长度
  • 拒绝策略是否触发
  • RT 是否抖动明显

这是很多 demo 看不出来、线上却非常致命的部分。


常见坑与排查

这一节我想讲得稍微接地气一点,因为这些坑我自己也踩过。

坑 1:默认线程池用得太随意

如果你不传 executor,很多异步任务会跑到 ForkJoinPool.commonPool()

CompletableFuture.supplyAsync(() -> query());

这在 demo 里没问题,但在线上可能有风险:

  • 线程池和别的任务共享
  • 阻塞任务会拖累整个公共线程池
  • 性能问题不好定位

建议:业务异步编排尽量使用自定义线程池。


坑 2:把阻塞 I/O 当成轻量异步

CompletableFuture 本质上并不会让阻塞 I/O 自动变快。
它做的是:把多个阻塞任务并行化

如果你并发量大、每个任务都在阻塞等待远程结果,那么线程池容量、队列长度、下游承载能力都必须一起考虑。

否则你只是把“串行慢”变成了“并发更容易雪崩”。


坑 3:allOf 不会直接给你结果

这是初学者很容易误解的一点。

CompletableFuture<Void> all = CompletableFuture.allOf(f1, f2, f3);

allOf 返回的是 CompletableFuture<Void>,它只是告诉你:这些任务都完成了
你还得自己去取每个任务的结果:

all.join();
A a = f1.join();
B b = f2.join();
C c = f3.join();

坑 4:异常被包装,看日志看不懂

join() 抛出来的常常是 CompletionException,真正的业务异常在 getCause() 里。

排查时注意:

  • 打完整堆栈
  • 展开 root cause
  • 不要只打印 ex.getMessage()

更推荐这样记录:

.exceptionally(ex -> {
    Throwable cause = ex instanceof CompletionException ? ex.getCause() : ex;
    cause.printStackTrace();
    return fallbackValue;
})

坑 5:超时了,但任务未必真的停了

这点很关键。

orTimeout / completeOnTimeout 影响的是 CompletableFuture 的完成状态,
不等于一定中断底层正在执行的任务

也就是说:

  • 调用方看到“超时了”
  • 但底层线程里的远程调用可能还在继续跑

如果你特别关心资源回收,就要结合:

  • 下游 HTTP 客户端超时
  • 数据库查询超时
  • 可中断任务设计
  • 限流和熔断

不能只靠 CompletableFuture 一层。


安全/性能最佳实践

这一节给你一组更偏落地的建议。

1. 为异步编排单独配置线程池

不要偷懒用公共池。建议按业务隔离线程池,例如:

  • 商品详情聚合池
  • 推荐系统聚合池
  • 报表导出池

这样做的好处:

  • 故障隔离更强
  • 容量规划更清晰
  • 监控更容易做

示例配置思路:

ThreadPoolExecutor executor = new ThreadPoolExecutor(
        corePoolSize,
        maxPoolSize,
        60,
        TimeUnit.SECONDS,
        new LinkedBlockingQueue<>(queueSize),
        new NamedThreadFactory("biz-async-"),
        new ThreadPoolExecutor.CallerRunsPolicy()
);

2. 区分强依赖与弱依赖

不是所有字段都值得“死等”。

可以大致这样分:

强依赖

  • 商品基础信息
  • 价格
  • 库存

这类失败时,可能应该:

  • 快速失败
  • 返回明确错误
  • 或者严格降级

弱依赖

  • 推荐
  • 营销标签
  • 扩展信息

这类失败时,适合:

  • 超时默认值
  • 异常吞掉后记录日志
  • 继续返回主结果

这一步其实比“会不会写 API”更重要,它决定了你的系统韧性。


3. 限制并发扇出数量

假设一个请求一口气并行发 20 个下游调用,请求量一大,很容易把整个系统放大成“流量倍增器”。

建议:

  • 能合并的请求尽量合并
  • 非必要字段延迟加载
  • 给下游接口做并发上限控制
  • 用缓存挡掉热点数据

4. 日志要带链路信息

异步任务最烦的一点就是:日志容易碎。

至少建议记录:

  • 请求 ID
  • 商品 ID / 用户 ID
  • 子任务名称
  • 开始时间、结束时间、耗时
  • 异常类型、根因
  • 是否走了降级

否则线上排查会非常痛苦。


5. 监控比代码更重要

如果系统已经在线上跑了,建议重点监控:

  • 线程池活跃线程数
  • 队列积压
  • 任务拒绝次数
  • 各子任务 RT / 超时率 / 异常率
  • 降级触发比例

很多“异步写法看着很优雅”的问题,最后都要靠监控来救命。


6. 避免在回调里做重阻塞操作

比如:

future.thenApply(result -> {
    // 这里又发起一个阻塞数据库查询
    return slowQuery(result);
});

如果这样写,表面上是“异步回调”,实际上可能把线程池又堵住了。

更合理的方式:

  • 纯计算转换用 thenApply
  • 新的异步 I/O 用 thenCompose + supplyAsync

一个更稳妥的编排模板

如果你想在项目里快速落地,我建议采用下面这种模式:

  1. 每个子任务独立超时
  2. 每个子任务独立异常兜底
  3. 最后统一 allOf
  4. 聚合时只做对象组装,不做复杂逻辑

示例模板:

CompletableFuture<A> fa = CompletableFuture
        .supplyAsync(() -> serviceA.query(id), executor)
        .orTimeout(200, TimeUnit.MILLISECONDS)
        .exceptionally(ex -> A.defaultValue());

CompletableFuture<B> fb = CompletableFuture
        .supplyAsync(() -> serviceB.query(id), executor)
        .completeOnTimeout(B.defaultValue(), 100, TimeUnit.MILLISECONDS)
        .exceptionally(ex -> B.defaultValue());

CompletableFuture<Result> resultFuture = CompletableFuture
        .allOf(fa, fb)
        .thenApply(v -> new Result(fa.join(), fb.join()));

Result result = resultFuture.join();

这套结构的优点是:

  • 代码可读性比较稳定
  • 异常边界清晰
  • 聚合逻辑简单
  • 适合逐步扩展

适合用 CompletableFuture 的场景边界

最后再讲一下边界,避免“学会了锤子,看啥都像钉子”。

适合

  • BFF/聚合层接口
  • 多个微服务结果拼装
  • 非阻塞要求没那么极致,但需要较好并发表现
  • 对部分失败可容忍的业务场景

不太适合

  • 超高并发、对线程利用率极端敏感的系统
  • 大量事件流处理
  • 需要结构化并发、取消传播非常严格的场景
  • 复杂响应式数据流

这类场景可能更适合:

  • Reactor / WebFlux
  • Kotlin 协程
  • 更完整的响应式方案

总结

如果你只记住三件事,我建议记这三条:

  1. 并行调用能显著降低聚合接口耗时
  2. 超时和异常要在子任务级别处理,而不是最后统一补锅
  3. 一定要自定义线程池,并做好监控和容量控制

CompletableFuture 真正的价值,不只是“异步”两个字,而是它让你可以把一个复杂的聚合流程拆成:

  • 独立任务
  • 明确依赖
  • 可控超时
  • 可观测失败
  • 最终可聚合输出

这套思路一旦建立起来,你再看商品页、首页卡片流、用户画像拼装、订单确认页这些场景,都会清晰很多。


附:一张实战决策图

flowchart TD
    A[需要聚合多个下游调用] --> B{任务是否相互独立?}
    B -- 是 --> C[使用 supplyAsync 并行发起]
    B -- 否 --> D[使用 thenCompose 串联依赖任务]

    C --> E{是否允许部分失败?}
    D --> E

    E -- 允许 --> F[每个子任务设置 fallback]
    E -- 不允许 --> G[关键任务异常快速失败]

    F --> H{是否允许超时降级?}
    G --> I[统一聚合并返回]

    H -- 是 --> J[completeOnTimeout]
    H -- 否 --> K[orTimeout + 异常处理]

    J --> I
    K --> I

希望这篇文章能帮你把 CompletableFuture 从“会用几个 API”,真正推进到“能写上线代码”。


分享到:

上一篇
《从源码到部署:基于开源项目 MinIO 搭建高可用对象存储服务的实战指南-251》
下一篇
《自动化测试中的测试数据工厂实践:提升接口与端到端用例稳定性的落地方案》