自动化测试中的测试数据治理实践:构建稳定、可复用的中级项目数据方案
自动化测试做久了,大家往往会发现:真正不稳定的,很多时候不是脚本本身,而是数据。
我在中级规模项目里最常见到的情况是:
- 用例能跑,但第二次跑就挂;
- 测试环境一重置,半数脚本全失效;
- 同一套接口测试,在开发机、CI、预发环境表现不一致;
- 多人并发跑自动化时,账号、订单、库存互相污染。
这些问题表面上看像“环境偶发”“接口不稳定”,本质上很多都指向同一件事:测试数据没有被治理,只是被堆着用。
这篇文章我不讲特别重的平台化方案,而是聚焦一个中级项目最值得落地的目标:构建一套稳定、可复用、能支持并发执行的测试数据方案。
背景与问题
在很多团队里,测试数据的来源通常有三种:
- 手工在数据库里插入几条记录
- 在脚本里写死一批账号、商品、订单号
- 依赖前置接口动态创建数据
一开始这样做确实快,但随着项目增长,问题会迅速放大。
常见失控表现
1. 数据耦合用例,导致复用失败
比如某条用例写死“账号 A 必须是已实名状态”,另一条用例又把账号 A 改成未实名。单独跑都没问题,串起来就互相打架。
2. 数据生命周期不清晰
有些数据应该“一次一用”,有些应该“可长期复用”,还有些应该“执行后自动清理”。但如果没人定义规则,最后就会全部混在一起。
3. 环境之间不一致
测试环境、集成环境、预发环境的数据结构相同,但基础字典、权限模型、第三方回调状态并不完全一致。脚本靠“猜数据”时,环境切换就容易翻车。
4. 并发执行时相互污染
CI 一旦并发跑 10 份任务,账号锁定、库存占用、唯一键冲突、脏数据残留会一起冒出来。
为什么中级项目更需要治理
小项目数据少,靠人工还能兜住;超大项目一般已经开始做测试平台和数据中台。最容易卡住的,其实是中级项目:
- 用例量开始变多
- 自动化被接入 CI
- 多人同时维护脚本
- 环境不止一个
- 业务对象之间已有复杂依赖
这时候如果还没有一套明确的数据治理方案,测试脚本会越来越像“碰运气工程”。
核心原理
测试数据治理不是“准备一堆数据”这么简单,它更像是给数据建立规则。
我通常会把它拆成四个原则:分类、生成、隔离、回收。
1. 分类:先明确数据是什么
不是所有测试数据都应该同一种管理方式。最实用的分类方法是按“稳定性”和“生命周期”分层。
数据分层建议
| 数据类型 | 例子 | 特点 | 适合策略 |
|---|---|---|---|
| 基础主数据 | 地区、币种、固定商品模板 | 变化少、全局共享 | 环境预置,长期复用 |
| 场景模板数据 | 已实名用户、已下单用户、待支付订单 | 能支持多条场景 | 按模板生成或定时重建 |
| 临时运行数据 | 本次测试创建的订单、优惠券、任务单 | 生命周期短 | 用例执行时创建,执行后清理 |
| 受限稀缺数据 | 实名账号、外部渠道授权账号 | 创建成本高 | 池化管理,加锁分配 |
这个分类非常关键。
如果把所有数据都做成“执行时实时创建”,脚本会很慢;
如果把所有数据都做成“共享固定数据”,脚本又会互相污染。
2. 生成:优先用“可描述”代替“硬编码”
稳定的数据方案,核心不是存很多数据,而是能按规则生成数据。
例如,不要在脚本里写:
username = "test_user_001"
而应该写成:
username = generator.new_user(prefix="pay", realname=False)
前者是“引用一个可能早就脏掉的值”,后者是“声明我要什么样的数据”。
这背后的思想是:
测试关注的是业务状态,而不是具体 ID。
3. 隔离:让数据归属清晰
数据隔离至少要做到三层:
- 环境隔离:不同环境数据绝不混用
- 任务隔离:不同 CI 任务有独立标识
- 用例隔离:高风险场景尽量独立创建数据
最简单有效的做法,是给每次运行生成唯一 run_id,并将其写入测试数据标识中,例如:
- 用户名:
autotest_{run_id}_{index} - 订单备注:
created_by=autotest, run_id=xxx - 数据库记录扩展字段:
test_tag
后续清理、排查、追踪都靠这个标识。
4. 回收:没有清理,就没有治理
很多团队只关注“怎么造数据”,不关注“怎么回收数据”。结果就是环境越来越脏,脚本越来越脆。
回收方式一般有三种:
- 事务回滚:适合单服务、本地集成测试
- 接口/脚本删除:适合集成测试
- 定时清理归档:适合无法即时删除的数据
中级项目里最务实的方案是:
执行后尽量主动清理 + 每天兜底定时清理。
一个适合中级项目的落地方案
这里给出一个我觉得足够实用、又不至于过度设计的方案。
方案目标
- 支持接口/UI 自动化共用数据能力
- 能在本地和 CI 中稳定执行
- 支持并发运行
- 可复用已有基础数据
- 尽量不依赖重型测试平台
方案组成
-
基础数据模板库
保存环境内长期可复用的数据描述,比如商品模板、组织架构、地区信息。 -
数据工厂(Data Factory)
负责按条件创建测试对象,比如创建用户、订单、优惠券。 -
数据池(Data Pool)
管理稀缺资源,如实名账号、外部授权账号、固定商户号。 -
运行标识与清理器
每次执行带run_id,执行结束后按标识批量清理。
下面这张图可以把关系看清楚。
flowchart LR
A[自动化用例] --> B[数据工厂 Data Factory]
B --> C[基础模板库]
B --> D[数据池 Data Pool]
B --> E[业务接口/DB 创建数据]
A --> F[运行上下文 run_id]
F --> B
F --> G[清理器 Cleaner]
G --> E
设计思路:从“找数据”变成“申请数据”
很多失败的自动化项目,脚本里到处充满这样的逻辑:
- 去数据库找一个状态为
available的账号 - 找一个库存大于 0 的商品
- 找一个未支付订单继续测试
这类“找现成数据”的方式,早期很方便,但后期极不稳定。因为你根本不知道这个数据下一秒会不会被别人改掉。
更稳妥的方式是:
- 能自己创建,就自己创建;
- 不能自己创建,就从池中申请;
- 申请到的数据必须带锁;
- 使用完要释放或标记。
这就是“申请数据”的思路。
sequenceDiagram
participant T as 测试用例
participant F as 数据工厂
participant P as 数据池
participant S as 业务系统
participant C as 清理器
T->>F: 请求“已实名可下单用户”
F->>P: 查询是否有可复用稀缺账号
alt 数据池存在
P-->>F: 返回并锁定账号
else 数据池不存在
F->>S: 创建用户并补齐实名状态
S-->>F: 返回用户ID
end
F-->>T: 返回场景数据
T->>S: 执行业务测试
T->>C: 提交清理清单
C->>S: 删除/关闭本次临时数据
实战代码(可运行)
下面用 Python 演示一个轻量可运行版本。
它不依赖真实业务系统,而是模拟一个中级项目里最核心的几件事:
- 生成
run_id - 通过数据工厂创建用户和订单
- 对稀缺账号做池化加锁
- 执行后按
run_id清理数据
你可以直接保存为 test_data_governance_demo.py 运行。
import sqlite3
import time
import uuid
from contextlib import contextmanager
DB_FILE = "test_data_demo.db"
def now_ts():
return int(time.time())
def make_run_id():
return uuid.uuid4().hex[:8]
class DB:
def __init__(self, db_file=DB_FILE):
self.conn = sqlite3.connect(db_file)
self.conn.row_factory = sqlite3.Row
def init_schema(self):
cur = self.conn.cursor()
cur.executescript("""
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT UNIQUE NOT NULL,
realname_status INTEGER NOT NULL DEFAULT 0,
run_id TEXT,
created_at INTEGER NOT NULL
);
CREATE TABLE IF NOT EXISTS orders (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER NOT NULL,
amount INTEGER NOT NULL,
status TEXT NOT NULL,
run_id TEXT,
created_at INTEGER NOT NULL,
FOREIGN KEY(user_id) REFERENCES users(id)
);
CREATE TABLE IF NOT EXISTS account_pool (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT UNIQUE NOT NULL,
in_use INTEGER NOT NULL DEFAULT 0,
locked_by_run_id TEXT,
created_at INTEGER NOT NULL
);
""")
self.conn.commit()
def execute(self, sql, params=()):
cur = self.conn.cursor()
cur.execute(sql, params)
self.conn.commit()
return cur
def query_one(self, sql, params=()):
cur = self.conn.cursor()
cur.execute(sql, params)
return cur.fetchone()
def query_all(self, sql, params=()):
cur = self.conn.cursor()
cur.execute(sql, params)
return cur.fetchall()
def close(self):
self.conn.close()
class DataFactory:
def __init__(self, db: DB, run_id: str):
self.db = db
self.run_id = run_id
def new_user(self, prefix="autotest", realname=False):
username = f"{prefix}_{self.run_id}_{uuid.uuid4().hex[:6]}"
self.db.execute(
"INSERT INTO users(username, realname_status, run_id, created_at) VALUES (?, ?, ?, ?)",
(username, 1 if realname else 0, self.run_id, now_ts())
)
row = self.db.query_one(
"SELECT * FROM users WHERE username = ?",
(username,)
)
return dict(row)
def new_order(self, user_id: int, amount=100, status="CREATED"):
self.db.execute(
"INSERT INTO orders(user_id, amount, status, run_id, created_at) VALUES (?, ?, ?, ?, ?)",
(user_id, amount, status, self.run_id, now_ts())
)
row = self.db.query_one(
"SELECT * FROM orders WHERE rowid = last_insert_rowid()"
)
return dict(row)
def acquire_pooled_account(self):
row = self.db.query_one(
"SELECT * FROM account_pool WHERE in_use = 0 LIMIT 1"
)
if row:
self.db.execute(
"UPDATE account_pool SET in_use = 1, locked_by_run_id = ? WHERE id = ? AND in_use = 0",
(self.run_id, row["id"])
)
locked = self.db.query_one(
"SELECT * FROM account_pool WHERE id = ?",
(row["id"],)
)
if locked["locked_by_run_id"] == self.run_id:
return dict(locked)
username = f"pooled_{self.run_id}_{uuid.uuid4().hex[:6]}"
self.db.execute(
"INSERT INTO account_pool(username, in_use, locked_by_run_id, created_at) VALUES (?, 1, ?, ?)",
(username, self.run_id, now_ts())
)
created = self.db.query_one(
"SELECT * FROM account_pool WHERE username = ?",
(username,)
)
return dict(created)
class Cleaner:
def __init__(self, db: DB, run_id: str):
self.db = db
self.run_id = run_id
def cleanup(self):
self.db.execute(
"DELETE FROM orders WHERE run_id = ?",
(self.run_id,)
)
self.db.execute(
"DELETE FROM users WHERE run_id = ?",
(self.run_id,)
)
self.db.execute(
"UPDATE account_pool SET in_use = 0, locked_by_run_id = NULL WHERE locked_by_run_id = ?",
(self.run_id,)
)
@contextmanager
def managed_run():
db = DB()
db.init_schema()
run_id = make_run_id()
factory = DataFactory(db, run_id)
cleaner = Cleaner(db, run_id)
try:
print(f"[RUN START] run_id={run_id}")
yield factory, run_id
finally:
cleaner.cleanup()
print(f"[RUN END] cleaned run_id={run_id}")
db.close()
def test_create_order_flow():
with managed_run() as (factory, run_id):
user = factory.new_user(prefix="pay", realname=True)
order = factory.new_order(user_id=user["id"], amount=199, status="CREATED")
pooled = factory.acquire_pooled_account()
print("run_id =", run_id)
print("user =", user)
print("order =", order)
print("pool =", pooled)
assert user["realname_status"] == 1
assert order["amount"] == 199
assert pooled["locked_by_run_id"] == run_id
if __name__ == "__main__":
test_create_order_flow()
这段代码体现了什么
虽然它是个简化版 demo,但已经体现了几个关键治理思想:
- 测试数据带
run_id - 临时数据和池化数据分开管理
- 通过工厂创建,而不是在脚本里写死
- 执行结束自动清理
如果你是用 pytest,完全可以把 managed_run() 改造成 fixture。
在 pytest 中的接法
如果你的自动化框架本身是 pytest,可以这样接入。
import pytest
from test_data_governance_demo import DB, DataFactory, Cleaner, make_run_id
@pytest.fixture
def data_factory():
db = DB()
db.init_schema()
run_id = make_run_id()
factory = DataFactory(db, run_id)
cleaner = Cleaner(db, run_id)
yield factory
cleaner.cleanup()
db.close()
def test_submit_order(data_factory):
user = data_factory.new_user(prefix="submit", realname=True)
order = data_factory.new_order(user_id=user["id"], amount=88, status="CREATED")
assert user["username"].startswith("submit_")
assert order["status"] == "CREATED"
这样做有两个直接好处:
- 用例关注“我要什么数据”,而不是“数据怎么准备”
- 数据清理跟着 fixture 生命周期走,不容易忘
数据模型建议:最少要有哪些字段
真正落地时,不管你是存数据库、JSON、还是平台服务,建议给测试数据对象统一补上这些元信息:
| 字段 | 作用 |
|---|---|
run_id | 追踪是哪次执行生成的 |
created_by | 标识来源,如 autotest |
scenario | 属于哪个测试场景 |
expire_at | 过期时间,便于兜底清理 |
env | 防止跨环境误用 |
status | 标记可用、占用、失效 |
如果是池化数据,还建议增加:
| 字段 | 作用 |
|---|---|
in_use | 是否被占用 |
locked_by | 当前被谁占用 |
last_used_at | 最近使用时间 |
reuse_count | 复用次数,辅助淘汰 |
常见坑与排查
这部分我想写得更接地气一点,因为很多问题真不是“不会写代码”,而是治理边界没定清楚。
坑 1:用例依赖共享账号,串行没事,并发就挂
现象
本地单跑通过,CI 并发跑时出现:
- 用户已锁定
- 账号状态不一致
- 下单金额被别的用例修改
根因
多个用例复用了同一个共享数据,但没有锁机制。
排查方法
- 检查失败用例是否都用同一批账号
- 查询数据表中是否有多个任务同时修改同一个对象
- 给请求日志加
run_id,看数据是否被交叉使用
处理建议
- 稀缺资源必须池化并加锁
- 普通场景尽量自建数据
- 不能并发复用的模板,改为“一次一份”
坑 2:清理逻辑只在成功路径执行
现象
测试失败后残留大量脏数据,第二天再跑成功率明显下降。
根因
清理代码写在 assert 之后,或者只在 happy path 调用。
排查方法
- 检查清理逻辑是否放在
finally/ fixture teardown - 看失败任务后数据库里是否留有同批
run_id数据
处理建议
清理逻辑必须是“兜底执行”,别放在业务步骤后面顺手写。
坑 3:通过“查询现有数据”凑场景
现象
脚本经常有这种逻辑:
SELECT * FROM users WHERE status = 'ACTIVE' LIMIT 1;
根因
这是典型的“碰运气取数”。今天查出来的数据能用,不代表明天也能用。
处理建议
把“查询”改成“声明需求”:
- 需要活跃用户,就创建活跃用户
- 需要待支付订单,就创建待支付订单
- 只有受限资源才从池里拿
坑 4:只做数据创建,不做状态校验
现象
创建接口返回成功,但后续业务步骤仍然失败。
根因
有些系统是异步落库、异步审批、异步索引。你以为数据“创建完成”,实际上只是“提交成功”。
排查方法
创建后补校验:
- 查 DB
- 查状态接口
- 轮询直到目标状态
处理建议
数据工厂不要只返回创建结果,还要负责确保数据进入可用状态。
坑 5:环境配置和数据模板强绑定
现象
同一份脚本换个环境就失败,因为模板商品、组织 ID、渠道号都变了。
根因
把环境特有数据直接写死在脚本里。
处理建议
把环境差异抽到配置层,比如:
test:
default_shop_id: 1001
default_channel_code: CH_TEST
staging:
default_shop_id: 3008
default_channel_code: CH_STG
而脚本里只引用逻辑名称,不直接写环境 ID。
一套实用的排查路径
当自动化失败时,我建议优先按下面顺序排查,而不是一上来就怀疑接口。
flowchart TD
A[自动化失败] --> B{是否与数据相关}
B -->|是| C[检查 run_id 与数据归属]
C --> D{数据是否存在}
D -->|否| E[创建阶段失败/异步未完成]
D -->|是| F{状态是否符合预期}
F -->|否| G[被其他任务污染/模板失效]
F -->|是| H{是否已被清理}
H -->|是| I[清理时机过早]
H -->|否| J[转向接口或业务逻辑排查]
B -->|否| J
这个顺序的好处是:
你能很快判断问题到底在“造数”“占用”“状态流转”还是“清理时机”。
安全/性能最佳实践
测试数据治理除了稳定性,还会碰到两个很现实的问题:数据安全和执行效率。
安全最佳实践
1. 不要在测试数据里使用真实敏感信息
即便是测试环境,也不要直接放真实手机号、身份证、银行卡号。
建议统一使用脱敏或专用号段。
2. 测试数据权限最小化
自动化账号只保留必要权限,不要顺手给管理员超级权限。否则一旦脚本误操作,影响范围会很大。
3. 对清理脚本增加环境保护
我见过最危险的事故之一,就是测试清理脚本误连到非测试环境。
最低限度也要做环境白名单校验。
例如:
def safe_cleanup(env_name: str):
allowed = {"test", "staging"}
if env_name not in allowed:
raise RuntimeError(f"cleanup blocked for env={env_name}")
print(f"cleanup allowed for {env_name}")
4. 日志不要打印敏感字段
如果日志里要输出测试数据,至少要对手机号、token、证件号做脱敏。
性能最佳实践
1. 不要所有数据都实时创建
实时创建很稳,但也很慢。
建议按下面策略混用:
- 基础主数据:预置
- 复杂场景数据:模板化
- 高风险数据:实时创建
- 稀缺资源:池化复用
这是中级项目里性价比最高的组合。
2. 尽量批量造数、批量清理
如果每天都要跑大量回归,逐条创建和逐条删除会拖慢执行。
能批处理就批处理。
3. 异步状态用轮询,不要硬等
比起 sleep(10),更推荐有超时上限的轮询。
import time
def wait_until(fetch_status, expected, timeout=10, interval=0.5):
start = time.time()
while time.time() - start < timeout:
if fetch_status() == expected:
return True
time.sleep(interval)
return False
4. 给数据工厂做缓存,但缓存要分层
例如地区、商品模板这些可以缓存;
但临时订单、临时用户绝不能缓存复用。
方案取舍:什么时候不用做太重
测试数据治理不是越复杂越好。中级项目最怕的是“为治理而治理”。
适合当前方案的场景
- 有几十到几百条自动化用例
- 已接入 CI,存在并发执行
- 多人维护脚本
- 测试环境经常被复用
- 业务存在账号、订单、库存等状态对象
暂时不必做太重的平台化场景
- 用例量很少
- 团队成员很少
- 基本没有并发执行
- 数据对象简单,且可完全重建环境
这时候一个轻量 Data Factory + run_id 清理机制,往往就够了。
没必要一上来就建设完整数据平台。
什么时候该升级到平台化
如果你开始出现这些信号,就该往平台走了:
- 稀缺数据种类越来越多
- 环境数量增加
- 清理规则复杂
- 需要可视化查看数据占用情况
- 自动化框架不止一种(接口/UI/性能共用)
落地步骤建议
如果你准备在团队里推动这件事,我建议不要一次性大改,而是分三步走。
第一步:统一标识
先给所有新建测试数据加上:
run_idcreated_byenv
只做这一步,定位问题和清理难度都会立刻下降。
第二步:抽出数据工厂
把脚本里散落的“创建账号”“创建订单”“准备商品”提成统一方法。
先不追求平台化,先把重复逻辑收拢。
第三步:引入数据池和清理器
针对稀缺资源做池化,对临时数据做自动清理。
这一步做完,自动化稳定性通常会有明显提升。
总结
自动化测试里的测试数据治理,核心不是“准备更多数据”,而是建立一套可描述、可生成、可隔离、可回收的机制。
如果把这件事压缩成几条最可执行的建议,我会给你这份清单:
- 先分类数据:基础主数据、模板数据、临时数据、稀缺数据分开管理
- 优先用数据工厂:脚本描述“我要什么状态”,不要写死具体值
- 所有测试数据带
run_id:这是追踪和清理的基础 - 稀缺资源做池化加锁:别再靠共享账号硬撑并发
- 清理逻辑必须兜底执行:写进
finally或 fixture teardown - 环境差异放配置层:别把环境 ID 写进用例
- 别过度设计:中级项目先把稳定性和复用做好,再考虑平台化
最后给一个边界判断:
如果你的项目还在“脚本偶尔跑一下”的阶段,先别把体系搞太大;
但如果你的自动化已经进入 CI、多人协作、并发执行阶段,那么测试数据治理就不再是“优化项”,而是稳定交付的基础设施。
很多自动化问题,最后都不是脚本写得不够聪明,而是数据没有被当成一等公民去管理。
一旦你把这件事做对,脚本稳定性、维护效率、排障速度,都会一起上来。