Skip to content

Commit

Permalink
feat(rebaser): support multiple rebasers that coordinate together
Browse files Browse the repository at this point in the history
This change alters the implementation of the `DvuDebouncer` task to
coordinate with other potenially running Rebaser services to ensure that
(as before) only *one* dependent values update (i.e. DVU) is running
per-change set.

As before, this debouncer Tokio task runs alongside each
`ChangeSetRequestsTask` that a Rebaser spawns to process a work queue of rebase
requests. However, the debouncer task now coordinates with other debouncer
tasks via a NATS KV store which acts as a distributed lock (or could be though
of as a leader election). Each key corresponds to a unique change set within a
workspace. That is, the key is of the form `{workspace_pk}.{change_set_id}`.
Additionally, there is a `max_age` set for all keys which will naturally age
out if no update/delete/purge operations are performed. This "aging out" of a
key should only happen when a debouncer task crashes or panics without properly
cleaning up its state *or* when a Rebaser service is disconnected from its
network or connection to NATS (i.e. a network partition).

The general algorithm for each debouncer task is as follows:

```mermaid
stateDiagram-v2
    Wait: Waits to become leader
    WatchKey: Watch key for delete/age out
    AttemptsLeader: Attempt to become leader
    Leader: Becomes leader (owns key)
    LeaderWaits: Wait for debounce window
    CheckIfPending: Check if values are pending
    PendingValues: Prepares to run DVU
    SetRunningStatus: Set key value status to "running"
    PerformDvu: Enqueue, run, and block on DVU
    KeepaliveWaits: Wait for keepalive window
    UpdateKey: Update key preventing age out

    [*] --> Wait
    Wait --> WatchKey
    WatchKey --> AttemptsLeader: Key deleted
    WatchKey --> AttemptsLeader: Key aged out
    AttemptsLeader --> WatchKey: Loses key acquire
    AttemptsLeader --> Leader: Wins key acquire
    Leader --> Wait: Returns to waiting
    state Leader {
        [*] --> LeaderWaits
        LeaderWaits --> CheckIfPending: Tick time elapses
        CheckIfPending --> LeaderWaits: No pending values
        CheckIfPending --> PendingValues: Pending values found
        PendingValues --> SetRunningStatus
        SetRunningStatus --> PerformDvu
        PerformDvu --> DeleteKey
        DeleteKey --> [*]
        --
        [*] --> KeepaliveWaits
        KeepaliveWaits --> UpdateKey: Tick time elapsed
        UpdateKey --> KeepaliveWaits
    }
```

Signed-off-by: Fletcher Nichol <[email protected]>
  • Loading branch information
fnichol committed Jul 23, 2024
1 parent a9f4e31 commit e05573a
Show file tree
Hide file tree
Showing 14 changed files with 730 additions and 166 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 2 additions & 11 deletions lib/dal/src/job/definition/action.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use async_trait::async_trait;
use futures::Future;
use serde::{Deserialize, Serialize};
use si_events::ActionResultState;
use si_std::time::jitter_duration;
use telemetry::prelude::*;
use tryhard::RetryPolicy;
use veritech_client::{ActionRunResultSuccess, ResourceStatus};
Expand Down Expand Up @@ -321,22 +322,12 @@ where
{
let span = Span::current();

// Jitter implementation thanks to the `fure` crate, released under the MIT license.
//
// See: https://github.com/Leonqn/fure/blob/8945c35655f7e0f6966d8314ab21a297181cc080/src/backoff.rs#L44-L51
fn jitter(duration: Duration) -> Duration {
let jitter = rand::random::<f64>();
let secs = ((duration.as_secs() as f64) * jitter).ceil() as u64;
let nanos = ((f64::from(duration.subsec_nanos())) * jitter).ceil() as u32;
Duration::new(secs, nanos)
}

tryhard::retry_fn(f)
.retries(max_attempts.saturating_sub(1))
.custom_backoff(|attempt, err: &JobConsumerError| {
if let JobConsumerError::Transactions(TransactionsError::ConflictsOccurred(_)) = err {
span.record(span_field, attempt);
RetryPolicy::Delay(jitter(fixed_delay))
RetryPolicy::Delay(jitter_duration(fixed_delay))
} else {
RetryPolicy::Break
}
Expand Down
52 changes: 52 additions & 0 deletions lib/rebaser-server/DEBOUNCER.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# Dependent Values Update Debouncer

## High Level Algorithm

The debouncer task coordinates with other debouncer tasks via a NATS KV store
which acts as a distributed lock (or could be though of as a leader election).
Each key corresponds to a unique change set within a workspace. That is, the
key is of the form `{workspace_pk}.{change_set_id}`. Additionally, there is a
`max_age` set for all keys which will naturally age out if no
update/delete/purge operations are performed. This "aging out" of a key should
only happen when a debouncer task crashes or panics without properly cleaning
up its state *or* when a Rebaser service is disconnected from its network or
connection to NATS (i.e. a network partition).

The general algorithm for each debouncer task is as follows:

```mermaid
stateDiagram-v2
Wait: Waits to become leader
WatchKey: Watch key for delete/age out
AttemptsLeader: Attempt to become leader
Leader: Becomes leader (owns key)
LeaderWaits: Wait for debounce window
CheckIfPending: Check if values are pending
PendingValues: Prepares to run DVU
SetRunningStatus: Set key value status to "running"
PerformDvu: Enqueue, run, and block on DVU
KeepaliveWaits: Wait for keepalive window
UpdateKey: Update key preventing age out
[*] --> Wait
Wait --> WatchKey
WatchKey --> AttemptsLeader: Key deleted
WatchKey --> AttemptsLeader: Key aged out
AttemptsLeader --> WatchKey: Loses key acquire
AttemptsLeader --> Leader: Wins key acquire
Leader --> Wait: Returns to waiting
state Leader {
[*] --> LeaderWaits
LeaderWaits --> CheckIfPending: Tick time elapses
CheckIfPending --> LeaderWaits: No pending values
CheckIfPending --> PendingValues: Pending values found
PendingValues --> SetRunningStatus
SetRunningStatus --> PerformDvu
PerformDvu --> DeleteKey
DeleteKey --> [*]
--
[*] --> KeepaliveWaits
KeepaliveWaits --> UpdateKey: Tick time elapsed
UpdateKey --> KeepaliveWaits
}
```
38 changes: 29 additions & 9 deletions lib/rebaser-server/src/change_set_requests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ use telemetry::prelude::*;
use tokio_util::sync::CancellationToken;
use tower::ServiceBuilder;

use crate::{dvu_debouncer::DvuDebouncer, ServerError as Error, ServerMetadata, ServerResult};
use crate::{
dvu_debouncer_task::DvuDebouncerTask, ServerError as Error, ServerMetadata, ServerResult,
};

use self::app_state::AppState;

Expand All @@ -32,6 +34,7 @@ pub mod handlers;
pub struct ChangeSetRequestsTask {
metadata: Arc<ServerMetadata>,
inner: Box<dyn Future<Output = io::Result<()>> + Unpin + Send>,
debouncer_task: DvuDebouncerTask,
shutdown_token: CancellationToken,
}

Expand All @@ -45,26 +48,30 @@ impl fmt::Debug for ChangeSetRequestsTask {
}

impl ChangeSetRequestsTask {
const NAME: &'static str = "Rebaser::ChangeSetRequestsTask";
const NAME: &'static str = "rebaser_server::change_set_requests_task";

/// Creates and returns a runnable [`ChangeSetRequestsTask`].
#[allow(clippy::too_many_arguments)]
pub fn create(
metadata: Arc<ServerMetadata>,
workspace_id: WorkspacePk,
change_set_id: ChangeSetId,
incoming: jetstream::consumer::pull::Stream,
kv: jetstream::kv::Store,
ctx_builder: DalContextBuilder,
shutdown_token: CancellationToken,
dvu_interval: Duration,
) -> Self {
let dvu_debouncer = DvuDebouncer::new(
) -> ServerResult<Self> {
let debouncer_task = DvuDebouncerTask::create(
metadata.instance_id().to_owned(),
kv,
workspace_id,
change_set_id,
shutdown_token.clone(),
ctx_builder.clone(),
dvu_interval,
);
let state = AppState::new(workspace_id, change_set_id, ctx_builder, dvu_debouncer);
)?;

let state = AppState::new(workspace_id, change_set_id, ctx_builder);

let app = ServiceBuilder::new()
.concurrency_limit(1)
Expand All @@ -80,11 +87,12 @@ impl ChangeSetRequestsTask {
let inner = naxum::serve(incoming, app.into_make_service())
.with_graceful_shutdown(naxum::wait_on_cancelled(shutdown_token.clone()));

Self {
Ok(Self {
metadata,
inner: Box::new(inner.into_future()),
debouncer_task,
shutdown_token,
}
})
}

/// Runs the service to completion or until the first internal error is encountered.
Expand All @@ -98,7 +106,19 @@ impl ChangeSetRequestsTask {
/// Runs the service to completion, returning its result (i.e. whether it successful or an
/// internal error was encountered).
pub async fn try_run(self) -> ServerResult<()> {
// Spawn the task to run alongside the app and setup a drop guard for the task so that it
// shuts down if the app errors or crashes
let debouncer_task_drop_guard = self.debouncer_task.cancellation_token().drop_guard();
let debouncer_task_handle = tokio::spawn(self.debouncer_task.run());

self.inner.await.map_err(Error::Naxum)?;

// Perform clean shutdown of task by cancelling it and awaiting its shutdown
debouncer_task_drop_guard.disarm().cancel();
debouncer_task_handle
.await
.map_err(|_err| Error::DvuDebouncerTaskJoin)?;

debug!(task = Self::NAME, "main loop shutdown complete");
Ok(())
}
Expand Down
6 changes: 0 additions & 6 deletions lib/rebaser-server/src/change_set_requests/app_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@
use dal::DalContextBuilder;
use si_events::{ChangeSetId, WorkspacePk};

use crate::dvu_debouncer::DvuDebouncer;

/// Application state.
#[derive(Clone, Debug)]
pub struct AppState {
Expand All @@ -14,8 +12,6 @@ pub struct AppState {
pub change_set_id: ChangeSetId,
/// DAL context builder for each processing request
pub ctx_builder: DalContextBuilder,
/// The DVU debouncer
pub dvu_debouncer: DvuDebouncer,
}

impl AppState {
Expand All @@ -24,13 +20,11 @@ impl AppState {
workspace_id: WorkspacePk,
change_set_id: ChangeSetId,
ctx_builder: DalContextBuilder,
dvu_debouncer: DvuDebouncer,
) -> Self {
Self {
workspace_id,
change_set_id,
ctx_builder,
dvu_debouncer,
}
}
}
136 changes: 0 additions & 136 deletions lib/rebaser-server/src/dvu_debouncer.rs

This file was deleted.

Loading

0 comments on commit e05573a

Please sign in to comment.