Skip to content

Commit

Permalink
Split NodeSvc into NodeCtlSvc and CoreNodeSvc
Browse files Browse the repository at this point in the history
The CoreNodeSvc contains handlers that are only available once a
node has joined the cluster and become alive. This currently includes
the handler to initiate a network connection to the node via Restate's
internal networking.
  • Loading branch information
tillrohrmann committed Dec 16, 2024
1 parent d9d52a8 commit fe189e6
Show file tree
Hide file tree
Showing 13 changed files with 144 additions and 82 deletions.
15 changes: 13 additions & 2 deletions crates/core/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,24 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {

tonic_build::configure()
.bytes(["."])
.file_descriptor_set_path(out_dir.join("node_svc_descriptor.bin"))
.file_descriptor_set_path(out_dir.join("node_ctl_svc_descriptor.bin"))
// allow older protobuf compiler to be used
.protoc_arg("--experimental_allow_proto3_optional")
.extern_path(".restate.node", "::restate_types::protobuf::node")
.extern_path(".restate.common", "::restate_types::protobuf::common")
.compile_protos(
&["./protobuf/node_svc.proto"],
&["./protobuf/node_ctl_svc.proto"],
&["protobuf", "../types/protobuf"],
)?;

tonic_build::configure()
.bytes(["."])
.file_descriptor_set_path(out_dir.join("core_node_svc_descriptor.bin"))
// allow older protobuf compiler to be used
.protoc_arg("--experimental_allow_proto3_optional")
.extern_path(".restate.node", "::restate_types::protobuf::node")
.compile_protos(
&["./protobuf/core_node_svc.proto"],
&["protobuf", "../types/protobuf"],
)?;

Expand Down
21 changes: 21 additions & 0 deletions crates/core/protobuf/core_node_svc.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
// Copyright (c) 2024 - 2025 Restate Software, Inc., Restate GmbH
//
// This file is part of the Restate service protocol, which is
// released under the MIT license.
//
// You can find a copy of the license in file LICENSE in the root
// directory of this repository or package, or at
// https://github.com/restatedev/proto/blob/main/LICENSE

syntax = "proto3";

import "restate/node.proto";

package restate.core_node_svc;

// Service that is only accessible on nodes that are alive, i.e. nodes that
// have joined the cluster.
service CoreNodeSvc {
// Creates a bidirectional node-to-node stream: both the request and the
// response are streams of restate.node.Message.
rpc CreateConnection(stream restate.node.Message)
returns (stream restate.node.Message);
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,12 @@ import "google/protobuf/empty.proto";
import "restate/common.proto";
import "restate/node.proto";

package restate.node_svc;
package restate.node_ctl_svc;

service NodeSvc {
service NodeCtlSvc {
// Get identity information from this node.
rpc GetIdent(google.protobuf.Empty) returns (IdentResponse);

// Create a bidirectional node-to-node stream
rpc CreateConnection(stream restate.node.Message)
returns (stream restate.node.Message);

rpc GetMetadata(GetMetadataRequest) returns (GetMetadataResponse);
}

Expand Down
5 changes: 5 additions & 0 deletions crates/core/src/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,11 @@ impl Metadata {
*self.inner.my_node_id.get().expect("my_node_id is set")
}

/// Returns this node's id, or `None` if my node id has not been set yet.
pub fn my_node_id_opt(&self) -> Option<GenerationalNodeId> {
    let maybe_id = self.inner.my_node_id.get();
    maybe_id.copied()
}

/// Returns Version::INVALID if nodes configuration has not been loaded yet.
pub fn nodes_config_version(&self) -> Version {
self.inner.nodes_config.load().version()
Expand Down
13 changes: 10 additions & 3 deletions crates/core/src/network/protobuf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,16 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

pub mod node_svc {
tonic::include_proto!("restate.node_svc");
pub mod node_ctl_svc {
tonic::include_proto!("restate.node_ctl_svc");

pub const FILE_DESCRIPTOR_SET: &[u8] =
tonic::include_file_descriptor_set!("node_svc_descriptor");
tonic::include_file_descriptor_set!("node_ctl_svc_descriptor");
}

/// Generated code for the `restate.core_node_svc` protobuf package.
pub mod core_node_svc {
tonic::include_proto!("restate.core_node_svc");

/// File descriptor set of this package, read from the
/// `core_node_svc_descriptor` file produced at build time.
pub const FILE_DESCRIPTOR_SET: &[u8] =
tonic::include_file_descriptor_set!("core_node_svc_descriptor");
}
4 changes: 2 additions & 2 deletions crates/core/src/network/transport_connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use restate_types::nodes_config::NodesConfiguration;
use restate_types::protobuf::node::Message;
use restate_types::GenerationalNodeId;

use super::protobuf::node_svc::node_svc_client::NodeSvcClient;
use super::protobuf::core_node_svc::core_node_svc_client::CoreNodeSvcClient;
use super::{NetworkError, ProtocolError};
use crate::network::net_util::create_tonic_channel_from_advertised_address;

Expand Down Expand Up @@ -79,7 +79,7 @@ impl TransportConnect for GrpcConnector {
};

// Establish the connection
let mut client = NodeSvcClient::new(channel);
let mut client = CoreNodeSvcClient::new(channel);
let incoming = client.create_connection(output_stream).await?.into_inner();
Ok(incoming.map(|x| x.map_err(ProtocolError::from)))
}
Expand Down
16 changes: 8 additions & 8 deletions crates/node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,14 @@ impl Node {
pub async fn start(self) -> Result<(), anyhow::Error> {
let config = self.updateable_config.pinned();

let metadata_writer = self.metadata_manager.writer();
let metadata = self.metadata_manager.metadata().clone();
let is_set = TaskCenter::try_set_global_metadata(metadata.clone());
debug_assert!(is_set, "Global metadata was already set");

// Start metadata manager
spawn_metadata_manager(self.metadata_manager)?;

// spawn the node rpc server first to enable connecting to the metadata store
TaskCenter::spawn(TaskKind::RpcServer, "node-rpc-server", {
let health = self.health.clone();
Expand Down Expand Up @@ -346,14 +354,6 @@ impl Node {
)?;
}

let metadata_writer = self.metadata_manager.writer();
let metadata = self.metadata_manager.metadata().clone();
let is_set = TaskCenter::try_set_global_metadata(metadata.clone());
debug_assert!(is_set, "Global metadata was already set");

// Start metadata manager
spawn_metadata_manager(self.metadata_manager)?;

// Start partition routing information refresher
spawn_partition_routing_refresher(self.partition_routing_refresher)?;

Expand Down
86 changes: 50 additions & 36 deletions crates/node/src/network_server/grpc_svc_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,9 @@ use futures::stream::BoxStream;
use tokio_stream::StreamExt;
use tonic::{Request, Response, Status, Streaming};

use restate_core::network::protobuf::node_svc::node_svc_server::NodeSvc;
use restate_core::network::protobuf::node_svc::{
use restate_core::network::protobuf::core_node_svc::core_node_svc_server::CoreNodeSvc;
use restate_core::network::protobuf::node_ctl_svc::node_ctl_svc_server::NodeCtlSvc;
use restate_core::network::protobuf::node_ctl_svc::{
GetMetadataRequest, GetMetadataResponse, IdentResponse,
};
use restate_core::network::ConnectionManager;
Expand All @@ -27,34 +28,31 @@ use restate_types::nodes_config::Role;
use restate_types::protobuf::node::Message;
use restate_types::storage::StorageCodec;

pub struct NodeSvcHandler<T> {
pub struct NodeCtlSvcHandler {
task_center: task_center::Handle,
cluster_name: String,
roles: EnumSet<Role>,
health: Health,
connections: ConnectionManager<T>,
}

impl<T: TransportConnect> NodeSvcHandler<T> {
impl NodeCtlSvcHandler {
pub fn new(
task_center: task_center::Handle,
cluster_name: String,
roles: EnumSet<Role>,
health: Health,
connections: ConnectionManager<T>,
) -> Self {
Self {
task_center,
cluster_name,
roles,
health,
connections,
}
}
}

#[async_trait::async_trait]
impl<T: TransportConnect> NodeSvc for NodeSvcHandler<T> {
impl NodeCtlSvc for NodeCtlSvcHandler {
async fn get_ident(&self, _request: Request<()>) -> Result<Response<IdentResponse>, Status> {
let node_status = self.health.current_node_status();
let admin_status = self.health.current_admin_status();
Expand All @@ -65,7 +63,7 @@ impl<T: TransportConnect> NodeSvc for NodeSvcHandler<T> {
let metadata = Metadata::current();
Ok(Response::new(IdentResponse {
status: node_status.into(),
node_id: Some(metadata.my_node_id().into()),
node_id: metadata.my_node_id_opt().map(Into::into),
roles: self.roles.iter().map(|r| r.to_string()).collect(),
cluster_name: self.cluster_name.clone(),
age_s,
Expand All @@ -80,33 +78,6 @@ impl<T: TransportConnect> NodeSvc for NodeSvcHandler<T> {
}))
}

type CreateConnectionStream = BoxStream<'static, Result<Message, Status>>;

// Status codes returned in different scenarios:
// - DeadlineExceeded: No hello received within deadline
// - InvalidArgument: Header should always be set or any other missing required part of the
// handshake. This also happens if the client sent wrong message on handshake.
// - AlreadyExists: A node with a newer generation has been observed already
// - Cancelled: received an error from the client, or the client has dropped the stream during
// handshake.
// - Unavailable: This node is shutting down
async fn create_connection(
&self,
request: Request<Streaming<Message>>,
) -> Result<Response<Self::CreateConnectionStream>, Status> {
let incoming = request.into_inner();
let transformed = incoming.map(|x| x.map_err(ProtocolError::from));
let output_stream = self
.connections
.accept_incoming_connection(transformed)
.await?;

// For uniformity with outbound connections, we map all responses to Ok, we never rely on
// sending tonic::Status errors explicitly. We use ConnectionControl frames to communicate
// errors and/or drop the stream when necessary.
Ok(Response::new(Box::pin(output_stream.map(Ok))))
}

async fn get_metadata(
&self,
request: Request<GetMetadataRequest>,
Expand Down Expand Up @@ -144,3 +115,46 @@ impl<T: TransportConnect> NodeSvc for NodeSvcHandler<T> {
}))
}
}

/// Handler type implementing the `CoreNodeSvc` gRPC service.
pub struct CoreNodeSvcHandler<T> {
// Connection manager that accepts the incoming node-to-node streams.
connections: ConnectionManager<T>,
}

impl<T> CoreNodeSvcHandler<T> {
pub fn new(connections: ConnectionManager<T>) -> Self {
Self { connections }
}
}

#[async_trait::async_trait]
impl<T: TransportConnect> CoreNodeSvc for CoreNodeSvcHandler<T> {
    type CreateConnectionStream = BoxStream<'static, Result<Message, Status>>;

    // Accepts an incoming node-to-node stream and returns the outgoing half.
    //
    // Status codes returned in different scenarios:
    // - DeadlineExceeded: No hello received within deadline
    // - InvalidArgument: Header should always be set or any other missing required part of the
    //   handshake. This also happens if the client sent wrong message on handshake.
    // - AlreadyExists: A node with a newer generation has been observed already
    // - Cancelled: received an error from the client, or the client has dropped the stream during
    //   handshake.
    // - Unavailable: This node is shutting down
    async fn create_connection(
        &self,
        request: Request<Streaming<Message>>,
    ) -> Result<Response<Self::CreateConnectionStream>, Status> {
        // Translate transport-level errors into protocol errors before handing the stream over.
        let inbound = request
            .into_inner()
            .map(|frame| frame.map_err(ProtocolError::from));
        let outbound = self.connections.accept_incoming_connection(inbound).await?;

        // For uniformity with outbound connections, we map all responses to Ok, we never rely on
        // sending tonic::Status errors explicitly. We use ConnectionControl frames to communicate
        // errors and/or drop the stream when necessary.
        let responses: Self::CreateConnectionStream = Box::pin(outbound.map(Ok));
        Ok(Response::new(responses))
    }
}
35 changes: 21 additions & 14 deletions crates/node/src/network_server/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,16 @@ use tracing::{debug, trace};

use crate::network_server::metrics::{install_global_prometheus_recorder, render_metrics};
use crate::network_server::state::NodeCtrlHandlerStateBuilder;
use restate_core::network::protobuf::node_svc::node_svc_server::NodeSvcServer;
use restate_core::network::protobuf::core_node_svc::core_node_svc_server::CoreNodeSvcServer;
use restate_core::network::protobuf::node_ctl_svc::node_ctl_svc_server::NodeCtlSvcServer;
use restate_core::network::tonic_service_filter::TonicServiceFilter;
use restate_core::network::{ConnectionManager, NetworkServerBuilder, TransportConnect};
use restate_core::{cancellation_watcher, TaskCenter, TaskKind};
use restate_types::config::CommonOptions;
use restate_types::health::Health;
use restate_types::protobuf::common::NodeStatus;

use super::grpc_svc_handler::NodeSvcHandler;
use super::grpc_svc_handler::{CoreNodeSvcHandler, NodeCtlSvcHandler};
use super::pprof;

pub struct NetworkServer {}
Expand Down Expand Up @@ -93,19 +94,25 @@ impl NetworkServer {

let node_health = health.node_status();

server_builder.register_grpc_service(
NodeCtlSvcServer::new(NodeCtlSvcHandler::new(
TaskCenter::current(),
options.cluster_name().to_owned(),
options.roles,
health,
))
.accept_compressed(CompressionEncoding::Gzip)
.send_compressed(CompressionEncoding::Gzip),
restate_core::network::protobuf::node_ctl_svc::FILE_DESCRIPTOR_SET,
);

server_builder.register_grpc_service(
TonicServiceFilter::new(
NodeSvcServer::new(NodeSvcHandler::new(
TaskCenter::current(),
options.cluster_name().to_owned(),
options.roles,
health,
connection_manager,
))
.max_decoding_message_size(32 * 1024 * 1024)
.max_encoding_message_size(32 * 1024 * 1024)
.accept_compressed(CompressionEncoding::Gzip)
.send_compressed(CompressionEncoding::Gzip),
CoreNodeSvcServer::new(CoreNodeSvcHandler::new(connection_manager))
.max_decoding_message_size(32 * 1024 * 1024)
.max_encoding_message_size(32 * 1024 * 1024)
.accept_compressed(CompressionEncoding::Gzip)
.send_compressed(CompressionEncoding::Gzip),
move |req| {
if *node_health.get() == NodeStatus::Alive {
Ok(req)
Expand All @@ -114,7 +121,7 @@ impl NetworkServer {
}
},
),
restate_types::protobuf::FILE_DESCRIPTOR_SET,
restate_core::network::protobuf::core_node_svc::FILE_DESCRIPTOR_SET,
);

server_builder
Expand Down
1 change: 1 addition & 0 deletions crates/types/protobuf/restate/common.proto
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ enum TargetName {

enum NodeStatus {
NodeStatus_UNKNOWN = 0;
// The node has joined the cluster and is fully operational.
ALIVE = 1;
// The node is not fully running yet.
STARTING_UP = 2;
Expand Down
10 changes: 5 additions & 5 deletions tools/restatectl/src/commands/node/list_nodes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ use restate_cli_util::_comfy_table::{Cell, Table};
use restate_cli_util::c_println;
use restate_cli_util::ui::console::StyledTable;
use restate_cli_util::ui::{duration_to_human_rough, Tense};
use restate_core::network::protobuf::node_svc::node_svc_client::NodeSvcClient;
use restate_core::network::protobuf::node_svc::IdentResponse;
use restate_core::network::protobuf::node_ctl_svc::node_ctl_svc_client::NodeCtlSvcClient;
use restate_core::network::protobuf::node_ctl_svc::IdentResponse;
use restate_types::nodes_config::NodesConfiguration;
use restate_types::storage::StorageCodec;
use restate_types::PlainNodeId;
Expand Down Expand Up @@ -155,10 +155,10 @@ async fn fetch_extra_info(
let address = node_config.address.clone();
let get_ident = async move {
let node_channel = grpc_connect(address).await?;
let mut node_svc_client =
NodeSvcClient::new(node_channel).accept_compressed(CompressionEncoding::Gzip);
let mut node_ctl_svc_client =
NodeCtlSvcClient::new(node_channel).accept_compressed(CompressionEncoding::Gzip);

Ok(node_svc_client
Ok(node_ctl_svc_client
.get_ident(())
.await
.map_err(|e| {
Expand Down
Loading

0 comments on commit fe189e6

Please sign in to comment.