Add stopped-reason to ProcessorManager root future
This allows us to signal the partition processor manager (PPM) about log trim gaps
that the partition processor (PP) may encounter at runtime, which require special handling.
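
For context, a minimal sketch of the pattern this commit introduces, using simplified stand-in types rather than the actual ProcessorStopReason and PartitionProcessorManager code in the diff below: the PP's root future now resolves to a typed stop reason instead of (), and the PPM branches on that reason when the processor exits. The bare u64 LSN, String error, and handle_stop function are illustrative only.

#[derive(Debug)]
enum StopReason {
    Cancelled,
    // The log was trimmed past our read position; records up to `to_lsn` are gone.
    LogTrimGap { to_lsn: u64 },
}

// Hypothetical manager-side handling, mirroring the match the PPM gains in this commit.
fn handle_stop(result: Result<StopReason, String>, snapshot_repository_configured: bool) {
    match result {
        Ok(StopReason::LogTrimGap { to_lsn }) if snapshot_repository_configured => {
            // Look for a snapshot with an LSN >= to_lsn and bootstrap from it.
            println!("trim gap up to LSN {to_lsn}; attempting snapshot import");
        }
        Ok(StopReason::LogTrimGap { to_lsn }) => {
            eprintln!("trim gap up to LSN {to_lsn}, but no snapshot repository is configured");
        }
        Ok(StopReason::Cancelled) => println!("processor was cancelled"),
        Err(err) => eprintln!("processor failed: {err}"),
    }
}

fn main() {
    handle_stop(Ok(StopReason::LogTrimGap { to_lsn: 42 }), true);
    handle_stop(Ok(StopReason::Cancelled), false);
}
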
pcholakov committed Dec 23, 2024
1 parent 58e96ad commit 745ff17
Showing 5 changed files with 137 additions and 66 deletions.
7 changes: 4 additions & 3 deletions crates/core/src/task_center.rs
@@ -520,15 +520,16 @@ impl TaskCenterInner {

/// Starts the `root_future` on a new runtime. The runtime is stopped once the root future
/// completes.
pub fn start_runtime<F>(
pub fn start_runtime<F, R>(
self: &Arc<Self>,
root_task_kind: TaskKind,
runtime_name: &'static str,
partition_id: Option<PartitionId>,
root_future: impl FnOnce() -> F + Send + 'static,
) -> Result<RuntimeTaskHandle<anyhow::Result<()>>, RuntimeError>
) -> Result<RuntimeTaskHandle<R>, RuntimeError>
where
F: Future<Output = anyhow::Result<()>> + 'static,
F: Future<Output = R> + 'static,
R: Send + 'static,
{
if self.shutdown_requested.load(Ordering::Relaxed) {
return Err(ShutdownError.into());
8 changes: 5 additions & 3 deletions crates/core/src/task_center/handle.rs
@@ -8,6 +8,7 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::fmt::Debug;
use std::future::Future;
use std::sync::atomic::Ordering;
use std::sync::Arc;
@@ -77,15 +78,16 @@ impl Handle {
self.inner.block_on(future)
}

pub fn start_runtime<F>(
pub fn start_runtime<F, R>(
&self,
root_task_kind: TaskKind,
runtime_name: &'static str,
partition_id: Option<PartitionId>,
root_future: impl FnOnce() -> F + Send + 'static,
) -> Result<RuntimeTaskHandle<anyhow::Result<()>>, RuntimeError>
) -> Result<RuntimeTaskHandle<R>, RuntimeError>
where
F: Future<Output = anyhow::Result<()>> + 'static,
F: Future<Output = R> + 'static,
R: Send + Debug + 'static,
{
self.inner
.start_runtime(root_task_kind, runtime_name, partition_id, root_future)
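
A hedged illustration of what the start_runtime signature change in the two hunks above enables: the root future may now resolve to any Send + 'static value, so the partition processor can hand back a typed stop reason. The function below is a simplified stand-in, not the real TaskCenter API; it takes only the root-future closure and drives it on a plain current-thread Tokio runtime, whereas the real method also takes a TaskKind, a runtime name, and an optional PartitionId, and returns a RuntimeTaskHandle.

use std::future::Future;
use std::thread::JoinHandle;

// Simplified stand-in for the generic start_runtime: generic over the root
// future's output R instead of being fixed to anyhow::Result<()>.
fn start_runtime<F, R>(root_future: impl FnOnce() -> F + Send + 'static) -> JoinHandle<R>
where
    F: Future<Output = R> + 'static,
    R: Send + 'static,
{
    std::thread::spawn(move || {
        // The future itself need not be Send: it is created and driven entirely
        // on this dedicated runtime thread.
        tokio::runtime::Builder::new_current_thread()
            .build()
            .expect("failed to build runtime")
            .block_on(root_future())
    })
}

fn main() {
    // The root future resolves to a plain String here, to show the generic return type.
    let handle = start_runtime(|| async { "stopped: log trim gap".to_string() });
    println!("{}", handle.join().expect("runtime thread panicked"));
}
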
151 changes: 96 additions & 55 deletions crates/worker/src/partition/mod.rs
@@ -252,26 +252,44 @@ pub struct PartitionProcessor<Codec, InvokerSender> {
partition_store: PartitionStore,
}

#[derive(Debug)]
pub enum ProcessorStopReason {
Cancelled,
LogTrimGap { to_lsn: Lsn },
}

enum Record {
Envelope(Lsn, Arc<Envelope>),
TrimGap(Lsn),
}

impl<Codec, InvokerSender> PartitionProcessor<Codec, InvokerSender>
where
Codec: RawEntryCodec + Default + Debug,
InvokerSender: restate_invoker_api::InvokerHandle<InvokerStorageReader<PartitionStore>> + Clone,
{
#[instrument(level = "error", skip_all, fields(partition_id = %self.partition_id, is_leader = tracing::field::Empty))]
pub async fn run(mut self) -> anyhow::Result<()> {
#[instrument(level = "error", skip_all, fields(partition_id = %self.partition_id, is_leader = tracing::field::Empty
))]
pub async fn run(mut self) -> anyhow::Result<ProcessorStopReason> {
info!("Starting the partition processor.");

let res = tokio::select! {
res = self.run_inner() => {
match res.as_ref() {
Ok(_) => warn!("Shutting partition processor down because it stopped unexpectedly."),
Ok(stopped) => {
match stopped {
ProcessorStopReason::LogTrimGap { to_lsn } =>
info!(?to_lsn, "Shutting partition processor down because we encountered a trim gap in the log."),
_ => warn!("Shutting partition processor down because it stopped unexpectedly.")
}
},
Err(err) => warn!("Shutting partition processor down because it failed: {err}"),
}
res
},
_ = cancellation_watcher() => {
debug!("Shutting partition processor down because it was cancelled.");
Ok(())
Ok(ProcessorStopReason::Cancelled)
},
};

@@ -293,7 +311,8 @@ where
res
}

async fn run_inner(&mut self) -> anyhow::Result<()> {
// Runs as long as log records are available, or returns a stop reason when it encounters a trim gap in the log
async fn run_inner(&mut self) -> anyhow::Result<ProcessorStopReason> {
let mut partition_store = self.partition_store.clone();
let last_applied_lsn = partition_store.get_applied_lsn().await?;
let last_applied_lsn = last_applied_lsn.unwrap_or(Lsn::INVALID);
@@ -355,21 +374,29 @@ where
.map_ok(|entry| {
trace!(?entry, "Read entry");
let lsn = entry.sequence_number();
let Some(envelope) = entry.try_decode_arc::<Envelope>() else {
// trim-gap
unimplemented!("Handling trim gap is currently not supported")
};
anyhow::Ok((lsn, envelope?))
if entry.is_data_record() {
entry
.try_decode_arc::<Envelope>()
.map(|envelope| anyhow::Ok(Record::Envelope(lsn, envelope?)))
.expect("data record is present")
} else {
anyhow::Ok(Record::TrimGap(
entry
.trim_gap_to_sequence_number()
.expect("trim gap has to-LSN"),
))
}
})
.try_take_while(|entry| {
// a catch-all safety net if all lower layers didn't filter this record out. This
// could happen for old records that didn't store `Keys` in the log store.
//
// At some point, we should remove this and trust that stored records have Keys
// stored correctly.
std::future::ready(Ok(entry
.as_ref()
.is_ok_and(|(_, envelope)| envelope.matches_key_query(&key_query))))
std::future::ready(Ok(entry.as_ref().is_ok_and(|r| match r {
Record::Envelope(_, e) => e.matches_key_query(&key_query),
Record::TrimGap(_) => true,
})))
});

// avoid synchronized timers. We pick a randomised timer between 500 and 1023 millis.
@@ -417,49 +444,60 @@ where
// clear buffers used when applying the next record
action_collector.clear();

for (lsn, envelope) in command_buffer.drain(..) {
let command_start = Instant::now();

trace!(%lsn, "Processing bifrost record for '{}': {:?}", envelope.command.name(), envelope.header);

let leadership_change = self.apply_record(
lsn,
envelope,
&mut transaction,
&mut action_collector).await?;

apply_command_latency.record(command_start.elapsed());

if let Some((header, announce_leader)) = leadership_change {
// commit all changes so far, this is important so that the actuators see all changes
// when becoming leader.
transaction.commit().await?;

// We can ignore all actions collected so far because as a new leader we have to instruct the
// actuators afresh.
action_collector.clear();

self.status.last_observed_leader_epoch = Some(announce_leader.leader_epoch);
if header.source.is_processor_generational() {
let Source::Processor { generational_node_id, .. } = header.source else {
unreachable!("processor source must have generational_node_id");
};
// all new AnnounceLeader messages should come from a PartitionProcessor
self.status.last_observed_leader_node = generational_node_id;
} else if announce_leader.node_id.is_some() {
// older AnnounceLeader messages have the announce_leader.node_id set
self.status.last_observed_leader_node = announce_leader.node_id;
}
let mut trim_gap = None;

let is_leader = self.leadership_state.on_announce_leader(announce_leader, &mut partition_store).await?;
for record in command_buffer.drain(..) {
match record {
Record::Envelope(lsn, envelope) => {
assert!(trim_gap.is_none(), "Expecting single trim gap to be the last record in a batch!");
let command_start = Instant::now();

Span::current().record("is_leader", is_leader);
trace!(%lsn, "Processing bifrost record for '{}': {:?}", envelope.command.name(), envelope.header);

if is_leader {
self.status.effective_mode = RunMode::Leader;
}
let leadership_change = self.apply_record(
lsn,
envelope,
&mut transaction,
&mut action_collector).await?;

transaction = partition_store.transaction();
apply_command_latency.record(command_start.elapsed());

if let Some((header, announce_leader)) = leadership_change {
// commit all changes so far, this is important so that the actuators see all changes
// when becoming leader.
transaction.commit().await?;

// We can ignore all actions collected so far because as a new leader we have to instruct the
// actuators afresh.
action_collector.clear();

self.status.last_observed_leader_epoch = Some(announce_leader.leader_epoch);
if header.source.is_processor_generational() {
let Source::Processor { generational_node_id, .. } = header.source else {
unreachable!("processor source must have generational_node_id");
};
// all new AnnounceLeader messages should come from a PartitionProcessor
self.status.last_observed_leader_node = generational_node_id;
} else if announce_leader.node_id.is_some() {
// older AnnounceLeader messages have the announce_leader.node_id set
self.status.last_observed_leader_node = announce_leader.node_id;
}

let is_leader = self.leadership_state.on_announce_leader(announce_leader, &mut partition_store).await?;

Span::current().record("is_leader", is_leader);

if is_leader {
self.status.effective_mode = RunMode::Leader;
}

transaction = partition_store.transaction();
}
}
Record::TrimGap(to_lsn) => {
assert!(trim_gap.is_none(), "Expecting only a single trim gap!");
trim_gap = Some(to_lsn)
}
}
}

@@ -468,6 +506,10 @@ where
let actions_start = Instant::now();
self.leadership_state.handle_actions(action_collector.drain(..)).await?;
record_actions_latency.record(actions_start.elapsed());

if let Some(to_lsn) = trim_gap {
return Ok(ProcessorStopReason::LogTrimGap { to_lsn })
}
},
result = self.leadership_state.run() => {
let action_effects = result?;
@@ -817,11 +859,10 @@ where
async fn read_commands<S>(
log_reader: &mut S,
max_batching_size: usize,
record_buffer: &mut Vec<(Lsn, Arc<Envelope>)>,
record_buffer: &mut Vec<Record>,
) -> anyhow::Result<()>
where
S: Stream<Item = Result<anyhow::Result<(Lsn, Arc<Envelope>)>, restate_bifrost::Error>>
+ Unpin,
S: Stream<Item = Result<anyhow::Result<Record>, restate_bifrost::Error>> + Unpin,
{
// beyond this point we must not await; otherwise we are no longer cancellation safe
let first_record = log_reader.next().await;
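
A hedged sketch of the read-path change above: each log entry is classified as either a decoded envelope or a trim-gap marker, and the apply loop treats a trim gap as the terminator of its batch, surfacing the gap's target LSN as the stop reason. The types below (a bare u64 LSN, a toy LogEntry, byte payloads instead of decoded envelopes) are simplified stand-ins for bifrost's API.

type Lsn = u64;

// Stand-in for a bifrost log entry: either a data record or a trim gap.
enum LogEntry {
    Data { lsn: Lsn, payload: Vec<u8> },
    TrimGap { to_lsn: Lsn },
}

// Mirrors the new Record enum: a decoded envelope or the LSN the trim gap extends to.
enum Record {
    Envelope(Lsn, Vec<u8>),
    TrimGap(Lsn),
}

fn classify(entry: LogEntry) -> Record {
    match entry {
        LogEntry::Data { lsn, payload } => Record::Envelope(lsn, payload),
        LogEntry::TrimGap { to_lsn } => Record::TrimGap(to_lsn),
    }
}

// Applies a batch of records; if the batch ends with a trim gap, returns its to-LSN
// so the caller can stop with the equivalent of ProcessorStopReason::LogTrimGap { to_lsn }.
fn apply_batch(batch: Vec<Record>) -> Option<Lsn> {
    let mut trim_gap = None;
    for record in batch {
        match record {
            Record::Envelope(lsn, payload) => {
                // A trim gap may only appear as the last record of a batch.
                assert!(trim_gap.is_none(), "trim gap must be the last record in a batch");
                let _ = (lsn, payload); // apply the envelope to the partition store here
            }
            Record::TrimGap(to_lsn) => {
                assert!(trim_gap.is_none(), "expecting only a single trim gap");
                trim_gap = Some(to_lsn);
            }
        }
    }
    trim_gap
}

fn main() {
    let batch = vec![
        classify(LogEntry::Data { lsn: 1, payload: vec![] }),
        classify(LogEntry::TrimGap { to_lsn: 10 }),
    ];
    assert_eq!(apply_batch(batch), Some(10));
}
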
29 changes: 25 additions & 4 deletions crates/worker/src/partition_processor_manager.rs
@@ -73,6 +73,7 @@ use crate::metric_definitions::PARTITION_LAST_PERSISTED_LOG_LSN;
use crate::metric_definitions::PARTITION_TIME_SINCE_LAST_RECORD;
use crate::metric_definitions::PARTITION_TIME_SINCE_LAST_STATUS_UPDATE;
use crate::partition::snapshots::{SnapshotPartitionTask, SnapshotRepository};
use crate::partition::ProcessorStopReason;
use crate::partition_processor_manager::message_handler::PartitionProcessorManagerMessageHandler;
use crate::partition_processor_manager::persisted_lsn_watchdog::PersistedLogLsnWatchdog;
use crate::partition_processor_manager::processor_state::{
@@ -103,6 +104,7 @@ pub struct PartitionProcessorManager {

pending_snapshots: HashMap<PartitionId, PendingSnapshotTask>,
snapshot_export_tasks: FuturesUnordered<TaskHandle<SnapshotResultInternal>>,
snapshot_import_tasks: FuturesUnordered<TaskHandle<ImportSnapshotResultInternal>>,
snapshot_repository: Option<SnapshotRepository>,
}

@@ -112,6 +114,7 @@ struct PendingSnapshotTask {
}

type SnapshotResultInternal = Result<PartitionSnapshotMetadata, SnapshotError>;
type ImportSnapshotResultInternal = Result<(), SnapshotError>;

#[derive(Debug, thiserror::Error)]
pub enum Error {
@@ -425,7 +428,20 @@ impl PartitionProcessorManager {
ProcessorState::Started { processor, .. } => {
self.invokers_status_reader
.remove(processor.as_ref().expect("must be some").key_range());
warn!(%partition_id, "Partition processor exited unexpectedly: {result:?}");

match result {
Ok(ProcessorStopReason::LogTrimGap { to_lsn }) => {
if self.snapshot_repository.is_some() {
info!(%partition_id, "Partition processor stopped due to a log trim gap, will look for snapshot with LSN >= {to_lsn}");
// todo(pavel): spawn snapshot import task for the partition
} else {
warn!(%partition_id, "Partition processor stopped due to a log trim gap, and no snapshot repository is configured: {result:?}");
}
}
_ => {
warn!(%partition_id, "Partition processor exited unexpectedly: {result:?}")
}
}
}
ProcessorState::Stopping {
processor,
@@ -487,7 +503,7 @@ impl PartitionProcessorManager {
fn await_runtime_task_result(
&mut self,
partition_id: PartitionId,
runtime_task_handle: RuntimeTaskHandle<anyhow::Result<()>>,
runtime_task_handle: RuntimeTaskHandle<anyhow::Result<ProcessorStopReason>>,
) {
self.asynchronous_operations.spawn(
async move {
@@ -921,8 +937,13 @@ struct AsynchronousEvent {

#[derive(strum::IntoStaticStr)]
enum EventKind {
Started(anyhow::Result<(StartedProcessor, RuntimeTaskHandle<anyhow::Result<()>>)>),
Stopped(anyhow::Result<()>),
Started(
anyhow::Result<(
StartedProcessor,
RuntimeTaskHandle<anyhow::Result<ProcessorStopReason>>,
)>,
),
Stopped(anyhow::Result<ProcessorStopReason>),
NewLeaderEpoch {
leader_epoch_token: LeaderEpochToken,
result: anyhow::Result<LeaderEpoch>,
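
The new snapshot_import_tasks field (not yet populated in this commit; see the todo above) is set up to reuse the same FuturesUnordered bookkeeping the PPM already applies to snapshot_export_tasks: spawned task handles are pushed into the collection and their results drained as a stream. A minimal, hypothetical sketch of that pattern, using plain tokio JoinHandles and a String error instead of the repo's TaskHandle and SnapshotError.

use futures::stream::{FuturesUnordered, StreamExt};

#[tokio::main]
async fn main() {
    // Completed tasks are yielded in completion order, not insertion order.
    let mut import_tasks: FuturesUnordered<tokio::task::JoinHandle<Result<(), String>>> =
        FuturesUnordered::new();

    import_tasks.push(tokio::spawn(async { Ok::<(), String>(()) }));
    import_tasks.push(tokio::spawn(async {
        Err::<(), String>("no snapshot with LSN >= trim point".to_string())
    }));

    while let Some(joined) = import_tasks.next().await {
        match joined {
            Ok(Ok(())) => println!("snapshot import completed"),
            Ok(Err(err)) => eprintln!("snapshot import failed: {err}"),
            Err(join_err) => eprintln!("import task panicked: {join_err}"),
        }
    }
}
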
@@ -27,6 +27,7 @@ use restate_types::schema::Schema;
use crate::invoker_integration::EntryEnricher;
use crate::partition::invoker_storage_reader::InvokerStorageReader;
use crate::partition::snapshots::SnapshotRepository;
use crate::partition::ProcessorStopReason;
use crate::partition_processor_manager::processor_state::StartedProcessor;
use crate::PartitionProcessorBuilder;

@@ -68,7 +69,12 @@ impl SpawnPartitionProcessorTask {
partition_id=%self.partition_id,
)
)]
pub fn run(self) -> anyhow::Result<(StartedProcessor, RuntimeTaskHandle<anyhow::Result<()>>)> {
pub fn run(
self,
) -> anyhow::Result<(
StartedProcessor,
RuntimeTaskHandle<anyhow::Result<ProcessorStopReason>>,
)> {
let Self {
task_name,
partition_id,
