Allow ordered task groups for graceful shutdown
This commit allows for ordered task groups, each comprising a shutdown
token and a corresponding task tracker, when performing graceful
shutdown. In other words, graceful shutdown now fully drains each task
group, in order, before moving on to the next. When using the graceful
shutdown functionality from the "si-service" crate, a single timeout
applies across all task groups.
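
To make the ordering concrete, here is a minimal, self-contained sketch
of the drain-in-order behavior, assuming only the `tokio` and
`tokio-util` crates; the group names and spawned tasks are illustrative,
not this repository's real task groups:

```rust
use tokio_util::{sync::CancellationToken, task::TaskTracker};

#[tokio::main]
async fn main() {
    // Two ordered task groups, each a (name, tracker, token) triple.
    let groups = vec![
        ("main", TaskTracker::new(), CancellationToken::new()),
        ("layer-db", TaskTracker::new(), CancellationToken::new()),
    ];

    // Spawn one task per group that runs until its token is cancelled.
    for (name, tracker, token) in &groups {
        let (name, token) = (name.to_string(), token.clone());
        tracker.spawn(async move {
            token.cancelled().await;
            println!("{name} task observed cancellation");
        });
    }

    // Fully drain each group, in order, before moving to the next.
    for (name, tracker, token) in groups {
        tracker.close();
        token.cancel();
        tracker.wait().await;
        println!("{name} group drained");
    }
}
```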

Future considerations:

In the future, we may want to create a manager struct that owns task
groups and handles their shutdown. We may also want to switch from a
flat dependency list to a dependency tree, which would allow for
parallelism during shutdown.

Using a manager struct could also help with naming task groups. Right
now, we log the number of task groups being drained, but the groups
themselves are unnamed. In the future, it could be helpful to know the
order in which they are drained.
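
As a rough illustration of that idea, a hypothetical manager might own
named groups and drain them in registration order. `TaskGroupManager`
and its methods below are invented for this sketch and are not part of
this commit:

```rust
use tokio_util::{sync::CancellationToken, task::TaskTracker};

/// Hypothetical manager owning named task groups, drained in registration order.
struct TaskGroupManager {
    groups: Vec<(String, TaskTracker, CancellationToken)>,
}

impl TaskGroupManager {
    fn new() -> Self {
        Self { groups: Vec::new() }
    }

    /// Register a named group; a dependency tree could later replace the flat Vec.
    fn register(&mut self, name: impl Into<String>) -> (TaskTracker, CancellationToken) {
        let (tracker, token) = (TaskTracker::new(), CancellationToken::new());
        self.groups
            .push((name.into(), tracker.clone(), token.clone()));
        (tracker, token)
    }

    /// Drain groups in order, logging each by name so shutdown order is visible.
    async fn shutdown(self) {
        for (name, tracker, token) in self.groups {
            println!("draining task group: {name}");
            tracker.close();
            token.cancel();
            tracker.wait().await;
        }
    }
}

#[tokio::main]
async fn main() {
    let mut manager = TaskGroupManager::new();
    let (_tracker, _token) = manager.register("layer-db");
    manager.shutdown().await;
}
```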

Additional changes:

This commit also deletes some dead, commented-out code and raises some
shutdown logging from "debug"/"trace" to "info" level. In addition, we
no longer clone the tracker used for creating LayerDb clients; we
borrow it instead.
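
A minimal sketch of the borrow-versus-clone change, assuming only
`tokio-util` (the function name is illustrative): `TaskTracker::spawn`
takes `&self`, so callers can lend the tracker rather than handing over
a clone.

```rust
use tokio_util::task::TaskTracker;

// Borrowing is enough: `TaskTracker::spawn` takes `&self`.
async fn spawn_layer_db_shutdown(layer_db_tracker: &TaskTracker) {
    layer_db_tracker.spawn(async {
        // Stand-in for `layer_db_graceful_shutdown.into_future()`.
    });
}

#[tokio::main]
async fn main() {
    let tracker = TaskTracker::new();
    spawn_layer_db_shutdown(&tracker).await; // borrowed, not cloned
    tracker.close();
    tracker.wait().await;
}
```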

Signed-off-by: Nick Gerace <[email protected]>
nickgerace committed Aug 28, 2024
1 parent f91836a commit 8e345db
Showing 9 changed files with 96 additions and 61 deletions.
27 changes: 20 additions & 7 deletions bin/pinga/src/main.rs
@@ -12,8 +12,12 @@ fn main() -> Result<()> {
 }
 
 async fn async_main() -> Result<()> {
-    let tracker = TaskTracker::new();
-    let token = CancellationToken::new();
+    let main_tracker = TaskTracker::new();
+    let main_token = CancellationToken::new();
+    let layer_db_tracker = TaskTracker::new();
+    let layer_db_token = CancellationToken::new();
+    let telemetry_tracker = TaskTracker::new();
+    let telemetry_token = CancellationToken::new();
 
     color_eyre::install()?;
     let args = args::parse();
@@ -39,7 +43,7 @@ async fn async_main() -> Result<()> {
             ])
             .build()?;
 
-        telemetry_application::init(config, &tracker, token.clone())?
+        telemetry_application::init(config, &telemetry_tracker, telemetry_token.clone())?
     };
 
     startup::startup("pinga").await?;
@@ -53,16 +57,25 @@ async fn async_main() -> Result<()> {
 
     let config = Config::try_from(args)?;
 
-    let server = Server::from_config(config, token.clone(), tracker.clone()).await?;
+    let server = Server::from_config(
+        config,
+        main_token.clone(),
+        &layer_db_tracker,
+        layer_db_token.clone(),
+    )
+    .await?;
 
-    tracker.spawn(async move {
+    main_tracker.spawn(async move {
         info!("ready to receive messages");
         server.run().await
     });
 
     shutdown::graceful(
-        tracker,
-        token,
+        [
+            (main_tracker, main_token),
+            (layer_db_tracker, layer_db_token),
+            (telemetry_tracker, telemetry_token),
+        ],
         Some(telemetry_shutdown.into_future()),
         Some(GRACEFUL_SHUTDOWN_TIMEOUT),
     )
27 changes: 20 additions & 7 deletions bin/rebaser/src/main.rs
@@ -10,8 +10,12 @@ fn main() -> Result<()> {
 }
 
 async fn async_main() -> Result<()> {
-    let tracker = TaskTracker::new();
-    let token = CancellationToken::new();
+    let main_tracker = TaskTracker::new();
+    let main_token = CancellationToken::new();
+    let layer_db_tracker = TaskTracker::new();
+    let layer_db_token = CancellationToken::new();
+    let telemetry_tracker = TaskTracker::new();
+    let telemetry_token = CancellationToken::new();
 
     color_eyre::install()?;
     let args = args::parse();
@@ -37,7 +41,7 @@ async fn async_main() -> Result<()> {
             ])
             .build()?;
 
-        telemetry_application::init(config, &tracker, token.clone())?
+        telemetry_application::init(config, &telemetry_tracker, telemetry_token.clone())?
     };
 
     startup::startup("rebaser").await?;
@@ -51,16 +55,25 @@ async fn async_main() -> Result<()> {
 
     let config = Config::try_from(args)?;
 
-    let server = Server::from_config(config, token.clone(), tracker.clone()).await?;
+    let server = Server::from_config(
+        config,
+        main_token.clone(),
+        &layer_db_tracker,
+        layer_db_token.clone(),
+    )
+    .await?;
 
-    tracker.spawn(async move {
+    main_tracker.spawn(async move {
         info!("ready to receive messages");
         server.run().await
     });
 
     shutdown::graceful(
-        tracker,
-        token,
+        [
+            (main_tracker, main_token),
+            (layer_db_tracker, layer_db_token),
+            (telemetry_tracker, telemetry_token),
+        ],
         Some(telemetry_shutdown.into_future()),
         Some(Duration::from_secs(10)),
     )
45 changes: 21 additions & 24 deletions bin/sdf/src/main.rs
@@ -34,8 +34,10 @@ fn main() -> Result<()> {
 }
 
 async fn async_main() -> Result<()> {
-    let shutdown_token = CancellationToken::new();
-    let task_tracker = TaskTracker::new();
+    let layer_db_tracker = TaskTracker::new();
+    let layer_db_token = CancellationToken::new();
+    let telemetry_tracker = TaskTracker::new();
+    let telemetry_token = CancellationToken::new();
 
     color_eyre::install()?;
     let args = args::parse();
@@ -55,7 +57,7 @@ async fn async_main() -> Result<()> {
             .interesting_modules(vec!["dal", "si_data_nats", "si_data_pg", "si_layer_cache"])
             .build()?;
 
-        telemetry_application::init(config, &task_tracker, shutdown_token.clone())?
+        telemetry_application::init(config, &telemetry_tracker, telemetry_token.clone())?
     };
 
     startup::startup("sdf").await?;
@@ -120,8 +122,8 @@ async fn async_main() -> Result<()> {
         Multiplexer::new(&nats_conn, CRDT_MULTIPLEXER_SUBJECT).await?;
 
     let (layer_db, layer_db_graceful_shutdown) =
-        LayerDb::from_config(config.layer_db_config().clone(), shutdown_token.clone()).await?;
-    task_tracker.spawn(layer_db_graceful_shutdown.into_future());
+        LayerDb::from_config(config.layer_db_config().clone(), layer_db_token.clone()).await?;
+    layer_db_tracker.spawn(layer_db_graceful_shutdown.into_future());
 
     let feature_flags_service = FeatureFlagService::new(config.boot_feature_flags().clone());
 
@@ -153,7 +155,8 @@ async fn async_main() -> Result<()> {
 
     let posthog_client = Server::start_posthog(config.posthog()).await?;
 
-    task_tracker.close();
+    layer_db_tracker.close();
+    telemetry_tracker.close();
 
     match config.incoming_stream() {
         IncomingStream::HTTPSocket(_) => {
@@ -169,14 +172,6 @@ async fn async_main() -> Result<()> {
             )?;
             let _second_shutdown_broadcast_rx = initial_shutdown_broadcast_rx.resubscribe();
 
-            // Server::start_resource_refresh_scheduler(
-            //     services_context.clone(),
-            //     initial_shutdown_broadcast_rx,
-            // )
-            // .await;
-
-            // Server::start_status_updater(services_context, second_shutdown_broadcast_rx).await?;
-
             server.run().await?;
         }
         IncomingStream::UnixDomainSocket(_) => {
@@ -193,14 +188,6 @@ async fn async_main() -> Result<()> {
                 .await?;
             let _second_shutdown_broadcast_rx = initial_shutdown_broadcast_rx.resubscribe();
 
-            // Server::start_resource_refresh_scheduler(
-            //     services_context.clone(),
-            //     initial_shutdown_broadcast_rx,
-            // )
-            // .await;
-
-            // Server::start_status_updater(services_context, second_shutdown_broadcast_rx).await?;
-
             server.run().await?;
         }
     }
@@ -210,8 +197,18 @@ async fn async_main() -> Result<()> {
     // the program however, axum has shut down so it's an appropriate time to cancel other
    // remaining tasks and wait on their graceful shutdowns
    {
-        shutdown_token.cancel();
-        task_tracker.wait().await;
+        // TODO(nick): Fletcher's comment above still stands, but now we shutdown for multiple task groups.
+        for (tracker, token) in [
+            (layer_db_tracker, layer_db_token),
+            (telemetry_tracker, telemetry_token),
+        ] {
+            info!("performing graceful shutdown for task group");
+            tracker.close();
+            token.cancel();
+            tracker.wait().await;
+        }
 
+        // TODO(nick): we need to handle telemetry shutdown properly as well.
         telemetry_shutdown.wait().await?;
     }
18 changes: 11 additions & 7 deletions bin/veritech/src/main.rs
@@ -12,8 +12,10 @@ fn main() -> Result<()> {
 }
 
 async fn async_main() -> Result<()> {
-    let tracker = TaskTracker::new();
-    let token = CancellationToken::new();
+    let main_tracker = TaskTracker::new();
+    let main_token = CancellationToken::new();
+    let telemetry_tracker = TaskTracker::new();
+    let telemetry_token = CancellationToken::new();
 
     color_eyre::install()?;
     let args = args::parse();
@@ -33,7 +35,7 @@ async fn async_main() -> Result<()> {
             .interesting_modules(vec!["naxum", "si_data_nats"])
             .build()?;
 
-        telemetry_application::init(config, &tracker, token.clone())?
+        telemetry_application::init(config, &telemetry_tracker, telemetry_token.clone())?
    };
 
     startup::startup("veritech").await?;
@@ -47,16 +49,18 @@ async fn async_main() -> Result<()> {
 
     let config = Config::try_from(args)?;
 
-    let server = Server::from_config(config, token.clone()).await?;
+    let server = Server::from_config(config, main_token.clone()).await?;
 
-    tracker.spawn(async move {
+    main_tracker.spawn(async move {
         info!("ready to receive messages");
         server.run().await
     });
 
     shutdown::graceful(
-        tracker,
-        token,
+        [
+            (main_tracker, main_token),
+            (telemetry_tracker, telemetry_token),
+        ],
         Some(telemetry_shutdown.into_future()),
         Some(GRACEFUL_SHUTDOWN_TIMEOUT),
     )
7 changes: 4 additions & 3 deletions lib/pinga-server/src/server.rs
@@ -74,7 +74,8 @@ impl Server {
     pub async fn from_config(
         config: Config,
         token: CancellationToken,
-        tracker: TaskTracker,
+        layer_db_tracker: &TaskTracker,
+        layer_db_token: CancellationToken,
     ) -> ServerResult<Self> {
         dal::init()?;
 
@@ -87,8 +88,8 @@ impl Server {
             Self::create_symmetric_crypto_service(config.symmetric_crypto_service()).await?;
 
         let (layer_db, layer_db_graceful_shutdown) =
-            LayerDb::from_config(config.layer_db_config().clone(), token.clone()).await?;
-        tracker.spawn(layer_db_graceful_shutdown.into_future());
+            LayerDb::from_config(config.layer_db_config().clone(), layer_db_token).await?;
+        layer_db_tracker.spawn(layer_db_graceful_shutdown.into_future());
 
         let services_context = ServicesContext::new(
             pg_pool,
7 changes: 4 additions & 3 deletions lib/rebaser-server/src/server.rs
@@ -69,7 +69,8 @@ impl Server {
     pub async fn from_config(
         config: Config,
         shutdown_token: CancellationToken,
-        tracker: TaskTracker,
+        layer_db_tracker: &TaskTracker,
+        layer_db_token: CancellationToken,
     ) -> ServerResult<Self> {
         dal::init()?;
 
@@ -82,9 +83,9 @@ impl Server {
             Self::create_symmetric_crypto_service(config.symmetric_crypto_service()).await?;
 
         let (layer_db, layer_db_graceful_shutdown) =
-            DalLayerDb::from_config(config.layer_db_config().clone(), shutdown_token.clone())
+            DalLayerDb::from_config(config.layer_db_config().clone(), layer_db_token.clone())
                 .await?;
-        tracker.spawn(layer_db_graceful_shutdown.into_future());
+        layer_db_tracker.spawn(layer_db_graceful_shutdown.into_future());
 
         let services_context = ServicesContext::new(
             pg_pool,
2 changes: 1 addition & 1 deletion lib/si-layer-cache/src/activities.rs
@@ -272,7 +272,7 @@ impl ActivityMultiplexerTask {
                 debug!("activity multiplexer task has ended; likely a bug");
             },
             () = token.cancelled() => {
-                debug!("activity multiplexer has been cancelled; shutting down");
+                info!("activity multiplexer has been cancelled; shutting down");
             },
         }
         Ok(())
2 changes: 1 addition & 1 deletion lib/si-layer-cache/src/db.rs
@@ -270,7 +270,7 @@ impl IntoFuture for LayerDbGracefulShutdown {
 
             // Close the tracker so no further tasks are spawned
             tracker.close();
-            trace!("received graceful shutdown signal, waiting for tasks to shutdown");
+            info!("received graceful shutdown signal, waiting for tasks to shutdown");
             // Wait for all outstanding tasks to complete
             tracker.wait().await;
 
22 changes: 14 additions & 8 deletions lib/si-service/src/shutdown.rs
@@ -42,36 +42,42 @@ impl ShutdownError {
 ///
 /// This function sets up a signal handler for both `SIGINT` (i.e. `Ctrl+c`) and `SIGTERM` so usage
 /// of this function with other code intercepting these signals is *highly* discouraged.
-pub async fn graceful<Fut, E>(
-    tracker: TaskTracker,
-    token: CancellationToken,
+pub async fn graceful<Fut, E, I>(
+    trackers_with_tokens: I,
     telemetry_guard: Option<Fut>,
     shutdown_timeout: Option<Duration>,
 ) -> Result<(), ShutdownError>
 where
     Fut: Future<Output = Result<(), E>>,
     E: error::Error + Send + Sync + 'static,
+    I: IntoIterator<Item = (TaskTracker, CancellationToken)>,
 {
     let mut sig_int = unix::signal(SignalKind::interrupt()).map_err(ShutdownError::Signal)?;
     let mut sig_term = unix::signal(SignalKind::terminate()).map_err(ShutdownError::Signal)?;
 
     tokio::select! {
         _ = sig_int.recv() => {
             info!("received SIGINT, performing graceful shutdown");
-            tracker.close();
-            token.cancel();
         }
         _ = sig_term.recv() => {
             info!("received SIGTERM, performing graceful shutdown");
         }
     }
 
+    // Create a future for draining each task group
+    let drain_future = async {
+        for (tracker, token) in trackers_with_tokens {
+            info!("performing graceful shutdown for task group");
+            tracker.close();
+            token.cancel();
+            tracker.wait().await;
+        }
+    };
 
     // Wait for all tasks to finish
     match shutdown_timeout {
         Some(duration) => {
-            if let Err(_elapsed) = timeout(duration, tracker.wait()).await {
+            if let Err(_elapsed) = timeout(duration, drain_future).await {
                 warn!("graceful shutdown timeout exceeded; completing shutdown anyway");
                 if let Some(telemetry_guard) = telemetry_guard {
                     // Wait for telemetry to shutdown
@@ -81,7 +87,7 @@ where
             }
         }
         None => {
-            tracker.wait().await;
+            drain_future.await;
         }
     }
 
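
To make the timeout behavior above concrete: the deadline wraps the
drain of all task groups together, not a per-group budget. A minimal
sketch, assuming only `tokio`; the group names and sleeps stand in for
real trackers:

```rust
use std::time::Duration;
use tokio::time::timeout;

#[tokio::main]
async fn main() {
    // Stand-in for draining each (tracker, token) group in order.
    let drain_all = async {
        for group in ["main", "layer-db", "telemetry"] {
            tokio::time::sleep(Duration::from_millis(50)).await;
            println!("drained {group}");
        }
    };

    // One deadline covers every group combined.
    if timeout(Duration::from_secs(10), drain_all).await.is_err() {
        eprintln!("graceful shutdown timeout exceeded; completing shutdown anyway");
    }
}
```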
