跳转到内容
123xiao | 无名键客

《分布式架构中基于 Saga 模式的分布式事务实战:从一致性设计到失败补偿落地》

字数: 0 阅读时长: 1 分钟

分布式架构中基于 Saga 模式的分布式事务实战:从一致性设计到失败补偿落地

在单体时代,我们习惯了一个本地事务把“创建订单、扣库存、冻结余额”一把梭地提交。可一旦拆成多个服务,事情就不再简单:订单服务、库存服务、支付服务各自有数据库,各自有可用性目标,再指望一个跨库强一致事务兜底,往往不现实,甚至会把系统拖慢。

这时候,Saga 模式就很常见了。它不是“强一致的银弹”,但它在业务可接受“最终一致”的场景里,确实是非常实用的一种工程解法。本文我会从架构视角,把 Saga 的设计原则、补偿策略、可运行代码示例、排查思路和性能/安全实践串起来,尽量讲得“能落地”。


背景与问题

先看一个典型业务:用户下单。

完整链路通常包括:

  1. 订单服务创建订单
  2. 库存服务预扣库存
  3. 支付服务冻结或扣减余额
  4. 最终确认订单成功

如果其中任何一步失败,前面已经成功的步骤就要“撤销”或者“补偿”。

为什么传统分布式事务不总是合适

很多团队一开始会想到 2PC/XA,但在微服务里常见问题是:

  • 耦合重:所有参与方都要支持同一种事务协议
  • 阻塞明显:协调者卡住会影响整体吞吐
  • 可用性下降:网络抖动、超时后恢复复杂
  • 云原生环境不友好:跨服务、跨存储、中间件异构时实现成本高

而业务真正要的,很多时候不是“绝对同步一致”,而是:

  • 核心数据别丢
  • 失败时可恢复
  • 状态能追踪
  • 延迟在可接受范围内

Saga 恰好适合这个区间。

Saga 解决的是什么问题

Saga 把一个长事务拆成多个本地事务。每个本地事务成功后,继续下一步;如果某一步失败,就执行前面步骤对应的补偿动作

它解决的是:

  • 跨服务事务边界难统一
  • 强一致成本太高
  • 业务允许短暂不一致,但必须最终收敛

但它也带来新的要求:

  • 补偿逻辑要真的可执行
  • 接口必须支持幂等
  • 状态机必须明确
  • 监控与重试机制必须完备

方案对比与取舍分析

在落地 Saga 之前,最好先明确它和其他方案的边界。

方案一致性性能实现复杂度适用场景
本地事务强一致单库单服务
2PC/XA强一致中低少量关键系统、同构存储
TCC较强一致很高核心资金、资源预留明确
Saga最终一致订单、履约、营销等流程型业务

什么时候更适合 Saga

适合:

  • 业务流程长,跨多个服务
  • 各步骤天然是业务动作,而不是数据库层事务
  • 能接受秒级、分钟级最终一致
  • 可以明确定义“补偿”语义

不太适合:

  • 资金实时账务的最终落账
  • 不允许任何中间态暴露的核心交易
  • 某些动作无法撤销,且也无法设计替代补偿

一句话概括:如果你的业务动作本身就像一串工作流,Saga 通常比 XA 更顺手;但如果每一步都要求严格锁定资源、强同步提交,那更接近 TCC 或专用账务设计。


核心原理

Saga 有两种主流组织方式:

  1. Choreography(事件编排/舞蹈式)

    • 各服务通过事件自行驱动
    • 去中心化,耦合低
    • 但流程全貌不易追踪,复杂场景下容易“隐式耦合”
  2. Orchestration(中心编排/指挥式)

    • 由一个 Saga Orchestrator 统一调度
    • 流程清晰,便于监控与补偿
    • 编排器本身成为关键组件

中级读者在实战里,我更建议优先理解 Orchestration,因为它更好讲清楚设计边界,也更利于排障。

Saga 的核心模型

一个 Saga 由若干步骤组成,每个步骤至少包含:

  • 正向操作(Do)
  • 补偿操作(Compensate)
  • 幂等键(Idempotency Key)
  • 超时与重试策略
  • 当前状态

可以把它理解成一个“可前进、可回滚、可观察”的状态机。

stateDiagram-v2
    [*] --> Pending
    Pending --> OrderCreated: 创建订单成功
    OrderCreated --> StockReserved: 预扣库存成功
    StockReserved --> PaymentCharged: 扣款成功
    PaymentCharged --> Completed: 全部完成

    OrderCreated --> Compensating: 库存失败
    StockReserved --> Compensating: 支付失败
    PaymentCharged --> Compensating: 后续确认失败

    Compensating --> Compensated: 补偿完成
    Compensating --> Failed: 补偿异常
    Completed --> [*]
    Compensated --> [*]
    Failed --> [*]

一个关键认知:补偿不是“回滚”

这是最容易踩坑的地方。

数据库事务里的 rollback 是系统级保证;Saga 里的 compensation 是业务级反向动作。比如:

  • 创建订单的补偿:关闭订单
  • 扣减库存的补偿:归还库存
  • 扣款的补偿:退款或解冻

它们不是同一件事。

比如支付已经走到第三方渠道,就算你在自己库里回滚,也不代表钱真的回来了。所以我做 Saga 设计时,一定会问一句:

这个步骤的“补偿动作”是否可验证、可追踪、可重试?

如果答案是否定的,那这个步骤还没准备好进 Saga。


一致性设计:先定义状态,再定义接口

很多团队上来就写代码,其实最该先画的是“状态图”和“失败路径”。

推荐的设计顺序

  1. 定义业务主状态
  2. 定义每一步的成功条件
  3. 定义补偿条件
  4. 定义补偿完成标识
  5. 定义重试边界
  6. 定义人工介入规则

示例:订单 Saga 状态

订单可以有:

  • PENDING
  • CREATED
  • STOCK_RESERVED
  • PAID
  • COMPLETED
  • CANCELING
  • CANCELED
  • FAILED

每个状态都应当有明确来源,避免“任意跳转”。

flowchart LR
    A[创建订单] --> B[预扣库存]
    B --> C[扣减余额]
    C --> D[确认完成]

    B -.失败.-> B1[关闭订单]
    C -.失败.-> C1[归还库存]
    C1 --> C2[关闭订单]
    D -.失败.-> D1[退款]
    D1 --> D2[归还库存]
    D2 --> D3[关闭订单]

接口设计的三个硬要求

1. 幂等

同一个 Saga 步骤被重复调用,结果必须一致。

例如库存预扣接口应支持:

  • sagaId + stepName 作为唯一幂等键
  • 已成功则直接返回成功
  • 已补偿则返回当前状态,而不是再次扣减

2. 可查询

每个步骤都需要状态查询接口,用于:

  • 补偿前确认是否已经执行
  • 超时后判断是否“未知成功”
  • 故障恢复时对账

3. 可补偿

补偿接口不能只写“TODO”。它必须满足:

  • 能重复执行
  • 能处理部分成功
  • 能记录补偿结果
  • 能支持延迟补偿

实战代码(可运行)

下面给一个可直接运行的 Python 示例,模拟一个基于编排器的 Saga:

  • 创建订单
  • 预扣库存
  • 扣款
  • 如果扣款失败,则依次补偿库存、关闭订单

这不是生产级框架,但足够把设计思路讲清楚。

运行方式

保存为 saga_demo.py,用 Python 3 运行:

python saga_demo.py

完整代码

import uuid
from dataclasses import dataclass, field
from typing import Callable, List, Dict


class SagaError(Exception):
    pass


@dataclass
class SagaStep:
    name: str
    action: Callable[[], None]
    compensate: Callable[[], None]


@dataclass
class SagaContext:
    saga_id: str
    data: Dict = field(default_factory=dict)


class OrderService:
    def __init__(self):
        self.orders = {}
        self.idempotency = set()

    def create_order(self, ctx: SagaContext):
        key = (ctx.saga_id, "create_order")
        if key in self.idempotency:
            print("[OrderService] create_order 幂等返回")
            return

        order_id = str(uuid.uuid4())[:8]
        self.orders[order_id] = "CREATED"
        ctx.data["order_id"] = order_id
        self.idempotency.add(key)
        print(f"[OrderService] 订单已创建: {order_id}")

    def cancel_order(self, ctx: SagaContext):
        order_id = ctx.data.get("order_id")
        if not order_id:
            print("[OrderService] 无订单可取消,跳过")
            return

        current = self.orders.get(order_id)
        if current in ("CANCELED", None):
            print(f"[OrderService] cancel_order 幂等返回: {order_id}")
            return

        self.orders[order_id] = "CANCELED"
        print(f"[OrderService] 订单已取消: {order_id}")


class InventoryService:
    def __init__(self):
        self.stock = {"itemA": 10}
        self.reservations = {}
        self.idempotency = set()

    def reserve(self, ctx: SagaContext, item_id: str, quantity: int):
        key = (ctx.saga_id, "reserve_stock")
        if key in self.idempotency:
            print("[InventoryService] reserve 幂等返回")
            return

        if self.stock.get(item_id, 0) < quantity:
            raise SagaError("库存不足")

        self.stock[item_id] -= quantity
        self.reservations[ctx.saga_id] = (item_id, quantity)
        self.idempotency.add(key)
        print(f"[InventoryService] 库存预扣成功: {item_id} x {quantity}")

    def release(self, ctx: SagaContext):
        reservation = self.reservations.get(ctx.saga_id)
        if not reservation:
            print("[InventoryService] 无库存预扣记录,跳过")
            return

        item_id, quantity = reservation
        self.stock[item_id] += quantity
        del self.reservations[ctx.saga_id]
        print(f"[InventoryService] 库存已归还: {item_id} x {quantity}")


class PaymentService:
    def __init__(self, fail=False):
        self.balances = {"user1": 100}
        self.charges = {}
        self.idempotency = set()
        self.fail = fail

    def charge(self, ctx: SagaContext, user_id: str, amount: int):
        key = (ctx.saga_id, "charge_payment")
        if key in self.idempotency:
            print("[PaymentService] charge 幂等返回")
            return

        if self.fail:
            raise SagaError("模拟支付失败")

        if self.balances.get(user_id, 0) < amount:
            raise SagaError("余额不足")

        self.balances[user_id] -= amount
        self.charges[ctx.saga_id] = (user_id, amount)
        self.idempotency.add(key)
        print(f"[PaymentService] 扣款成功: {user_id} -{amount}")

    def refund(self, ctx: SagaContext):
        charge = self.charges.get(ctx.saga_id)
        if not charge:
            print("[PaymentService] 无扣款记录,跳过")
            return

        user_id, amount = charge
        self.balances[user_id] += amount
        del self.charges[ctx.saga_id]
        print(f"[PaymentService] 退款成功: {user_id} +{amount}")


class SagaOrchestrator:
    def __init__(self, ctx: SagaContext):
        self.ctx = ctx
        self.steps: List[SagaStep] = []
        self.completed_steps: List[SagaStep] = []

    def add_step(self, step: SagaStep):
        self.steps.append(step)

    def execute(self):
        try:
            for step in self.steps:
                print(f"\n>>> 执行步骤: {step.name}")
                step.action()
                self.completed_steps.append(step)

            print(f"\n[Saga] 执行成功, saga_id={self.ctx.saga_id}")
            return True

        except Exception as e:
            print(f"\n[Saga] 执行失败: {e}")
            self.compensate()
            return False

    def compensate(self):
        print(f"[Saga] 开始补偿, saga_id={self.ctx.saga_id}")
        for step in reversed(self.completed_steps):
            try:
                print(f"<<< 补偿步骤: {step.name}")
                step.compensate()
            except Exception as e:
                print(f"[Saga] 补偿失败: step={step.name}, err={e}")
        print("[Saga] 补偿结束")


def run_demo(payment_fail=False):
    ctx = SagaContext(saga_id=str(uuid.uuid4()))

    order_service = OrderService()
    inventory_service = InventoryService()
    payment_service = PaymentService(fail=payment_fail)

    saga = SagaOrchestrator(ctx)

    saga.add_step(SagaStep(
        name="创建订单",
        action=lambda: order_service.create_order(ctx),
        compensate=lambda: order_service.cancel_order(ctx)
    ))

    saga.add_step(SagaStep(
        name="预扣库存",
        action=lambda: inventory_service.reserve(ctx, "itemA", 2),
        compensate=lambda: inventory_service.release(ctx)
    ))

    saga.add_step(SagaStep(
        name="扣减余额",
        action=lambda: payment_service.charge(ctx, "user1", 30),
        compensate=lambda: payment_service.refund(ctx)
    ))

    result = saga.execute()

    print("\n====== 最终状态 ======")
    print("result =", result)
    print("ctx =", ctx.data)
    print("orders =", order_service.orders)
    print("stock =", inventory_service.stock)
    print("balances =", payment_service.balances)


if __name__ == "__main__":
    print("========== 场景1:全部成功 ==========")
    run_demo(payment_fail=False)

    print("\n\n========== 场景2:支付失败,触发补偿 ==========")
    run_demo(payment_fail=True)

这段代码体现了什么

这个例子重点不是“功能多完整”,而是把 Saga 的几个关键点做出来:

  • 有统一的 SagaOrchestrator
  • 每个步骤同时定义 actioncompensate
  • 失败后按逆序补偿
  • 每个服务都做了一个最基础的幂等处理
  • 补偿允许“无记录跳过”,避免二次失败放大

生产环境还要补什么

如果要真正上线,至少还要加:

  • Saga 执行日志落库
  • 步骤状态持久化
  • 超时控制
  • 失败重试队列
  • 死信队列
  • 人工补偿后台
  • 审计日志

一次完整交互长什么样

下面用时序图看一遍指挥式 Saga 的成功与失败路径。

sequenceDiagram
    participant O as Orchestrator
    participant Order as 订单服务
    participant Inv as 库存服务
    participant Pay as 支付服务

    O->>Order: createOrder(sagaId)
    Order-->>O: success

    O->>Inv: reserveStock(sagaId)
    Inv-->>O: success

    O->>Pay: charge(sagaId)
    alt 支付成功
        Pay-->>O: success
        O-->>O: mark COMPLETED
    else 支付失败
        Pay-->>O: fail
        O->>Inv: releaseStock(sagaId)
        Inv-->>O: success
        O->>Order: cancelOrder(sagaId)
        Order-->>O: success
        O-->>O: mark COMPENSATED
    end

容量估算与架构落地建议

做架构设计时,除了流程正确,还要考虑规模。

粗略容量模型

假设:

  • 每天 100 万笔订单
  • 每个 Saga 平均 3 个正向步骤
  • 失败率 2%
  • 每次失败平均触发 2 个补偿步骤

那么一天调用量大约是:

  • 正向调用:100万 × 3 = 300万
  • 补偿调用:100万 × 2% × 2 = 4万

总计约 304 万次服务调用。

如果你再加上:

  • 状态查询
  • 重试
  • 事件投递
  • 审计日志

实际链路压力会比“业务接口数量”大得多。我见过一些系统压测只看下单 QPS,不看失败重试,结果线上一抖动,补偿风暴直接把自己打趴下。

三个非常实用的容量建议

1. 为补偿链路单独限流

补偿本来是救火的,别让它反过来把主链路挤死。

2. Saga 状态表要按时间或业务维度分片/归档

否则状态查询、扫描重试、人工排障都会越来越慢。

3. 重试必须指数退避

不要 1 秒一次无脑打下游。网络抖动时,这种“热心重试”通常只会雪上加霜。


常见坑与排查

这部分我想写得更接地气一点,因为很多问题不是不会写代码,而是“以为能补偿,实际上补不了”。

坑 1:补偿接口不是幂等的

现象

  • 一个订单被重复取消
  • 库存被多次归还,导致库存为正向漂移
  • 退款重复发起

根因

补偿接口只考虑了“正常调用一次”,没考虑:

  • 超时后重试
  • 消息重复投递
  • 恢复任务重复扫描

排查思路

看日志里是否存在:

  • 同一个 sagaId
  • 同一个 step
  • 多次执行成功

如果有,就说明幂等设计有缺口。

解决建议

  • 为每个步骤建立唯一业务键
  • 补偿前先查状态
  • 成功后写入幂等记录
  • 对外部系统调用保留请求号

坑 2:步骤超时后,实际状态未知

现象

编排器收到超时,但下游其实已经成功执行。

这会导致很尴尬的局面:

  • 编排器以为失败,开始补偿
  • 下游实际上成功,形成“成功后又被反向补偿”

这是 Saga 里最常见的灰度状态问题

我一般建议把调用结果拆成三类:

  • SUCCESS
  • FAIL
  • UNKNOWN

UNKNOWN,不要立刻补偿,而是先走状态查询

排查路径

  1. 查请求日志
  2. 查下游操作日志
  3. 查状态表是否已落库
  4. 查消息是否已投递成功
  5. 比对 traceId / sagaId

建议策略

  • 先查询后补偿
  • 未知状态进入待确认队列
  • 超过阈值再人工介入

坑 3:补偿顺序错了

现象

  • 先取消订单,再归还库存,导致订单状态和库存状态短时间不匹配
  • 先退款失败,又把订单标记已取消,后续对账困难

原则

补偿顺序必须与正向执行顺序相反。

因为后执行的步骤往往依赖前一步上下文。


坑 4:把“技术异常”和“业务失败”混在一起

比如:

  • 库存不足:这是业务失败,不该无限重试
  • RPC 超时:这是技术异常,可以重试
  • 参数非法:这是调用方问题,应快速失败

如果不区分,重试器就会对“库存不足”反复重试,纯属制造噪音。

建议分类

  • BusinessReject
  • RetryableException
  • FatalException

不同类型走不同补偿与告警策略。


坑 5:没有人工兜底后台

只靠自动补偿是不够的。生产环境里,总会有少量单据卡在:

  • 第三方接口长时间不一致
  • 补偿接口连续失败
  • 关键日志缺失
  • 数据已经部分人工修复

这时候必须有:

  • 按 sagaId 检索
  • 查看每一步状态
  • 手动触发重试/补偿
  • 加备注留痕

没有这个后台,线上值班会非常痛苦。


安全/性能最佳实践

Saga 讨论里,大家常盯着一致性,但安全和性能同样重要。

安全实践

1. 补偿接口必须鉴权

不要因为它是“内部接口”就裸奔。补偿接口一旦被误调,破坏性很大。

建议:

  • 服务间 mTLS
  • 网关签名校验
  • 操作来源审计
  • 仅允许编排器或消息消费端调用

2. 幂等键不要可预测

如果对外暴露,可预测的幂等键可能被恶意重放。

建议使用:

  • UUID
  • 雪花 ID
  • 带签名的业务请求号

3. 审计日志与业务日志分开存储

审计日志强调不可篡改、可追责;业务日志强调查询效率。不要混为一谈。


性能实践

1. 优先本地事务 + Outbox

在每个服务内部,更新业务数据和写待发送事件应放在同一个本地事务中。

这样能减少:

  • 状态已改但事件没发出去
  • 事件发出去了但本地状态没落库

这是 Saga 稳定性的基础设施之一。

2. 读写分离状态表

状态查询非常频繁,尤其在重试与排障阶段。可以考虑:

  • 热数据放主表
  • 历史数据归档
  • 二级索引按 saga_id / status / next_retry_time

3. 减少长链路同步等待

不是每一步都要同步 RPC。某些天然可异步的步骤,用消息驱动更稳。

经验上:

  • 用户强感知链路:尽量短
  • 后置确认链路:尽量异步

4. 监控补偿率,而不只监控错误率

有些系统表面 200 OK 很高,但补偿率越来越高,说明业务已经“带病运行”。

建议重点看:

  • Saga 成功率
  • 补偿率
  • 超时率
  • UNKNOWN 状态占比
  • 平均补偿耗时
  • 卡单量

一个更贴近生产的落地清单

如果你准备把 Saga 用在真实业务里,我建议至少检查下面这些项。

设计层

  • 每个步骤都有明确补偿动作
  • 补偿动作已定义幂等规则
  • 状态流转图已评审
  • 业务失败与技术失败已区分
  • 已定义人工介入场景

存储层

  • Saga 主表记录整体状态
  • Saga 步骤表记录每一步状态
  • 幂等表或唯一约束已建立
  • 重试任务表支持退避调度

运行层

  • 日志统一带 traceIdsagaId
  • 指标覆盖成功率、超时率、补偿率
  • 死信队列已接入告警
  • 有人工补偿后台
  • 有对账作业做最终收敛

总结

Saga 模式的价值,不在于“模拟数据库回滚”,而在于它提供了一套面向业务流程的最终一致性实现方式

  • 用多个本地事务代替全局强事务
  • 用补偿动作处理失败链路
  • 用幂等、状态机、重试和审计保障可恢复性

如果你让我给出几条最实用的建议,我会总结成下面这几句:

  1. 先画状态图,再写代码。
  2. 补偿不是 rollback,而是业务反向操作。
  3. 所有步骤都要幂等,所有未知状态都要可查询。
  4. 把自动补偿和人工兜底一起设计。
  5. 补偿链路要单独限流,不要让故障扩散。

最后也要说清边界:Saga 适合最终一致的流程型业务,不适合一切场景。对于核心资金、不可逆资源、强实时账务,你可能需要 TCC、账务系统、预留资源模型,甚至更严格的领域设计。

如果你现在正在做订单、库存、优惠券、履约这类系统,Saga 往往是一个很值得认真掌握的实战方案。它不花哨,但真的能扛事。


分享到:

上一篇
《前端性能实战:基于 Core Web Vitals 的页面加载优化与监控体系搭建》
下一篇
《Java Web 开发中基于 Spring Boot + JWT 的登录鉴权与权限控制实战指南》