diff --git a/Cargo.lock b/Cargo.lock index 436138462..128283d19 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6080,6 +6080,7 @@ dependencies = [ "enumset", "futures", "googletest", + "itertools 0.13.0", "metrics", "parking_lot", "paste", diff --git a/crates/admin/src/cluster_controller/grpc_svc_handler.rs b/crates/admin/src/cluster_controller/grpc_svc_handler.rs index f9c820239..c685a102b 100644 --- a/crates/admin/src/cluster_controller/grpc_svc_handler.rs +++ b/crates/admin/src/cluster_controller/grpc_svc_handler.rs @@ -16,9 +16,8 @@ use restate_types::protobuf::cluster::ClusterConfiguration; use tonic::{async_trait, Request, Response, Status}; use tracing::info; -use restate_bifrost::{Bifrost, BifrostAdmin, Error as BiforstError}; +use restate_bifrost::{Bifrost, Error as BiforstError}; use restate_core::{Metadata, MetadataWriter}; -use restate_metadata_store::MetadataStoreClient; use restate_types::identifiers::PartitionId; use restate_types::logs::metadata::{Logs, SegmentIndex}; use restate_types::logs::{LogId, Lsn, SequenceNumber}; @@ -44,7 +43,6 @@ use super::service::ChainExtension; use super::ClusterControllerHandle; pub(crate) struct ClusterCtrlSvcHandler { - metadata_store_client: MetadataStoreClient, controller_handle: ClusterControllerHandle, bifrost: Bifrost, metadata_writer: MetadataWriter, @@ -53,20 +51,19 @@ pub(crate) struct ClusterCtrlSvcHandler { impl ClusterCtrlSvcHandler { pub fn new( controller_handle: ClusterControllerHandle, - metadata_store_client: MetadataStoreClient, bifrost: Bifrost, metadata_writer: MetadataWriter, ) -> Self { Self { controller_handle, - metadata_store_client, bifrost, metadata_writer, } } async fn get_logs(&self) -> Result { - self.metadata_store_client + self.metadata_writer + .metadata_store_client() .get::(BIFROST_CONFIG_KEY.clone()) .await .map_err(|error| Status::unknown(format!("Failed to get log metadata: {error:?}")))? 
@@ -120,7 +117,8 @@ impl ClusterCtrlSvc for ClusterCtrlSvcHandler { let (trim_point, nodes_config) = tokio::join!( self.bifrost.get_trim_point(log_id), - self.metadata_store_client + self.metadata_writer + .metadata_store_client() .get::(NODES_CONFIG_KEY.clone()), ); @@ -151,7 +149,8 @@ impl ClusterCtrlSvc for ClusterCtrlSvcHandler { _request: Request, ) -> Result, Status> { let nodes_config = self - .metadata_store_client + .metadata_writer + .metadata_store_client() .get::(NODES_CONFIG_KEY.clone()) .await .map_err(|error| { @@ -261,13 +260,9 @@ impl ClusterCtrlSvc for ClusterCtrlSvcHandler { let request = request.into_inner(); let log_id: LogId = request.log_id.into(); - let admin = BifrostAdmin::new( - &self.bifrost, - &self.metadata_writer, - &self.metadata_store_client, - ); - - let writable_loglet = admin + let writable_loglet = self + .bifrost + .admin() .writeable_loglet(log_id) .await .map_err(|err| match err { diff --git a/crates/admin/src/cluster_controller/logs_controller.rs b/crates/admin/src/cluster_controller/logs_controller.rs index 0a74a7078..c522dadb0 100644 --- a/crates/admin/src/cluster_controller/logs_controller.rs +++ b/crates/admin/src/cluster_controller/logs_controller.rs @@ -10,32 +10,30 @@ mod nodeset_selection; -use futures::never::Never; -use rand::prelude::IteratorRandom; -use rand::thread_rng; use std::collections::HashMap; use std::iter; use std::ops::Deref; use std::sync::Arc; use std::time::Duration; + +use futures::never::Never; +use rand::prelude::IteratorRandom; +use rand::thread_rng; use tokio::sync::Semaphore; use tokio::task::JoinSet; use tracing::{debug, error, trace, trace_span, Instrument}; use xxhash_rust::xxh3::Xxh3Builder; -use restate_bifrost::{Bifrost, BifrostAdmin, Error as BifrostError}; -use restate_core::metadata_store::{ - retry_on_network_error, MetadataStoreClient, Precondition, ReadWriteError, WriteError, -}; +use restate_bifrost::{Bifrost, Error as BifrostError}; +use restate_core::metadata_store::{Precondition, ReadWriteError, WriteError}; use restate_core::{Metadata, MetadataWriter, ShutdownError, TaskCenterFutureExt}; -use restate_types::config::Configuration; use restate_types::errors::GenericError; use restate_types::identifiers::PartitionId; use restate_types::live::Pinned; use restate_types::logs::builder::LogsBuilder; use restate_types::logs::metadata::{ - Chain, DefaultProvider, LogletConfig, LogletParams, Logs, LogsConfiguration, - NodeSetSelectionStrategy, ProviderKind, ReplicatedLogletConfig, SegmentIndex, + Chain, LogletConfig, LogletParams, Logs, LogsConfiguration, NodeSetSelectionStrategy, + ProviderConfiguration, ProviderKind, ReplicatedLogletConfig, SegmentIndex, }; use restate_types::logs::{LogId, LogletId, Lsn, TailState}; use restate_types::metadata_store::keys::BIFROST_CONFIG_KEY; @@ -320,17 +318,17 @@ fn try_provisioning( node_set_selector_hints: impl NodeSetSelectorHints, ) -> Option { match logs_configuration.default_provider { - DefaultProvider::Local => { + ProviderConfiguration::Local => { let log_id = LogletId::new(log_id, SegmentIndex::OLDEST); Some(LogletConfiguration::Local(log_id.into())) } #[cfg(any(test, feature = "memory-loglet"))] - DefaultProvider::InMemory => { + ProviderConfiguration::InMemory => { let log_id = LogletId::new(log_id, SegmentIndex::OLDEST); Some(LogletConfiguration::Memory(log_id.into())) } #[cfg(feature = "replicated-loglet")] - DefaultProvider::Replicated(ref config) => build_new_replicated_loglet_configuration( + ProviderConfiguration::Replicated(ref config) => 
build_new_replicated_loglet_configuration( config, LogletId::new(log_id, SegmentIndex::OLDEST), &Metadata::with_current(|m| m.nodes_config_ref()), @@ -436,10 +434,10 @@ impl LogletConfiguration { ) -> bool { match (self, &logs_configuration.default_provider) { #[cfg(any(test, feature = "memory-loglet"))] - (Self::Memory(_), DefaultProvider::InMemory) => false, - (Self::Local(_), DefaultProvider::Local) => false, + (Self::Memory(_), ProviderConfiguration::InMemory) => false, + (Self::Local(_), ProviderConfiguration::Local) => false, #[cfg(feature = "replicated-loglet")] - (Self::Replicated(params), DefaultProvider::Replicated(config)) => { + (Self::Replicated(params), ProviderConfiguration::Replicated(config)) => { let sequencer_change_required = !observed_cluster_state .is_node_alive(params.sequencer) && !observed_cluster_state.alive_nodes.is_empty(); @@ -468,9 +466,10 @@ impl LogletConfiguration { sequencer_change_required || nodeset_improvement_possible } - _ => { + (x, y) => { debug!( - "Changing provider type is not supporter at the moment. Ignoring reconfigure" + "Changing provider type from {} to {} is not supported at the moment. Ignoring reconfigure", + x.as_provider(), y.kind(), ); false } @@ -501,10 +500,14 @@ impl LogletConfiguration { match logs_configuration.default_provider { #[cfg(any(test, feature = "memory-loglet"))] - DefaultProvider::InMemory => Some(LogletConfiguration::Memory(loglet_id.next().into())), - DefaultProvider::Local => Some(LogletConfiguration::Local(loglet_id.next().into())), + ProviderConfiguration::InMemory => { + Some(LogletConfiguration::Memory(loglet_id.next().into())) + } + ProviderConfiguration::Local => { + Some(LogletConfiguration::Local(loglet_id.next().into())) + } #[cfg(feature = "replicated-loglet")] - DefaultProvider::Replicated(ref config) => { + ProviderConfiguration::Replicated(ref config) => { let previous_params = match self { Self::Replicated(previous_params) => Some(previous_params), _ => None, @@ -639,9 +642,9 @@ struct LogsControllerInner { } impl LogsControllerInner { - fn new(configuration: LogsConfiguration, retry_policy: RetryPolicy) -> Self { + fn new(current_logs: Arc, retry_policy: RetryPolicy) -> Self { Self { - current_logs: Arc::new(Logs::with_logs_configuration(configuration)), + current_logs, logs_state: HashMap::with_hasher(Xxh3Builder::default()), logs_write_in_progress: None, retry_policy, @@ -918,33 +921,13 @@ pub struct LogsController { effects: Option>, inner: LogsControllerInner, bifrost: Bifrost, - metadata_store_client: MetadataStoreClient, metadata_writer: MetadataWriter, async_operations: JoinSet, find_logs_tail_semaphore: Arc, } impl LogsController { - pub async fn init( - configuration: &Configuration, - bifrost: Bifrost, - metadata_store_client: MetadataStoreClient, - metadata_writer: MetadataWriter, - ) -> Result { - // obtain the latest logs or init it with an empty logs variant - let logs = retry_on_network_error( - configuration.common.network_error_retry_policy.clone(), - || { - metadata_store_client.get_or_insert(BIFROST_CONFIG_KEY.clone(), || { - Logs::from_configuration(configuration) - }) - }, - ) - .await?; - - let logs_configuration = logs.configuration().clone(); - metadata_writer.update(Arc::new(logs)).await?; - + pub async fn init(bifrost: Bifrost, metadata_writer: MetadataWriter) -> Result { //todo(azmy): make configurable let retry_policy = RetryPolicy::exponential( Duration::from_millis(10), @@ -955,9 +938,11 @@ impl LogsController { let mut this = Self { effects: Some(Vec::new()), - inner:
LogsControllerInner::new(logs_configuration, retry_policy), + inner: LogsControllerInner::new( + Metadata::with_current(|m| m.logs_snapshot()), + retry_policy, + ), bifrost, - metadata_store_client, metadata_writer, async_operations: JoinSet::default(), find_logs_tail_semaphore: Arc::new(Semaphore::new(1)), @@ -976,17 +961,12 @@ impl LogsController { let logs = Arc::clone(&self.inner.current_logs); let bifrost = self.bifrost.clone(); - let metadata_store_client = self.metadata_store_client.clone(); - let metadata_writer = self.metadata_writer.clone(); let find_tail = async move { - let bifrost_admin = - BifrostAdmin::new(&bifrost, &metadata_writer, &metadata_store_client); - let mut updates = LogsTailUpdates::default(); for (log_id, chain) in logs.iter() { let tail_segment = chain.tail(); - let writable_loglet = match bifrost_admin.writeable_loglet(*log_id).await { + let writable_loglet = match bifrost.admin().writeable_loglet(*log_id).await { Ok(loglet) => loglet, Err(BifrostError::Shutdown(_)) => break, Err(err) => { @@ -1090,7 +1070,6 @@ impl LogsController { logs: Arc, mut debounce: Option>, ) { - let metadata_store_client = self.metadata_store_client.clone(); let metadata_writer = self.metadata_writer.clone(); self.async_operations.spawn(async move { @@ -1100,7 +1079,7 @@ impl LogsController { tokio::time::sleep(delay).await; } - if let Err(err) = metadata_store_client + if let Err(err) = metadata_writer.metadata_store_client() .put( BIFROST_CONFIG_KEY.clone(), logs.deref(), @@ -1112,7 +1091,7 @@ impl LogsController { WriteError::FailedPrecondition(_) => { debug!("Detected a concurrent modification of logs. Fetching the latest logs now."); // There was a concurrent modification of the logs. Fetch the latest version. - match metadata_store_client + match metadata_writer.metadata_store_client() .get::(BIFROST_CONFIG_KEY.clone()) .await { @@ -1158,8 +1137,6 @@ impl LogsController { mut debounce: Option>, ) { let bifrost = self.bifrost.clone(); - let metadata_store_client = self.metadata_store_client.clone(); - let metadata_writer = self.metadata_writer.clone(); self.async_operations.spawn( async move { @@ -1169,10 +1146,7 @@ impl LogsController { tokio::time::sleep(delay).await; } - let bifrost_admin = - BifrostAdmin::new(&bifrost, &metadata_writer, &metadata_store_client); - - match bifrost_admin.seal(log_id, segment_index).await { + match bifrost.admin().seal(log_id, segment_index).await { Ok(sealed_segment) => { if sealed_segment.tail.is_sealed() { Event::SealSucceeded { @@ -1279,7 +1253,7 @@ pub mod tests { use enumset::{enum_set, EnumSet}; use restate_types::logs::metadata::{ - DefaultProvider, LogsConfiguration, NodeSetSelectionStrategy, ReplicatedLogletConfig, + LogsConfiguration, NodeSetSelectionStrategy, ProviderConfiguration, ReplicatedLogletConfig, }; use restate_types::logs::LogletId; use restate_types::nodes_config::{ @@ -1452,7 +1426,7 @@ pub mod tests { fn logs_configuration(replication_factor: u8) -> LogsConfiguration { LogsConfiguration { - default_provider: DefaultProvider::Replicated(ReplicatedLogletConfig { + default_provider: ProviderConfiguration::Replicated(ReplicatedLogletConfig { replication_property: ReplicationProperty::new( NonZeroU8::new(replication_factor).expect("must be non zero"), ), @@ -1537,7 +1511,7 @@ pub mod tests { &nodes.observed_state )); - let DefaultProvider::Replicated(ref replicated_loglet_config) = + let ProviderConfiguration::Replicated(ref replicated_loglet_config) = logs_config.default_provider else { unreachable!() @@ -1571,7 +1545,7 @@ 
pub mod tests { let logs_config = logs_configuration(2); - let DefaultProvider::Replicated(ref replicated_loglet_config) = + let ProviderConfiguration::Replicated(ref replicated_loglet_config) = logs_config.default_provider else { unreachable!() diff --git a/crates/admin/src/cluster_controller/logs_controller/nodeset_selection.rs b/crates/admin/src/cluster_controller/logs_controller/nodeset_selection.rs index 8d5a89e22..ebd05a0d6 100644 --- a/crates/admin/src/cluster_controller/logs_controller/nodeset_selection.rs +++ b/crates/admin/src/cluster_controller/logs_controller/nodeset_selection.rs @@ -13,9 +13,9 @@ use std::cmp::{max, Ordering}; use itertools::Itertools; use rand::prelude::IteratorRandom; use rand::Rng; -use restate_types::logs::metadata::NodeSetSelectionStrategy; use tracing::trace; +use restate_types::logs::metadata::NodeSetSelectionStrategy; use restate_types::nodes_config::NodesConfiguration; use restate_types::replicated_loglet::{LocationScope, NodeSet, ReplicationProperty}; diff --git a/crates/admin/src/cluster_controller/service.rs b/crates/admin/src/cluster_controller/service.rs index e5694601f..9c68aab81 100644 --- a/crates/admin/src/cluster_controller/service.rs +++ b/crates/admin/src/cluster_controller/service.rs @@ -26,7 +26,7 @@ use tracing::{debug, info}; use restate_metadata_store::ReadModifyWriteError; use restate_types::cluster_controller::SchedulingPlan; use restate_types::logs::metadata::{ - DefaultProvider, LogletParams, Logs, LogsConfiguration, ProviderKind, SegmentIndex, + LogletParams, Logs, LogsConfiguration, ProviderConfiguration, ProviderKind, SegmentIndex, }; use restate_types::metadata_store::keys::{ BIFROST_CONFIG_KEY, PARTITION_TABLE_KEY, SCHEDULING_PLAN_KEY, @@ -36,8 +36,8 @@ use restate_types::partition_table::{ }; use restate_types::replicated_loglet::ReplicatedLogletParams; -use restate_bifrost::{Bifrost, BifrostAdmin, SealedSegment}; -use restate_core::metadata_store::{retry_on_network_error, MetadataStoreClient}; +use restate_bifrost::{Bifrost, SealedSegment}; +use restate_core::metadata_store::retry_on_network_error; use restate_core::network::rpc_router::RpcRouter; use restate_core::network::tonic_service_filter::{TonicServiceFilter, WaitForReady}; use restate_core::network::{ @@ -79,7 +79,6 @@ pub struct Service { cluster_state_refresher: ClusterStateRefresher, configuration: Live, metadata_writer: MetadataWriter, - metadata_store_client: MetadataStoreClient, processor_manager_client: PartitionProcessorManagerClient>, command_tx: mpsc::Sender, @@ -102,7 +101,6 @@ where router_builder: &mut MessageRouterBuilder, server_builder: &mut NetworkServerBuilder, metadata_writer: MetadataWriter, - metadata_store_client: MetadataStoreClient, ) -> Self { let (command_tx, command_rx) = mpsc::channel(2); @@ -122,7 +120,6 @@ where ClusterControllerHandle { tx: command_tx.clone(), }, - metadata_store_client.clone(), bifrost.clone(), metadata_writer.clone(), )) @@ -140,7 +137,6 @@ where bifrost, cluster_state_refresher, metadata_writer, - metadata_store_client, processor_manager_client, command_tx, command_rx, @@ -183,7 +179,7 @@ enum ClusterControllerCommand { UpdateClusterConfiguration { num_partitions: NonZeroU16, replication_strategy: ReplicationStrategy, - default_provider: DefaultProvider, + default_provider: ProviderConfiguration, response_tx: oneshot::Sender>, }, SealAndExtendChain { @@ -249,7 +245,7 @@ impl ClusterControllerHandle { &self, num_partitions: NonZeroU16, replication_strategy: ReplicationStrategy, - default_provider: DefaultProvider, + 
default_provider: ProviderConfiguration, ) -> Result, ShutdownError> { let (response_tx, response_rx) = oneshot::channel(); @@ -309,12 +305,6 @@ impl Service { let mut shutdown = std::pin::pin!(cancellation_watcher()); - let bifrost_admin = BifrostAdmin::new( - &self.bifrost, - &self.metadata_writer, - &self.metadata_store_client, - ); - let mut state: ClusterControllerState = ClusterControllerState::Follower; self.health_status.update(AdminStatus::Ready); @@ -333,7 +323,7 @@ impl Service { } Some(cmd) = self.command_rx.recv() => { // it is still safe to handle cluster commands as a follower - self.on_cluster_cmd(cmd, bifrost_admin).await; + self.on_cluster_cmd(cmd).await; } _ = config_watcher.changed() => { debug!("Updating the cluster controller settings."); @@ -360,8 +350,9 @@ impl Service { let partition_table = retry_on_network_error( configuration.common.network_error_retry_policy.clone(), || { - self.metadata_store_client - .get_or_insert(PARTITION_TABLE_KEY.clone(), || { + self.metadata_writer.metadata_store_client().get_or_insert( + PARTITION_TABLE_KEY.clone(), + || { let partition_table = PartitionTable::with_equally_sized_partitions( Version::MIN, configuration.common.bootstrap_num_partitions.get(), @@ -370,7 +361,8 @@ impl Service { debug!("Initializing the partition table with '{partition_table:?}'"); partition_table - }) + }, + ) }, ) .await?; @@ -439,10 +431,11 @@ impl Service { &self, num_partitions: u16, replication_strategy: ReplicationStrategy, - default_provider: DefaultProvider, + default_provider: ProviderConfiguration, ) -> anyhow::Result<()> { let logs = self - .metadata_store_client + .metadata_writer + .metadata_store_client() .read_modify_write(BIFROST_CONFIG_KEY.clone(), |current: Option| { let logs = match current { Some(logs) => logs, @@ -457,8 +450,7 @@ impl Service { // we can only change the default provider if logs.version() != Version::INVALID - && logs.configuration().default_provider.as_provider_kind() - != default_provider.as_provider_kind() + && logs.configuration().default_provider.kind() != default_provider.kind() { { return Err( @@ -494,7 +486,8 @@ impl Service { }; let partition_table = self - .metadata_store_client + .metadata_writer + .metadata_store_client() .read_modify_write( PARTITION_TABLE_KEY.clone(), |current: Option| { @@ -558,7 +551,6 @@ impl Service { extension, min_version, bifrost: self.bifrost.clone(), - metadata_store_client: self.metadata_store_client.clone(), metadata_writer: self.metadata_writer.clone(), observed_cluster_state: self.observed_cluster_state.clone(), }; @@ -570,11 +562,7 @@ impl Service { }); } - async fn on_cluster_cmd( - &self, - command: ClusterControllerCommand, - bifrost_admin: BifrostAdmin<'_>, - ) { + async fn on_cluster_cmd(&self, command: ClusterControllerCommand) { match command { ClusterControllerCommand::GetClusterState(tx) => { let _ = tx.send(self.cluster_state_refresher.get_cluster_state()); @@ -588,7 +576,7 @@ impl Service { ?log_id, trim_point_inclusive = ?trim_point, "Manual trim log command received"); - let result = bifrost_admin.trim(log_id, trim_point).await; + let result = self.bifrost.admin().trim(log_id, trim_point).await; let _ = response_tx.send(result.map_err(Into::into)); } ClusterControllerCommand::CreateSnapshot { @@ -716,7 +704,6 @@ struct SealAndExtendTask { extension: Option, bifrost: Bifrost, metadata_writer: MetadataWriter, - metadata_store_client: MetadataStoreClient, observed_cluster_state: ObservedClusterState, } @@ -727,18 +714,14 @@ impl SealAndExtendTask { .as_ref() 
.and_then(|ext| ext.segment_index_to_seal); - let bifrost_admin = BifrostAdmin::new( - &self.bifrost, - &self.metadata_writer, - &self.metadata_store_client, - ); - let (provider, params) = match self.extension.take() { Some(extension) => (extension.provider_kind, extension.params), None => self.next_segment().await?, }; - let sealed_segment = bifrost_admin + let sealed_segment = self + .bifrost + .admin() .seal_and_extend_chain( self.log_id, last_segment_index, @@ -786,18 +769,19 @@ impl SealAndExtendTask { let (provider, params) = match &logs.configuration().default_provider { #[cfg(any(test, feature = "memory-loglet"))] - DefaultProvider::InMemory => ( + ProviderConfiguration::InMemory => ( ProviderKind::InMemory, u64::from(loglet_id.next()).to_string().into(), ), - DefaultProvider::Local => ( + ProviderConfiguration::Local => ( ProviderKind::Local, u64::from(loglet_id.next()).to_string().into(), ), #[cfg(feature = "replicated-loglet")] - DefaultProvider::Replicated(config) => { + ProviderConfiguration::Replicated(config) => { let schedule_plan = self - .metadata_store_client + .metadata_writer + .metadata_store_client() .get::(SCHEDULING_PLAN_KEY.clone()) .await?; @@ -833,6 +817,7 @@ mod tests { use googletest::assert_that; use googletest::matchers::eq; + use restate_types::logs::metadata::ProviderKind; use test_log::test; use restate_bifrost::providers::memory_loglet; @@ -843,7 +828,7 @@ mod tests { use restate_core::test_env::NoOpMessageHandler; use restate_core::{TaskCenter, TaskKind, TestCoreEnv, TestCoreEnvBuilder}; use restate_types::cluster::cluster_state::PartitionProcessorStatus; - use restate_types::config::{AdminOptions, Configuration}; + use restate_types::config::{AdminOptions, BifrostOptions, Configuration}; use restate_types::health::HealthStatus; use restate_types::identifiers::PartitionId; use restate_types::live::Live; @@ -858,7 +843,8 @@ mod tests { async fn manual_log_trim() -> anyhow::Result<()> { const LOG_ID: LogId = LogId::new(0); let mut builder = TestCoreEnvBuilder::with_incoming_only_connector(); - let bifrost_svc = BifrostService::new().with_factory(memory_loglet::Factory::default()); + let bifrost_svc = BifrostService::new(builder.metadata_writer.clone()) + .with_factory(memory_loglet::Factory::default()); let bifrost = bifrost_svc.handle(); let svc = Service::new( @@ -869,7 +855,6 @@ mod tests { &mut builder.router_builder, &mut NetworkServerBuilder::default(), builder.metadata_writer.clone(), - builder.metadata_store_client.clone(), ); let svc_handle = svc.handle(); @@ -1086,8 +1071,11 @@ mod tests { admin_options.log_trim_threshold = 0; let interval_duration = Duration::from_secs(10); admin_options.log_trim_interval = Some(interval_duration.into()); + let mut bifrost_options = BifrostOptions::default(); + bifrost_options.default_provider = ProviderKind::InMemory; let config = Configuration { admin: admin_options, + bifrost: bifrost_options, ..Default::default() }; @@ -1136,21 +1124,22 @@ mod tests { where F: FnMut(TestCoreEnvBuilder) -> TestCoreEnvBuilder, { + restate_types::config::set_current_config(config); let mut builder = TestCoreEnvBuilder::with_incoming_only_connector(); - let bifrost_svc = BifrostService::new().with_factory(memory_loglet::Factory::default()); + let bifrost_svc = BifrostService::new(builder.metadata_writer.clone()) + .with_factory(memory_loglet::Factory::default()); let bifrost = bifrost_svc.handle(); let mut server_builder = NetworkServerBuilder::default(); let svc = Service::new( - Live::from_value(config), + 
Configuration::updateable(), HealthStatus::default(), bifrost.clone(), builder.networking.clone(), &mut builder.router_builder, &mut server_builder, builder.metadata_writer.clone(), - builder.metadata_store_client.clone(), ); let mut nodes_config = NodesConfiguration::new(Version::MIN, "test-cluster".to_owned()); diff --git a/crates/admin/src/cluster_controller/service/state.rs b/crates/admin/src/cluster_controller/service/state.rs index 8b5548839..fada9e129 100644 --- a/crates/admin/src/cluster_controller/service/state.rs +++ b/crates/admin/src/cluster_controller/service/state.rs @@ -17,10 +17,9 @@ use tokio::time; use tokio::time::{Interval, MissedTickBehavior}; use tracing::{debug, info, warn}; -use restate_bifrost::{Bifrost, BifrostAdmin}; -use restate_core::metadata_store::MetadataStoreClient; +use restate_bifrost::Bifrost; use restate_core::network::TransportConnect; -use restate_core::{my_node_id, Metadata, MetadataWriter}; +use restate_core::{my_node_id, Metadata}; use restate_types::cluster::cluster_state::{AliveNode, NodeState}; use restate_types::config::{AdminOptions, Configuration}; use restate_types::identifiers::PartitionId; @@ -135,8 +134,6 @@ pub enum LeaderEvent { pub struct Leader { bifrost: Bifrost, - metadata_store_client: MetadataStoreClient, - metadata_writer: MetadataWriter, logs_watcher: watch::Receiver, partition_table_watcher: watch::Receiver, find_logs_tail_interval: Interval, @@ -156,18 +153,13 @@ where let scheduler = Scheduler::init( &configuration, - service.metadata_store_client.clone(), + service.metadata_writer.metadata_store_client().clone(), service.networking.clone(), ) .await?; - let logs_controller = LogsController::init( - &configuration, - service.bifrost.clone(), - service.metadata_store_client.clone(), - service.metadata_writer.clone(), - ) - .await?; + let logs_controller = + LogsController::init(service.bifrost.clone(), service.metadata_writer.clone()).await?; let (log_trim_interval, log_trim_threshold) = create_log_trim_interval(&configuration.admin); @@ -179,8 +171,6 @@ where let metadata = Metadata::current(); let mut leader = Self { bifrost: service.bifrost.clone(), - metadata_store_client: service.metadata_store_client.clone(), - metadata_writer: service.metadata_writer.clone(), logs_watcher: metadata.watch(MetadataKind::Logs), partition_table_watcher: metadata.watch(MetadataKind::PartitionTable), cluster_state_watcher: service.cluster_state_refresher.cluster_state_watcher(), @@ -296,12 +286,6 @@ where } async fn trim_logs_inner(&self) -> Result<(), restate_bifrost::Error> { - let bifrost_admin = BifrostAdmin::new( - &self.bifrost, - &self.metadata_writer, - &self.metadata_store_client, - ); - let cluster_state = self.cluster_state_watcher.current(); let mut persisted_lsns_per_partition: BTreeMap< @@ -342,13 +326,13 @@ where if persisted_lsns.len() >= cluster_state.nodes.len() { let min_persisted_lsn = persisted_lsns.into_values().min().unwrap_or(Lsn::INVALID); // trim point is before the oldest record - let current_trim_point = bifrost_admin.get_trim_point(log_id).await?; + let current_trim_point = self.bifrost.get_trim_point(log_id).await?; if min_persisted_lsn >= current_trim_point + self.log_trim_threshold { debug!( "Automatic trim log '{log_id}' for all records before='{min_persisted_lsn}'" ); - bifrost_admin.trim(log_id, min_persisted_lsn).await? + self.bifrost.admin().trim(log_id, min_persisted_lsn).await? 
} } else { warn!("Stop automatically trimming log '{log_id}' because not all nodes are running a partition processor applying this log."); diff --git a/crates/admin/src/schema_registry/mod.rs b/crates/admin/src/schema_registry/mod.rs index cd043be8d..39481e6d2 100644 --- a/crates/admin/src/schema_registry/mod.rs +++ b/crates/admin/src/schema_registry/mod.rs @@ -11,16 +11,15 @@ pub mod error; mod updater; -use http::Uri; - use std::borrow::Borrow; use std::collections::HashMap; use std::ops::Deref; use std::sync::Arc; use std::time::Duration; + +use http::Uri; use tracing::subscriber::NoSubscriber; -use restate_core::metadata_store::MetadataStoreClient; use restate_core::{Metadata, MetadataWriter}; use restate_service_protocol::discovery::{DiscoverEndpoint, DiscoveredEndpoint, ServiceDiscovery}; use restate_types::identifiers::{DeploymentId, ServiceRevision, SubscriptionId}; @@ -77,7 +76,6 @@ pub enum ModifyServiceChange { /// new deployments. #[derive(Clone)] pub struct SchemaRegistry { - metadata_store_client: MetadataStoreClient, metadata_writer: MetadataWriter, service_discovery: ServiceDiscovery, subscription_validator: V, @@ -87,7 +85,6 @@ pub struct SchemaRegistry { impl SchemaRegistry { pub fn new( - metadata_store_client: MetadataStoreClient, metadata_writer: MetadataWriter, service_discovery: ServiceDiscovery, subscription_validator: V, @@ -95,7 +92,6 @@ impl SchemaRegistry { ) -> Self { Self { metadata_writer, - metadata_store_client, service_discovery, subscription_validator, experimental_feature_kafka_ingress_next, @@ -155,7 +151,8 @@ impl SchemaRegistry { } else { let mut new_deployment_id = None; let schema_information = self - .metadata_store_client + .metadata_writer + .metadata_store_client() .read_modify_write( SCHEMA_INFORMATION_KEY.clone(), |schema_information: Option| { @@ -195,7 +192,8 @@ impl SchemaRegistry { deployment_id: DeploymentId, ) -> Result<(), SchemaRegistryError> { let schema_registry = self - .metadata_store_client + .metadata_writer + .metadata_store_client() .read_modify_write( SCHEMA_INFORMATION_KEY.clone(), |schema_registry: Option| { @@ -229,7 +227,8 @@ impl SchemaRegistry { changes: Vec, ) -> Result { let schema_information = self - .metadata_store_client + .metadata_writer + .metadata_store_client() .read_modify_write( SCHEMA_INFORMATION_KEY.clone(), |schema_information: Option| { @@ -270,7 +269,8 @@ impl SchemaRegistry { subscription_id: SubscriptionId, ) -> Result<(), SchemaRegistryError> { let schema_information = self - .metadata_store_client + .metadata_writer + .metadata_store_client() .read_modify_write( SCHEMA_INFORMATION_KEY.clone(), |schema_information: Option| { @@ -370,7 +370,8 @@ where let mut subscription_id = None; let schema_information = self - .metadata_store_client + .metadata_writer + .metadata_store_client() .read_modify_write( SCHEMA_INFORMATION_KEY.clone(), |schema_information: Option| { diff --git a/crates/admin/src/service.rs b/crates/admin/src/service.rs index b3c5ecef8..3f3020a62 100644 --- a/crates/admin/src/service.rs +++ b/crates/admin/src/service.rs @@ -17,7 +17,6 @@ use restate_types::config::AdminOptions; use restate_types::live::LiveLoad; use tower::ServiceBuilder; -use restate_core::metadata_store::MetadataStoreClient; use restate_core::network::net_util; use restate_core::MetadataWriter; use restate_service_protocol::discovery::ServiceDiscovery; @@ -44,7 +43,6 @@ where { pub fn new( metadata_writer: MetadataWriter, - metadata_store_client: MetadataStoreClient, bifrost: Bifrost, subscription_validator: V, 
service_discovery: ServiceDiscovery, @@ -54,7 +52,6 @@ where Self { bifrost, schema_registry: SchemaRegistry::new( - metadata_store_client, metadata_writer, service_discovery, subscription_validator, diff --git a/crates/bifrost/Cargo.toml b/crates/bifrost/Cargo.toml index 4b3df3785..e0b9d1348 100644 --- a/crates/bifrost/Cargo.toml +++ b/crates/bifrost/Cargo.toml @@ -31,6 +31,7 @@ derive_more = { workspace = true } enum-map = { workspace = true, features = ["serde"] } futures = { workspace = true } googletest = { workspace = true, features = ["anyhow"], optional = true } +itertools = { workspace = true } metrics = { workspace = true } parking_lot = { workspace = true } pin-project = { workspace = true } diff --git a/crates/bifrost/src/appender.rs b/crates/bifrost/src/appender.rs index 81d8a26ba..0d4ec6d36 100644 --- a/crates/bifrost/src/appender.rs +++ b/crates/bifrost/src/appender.rs @@ -116,7 +116,7 @@ impl Appender { info!( attempt = attempt, segment_index = %loglet.segment_index(), - "Append batch will be retried (loglet being sealed), waiting for tail to be determined" + "Append batch will be retried (loglet is being sealed), waiting for tail to be determined" ); let new_loglet = Self::wait_next_unsealed_loglet( self.log_id, @@ -131,7 +131,7 @@ impl Appender { Err(AppendError::Other(err)) if err.retryable() => { if let Some(retry_dur) = retry_iter.next() { info!( - ?err, + %err, attempt = attempt, segment_index = %loglet.segment_index(), "Failed to append this batch. Since underlying error is retryable, will retry in {:?}", @@ -140,7 +140,7 @@ impl Appender { tokio::time::sleep(retry_dur).await; } else { warn!( - ?err, + %err, attempt = attempt, segment_index = %loglet.segment_index(), "Failed to append this batch and exhausted all attempts to retry", diff --git a/crates/bifrost/src/bifrost.rs b/crates/bifrost/src/bifrost.rs index 06b86ff76..b0da0b240 100644 --- a/crates/bifrost/src/bifrost.rs +++ b/crates/bifrost/src/bifrost.rs @@ -15,7 +15,7 @@ use std::sync::OnceLock; use enum_map::EnumMap; use tracing::instrument; -use restate_core::{Metadata, MetadataKind, TargetVersion}; +use restate_core::{Metadata, MetadataKind, MetadataWriter, TargetVersion}; use restate_types::logs::metadata::{MaybeSegment, ProviderKind, Segment}; use restate_types::logs::{KeyFilter, LogId, Lsn, SequenceNumber, TailState}; use restate_types::storage::StorageEncode; @@ -25,7 +25,7 @@ use crate::background_appender::BackgroundAppender; use crate::loglet::LogletProvider; use crate::loglet_wrapper::LogletWrapper; use crate::watchdog::WatchdogSender; -use crate::{Error, InputRecord, LogReadStream, Result}; +use crate::{BifrostAdmin, Error, InputRecord, LogReadStream, Result}; /// The strategy to use when bifrost fails to append or when it observes /// a sealed loglet while it's tailing a log. 
@@ -77,20 +77,20 @@ impl Bifrost { } #[cfg(any(test, feature = "test-util"))] - pub async fn init_in_memory() -> Self { + pub async fn init_in_memory(metadata_writer: MetadataWriter) -> Self { use crate::providers::memory_loglet; - Self::init_with_factory(memory_loglet::Factory::default()).await + Self::init_with_factory(metadata_writer, memory_loglet::Factory::default()).await } #[cfg(any(test, feature = "test-util"))] - pub async fn init_local() -> Self { + pub async fn init_local(metadata_writer: MetadataWriter) -> Self { use restate_types::config::Configuration; use crate::BifrostService; let config = Configuration::updateable(); - let bifrost_svc = BifrostService::new().enable_local_loglet(&config); + let bifrost_svc = BifrostService::new(metadata_writer).enable_local_loglet(&config); let bifrost = bifrost_svc.handle(); // start bifrost service in the background @@ -102,10 +102,13 @@ impl Bifrost { } #[cfg(any(test, feature = "test-util"))] - pub async fn init_with_factory(factory: impl crate::loglet::LogletProviderFactory) -> Self { + pub async fn init_with_factory( + metadata_writer: MetadataWriter, + factory: impl crate::loglet::LogletProviderFactory, + ) -> Self { use crate::BifrostService; - let bifrost_svc = BifrostService::new().with_factory(factory); + let bifrost_svc = BifrostService::new(metadata_writer).with_factory(factory); let bifrost = bifrost_svc.handle(); // start bifrost service in the background @@ -116,6 +119,11 @@ impl Bifrost { bifrost } + /// Admin operations of bifrost + pub fn admin(&self) -> BifrostAdmin<'_> { + BifrostAdmin::new(self) + } + /// Appends a single record to a log. The log id must exist, otherwise the /// operation fails with [`Error::UnknownLogId`] /// @@ -302,15 +310,17 @@ static_assertions::assert_impl_all!(Bifrost: Send, Sync, Clone); pub struct BifrostInner { #[allow(unused)] watchdog: WatchdogSender, + pub(crate) metadata_writer: MetadataWriter, // Initialized after BifrostService::start completes. 
pub(crate) providers: OnceLock>>>, shutting_down: AtomicBool, } impl BifrostInner { - pub fn new(watchdog: WatchdogSender) -> Self { + pub fn new(watchdog: WatchdogSender, metadata_writer: MetadataWriter) -> Self { Self { watchdog, + metadata_writer, providers: Default::default(), shutting_down: AtomicBool::new(false), } @@ -558,13 +568,12 @@ mod tests { use restate_types::{Version, Versioned}; use crate::providers::memory_loglet::{self}; - use crate::BifrostAdmin; #[restate_core::test] #[traced_test] async fn test_append_smoke() -> googletest::Result<()> { let num_partitions = 5; - let _ = TestCoreEnvBuilder::with_incoming_only_connector() + let env = TestCoreEnvBuilder::with_incoming_only_connector() .set_partition_table(PartitionTable::with_equally_sized_partitions( Version::MIN, num_partitions, @@ -572,7 +581,7 @@ mod tests { .build() .await; - let bifrost = Bifrost::init_in_memory().await; + let bifrost = Bifrost::init_in_memory(env.metadata_writer).await; let clean_bifrost_clone = bifrost.clone(); @@ -637,12 +646,12 @@ mod tests { #[restate_core::test(start_paused = true)] async fn test_lazy_initialization() -> googletest::Result<()> { - let _ = TestCoreEnv::create_with_single_node(1, 1).await; + let env = TestCoreEnv::create_with_single_node(1, 1).await; let delay = Duration::from_secs(5); // This memory provider adds a delay to its loglet initialization, we want // to ensure that appends do not fail while waiting for the loglet; let factory = memory_loglet::Factory::with_init_delay(delay); - let bifrost = Bifrost::init_with_factory(factory).await; + let bifrost = Bifrost::init_with_factory(env.metadata_writer, factory).await; let start = tokio::time::Instant::now(); let lsn = bifrost @@ -664,12 +673,7 @@ mod tests { .await; RocksDbManager::init(Constant::new(CommonOptions::default())); - let bifrost = Bifrost::init_local().await; - let bifrost_admin = BifrostAdmin::new( - &bifrost, - &node_env.metadata_writer, - &node_env.metadata_store_client, - ); + let bifrost = Bifrost::init_local(node_env.metadata_writer).await; assert_eq!(Lsn::OLDEST, bifrost.find_tail(LOG_ID).await?.offset()); @@ -681,7 +685,7 @@ mod tests { appender.append("").await?; } - bifrost_admin.trim(LOG_ID, Lsn::from(5)).await?; + bifrost.admin().trim(LOG_ID, Lsn::from(5)).await?; let tail = bifrost.find_tail(LOG_ID).await?; assert_eq!(tail.offset(), Lsn::from(11)); @@ -703,7 +707,7 @@ mod tests { } // trimming beyond the release point will fall back to the release point - bifrost_admin.trim(LOG_ID, Lsn::MAX).await?; + bifrost.admin().trim(LOG_ID, Lsn::MAX).await?; assert_eq!(Lsn::from(11), bifrost.find_tail(LOG_ID).await?.offset()); let new_trim_point = bifrost.get_trim_point(LOG_ID).await?; @@ -737,12 +741,7 @@ mod tests { )) .build() .await; - let bifrost = Bifrost::init_in_memory().await; - let bifrost_admin = BifrostAdmin::new( - &bifrost, - &node_env.metadata_writer, - &node_env.metadata_store_client, - ); + let bifrost = Bifrost::init_in_memory(node_env.metadata_writer).await; let mut appender = bifrost.create_appender(LOG_ID, ErrorRecoveryStrategy::Wait)?; // Lsns [1..5] @@ -765,7 +764,8 @@ mod tests { .unwrap(); // seal the segment - bifrost_admin + bifrost + .admin() .seal(LOG_ID, segment_1.segment_index()) .await?; @@ -925,12 +925,7 @@ mod tests { .build() .await; RocksDbManager::init(Constant::new(CommonOptions::default())); - let bifrost = Bifrost::init_local().await; - let bifrost_admin = BifrostAdmin::new( - &bifrost, - &node_env.metadata_writer, - &node_env.metadata_store_client, - ); + let bifrost = 
Bifrost::init_local(node_env.metadata_writer).await; // create an appender let stop_signal = Arc::new(AtomicBool::default()); @@ -976,7 +971,7 @@ mod tests { } // seal and don't extend the chain. - let _ = bifrost_admin.seal(LOG_ID, SegmentIndex::from(0)).await?; + let _ = bifrost.admin().seal(LOG_ID, SegmentIndex::from(0)).await?; // appends should stall! tokio::time::sleep(Duration::from_millis(100)).await; @@ -998,7 +993,8 @@ mod tests { tokio::time::sleep(Duration::from_millis(500)).await; // seal the loglet and extend with an in-memory one let new_segment_params = new_single_node_loglet_params(ProviderKind::Local); - bifrost_admin + bifrost + .admin() .seal_and_extend_chain( LOG_ID, None, diff --git a/crates/bifrost/src/bifrost_admin.rs b/crates/bifrost/src/bifrost_admin.rs index bb3a6f50f..94d91503a 100644 --- a/crates/bifrost/src/bifrost_admin.rs +++ b/crates/bifrost/src/bifrost_admin.rs @@ -8,16 +8,14 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use std::ops::Deref; use std::sync::Arc; use tracing::{debug, info, instrument}; -use restate_core::{Metadata, MetadataKind, MetadataWriter}; -use restate_metadata_store::MetadataStoreClient; +use restate_core::metadata_store::retry_on_network_error; +use restate_core::{Metadata, MetadataKind}; use restate_types::config::Configuration; -use restate_types::logs::builder::BuilderError; -use restate_types::logs::metadata::{LogletParams, Logs, ProviderKind, SegmentIndex}; +use restate_types::logs::metadata::{Chain, LogletParams, Logs, ProviderKind, SegmentIndex}; use restate_types::logs::{LogId, Lsn, TailState}; use restate_types::metadata_store::keys::BIFROST_CONFIG_KEY; use restate_types::Version; @@ -30,21 +28,6 @@ use crate::{Bifrost, Error, Result}; #[derive(Clone, Copy)] pub struct BifrostAdmin<'a> { bifrost: &'a Bifrost, - metadata_writer: &'a MetadataWriter, - metadata_store_client: &'a MetadataStoreClient, -} - -impl<'a> AsRef for BifrostAdmin<'a> { - fn as_ref(&self) -> &Bifrost { - self.bifrost - } -} - -impl<'a> Deref for BifrostAdmin<'a> { - type Target = Bifrost; - fn deref(&self) -> &Self::Target { - self.bifrost - } } #[derive(Debug)] @@ -56,16 +39,8 @@ pub struct SealedSegment { } impl<'a> BifrostAdmin<'a> { - pub fn new( - bifrost: &'a Bifrost, - metadata_writer: &'a MetadataWriter, - metadata_store_client: &'a MetadataStoreClient, - ) -> Self { - Self { - bifrost, - metadata_writer, - metadata_store_client, - } + pub fn new(bifrost: &'a Bifrost) -> Self { + Self { bifrost } } /// Trim the log prefix up to and including the `trim_point`. /// Set `trim_point` to the value returned from `find_tail()` or `Lsn::MAX` to @@ -76,6 +51,61 @@ impl<'a> BifrostAdmin<'a> { self.bifrost.inner.trim(log_id, trim_point).await } + /// Seals a loglet under a set of conditions. + /// + /// The loglet will be sealed if and only if the following is true: + /// - if segment_index is set, the tail loglet must match segment_index. + /// If the intention is to create the log, then `segment_index` must be set to `None`. + /// + /// This will continue to retry sealing for seal retryable errors automatically. 
+ #[instrument(level = "debug", skip(self), err)] + pub async fn seal_and_auto_extend_chain( + &self, + log_id: LogId, + segment_index: Option, + ) -> Result<()> { + self.bifrost.inner.fail_if_shutting_down()?; + let logs = Metadata::with_current(|m| m.logs_snapshot()); + let provider_config = &logs.configuration().default_provider; + let provider = self.bifrost.inner.provider_for(provider_config.kind())?; + // if this is a new log, we don't need to seal and we can immediately write to metadata + // store, otherwise, we need to seal first. + if logs.chain(&log_id).is_none() && segment_index.is_none() { + let proposed_params = + provider.propose_new_loglet_params(log_id, None, provider_config)?; + self.add_log(log_id, provider_config.kind(), proposed_params) + .await?; + return Ok(()); + } + + let segment_index = segment_index + .or_else(|| logs.chain(&log_id).map(|c| c.tail_index())) + .ok_or(Error::UnknownLogId(log_id))?; + + let sealed_segment = loop { + let sealed_segment = self.seal(log_id, segment_index).await?; + if sealed_segment.tail.is_sealed() { + break sealed_segment; + } + debug!(%log_id, %segment_index, "Segment is not sealed yet"); + tokio::time::sleep(Configuration::pinned().bifrost.seal_retry_interval.into()).await; + }; + + let proposed_params = + provider.propose_new_loglet_params(log_id, logs.chain(&log_id), provider_config)?; + + self.add_segment_with_params( + log_id, + segment_index, + sealed_segment.tail.offset(), + provider_config.kind(), + proposed_params, + ) + .await?; + + Ok(()) + } + /// Seals a loglet under a set of conditions. /// /// The loglet will be sealed if and only if the following is true: @@ -124,7 +154,7 @@ impl<'a> BifrostAdmin<'a> { } pub async fn writeable_loglet(&self, log_id: LogId) -> Result { - self.inner.writeable_loglet(log_id).await + self.bifrost.inner.writeable_loglet(log_id).await } #[instrument(level = "debug", skip(self), err)] @@ -187,36 +217,116 @@ impl<'a> BifrostAdmin<'a> { params: LogletParams, ) -> Result<()> { self.bifrost.inner.fail_if_shutting_down()?; - let logs = self - .metadata_store_client - .read_modify_write(BIFROST_CONFIG_KEY.clone(), move |logs: Option| { - let logs = logs.ok_or(Error::UnknownLogId(log_id))?; - - let mut builder = logs.into_builder(); - let mut chain_builder = builder.chain(log_id).ok_or(Error::UnknownLogId(log_id))?; - - if chain_builder.tail().index() != last_segment_index { - // tail is not what we expected. - return Err(Error::from(AdminError::SegmentMismatch { - expected: last_segment_index, - found: chain_builder.tail().index(), - })); - } + let retry_policy = Configuration::pinned() + .common + .network_error_retry_policy + .clone(); + let logs = retry_on_network_error(retry_policy, || { + self.bifrost + .inner + .metadata_writer + .metadata_store_client() + .read_modify_write(BIFROST_CONFIG_KEY.clone(), |logs: Option| { + let logs = logs.ok_or(Error::UnknownLogId(log_id))?; + + let mut builder = logs.into_builder(); + let mut chain_builder = + builder.chain(log_id).ok_or(Error::UnknownLogId(log_id))?; + + if chain_builder.tail().index() != last_segment_index { + // tail is not what we expected. 
+ return Err(Error::from(AdminError::SegmentMismatch { + expected: last_segment_index, + found: chain_builder.tail().index(), + })); + } + + let _ = chain_builder + .append_segment(base_lsn, provider, params.clone()) + .map_err(AdminError::from)?; + Ok(builder.build()) + }) + }) + .await + .map_err(|e| e.transpose())?; + + self.bifrost + .inner + .metadata_writer + .update(Arc::new(logs)) + .await?; + Ok(()) + } + + /// Adds a new log if it doesn't exist. + #[instrument(level = "debug", skip(self), err)] + async fn add_log( + &self, + log_id: LogId, + provider: ProviderKind, + params: LogletParams, + ) -> Result<()> { + self.bifrost.inner.fail_if_shutting_down()?; + let retry_policy = Configuration::pinned() + .common + .network_error_retry_policy + .clone(); + let logs = retry_on_network_error(retry_policy, || { + self.bifrost + .inner + .metadata_writer + .metadata_store_client() + .read_modify_write::<_, _, Error>( + BIFROST_CONFIG_KEY.clone(), + |logs: Option| { + // We assume that we'll always see a value set in metadata for BIFROST_CONFIG_KEY, + // provisioning the empty logs metadata is not our responsibility. + let logs = logs.ok_or(Error::UnknownLogId(log_id))?; - match chain_builder.append_segment(base_lsn, provider, params.clone()) { - Err(e) => match e { - BuilderError::SegmentConflict(lsn) => { - Err(Error::from(AdminError::SegmentConflict(lsn))) - } - _ => unreachable!("the log must exist at this point"), + let mut builder = logs.into_builder(); + builder + .add_log(log_id, Chain::new(provider, params.clone())) + .map_err(AdminError::from)?; + Ok(builder.build()) }, - Ok(_) => Ok(builder.build()), - } - }) - .await - .map_err(|e| e.transpose())?; + ) + }) + .await + .map_err(|e| e.transpose())?; - self.metadata_writer.update(Arc::new(logs)).await?; + self.bifrost + .inner + .metadata_writer + .update(Arc::new(logs)) + .await?; + Ok(()) + } + + /// Creates empty metadata if none exists for bifrost and publishes it to metadata + /// manager. 
+ pub async fn init_metadata(&self) -> Result<(), Error> { + let retry_policy = Configuration::pinned() + .common + .network_error_retry_policy + .clone(); + + let logs = retry_on_network_error(retry_policy, || { + self.bifrost + .inner + .metadata_writer + .metadata_store_client() + .get_or_insert(BIFROST_CONFIG_KEY.clone(), || { + debug!("Attempting to initialize logs metadata in metadata store"); + Logs::from_configuration(&Configuration::pinned()) + }) + }) + .await?; + + self.bifrost + .inner + .metadata_writer + .update(Arc::new(logs)) + .await?; Ok(()) } } diff --git a/crates/bifrost/src/error.rs b/crates/bifrost/src/error.rs index 7ee8b6961..f61fb1e05 100644 --- a/crates/bifrost/src/error.rs +++ b/crates/bifrost/src/error.rs @@ -12,6 +12,7 @@ use std::sync::Arc; use restate_core::{ShutdownError, SyncError}; use restate_types::errors::MaybeRetryableError; +use restate_types::logs::builder::BuilderError; use restate_types::logs::metadata::SegmentIndex; use restate_types::logs::{LogId, Lsn}; @@ -53,6 +54,8 @@ pub enum EnqueueError { #[derive(Debug, thiserror::Error)] pub enum AdminError { + #[error("log {0} already exists")] + LogAlreadyExists(LogId), #[error("segment conflicts with existing segment with base_lsn={0}")] SegmentConflict(Lsn), #[error("segment index found in metadata does not match expected {expected}!={found}")] @@ -60,6 +63,8 @@ pub enum AdminError { expected: SegmentIndex, found: SegmentIndex, }, + #[error("loglet params could not be deserialized: {0}")] + ParamsSerde(#[from] serde_json::Error), } impl From for Error { @@ -70,3 +75,13 @@ impl From for Error { } } } + +impl From for AdminError { + fn from(value: BuilderError) -> Self { + match value { + BuilderError::LogAlreadyExists(log_id) => AdminError::LogAlreadyExists(log_id), + BuilderError::ParamsSerde(error) => AdminError::ParamsSerde(error), + BuilderError::SegmentConflict(lsn) => AdminError::SegmentConflict(lsn), + } + } +} diff --git a/crates/bifrost/src/loglet/provider.rs b/crates/bifrost/src/loglet/provider.rs index e0b397f27..00d9fc359 100644 --- a/crates/bifrost/src/loglet/provider.rs +++ b/crates/bifrost/src/loglet/provider.rs @@ -12,7 +12,9 @@ use std::sync::Arc; use async_trait::async_trait; -use restate_types::logs::metadata::{LogletParams, ProviderKind, SegmentIndex}; +use restate_types::logs::metadata::{ + Chain, LogletParams, ProviderConfiguration, ProviderKind, SegmentIndex, +}; use restate_types::logs::LogId; use super::{Loglet, OperationError}; @@ -37,6 +39,17 @@ pub trait LogletProvider: Send + Sync { params: &LogletParams, ) -> Result>; + /// Create a loglet client for a given segment and configuration. + /// + /// if `chain` is None, the provider should assume that no chain exists already + /// for this log. + fn propose_new_loglet_params( + &self, + log_id: LogId, + chain: Option<&Chain>, + defaults: &ProviderConfiguration, + ) -> Result; + /// A hook that's called after provider is started. 
async fn post_start(&self) {} diff --git a/crates/bifrost/src/providers/local_loglet/provider.rs b/crates/bifrost/src/providers/local_loglet/provider.rs index 2d19cd4f0..685a9858a 100644 --- a/crates/bifrost/src/providers/local_loglet/provider.rs +++ b/crates/bifrost/src/providers/local_loglet/provider.rs @@ -17,8 +17,10 @@ use tracing::debug; use restate_types::config::{LocalLogletOptions, RocksDbOptions}; use restate_types::live::BoxedLiveLoad; -use restate_types::logs::metadata::{LogletParams, ProviderKind, SegmentIndex}; -use restate_types::logs::LogId; +use restate_types::logs::metadata::{ + Chain, LogletParams, ProviderConfiguration, ProviderKind, SegmentIndex, +}; +use restate_types::logs::{LogId, LogletId}; use super::log_store::RocksDbLogStore; use super::log_store_writer::RocksDbLogWriterHandle; @@ -105,6 +107,20 @@ impl LogletProvider for LocalLogletProvider { Ok(loglet as Arc) } + fn propose_new_loglet_params( + &self, + log_id: LogId, + chain: Option<&Chain>, + _defaults: &ProviderConfiguration, + ) -> Result { + let new_segment_index = chain + .map(|c| c.tail_index()) + .unwrap_or(SegmentIndex::OLDEST); + Ok(LogletParams::from( + LogletId::new(log_id, new_segment_index).to_string(), + )) + } + async fn shutdown(&self) -> Result<(), OperationError> { Ok(()) } diff --git a/crates/bifrost/src/providers/memory_loglet.rs b/crates/bifrost/src/providers/memory_loglet.rs index a1bb9ae0f..d90adb583 100644 --- a/crates/bifrost/src/providers/memory_loglet.rs +++ b/crates/bifrost/src/providers/memory_loglet.rs @@ -22,9 +22,11 @@ use tokio::sync::Mutex as AsyncMutex; use tracing::{debug, info}; use restate_core::ShutdownError; -use restate_types::logs::metadata::{LogletParams, ProviderKind, SegmentIndex}; +use restate_types::logs::metadata::{ + Chain, LogletParams, ProviderConfiguration, ProviderKind, SegmentIndex, +}; use restate_types::logs::{ - KeyFilter, LogId, LogletOffset, MatchKeyQuery, Record, SequenceNumber, TailState, + KeyFilter, LogId, LogletId, LogletOffset, MatchKeyQuery, Record, SequenceNumber, TailState, }; use crate::loglet::util::TailOffsetWatch; @@ -100,6 +102,20 @@ impl LogletProvider for MemoryLogletProvider { Ok(loglet as Arc) } + fn propose_new_loglet_params( + &self, + log_id: LogId, + chain: Option<&Chain>, + _defaults: &ProviderConfiguration, + ) -> Result { + let new_segment_index = chain + .map(|c| c.tail_index()) + .unwrap_or(SegmentIndex::OLDEST); + Ok(LogletParams::from( + LogletId::new(log_id, new_segment_index).to_string(), + )) + } + async fn shutdown(&self) -> Result<(), OperationError> { info!("Shutting down in-memory loglet provider"); Ok(()) diff --git a/crates/bifrost/src/providers/replicated_loglet/mod.rs b/crates/bifrost/src/providers/replicated_loglet/mod.rs index dc7b92f0b..4d3f89bc0 100644 --- a/crates/bifrost/src/providers/replicated_loglet/mod.rs +++ b/crates/bifrost/src/providers/replicated_loglet/mod.rs @@ -13,6 +13,7 @@ mod log_server_manager; mod loglet; pub(crate) mod metric_definitions; mod network; +mod nodeset_selector; mod provider; mod read_path; mod remote_sequencer; diff --git a/crates/bifrost/src/providers/replicated_loglet/nodeset_selector.rs b/crates/bifrost/src/providers/replicated_loglet/nodeset_selector.rs new file mode 100644 index 000000000..4b48b541e --- /dev/null +++ b/crates/bifrost/src/providers/replicated_loglet/nodeset_selector.rs @@ -0,0 +1,515 @@ +// Copyright (c) 2023 - 2025 Restate Software, Inc., Restate GmbH. +// All rights reserved. 
+// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use std::cmp::{max, Ordering}; + +use itertools::Itertools; +use rand::prelude::IteratorRandom; +use rand::Rng; +use tracing::trace; + +use restate_types::logs::metadata::NodeSetSelectionStrategy; +use restate_types::nodes_config::NodesConfiguration; +use restate_types::replicated_loglet::{LocationScope, NodeSet, ReplicationProperty}; +use restate_types::NodeId; + +/// TEMPORARY UNTIL NODE-LEVEL CLUSTER STATE IS ACTUALLY IMPLEMENTED +#[derive(Clone, Debug)] +pub struct ObservedClusterState; + +impl ObservedClusterState { + pub fn is_node_alive(&self, _node_id: impl Into) -> bool { + // assume all nodes are alive + true + } +} + +/// Nodeset selector for picking a set of storage nodes for a replicated loglet out of a broader +/// pool of available nodes. +/// +/// This selector can be reused once constructed to make multiple decisions in a single scheduling +/// iteration, if the node configuration and the replication settings are not changing. +#[derive(Clone)] +pub struct NodeSetSelector<'a> { + nodes_config: &'a NodesConfiguration, + cluster_state: &'a ObservedClusterState, +} + +impl<'a> NodeSetSelector<'a> { + pub fn new( + nodes_config: &'a NodesConfiguration, + cluster_state: &'a ObservedClusterState, + ) -> NodeSetSelector<'a> { + Self { + nodes_config, + cluster_state, + } + } + + /// Determines if a nodeset can be improved by adding or replacing members. Does NOT consider + /// sealability of the current configuration when making decisions! + #[allow(unused)] + pub fn can_improve( + &self, + nodeset: &NodeSet, + strategy: NodeSetSelectionStrategy, + replication_property: &ReplicationProperty, + ) -> bool { + let writable_nodeset = WritableNodeSet::from(self.nodes_config); + let alive_nodeset = writable_nodeset.alive(self.cluster_state); + let current_alive = alive_nodeset.intersect(nodeset); + + let nodeset_size = + nodeset_size_range(&strategy, replication_property, writable_nodeset.len()); + + if current_alive.len() == nodeset_size.target_size { + return false; + } + + // todo: we should check the current segment for sealability, otherwise we might propose + // reconfiguration when we are virtually certain to get stuck! + alive_nodeset.len() >= nodeset_size.minimum_size + && alive_nodeset.len() > current_alive.len() + } + + /// Picks a set of storage nodes for a replicated loglet out of the available pool. Only alive, + /// writable storage nodes will be used. Returns a proposed new nodeset that meets the + /// requirements of the supplied selection strategy and replication, or an explicit error. + pub fn select( + &self, + strategy: NodeSetSelectionStrategy, + replication_property: &ReplicationProperty, + rng: &mut R, + preferred_nodes: &NodeSet, + ) -> Result { + if replication_property.at_greatest_scope().0 != &LocationScope::Node { + // todo: add support for other location scopes + unimplemented!("only node-scoped replication is currently supported"); + } + + let writable_nodeset = WritableNodeSet::from(self.nodes_config); + // Only consider alive, writable storage nodes. 
+ let alive_nodeset = writable_nodeset.alive(self.cluster_state); + + let nodeset_size = + nodeset_size_range(&strategy, replication_property, writable_nodeset.len()); + + if writable_nodeset.len() < nodeset_size.fault_tolerant_size { + trace!( + nodes_count = %writable_nodeset.len(), + ?nodeset_size.minimum_size, + ?nodeset_size.fault_tolerant_size, + cluster_state = ?self.cluster_state, + nodes_config = ?self.nodes_config, + "Not enough nodes to meet the fault tolerant replication requirements" + ); + return Err(NodeSelectionError::InsufficientWriteableNodes); + } + + let nodeset = match strategy { + NodeSetSelectionStrategy::StrictFaultTolerantGreedy => { + let mut nodes = preferred_nodes + .iter() + .copied() + .filter(|node_id| alive_nodeset.contains(node_id)) + .choose_multiple(rng, nodeset_size.target_size); + + if nodes.len() < nodeset_size.target_size { + let remaining = nodeset_size.target_size - nodes.len(); + nodes.extend( + alive_nodeset + .iter() + .filter(|node_id| !preferred_nodes.contains(node_id)) + .choose_multiple(rng, remaining), + ); + } + + if nodes.len() < nodeset_size.minimum_size { + trace!( + "Failed to place replicated loglet: insufficient alive nodes to meet minimum size requirement {} < {}", + nodes.len(), + nodeset_size.minimum_size, + ); + + return Err(NodeSelectionError::InsufficientWriteableNodes); + } + + // last possibility is if the selected nodeset is still + // smaller than fault tolerant size we try to extend from the full nodeset + // which includes possibly dead nodes + if nodes.len() < nodeset_size.fault_tolerant_size { + // greedy approach: Every other node that is not + // already in the set. + let remaining = nodeset_size.fault_tolerant_size - nodes.len(); + + let extension = writable_nodeset + .iter() + .filter(|node_id| !alive_nodeset.contains(node_id)) + .cloned() + .sorted_by(|l, r| { + // sorting nodes by "preferred" nodes. Preferred nodes comes first. + match (preferred_nodes.contains(l), preferred_nodes.contains(r)) { + (true, true) | (false, false) => Ordering::Equal, + (true, false) => Ordering::Less, + (false, true) => Ordering::Greater, + } + }) + .take(remaining); + + nodes.extend(extension); + } + + let nodes_len = nodes.len(); + let nodeset = NodeSet::from_iter(nodes); + assert_eq!( + nodeset.len(), + nodes_len, + "We have accidentally chosen duplicate candidates during nodeset selection" + ); + nodeset + } + }; + + // even with all possible dead node we still can't reach the fault tolerant + // nodeset size. This means there are not enough nodes in the cluster + // so we still return an error. + + // todo: implement location scope-aware selection + if nodeset.len() < nodeset_size.fault_tolerant_size { + trace!( + "Failed to place replicated loglet: insufficient writeable nodes to meet fault tolerant size requirement {} < {}", + nodeset.len(), + nodeset_size.fault_tolerant_size, + ); + return Err(NodeSelectionError::InsufficientWriteableNodes); + } + + Ok(nodeset) + } +} + +#[derive(Debug)] +struct NodeSetSizeRange { + /// Minimum number of nodes required to maintain write availability; + /// dropping below this threshold will result in loss of write availability. 
+    minimum_size: usize,
+    /// The minimum number of nodes needed to satisfy the replication property
+    /// with fault tolerance.
+    ///
+    /// Calculated as (minimum_size - 1) * 2 + 1.
+    fault_tolerant_size: usize,
+    /// The proposed number of nodes to use, if available.
+    target_size: usize,
+}
+
+fn nodeset_size_range(
+    strategy: &NodeSetSelectionStrategy,
+    replication_property: &ReplicationProperty,
+    writable_nodes_size: usize,
+) -> NodeSetSizeRange {
+    let min_copies = replication_property.num_copies();
+
+    // ReplicationFactor(f+1) implies a minimum of 2f+1 nodes. At this point we are only
+    // calculating the nodeset floor size; the actual size will be determined by the specific
+    // strategy in use.
+    assert!(
+        min_copies < u8::MAX >> 1,
+        "The replication factor implies a cluster size that exceeds the maximum supported size"
+    );
+
+    let fault_tolerant_size = (usize::from(min_copies) - 1) * 2 + 1;
+    assert!(
+        fault_tolerant_size >= usize::from(min_copies),
+        "The calculated minimum nodeset size can not be less than the replication factor"
+    );
+
+    let (fault_tolerant_size, nodeset_target_size) = match strategy {
+        // writable_nodes_size includes every writable node (log-server), dead or alive.
+        // In the greedy strategy we take max(fault_tolerant_size, writable_nodes_size) as
+        // our target size.
+        NodeSetSelectionStrategy::StrictFaultTolerantGreedy => (
+            fault_tolerant_size,
+            max(fault_tolerant_size, writable_nodes_size),
+        ),
+    };
+
+    NodeSetSizeRange {
+        minimum_size: min_copies.into(),
+        fault_tolerant_size,
+        target_size: nodeset_target_size,
+    }
+}
+
+#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
+pub enum NodeSelectionError {
+    #[error("Insufficient writeable nodes in the nodeset")]
+    InsufficientWriteableNodes,
+}
+
+/// The set of all writable log-server nodes, regardless of their liveness.
+#[derive(Debug, Clone, Eq, PartialEq, derive_more::Into, derive_more::Deref)]
+struct WritableNodeSet(NodeSet);
+
+impl WritableNodeSet {
+    fn alive(&self, state: &ObservedClusterState) -> AliveNodeSet {
+        self.iter()
+            .cloned()
+            .filter(|id| state.is_node_alive(*id))
+            .collect::<NodeSet>()
+            .into()
+    }
+}
+
+impl From<&NodesConfiguration> for WritableNodeSet {
+    fn from(value: &NodesConfiguration) -> Self {
+        Self(
+            value
+                .iter()
+                .filter_map(|(node_id, config)| {
+                    if config.log_server_config.storage_state.can_write_to() {
+                        Some(node_id)
+                    } else {
+                        None
+                    }
+                })
+                .collect(),
+        )
+    }
+}
+
+/// A subset of WritableNodeSet that is known to be alive at the time of creation.
+#[derive(Debug, Clone, Eq, PartialEq, derive_more::Into, derive_more::Deref, derive_more::From)]
+struct AliveNodeSet(NodeSet);
+
+#[cfg(test)]
+pub mod tests {
+    // ** NOTE **
+    // THESE TESTS ARE TEMPORARILY DISABLED AND WILL BE ENABLED AFTER CLUSTER STATE IS IMPLEMENTED
+    // THIS IS A TRANSITIONAL STATE
+
+    // use std::collections::HashSet;
+    //
+    // use enumset::enum_set;
+    // use rand::thread_rng;
+    //
+    // use restate_types::nodes_config::{NodesConfiguration, Role, StorageState};
+    // use restate_types::replicated_loglet::{LocationScope, NodeSet, ReplicationProperty};
+    // use restate_types::PlainNodeId;
+    //
+    // use super::*;
+    // use crate::cluster_controller::logs_controller::tests::{node, MockNodes};
+    // use crate::cluster_controller::observed_cluster_state::ObservedClusterState;
+    //
+    // #[test]
+    // #[should_panic(
+    //     expected = "not implemented: only node-scoped replication is currently supported"
+    // )]
+    // fn test_select_log_servers_rejects_unsupported_replication_scope() {
+    //     let replication =
+    //         ReplicationProperty::with_scope(LocationScope::Zone, 1.try_into().unwrap());
+    //
+    //     let nodes_config = NodesConfiguration::default();
+    //     let observed_state = ObservedClusterState::default();
+    //
+    //     let preferred_nodes = NodeSet::empty();
+    //     NodeSetSelector::new(&nodes_config, &observed_state)
+    //         .select(
+    //             NodeSetSelectionStrategy::StrictFaultTolerantGreedy,
+    //             &replication,
+    //             &mut thread_rng(),
+    //             &preferred_nodes,
+    //         )
+    //         .unwrap(); // panics
+    // }
+    //
+    // #[test]
+    // fn test_select_log_servers_insufficient_capacity() {
+    //     let nodes: Vec<PlainNodeId> = vec![1.into(), 2.into(), 3.into()];
+    //     let replication =
+    //         ReplicationProperty::with_scope(LocationScope::Node, 2.try_into().unwrap());
+    //
+    //     let mut nodes_config = NodesConfiguration::default();
+    //     nodes_config.upsert_node(node(0, enum_set!(Role::Admin), StorageState::Disabled));
+    //     nodes_config.upsert_node(node(
+    //         1,
+    //         enum_set!(Role::LogServer | Role::Worker),
+    //         StorageState::Provisioning,
+    //     ));
+    //     nodes_config.upsert_node(node(
+    //         2,
+    //         enum_set!(Role::LogServer | Role::Worker),
+    //         StorageState::ReadWrite,
+    //     ));
+    //     nodes_config.upsert_node(node(
+    //         3,
+    //         enum_set!(Role::LogServer | Role::Worker),
+    //         StorageState::ReadOnly,
+    //     ));
+    //     nodes_config.upsert_node(node(4, enum_set!(Role::Worker), StorageState::Disabled));
+    //
+    //     let observed_state = ObservedClusterState {
+    //         alive_nodes: nodes
+    //             .iter()
+    //             .copied()
+    //             .map(|id| (id, id.with_generation(1)))
+    //             .collect(),
+    //         dead_nodes: HashSet::default(),
+    //         ..Default::default()
+    //     };
+    //
+    //     let strategy = NodeSetSelectionStrategy::StrictFaultTolerantGreedy;
+    //     let preferred_nodes = NodeSet::empty();
+    //     let selection = NodeSetSelector::new(&nodes_config, &observed_state).select(
+    //         strategy,
+    //         &replication,
+    //         &mut thread_rng(),
+    //         &preferred_nodes,
+    //     );
+    //
+    //     assert_eq!(
+    //         selection,
+    //         Err(NodeSelectionError::InsufficientWriteableNodes)
+    //     );
+    // }
+    //
+    // /// Replicated loglets should work just fine in single-node clusters, with the FT strategy inferring that f=0,
+    // /// as long as the replication factor is set to 1.
+ // #[test] + // fn test_select_log_servers_single_node_cluster() { + // let nodes = MockNodes::builder().with_mixed_server_nodes([1]).build(); + // + // let replication = + // ReplicationProperty::with_scope(LocationScope::Node, 1.try_into().unwrap()); + // + // let strategy = NodeSetSelectionStrategy::StrictFaultTolerantGreedy; + // let preferred_nodes = NodeSet::empty(); + // let selection = NodeSetSelector::new(&nodes.nodes_config, &nodes.observed_state).select( + // strategy, + // &replication, + // &mut thread_rng(), + // &preferred_nodes, + // ); + // + // assert_eq!( + // selection.unwrap(), + // NodeSet::from([1]), + // "A single-node cluster is possible with replication factor of 0" + // ); + // } + // + // /// In this test we have a cluster with 3 nodes and replication factor is 2. The strict FT + // /// strategy can bootstrap a loglet using all 3 nodes but won't choose a new nodeset when only 2 + // /// are alive, as that puts the loglet at risk. The assumption is that the previous nodeset will + // /// carry on in its original configuration - it is the data plane's problem to work around + // /// partial node availability. When an additional log server becomes available, the selector can + // /// reconfigure the loglet to use it. + // #[test] + // fn test_select_log_servers_respects_replication_factor() { + // let mut nodes = MockNodes::builder() + // .with_mixed_server_nodes([1, 2, 3]) + // .build(); + // + // let replication = + // ReplicationProperty::with_scope(LocationScope::Node, 2.try_into().unwrap()); + // + // // initial selection - no prior preferences + // let selection = NodeSetSelector::new(&nodes.nodes_config, &nodes.observed_state).select( + // NodeSetSelectionStrategy::StrictFaultTolerantGreedy, + // &replication, + // &mut thread_rng(), + // &NodeSet::empty(), + // ); + // assert!(selection.is_ok()); + // let initial_nodeset = selection.unwrap(); + // assert_eq!(initial_nodeset, NodeSet::from([1, 2, 3])); + // + // nodes.kill_node(1); + // + // let selection = NodeSetSelector::new(&nodes.nodes_config, &nodes.observed_state).select( + // NodeSetSelectionStrategy::StrictFaultTolerantGreedy, + // &replication, + // &mut thread_rng(), + // &initial_nodeset, // preferred nodes + // ); + // + // // while one node is dead, the selector can still satisfy a write quorum + // // based on supplied replication property. The dead node will be included + // // in the nodeset. 
+ // assert!(selection.is_ok()); + // let initial_nodeset = selection.unwrap(); + // assert_eq!(initial_nodeset, NodeSet::from([1, 2, 3])); + // + // nodes.add_dedicated_log_server_node(4); + // + // let selection = NodeSetSelector::new(&nodes.nodes_config, &nodes.observed_state).select( + // NodeSetSelectionStrategy::StrictFaultTolerantGreedy, + // &replication, + // &mut thread_rng(), + // &initial_nodeset, // preferred nodes + // ); + // assert_eq!(selection.unwrap(), NodeSet::from([2, 3, 4])); + // } + // + // #[test] + // fn test_select_log_servers_respects_replication_factor_not_enough_nodes() { + // let nodes = MockNodes::builder().with_mixed_server_nodes([1, 2]).build(); + // + // let replication = + // ReplicationProperty::with_scope(LocationScope::Node, 2.try_into().unwrap()); + // + // // initial selection - no prior preferences + // let selection = NodeSetSelector::new(&nodes.nodes_config, &nodes.observed_state).select( + // NodeSetSelectionStrategy::StrictFaultTolerantGreedy, + // &replication, + // &mut thread_rng(), + // &NodeSet::empty(), + // ); + // + // // in this case, the entire cluster does not have enough nodes for an optimal + // // nodeset. + // assert_eq!( + // selection, + // Err(NodeSelectionError::InsufficientWriteableNodes), + // "The strict FT strategy does not compromise on the minimum 2f+1 nodeset size" + // ); + // } + // + // #[test] + // fn test_select_log_servers_insufficient_fault_tolerant_capacity() { + // // while we only have 2 alive node, the algorithm will still + // // prefer to use a dead node instead of failing as long as + // // we have write availability + // + // let nodes = MockNodes::builder() + // .with_nodes( + // [1, 2, 3], + // enum_set!(Role::LogServer | Role::Worker), + // StorageState::ReadWrite, + // ) + // .dead_nodes([3]) + // .build(); + // + // let replication = + // ReplicationProperty::with_scope(LocationScope::Node, 2.try_into().unwrap()); + // + // let strategy = NodeSetSelectionStrategy::StrictFaultTolerantGreedy; + // let preferred_nodes = NodeSet::empty(); + // let selection = NodeSetSelector::new(&nodes.nodes_config, &nodes.observed_state).select( + // strategy, + // &replication, + // &mut thread_rng(), + // &preferred_nodes, + // ); + // + // assert!(selection.is_ok()); + // let selection = selection.unwrap(); + // assert!(selection.contains(&PlainNodeId::from(3))); + // } +} diff --git a/crates/bifrost/src/providers/replicated_loglet/provider.rs b/crates/bifrost/src/providers/replicated_loglet/provider.rs index d164ccbfa..ab4fa6988 100644 --- a/crates/bifrost/src/providers/replicated_loglet/provider.rs +++ b/crates/bifrost/src/providers/replicated_loglet/provider.rs @@ -16,16 +16,19 @@ use dashmap::DashMap; use tracing::debug; use restate_core::network::{MessageRouterBuilder, Networking, TransportConnect}; -use restate_core::{TaskCenter, TaskKind}; +use restate_core::{my_node_id, Metadata, TaskCenter, TaskKind}; use restate_metadata_store::MetadataStoreClient; use restate_types::config::Configuration; -use restate_types::logs::metadata::{LogletParams, ProviderKind, SegmentIndex}; -use restate_types::logs::{LogId, RecordCache}; +use restate_types::logs::metadata::{ + Chain, LogletParams, ProviderConfiguration, ProviderKind, SegmentIndex, +}; +use restate_types::logs::{LogId, LogletId, RecordCache}; use restate_types::replicated_loglet::ReplicatedLogletParams; use super::loglet::ReplicatedLoglet; use super::metric_definitions; use super::network::RequestPump; +use super::nodeset_selector::{NodeSelectionError, 
NodeSetSelector, ObservedClusterState}; use super::rpc_routers::{LogServersRpc, SequencersRpc}; use crate::loglet::{Loglet, LogletProvider, LogletProviderFactory, OperationError}; use crate::providers::replicated_loglet::error::ReplicatedLogletError; @@ -201,6 +204,76 @@ impl LogletProvider for ReplicatedLogletProvider { Ok(loglet as Arc) } + fn propose_new_loglet_params( + &self, + log_id: LogId, + chain: Option<&Chain>, + defaults: &ProviderConfiguration, + ) -> Result { + let ProviderConfiguration::Replicated(defaults) = defaults else { + panic!("ProviderConfiguration::Replicated is expected"); + }; + + let new_segment_index = chain + .map(|c| c.tail_index()) + .unwrap_or(SegmentIndex::OLDEST); + + let loglet_id = LogletId::new(log_id, new_segment_index); + + let mut rng = rand::thread_rng(); + + let replication = defaults.replication_property.clone(); + let strategy = defaults.nodeset_selection_strategy; + + // if the last loglet in the chain is of the same provider kind, we can use this to + // influence the nodeset selector. + let previous_params = chain.and_then(|chain| { + let tail_config = chain.tail().config; + match tail_config.kind { + ProviderKind::Replicated => Some( + ReplicatedLogletParams::deserialize_from(tail_config.params.as_bytes()) + .expect("params serde must be infallible"), + ), + // Another kind, we don't care about its config + _ => None, + } + }); + + let preferred_nodes = previous_params + .map(|p| p.nodeset.clone()) + .unwrap_or_default(); + let nodes_config = Metadata::with_current(|m| m.nodes_config_ref()); + + let selection = NodeSetSelector::new(&nodes_config, &ObservedClusterState).select( + strategy, + &replication, + &mut rng, + &preferred_nodes, + ); + + match selection { + Ok(nodeset) => Ok(LogletParams::from( + ReplicatedLogletParams { + loglet_id, + // We choose ourselves to be the sequencer for this loglet + sequencer: my_node_id(), + replication, + nodeset, + } + .serialize() + .expect("params serde must be infallible"), + )), + Err(e @ NodeSelectionError::InsufficientWriteableNodes) => { + debug!( + ?loglet_id, + "Insufficient writeable nodes to select new nodeset for replicated loglet" + ); + + Err(OperationError::terminal(e)) + } + } + } + async fn shutdown(&self) -> Result<(), OperationError> { Ok(()) } diff --git a/crates/bifrost/src/providers/replicated_loglet/tasks/check_seal.rs b/crates/bifrost/src/providers/replicated_loglet/tasks/check_seal.rs index 068ee3b6f..49398117c 100644 --- a/crates/bifrost/src/providers/replicated_loglet/tasks/check_seal.rs +++ b/crates/bifrost/src/providers/replicated_loglet/tasks/check_seal.rs @@ -94,6 +94,7 @@ impl CheckSealTask { loglet_id = %my_params.loglet_id, status = %nodeset_checker, effective_nodeset = %effective_nodeset, + replication = %my_params.replication, "Insufficient nodes responded to GetLogletInfo requests, we cannot determine seal status, we'll assume it's unsealed for now", ); return Ok(CheckSealOutcome::ProbablyOpen); diff --git a/crates/bifrost/src/read_stream.rs b/crates/bifrost/src/read_stream.rs index 8ac81c3ee..11c32f212 100644 --- a/crates/bifrost/src/read_stream.rs +++ b/crates/bifrost/src/read_stream.rs @@ -454,14 +454,14 @@ mod tests { use restate_types::metadata_store::keys::BIFROST_CONFIG_KEY; use restate_types::Versioned; - use crate::{BifrostAdmin, BifrostService, ErrorRecoveryStrategy}; + use crate::{BifrostService, ErrorRecoveryStrategy}; #[restate_core::test(flavor = "multi_thread", worker_threads = 2)] #[traced_test] async fn test_readstream_one_loglet() -> 
anyhow::Result<()> { const LOG_ID: LogId = LogId::new(0); - let _ = TestCoreEnvBuilder::with_incoming_only_connector() + let env = TestCoreEnvBuilder::with_incoming_only_connector() .set_provider_kind(ProviderKind::Local) .build() .await; @@ -471,7 +471,7 @@ mod tests { let config = Live::from_value(Configuration::default()); RocksDbManager::init(Constant::new(CommonOptions::default())); - let svc = BifrostService::new().enable_local_loglet(&config); + let svc = BifrostService::new(env.metadata_writer).enable_local_loglet(&config); let bifrost = svc.handle(); svc.start().await.expect("loglet must start"); @@ -548,14 +548,10 @@ mod tests { let config = Live::from_value(Configuration::default()); RocksDbManager::init(Constant::new(CommonOptions::default())); - let svc = BifrostService::new().enable_local_loglet(&config); + let svc = + BifrostService::new(node_env.metadata_writer.clone()).enable_local_loglet(&config); let bifrost = svc.handle(); - let bifrost_admin = BifrostAdmin::new( - &bifrost, - &node_env.metadata_writer, - &node_env.metadata_store_client, - ); svc.start().await.expect("loglet must start"); let mut appender = bifrost.create_appender(LOG_ID, ErrorRecoveryStrategy::Wait)?; @@ -569,7 +565,7 @@ mod tests { } // [1..5] trimmed. trim_point = 5 - bifrost_admin.trim(LOG_ID, Lsn::from(5)).await?; + bifrost.admin().trim(LOG_ID, Lsn::from(5)).await?; assert_eq!(Lsn::from(11), bifrost.find_tail(LOG_ID).await?.offset()); assert_eq!(Lsn::from(5), bifrost.get_trim_point(LOG_ID).await?); @@ -590,7 +586,7 @@ mod tests { let tail = bifrost.find_tail(LOG_ID).await?.offset(); // trimming beyond the release point will fall back to the release point - bifrost_admin.trim(LOG_ID, Lsn::from(u64::MAX)).await?; + bifrost.admin().trim(LOG_ID, Lsn::from(u64::MAX)).await?; let trim_point = bifrost.get_trim_point(LOG_ID).await?; assert_eq!(Lsn::from(10), bifrost.get_trim_point(LOG_ID).await?); // trim point becomes the point before the next slot available for writes (aka. 
the @@ -643,7 +639,7 @@ mod tests { RocksDbManager::init(Constant::new(CommonOptions::default())); // enable both in-memory and local loglet types - let svc = BifrostService::new() + let svc = BifrostService::new(node_env.metadata_writer.clone()) .enable_local_loglet(&config) .enable_in_memory_loglet(); let bifrost = svc.handle(); @@ -799,15 +795,10 @@ mod tests { RocksDbManager::init(Constant::new(CommonOptions::default())); // enable both in-memory and local loglet types - let svc = BifrostService::new() + let svc = BifrostService::new(node_env.metadata_writer) .enable_local_loglet(&config) .enable_in_memory_loglet(); let bifrost = svc.handle(); - let bifrost_admin = BifrostAdmin::new( - &bifrost, - &node_env.metadata_writer, - &node_env.metadata_store_client, - ); svc.start().await.expect("loglet must start"); let mut appender = bifrost.create_appender(LOG_ID, ErrorRecoveryStrategy::Wait)?; @@ -825,7 +816,8 @@ mod tests { // seal the loglet and extend with an in-memory one let new_segment_params = new_single_node_loglet_params(ProviderKind::InMemory); - bifrost_admin + bifrost + .admin() .seal_and_extend_chain( LOG_ID, None, @@ -917,7 +909,7 @@ mod tests { RocksDbManager::init(Constant::new(CommonOptions::default())); // enable both in-memory and local loglet types - let svc = BifrostService::new() + let svc = BifrostService::new(node_env.metadata_writer) .enable_local_loglet(&config) .enable_in_memory_loglet(); let bifrost = svc.handle(); diff --git a/crates/bifrost/src/service.rs b/crates/bifrost/src/service.rs index 3222cc5c2..03059c774 100644 --- a/crates/bifrost/src/service.rs +++ b/crates/bifrost/src/service.rs @@ -11,11 +11,12 @@ use std::collections::HashMap; use std::sync::Arc; -use anyhow::Context; use enum_map::EnumMap; use tracing::{debug, error, trace}; -use restate_core::{cancellation_watcher, TaskCenter, TaskCenterFutureExt, TaskKind}; +use restate_core::{ + cancellation_watcher, MetadataWriter, TaskCenter, TaskCenterFutureExt, TaskKind, +}; use restate_types::config::Configuration; use restate_types::live::Live; use restate_types::logs::metadata::ProviderKind; @@ -34,16 +35,10 @@ pub struct BifrostService { factories: HashMap>, } -impl Default for BifrostService { - fn default() -> Self { - Self::new() - } -} - impl BifrostService { - pub fn new() -> Self { + pub fn new(metadata_writer: MetadataWriter) -> Self { let (watchdog_sender, watchdog_receiver) = tokio::sync::mpsc::unbounded_channel(); - let inner = Arc::new(BifrostInner::new(watchdog_sender.clone())); + let inner = Arc::new(BifrostInner::new(watchdog_sender.clone(), metadata_writer)); let bifrost = Bifrost::new(inner.clone()); let watchdog = Watchdog::new(inner.clone(), watchdog_sender, watchdog_receiver); Self { @@ -85,11 +80,10 @@ impl BifrostService { /// /// This requires to run within a task_center context. pub async fn start(self) -> anyhow::Result<()> { - // Perform an initial metadata sync. - self.inner - .sync_metadata() - .await - .context("Initial bifrost metadata sync has failed!")?; + // Make sure we have v1 metadata written to metadata store with the default + // configuration. If metadata is already initialized, this will make sure we have the + // latest version set in metadata manager. + self.bifrost.admin().init_metadata().await?; // initialize all enabled providers. 
if self.factories.is_empty() { diff --git a/crates/core/src/metadata.rs b/crates/core/src/metadata.rs index 2244c0ef3..08fde62e1 100644 --- a/crates/core/src/metadata.rs +++ b/crates/core/src/metadata.rs @@ -29,7 +29,7 @@ use restate_types::schema::Schema; use restate_types::{GenerationalNodeId, Version, Versioned}; use crate::metadata::manager::Command; -use crate::metadata_store::ReadError; +use crate::metadata_store::{MetadataStoreClient, ReadError}; use crate::network::WeakConnection; use crate::{ShutdownError, TaskCenter, TaskId, TaskKind}; @@ -335,6 +335,7 @@ struct MetadataInner { /// so it's safe to call update_* without checking the current version. #[derive(Clone)] pub struct MetadataWriter { + metadata_store_client: MetadataStoreClient, sender: manager::CommandSender, /// strictly used to set my node id. Do not use this to update metadata /// directly to avoid race conditions. @@ -342,8 +343,20 @@ pub struct MetadataWriter { } impl MetadataWriter { - fn new(sender: manager::CommandSender, inner: Arc) -> Self { - Self { sender, inner } + fn new( + sender: manager::CommandSender, + metadata_store_client: MetadataStoreClient, + inner: Arc, + ) -> Self { + Self { + metadata_store_client, + sender, + inner, + } + } + + pub fn metadata_store_client(&self) -> &MetadataStoreClient { + &self.metadata_store_client } // Returns when the nodes configuration update is performed. diff --git a/crates/core/src/metadata/manager.rs b/crates/core/src/metadata/manager.rs index 5607da03b..baf248d3d 100644 --- a/crates/core/src/metadata/manager.rs +++ b/crates/core/src/metadata/manager.rs @@ -8,6 +8,7 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. +use std::any::type_name; use std::sync::Arc; use arc_swap::ArcSwap; @@ -257,7 +258,11 @@ impl MetadataManager { } pub fn writer(&self) -> MetadataWriter { - MetadataWriter::new(self.metadata.sender.clone(), self.metadata.inner.clone()) + MetadataWriter::new( + self.metadata.sender.clone(), + self.metadata_store_client.clone(), + self.metadata.inner.clone(), + ) } /// Start and wait for shutdown signal. @@ -428,11 +433,18 @@ impl MetadataManager { let mut maybe_new_version = new_value.version(); if new_value.version() > current_value.version() { + trace!( + "Updating {} from {} to {}", + type_name::(), + current_value.version(), + new_value.version(), + ); container.store(new_value); } else { /* Do nothing, current is already newer */ trace!( - "Ignoring update {} because we are at {}", + "Ignoring update of {} to {} because we are already at {}", + type_name::(), new_value.version(), current_value.version(), ); diff --git a/crates/core/src/metadata_store.rs b/crates/core/src/metadata_store.rs index 3542226f5..891e19d6d 100644 --- a/crates/core/src/metadata_store.rs +++ b/crates/core/src/metadata_store.rs @@ -105,7 +105,6 @@ pub trait MetadataStore { /// Metadata store client which allows storing [`Versioned`] values into a [`MetadataStore`]. #[derive(Clone)] pub struct MetadataStoreClient { - // premature optimization? Maybe introduce trait object once we have multiple implementations? 
inner: Arc, backoff_policy: Option, } diff --git a/crates/node/src/lib.rs b/crates/node/src/lib.rs index 42f3b868a..7dbc273a0 100644 --- a/crates/node/src/lib.rs +++ b/crates/node/src/lib.rs @@ -184,7 +184,8 @@ impl Node { record_cache.clone(), &mut router_builder, ); - let bifrost_svc = BifrostService::new().enable_local_loglet(&updateable_config); + let bifrost_svc = + BifrostService::new(metadata_manager.writer()).enable_local_loglet(&updateable_config); #[cfg(feature = "replicated-loglet")] let bifrost_svc = bifrost_svc.with_factory(replicated_loglet_factory); @@ -271,7 +272,6 @@ impl Node { metadata_manager.writer(), &mut server_builder, &mut router_builder, - metadata_store_client.clone(), worker_role .as_ref() .map(|worker_role| worker_role.storage_query_context().clone()), diff --git a/crates/node/src/roles/admin.rs b/crates/node/src/roles/admin.rs index b06ed3eec..89cedd8af 100644 --- a/crates/node/src/roles/admin.rs +++ b/crates/node/src/roles/admin.rs @@ -14,7 +14,6 @@ use codederror::CodedError; use restate_admin::cluster_controller; use restate_admin::service::AdminService; use restate_bifrost::Bifrost; -use restate_core::metadata_store::MetadataStoreClient; use restate_core::network::MessageRouterBuilder; use restate_core::network::NetworkServerBuilder; use restate_core::network::Networking; @@ -70,7 +69,6 @@ impl AdminRole { metadata_writer: MetadataWriter, server_builder: &mut NetworkServerBuilder, router_builder: &mut MessageRouterBuilder, - metadata_store_client: MetadataStoreClient, local_query_context: Option, ) -> Result { health_status.update(AdminStatus::StartingUp); @@ -104,7 +102,6 @@ impl AdminRole { let admin = AdminService::new( metadata_writer.clone(), - metadata_store_client.clone(), bifrost.clone(), config.ingress.clone(), service_discovery, @@ -121,7 +118,6 @@ impl AdminRole { router_builder, server_builder, metadata_writer, - metadata_store_client, )) } else { None diff --git a/crates/types/src/config/common.rs b/crates/types/src/config/common.rs index 178c05d1c..412353f1f 100644 --- a/crates/types/src/config/common.rs +++ b/crates/types/src/config/common.rs @@ -233,7 +233,7 @@ pub struct CommonOptions { /// # Network error retry policy /// - /// The retry policy for node network error + /// The retry policy for network related errors pub network_error_retry_policy: RetryPolicy, } diff --git a/crates/types/src/logs/metadata.rs b/crates/types/src/logs/metadata.rs index 1668910b5..7eaecb526 100644 --- a/crates/types/src/logs/metadata.rs +++ b/crates/types/src/logs/metadata.rs @@ -194,7 +194,7 @@ impl TryFrom for NodeSetSelectionStrategy { #[derive(Debug, Clone, Eq, PartialEq, Default, serde::Serialize, serde::Deserialize)] #[serde(rename_all = "kebab-case")] -pub enum DefaultProvider { +pub enum ProviderConfiguration { #[cfg(any(test, feature = "memory-loglet"))] InMemory, #[default] @@ -202,8 +202,8 @@ pub enum DefaultProvider { Replicated(ReplicatedLogletConfig), } -impl DefaultProvider { - pub fn as_provider_kind(&self) -> ProviderKind { +impl ProviderConfiguration { + pub fn kind(&self) -> ProviderKind { match self { #[cfg(any(test, feature = "memory-loglet"))] Self::InMemory => ProviderKind::InMemory, @@ -213,17 +213,17 @@ impl DefaultProvider { } } -impl From for crate::protobuf::cluster::DefaultProvider { - fn from(value: DefaultProvider) -> Self { +impl From for crate::protobuf::cluster::DefaultProvider { + fn from(value: ProviderConfiguration) -> Self { use crate::protobuf::cluster; let mut result = 
crate::protobuf::cluster::DefaultProvider::default(); match value { - DefaultProvider::Local => result.provider = ProviderKind::Local.to_string(), + ProviderConfiguration::Local => result.provider = ProviderKind::Local.to_string(), #[cfg(any(test, feature = "memory-loglet"))] - DefaultProvider::InMemory => result.provider = ProviderKind::InMemory.to_string(), - DefaultProvider::Replicated(config) => { + ProviderConfiguration::InMemory => result.provider = ProviderKind::InMemory.to_string(), + ProviderConfiguration::Replicated(config) => { result.provider = ProviderKind::Replicated.to_string(); result.replicated_config = Some(cluster::ReplicatedProviderConfig { replication_property: config.replication_property.to_string(), @@ -236,7 +236,7 @@ impl From for crate::protobuf::cluster::DefaultProvider { } } -impl TryFrom for DefaultProvider { +impl TryFrom for ProviderConfiguration { type Error = anyhow::Error; fn try_from(value: crate::protobuf::cluster::DefaultProvider) -> Result { let provider_kind: ProviderKind = value.provider.parse()?; @@ -272,7 +272,7 @@ pub struct ReplicatedLogletConfig { #[derive(Debug, Clone, Eq, PartialEq, Default, serde::Serialize, serde::Deserialize)] pub struct LogsConfiguration { - pub default_provider: DefaultProvider, + pub default_provider: ProviderConfiguration, } #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] @@ -325,12 +325,14 @@ impl TryFrom for Logs { // this means we are migrating from an older setup that had replication-property // hardcoded to {node:2} config = Some(LogsConfiguration { - default_provider: DefaultProvider::Replicated(ReplicatedLogletConfig { - nodeset_selection_strategy: NodeSetSelectionStrategy::default(), - replication_property: ReplicationProperty::new( - NonZeroU8::new(2).expect("2 is not 0"), - ), - }), + default_provider: ProviderConfiguration::Replicated( + ReplicatedLogletConfig { + nodeset_selection_strategy: NodeSetSelectionStrategy::default(), + replication_property: ReplicationProperty::new( + NonZeroU8::new(2).expect("2 is not 0"), + ), + }, + ), }) } } @@ -495,14 +497,16 @@ impl Logs { Self::with_logs_configuration(LogsConfiguration { default_provider: match config.bifrost.default_provider { #[cfg(any(test, feature = "memory-loglet"))] - ProviderKind::InMemory => DefaultProvider::InMemory, - ProviderKind::Local => DefaultProvider::Local, - ProviderKind::Replicated => DefaultProvider::Replicated(ReplicatedLogletConfig { - nodeset_selection_strategy: NodeSetSelectionStrategy::default(), - replication_property: ReplicationProperty::new( - NonZeroU8::new(1).expect("1 is not zero"), - ), - }), + ProviderKind::InMemory => ProviderConfiguration::InMemory, + ProviderKind::Local => ProviderConfiguration::Local, + ProviderKind::Replicated => { + ProviderConfiguration::Replicated(ReplicatedLogletConfig { + nodeset_selection_strategy: NodeSetSelectionStrategy::default(), + replication_property: ReplicationProperty::new( + NonZeroU8::new(1).expect("1 is not zero"), + ), + }) + } }, }) } @@ -510,6 +514,7 @@ impl Logs { pub fn with_logs_configuration(logs_configuration: LogsConfiguration) -> Self { Logs { config: logs_configuration, + version: Version::MIN, ..Default::default() } } diff --git a/crates/worker/src/partition/cleaner.rs b/crates/worker/src/partition/cleaner.rs index 884a22866..ecb513c56 100644 --- a/crates/worker/src/partition/cleaner.rs +++ b/crates/worker/src/partition/cleaner.rs @@ -213,14 +213,14 @@ mod tests { // Start paused makes sure the timer is immediately fired #[test(restate_core::test(start_paused 
= true))] pub async fn cleanup_works() { - let _env = TestCoreEnvBuilder::with_incoming_only_connector() + let env = TestCoreEnvBuilder::with_incoming_only_connector() .set_partition_table(PartitionTable::with_equally_sized_partitions( Version::MIN, 1, )) .build() .await; - let bifrost = Bifrost::init_in_memory().await; + let bifrost = Bifrost::init_in_memory(env.metadata_writer).await; let expired_invocation = InvocationId::from_parts(PartitionKey::MIN, InvocationUuid::mock_random()); diff --git a/crates/worker/src/partition/leadership/mod.rs b/crates/worker/src/partition/leadership/mod.rs index 105e5a6a4..6d8dc4842 100644 --- a/crates/worker/src/partition/leadership/mod.rs +++ b/crates/worker/src/partition/leadership/mod.rs @@ -637,12 +637,12 @@ mod tests { #[test(restate_core::test)] async fn become_leader_then_step_down() -> googletest::Result<()> { - let _env = TestCoreEnv::create_with_single_node(0, 0).await; + let env = TestCoreEnv::create_with_single_node(0, 0).await; let storage_options = StorageOptions::default(); let rocksdb_options = RocksDbOptions::default(); RocksDbManager::init(Constant::new(CommonOptions::default())); - let bifrost = Bifrost::init_in_memory().await; + let bifrost = Bifrost::init_in_memory(env.metadata_writer).await; let partition_store_manager = PartitionStoreManager::create( Constant::new(storage_options.clone()).boxed(), diff --git a/crates/worker/src/partition/mod.rs b/crates/worker/src/partition/mod.rs index 571812edd..6834c0e40 100644 --- a/crates/worker/src/partition/mod.rs +++ b/crates/worker/src/partition/mod.rs @@ -344,6 +344,7 @@ where // Start reading after the last applied lsn let key_query = KeyFilter::Within(self.partition_key_range.clone()); + let mut log_reader = self .bifrost .create_reader( diff --git a/crates/worker/src/partition/shuffle.rs b/crates/worker/src/partition/shuffle.rs index e4ac810d0..6b378b33e 100644 --- a/crates/worker/src/partition/shuffle.rs +++ b/crates/worker/src/partition/shuffle.rs @@ -628,7 +628,7 @@ mod tests { let (truncation_tx, _truncation_rx) = mpsc::channel(1); - let bifrost = Bifrost::init_in_memory().await; + let bifrost = Bifrost::init_in_memory(env.metadata_writer.clone()).await; let shuffle = Shuffle::new(metadata, outbox_reader, truncation_tx, 1, bifrost.clone()); ShuffleEnv { diff --git a/crates/worker/src/partition_processor_manager.rs b/crates/worker/src/partition_processor_manager.rs index 1a0492d89..9965ceaf8 100644 --- a/crates/worker/src/partition_processor_manager.rs +++ b/crates/worker/src/partition_processor_manager.rs @@ -975,7 +975,8 @@ mod tests { RocksDbManager::init(Constant::new(CommonOptions::default())); - let bifrost_svc = BifrostService::new().with_factory(memory_loglet::Factory::default()); + let bifrost_svc = BifrostService::new(env_builder.metadata_writer.clone()) + .with_factory(memory_loglet::Factory::default()); let bifrost = bifrost_svc.handle(); let partition_store_manager = PartitionStoreManager::create( diff --git a/server/tests/common/replicated_loglet.rs b/server/tests/common/replicated_loglet.rs index 628707454..c11c9a1a0 100644 --- a/server/tests/common/replicated_loglet.rs +++ b/server/tests/common/replicated_loglet.rs @@ -15,7 +15,7 @@ use enumset::{enum_set, EnumSet}; use googletest::internal::test_outcome::TestAssertionFailure; use googletest::IntoTestResult; -use restate_bifrost::{loglet::Loglet, Bifrost, BifrostAdmin}; +use restate_bifrost::{loglet::Loglet, Bifrost}; use restate_core::metadata_store::Precondition; use restate_core::TaskCenter; use 
restate_core::{metadata_store::MetadataStoreClient, MetadataWriter}; @@ -92,16 +92,6 @@ pub struct TestEnv { pub cluster: StartedCluster, } -impl TestEnv { - pub fn bifrost_admin(&self) -> BifrostAdmin<'_> { - BifrostAdmin::new( - &self.bifrost, - &self.metadata_writer, - &self.metadata_store_client, - ) - } -} - pub async fn run_in_test_env( mut base_config: Configuration, sequencer: GenerationalNodeId, diff --git a/server/tests/replicated_loglet.rs b/server/tests/replicated_loglet.rs index 8fa79c7f2..f3656d2c9 100644 --- a/server/tests/replicated_loglet.rs +++ b/server/tests/replicated_loglet.rs @@ -248,23 +248,13 @@ mod tests { } let mut sealer_handle: JoinHandle> = tokio::task::spawn({ - let (bifrost, metadata_writer, metadata_store_client) = ( - test_env.bifrost.clone(), - test_env.metadata_writer.clone(), - test_env.metadata_store_client.clone() - ); + let bifrost = test_env.bifrost.clone(); async move { let cancellation_token = cancellation_token(); let mut chain = metadata.updateable_logs_metadata().map(|logs| logs.chain(&log_id).expect("a chain to exist")); - let bifrost_admin = restate_bifrost::BifrostAdmin::new( - &bifrost, - &metadata_writer, - &metadata_store_client, - ); - let mut last_loglet_id = None; while !cancellation_token.is_cancelled() { @@ -280,7 +270,8 @@ mod tests { eprintln!("Sealing loglet {} and creating new loglet {}", params.loglet_id, params.loglet_id.next()); params.loglet_id = params.loglet_id.next(); - bifrost_admin + bifrost + .admin() .seal_and_extend_chain( log_id, None, diff --git a/tools/bifrost-benchpress/src/main.rs b/tools/bifrost-benchpress/src/main.rs index 33d5f8d21..f9512f964 100644 --- a/tools/bifrost-benchpress/src/main.rs +++ b/tools/bifrost-benchpress/src/main.rs @@ -176,7 +176,7 @@ fn spawn_environment(config: Live, num_logs: u16) -> (task_center metadata_writer.submit(Arc::new(logs)); spawn_metadata_manager(metadata_manager).expect("metadata manager starts"); - let bifrost_svc = BifrostService::new() + let bifrost_svc = BifrostService::new(metadata_writer) .enable_in_memory_loglet() .enable_local_loglet(&config); let bifrost = bifrost_svc.handle(); diff --git a/tools/restatectl/src/commands/cluster/config.rs b/tools/restatectl/src/commands/cluster/config.rs index 620a2f488..b985b1278 100644 --- a/tools/restatectl/src/commands/cluster/config.rs +++ b/tools/restatectl/src/commands/cluster/config.rs @@ -16,7 +16,7 @@ use std::fmt::{self, Display, Write}; use cling::prelude::*; use restate_types::{ - logs::metadata::DefaultProvider, partition_table::ReplicationStrategy, + logs::metadata::ProviderConfiguration, partition_table::ReplicationStrategy, protobuf::cluster::ClusterConfiguration, }; @@ -43,7 +43,7 @@ fn cluster_config_string(config: ClusterConfiguration) -> anyhow::Result write_leaf(&mut w, 1, false, "Bifrost replication strategy", strategy)?; - let provider: DefaultProvider = config.default_provider.unwrap_or_default().try_into()?; + let provider: ProviderConfiguration = config.default_provider.unwrap_or_default().try_into()?; write_default_provider(&mut w, 1, provider)?; Ok(w) @@ -52,19 +52,19 @@ fn cluster_config_string(config: ClusterConfiguration) -> anyhow::Result fn write_default_provider( w: &mut W, depth: usize, - provider: DefaultProvider, + provider: ProviderConfiguration, ) -> Result<(), fmt::Error> { let title = "Bifrost Provider"; match provider { #[cfg(any(test, feature = "memory-loglet"))] - DefaultProvider::InMemory => { + ProviderConfiguration::InMemory => { write_leaf(w, depth, true, title, "in-memory")?; } - 
DefaultProvider::Local => { + ProviderConfiguration::Local => { write_leaf(w, depth, true, title, "local")?; } #[cfg(feature = "replicated-loglet")] - DefaultProvider::Replicated(config) => { + ProviderConfiguration::Replicated(config) => { write_leaf(w, depth, true, title, "replicated")?; let depth = depth + 1; write_leaf( diff --git a/tools/restatectl/src/commands/cluster/config/set.rs b/tools/restatectl/src/commands/cluster/config/set.rs index 5c23c01f2..abdece88c 100644 --- a/tools/restatectl/src/commands/cluster/config/set.rs +++ b/tools/restatectl/src/commands/cluster/config/set.rs @@ -23,7 +23,7 @@ use restate_cli_util::_comfy_table::{Cell, Color, Table}; use restate_cli_util::c_println; use restate_cli_util::ui::console::{confirm_or_exit, StyledTable}; use restate_types::logs::metadata::{ - DefaultProvider, NodeSetSelectionStrategy, ProviderKind, ReplicatedLogletConfig, + NodeSetSelectionStrategy, ProviderConfiguration, ProviderKind, ReplicatedLogletConfig, }; use restate_types::partition_table::ReplicationStrategy; use restate_types::replicated_loglet::ReplicationProperty; @@ -89,8 +89,8 @@ async fn config_set(connection: &ConnectionInfo, set_opts: &ConfigSetOpts) -> an if let Some(provider) = set_opts.bifrost_provider { let default_provider = match provider { - ProviderKind::InMemory => DefaultProvider::InMemory, - ProviderKind::Local => DefaultProvider::Local, + ProviderKind::InMemory => ProviderConfiguration::InMemory, + ProviderKind::Local => ProviderConfiguration::Local, ProviderKind::Replicated => { let config = ReplicatedLogletConfig { replication_property: set_opts @@ -101,7 +101,7 @@ async fn config_set(connection: &ConnectionInfo, set_opts: &ConfigSetOpts) -> an .nodeset_selection_strategy .unwrap_or_default(), }; - DefaultProvider::Replicated(config) + ProviderConfiguration::Replicated(config) } }; diff --git a/tools/restatectl/src/commands/log/dump_log.rs b/tools/restatectl/src/commands/log/dump_log.rs index cb4219513..213e328fa 100644 --- a/tools/restatectl/src/commands/log/dump_log.rs +++ b/tools/restatectl/src/commands/log/dump_log.rs @@ -86,6 +86,7 @@ async fn dump_log(opts: &DumpLogOpts) -> anyhow::Result<()> { let metadata_manager = MetadataManager::new(metadata_builder, metadata_store_client.clone()); + let metadata_writer = metadata_manager.writer(); let mut router_builder = MessageRouterBuilder::default(); metadata_manager.register_in_message_router(&mut router_builder); @@ -95,7 +96,8 @@ async fn dump_log(opts: &DumpLogOpts) -> anyhow::Result<()> { metadata_manager.run(), )?; - let bifrost_svc = BifrostService::new().enable_local_loglet(&Configuration::updateable()); + let bifrost_svc = + BifrostService::new(metadata_writer).enable_local_loglet(&Configuration::updateable()); let bifrost = bifrost_svc.handle(); // Ensures bifrost has initial metadata synced up before starting the worker. 
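(Reviewer annotation, not part of the patch: a minimal sketch of the Bifrost wiring this series converges on, stitched together from the test updates above. The crate paths, the in-memory-only loglet setup, and the test name are assumptions for illustration; the individual calls mirror the updated `read_stream.rs` tests.)

```rust
// Sketch only: assumes restate_core re-exports TestCoreEnvBuilder and that an
// in-memory-only BifrostService is sufficient for this test environment.
use restate_bifrost::{BifrostService, ErrorRecoveryStrategy};
use restate_core::TestCoreEnvBuilder;
use restate_types::logs::{LogId, Lsn};

#[restate_core::test]
async fn bifrost_admin_wiring_sketch() -> anyhow::Result<()> {
    const LOG_ID: LogId = LogId::new(0);

    // The test environment now hands out the MetadataWriter that
    // BifrostService requires at construction time.
    let env = TestCoreEnvBuilder::with_incoming_only_connector()
        .build()
        .await;

    // Before this patch: BifrostAdmin::new(&bifrost, &metadata_writer, &metadata_store_client).
    // After: the service owns the writer, and admin operations hang off the handle.
    let svc = BifrostService::new(env.metadata_writer).enable_in_memory_loglet();
    let bifrost = svc.handle();
    svc.start().await?;

    let _appender = bifrost.create_appender(LOG_ID, ErrorRecoveryStrategy::Wait)?;
    bifrost.admin().trim(LOG_ID, Lsn::from(5)).await?;
    // Trimming beyond the release point falls back to the release point
    // (see the read_stream.rs test above), so we only read the value here.
    let _trim_point = bifrost.get_trim_point(LOG_ID).await?;

    Ok(())
}
```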
diff --git a/tools/restatectl/src/commands/log/list_logs.rs b/tools/restatectl/src/commands/log/list_logs.rs index a92668959..1b48e7b44 100644 --- a/tools/restatectl/src/commands/log/list_logs.rs +++ b/tools/restatectl/src/commands/log/list_logs.rs @@ -55,6 +55,11 @@ pub async fn list_logs(connection: &ConnectionInfo, _opts: &ListLogsOpts) -> any c_println!("Log Configuration ({})", logs.version()); + c_println!( + "Default Provider Config: {:?}", + logs.configuration().default_provider + ); + // sort by log-id for display let logs: BTreeMap = logs.iter().map(|(id, chain)| (*id, chain)).collect(); diff --git a/tools/xtask/src/main.rs b/tools/xtask/src/main.rs index ee702beec..e556aa82c 100644 --- a/tools/xtask/src/main.rs +++ b/tools/xtask/src/main.rs @@ -103,11 +103,10 @@ async fn generate_rest_api_doc() -> anyhow::Result<()> { // We start the Meta service, then download the openapi schema generated let node_env = TestCoreEnv::create_with_single_node(1, 1).await; - let bifrost = Bifrost::init_in_memory().await; + let bifrost = Bifrost::init_in_memory(node_env.metadata_writer.clone()).await; let admin_service = AdminService::new( node_env.metadata_writer.clone(), - node_env.metadata_store_client.clone(), bifrost, Mock, ServiceDiscovery::new(