diff --git a/crates/admin/src/cluster_controller/logs_controller.rs b/crates/admin/src/cluster_controller/logs_controller.rs index 226fef1af..0a74a7078 100644 --- a/crates/admin/src/cluster_controller/logs_controller.rs +++ b/crates/admin/src/cluster_controller/logs_controller.rs @@ -37,11 +37,11 @@ use restate_types::logs::metadata::{ Chain, DefaultProvider, LogletConfig, LogletParams, Logs, LogsConfiguration, NodeSetSelectionStrategy, ProviderKind, ReplicatedLogletConfig, SegmentIndex, }; -use restate_types::logs::{LogId, Lsn, TailState}; +use restate_types::logs::{LogId, LogletId, Lsn, TailState}; use restate_types::metadata_store::keys::BIFROST_CONFIG_KEY; use restate_types::nodes_config::NodesConfiguration; use restate_types::partition_table::PartitionTable; -use restate_types::replicated_loglet::{ReplicatedLogletId, ReplicatedLogletParams}; +use restate_types::replicated_loglet::ReplicatedLogletParams; use restate_types::retries::{RetryIter, RetryPolicy}; use restate_types::{logs, GenerationalNodeId, NodeId, PlainNodeId, Version, Versioned}; @@ -55,9 +55,6 @@ type Result = std::result::Result; const FALLBACK_MAX_RETRY_DELAY: Duration = Duration::from_secs(5); -/// A single unified id type enables easier migration between loglet types. -type LogletId = ReplicatedLogletId; - #[derive(Debug, thiserror::Error)] pub enum LogsControllerError { #[error("failed writing to the metadata store: {0}")] @@ -335,7 +332,7 @@ fn try_provisioning( #[cfg(feature = "replicated-loglet")] DefaultProvider::Replicated(ref config) => build_new_replicated_loglet_configuration( config, - ReplicatedLogletId::new(log_id, SegmentIndex::OLDEST), + LogletId::new(log_id, SegmentIndex::OLDEST), &Metadata::with_current(|m| m.nodes_config_ref()), observed_cluster_state, None, @@ -350,7 +347,7 @@ fn try_provisioning( #[cfg(feature = "replicated-loglet")] pub fn build_new_replicated_loglet_configuration( replicated_loglet_config: &ReplicatedLogletConfig, - loglet_id: ReplicatedLogletId, + loglet_id: LogletId, nodes_config: &NodesConfiguration, observed_cluster_state: &ObservedClusterState, previous_params: Option<&ReplicatedLogletParams>, @@ -1284,12 +1281,11 @@ pub mod tests { use restate_types::logs::metadata::{ DefaultProvider, LogsConfiguration, NodeSetSelectionStrategy, ReplicatedLogletConfig, }; + use restate_types::logs::LogletId; use restate_types::nodes_config::{ LogServerConfig, NodeConfig, NodesConfiguration, Role, StorageState, }; - use restate_types::replicated_loglet::{ - NodeSet, ReplicatedLogletId, ReplicatedLogletParams, ReplicationProperty, - }; + use restate_types::replicated_loglet::{NodeSet, ReplicatedLogletParams, ReplicationProperty}; use restate_types::{GenerationalNodeId, NodeId, PlainNodeId}; use crate::cluster_controller::logs_controller::{ @@ -1492,7 +1488,7 @@ pub mod tests { .build(); let seq_n0 = ReplicatedLogletParams { - loglet_id: ReplicatedLogletId::from(1), + loglet_id: LogletId::from(1), sequencer: GenerationalNodeId::new(0, 1), replication: ReplicationProperty::new(NonZeroU8::new(2).unwrap()), nodeset: NodeSet::from([0, 1, 2]), @@ -1583,7 +1579,7 @@ pub mod tests { let initial = build_new_replicated_loglet_configuration( replicated_loglet_config, - ReplicatedLogletId::from(1), + LogletId::from(1), &nodes.nodes_config, &nodes.observed_state, None, diff --git a/crates/admin/src/cluster_controller/service.rs b/crates/admin/src/cluster_controller/service.rs index a7e393196..e5694601f 100644 --- a/crates/admin/src/cluster_controller/service.rs +++ b/crates/admin/src/cluster_controller/service.rs @@ -17,6 +17,12 @@ use std::time::Duration; use anyhow::{anyhow, Context}; use codederror::CodedError; +use tokio::sync::{mpsc, oneshot}; +use tokio::time; +use tokio::time::{Instant, Interval, MissedTickBehavior}; +use tonic::codec::CompressionEncoding; +use tracing::{debug, info}; + use restate_metadata_store::ReadModifyWriteError; use restate_types::cluster_controller::SchedulingPlan; use restate_types::logs::metadata::{ @@ -28,12 +34,7 @@ use restate_types::metadata_store::keys::{ use restate_types::partition_table::{ self, PartitionTable, PartitionTableBuilder, ReplicationStrategy, }; -use restate_types::replicated_loglet::{ReplicatedLogletId, ReplicatedLogletParams}; -use tokio::sync::{mpsc, oneshot}; -use tokio::time; -use tokio::time::{Instant, Interval, MissedTickBehavior}; -use tonic::codec::CompressionEncoding; -use tracing::{debug, info}; +use restate_types::replicated_loglet::ReplicatedLogletParams; use restate_bifrost::{Bifrost, BifrostAdmin, SealedSegment}; use restate_core::metadata_store::{retry_on_network_error, MetadataStoreClient}; @@ -51,19 +52,19 @@ use restate_types::config::{AdminOptions, Configuration}; use restate_types::health::HealthStatus; use restate_types::identifiers::{PartitionId, SnapshotId}; use restate_types::live::Live; -use restate_types::logs::{LogId, Lsn}; +use restate_types::logs::{LogId, LogletId, Lsn}; use restate_types::net::metadata::MetadataKind; use restate_types::net::partition_processor_manager::CreateSnapshotRequest; use restate_types::protobuf::common::AdminStatus; use restate_types::{GenerationalNodeId, Version, Versioned}; +use self::state::ClusterControllerState; use super::cluster_state_refresher::ClusterStateRefresher; use super::grpc_svc_handler::ClusterCtrlSvcHandler; use super::protobuf::cluster_ctrl_svc_server::ClusterCtrlSvcServer; use crate::cluster_controller::logs_controller::{self, NodeSetSelectorHints}; use crate::cluster_controller::observed_cluster_state::ObservedClusterState; use crate::cluster_controller::scheduler::SchedulingPlanNodeSetSelectorHints; -use state::ClusterControllerState; #[derive(Debug, thiserror::Error, CodedError)] pub enum Error { @@ -761,13 +762,13 @@ impl SealAndExtendTask { let (loglet_id, previous_params) = match segment.config.kind { #[cfg(any(test, feature = "memory-loglet"))] ProviderKind::InMemory => { - let loglet_id = ReplicatedLogletId::from_str(&segment.config.params) - .context("Invalid loglet id")?; + let loglet_id = + LogletId::from_str(&segment.config.params).context("Invalid loglet id")?; (loglet_id, None) } ProviderKind::Local => { - let loglet_id = ReplicatedLogletId::from_str(&segment.config.params) - .context("Invalid loglet id")?; + let loglet_id = + LogletId::from_str(&segment.config.params).context("Invalid loglet id")?; (loglet_id, None) } #[cfg(feature = "replicated-loglet")] @@ -835,7 +836,7 @@ mod tests { use test_log::test; use restate_bifrost::providers::memory_loglet; - use restate_bifrost::{Bifrost, BifrostService}; + use restate_bifrost::{Bifrost, BifrostService, ErrorRecoveryStrategy}; use restate_core::network::{ FailingConnector, Incoming, MessageHandler, MockPeerConnection, NetworkServerBuilder, }; @@ -875,7 +876,7 @@ mod tests { let _ = builder.build().await; bifrost_svc.start().await?; - let mut appender = bifrost.create_appender(LOG_ID)?; + let mut appender = bifrost.create_appender(LOG_ID, ErrorRecoveryStrategy::default())?; TaskCenter::spawn(TaskKind::SystemService, "cluster-controller", svc.run())?; @@ -972,7 +973,7 @@ mod tests { let (_node_2, _node2_reactor) = node_2.process_with_message_handler(get_node_state_handler)?; - let mut appender = bifrost.create_appender(LOG_ID)?; + let mut appender = bifrost.create_appender(LOG_ID, ErrorRecoveryStrategy::default())?; for i in 1..=20 { let lsn = appender.append("").await?; assert_eq!(Lsn::from(i), lsn); @@ -1049,7 +1050,7 @@ mod tests { let (_node_2, _node2_reactor) = node_2.process_with_message_handler(get_node_state_handler)?; - let mut appender = bifrost.create_appender(LOG_ID)?; + let mut appender = bifrost.create_appender(LOG_ID, ErrorRecoveryStrategy::default())?; for i in 1..=20 { let lsn = appender.append(format!("record{i}")).await?; assert_eq!(Lsn::from(i), lsn); @@ -1112,7 +1113,7 @@ mod tests { }) .await?; - let mut appender = bifrost.create_appender(LOG_ID)?; + let mut appender = bifrost.create_appender(LOG_ID, ErrorRecoveryStrategy::default())?; for i in 1..=5 { let lsn = appender.append(format!("record{i}")).await?; assert_eq!(Lsn::from(i), lsn); diff --git a/crates/bifrost/Cargo.toml b/crates/bifrost/Cargo.toml index e3f8449fa..4b3df3785 100644 --- a/crates/bifrost/Cargo.toml +++ b/crates/bifrost/Cargo.toml @@ -12,6 +12,8 @@ default = [] replicated-loglet = [] memory-loglet = ["restate-types/memory-loglet"] test-util = ["memory-loglet", "dep:googletest", "dep:restate-test-util"] +# enables bifrost to auto seal and extend. This is a transitional feature that will be removed soon. +auto-extend = [] [dependencies] restate-core = { workspace = true } diff --git a/crates/bifrost/benches/replicated_loglet_serde.rs b/crates/bifrost/benches/replicated_loglet_serde.rs index 52be8b284..b01cc3c5f 100644 --- a/crates/bifrost/benches/replicated_loglet_serde.rs +++ b/crates/bifrost/benches/replicated_loglet_serde.rs @@ -26,11 +26,10 @@ use restate_types::identifiers::{InvocationId, LeaderEpoch, PartitionProcessorRp use restate_types::invocation::{ InvocationTarget, ServiceInvocation, ServiceInvocationSpanContext, }; -use restate_types::logs::{LogId, Record}; +use restate_types::logs::{LogId, LogletId, Record}; use restate_types::net::codec::{serialize_message, MessageBodyExt, WireDecode}; use restate_types::net::replicated_loglet::{Append, CommonRequestHeader}; use restate_types::protobuf::node::Message; -use restate_types::replicated_loglet::ReplicatedLogletId; use restate_types::time::MillisSinceEpoch; use restate_types::GenerationalNodeId; use restate_wal_protocol::{Command, Destination, Envelope}; @@ -124,7 +123,7 @@ fn serialize_append_message(payloads: Arc<[Record]>) -> anyhow::Result header: CommonRequestHeader { log_id: LogId::from(12u16), segment_index: 2.into(), - loglet_id: ReplicatedLogletId::new(12u16.into(), 4.into()), + loglet_id: LogletId::new(12u16.into(), 4.into()), }, payloads, }; diff --git a/crates/bifrost/src/appender.rs b/crates/bifrost/src/appender.rs index 65feaa37b..81d8a26ba 100644 --- a/crates/bifrost/src/appender.rs +++ b/crates/bifrost/src/appender.rs @@ -21,7 +21,7 @@ use restate_types::logs::{LogId, Lsn, Record}; use restate_types::retries::RetryIter; use restate_types::storage::StorageEncode; -use crate::bifrost::BifrostInner; +use crate::bifrost::{BifrostInner, ErrorRecoveryStrategy}; use crate::loglet::AppendError; use crate::loglet_wrapper::LogletWrapper; use crate::{Error, InputRecord, Result}; @@ -31,17 +31,25 @@ pub struct Appender { log_id: LogId, #[debug(skip)] pub(super) config: Live, + // todo: asoli remove + #[allow(unused)] + error_recovery_strategy: ErrorRecoveryStrategy, loglet_cache: Option, #[debug(skip)] bifrost_inner: Arc, } impl Appender { - pub(crate) fn new(log_id: LogId, bifrost_inner: Arc) -> Self { + pub(crate) fn new( + log_id: LogId, + error_recovery_strategy: ErrorRecoveryStrategy, + bifrost_inner: Arc, + ) -> Self { let config = Configuration::updateable(); Self { log_id, config, + error_recovery_strategy, loglet_cache: Default::default(), bifrost_inner, } diff --git a/crates/bifrost/src/bifrost.rs b/crates/bifrost/src/bifrost.rs index 6c769aac6..06b86ff76 100644 --- a/crates/bifrost/src/bifrost.rs +++ b/crates/bifrost/src/bifrost.rs @@ -27,6 +27,40 @@ use crate::loglet_wrapper::LogletWrapper; use crate::watchdog::WatchdogSender; use crate::{Error, InputRecord, LogReadStream, Result}; +/// The strategy to use when bifrost fails to append or when it observes +/// a sealed loglet while it's tailing a log. +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +pub enum ErrorRecoveryStrategy { + /// Eagerly extend the chain by creating a new loglet and appending to it. + ExtendChainPreferred, + /// Extend the chain only running out of patience, others might be better suited to reconfigure + /// the chain, but when desperate, we are allowed to seal and extend. + ExtendChainAllowed, + /// Do not extend the chain, wait indefinitely instead until the error disappears. + Wait, +} + +impl ErrorRecoveryStrategy { + /// Conditional on a temporary feature gate `auto-extend` until transition is complete + pub fn extend_preferred() -> Self { + if cfg!(feature = "auto-extend") { + Self::ExtendChainPreferred + } else { + Self::Wait + } + } +} + +impl Default for ErrorRecoveryStrategy { + fn default() -> Self { + if cfg!(feature = "auto-extend") { + Self::ExtendChainAllowed + } else { + Self::Wait + } + } +} + /// Bifrost is Restate's durable interconnect system /// /// Bifrost is a mutable-friendly handle to access the system. You don't need @@ -97,10 +131,13 @@ impl Bifrost { pub async fn append( &self, log_id: LogId, + error_recovery_strategy: ErrorRecoveryStrategy, body: impl Into>, ) -> Result { self.inner.fail_if_shutting_down()?; - self.inner.append(log_id, body).await + self.inner + .append(log_id, error_recovery_strategy, body) + .await } /// Appends a batch of records to a log. The log id must exist, otherwise the @@ -116,10 +153,13 @@ impl Bifrost { pub async fn append_batch( &self, log_id: LogId, + error_recovery_strategy: ErrorRecoveryStrategy, batch: Vec>>, ) -> Result { self.inner.fail_if_shutting_down()?; - self.inner.append_batch(log_id, batch).await + self.inner + .append_batch(log_id, error_recovery_strategy, batch) + .await } /// Read the next record from the LSN provided. The `from` indicates the LSN where we will @@ -171,15 +211,24 @@ impl Bifrost { /// The best way to write to Bifrost is to hold on to an [`Appender`] and reuse it across /// calls, this allows internal caching of recently accessed loglets and recycling write /// buffers. - pub fn create_appender(&self, log_id: LogId) -> Result { + pub fn create_appender( + &self, + log_id: LogId, + error_recovery_strategy: ErrorRecoveryStrategy, + ) -> Result { self.inner.fail_if_shutting_down()?; self.inner.check_log_id(log_id)?; - Ok(Appender::new(log_id, self.inner.clone())) + Ok(Appender::new( + log_id, + error_recovery_strategy, + self.inner.clone(), + )) } pub fn create_background_appender( &self, log_id: LogId, + error_recovery_strategy: ErrorRecoveryStrategy, queue_capacity: usize, max_batch_size: usize, ) -> Result> @@ -187,7 +236,7 @@ impl Bifrost { T: StorageEncode, { Ok(BackgroundAppender::new( - self.create_appender(log_id)?, + self.create_appender(log_id, error_recovery_strategy)?, queue_capacity, max_batch_size, )) @@ -279,17 +328,21 @@ impl BifrostInner { pub async fn append( self: &Arc, log_id: LogId, + error_recovery_strategy: ErrorRecoveryStrategy, record: impl Into>, ) -> Result { - Appender::new(log_id, Arc::clone(self)).append(record).await + Appender::new(log_id, error_recovery_strategy, Arc::clone(self)) + .append(record) + .await } pub async fn append_batch( self: &Arc, log_id: LogId, + error_recovery_strategy: ErrorRecoveryStrategy, batch: Vec>>, ) -> Result { - Appender::new(log_id, Arc::clone(self)) + Appender::new(log_id, error_recovery_strategy, Arc::clone(self)) .append_batch(batch) .await } @@ -523,8 +576,8 @@ mod tests { let clean_bifrost_clone = bifrost.clone(); - let mut appender_0 = bifrost.create_appender(LogId::new(0))?; - let mut appender_3 = bifrost.create_appender(LogId::new(3))?; + let mut appender_0 = bifrost.create_appender(LogId::new(0), ErrorRecoveryStrategy::Wait)?; + let mut appender_3 = bifrost.create_appender(LogId::new(3), ErrorRecoveryStrategy::Wait)?; let mut max_lsn = Lsn::INVALID; for i in 1..=5 { // Append a record to memory @@ -536,13 +589,14 @@ mod tests { // Append to a log that doesn't exist. let invalid_log = LogId::from(num_partitions + 1); - let resp = bifrost.create_appender(invalid_log); + let resp = bifrost.create_appender(invalid_log, ErrorRecoveryStrategy::Wait); assert_that!(resp, pat!(Err(pat!(Error::UnknownLogId(eq(invalid_log)))))); // use a cloned bifrost. let cloned_bifrost = bifrost.clone(); - let mut second_appender_0 = cloned_bifrost.create_appender(LogId::new(0))?; + let mut second_appender_0 = + cloned_bifrost.create_appender(LogId::new(0), ErrorRecoveryStrategy::Wait)?; for _ in 1..=5 { // Append a record to memory let lsn = second_appender_0.append("").await?; @@ -553,7 +607,7 @@ mod tests { // Ensure original clone writes to the same underlying loglet. let lsn = clean_bifrost_clone - .create_appender(LogId::new(0))? + .create_appender(LogId::new(0), ErrorRecoveryStrategy::Wait)? .append("") .await?; assert_eq!(max_lsn + Lsn::from(1), lsn); @@ -591,7 +645,10 @@ mod tests { let bifrost = Bifrost::init_with_factory(factory).await; let start = tokio::time::Instant::now(); - let lsn = bifrost.create_appender(LogId::new(0))?.append("").await?; + let lsn = bifrost + .create_appender(LogId::new(0), ErrorRecoveryStrategy::Wait)? + .append("") + .await?; assert_eq!(Lsn::from(1), lsn); // The append was properly delayed assert_eq!(delay, start.elapsed()); @@ -618,7 +675,7 @@ mod tests { assert_eq!(Lsn::INVALID, bifrost.get_trim_point(LOG_ID).await?); - let mut appender = bifrost.create_appender(LOG_ID)?; + let mut appender = bifrost.create_appender(LOG_ID, ErrorRecoveryStrategy::Wait)?; // append 10 records for _ in 1..=10 { appender.append("").await?; @@ -687,7 +744,7 @@ mod tests { &node_env.metadata_store_client, ); - let mut appender = bifrost.create_appender(LOG_ID)?; + let mut appender = bifrost.create_appender(LOG_ID, ErrorRecoveryStrategy::Wait)?; // Lsns [1..5] for i in 1..=5 { // Append a record to memory @@ -771,7 +828,7 @@ mod tests { ); // appends should go to the new segment - let mut appender = bifrost.create_appender(LOG_ID)?; + let mut appender = bifrost.create_appender(LOG_ID, ErrorRecoveryStrategy::Wait)?; // Lsns [5..7] for i in 5..=7 { // Append a record to memory @@ -882,7 +939,7 @@ mod tests { let append_counter = append_counter.clone(); let stop_signal = stop_signal.clone(); let bifrost = bifrost.clone(); - let mut appender = bifrost.create_appender(LOG_ID)?; + let mut appender = bifrost.create_appender(LOG_ID, ErrorRecoveryStrategy::Wait)?; async move { let mut i = 0; while !stop_signal.load(Ordering::Relaxed) { diff --git a/crates/bifrost/src/lib.rs b/crates/bifrost/src/lib.rs index 7cfaf7dbb..305a07c63 100644 --- a/crates/bifrost/src/lib.rs +++ b/crates/bifrost/src/lib.rs @@ -24,7 +24,7 @@ mod watchdog; pub use appender::Appender; pub use background_appender::{AppenderHandle, BackgroundAppender, CommitToken, LogSender}; -pub use bifrost::Bifrost; +pub use bifrost::{Bifrost, ErrorRecoveryStrategy}; pub use bifrost_admin::{BifrostAdmin, SealedSegment}; pub use error::{Error, Result}; pub use read_stream::LogReadStream; diff --git a/crates/bifrost/src/providers/replicated_loglet/error.rs b/crates/bifrost/src/providers/replicated_loglet/error.rs index 9552f1781..775709cbf 100644 --- a/crates/bifrost/src/providers/replicated_loglet/error.rs +++ b/crates/bifrost/src/providers/replicated_loglet/error.rs @@ -13,8 +13,7 @@ use std::sync::Arc; use restate_core::ShutdownError; use restate_types::errors::MaybeRetryableError; use restate_types::logs::metadata::SegmentIndex; -use restate_types::logs::LogId; -use restate_types::replicated_loglet::ReplicatedLogletId; +use restate_types::logs::{LogId, LogletId}; use crate::loglet::OperationError; @@ -25,7 +24,7 @@ pub(crate) enum ReplicatedLogletError { #[error("cannot find the tail of the loglet: {0}")] FindTailFailed(String), #[error("could not seal loglet_id={0}, insufficient nodes available for seal")] - SealFailed(ReplicatedLogletId), + SealFailed(LogletId), #[error(transparent)] Shutdown(#[from] ShutdownError), } diff --git a/crates/bifrost/src/providers/replicated_loglet/loglet.rs b/crates/bifrost/src/providers/replicated_loglet/loglet.rs index 6b3c04a1e..8e176db18 100644 --- a/crates/bifrost/src/providers/replicated_loglet/loglet.rs +++ b/crates/bifrost/src/providers/replicated_loglet/loglet.rs @@ -346,8 +346,8 @@ mod tests { use restate_types::config::{set_current_config, Configuration}; use restate_types::health::HealthStatus; use restate_types::live::Live; - use restate_types::logs::Keys; - use restate_types::replicated_loglet::{NodeSet, ReplicatedLogletId, ReplicationProperty}; + use restate_types::logs::{Keys, LogletId}; + use restate_types::replicated_loglet::{NodeSet, ReplicationProperty}; use restate_types::{GenerationalNodeId, PlainNodeId}; use crate::loglet::{AppendError, Loglet}; @@ -421,7 +421,7 @@ mod tests { // ** Single-node replicated-loglet smoke tests ** #[test(restate_core::test(start_paused = true))] async fn test_append_local_sequencer_single_node() -> Result<()> { - let loglet_id = ReplicatedLogletId::new_unchecked(122); + let loglet_id = LogletId::new_unchecked(122); let params = ReplicatedLogletParams { loglet_id, sequencer: GenerationalNodeId::new(1, 1), @@ -464,7 +464,7 @@ mod tests { // ** Single-node replicated-loglet seal ** #[test(restate_core::test(start_paused = true))] async fn test_seal_local_sequencer_single_node() -> Result<()> { - let loglet_id = ReplicatedLogletId::new_unchecked(122); + let loglet_id = LogletId::new_unchecked(122); let params = ReplicatedLogletParams { loglet_id, sequencer: GenerationalNodeId::new(1, 1), @@ -513,7 +513,7 @@ mod tests { #[test(restate_core::test(start_paused = true))] async fn single_node_gapless_loglet_smoke_test() -> Result<()> { let record_cache = RecordCache::new(1_000_000); - let loglet_id = ReplicatedLogletId::new_unchecked(122); + let loglet_id = LogletId::new_unchecked(122); let params = ReplicatedLogletParams { loglet_id, sequencer: GenerationalNodeId::new(1, 1), @@ -528,7 +528,7 @@ mod tests { #[test(restate_core::test(start_paused = true))] async fn single_node_single_loglet_readstream() -> Result<()> { - let loglet_id = ReplicatedLogletId::new_unchecked(122); + let loglet_id = LogletId::new_unchecked(122); let params = ReplicatedLogletParams { loglet_id, sequencer: GenerationalNodeId::new(1, 1), @@ -544,7 +544,7 @@ mod tests { #[test(restate_core::test(start_paused = true))] async fn single_node_single_loglet_readstream_with_trims() -> Result<()> { - let loglet_id = ReplicatedLogletId::new_unchecked(122); + let loglet_id = LogletId::new_unchecked(122); let params = ReplicatedLogletParams { loglet_id, sequencer: GenerationalNodeId::new(1, 1), @@ -567,7 +567,7 @@ mod tests { #[test(restate_core::test(start_paused = true))] async fn single_node_append_after_seal() -> Result<()> { - let loglet_id = ReplicatedLogletId::new_unchecked(122); + let loglet_id = LogletId::new_unchecked(122); let params = ReplicatedLogletParams { loglet_id, sequencer: GenerationalNodeId::new(1, 1), @@ -583,7 +583,7 @@ mod tests { #[test(restate_core::test(start_paused = true))] async fn single_node_append_after_seal_concurrent() -> Result<()> { - let loglet_id = ReplicatedLogletId::new_unchecked(122); + let loglet_id = LogletId::new_unchecked(122); let params = ReplicatedLogletParams { loglet_id, sequencer: GenerationalNodeId::new(1, 1), @@ -600,7 +600,7 @@ mod tests { #[test(restate_core::test(start_paused = true))] async fn single_node_seal_empty() -> Result<()> { - let loglet_id = ReplicatedLogletId::new_unchecked(122); + let loglet_id = LogletId::new_unchecked(122); let params = ReplicatedLogletParams { loglet_id, sequencer: GenerationalNodeId::new(1, 1), diff --git a/crates/bifrost/src/providers/replicated_loglet/sequencer.rs b/crates/bifrost/src/providers/replicated_loglet/sequencer.rs index c8eca92d5..1306a53ce 100644 --- a/crates/bifrost/src/providers/replicated_loglet/sequencer.rs +++ b/crates/bifrost/src/providers/replicated_loglet/sequencer.rs @@ -26,9 +26,9 @@ use restate_core::{ }; use restate_types::{ config::Configuration, - logs::{LogletOffset, Record, RecordCache, SequenceNumber}, + logs::{LogletId, LogletOffset, Record, RecordCache, SequenceNumber}, net::log_server::Store, - replicated_loglet::{NodeSet, ReplicatedLogletId, ReplicatedLogletParams, ReplicationProperty}, + replicated_loglet::{NodeSet, ReplicatedLogletParams, ReplicationProperty}, GenerationalNodeId, }; @@ -74,7 +74,7 @@ impl SequencerSharedState { &self.my_params } - pub fn loglet_id(&self) -> &ReplicatedLogletId { + pub fn loglet_id(&self) -> &LogletId { &self.my_params.loglet_id } } diff --git a/crates/bifrost/src/providers/replicated_loglet/tasks/digests.rs b/crates/bifrost/src/providers/replicated_loglet/tasks/digests.rs index 134da9015..794febb0f 100644 --- a/crates/bifrost/src/providers/replicated_loglet/tasks/digests.rs +++ b/crates/bifrost/src/providers/replicated_loglet/tasks/digests.rs @@ -17,12 +17,12 @@ use tracing::{debug, trace, warn}; use restate_core::network::rpc_router::{RpcError, RpcRouter}; use restate_core::network::{Networking, TransportConnect}; use restate_core::{cancellation_watcher, ShutdownError, TaskCenterFutureExt}; -use restate_types::logs::{LogletOffset, SequenceNumber}; +use restate_types::logs::{LogletId, LogletOffset, SequenceNumber}; use restate_types::net::log_server::{ Digest, LogServerRequestHeader, RecordStatus, Status, Store, StoreFlags, }; use restate_types::nodes_config::NodesConfiguration; -use restate_types::replicated_loglet::{NodeSet, ReplicatedLogletId, ReplicatedLogletParams}; +use restate_types::replicated_loglet::{NodeSet, ReplicatedLogletParams}; use restate_types::{GenerationalNodeId, PlainNodeId}; use crate::loglet::util::TailOffsetWatch; @@ -40,7 +40,7 @@ struct ReplicationFailed; /// Tracks digest responses and record repairs to achieve a consistent and durable /// state of the loglet tail. pub struct Digests { - loglet_id: ReplicatedLogletId, + loglet_id: LogletId, // inclusive. The first record we need to repair. start_offset: LogletOffset, // exclusive (this should be the durable global_tail after finishing) diff --git a/crates/bifrost/src/providers/replicated_loglet/tasks/find_tail.rs b/crates/bifrost/src/providers/replicated_loglet/tasks/find_tail.rs index ac5e09708..ced39e8a7 100644 --- a/crates/bifrost/src/providers/replicated_loglet/tasks/find_tail.rs +++ b/crates/bifrost/src/providers/replicated_loglet/tasks/find_tail.rs @@ -18,12 +18,10 @@ use restate_core::network::{Networking, TransportConnect}; use restate_core::TaskCenterFutureExt; use restate_types::config::Configuration; use restate_types::logs::metadata::SegmentIndex; -use restate_types::logs::{LogId, LogletOffset, RecordCache, SequenceNumber}; +use restate_types::logs::{LogId, LogletId, LogletOffset, RecordCache, SequenceNumber}; use restate_types::net::log_server::{GetLogletInfo, LogServerRequestHeader, Status, WaitForTail}; use restate_types::net::replicated_loglet::{CommonRequestHeader, GetSequencerState}; -use restate_types::replicated_loglet::{ - EffectiveNodeSet, ReplicatedLogletId, ReplicatedLogletParams, -}; +use restate_types::replicated_loglet::{EffectiveNodeSet, ReplicatedLogletParams}; use restate_types::PlainNodeId; use super::{NodeTailStatus, RepairTail, RepairTailResult, SealTask}; @@ -485,7 +483,7 @@ impl FindTailTask { pub(super) struct FindTailOnNode<'a> { pub(super) node_id: PlainNodeId, - pub(super) loglet_id: ReplicatedLogletId, + pub(super) loglet_id: LogletId, pub(super) get_loglet_info_rpc: &'a RpcRouter, pub(super) known_global_tail: &'a TailOffsetWatch, } @@ -603,7 +601,7 @@ impl<'a> FindTailOnNode<'a> { struct WaitForTailOnNode { node_id: PlainNodeId, - loglet_id: ReplicatedLogletId, + loglet_id: LogletId, wait_for_tail_rpc: RpcRouter, known_global_tail: TailOffsetWatch, } diff --git a/crates/bifrost/src/providers/replicated_loglet/tasks/periodic_tail_checker.rs b/crates/bifrost/src/providers/replicated_loglet/tasks/periodic_tail_checker.rs index ae4098f44..aef25acba 100644 --- a/crates/bifrost/src/providers/replicated_loglet/tasks/periodic_tail_checker.rs +++ b/crates/bifrost/src/providers/replicated_loglet/tasks/periodic_tail_checker.rs @@ -15,7 +15,7 @@ use tokio::time::Instant; use tracing::{debug, trace}; use restate_core::network::TransportConnect; -use restate_types::replicated_loglet::ReplicatedLogletId; +use restate_types::logs::LogletId; use crate::loglet::{Loglet, OperationError}; use crate::providers::replicated_loglet::loglet::ReplicatedLoglet; @@ -24,7 +24,7 @@ pub struct PeriodicTailChecker {} impl PeriodicTailChecker { pub async fn run( - loglet_id: ReplicatedLogletId, + loglet_id: LogletId, loglet: Weak>, duration: Duration, ) -> anyhow::Result<()> { diff --git a/crates/bifrost/src/providers/replicated_loglet/tasks/seal.rs b/crates/bifrost/src/providers/replicated_loglet/tasks/seal.rs index eafaca25e..f79a28ee6 100644 --- a/crates/bifrost/src/providers/replicated_loglet/tasks/seal.rs +++ b/crates/bifrost/src/providers/replicated_loglet/tasks/seal.rs @@ -15,11 +15,9 @@ use restate_core::network::rpc_router::{RpcError, RpcRouter}; use restate_core::network::{Incoming, Networking, TransportConnect}; use restate_core::{TaskCenter, TaskKind}; use restate_types::config::Configuration; -use restate_types::logs::{LogletOffset, SequenceNumber}; +use restate_types::logs::{LogletId, LogletOffset, SequenceNumber}; use restate_types::net::log_server::{LogServerRequestHeader, Seal, Sealed, Status}; -use restate_types::replicated_loglet::{ - EffectiveNodeSet, NodeSet, ReplicatedLogletId, ReplicatedLogletParams, -}; +use restate_types::replicated_loglet::{EffectiveNodeSet, NodeSet, ReplicatedLogletParams}; use restate_types::retries::RetryPolicy; use restate_types::{GenerationalNodeId, PlainNodeId}; @@ -132,7 +130,7 @@ impl SealTask { struct SealSingleNode { node_id: PlainNodeId, - loglet_id: ReplicatedLogletId, + loglet_id: LogletId, sequencer: GenerationalNodeId, seal_router: RpcRouter, networking: Networking, diff --git a/crates/bifrost/src/read_stream.rs b/crates/bifrost/src/read_stream.rs index 8b95884b3..8ac81c3ee 100644 --- a/crates/bifrost/src/read_stream.rs +++ b/crates/bifrost/src/read_stream.rs @@ -454,7 +454,7 @@ mod tests { use restate_types::metadata_store::keys::BIFROST_CONFIG_KEY; use restate_types::Versioned; - use crate::{BifrostAdmin, BifrostService}; + use crate::{BifrostAdmin, BifrostService, ErrorRecoveryStrategy}; #[restate_core::test(flavor = "multi_thread", worker_threads = 2)] #[traced_test] @@ -476,7 +476,7 @@ mod tests { svc.start().await.expect("loglet must start"); let mut reader = bifrost.create_reader(LOG_ID, KeyFilter::Any, read_from, Lsn::MAX)?; - let mut appender = bifrost.create_appender(LOG_ID)?; + let mut appender = bifrost.create_appender(LOG_ID, ErrorRecoveryStrategy::Wait)?; let tail = bifrost.find_tail(LOG_ID).await?; // no records have been written @@ -558,7 +558,7 @@ mod tests { ); svc.start().await.expect("loglet must start"); - let mut appender = bifrost.create_appender(LOG_ID)?; + let mut appender = bifrost.create_appender(LOG_ID, ErrorRecoveryStrategy::Wait)?; assert_eq!(Lsn::INVALID, bifrost.get_trim_point(LOG_ID).await?); @@ -651,7 +651,7 @@ mod tests { // create the reader and put it on the side. let mut reader = bifrost.create_reader(LOG_ID, KeyFilter::Any, Lsn::OLDEST, Lsn::MAX)?; - let mut appender = bifrost.create_appender(LOG_ID)?; + let mut appender = bifrost.create_appender(LOG_ID, ErrorRecoveryStrategy::Wait)?; // We should be at tail, any attempt to read will yield `pending`. assert_that!( futures::poll!(std::pin::pin!(reader.next())), @@ -810,7 +810,7 @@ mod tests { ); svc.start().await.expect("loglet must start"); - let mut appender = bifrost.create_appender(LOG_ID)?; + let mut appender = bifrost.create_appender(LOG_ID, ErrorRecoveryStrategy::Wait)?; let tail = bifrost.find_tail(LOG_ID).await?; // no records have been written @@ -922,7 +922,7 @@ mod tests { .enable_in_memory_loglet(); let bifrost = svc.handle(); svc.start().await.expect("loglet must start"); - let mut appender = bifrost.create_appender(LOG_ID)?; + let mut appender = bifrost.create_appender(LOG_ID, ErrorRecoveryStrategy::Wait)?; let metadata = Metadata::current(); // prepare a chain that starts from Lsn 10 (we expect trim from OLDEST -> 9) diff --git a/crates/log-server/src/grpc_svc_handler.rs b/crates/log-server/src/grpc_svc_handler.rs index db347ecac..6563622d7 100644 --- a/crates/log-server/src/grpc_svc_handler.rs +++ b/crates/log-server/src/grpc_svc_handler.rs @@ -11,9 +11,8 @@ use async_trait::async_trait; use tonic::{Request, Response, Status}; -use restate_types::logs::{LogletOffset, RecordCache, SequenceNumber}; +use restate_types::logs::{LogletId, LogletOffset, RecordCache, SequenceNumber}; use restate_types::net::log_server::{GetDigest, LogServerResponseHeader, LogletInfo}; -use restate_types::replicated_loglet::ReplicatedLogletId; use crate::logstore::LogStore; use crate::metadata::LogletStateMap; @@ -51,7 +50,7 @@ where request: Request, ) -> Result, Status> { let request = request.into_inner(); - let loglet_id = ReplicatedLogletId::from(request.loglet_id); + let loglet_id = LogletId::from(request.loglet_id); let state = self .state_map .get_or_load(loglet_id, &self.log_store) @@ -82,7 +81,7 @@ where request: Request, ) -> Result, Status> { let request = request.into_inner(); - let loglet_id = ReplicatedLogletId::from(request.loglet_id); + let loglet_id = LogletId::from(request.loglet_id); let state = self .state_map .get_or_load(loglet_id, &self.log_store) diff --git a/crates/log-server/src/loglet_worker.rs b/crates/log-server/src/loglet_worker.rs index 603403eea..40b324d99 100644 --- a/crates/log-server/src/loglet_worker.rs +++ b/crates/log-server/src/loglet_worker.rs @@ -16,9 +16,8 @@ use tracing::{debug, trace, trace_span, warn, Instrument}; use restate_core::network::Incoming; use restate_core::{cancellation_watcher, ShutdownError, TaskCenter, TaskHandle, TaskKind}; -use restate_types::logs::{LogletOffset, SequenceNumber}; +use restate_types::logs::{LogletId, LogletOffset, SequenceNumber}; use restate_types::net::log_server::*; -use restate_types::replicated_loglet::ReplicatedLogletId; use restate_types::GenerationalNodeId; use crate::logstore::{AsyncToken, LogStore}; @@ -93,14 +92,14 @@ impl LogletWorkerHandle { } pub struct LogletWorker { - loglet_id: ReplicatedLogletId, + loglet_id: LogletId, log_store: S, loglet_state: LogletState, } impl LogletWorker { pub fn start( - loglet_id: ReplicatedLogletId, + loglet_id: LogletId, log_store: S, loglet_state: LogletState, ) -> Result { @@ -613,7 +612,6 @@ mod tests { use restate_types::logs::{KeyFilter, Keys, Record, RecordCache}; use restate_types::net::codec::MessageBodyExt; use restate_types::net::CURRENT_PROTOCOL_VERSION; - use restate_types::replicated_loglet::ReplicatedLogletId; use crate::metadata::LogletStateMap; use crate::rocksdb_logstore::{RocksDbLogStore, RocksDbLogStoreBuilder}; @@ -642,7 +640,7 @@ mod tests { async fn test_simple_store_flow() -> Result<()> { let log_store = setup().await?; const SEQUENCER: GenerationalNodeId = GenerationalNodeId::new(1, 1); - const LOGLET: ReplicatedLogletId = ReplicatedLogletId::new_unchecked(1); + const LOGLET: LogletId = LogletId::new_unchecked(1); let loglet_state_map = LogletStateMap::default(); let (net_tx, mut net_rx) = mpsc::channel(10); let connection = OwnedConnection::new_fake(SEQUENCER, CURRENT_PROTOCOL_VERSION, net_tx); @@ -718,7 +716,7 @@ mod tests { async fn test_store_and_seal() -> Result<()> { let log_store = setup().await?; const SEQUENCER: GenerationalNodeId = GenerationalNodeId::new(1, 1); - const LOGLET: ReplicatedLogletId = ReplicatedLogletId::new_unchecked(1); + const LOGLET: LogletId = LogletId::new_unchecked(1); let loglet_state_map = LogletStateMap::default(); let (net_tx, mut net_rx) = mpsc::channel(10); let connection = OwnedConnection::new_fake(SEQUENCER, CURRENT_PROTOCOL_VERSION, net_tx); @@ -878,7 +876,7 @@ mod tests { let log_store = setup().await?; const SEQUENCER: GenerationalNodeId = GenerationalNodeId::new(1, 1); const PEER: GenerationalNodeId = GenerationalNodeId::new(2, 2); - const LOGLET: ReplicatedLogletId = ReplicatedLogletId::new_unchecked(1); + const LOGLET: LogletId = LogletId::new_unchecked(1); let loglet_state_map = LogletStateMap::default(); let (net_tx, mut net_rx) = mpsc::channel(10); let connection = OwnedConnection::new_fake(SEQUENCER, CURRENT_PROTOCOL_VERSION, net_tx); @@ -1041,7 +1039,7 @@ mod tests { async fn test_simple_get_records_flow() -> Result<()> { let log_store = setup().await?; const SEQUENCER: GenerationalNodeId = GenerationalNodeId::new(1, 1); - const LOGLET: ReplicatedLogletId = ReplicatedLogletId::new_unchecked(1); + const LOGLET: LogletId = LogletId::new_unchecked(1); let loglet_state_map = LogletStateMap::default(); let (net_tx, mut net_rx) = mpsc::channel(10); let connection = OwnedConnection::new_fake(SEQUENCER, CURRENT_PROTOCOL_VERSION, net_tx); @@ -1258,7 +1256,7 @@ mod tests { async fn test_trim_basics() -> Result<()> { let log_store = setup().await?; const SEQUENCER: GenerationalNodeId = GenerationalNodeId::new(1, 1); - const LOGLET: ReplicatedLogletId = ReplicatedLogletId::new_unchecked(1); + const LOGLET: LogletId = LogletId::new_unchecked(1); let loglet_state_map = LogletStateMap::default(); let (net_tx, mut net_rx) = mpsc::channel(10); let connection = OwnedConnection::new_fake(SEQUENCER, CURRENT_PROTOCOL_VERSION, net_tx); diff --git a/crates/log-server/src/logstore.rs b/crates/log-server/src/logstore.rs index 0471aed4c..730519f8a 100644 --- a/crates/log-server/src/logstore.rs +++ b/crates/log-server/src/logstore.rs @@ -15,8 +15,8 @@ use tokio::sync::oneshot; use restate_bifrost::loglet::OperationError; use restate_core::ShutdownError; +use restate_types::logs::LogletId; use restate_types::net::log_server::{Digest, GetDigest, GetRecords, Records, Seal, Store, Trim}; -use restate_types::replicated_loglet::ReplicatedLogletId; use crate::metadata::{LogStoreMarker, LogletState}; @@ -32,7 +32,7 @@ pub trait LogStore: Clone + Send + 'static { /// [`LogletState`] will not observe the values in this one. fn load_loglet_state( &self, - loglet_id: ReplicatedLogletId, + loglet_id: LogletId, ) -> impl Future> + Send; fn enqueue_store( diff --git a/crates/log-server/src/metadata.rs b/crates/log-server/src/metadata.rs index 5f5ad5fb8..4bfc9e72e 100644 --- a/crates/log-server/src/metadata.rs +++ b/crates/log-server/src/metadata.rs @@ -19,8 +19,7 @@ use xxhash_rust::xxh3::Xxh3Builder; use restate_bifrost::loglet::util::TailOffsetWatch; use restate_bifrost::loglet::OperationError; use restate_core::ShutdownError; -use restate_types::logs::{LogletOffset, SequenceNumber, TailState}; -use restate_types::replicated_loglet::ReplicatedLogletId; +use restate_types::logs::{LogletId, LogletOffset, SequenceNumber, TailState}; use restate_types::{GenerationalNodeId, PlainNodeId}; use crate::logstore::LogStore; @@ -65,7 +64,7 @@ impl LogStoreMarker { /// Caches loglet state in memory #[derive(Default, Clone)] pub struct LogletStateMap { - inner: Arc>>, + inner: Arc>>, } impl LogletStateMap { @@ -76,7 +75,7 @@ impl LogletStateMap { pub async fn get_or_load( &self, - loglet_id: ReplicatedLogletId, + loglet_id: LogletId, log_store: &S, ) -> Result { let mut guard = self.inner.lock().await; diff --git a/crates/log-server/src/network.rs b/crates/log-server/src/network.rs index d8ebad0d0..034c2a4e4 100644 --- a/crates/log-server/src/network.rs +++ b/crates/log-server/src/network.rs @@ -25,10 +25,10 @@ use restate_core::network::{Incoming, MessageRouterBuilder, MessageStream}; use restate_types::config::Configuration; use restate_types::health::HealthStatus; use restate_types::live::Live; +use restate_types::logs::LogletId; use restate_types::net::log_server::*; use restate_types::nodes_config::StorageState; use restate_types::protobuf::common::LogServerStatus; -use restate_types::replicated_loglet::ReplicatedLogletId; use crate::loglet_worker::{LogletWorker, LogletWorkerHandle}; use crate::logstore::LogStore; @@ -36,7 +36,7 @@ use crate::metadata::LogletStateMap; const DEFAULT_WRITERS_CAPACITY: usize = 128; -type LogletWorkerMap = HashMap; +type LogletWorkerMap = HashMap; pub struct RequestPump { _configuration: Live, @@ -312,7 +312,7 @@ impl RequestPump { } async fn find_or_create_worker<'a, S: LogStore>( - loglet_id: ReplicatedLogletId, + loglet_id: LogletId, log_store: &S, state_map: &LogletStateMap, loglet_workers: &'a mut LogletWorkerMap, diff --git a/crates/log-server/src/rocksdb_logstore/keys.rs b/crates/log-server/src/rocksdb_logstore/keys.rs index c7500e12e..74eaa3103 100644 --- a/crates/log-server/src/rocksdb_logstore/keys.rs +++ b/crates/log-server/src/rocksdb_logstore/keys.rs @@ -12,8 +12,7 @@ use std::mem::size_of; use bytes::{Buf, BufMut, Bytes, BytesMut}; -use restate_types::logs::LogletOffset; -use restate_types::replicated_loglet::ReplicatedLogletId; +use restate_types::logs::{LogletId, LogletOffset}; // log-store marker pub(super) const MARKER_KEY: &[u8] = b"storage-marker"; @@ -40,11 +39,11 @@ pub(super) enum KeyPrefixKind { #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub(super) struct KeyPrefix { kind: KeyPrefixKind, - loglet_id: ReplicatedLogletId, + loglet_id: LogletId, } impl KeyPrefix { - pub fn new(kind: KeyPrefixKind, loglet_id: ReplicatedLogletId) -> Self { + pub fn new(kind: KeyPrefixKind, loglet_id: LogletId) -> Self { Self { kind, loglet_id } } @@ -73,12 +72,12 @@ impl KeyPrefix { fn decode(buf: &mut B) -> KeyPrefix { let kind = KeyPrefixKind::try_from(buf.get_u8()).expect("recognized key kind"); - let loglet_id = ReplicatedLogletId::from(buf.get_u64()); + let loglet_id = LogletId::from(buf.get_u64()); Self { kind, loglet_id } } pub(super) const fn size() -> usize { - size_of::() + size_of::() + size_of::() + size_of::() } } @@ -89,14 +88,14 @@ pub(super) struct DataRecordKey { } impl DataRecordKey { - pub fn new(loglet_id: ReplicatedLogletId, offset: LogletOffset) -> Self { + pub fn new(loglet_id: LogletId, offset: LogletOffset) -> Self { Self { prefix: KeyPrefix::new(KeyPrefixKind::DataRecord, loglet_id), offset, } } - pub fn loglet_id(&self) -> ReplicatedLogletId { + pub fn loglet_id(&self) -> LogletId { self.prefix.loglet_id } @@ -104,7 +103,7 @@ impl DataRecordKey { self.offset } - pub fn exclusive_upper_bound(loglet_id: ReplicatedLogletId) -> BytesMut { + pub fn exclusive_upper_bound(loglet_id: LogletId) -> BytesMut { let mut buf = BytesMut::with_capacity(Self::size()); KeyPrefix::new(KeyPrefixKind::DataRecord, loglet_id).encode_exclusive_upper_bound(&mut buf); buf.put_u64(0); @@ -151,7 +150,7 @@ pub(super) struct MetadataKey { } impl MetadataKey { - pub fn new(kind: KeyPrefixKind, loglet_id: ReplicatedLogletId) -> Self { + pub fn new(kind: KeyPrefixKind, loglet_id: LogletId) -> Self { // Just a sanity check debug_assert_ne!(kind, KeyPrefixKind::DataRecord); Self { @@ -160,7 +159,7 @@ impl MetadataKey { } #[allow(unused)] - pub fn loglet_id(&self) -> ReplicatedLogletId { + pub fn loglet_id(&self) -> LogletId { self.prefix.loglet_id } diff --git a/crates/log-server/src/rocksdb_logstore/store.rs b/crates/log-server/src/rocksdb_logstore/store.rs index 91a72b03d..7c8a0ab2b 100644 --- a/crates/log-server/src/rocksdb_logstore/store.rs +++ b/crates/log-server/src/rocksdb_logstore/store.rs @@ -20,13 +20,12 @@ use restate_rocksdb::{IoMode, Priority, RocksDb}; use restate_types::config::LogServerOptions; use restate_types::health::HealthStatus; use restate_types::live::BoxedLiveLoad; -use restate_types::logs::{LogletOffset, SequenceNumber}; +use restate_types::logs::{LogletId, LogletOffset, SequenceNumber}; use restate_types::net::log_server::{ Digest, DigestEntry, Gap, GetDigest, GetRecords, LogServerResponseHeader, MaybeRecord, RecordStatus, Records, Seal, Store, Trim, }; use restate_types::protobuf::common::LogServerStatus; -use restate_types::replicated_loglet::ReplicatedLogletId; use restate_types::GenerationalNodeId; use super::keys::{KeyPrefixKind, MetadataKey, MARKER_KEY}; @@ -100,10 +99,7 @@ impl LogStore for RocksDbLogStore { Ok(()) } - async fn load_loglet_state( - &self, - loglet_id: ReplicatedLogletId, - ) -> Result { + async fn load_loglet_state(&self, loglet_id: LogletId) -> Result { let start = Instant::now(); let metadata_cf = self.metadata_cf(); let data_cf = self.data_cf(); @@ -501,11 +497,10 @@ mod tests { use restate_rocksdb::RocksDbManager; use restate_types::config::Configuration; use restate_types::live::Live; - use restate_types::logs::{LogletOffset, Record, RecordCache, SequenceNumber}; + use restate_types::logs::{LogletId, LogletOffset, Record, RecordCache, SequenceNumber}; use restate_types::net::log_server::{ DigestEntry, GetDigest, LogServerRequestHeader, RecordStatus, Status, Store, StoreFlags, }; - use restate_types::replicated_loglet::ReplicatedLogletId; use restate_types::{GenerationalNodeId, PlainNodeId}; use super::RocksDbLogStore; @@ -554,8 +549,8 @@ mod tests { async fn test_load_loglet_state() -> Result<()> { let log_store = setup().await?; // fresh/unknown loglet - let loglet_id_1 = ReplicatedLogletId::new_unchecked(88); - let loglet_id_2 = ReplicatedLogletId::new_unchecked(89); + let loglet_id_1 = LogletId::new_unchecked(88); + let loglet_id_2 = LogletId::new_unchecked(89); let sequencer_1 = GenerationalNodeId::new(5, 213); let sequencer_2 = GenerationalNodeId::new(2, 212); @@ -645,8 +640,8 @@ mod tests { #[test(restate_core::test(start_paused = true))] async fn test_digest() -> Result<()> { let log_store = setup().await?; - let loglet_id_1 = ReplicatedLogletId::new_unchecked(88); - let loglet_id_2 = ReplicatedLogletId::new_unchecked(89); + let loglet_id_1 = LogletId::new_unchecked(88); + let loglet_id_2 = LogletId::new_unchecked(89); let sequencer_1 = GenerationalNodeId::new(5, 213); let sequencer_2 = GenerationalNodeId::new(2, 212); diff --git a/crates/log-server/src/rocksdb_logstore/writer.rs b/crates/log-server/src/rocksdb_logstore/writer.rs index 08498c49a..edb6b6541 100644 --- a/crates/log-server/src/rocksdb_logstore/writer.rs +++ b/crates/log-server/src/rocksdb_logstore/writer.rs @@ -33,8 +33,7 @@ use restate_core::{cancellation_watcher, ShutdownError, TaskCenter, TaskKind}; use restate_rocksdb::{IoMode, Priority, RocksDb}; use restate_types::config::LogServerOptions; use restate_types::live::BoxedLiveLoad; -use restate_types::logs::{LogletOffset, Record, RecordCache, SequenceNumber}; -use restate_types::replicated_loglet::ReplicatedLogletId; +use restate_types::logs::{LogletId, LogletOffset, Record, RecordCache, SequenceNumber}; use super::keys::{DataRecordKey, KeyPrefixKind, MetadataKey}; use super::record_format::DataRecordEncoder; @@ -51,7 +50,7 @@ const RECORD_SIZE_GUESS: usize = 4_096; // Estimate 4KiB per record const INITIAL_SERDE_BUFFER_SIZE: usize = 16_384; // Initial capacity 16KiB pub struct LogStoreWriteCommand { - loglet_id: ReplicatedLogletId, + loglet_id: LogletId, data_update: Option, metadata_update: Option, ack: Option, @@ -220,7 +219,7 @@ impl LogStoreWriter { fn update_metadata( metadata_cf: &Arc, write_batch: &mut WriteBatch, - loglet_id: ReplicatedLogletId, + loglet_id: LogletId, update: MetadataUpdate, buffer: &mut BytesMut, ) { @@ -248,7 +247,7 @@ impl LogStoreWriter { fn trim_log_records( data_cf: &Arc, write_batch: &mut WriteBatch, - loglet_id: ReplicatedLogletId, + loglet_id: LogletId, trim_point: LogletOffset, buffer: &mut BytesMut, ) { diff --git a/crates/types/src/logs/loglet.rs b/crates/types/src/logs/loglet.rs new file mode 100644 index 000000000..ad2eed15c --- /dev/null +++ b/crates/types/src/logs/loglet.rs @@ -0,0 +1,98 @@ +// Copyright (c) 2023 - 2025 Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use std::fmt::{Display, Formatter}; +use std::str::FromStr; + +use crate::logs::metadata::SegmentIndex; +use crate::logs::LogId; + +/// LogletId is a helper type to generate reliably unique identifiers for individual loglets in a +/// single chain. +/// +/// This is not an essential type and loglet providers may choose to use their own type. This type +/// stitches the log-id and a segment-index in a u64 number which can be displayed as +/// `_` +#[derive( + serde::Serialize, + serde::Deserialize, + Debug, + Eq, + PartialEq, + Hash, + Ord, + PartialOrd, + Clone, + Copy, + derive_more::From, + derive_more::Deref, + derive_more::Into, +)] +#[serde(transparent)] +#[repr(transparent)] +pub struct LogletId(u64); + +impl LogletId { + /// Creates a new [`LogletId`] from a [`LogId`] and a [`SegmentIndex`]. The upper + /// 32 bits are the log_id and the lower are the segment_index. + pub fn new(log_id: LogId, segment_index: SegmentIndex) -> Self { + let id = u64::from(u32::from(log_id)) << 32 | u64::from(u32::from(segment_index)); + Self(id) + } + + /// It's your responsibility that the value has the right meaning. + pub const fn new_unchecked(v: u64) -> Self { + Self(v) + } + + /// Creates a new [`LogletId`] by incrementing the lower 32 bits (segment index part). + pub fn next(&self) -> Self { + assert!( + self.0 & 0xFFFFFFFF < u64::from(u32::MAX), + "Segment part must not overflow into the LogId part" + ); + Self(self.0 + 1) + } + + fn log_id(&self) -> LogId { + LogId::new(u32::try_from(self.0 >> 32).expect("upper 32 bits should fit into u32")) + } + + fn segment_index(&self) -> SegmentIndex { + SegmentIndex::from( + u32::try_from(self.0 & 0xFFFFFFFF).expect("lower 32 bits should fit into u32"), + ) + } +} + +impl Display for LogletId { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "{}_{}", self.log_id(), self.segment_index()) + } +} + +impl FromStr for LogletId { + type Err = ::Err; + fn from_str(s: &str) -> Result { + if s.contains('_') { + let parts: Vec<&str> = s.split('_').collect(); + let log_id: u32 = parts[0].parse()?; + let segment_index: u32 = parts[1].parse()?; + Ok(LogletId::new( + LogId::from(log_id), + SegmentIndex::from(segment_index), + )) + } else { + // treat the string as raw replicated log-id + let id: u64 = s.parse()?; + Ok(LogletId(id)) + } + } +} diff --git a/crates/types/src/logs/metadata.rs b/crates/types/src/logs/metadata.rs index c2944e4ff..1668910b5 100644 --- a/crates/types/src/logs/metadata.rs +++ b/crates/types/src/logs/metadata.rs @@ -23,12 +23,13 @@ use smallvec::SmallVec; use xxhash_rust::xxh3::Xxh3Builder; use super::builder::LogsBuilder; +use super::LogletId; use crate::config::Configuration; use crate::logs::{LogId, Lsn, SequenceNumber}; use crate::protobuf::cluster::{ NodeSetSelectionStrategy as ProtoNodeSetSelectionStrategy, NodeSetSelectionStrategyKind, }; -use crate::replicated_loglet::{ReplicatedLogletId, ReplicatedLogletParams, ReplicationProperty}; +use crate::replicated_loglet::{ReplicatedLogletParams, ReplicationProperty}; use crate::{flexbuffers_storage_encode_decode, Version, Versioned}; // Starts with 0 being the oldest loglet in the chain. @@ -73,7 +74,7 @@ pub struct LogletRef

{ #[derive(Debug, Clone, Default)] pub(super) struct LookupIndex { pub(super) replicated_loglets: - HashMap, Xxh3Builder>, + HashMap, Xxh3Builder>, } impl LookupIndex { @@ -97,7 +98,7 @@ impl LookupIndex { &mut self, log_id: LogId, segment_index: SegmentIndex, - loglet_id: ReplicatedLogletId, + loglet_id: LogletId, ) { if let hash_map::Entry::Occupied(mut entry) = self.replicated_loglets.entry(loglet_id) { entry @@ -112,7 +113,7 @@ impl LookupIndex { pub fn get_replicated_loglet( &self, - loglet_id: &ReplicatedLogletId, + loglet_id: &LogletId, ) -> Option<&LogletRef> { self.replicated_loglets.get(loglet_id) } @@ -135,7 +136,7 @@ impl LookupIndex { pub enum NodeSetSelectionStrategy { /// Selects an optimal nodeset size based on the replication factor. The nodeset size is at /// least `2f+1`, where `f` is the number of tolerable failures. - /// + /// /// It's calculated by working backwards from a replication factor of `f+1`. If there are /// more nodes available in the cluster, the strategy will use them. /// @@ -540,7 +541,7 @@ impl Logs { pub fn get_replicated_loglet( &self, - loglet_id: &ReplicatedLogletId, + loglet_id: &LogletId, ) -> Option<&LogletRef> { self.lookup_index.get_replicated_loglet(loglet_id) } diff --git a/crates/types/src/logs/mod.rs b/crates/types/src/logs/mod.rs index a8624dc4c..170e84cfa 100644 --- a/crates/types/src/logs/mod.rs +++ b/crates/types/src/logs/mod.rs @@ -17,11 +17,13 @@ use crate::identifiers::PartitionId; use crate::storage::StorageEncode; pub mod builder; +mod loglet; pub mod metadata; mod record; mod record_cache; mod tail; +pub use loglet::*; pub use record::Record; pub use record_cache::RecordCache; pub use tail::*; diff --git a/crates/types/src/logs/record_cache.rs b/crates/types/src/logs/record_cache.rs index e2ea24b8b..70b3cb634 100644 --- a/crates/types/src/logs/record_cache.rs +++ b/crates/types/src/logs/record_cache.rs @@ -14,11 +14,10 @@ use moka::{ }; use xxhash_rust::xxh3::Xxh3Builder; -use super::{LogletOffset, Record, SequenceNumber}; -use crate::replicated_loglet::ReplicatedLogletId; +use super::{LogletId, LogletOffset, Record, SequenceNumber}; /// Unique record key across different loglets. -type RecordKey = (ReplicatedLogletId, LogletOffset); +type RecordKey = (LogletId, LogletOffset); /// A a simple LRU-based record cache. /// @@ -54,7 +53,7 @@ impl RecordCache { } /// Writes a record to cache externally - pub fn add(&self, loglet_id: ReplicatedLogletId, offset: LogletOffset, record: Record) { + pub fn add(&self, loglet_id: LogletId, offset: LogletOffset, record: Record) { let Some(ref inner) = self.inner else { return; }; @@ -65,7 +64,7 @@ impl RecordCache { /// Extend cache with records pub fn extend>( &self, - loglet_id: ReplicatedLogletId, + loglet_id: LogletId, mut first_offset: LogletOffset, records: I, ) { @@ -80,7 +79,7 @@ impl RecordCache { } /// Get a for given loglet id and offset. - pub fn get(&self, loglet_id: ReplicatedLogletId, offset: LogletOffset) -> Option { + pub fn get(&self, loglet_id: LogletId, offset: LogletOffset) -> Option { let inner = self.inner.as_ref()?; inner.get(&(loglet_id, offset)) diff --git a/crates/types/src/net/log_server.rs b/crates/types/src/net/log_server.rs index 8da9cf074..325834e3d 100644 --- a/crates/types/src/net/log_server.rs +++ b/crates/types/src/net/log_server.rs @@ -17,8 +17,7 @@ use serde::{Deserialize, Serialize}; use super::codec::{WireDecode, WireEncode}; use super::{RpcRequest, TargetName}; -use crate::logs::{KeyFilter, LogletOffset, Record, SequenceNumber, TailState}; -use crate::replicated_loglet::ReplicatedLogletId; +use crate::logs::{KeyFilter, LogletId, LogletOffset, Record, SequenceNumber, TailState}; use crate::time::MillisSinceEpoch; use crate::GenerationalNodeId; @@ -166,14 +165,14 @@ define_logserver_rpc! { #[derive(Debug, Clone, Serialize, Deserialize)] pub struct LogServerRequestHeader { - pub loglet_id: ReplicatedLogletId, + pub loglet_id: LogletId, /// If the sender has now knowledge of this value, it can safely be set to /// `LogletOffset::INVALID` pub known_global_tail: LogletOffset, } impl LogServerRequestHeader { - pub fn new(loglet_id: ReplicatedLogletId, known_global_tail: LogletOffset) -> Self { + pub fn new(loglet_id: LogletId, known_global_tail: LogletOffset) -> Self { Self { loglet_id, known_global_tail, diff --git a/crates/types/src/net/replicated_loglet.rs b/crates/types/src/net/replicated_loglet.rs index dfbfbb494..ab2834a67 100644 --- a/crates/types/src/net/replicated_loglet.rs +++ b/crates/types/src/net/replicated_loglet.rs @@ -17,9 +17,8 @@ use serde::{Deserialize, Serialize}; use super::TargetName; use crate::logs::metadata::SegmentIndex; -use crate::logs::{LogId, LogletOffset, Record, SequenceNumber, TailState}; +use crate::logs::{LogId, LogletId, LogletOffset, Record, SequenceNumber, TailState}; use crate::net::define_rpc; -use crate::replicated_loglet::ReplicatedLogletId; // ----- ReplicatedLoglet Sequencer API ----- define_rpc! { @@ -66,7 +65,7 @@ pub struct CommonRequestHeader { pub log_id: LogId, pub segment_index: SegmentIndex, /// The loglet_id id globally unique - pub loglet_id: ReplicatedLogletId, + pub loglet_id: LogletId, } #[derive(Debug, Clone, Serialize, Deserialize)] diff --git a/crates/types/src/replicated_loglet/params.rs b/crates/types/src/replicated_loglet/params.rs index ae6cce5bb..f987dbe5e 100644 --- a/crates/types/src/replicated_loglet/params.rs +++ b/crates/types/src/replicated_loglet/params.rs @@ -9,12 +9,10 @@ // by the Apache License, Version 2.0. use std::collections::HashSet; -use std::fmt::{Display, Formatter}; -use std::str::FromStr; +use std::fmt::Display; use super::ReplicationProperty; -use crate::logs::metadata::SegmentIndex; -use crate::logs::LogId; +use crate::logs::LogletId; use crate::nodes_config::NodesConfiguration; use crate::{GenerationalNodeId, PlainNodeId}; use itertools::Itertools; @@ -26,7 +24,7 @@ use xxhash_rust::xxh3::Xxh3Builder; #[derive(serde::Serialize, serde::Deserialize, Debug, Clone, PartialEq)] pub struct ReplicatedLogletParams { /// Unique identifier for this loglet - pub loglet_id: ReplicatedLogletId, + pub loglet_id: LogletId, /// The sequencer node #[serde(with = "serde_with::As::")] pub sequencer: GenerationalNodeId, @@ -45,83 +43,6 @@ impl ReplicatedLogletParams { } } -#[derive( - serde::Serialize, - serde::Deserialize, - Debug, - Eq, - PartialEq, - Hash, - Ord, - PartialOrd, - Clone, - Copy, - derive_more::From, - derive_more::Deref, - derive_more::Into, -)] -#[serde(transparent)] -#[repr(transparent)] -pub struct ReplicatedLogletId(u64); - -impl ReplicatedLogletId { - /// Creates a new [`ReplicatedLogletId`] from a [`LogId`] and a [`SegmentIndex`]. The upper - /// 32 bits are the log_id and the lower are the segment_index. - pub fn new(log_id: LogId, segment_index: SegmentIndex) -> Self { - let id = u64::from(u32::from(log_id)) << 32 | u64::from(u32::from(segment_index)); - Self(id) - } - - /// It's your responsibility that the value has the right meaning. - pub const fn new_unchecked(v: u64) -> Self { - Self(v) - } - - /// Creates a new [`ReplicatedLogletId`] by incrementing the lower 32 bits (segment index part). - pub fn next(&self) -> Self { - assert!( - self.0 & 0xFFFFFFFF < u64::from(u32::MAX), - "Segment part must not overflow into the LogId part" - ); - Self(self.0 + 1) - } - - fn log_id(&self) -> LogId { - LogId::new(u32::try_from(self.0 >> 32).expect("upper 32 bits should fit into u32")) - } - - fn segment_index(&self) -> SegmentIndex { - SegmentIndex::from( - u32::try_from(self.0 & 0xFFFFFFFF).expect("lower 32 bits should fit into u32"), - ) - } -} - -impl Display for ReplicatedLogletId { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - write!(f, "{}_{}", self.log_id(), self.segment_index()) - } -} - -impl FromStr for ReplicatedLogletId { - type Err = ::Err; - fn from_str(s: &str) -> Result { - if s.contains('_') { - let parts: Vec<&str> = s.split('_').collect(); - let log_id: u32 = parts[0].parse()?; - let segment_index: u32 = parts[1].parse()?; - Ok(ReplicatedLogletId::new( - LogId::from(log_id), - SegmentIndex::from(segment_index), - )) - } else { - // treat the string as raw replicated log-id - let id: u64 = s.parse()?; - Ok(ReplicatedLogletId(id)) - } - } -} - #[serde_with::serde_as] #[derive( serde::Serialize, diff --git a/crates/wal-protocol/src/lib.rs b/crates/wal-protocol/src/lib.rs index feec06619..d3a5418ce 100644 --- a/crates/wal-protocol/src/lib.rs +++ b/crates/wal-protocol/src/lib.rs @@ -11,7 +11,7 @@ use std::sync::Arc; use bytes::{Bytes, BytesMut}; -use restate_bifrost::Bifrost; +use restate_bifrost::{Bifrost, ErrorRecoveryStrategy}; use restate_core::{Metadata, ShutdownError}; use restate_storage_api::deduplication_table::DedupInformation; use restate_types::identifiers::{LeaderEpoch, PartitionId, PartitionKey, WithPartitionKey}; @@ -231,6 +231,10 @@ pub enum Error { /// /// Important: This method must only be called in the context of a [`TaskCenter`] task because /// it needs access to [`metadata()`]. +/// +/// todo: This method should be removed in favor of using Appender/BackgroundAppender API in +/// Bifrost. Additionally, the check for partition_table is probably unnecessary in the vast +/// majority of call-sites. pub async fn append_envelope_to_bifrost( bifrost: &Bifrost, envelope: Arc, @@ -246,7 +250,9 @@ pub async fn append_envelope_to_bifrost( let log_id = LogId::from(*partition_id); // todo: Pass the envelope as `Arc` to `append_envelope_to_bifrost` instead. Possibly use // triomphe's UniqueArc for a mutable Arc during construction. - let lsn = bifrost.append(log_id, envelope).await?; + let lsn = bifrost + .append(log_id, ErrorRecoveryStrategy::default(), envelope) + .await?; Ok((log_id, lsn)) } diff --git a/crates/worker/src/partition/leadership/self_proposer.rs b/crates/worker/src/partition/leadership/self_proposer.rs index a3611463c..5f561206d 100644 --- a/crates/worker/src/partition/leadership/self_proposer.rs +++ b/crates/worker/src/partition/leadership/self_proposer.rs @@ -10,7 +10,7 @@ use crate::partition::leadership::Error; use futures::never::Never; -use restate_bifrost::{Bifrost, CommitToken}; +use restate_bifrost::{Bifrost, CommitToken, ErrorRecoveryStrategy}; use restate_core::my_node_id; use restate_storage_api::deduplication_table::{DedupInformation, EpochSequenceNumber}; use restate_types::identifiers::{PartitionId, PartitionKey}; @@ -44,6 +44,7 @@ impl SelfProposer { let bifrost_appender = bifrost .create_background_appender( LogId::from(partition_id), + ErrorRecoveryStrategy::extend_preferred(), BIFROST_QUEUE_SIZE, MAX_BIFROST_APPEND_BATCH, )? diff --git a/server/tests/common/replicated_loglet.rs b/server/tests/common/replicated_loglet.rs index 94efe9f6f..628707454 100644 --- a/server/tests/common/replicated_loglet.rs +++ b/server/tests/common/replicated_loglet.rs @@ -26,6 +26,7 @@ use restate_local_cluster_runner::{ use restate_rocksdb::RocksDbManager; use restate_types::logs::builder::LogsBuilder; use restate_types::logs::metadata::{Chain, LogletParams, SegmentIndex}; +use restate_types::logs::LogletId; use restate_types::metadata_store::keys::BIFROST_CONFIG_KEY; use restate_types::{ config::Configuration, @@ -33,7 +34,7 @@ use restate_types::{ logs::{metadata::ProviderKind, LogId}, net::{AdvertisedAddress, BindAddress}, nodes_config::Role, - replicated_loglet::{ReplicatedLogletId, ReplicatedLogletParams, ReplicationProperty}, + replicated_loglet::{ReplicatedLogletParams, ReplicationProperty}, GenerationalNodeId, PlainNodeId, }; @@ -137,7 +138,7 @@ where cluster.wait_healthy(Duration::from_secs(30)).await?; let loglet_params = ReplicatedLogletParams { - loglet_id: ReplicatedLogletId::new(LogId::from(1u32), SegmentIndex::OLDEST), + loglet_id: LogletId::new(LogId::from(1u32), SegmentIndex::OLDEST), sequencer, replication, // node 1 is the metadata, 2..=count+1 are logservers diff --git a/server/tests/replicated_loglet.rs b/server/tests/replicated_loglet.rs index 621aa73b6..8fa79c7f2 100644 --- a/server/tests/replicated_loglet.rs +++ b/server/tests/replicated_loglet.rs @@ -21,7 +21,7 @@ mod tests { use futures_util::StreamExt; use googletest::prelude::*; - use restate_bifrost::loglet::AppendError; + use restate_bifrost::{loglet::AppendError, ErrorRecoveryStrategy}; use restate_core::{cancellation_token, Metadata, TaskCenterFutureExt}; use test_log::test; @@ -235,6 +235,7 @@ mod tests { let offset = bifrost .append( log_id, + ErrorRecoveryStrategy::Wait, format!("appender-{appender_id}-record{i}"), ) .await?; diff --git a/tools/bifrost-benchpress/src/append_latency.rs b/tools/bifrost-benchpress/src/append_latency.rs index 37d5a82df..9c696a020 100644 --- a/tools/bifrost-benchpress/src/append_latency.rs +++ b/tools/bifrost-benchpress/src/append_latency.rs @@ -14,7 +14,7 @@ use bytes::BytesMut; use hdrhistogram::Histogram; use tracing::info; -use restate_bifrost::Bifrost; +use restate_bifrost::{Bifrost, ErrorRecoveryStrategy}; use restate_types::logs::{LogId, WithKeys}; use crate::util::{print_latencies, DummyPayload}; @@ -41,7 +41,8 @@ pub async fn run( let blob = BytesMut::zeroed(args.payload_size).freeze(); let mut append_latencies = Histogram::::new(3)?; let mut counter = 0; - let mut appender = bifrost.create_appender(LOG_ID)?; + let mut appender = + bifrost.create_appender(LOG_ID, ErrorRecoveryStrategy::extend_preferred())?; let start = Instant::now(); loop { if counter >= args.num_records { diff --git a/tools/bifrost-benchpress/src/write_to_read.rs b/tools/bifrost-benchpress/src/write_to_read.rs index f4f58f63a..eff7ac025 100644 --- a/tools/bifrost-benchpress/src/write_to_read.rs +++ b/tools/bifrost-benchpress/src/write_to_read.rs @@ -16,7 +16,7 @@ use futures::StreamExt; use hdrhistogram::Histogram; use tracing::info; -use restate_bifrost::Bifrost; +use restate_bifrost::{Bifrost, ErrorRecoveryStrategy}; use restate_core::{Metadata, TaskCenter, TaskHandle, TaskKind}; use restate_types::logs::{KeyFilter, LogId, Lsn, SequenceNumber, WithKeys}; @@ -96,6 +96,7 @@ pub async fn run(_common_args: &Arguments, args: &WriteToReadOpts, bifrost: Bifr let appender_handle = bifrost .create_background_appender( LOG_ID, + ErrorRecoveryStrategy::extend_preferred(), args.write_buffer_size, args.max_batch_size, )? diff --git a/tools/restatectl/src/commands/log/gen_metadata.rs b/tools/restatectl/src/commands/log/gen_metadata.rs index 6f71a3524..d0f4c928c 100644 --- a/tools/restatectl/src/commands/log/gen_metadata.rs +++ b/tools/restatectl/src/commands/log/gen_metadata.rs @@ -14,10 +14,8 @@ use cling::prelude::*; use restate_types::logs::builder::LogsBuilder; use restate_types::logs::metadata::{Chain, LogletParams, ProviderKind, SegmentIndex}; -use restate_types::logs::LogId; -use restate_types::replicated_loglet::{ - NodeSet, ReplicatedLogletId, ReplicatedLogletParams, ReplicationProperty, -}; +use restate_types::logs::{LogId, LogletId}; +use restate_types::replicated_loglet::{NodeSet, ReplicatedLogletParams, ReplicationProperty}; use restate_types::{GenerationalNodeId, PlainNodeId}; #[derive(Run, Parser, Collect, Clone, Debug)] @@ -49,7 +47,7 @@ async fn generate_log_metadata(opts: &GenerateLogMetadataOpts) -> anyhow::Result let log_id = LogId::from(log_id); let segment_index = SegmentIndex::OLDEST; let loglet_params = ReplicatedLogletParams { - loglet_id: ReplicatedLogletId::new(log_id, segment_index), + loglet_id: LogletId::new(log_id, segment_index), sequencer: opts.sequencer, replication: ReplicationProperty::new(opts.replication_factor), nodeset: NodeSet::from_iter(opts.nodeset.clone()), diff --git a/tools/restatectl/src/commands/log/reconfigure.rs b/tools/restatectl/src/commands/log/reconfigure.rs index 45c8c3b71..b67545a13 100644 --- a/tools/restatectl/src/commands/log/reconfigure.rs +++ b/tools/restatectl/src/commands/log/reconfigure.rs @@ -21,11 +21,9 @@ use restate_admin::cluster_controller::protobuf::{ }; use restate_cli_util::{c_eprintln, c_println}; use restate_types::logs::metadata::{Logs, ProviderKind, SegmentIndex}; -use restate_types::logs::LogId; +use restate_types::logs::{LogId, LogletId}; use restate_types::protobuf::common::Version; -use restate_types::replicated_loglet::{ - NodeSet, ReplicatedLogletId, ReplicatedLogletParams, ReplicationProperty, -}; +use restate_types::replicated_loglet::{NodeSet, ReplicatedLogletParams, ReplicationProperty}; use restate_types::storage::StorageCodec; use restate_types::{GenerationalNodeId, PlainNodeId}; @@ -162,7 +160,7 @@ async fn replicated_loglet_params( .map(SegmentIndex::from) .unwrap_or(chain.tail_index()); - let loglet_id = ReplicatedLogletId::new(log_id, tail_index.next()); + let loglet_id = LogletId::new(log_id, tail_index.next()); let tail_segment = chain.tail(); diff --git a/tools/restatectl/src/commands/replicated_loglet/digest.rs b/tools/restatectl/src/commands/replicated_loglet/digest.rs index 272bd3951..805f2151e 100644 --- a/tools/restatectl/src/commands/replicated_loglet/digest.rs +++ b/tools/restatectl/src/commands/replicated_loglet/digest.rs @@ -25,10 +25,10 @@ use restate_core::MetadataKind; use restate_log_server::protobuf::log_server_svc_client::LogServerSvcClient; use restate_log_server::protobuf::GetDigestRequest; use restate_types::logs::metadata::Logs; -use restate_types::logs::{LogletOffset, SequenceNumber, TailState}; +use restate_types::logs::{LogletId, LogletOffset, SequenceNumber, TailState}; use restate_types::net::log_server::RecordStatus; use restate_types::nodes_config::{NodesConfiguration, Role}; -use restate_types::replicated_loglet::{EffectiveNodeSet, ReplicatedLogletId}; +use restate_types::replicated_loglet::EffectiveNodeSet; use restate_types::storage::StorageCodec; use restate_types::Versioned; @@ -40,7 +40,7 @@ use crate::util::grpc_connect; #[cling(run = "get_digest")] pub struct DigestOpts { /// The replicated loglet id - loglet_id: ReplicatedLogletId, + loglet_id: LogletId, /// Sync metadata from metadata store first #[arg(long)] sync_metadata: bool, diff --git a/tools/restatectl/src/commands/replicated_loglet/digest_util.rs b/tools/restatectl/src/commands/replicated_loglet/digest_util.rs index 0e41a2fa0..5e1da75e3 100644 --- a/tools/restatectl/src/commands/replicated_loglet/digest_util.rs +++ b/tools/restatectl/src/commands/replicated_loglet/digest_util.rs @@ -13,14 +13,14 @@ use std::collections::{BTreeMap, HashMap}; use tracing::warn; use restate_bifrost::loglet::util::TailOffsetWatch; -use restate_types::logs::{LogletOffset, SequenceNumber}; +use restate_types::logs::{LogletId, LogletOffset, SequenceNumber}; use restate_types::net::log_server::{Digest, LogServerResponseHeader, RecordStatus, Status}; -use restate_types::replicated_loglet::{ReplicatedLogletId, ReplicatedLogletParams}; +use restate_types::replicated_loglet::ReplicatedLogletParams; use restate_types::PlainNodeId; /// Tracks digest responses and record statuses pub struct DigestsHelper { - loglet_id: ReplicatedLogletId, + loglet_id: LogletId, // all offsets `[start_offset..target_tail)` offsets: BTreeMap>, known_nodes: HashMap, diff --git a/tools/restatectl/src/commands/replicated_loglet/info.rs b/tools/restatectl/src/commands/replicated_loglet/info.rs index 521f79355..f5b9d56e7 100644 --- a/tools/restatectl/src/commands/replicated_loglet/info.rs +++ b/tools/restatectl/src/commands/replicated_loglet/info.rs @@ -10,16 +10,16 @@ use anyhow::Context; use cling::prelude::*; +use tonic::codec::CompressionEncoding; use restate_cli_util::{c_indentln, c_println}; use restate_core::network::protobuf::node_ctl_svc::node_ctl_svc_client::NodeCtlSvcClient; use restate_core::network::protobuf::node_ctl_svc::GetMetadataRequest; use restate_core::MetadataKind; use restate_types::logs::metadata::Logs; -use restate_types::replicated_loglet::ReplicatedLogletId; +use restate_types::logs::LogletId; use restate_types::storage::StorageCodec; use restate_types::Versioned; -use tonic::codec::CompressionEncoding; use crate::app::ConnectionInfo; use crate::util::grpc_connect; @@ -28,7 +28,7 @@ use crate::util::grpc_connect; #[cling(run = "get_info")] pub struct InfoOpts { /// The replicated loglet id - loglet_id: ReplicatedLogletId, + loglet_id: LogletId, /// Sync metadata from metadata store first #[arg(long)] sync_metadata: bool,