diff --git a/Cargo.lock b/Cargo.lock index 436138462..c8775bdb9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6564,6 +6564,7 @@ dependencies = [ "async-trait", "axum", "bytes", + "bytestring", "codederror", "datafusion", "derive_builder", diff --git a/crates/admin/src/cluster_controller/logs_controller.rs b/crates/admin/src/cluster_controller/logs_controller.rs index 0a74a7078..602c2f0a7 100644 --- a/crates/admin/src/cluster_controller/logs_controller.rs +++ b/crates/admin/src/cluster_controller/logs_controller.rs @@ -24,11 +24,8 @@ use tracing::{debug, error, trace, trace_span, Instrument}; use xxhash_rust::xxh3::Xxh3Builder; use restate_bifrost::{Bifrost, BifrostAdmin, Error as BifrostError}; -use restate_core::metadata_store::{ - retry_on_network_error, MetadataStoreClient, Precondition, ReadWriteError, WriteError, -}; +use restate_core::metadata_store::{MetadataStoreClient, Precondition, ReadWriteError, WriteError}; use restate_core::{Metadata, MetadataWriter, ShutdownError, TaskCenterFutureExt}; -use restate_types::config::Configuration; use restate_types::errors::GenericError; use restate_types::identifiers::PartitionId; use restate_types::live::Pinned; @@ -639,9 +636,9 @@ struct LogsControllerInner { } impl LogsControllerInner { - fn new(configuration: LogsConfiguration, retry_policy: RetryPolicy) -> Self { + fn new(current_logs: Arc, retry_policy: RetryPolicy) -> Self { Self { - current_logs: Arc::new(Logs::with_logs_configuration(configuration)), + current_logs, logs_state: HashMap::with_hasher(Xxh3Builder::default()), logs_write_in_progress: None, retry_policy, @@ -925,26 +922,11 @@ pub struct LogsController { } impl LogsController { - pub async fn init( - configuration: &Configuration, + pub fn new( bifrost: Bifrost, metadata_store_client: MetadataStoreClient, metadata_writer: MetadataWriter, - ) -> Result { - // obtain the latest logs or init it with an empty logs variant - let logs = retry_on_network_error( - configuration.common.network_error_retry_policy.clone(), - || { - metadata_store_client.get_or_insert(BIFROST_CONFIG_KEY.clone(), || { - Logs::from_configuration(configuration) - }) - }, - ) - .await?; - - let logs_configuration = logs.configuration().clone(); - metadata_writer.update(Arc::new(logs)).await?; - + ) -> Self { //todo(azmy): make configurable let retry_policy = RetryPolicy::exponential( Duration::from_millis(10), @@ -955,7 +937,10 @@ impl LogsController { let mut this = Self { effects: Some(Vec::new()), - inner: LogsControllerInner::new(logs_configuration, retry_policy), + inner: LogsControllerInner::new( + Metadata::with_current(|m| m.logs_snapshot()), + retry_policy, + ), bifrost, metadata_store_client, metadata_writer, @@ -964,7 +949,7 @@ impl LogsController { }; this.find_logs_tail(); - Ok(this) + this } pub fn find_logs_tail(&mut self) { diff --git a/crates/admin/src/cluster_controller/service.rs b/crates/admin/src/cluster_controller/service.rs index e5694601f..9c68f9c91 100644 --- a/crates/admin/src/cluster_controller/service.rs +++ b/crates/admin/src/cluster_controller/service.rs @@ -37,7 +37,7 @@ use restate_types::partition_table::{ use restate_types::replicated_loglet::ReplicatedLogletParams; use restate_bifrost::{Bifrost, BifrostAdmin, SealedSegment}; -use restate_core::metadata_store::{retry_on_network_error, MetadataStoreClient}; +use restate_core::metadata_store::MetadataStoreClient; use restate_core::network::rpc_router::RpcRouter; use restate_core::network::tonic_service_filter::{TonicServiceFilter, WaitForReady}; use restate_core::network::{ 
@@ -296,8 +296,6 @@ impl Service { } pub async fn run(mut self) -> anyhow::Result<()> { - self.init_partition_table().await?; - let mut config_watcher = Configuration::watcher(); let mut cluster_state_watcher = self.cluster_state_refresher.cluster_state_watcher(); @@ -353,34 +351,6 @@ impl Service { } } - /// creates partition table iff it does not exist - async fn init_partition_table(&mut self) -> anyhow::Result<()> { - let configuration = self.configuration.live_load(); - - let partition_table = retry_on_network_error( - configuration.common.network_error_retry_policy.clone(), - || { - self.metadata_store_client - .get_or_insert(PARTITION_TABLE_KEY.clone(), || { - let partition_table = PartitionTable::with_equally_sized_partitions( - Version::MIN, - configuration.common.bootstrap_num_partitions.get(), - ); - - debug!("Initializing the partition table with '{partition_table:?}'"); - - partition_table - }) - }, - ) - .await?; - - self.metadata_writer - .update(Arc::new(partition_table)) - .await?; - - Ok(()) - } /// Triggers a snapshot creation for the given partition by issuing an RPC /// to the node hosting the active leader. async fn create_partition_snapshot( diff --git a/crates/admin/src/cluster_controller/service/state.rs index 8b5548839..4392744ca 100644 --- a/crates/admin/src/cluster_controller/service/state.rs +++ b/crates/admin/src/cluster_controller/service/state.rs @@ -161,13 +161,11 @@ where ) .await?; - let logs_controller = LogsController::init( - &configuration, + let logs_controller = LogsController::new( service.bifrost.clone(), service.metadata_store_client.clone(), service.metadata_writer.clone(), - ) - .await?; + ); let (log_trim_interval, log_trim_threshold) = create_log_trim_interval(&configuration.admin); diff --git a/crates/bifrost/src/service.rs index 3222cc5c2..54dc75168 100644 --- a/crates/bifrost/src/service.rs +++ b/crates/bifrost/src/service.rs @@ -78,10 +78,10 @@ impl BifrostService { pub fn handle(&self) -> Bifrost { self.bifrost.clone() } - /// Runs initialization phase, then returns a handle to join on shutdown. - /// In this phase the system should wait until this is completed before - /// continuing. For instance, a worker mark itself as `STARTING_UP` and not - /// accept any requests until this is completed. + + /// Runs the initialization phase. In this phase the system should wait until this is completed + /// before continuing. For instance, a worker marks itself as `STARTING_UP` and does not accept + /// any requests until this is completed. /// /// This requires to run within a task_center context.
pub async fn start(self) -> anyhow::Result<()> { diff --git a/crates/core/build.rs b/crates/core/build.rs index 135e37a1d..9904963e6 100644 --- a/crates/core/build.rs +++ b/crates/core/build.rs @@ -21,6 +21,7 @@ fn main() -> Result<(), Box> { .protoc_arg("--experimental_allow_proto3_optional") .extern_path(".restate.node", "::restate_types::protobuf::node") .extern_path(".restate.common", "::restate_types::protobuf::common") + .extern_path(".restate.cluster", "::restate_types::protobuf::cluster") .compile_protos( &["./protobuf/node_ctl_svc.proto"], &["protobuf", "../types/protobuf"], diff --git a/crates/core/protobuf/node_ctl_svc.proto b/crates/core/protobuf/node_ctl_svc.proto index 74350bc4a..673d9e4dc 100644 --- a/crates/core/protobuf/node_ctl_svc.proto +++ b/crates/core/protobuf/node_ctl_svc.proto @@ -10,6 +10,7 @@ syntax = "proto3"; import "google/protobuf/empty.proto"; +import "restate/cluster.proto"; import "restate/common.proto"; import "restate/node.proto"; @@ -20,6 +21,30 @@ service NodeCtlSvc { rpc GetIdent(google.protobuf.Empty) returns (IdentResponse); rpc GetMetadata(GetMetadataRequest) returns (GetMetadataResponse); + + // Provision the Restate cluster on this node. + rpc ProvisionCluster(ProvisionClusterRequest) returns (ProvisionClusterResponse); +} + +message ProvisionClusterRequest { + bool dry_run = 1; + optional uint32 num_partitions = 2; + optional restate.cluster.ReplicationStrategy placement_strategy = 3; + optional restate.cluster.DefaultProvider log_provider = 4; +} + +enum ProvisionClusterResponseKind { + ProvisionClusterResponseType_UNKNOWN = 0; + ERROR = 1; + SUCCESS = 2; + DRY_RUN = 3; +} + +message ProvisionClusterResponse { + ProvisionClusterResponseKind kind = 1; + // If there is an error, this field will be set. All other fields will be empty. 
+ optional string error = 2; + optional restate.cluster.ClusterConfiguration cluster_configuration = 3; } message IdentResponse { diff --git a/crates/core/src/metadata_store.rs b/crates/core/src/metadata_store.rs index 3542226f5..c391cd080 100644 --- a/crates/core/src/metadata_store.rs +++ b/crates/core/src/metadata_store.rs @@ -17,8 +17,10 @@ use async_trait::async_trait; use bytes::{Bytes, BytesMut}; use bytestring::ByteString; use restate_types::errors::GenericError; +use restate_types::metadata_store::keys::NODES_CONFIG_KEY; +use restate_types::nodes_config::NodesConfiguration; use restate_types::retries::RetryPolicy; -use restate_types::storage::{StorageCodec, StorageDecode, StorageEncode}; +use restate_types::storage::{StorageCodec, StorageDecode, StorageEncode, StorageEncodeError}; use restate_types::{flexbuffers_storage_encode_decode, Version, Versioned}; use std::future::Future; use std::sync::Arc; @@ -51,6 +53,29 @@ pub enum WriteError { Store(GenericError), } +#[derive(Debug, thiserror::Error)] +pub enum ProvisionError { + #[error("network error: {0}")] + Network(GenericError), + #[error("internal error: {0}")] + Internal(String), + #[error("codec error: {0}")] + Codec(GenericError), + #[error("store error: {0}")] + Store(GenericError), +} + +impl MetadataStoreClientError for ProvisionError { + fn is_network_error(&self) -> bool { + match self { + ProvisionError::Network(_) => true, + ProvisionError::Internal(_) | ProvisionError::Codec(_) | ProvisionError::Store(_) => { + false + } + } + } +} + #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub struct VersionedValue { pub version: Version, @@ -100,6 +125,89 @@ pub trait MetadataStore { /// Deletes the key-value pair for the given key following the provided precondition. If the /// precondition is not met, then the operation returns a [`WriteError::PreconditionViolation`]. async fn delete(&self, key: ByteString, precondition: Precondition) -> Result<(), WriteError>; + + /// Tries to provision the metadata store with the provided [`NodesConfiguration`]. Returns + /// `true` if the metadata store was newly provisioned. Returns `false` if the metadata store + /// is already provisioned. + async fn provision( + &self, + nodes_configuration: &NodesConfiguration, + ) -> Result; +} + +/// A provisioned metadata store does not need to be explicitly provisioned. Therefore, a provision +/// call is translated into a put command. +#[async_trait] +pub trait ProvisionedMetadataStore { + /// Gets the value and its current version for the given key. If key-value pair is not present, + /// then return [`None`]. + async fn get(&self, key: ByteString) -> Result, ReadError>; + + /// Gets the current version for the given key. If key-value pair is not present, then return + /// [`None`]. + async fn get_version(&self, key: ByteString) -> Result, ReadError>; + + /// Puts the versioned value under the given key following the provided precondition. If the + /// precondition is not met, then the operation returns a [`WriteError::PreconditionViolation`]. + async fn put( + &self, + key: ByteString, + value: VersionedValue, + precondition: Precondition, + ) -> Result<(), WriteError>; + + /// Deletes the key-value pair for the given key following the provided precondition. If the + /// precondition is not met, then the operation returns a [`WriteError::PreconditionViolation`]. 
+ async fn delete(&self, key: ByteString, precondition: Precondition) -> Result<(), WriteError>; +} + +#[async_trait] +impl MetadataStore for T { + async fn get(&self, key: ByteString) -> Result, ReadError> { + self.get(key).await + } + + async fn get_version(&self, key: ByteString) -> Result, ReadError> { + self.get_version(key).await + } + + async fn put( + &self, + key: ByteString, + value: VersionedValue, + precondition: Precondition, + ) -> Result<(), WriteError> { + self.put(key, value, precondition).await + } + + async fn delete(&self, key: ByteString, precondition: Precondition) -> Result<(), WriteError> { + self.delete(key, precondition).await + } + + async fn provision( + &self, + nodes_configuration: &NodesConfiguration, + ) -> Result { + let versioned_value = serialize_value(nodes_configuration) + .map_err(|err| ProvisionError::Codec(err.into()))?; + match self + .put( + NODES_CONFIG_KEY.clone(), + versioned_value, + Precondition::DoesNotExist, + ) + .await + { + Ok(()) => Ok(true), + Err(err) => match err { + WriteError::FailedPrecondition(_) => Ok(false), + WriteError::Network(err) => Err(ProvisionError::Network(err)), + WriteError::Internal(err) => Err(ProvisionError::Internal(err)), + WriteError::Codec(err) => Err(ProvisionError::Codec(err)), + WriteError::Store(err) => Err(ProvisionError::Store(err)), + }, + } + } } /// Metadata store client which allows storing [`Versioned`] values into a [`MetadataStore`]. @@ -170,18 +278,10 @@ impl MetadataStoreClient { where T: Versioned + StorageEncode, { - let version = value.version(); + let versioned_value = + serialize_value(value).map_err(|err| WriteError::Codec(err.into()))?; - let mut buf = BytesMut::default(); - StorageCodec::encode(value, &mut buf).map_err(|err| WriteError::Codec(err.into()))?; - - self.inner - .put( - key, - VersionedValue::new(version, buf.freeze()), - precondition, - ) - .await + self.inner.put(key, versioned_value, precondition).await } /// Deletes the key-value pair for the given key following the provided precondition. If the @@ -285,6 +385,23 @@ impl MetadataStoreClient { } } } + + pub async fn provision( + &self, + nodes_configuration: &NodesConfiguration, + ) -> Result { + self.inner.provision(nodes_configuration).await + } +} + +pub fn serialize_value( + value: &T, +) -> Result { + let version = value.version(); + let mut buf = BytesMut::default(); + StorageCodec::encode(value, &mut buf)?; + let versioned_value = VersionedValue::new(version, buf.freeze()); + Ok(versioned_value) } #[derive(Debug, thiserror::Error)] diff --git a/crates/core/src/metadata_store/providers/etcd.rs b/crates/core/src/metadata_store/providers/etcd.rs index 164a772ee..49b85f11b 100644 --- a/crates/core/src/metadata_store/providers/etcd.rs +++ b/crates/core/src/metadata_store/providers/etcd.rs @@ -9,7 +9,7 @@ // by the Apache License, Version 2.0. 
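Editor's note: the blanket impl in metadata_store.rs above turns MetadataStore::provision into a single conditional put of the NodesConfiguration under NODES_CONFIG_KEY, so callers get idempotent provisioning through MetadataStoreClient. A minimal usage sketch, not part of this diff; the cluster name and surrounding function are illustrative and assume an already-connected client:

    use restate_core::metadata_store::MetadataStoreClient;
    use restate_types::nodes_config::NodesConfiguration;
    use restate_types::Version;

    async fn provision_idempotently(client: &MetadataStoreClient) -> anyhow::Result<()> {
        let nodes_config = NodesConfiguration::new(Version::MIN, "my-cluster".to_owned());
        // On a fresh store this writes NODES_CONFIG_KEY with Precondition::DoesNotExist -> Ok(true).
        if client.provision(&nodes_config).await? {
            println!("metadata store newly provisioned");
        } else {
            // A repeated (or concurrent) call hits the failed precondition, which is mapped to Ok(false).
            println!("metadata store was already provisioned");
        }
        Ok(())
    }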
use crate::metadata_store::{ - MetadataStore, Precondition, ReadError, Version, VersionedValue, WriteError, + Precondition, ProvisionedMetadataStore, ReadError, Version, VersionedValue, WriteError, }; use crate::network::net_util::CommonClientConnectionOptions; use anyhow::Context; @@ -145,7 +145,7 @@ impl EtcdMetadataStore { } #[async_trait::async_trait] -impl MetadataStore for EtcdMetadataStore { +impl ProvisionedMetadataStore for EtcdMetadataStore { async fn get(&self, key: ByteString) -> Result, ReadError> { let mut client = self.client.kv_client(); let mut response = client.get(key.into_bytes(), None).await?; diff --git a/crates/core/src/metadata_store/providers/objstore/glue.rs b/crates/core/src/metadata_store/providers/objstore/glue.rs index 940840b17..dc76e9404 100644 --- a/crates/core/src/metadata_store/providers/objstore/glue.rs +++ b/crates/core/src/metadata_store/providers/objstore/glue.rs @@ -17,7 +17,9 @@ use tracing::warn; use crate::cancellation_watcher; use crate::metadata_store::providers::objstore::optimistic_store::OptimisticLockingMetadataStoreBuilder; -use crate::metadata_store::{MetadataStore, Precondition, ReadError, VersionedValue, WriteError}; +use crate::metadata_store::{ + Precondition, ProvisionedMetadataStore, ReadError, VersionedValue, WriteError, +}; #[derive(Debug)] pub(crate) enum Commands { @@ -111,7 +113,7 @@ impl Client { } #[async_trait::async_trait] -impl MetadataStore for Client { +impl ProvisionedMetadataStore for Client { async fn get(&self, key: ByteString) -> Result, ReadError> { let (tx, rx) = tokio::sync::oneshot::channel(); diff --git a/crates/core/src/metadata_store/test_util.rs b/crates/core/src/metadata_store/test_util.rs index 94f2b5d69..ef01c2872 100644 --- a/crates/core/src/metadata_store/test_util.rs +++ b/crates/core/src/metadata_store/test_util.rs @@ -8,7 +8,9 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use crate::metadata_store::{MetadataStore, Precondition, ReadError, VersionedValue, WriteError}; +use crate::metadata_store::{ + Precondition, ProvisionedMetadataStore, ReadError, VersionedValue, WriteError, +}; use bytestring::ByteString; use restate_types::Version; use std::collections::HashMap; @@ -47,7 +49,7 @@ impl InMemoryMetadataStore { } #[async_trait::async_trait] -impl MetadataStore for InMemoryMetadataStore { +impl ProvisionedMetadataStore for InMemoryMetadataStore { async fn get(&self, key: ByteString) -> Result, ReadError> { Ok(self.kv_pairs.lock().unwrap().get(&key).cloned()) } diff --git a/crates/core/src/network/protobuf.rs b/crates/core/src/network/protobuf.rs index bf64a3e30..011c7cbd4 100644 --- a/crates/core/src/network/protobuf.rs +++ b/crates/core/src/network/protobuf.rs @@ -9,10 +9,38 @@ // by the Apache License, Version 2.0. 
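Editor's note: with this switch the concrete backends (etcd, the object-store glue, the in-memory test store, and, further down, the gRPC client of the local metadata store) no longer deal with provisioning at all; they only supply the four key-value operations and inherit MetadataStore::provision from the blanket impl. A skeleton of what a new backend now has to provide, with an illustrative type, todo!() bodies, and the async-trait crate assumed as a dependency:

    use async_trait::async_trait;
    use bytestring::ByteString;
    use restate_core::metadata_store::{
        Precondition, ProvisionedMetadataStore, ReadError, VersionedValue, WriteError,
    };
    use restate_types::Version;

    struct MyBackend;

    #[async_trait]
    impl ProvisionedMetadataStore for MyBackend {
        async fn get(&self, _key: ByteString) -> Result<Option<VersionedValue>, ReadError> {
            todo!("read the versioned value from the backing store")
        }

        async fn get_version(&self, _key: ByteString) -> Result<Option<Version>, ReadError> {
            todo!("read only the current version")
        }

        async fn put(
            &self,
            _key: ByteString,
            _value: VersionedValue,
            _precondition: Precondition,
        ) -> Result<(), WriteError> {
            todo!("check the precondition and write atomically")
        }

        async fn delete(&self, _key: ByteString, _precondition: Precondition) -> Result<(), WriteError> {
            todo!("check the precondition and delete atomically")
        }
    }

Thanks to the blanket impl, such a type is usable wherever a MetadataStore is expected, provisioning included.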
pub mod node_ctl_svc { + use restate_types::protobuf::cluster::ClusterConfiguration; + tonic::include_proto!("restate.node_ctl_svc"); pub const FILE_DESCRIPTOR_SET: &[u8] = tonic::include_file_descriptor_set!("node_ctl_svc_descriptor"); + + impl ProvisionClusterResponse { + pub fn dry_run(cluster_configuration: ClusterConfiguration) -> Self { + ProvisionClusterResponse { + kind: ProvisionClusterResponseKind::DryRun.into(), + cluster_configuration: Some(cluster_configuration), + ..Default::default() + } + } + + pub fn err(message: impl ToString) -> Self { + ProvisionClusterResponse { + kind: ProvisionClusterResponseKind::Error.into(), + error: Some(message.to_string()), + ..Default::default() + } + } + + pub fn success(cluster_configuration: ClusterConfiguration) -> Self { + ProvisionClusterResponse { + kind: ProvisionClusterResponseKind::Success.into(), + cluster_configuration: Some(cluster_configuration), + ..Default::default() + } + } + } } pub mod core_node_svc { diff --git a/crates/metadata-store/src/local/grpc/client.rs b/crates/metadata-store/src/local/grpc/client.rs index 4d5362281..7e52e9994 100644 --- a/crates/metadata-store/src/local/grpc/client.rs +++ b/crates/metadata-store/src/local/grpc/client.rs @@ -14,7 +14,7 @@ use tonic::transport::Channel; use tonic::{Code, Status}; use restate_core::metadata_store::{ - MetadataStore, Precondition, ReadError, VersionedValue, WriteError, + Precondition, ProvisionedMetadataStore, ReadError, VersionedValue, WriteError, }; use restate_core::network::net_util::create_tonic_channel_from_advertised_address; use restate_core::network::net_util::CommonClientConnectionOptions; @@ -45,7 +45,7 @@ impl LocalMetadataStoreClient { } #[async_trait] -impl MetadataStore for LocalMetadataStoreClient { +impl ProvisionedMetadataStore for LocalMetadataStoreClient { async fn get(&self, key: ByteString) -> Result, ReadError> { let response = self .svc_client diff --git a/crates/metadata-store/src/local/store.rs b/crates/metadata-store/src/local/store.rs index 4e40403ef..7b8877a75 100644 --- a/crates/metadata-store/src/local/store.rs +++ b/crates/metadata-store/src/local/store.rs @@ -30,7 +30,7 @@ use tracing::{debug, trace}; pub type RequestSender = mpsc::Sender; pub type RequestReceiver = mpsc::Receiver; -type Result = std::result::Result; +type Result = std::result::Result; const DB_NAME: &str = "local-metadata-store"; const KV_PAIRS: &str = "kv_pairs"; @@ -163,7 +163,7 @@ impl LocalMetadataStore { request = self.request_rx.recv() => { let request = request.expect("receiver should not be closed since we own one clone."); self.handle_request(request).await; - } + }, _ = cancellation_watcher() => { break; }, diff --git a/crates/node/Cargo.toml b/crates/node/Cargo.toml index 5583799ab..9881f52e6 100644 --- a/crates/node/Cargo.toml +++ b/crates/node/Cargo.toml @@ -38,6 +38,7 @@ arc-swap = { workspace = true } async-trait = { workspace = true } axum = { workspace = true } bytes = { workspace = true } +bytestring = { workspace = true } codederror = { workspace = true } datafusion = { workspace = true } derive_builder = { workspace = true } diff --git a/crates/node/src/init.rs b/crates/node/src/init.rs new file mode 100644 index 000000000..789d3c171 --- /dev/null +++ b/crates/node/src/init.rs @@ -0,0 +1,283 @@ +// Copyright (c) 2023 - 2024 Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. 
+// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use anyhow::bail; +use restate_core::metadata_store::{MetadataStoreClient, MetadataStoreClientError, ReadWriteError}; +use restate_core::{ + cancellation_watcher, Metadata, MetadataWriter, ShutdownError, SyncError, TargetVersion, +}; +use restate_types::config::{CommonOptions, Configuration}; +use restate_types::metadata_store::keys::NODES_CONFIG_KEY; +use restate_types::net::metadata::MetadataKind; +use restate_types::nodes_config::{LogServerConfig, NodeConfig, NodesConfiguration}; +use restate_types::retries::RetryPolicy; +use std::sync::Arc; +use std::time::{Duration, Instant}; +use tracing::{debug, info, trace, warn}; + +#[derive(Debug, thiserror::Error)] +enum JoinError { + #[error("missing nodes configuration")] + MissingNodesConfiguration, + #[error("detected a concurrent registration for node '{0}'")] + ConcurrentNodeRegistration(String), + #[error("failed writing to metadata store: {0}")] + MetadataStore(#[from] ReadWriteError), + #[error("trying to join wrong cluster; expected '{expected_cluster_name}', actual '{actual_cluster_name}'")] + ClusterMismatch { + expected_cluster_name: String, + actual_cluster_name: String, + }, +} + +pub struct NodeInit<'a> { + metadata_store_client: &'a MetadataStoreClient, + metadata_writer: &'a MetadataWriter, +} + +impl<'a> NodeInit<'a> { + pub fn new( + metadata_store_client: &'a MetadataStoreClient, + metadata_writer: &'a MetadataWriter, + ) -> Self { + Self { + metadata_store_client, + metadata_writer, + } + } + + pub async fn init(self) -> anyhow::Result<()> { + let config = Configuration::pinned().into_arc(); + + let join_cluster = Self::join_cluster(self.metadata_store_client, &config.common); + + let nodes_configuration = tokio::select! { + _ = cancellation_watcher() => { + return Err(ShutdownError.into()); + }, + result = join_cluster => { + let nodes_configuration = result?; + info!("Successfully joined cluster"); + nodes_configuration + }, + }; + + // Find my node in nodes configuration. + let my_node_config = nodes_configuration + .find_node_by_name(config.common.node_name()) + .expect("node config should have been upserted"); + + let my_node_id = my_node_config.current_generation; + + // Safety checks, same node (if set)? + if config + .common + .force_node_id + .is_some_and(|n| n != my_node_id.as_plain()) + { + bail!( + format!( + "Node ID mismatch: configured node ID is {}, but the nodes configuration contains {}", + config.common.force_node_id.unwrap(), + my_node_id.as_plain() + )); + } + + // Same cluster? 
+ if config.common.cluster_name() != nodes_configuration.cluster_name() { + bail!( + format!( + "Cluster name mismatch: configured cluster name is '{}', but the nodes configuration contains '{}'", + config.common.cluster_name(), + nodes_configuration.cluster_name() + )); + } + + info!( + roles = %my_node_config.roles, + address = %my_node_config.address, + "My Node ID is {}", my_node_config.current_generation); + + self.metadata_writer + .update(Arc::new(nodes_configuration)) + .await?; + + // My Node ID is set + self.metadata_writer.set_my_node_id(my_node_id); + restate_tracing_instrumentation::set_global_node_id(my_node_id); + + self.sync_metadata().await; + + Ok(()) + } + + async fn sync_metadata(&self) { + // fetch the latest metadata + let metadata = Metadata::current(); + + let config = Configuration::pinned(); + + let retry_policy = config.common.network_error_retry_policy.clone(); + + if let Err(err) = retry_policy + .retry_if( + || async { + metadata + .sync(MetadataKind::Schema, TargetVersion::Latest) + .await?; + metadata + .sync(MetadataKind::PartitionTable, TargetVersion::Latest) + .await?; + metadata + .sync(MetadataKind::Logs, TargetVersion::Latest) + .await + }, + |err| match err { + SyncError::MetadataStore(err) => err.is_network_error(), + SyncError::Shutdown(_) => false, + }, + ) + .await + { + warn!("Failed to fetch the latest metadata when initializing the node: {err}"); + } + } + + async fn join_cluster( + metadata_store_client: &MetadataStoreClient, + common_opts: &CommonOptions, + ) -> anyhow::Result { + info!("Trying to join cluster '{}'", common_opts.cluster_name()); + + // todo make configurable + // Never give up trying to join the cluster. Users of this struct will set a timeout if + // needed. + let join_retry = RetryPolicy::exponential( + Duration::from_millis(100), + 2.0, + None, + Some(Duration::from_secs(5)), + ); + + let join_start = Instant::now(); + let mut printed_provision_message = false; + + join_retry + .retry_if( + || Self::join_cluster_inner(metadata_store_client, common_opts), + |err| { + if join_start.elapsed() < Duration::from_secs(5) { + trace!("Failed joining the cluster: {err}; retrying"); + } else if !printed_provision_message { + info!("Couldn't join the cluster yet. Don't forget to provision it!"); + printed_provision_message = true; + } else { + debug!("Failed joining cluster: {err}; retrying"); + } + match err { + JoinError::MissingNodesConfiguration => true, + JoinError::ConcurrentNodeRegistration(_) => false, + JoinError::MetadataStore(err) => err.is_network_error(), + JoinError::ClusterMismatch { .. 
} => false, + } + }, + ) + .await + .map_err(Into::into) + } + + async fn join_cluster_inner( + metadata_store_client: &MetadataStoreClient, + common_opts: &CommonOptions, + ) -> Result { + let mut previous_node_generation = None; + + metadata_store_client + .read_modify_write::( + NODES_CONFIG_KEY.clone(), + move |nodes_config| { + let mut nodes_config = + nodes_config.ok_or(JoinError::MissingNodesConfiguration)?; + + // check that we are joining the right cluster + if nodes_config.cluster_name() != common_opts.cluster_name() { + return Err(JoinError::ClusterMismatch { + expected_cluster_name: common_opts.cluster_name().to_owned(), + actual_cluster_name: nodes_config.cluster_name().to_owned(), + }); + } + + // check whether we have registered before + let node_config = nodes_config + .find_node_by_name(common_opts.node_name()) + .cloned(); + + let my_node_config = if let Some(mut node_config) = node_config { + assert_eq!( + common_opts.node_name(), + node_config.name, + "node name must match" + ); + + if let Some(previous_node_generation) = previous_node_generation { + if node_config + .current_generation + .is_newer_than(previous_node_generation) + { + // detected a concurrent registration of the same node + return Err(JoinError::ConcurrentNodeRegistration( + common_opts.node_name().to_owned(), + )); + } + } else { + // remember the previous node generation to detect concurrent modifications + previous_node_generation = Some(node_config.current_generation); + } + + // update node_config + node_config.roles = common_opts.roles; + node_config.address = common_opts.advertised_address.clone(); + node_config.current_generation.bump_generation(); + + node_config + } else { + let plain_node_id = common_opts.force_node_id.unwrap_or_else(|| { + nodes_config + .max_plain_node_id() + .map(|n| n.next()) + .unwrap_or_default() + }); + + assert!( + nodes_config.find_node_by_id(plain_node_id).is_err(), + "duplicate plain node id '{plain_node_id}'" + ); + + let my_node_id = plain_node_id.with_generation(1); + + NodeConfig::new( + common_opts.node_name().to_owned(), + my_node_id, + common_opts.advertised_address.clone(), + common_opts.roles, + LogServerConfig::default(), + ) + }; + + nodes_config.upsert_node(my_node_config); + nodes_config.increment_version(); + + Ok(nodes_config) + }, + ) + .await + .map_err(|err| err.transpose()) + } +} diff --git a/crates/node/src/lib.rs b/crates/node/src/lib.rs index 42f3b868a..f21016af6 100644 --- a/crates/node/src/lib.rs +++ b/crates/node/src/lib.rs @@ -9,46 +9,47 @@ // by the Apache License, Version 2.0. 
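Editor's note: join_cluster_inner registers the node through read_modify_write. A returning node bumps the generation of its GenerationalNodeId, and the closure remembers the generation it observed on its first attempt; if a later retry sees an even newer generation, another process must have registered the same node concurrently and the join aborts with JoinError::ConcurrentNodeRegistration. A small sketch of the generation arithmetic this relies on (values illustrative, wrapped in a throwaway function):

    use restate_types::GenerationalNodeId;

    fn generation_bump_example() {
        // First registration of plain node id 1 starts at generation 1 (cf. INITIAL_NODE_ID later in this diff).
        let mut node_id = GenerationalNodeId::new(1, 1);

        // A restart re-registers the same node name and bumps the generation.
        node_id.bump_generation();

        // A retry that remembered generation 1 and now observes a newer one concludes that a
        // concurrent registration happened.
        assert!(node_id.is_newer_than(GenerationalNodeId::new(1, 1)));
    }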
mod cluster_marker; +mod init; mod network_server; mod roles; -use std::sync::Arc; - -use tracing::{debug, error, info, trace}; +use anyhow::Context; +use tonic::{Code, IntoRequest}; +use tracing::{debug, error, info, trace, warn}; +use crate::cluster_marker::ClusterValidationError; +use crate::init::NodeInit; +use crate::network_server::NetworkServer; +use crate::roles::{AdminRole, BaseRole, IngressRole, WorkerRole}; use codederror::CodedError; use restate_bifrost::BifrostService; -use restate_core::metadata_store::{retry_on_network_error, ReadWriteError}; +use restate_core::metadata_store::ReadWriteError; +use restate_core::network::net_util::create_tonic_channel_from_advertised_address; +use restate_core::network::protobuf::node_ctl_svc::node_ctl_svc_client::NodeCtlSvcClient; +use restate_core::network::protobuf::node_ctl_svc::{ + ProvisionClusterRequest, ProvisionClusterResponseKind, +}; use restate_core::network::{ GrpcConnector, MessageRouterBuilder, NetworkServerBuilder, Networking, }; use restate_core::partitions::{spawn_partition_routing_refresher, PartitionRoutingRefresher}; -use restate_core::{cancellation_watcher, TaskKind}; -use restate_core::{ - spawn_metadata_manager, MetadataBuilder, MetadataKind, MetadataManager, TargetVersion, - TaskCenter, -}; +use restate_core::{cancellation_watcher, Metadata, TaskKind}; +use restate_core::{spawn_metadata_manager, MetadataBuilder, MetadataManager, TaskCenter}; #[cfg(feature = "replicated-loglet")] use restate_log_server::LogServerService; use restate_metadata_store::local::LocalMetadataStoreService; use restate_metadata_store::MetadataStoreClient; -use restate_types::config::{CommonOptions, Configuration}; +use restate_types::config::Configuration; use restate_types::errors::GenericError; use restate_types::health::Health; use restate_types::live::Live; #[cfg(feature = "replicated-loglet")] use restate_types::logs::RecordCache; -use restate_types::metadata_store::keys::NODES_CONFIG_KEY; -use restate_types::nodes_config::{LogServerConfig, NodeConfig, NodesConfiguration, Role}; +use restate_types::nodes_config::Role; use restate_types::protobuf::common::{ AdminStatus, IngressStatus, LogServerStatus, MetadataServerStatus, NodeRpcStatus, NodeStatus, WorkerStatus, }; -use restate_types::Version; - -use crate::cluster_marker::ClusterValidationError; -use crate::network_server::NetworkServer; -use crate::roles::{AdminRole, BaseRole, IngressRole, WorkerRole}; #[derive(Debug, thiserror::Error, CodedError)] pub enum Error { @@ -331,86 +332,101 @@ impl Node { let health = self.health.clone(); let common_options = config.common.clone(); let connection_manager = self.networking.connection_manager().clone(); + let metadata_store_client = self.metadata_store_client.clone(); async move { NetworkServer::run( health, connection_manager, self.server_builder, common_options, + metadata_store_client, ) .await?; Ok(()) } })?; + // wait until the node rpc server is up and running before continuing + self.health + .node_rpc_status() + .wait_for_value(NodeRpcStatus::Ready) + .await; + if let Some(metadata_store) = self.metadata_store_role { - TaskCenter::spawn( - TaskKind::MetadataStore, - "local-metadata-store", - async move { - metadata_store.run().await?; - Ok(()) - }, - )?; + TaskCenter::spawn(TaskKind::MetadataStore, "metadata-store", async move { + metadata_store.run().await?; + Ok(()) + })?; } - // Start partition routing information refresher - spawn_partition_routing_refresher(self.partition_routing_refresher)?; - - let nodes_config = - 
Self::upsert_node_config(&self.metadata_store_client, &config.common).await?; - metadata_writer.update(Arc::new(nodes_config)).await?; - if config.common.allow_bootstrap { - // todo write bootstrap state + TaskCenter::spawn(TaskKind::SystemBoot, "auto-provision-cluster", { + let channel = create_tonic_channel_from_advertised_address( + config.common.advertised_address.clone(), + &config.networking, + ); + let client = NodeCtlSvcClient::new(channel); + let retry_policy = config.common.network_error_retry_policy.clone(); + async move { + let response = retry_policy + .retry_if( + || { + let mut client = client.clone(); + async move { + client + .provision_cluster( + ProvisionClusterRequest { + dry_run: false, + ..Default::default() + } + .into_request(), + ) + .await + } + }, + |status| status.code() == Code::Unavailable, + ) + .await; + + match response { + Ok(response) => { + if response.into_inner().kind() == ProvisionClusterResponseKind::DryRun + { + debug!("The cluster is already provisioned."); + } + } + Err(err) => { + warn!("Failed to auto provision the cluster. In order to continue you have to provision the cluster manually: {err}"); + } + } + + Ok(()) + } + })?; } - // fetch the latest schema information - metadata - .sync(MetadataKind::Schema, TargetVersion::Latest) - .await?; + let initialization_timeout = config.common.initialization_timeout.into(); - let nodes_config = metadata.nodes_config_ref(); + tokio::time::timeout( + initialization_timeout, + NodeInit::new( + &self.metadata_store_client, + &metadata_writer, + ) + .init(), + ) + .await + .context("Giving up trying to initialize the node. Make sure that it can reach the metadata store and don't forget to provision the cluster on a fresh start.")? + .context("Failed initializing the node.")?; - // Find my node in nodes configuration. + let nodes_config = Metadata::with_current(|m| m.nodes_config_ref()); + let my_node_id = Metadata::with_current(|m| m.my_node_id()); let my_node_config = nodes_config - .find_node_by_name(config.common.node_name()) - .expect("node config should have been upserted"); - - let my_node_id = my_node_config.current_generation; - - // Safety checks, same node (if set)? - if config - .common - .force_node_id - .is_some_and(|n| n != my_node_id.as_plain()) - { - return Err(Error::SafetyCheck( - format!( - "Node ID mismatch: configured node ID is {}, but the nodes configuration contains {}", - config.common.force_node_id.unwrap(), - my_node_id.as_plain() - )))?; - } - - // Same cluster? - if config.common.cluster_name() != nodes_config.cluster_name() { - return Err(Error::SafetyCheck( - format!( - "Cluster name mismatch: configured cluster name is '{}', but the nodes configuration contains '{}'", - config.common.cluster_name(), - nodes_config.cluster_name() - )))?; - } - - // My Node ID is set - metadata_writer.set_my_node_id(my_node_id); - restate_tracing_instrumentation::set_global_node_id(my_node_id); + .find_node_by_id(my_node_id) + .expect("should be present"); - info!( - roles = %my_node_config.roles, - address = %my_node_config.address, - "My Node ID is {}", my_node_config.current_generation); + // Start partition routing information refresher + spawn_partition_routing_refresher(self.partition_routing_refresher)?; // todo this is a temporary solution to announce the updated NodesConfiguration to the // configured admin nodes. 
It should be removed once we have a gossip-based node status @@ -522,92 +538,6 @@ impl Node { Ok(()) } - async fn upsert_node_config( - metadata_store_client: &MetadataStoreClient, - common_opts: &CommonOptions, - ) -> Result { - retry_on_network_error(common_opts.network_error_retry_policy.clone(), || { - let mut previous_node_generation = None; - metadata_store_client.read_modify_write(NODES_CONFIG_KEY.clone(), move |nodes_config| { - let mut nodes_config = if common_opts.allow_bootstrap { - debug!("allow-bootstrap is set to `true`, allowed to create initial NodesConfiguration!"); - nodes_config.unwrap_or_else(|| { - NodesConfiguration::new( - Version::INVALID, - common_opts.cluster_name().to_owned(), - ) - }) - } else { - nodes_config.ok_or(Error::MissingNodesConfiguration)? - }; - - // check whether we have registered before - let node_config = nodes_config - .find_node_by_name(common_opts.node_name()) - .cloned(); - - let my_node_config = if let Some(mut node_config) = node_config { - assert_eq!( - common_opts.node_name(), - node_config.name, - "node name must match" - ); - - if let Some(previous_node_generation) = previous_node_generation { - if node_config - .current_generation - .is_newer_than(previous_node_generation) - { - // detected a concurrent registration of the same node - return Err(Error::ConcurrentNodeRegistration( - common_opts.node_name().to_owned(), - )); - } - } else { - // remember the previous node generation to detect concurrent modifications - previous_node_generation = Some(node_config.current_generation); - } - - // update node_config - node_config.roles = common_opts.roles; - node_config.address = common_opts.advertised_address.clone(); - node_config.current_generation.bump_generation(); - - node_config - } else { - let plain_node_id = common_opts.force_node_id.unwrap_or_else(|| { - nodes_config - .max_plain_node_id() - .map(|n| n.next()) - .unwrap_or_default() - }); - - assert!( - nodes_config.find_node_by_id(plain_node_id).is_err(), - "duplicate plain node id '{plain_node_id}'" - ); - - let my_node_id = plain_node_id.with_generation(1); - - NodeConfig::new( - common_opts.node_name().to_owned(), - my_node_id, - common_opts.advertised_address.clone(), - common_opts.roles, - LogServerConfig::default(), - ) - }; - - nodes_config.upsert_node(my_node_config); - nodes_config.increment_version(); - - Ok(nodes_config) - }) - }) - .await - .map_err(|err| err.transpose()) - } - pub fn bifrost(&self) -> restate_bifrost::Bifrost { self.bifrost.handle() } diff --git a/crates/node/src/network_server/grpc_svc_handler.rs b/crates/node/src/network_server/grpc_svc_handler.rs index 0ec5b7bc4..dcc0a8040 100644 --- a/crates/node/src/network_server/grpc_svc_handler.rs +++ b/crates/node/src/network_server/grpc_svc_handler.rs @@ -8,31 +8,45 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. 
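Editor's note: the auto-provision task above drives provisioning by calling the node's own ProvisionCluster RPC. The same endpoint can be used externally, for example to inspect the configuration a node would provision with. A hedged sketch built from the pieces this diff introduces; the helper function is illustrative and, for simplicity, takes the target address from the node's own Configuration:

    use anyhow::Context;
    use restate_core::network::net_util::create_tonic_channel_from_advertised_address;
    use restate_core::network::protobuf::node_ctl_svc::node_ctl_svc_client::NodeCtlSvcClient;
    use restate_core::network::protobuf::node_ctl_svc::{
        ProvisionClusterRequest, ProvisionClusterResponseKind,
    };
    use restate_types::config::Configuration;
    use tonic::IntoRequest;

    async fn dry_run_provision(config: &Configuration) -> anyhow::Result<()> {
        let channel = create_tonic_channel_from_advertised_address(
            config.common.advertised_address.clone(),
            &config.networking,
        );
        let mut client = NodeCtlSvcClient::new(channel);

        let response = client
            .provision_cluster(
                ProvisionClusterRequest {
                    dry_run: true,
                    ..Default::default()
                }
                .into_request(),
            )
            .await
            .context("provision_cluster RPC failed")?
            .into_inner();

        if response.kind() == ProvisionClusterResponseKind::DryRun {
            // The node reports the ClusterConfiguration it would provision, without writing anything.
            println!("would provision with: {:?}", response.cluster_configuration);
        }
        Ok(())
    }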
+use anyhow::Context; use bytes::BytesMut; +use bytestring::ByteString; use enumset::EnumSet; use futures::stream::BoxStream; -use tokio_stream::StreamExt; -use tonic::{Request, Response, Status, Streaming}; - +use restate_core::metadata_store::{ + retry_on_network_error, MetadataStoreClient, Precondition, WriteError, +}; use restate_core::network::protobuf::core_node_svc::core_node_svc_server::CoreNodeSvc; use restate_core::network::protobuf::node_ctl_svc::node_ctl_svc_server::NodeCtlSvc; use restate_core::network::protobuf::node_ctl_svc::{ - GetMetadataRequest, GetMetadataResponse, IdentResponse, + GetMetadataRequest, GetMetadataResponse, IdentResponse, ProvisionClusterRequest, + ProvisionClusterResponse, ProvisionClusterResponseKind, }; use restate_core::network::ConnectionManager; use restate_core::network::{ProtocolError, TransportConnect}; use restate_core::task_center::TaskCenterMonitoring; use restate_core::{task_center, Metadata, MetadataKind, TargetVersion}; +use restate_types::config::{CommonOptions, Configuration}; use restate_types::health::Health; -use restate_types::nodes_config::Role; +use restate_types::logs::metadata::{DefaultProvider, Logs, LogsConfiguration}; +use restate_types::metadata_store::keys::{BIFROST_CONFIG_KEY, PARTITION_TABLE_KEY}; +use restate_types::nodes_config::{LogServerConfig, NodeConfig, NodesConfiguration, Role}; +use restate_types::partition_table::{PartitionTable, PartitionTableBuilder, ReplicationStrategy}; +use restate_types::protobuf::cluster::ClusterConfiguration as ProtoClusterConfiguration; use restate_types::protobuf::node::Message; -use restate_types::storage::StorageCodec; +use restate_types::storage::{StorageCodec, StorageEncode}; +use restate_types::{GenerationalNodeId, Version, Versioned}; +use std::num::NonZeroU16; +use tokio_stream::StreamExt; +use tonic::{Request, Response, Status, Streaming}; +use tracing::warn; pub struct NodeCtlSvcHandler { task_center: task_center::Handle, cluster_name: String, roles: EnumSet, health: Health, + metadata_store_client: MetadataStoreClient, } impl NodeCtlSvcHandler { @@ -41,14 +55,153 @@ impl NodeCtlSvcHandler { cluster_name: String, roles: EnumSet, health: Health, + metadata_store_client: MetadataStoreClient, ) -> Self { Self { task_center, cluster_name, roles, health, + metadata_store_client, + } + } + + async fn provision_metadata( + &self, + common_opts: &CommonOptions, + cluster_configuration: &ClusterConfiguration, + ) -> anyhow::Result { + let (initial_nodes_configuration, initial_partition_table, initial_logs) = + Self::generate_initial_metadata(common_opts, cluster_configuration); + + let result = retry_on_network_error(common_opts.network_error_retry_policy.clone(), || { + self.metadata_store_client + .provision(&initial_nodes_configuration) + }) + .await?; + + retry_on_network_error(common_opts.network_error_retry_policy.clone(), || { + self.write_initial_value_dont_fail_if_it_exists( + PARTITION_TABLE_KEY.clone(), + &initial_partition_table, + ) + }) + .await + .context("failed provisioning the initial partition table")?; + + retry_on_network_error(common_opts.network_error_retry_policy.clone(), || { + self.write_initial_value_dont_fail_if_it_exists( + BIFROST_CONFIG_KEY.clone(), + &initial_logs, + ) + }) + .await + .context("failed provisioning the initial logs configuration")?; + + Ok(result) + } + + pub fn create_initial_nodes_configuration(common_opts: &CommonOptions) -> NodesConfiguration { + let mut initial_nodes_configuration = + NodesConfiguration::new(Version::MIN, 
common_opts.cluster_name().to_owned()); + let node_config = NodeConfig::new( + common_opts.node_name().to_owned(), + common_opts + .force_node_id + .map(|force_node_id| force_node_id.with_generation(1)) + .unwrap_or(GenerationalNodeId::INITIAL_NODE_ID), + common_opts.advertised_address.clone(), + common_opts.roles, + LogServerConfig::default(), + ); + initial_nodes_configuration.upsert_node(node_config); + initial_nodes_configuration + } + + fn generate_initial_metadata( + common_opts: &CommonOptions, + cluster_configuration: &ClusterConfiguration, + ) -> (NodesConfiguration, PartitionTable, Logs) { + let mut initial_partition_table_builder = PartitionTableBuilder::default(); + initial_partition_table_builder + .with_equally_sized_partitions(cluster_configuration.num_partitions.get()) + .expect("Empty partition table should not have conflicts"); + initial_partition_table_builder + .set_replication_strategy(cluster_configuration.placement_strategy); + let initial_partition_table = initial_partition_table_builder.build(); + + let mut logs_builder = Logs::default().into_builder(); + logs_builder.set_configuration(LogsConfiguration::from( + cluster_configuration.log_provider.clone(), + )); + let initial_logs = logs_builder.build(); + + let initial_nodes_configuration = Self::create_initial_nodes_configuration(common_opts); + + ( + initial_nodes_configuration, + initial_partition_table, + initial_logs, + ) + } + + async fn write_initial_value_dont_fail_if_it_exists( + &self, + key: ByteString, + initial_value: &T, + ) -> Result<(), WriteError> { + match self + .metadata_store_client + .put(key, initial_value, Precondition::DoesNotExist) + .await + { + Ok(_) => Ok(()), + Err(WriteError::FailedPrecondition(_)) => { + // we might have failed on a previous attempt after writing this value; so let's continue + Ok(()) + } + Err(err) => Err(err), } } + + fn resolve_cluster_configuration( + config: &Configuration, + request: ProvisionClusterRequest, + ) -> anyhow::Result { + let num_partitions = request + .num_partitions + .map(|num_partitions| { + u16::try_from(num_partitions) + .context("Restate only supports running up to 65535 partitions.") + .and_then(|num_partitions| { + NonZeroU16::try_from(num_partitions) + .context("The number of partitions needs to be > 0") + }) + }) + .transpose()? + .unwrap_or(config.common.bootstrap_num_partitions); + let placement_strategy = request + .placement_strategy + .map(ReplicationStrategy::try_from) + .transpose()? 
+ .unwrap_or_default(); + let log_provider = request + .log_provider + .map(DefaultProvider::try_from) + .unwrap_or_else(|| Ok(DefaultProvider::from_configuration(config)))?; + + Ok(ClusterConfiguration { + num_partitions, + placement_strategy, + log_provider, + }) + } + + fn convert_cluster_configuration( + _cluster_configuration: ClusterConfiguration, + ) -> ProtoClusterConfiguration { + todo!() + } } #[async_trait::async_trait] @@ -114,6 +267,45 @@ impl NodeCtlSvc for NodeCtlSvcHandler { encoded: encoded.freeze(), })) } + + async fn provision_cluster( + &self, + request: Request, + ) -> Result, Status> { + let request = request.into_inner(); + let config = Configuration::pinned(); + + let dry_run = request.dry_run; + let cluster_configuration = Self::resolve_cluster_configuration(&config, request) + .map_err(|err| Status::invalid_argument(err.to_string()))?; + + if dry_run { + return Ok(Response::new(ProvisionClusterResponse { + kind: ProvisionClusterResponseKind::DryRun.into(), + cluster_configuration: Some(Self::convert_cluster_configuration( + cluster_configuration, + )), + ..Default::default() + })); + } + + self.provision_metadata(&config.common, &cluster_configuration) + .await + .map_err(|err| Status::internal(err.to_string()))?; + + Ok(Response::new(ProvisionClusterResponse { + kind: ProvisionClusterResponseKind::Success.into(), + cluster_configuration: Some(Self::convert_cluster_configuration(cluster_configuration)), + ..Default::default() + })) + } +} + +#[derive(Clone, Debug)] +struct ClusterConfiguration { + num_partitions: NonZeroU16, + placement_strategy: ReplicationStrategy, + log_provider: DefaultProvider, } pub struct CoreNodeSvcHandler { diff --git a/crates/node/src/network_server/service.rs b/crates/node/src/network_server/service.rs index 64ae8b330..5e208a572 100644 --- a/crates/node/src/network_server/service.rs +++ b/crates/node/src/network_server/service.rs @@ -15,8 +15,7 @@ use tokio::time::MissedTickBehavior; use tonic::codec::CompressionEncoding; use tracing::{debug, trace}; -use crate::network_server::metrics::{install_global_prometheus_recorder, render_metrics}; -use crate::network_server::state::NodeCtrlHandlerStateBuilder; +use restate_core::metadata_store::MetadataStoreClient; use restate_core::network::protobuf::core_node_svc::core_node_svc_server::CoreNodeSvcServer; use restate_core::network::protobuf::node_ctl_svc::node_ctl_svc_server::NodeCtlSvcServer; use restate_core::network::tonic_service_filter::{TonicServiceFilter, WaitForReady}; @@ -28,6 +27,8 @@ use restate_types::protobuf::common::NodeStatus; use super::grpc_svc_handler::{CoreNodeSvcHandler, NodeCtlSvcHandler}; use super::pprof; +use crate::network_server::metrics::{install_global_prometheus_recorder, render_metrics}; +use crate::network_server::state::NodeCtrlHandlerStateBuilder; pub struct NetworkServer {} @@ -37,6 +38,7 @@ impl NetworkServer { connection_manager: ConnectionManager, mut server_builder: NetworkServerBuilder, options: CommonOptions, + metadata_store_client: MetadataStoreClient, ) -> Result<(), anyhow::Error> { // Configure Metric Exporter let mut state_builder = NodeCtrlHandlerStateBuilder::default(); @@ -103,6 +105,7 @@ impl NetworkServer { options.cluster_name().to_owned(), options.roles, health, + metadata_store_client, )) .accept_compressed(CompressionEncoding::Gzip) .send_compressed(CompressionEncoding::Gzip), diff --git a/crates/types/src/config/common.rs b/crates/types/src/config/common.rs index 178c05d1c..5b65b8913 100644 --- a/crates/types/src/config/common.rs +++ 
b/crates/types/src/config/common.rs @@ -235,6 +235,13 @@ pub struct CommonOptions { /// /// The retry policy for node network error pub network_error_retry_policy: RetryPolicy, + + /// # Initialization timeout + /// + /// The timeout until the node gives up joining a cluster and initializing itself. + #[serde(with = "serde_with::As::")] + #[cfg_attr(feature = "schemars", schemars(with = "String"))] + pub initialization_timeout: humantime::Duration, } impl CommonOptions { @@ -374,6 +381,7 @@ impl Default for CommonOptions { Some(15), Some(Duration::from_secs(5)), ), + initialization_timeout: Duration::from_secs(5 * 60).into(), } } } diff --git a/crates/types/src/logs/metadata.rs b/crates/types/src/logs/metadata.rs index 1668910b5..6a0289b51 100644 --- a/crates/types/src/logs/metadata.rs +++ b/crates/types/src/logs/metadata.rs @@ -211,6 +211,20 @@ impl DefaultProvider { Self::Replicated(_) => ProviderKind::Replicated, } } + + pub fn from_configuration(configuration: &Configuration) -> Self { + match configuration.bifrost.default_provider { + #[cfg(any(test, feature = "memory-loglet"))] + ProviderKind::InMemory => DefaultProvider::InMemory, + ProviderKind::Local => DefaultProvider::Local, + ProviderKind::Replicated => DefaultProvider::Replicated(ReplicatedLogletConfig { + nodeset_selection_strategy: NodeSetSelectionStrategy::default(), + replication_property: ReplicationProperty::new( + NonZeroU8::new(1).expect("1 is not zero"), + ), + }), + } + } } impl From for crate::protobuf::cluster::DefaultProvider { @@ -270,7 +284,9 @@ pub struct ReplicatedLogletConfig { pub replication_property: ReplicationProperty, } -#[derive(Debug, Clone, Eq, PartialEq, Default, serde::Serialize, serde::Deserialize)] +#[derive( + Debug, Clone, Eq, PartialEq, Default, derive_more::From, serde::Serialize, serde::Deserialize, +)] pub struct LogsConfiguration { pub default_provider: DefaultProvider, } @@ -493,17 +509,7 @@ impl LogletConfig { impl Logs { pub fn from_configuration(config: &Configuration) -> Self { Self::with_logs_configuration(LogsConfiguration { - default_provider: match config.bifrost.default_provider { - #[cfg(any(test, feature = "memory-loglet"))] - ProviderKind::InMemory => DefaultProvider::InMemory, - ProviderKind::Local => DefaultProvider::Local, - ProviderKind::Replicated => DefaultProvider::Replicated(ReplicatedLogletConfig { - nodeset_selection_strategy: NodeSetSelectionStrategy::default(), - replication_property: ReplicationProperty::new( - NonZeroU8::new(1).expect("1 is not zero"), - ), - }), - }, + default_provider: DefaultProvider::from_configuration(config), }) } diff --git a/crates/types/src/node_id.rs b/crates/types/src/node_id.rs index 1c9a394be..88de877e3 100644 --- a/crates/types/src/node_id.rs +++ b/crates/types/src/node_id.rs @@ -85,6 +85,9 @@ impl FromStr for GenerationalNodeId { } impl GenerationalNodeId { + // Start with 1 as id to leave 0 as a special value in the future + pub const INITIAL_NODE_ID: GenerationalNodeId = GenerationalNodeId::new(1, 1); + pub fn decode(mut data: B) -> Self { // generational node id is stored as two u32s next to each other, each in big-endian. let plain_id = data.get_u32(); diff --git a/server/tests/cluster.rs b/server/tests/cluster.rs index 0503b8d5a..503b529cc 100644 --- a/server/tests/cluster.rs +++ b/server/tests/cluster.rs @@ -113,7 +113,7 @@ async fn cluster_name_mismatch() -> googletest::Result<()> { .await?; assert!(mismatch_node - .lines("Cluster name mismatch".parse()?) + .lines("trying to join wrong cluster".parse()?) 
.next() .await .is_some());
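Editor's note: taken together, provisioning now seeds three keys in the metadata store: the NodesConfiguration (written via MetadataStore::provision), the initial PartitionTable, and the initial Logs configuration; NodeInit then only ever joins an already-provisioned cluster. A hedged sketch of checking that state through the trait methods shown earlier; the helper is illustrative and not part of this diff:

    use restate_core::metadata_store::{MetadataStore, ReadError};
    use restate_types::metadata_store::keys::{
        BIFROST_CONFIG_KEY, NODES_CONFIG_KEY, PARTITION_TABLE_KEY,
    };

    /// Returns true once all three initial values written during provisioning are present.
    async fn is_provisioned(store: &impl MetadataStore) -> Result<bool, ReadError> {
        Ok(store.get_version(NODES_CONFIG_KEY.clone()).await?.is_some()
            && store.get_version(PARTITION_TABLE_KEY.clone()).await?.is_some()
            && store.get_version(BIFROST_CONFIG_KEY.clone()).await?.is_some())
    }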