Skip to content

Commit

Permalink
use relay hash to find entities
Browse files Browse the repository at this point in the history
Signed-off-by: david <[email protected]>
  • Loading branch information
daywiss committed Jan 13, 2025
1 parent ca4498f commit 70c7bfe
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 41 deletions.
4 changes: 1 addition & 3 deletions packages/indexer-database/src/entities/HistoricPrice.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,16 @@ export class HistoricPrice {
@PrimaryGeneratedColumn()
id: number;

// bear in mind we are using coingecko symbols directly here, for all intents and purposes this is coingecko historic market price
@Column()
baseCurrency: string;

@Column({ default: "usd" })
quoteCurrency: string;

// yyyy-LL-dd
@Column({ type: "date" })
date: Date;

@Column({ type: "float" })
@Column({ type: "decimal" })
price: string;

@CreateDateColumn()
Expand Down
6 changes: 3 additions & 3 deletions packages/indexer-database/src/entities/RelayHashInfo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -92,11 +92,11 @@ export class RelayHashInfo {
@CreateDateColumn()
createdAt: Date;

@Column({ nullable: true, type: "float" })
@Column({ nullable: true, type: "decimal" })
bridgeFeeUsd: string;
@Column({ nullable: true, type: "float" })
@Column({ nullable: true, type: "decimal" })
inputPriceUsd: string;
@Column({ nullable: true, type: "float" })
@Column({ nullable: true, type: "decimal" })
outputPriceUsd: string;

@UpdateDateColumn()
Expand Down
2 changes: 2 additions & 0 deletions packages/indexer-database/src/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ export const createDataSource = (config: DatabaseConfig): DataSource => {
// Webhooks
entities.WebhookRequest,
entities.WebhookClient,
// Historic Price
entities.HistoricPrice,
],
migrationsTableName: "_migrations",
migrations: ["migrations/*.ts"],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ export class SpokePoolIndexerDataHandler implements IndexerDataHandler {
await this.spokePoolProcessor.process(storedEvents);
this.profileStoreEvents(storedEvents);
// publish new relays to workers to fill in prices
await this.publishNewRelays(events.filledV3RelayEvents);
await this.publishNewRelays(storedEvents.fills);
}

/**
Expand Down Expand Up @@ -336,13 +336,19 @@ export class SpokePoolIndexerDataHandler implements IndexerDataHandler {
});
}

private async publishNewRelays(relays: FillWithBlock[]) {
const messages: PriceMessage[] = relays.map((relay) => {
return {
depositId: relay.depositId,
originChainId: relay.originChainId,
};
});
private async publishNewRelays(
fills: SaveQueryResult<entities.FilledV3Relay>[],
) {
const messages: PriceMessage[] = fills
.map((fill) => ({
relayHash: fill.data?.relayHash,
originChainId: fill.data?.originChainId,
}))
.filter(
(x): x is PriceMessage =>
x.relayHash !== undefined && x.originChainId !== undefined,
);

await this.indexerQueuesService.publishMessagesBulk(
IndexerQueues.PriceQuery,
IndexerQueues.PriceQuery, // Use queue name as job name
Expand Down
77 changes: 50 additions & 27 deletions packages/indexer/src/messaging/priceWorker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,14 @@ import { findTokenByAddress } from "../utils";
import { RetryProvidersFactory } from "../web3/RetryProvidersFactory";
import { assert } from "@repo/error-handling";
import * as across from "@across-protocol/sdk";
import * as ss from "superstruct";

export type PriceMessage = {
depositId: number;
originChainId: number;
};
export const PriceMessage = ss.object({
relayHash: ss.string(),
originChainId: ss.number(),
});

export type PriceMessage = ss.Infer<typeof PriceMessage>;

// Convert now to a consistent price timestamp yesterday for lookup purposes
export function yesterday(now: Date) {
Expand Down Expand Up @@ -105,14 +108,17 @@ export class PriceWorker {
public setWorker() {
this.worker = new Worker(
IndexerQueues.PriceQuery,
async (job: Job<PriceMessage>) => {
async (job: Job<unknown>) => {
// validate data type
if (!ss.is(job.data, PriceMessage)) return;
try {
await this.run(job.data);
} catch (error) {
this.logger.error({
at: "PriceWorker",
message: `Error getting price for deposit ${job.data.depositId} on chain ${job.data.originChainId}`,
message: `Error getting price for fill ${job.data.relayHash}`,
error,
job,
});
throw error;
}
Expand Down Expand Up @@ -160,48 +166,59 @@ export class PriceWorker {
);
}
private async run(params: PriceMessage) {
const { depositId, originChainId } = params;
const { relayHash, originChainId } = params;

const relayHashInfoArray = await this.relayHashInfoRepository.find({
where: { depositId, originChainId },
const relayHashInfo = await this.relayHashInfoRepository.findOne({
where: { relayHash, originChainId },
relations: {
depositEvent: true,
fillEvent: true,
},
});

if (!relayHashInfo) {
const errorMessage = `Relay hash info not found by relay hash ${relayHash}`;
this.logger.error({
at: "PriceWorker",
message: errorMessage,
...params,
});
throw new Error(errorMessage);
}

const { depositId } = relayHashInfo;
const deposit = await this.depositRepository.findOne({
where: { depositId, originChainId },
});
// if we have multiple relays for same deposit, find hte one which matches the deposit hash
// the others would be invalid fills
const relayHashInfo = relayHashInfoArray.find(
(info) => info.depositTxHash === (deposit && deposit.transactionHash),
);

if (
relayHashInfo?.bridgeFeeUsd &&
relayHashInfo?.inputPriceUsd &&
relayHashInfo?.outputPriceUsd
) {
const errorMessage = "Skipping already processed relay hash";
if (!deposit) {
const errorMessage = `Unable to find deposit ${depositId} on chain ${originChainId}`;
this.logger.error({
at: "PriceWorker",
message: errorMessage,
...params,
});
return;
throw new Error(errorMessage);
}
const errorMessage =
"Failed to retrieve relay hash information or deposit record from the database.";

// we will keep retrying until found or we know there was a reorg
if (!relayHashInfo || !deposit) {
if (
relayHashInfo.bridgeFeeUsd &&
relayHashInfo.inputPriceUsd &&
relayHashInfo.outputPriceUsd
) {
const errorMessage = "Skipping already processed relay hash";
this.logger.error({
at: "PriceWorker",
message: errorMessage,
...params,
});
throw new Error(errorMessage);
return;
}
const errorMessage =
"Failed to retrieve relay hash information or deposit record from the database.";

// if blockTimestamp doesnt exist, maybe we keep retrying till it does
const blockTime = relayHashInfo?.depositEvent?.blockTimestamp;
const blockTime = relayHashInfo.depositEvent.blockTimestamp;
if (!blockTime) {
const errorMessage = "Deposit block time not found for relay hash info.";
this.logger.error({
Expand Down Expand Up @@ -261,6 +278,12 @@ export class PriceWorker {
{ depositId, originChainId },
updatedFields,
);
this.logger.info({
at: "PriceWorker#updateRelayHashInfo",
message: "Updated relay hash info with new fields",
params,
updatedFields,
});
}
}
public async close() {
Expand Down

0 comments on commit 70c7bfe

Please sign in to comment.