Skip to content

Commit

Permalink
perf(api): track event logs per block ema
Browse files Browse the repository at this point in the history
This dynamic block ranges, improving syncing performance by ~50%
  • Loading branch information
hbriese committed Dec 19, 2023
1 parent 719f668 commit f66eb00
Showing 1 changed file with 17 additions and 10 deletions.
27 changes: 17 additions & 10 deletions api/src/features/events/events.worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { NetworksService } from '../util/networks/networks.service';
import { DatabaseService } from '../database/database.service';
import e from '~/edgeql-js';
import { Hex, asHex } from 'lib';
import { Chain } from 'chains';
import { CHAINS, Chain } from 'chains';
import { InjectRedis } from '@songkeys/nestjs-redis';
import Redis from 'ioredis';
import { Mutex } from 'redis-semaphore';
Expand All @@ -19,8 +19,10 @@ import { DEFAULT_JOB_OPTIONS } from '../util/bull/bull.module';
import { AbiEvent } from 'abitype';
import { Log as ViemLog, encodeEventTopics, hexToNumber } from 'viem';

const DEFAULT_CHUNK_SIZE = 200;
const BLOCK_TIME = 500; /* ms */
const TARGET_LOGS_PER_JOB = 9_000; // Max 10k
const DEFAULT_LOGS_PER_BLOCK = 100;
const LOGS_PER_BLOCK_EMA_ALPHA = 0.4;
const TOO_MANY_RESULTS_RE =
/Query returned more than .+? results. Try with this block range \[(?:0x[0-9a-f]+), (0x[0-9a-f]+)\]/;

Expand Down Expand Up @@ -55,6 +57,9 @@ export type EventListener = (data: EventData) => Promise<void>;
export class EventsWorker extends Worker<EventsQueue> {
private listeners = new Map<Hex, EventListener[]>();
private events: AbiEvent[] = [];
private logsPerBlockEma = Object.fromEntries(
Object.keys(CHAINS).map((chain) => [chain as Chain, DEFAULT_LOGS_PER_BLOCK]),
) as Record<Chain, number>;

constructor(
@InjectQueue(EventsQueue.name)
Expand Down Expand Up @@ -83,15 +88,16 @@ export class EventsWorker extends Worker<EventsQueue> {

const latest = Number(network.blockNumber()); // Warning: bigint -> number
const from = job.data.from;
const to = Math.min(job.data.to ?? from + DEFAULT_CHUNK_SIZE - 1, latest);
const targetBlocks = Math.max(1, Math.floor(TARGET_LOGS_PER_JOB / this.logsPerBlockEma[chain]));
const to = Math.min(job.data.to ?? from + targetBlocks - 1, latest);

// Queue next job on the first attempt unless split job
const shouldQueue = job.attemptsMade === 1 && !job.data.split;

if (shouldQueue) {
if (latest < from) {
// Up to date; retry after a delay
return this.queue.add(EventsQueue.name, { chain, from }, { delay: BLOCK_TIME * 2 });
return this.queue.add(EventsQueue.name, { chain, from }, { delay: BLOCK_TIME });
} else {
// Queue up next job
this.queue.add(
Expand All @@ -110,6 +116,12 @@ export class EventsWorker extends Worker<EventsQueue> {
strict: true,
});

// Update logs per block exponential moving average
const blocksProcessed = to - from + 1;
this.logsPerBlockEma[chain] =
(1 - LOGS_PER_BLOCK_EMA_ALPHA) * this.logsPerBlockEma[chain] +
LOGS_PER_BLOCK_EMA_ALPHA * (logs.length / blocksProcessed);

await Promise.all(
logs
.filter((log) => log.topics.length)
Expand All @@ -119,19 +131,14 @@ export class EventsWorker extends Worker<EventsQueue> {
),
);
this.log.verbose(
`Processed ${logs.length} events from ${to - from + 1} blocks [${from}, ${to}] on ${chain}`,
`Processed ${logs.length} events from ${blocksProcessed} blocks [${from}, ${to}] on ${chain}`,
);
} catch (e) {
const match = TOO_MANY_RESULTS_RE.exec((e as Error).message ?? '');
if (!match) throw e;

// Split the job into two smaller jobs
const newTo = hexToNumber(asHex(match[1]));
this.log.log(
`Splitting job [${from}, ${to}] by ${to - newTo}` +
(job.data.split ? ' -- already split' : '') +
`on ${chain}`,
);
return this.queue.addBulk([
{ name: EventsQueue.name, data: { chain, from, to: newTo, split: true } },
{ name: EventsQueue.name, data: { chain, from: newTo + 1, to, split: true } },
Expand Down

0 comments on commit f66eb00

Please sign in to comment.