Skip to content

Commit

Permalink
parallel evm funnel: increase concurrency in readData
Browse files Browse the repository at this point in the history
  • Loading branch information
ecioppettini committed Mar 14, 2024
1 parent 20ea0dd commit e46785c
Showing 1 changed file with 86 additions and 25 deletions.
111 changes: 86 additions & 25 deletions packages/engine/paima-funnel/src/funnels/parallelEvm/funnel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,21 +38,75 @@ export class ParallelEvmFunnel extends BaseFunnel implements ChainFunnel {
public override async readData(blockHeight: number): Promise<ChainData[]> {
const cachedState = this.getState();

let latestParallelChainTimestamp: number | undefined;

// if in the previous round we couldn't return some blocks because the
// parallel chain didn't get far enough, we first process those.
if (cachedState.bufferedChainData.length === 0) {
const baseData = await this.baseFunnel.readData(blockHeight);
cachedState.bufferedChainData.push(...baseData);
}
await Promise.all([
(async (): Promise<void> => {
const baseData = await this.baseFunnel.readData(blockHeight);
cachedState.bufferedChainData.push(...baseData);
return;
})(),
,
(async (): Promise<void> => {
const latestBlock = await this.updateLatestBlock();

latestParallelChainTimestamp = Number(
(await this.web3.eth.getBlock(latestBlock)).timestamp
);

// It may be possible that if we always optimistically fetch blocks we
// may end up with some sort of 'memory leak' here.
//
// For example, if on average the base funnel spans a range of 10
// seconds in a single round, and this funnel spans a range of 20
// seconds, the cached data would grow unbounded. In practice we would
// eventually reach the chain's tip, and that would solve that
// problem, but we could be syncing from a really old block.
//
// The following is to prevent that situation. If we still need more
// blocks we still fetch those later, so not doing anything here is
// not a problem. We need to do this here instead of the range
// finalization logic, since we don't have a block from the base chain
// yet.
if (cachedState.bufferedChainData.length > this.config.funnelBlockGroupSize) return;

const from = Math.max(
cachedState.startBlockHeight,
cachedState.lastBlock ? cachedState.lastBlock + 1 : 0
);

const to = Math.min(from + this.config.funnelBlockGroupSize, latestBlock);

doLog(`ParallelEvm funnel ${this.config.chainId}: #${from}-${to}`);

// note: we could potentially do multiple rounds here in the time it
// takes the wrapped funnel to return, but this may require tracking
// some stats, since there is no easy way to figure out just from the
// config how many rounds we could do.
const parallelEvmBlocks = await getMultipleBlockData(this.web3, from, to, this.chainName);

for (const parallelChainBlock of parallelEvmBlocks) {
cachedState.timestampToBlockNumber.push([
parallelChainBlock.timestamp,
parallelChainBlock.blockNumber,
]);

cachedState.lastBlock = parallelChainBlock.blockNumber;
}

const latestBlockQueryState = this.latestBlock();
const latestBlock = await this.web3.eth.getBlock(latestBlockQueryState);
return;
})(),
]);
}

const chainData: ChainData[] = [];

// filter the data so that we are sure we can get all the blocks in the range
for (const data of cachedState.bufferedChainData) {
if (data.timestamp <= Number(latestBlock.timestamp)) {
if (data.timestamp <= latestParallelChainTimestamp!) {
chainData.push(data);
}
}
Expand Down Expand Up @@ -99,31 +153,35 @@ export class ParallelEvmFunnel extends BaseFunnel implements ChainFunnel {

const maxTimestamp = chainData[chainData.length - 1].timestamp;

const blocks = [];

while (true) {
// if we already have enough blocks (because we fetched those
// concurrently)
while (!this.canFinalizeBlockRangeWith(maxTimestamp)) {
const latestBlock = this.latestBlock();

const to = Math.min(latestBlock, cachedState.lastBlock + this.config.funnelBlockGroupSize);
// we need to fetch the blocks in order to know the timestamps, so that we
// can make a mapping from the trunk chain to the parallel chain.

doLog(`ParallelEvm funnel ${this.config.chainId}: #${cachedState.lastBlock + 1}-${to}`);

const parallelEvmBlocks = await getMultipleBlockData(
this.web3,
cachedState.lastBlock + 1,
to,
this.chainName
);

blocks.push(...parallelEvmBlocks);
for (const parallelChainBlock of parallelEvmBlocks) {
cachedState.timestampToBlockNumber.push([
parallelChainBlock.timestamp,
parallelChainBlock.blockNumber,
]);

// this has to be > instead of >=
// because there can be multiple blocks with the same timestamp (e.g. Arbitrum)
if (blocks.length > 0 && blocks[blocks.length - 1].timestamp > maxTimestamp) {
break;
cachedState.lastBlock = parallelChainBlock.blockNumber;
}

if (blocks.length > 0) {
cachedState.lastBlock = blocks[blocks.length - 1].blockNumber;
if (this.canFinalizeBlockRangeWith(maxTimestamp)) {
break;
}

// We reach this part of the code if after we fetch blocks we still aren't done syncing.
Expand All @@ -140,15 +198,6 @@ export class ParallelEvmFunnel extends BaseFunnel implements ChainFunnel {
}
}

for (const parallelChainBlock of blocks) {
cachedState.timestampToBlockNumber.push([
parallelChainBlock.timestamp,
parallelChainBlock.blockNumber,
]);

cachedState.lastBlock = parallelChainBlock.blockNumber;
}

// remove old entries from the timestamp to block mapping, so that it
// doesn't grow forever, since it's cached.
while (true) {
Expand Down Expand Up @@ -541,6 +590,18 @@ export class ParallelEvmFunnel extends BaseFunnel implements ChainFunnel {

return bufferedState.result;
}

private canFinalizeBlockRangeWith(maxTimestamp: number): boolean {
const cachedState = this.getState();

// this has to be > instead of >=
// because there can be multiple blocks with the same timestamp (e.g. Arbitrum)
return (
cachedState.timestampToBlockNumber.length > 0 &&
cachedState.timestampToBlockNumber[cachedState.timestampToBlockNumber.length - 1][0] >
maxTimestamp
);
}
}

export async function wrapToParallelEvmFunnel(
Expand Down

0 comments on commit e46785c

Please sign in to comment.