From 3da5356a95db4c84634efd7a33aed9de76e57f3b Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Mon, 16 Dec 2024 17:06:39 +0100 Subject: [PATCH] Factor out common wait for ready logic of services into WaitForReady The WaitForReady struct takes a `HealthStatus` and a ready status. It implements the Predicate and only lets requests pass if the ready status has been reached. --- .../admin/src/cluster_controller/service.rs | 13 ++------ .../core/src/network/tonic_service_filter.rs | 31 +++++++++++++++++++ crates/log-server/src/service.rs | 13 ++------ crates/node/src/network_server/service.rs | 10 ++---- 4 files changed, 37 insertions(+), 30 deletions(-) diff --git a/crates/admin/src/cluster_controller/service.rs b/crates/admin/src/cluster_controller/service.rs index 6f43d5cfe..a7e393196 100644 --- a/crates/admin/src/cluster_controller/service.rs +++ b/crates/admin/src/cluster_controller/service.rs @@ -38,7 +38,7 @@ use tracing::{debug, info}; use restate_bifrost::{Bifrost, BifrostAdmin, SealedSegment}; use restate_core::metadata_store::{retry_on_network_error, MetadataStoreClient}; use restate_core::network::rpc_router::RpcRouter; -use restate_core::network::tonic_service_filter::TonicServiceFilter; +use restate_core::network::tonic_service_filter::{TonicServiceFilter, WaitForReady}; use restate_core::network::{ MessageRouterBuilder, NetworkSender, NetworkServerBuilder, Networking, TransportConnect, }; @@ -127,16 +127,7 @@ where )) .accept_compressed(CompressionEncoding::Gzip) .send_compressed(CompressionEncoding::Gzip), - { - let health_status = health_status.clone(); - move |req| { - if *health_status.get() == AdminStatus::Ready { - Ok(req) - } else { - Err(tonic::Status::unavailable("Admin component is not ready.")) - } - } - }, + WaitForReady::new(health_status.clone(), AdminStatus::Ready), ), crate::cluster_controller::protobuf::FILE_DESCRIPTOR_SET, ); diff --git a/crates/core/src/network/tonic_service_filter.rs b/crates/core/src/network/tonic_service_filter.rs index 1d1dc102c..7dbdf9a4c 100644 --- a/crates/core/src/network/tonic_service_filter.rs +++ b/crates/core/src/network/tonic_service_filter.rs @@ -10,11 +10,13 @@ use futures::future::{Either, Ready}; use http::Request; +use restate_types::health::HealthStatus; use std::convert::Infallible; use std::task::{Context, Poll}; use tonic::body::BoxBody; use tonic::codegen::Service; use tonic::server::NamedService; +use tonic::Status; /// A tonic service wrapper that filters requests based on a predicate. This can be used to /// dynamically disable a service based on some condition. @@ -74,3 +76,32 @@ where self(request) } } + +/// [`Predicate`] implementation which waits for the given status before allowing requests. +#[derive(Clone, Debug)] +pub struct WaitForReady { + status: HealthStatus, + ready_status: T, +} + +impl WaitForReady { + pub fn new(status: HealthStatus, ready_status: T) -> Self { + WaitForReady { + status, + ready_status, + } + } +} + +impl Predicate for WaitForReady +where + T: PartialEq, +{ + fn check(&mut self, request: Request) -> Result, Status> { + if *self.status.get() == self.ready_status { + Ok(request) + } else { + Err(Status::unavailable("svc is not ready yet")) + } + } +} diff --git a/crates/log-server/src/service.rs b/crates/log-server/src/service.rs index 0427feeb6..0f4ea9b06 100644 --- a/crates/log-server/src/service.rs +++ b/crates/log-server/src/service.rs @@ -14,7 +14,7 @@ use anyhow::Context; use tonic::codec::CompressionEncoding; use tracing::{debug, info, instrument}; -use restate_core::network::tonic_service_filter::TonicServiceFilter; +use restate_core::network::tonic_service_filter::{TonicServiceFilter, WaitForReady}; use restate_core::network::{MessageRouterBuilder, NetworkServerBuilder}; use restate_core::{Metadata, MetadataWriter, TaskCenter, TaskKind}; use restate_metadata_store::MetadataStoreClient; @@ -93,16 +93,7 @@ impl LogServerService { )) .accept_compressed(CompressionEncoding::Gzip) .send_compressed(CompressionEncoding::Gzip), - { - let health_status = health_status.clone(); - move |req| { - if *health_status.get() == LogServerStatus::Ready { - Ok(req) - } else { - Err(tonic::Status::unavailable("LogServer is not ready.")) - } - } - }, + WaitForReady::new(health_status.clone(), LogServerStatus::Ready), ), crate::protobuf::FILE_DESCRIPTOR_SET, ); diff --git a/crates/node/src/network_server/service.rs b/crates/node/src/network_server/service.rs index 3081e928f..64ae8b330 100644 --- a/crates/node/src/network_server/service.rs +++ b/crates/node/src/network_server/service.rs @@ -19,7 +19,7 @@ use crate::network_server::metrics::{install_global_prometheus_recorder, render_ use crate::network_server::state::NodeCtrlHandlerStateBuilder; use restate_core::network::protobuf::core_node_svc::core_node_svc_server::CoreNodeSvcServer; use restate_core::network::protobuf::node_ctl_svc::node_ctl_svc_server::NodeCtlSvcServer; -use restate_core::network::tonic_service_filter::TonicServiceFilter; +use restate_core::network::tonic_service_filter::{TonicServiceFilter, WaitForReady}; use restate_core::network::{ConnectionManager, NetworkServerBuilder, TransportConnect}; use restate_core::{cancellation_watcher, TaskCenter, TaskKind}; use restate_types::config::CommonOptions; @@ -116,13 +116,7 @@ impl NetworkServer { .max_encoding_message_size(32 * 1024 * 1024) .accept_compressed(CompressionEncoding::Gzip) .send_compressed(CompressionEncoding::Gzip), - move |req| { - if *node_health.get() == NodeStatus::Alive { - Ok(req) - } else { - Err(tonic::Status::unavailable("Node is not alive.")) - } - }, + WaitForReady::new(node_health, NodeStatus::Alive), ), restate_core::network::protobuf::core_node_svc::FILE_DESCRIPTOR_SET, );