Skip to content

Commit

Permalink
Save dropped PoolSv2 downstreams
Browse files Browse the repository at this point in the history
This commit adds a new property to the `Running` state called
`DroppedDownstreams` that saves a vector of `Vec<u32>`, u32 referring to
the `downstream_id`.
  • Loading branch information
jbesraa committed Sep 16, 2024
1 parent b49416c commit 1b4cafa
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 3 deletions.
47 changes: 45 additions & 2 deletions roles/pool/src/lib/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,23 @@ use tracing::{error, info, warn};

use tokio::select;

#[derive(Debug, Clone, PartialEq)]
pub struct DroppedDownstreams(Vec<u32>);

impl DroppedDownstreams {
pub fn new() -> DroppedDownstreams {
DroppedDownstreams(vec![])
}
pub fn push(&mut self, id: u32) {
self.0.push(id);
}
}

#[derive(Debug, Clone, PartialEq)]
pub enum PoolState {
Initial,
Running,
Running(DroppedDownstreams),
DownstreamInstanceDropped,
}

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -65,10 +78,13 @@ impl PoolSv2 {
status::Sender::DownstreamListener(status_tx),
);
// Set the state to running
let _ = self.state.safe_lock(|s| *s = PoolState::Running);
let _ = self
.state
.safe_lock(|s| *s = PoolState::Running(DroppedDownstreams(vec![])));

// Start the error handling loop
// See `./status.rs` and `utils/error_handling` for information on how this operates
let internal_state = self.state.clone();
loop {
let task_status = select! {
task_status = status_rx.recv() => task_status,
Expand Down Expand Up @@ -109,8 +125,35 @@ impl PoolSv2 {
.safe_lock(|p| p.remove_downstream(downstream_id))
.is_err()
{
error!(
"Failed to remove downstream instance {}, shutting down.",
downstream_id
);
break Ok(());
}
// Add the downstream id to the list of removed downstreams
internal_state
.safe_lock(|s| {
let mut current = s.clone();
match current {
PoolState::Running(ref mut removed) => {
let new = DroppedDownstreams(
removed
.0
.iter()
.cloned()
.chain(std::iter::once(downstream_id))
.collect(),
);
*s = PoolState::Running(new);
}
_ => {
// This should.. never happen
warn!("Downstream instance dropped but pool is not running");
}
}
})
.expect("Failed to set state to DownstreamInstanceDropped");
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion roles/tests-integration/tests/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ pub async fn start_template_provider_and_pool() -> Result<(PoolSv2, u16, Templat
}
}
let state = pool.state().await.safe_lock(|s| s.clone()).unwrap();
assert_eq!(state, pool_sv2::PoolState::Running);
assert_eq!(state, pool_sv2::PoolState::Running(pool_sv2::DroppedDownstreams::new()));
template_provider.stop();
Ok((
pool,
Expand Down

0 comments on commit 1b4cafa

Please sign in to comment.