diff --git a/crates/core/src/task_center.rs b/crates/core/src/task_center.rs
index 5f2969cb0..146c8f9d0 100644
--- a/crates/core/src/task_center.rs
+++ b/crates/core/src/task_center.rs
@@ -520,15 +520,16 @@ impl TaskCenterInner {
 
     /// Starts the `root_future` on a new runtime. The runtime is stopped once the root future
     /// completes.
-    pub fn start_runtime<F>(
+    pub fn start_runtime<F, R>(
         self: &Arc<Self>,
         root_task_kind: TaskKind,
         runtime_name: &'static str,
         partition_id: Option<PartitionId>,
         root_future: impl FnOnce() -> F + Send + 'static,
-    ) -> Result<RuntimeTaskHandle<anyhow::Result<()>>, RuntimeError>
+    ) -> Result<RuntimeTaskHandle<R>, RuntimeError>
     where
-        F: Future<Output = anyhow::Result<()>> + 'static,
+        F: Future<Output = R> + 'static,
+        R: Send + 'static,
     {
         if self.shutdown_requested.load(Ordering::Relaxed) {
             return Err(ShutdownError.into());
diff --git a/crates/core/src/task_center/handle.rs b/crates/core/src/task_center/handle.rs
index e87c7da27..a2da66187 100644
--- a/crates/core/src/task_center/handle.rs
+++ b/crates/core/src/task_center/handle.rs
@@ -8,6 +8,7 @@
 // the Business Source License, use of this software will be governed
 // by the Apache License, Version 2.0.
 
+use std::fmt::Debug;
 use std::future::Future;
 use std::sync::atomic::Ordering;
 use std::sync::Arc;
@@ -77,15 +78,16 @@ impl Handle {
         self.inner.block_on(future)
     }
 
-    pub fn start_runtime<F>(
+    pub fn start_runtime<F, R>(
         &self,
         root_task_kind: TaskKind,
         runtime_name: &'static str,
         partition_id: Option<PartitionId>,
         root_future: impl FnOnce() -> F + Send + 'static,
-    ) -> Result<RuntimeTaskHandle<anyhow::Result<()>>, RuntimeError>
+    ) -> Result<RuntimeTaskHandle<R>, RuntimeError>
     where
-        F: Future<Output = anyhow::Result<()>> + 'static,
+        F: Future<Output = R> + 'static,
+        R: Send + Debug + 'static,
     {
         self.inner
            .start_runtime(root_task_kind, runtime_name, partition_id, root_future)
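The core change above drops the hard-coded `anyhow::Result<()>` output and makes `start_runtime` generic over the root future's result `R`, which is what later lets the partition processor runtime report a typed stop reason. Below is a minimal standalone sketch of that pattern, assuming only tokio; `SketchRuntimeHandle` and the thread-per-runtime setup are illustrative stand-ins, not the actual restate_core implementation.

```rust
use std::future::Future;
use std::thread;

// Illustrative stand-in for a runtime task handle carrying a result of type R;
// here it simply joins the backing thread.
struct SketchRuntimeHandle<R>(thread::JoinHandle<R>);

impl<R> SketchRuntimeHandle<R> {
    fn join(self) -> R {
        self.0.join().expect("runtime thread panicked")
    }
}

// Same shape as the new signature: the root future may resolve to any
// `R: Send + 'static` instead of being pinned to `anyhow::Result<()>`.
fn start_runtime<F, R>(
    runtime_name: &'static str,
    root_future: impl FnOnce() -> F + Send + 'static,
) -> std::io::Result<SketchRuntimeHandle<R>>
where
    F: Future<Output = R> + 'static,
    R: Send + 'static,
{
    let handle = thread::Builder::new()
        .name(runtime_name.to_owned())
        .spawn(move || {
            // The future only needs to be 'static (not Send) because it is
            // created and driven entirely on this dedicated thread.
            let rt = tokio::runtime::Builder::new_current_thread()
                .enable_all()
                .build()
                .expect("failed to build runtime");
            rt.block_on(root_future())
        })?;
    Ok(SketchRuntimeHandle(handle))
}

fn main() {
    // With the old signature this closure would have had to return anyhow::Result<()>;
    // now it can return a domain-specific value such as a stop reason.
    let handle = start_runtime("sketch-runtime", || async { "log-trim-gap" }).unwrap();
    println!("runtime finished with: {}", handle.join());
}
```

The extra `Debug` bound on the `Handle` wrapper presumably exists so the returned value can be logged when the manager observes the finished runtime task.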
diff --git a/crates/worker/src/partition/mod.rs b/crates/worker/src/partition/mod.rs
index 571812edd..44be014a2 100644
--- a/crates/worker/src/partition/mod.rs
+++ b/crates/worker/src/partition/mod.rs
@@ -252,26 +252,44 @@ pub struct PartitionProcessor<Codec, InvokerSender> {
     partition_store: PartitionStore,
 }
 
+#[derive(Debug)]
+pub enum ProcessorStopReason {
+    Cancelled,
+    LogTrimGap { to_lsn: Lsn },
+}
+
+enum Record {
+    Envelope(Lsn, Arc<Envelope>),
+    TrimGap(Lsn),
+}
+
 impl<Codec, InvokerSender> PartitionProcessor<Codec, InvokerSender>
 where
     Codec: RawEntryCodec + Default + Debug,
     InvokerSender: restate_invoker_api::InvokerHandle<InvokerStorageReader<PartitionStore>> + Clone,
 {
-    #[instrument(level = "error", skip_all, fields(partition_id = %self.partition_id, is_leader = tracing::field::Empty))]
-    pub async fn run(mut self) -> anyhow::Result<()> {
+    #[instrument(level = "error", skip_all, fields(partition_id = %self.partition_id, is_leader = tracing::field::Empty
+    ))]
+    pub async fn run(mut self) -> anyhow::Result<ProcessorStopReason> {
         info!("Starting the partition processor.");
 
         let res = tokio::select! {
             res = self.run_inner() => {
                 match res.as_ref() {
-                    Ok(_) => warn!("Shutting partition processor down because it stopped unexpectedly."),
+                    Ok(stopped) => {
+                        match stopped {
+                            ProcessorStopReason::LogTrimGap { to_lsn } =>
+                                info!(?to_lsn, "Shutting partition processor down because we encountered a trim gap in the log."),
+                            _ => warn!("Shutting partition processor down because it stopped unexpectedly.")
+                        }
+                    },
                     Err(err) => warn!("Shutting partition processor down because it failed: {err}"),
                 }
                 res
             },
             _ = cancellation_watcher() => {
                 debug!("Shutting partition processor down because it was cancelled.");
-                Ok(())
+                Ok(ProcessorStopReason::Cancelled)
             },
         };
 
@@ -293,7 +311,8 @@ where
         res
     }
 
-    async fn run_inner(&mut self) -> anyhow::Result<()> {
+    // Runs as long as log records are available, or returns
+    async fn run_inner(&mut self) -> anyhow::Result<ProcessorStopReason> {
         let mut partition_store = self.partition_store.clone();
         let last_applied_lsn = partition_store.get_applied_lsn().await?;
         let last_applied_lsn = last_applied_lsn.unwrap_or(Lsn::INVALID);
@@ -355,11 +374,18 @@
             .map_ok(|entry| {
                 trace!(?entry, "Read entry");
                 let lsn = entry.sequence_number();
-                let Some(envelope) = entry.try_decode_arc::<Envelope>() else {
-                    // trim-gap
-                    unimplemented!("Handling trim gap is currently not supported")
-                };
-                anyhow::Ok((lsn, envelope?))
+                if entry.is_data_record() {
+                    entry
+                        .try_decode_arc::<Envelope>()
+                        .map(|envelope| anyhow::Ok(Record::Envelope(lsn, envelope?)))
+                        .expect("data record is present")
+                } else {
+                    anyhow::Ok(Record::TrimGap(
+                        entry
+                            .trim_gap_to_sequence_number()
+                            .expect("trim gap has to-LSN"),
+                    ))
+                }
             })
             .try_take_while(|entry| {
                 // a catch-all safety net if all lower layers didn't filter this record out. This
@@ -367,9 +393,10 @@
                 //
                 // At some point, we should remove this and trust that stored records have Keys
                 // stored correctly.
-                std::future::ready(Ok(entry
-                    .as_ref()
-                    .is_ok_and(|(_, envelope)| envelope.matches_key_query(&key_query))))
+                std::future::ready(Ok(entry.as_ref().is_ok_and(|r| match r {
+                    Record::Envelope(_, e) => e.matches_key_query(&key_query),
+                    Record::TrimGap(_) => true,
+                })))
             });
 
         // avoid synchronized timers. We pick a randomised timer between 500 and 1023 millis.
@@ -417,49 +444,60 @@
                     // clear buffers used when applying the next record
                     action_collector.clear();
 
-                    for (lsn, envelope) in command_buffer.drain(..) {
-                        let command_start = Instant::now();
-
-                        trace!(%lsn, "Processing bifrost record for '{}': {:?}", envelope.command.name(), envelope.header);
-
-                        let leadership_change = self.apply_record(
-                            lsn,
-                            envelope,
-                            &mut transaction,
-                            &mut action_collector).await?;
-
-                        apply_command_latency.record(command_start.elapsed());
-
-                        if let Some((header, announce_leader)) = leadership_change {
-                            // commit all changes so far, this is important so that the actuators see all changes
-                            // when becoming leader.
-                            transaction.commit().await?;
-
-                            // We can ignore all actions collected so far because as a new leader we have to instruct the
-                            // actuators afresh.
-                            action_collector.clear();
-
-                            self.status.last_observed_leader_epoch = Some(announce_leader.leader_epoch);
-                            if header.source.is_processor_generational() {
-                                let Source::Processor { generational_node_id, .. } = header.source else {
-                                    unreachable!("processor source must have generational_node_id");
-                                };
-                                // all new AnnounceLeader messages should come from a PartitionProcessor
-                                self.status.last_observed_leader_node = generational_node_id;
-                            } else if announce_leader.node_id.is_some() {
-                                // older AnnounceLeader messages have the announce_leader.node_id set
-                                self.status.last_observed_leader_node = announce_leader.node_id;
-                            }
+                    let mut trim_gap = None;
 
-                            let is_leader = self.leadership_state.on_announce_leader(announce_leader, &mut partition_store).await?;
+                    for record in command_buffer.drain(..) {
+                        match record {
+                            Record::Envelope(lsn, envelope) => {
+                                assert!(trim_gap.is_none(), "Expecting single trim gap to be the last record in a batch!");
+                                let command_start = Instant::now();
 
-                            Span::current().record("is_leader", is_leader);
+                                trace!(%lsn, "Processing bifrost record for '{}': {:?}", envelope.command.name(), envelope.header);
 
-                            if is_leader {
-                                self.status.effective_mode = RunMode::Leader;
-                            }
+                                let leadership_change = self.apply_record(
+                                    lsn,
+                                    envelope,
+                                    &mut transaction,
+                                    &mut action_collector).await?;
 
-                            transaction = partition_store.transaction();
+                                apply_command_latency.record(command_start.elapsed());
+
+                                if let Some((header, announce_leader)) = leadership_change {
+                                    // commit all changes so far, this is important so that the actuators see all changes
+                                    // when becoming leader.
+                                    transaction.commit().await?;
+
+                                    // We can ignore all actions collected so far because as a new leader we have to instruct the
+                                    // actuators afresh.
+                                    action_collector.clear();
+
+                                    self.status.last_observed_leader_epoch = Some(announce_leader.leader_epoch);
+                                    if header.source.is_processor_generational() {
+                                        let Source::Processor { generational_node_id, .. } = header.source else {
+                                            unreachable!("processor source must have generational_node_id");
+                                        };
+                                        // all new AnnounceLeader messages should come from a PartitionProcessor
+                                        self.status.last_observed_leader_node = generational_node_id;
+                                    } else if announce_leader.node_id.is_some() {
+                                        // older AnnounceLeader messages have the announce_leader.node_id set
+                                        self.status.last_observed_leader_node = announce_leader.node_id;
+                                    }
+
+                                    let is_leader = self.leadership_state.on_announce_leader(announce_leader, &mut partition_store).await?;
+
+                                    Span::current().record("is_leader", is_leader);
+
+                                    if is_leader {
+                                        self.status.effective_mode = RunMode::Leader;
+                                    }
+
+                                    transaction = partition_store.transaction();
+                                }
+                            }
+                            Record::TrimGap(to_lsn) => {
+                                assert!(trim_gap.is_none(), "Expecting only a single trim gap!");
+                                trim_gap = Some(to_lsn)
+                            }
                         }
                     }
@@ -468,6 +506,10 @@ where
                     let actions_start = Instant::now();
                     self.leadership_state.handle_actions(action_collector.drain(..)).await?;
                     record_actions_latency.record(actions_start.elapsed());
+
+                    if let Some(to_lsn) = trim_gap {
+                        return Ok(ProcessorStopReason::LogTrimGap { to_lsn })
+                    }
                 },
                 result = self.leadership_state.run() => {
                     let action_effects = result?;
@@ -817,11 +859,10 @@ where
     async fn read_commands<S>(
         log_reader: &mut S,
         max_batching_size: usize,
-        record_buffer: &mut Vec<(Lsn, Arc<Envelope>)>,
+        record_buffer: &mut Vec<Record>,
     ) -> anyhow::Result<()>
     where
-        S: Stream<Item = Result<anyhow::Result<(Lsn, Arc<Envelope>)>, restate_bifrost::Error>>
-            + Unpin,
+        S: Stream<Item = Result<anyhow::Result<Record>, restate_bifrost::Error>> + Unpin,
     {
         // beyond this point we must not await; otherwise we are no longer cancellation safe
         let first_record = log_reader.next().await;
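In the new `run_inner` loop above, a batch drained from `command_buffer` may end with at most one `Record::TrimGap`; once the envelopes before it have been applied and actions flushed, the processor returns `ProcessorStopReason::LogTrimGap { to_lsn }` instead of hitting the old `unimplemented!`. A standalone sketch of that batch invariant, with plain stand-ins for `Lsn`, `Envelope`, and the stop reason rather than the restate types:

```rust
type Lsn = u64;

// Stand-ins for the types used in the patch above.
enum Record {
    Envelope(Lsn, String), // payload stands in for Arc<Envelope>
    TrimGap(Lsn),
}

#[derive(Debug)]
enum ProcessorStopReason {
    LogTrimGap { to_lsn: Lsn },
}

// Applies one drained batch; a trim gap, if present, must be the last record,
// and it is reported to the caller so the processor can stop and look for a
// snapshot covering LSNs up to `to_lsn`.
fn apply_batch(batch: &mut Vec<Record>) -> Option<ProcessorStopReason> {
    let mut trim_gap = None;

    for record in batch.drain(..) {
        match record {
            Record::Envelope(lsn, payload) => {
                assert!(trim_gap.is_none(), "trim gap must be the last record in a batch");
                println!("applying lsn {lsn}: {payload}");
            }
            Record::TrimGap(to_lsn) => {
                assert!(trim_gap.is_none(), "expecting only a single trim gap");
                trim_gap = Some(to_lsn);
            }
        }
    }

    trim_gap.map(|to_lsn| ProcessorStopReason::LogTrimGap { to_lsn })
}

fn main() {
    let mut batch = vec![
        Record::Envelope(1, "cmd-a".into()),
        Record::Envelope(2, "cmd-b".into()),
        Record::TrimGap(10),
    ];
    if let Some(reason) = apply_batch(&mut batch) {
        println!("stopping: {reason:?}");
    }
}
```

The manager change in the next file is the consumer of this reason: on `LogTrimGap` it logs the target LSN and, per the todo, will eventually trigger a snapshot import rather than treating the exit as unexpected.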
diff --git a/crates/worker/src/partition_processor_manager.rs b/crates/worker/src/partition_processor_manager.rs
index 1a0492d89..dd9154576 100644
--- a/crates/worker/src/partition_processor_manager.rs
+++ b/crates/worker/src/partition_processor_manager.rs
@@ -73,6 +73,7 @@ use crate::metric_definitions::PARTITION_LAST_PERSISTED_LOG_LSN;
 use crate::metric_definitions::PARTITION_TIME_SINCE_LAST_RECORD;
 use crate::metric_definitions::PARTITION_TIME_SINCE_LAST_STATUS_UPDATE;
 use crate::partition::snapshots::{SnapshotPartitionTask, SnapshotRepository};
+use crate::partition::ProcessorStopReason;
 use crate::partition_processor_manager::message_handler::PartitionProcessorManagerMessageHandler;
 use crate::partition_processor_manager::persisted_lsn_watchdog::PersistedLogLsnWatchdog;
 use crate::partition_processor_manager::processor_state::{
@@ -103,6 +104,7 @@ pub struct PartitionProcessorManager {
 
     pending_snapshots: HashMap<PartitionId, PendingSnapshotTask>,
     snapshot_export_tasks: FuturesUnordered<TaskHandle<SnapshotResultInternal>>,
+    snapshot_import_tasks: FuturesUnordered<TaskHandle<ImportSnapshotResultInternal>>,
     snapshot_repository: Option<SnapshotRepository>,
 }
 
@@ -112,6 +114,7 @@ struct PendingSnapshotTask {
 }
 
 type SnapshotResultInternal = Result<PartitionSnapshotMetadata, SnapshotError>;
+type ImportSnapshotResultInternal = Result<(), SnapshotError>;
 
 #[derive(Debug, thiserror::Error)]
 pub enum Error {
@@ -425,7 +428,20 @@ impl PartitionProcessorManager {
             ProcessorState::Started { processor, .. } => {
                 self.invokers_status_reader
                     .remove(processor.as_ref().expect("must be some").key_range());
-                warn!(%partition_id, "Partition processor exited unexpectedly: {result:?}");
+
+                match result {
+                    Ok(ProcessorStopReason::LogTrimGap { to_lsn }) => {
+                        if self.snapshot_repository.is_some() {
+                            info!(%partition_id, "Partition processor stopped due to a log trim gap, will look for snapshot with LSN >= {to_lsn}");
+                            // todo(pavel): spawn snapshot import task for the partition
+                        } else {
+                            warn!(%partition_id, "Partition processor stopped due to a log trim gap, and no snapshot repository is configured: {result:?}");
+                        }
+                    }
+                    _ => {
+                        warn!(%partition_id, "Partition processor exited unexpectedly: {result:?}")
+                    }
+                }
             }
             ProcessorState::Stopping {
                 processor,
@@ -487,7 +503,7 @@ impl PartitionProcessorManager {
     fn await_runtime_task_result(
         &mut self,
         partition_id: PartitionId,
-        runtime_task_handle: RuntimeTaskHandle<anyhow::Result<()>>,
+        runtime_task_handle: RuntimeTaskHandle<anyhow::Result<ProcessorStopReason>>,
     ) {
         self.asynchronous_operations.spawn(
             async move {
@@ -921,8 +937,13 @@ struct AsynchronousEvent {
 
 #[derive(strum::IntoStaticStr)]
 enum EventKind {
-    Started(anyhow::Result<(StartedProcessor, RuntimeTaskHandle<anyhow::Result<()>>)>),
-    Stopped(anyhow::Result<()>),
+    Started(
+        anyhow::Result<(
+            StartedProcessor,
+            RuntimeTaskHandle<anyhow::Result<ProcessorStopReason>>,
+        )>,
+    ),
+    Stopped(anyhow::Result<ProcessorStopReason>),
     NewLeaderEpoch {
         leader_epoch_token: LeaderEpochToken,
         result: anyhow::Result<LeaderEpoch>,
diff --git a/crates/worker/src/partition_processor_manager/spawn_processor_task.rs b/crates/worker/src/partition_processor_manager/spawn_processor_task.rs
index f71d9907d..7bd876bb7 100644
--- a/crates/worker/src/partition_processor_manager/spawn_processor_task.rs
+++ b/crates/worker/src/partition_processor_manager/spawn_processor_task.rs
@@ -27,6 +27,7 @@ use restate_types::schema::Schema;
 use crate::invoker_integration::EntryEnricher;
 use crate::partition::invoker_storage_reader::InvokerStorageReader;
 use crate::partition::snapshots::SnapshotRepository;
+use crate::partition::ProcessorStopReason;
 use crate::partition_processor_manager::processor_state::StartedProcessor;
 use crate::PartitionProcessorBuilder;
 
@@ -68,7 +69,12 @@ impl SpawnPartitionProcessorTask {
             partition_id=%self.partition_id,
         )
     )]
-    pub fn run(self) -> anyhow::Result<(StartedProcessor, RuntimeTaskHandle<anyhow::Result<()>>)> {
+    pub fn run(
+        self,
+    ) -> anyhow::Result<(
+        StartedProcessor,
+        RuntimeTaskHandle<anyhow::Result<ProcessorStopReason>>,
+    )> {
         let Self {
             task_name,
             partition_id,