diff --git a/nexus/db-model/src/volume.rs b/nexus/db-model/src/volume.rs index 3fbb4378416..bd6e349fbff 100644 --- a/nexus/db-model/src/volume.rs +++ b/nexus/db-model/src/volume.rs @@ -7,19 +7,9 @@ use crate::collection::DatastoreCollectionConfig; use crate::schema::{region, volume}; use chrono::{DateTime, Utc}; use db_macros::Asset; -use serde::{Deserialize, Serialize}; use uuid::Uuid; -#[derive( - Asset, - Queryable, - Insertable, - Debug, - Selectable, - Serialize, - Deserialize, - Clone, -)] +#[derive(Asset, Queryable, Insertable, Debug, Selectable, Clone)] #[diesel(table_name = volume)] pub struct Volume { #[diesel(embed)] diff --git a/nexus/db-queries/src/db/datastore/disk.rs b/nexus/db-queries/src/db/datastore/disk.rs index 57b7bed00b0..7ae9967285b 100644 --- a/nexus/db-queries/src/db/datastore/disk.rs +++ b/nexus/db-queries/src/db/datastore/disk.rs @@ -568,17 +568,12 @@ impl DataStore { pub async fn project_delete_disk_no_auth( &self, disk_id: &Uuid, + ok_to_delete_states: &[api::external::DiskState], ) -> Result { use db::schema::disk::dsl; let pool = self.pool(); let now = Utc::now(); - let ok_to_delete_states = vec![ - api::external::DiskState::Detached, - api::external::DiskState::Faulted, - api::external::DiskState::Creating, - ]; - let ok_to_delete_state_labels: Vec<_> = ok_to_delete_states.iter().map(|s| s.label()).collect(); let destroyed = api::external::DiskState::Destroyed.label(); diff --git a/nexus/db-queries/src/db/datastore/mod.rs b/nexus/db-queries/src/db/datastore/mod.rs index 06373fdb587..5721583fd00 100644 --- a/nexus/db-queries/src/db/datastore/mod.rs +++ b/nexus/db-queries/src/db/datastore/mod.rs @@ -92,6 +92,7 @@ pub use silo::Discoverability; pub use switch_port::SwitchPortSettingsCombinedResult; pub use virtual_provisioning_collection::StorageType; pub use volume::CrucibleResources; +pub use volume::CrucibleTargets; // Number of unique datasets required to back a region. // TODO: This should likely turn into a configuration option. diff --git a/nexus/db-queries/src/db/datastore/region.rs b/nexus/db-queries/src/db/datastore/region.rs index c39f7ceb409..6bfea9085d0 100644 --- a/nexus/db-queries/src/db/datastore/region.rs +++ b/nexus/db-queries/src/db/datastore/region.rs @@ -22,6 +22,8 @@ use nexus_types::external_api::params; use omicron_common::api::external; use omicron_common::api::external::DeleteResult; use omicron_common::api::external::Error; +use omicron_common::backoff::{self, BackoffError}; +use slog::Logger; use uuid::Uuid; impl DataStore { @@ -146,6 +148,7 @@ impl DataStore { /// Also updates the storage usage on their corresponding datasets. pub async fn regions_hard_delete( &self, + log: &Logger, region_ids: Vec, ) -> DeleteResult { if region_ids.is_empty() { @@ -159,67 +162,95 @@ impl DataStore { } type TxnError = TransactionError; - self.pool() - .transaction(move |conn| { - use db::schema::dataset::dsl as dataset_dsl; - use db::schema::region::dsl as region_dsl; - - // Remove the regions, collecting datasets they're from. - let datasets = diesel::delete(region_dsl::region) - .filter(region_dsl::id.eq_any(region_ids)) - .returning(region_dsl::dataset_id) - .get_results::(conn)?; - - // Update datasets to which the regions belonged. 
- for dataset in datasets { - let dataset_total_occupied_size: Option< - diesel::pg::data_types::PgNumeric, - > = region_dsl::region - .filter(region_dsl::dataset_id.eq(dataset)) - .select(diesel::dsl::sum( - region_dsl::block_size - * region_dsl::blocks_per_extent - * region_dsl::extent_count, - )) - .nullable() - .get_result(conn)?; - - let dataset_total_occupied_size: i64 = if let Some( - dataset_total_occupied_size, - ) = - dataset_total_occupied_size - { - let dataset_total_occupied_size: db::model::ByteCount = - dataset_total_occupied_size.try_into().map_err( - |e: anyhow::Error| { - TxnError::CustomError( - RegionDeleteError::NumericError( - e.to_string(), - ), - ) - }, - )?; - - dataset_total_occupied_size.into() - } else { - 0 - }; - - diesel::update(dataset_dsl::dataset) - .filter(dataset_dsl::id.eq(dataset)) - .set( - dataset_dsl::size_used - .eq(dataset_total_occupied_size), - ) - .execute(conn)?; - } - - Ok(()) - }) - .await - .map_err(|e: TxnError| { - Error::internal_error(&format!("Transaction error: {}", e)) - }) + // Retry this transaction until it succeeds. It's a little heavy in that + // there's a for loop inside that iterates over the datasets the + // argument regions belong to, and it often encounters the "retry + // transaction" error. + let transaction = { + |region_ids: Vec| async { + self.pool() + .transaction(move |conn| { + use db::schema::dataset::dsl as dataset_dsl; + use db::schema::region::dsl as region_dsl; + + // Remove the regions, collecting datasets they're from. + let datasets = diesel::delete(region_dsl::region) + .filter(region_dsl::id.eq_any(region_ids)) + .returning(region_dsl::dataset_id) + .get_results::(conn)?; + + // Update datasets to which the regions belonged. + for dataset in datasets { + let dataset_total_occupied_size: Option< + diesel::pg::data_types::PgNumeric, + > = region_dsl::region + .filter(region_dsl::dataset_id.eq(dataset)) + .select(diesel::dsl::sum( + region_dsl::block_size + * region_dsl::blocks_per_extent + * region_dsl::extent_count, + )) + .nullable() + .get_result(conn)?; + + let dataset_total_occupied_size: i64 = if let Some( + dataset_total_occupied_size, + ) = + dataset_total_occupied_size + { + let dataset_total_occupied_size: db::model::ByteCount = + dataset_total_occupied_size.try_into().map_err( + |e: anyhow::Error| { + TxnError::CustomError( + RegionDeleteError::NumericError( + e.to_string(), + ), + ) + }, + )?; + + dataset_total_occupied_size.into() + } else { + 0 + }; + + diesel::update(dataset_dsl::dataset) + .filter(dataset_dsl::id.eq(dataset)) + .set( + dataset_dsl::size_used + .eq(dataset_total_occupied_size), + ) + .execute(conn)?; + } + + Ok(()) + }) + .await + .map_err(|e: TxnError| { + if e.retry_transaction() { + BackoffError::transient(Error::internal_error( + &format!("Retryable transaction error {:?}", e) + )) + } else { + BackoffError::Permanent(Error::internal_error( + &format!("Transaction error: {}", e) + )) + } + }) + } + }; + + backoff::retry_notify( + backoff::retry_policy_internal_service_aggressive(), + || async { + let region_ids = region_ids.clone(); + transaction(region_ids).await + }, + |e: Error, delay| { + info!(log, "{:?}, trying again in {:?}", e, delay,); + }, + ) + .await } /// Return the total occupied size for a dataset diff --git a/nexus/db-queries/src/db/datastore/volume.rs b/nexus/db-queries/src/db/datastore/volume.rs index a2f9e211386..fd1d9b30349 100644 --- a/nexus/db-queries/src/db/datastore/volume.rs +++ b/nexus/db-queries/src/db/datastore/volume.rs @@ -16,6 +16,7 @@ use 
crate::db::model::RegionSnapshot;
 use crate::db::model::Volume;
 use async_bb8_diesel::AsyncConnection;
 use async_bb8_diesel::AsyncRunQueryDsl;
+use async_bb8_diesel::OptionalExtension;
 use chrono::Utc;
 use diesel::prelude::*;
 use diesel::OptionalExtension as DieselOptionalExtension;
@@ -24,7 +25,6 @@ use omicron_common::api::external::DeleteResult;
 use omicron_common::api::external::Error;
 use omicron_common::api::external::ListResultVec;
 use omicron_common::api::external::LookupResult;
-use omicron_common::api::external::LookupType;
 use omicron_common::api::external::ResourceType;
 use serde::Deserialize;
 use serde::Serialize;
@@ -45,6 +45,26 @@ impl DataStore {
         }
         type TxnError = TransactionError<VolumeCreationError>;
 
+        // Grab all the targets that the volume construction request references.
+        // Do this outside the transaction, as the data inside volume doesn't
+        // change and this would simply add to the transaction time.
+        let crucible_targets = {
+            let vcr: VolumeConstructionRequest =
+                serde_json::from_str(&volume.data()).map_err(|e| {
+                    Error::internal_error(&format!(
+                        "serde_json::from_str error in volume_create: {}",
+                        e
+                    ))
+                })?;
+
+            let mut crucible_targets = CrucibleTargets::default();
+            read_only_resources_associated_with_volume(
+                &vcr,
+                &mut crucible_targets,
+            );
+            crucible_targets
+        };
+
         self.pool()
             .transaction(move |conn| {
                 let maybe_volume: Option<Volume> = dsl::volume
@@ -93,25 +113,6 @@ impl DataStore {
                 // Increase the usage count for Crucible resources according to the
                 // contents of the volume.
 
-                // Grab all the targets that the volume construction request references.
-                let crucible_targets = {
-                    let vcr: VolumeConstructionRequest = serde_json::from_str(
-                        &volume.data(),
-                    )
-                    .map_err(|e: serde_json::Error| {
-                        TxnError::CustomError(VolumeCreationError::SerdeError(
-                            e,
-                        ))
-                    })?;
-
-                    let mut crucible_targets = CrucibleTargets::default();
-                    resources_associated_with_volume(
-                        &vcr,
-                        &mut crucible_targets,
-                    );
-                    crucible_targets
-                };
-
                 // Increase the number of uses for each referenced region snapshot.
                 use db::schema::region_snapshot::dsl as rs_dsl;
                 for read_only_target in &crucible_targets.read_only_targets {
@@ -146,24 +147,44 @@ impl DataStore {
             })
     }
 
-    pub async fn volume_hard_delete(&self, volume_id: Uuid) -> DeleteResult {
+    /// Return an `Option<Volume>` based on id, even if it's soft deleted.
+    pub async fn volume_get(
+        &self,
+        volume_id: Uuid,
+    ) -> LookupResult<Option<Volume>> {
         use db::schema::volume::dsl;
-
-        diesel::delete(dsl::volume)
+        dsl::volume
             .filter(dsl::id.eq(volume_id))
-            .execute_async(self.pool())
+            .select(Volume::as_select())
+            .first_async::<Volume>(self.pool())
+            .await
+            .optional()
+            .map_err(|e| public_error_from_diesel_pool(e, ErrorHandler::Server))
+    }
+
+    /// Delete the volume if it exists. If it was already deleted, this is a
+    /// no-op.
+    pub async fn volume_hard_delete(&self, volume_id: Uuid) -> DeleteResult {
+        self.pool()
+            .transaction(move |conn| {
+                use db::schema::volume::dsl;
+
+                let volume = dsl::volume
+                    .filter(dsl::id.eq(volume_id))
+                    .select(Volume::as_select())
+                    .first::<Volume>(conn)
+                    .optional()?;
+
+                if volume.is_some() {
+                    diesel::delete(dsl::volume)
+                        .filter(dsl::id.eq(volume_id))
+                        .execute(conn)?;
+                }
+
+                Ok(())
+            })
             .await
-            .map_err(|e| {
-                public_error_from_diesel_pool(
-                    e,
-                    ErrorHandler::NotFoundByLookup(
-                        ResourceType::Volume,
-                        LookupType::ById(volume_id),
-                    ),
-                )
-            })?;
-
-        Ok(())
+            .map_err(|e| public_error_from_diesel_pool(e, ErrorHandler::Server))
     }
 
     /// Checkout a copy of the Volume from the database. 
@@ -457,6 +478,8 @@ impl DataStore { .filter( dsl::volume_references .eq(0) + // Despite the SQL specifying that this column is NOT NULL, + // this null check is required for this function to work! .or(dsl::volume_references.is_null()), ) // where the volume has already been soft-deleted @@ -472,6 +495,28 @@ impl DataStore { .map_err(|e| public_error_from_diesel_pool(e, ErrorHandler::Server)) } + pub async fn read_only_resources_associated_with_volume( + &self, + volume_id: Uuid, + ) -> LookupResult { + let volume = if let Some(volume) = self.volume_get(volume_id).await? { + volume + } else { + // Volume has already been hard deleted (volume_get returns + // soft deleted records), return that no cleanup is necessary. + return Ok(CrucibleTargets::default()); + }; + + let vcr: VolumeConstructionRequest = + serde_json::from_str(&volume.data())?; + + let mut crucible_targets = CrucibleTargets::default(); + + read_only_resources_associated_with_volume(&vcr, &mut crucible_targets); + + Ok(crucible_targets) + } + /// Decrease the usage count for Crucible resources according to the /// contents of the volume. Call this when deleting a volume (but before the /// volume record has been hard deleted). @@ -493,6 +538,37 @@ impl DataStore { } type TxnError = TransactionError; + // Grab all the targets that the volume construction request references. + // Do this outside the transaction, as the data inside volume doesn't + // change and this would simply add to the transaction time. + let crucible_targets = { + let volume = + if let Some(volume) = self.volume_get(volume_id).await? { + volume + } else { + // The volume was hard-deleted, return an empty + // CrucibleResources + return Ok(CrucibleResources::V1( + CrucibleResourcesV1::default(), + )); + }; + + let vcr: VolumeConstructionRequest = + serde_json::from_str(&volume.data()).map_err(|e| { + Error::internal_error(&format!( + "serde_json::from_str error in volume_create: {}", + e + )) + })?; + + let mut crucible_targets = CrucibleTargets::default(); + read_only_resources_associated_with_volume( + &vcr, + &mut crucible_targets, + ); + crucible_targets + }; + // In a transaction: // // 1. decrease the number of references for each region snapshot that @@ -518,11 +594,11 @@ impl DataStore { // hard-deleted, assume clean-up has occurred and return an empty // CrucibleResources. If the volume record was soft-deleted, then // return the serialized CrucibleResources. - let volume = { - use db::schema::volume::dsl; + use db::schema::volume::dsl as volume_dsl; - let volume = dsl::volume - .filter(dsl::id.eq(volume_id)) + { + let volume = volume_dsl::volume + .filter(volume_dsl::id.eq(volume_id)) .select(Volume::as_select()) .get_result(conn) .optional()?; @@ -573,23 +649,6 @@ impl DataStore { } }; - // Grab all the targets that the volume construction request references. - let crucible_targets = { - let vcr: VolumeConstructionRequest = - serde_json::from_str(&volume.data()).map_err(|e| { - TxnError::CustomError( - DecreaseCrucibleResourcesError::SerdeError(e), - ) - })?; - - let mut crucible_targets = CrucibleTargets::default(); - resources_associated_with_volume( - &vcr, - &mut crucible_targets, - ); - crucible_targets - }; - // Decrease the number of uses for each referenced region snapshot. use db::schema::region_snapshot::dsl; @@ -631,6 +690,8 @@ impl DataStore { .filter( dsl::volume_references .eq(0) + // Despite the SQL specifying that this column is NOT NULL, + // this null check is required for this function to work! 
.or(dsl::volume_references.is_null()), ) .select((Dataset::as_select(), Region::as_select())) @@ -663,8 +724,6 @@ impl DataStore { // Soft delete this volume, and serialize the resources that are to // be cleaned up. - use db::schema::volume::dsl as volume_dsl; - let now = Utc::now(); diesel::update(volume_dsl::volume) .filter(volume_dsl::id.eq(volume_id)) @@ -896,8 +955,8 @@ impl DataStore { } } -#[derive(Default)] -struct CrucibleTargets { +#[derive(Default, Debug, Serialize, Deserialize)] +pub struct CrucibleTargets { read_only_targets: Vec, } @@ -917,7 +976,7 @@ pub struct CrucibleResourcesV1 { /// Return the targets from a VolumeConstructionRequest. /// /// The targets of a volume construction request map to resources. -fn resources_associated_with_volume( +fn read_only_resources_associated_with_volume( vcr: &VolumeConstructionRequest, crucible_targets: &mut CrucibleTargets, ) { @@ -929,11 +988,14 @@ fn resources_associated_with_volume( read_only_parent, } => { for sub_volume in sub_volumes { - resources_associated_with_volume(sub_volume, crucible_targets); + read_only_resources_associated_with_volume( + sub_volume, + crucible_targets, + ); } if let Some(read_only_parent) = read_only_parent { - resources_associated_with_volume( + read_only_resources_associated_with_volume( read_only_parent, crucible_targets, ); diff --git a/nexus/db-queries/src/db/error.rs b/nexus/db-queries/src/db/error.rs index 82a4703b2fa..59094d2e0ba 100644 --- a/nexus/db-queries/src/db/error.rs +++ b/nexus/db-queries/src/db/error.rs @@ -51,6 +51,34 @@ impl From for TransactionError { } } +impl TransactionError { + /// Based on [the CRDB][1] docs, return true if this transaction must be + /// retried. + /// + /// [1]: https://www.cockroachlabs.com/docs/v23.1/transaction-retry-error-reference#client-side-retry-handling + pub fn retry_transaction(&self) -> bool { + match &self { + TransactionError::Pool(e) => match e { + PoolError::Connection(ConnectionError::Query( + DieselError::DatabaseError(kind, boxed_error_information), + )) => match kind { + DieselErrorKind::SerializationFailure => { + return boxed_error_information + .message() + .starts_with("restart transaction"); + } + + _ => false, + }, + + _ => false, + }, + + _ => false, + } + } +} + /// Summarizes details provided with a database error. 
fn format_database_error( kind: DieselErrorKind, diff --git a/nexus/src/app/sagas/common_storage.rs b/nexus/src/app/sagas/common_storage.rs index 3379b4f3360..98db2ecc7cd 100644 --- a/nexus/src/app/sagas/common_storage.rs +++ b/nexus/src/app/sagas/common_storage.rs @@ -60,7 +60,9 @@ pub async fn ensure_region_in_dataset( RegionState::Requested => Err(BackoffError::transient(anyhow!( "Region creation in progress" ))), + RegionState::Created => Ok(region), + _ => Err(BackoffError::Permanent(anyhow!( "Failed to create region, unexpected state: {:?}", region.state @@ -141,9 +143,117 @@ pub async fn ensure_all_datasets_and_regions( Ok(datasets_and_regions) } +pub(super) async fn delete_crucible_region( + log: &Logger, + client: &CrucibleAgentClient, + region_id: Uuid, +) -> Result<(), Error> { + retry_until_known_result(log, || async { + client.region_delete(&RegionId(region_id.to_string())).await + }) + .await + .map_err(|e| { + error!(log, "delete_crucible_region: region_delete saw {:?}", e); + match e { + crucible_agent_client::Error::ErrorResponse(rv) => { + match rv.status() { + status if status.is_client_error() => { + Error::invalid_request(&rv.message) + } + _ => Error::internal_error(&rv.message), + } + } + _ => Error::internal_error( + "unexpected failure during `region_delete`", + ), + } + })?; + + #[derive(Debug, thiserror::Error)] + pub enum WaitError { + #[error("Transient error: {0}")] + Transient(#[from] anyhow::Error), + + #[error("Permanent error: {0}")] + Permanent(#[from] Error), + } + + // `region_delete` is only a request: wait until the region is + // deleted + backoff::retry_notify( + backoff::retry_policy_internal_service_aggressive(), + || async { + let region = retry_until_known_result(log, || async { + client.region_get(&RegionId(region_id.to_string())).await + }) + .await + .map_err(|e| { + error!(log, "delete_crucible_region: region_get saw {:?}", e); + + match e { + crucible_agent_client::Error::ErrorResponse(rv) => { + match rv.status() { + status if status.is_client_error() => { + BackoffError::Permanent(WaitError::Permanent( + Error::invalid_request(&rv.message), + )) + } + _ => BackoffError::Permanent(WaitError::Permanent( + Error::internal_error(&rv.message), + )), + } + } + _ => BackoffError::Permanent(WaitError::Permanent( + Error::internal_error( + "unexpected failure during `region_get`", + ), + )), + } + })?; + + match region.state { + RegionState::Tombstoned => { + Err(BackoffError::transient(WaitError::Transient(anyhow!( + "region {} not deleted yet", + region_id.to_string(), + )))) + } + + RegionState::Destroyed => { + info!(log, "region {} deleted", region_id.to_string(),); + + Ok(()) + } + + _ => { + Err(BackoffError::transient(WaitError::Transient(anyhow!( + "region {} unexpected state", + region_id.to_string(), + )))) + } + } + }, + |e: WaitError, delay| { + info!(log, "{:?}, trying again in {:?}", e, delay,); + }, + ) + .await + .map_err(|e| match e { + WaitError::Transient(e) => { + // The backoff crate can be configured with a maximum elapsed time + // before giving up, which means that Transient could be returned + // here. Our current policies do **not** set this though. + Error::internal_error(&e.to_string()) + } + + WaitError::Permanent(e) => e, + }) +} + // Given a list of datasets and regions, send DELETE calls to the datasets // corresponding Crucible Agent for each region. 
pub(super) async fn delete_crucible_regions( + log: &Logger, datasets_and_regions: Vec<(db::model::Dataset, db::model::Region)>, ) -> Result<(), Error> { let request_count = datasets_and_regions.len(); @@ -155,30 +265,263 @@ pub(super) async fn delete_crucible_regions( .map(|(dataset, region)| async move { let url = format!("http://{}", dataset.address()); let client = CrucibleAgentClient::new(&url); - let id = RegionId(region.id().to_string()); - client.region_delete(&id).await.map_err(|e| match e { - crucible_agent_client::Error::ErrorResponse(rv) => { - match rv.status() { - http::StatusCode::SERVICE_UNAVAILABLE => { - Error::unavail(&rv.message) + + delete_crucible_region(&log, &client, region.id()).await + }) + // Execute the requests concurrently. + .buffer_unordered(std::cmp::min( + request_count, + MAX_CONCURRENT_REGION_REQUESTS, + )) + .collect::>>() + .await + .into_iter() + .collect::, _>>()?; + + Ok(()) +} + +pub(super) async fn delete_crucible_running_snapshot( + log: &Logger, + client: &CrucibleAgentClient, + region_id: Uuid, + snapshot_id: Uuid, +) -> Result<(), Error> { + // delete running snapshot + retry_until_known_result(log, || async { + client + .region_delete_running_snapshot( + &RegionId(region_id.to_string()), + &snapshot_id.to_string(), + ) + .await + }) + .await + .map_err(|e| { + error!( + log, + "delete_crucible_snapshot: region_delete_running_snapshot saw {:?}", + e + ); + match e { + crucible_agent_client::Error::ErrorResponse(rv) => { + match rv.status() { + status if status.is_client_error() => { + Error::invalid_request(&rv.message) + } + _ => Error::internal_error(&rv.message), + } + } + _ => Error::internal_error( + "unexpected failure during `region_delete_running_snapshot`", + ), + } + })?; + + #[derive(Debug, thiserror::Error)] + pub enum WaitError { + #[error("Transient error: {0}")] + Transient(#[from] anyhow::Error), + + #[error("Permanent error: {0}")] + Permanent(#[from] Error), + } + + // `region_delete_running_snapshot` is only a request: wait until + // running snapshot is deleted + backoff::retry_notify( + backoff::retry_policy_internal_service_aggressive(), + || async { + let snapshot = retry_until_known_result(log, || async { + client.region_get_snapshots( + &RegionId(region_id.to_string()), + ).await + }) + .await + .map_err(|e| { + error!(log, "delete_crucible_snapshot: region_get_snapshots saw {:?}", e); + match e { + crucible_agent_client::Error::ErrorResponse(rv) => { + match rv.status() { + status if status.is_client_error() => { + BackoffError::Permanent( + WaitError::Permanent( + Error::invalid_request(&rv.message) + ) + ) + } + _ => BackoffError::Permanent( + WaitError::Permanent( + Error::internal_error(&rv.message) + ) + ) + } + } + _ => BackoffError::Permanent( + WaitError::Permanent( + Error::internal_error( + "unexpected failure during `region_get_snapshots`", + ) + ) + ) + } + })?; + + match snapshot.running_snapshots.get(&snapshot_id.to_string()) { + Some(running_snapshot) => { + info!( + log, + "region {} snapshot {} running_snapshot is Some, state is {}", + region_id.to_string(), + snapshot_id.to_string(), + running_snapshot.state.to_string(), + ); + + match running_snapshot.state { + RegionState::Tombstoned => { + Err(BackoffError::transient( + WaitError::Transient(anyhow!( + "region {} snapshot {} running_snapshot not deleted yet", + region_id.to_string(), + snapshot_id.to_string(), + ) + ))) + } + + RegionState::Destroyed => { + info!( + log, + "region {} snapshot {} running_snapshot deleted", + region_id.to_string(), + 
snapshot_id.to_string(), + ); + + Ok(()) } - status if status.is_client_error() => { - Error::invalid_request(&rv.message) + + _ => { + Err(BackoffError::transient( + WaitError::Transient(anyhow!( + "region {} snapshot {} running_snapshot unexpected state", + region_id.to_string(), + snapshot_id.to_string(), + ) + ))) } - _ => Error::internal_error(&rv.message), } } - _ => Error::internal_error( - "unexpected failure during `delete_crucible_regions`", - ), - }) + + None => { + // deleted? + info!( + log, + "region {} snapshot {} running_snapshot is None", + region_id.to_string(), + snapshot_id.to_string(), + ); + + // break here - it's possible that the running snapshot + // record was GCed, and it won't come back. + Ok(()) + } + } + }, + |e: WaitError, delay| { + info!( + log, + "{:?}, trying again in {:?}", + e, + delay, + ); + } + ) + .await + .map_err(|e| match e { + WaitError::Transient(e) => { + // The backoff crate can be configured with a maximum elapsed time + // before giving up, which means that Transient could be returned + // here. Our current policies do **not** set this though. + Error::internal_error(&e.to_string()) + } + + WaitError::Permanent(e) => { + e + } + }) +} + +pub(super) async fn delete_crucible_snapshot( + log: &Logger, + client: &CrucibleAgentClient, + region_id: Uuid, + snapshot_id: Uuid, +) -> Result<(), Error> { + // delete snapshot - this endpoint is synchronous, it is not only a request + retry_until_known_result(log, || async { + client + .region_delete_snapshot( + &RegionId(region_id.to_string()), + &snapshot_id.to_string(), + ) + .await + }) + .await + .map_err(|e| { + error!( + log, + "delete_crucible_snapshot: region_delete_snapshot saw {:?}", e + ); + match e { + crucible_agent_client::Error::ErrorResponse(rv) => { + match rv.status() { + status if status.is_client_error() => { + Error::invalid_request(&rv.message) + } + _ => Error::internal_error(&rv.message), + } + } + _ => Error::internal_error( + "unexpected failure during `region_delete_snapshot`", + ), + } + })?; + + Ok(()) +} + +// Given a list of datasets and region snapshots, send DELETE calls to the +// datasets corresponding Crucible Agent for each snapshot. +pub(super) async fn delete_crucible_snapshots( + log: &Logger, + datasets_and_snapshots: Vec<( + db::model::Dataset, + db::model::RegionSnapshot, + )>, +) -> Result<(), Error> { + let request_count = datasets_and_snapshots.len(); + if request_count == 0 { + return Ok(()); + } + + futures::stream::iter(datasets_and_snapshots) + .map(|(dataset, region_snapshot)| async move { + let url = format!("http://{}", dataset.address()); + let client = CrucibleAgentClient::new(&url); + + delete_crucible_snapshot( + &log, + &client, + region_snapshot.region_id, + region_snapshot.snapshot_id, + ) + .await }) - // Execute the allocation requests concurrently. + // Execute the requests concurrently. .buffer_unordered(std::cmp::min( request_count, MAX_CONCURRENT_REGION_REQUESTS, )) - .collect::>>() + .collect::>>() .await .into_iter() .collect::, _>>()?; @@ -188,8 +531,9 @@ pub(super) async fn delete_crucible_regions( // Given a list of datasets and region snapshots, send DELETE calls to the // datasets corresponding Crucible Agent for each running read-only downstairs -// and snapshot. -pub(super) async fn delete_crucible_snapshots( +// corresponding to the snapshot. 
+pub(super) async fn delete_crucible_running_snapshots( + log: &Logger, datasets_and_snapshots: Vec<( db::model::Dataset, db::model::RegionSnapshot, @@ -205,57 +549,15 @@ pub(super) async fn delete_crucible_snapshots( let url = format!("http://{}", dataset.address()); let client = CrucibleAgentClient::new(&url); - // delete running snapshot - client - .region_delete_running_snapshot( - &RegionId(region_snapshot.region_id.to_string()), - ®ion_snapshot.snapshot_id.to_string(), - ) - .await - .map_err(|e| match e { - crucible_agent_client::Error::ErrorResponse(rv) => { - match rv.status() { - http::StatusCode::SERVICE_UNAVAILABLE => { - Error::unavail(&rv.message) - } - status if status.is_client_error() => { - Error::invalid_request(&rv.message) - } - _ => Error::internal_error(&rv.message), - } - } - _ => Error::internal_error( - "unexpected failure during `region_delete_running_snapshot`", - ), - })?; - - // delete snapshot - client - .region_delete_snapshot( - &RegionId(region_snapshot.region_id.to_string()), - ®ion_snapshot.snapshot_id.to_string(), - ) - .await - .map_err(|e| match e { - crucible_agent_client::Error::ErrorResponse(rv) => { - match rv.status() { - http::StatusCode::SERVICE_UNAVAILABLE => { - Error::unavail(&rv.message) - } - status if status.is_client_error() => { - Error::invalid_request(&rv.message) - } - _ => Error::internal_error(&rv.message), - } - } - _ => Error::internal_error( - "unexpected failure during `region_delete_snapshot`", - ), - })?; - - Ok(()) + delete_crucible_running_snapshot( + &log, + &client, + region_snapshot.region_id, + region_snapshot.snapshot_id, + ) + .await }) - // Execute the allocation requests concurrently. + // Execute the requests concurrently. .buffer_unordered(std::cmp::min( request_count, MAX_CONCURRENT_REGION_REQUESTS, diff --git a/nexus/src/app/sagas/disk_create.rs b/nexus/src/app/sagas/disk_create.rs index d41345c9444..0b26bd0ea8d 100644 --- a/nexus/src/app/sagas/disk_create.rs +++ b/nexus/src/app/sagas/disk_create.rs @@ -17,6 +17,7 @@ use crate::db::lookup::LookupPath; use crate::external_api::params; use crate::{authn, authz, db}; use nexus_db_queries::db::datastore::RegionAllocationStrategy; +use omicron_common::api::external::DiskState; use omicron_common::api::external::Error; use rand::{rngs::StdRng, RngCore, SeedableRng}; use serde::Deserialize; @@ -218,7 +219,13 @@ async fn sdc_create_disk_record_undo( let osagactx = sagactx.user_data(); let disk_id = sagactx.lookup::("disk_id")?; - osagactx.datastore().project_delete_disk_no_auth(&disk_id).await?; + osagactx + .datastore() + .project_delete_disk_no_auth( + &disk_id, + &[DiskState::Detached, DiskState::Faulted, DiskState::Creating], + ) + .await?; Ok(()) } @@ -262,6 +269,7 @@ async fn sdc_alloc_regions_undo( sagactx: NexusActionContext, ) -> Result<(), anyhow::Error> { let osagactx = sagactx.user_data(); + let log = osagactx.log(); let region_ids = sagactx .lookup::>( @@ -271,7 +279,7 @@ async fn sdc_alloc_regions_undo( .map(|(_, region)| region.id()) .collect::>(); - osagactx.datastore().regions_hard_delete(region_ids).await?; + osagactx.datastore().regions_hard_delete(log, region_ids).await?; Ok(()) } @@ -518,6 +526,7 @@ async fn sdc_regions_ensure_undo( let log = sagactx.user_data().log(); warn!(log, "sdc_regions_ensure_undo: Deleting crucible regions"); delete_crucible_regions( + log, sagactx.lookup::>( "datasets_and_regions", )?, @@ -529,7 +538,7 @@ async fn sdc_regions_ensure_undo( async fn sdc_create_volume_record( sagactx: NexusActionContext, -) -> Result { +) -> 
Result<(), ActionError> {
     let osagactx = sagactx.user_data();
 
     let volume_id = sagactx.lookup::<Uuid>("volume_id")?;
@@ -537,13 +546,13 @@ async fn sdc_create_volume_record(
 
     let volume = db::model::Volume::new(volume_id, volume_data);
 
-    let volume_created = osagactx
+    osagactx
         .datastore()
         .volume_create(volume)
         .await
         .map_err(ActionError::action_failed)?;
 
-    Ok(volume_created)
+    Ok(())
 }
 
 async fn sdc_create_volume_record_undo(
@@ -556,8 +565,18 @@ async fn sdc_create_volume_record_undo(
         &sagactx,
         &params.serialized_authn,
     );
+
     let volume_id = sagactx.lookup::<Uuid>("volume_id")?;
-    osagactx.nexus().volume_delete(&opctx, volume_id).await?;
+
+    // Depending on the read only parent, there may be some read only resources
+    // in use; however, this saga tracks them all.
+    osagactx
+        .datastore()
+        .decrease_crucible_resource_count_and_soft_delete_volume(volume_id)
+        .await?;
+
+    osagactx.datastore().volume_hard_delete(volume_id).await?;
+
     Ok(())
 }
 
diff --git a/nexus/src/app/sagas/disk_delete.rs b/nexus/src/app/sagas/disk_delete.rs
index 2198954eb6c..6d5d4a1e02e 100644
--- a/nexus/src/app/sagas/disk_delete.rs
+++ b/nexus/src/app/sagas/disk_delete.rs
@@ -10,6 +10,7 @@ use crate::app::sagas::volume_delete;
 use crate::app::sagas::SagaInitError;
 use crate::authn;
 use crate::db;
+use omicron_common::api::external::DiskState;
 use serde::Deserialize;
 use serde::Serialize;
 use steno::ActionError;
@@ -106,9 +107,13 @@ async fn sdd_delete_disk_record(
     let disk = osagactx
         .datastore()
-        .project_delete_disk_no_auth(&params.disk_id)
+        .project_delete_disk_no_auth(
+            &params.disk_id,
+            &[DiskState::Detached, DiskState::Faulted],
+        )
         .await
         .map_err(ActionError::action_failed)?;
+
     Ok(disk)
 }
 
diff --git a/nexus/src/app/sagas/snapshot_create.rs b/nexus/src/app/sagas/snapshot_create.rs
index 8e4851f350d..df1ceea2fd6 100644
--- a/nexus/src/app/sagas/snapshot_create.rs
+++ b/nexus/src/app/sagas/snapshot_create.rs
@@ -91,7 +91,8 @@ use super::{
     common_storage::{
         call_pantry_attach_for_disk, call_pantry_detach_for_disk,
-        delete_crucible_regions, ensure_all_datasets_and_regions,
+        delete_crucible_regions, delete_crucible_running_snapshot,
+        delete_crucible_snapshot, ensure_all_datasets_and_regions,
         get_pantry_address,
     },
     ActionRegistry, NexusActionContext, NexusSaga, SagaInitError,
@@ -351,6 +352,7 @@ async fn ssc_alloc_regions_undo(
     sagactx: NexusActionContext,
 ) -> Result<(), anyhow::Error> {
     let osagactx = sagactx.user_data();
+    let log = osagactx.log();
 
     let region_ids = sagactx
         .lookup::<Vec<(db::model::Dataset, db::model::Region)>>(
             "datasets_and_regions",
         )?
         .iter()
         .map(|(_, region)| region.id())
         .collect::<Vec<Uuid>>();
 
-    osagactx.datastore().regions_hard_delete(region_ids).await?;
+    osagactx.datastore().regions_hard_delete(log, region_ids).await?;
     Ok(())
 }
 
@@ -453,6 +455,7 @@ async fn ssc_regions_ensure_undo(
     let log = sagactx.user_data().log();
     warn!(log, "ssc_regions_ensure_undo: Deleting crucible regions");
     delete_crucible_regions(
+        log,
         sagactx.lookup::<Vec<(db::model::Dataset, db::model::Region)>>(
             "datasets_and_regions",
         )?,
     )
@@ -464,7 +467,7 @@ async fn ssc_create_destination_volume_record(
     sagactx: NexusActionContext,
-) -> Result<db::model::Volume, ActionError> {
+) -> Result<(), ActionError> {
     let osagactx = sagactx.user_data();
 
     let destination_volume_id =
         sagactx.lookup::<Uuid>("destination_volume_id")?;
@@ -475,28 +478,31 @@ async fn ssc_create_destination_volume_record(
 
     let volume =
         db::model::Volume::new(destination_volume_id, destination_volume_data);
 
-    let volume_created = osagactx
+    osagactx
         .datastore()
         .volume_create(volume)
         .await
         .map_err(ActionError::action_failed)?;
 
-    Ok(volume_created)
+    Ok(())
 }
 
 async fn 
ssc_create_destination_volume_record_undo( sagactx: NexusActionContext, ) -> Result<(), anyhow::Error> { let osagactx = sagactx.user_data(); - let params = sagactx.saga_params::()?; - let opctx = crate::context::op_context_for_saga_action( - &sagactx, - ¶ms.serialized_authn, - ); let destination_volume_id = sagactx.lookup::("destination_volume_id")?; - osagactx.nexus().volume_delete(&opctx, destination_volume_id).await?; + + osagactx + .datastore() + .decrease_crucible_resource_count_and_soft_delete_volume( + destination_volume_id, + ) + .await?; + + osagactx.datastore().volume_hard_delete(destination_volume_id).await?; Ok(()) } @@ -739,16 +745,10 @@ async fn ssc_send_snapshot_request_to_sled_agent_undo( let url = format!("http://{}", dataset.address()); let client = CrucibleAgentClient::new(&url); - retry_until_known_result(log, || async { - client - .region_delete_snapshot( - &RegionId(region.id().to_string()), - &snapshot_id.to_string(), - ) - .await - }) - .await?; + delete_crucible_snapshot(log, &client, region.id(), snapshot_id) + .await?; } + Ok(()) } @@ -1061,15 +1061,8 @@ async fn ssc_call_pantry_snapshot_for_disk_undo( let url = format!("http://{}", dataset.address()); let client = CrucibleAgentClient::new(&url); - retry_until_known_result(log, || async { - client - .region_delete_snapshot( - &RegionId(region.id().to_string()), - &snapshot_id.to_string(), - ) - .await - }) - .await?; + delete_crucible_snapshot(log, &client, region.id(), snapshot_id) + .await?; } Ok(()) } @@ -1300,6 +1293,7 @@ async fn ssc_start_running_snapshot_undo( .disk_id(params.disk_id) .fetch() .await?; + let datasets_and_regions = osagactx.datastore().get_allocated_regions(disk.volume_id).await?; @@ -1308,29 +1302,14 @@ async fn ssc_start_running_snapshot_undo( let url = format!("http://{}", dataset.address()); let client = CrucibleAgentClient::new(&url); - use crucible_agent_client::Error::ErrorResponse; - use http::status::StatusCode; + delete_crucible_running_snapshot( + &log, + &client, + region.id(), + snapshot_id, + ) + .await?; - retry_until_known_result(log, || async { - client - .region_delete_running_snapshot( - &RegionId(region.id().to_string()), - &snapshot_id.to_string(), - ) - .await - }) - .await - .map(|_| ()) - // NOTE: If we later create a volume record and delete it, the - // running snapshot may be deleted (see: - // ssc_create_volume_record_undo). - // - // To cope, we treat "running snapshot not found" as "Ok", since it - // may just be the result of the volume deletion steps completing. 
- .or_else(|err| match err { - ErrorResponse(r) if r.status() == StatusCode::NOT_FOUND => Ok(()), - _ => Err(err), - })?; osagactx .datastore() .region_snapshot_remove(dataset.id(), region.id(), snapshot_id) @@ -1341,7 +1320,7 @@ async fn ssc_start_running_snapshot_undo( async fn ssc_create_volume_record( sagactx: NexusActionContext, -) -> Result { +) -> Result<(), ActionError> { let log = sagactx.user_data().log(); let osagactx = sagactx.user_data(); let params = sagactx.saga_params::()?; @@ -1403,7 +1382,7 @@ async fn ssc_create_volume_record( let volume = db::model::Volume::new(volume_id, volume_data); // Insert volume record into the DB - let volume_created = osagactx + osagactx .datastore() .volume_create(volume) .await @@ -1411,7 +1390,7 @@ async fn ssc_create_volume_record( info!(log, "volume {} created ok", volume_id); - Ok(volume_created) + Ok(()) } async fn ssc_create_volume_record_undo( @@ -1420,14 +1399,18 @@ async fn ssc_create_volume_record_undo( let log = sagactx.user_data().log(); let osagactx = sagactx.user_data(); let params = sagactx.saga_params::()?; - let opctx = crate::context::op_context_for_saga_action( - &sagactx, - ¶ms.serialized_authn, - ); let volume_id = sagactx.lookup::("volume_id")?; - info!(log, "deleting volume {}", volume_id); - osagactx.nexus().volume_delete(&opctx, volume_id).await?; + info!( + log, + "calling decrease crucible resource count for volume {}", volume_id + ); + osagactx + .datastore() + .decrease_crucible_resource_count_and_soft_delete_volume(volume_id) + .await?; + + osagactx.datastore().volume_hard_delete(volume_id).await?; Ok(()) } diff --git a/nexus/src/app/sagas/volume_delete.rs b/nexus/src/app/sagas/volume_delete.rs index df449f67f95..c2920363e8a 100644 --- a/nexus/src/app/sagas/volume_delete.rs +++ b/nexus/src/app/sagas/volume_delete.rs @@ -24,6 +24,7 @@ //! change. Saga nodes must be idempotent in order to work correctly. use super::common_storage::delete_crucible_regions; +use super::common_storage::delete_crucible_running_snapshots; use super::common_storage::delete_crucible_snapshots; use super::ActionRegistry; use super::NexusActionContext; @@ -61,10 +62,16 @@ declare_saga_actions! 
{ DELETE_CRUCIBLE_REGIONS -> "no_result_1" { + svd_delete_crucible_regions } - DELETE_CRUCIBLE_SNAPSHOTS -> "no_result_2" { + DELETE_CRUCIBLE_RUNNING_SNAPSHOTS -> "no_result_2" { + + svd_delete_crucible_running_snapshots + } + DELETE_CRUCIBLE_SNAPSHOTS -> "no_result_3" { + svd_delete_crucible_snapshots } - DELETE_FREED_CRUCIBLE_REGIONS -> "no_result_3" { + DELETE_CRUCIBLE_SNAPSHOT_RECORDS -> "no_result_4" { + + svd_delete_crucible_snapshot_records + } + DELETE_FREED_CRUCIBLE_REGIONS -> "no_result_5" { + svd_delete_freed_crucible_regions } HARD_DELETE_VOLUME_RECORD -> "final_no_result" { @@ -81,9 +88,13 @@ pub fn create_dag( builder.append_parallel(vec![ // clean up top level regions for volume delete_crucible_regions_action(), - // clean up snapshots no longer referenced by any volume - delete_crucible_snapshots_action(), + // clean up running snapshots no longer referenced by any volume + delete_crucible_running_snapshots_action(), ]); + // clean up snapshots no longer referenced by any volume + builder.append(delete_crucible_snapshots_action()); + // remove snapshot db records + builder.append(delete_crucible_snapshot_records_action()); // clean up regions that were freed by deleting snapshots builder.append(delete_freed_crucible_regions_action()); builder.append(hard_delete_volume_record_action()); @@ -125,7 +136,10 @@ async fn svd_decrease_crucible_resource_count( params.volume_id, ) .await - .map_err(ActionError::action_failed)?; + .map_err(|e| ActionError::action_failed(format!( + "failed to decrease_crucible_resource_count_and_soft_delete_volume: {:?}", + e, + )))?; Ok(crucible_resources) } @@ -134,6 +148,7 @@ async fn svd_decrease_crucible_resource_count( async fn svd_delete_crucible_regions( sagactx: NexusActionContext, ) -> Result<(), ActionError> { + let log = sagactx.user_data().log(); let osagactx = sagactx.user_data(); let crucible_resources_to_delete = @@ -143,10 +158,16 @@ async fn svd_delete_crucible_regions( match crucible_resources_to_delete { CrucibleResources::V1(crucible_resources_to_delete) => { delete_crucible_regions( + log, crucible_resources_to_delete.datasets_and_regions.clone(), ) .await - .map_err(ActionError::action_failed)?; + .map_err(|e| { + ActionError::action_failed(format!( + "failed to delete_crucible_regions: {:?}", + e, + )) + })?; // Remove DB records let region_ids_to_delete = crucible_resources_to_delete @@ -157,9 +178,48 @@ async fn svd_delete_crucible_regions( osagactx .datastore() - .regions_hard_delete(region_ids_to_delete) + .regions_hard_delete(log, region_ids_to_delete) .await - .map_err(ActionError::action_failed)?; + .map_err(|e| { + ActionError::action_failed(format!( + "failed to regions_hard_delete: {:?}", + e, + )) + })?; + } + } + + Ok(()) +} + +/// Clean up running read-only downstairs corresponding to snapshots freed up +/// for deletion by deleting this volume. +/// +/// This Volume may have referenced read-only downstairs (and their snapshots), +/// and deleting it will remove the references - this may free up those +/// resources for deletion, which this Saga node does. 
+async fn svd_delete_crucible_running_snapshots( + sagactx: NexusActionContext, +) -> Result<(), ActionError> { + let log = sagactx.user_data().log(); + + let crucible_resources_to_delete = + sagactx.lookup::("crucible_resources_to_delete")?; + + // Send DELETE calls to the corresponding Crucible agents + match crucible_resources_to_delete { + CrucibleResources::V1(crucible_resources_to_delete) => { + delete_crucible_running_snapshots( + log, + crucible_resources_to_delete.datasets_and_snapshots.clone(), + ) + .await + .map_err(|e| { + ActionError::action_failed(format!( + "failed to delete_crucible_running_snapshots: {:?}", + e, + )) + })?; } } @@ -174,7 +234,7 @@ async fn svd_delete_crucible_regions( async fn svd_delete_crucible_snapshots( sagactx: NexusActionContext, ) -> Result<(), ActionError> { - let osagactx = sagactx.user_data(); + let log = sagactx.user_data().log(); let crucible_resources_to_delete = sagactx.lookup::("crucible_resources_to_delete")?; @@ -183,11 +243,33 @@ async fn svd_delete_crucible_snapshots( match crucible_resources_to_delete { CrucibleResources::V1(crucible_resources_to_delete) => { delete_crucible_snapshots( + log, crucible_resources_to_delete.datasets_and_snapshots.clone(), ) .await - .map_err(ActionError::action_failed)?; + .map_err(|e| { + ActionError::action_failed(format!( + "failed to delete_crucible_snapshots: {:?}", + e, + )) + })?; + } + } + + Ok(()) +} + +/// Remove records for deleted snapshots +async fn svd_delete_crucible_snapshot_records( + sagactx: NexusActionContext, +) -> Result<(), ActionError> { + let osagactx = sagactx.user_data(); + let crucible_resources_to_delete = + sagactx.lookup::("crucible_resources_to_delete")?; + + match crucible_resources_to_delete { + CrucibleResources::V1(crucible_resources_to_delete) => { // Remove DB records for (_, region_snapshot) in &crucible_resources_to_delete.datasets_and_snapshots @@ -200,7 +282,15 @@ async fn svd_delete_crucible_snapshots( region_snapshot.snapshot_id, ) .await - .map_err(ActionError::action_failed)?; + .map_err(|e| { + ActionError::action_failed(format!( + "failed to region_snapshot_remove {} {} {}: {:?}", + region_snapshot.dataset_id, + region_snapshot.region_id, + region_snapshot.snapshot_id, + e, + )) + })?; } } } @@ -220,45 +310,66 @@ async fn svd_delete_crucible_snapshots( async fn svd_delete_freed_crucible_regions( sagactx: NexusActionContext, ) -> Result<(), ActionError> { + let log = sagactx.user_data().log(); let osagactx = sagactx.user_data(); // Find regions freed up for deletion by a previous saga node deleting the // region snapshots. 
- let freed_datasets_regions_and_volumes = osagactx - .datastore() - .find_deleted_volume_regions() - .await - .map_err(ActionError::action_failed)?; + let freed_datasets_regions_and_volumes = + osagactx.datastore().find_deleted_volume_regions().await.map_err( + |e| { + ActionError::action_failed(format!( + "failed to find_deleted_volume_regions: {:?}", + e, + )) + }, + )?; // Send DELETE calls to the corresponding Crucible agents delete_crucible_regions( + log, freed_datasets_regions_and_volumes .iter() .map(|(d, r, _)| (d.clone(), r.clone())) .collect(), ) .await - .map_err(ActionError::action_failed)?; + .map_err(|e| { + ActionError::action_failed(format!( + "failed to delete_crucible_regions: {:?}", + e, + )) + })?; // Remove region DB records osagactx .datastore() .regions_hard_delete( + log, freed_datasets_regions_and_volumes .iter() .map(|(_, r, _)| r.id()) .collect(), ) .await - .map_err(ActionError::action_failed)?; + .map_err(|e| { + ActionError::action_failed(format!( + "failed to regions_hard_delete: {:?}", + e, + )) + })?; // Remove volume DB records for (_, _, volume) in &freed_datasets_regions_and_volumes { - osagactx - .datastore() - .volume_hard_delete(volume.id()) - .await - .map_err(ActionError::action_failed)?; + osagactx.datastore().volume_hard_delete(volume.id()).await.map_err( + |e| { + ActionError::action_failed(format!( + "failed to volume_hard_delete {}: {:?}", + volume.id(), + e, + )) + }, + )?; } Ok(()) @@ -280,17 +391,25 @@ async fn svd_hard_delete_volume_record( .datastore() .get_allocated_regions(params.volume_id) .await - .map_err(ActionError::action_failed)?; + .map_err(|e| { + ActionError::action_failed(format!( + "failed to get_allocated_regions for {}: {:?}", + params.volume_id, e, + )) + })?; if !allocated_regions.is_empty() { return Ok(()); } - osagactx - .datastore() - .volume_hard_delete(params.volume_id) - .await - .map_err(ActionError::action_failed)?; + osagactx.datastore().volume_hard_delete(params.volume_id).await.map_err( + |e| { + ActionError::action_failed(format!( + "failed to volume_hard_delete {}: {:?}", + params.volume_id, e, + )) + }, + )?; Ok(()) } diff --git a/nexus/src/app/sagas/volume_remove_rop.rs b/nexus/src/app/sagas/volume_remove_rop.rs index c97660dc8ed..27fb3871f59 100644 --- a/nexus/src/app/sagas/volume_remove_rop.rs +++ b/nexus/src/app/sagas/volume_remove_rop.rs @@ -126,7 +126,7 @@ impl NexusSaga for SagaVolumeRemoveROP { async fn svr_create_temp_volume( sagactx: NexusActionContext, -) -> Result { +) -> Result<(), ActionError> { let osagactx = sagactx.user_data(); let temp_volume_id = sagactx.lookup::("temp_volume_id")?; @@ -148,13 +148,13 @@ async fn svr_create_temp_volume( })?; let volume = db::model::Volume::new(temp_volume_id, temp_volume_data); - let volume_created = osagactx + osagactx .datastore() .volume_create(volume) .await .map_err(ActionError::action_failed)?; - Ok(volume_created) + Ok(()) } async fn svr_create_temp_volume_undo( diff --git a/nexus/src/app/volume.rs b/nexus/src/app/volume.rs index 9d57a6e2b98..6ce391ddc03 100644 --- a/nexus/src/app/volume.rs +++ b/nexus/src/app/volume.rs @@ -8,45 +8,10 @@ use crate::app::sagas; use crate::authn; use nexus_db_queries::context::OpContext; use omicron_common::api::external::DeleteResult; -use omicron_common::api::external::Error; use std::sync::Arc; use uuid::Uuid; impl super::Nexus { - /// Kick off a saga to delete a volume (and clean up any Crucible resources - /// as a result). 
Note that this does not unconditionally delete the volume - /// record: if the allocated Crucible regions associated with this volume - /// still have references, we cannot delete it, so it will be soft-deleted. - /// Only when all the associated resources have been cleaned up does Nexus - /// hard delete the volume record. - /// - /// Note it is **not** valid to call this function from an "action" node in - /// a saga because it would not be idempotent in the case of a rerun. This - /// function is ok to call from an "undo" node: as of this writing, this - /// occurs when unwinding the creation of a resource. If that unwind fails, - /// then the saga is parked in "Stuck" anyway. - pub async fn volume_delete( - self: &Arc, - opctx: &OpContext, - volume_id: Uuid, - ) -> DeleteResult { - let saga_params = sagas::volume_delete::Params { - serialized_authn: authn::saga::Serialized::for_opctx(opctx), - volume_id, - }; - - let saga_outputs = self - .execute_saga::(saga_params) - .await?; - - let volume_deleted = - saga_outputs.lookup_node_output::<()>("final_no_result").map_err( - |e| Error::InternalError { internal_message: e.to_string() }, - )?; - - Ok(volume_deleted) - } - /// Start a saga to remove a read only parent from a volume. pub async fn volume_remove_read_only_parent( self: &Arc, diff --git a/sled-agent/src/sim/http_entrypoints_storage.rs b/sled-agent/src/sim/http_entrypoints_storage.rs index 831e96f80ec..8cfc6bb70a3 100644 --- a/sled-agent/src/sim/http_entrypoints_storage.rs +++ b/sled-agent/src/sim/http_entrypoints_storage.rs @@ -109,16 +109,12 @@ async fn region_delete( let id = path.into_inner().id; let crucible = rc.context(); - match crucible + crucible .delete(id) .await - .map_err(|e| HttpError::for_bad_request(None, e.to_string()))? - { - Some(_) => Ok(HttpResponseDeleted()), - None => { - Err(HttpError::for_not_found(None, "Region not found".to_string())) - } - } + .map_err(|e| HttpError::for_bad_request(None, e.to_string()))?; + + Ok(HttpResponseDeleted()) } #[endpoint { @@ -279,9 +275,7 @@ async fn region_delete_running_snapshot( )); } - let snapshots = crucible.snapshots_for_region(&p.id).await; - - if !snapshots.iter().any(|x| x.name == p.name) { + if crucible.get_snapshot_for_region(&p.id, &p.name).await.is_none() { return Err(HttpError::for_not_found( None, format!("snapshot {:?} not found", p.name), diff --git a/sled-agent/src/sim/sled_agent.rs b/sled-agent/src/sim/sled_agent.rs index 5f9ab3ca97e..a75885b2b22 100644 --- a/sled-agent/src/sim/sled_agent.rs +++ b/sled-agent/src/sim/sled_agent.rs @@ -558,10 +558,7 @@ impl SledAgent { storage.get_dataset_for_region(*region_id).await; if let Some(crucible_data) = crucible_data { - crucible_data - .create_snapshot(*region_id, snapshot_id) - .await - .map_err(|e| Error::internal_error(&e.to_string()))?; + crucible_data.create_snapshot(*region_id, snapshot_id).await; } else { return Err(Error::not_found_by_id( ResourceType::Disk, diff --git a/sled-agent/src/sim/storage.rs b/sled-agent/src/sim/storage.rs index 1a02c063402..789a598c541 100644 --- a/sled-agent/src/sim/storage.rs +++ b/sled-agent/src/sim/storage.rs @@ -34,8 +34,9 @@ use uuid::Uuid; type CreateCallback = Box State + Send + 'static>; struct CrucibleDataInner { + log: Logger, regions: HashMap, - snapshots: HashMap>, + snapshots: HashMap>, running_snapshots: HashMap>, on_create: Option, creating_a_running_snapshot_should_fail: bool, @@ -43,8 +44,9 @@ struct CrucibleDataInner { } impl CrucibleDataInner { - fn new(crucible_port: u16) -> Self { + fn new(log: Logger, 
crucible_port: u16) -> Self { Self { + log, regions: HashMap::new(), snapshots: HashMap::new(), running_snapshots: HashMap::new(), @@ -126,33 +128,38 @@ impl CrucibleDataInner { } } - fn create_snapshot( - &mut self, - id: Uuid, - snapshot_id: Uuid, - ) -> Result { - let vec = self.snapshots.entry(id).or_insert_with(|| Vec::new()); - - if vec.iter().any(|x| x.name == snapshot_id.to_string()) { - bail!("region {} snapshot {} exists already", id, snapshot_id); - } - - let snap = - Snapshot { name: snapshot_id.to_string(), created: Utc::now() }; - - vec.push(snap.clone()); - - Ok(snap) + fn create_snapshot(&mut self, id: Uuid, snapshot_id: Uuid) -> Snapshot { + info!(self.log, "Creating region {} snapshot {}", id, snapshot_id); + self.snapshots + .entry(id) + .or_insert_with(|| HashMap::new()) + .entry(snapshot_id.to_string()) + .or_insert_with(|| Snapshot { + name: snapshot_id.to_string(), + created: Utc::now(), + }) + .clone() } fn snapshots_for_region(&self, id: &RegionId) -> Vec { let id = Uuid::from_str(&id.0).unwrap(); match self.snapshots.get(&id) { - Some(vec) => vec.clone(), + Some(map) => map.values().cloned().collect(), None => vec![], } } + fn get_snapshot_for_region( + &self, + id: &RegionId, + snapshot_id: &str, + ) -> Option { + let id = Uuid::from_str(&id.0).unwrap(); + self.snapshots + .get(&id) + .and_then(|hm| hm.get(&snapshot_id.to_string()).cloned()) + } + fn running_snapshots_for_id( &self, id: &RegionId, @@ -166,15 +173,35 @@ impl CrucibleDataInner { fn delete_snapshot(&mut self, id: &RegionId, name: &str) -> Result<()> { let running_snapshots_for_id = self.running_snapshots_for_id(id); - if running_snapshots_for_id.contains_key(name) { - bail!("downstairs running for region {} snapshot {}", id.0, name,); - } + if let Some(running_snapshot) = running_snapshots_for_id.get(name) { + match &running_snapshot.state { + State::Created | State::Requested | State::Tombstoned => { + bail!( + "downstairs running for region {} snapshot {}", + id.0, + name + ); + } - let id = Uuid::from_str(&id.0).unwrap(); - if let Some(vec) = self.snapshots.get_mut(&id) { - vec.retain(|x| x.name != name); + State::Destroyed => { + // ok + } + + State::Failed => { + bail!( + "failed downstairs running for region {} snapshot {}", + id.0, + name + ); + } + } } + info!(self.log, "Deleting region {} snapshot {}", id.0, name); + let region_id = Uuid::from_str(&id.0).unwrap(); + if let Some(map) = self.snapshots.get_mut(®ion_id) { + map.remove(name); + } Ok(()) } @@ -206,7 +233,7 @@ impl CrucibleDataInner { id: RegionId(Uuid::new_v4().to_string()), name: name.to_string(), port_number: self.next_port, - state: State::Requested, + state: State::Created, }; map.insert(name.to_string(), running_snapshot.clone()); @@ -226,13 +253,10 @@ impl CrucibleDataInner { let map = self.running_snapshots.entry(id).or_insert_with(|| HashMap::new()); - // If the running snapshot was already deleted, then return Ok - if !map.contains_key(&name.to_string()) { - return Ok(()); + if let Some(running_snapshot) = map.get_mut(&name.to_string()) { + running_snapshot.state = State::Destroyed; } - map.remove(&name.to_string()); - Ok(()) } @@ -246,8 +270,20 @@ impl CrucibleDataInner { let snapshots = self.snapshots.values().flatten().count(); - let running_snapshots = - self.running_snapshots.values().flat_map(|hm| hm.values()).count(); + let running_snapshots = self + .running_snapshots + .values() + .flat_map(|hm| hm.values()) + .filter(|rs| rs.state != State::Destroyed) + .count(); + + info!( + self.log, + "is_empty 
non_destroyed_regions {} snapshots {} running_snapshots {}", + non_destroyed_regions, + snapshots, + running_snapshots, + ); non_destroyed_regions == 0 && snapshots == 0 && running_snapshots == 0 } @@ -259,8 +295,8 @@ pub struct CrucibleData { } impl CrucibleData { - fn new(crucible_port: u16) -> Self { - Self { inner: Mutex::new(CrucibleDataInner::new(crucible_port)) } + fn new(log: Logger, crucible_port: u16) -> Self { + Self { inner: Mutex::new(CrucibleDataInner::new(log, crucible_port)) } } pub async fn set_create_callback(&self, callback: CreateCallback) { @@ -296,7 +332,7 @@ impl CrucibleData { &self, id: Uuid, snapshot_id: Uuid, - ) -> Result { + ) -> Snapshot { self.inner.lock().await.create_snapshot(id, snapshot_id) } @@ -304,6 +340,14 @@ impl CrucibleData { self.inner.lock().await.snapshots_for_region(id) } + pub async fn get_snapshot_for_region( + &self, + id: &RegionId, + snapshot_id: &str, + ) -> Option { + self.inner.lock().await.get_snapshot_for_region(id, snapshot_id) + } + pub async fn running_snapshots_for_id( &self, id: &RegionId, @@ -357,7 +401,10 @@ impl CrucibleServer { // SocketAddr::new with port set to 0 will grab any open port to host // the emulated crucible agent, but set the fake downstairs listen ports // to start at `crucible_port`. - let data = Arc::new(CrucibleData::new(crucible_port)); + let data = Arc::new(CrucibleData::new( + log.new(slog::o!("port" => format!("{crucible_port}"))), + crucible_port, + )); let config = dropshot::ConfigDropshot { bind_address: SocketAddr::new(crucible_ip, 0), ..Default::default()