Skip to content

Commit

Permalink
mina funnel: wip
Browse files Browse the repository at this point in the history
  • Loading branch information
ecioppettini committed Mar 21, 2024
1 parent f2f9e03 commit c78f206
Show file tree
Hide file tree
Showing 15 changed files with 683 additions and 11 deletions.
68 changes: 68 additions & 0 deletions packages/engine/paima-funnel/src/cde/minaGeneric.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
import {
CdeMinaGenericDatum,
ChainDataExtensionDatum,
ChainDataExtensionMinaGeneric,
} from '@paima/sm';
import { ChainDataExtensionDatumType } from '@paima/utils';

export default async function getCdeData(
minaArchive: string,
extension: ChainDataExtensionMinaGeneric,
fromTimestamp: number,
toTimestamp: number,
getBlockNumber: (minaTimestamp: number) => number,
network: string
): Promise<CdeMinaGenericDatum[]> {
console.log('from-to', fromTimestamp, toTimestamp);
const data = (await fetch(minaArchive, {
method: 'POST',

headers: {
'Content-Type': 'application/json',
},

body: JSON.stringify({
query: `
{
events(
input: {
address: "${extension.address}",
fromTimestamp: "${fromTimestamp}",
toTimestamp: "${toTimestamp}"
}
) {
blockInfo {
height
timestamp
}
eventData {
data
}
}
}
`,
}),
})
.then(res => res.json())
.then(res => {
return res;
})
.then(json => json['data']['events'])) as {
blockInfo: { height: number; timestamp: string };
eventData: { data: string[] }[];
}[];

return data.flatMap(singleBlockData =>
singleBlockData.eventData.flatMap(perTx =>
perTx.data.map(txEvent => ({
cdeId: extension.cdeId,
cdeDatumType: ChainDataExtensionDatumType.MinaGeneric,
blockNumber: getBlockNumber(Number(singleBlockData.blockInfo.timestamp)),
payload: txEvent,
network,
scheduledPrefix: extension.scheduledPrefix,
paginationCursor: { cursor: singleBlockData.blockInfo.timestamp, finished: false },
}))
)
);
}
4 changes: 4 additions & 0 deletions packages/engine/paima-funnel/src/cde/reading.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ async function getSpecificCdeData(
// this is used by the block funnel, which can't get information for this
// extension
return [];
case ChainDataExtensionType.MinaGeneric:
// this is used by the block funnel, which can't get information for this
// extension
return [];
default:
assertNever(extension);
}
Expand Down
57 changes: 57 additions & 0 deletions packages/engine/paima-funnel/src/funnels/FunnelCache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ export type CacheMapType = {
[RpcCacheEntry.SYMBOL]?: RpcCacheEntry;
[CarpFunnelCacheEntry.SYMBOL]?: CarpFunnelCacheEntry;
[EvmFunnelCacheEntry.SYMBOL]?: EvmFunnelCacheEntry;
[MinaFunnelCacheEntry.SYMBOL]?: MinaFunnelCacheEntry;
};
export class FunnelCacheManager {
public cacheEntries: CacheMapType = {};
Expand Down Expand Up @@ -178,3 +179,59 @@ export class EvmFunnelCacheEntry implements FunnelCacheEntry {
this.cachedData = {};
};
}

export type MinaFunnelCacheEntryState = {
startingSlotTimestamp: number;
lastPoint: { timestamp: number } | undefined;
genesisTime: number;
cursors:
| {
[cdeId: number]: { cursor: string; finished: boolean };
}
| undefined;
};

export class MinaFunnelCacheEntry implements FunnelCacheEntry {
private state: MinaFunnelCacheEntryState | null = null;
public static readonly SYMBOL = Symbol('MinaFunnelStartingSlot');

public updateStartingSlot(startingSlotTimestamp: number, genesisTime: number): void {
this.state = {
startingSlotTimestamp,
genesisTime,
lastPoint: this.state?.lastPoint,
cursors: this.state?.cursors,
};
}

public updateLastPoint(timestamp: number): void {
if (this.state) {
this.state.lastPoint = { timestamp };
}
}

public updateCursor(cdeId: number, presyncCursor: { cursor: string; finished: boolean }): void {
if (this.state) {
if (!this.state.cursors) {
this.state.cursors = {};
}

this.state.cursors[cdeId] = presyncCursor;
}
}

public initialized(): boolean {
return !!this.state;
}

public getState(): Readonly<MinaFunnelCacheEntryState> {
if (!this.state) {
throw new Error('[mina-funnel] Uninitialized cache entry');
}
return this.state;
}

clear: FunnelCacheEntry['clear'] = () => {
this.state = null;
};
}
14 changes: 10 additions & 4 deletions packages/engine/paima-funnel/src/funnels/carp/funnel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -458,11 +458,17 @@ export class CarpFunnel extends BaseFunnel implements ChainFunnel {

const cursors = await getCarpCursors.run(undefined, dbTx);

const extensions = sharedData.extensions
.filter(extensions => (extensions.network = config.network))
.map(extension => extension.cdeId);

for (const cursor of cursors) {
newEntry.updateCursor(cursor.cde_id, {
cursor: cursor.cursor,
finished: cursor.finished,
});
// TODO: performance concern? but not likely
if (extensions.find(cdeId => cdeId === cursor.cde_id))
newEntry.updateCursor(cursor.cde_id, {
cursor: cursor.cursor,
finished: cursor.finished,
});
}

return newEntry;
Expand Down
Loading

0 comments on commit c78f206

Please sign in to comment.