From 60c5795e14c27c9db13bf3deb9ec04bd0a321a10 Mon Sep 17 00:00:00 2001 From: Raimundo Henriques Date: Wed, 15 Nov 2023 16:32:38 +0000 Subject: [PATCH] feat: integration test for quitting participant --- src/main.rs | 2 +- tests/cli.rs | 133 +++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 134 insertions(+), 1 deletion(-) diff --git a/src/main.rs b/src/main.rs index 681cacb..031ebad 100644 --- a/src/main.rs +++ b/src/main.rs @@ -389,7 +389,7 @@ async fn main() -> Result<(), Box> { let disconnected = disconnected.1.0.clone(); - if swarm.connected_peers().count() == 0 { + if swarm.connected_peers().count() == 0 && !is_leader { println!("The benchmark leader cancelled the session."); std::process::exit(1); } diff --git a/tests/cli.rs b/tests/cli.rs index 780a30f..489dac9 100644 --- a/tests/cli.rs +++ b/tests/cli.rs @@ -70,6 +70,139 @@ fn invalid_address() -> Result<(), Box> { Ok(()) } +#[test] +fn quit_and_rejoin_session() -> Result<(), Box> { + let mut new_session = new_command("foo", None, "tests/test_files/valid_json.json")?; + + let mut leader = new_session + .stdout(Stdio::piped()) + .stdin(Stdio::piped()) + .spawn()?; + let stdout = leader.stdout.take().unwrap(); + let reader = BufReader::new(stdout); + let stdin = leader.stdin.take().unwrap(); + let mut writer = BufWriter::new(stdin); + let mut lines = reader.lines(); + + let address = loop { + if let Some(Ok(l)) = lines.next() { + if l.contains("--address=/ip4/") { + break l + .split(" ") + .find(|s| s.contains("--address=/ip4/")) + .unwrap() + .replace("--address=", ""); + } + } + }; + + let bar_address = address.clone(); + let bar_handle = thread::spawn(move || { + let mut participant = new_command( + "bar", + Some(&bar_address), + "tests/test_files/valid_json.json", + ) + .unwrap() + .stdout(Stdio::piped()) + .spawn() + .unwrap(); + + let stdout = participant.stdout.take().unwrap(); + let reader = BufReader::new(stdout); + let mut lines = reader.lines(); + + while let Some(Ok(l)) = lines.next() { + println!("bar > {l}"); + if l.contains("- foo") { + participant.kill().unwrap(); + // break; + } + } + }); + + // while let Some(Ok(l)) = lines.next() { + // println!("foo > {l}"); + // if l.contains("bar disconnected") { + // break; + // } + // } + + // bar_handle.join().unwrap(); + + let mut threads = vec![]; + for name in ["baz", "qux"] { + sleep(Duration::from_millis(200)); + let address = address.clone(); + threads.push(thread::spawn(move || { + let mut participant = + new_command(name, Some(&address), "tests/test_files/valid_json.json") + .unwrap() + .stdin(Stdio::piped()) + .stdout(Stdio::piped()) + .spawn() + .unwrap(); + + let stdout = participant.stdout.take().unwrap(); + let reader = BufReader::new(stdout); + let stdin = participant.stdin.take().unwrap(); + let mut writer = BufWriter::new(stdin); + let mut lines = reader.lines(); + + while let Some(Ok(l)) = lines.next() { + println!("{name} > {l}"); + + if l.contains("Do you want to join the benchmark?") { + sleep(Duration::from_millis(200)); + writeln!(writer, "y").unwrap(); + writer.flush().unwrap(); + } + + if l.contains("results") { + participant.kill().unwrap(); + return; + } + } + })); + } + + let mut participant_count = 1; + let mut benchmark_complete = false; + println!("Waiting for next line from foo"); + println!("lines.next(): {:?}", lines.next()); + while let Some(Ok(l)) = lines.next() { + println!("foo > {}", l); + if l.contains("- baz") || l.contains("- qux") { + participant_count += 1; + } + if participant_count == 3 { + sleep(Duration::from_millis(200)); + writeln!(writer, "").unwrap(); + writer.flush().unwrap(); + } + if l.contains("results") { + benchmark_complete = true; + break; + } + } + + sleep(Duration::from_millis(200)); + leader.kill()?; + + for t in threads { + t.join().unwrap(); + } + + if benchmark_complete { + Ok(()) + } else { + Err(Box::new(Error::new( + ErrorKind::Other, + "Could not complete benchmark", + ))) + } +} + #[test] fn session() -> Result<(), Box> { let mut new_session = new_command("foo", None, "tests/test_files/valid_json.json")?;