Skip to content

Commit

Permalink
Remove IOState::New; add separate ReconcileIOState
Browse files Browse the repository at this point in the history
  • Loading branch information
mkeeter committed Sep 24, 2024
1 parent 6baa154 commit 469b798
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 75 deletions.
51 changes: 25 additions & 26 deletions upstairs/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ use crate::{
live_repair::ExtentInfo, upstairs::UpstairsConfig, upstairs::UpstairsState,
ClientIOStateCount, ClientId, CrucibleDecoder, CrucibleError, DownstairsIO,
DsState, EncryptionContext, IOState, IOop, JobId, Message, RawReadResponse,
ReconcileIO, RegionDefinitionStatus, RegionMetadata, Validation,
ReconcileIO, ReconcileIOState, RegionDefinitionStatus, RegionMetadata,
Validation,
};
use crucible_common::{
deadline_secs, verbose_timeout, x509::TLSContext, ExtentId,
Expand Down Expand Up @@ -344,12 +345,10 @@ impl DownstairsClient {
job: &mut DownstairsIO,
new_state: IOState,
) -> IOState {
let is_running =
matches!(new_state, IOState::New | IOState::InProgress);
let is_running = matches!(new_state, IOState::InProgress);
self.io_state_count.incr(&new_state);
let old_state = job.state.insert(self.client_id, new_state);
let was_running =
matches!(old_state, IOState::New | IOState::InProgress);
let was_running = matches!(old_state, IOState::InProgress);
self.io_state_count.decr(&old_state);

// Update our bytes-in-flight counter
Expand Down Expand Up @@ -414,7 +413,7 @@ impl DownstairsClient {
job
}

/// Ensures that the given job is in the job queue and in `IOState::New`
/// Ensures that the given job is in the job queue
pub(crate) fn replay_job(&mut self, job: &mut DownstairsIO) {
// If the job is InProgress, then we can just go back to New and no
// extra work is required.
Expand All @@ -424,13 +423,8 @@ impl DownstairsClient {
panic!("[{}] This job was not acked: {:?}", self.client_id, job);
}

let old_state = self.set_job_state(job, IOState::InProgress);
self.set_job_state(job, IOState::InProgress);
job.replay = true;
assert_ne!(
old_state,
IOState::New,
"IOState::New is transitory and should not be seen"
);
}

/// Sets this job as skipped and moves it to `skipped_jobs`
Expand All @@ -439,7 +433,7 @@ impl DownstairsClient {
/// If the job is not new or in-progress
pub(crate) fn skip_job(&mut self, job: &mut DownstairsIO) {
let prev_state = self.set_job_state(job, IOState::Skipped);
assert!(matches!(prev_state, IOState::New | IOState::InProgress));
assert!(matches!(prev_state, IOState::InProgress));
self.skipped_jobs.insert(job.ds_id);
}

Expand Down Expand Up @@ -2155,8 +2149,10 @@ impl DownstairsClient {
if self.state != DsState::Reconcile {
panic!("[{}] should still be in reconcile", self.client_id);
}
let prev_state = job.state.insert(self.client_id, IOState::InProgress);
assert_eq!(prev_state, IOState::New);
let prev_state = job
.state
.insert(self.client_id, ReconcileIOState::InProgress);
assert_eq!(prev_state, ReconcileIOState::New);

// Some reconciliation messages need to be adjusted on a per-client
// basis, e.g. not sending ExtentRepair to clients that aren't being
Expand All @@ -2174,9 +2170,10 @@ impl DownstairsClient {
} else {
// Skip this job for this Downstairs, since only the target
// clients need to do the reconcile.
let prev_state =
job.state.insert(self.client_id, IOState::Skipped);
assert_eq!(prev_state, IOState::InProgress);
let prev_state = job
.state
.insert(self.client_id, ReconcileIOState::Skipped);
assert_eq!(prev_state, ReconcileIOState::InProgress);
debug!(self.log, "no action needed request {repair_id:?}");
}
}
Expand All @@ -2192,9 +2189,10 @@ impl DownstairsClient {
debug!(self.log, "skipping flush request {repair_id:?}");
// Skip this job for this Downstairs, since it's narrowly
// aimed at a different client.
let prev_state =
job.state.insert(self.client_id, IOState::Skipped);
assert_eq!(prev_state, IOState::InProgress);
let prev_state = job
.state
.insert(self.client_id, ReconcileIOState::Skipped);
assert_eq!(prev_state, ReconcileIOState::InProgress);
}
}
Message::ExtentReopen { .. } | Message::ExtentClose { .. } => {
Expand All @@ -2213,12 +2211,13 @@ impl DownstairsClient {
reconcile_id: ReconciliationId,
job: &mut ReconcileIO,
) -> bool {
let old_state = job.state.insert(self.client_id, IOState::Done);
assert_eq!(old_state, IOState::InProgress);
let old_state =
job.state.insert(self.client_id, ReconcileIOState::Done);
assert_eq!(old_state, ReconcileIOState::InProgress);
assert_eq!(job.id, reconcile_id);
job.state
.iter()
.all(|s| matches!(s, IOState::Done | IOState::Skipped))
job.state.iter().all(|s| {
matches!(s, ReconcileIOState::Done | ReconcileIOState::Skipped)
})
}

pub(crate) fn total_live_work(&self) -> usize {
Expand Down
60 changes: 30 additions & 30 deletions upstairs/src/downstairs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -595,7 +595,7 @@ impl Downstairs {
*/
for (id, job) in &self.ds_active {
let state = &job.state[client_id];
if state == &IOState::New || state == &IOState::InProgress {
if state == &IOState::InProgress {
info!(
self.log,
"[{}] cannot deactivate, job {} in state {:?}",
Expand Down Expand Up @@ -2516,7 +2516,7 @@ impl Downstairs {
self.ds_active.for_each(|ds_id, job| {
let state = &job.state[client_id];

if matches!(state, IOState::InProgress | IOState::New) {
if matches!(state, IOState::InProgress) {
self.clients[client_id].skip_job(job);
number_jobs_skipped += 1;

Expand Down Expand Up @@ -4247,7 +4247,7 @@ pub(crate) mod test {
upstairs::UpstairsState,
ClientId, CrucibleError, DownstairsIO, DsState, ExtentFix, IOState,
IOop, ImpactedAddr, ImpactedBlocks, JobId, RawReadResponse,
ReconcileIO, ReconciliationId, SnapshotDetails,
ReconcileIO, ReconcileIOState, ReconciliationId, SnapshotDetails,
};

use bytes::BytesMut;
Expand Down Expand Up @@ -5860,9 +5860,9 @@ pub(crate) mod test {
panic!("{:?} not ExtentFlush()", m);
}
}
assert_eq!(IOState::New, rio.state[ClientId::new(0)]);
assert_eq!(IOState::New, rio.state[ClientId::new(1)]);
assert_eq!(IOState::New, rio.state[ClientId::new(2)]);
assert_eq!(ReconcileIOState::New, rio.state[ClientId::new(0)]);
assert_eq!(ReconcileIOState::New, rio.state[ClientId::new(1)]);
assert_eq!(ReconcileIOState::New, rio.state[ClientId::new(2)]);

// Second task, close extent
let rio = ds.reconcile_task_list.pop_front().unwrap();
Expand All @@ -5879,9 +5879,9 @@ pub(crate) mod test {
panic!("{:?} not ExtentClose()", m);
}
}
assert_eq!(IOState::New, rio.state[ClientId::new(0)]);
assert_eq!(IOState::New, rio.state[ClientId::new(1)]);
assert_eq!(IOState::New, rio.state[ClientId::new(2)]);
assert_eq!(ReconcileIOState::New, rio.state[ClientId::new(0)]);
assert_eq!(ReconcileIOState::New, rio.state[ClientId::new(1)]);
assert_eq!(ReconcileIOState::New, rio.state[ClientId::new(2)]);

// Third task, repair extent
let rio = ds.reconcile_task_list.pop_front().unwrap();
Expand All @@ -5907,9 +5907,9 @@ pub(crate) mod test {
panic!("{:?} not ExtentRepair", m);
}
}
assert_eq!(IOState::New, rio.state[ClientId::new(0)]);
assert_eq!(IOState::New, rio.state[ClientId::new(1)]);
assert_eq!(IOState::New, rio.state[ClientId::new(2)]);
assert_eq!(ReconcileIOState::New, rio.state[ClientId::new(0)]);
assert_eq!(ReconcileIOState::New, rio.state[ClientId::new(1)]);
assert_eq!(ReconcileIOState::New, rio.state[ClientId::new(2)]);

// Third task, close extent
let rio = ds.reconcile_task_list.pop_front().unwrap();
Expand All @@ -5926,9 +5926,9 @@ pub(crate) mod test {
panic!("{:?} not ExtentClose()", m);
}
}
assert_eq!(IOState::New, rio.state[ClientId::new(0)]);
assert_eq!(IOState::New, rio.state[ClientId::new(1)]);
assert_eq!(IOState::New, rio.state[ClientId::new(2)]);
assert_eq!(ReconcileIOState::New, rio.state[ClientId::new(0)]);
assert_eq!(ReconcileIOState::New, rio.state[ClientId::new(1)]);
assert_eq!(ReconcileIOState::New, rio.state[ClientId::new(2)]);
}

#[test]
Expand Down Expand Up @@ -5979,9 +5979,9 @@ pub(crate) mod test {
panic!("{:?} not ExtentFlush()", m);
}
}
assert_eq!(IOState::New, rio.state[ClientId::new(0)]);
assert_eq!(IOState::New, rio.state[ClientId::new(1)]);
assert_eq!(IOState::New, rio.state[ClientId::new(2)]);
assert_eq!(ReconcileIOState::New, rio.state[ClientId::new(0)]);
assert_eq!(ReconcileIOState::New, rio.state[ClientId::new(1)]);
assert_eq!(ReconcileIOState::New, rio.state[ClientId::new(2)]);

// Second task, close extent
let rio = ds.reconcile_task_list.pop_front().unwrap();
Expand All @@ -5998,9 +5998,9 @@ pub(crate) mod test {
panic!("{:?} not ExtentClose()", m);
}
}
assert_eq!(IOState::New, rio.state[ClientId::new(0)]);
assert_eq!(IOState::New, rio.state[ClientId::new(1)]);
assert_eq!(IOState::New, rio.state[ClientId::new(2)]);
assert_eq!(ReconcileIOState::New, rio.state[ClientId::new(0)]);
assert_eq!(ReconcileIOState::New, rio.state[ClientId::new(1)]);
assert_eq!(ReconcileIOState::New, rio.state[ClientId::new(2)]);

// Third task, repair extent
let rio = ds.reconcile_task_list.pop_front().unwrap();
Expand All @@ -6026,9 +6026,9 @@ pub(crate) mod test {
panic!("{:?} not ExtentRepair", m);
}
}
assert_eq!(IOState::New, rio.state[ClientId::new(0)]);
assert_eq!(IOState::New, rio.state[ClientId::new(1)]);
assert_eq!(IOState::New, rio.state[ClientId::new(2)]);
assert_eq!(ReconcileIOState::New, rio.state[ClientId::new(0)]);
assert_eq!(ReconcileIOState::New, rio.state[ClientId::new(1)]);
assert_eq!(ReconcileIOState::New, rio.state[ClientId::new(2)]);

// Third task, close extent
let rio = ds.reconcile_task_list.pop_front().unwrap();
Expand All @@ -6045,9 +6045,9 @@ pub(crate) mod test {
panic!("{:?} not ExtentClose()", m);
}
}
assert_eq!(IOState::New, rio.state[ClientId::new(0)]);
assert_eq!(IOState::New, rio.state[ClientId::new(1)]);
assert_eq!(IOState::New, rio.state[ClientId::new(2)]);
assert_eq!(ReconcileIOState::New, rio.state[ClientId::new(0)]);
assert_eq!(ReconcileIOState::New, rio.state[ClientId::new(1)]);
assert_eq!(ReconcileIOState::New, rio.state[ClientId::new(2)]);
}

// Tests for reconciliation
Expand Down Expand Up @@ -6316,9 +6316,9 @@ pub(crate) mod test {
let Some(job) = &ds.reconcile_current_work else {
panic!("failed to find current work");
};
assert_eq!(job.state[ClientId::new(0)], IOState::Skipped);
assert_eq!(job.state[ClientId::new(1)], IOState::InProgress);
assert_eq!(job.state[ClientId::new(2)], IOState::InProgress);
assert_eq!(job.state[ClientId::new(0)], ReconcileIOState::Skipped);
assert_eq!(job.state[ClientId::new(1)], ReconcileIOState::InProgress);
assert_eq!(job.state[ClientId::new(2)], ReconcileIOState::InProgress);

let msg = Message::RepairAckId { repair_id: rep_id };
assert!(!ds.on_reconciliation_ack(
Expand Down
41 changes: 22 additions & 19 deletions upstairs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -972,7 +972,7 @@ impl DownstairsIO {

for state in self.state.iter() {
match state {
IOState::New | IOState::InProgress => wc.active += 1,
IOState::InProgress => wc.active += 1,
IOState::Error(_) => wc.error += 1,
IOState::Skipped => wc.skipped += 1,
IOState::Done => wc.done += 1,
Expand Down Expand Up @@ -1102,15 +1102,15 @@ struct WorkSummary {
struct ReconcileIO {
id: ReconciliationId,
op: Message,
state: ClientData<IOState>,
state: ClientData<ReconcileIOState>,
}

impl ReconcileIO {
fn new(id: ReconciliationId, op: Message) -> ReconcileIO {
ReconcileIO {
id,
op,
state: ClientData::new(IOState::New),
state: ClientData::new(ReconcileIOState::New),
}
}
}
Expand Down Expand Up @@ -1368,14 +1368,9 @@ impl IOop {
}
}

/*
* The various states an IO can be in when it is on the work hashmap.
* There is a state that is unique to each downstairs task we have and
* they operate independent of each other.
*/
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, JsonSchema)]
pub enum IOState {
pub enum ReconcileIOState {
// A new IO request.
New,
// The request has been sent to this tasks downstairs.
Expand All @@ -1385,17 +1380,31 @@ pub enum IOState {
// The IO request should be ignored. Ex: we could be doing recovery and
// we only want a specific downstairs to do that work.
Skipped,
// The IO returned an error.
}

/*
* The various states an IO can be in when it is on the work hashmap.
* There is a state that is unique to each downstairs task we have and
* they operate independent of each other.
*/
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, JsonSchema)]
pub enum IOState {
/// The request has been sent to this tasks downstairs.
InProgress,
/// The successful response came back from downstairs.
Done,
/// The IO request should be ignored. Ex: we could be doing recovery and
/// we only want a specific downstairs to do that work.
Skipped,
/// The IO returned an error.
Error(CrucibleError),
}

impl fmt::Display for IOState {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
// Make sure to right-align output on 4 characters
match self {
IOState::New => {
write!(f, " New")
}
IOState::InProgress => {
write!(f, "Sent")
}
Expand Down Expand Up @@ -1442,7 +1451,6 @@ impl ClientIOStateCount {

fn get_mut(&mut self, state: &IOState) -> &mut u32 {
match state {
IOState::New => &mut self.new,
IOState::InProgress => &mut self.in_progress,
IOState::Done => &mut self.done,
IOState::Skipped => &mut self.skipped,
Expand All @@ -1463,7 +1471,6 @@ pub struct IOStateCount {
impl IOStateCount {
fn show_all(&self) {
println!(" STATES DS:0 DS:1 DS:2 TOTAL");
self.show(IOState::New);
self.show(IOState::InProgress);
self.show(IOState::Done);
self.show(IOState::Skipped);
Expand All @@ -1473,7 +1480,6 @@ impl IOStateCount {

fn get(&self, state: &IOState) -> &ClientData<u32> {
match state {
IOState::New => &self.new,
IOState::InProgress => &self.in_progress,
IOState::Done => &self.done,
IOState::Skipped => &self.skipped,
Expand All @@ -1484,9 +1490,6 @@ impl IOStateCount {
fn show(&self, state: IOState) {
let state_stat = self.get(&state);
match state {
IOState::New => {
print!(" New ");
}
IOState::InProgress => {
print!(" Sent ");
}
Expand Down

0 comments on commit 469b798

Please sign in to comment.