diff --git a/packages/rs-dapi-client/Cargo.toml b/packages/rs-dapi-client/Cargo.toml index e74ffcbf56..46fa84ab53 100644 --- a/packages/rs-dapi-client/Cargo.toml +++ b/packages/rs-dapi-client/Cargo.toml @@ -37,5 +37,6 @@ lru = { version = "0.12.3" } serde = { version = "1.0.197", optional = true, features = ["derive"] } serde_json = { version = "1.0.120", optional = true } chrono = { version = "0.4.38", features = ["serde"] } + [dev-dependencies] tokio = { version = "1.40", features = ["macros"] } diff --git a/packages/rs-dapi-client/src/address_list.rs b/packages/rs-dapi-client/src/address_list.rs index 0c21ecc0b1..5a92df63fe 100644 --- a/packages/rs-dapi-client/src/address_list.rs +++ b/packages/rs-dapi-client/src/address_list.rs @@ -4,9 +4,12 @@ use chrono::Utc; use dapi_grpc::tonic::codegen::http; use dapi_grpc::tonic::transport::Uri; use rand::{rngs::SmallRng, seq::IteratorRandom, SeedableRng}; -use std::collections::HashSet; +use std::collections::hash_map::Entry; +use std::collections::HashMap; use std::hash::{Hash, Hasher}; +use std::mem; use std::str::FromStr; +use std::sync::{Arc, RwLock}; use std::time::Duration; const DEFAULT_BASE_BAN_PERIOD: Duration = Duration::from_secs(60); @@ -14,12 +17,7 @@ const DEFAULT_BASE_BAN_PERIOD: Duration = Duration::from_secs(60); /// DAPI address. #[derive(Debug, Clone, Eq)] #[cfg_attr(feature = "mocks", derive(serde::Serialize, serde::Deserialize))] -pub struct Address { - ban_count: usize, - banned_until: Option>, - #[cfg_attr(feature = "mocks", serde(with = "http_serde::uri"))] - uri: Uri, -} +pub struct Address(#[cfg_attr(feature = "mocks", serde(with = "http_serde::uri"))] Uri); impl FromStr for Address { type Err = AddressListError; @@ -33,35 +31,46 @@ impl FromStr for Address { impl PartialEq for Address { fn eq(&self, other: &Self) -> bool { - self.uri == other.uri + self.0 == other.0 } } impl PartialEq for Address { fn eq(&self, other: &Uri) -> bool { - self.uri == *other + self.0 == *other } } impl Hash for Address { fn hash(&self, state: &mut H) { - self.uri.hash(state); + self.0.hash(state); } } impl From for Address { fn from(uri: Uri) -> Self { - Address { - ban_count: 0, - banned_until: None, - uri, - } + Address(uri) } } impl Address { + /// Get [Uri] of a node. + pub fn uri(&self) -> &Uri { + &self.0 + } +} + +/// Address status +/// Contains information about the number of bans and the time until the next ban is lifted. +#[derive(Debug, Default, Clone)] +pub struct AddressStatus { + ban_count: usize, + banned_until: Option>, +} + +impl AddressStatus { /// Ban the [Address] so it won't be available through [AddressList::get_live_address] for some time. - fn ban(&mut self, base_ban_period: &Duration) { + pub fn ban(&mut self, base_ban_period: &Duration) { let coefficient = (self.ban_count as f64).exp(); let ban_period = Duration::from_secs_f64(base_ban_period.as_secs_f64() * coefficient); @@ -75,24 +84,16 @@ impl Address { } /// Clears ban record. - fn unban(&mut self) { + pub fn unban(&mut self) { self.ban_count = 0; self.banned_until = None; } - - /// Get [Uri] of a node. - pub fn uri(&self) -> &Uri { - &self.uri - } } /// [AddressList] errors #[derive(Debug, thiserror::Error)] #[cfg_attr(feature = "mocks", derive(serde::Serialize, serde::Deserialize))] pub enum AddressListError { - /// Specified address is not present in the list - #[error("address {0} not found in the list")] - AddressNotFound(#[cfg_attr(feature = "mocks", serde(with = "http_serde::uri"))] Uri), /// A valid uri is required to create an Address #[error("unable parse address: {0}")] #[cfg_attr(feature = "mocks", serde(skip))] @@ -103,7 +104,7 @@ pub enum AddressListError { /// for [DapiRequest](crate::DapiRequest) execution. #[derive(Debug, Clone)] pub struct AddressList { - addresses: HashSet
, + addresses: Arc>>, base_ban_period: Duration, } @@ -115,7 +116,7 @@ impl Default for AddressList { impl std::fmt::Display for Address { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - self.uri.fmt(f) + self.0.fmt(f) } } @@ -128,43 +129,70 @@ impl AddressList { /// Creates an empty [AddressList] with adjustable base ban time. pub fn with_settings(base_ban_period: Duration) -> Self { AddressList { - addresses: HashSet::new(), + addresses: Arc::new(RwLock::new(HashMap::new())), base_ban_period, } } /// Bans address - pub(crate) fn ban_address(&mut self, address: &Address) -> Result<(), AddressListError> { - if !self.addresses.remove(address) { - return Err(AddressListError::AddressNotFound(address.uri.clone())); - }; + /// Returns false if the address is not in the list. + pub fn ban(&self, address: &Address) -> bool { + let mut guard = self.addresses.write().unwrap(); - let mut banned_address = address.clone(); - banned_address.ban(&self.base_ban_period); + let Some(status) = guard.get_mut(address) else { + return false; + }; - self.addresses.insert(banned_address); + status.ban(&self.base_ban_period); - Ok(()) + true } /// Clears address' ban record - pub(crate) fn unban_address(&mut self, address: &Address) -> Result<(), AddressListError> { - if !self.addresses.remove(address) { - return Err(AddressListError::AddressNotFound(address.uri.clone())); + /// Returns false if the address is not in the list. + pub fn unban(&self, address: &Address) -> bool { + let mut guard = self.addresses.write().unwrap(); + + let Some(status) = guard.get_mut(address) else { + return false; }; - let mut unbanned_address = address.clone(); - unbanned_address.unban(); + status.unban(); + + true + } - self.addresses.insert(unbanned_address); + /// Check if the address is banned. + pub fn is_banned(&self, address: &Address) -> bool { + let guard = self.addresses.read().unwrap(); - Ok(()) + guard + .get(address) + .map(|status| status.is_banned()) + .unwrap_or(false) } /// Adds a node [Address] to [AddressList] /// Returns false if the address is already in the list. pub fn add(&mut self, address: Address) -> bool { - self.addresses.insert(address) + let mut guard = self.addresses.write().unwrap(); + + match guard.entry(address) { + Entry::Occupied(_) => false, + Entry::Vacant(e) => { + e.insert(AddressStatus::default()); + + true + } + } + } + + /// Remove address from the list + /// Returns [AddressStatus] if the address was in the list. + pub fn remove(&mut self, address: &Address) -> Option { + let mut guard = self.addresses.write().unwrap(); + + guard.remove(address) } // TODO: this is the most simple way to add an address @@ -173,38 +201,32 @@ impl AddressList { /// Add a node [Address] to [AddressList] by [Uri]. /// Returns false if the address is already in the list. pub fn add_uri(&mut self, uri: Uri) -> bool { - self.addresses.insert(uri.into()) + self.add(Address::from(uri)) } /// Randomly select a not banned address. - pub fn get_live_address(&self) -> Option<&Address> { - let mut rng = SmallRng::from_entropy(); + pub fn get_live_address(&self) -> Option
{ + let guard = self.addresses.read().unwrap(); - self.unbanned().into_iter().choose(&mut rng) - } + let mut rng = SmallRng::from_entropy(); - /// Get all addresses that are not banned. - fn unbanned(&self) -> Vec<&Address> { let now = chrono::Utc::now(); - self.addresses + guard .iter() - .filter(|addr| { - addr.banned_until + .filter(|(_, status)| { + status + .banned_until .map(|banned_until| banned_until < now) .unwrap_or(true) }) - .collect() - } - - /// Get number of available, not banned addresses. - pub fn available(&self) -> usize { - self.unbanned().len() + .choose(&mut rng) + .map(|(addr, _)| addr.clone()) } /// Get number of all addresses, both banned and not banned. pub fn len(&self) -> usize { - self.addresses.len() + self.addresses.read().unwrap().len() } /// Check if the list is empty. @@ -212,7 +234,20 @@ impl AddressList { /// Returns false if there is at least one address in the list. /// Banned addresses are also counted. pub fn is_empty(&self) -> bool { - self.addresses.is_empty() + self.addresses.read().unwrap().is_empty() + } +} + +impl IntoIterator for AddressList { + type Item = (Address, AddressStatus); + type IntoIter = std::collections::hash_map::IntoIter; + + fn into_iter(self) -> Self::IntoIter { + let mut guard = self.addresses.write().unwrap(); + + let addresses_map = mem::take(&mut *guard); + + addresses_map.into_iter() } } @@ -238,12 +273,3 @@ impl FromIterator for AddressList { address_list } } - -impl IntoIterator for AddressList { - type Item = Address; - type IntoIter = std::collections::hash_set::IntoIter
; - - fn into_iter(self) -> Self::IntoIter { - self.addresses.into_iter() - } -} diff --git a/packages/rs-dapi-client/src/dapi_client.rs b/packages/rs-dapi-client/src/dapi_client.rs index c5ef5e67a1..ebca641f40 100644 --- a/packages/rs-dapi-client/src/dapi_client.rs +++ b/packages/rs-dapi-client/src/dapi_client.rs @@ -3,14 +3,15 @@ use backon::{ConstantBuilder, Retryable}; use dapi_grpc::mock::Mockable; use dapi_grpc::tonic::async_trait; -use std::fmt::Debug; +use std::fmt::{Debug, Display}; use std::sync::atomic::AtomicUsize; -use std::sync::{Arc, RwLock}; +use std::sync::Arc; use std::time::Duration; use tracing::Instrument; use crate::address_list::AddressListError; use crate::connection_pool::ConnectionPool; +use crate::request_settings::AppliedRequestSettings; use crate::transport::TransportError; use crate::{ transport::{TransportClient, TransportRequest}, @@ -72,7 +73,7 @@ impl Mockable for DapiClientError { /// Access point to DAPI. #[derive(Debug, Clone)] pub struct DapiClient { - address_list: Arc>, + address_list: AddressList, settings: RequestSettings, pool: ConnectionPool, #[cfg(feature = "dump")] @@ -86,7 +87,7 @@ impl DapiClient { let address_count = 3 * address_list.len(); Self { - address_list: Arc::new(RwLock::new(address_list)), + address_list, settings, pool: ConnectionPool::new(address_count), #[cfg(feature = "dump")] @@ -95,11 +96,74 @@ impl DapiClient { } /// Return the [DapiClient] address list. - pub fn address_list(&self) -> &Arc> { + pub fn address_list(&self) -> &AddressList { &self.address_list } } +/// Ban address in case of retryable error or unban it +/// if it was banned, and the request was successful. +pub fn update_address_ban_status( + address_list: &AddressList, + result: &ExecutionResult, + applied_settings: &AppliedRequestSettings, +) where + E: CanRetry + Display + Debug, +{ + match &result { + Ok(response) => { + // Unban the address if it was banned and node responded successfully this time + if address_list.is_banned(&response.address) { + if address_list.unban(&response.address) { + tracing::debug!(address = ?response.address, "unban successfully responded address {}", response.address); + } else { + // The address might be already removed from the list + // by background process (i.e., SML update), and it's fine. + tracing::debug!( + address = ?response.address, + "unable to unban address {} because it's not in the list anymore", + response.address + ); + } + } + } + Err(error) => { + if error.can_retry() { + if let Some(address) = error.address.as_ref() { + if applied_settings.ban_failed_address { + if address_list.ban(address) { + tracing::warn!( + ?address, + ?error, + "ban address {address} due to error: {error}" + ); + } else { + // The address might be already removed from the list + // by background process (i.e., SML update), and it's fine. + tracing::debug!( + ?address, + ?error, + "unable to ban address {address} because it's not in the list anymore" + ); + } + } else { + tracing::debug!( + ?error, + ?address, + "we should ban the address {address} due to the error but banning is disabled" + ); + } + } else { + tracing::debug!( + ?error, + "we should ban an address due to the error but address is absent" + ); + } + } + } + }; +} + #[async_trait] impl DapiRequestExecutor for DapiClient { /// Execute the [DapiRequest](crate::DapiRequest). @@ -140,18 +204,11 @@ impl DapiRequestExecutor for DapiClient { let retries_counter = Arc::clone(retries_counter_arc_ref); // Try to get an address to initialize transport on: - let address_list = self + let address_result = self .address_list - .read() - .expect("can't get address list for read"); - - let address_result = address_list .get_live_address() - .cloned() .ok_or(DapiClientError::NoAvailableAddresses); - drop(address_list); - let _span = tracing::trace_span!( "execute request", address = ?address_result, @@ -177,7 +234,7 @@ impl DapiRequestExecutor for DapiClient { // `impl Future`, not a `Result` itself. let address = address_result.map_err(|inner| ExecutionError { inner, - retries: retries_counter.load(std::sync::atomic::Ordering::Acquire), + retries: retries_counter.load(std::sync::atomic::Ordering::Relaxed), address: None, })?; @@ -190,85 +247,44 @@ impl DapiRequestExecutor for DapiClient { ) .map_err(|error| ExecutionError { inner: DapiClientError::Transport(error), - retries: retries_counter.load(std::sync::atomic::Ordering::Acquire), + retries: retries_counter.load(std::sync::atomic::Ordering::Relaxed), address: Some(address.clone()), })?; - let response = transport_request + let result = transport_request .execute_transport(&mut transport_client, &applied_settings) .await .map_err(DapiClientError::Transport); - match &response { - Ok(_) => { - // Unban the address if it was banned and node responded successfully this time - if address.is_banned() { - let mut address_list = self - .address_list - .write() - .expect("can't get address list for write"); - - address_list.unban_address(&address).map_err(|error| { - ExecutionError { - inner: DapiClientError::AddressList(error), - retries: retries_counter - .load(std::sync::atomic::Ordering::Acquire), - address: Some(address.clone()), - } - })?; + let retries = retries_counter.load(std::sync::atomic::Ordering::Relaxed); + + let execution_result = result + .map(|inner| { + tracing::trace!(response = ?inner, "received {} response", response_name); + + ExecutionResponse { + inner, + retries, + address: address.clone(), } + }) + .map_err(|inner| { + tracing::debug!(error = ?inner, "received error: {inner}"); - tracing::trace!(?response, "received {} response", response_name); - } - Err(error) => { - if error.can_retry() { - if applied_settings.ban_failed_address { - let mut address_list = self - .address_list - .write() - .expect("can't get address list for write"); - tracing::warn!( - ?address, - ?error, - "received server error, banning address" - ); - address_list.ban_address(&address).map_err(|error| { - ExecutionError { - inner: DapiClientError::AddressList(error), - retries: retries_counter - .load(std::sync::atomic::Ordering::Acquire), - address: Some(address.clone()), - } - })?; - } else { - tracing::debug!( - ?address, - ?error, - "received server error, we should ban the node but banning is disabled" - ); - } - } else { - tracing::debug!( - ?error, - "received server error, most likely the request is invalid" - ); + ExecutionError { + inner, + retries, + address: Some(address.clone()), } - } - }; + }); - let retries = retries_counter.load(std::sync::atomic::Ordering::Acquire); + update_address_ban_status::( + &self.address_list, + &execution_result, + &applied_settings, + ); - response - .map(|inner| ExecutionResponse { - inner, - retries, - address: address.clone(), - }) - .map_err(|inner| ExecutionError { - inner, - retries, - address: Some(address), - }) + execution_result } }; @@ -278,7 +294,7 @@ impl DapiRequestExecutor for DapiClient { .retry(retry_settings) .notify(|error, duration| { let retries_counter = Arc::clone(&retries_counter_arc); - retries_counter.fetch_add(1, std::sync::atomic::Ordering::AcqRel); + retries_counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed); tracing::warn!( ?error, diff --git a/packages/rs-dapi-client/src/lib.rs b/packages/rs-dapi-client/src/lib.rs index f8c03f3956..e820a714a0 100644 --- a/packages/rs-dapi-client/src/lib.rs +++ b/packages/rs-dapi-client/src/lib.rs @@ -16,8 +16,9 @@ pub mod transport; pub use address_list::Address; pub use address_list::AddressList; pub use address_list::AddressListError; +pub use address_list::AddressStatus; pub use connection_pool::ConnectionPool; -pub use dapi_client::{DapiClient, DapiClientError}; +pub use dapi_client::{update_address_ban_status, DapiClient, DapiClientError}; #[cfg(feature = "dump")] pub use dump::DumpData; pub use executor::{ diff --git a/packages/rs-drive-proof-verifier/src/error.rs b/packages/rs-drive-proof-verifier/src/error.rs index 8c0664c825..3fb5825a8c 100644 --- a/packages/rs-drive-proof-verifier/src/error.rs +++ b/packages/rs-drive-proof-verifier/src/error.rs @@ -1,5 +1,4 @@ use dpp::ProtocolError; -use drive::grovedb::operations::proof::GroveDBProof; /// Errors #[derive(Debug, thiserror::Error)] diff --git a/packages/rs-drive-verify-c-binding/Cargo.toml b/packages/rs-drive-verify-c-binding/Cargo.toml index 1f6d9b4f1e..22da440ca7 100644 --- a/packages/rs-drive-verify-c-binding/Cargo.toml +++ b/packages/rs-drive-verify-c-binding/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "rs-drive-verify-c-binding" -version = "1.1.0" +version = "1.6.2" edition = "2021" rust-version.workspace = true diff --git a/packages/rs-sdk/src/platform/fetch.rs b/packages/rs-sdk/src/platform/fetch.rs index 80564fbdf2..7fdf5e1974 100644 --- a/packages/rs-sdk/src/platform/fetch.rs +++ b/packages/rs-sdk/src/platform/fetch.rs @@ -195,7 +195,7 @@ where .dapi_client_settings .override_by(settings.unwrap_or_default()); - retry(settings, fut).await.into_inner() + retry(sdk.address_list(), settings, fut).await.into_inner() } /// Fetch single object from Platform. diff --git a/packages/rs-sdk/src/platform/fetch_many.rs b/packages/rs-sdk/src/platform/fetch_many.rs index 360a3559b3..1fcdb1043a 100644 --- a/packages/rs-sdk/src/platform/fetch_many.rs +++ b/packages/rs-sdk/src/platform/fetch_many.rs @@ -252,7 +252,7 @@ where .dapi_client_settings .override_by(settings.unwrap_or_default()); - retry(settings, fut).await.into_inner() + retry(sdk.address_list(), settings, fut).await.into_inner() } /// Fetch multiple objects from Platform by their identifiers. @@ -327,7 +327,7 @@ impl FetchMany for Document { ) -> Result { let document_query: &DocumentQuery = &query.query(sdk.prove())?; - retry(sdk.dapi_client_settings, |settings| async move { + retry(sdk.address_list(), sdk.dapi_client_settings, |settings| async move { let request = document_query.clone(); let ExecutionResponse { diff --git a/packages/rs-sdk/src/platform/fetch_unproved.rs b/packages/rs-sdk/src/platform/fetch_unproved.rs index ac3a682f81..d98d598844 100644 --- a/packages/rs-sdk/src/platform/fetch_unproved.rs +++ b/packages/rs-sdk/src/platform/fetch_unproved.rs @@ -55,7 +55,6 @@ where /// - `settings`: Request settings for the connection to Platform. /// /// ## Returns - /// Returns: /// * `Ok(Some(Self))` when object is found. /// * `Ok(None)` when object is not found. /// * [`Err(Error)`](Error) when an error occurs. @@ -107,7 +106,9 @@ where }; let settings = sdk.dapi_client_settings.override_by(settings); - retry(settings, closure).await.into_inner() + retry(sdk.address_list(), settings, closure) + .await + .into_inner() } } diff --git a/packages/rs-sdk/src/platform/transition/broadcast.rs b/packages/rs-sdk/src/platform/transition/broadcast.rs index f41a279b13..f7c3f75d32 100644 --- a/packages/rs-sdk/src/platform/transition/broadcast.rs +++ b/packages/rs-sdk/src/platform/transition/broadcast.rs @@ -52,7 +52,7 @@ impl BroadcastStateTransition for StateTransition { }; // response is empty for a broadcast, result comes from the stream wait for state transition result - retry(retry_settings, factory) + retry(sdk.address_list(), retry_settings, factory) .await .into_inner() .map(|_| ()) @@ -122,7 +122,7 @@ impl BroadcastStateTransition for StateTransition { .wrap_to_execution_result(&response) }; - let future = retry(retry_settings, factory); + let future = retry(sdk.address_list(), retry_settings, factory); // run the future with or without timeout, depending on the settings let wait_timeout = settings.and_then(|s| s.wait_timeout); match wait_timeout { diff --git a/packages/rs-sdk/src/sdk.rs b/packages/rs-sdk/src/sdk.rs index 3fd570e206..bceb9dacf3 100644 --- a/packages/rs-sdk/src/sdk.rs +++ b/packages/rs-sdk/src/sdk.rs @@ -184,7 +184,7 @@ enum SdkInstance { dapi: Arc>, /// Mock SDK implementation processing mock expectations and responses. mock: Arc>, - + address_list: AddressList, /// Platform version configured for this Sdk version: &'static PlatformVersion, }, @@ -554,19 +554,11 @@ impl Sdk { } /// Return the [DapiClient] address list - pub fn address_list(&self) -> Result { + pub fn address_list(&self) -> &AddressList { match &self.inner { - SdkInstance::Dapi { dapi, version: _ } => { - let address_list_arc = dapi.address_list(); - let address_list_lock = address_list_arc - .read() - .map_err(|e| format!("Failed to read address list: {e}"))?; - Ok(address_list_lock.clone()) - } + SdkInstance::Dapi { dapi, .. } => dapi.address_list(), #[cfg(feature = "mocks")] - SdkInstance::Mock { .. } => { - unimplemented!("mock Sdk does not have address list") - } + SdkInstance::Mock { address_list, .. } => address_list, } } } @@ -1026,10 +1018,11 @@ impl SdkBuilder { let sdk= Sdk { network: self.network, dapi_client_settings: self.settings, - inner:SdkInstance::Mock { - mock:mock_sdk.clone(), + inner: SdkInstance::Mock { + mock: mock_sdk.clone(), dapi, - version:self.version, + address_list: AddressList::new(), + version: self.version, }, dump_dir: self.dump_dir.clone(), proofs:self.proofs, diff --git a/packages/rs-sdk/src/sync.rs b/packages/rs-sdk/src/sync.rs index 38a878e174..be7cf9d265 100644 --- a/packages/rs-sdk/src/sync.rs +++ b/packages/rs-sdk/src/sync.rs @@ -6,13 +6,17 @@ use arc_swap::ArcSwap; use drive_proof_verifier::error::ContextProviderError; -use rs_dapi_client::{CanRetry, ExecutionResult, RequestSettings}; +use rs_dapi_client::{ + update_address_ban_status, AddressList, CanRetry, ExecutionResult, RequestSettings, +}; +use std::fmt::Display; use std::{ fmt::Debug, future::Future, sync::{mpsc::SendError, Arc}, }; use tokio::{runtime::TryCurrentError, sync::Mutex}; + #[derive(Debug, thiserror::Error)] pub enum AsyncError { /// Not running inside tokio runtime @@ -110,6 +114,7 @@ async fn worker( /// /// ## Parameters /// +/// - `address_list` - list of addresses to be used for the requests. /// - `settings` - global settings with any request-specific settings overrides applied. /// - `future_factory_fn` - closure that returns a future that should be retried. It should take [`RequestSettings`] as /// an argument and return [`ExecutionResult`]. @@ -138,8 +143,9 @@ async fn worker( /// } /// #[tokio::main] /// async fn main() { +/// let address_list = rs_dapi_client::AddressList::default(); /// let global_settings = RequestSettings::default(); -/// dash_sdk::sync::retry(global_settings, retry_test_function).await.expect_err("should fail"); +/// dash_sdk::sync::retry(&address_list, global_settings, retry_test_function).await.expect_err("should fail"); /// } /// ``` /// @@ -154,13 +160,14 @@ async fn worker( /// /// - [`::backon`] crate that is used by this function. pub async fn retry( + address_list: &AddressList, settings: RequestSettings, future_factory_fn: FutureFactoryFn, ) -> ExecutionResult where Fut: Future>, FutureFactoryFn: FnMut(RequestSettings) -> Fut, - E: CanRetry + Debug, + E: CanRetry + Display + Debug, { let max_retries = settings.retries.unwrap_or_default(); @@ -187,21 +194,26 @@ where async move { let settings = closure_settings.load_full().clone(); let mut func = inner_fn.lock().await; - (*func)(*settings).await + let result = (*func)(*settings).await; + + // Ban or unban the address based on the result + update_address_ban_status(address_list, &result, &settings.finalize()); + + result } }; - let result= ::backon::Retryable::retry(closure,backoff_strategy) + let result = ::backon::Retryable::retry(closure, backoff_strategy) .when(|e| { if e.can_retry() { - // requests sent for current execution attempt; + // requests sent for current execution attempt; let requests_sent = e.retries + 1; - // requests sent in all preceeding attempts; user expects `settings.retries +1` + // requests sent in all preceeding attempts; user expects `settings.retries +1` retries += requests_sent; let all_requests_sent = retries; - if all_requests_sent <=max_retries { // we account for for initial request + if all_requests_sent <= max_retries { // we account for initial request tracing::warn!(retry = all_requests_sent, max_retries, error=?e, "retrying request"); let new_settings = RequestSettings { retries: Some(max_retries - all_requests_sent), // limit num of retries for lower layer @@ -231,6 +243,7 @@ where #[cfg(test)] mod test { use super::*; + use derive_more::Display; use http::Uri; use rs_dapi_client::ExecutionError; use std::{ @@ -314,7 +327,7 @@ mod test { } } - #[derive(Debug)] + #[derive(Debug, Display)] enum MockError { Generic, } @@ -352,6 +365,8 @@ mod test { for _ in 0..1 { let counter = Arc::new(AtomicUsize::new(0)); + let address_list = AddressList::default(); + // we retry 5 times, and expect 5 retries + 1 initial request let mut global_settings = RequestSettings::default(); global_settings.retries = Some(expected_requests - 1); @@ -361,7 +376,7 @@ mod test { retry_test_function(s, counter) }; - retry(global_settings, closure) + retry(&address_list, global_settings, closure) .await .expect_err("should fail"); diff --git a/packages/rs-sdk/tests/fetch/evonode.rs b/packages/rs-sdk/tests/fetch/evonode.rs index 0d35d5be9f..6a6ce4a1f8 100644 --- a/packages/rs-sdk/tests/fetch/evonode.rs +++ b/packages/rs-sdk/tests/fetch/evonode.rs @@ -16,9 +16,7 @@ async fn test_evonode_status() { let cfg = Config::new(); let sdk = cfg.setup_api("test_evonode_status").await; - let addresses = cfg.address_list(); - - for address in addresses { + for (address, _status) in cfg.address_list() { let node = EvoNode::new(address.clone()); match timeout( Duration::from_secs(3), @@ -33,8 +31,9 @@ async fn test_evonode_status() { status.chain.latest_block_height > 0, "latest block height must be positive" ); - assert!( - status.node.pro_tx_hash.unwrap_or_default().len() == ProTxHash::LEN, + assert_eq!( + status.node.pro_tx_hash.unwrap_or_default().len(), + ProTxHash::LEN, "latest block hash must be non-empty" ); // Add more specific assertions based on expected status properties diff --git a/packages/rs-sdk/tests/vectors/test_evonode_status_refused/msg_EvoNode_6db392ff1869b56ecc7de9ace5864123671ed14d3f0c537aa8e878d24e529de5.json b/packages/rs-sdk/tests/vectors/test_evonode_status/msg_EvoNode_dae36baf8dec4f117f97a27099eb28ff908ec0406a4ac48fff5727a9b9a4ee57.json similarity index 58% rename from packages/rs-sdk/tests/vectors/test_evonode_status_refused/msg_EvoNode_6db392ff1869b56ecc7de9ace5864123671ed14d3f0c537aa8e878d24e529de5.json rename to packages/rs-sdk/tests/vectors/test_evonode_status/msg_EvoNode_dae36baf8dec4f117f97a27099eb28ff908ec0406a4ac48fff5727a9b9a4ee57.json index c80da24adb..c2bdd96612 100644 Binary files a/packages/rs-sdk/tests/vectors/test_evonode_status_refused/msg_EvoNode_6db392ff1869b56ecc7de9ace5864123671ed14d3f0c537aa8e878d24e529de5.json and b/packages/rs-sdk/tests/vectors/test_evonode_status/msg_EvoNode_dae36baf8dec4f117f97a27099eb28ff908ec0406a4ac48fff5727a9b9a4ee57.json differ diff --git a/packages/rs-sdk/tests/vectors/test_evonode_status/msg_EvoNode_fbdf15806b1160a9fb482d5663371cdde55f94897dcf9d905573b01fe445fbc9.json b/packages/rs-sdk/tests/vectors/test_evonode_status/msg_EvoNode_fbdf15806b1160a9fb482d5663371cdde55f94897dcf9d905573b01fe445fbc9.json deleted file mode 100644 index e51843cf30..0000000000 Binary files a/packages/rs-sdk/tests/vectors/test_evonode_status/msg_EvoNode_fbdf15806b1160a9fb482d5663371cdde55f94897dcf9d905573b01fe445fbc9.json and /dev/null differ diff --git a/packages/rs-sdk/tests/vectors/test_evonode_status_refused/msg_EvoNode_7a8ca78c81edf0322718e172f59dab90acb35dbe92b5072c67ae42b121a30dae.json b/packages/rs-sdk/tests/vectors/test_evonode_status_refused/msg_EvoNode_7a8ca78c81edf0322718e172f59dab90acb35dbe92b5072c67ae42b121a30dae.json new file mode 100644 index 0000000000..a72158ecd4 Binary files /dev/null and b/packages/rs-sdk/tests/vectors/test_evonode_status_refused/msg_EvoNode_7a8ca78c81edf0322718e172f59dab90acb35dbe92b5072c67ae42b121a30dae.json differ diff --git a/scripts/configure_test_suite_network.sh b/scripts/configure_test_suite_network.sh index 54e6f99349..498e9d2d03 100755 --- a/scripts/configure_test_suite_network.sh +++ b/scripts/configure_test_suite_network.sh @@ -66,7 +66,7 @@ else CERT_FLAG="" ST_EXECUTION_INTERVAL=15000 fi -SKIP_SYNC_BEFORE_HEIGHT=$(curl -s $INSIGHT_URL | jq '.height - 200') +SKIP_SYNC_BEFORE_HEIGHT=4800 # $(curl -s $INSIGHT_URL | jq '.height - 200') # check variables are not empty if [ -z "$FAUCET_ADDRESS" ] || \