跳转到内容
123xiao | 无名键客

《自动化测试中的测试数据治理实践:构建稳定、可复用的中级项目数据方案》

字数: 0 阅读时长: 1 分钟

自动化测试中的测试数据治理实践:构建稳定、可复用的中级项目数据方案

自动化测试做久了,大家往往会发现:真正不稳定的,很多时候不是脚本本身,而是数据

我在中级规模项目里最常见到的情况是:

  • 用例能跑,但第二次跑就挂;
  • 测试环境一重置,半数脚本全失效;
  • 同一套接口测试,在开发机、CI、预发环境表现不一致;
  • 多人并发跑自动化时,账号、订单、库存互相污染。

这些问题表面上看像“环境偶发”“接口不稳定”,本质上很多都指向同一件事:测试数据没有被治理,只是被堆着用

这篇文章我不讲特别重的平台化方案,而是聚焦一个中级项目最值得落地的目标:构建一套稳定、可复用、能支持并发执行的测试数据方案


背景与问题

在很多团队里,测试数据的来源通常有三种:

  1. 手工在数据库里插入几条记录
  2. 在脚本里写死一批账号、商品、订单号
  3. 依赖前置接口动态创建数据

一开始这样做确实快,但随着项目增长,问题会迅速放大。

常见失控表现

1. 数据耦合用例,导致复用失败

比如某条用例写死“账号 A 必须是已实名状态”,另一条用例又把账号 A 改成未实名。单独跑都没问题,串起来就互相打架。

2. 数据生命周期不清晰

有些数据应该“一次一用”,有些应该“可长期复用”,还有些应该“执行后自动清理”。但如果没人定义规则,最后就会全部混在一起。

3. 环境之间不一致

测试环境、集成环境、预发环境的数据结构相同,但基础字典、权限模型、第三方回调状态并不完全一致。脚本靠“猜数据”时,环境切换就容易翻车。

4. 并发执行时相互污染

CI 一旦并发跑 10 份任务,账号锁定、库存占用、唯一键冲突、脏数据残留会一起冒出来。

为什么中级项目更需要治理

小项目数据少,靠人工还能兜住;超大项目一般已经开始做测试平台和数据中台。最容易卡住的,其实是中级项目

  • 用例量开始变多
  • 自动化被接入 CI
  • 多人同时维护脚本
  • 环境不止一个
  • 业务对象之间已有复杂依赖

这时候如果还没有一套明确的数据治理方案,测试脚本会越来越像“碰运气工程”。


核心原理

测试数据治理不是“准备一堆数据”这么简单,它更像是给数据建立规则。

我通常会把它拆成四个原则:分类、生成、隔离、回收

1. 分类:先明确数据是什么

不是所有测试数据都应该同一种管理方式。最实用的分类方法是按“稳定性”和“生命周期”分层。

数据分层建议

数据类型例子特点适合策略
基础主数据地区、币种、固定商品模板变化少、全局共享环境预置,长期复用
场景模板数据已实名用户、已下单用户、待支付订单能支持多条场景按模板生成或定时重建
临时运行数据本次测试创建的订单、优惠券、任务单生命周期短用例执行时创建,执行后清理
受限稀缺数据实名账号、外部渠道授权账号创建成本高池化管理,加锁分配

这个分类非常关键。
如果把所有数据都做成“执行时实时创建”,脚本会很慢;
如果把所有数据都做成“共享固定数据”,脚本又会互相污染。

2. 生成:优先用“可描述”代替“硬编码”

稳定的数据方案,核心不是存很多数据,而是能按规则生成数据

例如,不要在脚本里写:

username = "test_user_001"

而应该写成:

username = generator.new_user(prefix="pay", realname=False)

前者是“引用一个可能早就脏掉的值”,后者是“声明我要什么样的数据”。

这背后的思想是:
测试关注的是业务状态,而不是具体 ID。

3. 隔离:让数据归属清晰

数据隔离至少要做到三层:

  • 环境隔离:不同环境数据绝不混用
  • 任务隔离:不同 CI 任务有独立标识
  • 用例隔离:高风险场景尽量独立创建数据

最简单有效的做法,是给每次运行生成唯一 run_id,并将其写入测试数据标识中,例如:

  • 用户名:autotest_{run_id}_{index}
  • 订单备注:created_by=autotest, run_id=xxx
  • 数据库记录扩展字段:test_tag

后续清理、排查、追踪都靠这个标识。

4. 回收:没有清理,就没有治理

很多团队只关注“怎么造数据”,不关注“怎么回收数据”。结果就是环境越来越脏,脚本越来越脆。

回收方式一般有三种:

  1. 事务回滚:适合单服务、本地集成测试
  2. 接口/脚本删除:适合集成测试
  3. 定时清理归档:适合无法即时删除的数据

中级项目里最务实的方案是:
执行后尽量主动清理 + 每天兜底定时清理。


一个适合中级项目的落地方案

这里给出一个我觉得足够实用、又不至于过度设计的方案。

方案目标

  • 支持接口/UI 自动化共用数据能力
  • 能在本地和 CI 中稳定执行
  • 支持并发运行
  • 可复用已有基础数据
  • 尽量不依赖重型测试平台

方案组成

  1. 基础数据模板库
    保存环境内长期可复用的数据描述,比如商品模板、组织架构、地区信息。

  2. 数据工厂(Data Factory)
    负责按条件创建测试对象,比如创建用户、订单、优惠券。

  3. 数据池(Data Pool)
    管理稀缺资源,如实名账号、外部授权账号、固定商户号。

  4. 运行标识与清理器
    每次执行带 run_id,执行结束后按标识批量清理。

下面这张图可以把关系看清楚。

flowchart LR
    A[自动化用例] --> B[数据工厂 Data Factory]
    B --> C[基础模板库]
    B --> D[数据池 Data Pool]
    B --> E[业务接口/DB 创建数据]
    A --> F[运行上下文 run_id]
    F --> B
    F --> G[清理器 Cleaner]
    G --> E

设计思路:从“找数据”变成“申请数据”

很多失败的自动化项目,脚本里到处充满这样的逻辑:

  • 去数据库找一个状态为 available 的账号
  • 找一个库存大于 0 的商品
  • 找一个未支付订单继续测试

这类“找现成数据”的方式,早期很方便,但后期极不稳定。因为你根本不知道这个数据下一秒会不会被别人改掉。

更稳妥的方式是:

  • 能自己创建,就自己创建;
  • 不能自己创建,就从池中申请;
  • 申请到的数据必须带锁;
  • 使用完要释放或标记。

这就是“申请数据”的思路。

sequenceDiagram
    participant T as 测试用例
    participant F as 数据工厂
    participant P as 数据池
    participant S as 业务系统
    participant C as 清理器

    T->>F: 请求“已实名可下单用户”
    F->>P: 查询是否有可复用稀缺账号
    alt 数据池存在
        P-->>F: 返回并锁定账号
    else 数据池不存在
        F->>S: 创建用户并补齐实名状态
        S-->>F: 返回用户ID
    end
    F-->>T: 返回场景数据
    T->>S: 执行业务测试
    T->>C: 提交清理清单
    C->>S: 删除/关闭本次临时数据

实战代码(可运行)

下面用 Python 演示一个轻量可运行版本。
它不依赖真实业务系统,而是模拟一个中级项目里最核心的几件事:

  • 生成 run_id
  • 通过数据工厂创建用户和订单
  • 对稀缺账号做池化加锁
  • 执行后按 run_id 清理数据

你可以直接保存为 test_data_governance_demo.py 运行。

import sqlite3
import time
import uuid
from contextlib import contextmanager

DB_FILE = "test_data_demo.db"


def now_ts():
    return int(time.time())


def make_run_id():
    return uuid.uuid4().hex[:8]


class DB:
    def __init__(self, db_file=DB_FILE):
        self.conn = sqlite3.connect(db_file)
        self.conn.row_factory = sqlite3.Row

    def init_schema(self):
        cur = self.conn.cursor()
        cur.executescript("""
        CREATE TABLE IF NOT EXISTS users (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            username TEXT UNIQUE NOT NULL,
            realname_status INTEGER NOT NULL DEFAULT 0,
            run_id TEXT,
            created_at INTEGER NOT NULL
        );

        CREATE TABLE IF NOT EXISTS orders (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            user_id INTEGER NOT NULL,
            amount INTEGER NOT NULL,
            status TEXT NOT NULL,
            run_id TEXT,
            created_at INTEGER NOT NULL,
            FOREIGN KEY(user_id) REFERENCES users(id)
        );

        CREATE TABLE IF NOT EXISTS account_pool (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            username TEXT UNIQUE NOT NULL,
            in_use INTEGER NOT NULL DEFAULT 0,
            locked_by_run_id TEXT,
            created_at INTEGER NOT NULL
        );
        """)
        self.conn.commit()

    def execute(self, sql, params=()):
        cur = self.conn.cursor()
        cur.execute(sql, params)
        self.conn.commit()
        return cur

    def query_one(self, sql, params=()):
        cur = self.conn.cursor()
        cur.execute(sql, params)
        return cur.fetchone()

    def query_all(self, sql, params=()):
        cur = self.conn.cursor()
        cur.execute(sql, params)
        return cur.fetchall()

    def close(self):
        self.conn.close()


class DataFactory:
    def __init__(self, db: DB, run_id: str):
        self.db = db
        self.run_id = run_id

    def new_user(self, prefix="autotest", realname=False):
        username = f"{prefix}_{self.run_id}_{uuid.uuid4().hex[:6]}"
        self.db.execute(
            "INSERT INTO users(username, realname_status, run_id, created_at) VALUES (?, ?, ?, ?)",
            (username, 1 if realname else 0, self.run_id, now_ts())
        )
        row = self.db.query_one(
            "SELECT * FROM users WHERE username = ?",
            (username,)
        )
        return dict(row)

    def new_order(self, user_id: int, amount=100, status="CREATED"):
        self.db.execute(
            "INSERT INTO orders(user_id, amount, status, run_id, created_at) VALUES (?, ?, ?, ?, ?)",
            (user_id, amount, status, self.run_id, now_ts())
        )
        row = self.db.query_one(
            "SELECT * FROM orders WHERE rowid = last_insert_rowid()"
        )
        return dict(row)

    def acquire_pooled_account(self):
        row = self.db.query_one(
            "SELECT * FROM account_pool WHERE in_use = 0 LIMIT 1"
        )
        if row:
            self.db.execute(
                "UPDATE account_pool SET in_use = 1, locked_by_run_id = ? WHERE id = ? AND in_use = 0",
                (self.run_id, row["id"])
            )
            locked = self.db.query_one(
                "SELECT * FROM account_pool WHERE id = ?",
                (row["id"],)
            )
            if locked["locked_by_run_id"] == self.run_id:
                return dict(locked)

        username = f"pooled_{self.run_id}_{uuid.uuid4().hex[:6]}"
        self.db.execute(
            "INSERT INTO account_pool(username, in_use, locked_by_run_id, created_at) VALUES (?, 1, ?, ?)",
            (username, self.run_id, now_ts())
        )
        created = self.db.query_one(
            "SELECT * FROM account_pool WHERE username = ?",
            (username,)
        )
        return dict(created)


class Cleaner:
    def __init__(self, db: DB, run_id: str):
        self.db = db
        self.run_id = run_id

    def cleanup(self):
        self.db.execute(
            "DELETE FROM orders WHERE run_id = ?",
            (self.run_id,)
        )
        self.db.execute(
            "DELETE FROM users WHERE run_id = ?",
            (self.run_id,)
        )
        self.db.execute(
            "UPDATE account_pool SET in_use = 0, locked_by_run_id = NULL WHERE locked_by_run_id = ?",
            (self.run_id,)
        )


@contextmanager
def managed_run():
    db = DB()
    db.init_schema()
    run_id = make_run_id()
    factory = DataFactory(db, run_id)
    cleaner = Cleaner(db, run_id)
    try:
        print(f"[RUN START] run_id={run_id}")
        yield factory, run_id
    finally:
        cleaner.cleanup()
        print(f"[RUN END] cleaned run_id={run_id}")
        db.close()


def test_create_order_flow():
    with managed_run() as (factory, run_id):
        user = factory.new_user(prefix="pay", realname=True)
        order = factory.new_order(user_id=user["id"], amount=199, status="CREATED")
        pooled = factory.acquire_pooled_account()

        print("run_id =", run_id)
        print("user   =", user)
        print("order  =", order)
        print("pool   =", pooled)

        assert user["realname_status"] == 1
        assert order["amount"] == 199
        assert pooled["locked_by_run_id"] == run_id


if __name__ == "__main__":
    test_create_order_flow()

这段代码体现了什么

虽然它是个简化版 demo,但已经体现了几个关键治理思想:

  • 测试数据带 run_id
  • 临时数据和池化数据分开管理
  • 通过工厂创建,而不是在脚本里写死
  • 执行结束自动清理

如果你是用 pytest,完全可以把 managed_run() 改造成 fixture。


在 pytest 中的接法

如果你的自动化框架本身是 pytest,可以这样接入。

import pytest
from test_data_governance_demo import DB, DataFactory, Cleaner, make_run_id


@pytest.fixture
def data_factory():
    db = DB()
    db.init_schema()
    run_id = make_run_id()
    factory = DataFactory(db, run_id)
    cleaner = Cleaner(db, run_id)
    yield factory
    cleaner.cleanup()
    db.close()


def test_submit_order(data_factory):
    user = data_factory.new_user(prefix="submit", realname=True)
    order = data_factory.new_order(user_id=user["id"], amount=88, status="CREATED")

    assert user["username"].startswith("submit_")
    assert order["status"] == "CREATED"

这样做有两个直接好处:

  1. 用例关注“我要什么数据”,而不是“数据怎么准备”
  2. 数据清理跟着 fixture 生命周期走,不容易忘

数据模型建议:最少要有哪些字段

真正落地时,不管你是存数据库、JSON、还是平台服务,建议给测试数据对象统一补上这些元信息:

字段作用
run_id追踪是哪次执行生成的
created_by标识来源,如 autotest
scenario属于哪个测试场景
expire_at过期时间,便于兜底清理
env防止跨环境误用
status标记可用、占用、失效

如果是池化数据,还建议增加:

字段作用
in_use是否被占用
locked_by当前被谁占用
last_used_at最近使用时间
reuse_count复用次数,辅助淘汰

常见坑与排查

这部分我想写得更接地气一点,因为很多问题真不是“不会写代码”,而是治理边界没定清楚

坑 1:用例依赖共享账号,串行没事,并发就挂

现象

本地单跑通过,CI 并发跑时出现:

  • 用户已锁定
  • 账号状态不一致
  • 下单金额被别的用例修改

根因

多个用例复用了同一个共享数据,但没有锁机制。

排查方法

  • 检查失败用例是否都用同一批账号
  • 查询数据表中是否有多个任务同时修改同一个对象
  • 给请求日志加 run_id,看数据是否被交叉使用

处理建议

  • 稀缺资源必须池化并加锁
  • 普通场景尽量自建数据
  • 不能并发复用的模板,改为“一次一份”

坑 2:清理逻辑只在成功路径执行

现象

测试失败后残留大量脏数据,第二天再跑成功率明显下降。

根因

清理代码写在 assert 之后,或者只在 happy path 调用。

排查方法

  • 检查清理逻辑是否放在 finally / fixture teardown
  • 看失败任务后数据库里是否留有同批 run_id 数据

处理建议

清理逻辑必须是“兜底执行”,别放在业务步骤后面顺手写。


坑 3:通过“查询现有数据”凑场景

现象

脚本经常有这种逻辑:

SELECT * FROM users WHERE status = 'ACTIVE' LIMIT 1;

根因

这是典型的“碰运气取数”。今天查出来的数据能用,不代表明天也能用。

处理建议

把“查询”改成“声明需求”:

  • 需要活跃用户,就创建活跃用户
  • 需要待支付订单,就创建待支付订单
  • 只有受限资源才从池里拿

坑 4:只做数据创建,不做状态校验

现象

创建接口返回成功,但后续业务步骤仍然失败。

根因

有些系统是异步落库、异步审批、异步索引。你以为数据“创建完成”,实际上只是“提交成功”。

排查方法

创建后补校验:

  • 查 DB
  • 查状态接口
  • 轮询直到目标状态

处理建议

数据工厂不要只返回创建结果,还要负责确保数据进入可用状态


坑 5:环境配置和数据模板强绑定

现象

同一份脚本换个环境就失败,因为模板商品、组织 ID、渠道号都变了。

根因

把环境特有数据直接写死在脚本里。

处理建议

把环境差异抽到配置层,比如:

test:
  default_shop_id: 1001
  default_channel_code: CH_TEST
staging:
  default_shop_id: 3008
  default_channel_code: CH_STG

而脚本里只引用逻辑名称,不直接写环境 ID。


一套实用的排查路径

当自动化失败时,我建议优先按下面顺序排查,而不是一上来就怀疑接口。

flowchart TD
    A[自动化失败] --> B{是否与数据相关}
    B -->|是| C[检查 run_id 与数据归属]
    C --> D{数据是否存在}
    D -->|否| E[创建阶段失败/异步未完成]
    D -->|是| F{状态是否符合预期}
    F -->|否| G[被其他任务污染/模板失效]
    F -->|是| H{是否已被清理}
    H -->|是| I[清理时机过早]
    H -->|否| J[转向接口或业务逻辑排查]
    B -->|否| J

这个顺序的好处是:
你能很快判断问题到底在“造数”“占用”“状态流转”还是“清理时机”。


安全/性能最佳实践

测试数据治理除了稳定性,还会碰到两个很现实的问题:数据安全执行效率

安全最佳实践

1. 不要在测试数据里使用真实敏感信息

即便是测试环境,也不要直接放真实手机号、身份证、银行卡号。
建议统一使用脱敏或专用号段。

2. 测试数据权限最小化

自动化账号只保留必要权限,不要顺手给管理员超级权限。否则一旦脚本误操作,影响范围会很大。

3. 对清理脚本增加环境保护

我见过最危险的事故之一,就是测试清理脚本误连到非测试环境。
最低限度也要做环境白名单校验。

例如:

def safe_cleanup(env_name: str):
    allowed = {"test", "staging"}
    if env_name not in allowed:
        raise RuntimeError(f"cleanup blocked for env={env_name}")
    print(f"cleanup allowed for {env_name}")

4. 日志不要打印敏感字段

如果日志里要输出测试数据,至少要对手机号、token、证件号做脱敏。


性能最佳实践

1. 不要所有数据都实时创建

实时创建很稳,但也很慢。
建议按下面策略混用:

  • 基础主数据:预置
  • 复杂场景数据:模板化
  • 高风险数据:实时创建
  • 稀缺资源:池化复用

这是中级项目里性价比最高的组合。

2. 尽量批量造数、批量清理

如果每天都要跑大量回归,逐条创建和逐条删除会拖慢执行。
能批处理就批处理。

3. 异步状态用轮询,不要硬等

比起 sleep(10),更推荐有超时上限的轮询。

import time

def wait_until(fetch_status, expected, timeout=10, interval=0.5):
    start = time.time()
    while time.time() - start < timeout:
        if fetch_status() == expected:
            return True
        time.sleep(interval)
    return False

4. 给数据工厂做缓存,但缓存要分层

例如地区、商品模板这些可以缓存;
但临时订单、临时用户绝不能缓存复用。


方案取舍:什么时候不用做太重

测试数据治理不是越复杂越好。中级项目最怕的是“为治理而治理”。

适合当前方案的场景

  • 有几十到几百条自动化用例
  • 已接入 CI,存在并发执行
  • 多人维护脚本
  • 测试环境经常被复用
  • 业务存在账号、订单、库存等状态对象

暂时不必做太重的平台化场景

  • 用例量很少
  • 团队成员很少
  • 基本没有并发执行
  • 数据对象简单,且可完全重建环境

这时候一个轻量 Data Factory + run_id 清理机制,往往就够了。
没必要一上来就建设完整数据平台。

什么时候该升级到平台化

如果你开始出现这些信号,就该往平台走了:

  • 稀缺数据种类越来越多
  • 环境数量增加
  • 清理规则复杂
  • 需要可视化查看数据占用情况
  • 自动化框架不止一种(接口/UI/性能共用)

落地步骤建议

如果你准备在团队里推动这件事,我建议不要一次性大改,而是分三步走。

第一步:统一标识

先给所有新建测试数据加上:

  • run_id
  • created_by
  • env

只做这一步,定位问题和清理难度都会立刻下降。

第二步:抽出数据工厂

把脚本里散落的“创建账号”“创建订单”“准备商品”提成统一方法。
先不追求平台化,先把重复逻辑收拢。

第三步:引入数据池和清理器

针对稀缺资源做池化,对临时数据做自动清理。
这一步做完,自动化稳定性通常会有明显提升。


总结

自动化测试里的测试数据治理,核心不是“准备更多数据”,而是建立一套可描述、可生成、可隔离、可回收的机制。

如果把这件事压缩成几条最可执行的建议,我会给你这份清单:

  1. 先分类数据:基础主数据、模板数据、临时数据、稀缺数据分开管理
  2. 优先用数据工厂:脚本描述“我要什么状态”,不要写死具体值
  3. 所有测试数据带 run_id:这是追踪和清理的基础
  4. 稀缺资源做池化加锁:别再靠共享账号硬撑并发
  5. 清理逻辑必须兜底执行:写进 finally 或 fixture teardown
  6. 环境差异放配置层:别把环境 ID 写进用例
  7. 别过度设计:中级项目先把稳定性和复用做好,再考虑平台化

最后给一个边界判断:
如果你的项目还在“脚本偶尔跑一下”的阶段,先别把体系搞太大;
但如果你的自动化已经进入 CI、多人协作、并发执行阶段,那么测试数据治理就不再是“优化项”,而是稳定交付的基础设施

很多自动化问题,最后都不是脚本写得不够聪明,而是数据没有被当成一等公民去管理。
一旦你把这件事做对,脚本稳定性、维护效率、排障速度,都会一起上来。


分享到:

下一篇
《Web逆向实战:中级开发者如何定位并复现前端签名算法实现接口自动化调用》