背景与问题
订单服务和库存服务,几乎是分布式系统里最经典的一对“冤家”。
在单体应用里,下单通常就是一段本地事务:
- 创建订单
- 扣减库存
- 提交事务
要么全成功,要么全失败,逻辑非常直观。
但一旦拆成微服务:
- 订单服务负责订单创建
- 库存服务负责库存扣减
- 两边各自有独立数据库
这时问题立刻出现了:跨服务的原子事务很难做,成本也很高。如果硬上 2PC/XA,理论上可以追求强一致,但实际在高并发、电商促销、链路复杂的场景里,往往会碰到这些问题:
- 性能差,锁资源时间长
- 可用性下降,协调器成为薄弱点
- 服务升级、扩缩容、异构存储时很别扭
- 一旦链路某处抖动,整体吞吐明显下滑
所以很多业务系统会选择一个更现实的方案:基于消息队列实现最终一致性。
这里的核心目标不是“绝对同一时刻一致”,而是:
- 订单最终能反映库存处理结果
- 库存不会被重复扣减
- 系统在失败、重试、消息重复、服务抖动的情况下,仍能恢复到正确状态
这篇文章我就围绕一个典型场景来讲:用户下单后,订单服务通过消息驱动库存扣减,最终达成一致。重点不是概念堆砌,而是把设计、代码、坑点和排查思路串起来。
场景建模:订单与库存的一条真实链路
先把问题说具体一点。
假设用户下单购买商品 sku-1001,数量 2。业务流程可能是:
- 订单服务校验参数,创建订单,状态为
PENDING - 订单服务发送“订单已创建,待扣库存”消息
- 库存服务消费消息,执行库存预扣或实扣
- 库存服务处理完成后,回写结果消息
- 订单服务根据结果把订单状态改成:
CONFIRMED:库存扣减成功FAILED:库存不足或处理失败
这个流程里最重要的不是“消息有没有发出去”,而是:
- 订单数据库写入和消息发送不能丢一头
- 库存消费必须幂等
- 回执消息也必须可重试、可对账
- 订单状态流转要可恢复,不要卡死在中间态
下面先看整体结构。
flowchart LR
U[用户下单] --> O[订单服务]
O --> OD[(订单库)]
O --> OT[(Outbox事件表)]
O --> MQ[(消息队列)]
MQ --> I[库存服务]
I --> ID[(库存库)]
I --> MQ2[(结果消息)]
MQ2 --> O2[订单结果处理]
O2 --> OD
这张图里有一个关键点:Outbox 事件表。这是很多系统把“最终一致性”做稳的核心。
核心原理
1. 最终一致性不是“放个 MQ 就行”
很多人第一次设计时会写成这样:
- 订单服务插入订单
- 调用 MQ 发送消息
看起来没问题,但实际上这里有一个经典故障窗口:
- 订单已写入数据库
- 还没来得及发消息,服务宕机了
结果就是:订单存在,但库存永远收不到消息。
反过来也一样危险:
- 消息发出去了
- 订单事务回滚了
结果库存已经扣了,但订单根本不存在。
所以“数据库写入”和“消息发送”必须通过某种机制关联起来。最常见、最好落地的办法,就是 本地事务 + Outbox 表 + 异步投递。
2. Outbox 模式
思路很朴素:
- 在订单服务本地事务里,同时做两件事:
- 写订单表
- 写消息事件表(Outbox)
- 事务提交后,由后台任务扫描 Outbox 表,把未投递事件推送到 MQ
- 推送成功后,将事件标记为已发送
这样做的好处是:
- 订单写入和“待发送消息”是同一个本地事务
- 不依赖分布式事务
- 即使 MQ 短暂不可用,也能靠后台补发恢复
订单侧状态流转
stateDiagram-v2
[*] --> PENDING
PENDING --> CONFIRMED: 库存扣减成功
PENDING --> FAILED: 库存不足/处理失败
PENDING --> CANCELLED: 超时未完成取消
FAILED --> [*]
CONFIRMED --> [*]
CANCELLED --> [*]
注意这里我特意保留了 PENDING 中间态。因为在分布式系统里,中间态不是问题,不可恢复的中间态才是问题。
3. 消费端幂等
MQ 的现实世界里,你必须默认下面这些事都会发生:
- 消息重复投递
- 消费者处理到一半崩溃
- 消费成功了,但 ack 丢了
- 网络抖动导致生产方重复发送
所以库存服务不能靠“消息只来一次”活着,而必须靠幂等活着。
常见做法:
- 给每个业务事件一个唯一
event_id - 库存服务落库前先检查
event_id是否处理过 - 已处理则直接返回成功,不再重复扣减
4. 结果回传与补偿
仅有“订单发消息给库存”还不够,订单服务还得知道处理结果。通常有两种方式:
- 同步查询库存结果:实现简单,但耦合高
- 库存服务回发结果消息:更解耦,更适合异步架构
我更推荐第二种。因为最终一致性的思维就是:不要试图在一次 RPC 里完成所有事情,而是靠事件驱动完成收敛。
方案对比与取舍分析
在订单库存场景里,常见方案其实有三种。
方案一:同步 RPC + 本地重试
流程:
- 订单创建
- 同步调用库存服务扣减
- 成功则提交订单,失败则回滚
优点:
- 链路直观
- 排查简单
缺点:
- 强依赖库存服务可用性
- 高峰期容易级联失败
- 一旦拆库,事务边界不好处理
适用场景:
- 并发不高
- 服务少
- 对实时性要求极强
方案二:分布式事务(2PC/XA)
优点:
- 追求强一致
- 业务代码看起来干净
缺点:
- 吞吐差
- 对数据库、驱动、中间件要求高
- 运维复杂,故障面大
适用场景:
- 核心金融类、低吞吐、对强一致要求极高的局部链路
方案三:MQ 最终一致性
优点:
- 解耦
- 吞吐高
- 容错和恢复能力更强
缺点:
- 业务状态变复杂
- 需要幂等、补偿、对账机制
- 排查比单体事务更考验工程能力
适用场景:
- 电商、零售、营销、履约等高并发业务
如果你问我在订单库存这种问题上更推荐哪个,我的答案通常是:大多数互联网交易链路,优先考虑 MQ 最终一致性。但前提是团队能把基础设施和可观测性做好。
实战代码(可运行)
下面我用 Python 做一个可运行的简化示例。它不依赖真实 MQ,而是用内存队列模拟消息中间件,重点展示几个核心点:
- 订单创建与 Outbox 落库
- Outbox 事件投递
- 库存消费幂等
- 结果回传
- 订单最终状态收敛
这是一个教学示例,真实生产环境应替换为 MySQL/PostgreSQL + Kafka/RabbitMQ/RocketMQ 等。
代码示例
import sqlite3
import json
import queue
import threading
import time
import uuid
from contextlib import contextmanager
DB_FILE = "demo_order_inventory.db"
order_to_stock_mq = queue.Queue()
stock_to_order_mq = queue.Queue()
def init_db():
conn = sqlite3.connect(DB_FILE)
cur = conn.cursor()
cur.execute("""
CREATE TABLE IF NOT EXISTS orders (
order_id TEXT PRIMARY KEY,
sku_id TEXT NOT NULL,
quantity INTEGER NOT NULL,
status TEXT NOT NULL,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP
)
""")
cur.execute("""
CREATE TABLE IF NOT EXISTS outbox_events (
event_id TEXT PRIMARY KEY,
topic TEXT NOT NULL,
payload TEXT NOT NULL,
status TEXT NOT NULL,
retry_count INTEGER DEFAULT 0,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP
)
""")
cur.execute("""
CREATE TABLE IF NOT EXISTS inventory (
sku_id TEXT PRIMARY KEY,
available INTEGER NOT NULL
)
""")
cur.execute("""
CREATE TABLE IF NOT EXISTS inventory_txn (
event_id TEXT PRIMARY KEY,
order_id TEXT NOT NULL,
sku_id TEXT NOT NULL,
quantity INTEGER NOT NULL,
result TEXT NOT NULL,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP
)
""")
cur.execute("""
CREATE TABLE IF NOT EXISTS order_result_events (
event_id TEXT PRIMARY KEY,
order_id TEXT NOT NULL,
result TEXT NOT NULL,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP
)
""")
cur.execute("INSERT OR IGNORE INTO inventory(sku_id, available) VALUES('sku-1001', 10)")
conn.commit()
conn.close()
@contextmanager
def db_conn():
conn = sqlite3.connect(DB_FILE)
try:
yield conn
conn.commit()
except Exception:
conn.rollback()
raise
finally:
conn.close()
def create_order(order_id, sku_id, quantity):
event_id = str(uuid.uuid4())
payload = {
"event_id": event_id,
"order_id": order_id,
"sku_id": sku_id,
"quantity": quantity
}
with db_conn() as conn:
cur = conn.cursor()
cur.execute(
"INSERT INTO orders(order_id, sku_id, quantity, status) VALUES (?, ?, ?, ?)",
(order_id, sku_id, quantity, "PENDING")
)
cur.execute(
"INSERT INTO outbox_events(event_id, topic, payload, status) VALUES (?, ?, ?, ?)",
(event_id, "order_created", json.dumps(payload), "NEW")
)
print(f"[订单服务] 订单创建成功 order_id={order_id}, status=PENDING")
def outbox_dispatcher():
while True:
with db_conn() as conn:
cur = conn.cursor()
cur.execute("""
SELECT event_id, topic, payload, retry_count
FROM outbox_events
WHERE status = 'NEW'
ORDER BY created_at
LIMIT 10
""")
rows = cur.fetchall()
for event_id, topic, payload, retry_count in rows:
try:
order_to_stock_mq.put(json.loads(payload), timeout=1)
cur.execute(
"UPDATE outbox_events SET status = 'SENT' WHERE event_id = ?",
(event_id,)
)
print(f"[Outbox] 事件已投递 event_id={event_id}")
except Exception:
cur.execute(
"UPDATE outbox_events SET retry_count = ? WHERE event_id = ?",
(retry_count + 1, event_id)
)
time.sleep(1)
def inventory_consumer():
while True:
try:
msg = order_to_stock_mq.get(timeout=1)
except queue.Empty:
continue
event_id = msg["event_id"]
order_id = msg["order_id"]
sku_id = msg["sku_id"]
quantity = msg["quantity"]
with db_conn() as conn:
cur = conn.cursor()
# 幂等校验:同一个 event_id 处理过就直接跳过
cur.execute("SELECT result FROM inventory_txn WHERE event_id = ?", (event_id,))
existed = cur.fetchone()
if existed:
print(f"[库存服务] 重复消息,跳过 event_id={event_id}")
continue
cur.execute("SELECT available FROM inventory WHERE sku_id = ?", (sku_id,))
row = cur.fetchone()
if not row:
result = "FAILED"
else:
available = row[0]
if available >= quantity:
cur.execute(
"UPDATE inventory SET available = available - ? WHERE sku_id = ?",
(quantity, sku_id)
)
result = "SUCCESS"
else:
result = "FAILED"
cur.execute("""
INSERT INTO inventory_txn(event_id, order_id, sku_id, quantity, result)
VALUES (?, ?, ?, ?, ?)
""", (event_id, order_id, sku_id, quantity, result))
stock_to_order_mq.put({
"event_id": str(uuid.uuid4()),
"order_id": order_id,
"source_event_id": event_id,
"result": result
})
print(f"[库存服务] 处理完成 order_id={order_id}, result={result}")
def order_result_consumer():
while True:
try:
msg = stock_to_order_mq.get(timeout=1)
except queue.Empty:
continue
result_event_id = msg["event_id"]
order_id = msg["order_id"]
result = msg["result"]
with db_conn() as conn:
cur = conn.cursor()
# 回执消息也要幂等
cur.execute("SELECT 1 FROM order_result_events WHERE event_id = ?", (result_event_id,))
if cur.fetchone():
continue
cur.execute(
"INSERT INTO order_result_events(event_id, order_id, result) VALUES (?, ?, ?)",
(result_event_id, order_id, result)
)
new_status = "CONFIRMED" if result == "SUCCESS" else "FAILED"
cur.execute(
"UPDATE orders SET status = ? WHERE order_id = ? AND status = 'PENDING'",
(new_status, order_id)
)
print(f"[订单服务] 订单状态更新 order_id={order_id}, status={new_status}")
def print_state():
with db_conn() as conn:
cur = conn.cursor()
cur.execute("SELECT order_id, sku_id, quantity, status FROM orders ORDER BY created_at")
orders = cur.fetchall()
cur.execute("SELECT sku_id, available FROM inventory")
inventory = cur.fetchall()
print("\n===== 当前系统状态 =====")
print("orders:")
for row in orders:
print(row)
print("inventory:")
for row in inventory:
print(row)
print("=======================\n")
if __name__ == "__main__":
init_db()
threads = [
threading.Thread(target=outbox_dispatcher, daemon=True),
threading.Thread(target=inventory_consumer, daemon=True),
threading.Thread(target=order_result_consumer, daemon=True),
]
for t in threads:
t.start()
create_order("order-1001", "sku-1001", 2)
create_order("order-1002", "sku-1001", 5)
create_order("order-1003", "sku-1001", 6)
time.sleep(5)
print_state()
运行方式
python demo.py
预期现象
初始库存是 10:
order-1001扣 2,成功order-1002扣 5,成功order-1003扣 6,失败,因为只剩 3
这段代码虽然是简化版,但把几个关键设计都带出来了:
- 创建订单和写 Outbox 在一个本地事务里
- Outbox 异步投递
- 库存消费按
event_id幂等 - 结果消息回流到订单服务
- 订单从
PENDING收敛到最终状态
时序图:一笔订单如何走完整条链路
sequenceDiagram
participant User as 用户
participant Order as 订单服务
participant DB as 订单DB/Outbox
participant MQ as 消息队列
participant Stock as 库存服务
participant InvDB as 库存DB
User->>Order: 提交订单
Order->>DB: 本地事务写订单(PENDING)+Outbox事件
DB-->>Order: 提交成功
Order-->>User: 下单受理成功
Order->>MQ: Outbox异步投递订单事件
MQ->>Stock: 投递扣库存消息
Stock->>InvDB: 幂等检查并扣减库存
InvDB-->>Stock: 成功/失败
Stock->>MQ: 发送库存处理结果
MQ->>Order: 投递结果消息
Order->>DB: 更新订单状态(CONFIRMED/FAILED)
常见坑与排查
这一部分非常关键。很多团队不是不会画架构图,而是上线后扛不住异常流量和边界故障。下面这些坑,我基本都见过,有些我自己也踩过。
1. 订单落库成功,但消息没发出去
现象
- 订单状态一直是
PENDING - 库存服务完全没收到消息
- 业务方反馈“订单卡住了”
根因
通常是没有 Outbox,或者 Outbox 扫描任务停了。
排查路径
- 查订单表是否存在该订单
- 查 Outbox 表是否存在对应事件
- 看事件状态:
NEW:说明没发出去SENT:说明发过了,去 MQ 查消费情况
- 检查 Outbox 投递线程/任务是否异常退出
- 看 MQ 连接、权限、broker 状态
建议
- Outbox 扫描任务必须有监控
NEW状态积压要报警- 单个事件重试次数超过阈值要人工介入
2. 消息重复消费,库存被扣多次
现象
- 同一订单只下了一次
- 库存却减少了两次甚至更多
根因
消费端没有做幂等,或者幂等只做在内存里,服务重启后失效。
正确做法
- 以业务唯一键或事件唯一键做幂等
- 幂等记录必须持久化到数据库或可靠存储
- 扣减库存和写幂等记录应在同一事务里完成
SQL 思路示例
CREATE TABLE inventory_txn (
event_id VARCHAR(64) PRIMARY KEY,
order_id VARCHAR(64) NOT NULL,
sku_id VARCHAR(64) NOT NULL,
quantity INT NOT NULL,
result VARCHAR(16) NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
如果 event_id 已存在,说明处理过了。
3. 库存扣减成功,但订单状态没更新
现象
- 库存已经少了
- 订单还停留在
PENDING
根因
一般是结果回传链路出了问题:
- 库存服务没发结果消息
- 结果消息丢失
- 订单服务消费失败
- 订单状态更新 SQL 条件写错
排查路径
- 查库存事务表,确认是否扣减成功
- 查结果消息是否已投递
- 查订单结果消费日志
- 查订单状态更新记录
- 看是否存在死信队列积压
补救思路
- 做对账任务:库存成功但订单未确认的记录自动修复
- 对
PENDING超时订单定期巡检 - 允许人工补单/回滚的运营入口
4. 超卖问题
这类问题特别常见,尤其在秒杀场景。
错误写法
先查库存,再更新库存:
SELECT available FROM inventory WHERE sku_id = 'sku-1001';
UPDATE inventory SET available = available - 1 WHERE sku_id = 'sku-1001';
高并发下两个线程可能同时读到同一个库存值。
更稳妥的写法
用条件更新保证原子扣减:
UPDATE inventory
SET available = available - 1
WHERE sku_id = 'sku-1001'
AND available >= 1;
然后检查受影响行数:
- 1:扣减成功
- 0:库存不足
这比“先查再扣”稳得多。
5. 消息顺序依赖导致状态错乱
比如你有:
- 订单创建消息
- 订单取消消息
如果取消消息先到,创建消息后到,消费者就可能把一个已取消订单又改回处理中。
建议
- 尽量避免强依赖全局顺序
- 使用状态机约束,非法状态转换直接拒绝
- 同一业务实体尽量路由到同一分区/队列
例如:
PENDING -> CONFIRMED合法CANCELLED -> CONFIRMED不合法,应拒绝
6. 把“重试”当成“补偿”
重试和补偿不是一回事。
- 重试:同一动作再执行一次,希望成功
- 补偿:执行反向动作,把系统拉回可接受状态
比如库存扣减成功但订单创建失败:
- 重试订单创建:叫重试
- 把库存加回去:叫补偿
很多系统的问题在于,什么都靠重试,最后重试把错误放大了。
安全/性能最佳实践
最终一致性不是只讲业务逻辑,安全和性能也必须一起考虑。
1. 消息体不要裸奔,做签名或最少字段校验
如果 MQ 是跨网络、跨集群,建议至少做到:
- 消息体字段白名单校验
- 关键字段非空校验
- 业务签名或 HMAC 校验
- 敏感信息脱敏,不要把用户隐私直接塞进消息
一个简单的消息结构可以像这样:
{
"event_id": "a1b2c3",
"event_type": "ORDER_CREATED",
"timestamp": 1710000000,
"order_id": "order-1001",
"sku_id": "sku-1001",
"quantity": 2,
"sign": "xxxxxx"
}
2. 队列长度、消费延迟、死信数必须监控
我比较建议最少监控这几类指标:
- Outbox 待发送数量
- MQ topic/queue 积压量
- 消费成功率、失败率、重试次数
- 死信队列数量
- 订单
PENDING超时数量 - 库存扣减成功但订单未确认数量
如果这些指标你看不到,系统出问题时基本只能靠猜。
3. 幂等表要控制膨胀
幂等记录如果一直不清理,表会越来越大,影响查询性能。
建议:
- 按天/按月分表
- 给
event_id建唯一索引 - 设置归档策略
- 保留期根据业务回放窗口决定,比如 7 天、30 天、90 天
4. 批量拉取 Outbox,但不要无限扫表
Outbox 调度器常见优化方式:
- 按状态索引拉取
NEW事件 - 每次限制批量条数
- 多实例并发时用分片或“抢占式更新”避免重复发送
- 已发送记录定期归档
例如:
SELECT event_id, topic, payload
FROM outbox_events
WHERE status = 'NEW'
ORDER BY created_at
LIMIT 100;
一定要配索引,否则数据量一大,扫表会拖垮数据库。
5. 对账机制不是可选项
只要是最终一致性,就要接受一个现实:理论上总会有极少数记录因为各种边界故障没及时收敛。
所以必须有对账任务,例如:
- 订单
PENDING超过 10 分钟,检查库存事务结果 - 库存已扣减但订单未确认,自动补状态
- 订单已取消但库存未释放,自动发起释放流程
这其实是“系统自愈能力”的一部分,不只是运维脚本。
6. 容量估算别忽略峰值
在架构设计里,我建议至少算三件事:
订单峰值消息量
如果大促峰值是 5000 单/s,且每单至少产生两条消息:
- 订单创建事件
- 库存处理结果事件
那么基础消息吞吐至少是:
5000 * 2 = 10000 msg/s
如果加上重试、取消、回滚、补偿消息,实际预留最好乘以 2~3 倍。
库存热点 SKU 压力
热点商品会导致单个 SKU 成为写热点。你需要考虑:
- 分片库存
- 预扣库存
- Redis 原子扣减 + DB 异步落账
- 限流与排队
PENDING 中间态容量
如果库存服务平均处理延迟是 200ms,峰值 5000 单/s,那么理论上的在途订单量约为:
5000 * 0.2 = 1000
如果发生消息堆积,PENDING 会迅速放大,所以订单表和查询接口都要考虑中间态压力。
一个更稳的落地建议
如果你准备在真实项目里做,我建议按下面的最小闭环来落地:
-
订单服务
- 本地事务写订单 + Outbox
- 订单初始状态
PENDING
-
消息投递层
- Outbox 扫描补发
- 失败重试
- 超阈值告警
-
库存服务
- 原子扣减库存
- 幂等表防重复
- 处理结果写本地事务
-
结果回流
- 库存结果事件回传订单服务
- 订单状态机控制合法迁移
-
兜底机制
- 死信队列
- 对账任务
- 人工补偿入口
这五步做全了,系统才算真正具备“最终一致性”的工程化能力。只做前两步,基本只是“用了 MQ”;做完整套,才算“设计了可靠异步架构”。
总结
基于消息队列实现订单与库存的最终一致性,本质上是在做三件事:
- 把跨服务事务拆成可恢复的本地事务
- 用消息驱动状态收敛,而不是强求一步到位
- 用幂等、补偿、对账去对抗分布式环境的不确定性
如果你只记住几个最重要的结论,我建议记这几条:
- 不要直接“写库后发消息”,优先用 Outbox
- 消费端一定要 幂等
- 库存扣减要用 原子条件更新
- 订单状态必须设计成 可恢复状态机
- 对账和补偿 不是锦上添花,而是标配
- 最终一致性适合高并发业务,但前提是你能接受短暂中间态
最后说一句很实在的话:
最终一致性不是降低要求,而是换一种更适合分布式系统的达成方式。
它不像本地事务那样“看上去简单”,但只要设计得当,面对真实世界的高并发、抖动、失败和重试,它反而更可靠、更有韧性。