区块链数据索引实战:基于 The Graph 构建可查询的链上业务分析接口
很多团队第一次做链上分析时,都会先写一个脚本:连 RPC、扫事件、存数据库、再写几个接口。起步很快,但往后通常会越来越痛苦——重放区块、处理回滚、维护游标、补历史数据、做分页查询、兼容前端筛选条件……这些事情并不“高大上”,却非常消耗精力。
这篇文章我想换一个更实战的角度:不先讲概念堆砌,而是直接带你做一个可以查询的链上业务分析接口。我们基于 The Graph 构建一个简单但完整的索引项目,把 ERC-20 转账事件整理成可查询的数据模型,并且顺手做一些常见统计字段,方便后续接 BI、大屏、风控或运营分析。
文章面向已经懂一点 Solidity、事件日志、GraphQL 的中级读者。你不需要很懂 The Graph 的全部生态,但最好知道合约事件是什么、ABI 是什么。
背景与问题
为什么“直接查链”不适合业务分析
链上原始数据当然是真实来源,但它并不适合直接拿来做业务查询。原因很简单:
-
RPC 更擅长状态读取,不擅长复杂分析
- 查某个地址余额可以
- 但查“最近 30 天某代币 Top 100 活跃地址,并按净流入排序”就很痛苦
-
日志是按区块顺序存的,不是按业务维度组织的
- 事件是 append-only
- 业务查询却常常是“按地址”“按时间”“按某种聚合指标”来查
-
前端和分析系统需要结构化接口
- 分页、过滤、排序、关联查询
- 如果每个页面都直接打 RPC,延迟和复杂度都会失控
一个典型场景
假设你有一个 ERC-20 代币项目,想做这些接口:
- 查询最近 20 条转账记录
- 查询某个地址的转入/转出历史
- 查询某地址累计转入、累计转出、净流入
- 查询转账量最高的活跃地址
如果靠手搓脚本 + MySQL 当然能做,但你很快会遇到这些现实问题:
- 历史区块怎么补?
- 监听中断后如何续跑?
- 链回滚怎么办?
- 重复消费怎么去重?
- 如何暴露统一查询接口给前端?
The Graph 的价值就在这里:它把“链上事件 -> 实体存储 -> GraphQL 查询”这条链路标准化了。
前置知识与环境准备
你需要具备
- 会看 Solidity 合约事件
- 知道 ERC-20 的
Transfer(address,address,uint256)事件 - 对 GraphQL 查询语法有基本了解
- 本机有 Node.js、npm 或 yarn
本文使用的技术栈
- The Graph CLI
- Graph Node / Hosted Service / Subgraph Studio(任选部署方式)
- AssemblyScript 编写 mapping
- GraphQL 查询接口
- 一个标准 ERC-20 合约
环境安装
npm install -g @graphprotocol/graph-cli
初始化 subgraph 项目时,你也可以用:
graph init --studio erc20-transfer-analytics
如果你是第一次做,我建议先在 Subgraph Studio 或可用托管环境里跑通,再考虑自建 Graph Node。自建更灵活,但维护成本也更高。
核心原理
The Graph 的核心思路可以概括为一句话:
订阅链上事件,把原始日志映射成结构化实体,再通过 GraphQL 暴露查询能力。
数据流转过程
flowchart LR
A[区块链节点 / RPC] --> B[The Graph 监听合约事件]
B --> C[Mapping 解析事件]
C --> D[Entity 写入索引存储]
D --> E[GraphQL 查询接口]
E --> F[前端 / BI / 分析系统]
关键组成
-
subgraph.yaml
- 定义数据源、起始区块、ABI、事件处理函数
-
schema.graphql
- 定义最终要存的实体结构
-
mapping.ts
- 把链上事件转成实体记录
这套机制解决了什么
- 按业务维度重组链上数据
- 自动维护索引状态
- 可以声明式暴露 GraphQL 查询
- 对事件处理逻辑进行版本化管理
一个简化的执行时序
sequenceDiagram
participant Chain as Blockchain
participant Graph as Graph Node
participant Map as mapping.ts
participant Store as Store
participant API as GraphQL API
Chain->>Graph: 新区块 / 事件日志
Graph->>Map: 调用事件处理函数
Map->>Store: 创建或更新 Entity
API->>Store: 查询 entities
Store-->>API: 返回结构化结果
数据建模:先想清楚“查什么”,再决定“存什么”
很多人一上来就把事件原样存下来,后面再补统计字段。这样不是不行,但我个人经验是:先从查询需求倒推 schema,会少走很多弯路。
本文我们定义两个实体:
TransferRecord:每一笔转账明细AccountStat:每个地址的累计统计
schema.graphql
type TransferRecord @entity {
id: ID!
txHash: Bytes!
logIndex: BigInt!
blockNumber: BigInt!
timestamp: BigInt!
token: Bytes!
from: Bytes!
to: Bytes!
value: BigInt!
}
type AccountStat @entity {
id: ID!
address: Bytes!
totalIn: BigInt!
totalOut: BigInt!
transferInCount: BigInt!
transferOutCount: BigInt!
netFlow: BigInt!
updatedAt: BigInt!
}
为什么这样设计
TransferRecord.id
一笔链上事件最稳妥的唯一标识,通常用:
txHash-logIndex
因为同一笔交易里可能有多个 Transfer,只用 txHash 会冲突。
AccountStat
这个实体是典型的“索引时预聚合”。
它的好处是:
- 查询某地址统计时不用每次扫全量明细
- 排行榜类接口也更容易做
- 前端打开地址页会更快
当然,边界也要知道:
- 如果你需要窗口期统计(如最近 7 天),只存累计值不够
- 这时还要结合明细表或额外做周期聚合实体
实战代码(可运行)
下面我们做一个最小可运行版本,索引某 ERC-20 合约的 Transfer 事件。
示例代码基于 The Graph 常见项目结构,重点展示核心文件。
第一步:定义 subgraph.yaml
specVersion: 1.0.0
schema:
file: ./schema.graphql
dataSources:
- kind: ethereum
name: ERC20Token
network: mainnet
source:
address: "0xA0b86991c6218b36c1d19d4a2e9eb0ce3606eb48"
abi: ERC20
startBlock: 6082465
mapping:
kind: ethereum/events
apiVersion: 0.0.7
language: wasm/assemblyscript
entities:
- TransferRecord
- AccountStat
abis:
- name: ERC20
file: ./abis/ERC20.json
eventHandlers:
- event: Transfer(indexed address,indexed address,uint256)
handler: handleTransfer
file: ./src/mapping.ts
这里用的是 USDC 主网地址作为示例。你在实际项目里换成自己的合约地址即可。
几个关键字段解释
startBlock- 非常重要
- 不要从创世块开始扫,能明确起点就明确起点
eventHandlers- 把合约事件绑定到 mapping 函数
network- 要和实际部署链一致
第二步:准备 ABI
你需要在 ./abis/ERC20.json 放入合约 ABI,最少包含 Transfer 事件定义。
简化后的 ABI 片段如下:
[
{
"anonymous": false,
"inputs": [
{ "indexed": true, "internalType": "address", "name": "from", "type": "address" },
{ "indexed": true, "internalType": "address", "name": "to", "type": "address" },
{ "indexed": false, "internalType": "uint256", "name": "value", "type": "uint256" }
],
"name": "Transfer",
"type": "event"
}
]
第三步:编写 mapping.ts
这是最核心的部分。我们在这里把事件映射成实体,并维护账户统计。
import { BigInt } from "@graphprotocol/graph-ts"
import { Transfer } from "../generated/ERC20Token/ERC20"
import { TransferRecord, AccountStat } from "../generated/schema"
function getOrCreateAccountStat(accountId: string, timestamp: BigInt): AccountStat {
let stat = AccountStat.load(accountId)
if (stat == null) {
stat = new AccountStat(accountId)
stat.address = Bytes.fromHexString(accountId) as Bytes
stat.totalIn = BigInt.zero()
stat.totalOut = BigInt.zero()
stat.transferInCount = BigInt.zero()
stat.transferOutCount = BigInt.zero()
stat.netFlow = BigInt.zero()
stat.updatedAt = timestamp
}
return stat as AccountStat
}
export function handleTransfer(event: Transfer): void {
let id = event.transaction.hash.toHexString() + "-" + event.logIndex.toString()
let record = new TransferRecord(id)
record.txHash = event.transaction.hash
record.logIndex = event.logIndex
record.blockNumber = event.block.number
record.timestamp = event.block.timestamp
record.token = event.address
record.from = event.params.from
record.to = event.params.to
record.value = event.params.value
record.save()
let fromId = event.params.from.toHexString()
let fromStat = getOrCreateAccountStat(fromId, event.block.timestamp)
fromStat.totalOut = fromStat.totalOut.plus(event.params.value)
fromStat.transferOutCount = fromStat.transferOutCount.plus(BigInt.fromI32(1))
fromStat.netFlow = fromStat.totalIn.minus(fromStat.totalOut)
fromStat.updatedAt = event.block.timestamp
fromStat.save()
let toId = event.params.to.toHexString()
let toStat = getOrCreateAccountStat(toId, event.block.timestamp)
toStat.totalIn = toStat.totalIn.plus(event.params.value)
toStat.transferInCount = toStat.transferInCount.plus(BigInt.fromI32(1))
toStat.netFlow = toStat.totalIn.minus(toStat.totalOut)
toStat.updatedAt = event.block.timestamp
toStat.save()
}
注意一个容易忽略的问题
上面代码里用到了 Bytes,因此完整导入应为:
import { BigInt, Bytes } from "@graphprotocol/graph-ts"
也就是说,你的最终可运行版本应使用下面这份:
import { BigInt, Bytes } from "@graphprotocol/graph-ts"
import { Transfer } from "../generated/ERC20Token/ERC20"
import { TransferRecord, AccountStat } from "../generated/schema"
function getOrCreateAccountStat(accountId: string, timestamp: BigInt): AccountStat {
let stat = AccountStat.load(accountId)
if (stat == null) {
stat = new AccountStat(accountId)
stat.address = Bytes.fromHexString(accountId) as Bytes
stat.totalIn = BigInt.zero()
stat.totalOut = BigInt.zero()
stat.transferInCount = BigInt.zero()
stat.transferOutCount = BigInt.zero()
stat.netFlow = BigInt.zero()
stat.updatedAt = timestamp
}
return stat as AccountStat
}
export function handleTransfer(event: Transfer): void {
let id = event.transaction.hash.toHexString() + "-" + event.logIndex.toString()
let record = new TransferRecord(id)
record.txHash = event.transaction.hash
record.logIndex = event.logIndex
record.blockNumber = event.block.number
record.timestamp = event.block.timestamp
record.token = event.address
record.from = event.params.from
record.to = event.params.to
record.value = event.params.value
record.save()
let fromId = event.params.from.toHexString()
let fromStat = getOrCreateAccountStat(fromId, event.block.timestamp)
fromStat.totalOut = fromStat.totalOut.plus(event.params.value)
fromStat.transferOutCount = fromStat.transferOutCount.plus(BigInt.fromI32(1))
fromStat.netFlow = fromStat.totalIn.minus(fromStat.totalOut)
fromStat.updatedAt = event.block.timestamp
fromStat.save()
let toId = event.params.to.toHexString()
let toStat = getOrCreateAccountStat(toId, event.block.timestamp)
toStat.totalIn = toStat.totalIn.plus(event.params.value)
toStat.transferInCount = toStat.transferInCount.plus(BigInt.fromI32(1))
toStat.netFlow = toStat.totalIn.minus(toStat.totalOut)
toStat.updatedAt = event.block.timestamp
toStat.save()
}
第四步:生成代码与构建
graph codegen
graph build
如果 schema、ABI、mapping 有不一致,这一步通常就会报错。我的建议是不要怕报错,第一次搭建时,报错是最快的老师。
第五步:部署 subgraph
如果你使用 Studio,通常流程类似:
graph auth --studio <DEPLOY_KEY>
graph deploy --studio erc20-transfer-analytics
具体命令可能随版本略有变化,以你本地 CLI 提示为准。
第六步:通过 GraphQL 查询数据
查询最近 10 条转账记录
{
transferRecords(first: 10, orderBy: timestamp, orderDirection: desc) {
id
txHash
from
to
value
blockNumber
timestamp
}
}
查询某地址统计
{
accountStats(where: { id: "0x55fe002aeff02f77364de339a1292923a15844b8" }) {
id
address
totalIn
totalOut
transferInCount
transferOutCount
netFlow
updatedAt
}
}
查询净流入最高的地址
{
accountStats(first: 20, orderBy: netFlow, orderDirection: desc) {
id
totalIn
totalOut
netFlow
}
}
逐步验证清单
我很建议你不要一口气写完再部署,而是按下面顺序逐步验证:
验证 1:事件能否被正确捕获
- 部署后先查
transferRecords - 看是否有数据返回
- 看区块高度是否在持续推进
验证 2:唯一键是否正确
- 检查是否有
id冲突 - 确认同一交易多个日志不会覆盖
验证 3:统计值是否正确
- 随机找一个地址
- 抽几条链上交易手工核对
totalIn / totalOut
验证 4:排序和过滤是否符合预期
orderBy: timestampwhere: { from: ... }或where: { to: ... }
进阶:把查询接口包装成业务 API
虽然 The Graph 已经能直接提供 GraphQL,但在实际项目里,你通常还会加一层后端服务:
- 统一鉴权
- 聚合多个 subgraph 数据
- 做缓存
- 做字段兼容和业务语义转换
一个简单的 Node.js 查询示例如下:
const query = `
{
accountStats(first: 5, orderBy: netFlow, orderDirection: desc) {
id
totalIn
totalOut
netFlow
}
}
`;
async function fetchTopAccounts() {
const res = await fetch("https://api.thegraph.com/subgraphs/name/your-subgraph", {
method: "POST",
headers: {
"Content-Type": "application/json"
},
body: JSON.stringify({ query })
});
const data = await res.json();
console.log(JSON.stringify(data, null, 2));
}
fetchTopAccounts().catch(console.error);
如果你要对外提供 REST 风格接口,也可以在服务层再包一层:
import express from "express";
const app = express();
app.get("/api/top-netflow", async (req, res) => {
const query = `
{
accountStats(first: 10, orderBy: netFlow, orderDirection: desc) {
id
totalIn
totalOut
netFlow
}
}`;
const response = await fetch("https://api.thegraph.com/subgraphs/name/your-subgraph", {
method: "POST",
headers: {
"Content-Type": "application/json"
},
body: JSON.stringify({ query })
});
const result = await response.json();
res.json(result.data.accountStats);
});
app.listen(3000, () => {
console.log("server started at http://localhost:3000");
});
这样前端就不必直接面对 GraphQL,也更方便做权限控制和缓存。
常见坑与排查
这一节非常重要。The Graph 最花时间的,往往不是“写出来”,而是“为什么没出数据”。
1. startBlock 设置过早,导致同步极慢
现象
- 部署成功,但长时间看不到结果
- 同步高度推进很慢
原因
从非常早的区块开始扫描,尤其是热门合约,历史日志量巨大。
处理建议
- 尽量查到合约部署块
- 或至少从业务真实起始块开始
- 先用较近区块验证功能,再回补历史
2. id 设计不对,导致数据覆盖
现象
- 查询记录数偏少
- 明明一笔交易里有多个事件,最后只保留一条
原因
只用 txHash 作为主键。
正确做法
使用:
let id = event.transaction.hash.toHexString() + "-" + event.logIndex.toString()
3. schema 类型与 mapping 赋值不匹配
现象
graph codegen或graph build报类型错误- 部署后运行时报实体字段异常
常见例子
Bytes和String混用BigInt字段赋了普通数字- 可空字段和非空字段不一致
排查方法
逐个对照:
schema.graphql- ABI 生成类型
mapping.ts的字段赋值
4. 事件签名写错
现象
- subgraph 正常部署
- 但就是没有数据
原因
subgraph.yaml 里的事件签名必须和 ABI 完全一致。
例如 ERC-20 转账事件必须写成:
- event: Transfer(indexed address,indexed address,uint256)
handler: handleTransfer
多一个空格少一个类型,都可能导致匹配失败。
5. 忽略零地址场景,统计被污染
ERC-20 的 Transfer 事件里,零地址通常意味着:
from == 0x000...000:铸造to == 0x000...000:销毁
如果你把零地址也当普通账户统计,排行榜和净流入结果会非常怪。
改进思路
let zeroAddress = "0x0000000000000000000000000000000000000000"
if (fromId != zeroAddress) {
// 更新 from 统计
}
if (toId != zeroAddress) {
// 更新 to 统计
}
这属于我当时踩过的一个典型坑:数据全都有,但一做业务榜单就发现“零地址”永远名列前茅。
6. 查询不到最新数据,不一定是索引坏了
有时是这些原因:
- subgraph 还在同步
- 你的查询 endpoint 指向了旧版本
- 前端做了缓存
- 排序字段不是最新字段
建议优先确认:
- 当前同步高度
- 最新实体的
blockNumber - 是否部署到了正确环境
安全/性能最佳实践
The Graph 解决的是索引和查询,不代表你可以完全不考虑安全和性能。下面这些建议,是真正在生产里比较有用的。
1. 索引逻辑尽量“纯函数化”
mapping 中不要做复杂外部依赖假设,尽量只依赖:
- 当前事件
- 当前区块上下文
- 已存实体状态
这样回放和重建时结果更稳定。
2. 谨慎做高成本聚合
如果你在每个事件里都更新大量实体,会明显拖慢同步。
例如一个事件发生时:
- 更新明细 1 条
- 更新账户 2 条
- 再更新日统计、周统计、月统计、排行榜缓存……
这样很容易把 mapping 写得过重。
建议
优先级可以这样排:
- 必要明细
- 高频查询所需的轻量聚合
- 复杂报表交给下游服务二次加工
3. 控制实体设计复杂度
不要把一个实体做成“万能表”。
更稳妥的思路是:
- 明细实体负责还原事实
- 聚合实体负责服务查询场景
这比把几十个业务字段全塞进一张实体里更容易维护。
4. 为分页和排序预先设计查询路径
GraphQL 查询很灵活,但不是所有组合都高效。
例如你经常要做:
- 按时间倒序翻页
- 按地址过滤
- 查某段时间内的数据
那 schema 设计时就应考虑:
- 是否需要单独的日维度实体
- 是否需要把地址作为主实体来维护聚合状态
- 是否需要服务层缓存热点查询
5. 处理链回滚的认知要正确
The Graph 会考虑链重组和回滚场景,但前提是你的 mapping 逻辑本身具备可重放性。
也就是说:
- 不要依赖外部不可重现状态
- 不要在 mapping 里假设“事件一旦来过就永不变化”
- 尽量让实体更新是确定性的
6. 对外暴露 API 时加限流和缓存
如果你的前端、分析后台、第三方系统都直接打 subgraph,很容易把查询打爆,尤其是复杂 GraphQL。
建议在业务服务层做:
- 热点接口缓存
- 查询白名单
- 限流
- 超时控制
- 返回字段裁剪
一个更完整的数据结构演进思路
当你从“能查”走向“好用”,通常会经历下面这个阶段演进:
flowchart TD
A[只存 Transfer 明细] --> B[增加账户累计统计]
B --> C[增加按天聚合实体]
C --> D[服务层封装排行榜/报表接口]
D --> E[接入缓存与监控]
这个演进路径很实用,因为它符合大多数项目的真实节奏:
- 第一步先验证事件能不能稳定索引
- 第二步解决地址页、排行榜页性能问题
- 第三步解决时间窗口分析
- 第四步再做真正面向业务方的接口
不要一开始就想做成“全链通用分析平台”,那样很容易 schema 设计过度。
适用边界:什么时候 The Graph 很合适,什么时候未必
很合适的场景
- 基于事件的业务数据索引
- 地址、交易、合约维度的查询接口
- 前端展示类链上数据
- 中轻量分析型 API
- 固定业务规则的聚合指标
不一定适合的场景
- 超复杂 OLAP 分析
- 跨链、跨协议超大规模关联计算
- 强实时到毫秒级的风控联动
- 需要大量 ad-hoc SQL 的离线分析
这类场景通常会把 The Graph 当成一个上游索引层,然后把数据再送入 ClickHouse、BigQuery、Spark 或自建数仓体系。
换句话说:
The Graph 很适合做“业务可查询索引层”,但不必把它当成万能分析引擎。
总结
如果你正在做链上业务分析接口,我的建议很直接:
-
先从查询需求反推 schema
- 不要只机械存事件
- 想清楚前端和业务方到底要查什么
-
先做最小可运行闭环
- 一个合约
- 一个事件
- 两个实体
- 几个 GraphQL 查询
- 先跑通,再扩展
-
把聚合控制在“够用”范围
- 高频查询指标可以在索引时维护
- 复杂报表留给下游服务
-
重视主键、起始区块、零地址、类型匹配
- 这些看似小问题,往往就是 80% 排障时间的来源
-
生产上最好包一层业务 API
- 做鉴权、缓存、限流、版本控制
- 不要让前端无限制直连复杂 GraphQL
从工程实践看,The Graph 最大的价值不是“替你查链”,而是帮你把链上原始事件整理成可持续维护、可面向业务消费的查询接口。如果你的目标是做仪表盘、地址分析页、排行榜、基础运营报表,它通常是非常高效的一层。
如果你现在正准备动手,我建议就照本文这个最小例子开始:先索引一个 ERC-20 的 Transfer,再把账户累计统计做出来。 一旦这个闭环通了,你再扩展到 DeFi 协议事件、NFT 交易、清算记录,思路其实是一样的。