跳转到内容
123xiao | 无名键客

《微服务架构中分布式事务的实战设计:基于 Saga 模式实现订单与库存一致性》

字数: 0 阅读时长: 1 分钟

背景与问题

在单体应用里,订单表和库存表通常在同一个数据库里,一个本地事务就能解决“创建订单 + 扣减库存”这件事。
但一旦拆成微服务:

  • 订单服务有自己的库
  • 库存服务有自己的库
  • 可能还有支付服务、优惠券服务、物流服务

这时最典型的问题就来了:

订单已经创建成功了,但库存没扣成功,怎么办?

或者反过来:

库存已经扣了,但订单因为某个异常没落库,库存是不是被“凭空吃掉”了?

很多团队第一反应会想到 2PC/XA。理论上它能提供强一致性,但在微服务环境里,往往会遇到几个现实问题:

  • 数据库、中间件、驱动支持不统一
  • 性能开销大,锁持有时间长
  • 协调器成为系统复杂度和可用性风险点
  • 云原生环境下很难优雅扩展

所以在业务系统里,最终一致性通常比“全局强一致”更实际。
而在最终一致性的方案里,Saga 模式是最常见也最接地气的一种。

这篇文章我会从“订单与库存一致性”这个高频场景切入,讲清楚:

  1. Saga 到底解决了什么问题
  2. 编排式 Saga 如何落地
  3. 如何写出能跑起来的示例代码
  4. 线上最容易踩哪些坑,怎么排查
  5. 怎么在安全性和性能之间做平衡

先明确业务目标与边界

在动手设计前,先把目标说清楚。
对于“下单 + 扣库存”这类链路,很多团队其实并不需要绝对实时强一致,而是需要下面几件事:

  • 订单不能无缘无故成功
  • 库存不能被重复扣减
  • 异常时能补偿
  • 系统重试后结果仍然正确
  • 高并发下不超卖

这意味着我们设计的不是“永不失败”的系统,而是:

一个允许局部失败,但能恢复到正确状态的系统。

这就是 Saga 的核心思路。


方案对比与取舍分析

在进入 Saga 之前,先横向看一下常见方案。

方案一致性性能实现复杂度适用场景
本地事务强一致单体、单库
2PC / XA强一致低到中少量核心强一致场景
TCC很强很高金融、账户类强约束业务
Saga最终一致订单、库存、履约类业务
可靠消息最终一致最终一致异步驱动链路

对于订单与库存,一般推荐:

  • 库存预留 + Saga 补偿
  • 或者 本地事务 + Outbox + 事件驱动

本文聚焦第一种:基于 Saga 模式实现订单与库存一致性


核心原理

什么是 Saga

Saga 可以理解为:

把一个跨服务的大事务,拆成多个本地事务;如果某一步失败,就按相反顺序执行补偿动作。

比如下单流程:

  1. 创建订单,状态为 PENDING
  2. 预留库存
  3. 成功则把订单改为 CONFIRMED
  4. 如果库存预留失败,则把订单取消

注意这里我特意说的是“预留库存”,不是直接“永久扣减库存”。
这是实战里的一个关键点:

  • 预留:先冻结可用库存
  • 确认:支付成功后正式扣减
  • 释放:失败或超时后解除冻结

这样比“先直接扣掉库存”更适合复杂链路,因为补偿更自然。

Saga 的两种实现方式

1. Choreography(事件编排/协同)

每个服务监听事件并继续下游动作,没有中心调度者。

优点:

  • 去中心化
  • 服务自治强

缺点:

  • 流程一复杂,事件会越来越绕
  • 排查问题时要跨多个服务追踪

2. Orchestration(中心编排)

由一个 Saga Orchestrator 统一驱动每一步。

优点:

  • 流程清晰
  • 更适合订单这类长链路业务
  • 失败补偿路径明确

缺点:

  • 需要引入流程编排组件或状态机

这篇文章采用 编排式 Saga,因为它更适合中级工程师在业务系统里落地,也更容易治理。


一个可落地的业务建模

我们定义三个核心状态:

订单状态

  • PENDING:订单已创建,等待库存
  • CONFIRMED:库存预留成功,订单确认
  • CANCELLED:库存失败或超时,订单取消

库存状态

  • available:可用库存
  • reserved:已预留库存

Saga 状态

  • STARTED
  • ORDER_CREATED
  • INVENTORY_RESERVED
  • COMPLETED
  • COMPENSATING
  • FAILED
  • COMPENSATED

flowchart TD
    A[用户提交订单] --> B[Orchestrator 创建 Saga]
    B --> C[订单服务: 创建 PENDING 订单]
    C --> D[库存服务: 预留库存]
    D -->|成功| E[订单服务: 更新为 CONFIRMED]
    E --> F[Saga 完成]

    D -->|失败| G[订单服务: 取消订单]
    G --> H[Saga 补偿完成]

时序图:成功与失败路径

sequenceDiagram
    participant U as User
    participant O as Saga Orchestrator
    participant OS as Order Service
    participant IS as Inventory Service

    U->>O: submitOrder(productId, qty)
    O->>OS: createOrder()
    OS-->>O: orderId, PENDING

    O->>IS: reserveInventory(productId, qty, sagaId)
    alt reserve success
        IS-->>O: reserved
        O->>OS: confirmOrder(orderId)
        OS-->>O: CONFIRMED
        O-->>U: success
    else reserve failed
        IS-->>O: insufficient stock
        O->>OS: cancelOrder(orderId)
        OS-->>O: CANCELLED
        O-->>U: failed
    end

实战设计:为什么“库存预留”比“直接扣减”更稳

很多项目一开始会这样设计:

  1. 创建订单
  2. 直接扣减库存
  3. 后续失败再加回来

这个方案不是不能用,但一到复杂业务就容易痛:

  • 如果失败恢复慢,库存短时间会被错误占用
  • 如果补偿失败,会出现“账不平”
  • 若支付、优惠券、履约再串进来,补偿路径会越来越复杂

更稳妥的方式通常是:

  1. 创建订单(PENDING
  2. 预留库存(available -> reserved
  3. 支付成功后确认库存(reserved -> consumed,本文简化为确认订单)
  4. 若失败或超时,释放库存(reserved -> available

这样设计的本质是把“业务承诺”和“资源占用”拆开,减少补偿成本。


数据模型设计

下面给一个简化但可运行的 SQLite 版本数据模型,便于你本地直接实验。

表设计要点

  • orders:订单主表
  • inventory:库存主表
  • inventory_reservation:库存预留记录,保证幂等和可追踪
  • saga_log:Saga 状态记录,用于恢复与排查

classDiagram
    class Order {
        +id: TEXT
        +product_id: TEXT
        +quantity: INTEGER
        +status: TEXT
        +created_at: TEXT
    }

    class Inventory {
        +product_id: TEXT
        +available: INTEGER
        +reserved: INTEGER
        +updated_at: TEXT
    }

    class InventoryReservation {
        +id: TEXT
        +saga_id: TEXT
        +order_id: TEXT
        +product_id: TEXT
        +quantity: INTEGER
        +status: TEXT
        +created_at: TEXT
    }

    class SagaLog {
        +saga_id: TEXT
        +state: TEXT
        +last_error: TEXT
        +updated_at: TEXT
    }

    Order --> InventoryReservation
    Inventory --> InventoryReservation
    SagaLog --> Order

实战代码(可运行)

下面我用 Python + SQLite + Flask 写一个最小可运行示例。
它不是生产级框架,但非常适合把 Saga 的骨架跑通。

你可以把下面代码保存为 app.py,安装 Flask 后直接运行。

运行方式

pip install flask
python app.py

启动后访问:

  • 初始化数据:POST /init
  • 下单:POST /orders
  • 查看库存:GET /inventory/p1
  • 查看订单:GET /orders/<order_id>
  • 查看 Saga:GET /saga/<saga_id>

完整示例代码

from flask import Flask, request, jsonify
import sqlite3
import uuid
from datetime import datetime

app = Flask(__name__)
DB_FILE = "saga_demo.db"


def now():
    return datetime.utcnow().isoformat()


def get_conn():
    conn = sqlite3.connect(DB_FILE)
    conn.row_factory = sqlite3.Row
    return conn


def init_db():
    conn = get_conn()
    cur = conn.cursor()

    cur.execute("""
    CREATE TABLE IF NOT EXISTS orders (
        id TEXT PRIMARY KEY,
        product_id TEXT NOT NULL,
        quantity INTEGER NOT NULL,
        status TEXT NOT NULL,
        created_at TEXT NOT NULL
    )
    """)

    cur.execute("""
    CREATE TABLE IF NOT EXISTS inventory (
        product_id TEXT PRIMARY KEY,
        available INTEGER NOT NULL,
        reserved INTEGER NOT NULL,
        updated_at TEXT NOT NULL
    )
    """)

    cur.execute("""
    CREATE TABLE IF NOT EXISTS inventory_reservation (
        id TEXT PRIMARY KEY,
        saga_id TEXT NOT NULL,
        order_id TEXT NOT NULL,
        product_id TEXT NOT NULL,
        quantity INTEGER NOT NULL,
        status TEXT NOT NULL,
        created_at TEXT NOT NULL,
        UNIQUE(saga_id, product_id)
    )
    """)

    cur.execute("""
    CREATE TABLE IF NOT EXISTS saga_log (
        saga_id TEXT PRIMARY KEY,
        state TEXT NOT NULL,
        last_error TEXT,
        updated_at TEXT NOT NULL
    )
    """)

    conn.commit()
    conn.close()


def set_saga_state(saga_id, state, last_error=None):
    conn = get_conn()
    cur = conn.cursor()
    cur.execute("""
    INSERT INTO saga_log (saga_id, state, last_error, updated_at)
    VALUES (?, ?, ?, ?)
    ON CONFLICT(saga_id) DO UPDATE SET
        state=excluded.state,
        last_error=excluded.last_error,
        updated_at=excluded.updated_at
    """, (saga_id, state, last_error, now()))
    conn.commit()
    conn.close()


# ---------- Order Service ----------
def create_order(order_id, product_id, quantity):
    conn = get_conn()
    cur = conn.cursor()
    cur.execute("""
    INSERT INTO orders (id, product_id, quantity, status, created_at)
    VALUES (?, ?, ?, ?, ?)
    """, (order_id, product_id, quantity, "PENDING", now()))
    conn.commit()
    conn.close()


def confirm_order(order_id):
    conn = get_conn()
    cur = conn.cursor()
    cur.execute("UPDATE orders SET status=? WHERE id=?", ("CONFIRMED", order_id))
    conn.commit()
    conn.close()


def cancel_order(order_id):
    conn = get_conn()
    cur = conn.cursor()
    cur.execute("UPDATE orders SET status=? WHERE id=?", ("CANCELLED", order_id))
    conn.commit()
    conn.close()


def get_order(order_id):
    conn = get_conn()
    cur = conn.cursor()
    cur.execute("SELECT * FROM orders WHERE id=?", (order_id,))
    row = cur.fetchone()
    conn.close()
    return dict(row) if row else None


# ---------- Inventory Service ----------
def reserve_inventory(saga_id, order_id, product_id, quantity):
    conn = get_conn()
    cur = conn.cursor()

    try:
        cur.execute("BEGIN")

        # 幂等检查:相同 saga_id + product_id 已预留则直接返回成功
        cur.execute("""
        SELECT * FROM inventory_reservation
        WHERE saga_id=? AND product_id=?
        """, (saga_id, product_id))
        existing = cur.fetchone()
        if existing:
            if existing["status"] == "RESERVED":
                conn.commit()
                return True, "already reserved"
            elif existing["status"] == "RELEASED":
                conn.rollback()
                return False, "already released"

        cur.execute("SELECT * FROM inventory WHERE product_id=?", (product_id,))
        inv = cur.fetchone()
        if not inv:
            conn.rollback()
            return False, "inventory not found"

        if inv["available"] < quantity:
            conn.rollback()
            return False, "insufficient stock"

        cur.execute("""
        UPDATE inventory
        SET available = available - ?,
            reserved = reserved + ?,
            updated_at = ?
        WHERE product_id = ? AND available >= ?
        """, (quantity, quantity, now(), product_id, quantity))

        if cur.rowcount == 0:
            conn.rollback()
            return False, "concurrent stock update failed"

        reservation_id = str(uuid.uuid4())
        cur.execute("""
        INSERT INTO inventory_reservation
        (id, saga_id, order_id, product_id, quantity, status, created_at)
        VALUES (?, ?, ?, ?, ?, ?, ?)
        """, (reservation_id, saga_id, order_id, product_id, quantity, "RESERVED", now()))

        conn.commit()
        return True, "reserved"
    except Exception as e:
        conn.rollback()
        return False, str(e)
    finally:
        conn.close()


def release_inventory(saga_id, product_id):
    conn = get_conn()
    cur = conn.cursor()

    try:
        cur.execute("BEGIN")

        cur.execute("""
        SELECT * FROM inventory_reservation
        WHERE saga_id=? AND product_id=?
        """, (saga_id, product_id))
        res = cur.fetchone()

        if not res:
            conn.commit()
            return True, "nothing to release"

        if res["status"] == "RELEASED":
            conn.commit()
            return True, "already released"

        qty = res["quantity"]

        cur.execute("""
        UPDATE inventory
        SET available = available + ?,
            reserved = reserved - ?,
            updated_at = ?
        WHERE product_id = ?
        """, (qty, qty, now(), product_id))

        cur.execute("""
        UPDATE inventory_reservation
        SET status = ?
        WHERE id = ?
        """, ("RELEASED", res["id"]))

        conn.commit()
        return True, "released"
    except Exception as e:
        conn.rollback()
        return False, str(e)
    finally:
        conn.close()


def get_inventory(product_id):
    conn = get_conn()
    cur = conn.cursor()
    cur.execute("SELECT * FROM inventory WHERE product_id=?", (product_id,))
    row = cur.fetchone()
    conn.close()
    return dict(row) if row else None


# ---------- Saga Orchestrator ----------
def create_order_saga(product_id, quantity):
    saga_id = str(uuid.uuid4())
    order_id = str(uuid.uuid4())

    try:
        set_saga_state(saga_id, "STARTED")

        create_order(order_id, product_id, quantity)
        set_saga_state(saga_id, "ORDER_CREATED")

        ok, msg = reserve_inventory(saga_id, order_id, product_id, quantity)
        if not ok:
            set_saga_state(saga_id, "COMPENSATING", msg)
            cancel_order(order_id)
            set_saga_state(saga_id, "COMPENSATED", msg)
            return {
                "success": False,
                "saga_id": saga_id,
                "order_id": order_id,
                "message": f"reserve inventory failed: {msg}"
            }

        set_saga_state(saga_id, "INVENTORY_RESERVED")

        confirm_order(order_id)
        set_saga_state(saga_id, "COMPLETED")

        return {
            "success": True,
            "saga_id": saga_id,
            "order_id": order_id,
            "message": "order created and inventory reserved"
        }

    except Exception as e:
        set_saga_state(saga_id, "FAILED", str(e))
        # 尝试补偿
        release_inventory(saga_id, product_id)
        cancel_order(order_id)
        set_saga_state(saga_id, "COMPENSATED", str(e))

        return {
            "success": False,
            "saga_id": saga_id,
            "order_id": order_id,
            "message": f"saga failed: {str(e)}"
        }


# ---------- HTTP APIs ----------
@app.route("/init", methods=["POST"])
def init_data():
    init_db()
    conn = get_conn()
    cur = conn.cursor()
    cur.execute("DELETE FROM inventory")
    cur.execute("DELETE FROM orders")
    cur.execute("DELETE FROM inventory_reservation")
    cur.execute("DELETE FROM saga_log")
    cur.execute("""
    INSERT INTO inventory (product_id, available, reserved, updated_at)
    VALUES (?, ?, ?, ?)
    """, ("p1", 10, 0, now()))
    conn.commit()
    conn.close()
    return jsonify({"message": "initialized", "inventory": get_inventory("p1")})


@app.route("/orders", methods=["POST"])
def create_order_api():
    data = request.json or {}
    product_id = data.get("product_id", "p1")
    quantity = int(data.get("quantity", 1))
    result = create_order_saga(product_id, quantity)
    return jsonify(result)


@app.route("/orders/<order_id>", methods=["GET"])
def get_order_api(order_id):
    order = get_order(order_id)
    if not order:
        return jsonify({"message": "not found"}), 404
    return jsonify(order)


@app.route("/inventory/<product_id>", methods=["GET"])
def get_inventory_api(product_id):
    inv = get_inventory(product_id)
    if not inv:
        return jsonify({"message": "not found"}), 404
    return jsonify(inv)


@app.route("/saga/<saga_id>", methods=["GET"])
def get_saga_api(saga_id):
    conn = get_conn()
    cur = conn.cursor()
    cur.execute("SELECT * FROM saga_log WHERE saga_id=?", (saga_id,))
    row = cur.fetchone()
    conn.close()
    if not row:
        return jsonify({"message": "not found"}), 404
    return jsonify(dict(row))


if __name__ == "__main__":
    init_db()
    app.run(debug=True, port=5000)

如何验证这段代码

1. 初始化库存

curl -X POST http://127.0.0.1:5000/init

期望返回:

{
  "message": "initialized",
  "inventory": {
    "product_id": "p1",
    "available": 10,
    "reserved": 0,
    "updated_at": "..."
  }
}

2. 下一个正常订单

curl -X POST http://127.0.0.1:5000/orders \
  -H "Content-Type: application/json" \
  -d '{"product_id":"p1","quantity":3}'

成功后:

  • 订单状态应为 CONFIRMED
  • 库存变成:
    • available = 7
    • reserved = 3

这里为了简化演示,我把“预留成功后立即确认订单”串成一条链了。
真实项目中,往往还会接支付服务,支付成功后再将预留转正式消耗。

3. 下一个超库存订单

curl -X POST http://127.0.0.1:5000/orders \
  -H "Content-Type: application/json" \
  -d '{"product_id":"p1","quantity":100}'

此时应该看到:

  • 订单被创建后又取消
  • Saga 状态进入 COMPENSATED
  • 库存不发生错误扣减

这段代码里最关键的几个设计点

1. 每个服务只做本地事务

订单创建、库存预留、库存释放,都是各自数据库里的本地事务。
这正是 Saga 与 XA 最大的不同:没有全局锁,没有分布式两阶段提交

2. 补偿动作必须显式存在

Saga 不是“失败了自动回滚”,而是:

  • 创建订单 ↔ 取消订单
  • 预留库存 ↔ 释放库存

也就是说,每一步正向动作都要能找到对应补偿动作
如果某一步天然不可补偿,比如“短信已发出”“第三方接口已经不可逆扣款”,那就不能简单套 Saga,需要重新拆业务边界。

3. 幂等是生死线

我见过很多 Saga 设计看起来流程没问题,但线上一重试就乱。
原因往往是没做好幂等。

在上面的示例中,库存预留用了这个唯一约束:

UNIQUE(saga_id, product_id)

它的作用是:

  • 同一个 Saga 重试时,不会重复预留
  • 网络超时导致客户端重复请求时,也有机会兜住

生产环境里,通常还会引入:

  • request_id
  • biz_key
  • 消息消费去重表
  • 状态机版本号

4. 状态机比“if-else”更靠谱

简单 demo 用 if-else 可以,但业务一复杂就建议你把 Saga 状态建成显式状态机。
因为状态机能清楚约束:

  • 允许从哪个状态转到哪个状态
  • 补偿只能在什么条件下执行
  • 重试时能不能跳过已完成步骤

Saga 状态流转建议

stateDiagram-v2
    [*] --> STARTED
    STARTED --> ORDER_CREATED
    ORDER_CREATED --> INVENTORY_RESERVED
    INVENTORY_RESERVED --> COMPLETED

    ORDER_CREATED --> COMPENSATING
    INVENTORY_RESERVED --> COMPENSATING
    STARTED --> FAILED

    COMPENSATING --> COMPENSATED
    FAILED --> COMPENSATED

常见坑与排查

这一部分非常重要。真正让 Saga 难落地的,往往不是“流程设计”,而是这些边角问题。

坑 1:把“补偿”理解成数据库回滚

这是最常见的误区。
Saga 的补偿不是数据库自动回滚,而是一个新的业务动作。

比如:

  • 创建订单成功了,补偿是“取消订单”
  • 扣优惠券成功了,补偿可能是“返还优惠券”
  • 调用外部营销接口成功了,补偿可能是“发送撤销请求”

补偿并不一定能恢复到绝对原始状态,只能恢复到“业务可接受状态”。

排查建议

当你发现“补偿后数据还是对不上”时,优先检查:

  • 是否真的定义了补偿动作
  • 补偿动作是否允许重复执行
  • 补偿动作是否覆盖所有中间态

坑 2:库存接口没做幂等,重试导致重复扣减

典型场景:

  • Orchestrator 调库存服务超时
  • 实际上库存已经预留成功
  • Orchestrator 重试一次
  • 如果库存接口没幂等,就重复扣减了

排查建议

重点查:

  • 是否基于 saga_id / request_id 做唯一约束
  • 是否有消费幂等表
  • 是否把“超时”误判成“失败未执行”

坑 3:补偿顺序反了

Saga 的补偿通常要逆序执行
比如你先创建订单,再预留库存,那么失败时通常先释放库存,再取消订单,还是先取消订单再释放库存,要看你的业务语义。

在订单-库存场景里,一般两种都可行,但要统一约束。
如果流程更长,比如:

  1. 创建订单
  2. 预留库存
  3. 锁定优惠券
  4. 创建配送单

那补偿通常要按 4 → 3 → 2 → 1 的逆序来。

排查建议

如果线上出现“部分资源释放了,部分没释放”,检查:

  • 补偿链路是否按固定顺序执行
  • 是否某个补偿动作失败后直接中断了后续补偿
  • 是否有补偿任务扫描与重试机制

坑 4:只有流程,没有恢复机制

很多系统只写了同步调用链路,没有恢复任务。
但现实里一定会遇到:

  • 服务临时不可用
  • MQ 积压
  • 数据库锁冲突
  • 网络分区
  • 补偿执行到一半进程崩了

所以 Saga 不能只靠请求线程“当场处理完”,还要有异步恢复能力

排查建议

至少准备这些能力:

  • 定时扫描 STARTEDCOMPENSATINGFAILED 等异常状态
  • 根据状态机继续推进或补偿
  • 记录最后一次错误原因与重试次数
  • 人工可干预的管理后台

坑 5:高并发下超卖

如果库存扣减只是“先查再改”,很容易超卖。
正确方式是:

  • 用原子 SQL 更新
  • WHERE available >= ? 条件下扣减
  • 通过 rowcount 判断是否成功

上面的示例代码就是这么做的:

UPDATE inventory
SET available = available - ?,
    reserved = reserved + ?
WHERE product_id = ? AND available >= ?

排查建议

如果发现库存出现负数或超卖:

  • 检查是不是用了非原子读写
  • 检查事务隔离级别
  • 检查是否多个实例绕过了同一条约束逻辑
  • 检查缓存库存与数据库库存是否双写不一致

安全/性能最佳实践

分布式事务不只是“对不对”,还涉及“扛不扛得住”。

安全最佳实践

1. 接口幂等令牌要防重放

如果下单接口直接暴露给外部调用方,建议引入:

  • 请求幂等键 Idempotency-Key
  • 签名校验
  • 时间戳与过期机制

否则恶意重放请求可能不断触发 Saga。

2. 补偿接口不能对外裸露

像“取消订单”“释放库存”这种补偿动作,最好不要直接暴露为公网开放接口。
即便要开放,也应该:

  • 做服务间鉴权
  • 校验来源身份
  • 记录审计日志

3. Saga 日志要可审计

至少记录:

  • saga_id
  • 订单号
  • 业务主键
  • 每一步开始/结束时间
  • 错误码
  • 重试次数
  • 操作来源

这在排查“到底是没执行,还是执行了但超时”时特别关键。


性能最佳实践

1. 缩短本地事务时间

库存预留的本地事务里,不要干这些事:

  • 调远程接口
  • 查一堆无关数据
  • 写大量日志
  • 做复杂业务计算

本地事务越短,锁冲突越少。

2. 预留库存表要建好索引

至少给这些字段建索引:

CREATE INDEX IF NOT EXISTS idx_inventory_reservation_saga
ON inventory_reservation(saga_id);

CREATE INDEX IF NOT EXISTS idx_inventory_reservation_order
ON inventory_reservation(order_id);

CREATE INDEX IF NOT EXISTS idx_saga_log_state
ON saga_log(state);

这样扫描未完成 Saga、按订单排查、按 Saga 恢复时才不会拖垮库。

3. 把同步主链路与异步恢复链路拆开

主链路关注:

  • 快速返回成功/失败
  • 保证关键步骤原子性

恢复链路关注:

  • 补偿
  • 超时扫描
  • 死信处理
  • 人工重放

不要把所有异常都在用户请求线程里“死磕到底”。

4. 容量估算要看“在途事务”

Saga 容量不是只看 QPS,还要看在途事务数
一个简单估算公式:

在途 Saga 数 ≈ 峰值 TPS × 平均完成时长(秒)

比如:

  • 峰值 800 TPS
  • 平均 2 秒完成

那在途 Saga 大约就是 1600 个。

这会影响:

  • Saga 状态表大小
  • 定时扫描频率
  • 补偿任务堆积
  • 监控告警阈值

生产落地建议:别把 Demo 直接搬上线

上面的代码用于讲原理和做本地验证很合适,但生产环境还需要补全很多东西。

推荐补齐的能力

1. Outbox / 可靠消息

如果你的订单确认、支付结果、库存释放依赖 MQ,建议使用:

  • 本地事务 + Outbox
  • 事务消息
  • 消费幂等表

否则很容易出现:

  • 数据库提交了,但消息没发出去
  • 消息发了,但本地状态没落库

2. 分布式追踪

建议把 saga_id 作为贯穿全链路的 trace 维度之一,打进:

  • 日志 MDC
  • Trace Span Tag
  • 指标标签

这样排查时能从一次下单直接串到库存、支付、补偿全过程。

3. 人工干预后台

不要幻想所有 Saga 都能自动恢复。
线上总会碰到边界场景,需要运营或研发人工处理,所以后台至少要支持:

  • saga_id 查询
  • 查看失败步骤
  • 手动重试
  • 手动补偿
  • 标记忽略/结束

4. 超时机制

库存预留不能无限期占着。
通常会设置:

  • 订单待支付超时 15 分钟
  • 超时后自动取消订单
  • 同步释放预留库存

这也是库存一致性设计里很容易被漏掉的一步。


一个实战判断标准:什么时候适合 Saga,什么时候不适合

适合 Saga 的场景

  • 订单、库存、优惠券、物流等业务链路
  • 允许最终一致
  • 每一步都能定义补偿动作
  • 可以接受短暂中间态

不太适合 Saga 的场景

  • 银行转账类极强一致场景
  • 不可逆外部动作太多
  • 无法设计幂等补偿
  • 业务方不能接受任何中间不一致

如果你发现一个流程里有多个步骤都“不可补偿”,那就别硬套 Saga。
这时更适合考虑:

  • TCC
  • 强一致资源中心
  • 重新划分服务边界
  • 降低跨服务事务范围

总结

对于微服务里的“订单与库存一致性”,Saga 之所以实用,不是因为它完美,而是因为它在复杂度、性能和一致性之间做了一个现实的平衡:

  • 不依赖全局事务
  • 通过本地事务 + 补偿保证最终一致
  • 通过幂等、状态机、恢复任务扛住重试和失败

如果你准备在项目里真正落地,我建议按这个顺序推进:

  1. 先把业务状态建清楚:订单、库存、Saga 各有哪些状态
  2. 定义每个正向动作的补偿动作
  3. 所有跨服务步骤都做幂等
  4. 库存操作必须原子更新,避免超卖
  5. 引入 Saga 日志和异常恢复任务
  6. 上线前压测高并发与补偿重试场景

最后给一个很实际的结论:

Saga 不是“分布式事务银弹”,但对订单与库存这类场景,它往往是最能跑、最能维护、也最容易扩展的方案之一。

如果你的链路还会接入支付、优惠券、履约,我建议继续沿着“预留资源 + 显式补偿 + 状态机驱动”这个方向扩展,而不是重新回到重型全局事务。这样系统会更稳,排障也会轻松很多。


分享到:

下一篇
《Web逆向实战:中级开发者如何定位并复现前端签名算法实现接口自动化调用》