跳转到内容
123xiao | 无名键客

《分布式架构中基于 Saga 模式的分布式事务落地实践:从服务拆分到一致性保障》

字数: 0 阅读时长: 1 分钟

分布式架构中基于 Saga 模式的分布式事务落地实践:从服务拆分到一致性保障

在单体应用里,事务这件事通常不太难:一个数据库连接,一个 BEGIN,出了问题 ROLLBACK
但服务一旦拆开,订单、库存、支付、积分各自都有自己的库,问题就变了——你会很快发现,原来“提交成功”只是某个服务自己成功了,不代表整个业务成功

我第一次真正踩到这个坑,是在一个“下单扣库存再扣余额”的流程里:订单服务创建成功了,库存也锁住了,结果支付服务超时。最后用户看到了“支付失败”,但库存还被占着,运营同学第二天就来问:为什么明明没付款,商品卖不出去了?

这篇文章就从这个典型场景出发,讲讲 Saga 模式 在分布式架构里的落地方式:它到底解决什么问题,怎么设计,代码怎么写,最容易踩哪些坑,以及在安全和性能上该怎么做。


背景与问题

服务拆分之后,事务边界断了

典型电商下单流程可以拆成几个服务:

  • 订单服务:创建订单、维护状态
  • 库存服务:预扣库存、释放库存
  • 支付服务:扣款、退款
  • 通知服务:发送支付结果、订单结果

在单体里,这些动作可能在一个本地事务里完成。
但拆成微服务后,每个服务有自己的数据库,传统单库事务不再适用。

为什么不直接上 2PC / XA?

很多人一开始会想到“两阶段提交”。理论上它可以做强一致,但在实际业务系统里常常不合适:

  • 协调器复杂,依赖重
  • 数据库、驱动、中间件兼容性要求高
  • 阻塞明显,吞吐差
  • 一旦出现网络抖动或参与方长时间不可用,系统会很难受

对于大多数互联网业务来说,真正需要的是:

  • 可以接受短时间不一致
  • 最终能恢复到正确状态
  • 故障时有补偿手段
  • 吞吐优先于强一致

这正是 Saga 的适用场景。


方案对比与取舍分析

在开始实现之前,先把边界说清楚。Saga 不是银弹,它适合的是长事务、可补偿、接受最终一致性的业务。

常见方案对比

方案一致性性能实现复杂度适用场景
本地事务强一致单体、单库
2PC/XA强一致低到中核心强一致场景
TCC强一致趋近很高资金、账户、核心资源预留
Saga最终一致订单、库存、履约等长流程

Saga 适合什么,不适合什么

适合:

  • 下单、履约、发券、积分等业务流程
  • 每一步都有明确补偿动作
  • 允许短时间状态不一致

不适合:

  • 绝对不能出现中间态的资金总账
  • 补偿成本极高或不可逆的操作
  • 外部系统不支持幂等和状态查询

一句话总结:
Saga 的本质不是“事务不失败”,而是“失败后系统能回到可接受状态”。


核心原理

Saga 可以理解为:
把一个大事务拆成多个本地事务,每个本地事务成功后进入下一步;如果某一步失败,就按相反顺序执行补偿操作。

一个典型 Saga 链路

以“下单”为例:

  1. 创建订单
  2. 预扣库存
  3. 扣减余额
  4. 确认订单完成

如果第 3 步失败,则触发补偿:

  • 释放库存
  • 取消订单

编排式与协同式

Saga 落地通常有两种风格:

1. 编排式 Saga(Orchestration)

由一个统一的 Saga 协调器决定下一步做什么。

优点:

  • 流程清晰
  • 状态集中
  • 排查容易

缺点:

  • 协调器可能变成核心依赖

2. 协同式 Saga(Choreography)

各服务通过事件驱动,自己监听上一步结果并决定后续动作。

优点:

  • 更松耦合
  • 没有中心协调器单点

缺点:

  • 流程分散
  • 链路排查复杂
  • 事件风暴时难维护

对于中级团队或者刚开始治理分布式事务的系统,我通常建议:
先用编排式,把流程走通、补偿做好、监控建全,再考虑演进到协同式。


Saga 执行流程图

flowchart TD
    A[用户提交订单] --> B[订单服务: 创建订单 PENDING]
    B --> C[库存服务: 预扣库存]
    C --> D[支付服务: 扣款]
    D --> E[订单服务: 更新为 CONFIRMED]
    E --> F[流程成功结束]

    D -- 失败 --> G[库存服务: 释放库存]
    G --> H[订单服务: 更新为 CANCELLED]
    H --> I[流程补偿结束]

    C -- 失败 --> H

状态流转设计

Saga 设计里,状态机非常关键。
如果没有明确状态,线上就会出现“到底该不该补偿”“补偿到哪一步了”“重试是否安全”这些非常头疼的问题。

stateDiagram-v2
    [*] --> PENDING
    PENDING --> STOCK_RESERVED: 库存预扣成功
    STOCK_RESERVED --> PAID: 支付成功
    PAID --> CONFIRMED: 订单确认成功

    STOCK_RESERVED --> CANCELLING: 支付失败
    PENDING --> CANCELLING: 库存失败
    CANCELLING --> CANCELLED: 补偿完成

    PAID --> REFUNDING: 确认失败需回滚
    REFUNDING --> CANCELLED: 退款+释放库存完成

核心原理拆解:落地时必须具备的 4 个能力

1. 本地事务

每一步服务内部,必须先保证自己的数据一致。
比如库存服务执行“预扣库存”时,要在一个本地事务里完成:

  • 检查可用库存
  • 扣减可用库存
  • 增加冻结库存
  • 记录操作日志

2. 补偿事务

补偿不是简单“反向执行”这么粗糙。
比如:

  • 创建订单的补偿:不是物理删除,而是改状态为 CANCELLED
  • 扣库存的补偿:释放冻结库存,不是盲目加库存
  • 扣款的补偿:如果已扣款则退款,如果未扣款则跳过

3. 幂等性

这是 Saga 能不能稳定运行的生命线。

为什么?因为分布式环境里你一定会遇到:

  • 请求超时但实际执行成功
  • 消息重复投递
  • 协调器重试
  • 服务实例重启

所以每个动作和补偿动作都必须是幂等的。
同一个 saga_id + step 重复执行,结果必须一致。

4. 可观测性

如果没有统一的 saga_id、步骤状态、失败原因、重试次数,你的 Saga 在生产环境里会像黑盒一样。
出问题时只能“猜”。


时序图:成功与失败路径

sequenceDiagram
    participant U as 用户
    participant O as Saga协调器
    participant S1 as 订单服务
    participant S2 as 库存服务
    participant S3 as 支付服务

    U->>O: 提交订单
    O->>S1: createOrder(sagaId, orderId)
    S1-->>O: success
    O->>S2: reserveStock(sagaId, orderId)
    S2-->>O: success
    O->>S3: pay(sagaId, orderId)
    S3-->>O: failed

    O->>S2: releaseStock(sagaId, orderId)
    S2-->>O: success
    O->>S1: cancelOrder(sagaId, orderId)
    S1-->>O: success
    O-->>U: 下单失败,已完成补偿

实战代码(可运行)

下面用 Python 做一个可运行的编排式 Saga 示例
这个示例不依赖第三方框架,重点展示:

  • Saga 协调器
  • 步骤定义
  • 正向动作
  • 补偿动作
  • 幂等记录
  • 失败回滚

你可以直接保存为 saga_demo.py 运行。

from dataclasses import dataclass, field
from typing import Callable, List, Dict, Set
import uuid


class BusinessError(Exception):
    pass


@dataclass
class OrderService:
    orders: Dict[str, str] = field(default_factory=dict)
    executed: Set[str] = field(default_factory=set)

    def create_order(self, saga_id: str, order_id: str):
        key = f"{saga_id}:create_order"
        if key in self.executed:
            print("[OrderService] create_order 幂等命中")
            return
        self.orders[order_id] = "PENDING"
        self.executed.add(key)
        print(f"[OrderService] 订单已创建: {order_id}, status=PENDING")

    def cancel_order(self, saga_id: str, order_id: str):
        key = f"{saga_id}:cancel_order"
        if key in self.executed:
            print("[OrderService] cancel_order 幂等命中")
            return
        if order_id in self.orders and self.orders[order_id] != "CANCELLED":
            self.orders[order_id] = "CANCELLED"
        self.executed.add(key)
        print(f"[OrderService] 订单已取消: {order_id}")

    def confirm_order(self, saga_id: str, order_id: str):
        key = f"{saga_id}:confirm_order"
        if key in self.executed:
            print("[OrderService] confirm_order 幂等命中")
            return
        if order_id not in self.orders:
            raise BusinessError("订单不存在")
        self.orders[order_id] = "CONFIRMED"
        self.executed.add(key)
        print(f"[OrderService] 订单已确认: {order_id}")


@dataclass
class InventoryService:
    stock: Dict[str, int] = field(default_factory=lambda: {"apple": 10})
    reserved: Dict[str, int] = field(default_factory=dict)
    executed: Set[str] = field(default_factory=set)

    def reserve_stock(self, saga_id: str, order_id: str, sku: str, count: int):
        key = f"{saga_id}:reserve_stock"
        if key in self.executed:
            print("[InventoryService] reserve_stock 幂等命中")
            return
        if self.stock.get(sku, 0) < count:
            raise BusinessError("库存不足")
        self.stock[sku] -= count
        self.reserved[order_id] = self.reserved.get(order_id, 0) + count
        self.executed.add(key)
        print(f"[InventoryService] 已预扣库存: sku={sku}, count={count}")

    def release_stock(self, saga_id: str, order_id: str, sku: str):
        key = f"{saga_id}:release_stock"
        if key in self.executed:
            print("[InventoryService] release_stock 幂等命中")
            return
        count = self.reserved.get(order_id, 0)
        if count > 0:
            self.stock[sku] = self.stock.get(sku, 0) + count
            self.reserved[order_id] = 0
        self.executed.add(key)
        print(f"[InventoryService] 已释放库存: sku={sku}, count={count}")


@dataclass
class PaymentService:
    balances: Dict[str, int] = field(default_factory=lambda: {"u1001": 100})
    paid_orders: Set[str] = field(default_factory=set)
    executed: Set[str] = field(default_factory=set)

    def pay(self, saga_id: str, user_id: str, order_id: str, amount: int):
        key = f"{saga_id}:pay"
        if key in self.executed:
            print("[PaymentService] pay 幂等命中")
            return
        if self.balances.get(user_id, 0) < amount:
            raise BusinessError("余额不足")
        self.balances[user_id] -= amount
        self.paid_orders.add(order_id)
        self.executed.add(key)
        print(f"[PaymentService] 支付成功: user={user_id}, amount={amount}")

    def refund(self, saga_id: str, user_id: str, order_id: str, amount: int):
        key = f"{saga_id}:refund"
        if key in self.executed:
            print("[PaymentService] refund 幂等命中")
            return
        if order_id in self.paid_orders:
            self.balances[user_id] = self.balances.get(user_id, 0) + amount
            self.paid_orders.remove(order_id)
        self.executed.add(key)
        print(f"[PaymentService] 已退款: user={user_id}, amount={amount}")


@dataclass
class SagaStep:
    name: str
    action: Callable[[], None]
    compensation: Callable[[], None]


class SagaOrchestrator:
    def __init__(self):
        self.completed_steps: List[SagaStep] = []

    def execute(self, steps: List[SagaStep]):
        try:
            for step in steps:
                print(f"\n[Saga] 执行步骤: {step.name}")
                step.action()
                self.completed_steps.append(step)
            print("\n[Saga] 全部步骤执行成功")
        except Exception as e:
            print(f"\n[Saga] 步骤失败: {e}")
            self.compensate()
            raise

    def compensate(self):
        print("\n[Saga] 开始补偿...")
        for step in reversed(self.completed_steps):
            try:
                print(f"[Saga] 补偿步骤: {step.name}")
                step.compensation()
            except Exception as ce:
                print(f"[Saga] 补偿失败(需人工介入): {step.name}, error={ce}")
        print("[Saga] 补偿结束")


def run_demo(balance: int):
    saga_id = str(uuid.uuid4())
    order_id = "ORD-10001"
    user_id = "u1001"
    sku = "apple"
    count = 2
    amount = 80

    order_service = OrderService()
    inventory_service = InventoryService()
    payment_service = PaymentService(balances={user_id: balance})

    orchestrator = SagaOrchestrator()

    steps = [
        SagaStep(
            name="创建订单",
            action=lambda: order_service.create_order(saga_id, order_id),
            compensation=lambda: order_service.cancel_order(saga_id, order_id)
        ),
        SagaStep(
            name="预扣库存",
            action=lambda: inventory_service.reserve_stock(saga_id, order_id, sku, count),
            compensation=lambda: inventory_service.release_stock(saga_id, order_id, sku)
        ),
        SagaStep(
            name="支付扣款",
            action=lambda: payment_service.pay(saga_id, user_id, order_id, amount),
            compensation=lambda: payment_service.refund(saga_id, user_id, order_id, amount)
        ),
        SagaStep(
            name="确认订单",
            action=lambda: order_service.confirm_order(saga_id, order_id),
            compensation=lambda: order_service.cancel_order(saga_id, order_id)
        ),
    ]

    try:
        orchestrator.execute(steps)
    except Exception:
        print("[Demo] Saga 执行失败")
    finally:
        print("\n[最终状态]")
        print("orders =", order_service.orders)
        print("stock =", inventory_service.stock)
        print("reserved =", inventory_service.reserved)
        print("balances =", payment_service.balances)


if __name__ == "__main__":
    print("==== 场景1:余额充足,成功 ====")
    run_demo(balance=100)

    print("\n\n==== 场景2:余额不足,触发补偿 ====")
    run_demo(balance=50)

运行效果说明

  • 当余额为 100 时,流程成功:

    • 订单创建成功
    • 库存预扣成功
    • 支付成功
    • 订单确认成功
  • 当余额为 50 时,支付失败:

    • 已执行的“创建订单”“预扣库存”会触发补偿
    • 最终订单被取消,库存恢复

这个示例是内存版,但它已经把 Saga 的几个关键点体现出来了:

  1. 每一步都有补偿动作
  2. 补偿逆序执行
  3. 动作和补偿都带幂等键
  4. 协调器负责整体推进和失败回滚

生产环境如何真正落地

上面的代码是“原理版”,生产里还要补几个关键构件。

1. Saga 日志表

你至少需要一张表记录事务实例和步骤执行情况。

CREATE TABLE saga_instance (
  saga_id VARCHAR(64) PRIMARY KEY,
  business_key VARCHAR(128) NOT NULL,
  status VARCHAR(32) NOT NULL,
  created_at TIMESTAMP NOT NULL,
  updated_at TIMESTAMP NOT NULL
);

CREATE TABLE saga_step_log (
  id BIGINT PRIMARY KEY AUTO_INCREMENT,
  saga_id VARCHAR(64) NOT NULL,
  step_name VARCHAR(64) NOT NULL,
  action_status VARCHAR(32) NOT NULL,
  compensation_status VARCHAR(32) NOT NULL,
  retry_count INT NOT NULL DEFAULT 0,
  error_message VARCHAR(512),
  updated_at TIMESTAMP NOT NULL,
  UNIQUE KEY uk_saga_step (saga_id, step_name)
);

建议状态值至少包括:

  • INIT
  • RUNNING
  • SUCCESS
  • FAILED
  • COMPENSATING
  • COMPENSATED
  • PARTIAL_FAILED

2. 业务去重键

幂等不能只靠“感觉”。
实际系统里,建议使用这些维度之一:

  • saga_id + step_name
  • business_order_no + action_type
  • 消息唯一 ID

3. Outbox / 可靠消息

如果你的 Saga 是事件驱动的,那么“本地事务提交了,但消息没发出去”会直接破坏一致性。
这时建议引入 Outbox 模式

  • 业务数据和消息记录写入同一个本地事务
  • 后台任务异步把 Outbox 推送到 MQ
  • 消费端做幂等

常见坑与排查

这是我觉得最值得说的一部分。很多系统不是不会写 Saga,而是写出来后线上不稳定

坑 1:把补偿当成“简单回滚”

这是最常见误区。

例如支付服务:

  • 正向动作:扣款
  • 补偿动作:退款

但退款不等于回滚数据库字段。
因为支付可能已经影响外部渠道、清算流水、账单记录。补偿应该是一笔新的业务动作,而不是试图把历史“抹掉”。

排查建议:

  • 补偿动作是否有独立流水号?
  • 补偿是否可审计?
  • 补偿后状态是否闭环?

坑 2:补偿失败后没有兜底

如果释放库存时又失败了怎么办?
很多系统写到这里就结束了,结果库存长期冻结。

建议做法:

  • 补偿失败进入 COMPENSATION_RETRY
  • 启动定时任务继续重试
  • 超过阈值后告警并人工介入
  • 提供后台补偿工具

我自己的经验是:
自动重试 + 人工补偿台 几乎是必须的,不要幻想线上不会出极端状态。


坑 3:没有幂等,重试越多越乱

典型现象:

  • 重试一次,库存多扣一次
  • 重试一次,用户多退一笔钱
  • MQ 重复消费,订单状态反复跳变

排查路径:

  1. 看请求是否带全局唯一 ID
  2. 看服务端是否落幂等记录
  3. 看数据库是否有唯一索引约束
  4. 看消费逻辑是否先检查状态再执行业务

坑 4:状态机不完整

很多项目一开始只定义:

  • 成功
  • 失败

实际上这远远不够。
你至少要区分:

  • 执行中
  • 执行成功
  • 执行失败
  • 补偿中
  • 补偿成功
  • 补偿失败
  • 待人工处理

否则线上一出问题,根本不知道该重试正向还是重试补偿。


坑 5:超时与“假失败”

一个非常真实的线上场景:

  • 协调器调用支付服务
  • 5 秒没返回,协调器判定失败并开始补偿
  • 第 6 秒支付服务其实扣款成功了

于是你会得到一个极其尴尬的状态:
订单取消了,但用户的钱扣了。

排查建议:

  • 远程调用失败后,不要立刻把结果当成“业务失败”
  • 优先设计“可查询状态”的接口
  • 对于未知结果,先进入 UNKNOWN 状态,异步对账确认

这件事非常重要:
分布式系统里,超时通常表示“未知”,不等于“失败”。


安全/性能最佳实践

Saga 主要讨论一致性,但安全和性能也不能忽略。

安全最佳实践

1. 补偿接口必须鉴权

补偿接口不是普通业务接口。
如果被错误调用,可能直接释放库存、取消订单、退款。

建议:

  • 只允许内网调用
  • 做服务间身份认证
  • 校验 saga_id、业务主键、签名
  • 落审计日志

2. 避免敏感信息在 Saga 日志中裸奔

不要在 saga 日志、消息体中直接存:

  • 明文手机号
  • 银行卡号
  • 身份证号
  • 支付凭证原文

日志里保留必要业务键即可。

3. 人工补偿台要有权限分级

运营可查看,不一定可执行。
执行补偿建议至少做到:

  • RBAC 权限控制
  • 双人审批(高风险操作)
  • 操作审计可追溯

性能最佳实践

1. 缩短 Saga 链路

参与服务越多,失败概率越高。
能合并的步骤尽量合并,能本地化的不远程化。

例如:

  • “创建订单 + 初始化订单明细”放在一个服务内本地事务完成
  • 不要把非常细碎的状态更新拆成多个远程步骤

2. 补偿逻辑要轻量

补偿不是做全量重算。
它应该尽可能只做必要的逆向修正,减少资源占用和锁竞争。

3. 使用异步推进非关键步骤

比如通知短信、站内信、积分赠送,很多时候不必放进主 Saga 链路。
否则会拖慢核心交易路径。

4. 控制重试策略

不要无脑立即重试。建议使用:

  • 指数退避
  • 最大重试次数
  • 熔断与降级
  • 死信队列

容量估算与治理建议

Saga 系统除了能跑,还得考虑规模。

一个简单估算思路

假设:

  • 峰值下单 QPS:500
  • 每笔订单平均 4 个 Saga 步骤
  • 每步平均 1 次状态写入
  • 失败率 2%,失败补偿平均 2 步

那么每秒大约产生:

  • 正向步骤写入:500 * 4 = 2000
  • 补偿写入:500 * 2% * 2 = 20

总计约 2020 次步骤记录写入/秒。

这意味着:

  • saga 日志表要分库分表或按时间归档
  • 索引不能过多
  • 热点业务键要避免集中冲突
  • 监控和查询系统不能直接压主库

我建议优先治理的 3 件事

  1. 先把状态机建完整
  2. 所有动作都做幂等
  3. 建立失败重试和人工补偿闭环

如果这三件事没做好,系统规模越大,事故只会越频繁。


一个更接近真实项目的落地清单

如果你准备在项目里上 Saga,可以按下面这个顺序推进:

第一阶段:跑通最小闭环

  • 定义 saga_id
  • 明确每一步 action / compensation
  • 设计状态机
  • 完成幂等处理
  • 做基础日志记录

第二阶段:补足可靠性

  • 引入 Saga 实例表和步骤日志表
  • 加重试机制
  • 增加超时与未知状态处理
  • 接入告警和链路追踪

第三阶段:工程化治理

  • 对接 Outbox/MQ
  • 增加人工补偿后台
  • 做对账任务
  • 建立容量评估和归档策略

总结

Saga 模式的价值,不在于把分布式系统变成“看起来像单机事务”,而在于它提供了一套可恢复、可追踪、可扩展的一致性解决方案。

你可以把它记成三句话:

  1. 把大事务拆成多个本地事务
  2. 每一步都必须有可执行的补偿
  3. 用幂等、状态机、日志和重试把最终一致性兜住

如果你正在做订单、库存、支付、履约这类流程,我的建议很直接:

  • 优先选择编排式 Saga 起步
  • 先收敛业务状态机,再谈框架选型
  • 补偿要当成正式业务能力建设,不是异常分支凑合写一下
  • 对于“超时”的情况,永远先当成“未知”处理

最后说个很现实的边界条件:
如果你的场景是“总账绝对准确,任何中间态都不可接受”,那 Saga 往往不是最佳答案,应该评估 TCC 或更强一致的方案。
但如果你做的是大多数互联网交易流程,Saga 几乎是绕不过去的一课,而且值得认真做好。


分享到:

上一篇
《Java开发踩坑实战:排查并修复线程池误用导致的接口超时与内存飙升问题-310》
下一篇
《Java开发踩坑实录:线程池参数误配导致服务雪崩的排查与优化实践》