Web3 中间件实战:用 The Graph 与 Ethers.js 构建可扩展的链上数据查询服务
在很多 Web3 项目里,真正把产品做“顺手”的,不只是合约本身,而是链上数据怎么被稳定、快速、可扩展地读出来。
如果你直接拿 ethers.js 去扫链上历史事件,一开始会觉得很直接:连 RPC、读合约、拉事件、拼结果,几百行代码就能跑起来。但数据量一大,问题就来了:
- 历史区块扫描很慢
- RPC 频率限制容易触发
- 分页、过滤、聚合都得自己做
- 前端每次都直连链,体验不稳定
- 重组(reorg)和索引一致性处理麻烦
这时候,The Graph 负责“索引”,Ethers.js 负责“实时补充与链上校验”,是一个非常实用的组合:
前者擅长把事件数据整理成结构化查询接口,后者擅长做链上实时读取、状态校验和回退兜底。把两者放进一个中间件里,基本就是很多生产系统的常见做法。
这篇文章我会带你做一个完整的实战:构建一个ERC-20 Transfer 查询服务,支持:
- 用 The Graph 查询历史转账记录
- 用 Ethers.js 补充最新链上余额
- 通过 Node.js 暴露统一 API
- 具备基础的错误处理、性能优化和排查思路
我会尽量按“带你做一遍”的方式来写,而不是只讲概念。
背景与问题
假设你要做一个钱包、资产看板或者链上分析后台,用户会提这样的需求:
- 查询某地址最近的转账记录
- 查询某代币在某账户上的余额
- 支持按区块范围、时间范围、代币地址过滤
- 尽可能快,最好前端一打开页面就有结果
如果完全依赖 ethers.js:
provider.getLogs()在大范围区块查询时容易慢- 不同 RPC 节点对日志返回条数有限制
- 你需要自己维护事件解析、分页游标、排序逻辑
- 复杂查询(比如“某地址既是 from 又是 to 的记录”)会很痛苦
如果完全依赖 The Graph:
- 子图索引有延迟,不一定是最新块
- 某些动态链上状态(如
balanceOf)最好直接读链 - 子图 schema 设计不当,后期改动成本大
所以更合理的思路是:
- 历史数据、可检索数据:交给 The Graph
- 实时状态、强一致读取:交给 Ethers.js
- 统一服务出口:用 Node.js 中间层对外提供 API
这层中间件的价值,在于把“索引查询”和“链上直读”隔离开,让前端和业务代码不必知道底层差异。
核心原理
先看整体结构。
flowchart LR
A[Smart Contract / ERC20] --> B[Blockchain Events]
B --> C[The Graph Indexer]
C --> D[GraphQL Query API]
A --> E[Ethers.js RPC Reader]
D --> F[Node.js Middleware]
E --> F
F --> G[Frontend / Backend Consumer]
这个架构里有三个关键角色:
1. The Graph:做“离线整理后的可查询索引”
The Graph 通过监听链上事件,把事件转换成你定义好的实体,例如:
TransferAccountToken
然后暴露 GraphQL API,让你可以像查数据库一样查链上历史数据。
这特别适合:
- 历史转账记录
- 排序和分页
- 复杂过滤
- 聚合前的数据准备
2. Ethers.js:做“最新状态读取”和“校验”
Ethers.js 直接连 RPC 节点,适合读取:
balanceOfsymboldecimalstotalSupply- 最新块高
- 某一笔交易收据
也适合做兜底逻辑,例如:
- The Graph 暂时没同步到最新块
- 需要校验某条记录对应交易是否真的上链
- 需要读取合约当前实时状态
3. Node.js 中间件:做“统一出口”
中间件负责:
- 封装 GraphQL 查询
- 封装链上读取
- 合并与格式化返回
- 缓存热点数据
- 对错误做降级处理
这层很关键,因为它让前端不必同时处理 GraphQL 和 RPC 两套逻辑。
前置知识与环境准备
为了能直接跟着做,先准备下面这些东西:
- Node.js 18+
- npm 或 pnpm
- 一个可访问的以太坊 RPC URL
- 一个可用的 The Graph 子图接口
- 基础了解:
- Solidity 事件
- ERC-20 标准
- GraphQL 基础查询
- JavaScript/Node.js 异步编程
本文示例会假设你已经有一个可查询 ERC-20 Transfer 的子图。
如果你自己维护子图,后面我也会给出最小 schema 和 mapping 示例。
初始化项目:
mkdir web3-middleware-demo
cd web3-middleware-demo
npm init -y
npm install express ethers graphql-request dotenv
项目结构先保持简单:
.
├── src
│ ├── app.js
│ ├── graphClient.js
│ ├── chainClient.js
│ └── service.js
├── .env
└── package.json
.env 示例:
PORT=3000
RPC_URL=https://mainnet.infura.io/v3/YOUR_KEY
GRAPH_URL=https://api.thegraph.com/subgraphs/name/YOUR_SUBGRAPH
TOKEN_ADDRESS=0x0000000000000000000000000000000000000000
核心原理拆解:一次查询是怎么流动的
这里用一个时序图看得更清楚。
sequenceDiagram
participant U as User
participant API as Node Middleware
participant G as The Graph
participant R as Ethereum RPC
participant T as ERC20 Contract
U->>API: GET /address/:account/transfers
API->>G: GraphQL 查询历史 Transfer
G-->>API: 返回结构化历史记录
API->>R: 读取最新区块 / balanceOf
R->>T: eth_call
T-->>R: 返回余额
R-->>API: 返回链上实时数据
API-->>U: 合并后的统一响应
这个流程里最重要的设计点有两个:
- 历史查询和实时读取职责分离
- API 返回统一格式,屏蔽底层来源差异
这样你后续想替换成别的索引器,或者增加缓存层,都不会影响调用方。
一个最小可用的子图设计
如果你还没有子图,先看一个最小模型。我们只关心 ERC-20 的转账事件。
GraphQL Schema
type Transfer @entity(immutable: true) {
id: ID!
from: Bytes!
to: Bytes!
value: BigInt!
blockNumber: BigInt!
blockTimestamp: BigInt!
transactionHash: Bytes!
}
type Account @entity {
id: ID!
sentTransfers: [Transfer!]! @derivedFrom(field: "from")
receivedTransfers: [Transfer!]! @derivedFrom(field: "to")
}
Subgraph 事件配置示意
specVersion: 0.0.5
schema:
file: ./schema.graphql
dataSources:
- kind: ethereum
name: ERC20
network: mainnet
source:
address: "0xYourTokenAddress"
abi: ERC20
startBlock: 18000000
mapping:
kind: ethereum/events
apiVersion: 0.0.7
language: wasm/assemblyscript
entities:
- Transfer
- Account
abis:
- name: ERC20
file: ./abis/ERC20.json
eventHandlers:
- event: Transfer(indexed address,indexed address,uint256)
handler: handleTransfer
file: ./src/mapping.ts
Mapping 示例
import { Transfer as TransferEvent } from "../generated/ERC20/ERC20"
import { Transfer, Account } from "../generated/schema"
export function handleTransfer(event: TransferEvent): void {
let transfer = new Transfer(
event.transaction.hash.toHexString() + "-" + event.logIndex.toString()
)
transfer.from = event.params.from
transfer.to = event.params.to
transfer.value = event.params.value
transfer.blockNumber = event.block.number
transfer.blockTimestamp = event.block.timestamp
transfer.transactionHash = event.transaction.hash
transfer.save()
let fromAccount = Account.load(event.params.from.toHexString())
if (fromAccount == null) {
fromAccount = new Account(event.params.from.toHexString())
fromAccount.save()
}
let toAccount = Account.load(event.params.to.toHexString())
if (toAccount == null) {
toAccount = new Account(event.params.to.toHexString())
toAccount.save()
}
}
上面这个模型非常朴素,但已经足够支撑很多中间件场景。
生产里如果要做多 token、多链、价格关联、用户画像,schema 会更复杂,但思路一样。
实战代码(可运行)
下面开始搭建 Node.js 中间件。目标很明确:
- 提供
GET /address/:account/transfers - 返回某地址的历史转账 + 实时余额
- 数据源来自 The Graph 和 Ethers.js
1)封装 Graph 查询客户端
src/graphClient.js
const { GraphQLClient, gql } = require("graphql-request");
const graphClient = new GraphQLClient(process.env.GRAPH_URL);
async function queryTransfersByAccount(account, first = 10, skip = 0) {
const normalized = account.toLowerCase();
const query = gql`
query GetTransfers($account: Bytes!, $first: Int!, $skip: Int!) {
sent: transfers(
where: { from: $account }
first: $first
skip: $skip
orderBy: blockTimestamp
orderDirection: desc
) {
id
from
to
value
blockNumber
blockTimestamp
transactionHash
}
received: transfers(
where: { to: $account }
first: $first
skip: $skip
orderBy: blockTimestamp
orderDirection: desc
) {
id
from
to
value
blockNumber
blockTimestamp
transactionHash
}
}
`;
return graphClient.request(query, {
account: normalized,
first,
skip,
});
}
module.exports = {
queryTransfersByAccount,
};
这里用了两个查询字段:
sentreceived
这样我们就能把“作为发送方”和“作为接收方”的记录一起拿回来,再在服务层统一排序。
2)封装 Ethers.js 链上读取
src/chainClient.js
const { ethers } = require("ethers");
const provider = new ethers.JsonRpcProvider(process.env.RPC_URL);
const erc20Abi = [
"function balanceOf(address owner) view returns (uint256)",
"function symbol() view returns (string)",
"function decimals() view returns (uint8)",
];
function getTokenContract() {
return new ethers.Contract(process.env.TOKEN_ADDRESS, erc20Abi, provider);
}
async function getTokenBalance(account) {
const contract = getTokenContract();
const [balance, symbol, decimals, blockNumber] = await Promise.all([
contract.balanceOf(account),
contract.symbol(),
contract.decimals(),
provider.getBlockNumber(),
]);
return {
raw: balance.toString(),
formatted: ethers.formatUnits(balance, decimals),
symbol,
decimals,
blockNumber,
};
}
module.exports = {
getTokenBalance,
provider,
};
这里我用了 Promise.all 并发读取,这个小优化很值,尤其是你要同时读多个链上字段的时候。
3)服务层:合并、排序、格式化
src/service.js
const { queryTransfersByAccount } = require("./graphClient");
const { getTokenBalance } = require("./chainClient");
function normalizeTransfer(item, direction) {
return {
id: item.id,
direction,
from: item.from,
to: item.to,
value: item.value,
blockNumber: Number(item.blockNumber),
blockTimestamp: Number(item.blockTimestamp),
transactionHash: item.transactionHash,
};
}
async function getAddressDashboard(account, first = 10, skip = 0) {
const [graphData, balanceData] = await Promise.all([
queryTransfersByAccount(account, first, skip),
getTokenBalance(account),
]);
const transfers = [
...graphData.sent.map((item) => normalizeTransfer(item, "sent")),
...graphData.received.map((item) => normalizeTransfer(item, "received")),
].sort((a, b) => b.blockTimestamp - a.blockTimestamp);
return {
account,
balance: balanceData,
transfers,
meta: {
source: {
history: "the-graph",
balance: "ethers-rpc",
},
count: transfers.length,
},
};
}
module.exports = {
getAddressDashboard,
};
这一步就是中间件最核心的价值:把不同来源的数据揉成一个业务友好的对象。
4)暴露 API
src/app.js
require("dotenv").config();
const express = require("express");
const { ethers } = require("ethers");
const { getAddressDashboard } = require("./service");
const app = express();
const port = process.env.PORT || 3000;
app.get("/health", (req, res) => {
res.json({ ok: true });
});
app.get("/address/:account/transfers", async (req, res) => {
try {
const { account } = req.params;
const first = Number(req.query.first || 10);
const skip = Number(req.query.skip || 0);
if (!ethers.isAddress(account)) {
return res.status(400).json({ error: "invalid address" });
}
const data = await getAddressDashboard(account, first, skip);
res.json(data);
} catch (error) {
console.error("API error:", error);
res.status(500).json({
error: "internal server error",
message: error.message,
});
}
});
app.listen(port, () => {
console.log(`Server running at http://localhost:${port}`);
});
运行:
node src/app.js
测试:
curl "http://localhost:3000/address/0x0000000000000000000000000000000000000000/transfers?first=5&skip=0"
一次结果应该长什么样
一个典型返回可能是这样:
{
"account": "0x0000000000000000000000000000000000000000",
"balance": {
"raw": "123450000000000000000",
"formatted": "123.45",
"symbol": "TKN",
"decimals": 18,
"blockNumber": 20500001
},
"transfers": [
{
"id": "0xabc-1",
"direction": "received",
"from": "0x111",
"to": "0x0000000000000000000000000000000000000000",
"value": "1000000000000000000",
"blockNumber": 20499990,
"blockTimestamp": 1720000000,
"transactionHash": "0xabc"
}
],
"meta": {
"source": {
"history": "the-graph",
"balance": "ethers-rpc"
},
"count": 1
}
}
这类格式前端一般很好接,因为:
transfers已经是统一列表direction已经标注balance已经格式化meta明确说明了数据来源
逐步验证清单
如果你是边学边做,建议按这个顺序验证,不要一上来就怀疑“是不是 The Graph 有问题”。
第一步:验证 RPC 通不通
单独测试 ethers.js:
const { ethers } = require("ethers");
async function main() {
const provider = new ethers.JsonRpcProvider(process.env.RPC_URL);
const block = await provider.getBlockNumber();
console.log("latest block:", block);
}
main().catch(console.error);
如果这一步都不通,先别看子图。
第二步:验证 GraphQL 查询能返回
直接写一个最小测试:
require("dotenv").config();
const { GraphQLClient, gql } = require("graphql-request");
async function main() {
const client = new GraphQLClient(process.env.GRAPH_URL);
const query = gql`
query {
transfers(first: 1) {
id
from
to
value
}
}
`;
const data = await client.request(query);
console.log(JSON.stringify(data, null, 2));
}
main().catch(console.error);
如果这里报错,优先检查:
- GRAPH_URL 是否正确
- schema 里实体名是不是
transfers where参数里的字段类型是否匹配
第三步:验证合并逻辑
只测 service 层:
require("dotenv").config();
const { getAddressDashboard } = require("./src/service");
async function main() {
const account = "0x0000000000000000000000000000000000000000";
const data = await getAddressDashboard(account, 5, 0);
console.log(JSON.stringify(data, null, 2));
}
main().catch(console.error);
常见坑与排查
这一部分我尽量写得接地气一点,因为这些问题真的是最容易把时间耗光的地方。
坑 1:The Graph 查不到数据,但链上明明有交易
常见原因
- 子图的
startBlock设得太晚 - 合约地址配置错了
- 事件签名写错
- 子图还没同步到目标区块
- 查询的地址大小写或格式不一致
排查方式
先查子图最新同步块高。不同部署环境方式略有不同,但核心思路是确认:
- 当前索引到了哪个区块
- 目标交易发生在哪个区块
如果目标交易区块 > 子图当前同步区块,那不是没数据,是还没同步到。
坑 2:GraphQL 查询字段类型不匹配
比如 Bytes!、BigInt! 这些类型,如果你用错变量类型,会直接报错。
例子
错误写法:
query GetTransfers($account: String!) {
transfers(where: { from: $account }) {
id
}
}
正确写法:
query GetTransfers($account: Bytes!) {
transfers(where: { from: $account }) {
id
}
}
经验建议
我自己一般会把 GraphQL 查询先在 Playground 里跑通,再搬到代码里,不然变量和 schema 对不上时很烦。
坑 3:Ethers v5 和 v6 API 混用
这几年很多示例代码还停留在 v5,但新项目往往装的是 v6。
两者在不少细节上不一样,比如:
ethers.providers.JsonRpcProvidervsethers.JsonRpcProviderutils.formatUnitsvsethers.formatUnits
如果你复制示例时没注意版本,很容易报 undefined。
建议
确认本地版本:
npm list ethers
然后统一按一个版本写,不要混搭。
坑 4:分页逻辑不稳定
The Graph 常见分页方式是 first + skip,在数据量很大时,skip 越大越慢,且数据变动时可能出现重复或漏项。
建议
中小规模可以先用 skip。
如果你的数据量非常大,建议改成基于游标或基于字段范围分页,比如按:
blockTimestamp_ltid_gt
来做增量翻页。
坑 5:历史记录和实时余额对不上
这很正常,不一定是 bug。
可能原因
- The Graph 还没同步到最新块
- 余额是实时读链,转账列表是历史索引
- 链发生了短暂 reorg
- 合约存在 mint/burn 等非普通 transfer 逻辑
处理建议
在 API 返回里明确标注:
- 历史数据来源
- 实时余额来源
- 对应块高
这样前端和调用方就知道这个“不一致”是可解释的,而不是黑盒错误。
安全/性能最佳实践
到这里功能已经能跑了,但如果想真正拿去做生产中间件,下面这些点非常关键。
1. 不要让前端直接拼 GraphQL 查询
很多人为了省事,前端直连 The Graph。能不能做?能。
但一旦查询复杂、需要鉴权、需要合并链上数据,最好还是走中间件。
原因很简单:
- 避免暴露底层接口细节
- 方便做缓存、限流、审计
- 方便热修复查询逻辑
- 统一错误格式
2. 对热点读取做缓存
例如:
symboldecimals- 某地址首页数据
- 最近区块范围内的热门代币转账
最简单可以先做进程内缓存,生产里再接 Redis。
缓存策略建议:
- 代币元信息:长 TTL
- 余额:短 TTL,5~15 秒
- 历史转账:按查询参数缓存 15~60 秒
3. 控制查询规模
不要让调用方随意传一个 first=10000。
这是很典型的中间件自杀式接口设计。
建议限制:
first最大 100 或 200skip最大某个阈值- 地址参数必须校验
- 可选加 API Key 和 rate limit
4. 明确处理 reorg 与最终一致性
链上系统不是传统数据库,没有“刚写完就永远稳定”的保证。
尤其是新区块附近的数据,可能受重组影响。
实践建议
- 历史查询尽量以“已确认块”为准
- 对最新几个块的数据做“软展示”
- 返回
indexedBlock或latestRpcBlock - 对关键业务增加二次校验
5. 批量化与并发控制
如果一个页面要查 100 个地址余额,不要简单 Promise.all(100个RPC) 就结束了。
这样很可能把自己的 RPC 打爆。
更好的做法
- 做并发池
- 批量聚合请求
- 热点地址缓存
- 尽可能减少重复
symbol/decimals读取
6. 错误降级
一个成熟的中间件,不应该因为 The Graph 暂时超时,就整个接口直接不可用。
建议的降级策略
- The Graph 超时:返回余额 + 空历史记录
- RPC 超时:返回历史记录 + 标记余额不可用
- 两者都失败:返回统一错误码和可观测日志
可以把可用性优先级设计成这样:
stateDiagram-v2
[*] --> FullData
FullData --> HistoryOnly: RPC timeout
FullData --> BalanceOnly: Graph timeout
HistoryOnly --> PartialResponse
BalanceOnly --> PartialResponse
PartialResponse --> [*]
这类降级设计非常实用,尤其在公共 RPC 或第三方索引服务波动时。
进阶扩展:从 Demo 到可扩展服务
如果你要把这个方案继续做大,可以往这几个方向演进。
多链支持
抽象一个 chainId 参数:
- 不同链使用不同 RPC URL
- 不同链使用不同子图 endpoint
- 中间件统一输出格式
这样一个 API 就能支持 Ethereum、Arbitrum、Polygon 等网络。
多合约支持
如果不是只查一个 token,而是多个 token,可以把 TOKEN_ADDRESS 从环境变量变成接口参数,但一定要加白名单,不要允许用户任意打未知合约。
丰富查询能力
你可以继续扩展:
- 按时间范围查询
- 按 token 查询
- 按方向过滤 sent/received
- 返回交易状态与 gas 信息
- 聚合每日转账量
这些都适合放在中间件里,而不是让前端自己拼。
一个更稳妥的接口设计建议
真实项目里,我更推荐把接口拆成两个:
GET /address/:account/transfersGET /address/:account/balance
然后再加一个聚合接口:
GET /address/:account/dashboard
原因是:
- 单一接口更容易缓存
- 排查问题时更容易定位到底是 Graph 挂了还是 RPC 挂了
- 聚合接口可以复用底层服务函数
也就是说,聚合是面向产品的,拆分是面向工程的。
总结
如果把这篇文章压缩成一句话,那就是:
The Graph 负责把链上历史事件“整理成可查数据”,Ethers.js 负责把链上实时状态“直接读出来”,Node.js 中间件负责把两者变成一个稳定、可扩展的服务接口。
本文我们做了这些事:
- 明确了为什么不能只靠 RPC 扫链
- 梳理了 The Graph + Ethers.js 的职责边界
- 给出了一个最小可用的子图模型
- 实现了可运行的 Node.js 查询中间件
- 讨论了分页、同步延迟、版本差异、reorg 等常见坑
- 给出了缓存、限流、降级、并发控制等生产建议
如果你准备把它落到项目里,我建议按下面的优先级推进:
- 先做最小可用版本:历史转账 + 实时余额
- 再补工程能力:缓存、限流、日志
- 最后做扩展性:多链、多 token、复杂筛选
边界条件也要记住:
- 如果你追求绝对最新状态,不能只依赖 The Graph
- 如果你需要复杂历史检索,不要只靠 Ethers.js 扫链
- 如果你要给前端稳定体验,最好一定有中间件层
这套方案并不神奇,但非常务实。很多时候,系统能不能稳定跑,不在于用了多新的技术,而在于你有没有把“索引”和“实时读取”这两件事分清楚。