feat(rebaser): change quiescent shutdown to reduce missed activity #4707
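This draft changes how a per-change-set rebaser shuts down after a quiet period. Previously the change set processor task owned a quiescence CancellationToken, closed its incoming stream on the first timeout, and reported Shutdown::Quiesced itself, so activity arriving right after the timeout could be missed. With this change the processor only fires quiesced_notify, the serial dvu task decides when processing actually stops by cancelling quiesced_token, and the Shutdown enum moves to lib.rs and is now returned by the serial dvu task. The serial dvu task also runs an initial dependent values update on startup if the snapshot already has dependent value roots.

A minimal sketch of the handoff pattern between the two tasks (the names quiesced_notify and quiesced_token match the PR; the task bodies are placeholders, not the real rebaser tasks):

use std::{sync::Arc, time::Duration};

use tokio::sync::Notify;
use tokio_util::sync::CancellationToken;

#[tokio::main]
async fn main() {
    let quiesced_notify = Arc::new(Notify::new());
    let quiesced_token = CancellationToken::new();

    // Stand-in for the serial dvu task: it decides when processing really stops.
    let dvu = tokio::spawn({
        let quiesced_notify = quiesced_notify.clone();
        let quiesced_token = quiesced_token.clone();
        async move {
            quiesced_notify.notified().await;
            // ...finish any outstanding dependent-values work here, then stop the processor.
            quiesced_token.cancel();
        }
    });

    // Stand-in for the change set processor task: it only *reports* inactivity.
    let processor = tokio::spawn({
        let quiesced_notify = quiesced_notify.clone();
        let quiesced_token = quiesced_token.clone();
        async move {
            loop {
                tokio::select! {
                    _ = quiesced_token.cancelled() => break, // the dvu task said stop
                    _ = tokio::time::sleep(Duration::from_millis(10)) => {
                        // A real task would detect a gap in incoming requests here.
                        quiesced_notify.notify_one();
                    }
                }
            }
        }
    });

    let _ = tokio::join!(dvu, processor);
}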

Draft: wants to merge 1 commit into main
76 changes: 31 additions & 45 deletions lib/rebaser-server/src/change_set_processor_task.rs
Expand Up @@ -47,7 +47,6 @@ pub(crate) struct ChangeSetProcessorTask {
workspace_id: WorkspacePk,
change_set_id: ChangeSetId,
inner: Box<dyn Future<Output = io::Result<()>> + Unpin + Send>,
quiescence_token: CancellationToken,
}

impl ChangeSetProcessorTask {
Expand All @@ -62,46 +61,53 @@ impl ChangeSetProcessorTask {
workspace_id: WorkspacePk,
change_set_id: ChangeSetId,
ctx_builder: DalContextBuilder,
run_notify: Arc<Notify>,
run_dvu_notify: Arc<Notify>,
quiescent_period: Duration,
quiesced_notify: Arc<Notify>,
quiesced_token: CancellationToken,
task_token: CancellationToken,
) -> Self {
let state = AppState::new(workspace_id, change_set_id, nats, ctx_builder, run_notify);

let quiescence_token = CancellationToken::new();
let state = AppState::new(
workspace_id,
change_set_id,
nats,
ctx_builder,
run_dvu_notify,
);

let captured = QuiescedCaptured {
instance_id: metadata.instance_id().to_string(),
workspace_id,
change_set_id,
quiescence_token: quiescence_token.clone(),
quiesced_notify: quiesced_notify.clone(),
};

let inactive_aware_incoming = incoming
// Looks for a gap between incoming messages greater than the duration
.timeout(quiescent_period)
// Fire the quiescence token which triggers a distinctive shutdown where we *know* we
// want to remove the task from the set of work.
// Fire quiesced_notify which triggers a specific shutdown of the serial dvu task where
// we *know* we want to remove the task from the set of work.
.inspect_err(move |_elapsed| {
let QuiescedCaptured {
instance_id,
workspace_id,
change_set_id,
quiescence_token,
quiesced_notify,
} = &captured;

debug!(
service.instance.id = instance_id,
si.workspace.id = %workspace_id,
si.change_set.id = %change_set_id,
"rate of requests has become inactive, shutting down processing tasks",
"rate of requests has become inactive, triggering a quiesced shutdown",
);
quiescence_token.cancel();
// Notify the serial dvu task that we want to shutdown due to a quiet period
quiesced_notify.notify_one();
})
// Once the first inactive period is detected, this stream is closed (i.e. returns
// `None`)
.map_while(result::Result::ok)
.fuse();
// Continue processing messages as normal until the Naxum app's graceful shutdown is
// triggered. This means we turn the stream back from a stream of
// `Result<Result<Message, _>, Elapsed>` into `Result<Message, _>`
.filter_map(|maybe_elapsed_item| maybe_elapsed_item.ok());

let app = ServiceBuilder::new()
.layer(
Expand All @@ -115,10 +121,7 @@ impl ChangeSetProcessorTask {

let inner =
naxum::serve_with_incoming_limit(inactive_aware_incoming, app.into_make_service(), 1)
.with_graceful_shutdown(graceful_shutdown_signal(
task_token,
quiescence_token.clone(),
));
.with_graceful_shutdown(graceful_shutdown_signal(task_token, quiesced_token));

let inner_fut = inner.into_future();

Expand All @@ -127,44 +130,27 @@ impl ChangeSetProcessorTask {
workspace_id,
change_set_id,
inner: Box::new(inner_fut),
quiescence_token,
}
}

pub(crate) async fn try_run(self) -> Result<Shutdown> {
pub(crate) async fn try_run(self) -> Result<()> {
self.inner.await.map_err(Error::Naxum)?;

if self.quiescence_token.is_cancelled() {
debug!(
task = Self::NAME,
si.workspace.id = %self.workspace_id,
si.change_set.id = %self.change_set_id,
"shutdown due to quiescent period",
);
Ok(Shutdown::Quiesced)
} else {
debug!(
task = Self::NAME,
si.workspace.id = %self.workspace_id,
si.change_set.id = %self.change_set_id,
"shutdown complete",
);
Ok(Shutdown::Graceful)
}
debug!(
task = Self::NAME,
si.workspace.id = %self.workspace_id,
si.change_set.id = %self.change_set_id,
"shutdown complete",
);
Ok(())
}
}

#[derive(Debug)]
pub(crate) enum Shutdown {
Graceful,
Quiesced,
}

struct QuiescedCaptured {
instance_id: String,
workspace_id: WorkspacePk,
change_set_id: ChangeSetId,
quiescence_token: CancellationToken,
quiesced_notify: Arc<Notify>,
}

#[derive(Clone, Debug)]
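The quiescence detection in the processor still uses a stream timeout, but the handling changes. Before, the first Elapsed error closed the incoming stream (map_while(Result::ok) plus fuse); now the Elapsed error only fires quiesced_notify and is filtered out, so the processor keeps serving requests until the serial dvu task cancels quiesced_token. A rough, self-contained sketch of the new behavior using tokio_stream (the channel, the message values, and the 50ms quiet period are invented for illustration; the real code feeds the stream into a naxum app):

use std::{sync::Arc, time::Duration};

use tokio::sync::{mpsc, Notify};
use tokio_stream::{wrappers::ReceiverStream, StreamExt};

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel::<u32>(8);
    let quiesced_notify = Arc::new(Notify::new());
    let notify = quiesced_notify.clone();

    tokio::spawn(async move {
        tx.send(1).await.ok();
        tokio::time::sleep(Duration::from_millis(200)).await;
        tx.send(2).await.ok(); // arrives after the quiet period has already elapsed
    });

    let quiet = Duration::from_millis(50);

    let incoming = ReceiverStream::new(rx)
        .timeout(quiet)
        // A gap longer than `quiet` only signals the dvu task; the stream stays open.
        .filter_map(move |item| match item {
            Ok(msg) => Some(msg),
            Err(_elapsed) => {
                notify.notify_one();
                None
            }
        });
    tokio::pin!(incoming);

    // With the old map_while(Result::ok) approach the loop would end at the first
    // quiet period and message 2 would never be seen; here both are processed.
    while let Some(msg) = incoming.next().await {
        println!("processed {msg}");
    }
}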
24 changes: 16 additions & 8 deletions lib/rebaser-server/src/handlers.rs
Expand Up @@ -21,8 +21,9 @@ use ulid::Ulid;

use crate::{
app_state::AppState,
change_set_processor_task::{ChangeSetProcessorTask, ChangeSetProcessorTaskError, Shutdown},
change_set_processor_task::{ChangeSetProcessorTask, ChangeSetProcessorTaskError},
serial_dvu_task::{SerialDvuTask, SerialDvuTaskError},
Shutdown,
};

const CONSUMER_NAME_PREFIX: &str = "rebaser-requests";
Expand Down Expand Up @@ -104,7 +105,10 @@ pub(crate) async fn default(State(state): State<AppState>, subject: Subject) ->
// We want to independently control the lifecycle of our tasks
let tasks_token = CancellationToken::new();

let run_notify = Arc::new(Notify::new());
let run_dvu_notify = Arc::new(Notify::new());

let quiesced_token = CancellationToken::new();
let quiesced_notify = Arc::new(Notify::new());

let incoming = requests_stream
.create_consumer(rebaser_requests_per_change_set_consumer_config(
Expand All @@ -125,7 +129,9 @@ pub(crate) async fn default(State(state): State<AppState>, subject: Subject) ->
workspace.id,
change_set.id,
ctx_builder.clone(),
run_notify.clone(),
run_dvu_notify.clone(),
quiesced_notify.clone(),
quiesced_token.clone(),
tasks_token.clone(),
);

Expand All @@ -137,8 +143,10 @@ pub(crate) async fn default(State(state): State<AppState>, subject: Subject) ->
workspace.id,
change_set.id,
ctx_builder,
run_notify,
run_dvu_notify,
quiescent_period,
quiesced_notify,
quiesced_token,
tasks_token.clone(),
);

Expand All @@ -164,11 +172,9 @@ pub(crate) async fn default(State(state): State<AppState>, subject: Subject) ->
// Processor task completed
processor_task_result_result = processor_task_result => {
match processor_task_result_result {
// A quiet period was found in the stream; reply `Ok` to ack and remove this task
Ok(Ok(Shutdown::Quiesced)) => Ok(()),
// Processor exited cleanly, but unexpectedly; reply `Err` to nack for task to
// persist and retry
Ok(Ok(Shutdown::Graceful)) => Err(Error::ChangeSetProcessorCompleted),
Ok(Ok(())) => Err(Error::ChangeSetProcessorCompleted),
// Processor exited with error; reply `Err` to nack for task to persist and retry
Ok(Err(err)) => Err(Error::ChangeSetProcessor(err)),
// Tokio join error on processor exit; reply `Err` to nack for task to persist and
Expand All @@ -179,9 +185,11 @@ pub(crate) async fn default(State(state): State<AppState>, subject: Subject) ->
// Serial dvu task completed
dvu_task_result_result = dvu_task_result => {
match dvu_task_result_result {
// A quiet period was found in the stream; reply `Ok` to ack and remove this task
Ok(Ok(Shutdown::Quiesced)) => Ok(()),
// Serial dvu exited cleanly, but unexpectedly; reply `Err` to nack for task to
// persist and retry
Ok(Ok(())) => Err(Error::SerialDvuCompleted),
Ok(Ok(Shutdown::Graceful)) => Err(Error::SerialDvuCompleted),
// Serial dvu exited with error; reply `Err` to nack for task to persist and retry
Ok(Err(err)) => Err(Error::SerialDvu(err)),
// Tokio join error on serial dvu exit; reply `Err` to nack for task to persist and
6 changes: 6 additions & 0 deletions lib/rebaser-server/src/lib.rs
Expand Up @@ -102,3 +102,9 @@ impl ServerError {
type Error = ServerError;

type Result<T> = std::result::Result<T, ServerError>;

#[derive(Debug)]
pub(crate) enum Shutdown {
Graceful,
Quiesced,
}
76 changes: 66 additions & 10 deletions lib/rebaser-server/src/serial_dvu_task.rs
Expand Up @@ -7,14 +7,17 @@ use thiserror::Error;
use tokio::sync::Notify;
use tokio_util::sync::CancellationToken;

use crate::ServerMetadata;
use crate::{ServerMetadata, Shutdown};

#[remain::sorted]
#[derive(Debug, Error)]
pub(crate) enum SerialDvuTaskError {
/// Error when using a DAL context
#[error("dal context transaction error: {0}")]
DalContext(#[from] dal::TransactionsError),
/// When failing to do an operation using the [`WorkspaceSnapshot`]
#[error("workspace snapshot error: {0}")]
WorkspaceSnapshot(#[from] dal::WorkspaceSnapshotError),
}

type Result<T> = result::Result<T, SerialDvuTaskError>;
Expand All @@ -24,38 +27,49 @@ pub(crate) struct SerialDvuTask {
workspace_id: WorkspacePk,
change_set_id: ChangeSetId,
ctx_builder: DalContextBuilder,
run_notify: Arc<Notify>,
run_dvu_notify: Arc<Notify>,
quiesced_notify: Arc<Notify>,
quiesced_token: CancellationToken,
token: CancellationToken,
}

impl SerialDvuTask {
const NAME: &'static str = "rebaser_server::serial_dvu_task";

#[allow(clippy::too_many_arguments)]
pub(crate) fn create(
metadata: Arc<ServerMetadata>,
workspace_id: WorkspacePk,
change_set_id: ChangeSetId,
ctx_builder: DalContextBuilder,
run_notify: Arc<Notify>,
run_dvu_notify: Arc<Notify>,
quiesced_notify: Arc<Notify>,
quiesced_token: CancellationToken,
token: CancellationToken,
) -> Self {
Self {
metadata,
workspace_id,
change_set_id,
ctx_builder,
run_notify,
run_dvu_notify,
quiesced_notify,
quiesced_token,
token,
}
}

pub(crate) async fn try_run(self) -> Result<()> {
loop {
pub(crate) async fn try_run(self) -> Result<Shutdown> {
// Attempt to run an initial dvu in case there are processed requests that haven't yet been
// finished with a dvu run
self.maybe_run_initial_dvu().await?;

let shutdown_cause = loop {
tokio::select! {
biased;

// Signal to run a DVU has fired
_ = self.run_notify.notified() => {
_ = self.run_dvu_notify.notified() => {
debug!(
task = Self::NAME,
service.instance.id = self.metadata.instance_id(),
Expand All @@ -65,6 +79,21 @@ impl SerialDvuTask {
);
self.run_dvu().await?;
}
// Signal to shutdown from a quiet period has fired
_ = self.quiesced_notify.notified() => {
debug!(
task = Self::NAME,
service.instance.id = self.metadata.instance_id(),
si.workspace.id = %self.workspace_id,
si.change_set.id = %self.change_set_id,
"quiesced notified, starting to shut down",
);
// Fire the quiesced_token so that the processing task immediately stops
// processing additional requests
self.quiesced_token.cancel();

break Shutdown::Quiesced;
}
// Cancellation token has fired, time to shut down
_ = self.token.cancelled() => {
debug!(
Expand All @@ -74,19 +103,21 @@ impl SerialDvuTask {
si.change_set.id = %self.change_set_id,
"received cancellation",
);
break;

break Shutdown::Graceful;
}
}
}
};

debug!(
task = Self::NAME,
cause = ?shutdown_cause,
service.instance.id = self.metadata.instance_id(),
si.workspace.id = %self.workspace_id,
si.change_set.id = %self.change_set_id,
"shutdown complete",
);
Ok(())
Ok(shutdown_cause)
}

async fn run_dvu(&self) -> Result<()> {
Expand All @@ -107,4 +138,29 @@ impl SerialDvuTask {

Ok(())
}

async fn maybe_run_initial_dvu(&self) -> Result<()> {
let builder = self.ctx_builder.clone();
let ctx = builder
.build_for_change_set_as_system(self.workspace_id.into(), self.change_set_id.into())
.await?;

if ctx
.workspace_snapshot()?
.has_dependent_value_roots()
.await?
{
info!(
task = Self::NAME,
service.instance.id = self.metadata.instance_id(),
si.workspace.id = %self.workspace_id,
si.change_set.id = %self.change_set_id,
"enqueuing *initial* dependent_values_update",
);
ctx.enqueue_dependent_values_update().await?;
ctx.blocking_commit_no_rebase().await?;
}

Ok(())
}
}