diff --git a/crates/worker/src/partition/mod.rs b/crates/worker/src/partition/mod.rs index 44be014a2..bb5f51d95 100644 --- a/crates/worker/src/partition/mod.rs +++ b/crates/worker/src/partition/mod.rs @@ -279,7 +279,10 @@ where Ok(stopped) => { match stopped { ProcessorStopReason::LogTrimGap { to_lsn } => - info!(?to_lsn, "Shutting partition processor down because we encountered a trim gap in the log."), + info!( + trim_gap_to_lsn = ?to_lsn, + "Shutting partition processor down because we encountered a trim gap in the log." + ), _ => warn!("Shutting partition processor down because it stopped unexpectedly.") } }, @@ -495,6 +498,7 @@ where } } Record::TrimGap(to_lsn) => { + // todo(pavel): this assumption may be violated if a command batch contains some records and a trim gap! assert!(trim_gap.is_none(), "Expecting only a single trim gap!"); trim_gap = Some(to_lsn) } diff --git a/crates/worker/src/partition_processor_manager/spawn_processor_task.rs b/crates/worker/src/partition_processor_manager/spawn_processor_task.rs index c0cb9ca53..a9bd758aa 100644 --- a/crates/worker/src/partition_processor_manager/spawn_processor_task.rs +++ b/crates/worker/src/partition_processor_manager/spawn_processor_task.rs @@ -16,10 +16,11 @@ use tracing::{debug, info, instrument, warn}; use restate_bifrost::Bifrost; use restate_core::{Metadata, RuntimeTaskHandle, TaskCenter, TaskKind}; use restate_invoker_impl::Service as InvokerService; +use restate_partition_store::snapshots::LocalPartitionSnapshot; use restate_partition_store::{OpenMode, PartitionStore, PartitionStoreManager}; use restate_service_protocol::codec::ProtobufRawEntryCodec; use restate_types::cluster::cluster_state::PartitionProcessorStatus; -use restate_types::config::Configuration; +use restate_types::config::{Configuration, WorkerOptions}; use restate_types::identifiers::{PartitionId, PartitionKey}; use restate_types::live::Live; use restate_types::logs::Lsn; @@ -135,99 +136,15 @@ impl SpawnPartitionProcessorTask { let key_range = key_range.clone(); move || async move { - let partition_store = if fast_forward_lsn.is_some() || !partition_store_manager - .has_partition_store(pp_builder.partition_id) - .await - { - let snapshot = if snapshot_repository.is_none() { - debug!( - partition_id = %partition_id, - "No snapshot repository configured", - ); - None - } else { - debug!( - partition_id = %partition_id, - "Looking for partition snapshot from which to bootstrap partition store", - ); - snapshot_repository.expect("is some").get_latest(partition_id).await? - }; - - - if let Some(snapshot) = snapshot { - match fast_forward_lsn { - Some(trim_gap_to_lsn) => { - if snapshot.min_applied_lsn >= trim_gap_to_lsn { - info!("Found snapshot with state beyond the trim gap, resetting local partition store..."); - partition_store_manager.drop_partition(partition_id).await; - } else { - warn!("Latest snapshot is before the trim gap!"); - } - }, - None => { - info!( - partition_id = %partition_id, - "Found snapshot to bootstrap partition, restoring it", - ); - }, - } - - let snapshot_path = snapshot.base_dir.clone(); - match partition_store_manager - .open_partition_store_from_snapshot( - partition_id, - key_range.clone(), - snapshot, - &options.storage.rocksdb, - ) - .await { - Ok(partition_store) => { - let res = tokio::fs::remove_dir_all(&snapshot_path).await; - if let Err(e) = res { - // This is not critical; since we move the SST files into RocksDB on import, at - // worst the snapshot metadata file will be retained. - warn!( - partition_id = %partition_id, - ?snapshot_path, - "Failed to remove local snapshot directory, continuing with startup: {:?}", - e - ); - } - partition_store - } - Err(e) => { - warn!( - partition_id = %partition_id, - ?snapshot_path, - "Failed to import snapshot, local copy retained" - ); - return Err(anyhow::anyhow!(e)); - } - } - } else { - info!( - partition_id = %partition_id, - "No snapshot found to bootstrap partition, creating new store", - ); - partition_store_manager - .open_partition_store( - partition_id, - key_range, - OpenMode::CreateIfMissing, - &options.storage.rocksdb, - ) - .await? - } - } else { - partition_store_manager - .open_partition_store( - partition_id, - key_range, - OpenMode::OpenExisting, - &options.storage.rocksdb, - ) - .await? - }; + let partition_store = open_partition_store( + partition_id, + partition_store_manager, + snapshot_repository, + fast_forward_lsn, + &options, + key_range, + ) + .await?; TaskCenter::spawn_child( TaskKind::SystemService, @@ -256,3 +173,152 @@ impl SpawnPartitionProcessorTask { Ok((state, root_task_handle)) } } + +async fn open_partition_store( + partition_id: PartitionId, + partition_store_manager: PartitionStoreManager, + snapshot_repository: Option, + fast_forward_lsn: Option, + options: &WorkerOptions, + key_range: RangeInclusive, +) -> anyhow::Result { + if partition_store_manager + .has_partition_store(partition_id) + .await + && fast_forward_lsn.is_none() + { + return Ok(partition_store_manager + .open_partition_store( + partition_id, + key_range, + OpenMode::OpenExisting, + &options.storage.rocksdb, + ) + .await?); + } + + // We don't have a local partition store initialized - or we have a fast-forward LSN target set. + // Attempt to get the latest available snapshot from the snapshots repository: + let snapshot = match snapshot_repository { + Some(repository) => { + debug!("Looking for partition snapshot from which to bootstrap partition store"); + // todo(pavel): pass target LSN to repository + repository.get_latest(partition_id).await? + } + None => { + debug!("No snapshot repository configured"); + None + } + }; + + match (snapshot, fast_forward_lsn) { + (None, None) => { + info!("No snapshot found to bootstrap partition, creating new store"); + Ok(partition_store_manager + .open_partition_store( + partition_id, + key_range, + OpenMode::CreateIfMissing, + &options.storage.rocksdb, + ) + .await?) + } + (Some(snapshot), None) => { + info!(partition_id = %partition_id, "Found partition snapshot, restoring it"); + Ok(import_snapshot( + partition_id, + key_range, + snapshot, + partition_store_manager, + options, + ) + .await?) + } + (Some(snapshot), Some(fast_forward_lsn)) + if snapshot.min_applied_lsn >= fast_forward_lsn => + { + // We trust that the fast_forward_lsn is greater than the locally applied LSN. + info!( + latest_snapshot_lsn = ?snapshot.min_applied_lsn, + ?fast_forward_lsn, + "Found snapshot with LSN >= target LSN, dropping local partition store state", + ); + partition_store_manager.drop_partition(partition_id).await; + Ok(import_snapshot( + partition_id, + key_range, + snapshot, + partition_store_manager, + options, + ) + .await?) + } + (maybe_snapshot, Some(fast_forward_lsn)) => { + // Play it safe and keep the partition store intact; we can't do much else at this + // point. We'll likely halt again as soon as the processor starts up. + if let Some(snapshot) = maybe_snapshot { + warn!( + ?snapshot.min_applied_lsn, + ?fast_forward_lsn, + "Latest available snapshot is older than the the fast-forward target LSN!", + ); + } else { + warn!( + ?fast_forward_lsn, + "A fast-forward target LSN is set, but no snapshot available for partition!", + ); + } + // todo(pavel): backoff for a while and return Err + Ok(partition_store_manager + .open_partition_store( + partition_id, + key_range, + OpenMode::OpenExisting, + &options.storage.rocksdb, + ) + .await?) + } + } +} + +async fn import_snapshot( + partition_id: PartitionId, + key_range: RangeInclusive, + snapshot: LocalPartitionSnapshot, + partition_store_manager: PartitionStoreManager, + options: &WorkerOptions, +) -> anyhow::Result { + let snapshot_path = snapshot.base_dir.clone(); + match partition_store_manager + .open_partition_store_from_snapshot( + partition_id, + key_range.clone(), + snapshot, + &options.storage.rocksdb, + ) + .await + { + Ok(partition_store) => { + let res = tokio::fs::remove_dir_all(&snapshot_path).await; + if let Err(e) = res { + // This is not critical; since we move the SST files into RocksDB on import, + // at worst only the snapshot metadata file will remain in the staging dir + warn!( + partition_id = %partition_id, + ?snapshot_path, + "Failed to remove local snapshot directory, continuing with startup: {:?}", + e, + ); + } + Ok(partition_store) + } + Err(e) => { + warn!( + partition_id = %partition_id, + ?snapshot_path, + "Failed to import snapshot, local snapshot data retained" + ); + Err(anyhow::anyhow!(e)) + } + } +}