diff --git a/scenario.json b/scenario.json new file mode 100644 index 0000000..06e7d9e --- /dev/null +++ b/scenario.json @@ -0,0 +1,48 @@ +{ + "workers_count": 4, + "steps": [ + "StopAllShards", + { + "InvokeAndAwaitWorkersAsync": "Invoke, RestartShardManager, StartShards" + }, + "RestartShardManager", + { + "Sleep": { + "secs": 3, + "nanos": 0 + } + }, + { + "StartShards": 4 + }, + "WaitForInvokeAndAwaitResult", + "StopAllShards", + "RestartShardManager", + { + "StartShards": 4 + }, + "RestartShardManager", + { + "InvokeAndAwaitWorkersAsync": "StartShards, RestartShardManager, Invoke" + }, + "WaitForInvokeAndAwaitResult", + "StopAllShards", + "RestartShardManager", + { + "StartShards": 4 + }, + { + "StopShards": 3 + }, + { + "Sleep": { + "secs": 3, + "nanos": 0 + } + }, + { + "InvokeAndAwaitWorkersAsync": "StartShards(4), StopShards(3), Invoke" + }, + "WaitForInvokeAndAwaitResult" + ] +} diff --git a/scenario1.json b/scenario1.json new file mode 100644 index 0000000..75187da --- /dev/null +++ b/scenario1.json @@ -0,0 +1,20 @@ +{ + "workers_count": 4, + "steps": [ + "StopAllShards", + { + "InvokeAndAwaitWorkersAsync": "Invoke, RestartShardManager, StartShards" + }, + "RestartShardManager", + { + "Sleep": { + "secs": 3, + "nanos": 0 + } + }, + { + "StartShards": 4 + }, + "WaitForInvokeAndAwaitResult" + ] +} diff --git a/scenario2.json b/scenario2.json new file mode 100644 index 0000000..44d0e38 --- /dev/null +++ b/scenario2.json @@ -0,0 +1,15 @@ +{ + "workers_count": 4, + "steps": [ + "StopAllShards", + "RestartShardManager", + { + "StartShards": 4 + }, + "RestartShardManager", + { + "InvokeAndAwaitWorkersAsync": "StartShards, RestartShardManager, Invoke" + }, + "WaitForInvokeAndAwaitResult" + ] +} diff --git a/scenario3.json b/scenario3.json new file mode 100644 index 0000000..0942006 --- /dev/null +++ b/scenario3.json @@ -0,0 +1,23 @@ +{ + "workers_count": 4, + "steps": [ + "StopAllShards", + "RestartShardManager", + { + "StartShards": 4 + }, + { + "StopShards": 3 + }, + { + "Sleep": { + "secs": 3, + "nanos": 0 + } + }, + { + "InvokeAndAwaitWorkersAsync": "StartShards(4), StopShards(3), Invoke" + }, + "WaitForInvokeAndAwaitResult" + ] +} diff --git a/scenario4.json b/scenario4.json new file mode 100644 index 0000000..a172869 --- /dev/null +++ b/scenario4.json @@ -0,0 +1,41 @@ +{ + "workers_count": 4, + "steps": [ + "StopAllShards", + { + "Sleep": { + "secs": 10, + "nanos": 0 + } + }, + "RestartShardManager", + { + "Sleep": { + "secs": 10, + "nanos": 0 + } + }, + { + "StartShards": 4 + }, + { + "Sleep": { + "secs": 10, + "nanos": 0 + } + }, + { + "StopShards": 3 + }, + { + "Sleep": { + "secs": 10, + "nanos": 0 + } + }, + { + "InvokeAndAwaitWorkersAsync": "StartShards(4), StopShards(3), Invoke" + }, + "WaitForInvokeAndAwaitResult" + ] +} diff --git a/tests/cli.rs b/tests/cli.rs index 864cdee..5c2577f 100644 --- a/tests/cli.rs +++ b/tests/cli.rs @@ -11,6 +11,7 @@ use std::process::{Child, Command, Stdio}; #[derive(Debug, Clone)] pub struct CliConfig { short_args: bool, + quiet: bool, } impl CliConfig { @@ -44,7 +45,10 @@ pub struct CliLive { impl CliLive { pub fn with_short_args(&self) -> Self { CliLive { - config: CliConfig { short_args: true }, + config: CliConfig { + short_args: true, + quiet: self.config.quiet, + }, golem_template_port: self.golem_template_port, golem_worker_port: self.golem_worker_port, golem_cli_path: self.golem_cli_path.clone(), @@ -53,7 +57,10 @@ impl CliLive { pub fn with_long_args(&self) -> Self { CliLive { - config: CliConfig { short_args: false }, + config: CliConfig { + short_args: false, + quiet: self.config.quiet, + }, golem_template_port: self.golem_template_port, golem_worker_port: self.golem_worker_port, golem_cli_path: self.golem_cli_path.clone(), @@ -70,9 +77,14 @@ impl CliLive { context.golem_worker_service.local_http_port ); + let quiet = std::env::var("QUIET").is_ok(); + if golem_cli_path.exists() { Ok(CliLive { - config: CliConfig { short_args: false }, + config: CliConfig { + short_args: false, + quiet, + }, golem_template_port: context.golem_template_service.local_http_port, golem_worker_port: context.golem_worker_service.local_http_port, golem_cli_path, @@ -95,10 +107,12 @@ impl CliLive { } fn run_inner + Debug>(&self, args: &[S]) -> Result { - println!( - "Executing Golem CLI command: {} {args:?}", - self.golem_cli_path.to_str().unwrap_or("") - ); + if !self.config.quiet { + println!( + "Executing Golem CLI command: {} {args:?}", + self.golem_cli_path.to_str().unwrap_or("") + ); + } let output = Command::new(&self.golem_cli_path) .env("GOLEM_TEMPLATE_BASE_URL", self.template_base_url()) @@ -112,8 +126,10 @@ impl CliLive { let stdout = String::from_utf8_lossy(output.stdout.as_slice()).to_string(); let stderr = String::from_utf8_lossy(output.stderr.as_slice()).to_string(); - println!("CLI stdout: {stdout} for command {args:?}"); - println!("CLI stderr: {stderr} for command {args:?}"); + if !self.config.quiet { + println!("CLI stdout: {stdout} for command {args:?}"); + println!("CLI stderr: {stderr} for command {args:?}"); + } if !output.status.success() { return Err(format!( diff --git a/tests/sharding.rs b/tests/sharding.rs index 7ed7bcd..be584e7 100644 --- a/tests/sharding.rs +++ b/tests/sharding.rs @@ -7,24 +7,82 @@ use golem_cli::model::InvocationKey; use golem_client::model::VersionedWorkerId; use libtest_mimic::{Arguments, Conclusion, Failed, Trial}; use rand::prelude::*; +use serde::{Deserialize, Serialize}; use serde_json::Value; use std::collections::HashSet; -use std::sync::mpsc::Receiver; -use std::sync::Arc; +use std::path::PathBuf; +use std::sync::mpsc::{Receiver, Sender}; +use std::sync::{Arc, Mutex}; use std::time::Duration; use testcontainers::clients; pub mod cli; pub mod context; -fn run(context: ContextInfo) -> Conclusion { +#[derive(Debug, Clone, Serialize, Deserialize)] +struct Scenario { + pub workers_count: usize, + pub steps: Vec, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +enum Step { + StartShards(usize), + StopShards(usize), + StopAllShards, + RestartShardManager, + Sleep(Duration), + InvokeAndAwaitWorkersAsync(String), + WaitForInvokeAndAwaitResult, +} + +// fn default_scenario() -> Scenario { +// Scenario { +// workers_count: 4, +// steps: vec![ +// Step::StopAllShards, +// Step::InvokeAndAwaitWorkersAsync("Invoke, RestartShardManager, StartShards".to_string()), +// Step::RestartShardManager, +// Step::Sleep(Duration::from_secs(3)), +// Step::StartShards(4), +// Step::WaitForInvokeAndAwaitResult, +// Step::StopAllShards, +// Step::RestartShardManager, +// Step::StartShards(4), +// Step::RestartShardManager, +// Step::InvokeAndAwaitWorkersAsync("StartShards, RestartShardManager, Invoke".to_string()), +// Step::WaitForInvokeAndAwaitResult, +// Step::StopAllShards, +// Step::RestartShardManager, +// Step::StartShards(4), +// Step::StopShards(3), +// Step::Sleep(Duration::from_secs(3)), +// Step::InvokeAndAwaitWorkersAsync("StartShards(4), StopShards(3), Invoke".to_string()), +// Step::WaitForInvokeAndAwaitResult, +// ], +// } +// } + +fn read_scenario() -> Scenario { + let file_path_str = + std::env::var("GOLEM_TEST_SCENARIO").unwrap_or("./scenario.json".to_string()); + println!("Reading scenario from {file_path_str}"); + + let path = PathBuf::from(file_path_str); + + let file = std::fs::File::open(&path).unwrap(); + + serde_json::from_reader(file).unwrap() +} + +fn run(context: ContextInfo, env_command_tx: Sender, env_event_rx: Receiver) -> Conclusion { let args = Arguments::from_args(); let context = Arc::new(context); let mut tests = Vec::new(); - tests.append(&mut all(context.clone())); + tests.append(&mut all(context.clone(), env_command_tx, env_event_rx)); libtest_mimic::run(&args, tests) } @@ -33,45 +91,54 @@ fn main() -> Result<(), Failed> { env_logger::init(); let (tx, rx) = std::sync::mpsc::channel(); - let (stop_tx, stop_rx) = std::sync::mpsc::channel(); + let (env_command_tx, env_command_rx) = std::sync::mpsc::channel(); + let (env_event_tx, env_event_rx) = std::sync::mpsc::channel(); let context_handler = std::thread::spawn(move || { let docker = clients::Cli::default(); - let context = Context::start(&docker, EnvConfig::from_env_with_shards(0)).unwrap(); + let context = Context::start(&docker, EnvConfig::from_env_with_shards(3)).unwrap(); let context_info = context.info(); tx.send(context_info).unwrap(); - make_env_unstable(context, stop_rx); + env_handler(context, env_command_rx, env_event_tx); drop(docker); }); let context_info = rx.recv().unwrap(); - let res = run(context_info); + let res = run(context_info, env_command_tx, env_event_rx); - stop_tx.send(()).unwrap(); context_handler.join().unwrap(); res.exit() } -pub fn all(context: Arc) -> Vec { +fn all(context: Arc, env_command_tx: Sender, env_event_rx: Receiver) -> Vec { let cli = CliLive::make(&context).unwrap().with_long_args(); - let ctx = (context, cli); + let ctx = (context, read_scenario(), env_command_tx, Mutex::new(env_event_rx), cli); vec![Trial::test_in_context( format!("service_is_responsive_to_shard_changes"), - ctx.clone(), + ctx, service_is_responsive_to_shard_changes, )] } -enum Command { - StartShard, - StopShard, +enum EnvCommand { + StartShards(usize), + StopShards(usize), + StopAllShards, RestartShardManager, + Stop, +} + +enum EnvEvent { + StartShardsDone, + StopShardsDone, + StopAllShardsDone, + RestartShardManagerDone, } fn start_shard(context: &mut Context) { @@ -109,6 +176,12 @@ fn start_shard(context: &mut Context) { } } +fn start_shards(context: &mut Context, n: usize) { + for _ in 1..=n { + start_shard(context) + } +} + fn stop_shard(context: &mut Context) { let len = context.worker_executors.worker_executors.len(); @@ -122,6 +195,16 @@ fn stop_shard(context: &mut Context) { drop(we) // Not needed. Just making it explicit; } +fn stop_shards(context: &mut Context, n: usize) { + for _ in 1..=n { + stop_shard(context) + } +} + +fn stop_all_shards(context: &mut Context) { + stop_shards(context, context.worker_executors.worker_executors.len()) +} + fn reload_shard_manager(context: &mut Context) { let old_shard_manager = context.shard_manager.take(); drop(old_shard_manager); // Important! We should stop the old one first. @@ -133,44 +216,40 @@ fn reload_shard_manager(context: &mut Context) { } } -fn make_env_unstable(context: Context, stop_rx: Receiver<()>) { +fn env_handler(context: Context, command_rx: Receiver, event_tx: Sender) { let mut context = context; println!("!!! Starting Golem Sharding Tester"); - fn worker(context: &mut Context) { - let mut commands = vec![ - Command::StartShard, - Command::StopShard, - Command::RestartShardManager, - ]; - let mut rng = rand::thread_rng(); - commands.shuffle(&mut rng); - match commands[0] { - Command::StartShard => { - println!("!!! Golem Sharding Tester starting shard"); - start_shard(context); - println!("!!! Golem Sharding Tester started shard"); + loop { + match command_rx.recv().unwrap() { + EnvCommand::StartShards(n) => { + println!("!!! Golem Sharding Tester starting shards({n})"); + start_shards(&mut context, n); + println!("!!! Golem Sharding Tester started shards({n})"); + event_tx.send(EnvEvent::StartShardsDone).unwrap(); } - Command::StopShard => { - println!("!!! Golem Sharding Tester stopping shard"); - stop_shard(context); - println!("!!! Golem Sharding Tester stopped shard"); + EnvCommand::StopShards(n) => { + println!("!!! Golem Sharding Tester stopping shards{n}"); + stop_shards(&mut context, n); + println!("!!! Golem Sharding Tester stopped shard{n}"); + event_tx.send(EnvEvent::StopShardsDone).unwrap(); } - Command::RestartShardManager => { + EnvCommand::StopAllShards => { + println!("!!! Golem Sharding Tester stopping all shards"); + stop_all_shards(&mut context); + println!("!!! Golem Sharding Tester stopped all shard"); + event_tx.send(EnvEvent::StopAllShardsDone).unwrap(); + } + EnvCommand::RestartShardManager => { println!("!!! Golem Sharding Tester reloading shard manager"); - reload_shard_manager(context); + reload_shard_manager(&mut context); println!("!!! Golem Sharding Tester reloaded shard manager"); + event_tx.send(EnvEvent::RestartShardManagerDone).unwrap(); } + EnvCommand::Stop => break, } } - - while stop_rx.try_recv().is_err() { - let mut rng = rand::thread_rng(); - let n = rng.gen_range(1..10); - std::thread::sleep(Duration::from_secs(n)); - worker(&mut context); - } } fn upload_and_start_worker( @@ -298,7 +377,13 @@ fn get_invocation_key_invoke_and_await_with_retry( } fn service_is_responsive_to_shard_changes( - (context, cli): (Arc, CliLive), + (context, scenario, env_command_tx, env_event_rx, cli): ( + Arc, + Scenario, + Sender, + Mutex>, + CliLive, + ), ) -> Result<(), Failed> { let template_name = "echo-service-1".to_string(); @@ -319,7 +404,7 @@ fn service_is_responsive_to_shard_changes( let mut workers_with_names = Vec::new(); - for n in 1..=4 { + for n in 1..=scenario.workers_count { println!("*** WORKER {n} STARTING ***"); let worker_name = format!("echo-service-{n}"); let worker_id = upload_and_start_worker(&template, &worker_name, &cli)?; @@ -329,44 +414,103 @@ fn service_is_responsive_to_shard_changes( println!("*** ALL WORKERS STARTED ***"); - fn invoke_and_await_workers( - workers: &[(VersionedWorkerId, String)], - cli: &CliLive, - ) -> Result<(), Failed> { - let mut tasks = Vec::new(); - - for (worker, name) in workers { - let name = name.clone(); - let template_id = worker.worker_id.template_id.to_string(); - let cli = cli.clone(); - tasks.push(std::thread::spawn(move || { - get_invocation_key_invoke_and_await_with_retry( - &template_id, - &name, - "golem:it/api/echo", - r#"["Hello"]"#, - &cli, - ) - })); + let (worker_command_tx, worker_command_rx) = std::sync::mpsc::channel(); + let (worker_event_tx, worker_event_rx) = std::sync::mpsc::channel(); + + let workers_handler = std::thread::spawn(move || { + let workers_with_names = workers_with_names; + let cli = cli; + + loop { + match worker_command_rx.recv().unwrap() { + WorkerCommand::InvokeAndAwaitWorkers(name) => { + println!("*** INVOKING WORKERS: {name} ***"); + invoke_and_await_workers(&workers_with_names, &cli).unwrap(); + println!("*** INVOKING WORKERS {name} DONE ***"); + worker_event_tx + .send(WorkerEvent::InvokeAndAwaitWorkersCompleted(name)) + .unwrap(); + } + WorkerCommand::Stop => break, + } } + }); - for task in tasks { - let _ = task.join().unwrap()?; + for step in scenario.steps { + match step { + Step::StartShards(n) => { + env_command_tx.send(EnvCommand::StartShards(n)).unwrap(); + let _ = env_event_rx.lock().unwrap().recv().unwrap(); + } + Step::StopShards(n) => { + env_command_tx.send(EnvCommand::StopShards(n)).unwrap(); + let _ = env_event_rx.lock().unwrap().recv().unwrap(); + } + Step::StopAllShards => { + env_command_tx.send(EnvCommand::StopAllShards).unwrap(); + let _ = env_event_rx.lock().unwrap().recv().unwrap(); + } + Step::RestartShardManager => { + env_command_tx + .send(EnvCommand::RestartShardManager) + .unwrap(); + let _ = env_event_rx.lock().unwrap().recv().unwrap(); + } + Step::Sleep(duration) => { + std::thread::sleep(duration); + } + Step::InvokeAndAwaitWorkersAsync(name) => { + worker_command_tx + .send(WorkerCommand::InvokeAndAwaitWorkers(name)) + .unwrap(); + } + Step::WaitForInvokeAndAwaitResult => { + let _ = worker_event_rx.recv().unwrap(); + } } - - Ok(()) } - for c in 0..2 { - if c != 0 { - std::thread::sleep(Duration::from_secs(10)); - } - println!("*** INVOKING WORKERS {c} ***"); - invoke_and_await_workers(&workers_with_names, &cli)?; - println!("*** INVOKING WORKERS {c} DONE ***"); + worker_command_tx.send(WorkerCommand::Stop).unwrap(); + env_command_tx.send(EnvCommand::Stop).unwrap(); + + workers_handler.join().unwrap(); + + Ok(()) +} + +enum WorkerCommand { + InvokeAndAwaitWorkers(String), + Stop, +} + +enum WorkerEvent { + InvokeAndAwaitWorkersCompleted(String), +} + +fn invoke_and_await_workers( + workers: &[(VersionedWorkerId, String)], + cli: &CliLive, +) -> Result<(), Failed> { + let mut tasks = Vec::new(); + + for (worker, name) in workers { + let name = name.clone(); + let template_id = worker.worker_id.template_id.to_string(); + let cli = cli.clone(); + tasks.push(std::thread::spawn(move || { + get_invocation_key_invoke_and_await_with_retry( + &template_id, + &name, + "golem:it/api/echo", + r#"["Hello"]"#, + &cli, + ) + })); } - println!("*** TEST COMPLETED ***"); + for task in tasks { + let _ = task.join().unwrap()?; + } Ok(()) }