diff --git a/babushka-core/src/client/mod.rs b/babushka-core/src/client/mod.rs index 0d36812f9e..f9927b6c56 100644 --- a/babushka-core/src/client/mod.rs +++ b/babushka-core/src/client/mod.rs @@ -2,6 +2,7 @@ use crate::connection_request::{ AddressInfo, AuthenticationInfo, ConnectionRequest, ReadFromReplicaStrategy, TlsMode, }; use futures::FutureExt; +use logger_core::log_info; use redis::cluster_async::ClusterConnection; use redis::cluster_routing::{RoutingInfo, SingleNodeRoutingInfo}; use redis::{ @@ -165,7 +166,7 @@ async fn create_cluster_client( request: ConnectionRequest, ) -> RedisResult { // TODO - implement timeout for each connection attempt - let tls_mode = request.tls_mode.enum_value_or(TlsMode::NoTls); + let tls_mode = request.tls_mode.enum_value_or_default(); let redis_connection_info = get_redis_connection_info(request.authentication_info.0, 0); let initial_nodes = request .addresses @@ -224,10 +225,49 @@ impl std::fmt::Display for ConnectionError { } } +fn format_non_zero_value(name: &'static str, value: u32) -> String { + if value > 0 { + format!("\n{name}: {value}") + } else { + String::new() + } +} + +fn sanitized_request_string(request: &ConnectionRequest) -> String { + let addresses = request + .addresses + .iter() + .map(|address| format!("{}:{}", address.host, address.port)) + .collect::>() + .join(", "); + let tls_mode = request.tls_mode.enum_value_or_default(); + let cluster_mode = request.cluster_mode_enabled; + let response_timeout = format_non_zero_value("response timeout", request.response_timeout); + let client_creation_timeout = + format_non_zero_value("client creation timeout", request.client_creation_timeout); + let database_id = format_non_zero_value("database ID", request.database_id); + let rfr_strategy = request.read_from_replica_strategy.enum_value_or_default(); + let connection_retry_strategy = match &request.connection_retry_strategy.0 { + Some(strategy) => { + format!("\nreconnect backoff strategy: number of increasing duration retries: {}, base: {}, factor: {}", + strategy.number_of_retries, strategy.exponent_base, strategy.factor) + } + None => String::new(), + }; + + format!( + "\naddresses: {addresses}\nTLS mode: {tls_mode:?}\ncluster mode: {cluster_mode}{response_timeout}{client_creation_timeout}\nRead from replica strategy: {rfr_strategy:?}{database_id}{connection_retry_strategy}", + ) +} + impl Client { pub async fn new(request: ConnectionRequest) -> Result { const DEFAULT_CLIENT_CREATION_TIMEOUT: Duration = Duration::from_millis(2500); + log_info( + "Connection configuration", + sanitized_request_string(&request), + ); let response_timeout = to_duration(request.response_timeout, DEFAULT_RESPONSE_TIMEOUT); let total_connection_timeout = to_duration( request.client_creation_timeout, diff --git a/babushka-core/src/client/standalone_client.rs b/babushka-core/src/client/standalone_client.rs index a767bbcb5b..3a21defe5d 100644 --- a/babushka-core/src/client/standalone_client.rs +++ b/babushka-core/src/client/standalone_client.rs @@ -75,7 +75,7 @@ impl StandaloneClient { connection_request.database_id, ); - let tls_mode = connection_request.tls_mode.enum_value_or(TlsMode::NoTls); + let tls_mode = connection_request.tls_mode.enum_value_or_default(); let node_count = connection_request.addresses.len(); let mut stream = stream::iter(connection_request.addresses.iter()) .map(|address| async { @@ -192,6 +192,10 @@ impl StandaloneClient { let result = connection.send_packed_command(cmd).await; match result { Err(err) if err.is_connection_dropped() => { + log_warn( + "single request", + format!("received disconnect error `{err}`"), + ); reconnecting_connection.reconnect(); Err(err) } @@ -210,6 +214,10 @@ impl StandaloneClient { let result = connection.send_packed_commands(cmd, offset, count).await; match result { Err(err) if err.is_connection_dropped() => { + log_warn( + "pipeline request", + format!("received disconnect error `{err}`"), + ); reconnecting_connection.reconnect(); Err(err) } @@ -292,9 +300,7 @@ async fn get_connection_and_replication_info( fn get_read_from_replica_strategy( read_from_replica_strategy: &EnumOrUnknown, ) -> ReadFromReplicaStrategy { - match read_from_replica_strategy - .enum_value_or(crate::connection_request::ReadFromReplicaStrategy::AlwaysFromPrimary) - { + match read_from_replica_strategy.enum_value_or_default() { crate::connection_request::ReadFromReplicaStrategy::AlwaysFromPrimary => { ReadFromReplicaStrategy::AlwaysFromPrimary } diff --git a/babushka-core/src/protobuf/connection_request.proto b/babushka-core/src/protobuf/connection_request.proto index 67e728a81d..41cb198583 100644 --- a/babushka-core/src/protobuf/connection_request.proto +++ b/babushka-core/src/protobuf/connection_request.proto @@ -24,6 +24,7 @@ message AuthenticationInfo { string username = 2; } +// IMPORTANT - if you add fields here, you probably need to add them also in client/mod.rs:`sanitized_request_string`. message ConnectionRequest { repeated AddressInfo addresses = 1; TlsMode tls_mode = 2; diff --git a/babushka-core/src/socket_listener.rs b/babushka-core/src/socket_listener.rs index da61e75730..377b883594 100644 --- a/babushka-core/src/socket_listener.rs +++ b/babushka-core/src/socket_listener.rs @@ -10,7 +10,7 @@ use crate::retry_strategies::get_fixed_interval_backoff; use directories::BaseDirs; use dispose::{Disposable, Dispose}; use futures::stream::StreamExt; -use logger_core::{log_debug, log_error, log_info, log_trace}; +use logger_core::{log_debug, log_error, log_info, log_trace, log_warn}; use protobuf::Message; use redis::cluster_routing::{ MultipleNodeRoutingInfo, Route, RoutingInfo, SingleNodeRoutingInfo, SlotAddr, @@ -210,6 +210,7 @@ async fn write_result( error_message.into(), )) } else { + log_warn("received error", error_message.as_str()); let mut request_error = response::RequestError::default(); if err.is_connection_dropped() { request_error.type_ = response::RequestErrorType::Disconnect.into();