Skip to content

Commit

Permalink
parallel evm funnel: increase concurrency in readData
Browse files Browse the repository at this point in the history
  • Loading branch information
ecioppettini committed Apr 16, 2024
1 parent 1ee52c5 commit 960c4e6
Showing 1 changed file with 85 additions and 25 deletions.
110 changes: 85 additions & 25 deletions packages/engine/paima-funnel/src/funnels/parallelEvm/funnel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,21 +38,75 @@ export class ParallelEvmFunnel extends BaseFunnel implements ChainFunnel {
public override async readData(blockHeight: number): Promise<ChainData[]> {
const cachedState = this.getState();

let latestParallelChainTimestamp: number | undefined;

// if in the previous round we couldn't return some blocks because the
// parallel chain didn't get far enough, we first process those.
if (cachedState.bufferedChainData.length === 0) {
const baseData = await this.baseFunnel.readData(blockHeight);
cachedState.bufferedChainData.push(...baseData);
}
await Promise.all([
(async (): Promise<void> => {
const baseData = await this.baseFunnel.readData(blockHeight);
cachedState.bufferedChainData.push(...baseData);
return;
})(),
,
(async (): Promise<void> => {
const latestBlock = await this.updateLatestBlock();

latestParallelChainTimestamp = Number(
(await this.web3.eth.getBlock(latestBlock)).timestamp
);

// It may be possible that if we always optimistically fetch blocks we
// may end up with some sort of 'memory leak' here.
//
// For example, if on average the base funnel spans a range of 10
// seconds in a single round, and this funnel spans a range of 20
// seconds, the cached data would grow unbounded. In practice we would
// eventually reach the chain's tip, and that would solve that
// problem, but we could be syncing from a really old block.
//
// The following is to prevent that situation. If we still need more
// blocks we still fetch those later, so not doing anything here is
// not a problem. We need to do this here instead of the range
// finalization logic, since we don't have a block from the base chain
// yet.
if (cachedState.bufferedChainData.length > this.config.funnelBlockGroupSize) return;

const from = Math.max(
cachedState.startBlockHeight,
cachedState.lastBlock ? cachedState.lastBlock + 1 : 0
);

const to = Math.min(from + this.config.funnelBlockGroupSize, latestBlock);

doLog(`ParallelEvm funnel ${this.config.chainId}: #${from}-${to}`);

// note: we could potentially do multiple rounds here in the time it
// takes the wrapped funnel to return, but this may require tracking
// some stats, since there is no easy way to figure out just from the
// config how many rounds we could do.
const parallelEvmBlocks = await getMultipleBlockData(this.web3, from, to, this.chainName);

for (const parallelChainBlock of parallelEvmBlocks) {
cachedState.timestampToBlockNumber.push([
parallelChainBlock.timestamp,
parallelChainBlock.blockNumber,
]);

cachedState.lastBlock = parallelChainBlock.blockNumber;
}

const latestBlockQueryState = this.latestBlock();
const latestBlock = await this.web3.eth.getBlock(latestBlockQueryState);
return;
})(),
]);
}

const chainData: ChainData[] = [];

// filter the data so that we are sure we can get all the blocks in the range
for (const data of cachedState.bufferedChainData) {
if (data.timestamp <= Number(latestBlock.timestamp)) {
if (data.timestamp <= latestParallelChainTimestamp!) {
chainData.push(data);
}
}
Expand Down Expand Up @@ -99,9 +153,11 @@ export class ParallelEvmFunnel extends BaseFunnel implements ChainFunnel {

const maxTimestamp = chainData[chainData.length - 1].timestamp;

const blocks = [];

while (true) {
// if we already have enough blocks, either because we fetched those
// concurrently, or because in the previous round we fetched too many
// (funnelBlockGroupSize for this funnel covers much more range than for the
// wrapped funnel), we don't pull anything.
while (!this.canFinalizeBlockRangeWith(maxTimestamp)) {
const latestBlock = this.latestBlock();

const to = Math.min(latestBlock, cachedState.lastBlock + this.config.funnelBlockGroupSize);
Expand All @@ -117,16 +173,17 @@ export class ParallelEvmFunnel extends BaseFunnel implements ChainFunnel {
this.chainName
);

blocks.push(...parallelEvmBlocks);
for (const parallelChainBlock of parallelEvmBlocks) {
cachedState.timestampToBlockNumber.push([
parallelChainBlock.timestamp,
parallelChainBlock.blockNumber,
]);

// this has to be > instead of >=
// because there can be multiple blocks with the same timestamp (e.g. Arbitrum)
if (blocks.length > 0 && blocks[blocks.length - 1].timestamp > maxTimestamp) {
break;
cachedState.lastBlock = parallelChainBlock.blockNumber;
}

if (blocks.length > 0) {
cachedState.lastBlock = blocks[blocks.length - 1].blockNumber;
if (this.canFinalizeBlockRangeWith(maxTimestamp)) {
break;
}

// We reach this part of the code if after we fetch blocks we still aren't done syncing.
Expand All @@ -143,15 +200,6 @@ export class ParallelEvmFunnel extends BaseFunnel implements ChainFunnel {
}
}

for (const parallelChainBlock of blocks) {
cachedState.timestampToBlockNumber.push([
parallelChainBlock.timestamp,
parallelChainBlock.blockNumber,
]);

cachedState.lastBlock = parallelChainBlock.blockNumber;
}

// remove old entries from the timestamp to block mapping, so that it
// doesn't grow forever, since it's cached.
while (true) {
Expand Down Expand Up @@ -532,6 +580,18 @@ export class ParallelEvmFunnel extends BaseFunnel implements ChainFunnel {

return bufferedState.result;
}

/**
 * Tells whether the cached timestamp-to-block mapping already reaches past
 * `maxTimestamp`, i.e. whether no further blocks need to be fetched before
 * the current range can be closed.
 *
 * @param maxTimestamp - upper timestamp bound of the range being finalized.
 * @returns `true` only when the mapping is non-empty and the timestamp of its
 * most recently pushed entry is strictly greater than `maxTimestamp`.
 */
private canFinalizeBlockRangeWith(maxTimestamp: number): boolean {
  const mapping = this.getState().timestampToBlockNumber;

  // Nothing cached yet, so the range cannot be covered.
  if (mapping.length === 0) {
    return false;
  }

  // Each entry is a [timestamp, blockNumber] pair; take the timestamp of the
  // last one.
  const newestTimestamp = mapping[mapping.length - 1][0];

  // Strictly greater (not >=): several blocks may share one timestamp
  // (e.g. Arbitrum), so an equal timestamp does not prove that every block
  // carrying that timestamp has been seen.
  return newestTimestamp > maxTimestamp;
}
}

export async function wrapToParallelEvmFunnel(
Expand Down

0 comments on commit 960c4e6

Please sign in to comment.