From 3c1cbe4fbe706af2f965b97510eae2af63b4c797 Mon Sep 17 00:00:00 2001 From: hsantos Date: Mon, 6 Jan 2025 11:27:58 +0100 Subject: [PATCH 1/7] Skip unnecessary updates if the new state matches the current one --- crates/core/src/contract/executor/runtime.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/crates/core/src/contract/executor/runtime.rs b/crates/core/src/contract/executor/runtime.rs index fbae1fa9f..7bdbdf55d 100644 --- a/crates/core/src/contract/executor/runtime.rs +++ b/crates/core/src/contract/executor/runtime.rs @@ -536,6 +536,12 @@ impl Executor { } }; let new_state = WrappedState::new(new_state.into_bytes()); + + if new_state.as_ref() == current_state.as_ref() { + tracing::debug!("No changes in state for contract {key}, avoiding update"); + return Ok(Either::Left(current_state.clone())); + } + self.state_store .update(key, new_state.clone()) .await From 5b6270fcb1e8e21e7ab55bbf5efe852040f919f2 Mon Sep 17 00:00:00 2001 From: hsantos Date: Mon, 6 Jan 2025 23:16:02 +0100 Subject: [PATCH 2/7] don't start update op without state changes --- crates/core/src/client_events.rs | 3 ++ crates/core/src/contract.rs | 24 +++++++---- crates/core/src/contract/executor.rs | 9 +++- .../src/contract/executor/mock_runtime.rs | 7 ++-- crates/core/src/contract/executor/runtime.rs | 42 ++++++++++++++++++- crates/core/src/contract/handler.rs | 7 ++++ 6 files changed, 76 insertions(+), 16 deletions(-) diff --git a/crates/core/src/client_events.rs b/crates/core/src/client_events.rs index 4a2c82afb..f2a5a690b 100644 --- a/crates/core/src/client_events.rs +++ b/crates/core/src/client_events.rs @@ -360,6 +360,9 @@ async fn process_open_request( Ok(ContractHandlerEvent::UpdateResponse { new_value: Err(err), }) => Err(OpError::from(err)), + Ok(ContractHandlerEvent::UpdateNoChange { .. }) => { + todo!("update no change, do not start op") + } Err(err) => Err(err.into()), Ok(_) => Err(OpError::UnexpectedOpState), } diff --git a/crates/core/src/contract.rs b/crates/core/src/contract.rs index 6fca35355..f5cdaa5da 100644 --- a/crates/core/src/contract.rs +++ b/crates/core/src/contract.rs @@ -10,7 +10,7 @@ mod handler; pub mod storages; pub(crate) use executor::{ - executor_channel, mock_runtime::MockRuntime, Callback, ExecutorToEventLoopChannel, + executor_channel, mock_runtime::MockRuntime, Callback, ExecutorToEventLoopChannel, UpsertResult, NetworkEventListenerHalve, }; pub(crate) use handler::{ @@ -87,17 +87,21 @@ where } => { let put_result = contract_handler .executor() - .upsert_contract_state(key, Either::Left(state), related_contracts, contract) + .upsert_contract_state(key, Either::Left(state.clone()), related_contracts, contract) .instrument(tracing::info_span!("upsert_contract_state", %key)) .await; + let event_result = match put_result { + Ok(UpsertResult::NoChange) => ContractHandlerEvent::PutResponse {new_value: Ok(state)}, + Ok(UpsertResult::Updated(state)) => ContractHandlerEvent::PutResponse {new_value: Ok(state)}, + Err(err) => ContractHandlerEvent::PutResponse {new_value: Err(err)}, + }; + contract_handler .channel() .send_to_sender( id, - ContractHandlerEvent::PutResponse { - new_value: put_result.map_err(Into::into), - }, + event_result, ) .await .map_err(|error| { @@ -123,13 +127,17 @@ where .instrument(tracing::info_span!("upsert_contract_state", %key)) .await; + let event_result = match update_result { + Ok(UpsertResult::NoChange) => ContractHandlerEvent::UpdateNoChange { key }, + Ok(UpsertResult::Updated(state)) => ContractHandlerEvent::UpdateResponse {new_value: Ok(state)}, + Err(err) => ContractHandlerEvent::UpdateResponse {new_value: Err(err)}, + }; + contract_handler .channel() .send_to_sender( id, - ContractHandlerEvent::UpdateResponse { - new_value: update_result.map_err(Into::into), - }, + event_result, ) .await .map_err(|error| { diff --git a/crates/core/src/contract/executor.rs b/crates/core/src/contract/executor.rs index a0d9217aa..50b63a0b7 100644 --- a/crates/core/src/contract/executor.rs +++ b/crates/core/src/contract/executor.rs @@ -32,7 +32,6 @@ use crate::{ client_events::{ClientId, HostResult}, operations::{self, Operation}, }; - use super::storages::Storage; pub(super) mod mock_runtime; @@ -399,6 +398,12 @@ struct UpdateContract { new_state: WrappedState, } +#[derive(Debug)] +pub(crate) enum UpsertResult { + NoChange, + Updated(WrappedState), +} + impl ComposeNetworkMessage for UpdateContract { fn initiate_op(self, _op_manager: &OpManager) -> operations::update::UpdateOp { let UpdateContract { key, new_state } = self; @@ -428,7 +433,7 @@ pub(crate) trait ContractExecutor: Send + 'static { update: Either>, related_contracts: RelatedContracts<'static>, code: Option, - ) -> impl Future> + Send; + ) -> impl Future> + Send; fn register_contract_notifier( &mut self, diff --git a/crates/core/src/contract/executor/mock_runtime.rs b/crates/core/src/contract/executor/mock_runtime.rs index f3addfb8e..018c3a3f6 100644 --- a/crates/core/src/contract/executor/mock_runtime.rs +++ b/crates/core/src/contract/executor/mock_runtime.rs @@ -82,7 +82,7 @@ impl ContractExecutor for Executor { state: Either>, _related_contracts: RelatedContracts<'static>, code: Option, - ) -> Result { + ) -> Result { // todo: instead allow to perform mutations per contract based on incoming value so we can track // state values over the network match (state, code) { @@ -95,16 +95,15 @@ impl ContractExecutor for Executor { .store(key, incoming_state.clone(), contract.params().into_owned()) .await .map_err(ExecutorError::other)?; - Ok(incoming_state) + Ok(UpsertResult::Updated(incoming_state)) } (Either::Left(incoming_state), None) => { // update case - self.state_store .update(&key, incoming_state.clone()) .await .map_err(ExecutorError::other)?; - Ok(incoming_state) + Ok(UpsertResult::Updated(incoming_state)) } (update, contract) => unreachable!("{update:?}, {contract:?}"), } diff --git a/crates/core/src/contract/executor/runtime.rs b/crates/core/src/contract/executor/runtime.rs index 7bdbdf55d..ebc895ab0 100644 --- a/crates/core/src/contract/executor/runtime.rs +++ b/crates/core/src/contract/executor/runtime.rs @@ -18,7 +18,7 @@ impl ContractExecutor for Executor { update: Either>, related_contracts: RelatedContracts<'static>, code: Option, - ) -> Result { + ) -> Result { let params = if let Some(code) = &code { code.params() } else { @@ -134,7 +134,13 @@ impl ContractExecutor for Executor { .validate_state(&key, ¶ms, &updated_state, &related_contracts) .map_err(|e| ExecutorError::execution(e, None))? { - ValidateResult::Valid => Ok(updated_state), + ValidateResult::Valid => { + if updated_state.as_ref() == current_state.as_ref() { + Ok(UpsertResult::NoChange) + } else { + Ok(UpsertResult::Updated(updated_state)) + } + } ValidateResult::Invalid => Err(ExecutorError::request( freenet_stdlib::client_api::ContractError::Update { key, @@ -888,3 +894,35 @@ impl Executor { Ok(Either::Right(get_result)) } } + +#[cfg(test)] +mod tests { + use super::*; + use tokio::runtime::Runtime; + + #[tokio::test] + async fn test_attempt_state_update() { + // Setup the necessary environment + let mut executor = Executor::new_mock("test_attempt_state_update", None) + .await + .unwrap(); + let key = ContractKey::from(vec![1, 2, 3]); + let state = WrappedState::from(vec![4, 5, 6]); + let related_contracts = RelatedContracts::default(); + let code = Some(ContractContainer::Wasm(ContractWasmAPIVersion::V1( + WrappedContract::new( + Arc::new(ContractCode::from(vec![7, 8, 9])), + Parameters::from(vec![10, 11, 12]), + ), + ))); + + // Call the function + let result = executor + .runtime + .attempt_state_update(key, Either::Left(state), related_contracts, code) + .await; + + // Assert the result + assert!(result.is_ok()); + } +} diff --git a/crates/core/src/contract/handler.rs b/crates/core/src/contract/handler.rs index 267384047..be5b74947 100644 --- a/crates/core/src/contract/handler.rs +++ b/crates/core/src/contract/handler.rs @@ -334,6 +334,10 @@ pub(crate) enum ContractHandlerEvent { UpdateResponse { new_value: Result, }, + // The response to an update query where the state has not changed + UpdateNoChange { + key: ContractKey, + }, RegisterSubscriberListener { key: ContractKey, client_id: ClientId, @@ -397,6 +401,9 @@ impl std::fmt::Display for ContractHandlerEvent { write!(f, "update query failed {{ {e} }}",) } }, + ContractHandlerEvent::UpdateNoChange { key } => { + write!(f, "update query no change {{ {key} }}",) + } ContractHandlerEvent::RegisterSubscriberListener { key, client_id, .. } => { write!( f, From 74d044b99783067c68f52e01f2b0d279becece62 Mon Sep 17 00:00:00 2001 From: Ian Clarke Date: Mon, 6 Jan 2025 23:12:18 +0000 Subject: [PATCH 3/7] format --- crates/core/src/contract.rs | 41 +++++++++++++++++----------- crates/core/src/contract/executor.rs | 2 +- 2 files changed, 26 insertions(+), 17 deletions(-) diff --git a/crates/core/src/contract.rs b/crates/core/src/contract.rs index f5cdaa5da..e099436a9 100644 --- a/crates/core/src/contract.rs +++ b/crates/core/src/contract.rs @@ -10,8 +10,8 @@ mod handler; pub mod storages; pub(crate) use executor::{ - executor_channel, mock_runtime::MockRuntime, Callback, ExecutorToEventLoopChannel, UpsertResult, - NetworkEventListenerHalve, + executor_channel, mock_runtime::MockRuntime, Callback, ExecutorToEventLoopChannel, + NetworkEventListenerHalve, UpsertResult, }; pub(crate) use handler::{ client_responses_channel, contract_handler_channel, in_memory::MemoryContractHandler, @@ -87,22 +87,30 @@ where } => { let put_result = contract_handler .executor() - .upsert_contract_state(key, Either::Left(state.clone()), related_contracts, contract) + .upsert_contract_state( + key, + Either::Left(state.clone()), + related_contracts, + contract, + ) .instrument(tracing::info_span!("upsert_contract_state", %key)) .await; let event_result = match put_result { - Ok(UpsertResult::NoChange) => ContractHandlerEvent::PutResponse {new_value: Ok(state)}, - Ok(UpsertResult::Updated(state)) => ContractHandlerEvent::PutResponse {new_value: Ok(state)}, - Err(err) => ContractHandlerEvent::PutResponse {new_value: Err(err)}, + Ok(UpsertResult::NoChange) => ContractHandlerEvent::PutResponse { + new_value: Ok(state), + }, + Ok(UpsertResult::Updated(state)) => ContractHandlerEvent::PutResponse { + new_value: Ok(state), + }, + Err(err) => ContractHandlerEvent::PutResponse { + new_value: Err(err), + }, }; contract_handler .channel() - .send_to_sender( - id, - event_result, - ) + .send_to_sender(id, event_result) .await .map_err(|error| { tracing::debug!(%error, "shutting down contract handler"); @@ -129,16 +137,17 @@ where let event_result = match update_result { Ok(UpsertResult::NoChange) => ContractHandlerEvent::UpdateNoChange { key }, - Ok(UpsertResult::Updated(state)) => ContractHandlerEvent::UpdateResponse {new_value: Ok(state)}, - Err(err) => ContractHandlerEvent::UpdateResponse {new_value: Err(err)}, + Ok(UpsertResult::Updated(state)) => ContractHandlerEvent::UpdateResponse { + new_value: Ok(state), + }, + Err(err) => ContractHandlerEvent::UpdateResponse { + new_value: Err(err), + }, }; contract_handler .channel() - .send_to_sender( - id, - event_result, - ) + .send_to_sender(id, event_result) .await .map_err(|error| { tracing::debug!(%error, "shutting down contract handler"); diff --git a/crates/core/src/contract/executor.rs b/crates/core/src/contract/executor.rs index 50b63a0b7..34aabf0ae 100644 --- a/crates/core/src/contract/executor.rs +++ b/crates/core/src/contract/executor.rs @@ -19,6 +19,7 @@ use freenet_stdlib::prelude::*; use serde::{Deserialize, Serialize}; use tokio::sync::mpsc::{self}; +use super::storages::Storage; use crate::config::Config; use crate::message::Transaction; use crate::node::OpManager; @@ -32,7 +33,6 @@ use crate::{ client_events::{ClientId, HostResult}, operations::{self, Operation}, }; -use super::storages::Storage; pub(super) mod mock_runtime; pub(super) mod runtime; From 9d110c7e0e0a02c4fb07ad42ac71a5955e91de84 Mon Sep 17 00:00:00 2001 From: "Ian Clarke (aider)" Date: Mon, 6 Jan 2025 23:17:32 +0000 Subject: [PATCH 4/7] refactor: Reorganize rate.rs to follow Rust code structure conventions --- crates/core/src/topology/rate.rs | 32 ++++++++++++++++---------------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/crates/core/src/topology/rate.rs b/crates/core/src/topology/rate.rs index 091f26bf6..f4baf7978 100644 --- a/crates/core/src/topology/rate.rs +++ b/crates/core/src/topology/rate.rs @@ -12,6 +12,22 @@ pub struct RateProportion { value: f64, } +impl RateProportion { + pub(crate) fn new(value: f64) -> Self { + debug_assert!( + (0.0..=1.0).contains(&value), + "Proportion must be between 0 and 1" + ); + RateProportion { value } + } +} + +impl PartialOrd for RateProportion { + fn partial_cmp(&self, other: &Self) -> Option { + self.value.partial_cmp(&other.value) + } +} + impl Rate { pub fn new(value: f64, divisor: Duration) -> Self { debug_assert!(value >= 0.0, "Value must be non-negative"); @@ -55,12 +71,6 @@ impl PartialOrd for Rate { } } -impl PartialOrd for RateProportion { - fn partial_cmp(&self, other: &Self) -> Option { - self.value.partial_cmp(&other.value) - } -} - impl std::ops::Add for Rate { type Output = Self; @@ -87,16 +97,6 @@ impl std::ops::AddAssign for Rate { } } -impl RateProportion { - pub(crate) fn new(value: f64) -> Self { - debug_assert!( - (0.0..=1.0).contains(&value), - "Proportion must be between 0 and 1" - ); - RateProportion { value } - } -} - #[cfg(test)] mod tests { use super::*; From ae9560d633f60f94a10956f47974af743227136f Mon Sep 17 00:00:00 2001 From: "Ian Clarke (aider)" Date: Mon, 6 Jan 2025 23:19:55 +0000 Subject: [PATCH 5/7] refactor: Fix MockRuntime and runtime test compilation errors --- .../core/src/contract/executor/mock_runtime.rs | 17 ++++++++++++++++- crates/core/src/contract/executor/runtime.rs | 3 +-- 2 files changed, 17 insertions(+), 3 deletions(-) diff --git a/crates/core/src/contract/executor/mock_runtime.rs b/crates/core/src/contract/executor/mock_runtime.rs index 018c3a3f6..0dddb8d0f 100644 --- a/crates/core/src/contract/executor/mock_runtime.rs +++ b/crates/core/src/contract/executor/mock_runtime.rs @@ -5,10 +5,25 @@ pub(crate) struct MockRuntime { pub contract_store: ContractStore, } +impl MockRuntime { + pub async fn attempt_state_update( + &self, + key: ContractKey, + state: Either>, + _related_contracts: RelatedContracts<'static>, + _code: Option, + ) -> Result>, ExecutorError> { + match state { + Either::Left(state) => Ok(Either::Left(state)), + Either::Right(_) => Err(ExecutorError::other(anyhow::anyhow!("Delta updates not supported in mock"))) + } + } +} + impl Executor { pub async fn new_mock( identifier: &str, - event_loop_channel: ExecutorToEventLoopChannel, + event_loop_channel: Option>, ) -> anyhow::Result { let data_dir = Self::test_data_dir(identifier); diff --git a/crates/core/src/contract/executor/runtime.rs b/crates/core/src/contract/executor/runtime.rs index ebc895ab0..dcf3270bf 100644 --- a/crates/core/src/contract/executor/runtime.rs +++ b/crates/core/src/contract/executor/runtime.rs @@ -898,7 +898,6 @@ impl Executor { #[cfg(test)] mod tests { use super::*; - use tokio::runtime::Runtime; #[tokio::test] async fn test_attempt_state_update() { @@ -906,7 +905,7 @@ mod tests { let mut executor = Executor::new_mock("test_attempt_state_update", None) .await .unwrap(); - let key = ContractKey::from(vec![1, 2, 3]); + let key = ContractKey::from(ContractInstanceId::from([1, 2, 3])); let state = WrappedState::from(vec![4, 5, 6]); let related_contracts = RelatedContracts::default(); let code = Some(ContractContainer::Wasm(ContractWasmAPIVersion::V1( From fa73563c6e7083330f83acffaad870b97c540dc3 Mon Sep 17 00:00:00 2001 From: "Ian Clarke (aider)" Date: Mon, 6 Jan 2025 23:20:37 +0000 Subject: [PATCH 6/7] fix: Resolve type mismatches and unused variable warnings in executor code --- crates/core/src/contract/executor/mock_runtime.rs | 4 ++-- crates/core/src/contract/executor/runtime.rs | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/crates/core/src/contract/executor/mock_runtime.rs b/crates/core/src/contract/executor/mock_runtime.rs index 0dddb8d0f..531361aa6 100644 --- a/crates/core/src/contract/executor/mock_runtime.rs +++ b/crates/core/src/contract/executor/mock_runtime.rs @@ -8,7 +8,7 @@ pub(crate) struct MockRuntime { impl MockRuntime { pub async fn attempt_state_update( &self, - key: ContractKey, + _key: ContractKey, state: Either>, _related_contracts: RelatedContracts<'static>, _code: Option, @@ -44,7 +44,7 @@ impl Executor { || Ok(()), OperationMode::Local, MockRuntime { contract_store }, - Some(event_loop_channel), + event_loop_channel, ) .await?; Ok(executor) diff --git a/crates/core/src/contract/executor/runtime.rs b/crates/core/src/contract/executor/runtime.rs index dcf3270bf..bc2771761 100644 --- a/crates/core/src/contract/executor/runtime.rs +++ b/crates/core/src/contract/executor/runtime.rs @@ -905,7 +905,7 @@ mod tests { let mut executor = Executor::new_mock("test_attempt_state_update", None) .await .unwrap(); - let key = ContractKey::from(ContractInstanceId::from([1, 2, 3])); + let key = ContractKey::from(ContractInstanceId::from_bytes(&[1, 2, 3])); let state = WrappedState::from(vec![4, 5, 6]); let related_contracts = RelatedContracts::default(); let code = Some(ContractContainer::Wasm(ContractWasmAPIVersion::V1( From c7226783279da60a4085fab4bfe6d897a6622b65 Mon Sep 17 00:00:00 2001 From: Ian Clarke Date: Mon, 6 Jan 2025 23:21:45 +0000 Subject: [PATCH 7/7] revert erronous commits - wrong branch --- .../src/contract/executor/mock_runtime.rs | 19 ++--------- crates/core/src/contract/executor/runtime.rs | 3 +- crates/core/src/topology/rate.rs | 32 +++++++++---------- 3 files changed, 20 insertions(+), 34 deletions(-) diff --git a/crates/core/src/contract/executor/mock_runtime.rs b/crates/core/src/contract/executor/mock_runtime.rs index 531361aa6..018c3a3f6 100644 --- a/crates/core/src/contract/executor/mock_runtime.rs +++ b/crates/core/src/contract/executor/mock_runtime.rs @@ -5,25 +5,10 @@ pub(crate) struct MockRuntime { pub contract_store: ContractStore, } -impl MockRuntime { - pub async fn attempt_state_update( - &self, - _key: ContractKey, - state: Either>, - _related_contracts: RelatedContracts<'static>, - _code: Option, - ) -> Result>, ExecutorError> { - match state { - Either::Left(state) => Ok(Either::Left(state)), - Either::Right(_) => Err(ExecutorError::other(anyhow::anyhow!("Delta updates not supported in mock"))) - } - } -} - impl Executor { pub async fn new_mock( identifier: &str, - event_loop_channel: Option>, + event_loop_channel: ExecutorToEventLoopChannel, ) -> anyhow::Result { let data_dir = Self::test_data_dir(identifier); @@ -44,7 +29,7 @@ impl Executor { || Ok(()), OperationMode::Local, MockRuntime { contract_store }, - event_loop_channel, + Some(event_loop_channel), ) .await?; Ok(executor) diff --git a/crates/core/src/contract/executor/runtime.rs b/crates/core/src/contract/executor/runtime.rs index bc2771761..ebc895ab0 100644 --- a/crates/core/src/contract/executor/runtime.rs +++ b/crates/core/src/contract/executor/runtime.rs @@ -898,6 +898,7 @@ impl Executor { #[cfg(test)] mod tests { use super::*; + use tokio::runtime::Runtime; #[tokio::test] async fn test_attempt_state_update() { @@ -905,7 +906,7 @@ mod tests { let mut executor = Executor::new_mock("test_attempt_state_update", None) .await .unwrap(); - let key = ContractKey::from(ContractInstanceId::from_bytes(&[1, 2, 3])); + let key = ContractKey::from(vec![1, 2, 3]); let state = WrappedState::from(vec![4, 5, 6]); let related_contracts = RelatedContracts::default(); let code = Some(ContractContainer::Wasm(ContractWasmAPIVersion::V1( diff --git a/crates/core/src/topology/rate.rs b/crates/core/src/topology/rate.rs index f4baf7978..091f26bf6 100644 --- a/crates/core/src/topology/rate.rs +++ b/crates/core/src/topology/rate.rs @@ -12,22 +12,6 @@ pub struct RateProportion { value: f64, } -impl RateProportion { - pub(crate) fn new(value: f64) -> Self { - debug_assert!( - (0.0..=1.0).contains(&value), - "Proportion must be between 0 and 1" - ); - RateProportion { value } - } -} - -impl PartialOrd for RateProportion { - fn partial_cmp(&self, other: &Self) -> Option { - self.value.partial_cmp(&other.value) - } -} - impl Rate { pub fn new(value: f64, divisor: Duration) -> Self { debug_assert!(value >= 0.0, "Value must be non-negative"); @@ -71,6 +55,12 @@ impl PartialOrd for Rate { } } +impl PartialOrd for RateProportion { + fn partial_cmp(&self, other: &Self) -> Option { + self.value.partial_cmp(&other.value) + } +} + impl std::ops::Add for Rate { type Output = Self; @@ -97,6 +87,16 @@ impl std::ops::AddAssign for Rate { } } +impl RateProportion { + pub(crate) fn new(value: f64) -> Self { + debug_assert!( + (0.0..=1.0).contains(&value), + "Proportion must be between 0 and 1" + ); + RateProportion { value } + } +} + #[cfg(test)] mod tests { use super::*;