Skip to content

Commit

Permalink
[Bifrost] Introduce ErrorRecoveryStrategy control
Browse files Browse the repository at this point in the history
This PR introduces a new parameter to the appender's API that lets API users control the behaviour when appends to the current loglet face persistent failures. There is a new feature (`auto-extend` in restate-bifrost) which controls the default behaviour.

No logic changes happen in this PR; in future PRs, bifrost will respect this input and configure its behaviour accordingly.
  • Loading branch information
AhmedSoliman committed Dec 20, 2024
1 parent ab34a42 commit 809a53f
Show file tree
Hide file tree
Showing 11 changed files with 115 additions and 38 deletions.
10 changes: 5 additions & 5 deletions crates/admin/src/cluster_controller/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -835,7 +835,7 @@ mod tests {
use test_log::test;

use restate_bifrost::providers::memory_loglet;
use restate_bifrost::{Bifrost, BifrostService};
use restate_bifrost::{Bifrost, BifrostService, ErrorRecoveryStrategy};
use restate_core::network::{
FailingConnector, Incoming, MessageHandler, MockPeerConnection, NetworkServerBuilder,
};
Expand Down Expand Up @@ -875,7 +875,7 @@ mod tests {
let _ = builder.build().await;
bifrost_svc.start().await?;

let mut appender = bifrost.create_appender(LOG_ID)?;
let mut appender = bifrost.create_appender(LOG_ID, ErrorRecoveryStrategy::default())?;

TaskCenter::spawn(TaskKind::SystemService, "cluster-controller", svc.run())?;

Expand Down Expand Up @@ -972,7 +972,7 @@ mod tests {
let (_node_2, _node2_reactor) =
node_2.process_with_message_handler(get_node_state_handler)?;

let mut appender = bifrost.create_appender(LOG_ID)?;
let mut appender = bifrost.create_appender(LOG_ID, ErrorRecoveryStrategy::default())?;
for i in 1..=20 {
let lsn = appender.append("").await?;
assert_eq!(Lsn::from(i), lsn);
Expand Down Expand Up @@ -1049,7 +1049,7 @@ mod tests {
let (_node_2, _node2_reactor) =
node_2.process_with_message_handler(get_node_state_handler)?;

let mut appender = bifrost.create_appender(LOG_ID)?;
let mut appender = bifrost.create_appender(LOG_ID, ErrorRecoveryStrategy::default())?;
for i in 1..=20 {
let lsn = appender.append(format!("record{i}")).await?;
assert_eq!(Lsn::from(i), lsn);
Expand Down Expand Up @@ -1112,7 +1112,7 @@ mod tests {
})
.await?;

let mut appender = bifrost.create_appender(LOG_ID)?;
let mut appender = bifrost.create_appender(LOG_ID, ErrorRecoveryStrategy::default())?;
for i in 1..=5 {
let lsn = appender.append(format!("record{i}")).await?;
assert_eq!(Lsn::from(i), lsn);
Expand Down
2 changes: 2 additions & 0 deletions crates/bifrost/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ default = []
replicated-loglet = []
memory-loglet = ["restate-types/memory-loglet"]
test-util = ["memory-loglet", "dep:googletest", "dep:restate-test-util"]
# enables bifrost to auto seal and extend. This is a transitional feature that will be removed soon.
auto-extend = []

[dependencies]
restate-core = { workspace = true }
Expand Down
12 changes: 10 additions & 2 deletions crates/bifrost/src/appender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use restate_types::logs::{LogId, Lsn, Record};
use restate_types::retries::RetryIter;
use restate_types::storage::StorageEncode;

use crate::bifrost::BifrostInner;
use crate::bifrost::{BifrostInner, ErrorRecoveryStrategy};
use crate::loglet::AppendError;
use crate::loglet_wrapper::LogletWrapper;
use crate::{Error, InputRecord, Result};
Expand All @@ -31,17 +31,25 @@ pub struct Appender {
log_id: LogId,
#[debug(skip)]
pub(super) config: Live<Configuration>,
// todo: asoli remove
#[allow(unused)]
error_recovery_strategy: ErrorRecoveryStrategy,
loglet_cache: Option<LogletWrapper>,
#[debug(skip)]
bifrost_inner: Arc<BifrostInner>,
}

impl Appender {
pub(crate) fn new(log_id: LogId, bifrost_inner: Arc<BifrostInner>) -> Self {
pub(crate) fn new(
log_id: LogId,
error_recovery_strategy: ErrorRecoveryStrategy,
bifrost_inner: Arc<BifrostInner>,
) -> Self {
let config = Configuration::updateable();
Self {
log_id,
config,
error_recovery_strategy,
loglet_cache: Default::default(),
bifrost_inner,
}
Expand Down
91 changes: 74 additions & 17 deletions crates/bifrost/src/bifrost.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,40 @@ use crate::loglet_wrapper::LogletWrapper;
use crate::watchdog::WatchdogSender;
use crate::{Error, InputRecord, LogReadStream, Result};

/// Governs how bifrost should react when an append fails, or when it observes
/// a sealed loglet while it is tailing a log.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ErrorRecoveryStrategy {
    /// Extend the chain eagerly: create a new loglet and append to it.
    ExtendChainPreferred,
    /// Extend the chain only after running out of patience. Others might be better
    /// suited to reconfigure the chain, but when desperate, sealing and extending
    /// is allowed.
    ExtendChainAllowed,
    /// Never extend the chain; wait indefinitely until the error disappears.
    Wait,
}

impl ErrorRecoveryStrategy {
    /// Returns [`Self::ExtendChainPreferred`] when the crate is compiled with the
    /// temporary `auto-extend` feature, and [`Self::Wait`] otherwise.
    ///
    /// The feature gate is transitional and only exists until the transition is complete.
    pub fn extend_preferred() -> Self {
        let auto_extend = cfg!(feature = "auto-extend");
        if !auto_extend {
            // Feature disabled: never extend the chain.
            return Self::Wait;
        }
        Self::ExtendChainPreferred
    }
}

impl Default for ErrorRecoveryStrategy {
    /// Defaults to [`Self::ExtendChainAllowed`] when the `auto-extend` feature is
    /// enabled, and to [`Self::Wait`] otherwise.
    fn default() -> Self {
        if cfg!(not(feature = "auto-extend")) {
            Self::Wait
        } else {
            Self::ExtendChainAllowed
        }
    }
}

/// Bifrost is Restate's durable interconnect system
///
/// Bifrost is a mutable-friendly handle to access the system. You don't need
Expand Down Expand Up @@ -97,10 +131,13 @@ impl Bifrost {
pub async fn append<T: StorageEncode>(
&self,
log_id: LogId,
error_recovery_strategy: ErrorRecoveryStrategy,
body: impl Into<InputRecord<T>>,
) -> Result<Lsn> {
self.inner.fail_if_shutting_down()?;
self.inner.append(log_id, body).await
self.inner
.append(log_id, error_recovery_strategy, body)
.await
}

/// Appends a batch of records to a log. The log id must exist, otherwise the
Expand All @@ -116,10 +153,13 @@ impl Bifrost {
pub async fn append_batch<T: StorageEncode>(
&self,
log_id: LogId,
error_recovery_strategy: ErrorRecoveryStrategy,
batch: Vec<impl Into<InputRecord<T>>>,
) -> Result<Lsn> {
self.inner.fail_if_shutting_down()?;
self.inner.append_batch(log_id, batch).await
self.inner
.append_batch(log_id, error_recovery_strategy, batch)
.await
}

/// Read the next record from the LSN provided. The `from` indicates the LSN where we will
Expand Down Expand Up @@ -171,23 +211,32 @@ impl Bifrost {
/// The best way to write to Bifrost is to hold on to an [`Appender`] and reuse it across
/// calls, this allows internal caching of recently accessed loglets and recycling write
/// buffers.
pub fn create_appender(&self, log_id: LogId) -> Result<Appender> {
pub fn create_appender(
&self,
log_id: LogId,
error_recovery_strategy: ErrorRecoveryStrategy,
) -> Result<Appender> {
self.inner.fail_if_shutting_down()?;
self.inner.check_log_id(log_id)?;
Ok(Appender::new(log_id, self.inner.clone()))
Ok(Appender::new(
log_id,
error_recovery_strategy,
self.inner.clone(),
))
}

pub fn create_background_appender<T>(
&self,
log_id: LogId,
error_recovery_strategy: ErrorRecoveryStrategy,
queue_capacity: usize,
max_batch_size: usize,
) -> Result<BackgroundAppender<T>>
where
T: StorageEncode,
{
Ok(BackgroundAppender::new(
self.create_appender(log_id)?,
self.create_appender(log_id, error_recovery_strategy)?,
queue_capacity,
max_batch_size,
))
Expand Down Expand Up @@ -279,17 +328,21 @@ impl BifrostInner {
pub async fn append<T: StorageEncode>(
self: &Arc<Self>,
log_id: LogId,
error_recovery_strategy: ErrorRecoveryStrategy,
record: impl Into<InputRecord<T>>,
) -> Result<Lsn> {
Appender::new(log_id, Arc::clone(self)).append(record).await
Appender::new(log_id, error_recovery_strategy, Arc::clone(self))
.append(record)
.await
}

pub async fn append_batch<T: StorageEncode>(
self: &Arc<Self>,
log_id: LogId,
error_recovery_strategy: ErrorRecoveryStrategy,
batch: Vec<impl Into<InputRecord<T>>>,
) -> Result<Lsn> {
Appender::new(log_id, Arc::clone(self))
Appender::new(log_id, error_recovery_strategy, Arc::clone(self))
.append_batch(batch)
.await
}
Expand Down Expand Up @@ -523,8 +576,8 @@ mod tests {

let clean_bifrost_clone = bifrost.clone();

let mut appender_0 = bifrost.create_appender(LogId::new(0))?;
let mut appender_3 = bifrost.create_appender(LogId::new(3))?;
let mut appender_0 = bifrost.create_appender(LogId::new(0), ErrorRecoveryStrategy::Wait)?;
let mut appender_3 = bifrost.create_appender(LogId::new(3), ErrorRecoveryStrategy::Wait)?;
let mut max_lsn = Lsn::INVALID;
for i in 1..=5 {
// Append a record to memory
Expand All @@ -536,13 +589,14 @@ mod tests {

// Append to a log that doesn't exist.
let invalid_log = LogId::from(num_partitions + 1);
let resp = bifrost.create_appender(invalid_log);
let resp = bifrost.create_appender(invalid_log, ErrorRecoveryStrategy::Wait);

assert_that!(resp, pat!(Err(pat!(Error::UnknownLogId(eq(invalid_log))))));

// use a cloned bifrost.
let cloned_bifrost = bifrost.clone();
let mut second_appender_0 = cloned_bifrost.create_appender(LogId::new(0))?;
let mut second_appender_0 =
cloned_bifrost.create_appender(LogId::new(0), ErrorRecoveryStrategy::Wait)?;
for _ in 1..=5 {
// Append a record to memory
let lsn = second_appender_0.append("").await?;
Expand All @@ -553,7 +607,7 @@ mod tests {

// Ensure original clone writes to the same underlying loglet.
let lsn = clean_bifrost_clone
.create_appender(LogId::new(0))?
.create_appender(LogId::new(0), ErrorRecoveryStrategy::Wait)?
.append("")
.await?;
assert_eq!(max_lsn + Lsn::from(1), lsn);
Expand Down Expand Up @@ -591,7 +645,10 @@ mod tests {
let bifrost = Bifrost::init_with_factory(factory).await;

let start = tokio::time::Instant::now();
let lsn = bifrost.create_appender(LogId::new(0))?.append("").await?;
let lsn = bifrost
.create_appender(LogId::new(0), ErrorRecoveryStrategy::Wait)?
.append("")
.await?;
assert_eq!(Lsn::from(1), lsn);
// The append was properly delayed
assert_eq!(delay, start.elapsed());
Expand All @@ -618,7 +675,7 @@ mod tests {

assert_eq!(Lsn::INVALID, bifrost.get_trim_point(LOG_ID).await?);

let mut appender = bifrost.create_appender(LOG_ID)?;
let mut appender = bifrost.create_appender(LOG_ID, ErrorRecoveryStrategy::Wait)?;
// append 10 records
for _ in 1..=10 {
appender.append("").await?;
Expand Down Expand Up @@ -687,7 +744,7 @@ mod tests {
&node_env.metadata_store_client,
);

let mut appender = bifrost.create_appender(LOG_ID)?;
let mut appender = bifrost.create_appender(LOG_ID, ErrorRecoveryStrategy::Wait)?;
// Lsns [1..5]
for i in 1..=5 {
// Append a record to memory
Expand Down Expand Up @@ -771,7 +828,7 @@ mod tests {
);

// appends should go to the new segment
let mut appender = bifrost.create_appender(LOG_ID)?;
let mut appender = bifrost.create_appender(LOG_ID, ErrorRecoveryStrategy::Wait)?;
// Lsns [5..7]
for i in 5..=7 {
// Append a record to memory
Expand Down Expand Up @@ -882,7 +939,7 @@ mod tests {
let append_counter = append_counter.clone();
let stop_signal = stop_signal.clone();
let bifrost = bifrost.clone();
let mut appender = bifrost.create_appender(LOG_ID)?;
let mut appender = bifrost.create_appender(LOG_ID, ErrorRecoveryStrategy::Wait)?;
async move {
let mut i = 0;
while !stop_signal.load(Ordering::Relaxed) {
Expand Down
2 changes: 1 addition & 1 deletion crates/bifrost/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ mod watchdog;

pub use appender::Appender;
pub use background_appender::{AppenderHandle, BackgroundAppender, CommitToken, LogSender};
pub use bifrost::Bifrost;
pub use bifrost::{Bifrost, ErrorRecoveryStrategy};
pub use bifrost_admin::{BifrostAdmin, SealedSegment};
pub use error::{Error, Result};
pub use read_stream::LogReadStream;
Expand Down
12 changes: 6 additions & 6 deletions crates/bifrost/src/read_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -454,7 +454,7 @@ mod tests {
use restate_types::metadata_store::keys::BIFROST_CONFIG_KEY;
use restate_types::Versioned;

use crate::{BifrostAdmin, BifrostService};
use crate::{BifrostAdmin, BifrostService, ErrorRecoveryStrategy};

#[restate_core::test(flavor = "multi_thread", worker_threads = 2)]
#[traced_test]
Expand All @@ -476,7 +476,7 @@ mod tests {
svc.start().await.expect("loglet must start");

let mut reader = bifrost.create_reader(LOG_ID, KeyFilter::Any, read_from, Lsn::MAX)?;
let mut appender = bifrost.create_appender(LOG_ID)?;
let mut appender = bifrost.create_appender(LOG_ID, ErrorRecoveryStrategy::Wait)?;

let tail = bifrost.find_tail(LOG_ID).await?;
// no records have been written
Expand Down Expand Up @@ -558,7 +558,7 @@ mod tests {
);
svc.start().await.expect("loglet must start");

let mut appender = bifrost.create_appender(LOG_ID)?;
let mut appender = bifrost.create_appender(LOG_ID, ErrorRecoveryStrategy::Wait)?;

assert_eq!(Lsn::INVALID, bifrost.get_trim_point(LOG_ID).await?);

Expand Down Expand Up @@ -651,7 +651,7 @@ mod tests {

// create the reader and put it on the side.
let mut reader = bifrost.create_reader(LOG_ID, KeyFilter::Any, Lsn::OLDEST, Lsn::MAX)?;
let mut appender = bifrost.create_appender(LOG_ID)?;
let mut appender = bifrost.create_appender(LOG_ID, ErrorRecoveryStrategy::Wait)?;
// We should be at tail, any attempt to read will yield `pending`.
assert_that!(
futures::poll!(std::pin::pin!(reader.next())),
Expand Down Expand Up @@ -810,7 +810,7 @@ mod tests {
);
svc.start().await.expect("loglet must start");

let mut appender = bifrost.create_appender(LOG_ID)?;
let mut appender = bifrost.create_appender(LOG_ID, ErrorRecoveryStrategy::Wait)?;

let tail = bifrost.find_tail(LOG_ID).await?;
// no records have been written
Expand Down Expand Up @@ -922,7 +922,7 @@ mod tests {
.enable_in_memory_loglet();
let bifrost = svc.handle();
svc.start().await.expect("loglet must start");
let mut appender = bifrost.create_appender(LOG_ID)?;
let mut appender = bifrost.create_appender(LOG_ID, ErrorRecoveryStrategy::Wait)?;

let metadata = Metadata::current();
// prepare a chain that starts from Lsn 10 (we expect trim from OLDEST -> 9)
Expand Down
10 changes: 8 additions & 2 deletions crates/wal-protocol/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
use std::sync::Arc;

use bytes::{Bytes, BytesMut};
use restate_bifrost::Bifrost;
use restate_bifrost::{Bifrost, ErrorRecoveryStrategy};
use restate_core::{Metadata, ShutdownError};
use restate_storage_api::deduplication_table::DedupInformation;
use restate_types::identifiers::{LeaderEpoch, PartitionId, PartitionKey, WithPartitionKey};
Expand Down Expand Up @@ -231,6 +231,10 @@ pub enum Error {
///
/// Important: This method must only be called in the context of a [`TaskCenter`] task because
/// it needs access to [`metadata()`].
///
/// todo: This method should be removed in favor of using Appender/BackgroundAppender API in
/// Bifrost. Additionally, the check for partition_table is probably unnecessary in the vast
/// majority of call-sites.
pub async fn append_envelope_to_bifrost(
bifrost: &Bifrost,
envelope: Arc<Envelope>,
Expand All @@ -246,7 +250,9 @@ pub async fn append_envelope_to_bifrost(
let log_id = LogId::from(*partition_id);
// todo: Pass the envelope as `Arc` to `append_envelope_to_bifrost` instead. Possibly use
// triomphe's UniqueArc for a mutable Arc during construction.
let lsn = bifrost.append(log_id, envelope).await?;
let lsn = bifrost
.append(log_id, ErrorRecoveryStrategy::default(), envelope)
.await?;

Ok((log_id, lsn))
}
Loading

0 comments on commit 809a53f

Please sign in to comment.