From beb46181383a9c01b207a2f7e36d598e51e65c82 Mon Sep 17 00:00:00 2001 From: iverly Date: Fri, 17 Feb 2023 15:30:33 +0100 Subject: [PATCH 1/7] feat: add default logger & anyhow errors Signed-off-by: iverly --- .gitignore | 2 +- Cargo.toml | 4 ++++ src/lib.rs | 9 +++++++++ src/main.rs | 12 ++++++++++-- 4 files changed, 24 insertions(+), 3 deletions(-) create mode 100644 src/lib.rs diff --git a/.gitignore b/.gitignore index 5e50d72..5e71d8a 100644 --- a/.gitignore +++ b/.gitignore @@ -17,4 +17,4 @@ Cargo.lock # IDE files .idea/ -.vscode/ \ No newline at end of file +.vscode/ diff --git a/Cargo.toml b/Cargo.toml index 5cf7170..31e9e2a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,3 +6,7 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +log = "0.4.0" +env_logger = "0.8.4" +anyhow = "1.0.62" +thiserror = "1.0.32" diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000..d00341d --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,9 @@ +use thiserror::Error; + +#[derive(Error, Debug)] +pub enum LambdoError { + #[error(transparent)] + Other(#[from] anyhow::Error), + #[error("unknown lambdo error")] + Unknown, +} diff --git a/src/main.rs b/src/main.rs index e7a11a9..c8c8a3e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,3 +1,11 @@ -fn main() { - println!("Hello, world!"); +use log::info; + +fn main() -> Result<(), Box> { + env_logger::init(); + info!("starting up"); + + // todo: do something + + info!("shutting down"); + Ok(()) } From 7f54aca78a3f02844a0cf59fa3a33557f70a154a Mon Sep 17 00:00:00 2001 From: iverly Date: Fri, 17 Feb 2023 15:47:59 +0100 Subject: [PATCH 2/7] feat: add clap and parse input Signed-off-by: iverly --- Cargo.toml | 1 + src/main.rs | 11 +++++++++++ 2 files changed, 12 insertions(+) diff --git a/Cargo.toml b/Cargo.toml index 31e9e2a..cce2881 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,3 +10,4 @@ log = "0.4.0" env_logger = "0.8.4" anyhow = "1.0.62" thiserror = "1.0.32" +clap = { version="4.1.6", features=["derive"] } diff --git a/src/main.rs b/src/main.rs index c8c8a3e..87f0182 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,7 +1,18 @@ +use clap::Parser; use log::info; +#[derive(Parser)] +#[clap( + version = "0.1", + author = "Polytech Montpellier - DevOps", + about = "A Serverless runtime in Rust" +)] +pub struct LambdoOpts {} + fn main() -> Result<(), Box> { env_logger::init(); + LambdoOpts::parse(); + info!("starting up"); // todo: do something From aee324c14a1d0c44a1628839eeafbfb4ddbed8d6 Mon Sep 17 00:00:00 2001 From: iverly Date: Fri, 17 Feb 2023 16:19:10 +0100 Subject: [PATCH 3/7] feat: add config structure & load from path Signed-off-by: iverly --- Cargo.toml | 2 + src/config.rs | 116 ++++++++++++++++++++++++++++++++++++++++++++++++++ src/lib.rs | 2 + src/main.rs | 20 +++++++-- 4 files changed, 136 insertions(+), 4 deletions(-) create mode 100644 src/config.rs diff --git a/Cargo.toml b/Cargo.toml index cce2881..cce7733 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,4 +10,6 @@ log = "0.4.0" env_logger = "0.8.4" anyhow = "1.0.62" thiserror = "1.0.32" +serde = { version = "1.0", features = ["derive"] } +serde_yaml = "0.9" clap = { version="4.1.6", features=["derive"] } diff --git a/src/config.rs b/src/config.rs new file mode 100644 index 0000000..ad43c6f --- /dev/null +++ b/src/config.rs @@ -0,0 +1,116 @@ +use anyhow::Result; +use serde::{Deserialize, Serialize}; +use std::{ + fs::File, + io::{self, BufReader}, +}; +use thiserror::Error; + +#[derive(Error, Debug)] +pub enum LambdoConfigError { + #[error("cannot load config file")] + Load(#[from] io::Error), + #[error("cannot parse config file")] + Parse(#[from] serde_yaml::Error), + #[error("unsupported config kind")] + KindNotSupported, + #[error("unsupported config api version")] + VersionNotSupported, +} + +#[derive(Serialize, Deserialize, PartialEq, Debug)] +#[allow(non_snake_case)] +pub struct LambdoConfig { + /// The api version of the lambdo config file + apiVersion: String, + /// The kind of the lambdo config file + kind: String, + /// The lambdo vmm configuration + vmm: LambdoVMMConfig, + /// The lambdo api configuration + api: LambdoApiConfig, + /// The lambdo agent configuration + agent: LambdoAgentConfig, + /// The lambdo languages configuration + languages: Vec, +} + +#[derive(Serialize, Deserialize, PartialEq, Debug)] +pub struct LambdoVMMConfig { + /// The kernel path to use for the vmm + kernel: String, +} + +#[derive(Serialize, Deserialize, PartialEq, Debug)] +pub struct LambdoApiConfig { + /// The port on which the API server will listen + host: String, + /// The host on which the API server will listen + port: u16, +} + +#[derive(Serialize, Deserialize, PartialEq, Debug)] +pub struct LambdoAgentConfig { + /// The path to the agent binary + path: String, + /// The path to the agent configuration file + config: String, +} + +#[derive(Serialize, Deserialize, PartialEq, Debug)] +pub struct LambdoLanguageConfig { + /// The name of the language + name: String, + /// The version of the language + version: String, + /// The initramfs path to use for the language + initramfs: String, + /// The steps to execute + steps: Vec, +} + +#[derive(Serialize, Deserialize, PartialEq, Debug)] +pub struct LambdoLanguageStepConfig { + /// The name of the step + name: Option, + /// The command to execute + command: String, + /// The output configuration + output: LambdoLanguageStepOutputConfig, +} + +#[derive(Serialize, Deserialize, PartialEq, Debug)] +pub struct LambdoLanguageStepOutputConfig { + /// Is the output enabled ? + enabled: bool, + /// Is the output a debug output ? + debug: bool, +} + +impl LambdoConfig { + /// Load a LambdoConfig from a file. + /// + /// Arguments: + /// + /// * `path`: The path to the config file. + /// + /// Returns: + /// + /// A Result + pub fn load(path: &str) -> Result { + let file = File::open(path).map_err(LambdoConfigError::Load)?; + let reader = BufReader::new(file); + let config: LambdoConfig = + serde_yaml::from_reader(reader).map_err(LambdoConfigError::Parse)?; + + if config.kind != "Config" { + return Err(LambdoConfigError::KindNotSupported.into()); + } + + if config.apiVersion != "lambdo.io/v1alpha1" { + return Err(LambdoConfigError::VersionNotSupported.into()); + } + + Ok(config) + } +} diff --git a/src/lib.rs b/src/lib.rs index d00341d..4bc9f4c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,5 +1,7 @@ use thiserror::Error; +pub mod config; + #[derive(Error, Debug)] pub enum LambdoError { #[error(transparent)] diff --git a/src/main.rs b/src/main.rs index 87f0182..5926308 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,5 +1,6 @@ use clap::Parser; -use log::info; +use lambdo::config::LambdoConfig; +use log::{debug, info, trace}; #[derive(Parser)] #[clap( @@ -7,13 +8,24 @@ use log::info; author = "Polytech Montpellier - DevOps", about = "A Serverless runtime in Rust" )] -pub struct LambdoOpts {} +pub struct LambdoOpts { + /// Config file path + #[clap(short, long, default_value = "/etc/lambdo/config.yaml")] + config: String, +} fn main() -> Result<(), Box> { env_logger::init(); - LambdoOpts::parse(); + let options = LambdoOpts::parse(); + + info!("starting up ..."); - info!("starting up"); + debug!("loading config file at {}", options.config); + let config = LambdoConfig::load(options.config.as_str())?; + trace!( + "config file loaded successfully with content: {:#?}", + config + ); // todo: do something From 137ff5ec7ecf2fcbc24274426e02080139111cda Mon Sep 17 00:00:00 2001 From: iverly Date: Wed, 29 Mar 2023 17:46:05 +0200 Subject: [PATCH 4/7] chore: update lumper to the latest version Signed-off-by: iverly --- lumper | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lumper b/lumper index 47e294f..8f1c594 160000 --- a/lumper +++ b/lumper @@ -1 +1 @@ -Subproject commit 47e294fcddc4136feda480da802d0f08135295bf +Subproject commit 8f1c5948d754fb484e832bdefafbdf165a90dc2b From 3084b7ed5ac43972731f15880b8a5428c1feabcb Mon Sep 17 00:00:00 2001 From: GridexX Date: Mon, 17 Apr 2023 18:02:55 +0200 Subject: [PATCH 5/7] feat(agent): implement api, send status message Signed-off-by: GridexX --- agent/Cargo.toml | 11 ++ agent/lib/src/api/comms.rs | 71 ++++++++++ agent/lib/src/api/mod.rs | 3 + agent/lib/src/api/model.rs | 187 +++++++++++++++++++++++++++ agent/lib/src/api/service.rs | 244 +++++++++++++++++++++++++++++++++++ agent/lib/src/lib.rs | 2 + agent/src/main.rs | 24 +++- 7 files changed, 540 insertions(+), 2 deletions(-) create mode 100644 agent/lib/src/api/comms.rs create mode 100644 agent/lib/src/api/mod.rs create mode 100644 agent/lib/src/api/model.rs create mode 100644 agent/lib/src/api/service.rs create mode 100644 agent/lib/src/lib.rs diff --git a/agent/Cargo.toml b/agent/Cargo.toml index 884007e..bbe1836 100644 --- a/agent/Cargo.toml +++ b/agent/Cargo.toml @@ -6,3 +6,14 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +log = "0.4.0" +env_logger = "0.10.0" +anyhow = "1.0.69" +serialport = "4.2.0" +serde = { version = "1.0.152", features = ["derive"] } +serde_json = "1.0.93" +serde_yaml = "0.9" + +[lib] +name = "agent_lib" +path = "lib/src/lib.rs" \ No newline at end of file diff --git a/agent/lib/src/api/comms.rs b/agent/lib/src/api/comms.rs new file mode 100644 index 0000000..d904bad --- /dev/null +++ b/agent/lib/src/api/comms.rs @@ -0,0 +1,71 @@ +// This message is sent to the API server to indicate wether +// the agent is ready or not to receive messages. + +pub const MESSAGE_SIZE_NB_BYTES: usize = 8; + +/// Represents a message sent by the agent +pub struct Message { + /// These are characters e.g. 00002048 + pub message_size: [u8; MESSAGE_SIZE_NB_BYTES], + /// stringified json, vec because size is unknown + pub message: Vec, +} + +impl Message { + pub fn new(message_to_send: String) -> Self { + let mut message_size = [0; MESSAGE_SIZE_NB_BYTES]; + let message = message_to_send.as_bytes().to_vec(); + + let string_size = format!("{:0>8}", message.len()); + //We can't call directly as bytes as both &str and String sizes are not known at + //compile time unlike message_size + + for (i, c) in string_size.chars().enumerate() { + message_size[i] = c as u8; + } + + Self { + message_size, + message, + } + } + + pub fn to_bytes(&self) -> Vec { + let mut bytes = Vec::new(); + bytes.extend_from_slice(&self.message_size); + bytes.extend_from_slice(&self.message); + bytes + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn new_message_well_encoded() { + let message_data = "Hello world".to_string(); + let message = Message::new(message_data); + assert_eq!( + message.message, + [72, 101, 108, 108, 111, 32, 119, 111, 114, 108, 100] + ); + assert_eq!(message.message_size, [48, 48, 48, 48, 48, 48, 49, 49]); + + assert_eq!( + message.to_bytes(), + [48, 48, 48, 48, 48, 48, 49, 49, 72, 101, 108, 108, 111, 32, 119, 111, 114, 108, 100] + ); + } + + #[test] + fn message_size_badly_encoded() { + let message_data = "Hello world".to_string(); + let message = Message::new(message_data); + assert_eq!( + message.message, + [72, 101, 108, 108, 111, 32, 119, 111, 114, 108, 100] + ); + assert_ne!(message.message_size, [48, 48, 48, 48, 48, 48, 49, 50]); // should be 11, is 12 + } +} diff --git a/agent/lib/src/api/mod.rs b/agent/lib/src/api/mod.rs new file mode 100644 index 0000000..e65c816 --- /dev/null +++ b/agent/lib/src/api/mod.rs @@ -0,0 +1,3 @@ +pub mod comms; +pub mod model; +pub mod service; diff --git a/agent/lib/src/api/model.rs b/agent/lib/src/api/model.rs new file mode 100644 index 0000000..33754ca --- /dev/null +++ b/agent/lib/src/api/model.rs @@ -0,0 +1,187 @@ +use serde::{Deserialize, Serialize}; + +/// Represents a file to be included in the workspace +#[derive(Deserialize, Serialize, Debug, PartialEq, Eq)] +pub struct FileModel { + /// Name of the file, paths relative to the workspace + pub filename: String, + /// Content of the file + pub content: String, +} + +/// Identifies the type of the message +#[derive(Deserialize, Serialize, Debug, PartialEq, Eq)] +pub enum Type { + /// Status message to indicate that the agent is ready + #[serde(rename = "status")] + Status, + /// Request message + #[serde(rename = "request")] + Request, + /// Response message answering to a request message + #[serde(rename = "response")] + Response, +} + +/// Code to tell what the Request/Response message is about +#[derive(Deserialize, Serialize, Debug, PartialEq, Eq)] +pub enum Code { + /// Represents a request to run the code or a response to such request + #[serde(rename = "run")] + Run, + /// Agent is ready to communicate + #[serde(rename = "ready")] + Ready, +} + +/// Represents a Status message +#[derive(Deserialize, Serialize, Debug)] +pub struct StatusMessage { + /// Type of the message + pub r#type: Type, + /// Code of the message + pub code: Code, +} + +impl StatusMessage { + pub fn new(code: Code) -> StatusMessage { + StatusMessage { + // r#type is a reserved keyword in Rust, so we need to use the raw identifier syntax + r#type: Type::Status, + code, + } + } +} + +impl Default for StatusMessage { + fn default() -> Self { + Self::new(Code::Ready) + } +} + +/// Serializes an Option as a String by returning an empty string if the Option is None +fn serialize_optionnal_string(value: &Option, serializer: S) -> Result +where + S: serde::Serializer, +{ + match value { + Some(v) => serializer.serialize_str(v), + None => serializer.serialize_str(""), + } +} + +/// Represents the output of a step +#[derive(Deserialize, Serialize, Debug)] +pub struct ResponseStep { + /// Command that was run + pub command: String, + /// Exit code of the command + #[serde(alias = "exitCode")] + pub exit_code: i32, + /// Stdout of the command. If it is None, it will be serialized as an empty string + /// to avoid api crashes + #[serde(serialize_with = "serialize_optionnal_string")] + pub stdout: Option, + /// Stderr of the command + pub stderr: String, +} + +impl ResponseStep { + pub fn new( + command: String, + exit_code: i32, + stdout: Option, + stderr: String, + ) -> ResponseStep { + ResponseStep { + command, + exit_code, + stdout, + stderr, + } + } +} + +/// Contains the id of the request and the result of all steps +#[derive(Deserialize, Serialize, Debug)] +pub struct ResponseData { + /// Id of the request (UUID) + pub id: String, + /// Result of all steps + pub steps: Vec, +} + +impl ResponseData { + pub fn new(id: String, steps: Vec) -> ResponseData { + ResponseData { id, steps } + } +} + +/// Represents a Response message with code Type::Run, meaning that it is a response to a run code request +#[derive(Deserialize, Serialize, Debug)] +pub struct ResponseMessage { + /// Type of the message + pub r#type: Type, + /// Code of the message + pub code: Code, + /// Data of the message + pub data: ResponseData, +} + +impl ResponseMessage { + pub fn new(data: ResponseData) -> ResponseMessage { + ResponseMessage { + r#type: Type::Response, + code: Code::Run, + data, + } + } +} + +/// Represent a step in the request with type Type::Run +#[derive(Deserialize, Serialize, Debug, Clone, PartialEq, Eq)] +pub struct RequestStep { + /// Command to run + pub command: String, + /// Whether the stdout should be returned or not (stderr will alaways be) + #[serde(alias = "enableOutput")] + pub enable_output: bool, +} + +/// Represents the data of a request message with type Type::Run +#[derive(Deserialize, Serialize, Debug)] +pub struct RequestData { + /// Id of the request (UUID) + pub id: String, + /// Files to be included in the workspace, paths relative to the workspace + pub files: Vec, + /// Steps to be executed + pub steps: Vec, +} + +impl RequestData { + pub fn new(id: String, files: Vec, steps: Vec) -> RequestData { + RequestData { id, files, steps } + } +} + +/// Represents a Request message with type Type::Run +#[derive(Deserialize, Serialize, Debug)] +pub struct RequestMessage { + /// Type of the message + pub r#type: Type, + /// Code of the message + pub code: Code, + /// Data of the message + pub data: RequestData, +} + +impl RequestMessage { + pub fn new(data: RequestData) -> RequestMessage { + RequestMessage { + r#type: Type::Request, + code: Code::Run, + data, + } + } +} diff --git a/agent/lib/src/api/service.rs b/agent/lib/src/api/service.rs new file mode 100644 index 0000000..53efb6d --- /dev/null +++ b/agent/lib/src/api/service.rs @@ -0,0 +1,244 @@ +use anyhow::{anyhow, Result}; +use log::{debug, error, info, trace}; + +use serialport::SerialPort; + +use super::comms::{Message, MESSAGE_SIZE_NB_BYTES}; +use super::model::{Code, RequestMessage, ResponseMessage, StatusMessage}; + +pub struct Api { + serial_path: String, + + serial_port: Box, // So we don't open it multiple times +} + +impl Api { + pub fn new(serial_path: String, serial_baud_rate: u32) -> Self { + Self { + serial_path: serial_path.clone(), + serial_port: serialport::new(serial_path, serial_baud_rate) + .open() + .unwrap(), + } + } + + pub fn read_from_serial(&mut self) -> Result { + info!("Reading from serial port: {}", self.serial_path); + + // Create a buffer to hold the data + let mut size_buffer = [0u8; MESSAGE_SIZE_NB_BYTES]; + + let mut buf = [0; 128]; + let mut bytes_read: usize = 0; + + // Create the final vector to hold the data + let mut data_received: Vec = Vec::new(); + + //we read the buffer and retrieve the first 8 bytes which are the size of the message + while bytes_read < MESSAGE_SIZE_NB_BYTES { + match self.serial_port.read(&mut size_buffer) { + Ok(t) => { + if t > 0 { + bytes_read += t; + data_received.extend_from_slice(&size_buffer[..t]); + debug!("Received {} bytes", t); + } + } + Err(ref e) if e.kind() == std::io::ErrorKind::TimedOut => (), + Err(e) => error!("{:?}", e), + } + } + + let size_string = String::from_utf8(data_received.clone()) + .map_err(|e| anyhow!("Failed to get message size as string: {}", e))?; + + trace!("Size string: {}", size_string); + + let data_size = size_string + .parse::() + .map_err(|e| anyhow!("Failed to parse length of message: {}", e))?; + + // We clean up the vector to only keep the message + data_received.drain(..MESSAGE_SIZE_NB_BYTES); + + bytes_read = 0; + + while bytes_read < data_size { + match self.serial_port.read(&mut buf) { + Ok(t) => { + if t > 0 { + bytes_read += t; + data_received.extend_from_slice(&buf[..t]); + debug!("Received {} bytes", t); + } + } + Err(ref e) if e.kind() == std::io::ErrorKind::TimedOut => (), + Err(e) => error!("{:?}", e), + } + } + + debug!("Final received data: {:?}", data_received); + debug!("Total bytes read {:?}", bytes_read); + + let code_entry = self.parse_json_payload(&data_received)?; + + info!("Code entry: {:?}", code_entry); + + Ok(code_entry) + } + + pub fn parse_json_payload(&mut self, data: &[u8]) -> Result { + // Convert the data vector to a RequestMessage struct + let request_message: RequestMessage = serde_json::from_slice(data) + .map_err(|e| anyhow!("Failed to parse JSON payload: {}", e))?; + + info!("Code request message: {:?}", request_message); + + Ok(request_message) + } + + pub fn send_status_message(&mut self) -> Result<()> { + let status_message: StatusMessage = StatusMessage::new(Code::Ready); + let status_message_json = serde_json::to_string(&status_message) + .map_err(|e| anyhow!("Failed to serialize status message: {}", e))?; + self.write_to_serial(&status_message_json)?; + Ok(()) + } + + pub fn send_response_message(&mut self, response_message: ResponseMessage) -> Result<()> { + let code_json = serde_json::to_string(&response_message) + .map_err(|e| anyhow!("Failed to stringify response message : {}", e))?; + + // Write the JSON to the serial port + self.write_to_serial(&code_json)?; + + info!( + "Response message written to serial port: {:?}", + response_message + ); + Ok(()) + } + + pub fn write_to_serial(&mut self, data: &str) -> Result<()> { + info!("Writing to serial port: {}", self.serial_path); + + // Convert the string to a byte array + let message = Message::new(data.to_string()).to_bytes(); + let buf = message.as_slice(); + + // Write the byte array to the serial port + self.serial_port + .write_all(buf) + .map_err(|e| anyhow!("Failed to write to serial port: {}", e))?; + + // In order to still be readable by ``readline`` on the api side, we add a carriage return + // (not included in the message size) + self.serial_port + .write("\r\n".as_bytes()) + .map_err(|e| anyhow!("Failed to write to serial port: {}", e))?; + + Ok(()) + } +} + +#[cfg(test)] +mod tests { + + use anyhow::Result; + + use crate::api::model::{Code, FileModel, RequestStep, Type}; + + use super::Api; + + #[test] + fn test_parse_json_payload() -> Result<()> { + let mut api = Api::new("".to_string(), 0); + + // Data vector with the following JSON payload: + // { + // "type": "Request", + // "code": "Run", + // "data": { + // "id": "4bf68974-c315-4c41-aee2-3dc2920e76e9", + // "files": [ + // { + // "filename": "src/index.js", + // "content": "console.log('Hello World!');" + // } + // ], + // "steps": [ + // { + // "command": "node src/index.js", + // "enable_output": true + // } + // ] + // } + // } + + let data = [ + 123, 10, 32, 32, 34, 116, 121, 112, 101, 34, 58, 32, 34, 82, 101, 113, 117, 101, 115, + 116, 34, 44, 10, 32, 32, 34, 99, 111, 100, 101, 34, 58, 32, 34, 82, 117, 110, 34, 44, + 10, 32, 32, 34, 100, 97, 116, 97, 34, 58, 32, 123, 10, 32, 32, 32, 32, 34, 105, 100, + 34, 58, 32, 34, 52, 98, 102, 54, 56, 57, 55, 52, 45, 99, 51, 49, 53, 45, 52, 99, 52, + 49, 45, 97, 101, 101, 50, 45, 51, 100, 99, 50, 57, 50, 48, 101, 55, 54, 101, 57, 34, + 44, 10, 32, 32, 32, 32, 34, 102, 105, 108, 101, 115, 34, 58, 32, 91, 10, 32, 32, 32, + 32, 32, 32, 123, 10, 32, 32, 32, 32, 32, 32, 32, 32, 34, 102, 105, 108, 101, 110, 97, + 109, 101, 34, 58, 32, 34, 115, 114, 99, 47, 105, 110, 100, 101, 120, 46, 106, 115, 34, + 44, 10, 32, 32, 32, 32, 32, 32, 32, 32, 34, 99, 111, 110, 116, 101, 110, 116, 34, 58, + 32, 34, 99, 111, 110, 115, 111, 108, 101, 46, 108, 111, 103, 40, 39, 72, 101, 108, 108, + 111, 32, 87, 111, 114, 108, 100, 33, 39, 41, 59, 34, 10, 32, 32, 32, 32, 32, 32, 125, + 10, 32, 32, 32, 32, 93, 44, 10, 32, 32, 32, 32, 34, 115, 116, 101, 112, 115, 34, 58, + 32, 91, 10, 32, 32, 32, 32, 32, 32, 123, 10, 32, 32, 32, 32, 32, 32, 32, 32, 34, 99, + 111, 109, 109, 97, 110, 100, 34, 58, 32, 34, 110, 111, 100, 101, 32, 115, 114, 99, 47, + 105, 110, 100, 101, 120, 46, 106, 115, 34, 44, 10, 32, 32, 32, 32, 32, 32, 32, 32, 34, + 101, 110, 97, 98, 108, 101, 95, 111, 117, 116, 112, 117, 116, 34, 58, 32, 116, 114, + 117, 101, 10, 32, 32, 32, 32, 32, 32, 125, 10, 32, 32, 32, 32, 93, 10, 32, 32, 125, 10, + 125, + ]; + + let request_message = api.parse_json_payload(&data)?; + + let files = vec![FileModel { + filename: "src/index.js".to_string(), + content: "console.log('Hello World!');".to_string(), + }]; + + let steps = vec![RequestStep { + command: "node src/index.js".to_string(), + enable_output: true, + }]; + + assert_eq!(request_message.r#type, Type::Request); + assert_eq!(request_message.code, Code::Run); + assert_eq!( + request_message.data.id, + "4bf68974-c315-4c41-aee2-3dc2920e76e9" + ); + assert_eq!(request_message.data.files[0], files[0]); + assert_eq!(request_message.data.steps[0], steps[0]); + Ok(()) + } + + #[test] + fn test_parse_json_payload_failed() -> Result<()> { + let mut api = Api::new("".to_string(), 0); + + // Data vector with missing comma + let data = [ + 123, 10, 32, 32, 34, 102, 105, 108, 101, 34, 58, 32, 91, 10, 32, 32, 32, 32, 123, 10, + 32, 32, 32, 32, 32, 32, 34, 102, 105, 108, 101, 110, 97, 109, 101, 34, 58, 32, 34, 116, + 101, 115, 116, 46, 112, 121, 34, 44, 10, 32, 32, 32, 32, 32, 32, 34, 99, 111, 110, 116, + 101, 110, 116, 34, 58, 32, 34, 112, 114, 105, 110, 116, 40, 39, 72, 101, 108, 108, 111, + 32, 87, 111, 114, 108, 100, 39, 41, 34, 10, 32, 32, 32, 32, 125, 10, 32, 32, 93, 44, + 10, 32, 32, 34, 115, 99, 114, 105, 112, 116, 34, 58, 32, 91, 10, 32, 32, 32, 32, 34, + 112, 121, 116, 104, 111, 110, 51, 32, 116, 101, 115, 116, 46, 112, 121, 34, 10, 32, 32, + 93, 10, 32, 32, 10, 125, 10, + ]; + + let code_entry = api.parse_json_payload(&data); + + assert!(code_entry.is_err()); + + Ok(()) + } +} diff --git a/agent/lib/src/lib.rs b/agent/lib/src/lib.rs new file mode 100644 index 0000000..51316f9 --- /dev/null +++ b/agent/lib/src/lib.rs @@ -0,0 +1,2 @@ +pub mod api; + diff --git a/agent/src/main.rs b/agent/src/main.rs index e7a11a9..e7fd3a9 100644 --- a/agent/src/main.rs +++ b/agent/src/main.rs @@ -1,3 +1,23 @@ -fn main() { - println!("Hello, world!"); +use anyhow::{anyhow, Result}; +use agent_lib::{api::service::Api}; +use log::{debug, info, trace}; + +/// Main function +fn main() -> Result<()> { + // Initialize logger + env_logger::init(); + + info!("Starting agent"); + + // Initialize API + let mut api = Api::new("/dev/pts/6".to_string(), 9200); + + // Send status message to serial port + api.send_status_message()?; + + // Read request message from serial port + let request_message = api.read_from_serial()?; + + info!("Stopping agent"); + Ok(()) } From 53030466390b93f73b150b4608a44b99e82fd738 Mon Sep 17 00:00:00 2001 From: GridexX Date: Mon, 17 Apr 2023 18:07:27 +0200 Subject: [PATCH 6/7] feat(agent): add configuration and construct api Signed-off-by: GridexX --- agent/Cargo.toml | 2 ++ agent/lib/src/config.rs | 66 +++++++++++++++++++++++++++++++++++++++++ agent/lib/src/lib.rs | 2 +- agent/src/main.rs | 31 +++++++++++++++++-- 4 files changed, 98 insertions(+), 3 deletions(-) create mode 100644 agent/lib/src/config.rs diff --git a/agent/Cargo.toml b/agent/Cargo.toml index bbe1836..6224608 100644 --- a/agent/Cargo.toml +++ b/agent/Cargo.toml @@ -13,6 +13,8 @@ serialport = "4.2.0" serde = { version = "1.0.152", features = ["derive"] } serde_json = "1.0.93" serde_yaml = "0.9" +clap = { version="4.1.6", features=["derive"] } +thiserror = "1.0.32" [lib] name = "agent_lib" diff --git a/agent/lib/src/config.rs b/agent/lib/src/config.rs new file mode 100644 index 0000000..0f85c27 --- /dev/null +++ b/agent/lib/src/config.rs @@ -0,0 +1,66 @@ +use anyhow::Result; +use serde::{Deserialize, Serialize}; +use std::{ + fs::File, + io::{self, BufReader}, +}; +use thiserror::Error; + +#[derive(Error, Debug)] +pub enum AgentConfigError { + #[error("cannot load config file")] + Load(#[from] io::Error), + #[error("cannot parse config file")] + Parse(#[from] serde_yaml::Error), + #[error("unsupported config kind")] + KindNotSupported, + #[error("unsupported config api version")] + VersionNotSupported, +} + +#[derive(Serialize, Deserialize, PartialEq, Debug)] +#[allow(non_snake_case)] +pub struct AgentConfig { + /// The api version of the agent config file + pub apiVersion: String, + /// The kind of the agent config file + pub kind: String, + /// The serial configuration + pub serial: SerialConfig, +} + +#[derive(Serialize, Deserialize, PartialEq, Debug)] +pub struct SerialConfig { + /// The path to the serial port + pub path: String, + /// The baud rate to use for the serial port + pub baud_rate: u32, +} + +impl AgentConfig { + /// Load a AgentConfig from a file. + /// + /// Arguments: + /// + /// * `path`: The path to the config file. + /// + /// Returns: + /// + /// A Result + pub fn load(path: &str) -> Result { + let file = File::open(path).map_err(AgentConfigError::Load)?; + let reader = BufReader::new(file); + let config: AgentConfig = + serde_yaml::from_reader(reader).map_err(AgentConfigError::Parse)?; + + if config.kind != "AgentConfig" { + return Err(AgentConfigError::KindNotSupported.into()); + } + + if config.apiVersion != "lambdo.io/v1alpha1" { + return Err(AgentConfigError::VersionNotSupported.into()); + } + + Ok(config) + } +} diff --git a/agent/lib/src/lib.rs b/agent/lib/src/lib.rs index 51316f9..5cf8e65 100644 --- a/agent/lib/src/lib.rs +++ b/agent/lib/src/lib.rs @@ -1,2 +1,2 @@ pub mod api; - +pub mod config; diff --git a/agent/src/main.rs b/agent/src/main.rs index e7fd3a9..2316874 100644 --- a/agent/src/main.rs +++ b/agent/src/main.rs @@ -1,7 +1,21 @@ +use agent_lib::{api::service::Api, config::AgentConfig}; use anyhow::{anyhow, Result}; -use agent_lib::{api::service::Api}; +use clap::Parser; use log::{debug, info, trace}; +/// Agent CLI options +#[derive(Parser)] +#[clap( + version = "0.1", + author = "Polytech Montpellier - DevOps", + about = "A Serverless runtime in Rust" +)] +pub struct AgentOpts { + /// Config file path + #[clap(short, long, default_value = "/etc/lambdo/agent/config.yaml")] + config: String, +} + /// Main function fn main() -> Result<()> { // Initialize logger @@ -9,8 +23,21 @@ fn main() -> Result<()> { info!("Starting agent"); + // Parse CLI options + let options = AgentOpts::parse(); + + debug!("loading config file at {}", options.config); + + // Load config file + let config = AgentConfig::load(options.config.as_str())?; + + trace!( + "config file loaded successfully with content: {:#?}", + config + ); + // Initialize API - let mut api = Api::new("/dev/pts/6".to_string(), 9200); + let mut api = Api::new(config.serial.path, config.serial.baud_rate); // Send status message to serial port api.send_status_message()?; From 6fa890b9e1e984b17f63c9cddfacea09d33a506c Mon Sep 17 00:00:00 2001 From: GridexX Date: Mon, 17 Apr 2023 18:13:12 +0200 Subject: [PATCH 7/7] feat(agent): create workspace and launch code Signed-off-by: GridexX --- agent/Cargo.toml | 6 +- agent/lib/src/lib.rs | 1 + agent/lib/src/runner_engine/mod.rs | 2 + agent/lib/src/runner_engine/model.rs | 51 +++++ agent/lib/src/runner_engine/service.rs | 291 +++++++++++++++++++++++++ agent/src/main.rs | 14 +- 6 files changed, 363 insertions(+), 2 deletions(-) create mode 100644 agent/lib/src/runner_engine/mod.rs create mode 100644 agent/lib/src/runner_engine/model.rs create mode 100644 agent/lib/src/runner_engine/service.rs diff --git a/agent/Cargo.toml b/agent/Cargo.toml index 6224608..4d0a601 100644 --- a/agent/Cargo.toml +++ b/agent/Cargo.toml @@ -14,8 +14,12 @@ serde = { version = "1.0.152", features = ["derive"] } serde_json = "1.0.93" serde_yaml = "0.9" clap = { version="4.1.6", features=["derive"] } +unshare = "0.7.0" thiserror = "1.0.32" [lib] name = "agent_lib" -path = "lib/src/lib.rs" \ No newline at end of file +path = "lib/src/lib.rs" + +[dev-dependencies] +rand = "0.8.5" diff --git a/agent/lib/src/lib.rs b/agent/lib/src/lib.rs index 5cf8e65..6ae8a12 100644 --- a/agent/lib/src/lib.rs +++ b/agent/lib/src/lib.rs @@ -1,2 +1,3 @@ pub mod api; pub mod config; +pub mod runner_engine; diff --git a/agent/lib/src/runner_engine/mod.rs b/agent/lib/src/runner_engine/mod.rs new file mode 100644 index 0000000..908db95 --- /dev/null +++ b/agent/lib/src/runner_engine/mod.rs @@ -0,0 +1,2 @@ +pub mod model; +pub mod service; diff --git a/agent/lib/src/runner_engine/model.rs b/agent/lib/src/runner_engine/model.rs new file mode 100644 index 0000000..47358be --- /dev/null +++ b/agent/lib/src/runner_engine/model.rs @@ -0,0 +1,51 @@ +use std::path::PathBuf; + +use serde::{Deserialize, Serialize}; + +/// A struct to represent a file in the request message +/// +/// # Attributes +/// +/// * `path` - The path of the file +/// * `file_name` - The name of the file +/// * `content` - The content of the file +#[derive(Deserialize, Serialize, Debug)] +pub struct FileModel { + pub path: PathBuf, + pub file_name: String, + pub content: String, +} + +impl FileModel { + pub fn new(path: PathBuf, file_name: String, content: String) -> Self { + Self { + path, + file_name, + content, + } + } +} + +/// A struct to represent the result of a command +/// +/// # Attributes +/// +/// * `stdout` - The stdout of the command +/// * `stderr` - The stderr of the command +/// * `exit_code` - The exit code of the command +#[derive(Deserialize, Serialize, Debug)] +pub struct CodeReturn { + pub stdout: String, + pub stderr: String, + pub exit_code: i32, +} + +impl CodeReturn { + pub fn new(stdout: String, stderr: String, exit_code: i32) -> Self { + Self { + stdout, + stderr, + exit_code, + } + } +} diff --git a/agent/lib/src/runner_engine/service.rs b/agent/lib/src/runner_engine/service.rs new file mode 100644 index 0000000..e16d075 --- /dev/null +++ b/agent/lib/src/runner_engine/service.rs @@ -0,0 +1,291 @@ +use super::model::CodeReturn; +use crate::{ + api::model::{RequestMessage, ResponseData, ResponseMessage, ResponseStep}, + runner_engine::model::FileModel, +}; +use anyhow::{anyhow, Ok, Result}; +use log::{error, info}; +use std::io::Write; +use std::{ + fs::File, + path::{Path, PathBuf}, + process::Command, +}; + +/// The path where the workspace will be created +const WORKSPACE_PATH: &str = "/tmp"; + +/// The RunnerEngine API +pub struct RunnerEngine { + pub request_message: RequestMessage, +} + +impl RunnerEngine { + /// Create a new instance of RunnerEngine + /// + /// # Arguments + /// + /// * `request_message` - The request message + /// + /// # Returns + /// + /// * `Self` - The new instance of RunnerEngine + pub fn new(request_message: RequestMessage) -> Self { + Self { request_message } + } + + /// Create the workspace for the code execution + /// + /// # Returns + /// + /// * `Result<()>` - Nothing or an error + pub fn create_workspace(&mut self) -> Result<()> { + info!("Creating workspace for code execution"); + + // Create a vector of FileModel and a root path + let mut file_models: Vec = Vec::new(); + let root_path = PathBuf::from(WORKSPACE_PATH); + + self.request_message.data.files.iter().for_each(|file| { + let mut file_path = PathBuf::from(&file.filename); + file_path.pop(); + + // Add `/tmp` before each path + file_path = root_path.join(file_path); + + // Take the file name and add it to the vector of files + let file_name = Path::file_name(Path::new(&file.filename)); + + // Extract the file name from the path and create a FileModel + if let Some(file_name_str) = file_name { + let fns = file_name_str.to_os_string(); + let file_name_string_option = fns.to_str(); + + if let Some(file_name_string) = file_name_string_option { + let file_model = FileModel::new( + file_path, + file_name_string.to_string(), + file.content.clone(), + ); + file_models.push(file_model); + } else { + error!("Failed to convert file name to string"); + } + } else { + error!("Failed to extract file name from path"); + } + }); + + info!("Final file models: {:?}", file_models); + + // For each file model, create the directory and the file + file_models.iter().for_each(|file_model| { + let file_path = file_model.path.clone(); + let file_name = file_model.file_name.clone(); + + // Create the directory + let op_dir = std::fs::create_dir_all(&file_path) + .map_err(|e| anyhow!("Failed to create directory: {}", e)); + if op_dir.is_err() { + error!("Failed to create directory: {:?}", op_dir.err()); + } else { + info!("Directory created: {:?}", file_path); + } + + // Create the file + let file_path = file_path.join(file_name); + let op_file = + File::create(file_path).map_err(|e| anyhow!("Failed to create file: {}", e)); + + if let Err(e) = op_file { + error!("Failed to create file: {:?}", e); + } else { + let mut file = op_file.unwrap(); + info!("File created: {:?}", file); + + // Write the content inside the file + let res = write!(file, "{}", file_model.content); + + if let Err(err) = res { + error!("Failed to write to file: {:?}", err); + } else { + info!("File written: {:?}", file); + } + } + }); + + Ok(()) + } + + /// Run all the steps of the request message + /// + /// # Returns + /// + /// * `Result` - The response message or an error + pub fn run(&mut self) -> Result { + info!("Running all steps"); + let mut steps: Vec = Vec::new(); + + // For each commands in the request, run it + let steps_to_process = self.request_message.data.steps.clone(); + + for step in steps_to_process { + let command = step.command.as_str(); + let code_return = self.run_one(command)?; + + // Hide Stdout if enable_output is false + let stdout = if step.enable_output { + Some(code_return.stdout) + } else { + None + }; + let response_step = ResponseStep::new( + command.to_string(), + code_return.exit_code, + stdout, + code_return.stderr, + ); + + steps.push(response_step); + } + + let data: ResponseData = ResponseData::new(self.request_message.data.id.clone(), steps); + let response_message = ResponseMessage::new(data); + + Ok(response_message) + } + + /// Run a command + /// + /// # Arguments + /// + /// * `command` - The command to run + /// + /// # Returns + /// + /// * `Result` - The code return or an error + pub fn run_one(&mut self, command: &str) -> Result { + info!("Running command : {}", command); + + let child_process = Command::new("/bin/sh") + .args(["-c", command]) + .current_dir(WORKSPACE_PATH) + .output() + .map_err(|e| anyhow!("Failed to spawn command : {}", e))?; + + let exit_code = child_process + .status + .code() + .ok_or_else(|| anyhow!("Failed to retrieve exit_code"))?; + let stdout = String::from_utf8(child_process.stdout) + .map_err(|e| anyhow!("Failed to retrieve stdout stream : {}", e))?; + let stderr = String::from_utf8(child_process.stderr) + .map_err(|e| anyhow!("Failed to retrieve stderr stream : {}", e))?; + + let code_return = CodeReturn::new(stdout, stderr, exit_code); + + info!("Code execution finished: {:?}", code_return); + Ok(code_return) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::api::model::{FileModel, RequestData, RequestStep}; + use rand::random; + use std::fs::File; + use std::io::Read; + + /// Generate a random string + /// + /// # Arguments + /// + /// * `len` - The length of the string + /// + /// # Returns + /// + /// * `String` - The random string + fn native_rand_string(len: usize) -> String { + let chars = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890"; + let mut string = String::new(); + + for _ in 0..len { + string.push( + chars + .chars() + .nth(random::() % (chars.len() - 1)) + .unwrap(), + ); + } + + string + } + + /// Test the creation of a file + #[test] + fn workload_runs_correctly() { + let files: Vec = Vec::new(); + let mut steps: Vec = Vec::new(); + let step = RequestStep { + command: "echo 'This is stdout' && echo 'This is stderr' >&2".to_string(), + enable_output: true, + }; + steps.push(step); + let request_data = RequestData::new( + "4bf68974-c315-4c41-aee2-3dc2920e76e9".to_string(), + files, + steps, + ); + let request_message = RequestMessage::new(request_data); + + let mut api = RunnerEngine::new(request_message); + + let res = api.run().unwrap(); + + assert_eq!(res.data.steps[0].exit_code, 0); + assert_eq!(res.data.steps[0].stderr, "This is stderr\n"); + assert_eq!( + res.data.steps[0].stdout.as_ref().unwrap(), + "This is stdout\n" + ); + assert_eq!(res.data.id, "4bf68974-c315-4c41-aee2-3dc2920e76e9"); + } + + /// Test the execution of a command with a workspace + #[test] + fn workspace_created_sucessfully() { + let mut base_dir = PathBuf::from(WORKSPACE_PATH); + base_dir.push(native_rand_string(20)); + base_dir.push("main.sh"); + let path = base_dir.into_os_string().into_string().unwrap(); + + let files: Vec = vec![FileModel { + filename: path.clone(), + content: "Hello World!".to_string(), + }]; + let steps: Vec = Vec::new(); + let request_data = RequestData::new( + "4bf68974-c315-4c41-aee2-3dc2920e76e9".to_string(), + files, + steps, + ); + let request_message = RequestMessage::new(request_data); + + RunnerEngine::new(request_message) + .create_workspace() + .unwrap(); + + assert!(Path::new(&path).exists()); + + //Check that the file contains the specified content + let mut file = File::open(&path).unwrap(); + let mut buffer = [0; 12]; + file.read_exact(&mut buffer[..]).unwrap(); + + // Convert buffer to string + let content = String::from_utf8(buffer.to_vec()).unwrap(); + assert!(file.metadata().unwrap().is_file()); + assert_eq!(content, "Hello World!"); + } +} diff --git a/agent/src/main.rs b/agent/src/main.rs index 2316874..1c780bd 100644 --- a/agent/src/main.rs +++ b/agent/src/main.rs @@ -1,4 +1,4 @@ -use agent_lib::{api::service::Api, config::AgentConfig}; +use agent_lib::{api::service::Api, config::AgentConfig, runner_engine::service::RunnerEngine}; use anyhow::{anyhow, Result}; use clap::Parser; use log::{debug, info, trace}; @@ -45,6 +45,18 @@ fn main() -> Result<()> { // Read request message from serial port let request_message = api.read_from_serial()?; + // Initialize RunnerEngine + let mut runner_engine = RunnerEngine::new(request_message); + + // Create the workspace + runner_engine.create_workspace()?; + + // Run the steps of the request message + let response_message = runner_engine.run().map_err(|e| anyhow!("{:?}", e))?; + + // Send response message to serial port + api.send_response_message(response_message)?; + info!("Stopping agent"); Ok(()) }