diff --git a/common/src/api/external/error.rs b/common/src/api/external/error.rs index 6b3b93187f..10731c61c3 100644 --- a/common/src/api/external/error.rs +++ b/common/src/api/external/error.rs @@ -85,6 +85,11 @@ pub enum Error { /// ObjectNotFound instead. #[error("Not found: {}", .message.display_internal())] NotFound { message: MessagePair }, + + /// Access to the target resource is no longer available, and this condition + /// is likely to be permanent. + #[error("Gone")] + Gone, } /// Represents an error message which has an external component, along with @@ -214,7 +219,8 @@ impl Error { | Error::InternalError { .. } | Error::TypeVersionMismatch { .. } | Error::NotFound { .. } - | Error::Conflict { .. } => false, + | Error::Conflict { .. } + | Error::Gone => false, } } @@ -335,7 +341,8 @@ impl Error { match self { Error::ObjectNotFound { .. } | Error::ObjectAlreadyExists { .. } - | Error::Forbidden => self, + | Error::Forbidden + | Error::Gone => self, Error::InvalidRequest { message } => Error::InvalidRequest { message: message.with_internal_context(context), }, @@ -513,6 +520,12 @@ impl From for HttpError { internal_message, } } + + Error::Gone => HttpError::for_client_error( + Some(String::from("Gone")), + http::StatusCode::GONE, + String::from("Gone"), + ), } } } diff --git a/common/src/lib.rs b/common/src/lib.rs index a92237adfa..e4f53cbfab 100644 --- a/common/src/lib.rs +++ b/common/src/lib.rs @@ -26,6 +26,7 @@ pub mod backoff; pub mod cmd; pub mod disk; pub mod ledger; +pub mod progenitor_operation_retry; pub mod update; pub mod vlan; pub mod zpool_name; @@ -79,83 +80,40 @@ impl slog::KV for FileKv { pub const OMICRON_DPD_TAG: &str = "omicron"; -use futures::Future; -use slog::warn; +use crate::api::external::Error; +use crate::progenitor_operation_retry::ProgenitorOperationRetry; +use crate::progenitor_operation_retry::ProgenitorOperationRetryError; +use std::future::Future; /// Retry a progenitor client operation until a known result is returned. /// -/// Saga execution relies on the outcome of an external call being known: since -/// they are idempotent, reissue the external call until a known result comes -/// back. Retry if a communication error is seen, or if another retryable error -/// is seen. -/// -/// Note that retrying is only valid if the call itself is idempotent. +/// See [`ProgenitorOperationRetry`] for more information. +// TODO mark this deprecated, `never_bail` is a bad idea pub async fn retry_until_known_result( log: &slog::Logger, - mut f: F, + f: F, ) -> Result> where F: FnMut() -> Fut, Fut: Future>>, E: std::fmt::Debug, { - backoff::retry_notify( - backoff::retry_policy_internal_service(), - move || { - let fut = f(); - async move { - match fut.await { - Err(progenitor_client::Error::CommunicationError(e)) => { - warn!( - log, - "saw transient communication error {}, retrying...", - e, - ); - - Err(backoff::BackoffError::transient( - progenitor_client::Error::CommunicationError(e), - )) - } - - Err(progenitor_client::Error::ErrorResponse( - response_value, - )) => { - match response_value.status() { - // Retry on 503 or 429 - http::StatusCode::SERVICE_UNAVAILABLE - | http::StatusCode::TOO_MANY_REQUESTS => { - Err(backoff::BackoffError::transient( - progenitor_client::Error::ErrorResponse( - response_value, - ), - )) - } - - // Anything else is a permanent error - _ => Err(backoff::BackoffError::Permanent( - progenitor_client::Error::ErrorResponse( - response_value, - ), - )), - } - } - - Err(e) => { - warn!(log, "saw permanent error {}, aborting", e,); + match ProgenitorOperationRetry::new(f, never_bail).run(log).await { + Ok(v) => Ok(v), - Err(backoff::BackoffError::Permanent(e)) - } + Err(e) => match e { + ProgenitorOperationRetryError::ProgenitorError(e) => Err(e), - Ok(v) => Ok(v), - } + ProgenitorOperationRetryError::Gone + | ProgenitorOperationRetryError::GoneCheckError(_) => { + // ProgenitorOperationRetry::new called with `never_bail` as the + // bail check should never return these variants! + unreachable!(); } }, - |error: progenitor_client::Error<_>, delay| { - warn!( - log, - "failed external call ({:?}), will retry in {:?}", error, delay, - ); - }, - ) - .await + } +} + +async fn never_bail() -> Result { + Ok(false) } diff --git a/common/src/progenitor_operation_retry.rs b/common/src/progenitor_operation_retry.rs new file mode 100644 index 0000000000..8d9537a30c --- /dev/null +++ b/common/src/progenitor_operation_retry.rs @@ -0,0 +1,159 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +use futures::Future; +use slog::warn; +use slog::Logger; + +use crate::api::external::Error; +use crate::backoff::retry_notify; +use crate::backoff::retry_policy_internal_service; +use crate::backoff::BackoffError; + +#[derive(Debug)] +pub enum ProgenitorOperationRetryError { + /// Nexus determined that the operation will never return a known result + /// because the remote server is gone. + Gone, + + /// Attempting to check if the retry loop should be stopped failed + GoneCheckError(Error), + + /// The retry loop progenitor operation saw a permanent client error + ProgenitorError(progenitor_client::Error), +} + +/// Retry a progenitor client operation until a known result is returned, or +/// until something tells us that we should stop trying. +/// +/// Saga execution relies on the outcome of an external call being known: since +/// they are idempotent, reissue the external call until a known result comes +/// back. Retry if a communication error is seen, or if another retryable error +/// is seen. +/// +/// During the retry loop, call the supplied `gone_check` function to see if the +/// retry loop should be aborted: in the cases where Nexus can _know_ that a +/// request will never complete, the retry loop must be aborted. Otherwise, +/// Nexus will indefinitely retry until some known result is returned. +/// +/// Note that retrying is only valid if the `operation` itself is idempotent. +pub struct ProgenitorOperationRetry< + T, + E: std::fmt::Debug, + F: FnMut() -> Fut, + Fut: Future>>, + BF: FnMut() -> BFut, + BFut: Future>, +> { + operation: F, + + /// If Nexus knows that the supplied operation will never successfully + /// complete, then `gone_check` should return true. + gone_check: BF, +} + +impl< + T, + E: std::fmt::Debug, + F: FnMut() -> Fut, + Fut: Future>>, + BF: FnMut() -> BFut, + BFut: Future>, + > ProgenitorOperationRetry +{ + pub fn new(operation: F, gone_check: BF) -> Self { + Self { operation, gone_check } + } + + pub async fn run( + mut self, + log: &Logger, + ) -> Result> { + retry_notify( + retry_policy_internal_service(), + move || { + let gone_check = (self.gone_check)(); + let f = (self.operation)(); + + async move { + match gone_check.await { + Ok(dest_is_gone) => { + if dest_is_gone { + return Err(BackoffError::Permanent( + ProgenitorOperationRetryError::Gone + )); + } + } + + Err(e) => { + return Err(BackoffError::Permanent( + ProgenitorOperationRetryError::GoneCheckError(e) + )); + } + } + + match f.await { + Err(progenitor_client::Error::CommunicationError(e)) => { + warn!( + log, + "saw transient communication error {}, retrying...", + e, + ); + + Err(BackoffError::transient( + ProgenitorOperationRetryError::ProgenitorError( + progenitor_client::Error::CommunicationError(e) + ) + )) + } + + Err(progenitor_client::Error::ErrorResponse( + response_value, + )) => { + match response_value.status() { + // Retry on 503 or 429 + http::StatusCode::SERVICE_UNAVAILABLE + | http::StatusCode::TOO_MANY_REQUESTS => { + Err(BackoffError::transient( + ProgenitorOperationRetryError::ProgenitorError( + progenitor_client::Error::ErrorResponse( + response_value + ) + ) + )) + } + + // Anything else is a permanent error + _ => Err(BackoffError::Permanent( + ProgenitorOperationRetryError::ProgenitorError( + progenitor_client::Error::ErrorResponse( + response_value + ) + ) + )) + } + } + + Err(e) => { + warn!(log, "saw permanent error {}, aborting", e,); + + Err(BackoffError::Permanent( + ProgenitorOperationRetryError::ProgenitorError(e) + )) + } + + Ok(v) => Ok(v), + } + } + }, + |error: ProgenitorOperationRetryError, delay| { + warn!( + log, + "failed external call ({:?}), will retry in {:?}", error, delay, + ); + }, + ) + .await + } +} diff --git a/nexus/db-queries/src/db/datastore/dataset.rs b/nexus/db-queries/src/db/datastore/dataset.rs index 3617f6d7fc..4058675c68 100644 --- a/nexus/db-queries/src/db/datastore/dataset.rs +++ b/nexus/db-queries/src/db/datastore/dataset.rs @@ -15,6 +15,8 @@ use crate::db::error::public_error_from_diesel; use crate::db::error::ErrorHandler; use crate::db::identity::Asset; use crate::db::model::Dataset; +use crate::db::model::PhysicalDisk; +use crate::db::model::PhysicalDiskPolicy; use crate::db::model::Zpool; use crate::db::pagination::paginated; use crate::db::pagination::Paginator; @@ -180,6 +182,56 @@ impl DataStore { Ok(all_datasets) } + + pub async fn dataset_on_in_service_physical_disk( + &self, + // opctx: &OpContext, + dataset_id: Uuid, + ) -> LookupResult { + //let conn = self.pool_connection_authorized(opctx).await?; + let conn = self.pool_connection_unauthorized().await?; + + let dataset = { + use db::schema::dataset::dsl; + + dsl::dataset + .filter(dsl::id.eq(dataset_id)) + .select(Dataset::as_select()) + .first_async::(&*conn) + .await + .map_err(|e| { + public_error_from_diesel(e, ErrorHandler::Server) + })? + }; + + let zpool = { + use db::schema::zpool::dsl; + + dsl::zpool + .filter(dsl::id.eq(dataset.pool_id)) + .select(Zpool::as_select()) + .first_async::(&*conn) + .await + .map_err(|e| { + public_error_from_diesel(e, ErrorHandler::Server) + })? + }; + + let physical_disk = { + use db::schema::physical_disk::dsl; + + dsl::physical_disk + .filter(dsl::id.eq(zpool.physical_disk_id)) + .select(PhysicalDisk::as_select()) + .first_async::(&*conn) + .await + .map_err(|e| { + public_error_from_diesel(e, ErrorHandler::Server) + })? + }; + + Ok(physical_disk.disk_policy == PhysicalDiskPolicy::InService) + } } #[cfg(test)] diff --git a/nexus/src/app/crucible.rs b/nexus/src/app/crucible.rs new file mode 100644 index 0000000000..5a1dead368 --- /dev/null +++ b/nexus/src/app/crucible.rs @@ -0,0 +1,1086 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +//! Functions common to interacting with Crucible agents + +use super::*; + +use anyhow::anyhow; +use crucible_agent_client::types::CreateRegion; +use crucible_agent_client::types::GetSnapshotResponse; +use crucible_agent_client::types::Region; +use crucible_agent_client::types::RegionId; +use crucible_agent_client::types::State as RegionState; +use crucible_agent_client::Client as CrucibleAgentClient; +use futures::StreamExt; +use nexus_db_queries::db; +use nexus_db_queries::db::identity::Asset; +use omicron_common::api::external::Error; +use omicron_common::backoff::{self, BackoffError}; +use omicron_common::progenitor_operation_retry::ProgenitorOperationRetry; +use omicron_common::progenitor_operation_retry::ProgenitorOperationRetryError; +use slog::Logger; + +// Arbitrary limit on concurrency, for operations issued on multiple regions +// within a disk at the same time. +const MAX_CONCURRENT_REGION_REQUESTS: usize = 3; + +/// Provides a way for (with BackoffError) Permanent errors to have a different error type than +/// Transient errors. +#[derive(Debug, thiserror::Error)] +enum WaitError { + #[error("Transient error: {0}")] + Transient(#[from] anyhow::Error), + + #[error("Permanent error: {0}")] + Permanent(#[from] Error), +} + +impl super::Nexus { + fn crucible_agent_client_for_dataset( + &self, + dataset: &db::model::Dataset, + ) -> CrucibleAgentClient { + CrucibleAgentClient::new_with_client( + &format!("http://{}", dataset.address()), + self.reqwest_client.clone(), + ) + } + + /// Return if the Crucible agent is expected to be there and answer Nexus: + /// true means it's gone, and the caller should bail out of the + /// ProgenitorOperationRetry loop. + async fn crucible_agent_gone_check( + &self, + dataset_id: Uuid, + ) -> Result { + let on_in_service_physical_disk = self + .datastore() + .dataset_on_in_service_physical_disk(dataset_id) + .await?; + + Ok(!on_in_service_physical_disk) + } + + /// Call out to Crucible agent and perform region creation. + async fn ensure_region_in_dataset( + &self, + log: &Logger, + dataset: &db::model::Dataset, + region: &db::model::Region, + ) -> Result { + let client = self.crucible_agent_client_for_dataset(dataset); + let dataset_id = dataset.id(); + + let region_request = CreateRegion { + block_size: region.block_size().to_bytes(), + extent_count: region.extent_count(), + extent_size: region.blocks_per_extent(), + // TODO: Can we avoid casting from UUID to string? + // NOTE: This'll require updating the crucible agent client. + id: RegionId(region.id().to_string()), + encrypted: region.encrypted(), + cert_pem: None, + key_pem: None, + root_pem: None, + source: None, + }; + + let create_region = || async { + let region = match ProgenitorOperationRetry::new( + || async { client.region_create(®ion_request).await }, + || async { self.crucible_agent_gone_check(dataset_id).await }, + ) + .run(log) + .await + { + Ok(v) => Ok(v), + + Err(e) => { + error!( + log, + "region_create saw {:?}", + e; + "region_id" => %region.id(), + "dataset_id" => %dataset_id, + ); + + match e { + ProgenitorOperationRetryError::Gone => { + // Return an error if Nexus is unable to create the + // requested region + Err(BackoffError::Permanent(WaitError::Permanent(Error::Gone))) + } + + ProgenitorOperationRetryError::GoneCheckError(_) => { + Err(BackoffError::Permanent(WaitError::Permanent( + Error::internal_error("insufficient permission for crucible_agent_gone_check") + ))) + } + + ProgenitorOperationRetryError::ProgenitorError(e) => match e { + crucible_agent_client::Error::ErrorResponse(rv) => { + match rv.status() { + status if status.is_client_error() => { + Err(BackoffError::Permanent(WaitError::Permanent( + Error::invalid_request(&rv.message) + ))) + } + + _ => { + Err(BackoffError::Permanent(WaitError::Permanent( + Error::internal_error(&rv.message) + ))) + } + } + } + + _ => { + Err(BackoffError::Permanent(WaitError::Permanent( + Error::internal_error("unexpected failure during `region_delete`") + ))) + } + } + } + } + }?; + + match region.state { + RegionState::Requested => { + Err(BackoffError::transient(WaitError::Transient(anyhow!( + "Region creation in progress" + )))) + } + + RegionState::Created => Ok(region), + + _ => Err(BackoffError::Permanent(WaitError::Permanent( + Error::internal_error(&format!( + "Failed to create region, unexpected state: {:?}", + region.state + )), + ))), + } + }; + + let log_create_failure = |_, delay| { + warn!( + log, + "Region requested, not yet created. Retrying in {:?}", + delay; + "dataset" => %dataset.id(), + "region" => %region.id(), + ); + }; + + let region = backoff::retry_notify( + backoff::retry_policy_internal_service(), + create_region, + log_create_failure, + ) + .await + .map_err(|e| match e { + WaitError::Transient(e) => { + // The backoff crate can be configured with a maximum elapsed + // time before giving up, which means that Transient could be + // returned here. Our current policies do **not** set this + // though. + Error::internal_error(&e.to_string()) + } + + WaitError::Permanent(e) => e, + })?; + + Ok(region.into_inner()) + } + + /// Returns a Ok(Some(Region)) if a region with id {region_id} exists, + /// Ok(None) if it does not (a 404 was seen), and Err otherwise. + async fn maybe_get_crucible_region( + &self, + log: &Logger, + dataset: &db::model::Dataset, + region_id: Uuid, + ) -> Result, Error> { + let client = self.crucible_agent_client_for_dataset(dataset); + let dataset_id = dataset.id(); + + let result = ProgenitorOperationRetry::new( + || async { + client.region_get(&RegionId(region_id.to_string())).await + }, + || async { self.crucible_agent_gone_check(dataset_id).await }, + ) + .run(log) + .await; + + match result { + Ok(v) => Ok(Some(v.into_inner())), + + Err(e) => { + error!( + log, + "region_get saw {:?}", + e; + "region_id" => %region_id, + "dataset_id" => %dataset_id, + ); + + match e { + ProgenitorOperationRetryError::Gone => { + // Return an error if Nexus is unable to query the + // dataset's agent for the requested region + Err(Error::Gone) + } + + ProgenitorOperationRetryError::GoneCheckError(_) => { + Err(Error::internal_error( + "insufficient permission for crucible_agent_gone_check" + )) + } + + ProgenitorOperationRetryError::ProgenitorError(e) => match e { + crucible_agent_client::Error::ErrorResponse(rv) => { + match rv.status() { + http::StatusCode::NOT_FOUND => { + Ok(None) + } + + status if status.is_client_error() => { + Err(Error::invalid_request(&rv.message)) + } + + _ => { + Err(Error::internal_error(&rv.message)) + } + } + } + + _ => { + Err(Error::internal_error( + "unexpected failure during `region_get`", + )) + } + } + } + } + } + } + + async fn get_crucible_region_snapshots( + &self, + log: &Logger, + dataset: &db::model::Dataset, + region_id: Uuid, + ) -> Result { + let client = self.crucible_agent_client_for_dataset(dataset); + let dataset_id = dataset.id(); + + let result = ProgenitorOperationRetry::new( + || async { + client + .region_get_snapshots(&RegionId(region_id.to_string())) + .await + }, + || async { self.crucible_agent_gone_check(dataset_id).await }, + ) + .run(log) + .await; + + match result { + Ok(v) => Ok(v.into_inner()), + + Err(e) => { + error!( + log, + "region_get_snapshots saw {:?}", + e; + "region_id" => %region_id, + "dataset_id" => %dataset_id, + ); + + match e { + ProgenitorOperationRetryError::Gone => { + // Return an error if Nexus is unable to query the + // dataset's agent for the requested region 's snapshots + Err(Error::Gone) + } + + ProgenitorOperationRetryError::GoneCheckError(_) => { + Err(Error::internal_error( + "insufficient permission for crucible_agent_gone_check" + )) + } + + ProgenitorOperationRetryError::ProgenitorError(e) => match e { + crucible_agent_client::Error::ErrorResponse(rv) => { + match rv.status() { + status if status.is_client_error() => { + Err(Error::invalid_request(&rv.message)) + } + + _ => { + Err(Error::internal_error(&rv.message)) + } + } + } + + _ => { + Err(Error::internal_error( + "unexpected failure during `region_get`", + )) + } + } + } + } + } + } + + /// Send a region deletion request + async fn request_crucible_region_delete( + &self, + log: &Logger, + dataset: &db::model::Dataset, + region_id: Uuid, + ) -> Result<(), Error> { + let client = self.crucible_agent_client_for_dataset(dataset); + let dataset_id = dataset.id(); + + let result = ProgenitorOperationRetry::new( + || async { + client.region_delete(&RegionId(region_id.to_string())).await + }, + || async { self.crucible_agent_gone_check(dataset_id).await }, + ) + .run(log) + .await; + + match result { + Ok(_) => Ok(()), + + Err(e) => { + error!( + log, + "region_delete saw {:?}", + e; + "region_id" => %region_id, + "dataset_id" => %dataset.id(), + ); + + match e { + ProgenitorOperationRetryError::Gone => { + // Return Ok if the dataset's agent is gone, no delete + // call is required. + Ok(()) + } + + ProgenitorOperationRetryError::GoneCheckError(_) => { + Err(Error::internal_error( + "insufficient permission for crucible_agent_gone_check" + )) + } + + ProgenitorOperationRetryError::ProgenitorError(e) => match e { + crucible_agent_client::Error::ErrorResponse(rv) => { + match rv.status() { + status if status.is_client_error() => { + Err(Error::invalid_request(&rv.message)) + } + + _ => { + Err(Error::internal_error(&rv.message)) + } + } + } + + _ => { + Err(Error::internal_error( + "unexpected failure during `region_delete`", + )) + } + } + } + } + } + } + + /// Send a running snapshot deletion request + async fn request_crucible_running_snapshot_delete( + &self, + log: &Logger, + dataset: &db::model::Dataset, + region_id: Uuid, + snapshot_id: Uuid, + ) -> Result<(), Error> { + let client = self.crucible_agent_client_for_dataset(dataset); + let dataset_id = dataset.id(); + + let result = ProgenitorOperationRetry::new( + || async { + client + .region_delete_running_snapshot( + &RegionId(region_id.to_string()), + &snapshot_id.to_string(), + ) + .await + }, + || async { self.crucible_agent_gone_check(dataset_id).await }, + ) + .run(log) + .await; + + match result { + Ok(_) => Ok(()), + + Err(e) => { + error!( + log, + "region_delete_running_snapshot saw {:?}", + e; + "dataset_id" => %dataset_id, + "region_id" => %region_id, + "snapshot_id" => %snapshot_id, + ); + + match e { + ProgenitorOperationRetryError::Gone => { + // Return Ok if the dataset's agent is gone, no delete + // call is required. + Ok(()) + } + + ProgenitorOperationRetryError::GoneCheckError(_) => { + Err(Error::internal_error( + "insufficient permission for crucible_agent_gone_check" + )) + } + + ProgenitorOperationRetryError::ProgenitorError(e) => match e { + crucible_agent_client::Error::ErrorResponse(rv) => { + match rv.status() { + status if status.is_client_error() => { + Err(Error::invalid_request(&rv.message)) + } + + _ => { + Err(Error::internal_error(&rv.message)) + } + } + } + + _ => { + Err(Error::internal_error( + "unexpected failure during `region_delete_running_snapshot`", + )) + } + } + } + } + } + } + + /// Send a snapshot deletion request + async fn request_crucible_snapshot_delete( + &self, + log: &Logger, + dataset: &db::model::Dataset, + region_id: Uuid, + snapshot_id: Uuid, + ) -> Result<(), Error> { + let client = self.crucible_agent_client_for_dataset(dataset); + let dataset_id = dataset.id(); + + let result = ProgenitorOperationRetry::new( + || async { + client + .region_delete_snapshot( + &RegionId(region_id.to_string()), + &snapshot_id.to_string(), + ) + .await + }, + || async { self.crucible_agent_gone_check(dataset_id).await }, + ) + .run(log) + .await; + + match result { + Ok(_) => Ok(()), + + Err(e) => { + error!( + log, + "region_delete_snapshot saw {:?}", + e; + "dataset_id" => %dataset_id, + "region_id" => %region_id, + "snapshot_id" => %snapshot_id, + ); + + match e { + ProgenitorOperationRetryError::Gone => { + // Return Ok if the dataset's agent is gone, no delete + // call is required. + Ok(()) + } + + ProgenitorOperationRetryError::GoneCheckError(_) => { + Err(Error::internal_error( + "insufficient permission for crucible_agent_gone_check" + )) + } + + ProgenitorOperationRetryError::ProgenitorError(e) => match e { + crucible_agent_client::Error::ErrorResponse(rv) => { + match rv.status() { + status if status.is_client_error() => { + Err(Error::invalid_request(&rv.message)) + } + + _ => { + Err(Error::internal_error(&rv.message)) + } + } + } + + _ => { + Err(Error::internal_error( + "unexpected failure during `region_delete_snapshot`", + )) + } + } + } + } + } + } + + /// Call out to a Crucible agent to delete a region + async fn delete_crucible_region( + &self, + log: &Logger, + dataset: &db::model::Dataset, + region_id: Uuid, + ) -> Result<(), Error> { + // If the region never existed, then a `GET` will return 404, and so + // will a `DELETE`. Catch this case, and return Ok if the region never + // existed. This can occur if an `ensure_all_datasets_and_regions` + // partially fails. + + match self.maybe_get_crucible_region(log, dataset, region_id).await { + Ok(Some(_)) => { + // region found, proceed with deleting + } + + Ok(None) => { + // region never exited, return Ok + return Ok(()); + } + + Err(e) => match e { + Error::Gone => { + // Return Ok if the dataset's agent is gone, no delete call + // is required. + return Ok(()); + } + + _ => { + return Err(e); + } + }, + } + + // Past here, the region exists (or existed at some point): ensure it is + // deleted. Request the deletion (which is idempotent), then wait for + // the appropriate state change. + + self.request_crucible_region_delete(log, dataset, region_id).await?; + + // Wait until the region is deleted + + backoff::retry_notify( + backoff::retry_policy_internal_service_aggressive(), + || async { + let region = match self.maybe_get_crucible_region( + log, + dataset, + region_id, + ).await { + Ok(None) => { + Err(BackoffError::Permanent(WaitError::Permanent( + Error::internal_error( + "dataset {dataset_id} region {region_id} is missing now!", + ) + ))) + } + + Ok(Some(v)) => Ok(v), + + Err(e) => match e { + Error::Gone => { + // Return Ok if the dataset's agent is gone, no + // delete call is required. + return Ok(()); + } + + _ => { + Err(BackoffError::Permanent(WaitError::Permanent(e))) + } + } + }?; + + match region.state { + RegionState::Tombstoned => Err(BackoffError::transient( + WaitError::Transient(anyhow!("region not deleted yet")), + )), + + RegionState::Destroyed => { + info!( + log, + "region deleted"; + "region_id" => %region_id, + ); + + Ok(()) + } + + _ => Err(BackoffError::transient(WaitError::Transient( + anyhow!("region unexpected state {:?}", region.state), + ))), + } + }, + |e: WaitError, delay| { + info!( + log, + "{:?}, trying again in {:?}", + e, + delay; + "region_id" => %region_id, + ); + }, + ) + .await + .map_err(|e| match e { + WaitError::Transient(e) => { + // The backoff crate can be configured with a maximum elapsed time + // before giving up, which means that Transient could be returned + // here. Our current policies do **not** set this though. + Error::internal_error(&e.to_string()) + } + + WaitError::Permanent(e) => e, + }) + } + + async fn delete_crucible_running_snapshot_impl( + &self, + log: &Logger, + dataset: &db::model::Dataset, + region_id: Uuid, + snapshot_id: Uuid, + ) -> Result<(), Error> { + // request running snapshot deletion + + self.request_crucible_running_snapshot_delete( + log, + dataset, + region_id, + snapshot_id, + ) + .await?; + + // `region_delete_running_snapshot` is only a request: wait until + // running snapshot is deleted + backoff::retry_notify( + backoff::retry_policy_internal_service_aggressive(), + || async { + let response = match self.get_crucible_region_snapshots( + log, + dataset, + region_id, + ) + .await { + Ok(v) => Ok(v), + Err(e) => match e { + Error::Gone => { + // Return Ok if the dataset's agent is gone, no + // delete call is required. + return Ok(()); + } + + _ => { + Err(BackoffError::Permanent(WaitError::Permanent(e))) + } + } + }?; + + match response.running_snapshots.get(&snapshot_id.to_string()) { + Some(running_snapshot) => { + info!( + log, + "running_snapshot is Some, state is {}", + running_snapshot.state.to_string(); + "region_id" => %region_id, + "snapshot_id" => %snapshot_id, + ); + + match running_snapshot.state { + RegionState::Tombstoned => { + Err(BackoffError::transient( + WaitError::Transient(anyhow!( + "running_snapshot tombstoned, not deleted yet", + ) + ))) + } + + RegionState::Destroyed => { + info!( + log, + "running_snapshot deleted", + ); + + Ok(()) + } + + _ => { + Err(BackoffError::transient( + WaitError::Transient(anyhow!( + "running_snapshot unexpected state", + ) + ))) + } + } + } + + None => { + // deleted? + info!( + log, + "running_snapshot is None"; + "region_id" => %region_id, + "snapshot_id" => %snapshot_id, + ); + + // break here - it's possible that the running snapshot + // record was GCed, and it won't come back. + Ok(()) + } + } + }, + |e: WaitError, delay| { + info!( + log, + "{:?}, trying again in {:?}", + e, + delay; + "region_id" => %region_id, + "snapshot_id" => %snapshot_id, + ); + } + ) + .await + .map_err(|e| match e { + WaitError::Transient(e) => { + // The backoff crate can be configured with a maximum elapsed time + // before giving up, which means that Transient could be returned + // here. Our current policies do **not** set this though. + Error::internal_error(&e.to_string()) + } + + WaitError::Permanent(e) => { + e + } + }) + } + + pub async fn delete_crucible_snapshot( + &self, + log: &Logger, + dataset: &db::model::Dataset, + region_id: Uuid, + snapshot_id: Uuid, + ) -> Result<(), Error> { + self.delete_crucible_snapshot_impl(log, dataset, region_id, snapshot_id) + .await + } + + async fn delete_crucible_snapshot_impl( + &self, + log: &Logger, + dataset: &db::model::Dataset, + region_id: Uuid, + snapshot_id: Uuid, + ) -> Result<(), Error> { + // Unlike other Crucible agent endpoints, this one is synchronous in that it + // is not only a request to the Crucible agent: `zfs destroy` is performed + // right away. However this is still a request to illumos that may not take + // effect right away. Wait until the snapshot no longer appears in the list + // of region snapshots, meaning it was not returned from `zfs list`. + + let dataset_id = dataset.id(); + + info!( + log, + "requesting region snapshot delete"; + "dataset_id" => %dataset_id, + "region_id" => %region_id, + "snapshot_id" => %snapshot_id, + ); + + self.request_crucible_snapshot_delete( + log, + dataset, + region_id, + snapshot_id, + ) + .await?; + + backoff::retry_notify( + backoff::retry_policy_internal_service_aggressive(), + || async { + let response = match self + .get_crucible_region_snapshots(log, dataset, region_id) + .await + { + Ok(v) => Ok(v), + + Err(e) => match e { + Error::Gone => { + // Return Ok if the dataset's agent is gone, no + // delete call is required. + return Ok(()); + } + + _ => Err(BackoffError::Permanent( + WaitError::Permanent(e), + )), + }, + }?; + + if response + .snapshots + .iter() + .any(|x| x.name == snapshot_id.to_string()) + { + info!( + log, + "snapshot still exists, waiting"; + "dataset_id" => %dataset_id, + "region_id" => %region_id, + "snapshot_id" => %snapshot_id, + ); + + Err(BackoffError::transient(WaitError::Transient(anyhow!( + "snapshot not deleted yet", + )))) + } else { + info!( + log, + "snapshot deleted"; + "dataset_id" => %dataset_id, + "region_id" => %region_id, + "snapshot_id" => %snapshot_id, + ); + + Ok(()) + } + }, + |e: WaitError, delay| { + info!( + log, + "{:?}, trying again in {:?}", + e, + delay; + "dataset_id" => %dataset_id, + "region_id" => %region_id, + "snapshot_id" => %snapshot_id, + ); + }, + ) + .await + .map_err(|e| match e { + WaitError::Transient(e) => { + // The backoff crate can be configured with a maximum elapsed time + // before giving up, which means that Transient could be returned + // here. Our current policies do **not** set this though. + Error::internal_error(&e.to_string()) + } + + WaitError::Permanent(e) => e, + }) + } + + // PUBLIC API + + pub async fn ensure_all_datasets_and_regions( + &self, + log: &Logger, + datasets_and_regions: Vec<(db::model::Dataset, db::model::Region)>, + ) -> Result, Error> { + let request_count = datasets_and_regions.len(); + if request_count == 0 { + return Ok(vec![]); + } + + // Allocate regions, and additionally return the dataset that the region was + // allocated in. + let datasets_and_regions: Vec<(db::model::Dataset, Region)> = + futures::stream::iter(datasets_and_regions) + .map(|(dataset, region)| async move { + match self + .ensure_region_in_dataset(log, &dataset, ®ion) + .await + { + Ok(result) => Ok((dataset, result)), + Err(e) => Err(e), + } + }) + // Execute the allocation requests concurrently. + .buffer_unordered(std::cmp::min( + request_count, + MAX_CONCURRENT_REGION_REQUESTS, + )) + .collect::>>() + .await + .into_iter() + .collect::, Error>>( + )?; + + // Assert each region has the same block size, otherwise Volume creation + // will fail. + let all_region_have_same_block_size = datasets_and_regions + .windows(2) + .all(|w| w[0].1.block_size == w[1].1.block_size); + + if !all_region_have_same_block_size { + return Err(Error::internal_error( + "volume creation will fail due to block size mismatch", + )); + } + + Ok(datasets_and_regions) + } + + /// Given a list of datasets and regions, send DELETE calls to the datasets + /// corresponding Crucible Agent for each region. + pub async fn delete_crucible_regions( + &self, + log: &Logger, + datasets_and_regions: Vec<(db::model::Dataset, db::model::Region)>, + ) -> Result<(), Error> { + let request_count = datasets_and_regions.len(); + if request_count == 0 { + return Ok(()); + } + + futures::stream::iter(datasets_and_regions) + .map(|(dataset, region)| async move { + self.delete_crucible_region(log, &dataset, region.id()).await + }) + // Execute the requests concurrently. + .buffer_unordered(std::cmp::min( + request_count, + MAX_CONCURRENT_REGION_REQUESTS, + )) + .collect::>>() + .await + .into_iter() + .collect::, _>>()?; + + Ok(()) + } + + /// Ensure that a Crucible "running snapshot" is deleted. + pub async fn delete_crucible_running_snapshot( + &self, + log: &Logger, + dataset: &db::model::Dataset, + region_id: Uuid, + snapshot_id: Uuid, + ) -> Result<(), Error> { + self.delete_crucible_running_snapshot_impl( + log, + dataset, + region_id, + snapshot_id, + ) + .await + } + + /// Given a list of datasets and region snapshots, send DELETE calls to the + /// datasets corresponding Crucible Agent for each running read-only + /// downstairs corresponding to the snapshot. + pub async fn delete_crucible_running_snapshots( + &self, + log: &Logger, + datasets_and_snapshots: Vec<( + db::model::Dataset, + db::model::RegionSnapshot, + )>, + ) -> Result<(), Error> { + let request_count = datasets_and_snapshots.len(); + if request_count == 0 { + return Ok(()); + } + + futures::stream::iter(datasets_and_snapshots) + .map(|(dataset, region_snapshot)| async move { + self.delete_crucible_running_snapshot_impl( + &log, + &dataset, + region_snapshot.region_id, + region_snapshot.snapshot_id, + ) + .await + }) + // Execute the requests concurrently. + .buffer_unordered(std::cmp::min( + request_count, + MAX_CONCURRENT_REGION_REQUESTS, + )) + .collect::>>() + .await + .into_iter() + .collect::, _>>()?; + + Ok(()) + } + + /// Given a list of datasets and region snapshots, send DELETE calls to the + /// datasets corresponding Crucible Agent for each snapshot. + pub async fn delete_crucible_snapshots( + &self, + log: &Logger, + datasets_and_snapshots: Vec<( + db::model::Dataset, + db::model::RegionSnapshot, + )>, + ) -> Result<(), Error> { + let request_count = datasets_and_snapshots.len(); + if request_count == 0 { + return Ok(()); + } + + futures::stream::iter(datasets_and_snapshots) + .map(|(dataset, region_snapshot)| async move { + self.delete_crucible_snapshot_impl( + &log, + &dataset, + region_snapshot.region_id, + region_snapshot.snapshot_id, + ) + .await + }) + // Execute the requests concurrently. + .buffer_unordered(std::cmp::min( + request_count, + MAX_CONCURRENT_REGION_REQUESTS, + )) + .collect::>>() + .await + .into_iter() + .collect::, _>>()?; + + Ok(()) + } +} diff --git a/nexus/src/app/mod.rs b/nexus/src/app/mod.rs index 4b77788c96..a21db65c0b 100644 --- a/nexus/src/app/mod.rs +++ b/nexus/src/app/mod.rs @@ -44,6 +44,7 @@ pub(crate) mod background; mod bfd; mod bgp; mod certificate; +mod crucible; mod deployment; mod device_auth; mod disk; diff --git a/nexus/src/app/sagas/common_storage.rs b/nexus/src/app/sagas/common_storage.rs index 0fe14f6d2a..1fe8d76783 100644 --- a/nexus/src/app/sagas/common_storage.rs +++ b/nexus/src/app/sagas/common_storage.rs @@ -7,737 +7,16 @@ use super::*; use crate::Nexus; -use anyhow::anyhow; -use crucible_agent_client::{ - types::{CreateRegion, RegionId, State as RegionState}, - Client as CrucibleAgentClient, -}; -use futures::StreamExt; use internal_dns::ServiceName; use nexus_db_queries::authz; use nexus_db_queries::context::OpContext; use nexus_db_queries::db; -use nexus_db_queries::db::identity::Asset; use nexus_db_queries::db::lookup::LookupPath; use omicron_common::api::external::Error; -use omicron_common::backoff::{self, BackoffError}; use omicron_common::retry_until_known_result; -use slog::Logger; use std::net::SocketAddrV6; -// Arbitrary limit on concurrency, for operations issued on multiple regions -// within a disk at the same time. -const MAX_CONCURRENT_REGION_REQUESTS: usize = 3; - -/// Call out to Crucible agent and perform region creation. -pub(crate) async fn ensure_region_in_dataset( - log: &Logger, - dataset: &db::model::Dataset, - region: &db::model::Region, -) -> Result { - let url = format!("http://{}", dataset.address()); - let client = CrucibleAgentClient::new(&url); - - let region_request = CreateRegion { - block_size: region.block_size().to_bytes(), - extent_count: region.extent_count(), - extent_size: region.blocks_per_extent(), - // TODO: Can we avoid casting from UUID to string? - // NOTE: This'll require updating the crucible agent client. - id: RegionId(region.id().to_string()), - encrypted: region.encrypted(), - cert_pem: None, - key_pem: None, - root_pem: None, - source: None, - }; - - let create_region = || async { - let region = client - .region_create(®ion_request) - .await - .map_err(|e| BackoffError::Permanent(e.into()))?; - match region.state { - RegionState::Requested => Err(BackoffError::transient(anyhow!( - "Region creation in progress" - ))), - - RegionState::Created => Ok(region), - - _ => Err(BackoffError::Permanent(anyhow!( - "Failed to create region, unexpected state: {:?}", - region.state - ))), - } - }; - - let log_create_failure = |_, delay| { - warn!( - log, - "Region requested, not yet created. Retrying in {:?}", - delay; - "region" => %region.id(), - ); - }; - - let region = backoff::retry_notify( - backoff::retry_policy_internal_service(), - create_region, - log_create_failure, - ) - .await - .map_err(|e| Error::internal_error(&e.to_string()))?; - - Ok(region.into_inner()) -} - -pub(crate) async fn ensure_all_datasets_and_regions( - log: &Logger, - datasets_and_regions: Vec<(db::model::Dataset, db::model::Region)>, -) -> Result< - Vec<(db::model::Dataset, crucible_agent_client::types::Region)>, - ActionError, -> { - let request_count = datasets_and_regions.len(); - - // Allocate regions, and additionally return the dataset that the region was - // allocated in. - let datasets_and_regions: Vec<( - db::model::Dataset, - crucible_agent_client::types::Region, - )> = futures::stream::iter(datasets_and_regions) - .map(|(dataset, region)| async move { - match ensure_region_in_dataset(log, &dataset, ®ion).await { - Ok(result) => Ok((dataset, result)), - Err(e) => Err(e), - } - }) - // Execute the allocation requests concurrently. - .buffer_unordered(std::cmp::min( - request_count, - MAX_CONCURRENT_REGION_REQUESTS, - )) - .collect::, - >>() - .await - .into_iter() - .collect::, - Error, - >>() - .map_err(ActionError::action_failed)?; - - // Assert each region has the same block size, otherwise Volume creation - // will fail. - let all_region_have_same_block_size = datasets_and_regions - .windows(2) - .all(|w| w[0].1.block_size == w[1].1.block_size); - - if !all_region_have_same_block_size { - return Err(ActionError::action_failed(Error::internal_error( - "volume creation will fail due to block size mismatch", - ))); - } - - Ok(datasets_and_regions) -} - -pub(super) async fn delete_crucible_region( - log: &Logger, - client: &CrucibleAgentClient, - region_id: Uuid, -) -> Result<(), Error> { - // If the region never existed, then a `GET` will return 404, and so will a - // `DELETE`. Catch this case, and return Ok if the region never existed. - // This can occur if an `ensure_all_datasets_and_regions` partially fails. - let result = retry_until_known_result(log, || async { - client.region_get(&RegionId(region_id.to_string())).await - }) - .await; - - if let Err(e) = result { - error!( - log, - "delete_crucible_region: region_get saw {:?}", - e; - "region_id" => %region_id, - ); - match e { - crucible_agent_client::Error::ErrorResponse(rv) => { - match rv.status() { - http::StatusCode::NOT_FOUND => { - // Bail out here! - return Ok(()); - } - - status if status.is_client_error() => { - return Err(Error::invalid_request(&rv.message)); - } - - _ => { - return Err(Error::internal_error(&rv.message)); - } - } - } - - _ => { - return Err(Error::internal_error( - "unexpected failure during `region_get`", - )); - } - } - } - - // Past here, the region exists: ensure it is deleted. - - retry_until_known_result(log, || async { - client.region_delete(&RegionId(region_id.to_string())).await - }) - .await - .map_err(|e| { - error!( - log, - "delete_crucible_region: region_delete saw {:?}", - e; - "region_id" => %region_id, - ); - match e { - crucible_agent_client::Error::ErrorResponse(rv) => { - match rv.status() { - status if status.is_client_error() => { - Error::invalid_request(&rv.message) - } - _ => Error::internal_error(&rv.message), - } - } - _ => Error::internal_error( - "unexpected failure during `region_delete`", - ), - } - })?; - - #[derive(Debug, thiserror::Error)] - enum WaitError { - #[error("Transient error: {0}")] - Transient(#[from] anyhow::Error), - - #[error("Permanent error: {0}")] - Permanent(#[from] Error), - } - - // `region_delete` is only a request: wait until the region is - // deleted - backoff::retry_notify( - backoff::retry_policy_internal_service_aggressive(), - || async { - let region = retry_until_known_result(log, || async { - client.region_get(&RegionId(region_id.to_string())).await - }) - .await - .map_err(|e| { - error!( - log, - "delete_crucible_region: region_get saw {:?}", - e; - "region_id" => %region_id, - ); - - match e { - crucible_agent_client::Error::ErrorResponse(rv) => { - match rv.status() { - status if status.is_client_error() => { - BackoffError::Permanent(WaitError::Permanent( - Error::invalid_request(&rv.message), - )) - } - _ => BackoffError::Permanent(WaitError::Permanent( - Error::internal_error(&rv.message), - )), - } - } - _ => BackoffError::Permanent(WaitError::Permanent( - Error::internal_error( - "unexpected failure during `region_get`", - ), - )), - } - })?; - - match region.state { - RegionState::Tombstoned => Err(BackoffError::transient( - WaitError::Transient(anyhow!("region not deleted yet")), - )), - - RegionState::Destroyed => { - info!( - log, - "region deleted"; - "region_id" => %region_id, - ); - - Ok(()) - } - - _ => Err(BackoffError::transient(WaitError::Transient( - anyhow!("region unexpected state {:?}", region.state), - ))), - } - }, - |e: WaitError, delay| { - info!( - log, - "{:?}, trying again in {:?}", - e, - delay; - "region_id" => %region_id, - ); - }, - ) - .await - .map_err(|e| match e { - WaitError::Transient(e) => { - // The backoff crate can be configured with a maximum elapsed time - // before giving up, which means that Transient could be returned - // here. Our current policies do **not** set this though. - Error::internal_error(&e.to_string()) - } - - WaitError::Permanent(e) => e, - }) -} - -// Given a list of datasets and regions, send DELETE calls to the datasets -// corresponding Crucible Agent for each region. -pub(super) async fn delete_crucible_regions( - log: &Logger, - datasets_and_regions: Vec<(db::model::Dataset, db::model::Region)>, -) -> Result<(), Error> { - let request_count = datasets_and_regions.len(); - if request_count == 0 { - return Ok(()); - } - - futures::stream::iter(datasets_and_regions) - .map(|(dataset, region)| async move { - let url = format!("http://{}", dataset.address()); - let client = CrucibleAgentClient::new(&url); - - delete_crucible_region(&log, &client, region.id()).await - }) - // Execute the requests concurrently. - .buffer_unordered(std::cmp::min( - request_count, - MAX_CONCURRENT_REGION_REQUESTS, - )) - .collect::>>() - .await - .into_iter() - .collect::, _>>()?; - - Ok(()) -} - -pub(super) async fn delete_crucible_running_snapshot( - log: &Logger, - client: &CrucibleAgentClient, - region_id: Uuid, - snapshot_id: Uuid, -) -> Result<(), Error> { - // delete running snapshot - retry_until_known_result(log, || async { - client - .region_delete_running_snapshot( - &RegionId(region_id.to_string()), - &snapshot_id.to_string(), - ) - .await - }) - .await - .map_err(|e| { - error!( - log, - "delete_crucible_running_snapshot: region_delete_running_snapshot saw {:?}", - e; - "region_id" => %region_id, - "snapshot_id" => %snapshot_id, - ); - match e { - crucible_agent_client::Error::ErrorResponse(rv) => { - match rv.status() { - status if status.is_client_error() => { - Error::invalid_request(&rv.message) - } - _ => Error::internal_error(&rv.message), - } - } - _ => Error::internal_error( - "unexpected failure during `region_delete_running_snapshot`", - ), - } - })?; - - #[derive(Debug, thiserror::Error)] - enum WaitError { - #[error("Transient error: {0}")] - Transient(#[from] anyhow::Error), - - #[error("Permanent error: {0}")] - Permanent(#[from] Error), - } - - // `region_delete_running_snapshot` is only a request: wait until - // running snapshot is deleted - backoff::retry_notify( - backoff::retry_policy_internal_service_aggressive(), - || async { - let snapshot = retry_until_known_result(log, || async { - client.region_get_snapshots( - &RegionId(region_id.to_string()), - ).await - }) - .await - .map_err(|e| { - error!( - log, - "delete_crucible_running_snapshot: region_get_snapshots saw {:?}", - e; - "region_id" => %region_id, - "snapshot_id" => %snapshot_id, - ); - - match e { - crucible_agent_client::Error::ErrorResponse(rv) => { - match rv.status() { - status if status.is_client_error() => { - BackoffError::Permanent( - WaitError::Permanent( - Error::invalid_request(&rv.message) - ) - ) - } - _ => BackoffError::Permanent( - WaitError::Permanent( - Error::internal_error(&rv.message) - ) - ) - } - } - _ => BackoffError::Permanent( - WaitError::Permanent( - Error::internal_error( - "unexpected failure during `region_get_snapshots`", - ) - ) - ) - } - })?; - - match snapshot.running_snapshots.get(&snapshot_id.to_string()) { - Some(running_snapshot) => { - info!( - log, - "running_snapshot is Some, state is {}", - running_snapshot.state.to_string(); - "region_id" => %region_id, - "snapshot_id" => %snapshot_id, - ); - - match running_snapshot.state { - RegionState::Tombstoned => { - Err(BackoffError::transient( - WaitError::Transient(anyhow!( - "running_snapshot tombstoned, not deleted yet", - ) - ))) - } - - RegionState::Destroyed => { - info!( - log, - "running_snapshot deleted", - ); - - Ok(()) - } - - _ => { - Err(BackoffError::transient( - WaitError::Transient(anyhow!( - "running_snapshot unexpected state", - ) - ))) - } - } - } - - None => { - // deleted? - info!( - log, - "running_snapshot is None"; - "region_id" => %region_id, - "snapshot_id" => %snapshot_id, - ); - - // break here - it's possible that the running snapshot - // record was GCed, and it won't come back. - Ok(()) - } - } - }, - |e: WaitError, delay| { - info!( - log, - "{:?}, trying again in {:?}", - e, - delay; - "region_id" => %region_id, - "snapshot_id" => %snapshot_id, - ); - } - ) - .await - .map_err(|e| match e { - WaitError::Transient(e) => { - // The backoff crate can be configured with a maximum elapsed time - // before giving up, which means that Transient could be returned - // here. Our current policies do **not** set this though. - Error::internal_error(&e.to_string()) - } - - WaitError::Permanent(e) => { - e - } - }) -} - -pub(super) async fn delete_crucible_snapshot( - log: &Logger, - client: &CrucibleAgentClient, - region_id: Uuid, - snapshot_id: Uuid, -) -> Result<(), Error> { - // Unlike other Crucible agent endpoints, this one is synchronous in that it - // is not only a request to the Crucible agent: `zfs destroy` is performed - // right away. However this is still a request to illumos that may not take - // effect right away. Wait until the snapshot no longer appears in the list - // of region snapshots, meaning it was not returned from `zfs list`. - - info!(log, "deleting region {region_id} snapshot {snapshot_id}"); - - retry_until_known_result(log, || async { - client - .region_delete_snapshot( - &RegionId(region_id.to_string()), - &snapshot_id.to_string(), - ) - .await - }) - .await - .map_err(|e| { - error!( - log, - "delete_crucible_snapshot: region_delete_snapshot saw {:?}", - e; - "region_id" => %region_id, - "snapshot_id" => %snapshot_id, - ); - match e { - crucible_agent_client::Error::ErrorResponse(rv) => { - match rv.status() { - status if status.is_client_error() => { - Error::invalid_request(&rv.message) - } - _ => Error::internal_error(&rv.message), - } - } - _ => Error::internal_error( - "unexpected failure during `region_delete_snapshot`", - ), - } - })?; - - #[derive(Debug, thiserror::Error)] - enum WaitError { - #[error("Transient error: {0}")] - Transient(#[from] anyhow::Error), - - #[error("Permanent error: {0}")] - Permanent(#[from] Error), - } - - backoff::retry_notify( - backoff::retry_policy_internal_service_aggressive(), - || async { - let response = retry_until_known_result(log, || async { - client - .region_get_snapshots(&RegionId(region_id.to_string())) - .await - }) - .await - .map_err(|e| { - error!( - log, - "delete_crucible_snapshot: region_get_snapshots saw {:?}", - e; - "region_id" => %region_id, - "snapshot_id" => %snapshot_id, - ); - match e { - crucible_agent_client::Error::ErrorResponse(rv) => { - match rv.status() { - status if status.is_client_error() => { - BackoffError::Permanent(WaitError::Permanent( - Error::invalid_request(&rv.message), - )) - } - _ => BackoffError::Permanent(WaitError::Permanent( - Error::internal_error(&rv.message), - )), - } - } - _ => BackoffError::Permanent(WaitError::Permanent( - Error::internal_error( - "unexpected failure during `region_get_snapshots`", - ), - )), - } - })?; - - if response - .snapshots - .iter() - .any(|x| x.name == snapshot_id.to_string()) - { - info!( - log, - "snapshot still exists, waiting"; - "region_id" => %region_id, - "snapshot_id" => %snapshot_id, - ); - - Err(BackoffError::transient(WaitError::Transient(anyhow!( - "snapshot not deleted yet", - )))) - } else { - info!( - log, - "snapshot deleted"; - "region_id" => %region_id, - "snapshot_id" => %snapshot_id, - ); - - Ok(()) - } - }, - |e: WaitError, delay| { - info!( - log, - "{:?}, trying again in {:?}", - e, - delay; - "region_id" => %region_id, - "snapshot_id" => %snapshot_id, - ); - }, - ) - .await - .map_err(|e| match e { - WaitError::Transient(e) => { - // The backoff crate can be configured with a maximum elapsed time - // before giving up, which means that Transient could be returned - // here. Our current policies do **not** set this though. - Error::internal_error(&e.to_string()) - } - - WaitError::Permanent(e) => e, - }) -} - -// Given a list of datasets and region snapshots, send DELETE calls to the -// datasets corresponding Crucible Agent for each snapshot. -pub(super) async fn delete_crucible_snapshots( - log: &Logger, - datasets_and_snapshots: Vec<( - db::model::Dataset, - db::model::RegionSnapshot, - )>, -) -> Result<(), Error> { - let request_count = datasets_and_snapshots.len(); - if request_count == 0 { - return Ok(()); - } - - futures::stream::iter(datasets_and_snapshots) - .map(|(dataset, region_snapshot)| async move { - let url = format!("http://{}", dataset.address()); - let client = CrucibleAgentClient::new(&url); - - delete_crucible_snapshot( - &log, - &client, - region_snapshot.region_id, - region_snapshot.snapshot_id, - ) - .await - }) - // Execute the requests concurrently. - .buffer_unordered(std::cmp::min( - request_count, - MAX_CONCURRENT_REGION_REQUESTS, - )) - .collect::>>() - .await - .into_iter() - .collect::, _>>()?; - - Ok(()) -} - -// Given a list of datasets and region snapshots, send DELETE calls to the -// datasets corresponding Crucible Agent for each running read-only downstairs -// corresponding to the snapshot. -pub(super) async fn delete_crucible_running_snapshots( - log: &Logger, - datasets_and_snapshots: Vec<( - db::model::Dataset, - db::model::RegionSnapshot, - )>, -) -> Result<(), Error> { - let request_count = datasets_and_snapshots.len(); - if request_count == 0 { - return Ok(()); - } - - futures::stream::iter(datasets_and_snapshots) - .map(|(dataset, region_snapshot)| async move { - let url = format!("http://{}", dataset.address()); - let client = CrucibleAgentClient::new(&url); - - delete_crucible_running_snapshot( - &log, - &client, - region_snapshot.region_id, - region_snapshot.snapshot_id, - ) - .await - }) - // Execute the requests concurrently. - .buffer_unordered(std::cmp::min( - request_count, - MAX_CONCURRENT_REGION_REQUESTS, - )) - .collect::>>() - .await - .into_iter() - .collect::, _>>()?; - - Ok(()) -} +// Common Pantry operations pub(crate) async fn get_pantry_address( nexus: &Arc, @@ -751,8 +30,6 @@ pub(crate) async fn get_pantry_address( .map_err(ActionError::action_failed) } -// Common Pantry operations - pub(crate) async fn call_pantry_attach_for_disk( log: &slog::Logger, opctx: &OpContext, diff --git a/nexus/src/app/sagas/disk_create.rs b/nexus/src/app/sagas/disk_create.rs index 5c4f5bf1ee..1525340f62 100644 --- a/nexus/src/app/sagas/disk_create.rs +++ b/nexus/src/app/sagas/disk_create.rs @@ -5,7 +5,6 @@ use super::{ common_storage::{ call_pantry_attach_for_disk, call_pantry_detach_for_disk, - delete_crucible_regions, ensure_all_datasets_and_regions, get_pantry_address, }, ActionRegistry, NexusActionContext, NexusSaga, SagaInitError, @@ -345,16 +344,20 @@ async fn sdc_noop(_sagactx: NexusActionContext) -> Result<(), ActionError> { async fn sdc_regions_ensure( sagactx: NexusActionContext, ) -> Result { - let log = sagactx.user_data().log(); + let osagactx = sagactx.user_data(); + let log = osagactx.log(); let disk_id = sagactx.lookup::("disk_id")?; - let datasets_and_regions = ensure_all_datasets_and_regions( - &log, - sagactx.lookup::>( - "datasets_and_regions", - )?, - ) - .await?; + let datasets_and_regions = osagactx + .nexus() + .ensure_all_datasets_and_regions( + &log, + sagactx.lookup::>( + "datasets_and_regions", + )?, + ) + .await + .map_err(ActionError::action_failed)?; let block_size = datasets_and_regions[0].1.block_size; let blocks_per_extent = datasets_and_regions[0].1.extent_size; @@ -550,13 +553,15 @@ async fn sdc_regions_ensure_undo( warn!(log, "sdc_regions_ensure_undo: Deleting crucible regions"); - let result = delete_crucible_regions( - log, - sagactx.lookup::>( - "datasets_and_regions", - )?, - ) - .await; + let result = osagactx + .nexus() + .delete_crucible_regions( + log, + sagactx.lookup::>( + "datasets_and_regions", + )?, + ) + .await; match result { Err(e) => { diff --git a/nexus/src/app/sagas/snapshot_create.rs b/nexus/src/app/sagas/snapshot_create.rs index 53e06e310d..dc42c72daa 100644 --- a/nexus/src/app/sagas/snapshot_create.rs +++ b/nexus/src/app/sagas/snapshot_create.rs @@ -91,8 +91,6 @@ use super::{ common_storage::{ call_pantry_attach_for_disk, call_pantry_detach_for_disk, - delete_crucible_regions, delete_crucible_running_snapshot, - delete_crucible_snapshot, ensure_all_datasets_and_regions, get_pantry_address, }, ActionRegistry, NexusActionContext, NexusSaga, SagaInitError, @@ -380,13 +378,16 @@ async fn ssc_regions_ensure( let destination_volume_id = sagactx.lookup::("destination_volume_id")?; - let datasets_and_regions = ensure_all_datasets_and_regions( - &log, - sagactx.lookup::>( - "datasets_and_regions", - )?, - ) - .await?; + let datasets_and_regions = osagactx + .nexus() + .ensure_all_datasets_and_regions( + &log, + sagactx.lookup::>( + "datasets_and_regions", + )?, + ) + .await + .map_err(ActionError::action_failed)?; let block_size = datasets_and_regions[0].1.block_size; let blocks_per_extent = datasets_and_regions[0].1.extent_size; @@ -458,15 +459,18 @@ async fn ssc_regions_ensure( async fn ssc_regions_ensure_undo( sagactx: NexusActionContext, ) -> Result<(), anyhow::Error> { - let log = sagactx.user_data().log(); + let osagactx = sagactx.user_data(); + let log = osagactx.log(); warn!(log, "ssc_regions_ensure_undo: Deleting crucible regions"); - delete_crucible_regions( - log, - sagactx.lookup::>( - "datasets_and_regions", - )?, - ) - .await?; + osagactx + .nexus() + .delete_crucible_regions( + log, + sagactx.lookup::>( + "datasets_and_regions", + )?, + ) + .await?; info!(log, "ssc_regions_ensure_undo: Deleted crucible regions"); Ok(()) } @@ -764,10 +768,9 @@ async fn ssc_send_snapshot_request_to_sled_agent_undo( // ... and instruct each of those regions to delete the snapshot. for (dataset, region) in datasets_and_regions { - let url = format!("http://{}", dataset.address()); - let client = CrucibleAgentClient::new(&url); - - delete_crucible_snapshot(log, &client, region.id(), snapshot_id) + osagactx + .nexus() + .delete_crucible_snapshot(log, &dataset, region.id(), snapshot_id) .await?; } @@ -1090,10 +1093,9 @@ async fn ssc_call_pantry_snapshot_for_disk_undo( // ... and instruct each of those regions to delete the snapshot. for (dataset, region) in datasets_and_regions { - let url = format!("http://{}", dataset.address()); - let client = CrucibleAgentClient::new(&url); - - delete_crucible_snapshot(log, &client, region.id(), snapshot_id) + osagactx + .nexus() + .delete_crucible_snapshot(log, &dataset, region.id(), snapshot_id) .await?; } Ok(()) @@ -1350,16 +1352,15 @@ async fn ssc_start_running_snapshot_undo( // ... and instruct each of those regions to delete the running snapshot. for (dataset, region) in datasets_and_regions { - let url = format!("http://{}", dataset.address()); - let client = CrucibleAgentClient::new(&url); - - delete_crucible_running_snapshot( - &log, - &client, - region.id(), - snapshot_id, - ) - .await?; + osagactx + .nexus() + .delete_crucible_running_snapshot( + &log, + &dataset, + region.id(), + snapshot_id, + ) + .await?; osagactx .datastore() diff --git a/nexus/src/app/sagas/volume_delete.rs b/nexus/src/app/sagas/volume_delete.rs index 22425a0b99..bfd8e6616c 100644 --- a/nexus/src/app/sagas/volume_delete.rs +++ b/nexus/src/app/sagas/volume_delete.rs @@ -23,9 +23,6 @@ //! resources, and when they are inserted or deleted the accounting needs to //! change. Saga nodes must be idempotent in order to work correctly. -use super::common_storage::delete_crucible_regions; -use super::common_storage::delete_crucible_running_snapshots; -use super::common_storage::delete_crucible_snapshots; use super::ActionRegistry; use super::NexusActionContext; use super::NexusSaga; @@ -45,17 +42,11 @@ pub(crate) struct Params { pub serialized_authn: authn::saga::Serialized, pub volume_id: Uuid, } + // volume delete saga: actions declare_saga_actions! { volume_delete; - // TODO(https://github.com/oxidecomputer/omicron/issues/612): - // - // We need a way to deal with this operation failing, aside from - // propagating the error to the user. - // - // What if the Sled goes offline? Nexus must ultimately be - // responsible for reconciling this scenario. DECREASE_CRUCIBLE_RESOURCE_COUNT -> "crucible_resources_to_delete" { + svd_decrease_crucible_resource_count } @@ -169,14 +160,16 @@ async fn svd_delete_crucible_regions( )) })?; - delete_crucible_regions(log, datasets_and_regions.clone()).await.map_err( - |e| { + osagactx + .nexus() + .delete_crucible_regions(log, datasets_and_regions.clone()) + .await + .map_err(|e| { ActionError::action_failed(format!( "failed to delete_crucible_regions: {:?}", e, )) - }, - )?; + })?; // Remove DB records let region_ids_to_delete = @@ -226,7 +219,9 @@ async fn svd_delete_crucible_running_snapshots( )) })?; - delete_crucible_running_snapshots(log, datasets_and_snapshots.clone()) + osagactx + .nexus() + .delete_crucible_running_snapshots(log, datasets_and_snapshots.clone()) .await .map_err(|e| { ActionError::action_failed(format!( @@ -267,7 +262,9 @@ async fn svd_delete_crucible_snapshots( )) })?; - delete_crucible_snapshots(log, datasets_and_snapshots.clone()) + osagactx + .nexus() + .delete_crucible_snapshots(log, datasets_and_snapshots.clone()) .await .map_err(|e| { ActionError::action_failed(format!( @@ -439,7 +436,12 @@ async fn svd_delete_freed_crucible_regions( } // Send DELETE calls to the corresponding Crucible agents - delete_crucible_regions(log, vec![(dataset.clone(), region.clone())]) + osagactx + .nexus() + .delete_crucible_regions( + log, + vec![(dataset.clone(), region.clone())], + ) .await .map_err(|e| { ActionError::action_failed(format!( diff --git a/nexus/src/app/session.rs b/nexus/src/app/session.rs index dd3665161a..fba4b2f0b7 100644 --- a/nexus/src/app/session.rs +++ b/nexus/src/app/session.rs @@ -157,9 +157,8 @@ impl super::Nexus { | Error::InsufficientCapacity { .. } | Error::TypeVersionMismatch { .. } | Error::Conflict { .. } - | Error::NotFound { .. } => { - Reason::UnknownError { source: error } - } + | Error::NotFound { .. } + | Error::Gone => Reason::UnknownError { source: error }, })?; Ok(db_silo_user.silo_id) } diff --git a/nexus/tests/integration_tests/disks.rs b/nexus/tests/integration_tests/disks.rs index 886504a83b..c19c1852db 100644 --- a/nexus/tests/integration_tests/disks.rs +++ b/nexus/tests/integration_tests/disks.rs @@ -11,6 +11,7 @@ use dropshot::HttpErrorResponseBody; use http::method::Method; use http::StatusCode; use nexus_config::RegionAllocationStrategy; +use nexus_db_model::PhysicalDiskPolicy; use nexus_db_queries::context::OpContext; use nexus_db_queries::db::datastore::REGION_REDUNDANCY_THRESHOLD; use nexus_db_queries::db::fixed_data::{silo::DEFAULT_SILO_ID, FLEET_ID}; @@ -40,11 +41,13 @@ use omicron_common::api::external::NameOrId; use omicron_nexus::app::{MAX_DISK_SIZE_BYTES, MIN_DISK_SIZE_BYTES}; use omicron_nexus::Nexus; use omicron_nexus::TestInterfaces as _; +use omicron_uuid_kinds::GenericUuid; use oximeter::types::Datum; use oximeter::types::Measurement; use sled_agent_client::TestInterfaces as _; use std::collections::HashSet; use std::sync::Arc; +use std::time::Duration; use uuid::Uuid; type ControlPlaneTestContext = @@ -2458,6 +2461,80 @@ async fn test_region_allocation_after_delete( assert_eq!(allocated_regions.len(), REGION_REDUNDANCY_THRESHOLD); } +#[nexus_test] +async fn test_no_halt_disk_delete_one_region_on_expunged_agent( + cptestctx: &ControlPlaneTestContext, +) { + let nexus = &cptestctx.server.server_context().nexus; + let datastore = nexus.datastore(); + let opctx = + OpContext::for_tests(cptestctx.logctx.log.new(o!()), datastore.clone()); + + // Create the regular three 10 GiB zpools, each with one dataset. + let disk_test = DiskTest::new(&cptestctx).await; + + // Create a disk + let client = &cptestctx.external_client; + let _project_id = create_project_and_pool(client).await; + + let disk = create_disk(&client, PROJECT_NAME, DISK_NAME).await; + + // Grab the db record now, before the delete + let (.., db_disk) = LookupPath::new(&opctx, datastore) + .disk_id(disk.identity.id) + .fetch() + .await + .unwrap(); + + // Choose one of the datasets, and drop the simulated Crucible agent + let zpool = &disk_test.zpools[0]; + let dataset = &zpool.datasets[0]; + + cptestctx.sled_agent.sled_agent.drop_dataset(zpool.id, dataset.id).await; + + // Spawn a task that tries to delete the disk + let disk_url = get_disk_url(DISK_NAME); + let client = client.clone(); + let jh = tokio::spawn(async move { + NexusRequest::object_delete(&client, &disk_url) + .authn_as(AuthnMode::PrivilegedUser) + .execute() + .await + .expect("failed to delete disk") + }); + + // It won't finish until the dataset is expunged. + tokio::time::sleep(Duration::from_secs(3)).await; + assert!(!jh.is_finished()); + + // Expunge the physical disk + let (_, db_zpool) = LookupPath::new(&opctx, datastore) + .zpool_id(zpool.id.into_untyped_uuid()) + .fetch() + .await + .unwrap(); + + datastore + .physical_disk_update_policy( + &opctx, + db_zpool.physical_disk_id, + PhysicalDiskPolicy::Expunged, + ) + .await + .unwrap(); + + // Now, the delete call will finish Ok + jh.await.unwrap(); + + // Ensure that the disk was properly deleted and all the regions are gone - + // Nexus should hard delete the region records in this case. + + let datasets_and_regions = + datastore.get_allocated_regions(db_disk.volume_id).await.unwrap(); + + assert!(datasets_and_regions.is_empty()); +} + async fn disk_get(client: &ClientTestContext, disk_url: &str) -> Disk { NexusRequest::object_get(client, disk_url) .authn_as(AuthnMode::PrivilegedUser) diff --git a/sled-agent/src/sim/sled_agent.rs b/sled-agent/src/sim/sled_agent.rs index 298a8adc34..19c84da18c 100644 --- a/sled-agent/src/sim/sled_agent.rs +++ b/sled-agent/src/sim/sled_agent.rs @@ -870,4 +870,8 @@ impl SledAgent { ) { *self.fake_zones.lock().await = requested_zones; } + + pub async fn drop_dataset(&self, zpool_id: ZpoolUuid, dataset_id: Uuid) { + self.storage.lock().await.drop_dataset(zpool_id, dataset_id) + } } diff --git a/sled-agent/src/sim/storage.rs b/sled-agent/src/sim/storage.rs index 6a688f6101..dac2a4cb48 100644 --- a/sled-agent/src/sim/storage.rs +++ b/sled-agent/src/sim/storage.rs @@ -543,6 +543,10 @@ impl Zpool { None } + + pub fn drop_dataset(&mut self, id: Uuid) { + let _ = self.datasets.remove(&id).expect("Failed to get the dataset"); + } } /// Simulated representation of all storage on a sled. @@ -642,6 +646,7 @@ impl Storage { pub fn zpools(&self) -> &HashMap { &self.zpools } + /// Adds a Dataset to the sled's simulated storage. pub async fn insert_dataset( &mut self, @@ -757,6 +762,13 @@ impl Storage { None } + + pub fn drop_dataset(&mut self, zpool_id: ZpoolUuid, dataset_id: Uuid) { + self.zpools + .get_mut(&zpool_id) + .expect("Zpool does not exist") + .drop_dataset(dataset_id) + } } /// Simulated crucible pantry