背景与问题
在单体应用里,下单 + 扣库存 + 更新支付状态 往往是一把数据库事务锁住就结束了,开发体验很丝滑。但一旦系统拆成微服务:
- 订单服务有自己的库
- 库存服务有自己的库
- 支付服务可能还是第三方
这时你再想用传统本地事务,就会发现它只能覆盖“自己这一亩三分地”。跨服务、跨库之后,经典的 ACID 事务不再天然成立。
很多团队第一反应是上 2PC / XA。理论上可行,但在互联网业务里往往不太讨喜:
- 锁资源时间长,吞吐容易下降
- 服务之间强耦合
- 中间环节一多,可用性和运维复杂度都上来了
- 一旦某个参与者卡住,整条链路都受影响
所以在微服务架构下,我们更常用 最终一致性 思路。其中,Saga 模式 是非常实用的一种方案。
这篇文章就从一个最常见的业务场景出发:订单创建与库存扣减的一致性,带你把 Saga 真正落到代码和架构上,而不是只停留在概念图。
业务场景拆解
先明确业务目标:
- 用户提交订单
- 系统校验并冻结/扣减库存
- 库存成功则订单进入“待支付”或“已确认”
- 库存失败则订单取消
- 如果中途某一步失败,要有补偿动作,把前面已完成的步骤“撤回来”
注意,这里我们追求的不是“任意时刻强一致”,而是:
- 短时间内允许状态中间态
- 最终达到业务一致
- 任何失败都能追踪、补偿、重试
这正是 Saga 擅长的地方。
方案对比与取舍分析
在进入设计前,先说清楚为什么选 Saga,而不是别的方案。
1. 本地事务
适合单体或单库场景。
一旦订单和库存分属两个服务,就不够用了。
2. 2PC/XA
优点是强一致。
缺点是协调器复杂、阻塞明显、性能和可用性压力大。对于高并发电商类业务,我一般不会优先推荐。
3. TCC
TCC 更强控制力,典型流程是 Try / Confirm / Cancel。
但它对业务侵入很高,每个服务都要实现一套“预留、确认、取消”的接口,开发成本不低。
4. Saga
Saga 更像“长事务拆成多个本地事务 + 补偿动作”:
- 步骤 A 成功
- 步骤 B 失败
- 调用 A 的补偿逻辑
它的优势是:
- 易于落地
- 更符合微服务自治
- 不要求底层数据库支持 XA
- 对吞吐更友好
它的代价是:
- 只能保证最终一致
- 补偿逻辑设计难度不低
- 幂等、重试、消息重复都必须认真处理
结论:订单与库存这种场景,如果业务允许短暂中间态,Saga 是很合适的平衡点。
核心原理
Saga 有两种常见驱动方式:
- 编排式(Orchestration):由一个 Saga 协调器统一驱动各个步骤
- 协同式(Choreography):服务之间通过事件自行协作
这篇文章我用 编排式 来讲,因为它更适合把流程讲清楚,也更便于排错。
Saga 的基本组成
一个完整的 Saga 往往包含:
- 业务步骤:例如创建订单、预扣库存
- 补偿步骤:例如取消订单、释放库存
- 状态记录:Saga 执行到哪一步,是否失败,是否补偿完成
- 幂等控制:同一个请求重复执行不能出错
- 重试机制:网络抖动、服务超时不能直接放弃
- 可观测性:日志、链路追踪、告警
订单与库存的一条 Saga 链路
flowchart TD
A[用户提交订单] --> B[订单服务创建 PENDING 订单]
B --> C[Saga 协调器发起库存预扣]
C --> D{库存是否充足}
D -- 是 --> E[库存服务冻结库存]
E --> F[订单服务更新为 CONFIRMED]
F --> G[返回下单成功]
D -- 否 --> H[订单服务更新为 CANCELLED]
H --> I[返回库存不足]
这个流程里有个关键点:
订单不是一开始就直接变成最终成功,而是先进入中间态。
这是分布式事务设计里非常重要的习惯。你如果一开始就把订单写成“成功”,后面库存失败了,回滚逻辑就会很拧巴。
失败补偿流程
sequenceDiagram
participant U as 用户
participant O as 订单服务
participant S as Saga协调器
participant I as 库存服务
U->>O: 创建订单
O->>O: 本地事务保存 PENDING
O->>S: 发起 Saga
S->>I: reserve(orderId, sku, count)
I-->>S: success / fail
alt 库存成功
S->>O: confirm(orderId)
O-->>U: 下单成功
else 库存失败
S->>O: cancel(orderId)
O-->>U: 下单失败
end
如果再复杂一点,比如后面还接了优惠券、积分、支付,每个步骤都要配对应补偿动作。
架构设计:订单与库存一致性方案
这里给一个比较实用的设计:
服务职责划分
- 订单服务
- 创建订单
- 更新订单状态:
PENDING / CONFIRMED / CANCELLED - 提供确认和取消接口
- 库存服务
- 查询可用库存
- 预留库存(推荐先冻结,不直接物理扣减)
- 释放冻结库存
- Saga 协调器
- 编排流程
- 持久化 Saga 状态
- 失败重试与补偿
- 消息/任务组件
- 异步重试
- 超时扫描
- 死信处理
为什么推荐“冻结库存”而不是“直接扣减”
很多团队一上来就把库存减掉,后面失败再加回来。
这不是不行,但在支付超时、订单取消、并发回滚等情况下,很容易把库存账做乱。
更稳妥的做法是拆成两个字段:
available_stock:可售库存reserved_stock:已冻结库存
下单时:
available_stock -= nreserved_stock += n
订单取消时:
available_stock += nreserved_stock -= n
订单支付确认后,再决定是否把冻结库存真正转成已售数据。
这个模型更贴近电商业务,也更适合 Saga 补偿。
数据模型建议
下面给一个简化版表结构。为了方便演示,我用 SQLite,可直接运行。
classDiagram
class orders {
+id INTEGER
+order_id TEXT
+sku TEXT
+count INTEGER
+status TEXT
+created_at TEXT
}
class inventory {
+sku TEXT
+available INTEGER
+reserved INTEGER
}
class saga_log {
+id INTEGER
+saga_id TEXT
+order_id TEXT
+step TEXT
+status TEXT
+created_at TEXT
}
orders --> inventory : reserve/release
saga_log --> orders : track
实战代码(可运行)
下面用一个 Python + Flask + SQLite 的最小示例来演示编排式 Saga。
它不是生产级框架,但流程是完整的,你本地就能跑起来理解机制。
目录结构
saga_demo/
app.py
demo.db # 启动后自动创建
安装依赖
pip install flask requests
完整代码
from flask import Flask, request, jsonify
import sqlite3
import uuid
from contextlib import closing
from datetime import datetime
app = Flask(__name__)
DB = "demo.db"
def now():
return datetime.utcnow().isoformat()
def get_conn():
conn = sqlite3.connect(DB, check_same_thread=False)
conn.row_factory = sqlite3.Row
return conn
def init_db():
with closing(get_conn()) as conn:
cur = conn.cursor()
cur.execute("""
CREATE TABLE IF NOT EXISTS orders (
id INTEGER PRIMARY KEY AUTOINCREMENT,
order_id TEXT UNIQUE NOT NULL,
sku TEXT NOT NULL,
count INTEGER NOT NULL,
status TEXT NOT NULL,
created_at TEXT NOT NULL
)
""")
cur.execute("""
CREATE TABLE IF NOT EXISTS inventory (
sku TEXT PRIMARY KEY,
available INTEGER NOT NULL,
reserved INTEGER NOT NULL
)
""")
cur.execute("""
CREATE TABLE IF NOT EXISTS saga_log (
id INTEGER PRIMARY KEY AUTOINCREMENT,
saga_id TEXT NOT NULL,
order_id TEXT NOT NULL,
step TEXT NOT NULL,
status TEXT NOT NULL,
created_at TEXT NOT NULL
)
""")
cur.execute("""
CREATE TABLE IF NOT EXISTS idempotency (
biz_key TEXT PRIMARY KEY,
result TEXT NOT NULL,
created_at TEXT NOT NULL
)
""")
conn.commit()
cur.execute("SELECT COUNT(*) AS c FROM inventory WHERE sku = ?", ("SKU-1",))
if cur.fetchone()["c"] == 0:
cur.execute(
"INSERT INTO inventory (sku, available, reserved) VALUES (?, ?, ?)",
("SKU-1", 10, 0)
)
conn.commit()
def log_saga(saga_id, order_id, step, status):
with closing(get_conn()) as conn:
conn.execute(
"INSERT INTO saga_log (saga_id, order_id, step, status, created_at) VALUES (?, ?, ?, ?, ?)",
(saga_id, order_id, step, status, now())
)
conn.commit()
def is_processed(biz_key):
with closing(get_conn()) as conn:
row = conn.execute(
"SELECT result FROM idempotency WHERE biz_key = ?",
(biz_key,)
).fetchone()
return row is not None
def mark_processed(biz_key, result="OK"):
with closing(get_conn()) as conn:
conn.execute(
"INSERT OR IGNORE INTO idempotency (biz_key, result, created_at) VALUES (?, ?, ?)",
(biz_key, result, now())
)
conn.commit()
@app.route("/inventory/reserve", methods=["POST"])
def reserve_inventory():
data = request.json
order_id = data["order_id"]
sku = data["sku"]
count = int(data["count"])
biz_key = f"reserve:{order_id}"
if is_processed(biz_key):
return jsonify({"success": True, "message": "idempotent replay"}), 200
with closing(get_conn()) as conn:
cur = conn.cursor()
row = cur.execute(
"SELECT sku, available, reserved FROM inventory WHERE sku = ?",
(sku,)
).fetchone()
if not row:
return jsonify({"success": False, "message": "sku not found"}), 404
if row["available"] < count:
return jsonify({"success": False, "message": "not enough stock"}), 409
cur.execute(
"UPDATE inventory SET available = available - ?, reserved = reserved + ? WHERE sku = ?",
(count, count, sku)
)
conn.commit()
mark_processed(biz_key)
return jsonify({"success": True}), 200
@app.route("/inventory/release", methods=["POST"])
def release_inventory():
data = request.json
order_id = data["order_id"]
sku = data["sku"]
count = int(data["count"])
biz_key = f"release:{order_id}"
if is_processed(biz_key):
return jsonify({"success": True, "message": "idempotent replay"}), 200
with closing(get_conn()) as conn:
cur = conn.cursor()
row = cur.execute(
"SELECT sku, available, reserved FROM inventory WHERE sku = ?",
(sku,)
).fetchone()
if not row:
return jsonify({"success": False, "message": "sku not found"}), 404
if row["reserved"] < count:
return jsonify({"success": False, "message": "reserved stock not enough"}), 409
cur.execute(
"UPDATE inventory SET available = available + ?, reserved = reserved - ? WHERE sku = ?",
(count, count, sku)
)
conn.commit()
mark_processed(biz_key)
return jsonify({"success": True}), 200
@app.route("/orders/create", methods=["POST"])
def create_order():
data = request.json
order_id = str(uuid.uuid4())
sku = data["sku"]
count = int(data["count"])
with closing(get_conn()) as conn:
conn.execute(
"INSERT INTO orders (order_id, sku, count, status, created_at) VALUES (?, ?, ?, ?, ?)",
(order_id, sku, count, "PENDING", now())
)
conn.commit()
saga_id = str(uuid.uuid4())
log_saga(saga_id, order_id, "CREATE_ORDER", "SUCCESS")
# 编排库存预留
reserve_result = call_reserve(order_id, sku, count)
if reserve_result["success"]:
confirm_order(order_id, saga_id)
return jsonify({"success": True, "order_id": order_id, "status": "CONFIRMED"})
else:
cancel_order(order_id, saga_id)
return jsonify({
"success": False,
"order_id": order_id,
"status": "CANCELLED",
"message": reserve_result["message"]
}), 409
def call_reserve(order_id, sku, count):
with app.test_client() as client:
resp = client.post("/inventory/reserve", json={
"order_id": order_id,
"sku": sku,
"count": count
})
return resp.get_json()
def confirm_order(order_id, saga_id):
with closing(get_conn()) as conn:
conn.execute(
"UPDATE orders SET status = ? WHERE order_id = ? AND status = ?",
("CONFIRMED", order_id, "PENDING")
)
conn.commit()
log_saga(saga_id, order_id, "CONFIRM_ORDER", "SUCCESS")
def cancel_order(order_id, saga_id):
with closing(get_conn()) as conn:
row = conn.execute(
"SELECT sku, count, status FROM orders WHERE order_id = ?",
(order_id,)
).fetchone()
if not row:
log_saga(saga_id, order_id, "CANCEL_ORDER", "ORDER_NOT_FOUND")
return
# 先取消订单
with closing(get_conn()) as conn:
conn.execute(
"UPDATE orders SET status = ? WHERE order_id = ? AND status = ?",
("CANCELLED", order_id, "PENDING")
)
conn.commit()
log_saga(saga_id, order_id, "CANCEL_ORDER", "SUCCESS")
# 如果此前库存已经冻结,则执行释放
with app.test_client() as client:
client.post("/inventory/release", json={
"order_id": order_id,
"sku": row["sku"],
"count": row["count"]
})
@app.route("/orders/<order_id>", methods=["GET"])
def get_order(order_id):
with closing(get_conn()) as conn:
row = conn.execute(
"SELECT order_id, sku, count, status, created_at FROM orders WHERE order_id = ?",
(order_id,)
).fetchone()
if not row:
return jsonify({"message": "not found"}), 404
return jsonify(dict(row))
@app.route("/inventory/<sku>", methods=["GET"])
def get_inventory(sku):
with closing(get_conn()) as conn:
row = conn.execute(
"SELECT sku, available, reserved FROM inventory WHERE sku = ?",
(sku,)
).fetchone()
if not row:
return jsonify({"message": "not found"}), 404
return jsonify(dict(row))
@app.route("/saga/logs/<order_id>", methods=["GET"])
def get_saga_logs(order_id):
with closing(get_conn()) as conn:
rows = conn.execute(
"SELECT saga_id, order_id, step, status, created_at FROM saga_log WHERE order_id = ? ORDER BY id",
(order_id,)
).fetchall()
return jsonify([dict(r) for r in rows])
if __name__ == "__main__":
init_db()
app.run(debug=True, port=5000)
运行方式
启动服务:
python app.py
场景 1:库存充足,下单成功
curl -X POST http://127.0.0.1:5000/orders/create \
-H "Content-Type: application/json" \
-d '{"sku":"SKU-1","count":2}'
查询库存:
curl http://127.0.0.1:5000/inventory/SKU-1
你会看到类似结果:
{
"sku": "SKU-1",
"available": 8,
"reserved": 2
}
场景 2:库存不足,下单失败
curl -X POST http://127.0.0.1:5000/orders/create \
-H "Content-Type: application/json" \
-d '{"sku":"SKU-1","count":100}'
此时订单会被取消,不会错误扣减库存。
这段代码在表达什么
这个最小示例主要传达 5 个落地要点。
1. 订单先写入中间态
status = "PENDING"
这是 Saga 的起手式。
不要一上来就写“成功”。
2. 每个本地事务只操作自己的数据
- 订单服务只改订单表
- 库存服务只改库存表
这符合微服务边界,也更容易拆分成独立服务。
3. 失败靠补偿,不靠全局回滚
库存失败时:
- 订单改成
CANCELLED - 如果库存已经冻结,则释放冻结库存
4. 幂等是必须项
你会看到 idempotency 表。
原因很简单:分布式系统里,重复请求不是“会不会发生”,而是“迟早会发生”。
比如:
- 协调器超时后重试
- 网络抖动导致客户端重复提交
- 消息中间件重复投递
没有幂等,补偿一次就可能把库存多加一次。
5. Saga 日志要落库
saga_log 不是装饰品。
它决定了你出问题时能不能知道:
- 卡在哪一步
- 哪一步失败
- 是否已经补偿
- 是否需要人工介入
我实际做项目时,很多线上排查最后都靠这个表救命。
生产环境怎么演进
上面的代码为了演示,把协调逻辑写在一个进程里。真正上生产时,通常会演进成下面这种形态。
flowchart LR
A[客户端] --> B[订单服务]
B --> C[事务消息/Outbox]
C --> D[Saga协调器]
D --> E[库存服务]
D --> F[支付服务]
D --> G[优惠券服务]
D --> H[超时补偿任务]
H --> D
推荐演进路径
1. 引入 Outbox Pattern
订单服务在本地事务里同时写:
ordersoutbox_event
然后由后台任务把事件可靠投递到消息系统。
这样可以避免“订单写成功了,但消息没发出去”的经典问题。
2. Saga 状态机独立持久化
给 Saga 建立独立表,例如:
saga_instancesaga_step_instance
这样协调器可以在服务重启后恢复上下文。
3. 异步化与超时扫描
不是每一步都要同步等待结果。
比如库存服务 200ms 内没响应,不一定立刻失败,可以:
- 标记步骤为
WAITING - 交给重试任务
- 超过总超时再补偿
4. 接入链路追踪
建议至少把这些字段打进日志:
trace_idsaga_idorder_idstepretry_count
否则多个服务一串起来,排查非常痛苦。
常见坑与排查
这一部分我尽量写得接地气一点,因为这些坑是真的常见。
1. 补偿接口不幂等
现象
- 一个订单取消了两次
- 库存被释放两次
- 最后库存变多了
根因
补偿动作通常会被重试,而不是只调用一次。
如果你写的是:
UPDATE inventory SET available = available + 2;
但没有判断这次补偿是否已经执行过,就会重复加库存。
排查方式
- 查补偿日志是否有重复调用
- 查同一
order_id的幂等记录 - 查库存变动流水是否重复
建议
- 每个业务动作都带
biz_key reserve:{order_id}/release:{order_id}- 先落幂等记录,再执行业务,或通过唯一键保证只执行一次
2. 只改库存总量,不做冻结量区分
现象
- 下单后库存直接减少
- 订单取消时再加回去
- 高并发下对账非常混乱
根因
缺少“可用库存 / 冻结库存”分层。
建议
至少维护:
availablereserved
必要时再加:
soldversion
3. 订单状态机设计过于随意
现象
一个订单可能从:
CONFIRMED -> PENDINGCANCELLED -> CONFIRMED
这种逆向跳转如果没有明确控制,后面会越来越乱。
建议
用明确状态机约束流转,例如:
PENDING -> CONFIRMEDPENDING -> CANCELLED
禁止非法跃迁。
可以在 SQL 中加条件:
UPDATE orders
SET status = 'CONFIRMED'
WHERE order_id = ? AND status = 'PENDING';
这样天然防并发脏写。
4. 协调器以为失败了,参与者其实成功了
这是分布式事务里很经典的一种“双写感知不一致”。
现象
- 协调器调用库存服务超时
- 协调器认为失败,开始补偿
- 但库存服务其实已经执行成功
后果
如果没有幂等与状态查询能力,就会出现“看起来谁都没错,但数据就是错了”。
建议
库存服务最好支持:
- 操作幂等
- 根据
order_id查询预留结果 - 重试时能返回“之前已经成功”
5. 没有死信和人工兜底
现象
某个补偿一直失败,但系统里没人知道。
最后只有用户投诉,你才发现有订单挂了几天。
建议
对 Saga 失败设置分级策略:
- 自动重试 3 次
- 超过阈值进入死信队列
- 触发告警
- 提供人工补偿后台
很多系统不是败在设计,而是败在“没有最后一公里的兜底”。
安全/性能最佳实践
Saga 不只是“事务正确”就够了,线上系统还得考虑安全、容量和吞吐。
安全最佳实践
1. 接口鉴权不能省
订单服务和库存服务之间虽然是内网调用,也建议至少做:
- 服务间身份认证
- 签名或 Token 校验
- 最小权限控制
否则内部系统被误调用,库存接口很容易成为高危入口。
2. 防重放与请求唯一标识
每次业务请求都应带:
request_idorder_idsaga_id
这样既能防止重复处理,也能用于审计。
3. 关键操作留审计日志
尤其是:
- 人工补偿
- 强制取消订单
- 手工修库存
这些日志在事故复盘时非常重要。
性能最佳实践
1. 用短事务,别把远程调用包进数据库事务
这是个高频错误。
有些人会这样写:
- 开启数据库事务
- 插入订单
- 调库存远程接口
- 等返回
- 提交事务
这样数据库锁会被白白持有,性能和死锁风险都变差。
正确做法是:
- 每个服务内部只做本地短事务
- 远程调用在事务边界之外编排
2. 为库存扣减加并发控制
可以用:
- 乐观锁
version - 条件更新
- 原子 SQL
比如:
UPDATE inventory
SET available = available - 1,
reserved = reserved + 1
WHERE sku = 'SKU-1'
AND available >= 1;
然后检查受影响行数是否为 1。
这是非常实用的防超卖手法。
3. Saga 日志和业务表分离
不要把所有编排信息都塞进订单表。
业务表负责业务状态,Saga 表负责流程状态,职责越清晰,后面越稳。
4. 容量估算别只看订单量
Saga 的链路量通常大于订单量。
举个例子:
- 1 笔订单
- 创建订单 1 次
- 预留库存 1 次
- 订单确认 1 次
- 失败时还有补偿 1~2 次
- 重试和日志也算写放大
所以你做容量估算时,至少要考虑:
- 订单峰值 QPS
- 平均每单 Saga 步骤数
- 重试比例
- 日志写入放大倍数
一个简单估算公式:
Saga写操作量 ≈ 订单QPS × (正常步骤数 + 平均重试次数 + 平均补偿次数)
如果订单高峰 1000 QPS,每单平均 4 次写操作,重试系数 0.3,那底层支撑能力至少要按 5000 级别写入量去评估,而不是只盯着 1000 QPS。
边界条件与适用范围
Saga 不是银弹,下面这些边界要提前想清楚。
适合 Saga 的场景
- 允许短暂中间态
- 可以设计明确补偿动作
- 服务边界清晰
- 更重视可用性和吞吐,而非严格强一致
比如:
- 订单、库存、优惠券、积分
- 出行订单、优惠券核销
- 内容发布、多服务审核流
不太适合 Saga 的场景
- 必须严格实时强一致
- 补偿成本极高或不可逆
- 一步失败会带来强法律/财务风险
比如某些核心记账链路,如果补偿难以保证,就要更谨慎,可能需要 TCC 或更强的一致性方案。
我会怎么在项目里落地
如果让我从 0 到 1 设计,我通常会按这个顺序做:
-
先定义状态机
- 订单有哪些状态
- 哪些状态允许互相转换
-
再定义补偿动作
- 哪一步失败要撤销哪一步
- 撤销是否一定成功
- 撤销失败如何重试
-
给每个步骤加幂等键
- 不要等事故后再补
-
落 Saga 持久化表
- 实例表
- 步骤表
- 重试次数
- 最后错误信息
-
补上观测能力
- trace_id
- 告警
- 死信
- 人工处理后台
-
最后才谈框架选型
- Seata Saga
- 自研工作流
- 基于消息中间件编排
很多团队恰好反过来,一上来先选框架,最后发现状态机和补偿逻辑根本没想清楚。
总结
在微服务架构里,订单与库存一致性是绕不过去的话题。
如果你不想承受 2PC/XA 的复杂度,又能接受最终一致性,那么 Saga 模式 是非常现实的一条路。
这篇文章的核心结论可以浓缩成 5 句话:
- 订单先进入中间态,不要直接写最终成功
- 每个服务只做自己的本地事务
- 失败依赖补偿,而不是幻想全局回滚
- 幂等、重试、日志是 Saga 能否上线的生命线
- 库存设计尽量采用“可用 + 冻结”模型,别只维护一个总数
如果你准备在项目里落地,我建议最少做到这些:
- 订单状态机清晰
- 库存预留/释放接口幂等
- Saga 执行日志可追踪
- 超时补偿任务可运行
- 死信与人工介入链路完整
最后提醒一句:
分布式事务的难点,从来不在“让 happy path 跑通”,而在“失败之后系统还能不能稳住”。
Saga 的价值,恰恰就在这里。