chore: ensure no loss (#162)
* chore: finished implementation of ensure-no-loss
troykessler authored Jan 15, 2025
1 parent dde47da commit 83a46d6
Showing 15 changed files with 287 additions and 48 deletions.
Binary file added .DS_Store
Binary file not shown.
1 change: 1 addition & 0 deletions common/protocol/package.json
@@ -23,6 +23,7 @@
"@bundlr-network/client": "^0.8.9",
"@kyvejs/sdk": "1.3.3",
"@kyvejs/types": "1.4.1",
"@kyvejs/coins": "1.0.3",
"arweave": "^1.10.17",
"axios": "^0.27.2",
"bignumber.js": "^9.1.2",
11 changes: 11 additions & 0 deletions common/protocol/src/index.ts
@@ -31,6 +31,7 @@ import {
skipUploaderRole,
submitBundleProposal,
syncPoolState,
syncParams,
parseProposedBundle,
validateBundleProposal,
isNodeValidator,
@@ -52,6 +53,7 @@ import { SupportedChains } from "@kyvejs/sdk/dist/constants";
import { storageProviderFactory } from "./reactors/storageProviders";
import { compressionFactory } from "./reactors/compression";
import { cacheProviderFactory } from "./reactors/cacheProvider";
import { QueryParamsResponse } from "@kyvejs/types/lcd/kyve/query/v1beta1/params";

/**
* Main class of KYVE protocol nodes representing a validator node.
@@ -101,6 +103,8 @@ export class Validator {
protected home!: string;
protected dryRun!: boolean;
protected dryRunBundles!: number;
protected ensureNoLoss!: boolean;
protected params!: QueryParamsResponse;

// tmp variables
protected lastUploadedBundle: {
@@ -143,6 +147,7 @@

// queries
protected syncPoolState = syncPoolState;
protected syncParams = syncParams;
protected getBalancesForMetrics = getBalancesForMetrics;
protected canVote = canVote;
protected canPropose = canPropose;
@@ -289,6 +294,11 @@
"Specify the number of bundles that should be tested before the node properly exits. If zero the node will run indefinitely [default = 0]",
"0"
)
.option(
"--ensure-no-loss",
"Ensures that the node only uploads bundles which can be fully rewarded by the protocol.",
true
)
.action((options) => {
this.start(options);
});
@@ -328,6 +338,7 @@
this.home = options.home;
this.dryRun = options.dryRun;
this.dryRunBundles = parseInt(options.dryRunBundles);
this.ensureNoLoss = options.ensureNoLoss;

// name the log file after the time the node got started
this.logFile = `${new Date().toISOString()}.log`;
1 change: 1 addition & 0 deletions common/protocol/src/methods/index.ts
@@ -35,6 +35,7 @@ export * from "./queries/canPropose";
export * from "./queries/canVote";
export * from "./queries/getBalancesForMetrics";
export * from "./queries/syncPoolState";
export * from "./queries/syncParams";

// validate
export * from "./validate/saveBundleDownload";
43 changes: 43 additions & 0 deletions common/protocol/src/methods/queries/syncParams.ts
@@ -0,0 +1,43 @@
import { Validator } from "../..";
import { callWithBackoffStrategy, standardizeError } from "../../utils";

/**
* syncParams fetches all protocol params from the chain
*
* @method syncParams
* @param {Validator} this
* @return {Promise<void>}
*/
export async function syncParams(this: Validator): Promise<void> {
await callWithBackoffStrategy(
async () => {
for (let l = 0; l < this.lcd.length; l++) {
try {
this.logger.debug(this.rest[l]);
this.logger.debug(`this.lcd.kyve.query.v1beta1.params()`);

this.params = await this.lcd[l].kyve.query.v1beta1.params();

this.m.query_params_successful.inc();
return;
} catch (err) {
this.logger.error(`REST call to "${this.rest[l]}" failed`);
this.logger.error(standardizeError(err));
}
}

throw new Error();
},
{ limitTimeoutMs: 5 * 60 * 1000, increaseByMs: 10 * 1000 },
(err: any, ctx) => {
this.logger.info(
`Requesting query params was unsuccessful. Retrying in ${(
ctx.nextTimeoutInMs / 1000
).toFixed(2)}s ...`
);
this.logger.debug(standardizeError(err));

this.m.query_params_failed.inc();
}
);
}
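
For orientation, the params fields that the rest of this commit reads can be summarized as below. This is a hand-written simplification (not part of this commit) inferred from the call sites in createBundleProposal.ts further down; the actual generated QueryParamsResponse type from @kyvejs/types may differ in shape and field types:

// Simplified, assumed shape of the params used by the ensure-no-loss check.
// Field names mirror their usage in this commit; exact generated types may differ.
interface SimplifiedParams {
  pool_params?: {
    pool_inflation_payout_rate: string; // share of the pool account balance paid out per bundle
  };
  bundles_params?: {
    network_fee: string; // fraction of the rewards taken as network fee
    storage_costs: {
      storage_provider_id: number; // matched against pool.data.current_storage_provider_id
      cost: string;                // storage cost per byte, in the same USD unit as coin_weight
    }[];
  };
  funders_params?: {
    coin_whitelist: {
      coin_denom: string;    // on-chain denom of the funding coin
      coin_decimals: number; // used to convert base units to whole coins
      coin_weight: string;   // USD weight per whole coin; "0" excludes the coin from rewards
    }[];
  };
}
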
15 changes: 15 additions & 0 deletions common/protocol/src/methods/setups/setupMetrics.ts
@@ -143,6 +143,21 @@ export function setupMetrics(this: Validator): void {
help: "The amount of QueryPool /kyve/query/v1beta1/pool/{id} calls that failed.",
});

// QueryParams metrics
this.logger.debug(`Initializing metrics: query_params_successful`);

this.m.query_params_successful = new prom_client.Counter({
name: "query_params_successful",
help: "The amount of QueryParams /kyve/query/v1beta1/params/{id} calls that succeeded.",
});

this.logger.debug(`Initializing metrics: query_params_failed`);

this.m.query_params_failed = new prom_client.Counter({
name: "query_params_failed",
help: "The amount of QueryParams /kyve/query/v1beta1/params/{id} calls that failed.",
});

// QueryCanValidate metrics
this.logger.debug(`Initializing metrics: query_can_validate_successful`);

170 changes: 156 additions & 14 deletions common/protocol/src/methods/upload/createBundleProposal.ts
@@ -1,12 +1,13 @@
import BigNumber from "bignumber.js";
import { Validator } from "../..";
import { BundleTag, DataItem } from "../../types";
import { DataItem } from "../../types";
import {
bundleToBytes,
MAX_BUNDLE_BYTE_SIZE,
sha256,
standardizeError,
} from "../../utils";
import { Coin, Coins } from "@kyvejs/coins";

/**
* createBundleProposal assembles a bundle proposal by loading
@@ -38,7 +39,7 @@ export async function createBundleProposal(this: Validator): Promise<void> {
const toIndex = fromIndex + parseInt(this.pool.data!.max_bundle_size);

// load bundle proposal from local cache
const bundleProposal: DataItem[] = [];
let bundleProposal: DataItem[] = [];

// here we try to fetch data items from the current index
// to the proposal index. If we fail before we simply
@@ -48,20 +49,12 @@
`Loading bundle from index ${fromIndex} to index ${toIndex}`
);

let bundleSize = 0;
for (let i = fromIndex; i < toIndex; i++) {
try {
// try to get the data item from local cache
this.logger.debug(`this.cacheProvider.get(${i.toString()})`);
const item = await this.cacheProvider.get(i.toString());
bundleProposal.push(item);
// calculate the size of the data item and add it to the total bundle size
const itemSize = Buffer.from(JSON.stringify(item)).byteLength;
bundleSize += itemSize;
// break if bundle size exceeds limit
if (bundleSize > MAX_BUNDLE_BYTE_SIZE) {
break;
}
} catch {
// if the data item was not found simply abort
// and submit what we just have now
@@ -80,6 +73,159 @@

this.logger.info(`Data was found on local cache from required range`);

// get current compression defined on pool
this.logger.debug(`this.compressionFactory()`);
const compression = this.compressionFactory();

let maxBytes = MAX_BUNDLE_BYTE_SIZE;

if (
this.ensureNoLoss &&
(this.sdk[0].isMainnet() || this.sdk[0].isLocal())
) {
await this.syncParams();

// calculate expected storage rewards to calculate
// the maximum amount of bytes we can upload before
// running into a loss
let rewards = new Coins({
denom: this.sdk[0].config.coinDenom,
amount: new BigNumber(this.pool.account_balance)
.times(this.params.pool_params?.pool_inflation_payout_rate ?? "0")
.toFixed(0),
});

this.pool.fundings.forEach((f) => {
rewards = rewards.add(
new Coins(...f.amounts_per_bundle).min(...f.amounts)
);
});

rewards = new Coins(
...rewards.toArray().map((coin) => ({
denom: coin.denom,
amount: new BigNumber(coin.amount)
.multipliedBy(
new BigNumber(1).minus(
this.params.bundles_params?.network_fee ?? "0"
)
)
.toFixed(0),
}))
);

const rewardsUsd = rewards.toArray().reduce((acc: string, coin: Coin) => {
const coin_entry = this.params.funders_params?.coin_whitelist.find(
(w) => w.coin_denom === coin.denom
);

if (!coin_entry || coin_entry.coin_weight === "0") {
return acc;
}

return new BigNumber(acc)
.plus(
new BigNumber(coin.amount)
.dividedBy(
new BigNumber(10).exponentiatedBy(coin_entry.coin_decimals)
)
.times(coin_entry.coin_weight)
)
.toString();
}, "0");

const storageCost =
this.params.bundles_params?.storage_costs.find(
(s) =>
s.storage_provider_id ===
(this.pool.data?.current_storage_provider_id ?? 0)
)?.cost ?? "0";

if (new BigNumber(storageCost).gt(0)) {
const maxBytesWithNoLoss = +new BigNumber(rewardsUsd)
.dividedBy(storageCost)
.toFixed(0);

if (new BigNumber(maxBytesWithNoLoss).lt(maxBytes)) {
maxBytes = maxBytesWithNoLoss;
}
}
}

let low = 0;
let high = bundleProposal.length - 1;
let maxIndex = -1;
let size;

// use binary search to minimize the times we have to compress the bundle to
// find the biggest bundle which is still below the max byte size
while (low <= high) {
const mid = Math.floor((low + high) / 2);

this.logger.debug(
`this.compression.compress($RAW_BUNDLE_PROPOSAL[0:${mid}])`
);
const storageProviderData = await compression
.compress(bundleToBytes(bundleProposal.slice(0, mid + 1)))
.catch((err) => {
this.logger.error(
`Unexpected error compressing bundle. Skipping Uploader Role ...`
);
this.logger.error(standardizeError(err));

return null;
});

// skip uploader role if compression returns null
if (storageProviderData === null) {
await this.skipUploaderRole(fromIndex);
return;
}

size = storageProviderData.byteLength;

this.logger.debug(
`Bundle proposal with index range 0,${mid} has byte size ${size} of max allowed ${maxBytes} bytes`
);

if (size < maxBytes) {
if (mid >= maxIndex) {
maxIndex = mid;
}
low = mid + 1;
} else if (size > maxBytes) {
high = mid - 1;
} else {
if (mid >= maxIndex) {
maxIndex = mid;
}
break;
}
}

this.logger.debug(
`Choosing bundle proposal with index range 0,${maxIndex} has biggest byte size ${size} still below max allowed ${maxBytes} bytes`
);

if (maxIndex + 1 === 0) {
this.logger.info(
`Skip uploader role since uploading at least one data item would exceed the maximum bytes limit`
);

await this.skipUploaderRole(fromIndex);
return;
}

this.logger.info(
`Dropping ${bundleProposal.length - (maxIndex + 1)} items from original ${
bundleProposal.length
} item bundle proposal to prevent exceeding the maximum bytes limit`
);

// cutoff any data items which would exceed the maximum data size which
// does not lead to a loss
bundleProposal = bundleProposal.slice(0, maxIndex + 1);

// get the first key of the bundle proposal which gets
// included in the bundle proposal and saved on chain
// as from_key
@@ -111,10 +257,6 @@ export async function createBundleProposal(this: Validator): Promise<void> {
return;
}

// get current compression defined on pool
this.logger.debug(`this.compressionFactory()`);
const compression = this.compressionFactory();

const uploadBundle = bundleToBytes(bundleProposal);

// if data was found on the cache proceed with compressing the
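
To make the new byte cap concrete, here is a minimal single-denom walk-through of the ensure-no-loss arithmetic from the hunk above. It is not part of this commit: all numbers, including the MAX_BUNDLE_BYTE_SIZE value, are invented for illustration, and the actual implementation works on Coins across all whitelisted denoms and reads every value from the on-chain params:

// Hypothetical single-denom walk-through of the no-loss byte cap (illustrative values only).

// 1. Expected per-bundle rewards: pool inflation payout plus funder payouts.
const inflationPayout = 100_000_000 * 0.05;             // account_balance * pool_inflation_payout_rate
const fundingPayout = Math.min(2_000_000, 50_000_000);  // min(amount_per_bundle, remaining funding)
const grossRewards = inflationPayout + fundingPayout;   // 7_000_000 base units

// 2. Deduct the network fee.
const netRewards = grossRewards * (1 - 0.01);           // network_fee = 0.01 -> 6_930_000 base units

// 3. Convert base units to USD using the coin whitelist entry.
const rewardsUsd = (netRewards / 10 ** 6) * 0.02;       // coin_decimals = 6, coin_weight = 0.02 -> 0.1386 USD

// 4. Divide by the storage provider's cost per byte (same USD unit).
const storageCost = 0.000000015;
const maxBytesWithNoLoss = Math.round(rewardsUsd / storageCost); // ~9_240_000 bytes (real code rounds via BigNumber)

// 5. The effective limit is the smaller of this cap and the protocol-wide byte limit;
//    the binary search above then picks the largest compressed bundle that stays below it.
const MAX_BUNDLE_BYTE_SIZE = 100 * 1024 * 1024;         // assumed value, for illustration only
const maxBytes = Math.min(maxBytesWithNoLoss, MAX_BUNDLE_BYTE_SIZE);

console.log({ rewardsUsd, maxBytesWithNoLoss, maxBytes });
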
4 changes: 4 additions & 0 deletions common/protocol/src/types/metrics.ts
@@ -32,6 +32,10 @@ export interface IMetrics {
query_pool_successful: PromCounter;
query_pool_failed: PromCounter;

// QueryParams metrics
query_params_successful: PromCounter;
query_params_failed: PromCounter;

// QueryCanValidate metrics
query_can_validate_successful: PromCounter;
query_can_validate_failed: PromCounter;
15 changes: 3 additions & 12 deletions common/protocol/test/fallback.test.ts
@@ -227,10 +227,7 @@ describe("fallback tests", () => {
// ASSERT COMPRESSION INTERFACES
// =============================

expect(compression.compress).toHaveBeenCalledTimes(1);
expect(compression.compress).toHaveBeenLastCalledWith(
Buffer.from(JSON.stringify(bundle))
);
expect(compression.compress).toHaveBeenCalled();

expect(compression.decompress).toHaveBeenCalledTimes(0);

@@ -388,10 +385,7 @@
// ASSERT COMPRESSION INTERFACES
// =============================

expect(compression.compress).toHaveBeenCalledTimes(1);
expect(compression.compress).toHaveBeenLastCalledWith(
Buffer.from(JSON.stringify(bundle))
);
expect(compression.compress).toHaveBeenCalled();

expect(compression.decompress).toHaveBeenCalledTimes(0);

@@ -565,10 +559,7 @@
// ASSERT COMPRESSION INTERFACES
// =============================

expect(compression.compress).toHaveBeenCalledTimes(1);
expect(compression.compress).toHaveBeenLastCalledWith(
Buffer.from(JSON.stringify(bundle))
);
expect(compression.compress).toHaveBeenCalled();

expect(compression.decompress).toHaveBeenCalledTimes(0);
