Skip to content

Commit

Permalink
Factor out common wait for ready logic of services into WaitForReady
Browse files Browse the repository at this point in the history
The WaitForReady struct takes a `HealthStatus` and a ready status. It
implements the Predicate and only lets requests pass if the ready status
has been reached.
  • Loading branch information
tillrohrmann committed Dec 16, 2024
1 parent 1318cd9 commit 3da5356
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 30 deletions.
13 changes: 2 additions & 11 deletions crates/admin/src/cluster_controller/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ use tracing::{debug, info};
use restate_bifrost::{Bifrost, BifrostAdmin, SealedSegment};
use restate_core::metadata_store::{retry_on_network_error, MetadataStoreClient};
use restate_core::network::rpc_router::RpcRouter;
use restate_core::network::tonic_service_filter::TonicServiceFilter;
use restate_core::network::tonic_service_filter::{TonicServiceFilter, WaitForReady};
use restate_core::network::{
MessageRouterBuilder, NetworkSender, NetworkServerBuilder, Networking, TransportConnect,
};
Expand Down Expand Up @@ -127,16 +127,7 @@ where
))
.accept_compressed(CompressionEncoding::Gzip)
.send_compressed(CompressionEncoding::Gzip),
{
let health_status = health_status.clone();
move |req| {
if *health_status.get() == AdminStatus::Ready {
Ok(req)
} else {
Err(tonic::Status::unavailable("Admin component is not ready."))
}
}
},
WaitForReady::new(health_status.clone(), AdminStatus::Ready),
),
crate::cluster_controller::protobuf::FILE_DESCRIPTOR_SET,
);
Expand Down
31 changes: 31 additions & 0 deletions crates/core/src/network/tonic_service_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,13 @@

use futures::future::{Either, Ready};
use http::Request;
use restate_types::health::HealthStatus;
use std::convert::Infallible;
use std::task::{Context, Poll};
use tonic::body::BoxBody;
use tonic::codegen::Service;
use tonic::server::NamedService;
use tonic::Status;

/// A tonic service wrapper that filters requests based on a predicate. This can be used to
/// dynamically disable a service based on some condition.
Expand Down Expand Up @@ -74,3 +76,32 @@ where
self(request)
}
}

/// [`Predicate`] implementation which waits for the given status before allowing requests.
#[derive(Clone, Debug)]
pub struct WaitForReady<T> {
status: HealthStatus<T>,
ready_status: T,
}

impl<T> WaitForReady<T> {
pub fn new(status: HealthStatus<T>, ready_status: T) -> Self {
WaitForReady {
status,
ready_status,
}
}
}

impl<T> Predicate for WaitForReady<T>
where
T: PartialEq,
{
fn check(&mut self, request: Request<BoxBody>) -> Result<Request<BoxBody>, Status> {
if *self.status.get() == self.ready_status {
Ok(request)
} else {
Err(Status::unavailable("svc is not ready yet"))
}
}
}
13 changes: 2 additions & 11 deletions crates/log-server/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use anyhow::Context;
use tonic::codec::CompressionEncoding;
use tracing::{debug, info, instrument};

use restate_core::network::tonic_service_filter::TonicServiceFilter;
use restate_core::network::tonic_service_filter::{TonicServiceFilter, WaitForReady};
use restate_core::network::{MessageRouterBuilder, NetworkServerBuilder};
use restate_core::{Metadata, MetadataWriter, TaskCenter, TaskKind};
use restate_metadata_store::MetadataStoreClient;
Expand Down Expand Up @@ -93,16 +93,7 @@ impl LogServerService {
))
.accept_compressed(CompressionEncoding::Gzip)
.send_compressed(CompressionEncoding::Gzip),
{
let health_status = health_status.clone();
move |req| {
if *health_status.get() == LogServerStatus::Ready {
Ok(req)
} else {
Err(tonic::Status::unavailable("LogServer is not ready."))
}
}
},
WaitForReady::new(health_status.clone(), LogServerStatus::Ready),
),
crate::protobuf::FILE_DESCRIPTOR_SET,
);
Expand Down
10 changes: 2 additions & 8 deletions crates/node/src/network_server/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use crate::network_server::metrics::{install_global_prometheus_recorder, render_
use crate::network_server::state::NodeCtrlHandlerStateBuilder;
use restate_core::network::protobuf::core_node_svc::core_node_svc_server::CoreNodeSvcServer;
use restate_core::network::protobuf::node_ctl_svc::node_ctl_svc_server::NodeCtlSvcServer;
use restate_core::network::tonic_service_filter::TonicServiceFilter;
use restate_core::network::tonic_service_filter::{TonicServiceFilter, WaitForReady};
use restate_core::network::{ConnectionManager, NetworkServerBuilder, TransportConnect};
use restate_core::{cancellation_watcher, TaskCenter, TaskKind};
use restate_types::config::CommonOptions;
Expand Down Expand Up @@ -116,13 +116,7 @@ impl NetworkServer {
.max_encoding_message_size(32 * 1024 * 1024)
.accept_compressed(CompressionEncoding::Gzip)
.send_compressed(CompressionEncoding::Gzip),
move |req| {
if *node_health.get() == NodeStatus::Alive {
Ok(req)
} else {
Err(tonic::Status::unavailable("Node is not alive."))
}
},
WaitForReady::new(node_health, NodeStatus::Alive),
),
restate_core::network::protobuf::core_node_svc::FILE_DESCRIPTOR_SET,
);
Expand Down

0 comments on commit 3da5356

Please sign in to comment.