Skip to content

Commit

Permalink
Properly set [optional] request timeout in the test context
Browse files Browse the repository at this point in the history
  • Loading branch information
pcholakov committed Jan 8, 2025
1 parent 23a469d commit 3aca5aa
Show file tree
Hide file tree
Showing 6 changed files with 69 additions and 40 deletions.
4 changes: 4 additions & 0 deletions crates/cli-util/src/opts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,10 @@ impl CommonClientConnectionOptions for NetworkOpts {
Duration::from_millis(self.connect_timeout)
}

fn request_timeout(&self) -> Option<Duration> {
Some(Duration::from_millis(self.request_timeout))
}

fn keep_alive_interval(&self) -> Duration {
Duration::from_secs(60)
}
Expand Down
69 changes: 47 additions & 22 deletions crates/core/src/network/net_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,36 +30,47 @@ use restate_types::net::{AdvertisedAddress, BindAddress};

use crate::{cancellation_watcher, ShutdownError, TaskCenter, TaskKind};

pub fn create_tonic_channel_from_advertised_address<T: CommonClientConnectionOptions>(
pub fn create_tonic_channel<T: CommonClientConnectionOptions>(
address: AdvertisedAddress,
options: &T,
) -> Channel {
let endpoint = match &address {
AdvertisedAddress::Uds(_) => {
// dummy endpoint required to specify an uds connector, it is not used anywhere
Endpoint::try_from("http://127.0.0.1").expect("/ should be a valid Uri")
}
AdvertisedAddress::Http(uri) => Channel::builder(uri.clone()),
};

let endpoint = apply_options(endpoint, options);

match address {
AdvertisedAddress::Uds(uds_path) => {
// dummy endpoint required to specify an uds connector, it is not used anywhere
Endpoint::try_from("http://127.0.0.1")
.expect("/ should be a valid Uri")
.http2_keep_alive_interval(options.keep_alive_interval())
.keep_alive_timeout(options.keep_alive_timeout())
.http2_adaptive_window(options.http2_adaptive_window())
.connect_with_connector_lazy(tower::service_fn(move |_: Uri| {
let uds_path = uds_path.clone();
async move {
Ok::<_, io::Error>(TokioIo::new(UnixStream::connect(uds_path).await?))
}
}))
endpoint.connect_with_connector_lazy(tower::service_fn(move |_: Uri| {
let uds_path = uds_path.clone();
async move {
Ok::<_, io::Error>(TokioIo::new(UnixStream::connect(uds_path).await?))
}
}))
}
AdvertisedAddress::Http(uri) => Channel::builder(uri)
.connect_timeout(options.connect_timeout())
.http2_keep_alive_interval(options.keep_alive_interval())
.keep_alive_timeout(options.keep_alive_timeout())
.http2_adaptive_window(options.http2_adaptive_window())
// this true by default, but this is to guard against any change in defaults
.tcp_nodelay(true)
.connect_lazy(),
AdvertisedAddress::Http(_) => endpoint.connect_lazy()
}
}

fn apply_options<T: CommonClientConnectionOptions>(endpoint: Endpoint, options: &T) -> Endpoint {
if let Some(request_timeout) = options.request_timeout() {
endpoint.timeout(request_timeout)
} else {
endpoint
}
.connect_timeout(options.connect_timeout())
.http2_keep_alive_interval(options.keep_alive_interval())
.keep_alive_timeout(options.keep_alive_timeout())
.http2_adaptive_window(options.http2_adaptive_window())
// this true by default, but this is to guard against any change in defaults
.tcp_nodelay(true)
}

#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error("failed binding to address '{address}': {source}")]
Expand All @@ -82,7 +93,12 @@ pub enum Error {
Shutdown(#[from] ShutdownError),
}

#[instrument(level = "error", name = "server", skip_all, fields(server_name = %server_name, uds.path = tracing::field::Empty, net.host.addr = tracing::field::Empty, net.host.port = tracing::field::Empty))]
#[instrument(
level = "error",
name = "server",
skip_all,
fields(server_name = %server_name, uds.path = tracing::field::Empty, net.host.addr = tracing::field::Empty, net.host.port = tracing::field::Empty)
)]
pub async fn run_hyper_server<S, B>(
bind_address: &BindAddress,
service: S,
Expand Down Expand Up @@ -249,6 +265,7 @@ where
/// Helper trait to extract common client connection options from different configuration types.
pub trait CommonClientConnectionOptions {
fn connect_timeout(&self) -> Duration;
fn request_timeout(&self) -> Option<Duration>;
fn keep_alive_interval(&self) -> Duration;
fn keep_alive_timeout(&self) -> Duration;
fn http2_adaptive_window(&self) -> bool;
Expand All @@ -259,6 +276,10 @@ impl CommonClientConnectionOptions for NetworkingOptions {
self.connect_timeout.into()
}

fn request_timeout(&self) -> Option<Duration> {
None
}

fn keep_alive_interval(&self) -> Duration {
self.http2_keep_alive_interval.into()
}
Expand All @@ -277,6 +298,10 @@ impl CommonClientConnectionOptions for MetadataStoreClientOptions {
self.metadata_store_connect_timeout.into()
}

fn request_timeout(&self) -> Option<Duration> {
None
}

fn keep_alive_interval(&self) -> Duration {
self.metadata_store_keep_alive_interval.into()
}
Expand Down
6 changes: 2 additions & 4 deletions crates/core/src/network/transport_connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use restate_types::GenerationalNodeId;

use super::protobuf::core_node_svc::core_node_svc_client::CoreNodeSvcClient;
use super::{NetworkError, ProtocolError};
use crate::network::net_util::create_tonic_channel_from_advertised_address;
use crate::network::net_util::create_tonic_channel;

pub trait TransportConnect: Send + Sync + 'static {
fn connect(
Expand Down Expand Up @@ -72,9 +72,7 @@ impl TransportConnect for GrpcConnector {
None => self
.channel_cache
.entry(address.clone())
.or_insert_with(|| {
create_tonic_channel_from_advertised_address(address, &self.networking_options)
})
.or_insert_with(|| create_tonic_channel(address, &self.networking_options))
.clone(),
};

Expand Down
4 changes: 2 additions & 2 deletions crates/metadata-store/src/local/grpc/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use tonic::{Code, Status};
use restate_core::metadata_store::{
MetadataStore, Precondition, ReadError, VersionedValue, WriteError,
};
use restate_core::network::net_util::create_tonic_channel_from_advertised_address;
use restate_core::network::net_util::create_tonic_channel;
use restate_core::network::net_util::CommonClientConnectionOptions;
use restate_types::net::AdvertisedAddress;
use restate_types::Version;
Expand All @@ -36,7 +36,7 @@ impl LocalMetadataStoreClient {
metadata_store_address: AdvertisedAddress,
options: &T,
) -> Self {
let channel = create_tonic_channel_from_advertised_address(metadata_store_address, options);
let channel = create_tonic_channel(metadata_store_address, options);

Self {
svc_client: MetadataStoreSvcClient::new(channel),
Expand Down
22 changes: 12 additions & 10 deletions server/tests/trim_gap_handling.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,7 @@ use restate_admin::cluster_controller::protobuf::cluster_ctrl_svc_client::Cluste
use restate_admin::cluster_controller::protobuf::{
ClusterStateRequest, CreatePartitionSnapshotRequest, TrimLogRequest,
};
use restate_core::network::net_util::{
create_tonic_channel_from_advertised_address, CommonClientConnectionOptions,
};
use restate_core::network::net_util::{create_tonic_channel, CommonClientConnectionOptions};
use restate_local_cluster_runner::{
cluster::Cluster,
node::{BinarySource, Node},
Expand Down Expand Up @@ -84,7 +82,7 @@ async fn fast_forward_over_trim_gap() -> googletest::Result<()> {
tokio::time::timeout(Duration::from_secs(10), worker_1_ready.next()).await?;
tokio::time::timeout(Duration::from_secs(10), worker_2_ready.next()).await?;

let mut client = ClusterCtrlSvcClient::new(create_tonic_channel_from_advertised_address(
let mut client = ClusterCtrlSvcClient::new(create_tonic_channel(
cluster.nodes[0].node_address().clone(),
&TestNetworkOptions::default(),
))
Expand Down Expand Up @@ -316,30 +314,34 @@ async fn applied_lsn_converged(
}

struct TestNetworkOptions {
connect_timeout: u64,
request_timeout: u64,
connect_timeout: Duration,
request_timeout: Duration,
}

impl Default for TestNetworkOptions {
fn default() -> Self {
Self {
connect_timeout: 1000,
request_timeout: 5000,
connect_timeout: Duration::from_secs(1),
request_timeout: Duration::from_secs(5),
}
}
}

impl CommonClientConnectionOptions for TestNetworkOptions {
fn connect_timeout(&self) -> Duration {
Duration::from_millis(self.connect_timeout)
self.connect_timeout
}

fn request_timeout(&self) -> Option<Duration> {
Some(self.request_timeout)
}

fn keep_alive_interval(&self) -> Duration {
Duration::from_secs(60)
}

fn keep_alive_timeout(&self) -> Duration {
Duration::from_millis(self.request_timeout)
self.connect_timeout
}

fn http2_adaptive_window(&self) -> bool {
Expand Down
4 changes: 2 additions & 2 deletions tools/restatectl/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@
use tonic::transport::Channel;

use restate_cli_util::CliContext;
use restate_core::network::net_util::create_tonic_channel_from_advertised_address;
use restate_core::network::net_util::create_tonic_channel;
use restate_types::net::AdvertisedAddress;

pub fn grpc_connect(address: AdvertisedAddress) -> Channel {
let ctx = CliContext::get();
create_tonic_channel_from_advertised_address(address, &ctx.network)
create_tonic_channel(address, &ctx.network)
}

0 comments on commit 3aca5aa

Please sign in to comment.