From 84bab4f31c8b0dd83a404f86ac0cfa3ccb93c24f Mon Sep 17 00:00:00 2001 From: Pavel Tcholakov Date: Fri, 27 Dec 2024 10:44:48 +0200 Subject: [PATCH] Rename StopReason to Error --- crates/worker/src/partition/mod.rs | 41 ++++++++++--------- .../worker/src/partition_processor_manager.rs | 10 ++--- .../spawn_processor_task.rs | 8 ++-- 3 files changed, 30 insertions(+), 29 deletions(-) diff --git a/crates/worker/src/partition/mod.rs b/crates/worker/src/partition/mod.rs index 5c05795b5..4d78c6b25 100644 --- a/crates/worker/src/partition/mod.rs +++ b/crates/worker/src/partition/mod.rs @@ -253,7 +253,7 @@ pub struct PartitionProcessor { } #[derive(Debug, derive_more::Display, thiserror::Error)] -pub enum ProcessorStopReason { +pub enum ProcessorError { TrimGapEncountered { gap_to_lsn: Lsn }, Storage(#[from] StorageError), Bifrost(#[from] restate_bifrost::Error), @@ -278,19 +278,20 @@ where level = "error", skip_all, fields(partition_id = %self.partition_id, is_leader = tracing::field::Empty) )] - pub async fn run(mut self) -> Result<(), ProcessorStopReason> { + pub async fn run(mut self) -> Result<(), ProcessorError> { info!("Starting the partition processor."); let res = tokio::select! { res = self.run_inner() => { match res.as_ref() { + // run_inner never returns normally Ok(_) => warn!("Shutting partition processor down because it stopped unexpectedly."), - Err(ProcessorStopReason::TrimGapEncountered { gap_to_lsn }) => + Err(ProcessorError::TrimGapEncountered { gap_to_lsn }) => info!( trim_gap_to_lsn = ?gap_to_lsn, "Shutting partition processor down because it encountered a trim gap in the log." ), - Err(err) => warn!("Shutting partition processor down because it failed: {err}"), + Err(err) => warn!("Shutting partition processor down because of error: {err}"), } res }, @@ -318,12 +319,12 @@ where res } - async fn run_inner(&mut self) -> Result<(), ProcessorStopReason> { + async fn run_inner(&mut self) -> Result<(), ProcessorError> { let mut partition_store = self.partition_store.clone(); let last_applied_lsn = partition_store .get_applied_lsn() .await - .map_err(ProcessorStopReason::from)?; + .map_err(ProcessorError::from)?; let last_applied_lsn = last_applied_lsn.unwrap_or(Lsn::INVALID); self.status.last_applied_log_lsn = Some(last_applied_lsn); @@ -333,7 +334,7 @@ where .bifrost .find_tail(LogId::from(self.partition_id)) .await - .map_err(ProcessorStopReason::from)?; + .map_err(ProcessorError::from)?; debug!( last_applied_lsn = %last_applied_lsn, @@ -381,7 +382,7 @@ where last_applied_lsn.next(), Lsn::MAX, ) - .map_err(ProcessorStopReason::from)? + .map_err(ProcessorError::from)? .map_ok(|entry| { trace!(?entry, "Read entry"); let lsn = entry.sequence_number(); @@ -467,14 +468,14 @@ where lsn, envelope, &mut transaction, - &mut action_collector).await.map_err(ProcessorStopReason::from)?; + &mut action_collector).await.map_err(ProcessorError::from)?; apply_command_latency.record(command_start.elapsed()); if let Some((header, announce_leader)) = leadership_change { // commit all changes so far, this is important so that the actuators see all changes // when becoming leader. - transaction.commit().await.map_err(ProcessorStopReason::from)?; + transaction.commit().await.map_err(ProcessorError::from)?; // We can ignore all actions collected so far because as a new leader we have to instruct the // actuators afresh. @@ -492,7 +493,7 @@ where self.status.last_observed_leader_node = announce_leader.node_id; } - let is_leader = self.leadership_state.on_announce_leader(announce_leader, &mut partition_store).await.map_err(ProcessorStopReason::from)?; + let is_leader = self.leadership_state.on_announce_leader(announce_leader, &mut partition_store).await.map_err(ProcessorError::from)?; Span::current().record("is_leader", is_leader); @@ -504,23 +505,23 @@ where } } Record::TrimGap(gap_to_lsn) => { - return Err(ProcessorStopReason::TrimGapEncountered { gap_to_lsn }) + return Err(ProcessorError::TrimGapEncountered { gap_to_lsn }) } } } // Commit our changes and notify actuators about actions if we are the leader - transaction.commit().await.map_err(ProcessorStopReason::from)?; + transaction.commit().await.map_err(ProcessorError::from)?; let actions_start = Instant::now(); - self.leadership_state.handle_actions(action_collector.drain(..)).await.map_err(ProcessorStopReason::from)?; + self.leadership_state.handle_actions(action_collector.drain(..)).await.map_err(ProcessorError::from)?; record_actions_latency.record(actions_start.elapsed()); }, result = self.leadership_state.run() => { - let action_effects = result.map_err(ProcessorStopReason::from)?; + let action_effects = result.map_err(ProcessorError::from)?; // We process the action_effects not directly in the run future because it // requires the run future to be cancellation safe. In the future this could be // implemented. - self.leadership_state.handle_action_effects(action_effects).await.map_err(ProcessorStopReason::from)?; + self.leadership_state.handle_action_effects(action_effects).await.map_err(ProcessorError::from)?; } } // Allow other tasks on this thread to run, but only if we have exhausted the coop @@ -864,7 +865,7 @@ where log_reader: &mut S, max_batching_size: usize, record_buffer: &mut Vec, - ) -> Result<(), ProcessorStopReason> + ) -> Result<(), ProcessorError> where S: Stream, restate_bifrost::Error>> + Unpin, { @@ -872,7 +873,7 @@ where let first_record = log_reader.next().await; let Some(first_record) = first_record else { - return Err(ProcessorStopReason::LogReadStreamTerminated); + return Err(ProcessorError::LogReadStreamTerminated); }; record_buffer.clear(); @@ -882,11 +883,11 @@ where // read more message from the stream but only if they are immediately available if let Some(record) = log_reader.next().now_or_never() { let Some(record) = record else { - return Err(ProcessorStopReason::LogReadStreamTerminated); + return Err(ProcessorError::LogReadStreamTerminated); }; match record?? { Record::TrimGap(gap_to_lsn) => { - return Err(ProcessorStopReason::TrimGapEncountered { gap_to_lsn }); + return Err(ProcessorError::TrimGapEncountered { gap_to_lsn }); } record => record_buffer.push(record), } diff --git a/crates/worker/src/partition_processor_manager.rs b/crates/worker/src/partition_processor_manager.rs index 4cc90f574..bd4177a73 100644 --- a/crates/worker/src/partition_processor_manager.rs +++ b/crates/worker/src/partition_processor_manager.rs @@ -73,7 +73,7 @@ use crate::metric_definitions::PARTITION_LAST_PERSISTED_LOG_LSN; use crate::metric_definitions::PARTITION_TIME_SINCE_LAST_RECORD; use crate::metric_definitions::PARTITION_TIME_SINCE_LAST_STATUS_UPDATE; use crate::partition::snapshots::{SnapshotPartitionTask, SnapshotRepository}; -use crate::partition::ProcessorStopReason; +use crate::partition::ProcessorError; use crate::partition_processor_manager::message_handler::PartitionProcessorManagerMessageHandler; use crate::partition_processor_manager::persisted_lsn_watchdog::PersistedLogLsnWatchdog; use crate::partition_processor_manager::processor_state::{ @@ -433,7 +433,7 @@ impl PartitionProcessorManager { .remove(processor.as_ref().expect("must be some").key_range()); match result { - Err(ProcessorStopReason::TrimGapEncountered { + Err(ProcessorError::TrimGapEncountered { gap_to_lsn: to_lsn, }) => { if self.snapshot_repository.is_some() { @@ -517,7 +517,7 @@ impl PartitionProcessorManager { fn await_runtime_task_result( &mut self, partition_id: PartitionId, - runtime_task_handle: RuntimeTaskHandle>, + runtime_task_handle: RuntimeTaskHandle>, ) { self.asynchronous_operations.spawn( async move { @@ -952,10 +952,10 @@ enum EventKind { Started( anyhow::Result<( StartedProcessor, - RuntimeTaskHandle>, + RuntimeTaskHandle>, )>, ), - Stopped(Result<(), ProcessorStopReason>), + Stopped(Result<(), ProcessorError>), NewLeaderEpoch { leader_epoch_token: LeaderEpochToken, result: anyhow::Result, diff --git a/crates/worker/src/partition_processor_manager/spawn_processor_task.rs b/crates/worker/src/partition_processor_manager/spawn_processor_task.rs index 8f0d42422..1ff9cf44c 100644 --- a/crates/worker/src/partition_processor_manager/spawn_processor_task.rs +++ b/crates/worker/src/partition_processor_manager/spawn_processor_task.rs @@ -30,7 +30,7 @@ use restate_types::schema::Schema; use crate::invoker_integration::EntryEnricher; use crate::partition::invoker_storage_reader::InvokerStorageReader; use crate::partition::snapshots::SnapshotRepository; -use crate::partition::ProcessorStopReason; +use crate::partition::ProcessorError; use crate::partition_processor_manager::processor_state::StartedProcessor; use crate::PartitionProcessorBuilder; @@ -79,7 +79,7 @@ impl SpawnPartitionProcessorTask { self, ) -> anyhow::Result<( StartedProcessor, - RuntimeTaskHandle>, + RuntimeTaskHandle>, )> { let Self { task_name, @@ -152,12 +152,12 @@ impl SpawnPartitionProcessorTask { invoker_name, invoker.run(invoker_config), ) - .map_err(|e| ProcessorStopReason::from(anyhow::anyhow!(e)))?; + .map_err(|e| ProcessorError::from(anyhow::anyhow!(e)))?; pp_builder .build::(bifrost, partition_store) .await - .map_err(ProcessorStopReason::from)? + .map_err(ProcessorError::from)? .run() .await }