Skip to content

Commit

Permalink
Do not retry indefinitely if service is gone
Browse files Browse the repository at this point in the history
If there's a call to an external service, saga execution cannot move
forward until the result of that call is known, in the sense that Nexus
received a result. If there are transient problems, Nexus must retry
until a known result is returned.

This is problematic when the destination service is gone - Nexus will
retry indefinitely, halting the saga execution. Worse, in the case of
sagas calling the volume delete subsaga, subsequent calls will also
halt.

With the introduction of a physical disk policy, Nexus can know when to
stop retrying a call - the destination service is gone, so the known
result is an error.

This commit adds a `ProgenitorOperationRetry` object that takes an
operation to retry plus a "gone" check, and checks each retry iteration
if the destination is gone. If it is, then bail out, otherwise assume
that any errors seen are transient.

Further work is required to deprecate the `retry_until_known_result`
function, as retrying indefinitely is a bad pattern.

Fixes oxidecomputer#4331
Fixes oxidecomputer#5022
  • Loading branch information
jmpesp committed May 18, 2024
1 parent 116590c commit fad5da9
Show file tree
Hide file tree
Showing 14 changed files with 1,507 additions and 861 deletions.
17 changes: 15 additions & 2 deletions common/src/api/external/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,11 @@ pub enum Error {
/// ObjectNotFound instead.
#[error("Not found: {}", .message.display_internal())]
NotFound { message: MessagePair },

/// Access to the target resource is no longer available, and this condition
/// is likely to be permanent.
#[error("Gone")]
Gone,
}

/// Represents an error message which has an external component, along with
Expand Down Expand Up @@ -214,7 +219,8 @@ impl Error {
| Error::InternalError { .. }
| Error::TypeVersionMismatch { .. }
| Error::NotFound { .. }
| Error::Conflict { .. } => false,
| Error::Conflict { .. }
| Error::Gone => false,
}
}

Expand Down Expand Up @@ -335,7 +341,8 @@ impl Error {
match self {
Error::ObjectNotFound { .. }
| Error::ObjectAlreadyExists { .. }
| Error::Forbidden => self,
| Error::Forbidden
| Error::Gone => self,
Error::InvalidRequest { message } => Error::InvalidRequest {
message: message.with_internal_context(context),
},
Expand Down Expand Up @@ -513,6 +520,12 @@ impl From<Error> for HttpError {
internal_message,
}
}

Error::Gone => HttpError::for_client_error(
Some(String::from("Gone")),
http::StatusCode::GONE,
String::from("Gone"),
),
}
}
}
Expand Down
86 changes: 22 additions & 64 deletions common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ pub mod backoff;
pub mod cmd;
pub mod disk;
pub mod ledger;
pub mod progenitor_operation_retry;
pub mod update;
pub mod vlan;
pub mod zpool_name;
Expand Down Expand Up @@ -79,83 +80,40 @@ impl slog::KV for FileKv {

pub const OMICRON_DPD_TAG: &str = "omicron";

use futures::Future;
use slog::warn;
use crate::api::external::Error;
use crate::progenitor_operation_retry::ProgenitorOperationRetry;
use crate::progenitor_operation_retry::ProgenitorOperationRetryError;
use std::future::Future;

/// Retry a progenitor client operation until a known result is returned.
///
/// Saga execution relies on the outcome of an external call being known: since
/// they are idempotent, reissue the external call until a known result comes
/// back. Retry if a communication error is seen, or if another retryable error
/// is seen.
///
/// Note that retrying is only valid if the call itself is idempotent.
/// See [`ProgenitorOperationRetry`] for more information.
// TODO mark this deprecated, `never_bail` is a bad idea
pub async fn retry_until_known_result<F, T, E, Fut>(
log: &slog::Logger,
mut f: F,
f: F,
) -> Result<T, progenitor_client::Error<E>>
where
F: FnMut() -> Fut,
Fut: Future<Output = Result<T, progenitor_client::Error<E>>>,
E: std::fmt::Debug,
{
backoff::retry_notify(
backoff::retry_policy_internal_service(),
move || {
let fut = f();
async move {
match fut.await {
Err(progenitor_client::Error::CommunicationError(e)) => {
warn!(
log,
"saw transient communication error {}, retrying...",
e,
);

Err(backoff::BackoffError::transient(
progenitor_client::Error::CommunicationError(e),
))
}

Err(progenitor_client::Error::ErrorResponse(
response_value,
)) => {
match response_value.status() {
// Retry on 503 or 429
http::StatusCode::SERVICE_UNAVAILABLE
| http::StatusCode::TOO_MANY_REQUESTS => {
Err(backoff::BackoffError::transient(
progenitor_client::Error::ErrorResponse(
response_value,
),
))
}

// Anything else is a permanent error
_ => Err(backoff::BackoffError::Permanent(
progenitor_client::Error::ErrorResponse(
response_value,
),
)),
}
}

Err(e) => {
warn!(log, "saw permanent error {}, aborting", e,);
match ProgenitorOperationRetry::new(f, never_bail).run(log).await {
Ok(v) => Ok(v),

Err(backoff::BackoffError::Permanent(e))
}
Err(e) => match e {
ProgenitorOperationRetryError::ProgenitorError(e) => Err(e),

Ok(v) => Ok(v),
}
ProgenitorOperationRetryError::Gone
| ProgenitorOperationRetryError::GoneCheckError(_) => {
// ProgenitorOperationRetry::new called with `never_bail` as the
// bail check should never return these variants!
unreachable!();
}
},
|error: progenitor_client::Error<_>, delay| {
warn!(
log,
"failed external call ({:?}), will retry in {:?}", error, delay,
);
},
)
.await
}
}

async fn never_bail() -> Result<bool, Error> {
Ok(false)
}
159 changes: 159 additions & 0 deletions common/src/progenitor_operation_retry.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at https://mozilla.org/MPL/2.0/.

use futures::Future;
use slog::warn;
use slog::Logger;

use crate::api::external::Error;
use crate::backoff::retry_notify;
use crate::backoff::retry_policy_internal_service;
use crate::backoff::BackoffError;

#[derive(Debug)]
pub enum ProgenitorOperationRetryError<E> {
/// Nexus determined that the operation will never return a known result
/// because the remote server is gone.
Gone,

/// Attempting to check if the retry loop should be stopped failed
GoneCheckError(Error),

/// The retry loop progenitor operation saw a permanent client error
ProgenitorError(progenitor_client::Error<E>),
}

/// Retry a progenitor client operation until a known result is returned, or
/// until something tells us that we should stop trying.
///
/// Saga execution relies on the outcome of an external call being known: since
/// they are idempotent, reissue the external call until a known result comes
/// back. Retry if a communication error is seen, or if another retryable error
/// is seen.
///
/// During the retry loop, call the supplied `gone_check` function to see if the
/// retry loop should be aborted: in the cases where Nexus can _know_ that a
/// request will never complete, the retry loop must be aborted. Otherwise,
/// Nexus will indefinitely retry until some known result is returned.
///
/// Note that retrying is only valid if the `operation` itself is idempotent.
pub struct ProgenitorOperationRetry<
T,
E: std::fmt::Debug,
F: FnMut() -> Fut,
Fut: Future<Output = Result<T, progenitor_client::Error<E>>>,
BF: FnMut() -> BFut,
BFut: Future<Output = Result<bool, Error>>,
> {
operation: F,

/// If Nexus knows that the supplied operation will never successfully
/// complete, then `gone_check` should return true.
gone_check: BF,
}

impl<
T,
E: std::fmt::Debug,
F: FnMut() -> Fut,
Fut: Future<Output = Result<T, progenitor_client::Error<E>>>,
BF: FnMut() -> BFut,
BFut: Future<Output = Result<bool, Error>>,
> ProgenitorOperationRetry<T, E, F, Fut, BF, BFut>
{
pub fn new(operation: F, gone_check: BF) -> Self {
Self { operation, gone_check }
}

pub async fn run(
mut self,
log: &Logger,
) -> Result<T, ProgenitorOperationRetryError<E>> {
retry_notify(
retry_policy_internal_service(),
move || {
let gone_check = (self.gone_check)();
let f = (self.operation)();

async move {
match gone_check.await {
Ok(dest_is_gone) => {
if dest_is_gone {
return Err(BackoffError::Permanent(
ProgenitorOperationRetryError::Gone
));
}
}

Err(e) => {
return Err(BackoffError::Permanent(
ProgenitorOperationRetryError::GoneCheckError(e)
));
}
}

match f.await {
Err(progenitor_client::Error::CommunicationError(e)) => {
warn!(
log,
"saw transient communication error {}, retrying...",
e,
);

Err(BackoffError::transient(
ProgenitorOperationRetryError::ProgenitorError(
progenitor_client::Error::CommunicationError(e)
)
))
}

Err(progenitor_client::Error::ErrorResponse(
response_value,
)) => {
match response_value.status() {
// Retry on 503 or 429
http::StatusCode::SERVICE_UNAVAILABLE
| http::StatusCode::TOO_MANY_REQUESTS => {
Err(BackoffError::transient(
ProgenitorOperationRetryError::ProgenitorError(
progenitor_client::Error::ErrorResponse(
response_value
)
)
))
}

// Anything else is a permanent error
_ => Err(BackoffError::Permanent(
ProgenitorOperationRetryError::ProgenitorError(
progenitor_client::Error::ErrorResponse(
response_value
)
)
))
}
}

Err(e) => {
warn!(log, "saw permanent error {}, aborting", e,);

Err(BackoffError::Permanent(
ProgenitorOperationRetryError::ProgenitorError(e)
))
}

Ok(v) => Ok(v),
}
}
},
|error: ProgenitorOperationRetryError<E>, delay| {
warn!(
log,
"failed external call ({:?}), will retry in {:?}", error, delay,
);
},
)
.await
}
}
52 changes: 52 additions & 0 deletions nexus/db-queries/src/db/datastore/dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ use crate::db::error::public_error_from_diesel;
use crate::db::error::ErrorHandler;
use crate::db::identity::Asset;
use crate::db::model::Dataset;
use crate::db::model::PhysicalDisk;
use crate::db::model::PhysicalDiskPolicy;
use crate::db::model::Zpool;
use crate::db::pagination::paginated;
use crate::db::pagination::Paginator;
Expand Down Expand Up @@ -180,6 +182,56 @@ impl DataStore {

Ok(all_datasets)
}

pub async fn dataset_on_in_service_physical_disk(
&self,
// opctx: &OpContext,
dataset_id: Uuid,
) -> LookupResult<bool> {
//let conn = self.pool_connection_authorized(opctx).await?;
let conn = self.pool_connection_unauthorized().await?;

let dataset = {
use db::schema::dataset::dsl;

dsl::dataset
.filter(dsl::id.eq(dataset_id))
.select(Dataset::as_select())
.first_async::<Dataset>(&*conn)
.await
.map_err(|e| {
public_error_from_diesel(e, ErrorHandler::Server)
})?
};

let zpool = {
use db::schema::zpool::dsl;

dsl::zpool
.filter(dsl::id.eq(dataset.pool_id))
.select(Zpool::as_select())
.first_async::<Zpool>(&*conn)
.await
.map_err(|e| {
public_error_from_diesel(e, ErrorHandler::Server)
})?
};

let physical_disk = {
use db::schema::physical_disk::dsl;

dsl::physical_disk
.filter(dsl::id.eq(zpool.physical_disk_id))
.select(PhysicalDisk::as_select())
.first_async::<PhysicalDisk>(&*conn)
.await
.map_err(|e| {
public_error_from_diesel(e, ErrorHandler::Server)
})?
};

Ok(physical_disk.disk_policy == PhysicalDiskPolicy::InService)
}
}

#[cfg(test)]
Expand Down
Loading

0 comments on commit fad5da9

Please sign in to comment.