Skip to content

Commit

Permalink
Rename StopReason to Error
Browse files Browse the repository at this point in the history
  • Loading branch information
pcholakov committed Dec 27, 2024
1 parent d75db99 commit 84bab4f
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 29 deletions.
41 changes: 21 additions & 20 deletions crates/worker/src/partition/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ pub struct PartitionProcessor<Codec, InvokerSender> {
}

#[derive(Debug, derive_more::Display, thiserror::Error)]
pub enum ProcessorStopReason {
pub enum ProcessorError {
TrimGapEncountered { gap_to_lsn: Lsn },
Storage(#[from] StorageError),
Bifrost(#[from] restate_bifrost::Error),
Expand All @@ -278,19 +278,20 @@ where
level = "error", skip_all,
fields(partition_id = %self.partition_id, is_leader = tracing::field::Empty)
)]
pub async fn run(mut self) -> Result<(), ProcessorStopReason> {
pub async fn run(mut self) -> Result<(), ProcessorError> {
info!("Starting the partition processor.");

let res = tokio::select! {
res = self.run_inner() => {
match res.as_ref() {
// run_inner never returns normally
Ok(_) => warn!("Shutting partition processor down because it stopped unexpectedly."),
Err(ProcessorStopReason::TrimGapEncountered { gap_to_lsn }) =>
Err(ProcessorError::TrimGapEncountered { gap_to_lsn }) =>
info!(
trim_gap_to_lsn = ?gap_to_lsn,
"Shutting partition processor down because it encountered a trim gap in the log."
),
Err(err) => warn!("Shutting partition processor down because it failed: {err}"),
Err(err) => warn!("Shutting partition processor down because of error: {err}"),
}
res
},
Expand Down Expand Up @@ -318,12 +319,12 @@ where
res
}

async fn run_inner(&mut self) -> Result<(), ProcessorStopReason> {
async fn run_inner(&mut self) -> Result<(), ProcessorError> {
let mut partition_store = self.partition_store.clone();
let last_applied_lsn = partition_store
.get_applied_lsn()
.await
.map_err(ProcessorStopReason::from)?;
.map_err(ProcessorError::from)?;
let last_applied_lsn = last_applied_lsn.unwrap_or(Lsn::INVALID);

self.status.last_applied_log_lsn = Some(last_applied_lsn);
Expand All @@ -333,7 +334,7 @@ where
.bifrost
.find_tail(LogId::from(self.partition_id))
.await
.map_err(ProcessorStopReason::from)?;
.map_err(ProcessorError::from)?;

debug!(
last_applied_lsn = %last_applied_lsn,
Expand Down Expand Up @@ -381,7 +382,7 @@ where
last_applied_lsn.next(),
Lsn::MAX,
)
.map_err(ProcessorStopReason::from)?
.map_err(ProcessorError::from)?
.map_ok(|entry| {
trace!(?entry, "Read entry");
let lsn = entry.sequence_number();
Expand Down Expand Up @@ -467,14 +468,14 @@ where
lsn,
envelope,
&mut transaction,
&mut action_collector).await.map_err(ProcessorStopReason::from)?;
&mut action_collector).await.map_err(ProcessorError::from)?;

apply_command_latency.record(command_start.elapsed());

if let Some((header, announce_leader)) = leadership_change {
// commit all changes so far, this is important so that the actuators see all changes
// when becoming leader.
transaction.commit().await.map_err(ProcessorStopReason::from)?;
transaction.commit().await.map_err(ProcessorError::from)?;

// We can ignore all actions collected so far because as a new leader we have to instruct the
// actuators afresh.
Expand All @@ -492,7 +493,7 @@ where
self.status.last_observed_leader_node = announce_leader.node_id;
}

let is_leader = self.leadership_state.on_announce_leader(announce_leader, &mut partition_store).await.map_err(ProcessorStopReason::from)?;
let is_leader = self.leadership_state.on_announce_leader(announce_leader, &mut partition_store).await.map_err(ProcessorError::from)?;

Span::current().record("is_leader", is_leader);

Expand All @@ -504,23 +505,23 @@ where
}
}
Record::TrimGap(gap_to_lsn) => {
return Err(ProcessorStopReason::TrimGapEncountered { gap_to_lsn })
return Err(ProcessorError::TrimGapEncountered { gap_to_lsn })
}
}
}

// Commit our changes and notify actuators about actions if we are the leader
transaction.commit().await.map_err(ProcessorStopReason::from)?;
transaction.commit().await.map_err(ProcessorError::from)?;
let actions_start = Instant::now();
self.leadership_state.handle_actions(action_collector.drain(..)).await.map_err(ProcessorStopReason::from)?;
self.leadership_state.handle_actions(action_collector.drain(..)).await.map_err(ProcessorError::from)?;
record_actions_latency.record(actions_start.elapsed());
},
result = self.leadership_state.run() => {
let action_effects = result.map_err(ProcessorStopReason::from)?;
let action_effects = result.map_err(ProcessorError::from)?;
// We process the action_effects not directly in the run future because it
// requires the run future to be cancellation safe. In the future this could be
// implemented.
self.leadership_state.handle_action_effects(action_effects).await.map_err(ProcessorStopReason::from)?;
self.leadership_state.handle_action_effects(action_effects).await.map_err(ProcessorError::from)?;
}
}
// Allow other tasks on this thread to run, but only if we have exhausted the coop
Expand Down Expand Up @@ -864,15 +865,15 @@ where
log_reader: &mut S,
max_batching_size: usize,
record_buffer: &mut Vec<Record>,
) -> Result<(), ProcessorStopReason>
) -> Result<(), ProcessorError>
where
S: Stream<Item = Result<anyhow::Result<Record>, restate_bifrost::Error>> + Unpin,
{
// beyond this point we must not await; otherwise we are no longer cancellation safe
let first_record = log_reader.next().await;

let Some(first_record) = first_record else {
return Err(ProcessorStopReason::LogReadStreamTerminated);
return Err(ProcessorError::LogReadStreamTerminated);
};

record_buffer.clear();
Expand All @@ -882,11 +883,11 @@ where
// read more message from the stream but only if they are immediately available
if let Some(record) = log_reader.next().now_or_never() {
let Some(record) = record else {
return Err(ProcessorStopReason::LogReadStreamTerminated);
return Err(ProcessorError::LogReadStreamTerminated);
};
match record?? {
Record::TrimGap(gap_to_lsn) => {
return Err(ProcessorStopReason::TrimGapEncountered { gap_to_lsn });
return Err(ProcessorError::TrimGapEncountered { gap_to_lsn });
}
record => record_buffer.push(record),
}
Expand Down
10 changes: 5 additions & 5 deletions crates/worker/src/partition_processor_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ use crate::metric_definitions::PARTITION_LAST_PERSISTED_LOG_LSN;
use crate::metric_definitions::PARTITION_TIME_SINCE_LAST_RECORD;
use crate::metric_definitions::PARTITION_TIME_SINCE_LAST_STATUS_UPDATE;
use crate::partition::snapshots::{SnapshotPartitionTask, SnapshotRepository};
use crate::partition::ProcessorStopReason;
use crate::partition::ProcessorError;
use crate::partition_processor_manager::message_handler::PartitionProcessorManagerMessageHandler;
use crate::partition_processor_manager::persisted_lsn_watchdog::PersistedLogLsnWatchdog;
use crate::partition_processor_manager::processor_state::{
Expand Down Expand Up @@ -433,7 +433,7 @@ impl PartitionProcessorManager {
.remove(processor.as_ref().expect("must be some").key_range());

match result {
Err(ProcessorStopReason::TrimGapEncountered {
Err(ProcessorError::TrimGapEncountered {
gap_to_lsn: to_lsn,
}) => {
if self.snapshot_repository.is_some() {
Expand Down Expand Up @@ -517,7 +517,7 @@ impl PartitionProcessorManager {
fn await_runtime_task_result(
&mut self,
partition_id: PartitionId,
runtime_task_handle: RuntimeTaskHandle<Result<(), ProcessorStopReason>>,
runtime_task_handle: RuntimeTaskHandle<Result<(), ProcessorError>>,
) {
self.asynchronous_operations.spawn(
async move {
Expand Down Expand Up @@ -952,10 +952,10 @@ enum EventKind {
Started(
anyhow::Result<(
StartedProcessor,
RuntimeTaskHandle<Result<(), ProcessorStopReason>>,
RuntimeTaskHandle<Result<(), ProcessorError>>,
)>,
),
Stopped(Result<(), ProcessorStopReason>),
Stopped(Result<(), ProcessorError>),
NewLeaderEpoch {
leader_epoch_token: LeaderEpochToken,
result: anyhow::Result<LeaderEpoch>,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use restate_types::schema::Schema;
use crate::invoker_integration::EntryEnricher;
use crate::partition::invoker_storage_reader::InvokerStorageReader;
use crate::partition::snapshots::SnapshotRepository;
use crate::partition::ProcessorStopReason;
use crate::partition::ProcessorError;
use crate::partition_processor_manager::processor_state::StartedProcessor;
use crate::PartitionProcessorBuilder;

Expand Down Expand Up @@ -79,7 +79,7 @@ impl SpawnPartitionProcessorTask {
self,
) -> anyhow::Result<(
StartedProcessor,
RuntimeTaskHandle<Result<(), ProcessorStopReason>>,
RuntimeTaskHandle<Result<(), ProcessorError>>,
)> {
let Self {
task_name,
Expand Down Expand Up @@ -152,12 +152,12 @@ impl SpawnPartitionProcessorTask {
invoker_name,
invoker.run(invoker_config),
)
.map_err(|e| ProcessorStopReason::from(anyhow::anyhow!(e)))?;
.map_err(|e| ProcessorError::from(anyhow::anyhow!(e)))?;

pp_builder
.build::<ProtobufRawEntryCodec>(bifrost, partition_store)
.await
.map_err(ProcessorStopReason::from)?
.map_err(ProcessorError::from)?
.run()
.await
}
Expand Down

0 comments on commit 84bab4f

Please sign in to comment.