From ea1cff1f3a98feffa05f35118840a3ab6ab15574 Mon Sep 17 00:00:00 2001 From: Pi Lanningham Date: Mon, 25 Nov 2024 11:38:16 -0500 Subject: [PATCH] Tweak metrics, query stats --- crates/rpc/src/bin/metric_exporter/metrics.rs | 4 +- crates/rpc/src/main.rs | 19 +- crates/rpc/src/routes/stats.rs | 164 ++++++++++++++---- 3 files changed, 153 insertions(+), 34 deletions(-) diff --git a/crates/rpc/src/bin/metric_exporter/metrics.rs b/crates/rpc/src/bin/metric_exporter/metrics.rs index 2c86fb4..cb67969 100644 --- a/crates/rpc/src/bin/metric_exporter/metrics.rs +++ b/crates/rpc/src/bin/metric_exporter/metrics.rs @@ -158,7 +158,7 @@ impl Metrics { } pub fn start_game(&self) { - self.games_current.inc(); + self.games_current.set(1); self.game_state.set(GameState::Running.into()); let mut guard = self.game_timer.lock().unwrap(); if let Some(prev) = guard.take() { @@ -170,7 +170,7 @@ impl Metrics { pub fn end_game(&self) { self.players_current.set(0); - self.games_current.dec(); + self.games_current.set(0); self.game_state.set(GameState::Done.into()); let mut guard = self.game_timer.lock().unwrap(); if let Some(timer) = guard.take() { diff --git a/crates/rpc/src/main.rs b/crates/rpc/src/main.rs index 1f74916..0ffd03b 100644 --- a/crates/rpc/src/main.rs +++ b/crates/rpc/src/main.rs @@ -3,9 +3,16 @@ use model::cluster::ClusterState; use rocket::{http::Method, routes}; use rocket_cors::{AllowedOrigins, CorsOptions}; use routes::{ - add_player::add_player, cleanup::cleanup, end_game::end_game, head::head, heads::heads, - health::health, new_game::new_game, sample_transactions::sample_transactions, - start_game::start_game, stats::global_stats, + add_player::add_player, + cleanup::cleanup, + end_game::end_game, + head::head, + heads::heads, + health::health, + new_game::new_game, + sample_transactions::sample_transactions, + start_game::start_game, + stats::{global_stats, refresh_stats, StatsState}, }; use serde::Deserialize; @@ -31,6 +38,11 @@ async fn main() -> Result<()> { // context is set to the cluster. If you wanted to connect to a remote cluster, you can use the // `ClusterState::remote` initializer. let cluster = ClusterState::try_new(&config.admin_key_file, config.remote).await?; + let stats = StatsState::new( + refresh_stats() + .await + .expect("failed to fetch initial stats"), + ); let cors = CorsOptions::default() .allowed_origins(AllowedOrigins::all()) @@ -44,6 +56,7 @@ async fn main() -> Result<()> { let _rocket = rocket::build() .manage(cluster) + .manage(stats) .mount( "/", routes![ diff --git a/crates/rpc/src/routes/stats.rs b/crates/rpc/src/routes/stats.rs index 72bb518..e9fa240 100644 --- a/crates/rpc/src/routes/stats.rs +++ b/crates/rpc/src/routes/stats.rs @@ -1,43 +1,149 @@ -use rocket::{get, serde::json::Json}; +use futures_util::try_join; +use rocket::{get, serde::json::Json, State}; use rocket_errors::anyhow::Result; -use serde::Serialize; +use serde::{Deserialize, Serialize}; +use std::{str::FromStr, sync::RwLock, time}; -#[derive(Debug, Serialize)] +pub struct StatsState { + latest_stats: RwLock, +} + +impl StatsState { + pub fn new(stats: GlobalStats) -> Self { + Self { + latest_stats: RwLock::new(stats), + } + } +} + +#[derive(Debug, Serialize, Clone)] pub struct GlobalStats { + as_of: time::SystemTime, + total_txs: u64, + txs_per_second: f64, + total_bytes: u64, + bytes_per_second: f64, + total_games: u32, active_games: u32, total_players: u32, - active_player: u32, + active_players: u32, total_bots: u32, active_bots: u32, - total_txs: u32, - txs_per_second: u32, - total_bytes: u32, - bytes_per_second: u32, total_kills: u32, - kills_per_minute: u32, + kills_per_minute: f32, total_suicides: u32, - suicides_per_minute: u32, + suicides_per_minute: f32, +} + +const TOTAL_TRANSACTIONS: &str = "sum(last_over_time(hydra_doom_node_transactions[1y]))"; +const TRANSACTIONS_PER_SECOND: &str = "sum(irate(hydra_doom_node_transactions[1m])>0)"; +const TOTAL_BYTES: &str = "sum(last_over_time(hydra_doom_node_bytes[1y]))"; +const BYTES_PER_SECOND: &str = "sum(irate(hydra_doom_node_bytes[1m])>0)"; +const TOTAL_GAMES: &str = "sum(last_over_time(hydra_doom_games_seconds_count[1y]))"; +const ACTIVE_GAMES: &str = "sum(hydra_doom_games_current)"; +const TOTAL_PLAYERS: &str = "sum(last_over_time(hydra_doom_players_total[1y]))"; +const ACTIVE_PLAYERS: &str = "sum(hydra_doom_players_current)"; +const TOTAL_BOTS: &str = "sum(last_over_time(hydra_doom_bots_total[1y]))"; +const ACTIVE_BOTS: &str = ""; +const TOTAL_KILLS: &str = "sum(last_over_time(hydra_doom_kills[1y]))"; +const KILLS_PER_MINUTE: &str = "sum(irate(hydra_doom_kills[10m]) * 60)"; + +#[derive(Deserialize, Debug)] +struct ThanosResult { + pub value: (f32, String), +} +#[derive(Deserialize, Debug)] +#[serde(rename_all = "camelCase")] +struct ThanosData { + pub result_type: String, + pub result: Vec, +} +#[derive(Deserialize, Debug)] +struct ThanosResponse { + pub status: String, + pub data: ThanosData, +} +pub async fn fetch_metric(query: &str) -> Result { + let url = format!( + "https://thanos.hydra-doom.sundae.fi/api/v1/query?query={}", + query + ); + let resp = reqwest::get(&url).await?; + // println!("{:?}", resp.text().await?); + let body = resp.json::().await?; + if body.data.result.len() == 0 { + Ok(Default::default()) + } else { + let parsed = body.data.result[0].value.1.parse::(); + match parsed { + Ok(v) => Ok(v), + Err(e) => { + println!( + "Invalid stats value for {}: {}", + query, body.data.result[0].value.1 + ); + Ok(Default::default()) + } + } + } +} + +pub async fn refresh_stats() -> Result { + let ( + total_txs, + txs_per_second, + total_bytes, + bytes_per_second, + total_games, + active_games, + total_players, + active_players, + total_kills, + kills_per_minute, + ) = try_join!( + fetch_metric(TOTAL_TRANSACTIONS), + fetch_metric(TRANSACTIONS_PER_SECOND), + fetch_metric(TOTAL_BYTES), + fetch_metric(BYTES_PER_SECOND), + fetch_metric(TOTAL_GAMES), + fetch_metric(ACTIVE_GAMES), + fetch_metric(TOTAL_PLAYERS), + fetch_metric(ACTIVE_PLAYERS), + fetch_metric(TOTAL_KILLS), + fetch_metric(KILLS_PER_MINUTE), + )?; + Ok(GlobalStats { + as_of: time::SystemTime::now(), + total_txs, + txs_per_second, + total_bytes, + bytes_per_second, + total_games, + active_games, + total_players, + active_players, + total_bots: 0, + active_bots: 0, + total_kills, + kills_per_minute, + total_suicides: 0, + suicides_per_minute: 0.0, + }) } #[get("/global_stats")] -pub async fn global_stats() -> Result> { - // TODO: fetch data from the observer server - // dummy data for now - Ok(Json(GlobalStats { - total_games: 123, - active_games: 21, - total_players: 125, - active_player: 21, - total_bots: 321, - active_bots: 21, - total_txs: 123456, - txs_per_second: 5000, - total_bytes: 123456789, - bytes_per_second: 12000, - total_kills: 100, - kills_per_minute: 30, - total_suicides: 1, - suicides_per_minute: 0, - })) +pub async fn global_stats(state: &State) -> Result> { + let elapsed = { state.latest_stats.read().unwrap().as_of.elapsed()? }; + if elapsed > time::Duration::from_secs(5) { + // Careful not to hold a lock over an async point! + let new_stats = refresh_stats().await?; + + let mut stats = state.latest_stats.write().unwrap(); + *stats = new_stats; + Ok(Json(stats.clone())) + } else { + let stats = state.latest_stats.read().unwrap(); + Ok(Json(stats.clone())) + } }