diff --git a/bin/pinga/src/main.rs b/bin/pinga/src/main.rs index 7fab7e8d27..99bba0eb21 100644 --- a/bin/pinga/src/main.rs +++ b/bin/pinga/src/main.rs @@ -12,8 +12,12 @@ fn main() -> Result<()> { } async fn async_main() -> Result<()> { - let tracker = TaskTracker::new(); - let token = CancellationToken::new(); + let main_tracker = TaskTracker::new(); + let main_token = CancellationToken::new(); + let layer_db_tracker = TaskTracker::new(); + let layer_db_token = CancellationToken::new(); + let telemetry_tracker = TaskTracker::new(); + let telemetry_token = CancellationToken::new(); color_eyre::install()?; let args = args::parse(); @@ -39,7 +43,7 @@ async fn async_main() -> Result<()> { ]) .build()?; - telemetry_application::init(config, &tracker, token.clone())? + telemetry_application::init(config, &telemetry_tracker, telemetry_token.clone())? }; startup::startup("pinga").await?; @@ -53,16 +57,25 @@ async fn async_main() -> Result<()> { let config = Config::try_from(args)?; - let server = Server::from_config(config, token.clone(), tracker.clone()).await?; + let server = Server::from_config( + config, + main_token.clone(), + &layer_db_tracker, + layer_db_token.clone(), + ) + .await?; - tracker.spawn(async move { + main_tracker.spawn(async move { info!("ready to receive messages"); server.run().await }); shutdown::graceful( - tracker, - token, + [ + (main_tracker, main_token), + (layer_db_tracker, layer_db_token), + (telemetry_tracker, telemetry_token), + ], Some(telemetry_shutdown.into_future()), Some(GRACEFUL_SHUTDOWN_TIMEOUT), ) diff --git a/bin/rebaser/src/main.rs b/bin/rebaser/src/main.rs index 4f75556c6e..4817dee608 100644 --- a/bin/rebaser/src/main.rs +++ b/bin/rebaser/src/main.rs @@ -10,8 +10,12 @@ fn main() -> Result<()> { } async fn async_main() -> Result<()> { - let tracker = TaskTracker::new(); - let token = CancellationToken::new(); + let main_tracker = TaskTracker::new(); + let main_token = CancellationToken::new(); + let layer_db_tracker = 
TaskTracker::new(); + let layer_db_token = CancellationToken::new(); + let telemetry_tracker = TaskTracker::new(); + let telemetry_token = CancellationToken::new(); color_eyre::install()?; let args = args::parse(); @@ -37,7 +41,7 @@ async fn async_main() -> Result<()> { ]) .build()?; - telemetry_application::init(config, &tracker, token.clone())? + telemetry_application::init(config, &telemetry_tracker, telemetry_token.clone())? }; startup::startup("rebaser").await?; @@ -51,16 +55,25 @@ async fn async_main() -> Result<()> { let config = Config::try_from(args)?; - let server = Server::from_config(config, token.clone(), tracker.clone()).await?; + let server = Server::from_config( + config, + main_token.clone(), + &layer_db_tracker, + layer_db_token.clone(), + ) + .await?; - tracker.spawn(async move { + main_tracker.spawn(async move { info!("ready to receive messages"); server.run().await }); shutdown::graceful( - tracker, - token, + [ + (main_tracker, main_token), + (layer_db_tracker, layer_db_token), + (telemetry_tracker, telemetry_token), + ], Some(telemetry_shutdown.into_future()), Some(Duration::from_secs(10)), ) diff --git a/bin/sdf/src/main.rs b/bin/sdf/src/main.rs index 82f60ad20e..d0b679e756 100644 --- a/bin/sdf/src/main.rs +++ b/bin/sdf/src/main.rs @@ -34,8 +34,10 @@ fn main() -> Result<()> { } async fn async_main() -> Result<()> { - let shutdown_token = CancellationToken::new(); - let task_tracker = TaskTracker::new(); + let layer_db_tracker = TaskTracker::new(); + let layer_db_token = CancellationToken::new(); + let telemetry_tracker = TaskTracker::new(); + let telemetry_token = CancellationToken::new(); color_eyre::install()?; let args = args::parse(); @@ -55,7 +57,7 @@ async fn async_main() -> Result<()> { .interesting_modules(vec!["dal", "si_data_nats", "si_data_pg", "si_layer_cache"]) .build()?; - telemetry_application::init(config, &task_tracker, shutdown_token.clone())? 
+ telemetry_application::init(config, &telemetry_tracker, telemetry_token.clone())? }; startup::startup("sdf").await?; @@ -120,8 +122,8 @@ async fn async_main() -> Result<()> { Multiplexer::new(&nats_conn, CRDT_MULTIPLEXER_SUBJECT).await?; let (layer_db, layer_db_graceful_shutdown) = - LayerDb::from_config(config.layer_db_config().clone(), shutdown_token.clone()).await?; - task_tracker.spawn(layer_db_graceful_shutdown.into_future()); + LayerDb::from_config(config.layer_db_config().clone(), layer_db_token.clone()).await?; + layer_db_tracker.spawn(layer_db_graceful_shutdown.into_future()); let feature_flags_service = FeatureFlagService::new(config.boot_feature_flags().clone()); @@ -153,7 +155,8 @@ async fn async_main() -> Result<()> { let posthog_client = Server::start_posthog(config.posthog()).await?; - task_tracker.close(); + layer_db_tracker.close(); + telemetry_tracker.close(); match config.incoming_stream() { IncomingStream::HTTPSocket(_) => { @@ -169,14 +172,6 @@ async fn async_main() -> Result<()> { )?; let _second_shutdown_broadcast_rx = initial_shutdown_broadcast_rx.resubscribe(); - // Server::start_resource_refresh_scheduler( - // services_context.clone(), - // initial_shutdown_broadcast_rx, - // ) - // .await; - - // Server::start_status_updater(services_context, second_shutdown_broadcast_rx).await?; - server.run().await?; } IncomingStream::UnixDomainSocket(_) => { @@ -193,14 +188,6 @@ async fn async_main() -> Result<()> { .await?; let _second_shutdown_broadcast_rx = initial_shutdown_broadcast_rx.resubscribe(); - // Server::start_resource_refresh_scheduler( - // services_context.clone(), - // initial_shutdown_broadcast_rx, - // ) - // .await; - - // Server::start_status_updater(services_context, second_shutdown_broadcast_rx).await?; - server.run().await?; } } @@ -210,8 +197,18 @@ async fn async_main() -> Result<()> { // the program however, axum has shut down so it's an appropriate time to cancel other // remaining tasks and wait on their graceful 
shutdowns { - shutdown_token.cancel(); - task_tracker.wait().await; + // TODO(nick): Fletcher's comment above still stands, but now we shutdown for multiple task groups. + for (tracker, token) in [ + (layer_db_tracker, layer_db_token), + (telemetry_tracker, telemetry_token), + ] { + info!("performing graceful shutdown for task group"); + tracker.close(); + token.cancel(); + tracker.wait().await; + } + + // TODO(nick): we need to handle telemetry shutdown properly as well. telemetry_shutdown.wait().await?; } diff --git a/bin/veritech/src/main.rs b/bin/veritech/src/main.rs index d85208ce40..0d607ceb8c 100644 --- a/bin/veritech/src/main.rs +++ b/bin/veritech/src/main.rs @@ -12,8 +12,10 @@ fn main() -> Result<()> { } async fn async_main() -> Result<()> { - let tracker = TaskTracker::new(); - let token = CancellationToken::new(); + let main_tracker = TaskTracker::new(); + let main_token = CancellationToken::new(); + let telemetry_tracker = TaskTracker::new(); + let telemetry_token = CancellationToken::new(); color_eyre::install()?; let args = args::parse(); @@ -33,7 +35,7 @@ async fn async_main() -> Result<()> { .interesting_modules(vec!["naxum", "si_data_nats"]) .build()?; - telemetry_application::init(config, &tracker, token.clone())? + telemetry_application::init(config, &telemetry_tracker, telemetry_token.clone())? 
}; startup::startup("veritech").await?; @@ -47,16 +49,18 @@ async fn async_main() -> Result<()> { let config = Config::try_from(args)?; - let server = Server::from_config(config, token.clone()).await?; + let server = Server::from_config(config, main_token.clone()).await?; - tracker.spawn(async move { + main_tracker.spawn(async move { info!("ready to receive messages"); server.run().await }); shutdown::graceful( - tracker, - token, + [ + (main_tracker, main_token), + (telemetry_tracker, telemetry_token), + ], Some(telemetry_shutdown.into_future()), Some(GRACEFUL_SHUTDOWN_TIMEOUT), ) diff --git a/lib/pinga-server/src/server.rs b/lib/pinga-server/src/server.rs index 9327455dfa..275b4ce221 100644 --- a/lib/pinga-server/src/server.rs +++ b/lib/pinga-server/src/server.rs @@ -74,7 +74,8 @@ impl Server { pub async fn from_config( config: Config, token: CancellationToken, - tracker: TaskTracker, + layer_db_tracker: &TaskTracker, + layer_db_token: CancellationToken, ) -> ServerResult<Self> { dal::init()?; @@ -87,8 +88,8 @@ impl Server { Self::create_symmetric_crypto_service(config.symmetric_crypto_service()).await?; let (layer_db, layer_db_graceful_shutdown) = - LayerDb::from_config(config.layer_db_config().clone(), token.clone()).await?; - tracker.spawn(layer_db_graceful_shutdown.into_future()); + LayerDb::from_config(config.layer_db_config().clone(), layer_db_token).await?; + layer_db_tracker.spawn(layer_db_graceful_shutdown.into_future()); let services_context = ServicesContext::new( pg_pool, diff --git a/lib/rebaser-server/src/server.rs b/lib/rebaser-server/src/server.rs index 487bb9d9dd..d61f7dd537 100644 --- a/lib/rebaser-server/src/server.rs +++ b/lib/rebaser-server/src/server.rs @@ -69,7 +69,8 @@ impl Server { pub async fn from_config( config: Config, shutdown_token: CancellationToken, - tracker: TaskTracker, + layer_db_tracker: &TaskTracker, + layer_db_token: CancellationToken, ) -> ServerResult<Self> { dal::init()?; @@ -82,9 +83,9 @@ 
Self::create_symmetric_crypto_service(config.symmetric_crypto_service()).await?; let (layer_db, layer_db_graceful_shutdown) = - DalLayerDb::from_config(config.layer_db_config().clone(), shutdown_token.clone()) + DalLayerDb::from_config(config.layer_db_config().clone(), layer_db_token.clone()) .await?; - tracker.spawn(layer_db_graceful_shutdown.into_future()); + layer_db_tracker.spawn(layer_db_graceful_shutdown.into_future()); let services_context = ServicesContext::new( pg_pool, diff --git a/lib/si-layer-cache/src/activities.rs b/lib/si-layer-cache/src/activities.rs index 696951d4b8..e1fa74270a 100644 --- a/lib/si-layer-cache/src/activities.rs +++ b/lib/si-layer-cache/src/activities.rs @@ -272,7 +272,7 @@ impl ActivityMultiplexerTask { debug!("activity multiplexer task has ended; likely a bug"); }, () = token.cancelled() => { - debug!("activity multiplexer has been cancelled; shutting down"); + info!("activity multiplexer has been cancelled; shutting down"); }, } Ok(()) diff --git a/lib/si-layer-cache/src/db.rs b/lib/si-layer-cache/src/db.rs index 8a128b601a..d6a5ed76f6 100644 --- a/lib/si-layer-cache/src/db.rs +++ b/lib/si-layer-cache/src/db.rs @@ -270,7 +270,7 @@ impl IntoFuture for LayerDbGracefulShutdown { // Close the tracker so no further tasks are spawned tracker.close(); - trace!("received graceful shutdown signal, waiting for tasks to shutdown"); + info!("received graceful shutdown signal, waiting for tasks to shutdown"); // Wait for all outstanding tasks to complete tracker.wait().await; diff --git a/lib/si-service/src/shutdown.rs b/lib/si-service/src/shutdown.rs index b7209b7c15..4797bf5d28 100644 --- a/lib/si-service/src/shutdown.rs +++ b/lib/si-service/src/shutdown.rs @@ -42,15 +42,15 @@ impl ShutdownError { /// /// This function sets up a signal handler for both `SIGINT` (i.e. `Ctrl+c`) and `SIGTERM` so usage /// of this function with other code intercepting these signals is *highly* discouraged. 
-pub async fn graceful<Fut, E>( - tracker: TaskTracker, - token: CancellationToken, +pub async fn graceful<I, Fut, E>( + trackers_with_tokens: I, telemetry_guard: Option<Fut>, shutdown_timeout: Option<Duration>, ) -> Result<(), ShutdownError> where Fut: Future<Output = Result<(), E>>, E: error::Error + Send + Sync + 'static, + I: IntoIterator<Item = (TaskTracker, CancellationToken)>, { let mut sig_int = unix::signal(SignalKind::interrupt()).map_err(ShutdownError::Signal)?; let mut sig_term = unix::signal(SignalKind::terminate()).map_err(ShutdownError::Signal)?; @@ -58,20 +58,26 @@ where tokio::select! { _ = sig_int.recv() => { info!("received SIGINT, performing graceful shutdown"); - tracker.close(); - token.cancel(); } _ = sig_term.recv() => { info!("received SIGTERM, performing graceful shutdown"); + } + } + + // Create a future for draining each task group + let drain_future = async { + for (tracker, token) in trackers_with_tokens { + info!("performing graceful shutdown for task group"); tracker.close(); token.cancel(); + tracker.wait().await; } - } + }; // Wait for all tasks to finish match shutdown_timeout { Some(duration) => { - if let Err(_elapsed) = timeout(duration, tracker.wait()).await { + if let Err(_elapsed) = timeout(duration, drain_future).await { warn!("graceful shutdown timeout exceeded; completing shutdown anyway"); if let Some(telemetry_guard) = telemetry_guard { // Wait for telemetry to shutdown @@ -81,7 +87,7 @@ where } } None => { - tracker.wait().await; + drain_future.await; } }