From 3aca5aa3ae9f2ace918c0be24e14deb360fea61f Mon Sep 17 00:00:00 2001 From: Pavel Tcholakov Date: Wed, 8 Jan 2025 12:23:59 +0200 Subject: [PATCH] Properly set [optional] request timeout in the test context --- crates/cli-util/src/opts.rs | 4 ++ crates/core/src/network/net_util.rs | 69 +++++++++++++------ .../core/src/network/transport_connector.rs | 6 +- .../metadata-store/src/local/grpc/client.rs | 4 +- server/tests/trim_gap_handling.rs | 22 +++--- tools/restatectl/src/util.rs | 4 +- 6 files changed, 69 insertions(+), 40 deletions(-) diff --git a/crates/cli-util/src/opts.rs b/crates/cli-util/src/opts.rs index ce3265b56..ed1170243 100644 --- a/crates/cli-util/src/opts.rs +++ b/crates/cli-util/src/opts.rs @@ -81,6 +81,10 @@ impl CommonClientConnectionOptions for NetworkOpts { Duration::from_millis(self.connect_timeout) } + fn request_timeout(&self) -> Option { + Some(Duration::from_millis(self.request_timeout)) + } + fn keep_alive_interval(&self) -> Duration { Duration::from_secs(60) } diff --git a/crates/core/src/network/net_util.rs b/crates/core/src/network/net_util.rs index 499149aa7..a3ee188d0 100644 --- a/crates/core/src/network/net_util.rs +++ b/crates/core/src/network/net_util.rs @@ -30,36 +30,47 @@ use restate_types::net::{AdvertisedAddress, BindAddress}; use crate::{cancellation_watcher, ShutdownError, TaskCenter, TaskKind}; -pub fn create_tonic_channel_from_advertised_address( +pub fn create_tonic_channel( address: AdvertisedAddress, options: &T, ) -> Channel { + let endpoint = match &address { + AdvertisedAddress::Uds(_) => { + // dummy endpoint required to specify an uds connector, it is not used anywhere + Endpoint::try_from("http://127.0.0.1").expect("/ should be a valid Uri") + } + AdvertisedAddress::Http(uri) => Channel::builder(uri.clone()), + }; + + let endpoint = apply_options(endpoint, options); + match address { AdvertisedAddress::Uds(uds_path) => { - // dummy endpoint required to specify an uds connector, it is not used anywhere - Endpoint::try_from("http://127.0.0.1") - .expect("/ should be a valid Uri") - .http2_keep_alive_interval(options.keep_alive_interval()) - .keep_alive_timeout(options.keep_alive_timeout()) - .http2_adaptive_window(options.http2_adaptive_window()) - .connect_with_connector_lazy(tower::service_fn(move |_: Uri| { - let uds_path = uds_path.clone(); - async move { - Ok::<_, io::Error>(TokioIo::new(UnixStream::connect(uds_path).await?)) - } - })) + endpoint.connect_with_connector_lazy(tower::service_fn(move |_: Uri| { + let uds_path = uds_path.clone(); + async move { + Ok::<_, io::Error>(TokioIo::new(UnixStream::connect(uds_path).await?)) + } + })) } - AdvertisedAddress::Http(uri) => Channel::builder(uri) - .connect_timeout(options.connect_timeout()) - .http2_keep_alive_interval(options.keep_alive_interval()) - .keep_alive_timeout(options.keep_alive_timeout()) - .http2_adaptive_window(options.http2_adaptive_window()) - // this true by default, but this is to guard against any change in defaults - .tcp_nodelay(true) - .connect_lazy(), + AdvertisedAddress::Http(_) => endpoint.connect_lazy() } } +fn apply_options(endpoint: Endpoint, options: &T) -> Endpoint { + if let Some(request_timeout) = options.request_timeout() { + endpoint.timeout(request_timeout) + } else { + endpoint + } + .connect_timeout(options.connect_timeout()) + .http2_keep_alive_interval(options.keep_alive_interval()) + .keep_alive_timeout(options.keep_alive_timeout()) + .http2_adaptive_window(options.http2_adaptive_window()) + // this true by default, but this is to guard against any change in defaults + .tcp_nodelay(true) +} + #[derive(Debug, thiserror::Error)] pub enum Error { #[error("failed binding to address '{address}': {source}")] @@ -82,7 +93,12 @@ pub enum Error { Shutdown(#[from] ShutdownError), } -#[instrument(level = "error", name = "server", skip_all, fields(server_name = %server_name, uds.path = tracing::field::Empty, net.host.addr = tracing::field::Empty, net.host.port = tracing::field::Empty))] +#[instrument( + level = "error", + name = "server", + skip_all, + fields(server_name = %server_name, uds.path = tracing::field::Empty, net.host.addr = tracing::field::Empty, net.host.port = tracing::field::Empty) +)] pub async fn run_hyper_server( bind_address: &BindAddress, service: S, @@ -249,6 +265,7 @@ where /// Helper trait to extract common client connection options from different configuration types. pub trait CommonClientConnectionOptions { fn connect_timeout(&self) -> Duration; + fn request_timeout(&self) -> Option; fn keep_alive_interval(&self) -> Duration; fn keep_alive_timeout(&self) -> Duration; fn http2_adaptive_window(&self) -> bool; @@ -259,6 +276,10 @@ impl CommonClientConnectionOptions for NetworkingOptions { self.connect_timeout.into() } + fn request_timeout(&self) -> Option { + None + } + fn keep_alive_interval(&self) -> Duration { self.http2_keep_alive_interval.into() } @@ -277,6 +298,10 @@ impl CommonClientConnectionOptions for MetadataStoreClientOptions { self.metadata_store_connect_timeout.into() } + fn request_timeout(&self) -> Option { + None + } + fn keep_alive_interval(&self) -> Duration { self.metadata_store_keep_alive_interval.into() } diff --git a/crates/core/src/network/transport_connector.rs b/crates/core/src/network/transport_connector.rs index af406c4b7..7352ad8df 100644 --- a/crates/core/src/network/transport_connector.rs +++ b/crates/core/src/network/transport_connector.rs @@ -23,7 +23,7 @@ use restate_types::GenerationalNodeId; use super::protobuf::core_node_svc::core_node_svc_client::CoreNodeSvcClient; use super::{NetworkError, ProtocolError}; -use crate::network::net_util::create_tonic_channel_from_advertised_address; +use crate::network::net_util::create_tonic_channel; pub trait TransportConnect: Send + Sync + 'static { fn connect( @@ -72,9 +72,7 @@ impl TransportConnect for GrpcConnector { None => self .channel_cache .entry(address.clone()) - .or_insert_with(|| { - create_tonic_channel_from_advertised_address(address, &self.networking_options) - }) + .or_insert_with(|| create_tonic_channel(address, &self.networking_options)) .clone(), }; diff --git a/crates/metadata-store/src/local/grpc/client.rs b/crates/metadata-store/src/local/grpc/client.rs index 4d5362281..f7791fbfc 100644 --- a/crates/metadata-store/src/local/grpc/client.rs +++ b/crates/metadata-store/src/local/grpc/client.rs @@ -16,7 +16,7 @@ use tonic::{Code, Status}; use restate_core::metadata_store::{ MetadataStore, Precondition, ReadError, VersionedValue, WriteError, }; -use restate_core::network::net_util::create_tonic_channel_from_advertised_address; +use restate_core::network::net_util::create_tonic_channel; use restate_core::network::net_util::CommonClientConnectionOptions; use restate_types::net::AdvertisedAddress; use restate_types::Version; @@ -36,7 +36,7 @@ impl LocalMetadataStoreClient { metadata_store_address: AdvertisedAddress, options: &T, ) -> Self { - let channel = create_tonic_channel_from_advertised_address(metadata_store_address, options); + let channel = create_tonic_channel(metadata_store_address, options); Self { svc_client: MetadataStoreSvcClient::new(channel), diff --git a/server/tests/trim_gap_handling.rs b/server/tests/trim_gap_handling.rs index 551c371a5..070750134 100644 --- a/server/tests/trim_gap_handling.rs +++ b/server/tests/trim_gap_handling.rs @@ -24,9 +24,7 @@ use restate_admin::cluster_controller::protobuf::cluster_ctrl_svc_client::Cluste use restate_admin::cluster_controller::protobuf::{ ClusterStateRequest, CreatePartitionSnapshotRequest, TrimLogRequest, }; -use restate_core::network::net_util::{ - create_tonic_channel_from_advertised_address, CommonClientConnectionOptions, -}; +use restate_core::network::net_util::{create_tonic_channel, CommonClientConnectionOptions}; use restate_local_cluster_runner::{ cluster::Cluster, node::{BinarySource, Node}, @@ -84,7 +82,7 @@ async fn fast_forward_over_trim_gap() -> googletest::Result<()> { tokio::time::timeout(Duration::from_secs(10), worker_1_ready.next()).await?; tokio::time::timeout(Duration::from_secs(10), worker_2_ready.next()).await?; - let mut client = ClusterCtrlSvcClient::new(create_tonic_channel_from_advertised_address( + let mut client = ClusterCtrlSvcClient::new(create_tonic_channel( cluster.nodes[0].node_address().clone(), &TestNetworkOptions::default(), )) @@ -316,22 +314,26 @@ async fn applied_lsn_converged( } struct TestNetworkOptions { - connect_timeout: u64, - request_timeout: u64, + connect_timeout: Duration, + request_timeout: Duration, } impl Default for TestNetworkOptions { fn default() -> Self { Self { - connect_timeout: 1000, - request_timeout: 5000, + connect_timeout: Duration::from_secs(1), + request_timeout: Duration::from_secs(5), } } } impl CommonClientConnectionOptions for TestNetworkOptions { fn connect_timeout(&self) -> Duration { - Duration::from_millis(self.connect_timeout) + self.connect_timeout + } + + fn request_timeout(&self) -> Option { + Some(self.request_timeout) } fn keep_alive_interval(&self) -> Duration { @@ -339,7 +341,7 @@ impl CommonClientConnectionOptions for TestNetworkOptions { } fn keep_alive_timeout(&self) -> Duration { - Duration::from_millis(self.request_timeout) + self.connect_timeout } fn http2_adaptive_window(&self) -> bool { diff --git a/tools/restatectl/src/util.rs b/tools/restatectl/src/util.rs index 07f3d07d9..f96dc546f 100644 --- a/tools/restatectl/src/util.rs +++ b/tools/restatectl/src/util.rs @@ -11,10 +11,10 @@ use tonic::transport::Channel; use restate_cli_util::CliContext; -use restate_core::network::net_util::create_tonic_channel_from_advertised_address; +use restate_core::network::net_util::create_tonic_channel; use restate_types::net::AdvertisedAddress; pub fn grpc_connect(address: AdvertisedAddress) -> Channel { let ctx = CliContext::get(); - create_tonic_channel_from_advertised_address(address, &ctx.network) + create_tonic_channel(address, &ctx.network) }