[nexus] Add instance_watcher concurrency limit (#6527)
The `instance_watcher` background task follows a pattern where it
queries the database for instances, and then spawns a big pile of Tokio
tasks to concurrently perform health checks for those instances. As
suggested by @davepacheco in [this comment][1], there should probably be
a limit on the number of concurrently running health checks to avoid
clobbering the sled-agents with a giant pile of HTTP requests.

This branch sets a global concurrency limit of 16 health checks (which
is fairly conservative, but we can turn it up later if it turns out to
be a bottleneck). The concurrency limit is implemented using the
database query's batch size. Previously, this code was written in a
slightly weird dual-loop structure, intended specifically to
*avoid* the size of the database query batch acting as a concurrency
limit: we would read a page of sleds from CRDB, spawn a bunch of health
check tasks, and then read the next batch, waiting for the tasks to
complete only once all instance records had been read from the database.
Now, we can implement a concurrency limit by just...not doing that. We
now wait to read the next page of query results until we've run health
checks for every instance in the batch, limiting the number of
checks in flight at any given time.
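
The shape of the loop is easy to see in miniature. Below is a minimal,
self-contained sketch of the pattern (not the code from this PR): `Instance`,
`fetch_page`, and `check_instance` are hypothetical stand-ins for the joined
query rows, the paginated CRDB query, and the sled-agent health check, but the
control flow — spawn one task per record in the batch, then drain the
`JoinSet` before fetching the next page — is the same mechanism this branch
uses:

```rust
use tokio::task::JoinSet;

const MAX_CONCURRENT_CHECKS: u64 = 16;

/// Hypothetical stand-in for one row returned by the instance query.
struct Instance {
    id: u64,
}

/// Hypothetical stand-in for the paginated database query: returns at
/// most `MAX_CONCURRENT_CHECKS` records starting at `cursor`.
async fn fetch_page(cursor: u64) -> Vec<Instance> {
    const TOTAL_INSTANCES: u64 = 40;
    (cursor..(cursor + MAX_CONCURRENT_CHECKS).min(TOTAL_INSTANCES))
        .map(|id| Instance { id })
        .collect()
}

/// Hypothetical stand-in for a health check against a sled-agent.
async fn check_instance(instance: Instance) -> u64 {
    instance.id
}

#[tokio::main]
async fn main() {
    let mut cursor = 0;
    loop {
        let batch = fetch_page(cursor).await;
        if batch.is_empty() {
            break;
        }
        cursor += batch.len() as u64;

        // Spawn one task per record in the batch...
        let mut tasks = JoinSet::new();
        for instance in batch {
            tasks.spawn(check_instance(instance));
        }

        // ...and wait for *all* of them before fetching the next page,
        // so at most `MAX_CONCURRENT_CHECKS` checks are ever in flight.
        while let Some(result) = tasks.join_next().await {
            let id = result.expect("check task should not panic");
            println!("checked instance {id}");
        }
    }
}
```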

This has a nice advantage over the naïve approach of using a
`tokio::sync::Semaphore` (or similar) as the concurrency limit, where each
health check task must acquire a permit before proceeding: it also bounds the
amount of Nexus' memory used by the instance watcher. If we spawned all
the tasks immediately but made them wait to acquire a semaphore permit,
there would be a bunch of tasks in memory sitting around doing nothing
until the currently in-flight tasks completed. With the
batch-size-as-concurrency-limit approach, we can instead avoid spawning those
tasks at all (and avoid reading from CRDB until we actually need to).
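
For contrast, here is a sketch of that semaphore-based alternative (again
hypothetical, not code from this PR; `check_all` and `ids` are made-up names).
Note that every task is spawned — and buffered in memory — immediately; the
permits throttle progress, not existence:

```rust
use std::sync::Arc;
use tokio::{sync::Semaphore, task::JoinSet};

/// Hypothetical semaphore-gated version: `ids` stands in for every
/// instance record, all read from the database up front.
async fn check_all(ids: Vec<u64>) {
    let semaphore = Arc::new(Semaphore::new(16));
    let mut tasks = JoinSet::new();
    for id in ids {
        let semaphore = Arc::clone(&semaphore);
        // Every task is spawned immediately, so all of them occupy
        // memory even though only 16 can hold a permit at once.
        tasks.spawn(async move {
            let _permit =
                semaphore.acquire().await.expect("semaphore is never closed");
            id // stand-in for performing the actual health check
        });
    }
    while let Some(result) = tasks.join_next().await {
        let _ = result.expect("check task should not panic");
    }
}
```

With the batch-size approach, those parked tasks (and the database rows
backing them) simply never exist.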

[1]: #6519 (review)
hawkw authored Sep 9, 2024
1 parent 1c24f45 commit ad8548a
171 changes: 100 additions & 71 deletions nexus/src/app/background/tasks/instance_watcher.rs
@@ -44,9 +44,27 @@ pub(crate) struct InstanceWatcher {
id: WatcherIdentity,
}

const MAX_SLED_AGENTS: NonZeroU32 = unsafe {
// Safety: last time I checked, 100 was greater than zero.
NonZeroU32::new_unchecked(100)
/// Determines how many instance checks and their subsequent update sagas (if
/// the instance's state has changed) can execute concurrently. If this
/// number is too high, there's risk that this task could starve other Nexus
/// activities or overload the database or overload one or more sled agents.
///
/// The only consequence of the number being too low is that it may take longer
/// for the system to notice an instance's VMM state requires an instance state
/// transition, which translates to increased latency between when events (like
/// a VMM crash or a completed migration) occur, and when the necessary actions
/// (like releasing unused resource allocations or allowing an instance to
/// be restarted or deleted) are performed. However, in the happy path, instance
/// update sagas execute immediately upon receipt of a VMM state update pushed
/// by a sled-agent in `cpapi_instances_put`. Therefore, discovering a needed
/// state transition via a health check only occurs if the pushed state update was
/// not handled correctly, or if the sled-agent itself has restarted and
/// forgotten about the instances it was supposed to know about. For now, we
/// tune this pretty low, choosing safety over low recovery latency for these
/// relatively rare events.
const MAX_CONCURRENT_CHECKS: NonZeroU32 = unsafe {
// Safety: last time I checked, 16 was greater than zero.
NonZeroU32::new_unchecked(16)
};

impl InstanceWatcher {
@@ -66,7 +84,7 @@ impl InstanceWatcher {
fn check_instance(
&self,
opctx: &OpContext,
client: &SledAgentClient,
client: SledAgentClient,
target: VirtualMachine,
vmm: Vmm,
) -> impl Future<Output = Check> + Send + 'static {
@@ -83,18 +101,17 @@ impl InstanceWatcher {
meta.insert("vmm_id".to_string(), vmm_id.to_string());
opctx.child(meta)
};
let client = client.clone();

async move {
slog::trace!(
opctx.log, "checking on VMM"; "propolis_id" => %vmm_id
);

slog::trace!(opctx.log, "checking on instance...");
let rsp = client
.vmm_get_state(&vmm_id)
.await
.map_err(SledAgentInstanceError);

let mut check = Check {
target,
outcome: Default::default(),
@@ -377,15 +394,36 @@ impl BackgroundTask for InstanceWatcher {
) -> BoxFuture<'a, serde_json::Value> {
async {
let mut tasks = tokio::task::JoinSet::new();
let mut paginator = Paginator::new(MAX_SLED_AGENTS);
let mk_client = |sled: &Sled| {
nexus_networking::sled_client_from_address(
sled.id(),
sled.address(),
&opctx.log,
)
};
// We're using the size of the database query page as a simple
// mechanism for limiting the number of concurrent health checks: if
// we only query for `MAX_CONCURRENT_CHECKS` records at a time, and
// we wait until all spawned health checks have completed before
// reading the next batch, it follows that there are only ever
// `MAX_CONCURRENT_CHECKS` checks in flight at a time.
//
// This does mean that, unlike using a counting semaphore or
// similar, we may have fewer than the limit health checks running
// in parallel as we will not query for a new batch until *all*
// checks for the current batch have completed. If that becomes an
// issue, we could implement a more sophisticated
// concurrency-limiting scheme later.
let mut paginator = Paginator::new(MAX_CONCURRENT_CHECKS);

let mut total: usize = 0;
let mut update_sagas_queued: usize = 0;
let mut instance_states: BTreeMap<String, usize> =
BTreeMap::new();
let mut check_failures: BTreeMap<String, usize> =
BTreeMap::new();
let mut check_errors: BTreeMap<String, usize> = BTreeMap::new();

// A `reqwest` client is a reference-counted handle to a connection
// pool that can be reused by multiple requests. Making a new client
// is fairly expensive, but cloning one is cheap, and cloning it
// allows reusing pooled TCP connections. Therefore, we will order
// the database query by sled ID, and reuse the same sled-agent
// client as long as we are talking to the same sled.
let mut curr_client: Option<(Uuid, SledAgentClient)> = None;
while let Some(p) = paginator.next() {
let maybe_batch = self
.datastore
@@ -406,69 +444,60 @@
};
paginator = p.found_batch(&batch, &|(sled, _, vmm, _)| (sled.id(), vmm.id));

// When we iterate over the batch of sled instances, we pop the
// first sled from the batch before looping over the rest, to
// ensure that the initial sled-agent client is created first,
// as we need the address of the first sled to construct it.
// We could, alternatively, make the sled-agent client an
// `Option`, but then every subsequent iteration would have to
// handle the case where it's `None`, and I thought this was a
// bit neater...
let mut batch = batch.into_iter();
if let Some((mut curr_sled, instance, vmm, project)) = batch.next() {
let mut client = mk_client(&curr_sled);
let target = VirtualMachine::new(self.id, &curr_sled, &instance, &vmm, &project);
tasks.spawn(self.check_instance(opctx, &client, target, vmm));

for (sled, instance, vmm, project) in batch {
// We're now talking to a new sled agent; update the client.
if sled.id() != curr_sled.id() {
client = mk_client(&sled);
curr_sled = sled;
}
// Spawn a task to check on each sled in the batch.
for (sled, instance, vmm, project) in batch {
let client = match curr_client {
// If we are still talking to the same sled, reuse the
// existing client and its connection pool.
Some((sled_id, ref client)) if sled_id == sled.id() => client.clone(),
// Otherwise, if we've moved on to a new sled, refresh
// the client.
ref mut curr => {
let client = nexus_networking::sled_client_from_address(
sled.id(),
sled.address(),
&opctx.log,
);
*curr = Some((sled.id(), client.clone()));
client
},
};

let target = VirtualMachine::new(self.id, &curr_sled, &instance, &vmm, &project);
tasks.spawn(self.check_instance(opctx, &client, target, vmm));
}

let target = VirtualMachine::new(self.id, &sled, &instance, &vmm, &project);
tasks.spawn(self.check_instance(opctx, client, target, vmm));
}
}

// Now, wait for the check results to come back.
let mut total: usize = 0;
let mut update_sagas_queued: usize = 0;
let mut instance_states: BTreeMap<String, usize> =
BTreeMap::new();
let mut check_failures: BTreeMap<String, usize> =
BTreeMap::new();
let mut check_errors: BTreeMap<String, usize> = BTreeMap::new();
while let Some(result) = tasks.join_next().await {
total += 1;
let check = result.expect(
"a `JoinError` is returned if a spawned task \
panics, or if the task is aborted. we never abort \
tasks on this `JoinSet`, and nexus is compiled with \
`panic=\"abort\"`, so neither of these cases should \
ever occur",
);
match check.outcome {
CheckOutcome::Success(state) => {
*instance_states
.entry(state.to_string())
.or_default() += 1;
// Now, wait for the check results to come back.
while let Some(result) = tasks.join_next().await {
total += 1;
let check = result.expect(
"a `JoinError` is returned if a spawned task \
panics, or if the task is aborted. we never abort \
tasks on this `JoinSet`, and nexus is compiled with \
`panic=\"abort\"`, so neither of these cases should \
ever occur",
);
match check.outcome {
CheckOutcome::Success(state) => {
*instance_states
.entry(state.to_string())
.or_default() += 1;
}
CheckOutcome::Failure(reason) => {
*check_failures.entry(reason.as_str().into_owned()).or_default() += 1;
}
CheckOutcome::Unknown => {}
}
CheckOutcome::Failure(reason) => {
*check_failures.entry(reason.as_str().into_owned()).or_default() += 1;
if let Err(ref reason) = check.result {
*check_errors.entry(reason.as_str().into_owned()).or_default() += 1;
}
CheckOutcome::Unknown => {}
}
if let Err(ref reason) = check.result {
*check_errors.entry(reason.as_str().into_owned()).or_default() += 1;
}
if check.update_saga_queued {
update_sagas_queued += 1;
}
self.metrics.lock().unwrap().record_check(check);
if check.update_saga_queued {
update_sagas_queued += 1;
}
self.metrics.lock().unwrap().record_check(check);

}
}

// All requests completed! Prune any old instance metrics for
