From e2716f3facec615a00dac288fcd6504c028e6aee Mon Sep 17 00:00:00 2001 From: axiomatic-aardvark Date: Fri, 15 Mar 2024 17:00:12 +0200 Subject: [PATCH] chore: test --- .github/workflows/main.yml | 10 +-- ...0b192cc9b7ddc23323def08c60118df914a47.json | 22 ------ ...863faf0fcf652cbded292e4eddca6fb4f1ccb.json | 20 +++++ src/db/resolver.rs | 76 +++++++++++++++---- src/operator/mod.rs | 23 +++--- src/server/model/mod.rs | 12 +-- 6 files changed, 101 insertions(+), 62 deletions(-) delete mode 100644 .sqlx/query-072603f4f0b17b26cec784a0e210b192cc9b7ddc23323def08c60118df914a47.json create mode 100644 .sqlx/query-3ed0545a37d071d142c970f7b0f863faf0fcf652cbded292e4eddca6fb4f1ccb.json diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 665e6d5..f259d59 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -2,9 +2,7 @@ name: CI on: push: - branches: [main] - pull_request_review: - types: [submitted] + pull_request: jobs: fmt: @@ -20,7 +18,6 @@ jobs: check: name: Check - if: github.event.review.state == 'approved' runs-on: ubuntu-latest steps: - uses: actions/checkout@v3 @@ -30,7 +27,7 @@ jobs: run: git submodule update --init --recursive - uses: actions/setup-go@v4 with: - go-version: '1.19' + go-version: '1.20' - uses: actions-rs/toolchain@v1 with: profile: minimal @@ -47,7 +44,6 @@ jobs: test: name: Test Suite - if: github.event.review.state == 'approved' strategy: matrix: os: [ubuntu-latest] @@ -77,7 +73,7 @@ jobs: run: git submodule update --init --recursive - uses: actions/setup-go@v4 with: - go-version: '1.19' + go-version: '1.20' - uses: actions-rs/toolchain@v1 with: profile: minimal diff --git a/.sqlx/query-072603f4f0b17b26cec784a0e210b192cc9b7ddc23323def08c60118df914a47.json b/.sqlx/query-072603f4f0b17b26cec784a0e210b192cc9b7ddc23323def08c60118df914a47.json deleted file mode 100644 index 1c04b3a..0000000 --- a/.sqlx/query-072603f4f0b17b26cec784a0e210b192cc9b7ddc23323def08c60118df914a47.json +++ /dev/null @@ -1,22 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n DELETE FROM messages\n WHERE (message->>'nonce')::bigint < $1\n RETURNING id\n ", - "describe": { - "columns": [ - { - "ordinal": 0, - "name": "id", - "type_info": "Int8" - } - ], - "parameters": { - "Left": [ - "Int8" - ] - }, - "nullable": [ - false - ] - }, - "hash": "072603f4f0b17b26cec784a0e210b192cc9b7ddc23323def08c60118df914a47" -} diff --git a/.sqlx/query-3ed0545a37d071d142c970f7b0f863faf0fcf652cbded292e4eddca6fb4f1ccb.json b/.sqlx/query-3ed0545a37d071d142c970f7b0f863faf0fcf652cbded292e4eddca6fb4f1ccb.json new file mode 100644 index 0000000..1f5fde9 --- /dev/null +++ b/.sqlx/query-3ed0545a37d071d142c970f7b0f863faf0fcf652cbded292e4eddca6fb4f1ccb.json @@ -0,0 +1,20 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT COUNT(*) as \"count!: i64\"\n FROM messages\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "count!: i64", + "type_info": "Int8" + } + ], + "parameters": { + "Left": [] + }, + "nullable": [ + null + ] + }, + "hash": "3ed0545a37d071d142c970f7b0f863faf0fcf652cbded292e4eddca6fb4f1ccb" +} diff --git a/src/db/resolver.rs b/src/db/resolver.rs index 7d50964..a1823d1 100644 --- a/src/db/resolver.rs +++ b/src/db/resolver.rs @@ -1,7 +1,7 @@ use async_graphql::{OutputType, SimpleObject}; use chrono::Utc; use serde::{de::DeserializeOwned, Serialize}; -use sqlx::{types::Json, FromRow, PgPool, Row as SqliteRow}; +use sqlx::{postgres::PgQueryResult, types::Json, FromRow, PgPool, Row as SqliteRow}; use std::ops::Deref; use tracing::trace; @@ -83,6 +83,23 @@ ORDER BY id Ok(rows) } +pub async fn count_messages(pool: &PgPool) -> anyhow::Result { + let result = sqlx::query!( + r#" + SELECT COUNT(*) as "count!: i64" + FROM messages + "# + ) + .fetch_one(pool) + .await + .map_err(|e| { + trace!("Database query error: {:#?}", e); + anyhow::Error::new(e) + })?; + + Ok(result.count) +} + pub async fn list_rows(pool: &PgPool) -> Result>, anyhow::Error> where T: Clone + Serialize + DeserializeOwned + OutputType + std::marker::Unpin, @@ -201,24 +218,51 @@ RETURNING id Ok(deleted_ids.try_into().unwrap()) } -/// Function to delete messages older than `retention` minutes -/// Returns the number of messages deleted -pub async fn prune_old_messages(pool: &PgPool, retention: i32) -> Result { +/// Function to delete messages older than `retention` minutes in batches +/// Returns the total number of messages deleted +/// Arguments: +/// - `pool`: &PgPool - A reference to the PostgreSQL connection pool +/// - `retention`: i32 - The retention time in minutes +/// - `batch_size`: i64 - The number of messages to delete in each batch +pub async fn prune_old_messages( + pool: &PgPool, + retention: i32, + batch_size: i64, +) -> Result { let cutoff_nonce = Utc::now().timestamp() - (retention as i64 * 60); + let mut total_deleted = 0i64; + + loop { + let delete_query = sqlx::query( + r#" + WITH deleted AS ( + SELECT id + FROM messages + WHERE (message->>'nonce')::bigint < $1 + ORDER BY id ASC + LIMIT $2 + FOR UPDATE SKIP LOCKED + ) + DELETE FROM messages + WHERE id IN (SELECT id FROM deleted) + RETURNING id + "#, + ) + .bind(cutoff_nonce) + .bind(batch_size); - let deleted_count = sqlx::query!( - r#" - DELETE FROM messages - WHERE (message->>'nonce')::bigint < $1 - RETURNING id - "#, - cutoff_nonce - ) - .fetch_all(pool) - .await? - .len() as i64; + let result: PgQueryResult = delete_query.execute(pool).await?; + let deleted_count = result.rows_affected() as i64; + + total_deleted += deleted_count; + + // Break the loop if we deleted fewer rows than the batch size, indicating we've processed all eligible messages. + if deleted_count < batch_size { + break; + } + } - Ok(deleted_count) + Ok(total_deleted) } pub async fn list_active_indexers( diff --git a/src/operator/mod.rs b/src/operator/mod.rs index b2c79df..ba6cff2 100644 --- a/src/operator/mod.rs +++ b/src/operator/mod.rs @@ -13,11 +13,11 @@ use tracing::{debug, info, trace, warn}; use graphcast_sdk::graphcast_agent::{message_typing::GraphcastMessage, GraphcastAgent}; -use crate::db::resolver::{prune_old_messages, retain_max_storage}; +use crate::db::resolver::{count_messages, prune_old_messages, retain_max_storage}; use crate::metrics::{CONNECTED_PEERS, GOSSIP_PEERS, PRUNED_MESSAGES, RECEIVED_MESSAGES}; use crate::{ config::Config, - db::resolver::{add_message, list_messages}, + db::resolver::add_message, message_types::{PublicPoiMessage, SimpleMessage, UpgradeIntentMessage}, metrics::{handle_serve_metrics, ACTIVE_PEERS, CACHED_MESSAGES}, server::run_server, @@ -109,7 +109,7 @@ impl RadioOperator { let skip_iteration_clone = skip_iteration.clone(); let mut network_update_interval = interval(Duration::from_secs(600)); - let mut comparison_interval = interval(Duration::from_secs(30)); + let mut summary_interval = interval(Duration::from_secs(180)); let iteration_timeout = Duration::from_secs(180); let update_timeout = Duration::from_secs(5); @@ -160,7 +160,7 @@ impl RadioOperator { .set(self.graphcast_agent.number_of_peers().try_into().unwrap()); } }, - _ = comparison_interval.tick() => { + _ = summary_interval.tick() => { trace!("Local summary update"); if skip_iteration.load(Ordering::SeqCst) { skip_iteration.store(false, Ordering::SeqCst); @@ -185,10 +185,12 @@ impl RadioOperator { }; } + let batch_size = 1000; + // Always prune old messages based on RETENTION match timeout( update_timeout, - prune_old_messages(&self.db, self.config.retention) + prune_old_messages(&self.db, self.config.retention, batch_size) ).await { Err(e) => debug!(err = tracing::field::debug(e), "Pruning by retention timed out"), Ok(Ok(num_pruned)) => { @@ -199,14 +201,13 @@ impl RadioOperator { }; // List the remaining messages - let result = timeout(update_timeout, list_messages::>(&self.db)).await; + let result = timeout(update_timeout, count_messages(&self.db)).await.expect("could not count messages"); match result { - Err(e) => warn!(err = tracing::field::debug(e), "Public PoI messages summary timed out"), - Ok(msgs) => { - let msg_num = msgs.map_or(0, |m| m.len()); - CACHED_MESSAGES.set(msg_num as i64); - info!(total_messages = msg_num, + Err(e) => warn!(err = tracing::field::debug(e), "Database query for message count timed out"), + Ok(count) => { + CACHED_MESSAGES.set(count); + info!(total_messages = count, total_num_pruned, "Monitoring summary" ) diff --git a/src/server/model/mod.rs b/src/server/model/mod.rs index 1feeb2b..cd36220 100644 --- a/src/server/model/mod.rs +++ b/src/server/model/mod.rs @@ -1,9 +1,9 @@ use async_graphql::{Context, EmptySubscription, Object, OutputType, Schema, SimpleObject}; -use chrono::{Duration, Utc}; +use chrono::Utc; use serde::{de::DeserializeOwned, Serialize}; use sqlx::{Pool, Postgres}; -use std::sync::Arc; +use std::{sync::Arc, time::Duration}; use thiserror::Error; use crate::{ @@ -61,13 +61,13 @@ impl QueryRoot { &self, ctx: &Context<'_>, indexers: Option>, - minutes_ago: Option, + minutes_ago: Option, ) -> Result, HttpServiceError> { let pool = ctx.data_unchecked::>(); // Use a default time window if not specified // Default to 1440 minutes (24 hours) if not provided let minutes_ago = minutes_ago.unwrap_or(1440); - let from_timestamp = (Utc::now() - Duration::minutes(minutes_ago)).timestamp(); + let from_timestamp = (Utc::now() - Duration::from_secs(minutes_ago * 60)).timestamp(); let active_indexers = list_active_indexers(pool, indexers, from_timestamp).await?; Ok(active_indexers) @@ -77,11 +77,11 @@ impl QueryRoot { &self, ctx: &Context<'_>, indexers: Option>, - minutes_ago: Option, + minutes_ago: Option, ) -> Result, HttpServiceError> { let pool = ctx.data_unchecked::>(); let minutes_ago = minutes_ago.unwrap_or(1440); - let from_timestamp = (Utc::now() - Duration::minutes(minutes_ago)).timestamp(); + let from_timestamp = (Utc::now() - Duration::from_secs(minutes_ago * 60)).timestamp(); let stats = get_indexer_stats(pool, indexers, from_timestamp).await?; Ok(stats)