From f32e5f33447d27f6425fa2754f64bc1b5acfe1ef Mon Sep 17 00:00:00 2001 From: Theo Butler Date: Wed, 14 Feb 2024 12:12:44 -0500 Subject: [PATCH] refactor: migrate to new ISA implementation (#588) This switches to a rewrite of indexer-selection that has been stripped down to concern itself only with picking candidates out of a valid set. The new implementation removes a number of long-standing inefficiencies and will make it easier to iterate on changes to the ISA. See https://github.com/edgeandnode/candidate-selection Integration testing and production performance comparisons will happen before this is released. --- Cargo.lock | 50 ++- Cargo.toml | 7 +- gateway-framework/Cargo.toml | 2 +- gateway-framework/src/budgets.rs | 2 +- graph-gateway/Cargo.toml | 2 +- graph-gateway/src/block_constraints.rs | 6 +- graph-gateway/src/client_query.rs | 383 +++++++++++------- graph-gateway/src/client_query/context.rs | 7 +- graph-gateway/src/indexer_client.rs | 3 +- graph-gateway/src/indexing_performance.rs | 122 ++++++ graph-gateway/src/lib.rs | 1 + graph-gateway/src/main.rs | 95 +---- indexer-selection/Cargo.toml | 23 -- indexer-selection/bin/sim.rs | 97 ----- indexer-selection/src/actor.rs | 86 ---- indexer-selection/src/decay.rs | 221 ---------- indexer-selection/src/economic_security.rs | 70 ---- indexer-selection/src/indexing.rs | 130 ------ indexer-selection/src/lib.rs | 317 --------------- indexer-selection/src/performance.rs | 44 -- indexer-selection/src/reliability.rs | 43 -- indexer-selection/src/score.rs | 300 -------------- indexer-selection/src/simulation.rs | 143 ------- indexer-selection/src/test.rs | 342 ---------------- indexer-selection/src/tokens.rs | 12 - indexer-selection/src/utility.rs | 84 ---- .../tools/contrived-characteristics.csv | 11 - .../tools/indexer-characteristics.py | 63 --- .../tools/reliability-response.py | 34 -- indexer-selection/tools/simulation.py | 40 -- 30 files changed, 410 insertions(+), 2330 deletions(-) create mode 100644 graph-gateway/src/indexing_performance.rs delete mode 100644 indexer-selection/Cargo.toml delete mode 100644 indexer-selection/bin/sim.rs delete mode 100644 indexer-selection/src/actor.rs delete mode 100644 indexer-selection/src/decay.rs delete mode 100644 indexer-selection/src/economic_security.rs delete mode 100644 indexer-selection/src/indexing.rs delete mode 100644 indexer-selection/src/lib.rs delete mode 100644 indexer-selection/src/performance.rs delete mode 100644 indexer-selection/src/reliability.rs delete mode 100644 indexer-selection/src/score.rs delete mode 100644 indexer-selection/src/simulation.rs delete mode 100644 indexer-selection/src/test.rs delete mode 100644 indexer-selection/src/tokens.rs delete mode 100644 indexer-selection/src/utility.rs delete mode 100644 indexer-selection/tools/contrived-characteristics.csv delete mode 100644 indexer-selection/tools/indexer-characteristics.py delete mode 100644 indexer-selection/tools/reliability-response.py delete mode 100644 indexer-selection/tools/simulation.py diff --git a/Cargo.lock b/Cargo.lock index 93f53b50..82b5e93e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -667,6 +667,30 @@ dependencies = [ "serde", ] +[[package]] +name = "candidate-selection" +version = "0.1.0" +source = "git+ssh://git@github.com/edgeandnode/candidate-selection.git?rev=0f711bc#0f711bc250bb484ae86c5673683520f6524fbd7f" +dependencies = [ + "arrayvec 0.7.4", + "ordered-float", + "permutation", + "proptest", + "rand", +] + +[[package]] +name = "candidate-selection" +version = "0.1.0" +source = 
"git+ssh://git@github.com/edgeandnode/candidate-selection.git?rev=c0b5efa#c0b5efa6a6657c201cc71ee4108991e37bc1f581" +dependencies = [ + "arrayvec 0.7.4", + "ordered-float", + "permutation", + "proptest", + "rand", +] + [[package]] name = "cargo-platform" version = "0.1.6" @@ -1871,13 +1895,13 @@ dependencies = [ "anyhow", "assert_matches", "axum", + "candidate-selection 0.1.0 (git+ssh://git@github.com/edgeandnode/candidate-selection.git?rev=c0b5efa)", "ethers", "eventuals", "gateway-common", "graphql-http 0.2.1", "headers", "hex", - "indexer-selection", "itertools 0.12.1", "lazy_static", "maxminddb", @@ -2450,21 +2474,13 @@ checksum = "ce23b50ad8242c51a442f3ff322d56b02f08852c77e4c0b4d3fd684abc89c683" [[package]] name = "indexer-selection" -version = "0.0.1" +version = "0.1.0" +source = "git+ssh://git@github.com/edgeandnode/candidate-selection.git?rev=0f711bc#0f711bc250bb484ae86c5673683520f6524fbd7f" dependencies = [ - "alloy-primitives", - "anyhow", - "arrayvec 0.7.4", - "gateway-common", - "num-traits", - "ordered-float", - "permutation", + "candidate-selection 0.1.0 (git+ssh://git@github.com/edgeandnode/candidate-selection.git?rev=0f711bc)", "rand", - "rand_distr", "thegraph", - "tokio", "toolshed", - "tracing", ] [[package]] @@ -3671,16 +3687,6 @@ dependencies = [ "getrandom", ] -[[package]] -name = "rand_distr" -version = "0.4.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "32cb0b9bc82b0a0876c2dd994a7e7a2683d3e7390ca40e6886785ef0c7e3ee31" -dependencies = [ - "num-traits", - "rand", -] - [[package]] name = "rand_xorshift" version = "0.3.0" diff --git a/Cargo.toml b/Cargo.toml index 64590b59..9f35499a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,10 +1,5 @@ [workspace] -members = [ - "graph-gateway", - "indexer-selection", - "gateway-common", - "gateway-framework", -] +members = ["graph-gateway", "gateway-common", "gateway-framework"] resolver = "2" [profile.release] diff --git a/gateway-framework/Cargo.toml b/gateway-framework/Cargo.toml index fb4bb1fa..3828d0d3 100644 --- a/gateway-framework/Cargo.toml +++ b/gateway-framework/Cargo.toml @@ -9,12 +9,12 @@ alloy-sol-types = "0.6.2" anyhow.workspace = true axum.workspace = true ethers = "2.0.13" +candidate-selection = { git = "ssh://git@github.com/edgeandnode/candidate-selection.git", rev = "c0b5efa" } eventuals = "0.6.7" gateway-common = { path = "../gateway-common" } graphql-http.workspace = true headers = "0.3.9" hex.workspace = true -indexer-selection = { path = "../indexer-selection" } itertools = "0.12.1" lazy_static = "1.4.0" maxminddb = "0.24" diff --git a/gateway-framework/src/budgets.rs b/gateway-framework/src/budgets.rs index 48cfabf7..83d230ed 100644 --- a/gateway-framework/src/budgets.rs +++ b/gateway-framework/src/budgets.rs @@ -1,7 +1,7 @@ use std::time::Duration; +use candidate_selection::criteria::decay::DecayBuffer; use eventuals::{Eventual, EventualWriter}; -use indexer_selection::decay::DecayBuffer; use ordered_float::NotNan; use tokio::time::interval; use tokio::{select, spawn, sync::mpsc}; diff --git a/graph-gateway/Cargo.toml b/graph-gateway/Cargo.toml index cbc0b45e..3f678051 100644 --- a/graph-gateway/Cargo.toml +++ b/graph-gateway/Cargo.toml @@ -20,7 +20,7 @@ graphql.workspace = true graphql-http.workspace = true headers = "0.3.9" hickory-resolver = "0.24.0" -indexer-selection = { path = "../indexer-selection" } +indexer-selection = { git = "ssh://git@github.com/edgeandnode/candidate-selection.git", rev = "0f711bc" } indoc = "2.0.4" itertools = "0.12.1" num-traits = "0.2.18" 
diff --git a/graph-gateway/src/block_constraints.rs b/graph-gateway/src/block_constraints.rs index ceec4604..a5a6fa0e 100644 --- a/graph-gateway/src/block_constraints.rs +++ b/graph-gateway/src/block_constraints.rs @@ -13,9 +13,7 @@ use thegraph::types::BlockPointer; use gateway_framework::{block_constraints::BlockConstraint, errors::Error}; -pub fn block_constraints<'c>( - context: &'c Context<'c, String>, -) -> Result, Error> { +pub fn block_constraints(context: &Context) -> Result, Error> { let mut constraints = BTreeSet::new(); let vars = &context.variables; // ba6c90f1-3baf-45be-ac1c-f60733404436 @@ -64,7 +62,7 @@ pub fn block_constraints<'c>( } pub fn make_query_deterministic( - mut ctx: Context<'_, String>, + mut ctx: Context, resolved: &BTreeSet, latest: &BlockPointer, ) -> Result { diff --git a/graph-gateway/src/client_query.rs b/graph-gateway/src/client_query.rs index 3f9e3eef..879666ea 100644 --- a/graph-gateway/src/client_query.rs +++ b/graph-gateway/src/client_query.rs @@ -1,8 +1,8 @@ -use std::collections::{BTreeMap, BTreeSet}; +use std::collections::{BTreeMap, BTreeSet, HashMap}; use std::sync::Arc; use std::time::{Duration, Instant}; -use alloy_primitives::{Address, BlockNumber, U256}; +use alloy_primitives::{Address, BlockNumber}; use alloy_sol_types::Eip712Domain; use anyhow::anyhow; use axum::extract::OriginalUri; @@ -15,22 +15,24 @@ use axum::{ use cost_model::{Context as AgoraContext, CostModel}; use eventuals::Ptr; use futures::future::join_all; +use gateway_framework::budgets::USD; +use gateway_framework::chains::UnresolvedBlock; +use gateway_framework::errors::UnavailableReason; use headers::ContentType; +use indexer_selection::{ArrayVec, Candidate, ExpectedPerformance, Normalized, Performance}; use num_traits::cast::ToPrimitive as _; use ordered_float::NotNan; use prost::bytes::Buf; use rand::{rngs::SmallRng, SeedableRng as _}; use serde::Deserialize; use serde_json::value::RawValue; -use thegraph::types::{attestation, BlockPointer, DeploymentId, UDecimal18}; +use thegraph::types::{attestation, BlockPointer, DeploymentId}; use tokio::sync::mpsc; -use toolshed::buffer_queue::QueueWriter; +use toolshed::url::Url; use tracing::Instrument; use gateway_common::utils::http_ext::HttpBuilderExt; use gateway_common::{types::Indexing, utils::timestamp::unix_timestamp}; -use gateway_framework::budgets::USD; -use gateway_framework::chains::UnresolvedBlock; use gateway_framework::{ block_constraints::BlockConstraint, chains::BlockCache, @@ -38,13 +40,11 @@ use gateway_framework::{ metrics::{with_metric, METRICS}, scalar::ScalarReceipt, }; -use indexer_selection::{ - actor::Update, BlockRequirements, Candidate, IndexerError as SelectionError, - IndexerErrorObservation, InputError, Selection, UtilityParameters, SELECTION_LIMIT, -}; use crate::block_constraints::{block_constraints, make_query_deterministic}; use crate::indexer_client::{check_block_error, IndexerClient, ResponsePayload}; +use crate::indexers::indexing; +use crate::indexing_performance::IndexingPerformance; use crate::reports::{self, serialize_attestation, KafkaClient}; use crate::topology::{Deployment, GraphNetwork, Subgraph}; use crate::unattestable_errors::{miscategorized_attestable, miscategorized_unattestable}; @@ -68,12 +68,29 @@ pub mod query_tracing; pub mod rate_limiter; pub mod require_auth; +const SELECTION_LIMIT: usize = 3; + #[derive(Debug, Deserialize)] pub struct QueryBody { pub query: String, pub variables: Option>, } +#[derive(Clone, Debug)] +pub struct Selection { + pub indexing: Indexing, + pub 
url: Url, + pub fee: u128, + pub blocks_behind: u64, +} + +struct BlockRequirements { + /// required block range + range: Option<(BlockNumber, BlockNumber)>, + /// does the query benefit from using the latest block (contains NumberGTE or Unconstrained) + latest: bool, +} + #[allow(clippy::too_many_arguments)] pub async fn handle_query( State(ctx): State, @@ -246,7 +263,7 @@ async fn handle_client_query_inner( let mut indexer_errors: BTreeMap = Default::default(); - let mut candidates: BTreeSet = deployments + let mut available_indexers: BTreeSet = deployments .iter() .flat_map(move |deployment| { let id = deployment.id; @@ -256,21 +273,20 @@ async fn handle_client_query_inner( }) }) .collect(); - if candidates.is_empty() { - return Err(Error::NoIndexers); - } - let blocklist = ctx .indexings_blocklist .value_immediate() .unwrap_or_default(); - candidates.retain(|candidate| { + available_indexers.retain(|candidate| { if blocklist.contains(candidate) || ctx.bad_indexers.contains(&candidate.indexer) { indexer_errors.insert(candidate.indexer, IndexerError::Unavailable(NoStatus)); return false; } true }); + if available_indexers.is_empty() { + return Err(Error::NoIndexers); + } let variables = payload .variables @@ -313,78 +329,53 @@ async fn handle_client_query_inner( .map(|(index, deployment)| (deployment.id, index.try_into().unwrap_or(u8::MAX))) .collect(); - let candidates: Vec = candidates - .into_iter() - .filter_map(|indexing| { - let cost_model = ctx - .indexing_statuses - .value_immediate() - .and_then(|m| m.get(&indexing).and_then(|s| s.cost_model.clone())); - let fee = match indexer_fee(&cost_model, &mut context) { - // Clamp indexer fee to the budget. - Ok(fee) => fee.min(budget), - Err(err) => { - indexer_errors.insert(indexing.indexer, err); - return None; - } - }; - Some(Candidate { - indexing, - fee, - versions_behind: *versions_behind.get(&indexing.deployment).unwrap_or(&0), - }) - }) - .collect(); - - let block_constraints = block_constraints(&context).unwrap_or_default(); - let mut resolved_blocks = join_all( - block_constraints - .iter() - .filter_map(|constraint| constraint.clone().into_unresolved()) - .map(|unresolved| block_cache.fetch_block(unresolved)), + let mut resolved_blocks = Default::default(); + let block_requirements = resolve_block_requirements( + block_cache, + &mut resolved_blocks, + &context, + manifest_min_block, ) - .await - .into_iter() - .collect::, UnresolvedBlock>>() - .map_err(Error::BlockNotFound)?; - let min_block = resolved_blocks.iter().map(|b| b.number).min(); - let max_block = resolved_blocks.iter().map(|b| b.number).max(); - let block_requirements = BlockRequirements { - range: min_block.map(|min| (min, max_block.unwrap())), - has_latest: block_constraints.iter().any(|c| match c { - BlockConstraint::Unconstrained | BlockConstraint::NumberGTE(_) => true, - BlockConstraint::Hash(_) | BlockConstraint::Number(_) => false, - }), - }; + .await?; - // Reject queries for blocks before the minimum start block in the manifest, but only if the - // constraint is for an exact block. For example, we always want to allow `block_gte: 0`. 
- let request_contains_invalid_blocks = resolved_blocks - .iter() - .filter(|b| { - block_constraints.iter().any(|c| match c { - BlockConstraint::Unconstrained | BlockConstraint::NumberGTE(_) => false, - BlockConstraint::Hash(hash) => hash == &b.hash, - BlockConstraint::Number(number) => number == &b.number, - }) - }) - .any(|b| b.number < manifest_min_block); - if request_contains_invalid_blocks { - return Err(Error::BadQuery(anyhow!( - "requested block {}, before minimum `startBlock` of manifest {}", - min_block.unwrap_or_default(), - manifest_min_block, - ))); + let chain_head = block_cache + .chain_head + .value_immediate() + .ok_or(Error::BlockNotFound(UnresolvedBlock::WithNumber(0)))? + .number; + let blocks_per_minute = block_cache.blocks_per_minute.value_immediate().unwrap_or(0); + tracing::debug!(chain_head, blocks_per_minute); + + let indexing_statuses = ctx.indexing_statuses.value_immediate().unwrap(); + let mut candidates = Vec::new(); + { + let perf = ctx.indexing_perf.latest(); + for indexing in available_indexers { + match prepare_candidate( + &ctx.network, + &indexing_statuses, + &perf, + &versions_behind, + &mut context, + &block_requirements, + chain_head, + blocks_per_minute, + grt_per_usd, + budget, + indexing, + ) { + Ok(candidate) => candidates.push(candidate), + Err(indexer_error) => { + indexer_errors.insert(indexing.indexer, indexer_error); + } + } + } + } + if candidates.is_empty() { + return Err(Error::NoIndexers); } let blocks_per_minute = block_cache.blocks_per_minute.value_immediate().unwrap_or(0); - let mut utility_params = UtilityParameters { - budget: indexer_selection::tokens::GRT(UDecimal18::from_raw_u256(U256::from(budget))), - requirements: block_requirements, - // 170cbcf3-db7f-404a-be13-2022d9142677 - latest_block: 0, - block_rate_hz: blocks_per_minute as f64 / 60.0, - }; let mut rng = SmallRng::from_entropy(); let mut total_indexer_fees: u128 = 0; @@ -392,43 +383,41 @@ async fn handle_client_query_inner( for retry in 0..ctx.indexer_selection_retry_limit { // Make sure our observations are up-to-date if retrying. if retry > 0 { - let _ = ctx.observations.flush().await; + ctx.indexing_perf.flush().await; + + // Update candidate performance. 
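+            // `latest()` takes a read lock on one half of the double-buffered map kept
+            // by the IndexingPerformance actor, so each retry re-reads the expected
+            // latency and success rate that the flush above just made visible.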
+ let perf = ctx.indexing_perf.latest(); + for candidate in &mut candidates { + if let Some(updated) = perf.get(&Indexing { + indexer: candidate.indexer, + deployment: candidate.deployment, + }) { + candidate.perf = updated.expected_performance(); + } + } } - let latest_block = block_cache - .chain_head - .value_immediate() - .ok_or(Error::BlockNotFound(UnresolvedBlock::WithNumber(0)))?; - tracing::debug!(?latest_block); - // 170cbcf3-db7f-404a-be13-2022d9142677 - utility_params.latest_block = latest_block.number; - let selection_timer = METRICS.indexer_selection_duration.start_timer(); - let (mut selections, selection_errors) = ctx - .isa_state - .latest() - .select_indexers(&mut rng, &utility_params, &candidates) - .map_err(|err| match err { - InputError::MissingNetworkParams => { - Error::Internal(anyhow!("missing network params")) - } - })?; + let selections: ArrayVec<&Candidate, SELECTION_LIMIT> = + indexer_selection::select(&mut rng, &candidates); drop(selection_timer); + let mut selections: Vec = selections + .iter() + .map(|c| Selection { + indexing: Indexing { + indexer: c.indexer, + deployment: c.deployment, + }, + url: c.url.clone(), + fee: (c.fee.as_f64() * budget as f64) as u128, + blocks_behind: (c.seconds_behind as u64 / 60) * blocks_per_minute, + }) + .collect(); let selections_len = selections.len(); - for (error, indexers) in selection_errors.0 { - for indexer in indexers { - let error = match error { - SelectionError::NoStatus => IndexerError::Unavailable(NoStatus), - SelectionError::NoStake => IndexerError::Unavailable(NoStake), - SelectionError::MissingRequiredBlock => IndexerError::Unavailable(MissingBlock), - SelectionError::FeeTooHigh => IndexerError::Internal("fee too high"), - SelectionError::NaN => IndexerError::Internal("NaN"), - }; - indexer_errors.insert(*indexer, error); - } - } if selections.is_empty() { + // Candidates that would never be selected should be filtered out for improved errors. 
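+            // (Rejecting them up front in `prepare_candidate` reports a per-indexer
+            // reason, rather than falling through to this catch-all retry.)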
+ tracing::error!("no candidates selected"); continue; } @@ -453,14 +442,14 @@ async fn handle_client_query_inner( indexer_client: ctx.indexer_client.clone(), kafka_client: ctx.kafka_client, attestation_domain: ctx.attestation_domain, - observations: ctx.observations.clone(), + indexing_perf: ctx.indexing_perf.clone(), deployment, response_time: Duration::default(), }; let latest_query_block = pick_latest_query_block( block_cache, - latest_block.number.saturating_sub(selection.blocks_behind), + chain_head.saturating_sub(selection.blocks_behind), blocks_per_minute, ) .await @@ -483,9 +472,9 @@ async fn handle_client_query_inner( let optimistic_query = optimistic_query( context.clone(), &mut resolved_blocks, - &latest_block, + chain_head, &latest_query_block, - &utility_params.requirements, + &block_requirements, block_cache, &selection, ) @@ -514,7 +503,6 @@ async fn handle_client_query_inner( indexer_query_context, selection.clone(), deterministic_query, - latest_query_block.number, optimistic_query, ) .await; @@ -554,12 +542,133 @@ async fn handle_client_query_inner( ))) } +async fn resolve_block_requirements( + block_cache: &BlockCache, + resolved_blocks: &mut BTreeSet, + context: &AgoraContext<'_, String>, + manifest_min_block: BlockNumber, +) -> Result { + let constraints = block_constraints(context).unwrap_or_default(); + resolved_blocks.append( + &mut join_all( + constraints + .iter() + .filter_map(|constraint| constraint.clone().into_unresolved()) + .map(|unresolved| block_cache.fetch_block(unresolved)), + ) + .await + .into_iter() + .collect::, UnresolvedBlock>>() + .map_err(Error::BlockNotFound)?, + ); + let min_block = resolved_blocks.iter().map(|b| b.number).min(); + let max_block = resolved_blocks.iter().map(|b| b.number).max(); + + // Reject queries for blocks before the minimum start block in the manifest, but only if the + // constraint is for an exact block. For example, we always want to allow `block_gte: 0`. 
+ let request_contains_invalid_blocks = resolved_blocks + .iter() + .filter(|b| { + constraints.iter().any(|c| match c { + BlockConstraint::Unconstrained | BlockConstraint::NumberGTE(_) => false, + BlockConstraint::Hash(hash) => hash == &b.hash, + BlockConstraint::Number(number) => number == &b.number, + }) + }) + .any(|b| b.number < manifest_min_block); + if request_contains_invalid_blocks { + return Err(Error::BadQuery(anyhow!( + "requested block {}, before minimum `startBlock` of manifest {}", + min_block.unwrap_or_default(), + manifest_min_block, + ))); + } + + Ok(BlockRequirements { + range: min_block.map(|min| (min, max_block.unwrap())), + latest: constraints.iter().any(|c| match c { + BlockConstraint::Unconstrained | BlockConstraint::NumberGTE(_) => true, + BlockConstraint::Hash(_) | BlockConstraint::Number(_) => false, + }), + }) +} + +#[allow(clippy::too_many_arguments)] +fn prepare_candidate( + network: &GraphNetwork, + statuses: &HashMap<Indexing, indexing::IndexingStatus>, + perf: &HashMap<Indexing, Performance>, + versions_behind: &BTreeMap<DeploymentId, u8>, + context: &mut AgoraContext<'_, String>, + block_requirements: &BlockRequirements, + chain_head: BlockNumber, + blocks_per_minute: u64, + grt_per_usd: NotNan<f64>, + budget: u128, + indexing: Indexing, +) -> Result<Candidate, IndexerError> { + let info = network + .indexing(&indexing) + .ok_or(IndexerError::Unavailable(UnavailableReason::NoStatus))?; + let status = statuses + .get(&indexing) + .ok_or(IndexerError::Unavailable(UnavailableReason::NoStatus))?; + let perf = perf + .get(&indexing) + .map(|p| p.expected_performance()) + .unwrap_or(ExpectedPerformance { + success_rate: Normalized::ONE, + latency_success_ms: 0, + latency_failure_ms: 0, + }); + + let fee = Normalized::new(indexer_fee(&status.cost_model, context)? as f64 / budget as f64) + .unwrap_or(Normalized::ONE); + + let reported_block_range = status.min_block.unwrap_or(0)..=status.block; + if block_requirements + .range + .map(|(start, end)| { + !(reported_block_range.contains(&start) && reported_block_range.contains(&end)) + }) + .unwrap_or(false) + { + return Err(IndexerError::Unavailable(UnavailableReason::MissingBlock)); + } + + let seconds_behind = if block_requirements.latest { + // convert blocks behind into seconds behind: blocks * 60 / (blocks per minute) + chain_head + .saturating_sub(status.block) + .saturating_mul(60) + .checked_div(blocks_per_minute) + .unwrap_or(0) as u32 + } else { + 0 + }; + + let slashable_usd = ((info.staked_tokens as f64 * 1e-18) / *grt_per_usd) as u64; + if slashable_usd == 0 { + return Err(IndexerError::Unavailable(NoStake)); + } + + Ok(Candidate { + indexer: indexing.indexer, + deployment: indexing.deployment, + url: info.url.clone(), + perf, + fee, + seconds_behind, + slashable_usd, + subgraph_versions_behind: *versions_behind.get(&indexing.deployment).unwrap_or(&0), + zero_allocation: info.allocated_tokens == 0, + }) +} + #[derive(Clone)] struct IndexerQueryContext { pub indexer_client: IndexerClient, pub kafka_client: &'static KafkaClient, pub attestation_domain: &'static Eip712Domain, - pub observations: QueueWriter<Update>, + pub indexing_perf: IndexingPerformance, pub deployment: Arc<Deployment>, pub response_time: Duration, } @@ -568,7 +677,6 @@ async fn handle_indexer_query( mut ctx: IndexerQueryContext, selection: Selection, deterministic_query: String, - latest_query_block: u64, optimistic_query: Option<String>, ) -> Result<ResponsePayload, IndexerError> { let indexing = selection.indexing; @@ -595,9 +703,10 @@ }; METRICS.indexer_query.check(&[&deployment], &result); + let latency_ms = ctx.response_time.as_millis() as u32; tracing::info!( target: reports::INDEXER_QUERY_TARGET, - response_time_ms = ctx.response_time.as_millis() as u32, + response_time_ms = 
latency_ms, status_message = match &result { Ok(_) => "200 OK".to_string(), Err(err) => format!("{err:?}"), @@ -605,20 +714,8 @@ async fn handle_indexer_query( status_code = reports::indexer_attempt_status_code(&result), ); - let observation = match &result { - Ok(_) => Ok(()), - Err(IndexerError::Timeout) => Err(IndexerErrorObservation::Timeout), - Err(IndexerError::Unavailable(MissingBlock)) => { - Err(IndexerErrorObservation::IndexingBehind { latest_query_block }) - } - Err(_) => Err(IndexerErrorObservation::Other), - }; - - let _ = ctx.observations.write(Update::QueryObservation { - indexing, - duration: ctx.response_time, - result: observation, - }); + ctx.indexing_perf + .feedback(indexing, result.is_ok(), latency_ms); result } @@ -676,9 +773,6 @@ async fn handle_indexer_query_inner( for error in &indexer_errors { if miscategorized_unattestable(error) { - let _ = ctx.observations.write(Update::Penalty { - indexing: selection.indexing, - }); let message = if !indexer_errors.is_empty() { format!("unattestable response: {}", indexer_errors.join("; ")) } else { @@ -749,7 +843,11 @@ async fn pick_latest_query_block( max_block: BlockNumber, blocks_per_minute: u64, ) -> Result { - for n in [max_block, max_block - 1, max_block - blocks_per_minute] { + for n in [ + max_block, + max_block.saturating_sub(1), + max_block.saturating_sub(blocks_per_minute), + ] { if let Ok(block) = cache.fetch_block(UnresolvedBlock::WithNumber(n)).await { return Ok(block); } @@ -764,13 +862,13 @@ async fn pick_latest_query_block( async fn optimistic_query( ctx: AgoraContext<'_, String>, resolved: &mut BTreeSet, - latest: &BlockPointer, + chain_head: BlockNumber, latest_query_block: &BlockPointer, requirements: &BlockRequirements, block_cache: &BlockCache, selection: &Selection, ) -> Option { - if !requirements.has_latest { + if !requirements.latest { return None; } let blocks_per_minute = block_cache.blocks_per_minute.value_immediate()?; @@ -778,8 +876,7 @@ async fn optimistic_query( return None; } let min_block = requirements.range.map(|(min, _)| min).unwrap_or(0); - let optimistic_block_number = latest - .number + let optimistic_block_number = chain_head .saturating_sub(blocks_per_minute / 30) .max(min_block); if optimistic_block_number <= latest_query_block.number { diff --git a/graph-gateway/src/client_query/context.rs b/graph-gateway/src/client_query/context.rs index db1bfbea..838a6693 100644 --- a/graph-gateway/src/client_query/context.rs +++ b/graph-gateway/src/client_query/context.rs @@ -4,17 +4,15 @@ use alloy_primitives::Address; use alloy_sol_types::Eip712Domain; use eventuals::{Eventual, Ptr}; use ordered_float::NotNan; -use toolshed::buffer_queue::QueueWriter; -use toolshed::double_buffer::DoubleBufferReader; use toolshed::url::Url; use gateway_common::types::Indexing; use gateway_framework::budgets::Budgeter; use gateway_framework::chains::BlockCache; -use indexer_selection::actor::Update; use crate::indexer_client::IndexerClient; use crate::indexers::indexing; +use crate::indexing_performance::IndexingPerformance; use crate::reports::KafkaClient; use crate::topology::GraphNetwork; @@ -29,9 +27,8 @@ pub struct Context { pub block_caches: &'static HashMap, pub network: GraphNetwork, pub indexing_statuses: Eventual>>, + pub indexing_perf: IndexingPerformance, pub attestation_domain: &'static Eip712Domain, pub bad_indexers: &'static HashSet
, pub indexings_blocklist: Eventual>>, - pub isa_state: DoubleBufferReader, - pub observations: QueueWriter, } diff --git a/graph-gateway/src/indexer_client.rs b/graph-gateway/src/indexer_client.rs index 38844d65..01905bd7 100644 --- a/graph-gateway/src/indexer_client.rs +++ b/graph-gateway/src/indexer_client.rs @@ -8,7 +8,8 @@ use gateway_framework::{ errors::{IndexerError, UnavailableReason::*}, scalar::{ReceiptSigner, ReceiptStatus, ScalarReceipt}, }; -use indexer_selection::Selection; + +use crate::client_query::Selection; pub struct IndexerResponse { pub status: u16, diff --git a/graph-gateway/src/indexing_performance.rs b/graph-gateway/src/indexing_performance.rs new file mode 100644 index 00000000..d652cdf6 --- /dev/null +++ b/graph-gateway/src/indexing_performance.rs @@ -0,0 +1,122 @@ +use futures::channel::oneshot; +use gateway_common::types::Indexing; +use indexer_selection::Performance; +use std::{collections::HashMap, ops::Deref, time::Duration}; +use tokio::{ + select, + sync::{mpsc, RwLock}, + time, +}; + +#[derive(Clone)] +pub struct IndexingPerformance { + data: &'static DoubleBuffer, + msgs: mpsc::UnboundedSender, +} + +pub enum Msg { + Feedback { + indexing: Indexing, + success: bool, + latency_ms: u32, + }, + Flush(oneshot::Sender<()>), +} + +impl IndexingPerformance { + #[allow(clippy::new_without_default)] + pub fn new() -> Self { + let (tx, rx) = mpsc::unbounded_channel(); + let data: &'static DoubleBuffer = Box::leak(Box::default()); + Actor::spawn(data, rx); + Self { data, msgs: tx } + } + + pub fn latest(&'_ self) -> impl '_ + Deref> { + loop { + // This is guaranteed to only move forward in time, and is almost guaranteed to acquire + // the lock "immediately". These guarantees come from the invariant that there is a + // single writer and it can only be in a few possible states. + for locked in &self.data.0 { + if let Ok(data) = locked.try_read() { + return data; + } + } + } + } + + pub fn feedback(&self, indexing: Indexing, success: bool, latency_ms: u32) { + self.msgs + .send(Msg::Feedback { + indexing, + success, + latency_ms, + }) + .unwrap(); + } + + pub async fn flush(&self) { + let (tx, rx) = oneshot::channel(); + self.msgs.send(Msg::Flush(tx)).unwrap(); + let _ = rx.await; + } +} + +#[derive(Default)] +struct DoubleBuffer([RwLock>; 2]); + +struct Actor { + data: &'static DoubleBuffer, +} + +impl Actor { + fn spawn(data: &'static DoubleBuffer, mut msgs: mpsc::UnboundedReceiver) { + let mut actor = Self { data }; + let mut timer = time::interval(Duration::from_secs(1)); + tokio::spawn(async move { + let batch_limit = 32; + let mut msg_buf = Vec::with_capacity(batch_limit); + loop { + select! { + _ = timer.tick() => actor.decay().await, + _ = msgs.recv_many(&mut msg_buf, batch_limit) => actor.handle_msgs(&mut msg_buf).await, + } + } + }); + } + + async fn decay(&mut self) { + for locked in &self.data.0 { + for perf in locked.write().await.values_mut() { + perf.decay(); + } + } + } + + async fn handle_msgs(&mut self, msgs: &mut Vec) { + for locked in &self.data.0 { + let mut unlocked = locked.write().await; + for msg in msgs.iter() { + match msg { + Msg::Flush(_) => (), + &Msg::Feedback { + indexing, + success, + latency_ms, + } => { + unlocked + .entry(indexing) + .or_default() + .feedback(success, latency_ms); + } + } + } + } + for msg in msgs.drain(..) 
{ + if let Msg::Flush(notify) = msg { + notify.send(()).unwrap(); + } + } + debug_assert!(msgs.is_empty()); + } +} diff --git a/graph-gateway/src/lib.rs b/graph-gateway/src/lib.rs index 9e6da929..08c48caf 100644 --- a/graph-gateway/src/lib.rs +++ b/graph-gateway/src/lib.rs @@ -3,6 +3,7 @@ pub mod client_query; pub mod config; pub mod indexer_client; pub mod indexers; +pub mod indexing_performance; pub mod indexings_blocklist; pub mod reports; pub mod subgraph_studio; diff --git a/graph-gateway/src/main.rs b/graph-gateway/src/main.rs index 1658252e..dac658b2 100644 --- a/graph-gateway/src/main.rs +++ b/graph-gateway/src/main.rs @@ -18,27 +18,23 @@ use axum::{ routing, Router, Server, }; use eventuals::{Eventual, EventualExt as _, Ptr}; +use gateway_framework::budgets::USD; +use graph_gateway::indexing_performance::IndexingPerformance; use ordered_float::NotNan; use prometheus::{self, Encoder as _}; use secp256k1::SecretKey; use serde_json::json; use simple_rate_limiter::RateLimiter; -use thegraph::types::UDecimal18; use thegraph::{ client as subgraph_client, types::{attestation, DeploymentId}, }; use tokio::signal::unix::SignalKind; use tokio::spawn; -use toolshed::{ - buffer_queue::{self, QueueWriter}, - double_buffer, -}; use tower_http::cors::{self, CorsLayer}; use uuid::Uuid; use gateway_common::types::Indexing; -use gateway_framework::budgets::USD; use gateway_framework::geoip::GeoIP; use gateway_framework::scalar::ReceiptSigner; use gateway_framework::{ @@ -62,8 +58,6 @@ use graph_gateway::indexings_blocklist::indexings_blocklist; use graph_gateway::reports::{self, KafkaClient}; use graph_gateway::topology::{Deployment, GraphNetwork}; use graph_gateway::{client_query, indexings_blocklist, subgraph_studio, subscriptions_subgraph}; -use indexer_selection::tokens::GRT; -use indexer_selection::{actor::Update, BlockStatus}; #[global_allocator] static ALLOC: snmalloc_rs::SnMalloc = snmalloc_rs::SnMalloc; @@ -101,15 +95,6 @@ async fn main() { tracing::info!("Graph gateway starting... 
ID: {}", gateway_id); tracing::debug!(config = %config_repr); - let (isa_state, isa_writer) = double_buffer!(indexer_selection::State::default()); - - // Start the actor to manage updates - let (update_writer, update_reader) = buffer_queue::pair(); - spawn(async move { - indexer_selection::actor::process_updates(isa_writer, update_reader).await; - tracing::error!("ISA actor stopped"); - }); - let geoip = config .geoip_database .filter(|_| !config.geoip_blocked_countries.is_empty()) @@ -138,13 +123,6 @@ async fn main() { } ExchangeRateProvider::Rpc(url) => exchange_rate::grt_per_usd(url).await.unwrap(), }; - update_from_eventual( - grt_per_usd - .clone() - .map(|grt_per_usd| async move { GRT(UDecimal18::try_from(*grt_per_usd).unwrap()) }), - update_writer.clone(), - Update::GRTPerUSD, - ); let network_subgraph_client = subgraph_client::Client::new(http_client.clone(), config.network_subgraph.clone()); @@ -152,13 +130,6 @@ async fn main() { network_subgraph::Client::create(network_subgraph_client, config.l2_gateway.is_some()) .await; - // TODO: delete when new ISA is used - update_writer - .write(Update::SlashingPercentage( - UDecimal18::try_from(0.025).unwrap(), - )) - .unwrap(); - let attestation_domain: &'static Eip712Domain = Box::leak(Box::new(attestation::eip712_domain( U256::from_str_radix(&config.attestations.chain_id, 10) @@ -230,24 +201,11 @@ async fn main() { .await, )); - { - let update_writer = update_writer.clone(); - let indexing_statuses = indexing_statuses.clone(); - eventuals::join((network.deployments.clone(), indexing_statuses)) - .pipe_async(move |(deployments, indexing_statuses)| { - let update_writer = update_writer.clone(); - async move { - write_indexer_inputs( - &update_writer, - receipt_signer, - &deployments, - &indexing_statuses, - ) - .await; - } - }) - .forever(); - } + eventuals::join((network.deployments.clone(), indexing_statuses.clone())) + .pipe_async(move |(deployments, indexing_statuses)| async move { + update_allocations(receipt_signer, &deployments, &indexing_statuses).await; + }) + .forever(); let api_keys = match config.studio_url { Some(url) => subgraph_studio::api_keys(http_client.clone(), url, config.studio_auth), @@ -290,8 +248,6 @@ async fn main() { tracing::info!("Waiting for exchange rate..."); grt_per_usd.value().await.unwrap(); - tracing::info!("Waiting for ISA setup..."); - update_writer.flush().await.unwrap(); let client_query_ctx = Context { indexer_client: IndexerClient { @@ -305,12 +261,11 @@ async fn main() { grt_per_usd, network, indexing_statuses, + indexing_perf: IndexingPerformance::new(), attestation_domain, bad_indexers, indexings_blocklist, block_caches, - observations: update_writer, - isa_state, }; tracing::info!("Waiting for chain heads from block caches..."); @@ -450,18 +405,6 @@ async fn await_shutdown_signals() { } } -fn update_from_eventual(eventual: Eventual, writer: QueueWriter, f: F) -where - V: eventuals::Value, - F: 'static + Send + Fn(V) -> Update, -{ - eventual - .pipe(move |v| { - let _ = writer.write(f(v)); - }) - .forever(); -} - async fn ip_rate_limit( State(limiter): State<&'static RateLimiter>, ConnectInfo(info): ConnectInfo, @@ -474,8 +417,7 @@ async fn ip_rate_limit( Ok(next.run(req).await) } -async fn write_indexer_inputs( - update_writer: &QueueWriter, +async fn update_allocations( receipt_signer: &ReceiptSigner, deployments: &HashMap>, indexing_statuses: &HashMap, @@ -489,7 +431,6 @@ async fn write_indexer_inputs( indexing_statuses = indexing_statuses.len(), ); - let mut indexings: HashMap = 
HashMap::new(); let mut allocations: HashMap = HashMap::new(); for (deployment, indexer) in deployments.values().flat_map(|deployment| { deployment @@ -501,27 +442,9 @@ async fn write_indexer_inputs( indexer: indexer.id, deployment: deployment.id, }; - let status = match indexing_statuses.get(&indexing) { - Some(status) => status, - None => continue, - }; - let update = indexer_selection::IndexingStatus { - url: indexer.url.clone(), - stake: GRT(UDecimal18::from_raw_u256(U256::from(indexer.staked_tokens))), - allocation: GRT(UDecimal18::from_raw_u256(U256::from( - indexer.allocated_tokens, - ))), - block: Some(BlockStatus { - reported_number: status.block, - behind_reported_block: false, - min_block: status.min_block, - }), - }; allocations.insert(indexing, indexer.largest_allocation); - indexings.insert(indexing, update); } receipt_signer.update_allocations(allocations).await; - let _ = update_writer.write(Update::Indexings(indexings)); } async fn handle_metrics() -> impl axum::response::IntoResponse { diff --git a/indexer-selection/Cargo.toml b/indexer-selection/Cargo.toml deleted file mode 100644 index af348831..00000000 --- a/indexer-selection/Cargo.toml +++ /dev/null @@ -1,23 +0,0 @@ -[package] -edition = "2021" -name = "indexer-selection" -version = "0.0.1" - -[[bin]] -name = "sim" -path = "bin/sim.rs" - -[dependencies] -alloy-primitives.workspace = true -anyhow.workspace = true -arrayvec = "0.7" -gateway-common = { path = "../gateway-common" } -num-traits = "0.2" -ordered-float = "4.2" -permutation = "0.4" -rand.workspace = true -rand_distr = "0.4" -thegraph.workspace = true -tokio.workspace = true -toolshed.workspace = true -tracing.workspace = true diff --git a/indexer-selection/bin/sim.rs b/indexer-selection/bin/sim.rs deleted file mode 100644 index a3a1e2f1..00000000 --- a/indexer-selection/bin/sim.rs +++ /dev/null @@ -1,97 +0,0 @@ -use std::io::{stdin, BufRead as _}; - -use anyhow::Result; - -use gateway_common::utils::testing::gen_blocks; -use indexer_selection::{ - simulation::*, tokens::GRT, BlockRequirements, Selection, UtilityParameters, -}; -use thegraph::types::UDecimal18; - -#[tokio::main] -async fn main() -> Result<()> { - let header = "indexer,fee,blocks_behind,latency_ms,success_rate,allocation,stake"; - let characteristics = stdin() - .lock() - .lines() - .filter_map(|line| { - let line = line.unwrap(); - if line == header { - return None; - } - let fields = line.split(',').collect::>(); - Some(IndexerCharacteristics { - address: fields[0].parse().expect("address"), - fee: GRT(fields[1].parse().expect("fee")), - blocks_behind: fields[2].parse::().expect("blocks_behind").round() as u64, - latency_ms: fields[3].parse::().expect("latency_ms").round() as u64, - success_rate: fields[4].parse().expect("success_rate"), - allocation: GRT(fields[5].parse().expect("allocation")), - stake: GRT(fields[6].parse().expect("stake")), - }) - }) - .collect::>(); - - let budget = GRT(UDecimal18::try_from(0.01).unwrap()); - let freshness_requirements = BlockRequirements { - range: None, - has_latest: true, - }; - let blocks = { - let max_blocks_behind = characteristics.iter().map(|c| c.blocks_behind).max(); - let last_block = max_blocks_behind.unwrap() + 100; - gen_blocks(&(0..last_block).collect::>()) - }; - let latest_block = blocks.last().unwrap().number; - let params = UtilityParameters { - budget, - requirements: freshness_requirements, - latest_block, - block_rate_hz: 0.1, - }; - - println!("label,indexer,detail,selections,fees"); - eprintln!("| selection limit | total fees (GRT) 
| avg. latency (ms) | avg. blocks behind | avg. indexers selected | avg. selection time (ms) |"); - eprintln!("| --- | --- | --- | --- | --- | --- |"); - for selection_limit in [1, 3] { - let results = simulate(&characteristics, ¶ms, 100, selection_limit).await?; - - let total_cost = results.selections.iter().map(|s| s.fee).sum::() as f64 * 1e-18; - eprintln!( - "| {} | {:.6} | {:.0} | {:.2} | {:.2} | {:.2} |", - selection_limit, - total_cost, - results.avg_latency, - results.avg_blocks_behind, - results.selections.len() as f64 / results.client_queries as f64, - results.avg_selection_seconds * 1e3, - ); - - for indexer in &characteristics { - let selections = results - .selections - .iter() - .filter(|s| s.indexing.indexer == indexer.address) - .collect::>(); - let detail = format!( - "fee={:.4} behind={:02} latency={:04} success={:.3} alloc={:1.0e} stake={:1.0e}", - f64::from(indexer.fee.0), - indexer.blocks_behind, - indexer.latency_ms, - indexer.success_rate, - f64::from(indexer.allocation.0), - f64::from(indexer.stake.0), - ); - println!( - "selection_limit={},{},{},{},{}", - selection_limit, - indexer.address, - detail, - selections.len(), - selections.iter().map(|s| s.fee).sum::() as f64 * 1e-18, - ); - } - } - - Ok(()) -} diff --git a/indexer-selection/src/actor.rs b/indexer-selection/src/actor.rs deleted file mode 100644 index c686c899..00000000 --- a/indexer-selection/src/actor.rs +++ /dev/null @@ -1,86 +0,0 @@ -use std::collections::HashMap; - -use thegraph::types::UDecimal18; -use tokio::{ - select, - time::{sleep_until, Duration, Instant}, -}; -use toolshed::{ - buffer_queue::{Event, QueueReader}, - double_buffer::DoubleBufferWriter, -}; - -use crate::{tokens::GRT, IndexerErrorObservation, Indexing, IndexingStatus, State}; - -#[derive(Debug)] -pub enum Update { - GRTPerUSD(GRT), - SlashingPercentage(UDecimal18), - Indexings(HashMap), - QueryObservation { - indexing: Indexing, - duration: Duration, - result: Result<(), IndexerErrorObservation>, - }, - Penalty { - indexing: Indexing, - }, -} - -pub async fn process_updates( - mut writer: DoubleBufferWriter, - mut events: QueueReader, -) { - let mut event_buffer = Vec::new(); - let mut next_decay = Instant::now() + Duration::from_secs(60); - - loop { - select! 
{ - _ = sleep_until(next_decay) => { - next_decay = Instant::now() + Duration::from_secs(60); - writer.update(|indexers| indexers.decay()).await; - }, - _ = events.read(&mut event_buffer) => { - writer - .update(|state| { - for update in event_buffer.iter() { - if let Event::Update(update) = update { - apply_state_update(state, update); - } - } - }) - .await; - for update in event_buffer.iter() { - if let Event::Flush(notify) = update { - notify.notify_one(); - } - } - event_buffer.clear(); - }, - } - } -} - -pub fn apply_state_update(state: &mut State, update: &Update) { - match update { - Update::GRTPerUSD(grt_per_usd) => { - state.network_params.grt_per_usd = Some(*grt_per_usd); - } - Update::SlashingPercentage(slashing_percentage) => { - state.network_params.slashing_percentage = Some(*slashing_percentage); - } - Update::Indexings(indexings) => { - for (indexing, update) in indexings { - state.insert_indexing(*indexing, update.clone()); - } - } - Update::QueryObservation { - indexing, - duration, - result, - } => { - state.observe_query(indexing, *duration, *result); - } - Update::Penalty { indexing } => state.penalize(indexing), - } -} diff --git a/indexer-selection/src/decay.rs b/indexer-selection/src/decay.rs deleted file mode 100644 index e99fe1b5..00000000 --- a/indexer-selection/src/decay.rs +++ /dev/null @@ -1,221 +0,0 @@ -use std::time::Duration; - -/// DecayBuffer approximates a histogram of data points over time to inform a prediction. Data -/// points are collected in the first (current) bin. Each call to `decay` rotates the bins to the -/// right and resets the current bin. The information stored in each bin is decayed away at a rate -/// of `1 - (0.001 * D)` per decay cycle (https://www.desmos.com/calculator/7kfwwvtkc1). -/// -/// We'll consider query count for this example: -/// e.g. [c_0, c_1, c_2, ..., c_5461] where c_i is the count time T-i. -/// Imagine we get a query roughly once per decay, we could see something like: -/// [1, 0, 2, 0, 0, 1, ..., 2] -/// As a cycle passes, we shift the data down because T-0 is now T-1 and T-500 is now T-501. So -/// shifting gives us this: -/// [0, 1, 0, 2, 0, 0, ..., 2, 2] -/// (The final 1 disappeared into the first member of the ellipsis, and the 2 popped out from the -/// last member of the ellipsis) -/// -/// There is no actual decay yet in the above description. Note though that if we shift multiple -/// times the sum should be the same for a while. -/// e.g. [1, 0, 0, ...] -> [0, 1, 0, ...] -> [0, 0, 1, ...] -/// The sum of all frames here is 1 until the 1 drops off the end. -/// -/// The purpose of the decay is to weigh more recent data exponentially more than old data. If the -/// decay per frame is 1% then we would get approximately this: -/// [1, 0, 0, ...] -> [0, .99, 0, ...] -> [0, 0, .98] -/// (This looks linear, but is exponential I've just rounded the numbers). -/// -/// Note that every time we call decay, the sum of all values decreases. -/// -/// We consider the accuracy of timestamp of recent data is more important than the accuracy of -/// timestamp of old data. For example, it's useful to know if a failed request happened 2 seconds -/// ago vs 12 seconds ago. But less useful to know whether it happened 1002 vs 1012 seconds ago even -/// though that's the same duration. So for the approximation of our histogram, we use time frames -/// with intervals of F consecutive powers of 4. -/// e.g. 
[1, 4, 16, 64] if F = 4 - -#[derive(Clone, Debug)] -pub struct DecayBuffer { - frames: [T; F], -} - -pub trait Decay { - fn decay(&mut self, prev: &Self, retain: f64, take: f64); -} - -pub type ISADecayBuffer = DecayBuffer; - -impl Default for DecayBuffer -where - [T; F]: Default, -{ - fn default() -> Self { - debug_assert!(F > 0); - debug_assert!(D < 1000); - Self { - frames: Default::default(), - } - } -} - -impl DecayBuffer -where - Self: Default, -{ - pub fn new() -> Self { - Default::default() - } -} - -impl DecayBuffer { - pub fn current_mut(&mut self) -> &mut T { - &mut self.frames[0] - } - - pub fn frames(&self) -> &[T] { - &self.frames - } - - pub fn map<'a, I>(&'a self, f: impl FnMut(&T) -> I + 'a) -> impl Iterator + 'a { - self.frames.iter().map(f) - } -} - -impl DecayBuffer -where - T: Decay + Default, -{ - pub fn decay(&mut self) { - // BQN: (1-1e¯3×d)×((1-4⋆-↕f)×⊢)+(«4⋆-↕f)×⊢ - // LLVM should be capable of constant folding & unrolling this loop nicely. - // https://rust.godbolt.org/z/K13dj78Ge - for i in (1..self.frames.len()).rev() { - let retain = 1.0 - 4_f64.powi(-(i as i32)); - let take = 4_f64.powi(-(i as i32 - 1)); - let decay = 1.0 - 1e-3 * D as f64; - let (cur, prev) = self.frames[..=i].split_last_mut().unwrap(); - cur.decay(prev.last().unwrap(), retain * decay, take * decay); - } - self.frames[0] = T::default(); - } -} - -impl Decay for f64 { - fn decay(&mut self, prev: &Self, retain: f64, take: f64) { - *self = (*self * retain) + (prev * take); - } -} - -impl Decay for Duration { - fn decay(&mut self, prev: &Self, retain: f64, take: f64) { - let mut v = self.as_secs_f64(); - v.decay(&prev.as_secs_f64(), retain, take); - *self = Duration::from_secs_f64(v); - } -} - -// This could have been done more automatically by using a proc-macro, but this is simpler. -#[macro_export] -macro_rules! impl_struct_decay { - ($name:ty {$($field:ident),*}) => { - impl Decay for $name { - fn decay(&mut self, prev: &Self, retain: f64, take: f64) { - // Doing a destructure ensures that we don't miss any fields, should they be added - // in the future. 
I tried it and the compiler even gives you a nice error message: - // - // missing structure fields: - // -{name} - let Self { $($field: _),* } = self; - $( - self.$field.decay(&prev.$field, retain, take); - )* - } - } - }; -} - -#[cfg(test)] -mod test { - use std::iter; - - use arrayvec::ArrayVec; - - use gateway_common::utils::testing::assert_within; - - use super::*; - - struct Model(Vec); - - impl Model { - fn new() -> Self { - Self((0..F).flat_map(|i| iter::repeat(0.0).take(w(i))).collect()) - } - - fn decay(&mut self) { - // BQN: »(1-d×1e¯3)×⊢ - for x in &mut self.0 { - *x *= 1.0 - 1e-3 * D as f64; - } - self.0.rotate_right(1); - self.0[0] = 0.0; - } - - fn frames(&self) -> ArrayVec { - (0..F) - .scan(0, |i, f| { - let offset = *i; - let len = w(f); - *i += len; - Some(self.0[offset..][..len].iter().sum::()) - }) - .collect() - } - } - - fn w(i: usize) -> usize { - 4_u64.pow(i as u32) as usize - } - - #[test] - fn test() { - model_check::<7, 0>(); - model_check::<7, 1>(); - model_check::<7, 5>(); - model_check::<7, 10>(); - } - - fn model_check() - where - [f64; F]: Default, - { - let mut model = Model::::new(); - let mut buf = DecayBuffer::::default(); - - for _ in 0..1000 { - model.0[0] = 1.0; - model.decay(); - *buf.current_mut() = 1.0; - buf.decay(); - - let value = buf.frames().iter().sum::(); - let expected = model.frames().iter().sum::(); - - println!("---",); - println!("{:.2e} {}", expected, show(&model.frames())); - println!("{:.2e} {}", value, show(buf.frames())); - println!("{}", (value - expected) / expected); - - assert_within(value, expected, 0.013 * expected); - } - } - - fn show(v: &[f64]) -> String { - format!( - "[{}]", - v.iter() - .map(|f| format!("{f:.2e}")) - .collect::>() - .join(" ") - ) - } -} diff --git a/indexer-selection/src/economic_security.rs b/indexer-selection/src/economic_security.rs deleted file mode 100644 index 16f420b7..00000000 --- a/indexer-selection/src/economic_security.rs +++ /dev/null @@ -1,70 +0,0 @@ -use thegraph::types::UDecimal18; - -use crate::tokens::{GRT, USD}; - -#[derive(Default)] -pub struct NetworkParameters { - pub slashing_percentage: Option, - pub grt_per_usd: Option, -} - -impl NetworkParameters { - pub fn slashable_usd(&self, indexer_stake: GRT) -> Option { - let slashing_percentage = self.slashing_percentage?; - let slashable_grt = indexer_stake.0 * slashing_percentage; - let slashable_usd = slashable_grt / self.grt_per_usd?.0; - Some(USD(slashable_usd)) - } -} - -#[cfg(test)] -mod tests { - use gateway_common::utils::testing::assert_within; - - use crate::utility::ConcaveUtilityParameters; - - use super::*; - - #[test] - fn high_stake() { - // $1m dollars amounts to ~80% utility - test_economic_security_utility(1_000, 0.5, 2_000_000_000, 0.00000161757, 1_000_000, 0.8); - } - - #[test] - fn low_stake() { - // $100k amounts to 15% utility - test_economic_security_utility(100, 0.05, 200_000_000, 0.00000161757, 100_000, 0.15); - } - - #[test] - fn low_a() { - // Different u_a for a case where $10k is plenty of utility - test_economic_security_utility(1, 1.0, 10_000, 0.00016, 10_000, 0.8); - } - - #[test] - fn testnet_a() { - // For the testnet, expecting ~$400k slashable for these parameters. - // Each Indexer gets $5m, and slashing percent is 10. 
- test_economic_security_utility(1, 0.1, 4_000_000, 0.000006, 400_000, 0.91); - } - - fn test_economic_security_utility( - grt_per_usd: u128, - slashing_percentage: f64, - stake: u128, - u_a: f64, - expected_slashable: u128, - expected_utility: f64, - ) { - let params = NetworkParameters { - grt_per_usd: Some(GRT(UDecimal18::from(grt_per_usd))), - slashing_percentage: UDecimal18::try_from(slashing_percentage).ok(), - }; - let slashable = params.slashable_usd(GRT(UDecimal18::from(stake))).unwrap(); - let utility = ConcaveUtilityParameters::one(u_a).concave_utility(slashable.0.into()); - assert_eq!(slashable.0, UDecimal18::from(expected_slashable)); - assert_within(utility.utility, expected_utility, 0.01); - } -} diff --git a/indexer-selection/src/indexing.rs b/indexer-selection/src/indexing.rs deleted file mode 100644 index 08414fb9..00000000 --- a/indexer-selection/src/indexing.rs +++ /dev/null @@ -1,130 +0,0 @@ -use std::ops::RangeInclusive; -use std::time::{Duration, Instant}; - -use toolshed::url::Url; - -use crate::decay::ISADecayBuffer; -use crate::performance::*; -use crate::reliability::*; -use crate::tokens::GRT; -use crate::{BlockRequirements, IndexerErrorObservation}; - -pub struct IndexingState { - pub status: IndexingStatus, - pub reliability: ISADecayBuffer, - pub perf_success: ISADecayBuffer, - pub perf_failure: ISADecayBuffer, - pub last_use: Instant, -} - -impl IndexingState { - pub fn new(status: IndexingStatus) -> Self { - Self { - status, - reliability: ISADecayBuffer::default(), - perf_success: ISADecayBuffer::default(), - perf_failure: ISADecayBuffer::default(), - last_use: Instant::now() - Duration::from_secs(60), - } - } -} - -#[derive(Clone, Debug)] -pub struct IndexingStatus { - pub url: Url, - pub stake: GRT, - pub allocation: GRT, - pub block: Option, -} - -/// We compare candidate indexers based on their last reported block. Any observation of the indexer -/// behind the reported block will result in a penalty and the block status being considered -/// untrustworthy until the next time it is reported. -#[derive(Clone, Debug)] -pub struct BlockStatus { - pub reported_number: u64, - pub behind_reported_block: bool, - pub min_block: Option, -} - -impl BlockStatus { - pub fn meets_requirements(&self, requirements: &BlockRequirements, block_rate_hz: f64) -> bool { - if self.behind_reported_block { - return false; - } - - // Allow selecting indexers if their reported block is "close enough" to the required range. This is to allow an - // admittedly fragile use-case where client queries contain a constraint based on the most recent block from - // some RPC provider. Indexers closer to chain head and with higher success rate will be favored all else being - // equal. 
- let offset = (block_rate_hz * 60.0) as u64; - - let (min, max) = match requirements.range { - Some((start, end)) => (start.saturating_sub(offset), end.saturating_sub(offset)), - None => return true, - }; - let reported_range = RangeInclusive::new(self.min_block.unwrap_or(0), self.reported_number); - reported_range.contains(&min) && reported_range.contains(&max) - } -} - -impl IndexingState { - pub fn update_status(&mut self, status: IndexingStatus) { - self.status = status; - } - - pub fn observe_query( - &mut self, - duration: Duration, - result: Result<(), IndexerErrorObservation>, - ) { - self.last_use = self.last_use.max(Instant::now() - duration); - match result { - Ok(()) => self.perf_success.current_mut().observe(duration), - Err(err) => { - self.perf_failure.current_mut().observe(duration); - match err { - IndexerErrorObservation::Other => (), - IndexerErrorObservation::Timeout | IndexerErrorObservation::BadAttestation => { - self.penalize() - } - IndexerErrorObservation::IndexingBehind { latest_query_block } => { - self.observe_indexing_behind(latest_query_block); - // Avoid negative impact on reliability score resulting from our predictions - // of the indexer's block status. - return; - } - }; - } - }; - self.reliability.current_mut().observe(result.is_ok()); - } - - fn observe_indexing_behind(&mut self, latest_query_block: u64) { - let status = match &mut self.status.block { - Some(status) => status, - None => return, - }; - if latest_query_block <= status.reported_number { - status.behind_reported_block = true; - self.penalize(); - } - } - - pub fn penalize(&mut self) { - self.reliability.current_mut().penalize(); - } - - pub fn decay(&mut self) { - let Self { - status: _, - last_use: _, - perf_success, - reliability, - perf_failure, - } = self; - reliability.decay(); - perf_success.decay(); - perf_failure.decay(); - } -} diff --git a/indexer-selection/src/lib.rs b/indexer-selection/src/lib.rs deleted file mode 100644 index 723c2323..00000000 --- a/indexer-selection/src/lib.rs +++ /dev/null @@ -1,317 +0,0 @@ -use std::collections::HashMap; -use std::collections::{BTreeMap, BTreeSet}; -use std::time::Duration; - -use alloy_primitives::{Address, U256}; -use num_traits::Zero as _; -pub use ordered_float::NotNan; -use rand::{prelude::SmallRng, Rng as _}; -use score::{expected_individual_score, ExpectedValue}; -use thegraph::types::UDecimal18; -use tokens::GRT; -use toolshed::url::Url; - -use gateway_common::types::Indexing; - -use crate::score::{select_indexers, SelectionFactors}; -pub use crate::{ - economic_security::NetworkParameters, - indexing::{BlockStatus, IndexingState, IndexingStatus}, - score::SELECTION_LIMIT, - utility::ConcaveUtilityParameters, -}; - -pub mod actor; -pub mod decay; -mod economic_security; -mod indexing; -mod performance; -mod reliability; -mod score; -pub mod simulation; -#[cfg(test)] -mod test; -pub mod tokens; -mod utility; - -/// If an indexer's score is penalized such that it falls below this proportion of the max indexer -/// score, then the indexer will be discarded from the set of indexers to select from. 
-const MIN_SCORE_CUTOFF: f64 = 0.25; - -#[derive(Clone, Debug)] -pub struct Candidate { - pub indexing: Indexing, - pub fee: u128, - pub versions_behind: u8, -} - -#[derive(Clone, Debug)] -pub struct Selection { - pub indexing: Indexing, - pub url: Url, - pub fee: u128, - pub blocks_behind: u64, -} - -#[derive(Clone, Debug, Eq, PartialEq)] -pub enum SelectionError { - BadInput(InputError), - BadIndexer(IndexerError), -} - -#[derive(Clone, Debug, Eq, PartialEq)] -pub enum InputError { - MissingNetworkParams, -} - -#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, Ord, PartialOrd)] -pub enum IndexerError { - NoStatus, - NoStake, - MissingRequiredBlock, - FeeTooHigh, - NaN, -} - -impl From for SelectionError { - fn from(err: InputError) -> Self { - Self::BadInput(err) - } -} - -impl From for SelectionError { - fn from(err: IndexerError) -> Self { - Self::BadIndexer(err) - } -} - -#[derive(Clone, Copy, Debug, Eq, PartialEq)] -pub enum IndexerErrorObservation { - Timeout, - IndexingBehind { - /// Latest block used for the indexer query - latest_query_block: u64, - }, - BadAttestation, - Other, -} - -#[derive(Default, Debug, Eq, PartialEq)] -pub struct BlockRequirements { - /// Range of blocks specified in the query, inclusive. - pub range: Option<(u64, u64)>, - /// If true, the query has an unspecified block which means the query benefits from syncing as - /// far in the future as possible. - pub has_latest: bool, -} - -#[derive(Debug)] -pub struct IndexerErrors<'a>(pub BTreeMap>); - -impl<'a> IndexerErrors<'a> { - fn add(&mut self, err: IndexerError, indexer: &'a Address) { - self.0.entry(err).or_default().insert(indexer); - } -} - -#[derive(Debug)] -pub struct UtilityParameters { - pub budget: GRT, - pub requirements: BlockRequirements, - pub latest_block: u64, - pub block_rate_hz: f64, -} - -#[derive(Default)] -pub struct State { - pub network_params: NetworkParameters, - indexings: HashMap, -} - -impl State { - pub fn insert_indexing(&mut self, indexing: Indexing, status: IndexingStatus) { - if let Some(entry) = self.indexings.get_mut(&indexing) { - entry.update_status(status); - } else { - self.indexings.insert(indexing, IndexingState::new(status)); - } - } - - pub fn observe_query( - &mut self, - indexing: &Indexing, - duration: Duration, - result: Result<(), IndexerErrorObservation>, - ) { - if let Some(state) = self.indexings.get_mut(indexing) { - state.observe_query(duration, result); - } - } - - pub fn penalize(&mut self, indexing: &Indexing) { - if let Some(state) = self.indexings.get_mut(indexing) { - state.penalize(); - } - } - - pub fn decay(&mut self) { - for indexing in self.indexings.values_mut() { - indexing.decay(); - } - } - - // We use a small-state PRNG (xoroshiro256++) here instead of StdRng (ChaCha12). - // `select_indexers` does not require the protections provided by a CSPRNG. The Xoroshiro++ - // algorithm provides high statistical quality while reducing the runtime of selection - // consumed by generating ranom numbers from 4% to 2% at the time of writing. 
-    //
-    // See also: https://docs.rs/rand/latest/rand/rngs/struct.SmallRng.html
-    pub fn select_indexers<'a>(
-        &self,
-        rng: &mut SmallRng,
-        params: &UtilityParameters,
-        candidates: &'a [Candidate],
-    ) -> Result<(Vec<Selection>, IndexerErrors<'a>), InputError> {
-        let mut errors = IndexerErrors(BTreeMap::new());
-        let mut available = Vec::<SelectionFactors>::new();
-
-        for candidate in candidates {
-            match self.selection_factors(candidate, params) {
-                Ok(factors) => available.push(factors),
-                Err(SelectionError::BadIndexer(err)) => {
-                    errors.add(err, &candidate.indexing.indexer)
-                }
-                Err(SelectionError::BadInput(err)) => return Err(err),
-            };
-        }
-
-        if tracing::enabled!(tracing::Level::TRACE) {
-            tracing::trace!(?available);
-        } else if rng.gen_bool(0.001) {
-            tracing::debug!(?available);
-        }
-
-        // Find the maximum expected individual indexer score.
-        let max_score = available
-            .iter()
-            .map(|factors| factors.expected_score)
-            .max()
-            .unwrap_or(NotNan::zero());
-        // `select_indexers` discourages sybils by weighting its selection based on the `sybil`
-        // value. Drawing a random score cutoff, weighted toward 1 and normalized to the highest
-        // score, defines the selection by an expected score distribution, so that even many bad
-        // indexers holding lots of stake should not adversely affect the result. This matters
-        // because an indexer deployed on the other side of the world should not generally drag
-        // our expected score below the minimum set by this equation.
-        let mut score_cutoff: NotNan<f64> =
-            NotNan::new(rng.gen_range(MIN_SCORE_CUTOFF..=1.0)).unwrap();
-        score_cutoff = max_score * score_cutoff;
-        // Filter out indexers below the cutoff. This avoids a situation where most indexers have
-        // terrible scores, only a few have good scores, and the good indexers are often passed
-        // over in multi-selection.
-        tracing::debug!(score_cutoff = *score_cutoff);
-        available.retain(|factors| factors.expected_score >= score_cutoff);
-
-        let selections = select_indexers(rng, params, &available);
-        Ok((selections, errors))
-    }
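A minimal standalone sketch of the cutoff sampling above (not part of the patch; the 0.8 max score is an assumed value):

    use rand::{rngs::SmallRng, Rng as _, SeedableRng as _};

    fn main() {
        let mut rng = SmallRng::seed_from_u64(0);
        let max_score: f64 = 0.8;
        // Uniform draw in [MIN_SCORE_CUTOFF, 1.0], scaled by the best available score.
        let score_cutoff = max_score * rng.gen_range(0.25..=1.0);
        // The cutoff always lands in [0.25 * max_score, max_score]: the best-scoring
        // candidate always survives its own cutoff, and anything under 25% of it never does.
        assert!((0.2..=0.8).contains(&score_cutoff));
    }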
-    fn selection_factors(
-        &self,
-        candidate: &Candidate,
-        params: &UtilityParameters,
-    ) -> Result<SelectionFactors, SelectionError> {
-        let state = self
-            .indexings
-            .get(&candidate.indexing)
-            .ok_or(IndexerError::NoStatus)?;
-
-        let block_status = state.status.block.as_ref().ok_or(IndexerError::NoStatus)?;
-        if !block_status.meets_requirements(&params.requirements, params.block_rate_hz) {
-            return Err(IndexerError::MissingRequiredBlock.into());
-        }
-
-        if state.status.stake == GRT(UDecimal18::from(0)) {
-            return Err(IndexerError::NoStake.into());
-        }
-
-        let slashable = self
-            .network_params
-            .slashable_usd(state.status.stake)
-            .ok_or(InputError::MissingNetworkParams)?;
-
-        let fee = GRT(UDecimal18::from_raw_u256(U256::from(candidate.fee)));
-        if fee > params.budget {
-            return Err(IndexerError::FeeTooHigh.into());
-        }
-
-        let reliability = state.reliability.expected_value();
-        let perf_success = state.perf_success.expected_value();
-        let slashable_usd = slashable.0.into();
-        let zero_allocation = state.status.allocation == GRT(UDecimal18::from(0));
-        let blocks_behind = params
-            .latest_block
-            .saturating_sub(block_status.reported_number);
-
-        let expected_score = NotNan::new(expected_individual_score(
-            params,
-            reliability,
-            perf_success,
-            candidate.versions_behind,
-            blocks_behind,
-            slashable_usd,
-            zero_allocation,
-            &fee,
-        ))
-        .unwrap_or(NotNan::zero());
-        debug_assert!(*expected_score > 0.0);
-
-        Ok(SelectionFactors {
-            indexing: candidate.indexing,
-            url: state.status.url.clone(),
-            versions_behind: candidate.versions_behind,
-            reliability,
-            perf_success,
-            perf_failure: state.perf_failure.expected_value(),
-            blocks_behind,
-            slashable_usd,
-            expected_score,
-            fee,
-            last_use: state.last_use,
-            sybil: sybil(&state.status.allocation)?,
-        })
-    }
-}
-
-/// Sybil protection
-fn sybil(indexer_allocation: &GRT) -> Result<NotNan<f64>, IndexerError> {
-    let identity: f64 = indexer_allocation.0.into();
-
-    // There is a GIP out there which would allow for allocations with 0 GRT stake.
-    // For example, MIPS. We don't want those to never be selected. Furthermore,
-    // we can account for the cost of an allocation which would contribute to sybil.
-    const BONUS: f64 = 1000.0;
-
-    // Don't flatten so quickly, since numbers are large
-    const SLOPE: f64 = 100.0;
-
-    // To optimize for sybil protection, we want to just multiply the utility by the identity
-    // weight. But this may run into some economic problems. Consider the following scenario:
-    //
-    // Two indexers, A and B, have utilities of 45% and 55%, respectively. They are equally
-    // delegated at 50% of the total delegation pool, each. Because their delegation is
-    // equal, they receive query fees proportional to their utility. A delegator notices that
-    // part of their delegation would be more efficiently allocated if they move it to the
-    // indexer with higher utility so that delegation reflects query volume. So, now they have
-    // utilities of 45% and 55%, and delegation of 45% and 55%. Because these are multiplied,
-    // the new selection is 40% and 60%. But, the delegations are 45% and 55%. So… a delegator
-    // notices that their delegation is inefficiently allocated and moves their delegation to
-    // the more selected indexer. Now delegation is 40% and 60%, but utility stayed at 45% and
-    // 55%… and the new selections are 35% and 65%… and so the cycle continues. The gap keeps
-    // widening until all delegation is moved to the marginally better indexer. This is the
-    // kind of winner-take-all scenario we are trying to avoid. In the absence of cold hard
-    // math and reasoning, we're going to try using log magics.
-    let sybil = (((identity + BONUS) / SLOPE) + 1.0).log(std::f64::consts::E);
-    NotNan::new(sybil).map_err(|_| IndexerError::NaN)
-}
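To see how hard the log curve compresses allocation gaps, a small sketch with hypothetical allocation values (not part of the patch; `sybil_weight` is an illustrative re-statement of the function above):

    fn sybil_weight(allocation_grt: f64) -> f64 {
        const BONUS: f64 = 1000.0;
        const SLOPE: f64 = 100.0;
        (((allocation_grt + BONUS) / SLOPE) + 1.0).ln()
    }

    fn main() {
        // Zero-allocation indexers keep a nonzero weight thanks to BONUS...
        assert!((sybil_weight(0.0) - 11.0_f64.ln()).abs() < 1e-12); // ≈ 2.40
        // ...while a 10x jump in allocation raises the weight by only ~33%.
        println!("{:.2}", sybil_weight(1e5)); // ≈ 6.92
        println!("{:.2}", sybil_weight(1e6)); // ≈ 9.21
    }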
diff --git a/indexer-selection/src/performance.rs b/indexer-selection/src/performance.rs
deleted file mode 100644
index 86f9227d..00000000
--- a/indexer-selection/src/performance.rs
+++ /dev/null
@@ -1,44 +0,0 @@
-use std::f64::consts::E;
-use std::time::Duration;
-
-use crate::{
-    decay::{Decay, ISADecayBuffer},
-    impl_struct_decay,
-    score::ExpectedValue,
-    utility::UtilityFactor,
-};
-
-// https://www.desmos.com/calculator/rvqjvypylj
-pub fn performance_utility(latency_ms: u32) -> UtilityFactor {
-    let sigmoid = |x: u32| 1.0 + E.powf(((x as f64).powf(1.1) - 400.0) / 300.0);
-    UtilityFactor {
-        utility: sigmoid(0) / sigmoid(latency_ms),
-        weight: 1.0,
-    }
-}
-
-#[derive(Clone, Debug, Default)]
-pub struct Performance {
-    total_latency_ms: f64,
-    count: f64,
-}
-
-impl Performance {
-    pub fn observe(&mut self, duration: Duration) {
-        self.total_latency_ms += duration.as_millis() as f64;
-        self.count += 1.0;
-    }
-}
-
-impl ExpectedValue for ISADecayBuffer<Performance> {
-    fn expected_value(&self) -> f64 {
-        let total_latency_ms = self.map(|p| p.total_latency_ms).sum::<f64>();
-        let total_count = self.map(|p| p.count).sum::<f64>();
-        (total_latency_ms + 0.1) / total_count.max(1.0)
-    }
-}
-
-impl_struct_decay!(Performance {
-    total_latency_ms,
-    count
-});
diff --git a/indexer-selection/src/reliability.rs b/indexer-selection/src/reliability.rs
deleted file mode 100644
index ddbc869c..00000000
--- a/indexer-selection/src/reliability.rs
+++ /dev/null
@@ -1,43 +0,0 @@
-use crate::{
-    decay::{Decay, ISADecayBuffer},
-    impl_struct_decay,
-    score::ExpectedValue,
-};
-
-#[derive(Debug, Default, Clone, Copy)]
-pub struct Reliability {
-    successful_queries: f64,
-    failed_queries: f64,
-    penalties: f64,
-}
-
-impl Reliability {
-    pub fn observe(&mut self, success: bool) {
-        if success {
-            self.successful_queries += 1.0;
-        } else {
-            self.failed_queries += 1.0;
-        }
-    }
-
-    pub fn penalize(&mut self) {
-        self.penalties += 1.0;
-    }
-}
-
-impl ExpectedValue for ISADecayBuffer<Reliability> {
-    // https://www.desmos.com/calculator/gspottbqp7
-    fn expected_value(&self) -> f64 {
-        let successful_queries = self.map(|r| r.successful_queries).sum::<f64>() + 0.1;
-        let total_queries = successful_queries + self.map(|r| r.failed_queries).sum::<f64>();
-        let p_success = successful_queries / total_queries;
-        let p_penalty = self.map(|r| r.penalties).sum::<f64>() / total_queries;
-        p_success / (p_penalty + 1.0)
-    }
-}
-
-impl_struct_decay!(Reliability {
-    successful_queries,
-    failed_queries,
-    penalties
-});
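Restating the reliability expected value over pre-summed buffer totals, with hypothetical counts (not part of the patch; `reliability_expected_value` is an illustrative helper, not an API from the crate):

    fn reliability_expected_value(successes: f64, failures: f64, penalties: f64) -> f64 {
        let successful = successes + 0.1; // the same +0.1 smoothing used above
        let total = successful + failures;
        let p_success = successful / total;
        let p_penalty = penalties / total;
        p_success / (p_penalty + 1.0)
    }

    fn main() {
        // 99 successes, 1 failure, 2 penalties: the penalties drag the score to
        // ~0.971, below the raw 0.99 success rate.
        let ev = reliability_expected_value(99.0, 1.0, 2.0);
        assert!((ev - 0.9706).abs() < 1e-3);
    }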
diff --git a/indexer-selection/src/score.rs b/indexer-selection/src/score.rs
deleted file mode 100644
index e019c60d..00000000
--- a/indexer-selection/src/score.rs
+++ /dev/null
@@ -1,300 +0,0 @@
-use std::time::{Duration, Instant};
-
-use alloy_primitives::U256;
-use arrayvec::ArrayVec;
-use ordered_float::NotNan;
-use rand::{prelude::SliceRandom as _, Rng};
-use thegraph::types::UDecimal18;
-use toolshed::url::Url;
-
-use crate::performance::performance_utility;
-use crate::tokens::GRT;
-use crate::utility::{weighted_product_model, UtilityFactor};
-use crate::{
-    BlockRequirements, ConcaveUtilityParameters, Indexing, Selection, UtilityParameters,
-    MIN_SCORE_CUTOFF,
-};
-
-#[derive(Debug)]
-pub struct SelectionFactors {
-    pub indexing: Indexing,
-    pub url: Url,
-    pub versions_behind: u8,
-    pub reliability: f64,
-    pub perf_success: f64,
-    pub perf_failure: f64,
-    pub blocks_behind: u64,
-    pub slashable_usd: f64,
-    pub expected_score: NotNan<f64>,
-    pub fee: GRT,
-    pub last_use: Instant,
-    pub sybil: NotNan<f64>,
-}
-
-pub const SELECTION_LIMIT: usize = 3;
-
-/// A subset of available indexers, with combined utility
-struct MetaIndexer<'s>(pub ArrayVec<&'s SelectionFactors, SELECTION_LIMIT>);
-
-type V<T> = ArrayVec<T, SELECTION_LIMIT>;
-
-pub trait ExpectedValue {
-    fn expected_value(&self) -> f64;
-}
-
-pub fn select_indexers<R: Rng>(
-    rng: &mut R,
-    params: &UtilityParameters,
-    factors: &[SelectionFactors],
-) -> Vec<Selection> {
-    if factors.is_empty() {
-        return vec![];
-    }
-    // Sample indexer subsets, discarding likely duplicates, retaining the highest-scoring subset
-    // of available indexers.
-    //
-    // We must use a suitable indexer, if one exists. Indexers are filtered out when calculating
-    // selection factors if they are over budget or don't meet freshness requirements. So they
-    // won't pollute the set we're selecting from.
-    let sample_limit = factors.len().min(16);
-    let mut selections: ArrayVec<&SelectionFactors, SELECTION_LIMIT> = ArrayVec::new();
-
-    // Find the best individual indexer out of some samples to start with.
-    for _ in 0..sample_limit {
-        let indexer = factors.choose_weighted(rng, |f| *f.sybil).unwrap();
-        if selections.is_empty() {
-            selections.push(indexer);
-        } else if indexer.expected_score > selections[0].expected_score {
-            selections[0] = indexer;
-        }
-    }
-    let mut combined_score = selections[0].expected_score;
-    // Sample some indexers and add them to the selected set if they increase the combined score.
-    for _ in 0..sample_limit {
-        if selections.len() == SELECTION_LIMIT {
-            break;
-        }
-        let candidate = factors.choose_weighted(rng, |f| *f.sybil).unwrap();
-        let mut meta_indexer = MetaIndexer(selections.clone());
-        meta_indexer.0.push(candidate);
-
-        // Skip to the next iteration if we've already checked this indexer selection.
-        if selections
-            .iter()
-            .any(|s| s.indexing.indexer == candidate.indexing.indexer)
-        {
-            continue;
-        }
-
-        let score = meta_indexer
-            .score(params)
-            .try_into()
-            .expect("NaN multi-selection score");
-        if score > combined_score {
-            combined_score = score;
-            selections.push(candidate);
-        }
-    }
-
-    selections
-        .into_iter()
-        .map(|f| Selection {
-            indexing: f.indexing,
-            url: f.url.clone(),
-            fee: f.fee.0.as_u128().unwrap_or(0),
-            blocks_behind: f.blocks_behind,
-        })
-        .collect()
-}
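The sybil-weighted sampling step in isolation, a sketch using rand's `choose_weighted` with made-up weights (not part of the patch):

    use rand::{rngs::SmallRng, seq::SliceRandom as _, SeedableRng as _};

    fn main() {
        // (indexer label, sybil weight) pairs; values are illustrative only.
        let factors = [("a", 2.4_f64), ("b", 6.9), ("c", 9.2)];
        let mut rng = SmallRng::seed_from_u64(0);
        // A heavier sybil weight makes a candidate proportionally more likely to
        // be drawn, but never guarantees it: selection stays randomized.
        let pick = factors.choose_weighted(&mut rng, |(_, weight)| *weight).unwrap();
        println!("sampled {}", pick.0);
    }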
-impl MetaIndexer<'_> {
-    fn fee(&self) -> GRT {
-        GRT(self.0.iter().map(|f| f.fee.0).sum())
-    }
-
-    fn score(&self, params: &UtilityParameters) -> f64 {
-        if self.0.is_empty() {
-            return 0.0;
-        }
-
-        // Expected values calculated based on the following BQN:
-        // # indexer success latencies
-        // l ← ⟨50, 20, 100⟩
-        // # indexer reliabilities
-        // r ← ⟨0.99, 0.5, 0.8⟩
-        // # sort both vectors by success latency
-        // Sort ← (⍋l)⊏⊢ ⋄ r ↩ Sort r ⋄ l ↩ Sort l
-        // pf ← ×`1-r # ⟨ 0.5 0.005 0.001 ⟩
-        // ps ← r×1»pf # ⟨ 0.5 0.495 0.004 ⟩
-        // ExpectedValue ← +´ps×⊢
-        // # Calculate the expected value for latency under inversion, since latency has an
-        // # inverse relationship with utility. We want the values to be pulled toward
-        // # infinity as reliability decreases.
-        // ExpectedValue⌾÷ l # 28.62
-
-        let mut reliability: V<f64> = self.0.iter().map(|f| f.reliability).collect();
-        let mut perf_success: V<f64> = self.0.iter().map(|f| f.perf_success).collect();
-        let mut slashable_usd: V<f64> = self.0.iter().map(|f| f.slashable_usd).collect();
-
-        let mut permutation =
-            permutation::sort_unstable_by_key(&perf_success, |n| NotNan::try_from(*n).unwrap());
-        permutation.apply_slice_in_place(&mut reliability);
-        permutation.apply_slice_in_place(&mut perf_success);
-        permutation.apply_slice_in_place(&mut slashable_usd);
-
-        // BQN: pf ← ×`1-r
-        let pf = reliability
-            .iter()
-            .map(|r| 1.0 - r)
-            .scan(1.0, |s, x| {
-                *s *= x;
-                Some(*s)
-            })
-            .collect::<V<f64>>();
-        // BQN: ps ← r×1»pf
-        let ps = std::iter::once(&1.0)
-            .chain(&pf)
-            .take(SELECTION_LIMIT)
-            .zip(&reliability)
-            .map(|(p, r)| p * r)
-            .collect::<V<f64>>();
-        // BQN: (+´ps×⊢)s
-        let slashable_usd = slashable_usd.iter().zip(&ps).map(|(a, &b)| a * b).sum();
-        // BQN: (+´ps×⊢)⌾÷l
-        let perf_success = perf_success
-            .iter()
-            .zip(&ps)
-            .map(|(a, &b)| a.recip() * b)
-            .sum::<f64>()
-            .recip();
-        // BQN: ⌈´f
-        let perf_failure = *self
-            .0
-            .iter()
-            .map(|f| NotNan::try_from(f.perf_failure).unwrap())
-            .max()
-            .unwrap();
-
-        // We use the max value of blocks behind to account for the possibility of incorrect
-        // indexing statuses.
-        let blocks_behind = self.0.iter().map(|f| f.blocks_behind).max().unwrap();
-        let versions_behind = self.0.iter().map(|f| f.versions_behind).max().unwrap();
-        let min_last_use = self.0.iter().map(|f| f.last_use).max().unwrap();
-
-        let exploration = exploration_weight(Instant::now().duration_since(min_last_use));
-        let p_success = ps.iter().sum::<f64>();
-        debug_assert!((0.0..=1.0).contains(&p_success));
-
-        let factors = [
-            reliability_utility(p_success).mul_weight(exploration),
-            performance_utility(perf_success as u32).mul_weight(exploration * p_success),
-            performance_utility(perf_failure as u32).mul_weight(exploration * (1.0 - p_success)),
-            economic_security_utility(slashable_usd),
-            versions_behind_utility(versions_behind),
-            data_freshness_utility(params.block_rate_hz, &params.requirements, blocks_behind),
-            fee_utility(&self.fee(), &params.budget),
-        ];
-        let score = weighted_product_model(factors);
-
-        tracing::trace!(
-            indexers = ?self.0.iter().map(|f| f.indexing.indexer).collect::<Vec<_>>(),
-            score,
-            ?factors,
-        );
-
-        score
-    }
-}
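The BQN `pf`/`ps` chain above, restated in plain Rust with the comment's example values (not part of the patch; `success_probabilities` is an illustrative helper):

    // Reliabilities sorted by success latency, as in the comment's example.
    fn success_probabilities(reliability: &[f64]) -> Vec<f64> {
        // pf[i]: probability that indexers 0..=i all fail.
        let pf: Vec<f64> = reliability
            .iter()
            .map(|r| 1.0 - r)
            .scan(1.0, |s, x| {
                *s *= x;
                Some(*s)
            })
            .collect();
        // ps[i]: probability that indexer i is the first to respond successfully.
        std::iter::once(&1.0)
            .chain(&pf)
            .zip(reliability)
            .map(|(p, r)| p * r)
            .collect()
    }

    fn main() {
        // Matches the BQN comment: r = ⟨0.5, 0.99, 0.8⟩ after sorting by latency.
        let ps = success_probabilities(&[0.5, 0.99, 0.8]);
        let expect = [0.5, 0.495, 0.004];
        assert!(ps.iter().zip(&expect).all(|(a, b)| (a - b).abs() < 1e-12));
    }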
-#[allow(clippy::too_many_arguments)]
-pub fn expected_individual_score(
-    params: &UtilityParameters,
-    reliability: f64,
-    perf_success: f64,
-    versions_behind: u8,
-    blocks_behind: u64,
-    slashable_usd: f64,
-    zero_allocation: bool,
-    fee: &GRT,
-) -> f64 {
-    let altruism_penalty = UtilityFactor::one(if zero_allocation { 0.8 } else { 1.0 });
-    weighted_product_model([
-        reliability_utility(reliability),
-        performance_utility(perf_success as u32),
-        economic_security_utility(slashable_usd),
-        versions_behind_utility(versions_behind),
-        data_freshness_utility(params.block_rate_hz, &params.requirements, blocks_behind),
-        fee_utility(fee, &params.budget),
-        altruism_penalty,
-    ])
-}
-
-// https://www.desmos.com/calculator/dxgonxuihk
-fn economic_security_utility(slashable_usd: f64) -> UtilityFactor {
-    ConcaveUtilityParameters {
-        a: 4e-4,
-        weight: 1.5,
-    }
-    .concave_utility(slashable_usd)
-}
-
-/// https://www.desmos.com/calculator/plpijnbvhu
-fn reliability_utility(p_success: f64) -> UtilityFactor {
-    UtilityFactor::one(p_success.powi(7))
-}
-
-/// https://www.desmos.com/calculator/mioowuofsj
-fn data_freshness_utility(
-    block_rate_hz: f64,
-    requirements: &BlockRequirements,
-    blocks_behind: u64,
-) -> UtilityFactor {
-    let weight = 2.0;
-    // Grant full utility if the latest block is requested and the indexer is caught up.
-    // Otherwise, data freshness is not really a utility, but a binary test against the minimum
-    // block. Note that a query can require both.
-    if !requirements.has_latest || (blocks_behind == 0) {
-        UtilityFactor {
-            utility: 1.0,
-            weight,
-        }
-    } else {
-        ConcaveUtilityParameters {
-            a: 32.0 * block_rate_hz,
-            weight,
-        }
-        .concave_utility(1.0 / blocks_behind as f64)
-    }
-}
-
-fn versions_behind_utility(versions_behind: u8) -> UtilityFactor {
-    UtilityFactor {
-        utility: MIN_SCORE_CUTOFF.powi(versions_behind as i32),
-        weight: 1.0,
-    }
-}
-
-/// Decrease utility factor weight of indexers as their time since last use increases.
-/// Results in approximately 50% weight at t=60 and 10% weight at t=200.
-/// https://www.desmos.com/calculator/j2s3d4tem8
-fn exploration_weight(t: Duration) -> f64 {
-    0.1_f64.powf(0.005 * t.as_secs_f64())
-}
-
-/// Target an optimal fee of ~(1/3) of budget, since up to 3 indexers can be selected.
-/// https://www.desmos.com/calculator/elzlqpb7tc
-pub fn fee_utility(fee: &GRT, budget: &GRT) -> UtilityFactor {
-    // Any fee over budget has zero utility.
-    if fee > budget {
-        return UtilityFactor::one(0.0);
-    }
-    let one_wei = UDecimal18::from_raw_u256(U256::from(1));
-    let scaled_fee = fee.0 / budget.0.saturating_add(one_wei);
-    // (5_f64.sqrt() - 1.0) / 2.0
-    const S: f64 = 0.6180339887498949;
-    let mut utility = (f64::from(scaled_fee) + S).recip() - S;
-    // Set minimum utility, since small negative utility can result from loss of precision when
-    // the fee approaches the budget.
-    utility = utility.max(1e-18);
-    let weight: f64 = 1.4;
-    UtilityFactor { utility, weight }
-}
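The golden-ratio constant is what pins the fee curve's endpoints. A sketch with the budget normalized to 1.0, ignoring the one-wei guard (not part of the patch):

    fn fee_utility(scaled_fee: f64) -> f64 {
        // (sqrt(5) - 1) / 2: the golden ratio conjugate, as above.
        const S: f64 = 0.6180339887498949;
        ((scaled_fee + S).recip() - S).max(1e-18)
    }

    fn main() {
        // The constant makes the endpoints land exactly: 1/S - S == 1 and
        // 1/(1 + S) - S == 0 both reduce to S^2 + S - 1 == 0.
        assert!((fee_utility(0.0) - 1.0).abs() < 1e-12); // free query: full utility
        assert!(fee_utility(1.0) < 1e-9); // fee at the budget: ~zero utility
        assert!((fee_utility(1.0 / 3.0) - 0.433).abs() < 1e-3); // near the target fee
    }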
"http://localhost".parse().unwrap(), - stake: characteristics.stake, - allocation: characteristics.allocation, - block: Some(BlockStatus { - reported_number: params - .latest_block - .saturating_sub(characteristics.blocks_behind), - behind_reported_block: false, - min_block: None, - }), - }, - ); - } - - let candidates: Vec = characteristics - .iter() - .map(|c| Candidate { - indexing: Indexing { - indexer: c.address, - deployment, - }, - fee: c.fee.0.raw_u256().try_into().unwrap(), - versions_behind: 0, - }) - .collect(); - let characteristics: HashMap<&Address, &IndexerCharacteristics> = - characteristics.iter().map(|c| (&c.address, c)).collect(); - let mut rng = SmallRng::from_entropy(); - for client_query_index in 0..results.client_queries { - if (client_query_index % queries_per_second) == 0 { - isa.decay(); - } - - let t0 = Instant::now(); - let (mut selections, _) = isa.select_indexers(&mut rng, params, &candidates).unwrap(); - selections.truncate(selection_limit as usize); - results.avg_selection_seconds += Instant::now().duration_since(t0).as_secs_f64(); - - selections - .sort_unstable_by_key(|s| characteristics.get(&s.indexing.indexer).unwrap().latency_ms); - let responses = selections - .iter() - .map(|s| { - let c = *characteristics.get(&s.indexing.indexer).unwrap(); - (c, c.success_rate > rng.gen()) - }) - .collect::>(); - let responding_indexer = responses.iter().filter(|(_, ok)| *ok).fuse().next(); - results.avg_indexers_selected += selections.len() as f64; - if let Some((characteristics, _)) = responding_indexer { - results.success_rate += 1.0; - results.avg_blocks_behind += characteristics.blocks_behind as f64; - let latency_distr = Normal::new(characteristics.latency_ms as f64, 50.0).unwrap(); - results.avg_latency += rng.sample(latency_distr).max(0.0); - } - - results.selections.extend_from_slice(&selections); - for (characteristics, ok) in responses { - let indexing = Indexing { - indexer: characteristics.address, - deployment, - }; - let duration = Duration::from_millis(characteristics.latency_ms); - let result = match ok { - true => Ok(()), - false => Err(IndexerErrorObservation::Other), - }; - isa.observe_query(&indexing, duration, result); - } - } - - results.success_rate /= results.client_queries as f64; - results.avg_latency /= results.client_queries as f64; - results.avg_blocks_behind /= results.client_queries as f64; - results.avg_indexers_selected /= results.client_queries as f64; - results.avg_selection_seconds /= results.client_queries as f64; - - Ok(results) -} diff --git a/indexer-selection/src/test.rs b/indexer-selection/src/test.rs deleted file mode 100644 index 069bb15e..00000000 --- a/indexer-selection/src/test.rs +++ /dev/null @@ -1,342 +0,0 @@ -use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet}; -use std::env; -use std::ops::RangeInclusive; - -use alloy_primitives::Address; -use anyhow::{bail, ensure}; -use rand::rngs::SmallRng; -use rand::seq::{IteratorRandom, SliceRandom}; -use rand::{thread_rng, Rng, RngCore as _, SeedableRng as _}; -use thegraph::types::{BlockPointer, DeploymentId, UDecimal18}; -use tokio::spawn; -use toolshed::{buffer_queue, double_buffer}; - -use gateway_common::utils::testing::bytes_from_id; - -use crate::actor::{process_updates, Update}; -use crate::tokens::GRT; -use crate::{ - BlockRequirements, BlockStatus, Candidate, IndexerError, IndexerErrors, Indexing, - IndexingState, IndexingStatus, InputError, NetworkParameters, Selection, State, - UtilityParameters, -}; - -#[derive(Clone)] -struct Config { - blocks: 
diff --git a/indexer-selection/src/test.rs b/indexer-selection/src/test.rs
deleted file mode 100644
index 069bb15e..00000000
--- a/indexer-selection/src/test.rs
+++ /dev/null
@@ -1,342 +0,0 @@
-use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
-use std::env;
-use std::ops::RangeInclusive;
-
-use alloy_primitives::Address;
-use anyhow::{bail, ensure};
-use rand::rngs::SmallRng;
-use rand::seq::{IteratorRandom, SliceRandom};
-use rand::{thread_rng, Rng, RngCore as _, SeedableRng as _};
-use thegraph::types::{BlockPointer, DeploymentId, UDecimal18};
-use tokio::spawn;
-use toolshed::{buffer_queue, double_buffer};
-
-use gateway_common::utils::testing::bytes_from_id;
-
-use crate::actor::{process_updates, Update};
-use crate::tokens::GRT;
-use crate::{
-    BlockRequirements, BlockStatus, Candidate, IndexerError, IndexerErrors, Indexing,
-    IndexingState, IndexingStatus, InputError, NetworkParameters, Selection, State,
-    UtilityParameters,
-};
-
-#[derive(Clone)]
-struct Config {
-    blocks: RangeInclusive<usize>,
-    indexers: RangeInclusive<usize>,
-    deployments: RangeInclusive<usize>,
-    indexings: RangeInclusive<usize>,
-}
-
-#[derive(Debug)]
-struct Topology {
-    grt_per_usd: GRT,
-    slashing_percentage: UDecimal18,
-    blocks: Vec<BlockPointer>,
-    deployments: HashSet<DeploymentId>,
-    indexings: HashMap<Indexing, IndexingStatus>,
-    fees: HashMap<Indexing, u128>,
-}
-
-#[derive(Debug)]
-struct Request {
-    deployment: DeploymentId,
-    indexers: Vec<Address>,
-    params: UtilityParameters,
-}
-
-fn base_indexing_status() -> IndexingStatus {
-    IndexingStatus {
-        url: "http://localhost:8000".parse().unwrap(),
-        stake: GRT(UDecimal18::from(1)),
-        allocation: GRT(UDecimal18::from(1)),
-        block: Some(BlockStatus {
-            reported_number: 0,
-            behind_reported_block: false,
-            min_block: None,
-        }),
-    }
-}
-
-fn utility_params(
-    budget: GRT,
-    requirements: BlockRequirements,
-    latest_block: u64,
-) -> UtilityParameters {
-    UtilityParameters {
-        budget,
-        requirements,
-        latest_block,
-        block_rate_hz: 60.0_f64.recip(),
-    }
-}
-
-impl Topology {
-    async fn gen(
-        rng: &mut SmallRng,
-        config: &Config,
-        update_writer: &mut buffer_queue::QueueWriter<Update>,
-    ) -> Self {
-        let deployments = (0..rng.gen_range(config.deployments.clone()))
-            .map(|id| bytes_from_id(id).into())
-            .collect();
-        let blocks = (0..rng.gen_range(config.blocks.clone()))
-            .map(|id| BlockPointer {
-                number: id as u64,
-                hash: bytes_from_id(id).into(),
-            })
-            .collect::<Vec<BlockPointer>>();
-        let indexings: HashMap<Indexing, IndexingStatus> = (0..rng
-            .gen_range(config.indexings.clone()))
-            .filter_map(|_| Self::gen_indexing(rng, config, &blocks, &deployments))
-            .collect();
-        let fees: HashMap<Indexing, u128> = indexings
-            .keys()
-            .map(|i| (*i, Self::gen_grt(rng, &[0.0, 1e17, 1e18, 2e18])))
-            .collect();
-        let state = Self {
-            grt_per_usd: GRT(UDecimal18::from(1)),
-            slashing_percentage: UDecimal18::try_from(0.1).unwrap(),
-            blocks,
-            deployments,
-            indexings,
-            fees,
-        };
-
-        update_writer
-            .write(Update::GRTPerUSD(state.grt_per_usd))
-            .unwrap();
-        update_writer
-            .write(Update::SlashingPercentage(state.slashing_percentage))
-            .unwrap();
-        update_writer
-            .write(Update::Indexings(state.indexings.clone()))
-            .unwrap();
-
-        update_writer.flush().await.unwrap();
-        state
-    }
-
-    fn gen_indexing(
-        rng: &mut SmallRng,
-        config: &Config,
-        blocks: &[BlockPointer],
-        deployments: &HashSet<DeploymentId>,
-    ) -> Option<(Indexing, IndexingStatus)> {
-        let indexing = Indexing {
-            indexer: bytes_from_id(rng.gen_range(config.indexers.clone())).into(),
-            deployment: *deployments.iter().choose(rng)?,
-        };
-        let status = IndexingStatus {
-            stake: GRT(Self::gen_grt(rng, &[0.0, 50e3, 100e3, 150e3]).into()),
-            block: blocks.choose(rng).map(|b| BlockStatus {
-                reported_number: b.number,
-                behind_reported_block: false,
-                min_block: None,
-            }),
-            ..base_indexing_status()
-        };
-        Some((indexing, status))
-    }
-
-    fn gen_grt(rng: &mut SmallRng, table: &[f64; 4]) -> u128 {
-        *table.choose(rng).unwrap() as u128
-    }
-
-    fn gen_request(&self, rng: &mut SmallRng) -> Option<Request> {
-        let deployment = *self.deployments.iter().choose(rng)?;
-        let required_block = match (rng.gen_bool(0.1), self.blocks.choose(rng)) {
-            (true, Some(block)) => Some(block.number),
-            _ => None,
-        };
-        Some(Request {
-            deployment,
-            indexers: self
-                .indexings
-                .keys()
-                .filter(|indexing| indexing.deployment == deployment)
-                .map(|indexing| indexing.indexer)
-                .collect(),
-            params: utility_params(
-                GRT(UDecimal18::from(1)),
-                BlockRequirements {
-                    range: required_block.map(|b| (0, b)),
-                    has_latest: required_block.is_some() && rng.gen_bool(0.5),
-                },
-                self.blocks.last()?.number,
-            ),
-        })
-    }
-
-    fn check(
-        &self,
-        request: &Request,
-        result: &Result<(Vec<Selection>, IndexerErrors<'_>), InputError>,
-    ) -> anyhow::Result<()> {
-        let (selections, errors) = match result {
-            Ok((selections, errors)) => (selections, errors),
-            Err(_) => bail!("unexpected InputError"),
-        };
-
-        let fees = GRT(selections.iter().map(|s| s.fee).sum::<u128>().into());
-        ensure!(fees <= request.params.budget);
-
-        let indexers_dedup: BTreeSet<Address> = request.indexers.iter().copied().collect();
-        ensure!(indexers_dedup.len() == request.indexers.len());
-
-        let mut expected_errors = IndexerErrors(BTreeMap::new());
-        for indexer in &request.indexers {
-            let indexing = Indexing {
-                indexer: *indexer,
-                deployment: request.deployment,
-            };
-            let status = self.indexings.get(&indexing).unwrap();
-            let required_block = request.params.requirements.range.map(|(_, n)| n);
-            let fee = *self.fees.get(&indexing).unwrap();
-            println!("indexer={}, fee={:?}", indexer, fee);
-            let mut set_err = |err: IndexerError| {
-                expected_errors.0.entry(err).or_default().insert(indexer);
-            };
-
-            let allowed_block = status.block.as_ref().unwrap().reported_number;
-            if matches!(required_block, Some(n) if n.saturating_sub(1) > allowed_block) {
-                set_err(IndexerError::MissingRequiredBlock);
-            } else if status.block.is_none() {
-                set_err(IndexerError::NoStatus);
-            } else if status.stake == GRT(UDecimal18::from(0)) {
-                set_err(IndexerError::NoStake);
-            } else if fee > request.params.budget.0.raw_u256().try_into().unwrap() {
-                set_err(IndexerError::FeeTooHigh);
-            }
-        }
-        println!("{:#?}", expected_errors);
-
-        ensure!(errors.0 == expected_errors.0);
-        // An indexer must be selected if one exists without an associated error.
-        if selections.is_empty() {
-            ensure!(errors.0.iter().map(|(_, s)| s.len()).sum::<usize>() == request.indexers.len());
-        }
-
-        Ok(())
-    }
-}
-
-#[tokio::test]
-async fn fuzz() {
-    // crate::init_tracing(false);
-
-    let seed = env::vars()
-        .find(|(k, _)| k == "TEST_SEED")
-        .and_then(|(_, v)| v.parse::<u64>().ok())
-        .unwrap_or(thread_rng().next_u64());
-    println!("TEST_SEED={}", seed);
-    let mut rng = SmallRng::seed_from_u64(seed);
-    let config = Config {
-        blocks: 0..=5,
-        indexers: 0..=3,
-        deployments: 0..=3,
-        indexings: 0..=5,
-    };
-
-    for _ in 0..100 {
-        let (isa_state, isa_writer) = double_buffer!(State::default());
-        let (mut update_writer, update_reader) = buffer_queue::pair();
-        spawn(async move {
-            process_updates(isa_writer, update_reader).await;
-            panic!("ISA actor stopped");
-        });
-
-        let topology = Topology::gen(&mut rng, &config, &mut update_writer).await;
-        println!("{:#?}", topology);
-        let request = match topology.gen_request(&mut rng) {
-            Some(request) => request,
-            None => continue,
-        };
-        println!("{:#?}", request);
-        let candidates: Vec<Candidate> = request
-            .indexers
-            .iter()
-            .map(|indexer| {
-                let indexing = Indexing {
-                    indexer: *indexer,
-                    deployment: request.deployment,
-                };
-                Candidate {
-                    fee: *topology.fees.get(&indexing).unwrap(),
-                    indexing,
-                    versions_behind: 0,
-                }
-            })
-            .collect();
-        let result = isa_state
-            .latest()
-            .select_indexers(&mut rng, &request.params, &candidates);
-        if let Err(err) = topology.check(&request, &result) {
-            println!("{}", err);
-            println!("TEST_SEED={}", seed);
-            panic!("check failed!");
-        }
-    }
-}
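The liveness property the final `ensure!` in `check` encodes, as a standalone assertion with illustrative names (not part of the patch):

    // Per-request totals; names are hypothetical.
    fn assert_liveness(selected: usize, errored: usize, total_indexers: usize) {
        // If selection comes back empty, every indexer must carry an error;
        // otherwise the selector dropped a usable indexer.
        if selected == 0 {
            assert_eq!(errored, total_indexers);
        }
    }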
-
-/// All else being equal, select candidates indexing the higher subgraph version (i.e. fewer
-/// versions behind).
-#[test]
-fn favor_higher_version() {
-    let mut rng = SmallRng::from_entropy();
-
-    let mut versions_behind = [rng.gen_range(0..3), rng.gen_range(0..3)];
-    if versions_behind[0] == versions_behind[1] {
-        versions_behind[1] += 1;
-    }
-    let candidates: Vec<Candidate> = (0..2)
-        .map(|i| Candidate {
-            indexing: Indexing {
-                indexer: bytes_from_id(0).into(),
-                deployment: bytes_from_id(i).into(),
-            },
-            fee: 1e18 as u128,
-            versions_behind: versions_behind[i],
-        })
-        .collect();
-
-    let mut state = State {
-        network_params: NetworkParameters {
-            grt_per_usd: Some(GRT(UDecimal18::from(1))),
-            slashing_percentage: Some(UDecimal18::try_from(0.1).unwrap()),
-        },
-        indexings: HashMap::from_iter([]),
-    };
-    state.indexings.insert(
-        candidates[0].indexing,
-        IndexingState::new(base_indexing_status()),
-    );
-    state.indexings.insert(
-        candidates[1].indexing,
-        IndexingState::new(base_indexing_status()),
-    );
-
-    let params = utility_params(
-        GRT(UDecimal18::from(1)),
-        BlockRequirements {
-            range: None,
-            has_latest: true,
-        },
-        1,
-    );
-    let result = state.select_indexers(&mut rng, &params, &candidates);
-
-    println!("{:#?}", candidates);
-    println!("{:#?}", versions_behind);
-    println!("{:#?}", result);
-
-    let max_version = versions_behind.iter().min().unwrap();
-    let index = versions_behind
-        .iter()
-        .position(|v| v == max_version)
-        .unwrap();
-    let expected = candidates[index].indexing;
-
-    let selection = result.unwrap().0[0].indexing;
-    assert_eq!(selection, expected);
-}
diff --git a/indexer-selection/src/tokens.rs b/indexer-selection/src/tokens.rs
deleted file mode 100644
index d2cc75ab..00000000
--- a/indexer-selection/src/tokens.rs
+++ /dev/null
@@ -1,12 +0,0 @@
-pub use thegraph::types::{InvalidDecimalString, UDecimal18};
-
-// The following are cumbersome by design. It's better to be forced to think hard about converting
-// between these types.
-
-/// GRT with 18 fractional digits
-#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord)]
-pub struct GRT(pub UDecimal18);
-
-/// USD with 18 fractional digits
-#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord)]
-pub struct USD(pub UDecimal18);
diff --git a/indexer-selection/src/utility.rs b/indexer-selection/src/utility.rs
deleted file mode 100644
index cf5ce5d8..00000000
--- a/indexer-selection/src/utility.rs
+++ /dev/null
@@ -1,84 +0,0 @@
-use std::f64::consts::E;
-
-#[derive(Clone, Copy, Debug)]
-pub struct ConcaveUtilityParameters {
-    pub a: f64,
-    pub weight: f64,
-}
-
-impl ConcaveUtilityParameters {
-    pub fn one(a: f64) -> Self {
-        Self { a, weight: 1.0 }
-    }
-
-    /// Concave utility function
-    /// u(x) = 1 - e ^ (-ax)
-    /// 0 <= a < INF
-    /// a != NaN
-    /// 0 <= x < INF
-    /// x != NaN
-    /// Returns: 0.0 <= u < 1.0
-    pub fn concave_utility(&self, x: f64) -> UtilityFactor {
-        UtilityFactor {
-            utility: 1.0 - E.powf(-self.a * x),
-            weight: self.weight,
-        }
-    }
-}
-
-#[derive(Clone, Copy, Debug)]
-pub struct UtilityFactor {
-    pub utility: f64,
-    pub weight: f64,
-}
-
-impl UtilityFactor {
-    /// Just a utility value and a weight of 1.0
-    pub const fn one(utility: f64) -> Self {
-        Self {
-            utility,
-            weight: 1.0,
-        }
-    }
-
-    pub fn mul_weight(mut self, factor: f64) -> Self {
-        self.weight *= factor;
-        self
-    }
-}
-
-/// One outcome we want is that any utility approaching 0 should seriously disadvantage the result.
-/// For example, being behind a million blocks should mean never getting selected even if the other
-/// utilities are good.
-///
-/// Another important property is that raising any utility should always raise the outcome when
-/// holding the others constant. This ensures strictly dominant utility sets give strictly dominant
-/// results, and it limits the ways in which we can combine utilities.
-///
-/// This uses the weighted product model, which satisfies both properties.
-/// See: https://en.wikipedia.org/wiki/Weighted_product_model and
-/// https://towardsdatascience.com/free-yourself-from-indecision-with-weighted-product-models-48ae6fd5bf3
-///
-/// This used to use the geometric mean instead, but that favored bad indexers. The first problem
-/// with the geometric mean is that utilities near 0 do not disadvantage the result enough. For
-/// example, given 5 factors with 4 utilities at 1.0 and the last one variable, the total utility
-/// does not go below 0.01 until the last utility goes down to an abysmal 0.0000000001, and the
-/// total is above 0.25 even with the last utility as low as 0.001. In practice this is very bad:
-/// an indexer with any low utility should almost never be selected (see the example above about
-/// being a million blocks behind). The second problem is that as more utilities were added, bad
-/// indexers seemed to be favored more and more.
-///
-/// A weighted multiply just gives better results in practice. The one change required to allow it
-/// was that the budget used to be a function of the other utilities, which doesn't make sense when
-/// utilities have no absolute scale (e.g. adding another utility to consider shouldn't make the
-/// budget go down). That wasn't a problem with the geometric mean, but it would be here, so the
-/// fee utility has been refactored accordingly and the results are much better overall.
-pub fn weighted_product_model(factors: impl IntoIterator<Item = UtilityFactor>) -> f64 {
-    factors
-        .into_iter()
-        .fold(1.0, |aggregate, UtilityFactor { utility, weight }| {
-            debug_assert!(utility >= 0.0);
-            aggregate * utility.powf(weight)
-        })
-}
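Checking the comment's numbers: a sketch comparing the weighted product against the geometric mean it replaced (not part of the patch; `weighted_product` is an illustrative re-implementation):

    fn weighted_product(factors: &[(f64, f64)]) -> f64 {
        // (utility, weight) pairs, combined exactly as weighted_product_model does.
        factors.iter().fold(1.0, |acc, (u, w)| acc * u.powf(*w))
    }

    fn main() {
        let factors = [(1.0, 1.0), (1.0, 1.0), (1.0, 1.0), (1.0, 1.0), (0.001, 1.0)];
        // The weighted product lets the single bad factor dominate: 0.001.
        assert!((weighted_product(&factors) - 0.001).abs() < 1e-12);
        // A geometric mean of the same factors would be 0.001^(1/5) ≈ 0.25,
        // far too forgiving of the outlier.
        assert!((0.001_f64.powf(0.2) - 0.251).abs() < 1e-3);
    }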
diff --git a/indexer-selection/tools/contrived-characteristics.csv b/indexer-selection/tools/contrived-characteristics.csv
deleted file mode 100644
index 9bdda837..00000000
--- a/indexer-selection/tools/contrived-characteristics.csv
+++ /dev/null
@@ -1,11 +0,0 @@
-indexer,fee,blocks_behind,latency_ms,success_rate,allocation,stake
-0x0000000000000000000000000000000000000001,0.00999,0,80,0.999,600000,500000
-0x0000000000000000000000000000000000000002,0.00040,0,80,0.999,600000,800000
-0x0000000000000000000000000000000000000003,0.000410,1,70,0.995,800000,400000
-0x0000000000000000000000000000000000000004,0.00034,2,130,0.95,400000,300000
-0x0000000000000000000000000000000000000005,0.000035,3,250,0.96,40000,400000
-0x0000000000000000000000000000000000000006,0.00024,8,200,0.80,100000,100000
-0x0000000000000000000000000000000000000007,0.00040,8,1900,0.80,100000,100000
-0x0000000000000000000000000000000000000008,0.000425,2,120,0.99,400000,2000000
-0x0000000000000000000000000000000000000009,0.000410,1,60,0.95,400000,400000
-0x000000000000000000000000000000000000000a,0.000345,4,120,0.999,400000,300000
diff --git a/indexer-selection/tools/indexer-characteristics.py b/indexer-selection/tools/indexer-characteristics.py
deleted file mode 100644
index d9af4804..00000000
--- a/indexer-selection/tools/indexer-characteristics.py
+++ /dev/null
@@ -1,63 +0,0 @@
-# Converts "Indexer attempt" logs for some deployment into indexer characteristics
-
-import datetime
-import json
-import pandas as pd
-import requests
-import sys
-
-network_subgraph = 'https://gateway.thegraph.com/network'
-
-def parse_log(log):
-    timestamp = datetime.datetime.fromisoformat(log['timestamp'].replace('Z', '+00:00')).timestamp()
-    fields = log['fields']
-    return (
-        int(timestamp * 1000),
-        fields['indexer'],
-        float(fields['fee']),
-        float(fields['utility']),
-        int(fields['blocks_behind']),
-        float(fields['response_time_ms']),
-        fields['status'] == '200',
-    )
-
-log_lines = [
-    json.loads(l[l.index('{'):])
-    for l in sys.stdin
-    if ("timestamp" in l) and ("Indexer attempt" in l)
-]
-entries = [parse_log(l) for l in log_lines]
-
-deployment = log_lines[0]['fields']['deployment']
-print('deployment:', deployment, file=sys.stderr)
-
-columns = ['t_ms', 'indexer', 'fee', 'utility', 'blocks_behind', 'response_time_ms', 'success']
-data = pd.DataFrame(entries, columns=columns)
-
-print('indexer,fee,blocks_behind,latency_ms,success_rate,allocation,stake')
-for indexer in set(data['indexer']):
-    d = data[data['indexer'] == indexer]
-    fee = d['fee'].mean()
-    blocks_behind = d['blocks_behind'].mean()
-    latency_ms = d['response_time_ms'].mean()
-    success_rate = d[d['success']].shape[0] / d.shape[0]
-
-    response = requests.post(
-        network_subgraph,
-        json={'query': f'''{{
-            allocations(where:{{
-                activeForIndexer:"{indexer}"
-                subgraphDeployment_:{{ipfsHash:"{deployment}"}}
-            }}){{
-                allocatedTokens
-            }}
-            indexer(id:"{indexer}"){{ stakedTokens }}
-        }}'''},
-    ).json()
-    allocation = sum([
-        float(a['allocatedTokens'][:-18])
-        for a in response['data']['allocations']]
-    )
-    stake = float(response['data']['indexer']['stakedTokens'][:-18])
-
-    print(f'{indexer},{fee:.20f},{blocks_behind},{latency_ms},{success_rate},{allocation:.0f},{stake:.0f}')
diff --git a/indexer-selection/tools/reliability-response.py b/indexer-selection/tools/reliability-response.py
deleted file mode 100644
index 53c413b0..00000000
--- a/indexer-selection/tools/reliability-response.py
+++ /dev/null
@@ -1,34 +0,0 @@
-import matplotlib.pyplot as plt
-import pandas as pd
-
-def plot_reliability_response(name, key, unit):
-    data = pd.read_csv(f'test-outputs/reliability-{name}-response.csv')
-    print(data)
-
-    fig, axes = plt.subplots(3, 2, sharex=True, sharey=True, figsize=(12, 8), constrained_layout=True)
-
-    success_rates = sorted(set(data['success_rate']))
-    values = sorted(set(data[key]))
-    print('success_rates:', success_rates)
-    print('values:', values)
-
-    for i, v in enumerate(values):
-        row, col = i // 2, i % 2
-        p = axes[row, col]
-
-        for success_rate in success_rates:
-            d = data[(data[key] == v) & (data['success_rate'] == success_rate)]
-            p.plot(d['t_m'], d['utility'], label=f'{success_rate}')
-
-        p.set_ylim((0.0, 1.0))
-        p.grid()
-        p.set_title(f'{v}{unit} {name}')
-        if col == 0: p.set_ylabel('utility')
-        if row == 2: p.set_xlabel('t (minute)')
-        if row == 2 and col == 1: p.legend(success_rates, title='Success Rate')
-
-    fig.suptitle(f'reliability {name.title()} Response', fontsize=16)
-    plt.savefig(f'test-outputs/reliability-{name}-response.svg')
-
-plot_reliability_response('outage', 'outage_duration_m', ' minute')
-plot_reliability_response('penalty', 'penalty', '')
diff --git a/indexer-selection/tools/simulation.py b/indexer-selection/tools/simulation.py
deleted file mode 100644
index 98825369..00000000
--- a/indexer-selection/tools/simulation.py
+++ /dev/null
@@ -1,40 +0,0 @@
-import matplotlib
-import matplotlib.pyplot as plt
-import pandas as pd
-import sys
-
-output = sys.argv[1] if len(sys.argv) > 1 else ''
-data = pd.read_csv(sys.stdin)
-
-labels = sorted(set(data.label))
-
-data['sort_order'] = data.apply( - lambda r: data[(data.indexer == r.indexer) & (data.label == labels[0])].selections.sum(), - axis=1, -) -data = data.sort_values(by='sort_order') - -matplotlib.rcParams['font.family'] = 'monospace' -fig, axes = plt.subplots(len(labels), 2, figsize=(16, 12), sharex='col', sharey=True, constrained_layout=True) - -for (row, label) in enumerate(labels): - d = data[data.label == label] - print(d) - axes[row, 0].barh(d.indexer, d.selections) - axes[row, 0].invert_yaxis() - - axes[row, 1].barh(d.indexer, d.fees) - axes[row, 1].invert_yaxis() - - axes[row, 0].set_yticks(range(0, d.shape[0])) - axes[row, 0].set_yticklabels(d.apply(lambda r: f'{r.detail}\n({r.indexer})', axis=1)) - axes[row, 0].set_title(label) - -axes[-1, 0].set_xlabel('total selections') -axes[-1, 1].set_xlabel('total fees (GRT)') - -if output == '': - plt.show() -else: - print('saving to', output) - plt.savefig(output)