From ad8548a1a492e1f52dba52763e5a5c2b6d37ac74 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Mon, 9 Sep 2024 14:38:41 -0700 Subject: [PATCH] [nexus] Add `instance_watcher` concurrency limit (#6527) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The `instance_watcher` background task follows a pattern where it queries the database for instances, and then spawns a big pile of Tokio tasks to concurrently perform health checks for those instances. As suggested by @davepacheco in [this comment][1], there should probably be a limit on the number of concurrently running health checks to avoid clobbering the sled-agents with a giant pile of HTTP requests. This branch sets a global concurrency limit of 16 health checks (which is fairly conservative, but we can turn it up later if it turns out to be a bottleneck). The concurrency limit is implemented using the database query's batch size. Previously, this code was written in a slightly-weird dual-loop structure, which was intended specifically to *avoid* the size of the database query batch acting as a concurrency limit: we would read a page of sleds from CRDB, spawn a bunch of health check tasks, and then read the next batch, waiting for the tasks to complete only once all instance records had been read from the database. Now, we can implement a concurrency limit by just...not doing that. We now wait to read the next page of query results until we've run health checks for every instance in the batch, limiting the number of concurrently in flight checks. This has a nice advantage over the naïve approach of using a `tokio::sync::Semaphore` or similar, which each health check task must acquire before proceeding, as the concurrency limit: it also bounds the amount of Nexus' memory used by the instance watcher. If we spawned all the tasks immediately but made them wait to acquire a semaphore permit, there would be a bunch of tasks in memory sitting around doing nothing until the currently in flight tasks completed. With the batch size as concurrency limit approach, we can instead avoid spawning those tasks at all (and, avoid reading stuff from CRDB until we actually need it). [1]: https://github.com/oxidecomputer/omicron/pull/6519#pullrequestreview-2283593739 --- .../app/background/tasks/instance_watcher.rs | 171 ++++++++++-------- 1 file changed, 100 insertions(+), 71 deletions(-) diff --git a/nexus/src/app/background/tasks/instance_watcher.rs b/nexus/src/app/background/tasks/instance_watcher.rs index 351ab73eb4..09ddac192e 100644 --- a/nexus/src/app/background/tasks/instance_watcher.rs +++ b/nexus/src/app/background/tasks/instance_watcher.rs @@ -44,9 +44,27 @@ pub(crate) struct InstanceWatcher { id: WatcherIdentity, } -const MAX_SLED_AGENTS: NonZeroU32 = unsafe { - // Safety: last time I checked, 100 was greater than zero. - NonZeroU32::new_unchecked(100) +/// Determines how many instance checks and their subsequent update sagas (if +/// the instance's state has changed) can execute concurrently. If this +/// number is too high, there's risk that this task could starve other Nexus +/// activities or overload the database or overload one or more sled agents. +/// +/// The only consequence of the number being too low is that it may take longer +/// for the system to notice an instance's VMM state requires an instance state +/// transition, which translates to increased latency between when events (like +/// a VMM crash or a completed migration) occur, and when the necessary actions +/// (like releasing unused resource allocations or allowing an instance to +/// be restarted or deleted) are performed. However, in the happy path, instance +/// update sagas execute immediately upon receipt of a VMM state update pushed +/// by a sled-agent in `cpapi_instances_put`. Therefore, discovering a needed +/// state transition a health check only occurs if the pushed state update was +/// not handled correctly, or if the sled-agent itself has restarted and +/// forgotten about the instances it was supposed to know about. For now, we +/// tune this pretty low, choosing safety over low recovery latency for these +/// relatively rare events. +const MAX_CONCURRENT_CHECKS: NonZeroU32 = unsafe { + // Safety: last time I checked, 16 was greater than zero. + NonZeroU32::new_unchecked(16) }; impl InstanceWatcher { @@ -66,7 +84,7 @@ impl InstanceWatcher { fn check_instance( &self, opctx: &OpContext, - client: &SledAgentClient, + client: SledAgentClient, target: VirtualMachine, vmm: Vmm, ) -> impl Future + Send + 'static { @@ -83,18 +101,17 @@ impl InstanceWatcher { meta.insert("vmm_id".to_string(), vmm_id.to_string()); opctx.child(meta) }; - let client = client.clone(); async move { slog::trace!( opctx.log, "checking on VMM"; "propolis_id" => %vmm_id ); - slog::trace!(opctx.log, "checking on instance..."); let rsp = client .vmm_get_state(&vmm_id) .await .map_err(SledAgentInstanceError); + let mut check = Check { target, outcome: Default::default(), @@ -377,15 +394,36 @@ impl BackgroundTask for InstanceWatcher { ) -> BoxFuture<'a, serde_json::Value> { async { let mut tasks = tokio::task::JoinSet::new(); - let mut paginator = Paginator::new(MAX_SLED_AGENTS); - let mk_client = |sled: &Sled| { - nexus_networking::sled_client_from_address( - sled.id(), - sled.address(), - &opctx.log, - ) - }; + // We're using the size of the database query page as a simple + // mechanism for limiting the number of concurrent health checks: if + // we only query for `MAX_CONCURRENT_CHECKS` records at a time, and + // we wait until all spawned health checks have completed before + // reading the next batch, it follows that there are only ever + // `MAX_CONCURRENT_CHECKS` checks in flight at a time. + // + // This does mean that, unlike using a counting semaphore or + // similar, we may have fewer than the limit health checks running + // in parallel as we will not query for a new batch until *all* + // checks for the current batch have completed. If that becomes an + // issue, we could implement a more sophisticated + // concurrency-limiting scheme later. + let mut paginator = Paginator::new(MAX_CONCURRENT_CHECKS); + + let mut total: usize = 0; + let mut update_sagas_queued: usize = 0; + let mut instance_states: BTreeMap = + BTreeMap::new(); + let mut check_failures: BTreeMap = + BTreeMap::new(); + let mut check_errors: BTreeMap = BTreeMap::new(); + // A `reqwest` client is a reference-counted handle to a connection + // pool that can be reused by multiple requests. Making a new client + // is fairly expensive, but cloning one is cheap, and cloning it + // allows reusing pooled TCP connections. Therefore, we will order + // the database query by sled ID, and reuse the same sled-agent + // client as long as we are talking to the same sled. + let mut curr_client: Option<(Uuid, SledAgentClient)> = None; while let Some(p) = paginator.next() { let maybe_batch = self .datastore @@ -406,69 +444,60 @@ impl BackgroundTask for InstanceWatcher { }; paginator = p.found_batch(&batch, &|(sled, _, vmm, _)| (sled.id(), vmm.id)); - // When we iterate over the batch of sled instances, we pop the - // first sled from the batch before looping over the rest, to - // insure that the initial sled-agent client is created first, - // as we need the address of the first sled to construct it. - // We could, alternatively, make the sled-agent client an - // `Option`, but then every subsequent iteration would have to - // handle the case where it's `None`, and I thought this was a - // bit neater... - let mut batch = batch.into_iter(); - if let Some((mut curr_sled, instance, vmm, project)) = batch.next() { - let mut client = mk_client(&curr_sled); - let target = VirtualMachine::new(self.id, &curr_sled, &instance, &vmm, &project); - tasks.spawn(self.check_instance(opctx, &client, target, vmm)); - - for (sled, instance, vmm, project) in batch { - // We're now talking to a new sled agent; update the client. - if sled.id() != curr_sled.id() { - client = mk_client(&sled); - curr_sled = sled; - } + // Spawn a task to check on each sled in the batch. + for (sled, instance, vmm, project) in batch { + let client = match curr_client { + // If we are still talking to the same sled, reuse the + // existing client and its connection pool. + Some((sled_id, ref client)) if sled_id == sled.id() => client.clone(), + // Otherwise, if we've moved on to a new sled, refresh + // the client. + ref mut curr => { + let client = nexus_networking::sled_client_from_address( + sled.id(), + sled.address(), + &opctx.log, + ); + *curr = Some((sled.id(), client.clone())); + client + }, + }; - let target = VirtualMachine::new(self.id, &curr_sled, &instance, &vmm, &project); - tasks.spawn(self.check_instance(opctx, &client, target, vmm)); - } + + let target = VirtualMachine::new(self.id, &sled, &instance, &vmm, &project); + tasks.spawn(self.check_instance(opctx, client, target, vmm)); } - } - // Now, wait for the check results to come back. - let mut total: usize = 0; - let mut update_sagas_queued: usize = 0; - let mut instance_states: BTreeMap = - BTreeMap::new(); - let mut check_failures: BTreeMap = - BTreeMap::new(); - let mut check_errors: BTreeMap = BTreeMap::new(); - while let Some(result) = tasks.join_next().await { - total += 1; - let check = result.expect( - "a `JoinError` is returned if a spawned task \ - panics, or if the task is aborted. we never abort \ - tasks on this `JoinSet`, and nexus is compiled with \ - `panic=\"abort\"`, so neither of these cases should \ - ever occur", - ); - match check.outcome { - CheckOutcome::Success(state) => { - *instance_states - .entry(state.to_string()) - .or_default() += 1; + // Now, wait for the check results to come back. + while let Some(result) = tasks.join_next().await { + total += 1; + let check = result.expect( + "a `JoinError` is returned if a spawned task \ + panics, or if the task is aborted. we never abort \ + tasks on this `JoinSet`, and nexus is compiled with \ + `panic=\"abort\"`, so neither of these cases should \ + ever occur", + ); + match check.outcome { + CheckOutcome::Success(state) => { + *instance_states + .entry(state.to_string()) + .or_default() += 1; + } + CheckOutcome::Failure(reason) => { + *check_failures.entry(reason.as_str().into_owned()).or_default() += 1; + } + CheckOutcome::Unknown => {} } - CheckOutcome::Failure(reason) => { - *check_failures.entry(reason.as_str().into_owned()).or_default() += 1; + if let Err(ref reason) = check.result { + *check_errors.entry(reason.as_str().into_owned()).or_default() += 1; } - CheckOutcome::Unknown => {} - } - if let Err(ref reason) = check.result { - *check_errors.entry(reason.as_str().into_owned()).or_default() += 1; - } - if check.update_saga_queued { - update_sagas_queued += 1; - } - self.metrics.lock().unwrap().record_check(check); + if check.update_saga_queued { + update_sagas_queued += 1; + } + self.metrics.lock().unwrap().record_check(check); + } } // All requests completed! Prune any old instance metrics for