From 366c3f9a42392388796913e0d72ab42da8fbce89 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sosth=C3=A8ne=20Gu=C3=A9don?= Date: Wed, 11 Sep 2024 12:03:37 +0200 Subject: [PATCH] Run regular retries for new failed instances --- pkcs11/src/backend/login.rs | 54 ++++++++++-- pkcs11/src/config/device.rs | 162 ++++++++++++++++++++++++++++++++++-- 2 files changed, 201 insertions(+), 15 deletions(-) diff --git a/pkcs11/src/backend/login.rs b/pkcs11/src/backend/login.rs index 751c762..46f12d3 100644 --- a/pkcs11/src/backend/login.rs +++ b/pkcs11/src/backend/login.rs @@ -17,7 +17,7 @@ use std::{ use crate::config::{ config_file::{RetryConfig, UserConfig}, - device::{InstanceData, Slot}, + device::{InstanceAttempt, InstanceData, Slot}, }; use super::{ApiError, Error}; @@ -160,6 +160,28 @@ impl LoginCtx { } fn next_instance(&self) -> &InstanceData { + let index = self.slot.instance_balancer.fetch_add(1, Relaxed); + let index = index % self.slot.instances.len(); + let instance = &self.slot.instances[index]; + match instance.should_try() { + InstanceAttempt::Failed => {} + InstanceAttempt::Working | InstanceAttempt::Retry => return instance, + } + for i in 0..self.slot.instances.len() - 1 { + let instance = &self.slot.instances[index + i]; + + match instance.should_try() { + InstanceAttempt::Failed => continue, + InstanceAttempt::Working | InstanceAttempt::Retry => { + // This not true round-robin in case of multithreaded acces + // This is degraded mode so best-effort is attempted at best + self.slot.instance_balancer.fetch_add(i, Relaxed); + return instance; + } + } + } + + // No instance is valid, return a failed instance for an attempt let index = self.slot.instance_balancer.fetch_add(1, Relaxed); let index = index % self.slot.instances.len(); &self.slot.instances[index] @@ -237,14 +259,29 @@ impl LoginCtx { retry_count += 1; let api_call_clone = api_call.clone(); match api_call_clone(&instance.config) { - Ok(result) => return Ok(result), + Ok(result) => { + instance.clear_failed(); + return Ok(result); + } // If the server is in an unusable state, skip retries and try the next one - Err(apis::Error::ResponseError(ResponseContent { status: 500, .. })) - | Err(apis::Error::ResponseError(ResponseContent { status: 501, .. })) - | Err(apis::Error::ResponseError(ResponseContent { status: 502, .. })) - | Err(apis::Error::ResponseError(ResponseContent { status: 503, .. })) - | Err(apis::Error::ResponseError(ResponseContent { status: 412, .. })) => break, + Err(apis::Error::ResponseError(err @ ResponseContent { status: 500, .. })) + | Err(apis::Error::ResponseError(err @ ResponseContent { status: 501, .. })) + | Err(apis::Error::ResponseError(err @ ResponseContent { status: 502, .. })) + | Err(apis::Error::ResponseError(err @ ResponseContent { status: 503, .. })) + | Err(apis::Error::ResponseError(err @ ResponseContent { status: 412, .. })) => { + instance.bump_failed(); + if retry_count == retry_limit { + error!("Retry count exceeded after {retry_limit} attempts, instance is unreachable: {:?}",err.status); + return Err(ApiError::InstanceRemoved.into()); + } + + warn!("Connection attempt {retry_count} failed: IO error connecting to the instance, {:?}, retrying in {delay_seconds}s", err.status); + thread::sleep(delay); + if let Some(new_conf) = self.get_config_user_mode(&user_mode) { + instance = new_conf; + } + } // If the connection to the server failed with a network error, reconnecting might solve the issue Err(apis::Error::Ureq(ureq::Error::Transport(err))) @@ -253,6 +290,7 @@ impl LoginCtx { ureq::ErrorKind::Io | ureq::ErrorKind::ConnectionFailed ) => { + instance.bump_failed(); if retry_count == retry_limit { error!("Retry count exceeded after {retry_limit} attempts, instance is unreachable: {err}"); return Err(ApiError::InstanceRemoved.into()); @@ -268,8 +306,6 @@ impl LoginCtx { Err(err) => return Err(err.into()), } } - - Err(ApiError::NoInstance.into()) } pub fn ck_state(&self) -> CK_STATE { diff --git a/pkcs11/src/config/device.rs b/pkcs11/src/config/device.rs index 368dec8..34ccd99 100644 --- a/pkcs11/src/config/device.rs +++ b/pkcs11/src/config/device.rs @@ -1,14 +1,76 @@ use std::{ - sync::{atomic::AtomicUsize, Arc, Condvar, Mutex, RwLock}, - time::Instant, + collections::BTreeMap, + sync::{ + atomic::{AtomicUsize, Ordering::Relaxed}, + mpsc::{self, RecvTimeoutError}, + Arc, Condvar, LazyLock, Mutex, RwLock, + }, + thread, + time::{Duration, Instant}, }; -use nethsm_sdk_rs::apis::configuration::Configuration; +use nethsm_sdk_rs::apis::{configuration::Configuration, default_api::health_ready_get}; -use crate::backend::db::Db; +use crate::{backend::db::Db, data::THREADS_ALLOWED}; use super::config_file::{RetryConfig, UserConfig}; +static RETRY_THREAD: LazyLock> = LazyLock::new(|| { + let (tx, rx) = mpsc::channel(); + let (tx_instance, rx_instance) = mpsc::channel(); + thread::spawn(background_thread(rx_instance)); + thread::spawn(background_timer(rx, tx_instance)); + tx +}); + +fn background_timer( + rx: mpsc::Receiver<(Duration, InstanceData)>, + tx_instance: mpsc::Sender, +) -> impl FnOnce() { + let mut jobs: BTreeMap = BTreeMap::new(); + move || loop { + let next_job = jobs.pop_first(); + let Some((next_job_deadline, next_job_instance)) = next_job else { + // No jobs in the queue, we can just run the next + let Ok((new_job_duration, new_state)) = rx.recv() else { + return; + }; + + jobs.insert(Instant::now() + new_job_duration, new_state); + continue; + }; + + let now = Instant::now(); + + if now >= next_job_deadline { + tx_instance.send(next_job_instance).unwrap(); + continue; + } + jobs.insert(next_job_deadline, next_job_instance); + + let timeout = next_job_deadline.duration_since(now); + match rx.recv_timeout(timeout) { + Ok((run_in, new_instance)) => { + jobs.insert(now + run_in, new_instance); + continue; + } + Err(RecvTimeoutError::Timeout) => continue, + Err(RecvTimeoutError::Disconnected) => break, + } + } +} + +fn background_thread(rx: mpsc::Receiver) -> impl FnOnce() { + move || loop { + while let Ok(instance) = rx.recv() { + match health_ready_get(&instance.config) { + Ok(_) => instance.clear_failed(), + Err(_) => instance.bump_failed(), + } + } + } +} + // stores the global configuration of the module #[derive(Debug, Clone)] pub struct Device { @@ -16,22 +78,110 @@ pub struct Device { pub enable_set_attribute_value: bool, } -#[derive(Debug, Clone, Default)] +#[derive(Debug, Clone, Default, PartialEq, Eq)] pub enum InstanceState { #[default] Working, Failed { - retry_count: usize, + retry_count: u8, last_retry_at: Instant, }, } +impl InstanceState { + pub fn new_failed() -> InstanceState { + InstanceState::Failed { + retry_count: 0, + last_retry_at: Instant::now(), + } + } +} + #[derive(Debug, Clone)] pub struct InstanceData { pub config: Configuration, pub state: Arc>, } +pub enum InstanceAttempt { + /// The instance is in the failed state and should not be used + Failed, + /// The instance is in the failed state but a connection should be attempted + Retry, + /// The instance is in the working state + Working, +} + +impl InstanceData { + pub fn should_try(&self) -> InstanceAttempt { + let this = self.state.read().unwrap(); + match *this { + InstanceState::Working => InstanceAttempt::Working, + InstanceState::Failed { + retry_count, + last_retry_at, + } => { + if last_retry_at.elapsed() < retry_duration_from_count(retry_count) { + InstanceAttempt::Failed + } else { + InstanceAttempt::Retry + } + } + } + } + + pub fn clear_failed(&self) { + *self.state.write().unwrap() = InstanceState::Working; + } + + pub fn bump_failed(&self) { + let mut write = self.state.write().unwrap(); + let retry_count = match *write { + InstanceState::Working => { + *write = InstanceState::new_failed(); + 0 + } + InstanceState::Failed { + retry_count: prev_retry_count, + last_retry_at, + } => { + // We only bump if it's a "real" retry. This is to avoid race conditions where + // the same instance stops working when multiple threads are simultaneously connecting + // to it + if last_retry_at.elapsed() >= retry_duration_from_count(prev_retry_count) { + let retry_count = prev_retry_count.saturating_add(1); + *write = InstanceState::Failed { + retry_count, + last_retry_at: Instant::now(), + }; + retry_count + } else { + prev_retry_count + } + } + }; + drop(write); + if THREADS_ALLOWED.load(Relaxed) { + RETRY_THREAD + .send((retry_duration_from_count(retry_count), self.clone())) + .ok(); + } + } +} + +fn retry_duration_from_count(retry_count: u8) -> Duration { + let secs = match retry_count { + 0 | 1 => 1, + 2 => 2, + 3 => 5, + 4 => 10, + 5 => 60, + 6.. => 60 * 5, + }; + + Duration::from_secs(secs) +} + #[derive(Debug, Clone)] pub struct Slot { pub label: String,