Skip to content

Commit

Permalink
perf(api): subscribe to network block number
Browse files Browse the repository at this point in the history
  • Loading branch information
hbriese committed Dec 14, 2023
1 parent 20ee314 commit a72c420
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 48 deletions.
27 changes: 10 additions & 17 deletions api/src/features/events/events.worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { NetworksService } from '../util/networks/networks.service';
import { DatabaseService } from '../database/database.service';
import e from '~/edgeql-js';
import { P, match } from 'ts-pattern';
import { Hex, asHex, tryOrCatchAsync } from 'lib';
import { Hex, asHex } from 'lib';
import { Chain } from 'chains';
import { InjectRedis } from '@songkeys/nestjs-redis';
import Redis from 'ioredis';
Expand All @@ -20,7 +20,7 @@ import { AbiEvent } from 'abitype';
import { Log as ViemLog, encodeEventTopics, hexToNumber } from 'viem';

const DEFAULT_CHUNK_SIZE = 200;
const BLOCK_TIME_MS = 500;
const BLOCK_TIME = 500; /* ms */
const TOO_MANY_RESULTS_RE =
/Query returned more than .+? results. Try with this block range \[(?:0x[0-9a-f]+), (0x[0-9a-f]+)\]/;

Expand All @@ -29,7 +29,7 @@ export const EventsQueue = createQueue<EventJobData>('Events', {
attempts: 50,
backoff: {
type: 'fixed',
delay: BLOCK_TIME_MS,
delay: BLOCK_TIME,
},
removeOnComplete: true,
removeOnFail: false,
Expand Down Expand Up @@ -88,16 +88,7 @@ export class EventsWorker extends WorkerHost<TypedWorker<EventsQueue>> implement
const { chain } = job.data;
const network = this.networks.get(chain);

const latest = await tryOrCatchAsync(
async () => Number(await network.getBlockNumber()), // Warning: truncate bigint -> number
async (e) => {
if (job.data.queueNext !== false) {
// Next job hasn't been queued yet, queue it next attempt
await job.updateData({ ...job.data, queueNext: job.attemptsMade + 1 });
}
throw e;
},
);
const latest = Number(network.blockNumber()); // Warning: bigint -> number
const from = job.data.from;
const to = Math.min(job.data.to ?? from + DEFAULT_CHUNK_SIZE - 1, latest);

Expand All @@ -110,13 +101,13 @@ export class EventsWorker extends WorkerHost<TypedWorker<EventsQueue>> implement
if (shouldQueue) {
if (latest < from) {
// Up to date; retry after a delay
return this.queue.add(EventsQueue.name, { chain, from }, { delay: BLOCK_TIME_MS });
return this.queue.add(EventsQueue.name, { chain, from }, { delay: BLOCK_TIME });
} else {
// Queue up next job
this.queue.add(
EventsQueue.name,
{ chain, from: to + 1 },
{ delay: latest === from ? BLOCK_TIME_MS : undefined },
{ delay: latest === from ? BLOCK_TIME : undefined },
);
}
}
Expand Down Expand Up @@ -150,7 +141,9 @@ export class EventsWorker extends WorkerHost<TypedWorker<EventsQueue>> implement
(log) => this.listeners.get(log.topics[0]!)?.map((listener) => listener({ chain, log })),
),
);
Logger.verbose(`Processed ${logs.length} events from ${to - from + 1} blocks [${from}, ${to}]`);
Logger.verbose(
`[${chain}]: Processed ${logs.length} events from ${to - from + 1} blocks [${from}, ${to}]`,
);
}

private async addMissingJob() {
Expand All @@ -177,7 +170,7 @@ export class EventsWorker extends WorkerHost<TypedWorker<EventsQueue>> implement

const from = lastProcessedBlock
? Number(lastProcessedBlock) + 1 // Warning: bigint -> number
: Number(await network.getBlockNumber()); // Warning: bigint -> number
: Number(network.blockNumber()); // Warning: bigint -> number
Logger.log(`${network.chain.key}: events starting from block ${from}`);

this.queue.add(EventsQueue.name, { chain: network.chain.key, from });
Expand Down
107 changes: 76 additions & 31 deletions api/src/features/util/networks/networks.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@ import { CONFIG } from '~/config';
import { asChain, asUAddress, UAddress } from 'lib';
import { ChainConfig, Chain, CHAINS, NetworkWallet } from 'chains';
import {
FallbackTransport,
PublicClient,
Transport,
WatchBlockNumberErrorType,
createPublicClient,
createWalletClient,
fallback,
http,
webSocket,
} from 'viem';
Expand All @@ -33,18 +34,15 @@ export class NetworksService implements AsyncIterable<Network> {
return this.get(asChain(address));
}

async *[Symbol.asyncIterator]() {
*all() {
for (const chain of Object.values(CHAINS)) {
try {
const existing = this.clients[chain.key];
if (existing) return existing;

const network = this.get(chain);
await network.getBlockNumber();
yield network;
} catch (_) {
// Ignore unavailable networks e.g. local
}
yield this.get(chain);
}
}

async *[Symbol.asyncIterator]() {
for (const network of this.all()) {
if ((await network.status()) === 'healthy') yield network;
}
}
}
Expand All @@ -56,29 +54,39 @@ interface CreateParams {

function create({ chainKey, redis }: CreateParams) {
const chain = CHAINS[chainKey];
const transport = fallback([
...chain.rpcUrls.default.webSocket.map((url) => webSocket(url, { retryCount: 10 })),
...chain.rpcUrls.default.http.map((url) => http(url, { retryCount: 10, batch: true })),
]);
const transport = chain.rpcUrls.default.webSocket.length
? webSocket(undefined, { retryCount: 10 })
: http();
// const transport = fallback([
// ...chain.rpcUrls.default.webSocket.map((url) => webSocket(url, { retryCount: 10 })),
// ...chain.rpcUrls.default.http.map((url) => http(url, { retryCount: 10, batch: true })),
// ]);

const wallet = createWalletClient({
account: privateKeyToAccount(CONFIG.walletPrivateKeys[chainKey]),
return createPublicClient<Transport, ChainConfig>({
chain,
transport,
});
const walletAddress = asUAddress(wallet.account.address, chainKey);

return createPublicClient<FallbackTransport, ChainConfig>({
chain,
transport: fallback([
...chain.rpcUrls.default.webSocket.map((url) => webSocket(url, { retryCount: 10 })),
...chain.rpcUrls.default.http.map((url) => http(url, { retryCount: 10, batch: true })),
]),
key: chain.key,
name: chain.name,
batch: { multicall: true },
pollingInterval: 250 /* ms */,
}).extend((_client) => ({
pollingInterval: 250 /* ms */, // Used when websocket is unavailable
}).extend((client) => ({
...walletActions(client, transport, redis),
...blockNumberAndStatusActions(client),
}));
}

type Client = PublicClient<Transport, ChainConfig>;

function walletActions(client: Client, transport: Transport, redis: Redis) {
const chain = client.chain;
const wallet = createWalletClient({
account: privateKeyToAccount(CONFIG.walletPrivateKeys[chain.key]),
chain,
transport,
});
const walletAddress = asUAddress(wallet.account.address, chain.key);

return {
walletAddress,
async useWallet<R>(f: (wallet: NetworkWallet) => R): Promise<R> {
const mutex = new Mutex(redis, `network-wallet:${walletAddress}`, {
Expand All @@ -93,5 +101,42 @@ function create({ chainKey, redis }: CreateParams) {
await mutex.release();
}
},
}));
};
}

function blockNumberAndStatusActions(client: Client) {
let status: 'healthy' | WatchBlockNumberErrorType = 'healthy';
let blockNumber = 0n;

let connect: (() => void) | null = null;
const connecting = new Promise<void>((resolve) => {
connect = () => {
resolve();
connect = null;
};
});

client.watchBlockNumber({
onBlockNumber: (newBlockNumber) => {
if (blockNumber < newBlockNumber) {
blockNumber = newBlockNumber;
status = 'healthy';
connect?.();
}
},
onError: (error) => {
status = error as WatchBlockNumberErrorType;
},
emitOnBegin: true,
});

return {
blockNumber() {
return blockNumber;
},
async status() {
await connecting;
return status;
},
};
}

0 comments on commit a72c420

Please sign in to comment.