Skip to content

Commit

Permalink
Consolidate DownstairsClient::reinitialize (#1549)
Browse files Browse the repository at this point in the history
Previously, `Downstairs::reinitialize` would do the following:

- Call `DownstairsClient::on_missing`, which updates the client state
- Skip jobs (if necessary)
- Call `DownstairsClient::reinitialize`, which also updates client state

These used to be separate tasks, but they're now in the same place, so
it's silly to have them both.

This PR consolidates all of the logic into
`DownstairsClient::reinitialize`.
  • Loading branch information
mkeeter authored Nov 5, 2024
1 parent 511e7ea commit a9d614a
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 75 deletions.
96 changes: 46 additions & 50 deletions upstairs/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -518,42 +518,6 @@ impl DownstairsClient {
info!(self.log, " {} final dependency list {:?}", ds_id, deps);
}

/// When the downstairs is marked as missing, handle its state transition
pub(crate) fn on_missing(&mut self) {
let current = &self.state;
let new_state = match current {
DsState::Active | DsState::Offline => DsState::Offline,

DsState::Faulted
| DsState::LiveRepair
| DsState::LiveRepairReady => DsState::Faulted,

DsState::New
| DsState::Deactivated
| DsState::Reconcile
| DsState::Disconnected
| DsState::WaitQuorum
| DsState::WaitActive
| DsState::Disabled => DsState::Disconnected,

// If we have replaced a downstairs, don't forget that.
DsState::Replacing | DsState::Replaced => DsState::Replaced,

DsState::Migrating => panic!(),
};

if *current != new_state {
info!(
self.log,
"Gone missing, transition from {current:?} to {new_state:?}"
);
}

// Jobs are skipped and replayed in `Downstairs::reinitialize`, which is
// (probably) the caller of this function.
self.state = new_state;
}

/// Checks whether this Downstairs is ready for the upstairs to deactivate
///
/// # Panics
Expand Down Expand Up @@ -584,31 +548,63 @@ impl DownstairsClient {
///
/// # Panics
/// If `self.client_task` is not `None`, or `self.target_addr` is `None`
pub(crate) fn reinitialize(&mut self, auto_promote: bool) {
pub(crate) fn reinitialize(&mut self, up_state: &UpstairsState) {
// Clear this Downstair's repair address, and let the YesItsMe set it.
// This works if this Downstairs is new, reconnecting, or was replaced
// entirely; the repair address could have changed in any of these
// cases.
self.repair_addr = None;
self.needs_replay = false;

if auto_promote {
self.promote_state = Some(PromoteState::Waiting);
} else {
self.promote_state = None;
}
// If the upstairs is already active (or trying to go active), then the
// downstairs should automatically call PromoteToActive when it reaches
// the relevant state.
self.promote_state = match up_state {
UpstairsState::Active | UpstairsState::GoActive(..) => {
Some(PromoteState::Waiting)
}
UpstairsState::Initializing
| UpstairsState::Deactivating { .. } => None,
};

self.negotiation_state = NegotiationState::Start;

// TODO this is an awkward special case!
if self.state == DsState::Disconnected {
info!(self.log, "Disconnected -> New");
self.state = DsState::New;
let current = &self.state;
let new_state = match current {
DsState::Active => Some(DsState::Offline),
DsState::LiveRepair | DsState::LiveRepairReady => {
Some(DsState::Faulted)
}

DsState::Deactivated
| DsState::Reconcile
| DsState::Disconnected
| DsState::WaitQuorum
| DsState::WaitActive
| DsState::Disabled => Some(DsState::New),

// If we have replaced a downstairs, don't forget that.
DsState::Replacing => Some(DsState::Replaced),

// We stay in these states through the task restart
DsState::Offline
| DsState::Faulted
| DsState::New
| DsState::Replaced => None,

DsState::Migrating => panic!(),
};

// Jobs are skipped and replayed in `Downstairs::reinitialize`, which is
// (probably) the caller of this function.
if let Some(new_state) = new_state {
self.checked_state_transition(up_state, new_state);
}

self.connection_id.update();

// Restart with a short delay
self.start_task(true, auto_promote);
// Restart with a short delay, connecting if we're auto-promoting
self.start_task(true, self.promote_state.is_some());
}

/// Sets the `needs_replay` flag
Expand Down Expand Up @@ -1142,7 +1138,8 @@ impl DownstairsClient {
DsState::Active
| DsState::Deactivated
| DsState::Faulted
| DsState::Reconcile => {} // Okay
| DsState::Reconcile
| DsState::Disabled => {} // Okay
_ => {
panic_invalid();
}
Expand Down Expand Up @@ -1858,7 +1855,6 @@ impl DownstairsClient {
String::new()
},
);
self.checked_state_transition(up_state, DsState::New);
if !match_gen {
let gen_error = format!(
"Generation requested:{} found:{}",
Expand Down
18 changes: 4 additions & 14 deletions upstairs/src/downstairs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -505,16 +505,8 @@ impl Downstairs {
pub(crate) fn reinitialize(
&mut self,
client_id: ClientId,
auto_promote: bool,
up_state: &UpstairsState,
) {
let prev_state = self.clients[client_id].state();

// If the connection goes down here, we need to know what state we were
// in to decide what state to transition to. The on_missing method will
// do that for us!
self.clients[client_id].on_missing();

// If the IO task stops on its own, then under certain circumstances,
// we want to skip all of its jobs. (If we requested that the IO task
// stop, then whoever made that request is responsible for skipping jobs
Expand All @@ -523,15 +515,13 @@ impl Downstairs {
// Specifically, we want to skip jobs if the only path back online for
// that client goes through live-repair; if that client can come back
// through replay, then the jobs must remain live.
let new_state = self.clients[client_id].state();
if matches!(prev_state, DsState::LiveRepair | DsState::Active)
&& matches!(new_state, DsState::Faulted)
{
if matches!(self.clients[client_id].state(), DsState::LiveRepair) {
self.skip_all_jobs(client_id);
}

// Restart the IO task for that specific client
self.clients[client_id].reinitialize(auto_promote);
// Restart the IO task for that specific client, transitioning to a new
// state.
self.clients[client_id].reinitialize(up_state);

for i in ClientId::iter() {
// Clear per-client delay, because we're starting a new session
Expand Down
12 changes: 1 addition & 11 deletions upstairs/src/upstairs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2008,17 +2008,7 @@ impl Upstairs {
self.downstairs
.notify_nexus_of_client_task_stopped(client_id, reason);

// If the upstairs is already active (or trying to go active), then the
// downstairs should automatically call PromoteToActive when it reaches
// the relevant state.
let auto_promote = match self.state {
UpstairsState::Active | UpstairsState::GoActive(..) => true,
UpstairsState::Initializing
| UpstairsState::Deactivating { .. } => false,
};

self.downstairs
.reinitialize(client_id, auto_promote, &self.state);
self.downstairs.reinitialize(client_id, &self.state);
}

/// Sets both guest and per-client backpressure
Expand Down

0 comments on commit a9d614a

Please sign in to comment.