Skip to content

Commit

Permalink
Add IOop::Barrier
Browse files Browse the repository at this point in the history
  • Loading branch information
mkeeter committed Oct 11, 2024
1 parent 158e3fd commit a2f5901
Show file tree
Hide file tree
Showing 8 changed files with 177 additions and 6 deletions.
50 changes: 49 additions & 1 deletion downstairs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,9 @@ enum IOop {
snapshot_details: Option<SnapshotDetails>,
extent_limit: Option<ExtentId>,
},
Barrier {
dependencies: Vec<JobId>, // Jobs that must finish before this
},
/*
* These operations are for repairing a bad downstairs
*/
Expand Down Expand Up @@ -114,6 +117,7 @@ impl IOop {
match &self {
IOop::Write { dependencies, .. }
| IOop::Flush { dependencies, .. }
| IOop::Barrier { dependencies, .. }
| IOop::Read { dependencies, .. }
| IOop::WriteUnwritten { dependencies, .. }
| IOop::ExtentClose { dependencies, .. }
Expand Down Expand Up @@ -687,6 +691,9 @@ pub fn show_work(ds: &mut Downstairs) {
IOop::Read { dependencies, .. } => ("Read", dependencies),
IOop::Write { dependencies, .. } => ("Write", dependencies),
IOop::Flush { dependencies, .. } => ("Flush", dependencies),
IOop::Barrier { dependencies, .. } => {
("Barrier", dependencies)
}
IOop::WriteUnwritten { dependencies, .. } => {
("WriteU", dependencies)
}
Expand Down Expand Up @@ -721,6 +728,7 @@ pub mod cdt {
fn submit__writeunwritten__start(_: u64) {}
fn submit__write__start(_: u64) {}
fn submit__flush__start(_: u64) {}
fn submit__barrier__start(_: u64) {}
fn submit__el__close__start(_: u64) {}
fn submit__el__flush__close__start(_: u64) {}
fn submit__el__repair__start(_: u64) {}
Expand Down Expand Up @@ -977,6 +985,11 @@ impl ActiveConnection {
session_id,
..
}
| Message::Barrier {
upstairs_id,
session_id,
..
}
| Message::ReadRequest {
upstairs_id,
session_id,
Expand Down Expand Up @@ -1146,6 +1159,25 @@ impl ActiveConnection {
)
.await?
}
Message::Barrier {
job_id,
dependencies,
..
} => {
cdt::submit__barrier__start!(|| job_id.0);

let new_barrier = IOop::Barrier { dependencies };

self.do_work_if_ready(
job_id,
new_barrier,
flags,
reqwest_client,
dss,
region,
)
.await?
}
Message::WriteUnwritten { header, data } => {
cdt::submit__writeunwritten__start!(|| header.job_id.0);
let writes = RegionWrite::new(
Expand Down Expand Up @@ -1719,6 +1751,18 @@ impl ActiveConnection {
result,
}
}
IOop::Barrier { dependencies } => {
debug!(self.log, "Barrier :{job_id} deps:{dependencies:?}",);

// No work is actually done here; the barrier just lets us drop
// older dependencies (because it waits for all of them).
Message::BarrierAck {
upstairs_id: upstairs_connection.upstairs_id,
session_id: upstairs_connection.session_id,
job_id,
result: Ok(()),
}
}
IOop::ExtentClose {
dependencies,
extent,
Expand Down Expand Up @@ -3328,6 +3372,7 @@ impl Work {
IOop::Write { .. } => "Write",
IOop::WriteUnwritten { .. } => "WriteUnwritten",
IOop::Flush { .. } => "Flush",
IOop::Barrier { .. } => "Barrier",
IOop::Read { .. } => "Read",
IOop::ExtentClose { .. } => "ECLose",
IOop::ExtentFlushClose { .. } => "EFlushCLose",
Expand Down Expand Up @@ -3674,7 +3719,10 @@ mod test {
.iter()
.all(|dep| work.completed.is_complete(*dep)));

if matches!(job, IOop::Flush { .. }) {
// Flushes and barriers both guarantee that no future jobs will depend
// on jobs that preceded them, so we reset the completed jobs list to
// only include their job id.
if matches!(job, IOop::Flush { .. } | IOop::Barrier { .. }) {
work.completed.reset(ds_id);
} else {
work.completed.push(ds_id);
Expand Down
21 changes: 19 additions & 2 deletions protocol/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,9 @@ pub struct SnapshotDetails {
#[repr(u32)]
#[derive(IntoPrimitive)]
pub enum MessageVersion {
/// Add `Barrier` and `BarrierAck`
V12 = 12,

/// Use `ReadBlockContext` instead of `Option<BlockContext>`
V11 = 11,

Expand Down Expand Up @@ -207,7 +210,7 @@ pub enum MessageVersion {
}
impl MessageVersion {
pub const fn current() -> Self {
Self::V11
Self::V12
}
}

Expand All @@ -216,7 +219,7 @@ impl MessageVersion {
* This, along with the MessageVersion enum above should be updated whenever
* changes are made to the Message enum below.
*/
pub const CRUCIBLE_MESSAGE_VERSION: u32 = 11;
pub const CRUCIBLE_MESSAGE_VERSION: u32 = MessageVersion::current() as u32;

/*
* If you add or change the Message enum, you must also increment the
Expand Down Expand Up @@ -506,6 +509,18 @@ pub enum Message {
job_id: JobId,
result: Result<(), CrucibleError>,
},
Barrier {
upstairs_id: Uuid,
session_id: Uuid,
job_id: JobId,
dependencies: Vec<JobId>,
},
BarrierAck {
upstairs_id: Uuid,
session_id: Uuid,
job_id: JobId,
result: Result<(), CrucibleError>,
},

ReadRequest {
upstairs_id: Uuid,
Expand Down Expand Up @@ -621,6 +636,7 @@ impl Message {
| Message::ExtentLiveReopen { .. }
| Message::ExtentLiveNoOp { .. }
| Message::Flush { .. }
| Message::Barrier { .. }
| Message::ReadRequest { .. }
| Message::WriteUnwritten { .. }
| Message::Unknown(..) => None,
Expand All @@ -634,6 +650,7 @@ impl Message {
| Message::ExtentLiveAckId { result, .. }
| Message::WriteAck { result, .. }
| Message::FlushAck { result, .. }
| Message::BarrierAck { result, .. }
| Message::WriteUnwrittenAck { result, .. } => {
result.as_ref().err()
}
Expand Down
1 change: 1 addition & 0 deletions upstairs/src/active_jobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ impl ActiveJobs {
// tracker have been recorded.
match &io.work {
IOop::Flush { .. }
| IOop::Barrier { .. }
| IOop::Write { .. }
| IOop::WriteUnwritten { .. }
| IOop::Read { .. }
Expand Down
18 changes: 16 additions & 2 deletions upstairs/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,7 @@ impl DownstairsClient {
IOop::Write { dependencies, .. }
| IOop::WriteUnwritten { dependencies, .. }
| IOop::Flush { dependencies, .. }
| IOop::Barrier { dependencies, .. }
| IOop::Read { dependencies, .. }
| IOop::ExtentFlushClose { dependencies, .. }
| IOop::ExtentLiveRepair { dependencies, .. }
Expand Down Expand Up @@ -1300,7 +1301,8 @@ impl DownstairsClient {
// XXX: Errors should be reported to nexus
IOop::Write { .. }
| IOop::WriteUnwritten { .. }
| IOop::Flush { .. } => {
| IOop::Flush { .. }
| IOop::Barrier { .. } => {
self.stats.downstairs_errors += 1;
}

Expand Down Expand Up @@ -1401,7 +1403,9 @@ impl DownstairsClient {
* as those jobs should never be acked before all three
* are done.
*/
IOop::Write { .. } | IOop::WriteUnwritten { .. } => {}
IOop::Write { .. }
| IOop::WriteUnwritten { .. }
| IOop::Barrier { .. } => {}
IOop::ExtentFlushClose { .. }
| IOop::ExtentLiveRepair { .. }
| IOop::ExtentLiveReopen { .. }
Expand Down Expand Up @@ -1508,6 +1512,16 @@ impl DownstairsClient {
}
self.last_flush = ds_id;
}
IOop::Barrier { .. } => {
assert!(read_data.blocks.is_empty());
assert!(read_data.data.is_empty());
assert!(extent_info.is_none());

if jobs_completed_ok == 2 {
ackable = true;
cdt::up__to__ds__barrier__done!(|| ds_id.0);
}
}
IOop::ExtentFlushClose { .. } => {
assert!(read_data.blocks.is_empty());
assert!(read_data.data.is_empty());
Expand Down
47 changes: 47 additions & 0 deletions upstairs/src/downstairs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,10 @@ impl Downstairs {
cdt::gw__flush__done!(|| (ds_id.0));
stats.add_flush();
}
IOop::Barrier { .. } => {
cdt::gw__barrier__done!(|| (ds_id.0));
stats.add_barrier();
}
IOop::ExtentFlushClose { extent, .. } => {
cdt::gw__close__done!(|| (ds_id.0, extent.0));
stats.add_flush_close();
Expand Down Expand Up @@ -1916,6 +1920,20 @@ impl Downstairs {
next_id
}

pub(crate) fn submit_barrier(&mut self) -> JobId {
let next_id = self.next_id();
cdt::gw__barrier__start!(|| (next_id.0));

// A barrier has the same deps as a flush (namely, it depends on all
// previous jobs and acts as the only dependency for all subsequent
// jobs)
let dependencies = self.ds_active.deps_for_flush(next_id);
debug!(self.log, "IO Barrier {next_id} has deps {dependencies:?}");

self.enqueue(next_id, IOop::Barrier { dependencies }, ClientMap::new());
next_id
}

/// Reserves repair IDs if impacted blocks overlap our extent under repair
fn check_repair_ids_for_range(&mut self, impacted_blocks: ImpactedBlocks) {
let Some(eur) = self.get_extent_under_repair() else {
Expand Down Expand Up @@ -2210,6 +2228,15 @@ impl Downstairs {
extent_limit,
}
}
IOop::Barrier { dependencies } => {
cdt::ds__barrier__client__start!(|| (ds_id.0, client_id.get()));
Message::Barrier {
upstairs_id: self.cfg.upstairs_id,
session_id: self.cfg.session_id,
job_id: ds_id,
dependencies,
}
}
IOop::Read {
dependencies,
start_eid,
Expand Down Expand Up @@ -2688,6 +2715,10 @@ impl Downstairs {
let job_type = "Flush".to_string();
(job_type, 0)
}
IOop::Barrier { .. } => {
let job_type = "Barrier".to_string();
(job_type, 0)
}
IOop::ExtentFlushClose { extent, .. } => {
let job_type = "FClose".to_string();
(job_type, extent.0 as usize)
Expand Down Expand Up @@ -2817,6 +2848,21 @@ impl Downstairs {
None,
)
}
Message::BarrierAck {
upstairs_id,
session_id,
job_id,
result,
} => {
cdt::ds__barrier__client__done!(|| (job_id.0, client_id.get()));
(
upstairs_id,
session_id,
job_id,
result.map(|_| Default::default()),
None,
)
}
Message::ReadResponse { header, data } => {
cdt::ds__read__client__done!(|| (
header.job_id.0,
Expand Down Expand Up @@ -3180,6 +3226,7 @@ impl Downstairs {
job.work,
IOop::Write { .. }
| IOop::Flush { .. }
| IOop::Barrier { .. }
| IOop::WriteUnwritten { .. }
| IOop::ExtentFlushClose { .. }
| IOop::ExtentLiveRepair { .. }
Expand Down
Loading

0 comments on commit a2f5901

Please sign in to comment.