背景与问题
只要系统一旦进入分布式,重复请求、重复投递、重复执行几乎就不是“会不会发生”的问题,而是“什么时候发生”。
最常见的几个场景:
- 消息队列消费者处理成功了,但 ACK 丢了,消息被重复投递
- 用户点击两次“提交订单”,网关层重试了两次
- 下游接口超时,调用方发起重试,但第一次其实已经成功
- 定时任务因为主备切换,被重复调度
- 服务重启后,某些中间状态未及时持久化,导致补偿逻辑重复执行
这些问题背后,本质上都指向同一个关键词:幂等性。
很多团队对幂等性的理解停留在“加个唯一索引”或者“查一下数据库有没有处理过”。这不算错,但远远不够。因为在真实系统里,幂等性不是一个点状技巧,而是一整套设计:业务语义定义、唯一标识生成、状态机约束、存储介质选择、并发控制、失败恢复和监控排查。
我自己踩过一个挺典型的坑:消费者收到支付成功消息后更新订单状态,同时给用户发券。订单更新靠数据库状态控制住了,但发券接口没做幂等,结果重复消费时用户券包里多了一张券。也就是说,局部幂等不等于链路幂等。
所以这篇文章不只讲概念,而是从两个最常见场景入手:
- 消息消费幂等
- 接口重试幂等
并把它们串成一套可落地的架构实践。
核心原理
什么是幂等性
幂等性的定义很简单:
对同一个操作执行一次和执行多次,产生的业务结果应当一致。
这里有两个关键词容易被忽略:
- 同一个操作:必须能识别“这是同一次业务意图”
- 业务结果一致:不是要求每次技术返回值完全一样,而是最终业务状态一致
比如:
- “设置订单状态为已支付”通常可以做成幂等
- “余额增加 100 元”天然不是幂等,除非你把它改写成“处理交易号 T123 对账户入账一次”
所以幂等不是“代码风格”,而是业务语义改写能力。
幂等性的三层实现思路
我通常把幂等设计拆成三层:
1. 业务标识层:谁代表“同一次操作”
常见标识:
- 客户端生成请求号
requestId - 支付流水号
paymentId - 订单号
orderNo - 消息唯一 ID
messageId - 业务去重键,如
userId + couponBatchId + sourceBizId
如果这一层没定义清楚,后面技术手段都会变成“猜测重复”。
2. 状态控制层:重复执行时如何保持结果一致
常用方式:
- 唯一索引
- 去重表
- 乐观锁 / 状态机流转
- Redis
SETNX - 条件更新
update ... where status = 'INIT' - Outbox / Inbox 模式
3. 结果返回层:已经处理过时怎么响应
这层经常被忽略。比如接口重试时,第二次请求命中幂等记录后:
- 直接返回“已处理”
- 返回第一次处理结果
- 返回处理中状态,让调用方稍后查询
如果不定义清楚,前端和调用方会很难接入。
为什么“至少一次投递”一定要求幂等
绝大多数消息队列在工程实践里,可靠消费往往更容易做到的是 At-Least-Once(至少一次),而不是 Exactly-Once(精确一次)。
原因很现实:
- 消费成功但 ACK 失败
- Broker 重试投递
- 消费端处理超时被认为失败
- 网络分区导致状态不一致
所以从架构角度,不要把“消息只会来一次”当作前提条件。正确思路是:
- 接受消息可能重复
- 通过消费端幂等让重复消费无害化
下面这张图可以概括这个链路。
flowchart TD
A[生产者发送消息] --> B[MQ 持久化]
B --> C[消费者拉取消息]
C --> D[执行业务逻辑]
D --> E[ACK 成功]
D --> F[ACK 丢失/超时]
F --> G[MQ 重复投递]
G --> C
C --> H[幂等校验]
H --> I[已处理则跳过]
H --> J[未处理则继续执行]
接口重试中的幂等,不只是“防重复提交”
接口重试通常有三类来源:
- 用户重复点击
- 网关 / SDK 自动重试
- 服务调用方因为超时重试
这里最大的问题是:调用方超时,不代表服务端没成功。
例如:
- 服务端已经落库成功
- 但响应包在网络中丢了
- 调用方超时后再次发起请求
如果接口没有幂等保护,就会重复创建资源、重复扣款、重复发券。
所以接口幂等的关键,不是“挡住脏请求”,而是让同一业务请求在不可靠网络下仍然只生效一次。
方案对比与取舍分析
不同场景的幂等方案,侧重点不一样。下面是一个简化对比。
| 方案 | 适用场景 | 优点 | 缺点 |
|---|---|---|---|
| 数据库唯一索引 | 创建型操作、去重键明确 | 简单可靠 | 依赖 DB,热点冲突明显 |
| 去重表 | 消息消费、接口请求去重 | 语义清晰,便于排查 | 需要维护记录清理 |
Redis SETNX | 高并发短时去重 | 性能高 | 需处理过期、持久化和误删 |
| 状态机条件更新 | 状态流转类业务 | 与业务强绑定,最稳 | 设计复杂度较高 |
| 请求号 + 结果缓存 | 接口幂等返回结果 | 调用体验好 | 结果存储和过期策略要设计 |
| Inbox/Outbox | 跨服务一致性 | 链路完整 | 实现成本高 |
我的建议很直接:
- 核心资金、库存、订单:优先数据库状态机 + 唯一约束
- 消息消费:优先去重表 / Inbox
- 高频弱一致请求:可以 Redis 辅助,但不要只靠 Redis
- 需要返回首次结果的接口:幂等记录里保存响应摘要
消息消费幂等设计
一个稳妥的消费模型
消息消费场景里,一个比较常见的落地套路是:
- 提取消息唯一键
message_id - 插入消费记录表,利用唯一约束去重
- 执行业务逻辑
- 更新消费状态为成功
- 如果执行异常,不 ACK,让消息重试
- 如果发现已处理,则直接 ACK 并跳过
消费记录表可以设计成这样:
CREATE TABLE message_consume_record (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
message_id VARCHAR(64) NOT NULL,
consumer_group_name VARCHAR(64) NOT NULL,
status VARCHAR(16) NOT NULL,
payload TEXT,
result_text TEXT,
created_at DATETIME NOT NULL,
updated_at DATETIME NOT NULL,
UNIQUE KEY uk_message_consumer (message_id, consumer_group_name)
);
其中:
message_id + consumer_group_name联合唯一,防止同一消费者组重复处理status可以取值:PROCESSING、SUCCESS、FAILED
状态流转建议
stateDiagram-v2
[*] --> PROCESSING
PROCESSING --> SUCCESS: 业务执行成功
PROCESSING --> FAILED: 业务执行失败
FAILED --> PROCESSING: MQ 重试/人工补偿
SUCCESS --> SUCCESS: 重复消息直接跳过
这里有个细节很重要:
不要只记录“处理过没”,最好记录“处理中/成功/失败”状态。
否则遇到消费者执行到一半宕机,就会出现两难:
- 你认为“处理过了”,后续不再重试,可能丢业务
- 你认为“没处理过”,后续重复执行,可能造成副作用
接口重试幂等设计
推荐链路
一个比较通用的接口幂等链路如下:
- 调用方生成
Idempotency-Key - 服务端先检查这个 key 是否存在
- 不存在则抢占处理权并记录为
PROCESSING - 执行业务逻辑
- 成功后保存最终结果并更新为
SUCCESS - 重复请求到来时:
- 若状态
SUCCESS,直接返回第一次结果 - 若状态
PROCESSING,返回“处理中”或引导查询 - 若状态
FAILED,按策略决定是否允许重试
- 若状态
sequenceDiagram
participant C as Client
participant S as Service
participant R as Idempotency Store
participant DB as Business DB
C->>S: POST /orders (Idempotency-Key: k1)
S->>R: 写入 k1=PROCESSING
R-->>S: 成功
S->>DB: 创建订单
DB-->>S: 成功
S->>R: 更新 k1=SUCCESS + result
S-->>C: 返回订单结果
C->>S: 重试相同请求 (k1)
S->>R: 查询 k1
R-->>S: SUCCESS + result
S-->>C: 直接返回首次结果
Key 的设计原则
幂等 key 最好满足:
- 由调用方生成
- 同一业务意图固定不变
- 不同业务请求绝不能复用
- 尽量不依赖不稳定字段
例如创建订单时,可以这样生成:
userId + clientOrderToken- 或客户端直接生成 UUID 作为
Idempotency-Key
但不要这样:
- 仅用时间戳
- 仅用用户 ID
- 用会变化的请求体签名但没做字段归一化
实战代码(可运行)
下面用 Python + SQLite 演示两个核心场景:
- 消息消费幂等
- 接口重试幂等
示例为了便于本地运行,使用 SQLite。生产环境里,你可以替换为 MySQL/PostgreSQL + Redis。
1)消息消费幂等示例
import sqlite3
import json
import time
from datetime import datetime
DB_FILE = "idem_demo.db"
def now():
return datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S")
def init_db():
conn = sqlite3.connect(DB_FILE)
cur = conn.cursor()
cur.execute("""
CREATE TABLE IF NOT EXISTS message_consume_record (
id INTEGER PRIMARY KEY AUTOINCREMENT,
message_id TEXT NOT NULL,
consumer_group_name TEXT NOT NULL,
status TEXT NOT NULL,
payload TEXT,
result_text TEXT,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL,
UNIQUE(message_id, consumer_group_name)
)
""")
cur.execute("""
CREATE TABLE IF NOT EXISTS coupon_account (
user_id INTEGER NOT NULL,
coupon_code TEXT NOT NULL,
created_at TEXT NOT NULL
)
""")
conn.commit()
conn.close()
def grant_coupon(conn, user_id, coupon_code):
cur = conn.cursor()
cur.execute(
"INSERT INTO coupon_account(user_id, coupon_code, created_at) VALUES (?, ?, ?)",
(user_id, coupon_code, now())
)
def consume_message(message_id, consumer_group, payload):
conn = sqlite3.connect(DB_FILE)
cur = conn.cursor()
try:
cur.execute("""
INSERT INTO message_consume_record(
message_id, consumer_group_name, status, payload, result_text, created_at, updated_at
) VALUES (?, ?, ?, ?, ?, ?, ?)
""", (
message_id,
consumer_group,
"PROCESSING",
json.dumps(payload, ensure_ascii=False),
"",
now(),
now()
))
conn.commit()
except sqlite3.IntegrityError:
cur.execute("""
SELECT status, result_text FROM message_consume_record
WHERE message_id = ? AND consumer_group_name = ?
""", (message_id, consumer_group))
row = cur.fetchone()
conn.close()
print(f"[SKIP] duplicate message: {message_id}, status={row[0]}, result={row[1]}")
return
try:
user_id = payload["user_id"]
coupon_code = payload["coupon_code"]
grant_coupon(conn, user_id, coupon_code)
cur.execute("""
UPDATE message_consume_record
SET status = ?, result_text = ?, updated_at = ?
WHERE message_id = ? AND consumer_group_name = ?
""", ("SUCCESS", "coupon granted", now(), message_id, consumer_group))
conn.commit()
print(f"[OK] message consumed: {message_id}")
except Exception as e:
cur.execute("""
UPDATE message_consume_record
SET status = ?, result_text = ?, updated_at = ?
WHERE message_id = ? AND consumer_group_name = ?
""", ("FAILED", str(e), now(), message_id, consumer_group))
conn.commit()
print(f"[ERR] message failed: {message_id}, err={e}")
raise
finally:
conn.close()
if __name__ == "__main__":
init_db()
msg = {
"user_id": 1001,
"coupon_code": "NEW_USER_50"
}
consume_message("msg-001", "coupon-service", msg)
consume_message("msg-001", "coupon-service", msg) # 重复消费
运行效果
第一次执行会真正发券;第二次执行会因为唯一约束命中而跳过。
这个例子虽然简单,但核心思想已经完整:
- 先登记,再执行业务
- 唯一约束挡住重复
- 状态记录便于恢复和排查
2)接口重试幂等示例
import sqlite3
import json
from datetime import datetime
DB_FILE = "idem_demo.db"
def now():
return datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S")
def init_api_db():
conn = sqlite3.connect(DB_FILE)
cur = conn.cursor()
cur.execute("""
CREATE TABLE IF NOT EXISTS api_idempotency_record (
id INTEGER PRIMARY KEY AUTOINCREMENT,
idem_key TEXT NOT NULL UNIQUE,
status TEXT NOT NULL,
request_body TEXT NOT NULL,
response_body TEXT,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL
)
""")
cur.execute("""
CREATE TABLE IF NOT EXISTS biz_order (
id INTEGER PRIMARY KEY AUTOINCREMENT,
order_no TEXT NOT NULL UNIQUE,
user_id INTEGER NOT NULL,
amount INTEGER NOT NULL,
created_at TEXT NOT NULL
)
""")
conn.commit()
conn.close()
def create_order(idem_key, user_id, amount):
conn = sqlite3.connect(DB_FILE)
cur = conn.cursor()
request_body = json.dumps({"user_id": user_id, "amount": amount}, ensure_ascii=False)
try:
cur.execute("""
INSERT INTO api_idempotency_record(
idem_key, status, request_body, response_body, created_at, updated_at
) VALUES (?, ?, ?, ?, ?, ?)
""", (idem_key, "PROCESSING", request_body, "", now(), now()))
conn.commit()
except sqlite3.IntegrityError:
cur.execute("""
SELECT status, response_body, request_body
FROM api_idempotency_record WHERE idem_key = ?
""", (idem_key,))
row = cur.fetchone()
conn.close()
status, response_body, old_request_body = row
if old_request_body != request_body:
return {"code": 409, "msg": "幂等键已被其他请求复用"}
if status == "SUCCESS":
return {"code": 200, "msg": "ok(duplicate)", "data": json.loads(response_body)}
return {"code": 202, "msg": "处理中,请稍后重试"}
try:
order_no = f"ORD-{int(datetime.utcnow().timestamp())}-{user_id}"
cur.execute("""
INSERT INTO biz_order(order_no, user_id, amount, created_at)
VALUES (?, ?, ?, ?)
""", (order_no, user_id, amount, now()))
response = {"order_no": order_no, "user_id": user_id, "amount": amount}
cur.execute("""
UPDATE api_idempotency_record
SET status = ?, response_body = ?, updated_at = ?
WHERE idem_key = ?
""", ("SUCCESS", json.dumps(response, ensure_ascii=False), now(), idem_key))
conn.commit()
return {"code": 200, "msg": "ok", "data": response}
except Exception as e:
cur.execute("""
UPDATE api_idempotency_record
SET status = ?, response_body = ?, updated_at = ?
WHERE idem_key = ?
""", ("FAILED", json.dumps({"error": str(e)}, ensure_ascii=False), now(), idem_key))
conn.commit()
return {"code": 500, "msg": str(e)}
finally:
conn.close()
if __name__ == "__main__":
init_api_db()
print(create_order("req-abc-001", 1001, 299))
print(create_order("req-abc-001", 1001, 299)) # 相同幂等键重试
print(create_order("req-abc-001", 1001, 399)) # 相同幂等键但不同请求体
这个示例体现了三个关键点
- 幂等键唯一
- 同一幂等键必须绑定同一请求体
- 重复请求返回首次结果,而不是重复执行业务
这第三点非常重要。很多实现虽然防止了重复落库,但第二次请求只返回一句“重复请求”,对调用方并不友好。
更实用的做法是:存下首次处理结果,重试时直接回放。
容量估算与存储选择
幂等记录不是免费的,特别是在高并发系统里,要提前估算量级。
一个简单估算方法
假设:
- 接口峰值 QPS:2000
- 其中 20% 需要幂等保护
- 幂等记录保留 3 天
那么记录量大约是:
2000 * 20% * 86400 * 3 = 103,680,000
也就是 1 亿级别。
这个量级下,几个建议很实际:
- 不要把长周期幂等数据全部压在单表
- 按日期分表或冷热分层
- 只保留必要字段,避免大对象响应全量存储
- 对成功记录设置过期清理策略
- 对关键业务保留审计摘要,不一定保留完整请求包
Redis 还是数据库?
很多人第一反应是 Redis。我的经验是:
- Redis 适合做前置快速拦截
- 数据库适合做最终可信记录
如果只是 SETNX 一下,业务刚执行完 Redis key 过期了,或者 Redis 故障丢数据,重复执行还是会发生。所以更稳妥的组合通常是:
- Redis:抗高并发、快速判断
- DB:落最终事实、支持审计与恢复
常见坑与排查
1. 只在应用内存里做去重
这在单机场景可能有点用,但在分布式部署下基本不可靠:
- 实例之间内存不共享
- 重启后数据丢失
- 扩缩容时无法保证一致
排查方式:
- 检查重复请求是否命中不同实例
- 核对幂等记录是否落到共享存储
- 查看重启时段是否与重复问题吻合
2. 业务成功了,但幂等状态没更新
这是很经典的“半成功”问题:
- 订单已创建
- 但幂等记录仍是
PROCESSING - 调用方重试后系统误以为还没完成
典型原因:
- 业务表和幂等表不在同一个事务
- 先执行业务再写幂等记录
- 更新幂等结果时发生超时或锁等待
建议:
- 尽量把业务结果写入与幂等状态更新放在同一事务
- 至少保证“先写幂等占位,再执行业务”
- 对长时间
PROCESSING的记录做巡检和补偿
3. 同一个幂等键被不同请求复用
这类问题经常来自客户端实现不规范,比如把用户 ID 当幂等键,导致不同订单共享同一个 key。
表现:
- 用户第二次下单总是返回第一次订单结果
- 或直接报“重复请求”
排查方式:
- 对比幂等记录中的请求体摘要
- 记录
idem_key与请求 hash 的映射 - 对不一致请求直接返回 409
4. 只保护主操作,没保护副作用
这是我前面提到的“发券坑”。
例如:
- 订单表更新是幂等的
- 但通知、发券、积分、库存冻结没有统一幂等控制
那么主流程看起来没问题,副作用却可能被执行多次。
建议:
- 把链路拆成多个可幂等子步骤
- 每个对外副作用都带唯一业务号
- 不要假设下游一定支持精确一次
5. 把“查询接口”误当成天然安全
查询通常是幂等的,但如果查询里埋了副作用,比如:
- 查询时顺手修复状态
- 查询时刷新某些标记
- 查询时触发缓存回填并写库
那它就不再是纯幂等读操作了。
排查方向:
- 审核查询链路里是否有写操作
- 核查 ORM 自动 flush、懒加载触发更新等隐式行为
安全/性能最佳实践
安全实践
1. 幂等键要有边界,不要无限复用
幂等键如果长期有效,会带来两个问题:
- 存储膨胀
- 被恶意复用导致业务阻塞
建议:
- 为幂等记录设置 TTL 或归档周期
- 对外暴露的 key 要有长度、字符集校验
- 对热点 key 做访问频控
2. 不要把敏感数据直接存入幂等记录
有些团队为了“方便排查”,把完整请求体、完整响应体一股脑存库,里面可能包含:
- 身份证号
- 手机号
- 地址
- 支付信息
建议:
- 只保存必要字段和摘要
- 敏感字段脱敏或加密
- 日志与幂等表权限隔离
3. 防重不等于防刷
幂等性只解决“同一次请求重复执行”的问题,不等于:
- 防攻击
- 防恶意刷接口
- 防撞库
所以仍然需要配合:
- 限流
- 鉴权
- 签名校验
- 风控规则
性能实践
1. 唯一索引字段尽量短
如果幂等键过长,唯一索引会放大写入成本。
建议:
- 原始请求号保留
- 同时生成固定长度摘要作为索引键,例如 SHA-256 截断
2. 热点场景先本地/缓存过滤,再落库
在高并发秒杀或短时突发流量里,可以采用:
- 网关层请求号校验
- Redis
SETNX - 业务库最终唯一约束兜底
也就是说,前面挡一层性能,后面守一层正确性。
3. 清理策略要提前设计
幂等表一旦不清理,很容易变成大表,带来:
- 索引膨胀
- 查询变慢
- 归档困难
建议:
- 成功记录保留短周期
- 失败记录保留更久,便于排查
- 定期归档冷数据
一套更完整的架构建议
如果你的系统已经有一定复杂度,我建议把幂等设计纳入统一治理,而不是每个服务各写一套。
可以考虑抽象出一个统一组件,至少包含:
- 幂等键校验
- 状态存储
- 请求体摘要比对
- 首次结果回放
- 超时处理中记录扫描
- 监控指标与告警
一个简化的架构视图如下:
flowchart LR
A[客户端/调用方] --> B[网关]
B --> C[业务服务]
C --> D[幂等组件]
D --> E[(Redis)]
D --> F[(MySQL)]
C --> G[(业务库)]
C --> H[MQ]
H --> I[消费者]
I --> J[消费去重表]
可以重点监控这些指标:
- 幂等命中率
- 重复消息比例
PROCESSING超时数量- 同 key 不同请求体冲突数
- 唯一索引冲突频次
- 重试成功率
这些指标往往比单纯看错误日志更有用。因为很多幂等问题,不会直接报错,而是默默地产生业务重复。
总结
幂等性这件事,说简单也简单,说复杂也复杂。
简单在于核心原则很少:
- 定义“同一次业务操作”的唯一标识
- 用可靠存储记录处理状态
- 让重复请求返回一致结果,而不是重复执行副作用
复杂在于,它不是只加一行去重代码就结束了,而是要覆盖:
- 消息重复投递
- 接口超时重试
- 并发竞争
- 半成功状态
- 下游副作用重复
- 数据清理与监控治理
如果你准备在项目里落地,我建议按下面这个顺序做,最不容易出错:
- 先梳理业务语义:哪些操作必须幂等
- 给每类操作定义唯一业务键
- 优先用数据库唯一约束或状态机兜底
- 高并发场景再叠加 Redis 做前置拦截
- 保存首次处理结果,优化重试体验
- 补上超时处理中巡检和告警
- 逐步把副作用链路也纳入幂等设计
最后给一个边界判断:
如果一个操作本质上是“累计型”的,比如“积分 +10”、“余额 +100”,那它天然不是幂等操作。要想实现幂等,必须把它改造成“某笔交易只入账一次”的模型。不要试图拿技术手段硬抹平错误的业务语义。
这通常也是分布式幂等设计里,最值得花时间想清楚的一步。