From fad5da9c2ed76e58b2b79e33be05902fd1abb6f7 Mon Sep 17 00:00:00 2001
From: James MacMahon
Date: Sat, 11 May 2024 02:13:19 +0000
Subject: [PATCH] Do not retry indefinitely if service is gone

When a saga calls an external service, execution cannot move forward until the
result of that call is known, meaning Nexus actually received a response. If
there are transient problems, Nexus must retry until a known result is
returned.

This is problematic when the destination service is gone: Nexus will retry
indefinitely, halting saga execution. Worse, in the case of sagas calling the
volume delete subsaga, subsequent calls will also halt.

With the introduction of a physical disk policy, Nexus can know when to stop
retrying a call: the destination service is gone, so the known result is an
error.

This commit adds a `ProgenitorOperationRetry` object that takes an operation
to retry plus a "gone" check, and on each retry iteration checks whether the
destination is gone. If it is, bail out; otherwise, assume that any errors
seen are transient.

Further work is required to deprecate the `retry_until_known_result` function,
as retrying indefinitely is a bad pattern.

Fixes #4331
Fixes #5022
---
 common/src/api/external/error.rs             |   17 +-
 common/src/lib.rs                            |   86 +-
 common/src/progenitor_operation_retry.rs     |  159 +++
 nexus/db-queries/src/db/datastore/dataset.rs |   52 +
 nexus/src/app/crucible.rs                    | 1086 ++++++++++++++++++
 nexus/src/app/mod.rs                         |    1 +
 nexus/src/app/sagas/common_storage.rs        |  725 +----------
 nexus/src/app/sagas/disk_create.rs           |   37 +-
 nexus/src/app/sagas/snapshot_create.rs       |   71 +-
 nexus/src/app/sagas/volume_delete.rs         |   36 +-
 nexus/src/app/session.rs                     |    5 +-
 nexus/tests/integration_tests/disks.rs       |   77 ++
 sled-agent/src/sim/sled_agent.rs             |    4 +
 sled-agent/src/sim/storage.rs                |   12 +
 14 files changed, 1507 insertions(+), 861 deletions(-)
 create mode 100644 common/src/progenitor_operation_retry.rs
 create mode 100644 nexus/src/app/crucible.rs
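A minimal sketch of the pattern this introduces (the names `client`, `region_id`, `dataset_id`, and `is_gone` are illustrative stand-ins, not code from this patch):

    // Pair an idempotent operation with a "gone" check: each retry iteration
    // first asks whether the destination can ever answer, and only then
    // reissues the call.
    let result = ProgenitorOperationRetry::new(
        || async { client.region_get(&region_id).await },
        // Resolving to Ok(true) stops the retry loop with a "gone" error.
        || async { is_gone(dataset_id).await },
    )
    .run(log)
    .await;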
diff --git a/common/src/api/external/error.rs b/common/src/api/external/error.rs
index 6b3b93187f..10731c61c3 100644
--- a/common/src/api/external/error.rs
+++ b/common/src/api/external/error.rs
@@ -85,6 +85,11 @@ pub enum Error {
     /// ObjectNotFound instead.
     #[error("Not found: {}", .message.display_internal())]
     NotFound { message: MessagePair },
+
+    /// Access to the target resource is no longer available, and this condition
+    /// is likely to be permanent.
+    #[error("Gone")]
+    Gone,
 }
 
 /// Represents an error message which has an external component, along with
@@ -214,7 +219,8 @@ impl Error {
             | Error::InternalError { .. }
             | Error::TypeVersionMismatch { .. }
             | Error::NotFound { .. }
-            | Error::Conflict { .. } => false,
+            | Error::Conflict { .. }
+            | Error::Gone => false,
         }
     }
 
@@ -335,7 +341,8 @@ impl Error {
         match self {
             Error::ObjectNotFound { .. }
             | Error::ObjectAlreadyExists { .. }
-            | Error::Forbidden => self,
+            | Error::Forbidden
+            | Error::Gone => self,
             Error::InvalidRequest { message } => Error::InvalidRequest {
                 message: message.with_internal_context(context),
             },
@@ -513,6 +520,12 @@ impl From<Error> for HttpError {
                     internal_message,
                 }
             }
+
+            Error::Gone => HttpError::for_client_error(
+                Some(String::from("Gone")),
+                http::StatusCode::GONE,
+                String::from("Gone"),
+            ),
         }
     }
 }
diff --git a/common/src/lib.rs b/common/src/lib.rs
index a92237adfa..e4f53cbfab 100644
--- a/common/src/lib.rs
+++ b/common/src/lib.rs
@@ -26,6 +26,7 @@ pub mod backoff;
 pub mod cmd;
 pub mod disk;
 pub mod ledger;
+pub mod progenitor_operation_retry;
 pub mod update;
 pub mod vlan;
 pub mod zpool_name;
@@ -79,83 +80,40 @@ impl slog::KV for FileKv {
 
 pub const OMICRON_DPD_TAG: &str = "omicron";
 
-use futures::Future;
-use slog::warn;
+use crate::api::external::Error;
+use crate::progenitor_operation_retry::ProgenitorOperationRetry;
+use crate::progenitor_operation_retry::ProgenitorOperationRetryError;
+use std::future::Future;
 
 /// Retry a progenitor client operation until a known result is returned.
 ///
-/// Saga execution relies on the outcome of an external call being known: since
-/// they are idempotent, reissue the external call until a known result comes
-/// back. Retry if a communication error is seen, or if another retryable error
-/// is seen.
-///
-/// Note that retrying is only valid if the call itself is idempotent.
+/// See [`ProgenitorOperationRetry`] for more information.
+// TODO mark this deprecated, `never_bail` is a bad idea
 pub async fn retry_until_known_result<F, Fut, T, E>(
     log: &slog::Logger,
-    mut f: F,
+    f: F,
 ) -> Result<T, progenitor_client::Error<E>>
 where
     F: FnMut() -> Fut,
     Fut: Future<Output = Result<T, progenitor_client::Error<E>>>,
     E: std::fmt::Debug,
 {
-    backoff::retry_notify(
-        backoff::retry_policy_internal_service(),
-        move || {
-            let fut = f();
-            async move {
-                match fut.await {
-                    Err(progenitor_client::Error::CommunicationError(e)) => {
-                        warn!(
-                            log,
-                            "saw transient communication error {}, retrying...",
-                            e,
-                        );
-
-                        Err(backoff::BackoffError::transient(
-                            progenitor_client::Error::CommunicationError(e),
-                        ))
-                    }
-
-                    Err(progenitor_client::Error::ErrorResponse(
-                        response_value,
-                    )) => {
-                        match response_value.status() {
-                            // Retry on 503 or 429
-                            http::StatusCode::SERVICE_UNAVAILABLE
-                            | http::StatusCode::TOO_MANY_REQUESTS => {
-                                Err(backoff::BackoffError::transient(
-                                    progenitor_client::Error::ErrorResponse(
-                                        response_value,
-                                    ),
-                                ))
-                            }
-
-                            // Anything else is a permanent error
-                            _ => Err(backoff::BackoffError::Permanent(
-                                progenitor_client::Error::ErrorResponse(
-                                    response_value,
-                                ),
-                            )),
-                        }
-                    }
-
-                    Err(e) => {
-                        warn!(log, "saw permanent error {}, aborting", e,);
+    match ProgenitorOperationRetry::new(f, never_bail).run(log).await {
+        Ok(v) => Ok(v),
 
-                        Err(backoff::BackoffError::Permanent(e))
+        Err(e) => match e {
+            ProgenitorOperationRetryError::ProgenitorError(e) => Err(e),
 
-                    Ok(v) => Ok(v),
-                }
+            ProgenitorOperationRetryError::Gone
+            | ProgenitorOperationRetryError::GoneCheckError(_) => {
+                // ProgenitorOperationRetry::new called with `never_bail` as the
+                // bail check should never return these variants!
+                unreachable!();
             }
         },
-        |error: progenitor_client::Error<_>, delay| {
-            warn!(
-                log,
-                "failed external call ({:?}), will retry in {:?}", error, delay,
-            );
-        },
-    )
-    .await
+    }
+}
+
+async fn never_bail() -> Result<bool, Error> {
+    Ok(false)
+}
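Migrating a call site off the deprecated helper is mechanical; a sketch, assuming some idempotent `op` closure and a real `gone_check` (both stand-ins):

    // Before: retries forever on transient errors.
    let result = retry_until_known_result(log, op).await;

    // After: the same retry loop, but it can stop once the destination is
    // known to be gone.
    let result = ProgenitorOperationRetry::new(op, gone_check).run(log).await;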
diff --git a/common/src/progenitor_operation_retry.rs b/common/src/progenitor_operation_retry.rs
new file mode 100644
index 0000000000..8d9537a30c
--- /dev/null
+++ b/common/src/progenitor_operation_retry.rs
@@ -0,0 +1,159 @@
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this
+// file, You can obtain one at https://mozilla.org/MPL/2.0/.
+
+use futures::Future;
+use slog::warn;
+use slog::Logger;
+
+use crate::api::external::Error;
+use crate::backoff::retry_notify;
+use crate::backoff::retry_policy_internal_service;
+use crate::backoff::BackoffError;
+
+#[derive(Debug)]
+pub enum ProgenitorOperationRetryError<E> {
+    /// Nexus determined that the operation will never return a known result
+    /// because the remote server is gone.
+    Gone,
+
+    /// Attempting to check if the retry loop should be stopped failed
+    GoneCheckError(Error),
+
+    /// The retry loop progenitor operation saw a permanent client error
+    ProgenitorError(progenitor_client::Error<E>),
+}
+
+/// Retry a progenitor client operation until a known result is returned, or
+/// until something tells us that we should stop trying.
+///
+/// Saga execution relies on the outcome of an external call being known: since
+/// they are idempotent, reissue the external call until a known result comes
+/// back. Retry if a communication error is seen, or if another retryable error
+/// is seen.
+///
+/// During the retry loop, call the supplied `gone_check` function to see if the
+/// retry loop should be aborted: in the cases where Nexus can _know_ that a
+/// request will never complete, the retry loop must be aborted. Otherwise,
+/// Nexus will indefinitely retry until some known result is returned.
+///
+/// Note that retrying is only valid if the `operation` itself is idempotent.
+pub struct ProgenitorOperationRetry<
+    T,
+    E: std::fmt::Debug,
+    F: FnMut() -> Fut,
+    Fut: Future<Output = Result<T, progenitor_client::Error<E>>>,
+    BF: FnMut() -> BFut,
+    BFut: Future<Output = Result<bool, Error>>,
+> {
+    operation: F,
+
+    /// If Nexus knows that the supplied operation will never successfully
+    /// complete, then `gone_check` should return `Ok(true)`.
+    gone_check: BF,
+}
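Callers construct the type with the two closures and then branch on the error variants; roughly (a sketch, with `operation` and `gone_check` standing in for the real closures added later in this patch):

    match ProgenitorOperationRetry::new(operation, gone_check).run(log).await {
        // A known result came back: the saga can move forward.
        Ok(response) => { /* ... */ }

        // The destination was expunged: stop retrying.
        Err(ProgenitorOperationRetryError::Gone) => { /* ... */ }

        // The gone check itself failed (e.g. a datastore error).
        Err(ProgenitorOperationRetryError::GoneCheckError(e)) => { /* ... */ }

        // The operation saw a permanent (non-retryable) client error.
        Err(ProgenitorOperationRetryError::ProgenitorError(e)) => { /* ... */ }
    }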
+
+impl<
+        T,
+        E: std::fmt::Debug,
+        F: FnMut() -> Fut,
+        Fut: Future<Output = Result<T, progenitor_client::Error<E>>>,
+        BF: FnMut() -> BFut,
+        BFut: Future<Output = Result<bool, Error>>,
+    > ProgenitorOperationRetry<T, E, F, Fut, BF, BFut>
+{
+    pub fn new(operation: F, gone_check: BF) -> Self {
+        Self { operation, gone_check }
+    }
+
+    pub async fn run(
+        mut self,
+        log: &Logger,
+    ) -> Result<T, ProgenitorOperationRetryError<E>> {
+        retry_notify(
+            retry_policy_internal_service(),
+            move || {
+                let gone_check = (self.gone_check)();
+                let f = (self.operation)();
+
+                async move {
+                    match gone_check.await {
+                        Ok(dest_is_gone) => {
+                            if dest_is_gone {
+                                return Err(BackoffError::Permanent(
+                                    ProgenitorOperationRetryError::Gone
+                                ));
+                            }
+                        }
+
+                        Err(e) => {
+                            return Err(BackoffError::Permanent(
+                                ProgenitorOperationRetryError::GoneCheckError(e)
+                            ));
+                        }
+                    }
+
+                    match f.await {
+                        Err(progenitor_client::Error::CommunicationError(e)) => {
+                            warn!(
+                                log,
+                                "saw transient communication error {}, retrying...",
+                                e,
+                            );
+
+                            Err(BackoffError::transient(
+                                ProgenitorOperationRetryError::ProgenitorError(
+                                    progenitor_client::Error::CommunicationError(e)
+                                )
+                            ))
+                        }
+
+                        Err(progenitor_client::Error::ErrorResponse(
+                            response_value,
+                        )) => {
+                            match response_value.status() {
+                                // Retry on 503 or 429
+                                http::StatusCode::SERVICE_UNAVAILABLE
+                                | http::StatusCode::TOO_MANY_REQUESTS => {
+                                    Err(BackoffError::transient(
+                                        ProgenitorOperationRetryError::ProgenitorError(
+                                            progenitor_client::Error::ErrorResponse(
+                                                response_value
+                                            )
+                                        )
+                                    ))
+                                }
+
+                                // Anything else is a permanent error
+                                _ => Err(BackoffError::Permanent(
+                                    ProgenitorOperationRetryError::ProgenitorError(
+                                        progenitor_client::Error::ErrorResponse(
+                                            response_value
+                                        )
+                                    )
+                                ))
+                            }
+                        }
+
+                        Err(e) => {
+                            warn!(log, "saw permanent error {}, aborting", e,);
+
+                            Err(BackoffError::Permanent(
+                                ProgenitorOperationRetryError::ProgenitorError(e)
+                            ))
+                        }
+
+                        Ok(v) => Ok(v),
+                    }
+                }
+            },
+            |error: ProgenitorOperationRetryError<E>, delay| {
+                warn!(
+                    log,
+                    "failed external call ({:?}), will retry in {:?}", error, delay,
+                );
+            },
+        )
+        .await
+    }
+}
diff --git a/nexus/db-queries/src/db/datastore/dataset.rs b/nexus/db-queries/src/db/datastore/dataset.rs
index 3617f6d7fc..4058675c68 100644
--- a/nexus/db-queries/src/db/datastore/dataset.rs
+++ b/nexus/db-queries/src/db/datastore/dataset.rs
@@ -15,6 +15,8 @@ use crate::db::error::public_error_from_diesel;
 use crate::db::error::ErrorHandler;
 use crate::db::identity::Asset;
 use crate::db::model::Dataset;
+use crate::db::model::PhysicalDisk;
+use crate::db::model::PhysicalDiskPolicy;
 use crate::db::model::Zpool;
 use crate::db::pagination::paginated;
 use crate::db::pagination::Paginator;
@@ -180,6 +182,56 @@ impl DataStore {
 
         Ok(all_datasets)
     }
+
+    pub async fn dataset_on_in_service_physical_disk(
+        &self,
+        // opctx: &OpContext,
+        dataset_id: Uuid,
+    ) -> LookupResult<bool> {
+        //let conn = self.pool_connection_authorized(opctx).await?;
+        let conn = self.pool_connection_unauthorized().await?;
+
+        let dataset = {
+            use db::schema::dataset::dsl;
+
+            dsl::dataset
+                .filter(dsl::id.eq(dataset_id))
+                .select(Dataset::as_select())
+                .first_async::<Dataset>(&*conn)
+                .await
+                .map_err(|e| {
+                    public_error_from_diesel(e, ErrorHandler::Server)
+                })?
+        };
+
+        let zpool = {
+            use db::schema::zpool::dsl;
+
+            dsl::zpool
+                .filter(dsl::id.eq(dataset.pool_id))
+                .select(Zpool::as_select())
+                .first_async::<Zpool>(&*conn)
+                .await
+                .map_err(|e| {
+                    public_error_from_diesel(e, ErrorHandler::Server)
+                })?
+        };
+
+        let physical_disk = {
+            use db::schema::physical_disk::dsl;
+
+            dsl::physical_disk
+                .filter(dsl::id.eq(zpool.physical_disk_id))
+                .select(PhysicalDisk::as_select())
+                .first_async::<PhysicalDisk>(&*conn)
+                .await
+                .map_err(|e| {
+                    public_error_from_diesel(e, ErrorHandler::Server)
+                })?
+        };
+
+        Ok(physical_disk.disk_policy == PhysicalDiskPolicy::InService)
+    }
 }
 
 #[cfg(test)]
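Note the polarity: the datastore function answers "is the backing physical disk still in service?", while retry callers need "is the agent gone?". The negation happens at the call site, mirroring `crucible_agent_gone_check` in the new crucible.rs below (sketch only):

    // Gone means: the dataset's backing physical disk is no longer in service.
    let on_in_service_physical_disk = datastore
        .dataset_on_in_service_physical_disk(dataset_id)
        .await?;
    let gone = !on_in_service_physical_disk;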
+ }; + + let physical_disk = { + use db::schema::physical_disk::dsl; + + dsl::physical_disk + .filter(dsl::id.eq(zpool.physical_disk_id)) + .select(PhysicalDisk::as_select()) + .first_async::(&*conn) + .await + .map_err(|e| { + public_error_from_diesel(e, ErrorHandler::Server) + })? + }; + + Ok(physical_disk.disk_policy == PhysicalDiskPolicy::InService) + } } #[cfg(test)] diff --git a/nexus/src/app/crucible.rs b/nexus/src/app/crucible.rs new file mode 100644 index 0000000000..5a1dead368 --- /dev/null +++ b/nexus/src/app/crucible.rs @@ -0,0 +1,1086 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +//! Functions common to interacting with Crucible agents + +use super::*; + +use anyhow::anyhow; +use crucible_agent_client::types::CreateRegion; +use crucible_agent_client::types::GetSnapshotResponse; +use crucible_agent_client::types::Region; +use crucible_agent_client::types::RegionId; +use crucible_agent_client::types::State as RegionState; +use crucible_agent_client::Client as CrucibleAgentClient; +use futures::StreamExt; +use nexus_db_queries::db; +use nexus_db_queries::db::identity::Asset; +use omicron_common::api::external::Error; +use omicron_common::backoff::{self, BackoffError}; +use omicron_common::progenitor_operation_retry::ProgenitorOperationRetry; +use omicron_common::progenitor_operation_retry::ProgenitorOperationRetryError; +use slog::Logger; + +// Arbitrary limit on concurrency, for operations issued on multiple regions +// within a disk at the same time. +const MAX_CONCURRENT_REGION_REQUESTS: usize = 3; + +/// Provides a way for (with BackoffError) Permanent errors to have a different error type than +/// Transient errors. +#[derive(Debug, thiserror::Error)] +enum WaitError { + #[error("Transient error: {0}")] + Transient(#[from] anyhow::Error), + + #[error("Permanent error: {0}")] + Permanent(#[from] Error), +} + +impl super::Nexus { + fn crucible_agent_client_for_dataset( + &self, + dataset: &db::model::Dataset, + ) -> CrucibleAgentClient { + CrucibleAgentClient::new_with_client( + &format!("http://{}", dataset.address()), + self.reqwest_client.clone(), + ) + } + + /// Return if the Crucible agent is expected to be there and answer Nexus: + /// true means it's gone, and the caller should bail out of the + /// ProgenitorOperationRetry loop. + async fn crucible_agent_gone_check( + &self, + dataset_id: Uuid, + ) -> Result { + let on_in_service_physical_disk = self + .datastore() + .dataset_on_in_service_physical_disk(dataset_id) + .await?; + + Ok(!on_in_service_physical_disk) + } + + /// Call out to Crucible agent and perform region creation. + async fn ensure_region_in_dataset( + &self, + log: &Logger, + dataset: &db::model::Dataset, + region: &db::model::Region, + ) -> Result { + let client = self.crucible_agent_client_for_dataset(dataset); + let dataset_id = dataset.id(); + + let region_request = CreateRegion { + block_size: region.block_size().to_bytes(), + extent_count: region.extent_count(), + extent_size: region.blocks_per_extent(), + // TODO: Can we avoid casting from UUID to string? + // NOTE: This'll require updating the crucible agent client. 
+            id: RegionId(region.id().to_string()),
+            encrypted: region.encrypted(),
+            cert_pem: None,
+            key_pem: None,
+            root_pem: None,
+            source: None,
+        };
+
+        let create_region = || async {
+            let region = match ProgenitorOperationRetry::new(
+                || async { client.region_create(&region_request).await },
+                || async { self.crucible_agent_gone_check(dataset_id).await },
+            )
+            .run(log)
+            .await
+            {
+                Ok(v) => Ok(v),
+
+                Err(e) => {
+                    error!(
+                        log,
+                        "region_create saw {:?}",
+                        e;
+                        "region_id" => %region.id(),
+                        "dataset_id" => %dataset_id,
+                    );
+
+                    match e {
+                        ProgenitorOperationRetryError::Gone => {
+                            // Return an error if Nexus is unable to create the
+                            // requested region
+                            Err(BackoffError::Permanent(WaitError::Permanent(Error::Gone)))
+                        }
+
+                        ProgenitorOperationRetryError::GoneCheckError(_) => {
+                            Err(BackoffError::Permanent(WaitError::Permanent(
+                                Error::internal_error("insufficient permission for crucible_agent_gone_check")
+                            )))
+                        }
+
+                        ProgenitorOperationRetryError::ProgenitorError(e) => match e {
+                            crucible_agent_client::Error::ErrorResponse(rv) => {
+                                match rv.status() {
+                                    status if status.is_client_error() => {
+                                        Err(BackoffError::Permanent(WaitError::Permanent(
+                                            Error::invalid_request(&rv.message)
+                                        )))
+                                    }
+
+                                    _ => {
+                                        Err(BackoffError::Permanent(WaitError::Permanent(
+                                            Error::internal_error(&rv.message)
+                                        )))
+                                    }
+                                }
+                            }
+
+                            _ => {
+                                Err(BackoffError::Permanent(WaitError::Permanent(
+                                    Error::internal_error("unexpected failure during `region_create`")
+                                )))
+                            }
+                        }
+                    }
+                }
+            }?;
+
+            match region.state {
+                RegionState::Requested => {
+                    Err(BackoffError::transient(WaitError::Transient(anyhow!(
+                        "Region creation in progress"
+                    ))))
+                }
+
+                RegionState::Created => Ok(region),
+
+                _ => Err(BackoffError::Permanent(WaitError::Permanent(
+                    Error::internal_error(&format!(
+                        "Failed to create region, unexpected state: {:?}",
+                        region.state
+                    )),
+                ))),
+            }
+        };
+
+        let log_create_failure = |_, delay| {
+            warn!(
+                log,
+                "Region requested, not yet created. Retrying in {:?}",
+                delay;
+                "dataset" => %dataset.id(),
+                "region" => %region.id(),
+            );
+        };
+
+        let region = backoff::retry_notify(
+            backoff::retry_policy_internal_service(),
+            create_region,
+            log_create_failure,
+        )
+        .await
+        .map_err(|e| match e {
+            WaitError::Transient(e) => {
+                // The backoff crate can be configured with a maximum elapsed
+                // time before giving up, which means that Transient could be
+                // returned here. Our current policies do **not** set this
+                // though.
+                Error::internal_error(&e.to_string())
+            }
+
+            WaitError::Permanent(e) => e,
+        })?;
+
+        Ok(region.into_inner())
+    }
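The create path above is one instance of a wait-loop idiom this file repeats: map "not there yet" onto WaitError::Transient so backoff keeps polling, and everything terminal onto WaitError::Permanent so the loop stops. In isolation (a sketch; `PollState` and `poll_state` are invented for illustration):

    // enum PollState { InProgress, Done, Failed(Error) }  // stand-in type
    let result = backoff::retry_notify(
        backoff::retry_policy_internal_service(),
        || async {
            match poll_state().await {
                // Still converging: Transient, so backoff polls again.
                PollState::InProgress => Err(BackoffError::transient(
                    WaitError::Transient(anyhow!("not ready yet")),
                )),
                // Terminal success: stop polling.
                PollState::Done => Ok(()),
                // Terminal failure: Permanent, so the loop stops immediately.
                PollState::Failed(e) => {
                    Err(BackoffError::Permanent(WaitError::Permanent(e)))
                }
            }
        },
        |error: WaitError, delay| {
            warn!(log, "{:?}, retrying in {:?}", error, delay);
        },
    )
    .await;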
+
+    /// Returns `Ok(Some(Region))` if a region with id `region_id` exists,
+    /// `Ok(None)` if it does not (a 404 was seen), and `Err` otherwise.
+    async fn maybe_get_crucible_region(
+        &self,
+        log: &Logger,
+        dataset: &db::model::Dataset,
+        region_id: Uuid,
+    ) -> Result<Option<Region>, Error> {
+        let client = self.crucible_agent_client_for_dataset(dataset);
+        let dataset_id = dataset.id();
+
+        let result = ProgenitorOperationRetry::new(
+            || async {
+                client.region_get(&RegionId(region_id.to_string())).await
+            },
+            || async { self.crucible_agent_gone_check(dataset_id).await },
+        )
+        .run(log)
+        .await;
+
+        match result {
+            Ok(v) => Ok(Some(v.into_inner())),
+
+            Err(e) => {
+                error!(
+                    log,
+                    "region_get saw {:?}",
+                    e;
+                    "region_id" => %region_id,
+                    "dataset_id" => %dataset_id,
+                );
+
+                match e {
+                    ProgenitorOperationRetryError::Gone => {
+                        // Return an error if Nexus is unable to query the
+                        // dataset's agent for the requested region
+                        Err(Error::Gone)
+                    }
+
+                    ProgenitorOperationRetryError::GoneCheckError(_) => {
+                        Err(Error::internal_error(
+                            "insufficient permission for crucible_agent_gone_check"
+                        ))
+                    }
+
+                    ProgenitorOperationRetryError::ProgenitorError(e) => match e {
+                        crucible_agent_client::Error::ErrorResponse(rv) => {
+                            match rv.status() {
+                                http::StatusCode::NOT_FOUND => {
+                                    Ok(None)
+                                }
+
+                                status if status.is_client_error() => {
+                                    Err(Error::invalid_request(&rv.message))
+                                }
+
+                                _ => {
+                                    Err(Error::internal_error(&rv.message))
+                                }
+                            }
+                        }
+
+                        _ => {
+                            Err(Error::internal_error(
+                                "unexpected failure during `region_get`",
+                            ))
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    async fn get_crucible_region_snapshots(
+        &self,
+        log: &Logger,
+        dataset: &db::model::Dataset,
+        region_id: Uuid,
+    ) -> Result<GetSnapshotResponse, Error> {
+        let client = self.crucible_agent_client_for_dataset(dataset);
+        let dataset_id = dataset.id();
+
+        let result = ProgenitorOperationRetry::new(
+            || async {
+                client
+                    .region_get_snapshots(&RegionId(region_id.to_string()))
+                    .await
+            },
+            || async { self.crucible_agent_gone_check(dataset_id).await },
+        )
+        .run(log)
+        .await;
+
+        match result {
+            Ok(v) => Ok(v.into_inner()),
+
+            Err(e) => {
+                error!(
+                    log,
+                    "region_get_snapshots saw {:?}",
+                    e;
+                    "region_id" => %region_id,
+                    "dataset_id" => %dataset_id,
+                );
+
+                match e {
+                    ProgenitorOperationRetryError::Gone => {
+                        // Return an error if Nexus is unable to query the
+                        // dataset's agent for the requested region's snapshots
+                        Err(Error::Gone)
+                    }
+
+                    ProgenitorOperationRetryError::GoneCheckError(_) => {
+                        Err(Error::internal_error(
+                            "insufficient permission for crucible_agent_gone_check"
+                        ))
+                    }
+
+                    ProgenitorOperationRetryError::ProgenitorError(e) => match e {
+                        crucible_agent_client::Error::ErrorResponse(rv) => {
+                            match rv.status() {
+                                status if status.is_client_error() => {
+                                    Err(Error::invalid_request(&rv.message))
+                                }
+
+                                _ => {
+                                    Err(Error::internal_error(&rv.message))
+                                }
+                            }
+                        }
+
+                        _ => {
+                            Err(Error::internal_error(
+                                "unexpected failure during `region_get_snapshots`",
+                            ))
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    /// Send a region deletion request
+    async fn request_crucible_region_delete(
+        &self,
+        log: &Logger,
+        dataset: &db::model::Dataset,
+        region_id: Uuid,
+    ) -> Result<(), Error> {
+        let client = self.crucible_agent_client_for_dataset(dataset);
+        let dataset_id = dataset.id();
+
+        let result = ProgenitorOperationRetry::new(
+            || async {
+                client.region_delete(&RegionId(region_id.to_string())).await
+            },
+            || async { self.crucible_agent_gone_check(dataset_id).await },
+        )
+        .run(log)
+        .await;
+
+        match result {
+            Ok(_) => Ok(()),
+
+            Err(e) => {
+                error!(
+                    log,
+                    "region_delete saw {:?}",
+                    e;
+                    "region_id" => %region_id,
+                    "dataset_id" => %dataset.id(),
+                );
+
+                match e {
ProgenitorOperationRetryError::Gone => { + // Return Ok if the dataset's agent is gone, no delete + // call is required. + Ok(()) + } + + ProgenitorOperationRetryError::GoneCheckError(_) => { + Err(Error::internal_error( + "insufficient permission for crucible_agent_gone_check" + )) + } + + ProgenitorOperationRetryError::ProgenitorError(e) => match e { + crucible_agent_client::Error::ErrorResponse(rv) => { + match rv.status() { + status if status.is_client_error() => { + Err(Error::invalid_request(&rv.message)) + } + + _ => { + Err(Error::internal_error(&rv.message)) + } + } + } + + _ => { + Err(Error::internal_error( + "unexpected failure during `region_delete`", + )) + } + } + } + } + } + } + + /// Send a running snapshot deletion request + async fn request_crucible_running_snapshot_delete( + &self, + log: &Logger, + dataset: &db::model::Dataset, + region_id: Uuid, + snapshot_id: Uuid, + ) -> Result<(), Error> { + let client = self.crucible_agent_client_for_dataset(dataset); + let dataset_id = dataset.id(); + + let result = ProgenitorOperationRetry::new( + || async { + client + .region_delete_running_snapshot( + &RegionId(region_id.to_string()), + &snapshot_id.to_string(), + ) + .await + }, + || async { self.crucible_agent_gone_check(dataset_id).await }, + ) + .run(log) + .await; + + match result { + Ok(_) => Ok(()), + + Err(e) => { + error!( + log, + "region_delete_running_snapshot saw {:?}", + e; + "dataset_id" => %dataset_id, + "region_id" => %region_id, + "snapshot_id" => %snapshot_id, + ); + + match e { + ProgenitorOperationRetryError::Gone => { + // Return Ok if the dataset's agent is gone, no delete + // call is required. + Ok(()) + } + + ProgenitorOperationRetryError::GoneCheckError(_) => { + Err(Error::internal_error( + "insufficient permission for crucible_agent_gone_check" + )) + } + + ProgenitorOperationRetryError::ProgenitorError(e) => match e { + crucible_agent_client::Error::ErrorResponse(rv) => { + match rv.status() { + status if status.is_client_error() => { + Err(Error::invalid_request(&rv.message)) + } + + _ => { + Err(Error::internal_error(&rv.message)) + } + } + } + + _ => { + Err(Error::internal_error( + "unexpected failure during `region_delete_running_snapshot`", + )) + } + } + } + } + } + } + + /// Send a snapshot deletion request + async fn request_crucible_snapshot_delete( + &self, + log: &Logger, + dataset: &db::model::Dataset, + region_id: Uuid, + snapshot_id: Uuid, + ) -> Result<(), Error> { + let client = self.crucible_agent_client_for_dataset(dataset); + let dataset_id = dataset.id(); + + let result = ProgenitorOperationRetry::new( + || async { + client + .region_delete_snapshot( + &RegionId(region_id.to_string()), + &snapshot_id.to_string(), + ) + .await + }, + || async { self.crucible_agent_gone_check(dataset_id).await }, + ) + .run(log) + .await; + + match result { + Ok(_) => Ok(()), + + Err(e) => { + error!( + log, + "region_delete_snapshot saw {:?}", + e; + "dataset_id" => %dataset_id, + "region_id" => %region_id, + "snapshot_id" => %snapshot_id, + ); + + match e { + ProgenitorOperationRetryError::Gone => { + // Return Ok if the dataset's agent is gone, no delete + // call is required. 
+                        Ok(())
+                    }
+
+                    ProgenitorOperationRetryError::GoneCheckError(_) => {
+                        Err(Error::internal_error(
+                            "insufficient permission for crucible_agent_gone_check"
+                        ))
+                    }
+
+                    ProgenitorOperationRetryError::ProgenitorError(e) => match e {
+                        crucible_agent_client::Error::ErrorResponse(rv) => {
+                            match rv.status() {
+                                status if status.is_client_error() => {
+                                    Err(Error::invalid_request(&rv.message))
+                                }
+
+                                _ => {
+                                    Err(Error::internal_error(&rv.message))
+                                }
+                            }
+                        }
+
+                        _ => {
+                            Err(Error::internal_error(
+                                "unexpected failure during `region_delete_snapshot`",
+                            ))
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    /// Call out to a Crucible agent to delete a region
+    async fn delete_crucible_region(
+        &self,
+        log: &Logger,
+        dataset: &db::model::Dataset,
+        region_id: Uuid,
+    ) -> Result<(), Error> {
+        // If the region never existed, then a `GET` will return 404, and so
+        // will a `DELETE`. Catch this case, and return Ok if the region never
+        // existed. This can occur if an `ensure_all_datasets_and_regions`
+        // partially fails.
+
+        match self.maybe_get_crucible_region(log, dataset, region_id).await {
+            Ok(Some(_)) => {
+                // region found, proceed with deleting
+            }
+
+            Ok(None) => {
+                // region never existed, return Ok
+                return Ok(());
+            }
+
+            Err(e) => match e {
+                Error::Gone => {
+                    // Return Ok if the dataset's agent is gone, no delete call
+                    // is required.
+                    return Ok(());
+                }
+
+                _ => {
+                    return Err(e);
+                }
+            },
+        }
+
+        // Past here, the region exists (or existed at some point): ensure it is
+        // deleted. Request the deletion (which is idempotent), then wait for
+        // the appropriate state change.
+
+        self.request_crucible_region_delete(log, dataset, region_id).await?;
+
+        // Wait until the region is deleted
+
+        backoff::retry_notify(
+            backoff::retry_policy_internal_service_aggressive(),
+            || async {
+                let region = match self.maybe_get_crucible_region(
+                    log,
+                    dataset,
+                    region_id,
+                ).await {
+                    Ok(None) => {
+                        Err(BackoffError::Permanent(WaitError::Permanent(
+                            Error::internal_error(&format!(
+                                "dataset {} region {region_id} is missing now!",
+                                dataset.id(),
+                            ))
+                        )))
+                    }
+
+                    Ok(Some(v)) => Ok(v),
+
+                    Err(e) => match e {
+                        Error::Gone => {
+                            // Return Ok if the dataset's agent is gone, no
+                            // delete call is required.
+                            return Ok(());
+                        }
+
+                        _ => {
+                            Err(BackoffError::Permanent(WaitError::Permanent(e)))
+                        }
+                    }
+                }?;
+
+                match region.state {
+                    RegionState::Tombstoned => Err(BackoffError::transient(
+                        WaitError::Transient(anyhow!("region not deleted yet")),
+                    )),
+
+                    RegionState::Destroyed => {
+                        info!(
+                            log,
+                            "region deleted";
+                            "region_id" => %region_id,
+                        );
+
+                        Ok(())
+                    }
+
+                    _ => Err(BackoffError::transient(WaitError::Transient(
+                        anyhow!("region unexpected state {:?}", region.state),
+                    ))),
+                }
+            },
+            |e: WaitError, delay| {
+                info!(
+                    log,
+                    "{:?}, trying again in {:?}",
+                    e,
+                    delay;
+                    "region_id" => %region_id,
+                );
+            },
+        )
+        .await
+        .map_err(|e| match e {
+            WaitError::Transient(e) => {
+                // The backoff crate can be configured with a maximum elapsed time
+                // before giving up, which means that Transient could be returned
+                // here. Our current policies do **not** set this though.
+ Error::internal_error(&e.to_string()) + } + + WaitError::Permanent(e) => e, + }) + } + + async fn delete_crucible_running_snapshot_impl( + &self, + log: &Logger, + dataset: &db::model::Dataset, + region_id: Uuid, + snapshot_id: Uuid, + ) -> Result<(), Error> { + // request running snapshot deletion + + self.request_crucible_running_snapshot_delete( + log, + dataset, + region_id, + snapshot_id, + ) + .await?; + + // `region_delete_running_snapshot` is only a request: wait until + // running snapshot is deleted + backoff::retry_notify( + backoff::retry_policy_internal_service_aggressive(), + || async { + let response = match self.get_crucible_region_snapshots( + log, + dataset, + region_id, + ) + .await { + Ok(v) => Ok(v), + Err(e) => match e { + Error::Gone => { + // Return Ok if the dataset's agent is gone, no + // delete call is required. + return Ok(()); + } + + _ => { + Err(BackoffError::Permanent(WaitError::Permanent(e))) + } + } + }?; + + match response.running_snapshots.get(&snapshot_id.to_string()) { + Some(running_snapshot) => { + info!( + log, + "running_snapshot is Some, state is {}", + running_snapshot.state.to_string(); + "region_id" => %region_id, + "snapshot_id" => %snapshot_id, + ); + + match running_snapshot.state { + RegionState::Tombstoned => { + Err(BackoffError::transient( + WaitError::Transient(anyhow!( + "running_snapshot tombstoned, not deleted yet", + ) + ))) + } + + RegionState::Destroyed => { + info!( + log, + "running_snapshot deleted", + ); + + Ok(()) + } + + _ => { + Err(BackoffError::transient( + WaitError::Transient(anyhow!( + "running_snapshot unexpected state", + ) + ))) + } + } + } + + None => { + // deleted? + info!( + log, + "running_snapshot is None"; + "region_id" => %region_id, + "snapshot_id" => %snapshot_id, + ); + + // break here - it's possible that the running snapshot + // record was GCed, and it won't come back. + Ok(()) + } + } + }, + |e: WaitError, delay| { + info!( + log, + "{:?}, trying again in {:?}", + e, + delay; + "region_id" => %region_id, + "snapshot_id" => %snapshot_id, + ); + } + ) + .await + .map_err(|e| match e { + WaitError::Transient(e) => { + // The backoff crate can be configured with a maximum elapsed time + // before giving up, which means that Transient could be returned + // here. Our current policies do **not** set this though. + Error::internal_error(&e.to_string()) + } + + WaitError::Permanent(e) => { + e + } + }) + } + + pub async fn delete_crucible_snapshot( + &self, + log: &Logger, + dataset: &db::model::Dataset, + region_id: Uuid, + snapshot_id: Uuid, + ) -> Result<(), Error> { + self.delete_crucible_snapshot_impl(log, dataset, region_id, snapshot_id) + .await + } + + async fn delete_crucible_snapshot_impl( + &self, + log: &Logger, + dataset: &db::model::Dataset, + region_id: Uuid, + snapshot_id: Uuid, + ) -> Result<(), Error> { + // Unlike other Crucible agent endpoints, this one is synchronous in that it + // is not only a request to the Crucible agent: `zfs destroy` is performed + // right away. However this is still a request to illumos that may not take + // effect right away. Wait until the snapshot no longer appears in the list + // of region snapshots, meaning it was not returned from `zfs list`. 
+
+        let dataset_id = dataset.id();
+
+        info!(
+            log,
+            "requesting region snapshot delete";
+            "dataset_id" => %dataset_id,
+            "region_id" => %region_id,
+            "snapshot_id" => %snapshot_id,
+        );
+
+        self.request_crucible_snapshot_delete(
+            log,
+            dataset,
+            region_id,
+            snapshot_id,
+        )
+        .await?;
+
+        backoff::retry_notify(
+            backoff::retry_policy_internal_service_aggressive(),
+            || async {
+                let response = match self
+                    .get_crucible_region_snapshots(log, dataset, region_id)
+                    .await
+                {
+                    Ok(v) => Ok(v),
+
+                    Err(e) => match e {
+                        Error::Gone => {
+                            // Return Ok if the dataset's agent is gone, no
+                            // delete call is required.
+                            return Ok(());
+                        }
+
+                        _ => Err(BackoffError::Permanent(
+                            WaitError::Permanent(e),
+                        )),
+                    },
+                }?;
+
+                if response
+                    .snapshots
+                    .iter()
+                    .any(|x| x.name == snapshot_id.to_string())
+                {
+                    info!(
+                        log,
+                        "snapshot still exists, waiting";
+                        "dataset_id" => %dataset_id,
+                        "region_id" => %region_id,
+                        "snapshot_id" => %snapshot_id,
+                    );
+
+                    Err(BackoffError::transient(WaitError::Transient(anyhow!(
+                        "snapshot not deleted yet",
+                    ))))
+                } else {
+                    info!(
+                        log,
+                        "snapshot deleted";
+                        "dataset_id" => %dataset_id,
+                        "region_id" => %region_id,
+                        "snapshot_id" => %snapshot_id,
+                    );
+
+                    Ok(())
+                }
+            },
+            |e: WaitError, delay| {
+                info!(
+                    log,
+                    "{:?}, trying again in {:?}",
+                    e,
+                    delay;
+                    "dataset_id" => %dataset_id,
+                    "region_id" => %region_id,
+                    "snapshot_id" => %snapshot_id,
+                );
+            },
+        )
+        .await
+        .map_err(|e| match e {
+            WaitError::Transient(e) => {
+                // The backoff crate can be configured with a maximum elapsed time
+                // before giving up, which means that Transient could be returned
+                // here. Our current policies do **not** set this though.
+                Error::internal_error(&e.to_string())
+            }
+
+            WaitError::Permanent(e) => e,
+        })
+    }
+
+    // PUBLIC API
+
+    pub async fn ensure_all_datasets_and_regions(
+        &self,
+        log: &Logger,
+        datasets_and_regions: Vec<(db::model::Dataset, db::model::Region)>,
+    ) -> Result<Vec<(db::model::Dataset, Region)>, Error> {
+        let request_count = datasets_and_regions.len();
+        if request_count == 0 {
+            return Ok(vec![]);
+        }
+
+        // Allocate regions, and additionally return the dataset that the region was
+        // allocated in.
+        let datasets_and_regions: Vec<(db::model::Dataset, Region)> =
+            futures::stream::iter(datasets_and_regions)
+                .map(|(dataset, region)| async move {
+                    match self
+                        .ensure_region_in_dataset(log, &dataset, &region)
+                        .await
+                    {
+                        Ok(result) => Ok((dataset, result)),
+                        Err(e) => Err(e),
+                    }
+                })
+                // Execute the allocation requests concurrently.
+                .buffer_unordered(std::cmp::min(
+                    request_count,
+                    MAX_CONCURRENT_REGION_REQUESTS,
+                ))
+                .collect::<Vec<Result<(db::model::Dataset, Region), Error>>>()
+                .await
+                .into_iter()
+                .collect::<Result<Vec<(db::model::Dataset, Region)>, Error>>(
+                )?;
+
+        // Assert each region has the same block size, otherwise Volume creation
+        // will fail.
+        let all_region_have_same_block_size = datasets_and_regions
+            .windows(2)
+            .all(|w| w[0].1.block_size == w[1].1.block_size);
+
+        if !all_region_have_same_block_size {
+            return Err(Error::internal_error(
+                "volume creation will fail due to block size mismatch",
+            ));
+        }
+
+        Ok(datasets_and_regions)
+    }
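For context, this is how a saga node ends up driving the function above once the sagas are rewired later in this patch (condensed from the disk_create.rs hunk below):

    let datasets_and_regions = osagactx
        .nexus()
        .ensure_all_datasets_and_regions(
            &log,
            sagactx.lookup::<Vec<(db::model::Dataset, db::model::Region)>>(
                "datasets_and_regions",
            )?,
        )
        .await
        .map_err(ActionError::action_failed)?;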
+
+    /// Given a list of datasets and regions, send DELETE calls to the datasets'
+    /// corresponding Crucible Agent for each region.
+    pub async fn delete_crucible_regions(
+        &self,
+        log: &Logger,
+        datasets_and_regions: Vec<(db::model::Dataset, db::model::Region)>,
+    ) -> Result<(), Error> {
+        let request_count = datasets_and_regions.len();
+        if request_count == 0 {
+            return Ok(());
+        }
+
+        futures::stream::iter(datasets_and_regions)
+            .map(|(dataset, region)| async move {
+                self.delete_crucible_region(log, &dataset, region.id()).await
+            })
+            // Execute the requests concurrently.
+            .buffer_unordered(std::cmp::min(
+                request_count,
+                MAX_CONCURRENT_REGION_REQUESTS,
+            ))
+            .collect::<Vec<Result<(), Error>>>()
+            .await
+            .into_iter()
+            .collect::<Result<Vec<_>, _>>()?;
+
+        Ok(())
+    }
+
+    /// Ensure that a Crucible "running snapshot" is deleted.
+    pub async fn delete_crucible_running_snapshot(
+        &self,
+        log: &Logger,
+        dataset: &db::model::Dataset,
+        region_id: Uuid,
+        snapshot_id: Uuid,
+    ) -> Result<(), Error> {
+        self.delete_crucible_running_snapshot_impl(
+            log,
+            dataset,
+            region_id,
+            snapshot_id,
+        )
+        .await
+    }
+
+    /// Given a list of datasets and region snapshots, send DELETE calls to the
+    /// datasets' corresponding Crucible Agent for each running read-only
+    /// downstairs corresponding to the snapshot.
+    pub async fn delete_crucible_running_snapshots(
+        &self,
+        log: &Logger,
+        datasets_and_snapshots: Vec<(
+            db::model::Dataset,
+            db::model::RegionSnapshot,
+        )>,
+    ) -> Result<(), Error> {
+        let request_count = datasets_and_snapshots.len();
+        if request_count == 0 {
+            return Ok(());
+        }
+
+        futures::stream::iter(datasets_and_snapshots)
+            .map(|(dataset, region_snapshot)| async move {
+                self.delete_crucible_running_snapshot_impl(
+                    &log,
+                    &dataset,
+                    region_snapshot.region_id,
+                    region_snapshot.snapshot_id,
+                )
+                .await
+            })
+            // Execute the requests concurrently.
+            .buffer_unordered(std::cmp::min(
+                request_count,
+                MAX_CONCURRENT_REGION_REQUESTS,
+            ))
+            .collect::<Vec<Result<(), Error>>>()
+            .await
+            .into_iter()
+            .collect::<Result<Vec<_>, _>>()?;
+
+        Ok(())
+    }
+
+    /// Given a list of datasets and region snapshots, send DELETE calls to the
+    /// datasets' corresponding Crucible Agent for each snapshot.
+    pub async fn delete_crucible_snapshots(
+        &self,
+        log: &Logger,
+        datasets_and_snapshots: Vec<(
+            db::model::Dataset,
+            db::model::RegionSnapshot,
+        )>,
+    ) -> Result<(), Error> {
+        let request_count = datasets_and_snapshots.len();
+        if request_count == 0 {
+            return Ok(());
+        }
+
+        futures::stream::iter(datasets_and_snapshots)
+            .map(|(dataset, region_snapshot)| async move {
+                self.delete_crucible_snapshot_impl(
+                    &log,
+                    &dataset,
+                    region_snapshot.region_id,
+                    region_snapshot.snapshot_id,
+                )
+                .await
+            })
+            // Execute the requests concurrently.
+            .buffer_unordered(std::cmp::min(
+                request_count,
+                MAX_CONCURRENT_REGION_REQUESTS,
+            ))
+            .collect::<Vec<Result<(), Error>>>()
+            .await
+            .into_iter()
+            .collect::<Result<Vec<_>, _>>()?;
+
+        Ok(())
+    }
+}
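All three bulk helpers share the same bounded fan-out idiom; stripped to its skeleton (a sketch; `items` and `delete_one` are stand-ins):

    let request_count = items.len();
    futures::stream::iter(items)
        .map(|item| async move { delete_one(item).await })
        // Never more than MAX_CONCURRENT_REGION_REQUESTS calls in flight.
        .buffer_unordered(std::cmp::min(
            request_count,
            MAX_CONCURRENT_REGION_REQUESTS,
        ))
        .collect::<Vec<Result<(), Error>>>()
        .await
        .into_iter()
        // Fail the whole batch if any single delete failed.
        .collect::<Result<Vec<_>, _>>()?;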
diff --git a/nexus/src/app/mod.rs b/nexus/src/app/mod.rs
index 4b77788c96..a21db65c0b 100644
--- a/nexus/src/app/mod.rs
+++ b/nexus/src/app/mod.rs
@@ -44,6 +44,7 @@ pub(crate) mod background;
 mod bfd;
 mod bgp;
 mod certificate;
+mod crucible;
 mod deployment;
 mod device_auth;
 mod disk;
diff --git a/nexus/src/app/sagas/common_storage.rs b/nexus/src/app/sagas/common_storage.rs
index 0fe14f6d2a..1fe8d76783 100644
--- a/nexus/src/app/sagas/common_storage.rs
+++ b/nexus/src/app/sagas/common_storage.rs
@@ -7,737 +7,16 @@
 use super::*;
 
 use crate::Nexus;
-use anyhow::anyhow;
-use crucible_agent_client::{
-    types::{CreateRegion, RegionId, State as RegionState},
-    Client as CrucibleAgentClient,
-};
-use futures::StreamExt;
 use internal_dns::ServiceName;
 use nexus_db_queries::authz;
 use nexus_db_queries::context::OpContext;
 use nexus_db_queries::db;
-use nexus_db_queries::db::identity::Asset;
 use nexus_db_queries::db::lookup::LookupPath;
 use omicron_common::api::external::Error;
-use omicron_common::backoff::{self, BackoffError};
 use omicron_common::retry_until_known_result;
-use slog::Logger;
 use std::net::SocketAddrV6;
 
-// Arbitrary limit on concurrency, for operations issued on multiple regions
-// within a disk at the same time.
-const MAX_CONCURRENT_REGION_REQUESTS: usize = 3;
-
-/// Call out to Crucible agent and perform region creation.
-pub(crate) async fn ensure_region_in_dataset(
-    log: &Logger,
-    dataset: &db::model::Dataset,
-    region: &db::model::Region,
-) -> Result<crucible_agent_client::types::Region, Error> {
-    let url = format!("http://{}", dataset.address());
-    let client = CrucibleAgentClient::new(&url);
-
-    let region_request = CreateRegion {
-        block_size: region.block_size().to_bytes(),
-        extent_count: region.extent_count(),
-        extent_size: region.blocks_per_extent(),
-        // TODO: Can we avoid casting from UUID to string?
-        // NOTE: This'll require updating the crucible agent client.
-        id: RegionId(region.id().to_string()),
-        encrypted: region.encrypted(),
-        cert_pem: None,
-        key_pem: None,
-        root_pem: None,
-        source: None,
-    };
-
-    let create_region = || async {
-        let region = client
-            .region_create(&region_request)
-            .await
-            .map_err(|e| BackoffError::Permanent(e.into()))?;
-        match region.state {
-            RegionState::Requested => Err(BackoffError::transient(anyhow!(
-                "Region creation in progress"
-            ))),
-
-            RegionState::Created => Ok(region),
-
-            _ => Err(BackoffError::Permanent(anyhow!(
-                "Failed to create region, unexpected state: {:?}",
-                region.state
-            ))),
-        }
-    };
-
-    let log_create_failure = |_, delay| {
-        warn!(
-            log,
-            "Region requested, not yet created. Retrying in {:?}",
-            delay;
-            "region" => %region.id(),
-        );
-    };
-
-    let region = backoff::retry_notify(
-        backoff::retry_policy_internal_service(),
-        create_region,
-        log_create_failure,
-    )
-    .await
-    .map_err(|e| Error::internal_error(&e.to_string()))?;
-
-    Ok(region.into_inner())
-}
-
-pub(crate) async fn ensure_all_datasets_and_regions(
-    log: &Logger,
-    datasets_and_regions: Vec<(db::model::Dataset, db::model::Region)>,
-) -> Result<
-    Vec<(db::model::Dataset, crucible_agent_client::types::Region)>,
-    ActionError,
-> {
-    let request_count = datasets_and_regions.len();
-
-    // Allocate regions, and additionally return the dataset that the region was
-    // allocated in.
- let datasets_and_regions: Vec<( - db::model::Dataset, - crucible_agent_client::types::Region, - )> = futures::stream::iter(datasets_and_regions) - .map(|(dataset, region)| async move { - match ensure_region_in_dataset(log, &dataset, ®ion).await { - Ok(result) => Ok((dataset, result)), - Err(e) => Err(e), - } - }) - // Execute the allocation requests concurrently. - .buffer_unordered(std::cmp::min( - request_count, - MAX_CONCURRENT_REGION_REQUESTS, - )) - .collect::, - >>() - .await - .into_iter() - .collect::, - Error, - >>() - .map_err(ActionError::action_failed)?; - - // Assert each region has the same block size, otherwise Volume creation - // will fail. - let all_region_have_same_block_size = datasets_and_regions - .windows(2) - .all(|w| w[0].1.block_size == w[1].1.block_size); - - if !all_region_have_same_block_size { - return Err(ActionError::action_failed(Error::internal_error( - "volume creation will fail due to block size mismatch", - ))); - } - - Ok(datasets_and_regions) -} - -pub(super) async fn delete_crucible_region( - log: &Logger, - client: &CrucibleAgentClient, - region_id: Uuid, -) -> Result<(), Error> { - // If the region never existed, then a `GET` will return 404, and so will a - // `DELETE`. Catch this case, and return Ok if the region never existed. - // This can occur if an `ensure_all_datasets_and_regions` partially fails. - let result = retry_until_known_result(log, || async { - client.region_get(&RegionId(region_id.to_string())).await - }) - .await; - - if let Err(e) = result { - error!( - log, - "delete_crucible_region: region_get saw {:?}", - e; - "region_id" => %region_id, - ); - match e { - crucible_agent_client::Error::ErrorResponse(rv) => { - match rv.status() { - http::StatusCode::NOT_FOUND => { - // Bail out here! - return Ok(()); - } - - status if status.is_client_error() => { - return Err(Error::invalid_request(&rv.message)); - } - - _ => { - return Err(Error::internal_error(&rv.message)); - } - } - } - - _ => { - return Err(Error::internal_error( - "unexpected failure during `region_get`", - )); - } - } - } - - // Past here, the region exists: ensure it is deleted. 
- - retry_until_known_result(log, || async { - client.region_delete(&RegionId(region_id.to_string())).await - }) - .await - .map_err(|e| { - error!( - log, - "delete_crucible_region: region_delete saw {:?}", - e; - "region_id" => %region_id, - ); - match e { - crucible_agent_client::Error::ErrorResponse(rv) => { - match rv.status() { - status if status.is_client_error() => { - Error::invalid_request(&rv.message) - } - _ => Error::internal_error(&rv.message), - } - } - _ => Error::internal_error( - "unexpected failure during `region_delete`", - ), - } - })?; - - #[derive(Debug, thiserror::Error)] - enum WaitError { - #[error("Transient error: {0}")] - Transient(#[from] anyhow::Error), - - #[error("Permanent error: {0}")] - Permanent(#[from] Error), - } - - // `region_delete` is only a request: wait until the region is - // deleted - backoff::retry_notify( - backoff::retry_policy_internal_service_aggressive(), - || async { - let region = retry_until_known_result(log, || async { - client.region_get(&RegionId(region_id.to_string())).await - }) - .await - .map_err(|e| { - error!( - log, - "delete_crucible_region: region_get saw {:?}", - e; - "region_id" => %region_id, - ); - - match e { - crucible_agent_client::Error::ErrorResponse(rv) => { - match rv.status() { - status if status.is_client_error() => { - BackoffError::Permanent(WaitError::Permanent( - Error::invalid_request(&rv.message), - )) - } - _ => BackoffError::Permanent(WaitError::Permanent( - Error::internal_error(&rv.message), - )), - } - } - _ => BackoffError::Permanent(WaitError::Permanent( - Error::internal_error( - "unexpected failure during `region_get`", - ), - )), - } - })?; - - match region.state { - RegionState::Tombstoned => Err(BackoffError::transient( - WaitError::Transient(anyhow!("region not deleted yet")), - )), - - RegionState::Destroyed => { - info!( - log, - "region deleted"; - "region_id" => %region_id, - ); - - Ok(()) - } - - _ => Err(BackoffError::transient(WaitError::Transient( - anyhow!("region unexpected state {:?}", region.state), - ))), - } - }, - |e: WaitError, delay| { - info!( - log, - "{:?}, trying again in {:?}", - e, - delay; - "region_id" => %region_id, - ); - }, - ) - .await - .map_err(|e| match e { - WaitError::Transient(e) => { - // The backoff crate can be configured with a maximum elapsed time - // before giving up, which means that Transient could be returned - // here. Our current policies do **not** set this though. - Error::internal_error(&e.to_string()) - } - - WaitError::Permanent(e) => e, - }) -} - -// Given a list of datasets and regions, send DELETE calls to the datasets -// corresponding Crucible Agent for each region. -pub(super) async fn delete_crucible_regions( - log: &Logger, - datasets_and_regions: Vec<(db::model::Dataset, db::model::Region)>, -) -> Result<(), Error> { - let request_count = datasets_and_regions.len(); - if request_count == 0 { - return Ok(()); - } - - futures::stream::iter(datasets_and_regions) - .map(|(dataset, region)| async move { - let url = format!("http://{}", dataset.address()); - let client = CrucibleAgentClient::new(&url); - - delete_crucible_region(&log, &client, region.id()).await - }) - // Execute the requests concurrently. 
- .buffer_unordered(std::cmp::min( - request_count, - MAX_CONCURRENT_REGION_REQUESTS, - )) - .collect::>>() - .await - .into_iter() - .collect::, _>>()?; - - Ok(()) -} - -pub(super) async fn delete_crucible_running_snapshot( - log: &Logger, - client: &CrucibleAgentClient, - region_id: Uuid, - snapshot_id: Uuid, -) -> Result<(), Error> { - // delete running snapshot - retry_until_known_result(log, || async { - client - .region_delete_running_snapshot( - &RegionId(region_id.to_string()), - &snapshot_id.to_string(), - ) - .await - }) - .await - .map_err(|e| { - error!( - log, - "delete_crucible_running_snapshot: region_delete_running_snapshot saw {:?}", - e; - "region_id" => %region_id, - "snapshot_id" => %snapshot_id, - ); - match e { - crucible_agent_client::Error::ErrorResponse(rv) => { - match rv.status() { - status if status.is_client_error() => { - Error::invalid_request(&rv.message) - } - _ => Error::internal_error(&rv.message), - } - } - _ => Error::internal_error( - "unexpected failure during `region_delete_running_snapshot`", - ), - } - })?; - - #[derive(Debug, thiserror::Error)] - enum WaitError { - #[error("Transient error: {0}")] - Transient(#[from] anyhow::Error), - - #[error("Permanent error: {0}")] - Permanent(#[from] Error), - } - - // `region_delete_running_snapshot` is only a request: wait until - // running snapshot is deleted - backoff::retry_notify( - backoff::retry_policy_internal_service_aggressive(), - || async { - let snapshot = retry_until_known_result(log, || async { - client.region_get_snapshots( - &RegionId(region_id.to_string()), - ).await - }) - .await - .map_err(|e| { - error!( - log, - "delete_crucible_running_snapshot: region_get_snapshots saw {:?}", - e; - "region_id" => %region_id, - "snapshot_id" => %snapshot_id, - ); - - match e { - crucible_agent_client::Error::ErrorResponse(rv) => { - match rv.status() { - status if status.is_client_error() => { - BackoffError::Permanent( - WaitError::Permanent( - Error::invalid_request(&rv.message) - ) - ) - } - _ => BackoffError::Permanent( - WaitError::Permanent( - Error::internal_error(&rv.message) - ) - ) - } - } - _ => BackoffError::Permanent( - WaitError::Permanent( - Error::internal_error( - "unexpected failure during `region_get_snapshots`", - ) - ) - ) - } - })?; - - match snapshot.running_snapshots.get(&snapshot_id.to_string()) { - Some(running_snapshot) => { - info!( - log, - "running_snapshot is Some, state is {}", - running_snapshot.state.to_string(); - "region_id" => %region_id, - "snapshot_id" => %snapshot_id, - ); - - match running_snapshot.state { - RegionState::Tombstoned => { - Err(BackoffError::transient( - WaitError::Transient(anyhow!( - "running_snapshot tombstoned, not deleted yet", - ) - ))) - } - - RegionState::Destroyed => { - info!( - log, - "running_snapshot deleted", - ); - - Ok(()) - } - - _ => { - Err(BackoffError::transient( - WaitError::Transient(anyhow!( - "running_snapshot unexpected state", - ) - ))) - } - } - } - - None => { - // deleted? - info!( - log, - "running_snapshot is None"; - "region_id" => %region_id, - "snapshot_id" => %snapshot_id, - ); - - // break here - it's possible that the running snapshot - // record was GCed, and it won't come back. 
- Ok(()) - } - } - }, - |e: WaitError, delay| { - info!( - log, - "{:?}, trying again in {:?}", - e, - delay; - "region_id" => %region_id, - "snapshot_id" => %snapshot_id, - ); - } - ) - .await - .map_err(|e| match e { - WaitError::Transient(e) => { - // The backoff crate can be configured with a maximum elapsed time - // before giving up, which means that Transient could be returned - // here. Our current policies do **not** set this though. - Error::internal_error(&e.to_string()) - } - - WaitError::Permanent(e) => { - e - } - }) -} - -pub(super) async fn delete_crucible_snapshot( - log: &Logger, - client: &CrucibleAgentClient, - region_id: Uuid, - snapshot_id: Uuid, -) -> Result<(), Error> { - // Unlike other Crucible agent endpoints, this one is synchronous in that it - // is not only a request to the Crucible agent: `zfs destroy` is performed - // right away. However this is still a request to illumos that may not take - // effect right away. Wait until the snapshot no longer appears in the list - // of region snapshots, meaning it was not returned from `zfs list`. - - info!(log, "deleting region {region_id} snapshot {snapshot_id}"); - - retry_until_known_result(log, || async { - client - .region_delete_snapshot( - &RegionId(region_id.to_string()), - &snapshot_id.to_string(), - ) - .await - }) - .await - .map_err(|e| { - error!( - log, - "delete_crucible_snapshot: region_delete_snapshot saw {:?}", - e; - "region_id" => %region_id, - "snapshot_id" => %snapshot_id, - ); - match e { - crucible_agent_client::Error::ErrorResponse(rv) => { - match rv.status() { - status if status.is_client_error() => { - Error::invalid_request(&rv.message) - } - _ => Error::internal_error(&rv.message), - } - } - _ => Error::internal_error( - "unexpected failure during `region_delete_snapshot`", - ), - } - })?; - - #[derive(Debug, thiserror::Error)] - enum WaitError { - #[error("Transient error: {0}")] - Transient(#[from] anyhow::Error), - - #[error("Permanent error: {0}")] - Permanent(#[from] Error), - } - - backoff::retry_notify( - backoff::retry_policy_internal_service_aggressive(), - || async { - let response = retry_until_known_result(log, || async { - client - .region_get_snapshots(&RegionId(region_id.to_string())) - .await - }) - .await - .map_err(|e| { - error!( - log, - "delete_crucible_snapshot: region_get_snapshots saw {:?}", - e; - "region_id" => %region_id, - "snapshot_id" => %snapshot_id, - ); - match e { - crucible_agent_client::Error::ErrorResponse(rv) => { - match rv.status() { - status if status.is_client_error() => { - BackoffError::Permanent(WaitError::Permanent( - Error::invalid_request(&rv.message), - )) - } - _ => BackoffError::Permanent(WaitError::Permanent( - Error::internal_error(&rv.message), - )), - } - } - _ => BackoffError::Permanent(WaitError::Permanent( - Error::internal_error( - "unexpected failure during `region_get_snapshots`", - ), - )), - } - })?; - - if response - .snapshots - .iter() - .any(|x| x.name == snapshot_id.to_string()) - { - info!( - log, - "snapshot still exists, waiting"; - "region_id" => %region_id, - "snapshot_id" => %snapshot_id, - ); - - Err(BackoffError::transient(WaitError::Transient(anyhow!( - "snapshot not deleted yet", - )))) - } else { - info!( - log, - "snapshot deleted"; - "region_id" => %region_id, - "snapshot_id" => %snapshot_id, - ); - - Ok(()) - } - }, - |e: WaitError, delay| { - info!( - log, - "{:?}, trying again in {:?}", - e, - delay; - "region_id" => %region_id, - "snapshot_id" => %snapshot_id, - ); - }, - ) - .await - .map_err(|e| match 
e { - WaitError::Transient(e) => { - // The backoff crate can be configured with a maximum elapsed time - // before giving up, which means that Transient could be returned - // here. Our current policies do **not** set this though. - Error::internal_error(&e.to_string()) - } - - WaitError::Permanent(e) => e, - }) -} - -// Given a list of datasets and region snapshots, send DELETE calls to the -// datasets corresponding Crucible Agent for each snapshot. -pub(super) async fn delete_crucible_snapshots( - log: &Logger, - datasets_and_snapshots: Vec<( - db::model::Dataset, - db::model::RegionSnapshot, - )>, -) -> Result<(), Error> { - let request_count = datasets_and_snapshots.len(); - if request_count == 0 { - return Ok(()); - } - - futures::stream::iter(datasets_and_snapshots) - .map(|(dataset, region_snapshot)| async move { - let url = format!("http://{}", dataset.address()); - let client = CrucibleAgentClient::new(&url); - - delete_crucible_snapshot( - &log, - &client, - region_snapshot.region_id, - region_snapshot.snapshot_id, - ) - .await - }) - // Execute the requests concurrently. - .buffer_unordered(std::cmp::min( - request_count, - MAX_CONCURRENT_REGION_REQUESTS, - )) - .collect::>>() - .await - .into_iter() - .collect::, _>>()?; - - Ok(()) -} - -// Given a list of datasets and region snapshots, send DELETE calls to the -// datasets corresponding Crucible Agent for each running read-only downstairs -// corresponding to the snapshot. -pub(super) async fn delete_crucible_running_snapshots( - log: &Logger, - datasets_and_snapshots: Vec<( - db::model::Dataset, - db::model::RegionSnapshot, - )>, -) -> Result<(), Error> { - let request_count = datasets_and_snapshots.len(); - if request_count == 0 { - return Ok(()); - } - - futures::stream::iter(datasets_and_snapshots) - .map(|(dataset, region_snapshot)| async move { - let url = format!("http://{}", dataset.address()); - let client = CrucibleAgentClient::new(&url); - - delete_crucible_running_snapshot( - &log, - &client, - region_snapshot.region_id, - region_snapshot.snapshot_id, - ) - .await - }) - // Execute the requests concurrently. 
- .buffer_unordered(std::cmp::min( - request_count, - MAX_CONCURRENT_REGION_REQUESTS, - )) - .collect::>>() - .await - .into_iter() - .collect::, _>>()?; - - Ok(()) -} +// Common Pantry operations pub(crate) async fn get_pantry_address( nexus: &Arc, @@ -751,8 +30,6 @@ pub(crate) async fn get_pantry_address( .map_err(ActionError::action_failed) } -// Common Pantry operations - pub(crate) async fn call_pantry_attach_for_disk( log: &slog::Logger, opctx: &OpContext, diff --git a/nexus/src/app/sagas/disk_create.rs b/nexus/src/app/sagas/disk_create.rs index 5c4f5bf1ee..1525340f62 100644 --- a/nexus/src/app/sagas/disk_create.rs +++ b/nexus/src/app/sagas/disk_create.rs @@ -5,7 +5,6 @@ use super::{ common_storage::{ call_pantry_attach_for_disk, call_pantry_detach_for_disk, - delete_crucible_regions, ensure_all_datasets_and_regions, get_pantry_address, }, ActionRegistry, NexusActionContext, NexusSaga, SagaInitError, @@ -345,16 +344,20 @@ async fn sdc_noop(_sagactx: NexusActionContext) -> Result<(), ActionError> { async fn sdc_regions_ensure( sagactx: NexusActionContext, ) -> Result { - let log = sagactx.user_data().log(); + let osagactx = sagactx.user_data(); + let log = osagactx.log(); let disk_id = sagactx.lookup::("disk_id")?; - let datasets_and_regions = ensure_all_datasets_and_regions( - &log, - sagactx.lookup::>( - "datasets_and_regions", - )?, - ) - .await?; + let datasets_and_regions = osagactx + .nexus() + .ensure_all_datasets_and_regions( + &log, + sagactx.lookup::>( + "datasets_and_regions", + )?, + ) + .await + .map_err(ActionError::action_failed)?; let block_size = datasets_and_regions[0].1.block_size; let blocks_per_extent = datasets_and_regions[0].1.extent_size; @@ -550,13 +553,15 @@ async fn sdc_regions_ensure_undo( warn!(log, "sdc_regions_ensure_undo: Deleting crucible regions"); - let result = delete_crucible_regions( - log, - sagactx.lookup::>( - "datasets_and_regions", - )?, - ) - .await; + let result = osagactx + .nexus() + .delete_crucible_regions( + log, + sagactx.lookup::>( + "datasets_and_regions", + )?, + ) + .await; match result { Err(e) => { diff --git a/nexus/src/app/sagas/snapshot_create.rs b/nexus/src/app/sagas/snapshot_create.rs index 53e06e310d..dc42c72daa 100644 --- a/nexus/src/app/sagas/snapshot_create.rs +++ b/nexus/src/app/sagas/snapshot_create.rs @@ -91,8 +91,6 @@ use super::{ common_storage::{ call_pantry_attach_for_disk, call_pantry_detach_for_disk, - delete_crucible_regions, delete_crucible_running_snapshot, - delete_crucible_snapshot, ensure_all_datasets_and_regions, get_pantry_address, }, ActionRegistry, NexusActionContext, NexusSaga, SagaInitError, @@ -380,13 +378,16 @@ async fn ssc_regions_ensure( let destination_volume_id = sagactx.lookup::("destination_volume_id")?; - let datasets_and_regions = ensure_all_datasets_and_regions( - &log, - sagactx.lookup::>( - "datasets_and_regions", - )?, - ) - .await?; + let datasets_and_regions = osagactx + .nexus() + .ensure_all_datasets_and_regions( + &log, + sagactx.lookup::>( + "datasets_and_regions", + )?, + ) + .await + .map_err(ActionError::action_failed)?; let block_size = datasets_and_regions[0].1.block_size; let blocks_per_extent = datasets_and_regions[0].1.extent_size; @@ -458,15 +459,18 @@ async fn ssc_regions_ensure( async fn ssc_regions_ensure_undo( sagactx: NexusActionContext, ) -> Result<(), anyhow::Error> { - let log = sagactx.user_data().log(); + let osagactx = sagactx.user_data(); + let log = osagactx.log(); warn!(log, "ssc_regions_ensure_undo: Deleting crucible regions"); - delete_crucible_regions( - 
-        log,
-        sagactx.lookup::<Vec<(db::model::Dataset, db::model::Region)>>(
-            "datasets_and_regions",
-        )?,
-    )
-    .await?;
+    osagactx
+        .nexus()
+        .delete_crucible_regions(
+            log,
+            sagactx.lookup::<Vec<(db::model::Dataset, db::model::Region)>>(
+                "datasets_and_regions",
+            )?,
+        )
+        .await?;
 
     info!(log, "ssc_regions_ensure_undo: Deleted crucible regions");
     Ok(())
 }
@@ -764,10 +768,9 @@ async fn ssc_send_snapshot_request_to_sled_agent_undo(
     // ... and instruct each of those regions to delete the snapshot.
     for (dataset, region) in datasets_and_regions {
-        let url = format!("http://{}", dataset.address());
-        let client = CrucibleAgentClient::new(&url);
-
-        delete_crucible_snapshot(log, &client, region.id(), snapshot_id)
+        osagactx
+            .nexus()
+            .delete_crucible_snapshot(log, &dataset, region.id(), snapshot_id)
             .await?;
     }
 
@@ -1090,10 +1093,9 @@ async fn ssc_call_pantry_snapshot_for_disk_undo(
     // ... and instruct each of those regions to delete the snapshot.
     for (dataset, region) in datasets_and_regions {
-        let url = format!("http://{}", dataset.address());
-        let client = CrucibleAgentClient::new(&url);
-
-        delete_crucible_snapshot(log, &client, region.id(), snapshot_id)
+        osagactx
+            .nexus()
+            .delete_crucible_snapshot(log, &dataset, region.id(), snapshot_id)
             .await?;
     }
     Ok(())
@@ -1350,16 +1352,15 @@ async fn ssc_start_running_snapshot_undo(
     // ... and instruct each of those regions to delete the running snapshot.
     for (dataset, region) in datasets_and_regions {
-        let url = format!("http://{}", dataset.address());
-        let client = CrucibleAgentClient::new(&url);
-
-        delete_crucible_running_snapshot(
-            &log,
-            &client,
-            region.id(),
-            snapshot_id,
-        )
-        .await?;
+        osagactx
+            .nexus()
+            .delete_crucible_running_snapshot(
+                &log,
+                &dataset,
+                region.id(),
+                snapshot_id,
+            )
+            .await?;
 
         osagactx
             .datastore()
diff --git a/nexus/src/app/sagas/volume_delete.rs b/nexus/src/app/sagas/volume_delete.rs
index 22425a0b99..bfd8e6616c 100644
--- a/nexus/src/app/sagas/volume_delete.rs
+++ b/nexus/src/app/sagas/volume_delete.rs
@@ -23,9 +23,6 @@
 //! resources, and when they are inserted or deleted the accounting needs to
 //! change. Saga nodes must be idempotent in order to work correctly.
 
-use super::common_storage::delete_crucible_regions;
-use super::common_storage::delete_crucible_running_snapshots;
-use super::common_storage::delete_crucible_snapshots;
 use super::ActionRegistry;
 use super::NexusActionContext;
 use super::NexusSaga;
@@ -45,17 +42,11 @@ pub(crate) struct Params {
     pub serialized_authn: authn::saga::Serialized,
     pub volume_id: Uuid,
 }
+
 // volume delete saga: actions
 
 declare_saga_actions! {
     volume_delete;
 
-    // TODO(https://github.com/oxidecomputer/omicron/issues/612):
-    //
-    // We need a way to deal with this operation failing, aside from
-    // propagating the error to the user.
-    //
-    // What if the Sled goes offline? Nexus must ultimately be
-    // responsible for reconciling this scenario.
     DECREASE_CRUCIBLE_RESOURCE_COUNT -> "crucible_resources_to_delete" {
         + svd_decrease_crucible_resource_count
     }
@@ -169,14 +160,16 @@ async fn svd_delete_crucible_regions(
             ))
         })?;
 
-    delete_crucible_regions(log, datasets_and_regions.clone()).await.map_err(
-        |e| {
+    osagactx
+        .nexus()
+        .delete_crucible_regions(log, datasets_and_regions.clone())
+        .await
+        .map_err(|e| {
             ActionError::action_failed(format!(
                 "failed to delete_crucible_regions: {:?}",
                 e,
             ))
-        },
-    )?;
+        })?;
 
     // Remove DB records
     let region_ids_to_delete =
@@ -226,7 +219,9 @@ async fn svd_delete_crucible_running_snapshots(
             ))
         })?;
 
-    delete_crucible_running_snapshots(log, datasets_and_snapshots.clone())
+    osagactx
+        .nexus()
+        .delete_crucible_running_snapshots(log, datasets_and_snapshots.clone())
         .await
         .map_err(|e| {
             ActionError::action_failed(format!(
@@ -267,7 +262,9 @@ async fn svd_delete_crucible_snapshots(
             ))
         })?;
 
-    delete_crucible_snapshots(log, datasets_and_snapshots.clone())
+    osagactx
+        .nexus()
+        .delete_crucible_snapshots(log, datasets_and_snapshots.clone())
         .await
         .map_err(|e| {
             ActionError::action_failed(format!(
@@ -439,7 +436,12 @@ async fn svd_delete_freed_crucible_regions(
     }
 
     // Send DELETE calls to the corresponding Crucible agents
-    delete_crucible_regions(log, vec![(dataset.clone(), region.clone())])
+    osagactx
+        .nexus()
+        .delete_crucible_regions(
+            log,
+            vec![(dataset.clone(), region.clone())],
+        )
         .await
         .map_err(|e| {
             ActionError::action_failed(format!(
diff --git a/nexus/src/app/session.rs b/nexus/src/app/session.rs
index dd3665161a..fba4b2f0b7 100644
--- a/nexus/src/app/session.rs
+++ b/nexus/src/app/session.rs
@@ -157,9 +157,8 @@ impl super::Nexus {
                 | Error::InsufficientCapacity { .. }
                 | Error::TypeVersionMismatch { .. }
                 | Error::Conflict { .. }
-                | Error::NotFound { .. } => {
-                    Reason::UnknownError { source: error }
-                }
+                | Error::NotFound { .. }
+                | Error::Gone => Reason::UnknownError { source: error },
             })?;
         Ok(db_silo_user.silo_id)
     }
diff --git a/nexus/tests/integration_tests/disks.rs b/nexus/tests/integration_tests/disks.rs
index 886504a83b..c19c1852db 100644
--- a/nexus/tests/integration_tests/disks.rs
+++ b/nexus/tests/integration_tests/disks.rs
@@ -11,6 +11,7 @@ use dropshot::HttpErrorResponseBody;
 use http::method::Method;
 use http::StatusCode;
 use nexus_config::RegionAllocationStrategy;
+use nexus_db_model::PhysicalDiskPolicy;
 use nexus_db_queries::context::OpContext;
 use nexus_db_queries::db::datastore::REGION_REDUNDANCY_THRESHOLD;
 use nexus_db_queries::db::fixed_data::{silo::DEFAULT_SILO_ID, FLEET_ID};
@@ -40,11 +41,13 @@ use omicron_common::api::external::NameOrId;
 use omicron_nexus::app::{MAX_DISK_SIZE_BYTES, MIN_DISK_SIZE_BYTES};
 use omicron_nexus::Nexus;
 use omicron_nexus::TestInterfaces as _;
+use omicron_uuid_kinds::GenericUuid;
 use oximeter::types::Datum;
 use oximeter::types::Measurement;
 use sled_agent_client::TestInterfaces as _;
 use std::collections::HashSet;
 use std::sync::Arc;
+use std::time::Duration;
 use uuid::Uuid;
 
 type ControlPlaneTestContext =
@@ -2458,6 +2461,80 @@ async fn test_region_allocation_after_delete(
     assert_eq!(allocated_regions.len(), REGION_REDUNDANCY_THRESHOLD);
 }
 
+#[nexus_test]
+async fn test_no_halt_disk_delete_one_region_on_expunged_agent(
+    cptestctx: &ControlPlaneTestContext,
+) {
+    let nexus = &cptestctx.server.server_context().nexus;
+    let datastore = nexus.datastore();
+    let opctx =
+        OpContext::for_tests(cptestctx.logctx.log.new(o!()), datastore.clone());
+
+    // Create the regular three 10 GiB zpools, each with one dataset.
+    let disk_test = DiskTest::new(&cptestctx).await;
+
+    // Create a disk
+    let client = &cptestctx.external_client;
+    let _project_id = create_project_and_pool(client).await;
+
+    let disk = create_disk(&client, PROJECT_NAME, DISK_NAME).await;
+
+    // Grab the db record now, before the delete
+    let (.., db_disk) = LookupPath::new(&opctx, datastore)
+        .disk_id(disk.identity.id)
+        .fetch()
+        .await
+        .unwrap();
+
+    // Choose one of the datasets, and drop the simulated Crucible agent
+    let zpool = &disk_test.zpools[0];
+    let dataset = &zpool.datasets[0];
+
+    cptestctx.sled_agent.sled_agent.drop_dataset(zpool.id, dataset.id).await;
+
+    // Spawn a task that tries to delete the disk
+    let disk_url = get_disk_url(DISK_NAME);
+    let client = client.clone();
+    let jh = tokio::spawn(async move {
+        NexusRequest::object_delete(&client, &disk_url)
+            .authn_as(AuthnMode::PrivilegedUser)
+            .execute()
+            .await
+            .expect("failed to delete disk")
+    });
+
+    // It won't finish until the dataset is expunged.
+    tokio::time::sleep(Duration::from_secs(3)).await;
+    assert!(!jh.is_finished());
+
+    // Expunge the physical disk
+    let (_, db_zpool) = LookupPath::new(&opctx, datastore)
+        .zpool_id(zpool.id.into_untyped_uuid())
+        .fetch()
+        .await
+        .unwrap();
+
+    datastore
+        .physical_disk_update_policy(
+            &opctx,
+            db_zpool.physical_disk_id,
+            PhysicalDiskPolicy::Expunged,
+        )
+        .await
+        .unwrap();
+
+    // Now, the delete call will finish Ok
+    jh.await.unwrap();
+
+    // Ensure that the disk was properly deleted and all the regions are gone -
+    // Nexus should hard delete the region records in this case.
+
+    let datasets_and_regions =
+        datastore.get_allocated_regions(db_disk.volume_id).await.unwrap();
+
+    assert!(datasets_and_regions.is_empty());
+}
+
 async fn disk_get(client: &ClientTestContext, disk_url: &str) -> Disk {
     NexusRequest::object_get(client, disk_url)
         .authn_as(AuthnMode::PrivilegedUser)
diff --git a/sled-agent/src/sim/sled_agent.rs b/sled-agent/src/sim/sled_agent.rs
index 298a8adc34..19c84da18c 100644
--- a/sled-agent/src/sim/sled_agent.rs
+++ b/sled-agent/src/sim/sled_agent.rs
@@ -870,4 +870,8 @@ impl SledAgent {
     ) {
         *self.fake_zones.lock().await = requested_zones;
     }
+
+    pub async fn drop_dataset(&self, zpool_id: ZpoolUuid, dataset_id: Uuid) {
+        self.storage.lock().await.drop_dataset(zpool_id, dataset_id)
+    }
 }
diff --git a/sled-agent/src/sim/storage.rs b/sled-agent/src/sim/storage.rs
index 6a688f6101..dac2a4cb48 100644
--- a/sled-agent/src/sim/storage.rs
+++ b/sled-agent/src/sim/storage.rs
@@ -543,6 +543,10 @@ impl Zpool {
 
         None
     }
+
+    pub fn drop_dataset(&mut self, id: Uuid) {
+        let _ = self.datasets.remove(&id).expect("Failed to get the dataset");
+    }
 }
 
 /// Simulated representation of all storage on a sled.
@@ -642,6 +646,7 @@ impl Storage {
     pub fn zpools(&self) -> &HashMap<ZpoolUuid, Zpool> {
         &self.zpools
     }
+
     /// Adds a Dataset to the sled's simulated storage.
     pub async fn insert_dataset(
         &mut self,
@@ -757,6 +762,13 @@ impl Storage {
 
         None
     }
+
+    pub fn drop_dataset(&mut self, zpool_id: ZpoolUuid, dataset_id: Uuid) {
+        self.zpools
+            .get_mut(&zpool_id)
+            .expect("Zpool does not exist")
+            .drop_dataset(dataset_id)
+    }
 }
 
 /// Simulated crucible pantry
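
For reviewers, here is a minimal sketch of the calling pattern this patch
introduces, assuming only the `ProgenitorOperationRetry::new(operation,
gone_check).run(log)` API and the `ProgenitorOperationRetryError` variants
that appear earlier in this patch. The operation closure, the
`destination_is_gone` helper, and its `Result<bool, Error>` signature are
hypothetical stand-ins for illustration, not code from this change:

    use omicron_common::api::external::Error;
    use omicron_common::progenitor_operation_retry::{
        ProgenitorOperationRetry, ProgenitorOperationRetryError,
    };

    // Hypothetical "gone" check: in Nexus this would ask CRDB whether the
    // physical disk backing the destination Crucible agent was expunged.
    async fn destination_is_gone() -> Result<bool, Error> {
        Ok(false)
    }

    async fn example(log: &slog::Logger) -> Result<(), Error> {
        // Hypothetical idempotent progenitor call against the destination
        // service; a real caller would reissue an agent request here.
        let op = || async { Ok::<(), progenitor_client::Error<String>>(()) };

        match ProgenitorOperationRetry::new(op, destination_is_gone)
            .run(log)
            .await
        {
            Ok(_) => Ok(()),

            // The destination is permanently gone: stop retrying and surface
            // the new Error::Gone variant (mapped to HTTP 410 above).
            Err(ProgenitorOperationRetryError::Gone) => Err(Error::Gone),

            // Anything else (a progenitor error, or a failure of the gone
            // check itself) is reported as an internal error in this sketch.
            Err(e) => Err(Error::internal_error(&format!("{e:?}"))),
        }
    }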