跳转到内容
123xiao | 无名键客

《区块链数据索引实战:基于 The Graph 构建可查询的链上业务分析接口》

字数: 0 阅读时长: 1 分钟

区块链数据索引实战:基于 The Graph 构建可查询的链上业务分析接口

很多团队第一次做链上分析时,都会先写一个脚本:连 RPC、扫事件、存数据库、再写几个接口。起步很快,但往后通常会越来越痛苦——重放区块、处理回滚、维护游标、补历史数据、做分页查询、兼容前端筛选条件……这些事情并不“高大上”,却非常消耗精力。

这篇文章我想换一个更实战的角度:不先讲概念堆砌,而是直接带你做一个可以查询的链上业务分析接口。我们基于 The Graph 构建一个简单但完整的索引项目,把 ERC-20 转账事件整理成可查询的数据模型,并且顺手做一些常见统计字段,方便后续接 BI、大屏、风控或运营分析。

文章面向已经懂一点 Solidity、事件日志、GraphQL 的中级读者。你不需要很懂 The Graph 的全部生态,但最好知道合约事件是什么、ABI 是什么。


背景与问题

为什么“直接查链”不适合业务分析

链上原始数据当然是真实来源,但它并不适合直接拿来做业务查询。原因很简单:

  1. RPC 更擅长状态读取,不擅长复杂分析

    • 查某个地址余额可以
    • 但查“最近 30 天某代币 Top 100 活跃地址,并按净流入排序”就很痛苦
  2. 日志是按区块顺序存的,不是按业务维度组织的

    • 事件是 append-only
    • 业务查询却常常是“按地址”“按时间”“按某种聚合指标”来查
  3. 前端和分析系统需要结构化接口

    • 分页、过滤、排序、关联查询
    • 如果每个页面都直接打 RPC,延迟和复杂度都会失控

一个典型场景

假设你有一个 ERC-20 代币项目,想做这些接口:

  • 查询最近 20 条转账记录
  • 查询某个地址的转入/转出历史
  • 查询某地址累计转入、累计转出、净流入
  • 查询转账量最高的活跃地址

如果靠手搓脚本 + MySQL 当然能做,但你很快会遇到这些现实问题:

  • 历史区块怎么补?
  • 监听中断后如何续跑?
  • 链回滚怎么办?
  • 重复消费怎么去重?
  • 如何暴露统一查询接口给前端?

The Graph 的价值就在这里:它把“链上事件 -> 实体存储 -> GraphQL 查询”这条链路标准化了。


前置知识与环境准备

你需要具备

  • 会看 Solidity 合约事件
  • 知道 ERC-20 的 Transfer(address,address,uint256) 事件
  • 对 GraphQL 查询语法有基本了解
  • 本机有 Node.js、npm 或 yarn

本文使用的技术栈

  • The Graph CLI
  • Graph Node / Hosted Service / Subgraph Studio(任选部署方式)
  • AssemblyScript 编写 mapping
  • GraphQL 查询接口
  • 一个标准 ERC-20 合约

环境安装

npm install -g @graphprotocol/graph-cli

初始化 subgraph 项目时,你也可以用:

graph init --studio erc20-transfer-analytics

如果你是第一次做,我建议先在 Subgraph Studio 或可用托管环境里跑通,再考虑自建 Graph Node。自建更灵活,但维护成本也更高。


核心原理

The Graph 的核心思路可以概括为一句话:

订阅链上事件,把原始日志映射成结构化实体,再通过 GraphQL 暴露查询能力。

数据流转过程

flowchart LR
    A[区块链节点 / RPC] --> B[The Graph 监听合约事件]
    B --> C[Mapping 解析事件]
    C --> D[Entity 写入索引存储]
    D --> E[GraphQL 查询接口]
    E --> F[前端 / BI / 分析系统]

关键组成

  1. subgraph.yaml

    • 定义数据源、起始区块、ABI、事件处理函数
  2. schema.graphql

    • 定义最终要存的实体结构
  3. mapping.ts

    • 把链上事件转成实体记录

这套机制解决了什么

  • 按业务维度重组链上数据
  • 自动维护索引状态
  • 可以声明式暴露 GraphQL 查询
  • 对事件处理逻辑进行版本化管理

一个简化的执行时序

sequenceDiagram
    participant Chain as Blockchain
    participant Graph as Graph Node
    participant Map as mapping.ts
    participant Store as Store
    participant API as GraphQL API

    Chain->>Graph: 新区块 / 事件日志
    Graph->>Map: 调用事件处理函数
    Map->>Store: 创建或更新 Entity
    API->>Store: 查询 entities
    Store-->>API: 返回结构化结果

数据建模:先想清楚“查什么”,再决定“存什么”

很多人一上来就把事件原样存下来,后面再补统计字段。这样不是不行,但我个人经验是:先从查询需求倒推 schema,会少走很多弯路。

本文我们定义两个实体:

  • TransferRecord:每一笔转账明细
  • AccountStat:每个地址的累计统计

schema.graphql

type TransferRecord @entity {
  id: ID!
  txHash: Bytes!
  logIndex: BigInt!
  blockNumber: BigInt!
  timestamp: BigInt!
  token: Bytes!
  from: Bytes!
  to: Bytes!
  value: BigInt!
}

type AccountStat @entity {
  id: ID!
  address: Bytes!
  totalIn: BigInt!
  totalOut: BigInt!
  transferInCount: BigInt!
  transferOutCount: BigInt!
  netFlow: BigInt!
  updatedAt: BigInt!
}

为什么这样设计

TransferRecord.id

一笔链上事件最稳妥的唯一标识,通常用:

  • txHash-logIndex

因为同一笔交易里可能有多个 Transfer,只用 txHash 会冲突。

AccountStat

这个实体是典型的“索引时预聚合”。

它的好处是:

  • 查询某地址统计时不用每次扫全量明细
  • 排行榜类接口也更容易做
  • 前端打开地址页会更快

当然,边界也要知道:

  • 如果你需要窗口期统计(如最近 7 天),只存累计值不够
  • 这时还要结合明细表或额外做周期聚合实体

实战代码(可运行)

下面我们做一个最小可运行版本,索引某 ERC-20 合约的 Transfer 事件。

示例代码基于 The Graph 常见项目结构,重点展示核心文件。


第一步:定义 subgraph.yaml

specVersion: 1.0.0
schema:
  file: ./schema.graphql

dataSources:
  - kind: ethereum
    name: ERC20Token
    network: mainnet
    source:
      address: "0xA0b86991c6218b36c1d19d4a2e9eb0ce3606eb48"
      abi: ERC20
      startBlock: 6082465
    mapping:
      kind: ethereum/events
      apiVersion: 0.0.7
      language: wasm/assemblyscript
      entities:
        - TransferRecord
        - AccountStat
      abis:
        - name: ERC20
          file: ./abis/ERC20.json
      eventHandlers:
        - event: Transfer(indexed address,indexed address,uint256)
          handler: handleTransfer
      file: ./src/mapping.ts

这里用的是 USDC 主网地址作为示例。你在实际项目里换成自己的合约地址即可。

几个关键字段解释

  • startBlock
    • 非常重要
    • 不要从创世块开始扫,能明确起点就明确起点
  • eventHandlers
    • 把合约事件绑定到 mapping 函数
  • network
    • 要和实际部署链一致

第二步:准备 ABI

你需要在 ./abis/ERC20.json 放入合约 ABI,最少包含 Transfer 事件定义。

简化后的 ABI 片段如下:

[
  {
    "anonymous": false,
    "inputs": [
      { "indexed": true, "internalType": "address", "name": "from", "type": "address" },
      { "indexed": true, "internalType": "address", "name": "to", "type": "address" },
      { "indexed": false, "internalType": "uint256", "name": "value", "type": "uint256" }
    ],
    "name": "Transfer",
    "type": "event"
  }
]

第三步:编写 mapping.ts

这是最核心的部分。我们在这里把事件映射成实体,并维护账户统计。

import { BigInt } from "@graphprotocol/graph-ts"
import { Transfer } from "../generated/ERC20Token/ERC20"
import { TransferRecord, AccountStat } from "../generated/schema"

function getOrCreateAccountStat(accountId: string, timestamp: BigInt): AccountStat {
  let stat = AccountStat.load(accountId)

  if (stat == null) {
    stat = new AccountStat(accountId)
    stat.address = Bytes.fromHexString(accountId) as Bytes
    stat.totalIn = BigInt.zero()
    stat.totalOut = BigInt.zero()
    stat.transferInCount = BigInt.zero()
    stat.transferOutCount = BigInt.zero()
    stat.netFlow = BigInt.zero()
    stat.updatedAt = timestamp
  }

  return stat as AccountStat
}

export function handleTransfer(event: Transfer): void {
  let id = event.transaction.hash.toHexString() + "-" + event.logIndex.toString()

  let record = new TransferRecord(id)
  record.txHash = event.transaction.hash
  record.logIndex = event.logIndex
  record.blockNumber = event.block.number
  record.timestamp = event.block.timestamp
  record.token = event.address
  record.from = event.params.from
  record.to = event.params.to
  record.value = event.params.value
  record.save()

  let fromId = event.params.from.toHexString()
  let fromStat = getOrCreateAccountStat(fromId, event.block.timestamp)
  fromStat.totalOut = fromStat.totalOut.plus(event.params.value)
  fromStat.transferOutCount = fromStat.transferOutCount.plus(BigInt.fromI32(1))
  fromStat.netFlow = fromStat.totalIn.minus(fromStat.totalOut)
  fromStat.updatedAt = event.block.timestamp
  fromStat.save()

  let toId = event.params.to.toHexString()
  let toStat = getOrCreateAccountStat(toId, event.block.timestamp)
  toStat.totalIn = toStat.totalIn.plus(event.params.value)
  toStat.transferInCount = toStat.transferInCount.plus(BigInt.fromI32(1))
  toStat.netFlow = toStat.totalIn.minus(toStat.totalOut)
  toStat.updatedAt = event.block.timestamp
  toStat.save()
}

注意一个容易忽略的问题

上面代码里用到了 Bytes,因此完整导入应为:

import { BigInt, Bytes } from "@graphprotocol/graph-ts"

也就是说,你的最终可运行版本应使用下面这份:

import { BigInt, Bytes } from "@graphprotocol/graph-ts"
import { Transfer } from "../generated/ERC20Token/ERC20"
import { TransferRecord, AccountStat } from "../generated/schema"

function getOrCreateAccountStat(accountId: string, timestamp: BigInt): AccountStat {
  let stat = AccountStat.load(accountId)

  if (stat == null) {
    stat = new AccountStat(accountId)
    stat.address = Bytes.fromHexString(accountId) as Bytes
    stat.totalIn = BigInt.zero()
    stat.totalOut = BigInt.zero()
    stat.transferInCount = BigInt.zero()
    stat.transferOutCount = BigInt.zero()
    stat.netFlow = BigInt.zero()
    stat.updatedAt = timestamp
  }

  return stat as AccountStat
}

export function handleTransfer(event: Transfer): void {
  let id = event.transaction.hash.toHexString() + "-" + event.logIndex.toString()

  let record = new TransferRecord(id)
  record.txHash = event.transaction.hash
  record.logIndex = event.logIndex
  record.blockNumber = event.block.number
  record.timestamp = event.block.timestamp
  record.token = event.address
  record.from = event.params.from
  record.to = event.params.to
  record.value = event.params.value
  record.save()

  let fromId = event.params.from.toHexString()
  let fromStat = getOrCreateAccountStat(fromId, event.block.timestamp)
  fromStat.totalOut = fromStat.totalOut.plus(event.params.value)
  fromStat.transferOutCount = fromStat.transferOutCount.plus(BigInt.fromI32(1))
  fromStat.netFlow = fromStat.totalIn.minus(fromStat.totalOut)
  fromStat.updatedAt = event.block.timestamp
  fromStat.save()

  let toId = event.params.to.toHexString()
  let toStat = getOrCreateAccountStat(toId, event.block.timestamp)
  toStat.totalIn = toStat.totalIn.plus(event.params.value)
  toStat.transferInCount = toStat.transferInCount.plus(BigInt.fromI32(1))
  toStat.netFlow = toStat.totalIn.minus(toStat.totalOut)
  toStat.updatedAt = event.block.timestamp
  toStat.save()
}

第四步:生成代码与构建

graph codegen
graph build

如果 schema、ABI、mapping 有不一致,这一步通常就会报错。我的建议是不要怕报错,第一次搭建时,报错是最快的老师


第五步:部署 subgraph

如果你使用 Studio,通常流程类似:

graph auth --studio <DEPLOY_KEY>
graph deploy --studio erc20-transfer-analytics

具体命令可能随版本略有变化,以你本地 CLI 提示为准。


第六步:通过 GraphQL 查询数据

查询最近 10 条转账记录

{
  transferRecords(first: 10, orderBy: timestamp, orderDirection: desc) {
    id
    txHash
    from
    to
    value
    blockNumber
    timestamp
  }
}

查询某地址统计

{
  accountStats(where: { id: "0x55fe002aeff02f77364de339a1292923a15844b8" }) {
    id
    address
    totalIn
    totalOut
    transferInCount
    transferOutCount
    netFlow
    updatedAt
  }
}

查询净流入最高的地址

{
  accountStats(first: 20, orderBy: netFlow, orderDirection: desc) {
    id
    totalIn
    totalOut
    netFlow
  }
}

逐步验证清单

我很建议你不要一口气写完再部署,而是按下面顺序逐步验证:

验证 1:事件能否被正确捕获

  • 部署后先查 transferRecords
  • 看是否有数据返回
  • 看区块高度是否在持续推进

验证 2:唯一键是否正确

  • 检查是否有 id 冲突
  • 确认同一交易多个日志不会覆盖

验证 3:统计值是否正确

  • 随机找一个地址
  • 抽几条链上交易手工核对 totalIn / totalOut

验证 4:排序和过滤是否符合预期

  • orderBy: timestamp
  • where: { from: ... }where: { to: ... }

进阶:把查询接口包装成业务 API

虽然 The Graph 已经能直接提供 GraphQL,但在实际项目里,你通常还会加一层后端服务:

  • 统一鉴权
  • 聚合多个 subgraph 数据
  • 做缓存
  • 做字段兼容和业务语义转换

一个简单的 Node.js 查询示例如下:

const query = `
{
  accountStats(first: 5, orderBy: netFlow, orderDirection: desc) {
    id
    totalIn
    totalOut
    netFlow
  }
}
`;

async function fetchTopAccounts() {
  const res = await fetch("https://api.thegraph.com/subgraphs/name/your-subgraph", {
    method: "POST",
    headers: {
      "Content-Type": "application/json"
    },
    body: JSON.stringify({ query })
  });

  const data = await res.json();
  console.log(JSON.stringify(data, null, 2));
}

fetchTopAccounts().catch(console.error);

如果你要对外提供 REST 风格接口,也可以在服务层再包一层:

import express from "express";

const app = express();

app.get("/api/top-netflow", async (req, res) => {
  const query = `
  {
    accountStats(first: 10, orderBy: netFlow, orderDirection: desc) {
      id
      totalIn
      totalOut
      netFlow
    }
  }`;

  const response = await fetch("https://api.thegraph.com/subgraphs/name/your-subgraph", {
    method: "POST",
    headers: {
      "Content-Type": "application/json"
    },
    body: JSON.stringify({ query })
  });

  const result = await response.json();
  res.json(result.data.accountStats);
});

app.listen(3000, () => {
  console.log("server started at http://localhost:3000");
});

这样前端就不必直接面对 GraphQL,也更方便做权限控制和缓存。


常见坑与排查

这一节非常重要。The Graph 最花时间的,往往不是“写出来”,而是“为什么没出数据”。

1. startBlock 设置过早,导致同步极慢

现象

  • 部署成功,但长时间看不到结果
  • 同步高度推进很慢

原因

从非常早的区块开始扫描,尤其是热门合约,历史日志量巨大。

处理建议

  • 尽量查到合约部署块
  • 或至少从业务真实起始块开始
  • 先用较近区块验证功能,再回补历史

2. id 设计不对,导致数据覆盖

现象

  • 查询记录数偏少
  • 明明一笔交易里有多个事件,最后只保留一条

原因

只用 txHash 作为主键。

正确做法

使用:

let id = event.transaction.hash.toHexString() + "-" + event.logIndex.toString()

3. schema 类型与 mapping 赋值不匹配

现象

  • graph codegengraph build 报类型错误
  • 部署后运行时报实体字段异常

常见例子

  • BytesString 混用
  • BigInt 字段赋了普通数字
  • 可空字段和非空字段不一致

排查方法

逐个对照:

  • schema.graphql
  • ABI 生成类型
  • mapping.ts 的字段赋值

4. 事件签名写错

现象

  • subgraph 正常部署
  • 但就是没有数据

原因

subgraph.yaml 里的事件签名必须和 ABI 完全一致。

例如 ERC-20 转账事件必须写成:

- event: Transfer(indexed address,indexed address,uint256)
  handler: handleTransfer

多一个空格少一个类型,都可能导致匹配失败。


5. 忽略零地址场景,统计被污染

ERC-20 的 Transfer 事件里,零地址通常意味着:

  • from == 0x000...000:铸造
  • to == 0x000...000:销毁

如果你把零地址也当普通账户统计,排行榜和净流入结果会非常怪。

改进思路

let zeroAddress = "0x0000000000000000000000000000000000000000"

if (fromId != zeroAddress) {
  // 更新 from 统计
}

if (toId != zeroAddress) {
  // 更新 to 统计
}

这属于我当时踩过的一个典型坑:数据全都有,但一做业务榜单就发现“零地址”永远名列前茅。


6. 查询不到最新数据,不一定是索引坏了

有时是这些原因:

  • subgraph 还在同步
  • 你的查询 endpoint 指向了旧版本
  • 前端做了缓存
  • 排序字段不是最新字段

建议优先确认:

  • 当前同步高度
  • 最新实体的 blockNumber
  • 是否部署到了正确环境

安全/性能最佳实践

The Graph 解决的是索引和查询,不代表你可以完全不考虑安全和性能。下面这些建议,是真正在生产里比较有用的。

1. 索引逻辑尽量“纯函数化”

mapping 中不要做复杂外部依赖假设,尽量只依赖:

  • 当前事件
  • 当前区块上下文
  • 已存实体状态

这样回放和重建时结果更稳定。


2. 谨慎做高成本聚合

如果你在每个事件里都更新大量实体,会明显拖慢同步。

例如一个事件发生时:

  • 更新明细 1 条
  • 更新账户 2 条
  • 再更新日统计、周统计、月统计、排行榜缓存……

这样很容易把 mapping 写得过重。

建议

优先级可以这样排:

  1. 必要明细
  2. 高频查询所需的轻量聚合
  3. 复杂报表交给下游服务二次加工

3. 控制实体设计复杂度

不要把一个实体做成“万能表”。
更稳妥的思路是:

  • 明细实体负责还原事实
  • 聚合实体负责服务查询场景

这比把几十个业务字段全塞进一张实体里更容易维护。


4. 为分页和排序预先设计查询路径

GraphQL 查询很灵活,但不是所有组合都高效。

例如你经常要做:

  • 按时间倒序翻页
  • 按地址过滤
  • 查某段时间内的数据

那 schema 设计时就应考虑:

  • 是否需要单独的日维度实体
  • 是否需要把地址作为主实体来维护聚合状态
  • 是否需要服务层缓存热点查询

5. 处理链回滚的认知要正确

The Graph 会考虑链重组和回滚场景,但前提是你的 mapping 逻辑本身具备可重放性。

也就是说:

  • 不要依赖外部不可重现状态
  • 不要在 mapping 里假设“事件一旦来过就永不变化”
  • 尽量让实体更新是确定性的

6. 对外暴露 API 时加限流和缓存

如果你的前端、分析后台、第三方系统都直接打 subgraph,很容易把查询打爆,尤其是复杂 GraphQL。

建议在业务服务层做:

  • 热点接口缓存
  • 查询白名单
  • 限流
  • 超时控制
  • 返回字段裁剪

一个更完整的数据结构演进思路

当你从“能查”走向“好用”,通常会经历下面这个阶段演进:

flowchart TD
    A[只存 Transfer 明细] --> B[增加账户累计统计]
    B --> C[增加按天聚合实体]
    C --> D[服务层封装排行榜/报表接口]
    D --> E[接入缓存与监控]

这个演进路径很实用,因为它符合大多数项目的真实节奏:

  • 第一步先验证事件能不能稳定索引
  • 第二步解决地址页、排行榜页性能问题
  • 第三步解决时间窗口分析
  • 第四步再做真正面向业务方的接口

不要一开始就想做成“全链通用分析平台”,那样很容易 schema 设计过度。


适用边界:什么时候 The Graph 很合适,什么时候未必

很合适的场景

  • 基于事件的业务数据索引
  • 地址、交易、合约维度的查询接口
  • 前端展示类链上数据
  • 中轻量分析型 API
  • 固定业务规则的聚合指标

不一定适合的场景

  • 超复杂 OLAP 分析
  • 跨链、跨协议超大规模关联计算
  • 强实时到毫秒级的风控联动
  • 需要大量 ad-hoc SQL 的离线分析

这类场景通常会把 The Graph 当成一个上游索引层,然后把数据再送入 ClickHouse、BigQuery、Spark 或自建数仓体系。

换句话说:

The Graph 很适合做“业务可查询索引层”,但不必把它当成万能分析引擎。


总结

如果你正在做链上业务分析接口,我的建议很直接:

  1. 先从查询需求反推 schema

    • 不要只机械存事件
    • 想清楚前端和业务方到底要查什么
  2. 先做最小可运行闭环

    • 一个合约
    • 一个事件
    • 两个实体
    • 几个 GraphQL 查询
    • 先跑通,再扩展
  3. 把聚合控制在“够用”范围

    • 高频查询指标可以在索引时维护
    • 复杂报表留给下游服务
  4. 重视主键、起始区块、零地址、类型匹配

    • 这些看似小问题,往往就是 80% 排障时间的来源
  5. 生产上最好包一层业务 API

    • 做鉴权、缓存、限流、版本控制
    • 不要让前端无限制直连复杂 GraphQL

从工程实践看,The Graph 最大的价值不是“替你查链”,而是帮你把链上原始事件整理成可持续维护、可面向业务消费的查询接口。如果你的目标是做仪表盘、地址分析页、排行榜、基础运营报表,它通常是非常高效的一层。

如果你现在正准备动手,我建议就照本文这个最小例子开始:先索引一个 ERC-20 的 Transfer,再把账户累计统计做出来。 一旦这个闭环通了,你再扩展到 DeFi 协议事件、NFT 交易、清算记录,思路其实是一样的。


分享到:

下一篇
《Web逆向实战:中级开发者如何定位并复现前端签名算法实现接口自动化调用》