Let MetadataStore use the node grpc server
For the MetadataStore to use the node grpc server, the grpc server
needs to be started first. Since a few grpc services depend on the
node having joined the cluster (in order to obtain a NodeId), this
commit disables those services until that condition is met.

To disable Tonic services, this commit introduces the TonicServiceFilter,
which wraps a Tonic service together with a predicate. The predicate can
check a status (for example, the component's health status) for each
incoming request to decide whether to forward it or to fail it with a
tonic::Status.

This fixes restatedev#2411.
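
For illustration, a condensed sketch of the intended usage, taken from the ClusterCtrlSvc registration in this diff (`handler` and `FILE_DESCRIPTOR_SET` abbreviate the full constructor arguments and descriptor path used below):

// Sketch only: fail requests with `unavailable` until the admin component reports ready.
let filtered_svc = TonicServiceFilter::new(
    ClusterCtrlSvcServer::new(handler),
    {
        let health_status = health_status.clone();
        move |req| {
            if *health_status.get() == AdminStatus::Ready {
                Ok(req)
            } else {
                Err(tonic::Status::unavailable("Admin component is not ready."))
            }
        }
    },
);
server_builder.register_grpc_service(filtered_svc, FILE_DESCRIPTOR_SET);

Because TonicServiceFilter forwards NamedService from the wrapped service, the filtered service can be registered exactly where the unwrapped server was registered before.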
tillrohrmann committed Dec 16, 2024
1 parent 0cd6be3 commit d9d52a8
Showing 30 changed files with 431 additions and 303 deletions.
10 changes: 1 addition & 9 deletions Cargo.lock


31 changes: 22 additions & 9 deletions crates/admin/src/cluster_controller/service.rs
@@ -38,6 +38,7 @@ use tracing::{debug, info};
 use restate_bifrost::{Bifrost, BifrostAdmin, SealedSegment};
 use restate_core::metadata_store::{retry_on_network_error, MetadataStoreClient};
 use restate_core::network::rpc_router::RpcRouter;
+use restate_core::network::tonic_service_filter::TonicServiceFilter;
 use restate_core::network::{
     MessageRouterBuilder, NetworkSender, NetworkServerBuilder, Networking, TransportConnect,
 };
@@ -115,16 +116,28 @@ where
 
         // Registering ClusterCtrlSvc grpc service to network server
         server_builder.register_grpc_service(
-            ClusterCtrlSvcServer::new(ClusterCtrlSvcHandler::new(
-                ClusterControllerHandle {
-                    tx: command_tx.clone(),
+            TonicServiceFilter::new(
+                ClusterCtrlSvcServer::new(ClusterCtrlSvcHandler::new(
+                    ClusterControllerHandle {
+                        tx: command_tx.clone(),
+                    },
+                    metadata_store_client.clone(),
+                    bifrost.clone(),
+                    metadata_writer.clone(),
+                ))
+                .accept_compressed(CompressionEncoding::Gzip)
+                .send_compressed(CompressionEncoding::Gzip),
+                {
+                    let health_status = health_status.clone();
+                    move |req| {
+                        if *health_status.get() == AdminStatus::Ready {
+                            Ok(req)
+                        } else {
+                            Err(tonic::Status::unavailable("Admin component is not ready."))
+                        }
+                    }
                 },
-                metadata_store_client.clone(),
-                bifrost.clone(),
-                metadata_writer.clone(),
-            ))
-            .accept_compressed(CompressionEncoding::Gzip)
-            .send_compressed(CompressionEncoding::Gzip),
+            ),
             crate::cluster_controller::protobuf::FILE_DESCRIPTOR_SET,
         );
 
7 changes: 4 additions & 3 deletions crates/bifrost/src/providers/replicated_loglet/loglet.rs
@@ -370,6 +370,8 @@ mod tests {
         set_current_config(config.clone());
         let config = Live::from_value(config);
 
+        RocksDbManager::init(config.clone().map(|c| &c.common));
+
         let mut node_env =
             TestCoreEnvBuilder::with_incoming_only_connector().add_mock_nodes_config();
         let mut server_builder = NetworkServerBuilder::default();
@@ -384,15 +386,14 @@
             node_env.metadata_store_client.clone(),
             record_cache.clone(),
             &mut node_env.router_builder,
-            &mut server_builder,
         )
         .await?;
 
         let node_env = node_env.build().await;
 
-        RocksDbManager::init(config.clone().map(|c| &c.common));
-
         log_server
-            .start(node_env.metadata_writer.clone(), &mut server_builder)
+            .start(node_env.metadata_writer.clone())
             .await
             .into_test_result()?;
 
1 change: 1 addition & 0 deletions crates/core/src/network/mod.rs
@@ -22,6 +22,7 @@ pub mod partition_processor_rpc_client;
 pub mod protobuf;
 pub mod rpc_router;
 mod server_builder;
+pub mod tonic_service_filter;
 pub mod transport_connector;
 mod types;
 
14 changes: 8 additions & 6 deletions crates/core/src/network/server_builder.rs
@@ -21,7 +21,7 @@ use tracing::debug;
 
 use restate_types::health::HealthStatus;
 use restate_types::net::BindAddress;
-use restate_types::protobuf::common::NodeStatus;
+use restate_types::protobuf::common::NodeRpcStatus;
 
 use super::multiplex::MultiplexService;
 use super::net_util::run_hyper_server;
@@ -58,16 +58,18 @@ impl NetworkServerBuilder {
 
     pub async fn run(
         self,
-        node_health: HealthStatus<NodeStatus>,
-        axum_router: axum::routing::Router,
+        node_rpc_health: HealthStatus<NodeRpcStatus>,
+        axum_router: Option<axum::routing::Router>,
         bind_address: &BindAddress,
     ) -> Result<(), anyhow::Error> {
+        node_rpc_health.update(NodeRpcStatus::StartingUp);
         // Trace layer
         let span_factory = tower_http::trace::DefaultMakeSpan::new()
             .include_headers(true)
             .level(tracing::Level::ERROR);
 
         let axum_router = axum_router
+            .unwrap_or_default()
             .layer(TraceLayer::new_for_http().make_span_with(span_factory.clone()))
             .fallback(handler_404);
 
@@ -92,8 +94,8 @@
             bind_address,
             service,
             "node-rpc-server",
-            || node_health.update(NodeStatus::Alive),
-            || node_health.update(NodeStatus::ShuttingDown),
+            || node_rpc_health.update(NodeRpcStatus::Ready),
+            || node_rpc_health.update(NodeRpcStatus::Stopping),
         )
         .await?;

Expand All @@ -102,7 +104,7 @@ impl NetworkServerBuilder {
}

// handle 404
async fn handler_404() -> (axum::http::StatusCode, &'static str) {
async fn handler_404() -> (http::StatusCode, &'static str) {
(
axum::http::StatusCode::NOT_FOUND,
"Are you lost? Maybe visit https://restate.dev instead!",
Expand Down
76 changes: 76 additions & 0 deletions crates/core/src/network/tonic_service_filter.rs
@@ -0,0 +1,76 @@
// Copyright (c) 2023 - 2024 Restate Software, Inc., Restate GmbH.
// All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use futures::future::{Either, Ready};
use http::Request;
use std::convert::Infallible;
use std::task::{Context, Poll};
use tonic::body::BoxBody;
use tonic::codegen::Service;
use tonic::server::NamedService;

/// A tonic service wrapper that filters requests based on a predicate. This can be used to
/// dynamically disable a service based on some condition.
#[derive(Clone)]
pub struct TonicServiceFilter<T, U> {
    inner: T,
    predicate: U,
}

impl<T, U> TonicServiceFilter<T, U> {
    pub fn new(inner: T, predicate: U) -> Self {
        Self { inner, predicate }
    }
}

impl<T, U> Service<Request<BoxBody>> for TonicServiceFilter<T, U>
where
    T: Service<Request<BoxBody>, Response = http::Response<BoxBody>, Error = Infallible>,
    U: Predicate,
{
    type Response = http::Response<BoxBody>;
    type Error = Infallible;
    type Future = Either<T::Future, Ready<Result<http::Response<BoxBody>, Infallible>>>;

    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        self.inner.poll_ready(cx)
    }

    fn call(&mut self, req: Request<BoxBody>) -> Self::Future {
        match self.predicate.check(req) {
            Ok(req) => Either::Left(self.inner.call(req)),
            Err(status) => Either::Right(futures::future::ready(Ok(status.into_http()))),
        }
    }
}

/// Predicate for the [`TonicServiceFilter`].
pub trait Predicate {
    /// Checks whether the given request should be processed. Return the given `request` if it
    /// should be processed. Otherwise, return the [`tonic::Status`] with which the request should
    /// fail.
    fn check(&mut self, request: Request<BoxBody>) -> Result<Request<BoxBody>, tonic::Status>;
}

impl<T, U> NamedService for TonicServiceFilter<T, U>
where
    T: NamedService,
{
    const NAME: &'static str = T::NAME;
}

impl<F> Predicate for F
where
    F: FnMut(Request<BoxBody>) -> Result<Request<BoxBody>, tonic::Status>,
{
    fn check(&mut self, request: Request<BoxBody>) -> Result<Request<BoxBody>, tonic::Status> {
        self(request)
    }
}
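
Note: thanks to the blanket Predicate impl above, any closure of type FnMut(Request<BoxBody>) -> Result<Request<BoxBody>, tonic::Status> can serve as the predicate. A minimal, purely illustrative sketch (`my_service` and `feature_enabled` are hypothetical placeholders, not part of this commit):

// Reject all requests while a hypothetical feature flag is off.
let filtered = TonicServiceFilter::new(my_service, |req| {
    if feature_enabled() {
        Ok(req)
    } else {
        Err(tonic::Status::unavailable("service is currently disabled"))
    }
});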
76 changes: 41 additions & 35 deletions crates/local-cluster-runner/src/node/mod.rs
@@ -63,13 +63,6 @@ pub struct Node {
         self.base_config.common.advertised_address = AdvertisedAddress::Uds(node_socket);
     }
 
-    #[mutator(requires = [base_dir])]
-    pub fn with_socket_metadata(self) {
-        let metadata_socket: PathBuf = "metadata.sock".into();
-        self.base_config.metadata_store.bind_address = BindAddress::Uds(metadata_socket.clone());
-        self.base_config.common.metadata_store_client.metadata_store_client = MetadataStoreClient::Embedded { address: AdvertisedAddress::Uds(metadata_socket) }
-    }
-
     pub fn with_random_ports(self) {
         self.base_config.admin.bind_address =
             random_socket_address().expect("to find a random port for the admin server");
@@ -127,6 +120,18 @@ impl Node {
         self.base_config.node_name()
     }
 
+    pub fn advertised_address(&self) -> &AdvertisedAddress {
+        &self.base_config.common.advertised_address
+    }
+
+    pub fn metadata_store_client_mut(&mut self) -> &mut MetadataStoreClient {
+        &mut self
+            .base_config
+            .common
+            .metadata_store_client
+            .metadata_store_client
+    }
+
     pub fn config(&self) -> &Configuration {
         &self.base_config
     }
@@ -144,15 +149,15 @@ impl Node {
         binary_source: BinarySource,
         roles: EnumSet<Role>,
     ) -> Self {
-        Self::builder()
+        let builder = Self::builder()
             .binary_source(binary_source)
             .base_config(base_config)
             .with_node_name(node_name)
             .with_node_socket()
-            .with_socket_metadata()
             .with_random_ports()
-            .with_roles(roles)
-            .build()
+            .with_roles(roles);
+
+        builder.build()
     }
 
     // Creates a group of Nodes with a single metadata node "metadata-node" running the
@@ -167,20 +172,28 @@
     ) -> Vec<Self> {
         let mut nodes = Vec::with_capacity((size + 1) as usize);
 
-        {
-            let mut base_config = base_config.clone();
-            base_config.common.force_node_id = Some(PlainNodeId::new(0));
-            nodes.push(Self::new_test_node(
+        let metadata_node_address = {
+            let mut node_config = base_config.clone();
+            node_config.common.force_node_id = Some(PlainNodeId::new(0));
+            let mut metadata_node = Self::new_test_node(
                 "metadata-node",
-                base_config,
+                node_config,
                 binary_source.clone(),
                 enum_set!(Role::Admin | Role::MetadataStore),
-            ));
-        }
+            );
+            let metadata_node_address = metadata_node.advertised_address().clone();
+            *metadata_node.metadata_store_client_mut() = MetadataStoreClient::Embedded {
+                address: metadata_node_address.clone(),
+            };
+
+            nodes.push(metadata_node);
+
+            metadata_node_address
+        };
 
-        for node in 1..=size {
+        for node_id in 1..=size {
             let mut base_config = base_config.clone();
-            base_config.common.force_node_id = Some(PlainNodeId::new(node));
+            base_config.common.force_node_id = Some(PlainNodeId::new(node_id));
 
             // Create a separate ingress role when running a worker
             let roles = if roles.contains(Role::Worker) {
@@ -192,12 +205,16 @@
                 roles
             };
 
-            nodes.push(Self::new_test_node(
-                format!("node-{node}"),
+            let mut node = Self::new_test_node(
+                format!("node-{node_id}"),
                 base_config,
                 binary_source.clone(),
                 roles,
-            ));
+            );
+            *node.metadata_store_client_mut() = MetadataStoreClient::Embedded {
+                address: metadata_node_address.clone(),
+            };
+            nodes.push(node);
         }
 
         nodes
@@ -225,10 +242,6 @@
         {
             *file = base_dir.join(&*file)
         }
-        if let BindAddress::Uds(file) = &mut self.base_config.metadata_store.bind_address {
-            *file = base_dir.join(&*file)
-        }
-
         if self.base_config.common.bind_address.is_none() {
             // Derive bind_address from advertised_address
             self.base_config.common.bind_address = Some(
@@ -242,6 +255,7 @@
         if let Some(BindAddress::Uds(file)) = &mut self.base_config.common.bind_address {
             *file = base_dir.join(&*file);
         }
+
         if let AdvertisedAddress::Uds(file) = &mut self.base_config.common.advertised_address {
             *file = base_dir.join(&*file)
@@ -663,14 +677,6 @@ impl StartedNode {
         }
     }
 
-    pub fn metadata_address(&self) -> Option<&BindAddress> {
-        if self.config().has_role(Role::MetadataStore) {
-            Some(&self.config().metadata_store.bind_address)
-        } else {
-            None
-        }
-    }
-
     /// Obtain a stream of loglines matching this pattern. The stream will end
     /// when the stdout and stderr files on the process close.
     pub fn lines(&self, pattern: Regex) -> impl Stream<Item = String> + '_ {