From 826e820c25d8fd6a947450993dda0adb08afbb1c Mon Sep 17 00:00:00 2001
From: Alan Hanson
Date: Thu, 26 Sep 2024 13:52:43 -0700
Subject: [PATCH] More crutest and dsc updates to support Volume layer activities. (#1472)

This adds support for crutest to use a provided dsc endpoint to construct a
Volume object. These changes should fix both of these issues:
https://github.com/oxidecomputer/crucible/issues/1451 and
https://github.com/oxidecomputer/crucible/issues/1457

Moved the existing volume creation steps to a new function, and added another
way to create a Volume. The two previous ways of creating a volume are
unchanged (though I changed a log message and added some warnings). The new
code takes the dsc-provided endpoint and uses it to construct a volume.

Additional dsc changes were made to help provide Volume info. Renamed things
in dsc to better reflect what information they hold. Specifically, updated a
bunch of region set comments, as dsc just controls crucible-downstairs
processes and does not know which ones are part of what region set.

**New dsc commands**
* get_ds_uuid: Returns the UUID for the given client ID.
* all_running: Returns true if all downstairs that dsc knows about are
  currently in the Running state.
* get_region_count: Returns the total number of regions that dsc knows about.

**New dsc behavior**
* dsc will now wait for all downstairs to start before taking any commands.
  A test can use dsc's ability to answer a request to confirm that all
  downstairs have started.
* Added the ability to supply a dsc endpoint to crutest-cli (fixes
  https://github.com/oxidecomputer/crucible/issues/1459).

**Other changes**
`tools/test_replay.sh` transitioned to using the new --dsc option, as that
test already required a dsc endpoint and was using a hard-coded default value
for it.

`tools/test_restart_repair.sh` was updated to wait for dsc to report that all
downstairs are online after a restart. This avoids a race where we told dsc
to start, then started crutest, but the downstairs were not yet online.

All the tests that use dsc will eventually transition to using it to
construct a Volume, but I'm pushing that work to another PR.
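For reference, here is a minimal sketch (not part of the patch itself) of how a test could drive the new dsc endpoints through the dsc-client crate. It assumes the progenitor-generated method names that appear in the diff below (`dsc_all_running`, `dsc_get_region_count`, `dsc_get_uuid`) and a dsc server listening on the default 127.0.0.1:9998:

```rust
// Illustrative sketch only; assumes the dsc-client methods shown in the diff
// and a dsc server already running on the default address.
use dsc_client::Client;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let dsc = Client::new("http://127.0.0.1:9998");

    // New endpoint: true once every downstairs dsc manages is Running.
    let all_running = dsc.dsc_all_running().await?.into_inner();

    // New endpoint: total number of regions dsc knows about.
    let regions = dsc.dsc_get_region_count().await?.into_inner();

    // New endpoint: UUID of the downstairs with client ID 0.
    let uuid = dsc.dsc_get_uuid(0).await?.into_inner();

    println!("all_running={all_running} regions={regions} ds0 uuid={uuid}");
    Ok(())
}
```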
--------- Co-authored-by: Alan Hanson --- Cargo.lock | 2 + crutest/src/cli.rs | 13 +- crutest/src/main.rs | 345 +++++++++++++++++++++------------ dsc-client/Cargo.toml | 3 +- dsc/Cargo.toml | 3 +- dsc/src/client.rs | 21 ++ dsc/src/control.rs | 69 ++++++- dsc/src/main.rs | 95 +++++++-- openapi/dsc-control.json | 90 +++++++++ tools/test_fail_live_repair.sh | 3 +- tools/test_replay.sh | 14 +- tools/test_restart_repair.sh | 18 +- 12 files changed, 511 insertions(+), 165 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 8c0f1145f..6a9073eb4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1750,6 +1750,7 @@ dependencies = [ "statistical", "tempfile", "tokio", + "uuid", ] [[package]] @@ -1764,6 +1765,7 @@ dependencies = [ "schemars", "serde", "serde_json", + "uuid", ] [[package]] diff --git a/crutest/src/cli.rs b/crutest/src/cli.rs index c4209f9ec..dcbf76c20 100644 --- a/crutest/src/cli.rs +++ b/crutest/src/cli.rs @@ -25,8 +25,9 @@ pub struct CliAction { #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Debug, Parser, PartialEq)] pub enum DscCommand { - /// Connect to the default DSC server (http://127.0.0.1:9998) - Connect, + /// IP:Port for a dsc server + /// #[clap(long, global = true, default_value = "127.0.0.1:9998", action)] + Connect { server: SocketAddr }, /// Disable random stopping of downstairs DisableRandomStop, /// Disable auto restart on the given downstairs client ID @@ -380,7 +381,7 @@ async fn handle_dsc( ) { if let Some(dsc_client) = dsc_client { match dsc_cmd { - DscCommand::Connect => { + DscCommand::Connect { .. } => { println!("Already connected"); } DscCommand::DisableRandomStop => { @@ -444,9 +445,9 @@ async fn handle_dsc( println!("Got res: {:?}", res); } } - } else if dsc_cmd == DscCommand::Connect { - let url = "http://127.0.0.1:9998".to_string(); - println!("Connect to {:?}", url); + } else if let DscCommand::Connect { server } = dsc_cmd { + let url = format!("http://{}", server).to_string(); + println!("Connecting to {:?}", url); let rs = Client::new(&url); *dsc_client = Some(rs); } else { diff --git a/crutest/src/main.rs b/crutest/src/main.rs index b600ea955..b33092b18 100644 --- a/crutest/src/main.rs +++ b/crutest/src/main.rs @@ -18,6 +18,7 @@ use futures::stream::FuturesOrdered; use futures::StreamExt; use human_bytes::human_bytes; use indicatif::{ProgressBar, ProgressStyle}; +use oximeter::types::ProducerRegistry; use rand::prelude::*; use rand_chacha::rand_core::SeedableRng; use serde::{Deserialize, Serialize}; @@ -117,11 +118,8 @@ enum Workload { /// Test the downstairs replay path. /// Stop a downstairs, then run some IO, then start that downstairs back /// up. Verify all IO to all downstairs finishes. - Replay { - /// URL location of the running dsc server - #[clap(long, default_value = "http://127.0.0.1:9998", action)] - dsc: String, - }, + /// This test requires a dsc server to control the downstairs. + Replay, /// Test the downstairs replacement path. /// Run IO to the upstairs, then replace a downstairs, then run /// more IO and verify it all works as expected. 
@@ -139,7 +137,7 @@ enum Workload { ReplaceBeforeActive { /// URL location of the running dsc server #[clap(long, default_value = "http://127.0.0.1:9998", action)] - dsc: String, + dsc_str: String, /// The address:port of a running downstairs for replacement #[clap(long, action)] @@ -149,7 +147,7 @@ enum Workload { ReplaceReconcile { /// URL location of the running dsc server #[clap(long, default_value = "http://127.0.0.1:9998", action)] - dsc: String, + dsc_str: String, /// The address:port of a running downstairs for replacement #[clap(long, action)] @@ -184,6 +182,12 @@ pub struct Opt { #[clap(short, long, global = true, action)] count: Option, + /// IP:Port for a dsc server. + /// Some tests require a dsc enpoint to control the downstairs. + /// A dsc endpoint can also be used to construct the initial Volume. + #[clap(long, global = true, action)] + dsc: Option, + /// How long to wait before the auto flush check fires #[clap(long, global = true, action)] flush_timeout: Option, @@ -246,13 +250,7 @@ pub struct Opt { stable: bool, /// The IP:Port where each downstairs is listening. - #[clap( - short, - long, - global = true, - default_value = "127.0.0.1:9000", - action - )] + #[clap(short, long, global = true, action)] target: Vec, /// A UUID to use for the upstairs. @@ -735,9 +733,193 @@ async fn handle_signals( } } +// Construct a volume for use by the tests. +// Our choice of how to construct the volume depends on what options we +// have been given. +// +// If we have been provided a vcr file, this will get first priority and all +// other options will be ignored. +// +// Second choice is if we are provided the address for a dsc server. We can +// use the dsc server to determine part of what we need to create a Volume. +// The rest of what we need we can gather from the CrucibleOpts, which are +// built from options provided on the command line, or their defaults. +// +// For the final choice we have to construct a Volume by asking our downstairs +// for information that we need up front, which we then combine with +// CrucibleOpts. This will work as long as one of the downstairs is up +// already. If we have a test that requires no downstairs to be running on +// startup, then we need to provide a VCR file, or use the dsc server. +async fn make_a_volume( + opt: &Opt, + block_io_logger: Logger, + test_log: &Logger, + pr: Option, +) -> Result> { + let up_uuid = opt.uuid.unwrap_or_else(Uuid::new_v4); + let mut crucible_opts = CrucibleOpts { + id: up_uuid, + target: opt.target.clone(), + lossy: false, + flush_timeout: opt.flush_timeout, + key: opt.key.clone(), + cert_pem: opt.cert_pem.clone(), + key_pem: opt.key_pem.clone(), + root_cert_pem: opt.root_cert_pem.clone(), + control: opt.control, + read_only: opt.read_only, + }; + + if let Some(vcr_file) = &opt.vcr_file { + let vcr: VolumeConstructionRequest = match read_json(vcr_file) { + Ok(vcr) => vcr, + Err(e) => { + bail!("Error {:?} reading VCR from {:?}", e, vcr_file) + } + }; + info!(test_log, "Using VCR: {:?}", vcr); + + if opt.gen != 0 { + warn!(test_log, "gen option is ignored when VCR is provided"); + } + if !opt.target.is_empty() { + warn!(test_log, "targets are ignored when VCR is provided"); + } + let volume = Volume::construct(vcr, pr, block_io_logger).await.unwrap(); + Ok(Arc::new(volume)) + } else if opt.dsc.is_some() { + // We were given a dsc endpoint, use that to create a VCR that + // represents our Volume. 
+ if !opt.target.is_empty() { + warn!(test_log, "targets are ignored when dsc option is provided"); + } + let dsc = opt.dsc.unwrap(); + let dsc_url = format!("http://{}", dsc); + let dsc_client = Client::new(&dsc_url); + let ri = match dsc_client.dsc_get_region_info().await { + Ok(res) => res.into_inner(), + Err(e) => { + bail!("Failed to get region info from {:?}: {}", dsc_url, e); + } + }; + info!(test_log, "use region info: {:?}", ri); + let extent_info = RegionExtentInfo { + block_size: ri.block_size, + blocks_per_extent: ri.blocks_per_extent, + extent_count: ri.extent_count, + }; + + let res = dsc_client.dsc_get_region_count().await.unwrap(); + let regions = res.into_inner(); + if regions < 3 { + bail!("Found {regions} regions. We need at least 3"); + } + + let sv_count = regions / 3; + let region_remainder = regions % 3; + info!( + test_log, + "dsc has {} regions. This means {} sub_volumes", regions, sv_count + ); + if region_remainder != 0 { + warn!( + test_log, + "{} regions from dsc will not be part of any sub_volume", + region_remainder, + ); + } + + // We start by creating the overall volume. + let mut volume = Volume::new(extent_info.block_size, block_io_logger); + + // Now, loop over regions we found from dsc and make a + // sub_volume at every three. + let mut cid = 0; + for sv in 0..sv_count { + let mut targets = Vec::new(); + for _ in 0..3 { + let port = dsc_client.dsc_get_port(cid).await.unwrap(); + let tar: SocketAddr = + format!("{}:{}", dsc.ip(), port.into_inner()) + .parse() + .unwrap(); + targets.push(tar); + cid += 1; + } + info!(test_log, "SV {:?} has targets: {:?}", sv, targets); + crucible_opts.target = targets; + + volume + .add_subvolume_create_guest( + crucible_opts.clone(), + extent_info.clone(), + opt.gen, + pr.clone(), + ) + .await + .unwrap(); + } + + Ok(Arc::new(volume)) + } else { + // We were not provided a VCR, so, we have to make one by using + // the repair port on a downstairs to get region information that + // we require. Once we have that information, we can build a VCR + // from it. + + // For each sub-volume, we need to know: + // block_size, blocks_per_extent, and extent_size. We can get any + // of the target downstairs to give us this info, if they are + // running. We don't care which one responds. Any mismatch will + // be detected later in the process and handled by the upstairs. + let mut extent_info_result = None; + for target in &crucible_opts.target { + let port = target.port() + crucible_common::REPAIR_PORT_OFFSET; + info!(test_log, "look at: http://{}:{} ", target.ip(), port); + let repair_url = format!("http://{}:{}", target.ip(), port); + let repair_client = repair_client::new(&repair_url); + match repair_client.get_region_info().await { + Ok(ri) => { + info!(test_log, "RI is: {:?}", ri); + extent_info_result = Some(RegionExtentInfo { + block_size: ri.block_size(), + blocks_per_extent: ri.extent_size().value, + extent_count: ri.extent_count(), + }); + break; + } + Err(e) => { + warn!( + test_log, + "Failed to get info from {:?} {:?}", repair_url, e + ); + } + } + } + let extent_info = match extent_info_result { + Some(ei) => ei, + None => { + bail!("Can't determine extent info to build a Volume"); + } + }; + + let mut volume = Volume::new(extent_info.block_size, block_io_logger); + volume + .add_subvolume_create_guest( + crucible_opts.clone(), + extent_info, + opt.gen, + pr, + ) + .await + .unwrap(); + + Ok(Arc::new(volume)) + } +} + /** - * This is an example Crucible client. 
- * Here we make use of the interfaces that Crucible exposes. + * A test program that makes use use of the interfaces that Crucible exposes. */ #[tokio::main] async fn main() -> Result<()> { @@ -759,26 +941,9 @@ async fn main() -> Result<()> { bail!("Verify requires verify_in file"); } - let up_uuid = opt.uuid.unwrap_or_else(Uuid::new_v4); - - let crucible_opts = CrucibleOpts { - id: up_uuid, - target: opt.target.clone(), - lossy: false, - flush_timeout: opt.flush_timeout, - key: opt.key, - cert_pem: opt.cert_pem, - key_pem: opt.key_pem, - root_cert_pem: opt.root_cert_pem, - control: opt.control, - read_only: opt.read_only, - }; - - /* - * If just want the cli, then start that after our runtime. The cli - * does not need upstairs started, as that should happen in the - * cli-server code. - */ + // If just want the cli, then start that after our runtime. The cli + // does not need upstairs started, as that should happen in the + // cli-server code. if let Workload::Cli { attach } = opt.workload { cli::start_cli_client(attach).await?; return Ok(()); @@ -835,80 +1000,9 @@ async fn main() -> Result<()> { pr = None; } - // We need to build a Volume for all the tests to use. - // If we have received a VCR as input, we can use that. Otherwise we - // have to construct one by asking our downstairs for information that - // we need up front. This will work as long as one of the downstairs - // is up already. If we have a test that requires no downstairs to be - // running on startup, then we need to provide a VCR up front. - let block_io = { - if let Some(vcr_file) = opt.vcr_file { - let vcr: VolumeConstructionRequest = match read_json(&vcr_file) { - Ok(vcr) => vcr, - Err(e) => { - bail!("Error {:?} reading VCR from {:?}", e, vcr_file) - } - }; - let volume = - Volume::construct(vcr, pr, block_io_logger).await.unwrap(); - Arc::new(volume) - } else { - // We were not provided a VCR, so, we have to make one by using - // the repair port on a downstairs to get region information that - // we require. Once we have that information, we can build a VCR - // from it. - - // For each sub-volume, we need to know: - // block_size, blocks_per_extent, and extent_size. We can get any - // of the target downstairs to give us this info, if they are - // running. We don't care which one responds. Any mismatch will - // be detected later in the process and handled by the upstairs. - let mut extent_info_result = None; - for target in &crucible_opts.target { - let port = target.port() + crucible_common::REPAIR_PORT_OFFSET; - println!("look at: http://{}:{} ", target.ip(), port); - let repair_url = format!("http://{}:{}", target.ip(), port); - let repair_client = repair_client::new(&repair_url); - match repair_client.get_region_info().await { - Ok(ri) => { - println!("RI is: {:?}", ri); - extent_info_result = Some(RegionExtentInfo { - block_size: ri.block_size(), - blocks_per_extent: ri.extent_size().value, - extent_count: ri.extent_count(), - }); - break; - } - Err(e) => { - println!( - "Failed to get info from {:?} {:?}", - repair_url, e - ); - } - } - } - let extent_info = match extent_info_result { - Some(ei) => ei, - None => { - bail!("Can't determine extent info to build a Volume"); - } - }; - - let mut volume = - Volume::new(extent_info.block_size, block_io_logger); - volume - .add_subvolume_create_guest( - crucible_opts, - extent_info, - opt.gen, - pr, - ) - .await - .unwrap(); - - Arc::new(volume) - } - }; + // Build a Volume for all the tests to use. 
+ let block_io = + make_a_volume(&opt, block_io_logger.clone(), &test_log, pr).await?; if let Workload::CliServer { listen, port } = opt.workload { cli::start_cli_server( @@ -1200,7 +1294,7 @@ async fn main() -> Result<()> { } return Ok(()); } - Workload::Replay { dsc } => { + Workload::Replay => { // Either we have a count, or we run until we get a signal. let mut wtq = { if opt.continuous { @@ -1210,8 +1304,15 @@ async fn main() -> Result<()> { WhenToQuit::Count { count } } }; - - let dsc_client = Client::new(&dsc); + let dsc_client = match opt.dsc { + Some(dsc_addr) => { + let dsc_url = format!("http://{}", dsc_addr); + Client::new(&dsc_url) + } + None => { + bail!("Replay workload requires a dsc endpoint"); + } + }; replay_workload(&block_io, &mut wtq, &mut region_info, dsc_client) .await?; } @@ -1243,8 +1344,11 @@ async fn main() -> Result<()> { ) .await?; } - Workload::ReplaceBeforeActive { dsc, replacement } => { - let dsc_client = Client::new(&dsc); + Workload::ReplaceBeforeActive { + dsc_str, + replacement, + } => { + let dsc_client = Client::new(&dsc_str); // Either we have a count, or we run until we get a signal. let wtq = { if opt.continuous { @@ -1271,8 +1375,11 @@ async fn main() -> Result<()> { ) .await?; } - Workload::ReplaceReconcile { dsc, replacement } => { - let dsc_client = Client::new(&dsc); + Workload::ReplaceReconcile { + dsc_str, + replacement, + } => { + let dsc_client = Client::new(&dsc_str); // Either we have a count, or we run until we get a signal. let wtq = { if opt.continuous { @@ -2133,8 +2240,8 @@ async fn replay_workload( "CLIENT: Up:{} ds:{} act:{}", wc.up_count, wc.ds_count, wc.active_count ); - if wc.up_count + wc.ds_count == 0 && wc.active_count == 3 { - println!("Replay: All jobs finished, all DS active."); + if wc.up_count + wc.ds_count == 0 { + println!("Replay: All jobs finished"); break; } tokio::time::sleep(tokio::time::Duration::from_secs(4)).await; diff --git a/dsc-client/Cargo.toml b/dsc-client/Cargo.toml index 13d18f265..4fd6d0223 100644 --- a/dsc-client/Cargo.toml +++ b/dsc-client/Cargo.toml @@ -6,10 +6,11 @@ edition = "2021" [dependencies] anyhow.workspace = true +crucible-workspace-hack.workspace = true percent-encoding.workspace = true progenitor.workspace = true reqwest.workspace = true schemars.workspace = true serde_json.workspace = true serde.workspace = true -crucible-workspace-hack.workspace = true +uuid.workspace = true diff --git a/dsc/Cargo.toml b/dsc/Cargo.toml index 3a758a023..341f33378 100644 --- a/dsc/Cargo.toml +++ b/dsc/Cargo.toml @@ -10,6 +10,7 @@ byte-unit.workspace = true clap.workspace = true crucible-client-types.workspace = true crucible-common.workspace = true +crucible-workspace-hack.workspace = true csv.workspace = true dsc-client.workspace = true dropshot.workspace = true @@ -19,7 +20,7 @@ schemars.workspace = true serde.workspace = true statistical.workspace = true tokio.workspace = true -crucible-workspace-hack.workspace = true +uuid.workspace = true [dev-dependencies] expectorate.workspace = true diff --git a/dsc/src/client.rs b/dsc/src/client.rs index 28ff8bcd5..be3c14e20 100644 --- a/dsc/src/client.rs +++ b/dsc/src/client.rs @@ -8,6 +8,8 @@ use anyhow::Result; #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Debug, Parser, PartialEq)] pub enum ClientCommand { + /// Returns true if all downstairs are running + AllRunning, /// Disable random stopping of downstairs DisableRandomStop, /// Disable auto restart on the given downstairs client ID @@ -46,6 +48,8 @@ pub enum ClientCommand { #[clap(long, short, 
action)] cid: u32, }, + /// Get count of regions. + RegionCount, /// Get region info. RegionInfo, /// Shutdown all downstairs, then shutdown dsc itself. @@ -71,6 +75,11 @@ pub enum ClientCommand { StopAll, /// Stop a random downstairs StopRand, + /// Get the UUID of the given client ID + Uuid { + #[clap(long, short, action)] + cid: u32, + }, } // Connect to the DSC and run a command. @@ -78,6 +87,10 @@ pub enum ClientCommand { pub async fn client_main(server: String, cmd: ClientCommand) -> Result<()> { let dsc = Client::new(&server); match cmd { + ClientCommand::AllRunning => { + let res = dsc.dsc_all_running().await.unwrap(); + println!("{:?}", res); + } ClientCommand::DisableRandomStop => { let _ = dsc.dsc_disable_random_stop().await.unwrap(); } @@ -110,6 +123,10 @@ pub async fn client_main(server: String, cmd: ClientCommand) -> Result<()> { let res = dsc.dsc_get_port(cid).await.unwrap(); println!("{:?}", res); } + ClientCommand::RegionCount => { + let res = dsc.dsc_get_region_count().await.unwrap(); + println!("{:?}", res); + } ClientCommand::RegionInfo => { let res = dsc.dsc_get_region_info().await.unwrap(); println!("{:?}", res); @@ -136,6 +153,10 @@ pub async fn client_main(server: String, cmd: ClientCommand) -> Result<()> { ClientCommand::StopRand => { let _ = dsc.dsc_stop_rand().await.unwrap(); } + ClientCommand::Uuid { cid } => { + let res = dsc.dsc_get_uuid(cid).await.unwrap(); + println!("{:?}", res); + } } Ok(()) } diff --git a/dsc/src/control.rs b/dsc/src/control.rs index ac60acd8e..371b2a70e 100644 --- a/dsc/src/control.rs +++ b/dsc/src/control.rs @@ -12,17 +12,19 @@ use dropshot::RequestContext; use dropshot::{HttpResponseOk, HttpResponseUpdatedNoContent}; use schemars::JsonSchema; use serde::Deserialize; -//use serde::Serialize; use std::sync::Arc; use super::*; pub(crate) fn build_api() -> ApiDescription> { let mut api = ApiDescription::new(); + api.register(dsc_all_running).unwrap(); api.register(dsc_get_ds_state).unwrap(); api.register(dsc_get_pid).unwrap(); api.register(dsc_get_port).unwrap(); + api.register(dsc_get_region_count).unwrap(); api.register(dsc_get_region_info).unwrap(); + api.register(dsc_get_uuid).unwrap(); api.register(dsc_stop).unwrap(); api.register(dsc_stop_all).unwrap(); api.register(dsc_stop_rand).unwrap(); @@ -187,6 +189,22 @@ async fn dsc_get_port( Ok(HttpResponseOk(ds_port)) } +/** + * Return true if all downstairs are running + */ +#[endpoint { + method = GET, + path = "/allrunning", +}] +async fn dsc_all_running( + rqctx: RequestContext>, +) -> Result, HttpError> { + let api_context = rqctx.context(); + + let all_state = api_context.dsci.all_running().await; + Ok(HttpResponseOk(all_state)) +} + /** * Fetch the current state for the requested client_id */ @@ -218,6 +236,37 @@ async fn dsc_get_ds_state( Ok(HttpResponseOk(ds_state)) } +/** + * Fetch the UUID for the requested client_id + */ +#[endpoint { + method = GET, + path = "/uuid/cid/{cid}", +}] +async fn dsc_get_uuid( + rqctx: RequestContext>, + path: Path, +) -> Result, HttpError> { + let path = path.into_inner(); + let cid = path.cid; + let api_context = rqctx.context(); + + if cid_bad(&api_context.dsci, cid).await { + return Err(HttpError::for_bad_request( + Some(String::from("BadInput")), + format!("Invalid client id: {}", cid), + )); + } + let uuid = api_context.dsci.get_ds_uuid(cid).await.map_err(|e| { + HttpError::for_bad_request( + None, + format!("failed to get UUID for downstairs {}: {:#}", cid, e), + ) + })?; + + Ok(HttpResponseOk(uuid)) +} + /** * Stop the downstairs at the given 
client_id */ @@ -406,6 +455,22 @@ async fn dsc_enable_restart_all( Ok(HttpResponseUpdatedNoContent()) } +/** + * Get the count of regions. + */ +#[endpoint { + method = GET, + path = "/regioncount", +}] +async fn dsc_get_region_count( + rqctx: RequestContext>, +) -> Result, HttpError> { + let api_context = rqctx.context(); + + let region_count = api_context.dsci.get_region_count().await; + Ok(HttpResponseOk(region_count)) +} + /** * Fetch the region info for our downstairs */ @@ -422,7 +487,7 @@ async fn dsc_get_region_info( api_context.dsci.get_region_info().await.map_err(|e| { HttpError::for_bad_request( None, - format!("failed get to region info {:#}", e), + format!("failed to get region info {:#}", e), ) })?; diff --git a/dsc/src/main.rs b/dsc/src/main.rs index 847343b06..fabc0392a 100644 --- a/dsc/src/main.rs +++ b/dsc/src/main.rs @@ -20,6 +20,7 @@ use tokio::process::{Child, Command}; use tokio::runtime::Builder; use tokio::sync::{mpsc, watch, Mutex}; use tokio::time::sleep_until; +use uuid::Uuid; pub mod client; pub mod control; @@ -145,7 +146,7 @@ enum Action { )] region_dir: Vec, }, - /// Start a downstairs region set + /// Start the requested downstairs regions /// This requires the region is already created, unless you include /// the --create option. Start { @@ -228,6 +229,7 @@ struct DownstairsInfo { ds_bin: String, region_dir: String, port: u32, + uuid: Uuid, _create_output: String, output_file: PathBuf, client_id: usize, @@ -235,10 +237,12 @@ struct DownstairsInfo { } impl DownstairsInfo { + #[allow(clippy::too_many_arguments)] fn new( ds_bin: String, region_dir: String, port: u32, + uuid: Uuid, _create_output: String, output_file: PathBuf, client_id: usize, @@ -248,6 +252,7 @@ impl DownstairsInfo { ds_bin, region_dir, port, + uuid, _create_output, output_file, client_id, @@ -295,9 +300,9 @@ impl DownstairsInfo { } } -// Describing the downstairs that together make a region. +// Describing all the downstairs regions we know about. #[derive(Debug)] -struct RegionSet { +struct Regions { ds: Vec>, ds_bin: String, region_dir: Vec, @@ -313,8 +318,8 @@ struct RegionSet { pub struct DscInfo { /// The directory location where output files are output_dir: PathBuf, - /// The region set that make our downstairs - rs: Mutex, + /// The regions this dsc knows about + rs: Mutex, /// Work for the dsc to do, what downstairs to start/stop/etc work: Mutex, /// If the downstairs are started read only @@ -427,7 +432,7 @@ impl DscInfo { }; assert_eq!(rv.len(), region_count); - let rs = RegionSet { + let rs = Regions { ds: Vec::new(), ds_bin: downstairs_bin, region_dir: rv, @@ -452,9 +457,9 @@ impl DscInfo { } /* - * Create a default region set. Attach it to our dsc info struct + * Create the requested set of regions. Attach it to our dsc info struct */ - async fn create_region_set( + async fn create_regions( &self, extent_size: u64, extent_count: u32, @@ -475,13 +480,12 @@ impl DscInfo { .await .unwrap(); } - println!("Region set with {region_count} regions was created"); + println!("Created {region_count} regions"); Ok(()) } /** - * Create a region as part of the region set at the given port with - * the provided extent size and extent_count. + * Create a region with the provided extent size and extent_count. */ async fn create_ds_region( &self, @@ -524,8 +528,7 @@ impl DscInfo { extent_count, }); } - - // use port to do this, or make a client ID that is port base, etc + // The port is determined by ds_id and the port step value. 
let port = rs.port_base + (ds_id as u32 * rs.port_step); let rd = &rs.region_dir[ds_id]; let new_region_dir = port_to_region(rd.clone(), port)?; @@ -533,6 +536,7 @@ impl DscInfo { let extent_count = format!("{}", extent_count); let block_size = format!("{}", block_size); let uuid = format!("12345678-0000-0000-0000-{:012}", port); + let ds_uuid = Uuid::parse_str(&uuid).unwrap(); let start = std::time::Instant::now(); let mut cmd_args = vec![ "create", @@ -589,6 +593,7 @@ impl DscInfo { rs.ds_bin.clone(), new_region_dir, port, + ds_uuid, String::from_utf8(output.stdout).unwrap(), output_path, ds_id, @@ -615,8 +620,8 @@ impl DscInfo { } /* - * Generate a region set using the starting port and region - * directories. Return error if any of them don't already exist. + * Generate regions using the starting port and region directories. + * Return error if any of them don't already exist. * TODO: This is assuming a fair amount of stuff. * Make fewer assumptions... */ @@ -624,8 +629,8 @@ impl DscInfo { let mut rs = self.rs.lock().await; let mut port = rs.port_base; - // If we are generating our region set, then we don't know any - // information yet about the region. + // Since we are generating our regions, we must create the required + // directories and files. let mut region_info: Option = None; for ds_id in 0..region_count { let rd = rs.region_dir[ds_id].clone(); @@ -683,6 +688,7 @@ impl DscInfo { rs.ds_bin.clone(), new_region_dir, port, + def.uuid(), "/dev/null".to_string(), output_path, ds_id, @@ -699,6 +705,16 @@ impl DscInfo { Ok(()) } + async fn all_running(&self) -> bool { + let rs = self.rs.lock().await; + for state in rs.ds_state.iter() { + if *state != DownstairsState::Running { + return false; + } + } + true + } + async fn get_ds_state(&self, client_id: usize) -> Result { let rs = self.rs.lock().await; if rs.ds_state.len() <= client_id { @@ -723,6 +739,14 @@ impl DscInfo { Ok(rs.ds[client_id].port) } + async fn get_ds_uuid(&self, client_id: usize) -> Result { + let rs = self.rs.lock().await; + if rs.ds.len() <= client_id { + bail!("Invalid client ID: {}", client_id); + } + Ok(rs.ds[client_id].uuid) + } + async fn get_region_info(&self) -> Result { let rs = self.rs.lock().await; if let Some(ri) = &rs.region_info { @@ -731,6 +755,11 @@ impl DscInfo { bail!("No region info found"); } } + + async fn get_region_count(&self) -> usize { + let rs = self.rs.lock().await; + rs.ds.len() + } } // This holds the work queue for the main task. Work is added @@ -917,6 +946,32 @@ async fn start_dsc( drop(tx); drop(rs); + // Wait here for all downstairs to start + let mut running = 0; + loop { + let res = rx.recv().await; + if let Some(mi) = res { + println!( + "[{}][{}] initial start wait reports {:?}", + mi.port, mi.client_id, mi.state + ); + let mut rs = dsci.rs.lock().await; + if mi.state == DownstairsState::Running { + running += 1; + } + + rs.ds_state[mi.client_id] = mi.state; + rs.ds_pid[mi.client_id] = mi.pid; + + if running == rs.ds_state.len() { + println!("All downstairs are running"); + break; + } + } else { + println!("rx.recv got None"); + } + } + let mut rng = rand_chacha::ChaCha8Rng::from_entropy(); let mut timeout_deadline = deadline_secs(5); let mut shutdown_sent = false; @@ -1055,7 +1110,7 @@ struct MonitorInfo { } /// State of a downstairs. 
-#[derive(Debug, Copy, Clone, Deserialize, Serialize, JsonSchema)] +#[derive(Debug, Copy, Clone, Deserialize, Serialize, JsonSchema, PartialEq)] #[serde(rename_all = "snake_case")] pub enum DownstairsState { Stopped, @@ -1509,7 +1564,7 @@ fn main() -> Result<()> { false, )?; - runtime.block_on(dsci.create_region_set( + runtime.block_on(dsci.create_regions( extent_size, extent_count, block_size, @@ -1569,7 +1624,7 @@ fn main() -> Result<()> { )?; if create { - runtime.block_on(dsci.create_region_set( + runtime.block_on(dsci.create_regions( extent_size, extent_count, block_size, diff --git a/openapi/dsc-control.json b/openapi/dsc-control.json index 5d217def6..367ac701f 100644 --- a/openapi/dsc-control.json +++ b/openapi/dsc-control.json @@ -5,6 +5,31 @@ "version": "0.0.0" }, "paths": { + "/allrunning": { + "get": { + "summary": "Return true if all downstairs are running", + "operationId": "dsc_all_running", + "responses": { + "200": { + "description": "successful operation", + "content": { + "application/json": { + "schema": { + "title": "Boolean", + "type": "boolean" + } + } + } + }, + "4XX": { + "$ref": "#/components/responses/Error" + }, + "5XX": { + "$ref": "#/components/responses/Error" + } + } + } + }, "/disablerestart/all": { "post": { "summary": "Disable automatic restart on all downstairs", @@ -268,6 +293,33 @@ } } }, + "/regioncount": { + "get": { + "summary": "Get the count of regions.", + "operationId": "dsc_get_region_count", + "responses": { + "200": { + "description": "successful operation", + "content": { + "application/json": { + "schema": { + "title": "uint", + "type": "integer", + "format": "uint", + "minimum": 0 + } + } + } + }, + "4XX": { + "$ref": "#/components/responses/Error" + }, + "5XX": { + "$ref": "#/components/responses/Error" + } + } + } + }, "/regioninfo": { "get": { "summary": "Fetch the region info for our downstairs", @@ -453,6 +505,44 @@ } } } + }, + "/uuid/cid/{cid}": { + "get": { + "summary": "Fetch the UUID for the requested client_id", + "operationId": "dsc_get_uuid", + "parameters": [ + { + "in": "path", + "name": "cid", + "required": true, + "schema": { + "type": "integer", + "format": "uint", + "minimum": 0 + } + } + ], + "responses": { + "200": { + "description": "successful operation", + "content": { + "application/json": { + "schema": { + "title": "Uuid", + "type": "string", + "format": "uuid" + } + } + } + }, + "4XX": { + "$ref": "#/components/responses/Error" + }, + "5XX": { + "$ref": "#/components/responses/Error" + } + } + } } }, "components": { diff --git a/tools/test_fail_live_repair.sh b/tools/test_fail_live_repair.sh index c90f8b4da..f8d6ac266 100755 --- a/tools/test_fail_live_repair.sh +++ b/tools/test_fail_live_repair.sh @@ -95,8 +95,7 @@ fi echo "starting $(date)" | tee ${loop_log} echo "Tail $test_log for test output" -# Make enough extents that we can be sure to catch in while it -# is repairing. +# Make enough extents that we can be sure to catch it while it is repairing. if ! ${dsc} create --cleanup \ --ds-bin "$cds" \ --extent-count 400 \ diff --git a/tools/test_replay.sh b/tools/test_replay.sh index 2948a3eef..e6bceb929 100755 --- a/tools/test_replay.sh +++ b/tools/test_replay.sh @@ -70,15 +70,11 @@ if ! ps -p $dsc_pid > /dev/null; then exit 1 fi -args=() -args+=( -t "127.0.0.1:8810" ) -args+=( -t "127.0.0.1:8820" ) -args+=( -t "127.0.0.1:8830" ) - gen=1 # Initial seed for verify file echo "Running initial fill" | tee -a "$test_log" -if ! "$crucible_test" fill "${args[@]}" -q -g "$gen"\ +if ! 
"$crucible_test" fill -q -g "$gen"\ + --dsc 127.0.0.1:9998 \ --verify-out "$verify_log" --retry-activate >> "$test_log" 2>&1 ; then echo Failed on initial verify seed, check "$test_log" ${dsc} cmd shutdown @@ -88,9 +84,10 @@ fi SECONDS=0 echo "Replay loop starts now $(date)" | tee -a "$test_log" -"$crucible_test" replay "${args[@]}" -c "$loops" \ +"$crucible_test" replay -c "$loops" \ --stable -g "$gen" --verify-out "$verify_log" \ --verify-in "$verify_log" \ + --dsc 127.0.0.1:9998 \ --retry-activate >> "$test_log" 2>&1 result=$? duration=$SECONDS @@ -103,7 +100,8 @@ else "$loops" $((duration / 60)) $((duration % 60)) | tee -a "$test_log" echo "Do final verify" | tee -a "$test_log" - if ! "$crucible_test" verify "${args[@]}" -q -g "$gen"\ + if ! "$crucible_test" verify -q -g "$gen"\ + --dsc 127.0.0.1:9998 \ --verify-out "$verify_log" \ --verify-in "$verify_log" >> "$test_log" 2>&1 ; then echo Failed on final verify, check "$test_log" diff --git a/tools/test_restart_repair.sh b/tools/test_restart_repair.sh index 1812a1e77..7cd42bca7 100755 --- a/tools/test_restart_repair.sh +++ b/tools/test_restart_repair.sh @@ -25,17 +25,23 @@ function ctrl_c() { # Bring all downstairs online. function bring_all_downstairs_online() { - # dsc start all downstairs - if ! "$dsc" cmd start-all; then - echo "dsc: Failed to stop all downstairs" + # dsc turn on automatic restart + if ! "$dsc" cmd enable-restart-all; then + echo "dsc: Failed to enable automatic restart" exit 1 fi - # dsc turn on automatic restart - if ! "$dsc" cmd enable-restart-all; then - echo "dsc: Failed to disable automatic restart" + # dsc start all downstairs + if ! "$dsc" cmd start-all; then + echo "dsc: Failed to start all downstairs" exit 1 fi + ready=$("$dsc" cmd all-running) + while [[ "$ready" != "true" ]]; do + echo "Waiting for all downstairs to come online" >> "$test_log" + sleep 5 + ready=$("$dsc" cmd all-running) + done } # Stop all downstairs.