diff --git a/agent/Cargo.toml b/agent/Cargo.toml index df8a98f..13ab045 100644 --- a/agent/Cargo.toml +++ b/agent/Cargo.toml @@ -31,4 +31,4 @@ name = "agent_lib" path = "lib/src/lib.rs" [dev-dependencies] -rand = "0.8.5" +tempfile = "3.8.1" diff --git a/agent/lib/src/api/client.rs b/agent/lib/src/api/client.rs index f775024..9da06b7 100644 --- a/agent/lib/src/api/client.rs +++ b/agent/lib/src/api/client.rs @@ -5,16 +5,18 @@ use log::{error, info, trace}; use crate::api::grpc_definitions::{register_response::Response, RegisterRequest}; -use super::grpc_definitions::{ - lambdo_api_service_client::LambdoApiServiceClient, Code, StatusMessage, +use super::{ + grpc_definitions::{lambdo_api_service_client::LambdoApiServiceClient, Code, StatusMessage}, + ClientTrait, SelfCreatingClientTrait, }; pub struct Client { client: LambdoApiServiceClient, } -impl Client { - pub async fn new(gprc_host: IpAddr, port: u16) -> Self { +#[tonic::async_trait] +impl SelfCreatingClientTrait for Client { + async fn new(gprc_host: IpAddr, port: u16) -> Self { info!("Connecting to gRPC server at {}:{}", gprc_host, port); let mut counter = 0; @@ -34,8 +36,11 @@ impl Client { panic!("Failed to connect to gRPC server"); } +} - pub async fn register(&mut self, port: u16) -> Result { +#[tonic::async_trait] +impl ClientTrait for Client { + async fn register(&mut self, port: u16) -> Result { info!("Registering to lambdo.."); let register_response = self .client @@ -49,7 +54,7 @@ impl Client { } } - pub async fn status(&mut self, id: String, code: Code) -> Result<()> { + async fn status(&mut self, id: String, code: Code) -> Result<()> { self.client .status(StatusMessage { id, diff --git a/agent/lib/src/api/mod.rs b/agent/lib/src/api/mod.rs index 35cb39d..0ac390e 100644 --- a/agent/lib/src/api/mod.rs +++ b/agent/lib/src/api/mod.rs @@ -1,4 +1,24 @@ +use std::net::IpAddr; + +use anyhow::Result; +use grpc_definitions::Code; + pub mod client; #[rustfmt::skip] pub mod grpc_definitions; pub mod server; + +/// Client trait +/// This trait is used to abstract the gRPC client +#[tonic::async_trait] +pub trait ClientTrait: Send + Sync { + async fn status(&mut self, id: String, code: Code) -> Result<()>; + async fn register(&mut self, port: u16) -> Result; +} + +/// SelfCreatingClientTrait trait +/// This trait is used to abstract the gRPC client creation +#[tonic::async_trait] +pub trait SelfCreatingClientTrait: ClientTrait { + async fn new(grpc_host: IpAddr, port: u16) -> Self; +} diff --git a/agent/lib/src/api/server.rs b/agent/lib/src/api/server.rs index 7b2c6f8..0a751df 100644 --- a/agent/lib/src/api/server.rs +++ b/agent/lib/src/api/server.rs @@ -4,24 +4,26 @@ use log::{debug, error, info, trace}; use tokio::sync::Mutex; use tonic::{Request, Response, Status}; -use crate::{config::AgentConfig, runner_engine}; +use crate::{api::ClientTrait, config::AgentConfig, runner_engine}; use super::{ - client::Client, grpc_definitions::{ lambdo_agent_service_server::LambdoAgentService, Empty, ExecuteRequest, ExecuteResponse, StatusMessage, }, + SelfCreatingClientTrait, }; pub struct LambdoAgentServer { pub config: AgentConfig, - pub client: Arc>, + pub client: Arc>>, pub id: String, } impl LambdoAgentServer { - pub async fn new(config: AgentConfig) -> Self { + pub async fn new( + config: AgentConfig, + ) -> Self { let grpc_remote_host = IpAddr::from_str(&config.grpc.remote_host).unwrap_or_else(|e| { error!("Invalid IP address: {}", config.grpc.remote_host); panic!("{}", e.to_string()) @@ -29,7 +31,7 @@ impl LambdoAgentServer { trace!("gRPC remote host: {}", grpc_remote_host); trace!("Creating gRPC client.."); - let mut client = Client::new(grpc_remote_host, config.grpc.remote_port).await; + let mut client = C::new(grpc_remote_host, config.grpc.remote_port).await; trace!("Registering to gRPC server.."); let id = { @@ -38,7 +40,7 @@ impl LambdoAgentServer { match client.register(config.grpc.local_port).await { Ok(id) => break id, Err(e) => { - error!("Failed to register to gRPC server, {} try: {}", counter, e); + error!("Failed to rese provide us with your discord handle, after joining our servergister to gRPC server, {} try: {}", counter, e); counter += 1; if counter >= 10 { panic!("Failed to register to gRPC server"); @@ -63,7 +65,7 @@ impl LambdoAgentServer { Self { config, - client: Arc::new(Mutex::new(client)), + client: Arc::new(Mutex::new(Box::new(client))), id, } } @@ -84,7 +86,8 @@ impl LambdoAgentService for LambdoAgentServer { let request = request.into_inner(); debug!("Received request: {:?}", request); - let mut runner_engine = runner_engine::service::RunnerEngine::new(request); + let mut runner_engine = + runner_engine::service::RunnerEngine::new(request, &self.config.workspace_path); let mut self_client = self.client.lock().await; if let Err(e) = runner_engine.create_workspace() { @@ -122,3 +125,106 @@ impl LambdoAgentService for LambdoAgentServer { } } } + +#[cfg(test)] +mod test { + use super::super::grpc_definitions::Code; + use crate::{ + api::{ + grpc_definitions::{ + lambdo_agent_service_server::LambdoAgentService, Empty, ExecuteRequest, + ExecuteRequestStep, + }, + server::LambdoAgentServer, + ClientTrait, SelfCreatingClientTrait, + }, + config::{AgentConfig, GRPCConfig}, + }; + use anyhow::Result; + use tonic::Request; + + struct MockClient; + + #[tonic::async_trait] + impl ClientTrait for MockClient { + async fn register(&mut self, _local_port: u16) -> Result { + Ok("test".to_string()) + } + + async fn status(&mut self, _id: String, _code: Code) -> Result<()> { + Ok(()) + } + } + + #[tonic::async_trait] + impl SelfCreatingClientTrait for MockClient { + async fn new(_grpc_host: std::net::IpAddr, _port: u16) -> Self { + MockClient + } + } + + #[tokio::test] + async fn status_unimplemented() { + let config = AgentConfig { + apiVersion: "lambdo.io/v1alpha1".to_string(), + kind: "AgentConfig".to_string(), + grpc: GRPCConfig { + remote_port: 50051, + remote_host: "127.0.0.1".to_string(), + local_host: "127.0.0.1".to_string(), + local_port: 50051, + }, + workspace_path: tempfile::tempdir() + .unwrap() + .into_path() + .to_str() + .unwrap() + .to_string(), + }; + + let server = LambdoAgentServer::new::(config).await; + let status = server.status(Request::new(Empty {})).await; + + assert!(status.is_err()); + } + + #[tokio::test] + async fn execute_works() { + let config = AgentConfig { + apiVersion: "lambdo.io/v1alpha1".to_string(), + kind: "AgentConfig".to_string(), + grpc: GRPCConfig { + remote_port: 50051, + remote_host: "127.0.0.1".to_string(), + local_host: "127.0.0.1".to_string(), + local_port: 50051, + }, + workspace_path: tempfile::tempdir() + .unwrap() + .into_path() + .to_str() + .unwrap() + .to_string(), + }; + + let server = LambdoAgentServer::new::(config).await; + let execute = server + .execute(Request::new(ExecuteRequest { + id: "test".to_string(), + files: vec![], + steps: vec![ExecuteRequestStep { + command: "echo -n 'This is stdout' && echo -n 'This is stderr' >&2 && exit 1" + .to_string(), + enable_output: true, + }], + })) + .await; + + assert!(execute.is_ok()); + + let execution_recap = execute.unwrap().into_inner(); + + assert_eq!(execution_recap.clone().steps[0].stdout, "This is stdout"); + assert_eq!(execution_recap.steps[0].stderr, "This is stderr"); + } +} diff --git a/agent/lib/src/config.rs b/agent/lib/src/config.rs index f3054dc..c92a4a1 100644 --- a/agent/lib/src/config.rs +++ b/agent/lib/src/config.rs @@ -15,6 +15,10 @@ const fn default_local_port() -> u16 { 0 } +fn default_workspace_path() -> String { + std::env::temp_dir().to_str().unwrap().to_string() +} + #[derive(Error, Debug)] pub enum AgentConfigError { #[error("cannot load config file")] @@ -37,6 +41,9 @@ pub struct AgentConfig { /// The gRPC configuration #[serde(default = "default_grpc")] pub grpc: GRPCConfig, + /// The workspace where the agent will store the files of Requests and their resulting files + #[serde(default = "default_workspace_path")] + pub workspace_path: String, } #[derive(Serialize, Deserialize, PartialEq, Debug)] diff --git a/agent/lib/src/runner_engine/service.rs b/agent/lib/src/runner_engine/service.rs index 64eaf55..1723a40 100644 --- a/agent/lib/src/runner_engine/service.rs +++ b/agent/lib/src/runner_engine/service.rs @@ -10,26 +10,31 @@ use std::{ process::Command, }; -/// The path where the workspace will be created -const WORKSPACE_PATH: &str = "/tmp"; - -/// The RunnerEngine API +/// The RunnerEngine API, responsible for running a request message +#[derive(Clone, Debug)] pub struct RunnerEngine { + /// the request message pub request_message: ExecuteRequest, + /// The root path of the workspace + pub root_path: PathBuf, } impl RunnerEngine { - /// Create a new instance of RunnerEngine + /// Create a new instance of RunnerEngine and set the root path to your system's temp folder/directory /// /// # Arguments /// /// * `request_message` - The request message + /// * `root_path` - The root path of the workspace /// /// # Returns /// /// * `Self` - The new instance of RunnerEngine - pub fn new(request_message: ExecuteRequest) -> Self { - Self { request_message } + pub fn new(request_message: ExecuteRequest, root_path: &str) -> Self { + Self { + request_message, + root_path: PathBuf::from(root_path), + } } /// Create the workspace for the code execution @@ -42,14 +47,13 @@ impl RunnerEngine { // Create a vector of FileModel and a root path let mut file_models: Vec = Vec::new(); - let root_path = PathBuf::from(WORKSPACE_PATH); self.request_message.files.iter().for_each(|file| { let mut file_path = PathBuf::from(&file.filename); file_path.pop(); // Add `/tmp` before each path - file_path = root_path.join(file_path); + file_path = self.root_path.join(file_path); // Take the file name and add it to the vector of files let file_name = Path::file_name(Path::new(&file.filename)); @@ -169,7 +173,7 @@ impl RunnerEngine { let child_process = Command::new("/bin/sh") .args(["-c", command]) - .current_dir(WORKSPACE_PATH) + .current_dir(self.root_path.clone()) .output() .map_err(|e| anyhow!("Failed to spawn command : {}", e))?; @@ -194,71 +198,83 @@ mod tests { use crate::api::grpc_definitions::{ExecuteRequestStep, FileModel}; use super::*; - use rand::random; use std::fs::File; use std::io::Read; - /// Generate a random string - /// - /// # Arguments - /// - /// * `len` - The length of the string - /// - /// # Returns - /// - /// * `String` - The random string - fn native_rand_string(len: usize) -> String { - let chars = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890"; - let mut string = String::new(); - - for _ in 0..len { - string.push( - chars - .chars() - .nth(random::() % (chars.len() - 1)) - .unwrap(), - ); - } + use tempfile::tempdir; - string + #[test] + fn run_one_works_with_ouputs_and_code() { + let res = RunnerEngine::new( + ExecuteRequest { + id: "".to_string(), + files: vec![], + steps: vec![], + }, + tempdir().unwrap().path().to_str().unwrap(), + ) + .run_one("echo -n 'This is stdout' && echo -n 'This is stderr' >&2 && exit 1"); + + assert!(res.is_ok()); + + let code_return = res.unwrap(); + + assert_eq!(code_return.stdout, "This is stdout"); + assert_eq!(code_return.stderr, "This is stderr"); + assert_eq!(code_return.exit_code, 1); } /// Test the creation of a file #[test] fn workload_runs_correctly() { + let tempdir = tempfile::tempdir(); + let temp_dir = &tempdir.unwrap(); + let path = temp_dir.path(); + let files: Vec = Vec::new(); - let mut steps: Vec = Vec::new(); - let step = ExecuteRequestStep { - command: "echo 'This is stdout' && echo 'This is stderr' >&2".to_string(), - enable_output: true, - }; - steps.push(step); + let steps: Vec = vec![ + ExecuteRequestStep { + command: "echo 'This is stdout' && echo 'This is stderr' >&2".to_string(), + enable_output: true, + }, + ExecuteRequestStep { + command: "echo 'This is stdout' && echo 'This is stderr' >&2 && exit 1".to_string(), + enable_output: false, + }, + ]; + let request_data = ExecuteRequest { id: "4bf68974-c315-4c41-aee2-3dc2920e76e9".to_string(), files, steps, }; - let mut api = RunnerEngine::new(request_data); + let mut api = RunnerEngine::new(request_data, path.as_os_str().to_str().unwrap()); let res = api.run().unwrap(); assert_eq!(res.steps[0].exit_code, 0); assert_eq!(res.steps[0].stderr, "This is stderr\n"); assert_eq!(res.steps[0].stdout, "This is stdout\n"); + + assert_eq!(res.steps[1].exit_code, 1); + assert_eq!(res.steps[1].stderr, "This is stderr\n"); + assert!(res.steps[1].stdout.is_empty()); + assert_eq!(res.id, "4bf68974-c315-4c41-aee2-3dc2920e76e9"); } /// Test the execution of a command with a workspace #[test] fn workspace_created_sucessfully() { - let mut base_dir = PathBuf::from(WORKSPACE_PATH); - base_dir.push(native_rand_string(20)); - base_dir.push("main.sh"); - let path = base_dir.into_os_string().into_string().unwrap(); + let tempdir = tempfile::tempdir(); + let temp_dir = &tempdir.unwrap(); + let path = temp_dir.path(); + + let filename = path.join("main.sh").to_str().unwrap().to_string(); let files: Vec = vec![FileModel { - filename: path.clone(), + filename: filename.clone(), content: "Hello World!".to_string(), }]; let steps: Vec = Vec::new(); @@ -268,12 +284,14 @@ mod tests { steps, }; - RunnerEngine::new(request_data).create_workspace().unwrap(); + RunnerEngine::new(request_data, path.as_os_str().to_str().unwrap()) + .create_workspace() + .unwrap(); assert!(Path::new(&path).exists()); //Check that the file contains the specified content - let mut file = File::open(&path).unwrap(); + let mut file = File::open(filename).unwrap(); let mut buffer = [0; 12]; file.read_exact(&mut buffer[..]).unwrap(); diff --git a/agent/src/main.rs b/agent/src/main.rs index 0d5514f..2cfb853 100644 --- a/agent/src/main.rs +++ b/agent/src/main.rs @@ -1,6 +1,6 @@ use agent_lib::{ api::{ - grpc_definitions::lambdo_agent_service_server::LambdoAgentServiceServer, + client::Client, grpc_definitions::lambdo_agent_service_server::LambdoAgentServiceServer, server::LambdoAgentServer, }, config::AgentConfig, @@ -57,7 +57,7 @@ async fn main() -> Result<()> { tonic::transport::Server::builder() .add_service(LambdoAgentServiceServer::new( - LambdoAgentServer::new(config).await, + LambdoAgentServer::new::(config).await, )) .serve_with_incoming(tcp_stream) .await