diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 3182005..70598ac 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -44,13 +44,21 @@ jobs: run: cargo clippy -- -Dwarnings - name: Build run: cargo build - - name: Tests + - name: Integration tests env: GOLEM_DOCKER_SERVICES: true GOLEM_TEST_TEMPLATES: ./test-templates RUST_LOG: info CI: true - run: cargo test + run: cargo test --test integration + - name: Sharding tests + env: + GOLEM_DOCKER_SERVICES: true + GOLEM_TEST_TEMPLATES: ./test-templates + RUST_LOG: info + CI: true + run: cargo test --test sharding + timeout-minutes: 10 publish: needs: [build] if: "startsWith(github.ref, 'refs/tags/v')" diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index b88cbb4..d37c058 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -9,13 +9,20 @@ You can run all tests with To run individual tests you should first build all executables with `./scripts/build-all.sh` and then run tests by name: ```shell -GOLEM_DOCKER_SERVICES=true GOLEM_TEST_TEMPLATES="./test-templates" cargo test worker_new_instance +GOLEM_DOCKER_SERVICES=true GOLEM_TEST_TEMPLATES="./test-templates" cargo test --test integration worker_new_instance ``` With `QUIET=true` you can hide services output: ```shell -QUIET=true GOLEM_DOCKER_SERVICES=true GOLEM_TEST_TEMPLATES="./test-templates" cargo test +QUIET=true GOLEM_DOCKER_SERVICES=true GOLEM_TEST_TEMPLATES="./test-templates" cargo test --test integration ``` This way tests will use configured versions of golem docker images. To run tests against the latest binaries without docker - see [`golem-services` CONTRIBUTING.md](https://github.com/golemcloud/golem-services/blob/main/CONTRIBUTING.md) + +## Running sharding tests + +Same as integration tests, but with `--test sharding` instead of `--test integration`: +```shell +QUIET=true GOLEM_DOCKER_SERVICES=true GOLEM_TEST_TEMPLATES="./test-templates" cargo test --test sharding +``` diff --git a/Cargo.lock b/Cargo.lock index 3616e9d..95fcbec 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1722,6 +1722,7 @@ dependencies = [ name = "golem-cli" version = "0.0.0" dependencies = [ + "async-recursion", "async-trait", "chrono", "clap", @@ -1742,6 +1743,7 @@ dependencies = [ "log", "native-tls", "postgres", + "rand 0.8.5", "redis", "reqwest", "serde", diff --git a/Cargo.toml b/Cargo.toml index 388ec6f..7db0cad 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,6 +14,11 @@ name = "integration" path = "tests/main.rs" harness = false +[[test]] +name = "sharding" +path = "tests/sharding.rs" +harness = false + [dependencies] async-trait = "0.1.76" chrono = { version = "0.4.31", features = ["serde"] } @@ -46,6 +51,7 @@ h2 = "0.3.24" hyper = "0.14.28" tower = "0.4.13" testcontainers-modules = { version = "0.3.2", features = ["postgres", "redis"] } +rand = "0.8.5" [dev-dependencies] env_logger = "0.11.1" @@ -59,6 +65,8 @@ testcontainers-modules = { version = "0.3.2", features = ["postgres", "redis"] } tokio-postgres = "0.7.10" tonic = "0.10.2" tonic-health = "0.10.2" +rand = "0.8.5" +async-recursion = "1.0.5" [features] default = ["stubgen"] diff --git a/scripts/it.sh b/scripts/it.sh index 0e317f4..afa1239 100755 --- a/scripts/it.sh +++ b/scripts/it.sh @@ -6,4 +6,4 @@ script_full_path=$(dirname "$0") cd "${script_full_path}"/.. || exit ./scripts/build-all.sh -GOLEM_DOCKER_SERVICES=true GOLEM_TEST_TEMPLATES="./test-templates" RUST_LOG=info cargo test +GOLEM_DOCKER_SERVICES=true GOLEM_TEST_TEMPLATES="./test-templates" RUST_LOG=info cargo test --test integration diff --git a/test-templates/option-service.wasm b/test-templates/option-service.wasm new file mode 100755 index 0000000..b1f7f03 Binary files /dev/null and b/test-templates/option-service.wasm differ diff --git a/tests/context.rs b/tests/context.rs index 13f7ff1..cf57bd5 100644 --- a/tests/context.rs +++ b/tests/context.rs @@ -16,7 +16,7 @@ use std::path::PathBuf; use testcontainers::clients; const NETWORK: &str = "golem_test_network"; -const TAG: &str = "v0.0.63"; +const TAG: &str = "v0.0.67"; #[derive(Debug, Clone)] pub struct EnvConfig { @@ -27,6 +27,7 @@ pub struct EnvConfig { pub wasm_root: PathBuf, pub local_golem: bool, pub db_type: DbType, + pub n_worker_executors: usize, } #[derive(Debug, Clone)] @@ -49,7 +50,7 @@ impl DbType { } impl EnvConfig { - pub fn from_env() -> EnvConfig { + pub fn from_env_with_shards(shards: usize) -> EnvConfig { EnvConfig { verbose: std::env::var("CI").is_err(), on_ci: std::env::var("CI").is_ok(), @@ -60,24 +61,28 @@ impl EnvConfig { ), local_golem: std::env::var("GOLEM_DOCKER_SERVICES").is_err(), db_type: DbType::from_env(), + n_worker_executors: shards, } } + + pub fn from_env() -> EnvConfig { + Self::from_env_with_shards(3) + } } pub struct Context<'docker_client> { - env: EnvConfig, - db: Db<'docker_client>, - redis: Redis<'docker_client>, - shard_manager: ShardManager<'docker_client>, - golem_template_service: GolemTemplateService<'docker_client>, - golem_worker_service: GolemWorkerService<'docker_client>, - worker_executors: WorkerExecutors<'docker_client>, + pub docker: &'docker_client clients::Cli, + pub env: EnvConfig, + pub db: Db<'docker_client>, + pub redis: Redis<'docker_client>, + pub shard_manager: Option>, + pub golem_template_service: GolemTemplateService<'docker_client>, + pub golem_worker_service: GolemWorkerService<'docker_client>, + pub worker_executors: WorkerExecutors<'docker_client>, } impl Context<'_> { - pub fn start(docker: &clients::Cli) -> Result { - let env_config = EnvConfig::from_env(); - + pub fn start(docker: &clients::Cli, env_config: EnvConfig) -> Result { println!("Starting context with env config: {env_config:?}"); let db = Db::start(docker, &env_config)?; @@ -105,10 +110,11 @@ impl Context<'_> { )?; Ok(Context { + docker, env: env_config, db, redis, - shard_manager, + shard_manager: Some(shard_manager), golem_template_service, golem_worker_service, worker_executors, @@ -120,7 +126,7 @@ impl Context<'_> { env: self.env.clone(), db: self.db.info(), redis: self.redis.info(), - shard_manager: self.shard_manager.info(), + shard_manager: self.shard_manager.as_ref().unwrap().info(), golem_template_service: self.golem_template_service.info(), golem_worker_service: self.golem_worker_service.info(), worker_executors: self.worker_executors.info(), diff --git a/tests/context/worker.rs b/tests/context/worker.rs index ede19a5..13c4098 100644 --- a/tests/context/worker.rs +++ b/tests/context/worker.rs @@ -17,6 +17,7 @@ use tonic_health::pb::health_client::HealthClient; use tonic_health::pb::HealthCheckRequest; pub struct WorkerExecutor<'docker_client> { + pub shard_id: u16, host: String, port: u16, inner: WorkerExecutorInner<'docker_client>, @@ -135,6 +136,7 @@ impl<'docker_client> WorkerExecutor<'docker_client> { let node = docker.run(image); Ok(WorkerExecutor { + shard_id, host: name, port, inner: WorkerExecutorInner::Docker(node), @@ -264,6 +266,7 @@ impl<'docker_client> WorkerExecutor<'docker_client> { println!("Worker Executor {shard_id} online"); Ok(WorkerExecutor { + shard_id, host: "localhost".to_string(), port, inner: WorkerExecutorInner::Process(child), @@ -365,7 +368,7 @@ impl Drop for WorkerExecutor<'_> { } pub struct WorkerExecutors<'docker_client> { - worker_executors: Vec>, + pub worker_executors: Vec>, } impl<'docker_client> WorkerExecutors<'docker_client> { @@ -377,7 +380,7 @@ impl<'docker_client> WorkerExecutors<'docker_client> { template: &GolemTemplateServiceInfo, shard_manager: &ShardManagerInfo, ) -> Result, Failed> { - let shards = 3; + let shards = env_config.n_worker_executors; let mut worker_executors = Vec::with_capacity(shards); diff --git a/tests/main.rs b/tests/main.rs index 46a67c9..dfa50b4 100644 --- a/tests/main.rs +++ b/tests/main.rs @@ -1,4 +1,4 @@ -use crate::context::Context; +use crate::context::{Context, EnvConfig}; use libtest_mimic::{Arguments, Conclusion, Failed}; use std::sync::Arc; use testcontainers::clients; @@ -25,7 +25,7 @@ fn main() -> Result<(), Failed> { env_logger::init(); let docker = clients::Cli::default(); - let context = Context::start(&docker)?; + let context = Context::start(&docker, EnvConfig::from_env())?; let res = run(&context); diff --git a/tests/sharding.rs b/tests/sharding.rs new file mode 100644 index 0000000..7ed7bcd --- /dev/null +++ b/tests/sharding.rs @@ -0,0 +1,372 @@ +use crate::cli::{Cli, CliLive}; +use crate::context::shard_manager::ShardManager; +use crate::context::worker::WorkerExecutor; +use crate::context::{Context, ContextInfo, EnvConfig}; +use golem_cli::clients::template::TemplateView; +use golem_cli::model::InvocationKey; +use golem_client::model::VersionedWorkerId; +use libtest_mimic::{Arguments, Conclusion, Failed, Trial}; +use rand::prelude::*; +use serde_json::Value; +use std::collections::HashSet; +use std::sync::mpsc::Receiver; +use std::sync::Arc; +use std::time::Duration; +use testcontainers::clients; + +pub mod cli; +pub mod context; + +fn run(context: ContextInfo) -> Conclusion { + let args = Arguments::from_args(); + + let context = Arc::new(context); + + let mut tests = Vec::new(); + + tests.append(&mut all(context.clone())); + + libtest_mimic::run(&args, tests) +} + +fn main() -> Result<(), Failed> { + env_logger::init(); + + let (tx, rx) = std::sync::mpsc::channel(); + let (stop_tx, stop_rx) = std::sync::mpsc::channel(); + + let context_handler = std::thread::spawn(move || { + let docker = clients::Cli::default(); + let context = Context::start(&docker, EnvConfig::from_env_with_shards(0)).unwrap(); + + let context_info = context.info(); + + tx.send(context_info).unwrap(); + + make_env_unstable(context, stop_rx); + + drop(docker); + }); + + let context_info = rx.recv().unwrap(); + + let res = run(context_info); + + stop_tx.send(()).unwrap(); + context_handler.join().unwrap(); + + res.exit() +} + +pub fn all(context: Arc) -> Vec { + let cli = CliLive::make(&context).unwrap().with_long_args(); + let ctx = (context, cli); + vec![Trial::test_in_context( + format!("service_is_responsive_to_shard_changes"), + ctx.clone(), + service_is_responsive_to_shard_changes, + )] +} + +enum Command { + StartShard, + StopShard, + RestartShardManager, +} + +fn start_shard(context: &mut Context) { + let used_ids: HashSet = context + .worker_executors + .worker_executors + .iter() + .map(|we| we.shard_id) + .collect(); + let mut ids = (0..10) + .into_iter() + .filter(|i| !used_ids.contains(i)) + .collect::>(); + let mut rng = thread_rng(); + ids.shuffle(&mut rng); + + match ids.get(0) { + Some(id) => { + match WorkerExecutor::start( + context.docker, + *id, + &context.env, + &context.redis.info(), + &context.golem_worker_service.info(), + &context.golem_template_service.info(), + &context.shard_manager.as_ref().unwrap().info(), + ) { + Ok(we) => context.worker_executors.worker_executors.push(we), + Err(e) => { + println!("Failed to start worker: {e:?}"); + } + } + } + None => {} + } +} + +fn stop_shard(context: &mut Context) { + let len = context.worker_executors.worker_executors.len(); + + if len == 0 { + return; + } + + let mut rng = thread_rng(); + let i = rng.gen_range(0..len); + let we = context.worker_executors.worker_executors.remove(i); + drop(we) // Not needed. Just making it explicit; +} + +fn reload_shard_manager(context: &mut Context) { + let old_shard_manager = context.shard_manager.take(); + drop(old_shard_manager); // Important! We should stop the old one first. + match ShardManager::start(context.docker, &context.env, &context.redis.info()) { + Ok(shard_manager) => context.shard_manager = Some(shard_manager), + Err(e) => { + println!("!!! Failed to start shard manager: {e:?}"); + } + } +} + +fn make_env_unstable(context: Context, stop_rx: Receiver<()>) { + let mut context = context; + + println!("!!! Starting Golem Sharding Tester"); + + fn worker(context: &mut Context) { + let mut commands = vec![ + Command::StartShard, + Command::StopShard, + Command::RestartShardManager, + ]; + let mut rng = rand::thread_rng(); + commands.shuffle(&mut rng); + match commands[0] { + Command::StartShard => { + println!("!!! Golem Sharding Tester starting shard"); + start_shard(context); + println!("!!! Golem Sharding Tester started shard"); + } + Command::StopShard => { + println!("!!! Golem Sharding Tester stopping shard"); + stop_shard(context); + println!("!!! Golem Sharding Tester stopped shard"); + } + Command::RestartShardManager => { + println!("!!! Golem Sharding Tester reloading shard manager"); + reload_shard_manager(context); + println!("!!! Golem Sharding Tester reloaded shard manager"); + } + } + } + + while stop_rx.try_recv().is_err() { + let mut rng = rand::thread_rng(); + let n = rng.gen_range(1..10); + std::thread::sleep(Duration::from_secs(n)); + worker(&mut context); + } +} + +fn upload_and_start_worker( + template: &TemplateView, + worker_name: &str, + cli: &CliLive, +) -> Result { + let cfg = &cli.config; + + let worker_id: VersionedWorkerId = cli.run(&[ + "worker", + "add", + &cfg.arg('w', "worker-name"), + &worker_name, + &cfg.arg('T', "template-id"), + &template.template_id, + ])?; + + Ok(worker_id) +} + +fn get_invocation_key( + template_id: &str, + worker_name: &str, + cli: &CliLive, +) -> Result { + let cfg = &cli.config; + + let key: InvocationKey = cli.run(&[ + "worker", + "invocation-key", + &cfg.arg('T', "template-id"), + template_id, + &cfg.arg('w', "worker-name"), + worker_name, + ])?; + + Ok(key) +} + +fn get_invocation_key_with_retry( + template_id: &str, + worker_name: &str, + cli: &CliLive, +) -> Result { + loop { + match get_invocation_key(template_id, worker_name, cli) { + Ok(key) => return Ok(key), + Err(_) => { + std::thread::sleep(Duration::from_secs(1)); + } + } + } +} + +fn invoke_and_await_result( + template_id: &str, + worker_name: &str, + function: &str, + params: &str, + key: &InvocationKey, + cli: &CliLive, +) -> Result { + let cfg = &cli.config; + + cli.run_json(&[ + "worker", + "invoke-and-await", + &cfg.arg('T', "template-id"), + &template_id, + &cfg.arg('w', "worker-name"), + &worker_name, + &cfg.arg('f', "function"), + &function, + &cfg.arg('j', "parameters"), + ¶ms, + &cfg.arg('k', "invocation-key"), + &key.0, + ]) +} + +fn invoke_and_await_result_with_retry( + template_id: &str, + worker_name: &str, + function: &str, + params: &str, + key: &InvocationKey, + cli: &CliLive, +) -> Result { + loop { + match invoke_and_await_result(template_id, worker_name, function, params, key, cli) { + Ok(res) => return Ok(res), + Err(e) => { + if e.message() + .iter() + .any(|m| m.contains("Invalid invocation key")) + { + return get_invocation_key_invoke_and_await_with_retry( + template_id, + worker_name, + function, + params, + cli, + ); + } else { + std::thread::sleep(Duration::from_secs(1)); + } + } + } + } +} + +fn get_invocation_key_invoke_and_await_with_retry( + template_id: &str, + worker_name: &str, + function: &str, + params: &str, + cli: &CliLive, +) -> Result { + let key = get_invocation_key_with_retry(&template_id, &worker_name, &cli)?; + let res = + invoke_and_await_result_with_retry(template_id, worker_name, function, params, &key, cli); + println!("*** WORKER {worker_name} INVOKED ***"); + res +} + +fn service_is_responsive_to_shard_changes( + (context, cli): (Arc, CliLive), +) -> Result<(), Failed> { + let template_name = "echo-service-1".to_string(); + + let cfg = &cli.config; + + let template: TemplateView = cli.run(&[ + "template", + "add", + &cfg.arg('t', "template-name"), + &template_name, + context + .env + .wasm_root + .join("option-service.wasm") + .to_str() + .unwrap(), + ])?; + + let mut workers_with_names = Vec::new(); + + for n in 1..=4 { + println!("*** WORKER {n} STARTING ***"); + let worker_name = format!("echo-service-{n}"); + let worker_id = upload_and_start_worker(&template, &worker_name, &cli)?; + println!("*** WORKER {n} STARTED ***"); + workers_with_names.push((worker_id, worker_name)) + } + + println!("*** ALL WORKERS STARTED ***"); + + fn invoke_and_await_workers( + workers: &[(VersionedWorkerId, String)], + cli: &CliLive, + ) -> Result<(), Failed> { + let mut tasks = Vec::new(); + + for (worker, name) in workers { + let name = name.clone(); + let template_id = worker.worker_id.template_id.to_string(); + let cli = cli.clone(); + tasks.push(std::thread::spawn(move || { + get_invocation_key_invoke_and_await_with_retry( + &template_id, + &name, + "golem:it/api/echo", + r#"["Hello"]"#, + &cli, + ) + })); + } + + for task in tasks { + let _ = task.join().unwrap()?; + } + + Ok(()) + } + + for c in 0..2 { + if c != 0 { + std::thread::sleep(Duration::from_secs(10)); + } + println!("*** INVOKING WORKERS {c} ***"); + invoke_and_await_workers(&workers_with_names, &cli)?; + println!("*** INVOKING WORKERS {c} DONE ***"); + } + + println!("*** TEST COMPLETED ***"); + + Ok(()) +}