跳转到内容
123xiao | 无名键客

《分布式架构中服务拆分后的事务一致性实战:基于 Saga、Outbox 与幂等设计的落地方案》

字数: 0 阅读时长: 1 分钟

背景与问题

单体时代,我们习惯了一个本地事务包打天下:

BEGIN;
UPDATE account SET balance = balance - 100 WHERE user_id = 1;
INSERT INTO orders(order_id, user_id, amount) VALUES('O1001', 1, 100);
COMMIT;

一旦服务拆分成订单、库存、支付、账户几个微服务,这种舒服的日子基本就结束了。
你会很快遇到这些问题:

  • 订单创建成功了,但库存扣减失败
  • 库存扣了,支付超时了
  • 消息发出去了,但业务库事务回滚了
  • 消费端因为重试把同一笔钱扣了两次
  • 补偿逻辑写得不完整,数据“看起来都成功”,但实际账对不上

很多团队第一反应是:能不能上分布式事务框架,一把梭?
理论上可以,但真实生产里,2PC/XA 往往有这些现实问题:

  • 对数据库和中间件要求高
  • 性能抖动明显
  • 长事务导致锁持有时间变长
  • 跨团队、跨语言、跨存储实现困难

所以在大多数互联网业务里,更常见的落地方式是:

  1. Saga:把大事务拆成一组本地事务 + 补偿动作
  2. Outbox:保证“业务数据更新”和“事件投递”尽量原子
  3. 幂等设计:接受消息重复、超时重试、网络抖动,把结果做对

这篇文章我会从一个常见的下单场景出发,把这三件事串起来讲清楚,并给出一套能跑起来的示例。


一个典型场景:下单链路怎么保证一致性

假设下单流程如下:

  1. 订单服务创建订单
  2. 库存服务扣减库存
  3. 支付服务完成扣款
  4. 订单服务将状态改为 CONFIRMED

如果中途失败,要做补偿:

  • 库存已扣但支付失败,则回补库存
  • 订单已创建但最终失败,则订单取消

这不是“所有服务同时成功”的强一致,而是最终一致性

flowchart LR
    A[用户提交订单] --> B[订单服务创建 PENDING 订单]
    B --> C[Outbox 写入 OrderCreated 事件]
    C --> D[消息投递器发送事件]
    D --> E[库存服务扣减库存]
    E --> F[支付服务执行扣款]
    F --> G[订单服务确认订单]
    F -->|失败| H[触发补偿]
    H --> I[库存回补]
    I --> J[订单取消]

核心原理

1. Saga:把大事务拆成一段段本地事务

Saga 的核心思想很朴素:
每个服务只管自己的本地事务,失败时执行相反方向的补偿动作。

比如下单 Saga:

  • T1:订单服务创建订单
  • T2:库存服务扣减库存
  • T3:支付服务扣款
  • T4:订单服务确认订单

对应补偿:

  • C2:库存回补
  • C1:订单取消

注意,补偿不是“数据库回滚”,而是新的业务动作
这点特别重要。我见过有人把补偿理解成把数据库改回去,结果一旦中间插入了别的业务行为,逻辑就全乱了。

编排式 vs 协同式

Saga 常见两种风格:

  • 编排式(Orchestration):由一个协调者控制流程
  • 协同式(Choreography):服务靠事件彼此驱动

如果团队刚开始做分布式事务,我一般更建议先用编排式,因为:

  • 流程可视化更好
  • 排障路径更清晰
  • 更适合复杂补偿

协同式在简单流程里很轻,但事件多了以后,容易出现“谁触发了谁,已经没人说得清”的情况。

sequenceDiagram
    participant U as 用户
    participant O as 订单服务
    participant X as Outbox投递器
    participant S as 库存服务
    participant P as 支付服务

    U->>O: 创建订单
    O->>O: 本地事务:订单=PENDING + Outbox事件
    O-->>U: 返回受理成功
    X->>S: 投递 OrderCreated
    S->>S: 幂等扣库存
    S-->>X: InventoryReserved
    X->>P: 投递 InventoryReserved
    P->>P: 幂等扣款
    alt 支付成功
        P-->>X: PaymentSucceeded
        X->>O: 更新订单为 CONFIRMED
    else 支付失败
        P-->>X: PaymentFailed
        X->>S: 触发库存补偿
        X->>O: 更新订单为 CANCELED
    end

2. Outbox:解决“数据库成功了,但消息没发出去”

这是微服务里最常见、也最隐蔽的一类问题。

错误写法通常是:

  1. 先提交订单
  2. 再调用 MQ 发送消息

如果在第 2 步宕机,就会发生:

  • 订单已经存在
  • 但库存服务永远收不到事件

Outbox 模式的做法是:

  • 同一个本地事务
    • 写业务表
    • 写 outbox 事件表
  • 事务提交后,由后台任务扫描 outbox 表并投递到 MQ
  • 投递成功后更新 outbox 状态

这样能保证:
只要业务数据提交成功,事件就一定有机会被发送出去。

Outbox 的关键点

  • 事件表和业务表必须在同一个库、同一个事务内
  • 投递器必须可重试
  • 消息发送可能重复,消费者必须幂等
  • 事件要带唯一 ID 和业务键

3. 幂等设计:不要假设消息只来一次

在分布式系统里,“只投递一次”几乎都是理想状态。
现实通常是“至少一次”。

所以消费端一定要实现幂等。常见办法:

基于业务唯一键幂等

例如扣减库存时,使用 order_id + sku_id 作为唯一约束。

基于消息 ID 幂等

每条消息带 event_id,消费时先记录处理结果:

  • 没处理过:执行业务逻辑
  • 已处理过:直接返回成功

状态机幂等

订单状态从:

  • PENDING -> CONFIRMED
  • PENDING -> CANCELED

如果已经是 CONFIRMED,再来一次成功消息,也不能重复变更。

stateDiagram-v2
    [*] --> PENDING
    PENDING --> CONFIRMED: PaymentSucceeded
    PENDING --> CANCELED: PaymentFailed / Timeout
    CONFIRMED --> [*]
    CANCELED --> [*]

方案对比与取舍分析

在真正落地前,先把几个常见方案摆在一起看,会更清楚边界。

方案一致性性能实现复杂度适用场景
本地事务强一致单体、单库
XA/2PC强一致中低金融级少量核心链路
TCC强一致倾向很高业务可预留资源、模型稳定
Saga + Outbox + 幂等最终一致大多数互联网业务
纯消息最终一致最终一致中低允许短时不一致的异步流程

我的经验是:

  • 高频主链路:优先 Saga + Outbox + 幂等
  • 资金强约束场景:谨慎评估 TCC 或账户账本模型
  • 跨团队复杂协作:尽量避免要求所有服务都支持 XA

落地设计:一个可运行的简化实现

下面我们用 Python + SQLite 做一个单机版示例,模拟:

  • 订单创建
  • Outbox 写事件
  • 后台投递
  • 库存服务幂等消费
  • 支付失败后的补偿

这个示例不是完整生产代码,但逻辑是可以跑通的,便于理解核心机制。


实战代码(可运行)

1. 数据表设计

CREATE TABLE IF NOT EXISTS orders (
    order_id TEXT PRIMARY KEY,
    user_id TEXT NOT NULL,
    amount INTEGER NOT NULL,
    status TEXT NOT NULL,
    created_at DATETIME DEFAULT CURRENT_TIMESTAMP
);

CREATE TABLE IF NOT EXISTS inventory (
    sku_id TEXT PRIMARY KEY,
    stock INTEGER NOT NULL
);

CREATE TABLE IF NOT EXISTS inventory_reservations (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    order_id TEXT NOT NULL,
    sku_id TEXT NOT NULL,
    quantity INTEGER NOT NULL,
    status TEXT NOT NULL,
    UNIQUE(order_id, sku_id)
);

CREATE TABLE IF NOT EXISTS outbox_events (
    event_id TEXT PRIMARY KEY,
    aggregate_id TEXT NOT NULL,
    event_type TEXT NOT NULL,
    payload TEXT NOT NULL,
    status TEXT NOT NULL DEFAULT 'NEW',
    retry_count INTEGER NOT NULL DEFAULT 0,
    created_at DATETIME DEFAULT CURRENT_TIMESTAMP
);

CREATE TABLE IF NOT EXISTS processed_messages (
    consumer_name TEXT NOT NULL,
    event_id TEXT NOT NULL,
    processed_at DATETIME DEFAULT CURRENT_TIMESTAMP,
    PRIMARY KEY (consumer_name, event_id)
);

2. Python 示例代码

把下面代码保存为 saga_outbox_demo.py,直接运行即可。

import sqlite3
import json
import uuid
from contextlib import contextmanager

DB = "demo.db"


@contextmanager
def get_conn():
    conn = sqlite3.connect(DB)
    conn.row_factory = sqlite3.Row
    try:
        yield conn
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        conn.close()


def init_db():
    with get_conn() as conn:
        conn.executescript("""
        CREATE TABLE IF NOT EXISTS orders (
            order_id TEXT PRIMARY KEY,
            user_id TEXT NOT NULL,
            amount INTEGER NOT NULL,
            status TEXT NOT NULL,
            created_at DATETIME DEFAULT CURRENT_TIMESTAMP
        );

        CREATE TABLE IF NOT EXISTS inventory (
            sku_id TEXT PRIMARY KEY,
            stock INTEGER NOT NULL
        );

        CREATE TABLE IF NOT EXISTS inventory_reservations (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            order_id TEXT NOT NULL,
            sku_id TEXT NOT NULL,
            quantity INTEGER NOT NULL,
            status TEXT NOT NULL,
            UNIQUE(order_id, sku_id)
        );

        CREATE TABLE IF NOT EXISTS outbox_events (
            event_id TEXT PRIMARY KEY,
            aggregate_id TEXT NOT NULL,
            event_type TEXT NOT NULL,
            payload TEXT NOT NULL,
            status TEXT NOT NULL DEFAULT 'NEW',
            retry_count INTEGER NOT NULL DEFAULT 0,
            created_at DATETIME DEFAULT CURRENT_TIMESTAMP
        );

        CREATE TABLE IF NOT EXISTS processed_messages (
            consumer_name TEXT NOT NULL,
            event_id TEXT NOT NULL,
            processed_at DATETIME DEFAULT CURRENT_TIMESTAMP,
            PRIMARY KEY (consumer_name, event_id)
        );
        """)

        conn.execute("INSERT OR IGNORE INTO inventory(sku_id, stock) VALUES(?, ?)", ("SKU-1", 10))


def new_event(aggregate_id, event_type, payload):
    return {
        "event_id": str(uuid.uuid4()),
        "aggregate_id": aggregate_id,
        "event_type": event_type,
        "payload": json.dumps(payload, ensure_ascii=False)
    }


def create_order(order_id, user_id, amount, sku_id, quantity):
    event = new_event(order_id, "OrderCreated", {
        "order_id": order_id,
        "user_id": user_id,
        "amount": amount,
        "sku_id": sku_id,
        "quantity": quantity
    })

    with get_conn() as conn:
        conn.execute(
            "INSERT INTO orders(order_id, user_id, amount, status) VALUES (?, ?, ?, ?)",
            (order_id, user_id, amount, "PENDING")
        )
        conn.execute(
            "INSERT INTO outbox_events(event_id, aggregate_id, event_type, payload, status) VALUES (?, ?, ?, ?, 'NEW')",
            (event["event_id"], event["aggregate_id"], event["event_type"], event["payload"])
        )
    print(f"[订单服务] 创建订单成功: {order_id}")


def mark_message_processed(conn, consumer_name, event_id):
    conn.execute(
        "INSERT INTO processed_messages(consumer_name, event_id) VALUES (?, ?)",
        (consumer_name, event_id)
    )


def is_message_processed(conn, consumer_name, event_id):
    row = conn.execute(
        "SELECT 1 FROM processed_messages WHERE consumer_name=? AND event_id=?",
        (consumer_name, event_id)
    ).fetchone()
    return row is not None


def inventory_consume_order_created(event):
    consumer_name = "inventory_service"
    payload = json.loads(event["payload"])
    order_id = payload["order_id"]
    sku_id = payload["sku_id"]
    quantity = payload["quantity"]

    with get_conn() as conn:
        if is_message_processed(conn, consumer_name, event["event_id"]):
            print(f"[库存服务] 幂等命中,忽略重复消息: {event['event_id']}")
            return

        reservation = conn.execute(
            "SELECT * FROM inventory_reservations WHERE order_id=? AND sku_id=?",
            (order_id, sku_id)
        ).fetchone()
        if reservation:
            mark_message_processed(conn, consumer_name, event["event_id"])
            print(f"[库存服务] 业务幂等命中,订单已预留库存: {order_id}")
            return

        stock_row = conn.execute(
            "SELECT stock FROM inventory WHERE sku_id=?",
            (sku_id,)
        ).fetchone()

        if not stock_row or stock_row["stock"] < quantity:
            fail_event = new_event(order_id, "InventoryFailed", {
                "order_id": order_id,
                "reason": "库存不足"
            })
            conn.execute(
                "INSERT INTO outbox_events(event_id, aggregate_id, event_type, payload, status) VALUES (?, ?, ?, ?, 'NEW')",
                (fail_event["event_id"], fail_event["aggregate_id"], fail_event["event_type"], fail_event["payload"])
            )
            mark_message_processed(conn, consumer_name, event["event_id"])
            print(f"[库存服务] 库存不足: {order_id}")
            return

        conn.execute(
            "UPDATE inventory SET stock = stock - ? WHERE sku_id=?",
            (quantity, sku_id)
        )
        conn.execute(
            "INSERT INTO inventory_reservations(order_id, sku_id, quantity, status) VALUES (?, ?, ?, ?)",
            (order_id, sku_id, quantity, "RESERVED")
        )

        ok_event = new_event(order_id, "InventoryReserved", {
            "order_id": order_id,
            "sku_id": sku_id,
            "quantity": quantity
        })
        conn.execute(
            "INSERT INTO outbox_events(event_id, aggregate_id, event_type, payload, status) VALUES (?, ?, ?, ?, 'NEW')",
            (ok_event["event_id"], ok_event["aggregate_id"], ok_event["event_type"], ok_event["payload"])
        )

        mark_message_processed(conn, consumer_name, event["event_id"])
        print(f"[库存服务] 库存预留成功: {order_id}")


def payment_consume_inventory_reserved(event, should_fail=True):
    payload = json.loads(event["payload"])
    order_id = payload["order_id"]

    with get_conn() as conn:
        if should_fail:
            fail_event = new_event(order_id, "PaymentFailed", {
                "order_id": order_id,
                "reason": "支付超时"
            })
            conn.execute(
                "INSERT INTO outbox_events(event_id, aggregate_id, event_type, payload, status) VALUES (?, ?, ?, ?, 'NEW')",
                (fail_event["event_id"], fail_event["aggregate_id"], fail_event["event_type"], fail_event["payload"])
            )
            print(f"[支付服务] 支付失败: {order_id}")
        else:
            ok_event = new_event(order_id, "PaymentSucceeded", {
                "order_id": order_id
            })
            conn.execute(
                "INSERT INTO outbox_events(event_id, aggregate_id, event_type, payload, status) VALUES (?, ?, ?, ?, 'NEW')",
                (ok_event["event_id"], ok_event["aggregate_id"], ok_event["event_type"], ok_event["payload"])
            )
            print(f"[支付服务] 支付成功: {order_id}")


def order_consume_payment_succeeded(event):
    payload = json.loads(event["payload"])
    order_id = payload["order_id"]
    with get_conn() as conn:
        conn.execute(
            "UPDATE orders SET status='CONFIRMED' WHERE order_id=? AND status='PENDING'",
            (order_id,)
        )
        print(f"[订单服务] 订单确认: {order_id}")


def order_consume_payment_failed(event):
    payload = json.loads(event["payload"])
    order_id = payload["order_id"]

    with get_conn() as conn:
        reservation = conn.execute(
            "SELECT * FROM inventory_reservations WHERE order_id=? AND status='RESERVED'",
            (order_id,)
        ).fetchone()

        if reservation:
            compensate_event = new_event(order_id, "ReleaseInventory", {
                "order_id": order_id,
                "sku_id": reservation["sku_id"],
                "quantity": reservation["quantity"]
            })
            conn.execute(
                "INSERT INTO outbox_events(event_id, aggregate_id, event_type, payload, status) VALUES (?, ?, ?, ?, 'NEW')",
                (compensate_event["event_id"], compensate_event["aggregate_id"], compensate_event["event_type"], compensate_event["payload"])
            )

        conn.execute(
            "UPDATE orders SET status='CANCELED' WHERE order_id=? AND status='PENDING'",
            (order_id,)
        )
        print(f"[订单服务] 订单取消: {order_id}")


def inventory_consume_release(event):
    payload = json.loads(event["payload"])
    order_id = payload["order_id"]
    sku_id = payload["sku_id"]
    quantity = payload["quantity"]

    with get_conn() as conn:
        reservation = conn.execute(
            "SELECT * FROM inventory_reservations WHERE order_id=? AND sku_id=?",
            (order_id, sku_id)
        ).fetchone()

        if not reservation:
            print(f"[库存服务] 无需回补,预留记录不存在: {order_id}")
            return

        if reservation["status"] == "RELEASED":
            print(f"[库存服务] 幂等命中,库存已回补: {order_id}")
            return

        conn.execute(
            "UPDATE inventory SET stock = stock + ? WHERE sku_id=?",
            (quantity, sku_id)
        )
        conn.execute(
            "UPDATE inventory_reservations SET status='RELEASED' WHERE order_id=? AND sku_id=?",
            (order_id, sku_id)
        )
        print(f"[库存服务] 库存回补成功: {order_id}")


def dispatch_event(event):
    et = event["event_type"]
    if et == "OrderCreated":
        inventory_consume_order_created(event)
    elif et == "InventoryReserved":
        payment_consume_inventory_reserved(event, should_fail=True)
    elif et == "PaymentSucceeded":
        order_consume_payment_succeeded(event)
    elif et == "PaymentFailed":
        order_consume_payment_failed(event)
    elif et == "ReleaseInventory":
        inventory_consume_release(event)
    elif et == "InventoryFailed":
        print(f"[系统] 订单失败,原因:库存不足, order_id={json.loads(event['payload'])['order_id']}")
    else:
        print(f"[系统] 未识别事件类型: {et}")


def outbox_publisher_once():
    with get_conn() as conn:
        events = conn.execute(
            "SELECT * FROM outbox_events WHERE status='NEW' ORDER BY created_at LIMIT 20"
        ).fetchall()

    for event in events:
        try:
            dispatch_event(event)
            with get_conn() as conn:
                conn.execute(
                    "UPDATE outbox_events SET status='SENT' WHERE event_id=?",
                    (event["event_id"],)
                )
        except Exception as e:
            with get_conn() as conn:
                conn.execute(
                    "UPDATE outbox_events SET retry_count = retry_count + 1 WHERE event_id=?",
                    (event["event_id"],)
                )
            print(f"[投递器] 处理失败 event_id={event['event_id']}, error={e}")


def print_state():
    with get_conn() as conn:
        orders = conn.execute("SELECT * FROM orders").fetchall()
        inventory = conn.execute("SELECT * FROM inventory").fetchall()
        reservations = conn.execute("SELECT * FROM inventory_reservations").fetchall()

    print("\n=== 当前状态 ===")
    print("orders:")
    for r in orders:
        print(dict(r))
    print("inventory:")
    for r in inventory:
        print(dict(r))
    print("inventory_reservations:")
    for r in reservations:
        print(dict(r))
    print("================\n")


if __name__ == "__main__":
    init_db()
    create_order("ORD-1001", "U-1", 100, "SKU-1", 2)

    for _ in range(5):
        outbox_publisher_once()

    print_state()

3. 运行结果你会看到什么

上面的代码里,我故意让支付阶段失败:

payment_consume_inventory_reserved(event, should_fail=True)

所以最终效果应该是:

  • 订单先创建为 PENDING
  • 库存预留成功
  • 支付失败
  • 订单变为 CANCELED
  • 库存被回补

这就是一个最小可运行的 Saga + Outbox + 幂等的闭环。


关键设计说明

1. 为什么订单先返回成功,而不是等所有步骤完成?

因为跨服务链路可能很长,支付和库存也可能有重试。
如果同步等到底:

  • 用户响应时间很难控制
  • 调用链耦合严重
  • 某个下游慢就把整条链路拖垮

更常见做法是:

  • 先返回“受理中”
  • 前端轮询或订阅订单状态
  • 后台完成 Saga

2. 为什么一定要接受“重复消息”?

因为这些情况永远存在:

  • MQ 投递后 ack 丢了
  • 消费者处理成功,但回执超时
  • 投递器宕机重启重新扫描
  • 网络抖动导致发送方误判失败后重发

与其追求理想化的“消息绝不重复”,不如把系统设计成:
重复也没关系。


3. 补偿动作为什么也要幂等?

因为补偿本身也可能重复执行。
比如 ReleaseInventory 收到两次:

  • 第一次把库存从 8 加回 10
  • 第二次如果再加一次,就变 12 了

所以补偿必须检查当前状态,保证“执行一次和执行十次效果一样”。


常见坑与排查

这一部分我想讲得更接地气一点,因为我自己在项目里真踩过。

坑 1:业务数据提交了,但事件表没写进去

现象

  • 订单表有数据
  • outbox 没事件
  • 下游完全没感知

根因

  • 业务写库和 outbox 写库不在同一个事务
  • 某些 ORM 配置导致自动提交

排查方法

  1. 打开 SQL 日志
  2. 核对是否同一个连接、同一个事务
  3. 查看失败时数据库中订单和 outbox 的时间戳差异

建议

  • 显式开启事务
  • 不要把“写订单”和“写 outbox”放在两个仓储层事务里各自提交

坑 2:消费者已经处理成功,但投递状态没更新

现象

  • 下游业务已执行
  • outbox 仍是 NEW
  • 后台继续重复发送

根因

  • 发送成功和状态更新之间崩溃
  • 这其实是 Outbox 的正常设计权衡,不是 bug

正确处理

  • 接受重复发送
  • 让消费者幂等

如果系统要求“不能重复”,最后通常会把系统复杂度推到很高,收益却不一定值得。


坑 3:补偿顺序写反了

现象

  • 先取消订单,再回补库存
  • 结果有其他流程看到订单已取消,提前做了售后或退款处理
  • 数据链路变得很难解释

建议

补偿要按照依赖关系逆序执行。一般来说:

  1. 先释放资源
  2. 再关闭业务单据

坑 4:状态机没有封口

现象

订单既收到 PaymentSucceeded,又迟到一个 PaymentFailed,最后状态来回跳。

建议

状态流转必须加条件:

UPDATE orders
SET status = 'CONFIRMED'
WHERE order_id = ? AND status = 'PENDING';

再比如取消:

UPDATE orders
SET status = 'CANCELED'
WHERE order_id = ? AND status = 'PENDING';

这样迟到消息只会影响 0 行,不会污染最终结果。


坑 5:把 Saga 当成“万能一致性方案”

Saga 很适合订单、库存、履约这类业务。
但如果你处理的是:

  • 实时资金账户余额
  • 高价值转账
  • 强监管账务

就不能只靠“订单状态对了就行”。这类场景往往需要:

  • 账户账本
  • 冻结/解冻模型
  • 更细的对账机制
  • 必要时更强一致手段

安全/性能最佳实践

安全方面

1. 事件载荷避免敏感信息裸奔

Outbox 和 MQ 里不要直接放:

  • 银行卡号
  • 身份证号
  • 完整支付凭证
  • 用户隐私字段

建议:

  • 只传必要业务键
  • 敏感字段脱敏
  • 必要时加密存储

2. 事件签名与来源校验

跨系统消费时,尤其是多团队共享消息总线,要校验:

  • 事件来源
  • schema 版本
  • 签名或认证信息

否则一条伪造消息就可能触发错误补偿。

3. 补偿接口要防重放

补偿接口经常被忽略,但它同样需要:

  • 鉴权
  • 幂等令牌
  • 调用审计日志

性能方面

1. Outbox 表要按状态和时间建索引

典型索引:

CREATE INDEX idx_outbox_status_created
ON outbox_events(status, created_at);

否则扫描投递会越来越慢。

2. 分批拉取,避免大事务

不要一次扫 10 万条事件。
建议按批次投递,比如:

  • 每批 100~500 条
  • 控制单次事务大小
  • 支持并发 worker

3. 做好归档与清理

Outbox、processed_messages 这种表增长非常快。
建议:

  • 保留最近 3~30 天热数据
  • 历史归档到冷库
  • 定期删除已完成事件

4. 消费者处理要短小

如果消费逻辑太重:

  • 不要长时间占用消费线程
  • 复杂计算拆到异步任务
  • 防止消息积压引发雪崩

容量估算思路

架构文章不能只讲概念,最好顺手把量级感带上。

假设:

  • 日订单量:100 万
  • 每单平均产生 4 个事件
  • 单事件记录平均 1 KB

那么每天 outbox 原始写入量大约:

100 万 * 4 * 1 KB = 约 4 GB/天

再加上索引、重试记录、processed_messages,真实存储往往会更高。
这意味着你需要提前考虑:

  • 事件表分区或分表
  • 冷热分层
  • 消费延迟监控
  • 积压峰值时的扩容能力

如果你的系统峰值 TPS 高于平时 5~10 倍,投递器和消费者的并发模型也必须按峰值设计,而不是按平均值设计。


监控与可观测性建议

这块很容易被忽略,但没有它,排障非常痛苦。

建议至少监控这些指标:

  • Outbox NEW 状态事件堆积数
  • 事件平均投递延迟
  • 事件重试次数分布
  • 各类 Saga 成功率/补偿率
  • 订单从 PENDING 到终态的耗时
  • 幂等命中次数
  • 死信消息数量

日志中建议统一打印:

  • trace_id
  • order_id
  • event_id
  • event_type
  • 当前状态
  • 重试次数

这样你查一笔订单时,才能把整条链路串起来。


一套比较稳妥的落地建议

如果你准备在真实项目里上这套方案,我建议按下面顺序推进:

  1. 先定义状态机

    • 哪些状态是终态
    • 哪些事件允许触发流转
    • 迟到消息怎么处理
  2. 再定义事件模型

    • 事件名
    • 唯一 ID
    • schema 版本
    • 业务键
  3. 实现 Outbox

    • 业务表与事件表同事务
    • 投递器支持重试、告警、归档
  4. 实现消费者幂等

    • 消息级幂等
    • 业务级幂等
    • 补偿级幂等
  5. 最后再补监控和对账

    • 别等出事后才补

其中最容易被低估的是对账
因为即使你设计得很好,生产环境里还是会有:

  • 人工改数
  • 历史脏数据
  • 中间件异常
  • 版本升级带来的兼容问题

所以建议保留离线对账任务,定期核对:

  • 订单状态
  • 库存预留状态
  • 支付状态
  • 补偿完成情况

总结

服务拆分后,事务一致性不再是“一个事务提交”的问题,而是一条业务链如何在失败、重试、重复、延迟下仍然收敛到正确结果

这篇文章的核心结论可以浓缩成三句话:

  1. Saga 负责把跨服务事务拆成可补偿的业务步骤
  2. Outbox 负责保证业务提交后事件不会凭空丢失
  3. 幂等设计 负责把重复消息、重复补偿、迟到事件变成可接受现实

如果你让我给一个最实用的落地建议,那就是:

  • 不要追求“绝不失败”
  • 也不要迷信“消息只会来一次”
  • 要把系统设计成:失败可补偿、消息可重放、结果可收敛

边界条件也要说清楚:

  • 这套方案适合大多数订单、库存、履约类业务
  • 对资金强一致、监管严苛场景,要引入账本、冻结模型甚至更强事务机制
  • 如果团队缺少状态机、事件建模、监控能力,先小范围试点,不要一上来全链路改造

最后一句更接近工程实践的话:
分布式一致性不是“选一个模式”就结束,而是模式 + 状态机 + 幂等 + 监控 + 对账一起落地,系统才真正站得住。


分享到:

上一篇
《Web逆向实战:中级开发者如何定位并复现前端签名算法与接口加密流程》
下一篇
《Spring Boot 中基于拦截器与 AOP 的接口幂等性设计与实战》