Skip to content

Commit

Permalink
Avoid infinite loop checking health checks in single threaded case
Browse files Browse the repository at this point in the history
  • Loading branch information
sosthene-nitrokey committed Sep 16, 2024
1 parent 5c2a686 commit a7134db
Showing 1 changed file with 64 additions and 24 deletions.
88 changes: 64 additions & 24 deletions pkcs11/src/backend/login.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,12 @@ impl ShouldHealthCheck {
}
}

#[derive(Debug, Clone, Copy)]
enum HealthCheck {
Possible,
Avoid,
}

#[derive(Debug)]
pub struct LoginCtx {
slot: Arc<Slot>,
Expand Down Expand Up @@ -208,30 +214,37 @@ impl LoginCtx {
}
}

fn next_instance(&self) -> (&InstanceData, ShouldHealthCheck) {
fn next_instance(
&self,
accept_health_check: HealthCheck,
) -> (&InstanceData, ShouldHealthCheck) {
let threads_allowed = THREADS_ALLOWED.load(Relaxed);
let index = self.slot.instance_balancer.fetch_add(1, Relaxed);
let index = index % self.slot.instances.len();
let instance = &self.slot.instances[index];
match (instance.should_try(), threads_allowed) {
(InstanceAttempt::Failed, _) | (InstanceAttempt::Retry, true) => {}
(InstanceAttempt::Working, _) => return (instance, ShouldHealthCheck::RunDirectly),
(InstanceAttempt::Retry, false) => {
match (instance.should_try(), threads_allowed, accept_health_check) {
(InstanceAttempt::Failed, _, _)
| (InstanceAttempt::Retry, true, _)
| (InstanceAttempt::Retry, false, HealthCheck::Avoid) => {}
(InstanceAttempt::Working, _, _) => return (instance, ShouldHealthCheck::RunDirectly),
(InstanceAttempt::Retry, false, HealthCheck::Possible) => {
return (instance, ShouldHealthCheck::HealthCheckFirst)
}
}
for i in 0..self.slot.instances.len() - 1 {
let instance = &self.slot.instances[index + i];

match (instance.should_try(), threads_allowed) {
(InstanceAttempt::Failed, _) | (InstanceAttempt::Retry, true) => continue,
(InstanceAttempt::Working, _) => {
match (instance.should_try(), threads_allowed, accept_health_check) {
(InstanceAttempt::Failed, _, _)
| (InstanceAttempt::Retry, true, _)
| (InstanceAttempt::Retry, false, HealthCheck::Avoid) => continue,
(InstanceAttempt::Working, _, _) => {
// This not true round-robin in case of multithreaded acces
// This is degraded mode so best-effort is attempted at best
self.slot.instance_balancer.fetch_add(i, Relaxed);
return (instance, ShouldHealthCheck::RunDirectly);
}
(InstanceAttempt::Retry, false) => {
(InstanceAttempt::Retry, false, HealthCheck::Possible) => {
// This not true round-robin in case of multithreaded acces
// This is degraded mode so best-effort is attempted at best
self.slot.instance_balancer.fetch_add(i, Relaxed);
Expand All @@ -247,22 +260,32 @@ impl LoginCtx {
(&self.slot.instances[index], ShouldHealthCheck::RunDirectly)
}

fn operator(&self) -> Option<(InstanceData, ShouldHealthCheck)> {
let (instance, should_health_check) = self.next_instance();
fn operator(
&self,
accept_health_check: HealthCheck,
) -> Option<(InstanceData, ShouldHealthCheck)> {
let (instance, should_health_check) = self.next_instance(accept_health_check);
get_user_api_config(self.operator_config(), instance).map(|c| (c, should_health_check))
}

fn administrator(&self) -> Option<(InstanceData, ShouldHealthCheck)> {
let (instance, should_health_check) = self.next_instance();
fn administrator(
&self,
accept_health_check: HealthCheck,
) -> Option<(InstanceData, ShouldHealthCheck)> {
let (instance, should_health_check) = self.next_instance(accept_health_check);
get_user_api_config(self.admin_config(), instance).map(|c| (c, should_health_check))
}

fn operator_or_administrator(&self) -> Option<(InstanceData, ShouldHealthCheck)> {
self.operator().or_else(|| self.administrator())
fn operator_or_administrator(
&self,
accept_health_check: HealthCheck,
) -> Option<(InstanceData, ShouldHealthCheck)> {
self.operator(accept_health_check)
.or_else(|| self.administrator(accept_health_check))
}

fn guest(&self) -> (&InstanceData, ShouldHealthCheck) {
self.next_instance()
fn guest(&self, accept_health_check: HealthCheck) -> (&InstanceData, ShouldHealthCheck) {
self.next_instance(accept_health_check)
}

pub fn can_run_mode(&self, mode: UserMode) -> bool {
Expand Down Expand Up @@ -290,15 +313,18 @@ impl LoginCtx {
fn get_config_user_mode(
&self,
user_mode: &UserMode,
accept_health_check: HealthCheck,
) -> Option<(InstanceData, ShouldHealthCheck)> {
match user_mode {
UserMode::Operator => self.operator(),
UserMode::Administrator => self.administrator(),
UserMode::Operator => self.operator(accept_health_check),
UserMode::Administrator => self.administrator(accept_health_check),
UserMode::Guest => {
let (instance, should_health_check) = self.guest();
let (instance, should_health_check) = self.guest(accept_health_check);
Some((instance.clone(), should_health_check))
}
UserMode::OperatorOrAdministrator => self.operator_or_administrator(),
UserMode::OperatorOrAdministrator => {
self.operator_or_administrator(accept_health_check)
}
}
}

Expand All @@ -307,8 +333,10 @@ impl LoginCtx {
where
F: FnOnce(&Configuration) -> Result<R, apis::Error<T>> + Clone,
{
let mut health_check_count = 0;
// we loop for a maximum of instances.len() times
let Some((mut instance, mut should_health_check)) = self.get_config_user_mode(&user_mode)
let Some((mut instance, mut should_health_check)) =
self.get_config_user_mode(&user_mode, HealthCheck::Possible)
else {
return Err(Error::Login(LoginError::UserNotPresent));
};
Expand All @@ -325,6 +353,11 @@ impl LoginCtx {
let delay = Duration::from_secs(delay_seconds);

loop {
let accept_health_check = if health_check_count < 3 {
HealthCheck::Possible
} else {
HealthCheck::Avoid
};
if retry_count > retry_limit {
error!(
"Retry count exceeded after {retry_limit} attempts, instance is unreachable"
Expand All @@ -333,7 +366,14 @@ impl LoginCtx {
}

if should_health_check.should_check() && !health_check_get_timeout(&instance) {
health_check_count += 1;
// Instance is not valid, we try the next one
if let Some((new_instance, new_should_health_check)) =
self.get_config_user_mode(&user_mode, accept_health_check)
{
instance = new_instance;
should_health_check = new_should_health_check;
}
continue;
}

Expand All @@ -356,7 +396,7 @@ impl LoginCtx {
warn!("Connection attempt {retry_count} failed: Status error connecting to the instance, {:?}, retrying in {delay_seconds}s", err.status);
thread::sleep(delay);
if let Some((new_instance, new_should_health_check)) =
self.get_config_user_mode(&user_mode)
self.get_config_user_mode(&user_mode, accept_health_check)
{
instance = new_instance;
should_health_check = new_should_health_check;
Expand All @@ -375,7 +415,7 @@ impl LoginCtx {
warn!("Connection attempt {retry_count} failed: IO error connecting to the instance, {err}, retrying in {delay_seconds}s");
thread::sleep(delay);
if let Some((new_instance, new_should_health_check)) =
self.get_config_user_mode(&user_mode)
self.get_config_user_mode(&user_mode, accept_health_check)
{
instance = new_instance;
should_health_check = new_should_health_check;
Expand Down

0 comments on commit a7134db

Please sign in to comment.