From 1b4cafaf7bcbe55d8b58c94d4ea4184ddfcfc53f Mon Sep 17 00:00:00 2001 From: jbesraa Date: Thu, 12 Sep 2024 16:48:23 +0300 Subject: [PATCH] Save dropped `PoolSv2` downstreams This commit adds a new property to the `Running` state called `DroppedDownstreams` that saves a vector of `Vec`, u32 referring to the `downstream_id`. --- roles/pool/src/lib/mod.rs | 47 ++++++++++++++++++++- roles/tests-integration/tests/common/mod.rs | 2 +- 2 files changed, 46 insertions(+), 3 deletions(-) diff --git a/roles/pool/src/lib/mod.rs b/roles/pool/src/lib/mod.rs index 7e68fc1a78..217f1db7c9 100644 --- a/roles/pool/src/lib/mod.rs +++ b/roles/pool/src/lib/mod.rs @@ -15,10 +15,23 @@ use tracing::{error, info, warn}; use tokio::select; +#[derive(Debug, Clone, PartialEq)] +pub struct DroppedDownstreams(Vec); + +impl DroppedDownstreams { + pub fn new() -> DroppedDownstreams { + DroppedDownstreams(vec![]) + } + pub fn push(&mut self, id: u32) { + self.0.push(id); + } +} + #[derive(Debug, Clone, PartialEq)] pub enum PoolState { Initial, - Running, + Running(DroppedDownstreams), + DownstreamInstanceDropped, } #[derive(Debug, Clone)] @@ -65,10 +78,13 @@ impl PoolSv2 { status::Sender::DownstreamListener(status_tx), ); // Set the state to running - let _ = self.state.safe_lock(|s| *s = PoolState::Running); + let _ = self + .state + .safe_lock(|s| *s = PoolState::Running(DroppedDownstreams(vec![]))); // Start the error handling loop // See `./status.rs` and `utils/error_handling` for information on how this operates + let internal_state = self.state.clone(); loop { let task_status = select! { task_status = status_rx.recv() => task_status, @@ -109,8 +125,35 @@ impl PoolSv2 { .safe_lock(|p| p.remove_downstream(downstream_id)) .is_err() { + error!( + "Failed to remove downstream instance {}, shutting down.", + downstream_id + ); break Ok(()); } + // Add the downstream id to the list of removed downstreams + internal_state + .safe_lock(|s| { + let mut current = s.clone(); + match current { + PoolState::Running(ref mut removed) => { + let new = DroppedDownstreams( + removed + .0 + .iter() + .cloned() + .chain(std::iter::once(downstream_id)) + .collect(), + ); + *s = PoolState::Running(new); + } + _ => { + // This should.. never happen + warn!("Downstream instance dropped but pool is not running"); + } + } + }) + .expect("Failed to set state to DownstreamInstanceDropped"); } } } diff --git a/roles/tests-integration/tests/common/mod.rs b/roles/tests-integration/tests/common/mod.rs index 85f8412f78..0bb7b94ca0 100644 --- a/roles/tests-integration/tests/common/mod.rs +++ b/roles/tests-integration/tests/common/mod.rs @@ -275,7 +275,7 @@ pub async fn start_template_provider_and_pool() -> Result<(PoolSv2, u16, Templat } } let state = pool.state().await.safe_lock(|s| s.clone()).unwrap(); - assert_eq!(state, pool_sv2::PoolState::Running); + assert_eq!(state, pool_sv2::PoolState::Running(pool_sv2::DroppedDownstreams::new())); template_provider.stop(); Ok(( pool,