From 007ceab71084a24655bcd3f9b88d5dc77865a82a Mon Sep 17 00:00:00 2001 From: Ignacio Duart Date: Mon, 6 Nov 2023 16:10:00 +0100 Subject: [PATCH] Feed request records to topology manager --- crates/core/src/contract/executor.rs | 6 +- crates/core/src/contract/storages/rocks_db.rs | 11 ++- crates/core/src/contract/storages/sqlite.rs | 8 +- crates/core/src/message.rs | 22 ++++- crates/core/src/node.rs | 6 +- crates/core/src/node/in_memory_impl.rs | 2 +- .../core/src/node/network_bridge/in_memory.rs | 2 +- .../src/node/network_bridge/p2p_protoc.rs | 4 +- crates/core/src/node/op_state_manager.rs | 23 +++-- crates/core/src/operations/connect.rs | 4 + crates/core/src/operations/get.rs | 13 ++- crates/core/src/operations/put.rs | 19 ++-- crates/core/src/operations/subscribe.rs | 16 +++- crates/core/src/operations/update.rs | 4 + crates/core/src/ring.rs | 57 +++++++++--- crates/core/src/runtime/wasm_runtime.rs | 2 +- crates/core/src/topology.rs | 25 +++-- .../core/src/topology/connection_evaluator.rs | 91 ++++++++++++++++++- .../topology/connection_evaluator/tests.rs | 84 ----------------- .../src/topology/request_density_tracker.rs | 10 -- crates/core/src/topology/small_world_rand.rs | 2 +- stdlib | 2 +- 22 files changed, 246 insertions(+), 167 deletions(-) delete mode 100644 crates/core/src/topology/connection_evaluator/tests.rs diff --git a/crates/core/src/contract/executor.rs b/crates/core/src/contract/executor.rs index 58319f760..d11fb3a9e 100644 --- a/crates/core/src/contract/executor.rs +++ b/crates/core/src/contract/executor.rs @@ -673,7 +673,7 @@ impl Executor { let key = contract.key(); let params = contract.params(); - if self.get_local_contract(&key.id()).await.is_ok() { + if self.get_local_contract(key.id()).await.is_ok() { // already existing contract, just try to merge states return self .perform_contract_update(key, UpdateData::State(state.into())) @@ -963,7 +963,7 @@ impl Executor { { if self.mode == OperationMode::Local { return Err(ExecutorError::request(RequestError::ContractError( - CoreContractError::MissingRelated { key: key.id() }, + CoreContractError::MissingRelated { key: *key.id() }, ))); } } @@ -1109,7 +1109,7 @@ impl Executor { } if iterations == DEPENDENCY_CYCLE_LIMIT_GUARD { return Err(ExecutorError::request(CoreContractError::MissingRelated { - key: original_key.id(), + key: *original_key.id(), })); } Ok(()) diff --git a/crates/core/src/contract/storages/rocks_db.rs b/crates/core/src/contract/storages/rocks_db.rs index 5c77e20d6..d3777f771 100644 --- a/crates/core/src/contract/storages/rocks_db.rs +++ b/crates/core/src/contract/storages/rocks_db.rs @@ -33,13 +33,13 @@ impl StateStorage for RocksDb { async fn store(&mut self, key: ContractKey, state: WrappedState) -> Result<(), Self::Error> { self.0 - .put([key.bytes(), RocksDb::STATE_SUFFIX].concat(), state)?; + .put([key.as_bytes(), RocksDb::STATE_SUFFIX].concat(), state)?; Ok(()) } async fn get(&self, key: &ContractKey) -> Result, Self::Error> { - match self.0.get([key.bytes(), RocksDb::STATE_SUFFIX].concat()) { + match self.0.get([key.as_bytes(), RocksDb::STATE_SUFFIX].concat()) { Ok(result) => { if let Some(r) = result.map(|r| Some(WrappedState::new(r))) { Ok(r) @@ -69,7 +69,7 @@ impl StateStorage for RocksDb { params: Parameters<'static>, ) -> Result<(), Self::Error> { self.0 - .put([key.bytes(), RocksDb::PARAMS_SUFFIX].concat(), params)?; + .put([key.as_bytes(), RocksDb::PARAMS_SUFFIX].concat(), params)?; Ok(()) } @@ -78,7 +78,10 @@ impl StateStorage for RocksDb { &'a self, key: &'a ContractKey, ) -> Result>, Self::Error> { - match self.0.get([key.bytes(), RocksDb::PARAMS_SUFFIX].concat()) { + match self + .0 + .get([key.as_bytes(), RocksDb::PARAMS_SUFFIX].concat()) + { Ok(result) => Ok(result .map(|r| Some(Parameters::from(r))) .expect("vec bytes")), diff --git a/crates/core/src/contract/storages/sqlite.rs b/crates/core/src/contract/storages/sqlite.rs index 9d25f6c31..9a02fb335 100644 --- a/crates/core/src/contract/storages/sqlite.rs +++ b/crates/core/src/contract/storages/sqlite.rs @@ -66,7 +66,7 @@ impl StateStorage for Pool { ON CONFLICT(contract) DO UPDATE SET state = excluded.state ", ) - .bind(key.bytes()) + .bind(key.as_bytes()) .bind(state.as_ref()) .execute(&self.0) .await?; @@ -75,7 +75,7 @@ impl StateStorage for Pool { async fn get(&self, key: &ContractKey) -> Result, Self::Error> { match sqlx::query("SELECT state FROM states WHERE contract = ?") - .bind(key.bytes()) + .bind(key.as_bytes()) .map(|row: SqliteRow| Some(WrappedState::new(row.get("state")))) .fetch_one(&self.0) .await @@ -97,7 +97,7 @@ impl StateStorage for Pool { ON CONFLICT(contract) DO UPDATE SET params = excluded.params ", ) - .bind(key.bytes()) + .bind(key.as_bytes()) .bind(params.as_ref()) .execute(&self.0) .await?; @@ -109,7 +109,7 @@ impl StateStorage for Pool { key: &'a ContractKey, ) -> Result>, Self::Error> { match sqlx::query("SELECT params FROM states WHERE contract = ?") - .bind(key.bytes()) + .bind(key.as_bytes()) .map(|row: SqliteRow| Some(Parameters::from(row.get::, _>("params")))) .fetch_one(&self.0) .await diff --git a/crates/core/src/message.rs b/crates/core/src/message.rs index b95bab18c..a38f97d18 100644 --- a/crates/core/src/message.rs +++ b/crates/core/src/message.rs @@ -13,7 +13,7 @@ use crate::{ operations::{ connect::ConnectMsg, get::GetMsg, put::PutMsg, subscribe::SubscribeMsg, update::UpdateMsg, }, - ring::PeerKeyLocation, + ring::{Location, PeerKeyLocation}, }; pub(crate) use sealed_msg_type::{TransactionType, TransactionTypeId}; @@ -43,7 +43,7 @@ impl Transaction { // Self { id } } - pub fn tx_type(&self) -> TransactionType { + pub fn transaction_type(&self) -> TransactionType { let id_byte = (self.id.0 & 0xFFu128) as u8; match id_byte { 0 => TransactionType::Connect, @@ -222,6 +222,8 @@ pub(crate) trait InnerMessage: Into { fn target(&self) -> Option<&PeerKeyLocation>; fn terminal(&self) -> bool; + + fn requested_location(&self) -> Option; } /// Internal node events emitted to the event loop. @@ -294,6 +296,18 @@ impl Message { } } + pub fn requested_location(&self) -> Option { + use Message::*; + match self { + Connect(op) => op.requested_location(), + Put(op) => op.requested_location(), + Get(op) => op.requested_location(), + Subscribe(op) => op.requested_location(), + Update(op) => op.requested_location(), + Aborted(_) => None, + } + } + pub fn track_stats(&self) -> bool { use Message::*; !matches!(self, Connect(_) | Subscribe(_) | Aborted(_)) @@ -325,9 +339,9 @@ mod tests { let ts_0 = Ulid::new(); std::thread::sleep(Duration::from_millis(1)); let tx = Transaction::update(TransactionType::Connect, Ulid::new()); - assert_eq!(tx.tx_type(), TransactionType::Connect); + assert_eq!(tx.transaction_type(), TransactionType::Connect); let tx = Transaction::update(TransactionType::Subscribe, Ulid::new()); - assert_eq!(tx.tx_type(), TransactionType::Subscribe); + assert_eq!(tx.transaction_type(), TransactionType::Subscribe); std::thread::sleep(Duration::from_millis(1)); let ts_1 = Ulid::new(); assert!( diff --git a/crates/core/src/node.rs b/crates/core/src/node.rs index c8e0f8530..f9d8b468c 100644 --- a/crates/core/src/node.rs +++ b/crates/core/src/node.rs @@ -296,7 +296,7 @@ where tracing::warn!("Performing a new join, attempt {}", backoff.retries() + 1); if backoff.sleep().await.is_none() { tracing::error!("Max number of retries reached"); - return Err(OpError::MaxRetriesExceeded(tx_id, tx_id.tx_type())); + return Err(OpError::MaxRetriesExceeded(tx_id, tx_id.transaction_type())); } op.backoff = Some(backoff); } @@ -696,7 +696,7 @@ async fn handle_cancelled_op( where CM: NetworkBridge + Send + Sync, { - if let TransactionType::Connect = tx.tx_type() { + if let TransactionType::Connect = tx.transaction_type() { // the attempt to join the network failed, this could be a fatal error since the node // is useless without connecting to the network, we will retry with exponential backoff match op_storage.pop(&tx) { @@ -712,7 +712,7 @@ where } } Ok(Some(OpEnum::Connect(_))) => { - return Err(OpError::MaxRetriesExceeded(tx, tx.tx_type())); + return Err(OpError::MaxRetriesExceeded(tx, tx.transaction_type())); } _ => {} } diff --git a/crates/core/src/node/in_memory_impl.rs b/crates/core/src/node/in_memory_impl.rs index 90bd3da40..69733740e 100644 --- a/crates/core/src/node/in_memory_impl.rs +++ b/crates/core/src/node/in_memory_impl.rs @@ -176,7 +176,7 @@ impl NodeInMemory { }; if let Ok(Either::Left(Message::Aborted(tx))) = msg { - let tx_type = tx.tx_type(); + let tx_type = tx.transaction_type(); let res = handle_cancelled_op( tx, self.peer_key, diff --git a/crates/core/src/node/network_bridge/in_memory.rs b/crates/core/src/node/network_bridge/in_memory.rs index 7fb382701..aef942e4f 100644 --- a/crates/core/src/node/network_bridge/in_memory.rs +++ b/crates/core/src/node/network_bridge/in_memory.rs @@ -100,7 +100,7 @@ impl NetworkBridge for MemoryConnManager { .expect("unique lock") .register_events(EventLog::from_outbound_msg(&msg, &self.op_manager)) .await; - self.op_manager.sending_transaction(target, msg.id()); + self.op_manager.sending_transaction(target, &msg); let msg = bincode::serialize(&msg)?; self.transport.send(*target, msg); Ok(()) diff --git a/crates/core/src/node/network_bridge/p2p_protoc.rs b/crates/core/src/node/network_bridge/p2p_protoc.rs index b87946857..2b8f6ae51 100644 --- a/crates/core/src/node/network_bridge/p2p_protoc.rs +++ b/crates/core/src/node/network_bridge/p2p_protoc.rs @@ -224,7 +224,7 @@ impl NetworkBridge for P2pBridge { .try_lock() .expect("single reference") .register_events(EventLog::from_outbound_msg(&msg, &self.op_manager)); - self.op_manager.sending_transaction(target, msg.id()); + self.op_manager.sending_transaction(target, &msg); self.ev_listener_tx .send(Left((*target, Box::new(msg)))) .await @@ -431,7 +431,7 @@ impl P2pConnManager { let cb = self.bridge.clone(); match msg { Message::Aborted(tx) => { - let tx_type = tx.tx_type(); + let tx_type = tx.transaction_type(); let res = handle_cancelled_op( tx, op_manager.ring.peer_key, diff --git a/crates/core/src/node/op_state_manager.rs b/crates/core/src/node/op_state_manager.rs index f11898f8e..df8cab25b 100644 --- a/crates/core/src/node/op_state_manager.rs +++ b/crates/core/src/node/op_state_manager.rs @@ -131,27 +131,27 @@ impl OpManager { match op { OpEnum::Connect(op) => { #[cfg(debug_assertions)] - check_id_op!(id.tx_type(), TransactionType::Connect); + check_id_op!(id.transaction_type(), TransactionType::Connect); self.ops.connect.insert(id, *op); } OpEnum::Put(op) => { #[cfg(debug_assertions)] - check_id_op!(id.tx_type(), TransactionType::Put); + check_id_op!(id.transaction_type(), TransactionType::Put); self.ops.put.insert(id, op); } OpEnum::Get(op) => { #[cfg(debug_assertions)] - check_id_op!(id.tx_type(), TransactionType::Get); + check_id_op!(id.transaction_type(), TransactionType::Get); self.ops.get.insert(id, op); } OpEnum::Subscribe(op) => { #[cfg(debug_assertions)] - check_id_op!(id.tx_type(), TransactionType::Subscribe); + check_id_op!(id.transaction_type(), TransactionType::Subscribe); self.ops.subscribe.insert(id, op); } OpEnum::Update(op) => { #[cfg(debug_assertions)] - check_id_op!(id.tx_type(), TransactionType::Update); + check_id_op!(id.transaction_type(), TransactionType::Update); self.ops.update.insert(id, op); } } @@ -169,7 +169,7 @@ impl OpManager { } return Err(OpNotAvailable::Running); } - let op = match id.tx_type() { + let op = match id.transaction_type() { TransactionType::Connect => self .ops .connect @@ -201,7 +201,12 @@ impl OpManager { } /// Notify the operation manager that a transaction is being transacted over the network. - pub fn sending_transaction(&self, peer: &PeerKey, transaction: &Transaction) { + pub fn sending_transaction(&self, peer: &PeerKey, msg: &Message) { + let transaction = msg.id(); + if let Some(loc) = msg.requested_location() { + self.ring + .record_request(loc, transaction.transaction_type()); + } self.ring .live_tx_tracker .add_transaction(*peer, *transaction); @@ -227,7 +232,7 @@ async fn garbage_cleanup_task( if ops.completed.remove(&tx).is_some() { continue; } - let still_waiting = match tx.tx_type() { + let still_waiting = match tx.transaction_type() { TransactionType::Connect => ops.connect.remove(&tx).is_none(), TransactionType::Put => ops.put.remove(&tx).is_none(), TransactionType::Get => ops.get.remove(&tx).is_none(), @@ -256,7 +261,7 @@ async fn garbage_cleanup_task( if ops.completed.remove(&tx).is_some() { continue; } - let removed = match tx.tx_type() { + let removed = match tx.transaction_type() { TransactionType::Connect => ops.connect.remove(&tx).is_some(), TransactionType::Put => ops.put.remove(&tx).is_some(), TransactionType::Get => ops.get.remove(&tx).is_some(), diff --git a/crates/core/src/operations/connect.rs b/crates/core/src/operations/connect.rs index 157e0534a..85cc6839b 100644 --- a/crates/core/src/operations/connect.rs +++ b/crates/core/src/operations/connect.rs @@ -1020,6 +1020,10 @@ mod messages { } | Connected { .. } ) } + + fn requested_location(&self) -> Option { + self.target().and_then(|pkloc| pkloc.location) + } } impl ConnectMsg { diff --git a/crates/core/src/operations/get.rs b/crates/core/src/operations/get.rs index 64ddec1b0..8bad7c9a6 100644 --- a/crates/core/src/operations/get.rs +++ b/crates/core/src/operations/get.rs @@ -415,7 +415,10 @@ impl Operation for GetOp { "Failed getting a value for contract {}, reached max retries", key ); - return Err(OpError::MaxRetriesExceeded(*id, id.tx_type())); + return Err(OpError::MaxRetriesExceeded( + *id, + id.transaction_type(), + )); } } Some(GetState::ReceivedRequest) => { @@ -809,6 +812,14 @@ mod messages { use GetMsg::*; matches!(self, ReturnGet { .. }) } + + fn requested_location(&self) -> Option { + match self { + GetMsg::RequestGet { key, .. } => Some(Location::from(key.id())), + GetMsg::SeekNode { key, .. } => Some(Location::from(key.id())), + GetMsg::ReturnGet { key, .. } => Some(Location::from(key.id())), + } + } } impl GetMsg { diff --git a/crates/core/src/operations/put.rs b/crates/core/src/operations/put.rs index 9219ba056..0ffef8ed2 100644 --- a/crates/core/src/operations/put.rs +++ b/crates/core/src/operations/put.rs @@ -806,12 +806,6 @@ mod messages { #[derive(Debug, Serialize, Deserialize)] pub(crate) enum PutMsg { - /// Initialize the put operation by routing the value - RouteValue { - id: Transaction, - htl: usize, - target: PeerKeyLocation, - }, /// Internal node instruction to find a route to the target node. RequestPut { id: Transaction, @@ -876,7 +870,6 @@ mod messages { fn id(&self) -> &Transaction { match self { Self::SeekNode { id, .. } => id, - Self::RouteValue { id, .. } => id, Self::RequestPut { id, .. } => id, Self::Broadcasting { id, .. } => id, Self::SuccessfulUpdate { id, .. } => id, @@ -901,6 +894,17 @@ mod messages { SuccessfulUpdate { .. } | SeekNode { .. } | PutForward { .. } ) } + + fn requested_location(&self) -> Option { + match self { + Self::SeekNode { contract, .. } => Some(Location::from(contract.id())), + Self::RequestPut { contract, .. } => Some(Location::from(contract.id())), + Self::Broadcasting { key, .. } => Some(Location::from(key.id())), + Self::PutForward { contract, .. } => Some(Location::from(contract.id())), + Self::BroadcastTo { key, .. } => Some(Location::from(key.id())), + _ => None, + } + } } impl PutMsg { @@ -918,7 +922,6 @@ mod messages { let id = self.id(); match self { Self::SeekNode { .. } => write!(f, "SeekNode(id: {id})"), - Self::RouteValue { .. } => write!(f, "RouteValue(id: {id})"), Self::RequestPut { .. } => write!(f, "RequestPut(id: {id})"), Self::Broadcasting { .. } => write!(f, "Broadcasting(id: {id})"), Self::SuccessfulUpdate { .. } => write!(f, "SusscessfulUpdate(id: {id})"), diff --git a/crates/core/src/operations/subscribe.rs b/crates/core/src/operations/subscribe.rs index 7e64a5f16..65f6496aa 100644 --- a/crates/core/src/operations/subscribe.rs +++ b/crates/core/src/operations/subscribe.rs @@ -11,7 +11,7 @@ use crate::{ message::{InnerMessage, Message, Transaction}, node::{NetworkBridge, OpManager, PeerKey}, operations::{op_trait::Operation, OpInitialization}, - ring::{PeerKeyLocation, RingError}, + ring::{Location, PeerKeyLocation, RingError}, }; use super::{OpEnum, OpError, OpOutcome, OperationResult}; @@ -250,7 +250,10 @@ impl Operation for SubscribeOp { retries: retries + 1, }); } else { - return Err(OpError::MaxRetriesExceeded(*id, id.tx_type())); + return Err(OpError::MaxRetriesExceeded( + *id, + id.transaction_type(), + )); } } _ => return Err(OpError::invalid_transition(self.id)), @@ -432,6 +435,15 @@ mod messages { use SubscribeMsg::*; matches!(self, ReturnSub { .. } | SeekNode { .. }) } + + fn requested_location(&self) -> Option { + match self { + Self::SeekNode { key, .. } => Some(Location::from(key.id())), + Self::RequestSub { key, .. } => Some(Location::from(key.id())), + Self::ReturnSub { key, .. } => Some(Location::from(key.id())), + _ => None, + } + } } impl SubscribeMsg { diff --git a/crates/core/src/operations/update.rs b/crates/core/src/operations/update.rs index 17d6bc290..47f955e57 100644 --- a/crates/core/src/operations/update.rs +++ b/crates/core/src/operations/update.rs @@ -84,6 +84,10 @@ mod messages { fn terminal(&self) -> bool { todo!() } + + fn requested_location(&self) -> Option { + todo!() + } } impl Display for UpdateMsg { diff --git a/crates/core/src/ring.rs b/crates/core/src/ring.rs index db72cdb17..229752d0b 100644 --- a/crates/core/src/ring.rs +++ b/crates/core/src/ring.rs @@ -11,6 +11,7 @@ //! - final location use std::hash::Hash; +use std::sync::atomic::AtomicBool; use std::{ cmp::Reverse, collections::BTreeMap, @@ -29,14 +30,15 @@ use anyhow::bail; use arrayvec::ArrayVec; use dashmap::{mapref::one::Ref as DmRef, DashMap, DashSet}; use either::Either; -use freenet_stdlib::prelude::ContractKey; +use freenet_stdlib::prelude::{ContractInstanceId, ContractKey}; use parking_lot::RwLock; use rand::seq::SliceRandom; use rand::Rng; use serde::{Deserialize, Serialize}; use tokio::sync; -use crate::topology::TopologyManager; +use crate::message::TransactionType; +use crate::topology::{AcquisitionStrategy, TopologyManager}; use crate::{ config::GlobalExecutor, message::Transaction, @@ -169,6 +171,9 @@ pub(crate) struct Ring { min_connections: usize, router: Arc>, topology_manager: RwLock, + /// Fast is for when there are less than our target number of connections so we want to acquire new connections quickly. + /// Slow is for when there are enough connections so we need to drop a connection in order to replace it. + fast_acquisition: AtomicBool, connections_by_location: RwLock>>, location_for_peer: RwLock>, /// contracts in the ring cached by this node @@ -263,6 +268,7 @@ impl Ring { min_connections, router, topology_manager, + fast_acquisition: AtomicBool::new(true), connections_by_location: RwLock::new(BTreeMap::new()), location_for_peer: RwLock::new(BTreeMap::new()), cached_contracts: DashSet::new(), @@ -393,12 +399,17 @@ impl Ring { tracing::debug!(peer = %self.peer_key, "max open connections reached"); false } else { + let strategy = if self + .fast_acquisition + .load(std::sync::atomic::Ordering::Acquire) + { + AcquisitionStrategy::Fast + } else { + AcquisitionStrategy::Slow + }; self.topology_manager .write() - .evaluate_new_connection( - location, - crate::topology::AcquisitionStrategy::Slow, // todo: conditionally change this - ) + .evaluate_new_connection(location, strategy) .expect("already have > min connections, so neighbors shouldn't be empty") }; if !accepted { @@ -408,6 +419,12 @@ impl Ring { accepted } + pub fn record_request(&self, requested_location: Location, request_type: TransactionType) { + self.topology_manager + .write() + .record_request(requested_location, request_type); + } + pub fn add_connection(&self, loc: Location, peer: PeerKey) { let mut cbl = self.connections_by_location.write(); cbl.entry(loc).or_default().push(Connection { @@ -640,6 +657,8 @@ impl Ring { .open_connections .load(std::sync::atomic::Ordering::Acquire); if open_connections < self.max_connections && open_connections > self.min_connections { + self.fast_acquisition + .store(true, std::sync::atomic::Ordering::Release); // requires more connections let ideal_location = { self.topology_manager @@ -684,6 +703,8 @@ impl Ring { ) }; if !should_swap.is_empty() { + self.fast_acquisition + .store(false, std::sync::atomic::Ordering::Release); let ideal_location = { self.topology_manager .write() @@ -818,6 +839,16 @@ impl Location { pub fn as_f64(&self) -> f64 { self.0 } + + fn from_contract_key(bytes: &[u8]) -> Self { + let mut value = 0.0; + let mut divisor = 256.0; + for byte in bytes { + value += *byte as f64 / divisor; + divisor *= 256.0; + } + Location::try_from(value).expect("value should be between 0 and 1") + } } impl std::ops::Add for Location { @@ -835,13 +866,13 @@ impl std::ops::Add for Location { /// (which have been hashed with a strong, cryptographically safe, hash function first). impl From<&ContractKey> for Location { fn from(key: &ContractKey) -> Self { - let mut value = 0.0; - let mut divisor = 256.0; - for byte in key.bytes().iter().take(7) { - value += *byte as f64 / divisor; - divisor *= 256.0; - } - Location::try_from(value).expect("value should be between 0 and 1") + Self::from_contract_key(key.id().as_bytes()) + } +} + +impl From<&ContractInstanceId> for Location { + fn from(key: &ContractInstanceId) -> Self { + Self::from_contract_key(key.as_bytes()) } } diff --git a/crates/core/src/runtime/wasm_runtime.rs b/crates/core/src/runtime/wasm_runtime.rs index 4c9d2db29..ad7b0de0a 100644 --- a/crates/core/src/runtime/wasm_runtime.rs +++ b/crates/core/src/runtime/wasm_runtime.rs @@ -198,7 +198,7 @@ impl Runtime { .clone(); let instance = self.prepare_instance(&module)?; self.set_instance_mem(req_bytes, &instance)?; - RunningInstance::new(self, instance, Key::Contract(key.id())) + RunningInstance::new(self, instance, Key::Contract(*key.id())) } pub(super) fn prepare_delegate_call( diff --git a/crates/core/src/topology.rs b/crates/core/src/topology.rs index de838388e..341543777 100644 --- a/crates/core/src/topology.rs +++ b/crates/core/src/topology.rs @@ -1,4 +1,7 @@ -use crate::ring::{Distance, Location}; +use crate::{ + message::TransactionType, + ring::{Distance, Location}, +}; use request_density_tracker::cached_density_map::CachedDensityMap; use std::{ collections::BTreeMap, @@ -6,13 +9,13 @@ use std::{ }; use tracing::{debug, error, info}; -use self::request_density_tracker::DensityMapError; -pub(crate) use small_world_rand::random_link_distance; - mod connection_evaluator; mod request_density_tracker; mod small_world_rand; +use request_density_tracker::DensityMapError; +use small_world_rand::random_link_distance; + const SLOW_CONNECTION_EVALUATOR_WINDOW_DURATION: Duration = Duration::from_secs(5 * 60); const FAST_CONNECTION_EVALUATOR_WINDOW_DURATION: Duration = Duration::from_secs(60); const REQUEST_DENSITY_TRACKER_WINDOW_SIZE: usize = 10_000; @@ -78,7 +81,7 @@ impl TopologyManager { pub(crate) fn record_request( &mut self, requested_location: Location, - _request_type: RequestType, + _request_type: TransactionType, ) { debug!("Recording request for location: {:?}", requested_location); self.request_density_tracker.sample(requested_location); @@ -172,13 +175,7 @@ impl TopologyManager { } } -pub(crate) enum RequestType { - Get, - Put, - Join, - Subscribe, -} - +// FIXME pub(crate) enum AcquisitionStrategy { /// Acquire new connections slowly, be picky Slow, @@ -190,7 +187,7 @@ pub(crate) enum AcquisitionStrategy { #[cfg(test)] mod tests { use super::TopologyManager; - use crate::ring::Location; + use crate::{message::TransactionType, ring::Location}; #[test] fn test_topology_manager() { @@ -206,7 +203,7 @@ mod tests { // Simulate a bunch of random requests clustered around 0.35 for _ in 0..1000 { let requested_location = topology_manager.random_location(); - topology_manager.record_request(requested_location, super::RequestType::Get); + topology_manager.record_request(requested_location, TransactionType::Get); requests.push(requested_location); } diff --git a/crates/core/src/topology/connection_evaluator.rs b/crates/core/src/topology/connection_evaluator.rs index 00d56ee4e..b80b6d68e 100644 --- a/crates/core/src/topology/connection_evaluator.rs +++ b/crates/core/src/topology/connection_evaluator.rs @@ -52,4 +52,93 @@ impl ConnectionEvaluator { } #[cfg(test)] -mod tests; +mod tests { + use super::*; + + #[test] + fn test_record_first_score() { + let mut evaluator = ConnectionEvaluator::new(Duration::from_secs(10)); + let current_time = Instant::now(); + assert!(evaluator.record_and_eval_with_current_time(5.0, current_time)); + // Assert evaluator.scores contains the new score + assert_eq!(evaluator.scores.len(), 1); + assert_eq!(evaluator.scores[0].1, 5.0); + assert_eq!(evaluator.scores[0].0, current_time); + } + + #[test] + fn test_not_best_in_time_window() { + let mut evaluator = ConnectionEvaluator::new(Duration::from_secs(10)); + + let start_time = Instant::now(); + evaluator.record_and_eval_with_current_time(5.0, start_time); + assert!( + !evaluator.record_and_eval_with_current_time(4.0, start_time + Duration::from_secs(5)), + ); + } + + #[test] + fn test_best_in_time_window() { + let mut evaluator = ConnectionEvaluator::new(Duration::from_secs(10)); + + let start_time = Instant::now(); + evaluator.record_and_eval_with_current_time(5.0, start_time); + assert!( + evaluator.record_and_eval_with_current_time(4.0, start_time + Duration::from_secs(11)), + ); + } + + #[test] + fn test_remove_outdated_scores() { + let mut evaluator = ConnectionEvaluator::new(Duration::from_secs(10)); + + let start_time = Instant::now(); + evaluator.record_and_eval_with_current_time(5.0, start_time); + evaluator.record_and_eval_with_current_time(6.0, start_time + Duration::from_secs(5)); + evaluator.record_and_eval_with_current_time(4.5, start_time + Duration::from_secs(11)); + assert_eq!(evaluator.scores.len(), 2); + } + + #[test] + fn test_empty_window_duration() { + let mut evaluator = ConnectionEvaluator::new(Duration::from_secs(0)); + let current_time = Instant::now(); + assert!(evaluator.record_and_eval_with_current_time(5.0, current_time)); + assert!(!evaluator.record_and_eval_with_current_time(4.0, current_time)); + } + + #[test] + fn test_multiple_scores_same_timestamp() { + let mut evaluator = ConnectionEvaluator::new(Duration::from_secs(10)); + let current_time = Instant::now(); + evaluator.record_only_with_current_time(5.0, current_time); + evaluator.record_only_with_current_time(6.0, current_time); + assert_eq!(evaluator.scores.len(), 2); + assert!(!evaluator + .record_and_eval_with_current_time(4.0, current_time + Duration::from_secs(5)),); + } + + #[test] + fn test_negative_scores() { + let mut evaluator = ConnectionEvaluator::new(Duration::from_secs(10)); + let start_time = Instant::now(); + assert!(evaluator.record_and_eval_with_current_time(-5.0, start_time),); + assert!( + evaluator.record_and_eval_with_current_time(-4.0, start_time + Duration::from_secs(5)), + ); + assert!( + !evaluator.record_and_eval_with_current_time(-6.0, start_time + Duration::from_secs(5)), + ); + } + + #[test] + fn test_large_number_of_scores() { + let mut evaluator = ConnectionEvaluator::new(Duration::from_secs(10)); + let start_time = Instant::now(); + for i in 0..1000 { + evaluator.record_only_with_current_time(i as f64, start_time + Duration::from_secs(i)); + } + assert!(evaluator + .record_and_eval_with_current_time(1000.0, start_time + Duration::from_secs(1001)),); + } +} diff --git a/crates/core/src/topology/connection_evaluator/tests.rs b/crates/core/src/topology/connection_evaluator/tests.rs deleted file mode 100644 index 52bbf4974..000000000 --- a/crates/core/src/topology/connection_evaluator/tests.rs +++ /dev/null @@ -1,84 +0,0 @@ -use super::*; - -#[test] -fn test_record_first_score() { - let mut evaluator = ConnectionEvaluator::new(Duration::from_secs(10)); - let current_time = Instant::now(); - assert!(evaluator.record_and_eval_with_current_time(5.0, current_time)); - // Assert evaluator.scores contains the new score - assert_eq!(evaluator.scores.len(), 1); - assert_eq!(evaluator.scores[0].1, 5.0); - assert_eq!(evaluator.scores[0].0, current_time); -} - -#[test] -fn test_not_best_in_time_window() { - let mut evaluator = ConnectionEvaluator::new(Duration::from_secs(10)); - - let start_time = Instant::now(); - evaluator.record_and_eval_with_current_time(5.0, start_time); - assert!(!evaluator.record_and_eval_with_current_time(4.0, start_time + Duration::from_secs(5)),); -} - -#[test] -fn test_best_in_time_window() { - let mut evaluator = ConnectionEvaluator::new(Duration::from_secs(10)); - - let start_time = Instant::now(); - evaluator.record_and_eval_with_current_time(5.0, start_time); - assert!(evaluator.record_and_eval_with_current_time(4.0, start_time + Duration::from_secs(11)),); -} - -#[test] -fn test_remove_outdated_scores() { - let mut evaluator = ConnectionEvaluator::new(Duration::from_secs(10)); - - let start_time = Instant::now(); - evaluator.record_and_eval_with_current_time(5.0, start_time); - evaluator.record_and_eval_with_current_time(6.0, start_time + Duration::from_secs(5)); - evaluator.record_and_eval_with_current_time(4.5, start_time + Duration::from_secs(11)); - assert_eq!(evaluator.scores.len(), 2); -} - -#[test] -fn test_empty_window_duration() { - let mut evaluator = ConnectionEvaluator::new(Duration::from_secs(0)); - let current_time = Instant::now(); - assert!(evaluator.record_and_eval_with_current_time(5.0, current_time)); - assert!(!evaluator.record_and_eval_with_current_time(4.0, current_time)); -} - -#[test] -fn test_multiple_scores_same_timestamp() { - let mut evaluator = ConnectionEvaluator::new(Duration::from_secs(10)); - let current_time = Instant::now(); - evaluator.record_only_with_current_time(5.0, current_time); - evaluator.record_only_with_current_time(6.0, current_time); - assert_eq!(evaluator.scores.len(), 2); - assert!( - !evaluator.record_and_eval_with_current_time(4.0, current_time + Duration::from_secs(5)), - ); -} - -#[test] -fn test_negative_scores() { - let mut evaluator = ConnectionEvaluator::new(Duration::from_secs(10)); - let start_time = Instant::now(); - assert!(evaluator.record_and_eval_with_current_time(-5.0, start_time),); - assert!(evaluator.record_and_eval_with_current_time(-4.0, start_time + Duration::from_secs(5)),); - assert!( - !evaluator.record_and_eval_with_current_time(-6.0, start_time + Duration::from_secs(5)), - ); -} - -#[test] -fn test_large_number_of_scores() { - let mut evaluator = ConnectionEvaluator::new(Duration::from_secs(10)); - let start_time = Instant::now(); - for i in 0..1000 { - evaluator.record_only_with_current_time(i as f64, start_time + Duration::from_secs(i)); - } - assert!( - evaluator.record_and_eval_with_current_time(1000.0, start_time + Duration::from_secs(1001)), - ); -} diff --git a/crates/core/src/topology/request_density_tracker.rs b/crates/core/src/topology/request_density_tracker.rs index ea7b6ff05..1199a95f7 100644 --- a/crates/core/src/topology/request_density_tracker.rs +++ b/crates/core/src/topology/request_density_tracker.rs @@ -193,16 +193,6 @@ impl DensityMap { } } -// Define the custom error type using thiserror -#[derive(Error, Debug)] -pub(super) enum DensityError { - #[error("Not enough samples to determine lower and upper bounds")] - CantFindBounds, - - #[error("Window radius too big. Window radius should be <= 50% of the number of samples ({samples}) and window size ({window_size}).")] - WindowTooBig { samples: usize, window_size: usize }, -} - #[derive(Error, Debug)] pub(crate) enum DensityMapError { #[error("The neighbors BTreeMap is empty.")] diff --git a/crates/core/src/topology/small_world_rand.rs b/crates/core/src/topology/small_world_rand.rs index 57fb309fe..5f5e1c005 100644 --- a/crates/core/src/topology/small_world_rand.rs +++ b/crates/core/src/topology/small_world_rand.rs @@ -3,7 +3,7 @@ use rand::Rng; use crate::ring::Distance; // Function to generate a random link distance based on Kleinberg's d^{-1} distribution -pub(crate) fn random_link_distance(d_min: Distance) -> Distance { +pub(super) fn random_link_distance(d_min: Distance) -> Distance { let d_max = 0.5; // Generate a uniform random number between 0 and 1 diff --git a/stdlib b/stdlib index f36475537..10167caf6 160000 --- a/stdlib +++ b/stdlib @@ -1 +1 @@ -Subproject commit f36475537a1fa2e1f19721bbfac452ce508f02b9 +Subproject commit 10167caf635fe5876b0e755c700f0e17b6e177a0