From de19dbda4c8187315d653934133b60af3354aec3 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Thu, 5 Sep 2024 10:55:13 -0700 Subject: [PATCH] [nexus] Add `instance_watcher` concurrency limit The `instance_watcher` background task follows a pattern where it queries the database for instances, and then spawns a big pile of Tokio tasks to concurrently perform health checks for those instances. As suggested by @davepacheco in [this comment][1], there should probably be a limit on the number of concurrently running health checks to avoid clobbering the sled-agents with a giant pile of HTTP requests. [1]: https://github.com/oxidecomputer/omicron/pull/6519#pullrequestreview-2283593739 --- .../app/background/tasks/instance_watcher.rs | 33 +++++++++++++++++-- 1 file changed, 31 insertions(+), 2 deletions(-) diff --git a/nexus/src/app/background/tasks/instance_watcher.rs b/nexus/src/app/background/tasks/instance_watcher.rs index 7c90e7b1ecb..dda830afaf5 100644 --- a/nexus/src/app/background/tasks/instance_watcher.rs +++ b/nexus/src/app/background/tasks/instance_watcher.rs @@ -31,6 +31,7 @@ use std::future::Future; use std::num::NonZeroU32; use std::sync::Arc; use std::sync::Mutex; +use tokio::sync::Semaphore; use uuid::Uuid; oximeter::use_timeseries!("vm-health-check.toml"); @@ -42,6 +43,9 @@ pub(crate) struct InstanceWatcher { sagas: Arc, metrics: Arc>, id: WatcherIdentity, + /// Semaphore used to limit the number of concurrently in flight requests to + /// sled-agents. + check_concurrency_limit: Arc, } const MAX_SLED_AGENTS: NonZeroU32 = unsafe { @@ -49,6 +53,9 @@ const MAX_SLED_AGENTS: NonZeroU32 = unsafe { NonZeroU32::new_unchecked(100) }; +// Chosen arbitrarily. Perhaps this should be configurable in the config file? +const MAX_CONCURRENT_CHECKS: usize = 128; + impl InstanceWatcher { pub(crate) fn new( datastore: Arc, @@ -60,7 +67,15 @@ impl InstanceWatcher { producer_registry .register_producer(metrics::Producer(metrics.clone())) .unwrap(); - Self { datastore, sagas, metrics, id } + Self { + datastore, + sagas, + metrics, + id, + check_concurrency_limit: Arc::new(Semaphore::new( + MAX_CONCURRENT_CHECKS, + )), + } } fn check_instance( @@ -72,6 +87,7 @@ impl InstanceWatcher { ) -> impl Future + Send + 'static { let datastore = self.datastore.clone(); let sagas = self.sagas.clone(); + let concurrency_limit = self.check_concurrency_limit.clone(); let vmm_id = PropolisUuid::from_untyped_uuid(target.vmm_id); let opctx = { @@ -86,15 +102,28 @@ impl InstanceWatcher { let client = client.clone(); async move { + // First, acquire a permit from the semaphore before continuing to + // perform the health check, to limit the number of concurrently + // in-flight checks. + // + // Dropping this will release the permit back to the semaphore, + // allowing he next check task to proceed. + let _permit = match concurrency_limit.acquire().await { + Ok(permit) => Some(permit), + Err(_) if cfg!(debug_assertions) => unreachable!( + "the semaphore is never closed, so this should never fail" + ), + Err(_) => None, + }; slog::trace!( opctx.log, "checking on VMM"; "propolis_id" => %vmm_id ); - slog::trace!(opctx.log, "checking on instance..."); let rsp = client .vmm_get_state(&vmm_id) .await .map_err(SledAgentInstanceError); + let mut check = Check { target, outcome: Default::default(),