diff --git a/roles/pool/src/lib/mod.rs b/roles/pool/src/lib/mod.rs index 7e68fc1a78..217f1db7c9 100644 --- a/roles/pool/src/lib/mod.rs +++ b/roles/pool/src/lib/mod.rs @@ -15,10 +15,23 @@ use tracing::{error, info, warn}; use tokio::select; +#[derive(Debug, Clone, PartialEq)] +pub struct DroppedDownstreams(Vec); + +impl DroppedDownstreams { + pub fn new() -> DroppedDownstreams { + DroppedDownstreams(vec![]) + } + pub fn push(&mut self, id: u32) { + self.0.push(id); + } +} + #[derive(Debug, Clone, PartialEq)] pub enum PoolState { Initial, - Running, + Running(DroppedDownstreams), + DownstreamInstanceDropped, } #[derive(Debug, Clone)] @@ -65,10 +78,13 @@ impl PoolSv2 { status::Sender::DownstreamListener(status_tx), ); // Set the state to running - let _ = self.state.safe_lock(|s| *s = PoolState::Running); + let _ = self + .state + .safe_lock(|s| *s = PoolState::Running(DroppedDownstreams(vec![]))); // Start the error handling loop // See `./status.rs` and `utils/error_handling` for information on how this operates + let internal_state = self.state.clone(); loop { let task_status = select! { task_status = status_rx.recv() => task_status, @@ -109,8 +125,35 @@ impl PoolSv2 { .safe_lock(|p| p.remove_downstream(downstream_id)) .is_err() { + error!( + "Failed to remove downstream instance {}, shutting down.", + downstream_id + ); break Ok(()); } + // Add the downstream id to the list of removed downstreams + internal_state + .safe_lock(|s| { + let mut current = s.clone(); + match current { + PoolState::Running(ref mut removed) => { + let new = DroppedDownstreams( + removed + .0 + .iter() + .cloned() + .chain(std::iter::once(downstream_id)) + .collect(), + ); + *s = PoolState::Running(new); + } + _ => { + // This should.. never happen + warn!("Downstream instance dropped but pool is not running"); + } + } + }) + .expect("Failed to set state to DownstreamInstanceDropped"); } } }