From 8e345dbcf9184c5440036015d9a85f6c9a466dcb Mon Sep 17 00:00:00 2001 From: Nick Gerace Date: Wed, 28 Aug 2024 15:30:45 -0400 Subject: [PATCH] Allow ordered task groups for graceful shutdown This commit allows for ordered task groups, each composed of a shutdown token and a corresponding tracker, when performing graceful shutdown. In other words, when performing graceful shutdown, we will now fully drain task groups in order. If using graceful shutdown functionality from the "si-service" crate, then the timeout condition will apply to all task groups. Future considerations: In the future, we may want to create a manager struct to manage task groups as well as handle the shutdown for them. We may also want to switch from a flat dependency list to a dependency tree in case we want parallelism during shutdown. Using a manager struct could also help with naming task groups. Right now, we log the number of task groups being drained, but they are unnamed. It could be helpful in the future to know the order in which they are drained. Additional changes: This commit also deletes some dead comments as well as increases some shutdown logging to "info" level. In addition to that, we no longer clone the tracker used for creating LayerDB clients; we borrow it. 
Signed-off-by: Nick Gerace --- bin/pinga/src/main.rs | 27 ++++++++++++----- bin/rebaser/src/main.rs | 27 ++++++++++++----- bin/sdf/src/main.rs | 45 +++++++++++++--------------- bin/veritech/src/main.rs | 18 ++++++----- lib/pinga-server/src/server.rs | 7 +++-- lib/rebaser-server/src/server.rs | 7 +++-- lib/si-layer-cache/src/activities.rs | 2 +- lib/si-layer-cache/src/db.rs | 2 +- lib/si-service/src/shutdown.rs | 22 +++++++++----- 9 files changed, 96 insertions(+), 61 deletions(-) diff --git a/bin/pinga/src/main.rs b/bin/pinga/src/main.rs index 7fab7e8d27..99bba0eb21 100644 --- a/bin/pinga/src/main.rs +++ b/bin/pinga/src/main.rs @@ -12,8 +12,12 @@ fn main() -> Result<()> { } async fn async_main() -> Result<()> { - let tracker = TaskTracker::new(); - let token = CancellationToken::new(); + let main_tracker = TaskTracker::new(); + let main_token = CancellationToken::new(); + let layer_db_tracker = TaskTracker::new(); + let layer_db_token = CancellationToken::new(); + let telemetry_tracker = TaskTracker::new(); + let telemetry_token = CancellationToken::new(); color_eyre::install()?; let args = args::parse(); @@ -39,7 +43,7 @@ async fn async_main() -> Result<()> { ]) .build()?; - telemetry_application::init(config, &tracker, token.clone())? + telemetry_application::init(config, &telemetry_tracker, telemetry_token.clone())? 
}; startup::startup("pinga").await?; @@ -53,16 +57,25 @@ async fn async_main() -> Result<()> { let config = Config::try_from(args)?; - let server = Server::from_config(config, token.clone(), tracker.clone()).await?; + let server = Server::from_config( + config, + main_token.clone(), + &layer_db_tracker, + layer_db_token.clone(), + ) + .await?; - tracker.spawn(async move { + main_tracker.spawn(async move { info!("ready to receive messages"); server.run().await }); shutdown::graceful( - tracker, - token, + [ + (main_tracker, main_token), + (layer_db_tracker, layer_db_token), + (telemetry_tracker, telemetry_token), + ], Some(telemetry_shutdown.into_future()), Some(GRACEFUL_SHUTDOWN_TIMEOUT), ) diff --git a/bin/rebaser/src/main.rs b/bin/rebaser/src/main.rs index 4f75556c6e..4817dee608 100644 --- a/bin/rebaser/src/main.rs +++ b/bin/rebaser/src/main.rs @@ -10,8 +10,12 @@ fn main() -> Result<()> { } async fn async_main() -> Result<()> { - let tracker = TaskTracker::new(); - let token = CancellationToken::new(); + let main_tracker = TaskTracker::new(); + let main_token = CancellationToken::new(); + let layer_db_tracker = TaskTracker::new(); + let layer_db_token = CancellationToken::new(); + let telemetry_tracker = TaskTracker::new(); + let telemetry_token = CancellationToken::new(); color_eyre::install()?; let args = args::parse(); @@ -37,7 +41,7 @@ async fn async_main() -> Result<()> { ]) .build()?; - telemetry_application::init(config, &tracker, token.clone())? + telemetry_application::init(config, &telemetry_tracker, telemetry_token.clone())? 
}; startup::startup("rebaser").await?; @@ -51,16 +55,25 @@ async fn async_main() -> Result<()> { let config = Config::try_from(args)?; - let server = Server::from_config(config, token.clone(), tracker.clone()).await?; + let server = Server::from_config( + config, + main_token.clone(), + &layer_db_tracker, + layer_db_token.clone(), + ) + .await?; - tracker.spawn(async move { + main_tracker.spawn(async move { info!("ready to receive messages"); server.run().await }); shutdown::graceful( - tracker, - token, + [ + (main_tracker, main_token), + (layer_db_tracker, layer_db_token), + (telemetry_tracker, telemetry_token), + ], Some(telemetry_shutdown.into_future()), Some(Duration::from_secs(10)), ) diff --git a/bin/sdf/src/main.rs b/bin/sdf/src/main.rs index 82f60ad20e..d0b679e756 100644 --- a/bin/sdf/src/main.rs +++ b/bin/sdf/src/main.rs @@ -34,8 +34,10 @@ fn main() -> Result<()> { } async fn async_main() -> Result<()> { - let shutdown_token = CancellationToken::new(); - let task_tracker = TaskTracker::new(); + let layer_db_tracker = TaskTracker::new(); + let layer_db_token = CancellationToken::new(); + let telemetry_tracker = TaskTracker::new(); + let telemetry_token = CancellationToken::new(); color_eyre::install()?; let args = args::parse(); @@ -55,7 +57,7 @@ async fn async_main() -> Result<()> { .interesting_modules(vec!["dal", "si_data_nats", "si_data_pg", "si_layer_cache"]) .build()?; - telemetry_application::init(config, &task_tracker, shutdown_token.clone())? + telemetry_application::init(config, &telemetry_tracker, telemetry_token.clone())? 
}; startup::startup("sdf").await?; @@ -120,8 +122,8 @@ async fn async_main() -> Result<()> { Multiplexer::new(&nats_conn, CRDT_MULTIPLEXER_SUBJECT).await?; let (layer_db, layer_db_graceful_shutdown) = - LayerDb::from_config(config.layer_db_config().clone(), shutdown_token.clone()).await?; - task_tracker.spawn(layer_db_graceful_shutdown.into_future()); + LayerDb::from_config(config.layer_db_config().clone(), layer_db_token.clone()).await?; + layer_db_tracker.spawn(layer_db_graceful_shutdown.into_future()); let feature_flags_service = FeatureFlagService::new(config.boot_feature_flags().clone()); @@ -153,7 +155,8 @@ async fn async_main() -> Result<()> { let posthog_client = Server::start_posthog(config.posthog()).await?; - task_tracker.close(); + layer_db_tracker.close(); + telemetry_tracker.close(); match config.incoming_stream() { IncomingStream::HTTPSocket(_) => { @@ -169,14 +172,6 @@ async fn async_main() -> Result<()> { )?; let _second_shutdown_broadcast_rx = initial_shutdown_broadcast_rx.resubscribe(); - // Server::start_resource_refresh_scheduler( - // services_context.clone(), - // initial_shutdown_broadcast_rx, - // ) - // .await; - - // Server::start_status_updater(services_context, second_shutdown_broadcast_rx).await?; - server.run().await?; } IncomingStream::UnixDomainSocket(_) => { @@ -193,14 +188,6 @@ async fn async_main() -> Result<()> { .await?; let _second_shutdown_broadcast_rx = initial_shutdown_broadcast_rx.resubscribe(); - // Server::start_resource_refresh_scheduler( - // services_context.clone(), - // initial_shutdown_broadcast_rx, - // ) - // .await; - - // Server::start_status_updater(services_context, second_shutdown_broadcast_rx).await?; - server.run().await?; } } @@ -210,8 +197,18 @@ async fn async_main() -> Result<()> { // the program however, axum has shut down so it's an appropriate time to cancel other // remaining tasks and wait on their graceful shutdowns { - shutdown_token.cancel(); - task_tracker.wait().await; + // TODO(nick): 
Fletcher's comment above still stands, but now we shutdown for multiple task groups. + for (tracker, token) in [ + (layer_db_tracker, layer_db_token), + (telemetry_tracker, telemetry_token), + ] { + info!("performing graceful shutdown for task group"); + tracker.close(); + token.cancel(); + tracker.wait().await; + } + + // TODO(nick): we need to handle telemetry shutdown properly as well. telemetry_shutdown.wait().await?; } diff --git a/bin/veritech/src/main.rs b/bin/veritech/src/main.rs index d85208ce40..0d607ceb8c 100644 --- a/bin/veritech/src/main.rs +++ b/bin/veritech/src/main.rs @@ -12,8 +12,10 @@ fn main() -> Result<()> { } async fn async_main() -> Result<()> { - let tracker = TaskTracker::new(); - let token = CancellationToken::new(); + let main_tracker = TaskTracker::new(); + let main_token = CancellationToken::new(); + let telemetry_tracker = TaskTracker::new(); + let telemetry_token = CancellationToken::new(); color_eyre::install()?; let args = args::parse(); @@ -33,7 +35,7 @@ async fn async_main() -> Result<()> { .interesting_modules(vec!["naxum", "si_data_nats"]) .build()?; - telemetry_application::init(config, &tracker, token.clone())? + telemetry_application::init(config, &telemetry_tracker, telemetry_token.clone())? 
}; startup::startup("veritech").await?; @@ -47,16 +49,18 @@ async fn async_main() -> Result<()> { let config = Config::try_from(args)?; - let server = Server::from_config(config, token.clone()).await?; + let server = Server::from_config(config, main_token.clone()).await?; - tracker.spawn(async move { + main_tracker.spawn(async move { info!("ready to receive messages"); server.run().await }); shutdown::graceful( - tracker, - token, + [ + (main_tracker, main_token), + (telemetry_tracker, telemetry_token), + ], Some(telemetry_shutdown.into_future()), Some(GRACEFUL_SHUTDOWN_TIMEOUT), ) diff --git a/lib/pinga-server/src/server.rs b/lib/pinga-server/src/server.rs index 9327455dfa..275b4ce221 100644 --- a/lib/pinga-server/src/server.rs +++ b/lib/pinga-server/src/server.rs @@ -74,7 +74,8 @@ impl Server { pub async fn from_config( config: Config, token: CancellationToken, - tracker: TaskTracker, + layer_db_tracker: &TaskTracker, + layer_db_token: CancellationToken, ) -> ServerResult<Self> { dal::init()?; @@ -87,8 +88,8 @@ impl Server { Self::create_symmetric_crypto_service(config.symmetric_crypto_service()).await?; let (layer_db, layer_db_graceful_shutdown) = - LayerDb::from_config(config.layer_db_config().clone(), token.clone()).await?; - tracker.spawn(layer_db_graceful_shutdown.into_future()); + LayerDb::from_config(config.layer_db_config().clone(), layer_db_token).await?; + layer_db_tracker.spawn(layer_db_graceful_shutdown.into_future()); let services_context = ServicesContext::new( pg_pool, diff --git a/lib/rebaser-server/src/server.rs b/lib/rebaser-server/src/server.rs index 487bb9d9dd..d61f7dd537 100644 --- a/lib/rebaser-server/src/server.rs +++ b/lib/rebaser-server/src/server.rs @@ -69,7 +69,8 @@ impl Server { pub async fn from_config( config: Config, shutdown_token: CancellationToken, - tracker: TaskTracker, + layer_db_tracker: &TaskTracker, + layer_db_token: CancellationToken, ) -> ServerResult<Self> { dal::init()?; @@ -82,9 +83,9 @@ 
Self::create_symmetric_crypto_service(config.symmetric_crypto_service()).await?; let (layer_db, layer_db_graceful_shutdown) = - DalLayerDb::from_config(config.layer_db_config().clone(), shutdown_token.clone()) + DalLayerDb::from_config(config.layer_db_config().clone(), layer_db_token.clone()) .await?; - tracker.spawn(layer_db_graceful_shutdown.into_future()); + layer_db_tracker.spawn(layer_db_graceful_shutdown.into_future()); let services_context = ServicesContext::new( pg_pool, diff --git a/lib/si-layer-cache/src/activities.rs b/lib/si-layer-cache/src/activities.rs index 696951d4b8..e1fa74270a 100644 --- a/lib/si-layer-cache/src/activities.rs +++ b/lib/si-layer-cache/src/activities.rs @@ -272,7 +272,7 @@ impl ActivityMultiplexerTask { debug!("activity multiplexer task has ended; likely a bug"); }, () = token.cancelled() => { - debug!("activity multiplexer has been cancelled; shutting down"); + info!("activity multiplexer has been cancelled; shutting down"); }, } Ok(()) diff --git a/lib/si-layer-cache/src/db.rs b/lib/si-layer-cache/src/db.rs index 8a128b601a..d6a5ed76f6 100644 --- a/lib/si-layer-cache/src/db.rs +++ b/lib/si-layer-cache/src/db.rs @@ -270,7 +270,7 @@ impl IntoFuture for LayerDbGracefulShutdown { // Close the tracker so no further tasks are spawned tracker.close(); - trace!("received graceful shutdown signal, waiting for tasks to shutdown"); + info!("received graceful shutdown signal, waiting for tasks to shutdown"); // Wait for all outstanding tasks to complete tracker.wait().await; diff --git a/lib/si-service/src/shutdown.rs b/lib/si-service/src/shutdown.rs index b7209b7c15..4797bf5d28 100644 --- a/lib/si-service/src/shutdown.rs +++ b/lib/si-service/src/shutdown.rs @@ -42,15 +42,15 @@ impl ShutdownError { /// /// This function sets up a signal handler for both `SIGINT` (i.e. `Ctrl+c`) and `SIGTERM` so usage /// of this function with other code intercepting these signals is *highly* discouraged. 
-pub async fn graceful<Fut, E>( - tracker: TaskTracker, - token: CancellationToken, +pub async fn graceful<I, Fut, E>( + trackers_with_tokens: I, telemetry_guard: Option<Fut>, shutdown_timeout: Option<Duration>, ) -> Result<(), ShutdownError> where Fut: Future<Output = Result<(), E>>, E: error::Error + Send + Sync + 'static, + I: IntoIterator<Item = (TaskTracker, CancellationToken)>, { let mut sig_int = unix::signal(SignalKind::interrupt()).map_err(ShutdownError::Signal)?; let mut sig_term = unix::signal(SignalKind::terminate()).map_err(ShutdownError::Signal)?; @@ -58,20 +58,26 @@ where tokio::select! { _ = sig_int.recv() => { info!("received SIGINT, performing graceful shutdown"); - tracker.close(); - token.cancel(); } _ = sig_term.recv() => { info!("received SIGTERM, performing graceful shutdown"); + } + } + + // Create a future for draining each task group + let drain_future = async { + for (tracker, token) in trackers_with_tokens { + info!("performing graceful shutdown for task group"); tracker.close(); token.cancel(); + tracker.wait().await; } - } + }; // Wait for all tasks to finish match shutdown_timeout { Some(duration) => { - if let Err(_elapsed) = timeout(duration, tracker.wait()).await { + if let Err(_elapsed) = timeout(duration, drain_future).await { warn!("graceful shutdown timeout exceeded; completing shutdown anyway"); if let Some(telemetry_guard) = telemetry_guard { // Wait for telemetry to shutdown @@ -81,7 +87,7 @@ where } } None => { - tracker.wait().await; + drain_future.await; } }