From ae571cf481fc0c43b45c9544d0e892bf027574af Mon Sep 17 00:00:00 2001 From: david Date: Mon, 30 Dec 2024 14:00:47 -0500 Subject: [PATCH] remove config, include bridge fee calculation, input and output token prices Signed-off-by: david --- packages/error-handling/src/utils/assert.ts | 2 +- packages/indexer/README.md | 4 - packages/indexer/src/messaging/priceWorker.ts | 205 ++++++++++-------- packages/indexer/src/parseEnv.ts | 6 - packages/indexer/src/utils/coingeckoClient.ts | 31 +-- packages/indexer/src/utils/currencyUtils.ts | 17 +- 6 files changed, 130 insertions(+), 135 deletions(-) diff --git a/packages/error-handling/src/utils/assert.ts b/packages/error-handling/src/utils/assert.ts index 699cbd6..8a9ca09 100644 --- a/packages/error-handling/src/utils/assert.ts +++ b/packages/error-handling/src/utils/assert.ts @@ -13,7 +13,7 @@ export function assert( message: string, ): asserts value is NonNullable { try { - return assertModule.ok(value !== null && value !== undefined, message); + return assertModule.ok(value, message); } catch (e: unknown) { throw new AssertError(message); } diff --git a/packages/indexer/README.md b/packages/indexer/README.md index f1bcf7c..1e4de20 100644 --- a/packages/indexer/README.md +++ b/packages/indexer/README.md @@ -51,8 +51,4 @@ ENABLE_HUBPOOL_INDEXER=true ENABLE_BUNDLE_EVENTS_PROCESSOR=true ENABLE_BUNDLE_INCLUDED_EVENTS_SERVICE=true ENABLE_BUNDLE_BUILDER=true - -# use symbols defined in /home/dev/src/risklabs/indexer/packages/indexer/src/utils/coingeckoClient.ts -# separate them by comma, no spaces -COINGECKO_SYMBOLS=ethereum,optimism,across-protocol ``` diff --git a/packages/indexer/src/messaging/priceWorker.ts b/packages/indexer/src/messaging/priceWorker.ts index 9b2aba6..559fecb 100644 --- a/packages/indexer/src/messaging/priceWorker.ts +++ b/packages/indexer/src/messaging/priceWorker.ts @@ -3,6 +3,7 @@ import winston from "winston"; import { Job, Worker } from "bullmq"; import { DataSource, entities } from "@repo/indexer-database"; import { IndexerQueues } from "./service"; +import { ethers } from "ethers"; import { getIntegratorId, yesterday, @@ -10,6 +11,7 @@ import { findTokenByAddress, } from "../utils"; import { RetryProvidersFactory } from "../web3/RetryProvidersFactory"; +import { assert } from "@repo/error-handling"; export type PriceMessage = { depositId: number; @@ -27,8 +29,11 @@ export type PriceMessage = { * - Logging errors and information at various stages of the process. */ export class PriceWorker { - public worker: Worker; + private worker: Worker; private coingeckoClient: CoingeckoClient; + private relayHashInfoRepository; + private depositRepository; + private historicPriceRepository; constructor( private redis: Redis, @@ -36,8 +41,55 @@ export class PriceWorker { private logger: winston.Logger, ) { this.coingeckoClient = new CoingeckoClient(); + this.relayHashInfoRepository = this.postgres.getRepository( + entities.RelayHashInfo, + ); + this.depositRepository = this.postgres.getRepository( + entities.V3FundsDeposited, + ); + this.historicPriceRepository = this.postgres.getRepository( + entities.HistoricPrice, + ); this.setWorker(); } + private async getPrice( + address: string, + chainId: number, + time: Date, + quoteCurrency = "usd", + ): Promise { + const priceTime = yesterday(time); + const tokenInfo = findTokenByAddress(address, chainId); + const baseCurrency = tokenInfo.coingeckoId; + + const cachedPrice = await this.historicPriceRepository.findOne({ + where: { + date: priceTime, + baseCurrency, + quoteCurrency, + }, + }); + // we have this price at this time in the db + if (cachedPrice) return Number(cachedPrice.price); + + const fetchedPrice = await this.coingeckoClient.getHistoricDailyPrice( + priceTime.getTime(), + baseCurrency, + ); + const price = fetchedPrice.market_data?.current_price[quoteCurrency]; + assert( + price, + `Unable to fetch price for ${quoteCurrency} in ${baseCurrency} at ${priceTime}`, + ); + await this.historicPriceRepository.insert({ + date: priceTime, + baseCurrency, + quoteCurrency, + price: price.toString(), + }); + + return Number(price); + } public setWorker() { this.worker = new Worker( @@ -57,117 +109,98 @@ export class PriceWorker { { connection: this.redis, concurrency: 10 }, ); } - private async run(params: PriceMessage) { - const { depositId, originChainId } = params; - const relayHashInfoRepository = this.postgres.getRepository( - entities.RelayHashInfo, + // price is assumed to be a float, amount is assumed in wei and decimals is the conversion for that amount + // this outputs the difference between input and output normalized to the price which is typically usd + private calculateBridgeFee( + inputToken: { amount: string; price: number; decimals: number }, + outputToken: { amount: string; price: number; decimals: number }, + ): bigint { + const inputAmountBigInt = BigInt(inputToken.amount); + const outputAmountBigInt = BigInt(outputToken.amount); + + const inputPriceBigInt = BigInt( + Math.round(inputToken.price * Math.pow(10, inputToken.decimals)), ); - const depositRepository = this.postgres.getRepository( - entities.V3FundsDeposited, - ); - const historicPriceRepository = this.postgres.getRepository( - entities.HistoricPrice, + const outputPriceBigInt = BigInt( + Math.round(outputToken.price * Math.pow(10, outputToken.decimals)), ); - const relayHashInfo = await relayHashInfoRepository.findOne({ + const normalizedInputAmount = + (inputAmountBigInt * inputPriceBigInt) / + BigInt(Math.pow(10, inputToken.decimals)); + const normalizedOutputAmount = + (outputAmountBigInt * outputPriceBigInt) / + BigInt(Math.pow(10, outputToken.decimals)); + + return normalizedInputAmount - normalizedOutputAmount; + } + private async run(params: PriceMessage) { + const { depositId, originChainId } = params; + + const relayHashInfo = await this.relayHashInfoRepository.findOne({ where: { depositId, originChainId }, }); - const deposit = await depositRepository.findOne({ + const deposit = await this.depositRepository.findOne({ where: { depositId, originChainId }, }); + // This is catastrophic, we dont want worker retrying if we cannot find this data if (!relayHashInfo || !deposit) { this.logger.error({ at: "PriceWorker", - message: "Relay hash info not found", + message: + "Failed to retrieve relay hash information or deposit record from the database.", ...params, }); return; } + // if blockTimestamp doesnt exist, maybe we keep retrying till it does const blockTime = relayHashInfo?.depositEvent?.blockTimestamp; if (!blockTime) { + const errorMessage = "Deposit block time not found for relay hash info."; this.logger.error({ at: "PriceWorker", - message: "Deposit block time not found for relay hash info", + message: errorMessage, ...params, }); - return; + throw new Error(errorMessage); } - const priceTime = yesterday(blockTime); - const quoteCurrency = "usd"; - const baseTokenInfo = findTokenByAddress( - relayHashInfo.fillEvent.outputToken, - relayHashInfo.destinationChainId, + const inputTokenAddress = relayHashInfo.fillEvent.inputToken; + const outputTokenAddress = relayHashInfo.fillEvent.outputToken; + const destinationChainId = relayHashInfo.destinationChainId; + const inputTokenInfo = findTokenByAddress(inputTokenAddress, originChainId); + const outputTokenInfo = findTokenByAddress( + outputTokenAddress, + destinationChainId, ); - const baseCurrency = baseTokenInfo?.coingeckoId; - let price: undefined | number; - if (!baseCurrency) { - this.logger.error({ - at: "PriceWorker", - message: "Unable to find base currency to quote", - ...params, - outputToken: relayHashInfo.fillEvent.outputToken, - destinationChainId: relayHashInfo.destinationChainId, - }); - return; - } - const existingPrice = await historicPriceRepository.findOne({ - where: { - date: priceTime, - baseCurrency, - quoteCurrency, - }, - }); - // fetch price if one hasnt been saved - if (!existingPrice) { - try { - const historicPriceData = - await this.coingeckoClient.getHistoricDailyPrice( - priceTime.getTime(), - baseCurrency, - ); - price = historicPriceData.market_data?.current_price[quoteCurrency]; - // wasnt able to get a price - if (price === undefined) { - this.logger.error( - `Unable to find ${quoteCurrency} for ${baseCurrency} at time ${priceTime}`, - ); - return; - } - await historicPriceRepository.insert({ - date: priceTime, - baseCurrency, - quoteCurrency, - price: price.toString(), - }); - this.logger.info({ - at: "PriceWorker", - ...params, - message: `Fetched and inserted historic price for ${baseCurrency} on ${priceTime}`, - }); - } catch (error) { - this.logger.error({ - at: "PriceWorker", - ...params, - message: `Failed to fetch or insert historic price for ${baseCurrency} on ${priceTime}`, - error: (error as Error).message, - }); - } - } else { - price = Number(existingPrice.price); - } + const inputTokenPrice = await this.getPrice( + inputTokenAddress, + originChainId, + blockTime, + ); + const outputTokenPrice = await this.getPrice( + outputTokenAddress, + destinationChainId, + blockTime, + ); - if (price === undefined) { - this.logger.error({ - at: "PriceWorker", - ...params, - message: "Failed to get a valid price from cache or coingecko", - }); - return; - } - // TODO: Compute bridge fee + const inputToken = { + amount: relayHashInfo.fillEvent.inputAmount, + price: inputTokenPrice, + decimals: inputTokenInfo.decimals, + }; + + const outputToken = { + amount: relayHashInfo.fillEvent.outputAmount, + price: outputTokenPrice, + decimals: outputTokenInfo.decimals, + }; + + const bridgeFee = this.calculateBridgeFee(inputToken, outputToken); + relayHashInfo.bridgeFeeUsd = bridgeFee.toString(); + await this.relayHashInfoRepository.save(relayHashInfo); } public async close() { return this.worker.close(); diff --git a/packages/indexer/src/parseEnv.ts b/packages/indexer/src/parseEnv.ts index 376948f..efb2dd0 100644 --- a/packages/indexer/src/parseEnv.ts +++ b/packages/indexer/src/parseEnv.ts @@ -7,7 +7,6 @@ import { WebhookTypes, parseWebhookClientsFromString, } from "@repo/webhooks"; -import { CoingeckoSymbol } from "./utils/coingeckoClient"; export type Config = { redisConfig: RedisConfig; @@ -19,7 +18,6 @@ export type Config = { enableBundleIncludedEventsService: boolean; enableBundleBuilder: boolean; webhookConfig: WebhooksConfig; - coingeckoSymbols: CoingeckoSymbol[]; }; export type RedisConfig = { host: string; @@ -196,9 +194,6 @@ export function envToConfig(env: Env): Config { enabledWebhookRequestWorkers: true, clients: parseWebhookClientsFromString(env.WEBHOOK_CLIENTS ?? "[]"), }; - const coingeckoSymbols = parseArray(env.COINGECKO_SYMBOLS).map((symbol) => - CoingeckoSymbol.create(symbol), - ); return { redisConfig, postgresConfig, @@ -209,6 +204,5 @@ export function envToConfig(env: Env): Config { enableBundleIncludedEventsService, enableBundleBuilder, webhookConfig, - coingeckoSymbols, }; } diff --git a/packages/indexer/src/utils/coingeckoClient.ts b/packages/indexer/src/utils/coingeckoClient.ts index 7233b02..29a1837 100644 --- a/packages/indexer/src/utils/coingeckoClient.ts +++ b/packages/indexer/src/utils/coingeckoClient.ts @@ -1,33 +1,6 @@ import * as s from "superstruct"; import { DateTime } from "luxon"; -// tken from scraper and adapted from https://github.com/across-protocol/constants/blob/master/src/tokens.ts -export const CoingeckoSymbol = s.enums([ - "across-protocol", - "aleph-zero", - "arbitrum", - "badger-dao", - "balancer", - "boba-network", - "bridged-usd-coin-base", - "dai", - "ethereum", - "gho", - "havven", - "lisk", - "matic-network", - "optimism", - "pooltogether", - "tether", - "uma", - "usd-coin", - "usd-coin-ethereum-bridged", - "usdb", - "weth", - "wmatic", - "wrapped-bitcoin", -]); -export type CoingeckoSymbol = s.Infer; export const CGHistoricPriceBase = s.object({ id: s.string(), symbol: s.string(), @@ -38,8 +11,6 @@ export const CGHistoricPriceBase = s.object({ }), ), }); -export const isCoingeckoSymbol = (symbol: string) => - s.is(symbol, CoingeckoSymbol); export type CGHistoricPriceBase = s.Infer; @@ -57,7 +28,7 @@ export class CoingeckoClient { // rounds timestamp to the current day public async getHistoricDailyPrice( timestamp: number, - symbol: CoingeckoSymbol, + symbol: string, ): Promise { const cgFormattedDate = DateTime.fromMillis(timestamp).toFormat("dd-LL-yyyy"); diff --git a/packages/indexer/src/utils/currencyUtils.ts b/packages/indexer/src/utils/currencyUtils.ts index d6bdeee..f755c06 100644 --- a/packages/indexer/src/utils/currencyUtils.ts +++ b/packages/indexer/src/utils/currencyUtils.ts @@ -1,5 +1,4 @@ import * as constants from "@across-protocol/constants"; -import { isCoingeckoSymbol, CoingeckoSymbol } from "./coingeckoClient"; export type TokenInfo = { name: string; @@ -14,7 +13,7 @@ export type Token = { decimals: number; address: string; chainId: number; - coingeckoId: CoingeckoSymbol; + coingeckoId: string; }; // mapping the token constants to something easier to search export const tokenSymbolsMap = [ @@ -23,7 +22,6 @@ export const tokenSymbolsMap = [ // map to just a flat list export const tokensList = tokenSymbolsMap.reduce((result, token) => { Object.entries(token.addresses).forEach(([chainId, address]) => { - if (!isCoingeckoSymbol(token.coingeckoId)) return result; result.push({ name: token.name, symbol: token.symbol, @@ -37,13 +35,16 @@ export const tokensList = tokenSymbolsMap.reduce((result, token) => { }, [] as Token[]); // given an address and chain id, return the token data -export function findTokenByAddress( - address: string, - chainId: number, -): Token | undefined { - return tokensList.find( +export function findTokenByAddress(address: string, chainId: number): Token { + const result = tokensList.find( (token) => token.address.toLowerCase() === address.toLowerCase() && token.chainId === chainId, ); + if (!result) { + throw new Error( + `Token info not found for address: ${address} on chainId: ${chainId}`, + ); + } + return result; }