diff --git a/nexus/src/app/background/tasks/instance_watcher.rs b/nexus/src/app/background/tasks/instance_watcher.rs
index 351ab73eb4..09ddac192e 100644
--- a/nexus/src/app/background/tasks/instance_watcher.rs
+++ b/nexus/src/app/background/tasks/instance_watcher.rs
@@ -44,9 +44,27 @@ pub(crate) struct InstanceWatcher {
     id: WatcherIdentity,
 }
 
-const MAX_SLED_AGENTS: NonZeroU32 = unsafe {
-    // Safety: last time I checked, 100 was greater than zero.
-    NonZeroU32::new_unchecked(100)
+/// Determines how many instance checks and their subsequent update sagas (if
+/// the instance's state has changed) can execute concurrently. If this
+/// number is too high, there's a risk that this task could starve other Nexus
+/// activities, overload the database, or overload one or more sled agents.
+///
+/// The only consequence of the number being too low is that it may take longer
+/// for the system to notice that an instance's VMM state requires an instance
+/// state transition, which translates to increased latency between when events
+/// (like a VMM crash or a completed migration) occur, and when the necessary
+/// actions (like releasing unused resource allocations or allowing an instance
+/// to be restarted or deleted) are performed. However, in the happy path,
+/// instance update sagas execute immediately upon receipt of a VMM state
+/// update pushed by a sled-agent in `cpapi_instances_put`. Therefore,
+/// discovering a needed state transition via a health check only occurs if the
+/// pushed state update was not handled correctly, or if the sled-agent itself
+/// has restarted and forgotten about the instances it was supposed to know
+/// about. For now, we tune this pretty low, choosing safety over low recovery
+/// latency for these relatively rare events.
+const MAX_CONCURRENT_CHECKS: NonZeroU32 = unsafe {
+    // Safety: last time I checked, 16 was greater than zero.
+    NonZeroU32::new_unchecked(16)
 };
 
 impl InstanceWatcher {
@@ -66,7 +84,7 @@ impl InstanceWatcher {
     fn check_instance(
         &self,
         opctx: &OpContext,
-        client: &SledAgentClient,
+        client: SledAgentClient,
         target: VirtualMachine,
         vmm: Vmm,
     ) -> impl Future<Output = Check> + Send + 'static {
@@ -83,18 +101,17 @@ impl InstanceWatcher {
             meta.insert("vmm_id".to_string(), vmm_id.to_string());
             opctx.child(meta)
         };
-        let client = client.clone();
 
         async move {
             slog::trace!(
                 opctx.log, "checking on VMM";
                 "propolis_id" => %vmm_id
             );
-            slog::trace!(opctx.log, "checking on instance...");
             let rsp = client
                 .vmm_get_state(&vmm_id)
                 .await
                 .map_err(SledAgentInstanceError);
+
             let mut check = Check {
                 target,
                 outcome: Default::default(),
@@ -377,15 +394,36 @@ impl BackgroundTask for InstanceWatcher {
     ) -> BoxFuture<'a, serde_json::Value> {
         async {
             let mut tasks = tokio::task::JoinSet::new();
-            let mut paginator = Paginator::new(MAX_SLED_AGENTS);
-            let mk_client = |sled: &Sled| {
-                nexus_networking::sled_client_from_address(
-                    sled.id(),
-                    sled.address(),
-                    &opctx.log,
-                )
-            };
+            // We're using the size of the database query page as a simple
+            // mechanism for limiting the number of concurrent health checks: if
+            // we only query for `MAX_CONCURRENT_CHECKS` records at a time, and
+            // we wait until all spawned health checks have completed before
+            // reading the next batch, it follows that there are only ever
+            // `MAX_CONCURRENT_CHECKS` checks in flight at a time.
+            //
+            // This does mean that, unlike using a counting semaphore or
+            // similar, we may have fewer than the limit health checks running
+            // in parallel, as we will not query for a new batch until *all*
+            // checks for the current batch have completed. If that becomes an
+            // issue, we could implement a more sophisticated
+            // concurrency-limiting scheme later.
+            let mut paginator = Paginator::new(MAX_CONCURRENT_CHECKS);
+
+            let mut total: usize = 0;
+            let mut update_sagas_queued: usize = 0;
+            let mut instance_states: BTreeMap<String, usize> =
+                BTreeMap::new();
+            let mut check_failures: BTreeMap<String, usize> =
+                BTreeMap::new();
+            let mut check_errors: BTreeMap<String, usize> = BTreeMap::new();
+            // A `reqwest` client is a reference-counted handle to a connection
+            // pool that can be reused by multiple requests. Making a new client
+            // is fairly expensive, but cloning one is cheap, and cloning it
+            // allows reusing pooled TCP connections. Therefore, we will order
+            // the database query by sled ID, and reuse the same sled-agent
+            // client as long as we are talking to the same sled.
+            let mut curr_client: Option<(Uuid, SledAgentClient)> = None;
 
             while let Some(p) = paginator.next() {
                 let maybe_batch = self
                     .datastore
@@ -406,69 +444,60 @@ impl BackgroundTask for InstanceWatcher {
                 };
                 paginator = p.found_batch(&batch, &|(sled, _, vmm, _)| (sled.id(), vmm.id));
-                // When we iterate over the batch of sled instances, we pop the
-                // first sled from the batch before looping over the rest, to
-                // insure that the initial sled-agent client is created first,
-                // as we need the address of the first sled to construct it.
-                // We could, alternatively, make the sled-agent client an
-                // `Option`, but then every subsequent iteration would have to
-                // handle the case where it's `None`, and I thought this was a
-                // bit neater...
-                let mut batch = batch.into_iter();
-                if let Some((mut curr_sled, instance, vmm, project)) = batch.next() {
-                    let mut client = mk_client(&curr_sled);
-                    let target = VirtualMachine::new(self.id, &curr_sled, &instance, &vmm, &project);
-                    tasks.spawn(self.check_instance(opctx, &client, target, vmm));
-
-                    for (sled, instance, vmm, project) in batch {
-                        // We're now talking to a new sled agent; update the client.
-                        if sled.id() != curr_sled.id() {
-                            client = mk_client(&sled);
-                            curr_sled = sled;
-                        }
+                // Spawn a task to check on each instance in the batch.
+                for (sled, instance, vmm, project) in batch {
+                    let client = match curr_client {
+                        // If we are still talking to the same sled, reuse the
+                        // existing client and its connection pool.
+                        Some((sled_id, ref client)) if sled_id == sled.id() => client.clone(),
+                        // Otherwise, if we've moved on to a new sled, refresh
+                        // the client.
+                        ref mut curr => {
+                            let client = nexus_networking::sled_client_from_address(
+                                sled.id(),
+                                sled.address(),
+                                &opctx.log,
+                            );
+                            *curr = Some((sled.id(), client.clone()));
+                            client
+                        },
+                    };
-                        let target = VirtualMachine::new(self.id, &curr_sled, &instance, &vmm, &project);
-                        tasks.spawn(self.check_instance(opctx, &client, target, vmm));
-                    }
+
+                    let target = VirtualMachine::new(self.id, &sled, &instance, &vmm, &project);
+                    tasks.spawn(self.check_instance(opctx, client, target, vmm));
                 }
-            }
-            // Now, wait for the check results to come back.
-            let mut total: usize = 0;
-            let mut update_sagas_queued: usize = 0;
-            let mut instance_states: BTreeMap<String, usize> =
-                BTreeMap::new();
-            let mut check_failures: BTreeMap<String, usize> =
-                BTreeMap::new();
-            let mut check_errors: BTreeMap<String, usize> = BTreeMap::new();
-            while let Some(result) = tasks.join_next().await {
-                total += 1;
-                let check = result.expect(
-                    "a `JoinError` is returned if a spawned task \
-                     panics, or if the task is aborted. we never abort \
-                     tasks on this `JoinSet`, and nexus is compiled with \
-                     `panic=\"abort\"`, so neither of these cases should \
-                     ever occur",
-                );
-                match check.outcome {
-                    CheckOutcome::Success(state) => {
-                        *instance_states
-                            .entry(state.to_string())
-                            .or_default() += 1;
+                // Now, wait for the check results to come back.
+                while let Some(result) = tasks.join_next().await {
+                    total += 1;
+                    let check = result.expect(
+                        "a `JoinError` is returned if a spawned task \
+                         panics, or if the task is aborted. we never abort \
+                         tasks on this `JoinSet`, and nexus is compiled with \
+                         `panic=\"abort\"`, so neither of these cases should \
+                         ever occur",
+                    );
+                    match check.outcome {
+                        CheckOutcome::Success(state) => {
+                            *instance_states
+                                .entry(state.to_string())
+                                .or_default() += 1;
+                        }
+                        CheckOutcome::Failure(reason) => {
+                            *check_failures.entry(reason.as_str().into_owned()).or_default() += 1;
+                        }
+                        CheckOutcome::Unknown => {}
                     }
-                    CheckOutcome::Failure(reason) => {
-                        *check_failures.entry(reason.as_str().into_owned()).or_default() += 1;
+                    if let Err(ref reason) = check.result {
+                        *check_errors.entry(reason.as_str().into_owned()).or_default() += 1;
                     }
-                    CheckOutcome::Unknown => {}
-                }
-                if let Err(ref reason) = check.result {
-                    *check_errors.entry(reason.as_str().into_owned()).or_default() += 1;
-                }
-                if check.update_saga_queued {
-                    update_sagas_queued += 1;
-                }
-                self.metrics.lock().unwrap().record_check(check);
+                    if check.update_saga_queued {
+                        update_sagas_queued += 1;
+                    }
+                    self.metrics.lock().unwrap().record_check(check);
+                }
             }
 
             // All requests completed! Prune any old instance metrics for
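
For readers skimming the diff, the concurrency-limiting idea in the new comments can be illustrated in isolation. The sketch below is hypothetical and not part of the patch: `fetch_page` and `check_one` stand in for the real paginated datastore query and the sled-agent health check. Because every task spawned for a page is joined before the next page is requested, at most `limit` checks are ever in flight:

```rust
use tokio::task::JoinSet;

// Stand-in for the paginated database query; returns at most `limit` ids.
async fn fetch_page(offset: usize, limit: usize, total: usize) -> Vec<usize> {
    (offset..total.min(offset + limit)).collect()
}

// Stand-in for a single sled-agent health check.
async fn check_one(id: usize) -> String {
    format!("instance {id}: ok")
}

async fn check_in_batches(limit: usize, total: usize) {
    let mut offset = 0;
    loop {
        let page = fetch_page(offset, limit, total).await;
        if page.is_empty() {
            break;
        }
        offset += page.len();

        // Spawn one task per record in the page...
        let mut tasks = JoinSet::new();
        for id in page {
            tasks.spawn(check_one(id));
        }
        // ...and drain them all before fetching the next page. Waiting here
        // is what bounds the number of in-flight checks to `limit`.
        while let Some(result) = tasks.join_next().await {
            println!("{}", result.expect("check task panicked"));
        }
    }
}

#[tokio::main]
async fn main() {
    check_in_batches(16, 40).await;
}
```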