跳转到内容
123xiao | 无名键客

《微服务架构中分布式事务的实战方案:基于 Saga 模式的设计、实现与落地优化》

字数: 0 阅读时长: 1 分钟

微服务架构中分布式事务的实战方案:基于 Saga 模式的设计、实现与落地优化

在单体应用里,事务这件事往往不太“惹眼”:一个数据库连接、一个本地事务,提交或回滚就结束了。
但到了微服务架构里,订单、库存、支付、积分、优惠券分别拆成独立服务后,事情一下子就复杂了:

  • 订单创建成功了,但库存扣减失败怎么办?
  • 支付成功了,但订单状态更新超时怎么办?
  • 某一步执行成功,后续失败了,是不是得“撤销前面做过的事”?
  • 网络抖动、消息重复、服务重试,会不会把补偿执行两次?

这些问题的本质,都是跨服务事务一致性

很多团队第一反应是找“分布式事务框架”一把梭,但实际落地时会发现:
强一致方案成本高、侵入强、性能压力大,很多互联网业务并不划算。

这时候,Saga 模式往往是一个更现实的选择。

这篇文章我会从架构设计、方案取舍、可运行代码、线上排查和性能优化几个层面,带你完整过一遍 Saga 的实战思路。重点不是讲概念,而是讲怎么在真实系统里把它做对


背景与问题

先看一个最典型的业务流程:下单。

一次下单可能涉及:

  1. 订单服务:创建订单
  2. 库存服务:冻结或扣减库存
  3. 支付服务:完成扣款
  4. 营销服务:核销优惠券
  5. 积分服务:增加积分
  6. 通知服务:发送短信或站内消息

如果这些操作都在一个库里,本地事务就够了。
但微服务拆分之后,每一步都是一个独立服务、独立数据库,传统 ACID 事务无法直接跨服务生效。

为什么不直接上 2PC / XA?

理论上可以,但在很多业务系统里并不优雅:

  • 协调器复杂,系统耦合重
  • 锁持有时间长,吞吐下降明显
  • 某个参与者卡住,整条链路一起阻塞
  • 云原生、中间件异构环境下支持度并不统一

换句话说,你买来的是强一致,付出的却可能是可用性、性能和系统复杂度。

Saga 解决的是什么问题?

Saga 不追求“一个大事务里同时提交或同时回滚”,而是把一个长事务拆成多个本地事务

  • 每一步本地事务独立提交
  • 如果后续步骤失败,就执行前面步骤对应的补偿动作
  • 通过“正向操作 + 逆向补偿”,达成最终一致性

它很像业务层面的“撤销链”。


方案对比与取舍分析

在分布式事务里,常见方案通常有这几类:

方案一致性性能复杂度适用场景
2PC / XA强一致较低金融核心、少量高价值操作
TCC强一致偏业务化很高可预留资源、接口可改造的核心链路
Saga最终一致订单、履约、营销等长事务
本地消息表 / Outbox最终一致事件驱动、异步集成

Saga 适合什么场景?

适合以下特点的业务:

  • 允许短时间不一致
  • 每一步都能设计出补偿动作
  • 更关心系统可用性与吞吐
  • 业务步骤多、执行时间长
  • 需要跨多个服务和存储

Saga 不适合什么场景?

这点非常重要,我见过不少团队把 Saga 用错地方:

  • 账户余额类强一致场景:如果补偿失败代价极高,不建议直接用 Saga
  • 不可逆操作:比如外部银行转账、第三方不可撤销扣款
  • 无法定义补偿语义:如果“回滚”只是理想化想象,别上

一个很实用的判断标准是:

如果你无法清楚回答“每一步失败后要怎么撤销”,那就不要急着做 Saga。


核心原理

Saga 常见有两种驱动方式:

  1. Choreography(事件编排/协同):服务之间通过事件彼此触发
  2. Orchestration(中心编排):由一个 Saga Orchestrator 控制执行顺序和补偿流程

对于中级读者和大多数业务团队,我更推荐中心编排先落地,因为它更直观,也更容易排查。

Saga 的基本结构

一个 Saga 通常包含:

  • Saga 实例 ID
  • 步骤列表
  • 每一步的执行状态
  • 补偿状态
  • 重试次数
  • 超时信息
  • 幂等键

正向执行与补偿执行

举个下单例子:

  • Step1:创建订单
  • Step2:冻结库存
  • Step3:扣减余额

如果 Step3 失败,就按相反顺序补偿:

  • Compensate Step2:解冻库存
  • Compensate Step1:取消订单

这个顺序不能乱。
补偿一般是逆序执行,因为它对应的是“撤销依赖链”。

Saga 状态流转

stateDiagram-v2
    [*] --> Started
    Started --> StepRunning
    StepRunning --> StepSucceeded
    StepRunning --> StepFailed
    StepSucceeded --> StepRunning: next step
    StepFailed --> Compensating
    Compensating --> CompensationSucceeded
    Compensating --> CompensationFailed
    CompensationSucceeded --> RolledBack
    CompensationFailed --> ManualIntervention
    StepSucceeded --> Completed: all steps success

编排型 Saga 的交互过程

sequenceDiagram
    participant Client
    participant Saga as Saga Orchestrator
    participant Order
    participant Inventory
    participant Payment

    Client->>Saga: 创建订单请求
    Saga->>Order: createOrder()
    Order-->>Saga: success(orderId)

    Saga->>Inventory: reserveStock(orderId, sku, count)
    Inventory-->>Saga: success

    Saga->>Payment: charge(orderId, amount)
    Payment-->>Saga: fail(timeout or insufficient)

    Saga->>Inventory: releaseStock(orderId, sku, count)
    Inventory-->>Saga: success

    Saga->>Order: cancelOrder(orderId)
    Order-->>Saga: success

    Saga-->>Client: 事务失败,已补偿

最关键的设计点

Saga 不是“调几个接口”那么简单,真正难的是下面这几个点:

1. 补偿不等于数据库回滚

补偿是新的业务动作,不是回到过去。

例如:

  • 创建订单的补偿不是删除订单,而是更新状态为 CANCELLED
  • 冻结库存的补偿不是物理恢复日志,而是解冻冻结量
  • 发券的补偿可能不是删掉券,而是标记券失效

所以设计补偿时,要尽量保证:

  • 语义明确
  • 可审计
  • 可重试
  • 幂等

2. 每一步都必须幂等

因为网络超时下,你不知道上游有没有真正执行成功。
最常见情况是:

  • 编排器调用库存服务超时
  • 其实库存已经冻结成功了
  • 编排器误以为失败,发起重试
  • 如果没有幂等,库存会被重复冻结

所以每个服务都要支持基于 sagaId + stepName 或业务唯一键的幂等控制。

3. 成功、失败、超时是三种不同状态

很多实现只区分成功和失败,这是不够的。

  • 成功:已确认执行完成
  • 失败:明确知道执行没成功
  • 超时/未知:结果不确定,需要查询或重试

线上真正麻烦的,往往就是“未知状态”。


架构设计:一个可落地的 Saga 方案

下面给一个相对稳妥、团队容易实现的架构。

flowchart LR
    A[Client/API] --> B[Saga Orchestrator]
    B --> C[(Saga State DB)]
    B --> D[Order Service]
    B --> E[Inventory Service]
    B --> F[Payment Service]
    D --> G[(Order DB)]
    E --> H[(Inventory DB)]
    F --> I[(Payment DB)]
    D -. 事件/日志 .-> J[Message Broker]
    E -. 事件/日志 .-> J
    F -. 事件/日志 .-> J
    B --> J

核心组件

1. Saga Orchestrator

职责:

  • 创建 Saga 实例
  • 记录步骤状态
  • 依次调用各服务
  • 失败时触发补偿
  • 管理重试和超时
  • 提供查询接口和人工干预入口

2. Saga State DB

建议至少保存这些字段:

  • saga_id
  • biz_id
  • status
  • current_step
  • step_details
  • retry_count
  • last_error
  • created_at
  • updated_at

3. 业务服务

每个业务服务都要提供:

  • 正向接口
  • 补偿接口
  • 幂等控制
  • 操作日志

容量与性能上的简单估算

如果系统每秒 500 单,每单 3 个步骤,平均每个步骤 1 次状态写入,失败率按 1% 估算:

  • 正常链路状态写入:500 * 3 = 1500 ops/s
  • 结束状态更新、补偿状态更新也会放大写入
  • 真实设计时最好按 3~5 倍业务 QPS 估算 Saga 状态库写压力

这意味着:

  • Saga 状态表必须建好索引
  • 热数据读写要控制字段大小
  • 大 JSON 日志不要无脑塞主表
  • 高并发下要考虑分库分表或冷热分离

实战代码(可运行)

下面用 Python 模拟一个最小可运行的编排型 Saga。
它不依赖外部框架,重点演示:

  • 步骤定义
  • 正向执行
  • 逆序补偿
  • 幂等控制
  • 状态记录

你可以直接保存为 saga_demo.py 运行。

from dataclasses import dataclass, field
from typing import Callable, List, Dict, Any
import uuid


class SagaExecutionError(Exception):
    pass


class IdempotencyStore:
    def __init__(self):
        self.executed = set()

    def already_done(self, key: str) -> bool:
        return key in self.executed

    def mark_done(self, key: str):
        self.executed.add(key)


idempotency_store = IdempotencyStore()


@dataclass
class SagaStep:
    name: str
    action: Callable[[Dict[str, Any]], None]
    compensate: Callable[[Dict[str, Any]], None]


@dataclass
class SagaContext:
    saga_id: str
    data: Dict[str, Any] = field(default_factory=dict)
    logs: List[str] = field(default_factory=list)

    def log(self, message: str):
        print(message)
        self.logs.append(message)


class OrderService:
    def create_order(self, ctx: Dict[str, Any]):
        key = f"{ctx['saga_id']}:create_order"
        if idempotency_store.already_done(key):
            return
        ctx["order_id"] = f"ORD-{uuid.uuid4().hex[:8]}"
        idempotency_store.mark_done(key)

    def cancel_order(self, ctx: Dict[str, Any]):
        key = f"{ctx['saga_id']}:cancel_order"
        if idempotency_store.already_done(key):
            return
        ctx["order_cancelled"] = True
        idempotency_store.mark_done(key)


class InventoryService:
    def reserve(self, ctx: Dict[str, Any]):
        key = f"{ctx['saga_id']}:reserve_inventory"
        if idempotency_store.already_done(key):
            return
        if ctx.get("stock", 0) < ctx.get("quantity", 0):
            raise SagaExecutionError("库存不足")
        ctx["stock"] -= ctx["quantity"]
        ctx["inventory_reserved"] = True
        idempotency_store.mark_done(key)

    def release(self, ctx: Dict[str, Any]):
        key = f"{ctx['saga_id']}:release_inventory"
        if idempotency_store.already_done(key):
            return
        if ctx.get("inventory_reserved"):
            ctx["stock"] += ctx["quantity"]
            ctx["inventory_released"] = True
        idempotency_store.mark_done(key)


class PaymentService:
    def charge(self, ctx: Dict[str, Any]):
        key = f"{ctx['saga_id']}:charge_payment"
        if idempotency_store.already_done(key):
            return
        if ctx.get("balance", 0) < ctx.get("amount", 0):
            raise SagaExecutionError("余额不足,支付失败")
        ctx["balance"] -= ctx["amount"]
        ctx["payment_done"] = True
        idempotency_store.mark_done(key)

    def refund(self, ctx: Dict[str, Any]):
        key = f"{ctx['saga_id']}:refund_payment"
        if idempotency_store.already_done(key):
            return
        if ctx.get("payment_done"):
            ctx["balance"] += ctx["amount"]
            ctx["payment_refunded"] = True
        idempotency_store.mark_done(key)


class SagaOrchestrator:
    def __init__(self, steps: List[SagaStep]):
        self.steps = steps

    def execute(self, saga_ctx: SagaContext):
        completed_steps = []
        try:
            for step in self.steps:
                saga_ctx.log(f"[EXECUTE] step={step.name}")
                step.action(saga_ctx.data)
                completed_steps.append(step)
                saga_ctx.log(f"[SUCCESS] step={step.name}")
            saga_ctx.log("[SAGA COMPLETED]")
        except Exception as e:
            saga_ctx.log(f"[FAILED] error={str(e)}")
            for step in reversed(completed_steps):
                try:
                    saga_ctx.log(f"[COMPENSATE] step={step.name}")
                    step.compensate(saga_ctx.data)
                    saga_ctx.log(f"[COMPENSATED] step={step.name}")
                except Exception as ce:
                    saga_ctx.log(f"[COMPENSATE FAILED] step={step.name}, error={str(ce)}")
            saga_ctx.log("[SAGA ROLLED BACK]")


def main():
    order_service = OrderService()
    inventory_service = InventoryService()
    payment_service = PaymentService()

    steps = [
        SagaStep(
            name="create_order",
            action=order_service.create_order,
            compensate=order_service.cancel_order
        ),
        SagaStep(
            name="reserve_inventory",
            action=inventory_service.reserve,
            compensate=inventory_service.release
        ),
        SagaStep(
            name="charge_payment",
            action=payment_service.charge,
            compensate=payment_service.refund
        )
    ]

    orchestrator = SagaOrchestrator(steps)

    ctx = SagaContext(
        saga_id=uuid.uuid4().hex,
        data={
            "saga_id": uuid.uuid4().hex,
            "stock": 10,
            "quantity": 2,
            "balance": 50,
            "amount": 100
        }
    )

    orchestrator.execute(ctx)

    print("\nFinal Context:")
    for k, v in ctx.data.items():
        print(f"{k}: {v}")


if __name__ == "__main__":
    main()

运行结果说明

上面示例里:

  • 库存够
  • 但余额不足
  • 所以支付失败
  • 接着会自动触发:
    • 释放库存
    • 取消订单

这就是一个最基础的 Saga 回滚链路。


进一步落地:数据库状态表设计

实际项目里,光有内存状态肯定不够。你至少需要一张 Saga 实例表。

CREATE TABLE saga_instance (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    saga_id VARCHAR(64) NOT NULL UNIQUE,
    biz_id VARCHAR(64) NOT NULL,
    status VARCHAR(32) NOT NULL,
    current_step VARCHAR(64),
    step_details TEXT,
    retry_count INT NOT NULL DEFAULT 0,
    last_error TEXT,
    created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    INDEX idx_biz_id (biz_id),
    INDEX idx_status_created (status, created_at)
);

再加一张步骤执行表会更清晰:

CREATE TABLE saga_step (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    saga_id VARCHAR(64) NOT NULL,
    step_name VARCHAR(64) NOT NULL,
    action_status VARCHAR(32) NOT NULL,
    compensate_status VARCHAR(32) DEFAULT NULL,
    idempotency_key VARCHAR(128) NOT NULL,
    retry_count INT NOT NULL DEFAULT 0,
    last_error TEXT,
    created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    UNIQUE KEY uk_saga_step (saga_id, step_name),
    UNIQUE KEY uk_idempotency_key (idempotency_key)
);

我建议的状态枚举

不要随便用几个字符串糊弄,后期排查会非常痛苦。建议明确定义:

  • PENDING
  • RUNNING
  • SUCCESS
  • FAILED
  • COMPENSATING
  • COMPENSATED
  • COMPENSATE_FAILED
  • MANUAL_PROCESSING
  • FINISHED

常见坑与排查

这是 Saga 真正“难”的部分。理论大家都懂,线上事故却经常出在细节。

1. 补偿接口没有幂等

现象

  • 因为网络重试,补偿被调用多次
  • 库存被多释放一次
  • 余额被多退一次

根因

补偿逻辑被当作“回滚函数”,而不是一个独立业务接口来设计。

解决办法

  • 所有补偿接口都带 idempotency_key
  • 基于唯一键落库
  • 重复请求直接返回成功

2. 步骤成功了,但编排器没收到响应

现象

  • 服务端其实执行成功
  • 编排器因为超时判定失败
  • 接着重试,造成重复执行

处理策略

不要一超时就立刻补偿,先进入“未知状态”:

  1. 标记为 RUNNING_TIMEOUT
  2. 查询下游执行结果
  3. 若已成功,则推进下一步
  4. 若确认失败,再补偿
  5. 若持续未知,转人工或延迟重试

我自己踩过这个坑:早期把超时直接当失败,结果库存和支付都出现过重复处理。


3. 补偿失败后无人感知

现象

  • 主流程失败了
  • 补偿过程中又失败
  • 系统表面上“返回失败”,但内部状态半残

建议

必须做三层保障:

  • 告警:补偿失败立即通知
  • 死信/待处理队列:进入人工或自动修复流程
  • 后台操作台:支持按 sagaId 手动重试补偿

4. 步骤顺序设计错误

错误例子

先支付,再冻结库存。

如果支付成功、库存失败,就需要退款。退款通常比“先冻结库存再支付”更麻烦,尤其涉及第三方支付时更明显。

实战建议

尽量让步骤顺序遵循:

  1. 便宜、可逆、资源预留型操作优先
  2. 昂贵、外部依赖、不可逆操作靠后

比如更合理的是:

  • 创建订单
  • 冻结库存
  • 锁优惠券
  • 扣款
  • 发货/通知

5. 使用删除作为补偿

问题

很多人会把补偿写成“删除已创建数据”,这在审计上很危险,也容易导致状态追踪断裂。

建议

优先使用状态流转而不是物理删除:

  • CREATED -> CANCELLED
  • RESERVED -> RELEASED
  • PAID -> REFUNDED

这样更符合真实业务,也更利于追溯。


安全/性能最佳实践

Saga 不只是“一致性问题”,也涉及安全和性能。

安全最佳实践

1. 补偿接口必须鉴权

很多系统只保护正向接口,忘了补偿接口。
实际上补偿接口影响同样大,甚至更危险。

建议:

  • 仅允许编排器或受信服务调用
  • 使用服务间认证,如 mTLS、JWT、签名机制
  • 校验 sagaIdbizId、时间戳、防重放 nonce

2. 关键操作写审计日志

以下动作建议全部审计:

  • Saga 创建
  • 每一步执行
  • 每一步补偿
  • 人工重试
  • 人工终止
  • 状态变更

3. 防止参数篡改

跨服务调用时,补偿参数必须与原始执行参数可关联、可校验。
例如退款金额不能由调用方“现算”,而应该基于原始支付记录确定。


性能最佳实践

1. 把长耗时步骤异步化

例如短信发送、积分发放,通常不应放在主 Saga 链路里。
可以在核心事务完成后,通过事件异步触发,避免拉长主流程。

2. Saga 状态表轻量化

别把完整请求体、响应体、大对象都塞到主表。建议:

  • 主表只存关键状态
  • 明细日志单独存储
  • 大字段进入日志表或对象存储

3. 重试要有退避策略

别失败就立刻死循环重试。建议:

  • 指数退避
  • 最大重试次数
  • 熔断保护
  • 区分业务失败与系统失败

4. 控制补偿风暴

当某个下游服务故障时,大量 Saga 同时失败,可能引发大规模补偿,进一步压垮系统。

建议:

  • 补偿限流
  • 分批重试
  • 故障窗口内暂停新事务
  • 对单个服务设置熔断和隔离舱

一个更接近生产的实现建议

如果你准备把 Saga 真正在团队里上线,我建议按下面的成熟度路径推进:

第 1 阶段:先做中心编排 + 本地事务 + 幂等

目标不是“最先进”,而是先可控、可查、可回放

你至少要有:

  • Saga 状态持久化
  • 业务服务幂等
  • 补偿逻辑
  • 失败告警
  • 手工重试能力

第 2 阶段:加入消息 Outbox

为避免“本地提交成功但消息没发出去”的双写问题,可以在业务服务中引入 Outbox:

  • 业务数据与事件同库同事务写入
  • 后台任务异步投递消息
  • 提升事件驱动链路可靠性

第 3 阶段:做操作台和自动修复

真正成熟的系统,不能只靠日志 grep。

建议提供:

  • sagaId / bizId 查询
  • 查看每一步状态和错误
  • 手动重试步骤
  • 手动触发补偿
  • 手动标记完成/终止

边界条件:什么时候该停下来重新评估?

这也是实战里非常重要的一点。不是所有链路都值得做 Saga。

如果你发现出现以下情况,建议先暂停继续堆功能:

  • 补偿逻辑比正向逻辑还复杂
  • 每一步都依赖第三方,无法确认真实结果
  • 人工介入比例长期偏高
  • 下游接口根本不支持幂等
  • 强一致要求高于系统可用性要求

这时候可以考虑:

  • 回到 TCC
  • 缩小事务边界
  • 合并服务
  • 将关键资源集中到同一事务域

有时候,最好的分布式事务优化,是不要制造不必要的分布式事务。


总结

Saga 模式的核心价值,不是“模拟数据库回滚”,而是:

  • 把跨服务长事务拆成多个本地事务
  • 通过补偿机制实现最终一致性
  • 在一致性、性能、可用性之间做工程化平衡

如果你要在项目里落地,我建议按这几个原则执行:

  1. 先判断场景适不适合 Saga

    • 可补偿、可幂等、允许最终一致,才适合
  2. 优先选择编排型 Saga

    • 状态集中、链路清晰、排查方便
  3. 把补偿当成正式业务接口来设计

    • 幂等、审计、可重试,一个都不能少
  4. 超时不要直接当失败

    • 先识别未知状态,再决定查询、重试还是补偿
  5. 一定要有操作台和告警

    • 没有可观测性,Saga 很快会变成“线上黑盒”
  6. 不要把所有步骤都塞进主事务链路

    • 识别核心步骤和非核心步骤,能异步就异步

最后给一个很务实的落地建议:

第一次做 Saga,不要追求“大而全”。
先选一条订单类链路,把状态表、幂等、补偿、告警、人工重试打通,比空谈“最终一致性架构”有价值得多。

如果你的系统已经进入微服务阶段,而又不想为强一致付出过高代价,那么 Saga 确实是一个值得认真建设的实战方案。


分享到:

上一篇
《自动化测试中的测试数据管理实战:从环境隔离到数据构造与回收策略》
下一篇
《Docker 多阶段构建与镜像瘦身实战:从构建加速到安全优化的完整方案》