跳转到内容
123xiao | 无名键客

《微服务架构中的分布式事务实战:基于 Saga 模式的设计、落地与排障》

字数: 0 阅读时长: 1 分钟

背景与问题

只要系统一拆成微服务,分布式事务几乎迟早会找上门。

一个很典型的业务链路是这样的:用户下单后,订单服务创建订单,库存服务扣减库存,账户服务扣减余额,最后通知服务发消息。放在单体应用里,这事一个本地事务就能包住;但到了微服务里,服务之间跨网络、跨数据库,@Transactional 立刻失效,传统 2PC/XA 又常常因为性能、耦合度、数据库支持和运维复杂度,在互联网业务里很难真正落地。

于是很多团队会走到 Saga。

Saga 不是“强一致事务”,而是把一个长事务拆成一组本地事务 + 补偿动作。每一步成功就继续,某一步失败就按相反顺序执行补偿,把系统拉回到一个业务上可接受的状态。

但真正在生产里用 Saga,坑并不少:

  • 订单创建成功了,库存也扣了,但余额服务超时,怎么补?
  • 补偿执行到一半又失败怎么办?
  • 消息重复投递导致库存扣了两次怎么办?
  • 编排器重启后,Saga 状态丢了怎么办?
  • 查问题时,日志里只有一堆“超时”“重试中”,根本串不起整条链路

这篇文章我会从排障视角切入,不只讲“Saga 是什么”,更重点讲:

  1. 怎么设计一个可落地的 Saga;
  2. 怎么写出最小可运行代码;
  3. 出问题时按什么路径定位;
  4. 怎么做止血和长期治理。

核心原理

Saga 有两种常见实现方式:

  • Choreography(事件编排/协同):各服务通过事件自行驱动下一步
  • Orchestration(中心编排):由一个 Saga Orchestrator 显式控制流程

如果你的重点是可观测、可排障、可回放,我更建议优先用 Orchestration。原因很现实:出了故障后,中心编排器能清楚告诉你“卡在哪一步、补偿到哪一步、重试了几次”。

1. Saga 的基本结构

每个业务步骤包含两部分:

  • 正向操作:例如“扣库存”
  • 补偿操作:例如“加回库存”

注意,补偿不是数据库层面的回滚,而是一条新的业务指令。这也是很多人第一次接触 Saga 时最容易误解的地方。

flowchart LR
    A[创建订单] --> B[扣减库存]
    B --> C[扣减余额]
    C --> D[确认订单]

    C -- 失败 --> B2[补偿余额/若已扣]
    B2 --> A2[补偿库存]
    A2 --> A3[取消订单]

2. 状态机思维比“if-else 流程图”更重要

真正上线后,Saga 不是简单的线性流程,而是一个状态机:

  • PENDING
  • RUNNING
  • COMPENSATING
  • COMPLETED
  • FAILED
  • PARTIAL_COMPENSATED

当你排障时,最关键的问题不是“代码走到哪了”,而是:

  • 这个 Saga 当前是什么状态?
  • 当前步骤的最后一次执行结果是什么?
  • 是否已经触发过补偿?
  • 补偿是否幂等?
  • 是否允许人工重放?
stateDiagram-v2
    [*] --> PENDING
    PENDING --> RUNNING
    RUNNING --> COMPLETED: 全部成功
    RUNNING --> COMPENSATING: 某步骤失败
    COMPENSATING --> FAILED: 补偿完成
    COMPENSATING --> PARTIAL_COMPENSATED: 补偿部分失败
    PARTIAL_COMPENSATED --> COMPENSATING: 重试/人工介入

3. 设计 Saga 时的三个硬约束

幂等

每个正向动作和补偿动作,都必须支持幂等。
因为超时并不等于失败,消息重复也不是小概率事件。

例如扣库存接口最好不是:

  • deduct(productId, count)

而是:

  • deduct(sagaId, orderId, productId, count)

这样就能以 sagaId + step 为幂等键,避免重复扣减。

可恢复

编排器崩了、服务重启了、消息重复了,Saga 也应该能恢复。
所以 Saga 状态必须持久化,不能只放内存。

可观测

每一步都要有:

  • sagaId
  • stepName
  • status
  • retryCount
  • lastError
  • updatedAt

没有这些字段,排障时基本只能靠猜。


方案设计:一个最小但真实的下单 Saga

为了让示例容易运行,我们用 Python 做一个内存版 Saga 编排器,模拟三个服务:

  • OrderService
  • InventoryService
  • PaymentService

场景目标:

  1. 创建订单
  2. 扣库存
  3. 扣余额
  4. 全成功则订单确认
  5. 任一步失败则倒序补偿

流程时序图

sequenceDiagram
    participant Client
    participant Orchestrator
    participant OrderService
    participant InventoryService
    participant PaymentService

    Client->>Orchestrator: 创建订单请求
    Orchestrator->>OrderService: create_order()
    OrderService-->>Orchestrator: order_created
    Orchestrator->>InventoryService: deduct()
    InventoryService-->>Orchestrator: deducted
    Orchestrator->>PaymentService: charge()
    alt 支付成功
        PaymentService-->>Orchestrator: charged
        Orchestrator->>OrderService: confirm_order()
        OrderService-->>Orchestrator: confirmed
    else 支付失败
        PaymentService-->>Orchestrator: failed
        Orchestrator->>InventoryService: compensate_add()
        Orchestrator->>OrderService: cancel_order()
    end

实战代码(可运行)

下面这段代码可以直接运行,重点展示:

  • Saga 编排
  • 本地事务与补偿
  • 幂等处理
  • 故障注入
  • 排障日志
from dataclasses import dataclass, field
from typing import Dict, List, Callable, Optional
import uuid
import traceback


class SagaError(Exception):
    pass


@dataclass
class SagaStepLog:
    step: str
    action: str
    success: bool
    message: str


@dataclass
class SagaContext:
    saga_id: str
    order_id: str
    user_id: str
    product_id: str
    amount: int
    logs: List[SagaStepLog] = field(default_factory=list)
    status: str = "PENDING"


class OrderService:
    def __init__(self):
        self.orders: Dict[str, str] = {}
        self.idempotent_ops = set()

    def create_order(self, ctx: SagaContext):
        key = (ctx.saga_id, "create_order")
        if key in self.idempotent_ops:
            return
        self.orders[ctx.order_id] = "CREATED"
        self.idempotent_ops.add(key)

    def confirm_order(self, ctx: SagaContext):
        key = (ctx.saga_id, "confirm_order")
        if key in self.idempotent_ops:
            return
        if ctx.order_id not in self.orders:
            raise SagaError("order not found")
        self.orders[ctx.order_id] = "CONFIRMED"
        self.idempotent_ops.add(key)

    def cancel_order(self, ctx: SagaContext):
        key = (ctx.saga_id, "cancel_order")
        if key in self.idempotent_ops:
            return
        if ctx.order_id in self.orders:
            self.orders[ctx.order_id] = "CANCELLED"
        self.idempotent_ops.add(key)


class InventoryService:
    def __init__(self):
        self.stock = {"SKU-1": 10}
        self.idempotent_ops = set()

    def deduct(self, ctx: SagaContext):
        key = (ctx.saga_id, "deduct_inventory")
        if key in self.idempotent_ops:
            return
        if self.stock.get(ctx.product_id, 0) < 1:
            raise SagaError("insufficient stock")
        self.stock[ctx.product_id] -= 1
        self.idempotent_ops.add(key)

    def compensate_add(self, ctx: SagaContext):
        key = (ctx.saga_id, "compensate_inventory")
        if key in self.idempotent_ops:
            return
        self.stock[ctx.product_id] = self.stock.get(ctx.product_id, 0) + 1
        self.idempotent_ops.add(key)


class PaymentService:
    def __init__(self):
        self.balance = {"U-1": 100}
        self.idempotent_ops = set()
        self.fail_next = False

    def charge(self, ctx: SagaContext):
        key = (ctx.saga_id, "charge_payment")
        if key in self.idempotent_ops:
            return
        if self.fail_next:
            self.fail_next = False
            raise SagaError("payment gateway timeout")
        if self.balance.get(ctx.user_id, 0) < ctx.amount:
            raise SagaError("insufficient balance")
        self.balance[ctx.user_id] -= ctx.amount
        self.idempotent_ops.add(key)

    def refund(self, ctx: SagaContext):
        key = (ctx.saga_id, "refund_payment")
        if key in self.idempotent_ops:
            return
        self.balance[ctx.user_id] = self.balance.get(ctx.user_id, 0) + ctx.amount
        self.idempotent_ops.add(key)


class SagaOrchestrator:
    def __init__(self, order_service, inventory_service, payment_service):
        self.order_service = order_service
        self.inventory_service = inventory_service
        self.payment_service = payment_service
        self.store: Dict[str, SagaContext] = {}

    def _run_step(self, ctx: SagaContext, step_name: str, action: Callable):
        try:
            action(ctx)
            ctx.logs.append(SagaStepLog(step_name, "forward", True, "ok"))
        except Exception as e:
            ctx.logs.append(SagaStepLog(step_name, "forward", False, str(e)))
            raise

    def _run_compensation(self, ctx: SagaContext, step_name: str, action: Callable):
        try:
            action(ctx)
            ctx.logs.append(SagaStepLog(step_name, "compensate", True, "ok"))
        except Exception as e:
            ctx.logs.append(SagaStepLog(step_name, "compensate", False, str(e)))
            raise

    def create_order_saga(self, user_id: str, product_id: str, amount: int):
        ctx = SagaContext(
            saga_id=str(uuid.uuid4()),
            order_id=str(uuid.uuid4()),
            user_id=user_id,
            product_id=product_id,
            amount=amount,
            status="RUNNING"
        )
        self.store[ctx.saga_id] = ctx

        completed_steps: List[str] = []

        try:
            self._run_step(ctx, "create_order", self.order_service.create_order)
            completed_steps.append("create_order")

            self._run_step(ctx, "deduct_inventory", self.inventory_service.deduct)
            completed_steps.append("deduct_inventory")

            self._run_step(ctx, "charge_payment", self.payment_service.charge)
            completed_steps.append("charge_payment")

            self._run_step(ctx, "confirm_order", self.order_service.confirm_order)
            completed_steps.append("confirm_order")

            ctx.status = "COMPLETED"
            return ctx

        except Exception:
            ctx.status = "COMPENSATING"

            # 倒序补偿,仅对已完成步骤补偿
            try:
                if "charge_payment" in completed_steps:
                    self._run_compensation(ctx, "refund_payment", self.payment_service.refund)

                if "deduct_inventory" in completed_steps:
                    self._run_compensation(ctx, "compensate_inventory", self.inventory_service.compensate_add)

                if "create_order" in completed_steps:
                    self._run_compensation(ctx, "cancel_order", self.order_service.cancel_order)

                ctx.status = "FAILED"
            except Exception:
                ctx.status = "PARTIAL_COMPENSATED"
                ctx.logs.append(SagaStepLog("system", "diagnose", False, traceback.format_exc()))

            return ctx


def print_result(title: str, ctx: SagaContext, order_service, inventory_service, payment_service):
    print("=" * 60)
    print(title)
    print("saga_id:", ctx.saga_id)
    print("order_id:", ctx.order_id)
    print("status:", ctx.status)
    print("order_state:", order_service.orders.get(ctx.order_id))
    print("stock:", inventory_service.stock)
    print("balance:", payment_service.balance)
    print("logs:")
    for log in ctx.logs:
        print(f"  - step={log.step}, action={log.action}, success={log.success}, msg={log.message}")


if __name__ == "__main__":
    order_service = OrderService()
    inventory_service = InventoryService()
    payment_service = PaymentService()
    orchestrator = SagaOrchestrator(order_service, inventory_service, payment_service)

    # 场景1:成功
    ctx1 = orchestrator.create_order_saga(user_id="U-1", product_id="SKU-1", amount=20)
    print_result("场景1:Saga 成功", ctx1, order_service, inventory_service, payment_service)

    # 场景2:支付超时,触发补偿
    payment_service.fail_next = True
    ctx2 = orchestrator.create_order_saga(user_id="U-1", product_id="SKU-1", amount=20)
    print_result("场景2:支付失败,触发补偿", ctx2, order_service, inventory_service, payment_service)

运行后的观察点

你会看到两种典型结果:

  • 成功场景:订单 CONFIRMED,库存减 1,余额减 20
  • 失败场景:支付超时后,库存加回,订单取消

这个示例是内存版,方便理解原理。但它已经包含生产系统里最关键的几个骨架:

  • 有 Saga 上下文
  • 有状态
  • 有步骤日志
  • 有补偿
  • 有幂等键

现象复现:线上最容易遇到的 4 类问题

下面这些问题,我在项目里都见过,而且都不算“稀有事故”。

1. 现象:接口超时,但用户最终被扣了钱

为什么会这样

支付服务可能已经成功扣款,但返回结果在网络层超时了。
编排器以为失败,于是开始补偿。结果就会出现:

  • 先扣款成功
  • 编排器未收到成功响应
  • 启动退款
  • 退款又失败或延迟
  • 用户侧看到“状态混乱”

根因

把“响应超时”错误地等同于“业务失败”。

正确做法

支付类动作要设计成可查询最终状态

  • charge(requestId) 发起扣款
  • query(requestId) 查询扣款结果

也就是说,发生超时时不要立刻补偿,而是先进入“待确认”状态,再查一次。


2. 现象:库存被重复扣减

常见触发方式

  • 消息重投
  • 编排器重试
  • 消费者重启后重复消费
  • 客户端重复提交

根因

接口不是幂等的,只根据“调用次数”执行,没有业务唯一键。

正确做法

给每一步都加幂等键:

  • sagaId + stepName
  • businessId + actionType

并且把幂等记录持久化到数据库,而不是只放在内存 Set 里。


3. 现象:补偿执行了,但状态还是不一致

典型表现

  • 订单已取消,但库存没加回来
  • 余额退了,但订单状态仍是“处理中”
  • 一半补偿成功,一半补偿失败

根因

补偿也可能失败。很多系统只设计了正向流程,没把补偿当作“一等公民”。

正确做法

补偿动作需要:

  • 独立重试
  • 幂等
  • 告警
  • 人工介入入口

当补偿失败时,状态不应该简单写成 FAILED,而应该标为:

  • PARTIAL_COMPENSATED
  • COMPENSATION_PENDING

这样运维和开发才能识别“这不是结束,而是待处理状态”。


4. 现象:日志很多,但根本串不起来

根因

没有统一链路标识。
订单服务打一条日志、库存服务打一条日志、支付服务打一条日志,但没有共同的 traceId / sagaId

正确做法

所有日志都带上:

  • traceId
  • sagaId
  • orderId
  • stepName

我自己排这类问题时,第一步永远不是看错误堆栈,而是先搜 sagaId。搜不到,后面基本没法排。


常见坑与排查

这一节按“现象 -> 定位路径 -> 止血方案”的方式来讲。

坑 1:把 Saga 当成数据库回滚

误区

很多人会天然觉得补偿就是“撤销前面的操作”。
但现实业务里,撤销并不总是等价。

例如:

  • 发券之后可能已经被用户使用,不能简单删掉
  • 发短信之后不能“撤回”
  • 调用外部支付渠道后,退款和撤销是两种业务

定位路径

  1. 列出每个步骤的“正向动作”和“补偿动作”
  2. 判断补偿是否真正可逆
  3. 判断是否存在时间窗口导致不可逆

止血方案

如果业务动作天然不可逆,不要强行塞进 Saga。
改用:

  • TCC
  • 冻结余额/冻结库存
  • 人工对账修复
  • 异步最终一致 + 定期校正

坑 2:步骤顺序不合理,导致补偿成本过高

例子

先扣款,再扣库存。
一旦库存不足,就得退款;退款往往比库存恢复更贵、更慢、风控限制更多。

定位路径

把步骤按这三个维度排序:

  • 是否容易失败
  • 补偿成本高低
  • 是否涉及外部系统

推荐经验

一般优先:

  1. 本地、便宜、容易失败的步骤先做
  2. 外部、昂贵、不可逆的步骤后做

比如:

  • 创建订单
  • 预留库存
  • 预扣余额/冻结资金
  • 最后确认

而不是一开始就做真实扣款。


坑 3:编排器单点故障

现象

编排器重启后,部分 Saga 永远卡在 RUNNING

根因

Saga 状态只存在内存,没有持久化;或者持久化了,但没有恢复扫描机制。

定位路径

检查三件事:

  1. Saga 实例状态是否落库
  2. 步骤执行日志是否落库
  3. 是否有“超时扫描 + 恢复任务”

止血方案

最少要补上:

  • Saga 表
  • Step 表
  • 定时扫描任务
  • 超时后自动转 PENDING_RETRYWAIT_CONFIRM

一个常见表设计如下:

CREATE TABLE saga_instance (
    saga_id VARCHAR(64) PRIMARY KEY,
    business_key VARCHAR(128) NOT NULL,
    status VARCHAR(32) NOT NULL,
    current_step VARCHAR(64),
    retry_count INT DEFAULT 0,
    last_error TEXT,
    created_at TIMESTAMP NOT NULL,
    updated_at TIMESTAMP NOT NULL
);

CREATE TABLE saga_step (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    saga_id VARCHAR(64) NOT NULL,
    step_name VARCHAR(64) NOT NULL,
    action_type VARCHAR(16) NOT NULL,
    status VARCHAR(32) NOT NULL,
    idempotent_key VARCHAR(128) NOT NULL,
    retry_count INT DEFAULT 0,
    error_message TEXT,
    created_at TIMESTAMP NOT NULL,
    updated_at TIMESTAMP NOT NULL
);

坑 4:消息已发送,但本地事务没提交

这是分布式事务里非常经典的问题。

现象

订单服务发出“订单已创建”消息,但数据库里其实没有这个订单记录。

根因

本地事务和消息发送不是原子操作。

定位路径

检查是否使用了:

  • 本地消息表
  • 事务消息
  • Outbox Pattern

止血方案

优先使用 Outbox Pattern

  1. 订单写库
  2. 同事务写一条 outbox 消息
  3. 后台任务或 CDC 把 outbox 发到 MQ
  4. 消费端按幂等处理

这样即使服务崩溃,也能从 outbox 恢复发送。

flowchart TD
    A[业务事务提交] --> B[写订单表]
    A --> C[写 Outbox 表]
    C --> D[消息投递任务]
    D --> E[MQ]
    E --> F[下游服务幂等消费]

安全/性能最佳实践

Saga 不是只解决一致性,还会影响吞吐、风控和数据安全。

1. 不要在补偿接口暴露“任意退款/任意回滚”能力

补偿接口本质上是高危接口。
如果设计成:

  • /refund?orderId=xxx&amount=1000

而没有校验 saga 状态、调用来源、签名和权限,那就是埋雷。

建议

  • 补偿接口只允许内部调用
  • 校验 sagaId、步骤状态、调用方身份
  • 记录完整审计日志
  • 敏感操作要求防重放签名

2. 对重试设置上限与退避策略

无脑重试会把局部故障放大成系统雪崩。

建议

  • 指数退避:1s、2s、4s、8s
  • 设置最大重试次数
  • 区分可重试错误和不可重试错误

例如:

  • 网络超时:可重试
  • 参数非法:不可重试
  • 库存不足:通常不可重试
  • 下游限流:可退避重试

3. Saga 步骤尽量短小,避免长时间锁定业务资源

一个 Saga 如果持续几分钟甚至几小时,就不要再按“实时事务”思路设计了。

建议

  • 把真正需要事务边界的步骤收缩到最小
  • 其他动作异步化
  • 使用“冻结/预留”代替“直接扣减”

例如电商下单:

  • 预留库存
  • 冻结余额
  • 支付成功后再确认扣减

这通常比“失败后再补偿”更稳。


4. 做好可观测性建设

这是排障成功率最高的投资之一。

最低配置

  • 统一 traceId / sagaId
  • 每一步状态变更日志
  • Saga 成功率、补偿率、超时率指标
  • 卡单监控
  • 补偿失败告警

建议监控指标

  • saga_started_total
  • saga_completed_total
  • saga_failed_total
  • saga_compensation_total
  • saga_compensation_failed_total
  • saga_step_duration_ms
  • saga_running_timeout_total

一套实用的排障清单

如果线上有人说“订单状态不对”,我建议按下面这个顺序查:

第一步:找业务主键

先拿到:

  • orderId
  • sagaId
  • traceId

至少有一个,否则日志查找效率很低。

第二步:看 Saga 实例状态

重点看:

  • 当前状态是什么
  • 最后执行到哪个步骤
  • 最后错误是什么
  • 是否进入过补偿

第三步:核对每个步骤的事实状态

不要只看编排器记录,还要去下游服务核对“真实世界”:

  • 订单是否存在
  • 库存是否真的扣了
  • 余额是否真的扣了
  • 消息是否真的发出
  • 补偿是否真的落地

第四步:判断是“未知”还是“失败”

这一步非常关键:

  • 失败:明确没执行成功
  • 未知:超时、响应丢失、状态未回传

未知状态不要贸然补偿,先查最终结果。

第五步:止血优先,修复其次

线上先解决用户影响,再谈长期治理。

常见止血动作:

  • 暂停新单流量
  • 对失败步骤熔断
  • 关闭有问题的自动补偿
  • 启用人工审核队列
  • 批量重放待恢复的 Saga

边界条件:什么时候不适合 Saga

Saga 很好用,但不是银弹。

以下场景要谨慎:

1. 强一致金融核心记账

如果业务要求严格原子性,且任何短暂不一致都不可接受,Saga 通常不合适。
更适合:

  • 单库强事务
  • 专用账务系统
  • TCC
  • 双账本对账

2. 补偿动作不可定义

如果一个动作做了就无法撤销,也不能用业务补偿近似恢复,Saga 会非常难受。

3. 事务链路过长、参与方过多

参与服务太多时,补偿链会指数级复杂,排障成本也直线上升。
这时应先考虑业务拆分,缩小事务边界。


总结

Saga 解决的不是“像单机事务一样绝对回滚”,而是:

在微服务环境里,用本地事务、补偿、幂等和可观测性,把长业务流程控制在一个可恢复、可排障、可接受的范围内。

如果你准备在生产里落地 Saga,我建议至少做到这 6 件事:

  1. 优先用编排式 Saga,方便排障和回放
  2. 每一步都做幂等,包括补偿
  3. 区分失败与未知,超时先查最终状态
  4. 持久化 Saga 状态和步骤日志,别只放内存
  5. 引入 Outbox 或事务消息,解决本地事务与消息一致性
  6. 把监控和人工介入入口一起设计,别等出事再补

最后说一句比较实际的话:
Saga 真正难的从来不是“写出一个流程”,而是“出故障时你能不能把它救回来”。如果你的设计让故障只能靠人工猜,那它就还没准备好上生产。


分享到:

上一篇
《Java 中基于 CompletableFuture 的异步编排实战:并行调用、超时控制与异常兜底-363》
下一篇
《Java开发踩坑实战:排查并修复线程池误用导致的内存暴涨与请求超时问题》