Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bring back metrics #24

Merged
merged 8 commits into from
Feb 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 7 additions & 29 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

22 changes: 10 additions & 12 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ default-run = "tx-sitter"
members = ["crates/*"]

[dependencies]
async-trait = "0.1.74"

## AWS
aws-config = { version = "1.0.1" }
aws-credential-types = { version = "1.0.1", features = [
Expand All @@ -19,6 +17,15 @@ aws-sdk-kms = "1.3.0"
aws-smithy-runtime-api = "1.0.2"
aws-smithy-types = "1.0.2"
aws-types = "1.0.1"

# Internal
postgres-docker-utils = { path = "crates/postgres-docker-utils" }

# Company
telemetry-batteries = { git = "https://github.com/worldcoin/telemetry-batteries", rev = "ec8ba6d4da45fdb98f900d8d4c8e1a09186894b4" }

## External
async-trait = "0.1.74"
axum = { version = "0.6.20", features = ["headers"] }
base64 = "0.21.5"
bigdecimal = "0.4.2"
Expand All @@ -36,18 +43,12 @@ humantime = "2.1.0"
humantime-serde = "1.1.1"
hyper = "0.14.27"
itertools = "0.12.0"
metrics = "0.21.1"
metrics = "0.22.1"
num-bigint = "0.4.4"
# telemetry-batteries = { path = "../telemetry-batteries" }

# Internal
postgres-docker-utils = { path = "crates/postgres-docker-utils" }
rand = "0.8.5"
reqwest = { version = "0.11.13", default-features = false, features = [
"rustls-tls",
] }

## Other
serde = "1.0.136"
serde_json = "1.0.91"
sha3 = "0.10.8"
Expand All @@ -62,9 +63,6 @@ sqlx = { version = "0.7.2", features = [
"bigdecimal",
] }
strum = { version = "0.25.0", features = ["derive"] }

# Company
telemetry-batteries = { git = "https://github.com/worldcoin/telemetry-batteries", rev = "ec8ba6d4da45fdb98f900d8d4c8e1a09186894b4" }
thiserror = "1.0.50"
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
toml = "0.8.8"
Expand Down
57 changes: 57 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,40 @@
use std::net::SocketAddr;
use std::path::Path;
use std::time::Duration;

use config::FileFormat;
use serde::{Deserialize, Serialize};

use crate::api_key::ApiKey;

pub fn load_config<'a>(
config_files: impl Iterator<Item = &'a Path>,
) -> eyre::Result<Config> {
let mut settings = config::Config::builder();

for config_file in config_files {
settings = settings.add_source(
config::File::from(config_file).format(FileFormat::Toml),
);
}

let settings = settings
.add_source(
config::Environment::with_prefix("TX_SITTER").separator("__"),
)
.add_source(
config::Environment::with_prefix("TX_SITTER_EXT")
.separator("__")
.try_parsing(true)
.list_separator(","),
)
.build()?;

let config = settings.try_deserialize::<Config>()?;

Ok(config)
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct Config {
Expand Down Expand Up @@ -262,4 +292,31 @@ mod tests {

assert_eq!(toml, WITH_DB_PARTS);
}

#[test]
fn env_config_test() {
std::env::set_var("TX_SITTER__DATABASE__KIND", "parts");
std::env::set_var("TX_SITTER__DATABASE__HOST", "dbHost");
std::env::set_var("TX_SITTER__DATABASE__PORT", "dbPort");
std::env::set_var("TX_SITTER__DATABASE__DATABASE", "dbName");
std::env::set_var("TX_SITTER__DATABASE__USERNAME", "dbUsername");
std::env::set_var("TX_SITTER__DATABASE__PASSWORD", "dbPassword");
std::env::set_var("TX_SITTER__SERVICE__ESCALATION_INTERVAL", "1m");
std::env::set_var("TX_SITTER__SERVICE__DATADOG_ENABLED", "true");
std::env::set_var("TX_SITTER__SERVICE__STATSD_ENABLED", "true");
std::env::set_var("TX_SITTER__SERVER__HOST", "0.0.0.0:8080");
std::env::set_var("TX_SITTER__SERVER__USERNAME", "authUsername");
std::env::set_var("TX_SITTER__SERVER__PASSWORD", "authPassword");
std::env::set_var("TX_SITTER__KEYS__KIND", "kms");

let config = load_config(std::iter::empty()).unwrap();

assert!(config.service.statsd_enabled);
assert!(config.service.datadog_enabled);
assert_eq!(config.service.escalation_interval, Duration::from_secs(60));
assert_eq!(
config.database.to_connection_string(),
"postgres://dbUsername:dbPassword@dbHost:dbPort/dbName"
);
}
}
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ pub mod keys;
pub mod serde_utils;
pub mod server;
pub mod service;
pub mod shutdown;
pub mod task_runner;
pub mod tasks;
pub mod types;
29 changes: 6 additions & 23 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
use std::path::PathBuf;

use clap::Parser;
use config::FileFormat;
use telemetry_batteries::metrics::statsd::StatsdBattery;
use telemetry_batteries::tracing::datadog::DatadogBattery;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;
use tracing_subscriber::EnvFilter;
use tx_sitter::config::Config;
use tx_sitter::config::load_config;
use tx_sitter::service::Service;
use tx_sitter::shutdown::spawn_await_shutdown_task;

#[derive(Parser)]
#[command(author, version, about)]
Expand All @@ -35,27 +35,7 @@ async fn main() -> eyre::Result<()> {
dotenv::from_path(path)?;
}

let mut settings = config::Config::builder();

for arg in &args.config {
settings = settings.add_source(
config::File::from(arg.as_ref()).format(FileFormat::Toml),
);
}

let settings = settings
.add_source(
config::Environment::with_prefix("TX_SITTER").separator("__"),
)
.add_source(
config::Environment::with_prefix("TX_SITTER_EXT")
.separator("__")
.try_parsing(true)
.list_separator(","),
)
.build()?;

let config = settings.try_deserialize::<Config>()?;
let config = load_config(args.config.iter().map(PathBuf::as_ref))?;

if config.service.datadog_enabled {
DatadogBattery::init(None, "tx-sitter-monolith", None, true);
Expand All @@ -76,6 +56,9 @@ async fn main() -> eyre::Result<()> {
)?;
}

spawn_await_shutdown_task();

tracing::info!(?config, "Starting service");
let service = Service::new(config).await?;
service.wait().await?;

Expand Down
28 changes: 28 additions & 0 deletions src/shutdown.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
use core::panic;

use tokio::signal::unix::{signal, SignalKind};

pub fn spawn_await_shutdown_task() {
tokio::spawn(async {
let result = await_shutdown_signal().await;
if let Err(err) = result {
tracing::error!("Error while waiting for shutdown signal: {}", err);
panic!("Error while waiting for shutdown signal: {}", err);
}

tracing::info!("Shutdown complete");
std::process::exit(0);
});
}

pub async fn await_shutdown_signal() -> eyre::Result<()> {
let mut sigint = signal(SignalKind::interrupt())?;
let mut sigterm = signal(SignalKind::terminate())?;

tokio::select! {
_ = sigint.recv() => { tracing::info!("SIGINT received, shutting down"); }
_ = sigterm.recv() => { tracing::info!("SIGTERM received, shutting down"); }
};

Ok(())
}
2 changes: 1 addition & 1 deletion src/tasks/escalate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ async fn escalate_txs(app: &App) -> eyre::Result<()> {
let mut futures = FuturesUnordered::new();

for (relayer_id, txs) in txs_for_escalation {
futures.push(escalate_relayer_txs(&app, relayer_id, txs));
futures.push(escalate_relayer_txs(app, relayer_id, txs));
}

while let Some(result) = futures.next().await {
Expand Down
17 changes: 6 additions & 11 deletions src/tasks/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ pub async fn index_block(
"Tx mined"
);

metrics::increment_counter!("tx_mined", &metric_labels);
metrics::counter!("tx_mined", &metric_labels).increment(1);
}

let relayers = app.db.get_relayers_by_chain_id(chain_id).await?;
Expand Down Expand Up @@ -171,29 +171,24 @@ pub async fn estimate_gas(app: Arc<App>, chain_id: u64) -> eyre::Result<()> {
.await?;

let labels = [("chain_id", chain_id.to_string())];
metrics::gauge!(
"gas_price",
gas_price.as_u64() as f64 * GAS_PRICE_FOR_METRICS_FACTOR,
&labels
);
metrics::gauge!(
"base_fee_per_gas",
metrics::gauge!("gas_price", &labels)
.set(gas_price.as_u64() as f64 * GAS_PRICE_FOR_METRICS_FACTOR);
metrics::gauge!("base_fee_per_gas", &labels).set(
fee_estimates.base_fee_per_gas.as_u64() as f64
* GAS_PRICE_FOR_METRICS_FACTOR,
&labels
);

for (i, percentile) in FEE_PERCENTILES.iter().enumerate() {
let percentile_fee = fee_estimates.percentile_fees[i];

metrics::gauge!(
"percentile_fee",
percentile_fee.as_u64() as f64 * GAS_PRICE_FOR_METRICS_FACTOR,
&[
("chain_id", chain_id.to_string()),
("percentile", percentile.to_string()),
]
);
)
.set(percentile_fee.as_u64() as f64 * GAS_PRICE_FOR_METRICS_FACTOR);
}

tokio::time::sleep(Duration::from_secs(
Expand Down
21 changes: 8 additions & 13 deletions src/tasks/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,14 @@ pub async fn emit_metrics(app: Arc<App>) -> eyre::Result<()> {
// TODO: Add labels for env, etc.
let labels = [("chain_id", chain_id.to_string())];

metrics::gauge!("pending_txs", stats.pending_txs as f64, &labels);
metrics::gauge!("mined_txs", stats.mined_txs as f64, &labels);
metrics::gauge!(
"finalized_txs",
stats.finalized_txs as f64,
&labels
);
metrics::gauge!(
"total_indexed_blocks",
stats.total_indexed_blocks as f64,
&labels
);
metrics::gauge!("block_txs", stats.block_txs as f64, &labels);
metrics::gauge!("pending_txs", &labels)
.set(stats.pending_txs as f64);
metrics::gauge!("mined_txs", &labels).set(stats.mined_txs as f64);
metrics::gauge!("finalized_txs", &labels)
.set(stats.finalized_txs as f64);
metrics::gauge!("total_indexed_blocks", &labels)
.set(stats.total_indexed_blocks as f64);
metrics::gauge!("block_txs", &labels).set(stats.block_txs as f64);
}

tokio::time::sleep(EMIT_METRICS_INTERVAL).await;
Expand Down