Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Bifrost] Introduce ErrorRecoveryStrategy control #2446

Merged
merged 1 commit into from
Dec 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions crates/admin/src/cluster_controller/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -835,7 +835,7 @@ mod tests {
use test_log::test;

use restate_bifrost::providers::memory_loglet;
use restate_bifrost::{Bifrost, BifrostService};
use restate_bifrost::{Bifrost, BifrostService, ErrorRecoveryStrategy};
use restate_core::network::{
FailingConnector, Incoming, MessageHandler, MockPeerConnection, NetworkServerBuilder,
};
Expand Down Expand Up @@ -875,7 +875,7 @@ mod tests {
let _ = builder.build().await;
bifrost_svc.start().await?;

let mut appender = bifrost.create_appender(LOG_ID)?;
let mut appender = bifrost.create_appender(LOG_ID, ErrorRecoveryStrategy::default())?;

TaskCenter::spawn(TaskKind::SystemService, "cluster-controller", svc.run())?;

Expand Down Expand Up @@ -972,7 +972,7 @@ mod tests {
let (_node_2, _node2_reactor) =
node_2.process_with_message_handler(get_node_state_handler)?;

let mut appender = bifrost.create_appender(LOG_ID)?;
let mut appender = bifrost.create_appender(LOG_ID, ErrorRecoveryStrategy::default())?;
for i in 1..=20 {
let lsn = appender.append("").await?;
assert_eq!(Lsn::from(i), lsn);
Expand Down Expand Up @@ -1049,7 +1049,7 @@ mod tests {
let (_node_2, _node2_reactor) =
node_2.process_with_message_handler(get_node_state_handler)?;

let mut appender = bifrost.create_appender(LOG_ID)?;
let mut appender = bifrost.create_appender(LOG_ID, ErrorRecoveryStrategy::default())?;
for i in 1..=20 {
let lsn = appender.append(format!("record{i}")).await?;
assert_eq!(Lsn::from(i), lsn);
Expand Down Expand Up @@ -1112,7 +1112,7 @@ mod tests {
})
.await?;

let mut appender = bifrost.create_appender(LOG_ID)?;
let mut appender = bifrost.create_appender(LOG_ID, ErrorRecoveryStrategy::default())?;
for i in 1..=5 {
let lsn = appender.append(format!("record{i}")).await?;
assert_eq!(Lsn::from(i), lsn);
Expand Down
2 changes: 2 additions & 0 deletions crates/bifrost/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ default = []
replicated-loglet = []
memory-loglet = ["restate-types/memory-loglet"]
test-util = ["memory-loglet", "dep:googletest", "dep:restate-test-util"]
# enables bifrost to auto seal and extend. This is a transitional feature that will be removed soon.
auto-extend = []

[dependencies]
restate-core = { workspace = true }
Expand Down
12 changes: 10 additions & 2 deletions crates/bifrost/src/appender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use restate_types::logs::{LogId, Lsn, Record};
use restate_types::retries::RetryIter;
use restate_types::storage::StorageEncode;

use crate::bifrost::BifrostInner;
use crate::bifrost::{BifrostInner, ErrorRecoveryStrategy};
use crate::loglet::AppendError;
use crate::loglet_wrapper::LogletWrapper;
use crate::{Error, InputRecord, Result};
Expand All @@ -31,17 +31,25 @@ pub struct Appender {
log_id: LogId,
#[debug(skip)]
pub(super) config: Live<Configuration>,
// todo: asoli remove
#[allow(unused)]
error_recovery_strategy: ErrorRecoveryStrategy,
loglet_cache: Option<LogletWrapper>,
#[debug(skip)]
bifrost_inner: Arc<BifrostInner>,
}

impl Appender {
pub(crate) fn new(log_id: LogId, bifrost_inner: Arc<BifrostInner>) -> Self {
pub(crate) fn new(
log_id: LogId,
error_recovery_strategy: ErrorRecoveryStrategy,
bifrost_inner: Arc<BifrostInner>,
) -> Self {
let config = Configuration::updateable();
Self {
log_id,
config,
error_recovery_strategy,
loglet_cache: Default::default(),
bifrost_inner,
}
Expand Down
91 changes: 74 additions & 17 deletions crates/bifrost/src/bifrost.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,40 @@ use crate::loglet_wrapper::LogletWrapper;
use crate::watchdog::WatchdogSender;
use crate::{Error, InputRecord, LogReadStream, Result};

/// Controls how bifrost reacts when an append fails or when it observes a
/// sealed loglet while tailing a log.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ErrorRecoveryStrategy {
    /// Extend the chain eagerly: create a new loglet and append to it.
    ExtendChainPreferred,
    /// Extend the chain only after running out of patience. Others might be better suited to
    /// reconfigure the chain, but when desperate, we are allowed to seal and extend.
    ExtendChainAllowed,
    /// Never extend the chain; wait indefinitely until the error disappears.
    Wait,
}

impl ErrorRecoveryStrategy {
    /// Returns `when_enabled` if the crate was compiled with the temporary `auto-extend`
    /// feature, and [`Self::Wait`] otherwise.
    ///
    /// Keeps the feature-gate check in a single place so it can be removed in one step
    /// once the transition is complete.
    fn gated_by_auto_extend(when_enabled: Self) -> Self {
        if cfg!(feature = "auto-extend") {
            when_enabled
        } else {
            Self::Wait
        }
    }

    /// Strategy for callers that would like to extend the chain eagerly.
    ///
    /// Conditional on the temporary `auto-extend` feature gate until the transition is
    /// complete: yields [`Self::ExtendChainPreferred`] when the feature is enabled and
    /// [`Self::Wait`] when it is not.
    pub fn extend_preferred() -> Self {
        Self::gated_by_auto_extend(Self::ExtendChainPreferred)
    }
}

impl Default for ErrorRecoveryStrategy {
    /// Yields [`Self::ExtendChainAllowed`] when the `auto-extend` feature is enabled and
    /// [`Self::Wait`] when it is not.
    fn default() -> Self {
        Self::gated_by_auto_extend(Self::ExtendChainAllowed)
    }
}

/// Bifrost is Restate's durable interconnect system
///
/// Bifrost is a mutable-friendly handle to access the system. You don't need
Expand Down Expand Up @@ -97,10 +131,13 @@ impl Bifrost {
pub async fn append<T: StorageEncode>(
&self,
log_id: LogId,
error_recovery_strategy: ErrorRecoveryStrategy,
body: impl Into<InputRecord<T>>,
) -> Result<Lsn> {
self.inner.fail_if_shutting_down()?;
self.inner.append(log_id, body).await
self.inner
.append(log_id, error_recovery_strategy, body)
.await
}

/// Appends a batch of records to a log. The log id must exist, otherwise the
Expand All @@ -116,10 +153,13 @@ impl Bifrost {
pub async fn append_batch<T: StorageEncode>(
&self,
log_id: LogId,
error_recovery_strategy: ErrorRecoveryStrategy,
batch: Vec<impl Into<InputRecord<T>>>,
) -> Result<Lsn> {
self.inner.fail_if_shutting_down()?;
self.inner.append_batch(log_id, batch).await
self.inner
.append_batch(log_id, error_recovery_strategy, batch)
.await
}

/// Read the next record from the LSN provided. The `from` indicates the LSN where we will
Expand Down Expand Up @@ -171,23 +211,32 @@ impl Bifrost {
/// The best way to write to Bifrost is to hold on to an [`Appender`] and reuse it across
/// calls, this allows internal caching of recently accessed loglets and recycling write
/// buffers.
pub fn create_appender(&self, log_id: LogId) -> Result<Appender> {
pub fn create_appender(
&self,
log_id: LogId,
error_recovery_strategy: ErrorRecoveryStrategy,
) -> Result<Appender> {
self.inner.fail_if_shutting_down()?;
self.inner.check_log_id(log_id)?;
Ok(Appender::new(log_id, self.inner.clone()))
Ok(Appender::new(
log_id,
error_recovery_strategy,
self.inner.clone(),
))
}

pub fn create_background_appender<T>(
&self,
log_id: LogId,
error_recovery_strategy: ErrorRecoveryStrategy,
queue_capacity: usize,
max_batch_size: usize,
) -> Result<BackgroundAppender<T>>
where
T: StorageEncode,
{
Ok(BackgroundAppender::new(
self.create_appender(log_id)?,
self.create_appender(log_id, error_recovery_strategy)?,
queue_capacity,
max_batch_size,
))
Expand Down Expand Up @@ -279,17 +328,21 @@ impl BifrostInner {
pub async fn append<T: StorageEncode>(
self: &Arc<Self>,
log_id: LogId,
error_recovery_strategy: ErrorRecoveryStrategy,
record: impl Into<InputRecord<T>>,
) -> Result<Lsn> {
Appender::new(log_id, Arc::clone(self)).append(record).await
Appender::new(log_id, error_recovery_strategy, Arc::clone(self))
.append(record)
.await
}

pub async fn append_batch<T: StorageEncode>(
self: &Arc<Self>,
log_id: LogId,
error_recovery_strategy: ErrorRecoveryStrategy,
batch: Vec<impl Into<InputRecord<T>>>,
) -> Result<Lsn> {
Appender::new(log_id, Arc::clone(self))
Appender::new(log_id, error_recovery_strategy, Arc::clone(self))
.append_batch(batch)
.await
}
Expand Down Expand Up @@ -523,8 +576,8 @@ mod tests {

let clean_bifrost_clone = bifrost.clone();

let mut appender_0 = bifrost.create_appender(LogId::new(0))?;
let mut appender_3 = bifrost.create_appender(LogId::new(3))?;
let mut appender_0 = bifrost.create_appender(LogId::new(0), ErrorRecoveryStrategy::Wait)?;
let mut appender_3 = bifrost.create_appender(LogId::new(3), ErrorRecoveryStrategy::Wait)?;
let mut max_lsn = Lsn::INVALID;
for i in 1..=5 {
// Append a record to memory
Expand All @@ -536,13 +589,14 @@ mod tests {

// Append to a log that doesn't exist.
let invalid_log = LogId::from(num_partitions + 1);
let resp = bifrost.create_appender(invalid_log);
let resp = bifrost.create_appender(invalid_log, ErrorRecoveryStrategy::Wait);

assert_that!(resp, pat!(Err(pat!(Error::UnknownLogId(eq(invalid_log))))));

// use a cloned bifrost.
let cloned_bifrost = bifrost.clone();
let mut second_appender_0 = cloned_bifrost.create_appender(LogId::new(0))?;
let mut second_appender_0 =
cloned_bifrost.create_appender(LogId::new(0), ErrorRecoveryStrategy::Wait)?;
for _ in 1..=5 {
// Append a record to memory
let lsn = second_appender_0.append("").await?;
Expand All @@ -553,7 +607,7 @@ mod tests {

// Ensure original clone writes to the same underlying loglet.
let lsn = clean_bifrost_clone
.create_appender(LogId::new(0))?
.create_appender(LogId::new(0), ErrorRecoveryStrategy::Wait)?
.append("")
.await?;
assert_eq!(max_lsn + Lsn::from(1), lsn);
Expand Down Expand Up @@ -591,7 +645,10 @@ mod tests {
let bifrost = Bifrost::init_with_factory(factory).await;

let start = tokio::time::Instant::now();
let lsn = bifrost.create_appender(LogId::new(0))?.append("").await?;
let lsn = bifrost
.create_appender(LogId::new(0), ErrorRecoveryStrategy::Wait)?
.append("")
.await?;
assert_eq!(Lsn::from(1), lsn);
// The append was properly delayed
assert_eq!(delay, start.elapsed());
Expand All @@ -618,7 +675,7 @@ mod tests {

assert_eq!(Lsn::INVALID, bifrost.get_trim_point(LOG_ID).await?);

let mut appender = bifrost.create_appender(LOG_ID)?;
let mut appender = bifrost.create_appender(LOG_ID, ErrorRecoveryStrategy::Wait)?;
// append 10 records
for _ in 1..=10 {
appender.append("").await?;
Expand Down Expand Up @@ -687,7 +744,7 @@ mod tests {
&node_env.metadata_store_client,
);

let mut appender = bifrost.create_appender(LOG_ID)?;
let mut appender = bifrost.create_appender(LOG_ID, ErrorRecoveryStrategy::Wait)?;
// Lsns [1..5]
for i in 1..=5 {
// Append a record to memory
Expand Down Expand Up @@ -771,7 +828,7 @@ mod tests {
);

// appends should go to the new segment
let mut appender = bifrost.create_appender(LOG_ID)?;
let mut appender = bifrost.create_appender(LOG_ID, ErrorRecoveryStrategy::Wait)?;
// Lsns [5..7]
for i in 5..=7 {
// Append a record to memory
Expand Down Expand Up @@ -882,7 +939,7 @@ mod tests {
let append_counter = append_counter.clone();
let stop_signal = stop_signal.clone();
let bifrost = bifrost.clone();
let mut appender = bifrost.create_appender(LOG_ID)?;
let mut appender = bifrost.create_appender(LOG_ID, ErrorRecoveryStrategy::Wait)?;
async move {
let mut i = 0;
while !stop_signal.load(Ordering::Relaxed) {
Expand Down
2 changes: 1 addition & 1 deletion crates/bifrost/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ mod watchdog;

pub use appender::Appender;
pub use background_appender::{AppenderHandle, BackgroundAppender, CommitToken, LogSender};
pub use bifrost::Bifrost;
pub use bifrost::{Bifrost, ErrorRecoveryStrategy};
pub use bifrost_admin::{BifrostAdmin, SealedSegment};
pub use error::{Error, Result};
pub use read_stream::LogReadStream;
Expand Down
12 changes: 6 additions & 6 deletions crates/bifrost/src/read_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -454,7 +454,7 @@ mod tests {
use restate_types::metadata_store::keys::BIFROST_CONFIG_KEY;
use restate_types::Versioned;

use crate::{BifrostAdmin, BifrostService};
use crate::{BifrostAdmin, BifrostService, ErrorRecoveryStrategy};

#[restate_core::test(flavor = "multi_thread", worker_threads = 2)]
#[traced_test]
Expand All @@ -476,7 +476,7 @@ mod tests {
svc.start().await.expect("loglet must start");

let mut reader = bifrost.create_reader(LOG_ID, KeyFilter::Any, read_from, Lsn::MAX)?;
let mut appender = bifrost.create_appender(LOG_ID)?;
let mut appender = bifrost.create_appender(LOG_ID, ErrorRecoveryStrategy::Wait)?;

let tail = bifrost.find_tail(LOG_ID).await?;
// no records have been written
Expand Down Expand Up @@ -558,7 +558,7 @@ mod tests {
);
svc.start().await.expect("loglet must start");

let mut appender = bifrost.create_appender(LOG_ID)?;
let mut appender = bifrost.create_appender(LOG_ID, ErrorRecoveryStrategy::Wait)?;

assert_eq!(Lsn::INVALID, bifrost.get_trim_point(LOG_ID).await?);

Expand Down Expand Up @@ -651,7 +651,7 @@ mod tests {

// create the reader and put it on the side.
let mut reader = bifrost.create_reader(LOG_ID, KeyFilter::Any, Lsn::OLDEST, Lsn::MAX)?;
let mut appender = bifrost.create_appender(LOG_ID)?;
let mut appender = bifrost.create_appender(LOG_ID, ErrorRecoveryStrategy::Wait)?;
// We should be at tail, any attempt to read will yield `pending`.
assert_that!(
futures::poll!(std::pin::pin!(reader.next())),
Expand Down Expand Up @@ -810,7 +810,7 @@ mod tests {
);
svc.start().await.expect("loglet must start");

let mut appender = bifrost.create_appender(LOG_ID)?;
let mut appender = bifrost.create_appender(LOG_ID, ErrorRecoveryStrategy::Wait)?;

let tail = bifrost.find_tail(LOG_ID).await?;
// no records have been written
Expand Down Expand Up @@ -922,7 +922,7 @@ mod tests {
.enable_in_memory_loglet();
let bifrost = svc.handle();
svc.start().await.expect("loglet must start");
let mut appender = bifrost.create_appender(LOG_ID)?;
let mut appender = bifrost.create_appender(LOG_ID, ErrorRecoveryStrategy::Wait)?;

let metadata = Metadata::current();
// prepare a chain that starts from Lsn 10 (we expect trim from OLDEST -> 9)
Expand Down
10 changes: 8 additions & 2 deletions crates/wal-protocol/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
use std::sync::Arc;

use bytes::{Bytes, BytesMut};
use restate_bifrost::Bifrost;
use restate_bifrost::{Bifrost, ErrorRecoveryStrategy};
use restate_core::{Metadata, ShutdownError};
use restate_storage_api::deduplication_table::DedupInformation;
use restate_types::identifiers::{LeaderEpoch, PartitionId, PartitionKey, WithPartitionKey};
Expand Down Expand Up @@ -231,6 +231,10 @@ pub enum Error {
///
/// Important: This method must only be called in the context of a [`TaskCenter`] task because
/// it needs access to [`metadata()`].
///
/// todo: This method should be removed in favor of using Appender/BackgroundAppender API in
/// Bifrost. Additionally, the check for partition_table is probably unnecessary in the vast
/// majority of call-sites.
pub async fn append_envelope_to_bifrost(
bifrost: &Bifrost,
envelope: Arc<Envelope>,
Expand All @@ -246,7 +250,9 @@ pub async fn append_envelope_to_bifrost(
let log_id = LogId::from(*partition_id);
// todo: Pass the envelope as `Arc` to `append_envelope_to_bifrost` instead. Possibly use
// triomphe's UniqueArc for a mutable Arc during construction.
let lsn = bifrost.append(log_id, envelope).await?;
let lsn = bifrost
.append(log_id, ErrorRecoveryStrategy::default(), envelope)
.await?;

Ok((log_id, lsn))
}
Loading
Loading