Add Bifrost trim gap handling support by fast-forwarding to the latest partition snapshot #2456

Open
wants to merge 11 commits into main
7 changes: 4 additions & 3 deletions crates/core/src/task_center.rs
@@ -520,15 +520,16 @@ impl TaskCenterInner {

/// Starts the `root_future` on a new runtime. The runtime is stopped once the root future
/// completes.
pub fn start_runtime<F>(
pub fn start_runtime<F, R>(
self: &Arc<Self>,
root_task_kind: TaskKind,
runtime_name: &'static str,
partition_id: Option<PartitionId>,
root_future: impl FnOnce() -> F + Send + 'static,
) -> Result<RuntimeTaskHandle<anyhow::Result<()>>, RuntimeError>
) -> Result<RuntimeTaskHandle<R>, RuntimeError>
where
F: Future<Output = anyhow::Result<()>> + 'static,
F: Future<Output = R> + 'static,
R: Send + 'static,
Comment on lines +523 to +532
Contributor

To me, it seems more like you want to have control over the error type, rather than to make the runtime behave like an async task with a return value.

In that case, your PartitionProcessorStopReason becomes the error type.

Contributor Author

Maybe! We use anyhow::Error quite a bit in the PP now, so it would be difficult to disentangle the errors I care about from other failure conditions. That aside, I still like modeling this as an outcome of either a known stop reason or some other failure condition. I am treating PartitionProcessorStopReason as a normal return, since both canceling the PP and encountering a trim gap are expected over a long enough timeline.
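
For illustration, a minimal, self-contained sketch of the "normal return" modelling described in this comment, using hypothetical stand-in types rather than the real restate ones; the partition processor diff further down currently surfaces the trim gap as a `ProcessorError::TrimGapEncountered` variant instead.

```rust
// Illustrative sketch only: simplified stand-in types, not the actual restate
// ones (`Lsn` here is just a u64 newtype, and `anyhow` is assumed as a dependency).

#[derive(Debug, Clone, Copy)]
struct Lsn(u64);

/// Stop conditions that are expected over a long enough timeline.
#[derive(Debug)]
enum PartitionProcessorStopReason {
    Cancelled,
    TrimGapEncountered { gap_to_lsn: Lsn },
}

/// "Normal return" modelling: expected stops come back as `Ok(reason)`, while
/// genuinely unexpected failures stay in the `anyhow` error channel.
async fn run_processor() -> anyhow::Result<PartitionProcessorStopReason> {
    // ... apply log records until cancelled or until a trim gap is encountered ...
    Ok(PartitionProcessorStopReason::TrimGapEncountered { gap_to_lsn: Lsn(42) })
}

/// The reviewer's alternative would instead make the stop reason the error type,
/// e.g. `async fn run_processor() -> Result<(), PartitionProcessorStopReason>`.
async fn supervise() -> anyhow::Result<()> {
    match run_processor().await? {
        PartitionProcessorStopReason::Cancelled => { /* clean shutdown */ }
        PartitionProcessorStopReason::TrimGapEncountered { gap_to_lsn } => {
            // here the supervisor would fast-forward the partition from the
            // latest snapshot covering `gap_to_lsn`
            let _ = gap_to_lsn;
        }
    }
    Ok(())
}
```

Either shape keeps the trim gap out of the anyhow catch-all, which is the property this discussion is after.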

{
if self.shutdown_requested.load(Ordering::Relaxed) {
return Err(ShutdownError.into());
8 changes: 5 additions & 3 deletions crates/core/src/task_center/handle.rs
@@ -8,6 +8,7 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::fmt::Debug;
use std::future::Future;
use std::sync::atomic::Ordering;
use std::sync::Arc;
@@ -77,15 +78,16 @@ impl Handle {
self.inner.block_on(future)
}

pub fn start_runtime<F>(
pub fn start_runtime<F, R>(
&self,
root_task_kind: TaskKind,
runtime_name: &'static str,
partition_id: Option<PartitionId>,
root_future: impl FnOnce() -> F + Send + 'static,
) -> Result<RuntimeTaskHandle<anyhow::Result<()>>, RuntimeError>
) -> Result<RuntimeTaskHandle<R>, RuntimeError>
where
F: Future<Output = anyhow::Result<()>> + 'static,
F: Future<Output = R> + 'static,
R: Send + Debug + 'static,
{
self.inner
.start_runtime(root_task_kind, runtime_name, partition_id, root_future)
116 changes: 75 additions & 41 deletions crates/worker/src/partition/mod.rs
@@ -24,7 +24,7 @@ use tracing::{debug, error, info, instrument, trace, warn, Span};

use restate_bifrost::Bifrost;
use restate_core::network::{HasConnection, Incoming, Outgoing};
use restate_core::{cancellation_watcher, TaskCenter, TaskKind};
use restate_core::{cancellation_watcher, ShutdownError, TaskCenter, TaskKind};
use restate_partition_store::{PartitionStore, PartitionStoreTransaction};
use restate_storage_api::deduplication_table::{
DedupInformation, DedupSequenceNumber, DeduplicationTable, ProducerId,
@@ -60,6 +60,7 @@ use restate_types::net::partition_processor::{
InvocationOutput, PartitionProcessorRpcError, PartitionProcessorRpcRequest,
PartitionProcessorRpcRequestInner, PartitionProcessorRpcResponse,
};
use restate_types::storage::StorageDecodeError;
use restate_types::time::MillisSinceEpoch;
use restate_wal_protocol::control::AnnounceLeader;
use restate_wal_protocol::{Command, Destination, Envelope, Header, Source};
@@ -252,20 +253,44 @@ pub struct PartitionProcessor<Codec, InvokerSender> {
partition_store: PartitionStore,
}

#[derive(Debug, derive_more::Display, thiserror::Error)]
pub enum ProcessorError {
TrimGapEncountered { gap_to_lsn: Lsn },
Storage(#[from] StorageError),
Decode(#[from] StorageDecodeError),
Bifrost(#[from] restate_bifrost::Error),
StateMachine(#[from] state_machine::Error),
ActionEffect(#[from] leadership::Error),
ShutdownError(#[from] ShutdownError),
LogReadStreamTerminated,
Other(#[from] anyhow::Error),
}

type LsnEnvelope = (Lsn, Arc<Envelope>);

impl<Codec, InvokerSender> PartitionProcessor<Codec, InvokerSender>
where
Codec: RawEntryCodec + Default + Debug,
InvokerSender: restate_invoker_api::InvokerHandle<InvokerStorageReader<PartitionStore>> + Clone,
{
#[instrument(level = "error", skip_all, fields(partition_id = %self.partition_id, is_leader = tracing::field::Empty))]
pub async fn run(mut self) -> anyhow::Result<()> {
#[instrument(
level = "error", skip_all,
fields(partition_id = %self.partition_id, is_leader = tracing::field::Empty)
)]
pub async fn run(mut self) -> Result<(), ProcessorError> {
info!("Starting the partition processor.");

let res = tokio::select! {
res = self.run_inner() => {
match res.as_ref() {
// run_inner never returns normally
Ok(_) => warn!("Shutting partition processor down because it stopped unexpectedly."),
Err(err) => warn!("Shutting partition processor down because it failed: {err}"),
Err(ProcessorError::TrimGapEncountered { gap_to_lsn }) =>
info!(
trim_gap_to_lsn = ?gap_to_lsn,
"Shutting partition processor down because it encountered a trim gap in the log."
),
Err(err) => warn!("Shutting partition processor down because of error: {err}"),
}
res
},
@@ -293,9 +318,12 @@ where
res
}

async fn run_inner(&mut self) -> anyhow::Result<()> {
async fn run_inner(&mut self) -> Result<(), ProcessorError> {
let mut partition_store = self.partition_store.clone();
let last_applied_lsn = partition_store.get_applied_lsn().await?;
let last_applied_lsn = partition_store
.get_applied_lsn()
.await
.map_err(ProcessorError::from)?;
let last_applied_lsn = last_applied_lsn.unwrap_or(Lsn::INVALID);

self.status.last_applied_log_lsn = Some(last_applied_lsn);
@@ -304,7 +332,8 @@ where
let current_tail = self
.bifrost
.find_tail(LogId::from(self.partition_id))
.await?;
.await
.map_err(ProcessorError::from)?;

debug!(
last_applied_lsn = %last_applied_lsn,
@@ -344,32 +373,41 @@ where

// Start reading after the last applied lsn
let key_query = KeyFilter::Within(self.partition_key_range.clone());
let mut log_reader = self
let mut record_stream = self
.bifrost
.create_reader(
LogId::from(self.partition_id),
key_query.clone(),
last_applied_lsn.next(),
Lsn::MAX,
)?
.map_ok(|entry| {
trace!(?entry, "Read entry");
let lsn = entry.sequence_number();
let Some(envelope) = entry.try_decode_arc::<Envelope>() else {
// trim-gap
unimplemented!("Handling trim gap is currently not supported")
};
anyhow::Ok((lsn, envelope?))
)
.map_err(ProcessorError::from)?
.map(|entry| match entry {
Ok(entry) => {
trace!(?entry, "Read entry");
let lsn = entry.sequence_number();
if entry.is_data_record() {
entry
.try_decode_arc::<Envelope>()
.map(|envelope| Ok((lsn, envelope.map_err(ProcessorError::from)?)))
.expect("data record is present")
} else {
Err(ProcessorError::TrimGapEncountered {
gap_to_lsn: entry
.trim_gap_to_sequence_number()
.expect("trim gap has to-LSN"),
})
}
}
Err(err) => Err(ProcessorError::from(err)),
})
.try_take_while(|entry| {
.try_take_while(|(_, envelope)| {
// a catch-all safety net if all lower layers didn't filter this record out. This
// could happen for old records that didn't store `Keys` in the log store.
//
// At some point, we should remove this and trust that stored records have Keys
// stored correctly.
std::future::ready(Ok(entry
.as_ref()
.is_ok_and(|(_, envelope)| envelope.matches_key_query(&key_query))))
std::future::ready(Ok(envelope.matches_key_query(&key_query)))
});

// avoid synchronized timers. We pick a randomised timer between 500 and 1023 millis.
@@ -406,7 +444,7 @@ where
old.updated_at = MillisSinceEpoch::now();
});
}
operation = Self::read_commands(&mut log_reader, self.max_command_batch_size, &mut command_buffer) => {
operation = Self::read_commands(&mut record_stream, self.max_command_batch_size, &mut command_buffer) => {
// check that reading has succeeded
operation?;

@@ -426,14 +464,14 @@ where
lsn,
envelope,
&mut transaction,
&mut action_collector).await?;
&mut action_collector).await.map_err(ProcessorError::from)?;

apply_command_latency.record(command_start.elapsed());

if let Some((header, announce_leader)) = leadership_change {
// commit all changes so far, this is important so that the actuators see all changes
// when becoming leader.
transaction.commit().await?;
transaction.commit().await.map_err(ProcessorError::from)?;

// We can ignore all actions collected so far because as a new leader we have to instruct the
// actuators afresh.
@@ -451,7 +489,7 @@ where
self.status.last_observed_leader_node = announce_leader.node_id;
}

let is_leader = self.leadership_state.on_announce_leader(announce_leader, &mut partition_store).await?;
let is_leader = self.leadership_state.on_announce_leader(announce_leader, &mut partition_store).await.map_err(ProcessorError::from)?;

Span::current().record("is_leader", is_leader);

@@ -464,17 +502,17 @@ where
}

// Commit our changes and notify actuators about actions if we are the leader
transaction.commit().await?;
transaction.commit().await.map_err(ProcessorError::from)?;
let actions_start = Instant::now();
self.leadership_state.handle_actions(action_collector.drain(..)).await?;
self.leadership_state.handle_actions(action_collector.drain(..)).await.map_err(ProcessorError::from)?;
record_actions_latency.record(actions_start.elapsed());
},
result = self.leadership_state.run() => {
let action_effects = result?;
let action_effects = result.map_err(ProcessorError::from)?;
// We process the action_effects not directly in the run future because it
// requires the run future to be cancellation safe. In the future this could be
// implemented.
self.leadership_state.handle_action_effects(action_effects).await?;
self.leadership_state.handle_action_effects(action_effects).await.map_err(ProcessorError::from)?;
}
}
// Allow other tasks on this thread to run, but only if we have exhausted the coop
@@ -813,36 +851,32 @@ where
}

/// Tries to read as many records from the `log_reader` as are immediately available and stops
/// reading at `max_batching_size`.
/// reading at `max_batching_size`. Trim gaps will result in an immediate error.
async fn read_commands<S>(
log_reader: &mut S,
max_batching_size: usize,
record_buffer: &mut Vec<(Lsn, Arc<Envelope>)>,
) -> anyhow::Result<()>
record_buffer: &mut Vec<LsnEnvelope>,
) -> Result<(), ProcessorError>
where
S: Stream<Item = Result<anyhow::Result<(Lsn, Arc<Envelope>)>, restate_bifrost::Error>>
+ Unpin,
S: Stream<Item = Result<LsnEnvelope, ProcessorError>> + Unpin,
{
// beyond this point we must not await; otherwise we are no longer cancellation safe
let first_record = log_reader.next().await;

let Some(first_record) = first_record else {
// read stream terminated!
anyhow::bail!("Read stream terminated for partition processor");
return Err(ProcessorError::LogReadStreamTerminated);
};

record_buffer.clear();
record_buffer.push(first_record??);
record_buffer.push(first_record?);

while record_buffer.len() < max_batching_size {
// read more message from the stream but only if they are immediately available
if let Some(record) = log_reader.next().now_or_never() {
let Some(record) = record else {
// read stream terminated!
anyhow::bail!("Read stream terminated for partition processor");
return Err(ProcessorError::LogReadStreamTerminated);
};

record_buffer.push(record??);
record_buffer.push(record?);
} else {
// no more immediately available records found
break;