From 747ce03cee2810f0daa7b292462c92db5176639d Mon Sep 17 00:00:00 2001
From: Fletcher Nichol
Date: Thu, 26 Sep 2024 10:30:10 -0600
Subject: [PATCH] feat(rebaser): change quiescent shutdown to reduce missed
 activity

This change alters the logic that shuts down a change set "process"
task when no Rebaser requests have been seen over our
`quiescent_period`.

Prior to this change there was a window during shutdown when the
`ChangeSetProcessorTask` was no longer looking for new Rebaser requests
to process while it waited for the `SerialDvuTask` to end. As a hedge
against this scenario, the process task handler checks the change set
subject just before ending to ensure that, if at least one request
message is pending, we don't ack/delete the task.

In this altered version of the quiescent shutdown we still notice the
quiet period in the Rebaser requests subscription stream, as before.
However, a `quiesced_notify` (`tokio::sync::Notify`) is now fired to
signal the `SerialDvuTask`, and the `ChangeSetProcessorTask` continues
to process any further requests that may show up (remember that after
running a "dvu" job, another Rebaser request is often submitted).

Meanwhile, the `SerialDvuTask` continues to run "dvu" jobs as long as
`run_dvu_notify` has been set (in effect "draining" any pending runs),
and only then checks whether `quiesced_notify` has been set. If it has,
it cancels the `quiesced_token`, which causes `SerialDvuTask` to return
with `Ok(Shutdown::Quiesced)`; that same `CancellationToken` also
gracefully shuts down the Naxum app in the `ChangeSetProcessorTask`.

With these changes, the one or two remaining "dvu" jobs will not cause
the process task to stop processing further Rebaser requests. For
example, assume the last 2 "dvu" jobs take 8 minutes each. The process
task is then in a quiescent shutdown for up to the next 8 * 2 = 16
minutes, during which any further Rebaser requests will still be
processed (whereas they may not have been prior to this change).
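For readers less familiar with the stream side of this, here is a
minimal, hypothetical sketch of how a gap in a request stream can be
surfaced with `StreamExt::timeout` and turned into a `Notify` signal
while the stream keeps being consumed. It is not the patch's actual
Naxum/NATS wiring; it assumes the `tokio` and `tokio-stream` crates
(with the `time` feature), and `watch_for_quiet_period` plus the
`String` items are illustrative stand-ins.

```rust
use std::time::Duration;

use tokio::sync::Notify;
use tokio_stream::{Stream, StreamExt as _};

// Hypothetical helper: watch a request stream, signal `quiesced_notify` when a
// quiet period is detected, but keep processing any requests that still arrive.
async fn watch_for_quiet_period<S>(
    requests: S,
    quiescent_period: Duration,
    quiesced_notify: &Notify,
) where
    S: Stream<Item = String>,
{
    // A gap longer than `quiescent_period` between items shows up as an
    // `Err(Elapsed)` item instead of closing the stream.
    let mut requests = Box::pin(requests.timeout(quiescent_period));

    while let Some(item) = requests.next().await {
        match item {
            // A request arrived in time; process it as normal.
            Ok(request) => println!("processing request: {request}"),
            // The stream went quiet; tell the dvu task, but keep reading so that
            // requests arriving during the quiescent shutdown are still handled.
            Err(_elapsed) => quiesced_notify.notify_one(),
        }
    }
}
```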
Signed-off-by: Fletcher Nichol
---
 .../src/change_set_processor_task.rs      | 76 ++++++++-----------
 lib/rebaser-server/src/handlers.rs        | 24 ++++--
 lib/rebaser-server/src/lib.rs             |  6 ++
 lib/rebaser-server/src/serial_dvu_task.rs | 76 ++++++++++++++++---
 4 files changed, 119 insertions(+), 63 deletions(-)

diff --git a/lib/rebaser-server/src/change_set_processor_task.rs b/lib/rebaser-server/src/change_set_processor_task.rs
index 1463377267..6ef072cdb8 100644
--- a/lib/rebaser-server/src/change_set_processor_task.rs
+++ b/lib/rebaser-server/src/change_set_processor_task.rs
@@ -47,7 +47,6 @@ pub(crate) struct ChangeSetProcessorTask {
     workspace_id: WorkspacePk,
     change_set_id: ChangeSetId,
     inner: Box> + Unpin + Send>,
-    quiescence_token: CancellationToken,
 }
 
 impl ChangeSetProcessorTask {
@@ -62,46 +61,53 @@ impl ChangeSetProcessorTask {
         workspace_id: WorkspacePk,
         change_set_id: ChangeSetId,
         ctx_builder: DalContextBuilder,
-        run_notify: Arc<Notify>,
+        run_dvu_notify: Arc<Notify>,
         quiescent_period: Duration,
+        quiesced_notify: Arc<Notify>,
+        quiesced_token: CancellationToken,
         task_token: CancellationToken,
     ) -> Self {
-        let state = AppState::new(workspace_id, change_set_id, nats, ctx_builder, run_notify);
-
-        let quiescence_token = CancellationToken::new();
+        let state = AppState::new(
+            workspace_id,
+            change_set_id,
+            nats,
+            ctx_builder,
+            run_dvu_notify,
+        );
 
         let captured = QuiescedCaptured {
             instance_id: metadata.instance_id().to_string(),
             workspace_id,
             change_set_id,
-            quiescence_token: quiescence_token.clone(),
+            quiesced_notify: quiesced_notify.clone(),
         };
 
         let inactive_aware_incoming = incoming
             // Looks for a gap between incoming messages greater than the duration
             .timeout(quiescent_period)
-            // Fire the quiescence token which triggers a distinctive shutdown where we *know* we
-            // want to remove the task from the set of work.
+            // Fire quiesced_notify which triggers a specific shutdown of the serial dvu task where
+            // we *know* we want to remove the task from the set of work.
             .inspect_err(move |_elapsed| {
                 let QuiescedCaptured {
                     instance_id,
                     workspace_id,
                     change_set_id,
-                    quiescence_token,
+                    quiesced_notify,
                 } = &captured;
                 debug!(
                     service.instance.id = instance_id,
                     si.workspace.id = %workspace_id,
                     si.change_set.id = %change_set_id,
-                    "rate of requests has become inactive, shutting down processing tasks",
+                    "rate of requests has become inactive, triggering a quiesced shutdown",
                 );
-                quiescence_token.cancel();
+                // Notify the serial dvu task that we want to shutdown due to a quiet period
+                quiesced_notify.notify_one();
             })
-            // Once the first inactive period is detected, this stream is closed (i.e. returns
-            // `None`)
-            .map_while(result::Result::ok)
-            .fuse();
+            // Continue processing messages as normal until the Naxum app's graceful shutdown is
+            // triggered. This means we turn the stream back from a stream of
+            // `Result, Elapsed>` into `Result`
+            .filter_map(|maybe_elapsed_item| maybe_elapsed_item.ok());
 
         let app = ServiceBuilder::new()
             .layer(
@@ -115,10 +121,7 @@ impl ChangeSetProcessorTask {
 
         let inner =
             naxum::serve_with_incoming_limit(inactive_aware_incoming, app.into_make_service(), 1)
-                .with_graceful_shutdown(graceful_shutdown_signal(
-                    task_token,
-                    quiescence_token.clone(),
-                ));
+                .with_graceful_shutdown(graceful_shutdown_signal(task_token, quiesced_token));
 
         let inner_fut = inner.into_future();
@@ -127,44 +130,27 @@ impl ChangeSetProcessorTask {
             workspace_id,
             change_set_id,
             inner: Box::new(inner_fut),
-            quiescence_token,
         }
     }
 
-    pub(crate) async fn try_run(self) -> Result<Shutdown> {
+    pub(crate) async fn try_run(self) -> Result<()> {
         self.inner.await.map_err(Error::Naxum)?;
 
-        if self.quiescence_token.is_cancelled() {
-            debug!(
-                task = Self::NAME,
-                si.workspace.id = %self.workspace_id,
-                si.change_set.id = %self.change_set_id,
-                "shutdown due to quiescent period",
-            );
-            Ok(Shutdown::Quiesced)
-        } else {
-            debug!(
-                task = Self::NAME,
-                si.workspace.id = %self.workspace_id,
-                si.change_set.id = %self.change_set_id,
-                "shutdown complete",
-            );
-            Ok(Shutdown::Graceful)
-        }
+        debug!(
+            task = Self::NAME,
+            si.workspace.id = %self.workspace_id,
+            si.change_set.id = %self.change_set_id,
+            "shutdown complete",
+        );
+        Ok(())
     }
 }
 
-#[derive(Debug)]
-pub(crate) enum Shutdown {
-    Graceful,
-    Quiesced,
-}
-
 struct QuiescedCaptured {
     instance_id: String,
     workspace_id: WorkspacePk,
     change_set_id: ChangeSetId,
-    quiescence_token: CancellationToken,
+    quiesced_notify: Arc<Notify>,
 }
 
 #[derive(Clone, Debug)]
diff --git a/lib/rebaser-server/src/handlers.rs b/lib/rebaser-server/src/handlers.rs
index 576b361ba7..e743cfa9e4 100644
--- a/lib/rebaser-server/src/handlers.rs
+++ b/lib/rebaser-server/src/handlers.rs
@@ -21,8 +21,9 @@ use ulid::Ulid;
 
 use crate::{
     app_state::AppState,
-    change_set_processor_task::{ChangeSetProcessorTask, ChangeSetProcessorTaskError, Shutdown},
+    change_set_processor_task::{ChangeSetProcessorTask, ChangeSetProcessorTaskError},
     serial_dvu_task::{SerialDvuTask, SerialDvuTaskError},
+    Shutdown,
 };
 
 const CONSUMER_NAME_PREFIX: &str = "rebaser-requests";
@@ -104,7 +105,10 @@ pub(crate) async fn default(State(state): State, subject: Subject) ->
     // We want to indendently control the lifecyle of our tasks
     let tasks_token = CancellationToken::new();
 
-    let run_notify = Arc::new(Notify::new());
+    let run_dvu_notify = Arc::new(Notify::new());
+
+    let quiesced_token = CancellationToken::new();
+    let quiesced_notify = Arc::new(Notify::new());
 
     let incoming = requests_stream
         .create_consumer(rebaser_requests_per_change_set_consumer_config(
@@ -125,7 +129,9 @@ pub(crate) async fn default(State(state): State, subject: Subject) ->
         workspace.id,
         change_set.id,
         ctx_builder.clone(),
-        run_notify.clone(),
+        run_dvu_notify.clone(),
+        quiesced_notify.clone(),
+        quiesced_token.clone(),
         tasks_token.clone(),
     );
 
@@ -137,8 +143,10 @@ pub(crate) async fn default(State(state): State, subject: Subject) ->
         workspace.id,
         change_set.id,
         ctx_builder,
-        run_notify,
+        run_dvu_notify,
         quiescent_period,
+        quiesced_notify,
+        quiesced_token,
         tasks_token.clone(),
     );
 
@@ -164,11 +172,9 @@ pub(crate) async fn default(State(state): State, subject: Subject) ->
         // Processor task completed
         processor_task_result_result = processor_task_result => {
             match processor_task_result_result {
-                // A quiet period was found in the stream; reply `Ok` to ack and remove this task
-                Ok(Ok(Shutdown::Quiesced)) => Ok(()),
                 // Processor exited cleanly, but unexpectedly; reply `Err` to nack for task to
                 // persist and retry
-                Ok(Ok(Shutdown::Graceful)) => Err(Error::ChangeSetProcessorCompleted),
+                Ok(Ok(())) => Err(Error::ChangeSetProcessorCompleted),
                 // Processor exited with error; reply `Err` to nack for task to persist and retry
                 Ok(Err(err)) => Err(Error::ChangeSetProcessor(err)),
                 // Tokio join error on processor exit; reply `Err` to nack for task to persist and
@@ -179,9 +185,11 @@ pub(crate) async fn default(State(state): State, subject: Subject) ->
         // Serial dvu task completed
         dvu_task_result_result = dvu_task_result => {
             match dvu_task_result_result {
+                // A quiet period was found in the stream; reply `Ok` to ack and remove this task
+                Ok(Ok(Shutdown::Quiesced)) => Ok(()),
                 // Serial dvu exited cleanly, but unexpectedly; reply `Err` to nack for task to
                 // persist and retry
-                Ok(Ok(())) => Err(Error::SerialDvuCompleted),
+                Ok(Ok(Shutdown::Graceful)) => Err(Error::SerialDvuCompleted),
                 // Serial dvu exited with error; reply `Err` to nack for task to persist and retry
                 Ok(Err(err)) => Err(Error::SerialDvu(err)),
                 // Tokio join error on serial dvu exit; reply `Err` to nack for task to persist and
diff --git a/lib/rebaser-server/src/lib.rs b/lib/rebaser-server/src/lib.rs
index c756a131c9..e6f81ce2fe 100644
--- a/lib/rebaser-server/src/lib.rs
+++ b/lib/rebaser-server/src/lib.rs
@@ -102,3 +102,9 @@ impl ServerError {
 
 type Error = ServerError;
 type Result = std::result::Result;
+
+#[derive(Debug)]
+pub(crate) enum Shutdown {
+    Graceful,
+    Quiesced,
+}
diff --git a/lib/rebaser-server/src/serial_dvu_task.rs b/lib/rebaser-server/src/serial_dvu_task.rs
index 9934b44dae..c38e57d15d 100644
--- a/lib/rebaser-server/src/serial_dvu_task.rs
+++ b/lib/rebaser-server/src/serial_dvu_task.rs
@@ -7,7 +7,7 @@ use thiserror::Error;
 use tokio::sync::Notify;
 use tokio_util::sync::CancellationToken;
 
-use crate::ServerMetadata;
+use crate::{ServerMetadata, Shutdown};
 
 #[remain::sorted]
 #[derive(Debug, Error)]
@@ -15,6 +15,9 @@ pub(crate) enum SerialDvuTaskError {
     /// Error when using a DAL context
     #[error("dal context transaction error: {0}")]
     DalContext(#[from] dal::TransactionsError),
+    /// When failing to do an operation using the [`WorkspaceSnapshot`]
+    #[error("workspace snapshot error: {0}")]
+    WorkspaceSnapshot(#[from] dal::WorkspaceSnapshotError),
 }
 
 type Result = result::Result;
@@ -24,19 +27,24 @@ pub(crate) struct SerialDvuTask {
     workspace_id: WorkspacePk,
     change_set_id: ChangeSetId,
     ctx_builder: DalContextBuilder,
-    run_notify: Arc<Notify>,
+    run_dvu_notify: Arc<Notify>,
+    quiesced_notify: Arc<Notify>,
+    quiesced_token: CancellationToken,
     token: CancellationToken,
 }
 
 impl SerialDvuTask {
     const NAME: &'static str = "rebaser_server::serial_dvu_task";
 
+    #[allow(clippy::too_many_arguments)]
     pub(crate) fn create(
         metadata: Arc<ServerMetadata>,
         workspace_id: WorkspacePk,
         change_set_id: ChangeSetId,
         ctx_builder: DalContextBuilder,
-        run_notify: Arc<Notify>,
+        run_dvu_notify: Arc<Notify>,
+        quiesced_notify: Arc<Notify>,
+        quiesced_token: CancellationToken,
         token: CancellationToken,
     ) -> Self {
         Self {
@@ -44,18 +52,24 @@ impl SerialDvuTask {
             workspace_id,
             change_set_id,
             ctx_builder,
-            run_notify,
+            run_dvu_notify,
+            quiesced_notify,
+            quiesced_token,
             token,
         }
     }
 
-    pub(crate) async fn try_run(self) -> Result<()> {
-        loop {
+    pub(crate) async fn try_run(self) -> Result<Shutdown> {
+        // Attempt to run an initial dvu in case there are processed requests that haven't yet been
+        // finished with a dvu run
+        self.maybe_run_initial_dvu().await?;
+
+        let shutdown_cause = loop {
             tokio::select! {
                 biased;
 
                 // Signal to run a DVU has fired
-                _ = self.run_notify.notified() => {
+                _ = self.run_dvu_notify.notified() => {
                     debug!(
                         task = Self::NAME,
                         service.instance.id = self.metadata.instance_id(),
@@ -65,6 +79,21 @@ impl SerialDvuTask {
                     );
                     self.run_dvu().await?;
                 }
+                // Signal to shutdown from a quiet period has fired
+                _ = self.quiesced_notify.notified() => {
+                    debug!(
+                        task = Self::NAME,
+                        service.instance.id = self.metadata.instance_id(),
+                        si.workspace.id = %self.workspace_id,
+                        si.change_set.id = %self.change_set_id,
+                        "quiesced notified, starting to shut down",
+                    );
+                    // Fire the quiesced_token so that the processing task immediately stops
+                    // processing additional requests
+                    self.quiesced_token.cancel();
+
+                    break Shutdown::Quiesced;
+                }
                 // Cancellation token has fired, time to shut down
                 _ = self.token.cancelled() => {
                     debug!(
@@ -74,19 +103,21 @@ impl SerialDvuTask {
                        si.workspace.id = %self.workspace_id,
                        si.change_set.id = %self.change_set_id,
                        "received cancellation",
                    );
-                    break;
+
+                    break Shutdown::Graceful;
                 }
             }
-        }
+        };
         debug!(
             task = Self::NAME,
+            cause = ?shutdown_cause,
             service.instance.id = self.metadata.instance_id(),
             si.workspace.id = %self.workspace_id,
             si.change_set.id = %self.change_set_id,
             "shutdown complete",
         );
-        Ok(())
+        Ok(shutdown_cause)
     }
 
     async fn run_dvu(&self) -> Result<()> {
@@ -107,4 +138,29 @@ impl SerialDvuTask {
 
         Ok(())
     }
+
+    async fn maybe_run_initial_dvu(&self) -> Result<()> {
+        let builder = self.ctx_builder.clone();
+        let ctx = builder
+            .build_for_change_set_as_system(self.workspace_id.into(), self.change_set_id.into())
+            .await?;
+
+        if ctx
+            .workspace_snapshot()?
+            .has_dependent_value_roots()
+            .await?
+        {
+            info!(
+                task = Self::NAME,
+                service.instance.id = self.metadata.instance_id(),
+                si.workspace.id = %self.workspace_id,
+                si.change_set.id = %self.change_set_id,
+                "enqueuing *initial* dependent_values_update",
+            );
+            ctx.enqueue_dependent_values_update().await?;
+            ctx.blocking_commit_no_rebase().await?;
+        }
+
+        Ok(())
+    }
 }
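As a companion to the description above, here is a minimal,
self-contained sketch of the `Notify` + `CancellationToken` handshake:
the dvu loop drains pending runs first, only then reacts to
`quiesced_notify` by cancelling the shared token, and that cancellation
is what gracefully stops the processor loop. It assumes the `tokio`
(with macros, rt, sync, and time features) and `tokio-util` crates; the
names mirror the patch, but the task bodies, sleeps, and `main`
scaffolding are illustrative stand-ins rather than the rebaser-server
code.

```rust
use std::{sync::Arc, time::Duration};

use tokio::sync::Notify;
use tokio_util::sync::CancellationToken;

#[tokio::main]
async fn main() {
    let run_dvu_notify = Arc::new(Notify::new());
    let quiesced_notify = Arc::new(Notify::new());
    let quiesced_token = CancellationToken::new();

    // Stand-in for SerialDvuTask: drain pending "dvu" runs, then honor the
    // quiesced notification by cancelling the shared token.
    let dvu = {
        let run_dvu_notify = Arc::clone(&run_dvu_notify);
        let quiesced_notify = Arc::clone(&quiesced_notify);
        let quiesced_token = quiesced_token.clone();
        tokio::spawn(async move {
            loop {
                tokio::select! {
                    biased;
                    // Pending dvu runs win over the quiesced branch ("draining")
                    _ = run_dvu_notify.notified() => {
                        // ... a possibly long-running "dvu" job would go here ...
                        tokio::time::sleep(Duration::from_millis(50)).await;
                    }
                    _ = quiesced_notify.notified() => {
                        // Only now stop the processor; it kept handling requests until here
                        quiesced_token.cancel();
                        break;
                    }
                }
            }
        })
    };

    // Stand-in for ChangeSetProcessorTask: keeps working until the token
    // fires, then shuts down gracefully.
    let processor = {
        let quiesced_token = quiesced_token.clone();
        tokio::spawn(async move {
            loop {
                tokio::select! {
                    _ = quiesced_token.cancelled() => break,
                    _ = tokio::time::sleep(Duration::from_millis(10)) => {
                        // ... keep processing any Rebaser requests that still arrive ...
                    }
                }
            }
        })
    };

    // Simulate one queued dvu run followed by a detected quiet period.
    run_dvu_notify.notify_one();
    quiesced_notify.notify_one();

    let _ = tokio::join!(dvu, processor);
}
```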