From 3c90832b9bc49ea2af11f81b550287aa9c487f63 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Sun, 1 Sep 2024 09:53:07 -0700 Subject: [PATCH] [nexus] add instance_reincarnation RPW This commit introduces a new background task, called `instance_reincarnation`, which automatically restarts `Failed` instances if the instance's `auto_restart_policy` indicates that it should be restarted. --- nexus-config/src/nexus_config.rs | 15 ++ nexus/db-queries/src/db/datastore/instance.rs | 56 +++- nexus/examples/config.toml | 3 +- nexus/src/app/background/init.rs | 24 ++ .../tasks/instance_reincarnation.rs | 251 ++++++++++++++++++ nexus/src/app/background/tasks/mod.rs | 1 + nexus/src/app/sagas/instance_start.rs | 7 +- nexus/tests/config.test.toml | 2 + smf/nexus/multi-sled/config-partial.toml | 2 + smf/nexus/single-sled/config-partial.toml | 2 + 10 files changed, 356 insertions(+), 7 deletions(-) create mode 100644 nexus/src/app/background/tasks/instance_reincarnation.rs diff --git a/nexus-config/src/nexus_config.rs b/nexus-config/src/nexus_config.rs index 7f2726cc595..35f91cd34f3 100644 --- a/nexus-config/src/nexus_config.rs +++ b/nexus-config/src/nexus_config.rs @@ -381,6 +381,8 @@ pub struct BackgroundTaskConfig { pub instance_watcher: InstanceWatcherConfig, /// configuration for instance updater task pub instance_updater: InstanceUpdaterConfig, + /// configuration for instance reincarnation task + pub instance_reincarnation: InstanceReincarnationConfig, /// configuration for service VPC firewall propagation task pub service_firewall_propagation: ServiceFirewallPropagationConfig, /// configuration for v2p mapping propagation task @@ -589,6 +591,14 @@ pub struct InstanceUpdaterConfig { pub disable: bool, } +#[serde_as] +#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)] +pub struct InstanceReincarnationConfig { + /// period (in seconds) for periodic activations of this background task + #[serde_as(as = "DurationSeconds")] + pub period_secs: Duration, +} + 
#[serde_as] #[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)] pub struct ServiceFirewallPropagationConfig { @@ -911,6 +921,7 @@ mod test { instance_watcher.period_secs = 30 instance_updater.period_secs = 30 instance_updater.disable = false + instance_reincarnation.period_secs = 60 service_firewall_propagation.period_secs = 300 v2p_mapping_propagation.period_secs = 30 abandoned_vmm_reaper.period_secs = 60 @@ -1066,6 +1077,9 @@ mod test { period_secs: Duration::from_secs(30), disable: false, }, + instance_reincarnation: InstanceReincarnationConfig { + period_secs: Duration::from_secs(60), + }, service_firewall_propagation: ServiceFirewallPropagationConfig { period_secs: Duration::from_secs(300), @@ -1169,6 +1183,7 @@ mod test { region_replacement_driver.period_secs = 30 instance_watcher.period_secs = 30 instance_updater.period_secs = 30 + instance_reincarnation.period_secs = 60 service_firewall_propagation.period_secs = 300 v2p_mapping_propagation.period_secs = 30 abandoned_vmm_reaper.period_secs = 60 diff --git a/nexus/db-queries/src/db/datastore/instance.rs b/nexus/db-queries/src/db/datastore/instance.rs index 34579ad21f3..2a9ef0a8e4c 100644 --- a/nexus/db-queries/src/db/datastore/instance.rs +++ b/nexus/db-queries/src/db/datastore/instance.rs @@ -20,7 +20,9 @@ use crate::db::identity::Resource; use crate::db::lookup::LookupPath; use crate::db::model::Generation; use crate::db::model::Instance; +use crate::db::model::InstanceAutoRestart; use crate::db::model::InstanceRuntimeState; +use crate::db::model::InstanceState; use crate::db::model::Migration; use crate::db::model::MigrationState; use crate::db::model::Name; @@ -435,6 +437,53 @@ impl DataStore { .map_err(|e| public_error_from_diesel(e, ErrorHandler::Server)) } + /// List all instances in the [`Failed`](InstanceState::Failed) state with an + /// auto-restart policy that permits them to be automatically restarted by + /// the control plane. 
+ /// + /// This is used by the `instance_reincarnation` RPW to ensure that any + /// such instances are restarted. + /// + /// This query is paginated by the instance's UUID, using the provided + /// [`DataPageParams`]. + pub async fn find_reincarnatable_instances( + &self, + opctx: &OpContext, + pagparams: &DataPageParams<'_, Uuid>, + ) -> ListResultVec<Instance> { + use db::schema::instance::dsl; + + paginated(dsl::instance, dsl::id, pagparams) + // Only attempt to reincarnate Failed instances. + .filter(dsl::state.eq(InstanceState::Failed)) + // The instance's auto-restart policy must allow the control plane + // to restart it automatically. + // + // N.B. that this may become more complex in the future if we grow + // additional auto-restart policies that require additional logic + // (such as restart limits...) + .filter( + dsl::auto_restart_policy.eq(InstanceAutoRestart::AllFailures), + ) + // Deleted instances may not be reincarnated. + .filter(dsl::time_deleted.is_null()) + // If the instance is currently in the process of being updated, + // let's not mess with it for now and try to restart it on another + // pass. + .filter(dsl::updater_id.is_null()) + // TODO(eliza): perhaps we ought to check for the presence of an + // active VMM here? If there is one, that would indicate that the + // instance hasn't been moved to `Failed` correctly. But, we would + // also need to handle the case where the active VMM is + // SagaUnwound... + .select(Instance::as_select()) + .load_async::<Instance>( + &*self.pool_connection_authorized(opctx).await?, + ) + .await + .map_err(|e| public_error_from_diesel(e, ErrorHandler::Server)) + } + /// Fetches information about an Instance that the caller has previously /// fetched /// @@ -871,12 +920,11 @@ impl DataStore { // instance must be "stopped" or "failed" in order to delete it. The // delete operation sets "time_deleted" (just like with other objects) // and also sets the state to "destroyed". 
- use db::model::InstanceState as DbInstanceState; use db::schema::{disk, instance}; - let stopped = DbInstanceState::NoVmm; - let failed = DbInstanceState::Failed; - let destroyed = DbInstanceState::Destroyed; + let stopped = InstanceState::NoVmm; + let failed = InstanceState::Failed; + let destroyed = InstanceState::Destroyed; let ok_to_delete_instance_states = vec![stopped, failed]; let detached_label = api::external::DiskState::Detached.label(); diff --git a/nexus/examples/config.toml b/nexus/examples/config.toml index d25408e6e34..94569c68e5f 100644 --- a/nexus/examples/config.toml +++ b/nexus/examples/config.toml @@ -120,7 +120,9 @@ region_replacement_driver.period_secs = 10 instance_watcher.period_secs = 30 # How frequently to schedule new instance update sagas. instance_updater.period_secs = 30 +# How frequently to attempt to restart Failed instances? +instance_reincarnation.period_secs = 60 service_firewall_propagation.period_secs = 300 v2p_mapping_propagation.period_secs = 30 abandoned_vmm_reaper.period_secs = 60 saga_recovery.period_secs = 600 diff --git a/nexus/src/app/background/init.rs b/nexus/src/app/background/init.rs index fedb74b81be..4908f669034 100644 --- a/nexus/src/app/background/init.rs +++ b/nexus/src/app/background/init.rs @@ -98,6 +98,7 @@ use super::tasks::dns_config; use super::tasks::dns_propagation; use super::tasks::dns_servers; use super::tasks::external_endpoints; +use super::tasks::instance_reincarnation; use super::tasks::instance_updater; use super::tasks::instance_watcher; use super::tasks::inventory_collection; @@ -160,6 +161,7 @@ pub struct BackgroundTasks { pub task_region_replacement_driver: Activator, pub task_instance_watcher: Activator, pub task_instance_updater: Activator, + pub task_instance_reincarnation: Activator, pub task_service_firewall_propagation: Activator, pub task_abandoned_vmm_reaper: Activator, pub task_vpc_route_manager: Activator, @@ -245,6 +247,7 @@ impl BackgroundTasksInitializer { 
task_region_replacement_driver: Activator::new(), task_instance_watcher: Activator::new(), task_instance_updater: Activator::new(), + task_instance_reincarnation: Activator::new(), task_service_firewall_propagation: Activator::new(), task_abandoned_vmm_reaper: Activator::new(), task_vpc_route_manager: Activator::new(), @@ -311,6 +314,7 @@ impl BackgroundTasksInitializer { task_region_replacement_driver, task_instance_watcher, task_instance_updater, + task_instance_reincarnation, task_service_firewall_propagation, task_abandoned_vmm_reaper, task_vpc_route_manager, @@ -669,6 +673,26 @@ impl BackgroundTasksInitializer { }); } + // Background task: schedule restart sagas for failed instances that can + // be automatically restarted. + { + let reincarnator = + instance_reincarnation::InstanceReincarnation::new( + datastore.clone(), + sagas.clone(), + ); + driver.register(TaskDefinition { + name: "instance_reincarnation", + description: "schedules start sagas for failed instances that \ + can be automatically restarted", + period: config.instance_reincarnation.period_secs, + task_impl: Box::new(reincarnator), + opctx: opctx.child(BTreeMap::new()), + watchers: vec![], + activator: task_instance_reincarnation, + }); + } + // Background task: service firewall rule propagation driver.register(TaskDefinition { name: "service_firewall_rule_propagation", diff --git a/nexus/src/app/background/tasks/instance_reincarnation.rs b/nexus/src/app/background/tasks/instance_reincarnation.rs new file mode 100644 index 00000000000..aa9237b8325 --- /dev/null +++ b/nexus/src/app/background/tasks/instance_reincarnation.rs @@ -0,0 +1,251 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +//! Background task for automatically restarting failed instances. 
+ +use crate::app::background::BackgroundTask; +use crate::app::saga::StartSaga; +use crate::app::sagas::instance_start; +use crate::app::sagas::NexusSaga; +use anyhow::Context; +use futures::future::BoxFuture; +use nexus_db_queries::authn; +use nexus_db_queries::context::OpContext; +use nexus_db_queries::db::pagination::Paginator; +use nexus_db_queries::db::DataStore; +use nexus_types::identity::Resource; +use omicron_common::api::external::Error; +use std::num::NonZeroU32; +use std::sync::Arc; +use tokio::task::JoinSet; + +pub struct InstanceReincarnation { + datastore: Arc<DataStore>, + sagas: Arc<dyn StartSaga>, +} + +#[derive(Default)] +struct ActivationStats { + instances_found: usize, + instances_reincarnated: usize, + already_reincarnated: usize, + sagas_started: usize, + saga_start_errors: usize, + saga_errors: usize, +} + +const BATCH_SIZE: NonZeroU32 = unsafe { + // Safety: last time I checked, 100 was greater than zero. + NonZeroU32::new_unchecked(100) +}; + +impl BackgroundTask for InstanceReincarnation { + fn activate<'a>( + &'a mut self, + opctx: &'a OpContext, + ) -> BoxFuture<'a, serde_json::Value> { + Box::pin(async move { + let mut stats = ActivationStats::default(); + let error = match self.actually_activate(opctx, &mut stats).await { + Ok(_) => { + if stats.instances_reincarnated > 0 { + info!( + &opctx.log, + "instance reincarnation completed"; + "instances_found" => stats.instances_found, + "instances_reincarnated" => stats.instances_reincarnated, + "already_reincarnated" => stats.already_reincarnated, + "sagas_started" => stats.sagas_started, + ); + } else { + debug!( + &opctx.log, + "instance reincarnation completed; no instances \ + in need of reincarnation"; + "instances_found" => stats.instances_found, + "already_reincarnated" => stats.already_reincarnated, + ); + } + None + } + Err(error) => { + error!( + &opctx.log, + "instance reincarnation failed!"; + "last_error" => %error, + "instances_found" => stats.instances_found, + "instances_reincarnated" => 
stats.instances_reincarnated, + "already_reincarnated" => stats.already_reincarnated, + "sagas_started" => stats.sagas_started, + "saga_start_errors" => stats.saga_start_errors, + "saga_errors" => stats.saga_errors, + ); + Some(error.to_string()) + } + }; + serde_json::json!({ + "instances_found": stats.instances_found, + "instances_reincarnated": stats.instances_reincarnated, + "already_reincarnated": stats.already_reincarnated, + "sagas_started": stats.sagas_started, + "saga_start_errors": stats.saga_start_errors, + "saga_errors": stats.saga_errors, + "last_error": error, + }) + }) + } +} + +impl InstanceReincarnation { + pub(crate) fn new( + datastore: Arc<DataStore>, + sagas: Arc<dyn StartSaga>, + ) -> Self { + Self { datastore, sagas } + } + + async fn actually_activate( + &mut self, + opctx: &OpContext, + stats: &mut ActivationStats, + ) -> anyhow::Result<()> { + let mut tasks = JoinSet::new(); + + let mut last_err = Ok(()); + let mut paginator = Paginator::new(BATCH_SIZE); + + while let Some(p) = paginator.next() { + let maybe_batch = self + .datastore + .find_reincarnatable_instances(opctx, &p.current_pagparams()) + .await; + let batch = match maybe_batch { + Ok(batch) => batch, + Err(error) => { + const ERR_STR: &'static str = + "failed to list instances in need of reincarnation"; + error!( + opctx.log, + "failed to list instances in need of reincarnation"; + "error" => &error, + ); + last_err = Err(error).context(ERR_STR); + break; + } + }; + + paginator = p.found_batch(&batch, &|instance| instance.id()); + + let found = batch.len(); + if found == 0 { + debug!( + opctx.log, + "no more instances in need of reincarnation"; + "total_found" => stats.instances_found, + ); + break; + } + + let prev_sagas_started = stats.sagas_started; + stats.instances_found += found; + + let serialized_authn = authn::saga::Serialized::for_opctx(opctx); + for db_instance in batch { + let instance_id = db_instance.id(); + let prepared_saga = instance_start::SagaInstanceStart::prepare( + 
&instance_start::Params { + db_instance, + serialized_authn: serialized_authn.clone(), + }, + ); + match prepared_saga { + Ok(saga) => { + let start_saga = self.sagas.clone(); + tasks.spawn(async move { + start_saga + .saga_start(saga) + .await + .map_err(|e| (instance_id, e))?; + Ok(instance_id) + }); + stats.sagas_started += 1; + } + Err(error) => { + const ERR_STR: &'static str = + "failed to prepare instance-start saga for "; + error!( + opctx.log, + "{ERR_STR}{instance_id}"; + "instance_id" => %instance_id, + "error" => %error, + ); + last_err = Err(error) + .with_context(|| format!("{ERR_STR}{instance_id}")); + stats.saga_start_errors += 1; + } + }; + } + + debug!( + opctx.log, + "found instance in need of reincarnation"; + "instances_found" => found, + "total_found" => stats.instances_found, + "sagas_started" => stats.sagas_started - prev_sagas_started, + "total_sagas_started" => stats.sagas_started, + ); + } + + // All sagas started, wait for them to come back... + while let Some(saga_result) = tasks.join_next().await { + match saga_result { + // Start saga completed successfully + Ok(Ok(instance_id)) => { + debug!( + opctx.log, + "welcome back to the realm of the living, {instance_id}!"; + "instance_id" => %instance_id, + ); + stats.instances_reincarnated += 1; + } + // The instance was restarted by another saga, that's fine... 
+ Ok(Err((instance_id, Error::Conflict { message }))) + if message.external_message() + == instance_start::ALREADY_STARTING_ERROR => + { + debug!( + opctx.log, + "instance {instance_id} was already reincarnated"; + "instance_id" => %instance_id, + ); + stats.already_reincarnated += 1; + } + // Start saga failed + Ok(Err((instance_id, error))) => { + const ERR_MSG: &'static str = "failed to restart instance"; + warn!(opctx.log, + "{ERR_MSG} {instance_id}"; + "instance_id" => %instance_id, + "error" => %error, + ); + stats.saga_errors += 1; + last_err = Err(error) + .with_context(|| format!("{ERR_MSG} {instance_id}")); + } + Err(e) => { + const JOIN_ERR_MSG: &'static str = + "tasks spawned on the JoinSet should never return a \ + JoinError, as nexus is compiled with panic=\"abort\", \ + and we never cancel them..."; + error!(opctx.log, "{JOIN_ERR_MSG}"; "error" => %e); + if cfg!(debug_assertions) { + unreachable!("{JOIN_ERR_MSG} but, I saw {e}!",) + } + } + } + } + + last_err + } +} diff --git a/nexus/src/app/background/tasks/mod.rs b/nexus/src/app/background/tasks/mod.rs index 6cbba0a07b8..e4bbbfe6d04 100644 --- a/nexus/src/app/background/tasks/mod.rs +++ b/nexus/src/app/background/tasks/mod.rs @@ -14,6 +14,7 @@ pub mod dns_config; pub mod dns_propagation; pub mod dns_servers; pub mod external_endpoints; +pub mod instance_reincarnation; pub mod instance_updater; pub mod instance_watcher; pub mod inventory_collection; diff --git a/nexus/src/app/sagas/instance_start.rs b/nexus/src/app/sagas/instance_start.rs index 3325cd72f44..e710891ea38 100644 --- a/nexus/src/app/sagas/instance_start.rs +++ b/nexus/src/app/sagas/instance_start.rs @@ -99,6 +99,9 @@ declare_saga_actions! { /// changing its generation. 
const REGISTERED_VMM_RECORD: &'static str = "ensure_registered"; +pub(crate) const ALREADY_STARTING_ERROR: &'static str = + "instance changed state before it could be started"; + #[derive(Debug)] pub(crate) struct SagaInstanceStart; impl NexusSaga for SagaInstanceStart { @@ -282,7 +285,7 @@ async fn sis_move_to_starting( // must have started the instance already, so unwind. Some(_) => { return Err(ActionError::action_failed(Error::conflict( - "instance changed state before it could be started", + ALREADY_STARTING_ERROR, ))); } @@ -312,7 +315,7 @@ async fn sis_move_to_starting( .map_err(ActionError::action_failed)? { return Err(ActionError::action_failed(Error::conflict( - "instance changed state before it could be started", + ALREADY_STARTING_ERROR, ))); } diff --git a/nexus/tests/config.test.toml b/nexus/tests/config.test.toml index bd338469e08..89fe3658301 100644 --- a/nexus/tests/config.test.toml +++ b/nexus/tests/config.test.toml @@ -137,6 +137,8 @@ lookup_region_port.period_secs = 60 # Therefore, disable the background task during tests. instance_updater.disable = true instance_updater.period_secs = 60 +# How frequently to attempt to restart Failed instances? +instance_reincarnation.period_secs = 60 region_snapshot_replacement_start.period_secs = 30 region_snapshot_replacement_garbage_collection.period_secs = 30 region_snapshot_replacement_step.period_secs = 30 diff --git a/smf/nexus/multi-sled/config-partial.toml b/smf/nexus/multi-sled/config-partial.toml index 30b86767856..b824f4b7c97 100644 --- a/smf/nexus/multi-sled/config-partial.toml +++ b/smf/nexus/multi-sled/config-partial.toml @@ -65,6 +65,8 @@ abandoned_vmm_reaper.period_secs = 60 saga_recovery.period_secs = 600 lookup_region_port.period_secs = 60 instance_updater.period_secs = 30 +# How frequently to attempt to restart Failed instances? 
+instance_reincarnation.period_secs = 60 region_snapshot_replacement_start.period_secs = 30 region_snapshot_replacement_garbage_collection.period_secs = 30 region_snapshot_replacement_step.period_secs = 30 diff --git a/smf/nexus/single-sled/config-partial.toml b/smf/nexus/single-sled/config-partial.toml index 1761d416982..e966b3eabd4 100644 --- a/smf/nexus/single-sled/config-partial.toml +++ b/smf/nexus/single-sled/config-partial.toml @@ -65,6 +65,8 @@ abandoned_vmm_reaper.period_secs = 60 saga_recovery.period_secs = 600 lookup_region_port.period_secs = 60 instance_updater.period_secs = 30 +# How frequently to attempt to restart Failed instances? +instance_reincarnation.period_secs = 60 region_snapshot_replacement_start.period_secs = 30 region_snapshot_replacement_garbage_collection.period_secs = 30 region_snapshot_replacement_step.period_secs = 30