Skip to content

Commit

Permalink
feat: load inputs in parallel
Browse files Browse the repository at this point in the history
great time saving as inputs were loaded sequentially before. Hasardous implementation to refactor though.
  • Loading branch information
PierreBeucher committed Nov 2, 2024
1 parent 5932c4e commit 6d151aa
Show file tree
Hide file tree
Showing 7 changed files with 449 additions and 96 deletions.
41 changes: 21 additions & 20 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ convert_case = "0.5.0"
async-trait = "0.1.68"
anyhow = { version = "1.0", features = ["backtrace"] }
rand = "0.5"
vaultrs = { version = "0.7.1" }
vaultrs = "=0.7.1"
url = "2.3.1"
schemars = "0.8.10"
http = "0.2"
Expand All @@ -44,6 +44,7 @@ google-secretmanager1 = "5.0.2"
dialoguer = "0.11.0"
console = "0.15.7"
base64 = "0.22.1"
futures = "0.3.31"

# Use OpenSSL vendored dependencies on Linux musl
# As somehow musl fails to build from source
Expand Down
230 changes: 155 additions & 75 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
pub mod core;
pub mod modules;
pub mod resolvers;

use crate::core::{ResolveTo, NovopsEnvironmentInput, NovopsConfigFile, NovopsContext};
use crate::core::{NovopsEnvironmentInput, NovopsConfigFile, NovopsContext};
use crate::modules::files::FileOutput;
use crate::modules::variables::VariableOutput;
use log::{info, debug, error, warn};
use resolvers::parallel::ParallelEnvironmentResolver;
use crate::resolvers::resolver::NovopsEnvironmentResolver;

use std::os::unix::prelude::{OpenOptionsExt, PermissionsExt};
use std::os::unix::fs::{MetadataExt, symlink};
Expand Down Expand Up @@ -98,10 +101,8 @@ pub async fn load_context_and_resolve(args: &NovopsLoadArgs) -> Result<NovopsOut

let ctx = make_context(args).await?;
let novops_env = get_current_environment(&ctx).await?;

// Revole inputs and export
let (var_out, file_out) =
resolve_environment_inputs(&ctx, novops_env).await?;

let (var_out, file_out) = ParallelEnvironmentResolver::resolve_environment_inputs(&ctx, novops_env).await?;

Ok(NovopsOutputs {
context: ctx,
Expand Down Expand Up @@ -252,86 +253,165 @@ pub async fn get_current_environment(ctx: &NovopsContext) -> Result<NovopsEnviro
Ok(novops_env.clone())
}

/**
* Resolve all Inputs to their concrete Output values
* Depending on Input types, external systems will be called-upon (such as BitWarden, Hashicorp Vault...)
*/
pub async fn resolve_environment_inputs(ctx: &NovopsContext, inputs: NovopsEnvironmentInput)
-> Result<(HashMap<String, VariableOutput>, HashMap<String, FileOutput>), anyhow::Error>
{

let mut variable_outputs: HashMap<String, VariableOutput> = HashMap::new();
let mut file_outputs: HashMap<String, FileOutput> = HashMap::new();

// Expose Novops internal variables
// Load first so user can override via config if needed
variable_outputs.insert(String::from("NOVOPS_ENVIRONMENT"), VariableOutput {
name: String::from("NOVOPS_ENVIRONMENT"),
value: ctx.env_name.clone()
});

for v in &inputs.variables.unwrap_or_default() {
let val = v.resolve(ctx).await
.with_context(|| format!("Couldn't resolve variable input {:?}", v))?;

variable_outputs.insert(v.name.clone(), val);
};
// /**
// * Resolve all given variable outputs parallelly
// */
// pub async fn resolve_variable_inputs(ctx: &NovopsContext, vars: Vec<VariableInput>)
// -> Vec<Result<VariableOutput, anyhow::Error>> {

for f in &inputs.files.unwrap_or_default() {
let r = f.resolve(ctx).await
.with_context(|| format!("Couldn't resolve file input {:?}", f))?;

let fpath_str = r.dest.to_str()
.ok_or(anyhow::anyhow!("Couldn't convert PathBuf '{:?}' to String", &r.dest))?;
// let mut result_futures: Vec<Pin<Box<dyn Future<Output = Result<VariableOutput, anyhow::Error>> + Send>>> = Vec::new();
// for v in &vars {
// let val_future = v.resolve(ctx);

// FileInput generates both var and file output
variable_outputs.insert(fpath_str.to_string(), r.variable.clone());
file_outputs.insert(fpath_str.to_string(), r.clone());
};
// result_futures.push(val_future);
// };

match &inputs.aws {
Some(aws) => {
let r = aws.assume_role.resolve(ctx).await
.with_context(|| format!("Could not resolve AWS input {:?}", aws))?;
// let result = futures::future::join_all(result_futures).await;

for vo in r {
variable_outputs.insert(vo.name.clone(), vo);
}

},
None => (),
}
// result
// }

match &inputs.hashivault {
Some(hashivault) => {
let r = hashivault.aws.resolve(ctx).await
.with_context(|| format!("Could not resolve Hashivault input {:?}", hashivault))?;
// pub async fn resolve_variable_inputs_bis(ctx: &NovopsContext, inputs: Vec<VariableInput>)
// -> Vec<Result<VariableOutput, anyhow::Error>> {

for vo in r {
variable_outputs.insert(vo.name.clone(), vo);
}

},
None => (),
}
// let mut future_outputs: Vec<Pin<Box<dyn Future<Output = Result<VariableOutput, anyhow::Error>> + Send>>> = Vec::new();
// for i in &inputs {
// let future_out = async {
// use std::{thread, time::Duration};

match &inputs.sops_dotenv {
Some(sops_dotenv) => {
for s in sops_dotenv {
let r = s.resolve(ctx).await
.with_context(|| format!("Could not resolve SopsDotenv input {:?}", s))?;
// info!("Starting to solve {:}", &i.name);

for vo in r {
variable_outputs.insert(vo.name.clone(), vo);
}
}
},
None => (),
}
// thread::sleep(Duration::from_millis(4000));

Ok((variable_outputs, file_outputs))
// info!("Finished solveing {:}", &i.name);

}
// i.resolve(ctx).await
// .with_context(|| format!("Couldn't resolve variable input {:}", &i.name))
// };

// future_outputs.push(Box::pin(future_out));
// };

// futures::future::join_all(future_outputs).await
// }

// pub async fn resolve_a_var(ctx: NovopsContext, i: VariableInput) -> Result<VariableOutput, anyhow::Error> {

// info!("Starting to solve {:}", &i.name);

// let result = i.resolve(&ctx).await
// .with_context(|| format!("Couldn't resolve variable input {:}", &i.name));

// info!("Finished solving {:}", &i.name);

// result
// }

// pub async fn resolve_variable_inputs_tokio(ctx: &NovopsContext, inputs: Vec<VariableInput>)
// -> Vec<Result<VariableOutput, anyhow::Error>> {

// let mut task_set = JoinSet::new();

// for i in inputs {
// let ctx_clone = ctx.clone();
// let result_future = resolve_a_var(ctx_clone, i);
// task_set.spawn(result_future);
// };

// let mut var_results: Vec<Result<VariableOutput, anyhow::Error>> = Vec::new();

// while let Some(res) = task_set.join_next().await {
// let r = res.expect("Tokio error TODO");
// var_results.push(r);
// }

// return var_results
// }

// /**
// * Resolve all Inputs to their concrete Output values
// * Depending on Input types, external systems will be called-upon (such as BitWarden, Hashicorp Vault...)
// */
// pub async fn resolve_environment_inputs(ctx: &NovopsContext, inputs: NovopsEnvironmentInput)
// -> Result<(HashMap<String, VariableOutput>, HashMap<String, FileOutput>), anyhow::Error>
// {

// let mut variable_outputs: HashMap<String, VariableOutput> = HashMap::new();
// let mut file_outputs: HashMap<String, FileOutput> = HashMap::new();

// // Expose Novops internal variables
// // Load first so user can override via config if needed
// variable_outputs.insert(String::from("NOVOPS_ENVIRONMENT"), VariableOutput {
// name: String::from("NOVOPS_ENVIRONMENT"),
// value: ctx.env_name.clone()
// });

// // Load variable inputs asynchronously
// let vars_outputs = resolve_variable_inputs_tokio(ctx, inputs.variables.unwrap_or_default()).await;
// for v in vars_outputs {

// let err_msg = format!("Couldn't resolve variable input {:?}", v);
// let result = v.expect(&err_msg);

// variable_outputs.insert(result.name.clone(), result);
// }


// for f in &inputs.files.unwrap_or_default() {
// let r = f.resolve(ctx).await
// .with_context(|| format!("Couldn't resolve file input {:?}", f))?;

// let fpath_str = r.dest.to_str()
// .ok_or(anyhow::anyhow!("Couldn't convert PathBuf '{:?}' to String", &r.dest))?;

// // FileInput generates both var and file output
// variable_outputs.insert(fpath_str.to_string(), r.variable.clone());
// file_outputs.insert(fpath_str.to_string(), r.clone());
// };

// match &inputs.aws {
// Some(aws) => {
// let r = aws.assume_role.resolve(ctx).await
// .with_context(|| format!("Could not resolve AWS input {:?}", aws))?;

// for vo in r {
// variable_outputs.insert(vo.name.clone(), vo);
// }

// },
// None => (),
// }

// match &inputs.hashivault {
// Some(hashivault) => {
// let r = hashivault.aws.resolve(ctx).await
// .with_context(|| format!("Could not resolve Hashivault input {:?}", hashivault))?;

// for vo in r {
// variable_outputs.insert(vo.name.clone(), vo);
// }

// },
// None => (),
// }

// match &inputs.sops_dotenv {
// Some(sops_dotenv) => {
// for s in sops_dotenv {
// let r = s.resolve(ctx).await
// .with_context(|| format!("Could not resolve SopsDotenv input {:?}", s))?;

// for vo in r {
// variable_outputs.insert(vo.name.clone(), vo);
// }
// }
// },
// None => (),
// }

// Ok((variable_outputs, file_outputs))

// }

/**
* Read Novops configuration file. Use provided config file if Option is Some, default to .novops.y[a]ml in current directory.
Expand Down
Loading

0 comments on commit 6d151aa

Please sign in to comment.