From 73d668197f9c6aa9193ea97c55b3bea11a0c4754 Mon Sep 17 00:00:00 2001
From: Till Rohrmann
Date: Thu, 12 Dec 2024 14:08:58 +0100
Subject: [PATCH] Add cluster provision functionality

After starting the metadata store service and the grpc server, the node
will try to initialize itself by joining an existing cluster.

Additionally, each node exposes a provision cluster grpc call with which
it is possible to provision a cluster (writing the initial
NodesConfiguration, PartitionTable and Logs).

Nodes can only join after the cluster is provisioned.

This fixes #2409.
---
 Cargo.lock | 1 +
 .../src/cluster_controller/logs_controller.rs | 35 +--
 .../admin/src/cluster_controller/service.rs | 32 +-
 .../src/cluster_controller/service/state.rs | 6 +-
 crates/bifrost/src/service.rs | 8 +-
 crates/core/build.rs | 1 +
 crates/core/protobuf/node_ctl_svc.proto | 25 ++
 crates/core/src/metadata_store.rs | 141 ++++++++-
 .../core/src/metadata_store/providers/etcd.rs | 4 +-
 .../metadata_store/providers/objstore/glue.rs | 6 +-
 crates/core/src/metadata_store/test_util.rs | 6 +-
 crates/core/src/network/protobuf.rs | 28 ++
 .../metadata-store/src/local/grpc/client.rs | 4 +-
 crates/metadata-store/src/local/store.rs | 4 +-
 crates/node/Cargo.toml | 1 +
 crates/node/src/init.rs | 283 ++++++++++++++++++
 crates/node/src/lib.rs | 252 ++++++----------
 .../src/network_server/grpc_svc_handler.rs | 204 ++++++++++++-
 crates/node/src/network_server/service.rs | 7 +-
 crates/types/src/config/common.rs | 8 +
 crates/types/src/logs/metadata.rs | 30 +-
 crates/types/src/node_id.rs | 3 +
 server/tests/cluster.rs | 2 +-
 23 files changed, 823 insertions(+), 268 deletions(-)
 create mode 100644 crates/node/src/init.rs

diff --git a/Cargo.lock b/Cargo.lock
index 436138462..c8775bdb9 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -6564,6 +6564,7 @@ dependencies = [
  "async-trait",
  "axum",
  "bytes",
+ "bytestring",
  "codederror",
  "datafusion",
  "derive_builder",
diff --git a/crates/admin/src/cluster_controller/logs_controller.rs b/crates/admin/src/cluster_controller/logs_controller.rs
index 0a74a7078..602c2f0a7 100644
--- a/crates/admin/src/cluster_controller/logs_controller.rs
+++ b/crates/admin/src/cluster_controller/logs_controller.rs
@@ -24,11 +24,8 @@ use tracing::{debug, error, trace, trace_span, Instrument};
 use xxhash_rust::xxh3::Xxh3Builder;
 
 use restate_bifrost::{Bifrost, BifrostAdmin, Error as BifrostError};
-use restate_core::metadata_store::{
-    retry_on_network_error, MetadataStoreClient, Precondition, ReadWriteError, WriteError,
-};
+use restate_core::metadata_store::{MetadataStoreClient, Precondition, ReadWriteError, WriteError};
 use restate_core::{Metadata, MetadataWriter, ShutdownError, TaskCenterFutureExt};
-use restate_types::config::Configuration;
 use restate_types::errors::GenericError;
 use restate_types::identifiers::PartitionId;
 use restate_types::live::Pinned;
@@ -639,9 +636,9 @@ struct LogsControllerInner {
 }
 
 impl LogsControllerInner {
-    fn new(configuration: LogsConfiguration, retry_policy: RetryPolicy) -> Self {
+    fn new(current_logs: Arc<Logs>, retry_policy: RetryPolicy) -> Self {
         Self {
-            current_logs: Arc::new(Logs::with_logs_configuration(configuration)),
+            current_logs,
             logs_state: HashMap::with_hasher(Xxh3Builder::default()),
             logs_write_in_progress: None,
             retry_policy,
@@ -925,26 +922,11 @@ pub struct LogsController {
 }
 
 impl LogsController {
-    pub async fn init(
-        configuration: &Configuration,
+    pub fn new(
         bifrost: Bifrost,
         metadata_store_client: MetadataStoreClient,
         metadata_writer: MetadataWriter,
-    ) -> Result {
-        // obtain the
latest logs or init it with an empty logs variant - let logs = retry_on_network_error( - configuration.common.network_error_retry_policy.clone(), - || { - metadata_store_client.get_or_insert(BIFROST_CONFIG_KEY.clone(), || { - Logs::from_configuration(configuration) - }) - }, - ) - .await?; - - let logs_configuration = logs.configuration().clone(); - metadata_writer.update(Arc::new(logs)).await?; - + ) -> Self { //todo(azmy): make configurable let retry_policy = RetryPolicy::exponential( Duration::from_millis(10), @@ -955,7 +937,10 @@ impl LogsController { let mut this = Self { effects: Some(Vec::new()), - inner: LogsControllerInner::new(logs_configuration, retry_policy), + inner: LogsControllerInner::new( + Metadata::with_current(|m| m.logs_snapshot()), + retry_policy, + ), bifrost, metadata_store_client, metadata_writer, @@ -964,7 +949,7 @@ impl LogsController { }; this.find_logs_tail(); - Ok(this) + this } pub fn find_logs_tail(&mut self) { diff --git a/crates/admin/src/cluster_controller/service.rs b/crates/admin/src/cluster_controller/service.rs index e5694601f..9c68f9c91 100644 --- a/crates/admin/src/cluster_controller/service.rs +++ b/crates/admin/src/cluster_controller/service.rs @@ -37,7 +37,7 @@ use restate_types::partition_table::{ use restate_types::replicated_loglet::ReplicatedLogletParams; use restate_bifrost::{Bifrost, BifrostAdmin, SealedSegment}; -use restate_core::metadata_store::{retry_on_network_error, MetadataStoreClient}; +use restate_core::metadata_store::MetadataStoreClient; use restate_core::network::rpc_router::RpcRouter; use restate_core::network::tonic_service_filter::{TonicServiceFilter, WaitForReady}; use restate_core::network::{ @@ -296,8 +296,6 @@ impl Service { } pub async fn run(mut self) -> anyhow::Result<()> { - self.init_partition_table().await?; - let mut config_watcher = Configuration::watcher(); let mut cluster_state_watcher = self.cluster_state_refresher.cluster_state_watcher(); @@ -353,34 +351,6 @@ impl Service { } } - /// creates partition table iff it does not exist - async fn init_partition_table(&mut self) -> anyhow::Result<()> { - let configuration = self.configuration.live_load(); - - let partition_table = retry_on_network_error( - configuration.common.network_error_retry_policy.clone(), - || { - self.metadata_store_client - .get_or_insert(PARTITION_TABLE_KEY.clone(), || { - let partition_table = PartitionTable::with_equally_sized_partitions( - Version::MIN, - configuration.common.bootstrap_num_partitions.get(), - ); - - debug!("Initializing the partition table with '{partition_table:?}'"); - - partition_table - }) - }, - ) - .await?; - - self.metadata_writer - .update(Arc::new(partition_table)) - .await?; - - Ok(()) - } /// Triggers a snapshot creation for the given partition by issuing an RPC /// to the node hosting the active leader. 
async fn create_partition_snapshot( diff --git a/crates/admin/src/cluster_controller/service/state.rs b/crates/admin/src/cluster_controller/service/state.rs index 8b5548839..4392744ca 100644 --- a/crates/admin/src/cluster_controller/service/state.rs +++ b/crates/admin/src/cluster_controller/service/state.rs @@ -161,13 +161,11 @@ where ) .await?; - let logs_controller = LogsController::init( - &configuration, + let logs_controller = LogsController::new( service.bifrost.clone(), service.metadata_store_client.clone(), service.metadata_writer.clone(), - ) - .await?; + ); let (log_trim_interval, log_trim_threshold) = create_log_trim_interval(&configuration.admin); diff --git a/crates/bifrost/src/service.rs b/crates/bifrost/src/service.rs index 3222cc5c2..54dc75168 100644 --- a/crates/bifrost/src/service.rs +++ b/crates/bifrost/src/service.rs @@ -78,10 +78,10 @@ impl BifrostService { pub fn handle(&self) -> Bifrost { self.bifrost.clone() } - /// Runs initialization phase, then returns a handle to join on shutdown. - /// In this phase the system should wait until this is completed before - /// continuing. For instance, a worker mark itself as `STARTING_UP` and not - /// accept any requests until this is completed. + + /// Runs initialization phase. In this phase the system should wait until this is completed + /// before continuing. For instance, a worker mark itself as `STARTING_UP` and not accept any + /// requests until this is completed. /// /// This requires to run within a task_center context. pub async fn start(self) -> anyhow::Result<()> { diff --git a/crates/core/build.rs b/crates/core/build.rs index 135e37a1d..9904963e6 100644 --- a/crates/core/build.rs +++ b/crates/core/build.rs @@ -21,6 +21,7 @@ fn main() -> Result<(), Box> { .protoc_arg("--experimental_allow_proto3_optional") .extern_path(".restate.node", "::restate_types::protobuf::node") .extern_path(".restate.common", "::restate_types::protobuf::common") + .extern_path(".restate.cluster", "::restate_types::protobuf::cluster") .compile_protos( &["./protobuf/node_ctl_svc.proto"], &["protobuf", "../types/protobuf"], diff --git a/crates/core/protobuf/node_ctl_svc.proto b/crates/core/protobuf/node_ctl_svc.proto index 74350bc4a..673d9e4dc 100644 --- a/crates/core/protobuf/node_ctl_svc.proto +++ b/crates/core/protobuf/node_ctl_svc.proto @@ -10,6 +10,7 @@ syntax = "proto3"; import "google/protobuf/empty.proto"; +import "restate/cluster.proto"; import "restate/common.proto"; import "restate/node.proto"; @@ -20,6 +21,30 @@ service NodeCtlSvc { rpc GetIdent(google.protobuf.Empty) returns (IdentResponse); rpc GetMetadata(GetMetadataRequest) returns (GetMetadataResponse); + + // Provision the Restate cluster on this node. + rpc ProvisionCluster(ProvisionClusterRequest) returns (ProvisionClusterResponse); +} + +message ProvisionClusterRequest { + bool dry_run = 1; + optional uint32 num_partitions = 2; + optional restate.cluster.ReplicationStrategy placement_strategy = 3; + optional restate.cluster.DefaultProvider log_provider = 4; +} + +enum ProvisionClusterResponseKind { + ProvisionClusterResponseType_UNKNOWN = 0; + ERROR = 1; + SUCCESS = 2; + DRY_RUN = 3; +} + +message ProvisionClusterResponse { + ProvisionClusterResponseKind kind = 1; + // If there is an error, this field will be set. All other fields will be empty. 
+ optional string error = 2; + optional restate.cluster.ClusterConfiguration cluster_configuration = 3; } message IdentResponse { diff --git a/crates/core/src/metadata_store.rs b/crates/core/src/metadata_store.rs index 3542226f5..c391cd080 100644 --- a/crates/core/src/metadata_store.rs +++ b/crates/core/src/metadata_store.rs @@ -17,8 +17,10 @@ use async_trait::async_trait; use bytes::{Bytes, BytesMut}; use bytestring::ByteString; use restate_types::errors::GenericError; +use restate_types::metadata_store::keys::NODES_CONFIG_KEY; +use restate_types::nodes_config::NodesConfiguration; use restate_types::retries::RetryPolicy; -use restate_types::storage::{StorageCodec, StorageDecode, StorageEncode}; +use restate_types::storage::{StorageCodec, StorageDecode, StorageEncode, StorageEncodeError}; use restate_types::{flexbuffers_storage_encode_decode, Version, Versioned}; use std::future::Future; use std::sync::Arc; @@ -51,6 +53,29 @@ pub enum WriteError { Store(GenericError), } +#[derive(Debug, thiserror::Error)] +pub enum ProvisionError { + #[error("network error: {0}")] + Network(GenericError), + #[error("internal error: {0}")] + Internal(String), + #[error("codec error: {0}")] + Codec(GenericError), + #[error("store error: {0}")] + Store(GenericError), +} + +impl MetadataStoreClientError for ProvisionError { + fn is_network_error(&self) -> bool { + match self { + ProvisionError::Network(_) => true, + ProvisionError::Internal(_) | ProvisionError::Codec(_) | ProvisionError::Store(_) => { + false + } + } + } +} + #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub struct VersionedValue { pub version: Version, @@ -100,6 +125,89 @@ pub trait MetadataStore { /// Deletes the key-value pair for the given key following the provided precondition. If the /// precondition is not met, then the operation returns a [`WriteError::PreconditionViolation`]. async fn delete(&self, key: ByteString, precondition: Precondition) -> Result<(), WriteError>; + + /// Tries to provision the metadata store with the provided [`NodesConfiguration`]. Returns + /// `true` if the metadata store was newly provisioned. Returns `false` if the metadata store + /// is already provisioned. + async fn provision( + &self, + nodes_configuration: &NodesConfiguration, + ) -> Result; +} + +/// A provisioned metadata store does not need to be explicitly provisioned. Therefore, a provision +/// call is translated into a put command. +#[async_trait] +pub trait ProvisionedMetadataStore { + /// Gets the value and its current version for the given key. If key-value pair is not present, + /// then return [`None`]. + async fn get(&self, key: ByteString) -> Result, ReadError>; + + /// Gets the current version for the given key. If key-value pair is not present, then return + /// [`None`]. + async fn get_version(&self, key: ByteString) -> Result, ReadError>; + + /// Puts the versioned value under the given key following the provided precondition. If the + /// precondition is not met, then the operation returns a [`WriteError::PreconditionViolation`]. + async fn put( + &self, + key: ByteString, + value: VersionedValue, + precondition: Precondition, + ) -> Result<(), WriteError>; + + /// Deletes the key-value pair for the given key following the provided precondition. If the + /// precondition is not met, then the operation returns a [`WriteError::PreconditionViolation`]. 
+ async fn delete(&self, key: ByteString, precondition: Precondition) -> Result<(), WriteError>; +} + +#[async_trait] +impl MetadataStore for T { + async fn get(&self, key: ByteString) -> Result, ReadError> { + self.get(key).await + } + + async fn get_version(&self, key: ByteString) -> Result, ReadError> { + self.get_version(key).await + } + + async fn put( + &self, + key: ByteString, + value: VersionedValue, + precondition: Precondition, + ) -> Result<(), WriteError> { + self.put(key, value, precondition).await + } + + async fn delete(&self, key: ByteString, precondition: Precondition) -> Result<(), WriteError> { + self.delete(key, precondition).await + } + + async fn provision( + &self, + nodes_configuration: &NodesConfiguration, + ) -> Result { + let versioned_value = serialize_value(nodes_configuration) + .map_err(|err| ProvisionError::Codec(err.into()))?; + match self + .put( + NODES_CONFIG_KEY.clone(), + versioned_value, + Precondition::DoesNotExist, + ) + .await + { + Ok(()) => Ok(true), + Err(err) => match err { + WriteError::FailedPrecondition(_) => Ok(false), + WriteError::Network(err) => Err(ProvisionError::Network(err)), + WriteError::Internal(err) => Err(ProvisionError::Internal(err)), + WriteError::Codec(err) => Err(ProvisionError::Codec(err)), + WriteError::Store(err) => Err(ProvisionError::Store(err)), + }, + } + } } /// Metadata store client which allows storing [`Versioned`] values into a [`MetadataStore`]. @@ -170,18 +278,10 @@ impl MetadataStoreClient { where T: Versioned + StorageEncode, { - let version = value.version(); + let versioned_value = + serialize_value(value).map_err(|err| WriteError::Codec(err.into()))?; - let mut buf = BytesMut::default(); - StorageCodec::encode(value, &mut buf).map_err(|err| WriteError::Codec(err.into()))?; - - self.inner - .put( - key, - VersionedValue::new(version, buf.freeze()), - precondition, - ) - .await + self.inner.put(key, versioned_value, precondition).await } /// Deletes the key-value pair for the given key following the provided precondition. If the @@ -285,6 +385,23 @@ impl MetadataStoreClient { } } } + + pub async fn provision( + &self, + nodes_configuration: &NodesConfiguration, + ) -> Result { + self.inner.provision(nodes_configuration).await + } +} + +pub fn serialize_value( + value: &T, +) -> Result { + let version = value.version(); + let mut buf = BytesMut::default(); + StorageCodec::encode(value, &mut buf)?; + let versioned_value = VersionedValue::new(version, buf.freeze()); + Ok(versioned_value) } #[derive(Debug, thiserror::Error)] diff --git a/crates/core/src/metadata_store/providers/etcd.rs b/crates/core/src/metadata_store/providers/etcd.rs index 164a772ee..49b85f11b 100644 --- a/crates/core/src/metadata_store/providers/etcd.rs +++ b/crates/core/src/metadata_store/providers/etcd.rs @@ -9,7 +9,7 @@ // by the Apache License, Version 2.0. 
use crate::metadata_store::{ - MetadataStore, Precondition, ReadError, Version, VersionedValue, WriteError, + Precondition, ProvisionedMetadataStore, ReadError, Version, VersionedValue, WriteError, }; use crate::network::net_util::CommonClientConnectionOptions; use anyhow::Context; @@ -145,7 +145,7 @@ impl EtcdMetadataStore { } #[async_trait::async_trait] -impl MetadataStore for EtcdMetadataStore { +impl ProvisionedMetadataStore for EtcdMetadataStore { async fn get(&self, key: ByteString) -> Result, ReadError> { let mut client = self.client.kv_client(); let mut response = client.get(key.into_bytes(), None).await?; diff --git a/crates/core/src/metadata_store/providers/objstore/glue.rs b/crates/core/src/metadata_store/providers/objstore/glue.rs index 940840b17..dc76e9404 100644 --- a/crates/core/src/metadata_store/providers/objstore/glue.rs +++ b/crates/core/src/metadata_store/providers/objstore/glue.rs @@ -17,7 +17,9 @@ use tracing::warn; use crate::cancellation_watcher; use crate::metadata_store::providers::objstore::optimistic_store::OptimisticLockingMetadataStoreBuilder; -use crate::metadata_store::{MetadataStore, Precondition, ReadError, VersionedValue, WriteError}; +use crate::metadata_store::{ + Precondition, ProvisionedMetadataStore, ReadError, VersionedValue, WriteError, +}; #[derive(Debug)] pub(crate) enum Commands { @@ -111,7 +113,7 @@ impl Client { } #[async_trait::async_trait] -impl MetadataStore for Client { +impl ProvisionedMetadataStore for Client { async fn get(&self, key: ByteString) -> Result, ReadError> { let (tx, rx) = tokio::sync::oneshot::channel(); diff --git a/crates/core/src/metadata_store/test_util.rs b/crates/core/src/metadata_store/test_util.rs index 94f2b5d69..ef01c2872 100644 --- a/crates/core/src/metadata_store/test_util.rs +++ b/crates/core/src/metadata_store/test_util.rs @@ -8,7 +8,9 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use crate::metadata_store::{MetadataStore, Precondition, ReadError, VersionedValue, WriteError}; +use crate::metadata_store::{ + Precondition, ProvisionedMetadataStore, ReadError, VersionedValue, WriteError, +}; use bytestring::ByteString; use restate_types::Version; use std::collections::HashMap; @@ -47,7 +49,7 @@ impl InMemoryMetadataStore { } #[async_trait::async_trait] -impl MetadataStore for InMemoryMetadataStore { +impl ProvisionedMetadataStore for InMemoryMetadataStore { async fn get(&self, key: ByteString) -> Result, ReadError> { Ok(self.kv_pairs.lock().unwrap().get(&key).cloned()) } diff --git a/crates/core/src/network/protobuf.rs b/crates/core/src/network/protobuf.rs index bf64a3e30..011c7cbd4 100644 --- a/crates/core/src/network/protobuf.rs +++ b/crates/core/src/network/protobuf.rs @@ -9,10 +9,38 @@ // by the Apache License, Version 2.0. 
pub mod node_ctl_svc { + use restate_types::protobuf::cluster::ClusterConfiguration; + tonic::include_proto!("restate.node_ctl_svc"); pub const FILE_DESCRIPTOR_SET: &[u8] = tonic::include_file_descriptor_set!("node_ctl_svc_descriptor"); + + impl ProvisionClusterResponse { + pub fn dry_run(cluster_configuration: ClusterConfiguration) -> Self { + ProvisionClusterResponse { + kind: ProvisionClusterResponseKind::DryRun.into(), + cluster_configuration: Some(cluster_configuration), + ..Default::default() + } + } + + pub fn err(message: impl ToString) -> Self { + ProvisionClusterResponse { + kind: ProvisionClusterResponseKind::Error.into(), + error: Some(message.to_string()), + ..Default::default() + } + } + + pub fn success(cluster_configuration: ClusterConfiguration) -> Self { + ProvisionClusterResponse { + kind: ProvisionClusterResponseKind::Success.into(), + cluster_configuration: Some(cluster_configuration), + ..Default::default() + } + } + } } pub mod core_node_svc { diff --git a/crates/metadata-store/src/local/grpc/client.rs b/crates/metadata-store/src/local/grpc/client.rs index 4d5362281..7e52e9994 100644 --- a/crates/metadata-store/src/local/grpc/client.rs +++ b/crates/metadata-store/src/local/grpc/client.rs @@ -14,7 +14,7 @@ use tonic::transport::Channel; use tonic::{Code, Status}; use restate_core::metadata_store::{ - MetadataStore, Precondition, ReadError, VersionedValue, WriteError, + Precondition, ProvisionedMetadataStore, ReadError, VersionedValue, WriteError, }; use restate_core::network::net_util::create_tonic_channel_from_advertised_address; use restate_core::network::net_util::CommonClientConnectionOptions; @@ -45,7 +45,7 @@ impl LocalMetadataStoreClient { } #[async_trait] -impl MetadataStore for LocalMetadataStoreClient { +impl ProvisionedMetadataStore for LocalMetadataStoreClient { async fn get(&self, key: ByteString) -> Result, ReadError> { let response = self .svc_client diff --git a/crates/metadata-store/src/local/store.rs b/crates/metadata-store/src/local/store.rs index 4e40403ef..7b8877a75 100644 --- a/crates/metadata-store/src/local/store.rs +++ b/crates/metadata-store/src/local/store.rs @@ -30,7 +30,7 @@ use tracing::{debug, trace}; pub type RequestSender = mpsc::Sender; pub type RequestReceiver = mpsc::Receiver; -type Result = std::result::Result; +type Result = std::result::Result; const DB_NAME: &str = "local-metadata-store"; const KV_PAIRS: &str = "kv_pairs"; @@ -163,7 +163,7 @@ impl LocalMetadataStore { request = self.request_rx.recv() => { let request = request.expect("receiver should not be closed since we own one clone."); self.handle_request(request).await; - } + }, _ = cancellation_watcher() => { break; }, diff --git a/crates/node/Cargo.toml b/crates/node/Cargo.toml index 5583799ab..9881f52e6 100644 --- a/crates/node/Cargo.toml +++ b/crates/node/Cargo.toml @@ -38,6 +38,7 @@ arc-swap = { workspace = true } async-trait = { workspace = true } axum = { workspace = true } bytes = { workspace = true } +bytestring = { workspace = true } codederror = { workspace = true } datafusion = { workspace = true } derive_builder = { workspace = true } diff --git a/crates/node/src/init.rs b/crates/node/src/init.rs new file mode 100644 index 000000000..789d3c171 --- /dev/null +++ b/crates/node/src/init.rs @@ -0,0 +1,283 @@ +// Copyright (c) 2023 - 2024 Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. 
+// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use anyhow::bail; +use restate_core::metadata_store::{MetadataStoreClient, MetadataStoreClientError, ReadWriteError}; +use restate_core::{ + cancellation_watcher, Metadata, MetadataWriter, ShutdownError, SyncError, TargetVersion, +}; +use restate_types::config::{CommonOptions, Configuration}; +use restate_types::metadata_store::keys::NODES_CONFIG_KEY; +use restate_types::net::metadata::MetadataKind; +use restate_types::nodes_config::{LogServerConfig, NodeConfig, NodesConfiguration}; +use restate_types::retries::RetryPolicy; +use std::sync::Arc; +use std::time::{Duration, Instant}; +use tracing::{debug, info, trace, warn}; + +#[derive(Debug, thiserror::Error)] +enum JoinError { + #[error("missing nodes configuration")] + MissingNodesConfiguration, + #[error("detected a concurrent registration for node '{0}'")] + ConcurrentNodeRegistration(String), + #[error("failed writing to metadata store: {0}")] + MetadataStore(#[from] ReadWriteError), + #[error("trying to join wrong cluster; expected '{expected_cluster_name}', actual '{actual_cluster_name}'")] + ClusterMismatch { + expected_cluster_name: String, + actual_cluster_name: String, + }, +} + +pub struct NodeInit<'a> { + metadata_store_client: &'a MetadataStoreClient, + metadata_writer: &'a MetadataWriter, +} + +impl<'a> NodeInit<'a> { + pub fn new( + metadata_store_client: &'a MetadataStoreClient, + metadata_writer: &'a MetadataWriter, + ) -> Self { + Self { + metadata_store_client, + metadata_writer, + } + } + + pub async fn init(self) -> anyhow::Result<()> { + let config = Configuration::pinned().into_arc(); + + let join_cluster = Self::join_cluster(self.metadata_store_client, &config.common); + + let nodes_configuration = tokio::select! { + _ = cancellation_watcher() => { + return Err(ShutdownError.into()); + }, + result = join_cluster => { + let nodes_configuration = result?; + info!("Successfully joined cluster"); + nodes_configuration + }, + }; + + // Find my node in nodes configuration. + let my_node_config = nodes_configuration + .find_node_by_name(config.common.node_name()) + .expect("node config should have been upserted"); + + let my_node_id = my_node_config.current_generation; + + // Safety checks, same node (if set)? + if config + .common + .force_node_id + .is_some_and(|n| n != my_node_id.as_plain()) + { + bail!( + format!( + "Node ID mismatch: configured node ID is {}, but the nodes configuration contains {}", + config.common.force_node_id.unwrap(), + my_node_id.as_plain() + )); + } + + // Same cluster? 
+ if config.common.cluster_name() != nodes_configuration.cluster_name() { + bail!( + format!( + "Cluster name mismatch: configured cluster name is '{}', but the nodes configuration contains '{}'", + config.common.cluster_name(), + nodes_configuration.cluster_name() + )); + } + + info!( + roles = %my_node_config.roles, + address = %my_node_config.address, + "My Node ID is {}", my_node_config.current_generation); + + self.metadata_writer + .update(Arc::new(nodes_configuration)) + .await?; + + // My Node ID is set + self.metadata_writer.set_my_node_id(my_node_id); + restate_tracing_instrumentation::set_global_node_id(my_node_id); + + self.sync_metadata().await; + + Ok(()) + } + + async fn sync_metadata(&self) { + // fetch the latest metadata + let metadata = Metadata::current(); + + let config = Configuration::pinned(); + + let retry_policy = config.common.network_error_retry_policy.clone(); + + if let Err(err) = retry_policy + .retry_if( + || async { + metadata + .sync(MetadataKind::Schema, TargetVersion::Latest) + .await?; + metadata + .sync(MetadataKind::PartitionTable, TargetVersion::Latest) + .await?; + metadata + .sync(MetadataKind::Logs, TargetVersion::Latest) + .await + }, + |err| match err { + SyncError::MetadataStore(err) => err.is_network_error(), + SyncError::Shutdown(_) => false, + }, + ) + .await + { + warn!("Failed to fetch the latest metadata when initializing the node: {err}"); + } + } + + async fn join_cluster( + metadata_store_client: &MetadataStoreClient, + common_opts: &CommonOptions, + ) -> anyhow::Result { + info!("Trying to join cluster '{}'", common_opts.cluster_name()); + + // todo make configurable + // Never give up trying to join the cluster. Users of this struct will set a timeout if + // needed. + let join_retry = RetryPolicy::exponential( + Duration::from_millis(100), + 2.0, + None, + Some(Duration::from_secs(5)), + ); + + let join_start = Instant::now(); + let mut printed_provision_message = false; + + join_retry + .retry_if( + || Self::join_cluster_inner(metadata_store_client, common_opts), + |err| { + if join_start.elapsed() < Duration::from_secs(5) { + trace!("Failed joining the cluster: {err}; retrying"); + } else if !printed_provision_message { + info!("Couldn't join the cluster yet. Don't forget to provision it!"); + printed_provision_message = true; + } else { + debug!("Failed joining cluster: {err}; retrying"); + } + match err { + JoinError::MissingNodesConfiguration => true, + JoinError::ConcurrentNodeRegistration(_) => false, + JoinError::MetadataStore(err) => err.is_network_error(), + JoinError::ClusterMismatch { .. 
} => false, + } + }, + ) + .await + .map_err(Into::into) + } + + async fn join_cluster_inner( + metadata_store_client: &MetadataStoreClient, + common_opts: &CommonOptions, + ) -> Result { + let mut previous_node_generation = None; + + metadata_store_client + .read_modify_write::( + NODES_CONFIG_KEY.clone(), + move |nodes_config| { + let mut nodes_config = + nodes_config.ok_or(JoinError::MissingNodesConfiguration)?; + + // check that we are joining the right cluster + if nodes_config.cluster_name() != common_opts.cluster_name() { + return Err(JoinError::ClusterMismatch { + expected_cluster_name: common_opts.cluster_name().to_owned(), + actual_cluster_name: nodes_config.cluster_name().to_owned(), + }); + } + + // check whether we have registered before + let node_config = nodes_config + .find_node_by_name(common_opts.node_name()) + .cloned(); + + let my_node_config = if let Some(mut node_config) = node_config { + assert_eq!( + common_opts.node_name(), + node_config.name, + "node name must match" + ); + + if let Some(previous_node_generation) = previous_node_generation { + if node_config + .current_generation + .is_newer_than(previous_node_generation) + { + // detected a concurrent registration of the same node + return Err(JoinError::ConcurrentNodeRegistration( + common_opts.node_name().to_owned(), + )); + } + } else { + // remember the previous node generation to detect concurrent modifications + previous_node_generation = Some(node_config.current_generation); + } + + // update node_config + node_config.roles = common_opts.roles; + node_config.address = common_opts.advertised_address.clone(); + node_config.current_generation.bump_generation(); + + node_config + } else { + let plain_node_id = common_opts.force_node_id.unwrap_or_else(|| { + nodes_config + .max_plain_node_id() + .map(|n| n.next()) + .unwrap_or_default() + }); + + assert!( + nodes_config.find_node_by_id(plain_node_id).is_err(), + "duplicate plain node id '{plain_node_id}'" + ); + + let my_node_id = plain_node_id.with_generation(1); + + NodeConfig::new( + common_opts.node_name().to_owned(), + my_node_id, + common_opts.advertised_address.clone(), + common_opts.roles, + LogServerConfig::default(), + ) + }; + + nodes_config.upsert_node(my_node_config); + nodes_config.increment_version(); + + Ok(nodes_config) + }, + ) + .await + .map_err(|err| err.transpose()) + } +} diff --git a/crates/node/src/lib.rs b/crates/node/src/lib.rs index 42f3b868a..f21016af6 100644 --- a/crates/node/src/lib.rs +++ b/crates/node/src/lib.rs @@ -9,46 +9,47 @@ // by the Apache License, Version 2.0. 
mod cluster_marker; +mod init; mod network_server; mod roles; -use std::sync::Arc; - -use tracing::{debug, error, info, trace}; +use anyhow::Context; +use tonic::{Code, IntoRequest}; +use tracing::{debug, error, info, trace, warn}; +use crate::cluster_marker::ClusterValidationError; +use crate::init::NodeInit; +use crate::network_server::NetworkServer; +use crate::roles::{AdminRole, BaseRole, IngressRole, WorkerRole}; use codederror::CodedError; use restate_bifrost::BifrostService; -use restate_core::metadata_store::{retry_on_network_error, ReadWriteError}; +use restate_core::metadata_store::ReadWriteError; +use restate_core::network::net_util::create_tonic_channel_from_advertised_address; +use restate_core::network::protobuf::node_ctl_svc::node_ctl_svc_client::NodeCtlSvcClient; +use restate_core::network::protobuf::node_ctl_svc::{ + ProvisionClusterRequest, ProvisionClusterResponseKind, +}; use restate_core::network::{ GrpcConnector, MessageRouterBuilder, NetworkServerBuilder, Networking, }; use restate_core::partitions::{spawn_partition_routing_refresher, PartitionRoutingRefresher}; -use restate_core::{cancellation_watcher, TaskKind}; -use restate_core::{ - spawn_metadata_manager, MetadataBuilder, MetadataKind, MetadataManager, TargetVersion, - TaskCenter, -}; +use restate_core::{cancellation_watcher, Metadata, TaskKind}; +use restate_core::{spawn_metadata_manager, MetadataBuilder, MetadataManager, TaskCenter}; #[cfg(feature = "replicated-loglet")] use restate_log_server::LogServerService; use restate_metadata_store::local::LocalMetadataStoreService; use restate_metadata_store::MetadataStoreClient; -use restate_types::config::{CommonOptions, Configuration}; +use restate_types::config::Configuration; use restate_types::errors::GenericError; use restate_types::health::Health; use restate_types::live::Live; #[cfg(feature = "replicated-loglet")] use restate_types::logs::RecordCache; -use restate_types::metadata_store::keys::NODES_CONFIG_KEY; -use restate_types::nodes_config::{LogServerConfig, NodeConfig, NodesConfiguration, Role}; +use restate_types::nodes_config::Role; use restate_types::protobuf::common::{ AdminStatus, IngressStatus, LogServerStatus, MetadataServerStatus, NodeRpcStatus, NodeStatus, WorkerStatus, }; -use restate_types::Version; - -use crate::cluster_marker::ClusterValidationError; -use crate::network_server::NetworkServer; -use crate::roles::{AdminRole, BaseRole, IngressRole, WorkerRole}; #[derive(Debug, thiserror::Error, CodedError)] pub enum Error { @@ -331,86 +332,101 @@ impl Node { let health = self.health.clone(); let common_options = config.common.clone(); let connection_manager = self.networking.connection_manager().clone(); + let metadata_store_client = self.metadata_store_client.clone(); async move { NetworkServer::run( health, connection_manager, self.server_builder, common_options, + metadata_store_client, ) .await?; Ok(()) } })?; + // wait until the node rpc server is up and running before continuing + self.health + .node_rpc_status() + .wait_for_value(NodeRpcStatus::Ready) + .await; + if let Some(metadata_store) = self.metadata_store_role { - TaskCenter::spawn( - TaskKind::MetadataStore, - "local-metadata-store", - async move { - metadata_store.run().await?; - Ok(()) - }, - )?; + TaskCenter::spawn(TaskKind::MetadataStore, "metadata-store", async move { + metadata_store.run().await?; + Ok(()) + })?; } - // Start partition routing information refresher - spawn_partition_routing_refresher(self.partition_routing_refresher)?; - - let nodes_config = - 
Self::upsert_node_config(&self.metadata_store_client, &config.common).await?; - metadata_writer.update(Arc::new(nodes_config)).await?; - if config.common.allow_bootstrap { - // todo write bootstrap state + TaskCenter::spawn(TaskKind::SystemBoot, "auto-provision-cluster", { + let channel = create_tonic_channel_from_advertised_address( + config.common.advertised_address.clone(), + &config.networking, + ); + let client = NodeCtlSvcClient::new(channel); + let retry_policy = config.common.network_error_retry_policy.clone(); + async move { + let response = retry_policy + .retry_if( + || { + let mut client = client.clone(); + async move { + client + .provision_cluster( + ProvisionClusterRequest { + dry_run: false, + ..Default::default() + } + .into_request(), + ) + .await + } + }, + |status| status.code() == Code::Unavailable, + ) + .await; + + match response { + Ok(response) => { + if response.into_inner().kind() == ProvisionClusterResponseKind::DryRun + { + debug!("The cluster is already provisioned."); + } + } + Err(err) => { + warn!("Failed to auto provision the cluster. In order to continue you have to provision the cluster manually: {err}"); + } + } + + Ok(()) + } + })?; } - // fetch the latest schema information - metadata - .sync(MetadataKind::Schema, TargetVersion::Latest) - .await?; + let initialization_timeout = config.common.initialization_timeout.into(); - let nodes_config = metadata.nodes_config_ref(); + tokio::time::timeout( + initialization_timeout, + NodeInit::new( + &self.metadata_store_client, + &metadata_writer, + ) + .init(), + ) + .await + .context("Giving up trying to initialize the node. Make sure that it can reach the metadata store and don't forget to provision the cluster on a fresh start.")? + .context("Failed initializing the node.")?; - // Find my node in nodes configuration. + let nodes_config = Metadata::with_current(|m| m.nodes_config_ref()); + let my_node_id = Metadata::with_current(|m| m.my_node_id()); let my_node_config = nodes_config - .find_node_by_name(config.common.node_name()) - .expect("node config should have been upserted"); - - let my_node_id = my_node_config.current_generation; - - // Safety checks, same node (if set)? - if config - .common - .force_node_id - .is_some_and(|n| n != my_node_id.as_plain()) - { - return Err(Error::SafetyCheck( - format!( - "Node ID mismatch: configured node ID is {}, but the nodes configuration contains {}", - config.common.force_node_id.unwrap(), - my_node_id.as_plain() - )))?; - } - - // Same cluster? - if config.common.cluster_name() != nodes_config.cluster_name() { - return Err(Error::SafetyCheck( - format!( - "Cluster name mismatch: configured cluster name is '{}', but the nodes configuration contains '{}'", - config.common.cluster_name(), - nodes_config.cluster_name() - )))?; - } - - // My Node ID is set - metadata_writer.set_my_node_id(my_node_id); - restate_tracing_instrumentation::set_global_node_id(my_node_id); + .find_node_by_id(my_node_id) + .expect("should be present"); - info!( - roles = %my_node_config.roles, - address = %my_node_config.address, - "My Node ID is {}", my_node_config.current_generation); + // Start partition routing information refresher + spawn_partition_routing_refresher(self.partition_routing_refresher)?; // todo this is a temporary solution to announce the updated NodesConfiguration to the // configured admin nodes. 
It should be removed once we have a gossip-based node status @@ -522,92 +538,6 @@ impl Node { Ok(()) } - async fn upsert_node_config( - metadata_store_client: &MetadataStoreClient, - common_opts: &CommonOptions, - ) -> Result { - retry_on_network_error(common_opts.network_error_retry_policy.clone(), || { - let mut previous_node_generation = None; - metadata_store_client.read_modify_write(NODES_CONFIG_KEY.clone(), move |nodes_config| { - let mut nodes_config = if common_opts.allow_bootstrap { - debug!("allow-bootstrap is set to `true`, allowed to create initial NodesConfiguration!"); - nodes_config.unwrap_or_else(|| { - NodesConfiguration::new( - Version::INVALID, - common_opts.cluster_name().to_owned(), - ) - }) - } else { - nodes_config.ok_or(Error::MissingNodesConfiguration)? - }; - - // check whether we have registered before - let node_config = nodes_config - .find_node_by_name(common_opts.node_name()) - .cloned(); - - let my_node_config = if let Some(mut node_config) = node_config { - assert_eq!( - common_opts.node_name(), - node_config.name, - "node name must match" - ); - - if let Some(previous_node_generation) = previous_node_generation { - if node_config - .current_generation - .is_newer_than(previous_node_generation) - { - // detected a concurrent registration of the same node - return Err(Error::ConcurrentNodeRegistration( - common_opts.node_name().to_owned(), - )); - } - } else { - // remember the previous node generation to detect concurrent modifications - previous_node_generation = Some(node_config.current_generation); - } - - // update node_config - node_config.roles = common_opts.roles; - node_config.address = common_opts.advertised_address.clone(); - node_config.current_generation.bump_generation(); - - node_config - } else { - let plain_node_id = common_opts.force_node_id.unwrap_or_else(|| { - nodes_config - .max_plain_node_id() - .map(|n| n.next()) - .unwrap_or_default() - }); - - assert!( - nodes_config.find_node_by_id(plain_node_id).is_err(), - "duplicate plain node id '{plain_node_id}'" - ); - - let my_node_id = plain_node_id.with_generation(1); - - NodeConfig::new( - common_opts.node_name().to_owned(), - my_node_id, - common_opts.advertised_address.clone(), - common_opts.roles, - LogServerConfig::default(), - ) - }; - - nodes_config.upsert_node(my_node_config); - nodes_config.increment_version(); - - Ok(nodes_config) - }) - }) - .await - .map_err(|err| err.transpose()) - } - pub fn bifrost(&self) -> restate_bifrost::Bifrost { self.bifrost.handle() } diff --git a/crates/node/src/network_server/grpc_svc_handler.rs b/crates/node/src/network_server/grpc_svc_handler.rs index 0ec5b7bc4..dcc0a8040 100644 --- a/crates/node/src/network_server/grpc_svc_handler.rs +++ b/crates/node/src/network_server/grpc_svc_handler.rs @@ -8,31 +8,45 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. 
+use anyhow::Context; use bytes::BytesMut; +use bytestring::ByteString; use enumset::EnumSet; use futures::stream::BoxStream; -use tokio_stream::StreamExt; -use tonic::{Request, Response, Status, Streaming}; - +use restate_core::metadata_store::{ + retry_on_network_error, MetadataStoreClient, Precondition, WriteError, +}; use restate_core::network::protobuf::core_node_svc::core_node_svc_server::CoreNodeSvc; use restate_core::network::protobuf::node_ctl_svc::node_ctl_svc_server::NodeCtlSvc; use restate_core::network::protobuf::node_ctl_svc::{ - GetMetadataRequest, GetMetadataResponse, IdentResponse, + GetMetadataRequest, GetMetadataResponse, IdentResponse, ProvisionClusterRequest, + ProvisionClusterResponse, ProvisionClusterResponseKind, }; use restate_core::network::ConnectionManager; use restate_core::network::{ProtocolError, TransportConnect}; use restate_core::task_center::TaskCenterMonitoring; use restate_core::{task_center, Metadata, MetadataKind, TargetVersion}; +use restate_types::config::{CommonOptions, Configuration}; use restate_types::health::Health; -use restate_types::nodes_config::Role; +use restate_types::logs::metadata::{DefaultProvider, Logs, LogsConfiguration}; +use restate_types::metadata_store::keys::{BIFROST_CONFIG_KEY, PARTITION_TABLE_KEY}; +use restate_types::nodes_config::{LogServerConfig, NodeConfig, NodesConfiguration, Role}; +use restate_types::partition_table::{PartitionTable, PartitionTableBuilder, ReplicationStrategy}; +use restate_types::protobuf::cluster::ClusterConfiguration as ProtoClusterConfiguration; use restate_types::protobuf::node::Message; -use restate_types::storage::StorageCodec; +use restate_types::storage::{StorageCodec, StorageEncode}; +use restate_types::{GenerationalNodeId, Version, Versioned}; +use std::num::NonZeroU16; +use tokio_stream::StreamExt; +use tonic::{Request, Response, Status, Streaming}; +use tracing::warn; pub struct NodeCtlSvcHandler { task_center: task_center::Handle, cluster_name: String, roles: EnumSet, health: Health, + metadata_store_client: MetadataStoreClient, } impl NodeCtlSvcHandler { @@ -41,14 +55,153 @@ impl NodeCtlSvcHandler { cluster_name: String, roles: EnumSet, health: Health, + metadata_store_client: MetadataStoreClient, ) -> Self { Self { task_center, cluster_name, roles, health, + metadata_store_client, + } + } + + async fn provision_metadata( + &self, + common_opts: &CommonOptions, + cluster_configuration: &ClusterConfiguration, + ) -> anyhow::Result { + let (initial_nodes_configuration, initial_partition_table, initial_logs) = + Self::generate_initial_metadata(common_opts, cluster_configuration); + + let result = retry_on_network_error(common_opts.network_error_retry_policy.clone(), || { + self.metadata_store_client + .provision(&initial_nodes_configuration) + }) + .await?; + + retry_on_network_error(common_opts.network_error_retry_policy.clone(), || { + self.write_initial_value_dont_fail_if_it_exists( + PARTITION_TABLE_KEY.clone(), + &initial_partition_table, + ) + }) + .await + .context("failed provisioning the initial partition table")?; + + retry_on_network_error(common_opts.network_error_retry_policy.clone(), || { + self.write_initial_value_dont_fail_if_it_exists( + BIFROST_CONFIG_KEY.clone(), + &initial_logs, + ) + }) + .await + .context("failed provisioning the initial logs configuration")?; + + Ok(result) + } + + pub fn create_initial_nodes_configuration(common_opts: &CommonOptions) -> NodesConfiguration { + let mut initial_nodes_configuration = + NodesConfiguration::new(Version::MIN, 
common_opts.cluster_name().to_owned()); + let node_config = NodeConfig::new( + common_opts.node_name().to_owned(), + common_opts + .force_node_id + .map(|force_node_id| force_node_id.with_generation(1)) + .unwrap_or(GenerationalNodeId::INITIAL_NODE_ID), + common_opts.advertised_address.clone(), + common_opts.roles, + LogServerConfig::default(), + ); + initial_nodes_configuration.upsert_node(node_config); + initial_nodes_configuration + } + + fn generate_initial_metadata( + common_opts: &CommonOptions, + cluster_configuration: &ClusterConfiguration, + ) -> (NodesConfiguration, PartitionTable, Logs) { + let mut initial_partition_table_builder = PartitionTableBuilder::default(); + initial_partition_table_builder + .with_equally_sized_partitions(cluster_configuration.num_partitions.get()) + .expect("Empty partition table should not have conflicts"); + initial_partition_table_builder + .set_replication_strategy(cluster_configuration.placement_strategy); + let initial_partition_table = initial_partition_table_builder.build(); + + let mut logs_builder = Logs::default().into_builder(); + logs_builder.set_configuration(LogsConfiguration::from( + cluster_configuration.log_provider.clone(), + )); + let initial_logs = logs_builder.build(); + + let initial_nodes_configuration = Self::create_initial_nodes_configuration(common_opts); + + ( + initial_nodes_configuration, + initial_partition_table, + initial_logs, + ) + } + + async fn write_initial_value_dont_fail_if_it_exists( + &self, + key: ByteString, + initial_value: &T, + ) -> Result<(), WriteError> { + match self + .metadata_store_client + .put(key, initial_value, Precondition::DoesNotExist) + .await + { + Ok(_) => Ok(()), + Err(WriteError::FailedPrecondition(_)) => { + // we might have failed on a previous attempt after writing this value; so let's continue + Ok(()) + } + Err(err) => Err(err), } } + + fn resolve_cluster_configuration( + config: &Configuration, + request: ProvisionClusterRequest, + ) -> anyhow::Result { + let num_partitions = request + .num_partitions + .map(|num_partitions| { + u16::try_from(num_partitions) + .context("Restate only supports running up to 65535 partitions.") + .and_then(|num_partitions| { + NonZeroU16::try_from(num_partitions) + .context("The number of partitions needs to be > 0") + }) + }) + .transpose()? + .unwrap_or(config.common.bootstrap_num_partitions); + let placement_strategy = request + .placement_strategy + .map(ReplicationStrategy::try_from) + .transpose()? 
+ .unwrap_or_default(); + let log_provider = request + .log_provider + .map(DefaultProvider::try_from) + .unwrap_or_else(|| Ok(DefaultProvider::from_configuration(config)))?; + + Ok(ClusterConfiguration { + num_partitions, + placement_strategy, + log_provider, + }) + } + + fn convert_cluster_configuration( + _cluster_configuration: ClusterConfiguration, + ) -> ProtoClusterConfiguration { + todo!() + } } #[async_trait::async_trait] @@ -114,6 +267,45 @@ impl NodeCtlSvc for NodeCtlSvcHandler { encoded: encoded.freeze(), })) } + + async fn provision_cluster( + &self, + request: Request, + ) -> Result, Status> { + let request = request.into_inner(); + let config = Configuration::pinned(); + + let dry_run = request.dry_run; + let cluster_configuration = Self::resolve_cluster_configuration(&config, request) + .map_err(|err| Status::invalid_argument(err.to_string()))?; + + if dry_run { + return Ok(Response::new(ProvisionClusterResponse { + kind: ProvisionClusterResponseKind::DryRun.into(), + cluster_configuration: Some(Self::convert_cluster_configuration( + cluster_configuration, + )), + ..Default::default() + })); + } + + self.provision_metadata(&config.common, &cluster_configuration) + .await + .map_err(|err| Status::internal(err.to_string()))?; + + Ok(Response::new(ProvisionClusterResponse { + kind: ProvisionClusterResponseKind::Success.into(), + cluster_configuration: Some(Self::convert_cluster_configuration(cluster_configuration)), + ..Default::default() + })) + } +} + +#[derive(Clone, Debug)] +struct ClusterConfiguration { + num_partitions: NonZeroU16, + placement_strategy: ReplicationStrategy, + log_provider: DefaultProvider, } pub struct CoreNodeSvcHandler { diff --git a/crates/node/src/network_server/service.rs b/crates/node/src/network_server/service.rs index 64ae8b330..5e208a572 100644 --- a/crates/node/src/network_server/service.rs +++ b/crates/node/src/network_server/service.rs @@ -15,8 +15,7 @@ use tokio::time::MissedTickBehavior; use tonic::codec::CompressionEncoding; use tracing::{debug, trace}; -use crate::network_server::metrics::{install_global_prometheus_recorder, render_metrics}; -use crate::network_server::state::NodeCtrlHandlerStateBuilder; +use restate_core::metadata_store::MetadataStoreClient; use restate_core::network::protobuf::core_node_svc::core_node_svc_server::CoreNodeSvcServer; use restate_core::network::protobuf::node_ctl_svc::node_ctl_svc_server::NodeCtlSvcServer; use restate_core::network::tonic_service_filter::{TonicServiceFilter, WaitForReady}; @@ -28,6 +27,8 @@ use restate_types::protobuf::common::NodeStatus; use super::grpc_svc_handler::{CoreNodeSvcHandler, NodeCtlSvcHandler}; use super::pprof; +use crate::network_server::metrics::{install_global_prometheus_recorder, render_metrics}; +use crate::network_server::state::NodeCtrlHandlerStateBuilder; pub struct NetworkServer {} @@ -37,6 +38,7 @@ impl NetworkServer { connection_manager: ConnectionManager, mut server_builder: NetworkServerBuilder, options: CommonOptions, + metadata_store_client: MetadataStoreClient, ) -> Result<(), anyhow::Error> { // Configure Metric Exporter let mut state_builder = NodeCtrlHandlerStateBuilder::default(); @@ -103,6 +105,7 @@ impl NetworkServer { options.cluster_name().to_owned(), options.roles, health, + metadata_store_client, )) .accept_compressed(CompressionEncoding::Gzip) .send_compressed(CompressionEncoding::Gzip), diff --git a/crates/types/src/config/common.rs b/crates/types/src/config/common.rs index 178c05d1c..5b65b8913 100644 --- a/crates/types/src/config/common.rs +++ 
b/crates/types/src/config/common.rs @@ -235,6 +235,13 @@ pub struct CommonOptions { /// /// The retry policy for node network error pub network_error_retry_policy: RetryPolicy, + + /// # Initialization timeout + /// + /// The timeout until the node gives up joining a cluster and initializing itself. + #[serde(with = "serde_with::As::")] + #[cfg_attr(feature = "schemars", schemars(with = "String"))] + pub initialization_timeout: humantime::Duration, } impl CommonOptions { @@ -374,6 +381,7 @@ impl Default for CommonOptions { Some(15), Some(Duration::from_secs(5)), ), + initialization_timeout: Duration::from_secs(5 * 60).into(), } } } diff --git a/crates/types/src/logs/metadata.rs b/crates/types/src/logs/metadata.rs index 1668910b5..6a0289b51 100644 --- a/crates/types/src/logs/metadata.rs +++ b/crates/types/src/logs/metadata.rs @@ -211,6 +211,20 @@ impl DefaultProvider { Self::Replicated(_) => ProviderKind::Replicated, } } + + pub fn from_configuration(configuration: &Configuration) -> Self { + match configuration.bifrost.default_provider { + #[cfg(any(test, feature = "memory-loglet"))] + ProviderKind::InMemory => DefaultProvider::InMemory, + ProviderKind::Local => DefaultProvider::Local, + ProviderKind::Replicated => DefaultProvider::Replicated(ReplicatedLogletConfig { + nodeset_selection_strategy: NodeSetSelectionStrategy::default(), + replication_property: ReplicationProperty::new( + NonZeroU8::new(1).expect("1 is not zero"), + ), + }), + } + } } impl From for crate::protobuf::cluster::DefaultProvider { @@ -270,7 +284,9 @@ pub struct ReplicatedLogletConfig { pub replication_property: ReplicationProperty, } -#[derive(Debug, Clone, Eq, PartialEq, Default, serde::Serialize, serde::Deserialize)] +#[derive( + Debug, Clone, Eq, PartialEq, Default, derive_more::From, serde::Serialize, serde::Deserialize, +)] pub struct LogsConfiguration { pub default_provider: DefaultProvider, } @@ -493,17 +509,7 @@ impl LogletConfig { impl Logs { pub fn from_configuration(config: &Configuration) -> Self { Self::with_logs_configuration(LogsConfiguration { - default_provider: match config.bifrost.default_provider { - #[cfg(any(test, feature = "memory-loglet"))] - ProviderKind::InMemory => DefaultProvider::InMemory, - ProviderKind::Local => DefaultProvider::Local, - ProviderKind::Replicated => DefaultProvider::Replicated(ReplicatedLogletConfig { - nodeset_selection_strategy: NodeSetSelectionStrategy::default(), - replication_property: ReplicationProperty::new( - NonZeroU8::new(1).expect("1 is not zero"), - ), - }), - }, + default_provider: DefaultProvider::from_configuration(config), }) } diff --git a/crates/types/src/node_id.rs b/crates/types/src/node_id.rs index 1c9a394be..88de877e3 100644 --- a/crates/types/src/node_id.rs +++ b/crates/types/src/node_id.rs @@ -85,6 +85,9 @@ impl FromStr for GenerationalNodeId { } impl GenerationalNodeId { + // Start with 1 as id to leave 0 as a special value in the future + pub const INITIAL_NODE_ID: GenerationalNodeId = GenerationalNodeId::new(1, 1); + pub fn decode(mut data: B) -> Self { // generational node id is stored as two u32s next to each other, each in big-endian. let plain_id = data.get_u32(); diff --git a/server/tests/cluster.rs b/server/tests/cluster.rs index 0503b8d5a..503b529cc 100644 --- a/server/tests/cluster.rs +++ b/server/tests/cluster.rs @@ -113,7 +113,7 @@ async fn cluster_name_mismatch() -> googletest::Result<()> { .await?; assert!(mismatch_node - .lines("Cluster name mismatch".parse()?) + .lines("trying to join wrong cluster".parse()?) 
         .next()
         .await
         .is_some());