From dd311f683cee7dca3dab352b2a9fe11da042909f Mon Sep 17 00:00:00 2001 From: Pierre beucher Date: Sat, 2 Nov 2024 14:56:58 +0100 Subject: [PATCH] feat: load inputs in parallel great time saving as inputs were loaded sequentially before. --- Cargo.lock | 40 +++++------ Cargo.toml | 2 +- src/lib.rs | 118 ++++++++----------------------- src/main.rs | 1 - src/resolve.rs | 170 +++++++++++++++++++++++++++++++++++++++++++++ tests/test_core.rs | 2 +- 6 files changed, 221 insertions(+), 112 deletions(-) create mode 100644 src/resolve.rs diff --git a/Cargo.lock b/Cargo.lock index ddc4f1f..b63ab77 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1470,9 +1470,9 @@ checksum = "a06f77d526c1a601b7c4cdd98f54b5eaabffc14d5f2f0296febdc7f357c6d3ba" [[package]] name = "futures" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "645c6916888f6cb6350d2550b80fb63e734897a8498abe35cfb732b6487804b0" +checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876" dependencies = [ "futures-channel", "futures-core", @@ -1485,9 +1485,9 @@ dependencies = [ [[package]] name = "futures-channel" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eac8f7d7865dcb88bd4373ab671c8cf4508703796caa2b1985a9ca867b3fcb78" +checksum = "2dff15bf788c671c1934e366d07e30c1814a8ef514e1af724a602e8a2fbe1b10" dependencies = [ "futures-core", "futures-sink", @@ -1495,15 +1495,15 @@ dependencies = [ [[package]] name = "futures-core" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d" +checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e" [[package]] name = "futures-executor" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a576fc72ae164fca6b9db127eaa9a9dda0d61316034f33a0a0d4eda41f02b01d" +checksum = "1e28d1d997f585e54aebc3f97d39e72338912123a67330d723fdbb564d646c9f" dependencies = [ "futures-core", "futures-task", @@ -1512,9 +1512,9 @@ dependencies = [ [[package]] name = "futures-io" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1" +checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6" [[package]] name = "futures-lite" @@ -1533,9 +1533,9 @@ dependencies = [ [[package]] name = "futures-macro" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" +checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650" dependencies = [ "proc-macro2", "quote", @@ -1544,21 +1544,21 @@ dependencies = [ [[package]] name = "futures-sink" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9fb8e00e87438d937621c1c6269e53f536c14d3fbd6a042bb24879e57d474fb5" +checksum = "e575fab7d1e0dcb8d0c7bcf9a63ee213816ab51902e6d244a95819acacf1d4f7" [[package]] name = "futures-task" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "38d84fa142264698cdce1a9f9172cf383a0c82de1bddcf3092901442c4097004" +checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988" [[package]] name = "futures-util" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48" +checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81" dependencies = [ "futures-channel", "futures-core", @@ -4211,9 +4211,9 @@ dependencies = [ [[package]] name = "vaultrs" -version = "0.7.2" +version = "0.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0bb996bb053adadc767f8b0bda2a80bc2b67d24fe89f2b959ae919e200d79a19" +checksum = "dfaaf13c89ba01829cc0ce4dc77f8d1d6c171f6a0ca939c7513702d05cd79e92" dependencies = [ "async-trait", "bytes", diff --git a/Cargo.toml b/Cargo.toml index d89c16d..9ee6ba0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -32,7 +32,7 @@ convert_case = "0.5.0" async-trait = "0.1.68" anyhow = { version = "1.0", features = ["backtrace"] } rand = "0.5" -vaultrs = { version = "0.7.1" } +vaultrs = "=0.7.1" url = "2.3.1" schemars = "0.8.10" http = "0.2" diff --git a/src/lib.rs b/src/lib.rs index 43c5d82..8d46bcd 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,11 +1,12 @@ pub mod core; pub mod modules; +pub mod resolve; -use crate::core::{ResolveTo, NovopsEnvironmentInput, NovopsConfigFile, NovopsContext}; +use crate::core::{NovopsEnvironmentInput, NovopsConfigFile, NovopsContext}; use crate::modules::files::FileOutput; use crate::modules::variables::VariableOutput; +use crate::resolve::resolve_environment_inputs_parallel; use log::{info, debug, error, warn}; - use std::os::unix::prelude::{OpenOptionsExt, PermissionsExt}; use std::os::unix::fs::{MetadataExt, symlink}; use std::{fs::{self, symlink_metadata, remove_file}, io::prelude::*}; @@ -98,15 +99,35 @@ pub async fn load_context_and_resolve(args: &NovopsLoadArgs) -> Result = HashMap::new(); + let mut file_outputs: HashMap = HashMap::new(); + + // Expose Novops internal variables + // Set first so user can override via config if needed + var_outputs.insert(String::from("NOVOPS_ENVIRONMENT"), VariableOutput { + name: String::from("NOVOPS_ENVIRONMENT"), + value: ctx.env_name.clone() + }); + + for v in raw_var_outputs.iter() { var_outputs.insert(v.name.clone(), v.clone()); } + for f in raw_file_outputs { + + let fpath_str = f.dest.to_str() + .ok_or(anyhow::anyhow!("Couldn't convert PathBuf '{:?}' to String", &f.dest))?; + + // FileInput generates both var and file output + var_outputs.insert(fpath_str.to_string(), f.variable.clone()); + file_outputs.insert(fpath_str.to_string(), f.clone()); + }; Ok(NovopsOutputs { context: ctx, - variables: var_out, - files: file_out + variables: var_outputs, + files: file_outputs }) } @@ -252,87 +273,6 @@ pub async fn get_current_environment(ctx: &NovopsContext) -> Result Result<(HashMap, HashMap), anyhow::Error> - { - - let mut variable_outputs: HashMap = HashMap::new(); - let mut file_outputs: HashMap = HashMap::new(); - - // Expose Novops internal variables - // Load first so user can override via config if needed - variable_outputs.insert(String::from("NOVOPS_ENVIRONMENT"), VariableOutput { - name: String::from("NOVOPS_ENVIRONMENT"), - value: ctx.env_name.clone() - }); - - for v in &inputs.variables.unwrap_or_default() { - let val = v.resolve(ctx).await - .with_context(|| format!("Couldn't resolve variable input {:?}", v))?; - - variable_outputs.insert(v.name.clone(), val); - }; - - for f in &inputs.files.unwrap_or_default() { - let r = f.resolve(ctx).await - .with_context(|| format!("Couldn't resolve file input {:?}", f))?; - - let fpath_str = r.dest.to_str() - .ok_or(anyhow::anyhow!("Couldn't convert PathBuf '{:?}' to String", &r.dest))?; - - // FileInput generates both var and file output - variable_outputs.insert(fpath_str.to_string(), r.variable.clone()); - file_outputs.insert(fpath_str.to_string(), r.clone()); - }; - - match &inputs.aws { - Some(aws) => { - let r = aws.assume_role.resolve(ctx).await - .with_context(|| format!("Could not resolve AWS input {:?}", aws))?; - - for vo in r { - variable_outputs.insert(vo.name.clone(), vo); - } - - }, - None => (), - } - - match &inputs.hashivault { - Some(hashivault) => { - let r = hashivault.aws.resolve(ctx).await - .with_context(|| format!("Could not resolve Hashivault input {:?}", hashivault))?; - - for vo in r { - variable_outputs.insert(vo.name.clone(), vo); - } - - }, - None => (), - } - - match &inputs.sops_dotenv { - Some(sops_dotenv) => { - for s in sops_dotenv { - let r = s.resolve(ctx).await - .with_context(|| format!("Could not resolve SopsDotenv input {:?}", s))?; - - for vo in r { - variable_outputs.insert(vo.name.clone(), vo); - } - } - }, - None => (), - } - - Ok((variable_outputs, file_outputs)) - -} - /** * Read Novops configuration file. Use provided config file if Option is Some, default to .novops.y[a]ml in current directory. */ diff --git a/src/main.rs b/src/main.rs index 38f8d91..4cc7dcd 100644 --- a/src/main.rs +++ b/src/main.rs @@ -97,7 +97,6 @@ fn build_cli() -> Command { .help("Do not check if stdout is a tty (terminal), risking exposing secrets on screen. This is unsecure.") .long("skip-tty-check") .env("NOVOPS_LOAD_SKIP_TTY_CHECK") - .value_name("DRY_RUN") .action(ArgAction::SetTrue) .required(false) ) diff --git a/src/resolve.rs b/src/resolve.rs new file mode 100644 index 0000000..c50f9fb --- /dev/null +++ b/src/resolve.rs @@ -0,0 +1,170 @@ +use log::info; +use tokio::task::JoinSet; +use anyhow::Context; + +use crate::{ + core::{NovopsContext, NovopsEnvironmentInput, ResolveTo}, + modules::{aws::config::AwsInput, files::{FileInput, FileOutput}, hashivault::config::HashiVaultInput, sops::SopsDotenvInput, variables::{VariableInput, VariableOutput}} +}; + +pub async fn resolve_environment_inputs_parallel(ctx: &NovopsContext, inputs: NovopsEnvironmentInput) + -> Result<(Vec, Vec), anyhow::Error> +{ + + // + // Resolve all inputs in parallel + // For each inputs (variables, files, aws, hashivault, etc.) + // Run a future resolving to a generic (Vec, Vec) type + // So that each can be run in parallel in a single JoinSet. + // + + // Spawn every resolve tasks into JoinSet + let mut resolve_tasks = JoinSet::new(); + + for i in inputs.variables.unwrap_or_default() { + let var_fut = resolve_and_wrap_variable_input(ctx.clone(), i); + resolve_tasks.spawn(var_fut); + }; + + for f in inputs.files.unwrap_or_default() { + let file_fut = resolve_and_wrap_file_input(ctx.clone(), f); + resolve_tasks.spawn(file_fut); + }; + + let sops = resolve_and_wrap_sops_input(ctx.clone(), inputs.sops_dotenv); + resolve_tasks.spawn(sops); + + let aws = resolve_and_wrap_aws_input(ctx.clone(), inputs.aws); + resolve_tasks.spawn(aws); + + let hashivault = resolve_and_wrap_hashivault_input(ctx.clone(), inputs.hashivault); + resolve_tasks.spawn(hashivault); + + // Await on each output result + let mut output_results = vec![]; + while let Some(res) = resolve_tasks.join_next().await { + + // Result is imbricated Result, JoinError> + // Wrap potential JoinError as anyhow::Error + let wrapped_res = match res { + Ok(ok) => ok, + Err(err) => Err(anyhow::anyhow!(err)), + }; + + output_results.push(wrapped_res); + } + + // Parse all outputs and discriminate ok and errors + let mut var_outputs = vec![]; + let mut file_outputs = vec![]; + let mut resolve_errors = vec![]; + for result in output_results { + match result { + Ok(o) => { + var_outputs.extend(o.0); + file_outputs.extend(o.1); + }, + Err(err) => resolve_errors.push(err), + } + } + + if !resolve_errors.is_empty() { + // Build human-readable error with all existing errors + let mut final_message = String::from("Failed to resolve one or more Inputs:\n"); + for err in resolve_errors { + final_message.push_str(format!("\n---\n{:?}\n", err).as_str()); + } + return Err(anyhow::format_err!(final_message)); + } + + Ok( (var_outputs, file_outputs) ) + +} + +async fn resolve_and_wrap_file_input(ctx: NovopsContext, f: FileInput) -> Result<(Vec, Vec), anyhow::Error> { + + info!("Resolving file input {:?}", &f); + + let result = f.resolve(&ctx).await + .with_context(|| format!("Couldn't resolve file input {:?}", &f)) + .map(|o| (vec![], vec![o]) )?; + + info!("Resolved file input {:?}", &f); + + Ok(result) +} + +async fn resolve_and_wrap_variable_input(ctx: NovopsContext, i: VariableInput) -> Result<(Vec, Vec), anyhow::Error> { + + info!("Resolving variable input {:}", &i.name); + + let result = i.resolve(&ctx).await + .with_context(|| format!("Couldn't resolve variable input {:}", &i.name)) + .map(|o| (vec![o], vec![]) )?; + + info!("Resolved variable input {:}", &i.name); + + Ok(result) +} + +async fn resolve_and_wrap_sops_input(ctx: NovopsContext, sops_vec_opt: Option>) -> Result<(Vec, Vec), anyhow::Error> { + + match sops_vec_opt { + Some(sops_vec) => { + info!("Resolving SOPS Dotenv inputs"); + + let mut result = Vec::new(); + + for sops in sops_vec { + let r = sops.resolve(&ctx).await + .with_context(|| format!("Could not resolve SopsDotenv input {:?}", sops))?; + + result.extend(r); + } + + info!("Resolved SOPS Dotenv inputs"); + + Ok( (result, vec![]) ) + }, + None => Ok( (vec![], vec![]) ) + } +} + +async fn resolve_and_wrap_hashivault_input(ctx: NovopsContext, hashivault: Option) -> Result<(Vec, Vec), anyhow::Error> { + + match hashivault { + Some(hashivault) => { + + info!("Resolving Hashivault inputs"); + + let r = hashivault.aws.resolve(&ctx).await + .with_context(|| format!("Could not resolve Hashivault input {:?}", hashivault))?; + + info!("Resolved Hashivault inputs"); + + Ok( (r, vec![]) ) + }, + + None => Ok( (vec![], vec![]) ), + } + + +} + +async fn resolve_and_wrap_aws_input(ctx: NovopsContext, aws: Option) -> Result<(Vec, Vec), anyhow::Error> { + + match aws { + Some(aws) => { + info!("Resolving AWS inputs"); + + let vars = aws.assume_role.resolve(&ctx).await + .with_context(|| format!("Could not resolve AWS input {:?}", aws))?; + + info!("Resolved AWS inputs"); + + Ok( (vars, vec![]) ) + + }, + None => Ok( (vec![], vec![]) ), + } +} \ No newline at end of file diff --git a/tests/test_core.rs b/tests/test_core.rs index 4cd8779..aeb74e1 100644 --- a/tests/test_core.rs +++ b/tests/test_core.rs @@ -298,7 +298,7 @@ async fn test_default_loaded_vars() -> Result<(), anyhow::Error> { let result = load_env_dryrun_for("empty", "dev").await?; - assert_eq!(result.variables.get("NOVOPS_ENVIRONMENT").unwrap().value, "dev"); + assert_eq!(result.variables.get("NOVOPS_ENVIRONMENT").expect("NOVOPS_ENVIRONMENT variable not found in outputs").value, "dev"); Ok(()) }