From 6d151aa662353f5563bf2c08bd5763441b87c503 Mon Sep 17 00:00:00 2001 From: Pierre beucher Date: Sat, 2 Nov 2024 14:56:58 +0100 Subject: [PATCH] feat: load inputs in parallel great time saving as inputs were loaded sequentially before. Hasardous implementation to refactor though. --- Cargo.lock | 41 +++--- Cargo.toml | 3 +- src/lib.rs | 230 +++++++++++++++++++++----------- src/resolvers/mod.rs | 3 + src/resolvers/parallel.rs | 253 ++++++++++++++++++++++++++++++++++++ src/resolvers/resolver.rs | 15 +++ src/resolvers/sequential.rs | 0 7 files changed, 449 insertions(+), 96 deletions(-) create mode 100644 src/resolvers/mod.rs create mode 100644 src/resolvers/parallel.rs create mode 100644 src/resolvers/resolver.rs create mode 100644 src/resolvers/sequential.rs diff --git a/Cargo.lock b/Cargo.lock index ddc4f1f..24c871b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1470,9 +1470,9 @@ checksum = "a06f77d526c1a601b7c4cdd98f54b5eaabffc14d5f2f0296febdc7f357c6d3ba" [[package]] name = "futures" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "645c6916888f6cb6350d2550b80fb63e734897a8498abe35cfb732b6487804b0" +checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876" dependencies = [ "futures-channel", "futures-core", @@ -1485,9 +1485,9 @@ dependencies = [ [[package]] name = "futures-channel" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eac8f7d7865dcb88bd4373ab671c8cf4508703796caa2b1985a9ca867b3fcb78" +checksum = "2dff15bf788c671c1934e366d07e30c1814a8ef514e1af724a602e8a2fbe1b10" dependencies = [ "futures-core", "futures-sink", @@ -1495,15 +1495,15 @@ dependencies = [ [[package]] name = "futures-core" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d" +checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e" [[package]] name = "futures-executor" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a576fc72ae164fca6b9db127eaa9a9dda0d61316034f33a0a0d4eda41f02b01d" +checksum = "1e28d1d997f585e54aebc3f97d39e72338912123a67330d723fdbb564d646c9f" dependencies = [ "futures-core", "futures-task", @@ -1512,9 +1512,9 @@ dependencies = [ [[package]] name = "futures-io" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1" +checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6" [[package]] name = "futures-lite" @@ -1533,9 +1533,9 @@ dependencies = [ [[package]] name = "futures-macro" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" +checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650" dependencies = [ "proc-macro2", "quote", @@ -1544,21 +1544,21 @@ dependencies = [ [[package]] name = "futures-sink" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9fb8e00e87438d937621c1c6269e53f536c14d3fbd6a042bb24879e57d474fb5" +checksum = "e575fab7d1e0dcb8d0c7bcf9a63ee213816ab51902e6d244a95819acacf1d4f7" [[package]] name = "futures-task" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "38d84fa142264698cdce1a9f9172cf383a0c82de1bddcf3092901442c4097004" +checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988" [[package]] name = "futures-util" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48" +checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81" dependencies = [ "futures-channel", "futures-core", @@ -2477,6 +2477,7 @@ dependencies = [ "dialoguer", "digest", "env_logger", + "futures", "google-secretmanager1", "home", "http 0.2.12", @@ -4211,9 +4212,9 @@ dependencies = [ [[package]] name = "vaultrs" -version = "0.7.2" +version = "0.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0bb996bb053adadc767f8b0bda2a80bc2b67d24fe89f2b959ae919e200d79a19" +checksum = "dfaaf13c89ba01829cc0ce4dc77f8d1d6c171f6a0ca939c7513702d05cd79e92" dependencies = [ "async-trait", "bytes", diff --git a/Cargo.toml b/Cargo.toml index d89c16d..917c368 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -32,7 +32,7 @@ convert_case = "0.5.0" async-trait = "0.1.68" anyhow = { version = "1.0", features = ["backtrace"] } rand = "0.5" -vaultrs = { version = "0.7.1" } +vaultrs = "=0.7.1" url = "2.3.1" schemars = "0.8.10" http = "0.2" @@ -44,6 +44,7 @@ google-secretmanager1 = "5.0.2" dialoguer = "0.11.0" console = "0.15.7" base64 = "0.22.1" +futures = "0.3.31" # Use OpenSSL vendored dependencies on Linux musl # As somehow musl fails to build from source diff --git a/src/lib.rs b/src/lib.rs index 43c5d82..fef93ae 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,10 +1,13 @@ pub mod core; pub mod modules; +pub mod resolvers; -use crate::core::{ResolveTo, NovopsEnvironmentInput, NovopsConfigFile, NovopsContext}; +use crate::core::{NovopsEnvironmentInput, NovopsConfigFile, NovopsContext}; use crate::modules::files::FileOutput; use crate::modules::variables::VariableOutput; use log::{info, debug, error, warn}; +use resolvers::parallel::ParallelEnvironmentResolver; +use crate::resolvers::resolver::NovopsEnvironmentResolver; use std::os::unix::prelude::{OpenOptionsExt, PermissionsExt}; use std::os::unix::fs::{MetadataExt, symlink}; @@ -98,10 +101,8 @@ pub async fn load_context_and_resolve(args: &NovopsLoadArgs) -> Result Result Result<(HashMap, HashMap), anyhow::Error> - { - - let mut variable_outputs: HashMap = HashMap::new(); - let mut file_outputs: HashMap = HashMap::new(); - - // Expose Novops internal variables - // Load first so user can override via config if needed - variable_outputs.insert(String::from("NOVOPS_ENVIRONMENT"), VariableOutput { - name: String::from("NOVOPS_ENVIRONMENT"), - value: ctx.env_name.clone() - }); - - for v in &inputs.variables.unwrap_or_default() { - let val = v.resolve(ctx).await - .with_context(|| format!("Couldn't resolve variable input {:?}", v))?; - - variable_outputs.insert(v.name.clone(), val); - }; +// /** +// * Resolve all given variable outputs parallelly +// */ +// pub async fn resolve_variable_inputs(ctx: &NovopsContext, vars: Vec) +// -> Vec> { - for f in &inputs.files.unwrap_or_default() { - let r = f.resolve(ctx).await - .with_context(|| format!("Couldn't resolve file input {:?}", f))?; - - let fpath_str = r.dest.to_str() - .ok_or(anyhow::anyhow!("Couldn't convert PathBuf '{:?}' to String", &r.dest))?; +// let mut result_futures: Vec> + Send>>> = Vec::new(); +// for v in &vars { +// let val_future = v.resolve(ctx); - // FileInput generates both var and file output - variable_outputs.insert(fpath_str.to_string(), r.variable.clone()); - file_outputs.insert(fpath_str.to_string(), r.clone()); - }; +// result_futures.push(val_future); +// }; - match &inputs.aws { - Some(aws) => { - let r = aws.assume_role.resolve(ctx).await - .with_context(|| format!("Could not resolve AWS input {:?}", aws))?; +// let result = futures::future::join_all(result_futures).await; - for vo in r { - variable_outputs.insert(vo.name.clone(), vo); - } - - }, - None => (), - } +// result +// } - match &inputs.hashivault { - Some(hashivault) => { - let r = hashivault.aws.resolve(ctx).await - .with_context(|| format!("Could not resolve Hashivault input {:?}", hashivault))?; +// pub async fn resolve_variable_inputs_bis(ctx: &NovopsContext, inputs: Vec) +// -> Vec> { - for vo in r { - variable_outputs.insert(vo.name.clone(), vo); - } - - }, - None => (), - } +// let mut future_outputs: Vec> + Send>>> = Vec::new(); +// for i in &inputs { +// let future_out = async { +// use std::{thread, time::Duration}; - match &inputs.sops_dotenv { - Some(sops_dotenv) => { - for s in sops_dotenv { - let r = s.resolve(ctx).await - .with_context(|| format!("Could not resolve SopsDotenv input {:?}", s))?; +// info!("Starting to solve {:}", &i.name); - for vo in r { - variable_outputs.insert(vo.name.clone(), vo); - } - } - }, - None => (), - } +// thread::sleep(Duration::from_millis(4000)); - Ok((variable_outputs, file_outputs)) +// info!("Finished solveing {:}", &i.name); -} +// i.resolve(ctx).await +// .with_context(|| format!("Couldn't resolve variable input {:}", &i.name)) +// }; + +// future_outputs.push(Box::pin(future_out)); +// }; + +// futures::future::join_all(future_outputs).await +// } + +// pub async fn resolve_a_var(ctx: NovopsContext, i: VariableInput) -> Result { + +// info!("Starting to solve {:}", &i.name); + +// let result = i.resolve(&ctx).await +// .with_context(|| format!("Couldn't resolve variable input {:}", &i.name)); + +// info!("Finished solving {:}", &i.name); + +// result +// } + +// pub async fn resolve_variable_inputs_tokio(ctx: &NovopsContext, inputs: Vec) +// -> Vec> { + +// let mut task_set = JoinSet::new(); + +// for i in inputs { +// let ctx_clone = ctx.clone(); +// let result_future = resolve_a_var(ctx_clone, i); +// task_set.spawn(result_future); +// }; + +// let mut var_results: Vec> = Vec::new(); + +// while let Some(res) = task_set.join_next().await { +// let r = res.expect("Tokio error TODO"); +// var_results.push(r); +// } + +// return var_results +// } + +// /** +// * Resolve all Inputs to their concrete Output values +// * Depending on Input types, external systems will be called-upon (such as BitWarden, Hashicorp Vault...) +// */ +// pub async fn resolve_environment_inputs(ctx: &NovopsContext, inputs: NovopsEnvironmentInput) +// -> Result<(HashMap, HashMap), anyhow::Error> +// { + +// let mut variable_outputs: HashMap = HashMap::new(); +// let mut file_outputs: HashMap = HashMap::new(); + +// // Expose Novops internal variables +// // Load first so user can override via config if needed +// variable_outputs.insert(String::from("NOVOPS_ENVIRONMENT"), VariableOutput { +// name: String::from("NOVOPS_ENVIRONMENT"), +// value: ctx.env_name.clone() +// }); + +// // Load variable inputs asynchronously +// let vars_outputs = resolve_variable_inputs_tokio(ctx, inputs.variables.unwrap_or_default()).await; +// for v in vars_outputs { + +// let err_msg = format!("Couldn't resolve variable input {:?}", v); +// let result = v.expect(&err_msg); + +// variable_outputs.insert(result.name.clone(), result); +// } + + +// for f in &inputs.files.unwrap_or_default() { +// let r = f.resolve(ctx).await +// .with_context(|| format!("Couldn't resolve file input {:?}", f))?; + +// let fpath_str = r.dest.to_str() +// .ok_or(anyhow::anyhow!("Couldn't convert PathBuf '{:?}' to String", &r.dest))?; + +// // FileInput generates both var and file output +// variable_outputs.insert(fpath_str.to_string(), r.variable.clone()); +// file_outputs.insert(fpath_str.to_string(), r.clone()); +// }; + +// match &inputs.aws { +// Some(aws) => { +// let r = aws.assume_role.resolve(ctx).await +// .with_context(|| format!("Could not resolve AWS input {:?}", aws))?; + +// for vo in r { +// variable_outputs.insert(vo.name.clone(), vo); +// } + +// }, +// None => (), +// } + +// match &inputs.hashivault { +// Some(hashivault) => { +// let r = hashivault.aws.resolve(ctx).await +// .with_context(|| format!("Could not resolve Hashivault input {:?}", hashivault))?; + +// for vo in r { +// variable_outputs.insert(vo.name.clone(), vo); +// } + +// }, +// None => (), +// } + +// match &inputs.sops_dotenv { +// Some(sops_dotenv) => { +// for s in sops_dotenv { +// let r = s.resolve(ctx).await +// .with_context(|| format!("Could not resolve SopsDotenv input {:?}", s))?; + +// for vo in r { +// variable_outputs.insert(vo.name.clone(), vo); +// } +// } +// }, +// None => (), +// } + +// Ok((variable_outputs, file_outputs)) + +// } /** * Read Novops configuration file. Use provided config file if Option is Some, default to .novops.y[a]ml in current directory. diff --git a/src/resolvers/mod.rs b/src/resolvers/mod.rs new file mode 100644 index 0000000..c5039c7 --- /dev/null +++ b/src/resolvers/mod.rs @@ -0,0 +1,3 @@ +pub mod resolver; +pub mod parallel; +pub mod sequential; \ No newline at end of file diff --git a/src/resolvers/parallel.rs b/src/resolvers/parallel.rs new file mode 100644 index 0000000..3e7b1fb --- /dev/null +++ b/src/resolvers/parallel.rs @@ -0,0 +1,253 @@ +use std::collections::HashMap; +use log::info; +use async_trait::async_trait; +use tokio::task::JoinSet; +use anyhow::Context; + +use crate::{ + core::{NovopsContext, NovopsEnvironmentInput, ResolveTo}, + modules::{aws::config::AwsInput, files::{FileInput, FileOutput}, hashivault::config::HashiVaultInput, sops::SopsDotenvInput, variables::{VariableInput, VariableOutput}} +}; + +use super::resolver::NovopsEnvironmentResolver; + +/// Resolve all inputs for an environment in parallel +pub struct ParallelEnvironmentResolver { } + +#[async_trait] +impl NovopsEnvironmentResolver for ParallelEnvironmentResolver { + + async fn resolve_environment_inputs(ctx: &NovopsContext, inputs: NovopsEnvironmentInput) + -> Result<(HashMap, HashMap), anyhow::Error> + { + + // Load variable inputs asynchronously + let vars_future = resolve_variable_inputs_parallel(ctx, inputs.variables.unwrap_or_default()); + let files_future = resolve_file_inputs_parallel(ctx, inputs.files.unwrap_or_default()); + let aws_future = resolve_aws_input(ctx, &inputs.aws); + let hashivault_future = resolve_hashivault_input(ctx, &inputs.hashivault); + let sops_future = resolve_sops_input(ctx, &inputs.sops_dotenv); + + let ( + vars_result, + files_result, + aws_result, + hashivault_result, + sops_result + ) = futures::future::join5(vars_future, files_future, aws_future, hashivault_future, sops_future).await; + + let vars = vars_result?; + let files = files_result?; + let aws = aws_result?; + let hashivault: Vec = hashivault_result?; + let sops = sops_result?; + + info!("AWS Variables after resolve: {:?}", aws); + info!("Raw Variables after resolve: {:?}", vars); + + let mut variable_outputs: HashMap = HashMap::new(); + let mut file_outputs: HashMap = HashMap::new(); + + for v in vars.iter() { variable_outputs.insert(v.name.clone(), v.clone()); } + for v in aws.iter() { variable_outputs.insert(v.name.clone(), v.clone()); } + for v in sops.iter() { variable_outputs.insert(v.name.clone(), v.clone()); } + for v in hashivault.iter() { variable_outputs.insert(v.name.clone(), v.clone()); } + + for f in files { + + let fpath_str = f.dest.to_str() + .ok_or(anyhow::anyhow!("Couldn't convert PathBuf '{:?}' to String", &f.dest))?; + + // FileInput generates both var and file output + variable_outputs.insert(fpath_str.to_string(), f.variable.clone()); + file_outputs.insert(fpath_str.to_string(), f.clone()); + }; + + // TODO this should be higher up ! + // Expose Novops internal variables + // Load first so user can override via config if needed + variable_outputs.insert(String::from("NOVOPS_ENVIRONMENT"), VariableOutput { + name: String::from("NOVOPS_ENVIRONMENT"), + value: ctx.env_name.clone() + }); + + info!("Variables after resolve: {:?}", variable_outputs); + + Ok((variable_outputs, file_outputs)) + + } +} + +async fn resolve_sops_input(ctx: &NovopsContext, sops_vec_opt: &Option>) -> Result, anyhow::Error> { + + match sops_vec_opt { + Some(sops_vec) => { + info!("Resolving SOPS Dotenv inputs"); + + let mut result = Vec::new(); + + for sops in sops_vec { + let r = sops.resolve(ctx).await + .with_context(|| format!("Could not resolve SopsDotenv input {:?}", sops))?; + + result.extend(r); + } + + info!("Resolved SOPS Dotenv inputs"); + + Ok(result) + }, + None => Ok(vec![]) + } +} + +async fn resolve_hashivault_input(ctx: &NovopsContext, hashivault: &Option) -> Result, anyhow::Error>{ + + match hashivault { + Some(hashivault) => { + + info!("Resolving Hashivault inputs"); + + let r = hashivault.aws.resolve(ctx).await + .with_context(|| format!("Could not resolve Hashivault input {:?}", hashivault))?; + + info!("Resolved Hashivault inputs"); + + Ok(r) + }, + + None => Ok(vec![]), + } + + +} + +async fn resolve_aws_input(ctx: &NovopsContext, aws: &Option) -> Result, anyhow::Error>{ + + info!("Resolving AWS inputs"); + + + match aws { + Some(aws) => { + let vars = aws.assume_role.resolve(ctx).await + .with_context(|| format!("Could not resolve AWS input {:?}", aws))?; + + info!("Resolved AWS inputs"); + + Ok(vars) + + }, + None => { + info!("No AWS inputs to resolve"); + Ok(vec![]) + } + } +} + +async fn resolve_single_variable_input(ctx: NovopsContext, i: VariableInput) -> Result { + + info!("Resolving variable input {:}", &i.name); + + let result = i.resolve(&ctx).await + .with_context(|| format!("Couldn't resolve variable input {:}", &i.name)); + + info!("Resolved variable input {:}", &i.name); + + result +} + +async fn resolve_single_file_input(ctx: NovopsContext, f: FileInput) -> Result { + + info!("Resolving file input {:?}", &f.name); + + let result = f.resolve(&ctx).await + .with_context(|| format!("Couldn't resolve file input {:?}", &f.name)); // TODO proper identifier + + info!("Resolved file input {:?}", &f.name); + + result +} + +async fn resolve_variable_inputs_parallel(ctx: &NovopsContext, inputs: Vec) + -> Result, anyhow::Error> { + + let mut task_set = JoinSet::new(); + + for i in inputs { + let ctx_clone = ctx.clone(); + let result_future = resolve_single_variable_input(ctx_clone, i); + task_set.spawn(result_future); + }; + + let mut var_results = Vec::new(); + + while let Some(res) = task_set.join_next().await { + + // Result is imbricated Result, JoinError> + // Wrap potential JoinError as anyhow Error + let wrapped_res = match res { + Ok(ok) => ok, + Err(err) => Err(anyhow::anyhow!(err)), + }; + + var_results.push(wrapped_res); + } + + // Parse all outputs and discriminate ok and errors + let mut var_outputs = Vec::new(); + let mut var_errors = Vec::new(); + for vr in var_results { + match vr { + Ok(v) => var_outputs.push(v), + Err(err) => var_errors.push(err), + } + } + + if !var_errors.is_empty() { + return Err(anyhow::format_err!("{:?}", &var_errors)) // TODO probably a better way to handle multiple errors + } + + Ok(var_outputs) +} + +async fn resolve_file_inputs_parallel(ctx: &NovopsContext, inputs: Vec) + -> Result, anyhow::Error> { + + let mut task_set = JoinSet::new(); + + for f in inputs { + let ctx_clone = ctx.clone(); + let result_future = resolve_single_file_input(ctx_clone, f); + task_set.spawn(result_future); + }; + + let mut file_results = Vec::new(); + + while let Some(res) = task_set.join_next().await { + + // Result is imbricated Result, JoinError> + // Wrap potential JoinError as anyhow Error + let wrapped_res = match res { + Ok(ok) => ok, + Err(err) => Err(anyhow::anyhow!(err)), + }; + + file_results.push(wrapped_res); + } + + // Parse all outputs and discriminate ok and errors + let mut file_outputs = Vec::new(); + let mut file_errors = Vec::new(); + for vr in file_results { + match vr { + Ok(v) => file_outputs.push(v), + Err(err) => file_errors.push(err), + } + } + + if !file_errors.is_empty() { + return Err(anyhow::format_err!("{:?}", &file_errors)) // TODO probably a better way to handle multiple errors + } + + Ok(file_outputs) +} diff --git a/src/resolvers/resolver.rs b/src/resolvers/resolver.rs new file mode 100644 index 0000000..1823761 --- /dev/null +++ b/src/resolvers/resolver.rs @@ -0,0 +1,15 @@ +use crate::{core::NovopsEnvironmentInput, NovopsContext, modules::{files::FileOutput, variables::VariableOutput}}; + +use std::collections::HashMap; +use async_trait::async_trait; + + +/// Resolve all Inputs for an environment in given context +#[async_trait] +pub trait NovopsEnvironmentResolver { + + /// Resolve all Inputs to their concrete Output values + /// Depending on Input types, external systems will be called-upon (such as BitWarden, Hashicorp Vault...) + async fn resolve_environment_inputs(ctx: &NovopsContext, inputs: NovopsEnvironmentInput) + -> Result<(HashMap, HashMap), anyhow::Error>; +} \ No newline at end of file diff --git a/src/resolvers/sequential.rs b/src/resolvers/sequential.rs new file mode 100644 index 0000000..e69de29