Skip to content

Commit

Permalink
web: mod: Add stats endpoint
Browse files Browse the repository at this point in the history
Signed-off-by: Patrick José Pereira <[email protected]>
  • Loading branch information
patrickelectric authored and joaoantoniocardoso committed Oct 3, 2024
1 parent 53a8db3 commit aea3d04
Showing 1 changed file with 62 additions and 1 deletion.
63 changes: 62 additions & 1 deletion src/web/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,23 @@ use std::{
use axum::{
extract::{
ws::{Message, WebSocket, WebSocketUpgrade},
State,
Query, State,
},
http::StatusCode,
response::Response,
routing::get,
Router,
};
use futures::{sink::SinkExt, stream::StreamExt};
use serde::Deserialize;
use tokio::sync::{broadcast, mpsc, RwLock};
use tracing::*;
use uuid::Uuid;

use lazy_static::lazy_static;

use crate::hub;

fn default_router(state: AppState) -> Router {
Router::new()
.route("/", get(endpoints::root))
Expand All @@ -31,6 +34,7 @@ fn default_router(state: AppState) -> Router {
.route("/stats/driver", get(endpoints::driver_stats))
.route("/stats/hub", get(endpoints::hub_stats))
.route("/stats/messages", get(endpoints::hub_messages_stats))
.route("/stats/ws", get(stats_websocket_handler))
.route("/rest/ws", get(websocket_handler))
// We are matching all possible keys for the user
.route("/rest/mavlink", get(endpoints::mavlink))
Expand Down Expand Up @@ -84,6 +88,63 @@ async fn websocket_connection(socket: WebSocket, state: AppState) {
send_task.await.unwrap();
}

#[derive(Deserialize)]
struct FrequencyQuery {
frequency: Option<u8>,
}

async fn stats_websocket_handler(
ws: WebSocketUpgrade,
State(state): State<AppState>,
Query(freq_query): Query<FrequencyQuery>,
) -> Response {
ws.on_upgrade(|socket| stats_websocket_connection(socket, state, freq_query))
}

async fn stats_websocket_connection(
socket: WebSocket,
state: AppState,
freq_query: FrequencyQuery,
) {
let frequency = freq_query.frequency.unwrap_or(1);
let identifier = Uuid::new_v4();
debug!("WS client connected with ID: {identifier}");

let (mut sender, mut receiver) = socket.split();

let interval_duration = tokio::time::Duration::from_secs_f32(1.0 / frequency as f32);
let periodic_task = tokio::spawn(async move {
let mut interval = tokio::time::interval(interval_duration);
loop {
interval.tick().await;
let hub_message_stats = match hub::hub_messages_stats().await {
Ok(hub_message_stats) => hub_message_stats,
Err(error) => {
warn!("Failed getting hub message stats: {error:?}");
continue;
}
};
let json = match serde_json::to_string(&hub_message_stats) {
Ok(json) => json,
Err(error) => {
warn!("Failed to create json from Hub Message Stats: {error:?}");
continue;
}
};
if sender.send(Message::Text(json)).await.is_err() {
break;
}
}
});
if let Err(error) = periodic_task.await {
error!("Failed finishing task Stats WebSocket task: {error:?}");
}

// Clean up when the connection is closed
state.clients.write().await.remove(&identifier);
debug!("WS client {identifier} removed");
}

async fn broadcast_message_websockets(state: &AppState, sender_identifier: Uuid, message: Message) {
let mut clients = state.clients.write().await;

Expand Down

0 comments on commit aea3d04

Please sign in to comment.