From b6dfff70a82863d484d4be5730836a3d9eca8062 Mon Sep 17 00:00:00 2001 From: Ayaz <20735482+ayazhafiz@users.noreply.github.com> Date: Tue, 1 Aug 2023 16:58:35 -0500 Subject: [PATCH] Log warnings when workers have different commands (#74) --- CHANGELOG.md | 11 +- Cargo.lock | 39 +- Cargo.toml | 2 + crates/abq_cli/Cargo.toml | 2 +- crates/abq_cli/src/reporting.rs | 4 + crates/abq_cli/src/workers.rs | 4 + crates/abq_cli/tests/cli.rs | 86 +++ crates/abq_queue/Cargo.toml | 1 + crates/abq_queue/src/persistence/run_state.rs | 67 +- crates/abq_queue/src/queue.rs | 628 ++++++++++-------- crates/abq_queue/src/queue/test_utils.rs | 52 ++ crates/abq_queue/tests/integration.rs | 3 + crates/abq_reporting/src/lib.rs | 2 + crates/abq_reporting/src/output.rs | 2 +- crates/abq_reporting/src/writer.rs | 42 ++ crates/abq_utils/Cargo.toml | 2 + crates/abq_utils/src/lib.rs | 1 + crates/abq_utils/src/net_protocol.rs | 16 +- crates/abq_utils/src/test_command_hash.rs | 54 ++ crates/abq_workers/Cargo.toml | 1 + crates/abq_workers/src/assigned_run.rs | 17 +- crates/abq_workers/src/lib.rs | 2 +- crates/abq_workers/src/negotiate.rs | 60 +- crates/abq_workers/src/runner_strategy.rs | 10 +- 24 files changed, 819 insertions(+), 289 deletions(-) create mode 100644 crates/abq_queue/src/queue/test_utils.rs create mode 100644 crates/abq_reporting/src/writer.rs create mode 100644 crates/abq_utils/src/test_command_hash.rs diff --git a/CHANGELOG.md b/CHANGELOG.md index 56af120b..7be3901e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,7 +1,14 @@ ## 1.6.2 -ABQ 1.6.2 is a patch release fixing an issue that could result in -denial-of-service of an ABQ queue due to large test results. +ABQ 1.6.2 is a patch release. + +An issue that could result in denial-of-service of an ABQ queue due to large +test results is fixed. + +ABQ will now log a warning when a worker for a given run ID is using a test +command different from any other worker, either in the same run or during +retries. ABQ may not function properly if the test command for a run ID changes +between workers executing tests for that run ID. ## 1.6.1 diff --git a/Cargo.lock b/Cargo.lock index 3fb3f8a0..e4229f28 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4,7 +4,7 @@ version = 3 [[package]] name = "abq" -version = "1.6.1" +version = "1.6.2" dependencies = [ "abq_dot_reporter", "abq_hosted", @@ -159,6 +159,7 @@ name = "abq_queue" version = "0.1.0" dependencies = [ "abq_native_runner_simulation", + "abq_reporting", "abq_run_n_times", "abq_test_utils", "abq_utils", @@ -252,6 +253,7 @@ version = "0.1.0" dependencies = [ "anyhow", "async-trait", + "blake3", "derive_more", "flate2", "insta", @@ -284,6 +286,7 @@ name = "abq_workers" version = "0.1.0" dependencies = [ "abq_generic_test_runner", + "abq_reporting", "abq_run_n_times", "abq_test_utils", "abq_utils", @@ -346,12 +349,24 @@ version = "1.0.70" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7de8ce5e0f9f8d88245311066a578d72b7af3e7088f32783804676302df237e4" +[[package]] +name = "arrayref" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6b4930d2cb77ce62f89ee5d5289b4ac049559b1c45539271f5ed4fdc7db34545" + [[package]] name = "arrayvec" version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "23b62fc65de8e4e7f52534fb52b0f3ed04746ae267519eef2a83941e8085068b" +[[package]] +name = "arrayvec" +version = "0.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "96d30a06541fbafbc7f82ed10c06164cfbd2c401138f6addd8404629c4b16711" + [[package]] name = "assert-json-diff" version = "2.0.2" @@ -802,6 +817,20 @@ version = "1.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" +[[package]] +name = "blake3" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "729b71f35bd3fa1a4c86b85d32c8b9069ea7fe14f7a53cfabb65f62d4265b888" +dependencies = [ + "arrayref", + "arrayvec 0.7.4", + "cc", + "cfg-if", + "constant_time_eq", + "digest", +] + [[package]] name = "block-buffer" version = "0.10.4" @@ -973,6 +1002,12 @@ dependencies = [ "windows-sys 0.42.0", ] +[[package]] +name = "constant_time_eq" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "21a53c0a4d288377e7415b53dcfc3c04da5cdc2cc95c8d5ac178b58f0b861ad6" + [[package]] name = "convert_case" version = "0.4.0" @@ -3265,7 +3300,7 @@ version = "0.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6cbce692ab4ca2f1f3047fcf732430249c0e971bfdd2b234cf2c47ad93af5983" dependencies = [ - "arrayvec", + "arrayvec 0.5.2", "utf8parse", "vte_generate_state_changes", ] diff --git a/Cargo.toml b/Cargo.toml index ee35b2ed..deb00996 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -63,6 +63,8 @@ signal-hook-tokio = { version = "0.3.1", features = ["futures-v0_3"] } rand = "0.8.5" rand_chacha = "0.3.1" +blake3 = "=1.4.0" + tracing = { version = "0.1.37", features = ["release_max_level_info"] } tracing-subscriber = { version = "0.3.16", features = ["env-filter", "json"] } tracing-appender = "0.2.2" diff --git a/crates/abq_cli/Cargo.toml b/crates/abq_cli/Cargo.toml index a228b3e5..7af68bdd 100644 --- a/crates/abq_cli/Cargo.toml +++ b/crates/abq_cli/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "abq" -version = "1.6.1" +version = "1.6.2" edition = "2021" [dependencies] diff --git a/crates/abq_cli/src/reporting.rs b/crates/abq_cli/src/reporting.rs index 2e127202..a436999a 100644 --- a/crates/abq_cli/src/reporting.rs +++ b/crates/abq_cli/src/reporting.rs @@ -267,6 +267,10 @@ impl StdoutPreferences { pub fn stdout_stream(&self) -> impl termcolor::WriteColor { StandardStream::stdout(self.color) } + + pub fn stderr_stream(&self) -> impl termcolor::WriteColor { + StandardStream::stderr(self.color) + } } fn reporter_from_kind( diff --git a/crates/abq_cli/src/workers.rs b/crates/abq_cli/src/workers.rs index ee2dd16d..ee0eeee8 100644 --- a/crates/abq_cli/src/workers.rs +++ b/crates/abq_cli/src/workers.rs @@ -85,8 +85,11 @@ pub async fn start_workers_standalone( run_id: run_id.clone(), batch_size_hint: batch_size, test_strategy, + test_command_hash: runner_kind.command_hash(), }; + let stderr_writer = Box::new(stdout_preferences.stderr_stream()); + let mut worker_pool = WorkersNegotiator::negotiate_and_start_pool( WorkersConfig { tag, @@ -101,6 +104,7 @@ pub async fn start_workers_standalone( results_batch_size_hint: batch_size.get(), max_run_number, should_send_results: execution_mode == ExecutionMode::WriteNormal, + warning_writer: stderr_writer, }, queue_negotiator, client_opts, diff --git a/crates/abq_cli/tests/cli.rs b/crates/abq_cli/tests/cli.rs index 2f5d57ad..448b3da5 100644 --- a/crates/abq_cli/tests/cli.rs +++ b/crates/abq_cli/tests/cli.rs @@ -4649,3 +4649,89 @@ fn test_run_with_pat_and_run_id_that_doesnt_exist() { term(queue_proc); } + +#[test] +#[with_protocol_version] +#[serial] +fn warn_on_different_runner_command() { + let name = "warn_on_different_runner_command"; + let conf = CSConfigOptions { + use_auth_token: true, + tls: true, + }; + + let (queue_proc, queue_addr) = setup_queue!(name, conf); + + let proto = AbqProtocolVersion::V0_2.get_supported_witness().unwrap(); + + let manifest = vec![TestOrGroup::test(Test::new( + proto, + "test1".to_string(), + [], + Default::default(), + ))]; + + let manifest = ManifestMessage::new(Manifest::new(manifest, Default::default())); + + let simulation = [ + Connect, + // + // Write spawn message + OpaqueWrite(pack(legal_spawned_message(proto))), + // + // Write the manifest if we need to. + // Otherwise handle the one test. + IfGenerateManifest { + then_do: vec![OpaqueWrite(pack(&manifest))], + else_do: vec![ + // + // Read init context message + write ACK + OpaqueRead, + OpaqueWrite(pack(InitSuccessMessage::new(proto))), + // Read first test, write okay + OpaqueRead, + OpaqueWrite(pack(RawTestResultMessage::fake(proto))), + ], + }, + // + // Finish + Exit(0), + ]; + + let packed = pack_msgs_to_disk(simulation); + + let test_args = |instance: usize| { + let simulator = native_runner_simulation_bin(); + let simfile_path = packed.path.display().to_string(); + let args = vec![ + format!("test"), + format!("--worker=0"), + format!("--queue-addr={queue_addr}"), + format!("--run-id=test-run-id"), + ]; + let mut args = conf.extend_args_for_client(args); + args.extend([s!("--"), simulator, simfile_path]); + // Add a fake argument to differentiate the two runs. + args.push("--fake-instance-arg".to_string()); + args.push(instance.to_string()); + args + }; + + let instance1 = Abq::new(format!("{name}_inst1")).args(test_args(0)).run(); + + assert!(instance1.exit_status.success()); + + let instance2 = Abq::new(format!("{name}_inst2")).args(test_args(1)).run(); + + assert!(instance2.exit_status.success()); + + let stdout = instance2.stdout; + let stderr = instance2.stderr; + + assert!( + stderr.contains("WARNING: The passed test command differs from the one used by other ABQ workers for this run ID."), + "STDOUT:\n{stdout}\nSTDERR:\n{stderr}" + ); + + term(queue_proc); +} diff --git a/crates/abq_queue/Cargo.toml b/crates/abq_queue/Cargo.toml index 25d9686c..271595a6 100644 --- a/crates/abq_queue/Cargo.toml +++ b/crates/abq_queue/Cargo.toml @@ -38,6 +38,7 @@ s3 = [ ] [dev-dependencies] +abq_reporting = { path = "../abq_reporting" } abq_utils = { path = "../abq_utils", features = ["expose-native-protocols"] } abq_test_utils = { path = "../abq_test_support/abq_test_utils" } abq_with_protocol_version = { path = "../abq_test_support/with_protocol_version" } diff --git a/crates/abq_queue/src/persistence/run_state.rs b/crates/abq_queue/src/persistence/run_state.rs index ab58f99a..837c3424 100644 --- a/crates/abq_queue/src/persistence/run_state.rs +++ b/crates/abq_queue/src/persistence/run_state.rs @@ -7,6 +7,7 @@ use abq_utils::{ error::OpaqueResult, exit::ExitCode, net_protocol::{entity::Entity, runners::MetadataMap, workers::RunId}, + test_command_hash::TestCommandHash, }; use serde_derive::{Deserialize, Serialize}; use thiserror::Error; @@ -27,6 +28,9 @@ pub struct RunState { pub new_worker_exit_code: ExitCode, pub init_metadata: MetadataMap, pub seen_workers: Vec, + /// NB: Optional because this did not exist prior to ABQ 1.6.2. + #[serde(skip_serializing_if = "Option::is_none")] + pub test_command_hash: Option, } impl RunState { @@ -36,6 +40,7 @@ impl RunState { new_worker_exit_code: ExitCode::SUCCESS, init_metadata: MetadataMap::new(), seen_workers: Vec::new(), + test_command_hash: Some(TestCommandHash::random()), } } } @@ -135,6 +140,7 @@ mod test { new_worker_exit_code: ExitCode::SUCCESS, init_metadata: MetadataMap::new(), seen_workers: vec![], + test_command_hash: Some(TestCommandHash::from_command("yarn", &["jest".to_owned()])), }; let serialized = SerializableRunState::new(run_state.clone()); let serialized_bytes = serialized.serialize().unwrap(); @@ -146,7 +152,41 @@ mod test { "schema_version": 1, "new_worker_exit_code": 0, "init_metadata": {}, - "seen_workers": [] + "seen_workers": [], + "test_command_hash": [ + 115, + 226, + 225, + 160, + 14, + 172, + 165, + 205, + 140, + 100, + 188, + 76, + 56, + 80, + 143, + 200, + 177, + 239, + 232, + 249, + 38, + 80, + 127, + 129, + 4, + 157, + 230, + 41, + 123, + 72, + 130, + 80 + ] } "###); } @@ -170,12 +210,37 @@ mod test { )); } + #[test] + fn test_deserialize_missing_test_command_hash() { + let serialized = r###" + { + "schema_version": 1, + "new_worker_exit_code": 0, + "init_metadata": {}, + "seen_workers": [] + } + "###; + let result = SerializableRunState::deserialize(serialized.as_bytes()).unwrap(); + let result = result.into_run_state(); + + assert_eq!( + result, + RunState { + new_worker_exit_code: ExitCode::SUCCESS, + init_metadata: MetadataMap::new(), + seen_workers: vec![], + test_command_hash: None, + } + ); + } + #[test] fn test_deserialize_compatible_schema_but_different_version() { let run_state = RunState { new_worker_exit_code: ExitCode::SUCCESS, init_metadata: MetadataMap::new(), seen_workers: vec![], + test_command_hash: Some(TestCommandHash::random()), }; let serialized = SerializableRunState(SerializedRunStateInner { schema_version: SchemaVersion { diff --git a/crates/abq_queue/src/queue.rs b/crates/abq_queue/src/queue.rs index 60d80d1f..59af6df3 100644 --- a/crates/abq_queue/src/queue.rs +++ b/crates/abq_queue/src/queue.rs @@ -30,11 +30,12 @@ use abq_utils::net_protocol::{ }; use abq_utils::net_protocol::{async_write_stream, meta, publicize_addr}; use abq_utils::server_shutdown::{ShutdownManager, ShutdownReceiver}; +use abq_utils::test_command_hash::TestCommandHash; use abq_utils::tls::ServerTlsStrategy; use abq_utils::vec_map::VecMap; use abq_utils::{atomic, illegal_state, log_assert}; use abq_workers::negotiate::{QueueNegotiator, QueueNegotiatorHandle, QueueNegotiatorServerError}; -use abq_workers::{AssignedRun, AssignedRunStatus, GetAssignedRun}; +use abq_workers::{AssignedRun, AssignedRunKind, AssignedRunStatus, GetAssignedRun}; use async_trait::async_trait; use parking_lot::{Mutex, RwLock}; @@ -71,6 +72,11 @@ enum RunState { /// Strategy for pulling tests off the queue test_strategy: TestStrategy, + + /// Hash of the native test runner command for this run. + /// + /// Used for validation purposes, as we expect all runners to be using the same command. + test_command_hash: TestCommandHash, }, /// The active state of the test suite run. The queue is populated and at least one worker is /// connected. @@ -90,6 +96,11 @@ enum RunState { /// A tracker of what results have been persisted. results_persistence: ResultsPersistedCell, + + /// Hash of the native test runner command for this run. + /// + /// Used for validation purposes, as we expect all runners to be using the same command. + test_command_hash: TestCommandHash, }, /// All items in the manifest have been handed out. /// Workers may still be executing locally, for example in-band retries. @@ -110,6 +121,11 @@ enum RunState { /// A tracker of what results have been persisted. results_persistence: ResultsPersistence, + + /// Hash of the native test runner command for this run. + /// + /// Used for validation purposes, as we expect all runners to be using the same command. + test_command_hash: Option, }, Cancelled { #[allow(unused)] // yet @@ -311,22 +327,22 @@ enum ManifestProgressResult { NewProgress { newly_observed_test_index: usize }, } +#[derive(Clone, Copy)] +struct RunParams<'a> { + run_id: &'a RunId, + batch_size_hint: NonZeroUsize, + test_strategy: TestStrategy, + runner_test_command_hash: TestCommandHash, + entity: Entity, + remote: &'a RemotePersister, +} + impl AllRuns { /// Finds a queue for a run, or creates a new one if the run is observed as fresh. /// If the given run ID already has an associated queue, an error is returned. - pub async fn find_or_create_run( - &self, - run_id: &RunId, - batch_size_hint: NonZeroUsize, - test_strategy: TestStrategy, - entity: Entity, - remote: &RemotePersister, - ) -> AssignedRunStatus { - if !self.runs.read().contains_key(run_id) { - match self - .try_create_run(run_id, batch_size_hint, test_strategy, entity, remote) - .await - { + pub async fn find_or_create_run(&self, params: RunParams<'_>) -> AssignedRunStatus { + if !self.runs.read().contains_key(params.run_id) { + match self.try_create_run(params).await { ControlFlow::Break(assigned) => return assigned, ControlFlow::Continue(()) => { // Pass through, the run was not actually fresh and is now @@ -335,8 +351,15 @@ impl AllRuns { } } + let RunParams { + run_id, + entity, + runner_test_command_hash, + .. + } = params; + let runs = self.runs.read(); - let run = match runs.get(run_id) { + let run = match runs.get(params.run_id) { Some(st) => st.read(), None => { illegal_state!( @@ -352,6 +375,7 @@ impl AllRuns { worker_connection_times, batch_size_hint: _, test_strategy: _, + test_command_hash, } => { let mut worker_connection_times = worker_connection_times.lock(); let old = worker_connection_times.insert_by_tag(entity, time::Instant::now()); @@ -361,19 +385,40 @@ impl AllRuns { "same worker connecting twice for manifest" ); - AssignedRunStatus::Run(AssignedRun::Fresh { - should_generate_manifest: false, + let runner_test_command_differs = test_command_hash != &runner_test_command_hash; + if runner_test_command_differs { + log_runner_test_command_differs(run_id, entity, IsRetry(false)); + } + + AssignedRunStatus::Run(AssignedRun { + kind: AssignedRunKind::Fresh { + should_generate_manifest: false, + }, + runner_test_command_differs, }) } - RunState::HasWork { active_workers, .. } => { + RunState::HasWork { + active_workers, + test_command_hash, + .. + } => { let mut active_workers = active_workers.lock(); let old_active_worker_info = active_workers.insert_by_tag(entity, None); + let runner_test_command_differs = test_command_hash != &runner_test_command_hash; + if runner_test_command_differs { + let is_retry = old_active_worker_info.is_some(); + log_runner_test_command_differs(run_id, entity, IsRetry(is_retry)); + } + match old_active_worker_info { None => { // This is a fresh worker. - AssignedRunStatus::Run(AssignedRun::Fresh { - should_generate_manifest: false, + AssignedRunStatus::Run(AssignedRun { + kind: AssignedRunKind::Fresh { + should_generate_manifest: false, + }, + runner_test_command_differs, }) } Some((old_entity, old_finished_state)) => { @@ -412,13 +457,17 @@ impl AllRuns { "worker reconnecting for out-of-process retry manifest during active run" ); - AssignedRunStatus::Run(AssignedRun::Retry) + AssignedRunStatus::Run(AssignedRun { + kind: AssignedRunKind::Retry, + runner_test_command_differs, + }) } } } RunState::InitialManifestDone { new_worker_exit_code, seen_workers, + test_command_hash, .. } => { if seen_workers.read().contains_by_tag(&entity) { @@ -429,7 +478,18 @@ impl AllRuns { ?entity, "worker reconnecting for out-of-process retry manifest after initial run" ); - AssignedRunStatus::Run(AssignedRun::Retry) + + let runner_test_command_differs = test_command_hash + .map(|hash| hash != runner_test_command_hash) + .unwrap_or(false); + if runner_test_command_differs { + log_runner_test_command_differs(run_id, entity, IsRetry(true)); + } + + AssignedRunStatus::Run(AssignedRun { + kind: AssignedRunKind::Retry, + runner_test_command_differs, + }) } else { let exit_code = *new_worker_exit_code; tracing::info!( @@ -455,14 +515,16 @@ impl AllRuns { /// Otherwise, returning [ControlFlow::Continue] should fall back to loading the run from the /// queue state. #[inline] - async fn try_create_run( - &self, - run_id: &RunId, - batch_size_hint: NonZeroUsize, - test_strategy: TestStrategy, - entity: Entity, - remote: &RemotePersister, - ) -> ControlFlow { + async fn try_create_run(&self, params: RunParams<'_>) -> ControlFlow { + let RunParams { + run_id, + batch_size_hint, + test_strategy, + runner_test_command_hash, + entity, + remote, + } = params; + let result = async move { match remote.try_load_run_state(run_id).await? { LoadedRunState::Found(run_state) => { @@ -475,6 +537,7 @@ impl AllRuns { batch_size_hint, test_strategy, entity, + runner_test_command_hash, )) } LoadedRunState::IncompatibleSchemaVersion { found, expected } => { @@ -489,6 +552,7 @@ impl AllRuns { batch_size_hint, test_strategy, entity, + runner_test_command_hash, )) } } @@ -517,6 +581,7 @@ impl AllRuns { batch_size_hint: NonZeroUsize, test_strategy: TestStrategy, entity: Entity, + test_command_hash: TestCommandHash, ) -> ControlFlow { let run = { let mut worker_timings = WorkerSet::default(); @@ -528,6 +593,7 @@ impl AllRuns { worker_connection_times: Mutex::new(worker_timings), batch_size_hint, test_strategy, + test_command_hash, }, } }; @@ -548,8 +614,11 @@ impl AllRuns { tracing::info!(?run_id, ?entity, "created fresh run"); - ControlFlow::Break(AssignedRunStatus::Run(AssignedRun::Fresh { - should_generate_manifest: true, + ControlFlow::Break(AssignedRunStatus::Run(AssignedRun { + kind: AssignedRunKind::Fresh { + should_generate_manifest: true, + }, + runner_test_command_differs: false, })) } else { ControlFlow::Continue(()) @@ -566,6 +635,7 @@ impl AllRuns { new_worker_exit_code, init_metadata, seen_workers, + test_command_hash, } = run_state; // No worker is currently active, as this is the first time we're seeing them on this @@ -587,6 +657,7 @@ impl AllRuns { results_persistence: ResultsPersistence::Persisted(ResultsPersistedCell::new( run_id.clone(), )), + test_command_hash, }, }; @@ -622,28 +693,35 @@ impl AllRuns { let mut run = runs.get(run_id).expect("no run recorded").write(); - let (worker_connection_times, batch_size_hint, test_strategy) = match &mut run.state { - RunState::WaitingForManifest { - worker_connection_times, - batch_size_hint, - test_strategy, - } => { - // expected state, pass through - let timings = Mutex::into_inner(std::mem::take(worker_connection_times)); - (timings, *batch_size_hint, *test_strategy) - } - RunState::Cancelled { .. } => { - // If cancelled, do nothing. - return AddedManifest::RunCancelled; - } - RunState::HasWork { .. } | RunState::InitialManifestDone { .. } => { - illegal_state!( - "can only provide manifest while waiting for manifest", - ?run_id - ); - return AddedManifest::RunCancelled; - } - }; + let (worker_connection_times, batch_size_hint, test_strategy, test_command_hash) = + match &mut run.state { + RunState::WaitingForManifest { + worker_connection_times, + batch_size_hint, + test_strategy, + test_command_hash, + } => { + // expected state, pass through + let timings = Mutex::into_inner(std::mem::take(worker_connection_times)); + ( + timings, + *batch_size_hint, + *test_strategy, + *test_command_hash, + ) + } + RunState::Cancelled { .. } => { + // If cancelled, do nothing. + return AddedManifest::RunCancelled; + } + RunState::HasWork { .. } | RunState::InitialManifestDone { .. } => { + illegal_state!( + "can only provide manifest while waiting for manifest", + ?run_id + ); + return AddedManifest::RunCancelled; + } + }; let manifest_size_nonce = flat_manifest.len() as u64; let work_from_manifest = flat_manifest.into_iter().map(|(spec, group_id)| { @@ -673,6 +751,7 @@ impl AllRuns { init_metadata, active_workers: Mutex::new(active_workers), results_persistence: results_persistence.clone(), + test_command_hash, }; AddedManifest::Added { @@ -1050,12 +1129,14 @@ impl AllRuns { let queue; let init_metadata; let results_persistence; + let test_command_hash; match &mut run.state { RunState::HasWork { queue: this_queue, active_workers: this_active_workers, init_metadata: this_init_metadata, results_persistence: this_results_persistence, + test_command_hash: this_test_command_hash, .. } => { log_assert!( @@ -1066,6 +1147,7 @@ impl AllRuns { queue = std::mem::take(this_queue); init_metadata = std::mem::take(this_init_metadata); results_persistence = this_results_persistence.clone(); + test_command_hash = *this_test_command_hash; } RunState::Cancelled { .. } => { // Cancellation always takes priority over completeness. @@ -1106,6 +1188,7 @@ impl AllRuns { new_worker_exit_code, init_metadata: init_metadata.clone(), seen_workers: seen_workers.iter().map(|(worker, _)| *worker).collect(), + test_command_hash: Some(test_command_hash), }, ); @@ -1115,6 +1198,7 @@ impl AllRuns { seen_workers: RwLock::new(seen_workers), manifest_persistence: ManifestPersistence::Persisted(manifest_persisted), results_persistence: ResultsPersistence::Persisted(results_persistence), + test_command_hash: Some(test_command_hash), }; // NB: Always sub last for conversative estimation. @@ -1131,9 +1215,12 @@ impl AllRuns { let mut run = runs.get(&run_id).expect("no run recorded").write(); - match run.state { - RunState::WaitingForManifest { .. } => { + let test_command_hash = match run.state { + RunState::WaitingForManifest { + test_command_hash, .. + } => { // okay + test_command_hash } RunState::Cancelled { .. } => { // No-op, since the run was already cancelled. @@ -1146,7 +1233,7 @@ impl AllRuns { ); return; } - } + }; run.state = RunState::InitialManifestDone { new_worker_exit_code: ExitCode::FAILURE, @@ -1154,6 +1241,7 @@ impl AllRuns { seen_workers: Default::default(), manifest_persistence: ManifestPersistence::ManifestNeverReceived, results_persistence: ResultsPersistence::ManifestNeverReceived, + test_command_hash: Some(test_command_hash), }; // NB: Always sub last for conversative estimation. @@ -1166,9 +1254,12 @@ impl AllRuns { let mut run = runs.get(&run_id).expect("no run recorded").write(); - match run.state { - RunState::WaitingForManifest { .. } => { + let test_command_hash = match run.state { + RunState::WaitingForManifest { + test_command_hash, .. + } => { // okay + test_command_hash } RunState::Cancelled { .. } => { // No-op, since the run was already cancelled. @@ -1181,7 +1272,7 @@ impl AllRuns { ); return RecordedEmptyManifest::RunCancelled; } - } + }; let results_persistence = ResultsPersistedCell::new(run_id); @@ -1192,6 +1283,7 @@ impl AllRuns { seen_workers: Default::default(), manifest_persistence: ManifestPersistence::EmptyManifest, results_persistence: ResultsPersistence::Persisted(results_persistence.clone()), + test_command_hash: Some(test_command_hash), }; // NB: Always sub last for conversative estimation. @@ -1381,6 +1473,17 @@ impl AllRuns { } } +struct IsRetry(bool); + +fn log_runner_test_command_differs(run_id: &RunId, runner_entity: Entity, is_retry: IsRetry) { + tracing::warn!( + ?run_id, + entity=?runner_entity, + is_retry=is_retry.0, + "Runner has mismatched test command for run", + ); +} + pub struct Abq { shutdown_manager: ShutdownManager, @@ -1656,6 +1759,7 @@ impl GetAssignedRun for ChooseRunForWorker { run_id, batch_size_hint, test_strategy, + test_command_hash, } = invoke_work; let batch_size_hint = @@ -1670,13 +1774,14 @@ impl GetAssignedRun for ChooseRunForWorker { let assigned_run = self .queues - .find_or_create_run( + .find_or_create_run(RunParams { run_id, batch_size_hint, - *test_strategy, + test_strategy: *test_strategy, + runner_test_command_hash: *test_command_hash, entity, - &self.remote, - ) + remote: &self.remote, + }) .await; // Now that we've found a run for this ID, if the run is fresh, enqueue a job to check @@ -2735,21 +2840,10 @@ async fn fetch_persisted_manifest( } #[cfg(test)] -use abq_utils::net_protocol::runners::ProtocolWitness; - -#[cfg(test)] -fn fake_test_spec(proto: ProtocolWitness) -> TestSpec { - use abq_utils::net_protocol::{runners::TestCase, workers::WorkId}; - - TestSpec { - test_case: TestCase::new(proto, "fake-test", Default::default()), - work_id: WorkId::new(), - } -} +mod test_utils; #[cfg(test)] mod test { - use abq_utils::net_protocol::queue::TestStrategy; use std::{io, time::Instant}; use crate::{ @@ -2760,15 +2854,15 @@ mod test { run_state, }, queue::{ - AddedManifest, CancelReason, ManifestProgressCancelReason, ManifestProgressResult, - PulledTestsStatus, QueueServer, RunStatus, SharedRuns, WorkScheduler, + test_utils::RunParamsBuilder, AddedManifest, CancelReason, + ManifestProgressCancelReason, ManifestProgressResult, PulledTestsStatus, QueueServer, + RunStatus, SharedRuns, WorkScheduler, }, timeout::RunTimeoutManager, }; use abq_run_n_times::n_times; use abq_test_utils::{ - accept_handshake, assert_scoped_log, build_fake_connection, build_random_strategies, - one_nonzero_usize, spec, + accept_handshake, assert_scoped_log, build_fake_connection, build_random_strategies, spec, }; use abq_utils::{ auth::{build_strategies, AdminToken, ClientAuthStrategy, ServerAuthStrategy, UserToken}, @@ -2788,10 +2882,11 @@ mod test { workers::{RunId, WorkId}, }, server_shutdown::ShutdownManager, + test_command_hash::TestCommandHash, tls::{ClientTlsStrategy, ServerTlsStrategy}, }; use abq_with_protocol_version::with_protocol_version; - use abq_workers::{AssignedRun, AssignedRunStatus}; + use abq_workers::{AssignedRun, AssignedRunKind, AssignedRunStatus}; use futures::FutureExt; use tracing_test::traced_test; @@ -3112,11 +3207,7 @@ mod test { let _ = queues .find_or_create_run( - &run_id, - one_nonzero_usize(), - TestStrategy::ByTest, - Entity::runner(0, 1), - &remote::NoopPersister::new().into(), + RunParamsBuilder::new(&run_id, &remote::NoopPersister::new().into()).build(), ) .await; @@ -3132,11 +3223,7 @@ mod test { let _ = queues .find_or_create_run( - &run_id, - one_nonzero_usize(), - TestStrategy::ByTest, - Entity::runner(0, 1), - &remote::NoopPersister::new().into(), + RunParamsBuilder::new(&run_id, &remote::NoopPersister::new().into()).build(), ) .await; let added = queues.add_manifest(&run_id, vec![], Default::default()); @@ -3154,11 +3241,7 @@ mod test { let _ = queues .find_or_create_run( - &run_id, - one_nonzero_usize(), - TestStrategy::ByTest, - Entity::runner(0, 1), - &remote::NoopPersister::new().into(), + RunParamsBuilder::new(&run_id, &remote::NoopPersister::new().into()).build(), ) .await; let added = queues.add_manifest(&run_id, vec![], Default::default()); @@ -3182,11 +3265,7 @@ mod test { for run_id in [&run_id1, &run_id2, &run_id3, &run_id4] { let _ = queues .find_or_create_run( - run_id, - one_nonzero_usize(), - TestStrategy::ByTest, - Entity::runner(0, 1), - &remote::NoopPersister::new().into(), + RunParamsBuilder::new(run_id, &remote::NoopPersister::new().into()).build(), ) .await; } @@ -3212,11 +3291,9 @@ mod test { let _ = queues .find_or_create_run( - &run_id, - one_nonzero_usize(), - TestStrategy::ByTest, - Entity::runner(0, 1), - &remote::NoopPersister::new().into(), + RunParamsBuilder::new(&run_id, &remote::NoopPersister::new().into()) + .entity(Entity::runner(0, 1)) + .build(), ) .await; @@ -3241,11 +3318,9 @@ mod test { let _ = queues .find_or_create_run( - &run_id, - one_nonzero_usize(), - TestStrategy::ByTest, - Entity::runner(0, 1), - &remote, + RunParamsBuilder::new(&run_id, &remote) + .entity(Entity::runner(0, 1)) + .build(), ) .await; let _ = queues.add_manifest(&run_id, vec![], Default::default()); @@ -3272,11 +3347,9 @@ mod test { let _ = queues .find_or_create_run( - &run_id, - one_nonzero_usize(), - TestStrategy::ByTest, - Entity::runner(0, 1), - &remote, + RunParamsBuilder::new(&run_id, &remote) + .entity(Entity::runner(0, 1)) + .build(), ) .await; @@ -3307,21 +3380,25 @@ mod test { let remote = remote::NoopPersister::new().into(); + let test_command_hash = TestCommandHash::random(); + // worker0 creates run { let assigned_lookup = queues .find_or_create_run( - &run_id, - one_nonzero_usize(), - TestStrategy::ByTest, - worker0, - &remote, + RunParamsBuilder::new(&run_id, &remote) + .entity(worker0) + .runner_test_command_hash(test_command_hash) + .build(), ) .await; assert_eq!( assigned_lookup, - AssignedRunStatus::Run(AssignedRun::Fresh { - should_generate_manifest: true + AssignedRunStatus::Run(AssignedRun { + kind: AssignedRunKind::Fresh { + should_generate_manifest: true + }, + runner_test_command_differs: false }) ); assert_eq!(queues.get_run_status(&run_id), Some(RunStatus::Active)); @@ -3332,17 +3409,19 @@ mod test { { let assigned_lookup = queues .find_or_create_run( - &run_id, - one_nonzero_usize(), - TestStrategy::ByTest, - worker1, - &remote, + RunParamsBuilder::new(&run_id, &remote) + .entity(worker1) + .runner_test_command_hash(test_command_hash) + .build(), ) .await; assert_eq!( assigned_lookup, - AssignedRunStatus::Run(AssignedRun::Fresh { - should_generate_manifest: false + AssignedRunStatus::Run(AssignedRun { + kind: AssignedRunKind::Fresh { + should_generate_manifest: false + }, + runner_test_command_differs: false }) ); assert_eq!(queues.get_run_status(&run_id), Some(RunStatus::Active)); @@ -3361,17 +3440,19 @@ mod test { { let assigned_lookup = queues .find_or_create_run( - &run_id, - one_nonzero_usize(), - TestStrategy::ByTest, - worker2, - &remote, + RunParamsBuilder::new(&run_id, &remote) + .entity(worker2) + .runner_test_command_hash(test_command_hash) + .build(), ) .await; assert_eq!( assigned_lookup, - AssignedRunStatus::Run(AssignedRun::Fresh { - should_generate_manifest: false + AssignedRunStatus::Run(AssignedRun { + kind: AssignedRunKind::Fresh { + should_generate_manifest: false + }, + runner_test_command_differs: false }) ); assert_eq!(queues.get_run_status(&run_id), Some(RunStatus::Active)); @@ -3446,11 +3527,9 @@ mod test { let queues = SharedRuns::default(); let _ = queues .find_or_create_run( - &run_id, - one_nonzero_usize(), - TestStrategy::ByTest, - Entity::runner(0, 1), - &remote, + RunParamsBuilder::new(&run_id, &remote) + .entity(Entity::runner(0, 1)) + .build(), ) .await; queues @@ -3532,11 +3611,9 @@ mod test { let queues = SharedRuns::default(); let _ = queues .find_or_create_run( - &run_id, - one_nonzero_usize(), - TestStrategy::ByTest, - Entity::runner(0, 1), - &remote, + RunParamsBuilder::new(&run_id, &remote) + .entity(Entity::runner(0, 1)) + .build(), ) .await; queues @@ -3616,11 +3693,9 @@ mod test { let queues = SharedRuns::default(); let _ = queues .find_or_create_run( - &run_id, - one_nonzero_usize(), - TestStrategy::ByTest, - Entity::runner(0, 1), - &remote, + RunParamsBuilder::new(&run_id, &remote) + .entity(Entity::runner(0, 1)) + .build(), ) .await; let added = queues.add_manifest(&run_id, vec![], Default::default()); @@ -3665,13 +3740,7 @@ mod test { let run_id = RunId::unique(); let _ = queues - .find_or_create_run( - &run_id, - one_nonzero_usize(), - TestStrategy::ByTest, - Entity::runner(0, 1), - &remote, - ) + .find_or_create_run(RunParamsBuilder::new(&run_id, &remote).build()) .await; let result = queues.handle_manifest_progress_timeout(&run_id, 0); @@ -3692,13 +3761,7 @@ mod test { let run_id = RunId::unique(); let _ = queues - .find_or_create_run( - &run_id, - one_nonzero_usize(), - TestStrategy::ByTest, - Entity::runner(0, 1), - &remote, - ) + .find_or_create_run(RunParamsBuilder::new(&run_id, &remote).build()) .await; let _ = queues.add_manifest(&run_id, vec![(spec(1), GroupId::new())], Default::default()); @@ -3721,11 +3784,9 @@ mod test { let _ = queues .find_or_create_run( - &run_id, - one_nonzero_usize(), - TestStrategy::ByTest, - Entity::runner(0, 1), - &remote, + RunParamsBuilder::new(&run_id, &remote) + .entity(Entity::runner(0, 1)) + .build(), ) .await; let _ = queues.add_manifest( @@ -3754,11 +3815,9 @@ mod test { let _ = queues .find_or_create_run( - &run_id, - one_nonzero_usize(), - TestStrategy::ByTest, - Entity::runner(0, 1), - &remote, + RunParamsBuilder::new(&run_id, &remote) + .entity(Entity::runner(0, 1)) + .build(), ) .await; let _ = queues.add_manifest(&run_id, vec![(spec(1), GroupId::new())], Default::default()); @@ -3777,13 +3836,7 @@ mod test { let run_id = RunId::unique(); let _ = queues - .find_or_create_run( - &run_id, - one_nonzero_usize(), - TestStrategy::ByTest, - Entity::runner(0, 1), - &remote, - ) + .find_or_create_run(RunParamsBuilder::new(&run_id, &remote).build()) .await; queues.mark_cancelled(&run_id, Entity::runner(0, 1), CancelReason::User); @@ -3810,13 +3863,7 @@ mod test { let run_id = RunId::unique(); let _ = queues - .find_or_create_run( - &run_id, - one_nonzero_usize(), - TestStrategy::ByTest, - Entity::runner(0, 1), - &remote, - ) + .find_or_create_run(RunParamsBuilder::new(&run_id, &remote).build()) .await; let _ = queues.add_manifest( &run_id, @@ -3895,20 +3942,17 @@ mod test { let remote = remote::NoopPersister::new().into(); let assigned = queues - .find_or_create_run( - &run_id, - one_nonzero_usize(), - TestStrategy::ByTest, - Entity::runner(0, 1), - &remote, - ) + .find_or_create_run(RunParamsBuilder::new(&run_id, &remote).build()) .await; assert_eq!(queues.estimate_num_active_runs(), 1); assert_eq!( assigned, - AssignedRunStatus::Run(AssignedRun::Fresh { - should_generate_manifest: true + AssignedRunStatus::Run(AssignedRun { + kind: AssignedRunKind::Fresh { + should_generate_manifest: true + }, + runner_test_command_differs: false }) ); } @@ -3922,6 +3966,8 @@ mod test { let entity = Entity::runner(0, 1); + let test_command_hash = TestCommandHash::random(); + let remote = remote::FakePersister::builder() .on_try_load_run_state({ let expected_run_id = run_id.clone(); @@ -3929,6 +3975,7 @@ mod test { assert_eq!(run_id, &expected_run_id); let run_state = run_state::RunState { seen_workers: vec![entity], + test_command_hash: Some(test_command_hash), ..run_state::RunState::fake() }; async { Ok(LoadedRunState::Found(run_state)) }.boxed() @@ -3939,15 +3986,20 @@ mod test { let assigned = queues .find_or_create_run( - &run_id, - one_nonzero_usize(), - TestStrategy::ByTest, - entity, - &remote, + RunParamsBuilder::new(&run_id, &remote) + .entity(entity) + .runner_test_command_hash(test_command_hash) + .build(), ) .await; - assert_eq!(assigned, AssignedRunStatus::Run(AssignedRun::Retry)); + assert_eq!( + assigned, + AssignedRunStatus::Run(AssignedRun { + kind: AssignedRunKind::Retry, + runner_test_command_differs: false + }) + ); } #[tokio::test] @@ -3975,11 +4027,9 @@ mod test { let assigned = queues .find_or_create_run( - &run_id, - one_nonzero_usize(), - TestStrategy::ByTest, - Entity::runner(0, 1), - &remote, + RunParamsBuilder::new(&run_id, &remote) + .entity(Entity::runner(0, 1)) + .build(), ) .await; @@ -4010,19 +4060,16 @@ mod test { .into(); let assigned = queues - .find_or_create_run( - &run_id, - one_nonzero_usize(), - TestStrategy::ByTest, - Entity::runner(0, 1), - &remote, - ) + .find_or_create_run(RunParamsBuilder::new(&run_id, &remote).build()) .await; assert_eq!( assigned, - AssignedRunStatus::Run(AssignedRun::Fresh { - should_generate_manifest: true + AssignedRunStatus::Run(AssignedRun { + kind: AssignedRunKind::Fresh { + should_generate_manifest: true + }, + runner_test_command_differs: false }) ); } @@ -4052,19 +4099,16 @@ mod test { .into(); let assigned = queues - .find_or_create_run( - &run_id, - one_nonzero_usize(), - TestStrategy::ByTest, - Entity::runner(0, 1), - &remote, - ) + .find_or_create_run(RunParamsBuilder::new(&run_id, &remote).build()) .await; assert_eq!( assigned, - AssignedRunStatus::Run(AssignedRun::Fresh { - should_generate_manifest: true + AssignedRunStatus::Run(AssignedRun { + kind: AssignedRunKind::Fresh { + should_generate_manifest: true + }, + runner_test_command_differs: false }) ); } @@ -4088,34 +4132,75 @@ mod test { .into(); let assigned = queues + .find_or_create_run(RunParamsBuilder::new(&run_id, &remote).build()) + .await; + + assert!(matches!(assigned, AssignedRunStatus::FatalError(_))); + } + + #[tokio::test] + #[traced_test] + async fn create_run_return_flag_if_command_is_different() { + let queues = SharedRuns::default(); + + let run_id = RunId::unique(); + + let remote = remote::NoopPersister::new().into(); + + { + let _ = queues + .find_or_create_run( + RunParamsBuilder::new(&run_id, &remote) + .entity(Entity::runner(0, 1)) + .build(), + ) + .await; + } + + let assigned_2 = queues .find_or_create_run( - &run_id, - one_nonzero_usize(), - TestStrategy::ByTest, - Entity::runner(0, 1), - &remote, + RunParamsBuilder::new(&run_id, &remote) + .runner_test_command_hash(TestCommandHash::random()) + .entity(Entity::runner(0, 2)) + .build(), ) .await; - assert!(matches!(assigned, AssignedRunStatus::FatalError(_))); + assert_eq!( + assigned_2, + AssignedRunStatus::Run(AssignedRun { + kind: AssignedRunKind::Fresh { + should_generate_manifest: false + }, + runner_test_command_differs: true + }) + ); + + assert_scoped_log( + "abq_queue::queue", + "Runner has mismatched test command for run", + ); } } #[cfg(test)] mod test_pull_work { use abq_test_utils::one_nonzero_usize; - use abq_utils::net_protocol::{ - entity::Entity, - queue::{GroupId, TestStrategy}, - workers::{RunId, WorkerTest}, + use abq_utils::{ + net_protocol::{ + entity::Entity, + queue::{GroupId, TestStrategy}, + workers::{RunId, WorkerTest}, + }, + test_command_hash::TestCommandHash, }; use abq_with_protocol_version::with_protocol_version; use crate::{ persistence::results::ResultsPersistedCell, queue::{ - fake_test_spec, JobQueue, NextWorkResult, PulledTestsStatus, RunState, RunStatus, - SharedRuns, + test_utils::fake_test_spec, JobQueue, NextWorkResult, PulledTestsStatus, RunState, + RunStatus, SharedRuns, }, }; @@ -4152,6 +4237,7 @@ mod test_pull_work { batch_size_hint, active_workers: Default::default(), results_persistence: ResultsPersistedCell::new(run_id.clone()), + test_command_hash: TestCommandHash::random(), }; let queues = SharedRuns::default(); @@ -4208,7 +4294,7 @@ mod test_pull_work { #[cfg(test)] mod persistence_on_end_of_manifest { - use abq_utils::net_protocol::queue::TestStrategy; + use abq_utils::{net_protocol::queue::TestStrategy, test_command_hash::TestCommandHash}; use futures::FutureExt; use std::{net::SocketAddr, sync::Arc, time::Duration}; @@ -4219,9 +4305,9 @@ mod persistence_on_end_of_manifest { remote::{self, RemotePersister}, results::ResultsPersistedCell, }, - queue::{fake_test_spec, NextWorkResult, RunState, SharedRuns, WorkScheduler}, + queue::{test_utils::fake_test_spec, NextWorkResult, RunState, SharedRuns, WorkScheduler}, }; - use abq_test_utils::{one_nonzero_usize, spec}; + use abq_test_utils::spec; use abq_utils::{ auth::{ClientAuthStrategy, ServerAuthStrategy}, net_async, @@ -4237,11 +4323,13 @@ mod persistence_on_end_of_manifest { tls::{ClientTlsStrategy, ServerTlsStrategy}, }; use abq_with_protocol_version::with_protocol_version; - use abq_workers::{AssignedRun, AssignedRunStatus}; + use abq_workers::{AssignedRun, AssignedRunKind, AssignedRunStatus}; use ntest::timeout; use parking_lot::Mutex; use tokio::task::JoinHandle; + use super::test_utils::RunParamsBuilder; + #[tokio::test] #[with_protocol_version] async fn worker_told_to_pull_retry_manifest() { @@ -4259,15 +4347,16 @@ mod persistence_on_end_of_manifest { let test2 = fake_test_spec(proto); let test3 = fake_test_spec(proto); + let test_command_hash = TestCommandHash::random(); + // Create run, add manifest by worker0 { let _ = queues .find_or_create_run( - &run_id, - one_nonzero_usize(), - TestStrategy::ByTest, - worker0, - &remote, + RunParamsBuilder::new(&run_id, &remote) + .entity(worker0) + .runner_test_command_hash(test_command_hash) + .build(), ) .await; let _ = queues.add_manifest( @@ -4294,14 +4383,19 @@ mod persistence_on_end_of_manifest { // manifest. let assigned = queues .find_or_create_run( - &run_id, - one_nonzero_usize(), - TestStrategy::ByTest, - worker0_shadow, - &remote, + RunParamsBuilder::new(&run_id, &remote) + .entity(worker0_shadow) + .runner_test_command_hash(test_command_hash) + .build(), ) .await; - assert_eq!(assigned, AssignedRunStatus::Run(AssignedRun::Retry)); + assert_eq!( + assigned, + AssignedRunStatus::Run(AssignedRun { + kind: AssignedRunKind::Retry, + runner_test_command_differs: false + }) + ); } struct WorkSchedulerBuilder(WorkScheduler); @@ -4382,6 +4476,7 @@ mod persistence_on_end_of_manifest { batch_size_hint, active_workers: Default::default(), results_persistence: ResultsPersistedCell::new(run_id.clone()), + test_command_hash: TestCommandHash::random(), }; let queues = SharedRuns::default(); @@ -4508,11 +4603,9 @@ mod persistence_on_end_of_manifest { let _ = queues .find_or_create_run( - &run_id, - one_nonzero_usize(), - TestStrategy::ByTest, - Entity::runner(0, 1), - &remote, + RunParamsBuilder::new(&run_id, &remote) + .entity(entity) + .build(), ) .await; let _ = queues.add_manifest( @@ -4590,6 +4683,7 @@ mod persist_results { runners::TestResult, workers::RunId, }, + test_command_hash::TestCommandHash, }; use ntest::timeout; @@ -4620,6 +4714,7 @@ mod persist_results { active_workers: Default::default(), results_persistence: results_cell.clone(), batch_size_hint: one_nonzero_usize(), + test_command_hash: TestCommandHash::random(), }; queues.set_state(run_id.clone(), done); queues @@ -4666,6 +4761,7 @@ mod persist_results { seen_workers: Default::default(), manifest_persistence: ManifestPersistence::EmptyManifest, results_persistence: ResultsPersistence::Persisted(results_cell.clone()), + test_command_hash: Some(TestCommandHash::random()), }; queues.set_state(run_id.clone(), has_work); queues @@ -4758,7 +4854,12 @@ mod persist_results { get_read_results_cell! { get_read_results_cell_when_waiting_for_manifest, - RunState::WaitingForManifest { worker_connection_times: Default::default(), batch_size_hint: one_nonzero_usize(), test_strategy: TestStrategy::ByTest }, + RunState::WaitingForManifest { + worker_connection_times: Default::default(), + batch_size_hint: one_nonzero_usize(), + test_strategy: TestStrategy::ByTest, + test_command_hash: TestCommandHash::random(), + }, Err(ReadResultsError::WaitingForManifest) } @@ -4774,6 +4875,7 @@ mod persist_results { batch_size_hint: one_nonzero_usize(), active_workers: parking_lot::Mutex::new(active_workers), results_persistence: ResultsPersistedCell::new(RunId::unique()), + test_command_hash: TestCommandHash::random(), } }, Ok(ReadResultsState::OutstandingRunners(r)) if r == &[Tag::runner(1, 1)] @@ -4791,6 +4893,7 @@ mod persist_results { seen_workers: parking_lot::RwLock::new(active_workers), results_persistence: ResultsPersistence::Persisted(ResultsPersistedCell::new(RunId::unique())), manifest_persistence: ManifestPersistence::EmptyManifest, + test_command_hash: Some(TestCommandHash::random()), } }, Ok(ReadResultsState::ReadFromCell(..)) @@ -4808,6 +4911,7 @@ mod persist_results { seen_workers: parking_lot::RwLock::new(active_workers), results_persistence: ResultsPersistence::Persisted(ResultsPersistedCell::new(RunId::unique())), manifest_persistence: ManifestPersistence::EmptyManifest, + test_command_hash: Some(TestCommandHash::random()), } }, Ok(ReadResultsState::OutstandingRunners(r)) if r == &[Tag::runner(2, 1)] @@ -4822,6 +4926,7 @@ mod persist_results { seen_workers: Default::default(), results_persistence: ResultsPersistence::ManifestNeverReceived, manifest_persistence: ManifestPersistence::EmptyManifest, + test_command_hash: Some(TestCommandHash::random()), } }, Err(ReadResultsError::ManifestNeverReceived) @@ -4855,6 +4960,7 @@ mod persist_results { init_metadata: Default::default(), active_workers: Default::default(), results_persistence: results_cell.clone(), + test_command_hash: TestCommandHash::random(), }; queues.set_state(run_id.clone(), has_work); queues diff --git a/crates/abq_queue/src/queue/test_utils.rs b/crates/abq_queue/src/queue/test_utils.rs new file mode 100644 index 00000000..e90b5211 --- /dev/null +++ b/crates/abq_queue/src/queue/test_utils.rs @@ -0,0 +1,52 @@ +#![cfg(test)] +use abq_test_utils::one_nonzero_usize; +use abq_utils::{ + net_protocol::{ + entity::Entity, + queue::{TestSpec, TestStrategy}, + runners::ProtocolWitness, + workers::RunId, + }, + test_command_hash::TestCommandHash, +}; + +use crate::persistence::remote::RemotePersister; + +use super::RunParams; + +pub fn fake_test_spec(proto: ProtocolWitness) -> TestSpec { + use abq_utils::net_protocol::{runners::TestCase, workers::WorkId}; + + TestSpec { + test_case: TestCase::new(proto, "fake-test", Default::default()), + work_id: WorkId::new(), + } +} + +pub struct RunParamsBuilder<'a>(RunParams<'a>); +impl<'a> RunParamsBuilder<'a> { + pub fn new(run_id: &'a RunId, remote: &'a RemotePersister) -> Self { + Self(RunParams { + run_id, + batch_size_hint: one_nonzero_usize(), + test_strategy: TestStrategy::ByTest, + entity: Entity::runner(0, 1), + remote, + runner_test_command_hash: TestCommandHash::random(), + }) + } + + pub fn entity(mut self, entity: Entity) -> Self { + self.0.entity = entity; + self + } + + pub fn runner_test_command_hash(mut self, hash: TestCommandHash) -> Self { + self.0.runner_test_command_hash = hash; + self + } + + pub(super) fn build(self) -> RunParams<'a> { + self.0 + } +} diff --git a/crates/abq_queue/tests/integration.rs b/crates/abq_queue/tests/integration.rs index 0d0b109e..2e5999af 100644 --- a/crates/abq_queue/tests/integration.rs +++ b/crates/abq_queue/tests/integration.rs @@ -20,6 +20,7 @@ use abq_queue::{ queue::{Abq, QueueConfig}, RunTimeoutStrategy, TimeoutReason, }; +use abq_reporting::writer::NoopColorWriter; use abq_test_utils::{artifacts_dir, assert_scoped_log, s}; use abq_utils::{ auth::{ClientAuthStrategy, User}, @@ -271,6 +272,7 @@ impl WorkersConfigBuilder { protocol_version_timeout: DEFAULT_PROTOCOL_VERSION_TIMEOUT, test_timeout: DEFAULT_RUNNER_TEST_TIMEOUT, should_send_results: true, + warning_writer: Box::new(NoopColorWriter), }; Self { config, @@ -494,6 +496,7 @@ fn action_to_fut( run_id, batch_size_hint: one_nonzero(), test_strategy, + test_command_hash: config.runner_kind.command_hash(), }; let config = WorkersConfig { diff --git a/crates/abq_reporting/src/lib.rs b/crates/abq_reporting/src/lib.rs index d0ec1505..5d7db58e 100644 --- a/crates/abq_reporting/src/lib.rs +++ b/crates/abq_reporting/src/lib.rs @@ -8,6 +8,8 @@ use abq_utils::net_protocol::{ pub mod colors; pub mod output; +pub mod writer; + #[must_use] #[derive(Debug)] pub struct CompletedSummary { diff --git a/crates/abq_reporting/src/output.rs b/crates/abq_reporting/src/output.rs index f9b73806..a082ffde 100644 --- a/crates/abq_reporting/src/output.rs +++ b/crates/abq_reporting/src/output.rs @@ -425,7 +425,7 @@ fn green_bold_spec() -> ColorSpec { } #[inline] -fn yellow_bold_spec() -> ColorSpec { +pub fn yellow_bold_spec() -> ColorSpec { let mut spec = ColorSpec::new(); spec.set_fg(Some(Color::Yellow)).set_bold(true); spec diff --git a/crates/abq_reporting/src/writer.rs b/crates/abq_reporting/src/writer.rs new file mode 100644 index 00000000..4186c635 --- /dev/null +++ b/crates/abq_reporting/src/writer.rs @@ -0,0 +1,42 @@ +use crate::output::yellow_bold_spec; + +pub trait ColorWriter { + fn warn_line(&mut self, msg: &str) -> std::io::Result<()>; +} + +pub struct NoopColorWriter; + +impl std::io::Write for NoopColorWriter { + fn write(&mut self, buf: &[u8]) -> std::io::Result { + Ok(buf.len()) + } + + fn flush(&mut self) -> std::io::Result<()> { + Ok(()) + } +} + +impl termcolor::WriteColor for NoopColorWriter { + fn supports_color(&self) -> bool { + false + } + + fn set_color(&mut self, _: &termcolor::ColorSpec) -> std::io::Result<()> { + Ok(()) + } + + fn reset(&mut self) -> std::io::Result<()> { + Ok(()) + } +} + +impl ColorWriter for T { + fn warn_line(&mut self, message: &str) -> std::io::Result<()> { + self.set_color(&yellow_bold_spec())?; + self.write_all(b"WARNING: ")?; + self.write_all(message.as_bytes())?; + self.write_all(b"\n")?; + self.reset()?; + Ok(()) + } +} diff --git a/crates/abq_utils/Cargo.toml b/crates/abq_utils/Cargo.toml index e3ba6182..091cec7b 100644 --- a/crates/abq_utils/Cargo.toml +++ b/crates/abq_utils/Cargo.toml @@ -20,6 +20,8 @@ rand.workspace = true rand_chacha.workspace = true uuid.workspace = true +blake3.workspace = true + thiserror.workspace = true async-trait.workspace = true parking_lot.workspace = true diff --git a/crates/abq_utils/src/lib.rs b/crates/abq_utils/src/lib.rs index 8b8f291b..7f26f93b 100644 --- a/crates/abq_utils/src/lib.rs +++ b/crates/abq_utils/src/lib.rs @@ -13,6 +13,7 @@ pub mod oneshot_notify; pub mod results_handler; pub mod retry; pub mod server_shutdown; +pub mod test_command_hash; pub mod time; pub mod timeout_future; pub mod tls; diff --git a/crates/abq_utils/src/net_protocol.rs b/crates/abq_utils/src/net_protocol.rs index 8d432881..5178b779 100644 --- a/crates/abq_utils/src/net_protocol.rs +++ b/crates/abq_utils/src/net_protocol.rs @@ -235,7 +235,7 @@ pub mod workers { queue::TestSpec, runners::{AbqProtocolVersion, Manifest, ManifestMessage, NativeRunnerSpecification}, }; - use crate::capture_output::StdioOutput; + use crate::{capture_output::StdioOutput, test_command_hash::TestCommandHash}; use serde_derive::{Deserialize, Serialize}; use std::{ collections::HashMap, @@ -377,6 +377,17 @@ pub mod workers { } } } + + pub fn command_hash(&self) -> TestCommandHash { + match self { + RunnerKind::GenericNativeTestRunner(params) => { + TestCommandHash::from_command(¶ms.cmd, ¶ms.args) + } + RunnerKind::TestLikeRunner(_, _) => { + TestCommandHash::from_command("test-like-runner", &[]) + } + } + } } #[derive(Serialize, Deserialize, Debug, PartialEq, Eq, Clone)] @@ -470,7 +481,7 @@ pub mod queue { runners::{AbqProtocolVersion, NativeRunnerSpecification, TestCase, TestResult}, workers::{ManifestResult, RunId, WorkId}, }; - use crate::capture_output::StdioOutput; + use crate::{capture_output::StdioOutput, test_command_hash::TestCommandHash}; /// Information about the queue and its negotiation server. #[derive(Serialize, Deserialize, Debug, Clone)] @@ -535,6 +546,7 @@ pub mod queue { pub run_id: RunId, pub batch_size_hint: NonZeroU64, pub test_strategy: TestStrategy, + pub test_command_hash: TestCommandHash, } /// Specification of a test case to run. diff --git a/crates/abq_utils/src/test_command_hash.rs b/crates/abq_utils/src/test_command_hash.rs new file mode 100644 index 00000000..8ffa755f --- /dev/null +++ b/crates/abq_utils/src/test_command_hash.rs @@ -0,0 +1,54 @@ +use serde_derive::{Deserialize, Serialize}; + +#[derive(Serialize, Deserialize, Debug, PartialEq, Eq, Clone, Copy)] +pub struct TestCommandHash([u8; 32]); + +impl TestCommandHash { + pub fn from_command(command: &str, args: &[String]) -> Self { + let mut hasher = blake3::Hasher::new(); + hasher.update(command.as_bytes()); + for arg in args { + hasher.update(arg.as_bytes()); + } + Self(hasher.finalize().into()) + } + + pub fn random() -> Self { + let mut hasher = blake3::Hasher::new(); + hasher.update(&rand::random::<[u8; 32]>()); + Self(hasher.finalize().into()) + } +} + +#[cfg(test)] +mod test { + use super::TestCommandHash; + + #[test] + fn random_is_random() { + let one = TestCommandHash::random(); + let two = TestCommandHash::random(); + assert_ne!(one, two); + } + + #[test] + fn consistent_for_same_command() { + let one = TestCommandHash::from_command("foo", &["arg1".into()]); + let two = TestCommandHash::from_command("foo", &["arg1".into()]); + assert_eq!(one, two); + } + + #[test] + fn different_for_different_command() { + let one = TestCommandHash::from_command("foo", &["arg1".into()]); + let two = TestCommandHash::from_command("bar", &["arg1".into()]); + assert_ne!(one, two); + } + + #[test] + fn different_for_different_args() { + let one = TestCommandHash::from_command("foo", &["arg1".into()]); + let two = TestCommandHash::from_command("foo", &["arg2".into()]); + assert_ne!(one, two); + } +} diff --git a/crates/abq_workers/Cargo.toml b/crates/abq_workers/Cargo.toml index 488206d1..6670a72b 100644 --- a/crates/abq_workers/Cargo.toml +++ b/crates/abq_workers/Cargo.toml @@ -5,6 +5,7 @@ version = "0.1.0" [dependencies] abq_generic_test_runner = { path = "../abq_runners/generic_test_runner" } +abq_reporting = { path = "../abq_reporting" } abq_utils = { path = "../abq_utils" } thiserror.workspace = true diff --git a/crates/abq_workers/src/assigned_run.rs b/crates/abq_workers/src/assigned_run.rs index 234d5465..0401203d 100644 --- a/crates/abq_workers/src/assigned_run.rs +++ b/crates/abq_workers/src/assigned_run.rs @@ -4,13 +4,21 @@ use serde_derive::{Deserialize, Serialize}; /// The test run a worker should ask for work on. #[derive(Serialize, Deserialize, Debug, PartialEq, Eq)] -pub enum AssignedRun { +pub enum AssignedRunKind { /// This worker is connecting for a fresh run, and should fetch tests online. Fresh { should_generate_manifest: bool }, /// This worker is connecting for a retry, and should fetch its manifest from the queue once. Retry, } +#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)] +pub struct AssignedRun { + pub kind: AssignedRunKind, + /// [true] if the runner test command differs from the one associated when the run was + /// created. + pub runner_test_command_differs: bool, +} + #[must_use] #[derive(Debug, PartialEq, Eq)] pub enum AssignedRunStatus { @@ -27,8 +35,11 @@ impl AssignedRunStatus { pub fn freshly_created(&self) -> bool { matches!( self, - AssignedRunStatus::Run(AssignedRun::Fresh { - should_generate_manifest: true + AssignedRunStatus::Run(AssignedRun { + kind: AssignedRunKind::Fresh { + should_generate_manifest: true, + }, + .. }) ) } diff --git a/crates/abq_workers/src/lib.rs b/crates/abq_workers/src/lib.rs index 4f10421c..64dc969c 100644 --- a/crates/abq_workers/src/lib.rs +++ b/crates/abq_workers/src/lib.rs @@ -9,4 +9,4 @@ pub mod workers; pub use abq_generic_test_runner::DEFAULT_PROTOCOL_VERSION_TIMEOUT; pub use abq_generic_test_runner::DEFAULT_RUNNER_TEST_TIMEOUT; -pub use assigned_run::{AssignedRun, AssignedRunStatus, GetAssignedRun}; +pub use assigned_run::{AssignedRun, AssignedRunKind, AssignedRunStatus, GetAssignedRun}; diff --git a/crates/abq_workers/src/negotiate.rs b/crates/abq_workers/src/negotiate.rs index f7749764..8cfa03a9 100644 --- a/crates/abq_workers/src/negotiate.rs +++ b/crates/abq_workers/src/negotiate.rs @@ -1,5 +1,6 @@ //! Module negotiate helps worker pools attach to queues. +use abq_reporting::writer::ColorWriter; use serde_derive::{Deserialize, Serialize}; use std::{ error::Error, @@ -14,10 +15,11 @@ use tokio::task::JoinHandle; use tracing::{error, instrument}; use crate::{ + assigned_run::AssignedRun, negotiate, runner_strategy::RunnerStrategyGenerator, workers::{WorkerContext, WorkerPool, WorkerPoolConfig, WorkersExit, WorkersExitStatus}, - AssignedRun, AssignedRunStatus, GetAssignedRun, + AssignedRunKind, AssignedRunStatus, GetAssignedRun, }; use abq_utils::{ auth::User, @@ -115,6 +117,8 @@ pub struct WorkersConfig { /// Max number of test suite run attempts pub max_run_number: u32, pub should_send_results: bool, + /// Handler to writing warnings to stderr, if any. + pub warning_writer: Box, } #[derive(Debug, Error)] @@ -228,13 +232,23 @@ impl WorkersNegotiator { protocol_version_timeout, test_timeout, should_send_results, + mut warning_writer, } = workers_config; - let some_runner_should_generate_manifest = match assigned { - AssignedRun::Fresh { + let AssignedRun { + kind: assigned_kind, + runner_test_command_differs, + } = assigned; + + if runner_test_command_differs { + write_test_command_differs_warning(&mut *warning_writer)?; + } + + let some_runner_should_generate_manifest = match assigned_kind { + AssignedRunKind::Fresh { should_generate_manifest, } => should_generate_manifest, - AssignedRun::Retry => false, + AssignedRunKind::Retry => false, }; let runner_strategy_generator = RunnerStrategyGenerator::new( @@ -244,7 +258,7 @@ impl WorkersNegotiator { work_server_addr, local_results_handler, max_run_number, - assigned, + assigned_kind, should_send_results, ); @@ -270,6 +284,21 @@ impl WorkersNegotiator { } } +fn write_test_command_differs_warning(writer: &mut dyn ColorWriter) -> std::io::Result<()> { + writer.warn_line( + "The passed test command differs from the one used by other ABQ workers for this run ID.", + )?; + writer.warn_line( + "This may cause unexpected behavior, such as tests being run multiple times or not at all.", + )?; + writer.warn_line("Please ensure that all workers are using the same test command.")?; + writer.warn_line("")?; + writer.warn_line("If you need to use environment variables like ABQ_RUNNER or ABQ_WORKER,")?; + writer + .warn_line("consider evaluating them inside of the test process instead of the command.")?; + Ok(()) +} + /// Waits to receive worker execution context from a queue negotiator. /// This call will block on the result, but be composed of only non-blocking calls to the queue /// negotiator. @@ -596,10 +625,12 @@ mod test { use super::{MessageToQueueNegotiator, QueueNegotiator, WorkersNegotiator}; use crate::assigned_run::fake::MockGetAssignedRun; + use crate::assigned_run::AssignedRun; use crate::negotiate::{AssignedRunStatus, WorkersConfig}; use crate::workers::{WorkerContext, WorkersExitStatus}; - use crate::{AssignedRun, DEFAULT_RUNNER_TEST_TIMEOUT}; + use crate::{AssignedRunKind, DEFAULT_RUNNER_TEST_TIMEOUT}; use abq_generic_test_runner::DEFAULT_PROTOCOL_VERSION_TIMEOUT; + use abq_reporting::writer::NoopColorWriter; use abq_test_utils::one_nonzero; use abq_utils::auth::{ build_strategies, Admin, AdminToken, ClientAuthStrategy, ServerAuthStrategy, User, @@ -621,6 +652,7 @@ mod test { }; use abq_utils::results_handler::NoopResultsHandler; use abq_utils::server_shutdown::ShutdownManager; + use abq_utils::test_command_hash::TestCommandHash; use abq_utils::tls::{ClientTlsStrategy, ServerTlsStrategy}; use abq_utils::{net_async, net_protocol}; use abq_with_protocol_version::with_protocol_version; @@ -865,8 +897,11 @@ mod test { )); let get_assigned_run = MockGetAssignedRun::new(|| { - AssignedRunStatus::Run(AssignedRun::Fresh { - should_generate_manifest: true, + AssignedRunStatus::Run(AssignedRun { + kind: AssignedRunKind::Fresh { + should_generate_manifest: true, + }, + runner_test_command_differs: false, }) }); @@ -898,12 +933,14 @@ mod test { protocol_version_timeout: DEFAULT_PROTOCOL_VERSION_TIMEOUT, test_timeout: DEFAULT_RUNNER_TEST_TIMEOUT, should_send_results: true, + warning_writer: Box::new(NoopColorWriter), }; let invoke_data = InvokeWork { run_id, batch_size_hint: one_nonzero(), test_strategy: TestStrategy::ByTest, + test_command_hash: TestCommandHash::random(), }; let mut workers = WorkersNegotiator::negotiate_and_start_pool( @@ -958,8 +995,11 @@ mod test { "0.0.0.0:0".parse().unwrap(), "0.0.0.0:0".parse().unwrap(), MockGetAssignedRun::new(|| { - AssignedRunStatus::Run(AssignedRun::Fresh { - should_generate_manifest: true, + AssignedRunStatus::Run(AssignedRun { + kind: AssignedRunKind::Fresh { + should_generate_manifest: true, + }, + runner_test_command_differs: false, }) }), ) diff --git a/crates/abq_workers/src/runner_strategy.rs b/crates/abq_workers/src/runner_strategy.rs index f98e13b0..3d4f3c07 100644 --- a/crates/abq_workers/src/runner_strategy.rs +++ b/crates/abq_workers/src/runner_strategy.rs @@ -24,7 +24,7 @@ use crate::{ results_handler::{MultiplexingResultsHandler, QueueResultsSender}, test_fetching, workers::{GetInitContext, InitContextResult, NotifyManifest, NotifyMaterialTestsAllRun}, - AssignedRun, + AssignedRunKind, }; pub struct RunnerStrategy { @@ -49,7 +49,7 @@ pub struct RunnerStrategyGenerator { work_server_addr: SocketAddr, local_results_handler: SharedResultsHandler, max_run_number: u32, - assigned: AssignedRun, + assigned: AssignedRunKind, should_send_results: bool, } @@ -61,7 +61,7 @@ impl RunnerStrategyGenerator { work_server_addr: SocketAddr, local_results_handler: SharedResultsHandler, max_run_number: u32, - assigned: AssignedRun, + assigned: AssignedRunKind, should_send_results: bool, ) -> Self { Self { @@ -91,8 +91,8 @@ impl StrategyGenerator for RunnerStrategyGenerator { } = &self; let sourcing_strategy = match assigned { - AssignedRun::Fresh { .. } => test_fetching::SourcingStrategy::Fresh, - AssignedRun::Retry => test_fetching::SourcingStrategy::Retry, + AssignedRunKind::Fresh { .. } => test_fetching::SourcingStrategy::Fresh, + AssignedRunKind::Retry => test_fetching::SourcingStrategy::Retry, }; let (tests_fetcher, results_retry_tracker) = test_fetching::Fetcher::new(