From a2bfa82329be6a92f8316510e68c65abeb746042 Mon Sep 17 00:00:00 2001 From: Raul Victor Trombin Date: Wed, 17 Jul 2024 13:26:33 -0300 Subject: [PATCH] src: server: Add restAPI and websocket services --- src/server/manager.rs | 30 ++- src/server/mod.rs | 24 +++ src/server/protocols/mod.rs | 1 + src/server/protocols/v1/errors.rs | 40 ++++ src/server/protocols/v1/frontend/index.html | 61 ++++++ src/server/protocols/v1/mod.rs | 3 + src/server/protocols/v1/rest.rs | 92 ++++++++ src/server/protocols/v1/websocket.rs | 219 ++++++++++++++++++++ 8 files changed, 465 insertions(+), 5 deletions(-) create mode 100644 src/server/protocols/mod.rs create mode 100644 src/server/protocols/v1/errors.rs create mode 100644 src/server/protocols/v1/frontend/index.html create mode 100644 src/server/protocols/v1/mod.rs create mode 100644 src/server/protocols/v1/rest.rs create mode 100644 src/server/protocols/v1/websocket.rs diff --git a/src/server/manager.rs b/src/server/manager.rs index c24aae38..6b1bed93 100644 --- a/src/server/manager.rs +++ b/src/server/manager.rs @@ -1,19 +1,39 @@ -use actix_web::{middleware, App, HttpServer}; -use paperclip::actix::OpenApiExt; +use crate::device::manager::ManagerActorHandler; + +use super::protocols; +use actix_web::{middleware, web::Data, App, HttpServer}; use tracing::info; -pub async fn run(server_address: &str) -> std::io::Result<()> { +use paperclip::actix::{ + web::{self, Scope}, + OpenApiExt, +}; + +fn add_v1_paths(scope: Scope) -> Scope { + scope.configure(protocols::v1::rest::register_services) +} + +pub async fn run(server_address: &str, handler: ManagerActorHandler) -> std::io::Result<()> { let server_address = server_address.to_string(); - info!("starting HTTP server at http://{server_address}"); + info!("ServerManager: Service starting"); + + let server = HttpServer::new(move || { + let v1 = add_v1_paths(web::scope("/v1")); + let default = add_v1_paths(web::scope("")); - let server = HttpServer::new(|| { App::new() + .app_data(Data::new(handler.clone())) .wrap(middleware::Logger::default()) .wrap_api() .with_json_spec_at("/api/spec") .with_swagger_ui_at("/docs") + .service(v1) + .service(protocols::v1::rest::server_metadata) + .service(protocols::v1::websocket::websocket) + .service(default) .build() }); + info!("ServerManager: HTTP server running at http://{server_address}"); server.bind(server_address)?.run().await } diff --git a/src/server/mod.rs b/src/server/mod.rs index ff8de9eb..f8567926 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -1 +1,25 @@ pub mod manager; +pub mod protocols; + +// The Server module consists of a manager and all available layers that provide access to internal services. +// +// Manager: +// The Manager module requires a DeviceManagerHandler, which will be used to forward all incoming requests. +// This allows the Manager to receive and process requests from RestAPI and WebSocket methods. +// The requests are forwarded to the DeviceManager using the server's AppData, which holds a clone of the DeviceManager's Handler and will provide the responses. +// +// Front-end: +// The frontend provides access to REST API documentation through {address}/docs with a Swagger interface and the API specifications. +// +// RestAPI: +// The REST API will have a default route and versioned routes. +// To keep the application stable through updates, users can use {address}/v{x}/route. +// +// WebSocket: +// WebSocket is provided via the {address}/ws route. +// Users can use the following queries: +// ?filter="some_desired_string_to_use_regex" +// ?device-number="00000000-0000-0000-b9c0-f5752d453eb3" // The UUID provided by the source of the device created +// Otherwise, if they are not defined, the WebSocket channel will receive all available messages. +// All operations made through REST API and WebSocket routes will be broadcast to all clients subscribed to device-number=null (default), +// except for errors, which are forwarded directly to the requester. diff --git a/src/server/protocols/mod.rs b/src/server/protocols/mod.rs new file mode 100644 index 00000000..a3a6d96c --- /dev/null +++ b/src/server/protocols/mod.rs @@ -0,0 +1 @@ +pub mod v1; diff --git a/src/server/protocols/v1/errors.rs b/src/server/protocols/v1/errors.rs new file mode 100644 index 00000000..b0b94cdd --- /dev/null +++ b/src/server/protocols/v1/errors.rs @@ -0,0 +1,40 @@ +use actix_web::{http::StatusCode, ResponseError}; + +use paperclip::actix::api_v2_errors; +use validator::ValidationErrors; + +#[allow(dead_code)] +#[api_v2_errors( + code = 400, + description = "Bad Request: The client's request contains invalid or malformed data.", + code = 500, + description = "Internal Server Error: An unexpected server error has occurred." +)] +#[derive(Debug, thiserror::Error)] +pub enum Error { + #[error("Bad Request: {0}")] + BadRequest(String), + #[error("Internal Server Error: {0}")] + Internal(String), +} + +impl ResponseError for Error { + fn status_code(&self) -> StatusCode { + match self { + Self::BadRequest(_) => StatusCode::BAD_REQUEST, + Self::Internal(_) => StatusCode::INTERNAL_SERVER_ERROR, + } + } +} + +impl From for Error { + fn from(error: ValidationErrors) -> Self { + Self::BadRequest(error.to_string()) + } +} + +impl From for Error { + fn from(error: crate::device::manager::ManagerError) -> Self { + Self::Internal(serde_json::to_string_pretty(&error).unwrap_or_default()) + } +} diff --git a/src/server/protocols/v1/frontend/index.html b/src/server/protocols/v1/frontend/index.html new file mode 100644 index 00000000..915927a9 --- /dev/null +++ b/src/server/protocols/v1/frontend/index.html @@ -0,0 +1,61 @@ + + + + + + Ping Viewer Next + + + +

Ping Viewer Next

+ +

Websocket Client

+
Connecting...
+
+ + + + diff --git a/src/server/protocols/v1/mod.rs b/src/server/protocols/v1/mod.rs new file mode 100644 index 00000000..d7a332af --- /dev/null +++ b/src/server/protocols/v1/mod.rs @@ -0,0 +1,3 @@ +pub mod errors; +pub mod rest; +pub mod websocket; diff --git a/src/server/protocols/v1/rest.rs b/src/server/protocols/v1/rest.rs new file mode 100644 index 00000000..1b9a36f8 --- /dev/null +++ b/src/server/protocols/v1/rest.rs @@ -0,0 +1,92 @@ +use crate::device::manager::ManagerActorHandler; +use crate::server::protocols::v1::errors::Error; +use actix_web::Responder; +use mime_guess::from_path; +use paperclip::actix::{ + api_v2_operation, get, post, + web::{self, HttpResponse, Json}, + Apiv2Schema, +}; +use serde::{Deserialize, Serialize}; +use serde_json::json; + +#[derive(rust_embed::RustEmbed)] +#[folder = "src/server/protocols/v1/frontend"] +struct Asset; + +fn handle_embedded_file(path: &str) -> HttpResponse { + match Asset::get(path) { + Some(content) => HttpResponse::Ok() + .content_type(from_path(path).first_or_octet_stream().as_ref()) + .body(content.data.into_owned()), + None => HttpResponse::NotFound().body("404 Not Found"), + } +} + +#[api_v2_operation(skip)] +#[get("/")] +async fn index() -> impl Responder { + handle_embedded_file("index.html") +} + +#[api_v2_operation(skip)] +#[get("/{file_path:.*}")] +async fn index_files(file_path: web::Path) -> impl Responder { + handle_embedded_file(&file_path) +} + +/// The "register_service" route is used by BlueOS extensions manager +#[api_v2_operation] +#[get("register_service")] +async fn server_metadata() -> Result, Error> { + let package = ServerMetadata::default(); + Ok(Json(package)) +} + +pub fn register_services(cfg: &mut web::ServiceConfig) { + cfg.service(index) + .service(post_request) + .service(index_files); +} + +#[api_v2_operation] +#[post("device/request")] +async fn post_request( + manager_handler: web::Data, + json: web::Json, +) -> Result, Error> { + let request = json.into_inner(); + + let answer = manager_handler.send(request).await?; + + // Broadcast the results to webscoket clients. + crate::server::protocols::v1::websocket::send_to_websockets(json!(answer), None); + + Ok(Json(answer)) +} +#[derive(Debug, Serialize, Deserialize, Apiv2Schema)] +pub struct ServerMetadata { + pub name: &'static str, + pub description: &'static str, + pub icon: &'static str, + pub company: &'static str, + pub version: &'static str, + pub new_page: bool, + pub webpage: &'static str, + pub api: &'static str, +} + +impl Default for ServerMetadata { + fn default() -> Self { + Self { + name: "Ping Viewer Next", + description: "A ping protocol extension for expose devices to web.", + icon: "mdi-compass-outline", + company: "BlueRobotics", + version: "0.0.0", + new_page: false, + webpage: "https://github.com/RaulTrombin/navigator-assistant", + api: "/docs", + } + } +} diff --git a/src/server/protocols/v1/websocket.rs b/src/server/protocols/v1/websocket.rs new file mode 100644 index 00000000..1e638154 --- /dev/null +++ b/src/server/protocols/v1/websocket.rs @@ -0,0 +1,219 @@ +use actix::{ + dev::ContextFutureSpawner, fut, Actor, ActorFutureExt, Addr, AsyncContext, Handler, Message, + StreamHandler, WrapFuture, +}; +use actix_web::HttpRequest; +use actix_web_actors::ws; +use lazy_static::lazy_static; +use paperclip::actix::{ + api_v2_operation, get, + web::{self, HttpResponse}, + Apiv2Schema, +}; +use regex::Regex; +use serde::{Deserialize, Serialize}; +use serde_json::{json, Value}; +use std::sync::{Arc, Mutex}; +use tracing::info; +use uuid::Uuid; + +use crate::device::manager::{ManagerActorHandler, Request}; + +pub struct StringMessage(String); + +impl Message for StringMessage { + type Result = (); +} + +#[derive(Serialize, Debug)] +pub struct WebsocketError { + pub error: String, +} + +#[derive(Debug)] +pub struct WebsocketActorContent { + pub actor: Addr, + pub re: Option, + pub device_number: Option, +} + +#[derive(Debug, Default)] +pub struct WebsocketManager { + pub clients: Vec, +} + +impl WebsocketManager { + pub fn send(&self, value: &serde_json::Value, name: &str, device_number: Option) { + if self.clients.is_empty() { + return; + } + + let string = serde_json::to_string(value).unwrap(); + for client in &self.clients { + // check client list was subscribed or subscribed to all + if client.device_number.is_none() || client.device_number == device_number { + let is_match = client.re.as_ref().map_or(false, |regx| regx.is_match(name)); + if is_match { + client.actor.do_send(StringMessage(string.clone())); + } + } + } + } +} + +lazy_static! { + pub static ref MANAGER: Arc> = + Arc::new(Mutex::new(WebsocketManager::default())); +} + +pub fn send_to_websockets(message: Value, device: Option) { + MANAGER + .lock() + .unwrap() + .send(&message, &message.to_string(), device); +} + +pub struct WebsocketActor { + server: Arc>, + pub filter: String, + pub device_number: Option, + pub manager_handler: web::Data, +} + +impl WebsocketActor { + pub fn new( + message_filter: String, + device_number: Option, + manager_handler: web::Data, + ) -> Self { + Self { + server: MANAGER.clone(), + filter: message_filter, + device_number, + manager_handler, + } + } +} + +impl Handler for WebsocketActor { + type Result = (); + + fn handle(&mut self, message: StringMessage, context: &mut Self::Context) { + context.text(message.0); + } +} + +impl Actor for WebsocketActor { + type Context = ws::WebsocketContext; +} + +impl StreamHandler> for WebsocketActor { + fn started(&mut self, ctx: &mut Self::Context) { + info!("ServerManager: Starting websocket client, add itself in manager."); + self.server + .lock() + .unwrap() + .clients + .push(WebsocketActorContent { + actor: ctx.address(), + re: Regex::new(&self.filter).ok(), + device_number: (self.device_number), + }); + } + + fn finished(&mut self, ctx: &mut Self::Context) { + info!("ServerManager: Finishing websocket, remove itself from manager."); + self.server + .lock() + .unwrap() + .clients + .retain(|x| x.actor != ctx.address()); + } + + fn handle(&mut self, msg: Result, ctx: &mut Self::Context) { + match msg { + Ok(ws::Message::Ping(msg)) => ctx.pong(&msg), + Ok(ws::Message::Text(text)) => { + let manager_requests: Vec = match serde_json::from_str(&text) { + Ok(requests) => requests, + Err(err) => match serde_json::from_str(&text) { + Ok(request) => vec![request], + Err(_) => { + let error_msg = format!("Error: {}", err); + ctx.text(error_msg); + return; + } + }, + }; + + for request in manager_requests { + let manager_handler = self.manager_handler.clone(); + + let future = + async move { manager_handler.send(request).await }.into_actor(self); + + future + .then(|res, _, ctx| { + match &res { + Ok(result) => { + crate::server::protocols::v1::websocket::send_to_websockets( + json!(result), + None, + ); + } + Err(err) => { + ctx.text(serde_json::to_string_pretty(err).unwrap()); + } + } + fut::ready(()) + }) + .wait(ctx); + } + } + Ok(ws::Message::Close(msg)) => ctx.close(msg), + _ => (), + } + } +} + +#[api_v2_operation(skip)] +#[get("ws")] +pub async fn websocket( + req: HttpRequest, + query: web::Query, + stream: web::Payload, + manager_handler: web::Data, +) -> Result { + let filter = match query.clone().into_inner().filter { + Some(filter) => filter.clone(), + _ => ".*".to_owned(), + }; + let device_number = query.into_inner().device_number; + + if let Some(device_number) = device_number { + let request = crate::device::manager::Request::Info(device_number); + match manager_handler.send(request).await { + Ok(response) => { + info!( + "ServerManager: Received websocket request connection for device: {response:?}" + ); + } + Err(err) => { + return Ok(HttpResponse::InternalServerError().json(json!(err))); + } + } + } + + ws::start( + WebsocketActor::new(filter, device_number, manager_handler.clone()), + &req, + stream, + ) +} + +#[derive(Deserialize, Apiv2Schema, Clone)] +pub struct WebsocketQuery { + /// Regex filter to select the desired incoming messages + filter: Option, + device_number: Option, +}