diff --git a/Cargo.lock b/Cargo.lock
index e510fceb34..acea262684 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -5511,6 +5511,7 @@ dependencies = [
  "si-std",
  "telemetry",
  "telemetry-nats",
+ "telemetry-utils",
  "thiserror 2.0.11",
  "tokio",
  "tokio-stream",
diff --git a/lib/rebaser-server/BUCK b/lib/rebaser-server/BUCK
index 074ce838aa..358fc81d99 100644
--- a/lib/rebaser-server/BUCK
+++ b/lib/rebaser-server/BUCK
@@ -19,6 +19,7 @@ rust_library(
         "//lib/si-settings:si-settings",
         "//lib/si-std:si-std",
         "//lib/telemetry-nats-rs:telemetry-nats",
+        "//lib/telemetry-utils-rs:telemetry-utils",
         "//lib/telemetry-rs:telemetry",
         "//lib/veritech-client:veritech-client",
         "//third-party/rust:derive_builder",
diff --git a/lib/rebaser-server/Cargo.toml b/lib/rebaser-server/Cargo.toml
index 4f8861d077..dd6fa2fd83 100644
--- a/lib/rebaser-server/Cargo.toml
+++ b/lib/rebaser-server/Cargo.toml
@@ -26,6 +26,7 @@ si-settings = { path = "../../lib/si-settings" }
 si-std = { path = "../../lib/si-std" }
 telemetry = { path = "../../lib/telemetry-rs" }
 telemetry-nats = { path = "../../lib/telemetry-nats-rs" }
+telemetry-utils = { path = "../../lib/telemetry-utils-rs" }
 veritech-client = { path = "../../lib/veritech-client" }
 
 derive_builder = { workspace = true }
diff --git a/lib/rebaser-server/src/change_set_processor_task.rs b/lib/rebaser-server/src/change_set_processor_task.rs
index a24d6b39ad..041c30fa10 100644
--- a/lib/rebaser-server/src/change_set_processor_task.rs
+++ b/lib/rebaser-server/src/change_set_processor_task.rs
@@ -24,6 +24,7 @@ use si_data_nats::{
 };
 use si_events::{ChangeSetId, WorkspacePk};
 use telemetry::prelude::*;
+use telemetry_utils::metric;
 use thiserror::Error;
 use tokio::sync::Notify;
 use tokio_stream::StreamExt as _;
@@ -64,8 +65,10 @@ impl ChangeSetProcessorTask {
         workspace_id: WorkspacePk,
         change_set_id: ChangeSetId,
         ctx_builder: DalContextBuilder,
-        run_notify: Arc<Notify>,
+        run_dvu_notify: Arc<Notify>,
         quiescent_period: Duration,
+        quiesced_notify: Arc<Notify>,
+        quiesced_token: CancellationToken,
         task_token: CancellationToken,
         server_tracker: TaskTracker,
     ) -> Self {
@@ -78,44 +81,41 @@ impl ChangeSetProcessorTask {
             change_set_id,
             nats,
             ctx_builder,
-            run_notify,
+            run_dvu_notify,
             server_tracker,
         );
 
-        let quiescence_token = CancellationToken::new();
-
         let captured = QuiescedCaptured {
             instance_id: metadata.instance_id().to_string(),
             workspace_id,
             change_set_id,
-            quiescence_token: quiescence_token.clone(),
+            quiesced_notify: quiesced_notify.clone(),
         };
-
         let inactive_aware_incoming = incoming
             // Looks for a gap between incoming messages greater than the duration
             .timeout(quiescent_period)
-            // Fire the quiescence token which triggers a distinctive shutdown where we *know* we
-            // want to remove the task from the set of work.
+            // Fire quiesced_notify which triggers a specific shutdown of the serial dvu task where
+            // we *know* we want to remove the task from the set of work.
             .inspect_err(move |_elapsed| {
                 let QuiescedCaptured {
                     instance_id,
                     workspace_id,
                     change_set_id,
-                    quiescence_token,
+                    quiesced_notify,
                 } = &captured;
-
-                debug!(
+                info!(
                     service.instance.id = instance_id,
                     si.workspace.id = %workspace_id,
                     si.change_set.id = %change_set_id,
-                    "rate of requests has become inactive, shutting down processing tasks",
+                    "rate of requests has become inactive, triggering a quiesced shutdown",
                 );
-                quiescence_token.cancel();
+                // Notify the serial dvu task that we want to shutdown due to a quiet period
+                quiesced_notify.notify_one();
             })
-            // Once the first inactive period is detected, this stream is closed (i.e. returns
-            // `None`)
-            .map_while(result::Result::ok)
-            .fuse();
+            // Continue processing messages as normal until the Naxum app's graceful shutdown is
+            // triggered. This means we turn the stream back from a stream of
+            // `Result<Result<T, E>, Elapsed>` into `Result<T, E>`
+            .filter_map(|maybe_elapsed_item| maybe_elapsed_item.ok());
 
         let app = ServiceBuilder::new()
             .layer(
@@ -135,10 +135,7 @@ impl ChangeSetProcessorTask {
 
         let inner =
             naxum::serve_with_incoming_limit(inactive_aware_incoming, app.into_make_service(), 1)
-                .with_graceful_shutdown(graceful_shutdown_signal(
-                    task_token,
-                    quiescence_token.clone(),
-                ));
+                .with_graceful_shutdown(graceful_shutdown_signal(task_token, quiesced_token));
 
         let inner_fut = inner.into_future();
 
@@ -147,44 +144,28 @@ impl ChangeSetProcessorTask {
             workspace_id,
             change_set_id,
             inner: Box::new(inner_fut),
-            quiescence_token,
         }
     }
 
-    pub(crate) async fn try_run(self) -> Result<Shutdown> {
+    pub(crate) async fn try_run(self) -> Result<()> {
         self.inner.await.map_err(Error::Naxum)?;
+        metric!(counter.change_set_processor_task.change_set_task = -1);
 
-        if self.quiescence_token.is_cancelled() {
-            debug!(
-                task = Self::NAME,
-                si.workspace.id = %self.workspace_id,
-                si.change_set.id = %self.change_set_id,
-                "shutdown due to quiescent period",
-            );
-            Ok(Shutdown::Quiesced)
-        } else {
-            debug!(
-                task = Self::NAME,
-                si.workspace.id = %self.workspace_id,
-                si.change_set.id = %self.change_set_id,
-                "shutdown complete",
-            );
-            Ok(Shutdown::Graceful)
-        }
+        info!(
+            task = Self::NAME,
+            si.workspace.id = %self.workspace_id,
+            si.change_set.id = %self.change_set_id,
+            "shutdown complete",
+        );
+        Ok(())
     }
 }
 
-#[derive(Debug)]
-pub(crate) enum Shutdown {
-    Graceful,
-    Quiesced,
-}
-
 struct QuiescedCaptured {
     instance_id: String,
     workspace_id: WorkspacePk,
     change_set_id: ChangeSetId,
-    quiescence_token: CancellationToken,
+    quiesced_notify: Arc<Notify>,
 }
 
 #[derive(Clone, Debug)]
@@ -207,7 +188,7 @@ impl post_process::OnSuccess for DeleteMessageOnSuccess {
         let stream = self.stream.clone();
 
         Box::pin(async move {
-            trace!("deleting message on success");
+            info!("deleting message on success");
             if let Err(err) = stream.delete_message(info.stream_sequence).await {
                 warn!(
                     si.error.message = ?err,
@@ -305,6 +286,7 @@ mod handlers {
     use si_data_nats::HeaderMap;
     use telemetry::prelude::*;
     use telemetry_nats::propagation;
+    use telemetry_utils::metric;
     use thiserror::Error;
 
     use crate::{
@@ -349,6 +331,7 @@ impl IntoResponse for HandlerError {
         fn into_response(self) -> Response {
+            metric!(counter.change_set_processor_task.failed_rebase = 1);
             // TODO(fnichol): there are different responses, esp. for expected interrupted
             error!(si.error.message = ?self, "failed to process message");
             Response::default_internal_server_error()
diff --git a/lib/rebaser-server/src/handlers.rs b/lib/rebaser-server/src/handlers.rs
index 4cee08577e..b63ebe5d07 100644
--- a/lib/rebaser-server/src/handlers.rs
+++ b/lib/rebaser-server/src/handlers.rs
@@ -21,8 +21,9 @@ use ulid::Ulid;
 
 use crate::{
     app_state::AppState,
-    change_set_processor_task::{ChangeSetProcessorTask, ChangeSetProcessorTaskError, Shutdown},
+    change_set_processor_task::{ChangeSetProcessorTask, ChangeSetProcessorTaskError},
     serial_dvu_task::{SerialDvuTask, SerialDvuTaskError},
+    Shutdown,
 };
 
 const CONSUMER_NAME_PREFIX: &str = "rebaser-requests";
@@ -105,7 +106,9 @@ pub(crate) async fn default(State(state): State<AppState>, subject: Subject) ->
 
     // We want to indendently control the lifecyle of our tasks
     let tasks_token = CancellationToken::new();
-    let run_notify = Arc::new(Notify::new());
+    let run_dvu_notify = Arc::new(Notify::new());
+    let quiesced_token = CancellationToken::new();
+    let quiesced_notify = Arc::new(Notify::new());
 
     let incoming = requests_stream
         .create_consumer(rebaser_requests_per_change_set_consumer_config(
@@ -126,7 +129,9 @@ pub(crate) async fn default(State(state): State<AppState>, subject: Subject) ->
         workspace.id,
         change_set.id,
         ctx_builder.clone(),
-        run_notify.clone(),
+        run_dvu_notify.clone(),
+        quiesced_notify.clone(),
+        quiesced_token.clone(),
         tasks_token.clone(),
     );
 
@@ -138,8 +143,10 @@ pub(crate) async fn default(State(state): State<AppState>, subject: Subject) ->
         workspace.id,
         change_set.id,
         ctx_builder,
-        run_notify,
+        run_dvu_notify,
         quiescent_period,
+        quiesced_notify,
+        quiesced_token,
         tasks_token.clone(),
         server_tracker,
     );
@@ -166,11 +173,9 @@ pub(crate) async fn default(State(state): State<AppState>, subject: Subject) ->
         // Processor task completed
         processor_task_result_result = processor_task_result => {
             match processor_task_result_result {
-                // A quiet period was found in the stream; reply `Ok` to ack and remove this task
-                Ok(Ok(Shutdown::Quiesced)) => Ok(()),
                 // Processor exited cleanly, but unexpectedly; reply `Err` to nack for task to
                 // persist and retry
-                Ok(Ok(Shutdown::Graceful)) => Err(Error::ChangeSetProcessorCompleted),
+                Ok(Ok(())) => Err(Error::ChangeSetProcessorCompleted),
                 // Processor exited with error; reply `Err` to nack for task to persist and retry
                 Ok(Err(err)) => Err(Error::ChangeSetProcessor(err)),
                 // Tokio join error on processor exit; reply `Err` to nack for task to persist and
@@ -181,9 +186,11 @@ pub(crate) async fn default(State(state): State<AppState>, subject: Subject) ->
         // Serial dvu task completed
         dvu_task_result_result = dvu_task_result => {
             match dvu_task_result_result {
+                // A quiet period was found in the stream; reply Ok to ack and remove this task
+                Ok(Ok(Shutdown::Quiesced)) => Ok(()),
                 // Serial dvu exited cleanly, but unexpectedly; reply `Err` to nack for task to
                 // persist and retry
-                Ok(Ok(())) => Err(Error::SerialDvuCompleted),
+                Ok(Ok(Shutdown::Graceful)) => Err(Error::SerialDvuCompleted),
                 // Serial dvu exited with error; reply `Err` to nack for task to persist and retry
                 Ok(Err(err)) => Err(Error::SerialDvu(err)),
                 // Tokio join error on serial dvu exit; reply `Err` to nack for task to persist and
diff --git a/lib/rebaser-server/src/lib.rs b/lib/rebaser-server/src/lib.rs
index c756a131c9..e6f81ce2fe 100644
--- a/lib/rebaser-server/src/lib.rs
+++ b/lib/rebaser-server/src/lib.rs
@@ -102,3 +102,9 @@ impl ServerError {
 type Error = ServerError;
 
 type Result<T> = std::result::Result<T, ServerError>;
+
+#[derive(Debug)]
+pub(crate) enum Shutdown {
+    Graceful,
+    Quiesced,
+}
diff --git a/lib/rebaser-server/src/serial_dvu_task.rs b/lib/rebaser-server/src/serial_dvu_task.rs
index 1e37ce5ff3..1283bb684f 100644
--- a/lib/rebaser-server/src/serial_dvu_task.rs
+++ b/lib/rebaser-server/src/serial_dvu_task.rs
@@ -3,11 +3,12 @@ use std::{result, sync::Arc};
 use dal::DalContextBuilder;
 use si_events::{ChangeSetId, WorkspacePk};
 use telemetry::prelude::*;
+use telemetry_utils::metric;
 use thiserror::Error;
 use tokio::sync::Notify;
 use tokio_util::sync::CancellationToken;
 
-use crate::ServerMetadata;
+use crate::{ServerMetadata, Shutdown};
 
 #[remain::sorted]
 #[derive(Debug, Error)]
@@ -15,6 +16,9 @@ pub(crate) enum SerialDvuTaskError {
     /// Error when using a DAL context
     #[error("dal context transaction error: {0}")]
     DalContext(#[from] dal::TransactionsError),
+    /// When failing to do an operation using the [`WorkspaceSnapshot`]
+    #[error("workspace snapshot error: {0}")]
+    WorkspaceSnapshot(#[from] dal::WorkspaceSnapshotError),
 }
 
 type Result<T> = result::Result<T, SerialDvuTaskError>;
@@ -25,18 +29,23 @@ pub(crate) struct SerialDvuTask {
     change_set_id: ChangeSetId,
     ctx_builder: DalContextBuilder,
     run_notify: Arc<Notify>,
+    quiesced_notify: Arc<Notify>,
+    quiesced_token: CancellationToken,
     token: CancellationToken,
 }
 
 impl SerialDvuTask {
     const NAME: &'static str = "rebaser_server::serial_dvu_task";
 
+    #[allow(clippy::too_many_arguments)]
     pub(crate) fn create(
         metadata: Arc<ServerMetadata>,
         workspace_id: WorkspacePk,
         change_set_id: ChangeSetId,
         ctx_builder: DalContextBuilder,
         run_notify: Arc<Notify>,
+        quiesced_notify: Arc<Notify>,
+        quiesced_token: CancellationToken,
         token: CancellationToken,
     ) -> Self {
         Self {
@@ -45,18 +54,26 @@ impl SerialDvuTask {
             change_set_id,
             ctx_builder,
             run_notify,
+            quiesced_notify,
+            quiesced_token,
             token,
         }
     }
 
-    pub(crate) async fn try_run(self) -> Result<()> {
-        loop {
+    pub(crate) async fn try_run(self) -> Result<Shutdown> {
+        metric!(counter.serial_dvu_task.change_set_in_progress = 1);
+
+        // Attempt to run an initial dvu in case there are processed requests that haven't yet been
+        // finished with a dvu run
+        self.maybe_run_initial_dvu().await?;
+
+        let shutdown_cause = loop {
             tokio::select! {
                 biased;
 
                 // Signal to run a DVU has fired
                 _ = self.run_notify.notified() => {
-                    debug!(
+                    info!(
                         task = Self::NAME,
                         service.instance.id = self.metadata.instance_id(),
                         si.workspace.id = %self.workspace_id,
@@ -65,28 +82,47 @@ impl SerialDvuTask {
                     );
                     self.run_dvu().await?;
                 }
+                // Signal to shutdown from a quiet period has fired
+                _ = self.quiesced_notify.notified() => {
+                    info!(
+                        task = Self::NAME,
+                        service.instance.id = self.metadata.instance_id(),
+                        si.workspace.id = %self.workspace_id,
+                        si.change_set.id = %self.change_set_id,
+                        "quiesced notified, starting to shut down",
+                    );
+
+                    // Fire the quiesced_token so that the processing task immediately stops
+                    // processing additional requests
+                    self.quiesced_token.cancel();
+
+                    break Shutdown::Quiesced;
+                }
                 // Cancellation token has fired, time to shut down
                 _ = self.token.cancelled() => {
-                    debug!(
+                    info!(
                         task = Self::NAME,
                         service.instance.id = self.metadata.instance_id(),
                         si.workspace.id = %self.workspace_id,
                         si.change_set.id = %self.change_set_id,
-                        "received cancellation",
+                        "received cancellation, shutting down",
                     );
-                    break;
+                    break Shutdown::Graceful;
                 }
             }
-        }
+        };
 
-        debug!(
+        info!(
             task = Self::NAME,
+            cause = ?shutdown_cause,
             service.instance.id = self.metadata.instance_id(),
             si.workspace.id = %self.workspace_id,
             si.change_set.id = %self.change_set_id,
             "shutdown complete",
         );
-        Ok(())
+        metric!(counter.serial_dvu_task.change_set_in_progress = -1);
+
+        Ok(shutdown_cause)
     }
 
     #[instrument(
@@ -100,6 +136,8 @@ impl SerialDvuTask {
         ),
     )]
     async fn run_dvu(&self) -> Result<()> {
+        metric!(counter.serial_dvu_task.dvu_running = 1);
+
         let builder = self.ctx_builder.clone();
         let ctx = builder
             .build_for_change_set_as_system(self.workspace_id, self.change_set_id, None)
@@ -107,6 +145,35 @@ impl SerialDvuTask {
             .await?;
         ctx.enqueue_dependent_values_update().await?;
         ctx.blocking_commit_no_rebase().await?;
+        metric!(counter.serial_dvu_task.dvu_running = -1);
+
+        Ok(())
+    }
+
+    async fn maybe_run_initial_dvu(&self) -> Result<()> {
+        let builder = self.ctx_builder.clone();
+        let ctx = builder
+            .build_for_change_set_as_system(self.workspace_id, self.change_set_id, None)
+            .await?;
+
+        if ctx
+            .workspace_snapshot()?
+            .has_dependent_value_roots()
+            .await?
+        {
+            metric!(counter.serial_dvu_task.initial_dvu_running = 1);
+
+            info!(
+                task = Self::NAME,
+                service.instance.id = self.metadata.instance_id(),
+                si.workspace.id = %self.workspace_id,
+                si.change_set.id = %self.change_set_id,
+                "enqueuing *initial* dependent_values_update",
+            );
+            ctx.enqueue_dependent_values_update().await?;
+            ctx.blocking_commit_no_rebase().await?;
+            metric!(counter.serial_dvu_task.initial_dvu_running = -1);
+        }
 
         Ok(())
     }
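
Illustrative note (not part of the patch): the processor task's inactivity detection above combines a per-item stream timeout with `inspect_err` and `filter_map`, so a quiet period fires a notification while any messages that do arrive keep flowing. Below is a minimal, self-contained sketch of that pattern under assumed dependencies (`tokio` with full features, `tokio-stream` with its `time` feature, and `futures` for `TryStreamExt::inspect_err`); the channel, item type, and timings are invented for the example and are not the rebaser's real stream.

    use std::{sync::Arc, time::Duration};

    use futures::TryStreamExt as _; // for `inspect_err` on streams of `Result<_, _>`
    use tokio::sync::Notify;
    use tokio_stream::{wrappers::ReceiverStream, StreamExt as _}; // for `timeout`, `filter_map`, `next`

    #[tokio::main]
    async fn main() {
        let (tx, rx) = tokio::sync::mpsc::channel::<u32>(8);
        let quiesced_notify = Arc::new(Notify::new());
        let notify_on_gap = Arc::clone(&quiesced_notify);

        // Wrap the incoming stream so a gap between items longer than the quiescent
        // period surfaces as an `Elapsed` error, which is used purely as a signal.
        let inactive_aware_incoming = ReceiverStream::new(rx)
            .timeout(Duration::from_millis(100))
            .inspect_err(move |_elapsed| notify_on_gap.notify_one())
            .filter_map(|maybe_elapsed_item| maybe_elapsed_item.ok());
        tokio::pin!(inactive_aware_incoming);

        // Producer: a few quick items, then silence (the channel stays open briefly
        // so the gap shows up as a timeout rather than end-of-stream).
        tokio::spawn(async move {
            for i in 0..3u32 {
                tx.send(i).await.ok();
            }
            tokio::time::sleep(Duration::from_millis(300)).await;
            drop(tx);
        });

        // Waiter: plays the role of the serial dvu task reacting to the quiet period.
        let waiter = tokio::spawn(async move {
            quiesced_notify.notified().await;
            println!("quiet period detected, triggering quiesced shutdown");
        });

        while let Some(item) = inactive_aware_incoming.next().await {
            println!("processed message {item}");
        }
        waiter.await.expect("waiter task panicked");
    }

The real task feeds the same filtered stream into naxum's graceful shutdown instead of a bare while-let loop; the sketch only shows the timeout/notify mechanics.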
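
Illustrative note (not part of the patch): the shutdown handshake introduced here is a two-step exchange between the tasks. The processor signals `quiesced_notify` when the stream goes quiet, the serial dvu task breaks out of its loop with `Shutdown::Quiesced` and cancels `quiesced_token`, and that token in turn drives the processor's graceful shutdown. A minimal sketch of that coordination, assuming only the `tokio` and `tokio-util` crates; the task bodies and timings are stand-ins, not the real rebaser tasks.

    use std::{sync::Arc, time::Duration};

    use tokio::sync::Notify;
    use tokio_util::sync::CancellationToken;

    #[derive(Debug)]
    enum Shutdown {
        Graceful,
        Quiesced,
    }

    #[tokio::main]
    async fn main() {
        let quiesced_notify = Arc::new(Notify::new());
        let quiesced_token = CancellationToken::new();
        let task_token = CancellationToken::new();

        // Stand-in for the serial dvu task: waits for either a quiesced notification
        // or an external cancellation and reports which one ended it.
        let serial_dvu = tokio::spawn({
            let quiesced_notify = Arc::clone(&quiesced_notify);
            let quiesced_token = quiesced_token.clone();
            let task_token = task_token.clone();
            async move {
                tokio::select! {
                    _ = quiesced_notify.notified() => {
                        // Stop the processor from picking up any further requests.
                        quiesced_token.cancel();
                        Shutdown::Quiesced
                    }
                    _ = task_token.cancelled() => Shutdown::Graceful,
                }
            }
        });

        // Stand-in for the processor task: "detects" a quiet period, notifies the
        // serial task, then waits on the token that task fires as its shutdown signal.
        let processor = tokio::spawn({
            let quiesced_notify = Arc::clone(&quiesced_notify);
            let quiesced_token = quiesced_token.clone();
            async move {
                tokio::time::sleep(Duration::from_millis(50)).await;
                quiesced_notify.notify_one();
                quiesced_token.cancelled().await;
            }
        });

        processor.await.expect("processor task panicked");
        let cause = serial_dvu.await.expect("serial dvu task panicked");
        println!("serial dvu task shut down: {cause:?}");
        assert!(matches!(cause, Shutdown::Quiesced));
    }

Keeping a Notify for the "quiet period detected" direction and a CancellationToken for the "stop accepting work now" direction keeps the two signals distinct, which is what lets the handler ack the NATS request only on the `Shutdown::Quiesced` path.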