跳转到内容
123xiao | 无名键客

《微服务架构中分布式事务的实战方案:基于 Saga 模式与消息最终一致性的落地指南》

字数: 0 阅读时长: 1 分钟

背景与问题

只要系统拆成了微服务,分布式事务几乎迟早会找上门。

一个最典型的场景是下单:

  1. 订单服务创建订单
  2. 库存服务扣减库存
  3. 支付服务冻结或扣款
  4. 营销服务发放优惠
  5. 物流服务创建配送单

如果这几个动作都在一个数据库里,用本地事务很好办;但一旦它们分散在不同服务、不同数据库,传统单机事务就失效了。很多团队最开始会想到 2PC/XA,但真正落地时常常会遇到几个现实问题:

  • 数据库、中间件支持不统一
  • 锁持有时间长,吞吐量明显下降
  • 协调器故障会放大系统脆弱性
  • 云原生环境下跨服务强一致成本过高

所以,在微服务里,我们通常不会强行追求“所有节点同时提交”。更常见、也更务实的做法是:

  • 业务流程层面用 Saga 拆解大事务
  • 服务间通信用可靠消息实现最终一致性
  • 通过幂等、补偿、重试、对账把系统拉回正确状态

这篇文章不讲概念堆砌,而是从架构设计到代码实现,带你把这套方案走一遍。


为什么 Saga + 消息最终一致性更适合微服务

先说结论:Saga 适合长流程业务事务,消息最终一致性适合跨服务状态传播,两者结合,是很多中大型微服务系统里性价比很高的一种方案。

典型业务链路

以“订单创建”为例:

  • 订单服务:创建订单,状态 PENDING
  • 库存服务:锁定库存
  • 支付服务:创建支付单
  • 积分服务:冻结积分
  • 如果任何一步失败,触发前面步骤的补偿动作

这里的关键不是“绝不失败”,而是:

  • 每一步都能独立提交本地事务
  • 失败后能通过补偿把业务回滚到可接受状态
  • 即使消息重复、延迟、乱序,也不会把数据搞乱

核心原理

1. Saga 模式:把大事务拆成一串本地事务

Saga 的核心思想很简单:

一个跨服务的大事务,拆成多个本地事务,每个本地事务成功后发布下一步指令;如果中间失败,则按相反顺序执行补偿操作。

Saga 常见有两种实现方式:

编排式 Saga(Orchestration)

由一个“流程协调者”统一决定下一步做什么。

优点:

  • 流程清晰,适合复杂业务
  • 容易观察整体状态
  • 补偿逻辑集中管理

缺点:

  • 协调器容易变复杂
  • 需要处理状态机持久化

协同式 Saga(Choreography)

每个服务通过事件驱动下一步,谁收到事件谁决定是否继续。

优点:

  • 去中心化,扩展灵活
  • 服务间解耦较强

缺点:

  • 链路分散,排查困难
  • 业务流程容易“隐形化”

在实战里,我更建议:

  • 流程简单、团队成熟度一般:优先编排式
  • 流程较短、事件驱动明显:可考虑协同式

2. 消息最终一致性:允许短暂不一致,但最终收敛

消息最终一致性的核心在于:

  • 本地事务成功
  • 可靠地把“业务事件”写出来
  • 消费方反复重试直到处理成功
  • 使用幂等避免重复处理副作用

这里最经典的做法是 Outbox Pattern(事务消息表)

  1. 在本地事务中,同时写业务数据和消息表
  2. 后台任务把消息表中的未发送消息投递到 MQ
  3. 消费方收到消息后执行业务逻辑
  4. 处理成功后记录消费状态

这样即使服务在事务提交后瞬间宕机,消息也不会丢。


整体架构设计

下面先看一个推荐的落地架构。

flowchart LR
    A[客户端提交订单] --> B[订单服务]
    B --> C[(订单库)]
    B --> D[(Outbox消息表)]
    D --> E[消息投递器]
    E --> F[MQ]
    F --> G[库存服务]
    F --> H[支付服务]
    F --> I[积分服务]
    G --> J[(库存库)]
    H --> K[(支付库)]
    I --> L[(积分库)]
    G --> F
    H --> F
    I --> F

这个图里有两个非常关键的点:

  • 订单服务不直接跨库开事务
  • 事件发布依赖 Outbox,而不是“先写库再发 MQ”这种不可靠操作

Saga 状态流转

stateDiagram-v2
    [*] --> PENDING
    PENDING --> INVENTORY_LOCKED: 库存锁定成功
    INVENTORY_LOCKED --> PAYMENT_CREATED: 支付单创建成功
    PAYMENT_CREATED --> COMPLETED: 全部成功
    INVENTORY_LOCKED --> COMPENSATING: 支付失败
    PAYMENT_CREATED --> COMPENSATING: 后续失败
    COMPENSATING --> CANCELLED: 补偿完成
    COMPENSATING --> COMPENSATION_FAILED: 补偿异常
    COMPENSATION_FAILED --> COMPENSATING: 重试补偿

这里建议你不要只维护“成功/失败”两态,而是明确建模:

  • PENDING
  • INVENTORY_LOCKED
  • PAYMENT_CREATED
  • COMPLETED
  • COMPENSATING
  • CANCELLED
  • COMPENSATION_FAILED

状态足够清晰,排障时真的会轻松很多。


方案对比与取舍分析

1. 2PC/XA vs Saga

方案一致性性能实现复杂度适用场景
2PC/XA强一致较低少量核心场景、老系统
TCC强一致偏业务化很高对一致性要求极高
Saga最终一致长事务、微服务业务流程
可靠消息最终一致性最终一致异步驱动、事件传播

2. 什么时候别用 Saga

Saga 不是银弹。以下场景要谨慎:

  • 证券撮合、账户实时扣款 等对强一致要求极高
  • 补偿动作不具备业务可逆性
  • 流程高度耦合且失败成本巨大
  • 团队没有统一的事件规范、幂等机制、监控体系

换句话说,Saga 很适合电商、履约、营销、订单类场景,但不一定适合所有资金核心链路。


核心设计要点

1. 业务 ID 与幂等键

每一条业务操作都要带上:

  • order_id
  • saga_id
  • message_id
  • idempotency_key

我踩过的一个坑是:只按 order_id 做幂等,结果同一订单内不同步骤互相覆盖,最后查出来一团乱。更稳妥的方式是:

业务主键 + 步骤名 + 动作类型

例如:

  • order-1001:reserve_inventory:forward
  • order-1001:reserve_inventory:compensate

2. 补偿不是数据库 rollback

很多人一开始会误以为“补偿 = 回滚”。其实不是。

比如库存锁定成功后,支付失败,补偿动作不是回滚 SQL,而是发起一笔新的业务操作:

  • 解锁库存
  • 取消支付单
  • 回退积分冻结

它们都应当是显式的业务接口。

3. 重试要有边界

重试不是无限打。建议配置:

  • 指数退避
  • 最大重试次数
  • 死信队列
  • 人工干预入口

不然失败消息会在系统里永远打转,CPU、日志、告警一起爆。


实战代码(可运行)

下面给一个简化但可运行的示例,使用 Python + Flask + SQLite 来模拟:

  • 订单服务
  • Outbox 消息表
  • 消息投递
  • 库存服务消费
  • 幂等处理

这个示例重点演示模式,不依赖真实 MQ。我们用数据库表模拟消息队列,方便本地跑起来理解全流程。

目录结构

.
├── app.py
└── requirements.txt

requirements.txt

flask==3.0.0

app.py

import json
import sqlite3
import threading
import time
import uuid
from contextlib import contextmanager
from flask import Flask, jsonify, request

app = Flask(__name__)
DB = "demo.db"


@contextmanager
def get_conn():
    conn = sqlite3.connect(DB)
    conn.row_factory = sqlite3.Row
    try:
        yield conn
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        conn.close()


def init_db():
    with get_conn() as conn:
        conn.execute("""
        CREATE TABLE IF NOT EXISTS orders (
            id TEXT PRIMARY KEY,
            status TEXT NOT NULL,
            amount INTEGER NOT NULL,
            created_at DATETIME DEFAULT CURRENT_TIMESTAMP
        )
        """)
        conn.execute("""
        CREATE TABLE IF NOT EXISTS inventory (
            product_id TEXT PRIMARY KEY,
            stock INTEGER NOT NULL
        )
        """)
        conn.execute("""
        CREATE TABLE IF NOT EXISTS outbox (
            id TEXT PRIMARY KEY,
            topic TEXT NOT NULL,
            payload TEXT NOT NULL,
            status TEXT NOT NULL DEFAULT 'NEW',
            retry_count INTEGER NOT NULL DEFAULT 0,
            created_at DATETIME DEFAULT CURRENT_TIMESTAMP
        )
        """)
        conn.execute("""
        CREATE TABLE IF NOT EXISTS inbox_processed (
            idempotency_key TEXT PRIMARY KEY,
            processed_at DATETIME DEFAULT CURRENT_TIMESTAMP
        )
        """)
        conn.execute("""
        CREATE TABLE IF NOT EXISTS inventory_reservations (
            order_id TEXT PRIMARY KEY,
            product_id TEXT NOT NULL,
            quantity INTEGER NOT NULL,
            status TEXT NOT NULL
        )
        """)
        cur = conn.execute("SELECT COUNT(*) AS c FROM inventory")
        if cur.fetchone()["c"] == 0:
            conn.execute(
                "INSERT INTO inventory(product_id, stock) VALUES (?, ?)",
                ("sku-1", 10)
            )


def create_order_with_outbox(order_id, amount, product_id, quantity):
    saga_id = str(uuid.uuid4())
    message_id = str(uuid.uuid4())
    idempotency_key = f"{order_id}:reserve_inventory:forward"

    event = {
        "message_id": message_id,
        "saga_id": saga_id,
        "order_id": order_id,
        "product_id": product_id,
        "quantity": quantity,
        "idempotency_key": idempotency_key
    }

    with get_conn() as conn:
        conn.execute(
            "INSERT INTO orders(id, status, amount) VALUES (?, ?, ?)",
            (order_id, "PENDING", amount)
        )
        conn.execute(
            "INSERT INTO outbox(id, topic, payload, status) VALUES (?, ?, ?, ?)",
            (message_id, "ReserveInventory", json.dumps(event), "NEW")
        )


def publish_outbox():
    while True:
        try:
            with get_conn() as conn:
                rows = conn.execute(
                    "SELECT id, topic, payload, retry_count FROM outbox WHERE status = 'NEW' ORDER BY created_at LIMIT 10"
                ).fetchall()

                for row in rows:
                    payload = json.loads(row["payload"])
                    try:
                        if row["topic"] == "ReserveInventory":
                            handle_reserve_inventory(payload)

                        conn.execute(
                            "UPDATE outbox SET status = 'SENT' WHERE id = ?",
                            (row["id"],)
                        )
                    except Exception as e:
                        conn.execute(
                            "UPDATE outbox SET retry_count = retry_count + 1 WHERE id = ?",
                            (row["id"],)
                        )
                        print(f"[publisher] publish failed: {e}, message_id={row['id']}")
        except Exception as e:
            print(f"[publisher] loop error: {e}")

        time.sleep(2)


def handle_reserve_inventory(event):
    order_id = event["order_id"]
    product_id = event["product_id"]
    quantity = event["quantity"]
    idempotency_key = event["idempotency_key"]

    with get_conn() as conn:
        processed = conn.execute(
            "SELECT 1 FROM inbox_processed WHERE idempotency_key = ?",
            (idempotency_key,)
        ).fetchone()
        if processed:
            print(f"[inventory] duplicate message ignored: {idempotency_key}")
            return

        stock_row = conn.execute(
            "SELECT stock FROM inventory WHERE product_id = ?",
            (product_id,)
        ).fetchone()

        if not stock_row:
            raise Exception("product not found")

        if stock_row["stock"] < quantity:
            conn.execute(
                "UPDATE orders SET status = 'CANCELLED' WHERE id = ?",
                (order_id,)
            )
            conn.execute(
                "INSERT INTO inbox_processed(idempotency_key) VALUES (?)",
                (idempotency_key,)
            )
            print(f"[inventory] insufficient stock, order cancelled: {order_id}")
            return

        conn.execute(
            "UPDATE inventory SET stock = stock - ? WHERE product_id = ?",
            (quantity, product_id)
        )
        conn.execute(
            "INSERT OR REPLACE INTO inventory_reservations(order_id, product_id, quantity, status) VALUES (?, ?, ?, ?)",
            (order_id, product_id, quantity, "RESERVED")
        )
        conn.execute(
            "UPDATE orders SET status = 'INVENTORY_LOCKED' WHERE id = ?",
            (order_id,)
        )
        conn.execute(
            "INSERT INTO inbox_processed(idempotency_key) VALUES (?)",
            (idempotency_key,)
        )
        print(f"[inventory] reserved stock for order: {order_id}")


@app.post("/orders")
def create_order():
    body = request.json or {}
    order_id = body.get("order_id") or str(uuid.uuid4())
    amount = int(body.get("amount", 100))
    product_id = body.get("product_id", "sku-1")
    quantity = int(body.get("quantity", 1))

    try:
        create_order_with_outbox(order_id, amount, product_id, quantity)
        return jsonify({"success": True, "order_id": order_id}), 201
    except sqlite3.IntegrityError:
        return jsonify({"success": False, "message": "duplicate order_id"}), 409


@app.get("/orders/<order_id>")
def get_order(order_id):
    with get_conn() as conn:
        row = conn.execute(
            "SELECT * FROM orders WHERE id = ?",
            (order_id,)
        ).fetchone()
        if not row:
            return jsonify({"message": "not found"}), 404
        return jsonify(dict(row))


@app.get("/inventory/<product_id>")
def get_inventory(product_id):
    with get_conn() as conn:
        row = conn.execute(
            "SELECT * FROM inventory WHERE product_id = ?",
            (product_id,)
        ).fetchone()
        if not row:
            return jsonify({"message": "not found"}), 404
        return jsonify(dict(row))


if __name__ == "__main__":
    init_db()
    t = threading.Thread(target=publish_outbox, daemon=True)
    t.start()
    app.run(debug=True)

运行方式

python -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt
python app.py

测试创建订单

curl -X POST http://127.0.0.1:5000/orders \
  -H "Content-Type: application/json" \
  -d '{
    "order_id": "order-1001",
    "amount": 100,
    "product_id": "sku-1",
    "quantity": 2
  }'

查询订单状态:

curl http://127.0.0.1:5000/orders/order-1001

查询库存:

curl http://127.0.0.1:5000/inventory/sku-1

这个示例体现了什么

虽然代码不长,但已经体现了落地的关键点:

  • 订单写入和 Outbox 写入同一个本地事务
  • 消息由后台任务异步投递
  • 消费端通过 inbox_processed 做幂等
  • 库存不足时执行业务取消,而不是数据库层回滚跨服务状态

Saga 时序示意

再看一张完整一点的时序图,会更容易理解“成功路径”和“补偿路径”。

sequenceDiagram
    participant C as Client
    participant O as 订单服务
    participant X as Outbox投递器
    participant M as MQ
    participant I as 库存服务
    participant P as 支付服务

    C->>O: 创建订单
    O->>O: 本地事务写订单+Outbox
    X->>M: 投递 ReserveInventory
    M->>I: 消费库存锁定事件
    I->>I: 本地事务锁定库存
    I->>M: 发布 InventoryReserved
    M->>P: 消费支付创建事件
    P->>P: 创建支付单失败
    P->>M: 发布 PaymentFailed
    M->>O: 通知 Saga 补偿
    O->>M: 发布 ReleaseInventory
    M->>I: 消费释放库存事件
    I->>I: 解锁库存
    O->>O: 更新订单为 CANCELLED

常见坑与排查

这部分我建议认真看,很多项目不是败在“不会设计”,而是败在这些细节没兜住。

1. 先发 MQ,再提交数据库

这是非常常见的错误顺序。

如果你这样做:

  1. 发 MQ 成功
  2. 本地数据库提交失败

那消费者已经收到事件,但业务数据根本不存在,直接产生脏读和幻读式问题。

正确做法

  • 用 Outbox:业务数据和消息记录同事务提交
  • 异步投递 MQ

2. 消息重复消费导致库存多扣

消息系统默认至少一次投递,重复消费是常态,不是异常。

排查方法

看消费日志里是否存在:

  • 同一个 message_id
  • 同一个 idempotency_key
  • 同一个业务主键被多次更新

正确做法

  • 消费端一定做幂等
  • 幂等记录必须落库,不要只放内存
  • 幂等判断和业务处理尽量在一个本地事务里完成

3. 补偿消息比正向消息先到

在高并发、网络抖动、重试并存时,消息乱序并不罕见。

表现

  • 先收到 ReleaseInventory
  • 后收到 ReserveInventory

如果没做状态校验,就会出现库存状态错乱。

处理建议

  • 每个步骤都建状态机
  • 对非法状态迁移直接拒绝
  • 补偿操作要求可幂等、可重入

例如:

  • 只有 RESERVED 状态才能 RELEASED
  • 已经 RELEASED 再次释放时直接成功返回

4. 补偿失败后无人接手

这是线上系统里最危险的隐性故障之一。表面看订单只是“处理中”,实际上永远卡住了。

建议做法

  • 补偿任务持久化
  • 设置最大重试次数
  • 超阈值进入人工处理队列
  • 给运营或技术支持提供修复入口

5. 链路追踪缺失,定位全靠猜

当订单、库存、支付分散在不同服务里,如果没有统一 Trace ID,排查会非常痛苦。

至少要统一这些字段

  • trace_id
  • saga_id
  • order_id
  • message_id

日志、监控、审计表里都应该能查到。


安全/性能最佳实践

安全方面

1. 消息内容最小化

不要把敏感数据原样塞进消息体,比如:

  • 用户完整身份证号
  • 银行卡号
  • 明文手机号

消息中尽量只放:

  • 业务 ID
  • 资源 ID
  • 必要状态
  • 可脱敏字段

2. 接口验签与权限控制

补偿接口、状态推进接口,不能谁都能调。建议:

  • 服务间使用 mTLS 或网关鉴权
  • 内部事件带签名或可信来源校验
  • 运维修复接口做 RBAC 权限控制

3. 审计留痕

对于涉及资金、库存、权益的事务流转,最好保留:

  • 操作时间
  • 操作者/系统来源
  • 前后状态
  • 请求参数摘要

这样在事故复盘时非常有价值。

性能方面

1. Outbox 表要控制膨胀

Outbox 是高频写表,如果不清理,很快就会变成热点。

建议:

  • 按时间分表或分区
  • 发送成功的数据归档
  • 建立 (status, created_at) 索引

示例 SQL:

CREATE INDEX idx_outbox_status_created_at
ON outbox(status, created_at);

2. 批量拉取、批量发送

投递器不要一条条查、一条条发。可以:

  • 每次拉 100 条
  • 批量提交 MQ
  • 控制并发线程数

这样吞吐量会明显提升。

3. 消费逻辑要短事务

消费者本地事务里只做必要的状态变更,不要顺手查一堆远程接口。否则:

  • 锁时间变长
  • 重试成本变高
  • 雪崩时恢复更慢

4. 容量估算要提前做

一个粗略估算公式:

每日消息量 = 日订单量 × 平均事务步骤数 × (1 + 重试率 + 补偿率)

比如:

  • 日订单量 100 万
  • 平均 4 个步骤
  • 重试率 5%
  • 补偿率 2%

那么消息量大约是:

100万 × 4 × (1 + 0.05 + 0.02) = 428万

这会直接影响:

  • MQ 吞吐选型
  • 消费者实例数
  • Outbox 清理策略
  • 日志与监控成本

落地建议:从“小闭环”开始

如果你所在团队还没有成熟的分布式事务治理体系,我不建议一上来就做一个超级通用的 Saga 平台。更现实的路径是:

第一步:先把 Outbox + 幂等打牢

先确保:

  • 业务数据和事件消息同事务落库
  • 消费端有稳定幂等能力
  • 有失败重试和死信处理

这一步完成后,很多“消息丢失、重复消费、状态不一致”问题就能大幅下降。

第二步:对一个核心链路引入 Saga

建议选择:

  • 业务价值高
  • 步骤数 3~5 个
  • 补偿动作清晰
  • 能接受短暂最终一致

比如:

  • 下单 -> 锁库存 -> 创建支付单
  • 会员开通 -> 权益发放 -> 通知 CRM

第三步:补齐监控与人工修复

技术方案能否真正上线,不在于 happy path 多漂亮,而在于异常时能不能接住。至少要有:

  • Saga 状态查询页
  • 失败消息重试入口
  • 补偿失败告警
  • 业务对账任务

一个实用的排查清单

当你遇到“订单状态不对”时,可以按这个顺序查:

  1. 订单主记录状态是否正确
  2. Outbox 是否已生成消息
  3. 消息是否投递成功
  4. 消费者是否收到消息
  5. 幂等表是否已记录
  6. 下游本地事务是否提交
  7. 是否触发补偿
  8. 补偿是否重试失败
  9. 是否进入死信或人工队列
  10. 对账任务是否已经修复

这个顺序很重要,能避免一上来就在 Kafka、RocketMQ、数据库日志里到处乱翻。


总结

在微服务架构里,分布式事务的关键不是追求“绝对同步提交”,而是建立一套 可恢复、可观测、可补偿 的一致性机制。

如果把这篇文章压缩成几条最有执行价值的建议,我会给这几条:

  1. 优先考虑 Saga + 消息最终一致性,而不是盲目上 XA
  2. 消息发布一定用 Outbox,不要“写库后顺手发 MQ”
  3. 消费端幂等是底线,不是优化项
  4. 补偿动作必须显式设计,并且可重试、可审计
  5. 状态机、Trace ID、死信队列、人工修复入口必须配套

最后也要提醒一个边界条件:
Saga 解决的是大多数业务系统里的“最终一致性问题”,不是所有场景都能接受它。 对资金强一致、法律合规要求极高的链路,仍然要结合 TCC、账户系统记账模型,甚至局部保留单体强事务设计。

如果你的业务属于订单、库存、履约、营销这类典型微服务链路,那么从 Outbox + 幂等 + 编排式 Saga 开始,通常是一条足够稳、也足够现实的落地路径。


分享到:

上一篇
《AI 应用中 RAG 检索增强生成的中级实战:从向量库选型到召回效果优化》
下一篇
《分布式架构中基于一致性哈希与服务发现的灰度发布实践与避坑指南》