Skip to content

Commit

Permalink
Add trim-gap handling by fast-forwarding the partition state on startup
Browse files (browse the repository at this point in the history)
  • Loading branch information
pcholakov committed Dec 23, 2024
1 parent 745ff17 commit 1870774
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 14 deletions.
23 changes: 14 additions & 9 deletions crates/worker/src/partition_processor_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,8 @@ pub struct PartitionProcessorManager {

pending_snapshots: HashMap<PartitionId, PendingSnapshotTask>,
snapshot_export_tasks: FuturesUnordered<TaskHandle<SnapshotResultInternal>>,
snapshot_import_tasks: FuturesUnordered<TaskHandle<ImportSnapshotResultInternal>>,
snapshot_repository: Option<SnapshotRepository>,
fast_forward_on_startup: HashMap<PartitionId, Lsn>,
}

struct PendingSnapshotTask {
Expand All @@ -114,7 +114,6 @@ struct PendingSnapshotTask {
}

type SnapshotResultInternal = Result<PartitionSnapshotMetadata, SnapshotError>;
type ImportSnapshotResultInternal = Result<(), SnapshotError>;

#[derive(Debug, thiserror::Error)]
pub enum Error {
Expand Down Expand Up @@ -204,6 +203,7 @@ impl PartitionProcessorManager {
snapshot_export_tasks: FuturesUnordered::default(),
pending_snapshots: HashMap::default(),
snapshot_repository,
fast_forward_on_startup: HashMap::default(),
}
}

Expand Down Expand Up @@ -432,10 +432,17 @@ impl PartitionProcessorManager {
match result {
Ok(ProcessorStopReason::LogTrimGap { to_lsn }) => {
if self.snapshot_repository.is_some() {
info!(%partition_id, "Partition processor stopped due to a log trim gap, will look for snapshot with LSN >= {to_lsn}");
// todo(pavel): spawn snapshot import task for the partition
info!(
%partition_id,
trim_gap_to_lsn = ?to_lsn,
"Partition processor stopped due to a log trim gap, will stop and attempt to fast-forward",
);
self.fast_forward_on_startup.insert(partition_id, to_lsn);
} else {
warn!(%partition_id, "Partition processor stopped due to a log trim gap, and no snapshot repository is configured: {result:?}");
warn!(
%partition_id,
"Partition processor stopped due to a log trim gap, and no snapshot repository is configured: {result:?}",
);
}
}
_ => {
Expand Down Expand Up @@ -732,10 +739,7 @@ impl PartitionProcessorManager {
self.spawn_create_snapshot_task(partition_id, snapshot_repository, Some(sender));
}

fn on_create_snapshot_task_completed(
&mut self,
result: Result<PartitionSnapshotMetadata, SnapshotError>,
) {
fn on_create_snapshot_task_completed(&mut self, result: SnapshotResultInternal) {
let (partition_id, response) = match result {
Ok(metadata) => {
self.archived_lsns
Expand Down Expand Up @@ -892,6 +896,7 @@ impl PartitionProcessorManager {
self.bifrost.clone(),
self.partition_store_manager.clone(),
self.snapshot_repository.clone(),
self.fast_forward_on_startup.remove(&partition_id),
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use restate_types::cluster::cluster_state::PartitionProcessorStatus;
use restate_types::config::Configuration;
use restate_types::identifiers::{PartitionId, PartitionKey};
use restate_types::live::Live;
use restate_types::logs::Lsn;
use restate_types::schema::Schema;

use crate::invoker_integration::EntryEnricher;
Expand All @@ -39,6 +40,7 @@ pub struct SpawnPartitionProcessorTask {
bifrost: Bifrost,
partition_store_manager: PartitionStoreManager,
snapshot_repository: Option<SnapshotRepository>,
fast_forward_lsn: Option<Lsn>,
}

impl SpawnPartitionProcessorTask {
Expand All @@ -51,6 +53,7 @@ impl SpawnPartitionProcessorTask {
bifrost: Bifrost,
partition_store_manager: PartitionStoreManager,
snapshot_repository: Option<SnapshotRepository>,
fast_forward_lsn: Option<Lsn>,
) -> Self {
Self {
task_name,
Expand All @@ -60,6 +63,7 @@ impl SpawnPartitionProcessorTask {
bifrost,
partition_store_manager,
snapshot_repository,
fast_forward_lsn,
}
}

Expand All @@ -83,6 +87,7 @@ impl SpawnPartitionProcessorTask {
bifrost,
partition_store_manager,
snapshot_repository,
fast_forward_lsn,
} = self;

let config = configuration.pinned();
Expand Down Expand Up @@ -130,7 +135,7 @@ impl SpawnPartitionProcessorTask {
let key_range = key_range.clone();

move || async move {
let partition_store = if !partition_store_manager
let partition_store = if fast_forward_lsn.is_some() || !partition_store_manager
.has_partition_store(pp_builder.partition_id)
.await
{
Expand All @@ -150,10 +155,22 @@ impl SpawnPartitionProcessorTask {


if let Some(snapshot) = snapshot {
info!(
partition_id = %partition_id,
"Found snapshot to bootstrap partition, restoring it",
);
match fast_forward_lsn {
Some(trim_gap_to_lsn) => {
if snapshot.min_applied_lsn >= trim_gap_to_lsn {
info!("Found snapshot with state beyond the trim gap, resetting local partition store...");
partition_store_manager.drop_partition(partition_id).await;
} else {
warn!("Latest snapshot is before the trim gap!");
}
},
None => {
info!(
partition_id = %partition_id,
"Found snapshot to bootstrap partition, restoring it",
);
},
}

let snapshot_path = snapshot.base_dir.clone();
match partition_store_manager
Expand Down

0 comments on commit 1870774

Please sign in to comment.