背景与问题
微服务拆得越细,事务就越难做。
在单体应用里,我们习惯了一个本地事务把库存、订单、支付一次性提交:要么都成功,要么都回滚。但到了微服务架构里,这几个动作通常分别属于不同服务、不同数据库,甚至不同团队维护。这个时候,如果还想用传统的两阶段提交(2PC)强行兜底,往往会遇到几个现实问题:
- 数据库和中间件支持不统一
- 性能开销大,锁持有时间长
- 链路一长,失败概率明显升高
- 服务之间耦合变重,演进困难
所以,真正落地时,很多团队最后都会走向一句话:接受最终一致性,而不是执着于强一致性。
但“接受最终一致性”不等于“放任不管”。我见过不少系统嘴上说用 Saga,实际上只是“失败了写个日志”;也见过消息最终一致性方案里,消息发出去了,业务库没落盘,最后账怎么都对不上。
这篇文章我从治理视角来讲,不只讲概念,而是把 Saga、可靠消息、补偿机制怎么组合落地讲清楚,重点回答几个实战问题:
- 什么场景用 Saga,什么场景用消息最终一致性?
- 补偿事务应该怎么设计,才不会越补越乱?
- 代码层面如何做到“业务提交成功,消息也不会丢”?
- 排查线上“不一致”时,应该先看哪里?
先看一个典型业务场景
以“用户下单”为例,通常会串起这些服务:
- 订单服务:创建订单
- 库存服务:扣减库存
- 支付服务:冻结或扣款
- 营销服务:锁定优惠券
- 物流服务:创建发货单
如果这是单库系统,问题不大;但在微服务里,它天然就是一个分布式事务。
失败会长什么样?
例如:
- 订单创建成功,但库存扣减失败
- 库存扣减成功,但支付超时
- 支付成功了,订单状态没更新
- 消费消息重复,导致库存多扣一次
- 补偿流程执行一半,系统又崩了
这些问题的共同点是:链路跨服务、跨存储、跨时序。所以不能只盯着“事务”两个字,而要从“状态流转 + 消息可靠 + 幂等补偿”三个维度一起治理。
方案对比与取舍分析
在架构设计时,我一般先把候选方案放到一张表里看,别一上来就喊“全站 Saga”。
| 方案 | 一致性 | 性能 | 实现复杂度 | 适用场景 | 不适用场景 |
|---|---|---|---|---|---|
| 2PC / XA | 强一致 | 低 | 高 | 少量核心链路、技术栈统一 | 高并发、长事务、异构系统 |
| TCC | 强一致趋近 | 中 | 很高 | 资金、额度、资源预留明确 | 老系统难改造、接口不具备 Try/Confirm/Cancel |
| Saga | 最终一致 | 高 | 中 | 长事务、多步骤业务编排 | 无法定义补偿动作 |
| 消息最终一致性 | 最终一致 | 高 | 中 | 事件驱动、异步解耦 | 需要同步返回强一致结果 |
一句实战建议
- 业务步骤明确、需要有序补偿:优先考虑 Saga
- 服务间以事件驱动传播状态:优先考虑 可靠消息最终一致性
- 金额类核心扣减、且业务可预留资源:再考虑 TCC
- 不要轻易在高并发核心链路上用 XA
很多时候,真正落地不是二选一,而是:
主链路用 Saga 管控业务状态,服务间传播用可靠消息保障最终一致性,失败时靠补偿机制兜底。
核心原理
1. Saga:把一个大事务拆成多个本地事务
Saga 的核心思想很朴素:
- 每个服务只做自己的本地事务
- 每一步成功后再推进下一步
- 中间某一步失败,就按相反顺序执行补偿事务
例如:
- 创建订单
- 扣库存
- 冻结支付
- 锁优惠券
如果冻结支付失败,就需要:
- 释放已锁优惠券
- 回补库存
- 取消订单
这不是数据库层回滚,而是业务层回滚。
2. 消息最终一致性:先落库,再投递
消息最终一致性的关键,不是“用了 MQ”,而是业务数据和待发送消息必须在同一个本地事务里落地。常见做法是 Outbox 模式:
- 在本地事务中:
- 更新业务表
- 写入消息表 outbox
- 事务提交后,由消息投递器异步扫描 outbox,把消息发到 MQ
- 消费方处理后,再通过幂等机制避免重复消费
这样即使服务在事务提交后瞬间宕机,消息也不会丢,因为 outbox 里还在。
3. 补偿机制:不是简单“反向操作”
补偿要考虑三个原则:
- 可重试
- 幂等
- 可观测
比如“回补库存”不能直接 stock = stock + 1 完事,而要基于业务单号、冻结记录、补偿状态去执行。否则重复补偿几次,库存就飞了。
两种 Saga 组织方式
编排式 Saga
由一个协调者统一驱动步骤。
优点:
- 流程清晰,便于治理
- 审计和排查更方便
缺点:
- 协调者容易成为中心依赖
- 灵活性稍弱
协同式 Saga
各服务通过事件驱动推进下一步。
优点:
- 去中心化,耦合更松
- 更适合事件驱动架构
缺点:
- 状态链路分散,排查复杂
- 容易出现“谁该补偿”的边界不清
对于大多数中型团队,我通常建议:
优先从编排式 Saga 起步,等领域边界清晰、事件治理成熟后,再逐步引入协同式 Saga。
flowchart LR
A[创建订单] --> B[扣减库存]
B --> C[冻结支付]
C --> D[锁定优惠券]
D --> E[订单完成]
C -- 失败 --> B2[补偿库存]
B2 --> A2[取消订单]
D -- 失败 --> C2[解冻支付]
C2 --> B2
Saga 与消息最终一致性的组合落地
单独用 Saga,能管理“过程”;单独用消息,能传播“结果”。实战里最好把两者合起来。
一个比较稳的落地形态是:
- Saga 编排服务推进业务步骤
- 每个参与者服务执行本地事务时,同时写业务表和 outbox 表
- 投递器把 outbox 消息发到 MQ
- 下游服务消费消息并执行业务逻辑
- 失败则重试;超过阈值则进入人工或自动补偿队列
sequenceDiagram
participant O as Orchestrator
participant S1 as 订单服务
participant S2 as 库存服务
participant MQ as 消息队列
participant S3 as 支付服务
O->>S1: 创建订单
S1->>S1: 本地事务: 订单表 + outbox
S1-->>O: 成功
O->>S2: 扣减库存
S2->>S2: 本地事务: 库存表 + outbox
S2-->>O: 成功
S2->>MQ: 投递库存已扣减事件
MQ-->>S3: 消费事件
S3->>S3: 冻结支付
S3-->>O: 失败/超时
O->>S2: 补偿库存
O->>S1: 取消订单
实战代码(可运行)
下面给一个简化但可运行的示例,用 Python + SQLite 模拟:
- 订单服务本地事务
- Outbox 可靠消息
- Saga 编排
- 幂等消费
- 补偿操作
这不是生产级框架,但足够把关键点跑通。
目录思路
我们用一个脚本模拟多个服务,重点看机制:
orders:订单表inventory:库存表outbox:待投递消息表processed_messages:消费幂等表
代码示例
import sqlite3
import json
import time
import uuid
DB = "demo.db"
def get_conn():
return sqlite3.connect(DB)
def init_db():
conn = get_conn()
cur = conn.cursor()
cur.execute("""
CREATE TABLE IF NOT EXISTS orders (
order_id TEXT PRIMARY KEY,
user_id TEXT,
product_id TEXT,
amount INTEGER,
status TEXT
)
""")
cur.execute("""
CREATE TABLE IF NOT EXISTS inventory (
product_id TEXT PRIMARY KEY,
stock INTEGER,
frozen INTEGER DEFAULT 0
)
""")
cur.execute("""
CREATE TABLE IF NOT EXISTS outbox (
msg_id TEXT PRIMARY KEY,
topic TEXT,
payload TEXT,
status TEXT,
retry_count INTEGER DEFAULT 0,
created_at REAL
)
""")
cur.execute("""
CREATE TABLE IF NOT EXISTS processed_messages (
consumer TEXT,
msg_id TEXT,
PRIMARY KEY (consumer, msg_id)
)
""")
conn.commit()
conn.close()
def seed_data():
conn = get_conn()
cur = conn.cursor()
cur.execute("INSERT OR IGNORE INTO inventory(product_id, stock, frozen) VALUES (?, ?, ?)", ("p1", 10, 0))
conn.commit()
conn.close()
def create_order_with_outbox(order_id, user_id, product_id, amount):
conn = get_conn()
cur = conn.cursor()
try:
cur.execute("BEGIN")
cur.execute("""
INSERT INTO orders(order_id, user_id, product_id, amount, status)
VALUES (?, ?, ?, ?, ?)
""", (order_id, user_id, product_id, amount, "CREATED"))
msg_id = str(uuid.uuid4())
payload = json.dumps({
"event": "OrderCreated",
"order_id": order_id,
"user_id": user_id,
"product_id": product_id,
"amount": amount
})
cur.execute("""
INSERT INTO outbox(msg_id, topic, payload, status, created_at)
VALUES (?, ?, ?, ?, ?)
""", (msg_id, "order-events", payload, "NEW", time.time()))
conn.commit()
print(f"[订单服务] 创建订单成功: {order_id}")
except Exception as e:
conn.rollback()
raise e
finally:
conn.close()
def reserve_inventory(order_id, product_id, amount):
conn = get_conn()
cur = conn.cursor()
try:
cur.execute("BEGIN")
cur.execute("SELECT stock, frozen FROM inventory WHERE product_id = ?", (product_id,))
row = cur.fetchone()
if not row:
raise Exception("商品不存在")
stock, frozen = row
if stock < amount:
raise Exception("库存不足")
cur.execute("""
UPDATE inventory
SET stock = stock - ?, frozen = frozen + ?
WHERE product_id = ?
""", (amount, amount, product_id))
msg_id = str(uuid.uuid4())
payload = json.dumps({
"event": "InventoryReserved",
"order_id": order_id,
"product_id": product_id,
"amount": amount
})
cur.execute("""
INSERT INTO outbox(msg_id, topic, payload, status, created_at)
VALUES (?, ?, ?, ?, ?)
""", (msg_id, "inventory-events", payload, "NEW", time.time()))
conn.commit()
print(f"[库存服务] 预扣库存成功: order={order_id}, amount={amount}")
except Exception as e:
conn.rollback()
raise e
finally:
conn.close()
def compensate_inventory(order_id, product_id, amount):
conn = get_conn()
cur = conn.cursor()
try:
cur.execute("BEGIN")
cur.execute("""
UPDATE inventory
SET stock = stock + ?, frozen = CASE WHEN frozen >= ? THEN frozen - ? ELSE 0 END
WHERE product_id = ?
""", (amount, amount, amount, product_id))
conn.commit()
print(f"[库存服务] 补偿库存成功: order={order_id}, amount={amount}")
except Exception as e:
conn.rollback()
raise e
finally:
conn.close()
def cancel_order(order_id):
conn = get_conn()
cur = conn.cursor()
try:
cur.execute("""
UPDATE orders SET status = ? WHERE order_id = ?
""", ("CANCELLED", order_id))
conn.commit()
print(f"[订单服务] 取消订单成功: {order_id}")
finally:
conn.close()
def mark_order_completed(order_id):
conn = get_conn()
cur = conn.cursor()
try:
cur.execute("""
UPDATE orders SET status = ? WHERE order_id = ?
""", ("COMPLETED", order_id))
conn.commit()
print(f"[订单服务] 订单完成: {order_id}")
finally:
conn.close()
def relay_outbox():
conn = get_conn()
cur = conn.cursor()
cur.execute("SELECT msg_id, topic, payload FROM outbox WHERE status = 'NEW' ORDER BY created_at")
rows = cur.fetchall()
for msg_id, topic, payload in rows:
print(f"[Outbox投递器] 投递消息: {msg_id}, topic={topic}, payload={payload}")
cur.execute("UPDATE outbox SET status = 'SENT' WHERE msg_id = ?", (msg_id,))
conn.commit()
conn.close()
def consume_message(consumer, msg_id):
conn = get_conn()
cur = conn.cursor()
try:
cur.execute("BEGIN")
cur.execute("""
INSERT INTO processed_messages(consumer, msg_id)
VALUES (?, ?)
""", (consumer, msg_id))
conn.commit()
return True
except sqlite3.IntegrityError:
conn.rollback()
print(f"[{consumer}] 检测到重复消息: {msg_id}")
return False
finally:
conn.close()
def mock_payment(order_id, should_fail=False):
print(f"[支付服务] 冻结支付: order={order_id}")
if should_fail:
raise Exception("支付服务超时")
print(f"[支付服务] 支付冻结成功: order={order_id}")
def saga_place_order(user_id, product_id, amount, payment_fail=False):
order_id = str(uuid.uuid4())
try:
create_order_with_outbox(order_id, user_id, product_id, amount)
reserve_inventory(order_id, product_id, amount)
mock_payment(order_id, should_fail=payment_fail)
mark_order_completed(order_id)
print(f"[Saga] 下单流程成功: {order_id}")
except Exception as e:
print(f"[Saga] 流程失败,开始补偿: {e}")
try:
compensate_inventory(order_id, product_id, amount)
except Exception as ce:
print(f"[Saga] 库存补偿失败: {ce}")
cancel_order(order_id)
def show_state():
conn = get_conn()
cur = conn.cursor()
print("\n=== orders ===")
for row in cur.execute("SELECT * FROM orders"):
print(row)
print("\n=== inventory ===")
for row in cur.execute("SELECT * FROM inventory"):
print(row)
print("\n=== outbox ===")
for row in cur.execute("SELECT msg_id, topic, status FROM outbox"):
print(row)
conn.close()
if __name__ == "__main__":
init_db()
seed_data()
print("\n--- 场景1:成功下单 ---")
saga_place_order("u1", "p1", 2, payment_fail=False)
relay_outbox()
show_state()
print("\n--- 场景2:支付失败触发补偿 ---")
saga_place_order("u2", "p1", 3, payment_fail=True)
relay_outbox()
show_state()
如何运行
python3 app.py
这段代码演示了什么
虽然它是简化版,但已经体现出几个关键落地点:
- 订单创建与 outbox 写入在同一个本地事务内
- 库存扣减与库存事件写入也在同一个本地事务内
- Saga 在支付失败时触发补偿
- 消费端可通过
processed_messages做幂等 - 消息投递不是“顺手发一下”,而是独立 relay 过程
一个更完整的状态流转设计
很多分布式事务最后难维护,不是因为框架不够强,而是状态设计太随意。建议给每个 Saga 实例明确状态机。
stateDiagram-v2
[*] --> INIT
INIT --> ORDER_CREATED
ORDER_CREATED --> INVENTORY_RESERVED
INVENTORY_RESERVED --> PAYMENT_PENDING
PAYMENT_PENDING --> COMPLETED
PAYMENT_PENDING --> COMPENSATING
INVENTORY_RESERVED --> COMPENSATING
ORDER_CREATED --> COMPENSATING
COMPENSATING --> CANCELLED
COMPLETED --> [*]
CANCELLED --> [*]
这张图的意义很大:
- 你能知道当前卡在哪一步
- 能知道下一步应该推进还是补偿
- 重试和人工介入时有依据
我在项目里通常会把 saga_instance 和 saga_step 两张表单独建出来,而不是只靠日志猜流程走到哪了。
常见坑与排查
这一部分非常重要,因为很多团队“方案设计”都没问题,问题出在边界条件和工程细节。
1. 业务成功了,但消息没发出去
常见原因
- 业务事务提交后,直接调用 MQ 发送,发送前服务宕机
- MQ 发送失败没有落重试记录
- 异步线程发消息,主进程退出导致消息丢失
正确姿势
- 采用 Outbox 模式
- 业务表与消息表同事务提交
- 用独立投递器扫描重试
排查路径
- 查业务表是否提交成功
- 查 outbox 表是否有对应消息
- 查 outbox 状态是否停留在
NEW - 查投递器日志、MQ 发送异常、重试次数
2. 消息重复消费,库存被多扣
这是最常见的坑之一。我当时第一次做库存服务时,也踩过:MQ 明明只发了一次,怎么库存少了两次?后来发现其实是消费者超时重投。
正确姿势
- 消费逻辑必须幂等
- 以
msg_id或business_key建唯一约束 - 业务更新最好带状态条件
例如:
UPDATE coupon
SET status = 'USED'
WHERE coupon_id = 'c1' AND status = 'LOCKED';
这样即使重复执行,也不会无限改坏数据。
3. 补偿执行了两次
原因
- 编排器超时后重试
- 补偿接口本身网络超时,但服务实际已执行成功
- 人工补偿和自动补偿并发触发
建议
- 补偿动作也要有补偿记录表
- 补偿接口按
order_id + step_name做幂等 - 人工操作入口必须走统一补偿平台,而不是直接改表
4. 出现“悬挂”与“空补偿”
这个问题在 TCC 更典型,但 Saga 里同样可能出现。
- 空补偿:原动作没成功,补偿先到了
- 悬挂:补偿已执行,后续原动作又来了
建议
给每个步骤设计明确的状态位,例如:
- INIT
- DONE
- COMPENSATED
然后所有接口只允许合法状态迁移,非法迁移直接拒绝并记录。
5. 补偿不是严格逆操作
比如支付服务“扣款”后,不一定能立即“退款”成功;优惠券释放后,也可能已过期;库存回补时还要考虑锁定仓、批次仓。
这说明一件事:
补偿事务是业务语义,不是数学反函数。
所以设计补偿时,一定要拉上业务方确认这些边界:
- 是否允许人工审核后补偿
- 是否允许延迟补偿
- 是否需要“退款中”“补偿中”等中间态
安全/性能最佳实践
安全最佳实践
1. 消息体不要裸奔传敏感数据
MQ 里常会流转:
- 用户手机号
- 支付信息
- 身份标识
建议:
- 只传必要字段
- 敏感字段脱敏或加密
- 关键消息加签,防止伪造
2. 补偿接口必须鉴权
很多团队把“补偿接口”当内部接口,结果运维脚本、测试环境账号都能调,风险很高。
建议:
- 内部接口也做鉴权
- 接口调用必须带 traceId / operator
- 所有补偿操作保留审计日志
3. 防止重放攻击与恶意重试
对消息消费和回调处理:
- 校验消息签名
- 限制重试频率
- 对同一业务单号做时间窗控制
性能最佳实践
1. 缩短本地事务时间
本地事务内只做:
- 核心业务更新
- outbox 写入
不要在事务里做这些事:
- 调外部服务
- 查大量历史数据
- 复杂报表聚合
2. 补偿优先做成异步
补偿不是为了“立刻完成”,而是为了“最终恢复一致”。因此:
- 可以先进补偿队列
- 后台按顺序重试
- 超阈值再人工介入
这样能降低主链路阻塞。
3. 给 outbox 做分片与归档
业务量一大,outbox 会膨胀得很快。
建议至少考虑:
- 按时间归档
- 按状态索引
- 按业务域拆表
- relay 批量拉取、批量发送
4. 做容量估算
简单估算方法:
- 每日订单量:1000 万
- 每单平均事件数:4
- 每日 outbox 增量:4000 万
如果单条消息平均 1 KB,那一天就是约 40 GB 级别的数据写入量,还没算索引和重试副本。
所以,不要把 outbox 当一张“顺手建”的小表。它本质上是高频写入的核心基础设施。
落地治理建议
如果你所在团队正准备做分布式事务治理,我建议按下面的顺序落地,而不是一口气追求“全链路完美”。
第一步:先统一最小规范
至少统一这些东西:
- 全局业务单号
- 消息唯一 ID
- 幂等表规范
- 失败重试次数
- 死信处理规则
- traceId 贯穿日志
这一步做完,排查效率会直接提升。
第二步:优先治理高风险链路
优先挑这些链路:
- 订单、支付、库存
- 优惠券核销
- 积分增减
- 账户余额与额度
因为它们一旦不一致,影响最直观,也最容易被用户投诉。
第三步:建立“补偿平台”而不是散落脚本
成熟团队通常会做统一的补偿治理能力:
- 失败任务列表
- 自动重试
- 人工审核
- 补偿回放
- 状态查询
- 审计留痕
否则最后会变成每个服务都有一套“神秘补偿 SQL”。
边界条件:什么时候别硬上 Saga
Saga 很好,但不是银弹。以下情况要谨慎:
1. 没法定义补偿动作
如果业务上无法撤销,也无法逆向修正,那 Saga 很难做稳。
比如某些外部不可逆操作,一旦调用成功,就只能靠后续人工流程修正。
2. 步骤太多,跨域太散
如果一个 Saga 一口气串 12 个服务、5 个团队,编排器再漂亮也会很难维护。这个时候应该先反思:
- 领域边界是不是拆错了
- 是否把同步强依赖做得太多
- 是否可以缩短事务链路
3. 强一致要求真的很高
例如实时扣款、核心账务记账,如果每一分都不能接受短暂不一致,就不要强行套 Saga。应该优先从:
- 单一账务中心
- 余额预留
- TCC
- 双分录账本
这些方向设计。
总结
分布式事务治理,真正落地时不是选一个“最厉害的模式”,而是把几件事同时做好:
- 用 Saga 管好跨服务业务步骤和补偿顺序
- 用 Outbox + MQ 保证消息最终一致性
- 用 幂等设计 防止重复执行
- 用 状态机和审计日志 提升可观测性
- 用 统一补偿平台 承接异常恢复
如果让我给一个最务实的建议,那就是:
先别追求绝对一致,先把“失败后可恢复”做扎实。
因为在线上系统里,真正可怕的不是失败,而是失败之后你不知道数据错在哪、也不知道怎么补回来。
最后给几个可直接执行的建议:
- 新链路默认接入 outbox,而不是“业务成功后直接发 MQ”
- 每个补偿接口都必须支持幂等
- 每条核心 Saga 链路都要有清晰状态机
- 失败任务必须可查询、可重试、可审计
- 金额类、库存类、券类链路优先治理
做到这些,分布式事务就不再是“理论很美,线上很乱”,而会变成一套真正能扛业务增长的工程能力。