From 899f33c08eb47a09286e2c5188e94aa6ec434657 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Thu, 5 Sep 2024 10:55:13 -0700 Subject: [PATCH 1/7] [nexus] Add `instance_watcher` concurrency limit The `instance_watcher` background task follows a pattern where it queries the database for instances, and then spawns a big pile of Tokio tasks to concurrently perform health checks for those instances. As suggested by @davepacheco in [this comment][1], there should probably be a limit on the number of concurrently running health checks to avoid clobbering the sled-agents with a giant pile of HTTP requests. This branch sets a global concurrency limit of 16 health checks using a `tokio::sync::Semaphore`. This is a pretty arbitrary, and fairly low, limit, but we can bump it up later if it turns out to be a bottleneck. [1]: https://github.com/oxidecomputer/omicron/pull/6519#pullrequestreview-2283593739 --- .../app/background/tasks/instance_watcher.rs | 33 +++++++++++++++++-- 1 file changed, 31 insertions(+), 2 deletions(-) diff --git a/nexus/src/app/background/tasks/instance_watcher.rs b/nexus/src/app/background/tasks/instance_watcher.rs index 7c90e7b1ec..223a6d7637 100644 --- a/nexus/src/app/background/tasks/instance_watcher.rs +++ b/nexus/src/app/background/tasks/instance_watcher.rs @@ -31,6 +31,7 @@ use std::future::Future; use std::num::NonZeroU32; use std::sync::Arc; use std::sync::Mutex; +use tokio::sync::Semaphore; use uuid::Uuid; oximeter::use_timeseries!("vm-health-check.toml"); @@ -42,6 +43,9 @@ pub(crate) struct InstanceWatcher { sagas: Arc, metrics: Arc>, id: WatcherIdentity, + /// Semaphore used to limit the number of concurrently in flight requests to + /// sled-agents. + check_concurrency_limit: Arc, } const MAX_SLED_AGENTS: NonZeroU32 = unsafe { @@ -49,6 +53,9 @@ const MAX_SLED_AGENTS: NonZeroU32 = unsafe { NonZeroU32::new_unchecked(100) }; +// Chosen arbitrarily. Perhaps this should be configurable in the config file? 
+const MAX_CONCURRENT_CHECKS: usize = 16; + impl InstanceWatcher { pub(crate) fn new( datastore: Arc, @@ -60,7 +67,15 @@ impl InstanceWatcher { producer_registry .register_producer(metrics::Producer(metrics.clone())) .unwrap(); - Self { datastore, sagas, metrics, id } + Self { + datastore, + sagas, + metrics, + id, + check_concurrency_limit: Arc::new(Semaphore::new( + MAX_CONCURRENT_CHECKS, + )), + } } fn check_instance( @@ -72,6 +87,7 @@ impl InstanceWatcher { ) -> impl Future + Send + 'static { let datastore = self.datastore.clone(); let sagas = self.sagas.clone(); + let concurrency_limit = self.check_concurrency_limit.clone(); let vmm_id = PropolisUuid::from_untyped_uuid(target.vmm_id); let opctx = { @@ -86,15 +102,28 @@ impl InstanceWatcher { let client = client.clone(); async move { + // First, acquire a permit from the semaphore before continuing to + // perform the health check, to limit the number of concurrently + // in-flight checks. + // + // Dropping this will release the permit back to the semaphore, + // allowing he next check task to proceed. 
+ let _permit = match concurrency_limit.acquire().await { + Ok(permit) => Some(permit), + Err(_) if cfg!(debug_assertions) => unreachable!( + "the semaphore is never closed, so this should never fail" + ), + Err(_) => None, + }; slog::trace!( opctx.log, "checking on VMM"; "propolis_id" => %vmm_id ); - slog::trace!(opctx.log, "checking on instance..."); let rsp = client .vmm_get_state(&vmm_id) .await .map_err(SledAgentInstanceError); + let mut check = Check { target, outcome: Default::default(), From dbf5c23995c69e43de1daf4c7ac63c39f386b5ea Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Thu, 5 Sep 2024 11:44:23 -0700 Subject: [PATCH 2/7] just use the database as the concurrency limit --- .../app/background/tasks/instance_watcher.rs | 164 +++++++----------- 1 file changed, 64 insertions(+), 100 deletions(-) diff --git a/nexus/src/app/background/tasks/instance_watcher.rs b/nexus/src/app/background/tasks/instance_watcher.rs index 223a6d7637..39ac482aea 100644 --- a/nexus/src/app/background/tasks/instance_watcher.rs +++ b/nexus/src/app/background/tasks/instance_watcher.rs @@ -31,7 +31,6 @@ use std::future::Future; use std::num::NonZeroU32; use std::sync::Arc; use std::sync::Mutex; -use tokio::sync::Semaphore; use uuid::Uuid; oximeter::use_timeseries!("vm-health-check.toml"); @@ -43,18 +42,13 @@ pub(crate) struct InstanceWatcher { sagas: Arc, metrics: Arc>, id: WatcherIdentity, - /// Semaphore used to limit the number of concurrently in flight requests to - /// sled-agents. - check_concurrency_limit: Arc, } -const MAX_SLED_AGENTS: NonZeroU32 = unsafe { - // Safety: last time I checked, 100 was greater than zero. - NonZeroU32::new_unchecked(100) -}; - // Chosen arbitrarily. Perhaps this should be configurable in the config file? -const MAX_CONCURRENT_CHECKS: usize = 16; +const MAX_CONCURRENT_CHECKS: NonZeroU32 = unsafe { + // Safety: last time I checked, 16 was greater than zero. 
+ NonZeroU32::new_unchecked(16) +}; impl InstanceWatcher { pub(crate) fn new( @@ -67,27 +61,18 @@ impl InstanceWatcher { producer_registry .register_producer(metrics::Producer(metrics.clone())) .unwrap(); - Self { - datastore, - sagas, - metrics, - id, - check_concurrency_limit: Arc::new(Semaphore::new( - MAX_CONCURRENT_CHECKS, - )), - } + Self { datastore, sagas, metrics, id } } fn check_instance( &self, opctx: &OpContext, - client: &SledAgentClient, + client: SledAgentClient, target: VirtualMachine, vmm: Vmm, ) -> impl Future + Send + 'static { let datastore = self.datastore.clone(); let sagas = self.sagas.clone(); - let concurrency_limit = self.check_concurrency_limit.clone(); let vmm_id = PropolisUuid::from_untyped_uuid(target.vmm_id); let opctx = { @@ -99,22 +84,8 @@ impl InstanceWatcher { meta.insert("vmm_id".to_string(), vmm_id.to_string()); opctx.child(meta) }; - let client = client.clone(); async move { - // First, acquire a permit from the semaphore before continuing to - // perform the health check, to limit the number of concurrently - // in-flight checks. - // - // Dropping this will release the permit back to the semaphore, - // allowing he next check task to proceed. 
- let _permit = match concurrency_limit.acquire().await { - Ok(permit) => Some(permit), - Err(_) if cfg!(debug_assertions) => unreachable!( - "the semaphore is never closed, so this should never fail" - ), - Err(_) => None, - }; slog::trace!( opctx.log, "checking on VMM"; "propolis_id" => %vmm_id ); @@ -406,15 +377,17 @@ impl BackgroundTask for InstanceWatcher { ) -> BoxFuture<'a, serde_json::Value> { async { let mut tasks = tokio::task::JoinSet::new(); - let mut paginator = Paginator::new(MAX_SLED_AGENTS); - let mk_client = |sled: &Sled| { - nexus_networking::sled_client_from_address( - sled.id(), - sled.address(), - &opctx.log, - ) - }; + let mut paginator = Paginator::new(MAX_CONCURRENT_CHECKS); + + let mut total: usize = 0; + let mut update_sagas_queued: usize = 0; + let mut instance_states: BTreeMap = + BTreeMap::new(); + let mut check_failures: BTreeMap = + BTreeMap::new(); + let mut check_errors: BTreeMap = BTreeMap::new(); + let mut curr_sled: Option<(Uuid, SledAgentClient)> = None; while let Some(p) = paginator.next() { let maybe_batch = self .datastore @@ -435,69 +408,60 @@ impl BackgroundTask for InstanceWatcher { }; paginator = p.found_batch(&batch, &|(sled, _, _, _)| sled.id()); - // When we iterate over the batch of sled instances, we pop the - // first sled from the batch before looping over the rest, to - // insure that the initial sled-agent client is created first, - // as we need the address of the first sled to construct it. - // We could, alternatively, make the sled-agent client an - // `Option`, but then every subsequent iteration would have to - // handle the case where it's `None`, and I thought this was a - // bit neater... 
- let mut batch = batch.into_iter(); - if let Some((mut curr_sled, instance, vmm, project)) = batch.next() { - let mut client = mk_client(&curr_sled); - let target = VirtualMachine::new(self.id, &curr_sled, &instance, &vmm, &project); - tasks.spawn(self.check_instance(opctx, &client, target, vmm)); - - for (sled, instance, vmm, project) in batch { - // We're now talking to a new sled agent; update the client. - if sled.id() != curr_sled.id() { - client = mk_client(&sled); - curr_sled = sled; - } + // Spawn a task to check on each sled in the batch. + for (sled, instance, vmm, project) in batch { + let client = match curr_sled { + // If we are still talking to the same sled, reuse the + // existing client and its connection pool. + Some((sled_id, ref client)) if sled_id == sled.id() => client.clone(), + // Otherwise, if we've moved on to a new sled, refresh + // the client. + ref mut curr => { + let client = nexus_networking::sled_client_from_address( + sled.id(), + sled.address(), + &opctx.log, + ); + *curr = Some((sled.id(), client.clone())); + client + }, + }; - let target = VirtualMachine::new(self.id, &curr_sled, &instance, &vmm, &project); - tasks.spawn(self.check_instance(opctx, &client, target, vmm)); - } + + let target = VirtualMachine::new(self.id, &sled, &instance, &vmm, &project); + tasks.spawn(self.check_instance(opctx, client, target, vmm)); } - } - // Now, wait for the check results to come back. - let mut total: usize = 0; - let mut update_sagas_queued: usize = 0; - let mut instance_states: BTreeMap = - BTreeMap::new(); - let mut check_failures: BTreeMap = - BTreeMap::new(); - let mut check_errors: BTreeMap = BTreeMap::new(); - while let Some(result) = tasks.join_next().await { - total += 1; - let check = result.expect( - "a `JoinError` is returned if a spawned task \ - panics, or if the task is aborted. 
we never abort \ - tasks on this `JoinSet`, and nexus is compiled with \ - `panic=\"abort\"`, so neither of these cases should \ - ever occur", - ); - match check.outcome { - CheckOutcome::Success(state) => { - *instance_states - .entry(state.to_string()) - .or_default() += 1; + // Now, wait for the check results to come back. + while let Some(result) = tasks.join_next().await { + total += 1; + let check = result.expect( + "a `JoinError` is returned if a spawned task \ + panics, or if the task is aborted. we never abort \ + tasks on this `JoinSet`, and nexus is compiled with \ + `panic=\"abort\"`, so neither of these cases should \ + ever occur", + ); + match check.outcome { + CheckOutcome::Success(state) => { + *instance_states + .entry(state.to_string()) + .or_default() += 1; + } + CheckOutcome::Failure(reason) => { + *check_failures.entry(reason.as_str().into_owned()).or_default() += 1; + } + CheckOutcome::Unknown => {} } - CheckOutcome::Failure(reason) => { - *check_failures.entry(reason.as_str().into_owned()).or_default() += 1; + if let Err(ref reason) = check.result { + *check_errors.entry(reason.as_str().into_owned()).or_default() += 1; } - CheckOutcome::Unknown => {} - } - if let Err(ref reason) = check.result { - *check_errors.entry(reason.as_str().into_owned()).or_default() += 1; - } - if check.update_saga_queued { - update_sagas_queued += 1; - } - self.metrics.lock().unwrap().record_check(check); + if check.update_saga_queued { + update_sagas_queued += 1; + } + self.metrics.lock().unwrap().record_check(check); + } } // All requests completed! 
Prune any old instance metrics for From 305aa0a7ff93615662b6b15b4cb2457e32234c73 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Thu, 5 Sep 2024 11:49:10 -0700 Subject: [PATCH 3/7] docs --- nexus/src/app/background/tasks/instance_watcher.rs | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/nexus/src/app/background/tasks/instance_watcher.rs b/nexus/src/app/background/tasks/instance_watcher.rs index 39ac482aea..f3a12b88b3 100644 --- a/nexus/src/app/background/tasks/instance_watcher.rs +++ b/nexus/src/app/background/tasks/instance_watcher.rs @@ -387,7 +387,13 @@ impl BackgroundTask for InstanceWatcher { BTreeMap::new(); let mut check_errors: BTreeMap = BTreeMap::new(); - let mut curr_sled: Option<(Uuid, SledAgentClient)> = None; + // A `reqwest` client is a reference-counted handle to a connection + // poll that can be reused by multiple requests. Making a new client + // is fairly expensive, but cloning one is cheap, and cloning it + // allows reusing pooled TCP connections. Therefore, we will order + // the database query by sled ID, and reuse the same sled-agent + // client as long as we are talking to the same sled. + let mut curr_client: Option<(Uuid, SledAgentClient)> = None; while let Some(p) = paginator.next() { let maybe_batch = self .datastore @@ -410,7 +416,7 @@ impl BackgroundTask for InstanceWatcher { // Spawn a task to check on each sled in the batch. for (sled, instance, vmm, project) in batch { - let client = match curr_sled { + let client = match curr_client { // If we are still talking to the same sled, reuse the // existing client and its connection pool. 
Some((sled_id, ref client)) if sled_id == sled.id() => client.clone(), From 9142baccd5a8d8f971805f0ce7a23616f2fa9539 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Mon, 9 Sep 2024 11:34:12 -0700 Subject: [PATCH 4/7] Update nexus/src/app/background/tasks/instance_watcher.rs Co-authored-by: David Pacheco --- nexus/src/app/background/tasks/instance_watcher.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nexus/src/app/background/tasks/instance_watcher.rs b/nexus/src/app/background/tasks/instance_watcher.rs index f3a12b88b3..3ec44d2375 100644 --- a/nexus/src/app/background/tasks/instance_watcher.rs +++ b/nexus/src/app/background/tasks/instance_watcher.rs @@ -388,7 +388,7 @@ impl BackgroundTask for InstanceWatcher { let mut check_errors: BTreeMap = BTreeMap::new(); // A `reqwest` client is a reference-counted handle to a connection - // poll that can be reused by multiple requests. Making a new client + // pool that can be reused by multiple requests. Making a new client // is fairly expensive, but cloning one is cheap, and cloning it // allows reusing pooled TCP connections. Therefore, we will order // the database query by sled ID, and reuse the same sled-agent From 45e7e4b418af53842f5fe6cd9329e79881094ef6 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Mon, 9 Sep 2024 12:10:54 -0700 Subject: [PATCH 5/7] improve documentation of concurrency limit as suggested by @davepacheco in https://github.com/oxidecomputer/omicron/pull/6527#discussion_r1750727032 --- .../app/background/tasks/instance_watcher.rs | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/nexus/src/app/background/tasks/instance_watcher.rs b/nexus/src/app/background/tasks/instance_watcher.rs index 3ec44d2375..1ac6da0ad0 100644 --- a/nexus/src/app/background/tasks/instance_watcher.rs +++ b/nexus/src/app/background/tasks/instance_watcher.rs @@ -44,7 +44,24 @@ pub(crate) struct InstanceWatcher { id: WatcherIdentity, } -// Chosen arbitrarily. 
Perhaps this should be configurable in the config file? +/// Determines how many instance checks and their subsequent update sagas (if +/// the instance's state has changed) can execute concurrently. If this +/// numberis too high, there's risk that this task could starve other Nexus +/// activities or overload the database or overload one or more sled agents. +/// +/// The only consequence of the number being too low is that it may take longer +/// for the system to notice an instance's VMM state requires an instance state +/// transition, which translates to increased latency between when events (like +/// a VMM crash or a completed migration) occur, and when the necessary actions +/// are (like releasing unused resource allocations or allowing an instance to +/// be restarted or deleted) are performed. However, in the happy path, instance +/// update sagas execute immediately upon receipt of a VMM state update pushed +/// by a sled-agent in `cpapi_instances_put`. Therefore, discovering a needed +/// state transition a health check only occurs if the pushed state update was +/// not handled correctly, or if the sled-agent itself has restarted and +/// forgotten about the instances it was supposed to know about. For now, we +/// tune this pretty low, choosing safety over low recovery latency for these +/// relatively rare events. const MAX_CONCURRENT_CHECKS: NonZeroU32 = unsafe { // Safety: last time I checked, 16 was greater than zero. 
NonZeroU32::new_unchecked(16) From 56e4cdebd11042391f2b38b878a83448df939d59 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Mon, 9 Sep 2024 12:23:40 -0700 Subject: [PATCH 6/7] fix typos Co-authored-by: David Pacheco --- nexus/src/app/background/tasks/instance_watcher.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/nexus/src/app/background/tasks/instance_watcher.rs b/nexus/src/app/background/tasks/instance_watcher.rs index 1ac6da0ad0..5d8d257a51 100644 --- a/nexus/src/app/background/tasks/instance_watcher.rs +++ b/nexus/src/app/background/tasks/instance_watcher.rs @@ -46,14 +46,14 @@ pub(crate) struct InstanceWatcher { /// Determines how many instance checks and their subsequent update sagas (if /// the instance's state has changed) can execute concurrently. If this -/// numberis too high, there's risk that this task could starve other Nexus +/// number is too high, there's risk that this task could starve other Nexus /// activities or overload the database or overload one or more sled agents. /// /// The only consequence of the number being too low is that it may take longer /// for the system to notice an instance's VMM state requires an instance state /// transition, which translates to increased latency between when events (like /// a VMM crash or a completed migration) occur, and when the necessary actions -/// are (like releasing unused resource allocations or allowing an instance to +/// (like releasing unused resource allocations or allowing an instance to /// be restarted or deleted) are performed. However, in the happy path, instance /// update sagas execute immediately upon receipt of a VMM state update pushed /// by a sled-agent in `cpapi_instances_put`. 
Therefore, discovering a needed From 88416753f3107a2418b4de94f11d124976a9b685 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Mon, 9 Sep 2024 12:40:55 -0700 Subject: [PATCH 7/7] document use of query size for concurrency limit --- nexus/src/app/background/tasks/instance_watcher.rs | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/nexus/src/app/background/tasks/instance_watcher.rs b/nexus/src/app/background/tasks/instance_watcher.rs index 5d8d257a51..82b831ef58 100644 --- a/nexus/src/app/background/tasks/instance_watcher.rs +++ b/nexus/src/app/background/tasks/instance_watcher.rs @@ -394,6 +394,19 @@ impl BackgroundTask for InstanceWatcher { ) -> BoxFuture<'a, serde_json::Value> { async { let mut tasks = tokio::task::JoinSet::new(); + // We're using the size of the database query page as a simple + // mechanism for limiting the number of concurrent health checks: if + // we only query for `MAX_CONCURRENT_CHECKS` records at a time, and + // we wait until all spawned health checks have completed before + // reading the next batch, it follows that there are only ever + // `MAX_CONCURRENT_CHECKS` checks in flight at a time. + // + // This does mean that, unlike using a counting semaphore or + // similar, we may have fewer than the limit health checks running + // in parallel as we will not query for a new batch until *all* + // checks for the current batch have completed. If that becomes an + // issue, we could implement a more sophisticated + // concurrency-limiting scheme later. let mut paginator = Paginator::new(MAX_CONCURRENT_CHECKS); let mut total: usize = 0;