diff --git a/Cargo.lock b/Cargo.lock index bfacfeffd..76af1b08f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6450,6 +6450,7 @@ dependencies = [ name = "restate-local-cluster-runner" version = "1.1.6" dependencies = [ + "anyhow", "arc-swap", "clap", "clap-verbosity-flag", @@ -6461,6 +6462,7 @@ dependencies = [ "rand", "regex", "reqwest", + "restate-core", "restate-metadata-store", "restate-types", "rev_lines", diff --git a/crates/local-cluster-runner/Cargo.toml b/crates/local-cluster-runner/Cargo.toml index 3f3e26fd9..708524c78 100644 --- a/crates/local-cluster-runner/Cargo.toml +++ b/crates/local-cluster-runner/Cargo.toml @@ -11,10 +11,12 @@ publish = false default = [] [dependencies] +restate-core = { workspace = true } restate-metadata-store = { workspace = true } # nb features here will also affect the compiled restate-server binary in integration tests restate-types = { workspace = true, features = ["unsafe-mutable-config"] } +anyhow = { workspace = true } arc-swap = { workspace = true } clap = { workspace = true } clap-verbosity-flag = { workspace = true } diff --git a/crates/local-cluster-runner/src/cluster/mod.rs b/crates/local-cluster-runner/src/cluster/mod.rs index ba39cb02f..4f0ee4f12 100644 --- a/crates/local-cluster-runner/src/cluster/mod.rs +++ b/crates/local-cluster-runner/src/cluster/mod.rs @@ -102,12 +102,6 @@ impl Cluster { .start_clustered(base_dir.as_path(), &cluster_name) .await .map_err(|err| ClusterStartError::NodeStartError(i, err))?; - if node.admin_address().is_some() { - // admin nodes are needed for later nodes to bootstrap. we should wait until they are serving - HealthCheck::Admin - .wait_healthy(&node, Duration::from_secs(30)) - .await?; - } started_nodes.push(node) } diff --git a/crates/local-cluster-runner/src/node/mod.rs b/crates/local-cluster-runner/src/node/mod.rs index a147666af..605e411e9 100644 --- a/crates/local-cluster-runner/src/node/mod.rs +++ b/crates/local-cluster-runner/src/node/mod.rs @@ -8,6 +8,31 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. +use crate::random_socket_address; +use arc_swap::ArcSwapOption; +use enumset::{enum_set, EnumSet}; +use futures::{stream, FutureExt, Stream, StreamExt, TryStreamExt}; +use itertools::Itertools; +use regex::{Regex, RegexSet}; +use restate_core::network::net_util::create_tonic_channel_from_advertised_address; +use restate_core::protobuf::node_ctl_svc::node_ctl_svc_client::NodeCtlSvcClient; +use restate_core::protobuf::node_ctl_svc::{ + ProvisionClusterRequest as ProtoProvisionClusterRequest, ProvisionClusterResponseKind, +}; +use restate_types::logs::metadata::DefaultProvider; +use restate_types::partition_table::ReplicationStrategy; +use restate_types::retries::RetryPolicy; +use restate_types::{ + config::{Configuration, MetadataStoreClient}, + errors::GenericError, + metadata_store::keys::NODES_CONFIG_KEY, + net::{AdvertisedAddress, BindAddress}, + nodes_config::{NodesConfiguration, Role}, + PlainNodeId, +}; +use rev_lines::RevLines; +use serde::{Deserialize, Serialize}; +use std::num::NonZeroU16; use std::{ ffi::OsString, fmt::Display, @@ -22,14 +47,6 @@ use std::{ task::{Context, Poll}, time::Duration, }; - -use arc_swap::ArcSwapOption; -use enumset::{enum_set, EnumSet}; -use futures::{stream, FutureExt, Stream, StreamExt, TryStreamExt}; -use itertools::Itertools; -use regex::{Regex, RegexSet}; -use rev_lines::RevLines; -use serde::{Deserialize, Serialize}; use tokio::{ fs::File, io::{AsyncBufReadExt, AsyncWriteExt, BufReader}, @@ -40,17 +57,6 @@ use tokio::{process::Command, sync::mpsc::Sender}; use tracing::{error, info, warn}; use typed_builder::TypedBuilder; -use restate_types::{ - config::{Configuration, MetadataStoreClient}, - errors::GenericError, - metadata_store::keys::NODES_CONFIG_KEY, - net::{AdvertisedAddress, BindAddress}, - nodes_config::{NodesConfiguration, Role}, - PlainNodeId, -}; - -use crate::random_socket_address; - #[derive(Debug, Clone, Serialize, Deserialize, TypedBuilder)] pub struct Node { #[builder(mutators( @@ -747,6 +753,54 @@ impl StartedNode { !nodes_config.get_log_server_storage_state(&node_id).empty() } + + /// Provisions the cluster on this node with the given configuration. Returns true if the + /// cluster was newly provisioned. + pub async fn provision_cluster( + &self, + num_partitions: Option, + placement_strategy: Option, + log_provider: Option, + ) -> anyhow::Result { + let channel = create_tonic_channel_from_advertised_address( + self.node_address().clone(), + &Configuration::default().networking, + ); + + let request = ProtoProvisionClusterRequest { + dry_run: false, + num_partitions: num_partitions.map(|num| u32::from(num.get())), + placement_strategy: placement_strategy + .map(|replication_strategy| replication_strategy.into()), + log_provider: log_provider.map(|log_provider| log_provider.into()), + }; + + let retry_policy = RetryPolicy::exponential( + Duration::from_millis(10), + 2.0, + Some(10), + Some(Duration::from_secs(1)), + ); + let client = NodeCtlSvcClient::new(channel); + + let response = retry_policy + .retry(|| { + let mut client = client.clone(); + let request = request.clone(); + async move { client.provision_cluster(request).await } + }) + .await? + .into_inner(); + + Ok(match response.kind() { + ProvisionClusterResponseKind::ProvisionClusterResponseTypeUnknown => { + panic!("unknown cluster response type") + } + ProvisionClusterResponseKind::DryRun => unreachable!("request non dry run"), + ProvisionClusterResponseKind::NewlyProvisioned => true, + ProvisionClusterResponseKind::AlreadyProvisioned => false, + }) + } } #[derive(Debug, Clone, Copy)] diff --git a/server/tests/cluster.rs b/server/tests/cluster.rs index 503b529cc..5ecad7686 100644 --- a/server/tests/cluster.rs +++ b/server/tests/cluster.rs @@ -8,18 +8,22 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use std::num::NonZeroU16; +use std::num::{NonZeroU16, NonZeroU8}; use std::time::Duration; use enumset::enum_set; use futures_util::StreamExt; +use googletest::IntoTestResult; use regex::Regex; use restate_local_cluster_runner::{ cluster::Cluster, node::{BinarySource, Node}, }; use restate_types::config::MetadataStoreClient; -use restate_types::logs::metadata::ProviderKind; +use restate_types::logs::metadata::{ + DefaultProvider, NodeSetSelectionStrategy, ReplicatedLogletConfig, +}; +use restate_types::replicated_loglet::ReplicationProperty; use restate_types::{config::Configuration, nodes_config::Role, PlainNodeId}; use test_log::test; @@ -126,7 +130,8 @@ async fn cluster_name_mismatch() -> googletest::Result<()> { #[test(restate_core::test)] async fn replicated_loglet() -> googletest::Result<()> { let mut base_config = Configuration::default(); - base_config.bifrost.default_provider = ProviderKind::Replicated; + // require an explicit provision step to configure the replication property to 2 + base_config.common.allow_bootstrap = false; base_config.common.bootstrap_num_partitions = NonZeroU16::new(1).expect("1 to be non-zero"); let nodes = Node::new_test_nodes_with_metadata( @@ -148,6 +153,20 @@ async fn replicated_loglet() -> googletest::Result<()> { .start() .await?; + let replicated_loglet_config = ReplicatedLogletConfig { + replication_property: ReplicationProperty::new(NonZeroU8::new(2).expect("to be non-zero")), + nodeset_selection_strategy: NodeSetSelectionStrategy::default(), + }; + + cluster.nodes[0] + .provision_cluster( + None, + None, + Some(DefaultProvider::Replicated(replicated_loglet_config)), + ) + .await + .into_test_result()?; + cluster.wait_healthy(Duration::from_secs(30)).await?; for partition_processor in &mut partition_processors_starting_up {