diff --git a/crates/core/build.rs b/crates/core/build.rs index 2089560ef..135e37a1d 100644 --- a/crates/core/build.rs +++ b/crates/core/build.rs @@ -16,13 +16,24 @@ fn main() -> Result<(), Box> { tonic_build::configure() .bytes(["."]) - .file_descriptor_set_path(out_dir.join("node_svc_descriptor.bin")) + .file_descriptor_set_path(out_dir.join("node_ctl_svc_descriptor.bin")) // allow older protobuf compiler to be used .protoc_arg("--experimental_allow_proto3_optional") .extern_path(".restate.node", "::restate_types::protobuf::node") .extern_path(".restate.common", "::restate_types::protobuf::common") .compile_protos( - &["./protobuf/node_svc.proto"], + &["./protobuf/node_ctl_svc.proto"], + &["protobuf", "../types/protobuf"], + )?; + + tonic_build::configure() + .bytes(["."]) + .file_descriptor_set_path(out_dir.join("core_node_svc_descriptor.bin")) + // allow older protobuf compiler to be used + .protoc_arg("--experimental_allow_proto3_optional") + .extern_path(".restate.node", "::restate_types::protobuf::node") + .compile_protos( + &["./protobuf/core_node_svc.proto"], &["protobuf", "../types/protobuf"], )?; diff --git a/crates/core/protobuf/core_node_svc.proto b/crates/core/protobuf/core_node_svc.proto new file mode 100644 index 000000000..333ae2cda --- /dev/null +++ b/crates/core/protobuf/core_node_svc.proto @@ -0,0 +1,21 @@ +// Copyright (c) 2024 - 2025 Restate Software, Inc., Restate GmbH +// +// This file is part of the Restate service protocol, which is +// released under the MIT license. +// +// You can find a copy of the license in file LICENSE in the root +// directory of this repository or package, or at +// https://github.com/restatedev/proto/blob/main/LICENSE + +syntax = "proto3"; + +import "restate/node.proto"; + +package restate.core_node_svc; + +// Service which is only accessible on nodes that are alive. +service CoreNodeSvc { + // Create a bidirectional node-to-node stream + rpc CreateConnection(stream restate.node.Message) + returns (stream restate.node.Message); +} \ No newline at end of file diff --git a/crates/core/protobuf/node_svc.proto b/crates/core/protobuf/node_ctl_svc.proto similarity index 89% rename from crates/core/protobuf/node_svc.proto rename to crates/core/protobuf/node_ctl_svc.proto index 62a81bf87..74350bc4a 100644 --- a/crates/core/protobuf/node_svc.proto +++ b/crates/core/protobuf/node_ctl_svc.proto @@ -13,16 +13,12 @@ import "google/protobuf/empty.proto"; import "restate/common.proto"; import "restate/node.proto"; -package restate.node_svc; +package restate.node_ctl_svc; -service NodeSvc { +service NodeCtlSvc { // Get identity information from this node. rpc GetIdent(google.protobuf.Empty) returns (IdentResponse); - // Create a bidirectional node-to-node stream - rpc CreateConnection(stream restate.node.Message) - returns (stream restate.node.Message); - rpc GetMetadata(GetMetadataRequest) returns (GetMetadataResponse); } diff --git a/crates/core/src/metadata.rs b/crates/core/src/metadata.rs index c0c9d3227..2244c0ef3 100644 --- a/crates/core/src/metadata.rs +++ b/crates/core/src/metadata.rs @@ -116,6 +116,11 @@ impl Metadata { *self.inner.my_node_id.get().expect("my_node_id is set") } + /// Returns None if my node id has not been set yet. + pub fn my_node_id_opt(&self) -> Option { + self.inner.my_node_id.get().cloned() + } + /// Returns Version::INVALID if nodes configuration has not been loaded yet. pub fn nodes_config_version(&self) -> Version { self.inner.nodes_config.load().version() diff --git a/crates/core/src/network/protobuf.rs b/crates/core/src/network/protobuf.rs index aea08f2d7..bf64a3e30 100644 --- a/crates/core/src/network/protobuf.rs +++ b/crates/core/src/network/protobuf.rs @@ -8,9 +8,16 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -pub mod node_svc { - tonic::include_proto!("restate.node_svc"); +pub mod node_ctl_svc { + tonic::include_proto!("restate.node_ctl_svc"); pub const FILE_DESCRIPTOR_SET: &[u8] = - tonic::include_file_descriptor_set!("node_svc_descriptor"); + tonic::include_file_descriptor_set!("node_ctl_svc_descriptor"); +} + +pub mod core_node_svc { + tonic::include_proto!("restate.core_node_svc"); + + pub const FILE_DESCRIPTOR_SET: &[u8] = + tonic::include_file_descriptor_set!("core_node_svc_descriptor"); } diff --git a/crates/core/src/network/transport_connector.rs b/crates/core/src/network/transport_connector.rs index b471246bd..af406c4b7 100644 --- a/crates/core/src/network/transport_connector.rs +++ b/crates/core/src/network/transport_connector.rs @@ -21,7 +21,7 @@ use restate_types::nodes_config::NodesConfiguration; use restate_types::protobuf::node::Message; use restate_types::GenerationalNodeId; -use super::protobuf::node_svc::node_svc_client::NodeSvcClient; +use super::protobuf::core_node_svc::core_node_svc_client::CoreNodeSvcClient; use super::{NetworkError, ProtocolError}; use crate::network::net_util::create_tonic_channel_from_advertised_address; @@ -79,7 +79,7 @@ impl TransportConnect for GrpcConnector { }; // Establish the connection - let mut client = NodeSvcClient::new(channel); + let mut client = CoreNodeSvcClient::new(channel); let incoming = client.create_connection(output_stream).await?.into_inner(); Ok(incoming.map(|x| x.map_err(ProtocolError::from))) } diff --git a/crates/node/src/lib.rs b/crates/node/src/lib.rs index 1376007fd..42f3b868a 100644 --- a/crates/node/src/lib.rs +++ b/crates/node/src/lib.rs @@ -318,6 +318,14 @@ impl Node { pub async fn start(self) -> Result<(), anyhow::Error> { let config = self.updateable_config.pinned(); + let metadata_writer = self.metadata_manager.writer(); + let metadata = self.metadata_manager.metadata().clone(); + let is_set = TaskCenter::try_set_global_metadata(metadata.clone()); + debug_assert!(is_set, "Global metadata was already set"); + + // Start metadata manager + spawn_metadata_manager(self.metadata_manager)?; + // spawn the node rpc server first to enable connecting to the metadata store TaskCenter::spawn(TaskKind::RpcServer, "node-rpc-server", { let health = self.health.clone(); @@ -346,14 +354,6 @@ impl Node { )?; } - let metadata_writer = self.metadata_manager.writer(); - let metadata = self.metadata_manager.metadata().clone(); - let is_set = TaskCenter::try_set_global_metadata(metadata.clone()); - debug_assert!(is_set, "Global metadata was already set"); - - // Start metadata manager - spawn_metadata_manager(self.metadata_manager)?; - // Start partition routing information refresher spawn_partition_routing_refresher(self.partition_routing_refresher)?; diff --git a/crates/node/src/network_server/grpc_svc_handler.rs b/crates/node/src/network_server/grpc_svc_handler.rs index 9e3fb26cf..0ec5b7bc4 100644 --- a/crates/node/src/network_server/grpc_svc_handler.rs +++ b/crates/node/src/network_server/grpc_svc_handler.rs @@ -14,8 +14,9 @@ use futures::stream::BoxStream; use tokio_stream::StreamExt; use tonic::{Request, Response, Status, Streaming}; -use restate_core::network::protobuf::node_svc::node_svc_server::NodeSvc; -use restate_core::network::protobuf::node_svc::{ +use restate_core::network::protobuf::core_node_svc::core_node_svc_server::CoreNodeSvc; +use restate_core::network::protobuf::node_ctl_svc::node_ctl_svc_server::NodeCtlSvc; +use restate_core::network::protobuf::node_ctl_svc::{ GetMetadataRequest, GetMetadataResponse, IdentResponse, }; use restate_core::network::ConnectionManager; @@ -27,34 +28,31 @@ use restate_types::nodes_config::Role; use restate_types::protobuf::node::Message; use restate_types::storage::StorageCodec; -pub struct NodeSvcHandler { +pub struct NodeCtlSvcHandler { task_center: task_center::Handle, cluster_name: String, roles: EnumSet, health: Health, - connections: ConnectionManager, } -impl NodeSvcHandler { +impl NodeCtlSvcHandler { pub fn new( task_center: task_center::Handle, cluster_name: String, roles: EnumSet, health: Health, - connections: ConnectionManager, ) -> Self { Self { task_center, cluster_name, roles, health, - connections, } } } #[async_trait::async_trait] -impl NodeSvc for NodeSvcHandler { +impl NodeCtlSvc for NodeCtlSvcHandler { async fn get_ident(&self, _request: Request<()>) -> Result, Status> { let node_status = self.health.current_node_status(); let admin_status = self.health.current_admin_status(); @@ -65,7 +63,7 @@ impl NodeSvc for NodeSvcHandler { let metadata = Metadata::current(); Ok(Response::new(IdentResponse { status: node_status.into(), - node_id: Some(metadata.my_node_id().into()), + node_id: metadata.my_node_id_opt().map(Into::into), roles: self.roles.iter().map(|r| r.to_string()).collect(), cluster_name: self.cluster_name.clone(), age_s, @@ -80,33 +78,6 @@ impl NodeSvc for NodeSvcHandler { })) } - type CreateConnectionStream = BoxStream<'static, Result>; - - // Status codes returned in different scenarios: - // - DeadlineExceeded: No hello received within deadline - // - InvalidArgument: Header should always be set or any other missing required part of the - // handshake. This also happens if the client sent wrong message on handshake. - // - AlreadyExists: A node with a newer generation has been observed already - // - Cancelled: received an error from the client, or the client has dropped the stream during - // handshake. - // - Unavailable: This node is shutting down - async fn create_connection( - &self, - request: Request>, - ) -> Result, Status> { - let incoming = request.into_inner(); - let transformed = incoming.map(|x| x.map_err(ProtocolError::from)); - let output_stream = self - .connections - .accept_incoming_connection(transformed) - .await?; - - // For uniformity with outbound connections, we map all responses to Ok, we never rely on - // sending tonic::Status errors explicitly. We use ConnectionControl frames to communicate - // errors and/or drop the stream when necessary. - Ok(Response::new(Box::pin(output_stream.map(Ok)))) - } - async fn get_metadata( &self, request: Request, @@ -144,3 +115,46 @@ impl NodeSvc for NodeSvcHandler { })) } } + +pub struct CoreNodeSvcHandler { + connections: ConnectionManager, +} + +impl CoreNodeSvcHandler { + pub fn new(connections: ConnectionManager) -> Self { + Self { connections } + } +} + +#[async_trait::async_trait] +impl CoreNodeSvc for CoreNodeSvcHandler +where + T: TransportConnect, +{ + type CreateConnectionStream = BoxStream<'static, Result>; + + // Status codes returned in different scenarios: + // - DeadlineExceeded: No hello received within deadline + // - InvalidArgument: Header should always be set or any other missing required part of the + // handshake. This also happens if the client sent wrong message on handshake. + // - AlreadyExists: A node with a newer generation has been observed already + // - Cancelled: received an error from the client, or the client has dropped the stream during + // handshake. + // - Unavailable: This node is shutting down + async fn create_connection( + &self, + request: Request>, + ) -> Result, Status> { + let incoming = request.into_inner(); + let transformed = incoming.map(|x| x.map_err(ProtocolError::from)); + let output_stream = self + .connections + .accept_incoming_connection(transformed) + .await?; + + // For uniformity with outbound connections, we map all responses to Ok, we never rely on + // sending tonic::Status errors explicitly. We use ConnectionControl frames to communicate + // errors and/or drop the stream when necessary. + Ok(Response::new(Box::pin(output_stream.map(Ok)))) + } +} diff --git a/crates/node/src/network_server/service.rs b/crates/node/src/network_server/service.rs index 7f98100d4..fd45720b8 100644 --- a/crates/node/src/network_server/service.rs +++ b/crates/node/src/network_server/service.rs @@ -17,7 +17,8 @@ use tracing::{debug, trace}; use crate::network_server::metrics::{install_global_prometheus_recorder, render_metrics}; use crate::network_server::state::NodeCtrlHandlerStateBuilder; -use restate_core::network::protobuf::node_svc::node_svc_server::NodeSvcServer; +use restate_core::network::protobuf::core_node_svc::core_node_svc_server::CoreNodeSvcServer; +use restate_core::network::protobuf::node_ctl_svc::node_ctl_svc_server::NodeCtlSvcServer; use restate_core::network::tonic_service_filter::TonicServiceFilter; use restate_core::network::{ConnectionManager, NetworkServerBuilder, TransportConnect}; use restate_core::{cancellation_watcher, TaskCenter, TaskKind}; @@ -25,7 +26,7 @@ use restate_types::config::CommonOptions; use restate_types::health::Health; use restate_types::protobuf::common::NodeStatus; -use super::grpc_svc_handler::NodeSvcHandler; +use super::grpc_svc_handler::{CoreNodeSvcHandler, NodeCtlSvcHandler}; use super::pprof; pub struct NetworkServer {} @@ -93,19 +94,25 @@ impl NetworkServer { let node_health = health.node_status(); + server_builder.register_grpc_service( + NodeCtlSvcServer::new(NodeCtlSvcHandler::new( + TaskCenter::current(), + options.cluster_name().to_owned(), + options.roles, + health, + )) + .accept_compressed(CompressionEncoding::Gzip) + .send_compressed(CompressionEncoding::Gzip), + restate_core::network::protobuf::node_ctl_svc::FILE_DESCRIPTOR_SET, + ); + server_builder.register_grpc_service( TonicServiceFilter::new( - NodeSvcServer::new(NodeSvcHandler::new( - TaskCenter::current(), - options.cluster_name().to_owned(), - options.roles, - health, - connection_manager, - )) - .max_decoding_message_size(32 * 1024 * 1024) - .max_encoding_message_size(32 * 1024 * 1024) - .accept_compressed(CompressionEncoding::Gzip) - .send_compressed(CompressionEncoding::Gzip), + CoreNodeSvcServer::new(CoreNodeSvcHandler::new(connection_manager)) + .max_decoding_message_size(32 * 1024 * 1024) + .max_encoding_message_size(32 * 1024 * 1024) + .accept_compressed(CompressionEncoding::Gzip) + .send_compressed(CompressionEncoding::Gzip), move |req| { if *node_health.get() == NodeStatus::Alive { Ok(req) @@ -114,7 +121,7 @@ impl NetworkServer { } }, ), - restate_types::protobuf::FILE_DESCRIPTOR_SET, + restate_core::network::protobuf::core_node_svc::FILE_DESCRIPTOR_SET, ); server_builder diff --git a/crates/types/protobuf/restate/common.proto b/crates/types/protobuf/restate/common.proto index 246d556a6..eb8488b52 100644 --- a/crates/types/protobuf/restate/common.proto +++ b/crates/types/protobuf/restate/common.proto @@ -86,6 +86,7 @@ enum TargetName { enum NodeStatus { NodeStatus_UNKNOWN = 0; + // The node has joined the cluster and is fully operational. ALIVE = 1; // The node is not fully running yet. STARTING_UP = 2; diff --git a/tools/restatectl/src/commands/node/list_nodes.rs b/tools/restatectl/src/commands/node/list_nodes.rs index 4624fb876..fbcc1b5a4 100644 --- a/tools/restatectl/src/commands/node/list_nodes.rs +++ b/tools/restatectl/src/commands/node/list_nodes.rs @@ -24,8 +24,8 @@ use restate_cli_util::_comfy_table::{Cell, Table}; use restate_cli_util::c_println; use restate_cli_util::ui::console::StyledTable; use restate_cli_util::ui::{duration_to_human_rough, Tense}; -use restate_core::network::protobuf::node_svc::node_svc_client::NodeSvcClient; -use restate_core::network::protobuf::node_svc::IdentResponse; +use restate_core::network::protobuf::node_ctl_svc::node_ctl_svc_client::NodeCtlSvcClient; +use restate_core::network::protobuf::node_ctl_svc::IdentResponse; use restate_types::nodes_config::NodesConfiguration; use restate_types::storage::StorageCodec; use restate_types::PlainNodeId; @@ -155,10 +155,10 @@ async fn fetch_extra_info( let address = node_config.address.clone(); let get_ident = async move { let node_channel = grpc_connect(address).await?; - let mut node_svc_client = - NodeSvcClient::new(node_channel).accept_compressed(CompressionEncoding::Gzip); + let mut node_ctl_svc_client = + NodeCtlSvcClient::new(node_channel).accept_compressed(CompressionEncoding::Gzip); - Ok(node_svc_client + Ok(node_ctl_svc_client .get_ident(()) .await .map_err(|e| { diff --git a/tools/restatectl/src/commands/replicated_loglet/digest.rs b/tools/restatectl/src/commands/replicated_loglet/digest.rs index 9fdb6fd9c..272bd3951 100644 --- a/tools/restatectl/src/commands/replicated_loglet/digest.rs +++ b/tools/restatectl/src/commands/replicated_loglet/digest.rs @@ -19,8 +19,8 @@ use restate_bifrost::providers::replicated_loglet::replication::NodeSetChecker; use restate_cli_util::_comfy_table::{Cell, Color, Table}; use restate_cli_util::c_println; use restate_cli_util::ui::console::StyledTable; -use restate_core::network::protobuf::node_svc::node_svc_client::NodeSvcClient; -use restate_core::network::protobuf::node_svc::GetMetadataRequest; +use restate_core::network::protobuf::node_ctl_svc::node_ctl_svc_client::NodeCtlSvcClient; +use restate_core::network::protobuf::node_ctl_svc::GetMetadataRequest; use restate_core::MetadataKind; use restate_log_server::protobuf::log_server_svc_client::LogServerSvcClient; use restate_log_server::protobuf::GetDigestRequest; @@ -61,7 +61,7 @@ async fn get_digest(connection: &ConnectionInfo, opts: &DigestOpts) -> anyhow::R connection.cluster_controller ) })?; - let mut client = NodeSvcClient::new(channel).accept_compressed(CompressionEncoding::Gzip); + let mut client = NodeCtlSvcClient::new(channel).accept_compressed(CompressionEncoding::Gzip); let req = GetMetadataRequest { kind: MetadataKind::Logs.into(), sync: opts.sync_metadata, diff --git a/tools/restatectl/src/commands/replicated_loglet/info.rs b/tools/restatectl/src/commands/replicated_loglet/info.rs index 3a4535d05..521f79355 100644 --- a/tools/restatectl/src/commands/replicated_loglet/info.rs +++ b/tools/restatectl/src/commands/replicated_loglet/info.rs @@ -12,8 +12,8 @@ use anyhow::Context; use cling::prelude::*; use restate_cli_util::{c_indentln, c_println}; -use restate_core::network::protobuf::node_svc::node_svc_client::NodeSvcClient; -use restate_core::network::protobuf::node_svc::GetMetadataRequest; +use restate_core::network::protobuf::node_ctl_svc::node_ctl_svc_client::NodeCtlSvcClient; +use restate_core::network::protobuf::node_ctl_svc::GetMetadataRequest; use restate_core::MetadataKind; use restate_types::logs::metadata::Logs; use restate_types::replicated_loglet::ReplicatedLogletId; @@ -43,7 +43,7 @@ async fn get_info(connection: &ConnectionInfo, opts: &InfoOpts) -> anyhow::Resul connection.cluster_controller ) })?; - let mut client = NodeSvcClient::new(channel).accept_compressed(CompressionEncoding::Gzip); + let mut client = NodeCtlSvcClient::new(channel).accept_compressed(CompressionEncoding::Gzip); let req = GetMetadataRequest { kind: MetadataKind::Logs.into(),