区块链节点数据索引实战:基于 The Graph 构建可查询的链上业务数据服务
很多团队第一次做链上业务时,都会先用节点 RPC 直接查数据:查某个地址的交易、查合约事件、查 NFT 持仓、查订单状态。刚开始看起来没问题,但一旦业务稍微复杂一点,问题就全来了:
eth_call能拿到当前状态,却不擅长做历史聚合eth_getLogs能拉事件,但分页、去重、重组、补块都得自己处理- 前端想要“某用户最近 30 天成交额排行”,节点并不会直接给你
- 后台如果高频扫链,RPC 很容易成为瓶颈,成本也会一路上涨
这时候,索引层就变成链上应用的“第二条命”。而 The Graph,恰好是目前最常见、最成熟的一套链上数据索引方案之一。
这篇文章我不打算只讲概念,而是带你从“为什么要做索引”一路走到“怎么真正写出一个能查的子图(Subgraph)”。我们会用一个很典型的例子:索引一个链上转账事件,并把用户转账统计成可查询的数据服务。
背景与问题
先看一个真实业务问题。
假设你有一个 ERC20 代币,产品提了几个需求:
- 查询某个地址的所有转账记录
- 查询某个地址累计转入、累计转出
- 查询全网最近活跃用户
- 按天统计转账次数与金额
- 前端能通过一个接口直接拿到结果,而不是自己扫链
如果不用 The Graph,常见做法大概是:
- 后端定时从某个区块开始调用
eth_getLogs - 把
Transfer事件塞进数据库 - 自己维护区块游标、重试逻辑、数据幂等
- 自己做实体关系建模
- 再对外提供 REST 或 GraphQL API
这不是不能做,而是重复劳动很多。尤其是:
- 链重组(reorg)处理很容易漏
- 事件回滚的语义复杂
- 多合约、多网络后维护成本高
- SQL 表结构和索引策略也要自己设计
The Graph 的价值,就是把“监听链上事件 → 映射成业务实体 → 对外 GraphQL 查询”这一整套流程标准化。
核心原理
先用一张图看 The Graph 的工作流。
flowchart LR
A[区块链节点 RPC] --> B[Graph Node]
B --> C[读取 Subgraph Manifest]
C --> D[监听合约事件]
D --> E[AssemblyScript Mapping 处理]
E --> F[写入实体存储]
F --> G[GraphQL 查询服务]
G --> H[前端/后端业务系统]
它到底做了什么?
The Graph 的核心思想其实很直白:
- 告诉它要监听哪些合约、哪些事件
- 收到事件后,用 mapping 代码把事件转换成实体
- 把实体存到索引数据库中
- 通过 GraphQL 对这些实体进行查询
这里有三个关键文件:
schema.graphql:定义你要存什么数据subgraph.yaml:定义监听哪些链、哪些合约、哪些事件src/mapping.ts:定义收到事件后如何写入实体
你可以把它理解成:
schema.graphql像数据库模型subgraph.yaml像订阅配置mapping.ts像消费事件的 ETL 逻辑
前置知识与环境准备
建议你至少熟悉这些内容:
- Solidity 事件(event)与 ABI
- ERC20 的
Transfer事件 - GraphQL 基本查询语法
- Node.js / npm 基本使用
- Docker 基础命令(如果本地跑 Graph Node)
本文环境
以下环境比较稳:
- Node.js 18+
- Docker / Docker Compose
- Graph CLI
- 一个可访问的 EVM RPC 节点
- 一个已经部署好的 ERC20 合约地址
安装 Graph CLI:
npm install -g @graphprotocol/graph-cli
如果你想本地跑完整索引服务,通常会需要:
- graph-node
- postgres
- ipfs
一个常见方式是用 Docker Compose 起服务。
示例业务:索引 ERC20 Transfer 事件
这篇教程我们围绕一个简单但够实用的目标:
- 监听某 ERC20 合约的
Transfer(address,address,uint256)事件 - 存储每笔转账记录
- 为每个用户维护累计转入、累计转出、交易次数
- 支持 GraphQL 查询用户画像与交易明细
最终会产出的实体
我们设计两个实体:
TransferRecord:每一笔转账AccountStat:每个账户的统计信息
第一步:初始化子图项目
你可以用 CLI 初始化,也可以手动创建。这里为了让结构更清楚,我直接给出一个最小可运行版本的目录:
erc20-transfer-subgraph/
├── abis/
│ └── ERC20.json
├── schema.graphql
├── subgraph.yaml
├── package.json
├── tsconfig.json
└── src/
└── mapping.ts
package.json
{
"name": "erc20-transfer-subgraph",
"version": "1.0.0",
"private": true,
"scripts": {
"codegen": "graph codegen",
"build": "graph build",
"create-local": "graph create --node http://localhost:8020/ erc20-transfer-subgraph",
"deploy-local": "graph deploy --node http://localhost:8020/ --ipfs http://localhost:5001 erc20-transfer-subgraph"
},
"devDependencies": {
"@graphprotocol/graph-cli": "^0.66.0",
"@graphprotocol/graph-ts": "^0.33.0",
"typescript": "^5.3.3"
}
}
tsconfig.json
{
"extends": "@graphprotocol/graph-ts/tsconfig.json",
"compilerOptions": {
"target": "es2020",
"lib": ["es2020"],
"strict": true
}
}
第二步:定义数据模型 schema.graphql
type TransferRecord @entity(immutable: true) {
id: Bytes!
txHash: Bytes!
blockNumber: BigInt!
timestamp: BigInt!
from: Bytes!
to: Bytes!
value: BigInt!
}
type AccountStat @entity {
id: Bytes!
totalIn: BigInt!
totalOut: BigInt!
transferInCount: BigInt!
transferOutCount: BigInt!
lastActiveAt: BigInt!
}
设计说明
这里有几个点值得注意:
TransferRecord.id使用Bytes!,通常可以直接用交易哈希 + logIndex 组合生成TransferRecord设置成immutable: true,因为链上事件记录一旦确认,通常不应被业务反复更新AccountStat则是可变实体,因为每来一笔交易都要累加统计
第三步:配置 subgraph.yaml
下面这个配置监听一个 ERC20 合约的 Transfer 事件。你需要把合约地址和起始区块换成自己的。
specVersion: 1.0.0
indexerHints:
prune: auto
schema:
file: ./schema.graphql
dataSources:
- kind: ethereum
name: ERC20Token
network: mainnet
source:
address: "0xYourTokenAddressHere"
abi: ERC20
startBlock: 19000000
mapping:
kind: ethereum/events
apiVersion: 0.0.7
language: wasm/assemblyscript
entities:
- TransferRecord
- AccountStat
abis:
- name: ERC20
file: ./abis/ERC20.json
eventHandlers:
- event: Transfer(indexed address,indexed address,uint256)
handler: handleTransfer
file: ./src/mapping.ts
关于 startBlock
这是个非常重要的字段。
如果你把 startBlock 设得太早:
- 初次同步会非常慢
- 可能扫很多和业务无关的数据
- RPC 成本和索引耗时都会上升
如果你设得太晚:
- 会漏历史数据
我的建议是:从合约部署块开始,或者从业务真正上线的块高度开始,不要偷懒写成 0。
第四步:准备 ABI
abis/ERC20.json 至少需要包含 Transfer 事件定义。最小示例:
[
{
"anonymous": false,
"inputs": [
{ "indexed": true, "internalType": "address", "name": "from", "type": "address" },
{ "indexed": true, "internalType": "address", "name": "to", "type": "address" },
{ "indexed": false, "internalType": "uint256", "name": "value", "type": "uint256" }
],
"name": "Transfer",
"type": "event"
}
]
如果你后续还要调用合约只读方法,比如 symbol()、decimals(),则 ABI 中还要补上对应函数定义。
第五步:编写 Mapping 逻辑
这是最关键的一步。收到事件后,我们要:
- 生成唯一的
TransferRecord - 分别更新
from和to的AccountStat
src/mapping.ts
import { BigInt, Bytes } from "@graphprotocol/graph-ts";
import { Transfer } from "../generated/ERC20Token/ERC20";
import { TransferRecord, AccountStat } from "../generated/schema";
function getOrCreateAccountStat(account: Bytes, timestamp: BigInt): AccountStat {
let entity = AccountStat.load(account);
if (entity == null) {
entity = new AccountStat(account);
entity.totalIn = BigInt.zero();
entity.totalOut = BigInt.zero();
entity.transferInCount = BigInt.zero();
entity.transferOutCount = BigInt.zero();
entity.lastActiveAt = timestamp;
}
return entity;
}
export function handleTransfer(event: Transfer): void {
let recordId = event.transaction.hash.concatI32(event.logIndex.toI32());
let record = new TransferRecord(recordId);
record.txHash = event.transaction.hash;
record.blockNumber = event.block.number;
record.timestamp = event.block.timestamp;
record.from = event.params.from;
record.to = event.params.to;
record.value = event.params.value;
record.save();
let fromStat = getOrCreateAccountStat(event.params.from, event.block.timestamp);
fromStat.totalOut = fromStat.totalOut.plus(event.params.value);
fromStat.transferOutCount = fromStat.transferOutCount.plus(BigInt.fromI32(1));
fromStat.lastActiveAt = event.block.timestamp;
fromStat.save();
let toStat = getOrCreateAccountStat(event.params.to, event.block.timestamp);
toStat.totalIn = toStat.totalIn.plus(event.params.value);
toStat.transferInCount = toStat.transferInCount.plus(BigInt.fromI32(1));
toStat.lastActiveAt = event.block.timestamp;
toStat.save();
}
代码说明
这里有几个实战点:
1. 为什么 id 用 txHash + logIndex
单独用交易哈希不够,因为一笔交易里可能有多个 Transfer 日志。
所以常见做法是:
transaction.hash + logIndex- 或
blockNumber + txHash + logIndex
这样唯一性更稳。
2. 为什么 AccountStat.id 直接用地址
因为我们是按地址聚合,地址本身天然就是唯一键。
3. 为什么不直接做十进制换算
在索引层,尽量存原始整数值,比如 ERC20 的最小单位。
格式化成 1.23 TOKEN 这类展示逻辑,更适合放在查询层或前端处理。
这一点我很建议养成习惯,否则后面处理精度问题会很烦。
第六步:生成类型并构建
执行:
npm install
npm run codegen
npm run build
如果一切正常,会生成 generated/ 目录以及编译产物。
第七步:本地启动 Graph Node
下面给一个可参考的 docker-compose.yml。
version: "3.8"
services:
postgres:
image: postgres:14
environment:
POSTGRES_USER: graph
POSTGRES_PASSWORD: let-me-in
POSTGRES_DB: graph-node
ports:
- "5432:5432"
ipfs:
image: ipfs/kubo:v0.24.0
ports:
- "5001:5001"
graph-node:
image: graphprotocol/graph-node:v0.35.1
depends_on:
- postgres
- ipfs
ports:
- "8000:8000"
- "8001:8001"
- "8020:8020"
- "8030:8030"
- "8040:8040"
environment:
postgres_host: postgres
postgres_user: graph
postgres_pass: let-me-in
postgres_db: graph-node
ipfs: "ipfs:5001"
ethereum: "mainnet:http://host.docker.internal:8545"
GRAPH_LOG: info
启动:
docker compose up -d
关于 RPC 地址
ethereum: "mainnet:http://host.docker.internal:8545" 的意思是:
- Graph Node 会通过这个地址访问你的链节点
- 如果你本机有本地节点或代理,可以这么配
- 如果你用远程 RPC,比如 Infura、Alchemy、自建节点,也可以直接填完整 URL
第八步:创建并部署子图
npm run create-local
npm run deploy-local
部署成功后,GraphQL 查询接口通常在:
http://localhost:8000/subgraphs/name/erc20-transfer-subgraph
第九步:验证查询结果
查询转账记录
{
transferRecords(first: 5, orderBy: timestamp, orderDirection: desc) {
id
txHash
from
to
value
timestamp
blockNumber
}
}
查询某地址统计
{
accountStat(id: "0x1234567890abcdef1234567890abcdef12345678") {
id
totalIn
totalOut
transferInCount
transferOutCount
lastActiveAt
}
}
查询最近活跃账户
{
accountStats(first: 10, orderBy: lastActiveAt, orderDirection: desc) {
id
totalIn
totalOut
transferInCount
transferOutCount
lastActiveAt
}
}
数据流转过程梳理
如果你对“事件怎么变成可查数据”还是有点抽象,可以看这张时序图。
sequenceDiagram
participant Chain as 区块链
participant Node as Graph Node
participant Map as mapping.ts
participant Store as Entity Store
participant Client as GraphQL Client
Chain->>Node: 新区块 / Transfer 事件
Node->>Map: 调用 handleTransfer(event)
Map->>Store: 保存 TransferRecord
Map->>Store: 更新 from AccountStat
Map->>Store: 更新 to AccountStat
Client->>Node: GraphQL 查询
Node->>Client: 返回索引后的业务数据
逐步验证清单
我做 The Graph 项目时,通常不会一口气写完所有逻辑再测试,而是按下面这个节奏走,省很多时间:
验证 1:Manifest 和 ABI 是否匹配
检查:
subgraph.yaml里的事件签名是否和 ABI 完全一致- 合约地址是否正确
- 网络名称是否正确
startBlock是否合理
验证 2:Codegen 是否通过
执行:
npm run codegen
如果这里报错,通常说明:
schema.graphql有语法问题subgraph.yaml实体名和 schema 对不上- ABI 解析有问题
验证 3:Build 是否通过
执行:
npm run build
如果这里报错,通常是 AssemblyScript 类型问题,比如:
null处理不规范Bytes/Address/string类型混用- 使用了 JS 标准库里不受支持的能力
验证 4:部署后查看同步状态
Graph Node 管理接口常可查看部署状态。你也可以直接看日志:
docker logs -f <graph-node-container-id>
关注这些关键词:
- syncing
- failed
- deterministic error
- non-deterministic error
验证 5:先查最小结果集
一开始别上来就写很复杂的 GraphQL,先查:
{
transferRecords(first: 1) {
id
}
}
只要这一步有结果,后面再逐步扩字段。
常见坑与排查
这一节很重要。我当时第一次写子图时,时间大头都花在排坑,不是在写逻辑。
1. 事件签名写错,导致一个事件都没进来
比如你写成:
event: Transfer(address,address,uint256)
但 ABI 里其实是:
event Transfer(address indexed from, address indexed to, uint256 value);
The Graph 里事件声明通常要完整对齐 indexed 结构:
event: Transfer(indexed address,indexed address,uint256)
排查方式
- 对照 ABI 原文
- 确认大小写和参数顺序
- 看 Graph Node 日志是否提示找不到 handler 对应事件
2. startBlock 太早,同步慢得像没动
症状:
- 部署成功,但长时间查不到结果
- 日志里一直在处理早期区块
解决建议
- 改成合约部署块
- 如果只关心近期数据,从业务生效块开始
- 本地调试时先用更晚的块快速验证逻辑
3. ID 冲突导致数据被覆盖
比如你错误地把 TransferRecord.id = event.transaction.hash。
那同一笔交易中的多条日志就会互相覆盖。
正确做法
let recordId = event.transaction.hash.concatI32(event.logIndex.toI32());
4. 地址大小写导致查询不到实体
链上地址在不同系统里可能表现为:
- checksum address
- 全小写
Bytes存储形式
如果你的 id 定义成 Bytes!,查询时就要按 Graph 支持的格式传值。
很多人这里会因为大小写或类型不一致,误以为数据没进来。
建议
- 统一使用
Bytes做地址主键 - 前端封装一层地址规范化
- 查询前确认实体 ID 格式
5. BigInt 精度与展示混淆
索引层保存的是原始链上值,例如 ERC20 的 value 可能是 18 位精度整数。
如果你在 mapping 中强行转成小数文本:
- 可能增加复杂度
- 影响后续聚合
- 不利于多 token 扩展
建议
- 存原始整数
- 单独存 token metadata(如 decimals)
- 查询层或前端再格式化
6. Mapping 里用了不被支持的 TS/JS 能力
AssemblyScript 不是完整 TypeScript 运行时。
一些你在 Node.js 里习以为常的写法,在这里不能直接用。
常见表现:
- build 失败
- 类型报错
- 运行时报 deterministic error
建议
- 尽量用
@graphprotocol/graph-ts提供的类型和工具 - 少依赖复杂语言特性
- 逻辑保持纯粹、确定性
7. 调用合约只读方法时失败
有些项目会在事件处理里顺便调用合约,比如拿 symbol() 或 name()。
这不是一定不行,但有风险:
- 合约可能在某些块返回失败
- 代理合约 ABI 不匹配
- 非标准 ERC20 行为不一致
建议
- 能从事件拿的数据,优先从事件拿
- 对可失败调用使用
try_前缀方法 - 对失败结果做兜底逻辑
示例:
// 伪示例:若生成代码中有 try_symbol 方法
// let contract = ERC20.bind(event.address);
// let symbolResult = contract.try_symbol();
// if (!symbolResult.reverted) {
// // 使用 symbolResult.value
// }
安全/性能最佳实践
虽然子图不像智能合约那样直接“管钱”,但它仍然是业务数据基础设施,安全性和性能都不能掉以轻心。
1. 保持 Mapping 的确定性
The Graph 要求索引逻辑具备确定性。
也就是说,同一条链上数据在同样输入下,必须生成同样结果。
不建议做的事
- 依赖外部 HTTP 请求
- 使用当前系统时间
- 引入随机数
- 依赖不稳定的外部状态
原则
Mapping 只根据链上事件、区块、交易上下文来产出数据。
2. 只索引真正需要的实体
很多团队刚开始建 schema 时特别兴奋,什么都想存:
- 原始日志存一份
- 解析后实体存一份
- 各种聚合表再存一份
- 再按天、按周、按月各来一套
结果就是:
- 同步慢
- 存储大
- schema 维护困难
我的建议
先问自己两个问题:
- 这个字段是否会被查询?
- 这个聚合是否必须在索引时完成?
如果答案都不确定,就先别存。
3. 谨慎做重计算型聚合
比如每来一笔交易,就全量重算“用户排名”或“全局前十”,这种写法在索引层很容易变重。
更好的做法
- 子图中存基础事实和轻量聚合
- 排行榜类查询放到上层服务或缓存层做
- 对高频榜单引入异步预计算
4. 合理拆分子图
当业务越来越复杂时,可能会遇到:
- 合约很多
- 事件种类多
- 查询模型差异大
- 不同数据更新频率差异明显
这时不要强行把所有内容塞进一个超大子图。
拆分思路
- 按业务域拆:交易、质押、治理、NFT
- 按网络拆:Ethereum、Arbitrum、Base
- 按查询用途拆:明细索引、聚合分析
5. 控制实体更新频率
如果一个实体每个事件都要被频繁更新,它会成为索引热点。
例如你设计一个全局单例实体:
type GlobalStat @entity {
id: ID!
totalTransferCount: BigInt!
}
每来一条转账都更新它,虽然可行,但写入非常集中。
替代方案
- 按天分桶,如
DailyStat - 按账户维度拆散
- 对全局聚合改为查询层统计
6. 对查询层做边界控制
The Graph 提供的是 GraphQL 查询能力,但并不意味着前端可以无节制地查。
风险
- 一次查太多字段
- 深层嵌套查询
- 大分页导致响应慢
建议
- 前端固定查询模板
- 对外暴露 BFF 或 API Gateway 做限流
- 大分页改成游标式或分段查询
一个更完整的建模思路
随着业务深入,你大概率不会只满足于 TransferRecord + AccountStat 两张表。
更常见的演进路径是下面这样:
classDiagram
class TransferRecord {
+Bytes id
+Bytes txHash
+BigInt blockNumber
+BigInt timestamp
+Bytes from
+Bytes to
+BigInt value
}
class AccountStat {
+Bytes id
+BigInt totalIn
+BigInt totalOut
+BigInt transferInCount
+BigInt transferOutCount
+BigInt lastActiveAt
}
class DailyTransferStat {
+String id
+BigInt dayStart
+BigInt transferCount
+BigInt transferVolume
}
AccountStat --> TransferRecord : related by address
DailyTransferStat --> TransferRecord : aggregated from
比如你后续可能继续增加:
DailyTransferStat:按天统计转账量TokenMeta:token 名称、符号、精度HolderSnapshot:某些关键高度的持仓快照
但建议是:先让最小模型稳定跑通,再逐步加。
进阶扩展:按天聚合
如果你想让前端直接查“每日转账量曲线”,可以在 Mapping 中加入按天聚合实体。
schema 增加
type DailyTransferStat @entity {
id: String!
dayStart: BigInt!
transferCount: BigInt!
transferVolume: BigInt!
}
mapping 增加逻辑
import { BigInt } from "@graphprotocol/graph-ts";
import { DailyTransferStat } from "../generated/schema";
function getDayStart(timestamp: BigInt): BigInt {
let secondsInDay = BigInt.fromI32(86400);
return timestamp.div(secondsInDay).times(secondsInDay);
}
function updateDailyStat(timestamp: BigInt, value: BigInt): void {
let dayStart = getDayStart(timestamp);
let id = dayStart.toString();
let daily = DailyTransferStat.load(id);
if (daily == null) {
daily = new DailyTransferStat(id);
daily.dayStart = dayStart;
daily.transferCount = BigInt.zero();
daily.transferVolume = BigInt.zero();
}
daily.transferCount = daily.transferCount.plus(BigInt.fromI32(1));
daily.transferVolume = daily.transferVolume.plus(value);
daily.save();
}
然后在 handleTransfer 中调用:
updateDailyStat(event.block.timestamp, event.params.value);
查询每日统计
{
dailyTransferStats(first: 30, orderBy: dayStart, orderDirection: desc) {
dayStart
transferCount
transferVolume
}
}
这个模式非常适合:
- 折线图
- 柱状图
- 活跃度趋势
- 日级别业务看板
什么时候不适合用 The Graph?
虽然 The Graph 很强,但不是所有场景都适合。
不太适合的情况
1. 强实时、毫秒级响应要求
The Graph 更适合“近实时索引查询”,不是撮合引擎那种毫秒级系统。
2. 特别复杂的跨实体分析
如果你要做很重的分析型查询,可能还是数据仓库、ClickHouse、Elasticsearch 更合适。
3. 大量链下数据与链上数据强耦合
如果链下订单、风控、用户系统才是核心,The Graph 更适合作为链上事实层,而不是唯一数据源。
更准确的定位
你可以把 The Graph 当成:
- 链上事件的标准化索引层
- 面向业务查询的 GraphQL 数据服务
- 链上 OLTP/查询加速层
而不是万能数据库。
总结
如果把这篇文章压缩成一句话,那就是:
The Graph 的核心价值,不只是“能查链上数据”,而是帮你把链上事件沉淀成稳定、可维护、可直接服务业务的查询模型。
这次我们完成了一个完整闭环:
- 明确链上直接查数据的痛点
- 理解 The Graph 的工作原理
- 编写
schema.graphql - 配置
subgraph.yaml - 实现
mapping.ts - 本地部署 Graph Node
- 通过 GraphQL 查询业务数据
- 了解常见坑和性能边界
给你的可执行建议
如果你准备在项目里真正落地,我建议按下面顺序推进:
- 先选一个最小业务场景
- 例如只索引 ERC20
Transfer
- 例如只索引 ERC20
- 先建明细实体,再建轻量聚合
- 不要一上来做复杂报表
- 从合约部署块开始索引
- 避免无意义的全链扫描
- 所有金额先存原始整数
- 展示层再格式化
- Mapping 保持简单、确定性、幂等思维
- 少做“聪明但脆弱”的逻辑
- 先本地跑通,再部署到正式环境
- 日志和同步状态一定要盯
边界条件也要清楚
The Graph 很适合:
- 钱包资产页
- 交易历史
- 用户行为画像
- 运营看板
- 协议数据 API
但如果你要的是:
- 超重分析查询
- 极致低延迟
- 大规模链下联动事务
那它应该是体系中的一层,而不是全部。
如果你已经有一个合约和事件清单,不妨照着本文先做一个最小子图。只要第一次跑通,后面你会很自然地把更多链上业务都抽象进这个模式里。很多团队的数据服务,就是从这样一个小小的 Transfer 索引开始长起来的。