跳转到内容
123xiao | 无名键客

《微服务架构下的分布式事务实战:基于 Saga 模式实现订单与库存一致性》

字数: 0 阅读时长: 1 分钟

微服务架构下的分布式事务实战:基于 Saga 模式实现订单与库存一致性

在单体应用里,订单表和库存表往往在一个数据库里,一个本地事务就能保证“一起成功、一起失败”。但一旦拆成微服务,事情就没那么简单了:订单服务有自己的库,库存服务也有自己的库,中间隔着网络、消息队列、重试机制,任何一个环节出问题,都可能让“订单已创建但库存没扣成”或者“库存扣了但订单没成功”这种问题浮出水面。

这篇文章我不打算只讲概念,而是带你从一个非常典型的场景出发:用户下单时,订单服务与库存服务如何在微服务架构下保持最终一致性。核心方案是 Saga 模式,并且会给出一套可运行的示例代码,帮助你把设计和实现对上号。


背景与问题

假设我们有两个服务:

  • 订单服务 Order Service
  • 库存服务 Inventory Service

用户下单的流程看起来很直观:

  1. 创建订单
  2. 扣减库存
  3. 扣减成功后把订单标记为已确认
  4. 扣减失败则取消订单

如果这两个动作不在一个数据库里,就无法直接使用本地事务。很多团队刚开始会尝试以下几种做法:

方案一:同步调用 + try/catch

订单服务先写订单,再调用库存服务扣库存。失败就把订单状态改为取消。

这个方案“看上去能跑”,但问题很多:

  • 订单已落库,库存接口超时,调用方不知道库存到底扣没扣
  • 重试可能造成重复扣减
  • 中间任一步骤宕机,会留下中间状态
  • 并发高时,排查起来非常痛苦

方案二:两阶段提交(2PC)

理论上能保证强一致,但在微服务场景里通常不太现实:

  • 协调器复杂
  • 锁持有时间长,吞吐下降明显
  • 服务异构、多数据库、多中间件时实现困难
  • 对可用性不友好

为什么 Saga 更适合

Saga 的核心思路是:

把一个大事务拆成多个本地事务,每个本地事务提交后,通过后续步骤继续推进;如果中途失败,则执行已完成步骤对应的补偿操作。

也就是说,我们不追求强一致的“同时成功”,而是接受短时间的不一致,但最终让系统收敛到正确状态。


方案对比与取舍分析

在订单与库存这个场景里,常见方案可以做一个横向比较:

方案一致性实现复杂度性能适用场景
本地事务强一致单体或单库
2PC/XA强一致低~中少量核心强一致场景
TCC强一致偏最终很高资金、账户等关键业务
Saga最终一致订单、库存、物流等链路

为什么这题更推荐 Saga

订单和库存通常具备这些特点:

  • 允许短时间中间状态存在
  • 业务天然可以补偿,例如“取消订单”“释放库存”
  • 服务间解耦要求高
  • 对吞吐和可用性要求高于绝对强一致

所以,从工程落地角度看,Saga 往往是更平衡的方案。


核心原理

Saga 有两种主要实现方式:

  • 编排式(Orchestration):由一个 Saga 协调器统一驱动流程
  • 协同式(Choreography):服务通过事件自行协作

这篇文章采用更容易落地和理解的 编排式 Saga。原因很简单:订单创建、库存冻结/扣减、订单确认/取消,本身就是一个清晰的业务流程,用一个协调器来串起来,排查和补偿逻辑更集中。

一次下单 Saga 的基本流程

flowchart TD
    A[用户提交订单] --> B[订单服务 创建PENDING订单]
    B --> C[Saga协调器 请求库存预留]
    C --> D{库存预留成功?}
    D -- 是 --> E[订单服务 确认订单 CONFIRMED]
    D -- 否 --> F[订单服务 取消订单 CANCELLED]
    E --> G[流程结束]
    F --> G

这里我故意用了“库存预留”而不是直接“扣减库存”。这是一个很重要的实践点:

  • 预留:先占住库存,防止超卖
  • 确认:订单最终成功后,真正消耗
  • 释放:订单失败或超时后,把预留库存归还

相比直接扣减,预留机制更适合 Saga,因为它天然支持补偿。

状态机视角

订单和库存都应该是状态驱动,而不是“接口调一下就算完成”。

stateDiagram-v2
    [*] --> PENDING
    PENDING --> CONFIRMED: 库存预留成功
    PENDING --> CANCELLED: 库存预留失败/超时
    CONFIRMED --> [*]
    CANCELLED --> [*]

库存侧也可以有类似状态:

  • available:可用库存
  • reserved:已预留库存
  • consumed:已消耗库存

一个关键设计:补偿不是回滚

很多人第一次接触 Saga,会把补偿理解成“分布式回滚”。实际上不是。

补偿动作是新的业务操作,例如:

  • 创建订单的补偿:取消订单
  • 预留库存的补偿:释放库存

它不要求把数据库精确恢复到某个历史快照,而是让业务状态回归正确。

幂等性必须内建

Saga 最大的工程难点之一不是流程本身,而是:

  • 网络超时
  • 消息重复投递
  • 调用方重试
  • 协调器重放

所以每个参与方都必须做到:

  • 相同 sagaId 的请求重复执行不出错
  • 补偿操作重复执行也安全
  • 状态迁移不可逆、不可乱跳

架构设计与数据模型

下面给一个简化但实战够用的设计。

服务职责划分

  • Order Service
    • 创建订单
    • 确认订单
    • 取消订单
  • Inventory Service
    • 预留库存
    • 释放库存
    • 确认消耗库存
  • Saga Coordinator
    • 驱动流程
    • 记录 Saga 执行状态
    • 触发补偿

时序图

sequenceDiagram
    participant U as User
    participant S as Saga Coordinator
    participant O as Order Service
    participant I as Inventory Service

    U->>S: 发起下单(sagaId, productId, qty)
    S->>O: createOrder(PENDING)
    O-->>S: orderCreated(orderId)

    S->>I: reserveInventory(sagaId, productId, qty)
    alt 预留成功
        I-->>S: reserved
        S->>O: confirmOrder(orderId)
        O-->>S: confirmed
    else 预留失败
        I-->>S: failed
        S->>O: cancelOrder(orderId)
        O-->>S: cancelled
    end

    S-->>U: 返回最终结果

建议的数据表

订单表

CREATE TABLE orders (
  order_id VARCHAR(64) PRIMARY KEY,
  saga_id VARCHAR(64) NOT NULL UNIQUE,
  product_id VARCHAR(64) NOT NULL,
  quantity INT NOT NULL,
  status VARCHAR(32) NOT NULL,
  created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
);

库存表

CREATE TABLE inventory (
  product_id VARCHAR(64) PRIMARY KEY,
  available INT NOT NULL,
  reserved INT NOT NULL DEFAULT 0
);

库存预留记录表

CREATE TABLE inventory_reservations (
  saga_id VARCHAR(64) PRIMARY KEY,
  product_id VARCHAR(64) NOT NULL,
  quantity INT NOT NULL,
  status VARCHAR(32) NOT NULL,
  created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
);

这个 inventory_reservations 表非常关键,它是库存服务实现幂等和补偿的基础。


实战代码(可运行)

下面我用 Python + Flask 做一个最小可运行示例。它不是生产级框架,但足够把 Saga 的关键动作演示清楚。

你可以把它理解成三个模块:

  • 订单服务
  • 库存服务
  • Saga 协调器

为了方便运行,示例放在一个文件里,用内存数据模拟数据库。重点不是框架,而是状态流转和补偿逻辑。

完整示例代码

from flask import Flask, request, jsonify
from threading import Lock
import uuid

app = Flask(__name__)

# 模拟数据库
orders = {}
inventory = {
    "sku-1": {"available": 10, "reserved": 0}
}
reservations = {}
saga_logs = {}

db_lock = Lock()


def generate_id():
    return str(uuid.uuid4())


# =========================
# Order Service
# =========================
def create_order(saga_id, product_id, quantity):
    with db_lock:
        if saga_id in [o["saga_id"] for o in orders.values()]:
            # 幂等:相同 saga_id 重复请求,直接返回已有订单
            for order_id, order in orders.items():
                if order["saga_id"] == saga_id:
                    return {"order_id": order_id, "status": order["status"]}

        order_id = generate_id()
        orders[order_id] = {
            "order_id": order_id,
            "saga_id": saga_id,
            "product_id": product_id,
            "quantity": quantity,
            "status": "PENDING"
        }
        return {"order_id": order_id, "status": "PENDING"}


def confirm_order(order_id):
    with db_lock:
        order = orders.get(order_id)
        if not order:
            raise ValueError("order not found")

        if order["status"] == "CONFIRMED":
            return {"order_id": order_id, "status": "CONFIRMED"}

        if order["status"] == "CANCELLED":
            raise ValueError("cannot confirm cancelled order")

        order["status"] = "CONFIRMED"
        return {"order_id": order_id, "status": "CONFIRMED"}


def cancel_order(order_id):
    with db_lock:
        order = orders.get(order_id)
        if not order:
            raise ValueError("order not found")

        if order["status"] == "CANCELLED":
            return {"order_id": order_id, "status": "CANCELLED"}

        if order["status"] == "CONFIRMED":
            raise ValueError("cannot cancel confirmed order")

        order["status"] = "CANCELLED"
        return {"order_id": order_id, "status": "CANCELLED"}


# =========================
# Inventory Service
# =========================
def reserve_inventory(saga_id, product_id, quantity):
    with db_lock:
        # 幂等:重复预留请求直接返回历史结果
        if saga_id in reservations:
            return {
                "saga_id": saga_id,
                "status": reservations[saga_id]["status"]
            }

        item = inventory.get(product_id)
        if not item:
            reservations[saga_id] = {
                "product_id": product_id,
                "quantity": quantity,
                "status": "FAILED"
            }
            return {"saga_id": saga_id, "status": "FAILED", "reason": "product not found"}

        if item["available"] < quantity:
            reservations[saga_id] = {
                "product_id": product_id,
                "quantity": quantity,
                "status": "FAILED"
            }
            return {"saga_id": saga_id, "status": "FAILED", "reason": "insufficient stock"}

        item["available"] -= quantity
        item["reserved"] += quantity
        reservations[saga_id] = {
            "product_id": product_id,
            "quantity": quantity,
            "status": "RESERVED"
        }
        return {"saga_id": saga_id, "status": "RESERVED"}


def release_inventory(saga_id):
    with db_lock:
        reservation = reservations.get(saga_id)
        if not reservation:
            return {"saga_id": saga_id, "status": "NOT_FOUND"}

        if reservation["status"] == "RELEASED":
            return {"saga_id": saga_id, "status": "RELEASED"}

        if reservation["status"] == "FAILED":
            return {"saga_id": saga_id, "status": "FAILED"}

        if reservation["status"] == "CONSUMED":
            raise ValueError("cannot release consumed inventory")

        product_id = reservation["product_id"]
        quantity = reservation["quantity"]
        item = inventory[product_id]
        item["available"] += quantity
        item["reserved"] -= quantity
        reservation["status"] = "RELEASED"
        return {"saga_id": saga_id, "status": "RELEASED"}


def commit_inventory(saga_id):
    with db_lock:
        reservation = reservations.get(saga_id)
        if not reservation:
            raise ValueError("reservation not found")

        if reservation["status"] == "CONSUMED":
            return {"saga_id": saga_id, "status": "CONSUMED"}

        if reservation["status"] != "RESERVED":
            raise ValueError(f"cannot commit reservation in status {reservation['status']}")

        product_id = reservation["product_id"]
        quantity = reservation["quantity"]
        item = inventory[product_id]
        item["reserved"] -= quantity
        reservation["status"] = "CONSUMED"
        return {"saga_id": saga_id, "status": "CONSUMED"}


# =========================
# Saga Coordinator
# =========================
def execute_order_saga(product_id, quantity):
    saga_id = generate_id()
    saga_logs[saga_id] = {"status": "STARTED"}

    try:
        order_result = create_order(saga_id, product_id, quantity)
        order_id = order_result["order_id"]
        saga_logs[saga_id]["order_id"] = order_id
        saga_logs[saga_id]["status"] = "ORDER_CREATED"

        reserve_result = reserve_inventory(saga_id, product_id, quantity)
        if reserve_result["status"] != "RESERVED":
            cancel_order(order_id)
            saga_logs[saga_id]["status"] = "CANCELLED"
            return {
                "saga_id": saga_id,
                "order_id": order_id,
                "result": "FAILED",
                "reason": reserve_result.get("reason", "reserve failed")
            }

        saga_logs[saga_id]["status"] = "INVENTORY_RESERVED"

        confirm_order(order_id)
        commit_inventory(saga_id)
        saga_logs[saga_id]["status"] = "COMPLETED"

        return {
            "saga_id": saga_id,
            "order_id": order_id,
            "result": "SUCCESS"
        }

    except Exception as e:
        # 补偿逻辑
        order_id = saga_logs[saga_id].get("order_id")
        try:
            release_inventory(saga_id)
        except Exception:
            pass

        if order_id:
            try:
                cancel_order(order_id)
            except Exception:
                pass

        saga_logs[saga_id]["status"] = "COMPENSATED"
        return {
            "saga_id": saga_id,
            "result": "FAILED",
            "reason": str(e)
        }


# =========================
# HTTP API
# =========================
@app.route("/checkout", methods=["POST"])
def checkout():
    data = request.json
    product_id = data["product_id"]
    quantity = int(data["quantity"])
    result = execute_order_saga(product_id, quantity)
    return jsonify(result)


@app.route("/debug/orders", methods=["GET"])
def debug_orders():
    return jsonify(orders)


@app.route("/debug/inventory", methods=["GET"])
def debug_inventory():
    return jsonify(inventory)


@app.route("/debug/reservations", methods=["GET"])
def debug_reservations():
    return jsonify(reservations)


@app.route("/debug/sagas", methods=["GET"])
def debug_sagas():
    return jsonify(saga_logs)


if __name__ == "__main__":
    app.run(debug=True, port=5000)

如何运行这个示例

安装依赖

pip install flask

启动服务

python app.py

发起一次成功下单

curl -X POST http://127.0.0.1:5000/checkout \
  -H "Content-Type: application/json" \
  -d '{"product_id":"sku-1","quantity":3}'

预期返回:

{
  "order_id": "xxxx",
  "result": "SUCCESS",
  "saga_id": "xxxx"
}

查看订单和库存状态

curl http://127.0.0.1:5000/debug/orders
curl http://127.0.0.1:5000/debug/inventory

触发库存不足场景

curl -X POST http://127.0.0.1:5000/checkout \
  -H "Content-Type: application/json" \
  -d '{"product_id":"sku-1","quantity":100}'

这时会发生:

  • 订单先创建为 PENDING
  • 库存预留失败
  • 订单被补偿为 CANCELLED

这就是 Saga 的典型行为:不追求一个大事务原子提交,而是通过状态推进和补偿实现最终一致


从示例走向生产:真正要补上的东西

上面的代码能跑,但离生产可用还差不少。下面这些是你在真实项目里几乎一定要考虑的。

1. 本地事务 + 事件落库要绑定

在真实系统中,订单服务不能只是“改状态”,还要把“订单已创建”“订单已取消”之类的事件可靠发出去。常见做法是 Outbox Pattern

  • 在同一个本地事务里:
    • 更新业务表
    • 写一条待发送事件到 outbox 表
  • 再由异步任务把 outbox 表里的事件投递到 MQ

这样可以避免“数据库提交了,但消息没发出去”的问题。

2. 状态机要显式定义

不要在代码里随便 if else 改状态,建议明确约束:

  • PENDING -> CONFIRMED
  • PENDING -> CANCELLED
  • 禁止 CANCELLED -> CONFIRMED
  • 禁止 CONFIRMED -> CANCELLED

库存预留记录也是同样道理。

3. 每个动作都要幂等

至少需要一个全局唯一标识,例如:

  • sagaId
  • requestId
  • reservationId

并在服务侧落库去重。

4. 超时要有兜底补偿

有一种很常见的故障:库存服务其实已经预留成功,但响应丢了,协调器等超时后准备重试。这时如果没有幂等,就可能重复预留;如果没有定时回查,就可能一直卡在中间状态。

生产上建议:

  • Saga 状态表记录每一步开始时间
  • 定时任务扫描超时实例
  • 根据业务状态回查后继续推进或补偿

常见坑与排查

这一节我会重点讲实战里最容易翻车的地方。我自己踩过的坑里,前两个最常见。

坑一:订单取消了,但库存没释放

表现:

  • 订单状态是 CANCELLED
  • 库存的 reserved 没减回去
  • 一段时间后出现“库存越来越少,但实际卖单不多”

根因通常有几个:

  • 补偿逻辑没执行
  • 补偿执行失败后没有重试
  • 补偿动作不是幂等,第一次失败后后续不敢重试
  • Saga 状态表缺失,系统不知道该补偿谁

排查路径:

  1. saga_logs 是否进入 COMPENSATED
  2. 查库存预留记录状态是否仍为 RESERVED
  3. 查补偿任务日志是否有异常堆栈
  4. 查 MQ 消息是否积压或消费失败

止血方案:

  • 先做离线对账:扫描 CANCELLED 订单关联的库存预留记录
  • 对状态仍为 RESERVED 的记录执行补偿释放
  • 补偿接口必须支持重复执行

坑二:重复扣库存

表现:

  • 一笔订单只下了一次,库存却少了两次
  • 日志里有超时重试

根因:

  • 调用重试没有幂等 key
  • 消息重复消费没有去重
  • 预留和消费动作混在一个接口里,重入难处理

排查建议:

  • 检查是否以 sagaIdreservationId 做唯一键
  • 检查库存服务是否在数据库层有唯一约束
  • 检查消费日志中同一请求是否被处理多次

坑三:补偿顺序错误

如果一个 Saga 涉及多个下游服务,补偿顺序必须与执行顺序相反。比如:

  1. 创建订单
  2. 预留库存
  3. 创建物流单

失败时应该:

  1. 取消物流单
  2. 释放库存
  3. 取消订单

而不是想到哪补到哪。否则会把状态越补越乱。

坑四:把“失败”与“未知”混为一谈

比如库存服务调用超时,很多系统直接当失败处理。但超时只说明你没拿到结果,不代表对方没成功。

这时正确做法通常是:

  • 先把 Saga 标成 UNKNOWN
  • 通过查询接口或事件回查真实状态
  • 再决定继续、重试还是补偿

这个细节非常重要,不然很容易出现“对方已成功,我方又执行补偿”的反向事故。


安全/性能最佳实践

Saga 讨论多了,大家容易把注意力都放在一致性上,但安全和性能同样关键。

安全最佳实践

1. 所有内部接口也要做鉴权

不要因为是内网调用就裸奔。至少应该有:

  • 服务间认证
  • 请求签名或 mTLS
  • 基于角色的接口访问控制

特别是像“释放库存”“取消订单”这种补偿接口,如果被误调用,后果很直接。

2. 防重放

对于 Saga 请求,建议携带:

  • requestId
  • 时间戳
  • 签名

并限制请求有效时间窗口,避免旧请求被重复利用。

3. 审计日志要完整

至少记录:

  • sagaId
  • orderId
  • 参与服务
  • 请求参数摘要
  • 状态变更前后值
  • 操作时间
  • 错误原因

这样出了事故,不至于只能靠猜。

性能最佳实践

1. 用预留而不是长事务锁

不要在跨服务调用期间持有数据库锁,这会直接把吞吐拖垮。Saga 的本质就是让每个服务只做自己的短事务。

2. 热点库存要特别处理

如果某个商品是秒杀热点,单纯依赖数据库行更新可能会成为瓶颈。可以结合:

  • Redis 预扣
  • 异步落库
  • 分段库存
  • 限流与排队

但注意:引入缓存后,一致性复杂度会更高,必须有对账机制。

3. 补偿任务要分级限速

系统故障恢复时,可能会堆积大量待补偿 Saga。如果你一口气全量重试,下游服务会被打崩。建议:

  • 分批扫描
  • 指数退避重试
  • 按业务优先级处理
  • 设置熔断和限流

4. 容量估算别只看成功链路

很多团队压测时只测“正常下单成功”,但真正压垮系统的往往是:

  • 超时重试
  • 重复消费
  • 补偿风暴
  • MQ 堆积后的追赶流量

粗略估算时,至少把以下流量算进去:

  • 正常请求量
  • 重试放大量
  • 补偿请求量
  • 对账回查量

比如正常每秒 1000 单,如果超时重试系数是 1.3,补偿比例高峰期 10%,那下游承压可能不是 1000 QPS,而是:

1000 + 300 + 100 = 1400 QPS

而这还没算回查任务。


一个更贴近生产的落地建议

如果你正准备在团队里落地 Saga,我建议按下面这个顺序推进,而不是一开始就把架构搞得特别花哨。

第一步:先把状态机和补偿动作定义清楚

文档里明确:

  • 哪些是正向动作
  • 哪些是补偿动作
  • 每个动作的输入输出
  • 允许的状态迁移
  • 失败后的处理策略

第二步:实现幂等和唯一约束

在数据库层就做掉,而不是只靠代码记忆。

例如:

ALTER TABLE orders ADD CONSTRAINT uk_orders_saga_id UNIQUE (saga_id);
ALTER TABLE inventory_reservations ADD CONSTRAINT uk_reservation_saga_id UNIQUE (saga_id);

第三步:补齐可观测性

至少要有:

  • sagaId 全链路日志
  • 指标监控:成功率、补偿率、超时率
  • 告警:卡住的 Saga 数量、积压时长

第四步:做对账系统

别觉得“有 Saga 就不需要对账”。现实里,任何复杂系统都需要定期兜底校验。

对账的典型规则包括:

  • CONFIRMED 订单必须对应 CONSUMED 库存预留
  • CANCELLED 订单不能对应 RESERVED 库存预留
  • 超时未完成的 Saga 必须进入人工或自动回查队列

总结

订单与库存一致性,是微服务分布式事务里最经典也最容易出事故的一题。真正实战时,关键不在于背出 Saga 的定义,而在于把下面几件事做到位:

  1. 接受最终一致,而不是执着强一致
  2. 把大事务拆成多个本地事务
  3. 为每一步准备可重复执行的补偿动作
  4. 通过幂等、状态机、唯一约束来抵御重试和重复消息
  5. 用超时回查、对账和监控补上工程上的最后一公里

如果你问我一个最实用的建议,我会说:

在订单库存场景里,优先采用“订单 PENDING + 库存预留 + 成功确认 / 失败释放”的 Saga 结构,不要一上来就做跨服务强一致。

它足够稳,也足够贴近业务现实。

当然,Saga 也有边界条件。像余额扣款、证券交易这类对强一致要求极高的场景,Saga 往往不是首选,这时需要评估 TCC 或更强的一致性方案。技术方案从来不是越重越好,而是要和业务容错边界匹配。

如果你已经在做微服务拆分,订单和库存又不在一个库里,那么这套 Saga 思路,基本就是值得优先落地的一条主线。


分享到:

上一篇
《Spring Boot 中基于 Spring Cache + Redis 的多级缓存实战:设计、穿透击穿防护与一致性方案》
下一篇
《微服务架构下的分布式事务实战:基于 Saga 模式的设计、实现与故障补偿》