feat: aggregate metrics
neriumrevolta committed Mar 29, 2024
1 parent 92e0f0a commit f20f847
Showing 9 changed files with 283 additions and 11 deletions.

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions migrations/20240325124203_aggregates.down.sql
@@ -0,0 +1 @@
+DROP TABLE IF EXISTS indexer_aggregates;
9 changes: 9 additions & 0 deletions migrations/20240325124203_aggregates.up.sql
@@ -0,0 +1,9 @@
+CREATE TABLE IF NOT EXISTS indexer_aggregates
+(
+    id SERIAL PRIMARY KEY,
+    timestamp BIGINT NOT NULL,
+    graph_account VARCHAR(255) NOT NULL,
+    message_count BIGINT NOT NULL,
+    subgraphs_count BIGINT NOT NULL,
+    UNIQUE(graph_account, timestamp)
+);
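Note: the UNIQUE(graph_account, timestamp) constraint means a second insert for the same account at the same timestamp is rejected. A hypothetical upsert, not part of this commit, could absorb such collisions instead:

    -- Hypothetical sketch: keep the latest values on a (graph_account, timestamp) collision.
    INSERT INTO indexer_aggregates (timestamp, graph_account, message_count, subgraphs_count)
    VALUES (1711713600, '0xexample', 42, 7)
    ON CONFLICT (graph_account, timestamp) DO UPDATE
        SET message_count = EXCLUDED.message_count,
            subgraphs_count = EXCLUDED.subgraphs_count;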
89 changes: 83 additions & 6 deletions src/db/resolver.rs
@@ -1,6 +1,6 @@
 use async_graphql::{OutputType, SimpleObject};
 use chrono::Utc;
-use serde::{de::DeserializeOwned, Serialize};
+use serde::{de::DeserializeOwned, Deserialize, Serialize};
 use sqlx::{postgres::PgQueryResult, types::Json, FromRow, PgPool, Row as SqliteRow};
 use std::ops::Deref;
 use tracing::trace;
@@ -21,11 +21,11 @@ pub struct MessageID {
 }

 #[allow(dead_code)]
-#[derive(FromRow, SimpleObject, Serialize, Debug, Clone)]
+#[derive(FromRow, SimpleObject, Serialize, Deserialize, Debug, Clone)]
 pub struct IndexerStats {
-    graph_account: String,
-    message_count: i64,
-    subgraphs_count: i64,
+    pub graph_account: String,
+    pub message_count: i64,
+    pub subgraphs_count: i64,
 }

 // Define graphql type for the Row in Messages
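Note: the fields become pub so that callers outside this module, such as the operator loop below, can read the stats when writing daily aggregates; Deserialize is presumably added for the same cross-module reuse.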
@@ -315,7 +315,7 @@ pub async fn get_indexer_stats(
         SELECT
             message->>'graph_account' as graph_account,
             COUNT(*) as message_count,
-            COUNT(DISTINCT message->>'identifier') as subgraphs_count -- Updated field name
+            COUNT(DISTINCT message->>'identifier') as subgraphs_count
         FROM messages
         WHERE (CAST(message->>'nonce' AS BIGINT)) > $1";

@@ -352,6 +352,83 @@ pub async fn get_indexer_stats(
     Ok(stats)
 }

+pub async fn insert_aggregate(
+    pool: &PgPool,
+    timestamp: i64,
+    graph_account: String,
+    message_count: i64,
+    subgraphs_count: i64,
+) -> anyhow::Result<()> {
+    let _ = sqlx::query!(
+        "INSERT INTO indexer_aggregates (timestamp, graph_account, message_count, subgraphs_count) VALUES ($1, $2, $3, $4)",
+        timestamp,
+        graph_account,
+        message_count,
+        subgraphs_count
+    )
+    .execute(pool)
+    .await?;
+
+    Ok(())
+}
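A minimal calling sketch (hypothetical, not part of this commit); because of the UNIQUE(graph_account, timestamp) constraint, inserting twice for the same account with the same timestamp returns an error rather than a duplicate row:

    // Hypothetical caller: record one aggregate row for one account.
    let now = chrono::Utc::now().timestamp();
    insert_aggregate(&pool, now, "0xexample".to_string(), 42, 7).await?;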

+pub async fn fetch_aggregates(
+    pool: &PgPool,
+    since_timestamp: i64,
+) -> Result<Vec<IndexerStats>, anyhow::Error> {
+    let aggregates = sqlx::query_as!(
+        IndexerStats,
+        "SELECT graph_account, message_count, subgraphs_count FROM indexer_aggregates WHERE timestamp > $1",
+        since_timestamp
+    )
+    .fetch_all(pool)
+    .await
+    .map_err(anyhow::Error::new)?;
+
+    let results: Vec<IndexerStats> = aggregates
+        .clone()
+        .into_iter()
+        .map(|agg| IndexerStats {
+            graph_account: agg.graph_account,
+            message_count: agg.message_count,
+            subgraphs_count: agg.subgraphs_count,
+        })
+        .collect();
+
+    Ok(results)
+}
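Note: sqlx::query_as! checks the selected columns against the IndexerStats fields at compile time, so the rows already come back as IndexerStats values; the clone-and-re-map above is an identity mapping, and the function could simply return `aggregates`.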

+pub async fn count_distinct_subgraphs(
+    pool: &PgPool,
+    from_timestamp: i64,
+) -> Result<i64, anyhow::Error> {
+    let result = sqlx::query!(
+        "
+        SELECT COUNT(DISTINCT message->>'identifier') AS count
+        FROM messages
+        WHERE (CAST(message->>'nonce' AS BIGINT)) > $1
+        ",
+        from_timestamp
+    )
+    .fetch_one(pool)
+    .await
+    .map_err(anyhow::Error::new)?;
+
+    Ok(result.count.unwrap_or(0) as i64) // sqlx infers COUNT(...) as nullable, hence the Option
+}

+pub async fn prune_old_aggregates(pool: &PgPool) -> Result<i64, anyhow::Error> {
+    let since_timestamp = (Utc::now() - chrono::Duration::try_days(90).unwrap()).timestamp();
+    let result = sqlx::query!(
+        "DELETE FROM indexer_aggregates WHERE timestamp < $1",
+        since_timestamp
+    )
+    .execute(pool)
+    .await?;
+
+    Ok(result.rows_affected() as i64)
+}
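Note: chrono::Duration::try_days returns None only when the day count overflows the representable range, so the unwrap() on the constant 90 cannot panic; the effect is a rolling 90-day retention window for aggregate rows.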

 #[cfg(test)]
 mod tests {
     use crate::message_types::PublicPoiMessage;

37 changes: 35 additions & 2 deletions src/operator/mod.rs
@@ -1,4 +1,5 @@
 use anyhow::anyhow;
+use chrono::Utc;
 use graphcast_sdk::WakuMessage;
 use sqlx::postgres::PgPoolOptions;
 use sqlx::{Pool, Postgres};
@@ -9,11 +10,14 @@ use std::thread::{self, JoinHandle};
 use std::time::Duration;
 use tokio::runtime::Runtime;
 use tokio::time::{interval, sleep, timeout};
-use tracing::{debug, info, trace, warn};
+use tracing::{debug, error, info, trace, warn};

 use graphcast_sdk::graphcast_agent::{message_typing::GraphcastMessage, GraphcastAgent};

-use crate::db::resolver::{count_messages, prune_old_messages, retain_max_storage};
+use crate::db::resolver::{
+    count_messages, get_indexer_stats, insert_aggregate, prune_old_aggregates, prune_old_messages,
+    retain_max_storage,
+};
 use crate::metrics::{CONNECTED_PEERS, GOSSIP_PEERS, PRUNED_MESSAGES, RECEIVED_MESSAGES};
 use crate::{
     config::Config,
@@ -110,6 +114,7 @@ impl RadioOperator {

         let mut network_update_interval = interval(Duration::from_secs(600));
         let mut summary_interval = interval(Duration::from_secs(180));
+        let mut daily_aggregate_interval = interval(Duration::from_secs(86400)); // 24 hours; a tokio interval's first tick completes immediately, so one aggregation also runs at startup

         let iteration_timeout = Duration::from_secs(180);
         let update_timeout = Duration::from_secs(5);
@@ -214,6 +219,34 @@ impl RadioOperator {
                     }
                 }
             },
+            _ = daily_aggregate_interval.tick() => {
+                if skip_iteration.load(Ordering::SeqCst) {
+                    skip_iteration.store(false, Ordering::SeqCst);
+                    continue;
+                }
+
+                let pool = &self.db;
+
+                match prune_old_aggregates(pool).await {
+                    Ok(deleted_count) => trace!("Pruned {} old aggregate entries.", deleted_count),
+                    Err(e) => error!("Failed to prune old aggregates: {:?}", e),
+                }
+
+                let from_timestamp = (Utc::now() - Duration::from_secs(86400)).timestamp(); // aggregate stats over the past 24 hours
+
+                match get_indexer_stats(pool, None, from_timestamp).await {
+                    Ok(stats) => {
+                        for stat in stats {
+                            match insert_aggregate(pool, Utc::now().timestamp(), stat.graph_account, stat.message_count, stat.subgraphs_count).await {
+                                Ok(_) => trace!("Successfully inserted daily aggregate."),
+                                Err(e) => error!("Failed to insert daily aggregate: {:?}", e),
+                            }
+                        }
+                    },
+                    Err(e) => error!("Failed to fetch Indexer stats: {:?}", e),
+                }
+            },
+
             else => break,
         }

71 changes: 68 additions & 3 deletions src/server/model/mod.rs
@@ -3,14 +3,16 @@ use async_graphql::{Context, EmptySubscription, Object, OutputType, Schema, Simp
 use chrono::Utc;
 use serde::{de::DeserializeOwned, Serialize};
 use sqlx::{Pool, Postgres};
-use std::{sync::Arc, time::Duration};
+use std::{collections::HashMap, sync::Arc, time::Duration};
 use thiserror::Error;
 use tracing::error;

 use crate::{
     config::Config,
     db::resolver::{
-        delete_message_all, delete_message_by_id, get_indexer_stats, list_active_indexers,
-        list_messages, list_rows, message_by_id, IndexerStats,
+        count_distinct_subgraphs, delete_message_all, delete_message_by_id, fetch_aggregates,
+        get_indexer_stats, list_active_indexers, list_messages, list_rows, message_by_id,
+        IndexerStats,
     },
     operator::radio_types::RadioPayloadMessage,
 };
@@ -35,6 +37,13 @@ impl RadioContext {
     }
 }

+#[derive(Serialize, SimpleObject)]
+pub struct Summary {
+    total_message_count: HashMap<String, i64>,
+    average_subgraphs_count: HashMap<String, i64>,
+    total_subgraphs_covered: i64,
+}
+
 // Unified query object for resolvers
 #[derive(Default)]
 pub struct QueryRoot;
@@ -127,6 +136,62 @@ impl QueryRoot {
             message_by_id(pool, id).await?.get_message();
         Ok(msg)
     }

+    async fn query_aggregate_stats(
+        &self,
+        ctx: &Context<'_>,
+        days: i32,
+    ) -> Result<Summary, HttpServiceError> {
+        let pool = ctx.data_unchecked::<Pool<Postgres>>();
+
+        let since_timestamp =
+            (Utc::now() - chrono::Duration::try_days(days.into()).unwrap()).timestamp();
+        let aggregates = fetch_aggregates(pool, since_timestamp)
+            .await
+            .map_err(HttpServiceError::Others)?;
+
+        let mut total_message_count: HashMap<String, i64> = HashMap::new();
+        let mut total_subgraphs_count: HashMap<String, i64> = HashMap::new();
+
+        let mut subgraphs_counts = HashMap::new();
+
+        for stat in aggregates {
+            *total_message_count
+                .entry(stat.graph_account.clone())
+                .or_default() += stat.message_count;
+            *total_subgraphs_count
+                .entry(stat.graph_account.clone())
+                .or_default() += stat.subgraphs_count;
+            subgraphs_counts
+                .entry(stat.graph_account.clone())
+                .or_insert_with(Vec::new)
+                .push(stat.subgraphs_count);
+        }
+
+        let average_subgraphs_count: HashMap<String, i64> = total_subgraphs_count
+            .iter()
+            .map(|(key, &total_count)| {
+                let count = subgraphs_counts.get(key).map_or(1, |counts| counts.len());
+                (
+                    key.clone(),
+                    if count > 0 {
+                        (total_count as f64 / count as f64).ceil() as i64
+                    } else {
+                        0
+                    },
+                )
+            })
+            .collect();
+
+        let total_subgraphs_covered = count_distinct_subgraphs(pool, since_timestamp)
+            .await
+            .map_err(HttpServiceError::Others)?;
+        Ok(Summary {
+            total_message_count,
+            average_subgraphs_count,
+            total_subgraphs_covered,
+        })
+    }
 }

 // Unified query object for resolvers
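Assuming async-graphql's default camelCase renaming of resolver and field names, a client could call the new resolver roughly like this (a hypothetical request shape, written on the assumption that the HashMap fields surface as map-like values rather than selectable sub-objects):

    query {
      queryAggregateStats(days: 7) {
        totalMessageCount
        averageSubgraphsCount
        totalSubgraphsCovered
      }
    }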
