Skip to content

Commit

Permalink
Merge pull request valkey-io#477 from shachlanAmazon/other-less-logs
Browse files Browse the repository at this point in the history
Add missing logs.
  • Loading branch information
shachlanAmazon authored Oct 1, 2023
2 parents 43e5284 + ed550b5 commit e71a5b9
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 6 deletions.
42 changes: 41 additions & 1 deletion babushka-core/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::connection_request::{
AddressInfo, AuthenticationInfo, ConnectionRequest, ReadFromReplicaStrategy, TlsMode,
};
use futures::FutureExt;
use logger_core::log_info;
use redis::cluster_async::ClusterConnection;
use redis::cluster_routing::{RoutingInfo, SingleNodeRoutingInfo};
use redis::{
Expand Down Expand Up @@ -165,7 +166,7 @@ async fn create_cluster_client(
request: ConnectionRequest,
) -> RedisResult<redis::cluster_async::ClusterConnection> {
// TODO - implement timeout for each connection attempt
let tls_mode = request.tls_mode.enum_value_or(TlsMode::NoTls);
let tls_mode = request.tls_mode.enum_value_or_default();
let redis_connection_info = get_redis_connection_info(request.authentication_info.0, 0);
let initial_nodes = request
.addresses
Expand Down Expand Up @@ -224,10 +225,49 @@ impl std::fmt::Display for ConnectionError {
}
}

fn format_non_zero_value(name: &'static str, value: u32) -> String {
if value > 0 {
format!("\n{name}: {value}")
} else {
String::new()
}
}

fn sanitized_request_string(request: &ConnectionRequest) -> String {
let addresses = request
.addresses
.iter()
.map(|address| format!("{}:{}", address.host, address.port))
.collect::<Vec<_>>()
.join(", ");
let tls_mode = request.tls_mode.enum_value_or_default();
let cluster_mode = request.cluster_mode_enabled;
let response_timeout = format_non_zero_value("response timeout", request.response_timeout);
let client_creation_timeout =
format_non_zero_value("client creation timeout", request.client_creation_timeout);
let database_id = format_non_zero_value("database ID", request.database_id);
let rfr_strategy = request.read_from_replica_strategy.enum_value_or_default();
let connection_retry_strategy = match &request.connection_retry_strategy.0 {
Some(strategy) => {
format!("\nreconnect backoff strategy: number of increasing duration retries: {}, base: {}, factor: {}",
strategy.number_of_retries, strategy.exponent_base, strategy.factor)
}
None => String::new(),
};

format!(
"\naddresses: {addresses}\nTLS mode: {tls_mode:?}\ncluster mode: {cluster_mode}{response_timeout}{client_creation_timeout}\nRead from replica strategy: {rfr_strategy:?}{database_id}{connection_retry_strategy}",
)
}

impl Client {
pub async fn new(request: ConnectionRequest) -> Result<Self, ConnectionError> {
const DEFAULT_CLIENT_CREATION_TIMEOUT: Duration = Duration::from_millis(2500);

log_info(
"Connection configuration",
sanitized_request_string(&request),
);
let response_timeout = to_duration(request.response_timeout, DEFAULT_RESPONSE_TIMEOUT);
let total_connection_timeout = to_duration(
request.client_creation_timeout,
Expand Down
14 changes: 10 additions & 4 deletions babushka-core/src/client/standalone_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ impl StandaloneClient {
connection_request.database_id,
);

let tls_mode = connection_request.tls_mode.enum_value_or(TlsMode::NoTls);
let tls_mode = connection_request.tls_mode.enum_value_or_default();
let node_count = connection_request.addresses.len();
let mut stream = stream::iter(connection_request.addresses.iter())
.map(|address| async {
Expand Down Expand Up @@ -192,6 +192,10 @@ impl StandaloneClient {
let result = connection.send_packed_command(cmd).await;
match result {
Err(err) if err.is_connection_dropped() => {
log_warn(
"single request",
format!("received disconnect error `{err}`"),
);
reconnecting_connection.reconnect();
Err(err)
}
Expand All @@ -210,6 +214,10 @@ impl StandaloneClient {
let result = connection.send_packed_commands(cmd, offset, count).await;
match result {
Err(err) if err.is_connection_dropped() => {
log_warn(
"pipeline request",
format!("received disconnect error `{err}`"),
);
reconnecting_connection.reconnect();
Err(err)
}
Expand Down Expand Up @@ -292,9 +300,7 @@ async fn get_connection_and_replication_info(
fn get_read_from_replica_strategy(
read_from_replica_strategy: &EnumOrUnknown<crate::connection_request::ReadFromReplicaStrategy>,
) -> ReadFromReplicaStrategy {
match read_from_replica_strategy
.enum_value_or(crate::connection_request::ReadFromReplicaStrategy::AlwaysFromPrimary)
{
match read_from_replica_strategy.enum_value_or_default() {
crate::connection_request::ReadFromReplicaStrategy::AlwaysFromPrimary => {
ReadFromReplicaStrategy::AlwaysFromPrimary
}
Expand Down
1 change: 1 addition & 0 deletions babushka-core/src/protobuf/connection_request.proto
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ message AuthenticationInfo {
string username = 2;
}

// IMPORTANT - if you add fields here, you probably need to add them also in client/mod.rs:`sanitized_request_string`.
message ConnectionRequest {
repeated AddressInfo addresses = 1;
TlsMode tls_mode = 2;
Expand Down
3 changes: 2 additions & 1 deletion babushka-core/src/socket_listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::retry_strategies::get_fixed_interval_backoff;
use directories::BaseDirs;
use dispose::{Disposable, Dispose};
use futures::stream::StreamExt;
use logger_core::{log_debug, log_error, log_info, log_trace};
use logger_core::{log_debug, log_error, log_info, log_trace, log_warn};
use protobuf::Message;
use redis::cluster_routing::{
MultipleNodeRoutingInfo, Route, RoutingInfo, SingleNodeRoutingInfo, SlotAddr,
Expand Down Expand Up @@ -210,6 +210,7 @@ async fn write_result(
error_message.into(),
))
} else {
log_warn("received error", error_message.as_str());
let mut request_error = response::RequestError::default();
if err.is_connection_dropped() {
request_error.type_ = response::RequestErrorType::Disconnect.into();
Expand Down

0 comments on commit e71a5b9

Please sign in to comment.