跳转到内容
123xiao | 无名键客

《微服务架构中的分布式事务实战:基于 Saga 模式的设计、落地与避坑》

字数: 0 阅读时长: 1 分钟

微服务架构中的分布式事务实战:基于 Saga 模式的设计、落地与避坑

在单体应用里,我们习惯了一个本地事务包打天下:下单、扣库存、扣余额、生成物流单,要么一起成功,要么一起失败。

但到了微服务架构里,事情就没这么简单了。

订单服务、库存服务、支付服务、积分服务各自有独立数据库。你会很快发现:ACID 风格的跨服务强一致事务,不仅贵,而且经常不现实。这时候,Saga 模式就成了很多团队“既能跑起来,又不至于把系统拖死”的选择。

这篇文章我不打算只讲概念,而是从架构设计、实战代码、补偿模型、监控排查几个角度,带你把 Saga 真正落到工程里。


背景与问题

先看一个最典型的业务链路:电商下单。

  1. 订单服务创建订单
  2. 库存服务冻结库存
  3. 账户服务扣减余额
  4. 订单服务确认完成

如果其中某一步失败,比如余额不足,那前面已经成功的步骤就需要撤销:

  • 已创建的订单要取消
  • 已冻结的库存要释放

这就是分布式事务最核心的问题:跨多个服务和数据库的操作,如何保证最终状态一致

为什么 2PC 在微服务里不够好用

很多人第一次接触分布式事务,会先想到 2PC(两阶段提交)。理论上它很严谨,但在微服务场景下常见问题很明显:

  • 协调器成为关键瓶颈
  • 参与者需要长时间持锁
  • 网络抖动会放大阻塞问题
  • 异构存储、不支持 XA 的中间件很多
  • 对高并发互联网业务不够友好

所以现实中,很多业务会转向最终一致性方案,而 Saga 是其中最实用的一类。


方案对比与取舍分析

在设计前,先明确 Saga 不是“万能解”。

方案一致性性能实现复杂度适用场景
2PC/XA强一致较低金融核心、强一致优先
TCC较强很高核心链路、可精细控制资源
Saga最终一致长事务、业务可补偿
本地消息表/事务消息最终一致事件驱动、解耦明显

什么时候 Saga 适合

Saga 适合以下场景:

  • 业务流程长,跨多个服务
  • 可以接受短时间不一致
  • 每一步都能设计“补偿动作”
  • 对吞吐量要求高,不希望全链路锁资源

什么时候不要硬上 Saga

如果业务天然不支持补偿,比如:

  • 已经对外发货且不可撤销
  • 已发送不可逆通知
  • 已触发外部结算且回滚成本极高

那就不能只靠 Saga,需要把流程拆分、增加人工兜底,或者使用更强的一致性设计。


核心原理

Saga 的核心思想其实很朴素:

把一个长事务拆成多个本地事务,每个本地事务成功后继续往下走;如果中途失败,就按相反顺序执行补偿操作。

它有两种常见实现方式:

  1. Choreography(事件编排)
    • 各服务通过事件自行驱动
    • 耦合看起来低,但链路容易分散
  2. Orchestration(中心协调)
    • 由一个 Saga 协调器统一下发步骤
    • 更容易观察、排查、控制超时和补偿

在中级团队和大多数中等规模业务中,我更推荐先用 Orchestration。原因很现实:链路可见性更强,出了问题比较好查


一张图看懂 Saga 的执行过程

flowchart LR
    A[创建订单] --> B[冻结库存]
    B --> C[扣减余额]
    C --> D[确认订单]
    C -.失败.-> E[释放库存]
    E --> F[取消订单]

上图对应的是最基础的补偿链路。注意补偿顺序通常与正向执行相反。


Saga 状态流转设计

很多线上问题,不是因为“不会写代码”,而是因为“没有设计状态机”。Saga 一定要有明确状态。

stateDiagram-v2
    [*] --> Pending
    Pending --> Running
    Running --> Success
    Running --> Compensating: 任一步骤失败
    Compensating --> Compensated
    Compensating --> Failed: 补偿失败
    Failed --> Compensating: 重试

建议至少定义这些状态:

  • PENDING:待执行
  • RUNNING:执行中
  • SUCCESS:全部成功
  • COMPENSATING:补偿中
  • COMPENSATED:补偿完成
  • FAILED:失败待人工/自动重试

编排式 Saga 的设计骨架

下面用一个“订单 + 库存 + 支付”的案例来讲。

参与角色

  • OrderService
    • 创建订单
    • 取消订单
    • 确认订单
  • InventoryService
    • 冻结库存
    • 释放库存
  • PaymentService
    • 扣款
    • 退款

顺序图

sequenceDiagram
    participant Client
    participant Saga as Saga Orchestrator
    participant Order as OrderService
    participant Inv as InventoryService
    participant Pay as PaymentService

    Client->>Saga: 创建订单请求
    Saga->>Order: createOrder()
    Order-->>Saga: orderId
    Saga->>Inv: reserveStock(orderId)
    Inv-->>Saga: success
    Saga->>Pay: charge(orderId)
    alt 扣款成功
        Pay-->>Saga: success
        Saga->>Order: confirmOrder(orderId)
        Order-->>Saga: success
        Saga-->>Client: success
    else 扣款失败
        Pay-->>Saga: fail
        Saga->>Inv: releaseStock(orderId)
        Saga->>Order: cancelOrder(orderId)
        Saga-->>Client: failed
    end

设计落地时必须先想清楚的 5 件事

1. 每一步都必须可重试

分布式环境里,超时不代表失败,成功也可能因为响应丢失而“看起来像失败”。

所以任何步骤都要满足:

  • 重复执行不会造成副作用放大
  • 补偿也可以重复执行
  • 每个请求带唯一业务 ID

这就是幂等性

2. 补偿不等于回滚

很多同学会把补偿理解成数据库 rollback。其实不是。

例如扣款失败后释放库存,这不是把数据库时光倒流,而是执行了一个新的业务动作来抵消影响。

所以补偿本质上是:

  • 新事务
  • 有业务语义
  • 也可能失败
  • 也需要重试和监控

3. 空补偿要处理

所谓空补偿,就是补偿请求先到了,但正向操作其实还没真正成功。

比如:

  • 协调器认为库存冻结成功,准备后续执行
  • 但网络抖动导致库存服务实际没落库
  • 随后协调器触发释放库存

如果库存服务不处理空补偿,可能会报错,甚至把别的库存释放掉。

4. 悬挂问题要防

悬挂指的是:

  • 补偿已经执行
  • 但迟到的正向请求又来了

这会造成“本该回滚的事务又被执行一次”。

常见解法是引入事务日志表/幂等表,记录每个步骤的执行与补偿状态,拒绝非法状态迁移。

5. Saga 日志是生命线

我踩过最大的坑之一,就是一开始只看业务日志,没单独保存 Saga 实例状态。结果线上一出问题,根本不知道现在卡在哪一步。

最低要求:

  • Saga 实例表
  • Saga 步骤表
  • 每步请求/响应摘要
  • 重试次数
  • 最后错误信息
  • traceId / sagaId / bizId

实战代码(可运行)

下面用 Python 做一个简化但可运行的 Saga 编排示例。它不依赖外部服务,直接模拟订单、库存、支付三个服务,重点展示:

  • 本地事务
  • 补偿逻辑
  • 幂等思路
  • 失败回滚

你可以直接保存为 saga_demo.py 运行。

from dataclasses import dataclass, field
from typing import List, Dict, Callable, Optional
import uuid


class SagaStepError(Exception):
    pass


@dataclass
class StepRecord:
    name: str
    executed: bool = False
    compensated: bool = False
    result: Optional[dict] = None


@dataclass
class SagaContext:
    saga_id: str
    biz_id: str
    data: Dict = field(default_factory=dict)
    steps: List[StepRecord] = field(default_factory=list)


class InMemoryDB:
    def __init__(self):
        self.orders = {}
        self.stock = {"item-1": 10}
        self.reserved_stock = {}
        self.balances = {"user-1": 100}
        self.payments = {}
        self.idempotency = set()


db = InMemoryDB()


class OrderService:
    def create_order(self, biz_id: str, user_id: str, item_id: str, amount: int):
        key = f"create_order:{biz_id}"
        if key in db.idempotency:
            return db.orders[biz_id]
        db.idempotency.add(key)
        db.orders[biz_id] = {
            "status": "PENDING",
            "user_id": user_id,
            "item_id": item_id,
            "amount": amount
        }
        return db.orders[biz_id]

    def confirm_order(self, biz_id: str):
        key = f"confirm_order:{biz_id}"
        if key in db.idempotency:
            return db.orders[biz_id]
        db.idempotency.add(key)
        if biz_id not in db.orders:
            raise SagaStepError("order not found")
        db.orders[biz_id]["status"] = "CONFIRMED"
        return db.orders[biz_id]

    def cancel_order(self, biz_id: str):
        key = f"cancel_order:{biz_id}"
        if key in db.idempotency:
            return db.orders.get(biz_id)
        db.idempotency.add(key)
        if biz_id in db.orders:
            db.orders[biz_id]["status"] = "CANCELLED"
        return db.orders.get(biz_id)


class InventoryService:
    def reserve_stock(self, biz_id: str, item_id: str, count: int):
        key = f"reserve_stock:{biz_id}"
        if key in db.idempotency:
            return {"reserved": True}
        if db.stock.get(item_id, 0) < count:
            raise SagaStepError("insufficient stock")
        db.idempotency.add(key)
        db.stock[item_id] -= count
        db.reserved_stock[biz_id] = {"item_id": item_id, "count": count}
        return {"reserved": True}

    def release_stock(self, biz_id: str):
        key = f"release_stock:{biz_id}"
        if key in db.idempotency:
            return {"released": True}
        db.idempotency.add(key)
        reserved = db.reserved_stock.get(biz_id)
        if reserved:
            db.stock[reserved["item_id"]] += reserved["count"]
            del db.reserved_stock[biz_id]
        return {"released": True}


class PaymentService:
    def charge(self, biz_id: str, user_id: str, amount: int, fail=False):
        key = f"charge:{biz_id}"
        if key in db.idempotency:
            return db.payments.get(biz_id, {"charged": True})
        if fail:
            raise SagaStepError("payment declined")
        if db.balances.get(user_id, 0) < amount:
            raise SagaStepError("insufficient balance")
        db.idempotency.add(key)
        db.balances[user_id] -= amount
        db.payments[biz_id] = {"user_id": user_id, "amount": amount, "status": "CHARGED"}
        return db.payments[biz_id]

    def refund(self, biz_id: str):
        key = f"refund:{biz_id}"
        if key in db.idempotency:
            return {"refunded": True}
        db.idempotency.add(key)
        payment = db.payments.get(biz_id)
        if payment and payment["status"] == "CHARGED":
            db.balances[payment["user_id"]] += payment["amount"]
            payment["status"] = "REFUNDED"
        return {"refunded": True}


@dataclass
class SagaStep:
    name: str
    action: Callable[[SagaContext], dict]
    compensation: Callable[[SagaContext], dict]


class SagaOrchestrator:
    def __init__(self, steps: List[SagaStep]):
        self.steps = steps

    def execute(self, ctx: SagaContext):
        executed_steps = []

        try:
            for step in self.steps:
                result = step.action(ctx)
                ctx.steps.append(StepRecord(name=step.name, executed=True, result=result))
                executed_steps.append(step)
            return {"status": "SUCCESS", "saga_id": ctx.saga_id}
        except Exception as e:
            for step in reversed(executed_steps):
                try:
                    step.compensation(ctx)
                    for record in ctx.steps:
                        if record.name == step.name:
                            record.compensated = True
                except Exception as ce:
                    return {
                        "status": "FAILED",
                        "saga_id": ctx.saga_id,
                        "error": f"compensation failed: {ce}"
                    }
            return {
                "status": "COMPENSATED",
                "saga_id": ctx.saga_id,
                "error": str(e)
            }


def run_demo(payment_fail=False):
    order_service = OrderService()
    inventory_service = InventoryService()
    payment_service = PaymentService()

    biz_id = "order-1001"
    ctx = SagaContext(
        saga_id=str(uuid.uuid4()),
        biz_id=biz_id,
        data={
            "user_id": "user-1",
            "item_id": "item-1",
            "count": 2,
            "amount": 30,
            "payment_fail": payment_fail
        }
    )

    steps = [
        SagaStep(
            name="create_order",
            action=lambda c: order_service.create_order(
                c.biz_id, c.data["user_id"], c.data["item_id"], c.data["amount"]
            ),
            compensation=lambda c: order_service.cancel_order(c.biz_id),
        ),
        SagaStep(
            name="reserve_stock",
            action=lambda c: inventory_service.reserve_stock(
                c.biz_id, c.data["item_id"], c.data["count"]
            ),
            compensation=lambda c: inventory_service.release_stock(c.biz_id),
        ),
        SagaStep(
            name="charge",
            action=lambda c: payment_service.charge(
                c.biz_id, c.data["user_id"], c.data["amount"], c.data["payment_fail"]
            ),
            compensation=lambda c: payment_service.refund(c.biz_id),
        ),
        SagaStep(
            name="confirm_order",
            action=lambda c: order_service.confirm_order(c.biz_id),
            compensation=lambda c: order_service.cancel_order(c.biz_id),
        ),
    ]

    orchestrator = SagaOrchestrator(steps)
    result = orchestrator.execute(ctx)

    print("=== saga result ===")
    print(result)
    print("=== orders ===")
    print(db.orders)
    print("=== stock ===")
    print(db.stock)
    print("=== reserved_stock ===")
    print(db.reserved_stock)
    print("=== balances ===")
    print(db.balances)
    print("=== payments ===")
    print(db.payments)
    print("=== step records ===")
    for step in ctx.steps:
        print(step)


if __name__ == "__main__":
    print("\n--- case 1: success ---")
    run_demo(payment_fail=False)

    print("\n--- case 2: payment failed, compensation triggered ---")
    db.__init__()
    run_demo(payment_fail=True)

运行效果说明

成功场景

  • 订单状态变为 CONFIRMED
  • 库存扣减成功
  • 余额扣减成功

失败场景

payment_fail=True 时:

  • 订单先创建
  • 库存先冻结
  • 支付失败
  • 触发补偿:释放库存、取消订单

这个示例是内存版,真实项目中你需要把这些状态放到数据库或持久化日志里。


工程化落地:表设计建议

如果你打算在线上使用,至少需要两张表。

1. Saga 实例表

CREATE TABLE saga_instance (
  saga_id VARCHAR(64) PRIMARY KEY,
  biz_id VARCHAR(64) NOT NULL,
  state VARCHAR(32) NOT NULL,
  current_step VARCHAR(64),
  retry_count INT DEFAULT 0,
  last_error TEXT,
  created_at TIMESTAMP NOT NULL,
  updated_at TIMESTAMP NOT NULL
);

2. Saga 步骤表

CREATE TABLE saga_step (
  id BIGINT PRIMARY KEY AUTO_INCREMENT,
  saga_id VARCHAR(64) NOT NULL,
  step_name VARCHAR(64) NOT NULL,
  action_status VARCHAR(32) NOT NULL,
  compensation_status VARCHAR(32) NOT NULL,
  request_payload TEXT,
  response_payload TEXT,
  retry_count INT DEFAULT 0,
  updated_at TIMESTAMP NOT NULL,
  UNIQUE KEY uk_saga_step (saga_id, step_name)
);

状态建议

action_status 可取:

  • INIT
  • SUCCESS
  • FAILED

compensation_status 可取:

  • NONE
  • DONE
  • FAILED

常见坑与排查

这一部分很重要。很多团队 Saga 写出来了,但线上总有“偶现问题”,本质上都出在分布式场景的细节上。

坑 1:补偿接口不幂等

表现:

  • 同一个订单被重复退款
  • 库存重复释放,数量变多
  • 订单状态来回跳

排查思路:

  1. 检查是否使用了全局 bizId / sagaId
  2. 检查补偿动作是否有唯一约束
  3. 看服务超时后协调器是否重试了多次

解决建议:

  • 每个动作和补偿都落幂等记录
  • 使用业务唯一键防重
  • 明确“已处理”直接返回成功

坑 2:只补偿了业务数据,没补偿副作用

比如:

  • 数据库已经回滚
  • 但消息已经发出
  • 短信已经发送
  • 优惠券已核销

这种场景我见得不少,最后业务同学会说:“库里看着没问题,但用户就是收到了通知。”

解决建议:

  • 把外部副作用尽量后置
  • 通过 Outbox 模式保证消息一致性
  • 对不可撤销副作用设计人工兜底流程

坑 3:超时被当成失败,实际却已成功

表现:

  • 协调器超时后触发补偿
  • 下游服务实际已经成功落库
  • 最后出现“正向成功 + 补偿成功”的双向污染

解决建议:

  • 请求必须带唯一事务号
  • 服务端持久化请求处理结果
  • 查询步骤最终态,而不是只依赖同步返回

坑 4:补偿链路本身失败

很多人把注意力都放在正向链路,补偿逻辑写得很草率。实际上线上更容易出问题的是补偿。

建议:

  • 补偿也必须可重试
  • 补偿失败进入专门队列
  • 区分“暂时失败”和“永久失败”
  • 准备人工干预工具

坑 5:链路观测不足

如果没有统一 Trace,你会非常痛苦。

建议至少记录:

  • traceId
  • sagaId
  • bizId
  • stepName
  • action/compensation
  • 请求耗时
  • 重试次数
  • 失败原因

一套可执行的排查路径

当线上出现“订单失败但库存没回滚”时,可以按这个顺序查:

flowchart TD
    A[按 bizId 查 saga_instance] --> B{状态是否为 COMPENSATING/FAILED}
    B -- 是 --> C[查 saga_step 看卡在哪一步]
    B -- 否 --> D[查下游服务日志与 traceId]
    C --> E[检查补偿接口是否幂等]
    E --> F[检查消息重试/死信队列]
    D --> G[确认是超时假失败还是真实失败]
    F --> H[决定自动重试或人工修复]
    G --> H

这个流程的重点是:先看 Saga 主状态,再看步骤,再看服务日志。不要一上来就翻全量日志,很容易被噪音带偏。


安全/性能最佳实践

Saga 通常被讨论一致性,但实际上安全和性能也很关键。

安全最佳实践

1. 补偿接口不要裸奔

补偿接口往往比正向接口更危险,因为它直接改变状态。

建议:

  • 只允许内网调用
  • 使用服务间认证
  • 做签名或 mTLS
  • 校验 sagaIdbizId、步骤状态是否合法

2. 防止重放攻击

如果请求被重复发送,可能触发多次补偿或多次扣款。

建议:

  • 每个请求携带唯一幂等键
  • 幂等记录设置过期策略但不能过短
  • 对关键接口加时间戳与签名校验

3. 敏感信息脱敏

Saga 日志中经常会记录请求体、错误信息、回包摘要。

不要直接打出:

  • 用户手机号
  • 身份证号
  • 银行卡号
  • 完整支付信息

4. 人工修复工具要有权限边界

很多团队最后都会做“事务补偿控制台”。这个工具很有用,但权限一定要收紧:

  • 谁能重试
  • 谁能跳过
  • 谁能强制完成
  • 所有操作必须审计

性能最佳实践

1. 缩短本地事务时间

Saga 的每一步最好都是短事务:

  • 不做大查询
  • 不做复杂联表
  • 不在事务里远程调用

本地事务越短,吞吐越稳。

2. 控制补偿风暴

当某个依赖服务异常时,大量 Saga 会同时失败并进入补偿,可能把系统打崩。

建议:

  • 补偿任务限流
  • 分批重试
  • 加指数退避
  • 配置熔断与降级

3. 区分同步体验与异步一致性

用户请求链路不一定要同步等待 Saga 全部完成。

例如下单后可以先返回:

  • “订单处理中”
  • 后续通过异步通知变更状态

这样能显著降低接口 RT。

4. 容量估算别忽略补偿流量

很多容量评估只算正向调用,其实 Saga 至少要估算:

  • 正向步骤调用量
  • 失败率
  • 补偿调用量
  • 重试放大量

一个简单估算公式:

总调用量 ≈ 正向调用量 × 步骤数 + 失败请求量 × 平均补偿步数 × 重试系数

举个例子:

  • 每秒 1000 笔订单
  • 4 个步骤
  • 失败率 2%
  • 平均补偿 2 步
  • 重试系数 1.5

则总调用量约为:

  • 正向:1000 × 4 = 4000 次/秒
  • 补偿:1000 × 2% × 2 × 1.5 = 60 次/秒
  • 总计约 4060 次/秒

正常时看起来补偿不多,但一旦依赖服务故障,补偿流量会飙升,所以一定要预留弹性。


落地建议:从“小闭环”开始,不要一上来全链路 Saga

如果你准备在现有系统里引入 Saga,我建议这样分阶段推进:

第一阶段:先选 1 条可补偿链路

比如:

  • 下单 -> 冻结库存 -> 扣余额

不要一开始就把营销、积分、优惠券、物流、发票全串上。参与方越多,补偿复杂度越高。

第二阶段:先补齐幂等和状态日志

比起“写一个很厉害的协调器”,更重要的是:

  • 步骤状态表
  • 请求唯一 ID
  • 补偿幂等
  • 失败重试

这些是基础设施。

第三阶段:加监控和人工干预

没有控制台、没有重试、没有告警的 Saga,线上会非常被动。

至少需要:

  • 卡单数监控
  • 补偿失败告警
  • 超时事务扫描
  • 手动重试能力

总结

Saga 模式不是“最强一致”的分布式事务方案,但它往往是微服务里最务实、最容易大规模落地的一种。

你可以把它记成三句话:

  1. 把长事务拆成多个本地事务
  2. 失败时按反向顺序执行补偿
  3. 真正的难点不在流程图,而在幂等、状态机、重试和观测

如果你正准备在项目里落地,我的可执行建议是:

  • 优先选用编排式 Saga
  • 每一步都设计幂等 + 可重试 + 可补偿
  • 落地 Saga 实例表 + 步骤表
  • 对“空补偿、悬挂、超时假失败”提前建模
  • 补偿链路和正向链路一样严格对待
  • 给运维和研发准备最基本的人工修复工具

最后再强调一个边界条件:不是所有业务都适合 Saga。如果某一步不可逆、不可撤销,就不要假装它能补偿。架构设计最怕“概念上能行,业务上不成立”。

把这个边界想清楚,Saga 才会是帮你兜底的方案,而不是新的事故源。


分享到:

上一篇
《Web逆向实战:中级工程师如何定位并复现前端签名参数生成逻辑》
下一篇
《分布式架构中的分库分表实战:一致性、扩容与查询性能优化指南》