Skip to content

Commit

Permalink
Log warnings when workers have different commands (#74)
Browse files Browse the repository at this point in the history
  • Loading branch information
ayazhafiz authored Aug 1, 2023
1 parent 8567a8c commit b6dfff7
Show file tree
Hide file tree
Showing 24 changed files with 819 additions and 289 deletions.
11 changes: 9 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,14 @@
## 1.6.2

ABQ 1.6.2 is a patch release fixing an issue that could result in
denial-of-service of an ABQ queue due to large test results.
ABQ 1.6.2 is a patch release.

An issue that could result in denial-of-service of an ABQ queue due to large
test results is fixed.

ABQ will now log a warning when a worker for a given run ID is using a test
command different from any other worker, either in the same run or during
retries. ABQ may not function properly if the test command for a run ID changes
between workers executing tests for that run ID.

## 1.6.1

Expand Down
39 changes: 37 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ signal-hook-tokio = { version = "0.3.1", features = ["futures-v0_3"] }
rand = "0.8.5"
rand_chacha = "0.3.1"

blake3 = "=1.4.0"

tracing = { version = "0.1.37", features = ["release_max_level_info"] }
tracing-subscriber = { version = "0.3.16", features = ["env-filter", "json"] }
tracing-appender = "0.2.2"
Expand Down
2 changes: 1 addition & 1 deletion crates/abq_cli/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "abq"
version = "1.6.1"
version = "1.6.2"
edition = "2021"

[dependencies]
Expand Down
4 changes: 4 additions & 0 deletions crates/abq_cli/src/reporting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,10 @@ impl StdoutPreferences {
pub fn stdout_stream(&self) -> impl termcolor::WriteColor {
StandardStream::stdout(self.color)
}

pub fn stderr_stream(&self) -> impl termcolor::WriteColor {
StandardStream::stderr(self.color)
}
}

fn reporter_from_kind(
Expand Down
4 changes: 4 additions & 0 deletions crates/abq_cli/src/workers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,11 @@ pub async fn start_workers_standalone(
run_id: run_id.clone(),
batch_size_hint: batch_size,
test_strategy,
test_command_hash: runner_kind.command_hash(),
};

let stderr_writer = Box::new(stdout_preferences.stderr_stream());

let mut worker_pool = WorkersNegotiator::negotiate_and_start_pool(
WorkersConfig {
tag,
Expand All @@ -101,6 +104,7 @@ pub async fn start_workers_standalone(
results_batch_size_hint: batch_size.get(),
max_run_number,
should_send_results: execution_mode == ExecutionMode::WriteNormal,
warning_writer: stderr_writer,
},
queue_negotiator,
client_opts,
Expand Down
86 changes: 86 additions & 0 deletions crates/abq_cli/tests/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4649,3 +4649,89 @@ fn test_run_with_pat_and_run_id_that_doesnt_exist() {

term(queue_proc);
}

#[test]
#[with_protocol_version]
#[serial]
fn warn_on_different_runner_command() {
let name = "warn_on_different_runner_command";
let conf = CSConfigOptions {
use_auth_token: true,
tls: true,
};

let (queue_proc, queue_addr) = setup_queue!(name, conf);

let proto = AbqProtocolVersion::V0_2.get_supported_witness().unwrap();

let manifest = vec![TestOrGroup::test(Test::new(
proto,
"test1".to_string(),
[],
Default::default(),
))];

let manifest = ManifestMessage::new(Manifest::new(manifest, Default::default()));

let simulation = [
Connect,
//
// Write spawn message
OpaqueWrite(pack(legal_spawned_message(proto))),
//
// Write the manifest if we need to.
// Otherwise handle the one test.
IfGenerateManifest {
then_do: vec![OpaqueWrite(pack(&manifest))],
else_do: vec![
//
// Read init context message + write ACK
OpaqueRead,
OpaqueWrite(pack(InitSuccessMessage::new(proto))),
// Read first test, write okay
OpaqueRead,
OpaqueWrite(pack(RawTestResultMessage::fake(proto))),
],
},
//
// Finish
Exit(0),
];

let packed = pack_msgs_to_disk(simulation);

let test_args = |instance: usize| {
let simulator = native_runner_simulation_bin();
let simfile_path = packed.path.display().to_string();
let args = vec![
format!("test"),
format!("--worker=0"),
format!("--queue-addr={queue_addr}"),
format!("--run-id=test-run-id"),
];
let mut args = conf.extend_args_for_client(args);
args.extend([s!("--"), simulator, simfile_path]);
// Add a fake argument to differentiate the two runs.
args.push("--fake-instance-arg".to_string());
args.push(instance.to_string());
args
};

let instance1 = Abq::new(format!("{name}_inst1")).args(test_args(0)).run();

assert!(instance1.exit_status.success());

let instance2 = Abq::new(format!("{name}_inst2")).args(test_args(1)).run();

assert!(instance2.exit_status.success());

let stdout = instance2.stdout;
let stderr = instance2.stderr;

assert!(
stderr.contains("WARNING: The passed test command differs from the one used by other ABQ workers for this run ID."),
"STDOUT:\n{stdout}\nSTDERR:\n{stderr}"
);

term(queue_proc);
}
1 change: 1 addition & 0 deletions crates/abq_queue/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ s3 = [
]

[dev-dependencies]
abq_reporting = { path = "../abq_reporting" }
abq_utils = { path = "../abq_utils", features = ["expose-native-protocols"] }
abq_test_utils = { path = "../abq_test_support/abq_test_utils" }
abq_with_protocol_version = { path = "../abq_test_support/with_protocol_version" }
Expand Down
67 changes: 66 additions & 1 deletion crates/abq_queue/src/persistence/run_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use abq_utils::{
error::OpaqueResult,
exit::ExitCode,
net_protocol::{entity::Entity, runners::MetadataMap, workers::RunId},
test_command_hash::TestCommandHash,
};
use serde_derive::{Deserialize, Serialize};
use thiserror::Error;
Expand All @@ -27,6 +28,9 @@ pub struct RunState {
pub new_worker_exit_code: ExitCode,
pub init_metadata: MetadataMap,
pub seen_workers: Vec<Entity>,
/// NB: Optional because this did not exist prior to ABQ 1.6.2.
#[serde(skip_serializing_if = "Option::is_none")]
pub test_command_hash: Option<TestCommandHash>,
}

impl RunState {
Expand All @@ -36,6 +40,7 @@ impl RunState {
new_worker_exit_code: ExitCode::SUCCESS,
init_metadata: MetadataMap::new(),
seen_workers: Vec::new(),
test_command_hash: Some(TestCommandHash::random()),
}
}
}
Expand Down Expand Up @@ -135,6 +140,7 @@ mod test {
new_worker_exit_code: ExitCode::SUCCESS,
init_metadata: MetadataMap::new(),
seen_workers: vec![],
test_command_hash: Some(TestCommandHash::from_command("yarn", &["jest".to_owned()])),
};
let serialized = SerializableRunState::new(run_state.clone());
let serialized_bytes = serialized.serialize().unwrap();
Expand All @@ -146,7 +152,41 @@ mod test {
"schema_version": 1,
"new_worker_exit_code": 0,
"init_metadata": {},
"seen_workers": []
"seen_workers": [],
"test_command_hash": [
115,
226,
225,
160,
14,
172,
165,
205,
140,
100,
188,
76,
56,
80,
143,
200,
177,
239,
232,
249,
38,
80,
127,
129,
4,
157,
230,
41,
123,
72,
130,
80
]
}
"###);
}
Expand All @@ -170,12 +210,37 @@ mod test {
));
}

#[test]
fn test_deserialize_missing_test_command_hash() {
let serialized = r###"
{
"schema_version": 1,
"new_worker_exit_code": 0,
"init_metadata": {},
"seen_workers": []
}
"###;
let result = SerializableRunState::deserialize(serialized.as_bytes()).unwrap();
let result = result.into_run_state();

assert_eq!(
result,
RunState {
new_worker_exit_code: ExitCode::SUCCESS,
init_metadata: MetadataMap::new(),
seen_workers: vec![],
test_command_hash: None,
}
);
}

#[test]
fn test_deserialize_compatible_schema_but_different_version() {
let run_state = RunState {
new_worker_exit_code: ExitCode::SUCCESS,
init_metadata: MetadataMap::new(),
seen_workers: vec![],
test_command_hash: Some(TestCommandHash::random()),
};
let serialized = SerializableRunState(SerializedRunStateInner {
schema_version: SchemaVersion {
Expand Down
Loading

0 comments on commit b6dfff7

Please sign in to comment.