From f20f847a75b8586454165bdedcb73063b14e8ea2 Mon Sep 17 00:00:00 2001 From: axiomatic-aardvark Date: Fri, 29 Mar 2024 16:29:30 +0200 Subject: [PATCH] feat: aggregate metrics --- ...dd8246c5fb0ece6d3536bbf4046b83eb1dbdb.json | 14 +++ ...74330704535d8e5b0ca83aa3c8883e7f74713.json | 22 +++++ ...baf8b4e9ad57b949eda73f8529d481f62da91.json | 17 ++++ ...76b666eb2c524086334561cd6b9f2d0936083.json | 34 +++++++ migrations/20240325124203_aggregates.down.sql | 1 + migrations/20240325124203_aggregates.up.sql | 9 ++ src/db/resolver.rs | 89 +++++++++++++++++-- src/operator/mod.rs | 37 +++++++- src/server/model/mod.rs | 71 ++++++++++++++- 9 files changed, 283 insertions(+), 11 deletions(-) create mode 100644 .sqlx/query-1250005571c00e3619803b857b4dd8246c5fb0ece6d3536bbf4046b83eb1dbdb.json create mode 100644 .sqlx/query-339014f817c328ecfca7006d96e74330704535d8e5b0ca83aa3c8883e7f74713.json create mode 100644 .sqlx/query-37ef8132482cade63949bce7577baf8b4e9ad57b949eda73f8529d481f62da91.json create mode 100644 .sqlx/query-d5d98636783ca4170bc41ffa76f76b666eb2c524086334561cd6b9f2d0936083.json create mode 100644 migrations/20240325124203_aggregates.down.sql create mode 100644 migrations/20240325124203_aggregates.up.sql diff --git a/.sqlx/query-1250005571c00e3619803b857b4dd8246c5fb0ece6d3536bbf4046b83eb1dbdb.json b/.sqlx/query-1250005571c00e3619803b857b4dd8246c5fb0ece6d3536bbf4046b83eb1dbdb.json new file mode 100644 index 0000000..9cdb174 --- /dev/null +++ b/.sqlx/query-1250005571c00e3619803b857b4dd8246c5fb0ece6d3536bbf4046b83eb1dbdb.json @@ -0,0 +1,14 @@ +{ + "db_name": "PostgreSQL", + "query": "DELETE FROM indexer_aggregates WHERE timestamp < $1", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Int8" + ] + }, + "nullable": [] + }, + "hash": "1250005571c00e3619803b857b4dd8246c5fb0ece6d3536bbf4046b83eb1dbdb" +} diff --git a/.sqlx/query-339014f817c328ecfca7006d96e74330704535d8e5b0ca83aa3c8883e7f74713.json 
b/.sqlx/query-339014f817c328ecfca7006d96e74330704535d8e5b0ca83aa3c8883e7f74713.json new file mode 100644 index 0000000..2cc4489 --- /dev/null +++ b/.sqlx/query-339014f817c328ecfca7006d96e74330704535d8e5b0ca83aa3c8883e7f74713.json @@ -0,0 +1,22 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT COUNT(DISTINCT message->>'identifier') AS count\n FROM messages\n WHERE (CAST(message->>'nonce' AS BIGINT)) > $1\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "count", + "type_info": "Int8" + } + ], + "parameters": { + "Left": [ + "Int8" + ] + }, + "nullable": [ + null + ] + }, + "hash": "339014f817c328ecfca7006d96e74330704535d8e5b0ca83aa3c8883e7f74713" +} diff --git a/.sqlx/query-37ef8132482cade63949bce7577baf8b4e9ad57b949eda73f8529d481f62da91.json b/.sqlx/query-37ef8132482cade63949bce7577baf8b4e9ad57b949eda73f8529d481f62da91.json new file mode 100644 index 0000000..603cc8c --- /dev/null +++ b/.sqlx/query-37ef8132482cade63949bce7577baf8b4e9ad57b949eda73f8529d481f62da91.json @@ -0,0 +1,17 @@ +{ + "db_name": "PostgreSQL", + "query": "INSERT INTO indexer_aggregates (timestamp, graph_account, message_count, subgraphs_count) VALUES ($1, $2, $3, $4)", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Int8", + "Varchar", + "Int8", + "Int8" + ] + }, + "nullable": [] + }, + "hash": "37ef8132482cade63949bce7577baf8b4e9ad57b949eda73f8529d481f62da91" +} diff --git a/.sqlx/query-d5d98636783ca4170bc41ffa76f76b666eb2c524086334561cd6b9f2d0936083.json b/.sqlx/query-d5d98636783ca4170bc41ffa76f76b666eb2c524086334561cd6b9f2d0936083.json new file mode 100644 index 0000000..6d6eaf7 --- /dev/null +++ b/.sqlx/query-d5d98636783ca4170bc41ffa76f76b666eb2c524086334561cd6b9f2d0936083.json @@ -0,0 +1,34 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT graph_account, message_count, subgraphs_count FROM indexer_aggregates WHERE timestamp > $1", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "graph_account", + "type_info": "Varchar" + }, + { + 
"ordinal": 1, + "name": "message_count", + "type_info": "Int8" + }, + { + "ordinal": 2, + "name": "subgraphs_count", + "type_info": "Int8" + } + ], + "parameters": { + "Left": [ + "Int8" + ] + }, + "nullable": [ + false, + false, + false + ] + }, + "hash": "d5d98636783ca4170bc41ffa76f76b666eb2c524086334561cd6b9f2d0936083" +} diff --git a/migrations/20240325124203_aggregates.down.sql b/migrations/20240325124203_aggregates.down.sql new file mode 100644 index 0000000..b0e28f3 --- /dev/null +++ b/migrations/20240325124203_aggregates.down.sql @@ -0,0 +1 @@ +DROP TABLE IF EXISTS indexer_aggregates; diff --git a/migrations/20240325124203_aggregates.up.sql b/migrations/20240325124203_aggregates.up.sql new file mode 100644 index 0000000..0706fe8 --- /dev/null +++ b/migrations/20240325124203_aggregates.up.sql @@ -0,0 +1,9 @@ +CREATE TABLE IF NOT EXISTS indexer_aggregates +( + id SERIAL PRIMARY KEY, + timestamp BIGINT NOT NULL, + graph_account VARCHAR(255) NOT NULL, + message_count BIGINT NOT NULL, + subgraphs_count BIGINT NOT NULL, + UNIQUE(graph_account, timestamp) +); diff --git a/src/db/resolver.rs b/src/db/resolver.rs index a1823d1..f389ccb 100644 --- a/src/db/resolver.rs +++ b/src/db/resolver.rs @@ -1,6 +1,6 @@ use async_graphql::{OutputType, SimpleObject}; use chrono::Utc; -use serde::{de::DeserializeOwned, Serialize}; +use serde::{de::DeserializeOwned, Deserialize, Serialize}; use sqlx::{postgres::PgQueryResult, types::Json, FromRow, PgPool, Row as SqliteRow}; use std::ops::Deref; use tracing::trace; @@ -21,11 +21,11 @@ pub struct MessageID { } #[allow(dead_code)] -#[derive(FromRow, SimpleObject, Serialize, Debug, Clone)] +#[derive(FromRow, SimpleObject, Serialize, Deserialize, Debug, Clone)] pub struct IndexerStats { - graph_account: String, - message_count: i64, - subgraphs_count: i64, + pub graph_account: String, + pub message_count: i64, + pub subgraphs_count: i64, } // Define graphql type for the Row in Messages @@ -315,7 +315,7 @@ pub async fn get_indexer_stats( 
SELECT message->>'graph_account' as graph_account, - COUNT(*) as message_count, - COUNT(DISTINCT message->>'identifier') as subgraphs_count -- Updated field name + COUNT(DISTINCT message->>'identifier') as subgraphs_count FROM messages WHERE (CAST(message->>'nonce' AS BIGINT)) > $1"; @@ -352,6 +352,83 @@ pub async fn get_indexer_stats( Ok(stats) } +pub async fn insert_aggregate( + pool: &PgPool, + timestamp: i64, + graph_account: String, + message_count: i64, + subgraphs_count: i64, +) -> anyhow::Result<()> { + let _ = sqlx::query!( + "INSERT INTO indexer_aggregates (timestamp, graph_account, message_count, subgraphs_count) VALUES ($1, $2, $3, $4)", + timestamp, + graph_account, + message_count, + subgraphs_count + ) + .execute(pool) + .await?; + + Ok(()) +} + +pub async fn fetch_aggregates( + pool: &PgPool, + since_timestamp: i64, +) -> Result<Vec<IndexerStats>, anyhow::Error> { + let aggregates = sqlx::query_as!( + IndexerStats, + "SELECT graph_account, message_count, subgraphs_count FROM indexer_aggregates WHERE timestamp > $1", + since_timestamp + ) + .fetch_all(pool) + .await + .map_err(anyhow::Error::new)?; + + let results: Vec<IndexerStats> = aggregates + .clone() + .into_iter() + .map(|agg| IndexerStats { + graph_account: agg.graph_account, + message_count: agg.message_count, + subgraphs_count: agg.subgraphs_count, + }) + .collect(); + + Ok(results) +} + +pub async fn count_distinct_subgraphs( + pool: &PgPool, + from_timestamp: i64, +) -> Result<i64, anyhow::Error> { + let result = sqlx::query!( + " + SELECT COUNT(DISTINCT message->>'identifier') AS count + FROM messages + WHERE (CAST(message->>'nonce' AS BIGINT)) > $1 + ", + from_timestamp + ) + .fetch_one(pool) + .await + .map_err(anyhow::Error::new)?; + + Ok(result.count.unwrap_or(0) as i64) +} + +pub async fn prune_old_aggregates(pool: &PgPool) -> Result<i64, anyhow::Error> { + let since_timestamp = (Utc::now() - chrono::Duration::try_days(90).unwrap()).timestamp(); + let result = sqlx::query!( + "DELETE FROM indexer_aggregates WHERE timestamp < $1", + since_timestamp + ) + 
.execute(pool) + .await?; + + Ok(result.rows_affected() as i64) +} + #[cfg(test)] mod tests { use crate::message_types::PublicPoiMessage; diff --git a/src/operator/mod.rs b/src/operator/mod.rs index ba6cff2..3bb5d84 100644 --- a/src/operator/mod.rs +++ b/src/operator/mod.rs @@ -1,4 +1,5 @@ use anyhow::anyhow; +use chrono::Utc; use graphcast_sdk::WakuMessage; use sqlx::postgres::PgPoolOptions; use sqlx::{Pool, Postgres}; @@ -9,11 +10,14 @@ use std::thread::{self, JoinHandle}; use std::time::Duration; use tokio::runtime::Runtime; use tokio::time::{interval, sleep, timeout}; -use tracing::{debug, info, trace, warn}; +use tracing::{debug, error, info, trace, warn}; use graphcast_sdk::graphcast_agent::{message_typing::GraphcastMessage, GraphcastAgent}; -use crate::db::resolver::{count_messages, prune_old_messages, retain_max_storage}; +use crate::db::resolver::{ + count_messages, get_indexer_stats, insert_aggregate, prune_old_aggregates, prune_old_messages, + retain_max_storage, +}; use crate::metrics::{CONNECTED_PEERS, GOSSIP_PEERS, PRUNED_MESSAGES, RECEIVED_MESSAGES}; use crate::{ config::Config, @@ -110,6 +114,7 @@ impl RadioOperator { let mut network_update_interval = interval(Duration::from_secs(600)); let mut summary_interval = interval(Duration::from_secs(180)); + let mut daily_aggregate_interval = interval(Duration::from_secs(86400)); // 24 hours let iteration_timeout = Duration::from_secs(180); let update_timeout = Duration::from_secs(5); @@ -214,6 +219,34 @@ impl RadioOperator { } } }, + _ = daily_aggregate_interval.tick() => { + if skip_iteration.load(Ordering::SeqCst) { + skip_iteration.store(false, Ordering::SeqCst); + continue; + } + + let pool = &self.db; + + match prune_old_aggregates(pool).await { + Ok(deleted_count) => trace!("Pruned {} old aggregate entries.", deleted_count), + Err(e) => error!("Failed to prune old aggregates: {:?}", e), + } + + let from_timestamp = (Utc::now() - Duration::from_secs(86400)).timestamp(); + + match 
get_indexer_stats(pool, None, from_timestamp).await { + Ok(stats) => { + for stat in stats { + match insert_aggregate(pool, Utc::now().timestamp(), stat.graph_account, stat.message_count, stat.subgraphs_count).await { + Ok(_) => trace!("Successfully inserted daily aggregate."), + Err(e) => error!("Failed to insert daily aggregate: {:?}", e), + } + } + }, + Err(e) => error!("Failed to fetch Indexer stats: {:?}", e), + } + }, + else => break, } diff --git a/src/server/model/mod.rs b/src/server/model/mod.rs index cd36220..721851c 100644 --- a/src/server/model/mod.rs +++ b/src/server/model/mod.rs @@ -3,14 +3,16 @@ use async_graphql::{Context, EmptySubscription, Object, OutputType, Schema, Simp use chrono::Utc; use serde::{de::DeserializeOwned, Serialize}; use sqlx::{Pool, Postgres}; -use std::{sync::Arc, time::Duration}; +use std::{collections::HashMap, sync::Arc, time::Duration}; use thiserror::Error; +use tracing::error; use crate::{ config::Config, db::resolver::{ - delete_message_all, delete_message_by_id, get_indexer_stats, list_active_indexers, - list_messages, list_rows, message_by_id, IndexerStats, + count_distinct_subgraphs, delete_message_all, delete_message_by_id, fetch_aggregates, + get_indexer_stats, list_active_indexers, list_messages, list_rows, message_by_id, + IndexerStats, }, operator::radio_types::RadioPayloadMessage, }; @@ -35,6 +37,13 @@ impl RadioContext { } } +#[derive(Serialize, SimpleObject)] +pub struct Summary { + total_message_count: HashMap<String, i64>, + average_subgraphs_count: HashMap<String, i64>, + total_subgraphs_covered: i64, +} + // Unified query object for resolvers #[derive(Default)] pub struct QueryRoot; @@ -127,6 +136,62 @@ impl QueryRoot { message_by_id(pool, id).await?.get_message(); Ok(msg) } + + async fn query_aggregate_stats( + &self, + ctx: &Context<'_>, + days: i32, + ) -> Result<Summary, HttpServiceError> { + let pool = ctx.data_unchecked::<Arc<Pool<Postgres>>>(); + + let since_timestamp = + (Utc::now() - chrono::Duration::try_days(days.into()).unwrap()).timestamp(); + let aggregates = 
fetch_aggregates(pool, since_timestamp) + .await + .map_err(HttpServiceError::Others)?; + + let mut total_message_count: HashMap<String, i64> = HashMap::new(); + let mut total_subgraphs_count: HashMap<String, i64> = HashMap::new(); + + let mut subgraphs_counts = HashMap::new(); + + for stat in aggregates { + *total_message_count + .entry(stat.graph_account.clone()) + .or_default() += stat.message_count; + *total_subgraphs_count + .entry(stat.graph_account.clone()) + .or_default() += stat.subgraphs_count; + subgraphs_counts + .entry(stat.graph_account.clone()) + .or_insert_with(Vec::new) + .push(stat.subgraphs_count); + } + + let average_subgraphs_count: HashMap<String, i64> = total_subgraphs_count + .iter() + .map(|(key, &total_count)| { + let count = subgraphs_counts.get(key).map_or(1, |counts| counts.len()); + ( + key.clone(), + if count > 0 { + (total_count as f64 / count as f64).ceil() as i64 + } else { + 0 + }, + ) + }) + .collect(); + + let total_subgraphs_covered = count_distinct_subgraphs(pool, since_timestamp) + .await + .map_err(HttpServiceError::Others)?; + Ok(Summary { + total_message_count, + average_subgraphs_count, + total_subgraphs_covered, + }) + } } // Unified query object for resolvers