From 5636c9fa49fb2d47b170bcbf778f35b7779b0479 Mon Sep 17 00:00:00 2001 From: Ignacio Duart Date: Thu, 14 Dec 2023 13:03:25 +0100 Subject: [PATCH 1/7] Remove unnecessary panic --- crates/core/src/contract.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/crates/core/src/contract.rs b/crates/core/src/contract.rs index 5255d1d4c..b1aea2d28 100644 --- a/crates/core/src/contract.rs +++ b/crates/core/src/contract.rs @@ -127,7 +127,6 @@ where tracing::debug!(%error, "shutting down contract handler"); error })?; - todo!() } _ => unreachable!(), } From 5c40e5f75bfa10afab2d535de4b7f709772750ee Mon Sep 17 00:00:00 2001 From: Ignacio Duart Date: Thu, 14 Dec 2023 20:01:17 +0100 Subject: [PATCH 2/7] Fix put op to properly forward and return back answer --- crates/core/src/client_events.rs | 36 ++-- .../src/node/network_bridge/p2p_protoc.rs | 1 + crates/core/src/node/testing_impl.rs | 1 + crates/core/src/operations/put.rs | 163 +++++++++++------- crates/core/src/tracing.rs | 2 +- crates/fdev/src/network_metrics_server.rs | 4 +- network-monitor/src/app.ts | 9 +- schemas/flatbuffers/network_events.fbs | 0 stdlib | 2 +- 9 files changed, 135 insertions(+), 83 deletions(-) create mode 100644 schemas/flatbuffers/network_events.fbs diff --git a/crates/core/src/client_events.rs b/crates/core/src/client_events.rs index 66313f275..037d8bdc9 100644 --- a/crates/core/src/client_events.rs +++ b/crates/core/src/client_events.rs @@ -342,8 +342,8 @@ pub(crate) mod test { while state.current_iteration < state.max_iterations { state.current_iteration += 1; let for_this_peer = self.gen_range(0..state.num_peers) == state.this_peer; - match self.gen_range(0..4) { - 0 => { + match self.gen_range(0..100) { + val if (0..10).contains(&val) => { if state.max_contract_num <= state.existing_contracts.len() { continue; } @@ -360,36 +360,36 @@ pub(crate) mod test { return Some(request.into()); } } - 1 => { + val if (10..35).contains(&val) => { if let Some(contract) = self.choose(&state.existing_contracts) { - let delta = UpdateData::Delta(StateDelta::from(self.random_byte_vec())); if !for_this_peer { continue; } - let request = ContractRequest::Update { - key: contract.key().clone(), - data: delta, + let key = contract.key(); + let fetch_contract = state.owns_contracts.contains(&key); + let request = ContractRequest::Get { + key, + fetch_contract, }; - if state.owns_contracts.contains(&contract.key()) { - return Some(request.into()); - } + return Some(request.into()); } } - 2 => { + val if (35..85).contains(&val) => { if let Some(contract) = self.choose(&state.existing_contracts) { + let delta = UpdateData::Delta(StateDelta::from(self.random_byte_vec())); if !for_this_peer { continue; } - let key = contract.key(); - let fetch_contract = state.owns_contracts.contains(&key); - let request = ContractRequest::Get { - key, - fetch_contract, + let request = ContractRequest::Update { + key: contract.key().clone(), + data: delta, }; - return Some(request.into()); + if state.owns_contracts.contains(&contract.key()) { + return Some(request.into()); + } } } - 3 => { + val if (85..100).contains(&val) => { if let Some(contract) = self.choose(&state.existing_contracts) { let key = contract.key(); let summary = StateSummary::from(self.random_byte_vec()); diff --git a/crates/core/src/node/network_bridge/p2p_protoc.rs b/crates/core/src/node/network_bridge/p2p_protoc.rs index 1521346fe..c452bee85 100644 --- a/crates/core/src/node/network_bridge/p2p_protoc.rs +++ b/crates/core/src/node/network_bridge/p2p_protoc.rs @@ -279,6 +279,7 @@ impl P2pConnManager { ) -> Result<(), anyhow::Error> { use ConnMngrActions::*; + // FIXME: this two containers need to be clean up on transaction time-out let mut pending_from_executor = HashSet::new(); let mut tx_to_client: HashMap = HashMap::new(); diff --git a/crates/core/src/node/testing_impl.rs b/crates/core/src/node/testing_impl.rs index a2627224f..f238c48a5 100644 --- a/crates/core/src/node/testing_impl.rs +++ b/crates/core/src/node/testing_impl.rs @@ -1050,6 +1050,7 @@ where NB: NetworkBridge + NetworkBridgeExt, UsrEv: ClientEventsProxy + Send + 'static, { + // todo: this two containers need to be clean up on transaction time-out let mut pending_from_executor = HashSet::new(); let mut tx_to_client: HashMap = HashMap::new(); loop { diff --git a/crates/core/src/operations/put.rs b/crates/core/src/operations/put.rs index 231cb5a07..83f61ef71 100644 --- a/crates/core/src/operations/put.rs +++ b/crates/core/src/operations/put.rs @@ -231,14 +231,6 @@ impl Operation for PutOp { .ring .within_subscribing_distance(&Location::from(&key)) { - if !is_subscribed_contract { - tracing::debug!(tx = %id, %key, "Contract not cached @ peer {}", target.peer); - match try_subscribing_to_contract(op_manager, key.clone()).await { - Ok(_) => {} - Err(err) => return Err(err), - } - } - // after the contract has been cached, push the update query tracing::debug!(tx = %id, "Attempting contract value update"); put_contract( op_manager, @@ -256,7 +248,7 @@ impl Operation for PutOp { ); } - if let Some(new_htl) = htl.checked_sub(1) { + let last_hop = if let Some(new_htl) = htl.checked_sub(1) { // forward changes in the contract to nodes closer to the contract location, if possible let put_here = forward_put( op_manager, @@ -265,6 +257,7 @@ impl Operation for PutOp { value.clone(), *id, new_htl, + vec![sender.peer], ) .await; if put_here && !is_subscribed_contract { @@ -278,14 +271,27 @@ impl Operation for PutOp { ) .await?; } - } + put_here + } else { + // should put in this location, no hops left + put_contract( + op_manager, + key.clone(), + value.clone(), + RelatedContracts::default(), + contract, + ) + .await?; + true + }; let broadcast_to = op_manager.get_broadcast_targets(&key, &sender.peer); match try_to_broadcast( *id, + last_hop, op_manager, self.state, - broadcast_to, + (broadcast_to, *sender), key.clone(), (contract.clone(), value.clone()), ) @@ -327,9 +333,10 @@ impl Operation for PutOp { match try_to_broadcast( *id, + false, op_manager, self.state, - broadcast_to, + (broadcast_to, *sender), key.clone(), (contract.clone(), new_value), ) @@ -349,6 +356,7 @@ impl Operation for PutOp { key, new_value, contract, + upstream, } => { let sender = op_manager.ring.own_location(); let mut broadcasted_to = *broadcasted_to; @@ -393,19 +401,39 @@ impl Operation for PutOp { broadcasted_to += broadcast_to.len() - incorrect_results; tracing::debug!( - "successfully broadcasted put into contract {key} to {broadcasted_to} peers" + "Successfully broadcasted put into contract {key} to {broadcasted_to} peers" ); // Subscriber nodes have been notified of the change, the operation is completed - return_msg = Some(PutMsg::SuccessfulPut { id: *id }); + return_msg = Some(PutMsg::SuccessfulPut { + id: *id, + target: *upstream, + }); new_state = None; } - PutMsg::SuccessfulPut { .. } => { + PutMsg::SuccessfulPut { id, .. } => { match self.state { - Some(PutState::AwaitingResponse { contract, .. }) => { - tracing::debug!("Successfully updated value for {}", contract,); + Some(PutState::AwaitingResponse { key, upstream }) => { + let is_subscribed_contract = + op_manager.ring.is_subscribed_to_contract(&key); + if !is_subscribed_contract + && op_manager + .ring + .within_subscribing_distance(&Location::from(&key)) + { + tracing::debug!(tx = %id, %key, peer = %op_manager.ring.peer_key, "Contract not cached @ peer, caching"); + start_subscription(op_manager, key.clone()).await; + } + tracing::debug!(tx = %id, "Successfully updated value for {key}"); new_state = None; - return_msg = None; + if let Some(upstream) = upstream { + return_msg = Some(PutMsg::SuccessfulPut { + id: *id, + target: upstream, + }); + } else { + return_msg = None; + } } _ => return Err(OpError::invalid_transition(self.id)), }; @@ -420,6 +448,7 @@ impl Operation for PutOp { new_value, htl, sender, + skip_list, } => { let key = contract.key(); let peer_loc = op_manager.ring.own_location(); @@ -435,13 +464,6 @@ impl Operation for PutOp { .ring .within_subscribing_distance(&Location::from(&key)); if is_subscribed_contract || within_caching_dist { - if !is_subscribed_contract { - tracing::debug!(%key, "Contract not cached @ peer {}", peer_loc.peer); - match try_subscribing_to_contract(op_manager, key.clone()).await { - Ok(_) => {} - Err(err) => return Err(err), - } - } // after the contract has been cached, push the update query put_contract( op_manager, @@ -454,7 +476,10 @@ impl Operation for PutOp { } // if successful, forward to the next closest peers (if any) - if let Some(new_htl) = htl.checked_sub(1) { + let last_hop = if let Some(new_htl) = htl.checked_sub(1) { + let mut new_skip_list = skip_list.clone(); + new_skip_list.push(sender.peer); + // only hop forward if there are closer peers let put_here = forward_put( op_manager, conn_manager, @@ -462,6 +487,7 @@ impl Operation for PutOp { new_value.clone(), *id, new_htl, + new_skip_list, ) .await; if put_here && !is_subscribed_contract { @@ -475,14 +501,27 @@ impl Operation for PutOp { ) .await?; } - } + put_here + } else { + // should put in this location, no hops left + put_contract( + op_manager, + key.clone(), + new_value.clone(), + RelatedContracts::default(), + contract, + ) + .await?; + true + }; let broadcast_to = op_manager.get_broadcast_targets(&key, &sender.peer); match try_to_broadcast( *id, + last_hop, op_manager, self.state, - broadcast_to, + (broadcast_to, *sender), key.clone(), (contract.clone(), new_value.clone()), ) @@ -536,35 +575,20 @@ fn build_op_result( }) } -pub(super) async fn try_subscribing_to_contract( - op_manager: &OpManager, - key: ContractKey, -) -> Result<(), OpError> { - // this node does not have the contract, so instead store the contract and execute the put op. - let res = op_manager - .notify_contract_handler(ContractHandlerEvent::Subscribe { key: key.clone() }) - .await?; - if let ContractHandlerEvent::SubscribeResponse { - response: Ok(subcribed_to), - .. - } = res - { - op_manager.ring.add_subscription(key, subcribed_to); - tracing::debug!("Contract successfully cached"); - Ok(()) - } else { - tracing::error!( - "Contract handler returned wrong event when trying to cache contract, this should not happen!" - ); - Err(OpError::UnexpectedOpState) +#[inline] +async fn start_subscription(op_manager: &OpManager, key: ContractKey) { + let op = super::subscribe::start_op(key.clone()); + if let Err(error) = super::subscribe::request_subscribe(op_manager, op).await { + tracing::warn!(%error, "Error subscribing to contract"); } } async fn try_to_broadcast( id: Transaction, + last_hop: bool, op_manager: &OpManager, state: Option, - broadcast_to: Vec, + (broadcast_to, upstream): (Vec, PeerKeyLocation), key: ContractKey, (contract, new_value): (ContractContainer, WrappedState), ) -> Result<(Option, Option), OpError> { @@ -573,16 +597,19 @@ async fn try_to_broadcast( match state { Some(PutState::ReceivedRequest | PutState::BroadcastOngoing { .. }) => { - if broadcast_to.is_empty() { + if broadcast_to.is_empty() && !last_hop { // broadcast complete tracing::debug!( "Empty broadcast list while updating value for contract {}", key ); // means the whole tx finished so can return early - new_state = Some(PutState::AwaitingResponse { contract: key }); + new_state = Some(PutState::AwaitingResponse { + key, + upstream: Some(upstream), + }); return_msg = None; - } else { + } else if !broadcast_to.is_empty() { tracing::debug!("Callback to start broadcasting to other nodes"); new_state = Some(PutState::BroadcastOngoing); return_msg = Some(PutMsg::Broadcasting { @@ -592,6 +619,7 @@ async fn try_to_broadcast( broadcast_to, key, contract, + upstream, }); let op = PutOp { @@ -603,6 +631,12 @@ async fn try_to_broadcast( .notify_op_change(NetMessage::from(return_msg.unwrap()), OpEnum::Put(op)) .await?; return Err(OpError::StatePushed); + } else { + new_state = None; + return_msg = Some(PutMsg::SuccessfulPut { + id, + target: upstream, + }); } } _ => return Err(OpError::invalid_transition(id)), @@ -656,7 +690,8 @@ enum PutState { htl: usize, }, AwaitingResponse { - contract: ContractKey, + key: ContractKey, + upstream: Option, }, BroadcastOngoing, } @@ -694,7 +729,10 @@ pub(crate) async fn request_put(op_manager: &OpManager, mut put_op: PutOp) -> Re related_contracts, }) => { let key = contract.key(); - let new_state = Some(PutState::AwaitingResponse { contract: key }); + let new_state = Some(PutState::AwaitingResponse { + key, + upstream: None, + }); let msg = PutMsg::RequestPut { id, contract, @@ -763,14 +801,16 @@ async fn forward_put( new_value: WrappedState, id: Transaction, htl: usize, + skip_list: Vec, ) -> bool where CB: NetworkBridge, { let key = contract.key(); let contract_loc = Location::from(&key); - const EMPTY: &[PeerId] = &[]; - let forward_to = op_manager.ring.closest_potentially_caching(&key, EMPTY); + let forward_to = op_manager + .ring + .closest_potentially_caching(&key, &*skip_list); let own_pkloc = op_manager.ring.own_location(); let own_loc = own_pkloc.location.expect("infallible"); if let Some(peer) = forward_to { @@ -789,6 +829,7 @@ where contract: contract.clone(), new_value: new_value.clone(), htl, + skip_list, }) .into(), ) @@ -830,9 +871,13 @@ mod messages { new_value: WrappedState, /// current htl, reduced by one at each hop htl: usize, + skip_list: Vec, }, /// Value successfully inserted/updated. - SuccessfulPut { id: Transaction }, + SuccessfulPut { + id: Transaction, + target: PeerKeyLocation, + }, /// Target the node which is closest to the key SeekNode { id: Transaction, @@ -853,6 +898,7 @@ mod messages { key: ContractKey, new_value: WrappedState, contract: ContractContainer, + upstream: PeerKeyLocation, }, /// Broadcasting a change to a peer, which then will relay the changes to other peers. BroadcastTo { @@ -881,6 +927,7 @@ mod messages { match self { Self::SeekNode { target, .. } => Some(target), Self::RequestPut { target, .. } => Some(target), + Self::SuccessfulPut { target, .. } => Some(target), _ => None, } } diff --git a/crates/core/src/tracing.rs b/crates/core/src/tracing.rs index d4633963b..675293545 100644 --- a/crates/core/src/tracing.rs +++ b/crates/core/src/tracing.rs @@ -772,7 +772,7 @@ async fn send_to_metrics_server( let msg = PeerChange::removed_connection_msg(*from, send_msg.peer_id); ws_stream.send(Message::Binary(msg)).await } - // todo: send op events too 8put, get, update, etc) so we can keep track of transactions + // todo: send op events too (put, get, update, etc) so we can keep track of transactions _ => Ok(()), }; if let Err(error) = res { diff --git a/crates/fdev/src/network_metrics_server.rs b/crates/fdev/src/network_metrics_server.rs index ebe5f0ab2..68b7ac14a 100644 --- a/crates/fdev/src/network_metrics_server.rs +++ b/crates/fdev/src/network_metrics_server.rs @@ -72,7 +72,7 @@ async fn run_server( let router = Router::new() .route("/", get(home)) .route("/push-stats/", get(push_stats)) - .route("/pull-stats/", get(pull_stats)) + .route("/pull-stats/peer-changes/", get(pull_peer_changes)) .with_state(Arc::new(ServerState { changes, peer_data: DashMap::new(), @@ -153,7 +153,7 @@ async fn push_interface(ws: WebSocket, state: Arc) -> anyhow::Resul Ok(()) } -async fn pull_stats( +async fn pull_peer_changes( ws: WebSocketUpgrade, State(state): State>, ) -> axum::response::Response { diff --git a/network-monitor/src/app.ts b/network-monitor/src/app.ts index 5de1af9da..cdd7ee309 100644 --- a/network-monitor/src/app.ts +++ b/network-monitor/src/app.ts @@ -2,11 +2,14 @@ import * as flatbuffers from "flatbuffers"; import * as fbTopology from "./generated/topology"; import { handleChange } from "./topology"; -const socket = new WebSocket("ws://127.0.0.1:55010/pull-stats/"); +const PEER_CHANGES = new WebSocket( + "ws://127.0.0.1:55010/pull-stats/peer-changes/" +); +PEER_CHANGES.onmessage = handlePeerChanges; -socket.onmessage = handleMessage; +// const DELIVER_MESSAGE = new WebSocket("ws://127.0.0.1:55010/pull-stats/network-events/"); -function handleMessage(event: MessageEvent) { +function handlePeerChanges(event: MessageEvent) { const data = event.data as Blob; convertBlobToUint8Array(data) .then((uint8Array) => { diff --git a/schemas/flatbuffers/network_events.fbs b/schemas/flatbuffers/network_events.fbs new file mode 100644 index 000000000..e69de29bb diff --git a/stdlib b/stdlib index 73121030e..9b8ff58ae 160000 --- a/stdlib +++ b/stdlib @@ -1 +1 @@ -Subproject commit 73121030e7dbc84379dfeed80a21c6188f520d34 +Subproject commit 9b8ff58ae58bffb22aed9f9b41984bb173fc5b2c From 81f3b50f6d7412c15739d75ce0e4422f95f764ea Mon Sep 17 00:00:00 2001 From: Ignacio Duart Date: Thu, 14 Dec 2023 20:01:47 +0100 Subject: [PATCH 3/7] Clean up unnecessary subscription handling at contract handler --- crates/core/src/contract.rs | 17 --------- crates/core/src/contract/executor.rs | 18 +++++----- .../src/contract/executor/mock_runtime.rs | 19 +--------- crates/core/src/contract/executor/runtime.rs | 36 ++++++++----------- crates/core/src/contract/handler.rs | 26 -------------- crates/core/src/operations/subscribe.rs | 16 +++------ crates/core/src/ring.rs | 4 +++ 7 files changed, 34 insertions(+), 102 deletions(-) diff --git a/crates/core/src/contract.rs b/crates/core/src/contract.rs index b1aea2d28..7748709ea 100644 --- a/crates/core/src/contract.rs +++ b/crates/core/src/contract.rs @@ -111,23 +111,6 @@ where error })?; } - ContractHandlerEvent::Subscribe { key } => { - let response = contract_handler - .executor() - .subscribe_to_contract(key.clone()) - .await; - contract_handler - .channel() - .send_to_sender( - id, - ContractHandlerEvent::SubscribeResponse { key, response }, - ) - .await - .map_err(|error| { - tracing::debug!(%error, "shutting down contract handler"); - error - })?; - } _ => unreachable!(), } } diff --git a/crates/core/src/contract/executor.rs b/crates/core/src/contract/executor.rs index 6f28a856f..cddf058b6 100644 --- a/crates/core/src/contract/executor.rs +++ b/crates/core/src/contract/executor.rs @@ -25,7 +25,6 @@ use crate::node::OpManager; ))] use crate::operations::get::GetResult; use crate::operations::{OpEnum, OpError}; -use crate::ring::PeerKeyLocation; use crate::wasm_runtime::{ ContractRuntimeInterface, ContractStore, DelegateRuntimeInterface, DelegateStore, Runtime, SecretsStore, StateStore, StateStoreError, @@ -220,8 +219,16 @@ impl ExecutorToEventLoopChannel { { let op = message.initiate_op(&self.op_manager); let tx = *op.id(); - self.end.waiting_for_op_tx.send(tx).await?; - >::resume_op(op, &self.op_manager).await?; + self.end.waiting_for_op_tx.send(tx).await.map_err(|e| { + tracing::debug!("failed to send request to executor, channel closed"); + e + })?; + >::resume_op(op, &self.op_manager) + .await + .map_err(|e| { + tracing::debug!("failed to resume operation: {e}"); + e + })?; Ok(tx) } @@ -418,11 +425,6 @@ pub(crate) trait ContractExecutor: Send + 'static { related_contracts: RelatedContracts<'static>, code: Option, ) -> Result; - - async fn subscribe_to_contract( - &mut self, - key: ContractKey, - ) -> Result; } /// A WASM executor which will run any contracts, delegates, etc. registered. diff --git a/crates/core/src/contract/executor/mock_runtime.rs b/crates/core/src/contract/executor/mock_runtime.rs index ad89c3797..3a90caca8 100644 --- a/crates/core/src/contract/executor/mock_runtime.rs +++ b/crates/core/src/contract/executor/mock_runtime.rs @@ -84,7 +84,7 @@ impl ContractExecutor for Executor { &mut self, key: ContractKey, state: Either>, - related_contracts: RelatedContracts<'static>, + _related_contracts: RelatedContracts<'static>, code: Option, ) -> Result { // todo: instead allow to perform mutations per contract based on incoming value so we can track @@ -95,28 +95,11 @@ impl ContractExecutor for Executor { .store(key, incoming_state.clone(), contract.params().into_owned()) .await .map_err(ExecutorError::other)?; - - let request = PutContract { - contract, - state: incoming_state.clone(), - related_contracts, - }; - let _op: Result = self.op_request(request).await; - return Ok(incoming_state); } _ => unreachable!(), } } - - async fn subscribe_to_contract( - &mut self, - key: ContractKey, - ) -> Result { - let request = SubscribeContract { key }; - let result: operations::subscribe::SubscribeResult = self.op_request(request).await?; - Ok(result.subscribed_to) - } } #[cfg(test)] diff --git a/crates/core/src/contract/executor/runtime.rs b/crates/core/src/contract/executor/runtime.rs index 864191e5d..1cd5ccd8a 100644 --- a/crates/core/src/contract/executor/runtime.rs +++ b/crates/core/src/contract/executor/runtime.rs @@ -50,10 +50,12 @@ impl ContractExecutor for Executor { })? }; - let contract = if let Some(code) = self.runtime.contract_store.fetch_contract(&key, ¶ms) + let remove_if_fail = if self + .runtime + .contract_store + .fetch_contract(&key, ¶ms) + .is_none() { - code - } else { let code = code.ok_or_else(|| { ExecutorError::request(StdContractError::MissingContract { key: key.clone().into(), @@ -63,7 +65,9 @@ impl ContractExecutor for Executor { .contract_store .store_contract(code.clone()) .map_err(ExecutorError::other)?; - code + true + } else { + false }; let mut updates = match update { @@ -72,7 +76,9 @@ impl ContractExecutor for Executor { .runtime .validate_state(&key, ¶ms, &incoming_state, &related_contracts) .map_err(|err| { - let _ = self.runtime.contract_store.remove_contract(&key); + if remove_if_fail { + let _ = self.runtime.contract_store.remove_contract(&key); + } ExecutorError::other(err) })?; match result { @@ -91,13 +97,6 @@ impl ContractExecutor for Executor { } } - let request = PutContract { - contract, - state: incoming_state.clone(), - related_contracts: related_contracts.clone(), - }; - let _op: operations::put::PutResult = self.op_request(request).await?; - vec![UpdateData::State(incoming_state.clone().into())] } Either::Right(delta) => { @@ -105,7 +104,9 @@ impl ContractExecutor for Executor { .runtime .validate_delta(&key, ¶ms, &delta) .map_err(|err| { - let _ = self.runtime.contract_store.remove_contract(&key); + if remove_if_fail { + let _ = self.runtime.contract_store.remove_contract(&key); + } ExecutorError::other(err) })?; if !valid { @@ -155,15 +156,6 @@ impl ContractExecutor for Executor { }; Ok(updated_state) } - - async fn subscribe_to_contract( - &mut self, - key: ContractKey, - ) -> Result { - let request = SubscribeContract { key }; - let result: operations::subscribe::SubscribeResult = self.op_request(request).await?; - Ok(result.subscribed_to) - } } impl Executor { diff --git a/crates/core/src/contract/handler.rs b/crates/core/src/contract/handler.rs index ae217e604..02a3d7240 100644 --- a/crates/core/src/contract/handler.rs +++ b/crates/core/src/contract/handler.rs @@ -17,7 +17,6 @@ use super::{ }; use crate::client_events::HostResult; use crate::message::Transaction; -use crate::ring::PeerKeyLocation; use crate::{client_events::ClientId, node::PeerCliConfig, wasm_runtime::Runtime, DynError}; pub(crate) struct ClientResponsesReceiver(UnboundedReceiver<(ClientId, HostResult)>); @@ -357,14 +356,6 @@ pub(crate) enum ContractHandlerEvent { key: ContractKey, response: Result, }, - /// Subscribe to a contract. - Subscribe { key: ContractKey }, - /// The response to a subscribe event - SubscribeResponse { - key: ContractKey, - /// If successful, returns the peer to which it subscribed to for updates. - response: Result, - }, } impl std::fmt::Display for ContractHandlerEvent { @@ -399,23 +390,6 @@ impl std::fmt::Display for ContractHandlerEvent { write!(f, "get query failed {{ {key} }}",) } }, - ContractHandlerEvent::Subscribe { key } => { - write!(f, "subscribe {{ {key} }}") - } - ContractHandlerEvent::SubscribeResponse { key, response } => match response { - Ok(_) => { - write!(f, "subscribe response {{ {key} }}",) - } - Err(_) => { - write!(f, "subscribe failed {{ {key} }}",) - } - }, - // ContractHandlerEvent::Cache(container) => { - // write!(f, "caching {{ {} }}", container.key()) - // } - // ContractHandlerEvent::CacheResult(r) => { - // write!(f, "caching result {{ {} }}", r.is_ok()) - // } } } } diff --git a/crates/core/src/operations/subscribe.rs b/crates/core/src/operations/subscribe.rs index cd6f8478e..40a854bcc 100644 --- a/crates/core/src/operations/subscribe.rs +++ b/crates/core/src/operations/subscribe.rs @@ -32,21 +32,17 @@ enum SubscribeState { retries: usize, upstream_subscriber: Option, }, - Completed { - subscribed_to: PeerKeyLocation, - }, + Completed {}, } -pub(crate) struct SubscribeResult { - pub subscribed_to: PeerKeyLocation, -} +pub(crate) struct SubscribeResult {} impl TryFrom for SubscribeResult { type Error = OpError; fn try_from(value: SubscribeOp) -> Result { - if let Some(SubscribeState::Completed { subscribed_to }) = value.state { - Ok(SubscribeResult { subscribed_to }) + if let Some(SubscribeState::Completed {}) = value.state { + Ok(SubscribeResult {}) } else { Err(OpError::UnexpectedOpState) } @@ -365,9 +361,7 @@ impl Operation for SubscribeOp { ); op_manager.ring.add_subscription(key.clone(), *sender); - new_state = Some(SubscribeState::Completed { - subscribed_to: *sender, - }); + new_state = Some(SubscribeState::Completed {}); if let Some(upstream_subscriber) = upstream_subscriber { return_msg = Some(SubscribeMsg::ReturnSub { id: *id, diff --git a/crates/core/src/ring.rs b/crates/core/src/ring.rs index d74bf1c11..50e976f5d 100644 --- a/crates/core/src/ring.rs +++ b/crates/core/src/ring.rs @@ -340,7 +340,11 @@ impl Ring { pub fn within_subscribing_distance(&self, loc: &Location) -> bool { const CACHING_DISTANCE: f64 = 0.05; const MAX_CACHED: usize = 100; + const MIN_CACHED: usize = MAX_CACHED / 4; let caching_distance = Distance::new(CACHING_DISTANCE); + if self.subscriptions.len() < MIN_CACHED { + return true; + } self.subscriptions.len() < MAX_CACHED && self .own_location() From b161b0f819e390bcbb09518e8fdea0f93e53f622 Mon Sep 17 00:00:00 2001 From: Ignacio Duart Date: Fri, 15 Dec 2023 13:20:52 +0100 Subject: [PATCH 4/7] Actually return to client the host response + forward get always is possible --- crates/core/src/contract.rs | 4 +- .../src/contract/executor/mock_runtime.rs | 29 +- crates/core/src/contract/executor/runtime.rs | 26 +- crates/core/src/contract/handler.rs | 6 +- crates/core/src/node.rs | 14 +- .../src/node/network_bridge/p2p_protoc.rs | 2 +- crates/core/src/node/testing_impl.rs | 2 +- crates/core/src/operations.rs | 61 ++-- crates/core/src/operations/connect.rs | 7 + crates/core/src/operations/get.rs | 341 +++++++++--------- crates/core/src/operations/put.rs | 53 +-- crates/core/src/operations/subscribe.rs | 17 +- crates/core/src/operations/update.rs | 6 +- 13 files changed, 319 insertions(+), 249 deletions(-) diff --git a/crates/core/src/contract.rs b/crates/core/src/contract.rs index 7748709ea..1b4ea3e39 100644 --- a/crates/core/src/contract.rs +++ b/crates/core/src/contract.rs @@ -39,11 +39,11 @@ where match contract_handler .executor() .fetch_contract(key.clone(), fetch_contract) - .instrument(tracing::info_span!("fetch_contract", %key)) + .instrument(tracing::info_span!("fetch_contract", %key, %fetch_contract)) .await { Ok((state, contract)) => { - tracing::debug!("Fetched contract {key}"); + tracing::debug!(with_contract = %fetch_contract, has_contract = %contract.is_some(), "Fetched contract {key}"); contract_handler .channel() .send_to_sender( diff --git a/crates/core/src/contract/executor/mock_runtime.rs b/crates/core/src/contract/executor/mock_runtime.rs index 3a90caca8..2d75c3f5a 100644 --- a/crates/core/src/contract/executor/mock_runtime.rs +++ b/crates/core/src/contract/executor/mock_runtime.rs @@ -48,7 +48,7 @@ impl ContractExecutor for Executor { async fn fetch_contract( &mut self, key: ContractKey, - _fetch_contract: bool, + fetch_contract: bool, ) -> Result<(WrappedState, Option), ExecutorError> { let Some(parameters) = self .state_store @@ -57,18 +57,21 @@ impl ContractExecutor for Executor { .map_err(ExecutorError::other)? else { return Err(ExecutorError::other(format!( - "missing parameters for contract {key}" + "missing state and/or parameters for contract {key}" ))); }; + let contract = if fetch_contract { + self.runtime + .contract_store + .fetch_contract(&key, ¶meters) + } else { + None + }; let Ok(state) = self.state_store.get(&key).await else { return Err(ExecutorError::other(format!( "missing state for contract {key}" ))); }; - let contract = self - .runtime - .contract_store - .fetch_contract(&key, ¶meters); Ok((state, contract)) } @@ -91,13 +94,25 @@ impl ContractExecutor for Executor { // state values over the network match (state, code) { (Either::Left(incoming_state), Some(contract)) => { + self.runtime + .contract_store + .store_contract(contract.clone()) + .map_err(ExecutorError::other)?; self.state_store .store(key, incoming_state.clone(), contract.params().into_owned()) .await .map_err(ExecutorError::other)?; return Ok(incoming_state); } - _ => unreachable!(), + // (Either::Left(_), None) => { + // return Err(ExecutorError::request(RequestError::from( + // StdContractError::Get { + // key: key.clone(), + // cause: "Missing contract or parameters".into(), + // }, + // ))); + // } + (update, contract) => unreachable!("{update:?}, {contract:?}"), } } } diff --git a/crates/core/src/contract/executor/runtime.rs b/crates/core/src/contract/executor/runtime.rs index 1cd5ccd8a..f8817a787 100644 --- a/crates/core/src/contract/executor/runtime.rs +++ b/crates/core/src/contract/executor/runtime.rs @@ -7,6 +7,8 @@ impl ContractExecutor for Executor { key: ContractKey, fetch_contract: bool, ) -> Result<(WrappedState, Option), ExecutorError> { + // FIXME: this logic shouldn't be the same as when requested from apps + // since we don't have to try get from network when is not present locally! match self.perform_contract_get(fetch_contract, key).await { Ok(HostResponse::ContractResponse(ContractResponse::GetResponse { contract, @@ -591,7 +593,9 @@ impl Executor { { let state = match self.local_state_or_from_network(&id).await? { Either::Left(state) => state, - Either::Right(GetResult { state, contract }) => { + Either::Right(GetResult { + state, contract, .. + }) => { let Some(contract) = contract else { return Err(ExecutorError::request( RequestError::ContractError(StdContractError::Get { @@ -658,7 +662,7 @@ impl Executor { Ok(new_state) } - async fn perform_contract_get(&mut self, contract: bool, key: ContractKey) -> Response { + async fn perform_contract_get(&mut self, fetch_contract: bool, key: ContractKey) -> Response { let mut got_contract = None; #[cfg(any( @@ -666,23 +670,23 @@ impl Executor { all(feature = "local-mode", feature = "network-mode"), ))] { - if contract && self.mode == OperationMode::Local { + if fetch_contract && self.mode == OperationMode::Local { let Some(contract) = self.get_contract_locally(&key).await? else { return Err(ExecutorError::request(RequestError::from( StdContractError::Get { key: key.clone(), - cause: "Missing contract or parameters".into(), + cause: "Missing contract and/or parameters".into(), }, ))); }; got_contract = Some(contract); - } else if contract { + } else if fetch_contract { got_contract = self.get_contract_from_network(key.clone()).await?; } } #[cfg(all(feature = "local-mode", not(feature = "network-mode")))] - if contract { + if fetch_contract { let Some(contract) = self .get_contract_locally(&key) .await @@ -695,13 +699,13 @@ impl Executor { }, ))); }; - got_contract = Some(contract); + got_contract = Some(fetch_contract); } #[cfg(all(feature = "network-mode", not(feature = "local-mode")))] - if contract { + if fetch_contract { if let Ok(Some(contract)) = self.get_contract_locally(&key).await { - got_contract = Some(contract); + got_contract = Some(fetch_contract); } else { got_contract = self.get_contract_from_network(key.clone()).await?; } @@ -989,7 +993,9 @@ impl Executor { .local_state_or_from_network(&key.clone().into()) .await? { - Either::Right(GetResult { state, contract }) => { + Either::Right(GetResult { + state, contract, .. + }) => { let Some(contract) = contract else { return Err(ExecutorError::request(RequestError::ContractError( StdContractError::Get { diff --git a/crates/core/src/contract/handler.rs b/crates/core/src/contract/handler.rs index 02a3d7240..d5df42e5f 100644 --- a/crates/core/src/contract/handler.rs +++ b/crates/core/src/contract/handler.rs @@ -247,7 +247,9 @@ pub(crate) fn contract_handler_channel() -> ( static EV_ID: AtomicU64 = AtomicU64::new(0); impl ContractHandlerChannel { - pub async fn recv_from_client_event(&mut self) -> Result<(ClientId, Transaction), DynError> { + pub async fn relay_transaction_result_to_client( + &mut self, + ) -> Result<(ClientId, Transaction), DynError> { self.end .wait_for_res_rx .recv() @@ -281,7 +283,7 @@ impl ContractHandlerChannel { } } - pub async fn waiting_for_transaction( + pub async fn waiting_for_transaction_result( &self, transaction: Transaction, client_id: ClientId, diff --git a/crates/core/src/node.rs b/crates/core/src/node.rs index 3829d1558..dbffb9b95 100644 --- a/crates/core/src/node.rs +++ b/crates/core/src/node.rs @@ -10,6 +10,7 @@ use std::{ fmt::Display, + io::Write, net::{IpAddr, Ipv4Addr}, path::PathBuf, str::FromStr, @@ -378,7 +379,7 @@ async fn process_open_request(request: OpenRequest<'static>, op_manager: Arc, op_manager: Arc, op_manager: Arc { if let Some((client_id, cb)) = client_req_handler_callback { - let _ = cb.send((client_id, op_res.to_host_result(client_id))); + let _ = cb.send((client_id, op_res.to_host_result())); } // check operations.rs:handle_op_result to see what's the meaning of each state // in case more cases want to be handled when feeding information to the OpManager @@ -568,8 +569,9 @@ async fn report_result( second_trace_lines.join("\n") }) .unwrap_or_default(); - tracing::error!(%tx, ?state, "Wrong state"); - eprintln!("Operation error trace:\n{trace}"); + let log = + format!("Transaction ({tx}) error trace:\n {trace} \nstate:\n {state:?}\n"); + std::io::stderr().write_all(log.as_bytes()).unwrap(); } #[cfg(not(any(debug_assertions, test)))] { diff --git a/crates/core/src/node/network_bridge/p2p_protoc.rs b/crates/core/src/node/network_bridge/p2p_protoc.rs index c452bee85..061854e9a 100644 --- a/crates/core/src/node/network_bridge/p2p_protoc.rs +++ b/crates/core/src/node/network_bridge/p2p_protoc.rs @@ -389,7 +389,7 @@ impl P2pConnManager { Ok(Right(ClosedChannel)) } } - event_id = client_wait_for_transaction.recv_from_client_event() => { + event_id = client_wait_for_transaction.relay_transaction_result_to_client() => { let (client_id, transaction) = event_id.map_err(|err| anyhow::anyhow!(err))?; tx_to_client.insert(transaction, client_id); continue; diff --git a/crates/core/src/node/testing_impl.rs b/crates/core/src/node/testing_impl.rs index f238c48a5..598d08edf 100644 --- a/crates/core/src/node/testing_impl.rs +++ b/crates/core/src/node/testing_impl.rs @@ -1070,7 +1070,7 @@ where anyhow::bail!("node controller channel shutdown, fatal error"); } } - event_id = wait_for_event.recv_from_client_event() => { + event_id = wait_for_event.relay_transaction_result_to_client() => { if let Ok((client_id, transaction)) = event_id { tx_to_client.insert(transaction, client_id); } diff --git a/crates/core/src/operations.rs b/crates/core/src/operations.rs index 6cd0a5858..51843ee71 100644 --- a/crates/core/src/operations.rs +++ b/crates/core/src/operations.rs @@ -6,7 +6,7 @@ use futures::{future::BoxFuture, Future}; use tokio::sync::mpsc::error::SendError; use crate::{ - client_events::{ClientId, HostResult}, + client_events::HostResult, contract::{ContractError, ExecutorError}, message::{InnerMessage, NetMessage, Transaction, TransactionType}, node::{ConnectionError, NetworkBridge, OpManager, OpNotAvailable, PeerId}, @@ -19,6 +19,31 @@ pub(crate) mod put; pub(crate) mod subscribe; pub(crate) mod update; +pub(crate) trait Operation +where + Self: Sized + TryInto, +{ + type Message: InnerMessage + std::fmt::Display; + + type Result; + + fn load_or_init<'a>( + op_manager: &'a OpManager, + msg: &'a Self::Message, + ) -> BoxFuture<'a, Result, OpError>>; + + fn id(&self) -> &Transaction; + + #[allow(clippy::type_complexity)] + fn process_message<'a, CB: NetworkBridge>( + self, + conn_manager: &'a mut CB, + op_manager: &'a OpManager, + input: &'a Self::Message, + // client_id: Option, + ) -> Pin> + Send + 'a>>; +} + pub(crate) struct OperationResult { /// Inhabited if there is a message to return to the other peer. pub return_msg: Option, @@ -144,12 +169,9 @@ impl OpEnum { pub fn outcome(&self) -> OpOutcome; pub fn finalized(&self) -> bool; pub fn record_transfer(&mut self); + pub fn to_host_result(&self) -> HostResult; } } - - pub fn to_host_result(&self, _client_id: ClientId) -> HostResult { - todo!() - } } macro_rules! try_from_op_enum { @@ -277,27 +299,10 @@ impl From> for OpError { } } -pub(crate) trait Operation -where - Self: Sized + TryInto, -{ - type Message: InnerMessage + std::fmt::Display; - - type Result; - - fn load_or_init<'a>( - op_manager: &'a OpManager, - msg: &'a Self::Message, - ) -> BoxFuture<'a, Result, OpError>>; - - fn id(&self) -> &Transaction; - - #[allow(clippy::type_complexity)] - fn process_message<'a, CB: NetworkBridge>( - self, - conn_manager: &'a mut CB, - op_manager: &'a OpManager, - input: &'a Self::Message, - // client_id: Option, - ) -> Pin> + Send + 'a>>; +#[inline] +async fn start_subscription(op_manager: &OpManager, key: freenet_stdlib::prelude::ContractKey) { + let op = subscribe::start_op(key.clone()); + if let Err(error) = subscribe::request_subscribe(op_manager, op).await { + tracing::warn!(%error, "Error subscribing to contract"); + } } diff --git a/crates/core/src/operations/connect.rs b/crates/core/src/operations/connect.rs index 8afbaa588..d6b6931c5 100644 --- a/crates/core/src/operations/connect.rs +++ b/crates/core/src/operations/connect.rs @@ -1,10 +1,12 @@ //! Operation which seeks new connections in the ring. +use freenet_stdlib::client_api::HostResponse; use futures::future::BoxFuture; use futures::{Future, FutureExt}; use std::pin::Pin; use std::{collections::HashSet, time::Duration}; use super::{OpError, OpInitialization, OpOutcome, Operation, OperationResult}; +use crate::client_events::HostResult; use crate::{ message::{InnerMessage, NetMessage, Transaction}, node::{ConnectionError, NetworkBridge, OpManager, PeerId}, @@ -37,6 +39,11 @@ impl ConnectOp { } pub(super) fn record_transfer(&mut self) {} + + pub(super) fn to_host_result(&self) -> HostResult { + // this should't ever be called since clients can't request explicit connects + Ok(HostResponse::Ok) + } } /// Not really used since client requests will never interact with this directly. diff --git a/crates/core/src/operations/get.rs b/crates/core/src/operations/get.rs index d2c35e32a..47fa7ca1e 100644 --- a/crates/core/src/operations/get.rs +++ b/crates/core/src/operations/get.rs @@ -1,13 +1,14 @@ use std::pin::Pin; use std::{future::Future, time::Instant}; +use freenet_stdlib::client_api::{ErrorKind, HostResponse}; use freenet_stdlib::prelude::*; use futures::future::BoxFuture; use futures::FutureExt; -use crate::contract::ExecutorError; +use crate::client_events::HostResult; use crate::{ - contract::{ContractError, ContractHandlerEvent, StoreResponse}, + contract::{ContractHandlerEvent, StoreResponse}, message::{InnerMessage, NetMessage, Transaction}, node::{NetworkBridge, OpManager, PeerId}, operations::{OpInitialization, Operation}, @@ -149,6 +150,7 @@ enum RecordingStats { } pub(crate) struct GetResult { + key: ContractKey, pub state: WrappedState, pub contract: Option, } @@ -174,7 +176,9 @@ pub(crate) struct GetOp { impl GetOp { pub(super) fn outcome(&self) -> OpOutcome { if let Some(( - GetResult { state, contract }, + GetResult { + state, contract, .. + }, GetStats { next_peer: Some(target_peer), contract_location, @@ -232,6 +236,26 @@ impl GetOp { } } } + + pub(super) fn to_host_result(&self) -> HostResult { + match &self.result { + Some(GetResult { + key, + state, + contract, + }) => Ok(HostResponse::ContractResponse( + freenet_stdlib::client_api::ContractResponse::GetResponse { + key: key.clone(), + contract: contract.clone(), + state: state.clone(), + }, + )), + None => Err(ErrorKind::OperationError { + cause: "get didn't finish successfully".into(), + } + .into()), + } + } } impl Operation for GetOp { @@ -281,7 +305,7 @@ impl Operation for GetOp { fn process_message<'a, NB: NetworkBridge>( self, - conn_manager: &'a mut NB, + _conn_manager: &'a mut NB, op_manager: &'a OpManager, input: &'a Self::Message, ) -> Pin> + Send + 'a>> { @@ -338,159 +362,95 @@ impl Operation for GetOp { let fetch_contract = *fetch_contract; let this_peer = *target; - let is_cached_contract = op_manager.ring.is_subscribed_to_contract(&key); + let is_subscribed_contract = op_manager.ring.is_subscribed_to_contract(&key); if let Some(s) = stats.as_mut() { s.next_peer = Some(this_peer); } - if !is_cached_contract { - tracing::warn!( - tx = %id, - %key, - this_peer = %this_peer.peer, - "Contract not found while processing a get request", - ); - - if htl == 0 { - tracing::warn!( - tx = %id, - sender = %sender.peer, - "The maximum hops has been exceeded, sending error \ - back to the node", - ); - - return build_op_result( - self.id, - None, - Some(GetMsg::ReturnGet { - key, - id, - value: StoreResponse { - state: None, - contract: None, - }, - sender: op_manager.ring.own_location(), - target: *sender, // return to requester - skip_list: skip_list.clone(), - }), - None, - stats, - ); - } - - let new_htl = htl - 1; - let Some(new_target) = op_manager + if !is_subscribed_contract + && op_manager .ring - .closest_potentially_caching(&key, [&sender.peer].as_slice()) - else { - tracing::warn!( - tx = %id, - %key, - this_peer = %this_peer.peer, - "No other peers found while trying getting contract", - ); - return Err(OpError::RingError(RingError::NoCachingPeers(key))); - }; - let mut new_skip_list = skip_list.clone(); - new_skip_list.push(target.peer); - continue_seeking( - conn_manager, - &new_target, - (GetMsg::SeekNode { - id, - key, - fetch_contract, - sender: this_peer, - target: new_target, - htl: new_htl, - skip_list: new_skip_list, - }) - .into(), - ) - .await?; + .within_subscribing_distance(&Location::from(&key)) + { + tracing::debug!(tx = %id, %key, peer = %op_manager.ring.peer_key, "Contract not cached @ peer, caching"); + super::start_subscription(op_manager, key.clone()).await; + } - return_msg = None; - new_state = Some(GetState::AwaitingResponse { - requester: Some(*sender), - retries: 0, - fetch_contract, - }); - } else if let ContractHandlerEvent::GetResponse { - key: returned_key, - response: value, - } = op_manager + let get_result = op_manager .notify_contract_handler(ContractHandlerEvent::GetQuery { key: key.clone(), fetch_contract, }) - .await? - { - match check_contract_found( - key.clone(), - id, - fetch_contract, - &value, - returned_key.clone(), - ) { - Ok(_) => {} - Err(err) => return Err(err), + .await; + + let (returned_key, value) = match get_result { + Ok(ContractHandlerEvent::GetResponse { key, response }) => (key, response), + _ => { + return try_forward_or_return( + id, + key, + (htl, fetch_contract), + (this_peer, *sender), + skip_list, + op_manager, + stats, + ) + .await; } + }; - tracing::debug!(tx = %id, "Contract {returned_key} found @ peer {}", target.peer); - - match self.state { - Some(GetState::AwaitingResponse { requester, .. }) => { - if let Some(requester) = requester { - tracing::debug!(tx = %id, "Returning contract {} to {}", key, sender.peer); - new_state = None; - let value = match value { - Ok(res) => res, - Err(err) => { - tracing::error!(tx = %id, "error: {err}"); - return Err(OpError::ExecutorError(err)); - } - }; - return_msg = Some(GetMsg::ReturnGet { - id, - key, - value, - sender: *target, - target: requester, - skip_list: skip_list.clone(), - }); - } else { - tracing::debug!( - tx = %id, - "Completed operation, get response received for contract {key}" - ); - // Completed op - new_state = None; - return_msg = None; - } - } - Some(GetState::ReceivedRequest) => { - tracing::debug!(tx = %id, "Returning contract {} to {}", key, sender.peer); + tracing::debug!(tx = %id, "Contract {returned_key} found @ peer {}", target.peer); + + match self.state { + Some(GetState::AwaitingResponse { requester, .. }) => { + if let Some(requester) = requester { new_state = None; let value = match value { Ok(res) => res, - Err(err) => { - tracing::error!(tx = %id, "error: {err}"); - return Err(OpError::ExecutorError(err)); + Err(error) => { + tracing::error!(tx = %id, %error, "Error while getting contract from storage"); + return Err(OpError::ExecutorError(error)); } }; + tracing::debug!(tx = %id, "Returning contract {} to {}", key, sender.peer); return_msg = Some(GetMsg::ReturnGet { id, key, value, sender: *target, - target: *sender, + target: requester, skip_list: skip_list.clone(), }); + } else { + tracing::debug!( + tx = %id, + "Completed operation, get response received for contract {key}" + ); + // Completed op + new_state = None; + return_msg = None; } - _ => return Err(OpError::invalid_transition(self.id)), - }; - } else { - return Err(OpError::invalid_transition(id)); + } + Some(GetState::ReceivedRequest) => { + new_state = None; + let value = match value { + Ok(res) => res, + Err(error) => { + tracing::error!(tx = %id, %error, "Error while getting contract from storage"); + return Err(OpError::ExecutorError(error)); + } + }; + tracing::debug!(tx = %id, "Returning contract {} to {}", key, sender.peer); + return_msg = Some(GetMsg::ReturnGet { + id, + key, + value, + sender: *target, + target: *sender, + skip_list: skip_list.clone(), + }); + } + _ => return Err(OpError::invalid_transition(self.id)), } } GetMsg::ReturnGet { @@ -668,6 +628,7 @@ impl Operation for GetOp { new_state = None; return_msg = None; result = Some(GetResult { + key: key.clone(), state: value.clone(), contract: contract.clone(), }); @@ -676,6 +637,7 @@ impl Operation for GetOp { new_state = None; return_msg = None; result = Some(GetResult { + key: key.clone(), state: value.clone(), contract: contract.clone(), }); @@ -726,49 +688,90 @@ fn build_op_result( }) } -async fn continue_seeking( - conn_manager: &mut NB, - new_target: &PeerKeyLocation, - retry_msg: NetMessage, -) -> Result<(), OpError> { - tracing::debug!( - tx = %retry_msg.id(), - "Forwarding get request to {}", - new_target.peer +async fn try_forward_or_return( + id: Transaction, + key: ContractKey, + (htl, fetch_contract): (usize, bool), + (this_peer, sender): (PeerKeyLocation, PeerKeyLocation), + skip_list: &[PeerId], + op_manager: &OpManager, + stats: Option, +) -> Result { + tracing::warn!( + tx = %id, + %key, + this_peer = %this_peer.peer, + "Contract not found while processing a get request", ); - conn_manager.send(&new_target.peer, retry_msg).await?; - Ok(()) -} -fn check_contract_found( - key: ContractKey, - id: Transaction, - fetch_contract: bool, - value: &Result, - returned_key: ContractKey, -) -> Result<(), OpError> { - if returned_key != key { - // shouldn't be a reachable path - tracing::error!( + let mut new_skip_list = skip_list.to_vec(); + new_skip_list.push(this_peer.peer); + + let new_htl = htl - 1; + if new_htl == 0 { + tracing::warn!( tx = %id, - "contract retrieved ({}) and asked ({}) are not the same", - returned_key, - key + sender = %sender.peer, + "The maximum hops has been exceeded, sending error \ + back to the node", ); - return Err(OpError::invalid_transition(id)); - } - match &value { - Ok(StoreResponse { - state: None, - contract: None, - }) => Err(OpError::ContractError(ContractError::ContractNotFound(key))), - Ok(StoreResponse { - state: Some(_), - contract: None, - }) if fetch_contract => Err(OpError::ContractError(ContractError::ContractNotFound(key))), - _ => Ok(()), + return build_op_result( + id, + None, + Some(GetMsg::ReturnGet { + key, + id, + value: StoreResponse { + state: None, + contract: None, + }, + sender: op_manager.ring.own_location(), + target: sender, // return to requester + skip_list: new_skip_list, + }), + None, + stats, + ); } + + let Some(new_target) = op_manager + .ring + .closest_potentially_caching(&key, [&sender.peer].as_slice()) + else { + tracing::warn!( + tx = %id, + %key, + this_peer = %this_peer.peer, + "No other peers found while trying getting contract", + ); + return Err(OpError::RingError(RingError::NoCachingPeers(key))); + }; + + tracing::debug!( + tx = %id, + "Forwarding get request to {}", + new_target.peer + ); + build_op_result( + id, + Some(GetState::AwaitingResponse { + requester: Some(sender), + retries: 0, + fetch_contract, + }), + Some(GetMsg::SeekNode { + id, + key, + fetch_contract, + sender: this_peer, + target: new_target, + htl: new_htl, + skip_list: new_skip_list, + }), + None, + stats, + ) } mod messages { diff --git a/crates/core/src/operations/put.rs b/crates/core/src/operations/put.rs index 83f61ef71..e428c8423 100644 --- a/crates/core/src/operations/put.rs +++ b/crates/core/src/operations/put.rs @@ -7,12 +7,16 @@ use std::pin::Pin; use std::time::Instant; pub(crate) use self::messages::PutMsg; -use freenet_stdlib::prelude::*; +use freenet_stdlib::{ + client_api::{ErrorKind, HostResponse}, + prelude::*, +}; use futures::future::BoxFuture; use futures::FutureExt; use super::{OpEnum, OpError, OpInitialization, OpOutcome, Operation, OperationResult}; use crate::{ + client_events::HostResult, contract::ContractHandlerEvent, message::{InnerMessage, NetMessage, Transaction}, node::{NetworkBridge, OpManager, PeerId}, @@ -60,6 +64,7 @@ impl PutOp { .as_ref() .map(|s| matches!(s.step, RecordingStats::Completed)) .unwrap_or(false) + || matches!(self.state, Some(PutState::Finished { .. })) } pub(super) fn record_transfer(&mut self) { @@ -79,6 +84,19 @@ impl PutOp { } } } + + pub(super) fn to_host_result(&self) -> HostResult { + if let Some(PutState::Finished { key }) = &self.state { + Ok(HostResponse::ContractResponse( + freenet_stdlib::client_api::ContractResponse::PutResponse { key: key.clone() }, + )) + } else { + Err(ErrorKind::OperationError { + cause: "put didn't finish successfully".into(), + } + .into()) + } + } } struct PutStats { @@ -422,10 +440,15 @@ impl Operation for PutOp { .within_subscribing_distance(&Location::from(&key)) { tracing::debug!(tx = %id, %key, peer = %op_manager.ring.peer_key, "Contract not cached @ peer, caching"); - start_subscription(op_manager, key.clone()).await; + super::start_subscription(op_manager, key.clone()).await; } - tracing::debug!(tx = %id, "Successfully updated value for {key}"); - new_state = None; + tracing::info!( + tx = %id, + %key, + this_peer = %op_manager.ring.peer_key, + "Peer completed contract value put", + ); + new_state = Some(PutState::Finished { key }); if let Some(upstream) = upstream { return_msg = Some(PutMsg::SuccessfulPut { id: *id, @@ -437,10 +460,6 @@ impl Operation for PutOp { } _ => return Err(OpError::invalid_transition(self.id)), }; - tracing::info!( - this_peer = %op_manager.ring.peer_key, - "Peer completed contract value put", - ); } PutMsg::PutForward { id, @@ -575,14 +594,6 @@ fn build_op_result( }) } -#[inline] -async fn start_subscription(op_manager: &OpManager, key: ContractKey) { - let op = super::subscribe::start_op(key.clone()); - if let Err(error) = super::subscribe::request_subscribe(op_manager, op).await { - tracing::warn!(%error, "Error subscribing to contract"); - } -} - async fn try_to_broadcast( id: Transaction, last_hop: bool, @@ -653,10 +664,7 @@ pub(crate) fn start_op( ) -> PutOp { let key = contract.key(); let contract_location = Location::from(&key); - tracing::debug!( - "Requesting put to contract {} @ loc({contract_location})", - key, - ); + tracing::debug!(%contract_location, %key, "Requesting put"); let id = Transaction::new::(); // let payload_size = contract.data().len(); @@ -681,7 +689,7 @@ pub(crate) fn start_op( } } -enum PutState { +pub enum PutState { ReceivedRequest, PrepareRequest { contract: ContractContainer, @@ -694,6 +702,9 @@ enum PutState { upstream: Option, }, BroadcastOngoing, + Finished { + key: ContractKey, + }, } /// Request to insert/update a value into a contract. diff --git a/crates/core/src/operations/subscribe.rs b/crates/core/src/operations/subscribe.rs index 40a854bcc..59fdb88cb 100644 --- a/crates/core/src/operations/subscribe.rs +++ b/crates/core/src/operations/subscribe.rs @@ -1,12 +1,16 @@ use std::future::Future; use std::pin::Pin; -use freenet_stdlib::prelude::*; +use freenet_stdlib::{ + client_api::{ErrorKind, HostResponse}, + prelude::*, +}; use futures::{future::BoxFuture, FutureExt}; use serde::{Deserialize, Serialize}; use super::{OpEnum, OpError, OpInitialization, OpOutcome, Operation, OperationResult}; use crate::{ + client_events::HostResult, contract::ContractError, message::{InnerMessage, NetMessage, Transaction}, node::{NetworkBridge, OpManager, PeerId}, @@ -117,6 +121,17 @@ impl SubscribeOp { } pub(super) fn record_transfer(&mut self) {} + + pub(super) fn to_host_result(&self) -> HostResult { + if let Some(SubscribeState::Completed {}) = self.state { + Ok(HostResponse::Ok) + } else { + Err(ErrorKind::OperationError { + cause: "subscribe didn't finish successfully".into(), + } + .into()) + } + } } impl Operation for SubscribeOp { diff --git a/crates/core/src/operations/update.rs b/crates/core/src/operations/update.rs index 342161a50..f5e3f0dac 100644 --- a/crates/core/src/operations/update.rs +++ b/crates/core/src/operations/update.rs @@ -5,7 +5,7 @@ use futures::future::BoxFuture; use super::{OpError, OpOutcome, Operation}; use crate::{ - client_events::ClientId, + client_events::{ClientId, HostResult}, node::{NetworkBridge, OpManager}, }; @@ -23,6 +23,10 @@ impl UpdateOp { } pub fn record_transfer(&mut self) {} + + pub(super) fn to_host_result(&self) -> HostResult { + todo!() + } } pub(crate) struct UpdateResult {} From 18e51b1bfbdd21ecb279e01db6e2621f34efebdb Mon Sep 17 00:00:00 2001 From: Ignacio Duart Date: Fri, 15 Dec 2023 13:21:49 +0100 Subject: [PATCH 5/7] Only subscribe ong ets after successful get if it applies --- crates/core/src/client_events.rs | 26 ++++++++++------ crates/core/src/operations.rs | 26 +++++++++++++--- crates/core/src/operations/get.rs | 51 +++++++++++++++---------------- crates/core/src/operations/put.rs | 2 +- 4 files changed, 64 insertions(+), 41 deletions(-) diff --git a/crates/core/src/client_events.rs b/crates/core/src/client_events.rs index 037d8bdc9..ab4d48495 100644 --- a/crates/core/src/client_events.rs +++ b/crates/core/src/client_events.rs @@ -1,7 +1,7 @@ //! Clients events related logic and type definitions. For example, receival of client events from applications throught the HTTP gateway. use freenet_stdlib::client_api::ClientRequest; -use freenet_stdlib::client_api::{ClientError, HostResponse}; +use freenet_stdlib::client_api::{ClientError, ContractResponse, HostResponse}; use futures::future::BoxFuture; use std::fmt::Debug; use std::fmt::Display; @@ -300,8 +300,18 @@ pub(crate) mod test { fn send( &mut self, _id: ClientId, - _response: Result, + response: Result, ) -> BoxFuture<'_, Result<(), ClientError>> { + if let Ok(HostResponse::ContractResponse(ContractResponse::GetResponse { + key, .. + })) = response + { + self.internal_state + .as_mut() + .expect("state should be set") + .owns_contracts + .insert(key); + } async { Ok(()) }.boxed() } } @@ -347,18 +357,17 @@ pub(crate) mod test { if state.max_contract_num <= state.existing_contracts.len() { continue; } + if !for_this_peer { + continue; + } let contract = self.gen_contract_container(); let request = ContractRequest::Put { contract: contract.clone(), state: WrappedState::new(self.random_byte_vec()), related_contracts: RelatedContracts::new(), }; - let key = contract.key(); state.existing_contracts.push(contract); - if for_this_peer { - state.owns_contracts.insert(key); - return Some(request.into()); - } + return Some(request.into()); } val if (10..35).contains(&val) => { if let Some(contract) = self.choose(&state.existing_contracts) { @@ -366,10 +375,9 @@ pub(crate) mod test { continue; } let key = contract.key(); - let fetch_contract = state.owns_contracts.contains(&key); let request = ContractRequest::Get { key, - fetch_contract, + fetch_contract: true, }; return Some(request.into()); } diff --git a/crates/core/src/operations.rs b/crates/core/src/operations.rs index 51843ee71..90e41fd44 100644 --- a/crates/core/src/operations.rs +++ b/crates/core/src/operations.rs @@ -299,10 +299,26 @@ impl From> for OpError { } } -#[inline] -async fn start_subscription(op_manager: &OpManager, key: freenet_stdlib::prelude::ContractKey) { - let op = subscribe::start_op(key.clone()); - if let Err(error) = subscribe::request_subscribe(op_manager, op).await { - tracing::warn!(%error, "Error subscribing to contract"); +/// If the contract is not found, it will try to get it first if the `try_get` parameter is set. +async fn start_subscription( + op_manager: &OpManager, + key: freenet_stdlib::prelude::ContractKey, + try_get: bool, +) { + let sub_op = subscribe::start_op(key.clone()); + if let Err(error) = subscribe::request_subscribe(op_manager, sub_op).await { + if !try_get { + tracing::warn!(%error, "Error subscribing to contract"); + return; + } + if let OpError::ContractError(ContractError::ContractNotFound(_)) = &error { + tracing::debug!(%key, "Contract not found, trying to get it first"); + let get_op = get::start_op(key, true); + if let Err(error) = get::request_get(op_manager, get_op).await { + tracing::warn!(%error, "Error getting contract"); + } + } else { + tracing::warn!(%error, "Error subscribing to contract"); + } } } diff --git a/crates/core/src/operations/get.rs b/crates/core/src/operations/get.rs index 47fa7ca1e..ad8d0349f 100644 --- a/crates/core/src/operations/get.rs +++ b/crates/core/src/operations/get.rs @@ -362,20 +362,10 @@ impl Operation for GetOp { let fetch_contract = *fetch_contract; let this_peer = *target; - let is_subscribed_contract = op_manager.ring.is_subscribed_to_contract(&key); if let Some(s) = stats.as_mut() { s.next_peer = Some(this_peer); } - if !is_subscribed_contract - && op_manager - .ring - .within_subscribing_distance(&Location::from(&key)) - { - tracing::debug!(tx = %id, %key, peer = %op_manager.ring.peer_key, "Contract not cached @ peer, caching"); - super::start_subscription(op_manager, key.clone()).await; - } - let get_result = op_manager .notify_contract_handler(ContractHandlerEvent::GetQuery { key: key.clone(), @@ -599,23 +589,32 @@ impl Operation for GetOp { return Err(OpError::StatePushed); } - let res = op_manager - .notify_contract_handler(ContractHandlerEvent::PutQuery { - key: key.clone(), - state: value.clone(), - related_contracts: RelatedContracts::default(), - contract: contract.clone(), - }) - .await?; - match res { - ContractHandlerEvent::PutResponse { new_value: Ok(_) } => {} - ContractHandlerEvent::PutResponse { - new_value: Err(err), - } => { - tracing::debug!(tx = %id, error = %err, "Failed put at executor"); - return Err(OpError::ExecutorError(err)); + let is_subscribed_contract = op_manager.ring.is_subscribed_to_contract(&key); + if !is_subscribed_contract + && op_manager + .ring + .within_subscribing_distance(&Location::from(&key)) + { + tracing::debug!(tx = %id, %key, peer = %op_manager.ring.peer_key, "Contract not cached @ peer, caching"); + let res = op_manager + .notify_contract_handler(ContractHandlerEvent::PutQuery { + key: key.clone(), + state: value.clone(), + related_contracts: RelatedContracts::default(), + contract: contract.clone(), + }) + .await?; + match res { + ContractHandlerEvent::PutResponse { new_value: Ok(_) } => {} + ContractHandlerEvent::PutResponse { + new_value: Err(err), + } => { + tracing::debug!(tx = %id, error = %err, "Failed put at executor"); + return Err(OpError::ExecutorError(err)); + } + _ => unreachable!(), } - _ => unreachable!(), + super::start_subscription(op_manager, key.clone(), false).await; } match self.state { diff --git a/crates/core/src/operations/put.rs b/crates/core/src/operations/put.rs index e428c8423..21f7e8a6e 100644 --- a/crates/core/src/operations/put.rs +++ b/crates/core/src/operations/put.rs @@ -440,7 +440,7 @@ impl Operation for PutOp { .within_subscribing_distance(&Location::from(&key)) { tracing::debug!(tx = %id, %key, peer = %op_manager.ring.peer_key, "Contract not cached @ peer, caching"); - super::start_subscription(op_manager, key.clone()).await; + super::start_subscription(op_manager, key.clone(), true).await; } tracing::info!( tx = %id, From f145b71ce860713153f8120fefd7089e4a510a6f Mon Sep 17 00:00:00 2001 From: Ignacio Duart Date: Fri, 15 Dec 2023 15:54:46 +0100 Subject: [PATCH 6/7] Only do subscritions after get op if a put was successful previously --- crates/core/src/operations/get.rs | 5 +++-- stdlib | 2 +- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/crates/core/src/operations/get.rs b/crates/core/src/operations/get.rs index ad8d0349f..d07739d31 100644 --- a/crates/core/src/operations/get.rs +++ b/crates/core/src/operations/get.rs @@ -605,7 +605,9 @@ impl Operation for GetOp { }) .await?; match res { - ContractHandlerEvent::PutResponse { new_value: Ok(_) } => {} + ContractHandlerEvent::PutResponse { new_value: Ok(_) } => { + super::start_subscription(op_manager, key.clone(), false).await; + } ContractHandlerEvent::PutResponse { new_value: Err(err), } => { @@ -614,7 +616,6 @@ impl Operation for GetOp { } _ => unreachable!(), } - super::start_subscription(op_manager, key.clone(), false).await; } match self.state { diff --git a/stdlib b/stdlib index 9b8ff58ae..7729fe328 160000 --- a/stdlib +++ b/stdlib @@ -1 +1 @@ -Subproject commit 9b8ff58ae58bffb22aed9f9b41984bb173fc5b2c +Subproject commit 7729fe328f360067a8ac21fa5809ac0bf748198f From a273ce24843459aba2b8d2a17e134f1dbc0fd009 Mon Sep 17 00:00:00 2001 From: Ignacio Duart Date: Sun, 17 Dec 2023 20:10:28 +0100 Subject: [PATCH 7/7] Fixes to get and implicit subs so results are sent back --- crates/core/src/client_events.rs | 50 ++++--- crates/core/src/contract/handler.rs | 9 +- crates/core/src/node.rs | 25 +++- crates/core/src/node/testing_impl.rs | 2 +- crates/core/src/operations.rs | 6 +- crates/core/src/operations/get.rs | 185 +++++++++++++++--------- crates/core/src/operations/put.rs | 3 +- crates/core/src/operations/subscribe.rs | 24 ++- 8 files changed, 199 insertions(+), 105 deletions(-) diff --git a/crates/core/src/client_events.rs b/crates/core/src/client_events.rs index ab4d48495..d243d994e 100644 --- a/crates/core/src/client_events.rs +++ b/crates/core/src/client_events.rs @@ -324,7 +324,6 @@ pub(crate) mod test { max_iterations: usize, max_contract_num: usize, owns_contracts: HashSet, - subcribed_contract: HashSet, existing_contracts: Vec, } @@ -337,6 +336,15 @@ pub(crate) mod test { fn choose<'a, T>(&mut self, vec: &'a [T]) -> Option<&'a T>; + fn choose_random_from_iter<'a, T>( + &mut self, + mut iter: impl ExactSizeIterator + 'a, + ) -> Option<&'a T> { + let len = iter.len(); + let idx = self.gen_range(0..len); + iter.nth(idx) + } + /// The goal of this function is to generate a random event that is valid for the current /// global state of the network. /// @@ -353,13 +361,10 @@ pub(crate) mod test { state.current_iteration += 1; let for_this_peer = self.gen_range(0..state.num_peers) == state.this_peer; match self.gen_range(0..100) { - val if (0..10).contains(&val) => { + val if (0..5).contains(&val) => { if state.max_contract_num <= state.existing_contracts.len() { continue; } - if !for_this_peer { - continue; - } let contract = self.gen_contract_container(); let request = ContractRequest::Put { contract: contract.clone(), @@ -367,9 +372,12 @@ pub(crate) mod test { related_contracts: RelatedContracts::new(), }; state.existing_contracts.push(contract); + if !for_this_peer { + continue; + } return Some(request.into()); } - val if (10..35).contains(&val) => { + val if (5..35).contains(&val) => { if let Some(contract) = self.choose(&state.existing_contracts) { if !for_this_peer { continue; @@ -382,7 +390,7 @@ pub(crate) mod test { return Some(request.into()); } } - val if (35..85).contains(&val) => { + val if (35..80).contains(&val) => { if let Some(contract) = self.choose(&state.existing_contracts) { let delta = UpdateData::Delta(StateDelta::from(self.random_byte_vec())); if !for_this_peer { @@ -397,19 +405,23 @@ pub(crate) mod test { } } } - val if (85..100).contains(&val) => { - if let Some(contract) = self.choose(&state.existing_contracts) { - let key = contract.key(); - let summary = StateSummary::from(self.random_byte_vec()); - if !for_this_peer || state.subcribed_contract.contains(&key) { - continue; - } - let request = ContractRequest::Subscribe { - key, - summary: Some(summary), - }; - return Some(request.into()); + val if (80..100).contains(&val) => { + let summary = StateSummary::from(self.random_byte_vec()); + + let Some(from_existing) = self.choose(state.existing_contracts.as_slice()) + else { + continue; + }; + + let key = from_existing.key(); + if !for_this_peer { + continue; } + let request = ContractRequest::Subscribe { + key, + summary: Some(summary), + }; + return Some(request.into()); } _ => unreachable!(), } diff --git a/crates/core/src/contract/handler.rs b/crates/core/src/contract/handler.rs index d5df42e5f..32fc7740c 100644 --- a/crates/core/src/contract/handler.rs +++ b/crates/core/src/contract/handler.rs @@ -365,7 +365,14 @@ impl std::fmt::Display for ContractHandlerEvent { match self { ContractHandlerEvent::PutQuery { key, contract, .. } => { if let Some(contract) = contract { - write!(f, "put query {{ {key}, params: {:?} }}", contract.params()) + use std::fmt::Write; + let mut params = String::new(); + params.push_str("0x"); + for b in contract.params().as_ref().iter().take(8) { + write!(&mut params, "{:02x}", b)?; + } + params.push_str("..."); + write!(f, "put query {{ {key}, params: {params} }}",) } else { write!(f, "put query {{ {key} }}") } diff --git a/crates/core/src/node.rs b/crates/core/src/node.rs index dbffb9b95..a57402a08 100644 --- a/crates/core/src/node.rs +++ b/crates/core/src/node.rs @@ -331,6 +331,7 @@ async fn client_event_handling( continue; } }; + // fixme: only allow in certain modes (e.g. while testing) if let ClientRequest::Disconnect { cause } = &*req.request { node_controller.send(NodeEvent::Disconnect { cause: cause.clone() }).await.ok(); break; @@ -358,7 +359,7 @@ async fn process_open_request(request: OpenRequest<'static>, op_manager: Arc match ops { ContractRequest::Put { @@ -414,7 +415,8 @@ async fn process_open_request(request: OpenRequest<'static>, op_manager: Arc { - const TIMEOUT: Duration = Duration::from_secs(10); + const TIMEOUT: Duration = Duration::from_secs(30); + let mut missing_contract = false; let timeout = tokio::time::timeout(TIMEOUT, async { // Initialize a subscribe op. loop { @@ -433,8 +435,9 @@ async fn process_open_request(request: OpenRequest<'static>, op_manager: Arc { tracing::warn!("Still waiting for {key} contract"); @@ -442,7 +445,7 @@ async fn process_open_request(request: OpenRequest<'static>, op_manager: Arc { tracing::error!("{}", err); - break; + break Err(err); } Ok(()) => { if missing_contract { @@ -451,13 +454,21 @@ async fn process_open_request(request: OpenRequest<'static>, op_manager: Arc { + tracing::error!(%key, "Timeout while waiting for contract to start subscription"); + } + Ok(Err(error)) => { + tracing::error!(%key, %error, "Error while subscribing to contract"); + } + Ok(Ok(_)) => { + tracing::debug!(%key, "Started subscription to contract"); + } } } _ => { diff --git a/crates/core/src/node/testing_impl.rs b/crates/core/src/node/testing_impl.rs index 598d08edf..f65ae40ef 100644 --- a/crates/core/src/node/testing_impl.rs +++ b/crates/core/src/node/testing_impl.rs @@ -54,7 +54,7 @@ pub fn get_free_port() -> Result { Err(()) } -pub fn get_dynamic_port() -> u16 { +fn get_dynamic_port() -> u16 { const FIRST_DYNAMIC_PORT: u16 = 49152; const LAST_DYNAMIC_PORT: u16 = 65535; rand::thread_rng().gen_range(FIRST_DYNAMIC_PORT..LAST_DYNAMIC_PORT) diff --git a/crates/core/src/operations.rs b/crates/core/src/operations.rs index 90e41fd44..b4cec3401 100644 --- a/crates/core/src/operations.rs +++ b/crates/core/src/operations.rs @@ -300,7 +300,7 @@ impl From> for OpError { } /// If the contract is not found, it will try to get it first if the `try_get` parameter is set. -async fn start_subscription( +async fn start_subscription_request( op_manager: &OpManager, key: freenet_stdlib::prelude::ContractKey, try_get: bool, @@ -311,9 +311,9 @@ async fn start_subscription( tracing::warn!(%error, "Error subscribing to contract"); return; } - if let OpError::ContractError(ContractError::ContractNotFound(_)) = &error { + if let OpError::ContractError(ContractError::ContractNotFound(key)) = &error { tracing::debug!(%key, "Contract not found, trying to get it first"); - let get_op = get::start_op(key, true); + let get_op = get::start_op(key.clone(), true); if let Err(error) = get::request_get(op_manager, get_op).await { tracing::warn!(%error, "Error getting contract"); } diff --git a/crates/core/src/operations/get.rs b/crates/core/src/operations/get.rs index d07739d31..7d7503ec2 100644 --- a/crates/core/src/operations/get.rs +++ b/crates/core/src/operations/get.rs @@ -373,8 +373,15 @@ impl Operation for GetOp { }) .await; - let (returned_key, value) = match get_result { - Ok(ContractHandlerEvent::GetResponse { key, response }) => (key, response), + let (returned_key, contract, state) = match get_result { + Ok(ContractHandlerEvent::GetResponse { + key, + response: + Ok(StoreResponse { + state: Some(state), + contract, + }), + }) => (key, contract, state), _ => { return try_forward_or_return( id, @@ -395,18 +402,14 @@ impl Operation for GetOp { Some(GetState::AwaitingResponse { requester, .. }) => { if let Some(requester) = requester { new_state = None; - let value = match value { - Ok(res) => res, - Err(error) => { - tracing::error!(tx = %id, %error, "Error while getting contract from storage"); - return Err(OpError::ExecutorError(error)); - } - }; tracing::debug!(tx = %id, "Returning contract {} to {}", key, sender.peer); return_msg = Some(GetMsg::ReturnGet { id, key, - value, + value: StoreResponse { + state: Some(state), + contract, + }, sender: *target, target: requester, skip_list: skip_list.clone(), @@ -423,18 +426,14 @@ impl Operation for GetOp { } Some(GetState::ReceivedRequest) => { new_state = None; - let value = match value { - Ok(res) => res, - Err(error) => { - tracing::error!(tx = %id, %error, "Error while getting contract from storage"); - return Err(OpError::ExecutorError(error)); - } - }; tracing::debug!(tx = %id, "Returning contract {} to {}", key, sender.peer); return_msg = Some(GetMsg::ReturnGet { id, key, - value, + value: StoreResponse { + state: Some(state), + contract, + }, sender: *target, target: *sender, skip_list: skip_list.clone(), @@ -446,11 +445,7 @@ impl Operation for GetOp { GetMsg::ReturnGet { id, key, - value: - StoreResponse { - state: None, - contract: None, - }, + value: StoreResponse { state: None, .. }, sender, target, skip_list, @@ -561,13 +556,6 @@ impl Operation for GetOp { sender.peer ); - let op = GetOp { - id, - state: self.state, - result: None, - stats, - }; - let mut new_skip_list = skip_list.clone(); new_skip_list.push(sender.peer); op_manager @@ -583,65 +571,125 @@ impl Operation for GetOp { target: *target, skip_list: new_skip_list, }), - OpEnum::Get(op), + OpEnum::Get(GetOp { + id, + state: self.state, + result: None, + stats, + }), ) .await?; return Err(OpError::StatePushed); } - let is_subscribed_contract = op_manager.ring.is_subscribed_to_contract(&key); - if !is_subscribed_contract - && op_manager - .ring - .within_subscribing_distance(&Location::from(&key)) - { - tracing::debug!(tx = %id, %key, peer = %op_manager.ring.peer_key, "Contract not cached @ peer, caching"); + let is_original_requester = matches!( + self.state, + Some(GetState::AwaitingResponse { + requester: None, + .. + }) + ); + let should_subscribe = op_manager + .ring + .within_subscribing_distance(&Location::from(&key)); + let should_put = is_original_requester || should_subscribe; + + if should_put { let res = op_manager .notify_contract_handler(ContractHandlerEvent::PutQuery { key: key.clone(), state: value.clone(), - related_contracts: RelatedContracts::default(), + related_contracts: RelatedContracts::default(), // fixme: i think we need to get the related contracts so the final put is ok contract: contract.clone(), }) .await?; match res { ContractHandlerEvent::PutResponse { new_value: Ok(_) } => { - super::start_subscription(op_manager, key.clone(), false).await; + let is_subscribed_contract = + op_manager.ring.is_subscribed_to_contract(&key); + if !is_subscribed_contract && should_subscribe { + tracing::debug!(tx = %id, %key, peer = %op_manager.ring.peer_key, "Contract not cached @ peer, caching"); + super::start_subscription_request( + op_manager, + key.clone(), + false, + ) + .await; + } } ContractHandlerEvent::PutResponse { new_value: Err(err), } => { - tracing::debug!(tx = %id, error = %err, "Failed put at executor"); - return Err(OpError::ExecutorError(err)); + if is_original_requester { + tracing::debug!(tx = %id, error = %err, "Failed put at executor"); + return Err(OpError::ExecutorError(err)); + } else { + let mut new_skip_list = skip_list.clone(); + new_skip_list.push(sender.peer); + + op_manager + .notify_op_change( + NetMessage::from(GetMsg::ReturnGet { + id, + key, + value: StoreResponse { + state: None, + contract: None, + }, + sender: *sender, + target: *target, + skip_list: new_skip_list, + }), + OpEnum::Get(GetOp { + id, + state: self.state, + result: None, + stats, + }), + ) + .await?; + return Err(OpError::StatePushed); + } } _ => unreachable!(), } } match self.state { - Some(GetState::AwaitingResponse { fetch_contract, .. }) => { - if fetch_contract && contract.is_none() { - tracing::error!( - tx = %id, - "Get response received for contract {key}, but the contract wasn't returned" - ); - new_state = None; - return_msg = None; - result = Some(GetResult { - key: key.clone(), - state: value.clone(), - contract: contract.clone(), - }); - } else { - tracing::info!(tx = %id, %key, "Get response received for contract"); - new_state = None; - return_msg = None; - result = Some(GetResult { - key: key.clone(), - state: value.clone(), + Some(GetState::AwaitingResponse { + requester: None, .. + }) => { + tracing::info!(tx = %id, %key, "Get response received for contract at original requester"); + new_state = None; + return_msg = None; + result = Some(GetResult { + key: key.clone(), + state: value.clone(), + contract: contract.clone(), + }); + } + Some(GetState::AwaitingResponse { + requester: Some(requester), + .. + }) => { + tracing::info!(tx = %id, %key, "Get response received for contract at hop peer"); + new_state = None; + return_msg = Some(GetMsg::ReturnGet { + id, + key: key.clone(), + value: StoreResponse { + state: Some(value.clone()), contract: contract.clone(), - }); - } + }, + sender: *target, + target: requester, + skip_list: skip_list.clone(), + }); + result = Some(GetResult { + key: key.clone(), + state: value.clone(), + contract: contract.clone(), + }); } Some(GetState::ReceivedRequest) => { tracing::info!(tx = %id, "Returning contract {} to {}", key, sender.peer); @@ -650,18 +698,17 @@ impl Operation for GetOp { id, key, value: StoreResponse { - state: None, - contract: None, + state: Some(value.clone()), + contract: contract.clone(), }, - sender: *sender, - target: *target, + sender: *target, + target: *sender, skip_list: skip_list.clone(), }); } _ => return Err(OpError::invalid_transition(self.id)), }; } - _ => return Err(OpError::UnexpectedOpState), } build_op_result(self.id, new_state, return_msg, result, stats) diff --git a/crates/core/src/operations/put.rs b/crates/core/src/operations/put.rs index 21f7e8a6e..4cc843c42 100644 --- a/crates/core/src/operations/put.rs +++ b/crates/core/src/operations/put.rs @@ -440,7 +440,8 @@ impl Operation for PutOp { .within_subscribing_distance(&Location::from(&key)) { tracing::debug!(tx = %id, %key, peer = %op_manager.ring.peer_key, "Contract not cached @ peer, caching"); - super::start_subscription(op_manager, key.clone(), true).await; + super::start_subscription_request(op_manager, key.clone(), true) + .await; } tracing::info!( tx = %id, diff --git a/crates/core/src/operations/subscribe.rs b/crates/core/src/operations/subscribe.rs index 59fdb88cb..05699b035 100644 --- a/crates/core/src/operations/subscribe.rs +++ b/crates/core/src/operations/subscribe.rs @@ -65,10 +65,26 @@ pub(crate) async fn request_subscribe( sub_op: SubscribeOp, ) -> Result<(), OpError> { let (target, _id) = if let Some(SubscribeState::PrepareRequest { id, key }) = &sub_op.state { - if !op_manager.ring.is_subscribed_to_contract(key) { - return Err(OpError::ContractError(ContractError::ContractNotFound( - key.clone(), - ))); + match op_manager + .notify_contract_handler(crate::contract::ContractHandlerEvent::GetQuery { + key: key.clone(), + fetch_contract: false, + }) + .await? + { + crate::contract::ContractHandlerEvent::GetResponse { + response: Ok(crate::contract::StoreResponse { state: Some(_), .. }), + .. + } => {} + crate::contract::ContractHandlerEvent::GetResponse { + key, + response: Ok(crate::contract::StoreResponse { state: None, .. }), + } => { + return Err(OpError::ContractError(ContractError::ContractNotFound( + key.clone(), + ))); + } + _ => return Err(OpError::UnexpectedOpState), } const EMPTY: &[PeerId] = &[]; (