[nexus] Add instance_watcher concurrency limit #6527

Merged · 7 commits · Sep 9, 2024
171 changes: 100 additions & 71 deletions nexus/src/app/background/tasks/instance_watcher.rs
@@ -44,9 +44,27 @@ pub(crate) struct InstanceWatcher {
id: WatcherIdentity,
}

const MAX_SLED_AGENTS: NonZeroU32 = unsafe {
// Safety: last time I checked, 100 was greater than zero.
NonZeroU32::new_unchecked(100)
/// Determines how many instance checks and their subsequent update sagas (if
/// the instance's state has changed) can execute concurrently. If this
/// number is too high, there's a risk that this task could starve other Nexus
/// activities, overload the database, or overload one or more sled agents.
///
/// The only consequence of the number being too low is that it may take longer
/// for the system to notice an instance's VMM state requires an instance state
/// transition, which translates to increased latency between when events (like
/// a VMM crash or a completed migration) occur, and when the necessary actions
/// (like releasing unused resource allocations or allowing an instance to
/// be restarted or deleted) are performed. However, in the happy path, instance
/// update sagas execute immediately upon receipt of a VMM state update pushed
/// by a sled-agent in `cpapi_instances_put`. Therefore, discovering a needed
/// state transition via a health check only occurs if the pushed state update was
/// not handled correctly, or if the sled-agent itself has restarted and
/// forgotten about the instances it was supposed to know about. For now, we
/// tune this pretty low, choosing safety over low recovery latency for these
/// relatively rare events.
const MAX_CONCURRENT_CHECKS: NonZeroU32 = unsafe {
// Safety: last time I checked, 16 was greater than zero.
NonZeroU32::new_unchecked(16)
};
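
For reference, the same constant can also be built without `unsafe` on toolchains that allow panicking in `const` contexts (Rust 1.57 and later); a minimal sketch, assuming that baseline:

```rust
use std::num::NonZeroU32;

// Sketch of a safe `const` construction: the `match` turns the (impossible)
// zero case into a compile-time panic instead of relying on `new_unchecked`.
const MAX_CONCURRENT_CHECKS: NonZeroU32 = match NonZeroU32::new(16) {
    Some(n) => n,
    None => panic!("16 is definitely greater than zero"),
};
```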

impl InstanceWatcher {
@@ -66,7 +84,7 @@ impl InstanceWatcher {
fn check_instance(
&self,
opctx: &OpContext,
client: &SledAgentClient,
client: SledAgentClient,
target: VirtualMachine,
vmm: Vmm,
) -> impl Future<Output = Check> + Send + 'static {
@@ -83,18 +101,17 @@ impl InstanceWatcher {
meta.insert("vmm_id".to_string(), vmm_id.to_string());
opctx.child(meta)
};
let client = client.clone();

async move {
slog::trace!(
opctx.log, "checking on VMM"; "propolis_id" => %vmm_id
);

slog::trace!(opctx.log, "checking on instance...");
let rsp = client
.vmm_get_state(&vmm_id)
.await
.map_err(SledAgentInstanceError);

let mut check = Check {
target,
outcome: Default::default(),
@@ -377,15 +394,36 @@ impl BackgroundTask for InstanceWatcher {
) -> BoxFuture<'a, serde_json::Value> {
async {
let mut tasks = tokio::task::JoinSet::new();
let mut paginator = Paginator::new(MAX_SLED_AGENTS);
let mk_client = |sled: &Sled| {
nexus_networking::sled_client_from_address(
sled.id(),
sled.address(),
&opctx.log,
)
};
// We're using the size of the database query page as a simple
// mechanism for limiting the number of concurrent health checks: if
// we only query for `MAX_CONCURRENT_CHECKS` records at a time, and
// we wait until all spawned health checks have completed before
// reading the next batch, it follows that there are only ever
// `MAX_CONCURRENT_CHECKS` checks in flight at a time.
//
// This does mean that, unlike using a counting semaphore or
// similar, we may have fewer than the limit health checks running
// in parallel as we will not query for a new batch until *all*
// checks for the current batch have completed. If that becomes an
// issue, we could implement a more sophisticated
// concurrency-limiting scheme later.
let mut paginator = Paginator::new(MAX_CONCURRENT_CHECKS);
Collaborator:

I feel like somewhere there ought to be a comment explaining that the batch size is used as a coarse way to limit concurrency. Maybe here?

I just noticed while looking at this that this isn't the same as targeting a concurrency level of MAX_CONCURRENT_CHECKS because we'll start 16, then wait until they finish, then start another 16, then wait, ... etc. So the average concurrency will be somewhat less. This is fine but I hadn't appreciated it when we talked about it.

Member Author:

Yes, that's correct. That's an advantage of using the Tokio semaphore for the concurrency limit rather than the DB query...perhaps we should rework this to use a semaphore and make the query batch size smaller...
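A minimal sketch of the semaphore-based alternative discussed above, using a hypothetical `run_checks` wrapper: a `tokio::sync::Semaphore` caps the number of in-flight checks independently of the database query batch size.

```rust
use std::sync::Arc;
use tokio::sync::Semaphore;
use tokio::task::JoinSet;

/// Illustrative only: limit concurrent checks with a semaphore rather than
/// with the query batch size, so the two can be tuned independently.
async fn run_checks(check_count: usize, max_concurrent_checks: usize) {
    let semaphore = Arc::new(Semaphore::new(max_concurrent_checks));
    let mut tasks = JoinSet::new();
    for i in 0..check_count {
        // `acquire_owned` ties the permit to the `Arc`, so it can move into
        // the spawned task and is released when that task completes.
        let permit = semaphore
            .clone()
            .acquire_owned()
            .await
            .expect("semaphore is never closed");
        tasks.spawn(async move {
            let _permit = permit; // held for the duration of one check
            // ...perform one instance health check here...
            i
        });
    }
    while let Some(result) = tasks.join_next().await {
        let _ = result.expect("check tasks should not panic");
    }
}
```
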
let mut total: usize = 0;
let mut update_sagas_queued: usize = 0;
let mut instance_states: BTreeMap<String, usize> =
BTreeMap::new();
let mut check_failures: BTreeMap<String, usize> =
BTreeMap::new();
let mut check_errors: BTreeMap<String, usize> = BTreeMap::new();

// A `reqwest` client is a reference-counted handle to a connection
// pool that can be reused by multiple requests. Making a new client
// is fairly expensive, but cloning one is cheap, and cloning it
// allows reusing pooled TCP connections. Therefore, we will order
// the database query by sled ID, and reuse the same sled-agent
// client as long as we are talking to the same sled.
let mut curr_client: Option<(Uuid, SledAgentClient)> = None;
while let Some(p) = paginator.next() {
let maybe_batch = self
.datastore
@@ -406,69 +444,60 @@
};
paginator = p.found_batch(&batch, &|(sled, _, _, _)| sled.id());

// When we iterate over the batch of sled instances, we pop the
// first sled from the batch before looping over the rest, to
// insure that the initial sled-agent client is created first,
// as we need the address of the first sled to construct it.
// We could, alternatively, make the sled-agent client an
// `Option`, but then every subsequent iteration would have to
// handle the case where it's `None`, and I thought this was a
// bit neater...
let mut batch = batch.into_iter();
if let Some((mut curr_sled, instance, vmm, project)) = batch.next() {
let mut client = mk_client(&curr_sled);
let target = VirtualMachine::new(self.id, &curr_sled, &instance, &vmm, &project);
tasks.spawn(self.check_instance(opctx, &client, target, vmm));

for (sled, instance, vmm, project) in batch {
// We're now talking to a new sled agent; update the client.
if sled.id() != curr_sled.id() {
client = mk_client(&sled);
curr_sled = sled;
}
// Spawn a task to check on each sled in the batch.
for (sled, instance, vmm, project) in batch {
let client = match curr_client {
// If we are still talking to the same sled, reuse the
// existing client and its connection pool.
Some((sled_id, ref client)) if sled_id == sled.id() => client.clone(),
// Otherwise, if we've moved on to a new sled, refresh
// the client.
ref mut curr => {
let client = nexus_networking::sled_client_from_address(
sled.id(),
sled.address(),
&opctx.log,
);
*curr = Some((sled.id(), client.clone()));
client
},
};

let target = VirtualMachine::new(self.id, &curr_sled, &instance, &vmm, &project);
tasks.spawn(self.check_instance(opctx, &client, target, vmm));
}

let target = VirtualMachine::new(self.id, &sled, &instance, &vmm, &project);
tasks.spawn(self.check_instance(opctx, client, target, vmm));
}
}

// Now, wait for the check results to come back.
let mut total: usize = 0;
let mut update_sagas_queued: usize = 0;
let mut instance_states: BTreeMap<String, usize> =
BTreeMap::new();
let mut check_failures: BTreeMap<String, usize> =
BTreeMap::new();
let mut check_errors: BTreeMap<String, usize> = BTreeMap::new();
while let Some(result) = tasks.join_next().await {
total += 1;
let check = result.expect(
"a `JoinError` is returned if a spawned task \
panics, or if the task is aborted. we never abort \
tasks on this `JoinSet`, and nexus is compiled with \
`panic=\"abort\"`, so neither of these cases should \
ever occur",
);
match check.outcome {
CheckOutcome::Success(state) => {
*instance_states
.entry(state.to_string())
.or_default() += 1;
// Now, wait for the check results to come back.
while let Some(result) = tasks.join_next().await {
total += 1;
let check = result.expect(
"a `JoinError` is returned if a spawned task \
panics, or if the task is aborted. we never abort \
tasks on this `JoinSet`, and nexus is compiled with \
`panic=\"abort\"`, so neither of these cases should \
ever occur",
);
match check.outcome {
CheckOutcome::Success(state) => {
*instance_states
.entry(state.to_string())
.or_default() += 1;
}
CheckOutcome::Failure(reason) => {
*check_failures.entry(reason.as_str().into_owned()).or_default() += 1;
}
CheckOutcome::Unknown => {}
}
CheckOutcome::Failure(reason) => {
*check_failures.entry(reason.as_str().into_owned()).or_default() += 1;
if let Err(ref reason) = check.result {
*check_errors.entry(reason.as_str().into_owned()).or_default() += 1;
}
CheckOutcome::Unknown => {}
}
if let Err(ref reason) = check.result {
*check_errors.entry(reason.as_str().into_owned()).or_default() += 1;
}
if check.update_saga_queued {
update_sagas_queued += 1;
}
self.metrics.lock().unwrap().record_check(check);
if check.update_saga_queued {
update_sagas_queued += 1;
}
self.metrics.lock().unwrap().record_check(check);

}
}

// All requests completed! Prune any old instance metrics for