From 70c7bfed20ca28ddecee2deef64720df98a4f10d Mon Sep 17 00:00:00 2001 From: david Date: Mon, 13 Jan 2025 13:02:02 -0500 Subject: [PATCH] use relay hash to find entities Signed-off-by: david --- .../src/entities/HistoricPrice.ts | 4 +- .../src/entities/RelayHashInfo.ts | 6 +- packages/indexer-database/src/main.ts | 2 + .../service/SpokePoolIndexerDataHandler.ts | 22 ++++-- packages/indexer/src/messaging/priceWorker.ts | 77 ++++++++++++------- 5 files changed, 70 insertions(+), 41 deletions(-) diff --git a/packages/indexer-database/src/entities/HistoricPrice.ts b/packages/indexer-database/src/entities/HistoricPrice.ts index 22c00a8..f35ad61 100644 --- a/packages/indexer-database/src/entities/HistoricPrice.ts +++ b/packages/indexer-database/src/entities/HistoricPrice.ts @@ -16,18 +16,16 @@ export class HistoricPrice { @PrimaryGeneratedColumn() id: number; - // bear in mind we are using coingecko symbols directly here, for all intents and purposes this is coingecko historic market price @Column() baseCurrency: string; @Column({ default: "usd" }) quoteCurrency: string; - // yyyy-LL-dd @Column({ type: "date" }) date: Date; - @Column({ type: "float" }) + @Column({ type: "decimal" }) price: string; @CreateDateColumn() diff --git a/packages/indexer-database/src/entities/RelayHashInfo.ts b/packages/indexer-database/src/entities/RelayHashInfo.ts index 5cca566..ce781e0 100644 --- a/packages/indexer-database/src/entities/RelayHashInfo.ts +++ b/packages/indexer-database/src/entities/RelayHashInfo.ts @@ -92,11 +92,11 @@ export class RelayHashInfo { @CreateDateColumn() createdAt: Date; - @Column({ nullable: true, type: "float" }) + @Column({ nullable: true, type: "decimal" }) bridgeFeeUsd: string; - @Column({ nullable: true, type: "float" }) + @Column({ nullable: true, type: "decimal" }) inputPriceUsd: string; - @Column({ nullable: true, type: "float" }) + @Column({ nullable: true, type: "decimal" }) outputPriceUsd: string; @UpdateDateColumn() diff --git a/packages/indexer-database/src/main.ts b/packages/indexer-database/src/main.ts index 96c9bf4..cbae5ea 100644 --- a/packages/indexer-database/src/main.ts +++ b/packages/indexer-database/src/main.ts @@ -39,6 +39,8 @@ export const createDataSource = (config: DatabaseConfig): DataSource => { // Webhooks entities.WebhookRequest, entities.WebhookClient, + // Historic Price + entities.HistoricPrice, ], migrationsTableName: "_migrations", migrations: ["migrations/*.ts"], diff --git a/packages/indexer/src/data-indexing/service/SpokePoolIndexerDataHandler.ts b/packages/indexer/src/data-indexing/service/SpokePoolIndexerDataHandler.ts index 36d93a5..003b73c 100644 --- a/packages/indexer/src/data-indexing/service/SpokePoolIndexerDataHandler.ts +++ b/packages/indexer/src/data-indexing/service/SpokePoolIndexerDataHandler.ts @@ -118,7 +118,7 @@ export class SpokePoolIndexerDataHandler implements IndexerDataHandler { await this.spokePoolProcessor.process(storedEvents); this.profileStoreEvents(storedEvents); // publish new relays to workers to fill in prices - await this.publishNewRelays(events.filledV3RelayEvents); + await this.publishNewRelays(storedEvents.fills); } /** @@ -336,13 +336,19 @@ export class SpokePoolIndexerDataHandler implements IndexerDataHandler { }); } - private async publishNewRelays(relays: FillWithBlock[]) { - const messages: PriceMessage[] = relays.map((relay) => { - return { - depositId: relay.depositId, - originChainId: relay.originChainId, - }; - }); + private async publishNewRelays( + fills: SaveQueryResult[], + ) { + const messages: PriceMessage[] = fills + .map((fill) => ({ + relayHash: fill.data?.relayHash, + originChainId: fill.data?.originChainId, + })) + .filter( + (x): x is PriceMessage => + x.relayHash !== undefined && x.originChainId !== undefined, + ); + await this.indexerQueuesService.publishMessagesBulk( IndexerQueues.PriceQuery, IndexerQueues.PriceQuery, // Use queue name as job name diff --git a/packages/indexer/src/messaging/priceWorker.ts b/packages/indexer/src/messaging/priceWorker.ts index e2655e8..5c03003 100644 --- a/packages/indexer/src/messaging/priceWorker.ts +++ b/packages/indexer/src/messaging/priceWorker.ts @@ -10,11 +10,14 @@ import { findTokenByAddress } from "../utils"; import { RetryProvidersFactory } from "../web3/RetryProvidersFactory"; import { assert } from "@repo/error-handling"; import * as across from "@across-protocol/sdk"; +import * as ss from "superstruct"; -export type PriceMessage = { - depositId: number; - originChainId: number; -}; +export const PriceMessage = ss.object({ + relayHash: ss.string(), + originChainId: ss.number(), +}); + +export type PriceMessage = ss.Infer; // Convert now to a consistent price timestamp yesterday for lookup purposes export function yesterday(now: Date) { @@ -105,14 +108,17 @@ export class PriceWorker { public setWorker() { this.worker = new Worker( IndexerQueues.PriceQuery, - async (job: Job) => { + async (job: Job) => { + // validate data type + if (!ss.is(job.data, PriceMessage)) return; try { await this.run(job.data); } catch (error) { this.logger.error({ at: "PriceWorker", - message: `Error getting price for deposit ${job.data.depositId} on chain ${job.data.originChainId}`, + message: `Error getting price for fill ${job.data.relayHash}`, error, + job, }); throw error; } @@ -160,48 +166,59 @@ export class PriceWorker { ); } private async run(params: PriceMessage) { - const { depositId, originChainId } = params; + const { relayHash, originChainId } = params; - const relayHashInfoArray = await this.relayHashInfoRepository.find({ - where: { depositId, originChainId }, + const relayHashInfo = await this.relayHashInfoRepository.findOne({ + where: { relayHash, originChainId }, + relations: { + depositEvent: true, + fillEvent: true, + }, }); + + if (!relayHashInfo) { + const errorMessage = `Relay hash info not found by relay hash ${relayHash}`; + this.logger.error({ + at: "PriceWorker", + message: errorMessage, + ...params, + }); + throw new Error(errorMessage); + } + + const { depositId } = relayHashInfo; const deposit = await this.depositRepository.findOne({ where: { depositId, originChainId }, }); - // if we have multiple relays for same deposit, find hte one which matches the deposit hash - // the others would be invalid fills - const relayHashInfo = relayHashInfoArray.find( - (info) => info.depositTxHash === (deposit && deposit.transactionHash), - ); - if ( - relayHashInfo?.bridgeFeeUsd && - relayHashInfo?.inputPriceUsd && - relayHashInfo?.outputPriceUsd - ) { - const errorMessage = "Skipping already processed relay hash"; + if (!deposit) { + const errorMessage = `Unable to find deposit ${depositId} on chain ${originChainId}`; this.logger.error({ at: "PriceWorker", message: errorMessage, ...params, }); - return; + throw new Error(errorMessage); } - const errorMessage = - "Failed to retrieve relay hash information or deposit record from the database."; - // we will keep retrying until found or we know there was a reorg - if (!relayHashInfo || !deposit) { + if ( + relayHashInfo.bridgeFeeUsd && + relayHashInfo.inputPriceUsd && + relayHashInfo.outputPriceUsd + ) { + const errorMessage = "Skipping already processed relay hash"; this.logger.error({ at: "PriceWorker", message: errorMessage, ...params, }); - throw new Error(errorMessage); + return; } + const errorMessage = + "Failed to retrieve relay hash information or deposit record from the database."; // if blockTimestamp doesnt exist, maybe we keep retrying till it does - const blockTime = relayHashInfo?.depositEvent?.blockTimestamp; + const blockTime = relayHashInfo.depositEvent.blockTimestamp; if (!blockTime) { const errorMessage = "Deposit block time not found for relay hash info."; this.logger.error({ @@ -261,6 +278,12 @@ export class PriceWorker { { depositId, originChainId }, updatedFields, ); + this.logger.info({ + at: "PriceWorker#updateRelayHashInfo", + message: "Updated relay hash info with new fields", + params, + updatedFields, + }); } } public async close() {