diff --git a/Cargo.lock b/Cargo.lock index cde4346..68fd11b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5050,6 +5050,7 @@ dependencies = [ "hyper", "indoc", "itertools 0.12.0", + "metrics", "num-bigint", "postgres-docker-utils", "rand", diff --git a/Cargo.toml b/Cargo.toml index 7e50cbd..f1f02e5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -65,6 +65,7 @@ sqlx = { version = "0.7.2", features = [ "migrate", "bigdecimal", ] } +metrics = "0.21.1" num-bigint = "0.4.4" bigdecimal = "0.4.2" spki = "0.7.2" diff --git a/src/config.rs b/src/config.rs index 5813ba4..53ad624 100644 --- a/src/config.rs +++ b/src/config.rs @@ -20,6 +20,9 @@ pub struct TxSitterConfig { #[serde(default)] pub datadog_enabled: bool, + + #[serde(default)] + pub statsd_enabled: bool, } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -112,6 +115,7 @@ mod tests { [service] escalation_interval = "1h" datadog_enabled = false + statsd_enabled = false [server] host = "127.0.0.1:3000" @@ -128,6 +132,7 @@ mod tests { [service] escalation_interval = "1h" datadog_enabled = false + statsd_enabled = false [server] host = "127.0.0.1:3000" @@ -150,6 +155,7 @@ mod tests { service: TxSitterConfig { escalation_interval: Duration::from_secs(60 * 60), datadog_enabled: false, + statsd_enabled: false, }, server: ServerConfig { host: SocketAddr::from(([127, 0, 0, 1], 3000)), @@ -174,6 +180,7 @@ mod tests { service: TxSitterConfig { escalation_interval: Duration::from_secs(60 * 60), datadog_enabled: false, + statsd_enabled: false, }, server: ServerConfig { host: SocketAddr::from(([127, 0, 0, 1], 3000)), diff --git a/src/db.rs b/src/db.rs index 550844c..8323a16 100644 --- a/src/db.rs +++ b/src/db.rs @@ -14,7 +14,9 @@ use crate::types::{RelayerInfo, RelayerUpdate, TransactionPriority}; pub mod data; -use self::data::{AddressWrapper, BlockFees, H256Wrapper, ReadTxData, RpcKind}; +use self::data::{ + AddressWrapper, BlockFees, H256Wrapper, NetworkStats, ReadTxData, RpcKind, +}; pub use self::data::{TxForEscalation, TxStatus, UnsentTx}; // Statically link in migration files @@ -924,6 +926,82 @@ impl Database { Ok(is_valid) } + + pub async fn get_stats(&self, chain_id: u64) -> eyre::Result { + let (pending_txs,): (i64,) = sqlx::query_as( + r#" + SELECT COUNT(1) + FROM transactions t + JOIN relayers r ON (t.relayer_id = r.id) + LEFT JOIN sent_transactions s ON (t.id = s.tx_id) + WHERE s.tx_id IS NULL + AND r.chain_id = $1 + "#, + ) + .bind(chain_id as i64) + .fetch_one(&self.pool) + .await?; + + let (mined_txs,): (i64,) = sqlx::query_as( + r#" + SELECT COUNT(1) + FROM transactions t + JOIN relayers r ON (t.relayer_id = r.id) + LEFT JOIN sent_transactions s ON (t.id = s.tx_id) + WHERE s.status = $1 + AND r.chain_id = $2 + "#, + ) + .bind(TxStatus::Mined) + .bind(chain_id as i64) + .fetch_one(&self.pool) + .await?; + + let (finalized_txs,): (i64,) = sqlx::query_as( + r#" + SELECT COUNT(1) + FROM transactions t + JOIN relayers r ON (t.relayer_id = r.id) + LEFT JOIN sent_transactions s ON (t.id = s.tx_id) + WHERE s.status = $1 + AND r.chain_id = $2 + "#, + ) + .bind(TxStatus::Finalized) + .bind(chain_id as i64) + .fetch_one(&self.pool) + .await?; + + let (total_indexed_blocks,): (i64,) = sqlx::query_as( + r#" + SELECT COUNT(1) + FROM blocks + WHERE chain_id = $1 + "#, + ) + .bind(chain_id as i64) + .fetch_one(&self.pool) + .await?; + + let (block_txs,): (i64,) = sqlx::query_as( + r#" + SELECT COUNT(1) + FROM block_txs + WHERE chain_id = $1 + "#, + ) + .bind(chain_id as i64) + .fetch_one(&self.pool) + .await?; + + Ok(NetworkStats { + pending_txs: pending_txs as u64, + mined_txs: mined_txs as u64, + finalized_txs: finalized_txs as u64, + total_indexed_blocks: total_indexed_blocks as u64, + block_txs: block_txs as u64, + }) + } } #[cfg(test)] diff --git a/src/db/data.rs b/src/db/data.rs index 5fccf40..7e8bd80 100644 --- a/src/db/data.rs +++ b/src/db/data.rs @@ -58,6 +58,15 @@ pub struct ReadTxData { pub status: Option, } +#[derive(Debug, Clone)] +pub struct NetworkStats { + pub pending_txs: u64, + pub mined_txs: u64, + pub finalized_txs: u64, + pub total_indexed_blocks: u64, + pub block_txs: u64, +} + #[derive(Debug, Clone)] pub struct BlockFees { pub fee_estimates: FeesEstimate, diff --git a/src/main.rs b/src/main.rs index a8cd9e9..18e8cca 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,6 +2,8 @@ use std::path::PathBuf; use clap::Parser; use config::FileFormat; +use telemetry_batteries::metrics::statsd::StatsdBattery; +use telemetry_batteries::metrics::MetricsBattery; use telemetry_batteries::tracing::batteries::datadog::DatadogBattery; use tracing_subscriber::layer::SubscriberExt; use tracing_subscriber::util::SubscriberInitExt; @@ -69,6 +71,18 @@ async fn main() -> eyre::Result<()> { .init(); } + if config.service.statsd_enabled { + let statsd_battery = StatsdBattery::new( + "localhost", + 8125, + 5000, + 1024, + Some("tx_sitter_monolith"), + )?; + + statsd_battery.init()?; + } + let service = Service::new(config).await?; service.wait().await?; diff --git a/src/service.rs b/src/service.rs index cef7e49..599b0ac 100644 --- a/src/service.rs +++ b/src/service.rs @@ -29,6 +29,10 @@ impl Service { task_runner.add_task("Handle soft reorgs", tasks::handle_soft_reorgs); task_runner.add_task("Handle hard reorgs", tasks::handle_hard_reorgs); + if app.config.service.statsd_enabled { + task_runner.add_task("Emit metrics", tasks::emit_metrics); + } + for chain_id in chain_ids { Self::spawn_chain_tasks(&task_runner, chain_id)?; } diff --git a/src/tasks.rs b/src/tasks.rs index e15f411..b86014b 100644 --- a/src/tasks.rs +++ b/src/tasks.rs @@ -3,6 +3,7 @@ pub mod escalate; pub mod finalize; pub mod handle_reorgs; pub mod index; +pub mod metrics; pub mod prune; pub use self::broadcast::broadcast_txs; @@ -10,4 +11,5 @@ pub use self::escalate::escalate_txs; pub use self::finalize::finalize_txs; pub use self::handle_reorgs::{handle_hard_reorgs, handle_soft_reorgs}; pub use self::index::index_chain; +pub use self::metrics::emit_metrics; pub use self::prune::{prune_blocks, prune_txs}; diff --git a/src/tasks/metrics.rs b/src/tasks/metrics.rs new file mode 100644 index 0000000..4584815 --- /dev/null +++ b/src/tasks/metrics.rs @@ -0,0 +1,36 @@ +use std::sync::Arc; +use std::time::Duration; + +use crate::app::App; + +const EMIT_METRICS_INTERVAL: Duration = Duration::from_secs(1); + +pub async fn emit_metrics(app: Arc) -> eyre::Result<()> { + loop { + let chain_ids = app.db.get_network_chain_ids().await?; + + for chain_id in chain_ids { + let stats = app.db.get_stats(chain_id).await?; + + // TODO: Add labels for env, etc. + let labels = [("chain_id", chain_id.to_string())]; + + metrics::gauge!("pending_txs", stats.pending_txs as f64, &labels); + metrics::gauge!("mined_txs", stats.mined_txs as f64, &labels); + metrics::gauge!( + "finalized_txs", + stats.finalized_txs as f64, + &labels + ); + metrics::gauge!( + "total_indexed_blocks", + stats.total_indexed_blocks as f64, + &labels + ); + metrics::gauge!("block_fees", stats.block_txs as f64, &labels); + metrics::gauge!("block_txs", stats.block_txs as f64, &labels); + } + + tokio::time::sleep(EMIT_METRICS_INTERVAL).await; + } +} diff --git a/tests/common/mod.rs b/tests/common/mod.rs index 6620594..c24a844 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -142,6 +142,7 @@ pub async fn setup_service( service: TxSitterConfig { escalation_interval, datadog_enabled: false, + statsd_enabled: false, }, server: ServerConfig { host: SocketAddr::V4(SocketAddrV4::new(