Skip to content

Commit

Permalink
Fixes to get and implicit subs so results are sent back
Browse files Browse the repository at this point in the history
  • Loading branch information
iduartgomez committed Dec 18, 2023
1 parent f145b71 commit a273ce2
Show file tree
Hide file tree
Showing 8 changed files with 199 additions and 105 deletions.
50 changes: 31 additions & 19 deletions crates/core/src/client_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,6 @@ pub(crate) mod test {
max_iterations: usize,
max_contract_num: usize,
owns_contracts: HashSet<ContractKey>,
subcribed_contract: HashSet<ContractKey>,
existing_contracts: Vec<ContractContainer>,
}

Expand All @@ -337,6 +336,15 @@ pub(crate) mod test {

fn choose<'a, T>(&mut self, vec: &'a [T]) -> Option<&'a T>;

fn choose_random_from_iter<'a, T>(
&mut self,
mut iter: impl ExactSizeIterator<Item = &'a T> + 'a,
) -> Option<&'a T> {
let len = iter.len();
let idx = self.gen_range(0..len);
iter.nth(idx)
}

/// The goal of this function is to generate a random event that is valid for the current
/// global state of the network.
///
Expand All @@ -353,23 +361,23 @@ pub(crate) mod test {
state.current_iteration += 1;
let for_this_peer = self.gen_range(0..state.num_peers) == state.this_peer;
match self.gen_range(0..100) {
val if (0..10).contains(&val) => {
val if (0..5).contains(&val) => {
if state.max_contract_num <= state.existing_contracts.len() {
continue;
}
if !for_this_peer {
continue;
}
let contract = self.gen_contract_container();
let request = ContractRequest::Put {
contract: contract.clone(),
state: WrappedState::new(self.random_byte_vec()),
related_contracts: RelatedContracts::new(),
};
state.existing_contracts.push(contract);
if !for_this_peer {
continue;
}
return Some(request.into());
}
val if (10..35).contains(&val) => {
val if (5..35).contains(&val) => {
if let Some(contract) = self.choose(&state.existing_contracts) {
if !for_this_peer {
continue;
Expand All @@ -382,7 +390,7 @@ pub(crate) mod test {
return Some(request.into());
}
}
val if (35..85).contains(&val) => {
val if (35..80).contains(&val) => {
if let Some(contract) = self.choose(&state.existing_contracts) {
let delta = UpdateData::Delta(StateDelta::from(self.random_byte_vec()));
if !for_this_peer {
Expand All @@ -397,19 +405,23 @@ pub(crate) mod test {
}
}
}
val if (85..100).contains(&val) => {
if let Some(contract) = self.choose(&state.existing_contracts) {
let key = contract.key();
let summary = StateSummary::from(self.random_byte_vec());
if !for_this_peer || state.subcribed_contract.contains(&key) {
continue;
}
let request = ContractRequest::Subscribe {
key,
summary: Some(summary),
};
return Some(request.into());
val if (80..100).contains(&val) => {
let summary = StateSummary::from(self.random_byte_vec());

let Some(from_existing) = self.choose(state.existing_contracts.as_slice())
else {
continue;
};

let key = from_existing.key();
if !for_this_peer {
continue;
}
let request = ContractRequest::Subscribe {
key,
summary: Some(summary),
};
return Some(request.into());
}
_ => unreachable!(),
}
Expand Down
9 changes: 8 additions & 1 deletion crates/core/src/contract/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,14 @@ impl std::fmt::Display for ContractHandlerEvent {
match self {
ContractHandlerEvent::PutQuery { key, contract, .. } => {
if let Some(contract) = contract {
write!(f, "put query {{ {key}, params: {:?} }}", contract.params())
use std::fmt::Write;
let mut params = String::new();
params.push_str("0x");
for b in contract.params().as_ref().iter().take(8) {
write!(&mut params, "{:02x}", b)?;
}
params.push_str("...");
write!(f, "put query {{ {key}, params: {params} }}",)
} else {
write!(f, "put query {{ {key} }}")
}
Expand Down
25 changes: 18 additions & 7 deletions crates/core/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,7 @@ async fn client_event_handling<ClientEv>(
continue;
}
};
// fixme: only allow in certain modes (e.g. while testing)
if let ClientRequest::Disconnect { cause } = &*req.request {
node_controller.send(NodeEvent::Disconnect { cause: cause.clone() }).await.ok();
break;
Expand Down Expand Up @@ -358,7 +359,7 @@ async fn process_open_request(request: OpenRequest<'static>, op_manager: Arc<OpM
let fut = async move {
let client_id = request.client_id;

let mut missing_contract = false;
// fixme: communicate back errors in this loop to the client somehow
match *request.request {
ClientRequest::ContractOp(ops) => match ops {
ContractRequest::Put {
Expand Down Expand Up @@ -414,7 +415,8 @@ async fn process_open_request(request: OpenRequest<'static>, op_manager: Arc<OpM
}
}
ContractRequest::Subscribe { key, .. } => {
const TIMEOUT: Duration = Duration::from_secs(10);
const TIMEOUT: Duration = Duration::from_secs(30);
let mut missing_contract = false;
let timeout = tokio::time::timeout(TIMEOUT, async {
// Initialize a subscribe op.
loop {
Expand All @@ -433,16 +435,17 @@ async fn process_open_request(request: OpenRequest<'static>, op_manager: Arc<OpM
if let Err(error) = get::request_get(&op_manager, get_op).await
{
tracing::error!(%key, %error, "Failed getting the contract while previously trying to subscribe; bailing");
break;
break Err(error);
}
continue;
}
Err(OpError::ContractError(ContractError::ContractNotFound(_))) => {
tracing::warn!("Still waiting for {key} contract");
tokio::time::sleep(Duration::from_secs(2)).await
}
Err(err) => {
tracing::error!("{}", err);
break;
break Err(err);
}
Ok(()) => {
if missing_contract {
Expand All @@ -451,13 +454,21 @@ async fn process_open_request(request: OpenRequest<'static>, op_manager: Arc<OpM
);
}
tracing::debug!(%key, "Starting subscribe request");
break;
break Ok(());
}
}
}
});
if timeout.await.is_err() {
tracing::error!(%key, "Timeout while waiting for contract");
match timeout.await {
Err(_) => {
tracing::error!(%key, "Timeout while waiting for contract to start subscription");
}
Ok(Err(error)) => {
tracing::error!(%key, %error, "Error while subscribing to contract");
}
Ok(Ok(_)) => {
tracing::debug!(%key, "Started subscription to contract");
}
}
}
_ => {
Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/node/testing_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ pub fn get_free_port() -> Result<u16, ()> {
Err(())
}

pub fn get_dynamic_port() -> u16 {
fn get_dynamic_port() -> u16 {
const FIRST_DYNAMIC_PORT: u16 = 49152;
const LAST_DYNAMIC_PORT: u16 = 65535;
rand::thread_rng().gen_range(FIRST_DYNAMIC_PORT..LAST_DYNAMIC_PORT)
Expand Down
6 changes: 3 additions & 3 deletions crates/core/src/operations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ impl<T> From<SendError<T>> for OpError {
}

/// If the contract is not found, it will try to get it first if the `try_get` parameter is set.
async fn start_subscription(
async fn start_subscription_request(
op_manager: &OpManager,
key: freenet_stdlib::prelude::ContractKey,
try_get: bool,
Expand All @@ -311,9 +311,9 @@ async fn start_subscription(
tracing::warn!(%error, "Error subscribing to contract");
return;
}
if let OpError::ContractError(ContractError::ContractNotFound(_)) = &error {
if let OpError::ContractError(ContractError::ContractNotFound(key)) = &error {
tracing::debug!(%key, "Contract not found, trying to get it first");
let get_op = get::start_op(key, true);
let get_op = get::start_op(key.clone(), true);
if let Err(error) = get::request_get(op_manager, get_op).await {
tracing::warn!(%error, "Error getting contract");
}
Expand Down
Loading

0 comments on commit a273ce2

Please sign in to comment.