跳转到内容
123xiao | 无名键客

《微服务架构中的分布式事务实战:基于 Saga 模式的设计、落地与避坑-239》

字数: 0 阅读时长: 1 分钟

微服务架构中的分布式事务实战:基于 Saga 模式的设计、落地与避坑

在单体应用里,事务这件事通常没那么“吓人”:一个数据库连接、一个本地事务,提交或者回滚,世界就清净了。
但一旦系统拆成微服务,订单、库存、支付、营销、积分各自拥有自己的数据库之后,问题就变了:

一个用户下单,到底怎样才能既保证业务最终一致,又不把系统拖进性能泥潭?

这正是分布式事务的典型战场。而在很多互联网业务里,Saga 模式往往比 2PC、3PC 更接地气,也更适合高并发、弱一致、可补偿的业务场景。

这篇文章我会从架构视角 + 可运行代码 + 常见踩坑三个层面,带你把 Saga 真正“跑起来”。


背景与问题

先看一个最常见的电商下单流程:

  1. 订单服务创建订单
  2. 库存服务扣减库存
  3. 支付服务冻结或扣款
  4. 营销服务核销优惠券
  5. 积分服务发放积分

如果这是一个单库单服务,很容易用一个事务包起来。
但在微服务里,每一步都可能是远程调用,每个服务都有自己的数据库,此时你会遇到几个现实问题:

  • 跨服务无法共享本地事务
  • 某个服务成功了,后续服务失败了,前面的动作怎么办
  • 网络抖动、超时、重试会导致“看起来失败,实际上成功”
  • 用户重复提交、消息重复投递会导致重复扣库存、重复扣款
  • 高峰期下,强一致方案容易拖垮吞吐量

为什么很多团队不直接上 2PC?

因为 2PC 的理论很美,但工程上常见几个问题:

  • 协调者成为瓶颈
  • 参与方长时间锁资源
  • 一个节点卡住,整体吞吐急剧下降
  • 云原生、异构存储、中间件混搭场景下接入复杂

所以,对于允许最终一致可以设计补偿动作的业务,Saga 更实用。


方案对比与取舍分析

在正式展开 Saga 之前,先把几种常见方案摆在一起。

方案一致性性能实现复杂度适用场景
本地事务强一致单体、单库
2PC/XA强一致低到中少量核心链路、传统数据库体系
TCC强一致趋近很高资金、账户等强业务约束场景
Saga最终一致订单、库存、营销等可补偿业务
本地消息表/事务消息最终一致事件驱动、异步解耦场景

Saga 适合什么,不适合什么

适合:

  • 订单创建后取消可接受
  • 库存扣减失败可以回补
  • 优惠券核销失败可以返还
  • 积分发放错了可以撤销

不适合:

  • 银行实时转账这类不能“先做后补偿”的关键资金场景
  • 补偿动作本身不可逆的场景
  • 业务方无法接受中间态暴露的场景

一句话概括:
Saga 的核心前提不是“技术能做”,而是“业务允许补偿”。


核心原理

Saga 可以理解为:

把一个长事务拆成多个本地事务,每个本地事务成功后继续下一步;如果中途失败,就按相反顺序执行补偿动作。

Saga 常见有两种实现方式:

  1. Choreography(事件编排/协同):各服务通过事件自行驱动
  2. Orchestration(中心编排):由一个 Saga Coordinator 统一调度

对于中级工程师落地来说,我更推荐先从中心编排开始,因为它更容易观察、排查、回放。

Saga 执行模型

flowchart LR
    A[创建订单] --> B[扣减库存]
    B --> C[冻结支付]
    C --> D[核销优惠券]
    D --> E[发放积分]

    C -.失败.-> C1[支付补偿]
    C1 --> B1[库存补偿]
    B1 --> A1[订单取消]

这里有两个关键点:

  • 正向事务:业务动作,比如创建订单、扣库存
  • 反向补偿:撤销动作,比如恢复库存、取消订单

状态流转

Saga 不是简单的“成功/失败”,通常还需要有细粒度状态:

stateDiagram-v2
    [*] --> INIT
    INIT --> PROCESSING
    PROCESSING --> SUCCESS
    PROCESSING --> COMPENSATING
    COMPENSATING --> COMPENSATED
    COMPENSATING --> COMPENSATION_FAILED
    SUCCESS --> [*]
    COMPENSATED --> [*]
    COMPENSATION_FAILED --> [*]

这张状态图非常重要,因为很多线上问题都不是“失败”,而是:

  • 某一步其实成功了,但调用方超时
  • 补偿执行了一半卡住
  • 任务重复调度
  • 人工介入后需要重新驱动

所以,状态机设计比“写几个接口”更重要。

编排式 Saga 的典型交互

sequenceDiagram
    participant Client as 客户端
    participant SC as Saga协调器
    participant Order as 订单服务
    participant Stock as 库存服务
    participant Payment as 支付服务

    Client->>SC: 发起下单
    SC->>Order: 创建订单
    Order-->>SC: 成功
    SC->>Stock: 扣减库存
    Stock-->>SC: 成功
    SC->>Payment: 冻结支付
    Payment-->>SC: 失败
    SC->>Stock: 执行库存补偿
    Stock-->>SC: 补偿成功
    SC->>Order: 取消订单
    Order-->>SC: 补偿成功
    SC-->>Client: 下单失败

设计落地:一个能跑的编排式 Saga

下面我们用 Python 做一个最小可运行版本,演示 Saga 协调器如何执行正向动作与补偿动作。

这个示例强调三个核心能力:

  • 步骤顺序执行
  • 失败自动补偿
  • 每个动作支持幂等

说明:示例是单进程模拟,方便理解原理。真实生产环境会替换为 HTTP/gRPC/消息队列调用,并将 Saga 状态持久化到数据库。


实战代码(可运行)

1)完整示例

from dataclasses import dataclass, field
from typing import Callable, List, Dict


class SagaExecutionError(Exception):
    pass


@dataclass
class SagaStep:
    name: str
    action: Callable[[Dict], None]
    compensate: Callable[[Dict], None]


@dataclass
class SagaContext:
    data: Dict = field(default_factory=dict)
    executed_steps: List[SagaStep] = field(default_factory=list)
    logs: List[str] = field(default_factory=list)

    def log(self, message: str):
        self.logs.append(message)
        print(message)


class IdempotentService:
    def __init__(self):
        self.done_ops = set()

    def once(self, op_key: str, func: Callable):
        if op_key in self.done_ops:
            return
        func()
        self.done_ops.add(op_key)


class OrderService(IdempotentService):
    def __init__(self):
        super().__init__()
        self.orders = {}

    def create_order(self, order_id: str):
        self.once(f"create_order:{order_id}", lambda: self.orders.update({
            order_id: "CREATED"
        }))

    def cancel_order(self, order_id: str):
        self.once(f"cancel_order:{order_id}", lambda: self.orders.update({
            order_id: "CANCELLED"
        }))


class StockService(IdempotentService):
    def __init__(self, initial_stock: int):
        super().__init__()
        self.stock = initial_stock

    def deduct(self, order_id: str, count: int):
        def do():
            if self.stock < count:
                raise SagaExecutionError("库存不足")
            self.stock -= count
        self.once(f"deduct:{order_id}", do)

    def restore(self, order_id: str, count: int):
        self.once(f"restore:{order_id}", lambda: setattr(self, "stock", self.stock + count))


class PaymentService(IdempotentService):
    def __init__(self, should_fail=False):
        super().__init__()
        self.payments = {}
        self.should_fail = should_fail

    def freeze(self, order_id: str, amount: int):
        def do():
            if self.should_fail:
                raise SagaExecutionError("支付冻结失败")
            self.payments[order_id] = f"FROZEN:{amount}"
        self.once(f"freeze:{order_id}", do)

    def unfreeze(self, order_id: str):
        self.once(f"unfreeze:{order_id}", lambda: self.payments.update({
            order_id: "UNFROZEN"
        }))


class SagaCoordinator:
    def __init__(self, steps: List[SagaStep]):
        self.steps = steps

    def execute(self, context: SagaContext):
        try:
            for step in self.steps:
                context.log(f"[ACTION] 开始执行: {step.name}")
                step.action(context.data)
                context.executed_steps.append(step)
                context.log(f"[ACTION] 执行成功: {step.name}")
            context.log("[SAGA] 全部步骤执行成功")
            return True
        except Exception as e:
            context.log(f"[SAGA] 执行失败: {e}")
            self.compensate(context)
            return False

    def compensate(self, context: SagaContext):
        context.log("[SAGA] 开始补偿")
        for step in reversed(context.executed_steps):
            try:
                context.log(f"[COMPENSATE] 开始补偿: {step.name}")
                step.compensate(context.data)
                context.log(f"[COMPENSATE] 补偿成功: {step.name}")
            except Exception as e:
                context.log(f"[COMPENSATE] 补偿失败: {step.name}, error={e}")
        context.log("[SAGA] 补偿结束")


if __name__ == "__main__":
    order_service = OrderService()
    stock_service = StockService(initial_stock=10)
    payment_service = PaymentService(should_fail=True)

    order_id = "ORD-1001"
    count = 2
    amount = 100

    context = SagaContext(data={
        "order_id": order_id,
        "count": count,
        "amount": amount
    })

    steps = [
        SagaStep(
            name="创建订单",
            action=lambda data: order_service.create_order(data["order_id"]),
            compensate=lambda data: order_service.cancel_order(data["order_id"])
        ),
        SagaStep(
            name="扣减库存",
            action=lambda data: stock_service.deduct(data["order_id"], data["count"]),
            compensate=lambda data: stock_service.restore(data["order_id"], data["count"])
        ),
        SagaStep(
            name="冻结支付",
            action=lambda data: payment_service.freeze(data["order_id"], data["amount"]),
            compensate=lambda data: payment_service.unfreeze(data["order_id"])
        )
    ]

    coordinator = SagaCoordinator(steps)
    result = coordinator.execute(context)

    print("\n=== 最终结果 ===")
    print("执行结果:", result)
    print("订单状态:", order_service.orders)
    print("库存剩余:", stock_service.stock)
    print("支付状态:", payment_service.payments)

2)运行结果说明

因为这里 PaymentService(should_fail=True),所以执行流程会是:

  • 创建订单成功
  • 扣减库存成功
  • 冻结支付失败
  • 触发补偿
  • 恢复库存
  • 取消订单

你会看到最终状态类似:

[ACTION] 开始执行: 创建订单
[ACTION] 执行成功: 创建订单
[ACTION] 开始执行: 扣减库存
[ACTION] 执行成功: 扣减库存
[ACTION] 开始执行: 冻结支付
[SAGA] 执行失败: 支付冻结失败
[SAGA] 开始补偿
[COMPENSATE] 开始补偿: 扣减库存
[COMPENSATE] 补偿成功: 扣减库存
[COMPENSATE] 开始补偿: 创建订单
[COMPENSATE] 补偿成功: 创建订单
[SAGA] 补偿结束

这就是 Saga 最核心的运行逻辑。


从 Demo 到生产:必须补上的几层能力

很多团队第一次做 Saga,会停留在“协调器 + 几个 HTTP 接口”的阶段。
这个能跑,但扛不住线上事故。要上生产,至少得补这几层。

1)Saga 状态持久化

你不能只把状态放内存里。协调器重启后,未完成事务会全部“失忆”。

建议至少有一张 saga_instance 表:

CREATE TABLE saga_instance (
    saga_id VARCHAR(64) PRIMARY KEY,
    biz_id VARCHAR(64) NOT NULL,
    state VARCHAR(32) NOT NULL,
    current_step VARCHAR(64),
    context_json TEXT NOT NULL,
    retry_count INT NOT NULL DEFAULT 0,
    created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
);

再配一张 saga_step_log 记录步骤执行明细:

CREATE TABLE saga_step_log (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    saga_id VARCHAR(64) NOT NULL,
    step_name VARCHAR(64) NOT NULL,
    action_status VARCHAR(32) NOT NULL,
    compensate_status VARCHAR(32),
    error_message TEXT,
    created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
);

2)步骤接口幂等化

这件事我想强调三次:

  • 正向操作要幂等
  • 补偿操作要幂等
  • 查询接口也要能识别重试后的状态

比如库存服务的扣减,不应该只靠“收到请求就减”。
你应该传入 biz_idsaga_id + step_name 作为幂等键,数据库唯一约束防重。

示意表:

CREATE TABLE stock_deduct_log (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    biz_key VARCHAR(128) NOT NULL UNIQUE,
    order_id VARCHAR(64) NOT NULL,
    sku_id VARCHAR(64) NOT NULL,
    count INT NOT NULL,
    created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
);

3)空补偿与悬挂处理

这是 Saga/TCC 里很容易被忽视的问题。

  • 空补偿:某步骤其实没成功,但补偿请求先到了
  • 悬挂:补偿已经执行,正向请求又迟到了

比如网络乱序时,库存服务可能先收到“恢复库存”,后收到“扣减库存”。

正确做法通常是:

  • 为每一步记录状态:TRYING / DONE / COMPENSATED
  • 补偿前检查正向是否真的成功
  • 正向执行前检查是否已被补偿过

否则就会出现“凭空多出库存”的事故。


常见坑与排查

这一部分我尽量讲得接近线上真实场景,因为很多坑,只有上线后才会显形。

坑 1:把 Saga 当成“自动回滚事务”

这是最常见误区。

Saga 的补偿不是数据库回滚,它是一个新的业务动作
例如:

  • “删除订单”不等于“取消订单”
  • “退款”不等于“支付回滚”
  • “恢复库存”也要考虑库存锁定、批次、仓位

排查方法:

  • 检查补偿语义是否真的业务对等
  • 拉产品、业务一起确认“补偿后的业务可接受状态”
  • 画出正向/反向动作矩阵,不要只看代码

坑 2:补偿失败后没有兜底

很多实现只要补偿失败就打印一条日志,然后结束。
这在线上基本等于埋雷。

建议:

  • 补偿失败进入 COMPENSATION_FAILED
  • 定时任务扫描失败实例并重试
  • 设置最大重试次数
  • 超过阈值后进入人工处理池
  • 提供后台“重试补偿/跳过补偿/强制关闭”能力

坑 3:接口超时后直接判失败

超时不等于失败。
这是我见过最容易导致重复执行的一类问题。

例如:

  1. 协调器调用库存服务
  2. 库存已经成功扣减
  3. 但响应包超时了
  4. 协调器以为失败,开始补偿或重试
  5. 最终出现重复扣减或状态错乱

排查方法:

  • 下游接口是否提供“按业务键查状态”
  • 超时后是否先查状态,再决定重试/补偿
  • 日志中是否有统一的 trace_id / saga_id / biz_id

坑 4:消息驱动下重复消费

如果 Saga 某些步骤通过消息队列异步推进,重复投递是常态,不是异常。

建议做法:

  • 消费端幂等
  • 消息表或消费表记录处理结果
  • 使用业务主键去重,而不是只靠 MQ 自带 messageId

坑 5:把所有步骤都做成同步串行

这是架构设计上很容易“图省事”的地方。
但一旦链路长、服务多,同步串行会显著拉长 RT。

优化建议:

  • 非关键步骤异步化,比如积分发放、通知发送
  • 将强依赖步骤前置,弱依赖步骤事件化
  • 不要让用户请求一直阻塞到全链路结束

一套实用的排查路径

如果你线上碰到“订单失败但库存扣了”这类问题,我建议按下面顺序排。

第一步:先看 Saga 实例状态

关注字段:

  • saga_id
  • 当前步骤
  • saga 状态
  • 已执行步骤
  • 补偿状态
  • 重试次数

第二步:沿着业务键查各服务本地状态

至少查:

  • 订单状态
  • 库存扣减/回补日志
  • 支付冻结/解冻日志
  • 优惠券核销/返还日志

第三步:判断是三类哪一种

  1. 正向未完成
  2. 正向已完成但协调器未知
  3. 补偿执行不完整

这三类问题处理方式完全不同,别混在一起。

第四步:决定处理动作

常见动作有:

  • 重试当前步骤
  • 重试补偿步骤
  • 根据下游真实状态修正 Saga 状态
  • 人工补单/人工退款/人工回补库存

安全/性能最佳实践

Saga 常被理解成“偏业务一致性”的设计,但它同样涉及安全与性能问题。

安全最佳实践

1)补偿接口不能裸奔

补偿接口通常影响业务资金、库存、优惠券,非常敏感。

建议至少做到:

  • 内部鉴权,如 mTLS、签名或服务间 token
  • 限制调用来源,只允许协调器或指定服务调用
  • 审计日志记录调用人、调用链路、请求参数摘要

2)避免参数篡改

正向和补偿动作都应基于服务端可信上下文,不要完全依赖客户端传参。

例如:

  • 客户端传了 order_id
  • 服务端应该自行查订单金额、库存数量
  • 不要让外部直接传“恢复 100 件库存”这种危险参数

3)人工干预要有审批和留痕

线上总会有人要“强制恢复”“手工退款”。
这个能力一定要有,但必须:

  • 分角色授权
  • 记录操作前后状态
  • 保留操作原因
  • 可追溯到工单/事故单

性能最佳实践

1)控制 Saga 粒度

Saga 步骤不是越细越好。
步骤太多,状态管理和补偿复杂度会急剧上升。

经验上建议:

  • 一个 Saga 聚焦一个业务目标
  • 3~6 个核心步骤比较常见
  • 能在服务内本地事务完成的,不要拆成跨服务

2)避免长时间占用业务资源

比如库存不是“无限期锁定”,支付冻结也不能一直挂着。
应为每一步设置超时和自动释放机制。

3)异步化非核心链路

像积分、通知、埋点,往往没必要放在主 Saga 里强耦合执行。
能事件驱动就事件驱动,主流程只保留真正影响成交的步骤。

4)重试要指数退避

补偿失败时,不要疯狂瞬时重试。
否则下游雪崩时,你的协调器会变成放大器。

伪代码如下:

import time

def retry_with_backoff(func, max_retry=5, base_delay=0.5):
    for i in range(max_retry):
        try:
            return func()
        except Exception:
            delay = base_delay * (2 ** i)
            time.sleep(delay)
    raise RuntimeError("重试耗尽")

容量估算与架构建议

这部分是很多架构设计文档容易漏掉的,但实际上非常关键。

1)协调器吞吐估算

假设:

  • 每秒下单 500
  • 每个 Saga 平均 4 个步骤
  • 每个步骤平均 1 次调用 + 0.1 次补偿/重试

那么协调器大概需要处理:

  • 正向调用:500 * 4 = 2000 ops/s
  • 额外重试/补偿:500 * 4 * 0.1 = 200 ops/s

总计约 2200 ops/s 的调度和状态落库压力。

这意味着你需要重点关注:

  • Saga 状态表写入能力
  • 定时补偿扫描的效率
  • 日志量与 trace 采样率
  • 是否需要分片或按业务类型拆 Saga 协调器

2)协调器高可用建议

  • 协调器实例无状态化
  • 状态统一落库
  • 通过分布式锁或数据库乐观锁避免重复调度
  • 定时任务扫描待补偿任务时按分片处理

3)日志与可观测性

至少要有三层标识:

  • trace_id:链路级
  • saga_id:事务级
  • biz_id/order_id:业务级

如果缺这三层,线上排查时会非常痛苦。


一份生产可落地的设计清单

如果你准备在项目里引入 Saga,我建议上线前对照这份清单:

  • 是否明确哪些业务允许最终一致
  • 每个步骤是否都定义了补偿动作
  • 补偿动作是否真的是业务等价逆操作
  • 正向/补偿接口是否幂等
  • 是否有 Saga 状态持久化
  • 是否支持超时后查状态
  • 是否处理了空补偿与悬挂
  • 是否有失败重试与人工干预通道
  • 是否具备 trace_id / saga_id / biz_id 全链路日志
  • 是否明确了哪些步骤同步,哪些步骤异步

总结

Saga 不是“完美事务”,它本质上是一个工程化折中方案:

  • 最终一致换取高可用和高吞吐
  • 业务补偿代替数据库回滚
  • 状态机和幂等设计抵抗网络不确定性

如果你问我,Saga 落地最重要的三件事是什么,我会给出非常具体的答案:

  1. 先和业务确认补偿边界,别只从技术角度拍板
  2. 把幂等、状态持久化、失败重试当成必选项,不是优化项
  3. 让排查链路可观测,否则出事后只能靠猜

最后给一个实操建议:

  • 如果你们团队是第一次做分布式事务,优先选编排式 Saga
  • 先在订单、库存、优惠券这类可补偿业务试点
  • 不要一上来就追求“通用事务框架”,先把一个核心链路跑稳

真正的难点从来不在“知道 Saga 是什么”,而在于你是否把那些看起来琐碎的细节——幂等、补偿、状态、重试、观测——认真做到位。做到位了,Saga 就是微服务架构里非常可靠的一把刀。


分享到:

上一篇
《Node.js 中基于 Worker Threads 与消息队列的高并发任务处理实战》
下一篇
《集群架构中服务发现与负载均衡的实战设计:从注册中心到故障切换策略》