跳转到内容
123xiao | 无名键客

《微服务架构中分布式事务的实战治理:基于 Saga、消息最终一致性与补偿机制的落地方案》

字数: 0 阅读时长: 1 分钟

背景与问题

微服务拆得越细,事务就越难做。

在单体应用里,我们习惯了一个本地事务把库存、订单、支付一次性提交:要么都成功,要么都回滚。但到了微服务架构里,这几个动作通常分别属于不同服务、不同数据库,甚至不同团队维护。这个时候,如果还想用传统的两阶段提交(2PC)强行兜底,往往会遇到几个现实问题:

  • 数据库和中间件支持不统一
  • 性能开销大,锁持有时间长
  • 链路一长,失败概率明显升高
  • 服务之间耦合变重,演进困难

所以,真正落地时,很多团队最后都会走向一句话:接受最终一致性,而不是执着于强一致性。

但“接受最终一致性”不等于“放任不管”。我见过不少系统嘴上说用 Saga,实际上只是“失败了写个日志”;也见过消息最终一致性方案里,消息发出去了,业务库没落盘,最后账怎么都对不上。

这篇文章我从治理视角来讲,不只讲概念,而是把 Saga、可靠消息、补偿机制怎么组合落地讲清楚,重点回答几个实战问题:

  • 什么场景用 Saga,什么场景用消息最终一致性?
  • 补偿事务应该怎么设计,才不会越补越乱?
  • 代码层面如何做到“业务提交成功,消息也不会丢”?
  • 排查线上“不一致”时,应该先看哪里?

先看一个典型业务场景

以“用户下单”为例,通常会串起这些服务:

  1. 订单服务:创建订单
  2. 库存服务:扣减库存
  3. 支付服务:冻结或扣款
  4. 营销服务:锁定优惠券
  5. 物流服务:创建发货单

如果这是单库系统,问题不大;但在微服务里,它天然就是一个分布式事务。

失败会长什么样?

例如:

  • 订单创建成功,但库存扣减失败
  • 库存扣减成功,但支付超时
  • 支付成功了,订单状态没更新
  • 消费消息重复,导致库存多扣一次
  • 补偿流程执行一半,系统又崩了

这些问题的共同点是:链路跨服务、跨存储、跨时序。所以不能只盯着“事务”两个字,而要从“状态流转 + 消息可靠 + 幂等补偿”三个维度一起治理。


方案对比与取舍分析

在架构设计时,我一般先把候选方案放到一张表里看,别一上来就喊“全站 Saga”。

方案一致性性能实现复杂度适用场景不适用场景
2PC / XA强一致少量核心链路、技术栈统一高并发、长事务、异构系统
TCC强一致趋近很高资金、额度、资源预留明确老系统难改造、接口不具备 Try/Confirm/Cancel
Saga最终一致长事务、多步骤业务编排无法定义补偿动作
消息最终一致性最终一致事件驱动、异步解耦需要同步返回强一致结果

一句实战建议

  • 业务步骤明确、需要有序补偿:优先考虑 Saga
  • 服务间以事件驱动传播状态:优先考虑 可靠消息最终一致性
  • 金额类核心扣减、且业务可预留资源:再考虑 TCC
  • 不要轻易在高并发核心链路上用 XA

很多时候,真正落地不是二选一,而是:

主链路用 Saga 管控业务状态,服务间传播用可靠消息保障最终一致性,失败时靠补偿机制兜底。


核心原理

1. Saga:把一个大事务拆成多个本地事务

Saga 的核心思想很朴素:

  • 每个服务只做自己的本地事务
  • 每一步成功后再推进下一步
  • 中间某一步失败,就按相反顺序执行补偿事务

例如:

  • 创建订单
  • 扣库存
  • 冻结支付
  • 锁优惠券

如果冻结支付失败,就需要:

  • 释放已锁优惠券
  • 回补库存
  • 取消订单

这不是数据库层回滚,而是业务层回滚

2. 消息最终一致性:先落库,再投递

消息最终一致性的关键,不是“用了 MQ”,而是业务数据和待发送消息必须在同一个本地事务里落地。常见做法是 Outbox 模式

  • 在本地事务中:
    • 更新业务表
    • 写入消息表 outbox
  • 事务提交后,由消息投递器异步扫描 outbox,把消息发到 MQ
  • 消费方处理后,再通过幂等机制避免重复消费

这样即使服务在事务提交后瞬间宕机,消息也不会丢,因为 outbox 里还在。

3. 补偿机制:不是简单“反向操作”

补偿要考虑三个原则:

  • 可重试
  • 幂等
  • 可观测

比如“回补库存”不能直接 stock = stock + 1 完事,而要基于业务单号、冻结记录、补偿状态去执行。否则重复补偿几次,库存就飞了。


两种 Saga 组织方式

编排式 Saga

由一个协调者统一驱动步骤。

优点:

  • 流程清晰,便于治理
  • 审计和排查更方便

缺点:

  • 协调者容易成为中心依赖
  • 灵活性稍弱

协同式 Saga

各服务通过事件驱动推进下一步。

优点:

  • 去中心化,耦合更松
  • 更适合事件驱动架构

缺点:

  • 状态链路分散,排查复杂
  • 容易出现“谁该补偿”的边界不清

对于大多数中型团队,我通常建议:

优先从编排式 Saga 起步,等领域边界清晰、事件治理成熟后,再逐步引入协同式 Saga。


flowchart LR
    A[创建订单] --> B[扣减库存]
    B --> C[冻结支付]
    C --> D[锁定优惠券]
    D --> E[订单完成]

    C -- 失败 --> B2[补偿库存]
    B2 --> A2[取消订单]

    D -- 失败 --> C2[解冻支付]
    C2 --> B2

Saga 与消息最终一致性的组合落地

单独用 Saga,能管理“过程”;单独用消息,能传播“结果”。实战里最好把两者合起来。

一个比较稳的落地形态是:

  1. Saga 编排服务推进业务步骤
  2. 每个参与者服务执行本地事务时,同时写业务表和 outbox 表
  3. 投递器把 outbox 消息发到 MQ
  4. 下游服务消费消息并执行业务逻辑
  5. 失败则重试;超过阈值则进入人工或自动补偿队列

sequenceDiagram
    participant O as Orchestrator
    participant S1 as 订单服务
    participant S2 as 库存服务
    participant MQ as 消息队列
    participant S3 as 支付服务

    O->>S1: 创建订单
    S1->>S1: 本地事务: 订单表 + outbox
    S1-->>O: 成功

    O->>S2: 扣减库存
    S2->>S2: 本地事务: 库存表 + outbox
    S2-->>O: 成功

    S2->>MQ: 投递库存已扣减事件
    MQ-->>S3: 消费事件
    S3->>S3: 冻结支付

    S3-->>O: 失败/超时
    O->>S2: 补偿库存
    O->>S1: 取消订单

实战代码(可运行)

下面给一个简化但可运行的示例,用 Python + SQLite 模拟:

  • 订单服务本地事务
  • Outbox 可靠消息
  • Saga 编排
  • 幂等消费
  • 补偿操作

这不是生产级框架,但足够把关键点跑通。

目录思路

我们用一个脚本模拟多个服务,重点看机制:

  • orders:订单表
  • inventory:库存表
  • outbox:待投递消息表
  • processed_messages:消费幂等表

代码示例

import sqlite3
import json
import time
import uuid

DB = "demo.db"

def get_conn():
    return sqlite3.connect(DB)

def init_db():
    conn = get_conn()
    cur = conn.cursor()

    cur.execute("""
    CREATE TABLE IF NOT EXISTS orders (
        order_id TEXT PRIMARY KEY,
        user_id TEXT,
        product_id TEXT,
        amount INTEGER,
        status TEXT
    )
    """)

    cur.execute("""
    CREATE TABLE IF NOT EXISTS inventory (
        product_id TEXT PRIMARY KEY,
        stock INTEGER,
        frozen INTEGER DEFAULT 0
    )
    """)

    cur.execute("""
    CREATE TABLE IF NOT EXISTS outbox (
        msg_id TEXT PRIMARY KEY,
        topic TEXT,
        payload TEXT,
        status TEXT,
        retry_count INTEGER DEFAULT 0,
        created_at REAL
    )
    """)

    cur.execute("""
    CREATE TABLE IF NOT EXISTS processed_messages (
        consumer TEXT,
        msg_id TEXT,
        PRIMARY KEY (consumer, msg_id)
    )
    """)

    conn.commit()
    conn.close()

def seed_data():
    conn = get_conn()
    cur = conn.cursor()
    cur.execute("INSERT OR IGNORE INTO inventory(product_id, stock, frozen) VALUES (?, ?, ?)", ("p1", 10, 0))
    conn.commit()
    conn.close()

def create_order_with_outbox(order_id, user_id, product_id, amount):
    conn = get_conn()
    cur = conn.cursor()
    try:
        cur.execute("BEGIN")

        cur.execute("""
        INSERT INTO orders(order_id, user_id, product_id, amount, status)
        VALUES (?, ?, ?, ?, ?)
        """, (order_id, user_id, product_id, amount, "CREATED"))

        msg_id = str(uuid.uuid4())
        payload = json.dumps({
            "event": "OrderCreated",
            "order_id": order_id,
            "user_id": user_id,
            "product_id": product_id,
            "amount": amount
        })
        cur.execute("""
        INSERT INTO outbox(msg_id, topic, payload, status, created_at)
        VALUES (?, ?, ?, ?, ?)
        """, (msg_id, "order-events", payload, "NEW", time.time()))

        conn.commit()
        print(f"[订单服务] 创建订单成功: {order_id}")
    except Exception as e:
        conn.rollback()
        raise e
    finally:
        conn.close()

def reserve_inventory(order_id, product_id, amount):
    conn = get_conn()
    cur = conn.cursor()
    try:
        cur.execute("BEGIN")

        cur.execute("SELECT stock, frozen FROM inventory WHERE product_id = ?", (product_id,))
        row = cur.fetchone()
        if not row:
            raise Exception("商品不存在")

        stock, frozen = row
        if stock < amount:
            raise Exception("库存不足")

        cur.execute("""
        UPDATE inventory
        SET stock = stock - ?, frozen = frozen + ?
        WHERE product_id = ?
        """, (amount, amount, product_id))

        msg_id = str(uuid.uuid4())
        payload = json.dumps({
            "event": "InventoryReserved",
            "order_id": order_id,
            "product_id": product_id,
            "amount": amount
        })
        cur.execute("""
        INSERT INTO outbox(msg_id, topic, payload, status, created_at)
        VALUES (?, ?, ?, ?, ?)
        """, (msg_id, "inventory-events", payload, "NEW", time.time()))

        conn.commit()
        print(f"[库存服务] 预扣库存成功: order={order_id}, amount={amount}")
    except Exception as e:
        conn.rollback()
        raise e
    finally:
        conn.close()

def compensate_inventory(order_id, product_id, amount):
    conn = get_conn()
    cur = conn.cursor()
    try:
        cur.execute("BEGIN")
        cur.execute("""
        UPDATE inventory
        SET stock = stock + ?, frozen = CASE WHEN frozen >= ? THEN frozen - ? ELSE 0 END
        WHERE product_id = ?
        """, (amount, amount, amount, product_id))
        conn.commit()
        print(f"[库存服务] 补偿库存成功: order={order_id}, amount={amount}")
    except Exception as e:
        conn.rollback()
        raise e
    finally:
        conn.close()

def cancel_order(order_id):
    conn = get_conn()
    cur = conn.cursor()
    try:
        cur.execute("""
        UPDATE orders SET status = ? WHERE order_id = ?
        """, ("CANCELLED", order_id))
        conn.commit()
        print(f"[订单服务] 取消订单成功: {order_id}")
    finally:
        conn.close()

def mark_order_completed(order_id):
    conn = get_conn()
    cur = conn.cursor()
    try:
        cur.execute("""
        UPDATE orders SET status = ? WHERE order_id = ?
        """, ("COMPLETED", order_id))
        conn.commit()
        print(f"[订单服务] 订单完成: {order_id}")
    finally:
        conn.close()

def relay_outbox():
    conn = get_conn()
    cur = conn.cursor()
    cur.execute("SELECT msg_id, topic, payload FROM outbox WHERE status = 'NEW' ORDER BY created_at")
    rows = cur.fetchall()

    for msg_id, topic, payload in rows:
        print(f"[Outbox投递器] 投递消息: {msg_id}, topic={topic}, payload={payload}")
        cur.execute("UPDATE outbox SET status = 'SENT' WHERE msg_id = ?", (msg_id,))

    conn.commit()
    conn.close()

def consume_message(consumer, msg_id):
    conn = get_conn()
    cur = conn.cursor()
    try:
        cur.execute("BEGIN")
        cur.execute("""
        INSERT INTO processed_messages(consumer, msg_id)
        VALUES (?, ?)
        """, (consumer, msg_id))
        conn.commit()
        return True
    except sqlite3.IntegrityError:
        conn.rollback()
        print(f"[{consumer}] 检测到重复消息: {msg_id}")
        return False
    finally:
        conn.close()

def mock_payment(order_id, should_fail=False):
    print(f"[支付服务] 冻结支付: order={order_id}")
    if should_fail:
        raise Exception("支付服务超时")
    print(f"[支付服务] 支付冻结成功: order={order_id}")

def saga_place_order(user_id, product_id, amount, payment_fail=False):
    order_id = str(uuid.uuid4())
    try:
        create_order_with_outbox(order_id, user_id, product_id, amount)
        reserve_inventory(order_id, product_id, amount)
        mock_payment(order_id, should_fail=payment_fail)
        mark_order_completed(order_id)
        print(f"[Saga] 下单流程成功: {order_id}")
    except Exception as e:
        print(f"[Saga] 流程失败,开始补偿: {e}")
        try:
            compensate_inventory(order_id, product_id, amount)
        except Exception as ce:
            print(f"[Saga] 库存补偿失败: {ce}")
        cancel_order(order_id)

def show_state():
    conn = get_conn()
    cur = conn.cursor()

    print("\n=== orders ===")
    for row in cur.execute("SELECT * FROM orders"):
        print(row)

    print("\n=== inventory ===")
    for row in cur.execute("SELECT * FROM inventory"):
        print(row)

    print("\n=== outbox ===")
    for row in cur.execute("SELECT msg_id, topic, status FROM outbox"):
        print(row)

    conn.close()

if __name__ == "__main__":
    init_db()
    seed_data()

    print("\n--- 场景1:成功下单 ---")
    saga_place_order("u1", "p1", 2, payment_fail=False)
    relay_outbox()
    show_state()

    print("\n--- 场景2:支付失败触发补偿 ---")
    saga_place_order("u2", "p1", 3, payment_fail=True)
    relay_outbox()
    show_state()

如何运行

python3 app.py

这段代码演示了什么

虽然它是简化版,但已经体现出几个关键落地点:

  1. 订单创建与 outbox 写入在同一个本地事务内
  2. 库存扣减与库存事件写入也在同一个本地事务内
  3. Saga 在支付失败时触发补偿
  4. 消费端可通过 processed_messages 做幂等
  5. 消息投递不是“顺手发一下”,而是独立 relay 过程

一个更完整的状态流转设计

很多分布式事务最后难维护,不是因为框架不够强,而是状态设计太随意。建议给每个 Saga 实例明确状态机。

stateDiagram-v2
    [*] --> INIT
    INIT --> ORDER_CREATED
    ORDER_CREATED --> INVENTORY_RESERVED
    INVENTORY_RESERVED --> PAYMENT_PENDING
    PAYMENT_PENDING --> COMPLETED
    PAYMENT_PENDING --> COMPENSATING
    INVENTORY_RESERVED --> COMPENSATING
    ORDER_CREATED --> COMPENSATING
    COMPENSATING --> CANCELLED
    COMPLETED --> [*]
    CANCELLED --> [*]

这张图的意义很大:

  • 你能知道当前卡在哪一步
  • 能知道下一步应该推进还是补偿
  • 重试和人工介入时有依据

我在项目里通常会把 saga_instancesaga_step 两张表单独建出来,而不是只靠日志猜流程走到哪了。


常见坑与排查

这一部分非常重要,因为很多团队“方案设计”都没问题,问题出在边界条件和工程细节

1. 业务成功了,但消息没发出去

常见原因

  • 业务事务提交后,直接调用 MQ 发送,发送前服务宕机
  • MQ 发送失败没有落重试记录
  • 异步线程发消息,主进程退出导致消息丢失

正确姿势

  • 采用 Outbox 模式
  • 业务表与消息表同事务提交
  • 用独立投递器扫描重试

排查路径

  1. 查业务表是否提交成功
  2. 查 outbox 表是否有对应消息
  3. 查 outbox 状态是否停留在 NEW
  4. 查投递器日志、MQ 发送异常、重试次数

2. 消息重复消费,库存被多扣

这是最常见的坑之一。我当时第一次做库存服务时,也踩过:MQ 明明只发了一次,怎么库存少了两次?后来发现其实是消费者超时重投

正确姿势

  • 消费逻辑必须幂等
  • msg_idbusiness_key 建唯一约束
  • 业务更新最好带状态条件

例如:

UPDATE coupon
SET status = 'USED'
WHERE coupon_id = 'c1' AND status = 'LOCKED';

这样即使重复执行,也不会无限改坏数据。


3. 补偿执行了两次

原因

  • 编排器超时后重试
  • 补偿接口本身网络超时,但服务实际已执行成功
  • 人工补偿和自动补偿并发触发

建议

  • 补偿动作也要有补偿记录表
  • 补偿接口按 order_id + step_name 做幂等
  • 人工操作入口必须走统一补偿平台,而不是直接改表

4. 出现“悬挂”与“空补偿”

这个问题在 TCC 更典型,但 Saga 里同样可能出现。

  • 空补偿:原动作没成功,补偿先到了
  • 悬挂:补偿已执行,后续原动作又来了

建议

给每个步骤设计明确的状态位,例如:

  • INIT
  • DONE
  • COMPENSATED

然后所有接口只允许合法状态迁移,非法迁移直接拒绝并记录。


5. 补偿不是严格逆操作

比如支付服务“扣款”后,不一定能立即“退款”成功;优惠券释放后,也可能已过期;库存回补时还要考虑锁定仓、批次仓。

这说明一件事:

补偿事务是业务语义,不是数学反函数。

所以设计补偿时,一定要拉上业务方确认这些边界:

  • 是否允许人工审核后补偿
  • 是否允许延迟补偿
  • 是否需要“退款中”“补偿中”等中间态

安全/性能最佳实践

安全最佳实践

1. 消息体不要裸奔传敏感数据

MQ 里常会流转:

  • 用户手机号
  • 支付信息
  • 身份标识

建议:

  • 只传必要字段
  • 敏感字段脱敏或加密
  • 关键消息加签,防止伪造

2. 补偿接口必须鉴权

很多团队把“补偿接口”当内部接口,结果运维脚本、测试环境账号都能调,风险很高。

建议:

  • 内部接口也做鉴权
  • 接口调用必须带 traceId / operator
  • 所有补偿操作保留审计日志

3. 防止重放攻击与恶意重试

对消息消费和回调处理:

  • 校验消息签名
  • 限制重试频率
  • 对同一业务单号做时间窗控制

性能最佳实践

1. 缩短本地事务时间

本地事务内只做:

  • 核心业务更新
  • outbox 写入

不要在事务里做这些事:

  • 调外部服务
  • 查大量历史数据
  • 复杂报表聚合

2. 补偿优先做成异步

补偿不是为了“立刻完成”,而是为了“最终恢复一致”。因此:

  • 可以先进补偿队列
  • 后台按顺序重试
  • 超阈值再人工介入

这样能降低主链路阻塞。

3. 给 outbox 做分片与归档

业务量一大,outbox 会膨胀得很快。

建议至少考虑:

  • 按时间归档
  • 按状态索引
  • 按业务域拆表
  • relay 批量拉取、批量发送

4. 做容量估算

简单估算方法:

  • 每日订单量:1000 万
  • 每单平均事件数:4
  • 每日 outbox 增量:4000 万

如果单条消息平均 1 KB,那一天就是约 40 GB 级别的数据写入量,还没算索引和重试副本。

所以,不要把 outbox 当一张“顺手建”的小表。它本质上是高频写入的核心基础设施。


落地治理建议

如果你所在团队正准备做分布式事务治理,我建议按下面的顺序落地,而不是一口气追求“全链路完美”。

第一步:先统一最小规范

至少统一这些东西:

  • 全局业务单号
  • 消息唯一 ID
  • 幂等表规范
  • 失败重试次数
  • 死信处理规则
  • traceId 贯穿日志

这一步做完,排查效率会直接提升。

第二步:优先治理高风险链路

优先挑这些链路:

  • 订单、支付、库存
  • 优惠券核销
  • 积分增减
  • 账户余额与额度

因为它们一旦不一致,影响最直观,也最容易被用户投诉。

第三步:建立“补偿平台”而不是散落脚本

成熟团队通常会做统一的补偿治理能力:

  • 失败任务列表
  • 自动重试
  • 人工审核
  • 补偿回放
  • 状态查询
  • 审计留痕

否则最后会变成每个服务都有一套“神秘补偿 SQL”。


边界条件:什么时候别硬上 Saga

Saga 很好,但不是银弹。以下情况要谨慎:

1. 没法定义补偿动作

如果业务上无法撤销,也无法逆向修正,那 Saga 很难做稳。

比如某些外部不可逆操作,一旦调用成功,就只能靠后续人工流程修正。

2. 步骤太多,跨域太散

如果一个 Saga 一口气串 12 个服务、5 个团队,编排器再漂亮也会很难维护。这个时候应该先反思:

  • 领域边界是不是拆错了
  • 是否把同步强依赖做得太多
  • 是否可以缩短事务链路

3. 强一致要求真的很高

例如实时扣款、核心账务记账,如果每一分都不能接受短暂不一致,就不要强行套 Saga。应该优先从:

  • 单一账务中心
  • 余额预留
  • TCC
  • 双分录账本

这些方向设计。


总结

分布式事务治理,真正落地时不是选一个“最厉害的模式”,而是把几件事同时做好:

  • Saga 管好跨服务业务步骤和补偿顺序
  • Outbox + MQ 保证消息最终一致性
  • 幂等设计 防止重复执行
  • 状态机和审计日志 提升可观测性
  • 统一补偿平台 承接异常恢复

如果让我给一个最务实的建议,那就是:

先别追求绝对一致,先把“失败后可恢复”做扎实。

因为在线上系统里,真正可怕的不是失败,而是失败之后你不知道数据错在哪、也不知道怎么补回来。

最后给几个可直接执行的建议:

  1. 新链路默认接入 outbox,而不是“业务成功后直接发 MQ”
  2. 每个补偿接口都必须支持幂等
  3. 每条核心 Saga 链路都要有清晰状态机
  4. 失败任务必须可查询、可重试、可审计
  5. 金额类、库存类、券类链路优先治理

做到这些,分布式事务就不再是“理论很美,线上很乱”,而会变成一套真正能扛业务增长的工程能力。


分享到:

上一篇
《分布式架构下的服务拆分与数据库拆分实战:从单体系统平滑演进到高可用微服务》
下一篇
《Web3 中级实战:从智能合约审计到前端签名验证,构建一套安全的 DApp 登录与授权方案》