分布式架构中基于 Saga 模式的跨服务事务设计与落地实践
在单体应用里,事务这件事往往是“理所当然”的:一个数据库连接、一段 begin/commit/rollback,事情就结束了。可一旦进入微服务或分布式架构,订单、库存、支付、优惠券、积分分别由不同服务和不同数据库维护,传统本地事务就不够用了。
很多团队在这个阶段会先问一个问题:跨服务事务到底要不要追求强一致?
我自己的经验是,业务系统里绝大多数“交易链路”其实并不需要所有服务在同一时刻绝对一致,而是需要:
- 最终能收敛到正确状态
- 中间过程可观测、可重试、可补偿
- 出问题时能定位、能止血、能人工兜底
这也是 Saga 模式特别适合的地方。
背景与问题
为什么本地事务不够了
假设一个下单流程包含以下步骤:
- 订单服务创建订单
- 库存服务扣减库存
- 支付服务冻结或扣款
- 营销服务核销优惠券
- 积分服务发放积分
如果这几个动作分散在多个服务、多个数据库中,就会面临几个典型问题:
- 数据库事务无法跨服务传播
- 网络调用可能超时、重复、乱序
- 服务成功与响应成功不是一回事
- 回滚往往不是技术回滚,而是业务补偿
举个很真实的例子:
- 订单服务已经创建成功
- 库存已扣减
- 支付服务调用超时,但实际上支付已成功
- 调用方误以为失败,开始回滚库存和订单
这时候如果没有统一的事务编排和幂等机制,系统就会进入非常难缠的“半成功状态”。
传统方案为什么不总适合
跨服务事务常见有几类思路:
| 方案 | 一致性 | 实现复杂度 | 性能影响 | 适用场景 |
|---|---|---|---|---|
| 2PC/XA | 强一致 | 高 | 大 | 传统集中式系统、资源支持 XA |
| TCC | 较强 | 很高 | 中 | 核心资金链路、可显式预留资源 |
| Saga | 最终一致 | 中 | 较小 | 订单、履约、营销等长流程业务 |
| 本地消息表/事件驱动 | 最终一致 | 中 | 小 | 解耦型异步业务 |
Saga 的核心优势在于:它承认分布式环境中的失败是常态,因此不强行做“跨库原子提交”,而是通过前向执行 + 失败补偿来完成业务闭环。
核心原理
Saga 可以粗略理解为:
把一个长事务拆成多个本地事务,每一步成功后进入下一步;如果中途失败,则按相反顺序执行补偿操作。
两种常见形态
1. 编排式 Saga(Orchestration)
由一个统一的 Saga 协调器负责驱动流程:
- 调订单服务
- 调库存服务
- 调支付服务
- 失败时统一触发补偿
优点:
- 流程集中,可观测性强
- 便于统一重试、超时控制、告警
缺点:
- 协调器容易变成“流程大脑”
- 对协调器可用性要求高
2. 协同式 Saga(Choreography)
每个服务通过事件驱动下一步:
- 订单创建事件 -> 库存扣减
- 库存扣减事件 -> 支付处理
- 支付失败事件 -> 触发订单取消和库存回补
优点:
- 服务解耦
- 更符合事件驱动架构
缺点:
- 流程散落在多个服务中
- 排查链路和全局状态更复杂
对于中级工程团队,如果业务流程比较明确,我通常会优先建议从编排式 Saga 起步,因为落地、排查、治理都更直接。
方案对比与取舍分析
Saga 与 TCC 的取舍
很多人会把 Saga 和 TCC 混在一起。它们都处理分布式事务,但适用点不同。
| 维度 | Saga | TCC |
|---|---|---|
| 事务模型 | 本地事务 + 补偿 | Try/Confirm/Cancel |
| 对业务侵入 | 中 | 高 |
| 资源预留能力 | 弱 | 强 |
| 一致性控制 | 最终一致 | 更强 |
| 落地成本 | 相对较低 | 较高 |
如果你的业务是:
- 电商下单、营销核销、履约处理:Saga 很合适
- 账户余额、支付扣款、资金冻结:更适合 TCC 或专门账务模型
一句话概括:
Saga 擅长“把事做完再修正”,TCC 擅长“先占住资源再确认”。
Saga 的边界条件
Saga 不是银弹。以下场景要慎用:
- 补偿动作无法定义,或者补偿成本极高
- 外部系统不支持幂等和状态查询
- 某一步是不可逆动作,例如已经向第三方真实出款
- 业务无法接受较长时间的中间不一致
Saga 事务流程设计
下面以“订单创建”流程为例:
- 创建订单
- 扣减库存
- 扣减余额
- 任一步失败,执行补偿:
- 退款/解冻余额
- 回补库存
- 取消订单
流程图
flowchart TD
A[用户提交订单] --> B[创建订单 PENDING]
B --> C[扣减库存]
C --> D[扣减余额]
D --> E[订单确认 CONFIRMED]
C -.失败.-> X1[取消订单]
D -.失败.-> X2[回补库存]
X2 --> X1
时序图
sequenceDiagram
participant Client as 客户端
participant Saga as Saga协调器
participant Order as 订单服务
participant Inventory as 库存服务
participant Payment as 支付服务
Client->>Saga: 提交创建订单
Saga->>Order: createPendingOrder()
Order-->>Saga: orderId
Saga->>Inventory: deduct(orderId, sku, count)
Inventory-->>Saga: success
Saga->>Payment: debit(orderId, userId, amount)
alt 支付成功
Payment-->>Saga: success
Saga->>Order: confirm(orderId)
Order-->>Saga: success
Saga-->>Client: 下单成功
else 支付失败
Payment-->>Saga: failed
Saga->>Inventory: compensateDeduct(orderId)
Inventory-->>Saga: success
Saga->>Order: cancel(orderId)
Order-->>Saga: success
Saga-->>Client: 下单失败
end
状态机建议
状态一定要设计清楚,不然补偿和重试很容易打架。
stateDiagram-v2
[*] --> PENDING
PENDING --> INVENTORY_DEDUCTED
INVENTORY_DEDUCTED --> PAYMENT_DONE
PAYMENT_DONE --> CONFIRMED
PENDING --> CANCELED
INVENTORY_DEDUCTED --> COMPENSATING
COMPENSATING --> CANCELED
COMPENSATING --> COMPENSATION_FAILED
实战代码(可运行)
下面我用 Python 做一个可运行的最小 Saga 编排示例。它不是生产级框架,但足够把核心逻辑讲透:
- 每个服务维护自己的本地状态
- Saga 协调器负责编排和补偿
- 所有操作都做了简单幂等
- 可以模拟失败
你可以直接保存为 saga_demo.py 运行。
from dataclasses import dataclass, field
from typing import Dict, Set
import uuid
@dataclass
class Order:
order_id: str
user_id: str
sku: str
count: int
amount: int
status: str = "PENDING"
class OrderService:
def __init__(self):
self.orders: Dict[str, Order] = {}
self.created_ops: Set[str] = set()
self.confirmed_ops: Set[str] = set()
self.canceled_ops: Set[str] = set()
def create_pending_order(self, op_id: str, user_id: str, sku: str, count: int, amount: int) -> str:
if op_id in self.created_ops:
# 幂等:重复请求返回原订单
for oid, order in self.orders.items():
if order.user_id == user_id and order.sku == sku and order.count == count and order.amount == amount:
return oid
order_id = str(uuid.uuid4())
self.orders[order_id] = Order(
order_id=order_id,
user_id=user_id,
sku=sku,
count=count,
amount=amount,
status="PENDING"
)
self.created_ops.add(op_id)
print(f"[Order] create pending order: {order_id}")
return order_id
def confirm_order(self, op_id: str, order_id: str):
if op_id in self.confirmed_ops:
print(f"[Order] confirm idempotent: {order_id}")
return
order = self.orders[order_id]
if order.status == "CANCELED":
raise Exception("cannot confirm canceled order")
order.status = "CONFIRMED"
self.confirmed_ops.add(op_id)
print(f"[Order] confirmed: {order_id}")
def cancel_order(self, op_id: str, order_id: str):
if op_id in self.canceled_ops:
print(f"[Order] cancel idempotent: {order_id}")
return
order = self.orders[order_id]
if order.status == "CONFIRMED":
raise Exception("cannot cancel confirmed order")
order.status = "CANCELED"
self.canceled_ops.add(op_id)
print(f"[Order] canceled: {order_id}")
class InventoryService:
def __init__(self):
self.stock = {"SKU-1": 10}
self.deducted_ops: Set[str] = set()
self.compensated_ops: Set[str] = set()
def deduct(self, op_id: str, sku: str, count: int):
if op_id in self.deducted_ops:
print(f"[Inventory] deduct idempotent: {sku}, {count}")
return
remain = self.stock.get(sku, 0)
if remain < count:
raise Exception("insufficient stock")
self.stock[sku] = remain - count
self.deducted_ops.add(op_id)
print(f"[Inventory] deducted: {sku}, count={count}, remain={self.stock[sku]}")
def compensate_deduct(self, op_id: str, sku: str, count: int):
if op_id in self.compensated_ops:
print(f"[Inventory] compensate idempotent: {sku}, {count}")
return
self.stock[sku] = self.stock.get(sku, 0) + count
self.compensated_ops.add(op_id)
print(f"[Inventory] compensated: {sku}, count={count}, remain={self.stock[sku]}")
class PaymentService:
def __init__(self):
self.balance = {"U-1": 100}
self.debited_ops: Set[str] = set()
self.refunded_ops: Set[str] = set()
def debit(self, op_id: str, user_id: str, amount: int, fail=False):
if op_id in self.debited_ops:
print(f"[Payment] debit idempotent: {user_id}, {amount}")
return
if fail:
raise Exception("simulated payment failure")
remain = self.balance.get(user_id, 0)
if remain < amount:
raise Exception("insufficient balance")
self.balance[user_id] = remain - amount
self.debited_ops.add(op_id)
print(f"[Payment] debited: {user_id}, amount={amount}, remain={self.balance[user_id]}")
def refund(self, op_id: str, user_id: str, amount: int):
if op_id in self.refunded_ops:
print(f"[Payment] refund idempotent: {user_id}, {amount}")
return
self.balance[user_id] = self.balance.get(user_id, 0) + amount
self.refunded_ops.add(op_id)
print(f"[Payment] refunded: {user_id}, amount={amount}, remain={self.balance[user_id]}")
class SagaCoordinator:
def __init__(self, order_service: OrderService, inventory_service: InventoryService, payment_service: PaymentService):
self.order_service = order_service
self.inventory_service = inventory_service
self.payment_service = payment_service
def create_order(self, user_id: str, sku: str, count: int, amount: int, payment_fail=False):
saga_id = str(uuid.uuid4())
print(f"\n=== start saga: {saga_id} ===")
order_id = None
inventory_done = False
payment_done = False
try:
order_id = self.order_service.create_pending_order(
op_id=f"{saga_id}:create_order",
user_id=user_id,
sku=sku,
count=count,
amount=amount
)
self.inventory_service.deduct(
op_id=f"{saga_id}:deduct_inventory",
sku=sku,
count=count
)
inventory_done = True
self.payment_service.debit(
op_id=f"{saga_id}:debit_payment",
user_id=user_id,
amount=amount,
fail=payment_fail
)
payment_done = True
self.order_service.confirm_order(
op_id=f"{saga_id}:confirm_order",
order_id=order_id
)
print(f"=== saga success: {saga_id} ===")
return order_id
except Exception as e:
print(f"[Saga] failed: {e}, begin compensation")
if payment_done:
self.payment_service.refund(
op_id=f"{saga_id}:refund_payment",
user_id=user_id,
amount=amount
)
if inventory_done:
self.inventory_service.compensate_deduct(
op_id=f"{saga_id}:compensate_inventory",
sku=sku,
count=count
)
if order_id:
self.order_service.cancel_order(
op_id=f"{saga_id}:cancel_order",
order_id=order_id
)
print(f"=== saga compensated: {saga_id} ===")
return None
if __name__ == "__main__":
order_service = OrderService()
inventory_service = InventoryService()
payment_service = PaymentService()
saga = SagaCoordinator(order_service, inventory_service, payment_service)
print(">>> case1: success")
oid1 = saga.create_order(user_id="U-1", sku="SKU-1", count=2, amount=30, payment_fail=False)
print("\n>>> case2: payment fail")
oid2 = saga.create_order(user_id="U-1", sku="SKU-1", count=3, amount=20, payment_fail=True)
print("\n>>> final state")
print("order ids:", oid1, oid2)
print("orders:", order_service.orders)
print("stock:", inventory_service.stock)
print("balance:", payment_service.balance)
运行结果会看到什么
- 第一笔订单成功:订单确认、库存减少、余额减少
- 第二笔订单支付失败:库存回补、订单取消
- 最终各服务状态能收敛
这就是 Saga 的最核心目标:不是每一步都不失败,而是失败后系统仍可恢复到业务可接受状态。
落地时的关键设计点
代码只是骨架,真正上线时,下面这些设计点更重要。
1. 全局事务 ID
每个 Saga 必须有唯一 sagaId,并在所有服务间透传:
- 用于日志串联
- 用于幂等键
- 用于重试与补偿定位
常见做法:
traceId负责链路追踪sagaId负责业务事务- 子步骤再带
stepId/opId
2. 补偿不是回滚
很多人刚接触 Saga 时,脑子里还是数据库回滚思维。实际上补偿更接近:
- 扣库存 -> 回补库存
- 扣余额 -> 退款
- 创建订单 -> 取消订单
注意,这些动作不一定能把数据恢复到“原封不动”的旧值,只是恢复到业务上合理的新状态。
3. 幂等必须内建
我踩过一个坑:补偿任务超时后重试,结果库存被回补了两次,最后库存凭空增加。
所以每个动作都要支持幂等:
- 正向动作幂等
- 补偿动作幂等
- 查询状态幂等
- 消息投递重复可接受
常见实现:
- 唯一业务键 + 去重表
- 操作流水表
- 状态机防重入
- 乐观锁 / 唯一索引
4. 状态机优先于布尔值
不要把流程状态拆成一堆布尔字段,例如:
inventoryDeductedpaymentDonecouponUsed
这种设计后期非常容易出现组合爆炸。更稳妥的方式是:
- 明确主状态
- 明确补偿状态
- 明确终态
- 每次状态变更都有事件和时间戳
常见坑与排查
Saga 最难的往往不是“写出来”,而是“线上出了问题能不能搞清楚”。
坑 1:调用超时,但下游其实成功了
现象
上游收到超时,触发补偿;下游晚一点又返回成功,导致状态冲突。
根因
- 网络超时不等于业务失败
- 没有状态查询接口
- 没有“处理中”语义
排查建议
先看三个维度:
- 请求日志:是否真正发出
- 下游处理日志:是否落库成功
- 状态表:最终状态是否已推进
解决方法
- 每个关键步骤提供
queryStatus(opId)接口 - 对超时步骤先查状态,再决定重试还是补偿
- 引入“处理中/未知”中间状态,避免过早补偿
坑 2:补偿失败,事务悬挂
现象
支付失败后库存补偿也失败,订单一直卡在“补偿中”。
根因
- 补偿链路没设计重试
- 补偿服务自身依赖了更多外部系统
- 补偿动作不是天然可逆
排查建议
重点看:
- 补偿任务表是否有重试次数
- 补偿失败是否进入死信或人工队列
- 依赖服务是否存在雪崩
解决方法
- 补偿任务持久化
- 指数退避重试
- 超阈值转人工处理
- 提前定义“兜底状态”
例如:
COMPENSATINGCOMPENSATION_FAILEDMANUAL_REVIEW
坑 3:消息重复消费导致多次执行
现象
库存被扣了两次,或者退款了两次。
根因
- 消息至少一次投递
- 消费端缺乏幂等
排查建议
- 检查消息唯一键是否稳定
- 检查消费记录表是否按业务主键去重
- 看是否因为消费者重平衡导致重复消费
解决方法
- 消费前落幂等记录
- 业务表加唯一约束
- “先判重,再执行业务,再提交消费位点”
坑 4:补偿顺序不对
现象
先取消订单,再尝试退款,导致退款逻辑因为订单状态非法而失败。
解决方法
补偿顺序通常遵循逆序原则:
- 最后执行的正向动作,最先补偿
- 尽量避免补偿动作依赖已被销毁的上下文
安全/性能最佳实践
Saga 讨论里大家容易只盯着一致性,其实安全和性能一样重要。
安全最佳实践
1. 补偿接口要鉴权
补偿接口本质上可以“撤销业务动作”,如果暴露不当,风险很大。
建议:
- 仅允许内网访问
- 使用服务间身份认证
- 对
sagaId/opId做签名校验或权限校验 - 记录完整审计日志
2. 防止伪造重试
如果重试任务由消息系统或任务系统触发,要避免外部伪造请求反复打补偿接口。
可做法:
- 请求带内部 token
- 操作表校验状态流转合法性
- 相同
opId只允许执行到特定终态
3. 敏感数据最小化透传
Saga 上下文中不要传完整敏感信息,例如:
- 银行卡号
- 身份证号
- 明文手机号
建议只传:
- 业务 ID
- 脱敏后的展示字段
- 下游查询所需最小信息
性能最佳实践
1. 缩短本地事务时间
本地事务中不要做这些事:
- 远程调用
- 慢 SQL
- 大批量扫描
- 大对象序列化
原则是:
本地事务只做本地可靠落库,远程动作交给事务编排或消息机制。
2. 控制 Saga 上下文大小
我见过有人把整个订单 JSON 透传给每一步服务,最后消息体越来越大,链路延迟明显上升。
建议上下文只保留:
sagaIdorderIduserId- 必要金额/数量
- 当前步骤和版本号
3. 重试要有限流和退避
补偿失败后疯狂重试,会把下游压垮,最终从“局部错误”变成“系统事故”。
建议:
- 指数退避
- 最大重试次数
- 熔断与降级
- 死信队列 + 人工介入
4. 监控关键指标
至少监控这些指标:
- Saga 成功率
- 平均完成时长
- 补偿率
- 补偿失败率
- 长时间未结束事务数
- 每步骤超时分布
- 重试次数分布
容量估算与治理建议
架构文章不能只停留在“模式正确”,还要考虑跑起来后的规模问题。
一个粗略估算方法
假设:
- 下单峰值 QPS:500
- 每个 Saga 平均 4 个步骤
- 每步平均 1 次消息投递
- 失败补偿比例:2%
那么:
- 每秒正向步骤请求数约:
500 × 4 = 2000 - 每秒补偿额外请求数约:
500 × 4 × 2% = 40 - 如果每步都记录事件日志,日志写入约:
2000 ~ 3000+/s
这意味着你至少要评估:
- 事务日志表写入能力
- 消息队列吞吐
- 补偿任务堆积上限
- 监控系统基数增长
治理建议
- Saga 状态表按时间分区或分表
- 历史完成事务归档
- 长事务单独监控
- 补偿失败工单化
- 对热点业务链路做压测演练
一套更贴近生产的落地建议
如果你准备在真实系统中上 Saga,我建议按下面顺序推进,而不是一下子搞很重的框架。
第一步:先统一模型
明确每个步骤的:
- 正向动作
- 补偿动作
- 幂等键
- 超时阈值
- 可查询状态
- 最终终态
第二步:先做编排式
先有一个简单可靠的协调器,哪怕是应用内模块,也比“每个服务自己猜流程”更可控。
第三步:补齐可观测性
至少做到:
sagaId全链路日志- 状态变化事件
- 补偿告警
- 超时任务扫描
第四步:准备人工兜底
请务必接受现实:总会有自动补偿处理不了的情况。
所以生产系统中要有:
- 补偿失败列表
- 人工重试入口
- 人工终止/标记完成
- 操作审计记录
这部分经常被忽略,但真的很关键。
总结
Saga 模式适合解决分布式架构中大量跨服务、长流程、可补偿的事务问题。它不追求跨服务的瞬时强一致,而是通过本地事务、状态推进、失败补偿、幂等重试来实现最终一致。
如果你只记住几条落地建议,我建议是这几条:
- 优先选择编排式 Saga 起步,流程更清晰,排查更直观
- 每一步都要幂等,包括正向、补偿、重试、查询
- 状态机要先设计好,不要靠布尔字段拼流程
- 超时不等于失败,先查状态再决定补偿
- 补偿失败要可重试、可告警、可人工接管
- 明确边界:资金强一致场景,不要勉强用 Saga
最后一句比较务实的话:
Saga 真正的难点不是“怎么把流程串起来”,而是“失败之后系统还能不能优雅地收回来”。
只要你围绕这个目标设计,方案通常就不会跑偏。