跳转到内容
123xiao | 无名键客

《自动化测试中的测试数据治理实践:从数据构造、隔离到回收的落地方案》

字数: 0 阅读时长: 1 分钟

背景与问题

自动化测试做久了,很多团队最后卡住的不是“脚本不会写”,而是“数据越来越乱”。

我自己在项目里最常见到的几类问题是:

  • 用例依赖固定账号,今天能跑,明天被别人改脏了
  • 测试环境共享一套数据库,A 任务刚造完数据,B 任务就把它覆盖了
  • 回归测试跑完留下大量垃圾数据,环境越跑越慢
  • 测试数据构造逻辑分散在各个脚本里,没人知道哪些字段是关键,改一个接口一片红
  • CI 并发一高,重复造数、主键冲突、唯一索引冲突全来了

这些问题的本质,不只是“测试数据不够用”,而是测试数据缺少治理

所谓治理,不是单纯准备几条初始化 SQL,而是把测试数据看成一个完整生命周期的对象:

  1. 构造:怎么稳定、快速、可复用地生成数据
  2. 隔离:怎么保证不同用例、不同任务、不同环境互不污染
  3. 回收:怎么在测试后清理、归档、过期销毁,避免环境腐化

如果这三件事没有统一设计,自动化测试规模一上来,稳定性一定会掉。


核心原理

测试数据治理可以抽象成一个很实用的模型:“数据工厂 + 隔离策略 + 生命周期回收器”

1. 数据构造:从“写死数据”升级为“可声明的数据工厂”

很多团队一开始会在测试里直接写:

user = {
    "name": "test_user_01",
    "phone": "13800000001"
}

看起来简单,但很快会遇到问题:

  • 数据重复
  • 不同场景字段不一致
  • 一个对象依赖另一个对象时,构造顺序混乱
  • 修改业务字段后,需要全仓库替换

更好的办法是建立测试数据工厂(Data Factory)

  • 统一定义对象默认值
  • 支持按场景覆盖字段
  • 支持级联构造关联对象
  • 自动生成唯一标识
  • 记录创建来源,方便后续回收

核心思路是:用“模板 + 参数覆盖 + 元数据标签”生成数据

2. 数据隔离:优先隔离命名空间,而不是只靠“大家自觉”

测试数据隔离至少有三层:

第一层:用例级隔离

每个用例的数据都带唯一标识,比如:

  • run_id
  • case_id
  • worker_id
  • timestamp

例如用户名不再是 test_user,而是:

test_user_{run_id}_{case_id}

第二层:任务级隔离

CI 中多个 Job 并发执行时,需要按任务隔离:

  • 独立数据库 schema
  • 独立租户
  • 独立业务前缀
  • 独立队列 topic / cache key 前缀

第三层:环境级隔离

测试环境、预发环境、联调环境的数据不能混用。最怕的是:

  • 自动化脚本误连到准生产
  • 回收任务清错环境
  • 公共账号在不同环境共享凭据

所以治理里一定要把环境身份放入数据元信息里。

3. 数据回收:别指望“跑完自然干净”

测试数据如果不回收,会带来几个直接后果:

  • 数据量膨胀,查询变慢
  • 唯一键冲突越来越频繁
  • 老数据影响断言结果
  • 环境出现“假通过”或“假失败”

回收通常有三种策略:

策略 A:事务回滚

适合单进程、单库、无异步副作用的接口测试。

优点:

  • 干净
  • 不残留

缺点:

  • 无法覆盖异步任务、消息队列、外部系统调用

策略 B:显式删除

测试结束后根据测试标签删除。

优点:

  • 适用范围广
  • 能治理跨服务数据

缺点:

  • 要求所有造数操作都可追踪
  • 删除链路复杂时容易漏

策略 C:TTL 过期回收

给测试数据打标签,定时扫描过期删除。

优点:

  • 对失败中断任务更友好
  • 不依赖用例一定执行 teardown

缺点:

  • 环境会短时间堆积数据
  • 回收窗口设计不好会影响性能

实际落地里,我更推荐:“显式删除 + TTL 兜底”
因为 teardown 很容易因为用例异常、进程退出、网络波动而没执行,单靠显式删除不稳;但只靠 TTL,又会让环境长期脏着。


一个可落地的治理架构

下面这张图可以概括一套中型团队常用的方案。

flowchart TD
    A[测试用例启动] --> B[生成 run_id / case_id / worker_id]
    B --> C[调用测试数据工厂]
    C --> D[写入业务数据]
    C --> E[记录数据元信息到 registry]
    D --> F[执行自动化测试]
    F --> G{测试结束}
    G -->|成功/失败| H[显式回收]
    H --> I[删除业务数据]
    H --> J[删除 registry 记录]
    G -->|teardown 未执行| K[TTL 定时回收器]
    K --> I

这个架构里有一个关键组件:registry(测试数据注册表)

它不一定要很复杂,本质上就是一张记录“谁创建了什么数据”的表,至少存这些字段:

  • resource_type
  • resource_id
  • env
  • run_id
  • case_id
  • created_at
  • expire_at
  • cleanup_status

这样一来,数据回收就不再依赖“猜数据库里哪些是测试数据”,而是按登记信息回收


核心设计拆解

1. 测试数据要带元信息

我建议所有测试数据都带统一元标签:

  • created_by = autotest
  • run_id
  • case_id
  • env
  • expire_at

哪怕业务表不能直接加这些字段,也可以通过扩展字段、备注字段、关联表或者 registry 记录保存。

2. 先治理“可识别”,再治理“可删除”

很多团队一上来就想做自动清理,但连哪些是测试数据都分不清。

正确顺序应该是:

  1. 让测试数据可识别
  2. 让创建动作可登记
  3. 再做删除和过期回收

3. 把“造数能力”沉淀到基础设施,而不是散落在用例里

不要让每个测试工程师各写一套:

  • 创建用户
  • 创建订单
  • 创建优惠券
  • 创建库存

应该提供统一 SDK 或服务:

  • create_user()
  • create_order()
  • create_coupon()

这样做的价值不只是复用,更重要的是:

  • 能统一加唯一前缀
  • 能自动登记 registry
  • 能统一 teardown
  • 能在业务字段变更时只改一处

实战代码(可运行)

下面用一个可运行的 Python + SQLite 示例,演示测试数据构造、隔离、登记、回收的完整流程。

这个例子故意不依赖复杂框架,方便你直接跑通思路。真实项目里可以把它接进 pytest、unittest 或自研测试平台。

1. 初始化数据库

# file: test_data_governance.py
import sqlite3
import time
import uuid
from contextlib import contextmanager

DB_FILE = "autotest_demo.db"

def get_conn():
    return sqlite3.connect(DB_FILE)

def init_db():
    conn = get_conn()
    cur = conn.cursor()

    cur.execute("""
    CREATE TABLE IF NOT EXISTS users (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        username TEXT NOT NULL UNIQUE,
        phone TEXT NOT NULL UNIQUE,
        env TEXT NOT NULL,
        created_by TEXT NOT NULL,
        run_id TEXT NOT NULL,
        case_id TEXT NOT NULL,
        created_at INTEGER NOT NULL
    )
    """)

    cur.execute("""
    CREATE TABLE IF NOT EXISTS orders (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        order_no TEXT NOT NULL UNIQUE,
        user_id INTEGER NOT NULL,
        amount INTEGER NOT NULL,
        env TEXT NOT NULL,
        created_by TEXT NOT NULL,
        run_id TEXT NOT NULL,
        case_id TEXT NOT NULL,
        created_at INTEGER NOT NULL
    )
    """)

    cur.execute("""
    CREATE TABLE IF NOT EXISTS test_data_registry (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        resource_type TEXT NOT NULL,
        resource_id TEXT NOT NULL,
        env TEXT NOT NULL,
        run_id TEXT NOT NULL,
        case_id TEXT NOT NULL,
        created_at INTEGER NOT NULL,
        expire_at INTEGER NOT NULL,
        cleanup_status TEXT NOT NULL DEFAULT 'PENDING'
    )
    """)

    conn.commit()
    conn.close()

2. 定义运行上下文和唯一标识

class TestContext:
    def __init__(self, env: str, case_id: str, ttl_seconds: int = 3600):
        self.env = env
        self.case_id = case_id
        self.run_id = str(uuid.uuid4())[:8]
        self.worker_id = str(uuid.uuid4())[:6]
        self.ttl_seconds = ttl_seconds

    def uniq(self, prefix: str) -> str:
        ts = int(time.time() * 1000)
        return f"{prefix}_{self.env}_{self.run_id}_{self.case_id}_{self.worker_id}_{ts}"

3. 测试数据工厂

class TestDataFactory:
    def __init__(self, conn, ctx: TestContext):
        self.conn = conn
        self.ctx = ctx

    def _register(self, resource_type: str, resource_id: str):
        now = int(time.time())
        expire_at = now + self.ctx.ttl_seconds
        self.conn.execute("""
            INSERT INTO test_data_registry
            (resource_type, resource_id, env, run_id, case_id, created_at, expire_at, cleanup_status)
            VALUES (?, ?, ?, ?, ?, ?, ?, 'PENDING')
        """, (
            resource_type,
            str(resource_id),
            self.ctx.env,
            self.ctx.run_id,
            self.ctx.case_id,
            now,
            expire_at
        ))

    def create_user(self, username=None, phone=None):
        now = int(time.time())
        username = username or self.ctx.uniq("user")
        phone = phone or f"13{str(int(time.time()*1000000))[-9:]}"
        cur = self.conn.cursor()

        cur.execute("""
            INSERT INTO users (username, phone, env, created_by, run_id, case_id, created_at)
            VALUES (?, ?, ?, 'autotest', ?, ?, ?)
        """, (
            username,
            phone,
            self.ctx.env,
            self.ctx.run_id,
            self.ctx.case_id,
            now
        ))
        user_id = cur.lastrowid
        self._register("users", user_id)
        self.conn.commit()
        return {
            "id": user_id,
            "username": username,
            "phone": phone
        }

    def create_order(self, user_id: int, amount: int = 100):
        now = int(time.time())
        order_no = self.ctx.uniq("order")
        cur = self.conn.cursor()

        cur.execute("""
            INSERT INTO orders (order_no, user_id, amount, env, created_by, run_id, case_id, created_at)
            VALUES (?, ?, ?, ?, 'autotest', ?, ?, ?)
        """, (
            order_no,
            user_id,
            amount,
            self.ctx.env,
            self.ctx.run_id,
            self.ctx.case_id,
            now
        ))
        order_id = cur.lastrowid
        self._register("orders", order_id)
        self.conn.commit()
        return {
            "id": order_id,
            "order_no": order_no,
            "amount": amount
        }

4. 回收器

这里按 registry 反向删除业务数据。

class TestDataCleaner:
    DELETE_SQL = {
        "orders": "DELETE FROM orders WHERE id = ?",
        "users": "DELETE FROM users WHERE id = ?"
    }

    def __init__(self, conn):
        self.conn = conn

    def cleanup_by_run(self, env: str, run_id: str):
        cur = self.conn.cursor()
        rows = cur.execute("""
            SELECT id, resource_type, resource_id
            FROM test_data_registry
            WHERE env = ? AND run_id = ? AND cleanup_status = 'PENDING'
            ORDER BY id DESC
        """, (env, run_id)).fetchall()

        for reg_id, resource_type, resource_id in rows:
            delete_sql = self.DELETE_SQL.get(resource_type)
            if not delete_sql:
                continue

            self.conn.execute(delete_sql, (resource_id,))
            self.conn.execute("""
                UPDATE test_data_registry
                SET cleanup_status = 'DONE'
                WHERE id = ?
            """, (reg_id,))

        self.conn.commit()

    def cleanup_expired(self, now_ts: int):
        cur = self.conn.cursor()
        rows = cur.execute("""
            SELECT id, resource_type, resource_id
            FROM test_data_registry
            WHERE expire_at <= ? AND cleanup_status = 'PENDING'
            ORDER BY id DESC
        """, (now_ts,)).fetchall()

        for reg_id, resource_type, resource_id in rows:
            delete_sql = self.DELETE_SQL.get(resource_type)
            if not delete_sql:
                continue

            self.conn.execute(delete_sql, (resource_id,))
            self.conn.execute("""
                UPDATE test_data_registry
                SET cleanup_status = 'DONE'
                WHERE id = ?
            """, (reg_id,))

        self.conn.commit()

5. 用上下文管理器包装测试生命周期

@contextmanager
def managed_test_session(env: str, case_id: str, ttl_seconds: int = 60):
    conn = get_conn()
    ctx = TestContext(env=env, case_id=case_id, ttl_seconds=ttl_seconds)
    factory = TestDataFactory(conn, ctx)
    cleaner = TestDataCleaner(conn)
    try:
        yield ctx, factory
    finally:
        cleaner.cleanup_by_run(env=ctx.env, run_id=ctx.run_id)
        conn.close()

6. 模拟一个自动化测试

def test_create_user_and_order():
    with managed_test_session(env="test", case_id="case_create_order") as (ctx, factory):
        user = factory.create_user()
        order = factory.create_order(user_id=user["id"], amount=200)

        print("Run ID:", ctx.run_id)
        print("User:", user)
        print("Order:", order)

        assert user["id"] > 0
        assert order["amount"] == 200

if __name__ == "__main__":
    init_db()
    test_create_user_and_order()
    print("done")

运行后你会看到:

  • 用例执行时创建了 user 和 order
  • 数据被写入 registry
  • 用例退出后自动按 run_id 回收

这就是一个最小可用的数据治理闭环。


在 pytest 中怎么接

如果你们是 pytest 体系,可以把上下文封装成 fixture。

# conftest.py
import pytest
from test_data_governance import init_db, get_conn, TestContext, TestDataFactory, TestDataCleaner

@pytest.fixture(scope="session", autouse=True)
def setup_db():
    init_db()

@pytest.fixture
def test_data(request):
    conn = get_conn()
    case_id = request.node.name
    ctx = TestContext(env="test", case_id=case_id, ttl_seconds=300)
    factory = TestDataFactory(conn, ctx)
    cleaner = TestDataCleaner(conn)

    yield {
        "ctx": ctx,
        "factory": factory
    }

    cleaner.cleanup_by_run(env=ctx.env, run_id=ctx.run_id)
    conn.close()

测试用例就会简洁很多:

def test_order_flow(test_data):
    factory = test_data["factory"]
    user = factory.create_user()
    order = factory.create_order(user_id=user["id"], amount=500)

    assert order["amount"] == 500

这样做最大的好处是:造数和清理成为基础设施,不再是用例作者额外操心的事。


数据隔离策略的取舍

不同团队环境条件不同,隔离策略没有唯一标准。下面是比较实用的选择方法。

方案对比

方案隔离强度成本适用场景
唯一前缀隔离单环境共享库、接口测试
租户隔离SaaS、多租户业务
Schema 隔离中高数据库支持 schema,CI 并发较高
独立库隔离很高核心回归、稳定性优先
容器/临时环境隔离最高很高端到端验证、发布前验收

我的经验是:

  • 接口自动化:优先用“唯一前缀 + registry + TTL”
  • 并发 CI:优先加“schema/租户级隔离”
  • 核心链路验收:能上临时环境就上临时环境

不要一开始就追求最重的隔离方案。环境成本、维护复杂度、创建速度都要算进去。


测试数据生命周期时序图

下面这张时序图适合拿去和测试平台、后端、DBA 一起对齐流程。

sequenceDiagram
    participant T as TestCase
    participant F as DataFactory
    participant DB as BusinessDB
    participant R as Registry
    participant C as Cleaner

    T->>F: 请求创建用户/订单
    F->>DB: 插入业务数据
    DB-->>F: 返回 resource_id
    F->>R: 记录 resource_type/resource_id/run_id/expire_at
    R-->>F: 登记成功
    F-->>T: 返回测试对象

    T->>T: 执行业务断言

    alt teardown 正常执行
        T->>C: cleanup_by_run(run_id)
        C->>R: 查询待清理资源
        C->>DB: 按逆序删除业务数据
        C->>R: 标记 cleanup_status=DONE
    else teardown 失败或中断
        C->>R: 定时扫描 expire_at
        C->>DB: 删除过期测试数据
        C->>R: 标记 cleanup_status=DONE
    end

常见坑与排查

这部分很重要,因为测试数据治理最容易败在“方案看着合理,落地一地鸡毛”。

坑 1:只清主表,不清关联表

典型现象:

  • 用户删了,但订单、地址、优惠券还在
  • 再次造数时出现脏关联
  • 数据库外键报错

排查思路:

  1. 梳理业务对象依赖关系
  2. 删除时按逆拓扑顺序执行
  3. 不要只靠开发拍脑袋说“删 user 就行了”

如果对象关系复杂,建议明确维护资源依赖图。

classDiagram
    class User {
      +id
      +username
    }
    class Order {
      +id
      +order_no
      +user_id
    }
    class Coupon {
      +id
      +user_id
    }
    User <|-- Order
    User <|-- Coupon

虽然 Mermaid 的 classDiagram 不等于真实 ER 图,但拿来表达“先删谁后删谁”很直观。

坑 2:唯一字段生成规则不稳

典型现象:

  • 并发时用户名冲突
  • 手机号、邮箱、订单号撞库
  • 重跑同一 case 时出现偶发失败

建议:

  • 不要只用秒级时间戳
  • 不要只用 case_id
  • 至少组合:env + run_id + worker_id + ms timestamp

如果业务字段长度有限,要提前设计压缩编码规则。

坑 3:回收逻辑只在测试成功时执行

这个坑我真的踩过。脚本写成:

if test_passed:
    cleanup()

一旦失败,脏数据就留下来,几轮回归后环境彻底不可用了。

正确姿势是:

  • 清理放在 finally
  • 再配一个 TTL 定时兜底
  • 回收任务本身要有监控和告警

坑 4:误删人工联调数据

当环境是共享环境时,这个问题很危险。

避免方式:

  • 只删除 created_by = autotest 的数据
  • 或只删除 registry 中有登记的数据
  • 回收器必须带环境白名单
  • 禁止在 prod / pre-prod 默认执行 destructive cleanup

坑 5:跨服务数据删不干净

比如一个下单用例可能涉及:

  • 用户服务
  • 订单服务
  • 库存服务
  • 营销服务
  • 消息队列
  • 搜索索引
  • 缓存

这时如果你的回收只删主库数据,结果就是:

  • ES 里还能搜到
  • Redis 缓存还在
  • MQ 死信队列里有残留事件

解决办法:

  • registry 记录的 resource_type 不能只面向 DB 表
  • 要能描述“缓存 key”“索引文档”“topic 消息标识”等资源
  • 清理器支持多种 handler,而不只是 SQL delete

安全/性能最佳实践

测试数据治理不是只图“能跑”,还要考虑安全和性能边界。

安全最佳实践

1. 严格环境隔离

自动化账号必须限制权限:

  • 测试环境可写
  • 预发环境谨慎可写
  • 生产环境默认不可写、不可删

如果清理器拿的是高权限账号,一旦连错环境,后果很严重。

2. 测试数据禁止使用真实敏感信息

不要为了“更像真实场景”就把真实手机号、身份证号、银行卡号放进去。

建议:

  • 用脱敏模板数据
  • 使用保留号段
  • 对日志输出做脱敏

3. 回收任务要有保护开关

至少加这些控制项:

  • 环境白名单
  • 最大删除条数限制
  • 干跑模式(dry-run)
  • 删除审计日志

性能最佳实践

1. 造数优先走 API 还是 DB?

这取决于测试目标:

  • 测业务流程:优先走 API,贴近真实链路
  • 准备前置数据:可适当走 DB/内部服务,提升速度
  • 测复杂联动:可以混合,前置走快速通道,核心操作走 API

不要教条地说“测试数据必须都通过 UI/API 创建”,那样构造前置条件会很慢。

2. 批量回收优于逐条回收

如果单次回归造了几万条数据,逐条删会很慢。

更好的做法:

  • 先按 run_id 批量查询
  • 分批 delete
  • 为 registry 的 env/run_id/expire_at/cleanup_status 建索引

例如:

CREATE INDEX IF NOT EXISTS idx_registry_run
ON test_data_registry(env, run_id, cleanup_status);

CREATE INDEX IF NOT EXISTS idx_registry_expire
ON test_data_registry(expire_at, cleanup_status);

3. 回收不要和高峰压测抢资源

定时 TTL 清理最好避开:

  • 批量回归高峰
  • 大促演练
  • 压测窗口

否则回收 SQL 和业务测试互相争抢资源,会让结果失真。

4. 为数据工厂做缓存和模板化

不是所有数据都要从零创建。比如地区、商品类目、基础配置这些相对稳定的公共数据:

  • 可以预置
  • 可以按只读基线管理
  • 可以版本化

治理的重点是易变测试对象,而不是把一切都动态生成。


一套推荐的落地步骤

如果你们团队现在还没有系统化治理,我建议按下面顺序推进,成本最可控。

第一步:统一标识规范

先定清楚所有测试数据命名规范:

  • 前缀
  • run_id
  • case_id
  • env
  • created_by

这一步最简单,但收益立刻可见。

第二步:建设数据工厂

挑 3~5 个最常用业务对象先做:

  • 用户
  • 订单
  • 商品
  • 优惠券
  • 地址

不要试图一次性覆盖全部模型。

第三步:引入 registry

把“创建了什么资源”记录下来。哪怕先只记数据库主键,也比完全不可追踪强。

第四步:实现 teardown 回收

先做到按 run_id 清理,覆盖主流程。

第五步:补 TTL 兜底

解决 teardown 失效、中断退出、异常终止的问题。

第六步:扩展到跨服务资源

把缓存、索引、对象存储、队列消息等资源纳入治理。


边界条件:什么时候不值得做太重的治理

不是所有团队都要立刻上完整体系。以下情况可以先轻量处理:

  • 用例数量少,且串行执行
  • 环境是一次性临时库,用后即销毁
  • 测试对象很简单,没有复杂关联
  • 数据量小,环境重建成本低

这时候你可以先做:

  • 唯一命名
  • fixture 统一造数
  • 简单 teardown
  • 每日重建环境

但只要出现下面这些信号,就说明该上治理了:

  • 并发执行变多
  • 共用环境冲突频发
  • 回归结果不稳定
  • 数据清理靠人工
  • 数据构造代码到处复制

总结

测试自动化要跑得稳,测试数据不能再被当成“脚本里的几行参数”。
真正可持续的方案,一定是把它当成一个完整生命周期系统来治理:

  • 构造:通过数据工厂统一生成
  • 隔离:通过 run_id、租户、schema 或环境边界实现互不污染
  • 回收:通过 registry、teardown 和 TTL 形成闭环

如果你现在只能做一件事,我最建议先做这三条:

  1. 所有测试数据统一带唯一标识和 created_by 标签
  2. 把常用造数逻辑沉淀成工厂,而不是散落在用例里
  3. 建立 registry,并让回收基于 registry,而不是猜测删除

这三步做完,自动化测试的稳定性通常就会明显上一个台阶。

测试数据治理的目标,不是“绝对完美”,而是让环境在持续运行下仍然可预测、可恢复、可扩展。这才是自动化测试真正能规模化落地的基础。


分享到:

上一篇
《区块链数据索引实战:从智能合约事件到高性能查询接口的设计与实现》
下一篇
《Web3 中级实战:基于 EIP-712 与钱包签名实现去中心化登录(SIWE)完整方案》