Skip to content

Commit

Permalink
Emit metrics periodically
Browse files Browse the repository at this point in the history
  • Loading branch information
Dzejkop committed Dec 7, 2023
1 parent 792b858 commit 5a576a4
Show file tree
Hide file tree
Showing 10 changed files with 154 additions and 1 deletion.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ sqlx = { version = "0.7.2", features = [
"migrate",
"bigdecimal",
] }
metrics = "0.21.1"
num-bigint = "0.4.4"
bigdecimal = "0.4.2"
spki = "0.7.2"
Expand Down
7 changes: 7 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ pub struct TxSitterConfig {

#[serde(default)]
pub datadog_enabled: bool,

#[serde(default)]
pub statsd_enabled: bool,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
Expand Down Expand Up @@ -112,6 +115,7 @@ mod tests {
[service]
escalation_interval = "1h"
datadog_enabled = false
statsd_enabled = false
[server]
host = "127.0.0.1:3000"
Expand All @@ -128,6 +132,7 @@ mod tests {
[service]
escalation_interval = "1h"
datadog_enabled = false
statsd_enabled = false
[server]
host = "127.0.0.1:3000"
Expand All @@ -150,6 +155,7 @@ mod tests {
service: TxSitterConfig {
escalation_interval: Duration::from_secs(60 * 60),
datadog_enabled: false,
statsd_enabled: false,
},
server: ServerConfig {
host: SocketAddr::from(([127, 0, 0, 1], 3000)),
Expand All @@ -174,6 +180,7 @@ mod tests {
service: TxSitterConfig {
escalation_interval: Duration::from_secs(60 * 60),
datadog_enabled: false,
statsd_enabled: false,
},
server: ServerConfig {
host: SocketAddr::from(([127, 0, 0, 1], 3000)),
Expand Down
80 changes: 79 additions & 1 deletion src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ use crate::types::{RelayerInfo, RelayerUpdate, TransactionPriority};

pub mod data;

use self::data::{AddressWrapper, BlockFees, H256Wrapper, ReadTxData, RpcKind};
use self::data::{
AddressWrapper, BlockFees, H256Wrapper, NetworkStats, ReadTxData, RpcKind,
};
pub use self::data::{TxForEscalation, TxStatus, UnsentTx};

// Statically link in migration files
Expand Down Expand Up @@ -924,6 +926,82 @@ impl Database {

Ok(is_valid)
}

pub async fn get_stats(&self, chain_id: u64) -> eyre::Result<NetworkStats> {
let (pending_txs,): (i64,) = sqlx::query_as(
r#"
SELECT COUNT(1)
FROM transactions t
JOIN relayers r ON (t.relayer_id = r.id)
LEFT JOIN sent_transactions s ON (t.id = s.tx_id)
WHERE s.tx_id IS NULL
AND r.chain_id = $1
"#,
)
.bind(chain_id as i64)
.fetch_one(&self.pool)
.await?;

let (mined_txs,): (i64,) = sqlx::query_as(
r#"
SELECT COUNT(1)
FROM transactions t
JOIN relayers r ON (t.relayer_id = r.id)
LEFT JOIN sent_transactions s ON (t.id = s.tx_id)
WHERE s.status = $1
AND r.chain_id = $2
"#,
)
.bind(TxStatus::Mined)
.bind(chain_id as i64)
.fetch_one(&self.pool)
.await?;

let (finalized_txs,): (i64,) = sqlx::query_as(
r#"
SELECT COUNT(1)
FROM transactions t
JOIN relayers r ON (t.relayer_id = r.id)
LEFT JOIN sent_transactions s ON (t.id = s.tx_id)
WHERE s.status = $1
AND r.chain_id = $2
"#,
)
.bind(TxStatus::Finalized)
.bind(chain_id as i64)
.fetch_one(&self.pool)
.await?;

let (total_indexed_blocks,): (i64,) = sqlx::query_as(
r#"
SELECT COUNT(1)
FROM blocks
WHERE chain_id = $1
"#,
)
.bind(chain_id as i64)
.fetch_one(&self.pool)
.await?;

let (block_txs,): (i64,) = sqlx::query_as(
r#"
SELECT COUNT(1)
FROM block_txs
WHERE chain_id = $1
"#,
)
.bind(chain_id as i64)
.fetch_one(&self.pool)
.await?;

Ok(NetworkStats {
pending_txs: pending_txs as u64,
mined_txs: mined_txs as u64,
finalized_txs: finalized_txs as u64,
total_indexed_blocks: total_indexed_blocks as u64,
block_txs: block_txs as u64,
})
}
}

#[cfg(test)]
Expand Down
9 changes: 9 additions & 0 deletions src/db/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,15 @@ pub struct ReadTxData {
pub status: Option<TxStatus>,
}

#[derive(Debug, Clone)]
pub struct NetworkStats {
pub pending_txs: u64,
pub mined_txs: u64,
pub finalized_txs: u64,
pub total_indexed_blocks: u64,
pub block_txs: u64,
}

#[derive(Debug, Clone)]
pub struct BlockFees {
pub fee_estimates: FeesEstimate,
Expand Down
14 changes: 14 additions & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ use std::path::PathBuf;

use clap::Parser;
use config::FileFormat;
use telemetry_batteries::metrics::statsd::StatsdBattery;
use telemetry_batteries::metrics::MetricsBattery;
use telemetry_batteries::tracing::batteries::datadog::DatadogBattery;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;
Expand Down Expand Up @@ -69,6 +71,18 @@ async fn main() -> eyre::Result<()> {
.init();
}

if config.service.statsd_enabled {
let statsd_battery = StatsdBattery::new(
"localhost",
8125,
5000,
1024,
Some("tx_sitter_monolith"),
)?;

statsd_battery.init()?;
}

let service = Service::new(config).await?;
service.wait().await?;

Expand Down
4 changes: 4 additions & 0 deletions src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ impl Service {
task_runner.add_task("Handle soft reorgs", tasks::handle_soft_reorgs);
task_runner.add_task("Handle hard reorgs", tasks::handle_hard_reorgs);

if app.config.service.statsd_enabled {
task_runner.add_task("Emit metrics", tasks::emit_metrics);
}

for chain_id in chain_ids {
Self::spawn_chain_tasks(&task_runner, chain_id)?;
}
Expand Down
2 changes: 2 additions & 0 deletions src/tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@ pub mod escalate;
pub mod finalize;
pub mod handle_reorgs;
pub mod index;
pub mod metrics;
pub mod prune;

pub use self::broadcast::broadcast_txs;
pub use self::escalate::escalate_txs;
pub use self::finalize::finalize_txs;
pub use self::handle_reorgs::{handle_hard_reorgs, handle_soft_reorgs};
pub use self::index::index_chain;
pub use self::metrics::emit_metrics;
pub use self::prune::{prune_blocks, prune_txs};
36 changes: 36 additions & 0 deletions src/tasks/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
use std::sync::Arc;
use std::time::Duration;

use crate::app::App;

const EMIT_METRICS_INTERVAL: Duration = Duration::from_secs(1);

pub async fn emit_metrics(app: Arc<App>) -> eyre::Result<()> {
loop {
let chain_ids = app.db.get_network_chain_ids().await?;

for chain_id in chain_ids {
let stats = app.db.get_stats(chain_id).await?;

// TODO: Add labels for env, etc.
let labels = [("chain_id", chain_id.to_string())];

metrics::gauge!("pending_txs", stats.pending_txs as f64, &labels);
metrics::gauge!("mined_txs", stats.mined_txs as f64, &labels);
metrics::gauge!(
"finalized_txs",
stats.finalized_txs as f64,
&labels
);
metrics::gauge!(
"total_indexed_blocks",
stats.total_indexed_blocks as f64,
&labels
);
metrics::gauge!("block_fees", stats.block_txs as f64, &labels);
metrics::gauge!("block_txs", stats.block_txs as f64, &labels);
}

tokio::time::sleep(EMIT_METRICS_INTERVAL).await;
}
}
1 change: 1 addition & 0 deletions tests/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ pub async fn setup_service(
service: TxSitterConfig {
escalation_interval,
datadog_enabled: false,
statsd_enabled: false,
},
server: ServerConfig {
host: SocketAddr::V4(SocketAddrV4::new(
Expand Down

0 comments on commit 5a576a4

Please sign in to comment.