From 73d668197f9c6aa9193ea97c55b3bea11a0c4754 Mon Sep 17 00:00:00 2001
From: Till Rohrmann
Date: Thu, 12 Dec 2024 14:08:58 +0100
Subject: [PATCH 1/6] Add cluster provision functionality

After starting the metadata store service and the grpc server, the node
will try to initialize itself by joining an existing cluster.
Additionally, each node exposes a provision cluster grpc call with which
it is possible to provision a cluster (writing the initial
NodesConfiguration, PartitionTable and Logs). Nodes can only join after
the cluster is provisioned.

This fixes #2409.
---
 Cargo.lock                                    |   1 +
 .../src/cluster_controller/logs_controller.rs |  35 +--
 .../admin/src/cluster_controller/service.rs   |  32 +-
 .../src/cluster_controller/service/state.rs   |   6 +-
 crates/bifrost/src/service.rs                 |   8 +-
 crates/core/build.rs                          |   1 +
 crates/core/protobuf/node_ctl_svc.proto       |  25 ++
 crates/core/src/metadata_store.rs             | 141 ++++++++-
 .../core/src/metadata_store/providers/etcd.rs |   4 +-
 .../metadata_store/providers/objstore/glue.rs |   6 +-
 crates/core/src/metadata_store/test_util.rs   |   6 +-
 crates/core/src/network/protobuf.rs           |  28 ++
 .../metadata-store/src/local/grpc/client.rs   |   4 +-
 crates/metadata-store/src/local/store.rs      |   4 +-
 crates/node/Cargo.toml                        |   1 +
 crates/node/src/init.rs                       | 283 ++++++++++++++++++
 crates/node/src/lib.rs                        | 252 ++++++----------
 .../src/network_server/grpc_svc_handler.rs    | 204 ++++++++++++-
 crates/node/src/network_server/service.rs     |   7 +-
 crates/types/src/config/common.rs             |   8 +
 crates/types/src/logs/metadata.rs             |  30 +-
 crates/types/src/node_id.rs                   |   3 +
 server/tests/cluster.rs                       |   2 +-
 23 files changed, 823 insertions(+), 268 deletions(-)
 create mode 100644 crates/node/src/init.rs

diff --git a/Cargo.lock b/Cargo.lock
index 436138462..c8775bdb9 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -6564,6 +6564,7 @@ dependencies = [
  "async-trait",
  "axum",
  "bytes",
+ "bytestring",
  "codederror",
  "datafusion",
  "derive_builder",
diff --git a/crates/admin/src/cluster_controller/logs_controller.rs b/crates/admin/src/cluster_controller/logs_controller.rs
index 0a74a7078..602c2f0a7 100644
--- a/crates/admin/src/cluster_controller/logs_controller.rs
+++ b/crates/admin/src/cluster_controller/logs_controller.rs
@@ -24,11 +24,8 @@ use tracing::{debug, error, trace, trace_span, Instrument};
 use xxhash_rust::xxh3::Xxh3Builder;
 
 use restate_bifrost::{Bifrost, BifrostAdmin, Error as BifrostError};
-use restate_core::metadata_store::{
-    retry_on_network_error, MetadataStoreClient, Precondition, ReadWriteError, WriteError,
-};
+use restate_core::metadata_store::{MetadataStoreClient, Precondition, ReadWriteError, WriteError};
 use restate_core::{Metadata, MetadataWriter, ShutdownError, TaskCenterFutureExt};
-use restate_types::config::Configuration;
 use restate_types::errors::GenericError;
 use restate_types::identifiers::PartitionId;
 use restate_types::live::Pinned;
@@ -639,9 +636,9 @@ struct LogsControllerInner {
 }
 
 impl LogsControllerInner {
-    fn new(configuration: LogsConfiguration, retry_policy: RetryPolicy) -> Self {
+    fn new(current_logs: Arc<Logs>, retry_policy: RetryPolicy) -> Self {
         Self {
-            current_logs: Arc::new(Logs::with_logs_configuration(configuration)),
+            current_logs,
             logs_state: HashMap::with_hasher(Xxh3Builder::default()),
             logs_write_in_progress: None,
             retry_policy,
@@ -925,26 +922,11 @@ pub struct LogsController {
 }
 
 impl LogsController {
-    pub async fn init(
-        configuration: &Configuration,
+    pub fn new(
         bifrost: Bifrost,
         metadata_store_client: MetadataStoreClient,
         metadata_writer: MetadataWriter,
-    ) -> Result<Self> {
-        // obtain 
the latest logs or init it with an empty logs variant - let logs = retry_on_network_error( - configuration.common.network_error_retry_policy.clone(), - || { - metadata_store_client.get_or_insert(BIFROST_CONFIG_KEY.clone(), || { - Logs::from_configuration(configuration) - }) - }, - ) - .await?; - - let logs_configuration = logs.configuration().clone(); - metadata_writer.update(Arc::new(logs)).await?; - + ) -> Self { //todo(azmy): make configurable let retry_policy = RetryPolicy::exponential( Duration::from_millis(10), @@ -955,7 +937,10 @@ impl LogsController { let mut this = Self { effects: Some(Vec::new()), - inner: LogsControllerInner::new(logs_configuration, retry_policy), + inner: LogsControllerInner::new( + Metadata::with_current(|m| m.logs_snapshot()), + retry_policy, + ), bifrost, metadata_store_client, metadata_writer, @@ -964,7 +949,7 @@ impl LogsController { }; this.find_logs_tail(); - Ok(this) + this } pub fn find_logs_tail(&mut self) { diff --git a/crates/admin/src/cluster_controller/service.rs b/crates/admin/src/cluster_controller/service.rs index e5694601f..9c68f9c91 100644 --- a/crates/admin/src/cluster_controller/service.rs +++ b/crates/admin/src/cluster_controller/service.rs @@ -37,7 +37,7 @@ use restate_types::partition_table::{ use restate_types::replicated_loglet::ReplicatedLogletParams; use restate_bifrost::{Bifrost, BifrostAdmin, SealedSegment}; -use restate_core::metadata_store::{retry_on_network_error, MetadataStoreClient}; +use restate_core::metadata_store::MetadataStoreClient; use restate_core::network::rpc_router::RpcRouter; use restate_core::network::tonic_service_filter::{TonicServiceFilter, WaitForReady}; use restate_core::network::{ @@ -296,8 +296,6 @@ impl Service { } pub async fn run(mut self) -> anyhow::Result<()> { - self.init_partition_table().await?; - let mut config_watcher = Configuration::watcher(); let mut cluster_state_watcher = self.cluster_state_refresher.cluster_state_watcher(); @@ -353,34 +351,6 @@ impl Service { } } - /// creates partition table iff it does not exist - async fn init_partition_table(&mut self) -> anyhow::Result<()> { - let configuration = self.configuration.live_load(); - - let partition_table = retry_on_network_error( - configuration.common.network_error_retry_policy.clone(), - || { - self.metadata_store_client - .get_or_insert(PARTITION_TABLE_KEY.clone(), || { - let partition_table = PartitionTable::with_equally_sized_partitions( - Version::MIN, - configuration.common.bootstrap_num_partitions.get(), - ); - - debug!("Initializing the partition table with '{partition_table:?}'"); - - partition_table - }) - }, - ) - .await?; - - self.metadata_writer - .update(Arc::new(partition_table)) - .await?; - - Ok(()) - } /// Triggers a snapshot creation for the given partition by issuing an RPC /// to the node hosting the active leader. 
     async fn create_partition_snapshot(
diff --git a/crates/admin/src/cluster_controller/service/state.rs b/crates/admin/src/cluster_controller/service/state.rs
index 8b5548839..4392744ca 100644
--- a/crates/admin/src/cluster_controller/service/state.rs
+++ b/crates/admin/src/cluster_controller/service/state.rs
@@ -161,13 +161,11 @@ where
         )
         .await?;
 
-        let logs_controller = LogsController::init(
-            &configuration,
+        let logs_controller = LogsController::new(
             service.bifrost.clone(),
             service.metadata_store_client.clone(),
             service.metadata_writer.clone(),
-        )
-        .await?;
+        );
 
         let (log_trim_interval, log_trim_threshold) =
             create_log_trim_interval(&configuration.admin);
diff --git a/crates/bifrost/src/service.rs b/crates/bifrost/src/service.rs
index 3222cc5c2..54dc75168 100644
--- a/crates/bifrost/src/service.rs
+++ b/crates/bifrost/src/service.rs
@@ -78,10 +78,10 @@ impl BifrostService {
     pub fn handle(&self) -> Bifrost {
         self.bifrost.clone()
     }
-    /// Runs initialization phase, then returns a handle to join on shutdown.
-    /// In this phase the system should wait until this is completed before
-    /// continuing. For instance, a worker mark itself as `STARTING_UP` and not
-    /// accept any requests until this is completed.
+
+    /// Runs initialization phase. In this phase the system should wait until this is completed
+    /// before continuing. For instance, a worker marks itself as `STARTING_UP` and does not
+    /// accept any requests until this is completed.
     ///
     /// This requires to run within a task_center context.
     pub async fn start(self) -> anyhow::Result<()> {
diff --git a/crates/core/build.rs b/crates/core/build.rs
index 135e37a1d..9904963e6 100644
--- a/crates/core/build.rs
+++ b/crates/core/build.rs
@@ -21,6 +21,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
         .protoc_arg("--experimental_allow_proto3_optional")
         .extern_path(".restate.node", "::restate_types::protobuf::node")
         .extern_path(".restate.common", "::restate_types::protobuf::common")
+        .extern_path(".restate.cluster", "::restate_types::protobuf::cluster")
         .compile_protos(
             &["./protobuf/node_ctl_svc.proto"],
             &["protobuf", "../types/protobuf"],
diff --git a/crates/core/protobuf/node_ctl_svc.proto b/crates/core/protobuf/node_ctl_svc.proto
index 74350bc4a..673d9e4dc 100644
--- a/crates/core/protobuf/node_ctl_svc.proto
+++ b/crates/core/protobuf/node_ctl_svc.proto
@@ -10,6 +10,7 @@ syntax = "proto3";
 
 import "google/protobuf/empty.proto";
+import "restate/cluster.proto";
 import "restate/common.proto";
 import "restate/node.proto";
 
@@ -20,6 +21,30 @@ service NodeCtlSvc {
   rpc GetIdent(google.protobuf.Empty) returns (IdentResponse);
 
   rpc GetMetadata(GetMetadataRequest) returns (GetMetadataResponse);
+
+  // Provision the Restate cluster on this node.
+  rpc ProvisionCluster(ProvisionClusterRequest) returns (ProvisionClusterResponse);
+}
+
+message ProvisionClusterRequest {
+  bool dry_run = 1;
+  optional uint32 num_partitions = 2;
+  optional restate.cluster.ReplicationStrategy placement_strategy = 3;
+  optional restate.cluster.DefaultProvider log_provider = 4;
+}
+
+enum ProvisionClusterResponseKind {
+  ProvisionClusterResponseType_UNKNOWN = 0;
+  ERROR = 1;
+  SUCCESS = 2;
+  DRY_RUN = 3;
+}
+
+message ProvisionClusterResponse {
+  ProvisionClusterResponseKind kind = 1;
+  // If there is an error, this field will be set. All other fields will be empty. 
+ optional string error = 2; + optional restate.cluster.ClusterConfiguration cluster_configuration = 3; } message IdentResponse { diff --git a/crates/core/src/metadata_store.rs b/crates/core/src/metadata_store.rs index 3542226f5..c391cd080 100644 --- a/crates/core/src/metadata_store.rs +++ b/crates/core/src/metadata_store.rs @@ -17,8 +17,10 @@ use async_trait::async_trait; use bytes::{Bytes, BytesMut}; use bytestring::ByteString; use restate_types::errors::GenericError; +use restate_types::metadata_store::keys::NODES_CONFIG_KEY; +use restate_types::nodes_config::NodesConfiguration; use restate_types::retries::RetryPolicy; -use restate_types::storage::{StorageCodec, StorageDecode, StorageEncode}; +use restate_types::storage::{StorageCodec, StorageDecode, StorageEncode, StorageEncodeError}; use restate_types::{flexbuffers_storage_encode_decode, Version, Versioned}; use std::future::Future; use std::sync::Arc; @@ -51,6 +53,29 @@ pub enum WriteError { Store(GenericError), } +#[derive(Debug, thiserror::Error)] +pub enum ProvisionError { + #[error("network error: {0}")] + Network(GenericError), + #[error("internal error: {0}")] + Internal(String), + #[error("codec error: {0}")] + Codec(GenericError), + #[error("store error: {0}")] + Store(GenericError), +} + +impl MetadataStoreClientError for ProvisionError { + fn is_network_error(&self) -> bool { + match self { + ProvisionError::Network(_) => true, + ProvisionError::Internal(_) | ProvisionError::Codec(_) | ProvisionError::Store(_) => { + false + } + } + } +} + #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub struct VersionedValue { pub version: Version, @@ -100,6 +125,89 @@ pub trait MetadataStore { /// Deletes the key-value pair for the given key following the provided precondition. If the /// precondition is not met, then the operation returns a [`WriteError::PreconditionViolation`]. async fn delete(&self, key: ByteString, precondition: Precondition) -> Result<(), WriteError>; + + /// Tries to provision the metadata store with the provided [`NodesConfiguration`]. Returns + /// `true` if the metadata store was newly provisioned. Returns `false` if the metadata store + /// is already provisioned. + async fn provision( + &self, + nodes_configuration: &NodesConfiguration, + ) -> Result; +} + +/// A provisioned metadata store does not need to be explicitly provisioned. Therefore, a provision +/// call is translated into a put command. +#[async_trait] +pub trait ProvisionedMetadataStore { + /// Gets the value and its current version for the given key. If key-value pair is not present, + /// then return [`None`]. + async fn get(&self, key: ByteString) -> Result, ReadError>; + + /// Gets the current version for the given key. If key-value pair is not present, then return + /// [`None`]. + async fn get_version(&self, key: ByteString) -> Result, ReadError>; + + /// Puts the versioned value under the given key following the provided precondition. If the + /// precondition is not met, then the operation returns a [`WriteError::PreconditionViolation`]. + async fn put( + &self, + key: ByteString, + value: VersionedValue, + precondition: Precondition, + ) -> Result<(), WriteError>; + + /// Deletes the key-value pair for the given key following the provided precondition. If the + /// precondition is not met, then the operation returns a [`WriteError::PreconditionViolation`]. 
+ async fn delete(&self, key: ByteString, precondition: Precondition) -> Result<(), WriteError>; +} + +#[async_trait] +impl MetadataStore for T { + async fn get(&self, key: ByteString) -> Result, ReadError> { + self.get(key).await + } + + async fn get_version(&self, key: ByteString) -> Result, ReadError> { + self.get_version(key).await + } + + async fn put( + &self, + key: ByteString, + value: VersionedValue, + precondition: Precondition, + ) -> Result<(), WriteError> { + self.put(key, value, precondition).await + } + + async fn delete(&self, key: ByteString, precondition: Precondition) -> Result<(), WriteError> { + self.delete(key, precondition).await + } + + async fn provision( + &self, + nodes_configuration: &NodesConfiguration, + ) -> Result { + let versioned_value = serialize_value(nodes_configuration) + .map_err(|err| ProvisionError::Codec(err.into()))?; + match self + .put( + NODES_CONFIG_KEY.clone(), + versioned_value, + Precondition::DoesNotExist, + ) + .await + { + Ok(()) => Ok(true), + Err(err) => match err { + WriteError::FailedPrecondition(_) => Ok(false), + WriteError::Network(err) => Err(ProvisionError::Network(err)), + WriteError::Internal(err) => Err(ProvisionError::Internal(err)), + WriteError::Codec(err) => Err(ProvisionError::Codec(err)), + WriteError::Store(err) => Err(ProvisionError::Store(err)), + }, + } + } } /// Metadata store client which allows storing [`Versioned`] values into a [`MetadataStore`]. @@ -170,18 +278,10 @@ impl MetadataStoreClient { where T: Versioned + StorageEncode, { - let version = value.version(); + let versioned_value = + serialize_value(value).map_err(|err| WriteError::Codec(err.into()))?; - let mut buf = BytesMut::default(); - StorageCodec::encode(value, &mut buf).map_err(|err| WriteError::Codec(err.into()))?; - - self.inner - .put( - key, - VersionedValue::new(version, buf.freeze()), - precondition, - ) - .await + self.inner.put(key, versioned_value, precondition).await } /// Deletes the key-value pair for the given key following the provided precondition. If the @@ -285,6 +385,23 @@ impl MetadataStoreClient { } } } + + pub async fn provision( + &self, + nodes_configuration: &NodesConfiguration, + ) -> Result { + self.inner.provision(nodes_configuration).await + } +} + +pub fn serialize_value( + value: &T, +) -> Result { + let version = value.version(); + let mut buf = BytesMut::default(); + StorageCodec::encode(value, &mut buf)?; + let versioned_value = VersionedValue::new(version, buf.freeze()); + Ok(versioned_value) } #[derive(Debug, thiserror::Error)] diff --git a/crates/core/src/metadata_store/providers/etcd.rs b/crates/core/src/metadata_store/providers/etcd.rs index 164a772ee..49b85f11b 100644 --- a/crates/core/src/metadata_store/providers/etcd.rs +++ b/crates/core/src/metadata_store/providers/etcd.rs @@ -9,7 +9,7 @@ // by the Apache License, Version 2.0. 
use crate::metadata_store::{ - MetadataStore, Precondition, ReadError, Version, VersionedValue, WriteError, + Precondition, ProvisionedMetadataStore, ReadError, Version, VersionedValue, WriteError, }; use crate::network::net_util::CommonClientConnectionOptions; use anyhow::Context; @@ -145,7 +145,7 @@ impl EtcdMetadataStore { } #[async_trait::async_trait] -impl MetadataStore for EtcdMetadataStore { +impl ProvisionedMetadataStore for EtcdMetadataStore { async fn get(&self, key: ByteString) -> Result, ReadError> { let mut client = self.client.kv_client(); let mut response = client.get(key.into_bytes(), None).await?; diff --git a/crates/core/src/metadata_store/providers/objstore/glue.rs b/crates/core/src/metadata_store/providers/objstore/glue.rs index 940840b17..dc76e9404 100644 --- a/crates/core/src/metadata_store/providers/objstore/glue.rs +++ b/crates/core/src/metadata_store/providers/objstore/glue.rs @@ -17,7 +17,9 @@ use tracing::warn; use crate::cancellation_watcher; use crate::metadata_store::providers::objstore::optimistic_store::OptimisticLockingMetadataStoreBuilder; -use crate::metadata_store::{MetadataStore, Precondition, ReadError, VersionedValue, WriteError}; +use crate::metadata_store::{ + Precondition, ProvisionedMetadataStore, ReadError, VersionedValue, WriteError, +}; #[derive(Debug)] pub(crate) enum Commands { @@ -111,7 +113,7 @@ impl Client { } #[async_trait::async_trait] -impl MetadataStore for Client { +impl ProvisionedMetadataStore for Client { async fn get(&self, key: ByteString) -> Result, ReadError> { let (tx, rx) = tokio::sync::oneshot::channel(); diff --git a/crates/core/src/metadata_store/test_util.rs b/crates/core/src/metadata_store/test_util.rs index 94f2b5d69..ef01c2872 100644 --- a/crates/core/src/metadata_store/test_util.rs +++ b/crates/core/src/metadata_store/test_util.rs @@ -8,7 +8,9 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use crate::metadata_store::{MetadataStore, Precondition, ReadError, VersionedValue, WriteError}; +use crate::metadata_store::{ + Precondition, ProvisionedMetadataStore, ReadError, VersionedValue, WriteError, +}; use bytestring::ByteString; use restate_types::Version; use std::collections::HashMap; @@ -47,7 +49,7 @@ impl InMemoryMetadataStore { } #[async_trait::async_trait] -impl MetadataStore for InMemoryMetadataStore { +impl ProvisionedMetadataStore for InMemoryMetadataStore { async fn get(&self, key: ByteString) -> Result, ReadError> { Ok(self.kv_pairs.lock().unwrap().get(&key).cloned()) } diff --git a/crates/core/src/network/protobuf.rs b/crates/core/src/network/protobuf.rs index bf64a3e30..011c7cbd4 100644 --- a/crates/core/src/network/protobuf.rs +++ b/crates/core/src/network/protobuf.rs @@ -9,10 +9,38 @@ // by the Apache License, Version 2.0. 
pub mod node_ctl_svc { + use restate_types::protobuf::cluster::ClusterConfiguration; + tonic::include_proto!("restate.node_ctl_svc"); pub const FILE_DESCRIPTOR_SET: &[u8] = tonic::include_file_descriptor_set!("node_ctl_svc_descriptor"); + + impl ProvisionClusterResponse { + pub fn dry_run(cluster_configuration: ClusterConfiguration) -> Self { + ProvisionClusterResponse { + kind: ProvisionClusterResponseKind::DryRun.into(), + cluster_configuration: Some(cluster_configuration), + ..Default::default() + } + } + + pub fn err(message: impl ToString) -> Self { + ProvisionClusterResponse { + kind: ProvisionClusterResponseKind::Error.into(), + error: Some(message.to_string()), + ..Default::default() + } + } + + pub fn success(cluster_configuration: ClusterConfiguration) -> Self { + ProvisionClusterResponse { + kind: ProvisionClusterResponseKind::Success.into(), + cluster_configuration: Some(cluster_configuration), + ..Default::default() + } + } + } } pub mod core_node_svc { diff --git a/crates/metadata-store/src/local/grpc/client.rs b/crates/metadata-store/src/local/grpc/client.rs index 4d5362281..7e52e9994 100644 --- a/crates/metadata-store/src/local/grpc/client.rs +++ b/crates/metadata-store/src/local/grpc/client.rs @@ -14,7 +14,7 @@ use tonic::transport::Channel; use tonic::{Code, Status}; use restate_core::metadata_store::{ - MetadataStore, Precondition, ReadError, VersionedValue, WriteError, + Precondition, ProvisionedMetadataStore, ReadError, VersionedValue, WriteError, }; use restate_core::network::net_util::create_tonic_channel_from_advertised_address; use restate_core::network::net_util::CommonClientConnectionOptions; @@ -45,7 +45,7 @@ impl LocalMetadataStoreClient { } #[async_trait] -impl MetadataStore for LocalMetadataStoreClient { +impl ProvisionedMetadataStore for LocalMetadataStoreClient { async fn get(&self, key: ByteString) -> Result, ReadError> { let response = self .svc_client diff --git a/crates/metadata-store/src/local/store.rs b/crates/metadata-store/src/local/store.rs index 4e40403ef..7b8877a75 100644 --- a/crates/metadata-store/src/local/store.rs +++ b/crates/metadata-store/src/local/store.rs @@ -30,7 +30,7 @@ use tracing::{debug, trace}; pub type RequestSender = mpsc::Sender; pub type RequestReceiver = mpsc::Receiver; -type Result = std::result::Result; +type Result = std::result::Result; const DB_NAME: &str = "local-metadata-store"; const KV_PAIRS: &str = "kv_pairs"; @@ -163,7 +163,7 @@ impl LocalMetadataStore { request = self.request_rx.recv() => { let request = request.expect("receiver should not be closed since we own one clone."); self.handle_request(request).await; - } + }, _ = cancellation_watcher() => { break; }, diff --git a/crates/node/Cargo.toml b/crates/node/Cargo.toml index 5583799ab..9881f52e6 100644 --- a/crates/node/Cargo.toml +++ b/crates/node/Cargo.toml @@ -38,6 +38,7 @@ arc-swap = { workspace = true } async-trait = { workspace = true } axum = { workspace = true } bytes = { workspace = true } +bytestring = { workspace = true } codederror = { workspace = true } datafusion = { workspace = true } derive_builder = { workspace = true } diff --git a/crates/node/src/init.rs b/crates/node/src/init.rs new file mode 100644 index 000000000..789d3c171 --- /dev/null +++ b/crates/node/src/init.rs @@ -0,0 +1,283 @@ +// Copyright (c) 2023 - 2024 Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. 
+// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use anyhow::bail; +use restate_core::metadata_store::{MetadataStoreClient, MetadataStoreClientError, ReadWriteError}; +use restate_core::{ + cancellation_watcher, Metadata, MetadataWriter, ShutdownError, SyncError, TargetVersion, +}; +use restate_types::config::{CommonOptions, Configuration}; +use restate_types::metadata_store::keys::NODES_CONFIG_KEY; +use restate_types::net::metadata::MetadataKind; +use restate_types::nodes_config::{LogServerConfig, NodeConfig, NodesConfiguration}; +use restate_types::retries::RetryPolicy; +use std::sync::Arc; +use std::time::{Duration, Instant}; +use tracing::{debug, info, trace, warn}; + +#[derive(Debug, thiserror::Error)] +enum JoinError { + #[error("missing nodes configuration")] + MissingNodesConfiguration, + #[error("detected a concurrent registration for node '{0}'")] + ConcurrentNodeRegistration(String), + #[error("failed writing to metadata store: {0}")] + MetadataStore(#[from] ReadWriteError), + #[error("trying to join wrong cluster; expected '{expected_cluster_name}', actual '{actual_cluster_name}'")] + ClusterMismatch { + expected_cluster_name: String, + actual_cluster_name: String, + }, +} + +pub struct NodeInit<'a> { + metadata_store_client: &'a MetadataStoreClient, + metadata_writer: &'a MetadataWriter, +} + +impl<'a> NodeInit<'a> { + pub fn new( + metadata_store_client: &'a MetadataStoreClient, + metadata_writer: &'a MetadataWriter, + ) -> Self { + Self { + metadata_store_client, + metadata_writer, + } + } + + pub async fn init(self) -> anyhow::Result<()> { + let config = Configuration::pinned().into_arc(); + + let join_cluster = Self::join_cluster(self.metadata_store_client, &config.common); + + let nodes_configuration = tokio::select! { + _ = cancellation_watcher() => { + return Err(ShutdownError.into()); + }, + result = join_cluster => { + let nodes_configuration = result?; + info!("Successfully joined cluster"); + nodes_configuration + }, + }; + + // Find my node in nodes configuration. + let my_node_config = nodes_configuration + .find_node_by_name(config.common.node_name()) + .expect("node config should have been upserted"); + + let my_node_id = my_node_config.current_generation; + + // Safety checks, same node (if set)? + if config + .common + .force_node_id + .is_some_and(|n| n != my_node_id.as_plain()) + { + bail!( + format!( + "Node ID mismatch: configured node ID is {}, but the nodes configuration contains {}", + config.common.force_node_id.unwrap(), + my_node_id.as_plain() + )); + } + + // Same cluster? 
+ if config.common.cluster_name() != nodes_configuration.cluster_name() { + bail!( + format!( + "Cluster name mismatch: configured cluster name is '{}', but the nodes configuration contains '{}'", + config.common.cluster_name(), + nodes_configuration.cluster_name() + )); + } + + info!( + roles = %my_node_config.roles, + address = %my_node_config.address, + "My Node ID is {}", my_node_config.current_generation); + + self.metadata_writer + .update(Arc::new(nodes_configuration)) + .await?; + + // My Node ID is set + self.metadata_writer.set_my_node_id(my_node_id); + restate_tracing_instrumentation::set_global_node_id(my_node_id); + + self.sync_metadata().await; + + Ok(()) + } + + async fn sync_metadata(&self) { + // fetch the latest metadata + let metadata = Metadata::current(); + + let config = Configuration::pinned(); + + let retry_policy = config.common.network_error_retry_policy.clone(); + + if let Err(err) = retry_policy + .retry_if( + || async { + metadata + .sync(MetadataKind::Schema, TargetVersion::Latest) + .await?; + metadata + .sync(MetadataKind::PartitionTable, TargetVersion::Latest) + .await?; + metadata + .sync(MetadataKind::Logs, TargetVersion::Latest) + .await + }, + |err| match err { + SyncError::MetadataStore(err) => err.is_network_error(), + SyncError::Shutdown(_) => false, + }, + ) + .await + { + warn!("Failed to fetch the latest metadata when initializing the node: {err}"); + } + } + + async fn join_cluster( + metadata_store_client: &MetadataStoreClient, + common_opts: &CommonOptions, + ) -> anyhow::Result { + info!("Trying to join cluster '{}'", common_opts.cluster_name()); + + // todo make configurable + // Never give up trying to join the cluster. Users of this struct will set a timeout if + // needed. + let join_retry = RetryPolicy::exponential( + Duration::from_millis(100), + 2.0, + None, + Some(Duration::from_secs(5)), + ); + + let join_start = Instant::now(); + let mut printed_provision_message = false; + + join_retry + .retry_if( + || Self::join_cluster_inner(metadata_store_client, common_opts), + |err| { + if join_start.elapsed() < Duration::from_secs(5) { + trace!("Failed joining the cluster: {err}; retrying"); + } else if !printed_provision_message { + info!("Couldn't join the cluster yet. Don't forget to provision it!"); + printed_provision_message = true; + } else { + debug!("Failed joining cluster: {err}; retrying"); + } + match err { + JoinError::MissingNodesConfiguration => true, + JoinError::ConcurrentNodeRegistration(_) => false, + JoinError::MetadataStore(err) => err.is_network_error(), + JoinError::ClusterMismatch { .. 
} => false, + } + }, + ) + .await + .map_err(Into::into) + } + + async fn join_cluster_inner( + metadata_store_client: &MetadataStoreClient, + common_opts: &CommonOptions, + ) -> Result { + let mut previous_node_generation = None; + + metadata_store_client + .read_modify_write::( + NODES_CONFIG_KEY.clone(), + move |nodes_config| { + let mut nodes_config = + nodes_config.ok_or(JoinError::MissingNodesConfiguration)?; + + // check that we are joining the right cluster + if nodes_config.cluster_name() != common_opts.cluster_name() { + return Err(JoinError::ClusterMismatch { + expected_cluster_name: common_opts.cluster_name().to_owned(), + actual_cluster_name: nodes_config.cluster_name().to_owned(), + }); + } + + // check whether we have registered before + let node_config = nodes_config + .find_node_by_name(common_opts.node_name()) + .cloned(); + + let my_node_config = if let Some(mut node_config) = node_config { + assert_eq!( + common_opts.node_name(), + node_config.name, + "node name must match" + ); + + if let Some(previous_node_generation) = previous_node_generation { + if node_config + .current_generation + .is_newer_than(previous_node_generation) + { + // detected a concurrent registration of the same node + return Err(JoinError::ConcurrentNodeRegistration( + common_opts.node_name().to_owned(), + )); + } + } else { + // remember the previous node generation to detect concurrent modifications + previous_node_generation = Some(node_config.current_generation); + } + + // update node_config + node_config.roles = common_opts.roles; + node_config.address = common_opts.advertised_address.clone(); + node_config.current_generation.bump_generation(); + + node_config + } else { + let plain_node_id = common_opts.force_node_id.unwrap_or_else(|| { + nodes_config + .max_plain_node_id() + .map(|n| n.next()) + .unwrap_or_default() + }); + + assert!( + nodes_config.find_node_by_id(plain_node_id).is_err(), + "duplicate plain node id '{plain_node_id}'" + ); + + let my_node_id = plain_node_id.with_generation(1); + + NodeConfig::new( + common_opts.node_name().to_owned(), + my_node_id, + common_opts.advertised_address.clone(), + common_opts.roles, + LogServerConfig::default(), + ) + }; + + nodes_config.upsert_node(my_node_config); + nodes_config.increment_version(); + + Ok(nodes_config) + }, + ) + .await + .map_err(|err| err.transpose()) + } +} diff --git a/crates/node/src/lib.rs b/crates/node/src/lib.rs index 42f3b868a..f21016af6 100644 --- a/crates/node/src/lib.rs +++ b/crates/node/src/lib.rs @@ -9,46 +9,47 @@ // by the Apache License, Version 2.0. 
mod cluster_marker; +mod init; mod network_server; mod roles; -use std::sync::Arc; - -use tracing::{debug, error, info, trace}; +use anyhow::Context; +use tonic::{Code, IntoRequest}; +use tracing::{debug, error, info, trace, warn}; +use crate::cluster_marker::ClusterValidationError; +use crate::init::NodeInit; +use crate::network_server::NetworkServer; +use crate::roles::{AdminRole, BaseRole, IngressRole, WorkerRole}; use codederror::CodedError; use restate_bifrost::BifrostService; -use restate_core::metadata_store::{retry_on_network_error, ReadWriteError}; +use restate_core::metadata_store::ReadWriteError; +use restate_core::network::net_util::create_tonic_channel_from_advertised_address; +use restate_core::network::protobuf::node_ctl_svc::node_ctl_svc_client::NodeCtlSvcClient; +use restate_core::network::protobuf::node_ctl_svc::{ + ProvisionClusterRequest, ProvisionClusterResponseKind, +}; use restate_core::network::{ GrpcConnector, MessageRouterBuilder, NetworkServerBuilder, Networking, }; use restate_core::partitions::{spawn_partition_routing_refresher, PartitionRoutingRefresher}; -use restate_core::{cancellation_watcher, TaskKind}; -use restate_core::{ - spawn_metadata_manager, MetadataBuilder, MetadataKind, MetadataManager, TargetVersion, - TaskCenter, -}; +use restate_core::{cancellation_watcher, Metadata, TaskKind}; +use restate_core::{spawn_metadata_manager, MetadataBuilder, MetadataManager, TaskCenter}; #[cfg(feature = "replicated-loglet")] use restate_log_server::LogServerService; use restate_metadata_store::local::LocalMetadataStoreService; use restate_metadata_store::MetadataStoreClient; -use restate_types::config::{CommonOptions, Configuration}; +use restate_types::config::Configuration; use restate_types::errors::GenericError; use restate_types::health::Health; use restate_types::live::Live; #[cfg(feature = "replicated-loglet")] use restate_types::logs::RecordCache; -use restate_types::metadata_store::keys::NODES_CONFIG_KEY; -use restate_types::nodes_config::{LogServerConfig, NodeConfig, NodesConfiguration, Role}; +use restate_types::nodes_config::Role; use restate_types::protobuf::common::{ AdminStatus, IngressStatus, LogServerStatus, MetadataServerStatus, NodeRpcStatus, NodeStatus, WorkerStatus, }; -use restate_types::Version; - -use crate::cluster_marker::ClusterValidationError; -use crate::network_server::NetworkServer; -use crate::roles::{AdminRole, BaseRole, IngressRole, WorkerRole}; #[derive(Debug, thiserror::Error, CodedError)] pub enum Error { @@ -331,86 +332,101 @@ impl Node { let health = self.health.clone(); let common_options = config.common.clone(); let connection_manager = self.networking.connection_manager().clone(); + let metadata_store_client = self.metadata_store_client.clone(); async move { NetworkServer::run( health, connection_manager, self.server_builder, common_options, + metadata_store_client, ) .await?; Ok(()) } })?; + // wait until the node rpc server is up and running before continuing + self.health + .node_rpc_status() + .wait_for_value(NodeRpcStatus::Ready) + .await; + if let Some(metadata_store) = self.metadata_store_role { - TaskCenter::spawn( - TaskKind::MetadataStore, - "local-metadata-store", - async move { - metadata_store.run().await?; - Ok(()) - }, - )?; + TaskCenter::spawn(TaskKind::MetadataStore, "metadata-store", async move { + metadata_store.run().await?; + Ok(()) + })?; } - // Start partition routing information refresher - spawn_partition_routing_refresher(self.partition_routing_refresher)?; - - let nodes_config = - 
Self::upsert_node_config(&self.metadata_store_client, &config.common).await?; - metadata_writer.update(Arc::new(nodes_config)).await?; - if config.common.allow_bootstrap { - // todo write bootstrap state + TaskCenter::spawn(TaskKind::SystemBoot, "auto-provision-cluster", { + let channel = create_tonic_channel_from_advertised_address( + config.common.advertised_address.clone(), + &config.networking, + ); + let client = NodeCtlSvcClient::new(channel); + let retry_policy = config.common.network_error_retry_policy.clone(); + async move { + let response = retry_policy + .retry_if( + || { + let mut client = client.clone(); + async move { + client + .provision_cluster( + ProvisionClusterRequest { + dry_run: false, + ..Default::default() + } + .into_request(), + ) + .await + } + }, + |status| status.code() == Code::Unavailable, + ) + .await; + + match response { + Ok(response) => { + if response.into_inner().kind() == ProvisionClusterResponseKind::DryRun + { + debug!("The cluster is already provisioned."); + } + } + Err(err) => { + warn!("Failed to auto provision the cluster. In order to continue you have to provision the cluster manually: {err}"); + } + } + + Ok(()) + } + })?; } - // fetch the latest schema information - metadata - .sync(MetadataKind::Schema, TargetVersion::Latest) - .await?; + let initialization_timeout = config.common.initialization_timeout.into(); - let nodes_config = metadata.nodes_config_ref(); + tokio::time::timeout( + initialization_timeout, + NodeInit::new( + &self.metadata_store_client, + &metadata_writer, + ) + .init(), + ) + .await + .context("Giving up trying to initialize the node. Make sure that it can reach the metadata store and don't forget to provision the cluster on a fresh start.")? + .context("Failed initializing the node.")?; - // Find my node in nodes configuration. + let nodes_config = Metadata::with_current(|m| m.nodes_config_ref()); + let my_node_id = Metadata::with_current(|m| m.my_node_id()); let my_node_config = nodes_config - .find_node_by_name(config.common.node_name()) - .expect("node config should have been upserted"); - - let my_node_id = my_node_config.current_generation; - - // Safety checks, same node (if set)? - if config - .common - .force_node_id - .is_some_and(|n| n != my_node_id.as_plain()) - { - return Err(Error::SafetyCheck( - format!( - "Node ID mismatch: configured node ID is {}, but the nodes configuration contains {}", - config.common.force_node_id.unwrap(), - my_node_id.as_plain() - )))?; - } - - // Same cluster? - if config.common.cluster_name() != nodes_config.cluster_name() { - return Err(Error::SafetyCheck( - format!( - "Cluster name mismatch: configured cluster name is '{}', but the nodes configuration contains '{}'", - config.common.cluster_name(), - nodes_config.cluster_name() - )))?; - } - - // My Node ID is set - metadata_writer.set_my_node_id(my_node_id); - restate_tracing_instrumentation::set_global_node_id(my_node_id); + .find_node_by_id(my_node_id) + .expect("should be present"); - info!( - roles = %my_node_config.roles, - address = %my_node_config.address, - "My Node ID is {}", my_node_config.current_generation); + // Start partition routing information refresher + spawn_partition_routing_refresher(self.partition_routing_refresher)?; // todo this is a temporary solution to announce the updated NodesConfiguration to the // configured admin nodes. 
It should be removed once we have a gossip-based node status @@ -522,92 +538,6 @@ impl Node { Ok(()) } - async fn upsert_node_config( - metadata_store_client: &MetadataStoreClient, - common_opts: &CommonOptions, - ) -> Result { - retry_on_network_error(common_opts.network_error_retry_policy.clone(), || { - let mut previous_node_generation = None; - metadata_store_client.read_modify_write(NODES_CONFIG_KEY.clone(), move |nodes_config| { - let mut nodes_config = if common_opts.allow_bootstrap { - debug!("allow-bootstrap is set to `true`, allowed to create initial NodesConfiguration!"); - nodes_config.unwrap_or_else(|| { - NodesConfiguration::new( - Version::INVALID, - common_opts.cluster_name().to_owned(), - ) - }) - } else { - nodes_config.ok_or(Error::MissingNodesConfiguration)? - }; - - // check whether we have registered before - let node_config = nodes_config - .find_node_by_name(common_opts.node_name()) - .cloned(); - - let my_node_config = if let Some(mut node_config) = node_config { - assert_eq!( - common_opts.node_name(), - node_config.name, - "node name must match" - ); - - if let Some(previous_node_generation) = previous_node_generation { - if node_config - .current_generation - .is_newer_than(previous_node_generation) - { - // detected a concurrent registration of the same node - return Err(Error::ConcurrentNodeRegistration( - common_opts.node_name().to_owned(), - )); - } - } else { - // remember the previous node generation to detect concurrent modifications - previous_node_generation = Some(node_config.current_generation); - } - - // update node_config - node_config.roles = common_opts.roles; - node_config.address = common_opts.advertised_address.clone(); - node_config.current_generation.bump_generation(); - - node_config - } else { - let plain_node_id = common_opts.force_node_id.unwrap_or_else(|| { - nodes_config - .max_plain_node_id() - .map(|n| n.next()) - .unwrap_or_default() - }); - - assert!( - nodes_config.find_node_by_id(plain_node_id).is_err(), - "duplicate plain node id '{plain_node_id}'" - ); - - let my_node_id = plain_node_id.with_generation(1); - - NodeConfig::new( - common_opts.node_name().to_owned(), - my_node_id, - common_opts.advertised_address.clone(), - common_opts.roles, - LogServerConfig::default(), - ) - }; - - nodes_config.upsert_node(my_node_config); - nodes_config.increment_version(); - - Ok(nodes_config) - }) - }) - .await - .map_err(|err| err.transpose()) - } - pub fn bifrost(&self) -> restate_bifrost::Bifrost { self.bifrost.handle() } diff --git a/crates/node/src/network_server/grpc_svc_handler.rs b/crates/node/src/network_server/grpc_svc_handler.rs index 0ec5b7bc4..dcc0a8040 100644 --- a/crates/node/src/network_server/grpc_svc_handler.rs +++ b/crates/node/src/network_server/grpc_svc_handler.rs @@ -8,31 +8,45 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. 
+use anyhow::Context; use bytes::BytesMut; +use bytestring::ByteString; use enumset::EnumSet; use futures::stream::BoxStream; -use tokio_stream::StreamExt; -use tonic::{Request, Response, Status, Streaming}; - +use restate_core::metadata_store::{ + retry_on_network_error, MetadataStoreClient, Precondition, WriteError, +}; use restate_core::network::protobuf::core_node_svc::core_node_svc_server::CoreNodeSvc; use restate_core::network::protobuf::node_ctl_svc::node_ctl_svc_server::NodeCtlSvc; use restate_core::network::protobuf::node_ctl_svc::{ - GetMetadataRequest, GetMetadataResponse, IdentResponse, + GetMetadataRequest, GetMetadataResponse, IdentResponse, ProvisionClusterRequest, + ProvisionClusterResponse, ProvisionClusterResponseKind, }; use restate_core::network::ConnectionManager; use restate_core::network::{ProtocolError, TransportConnect}; use restate_core::task_center::TaskCenterMonitoring; use restate_core::{task_center, Metadata, MetadataKind, TargetVersion}; +use restate_types::config::{CommonOptions, Configuration}; use restate_types::health::Health; -use restate_types::nodes_config::Role; +use restate_types::logs::metadata::{DefaultProvider, Logs, LogsConfiguration}; +use restate_types::metadata_store::keys::{BIFROST_CONFIG_KEY, PARTITION_TABLE_KEY}; +use restate_types::nodes_config::{LogServerConfig, NodeConfig, NodesConfiguration, Role}; +use restate_types::partition_table::{PartitionTable, PartitionTableBuilder, ReplicationStrategy}; +use restate_types::protobuf::cluster::ClusterConfiguration as ProtoClusterConfiguration; use restate_types::protobuf::node::Message; -use restate_types::storage::StorageCodec; +use restate_types::storage::{StorageCodec, StorageEncode}; +use restate_types::{GenerationalNodeId, Version, Versioned}; +use std::num::NonZeroU16; +use tokio_stream::StreamExt; +use tonic::{Request, Response, Status, Streaming}; +use tracing::warn; pub struct NodeCtlSvcHandler { task_center: task_center::Handle, cluster_name: String, roles: EnumSet, health: Health, + metadata_store_client: MetadataStoreClient, } impl NodeCtlSvcHandler { @@ -41,14 +55,153 @@ impl NodeCtlSvcHandler { cluster_name: String, roles: EnumSet, health: Health, + metadata_store_client: MetadataStoreClient, ) -> Self { Self { task_center, cluster_name, roles, health, + metadata_store_client, + } + } + + async fn provision_metadata( + &self, + common_opts: &CommonOptions, + cluster_configuration: &ClusterConfiguration, + ) -> anyhow::Result { + let (initial_nodes_configuration, initial_partition_table, initial_logs) = + Self::generate_initial_metadata(common_opts, cluster_configuration); + + let result = retry_on_network_error(common_opts.network_error_retry_policy.clone(), || { + self.metadata_store_client + .provision(&initial_nodes_configuration) + }) + .await?; + + retry_on_network_error(common_opts.network_error_retry_policy.clone(), || { + self.write_initial_value_dont_fail_if_it_exists( + PARTITION_TABLE_KEY.clone(), + &initial_partition_table, + ) + }) + .await + .context("failed provisioning the initial partition table")?; + + retry_on_network_error(common_opts.network_error_retry_policy.clone(), || { + self.write_initial_value_dont_fail_if_it_exists( + BIFROST_CONFIG_KEY.clone(), + &initial_logs, + ) + }) + .await + .context("failed provisioning the initial logs configuration")?; + + Ok(result) + } + + pub fn create_initial_nodes_configuration(common_opts: &CommonOptions) -> NodesConfiguration { + let mut initial_nodes_configuration = + NodesConfiguration::new(Version::MIN, 
common_opts.cluster_name().to_owned()); + let node_config = NodeConfig::new( + common_opts.node_name().to_owned(), + common_opts + .force_node_id + .map(|force_node_id| force_node_id.with_generation(1)) + .unwrap_or(GenerationalNodeId::INITIAL_NODE_ID), + common_opts.advertised_address.clone(), + common_opts.roles, + LogServerConfig::default(), + ); + initial_nodes_configuration.upsert_node(node_config); + initial_nodes_configuration + } + + fn generate_initial_metadata( + common_opts: &CommonOptions, + cluster_configuration: &ClusterConfiguration, + ) -> (NodesConfiguration, PartitionTable, Logs) { + let mut initial_partition_table_builder = PartitionTableBuilder::default(); + initial_partition_table_builder + .with_equally_sized_partitions(cluster_configuration.num_partitions.get()) + .expect("Empty partition table should not have conflicts"); + initial_partition_table_builder + .set_replication_strategy(cluster_configuration.placement_strategy); + let initial_partition_table = initial_partition_table_builder.build(); + + let mut logs_builder = Logs::default().into_builder(); + logs_builder.set_configuration(LogsConfiguration::from( + cluster_configuration.log_provider.clone(), + )); + let initial_logs = logs_builder.build(); + + let initial_nodes_configuration = Self::create_initial_nodes_configuration(common_opts); + + ( + initial_nodes_configuration, + initial_partition_table, + initial_logs, + ) + } + + async fn write_initial_value_dont_fail_if_it_exists( + &self, + key: ByteString, + initial_value: &T, + ) -> Result<(), WriteError> { + match self + .metadata_store_client + .put(key, initial_value, Precondition::DoesNotExist) + .await + { + Ok(_) => Ok(()), + Err(WriteError::FailedPrecondition(_)) => { + // we might have failed on a previous attempt after writing this value; so let's continue + Ok(()) + } + Err(err) => Err(err), } } + + fn resolve_cluster_configuration( + config: &Configuration, + request: ProvisionClusterRequest, + ) -> anyhow::Result { + let num_partitions = request + .num_partitions + .map(|num_partitions| { + u16::try_from(num_partitions) + .context("Restate only supports running up to 65535 partitions.") + .and_then(|num_partitions| { + NonZeroU16::try_from(num_partitions) + .context("The number of partitions needs to be > 0") + }) + }) + .transpose()? + .unwrap_or(config.common.bootstrap_num_partitions); + let placement_strategy = request + .placement_strategy + .map(ReplicationStrategy::try_from) + .transpose()? 
+ .unwrap_or_default(); + let log_provider = request + .log_provider + .map(DefaultProvider::try_from) + .unwrap_or_else(|| Ok(DefaultProvider::from_configuration(config)))?; + + Ok(ClusterConfiguration { + num_partitions, + placement_strategy, + log_provider, + }) + } + + fn convert_cluster_configuration( + _cluster_configuration: ClusterConfiguration, + ) -> ProtoClusterConfiguration { + todo!() + } } #[async_trait::async_trait] @@ -114,6 +267,45 @@ impl NodeCtlSvc for NodeCtlSvcHandler { encoded: encoded.freeze(), })) } + + async fn provision_cluster( + &self, + request: Request, + ) -> Result, Status> { + let request = request.into_inner(); + let config = Configuration::pinned(); + + let dry_run = request.dry_run; + let cluster_configuration = Self::resolve_cluster_configuration(&config, request) + .map_err(|err| Status::invalid_argument(err.to_string()))?; + + if dry_run { + return Ok(Response::new(ProvisionClusterResponse { + kind: ProvisionClusterResponseKind::DryRun.into(), + cluster_configuration: Some(Self::convert_cluster_configuration( + cluster_configuration, + )), + ..Default::default() + })); + } + + self.provision_metadata(&config.common, &cluster_configuration) + .await + .map_err(|err| Status::internal(err.to_string()))?; + + Ok(Response::new(ProvisionClusterResponse { + kind: ProvisionClusterResponseKind::Success.into(), + cluster_configuration: Some(Self::convert_cluster_configuration(cluster_configuration)), + ..Default::default() + })) + } +} + +#[derive(Clone, Debug)] +struct ClusterConfiguration { + num_partitions: NonZeroU16, + placement_strategy: ReplicationStrategy, + log_provider: DefaultProvider, } pub struct CoreNodeSvcHandler { diff --git a/crates/node/src/network_server/service.rs b/crates/node/src/network_server/service.rs index 64ae8b330..5e208a572 100644 --- a/crates/node/src/network_server/service.rs +++ b/crates/node/src/network_server/service.rs @@ -15,8 +15,7 @@ use tokio::time::MissedTickBehavior; use tonic::codec::CompressionEncoding; use tracing::{debug, trace}; -use crate::network_server::metrics::{install_global_prometheus_recorder, render_metrics}; -use crate::network_server::state::NodeCtrlHandlerStateBuilder; +use restate_core::metadata_store::MetadataStoreClient; use restate_core::network::protobuf::core_node_svc::core_node_svc_server::CoreNodeSvcServer; use restate_core::network::protobuf::node_ctl_svc::node_ctl_svc_server::NodeCtlSvcServer; use restate_core::network::tonic_service_filter::{TonicServiceFilter, WaitForReady}; @@ -28,6 +27,8 @@ use restate_types::protobuf::common::NodeStatus; use super::grpc_svc_handler::{CoreNodeSvcHandler, NodeCtlSvcHandler}; use super::pprof; +use crate::network_server::metrics::{install_global_prometheus_recorder, render_metrics}; +use crate::network_server::state::NodeCtrlHandlerStateBuilder; pub struct NetworkServer {} @@ -37,6 +38,7 @@ impl NetworkServer { connection_manager: ConnectionManager, mut server_builder: NetworkServerBuilder, options: CommonOptions, + metadata_store_client: MetadataStoreClient, ) -> Result<(), anyhow::Error> { // Configure Metric Exporter let mut state_builder = NodeCtrlHandlerStateBuilder::default(); @@ -103,6 +105,7 @@ impl NetworkServer { options.cluster_name().to_owned(), options.roles, health, + metadata_store_client, )) .accept_compressed(CompressionEncoding::Gzip) .send_compressed(CompressionEncoding::Gzip), diff --git a/crates/types/src/config/common.rs b/crates/types/src/config/common.rs index 178c05d1c..5b65b8913 100644 --- a/crates/types/src/config/common.rs +++ 
b/crates/types/src/config/common.rs @@ -235,6 +235,13 @@ pub struct CommonOptions { /// /// The retry policy for node network error pub network_error_retry_policy: RetryPolicy, + + /// # Initialization timeout + /// + /// The timeout until the node gives up joining a cluster and initializing itself. + #[serde(with = "serde_with::As::")] + #[cfg_attr(feature = "schemars", schemars(with = "String"))] + pub initialization_timeout: humantime::Duration, } impl CommonOptions { @@ -374,6 +381,7 @@ impl Default for CommonOptions { Some(15), Some(Duration::from_secs(5)), ), + initialization_timeout: Duration::from_secs(5 * 60).into(), } } } diff --git a/crates/types/src/logs/metadata.rs b/crates/types/src/logs/metadata.rs index 1668910b5..6a0289b51 100644 --- a/crates/types/src/logs/metadata.rs +++ b/crates/types/src/logs/metadata.rs @@ -211,6 +211,20 @@ impl DefaultProvider { Self::Replicated(_) => ProviderKind::Replicated, } } + + pub fn from_configuration(configuration: &Configuration) -> Self { + match configuration.bifrost.default_provider { + #[cfg(any(test, feature = "memory-loglet"))] + ProviderKind::InMemory => DefaultProvider::InMemory, + ProviderKind::Local => DefaultProvider::Local, + ProviderKind::Replicated => DefaultProvider::Replicated(ReplicatedLogletConfig { + nodeset_selection_strategy: NodeSetSelectionStrategy::default(), + replication_property: ReplicationProperty::new( + NonZeroU8::new(1).expect("1 is not zero"), + ), + }), + } + } } impl From for crate::protobuf::cluster::DefaultProvider { @@ -270,7 +284,9 @@ pub struct ReplicatedLogletConfig { pub replication_property: ReplicationProperty, } -#[derive(Debug, Clone, Eq, PartialEq, Default, serde::Serialize, serde::Deserialize)] +#[derive( + Debug, Clone, Eq, PartialEq, Default, derive_more::From, serde::Serialize, serde::Deserialize, +)] pub struct LogsConfiguration { pub default_provider: DefaultProvider, } @@ -493,17 +509,7 @@ impl LogletConfig { impl Logs { pub fn from_configuration(config: &Configuration) -> Self { Self::with_logs_configuration(LogsConfiguration { - default_provider: match config.bifrost.default_provider { - #[cfg(any(test, feature = "memory-loglet"))] - ProviderKind::InMemory => DefaultProvider::InMemory, - ProviderKind::Local => DefaultProvider::Local, - ProviderKind::Replicated => DefaultProvider::Replicated(ReplicatedLogletConfig { - nodeset_selection_strategy: NodeSetSelectionStrategy::default(), - replication_property: ReplicationProperty::new( - NonZeroU8::new(1).expect("1 is not zero"), - ), - }), - }, + default_provider: DefaultProvider::from_configuration(config), }) } diff --git a/crates/types/src/node_id.rs b/crates/types/src/node_id.rs index 1c9a394be..88de877e3 100644 --- a/crates/types/src/node_id.rs +++ b/crates/types/src/node_id.rs @@ -85,6 +85,9 @@ impl FromStr for GenerationalNodeId { } impl GenerationalNodeId { + // Start with 1 as id to leave 0 as a special value in the future + pub const INITIAL_NODE_ID: GenerationalNodeId = GenerationalNodeId::new(1, 1); + pub fn decode(mut data: B) -> Self { // generational node id is stored as two u32s next to each other, each in big-endian. let plain_id = data.get_u32(); diff --git a/server/tests/cluster.rs b/server/tests/cluster.rs index 0503b8d5a..503b529cc 100644 --- a/server/tests/cluster.rs +++ b/server/tests/cluster.rs @@ -113,7 +113,7 @@ async fn cluster_name_mismatch() -> googletest::Result<()> { .await?; assert!(mismatch_node - .lines("Cluster name mismatch".parse()?) + .lines("trying to join wrong cluster".parse()?) 
.next() .await .is_some()); From 6460beba7f20babbeef382e3861b9591f9c00580 Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Fri, 20 Dec 2024 16:38:55 +0100 Subject: [PATCH 2/6] Add restatectl cluster provision command --- Cargo.lock | 1 + crates/core/protobuf/node_ctl_svc.proto | 9 +- crates/core/src/lib.rs | 1 + crates/core/src/network/protobuf.rs | 35 ---- crates/core/src/protobuf.rs | 41 ++++ crates/node/Cargo.toml | 1 + crates/node/src/init.rs | 2 + crates/node/src/lib.rs | 24 ++- .../src/network_server/grpc_svc_handler.rs | 67 ++++--- crates/node/src/network_server/service.rs | 4 +- .../restatectl/src/commands/cluster/config.rs | 12 +- .../src/commands/cluster/config/get.rs | 2 +- .../src/commands/cluster/config/set.rs | 39 ++-- tools/restatectl/src/commands/cluster/mod.rs | 4 + .../src/commands/cluster/provision.rs | 184 ++++++++++++++++++ .../src/commands/node/list_nodes.rs | 4 +- .../src/commands/replicated_loglet/digest.rs | 4 +- .../src/commands/replicated_loglet/info.rs | 4 +- 18 files changed, 325 insertions(+), 113 deletions(-) create mode 100644 crates/core/src/protobuf.rs create mode 100644 tools/restatectl/src/commands/cluster/provision.rs diff --git a/Cargo.lock b/Cargo.lock index c8775bdb9..660dce222 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6582,6 +6582,7 @@ dependencies = [ "metrics-exporter-prometheus", "metrics-tracing-context", "metrics-util", + "prost-dto", "prost-types", "restate-admin", "restate-bifrost", diff --git a/crates/core/protobuf/node_ctl_svc.proto b/crates/core/protobuf/node_ctl_svc.proto index 673d9e4dc..fa45f4047 100644 --- a/crates/core/protobuf/node_ctl_svc.proto +++ b/crates/core/protobuf/node_ctl_svc.proto @@ -35,15 +35,14 @@ message ProvisionClusterRequest { enum ProvisionClusterResponseKind { ProvisionClusterResponseType_UNKNOWN = 0; - ERROR = 1; - SUCCESS = 2; - DRY_RUN = 3; + DRY_RUN = 1; + NEWLY_PROVISIONED = 2; + ALREADY_PROVISIONED = 3; } message ProvisionClusterResponse { ProvisionClusterResponseKind kind = 1; - // If there is an error, this field will be set. All other fields will be empty. - optional string error = 2; + // This field will be empty if the cluster is already provisioned optional restate.cluster.ClusterConfiguration cluster_configuration = 3; } diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs index 70ee1e239..12a08c114 100644 --- a/crates/core/src/lib.rs +++ b/crates/core/src/lib.rs @@ -14,6 +14,7 @@ pub mod metadata_store; mod metric_definitions; pub mod network; pub mod partitions; +pub mod protobuf; pub mod task_center; pub mod worker_api; pub use error::*; diff --git a/crates/core/src/network/protobuf.rs b/crates/core/src/network/protobuf.rs index 011c7cbd4..79214939e 100644 --- a/crates/core/src/network/protobuf.rs +++ b/crates/core/src/network/protobuf.rs @@ -8,41 +8,6 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. 
-pub mod node_ctl_svc { - use restate_types::protobuf::cluster::ClusterConfiguration; - - tonic::include_proto!("restate.node_ctl_svc"); - - pub const FILE_DESCRIPTOR_SET: &[u8] = - tonic::include_file_descriptor_set!("node_ctl_svc_descriptor"); - - impl ProvisionClusterResponse { - pub fn dry_run(cluster_configuration: ClusterConfiguration) -> Self { - ProvisionClusterResponse { - kind: ProvisionClusterResponseKind::DryRun.into(), - cluster_configuration: Some(cluster_configuration), - ..Default::default() - } - } - - pub fn err(message: impl ToString) -> Self { - ProvisionClusterResponse { - kind: ProvisionClusterResponseKind::Error.into(), - error: Some(message.to_string()), - ..Default::default() - } - } - - pub fn success(cluster_configuration: ClusterConfiguration) -> Self { - ProvisionClusterResponse { - kind: ProvisionClusterResponseKind::Success.into(), - cluster_configuration: Some(cluster_configuration), - ..Default::default() - } - } - } -} - pub mod core_node_svc { tonic::include_proto!("restate.core_node_svc"); diff --git a/crates/core/src/protobuf.rs b/crates/core/src/protobuf.rs new file mode 100644 index 000000000..31a473775 --- /dev/null +++ b/crates/core/src/protobuf.rs @@ -0,0 +1,41 @@ +// Copyright (c) 2023 - 2024 Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +pub mod node_ctl_svc { + use restate_types::protobuf::cluster::ClusterConfiguration; + + tonic::include_proto!("restate.node_ctl_svc"); + + pub const FILE_DESCRIPTOR_SET: &[u8] = + tonic::include_file_descriptor_set!("node_ctl_svc_descriptor"); + + impl ProvisionClusterResponse { + pub fn dry_run(cluster_configuration: ClusterConfiguration) -> Self { + ProvisionClusterResponse { + kind: ProvisionClusterResponseKind::DryRun.into(), + cluster_configuration: Some(cluster_configuration), + } + } + + pub fn newly_provisioned(cluster_configuration: ClusterConfiguration) -> Self { + ProvisionClusterResponse { + kind: ProvisionClusterResponseKind::NewlyProvisioned.into(), + cluster_configuration: Some(cluster_configuration), + } + } + + pub fn already_provisioned() -> Self { + ProvisionClusterResponse { + kind: ProvisionClusterResponseKind::AlreadyProvisioned.into(), + ..Default::default() + } + } + } +} diff --git a/crates/node/Cargo.toml b/crates/node/Cargo.toml index 9881f52e6..e5b19f647 100644 --- a/crates/node/Cargo.toml +++ b/crates/node/Cargo.toml @@ -54,6 +54,7 @@ metrics = { workspace = true } metrics-exporter-prometheus = { workspace = true } metrics-tracing-context = { workspace = true } metrics-util = { workspace = true } +prost-dto = { workspace = true } prost-types = { workspace = true } rocksdb = { workspace = true } schemars = { workspace = true, optional = true } diff --git a/crates/node/src/init.rs b/crates/node/src/init.rs index 789d3c171..61c38ab7a 100644 --- a/crates/node/src/init.rs +++ b/crates/node/src/init.rs @@ -115,6 +115,8 @@ impl<'a> NodeInit<'a> { self.sync_metadata().await; + info!("Node initialization complete"); + Ok(()) } diff --git a/crates/node/src/lib.rs b/crates/node/src/lib.rs index f21016af6..e13ca3bb9 100644 --- a/crates/node/src/lib.rs +++ b/crates/node/src/lib.rs @@ -25,14 +25,12 @@ use codederror::CodedError; use restate_bifrost::BifrostService; use 
restate_core::metadata_store::ReadWriteError; use restate_core::network::net_util::create_tonic_channel_from_advertised_address; -use restate_core::network::protobuf::node_ctl_svc::node_ctl_svc_client::NodeCtlSvcClient; -use restate_core::network::protobuf::node_ctl_svc::{ - ProvisionClusterRequest, ProvisionClusterResponseKind, -}; use restate_core::network::{ GrpcConnector, MessageRouterBuilder, NetworkServerBuilder, Networking, }; use restate_core::partitions::{spawn_partition_routing_refresher, PartitionRoutingRefresher}; +use restate_core::protobuf::node_ctl_svc::node_ctl_svc_client::NodeCtlSvcClient; +use restate_core::protobuf::node_ctl_svc::{ProvisionClusterRequest, ProvisionClusterResponseKind}; use restate_core::{cancellation_watcher, Metadata, TaskKind}; use restate_core::{spawn_metadata_manager, MetadataBuilder, MetadataManager, TaskCenter}; #[cfg(feature = "replicated-loglet")] @@ -389,12 +387,20 @@ impl Node { .await; match response { - Ok(response) => { - if response.into_inner().kind() == ProvisionClusterResponseKind::DryRun - { - debug!("The cluster is already provisioned."); + Ok(response) => match response.into_inner().kind() { + ProvisionClusterResponseKind::ProvisionClusterResponseTypeUnknown => { + panic!("unknown cluster response type") } - } + ProvisionClusterResponseKind::DryRun => { + unreachable!("call w/o dry run") + } + ProvisionClusterResponseKind::NewlyProvisioned => { + debug!("Successfully auto provisioned the cluster") + } + ProvisionClusterResponseKind::AlreadyProvisioned => { + debug!("The cluster is already provisioned.") + } + }, Err(err) => { warn!("Failed to auto provision the cluster. In order to continue you have to provision the cluster manually: {err}"); } diff --git a/crates/node/src/network_server/grpc_svc_handler.rs b/crates/node/src/network_server/grpc_svc_handler.rs index dcc0a8040..1fd84d2db 100644 --- a/crates/node/src/network_server/grpc_svc_handler.rs +++ b/crates/node/src/network_server/grpc_svc_handler.rs @@ -13,17 +13,17 @@ use bytes::BytesMut; use bytestring::ByteString; use enumset::EnumSet; use futures::stream::BoxStream; +use prost_dto::IntoProto; use restate_core::metadata_store::{ retry_on_network_error, MetadataStoreClient, Precondition, WriteError, }; use restate_core::network::protobuf::core_node_svc::core_node_svc_server::CoreNodeSvc; -use restate_core::network::protobuf::node_ctl_svc::node_ctl_svc_server::NodeCtlSvc; -use restate_core::network::protobuf::node_ctl_svc::{ +use restate_core::network::{ConnectionManager, ProtocolError, TransportConnect}; +use restate_core::protobuf::node_ctl_svc::node_ctl_svc_server::NodeCtlSvc; +use restate_core::protobuf::node_ctl_svc::{ GetMetadataRequest, GetMetadataResponse, IdentResponse, ProvisionClusterRequest, ProvisionClusterResponse, ProvisionClusterResponseKind, }; -use restate_core::network::ConnectionManager; -use restate_core::network::{ProtocolError, TransportConnect}; use restate_core::task_center::TaskCenterMonitoring; use restate_core::{task_center, Metadata, MetadataKind, TargetVersion}; use restate_types::config::{CommonOptions, Configuration}; @@ -66,7 +66,9 @@ impl NodeCtlSvcHandler { } } - async fn provision_metadata( + /// Provision the cluster metadata. Returns `true` if the cluster was newly provisioned. Returns + /// `false` if the cluster is already provisioned. 
+ async fn provision_cluster_metadata( &self, common_opts: &CommonOptions, cluster_configuration: &ClusterConfiguration, @@ -127,12 +129,12 @@ impl NodeCtlSvcHandler { .with_equally_sized_partitions(cluster_configuration.num_partitions.get()) .expect("Empty partition table should not have conflicts"); initial_partition_table_builder - .set_replication_strategy(cluster_configuration.placement_strategy); + .set_replication_strategy(cluster_configuration.replication_strategy); let initial_partition_table = initial_partition_table_builder.build(); let mut logs_builder = Logs::default().into_builder(); logs_builder.set_configuration(LogsConfiguration::from( - cluster_configuration.log_provider.clone(), + cluster_configuration.default_provider.clone(), )); let initial_logs = logs_builder.build(); @@ -180,28 +182,22 @@ impl NodeCtlSvcHandler { }) .transpose()? .unwrap_or(config.common.bootstrap_num_partitions); - let placement_strategy = request + let replication_strategy = request .placement_strategy .map(ReplicationStrategy::try_from) .transpose()? .unwrap_or_default(); - let log_provider = request + let default_provider = request .log_provider .map(DefaultProvider::try_from) .unwrap_or_else(|| Ok(DefaultProvider::from_configuration(config)))?; Ok(ClusterConfiguration { num_partitions, - placement_strategy, - log_provider, + replication_strategy, + default_provider, }) } - - fn convert_cluster_configuration( - _cluster_configuration: ClusterConfiguration, - ) -> ProtoClusterConfiguration { - todo!() - } } #[async_trait::async_trait] @@ -282,30 +278,41 @@ impl NodeCtlSvc for NodeCtlSvcHandler { if dry_run { return Ok(Response::new(ProvisionClusterResponse { kind: ProvisionClusterResponseKind::DryRun.into(), - cluster_configuration: Some(Self::convert_cluster_configuration( - cluster_configuration, - )), - ..Default::default() + cluster_configuration: Some(ProtoClusterConfiguration::from(cluster_configuration)), })); } - self.provision_metadata(&config.common, &cluster_configuration) + let newly_provisioned = self + .provision_cluster_metadata(&config.common, &cluster_configuration) .await .map_err(|err| Status::internal(err.to_string()))?; + let kind = if newly_provisioned { + ProvisionClusterResponseKind::NewlyProvisioned + } else { + ProvisionClusterResponseKind::AlreadyProvisioned + }; + Ok(Response::new(ProvisionClusterResponse { - kind: ProvisionClusterResponseKind::Success.into(), - cluster_configuration: Some(Self::convert_cluster_configuration(cluster_configuration)), - ..Default::default() + kind: kind.into(), + cluster_configuration: Some(ProtoClusterConfiguration::from(cluster_configuration)), })) } } -#[derive(Clone, Debug)] -struct ClusterConfiguration { - num_partitions: NonZeroU16, - placement_strategy: ReplicationStrategy, - log_provider: DefaultProvider, +#[derive(Clone, Debug, IntoProto)] +#[proto(target = "restate_types::protobuf::cluster::ClusterConfiguration")] +pub struct ClusterConfiguration { + #[into_proto(map = "num_partitions_to_u32")] + pub num_partitions: NonZeroU16, + #[proto(required)] + pub replication_strategy: ReplicationStrategy, + #[proto(required)] + pub default_provider: DefaultProvider, +} + +fn num_partitions_to_u32(num_partitions: NonZeroU16) -> u32 { + u32::from(num_partitions.get()) } pub struct CoreNodeSvcHandler { diff --git a/crates/node/src/network_server/service.rs b/crates/node/src/network_server/service.rs index 5e208a572..86cfeb6cd 100644 --- a/crates/node/src/network_server/service.rs +++ b/crates/node/src/network_server/service.rs @@ -17,9 
+17,9 @@ use tracing::{debug, trace};
 use restate_core::metadata_store::MetadataStoreClient;
 use restate_core::network::protobuf::core_node_svc::core_node_svc_server::CoreNodeSvcServer;
-use restate_core::network::protobuf::node_ctl_svc::node_ctl_svc_server::NodeCtlSvcServer;
 use restate_core::network::tonic_service_filter::{TonicServiceFilter, WaitForReady};
 use restate_core::network::{ConnectionManager, NetworkServerBuilder, TransportConnect};
+use restate_core::protobuf::node_ctl_svc::node_ctl_svc_server::NodeCtlSvcServer;
 use restate_core::{cancellation_watcher, TaskCenter, TaskKind};
 use restate_types::config::CommonOptions;
 use restate_types::health::Health;
@@ -109,7 +109,7 @@ impl NetworkServer {
             ))
             .accept_compressed(CompressionEncoding::Gzip)
             .send_compressed(CompressionEncoding::Gzip),
-            restate_core::network::protobuf::node_ctl_svc::FILE_DESCRIPTOR_SET,
+            restate_core::protobuf::node_ctl_svc::FILE_DESCRIPTOR_SET,
         );
 
         server_builder.register_grpc_service(
diff --git a/tools/restatectl/src/commands/cluster/config.rs b/tools/restatectl/src/commands/cluster/config.rs
index 620a2f488..576a2e78f 100644
--- a/tools/restatectl/src/commands/cluster/config.rs
+++ b/tools/restatectl/src/commands/cluster/config.rs
@@ -27,7 +27,7 @@ pub enum Config {
     Set(set::ConfigSetOpts),
 }
 
-fn cluster_config_string(config: ClusterConfiguration) -> anyhow::Result<String> {
+pub fn cluster_config_string(config: &ClusterConfiguration) -> anyhow::Result<String> {
     let mut w = String::default();
 
     writeln!(w, "⚙️ Cluster Configuration")?;
@@ -43,8 +43,12 @@ fn cluster_config_string(config: ClusterConfiguration) -> anyhow::Result<String> {
 
     write_leaf(&mut w, 1, false, "Bifrost replication strategy", strategy)?;
 
-    let provider: DefaultProvider = config.default_provider.unwrap_or_default().try_into()?;
-    write_default_provider(&mut w, 1, provider)?;
+    let provider: DefaultProvider = config
+        .default_provider
+        .clone()
+        .unwrap_or_default()
+        .try_into()?;
+    write_default_provider(&mut w, 1, &provider)?;
 
     Ok(w)
 }
@@ -52,7 +56,7 @@ fn cluster_config_string(config: ClusterConfiguration) -> anyhow::Result<String> {
 fn write_default_provider<W: fmt::Write>(
     w: &mut W,
     depth: usize,
-    provider: DefaultProvider,
+    provider: &DefaultProvider,
 ) -> Result<(), fmt::Error> {
     let title = "Bifrost Provider";
     match provider {
diff --git a/tools/restatectl/src/commands/cluster/config/get.rs b/tools/restatectl/src/commands/cluster/config/get.rs
index 807f3968a..538997521 100644
--- a/tools/restatectl/src/commands/cluster/config/get.rs
+++ b/tools/restatectl/src/commands/cluster/config/get.rs
@@ -56,7 +56,7 @@ async fn config_get(connection: &ConnectionInfo, _get_opts: &ConfigGetOpts) -> anyhow::Result<()> {
     let configuration = response.into_inner();
     let cluster_configuration = configuration.cluster_configuration.expect("is set");
 
-    let output = cluster_config_string(cluster_configuration)?;
+    let output = cluster_config_string(&cluster_configuration)?;
 
     c_println!("{}", output);
 
diff --git a/tools/restatectl/src/commands/cluster/config/set.rs b/tools/restatectl/src/commands/cluster/config/set.rs
index 5c23c01f2..5d9ddcab9 100644
--- a/tools/restatectl/src/commands/cluster/config/set.rs
+++ b/tools/restatectl/src/commands/cluster/config/set.rs
@@ -20,15 +20,14 @@ use restate_admin::cluster_controller::protobuf::{
     cluster_ctrl_svc_client::ClusterCtrlSvcClient, GetClusterConfigurationRequest,
 };
 use restate_cli_util::_comfy_table::{Cell, Color, Table};
-use restate_cli_util::c_println;
 use restate_cli_util::ui::console::{confirm_or_exit, StyledTable};
-use restate_types::logs::metadata::{
-    DefaultProvider, NodeSetSelectionStrategy, ProviderKind, ReplicatedLogletConfig,
-};
+use restate_cli_util::{c_println, c_warn};
+use restate_types::logs::metadata::{DefaultProvider, NodeSetSelectionStrategy, ProviderKind};
 use restate_types::partition_table::ReplicationStrategy;
 use restate_types::replicated_loglet::ReplicationProperty;
 
 use crate::commands::cluster::config::cluster_config_string;
+use crate::commands::cluster::provision::extract_default_provider;
 use crate::{app::ConnectionInfo, util::grpc_connect};
 
 #[derive(Run, Parser, Collect, Clone, Debug)]
@@ -77,7 +76,7 @@ async fn config_set(connection: &ConnectionInfo, set_opts: &ConfigSetOpts) -> anyhow::Result<()> {
 
     let mut current = response.cluster_configuration.expect("must be set");
 
-    let current_config_string = cluster_config_string(current.clone())?;
+    let current_config_string = cluster_config_string(&current)?;
 
     if let Some(num_partitions) = set_opts.num_partitions {
         current.num_partitions = num_partitions.get();
@@ -88,27 +87,25 @@ async fn config_set(connection: &ConnectionInfo, set_opts: &ConfigSetOpts) -> anyhow::Result<()> {
     }
 
     if let Some(provider) = set_opts.bifrost_provider {
-        let default_provider = match provider {
-            ProviderKind::InMemory => DefaultProvider::InMemory,
-            ProviderKind::Local => DefaultProvider::Local,
-            ProviderKind::Replicated => {
-                let config = ReplicatedLogletConfig {
-                    replication_property: set_opts
-                        .replication_property
-                        .clone()
-                        .expect("is required"),
-                    nodeset_selection_strategy: set_opts
-                        .nodeset_selection_strategy
-                        .unwrap_or_default(),
-                };
-                DefaultProvider::Replicated(config)
+        let default_provider = extract_default_provider(
+            provider,
+            set_opts.replication_property.clone(),
+            set_opts.nodeset_selection_strategy,
+        );
+
+        match default_provider {
+            DefaultProvider::InMemory | DefaultProvider::Local => {
+                c_warn!("You are about to reconfigure your cluster with a Bifrost provider that only supports a single node cluster.");
             }
-        };
+            DefaultProvider::Replicated(_) => {
+                // nothing to do
+            }
+        }
 
         current.default_provider = Some(default_provider.into());
     }
 
-    let updated_config_string = cluster_config_string(current.clone())?;
+    let updated_config_string = cluster_config_string(&current)?;
 
     let mut diff_table = Table::new_styled();
diff --git a/tools/restatectl/src/commands/cluster/mod.rs b/tools/restatectl/src/commands/cluster/mod.rs
index 4fce65e76..431300831 100644
--- a/tools/restatectl/src/commands/cluster/mod.rs
+++ b/tools/restatectl/src/commands/cluster/mod.rs
@@ -10,11 +10,13 @@
 
 mod config;
 pub(crate) mod overview;
+mod provision;
 
 use cling::prelude::*;
 use config::Config;
 
 use crate::commands::cluster::overview::ClusterStatusOpts;
+use crate::commands::cluster::provision::ProvisionOpts;
 
 #[derive(Run, Subcommand, Clone)]
 pub enum Cluster {
@@ -23,4 +25,6 @@ pub enum Cluster {
     /// Manage cluster configuration
     #[clap(subcommand)]
     Config(Config),
+    /// Provision a new cluster
+    Provision(ProvisionOpts),
 }
diff --git a/tools/restatectl/src/commands/cluster/provision.rs b/tools/restatectl/src/commands/cluster/provision.rs
new file mode 100644
index 000000000..60e6d3b16
--- /dev/null
+++ b/tools/restatectl/src/commands/cluster/provision.rs
@@ -0,0 +1,184 @@
+// Copyright (c) 2023 - 2024 Restate Software, Inc., Restate GmbH.
+// All rights reserved.
+//
+// Use of this software is governed by the Business Source License
+// included in the LICENSE file.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0.
+
+use crate::app::ConnectionInfo;
+use crate::commands::cluster::config::cluster_config_string;
+use crate::util::grpc_connect;
+use anyhow::Context;
+use clap::Parser;
+use cling::{Collect, Run};
+use restate_cli_util::ui::console::confirm_or_exit;
+use restate_cli_util::{c_error, c_println, c_warn};
+use restate_core::protobuf::node_ctl_svc::node_ctl_svc_client::NodeCtlSvcClient;
+use restate_core::protobuf::node_ctl_svc::{ProvisionClusterRequest, ProvisionClusterResponseKind};
+use restate_types::logs::metadata::{
+    DefaultProvider, NodeSetSelectionStrategy, ProviderKind, ReplicatedLogletConfig,
+};
+use restate_types::net::AdvertisedAddress;
+use restate_types::partition_table::ReplicationStrategy;
+use restate_types::replicated_loglet::ReplicationProperty;
+use std::num::NonZeroU16;
+use tonic::codec::CompressionEncoding;
+
+#[derive(Run, Parser, Collect, Clone, Debug)]
+#[cling(run = "cluster_provision")]
+pub struct ProvisionOpts {
+    /// Address of the node that should be provisioned
+    #[clap(long)]
+    address: Option<AdvertisedAddress>,
+
+    /// Number of partitions
+    #[clap(long)]
+    num_partitions: Option<NonZeroU16>,
+
+    /// Replication strategy. Possible values
+    /// are `on-all-nodes` or `factor(n)`
+    #[clap(long)]
+    replication_strategy: Option<ReplicationStrategy>,
+
+    /// Default log provider kind
+    #[clap(long)]
+    bifrost_provider: Option<ProviderKind>,
+
+    /// Replication property
+    #[clap(long, required_if_eq("bifrost_provider", "replicated"))]
+    replication_property: Option<ReplicationProperty>,
+
+    /// Node set selection strategy
+    #[clap(long)]
+    nodeset_selection_strategy: Option<NodeSetSelectionStrategy>,
+}
+
+async fn cluster_provision(
+    connection_info: &ConnectionInfo,
+    provision_opts: &ProvisionOpts,
+) -> anyhow::Result<()> {
+    let node_address = provision_opts
+        .address
+        .clone()
+        .unwrap_or_else(|| connection_info.cluster_controller.clone());
+    let channel = grpc_connect(node_address.clone())
+        .await
+        .with_context(|| format!("cannot connect to node at {}", node_address))?;
+
+    let mut client = NodeCtlSvcClient::new(channel).accept_compressed(CompressionEncoding::Gzip);
+
+    let log_provider = provision_opts.bifrost_provider.map(|bifrost_provider| {
+        extract_default_provider(
+            bifrost_provider,
+            provision_opts.replication_property.clone(),
+            provision_opts.nodeset_selection_strategy,
+        )
+    });
+
+    let request = ProvisionClusterRequest {
+        dry_run: true,
+        num_partitions: provision_opts.num_partitions.map(|n| u32::from(n.get())),
+        placement_strategy: provision_opts.replication_strategy.map(Into::into),
+        log_provider: log_provider.map(Into::into),
+    };
+
+    let response = match client.provision_cluster(request).await {
+        Ok(response) => response.into_inner(),
+        Err(err) => {
+            c_error!(
+                "Failed to provision cluster during dry run: {}",
+                err.message()
+            );
+            return Ok(());
+        }
+    };
+
+    let cluster_configuration_to_provision = match response.kind() {
+        ProvisionClusterResponseKind::ProvisionClusterResponseTypeUnknown => {
+            panic!("unknown cluster response type")
+        }
+        ProvisionClusterResponseKind::DryRun => response
+            .cluster_configuration
+            .expect("dry run response needs to carry a cluster configuration"),
+        ProvisionClusterResponseKind::NewlyProvisioned => {
+            unreachable!("provisioning a cluster with dry run should not have an effect")
+        }
+        ProvisionClusterResponseKind::AlreadyProvisioned => {
+            c_println!("🏃 The cluster has already been provisioned.");
+            return Ok(());
+        }
+    };
+
+    c_println!(
+        "{}",
+        cluster_config_string(&cluster_configuration_to_provision)?
+    );
+
+    if let Some(default_provider) = &cluster_configuration_to_provision.default_provider {
+        let default_provider = DefaultProvider::try_from(default_provider.clone())?;
+
+        match default_provider {
+            DefaultProvider::InMemory | DefaultProvider::Local => {
+                c_warn!("You are about to provision a cluster with a Bifrost provider that only supports a single node cluster.");
+            }
+            DefaultProvider::Replicated(_) => {
+                // nothing to do
+            }
+        }
+    }
+
+    confirm_or_exit("Provision cluster with this configuration?")?;
+
+    let request = ProvisionClusterRequest {
+        dry_run: false,
+        num_partitions: Some(cluster_configuration_to_provision.num_partitions),
+        placement_strategy: cluster_configuration_to_provision.replication_strategy,
+        log_provider: cluster_configuration_to_provision.default_provider,
+    };
+
+    let response = match client.provision_cluster(request).await {
+        Ok(response) => response.into_inner(),
+        Err(err) => {
+            c_error!("Failed to provision cluster: {}", err.message());
+            return Ok(());
+        }
+    };
+
+    match response.kind() {
+        ProvisionClusterResponseKind::ProvisionClusterResponseTypeUnknown => {
+            panic!("unknown provision cluster response kind")
+        }
+        ProvisionClusterResponseKind::DryRun => {
+            unreachable!("provisioning a cluster w/o dry run should have an effect")
+        }
+        ProvisionClusterResponseKind::NewlyProvisioned => {
+            c_println!("✅ Cluster has been successfully provisioned.");
+        }
+        ProvisionClusterResponseKind::AlreadyProvisioned => {
+            c_println!("🤷 Cluster has been provisioned by somebody else.");
+        }
+    }
+
+    Ok(())
+}
+
+pub fn extract_default_provider(
+    bifrost_provider: ProviderKind,
+    replication_property: Option<ReplicationProperty>,
+    nodeset_selection_strategy: Option<NodeSetSelectionStrategy>,
+) -> DefaultProvider {
+    match bifrost_provider {
+        ProviderKind::InMemory => DefaultProvider::InMemory,
+        ProviderKind::Local => DefaultProvider::Local,
+        ProviderKind::Replicated => {
+            let config = ReplicatedLogletConfig {
+                replication_property: replication_property.clone().expect("is required"),
+                nodeset_selection_strategy: nodeset_selection_strategy.unwrap_or_default(),
+            };
+            DefaultProvider::Replicated(config)
+        }
+    }
+}
diff --git a/tools/restatectl/src/commands/node/list_nodes.rs b/tools/restatectl/src/commands/node/list_nodes.rs
index fbcc1b5a4..fdfa159dc 100644
--- a/tools/restatectl/src/commands/node/list_nodes.rs
+++ b/tools/restatectl/src/commands/node/list_nodes.rs
@@ -24,8 +24,8 @@ use restate_cli_util::_comfy_table::{Cell, Table};
 use restate_cli_util::c_println;
 use restate_cli_util::ui::console::StyledTable;
 use restate_cli_util::ui::{duration_to_human_rough, Tense};
-use restate_core::network::protobuf::node_ctl_svc::node_ctl_svc_client::NodeCtlSvcClient;
-use restate_core::network::protobuf::node_ctl_svc::IdentResponse;
+use restate_core::protobuf::node_ctl_svc::node_ctl_svc_client::NodeCtlSvcClient;
+use restate_core::protobuf::node_ctl_svc::IdentResponse;
 use restate_types::nodes_config::NodesConfiguration;
 use restate_types::storage::StorageCodec;
 use restate_types::PlainNodeId;
diff --git a/tools/restatectl/src/commands/replicated_loglet/digest.rs b/tools/restatectl/src/commands/replicated_loglet/digest.rs
index 805f2151e..fb31e7c77 100644
--- a/tools/restatectl/src/commands/replicated_loglet/digest.rs
+++ b/tools/restatectl/src/commands/replicated_loglet/digest.rs
@@ -19,8 +19,8 @@ use restate_bifrost::providers::replicated_loglet::replication::NodeSetChecker;
 use restate_cli_util::_comfy_table::{Cell, Color, Table};
 use restate_cli_util::c_println;
 use restate_cli_util::ui::console::StyledTable;
-use restate_core::network::protobuf::node_ctl_svc::node_ctl_svc_client::NodeCtlSvcClient;
-use restate_core::network::protobuf::node_ctl_svc::GetMetadataRequest;
+use restate_core::protobuf::node_ctl_svc::node_ctl_svc_client::NodeCtlSvcClient;
+use restate_core::protobuf::node_ctl_svc::GetMetadataRequest;
 use restate_core::MetadataKind;
 use restate_log_server::protobuf::log_server_svc_client::LogServerSvcClient;
 use restate_log_server::protobuf::GetDigestRequest;
diff --git a/tools/restatectl/src/commands/replicated_loglet/info.rs b/tools/restatectl/src/commands/replicated_loglet/info.rs
index f5b9d56e7..604a8084b 100644
--- a/tools/restatectl/src/commands/replicated_loglet/info.rs
+++ b/tools/restatectl/src/commands/replicated_loglet/info.rs
@@ -13,8 +13,8 @@ use cling::prelude::*;
 use tonic::codec::CompressionEncoding;
 
 use restate_cli_util::{c_indentln, c_println};
-use restate_core::network::protobuf::node_ctl_svc::node_ctl_svc_client::NodeCtlSvcClient;
-use restate_core::network::protobuf::node_ctl_svc::GetMetadataRequest;
+use restate_core::protobuf::node_ctl_svc::node_ctl_svc_client::NodeCtlSvcClient;
+use restate_core::protobuf::node_ctl_svc::GetMetadataRequest;
 use restate_core::MetadataKind;
 use restate_types::logs::metadata::Logs;
 use restate_types::logs::LogletId;

From ae976863c5d0e567a07206eaebe53daa8731ef83 Mon Sep 17 00:00:00 2001
From: Till Rohrmann
Date: Fri, 20 Dec 2024 18:02:45 +0100
Subject: [PATCH 3/6] Remove support for changing the number of partitions via
 restatectl cluster config set

---
 .../cluster_controller/grpc_svc_handler.rs    |  6 ---
 .../admin/src/cluster_controller/service.rs   | 48 ++++---------------
 .../src/commands/cluster/config/set.rs        | 24 ++--------
 .../src/commands/cluster/provision.rs         |  8 +---
 4 files changed, 12 insertions(+), 74 deletions(-)

diff --git a/crates/admin/src/cluster_controller/grpc_svc_handler.rs b/crates/admin/src/cluster_controller/grpc_svc_handler.rs
index f9c820239..2cea5b963 100644
--- a/crates/admin/src/cluster_controller/grpc_svc_handler.rs
+++ b/crates/admin/src/cluster_controller/grpc_svc_handler.rs
@@ -8,7 +8,6 @@
 // the Business Source License, use of this software will be governed
 // by the Apache License, Version 2.0.
-use std::num::NonZeroU16;
 use std::time::Duration;
 
 use bytes::{Bytes, BytesMut};
@@ -324,11 +323,6 @@ impl ClusterCtrlSvc for ClusterCtrlSvcHandler {
 
         self.controller_handle
             .update_cluster_configuration(
-                NonZeroU16::new(
-                    u16::try_from(request.num_partitions)
-                        .map_err(|_| Status::invalid_argument("num_partitions is too big"))?,
-                )
-                .ok_or(Status::invalid_argument("num_partitions cannot be zero"))?,
                 request
                     .replication_strategy
                     .ok_or_else(|| {
diff --git a/crates/admin/src/cluster_controller/service.rs b/crates/admin/src/cluster_controller/service.rs
index 9c68f9c91..2a213459c 100644
--- a/crates/admin/src/cluster_controller/service.rs
+++ b/crates/admin/src/cluster_controller/service.rs
@@ -10,7 +10,6 @@
 
 mod state;
 
-use std::num::NonZeroU16;
 use std::str::FromStr;
 use std::sync::Arc;
 use std::time::Duration;
@@ -181,7 +180,6 @@ enum ClusterControllerCommand {
         response_tx: oneshot::Sender>,
     },
     UpdateClusterConfiguration {
-        num_partitions: NonZeroU16,
         replication_strategy: ReplicationStrategy,
         default_provider: DefaultProvider,
         response_tx: oneshot::Sender<anyhow::Result<()>>,
@@ -247,7 +245,6 @@ impl ClusterControllerHandle {
 
     pub async fn update_cluster_configuration(
         &self,
-        num_partitions: NonZeroU16,
         replication_strategy: ReplicationStrategy,
         default_provider: DefaultProvider,
     ) -> Result<anyhow::Result<()>, ShutdownError> {
@@ -256,7 +253,6 @@ impl ClusterControllerHandle {
         let _ = self
             .tx
             .send(ClusterControllerCommand::UpdateClusterConfiguration {
-                num_partitions,
                 replication_strategy,
                 default_provider,
                 response_tx,
@@ -407,23 +403,13 @@ impl Service {
 
     async fn update_cluster_configuration(
         &self,
-        num_partitions: u16,
         replication_strategy: ReplicationStrategy,
         default_provider: DefaultProvider,
     ) -> anyhow::Result<()> {
         let logs = self
             .metadata_store_client
             .read_modify_write(BIFROST_CONFIG_KEY.clone(), |current: Option<Logs>| {
-                let logs = match current {
-                    Some(logs) => logs,
-                    None => {
-                        let mut builder = Logs::empty().into_builder();
-                        builder.set_configuration(LogsConfiguration {
-                            default_provider: default_provider.clone(),
-                        });
-                        return Ok(builder.build());
-                    }
-                };
+                let logs = current.ok_or(ClusterConfigurationUpdateError::MissingLogs)?;
 
                 // we can only change the default provider
                 if logs.version() != Version::INVALID
@@ -468,25 +454,10 @@ impl Service {
             .read_modify_write(
                 PARTITION_TABLE_KEY.clone(),
                 |current: Option<PartitionTable>| {
-                    let partition_table = match current {
-                        Some(partition_table) => partition_table,
-                        None => {
-                            // while not possible because we always initialize a partition table
-                            // we still can just create and return a new one
-                            let mut builder = PartitionTableBuilder::default();
-                            builder.with_equally_sized_partitions(num_partitions)?;
-                            builder.set_replication_strategy(replication_strategy);
-
-                            return Ok(builder.build());
-                        }
-                    };
+                    let partition_table =
+                        current.ok_or(ClusterConfigurationUpdateError::MissingPartitionTable)?;
 
                     let mut builder: PartitionTableBuilder = partition_table.into();
-                    if builder.num_partitions() != 0 && builder.num_partitions() != num_partitions {
-                        return Err(ClusterConfigurationUpdateError::RepartitionNotSupported);
-                    } else if builder.num_partitions() != num_partitions {
-                        builder.with_equally_sized_partitions(num_partitions)?;
-                    }
 
                     if builder.replication_strategy() != replication_strategy {
                         builder.set_replication_strategy(replication_strategy);
@@ -570,17 +541,12 @@ impl Service {
                     .await;
             }
             ClusterControllerCommand::UpdateClusterConfiguration {
-                num_partitions,
                 replication_strategy,
                 default_provider,
                 response_tx,
            } => {
                 let result = self
-                    .update_cluster_configuration(
-                        num_partitions.get(),
-                        replication_strategy,
-                        default_provider,
-                    )
+                    .update_cluster_configuration(replication_strategy, default_provider)
                     .await;
                 let _ = response_tx.send(result);
             }
@@ -628,12 +594,14 @@ async fn sync_cluster_controller_metadata() -> anyhow::Result<()> {
 enum ClusterConfigurationUpdateError {
     #[error("Unchanged")]
     Unchanged,
-    #[error("Repartitioning is not supported")]
-    RepartitionNotSupported,
     #[error("Changing default provider kind is not supported")]
     ChangingDefaultProviderNotSupported,
     #[error(transparent)]
     BuildError(#[from] partition_table::BuilderError),
+    #[error("missing logs; the cluster does not seem to be provisioned")]
+    MissingLogs,
+    #[error("missing partition table; the cluster does not seem to be provisioned")]
+    MissingPartitionTable,
 }
 
 #[derive(Clone)]
diff --git a/tools/restatectl/src/commands/cluster/config/set.rs b/tools/restatectl/src/commands/cluster/config/set.rs
index 5d9ddcab9..bd9b1fad6 100644
--- a/tools/restatectl/src/commands/cluster/config/set.rs
+++ b/tools/restatectl/src/commands/cluster/config/set.rs
@@ -8,8 +8,6 @@
 // the Business Source License, use of this software will be governed
 // by the Apache License, Version 2.0.
 
-use std::num::NonZeroU32;
-
 use anyhow::Context;
 use clap::Parser;
 use cling::{Collect, Run};
@@ -22,7 +20,7 @@ use restate_admin::cluster_controller::protobuf::{
 use restate_cli_util::_comfy_table::{Cell, Color, Table};
 use restate_cli_util::ui::console::{confirm_or_exit, StyledTable};
 use restate_cli_util::{c_println, c_warn};
-use restate_types::logs::metadata::{DefaultProvider, NodeSetSelectionStrategy, ProviderKind};
+use restate_types::logs::metadata::{DefaultProvider, ProviderKind};
 use restate_types::partition_table::ReplicationStrategy;
 use restate_types::replicated_loglet::ReplicationProperty;
 
@@ -33,11 +31,6 @@ use crate::{app::ConnectionInfo, util::grpc_connect};
 #[derive(Run, Parser, Collect, Clone, Debug)]
 #[cling(run = "config_set")]
 pub struct ConfigSetOpts {
-    /// Number of partitions
-    // todo(azmy): This is temporary until we have the cluster provision command
-    #[clap(long)]
-    num_partitions: Option<NonZeroU32>,
-
     /// Replication strategy. Possible values
     /// are `on-all-nodes` or `factor(n)`
     #[clap(long)]
     replication_strategy: Option<ReplicationStrategy>,
@@ -50,10 +43,6 @@ pub struct ConfigSetOpts {
     /// Replication property
     #[clap(long, required_if_eq("bifrost_provider", "replicated"))]
     replication_property: Option<ReplicationProperty>,
-
-    /// Node set selection strategy
-    #[clap(long)]
-    nodeset_selection_strategy: Option<NodeSetSelectionStrategy>,
 }
 
 async fn config_set(connection: &ConnectionInfo, set_opts: &ConfigSetOpts) -> anyhow::Result<()> {
@@ -78,20 +67,13 @@ async fn config_set(connection: &ConnectionInfo, set_opts: &ConfigSetOpts) -> anyhow::Result<()> {
 
     let current_config_string = cluster_config_string(&current)?;
 
-    if let Some(num_partitions) = set_opts.num_partitions {
-        current.num_partitions = num_partitions.get();
-    }
-
     if let Some(replication_strategy) = set_opts.replication_strategy {
         current.replication_strategy = Some(replication_strategy.into());
     }
 
     if let Some(provider) = set_opts.bifrost_provider {
-        let default_provider = extract_default_provider(
-            provider,
-            set_opts.replication_property.clone(),
-            set_opts.nodeset_selection_strategy,
-        );
+        let default_provider =
+            extract_default_provider(provider, set_opts.replication_property.clone());
 
         match default_provider {
             DefaultProvider::InMemory | DefaultProvider::Local => {
diff --git a/tools/restatectl/src/commands/cluster/provision.rs b/tools/restatectl/src/commands/cluster/provision.rs
index 60e6d3b16..1df2cac1f 100644
--- a/tools/restatectl/src/commands/cluster/provision.rs
+++ b/tools/restatectl/src/commands/cluster/provision.rs
@@ -50,10 +50,6 @@ pub struct ProvisionOpts {
     /// Replication property
     #[clap(long, required_if_eq("bifrost_provider", "replicated"))]
     replication_property: Option<ReplicationProperty>,
-
-    /// Node set selection strategy
-    #[clap(long)]
-    nodeset_selection_strategy: Option<NodeSetSelectionStrategy>,
 }
 
 async fn cluster_provision(
@@ -74,7 +70,6 @@ async fn cluster_provision(
         extract_default_provider(
             bifrost_provider,
             provision_opts.replication_property.clone(),
-            provision_opts.nodeset_selection_strategy,
         )
     });
 
@@ -168,7 +163,6 @@ async fn cluster_provision(
 pub fn extract_default_provider(
     bifrost_provider: ProviderKind,
     replication_property: Option<ReplicationProperty>,
-    nodeset_selection_strategy: Option<NodeSetSelectionStrategy>,
 ) -> DefaultProvider {
     match bifrost_provider {
         ProviderKind::InMemory => DefaultProvider::InMemory,
@@ -176,7 +170,7 @@ pub fn extract_default_provider(
     ProviderKind::Replicated => {
             let config = ReplicatedLogletConfig {
                 replication_property: replication_property.clone().expect("is required"),
-                nodeset_selection_strategy: nodeset_selection_strategy.unwrap_or_default(),
+                nodeset_selection_strategy: NodeSetSelectionStrategy::default(),
             };
             DefaultProvider::Replicated(config)
         }

From faad643432071319317db870cee8b22e0c41b235 Mon Sep 17 00:00:00 2001
From: Till Rohrmann
Date: Fri, 20 Dec 2024 21:28:15 +0100
Subject: [PATCH 4/6] Re-enable replicated loglet test to run with a
 replication factor of 2

---
 Cargo.lock                                    |  2 +
 crates/local-cluster-runner/Cargo.toml        |  2 +
 .../local-cluster-runner/src/cluster/mod.rs   |  6 --
 crates/local-cluster-runner/src/node/mod.rs   | 92 +++++++++++++++----
 server/tests/cluster.rs                       | 25 ++++-
 5 files changed, 99 insertions(+), 28 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 660dce222..df241d1aa 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -6450,6 +6450,7 @@ dependencies = [
 name = "restate-local-cluster-runner"
 version = "1.1.6"
 dependencies = [
+ "anyhow",
 "arc-swap",
 "clap",
 "clap-verbosity-flag",
@@ -6461,6 +6462,7 @@ dependencies = [
 "rand",
 "regex",
 "reqwest",
+ "restate-core",
 "restate-metadata-store",
 "restate-types",
 "rev_lines",
diff --git a/crates/local-cluster-runner/Cargo.toml b/crates/local-cluster-runner/Cargo.toml
index 3f3e26fd9..708524c78 100644
--- a/crates/local-cluster-runner/Cargo.toml
+++ b/crates/local-cluster-runner/Cargo.toml
@@ -11,10 +11,12 @@ publish = false
 default = []
 
 [dependencies]
+restate-core = { workspace = true }
 restate-metadata-store = { workspace = true }
 # nb features here will also affect the compiled restate-server binary in integration tests
 restate-types = { workspace = true, features = ["unsafe-mutable-config"] }
 
+anyhow = { workspace = true }
 arc-swap = { workspace = true }
 clap = { workspace = true }
 clap-verbosity-flag = { workspace = true }
diff --git a/crates/local-cluster-runner/src/cluster/mod.rs b/crates/local-cluster-runner/src/cluster/mod.rs
index ba39cb02f..4f0ee4f12 100644
--- a/crates/local-cluster-runner/src/cluster/mod.rs
+++ b/crates/local-cluster-runner/src/cluster/mod.rs
@@ -102,12 +102,6 @@ impl Cluster {
             .start_clustered(base_dir.as_path(), &cluster_name)
             .await
             .map_err(|err| ClusterStartError::NodeStartError(i, err))?;
-        if node.admin_address().is_some() {
-            // admin nodes are needed for later nodes to bootstrap. we should wait until they are serving
-            HealthCheck::Admin
-                .wait_healthy(&node, Duration::from_secs(30))
-                .await?;
-        }
         started_nodes.push(node)
     }
 
diff --git a/crates/local-cluster-runner/src/node/mod.rs b/crates/local-cluster-runner/src/node/mod.rs
index a147666af..605e411e9 100644
--- a/crates/local-cluster-runner/src/node/mod.rs
+++ b/crates/local-cluster-runner/src/node/mod.rs
@@ -8,6 +8,31 @@
 // the Business Source License, use of this software will be governed
 // by the Apache License, Version 2.0.
 
+use crate::random_socket_address;
+use arc_swap::ArcSwapOption;
+use enumset::{enum_set, EnumSet};
+use futures::{stream, FutureExt, Stream, StreamExt, TryStreamExt};
+use itertools::Itertools;
+use regex::{Regex, RegexSet};
+use restate_core::network::net_util::create_tonic_channel_from_advertised_address;
+use restate_core::protobuf::node_ctl_svc::node_ctl_svc_client::NodeCtlSvcClient;
+use restate_core::protobuf::node_ctl_svc::{
+    ProvisionClusterRequest as ProtoProvisionClusterRequest, ProvisionClusterResponseKind,
+};
+use restate_types::logs::metadata::DefaultProvider;
+use restate_types::partition_table::ReplicationStrategy;
+use restate_types::retries::RetryPolicy;
+use restate_types::{
+    config::{Configuration, MetadataStoreClient},
+    errors::GenericError,
+    metadata_store::keys::NODES_CONFIG_KEY,
+    net::{AdvertisedAddress, BindAddress},
+    nodes_config::{NodesConfiguration, Role},
+    PlainNodeId,
+};
+use rev_lines::RevLines;
+use serde::{Deserialize, Serialize};
+use std::num::NonZeroU16;
 use std::{
     ffi::OsString,
     fmt::Display,
@@ -22,14 +47,6 @@ use std::{
     task::{Context, Poll},
     time::Duration,
 };
-
-use arc_swap::ArcSwapOption;
-use enumset::{enum_set, EnumSet};
-use futures::{stream, FutureExt, Stream, StreamExt, TryStreamExt};
-use itertools::Itertools;
-use regex::{Regex, RegexSet};
-use rev_lines::RevLines;
-use serde::{Deserialize, Serialize};
 use tokio::{
     fs::File,
     io::{AsyncBufReadExt, AsyncWriteExt, BufReader},
@@ -40,17 +57,6 @@ use tokio::{process::Command, sync::mpsc::Sender};
 use tracing::{error, info, warn};
 use typed_builder::TypedBuilder;
 
-use restate_types::{
-    config::{Configuration, MetadataStoreClient},
-    errors::GenericError,
-    metadata_store::keys::NODES_CONFIG_KEY,
-    net::{AdvertisedAddress, BindAddress},
-    nodes_config::{NodesConfiguration, Role},
-    PlainNodeId,
-};
-
-use crate::random_socket_address;
-
 #[derive(Debug, Clone, Serialize, Deserialize, TypedBuilder)]
 pub struct Node {
     #[builder(mutators(
@@ -747,6 +753,54 @@ impl StartedNode {
 
         !nodes_config.get_log_server_storage_state(&node_id).empty()
     }
+
+    /// Provisions the cluster on this node with the given configuration. Returns true if the
+    /// cluster was newly provisioned.
+    pub async fn provision_cluster(
+        &self,
+        num_partitions: Option<NonZeroU16>,
+        placement_strategy: Option<ReplicationStrategy>,
+        log_provider: Option<DefaultProvider>,
+    ) -> anyhow::Result<bool> {
+        let channel = create_tonic_channel_from_advertised_address(
+            self.node_address().clone(),
+            &Configuration::default().networking,
+        );
+
+        let request = ProtoProvisionClusterRequest {
+            dry_run: false,
+            num_partitions: num_partitions.map(|num| u32::from(num.get())),
+            placement_strategy: placement_strategy
+                .map(|replication_strategy| replication_strategy.into()),
+            log_provider: log_provider.map(|log_provider| log_provider.into()),
+        };
+
+        let retry_policy = RetryPolicy::exponential(
+            Duration::from_millis(10),
+            2.0,
+            Some(10),
+            Some(Duration::from_secs(1)),
+        );
+        let client = NodeCtlSvcClient::new(channel);
+
+        let response = retry_policy
+            .retry(|| {
+                let mut client = client.clone();
+                let request = request.clone();
+                async move { client.provision_cluster(request).await }
+            })
+            .await?
+            .into_inner();
+
+        Ok(match response.kind() {
+            ProvisionClusterResponseKind::ProvisionClusterResponseTypeUnknown => {
+                panic!("unknown cluster response type")
+            }
+            ProvisionClusterResponseKind::DryRun => unreachable!("request non dry run"),
+            ProvisionClusterResponseKind::NewlyProvisioned => true,
+            ProvisionClusterResponseKind::AlreadyProvisioned => false,
+        })
+    }
 }
 
 #[derive(Debug, Clone, Copy)]
diff --git a/server/tests/cluster.rs b/server/tests/cluster.rs
index 503b529cc..5ecad7686 100644
--- a/server/tests/cluster.rs
+++ b/server/tests/cluster.rs
@@ -8,18 +8,22 @@
 // the Business Source License, use of this software will be governed
 // by the Apache License, Version 2.0.
-use std::num::NonZeroU16; +use std::num::{NonZeroU16, NonZeroU8}; use std::time::Duration; use enumset::enum_set; use futures_util::StreamExt; +use googletest::IntoTestResult; use regex::Regex; use restate_local_cluster_runner::{ cluster::Cluster, node::{BinarySource, Node}, }; use restate_types::config::MetadataStoreClient; -use restate_types::logs::metadata::ProviderKind; +use restate_types::logs::metadata::{ + DefaultProvider, NodeSetSelectionStrategy, ReplicatedLogletConfig, +}; +use restate_types::replicated_loglet::ReplicationProperty; use restate_types::{config::Configuration, nodes_config::Role, PlainNodeId}; use test_log::test; @@ -126,7 +130,8 @@ async fn cluster_name_mismatch() -> googletest::Result<()> { #[test(restate_core::test)] async fn replicated_loglet() -> googletest::Result<()> { let mut base_config = Configuration::default(); - base_config.bifrost.default_provider = ProviderKind::Replicated; + // require an explicit provision step to configure the replication property to 2 + base_config.common.allow_bootstrap = false; base_config.common.bootstrap_num_partitions = NonZeroU16::new(1).expect("1 to be non-zero"); let nodes = Node::new_test_nodes_with_metadata( @@ -148,6 +153,20 @@ async fn replicated_loglet() -> googletest::Result<()> { .start() .await?; + let replicated_loglet_config = ReplicatedLogletConfig { + replication_property: ReplicationProperty::new(NonZeroU8::new(2).expect("to be non-zero")), + nodeset_selection_strategy: NodeSetSelectionStrategy::default(), + }; + + cluster.nodes[0] + .provision_cluster( + None, + None, + Some(DefaultProvider::Replicated(replicated_loglet_config)), + ) + .await + .into_test_result()?; + cluster.wait_healthy(Duration::from_secs(30)).await?; for partition_processor in &mut partition_processors_starting_up { From cea2a10b9cc6bc621419073be88bcda99eee725a Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Tue, 24 Dec 2024 14:19:08 +0100 Subject: [PATCH 5/6] Replace nested kind with Status variant and boolean --- Cargo.lock | 1 + crates/core/protobuf/node_ctl_svc.proto | 15 ++---- crates/core/src/protobuf.rs | 13 ++--- crates/local-cluster-runner/Cargo.toml | 1 + crates/local-cluster-runner/src/node/mod.rs | 29 ++++++----- crates/node/src/lib.rs | 24 ++++----- .../src/network_server/grpc_svc_handler.rs | 27 +++++----- .../src/commands/cluster/provision.rs | 51 +++++++------------ 8 files changed, 65 insertions(+), 96 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index df241d1aa..457963807 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6473,6 +6473,7 @@ dependencies = [ "thiserror 2.0.6", "tokio", "toml", + "tonic", "tracing", "tracing-subscriber", "typed-builder", diff --git a/crates/core/protobuf/node_ctl_svc.proto b/crates/core/protobuf/node_ctl_svc.proto index fa45f4047..bcaa8dbcb 100644 --- a/crates/core/protobuf/node_ctl_svc.proto +++ b/crates/core/protobuf/node_ctl_svc.proto @@ -28,22 +28,17 @@ service NodeCtlSvc { message ProvisionClusterRequest { bool dry_run = 1; + // if unset then the configured cluster num partitions will be used optional uint32 num_partitions = 2; + // if unset then the configured cluster placement strategy will be used optional restate.cluster.ReplicationStrategy placement_strategy = 3; + // if unset then the configured cluster default log provider will be used optional restate.cluster.DefaultProvider log_provider = 4; } -enum ProvisionClusterResponseKind { - ProvisionClusterResponseType_UNKNOWN = 0; - DRY_RUN = 1; - NEWLY_PROVISIONED = 2; - ALREADY_PROVISIONED = 3; -} - message 
ProvisionClusterResponse { - ProvisionClusterResponseKind kind = 1; - // This field will be empty if the cluster is already provisioned - optional restate.cluster.ClusterConfiguration cluster_configuration = 3; + bool dry_run = 1; + restate.cluster.ClusterConfiguration cluster_configuration = 2; } message IdentResponse { diff --git a/crates/core/src/protobuf.rs b/crates/core/src/protobuf.rs index 31a473775..cd47f7bff 100644 --- a/crates/core/src/protobuf.rs +++ b/crates/core/src/protobuf.rs @@ -19,23 +19,16 @@ pub mod node_ctl_svc { impl ProvisionClusterResponse { pub fn dry_run(cluster_configuration: ClusterConfiguration) -> Self { ProvisionClusterResponse { - kind: ProvisionClusterResponseKind::DryRun.into(), + dry_run: true, cluster_configuration: Some(cluster_configuration), } } - pub fn newly_provisioned(cluster_configuration: ClusterConfiguration) -> Self { + pub fn provisioned(cluster_configuration: ClusterConfiguration) -> Self { ProvisionClusterResponse { - kind: ProvisionClusterResponseKind::NewlyProvisioned.into(), + dry_run: false, cluster_configuration: Some(cluster_configuration), } } - - pub fn already_provisioned() -> Self { - ProvisionClusterResponse { - kind: ProvisionClusterResponseKind::AlreadyProvisioned.into(), - ..Default::default() - } - } } } diff --git a/crates/local-cluster-runner/Cargo.toml b/crates/local-cluster-runner/Cargo.toml index 708524c78..c1db4e4c5 100644 --- a/crates/local-cluster-runner/Cargo.toml +++ b/crates/local-cluster-runner/Cargo.toml @@ -34,6 +34,7 @@ serde = { workspace = true } tempfile = { workspace = true } thiserror = { workspace = true } tokio = { workspace = true, features = ["process", "fs"] } +tonic = { workspace = true } toml = "0.8" tracing = { workspace = true } tracing-subscriber = { workspace = true } diff --git a/crates/local-cluster-runner/src/node/mod.rs b/crates/local-cluster-runner/src/node/mod.rs index 605e411e9..fc996c252 100644 --- a/crates/local-cluster-runner/src/node/mod.rs +++ b/crates/local-cluster-runner/src/node/mod.rs @@ -16,9 +16,7 @@ use itertools::Itertools; use regex::{Regex, RegexSet}; use restate_core::network::net_util::create_tonic_channel_from_advertised_address; use restate_core::protobuf::node_ctl_svc::node_ctl_svc_client::NodeCtlSvcClient; -use restate_core::protobuf::node_ctl_svc::{ - ProvisionClusterRequest as ProtoProvisionClusterRequest, ProvisionClusterResponseKind, -}; +use restate_core::protobuf::node_ctl_svc::ProvisionClusterRequest as ProtoProvisionClusterRequest; use restate_types::logs::metadata::DefaultProvider; use restate_types::partition_table::ReplicationStrategy; use restate_types::retries::RetryPolicy; @@ -54,6 +52,7 @@ use tokio::{ task::JoinHandle, }; use tokio::{process::Command, sync::mpsc::Sender}; +use tonic::Code; use tracing::{error, info, warn}; use typed_builder::TypedBuilder; @@ -789,17 +788,23 @@ impl StartedNode { let request = request.clone(); async move { client.provision_cluster(request).await } }) - .await? 
- .into_inner(); + .await; - Ok(match response.kind() { - ProvisionClusterResponseKind::ProvisionClusterResponseTypeUnknown => { - panic!("unknown cluster response type") + match response { + Ok(response) => { + let response = response.into_inner(); + + assert!(!response.dry_run, "provision command was run w/o dry run"); + Ok(true) } - ProvisionClusterResponseKind::DryRun => unreachable!("request non dry run"), - ProvisionClusterResponseKind::NewlyProvisioned => true, - ProvisionClusterResponseKind::AlreadyProvisioned => false, - }) + Err(status) => { + if status.code() == Code::AlreadyExists { + Ok(false) + } else { + Err(status.into()) + } + } + } } } diff --git a/crates/node/src/lib.rs b/crates/node/src/lib.rs index e13ca3bb9..3a014e7c8 100644 --- a/crates/node/src/lib.rs +++ b/crates/node/src/lib.rs @@ -30,7 +30,7 @@ use restate_core::network::{ }; use restate_core::partitions::{spawn_partition_routing_refresher, PartitionRoutingRefresher}; use restate_core::protobuf::node_ctl_svc::node_ctl_svc_client::NodeCtlSvcClient; -use restate_core::protobuf::node_ctl_svc::{ProvisionClusterRequest, ProvisionClusterResponseKind}; +use restate_core::protobuf::node_ctl_svc::ProvisionClusterRequest; use restate_core::{cancellation_watcher, Metadata, TaskKind}; use restate_core::{spawn_metadata_manager, MetadataBuilder, MetadataManager, TaskCenter}; #[cfg(feature = "replicated-loglet")] @@ -387,22 +387,16 @@ impl Node { .await; match response { - Ok(response) => match response.into_inner().kind() { - ProvisionClusterResponseKind::ProvisionClusterResponseTypeUnknown => { - panic!("unknown cluster response type") - } - ProvisionClusterResponseKind::DryRun => { - unreachable!("call w/o dry run") - } - ProvisionClusterResponseKind::NewlyProvisioned => { - debug!("Successfully auto provisioned the cluster") - } - ProvisionClusterResponseKind::AlreadyProvisioned => { + Ok(response) => { + let response = response.into_inner(); + debug_assert!(!response.dry_run, "Provision w/o dry run"); + } + Err(err) => { + if err.code() == Code::AlreadyExists { debug!("The cluster is already provisioned.") + } else { + warn!("Failed to auto provision the cluster. In order to continue you have to provision the cluster manually: {err}"); } - }, - Err(err) => { - warn!("Failed to auto provision the cluster. 
In order to continue you have to provision the cluster manually: {err}"); } } diff --git a/crates/node/src/network_server/grpc_svc_handler.rs b/crates/node/src/network_server/grpc_svc_handler.rs index 1fd84d2db..28d5cf3d8 100644 --- a/crates/node/src/network_server/grpc_svc_handler.rs +++ b/crates/node/src/network_server/grpc_svc_handler.rs @@ -22,7 +22,7 @@ use restate_core::network::{ConnectionManager, ProtocolError, TransportConnect}; use restate_core::protobuf::node_ctl_svc::node_ctl_svc_server::NodeCtlSvc; use restate_core::protobuf::node_ctl_svc::{ GetMetadataRequest, GetMetadataResponse, IdentResponse, ProvisionClusterRequest, - ProvisionClusterResponse, ProvisionClusterResponseKind, + ProvisionClusterResponse, }; use restate_core::task_center::TaskCenterMonitoring; use restate_core::{task_center, Metadata, MetadataKind, TargetVersion}; @@ -39,7 +39,6 @@ use restate_types::{GenerationalNodeId, Version, Versioned}; use std::num::NonZeroU16; use tokio_stream::StreamExt; use tonic::{Request, Response, Status, Streaming}; -use tracing::warn; pub struct NodeCtlSvcHandler { task_center: task_center::Handle, @@ -276,10 +275,9 @@ impl NodeCtlSvc for NodeCtlSvcHandler { .map_err(|err| Status::invalid_argument(err.to_string()))?; if dry_run { - return Ok(Response::new(ProvisionClusterResponse { - kind: ProvisionClusterResponseKind::DryRun.into(), - cluster_configuration: Some(ProtoClusterConfiguration::from(cluster_configuration)), - })); + return Ok(Response::new(ProvisionClusterResponse::dry_run( + ProtoClusterConfiguration::from(cluster_configuration), + ))); } let newly_provisioned = self @@ -287,16 +285,15 @@ impl NodeCtlSvc for NodeCtlSvcHandler { .await .map_err(|err| Status::internal(err.to_string()))?; - let kind = if newly_provisioned { - ProvisionClusterResponseKind::NewlyProvisioned - } else { - ProvisionClusterResponseKind::AlreadyProvisioned - }; + if !newly_provisioned { + return Err(Status::already_exists( + "The cluster has already been provisioned", + )); + } - Ok(Response::new(ProvisionClusterResponse { - kind: kind.into(), - cluster_configuration: Some(ProtoClusterConfiguration::from(cluster_configuration)), - })) + Ok(Response::new(ProvisionClusterResponse::provisioned( + ProtoClusterConfiguration::from(cluster_configuration), + ))) } } diff --git a/tools/restatectl/src/commands/cluster/provision.rs b/tools/restatectl/src/commands/cluster/provision.rs index 1df2cac1f..9b2ef2f16 100644 --- a/tools/restatectl/src/commands/cluster/provision.rs +++ b/tools/restatectl/src/commands/cluster/provision.rs @@ -17,7 +17,7 @@ use cling::{Collect, Run}; use restate_cli_util::ui::console::confirm_or_exit; use restate_cli_util::{c_error, c_println, c_warn}; use restate_core::protobuf::node_ctl_svc::node_ctl_svc_client::NodeCtlSvcClient; -use restate_core::protobuf::node_ctl_svc::{ProvisionClusterRequest, ProvisionClusterResponseKind}; +use restate_core::protobuf::node_ctl_svc::ProvisionClusterRequest; use restate_types::logs::metadata::{ DefaultProvider, NodeSetSelectionStrategy, ProviderKind, ReplicatedLogletConfig, }; @@ -26,6 +26,7 @@ use restate_types::partition_table::ReplicationStrategy; use restate_types::replicated_loglet::ReplicationProperty; use std::num::NonZeroU16; use tonic::codec::CompressionEncoding; +use tonic::Code; #[derive(Run, Parser, Collect, Clone, Debug)] #[cling(run = "cluster_provision")] @@ -91,21 +92,10 @@ async fn cluster_provision( } }; - let cluster_configuration_to_provision = match response.kind() { - 
ProvisionClusterResponseKind::ProvisionClusterResponseTypeUnknown => { - panic!("unknown cluster response type") - } - ProvisionClusterResponseKind::DryRun => response - .cluster_configuration - .expect("dry run response needs to carry a cluster configuration"), - ProvisionClusterResponseKind::NewlyProvisioned => { - unreachable!("provisioning a cluster with dry run should not have an effect") - } - ProvisionClusterResponseKind::AlreadyProvisioned => { - c_println!("🏃The cluster has already been provisioned."); - return Ok(()); - } - }; + debug_assert!(response.dry_run, "Provision with dry run"); + let cluster_configuration_to_provision = response + .cluster_configuration + .expect("Provision response should carry a cluster configuration"); c_println!( "{}", @@ -134,28 +124,21 @@ async fn cluster_provision( log_provider: cluster_configuration_to_provision.default_provider, }; - let response = match client.provision_cluster(request).await { - Ok(response) => response.into_inner(), - Err(err) => { - c_error!("Failed to provision cluster: {}", err.message()); - return Ok(()); - } - }; + match client.provision_cluster(request).await { + Ok(response) => { + let response = response.into_inner(); + debug_assert!(!response.dry_run, "Provision w/o dry run"); - match response.kind() { - ProvisionClusterResponseKind::ProvisionClusterResponseTypeUnknown => { - panic!("unknown provision cluster response kind") - } - ProvisionClusterResponseKind::DryRun => { - unreachable!("provisioning a cluster w/o dry run should have an effect") - } - ProvisionClusterResponseKind::NewlyProvisioned => { c_println!("✅ Cluster has been successfully provisioned."); } - ProvisionClusterResponseKind::AlreadyProvisioned => { - c_println!("🤷 Cluster has been provisioned by somebody else."); + Err(err) => { + if err.code() == Code::AlreadyExists { + c_println!("🤷 Cluster has been provisioned by somebody else."); + } else { + c_error!("Failed to provision cluster: {}", err.message()); + } } - } + }; Ok(()) } From 9b4eb1836d369bc1eae6b7781797f0efb817a41c Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Tue, 24 Dec 2024 14:39:40 +0100 Subject: [PATCH 6/6] Let auto-provision call method instead of going through a grpc call --- crates/node/src/lib.rs | 198 ++++++++++++++---- .../src/network_server/grpc_svc_handler.rs | 145 ++----------- 2 files changed, 172 insertions(+), 171 deletions(-) diff --git a/crates/node/src/lib.rs b/crates/node/src/lib.rs index 3a014e7c8..0cc38a25c 100644 --- a/crates/node/src/lib.rs +++ b/crates/node/src/lib.rs @@ -14,7 +14,9 @@ mod network_server; mod roles; use anyhow::Context; -use tonic::{Code, IntoRequest}; +use bytestring::ByteString; +use prost_dto::IntoProto; +use std::num::NonZeroU16; use tracing::{debug, error, info, trace, warn}; use crate::cluster_marker::ClusterValidationError; @@ -23,31 +25,35 @@ use crate::network_server::NetworkServer; use crate::roles::{AdminRole, BaseRole, IngressRole, WorkerRole}; use codederror::CodedError; use restate_bifrost::BifrostService; -use restate_core::metadata_store::ReadWriteError; -use restate_core::network::net_util::create_tonic_channel_from_advertised_address; +use restate_core::metadata_store::{ + retry_on_network_error, Precondition, ReadWriteError, WriteError, +}; use restate_core::network::{ GrpcConnector, MessageRouterBuilder, NetworkServerBuilder, Networking, }; use restate_core::partitions::{spawn_partition_routing_refresher, PartitionRoutingRefresher}; -use restate_core::protobuf::node_ctl_svc::node_ctl_svc_client::NodeCtlSvcClient; -use 
restate_core::protobuf::node_ctl_svc::ProvisionClusterRequest; use restate_core::{cancellation_watcher, Metadata, TaskKind}; use restate_core::{spawn_metadata_manager, MetadataBuilder, MetadataManager, TaskCenter}; #[cfg(feature = "replicated-loglet")] use restate_log_server::LogServerService; use restate_metadata_store::local::LocalMetadataStoreService; use restate_metadata_store::MetadataStoreClient; -use restate_types::config::Configuration; +use restate_types::config::{CommonOptions, Configuration}; use restate_types::errors::GenericError; use restate_types::health::Health; use restate_types::live::Live; +use restate_types::logs::metadata::{DefaultProvider, Logs, LogsConfiguration}; #[cfg(feature = "replicated-loglet")] use restate_types::logs::RecordCache; -use restate_types::nodes_config::Role; +use restate_types::metadata_store::keys::{BIFROST_CONFIG_KEY, PARTITION_TABLE_KEY}; +use restate_types::nodes_config::{LogServerConfig, NodeConfig, NodesConfiguration, Role}; +use restate_types::partition_table::{PartitionTable, PartitionTableBuilder, ReplicationStrategy}; use restate_types::protobuf::common::{ AdminStatus, IngressStatus, LogServerStatus, MetadataServerStatus, NodeRpcStatus, NodeStatus, WorkerStatus, }; +use restate_types::storage::StorageEncode; +use restate_types::{GenerationalNodeId, Version, Versioned}; #[derive(Debug, thiserror::Error, CodedError)] pub enum Error { @@ -359,45 +365,28 @@ impl Node { if config.common.allow_bootstrap { TaskCenter::spawn(TaskKind::SystemBoot, "auto-provision-cluster", { - let channel = create_tonic_channel_from_advertised_address( - config.common.advertised_address.clone(), - &config.networking, - ); - let client = NodeCtlSvcClient::new(channel); - let retry_policy = config.common.network_error_retry_policy.clone(); + let cluster_configuration = ClusterConfiguration::from_configuration(&config); + let metadata_store_client = self.metadata_store_client.clone(); + let common_opts = config.common.clone(); async move { - let response = retry_policy - .retry_if( - || { - let mut client = client.clone(); - async move { - client - .provision_cluster( - ProvisionClusterRequest { - dry_run: false, - ..Default::default() - } - .into_request(), - ) - .await - } - }, - |status| status.code() == Code::Unavailable, - ) - .await; + let response = provision_cluster_metadata( + &metadata_store_client, + &common_opts, + &cluster_configuration, + ) + .await; match response { - Ok(response) => { - let response = response.into_inner(); - debug_assert!(!response.dry_run, "Provision w/o dry run"); - } - Err(err) => { - if err.code() == Code::AlreadyExists { - debug!("The cluster is already provisioned.") + Ok(provisioned) => { + if provisioned { + info!("Auto provisioned cluster."); } else { - warn!("Failed to auto provision the cluster. In order to continue you have to provision the cluster manually: {err}"); + debug!("The cluster is already provisioned."); } } + Err(err) => { + warn!("Failed to auto provision the cluster. 
+                        }
                     }
 
                     Ok(())
@@ -551,6 +540,135 @@ impl Node {
     }
 }
 
+#[derive(Clone, Debug, IntoProto)]
+#[proto(target = "restate_types::protobuf::cluster::ClusterConfiguration")]
+pub struct ClusterConfiguration {
+    #[into_proto(map = "num_partitions_to_u32")]
+    pub num_partitions: NonZeroU16,
+    #[proto(required)]
+    pub replication_strategy: ReplicationStrategy,
+    #[proto(required)]
+    pub default_provider: DefaultProvider,
+}
+
+fn num_partitions_to_u32(num_partitions: NonZeroU16) -> u32 {
+    u32::from(num_partitions.get())
+}
+
+impl ClusterConfiguration {
+    pub fn from_configuration(configuration: &Configuration) -> Self {
+        ClusterConfiguration {
+            num_partitions: configuration.common.bootstrap_num_partitions,
+            replication_strategy: ReplicationStrategy::default(),
+            default_provider: DefaultProvider::from_configuration(configuration),
+        }
+    }
+}
+
+/// Provision the cluster metadata. Returns `true` if the cluster was newly provisioned. Returns
+/// `false` if the cluster is already provisioned.
+///
+/// This method returns an error if any of the initial metadata couldn't be written to the
+/// metadata store. In this case, the method does not try to clean up the already written
+/// metadata. Instead, the caller can retry to complete the provisioning.
+async fn provision_cluster_metadata(
+    metadata_store_client: &MetadataStoreClient,
+    common_opts: &CommonOptions,
+    cluster_configuration: &ClusterConfiguration,
+) -> anyhow::Result<bool> {
+    let (initial_nodes_configuration, initial_partition_table, initial_logs) =
+        generate_initial_metadata(common_opts, cluster_configuration);
+
+    let result = retry_on_network_error(common_opts.network_error_retry_policy.clone(), || {
+        metadata_store_client.provision(&initial_nodes_configuration)
+    })
+    .await?;
+
+    retry_on_network_error(common_opts.network_error_retry_policy.clone(), || {
+        write_initial_value_dont_fail_if_it_exists(
+            metadata_store_client,
+            PARTITION_TABLE_KEY.clone(),
+            &initial_partition_table,
+        )
+    })
+    .await
+    .context("failed provisioning the initial partition table")?;
+
+    retry_on_network_error(common_opts.network_error_retry_policy.clone(), || {
+        write_initial_value_dont_fail_if_it_exists(
+            metadata_store_client,
+            BIFROST_CONFIG_KEY.clone(),
+            &initial_logs,
+        )
+    })
+    .await
+    .context("failed provisioning the initial logs configuration")?;
+
+    Ok(result)
+}
+
+fn create_initial_nodes_configuration(common_opts: &CommonOptions) -> NodesConfiguration {
+    let mut initial_nodes_configuration =
+        NodesConfiguration::new(Version::MIN, common_opts.cluster_name().to_owned());
+    let node_config = NodeConfig::new(
+        common_opts.node_name().to_owned(),
+        common_opts
+            .force_node_id
+            .map(|force_node_id| force_node_id.with_generation(1))
+            .unwrap_or(GenerationalNodeId::INITIAL_NODE_ID),
+        common_opts.advertised_address.clone(),
+        common_opts.roles,
+        LogServerConfig::default(),
+    );
+    initial_nodes_configuration.upsert_node(node_config);
+    initial_nodes_configuration
+}
+
+fn generate_initial_metadata(
+    common_opts: &CommonOptions,
+    cluster_configuration: &ClusterConfiguration,
+) -> (NodesConfiguration, PartitionTable, Logs) {
+    let mut initial_partition_table_builder = PartitionTableBuilder::default();
+    initial_partition_table_builder
+        .with_equally_sized_partitions(cluster_configuration.num_partitions.get())
+        .expect("Empty partition table should not have conflicts");
+    initial_partition_table_builder
+        .set_replication_strategy(cluster_configuration.replication_strategy);
+    let initial_partition_table = initial_partition_table_builder.build();
+
+    let mut logs_builder = Logs::default().into_builder();
+    logs_builder.set_configuration(LogsConfiguration::from(
+        cluster_configuration.default_provider.clone(),
+    ));
+    let initial_logs = logs_builder.build();
+
+    let initial_nodes_configuration = create_initial_nodes_configuration(common_opts);
+
+    (
+        initial_nodes_configuration,
+        initial_partition_table,
+        initial_logs,
+    )
+}
+
+async fn write_initial_value_dont_fail_if_it_exists<T: Versioned + StorageEncode>(
+    metadata_store_client: &MetadataStoreClient,
+    key: ByteString,
+    initial_value: &T,
+) -> Result<(), WriteError> {
+    match metadata_store_client
+        .put(key, initial_value, Precondition::DoesNotExist)
+        .await
+    {
+        Ok(_) => Ok(()),
+        Err(WriteError::FailedPrecondition(_)) => {
+            // we might have failed on a previous attempt after writing this value; so let's continue
+            Ok(())
+        }
+        Err(err) => Err(err),
+    }
+}
+
 #[cfg(not(feature = "replicated-loglet"))]
 fn warn_if_log_store_left_artifacts(config: &Configuration) {
     if config.log_server.data_dir().exists() {
diff --git a/crates/node/src/network_server/grpc_svc_handler.rs b/crates/node/src/network_server/grpc_svc_handler.rs
index 28d5cf3d8..f413d3196 100644
--- a/crates/node/src/network_server/grpc_svc_handler.rs
+++ b/crates/node/src/network_server/grpc_svc_handler.rs
@@ -8,15 +8,12 @@
 // the Business Source License, use of this software will be governed
 // by the Apache License, Version 2.0.
 
+use crate::{provision_cluster_metadata, ClusterConfiguration};
 use anyhow::Context;
 use bytes::BytesMut;
-use bytestring::ByteString;
 use enumset::EnumSet;
 use futures::stream::BoxStream;
-use prost_dto::IntoProto;
-use restate_core::metadata_store::{
-    retry_on_network_error, MetadataStoreClient, Precondition, WriteError,
-};
+use restate_core::metadata_store::MetadataStoreClient;
 use restate_core::network::protobuf::core_node_svc::core_node_svc_server::CoreNodeSvc;
 use restate_core::network::{ConnectionManager, ProtocolError, TransportConnect};
 use restate_core::protobuf::node_ctl_svc::node_ctl_svc_server::NodeCtlSvc;
@@ -26,16 +23,14 @@ use restate_core::protobuf::node_ctl_svc::{
 };
 use restate_core::task_center::TaskCenterMonitoring;
 use restate_core::{task_center, Metadata, MetadataKind, TargetVersion};
-use restate_types::config::{CommonOptions, Configuration};
+use restate_types::config::Configuration;
 use restate_types::health::Health;
-use restate_types::logs::metadata::{DefaultProvider, Logs, LogsConfiguration};
-use restate_types::metadata_store::keys::{BIFROST_CONFIG_KEY, PARTITION_TABLE_KEY};
-use restate_types::nodes_config::{LogServerConfig, NodeConfig, NodesConfiguration, Role};
-use restate_types::partition_table::{PartitionTable, PartitionTableBuilder, ReplicationStrategy};
+use restate_types::logs::metadata::DefaultProvider;
+use restate_types::nodes_config::Role;
+use restate_types::partition_table::ReplicationStrategy;
 use restate_types::protobuf::cluster::ClusterConfiguration as ProtoClusterConfiguration;
 use restate_types::protobuf::node::Message;
-use restate_types::storage::{StorageCodec, StorageEncode};
-use restate_types::{GenerationalNodeId, Version, Versioned};
+use restate_types::storage::StorageCodec;
 use std::num::NonZeroU16;
 use tokio_stream::StreamExt;
 use tonic::{Request, Response, Status, Streaming};
@@ -65,106 +60,6 @@ impl NodeCtlSvcHandler {
         }
     }
 
-    /// Provision the cluster metadata. Returns `true` if the cluster was newly provisioned. Returns
-    /// `false` if the cluster is already provisioned.
-    async fn provision_cluster_metadata(
-        &self,
-        common_opts: &CommonOptions,
-        cluster_configuration: &ClusterConfiguration,
-    ) -> anyhow::Result<bool> {
-        let (initial_nodes_configuration, initial_partition_table, initial_logs) =
-            Self::generate_initial_metadata(common_opts, cluster_configuration);
-
-        let result = retry_on_network_error(common_opts.network_error_retry_policy.clone(), || {
-            self.metadata_store_client
-                .provision(&initial_nodes_configuration)
-        })
-        .await?;
-
-        retry_on_network_error(common_opts.network_error_retry_policy.clone(), || {
-            self.write_initial_value_dont_fail_if_it_exists(
-                PARTITION_TABLE_KEY.clone(),
-                &initial_partition_table,
-            )
-        })
-        .await
-        .context("failed provisioning the initial partition table")?;
-
-        retry_on_network_error(common_opts.network_error_retry_policy.clone(), || {
-            self.write_initial_value_dont_fail_if_it_exists(
-                BIFROST_CONFIG_KEY.clone(),
-                &initial_logs,
-            )
-        })
-        .await
-        .context("failed provisioning the initial logs configuration")?;
-
-        Ok(result)
-    }
-
-    pub fn create_initial_nodes_configuration(common_opts: &CommonOptions) -> NodesConfiguration {
-        let mut initial_nodes_configuration =
-            NodesConfiguration::new(Version::MIN, common_opts.cluster_name().to_owned());
-        let node_config = NodeConfig::new(
-            common_opts.node_name().to_owned(),
-            common_opts
-                .force_node_id
-                .map(|force_node_id| force_node_id.with_generation(1))
-                .unwrap_or(GenerationalNodeId::INITIAL_NODE_ID),
-            common_opts.advertised_address.clone(),
-            common_opts.roles,
-            LogServerConfig::default(),
-        );
-        initial_nodes_configuration.upsert_node(node_config);
-        initial_nodes_configuration
-    }
-
-    fn generate_initial_metadata(
-        common_opts: &CommonOptions,
-        cluster_configuration: &ClusterConfiguration,
-    ) -> (NodesConfiguration, PartitionTable, Logs) {
-        let mut initial_partition_table_builder = PartitionTableBuilder::default();
-        initial_partition_table_builder
-            .with_equally_sized_partitions(cluster_configuration.num_partitions.get())
-            .expect("Empty partition table should not have conflicts");
-        initial_partition_table_builder
-            .set_replication_strategy(cluster_configuration.replication_strategy);
-        let initial_partition_table = initial_partition_table_builder.build();
-
-        let mut logs_builder = Logs::default().into_builder();
-        logs_builder.set_configuration(LogsConfiguration::from(
-            cluster_configuration.default_provider.clone(),
-        ));
-        let initial_logs = logs_builder.build();
-
-        let initial_nodes_configuration = Self::create_initial_nodes_configuration(common_opts);
-
-        (
-            initial_nodes_configuration,
-            initial_partition_table,
-            initial_logs,
-        )
-    }
-
-    async fn write_initial_value_dont_fail_if_it_exists<T: Versioned + StorageEncode>(
-        &self,
-        key: ByteString,
-        initial_value: &T,
-    ) -> Result<(), WriteError> {
-        match self
-            .metadata_store_client
-            .put(key, initial_value, Precondition::DoesNotExist)
-            .await
-        {
-            Ok(_) => Ok(()),
-            Err(WriteError::FailedPrecondition(_)) => {
-                // we might have failed on a previous attempt after writing this value; so let's continue
-                Ok(())
-            }
-            Err(err) => Err(err),
-        }
-    }
-
     fn resolve_cluster_configuration(
         config: &Configuration,
         request: ProvisionClusterRequest,
@@ -280,10 +175,13 @@ impl NodeCtlSvc for NodeCtlSvcHandler {
             )));
         }
 
-        let newly_provisioned = self
-            .provision_cluster_metadata(&config.common, &cluster_configuration)
-            .await
-            .map_err(|err| Status::internal(err.to_string()))?;
+        let newly_provisioned = provision_cluster_metadata(
+            &self.metadata_store_client,
+            &config.common,
+            &cluster_configuration,
+        )
+        .await
+        .map_err(|err| Status::internal(err.to_string()))?;
 
         if !newly_provisioned {
             return Err(Status::already_exists(
@@ -297,21 +195,6 @@ impl NodeCtlSvc for NodeCtlSvcHandler {
     }
 }
 
-#[derive(Clone, Debug, IntoProto)]
-#[proto(target = "restate_types::protobuf::cluster::ClusterConfiguration")]
-pub struct ClusterConfiguration {
-    #[into_proto(map = "num_partitions_to_u32")]
-    pub num_partitions: NonZeroU16,
-    #[proto(required)]
-    pub replication_strategy: ReplicationStrategy,
-    #[proto(required)]
-    pub default_provider: DefaultProvider,
-}
-
-fn num_partitions_to_u32(num_partitions: NonZeroU16) -> u32 {
-    u32::from(num_partitions.get())
-}
-
 pub struct CoreNodeSvcHandler<T> {
     connections: ConnectionManager<T>,
 }