跳转到内容
123xiao | 无名键客

《分布式架构中基于消息队列实现最终一致性的实战设计与排障指南》

字数: 0 阅读时长: 1 分钟

背景与问题

在单体应用里,下单 -> 扣库存 -> 扣余额 -> 发优惠券 这类流程,往往一个本地事务就能搞定。但一旦系统拆成多个服务,事情立刻变复杂:

  • 订单服务有自己的数据库
  • 库存服务有自己的数据库
  • 账户服务可能还是另一套存储
  • 通知、积分、风控又通过异步链路接入

这时候如果还想靠传统分布式事务“一把梭”,通常会遇到这些现实问题:

  1. 性能差:强一致协议会放大跨服务调用时延。
  2. 可用性受影响:某个参与方卡住,整个链路都可能阻塞。
  3. 工程复杂度高:不是所有中间件、数据库都配合得好。
  4. 业务未必需要强一致:很多场景允许“短时间不一致,但最终要对”。

所以,真正落地时,很多团队都会选择一个更现实的方案:基于消息队列实现最终一致性

简单说,就是把“本地业务成功”与“后续跨服务动作”拆开:

  • 核心服务先完成自己的本地事务
  • 再通过消息驱动其他服务异步处理
  • 借助重试、幂等、补偿,把结果拉回到一致状态

这套方案不是银弹,但足够实用。我自己在排查这类问题时,最常见的并不是“不会发消息”,而是:

  • 消息到底有没有可靠送达?
  • 消费失败为什么一直重试?
  • 为什么数据库成功了但消息丢了?
  • 为什么看起来“重复扣款”了?
  • 为什么 backlog 一高,整个系统像雪崩一样?

下面就按“设计思路 -> 可运行示例 -> 排障路径 -> 最佳实践”的顺序,把这件事完整走一遍。


方案适用边界与取舍分析

先给一个结论:最终一致性适合核心链路能接受短暂延迟、且业务可补偿的场景

适合的场景

  • 下单后异步扣减积分
  • 支付成功后发券、发通知、更新画像
  • 订单完成后同步数据到搜索、报表、推荐系统
  • 多系统间状态传播

不太适合的场景

  • 实时性极强且不能延迟的资金冻结确认
  • 单次操作必须全局原子成功/失败的核心清算链路
  • 无法设计幂等与补偿的业务

常见方案对比

方案一致性性能工程复杂度适用场景
2PC / XA强一致少量核心场景
TCC强一致偏业务化很高资金、预留资源
本地消息表 + MQ最终一致绝大多数业务系统
事务消息最终一致MQ 支持事务能力时
定时任务对账补偿最终一致补漏兜底

本文重点讲最常见、最落地的:本地消息表 + MQ + 消费幂等 + 重试补偿


核心原理

要实现最终一致性,关键不是“发一条消息”这么简单,而是围绕以下四件事构建闭环:

  1. 业务数据和待发送消息要一起落库
  2. 消息投递失败可以补发
  3. 消费者重复收到消息也不能出错
  4. 异常链路要能观测、止血、补偿

一个可靠的基本流程

以“订单创建后通知库存服务扣减库存”为例:

  1. 订单服务开启本地事务
  2. 写入订单表
  3. 写入消息表(状态为 NEW
  4. 本地事务提交
  5. 后台投递器扫描消息表,把消息发到 MQ
  6. 投递成功后把消息状态更新为 SENT
  7. 库存服务消费消息,执行扣库存
  8. 扣库存成功后记录消费幂等标记,ACK 消息
  9. 若失败则重试;超过阈值进入死信队列或人工处理

这套设计的核心价值在于:

  • 避免“业务成功但消息没发出去”
  • 允许“消息发重复”,但通过幂等兜住
  • 允许短时间失败,通过重试和补偿恢复

整体架构图

flowchart LR
    A[订单服务] --> B[(订单库)]
    A --> C[(本地消息表)]
    C --> D[消息投递器]
    D --> E[MQ]
    E --> F[库存服务消费者]
    F --> G[(库存库)]
    F --> H[(消费幂等表)]
    C --> I[补偿任务]
    I --> D

时序图:正常链路

sequenceDiagram
    participant Client as 用户
    participant Order as 订单服务
    participant DB as 订单DB
    participant Outbox as 消息表
    participant Relay as 投递器
    participant MQ as 消息队列
    participant Stock as 库存服务

    Client->>Order: 创建订单
    Order->>DB: 写订单
    Order->>Outbox: 写待发送消息
    Order-->>Client: 下单成功
    Relay->>Outbox: 扫描 NEW 消息
    Relay->>MQ: 投递消息
    Relay->>Outbox: 更新为 SENT
    MQ->>Stock: 投递扣库存消息
    Stock->>Stock: 幂等校验
    Stock->>Stock: 扣减库存
    Stock-->>MQ: ACK

状态流转建议

消息状态不要只设计成“已发/未发”,最少建议分成:

  • NEW:事务内已落库,待投递
  • SENDING:投递中,防并发抢占
  • SENT:已成功发往 MQ
  • CONSUMED:可选,若需要端到端跟踪
  • FAILED:重试超过阈值
  • DEAD:人工介入或死信归档
stateDiagram-v2
    [*] --> NEW
    NEW --> SENDING
    SENDING --> SENT
    SENDING --> NEW: 投递失败,待重试
    SENT --> CONSUMED
    SENT --> FAILED: 多次消费异常
    FAILED --> DEAD
    FAILED --> NEW: 补偿重发

实战代码(可运行)

下面我用 Python 模拟一个“订单服务 + 本地消息表 + MQ + 消费者”的最小可运行示例。
为了方便直接跑起来,这里不用真实 RabbitMQ / Kafka,而是用 SQLite + 内存队列来演示关键机制。

你可以把它理解成原理验证版本:数据库事务、消息投递、幂等消费、失败重试,这几个点都在。

代码说明

这个示例包含:

  • orders:订单表
  • outbox_messages:本地消息表
  • consumer_log:消费幂等表
  • create_order():事务内创建订单并写消息
  • relay_once():扫描本地消息表并投递到 MQ
  • consume_once():消费者幂等消费
  • 一个故意制造的重复投递场景

完整示例

import sqlite3
import json
import queue
import time
from contextlib import contextmanager

DB_FILE = "demo_consistency.db"
mq = queue.Queue()

@contextmanager
def get_conn():
    conn = sqlite3.connect(DB_FILE)
    try:
        yield conn
    finally:
        conn.close()

def init_db():
    with get_conn() as conn:
        cur = conn.cursor()
        cur.execute("""
        CREATE TABLE IF NOT EXISTS orders (
            order_id TEXT PRIMARY KEY,
            user_id TEXT NOT NULL,
            amount INTEGER NOT NULL,
            status TEXT NOT NULL
        )
        """)
        cur.execute("""
        CREATE TABLE IF NOT EXISTS outbox_messages (
            msg_id TEXT PRIMARY KEY,
            biz_key TEXT NOT NULL,
            topic TEXT NOT NULL,
            body TEXT NOT NULL,
            status TEXT NOT NULL,
            retry_count INTEGER NOT NULL DEFAULT 0,
            created_at INTEGER NOT NULL,
            updated_at INTEGER NOT NULL
        )
        """)
        cur.execute("""
        CREATE TABLE IF NOT EXISTS stock (
            sku_id TEXT PRIMARY KEY,
            available INTEGER NOT NULL
        )
        """)
        cur.execute("""
        CREATE TABLE IF NOT EXISTS consumer_log (
            msg_id TEXT PRIMARY KEY,
            consumer TEXT NOT NULL,
            consumed_at INTEGER NOT NULL
        )
        """)
        cur.execute("INSERT OR IGNORE INTO stock (sku_id, available) VALUES ('SKU-1', 10)")
        conn.commit()

def now_ts():
    return int(time.time())

def create_order(order_id, user_id, amount, sku_id, count):
    with get_conn() as conn:
        cur = conn.cursor()
        try:
            conn.execute("BEGIN")
            cur.execute(
                "INSERT INTO orders (order_id, user_id, amount, status) VALUES (?, ?, ?, ?)",
                (order_id, user_id, amount, "CREATED")
            )
            msg = {
                "order_id": order_id,
                "sku_id": sku_id,
                "count": count
            }
            ts = now_ts()
            cur.execute("""
                INSERT INTO outbox_messages
                (msg_id, biz_key, topic, body, status, retry_count, created_at, updated_at)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?)
            """, (
                f"msg-{order_id}",
                order_id,
                "stock.deduct",
                json.dumps(msg),
                "NEW",
                0,
                ts,
                ts
            ))
            conn.commit()
            print(f"[ORDER] 创建订单成功: {order_id}")
        except Exception:
            conn.rollback()
            raise

def relay_once(batch_size=10):
    with get_conn() as conn:
        cur = conn.cursor()
        rows = cur.execute("""
            SELECT msg_id, topic, body, retry_count
            FROM outbox_messages
            WHERE status = 'NEW'
            ORDER BY created_at
            LIMIT ?
        """, (batch_size,)).fetchall()

        for msg_id, topic, body, retry_count in rows:
            try:
                cur.execute("""
                    UPDATE outbox_messages
                    SET status = 'SENDING', updated_at = ?
                    WHERE msg_id = ? AND status = 'NEW'
                """, (now_ts(), msg_id))
                if cur.rowcount == 0:
                    continue
                conn.commit()

                mq.put({
                    "msg_id": msg_id,
                    "topic": topic,
                    "body": body
                })

                cur.execute("""
                    UPDATE outbox_messages
                    SET status = 'SENT', updated_at = ?
                    WHERE msg_id = ?
                """, (now_ts(), msg_id))
                conn.commit()
                print(f"[RELAY] 投递成功: {msg_id}")
            except Exception as e:
                conn.rollback()
                cur.execute("""
                    UPDATE outbox_messages
                    SET status = 'NEW', retry_count = retry_count + 1, updated_at = ?
                    WHERE msg_id = ?
                """, (now_ts(), msg_id))
                conn.commit()
                print(f"[RELAY] 投递失败: {msg_id}, err={e}")

def consume_once(consumer_name="stock-service"):
    if mq.empty():
        return

    msg = mq.get()
    msg_id = msg["msg_id"]
    body = json.loads(msg["body"])
    sku_id = body["sku_id"]
    count = body["count"]

    with get_conn() as conn:
        cur = conn.cursor()
        try:
            conn.execute("BEGIN")

            # 幂等校验:已消费则直接返回
            exists = cur.execute(
                "SELECT 1 FROM consumer_log WHERE msg_id = ?",
                (msg_id,)
            ).fetchone()
            if exists:
                conn.commit()
                print(f"[CONSUMER] 重复消息,跳过: {msg_id}")
                return

            stock_row = cur.execute(
                "SELECT available FROM stock WHERE sku_id = ?",
                (sku_id,)
            ).fetchone()
            if not stock_row:
                raise Exception("sku not found")

            available = stock_row[0]
            if available < count:
                raise Exception("stock not enough")

            cur.execute(
                "UPDATE stock SET available = available - ? WHERE sku_id = ?",
                (count, sku_id)
            )
            cur.execute(
                "INSERT INTO consumer_log (msg_id, consumer, consumed_at) VALUES (?, ?, ?)",
                (msg_id, consumer_name, now_ts())
            )
            conn.commit()
            print(f"[CONSUMER] 扣库存成功: {msg_id}, sku={sku_id}, count={count}")
        except Exception as e:
            conn.rollback()
            print(f"[CONSUMER] 消费失败: {msg_id}, err={e}")
            # 模拟重试:重新放回队列
            mq.put(msg)

def print_state():
    with get_conn() as conn:
        cur = conn.cursor()
        stock = cur.execute("SELECT sku_id, available FROM stock").fetchall()
        outbox = cur.execute("SELECT msg_id, status, retry_count FROM outbox_messages").fetchall()
        logs = cur.execute("SELECT msg_id, consumer FROM consumer_log").fetchall()
        print("\n=== 当前状态 ===")
        print("stock =", stock)
        print("outbox =", outbox)
        print("consumer_log =", logs)
        print("================\n")

if __name__ == "__main__":
    init_db()

    create_order("O1001", "U1", 100, "SKU-1", 2)

    relay_once()
    consume_once()

    # 故意制造一次重复消息
    mq.put({
        "msg_id": "msg-O1001",
        "topic": "stock.deduct",
        "body": json.dumps({"order_id": "O1001", "sku_id": "SKU-1", "count": 2})
    })
    consume_once()

    print_state()

运行后你会看到什么

第一次消费会真正扣库存,第二次因为 consumer_log 已有记录,所以会被识别为重复消息并跳过。
这就是“至少一次投递 + 消费端幂等”的经典组合。


数据库设计建议

如果你准备把这个模式真正用到生产环境,表结构最好别太随意。下面是一个更接近实战的消息表示例。

CREATE TABLE outbox_messages (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    msg_id VARCHAR(64) NOT NULL UNIQUE,
    biz_key VARCHAR(64) NOT NULL,
    topic VARCHAR(128) NOT NULL,
    tag VARCHAR(64) DEFAULT NULL,
    body JSON NOT NULL,
    headers JSON DEFAULT NULL,
    status VARCHAR(16) NOT NULL,
    retry_count INT NOT NULL DEFAULT 0,
    next_retry_time DATETIME DEFAULT NULL,
    shard_key VARCHAR(64) DEFAULT NULL,
    created_at DATETIME NOT NULL,
    updated_at DATETIME NOT NULL,
    INDEX idx_status_retry (status, next_retry_time),
    INDEX idx_biz_key (biz_key)
);

几个关键字段的建议:

  • msg_id:全局唯一消息 ID,用于消费幂等
  • biz_key:业务主键,比如订单号,方便排障
  • status:消息状态
  • retry_count:重试次数
  • next_retry_time:指数退避调度
  • headers:可放 traceId、来源系统、版本号等
  • shard_key:高并发时便于分片扫描

常见坑与排查

这部分是最有“血泪味”的。我把线上最常见的问题按“现象 -> 原因 -> 排查方法 -> 处理建议”来讲。

1. 订单成功了,但下游没收到消息

常见原因

  • 业务事务提交后,发送 MQ 前服务挂了
  • 代码里先发 MQ,后写 DB,导致顺序不可靠
  • 消息表有数据,但投递器没扫到
  • 投递器更新状态失败,造成脏状态

排查路径

先查订单,再查消息表,再查 MQ,再查消费者日志:

SELECT * FROM orders WHERE order_id = 'O1001';

SELECT msg_id, biz_key, status, retry_count, updated_at
FROM outbox_messages
WHERE biz_key = 'O1001';

SELECT * FROM consumer_log WHERE msg_id = 'msg-O1001';

经验建议

  • 不要把“发 MQ”直接放在本地事务外裸写
  • 一定要有消息表补偿任务
  • 投递器要支持把卡在 SENDING 太久的消息回滚到 NEW

2. 消息重复消费,导致库存扣了两次

常见原因

  • MQ 本身是至少一次投递
  • 消费成功了,但 ACK 丢了
  • 消费者执行成功,但记录幂等标记失败
  • 多实例并发消费同一业务数据

正确做法

幂等不能只靠“业务代码里 if 判断一下”,而应尽量靠唯一约束 + 事务兜底。

例如消费日志表:

CREATE TABLE consumer_log (
    msg_id VARCHAR(64) PRIMARY KEY,
    consumer VARCHAR(64) NOT NULL,
    consumed_at DATETIME NOT NULL
);

消费时:

  1. 开启本地事务
  2. 检查/插入幂等记录
  3. 执行业务更新
  4. 提交事务
  5. 最后 ACK

我踩过的一个坑

有些实现会先执行业务更新,再写幂等表。
如果写幂等表失败,消息重试后又会再次扣减。顺序错了,幂等就失效了。


3. 消息堆积越来越严重

典型现象

  • MQ backlog 飙升
  • 消费延迟从秒级变分钟级
  • 下游数据库 CPU 拉满
  • 重试消息越来越多

常见原因

  • 消费者处理太慢
  • 单条消息粒度过大
  • 下游依赖故障导致反复重试
  • 死信处理缺失,坏消息一直拖垮队列

排查重点

先看这几个指标:

  • 每秒生产量 / 每秒消费量
  • 消费耗时 P95 / P99
  • 重试比例
  • 死信数量
  • 下游 DB 慢 SQL、锁等待
  • 消费线程池是否打满

快速止血方案

  1. 扩容消费者实例
  2. 暂时降级非核心消费逻辑
  3. 把毒性消息转入死信队列
  4. 限制重试频率,避免雪崩
  5. 对热点业务分区隔离

4. 消费失败一直重试,系统被打穿

本质问题

不是所有错误都应该立即重试。

错误分类建议

  • 瞬时错误:网络抖动、超时、临时连接失败
    -> 可重试
  • 业务错误:库存不足、订单状态非法
    -> 不应无限重试
  • 脏数据错误:字段缺失、协议不兼容
    -> 直接进死信并报警

重试策略建议

  • 第 1 次:5 秒后
  • 第 2 次:30 秒后
  • 第 3 次:2 分钟后
  • 第 4 次:10 分钟后
  • 超过阈值:进入死信队列

这比“失败立刻再来一次”要健康得多。


5. 明明消息已经消费成功,但链路上看不到

常见原因

  • traceId 没透传
  • 消费日志没有记录业务主键
  • 监控只做了队列级,没有做到消息级
  • 状态更新滞后或丢失

建议做法

每条消息至少带上:

  • msgId
  • bizKey
  • traceId
  • sourceService
  • eventType
  • eventTime

日志打印也不要只打“消费成功”,最好这样:

{
  "msgId": "msg-O1001",
  "bizKey": "O1001",
  "traceId": "9f3c...",
  "topic": "stock.deduct",
  "consumer": "stock-service",
  "result": "success"
}

安全/性能最佳实践

最终一致性方案不是只看“能不能跑”,还要看“高峰期能不能扛住、出问题能不能控住”。

安全最佳实践

1. 消息体最小化

不要把完整用户信息、身份证号、银行卡等敏感数据直接塞进消息体。

更稳妥的方式是:

  • 消息中只传业务主键
  • 消费者按主键回源查询必要信息
  • 敏感字段脱敏或加密

2. 做好消息签名或来源校验

如果消息链路跨网络域或跨团队,建议增加:

  • 来源系统标识
  • 签名字段
  • 时间戳
  • 重放保护

3. 权限隔离

  • 生产者和消费者用独立账号
  • 不同业务 Topic 分权
  • 补偿工具、重放工具设置审批与审计

性能最佳实践

1. 扫描消息表要分页、分片、限速

最怕的是一个定时任务:

SELECT * FROM outbox_messages WHERE status = 'NEW';

这种写法在量大时非常危险。
正确做法通常是:

  • 按主键范围分页
  • 按时间窗口扫描
  • 按分片键并行拉取
  • 每批次控制大小

2. 业务库与消息扫描压力隔离

如果业务库本来就很忙,消息表扫描会雪上加霜。可以考虑:

  • 单独库表
  • 读写分离
  • 消息表归档
  • 扫描窗口错峰执行

3. 消费端要批量化,但别牺牲幂等

对 Kafka 一类高吞吐队列,消费端可批量拉取、批量处理。
但注意:

  • 幂等记录依然要可靠
  • 批量失败要能局部重试
  • 不要因为一条脏消息拖垮整批

4. 容量估算要提前做

一个非常粗略但实用的估算方法:

  • 每秒订单峰值:QPS_order
  • 每个订单平均派生消息数:N_msg
  • 单日消息体大小:S_msg
  • 保留时长:T

那么:

  • 每秒消息量 ≈ QPS_order * N_msg
  • 每秒带宽 ≈ QPS_order * N_msg * S_msg
  • 存储量 ≈ 每秒消息量 * T * 单条索引/元数据放大系数

举个例子:

  • 峰值下单 2000 QPS
  • 每个订单 3 条消息
  • 单条 2 KB

那消息流量大约就是:

  • 2000 * 3 = 6000 条/秒
  • 6000 * 2KB ≈ 12MB/s

如果消费者实际只能吃掉 4000 条/秒,堆积就会持续增长。
这类问题不是“优化一下代码”就能解决,往往要从容量规划上提前做。


一套更稳的落地建议

如果你准备在真实项目中实施,我建议按下面这个最小闭环来:

生产者侧

  • 本地事务:业务表 + outbox 表一起提交
  • 独立投递器异步扫表
  • 投递状态机明确
  • 超时 SENDING 自动回收
  • 支持补偿重发

MQ 侧

  • Topic 按业务域拆分
  • 消息保留策略明确
  • 死信队列开启
  • 监控 backlog、重试、延迟

消费者侧

  • 幂等表或业务唯一约束
  • 业务更新与幂等记录同事务
  • 错误分类处理
  • 重试退避
  • 毒性消息快速隔离

运维与治理侧

  • traceId 全链路透传
  • 可按 bizKey/msgId 检索
  • 提供消息重放工具
  • 提供死信修复流程
  • 定期做对账任务

常用排障清单

线上出问题时,我通常按这个顺序排:

1. 先确认业务是否真正成功

  • 订单表是否已提交?
  • 状态是否符合预期?

2. 再确认 outbox 是否存在

  • 是否有对应 bizKey
  • 状态是 NEW/SENDING/SENT/FAILED 哪一种
  • retry_count 有没有异常升高

3. 再看 MQ 是否已接收

  • 是否进入正确 Topic
  • 分区/路由是否正常
  • 消息堆积是否异常

4. 再看消费者是否处理

  • 是否拉到消息
  • 是否幂等拦截
  • 是否下游超时或锁冲突

5. 最后看是否需要补偿

  • 自动补偿是否已触发
  • 是否进入死信
  • 是否需要人工重放

这个顺序的好处是:从“数据事实”出发,而不是从“猜测代码可能有问题”出发。


总结

基于消息队列实现最终一致性,本质上是在强一致和系统可用性之间做工程上的平衡。

真正靠谱的方案,通常不是“MQ 发成功就完了”,而是这一整套:

  • 本地事务写业务数据 + 消息表
  • 异步可靠投递
  • 消费端幂等
  • 失败重试与死信隔离
  • 对账补偿与可观测性

如果你只记住三句话,我建议记这三句:

  1. 不要赌消息不会丢,要设计成丢了也能补。
  2. 不要赌消息不会重复,要设计成重复也没事。
  3. 不要赌线上永远正常,要让排障路径清晰可执行。

最后给几个可直接落地的建议:

  • 优先选 Outbox 模式 做生产者可靠性
  • 消费端默认按 至少一次投递 来设计幂等
  • 重试一定做 退避和上限
  • 死信队列、补偿任务、链路追踪一个都别少
  • 对“库存不足、状态非法”这类业务错误,不要无脑重试

边界也要讲清楚:
如果你的业务要求“跨服务必须瞬时全局一致,且不能接受任何短暂不一致”,那最终一致性不是最佳解,应该重新评估 TCC 或更强事务方案。

但对大多数互联网业务系统来说,这套方案足够实战、足够稳,也足够值得长期建设。


分享到:

上一篇
《Spring Boot 中基于 Spring Cache 与 Redis 的多级缓存实战:一致性、穿透防护与性能调优》
下一篇
《安卓逆向实战:基于 Frida 与 JADX 的登录参数签名分析与请求重放方法》