Skip to content

Commit

Permalink
[nexus] Add instance_watcher concurrency limit
Browse files Browse the repository at this point in the history
The `instance_watcher` background task follows a pattern where it
queries the database for instances, and then spawns a big pile of Tokio
tasks to concurrently perform health checks for those instances. As
suggested by @davepacheco in [this comment][1], there should probably be
a limit on the number of concurrently running health checks to avoid
clobbering the sled-agents with a giant pile of HTTP requests.

This branch sets a global concurrency limit of 16 health checks using a
`tokio::sync::Semaphore`. This is a pretty arbirary, and fairly low,
limit, but we can bump it up later if it turns out to be a bottleneck.

[1]: #6519 (review)
  • Loading branch information
hawkw committed Sep 5, 2024
1 parent da08190 commit 899f33c
Showing 1 changed file with 31 additions and 2 deletions.
33 changes: 31 additions & 2 deletions nexus/src/app/background/tasks/instance_watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use std::future::Future;
use std::num::NonZeroU32;
use std::sync::Arc;
use std::sync::Mutex;
use tokio::sync::Semaphore;
use uuid::Uuid;

oximeter::use_timeseries!("vm-health-check.toml");
Expand All @@ -42,13 +43,19 @@ pub(crate) struct InstanceWatcher {
sagas: Arc<dyn StartSaga>,
metrics: Arc<Mutex<metrics::Metrics>>,
id: WatcherIdentity,
/// Semaphore used to limit the number of concurrently in flight requests to
/// sled-agents.
check_concurrency_limit: Arc<Semaphore>,
}

const MAX_SLED_AGENTS: NonZeroU32 = unsafe {
// Safety: last time I checked, 100 was greater than zero.
NonZeroU32::new_unchecked(100)
};

// Chosen arbitrarily. Perhaps this should be configurable in the config file?
const MAX_CONCURRENT_CHECKS: usize = 16;

impl InstanceWatcher {
pub(crate) fn new(
datastore: Arc<DataStore>,
Expand All @@ -60,7 +67,15 @@ impl InstanceWatcher {
producer_registry
.register_producer(metrics::Producer(metrics.clone()))
.unwrap();
Self { datastore, sagas, metrics, id }
Self {
datastore,
sagas,
metrics,
id,
check_concurrency_limit: Arc::new(Semaphore::new(
MAX_CONCURRENT_CHECKS,
)),
}
}

fn check_instance(
Expand All @@ -72,6 +87,7 @@ impl InstanceWatcher {
) -> impl Future<Output = Check> + Send + 'static {
let datastore = self.datastore.clone();
let sagas = self.sagas.clone();
let concurrency_limit = self.check_concurrency_limit.clone();

let vmm_id = PropolisUuid::from_untyped_uuid(target.vmm_id);
let opctx = {
Expand All @@ -86,15 +102,28 @@ impl InstanceWatcher {
let client = client.clone();

async move {
// First, acquire a permit from the semaphore before continuing to
// perform the health check, to limit the number of concurrently
// in-flight checks.
//
// Dropping this will release the permit back to the semaphore,
// allowing he next check task to proceed.
let _permit = match concurrency_limit.acquire().await {
Ok(permit) => Some(permit),
Err(_) if cfg!(debug_assertions) => unreachable!(
"the semaphore is never closed, so this should never fail"
),
Err(_) => None,
};
slog::trace!(
opctx.log, "checking on VMM"; "propolis_id" => %vmm_id
);

slog::trace!(opctx.log, "checking on instance...");
let rsp = client
.vmm_get_state(&vmm_id)
.await
.map_err(SledAgentInstanceError);

let mut check = Check {
target,
outcome: Default::default(),
Expand Down

0 comments on commit 899f33c

Please sign in to comment.