From 41ff6ff07fda51f08d0777dd82988829957ceb19 Mon Sep 17 00:00:00 2001 From: Pavel Tcholakov Date: Fri, 20 Dec 2024 15:10:12 +0200 Subject: [PATCH 01/11] Add stopped-reason to ProcessorManager root future This allows us to signal the PPM about log trim gaps that the PP may encounter at runtime, which require special handling. --- crates/core/src/task_center.rs | 7 +- crates/core/src/task_center/handle.rs | 8 +- crates/worker/src/partition/mod.rs | 151 +++++++++++------- .../worker/src/partition_processor_manager.rs | 29 +++- .../spawn_processor_task.rs | 8 +- 5 files changed, 137 insertions(+), 66 deletions(-) diff --git a/crates/core/src/task_center.rs b/crates/core/src/task_center.rs index 5f2969cb0..146c8f9d0 100644 --- a/crates/core/src/task_center.rs +++ b/crates/core/src/task_center.rs @@ -520,15 +520,16 @@ impl TaskCenterInner { /// Starts the `root_future` on a new runtime. The runtime is stopped once the root future /// completes. - pub fn start_runtime( + pub fn start_runtime( self: &Arc, root_task_kind: TaskKind, runtime_name: &'static str, partition_id: Option, root_future: impl FnOnce() -> F + Send + 'static, - ) -> Result>, RuntimeError> + ) -> Result, RuntimeError> where - F: Future> + 'static, + F: Future + 'static, + R: Send + 'static, { if self.shutdown_requested.load(Ordering::Relaxed) { return Err(ShutdownError.into()); diff --git a/crates/core/src/task_center/handle.rs b/crates/core/src/task_center/handle.rs index e87c7da27..a2da66187 100644 --- a/crates/core/src/task_center/handle.rs +++ b/crates/core/src/task_center/handle.rs @@ -8,6 +8,7 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. +use std::fmt::Debug; use std::future::Future; use std::sync::atomic::Ordering; use std::sync::Arc; @@ -77,15 +78,16 @@ impl Handle { self.inner.block_on(future) } - pub fn start_runtime( + pub fn start_runtime( &self, root_task_kind: TaskKind, runtime_name: &'static str, partition_id: Option, root_future: impl FnOnce() -> F + Send + 'static, - ) -> Result>, RuntimeError> + ) -> Result, RuntimeError> where - F: Future> + 'static, + F: Future + 'static, + R: Send + Debug + 'static, { self.inner .start_runtime(root_task_kind, runtime_name, partition_id, root_future) diff --git a/crates/worker/src/partition/mod.rs b/crates/worker/src/partition/mod.rs index 571812edd..44be014a2 100644 --- a/crates/worker/src/partition/mod.rs +++ b/crates/worker/src/partition/mod.rs @@ -252,26 +252,44 @@ pub struct PartitionProcessor { partition_store: PartitionStore, } +#[derive(Debug)] +pub enum ProcessorStopReason { + Cancelled, + LogTrimGap { to_lsn: Lsn }, +} + +enum Record { + Envelope(Lsn, Arc), + TrimGap(Lsn), +} + impl PartitionProcessor where Codec: RawEntryCodec + Default + Debug, InvokerSender: restate_invoker_api::InvokerHandle> + Clone, { - #[instrument(level = "error", skip_all, fields(partition_id = %self.partition_id, is_leader = tracing::field::Empty))] - pub async fn run(mut self) -> anyhow::Result<()> { + #[instrument(level = "error", skip_all, fields(partition_id = %self.partition_id, is_leader = tracing::field::Empty + ))] + pub async fn run(mut self) -> anyhow::Result { info!("Starting the partition processor."); let res = tokio::select! 
{ res = self.run_inner() => { match res.as_ref() { - Ok(_) => warn!("Shutting partition processor down because it stopped unexpectedly."), + Ok(stopped) => { + match stopped { + ProcessorStopReason::LogTrimGap { to_lsn } => + info!(?to_lsn, "Shutting partition processor down because we encountered a trim gap in the log."), + _ => warn!("Shutting partition processor down because it stopped unexpectedly.") + } + }, Err(err) => warn!("Shutting partition processor down because it failed: {err}"), } res }, _ = cancellation_watcher() => { debug!("Shutting partition processor down because it was cancelled."); - Ok(()) + Ok(ProcessorStopReason::Cancelled) }, }; @@ -293,7 +311,8 @@ where res } - async fn run_inner(&mut self) -> anyhow::Result<()> { + // Runs as long as log records are available, or returns + async fn run_inner(&mut self) -> anyhow::Result { let mut partition_store = self.partition_store.clone(); let last_applied_lsn = partition_store.get_applied_lsn().await?; let last_applied_lsn = last_applied_lsn.unwrap_or(Lsn::INVALID); @@ -355,11 +374,18 @@ where .map_ok(|entry| { trace!(?entry, "Read entry"); let lsn = entry.sequence_number(); - let Some(envelope) = entry.try_decode_arc::() else { - // trim-gap - unimplemented!("Handling trim gap is currently not supported") - }; - anyhow::Ok((lsn, envelope?)) + if entry.is_data_record() { + entry + .try_decode_arc::() + .map(|envelope| anyhow::Ok(Record::Envelope(lsn, envelope?))) + .expect("data record is present") + } else { + anyhow::Ok(Record::TrimGap( + entry + .trim_gap_to_sequence_number() + .expect("trim gap has to-LSN"), + )) + } }) .try_take_while(|entry| { // a catch-all safety net if all lower layers didn't filter this record out. This @@ -367,9 +393,10 @@ where // // At some point, we should remove this and trust that stored records have Keys // stored correctly. - std::future::ready(Ok(entry - .as_ref() - .is_ok_and(|(_, envelope)| envelope.matches_key_query(&key_query)))) + std::future::ready(Ok(entry.as_ref().is_ok_and(|r| match r { + Record::Envelope(_, e) => e.matches_key_query(&key_query), + Record::TrimGap(_) => true, + }))) }); // avoid synchronized timers. We pick a randomised timer between 500 and 1023 millis. @@ -417,49 +444,60 @@ where // clear buffers used when applying the next record action_collector.clear(); - for (lsn, envelope) in command_buffer.drain(..) { - let command_start = Instant::now(); - - trace!(%lsn, "Processing bifrost record for '{}': {:?}", envelope.command.name(), envelope.header); - - let leadership_change = self.apply_record( - lsn, - envelope, - &mut transaction, - &mut action_collector).await?; - - apply_command_latency.record(command_start.elapsed()); - - if let Some((header, announce_leader)) = leadership_change { - // commit all changes so far, this is important so that the actuators see all changes - // when becoming leader. - transaction.commit().await?; - - // We can ignore all actions collected so far because as a new leader we have to instruct the - // actuators afresh. - action_collector.clear(); - - self.status.last_observed_leader_epoch = Some(announce_leader.leader_epoch); - if header.source.is_processor_generational() { - let Source::Processor { generational_node_id, .. 
} = header.source else { - unreachable!("processor source must have generational_node_id"); - }; - // all new AnnounceLeader messages should come from a PartitionProcessor - self.status.last_observed_leader_node = generational_node_id; - } else if announce_leader.node_id.is_some() { - // older AnnounceLeader messages have the announce_leader.node_id set - self.status.last_observed_leader_node = announce_leader.node_id; - } + let mut trim_gap = None; - let is_leader = self.leadership_state.on_announce_leader(announce_leader, &mut partition_store).await?; + for record in command_buffer.drain(..) { + match record { + Record::Envelope(lsn, envelope) => { + assert!(trim_gap.is_none(), "Expecting single trim gap to be the last record in a batch!"); + let command_start = Instant::now(); - Span::current().record("is_leader", is_leader); + trace!(%lsn, "Processing bifrost record for '{}': {:?}", envelope.command.name(), envelope.header); - if is_leader { - self.status.effective_mode = RunMode::Leader; - } + let leadership_change = self.apply_record( + lsn, + envelope, + &mut transaction, + &mut action_collector).await?; - transaction = partition_store.transaction(); + apply_command_latency.record(command_start.elapsed()); + + if let Some((header, announce_leader)) = leadership_change { + // commit all changes so far, this is important so that the actuators see all changes + // when becoming leader. + transaction.commit().await?; + + // We can ignore all actions collected so far because as a new leader we have to instruct the + // actuators afresh. + action_collector.clear(); + + self.status.last_observed_leader_epoch = Some(announce_leader.leader_epoch); + if header.source.is_processor_generational() { + let Source::Processor { generational_node_id, .. } = header.source else { + unreachable!("processor source must have generational_node_id"); + }; + // all new AnnounceLeader messages should come from a PartitionProcessor + self.status.last_observed_leader_node = generational_node_id; + } else if announce_leader.node_id.is_some() { + // older AnnounceLeader messages have the announce_leader.node_id set + self.status.last_observed_leader_node = announce_leader.node_id; + } + + let is_leader = self.leadership_state.on_announce_leader(announce_leader, &mut partition_store).await?; + + Span::current().record("is_leader", is_leader); + + if is_leader { + self.status.effective_mode = RunMode::Leader; + } + + transaction = partition_store.transaction(); + } + } + Record::TrimGap(to_lsn) => { + assert!(trim_gap.is_none(), "Expecting only a single trim gap!"); + trim_gap = Some(to_lsn) + } } } @@ -468,6 +506,10 @@ where let actions_start = Instant::now(); self.leadership_state.handle_actions(action_collector.drain(..)).await?; record_actions_latency.record(actions_start.elapsed()); + + if let Some(to_lsn) = trim_gap { + return Ok(ProcessorStopReason::LogTrimGap { to_lsn }) + } }, result = self.leadership_state.run() => { let action_effects = result?; @@ -817,11 +859,10 @@ where async fn read_commands( log_reader: &mut S, max_batching_size: usize, - record_buffer: &mut Vec<(Lsn, Arc)>, + record_buffer: &mut Vec, ) -> anyhow::Result<()> where - S: Stream)>, restate_bifrost::Error>> - + Unpin, + S: Stream, restate_bifrost::Error>> + Unpin, { // beyond this point we must not await; otherwise we are no longer cancellation safe let first_record = log_reader.next().await; diff --git a/crates/worker/src/partition_processor_manager.rs b/crates/worker/src/partition_processor_manager.rs index 1a0492d89..dd9154576 
100644 --- a/crates/worker/src/partition_processor_manager.rs +++ b/crates/worker/src/partition_processor_manager.rs @@ -73,6 +73,7 @@ use crate::metric_definitions::PARTITION_LAST_PERSISTED_LOG_LSN; use crate::metric_definitions::PARTITION_TIME_SINCE_LAST_RECORD; use crate::metric_definitions::PARTITION_TIME_SINCE_LAST_STATUS_UPDATE; use crate::partition::snapshots::{SnapshotPartitionTask, SnapshotRepository}; +use crate::partition::ProcessorStopReason; use crate::partition_processor_manager::message_handler::PartitionProcessorManagerMessageHandler; use crate::partition_processor_manager::persisted_lsn_watchdog::PersistedLogLsnWatchdog; use crate::partition_processor_manager::processor_state::{ @@ -103,6 +104,7 @@ pub struct PartitionProcessorManager { pending_snapshots: HashMap, snapshot_export_tasks: FuturesUnordered>, + snapshot_import_tasks: FuturesUnordered>, snapshot_repository: Option, } @@ -112,6 +114,7 @@ struct PendingSnapshotTask { } type SnapshotResultInternal = Result; +type ImportSnapshotResultInternal = Result<(), SnapshotError>; #[derive(Debug, thiserror::Error)] pub enum Error { @@ -425,7 +428,20 @@ impl PartitionProcessorManager { ProcessorState::Started { processor, .. } => { self.invokers_status_reader .remove(processor.as_ref().expect("must be some").key_range()); - warn!(%partition_id, "Partition processor exited unexpectedly: {result:?}"); + + match result { + Ok(ProcessorStopReason::LogTrimGap { to_lsn }) => { + if self.snapshot_repository.is_some() { + info!(%partition_id, "Partition processor stopped due to a log trim gap, will look for snapshot with LSN >= {to_lsn}"); + // todo(pavel): spawn snapshot import task for the partition + } else { + warn!(%partition_id, "Partition processor stopped due to a log trim gap, and no snapshot repository is configured: {result:?}"); + } + } + _ => { + warn!(%partition_id, "Partition processor exited unexpectedly: {result:?}") + } + } } ProcessorState::Stopping { processor, @@ -487,7 +503,7 @@ impl PartitionProcessorManager { fn await_runtime_task_result( &mut self, partition_id: PartitionId, - runtime_task_handle: RuntimeTaskHandle>, + runtime_task_handle: RuntimeTaskHandle>, ) { self.asynchronous_operations.spawn( async move { @@ -921,8 +937,13 @@ struct AsynchronousEvent { #[derive(strum::IntoStaticStr)] enum EventKind { - Started(anyhow::Result<(StartedProcessor, RuntimeTaskHandle>)>), - Stopped(anyhow::Result<()>), + Started( + anyhow::Result<( + StartedProcessor, + RuntimeTaskHandle>, + )>, + ), + Stopped(anyhow::Result), NewLeaderEpoch { leader_epoch_token: LeaderEpochToken, result: anyhow::Result, diff --git a/crates/worker/src/partition_processor_manager/spawn_processor_task.rs b/crates/worker/src/partition_processor_manager/spawn_processor_task.rs index f71d9907d..7bd876bb7 100644 --- a/crates/worker/src/partition_processor_manager/spawn_processor_task.rs +++ b/crates/worker/src/partition_processor_manager/spawn_processor_task.rs @@ -27,6 +27,7 @@ use restate_types::schema::Schema; use crate::invoker_integration::EntryEnricher; use crate::partition::invoker_storage_reader::InvokerStorageReader; use crate::partition::snapshots::SnapshotRepository; +use crate::partition::ProcessorStopReason; use crate::partition_processor_manager::processor_state::StartedProcessor; use crate::PartitionProcessorBuilder; @@ -68,7 +69,12 @@ impl SpawnPartitionProcessorTask { partition_id=%self.partition_id, ) )] - pub fn run(self) -> anyhow::Result<(StartedProcessor, RuntimeTaskHandle>)> { + pub fn run( + self, + ) -> anyhow::Result<( + 
StartedProcessor, + RuntimeTaskHandle>, + )> { let Self { task_name, partition_id, From c29f64f3327c8b38d36bc6fbe992e80d1747558a Mon Sep 17 00:00:00 2001 From: Pavel Tcholakov Date: Mon, 23 Dec 2024 13:54:21 +0200 Subject: [PATCH 02/11] Add trim-gap handling by fast-forwarding the partition state on startup --- .../worker/src/partition_processor_manager.rs | 23 +++++++++------- .../spawn_processor_task.rs | 27 +++++++++++++++---- 2 files changed, 36 insertions(+), 14 deletions(-) diff --git a/crates/worker/src/partition_processor_manager.rs b/crates/worker/src/partition_processor_manager.rs index dd9154576..db29a9b59 100644 --- a/crates/worker/src/partition_processor_manager.rs +++ b/crates/worker/src/partition_processor_manager.rs @@ -104,8 +104,8 @@ pub struct PartitionProcessorManager { pending_snapshots: HashMap, snapshot_export_tasks: FuturesUnordered>, - snapshot_import_tasks: FuturesUnordered>, snapshot_repository: Option, + fast_forward_on_startup: HashMap, } struct PendingSnapshotTask { @@ -114,7 +114,6 @@ struct PendingSnapshotTask { } type SnapshotResultInternal = Result; -type ImportSnapshotResultInternal = Result<(), SnapshotError>; #[derive(Debug, thiserror::Error)] pub enum Error { @@ -204,6 +203,7 @@ impl PartitionProcessorManager { snapshot_export_tasks: FuturesUnordered::default(), pending_snapshots: HashMap::default(), snapshot_repository, + fast_forward_on_startup: HashMap::default(), } } @@ -432,10 +432,17 @@ impl PartitionProcessorManager { match result { Ok(ProcessorStopReason::LogTrimGap { to_lsn }) => { if self.snapshot_repository.is_some() { - info!(%partition_id, "Partition processor stopped due to a log trim gap, will look for snapshot with LSN >= {to_lsn}"); - // todo(pavel): spawn snapshot import task for the partition + info!( + %partition_id, + trim_gap_to_lsn = ?to_lsn, + "Partition processor stopped due to a log trim gap, will stop and attempt to fast-forward", + ); + self.fast_forward_on_startup.insert(partition_id, to_lsn); } else { - warn!(%partition_id, "Partition processor stopped due to a log trim gap, and no snapshot repository is configured: {result:?}"); + warn!( + %partition_id, + "Partition processor stopped due to a log trim gap, and no snapshot repository is configured: {result:?}", + ); } } _ => { @@ -732,10 +739,7 @@ impl PartitionProcessorManager { self.spawn_create_snapshot_task(partition_id, snapshot_repository, Some(sender)); } - fn on_create_snapshot_task_completed( - &mut self, - result: Result, - ) { + fn on_create_snapshot_task_completed(&mut self, result: SnapshotResultInternal) { let (partition_id, response) = match result { Ok(metadata) => { self.archived_lsns @@ -892,6 +896,7 @@ impl PartitionProcessorManager { self.bifrost.clone(), self.partition_store_manager.clone(), self.snapshot_repository.clone(), + self.fast_forward_on_startup.remove(&partition_id), ) } diff --git a/crates/worker/src/partition_processor_manager/spawn_processor_task.rs b/crates/worker/src/partition_processor_manager/spawn_processor_task.rs index 7bd876bb7..c0cb9ca53 100644 --- a/crates/worker/src/partition_processor_manager/spawn_processor_task.rs +++ b/crates/worker/src/partition_processor_manager/spawn_processor_task.rs @@ -22,6 +22,7 @@ use restate_types::cluster::cluster_state::PartitionProcessorStatus; use restate_types::config::Configuration; use restate_types::identifiers::{PartitionId, PartitionKey}; use restate_types::live::Live; +use restate_types::logs::Lsn; use restate_types::schema::Schema; use crate::invoker_integration::EntryEnricher; @@ 
-39,6 +40,7 @@ pub struct SpawnPartitionProcessorTask { bifrost: Bifrost, partition_store_manager: PartitionStoreManager, snapshot_repository: Option, + fast_forward_lsn: Option, } impl SpawnPartitionProcessorTask { @@ -51,6 +53,7 @@ impl SpawnPartitionProcessorTask { bifrost: Bifrost, partition_store_manager: PartitionStoreManager, snapshot_repository: Option, + fast_forward_lsn: Option, ) -> Self { Self { task_name, @@ -60,6 +63,7 @@ impl SpawnPartitionProcessorTask { bifrost, partition_store_manager, snapshot_repository, + fast_forward_lsn, } } @@ -83,6 +87,7 @@ impl SpawnPartitionProcessorTask { bifrost, partition_store_manager, snapshot_repository, + fast_forward_lsn, } = self; let config = configuration.pinned(); @@ -130,7 +135,7 @@ impl SpawnPartitionProcessorTask { let key_range = key_range.clone(); move || async move { - let partition_store = if !partition_store_manager + let partition_store = if fast_forward_lsn.is_some() || !partition_store_manager .has_partition_store(pp_builder.partition_id) .await { @@ -150,10 +155,22 @@ impl SpawnPartitionProcessorTask { if let Some(snapshot) = snapshot { - info!( - partition_id = %partition_id, - "Found snapshot to bootstrap partition, restoring it", - ); + match fast_forward_lsn { + Some(trim_gap_to_lsn) => { + if snapshot.min_applied_lsn >= trim_gap_to_lsn { + info!("Found snapshot with state beyond the trim gap, resetting local partition store..."); + partition_store_manager.drop_partition(partition_id).await; + } else { + warn!("Latest snapshot is before the trim gap!"); + } + }, + None => { + info!( + partition_id = %partition_id, + "Found snapshot to bootstrap partition, restoring it", + ); + }, + } let snapshot_path = snapshot.base_dir.clone(); match partition_store_manager From 87f0c5cdf896fa120288e146e676bfe1bc4630d9 Mon Sep 17 00:00:00 2001 From: Pavel Tcholakov Date: Mon, 23 Dec 2024 16:04:53 +0200 Subject: [PATCH 03/11] Simplify open partition store logic --- crates/worker/src/partition/mod.rs | 6 +- .../spawn_processor_task.rs | 254 +++++++++++------- 2 files changed, 165 insertions(+), 95 deletions(-) diff --git a/crates/worker/src/partition/mod.rs b/crates/worker/src/partition/mod.rs index 44be014a2..bb5f51d95 100644 --- a/crates/worker/src/partition/mod.rs +++ b/crates/worker/src/partition/mod.rs @@ -279,7 +279,10 @@ where Ok(stopped) => { match stopped { ProcessorStopReason::LogTrimGap { to_lsn } => - info!(?to_lsn, "Shutting partition processor down because we encountered a trim gap in the log."), + info!( + trim_gap_to_lsn = ?to_lsn, + "Shutting partition processor down because we encountered a trim gap in the log." + ), _ => warn!("Shutting partition processor down because it stopped unexpectedly.") } }, @@ -495,6 +498,7 @@ where } } Record::TrimGap(to_lsn) => { + // todo(pavel): this assumption may be violated if a command batch contains some records and a trim gap! 
assert!(trim_gap.is_none(), "Expecting only a single trim gap!"); trim_gap = Some(to_lsn) } diff --git a/crates/worker/src/partition_processor_manager/spawn_processor_task.rs b/crates/worker/src/partition_processor_manager/spawn_processor_task.rs index c0cb9ca53..a9bd758aa 100644 --- a/crates/worker/src/partition_processor_manager/spawn_processor_task.rs +++ b/crates/worker/src/partition_processor_manager/spawn_processor_task.rs @@ -16,10 +16,11 @@ use tracing::{debug, info, instrument, warn}; use restate_bifrost::Bifrost; use restate_core::{Metadata, RuntimeTaskHandle, TaskCenter, TaskKind}; use restate_invoker_impl::Service as InvokerService; +use restate_partition_store::snapshots::LocalPartitionSnapshot; use restate_partition_store::{OpenMode, PartitionStore, PartitionStoreManager}; use restate_service_protocol::codec::ProtobufRawEntryCodec; use restate_types::cluster::cluster_state::PartitionProcessorStatus; -use restate_types::config::Configuration; +use restate_types::config::{Configuration, WorkerOptions}; use restate_types::identifiers::{PartitionId, PartitionKey}; use restate_types::live::Live; use restate_types::logs::Lsn; @@ -135,99 +136,15 @@ impl SpawnPartitionProcessorTask { let key_range = key_range.clone(); move || async move { - let partition_store = if fast_forward_lsn.is_some() || !partition_store_manager - .has_partition_store(pp_builder.partition_id) - .await - { - let snapshot = if snapshot_repository.is_none() { - debug!( - partition_id = %partition_id, - "No snapshot repository configured", - ); - None - } else { - debug!( - partition_id = %partition_id, - "Looking for partition snapshot from which to bootstrap partition store", - ); - snapshot_repository.expect("is some").get_latest(partition_id).await? - }; - - - if let Some(snapshot) = snapshot { - match fast_forward_lsn { - Some(trim_gap_to_lsn) => { - if snapshot.min_applied_lsn >= trim_gap_to_lsn { - info!("Found snapshot with state beyond the trim gap, resetting local partition store..."); - partition_store_manager.drop_partition(partition_id).await; - } else { - warn!("Latest snapshot is before the trim gap!"); - } - }, - None => { - info!( - partition_id = %partition_id, - "Found snapshot to bootstrap partition, restoring it", - ); - }, - } - - let snapshot_path = snapshot.base_dir.clone(); - match partition_store_manager - .open_partition_store_from_snapshot( - partition_id, - key_range.clone(), - snapshot, - &options.storage.rocksdb, - ) - .await { - Ok(partition_store) => { - let res = tokio::fs::remove_dir_all(&snapshot_path).await; - if let Err(e) = res { - // This is not critical; since we move the SST files into RocksDB on import, at - // worst the snapshot metadata file will be retained. - warn!( - partition_id = %partition_id, - ?snapshot_path, - "Failed to remove local snapshot directory, continuing with startup: {:?}", - e - ); - } - partition_store - } - Err(e) => { - warn!( - partition_id = %partition_id, - ?snapshot_path, - "Failed to import snapshot, local copy retained" - ); - return Err(anyhow::anyhow!(e)); - } - } - } else { - info!( - partition_id = %partition_id, - "No snapshot found to bootstrap partition, creating new store", - ); - partition_store_manager - .open_partition_store( - partition_id, - key_range, - OpenMode::CreateIfMissing, - &options.storage.rocksdb, - ) - .await? - } - } else { - partition_store_manager - .open_partition_store( - partition_id, - key_range, - OpenMode::OpenExisting, - &options.storage.rocksdb, - ) - .await? 
- }; + let partition_store = open_partition_store( + partition_id, + partition_store_manager, + snapshot_repository, + fast_forward_lsn, + &options, + key_range, + ) + .await?; TaskCenter::spawn_child( TaskKind::SystemService, @@ -256,3 +173,152 @@ impl SpawnPartitionProcessorTask { Ok((state, root_task_handle)) } } + +async fn open_partition_store( + partition_id: PartitionId, + partition_store_manager: PartitionStoreManager, + snapshot_repository: Option, + fast_forward_lsn: Option, + options: &WorkerOptions, + key_range: RangeInclusive, +) -> anyhow::Result { + if partition_store_manager + .has_partition_store(partition_id) + .await + && fast_forward_lsn.is_none() + { + return Ok(partition_store_manager + .open_partition_store( + partition_id, + key_range, + OpenMode::OpenExisting, + &options.storage.rocksdb, + ) + .await?); + } + + // We don't have a local partition store initialized - or we have a fast-forward LSN target set. + // Attempt to get the latest available snapshot from the snapshots repository: + let snapshot = match snapshot_repository { + Some(repository) => { + debug!("Looking for partition snapshot from which to bootstrap partition store"); + // todo(pavel): pass target LSN to repository + repository.get_latest(partition_id).await? + } + None => { + debug!("No snapshot repository configured"); + None + } + }; + + match (snapshot, fast_forward_lsn) { + (None, None) => { + info!("No snapshot found to bootstrap partition, creating new store"); + Ok(partition_store_manager + .open_partition_store( + partition_id, + key_range, + OpenMode::CreateIfMissing, + &options.storage.rocksdb, + ) + .await?) + } + (Some(snapshot), None) => { + info!(partition_id = %partition_id, "Found partition snapshot, restoring it"); + Ok(import_snapshot( + partition_id, + key_range, + snapshot, + partition_store_manager, + options, + ) + .await?) + } + (Some(snapshot), Some(fast_forward_lsn)) + if snapshot.min_applied_lsn >= fast_forward_lsn => + { + // We trust that the fast_forward_lsn is greater than the locally applied LSN. + info!( + latest_snapshot_lsn = ?snapshot.min_applied_lsn, + ?fast_forward_lsn, + "Found snapshot with LSN >= target LSN, dropping local partition store state", + ); + partition_store_manager.drop_partition(partition_id).await; + Ok(import_snapshot( + partition_id, + key_range, + snapshot, + partition_store_manager, + options, + ) + .await?) + } + (maybe_snapshot, Some(fast_forward_lsn)) => { + // Play it safe and keep the partition store intact; we can't do much else at this + // point. We'll likely halt again as soon as the processor starts up. + if let Some(snapshot) = maybe_snapshot { + warn!( + ?snapshot.min_applied_lsn, + ?fast_forward_lsn, + "Latest available snapshot is older than the the fast-forward target LSN!", + ); + } else { + warn!( + ?fast_forward_lsn, + "A fast-forward target LSN is set, but no snapshot available for partition!", + ); + } + // todo(pavel): backoff for a while and return Err + Ok(partition_store_manager + .open_partition_store( + partition_id, + key_range, + OpenMode::OpenExisting, + &options.storage.rocksdb, + ) + .await?) 
+ } + } +} + +async fn import_snapshot( + partition_id: PartitionId, + key_range: RangeInclusive, + snapshot: LocalPartitionSnapshot, + partition_store_manager: PartitionStoreManager, + options: &WorkerOptions, +) -> anyhow::Result { + let snapshot_path = snapshot.base_dir.clone(); + match partition_store_manager + .open_partition_store_from_snapshot( + partition_id, + key_range.clone(), + snapshot, + &options.storage.rocksdb, + ) + .await + { + Ok(partition_store) => { + let res = tokio::fs::remove_dir_all(&snapshot_path).await; + if let Err(e) = res { + // This is not critical; since we move the SST files into RocksDB on import, + // at worst only the snapshot metadata file will remain in the staging dir + warn!( + partition_id = %partition_id, + ?snapshot_path, + "Failed to remove local snapshot directory, continuing with startup: {:?}", + e, + ); + } + Ok(partition_store) + } + Err(e) => { + warn!( + partition_id = %partition_id, + ?snapshot_path, + "Failed to import snapshot, local snapshot data retained" + ); + Err(anyhow::anyhow!(e)) + } + } +} From ff82b72ecef7840bcc77f03253fd8d08e6fc3713 Mon Sep 17 00:00:00 2001 From: Pavel Tcholakov Date: Mon, 23 Dec 2024 16:28:50 +0200 Subject: [PATCH 04/11] Self-review --- crates/worker/src/partition/mod.rs | 6 ++---- crates/worker/src/partition_processor_manager.rs | 9 +++------ 2 files changed, 5 insertions(+), 10 deletions(-) diff --git a/crates/worker/src/partition/mod.rs b/crates/worker/src/partition/mod.rs index bb5f51d95..2006d653c 100644 --- a/crates/worker/src/partition/mod.rs +++ b/crates/worker/src/partition/mod.rs @@ -268,8 +268,7 @@ where Codec: RawEntryCodec + Default + Debug, InvokerSender: restate_invoker_api::InvokerHandle> + Clone, { - #[instrument(level = "error", skip_all, fields(partition_id = %self.partition_id, is_leader = tracing::field::Empty - ))] + #[instrument(level = "error", skip_all, fields(partition_id = %self.partition_id, is_leader = tracing::field::Empty))] pub async fn run(mut self) -> anyhow::Result { info!("Starting the partition processor."); @@ -281,7 +280,7 @@ where ProcessorStopReason::LogTrimGap { to_lsn } => info!( trim_gap_to_lsn = ?to_lsn, - "Shutting partition processor down because we encountered a trim gap in the log." + "Shutting partition processor down because it encountered a trim gap in the log." 
), _ => warn!("Shutting partition processor down because it stopped unexpectedly.") } @@ -314,7 +313,6 @@ where res } - // Runs as long as log records are available, or returns async fn run_inner(&mut self) -> anyhow::Result { let mut partition_store = self.partition_store.clone(); let last_applied_lsn = partition_store.get_applied_lsn().await?; diff --git a/crates/worker/src/partition_processor_manager.rs b/crates/worker/src/partition_processor_manager.rs index db29a9b59..d76f66375 100644 --- a/crates/worker/src/partition_processor_manager.rs +++ b/crates/worker/src/partition_processor_manager.rs @@ -348,8 +348,7 @@ impl PartitionProcessorManager { } } - #[instrument(level = "debug", skip_all, fields(partition_id = %event.partition_id, event = %<&'static str as From<&EventKind>>::from(&event.inner) - ))] + #[instrument(level = "debug", skip_all, fields(partition_id = %event.partition_id, event = %<&'static str as From<&EventKind>>::from(&event.inner)))] fn on_asynchronous_event(&mut self, event: AsynchronousEvent) { let AsynchronousEvent { partition_id, @@ -433,20 +432,18 @@ impl PartitionProcessorManager { Ok(ProcessorStopReason::LogTrimGap { to_lsn }) => { if self.snapshot_repository.is_some() { info!( - %partition_id, trim_gap_to_lsn = ?to_lsn, - "Partition processor stopped due to a log trim gap, will stop and attempt to fast-forward", + "Partition processor stopped due to a log trim gap, will attempt to fast-forward on restart", ); self.fast_forward_on_startup.insert(partition_id, to_lsn); } else { warn!( - %partition_id, "Partition processor stopped due to a log trim gap, and no snapshot repository is configured: {result:?}", ); } } _ => { - warn!(%partition_id, "Partition processor exited unexpectedly: {result:?}") + warn!("Partition processor exited unexpectedly: {result:?}") } } } From c42b1309d52a5756c48375375cb70525e5f93e4e Mon Sep 17 00:00:00 2001 From: Pavel Tcholakov Date: Mon, 23 Dec 2024 16:44:40 +0200 Subject: [PATCH 05/11] Stop reading commands from the log if a TrimGap is encountered --- crates/worker/src/partition/mod.rs | 19 ++++++++++++++++--- .../worker/src/partition_processor_manager.rs | 6 +++++- 2 files changed, 21 insertions(+), 4 deletions(-) diff --git a/crates/worker/src/partition/mod.rs b/crates/worker/src/partition/mod.rs index 2006d653c..bafd241db 100644 --- a/crates/worker/src/partition/mod.rs +++ b/crates/worker/src/partition/mod.rs @@ -263,6 +263,12 @@ enum Record { TrimGap(Lsn), } +impl Record { + fn is_trim_gap(&self) -> bool { + matches!(self, Record::TrimGap(_)) + } +} + impl PartitionProcessor where Codec: RawEntryCodec + Default + Debug, @@ -496,7 +502,6 @@ where } } Record::TrimGap(to_lsn) => { - // todo(pavel): this assumption may be violated if a command batch contains some records and a trim gap! assert!(trim_gap.is_none(), "Expecting only a single trim gap!"); trim_gap = Some(to_lsn) } @@ -857,7 +862,7 @@ where } /// Tries to read as many records from the `log_reader` as are immediately available and stops - /// reading at `max_batching_size`. + /// reading at `max_batching_size`, or at a TrimGap - whichever comes first. 
async fn read_commands( log_reader: &mut S, max_batching_size: usize, @@ -885,7 +890,15 @@ where anyhow::bail!("Read stream terminated for partition processor"); }; - record_buffer.push(record??); + let record = record??; + let trim_gap = record.is_trim_gap(); + + record_buffer.push(record); + + // Ensure that a trim gap is always the last record in a batch + if trim_gap { + break; + } } else { // no more immediately available records found break; diff --git a/crates/worker/src/partition_processor_manager.rs b/crates/worker/src/partition_processor_manager.rs index d76f66375..ad063cabc 100644 --- a/crates/worker/src/partition_processor_manager.rs +++ b/crates/worker/src/partition_processor_manager.rs @@ -348,7 +348,11 @@ impl PartitionProcessorManager { } } - #[instrument(level = "debug", skip_all, fields(partition_id = %event.partition_id, event = %<&'static str as From<&EventKind>>::from(&event.inner)))] + #[instrument( + level = "debug", + skip_all, + fields(partition_id = %event.partition_id, event = %<&'static str as From<&EventKind>>::from(&event.inner)) + )] fn on_asynchronous_event(&mut self, event: AsynchronousEvent) { let AsynchronousEvent { partition_id, From 1e3ed7e912cd0beef41f720dfc2e6a61780f7ffd Mon Sep 17 00:00:00 2001 From: Pavel Tcholakov Date: Mon, 23 Dec 2024 16:52:44 +0200 Subject: [PATCH 06/11] Delay startup when we have a fast-forward target we can't reach --- .../partition_processor_manager/spawn_processor_task.rs | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/crates/worker/src/partition_processor_manager/spawn_processor_task.rs b/crates/worker/src/partition_processor_manager/spawn_processor_task.rs index a9bd758aa..04dba6e08 100644 --- a/crates/worker/src/partition_processor_manager/spawn_processor_task.rs +++ b/crates/worker/src/partition_processor_manager/spawn_processor_task.rs @@ -9,6 +9,7 @@ // by the Apache License, Version 2.0. use std::ops::RangeInclusive; +use std::time::Duration; use tokio::sync::{mpsc, watch}; use tracing::{debug, info, instrument, warn}; @@ -268,7 +269,13 @@ async fn open_partition_store( "A fast-forward target LSN is set, but no snapshot available for partition!", ); } - // todo(pavel): backoff for a while and return Err + + // We expect the processor startup attempt will fail, avoid spinning too fast. + tokio::time::sleep(Duration::from_millis( + 10_000 + rand::random::() % 10_000, + )) + .await; + Ok(partition_store_manager .open_partition_store( partition_id, From 49c9004d5e145738d7f7a6557a56b6685e80d584 Mon Sep 17 00:00:00 2001 From: Pavel Tcholakov Date: Tue, 24 Dec 2024 11:57:06 +0200 Subject: [PATCH 07/11] PR feedback misc --- crates/worker/src/partition/mod.rs | 21 +++++++++---------- .../spawn_processor_task.rs | 8 +++---- 2 files changed, 13 insertions(+), 16 deletions(-) diff --git a/crates/worker/src/partition/mod.rs b/crates/worker/src/partition/mod.rs index bafd241db..a4e734e9c 100644 --- a/crates/worker/src/partition/mod.rs +++ b/crates/worker/src/partition/mod.rs @@ -274,23 +274,22 @@ where Codec: RawEntryCodec + Default + Debug, InvokerSender: restate_invoker_api::InvokerHandle> + Clone, { - #[instrument(level = "error", skip_all, fields(partition_id = %self.partition_id, is_leader = tracing::field::Empty))] + #[instrument( + level = "error", skip_all, + fields(partition_id = %self.partition_id, is_leader = tracing::field::Empty) + )] pub async fn run(mut self) -> anyhow::Result { info!("Starting the partition processor."); let res = tokio::select! 
{ res = self.run_inner() => { match res.as_ref() { - Ok(stopped) => { - match stopped { - ProcessorStopReason::LogTrimGap { to_lsn } => - info!( - trim_gap_to_lsn = ?to_lsn, - "Shutting partition processor down because it encountered a trim gap in the log." - ), - _ => warn!("Shutting partition processor down because it stopped unexpectedly.") - } - }, + Ok( ProcessorStopReason::LogTrimGap { to_lsn } ) => + info!( + trim_gap_to_lsn = ?to_lsn, + "Shutting partition processor down because it encountered a trim gap in the log." + ), + Ok(_) => warn!("Shutting partition processor down because it stopped unexpectedly."), Err(err) => warn!("Shutting partition processor down because it failed: {err}"), } res diff --git a/crates/worker/src/partition_processor_manager/spawn_processor_task.rs b/crates/worker/src/partition_processor_manager/spawn_processor_task.rs index 04dba6e08..0c702daaa 100644 --- a/crates/worker/src/partition_processor_manager/spawn_processor_task.rs +++ b/crates/worker/src/partition_processor_manager/spawn_processor_task.rs @@ -311,9 +311,8 @@ async fn import_snapshot( // This is not critical; since we move the SST files into RocksDB on import, // at worst only the snapshot metadata file will remain in the staging dir warn!( - partition_id = %partition_id, - ?snapshot_path, - "Failed to remove local snapshot directory, continuing with startup: {:?}", + snapshot_path = %snapshot_path.display(), + "Failed to remove local snapshot directory, continuing with startup: {}", e, ); } @@ -321,8 +320,7 @@ async fn import_snapshot( } Err(e) => { warn!( - partition_id = %partition_id, - ?snapshot_path, + snapshot_path = %snapshot_path.display(), "Failed to import snapshot, local snapshot data retained" ); Err(anyhow::anyhow!(e)) From 56116797ff1b6975e09902911566e6df35a47a58 Mon Sep 17 00:00:00 2001 From: Pavel Tcholakov Date: Tue, 24 Dec 2024 14:05:08 +0200 Subject: [PATCH 08/11] Model TrimGapEncountered as an error within the PP --- crates/worker/src/partition/mod.rs | 94 +++++++++---------- .../worker/src/partition_processor_manager.rs | 18 ++-- .../spawn_processor_task.rs | 10 +- 3 files changed, 61 insertions(+), 61 deletions(-) diff --git a/crates/worker/src/partition/mod.rs b/crates/worker/src/partition/mod.rs index a4e734e9c..5c05795b5 100644 --- a/crates/worker/src/partition/mod.rs +++ b/crates/worker/src/partition/mod.rs @@ -24,7 +24,7 @@ use tracing::{debug, error, info, instrument, trace, warn, Span}; use restate_bifrost::Bifrost; use restate_core::network::{HasConnection, Incoming, Outgoing}; -use restate_core::{cancellation_watcher, TaskCenter, TaskKind}; +use restate_core::{cancellation_watcher, ShutdownError, TaskCenter, TaskKind}; use restate_partition_store::{PartitionStore, PartitionStoreTransaction}; use restate_storage_api::deduplication_table::{ DedupInformation, DedupSequenceNumber, DeduplicationTable, ProducerId, @@ -252,10 +252,16 @@ pub struct PartitionProcessor { partition_store: PartitionStore, } -#[derive(Debug)] +#[derive(Debug, derive_more::Display, thiserror::Error)] pub enum ProcessorStopReason { - Cancelled, - LogTrimGap { to_lsn: Lsn }, + TrimGapEncountered { gap_to_lsn: Lsn }, + Storage(#[from] StorageError), + Bifrost(#[from] restate_bifrost::Error), + StateMachine(#[from] state_machine::Error), + ActionEffect(#[from] leadership::Error), + ShutdownError(#[from] ShutdownError), + LogReadStreamTerminated, + Other(#[from] anyhow::Error), } enum Record { @@ -263,12 +269,6 @@ enum Record { TrimGap(Lsn), } -impl Record { - fn is_trim_gap(&self) -> bool { - 
matches!(self, Record::TrimGap(_)) - } -} - impl PartitionProcessor where Codec: RawEntryCodec + Default + Debug, @@ -278,25 +278,25 @@ where level = "error", skip_all, fields(partition_id = %self.partition_id, is_leader = tracing::field::Empty) )] - pub async fn run(mut self) -> anyhow::Result { + pub async fn run(mut self) -> Result<(), ProcessorStopReason> { info!("Starting the partition processor."); let res = tokio::select! { res = self.run_inner() => { match res.as_ref() { - Ok( ProcessorStopReason::LogTrimGap { to_lsn } ) => + Ok(_) => warn!("Shutting partition processor down because it stopped unexpectedly."), + Err(ProcessorStopReason::TrimGapEncountered { gap_to_lsn }) => info!( - trim_gap_to_lsn = ?to_lsn, + trim_gap_to_lsn = ?gap_to_lsn, "Shutting partition processor down because it encountered a trim gap in the log." ), - Ok(_) => warn!("Shutting partition processor down because it stopped unexpectedly."), Err(err) => warn!("Shutting partition processor down because it failed: {err}"), } res }, _ = cancellation_watcher() => { debug!("Shutting partition processor down because it was cancelled."); - Ok(ProcessorStopReason::Cancelled) + Ok(()) }, }; @@ -318,9 +318,12 @@ where res } - async fn run_inner(&mut self) -> anyhow::Result { + async fn run_inner(&mut self) -> Result<(), ProcessorStopReason> { let mut partition_store = self.partition_store.clone(); - let last_applied_lsn = partition_store.get_applied_lsn().await?; + let last_applied_lsn = partition_store + .get_applied_lsn() + .await + .map_err(ProcessorStopReason::from)?; let last_applied_lsn = last_applied_lsn.unwrap_or(Lsn::INVALID); self.status.last_applied_log_lsn = Some(last_applied_lsn); @@ -329,7 +332,8 @@ where let current_tail = self .bifrost .find_tail(LogId::from(self.partition_id)) - .await?; + .await + .map_err(ProcessorStopReason::from)?; debug!( last_applied_lsn = %last_applied_lsn, @@ -376,7 +380,8 @@ where key_query.clone(), last_applied_lsn.next(), Lsn::MAX, - )? + ) + .map_err(ProcessorStopReason::from)? .map_ok(|entry| { trace!(?entry, "Read entry"); let lsn = entry.sequence_number(); @@ -386,6 +391,7 @@ where .map(|envelope| anyhow::Ok(Record::Envelope(lsn, envelope?))) .expect("data record is present") } else { + // read_commands will translate trim gaps to errors anyhow::Ok(Record::TrimGap( entry .trim_gap_to_sequence_number() @@ -450,12 +456,9 @@ where // clear buffers used when applying the next record action_collector.clear(); - let mut trim_gap = None; - for record in command_buffer.drain(..) { match record { Record::Envelope(lsn, envelope) => { - assert!(trim_gap.is_none(), "Expecting single trim gap to be the last record in a batch!"); let command_start = Instant::now(); trace!(%lsn, "Processing bifrost record for '{}': {:?}", envelope.command.name(), envelope.header); @@ -464,14 +467,14 @@ where lsn, envelope, &mut transaction, - &mut action_collector).await?; + &mut action_collector).await.map_err(ProcessorStopReason::from)?; apply_command_latency.record(command_start.elapsed()); if let Some((header, announce_leader)) = leadership_change { // commit all changes so far, this is important so that the actuators see all changes // when becoming leader. - transaction.commit().await?; + transaction.commit().await.map_err(ProcessorStopReason::from)?; // We can ignore all actions collected so far because as a new leader we have to instruct the // actuators afresh. 
@@ -489,7 +492,7 @@ where self.status.last_observed_leader_node = announce_leader.node_id; } - let is_leader = self.leadership_state.on_announce_leader(announce_leader, &mut partition_store).await?; + let is_leader = self.leadership_state.on_announce_leader(announce_leader, &mut partition_store).await.map_err(ProcessorStopReason::from)?; Span::current().record("is_leader", is_leader); @@ -500,29 +503,24 @@ where transaction = partition_store.transaction(); } } - Record::TrimGap(to_lsn) => { - assert!(trim_gap.is_none(), "Expecting only a single trim gap!"); - trim_gap = Some(to_lsn) + Record::TrimGap(gap_to_lsn) => { + return Err(ProcessorStopReason::TrimGapEncountered { gap_to_lsn }) } } } // Commit our changes and notify actuators about actions if we are the leader - transaction.commit().await?; + transaction.commit().await.map_err(ProcessorStopReason::from)?; let actions_start = Instant::now(); - self.leadership_state.handle_actions(action_collector.drain(..)).await?; + self.leadership_state.handle_actions(action_collector.drain(..)).await.map_err(ProcessorStopReason::from)?; record_actions_latency.record(actions_start.elapsed()); - - if let Some(to_lsn) = trim_gap { - return Ok(ProcessorStopReason::LogTrimGap { to_lsn }) - } }, result = self.leadership_state.run() => { - let action_effects = result?; + let action_effects = result.map_err(ProcessorStopReason::from)?; // We process the action_effects not directly in the run future because it // requires the run future to be cancellation safe. In the future this could be // implemented. - self.leadership_state.handle_action_effects(action_effects).await?; + self.leadership_state.handle_action_effects(action_effects).await.map_err(ProcessorStopReason::from)?; } } // Allow other tasks on this thread to run, but only if we have exhausted the coop @@ -861,12 +859,12 @@ where } /// Tries to read as many records from the `log_reader` as are immediately available and stops - /// reading at `max_batching_size`, or at a TrimGap - whichever comes first. + /// reading at `max_batching_size`. Trim gaps will result in an immediate error. async fn read_commands( log_reader: &mut S, max_batching_size: usize, record_buffer: &mut Vec, - ) -> anyhow::Result<()> + ) -> Result<(), ProcessorStopReason> where S: Stream, restate_bifrost::Error>> + Unpin, { @@ -874,8 +872,7 @@ where let first_record = log_reader.next().await; let Some(first_record) = first_record else { - // read stream terminated! - anyhow::bail!("Read stream terminated for partition processor"); + return Err(ProcessorStopReason::LogReadStreamTerminated); }; record_buffer.clear(); @@ -885,18 +882,13 @@ where // read more message from the stream but only if they are immediately available if let Some(record) = log_reader.next().now_or_never() { let Some(record) = record else { - // read stream terminated! - anyhow::bail!("Read stream terminated for partition processor"); + return Err(ProcessorStopReason::LogReadStreamTerminated); }; - - let record = record??; - let trim_gap = record.is_trim_gap(); - - record_buffer.push(record); - - // Ensure that a trim gap is always the last record in a batch - if trim_gap { - break; + match record?? 
{ + Record::TrimGap(gap_to_lsn) => { + return Err(ProcessorStopReason::TrimGapEncountered { gap_to_lsn }); + } + record => record_buffer.push(record), } } else { // no more immediately available records found diff --git a/crates/worker/src/partition_processor_manager.rs b/crates/worker/src/partition_processor_manager.rs index ad063cabc..4cc90f574 100644 --- a/crates/worker/src/partition_processor_manager.rs +++ b/crates/worker/src/partition_processor_manager.rs @@ -433,7 +433,9 @@ impl PartitionProcessorManager { .remove(processor.as_ref().expect("must be some").key_range()); match result { - Ok(ProcessorStopReason::LogTrimGap { to_lsn }) => { + Err(ProcessorStopReason::TrimGapEncountered { + gap_to_lsn: to_lsn, + }) => { if self.snapshot_repository.is_some() { info!( trim_gap_to_lsn = ?to_lsn, @@ -442,13 +444,17 @@ impl PartitionProcessorManager { self.fast_forward_on_startup.insert(partition_id, to_lsn); } else { warn!( - "Partition processor stopped due to a log trim gap, and no snapshot repository is configured: {result:?}", + trim_gap_to_lsn = ?to_lsn, + "Partition processor stopped due to a log trim gap, and no snapshot repository is configured", ); } } - _ => { + Err(_) => { warn!("Partition processor exited unexpectedly: {result:?}") } + Ok(_) => { + info!("Partition processor stopped.") + } } } ProcessorState::Stopping { @@ -511,7 +517,7 @@ impl PartitionProcessorManager { fn await_runtime_task_result( &mut self, partition_id: PartitionId, - runtime_task_handle: RuntimeTaskHandle>, + runtime_task_handle: RuntimeTaskHandle>, ) { self.asynchronous_operations.spawn( async move { @@ -946,10 +952,10 @@ enum EventKind { Started( anyhow::Result<( StartedProcessor, - RuntimeTaskHandle>, + RuntimeTaskHandle>, )>, ), - Stopped(anyhow::Result), + Stopped(Result<(), ProcessorStopReason>), NewLeaderEpoch { leader_epoch_token: LeaderEpochToken, result: anyhow::Result, diff --git a/crates/worker/src/partition_processor_manager/spawn_processor_task.rs b/crates/worker/src/partition_processor_manager/spawn_processor_task.rs index 0c702daaa..77d4be9c1 100644 --- a/crates/worker/src/partition_processor_manager/spawn_processor_task.rs +++ b/crates/worker/src/partition_processor_manager/spawn_processor_task.rs @@ -79,7 +79,7 @@ impl SpawnPartitionProcessorTask { self, ) -> anyhow::Result<( StartedProcessor, - RuntimeTaskHandle>, + RuntimeTaskHandle>, )> { let Self { task_name, @@ -151,11 +151,13 @@ impl SpawnPartitionProcessorTask { TaskKind::SystemService, invoker_name, invoker.run(invoker_config), - )?; + ) + .map_err(|e| ProcessorStopReason::from(anyhow::anyhow!(e)))?; pp_builder .build::(bifrost, partition_store) - .await? + .await + .map_err(ProcessorStopReason::from)? .run() .await } @@ -199,7 +201,7 @@ async fn open_partition_store( } // We don't have a local partition store initialized - or we have a fast-forward LSN target set. 
- // Attempt to get the latest available snapshot from the snapshots repository: + // Attempt to get the latest available snapshot from the snapshot repository: let snapshot = match snapshot_repository { Some(repository) => { debug!("Looking for partition snapshot from which to bootstrap partition store"); From d75db99129f63594e8f61dbbc153e8e9e191e43e Mon Sep 17 00:00:00 2001 From: Pavel Tcholakov Date: Tue, 24 Dec 2024 14:40:44 +0200 Subject: [PATCH 09/11] add RetryPolicy todo --- .../src/partition_processor_manager/spawn_processor_task.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/crates/worker/src/partition_processor_manager/spawn_processor_task.rs b/crates/worker/src/partition_processor_manager/spawn_processor_task.rs index 77d4be9c1..8f0d42422 100644 --- a/crates/worker/src/partition_processor_manager/spawn_processor_task.rs +++ b/crates/worker/src/partition_processor_manager/spawn_processor_task.rs @@ -273,6 +273,7 @@ async fn open_partition_store( } // We expect the processor startup attempt will fail, avoid spinning too fast. + // todo(pavel): replace this with RetryPolicy tokio::time::sleep(Duration::from_millis( 10_000 + rand::random::() % 10_000, )) From 84bab4f31c8b0dd83a404f86ac0cfa3ccb93c24f Mon Sep 17 00:00:00 2001 From: Pavel Tcholakov Date: Fri, 27 Dec 2024 10:44:48 +0200 Subject: [PATCH 10/11] Rename StopReason to Error --- crates/worker/src/partition/mod.rs | 41 ++++++++++--------- .../worker/src/partition_processor_manager.rs | 10 ++--- .../spawn_processor_task.rs | 8 ++-- 3 files changed, 30 insertions(+), 29 deletions(-) diff --git a/crates/worker/src/partition/mod.rs b/crates/worker/src/partition/mod.rs index 5c05795b5..4d78c6b25 100644 --- a/crates/worker/src/partition/mod.rs +++ b/crates/worker/src/partition/mod.rs @@ -253,7 +253,7 @@ pub struct PartitionProcessor { } #[derive(Debug, derive_more::Display, thiserror::Error)] -pub enum ProcessorStopReason { +pub enum ProcessorError { TrimGapEncountered { gap_to_lsn: Lsn }, Storage(#[from] StorageError), Bifrost(#[from] restate_bifrost::Error), @@ -278,19 +278,20 @@ where level = "error", skip_all, fields(partition_id = %self.partition_id, is_leader = tracing::field::Empty) )] - pub async fn run(mut self) -> Result<(), ProcessorStopReason> { + pub async fn run(mut self) -> Result<(), ProcessorError> { info!("Starting the partition processor."); let res = tokio::select! { res = self.run_inner() => { match res.as_ref() { + // run_inner never returns normally Ok(_) => warn!("Shutting partition processor down because it stopped unexpectedly."), - Err(ProcessorStopReason::TrimGapEncountered { gap_to_lsn }) => + Err(ProcessorError::TrimGapEncountered { gap_to_lsn }) => info!( trim_gap_to_lsn = ?gap_to_lsn, "Shutting partition processor down because it encountered a trim gap in the log." 
), - Err(err) => warn!("Shutting partition processor down because it failed: {err}"), + Err(err) => warn!("Shutting partition processor down because of error: {err}"), } res }, @@ -318,12 +319,12 @@ where res } - async fn run_inner(&mut self) -> Result<(), ProcessorStopReason> { + async fn run_inner(&mut self) -> Result<(), ProcessorError> { let mut partition_store = self.partition_store.clone(); let last_applied_lsn = partition_store .get_applied_lsn() .await - .map_err(ProcessorStopReason::from)?; + .map_err(ProcessorError::from)?; let last_applied_lsn = last_applied_lsn.unwrap_or(Lsn::INVALID); self.status.last_applied_log_lsn = Some(last_applied_lsn); @@ -333,7 +334,7 @@ where .bifrost .find_tail(LogId::from(self.partition_id)) .await - .map_err(ProcessorStopReason::from)?; + .map_err(ProcessorError::from)?; debug!( last_applied_lsn = %last_applied_lsn, @@ -381,7 +382,7 @@ where last_applied_lsn.next(), Lsn::MAX, ) - .map_err(ProcessorStopReason::from)? + .map_err(ProcessorError::from)? .map_ok(|entry| { trace!(?entry, "Read entry"); let lsn = entry.sequence_number(); @@ -467,14 +468,14 @@ where lsn, envelope, &mut transaction, - &mut action_collector).await.map_err(ProcessorStopReason::from)?; + &mut action_collector).await.map_err(ProcessorError::from)?; apply_command_latency.record(command_start.elapsed()); if let Some((header, announce_leader)) = leadership_change { // commit all changes so far, this is important so that the actuators see all changes // when becoming leader. - transaction.commit().await.map_err(ProcessorStopReason::from)?; + transaction.commit().await.map_err(ProcessorError::from)?; // We can ignore all actions collected so far because as a new leader we have to instruct the // actuators afresh. @@ -492,7 +493,7 @@ where self.status.last_observed_leader_node = announce_leader.node_id; } - let is_leader = self.leadership_state.on_announce_leader(announce_leader, &mut partition_store).await.map_err(ProcessorStopReason::from)?; + let is_leader = self.leadership_state.on_announce_leader(announce_leader, &mut partition_store).await.map_err(ProcessorError::from)?; Span::current().record("is_leader", is_leader); @@ -504,23 +505,23 @@ where } } Record::TrimGap(gap_to_lsn) => { - return Err(ProcessorStopReason::TrimGapEncountered { gap_to_lsn }) + return Err(ProcessorError::TrimGapEncountered { gap_to_lsn }) } } } // Commit our changes and notify actuators about actions if we are the leader - transaction.commit().await.map_err(ProcessorStopReason::from)?; + transaction.commit().await.map_err(ProcessorError::from)?; let actions_start = Instant::now(); - self.leadership_state.handle_actions(action_collector.drain(..)).await.map_err(ProcessorStopReason::from)?; + self.leadership_state.handle_actions(action_collector.drain(..)).await.map_err(ProcessorError::from)?; record_actions_latency.record(actions_start.elapsed()); }, result = self.leadership_state.run() => { - let action_effects = result.map_err(ProcessorStopReason::from)?; + let action_effects = result.map_err(ProcessorError::from)?; // We process the action_effects not directly in the run future because it // requires the run future to be cancellation safe. In the future this could be // implemented. 
- self.leadership_state.handle_action_effects(action_effects).await.map_err(ProcessorStopReason::from)?; + self.leadership_state.handle_action_effects(action_effects).await.map_err(ProcessorError::from)?; } } // Allow other tasks on this thread to run, but only if we have exhausted the coop @@ -864,7 +865,7 @@ where log_reader: &mut S, max_batching_size: usize, record_buffer: &mut Vec, - ) -> Result<(), ProcessorStopReason> + ) -> Result<(), ProcessorError> where S: Stream, restate_bifrost::Error>> + Unpin, { @@ -872,7 +873,7 @@ where let first_record = log_reader.next().await; let Some(first_record) = first_record else { - return Err(ProcessorStopReason::LogReadStreamTerminated); + return Err(ProcessorError::LogReadStreamTerminated); }; record_buffer.clear(); @@ -882,11 +883,11 @@ where // read more message from the stream but only if they are immediately available if let Some(record) = log_reader.next().now_or_never() { let Some(record) = record else { - return Err(ProcessorStopReason::LogReadStreamTerminated); + return Err(ProcessorError::LogReadStreamTerminated); }; match record?? { Record::TrimGap(gap_to_lsn) => { - return Err(ProcessorStopReason::TrimGapEncountered { gap_to_lsn }); + return Err(ProcessorError::TrimGapEncountered { gap_to_lsn }); } record => record_buffer.push(record), } diff --git a/crates/worker/src/partition_processor_manager.rs b/crates/worker/src/partition_processor_manager.rs index 4cc90f574..bd4177a73 100644 --- a/crates/worker/src/partition_processor_manager.rs +++ b/crates/worker/src/partition_processor_manager.rs @@ -73,7 +73,7 @@ use crate::metric_definitions::PARTITION_LAST_PERSISTED_LOG_LSN; use crate::metric_definitions::PARTITION_TIME_SINCE_LAST_RECORD; use crate::metric_definitions::PARTITION_TIME_SINCE_LAST_STATUS_UPDATE; use crate::partition::snapshots::{SnapshotPartitionTask, SnapshotRepository}; -use crate::partition::ProcessorStopReason; +use crate::partition::ProcessorError; use crate::partition_processor_manager::message_handler::PartitionProcessorManagerMessageHandler; use crate::partition_processor_manager::persisted_lsn_watchdog::PersistedLogLsnWatchdog; use crate::partition_processor_manager::processor_state::{ @@ -433,7 +433,7 @@ impl PartitionProcessorManager { .remove(processor.as_ref().expect("must be some").key_range()); match result { - Err(ProcessorStopReason::TrimGapEncountered { + Err(ProcessorError::TrimGapEncountered { gap_to_lsn: to_lsn, }) => { if self.snapshot_repository.is_some() { @@ -517,7 +517,7 @@ impl PartitionProcessorManager { fn await_runtime_task_result( &mut self, partition_id: PartitionId, - runtime_task_handle: RuntimeTaskHandle>, + runtime_task_handle: RuntimeTaskHandle>, ) { self.asynchronous_operations.spawn( async move { @@ -952,10 +952,10 @@ enum EventKind { Started( anyhow::Result<( StartedProcessor, - RuntimeTaskHandle>, + RuntimeTaskHandle>, )>, ), - Stopped(Result<(), ProcessorStopReason>), + Stopped(Result<(), ProcessorError>), NewLeaderEpoch { leader_epoch_token: LeaderEpochToken, result: anyhow::Result, diff --git a/crates/worker/src/partition_processor_manager/spawn_processor_task.rs b/crates/worker/src/partition_processor_manager/spawn_processor_task.rs index 8f0d42422..1ff9cf44c 100644 --- a/crates/worker/src/partition_processor_manager/spawn_processor_task.rs +++ b/crates/worker/src/partition_processor_manager/spawn_processor_task.rs @@ -30,7 +30,7 @@ use restate_types::schema::Schema; use crate::invoker_integration::EntryEnricher; use crate::partition::invoker_storage_reader::InvokerStorageReader; 
use crate::partition::snapshots::SnapshotRepository; -use crate::partition::ProcessorStopReason; +use crate::partition::ProcessorError; use crate::partition_processor_manager::processor_state::StartedProcessor; use crate::PartitionProcessorBuilder; @@ -79,7 +79,7 @@ impl SpawnPartitionProcessorTask { self, ) -> anyhow::Result<( StartedProcessor, - RuntimeTaskHandle>, + RuntimeTaskHandle>, )> { let Self { task_name, @@ -152,12 +152,12 @@ impl SpawnPartitionProcessorTask { invoker_name, invoker.run(invoker_config), ) - .map_err(|e| ProcessorStopReason::from(anyhow::anyhow!(e)))?; + .map_err(|e| ProcessorError::from(anyhow::anyhow!(e)))?; pp_builder .build::(bifrost, partition_store) .await - .map_err(ProcessorStopReason::from)? + .map_err(ProcessorError::from)? .run() .await } From 2539aa3a912456a8c07b97ff2bdd1704d65fbb4d Mon Sep 17 00:00:00 2001 From: Pavel Tcholakov Date: Fri, 27 Dec 2024 11:17:01 +0200 Subject: [PATCH 11/11] Remove Record enum in favor of simple tuple --- crates/worker/src/partition/mod.rs | 146 ++++++++---------- .../worker/src/partition_processor_manager.rs | 4 +- 2 files changed, 67 insertions(+), 83 deletions(-) diff --git a/crates/worker/src/partition/mod.rs b/crates/worker/src/partition/mod.rs index 4d78c6b25..5046b1dd9 100644 --- a/crates/worker/src/partition/mod.rs +++ b/crates/worker/src/partition/mod.rs @@ -60,6 +60,7 @@ use restate_types::net::partition_processor::{ InvocationOutput, PartitionProcessorRpcError, PartitionProcessorRpcRequest, PartitionProcessorRpcRequestInner, PartitionProcessorRpcResponse, }; +use restate_types::storage::StorageDecodeError; use restate_types::time::MillisSinceEpoch; use restate_wal_protocol::control::AnnounceLeader; use restate_wal_protocol::{Command, Destination, Envelope, Header, Source}; @@ -256,6 +257,7 @@ pub struct PartitionProcessor { pub enum ProcessorError { TrimGapEncountered { gap_to_lsn: Lsn }, Storage(#[from] StorageError), + Decode(#[from] StorageDecodeError), Bifrost(#[from] restate_bifrost::Error), StateMachine(#[from] state_machine::Error), ActionEffect(#[from] leadership::Error), @@ -264,10 +266,7 @@ pub enum ProcessorError { Other(#[from] anyhow::Error), } -enum Record { - Envelope(Lsn, Arc), - TrimGap(Lsn), -} +type LsnEnvelope = (Lsn, Arc); impl PartitionProcessor where @@ -374,7 +373,7 @@ where // Start reading after the last applied lsn let key_query = KeyFilter::Within(self.partition_key_range.clone()); - let mut log_reader = self + let mut record_stream = self .bifrost .create_reader( LogId::from(self.partition_id), @@ -383,33 +382,32 @@ where Lsn::MAX, ) .map_err(ProcessorError::from)? 
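The commit above replaces the internal Record enum with a plain `(Lsn, Arc<Envelope>)` tuple: a trim gap is no longer a record variant but is folded into the stream's error type as `ProcessorError::TrimGapEncountered`, so downstream consumers only ever see successfully decoded envelopes and need a single `?`. A minimal sketch of that folding pattern, using hypothetical stand-in types rather than the real bifrost/worker types, might look like this:

// Simplified stand-ins; none of these names exist in the Restate codebase.
#[derive(Debug)]
enum DemoEntry {
    Data { lsn: u64, payload: String },
    TrimGap { to_lsn: u64 },
}

#[derive(Debug)]
enum DemoError {
    TrimGapEncountered { gap_to_lsn: u64 },
    ReadFailed(String),
}

// Fold both "the read failed" and "we hit a trim gap" into the item's error type,
// so the consumer only handles Result<(lsn, payload), DemoError>.
fn map_entry(entry: Result<DemoEntry, String>) -> Result<(u64, String), DemoError> {
    match entry {
        Ok(DemoEntry::Data { lsn, payload }) => Ok((lsn, payload)),
        Ok(DemoEntry::TrimGap { to_lsn }) => Err(DemoError::TrimGapEncountered { gap_to_lsn: to_lsn }),
        Err(e) => Err(DemoError::ReadFailed(e)),
    }
}

fn main() {
    // A data record flows through; a trim gap surfaces as an error the caller can match on.
    assert!(map_entry(Ok(DemoEntry::Data { lsn: 1, payload: "cmd".into() })).is_ok());
    match map_entry(Ok(DemoEntry::TrimGap { to_lsn: 42 })) {
        Err(DemoError::TrimGapEncountered { gap_to_lsn }) => assert_eq!(gap_to_lsn, 42),
        other => panic!("unexpected: {other:?}"),
    }
}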
- .map_ok(|entry| { - trace!(?entry, "Read entry"); - let lsn = entry.sequence_number(); - if entry.is_data_record() { - entry - .try_decode_arc::() - .map(|envelope| anyhow::Ok(Record::Envelope(lsn, envelope?))) - .expect("data record is present") - } else { - // read_commands will translate trim gaps to errors - anyhow::Ok(Record::TrimGap( + .map(|entry| match entry { + Ok(entry) => { + trace!(?entry, "Read entry"); + let lsn = entry.sequence_number(); + if entry.is_data_record() { entry - .trim_gap_to_sequence_number() - .expect("trim gap has to-LSN"), - )) + .try_decode_arc::() + .map(|envelope| Ok((lsn, envelope.map_err(ProcessorError::from)?))) + .expect("data record is present") + } else { + Err(ProcessorError::TrimGapEncountered { + gap_to_lsn: entry + .trim_gap_to_sequence_number() + .expect("trim gap has to-LSN"), + }) + } } + Err(err) => Err(ProcessorError::from(err)), }) - .try_take_while(|entry| { + .try_take_while(|(_, envelope)| { // a catch-all safety net if all lower layers didn't filter this record out. This // could happen for old records that didn't store `Keys` in the log store. // // At some point, we should remove this and trust that stored records have Keys // stored correctly. - std::future::ready(Ok(entry.as_ref().is_ok_and(|r| match r { - Record::Envelope(_, e) => e.matches_key_query(&key_query), - Record::TrimGap(_) => true, - }))) + std::future::ready(Ok(envelope.matches_key_query(&key_query))) }); // avoid synchronized timers. We pick a randomised timer between 500 and 1023 millis. @@ -446,7 +444,7 @@ where old.updated_at = MillisSinceEpoch::now(); }); } - operation = Self::read_commands(&mut log_reader, self.max_command_batch_size, &mut command_buffer) => { + operation = Self::read_commands(&mut record_stream, self.max_command_batch_size, &mut command_buffer) => { // check that reading has succeeded operation?; @@ -457,56 +455,49 @@ where // clear buffers used when applying the next record action_collector.clear(); - for record in command_buffer.drain(..) { - match record { - Record::Envelope(lsn, envelope) => { - let command_start = Instant::now(); - - trace!(%lsn, "Processing bifrost record for '{}': {:?}", envelope.command.name(), envelope.header); - - let leadership_change = self.apply_record( - lsn, - envelope, - &mut transaction, - &mut action_collector).await.map_err(ProcessorError::from)?; - - apply_command_latency.record(command_start.elapsed()); - - if let Some((header, announce_leader)) = leadership_change { - // commit all changes so far, this is important so that the actuators see all changes - // when becoming leader. - transaction.commit().await.map_err(ProcessorError::from)?; - - // We can ignore all actions collected so far because as a new leader we have to instruct the - // actuators afresh. - action_collector.clear(); - - self.status.last_observed_leader_epoch = Some(announce_leader.leader_epoch); - if header.source.is_processor_generational() { - let Source::Processor { generational_node_id, .. 
} = header.source else { - unreachable!("processor source must have generational_node_id"); - }; - // all new AnnounceLeader messages should come from a PartitionProcessor - self.status.last_observed_leader_node = generational_node_id; - } else if announce_leader.node_id.is_some() { - // older AnnounceLeader messages have the announce_leader.node_id set - self.status.last_observed_leader_node = announce_leader.node_id; - } - - let is_leader = self.leadership_state.on_announce_leader(announce_leader, &mut partition_store).await.map_err(ProcessorError::from)?; + for (lsn, envelope) in command_buffer.drain(..) { + let command_start = Instant::now(); + + trace!(%lsn, "Processing bifrost record for '{}': {:?}", envelope.command.name(), envelope.header); + + let leadership_change = self.apply_record( + lsn, + envelope, + &mut transaction, + &mut action_collector).await.map_err(ProcessorError::from)?; + + apply_command_latency.record(command_start.elapsed()); + + if let Some((header, announce_leader)) = leadership_change { + // commit all changes so far, this is important so that the actuators see all changes + // when becoming leader. + transaction.commit().await.map_err(ProcessorError::from)?; + + // We can ignore all actions collected so far because as a new leader we have to instruct the + // actuators afresh. + action_collector.clear(); + + self.status.last_observed_leader_epoch = Some(announce_leader.leader_epoch); + if header.source.is_processor_generational() { + let Source::Processor { generational_node_id, .. } = header.source else { + unreachable!("processor source must have generational_node_id"); + }; + // all new AnnounceLeader messages should come from a PartitionProcessor + self.status.last_observed_leader_node = generational_node_id; + } else if announce_leader.node_id.is_some() { + // older AnnounceLeader messages have the announce_leader.node_id set + self.status.last_observed_leader_node = announce_leader.node_id; + } - Span::current().record("is_leader", is_leader); + let is_leader = self.leadership_state.on_announce_leader(announce_leader, &mut partition_store).await.map_err(ProcessorError::from)?; - if is_leader { - self.status.effective_mode = RunMode::Leader; - } + Span::current().record("is_leader", is_leader); - transaction = partition_store.transaction(); - } - } - Record::TrimGap(gap_to_lsn) => { - return Err(ProcessorError::TrimGapEncountered { gap_to_lsn }) + if is_leader { + self.status.effective_mode = RunMode::Leader; } + + transaction = partition_store.transaction(); } } @@ -864,10 +855,10 @@ where async fn read_commands( log_reader: &mut S, max_batching_size: usize, - record_buffer: &mut Vec, + record_buffer: &mut Vec, ) -> Result<(), ProcessorError> where - S: Stream, restate_bifrost::Error>> + Unpin, + S: Stream> + Unpin, { // beyond this point we must not await; otherwise we are no longer cancellation safe let first_record = log_reader.next().await; @@ -877,7 +868,7 @@ where }; record_buffer.clear(); - record_buffer.push(first_record??); + record_buffer.push(first_record?); while record_buffer.len() < max_batching_size { // read more message from the stream but only if they are immediately available @@ -885,12 +876,7 @@ where let Some(record) = record else { return Err(ProcessorError::LogReadStreamTerminated); }; - match record?? 
{ - Record::TrimGap(gap_to_lsn) => { - return Err(ProcessorError::TrimGapEncountered { gap_to_lsn }); - } - record => record_buffer.push(record), - } + record_buffer.push(record?); } else { // no more immediately available records found break; diff --git a/crates/worker/src/partition_processor_manager.rs b/crates/worker/src/partition_processor_manager.rs index bd4177a73..f81e055e6 100644 --- a/crates/worker/src/partition_processor_manager.rs +++ b/crates/worker/src/partition_processor_manager.rs @@ -433,9 +433,7 @@ impl PartitionProcessorManager { .remove(processor.as_ref().expect("must be some").key_range()); match result { - Err(ProcessorError::TrimGapEncountered { - gap_to_lsn: to_lsn, - }) => { + Err(ProcessorError::TrimGapEncountered { gap_to_lsn: to_lsn }) => { if self.snapshot_repository.is_some() { info!( trim_gap_to_lsn = ?to_lsn,
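For context on the batching loop in `read_commands` above: it awaits the first record, then uses `now_or_never()` to take only records that are already available, which keeps the function cancellation-safe after the initial await. A simplified sketch of that pattern, assuming the futures crate and hypothetical names (`read_batch`), could look like:

use futures::executor::block_on;
use futures::stream::{self, StreamExt};
use futures::{FutureExt, Stream};

async fn read_batch<S>(stream: &mut S, max: usize) -> Vec<u64>
where
    S: Stream<Item = u64> + Unpin,
{
    let mut buf = Vec::new();
    // Await the first item normally; this is the only real suspension point.
    if let Some(first) = stream.next().await {
        buf.push(first);
    }
    // Opportunistically take items that are already buffered, without awaiting,
    // so the batch never blocks once it has started filling.
    while buf.len() < max {
        match stream.next().now_or_never() {
            Some(Some(item)) => buf.push(item),
            _ => break, // nothing ready right now, or the stream ended
        }
    }
    buf
}

fn main() {
    let mut s = stream::iter(vec![1u64, 2, 3, 4, 5]);
    let batch = block_on(read_batch(&mut s, 3));
    assert_eq!(batch, vec![1, 2, 3]);
}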