Skip to content

Commit

Permalink
Fix test issues
Browse files Browse the repository at this point in the history
  • Loading branch information
iduartgomez committed Nov 7, 2023
1 parent 959b855 commit 01f310c
Show file tree
Hide file tree
Showing 12 changed files with 404 additions and 404 deletions.
2 changes: 1 addition & 1 deletion crates/core/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -544,7 +544,7 @@ async fn report_result(
})
.unwrap_or_default();
tracing::error!(%tx, ?state, "Wrong state");
eprintln!("{trace}");
eprintln!("Operation error trace:\n{trace}");
}
#[cfg(not(debug_assertions))]
{
Expand Down
8 changes: 6 additions & 2 deletions crates/core/src/operations/connect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -917,8 +917,12 @@ where
"Selecting close peer to forward request",
);
// FIXME: target the `desired_location`
ring.routing(&joiner.location.unwrap(), Some(&req_peer.peer), &skip_list)
.and_then(|pkl| (pkl.peer != joiner.peer).then_some(pkl))
ring.routing(
joiner.location.unwrap(),
Some(&req_peer.peer),
skip_list.as_slice(),
)
.and_then(|pkl| (pkl.peer != joiner.peer).then_some(pkl))
};

if let Some(forward_to) = forward_to {
Expand Down
23 changes: 14 additions & 9 deletions crates/core/src/operations/get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ impl Operation for GetOp {
if htl == 0 {
tracing::warn!(
tx = %id,
"The maximum HOPS number has been exceeded, sending the error \
"The maximum hops has been exceeded, sending the error \
back to the node @ {}",
sender.peer
);
Expand All @@ -275,7 +275,10 @@ impl Operation for GetOp {
}

let new_htl = htl - 1;
let Some(new_target) = op_storage.ring.closest_caching(&key, &[]) else {
let Some(new_target) = op_storage
.ring
.closest_caching(&key, [&sender.peer].as_slice())
else {
tracing::warn!(tx = %id, "No other peers found while trying getting contract {key} @ {}", target.peer);
return Err(OpError::RingError(RingError::NoCachingPeers(key)));
};
Expand Down Expand Up @@ -367,9 +370,10 @@ impl Operation for GetOp {
sender,
target,
} => {
let this_loc = target;
let this_peer = target;
tracing::warn!(
tx = %id,
%this_peer,
"Neither contract or contract value for contract `{}` found at peer {}, \
retrying with other peers",
key,
Expand Down Expand Up @@ -397,7 +401,7 @@ impl Operation for GetOp {
id: *id,
key: key.clone(),
target,
sender: *this_loc,
sender: *this_peer,
fetch_contract,
htl: MAX_GET_RETRY_HOPS,
});
Expand Down Expand Up @@ -697,13 +701,14 @@ pub(crate) async fn request_get(
client_id: Option<ClientId>,
) -> Result<(), OpError> {
let (target, id) = if let Some(GetState::PrepareRequest { key, id, .. }) = &get_op.state {
const EMPTY: &[PeerKey] = &[];
// the initial request must provide:
// - a location in the network where the contract resides
// - and the key of the contract value to get
(
op_storage
.ring
.closest_caching(key, &[])
.closest_caching(key, EMPTY)
.into_iter()
.next()
.ok_or(RingError::EmptyRing)?,
Expand Down Expand Up @@ -901,7 +906,7 @@ mod test {

// trigger get @ node-0, which does not own the contract
sim_nw
.trigger_event("node-0", 1, Some(Duration::from_millis(50)))
.trigger_event("node-0", 1, Some(Duration::from_secs(1)))
.await?;
assert!(sim_nw.has_got_contract("node-0", &key));
Ok(())
Expand Down Expand Up @@ -939,14 +944,15 @@ mod test {

// trigger get @ node-1, which does not own the contract
sim_nw
.trigger_event("node-1", 1, Some(Duration::from_millis(50)))
.trigger_event("node-1", 1, Some(Duration::from_secs(1)))
.await?;
assert!(!sim_nw.has_got_contract("node-1", &key));
Ok(())
}

#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn contract_found_after_retry() -> Result<(), anyhow::Error> {
// crate::config::set_logger();
const NUM_NODES: usize = 2usize;
const NUM_GW: usize = 1usize;

Expand Down Expand Up @@ -1005,9 +1011,8 @@ mod test {
.await;
sim_nw.start_with_spec(get_specs).await;
sim_nw.check_connectivity(Duration::from_secs(3)).await?;

sim_nw
.trigger_event("node-0", 1, Some(Duration::from_millis(200)))
.trigger_event("node-0", 1, Some(Duration::from_secs(1)))
.await?;
assert!(sim_nw.has_got_contract("node-0", &key));
Ok(())
Expand Down
7 changes: 4 additions & 3 deletions crates/core/src/operations/put.rs
Original file line number Diff line number Diff line change
Expand Up @@ -676,7 +676,7 @@ pub(crate) async fn request_put(
// - and the value to put
let target = op_storage
.ring
.closest_caching(&key, &[sender.peer])
.closest_caching(&key, [&sender.peer].as_slice())
.into_iter()
.next()
.ok_or(RingError::EmptyRing)?;
Expand Down Expand Up @@ -771,7 +771,8 @@ async fn forward_changes<CB>(
{
let key = contract.key();
let contract_loc = Location::from(&key);
let forward_to = op_storage.ring.closest_caching(&key, &[]);
const EMPTY: &[PeerKey] = &[];
let forward_to = op_storage.ring.closest_caching(&key, EMPTY);
let own_loc = op_storage.ring.own_location().location.expect("infallible");
if let Some(peer) = forward_to {
let other_loc = peer.location.as_ref().expect("infallible");
Expand Down Expand Up @@ -1018,7 +1019,7 @@ mod test {

// trigger the put op @ gw-0
sim_nw
.trigger_event("gateway-0", 1, Some(Duration::from_millis(200)))
.trigger_event("gateway-0", 1, Some(Duration::from_secs(1)))
.await?;
assert!(sim_nw.has_put_contract("gateway-0", &key, &new_value));
assert!(sim_nw.event_listener.contract_broadcasted(&key));
Expand Down
9 changes: 6 additions & 3 deletions crates/core/src/operations/subscribe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,9 @@ impl Operation for SubscribeOp {
if !op_storage.ring.is_contract_cached(key) {
tracing::debug!(tx = %id, "Contract {} not found at {}, trying other peer", key, target.peer);

let Some(new_target) = op_storage.ring.closest_caching(key, &[sender.peer])
let Some(new_target) = op_storage
.ring
.closest_caching(key, [&sender.peer].as_slice())
else {
tracing::warn!(tx = %id, "No peer found while trying getting contract {key}");
return Err(OpError::RingError(RingError::NoCachingPeers(key.clone())));
Expand Down Expand Up @@ -345,10 +347,11 @@ pub(crate) async fn request_subscribe(
key.clone(),
)));
}
const EMPTY: &[PeerKey] = &[];
(
op_storage
.ring
.closest_caching(key, &[])
.closest_caching(key, EMPTY)
.into_iter()
.next()
.ok_or_else(|| RingError::NoCachingPeers(key.clone()))?,
Expand Down Expand Up @@ -526,7 +529,7 @@ mod test {
sim_nw.start_with_spec(subscribe_specs).await;
sim_nw.check_connectivity(Duration::from_secs(3)).await?;
sim_nw
.trigger_event("node-1", 1, Some(Duration::from_secs(2)))
.trigger_event("node-1", 1, Some(Duration::from_secs(1)))
.await?;
assert!(sim_nw.has_got_contract("node-1", &contract_key));
tokio::time::sleep(Duration::from_secs(3)).await;
Expand Down
66 changes: 32 additions & 34 deletions crates/core/src/ring.rs
Original file line number Diff line number Diff line change
Expand Up @@ -397,7 +397,6 @@ impl Ring {
} else if open_conn < self.min_connections {
true
} else if open_conn >= self.max_connections {
tracing::debug!(peer = %self.peer_key, "max open connections reached");
false
} else {
let strategy = if self
Expand All @@ -411,7 +410,7 @@ impl Ring {
self.topology_manager
.write()
.evaluate_new_connection(location, strategy)
.expect("already have > min connections, so neighbors shouldn't be empty")
.unwrap_or(false)
};
if !accepted {
self.open_connections
Expand All @@ -436,8 +435,8 @@ impl Ring {
open_at: Instant::now(),
});
self.location_for_peer.write().insert(peer, loc);
let topology_manager = &mut *self.topology_manager.write();
let current_neighbors = &Self::current_neighbors(&cbl);
let topology_manager = &mut *self.topology_manager.write();
topology_manager
.refresh_cache(current_neighbors)
.expect("current neightbors shouldn't be empty here ever, just added at least one")
Expand All @@ -448,7 +447,7 @@ impl Ring {
pub fn closest_caching(
&self,
contract_key: &ContractKey,
skip_list: &[PeerKey],
skip_list: impl Contains<PeerKey>,
) -> Option<PeerKeyLocation> {
self.routing(Location::from(contract_key), None, skip_list)
}
Expand Down Expand Up @@ -620,15 +619,12 @@ impl Ring {
const REMOVAL_TICK_DURATION: Duration = Duration::from_secs(60 * 5);
#[cfg(test)]
const REMOVAL_TICK_DURATION: Duration = Duration::from_secs(1);
#[cfg(test)]
const ACQUIRE_CONNS_TICK_DURATION: Duration = Duration::from_millis(100);
#[cfg(not(test))]
const ACQUIRE_CONNS_TICK_DURATION: Duration = Duration::from_secs(1);

let mut check_interval = tokio::time::interval(REMOVAL_TICK_DURATION);
check_interval.tick().await;
check_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
let mut acquire_max_connections = tokio::time::interval(ACQUIRE_CONNS_TICK_DURATION);
acquire_max_connections.tick().await;
acquire_max_connections.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

let mut missing = BTreeMap::new();

Expand All @@ -645,7 +641,7 @@ impl Ring {
}
Err(sync::mpsc::error::TryRecvError::Empty) => break,
Err(sync::mpsc::error::TryRecvError::Disconnected) => {
tracing::debug!("shutting down connection maintenance");
tracing::debug!("Shutting down connection maintenance");
break 'outer Err("finished".into());
}
}
Expand All @@ -656,16 +652,22 @@ impl Ring {

let open_connections = self
.open_connections
.load(std::sync::atomic::Ordering::Acquire);
if open_connections < self.max_connections && open_connections > self.min_connections {
.load(std::sync::atomic::Ordering::SeqCst);

if open_connections < self.max_connections {
self.fast_acquisition
.store(true, std::sync::atomic::Ordering::Release);
// requires more connections
let ideal_location = {
self.topology_manager
.write()
.get_best_candidate_location()
.expect("we only acquire connections when we have at least the minimum established neighbors already")
let loc = { self.topology_manager.read().get_best_candidate_location() };
match loc {
Ok(loc) => loc,
Err(_) => {
tracing::debug!(peer = %self.own_location(), "Insufficient data gathered by the topology manager");
acquire_max_connections.tick().await;
continue;
}
}
};
self.acquire_new(
ideal_location,
Expand All @@ -674,20 +676,11 @@ impl Ring {
)
.await
.map_err(|error| {
tracing::debug!(?error, "shutting down connection maintenance task");
tracing::debug!(?error, "Shutting down connection maintenance task");
error
})?;
if self
.open_connections
.load(std::sync::atomic::Ordering::Acquire)
< self.max_connections
{
acquire_max_connections.tick().await;
continue;
} else {
check_interval.tick().await;
continue;
}
acquire_max_connections.tick().await;
continue;
}

let mut should_swap = {
Expand All @@ -707,10 +700,15 @@ impl Ring {
self.fast_acquisition
.store(false, std::sync::atomic::Ordering::Release);
let ideal_location = {
self.topology_manager
.write()
.get_best_candidate_location()
.expect("we only swap when we have some established neighbors already")
let loc = { self.topology_manager.read().get_best_candidate_location() };
match loc {
Ok(loc) => loc,
Err(_) => {
tracing::debug!(peer = %self.own_location(), "Insufficient data gathered by the topology manager");
check_interval.tick().await;
continue;
}
}
};
self.acquire_new(
ideal_location,
Expand All @@ -719,7 +717,7 @@ impl Ring {
)
.await
.map_err(|error| {
tracing::warn!(?error, "shutting down connection maintenance task");
tracing::warn!(?error, "Shutting down connection maintenance task");
error
})?;
for peer in should_swap.drain(..) {
Expand All @@ -729,7 +727,7 @@ impl Ring {
)))
.await
.map_err(|error| {
tracing::debug!(?error, "shutting down connection maintenance task");
tracing::debug!(?error, "Shutting down connection maintenance task");
error
})?;
}
Expand Down
Loading

0 comments on commit 01f310c

Please sign in to comment.