From 18707749d2faa6ddf48c009ba174829b9bb64927 Mon Sep 17 00:00:00 2001 From: Pavel Tcholakov Date: Mon, 23 Dec 2024 13:54:21 +0200 Subject: [PATCH] Add trim-gap handling by fast-forwarding the partition state on startup --- .../worker/src/partition_processor_manager.rs | 23 +++++++++------- .../spawn_processor_task.rs | 27 +++++++++++++++---- 2 files changed, 36 insertions(+), 14 deletions(-) diff --git a/crates/worker/src/partition_processor_manager.rs b/crates/worker/src/partition_processor_manager.rs index dd9154576..db29a9b59 100644 --- a/crates/worker/src/partition_processor_manager.rs +++ b/crates/worker/src/partition_processor_manager.rs @@ -104,8 +104,8 @@ pub struct PartitionProcessorManager { pending_snapshots: HashMap, snapshot_export_tasks: FuturesUnordered>, - snapshot_import_tasks: FuturesUnordered>, snapshot_repository: Option, + fast_forward_on_startup: HashMap, } struct PendingSnapshotTask { @@ -114,7 +114,6 @@ struct PendingSnapshotTask { } type SnapshotResultInternal = Result; -type ImportSnapshotResultInternal = Result<(), SnapshotError>; #[derive(Debug, thiserror::Error)] pub enum Error { @@ -204,6 +203,7 @@ impl PartitionProcessorManager { snapshot_export_tasks: FuturesUnordered::default(), pending_snapshots: HashMap::default(), snapshot_repository, + fast_forward_on_startup: HashMap::default(), } } @@ -432,10 +432,17 @@ impl PartitionProcessorManager { match result { Ok(ProcessorStopReason::LogTrimGap { to_lsn }) => { if self.snapshot_repository.is_some() { - info!(%partition_id, "Partition processor stopped due to a log trim gap, will look for snapshot with LSN >= {to_lsn}"); - // todo(pavel): spawn snapshot import task for the partition + info!( + %partition_id, + trim_gap_to_lsn = ?to_lsn, + "Partition processor stopped due to a log trim gap, will stop and attempt to fast-forward", + ); + self.fast_forward_on_startup.insert(partition_id, to_lsn); } else { - warn!(%partition_id, "Partition processor stopped due to a log trim gap, and no snapshot repository is configured: {result:?}"); + warn!( + %partition_id, + "Partition processor stopped due to a log trim gap, and no snapshot repository is configured: {result:?}", + ); } } _ => { @@ -732,10 +739,7 @@ impl PartitionProcessorManager { self.spawn_create_snapshot_task(partition_id, snapshot_repository, Some(sender)); } - fn on_create_snapshot_task_completed( - &mut self, - result: Result, - ) { + fn on_create_snapshot_task_completed(&mut self, result: SnapshotResultInternal) { let (partition_id, response) = match result { Ok(metadata) => { self.archived_lsns @@ -892,6 +896,7 @@ impl PartitionProcessorManager { self.bifrost.clone(), self.partition_store_manager.clone(), self.snapshot_repository.clone(), + self.fast_forward_on_startup.remove(&partition_id), ) } diff --git a/crates/worker/src/partition_processor_manager/spawn_processor_task.rs b/crates/worker/src/partition_processor_manager/spawn_processor_task.rs index 7bd876bb7..c0cb9ca53 100644 --- a/crates/worker/src/partition_processor_manager/spawn_processor_task.rs +++ b/crates/worker/src/partition_processor_manager/spawn_processor_task.rs @@ -22,6 +22,7 @@ use restate_types::cluster::cluster_state::PartitionProcessorStatus; use restate_types::config::Configuration; use restate_types::identifiers::{PartitionId, PartitionKey}; use restate_types::live::Live; +use restate_types::logs::Lsn; use restate_types::schema::Schema; use crate::invoker_integration::EntryEnricher; @@ -39,6 +40,7 @@ pub struct SpawnPartitionProcessorTask { bifrost: Bifrost, partition_store_manager: PartitionStoreManager, snapshot_repository: Option, + fast_forward_lsn: Option, } impl SpawnPartitionProcessorTask { @@ -51,6 +53,7 @@ impl SpawnPartitionProcessorTask { bifrost: Bifrost, partition_store_manager: PartitionStoreManager, snapshot_repository: Option, + fast_forward_lsn: Option, ) -> Self { Self { task_name, @@ -60,6 +63,7 @@ impl SpawnPartitionProcessorTask { bifrost, partition_store_manager, snapshot_repository, + fast_forward_lsn, } } @@ -83,6 +87,7 @@ impl SpawnPartitionProcessorTask { bifrost, partition_store_manager, snapshot_repository, + fast_forward_lsn, } = self; let config = configuration.pinned(); @@ -130,7 +135,7 @@ impl SpawnPartitionProcessorTask { let key_range = key_range.clone(); move || async move { - let partition_store = if !partition_store_manager + let partition_store = if fast_forward_lsn.is_some() || !partition_store_manager .has_partition_store(pp_builder.partition_id) .await { @@ -150,10 +155,22 @@ impl SpawnPartitionProcessorTask { if let Some(snapshot) = snapshot { - info!( - partition_id = %partition_id, - "Found snapshot to bootstrap partition, restoring it", - ); + match fast_forward_lsn { + Some(trim_gap_to_lsn) => { + if snapshot.min_applied_lsn >= trim_gap_to_lsn { + info!("Found snapshot with state beyond the trim gap, resetting local partition store..."); + partition_store_manager.drop_partition(partition_id).await; + } else { + warn!("Latest snapshot is before the trim gap!"); + } + }, + None => { + info!( + partition_id = %partition_id, + "Found snapshot to bootstrap partition, restoring it", + ); + }, + } let snapshot_path = snapshot.base_dir.clone(); match partition_store_manager