From 656e68388f9e389da8b1d8d0452a16a18dcd256c Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Thu, 12 Dec 2024 14:08:58 +0100 Subject: [PATCH] Add cluster provision functionality After starting the metdata store service and the grpc server, the node will try to initialize itself by joining an existing cluster or provisioning the a new cluster. Either of the two actions will return the NodeId of the node. Moreover, the initialization procedure makes sure that the latest metadata is fetched from the metadata store. The cluster provisioning will ensure that all relevant metadata has been written to the metadata store. This includes the initial NodesConfiguration, a PartitionTable that includes the number of partitions and an empty Logs that contains the default log provider configuration. This fixes #2409. --- Cargo.lock | 1 + .../src/cluster_controller/logs_controller.rs | 35 +- .../admin/src/cluster_controller/service.rs | 32 +- .../src/cluster_controller/service/state.rs | 6 +- crates/bifrost/src/service.rs | 8 +- crates/core/build.rs | 1 + crates/core/protobuf/node_ctl_svc.proto | 25 + crates/core/src/network/protobuf.rs | 28 + crates/metadata-store/src/local/mod.rs | 1 + crates/metadata-store/src/local/service.rs | 6 +- crates/metadata-store/src/local/store.rs | 208 +++++- crates/node/Cargo.toml | 1 + crates/node/src/init.rs | 651 ++++++++++++++++++ crates/node/src/lib.rs | 286 ++++---- .../src/network_server/grpc_svc_handler.rs | 33 + crates/node/src/network_server/service.rs | 7 +- crates/types/src/config/common.rs | 14 + crates/types/src/logs/metadata.rs | 30 +- crates/types/src/node_id.rs | 3 + server/tests/cluster.rs | 2 +- 20 files changed, 1133 insertions(+), 245 deletions(-) create mode 100644 crates/node/src/init.rs diff --git a/Cargo.lock b/Cargo.lock index 720d0df46..203dbf849 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6564,6 +6564,7 @@ dependencies = [ "async-trait", "axum", "bytes", + "bytestring", "codederror", "datafusion", "derive_builder", diff --git a/crates/admin/src/cluster_controller/logs_controller.rs b/crates/admin/src/cluster_controller/logs_controller.rs index 226fef1af..9999a4ff1 100644 --- a/crates/admin/src/cluster_controller/logs_controller.rs +++ b/crates/admin/src/cluster_controller/logs_controller.rs @@ -24,11 +24,8 @@ use tracing::{debug, error, trace, trace_span, Instrument}; use xxhash_rust::xxh3::Xxh3Builder; use restate_bifrost::{Bifrost, BifrostAdmin, Error as BifrostError}; -use restate_core::metadata_store::{ - retry_on_network_error, MetadataStoreClient, Precondition, ReadWriteError, WriteError, -}; +use restate_core::metadata_store::{MetadataStoreClient, Precondition, ReadWriteError, WriteError}; use restate_core::{Metadata, MetadataWriter, ShutdownError, TaskCenterFutureExt}; -use restate_types::config::Configuration; use restate_types::errors::GenericError; use restate_types::identifiers::PartitionId; use restate_types::live::Pinned; @@ -642,9 +639,9 @@ struct LogsControllerInner { } impl LogsControllerInner { - fn new(configuration: LogsConfiguration, retry_policy: RetryPolicy) -> Self { + fn new(current_logs: Arc, retry_policy: RetryPolicy) -> Self { Self { - current_logs: Arc::new(Logs::with_logs_configuration(configuration)), + current_logs, logs_state: HashMap::with_hasher(Xxh3Builder::default()), logs_write_in_progress: None, retry_policy, @@ -928,26 +925,11 @@ pub struct LogsController { } impl LogsController { - pub async fn init( - configuration: &Configuration, + pub fn new( bifrost: Bifrost, metadata_store_client: MetadataStoreClient, metadata_writer: MetadataWriter, - ) -> Result { - // obtain the latest logs or init it with an empty logs variant - let logs = retry_on_network_error( - configuration.common.network_error_retry_policy.clone(), - || { - metadata_store_client.get_or_insert(BIFROST_CONFIG_KEY.clone(), || { - Logs::from_configuration(configuration) - }) - }, - ) - .await?; - - let logs_configuration = logs.configuration().clone(); - metadata_writer.update(Arc::new(logs)).await?; - + ) -> Self { //todo(azmy): make configurable let retry_policy = RetryPolicy::exponential( Duration::from_millis(10), @@ -958,7 +940,10 @@ impl LogsController { let mut this = Self { effects: Some(Vec::new()), - inner: LogsControllerInner::new(logs_configuration, retry_policy), + inner: LogsControllerInner::new( + Metadata::with_current(|m| m.logs_snapshot()), + retry_policy, + ), bifrost, metadata_store_client, metadata_writer, @@ -967,7 +952,7 @@ impl LogsController { }; this.find_logs_tail(); - Ok(this) + this } pub fn find_logs_tail(&mut self) { diff --git a/crates/admin/src/cluster_controller/service.rs b/crates/admin/src/cluster_controller/service.rs index a7e393196..19efc8d20 100644 --- a/crates/admin/src/cluster_controller/service.rs +++ b/crates/admin/src/cluster_controller/service.rs @@ -36,7 +36,7 @@ use tonic::codec::CompressionEncoding; use tracing::{debug, info}; use restate_bifrost::{Bifrost, BifrostAdmin, SealedSegment}; -use restate_core::metadata_store::{retry_on_network_error, MetadataStoreClient}; +use restate_core::metadata_store::MetadataStoreClient; use restate_core::network::rpc_router::RpcRouter; use restate_core::network::tonic_service_filter::{TonicServiceFilter, WaitForReady}; use restate_core::network::{ @@ -295,8 +295,6 @@ impl Service { } pub async fn run(mut self) -> anyhow::Result<()> { - self.init_partition_table().await?; - let mut config_watcher = Configuration::watcher(); let mut cluster_state_watcher = self.cluster_state_refresher.cluster_state_watcher(); @@ -352,34 +350,6 @@ impl Service { } } - /// creates partition table iff it does not exist - async fn init_partition_table(&mut self) -> anyhow::Result<()> { - let configuration = self.configuration.live_load(); - - let partition_table = retry_on_network_error( - configuration.common.network_error_retry_policy.clone(), - || { - self.metadata_store_client - .get_or_insert(PARTITION_TABLE_KEY.clone(), || { - let partition_table = PartitionTable::with_equally_sized_partitions( - Version::MIN, - configuration.common.bootstrap_num_partitions.get(), - ); - - debug!("Initializing the partition table with '{partition_table:?}'"); - - partition_table - }) - }, - ) - .await?; - - self.metadata_writer - .update(Arc::new(partition_table)) - .await?; - - Ok(()) - } /// Triggers a snapshot creation for the given partition by issuing an RPC /// to the node hosting the active leader. async fn create_partition_snapshot( diff --git a/crates/admin/src/cluster_controller/service/state.rs b/crates/admin/src/cluster_controller/service/state.rs index 8b5548839..4392744ca 100644 --- a/crates/admin/src/cluster_controller/service/state.rs +++ b/crates/admin/src/cluster_controller/service/state.rs @@ -161,13 +161,11 @@ where ) .await?; - let logs_controller = LogsController::init( - &configuration, + let logs_controller = LogsController::new( service.bifrost.clone(), service.metadata_store_client.clone(), service.metadata_writer.clone(), - ) - .await?; + ); let (log_trim_interval, log_trim_threshold) = create_log_trim_interval(&configuration.admin); diff --git a/crates/bifrost/src/service.rs b/crates/bifrost/src/service.rs index 3222cc5c2..54dc75168 100644 --- a/crates/bifrost/src/service.rs +++ b/crates/bifrost/src/service.rs @@ -78,10 +78,10 @@ impl BifrostService { pub fn handle(&self) -> Bifrost { self.bifrost.clone() } - /// Runs initialization phase, then returns a handle to join on shutdown. - /// In this phase the system should wait until this is completed before - /// continuing. For instance, a worker mark itself as `STARTING_UP` and not - /// accept any requests until this is completed. + + /// Runs initialization phase. In this phase the system should wait until this is completed + /// before continuing. For instance, a worker mark itself as `STARTING_UP` and not accept any + /// requests until this is completed. /// /// This requires to run within a task_center context. pub async fn start(self) -> anyhow::Result<()> { diff --git a/crates/core/build.rs b/crates/core/build.rs index 135e37a1d..9904963e6 100644 --- a/crates/core/build.rs +++ b/crates/core/build.rs @@ -21,6 +21,7 @@ fn main() -> Result<(), Box> { .protoc_arg("--experimental_allow_proto3_optional") .extern_path(".restate.node", "::restate_types::protobuf::node") .extern_path(".restate.common", "::restate_types::protobuf::common") + .extern_path(".restate.cluster", "::restate_types::protobuf::cluster") .compile_protos( &["./protobuf/node_ctl_svc.proto"], &["protobuf", "../types/protobuf"], diff --git a/crates/core/protobuf/node_ctl_svc.proto b/crates/core/protobuf/node_ctl_svc.proto index 74350bc4a..673d9e4dc 100644 --- a/crates/core/protobuf/node_ctl_svc.proto +++ b/crates/core/protobuf/node_ctl_svc.proto @@ -10,6 +10,7 @@ syntax = "proto3"; import "google/protobuf/empty.proto"; +import "restate/cluster.proto"; import "restate/common.proto"; import "restate/node.proto"; @@ -20,6 +21,30 @@ service NodeCtlSvc { rpc GetIdent(google.protobuf.Empty) returns (IdentResponse); rpc GetMetadata(GetMetadataRequest) returns (GetMetadataResponse); + + // Provision the Restate cluster on this node. + rpc ProvisionCluster(ProvisionClusterRequest) returns (ProvisionClusterResponse); +} + +message ProvisionClusterRequest { + bool dry_run = 1; + optional uint32 num_partitions = 2; + optional restate.cluster.ReplicationStrategy placement_strategy = 3; + optional restate.cluster.DefaultProvider log_provider = 4; +} + +enum ProvisionClusterResponseKind { + ProvisionClusterResponseType_UNKNOWN = 0; + ERROR = 1; + SUCCESS = 2; + DRY_RUN = 3; +} + +message ProvisionClusterResponse { + ProvisionClusterResponseKind kind = 1; + // If there is an error, this field will be set. All other fields will be empty. + optional string error = 2; + optional restate.cluster.ClusterConfiguration cluster_configuration = 3; } message IdentResponse { diff --git a/crates/core/src/network/protobuf.rs b/crates/core/src/network/protobuf.rs index bf64a3e30..011c7cbd4 100644 --- a/crates/core/src/network/protobuf.rs +++ b/crates/core/src/network/protobuf.rs @@ -9,10 +9,38 @@ // by the Apache License, Version 2.0. pub mod node_ctl_svc { + use restate_types::protobuf::cluster::ClusterConfiguration; + tonic::include_proto!("restate.node_ctl_svc"); pub const FILE_DESCRIPTOR_SET: &[u8] = tonic::include_file_descriptor_set!("node_ctl_svc_descriptor"); + + impl ProvisionClusterResponse { + pub fn dry_run(cluster_configuration: ClusterConfiguration) -> Self { + ProvisionClusterResponse { + kind: ProvisionClusterResponseKind::DryRun.into(), + cluster_configuration: Some(cluster_configuration), + ..Default::default() + } + } + + pub fn err(message: impl ToString) -> Self { + ProvisionClusterResponse { + kind: ProvisionClusterResponseKind::Error.into(), + error: Some(message.to_string()), + ..Default::default() + } + } + + pub fn success(cluster_configuration: ClusterConfiguration) -> Self { + ProvisionClusterResponse { + kind: ProvisionClusterResponseKind::Success.into(), + cluster_configuration: Some(cluster_configuration), + ..Default::default() + } + } + } } pub mod core_node_svc { diff --git a/crates/metadata-store/src/local/mod.rs b/crates/metadata-store/src/local/mod.rs index 7fd52fd42..14221b120 100644 --- a/crates/metadata-store/src/local/mod.rs +++ b/crates/metadata-store/src/local/mod.rs @@ -22,6 +22,7 @@ use restate_types::{ use crate::local::grpc::client::LocalMetadataStoreClient; pub use service::{BuildError, Error, LocalMetadataStoreService}; +pub use store::{ProvisionError, ProvisionHandle}; /// Creates a [`MetadataStoreClient`]. pub async fn create_client( diff --git a/crates/metadata-store/src/local/service.rs b/crates/metadata-store/src/local/service.rs index 39b464133..d28899371 100644 --- a/crates/metadata-store/src/local/service.rs +++ b/crates/metadata-store/src/local/service.rs @@ -20,7 +20,7 @@ use restate_types::protobuf::common::MetadataServerStatus; use crate::grpc_svc; use crate::grpc_svc::metadata_store_svc_server::MetadataStoreSvcServer; use crate::local::grpc::handler::LocalMetadataStoreHandler; -use crate::local::store::LocalMetadataStore; +use crate::local::store::{LocalMetadataStore, ProvisionHandle}; pub struct LocalMetadataStoreService { health_status: HealthStatus, @@ -63,6 +63,10 @@ impl LocalMetadataStoreService { }) } + pub fn provision_handle(&self) -> ProvisionHandle { + self.store.provision_handle() + } + pub async fn run(self) -> Result<(), Error> { let LocalMetadataStoreService { health_status, diff --git a/crates/metadata-store/src/local/store.rs b/crates/metadata-store/src/local/store.rs index 4e40403ef..f6418a3ce 100644 --- a/crates/metadata-store/src/local/store.rs +++ b/crates/metadata-store/src/local/store.rs @@ -10,18 +10,25 @@ use bytes::BytesMut; use bytestring::ByteString; -use restate_core::cancellation_watcher; use restate_core::metadata_store::{Precondition, VersionedValue}; +use restate_core::{cancellation_watcher, ShutdownError}; use restate_rocksdb::{ CfName, CfPrefixPattern, DbName, DbSpecBuilder, IoMode, Priority, RocksDb, RocksDbManager, RocksError, }; use restate_types::config::{MetadataStoreOptions, RocksDbOptions}; +use restate_types::errors::GenericError; use restate_types::live::BoxedLiveLoad; +use restate_types::logs::metadata::Logs; +use restate_types::metadata_store::keys::{ + BIFROST_CONFIG_KEY, NODES_CONFIG_KEY, PARTITION_TABLE_KEY, +}; +use restate_types::nodes_config::NodesConfiguration; +use restate_types::partition_table::PartitionTable; use restate_types::storage::{ StorageCodec, StorageDecode, StorageDecodeError, StorageEncode, StorageEncodeError, }; -use restate_types::Version; +use restate_types::{Version, Versioned}; use rocksdb::{BoundColumnFamily, DBCompressionType, WriteBatch, WriteOptions, DB}; use std::sync::Arc; use tokio::sync::{mpsc, oneshot}; @@ -30,7 +37,10 @@ use tracing::{debug, trace}; pub type RequestSender = mpsc::Sender; pub type RequestReceiver = mpsc::Receiver; -type Result = std::result::Result; +pub type ProvisionSender = mpsc::Sender; +pub type ProvisionReceiver = mpsc::Receiver; + +type Result = std::result::Result; const DB_NAME: &str = "local-metadata-store"; const KV_PAIRS: &str = "kv_pairs"; @@ -95,10 +105,12 @@ pub struct LocalMetadataStore { rocksdb: Arc, rocksdb_options: BoxedLiveLoad, request_rx: RequestReceiver, + provision_rx: ProvisionReceiver, buffer: BytesMut, // for creating other senders request_tx: RequestSender, + provision_tx: ProvisionSender, } impl LocalMetadataStore { @@ -107,6 +119,7 @@ impl LocalMetadataStore { updateable_rocksdb_options: BoxedLiveLoad, ) -> std::result::Result { let (request_tx, request_rx) = mpsc::channel(options.request_queue_length()); + let (provision_tx, provision_rx) = mpsc::channel(1); let db_name = DbName::new(DB_NAME); let db_manager = RocksDbManager::get(); @@ -134,6 +147,8 @@ impl LocalMetadataStore { buffer: BytesMut::default(), request_rx, request_tx, + provision_tx, + provision_rx, }) } @@ -155,6 +170,12 @@ impl LocalMetadataStore { self.request_tx.clone() } + pub fn provision_handle(&self) -> ProvisionHandle { + ProvisionHandle { + tx: self.provision_tx.clone(), + } + } + pub async fn run(mut self) { debug!("Running LocalMetadataStore"); @@ -163,7 +184,11 @@ impl LocalMetadataStore { request = self.request_rx.recv() => { let request = request.expect("receiver should not be closed since we own one clone."); self.handle_request(request).await; - } + }, + provision_request = self.provision_rx.recv() => { + let provision_request = provision_request.expect("receiver should not be closed since we own one clone."); + self.handle_provision_request(provision_request).await; + }, _ = cancellation_watcher() => { break; }, @@ -179,6 +204,111 @@ impl LocalMetadataStore { .expect("KV_PAIRS column family exists") } + async fn handle_provision_request(&mut self, provision_request: ProvisionMetadataStoreRequest) { + trace!("Handle provision request"); + let (initial_nodes_configuration, initial_partition_table, initial_logs, response_tx) = + provision_request.into_inner(); + + match self + .handle_provision_request_inner( + initial_nodes_configuration, + initial_partition_table, + initial_logs, + ) + .await + { + Ok(()) => { + // if the receiver is gone, then the caller is no longer interested + let _ = response_tx.send(Ok(())); + } + Err(err) => { + debug!("Failed processing provision request: {err}"); + // if the receiver is gone, then the caller is no longer interested + let _ = response_tx.send(Err(err)); + } + } + } + + async fn handle_provision_request_inner( + &mut self, + initial_nodes_configuration: NodesConfiguration, + initial_partition_table: PartitionTable, + initial_logs: Logs, + ) -> Result<(), ProvisionError> { + // 1. check whether we are already provisioned by checking for the nodes configuration + if self + .get(&NODES_CONFIG_KEY) + .map_err(ProvisionError::internal)? + .is_some() + { + return Err(ProvisionError::AlreadyProvisioned); + } + + let serialized_partition_table = self + .serialize_versioned_value(&initial_partition_table) + .map_err(ProvisionError::internal)?; + let serialized_logs = self + .serialize_versioned_value(&initial_logs) + .map_err(ProvisionError::internal)?; + let serialized_nodes_configuration = self + .serialize_versioned_value(&initial_nodes_configuration) + .map_err(ProvisionError::internal)?; + + // 2. write the initial metadata + self.put_initial_metadata( + &serialized_partition_table, + &serialized_logs, + &serialized_nodes_configuration, + ) + .await + .map_err(ProvisionError::internal)?; + + Ok(()) + } + + async fn put_initial_metadata( + &mut self, + serialized_partition_table: &VersionedValue, + serialized_logs: &VersionedValue, + serialized_nodes_configuration: &VersionedValue, + ) -> Result<()> { + // todo replace with multi put once supported + + // It's very important that we put the different metadata values under their correct keys. + // Otherwise, client's won't find these values. + + // It can happen that a provision attempt failed before. In this case, we might have + // written the partition table or logs before. Therefore, we allow to overwrite them + // here because we know that the provisioning was not successful (no nodes configuration). + self.put( + &PARTITION_TABLE_KEY, + serialized_partition_table, + Precondition::None, + ) + .await?; + self.put(&BIFROST_CONFIG_KEY, serialized_logs, Precondition::None) + .await?; + + // Extra safety to not allow the nodes configuration to be overwritten even though we + // checked before that it does not exist. + self.put( + &NODES_CONFIG_KEY, + serialized_nodes_configuration, + Precondition::DoesNotExist, + ) + .await?; + + Ok(()) + } + + fn serialize_versioned_value( + &mut self, + value: &T, + ) -> Result { + let serialized_value = StorageCodec::encode_and_split(value, &mut self.buffer)?.freeze(); + Ok(VersionedValue::new(value.version(), serialized_value)) + } + async fn handle_request(&mut self, request: MetadataStoreRequest) { trace!("Handle request '{:?}'", request); @@ -340,6 +470,76 @@ impl LocalMetadataStore { } } +#[derive(Debug, thiserror::Error)] +pub enum ProvisionError { + #[error(transparent)] + Shutdown(#[from] ShutdownError), + #[error(transparent)] + Internal(GenericError), + #[error("already provisioned")] + AlreadyProvisioned, +} + +impl ProvisionError { + fn internal(err: impl Into) -> Self { + ProvisionError::Internal(err.into()) + } +} + +pub struct ProvisionMetadataStoreRequest { + initial_nodes_configuration: NodesConfiguration, + initial_partition_table: PartitionTable, + initial_logs: Logs, + response_tx: oneshot::Sender>, +} + +impl ProvisionMetadataStoreRequest { + fn into_inner( + self, + ) -> ( + NodesConfiguration, + PartitionTable, + Logs, + oneshot::Sender>, + ) { + ( + self.initial_nodes_configuration, + self.initial_partition_table, + self.initial_logs, + self.response_tx, + ) + } +} + +pub struct ProvisionHandle { + tx: mpsc::Sender, +} + +impl ProvisionHandle { + pub async fn provision( + &self, + initial_nodes_configuration: NodesConfiguration, + initial_partition_table: PartitionTable, + initial_logs: Logs, + ) -> Result<(), ProvisionError> { + let (response_tx, response_rx) = oneshot::channel(); + + self.tx + .send(ProvisionMetadataStoreRequest { + initial_partition_table, + initial_logs, + initial_nodes_configuration, + response_tx, + }) + .await + .map_err(|_| ProvisionError::Shutdown(ShutdownError))?; + + response_rx + .await + .map_err(|_| ProvisionError::Shutdown(ShutdownError))? + } +} + fn db_options(_options: &MetadataStoreOptions) -> rocksdb::Options { rocksdb::Options::default() } diff --git a/crates/node/Cargo.toml b/crates/node/Cargo.toml index 5583799ab..9881f52e6 100644 --- a/crates/node/Cargo.toml +++ b/crates/node/Cargo.toml @@ -38,6 +38,7 @@ arc-swap = { workspace = true } async-trait = { workspace = true } axum = { workspace = true } bytes = { workspace = true } +bytestring = { workspace = true } codederror = { workspace = true } datafusion = { workspace = true } derive_builder = { workspace = true } diff --git a/crates/node/src/init.rs b/crates/node/src/init.rs new file mode 100644 index 000000000..48a31dfb5 --- /dev/null +++ b/crates/node/src/init.rs @@ -0,0 +1,651 @@ +// Copyright (c) 2023 - 2024 Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use crate::ProvisionCluster; +use anyhow::bail; +use bytestring::ByteString; +use futures::future::OptionFuture; +use futures::TryFutureExt; +use restate_core::metadata_store::{ + retry_on_network_error, MetadataStoreClient, MetadataStoreClientError, Precondition, + ReadWriteError, WriteError, +}; +use restate_core::{ + cancellation_watcher, Metadata, MetadataWriter, ShutdownError, SyncError, TargetVersion, + TaskCenter, TaskKind, +}; +use restate_metadata_store::local::ProvisionHandle; +use restate_types::config::{CommonOptions, Configuration}; +use restate_types::errors::GenericError; +use restate_types::logs::metadata::{DefaultProvider, Logs, LogsConfiguration}; +use restate_types::metadata_store::keys::{ + BIFROST_CONFIG_KEY, NODES_CONFIG_KEY, PARTITION_TABLE_KEY, +}; +use restate_types::net::metadata::MetadataKind; +use restate_types::nodes_config::{LogServerConfig, NodeConfig, NodesConfiguration}; +use restate_types::partition_table::{PartitionTable, PartitionTableBuilder, ReplicationStrategy}; +use restate_types::retries::RetryPolicy; +use restate_types::storage::StorageEncode; +use restate_types::{GenerationalNodeId, Version, Versioned}; +use std::num::NonZeroU16; +use std::sync::Arc; +use std::time::{Duration, Instant}; +use tokio::sync::mpsc; +use tracing::{debug, info, trace, warn}; + +#[derive(Debug, thiserror::Error)] +enum JoinError { + #[error("missing nodes configuration")] + MissingNodesConfiguration, + #[error("detected a concurrent registration for node '{0}'")] + ConcurrentNodeRegistration(String), + #[error("failed writing to metadata store: {0}")] + MetadataStore(#[from] ReadWriteError), + #[error("trying to join wrong cluster; expected '{expected_cluster_name}', actual '{actual_cluster_name}'")] + ClusterMismatch { + expected_cluster_name: String, + actual_cluster_name: String, + }, +} + +#[derive(Debug, thiserror::Error)] +enum ProvisionMetadataError { + #[error("already provisioned")] + AlreadyProvisioned, + #[error(transparent)] + Other(GenericError), + #[error(transparent)] + Shutdown(ShutdownError), +} + +impl ProvisionMetadataError { + fn other(err: impl Into) -> Self { + ProvisionMetadataError::Other(err.into()) + } +} + +impl From for ProvisionMetadataError { + fn from(value: restate_metadata_store::local::ProvisionError) -> Self { + match value { + restate_metadata_store::local::ProvisionError::AlreadyProvisioned => { + ProvisionMetadataError::AlreadyProvisioned + } + err => ProvisionMetadataError::Other(err.into()), + } + } +} + +#[derive(Clone, Debug)] +pub struct ClusterConfiguration { + num_partitions: NonZeroU16, + placement_strategy: ReplicationStrategy, + log_provider: DefaultProvider, +} + +#[derive(Clone, Debug, Default)] +pub struct ProvisionClusterRequest { + dry_run: bool, + num_partitions: Option, + placement_strategy: Option, + log_provider: Option, +} + +#[derive(Clone, Debug)] +pub enum ProvisionClusterResponse { + DryRun(ClusterConfiguration), + NewlyProvisioned(ClusterConfiguration), + AlreadyProvisioned, +} + +enum HandleProvisionClusterResponse { + DryRun { + cluster_configuration: ClusterConfiguration, + }, + Provisioned { + cluster_configuration: ClusterConfiguration, + nodes_configuration: NodesConfiguration, + }, + AlreadyProvisioned, +} + +pub struct NodeInit<'a> { + provision_cluster_rx: Option>, + // We are using an in-memory channel for provisioning a co-located metadata store to ensure + // that it uses the local NodesConfiguration as initial value. + metadata_store_provision_handle: Option, + metadata_store_client: &'a MetadataStoreClient, + metadata_writer: &'a MetadataWriter, +} + +impl<'a> NodeInit<'a> { + pub fn new( + provision_cluster_rx: mpsc::Receiver, + metadata_store_provision_handle: Option, + metadata_store_client: &'a MetadataStoreClient, + metadata_writer: &'a MetadataWriter, + ) -> Self { + Self { + provision_cluster_rx: Some(provision_cluster_rx), + metadata_store_provision_handle, + metadata_store_client, + metadata_writer, + } + } + + pub async fn init(mut self) -> anyhow::Result<()> { + let config = Configuration::pinned().into_arc(); + + let mut join_future = std::pin::pin!(Self::join_cluster( + self.metadata_store_client, + &config.common + )); + let mut cancellation = std::pin::pin!(cancellation_watcher()); + + let nodes_configuration = loop { + tokio::select! { + _ = &mut cancellation => { + Err(ShutdownError)?; + }, + result = &mut join_future => { + let nodes_configuration = result?; + info!("Successfully joined cluster"); + break nodes_configuration; + }, + Some(Some(provision_cluster)) = OptionFuture::from(self.provision_cluster_rx.as_mut().map(|rx| async { rx.recv().await })) => { + let (request, response_tx) = provision_cluster.into_inner(); + + match self.handle_provision_request(request, &config).await { + Ok(response) => { + match response { + HandleProvisionClusterResponse::DryRun{ cluster_configuration } => { + // if receiver is no longer present, then caller is not interested in result + let _ = response_tx.send(Ok(ProvisionClusterResponse::DryRun(cluster_configuration))); + } + HandleProvisionClusterResponse::Provisioned{ cluster_configuration, nodes_configuration } => { + info!("Successfully provisioned cluster with {cluster_configuration:?}"); + // if receiver is no longer present, then caller is not interested in result + let _ = response_tx.send(Ok(ProvisionClusterResponse::NewlyProvisioned(cluster_configuration))); + break nodes_configuration; + } + HandleProvisionClusterResponse::AlreadyProvisioned => { + // if receiver is no longer present, then caller is not interested in result + let _ = response_tx.send(Ok(ProvisionClusterResponse::AlreadyProvisioned)); + self.mark_as_provisioned() + } + } + }, + Err(err) => { + debug!("Failed processing cluster provision command: {err}"); + let _ = response_tx.send(Err(err)); + } + } + }, + } + }; + + // Find my node in nodes configuration. + let my_node_config = nodes_configuration + .find_node_by_name(config.common.node_name()) + .expect("node config should have been upserted"); + + let my_node_id = my_node_config.current_generation; + + // Safety checks, same node (if set)? + if config + .common + .force_node_id + .is_some_and(|n| n != my_node_id.as_plain()) + { + bail!( + format!( + "Node ID mismatch: configured node ID is {}, but the nodes configuration contains {}", + config.common.force_node_id.unwrap(), + my_node_id.as_plain() + )); + } + + // Same cluster? + if config.common.cluster_name() != nodes_configuration.cluster_name() { + bail!( + format!( + "Cluster name mismatch: configured cluster name is '{}', but the nodes configuration contains '{}'", + config.common.cluster_name(), + nodes_configuration.cluster_name() + )); + } + + info!( + roles = %my_node_config.roles, + address = %my_node_config.address, + "My Node ID is {}", my_node_config.current_generation); + + self.metadata_writer + .update(Arc::new(nodes_configuration)) + .await?; + + // My Node ID is set + self.metadata_writer.set_my_node_id(my_node_id); + restate_tracing_instrumentation::set_global_node_id(my_node_id); + + self.mark_as_provisioned(); + + self.sync_metadata().await; + + Ok(()) + } + + async fn sync_metadata(&self) { + // fetch the latest metadata + let metadata = Metadata::current(); + + let config = Configuration::pinned(); + + let retry_policy = config.common.network_error_retry_policy.clone(); + + if let Err(err) = retry_policy + .retry_if( + || async { + metadata + .sync(MetadataKind::Schema, TargetVersion::Latest) + .await?; + metadata + .sync(MetadataKind::PartitionTable, TargetVersion::Latest) + .await?; + metadata + .sync(MetadataKind::Logs, TargetVersion::Latest) + .await + }, + |err| match err { + SyncError::MetadataStore(err) => err.is_network_error(), + SyncError::Shutdown(_) => false, + }, + ) + .await + { + warn!("Failed to fetch the latest metadata when initializing the node: {err}"); + } + } + + fn mark_as_provisioned(&mut self) { + if let Some(mut provision_cluster_rx) = self.provision_cluster_rx.take() { + // ignore if we are shutting down + let _ = TaskCenter::spawn_child( + TaskKind::Background, + "provision-cluster-responder", + async move { + let mut cancellation = std::pin::pin!(cancellation_watcher()); + + loop { + tokio::select! { + _ = &mut cancellation => { + break; + }, + Some(provision_cluster_request) = provision_cluster_rx.recv() => { + let (_, response_tx) = provision_cluster_request.into_inner(); + // if the receiver is gone then the caller is not interested in the result + let _ = response_tx.send(Ok(ProvisionClusterResponse::AlreadyProvisioned)); + } + } + } + + Ok(()) + }, + ); + } + } + + async fn handle_provision_request( + &self, + request: ProvisionClusterRequest, + config: &Configuration, + ) -> anyhow::Result { + trace!("Handle provision request: {request:?}"); + + if config + .common + .metadata_store_client + .metadata_store_client + .is_embedded() + && self.metadata_store_provision_handle.is_none() + { + // todo forward internally to a node running the metadata store? + bail!("Cannot provision cluster with embedded metadata store that is not running on this node. Please provision the cluster on a node that runs the metadata store."); + } + + let dry_run = request.dry_run; + + let cluster_configuration = Self::resolve_cluster_configuration(config, request); + + if dry_run { + return Ok(HandleProvisionClusterResponse::DryRun { + cluster_configuration, + }); + } + + let nodes_configuration = match self + .provision_metadata(&config.common, &cluster_configuration) + .await + { + Ok(nodes_configuration) => nodes_configuration, + Err(ProvisionMetadataError::AlreadyProvisioned) => { + return Ok(HandleProvisionClusterResponse::AlreadyProvisioned); + } + err => err?, + }; + + Ok(HandleProvisionClusterResponse::Provisioned { + nodes_configuration, + cluster_configuration, + }) + } + + async fn provision_metadata( + &self, + common_opts: &CommonOptions, + cluster_configuration: &ClusterConfiguration, + ) -> Result { + let (initial_nodes_configuration, initial_partition_table, initial_logs) = + Self::generate_initial_metadata(common_opts, cluster_configuration); + + if let Some(metadata_store_handle) = self.metadata_store_provision_handle.as_ref() { + metadata_store_handle + .provision( + initial_nodes_configuration.clone(), + initial_partition_table, + initial_logs, + ) + .await?; + Ok(initial_nodes_configuration) + } else { + // we must be running with an external metadata store + self.provision_external_metadata( + &initial_nodes_configuration, + &initial_partition_table, + &initial_logs, + ) + .await?; + Ok(initial_nodes_configuration) + } + } + + fn resolve_cluster_configuration( + config: &Configuration, + request: ProvisionClusterRequest, + ) -> ClusterConfiguration { + let num_partitions = request + .num_partitions + .unwrap_or(config.common.bootstrap_num_partitions); + let placement_strategy = request.placement_strategy.unwrap_or_default(); + let log_provider = request + .log_provider + .unwrap_or_else(|| DefaultProvider::from_configuration(config)); + + ClusterConfiguration { + num_partitions, + placement_strategy, + log_provider, + } + } + + fn generate_initial_metadata( + common_opts: &CommonOptions, + cluster_configuration: &ClusterConfiguration, + ) -> (NodesConfiguration, PartitionTable, Logs) { + let mut initial_partition_table_builder = PartitionTableBuilder::default(); + initial_partition_table_builder + .with_equally_sized_partitions(cluster_configuration.num_partitions.get()) + .expect("Empty partition table should not have conflicts"); + initial_partition_table_builder + .set_replication_strategy(cluster_configuration.placement_strategy); + let initial_partition_table = initial_partition_table_builder.build(); + + let mut logs_builder = Logs::default().into_builder(); + logs_builder.set_configuration(LogsConfiguration::from( + cluster_configuration.log_provider.clone(), + )); + let initial_logs = logs_builder.build(); + + let mut initial_nodes_configuration = + NodesConfiguration::new(Version::MIN, common_opts.cluster_name().to_owned()); + let node_config = NodeConfig::new( + common_opts.node_name().to_owned(), + common_opts + .force_node_id + .map(|force_node_id| force_node_id.with_generation(1)) + .unwrap_or(GenerationalNodeId::INITIAL_NODE_ID), + common_opts.advertised_address.clone(), + common_opts.roles, + LogServerConfig::default(), + ); + initial_nodes_configuration.upsert_node(node_config); + + ( + initial_nodes_configuration, + initial_partition_table, + initial_logs, + ) + } + + async fn join_cluster( + metadata_store_client: &MetadataStoreClient, + common_opts: &CommonOptions, + ) -> anyhow::Result { + info!( + "Trying to join an existing cluster '{}'", + common_opts.cluster_name() + ); + + // todo make configurable + // Never give up trying to join the cluster. Users of this struct will set a timeout if + // needed. + let join_retry = RetryPolicy::exponential( + Duration::from_millis(100), + 2.0, + None, + Some(Duration::from_secs(5)), + ); + + let join_start = Instant::now(); + + join_retry + .retry_if( + || Self::join_cluster_inner(metadata_store_client, common_opts), + |err| { + if join_start.elapsed() < Duration::from_secs(5) { + trace!("Failed joining the cluster: {err}; retrying"); + } else { + debug!("Failed joining cluster: {err}; retrying"); + } + match err { + JoinError::MissingNodesConfiguration => true, + JoinError::ConcurrentNodeRegistration(_) => false, + JoinError::MetadataStore(err) => err.is_network_error(), + JoinError::ClusterMismatch { .. } => false, + } + }, + ) + .await + .map_err(Into::into) + } + + async fn join_cluster_inner( + metadata_store_client: &MetadataStoreClient, + common_opts: &CommonOptions, + ) -> Result { + let mut previous_node_generation = None; + + metadata_store_client + .read_modify_write::( + NODES_CONFIG_KEY.clone(), + move |nodes_config| { + let mut nodes_config = + nodes_config.ok_or(JoinError::MissingNodesConfiguration)?; + + // check that we are joining the right cluster + if nodes_config.cluster_name() != common_opts.cluster_name() { + return Err(JoinError::ClusterMismatch { + expected_cluster_name: common_opts.cluster_name().to_owned(), + actual_cluster_name: nodes_config.cluster_name().to_owned(), + }); + } + + // check whether we have registered before + let node_config = nodes_config + .find_node_by_name(common_opts.node_name()) + .cloned(); + + let my_node_config = if let Some(mut node_config) = node_config { + assert_eq!( + common_opts.node_name(), + node_config.name, + "node name must match" + ); + + if let Some(previous_node_generation) = previous_node_generation { + if node_config + .current_generation + .is_newer_than(previous_node_generation) + { + // detected a concurrent registration of the same node + return Err(JoinError::ConcurrentNodeRegistration( + common_opts.node_name().to_owned(), + )); + } + } else { + // remember the previous node generation to detect concurrent modifications + previous_node_generation = Some(node_config.current_generation); + } + + // update node_config + node_config.roles = common_opts.roles; + node_config.address = common_opts.advertised_address.clone(); + node_config.current_generation.bump_generation(); + + node_config + } else { + let plain_node_id = common_opts.force_node_id.unwrap_or_else(|| { + nodes_config + .max_plain_node_id() + .map(|n| n.next()) + .unwrap_or_default() + }); + + assert!( + nodes_config.find_node_by_id(plain_node_id).is_err(), + "duplicate plain node id '{plain_node_id}'" + ); + + let my_node_id = plain_node_id.with_generation(1); + + NodeConfig::new( + common_opts.node_name().to_owned(), + my_node_id, + common_opts.advertised_address.clone(), + common_opts.roles, + LogServerConfig::default(), + ) + }; + + nodes_config.upsert_node(my_node_config); + nodes_config.increment_version(); + + Ok(nodes_config) + }, + ) + .await + .map_err(|err| err.transpose()) + } + + async fn provision_external_metadata( + &self, + initial_nodes_configuration: &NodesConfiguration, + initial_partition_table: &PartitionTable, + initial_logs: &Logs, + ) -> Result<(), ProvisionMetadataError> { + let retry_policy = Configuration::pinned() + .common + .network_error_retry_policy + .clone(); + + let write_metadata = std::pin::pin!(retry_on_network_error(retry_policy, || async { + // todo replace with multi put once supported + self.write_initial_partition_table(initial_partition_table) + .await?; + self.write_initial_logs(initial_logs).await?; + self.write_initial_nodes_configuration(initial_nodes_configuration) + .await + }) + .map_err(|err| match err { + WriteError::FailedPrecondition(_) => ProvisionMetadataError::AlreadyProvisioned, + err => ProvisionMetadataError::other(err), + })); + + tokio::select! { + result = write_metadata => result, + _ = cancellation_watcher() => { + Err(ProvisionMetadataError::Shutdown(ShutdownError)) + } + } + } + + async fn write_initial_partition_table( + &self, + initial_partition_table: &PartitionTable, + ) -> Result<(), WriteError> { + // Don't overwrite the value if it exists because we might already be provisioned + Self::write_initial_value_dont_fail_if_it_exists( + self.metadata_store_client, + PARTITION_TABLE_KEY.clone(), + initial_partition_table, + ) + .await + } + + async fn write_initial_logs(&self, initial_logs: &Logs) -> Result<(), WriteError> { + // Don't overwrite the value if it exists because we might already be provisioned + Self::write_initial_value_dont_fail_if_it_exists( + self.metadata_store_client, + BIFROST_CONFIG_KEY.clone(), + initial_logs, + ) + .await + } + + async fn write_initial_value_dont_fail_if_it_exists( + metadata_store_client: &MetadataStoreClient, + key: ByteString, + initial_value: &T, + ) -> Result<(), WriteError> { + match metadata_store_client + .put(key, initial_value, Precondition::DoesNotExist) + .await + { + Ok(_) => Ok(()), + Err(WriteError::FailedPrecondition(_)) => { + // we might have failed on a previous attempt after writing this value; so let's continue + Ok(()) + } + Err(err) => Err(err), + } + } + + async fn write_initial_nodes_configuration( + &self, + initial_nodes_configuration: &NodesConfiguration, + ) -> Result<(), WriteError> { + self.metadata_store_client + .put( + NODES_CONFIG_KEY.clone(), + initial_nodes_configuration, + Precondition::DoesNotExist, + ) + .await + } +} diff --git a/crates/node/src/lib.rs b/crates/node/src/lib.rs index 42f3b868a..b31fbb2c6 100644 --- a/crates/node/src/lib.rs +++ b/crates/node/src/lib.rs @@ -9,44 +9,41 @@ // by the Apache License, Version 2.0. mod cluster_marker; +mod init; mod network_server; mod roles; -use std::sync::Arc; - -use tracing::{debug, error, info, trace}; +use anyhow::Context; +use tokio::sync::{mpsc, oneshot}; +use tracing::{debug, error, info, trace, warn}; use codederror::CodedError; use restate_bifrost::BifrostService; -use restate_core::metadata_store::{retry_on_network_error, ReadWriteError}; +use restate_core::metadata_store::ReadWriteError; use restate_core::network::{ GrpcConnector, MessageRouterBuilder, NetworkServerBuilder, Networking, }; use restate_core::partitions::{spawn_partition_routing_refresher, PartitionRoutingRefresher}; -use restate_core::{cancellation_watcher, TaskKind}; -use restate_core::{ - spawn_metadata_manager, MetadataBuilder, MetadataKind, MetadataManager, TargetVersion, - TaskCenter, -}; +use restate_core::{cancellation_watcher, Metadata, ShutdownError, TaskKind}; +use restate_core::{spawn_metadata_manager, MetadataBuilder, MetadataManager, TaskCenter}; #[cfg(feature = "replicated-loglet")] use restate_log_server::LogServerService; use restate_metadata_store::local::LocalMetadataStoreService; use restate_metadata_store::MetadataStoreClient; -use restate_types::config::{CommonOptions, Configuration}; +use restate_types::config::Configuration; use restate_types::errors::GenericError; use restate_types::health::Health; use restate_types::live::Live; #[cfg(feature = "replicated-loglet")] use restate_types::logs::RecordCache; -use restate_types::metadata_store::keys::NODES_CONFIG_KEY; -use restate_types::nodes_config::{LogServerConfig, NodeConfig, NodesConfiguration, Role}; +use restate_types::nodes_config::Role; use restate_types::protobuf::common::{ AdminStatus, IngressStatus, LogServerStatus, MetadataServerStatus, NodeRpcStatus, NodeStatus, WorkerStatus, }; -use restate_types::Version; use crate::cluster_marker::ClusterValidationError; +use crate::init::{NodeInit, ProvisionClusterRequest, ProvisionClusterResponse}; use crate::network_server::NetworkServer; use crate::roles::{AdminRole, BaseRole, IngressRole, WorkerRole}; @@ -326,6 +323,38 @@ impl Node { // Start metadata manager spawn_metadata_manager(self.metadata_manager)?; + let (provision_cluster_handle, cmd_rx) = ProvisionClusterHandle::new(); + + if config.common.allow_bootstrap { + TaskCenter::spawn(TaskKind::SystemBoot, "auto-provision-cluster", { + let provision_cluster_handle = provision_cluster_handle.clone(); + async move { + let result = provision_cluster_handle + .provision_cluster(ProvisionClusterRequest::default()) + .await?; + + match result { + Ok(response) => match response { + ProvisionClusterResponse::DryRun(_) => { + panic!("auto provision should try to provision the cluster w/o a dry run."); + } + ProvisionClusterResponse::NewlyProvisioned(_) => { + // Nothing to do. Newly provisioned clusters are already logged by Init + } + ProvisionClusterResponse::AlreadyProvisioned => { + debug!("The cluster was already provisioned."); + } + }, + Err(err) => { + warn!("Failed to auto provision the cluster. In order to continue you have to provision the cluster manually: {err}"); + } + } + + Ok(()) + } + })?; + } + // spawn the node rpc server first to enable connecting to the metadata store TaskCenter::spawn(TaskKind::RpcServer, "node-rpc-server", { let health = self.health.clone(); @@ -337,80 +366,55 @@ impl Node { connection_manager, self.server_builder, common_options, + provision_cluster_handle, ) .await?; Ok(()) } })?; - if let Some(metadata_store) = self.metadata_store_role { - TaskCenter::spawn( - TaskKind::MetadataStore, - "local-metadata-store", - async move { - metadata_store.run().await?; - Ok(()) - }, - )?; - } + // wait until the node rpc server is up and running before continuing + self.health + .node_rpc_status() + .wait_for_value(NodeRpcStatus::Ready) + .await; - // Start partition routing information refresher - spawn_partition_routing_refresher(self.partition_routing_refresher)?; - - let nodes_config = - Self::upsert_node_config(&self.metadata_store_client, &config.common).await?; - metadata_writer.update(Arc::new(nodes_config)).await?; - - if config.common.allow_bootstrap { - // todo write bootstrap state - } + let metadata_store_provision_handle = if let Some(metadata_store) = self.metadata_store_role + { + let handle = metadata_store.provision_handle(); + TaskCenter::spawn(TaskKind::MetadataStore, "metadata-store", async move { + metadata_store.run().await?; + Ok(()) + })?; + Some(handle) + } else { + None + }; - // fetch the latest schema information - metadata - .sync(MetadataKind::Schema, TargetVersion::Latest) - .await?; + let initialization_timeout = config.common.initialization_timeout.into(); - let nodes_config = metadata.nodes_config_ref(); + tokio::time::timeout( + initialization_timeout, + NodeInit::new( + cmd_rx, + metadata_store_provision_handle, + &self.metadata_store_client, + &metadata_writer, + ) + .init(), + ) + .await + .context("Giving up trying to initialize the node. Make sure that it can reach the metadata store and don't forget to provision the cluster on a fresh start.")? + .context("Failed initializing the node.")?; - // Find my node in nodes configuration. + let nodes_config = Metadata::with_current(|m| m.nodes_config_ref()); + let my_node_id = Metadata::with_current(|m| m.my_node_id()); let my_node_config = nodes_config - .find_node_by_name(config.common.node_name()) - .expect("node config should have been upserted"); - - let my_node_id = my_node_config.current_generation; - - // Safety checks, same node (if set)? - if config - .common - .force_node_id - .is_some_and(|n| n != my_node_id.as_plain()) - { - return Err(Error::SafetyCheck( - format!( - "Node ID mismatch: configured node ID is {}, but the nodes configuration contains {}", - config.common.force_node_id.unwrap(), - my_node_id.as_plain() - )))?; - } - - // Same cluster? - if config.common.cluster_name() != nodes_config.cluster_name() { - return Err(Error::SafetyCheck( - format!( - "Cluster name mismatch: configured cluster name is '{}', but the nodes configuration contains '{}'", - config.common.cluster_name(), - nodes_config.cluster_name() - )))?; - } + .find_node_by_id(my_node_id) + .expect("should be present"); - // My Node ID is set - metadata_writer.set_my_node_id(my_node_id); - restate_tracing_instrumentation::set_global_node_id(my_node_id); - - info!( - roles = %my_node_config.roles, - address = %my_node_config.address, - "My Node ID is {}", my_node_config.current_generation); + // Start partition routing information refresher + spawn_partition_routing_refresher(self.partition_routing_refresher)?; // todo this is a temporary solution to announce the updated NodesConfiguration to the // configured admin nodes. It should be removed once we have a gossip-based node status @@ -522,92 +526,6 @@ impl Node { Ok(()) } - async fn upsert_node_config( - metadata_store_client: &MetadataStoreClient, - common_opts: &CommonOptions, - ) -> Result { - retry_on_network_error(common_opts.network_error_retry_policy.clone(), || { - let mut previous_node_generation = None; - metadata_store_client.read_modify_write(NODES_CONFIG_KEY.clone(), move |nodes_config| { - let mut nodes_config = if common_opts.allow_bootstrap { - debug!("allow-bootstrap is set to `true`, allowed to create initial NodesConfiguration!"); - nodes_config.unwrap_or_else(|| { - NodesConfiguration::new( - Version::INVALID, - common_opts.cluster_name().to_owned(), - ) - }) - } else { - nodes_config.ok_or(Error::MissingNodesConfiguration)? - }; - - // check whether we have registered before - let node_config = nodes_config - .find_node_by_name(common_opts.node_name()) - .cloned(); - - let my_node_config = if let Some(mut node_config) = node_config { - assert_eq!( - common_opts.node_name(), - node_config.name, - "node name must match" - ); - - if let Some(previous_node_generation) = previous_node_generation { - if node_config - .current_generation - .is_newer_than(previous_node_generation) - { - // detected a concurrent registration of the same node - return Err(Error::ConcurrentNodeRegistration( - common_opts.node_name().to_owned(), - )); - } - } else { - // remember the previous node generation to detect concurrent modifications - previous_node_generation = Some(node_config.current_generation); - } - - // update node_config - node_config.roles = common_opts.roles; - node_config.address = common_opts.advertised_address.clone(); - node_config.current_generation.bump_generation(); - - node_config - } else { - let plain_node_id = common_opts.force_node_id.unwrap_or_else(|| { - nodes_config - .max_plain_node_id() - .map(|n| n.next()) - .unwrap_or_default() - }); - - assert!( - nodes_config.find_node_by_id(plain_node_id).is_err(), - "duplicate plain node id '{plain_node_id}'" - ); - - let my_node_id = plain_node_id.with_generation(1); - - NodeConfig::new( - common_opts.node_name().to_owned(), - my_node_id, - common_opts.advertised_address.clone(), - common_opts.roles, - LogServerConfig::default(), - ) - }; - - nodes_config.upsert_node(my_node_config); - nodes_config.increment_version(); - - Ok(nodes_config) - }) - }) - .await - .map_err(|err| err.transpose()) - } - pub fn bifrost(&self) -> restate_bifrost::Bifrost { self.bifrost.handle() } @@ -629,3 +547,49 @@ fn warn_if_log_store_left_artifacts(config: &Configuration) { This may indicate that the log-server role was previously enabled and the data directory was not cleaned up. If this was created by v1.1.1 of restate-server, please remove this directory to avoid potential future conflicts.", config.log_server.data_dir().display()); } } + +#[derive(Debug)] +pub struct ProvisionCluster { + provision_cluster_request: ProvisionClusterRequest, + response_tx: oneshot::Sender>, +} + +impl ProvisionCluster { + pub fn into_inner( + self, + ) -> ( + ProvisionClusterRequest, + oneshot::Sender>, + ) { + (self.provision_cluster_request, self.response_tx) + } +} + +#[derive(Clone, Debug)] +pub struct ProvisionClusterHandle { + cmd_tx: mpsc::Sender, +} + +impl ProvisionClusterHandle { + pub fn new() -> (Self, mpsc::Receiver) { + let (cmd_tx, cmd_rx) = mpsc::channel(2); + (Self { cmd_tx }, cmd_rx) + } + + pub async fn provision_cluster( + &self, + provision_cluster_request: ProvisionClusterRequest, + ) -> Result, ShutdownError> { + let (response_tx, response_rx) = oneshot::channel(); + + self.cmd_tx + .send(ProvisionCluster { + provision_cluster_request, + response_tx, + }) + .await + .map_err(|_| ShutdownError)?; + + response_rx.await.map_err(|_| ShutdownError) + } +} diff --git a/crates/node/src/network_server/grpc_svc_handler.rs b/crates/node/src/network_server/grpc_svc_handler.rs index 0ec5b7bc4..3ffc39235 100644 --- a/crates/node/src/network_server/grpc_svc_handler.rs +++ b/crates/node/src/network_server/grpc_svc_handler.rs @@ -14,10 +14,14 @@ use futures::stream::BoxStream; use tokio_stream::StreamExt; use tonic::{Request, Response, Status, Streaming}; +use crate::init::{ProvisionClusterRequest, ProvisionClusterResponse}; +use crate::ProvisionClusterHandle; use restate_core::network::protobuf::core_node_svc::core_node_svc_server::CoreNodeSvc; use restate_core::network::protobuf::node_ctl_svc::node_ctl_svc_server::NodeCtlSvc; use restate_core::network::protobuf::node_ctl_svc::{ GetMetadataRequest, GetMetadataResponse, IdentResponse, + ProvisionClusterRequest as ProtoProvisionClusterRequest, + ProvisionClusterResponse as ProtoProvisionClusterResponse, }; use restate_core::network::ConnectionManager; use restate_core::network::{ProtocolError, TransportConnect}; @@ -33,6 +37,7 @@ pub struct NodeCtlSvcHandler { cluster_name: String, roles: EnumSet, health: Health, + node_handle: ProvisionClusterHandle, } impl NodeCtlSvcHandler { @@ -41,14 +46,24 @@ impl NodeCtlSvcHandler { cluster_name: String, roles: EnumSet, health: Health, + node_handle: ProvisionClusterHandle, ) -> Self { Self { task_center, cluster_name, roles, health, + node_handle, } } + + fn from_proto(_request: ProtoProvisionClusterRequest) -> ProvisionClusterRequest { + unimplemented!("replace with dto_prost") + } + + fn into_proto(_response: ProvisionClusterResponse) -> ProtoProvisionClusterResponse { + unimplemented!("replace with dto_prost") + } } #[async_trait::async_trait] @@ -114,6 +129,24 @@ impl NodeCtlSvc for NodeCtlSvcHandler { encoded: encoded.freeze(), })) } + + async fn provision_cluster( + &self, + request: Request, + ) -> Result, Status> { + let request = request.into_inner(); + + let response = self + .node_handle + // todo replace with prost_dto? + .provision_cluster(Self::from_proto(request)) + .await + .map_err(|_| Status::unavailable("System is shutting down"))? + .map_err(|err| Status::internal(err.to_string()))?; + + // todo replace with prost_dto? + Ok(Response::new(Self::into_proto(response))) + } } pub struct CoreNodeSvcHandler { diff --git a/crates/node/src/network_server/service.rs b/crates/node/src/network_server/service.rs index 64ae8b330..24a8c6395 100644 --- a/crates/node/src/network_server/service.rs +++ b/crates/node/src/network_server/service.rs @@ -15,8 +15,6 @@ use tokio::time::MissedTickBehavior; use tonic::codec::CompressionEncoding; use tracing::{debug, trace}; -use crate::network_server::metrics::{install_global_prometheus_recorder, render_metrics}; -use crate::network_server::state::NodeCtrlHandlerStateBuilder; use restate_core::network::protobuf::core_node_svc::core_node_svc_server::CoreNodeSvcServer; use restate_core::network::protobuf::node_ctl_svc::node_ctl_svc_server::NodeCtlSvcServer; use restate_core::network::tonic_service_filter::{TonicServiceFilter, WaitForReady}; @@ -28,6 +26,9 @@ use restate_types::protobuf::common::NodeStatus; use super::grpc_svc_handler::{CoreNodeSvcHandler, NodeCtlSvcHandler}; use super::pprof; +use crate::network_server::metrics::{install_global_prometheus_recorder, render_metrics}; +use crate::network_server::state::NodeCtrlHandlerStateBuilder; +use crate::ProvisionClusterHandle; pub struct NetworkServer {} @@ -37,6 +38,7 @@ impl NetworkServer { connection_manager: ConnectionManager, mut server_builder: NetworkServerBuilder, options: CommonOptions, + node_handle: ProvisionClusterHandle, ) -> Result<(), anyhow::Error> { // Configure Metric Exporter let mut state_builder = NodeCtrlHandlerStateBuilder::default(); @@ -103,6 +105,7 @@ impl NetworkServer { options.cluster_name().to_owned(), options.roles, health, + node_handle, )) .accept_compressed(CompressionEncoding::Gzip) .send_compressed(CompressionEncoding::Gzip), diff --git a/crates/types/src/config/common.rs b/crates/types/src/config/common.rs index 178c05d1c..55acc7ec7 100644 --- a/crates/types/src/config/common.rs +++ b/crates/types/src/config/common.rs @@ -235,6 +235,13 @@ pub struct CommonOptions { /// /// The retry policy for node network error pub network_error_retry_policy: RetryPolicy, + + /// # Initialization timeout + /// + /// The timeout until the node gives up joining a cluster and initializing itself. + #[serde(with = "serde_with::As::")] + #[cfg_attr(feature = "schemars", schemars(with = "String"))] + pub initialization_timeout: humantime::Duration, } impl CommonOptions { @@ -374,6 +381,7 @@ impl Default for CommonOptions { Some(15), Some(Duration::from_secs(5)), ), + initialization_timeout: Duration::from_secs(5 * 60).into(), } } } @@ -525,6 +533,12 @@ pub enum MetadataStoreClient { }, } +impl MetadataStoreClient { + pub fn is_embedded(&self) -> bool { + matches!(self, Self::Embedded { .. }) + } +} + impl Default for MetadataStoreClientOptions { fn default() -> Self { Self { diff --git a/crates/types/src/logs/metadata.rs b/crates/types/src/logs/metadata.rs index c2944e4ff..10d3358da 100644 --- a/crates/types/src/logs/metadata.rs +++ b/crates/types/src/logs/metadata.rs @@ -210,6 +210,20 @@ impl DefaultProvider { Self::Replicated(_) => ProviderKind::Replicated, } } + + pub fn from_configuration(configuration: &Configuration) -> Self { + match configuration.bifrost.default_provider { + #[cfg(any(test, feature = "memory-loglet"))] + ProviderKind::InMemory => DefaultProvider::InMemory, + ProviderKind::Local => DefaultProvider::Local, + ProviderKind::Replicated => DefaultProvider::Replicated(ReplicatedLogletConfig { + nodeset_selection_strategy: NodeSetSelectionStrategy::default(), + replication_property: ReplicationProperty::new( + NonZeroU8::new(1).expect("1 is not zero"), + ), + }), + } + } } impl From for crate::protobuf::cluster::DefaultProvider { @@ -269,7 +283,9 @@ pub struct ReplicatedLogletConfig { pub replication_property: ReplicationProperty, } -#[derive(Debug, Clone, Eq, PartialEq, Default, serde::Serialize, serde::Deserialize)] +#[derive( + Debug, Clone, Eq, PartialEq, Default, derive_more::From, serde::Serialize, serde::Deserialize, +)] pub struct LogsConfiguration { pub default_provider: DefaultProvider, } @@ -492,17 +508,7 @@ impl LogletConfig { impl Logs { pub fn from_configuration(config: &Configuration) -> Self { Self::with_logs_configuration(LogsConfiguration { - default_provider: match config.bifrost.default_provider { - #[cfg(any(test, feature = "memory-loglet"))] - ProviderKind::InMemory => DefaultProvider::InMemory, - ProviderKind::Local => DefaultProvider::Local, - ProviderKind::Replicated => DefaultProvider::Replicated(ReplicatedLogletConfig { - nodeset_selection_strategy: NodeSetSelectionStrategy::default(), - replication_property: ReplicationProperty::new( - NonZeroU8::new(1).expect("1 is not zero"), - ), - }), - }, + default_provider: DefaultProvider::from_configuration(config), }) } diff --git a/crates/types/src/node_id.rs b/crates/types/src/node_id.rs index 1c9a394be..88de877e3 100644 --- a/crates/types/src/node_id.rs +++ b/crates/types/src/node_id.rs @@ -85,6 +85,9 @@ impl FromStr for GenerationalNodeId { } impl GenerationalNodeId { + // Start with 1 as id to leave 0 as a special value in the future + pub const INITIAL_NODE_ID: GenerationalNodeId = GenerationalNodeId::new(1, 1); + pub fn decode(mut data: B) -> Self { // generational node id is stored as two u32s next to each other, each in big-endian. let plain_id = data.get_u32(); diff --git a/server/tests/cluster.rs b/server/tests/cluster.rs index 0503b8d5a..503b529cc 100644 --- a/server/tests/cluster.rs +++ b/server/tests/cluster.rs @@ -113,7 +113,7 @@ async fn cluster_name_mismatch() -> googletest::Result<()> { .await?; assert!(mismatch_node - .lines("Cluster name mismatch".parse()?) + .lines("trying to join wrong cluster".parse()?) .next() .await .is_some());