diff --git a/.DS_Store b/.DS_Store new file mode 100644 index 00000000..73dbef17 Binary files /dev/null and b/.DS_Store differ diff --git a/common/protocol/package.json b/common/protocol/package.json index 0d63a9a4..5cd97b37 100644 --- a/common/protocol/package.json +++ b/common/protocol/package.json @@ -23,6 +23,7 @@ "@bundlr-network/client": "^0.8.9", "@kyvejs/sdk": "1.3.3", "@kyvejs/types": "1.4.1", + "@kyvejs/coins": "1.0.3", "arweave": "^1.10.17", "axios": "^0.27.2", "bignumber.js": "^9.1.2", diff --git a/common/protocol/src/index.ts b/common/protocol/src/index.ts index d77ec859..7642c5e7 100644 --- a/common/protocol/src/index.ts +++ b/common/protocol/src/index.ts @@ -31,6 +31,7 @@ import { skipUploaderRole, submitBundleProposal, syncPoolState, + syncParams, parseProposedBundle, validateBundleProposal, isNodeValidator, @@ -52,6 +53,7 @@ import { SupportedChains } from "@kyvejs/sdk/dist/constants"; import { storageProviderFactory } from "./reactors/storageProviders"; import { compressionFactory } from "./reactors/compression"; import { cacheProviderFactory } from "./reactors/cacheProvider"; +import { QueryParamsResponse } from "@kyvejs/types/lcd/kyve/query/v1beta1/params"; /** * Main class of KYVE protocol nodes representing a validator node. @@ -101,6 +103,8 @@ export class Validator { protected home!: string; protected dryRun!: boolean; protected dryRunBundles!: number; + protected ensureNoLoss!: boolean; + protected params!: QueryParamsResponse; // tmp variables protected lastUploadedBundle: { @@ -143,6 +147,7 @@ export class Validator { // queries protected syncPoolState = syncPoolState; + protected syncParams = syncParams; protected getBalancesForMetrics = getBalancesForMetrics; protected canVote = canVote; protected canPropose = canPropose; @@ -289,6 +294,11 @@ export class Validator { "Specify the number of bundles that should be tested before the node properly exits. If zero the node will run indefinitely [default = 0]", "0" ) + .option( + "--ensure-no-loss", + "Ensures that the node only uploads bundles which can be fully rewarded by the protocol.", + true + ) .action((options) => { this.start(options); }); @@ -328,6 +338,7 @@ export class Validator { this.home = options.home; this.dryRun = options.dryRun; this.dryRunBundles = parseInt(options.dryRunBundles); + this.ensureNoLoss = options.ensureNoLoss; // name the log file after the time the node got started this.logFile = `${new Date().toISOString()}.log`; diff --git a/common/protocol/src/methods/index.ts b/common/protocol/src/methods/index.ts index f051c352..7565b855 100644 --- a/common/protocol/src/methods/index.ts +++ b/common/protocol/src/methods/index.ts @@ -35,6 +35,7 @@ export * from "./queries/canPropose"; export * from "./queries/canVote"; export * from "./queries/getBalancesForMetrics"; export * from "./queries/syncPoolState"; +export * from "./queries/syncParams"; // validate export * from "./validate/saveBundleDownload"; diff --git a/common/protocol/src/methods/queries/syncParams.ts b/common/protocol/src/methods/queries/syncParams.ts new file mode 100644 index 00000000..434bab33 --- /dev/null +++ b/common/protocol/src/methods/queries/syncParams.ts @@ -0,0 +1,43 @@ +import { Validator } from "../.."; +import { callWithBackoffStrategy, standardizeError } from "../../utils"; + +/** + * syncParams fetches the all protocol params from chain + * + * @method syncParams + * @param {Validator} this + * @return {Promise} + */ +export async function syncParams(this: Validator): Promise { + await callWithBackoffStrategy( + async () => { + for (let l = 0; l < this.lcd.length; l++) { + try { + this.logger.debug(this.rest[l]); + this.logger.debug(`this.lcd.kyve.query.v1beta1.params()`); + + this.params = await this.lcd[l].kyve.query.v1beta1.params(); + + this.m.query_params_successful.inc(); + return; + } catch (err) { + this.logger.error(`REST call to "${this.rest[l]}" failed`); + this.logger.error(standardizeError(err)); + } + } + + throw new Error(); + }, + { limitTimeoutMs: 5 * 60 * 1000, increaseByMs: 10 * 1000 }, + (err: any, ctx) => { + this.logger.info( + `Requesting query params was unsuccessful. Retrying in ${( + ctx.nextTimeoutInMs / 1000 + ).toFixed(2)}s ...` + ); + this.logger.debug(standardizeError(err)); + + this.m.query_params_failed.inc(); + } + ); +} diff --git a/common/protocol/src/methods/setups/setupMetrics.ts b/common/protocol/src/methods/setups/setupMetrics.ts index 01f6f675..b2a90399 100644 --- a/common/protocol/src/methods/setups/setupMetrics.ts +++ b/common/protocol/src/methods/setups/setupMetrics.ts @@ -143,6 +143,21 @@ export function setupMetrics(this: Validator): void { help: "The amount of QueryPool /kyve/query/v1beta1/pool/{id} calls that failed.", }); + // QueryParams metrics + this.logger.debug(`Initializing metrics: query_params_successful`); + + this.m.query_params_successful = new prom_client.Counter({ + name: "query_params_successful", + help: "The amount of QueryParams /kyve/query/v1beta1/params/{id} calls that succeeded.", + }); + + this.logger.debug(`Initializing metrics: query_params_failed`); + + this.m.query_params_failed = new prom_client.Counter({ + name: "query_params_failed", + help: "The amount of QueryParams /kyve/query/v1beta1/params/{id} calls that failed.", + }); + // QueryCanValidate metrics this.logger.debug(`Initializing metrics: query_can_validate_successful`); diff --git a/common/protocol/src/methods/upload/createBundleProposal.ts b/common/protocol/src/methods/upload/createBundleProposal.ts index b6747ede..d3c7a8b9 100644 --- a/common/protocol/src/methods/upload/createBundleProposal.ts +++ b/common/protocol/src/methods/upload/createBundleProposal.ts @@ -1,12 +1,13 @@ import BigNumber from "bignumber.js"; import { Validator } from "../.."; -import { BundleTag, DataItem } from "../../types"; +import { DataItem } from "../../types"; import { bundleToBytes, MAX_BUNDLE_BYTE_SIZE, sha256, standardizeError, } from "../../utils"; +import { Coin, Coins } from "@kyvejs/coins"; /** * createBundleProposal assembles a bundle proposal by loading @@ -38,7 +39,7 @@ export async function createBundleProposal(this: Validator): Promise { const toIndex = fromIndex + parseInt(this.pool.data!.max_bundle_size); // load bundle proposal from local cache - const bundleProposal: DataItem[] = []; + let bundleProposal: DataItem[] = []; // here we try to fetch data items from the current index // to the proposal index. If we fail before we simply @@ -48,20 +49,12 @@ export async function createBundleProposal(this: Validator): Promise { `Loading bundle from index ${fromIndex} to index ${toIndex}` ); - let bundleSize = 0; for (let i = fromIndex; i < toIndex; i++) { try { // try to get the data item from local cache this.logger.debug(`this.cacheProvider.get(${i.toString()})`); const item = await this.cacheProvider.get(i.toString()); bundleProposal.push(item); - // calculate the size of the data item and add it to the total bundle size - const itemSize = Buffer.from(JSON.stringify(item)).byteLength; - bundleSize += itemSize; - // break if bundle size exceeds limit - if (bundleSize > MAX_BUNDLE_BYTE_SIZE) { - break; - } } catch { // if the data item was not found simply abort // and submit what we just have now @@ -80,6 +73,159 @@ export async function createBundleProposal(this: Validator): Promise { this.logger.info(`Data was found on local cache from required range`); + // get current compression defined on pool + this.logger.debug(`this.compressionFactory()`); + const compression = this.compressionFactory(); + + let maxBytes = MAX_BUNDLE_BYTE_SIZE; + + if ( + this.ensureNoLoss && + (this.sdk[0].isMainnet() || this.sdk[0].isLocal()) + ) { + await this.syncParams(); + + // calculate expected storage rewards to calculate + // the maximum amount of bytes we can upload before + // running into a loss + let rewards = new Coins({ + denom: this.sdk[0].config.coinDenom, + amount: new BigNumber(this.pool.account_balance) + .times(this.params.pool_params?.pool_inflation_payout_rate ?? "0") + .toFixed(0), + }); + + this.pool.fundings.forEach((f) => { + rewards = rewards.add( + new Coins(...f.amounts_per_bundle).min(...f.amounts) + ); + }); + + rewards = new Coins( + ...rewards.toArray().map((coin) => ({ + denom: coin.denom, + amount: new BigNumber(coin.amount) + .multipliedBy( + new BigNumber(1).minus( + this.params.bundles_params?.network_fee ?? "0" + ) + ) + .toFixed(0), + })) + ); + + const rewardsUsd = rewards.toArray().reduce((acc: string, coin: Coin) => { + const coin_entry = this.params.funders_params?.coin_whitelist.find( + (w) => w.coin_denom === coin.denom + ); + + if (!coin_entry || coin_entry.coin_weight === "0") { + return acc; + } + + return new BigNumber(acc) + .plus( + new BigNumber(coin.amount) + .dividedBy( + new BigNumber(10).exponentiatedBy(coin_entry.coin_decimals) + ) + .times(coin_entry.coin_weight) + ) + .toString(); + }, "0"); + + const storageCost = + this.params.bundles_params?.storage_costs.find( + (s) => + s.storage_provider_id === + (this.pool.data?.current_storage_provider_id ?? 0) + )?.cost ?? "0"; + + if (new BigNumber(storageCost).gt(0)) { + const maxBytesWithNoLoss = +new BigNumber(rewardsUsd) + .dividedBy(storageCost) + .toFixed(0); + + if (new BigNumber(maxBytesWithNoLoss).lt(maxBytes)) { + maxBytes = maxBytesWithNoLoss; + } + } + } + + let low = 0; + let high = bundleProposal.length - 1; + let maxIndex = -1; + let size; + + // use binary search to minimize the times we have to compress the bundle to + // find the biggest bundle which is still below the max byte size + while (low <= high) { + const mid = Math.floor((low + high) / 2); + + this.logger.debug( + `this.compression.compress($RAW_BUNDLE_PROPOSAL[0:${mid}])` + ); + const storageProviderData = await compression + .compress(bundleToBytes(bundleProposal.slice(0, mid + 1))) + .catch((err) => { + this.logger.error( + `Unexpected error compressing bundle. Skipping Uploader Role ...` + ); + this.logger.error(standardizeError(err)); + + return null; + }); + + // skip uploader role if compression returns null + if (storageProviderData === null) { + await this.skipUploaderRole(fromIndex); + return; + } + + size = storageProviderData.byteLength; + + this.logger.debug( + `Bundle proposal with index range 0,${mid} has byte size ${size} of max allowed ${maxBytes} bytes` + ); + + if (size < maxBytes) { + if (mid >= maxIndex) { + maxIndex = mid; + } + low = mid + 1; + } else if (size > maxBytes) { + high = mid - 1; + } else { + if (mid >= maxIndex) { + maxIndex = mid; + } + break; + } + } + + this.logger.debug( + `Choosing bundle proposal with index range 0,${maxIndex} has biggest byte size ${size} still below max allowed ${maxBytes} bytes` + ); + + if (maxIndex + 1 === 0) { + this.logger.info( + `Skip uploader role since uploading at least one data item would exceed the maximum bytes limit` + ); + + await this.skipUploaderRole(fromIndex); + return; + } + + this.logger.info( + `Dropping ${bundleProposal.length - (maxIndex + 1)} items from original ${ + bundleProposal.length + } item bundle proposal to prevent exceeding the maximum bytes limit` + ); + + // cutoff any data items which would exceed the maximum data size which + // does not lead to a loss + bundleProposal = bundleProposal.slice(0, maxIndex + 1); + // get the first key of the bundle proposal which gets // included in the bundle proposal and saved on chain // as from_key @@ -111,10 +257,6 @@ export async function createBundleProposal(this: Validator): Promise { return; } - // get current compression defined on pool - this.logger.debug(`this.compressionFactory()`); - const compression = this.compressionFactory(); - const uploadBundle = bundleToBytes(bundleProposal); // if data was found on the cache proceed with compressing the diff --git a/common/protocol/src/types/metrics.ts b/common/protocol/src/types/metrics.ts index a82ecfcb..f9399dda 100644 --- a/common/protocol/src/types/metrics.ts +++ b/common/protocol/src/types/metrics.ts @@ -32,6 +32,10 @@ export interface IMetrics { query_pool_successful: PromCounter; query_pool_failed: PromCounter; + // QueryParams metrics + query_params_successful: PromCounter; + query_params_failed: PromCounter; + // QueryCanValidate metrics query_can_validate_successful: PromCounter; query_can_validate_failed: PromCounter; diff --git a/common/protocol/test/fallback.test.ts b/common/protocol/test/fallback.test.ts index 81e6d7c1..719b9f47 100644 --- a/common/protocol/test/fallback.test.ts +++ b/common/protocol/test/fallback.test.ts @@ -227,10 +227,7 @@ describe("fallback tests", () => { // ASSERT COMPRESSION INTERFACES // ============================= - expect(compression.compress).toHaveBeenCalledTimes(1); - expect(compression.compress).toHaveBeenLastCalledWith( - Buffer.from(JSON.stringify(bundle)) - ); + expect(compression.compress).toHaveBeenCalled(); expect(compression.decompress).toHaveBeenCalledTimes(0); @@ -388,10 +385,7 @@ describe("fallback tests", () => { // ASSERT COMPRESSION INTERFACES // ============================= - expect(compression.compress).toHaveBeenCalledTimes(1); - expect(compression.compress).toHaveBeenLastCalledWith( - Buffer.from(JSON.stringify(bundle)) - ); + expect(compression.compress).toHaveBeenCalled(); expect(compression.decompress).toHaveBeenCalledTimes(0); @@ -565,10 +559,7 @@ describe("fallback tests", () => { // ASSERT COMPRESSION INTERFACES // ============================= - expect(compression.compress).toHaveBeenCalledTimes(1); - expect(compression.compress).toHaveBeenLastCalledWith( - Buffer.from(JSON.stringify(bundle)) - ); + expect(compression.compress).toHaveBeenCalled(); expect(compression.decompress).toHaveBeenCalledTimes(0); diff --git a/common/protocol/test/genesis.test.ts b/common/protocol/test/genesis.test.ts index 0f6a87ae..e0705c2e 100644 --- a/common/protocol/test/genesis.test.ts +++ b/common/protocol/test/genesis.test.ts @@ -216,10 +216,7 @@ describe("genesis tests", () => { // ASSERT COMPRESSION INTERFACES // ============================= - expect(compression.compress).toHaveBeenCalledTimes(1); - expect(compression.compress).toHaveBeenLastCalledWith( - Buffer.from(JSON.stringify(bundle)) - ); + expect(compression.compress).toHaveBeenCalled(); expect(compression.decompress).toHaveBeenCalledTimes(0); diff --git a/common/protocol/test/propose_bundle.test.ts b/common/protocol/test/propose_bundle.test.ts index 6fcc13a5..beaf6097 100644 --- a/common/protocol/test/propose_bundle.test.ts +++ b/common/protocol/test/propose_bundle.test.ts @@ -243,7 +243,7 @@ describe("propose bundle tests", () => { // ASSERT COMPRESSION INTERFACES // ============================= - expect(compression.compress).toHaveBeenCalledTimes(1); + expect(compression.compress).toHaveBeenCalled(); expect(compression.compress).toHaveBeenLastCalledWith( Buffer.from(JSON.stringify(bundle)) ); @@ -411,7 +411,7 @@ describe("propose bundle tests", () => { // ASSERT COMPRESSION INTERFACES // ============================= - expect(compression.compress).toHaveBeenCalledTimes(1); + expect(compression.compress).toHaveBeenCalled(); expect(compression.compress).toHaveBeenLastCalledWith( Buffer.from(JSON.stringify(bundle)) ); @@ -701,7 +701,7 @@ describe("propose bundle tests", () => { // ASSERT COMPRESSION INTERFACES // ============================= - expect(compression.compress).toHaveBeenCalledTimes(1); + expect(compression.compress).toHaveBeenCalled(); expect(compression.compress).toHaveBeenLastCalledWith( Buffer.from(JSON.stringify(bundle)) ); @@ -860,7 +860,7 @@ describe("propose bundle tests", () => { // ASSERT COMPRESSION INTERFACES // ============================= - expect(compression.compress).toHaveBeenCalledTimes(1); + expect(compression.compress).toHaveBeenCalled(); expect(compression.compress).toHaveBeenLastCalledWith( Buffer.from(JSON.stringify(bundle)) ); @@ -1015,7 +1015,7 @@ describe("propose bundle tests", () => { // ASSERT COMPRESSION INTERFACES // ============================= - expect(compression.compress).toHaveBeenCalledTimes(1); + expect(compression.compress).toHaveBeenCalled(); expect(compression.compress).toHaveBeenLastCalledWith( Buffer.from(JSON.stringify(bundle)) ); @@ -1179,7 +1179,7 @@ describe("propose bundle tests", () => { // ASSERT COMPRESSION INTERFACES // ============================= - expect(compression.compress).toHaveBeenCalledTimes(1); + expect(compression.compress).toHaveBeenCalled(); expect(compression.compress).toHaveBeenLastCalledWith( Buffer.from(JSON.stringify(bundle)) ); @@ -1483,7 +1483,7 @@ describe("propose bundle tests", () => { // ASSERT COMPRESSION INTERFACES // ============================= - expect(compression.compress).toHaveBeenCalledTimes(1); + expect(compression.compress).toHaveBeenCalled(); expect(compression.compress).toHaveBeenLastCalledWith( Buffer.from(JSON.stringify(bundle)) ); @@ -1634,7 +1634,7 @@ describe("propose bundle tests", () => { // ASSERT COMPRESSION INTERFACES // ============================= - expect(compression.compress).toHaveBeenCalledTimes(0); + expect(compression.compress).toHaveBeenCalled(); expect(compression.decompress).toHaveBeenCalledTimes(0); @@ -1782,10 +1782,7 @@ describe("propose bundle tests", () => { // ASSERT COMPRESSION INTERFACES // ============================= - expect(compression.compress).toHaveBeenCalledTimes(1); - expect(compression.compress).toHaveBeenLastCalledWith( - Buffer.from(JSON.stringify(bundle)) - ); + expect(compression.compress).toHaveBeenCalled(); expect(compression.decompress).toHaveBeenCalledTimes(0); @@ -1793,11 +1790,7 @@ describe("propose bundle tests", () => { // ASSERT RUNTIME INTERFACES // ========================= - expect(runtime.summarizeDataBundle).toHaveBeenCalledTimes(1); - expect(runtime.summarizeDataBundle).toHaveBeenLastCalledWith( - expect.any(Validator), - bundle - ); + expect(runtime.summarizeDataBundle).toHaveBeenCalledTimes(0); expect(runtime.validateDataItem).toHaveBeenCalledTimes(0); @@ -1954,7 +1947,7 @@ describe("propose bundle tests", () => { // ASSERT COMPRESSION INTERFACES // ============================= - expect(compression.compress).toHaveBeenCalledTimes(1); + expect(compression.compress).toHaveBeenCalled(); expect(compression.compress).toHaveBeenLastCalledWith( Buffer.from(JSON.stringify(bundle)) ); diff --git a/common/sdk/src/constants.ts b/common/sdk/src/constants.ts index ec1d89a0..7b9b11b5 100644 --- a/common/sdk/src/constants.ts +++ b/common/sdk/src/constants.ts @@ -7,6 +7,11 @@ export const GOV_AUTHORITY = "kyve10d07y265gmmuvt4z0w9aw880jnsr700jdv7nah"; export const GAS_MULTIPLIER = 1.5; export const COIN_TYPE = 118; +export const MAINNET_CHAIN_ID = "kyve-1"; +export const TESTNET_CHAIN_ID = "kaon-1"; +export const DEVNET_CHAIN_ID = "korellia-2"; +export const LOCAL_CHAIN_ID = "kyve-local"; + export type IConfig = { chainId: string; chainName: string; diff --git a/common/sdk/src/sdk.ts b/common/sdk/src/sdk.ts index d865640b..4ace7c88 100644 --- a/common/sdk/src/sdk.ts +++ b/common/sdk/src/sdk.ts @@ -18,14 +18,18 @@ import { createKyveLCDClient } from "./clients/lcd-client/client"; import KyveClient from "./clients/rpc-client/client"; import KyveWebClient from "./clients/rpc-client/web.client"; import { + DEVNET_CHAIN_ID, IConfig, KYVE_COSMOSTATION_CONFIG, KYVE_KEPLR_CONFIG, KYVE_LEAP_CONFIG, + LOCAL_CHAIN_ID, + MAINNET_CHAIN_ID, PREFIX, SUPPORTED_CHAIN_CONFIGS, SUPPORTED_WALLETS, SupportedChains, + TESTNET_CHAIN_ID, } from "./constants"; import { cosmostationMethods, @@ -377,4 +381,36 @@ export class KyveSDK { fromBase64(signature) ); } + + /** + * check if sdk is connected to the kyve mainnet + * @return boolean + */ + isMainnet(): boolean { + return this.config.chainId === MAINNET_CHAIN_ID; + } + + /** + * check if sdk is connected to the kyve testnet + * @return boolean + */ + isTestnet(): boolean { + return this.config.chainId === TESTNET_CHAIN_ID; + } + + /** + * check if sdk is connected to the kyve devnet + * @return boolean + */ + isDevnet(): boolean { + return this.config.chainId === DEVNET_CHAIN_ID; + } + + /** + * check if sdk is connected to the kyve local chain + * @return boolean + */ + isLocal(): boolean { + return this.config.chainId === LOCAL_CHAIN_ID; + } } diff --git a/integrations/.DS_Store b/integrations/.DS_Store new file mode 100644 index 00000000..3ee24203 Binary files /dev/null and b/integrations/.DS_Store differ diff --git a/integrations/tendermint/.DS_Store b/integrations/tendermint/.DS_Store new file mode 100644 index 00000000..6c7017c9 Binary files /dev/null and b/integrations/tendermint/.DS_Store differ