Skip to content

Commit

Permalink
Merge pull request #909 from freenet/186548078-op-logic-fixes-2
Browse files Browse the repository at this point in the history
186548078 - Network op logic fixes
  • Loading branch information
iduartgomez authored Dec 18, 2023
2 parents 6ef6048 + a273ce2 commit 025b041
Show file tree
Hide file tree
Showing 21 changed files with 689 additions and 518 deletions.
90 changes: 55 additions & 35 deletions crates/core/src/client_events.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! Clients events related logic and type definitions. For example, receival of client events from applications throught the HTTP gateway.
use freenet_stdlib::client_api::ClientRequest;
use freenet_stdlib::client_api::{ClientError, HostResponse};
use freenet_stdlib::client_api::{ClientError, ContractResponse, HostResponse};
use futures::future::BoxFuture;
use std::fmt::Debug;
use std::fmt::Display;
Expand Down Expand Up @@ -300,8 +300,18 @@ pub(crate) mod test {
fn send(
&mut self,
_id: ClientId,
_response: Result<HostResponse, ClientError>,
response: Result<HostResponse, ClientError>,
) -> BoxFuture<'_, Result<(), ClientError>> {
if let Ok(HostResponse::ContractResponse(ContractResponse::GetResponse {
key, ..
})) = response
{
self.internal_state
.as_mut()
.expect("state should be set")
.owns_contracts
.insert(key);
}
async { Ok(()) }.boxed()
}
}
Expand All @@ -314,7 +324,6 @@ pub(crate) mod test {
max_iterations: usize,
max_contract_num: usize,
owns_contracts: HashSet<ContractKey>,
subcribed_contract: HashSet<ContractKey>,
existing_contracts: Vec<ContractContainer>,
}

Expand All @@ -327,6 +336,15 @@ pub(crate) mod test {

fn choose<'a, T>(&mut self, vec: &'a [T]) -> Option<&'a T>;

fn choose_random_from_iter<'a, T>(
&mut self,
mut iter: impl ExactSizeIterator<Item = &'a T> + 'a,
) -> Option<&'a T> {
let len = iter.len();
let idx = self.gen_range(0..len);
iter.nth(idx)
}

/// The goal of this function is to generate a random event that is valid for the current
/// global state of the network.
///
Expand All @@ -342,8 +360,8 @@ pub(crate) mod test {
while state.current_iteration < state.max_iterations {
state.current_iteration += 1;
let for_this_peer = self.gen_range(0..state.num_peers) == state.this_peer;
match self.gen_range(0..4) {
0 => {
match self.gen_range(0..100) {
val if (0..5).contains(&val) => {
if state.max_contract_num <= state.existing_contracts.len() {
continue;
}
Expand All @@ -353,56 +371,58 @@ pub(crate) mod test {
state: WrappedState::new(self.random_byte_vec()),
related_contracts: RelatedContracts::new(),
};
let key = contract.key();
state.existing_contracts.push(contract);
if for_this_peer {
state.owns_contracts.insert(key);
return Some(request.into());
}
}
1 => {
if let Some(contract) = self.choose(&state.existing_contracts) {
let delta = UpdateData::Delta(StateDelta::from(self.random_byte_vec()));
if !for_this_peer {
continue;
}
let request = ContractRequest::Update {
key: contract.key().clone(),
data: delta,
};
if state.owns_contracts.contains(&contract.key()) {
return Some(request.into());
}
if !for_this_peer {
continue;
}
return Some(request.into());
}
2 => {
val if (5..35).contains(&val) => {
if let Some(contract) = self.choose(&state.existing_contracts) {
if !for_this_peer {
continue;
}
let key = contract.key();
let fetch_contract = state.owns_contracts.contains(&key);
let request = ContractRequest::Get {
key,
fetch_contract,
fetch_contract: true,
};
return Some(request.into());
}
}
3 => {
val if (35..80).contains(&val) => {
if let Some(contract) = self.choose(&state.existing_contracts) {
let key = contract.key();
let summary = StateSummary::from(self.random_byte_vec());
if !for_this_peer || state.subcribed_contract.contains(&key) {
let delta = UpdateData::Delta(StateDelta::from(self.random_byte_vec()));
if !for_this_peer {
continue;
}
let request = ContractRequest::Subscribe {
key,
summary: Some(summary),
let request = ContractRequest::Update {
key: contract.key().clone(),
data: delta,
};
return Some(request.into());
if state.owns_contracts.contains(&contract.key()) {
return Some(request.into());
}
}
}
val if (80..100).contains(&val) => {
let summary = StateSummary::from(self.random_byte_vec());

let Some(from_existing) = self.choose(state.existing_contracts.as_slice())
else {
continue;
};

let key = from_existing.key();
if !for_this_peer {
continue;
}
let request = ContractRequest::Subscribe {
key,
summary: Some(summary),
};
return Some(request.into());
}
_ => unreachable!(),
}
}
Expand Down
22 changes: 2 additions & 20 deletions crates/core/src/contract.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,11 @@ where
match contract_handler
.executor()
.fetch_contract(key.clone(), fetch_contract)
.instrument(tracing::info_span!("fetch_contract", %key))
.instrument(tracing::info_span!("fetch_contract", %key, %fetch_contract))
.await
{
Ok((state, contract)) => {
tracing::debug!("Fetched contract {key}");
tracing::debug!(with_contract = %fetch_contract, has_contract = %contract.is_some(), "Fetched contract {key}");
contract_handler
.channel()
.send_to_sender(
Expand Down Expand Up @@ -111,24 +111,6 @@ where
error
})?;
}
ContractHandlerEvent::Subscribe { key } => {
let response = contract_handler
.executor()
.subscribe_to_contract(key.clone())
.await;
contract_handler
.channel()
.send_to_sender(
id,
ContractHandlerEvent::SubscribeResponse { key, response },
)
.await
.map_err(|error| {
tracing::debug!(%error, "shutting down contract handler");
error
})?;
todo!()
}
_ => unreachable!(),
}
}
Expand Down
18 changes: 10 additions & 8 deletions crates/core/src/contract/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ use crate::node::OpManager;
))]
use crate::operations::get::GetResult;
use crate::operations::{OpEnum, OpError};
use crate::ring::PeerKeyLocation;
use crate::wasm_runtime::{
ContractRuntimeInterface, ContractStore, DelegateRuntimeInterface, DelegateStore, Runtime,
SecretsStore, StateStore, StateStoreError,
Expand Down Expand Up @@ -220,8 +219,16 @@ impl ExecutorToEventLoopChannel<ExecutorHalve> {
{
let op = message.initiate_op(&self.op_manager);
let tx = *op.id();
self.end.waiting_for_op_tx.send(tx).await?;
<T as ComposeNetworkMessage<Op>>::resume_op(op, &self.op_manager).await?;
self.end.waiting_for_op_tx.send(tx).await.map_err(|e| {
tracing::debug!("failed to send request to executor, channel closed");
e
})?;
<T as ComposeNetworkMessage<Op>>::resume_op(op, &self.op_manager)
.await
.map_err(|e| {
tracing::debug!("failed to resume operation: {e}");
e
})?;
Ok(tx)
}

Expand Down Expand Up @@ -418,11 +425,6 @@ pub(crate) trait ContractExecutor: Send + 'static {
related_contracts: RelatedContracts<'static>,
code: Option<ContractContainer>,
) -> Result<WrappedState, ExecutorError>;

async fn subscribe_to_contract(
&mut self,
key: ContractKey,
) -> Result<PeerKeyLocation, ExecutorError>;
}

/// A WASM executor which will run any contracts, delegates, etc. registered.
Expand Down
48 changes: 23 additions & 25 deletions crates/core/src/contract/executor/mock_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ impl ContractExecutor for Executor<MockRuntime> {
async fn fetch_contract(
&mut self,
key: ContractKey,
_fetch_contract: bool,
fetch_contract: bool,
) -> Result<(WrappedState, Option<ContractContainer>), ExecutorError> {
let Some(parameters) = self
.state_store
Expand All @@ -57,18 +57,21 @@ impl ContractExecutor for Executor<MockRuntime> {
.map_err(ExecutorError::other)?
else {
return Err(ExecutorError::other(format!(
"missing parameters for contract {key}"
"missing state and/or parameters for contract {key}"
)));
};
let contract = if fetch_contract {
self.runtime
.contract_store
.fetch_contract(&key, &parameters)
} else {
None
};
let Ok(state) = self.state_store.get(&key).await else {
return Err(ExecutorError::other(format!(
"missing state for contract {key}"
)));
};
let contract = self
.runtime
.contract_store
.fetch_contract(&key, &parameters);
Ok((state, contract))
}

Expand All @@ -84,39 +87,34 @@ impl ContractExecutor for Executor<MockRuntime> {
&mut self,
key: ContractKey,
state: Either<WrappedState, StateDelta<'static>>,
related_contracts: RelatedContracts<'static>,
_related_contracts: RelatedContracts<'static>,
code: Option<ContractContainer>,
) -> Result<WrappedState, ExecutorError> {
// todo: instead allow to perform mutations per contract based on incoming value so we can track
// state values over the network
match (state, code) {
(Either::Left(incoming_state), Some(contract)) => {
self.runtime
.contract_store
.store_contract(contract.clone())
.map_err(ExecutorError::other)?;
self.state_store
.store(key, incoming_state.clone(), contract.params().into_owned())
.await
.map_err(ExecutorError::other)?;

let request = PutContract {
contract,
state: incoming_state.clone(),
related_contracts,
};
let _op: Result<operations::put::PutResult, _> = self.op_request(request).await;

return Ok(incoming_state);
}
_ => unreachable!(),
// (Either::Left(_), None) => {
// return Err(ExecutorError::request(RequestError::from(
// StdContractError::Get {
// key: key.clone(),
// cause: "Missing contract or parameters".into(),
// },
// )));
// }
(update, contract) => unreachable!("{update:?}, {contract:?}"),
}
}

async fn subscribe_to_contract(
&mut self,
key: ContractKey,
) -> Result<PeerKeyLocation, ExecutorError> {
let request = SubscribeContract { key };
let result: operations::subscribe::SubscribeResult = self.op_request(request).await?;
Ok(result.subscribed_to)
}
}

#[cfg(test)]
Expand Down
Loading

0 comments on commit 025b041

Please sign in to comment.