Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(sdk)!: ban addresses failed in sdk #2351

Merged
merged 15 commits into from
Dec 4, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions packages/rs-dapi-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,5 +37,7 @@ lru = { version = "0.12.3" }
serde = { version = "1.0.197", optional = true, features = ["derive"] }
serde_json = { version = "1.0.120", optional = true }
chrono = { version = "0.4.38", features = ["serde"] }
dashmap = "6.1.0"

[dev-dependencies]
tokio = { version = "1.40", features = ["macros"] }
44 changes: 16 additions & 28 deletions packages/rs-dapi-client/src/address_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@
use chrono::Utc;
use dapi_grpc::tonic::codegen::http;
use dapi_grpc::tonic::transport::Uri;
use dashmap::setref::multiple::RefMulti;
use dashmap::DashSet;
use rand::{rngs::SmallRng, seq::IteratorRandom, SeedableRng};
use std::collections::HashSet;
use std::hash::{Hash, Hasher};
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;

const DEFAULT_BASE_BAN_PERIOD: Duration = Duration::from_secs(60);
Expand Down Expand Up @@ -103,7 +105,7 @@ pub enum AddressListError {
/// for [DapiRequest](crate::DapiRequest) execution.
#[derive(Debug, Clone)]
pub struct AddressList {
addresses: HashSet<Address>,
addresses: Arc<DashSet<Address>>,
base_ban_period: Duration,
}

Expand All @@ -128,14 +130,14 @@ impl AddressList {
/// Creates an empty [AddressList] with adjustable base ban time.
pub fn with_settings(base_ban_period: Duration) -> Self {
AddressList {
addresses: HashSet::new(),
addresses: Arc::new(DashSet::new()),
base_ban_period,
}
}

/// Bans address
pub(crate) fn ban_address(&mut self, address: &Address) -> Result<(), AddressListError> {
if !self.addresses.remove(address) {
pub fn ban_address(&self, address: &Address) -> Result<(), AddressListError> {
if self.addresses.remove(address).is_none() {
return Err(AddressListError::AddressNotFound(address.uri.clone()));
};

Expand All @@ -148,8 +150,8 @@ impl AddressList {
}

/// Clears address' ban record
pub(crate) fn unban_address(&mut self, address: &Address) -> Result<(), AddressListError> {
if !self.addresses.remove(address) {
pub fn unban_address(&self, address: &Address) -> Result<(), AddressListError> {
if self.addresses.remove(address).is_none() {
return Err(AddressListError::AddressNotFound(address.uri.clone()));
};

Expand Down Expand Up @@ -177,14 +179,9 @@ impl AddressList {
}

/// Randomly select a not banned address.
pub fn get_live_address(&self) -> Option<&Address> {
pub fn get_live_address(&self) -> Option<RefMulti<Address>> {
let mut rng = SmallRng::from_entropy();

self.unbanned().into_iter().choose(&mut rng)
}

/// Get all addresses that are not banned.
fn unbanned(&self) -> Vec<&Address> {
let now = chrono::Utc::now();

self.addresses
Expand All @@ -194,12 +191,7 @@ impl AddressList {
.map(|banned_until| banned_until < now)
.unwrap_or(true)
})
.collect()
}

/// Get number of available, not banned addresses.
pub fn available(&self) -> usize {
self.unbanned().len()
.choose(&mut rng)
}

/// Get number of all addresses, both banned and not banned.
Expand All @@ -214,6 +206,11 @@ impl AddressList {
pub fn is_empty(&self) -> bool {
self.addresses.is_empty()
}

/// Get an iterator over all addresses.
pub fn iter(&self) -> impl Iterator<Item = RefMulti<Address>> {
self.addresses.iter()
}
}

// TODO: Must be changed to FromStr
Expand All @@ -238,12 +235,3 @@ impl FromIterator<Uri> for AddressList {
address_list
}
}

impl IntoIterator for AddressList {
type Item = Address;
type IntoIter = std::collections::hash_set::IntoIter<Address>;

fn into_iter(self) -> Self::IntoIter {
self.addresses.into_iter()
}
}
186 changes: 108 additions & 78 deletions packages/rs-dapi-client/src/dapi_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@
use backon::{ConstantBuilder, Retryable};
use dapi_grpc::mock::Mockable;
use dapi_grpc::tonic::async_trait;
use std::fmt::Debug;
use std::fmt::{Debug, Display};
use std::sync::atomic::AtomicUsize;
use std::sync::{Arc, RwLock};
use std::sync::Arc;
use std::time::Duration;
use tracing::Instrument;

use crate::address_list::AddressListError;
use crate::connection_pool::ConnectionPool;
use crate::request_settings::AppliedRequestSettings;
use crate::transport::TransportError;
use crate::{
transport::{TransportClient, TransportRequest},
Expand Down Expand Up @@ -72,7 +73,7 @@ impl Mockable for DapiClientError {
/// Access point to DAPI.
#[derive(Debug, Clone)]
pub struct DapiClient {
address_list: Arc<RwLock<AddressList>>,
address_list: AddressList,
settings: RequestSettings,
pool: ConnectionPool,
#[cfg(feature = "dump")]
Expand All @@ -86,7 +87,7 @@ impl DapiClient {
let address_count = 3 * address_list.len();

Self {
address_list: Arc::new(RwLock::new(address_list)),
address_list,
settings,
pool: ConnectionPool::new(address_count),
#[cfg(feature = "dump")]
Expand All @@ -95,11 +96,86 @@ impl DapiClient {
}

/// Return the [DapiClient] address list.
pub fn address_list(&self) -> &Arc<RwLock<AddressList>> {
pub fn address_list(&self) -> &AddressList {
&self.address_list
}
}

/// Ban address in case of retryable error or unban it
/// if it was banned, and the request was successful.
pub fn ban_failed_address<R, E>(
address_list: &AddressList,
result: &ExecutionResult<R, E>,
applied_settings: &AppliedRequestSettings,
) where
E: CanRetry + Display + Debug,
{
match &result {
Ok(response) => {
// Unban the address if it was banned and node responded successfully this time
if response.address.is_banned() {
match address_list.unban_address(&response.address) {
Ok(_) => {
tracing::debug!(address = ?response.address, "unban successfully responded address {}", response.address);
}
// The address might be already removed from the list
// by background process (i.e., SML update), and it's fine.
Err(AddressListError::AddressNotFound(_)) => {
tracing::debug!(
address = ?response.address,
"unable to unban address {} because it's not in the list anymore",
response.address
);
}
Err(AddressListError::InvalidAddressUri(_)) => {
unreachable!("unban address doesn't return InvalidAddressUri")
}
}
}
}
Err(error) => {
if error.can_retry() {
if let Some(address) = error.address.as_ref() {
if applied_settings.ban_failed_address {
match address_list.ban_address(address) {
Ok(_) => {
tracing::warn!(
?address,
?error,
"ban address {address} due to error: {error}"
);
}
// The address might be already removed from the list
// by background process (i.e., SML update), and it's fine.
Err(AddressListError::AddressNotFound(_)) => {
tracing::debug!(
?address,
?error,
"unable to ban address {address} because it's not in the list anymore"
);
}
Err(AddressListError::InvalidAddressUri(_)) => {
unreachable!("ban address doesn't return InvalidAddressUri")
}
}
} else {
tracing::debug!(
?error,
?address,
"we should ban the address {address} due to the error but banning is disabled"
);
}
} else {
tracing::debug!(
?error,
"we should ban an address due to the error but address is absent"
);
}
}
}
};
}
shumkov marked this conversation as resolved.
Show resolved Hide resolved

#[async_trait]
impl DapiRequestExecutor for DapiClient {
/// Execute the [DapiRequest](crate::DapiRequest).
Expand Down Expand Up @@ -140,18 +216,13 @@ impl DapiRequestExecutor for DapiClient {
let retries_counter = Arc::clone(retries_counter_arc_ref);

// Try to get an address to initialize transport on:
let address_list = self
let address_result = self
.address_list
.read()
.expect("can't get address list for read");

let address_result = address_list
.get_live_address()
.as_deref()
.cloned()
.ok_or(DapiClientError::NoAvailableAddresses);

drop(address_list);

let _span = tracing::trace_span!(
"execute request",
address = ?address_result,
Expand Down Expand Up @@ -194,81 +265,40 @@ impl DapiRequestExecutor for DapiClient {
address: Some(address.clone()),
})?;

let response = transport_request
let result = transport_request
.execute_transport(&mut transport_client, &applied_settings)
.await
.map_err(DapiClientError::Transport);

match &response {
Ok(_) => {
// Unban the address if it was banned and node responded successfully this time
if address.is_banned() {
let mut address_list = self
.address_list
.write()
.expect("can't get address list for write");

address_list.unban_address(&address).map_err(|error| {
ExecutionError {
inner: DapiClientError::AddressList(error),
retries: retries_counter
.load(std::sync::atomic::Ordering::Acquire),
address: Some(address.clone()),
}
})?;
let retries = retries_counter.load(std::sync::atomic::Ordering::Acquire);
shumkov marked this conversation as resolved.
Show resolved Hide resolved

let execution_result = result
.map(|inner| {
tracing::trace!(response = ?inner, "received {} response", response_name);

ExecutionResponse {
inner,
retries,
address: address.clone(),
}
})
.map_err(|inner| {
tracing::debug!(error = ?inner, "received error: {inner}");

tracing::trace!(?response, "received {} response", response_name);
}
Err(error) => {
if error.can_retry() {
if applied_settings.ban_failed_address {
let mut address_list = self
.address_list
.write()
.expect("can't get address list for write");
tracing::warn!(
?address,
?error,
"received server error, banning address"
);
address_list.ban_address(&address).map_err(|error| {
ExecutionError {
inner: DapiClientError::AddressList(error),
retries: retries_counter
.load(std::sync::atomic::Ordering::Acquire),
address: Some(address.clone()),
}
})?;
} else {
tracing::debug!(
?address,
?error,
"received server error, we should ban the node but banning is disabled"
);
}
} else {
tracing::debug!(
?error,
"received server error, most likely the request is invalid"
);
ExecutionError {
inner,
retries,
address: Some(address.clone()),
}
}
};
});

let retries = retries_counter.load(std::sync::atomic::Ordering::Acquire);
ban_failed_address::<R::Response, DapiClientError>(
lklimek marked this conversation as resolved.
Show resolved Hide resolved
&self.address_list,
&execution_result,
&applied_settings,
);

response
.map(|inner| ExecutionResponse {
inner,
retries,
address: address.clone(),
})
.map_err(|inner| ExecutionError {
inner,
retries,
address: Some(address),
})
execution_result
}
};

Expand Down
Loading
Loading