Skip to content

Commit

Permalink
Run regular retries for new failed instances
Browse files Browse the repository at this point in the history
  • Loading branch information
sosthene-nitrokey committed Sep 11, 2024
1 parent 8491d10 commit f096839
Show file tree
Hide file tree
Showing 2 changed files with 187 additions and 14 deletions.
39 changes: 31 additions & 8 deletions pkcs11/src/backend/login.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use std::{

use crate::config::{
config_file::{RetryConfig, UserConfig},
device::{InstanceData, Slot},
device::{InstanceAttempt, InstanceData, Slot},
};

use super::{ApiError, Error};
Expand Down Expand Up @@ -160,6 +160,18 @@ impl LoginCtx {
}

fn next_instance(&self) -> &InstanceData {
for _ in 0..self.slot.instances.len() {
let index = self.slot.instance_balancer.fetch_add(1, Relaxed);
let index = index % self.slot.instances.len();
let instance = &self.slot.instances[index];

match instance.should_try() {
InstanceAttempt::Failed => continue,
InstanceAttempt::Working | InstanceAttempt::Retry => return instance,
}
}

// No instance is valid, return a failed instance for an attempt
let index = self.slot.instance_balancer.fetch_add(1, Relaxed);
let index = index % self.slot.instances.len();
&self.slot.instances[index]
Expand Down Expand Up @@ -240,11 +252,23 @@ impl LoginCtx {
Ok(result) => return Ok(result),

// If the server is in an unusable state, skip retries and try the next one
Err(apis::Error::ResponseError(ResponseContent { status: 500, .. }))
| Err(apis::Error::ResponseError(ResponseContent { status: 501, .. }))
| Err(apis::Error::ResponseError(ResponseContent { status: 502, .. }))
| Err(apis::Error::ResponseError(ResponseContent { status: 503, .. }))
| Err(apis::Error::ResponseError(ResponseContent { status: 412, .. })) => break,
Err(apis::Error::ResponseError(err @ ResponseContent { status: 500, .. }))
| Err(apis::Error::ResponseError(err @ ResponseContent { status: 501, .. }))
| Err(apis::Error::ResponseError(err @ ResponseContent { status: 502, .. }))
| Err(apis::Error::ResponseError(err @ ResponseContent { status: 503, .. }))
| Err(apis::Error::ResponseError(err @ ResponseContent { status: 412, .. })) => {
instance.bump_failed();
if retry_count == retry_limit {
error!("Retry count exceeded after {retry_limit} attempts, instance is unreachable: {:?}",err.status);
return Err(ApiError::InstanceRemoved.into());
}

warn!("Connection attempt {retry_count} failed: IO error connecting to the instance, {:?}, retrying in {delay_seconds}s", err.status);
thread::sleep(delay);
if let Some(new_conf) = self.get_config_user_mode(&user_mode) {
instance = new_conf;
}
}

// If the connection to the server failed with a network error, reconnecting might solve the issue
Err(apis::Error::Ureq(ureq::Error::Transport(err)))
Expand All @@ -253,6 +277,7 @@ impl LoginCtx {
ureq::ErrorKind::Io | ureq::ErrorKind::ConnectionFailed
) =>
{
instance.bump_failed();
if retry_count == retry_limit {
error!("Retry count exceeded after {retry_limit} attempts, instance is unreachable: {err}");
return Err(ApiError::InstanceRemoved.into());
Expand All @@ -268,8 +293,6 @@ impl LoginCtx {
Err(err) => return Err(err.into()),
}
}

Err(ApiError::NoInstance.into())
}

pub fn ck_state(&self) -> CK_STATE {
Expand Down
162 changes: 156 additions & 6 deletions pkcs11/src/config/device.rs
Original file line number Diff line number Diff line change
@@ -1,37 +1,187 @@
use std::{
sync::{atomic::AtomicUsize, Arc, Condvar, Mutex, RwLock},
time::Instant,
collections::BTreeMap,
sync::{
atomic::{AtomicUsize, Ordering::Relaxed},
mpsc::{self, RecvTimeoutError},
Arc, Condvar, LazyLock, Mutex, RwLock,
},
thread,
time::{Duration, Instant},
};

use nethsm_sdk_rs::apis::configuration::Configuration;
use nethsm_sdk_rs::apis::{configuration::Configuration, default_api::health_ready_get};

use crate::backend::db::Db;
use crate::{backend::db::Db, data::THREADS_ALLOWED};

use super::config_file::{RetryConfig, UserConfig};

static RETRY_THREAD: LazyLock<mpsc::Sender<(Duration, InstanceData)>> = LazyLock::new(|| {
let (tx, rx) = mpsc::channel();
let (tx_instance, rx_instance) = mpsc::channel();
thread::spawn(background_thread(rx_instance));
thread::spawn(background_timer(rx, tx_instance));
tx
});

fn background_timer(
rx: mpsc::Receiver<(Duration, InstanceData)>,
tx_instance: mpsc::Sender<InstanceData>,
) -> impl FnOnce() {
let mut jobs: BTreeMap<Instant, InstanceData> = BTreeMap::new();
move || loop {
let next_job = jobs.pop_first();
let Some((next_job_deadline, next_job_instance)) = next_job else {
// No jobs in the queue, we can just run the next
let Ok((new_job_duration, new_state)) = rx.recv() else {
return;
};

jobs.insert(Instant::now() + new_job_duration, new_state);
continue;
};

let now = Instant::now();

if now >= next_job_deadline {
tx_instance.send(next_job_instance).unwrap();
continue;
}
jobs.insert(next_job_deadline, next_job_instance);

let timeout = next_job_deadline.duration_since(now);
match rx.recv_timeout(timeout) {
Ok((run_in, new_instance)) => {
jobs.insert(now + run_in, new_instance);
continue;
}
Err(RecvTimeoutError::Timeout) => continue,
Err(RecvTimeoutError::Disconnected) => break,
}
}
}

fn background_thread(rx: mpsc::Receiver<InstanceData>) -> impl FnOnce() {
move || loop {
while let Ok(instance) = rx.recv() {
match health_ready_get(&instance.config) {
Ok(_) => instance.clear_failed(),
Err(_) => instance.bump_failed(),
}
}
}
}

// stores the global configuration of the module
#[derive(Debug, Clone)]
pub struct Device {
pub slots: Vec<Arc<Slot>>,
pub enable_set_attribute_value: bool,
}

#[derive(Debug, Clone, Default)]
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub enum InstanceState {
#[default]
Working,
Failed {
retry_count: usize,
retry_count: u8,
last_retry_at: Instant,
},
}

impl InstanceState {
pub fn new_failed() -> InstanceState {
InstanceState::Failed {
retry_count: 0,
last_retry_at: Instant::now(),
}
}
}

#[derive(Debug, Clone)]
pub struct InstanceData {
pub config: Configuration,
pub state: Arc<RwLock<InstanceState>>,
}

pub enum InstanceAttempt {
/// The instance is in the failed state and should not be used
Failed,
/// The instance is in the failed state but a connection should be attempted
Retry,
/// The instance is in the working state
Working,
}

impl InstanceData {
pub fn should_try(&self) -> InstanceAttempt {
let this = self.state.read().unwrap();
match *this {
InstanceState::Working => InstanceAttempt::Working,
InstanceState::Failed {
retry_count,
last_retry_at,
} => {
if last_retry_at.elapsed() < retry_duration_from_count(retry_count) {
InstanceAttempt::Failed
} else {
InstanceAttempt::Retry
}
}
}
}

pub fn clear_failed(&self) {
*self.state.write().unwrap() = InstanceState::Working;
}

pub fn bump_failed(&self) {
let mut write = self.state.write().unwrap();
let retry_count = match *write {
InstanceState::Working => {
*write = InstanceState::new_failed();
0
}
InstanceState::Failed {
retry_count: prev_retry_count,
last_retry_at,
} => {
// We only bump if it's a "real" retry. This is to avoid race conditions where
// the same instance stops working when multiple threads are simultaneously connecting
// to it
if last_retry_at.elapsed() >= retry_duration_from_count(prev_retry_count) {
let retry_count = prev_retry_count.saturating_add(1);
*write = InstanceState::Failed {
retry_count,
last_retry_at: Instant::now(),
};
retry_count
} else {
prev_retry_count
}
}
};
drop(write);
if THREADS_ALLOWED.load(Relaxed) {
RETRY_THREAD
.send((retry_duration_from_count(retry_count), self.clone()))
.ok();
}
}
}

fn retry_duration_from_count(retry_count: u8) -> Duration {
let secs = match retry_count {
0 | 1 => 1,
2 => 2,
3 => 5,
4 => 10,
5 => 60,
6.. => 60 * 5,
};

Duration::from_secs(secs)
}

#[derive(Debug, Clone)]
pub struct Slot {
pub label: String,
Expand Down

0 comments on commit f096839

Please sign in to comment.