Skip to content

Commit

Permalink
Merge branch 'main' into alan/agent-needs-workers
Browse files Browse the repository at this point in the history
  • Loading branch information
Alan Hanson committed Dec 3, 2024
2 parents 8bb8a81 + 5a41b82 commit 3fed186
Show file tree
Hide file tree
Showing 10 changed files with 82 additions and 14 deletions.
2 changes: 1 addition & 1 deletion downstairs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ version = "0.0.1"
authors = ["Joshua M. Clulow <[email protected]>", "Alan Hanson <[email protected]"]
license = "MPL-2.0"
edition = "2021"
rust-version = "1.70"
rust-version = "1.81"

[dependencies]
anyhow.workspace = true
Expand Down
2 changes: 1 addition & 1 deletion integration_tests/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ crucible-client-types.workspace = true
crucible-downstairs = { workspace = true, features = ["integration-tests"] }
crucible-pantry-client.workspace = true
crucible-pantry.workspace = true
crucible.workspace = true
crucible = { workspace = true, features = ["integration-tests"] }
dropshot.workspace = true
futures-core.workspace = true
futures.workspace = true
Expand Down
37 changes: 37 additions & 0 deletions integration_tests/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5809,4 +5809,41 @@ mod test {
}
));
}

#[tokio::test]
async fn test_auto_flush_deactivate() {
let log = csl();
let child = TestDownstairsSet::small(false).await.unwrap();
let vcr = VolumeConstructionRequest::Volume {
id: Uuid::new_v4(),
block_size: 512,
sub_volumes: vec![VolumeConstructionRequest::Region {
block_size: 512,
blocks_per_extent: child.blocks_per_extent(),
extent_count: child.extent_count(),
opts: child.opts(),
gen: 2,
}],
read_only_parent: None,
};

let volume = Volume::construct(vcr, None, log.clone()).await.unwrap();
volume.activate().await.unwrap();

// Send exactly IO_CACHED_MAX_JOBS to force a Barrier to be sent. The
// barrier empties out the active list, so deactivation may proceed.
for _ in 0..crucible::testing::IO_CACHED_MAX_JOBS {
let mut buf = Buffer::new(2, 512);
volume.read(BlockIndex(0), &mut buf).await.unwrap();
}
tokio::time::sleep(std::time::Duration::from_millis(100)).await;

// At this point, need_flush is set, but there are no active jobs.
// Deactivation must be aware of the need_flush flag; otherwise, the
// Upstairs will attempt to submit that flush after deactivation.
volume.deactivate().await.unwrap();

// Make sure everything worked
volume.activate().await.unwrap();
}
}
30 changes: 23 additions & 7 deletions tools/dtrace/single_up_info.d
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,11 @@
dtrace:::BEGIN
{
show = 21;
/*
* We have to init something for last_id so we can use the
* default values for all the session IDs that we don't yet have.
*/
last_id["string"] = (int64_t)1;
}

/*
Expand All @@ -20,7 +25,7 @@ tick-1s
{
printf("%8s ", "SESSION");
printf("%17s %17s %17s", "DS STATE 0", "DS STATE 1", "DS STATE 2");
printf(" %5s %5s %9s %5s", "UPW", "DSW", "NEXT_JOB", "BAKPR");
printf(" %5s %5s %8s %5s", "UPW", "DSW", "NEXT_JOB", "DELTA");
printf(" %10s", "WRITE_BO");
printf(" %5s %5s %5s", "IP0", "IP1", "IP2");
printf(" %5s %5s %5s", "D0", "D1", "D2");
Expand All @@ -35,16 +40,27 @@ crucible_upstairs*:::up-status
/pid==$1/
{
show = show + 1;
session_id = json(copyinstr(arg1), "ok.session_id");
/*
* All these local variables require the "this->" so the probe firing
* from different sessions don't collide with each other.
*/
this->full_session_id = json(copyinstr(arg1), "ok.session_id");
this->session_id = substr(this->full_session_id, 0, 8);

this->next_id_str = json(copyinstr(arg1), "ok.next_job_id");
this->next_id_value = strtoll(this->next_id_str);

this->delta = this->next_id_value - last_id[this->session_id];

/* Set this for the next loop */
last_id[this->session_id] = this->next_id_value;
/*
* I'm not very happy about this, but if we don't print it all on one
* line, then multiple sessions will clobber each others output.
*/
printf("%8s %17s %17s %17s %5s %5s %5s %10s %5s %5s %5s %5s %5s %5s %5s %5s %5s %5s %5s %5s %5s %5s %5s\n",

substr(session_id, 0, 8),
printf("%8s %17s %17s %17s %5s %5s %8s %5d %10s %5s %5s %5s %5s %5s %5s %5s %5s %5s %5s %5s %5s %5s %5s %5s\n",

this->session_id,
/*
* State for the three downstairs
*/
Expand All @@ -59,9 +75,10 @@ crucible_upstairs*:::up-status
json(copyinstr(arg1), "ok.ds_count"),

/*
* Job ID delta and backpressure
* Job ID, job delta and write bytes outstanding
*/
json(copyinstr(arg1), "ok.next_job_id"),
this->delta,
json(copyinstr(arg1), "ok.write_bytes_out"),

/*
Expand Down Expand Up @@ -93,5 +110,4 @@ crucible_upstairs*:::up-status
json(copyinstr(arg1), "ok.ds_extents_confirmed[0]"),
json(copyinstr(arg1), "ok.ds_extents_confirmed[1]"),
json(copyinstr(arg1), "ok.ds_extents_confirmed[2]"));

}
4 changes: 2 additions & 2 deletions tools/dtrace/sled_upstairs_info.d
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ tick-1s
crucible_upstairs*:::up-status
{
show = show + 1;
session_id = json(copyinstr(arg1), "ok.session_id");
this->session_id = json(copyinstr(arg1), "ok.session_id");

/*
* I'm not very happy about this very long muli-line printf, but if
Expand All @@ -48,7 +48,7 @@ crucible_upstairs*:::up-status
*/
printf("%5d %8s %17s %17s %17s %5s %5s %9s %10s %5s %5s %5s %5s %5s %5s %5s %5s %5s %5s %5s %5s %5s %5s %5s\n",
pid,
substr(session_id, 0, 8),
substr(this->session_id, 0, 8),

/* State for the three downstairs */
json(copyinstr(arg1), "ok.ds_state[0]"),
Expand Down
1 change: 0 additions & 1 deletion tools/dtrace/upstairs_info.d
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
dtrace:::BEGIN
{
show = 21;
last[pid] = 0;
}

/*
Expand Down
2 changes: 2 additions & 0 deletions upstairs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ version = "0.0.1"
authors = ["Joshua M. Clulow <[email protected]>", "Alan Hanson <[email protected]"]
license = "MPL-2.0"
edition = "2021"
rust-version = "1.81"

[lib]
name = "crucible"
Expand All @@ -12,6 +13,7 @@ path = "src/lib.rs"
[features]
asm = ["usdt/asm"]
notify-nexus = ["nexus-client", "internal-dns", "progenitor-client", "http", "omicron-uuid-kinds"]
integration-tests = []

[dependencies]
anyhow.workspace = true
Expand Down
9 changes: 8 additions & 1 deletion upstairs/src/downstairs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -654,7 +654,14 @@ impl Downstairs {
// Specifically, we want to skip jobs if the only path back online for
// that client goes through live-repair; if that client can come back
// through replay, then the jobs must remain live.
if matches!(self.clients[client_id].state(), DsState::LiveRepair) {
let client_state = self.clients[client_id].state();
if matches!(
client_state,
DsState::LiveRepair | DsState::LiveRepairReady
) || matches!(
client_state,
DsState::Active | DsState::Offline if !self.can_replay
) {
self.skip_all_jobs(client_id);
}

Expand Down
6 changes: 6 additions & 0 deletions upstairs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,12 @@ const IO_CACHED_MAX_BYTES: u64 = 1024 * 1024 * 1024; // 1 GiB
/// them comes back.
const IO_CACHED_MAX_JOBS: u64 = 10000;

// Re-exports for unit testing
#[cfg(feature = "integration-tests")]
pub mod testing {
pub const IO_CACHED_MAX_JOBS: u64 = super::IO_CACHED_MAX_JOBS;
}

/// The BlockIO trait behaves like a physical NVMe disk (or a virtio virtual
/// disk): there is no contract about what order operations that are submitted
/// between flushes are performed in.
Expand Down
3 changes: 2 additions & 1 deletion upstairs/src/upstairs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -921,6 +921,7 @@ impl Upstairs {
Some(Instant::now() + REPAIR_CHECK_INTERVAL);
} else {
// We started the repair in the call to start_live_repair above
self.repair_check_deadline = None;
}
}

Expand Down Expand Up @@ -1278,7 +1279,7 @@ impl Upstairs {
}
UpstairsState::Active => (),
}
if !self.downstairs.can_deactivate_immediately() {
if self.need_flush || !self.downstairs.can_deactivate_immediately() {
debug!(self.log, "not ready to deactivate; submitting final flush");
let io_guard = self.try_acquire_io(0);
self.submit_flush(None, None, io_guard);
Expand Down

0 comments on commit 3fed186

Please sign in to comment.