diff --git a/apps/fortuna/Cargo.lock b/apps/fortuna/Cargo.lock
index c0b371c45b..50bb566de2 100644
--- a/apps/fortuna/Cargo.lock
+++ b/apps/fortuna/Cargo.lock
@@ -1488,7 +1488,7 @@ dependencies = [
 
 [[package]]
 name = "fortuna"
-version = "5.2.4"
+version = "5.3.0"
 dependencies = [
  "anyhow",
  "axum",
diff --git a/apps/fortuna/Cargo.toml b/apps/fortuna/Cargo.toml
index b749d423ce..c790bc8ba1 100644
--- a/apps/fortuna/Cargo.toml
+++ b/apps/fortuna/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "fortuna"
-version = "5.2.4"
+version = "5.3.0"
 edition = "2021"
 
 [dependencies]
diff --git a/apps/fortuna/src/api.rs b/apps/fortuna/src/api.rs
index 4535c5ca9c..3fb1979682 100644
--- a/apps/fortuna/src/api.rs
+++ b/apps/fortuna/src/api.rs
@@ -52,20 +52,45 @@ mod revelation;
 
 pub type ChainId = String;
 
+#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)]
+pub struct RequestLabel {
+    pub value: String,
+}
+
+pub struct ApiMetrics {
+    pub http_requests: Family<RequestLabel, Counter>,
+}
+
 #[derive(Clone)]
 pub struct ApiState {
     pub chains: Arc<HashMap<ChainId, BlockchainState>>,
 
+    pub metrics_registry: Arc<RwLock<Registry>>,
+
     /// Prometheus metrics
-    pub metrics: Arc<Metrics>,
+    pub metrics: Arc<ApiMetrics>,
 }
 
 impl ApiState {
-    pub fn new(chains: &[(ChainId, BlockchainState)]) -> ApiState {
-        let map: HashMap<ChainId, BlockchainState> = chains.into_iter().cloned().collect();
+    pub async fn new(
+        chains: HashMap<ChainId, BlockchainState>,
+        metrics_registry: Arc<RwLock<Registry>>,
+    ) -> ApiState {
+        let metrics = ApiMetrics {
+            http_requests: Family::default(),
+        };
+
+        let http_requests = metrics.http_requests.clone();
+        metrics_registry.write().await.register(
+            "http_requests",
+            "Number of HTTP requests received",
+            http_requests,
+        );
+
         ApiState {
-            chains: Arc::new(map),
-            metrics: Arc::new(Metrics::new()),
+            chains: Arc::new(chains),
+            metrics: Arc::new(metrics),
+            metrics_registry,
         }
     }
 }
@@ -89,38 +114,6 @@ pub struct BlockchainState {
     pub confirmed_block_status: BlockStatus,
 }
 
-pub struct Metrics {
-    pub registry: RwLock<Registry>,
-    // TODO: track useful metrics. this counter is just a placeholder to get things set up.
-    pub request_counter: Family<Label, Counter>,
-}
-
-#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)]
-pub struct Label {
-    value: String,
-}
-
-impl Metrics {
-    pub fn new() -> Self {
-        let mut metrics_registry = Registry::default();
-        let http_requests = Family::<Label, Counter>::default();
-
-        // Register the metric family with the registry.
-        metrics_registry.register(
-            // With the metric name.
-            "http_requests",
-            // And the metric help text.
-            "Number of HTTP requests received",
-            http_requests.clone(),
-        );
-
-        Metrics {
-            registry: RwLock::new(metrics_registry),
-            request_counter: http_requests,
-        }
-    }
-}
-
 pub enum RestError {
     /// The caller passed a sequence number that isn't within the supported range
     InvalidSequenceNumber,
@@ -225,7 +218,12 @@ mod test {
         },
         ethers::prelude::Address,
         lazy_static::lazy_static,
-        std::sync::Arc,
+        prometheus_client::registry::Registry,
+        std::{
+            collections::HashMap,
+            sync::Arc,
+        },
+        tokio::sync::RwLock,
     };
 
     const PROVIDER: Address = Address::zero();
@@ -243,7 +241,7 @@ mod test {
         ));
     }
 
-    fn test_server() -> (TestServer, Arc<MockEntropyReader>, Arc<MockEntropyReader>) {
+    async fn test_server() -> (TestServer, Arc<MockEntropyReader>, Arc<MockEntropyReader>) {
        let eth_read = Arc::new(MockEntropyReader::with_requests(10, &[]));
 
         let eth_state = BlockchainState {
@@ -255,6 +253,8 @@ mod test {
             confirmed_block_status: BlockStatus::Latest,
         };
 
+        let metrics_registry = Arc::new(RwLock::new(Registry::default()));
+
         let avax_read = Arc::new(MockEntropyReader::with_requests(10, &[]));
 
         let avax_state = BlockchainState {
@@ -266,10 +266,11 @@ mod test {
             confirmed_block_status: BlockStatus::Latest,
         };
 
-        let api_state = ApiState::new(&[
-            ("ethereum".into(), eth_state),
-            ("avalanche".into(), avax_state),
-        ]);
+        let mut chains = HashMap::new();
+        chains.insert("ethereum".into(), eth_state);
+        chains.insert("avalanche".into(), avax_state);
+
+        let api_state = ApiState::new(chains, metrics_registry).await;
 
         let app = api::routes(api_state);
         (TestServer::new(app).unwrap(), eth_read, avax_read)
@@ -287,7 +288,7 @@ mod test {
 
     #[tokio::test]
     async fn test_revelation() {
-        let (server, eth_contract, avax_contract) = test_server();
+        let (server, eth_contract, avax_contract) = test_server().await;
 
         // Can't access a revelation if it hasn't been requested
         get_and_assert_status(
@@ -416,7 +417,7 @@ mod test {
 
     #[tokio::test]
     async fn test_revelation_confirmation_delay() {
-        let (server, eth_contract, avax_contract) = test_server();
+        let (server, eth_contract, avax_contract) = test_server().await;
 
         eth_contract.insert(PROVIDER, 0, 10, false);
         eth_contract.insert(PROVIDER, 1, 11, false);
diff --git a/apps/fortuna/src/api/metrics.rs b/apps/fortuna/src/api/metrics.rs
index b211adec0c..8e162f0523 100644
--- a/apps/fortuna/src/api/metrics.rs
+++ b/apps/fortuna/src/api/metrics.rs
@@ -9,7 +9,7 @@ use {
 };
 
 pub async fn metrics(State(state): State<ApiState>) -> impl IntoResponse {
-    let registry = state.metrics.registry.read().await;
+    let registry = state.metrics_registry.read().await;
     let mut buffer = String::new();
 
     // Should not fail if the metrics are valid and there is memory available
diff --git a/apps/fortuna/src/api/revelation.rs b/apps/fortuna/src/api/revelation.rs
index 93d3588deb..f85462cdce 100644
--- a/apps/fortuna/src/api/revelation.rs
+++ b/apps/fortuna/src/api/revelation.rs
@@ -1,7 +1,7 @@
 use {
     crate::api::{
         ChainId,
-        Label,
+        RequestLabel,
         RestError,
     },
     anyhow::Result,
@@ -45,8 +45,8 @@ pub async fn revelation(
 ) -> Result<Json<GetRandomValueResponse>, RestError> {
     state
         .metrics
-        .request_counter
-        .get_or_create(&Label {
+        .http_requests
+        .get_or_create(&RequestLabel {
             value: "/v1/chains/{chain_id}/revelations/{sequence}".to_string(),
         })
         .inc();
diff --git a/apps/fortuna/src/command/run.rs b/apps/fortuna/src/command/run.rs
index d3eaea961c..2f57228d89 100644
--- a/apps/fortuna/src/command/run.rs
+++ b/apps/fortuna/src/command/run.rs
@@ -25,23 +25,52 @@ use {
         Result,
     },
     axum::Router,
+    ethers::{
+        middleware::Middleware,
+        providers::{
+            Http,
+            Provider,
+        },
+        types::BlockNumber,
+    },
+    prometheus_client::{
+        encoding::EncodeLabelSet,
+        metrics::{
+            family::Family,
+            gauge::Gauge,
+        },
+        registry::Registry,
+    },
     std::{
         collections::HashMap,
         net::SocketAddr,
         sync::Arc,
+        time::{
+            Duration,
+            SystemTime,
+            UNIX_EPOCH,
+        },
     },
     tokio::{
         spawn,
-        sync::watch,
+        sync::{
+            watch,
+            RwLock,
+        },
+        time,
     },
     tower_http::cors::CorsLayer,
     utoipa::OpenApi,
     utoipa_swagger_ui::SwaggerUi,
 };
 
+/// Track metrics in this interval
+const TRACK_INTERVAL: Duration = Duration::from_secs(10);
+
 pub async fn run_api(
     socket_addr: SocketAddr,
     chains: HashMap<String, api::BlockchainState>,
+    metrics_registry: Arc<RwLock<Registry>>,
     mut rx_exit: watch::Receiver<bool>,
 ) -> Result<()> {
     #[derive(OpenApi)]
@@ -63,11 +92,7 @@ pub async fn run_api(
     )]
     struct ApiDoc;
 
-    let metrics_registry = api::Metrics::new();
-    let api_state = api::ApiState {
-        chains: Arc::new(chains),
-        metrics: Arc::new(metrics_registry),
-    };
+    let api_state = api::ApiState::new(chains, metrics_registry).await;
 
     // Initialize Axum Router. Note the type here is a `Router` due to the use of the
     // `with_state` method which replaces `Body` with `State` in the type signature.
@@ -101,6 +126,7 @@ pub async fn run_keeper(
     chains: HashMap<String, api::BlockchainState>,
     config: Config,
     private_key: String,
+    metrics_registry: Arc<RwLock<Registry>>,
 ) -> Result<()> {
     let mut handles = Vec::new();
     for (chain_id, chain_config) in chains {
@@ -114,6 +140,7 @@ pub async fn run_keeper(
             private_key,
             chain_eth_config,
             chain_config.clone(),
+            metrics_registry.clone(),
         )));
     }
 
@@ -218,11 +245,86 @@ pub async fn run(opts: &RunOptions) -> Result<()> {
         Ok::<(), Error>(())
     });
 
+    let metrics_registry = Arc::new(RwLock::new(Registry::default()));
+
     if let Some(keeper_private_key) = opts.load_keeper_private_key()? {
-        spawn(run_keeper(chains.clone(), config, keeper_private_key));
+        spawn(run_keeper(
+            chains.clone(),
+            config.clone(),
+            keeper_private_key,
+            metrics_registry.clone(),
+        ));
     }
 
-    run_api(opts.addr.clone(), chains, rx_exit).await?;
+    // Spawn a thread to track latest block lag. This helps us know if the rpc is up and updated with the latest block.
+    spawn(track_block_timestamp_lag(config, metrics_registry.clone()));
+
+    run_api(opts.addr.clone(), chains, metrics_registry, rx_exit).await?;
 
     Ok(())
 }
+
+
+#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)]
+pub struct ChainLabel {
+    pub chain_id: String,
+}
+
+/// Tracks the difference between the server timestamp and the latest block timestamp for each chain
+pub async fn track_block_timestamp_lag(config: Config, metrics_registry: Arc<RwLock<Registry>>) {
+    let metrics = Family::<ChainLabel, Gauge>::default();
+    metrics_registry.write().await.register(
+        "block_timestamp_lag",
+        "The difference between server timestamp and latest block timestamp",
+        metrics.clone(),
+    );
+    loop {
+        for (chain_id, chain_config) in &config.chains {
+            let chain_id = chain_id.clone();
+            let chain_config = chain_config.clone();
+            let metrics = metrics.clone();
+
+            spawn(async move {
+                let chain_id = chain_id.clone();
+                let chain_config = chain_config.clone();
+
+                let provider = match Provider::<Http>::try_from(&chain_config.geth_rpc_addr) {
+                    Ok(r) => r,
+                    Err(e) => {
+                        tracing::error!(
+                            "Failed to create provider for chain id {} - {:?}",
+                            &chain_id,
+                            e
+                        );
+                        return;
+                    }
+                };
+
+                match provider.get_block(BlockNumber::Latest).await {
+                    Ok(b) => {
+                        if let Some(block) = b {
+                            let block_timestamp = block.timestamp;
+                            let server_timestamp = SystemTime::now()
+                                .duration_since(UNIX_EPOCH)
+                                .unwrap()
+                                .as_secs();
+                            let lag: i64 =
+                                (server_timestamp as i64) - (block_timestamp.as_u64() as i64);
+
+                            metrics
+                                .get_or_create(&ChainLabel {
+                                    chain_id: chain_id.clone(),
+                                })
+                                .set(lag);
+                        }
+                    }
+                    Err(e) => {
+                        tracing::error!("Failed to get block for chain id {} - {:?}", &chain_id, e);
+                    }
+                };
+            });
+        }
+
+        time::sleep(TRACK_INTERVAL).await;
+    }
+}
diff --git a/apps/fortuna/src/keeper.rs b/apps/fortuna/src/keeper.rs
index b4e6c2a550..2ebcafcc08 100644
--- a/apps/fortuna/src/keeper.rs
+++ b/apps/fortuna/src/keeper.rs
@@ -5,7 +5,10 @@ use {
             BlockchainState,
         },
         chain::{
-            ethereum::SignablePythContract,
+            ethereum::{
+                PythContract,
+                SignablePythContract,
+            },
             reader::{
                 BlockNumber,
                 RequestedWithCallbackEvent,
@@ -20,17 +23,37 @@ use {
     ethers::{
         contract::ContractError,
         providers::{
+            Http,
             Middleware,
             Provider,
             Ws,
         },
-        types::U256,
+        signers::Signer,
+        types::{
+            Address,
+            U256,
+        },
     },
     futures::StreamExt,
-    std::sync::Arc,
+    prometheus_client::{
+        encoding::EncodeLabelSet,
+        metrics::{
+            counter::Counter,
+            family::Family,
+            gauge::Gauge,
+        },
+        registry::Registry,
+    },
+    std::sync::{
+        atomic::AtomicU64,
+        Arc,
+    },
     tokio::{
         spawn,
-        sync::mpsc,
+        sync::{
+            mpsc,
+            RwLock,
+        },
         time::{
             self,
             Duration,
@@ -42,12 +65,6 @@ use {
     },
 };
 
-#[derive(Debug)]
-pub struct BlockRange {
-    pub from: BlockNumber,
-    pub to: BlockNumber,
-}
-
 /// How much to wait before retrying in case of an RPC error
 const RETRY_INTERVAL: Duration = Duration::from_secs(5);
 /// How many blocks to look back for events that might be missed when starting the keeper
@@ -56,7 +73,89 @@ const BACKLOG_RANGE: u64 = 1000;
 /// Track blocks in this batch size
 const BLOCK_BATCH_SIZE: u64 = 100;
 /// How much to wait before polling the next latest block
 const POLL_INTERVAL: Duration = Duration::from_secs(2);
+/// Track metrics in this interval
+const TRACK_INTERVAL: Duration = Duration::from_secs(10);
+
+#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)]
+pub struct AccountLabel {
+    pub chain_id: String,
+    pub address: String,
+}
+
+#[derive(Default)]
+pub struct KeeperMetrics {
+    pub current_sequence_number: Family<AccountLabel, Gauge>,
+    pub end_sequence_number: Family<AccountLabel, Gauge>,
+    pub balance: Family<AccountLabel, Gauge<f64, AtomicU64>>,
+    pub collected_fee: Family<AccountLabel, Gauge<f64, AtomicU64>>,
+    pub total_gas_spent: Family<AccountLabel, Counter<f64, AtomicU64>>,
+    pub requests: Family<AccountLabel, Counter>,
+    pub requests_processed: Family<AccountLabel, Counter>,
+    pub reveals: Family<AccountLabel, Counter>,
+}
+
+impl KeeperMetrics {
+    pub async fn new(registry: Arc<RwLock<Registry>>) -> Self {
+        let mut writable_registry = registry.write().await;
+        let keeper_metrics = KeeperMetrics::default();
+
+        writable_registry.register(
+            "current_sequence_number",
+            "The sequence number for a new request",
+            keeper_metrics.current_sequence_number.clone(),
+        );
+
+        writable_registry.register(
+            "end_sequence_number",
+            "The sequence number for the end request",
+            keeper_metrics.end_sequence_number.clone(),
+        );
+
+        writable_registry.register(
+            "requests",
+            "Number of requests received through events",
+            keeper_metrics.requests.clone(),
+        );
+
+        writable_registry.register(
+            "requests_processed",
+            "Number of requests processed",
+            keeper_metrics.requests_processed.clone(),
+        );
+
+        writable_registry.register(
+            "reveal",
+            "Number of reveals",
+            keeper_metrics.reveals.clone(),
+        );
+
+        writable_registry.register(
+            "balance",
+            "Balance of the keeper",
+            keeper_metrics.balance.clone(),
+        );
+        writable_registry.register(
+            "collected_fee",
+            "Collected fee on the contract",
+            keeper_metrics.collected_fee.clone(),
+        );
+
+        writable_registry.register(
+            "total_gas_spent",
+            "Total gas spent revealing requests",
+            keeper_metrics.total_gas_spent.clone(),
+        );
+
+        keeper_metrics
+    }
+}
+
+#[derive(Debug)]
+pub struct BlockRange {
+    pub from: BlockNumber,
+    pub to: BlockNumber,
+}
 
 /// Get the latest safe block number for the chain. Retry internally if there is an error.
 async fn get_latest_safe_block(chain_state: &BlockchainState) -> BlockNumber {
@@ -88,7 +187,11 @@ pub async fn run_keeper_threads(
     private_key: String,
     chain_eth_config: EthereumConfig,
     chain_state: BlockchainState,
+    metrics: Arc<RwLock<Registry>>,
 ) {
+    // Register metrics
+    let keeper_metrics = Arc::new(KeeperMetrics::new(metrics.clone()).await);
+
     tracing::info!("starting keeper");
     let latest_safe_block = get_latest_safe_block(&chain_state).in_current_span().await;
     tracing::info!("latest safe block: {}", &latest_safe_block);
@@ -98,6 +201,7 @@ pub async fn run_keeper_threads(
         .await
         .expect("Chain config should be valid"),
     );
+    let keeper_address = contract.client().inner().inner().signer().address();
 
     // Spawn a thread to handle the events from last BACKLOG_RANGE blocks.
     spawn(
@@ -109,6 +213,7 @@ pub async fn run_keeper_threads(
             contract.clone(),
             chain_eth_config.gas_limit,
             chain_state.clone(),
+            keeper_metrics.clone(),
         )
         .in_current_span(),
     );
@@ -131,9 +236,47 @@ pub async fn run_keeper_threads(
             rx,
             Arc::clone(&contract),
             chain_eth_config.gas_limit,
+            keeper_metrics.clone(),
         )
         .in_current_span(),
     );
+
+    // Spawn a thread to track the provider info and the balance of the keeper
+    spawn(
+        async move {
+            let chain_id = chain_state.id.clone();
+            let chain_config = chain_eth_config.clone();
+            let provider_address = chain_state.provider_address.clone();
+            let keeper_metrics = keeper_metrics.clone();
+
+            loop {
+                // There is no retry loop here: a fresh tracking task is spawned every `TRACK_INTERVAL` seconds.
+                // If the rpc fails, the spawned tasks simply exit instead of retrying.
+                // We are tracking rpc failures elsewhere, so that is fine.
+                spawn(
+                    track_provider(
+                        chain_id.clone(),
+                        chain_config.clone(),
+                        provider_address.clone(),
+                        keeper_metrics.clone(),
+                    )
+                    .in_current_span(),
+                );
+                spawn(
+                    track_balance(
+                        chain_id.clone(),
+                        chain_config.clone(),
+                        keeper_address.clone(),
+                        keeper_metrics.clone(),
+                    )
+                    .in_current_span(),
+                );
+
+                time::sleep(TRACK_INTERVAL).await;
+            }
+        }
+        .in_current_span(),
+    );
 }
 
@@ -146,6 +289,7 @@ pub async fn process_event(
     chain_config: &BlockchainState,
     contract: &Arc<SignablePythContract>,
     gas_limit: U256,
+    metrics: Arc<KeeperMetrics>,
 ) -> Result<()> {
     if chain_config.provider_address != event.provider_address {
         return Ok(());
     }
@@ -221,14 +365,53 @@ pub async fn process_event(
     };
 
     match pending_tx.await {
-        Ok(res) => {
-            tracing::info!(
-                sequence_number = &event.sequence_number,
-                "Revealed with res: {:?}",
-                res
-            );
-            Ok(())
-        }
+        Ok(res) => match res {
+            Some(res) => {
+                tracing::info!(
+                    sequence_number = &event.sequence_number,
+                    transaction_hash = &res.transaction_hash.to_string(),
+                    gas_used = ?res.gas_used,
+                    "Revealed with res: {:?}",
+                    res
+                );
+
+                if let Some(gas_used) = res.gas_used {
+                    let gas_used = gas_used.as_u128() as f64 / 1e18;
+                    metrics
+                        .total_gas_spent
+                        .get_or_create(&AccountLabel {
+                            chain_id: chain_config.id.clone(),
+                            address: contract
+                                .client()
+                                .inner()
+                                .inner()
+                                .signer()
+                                .address()
+                                .to_string(),
+                        })
+                        .inc_by(gas_used);
+                }
+
+                metrics
+                    .reveals
+                    .get_or_create(&AccountLabel {
+                        chain_id: chain_config.id.clone(),
+                        address: chain_config.provider_address.to_string(),
+                    })
+                    .inc();
+                Ok(())
+            }
+            None => {
+                tracing::error!(
+                    sequence_number = &event.sequence_number,
+                    "Can't verify the reveal"
+                );
+                // It is better to return an error in this scenario
+                // so that the caller retries the reveal
+                Err(anyhow!("Can't verify the reveal"))
+            }
+        },
+
         Err(e) => {
             tracing::error!(
                 sequence_number = &event.sequence_number,
@@ -266,6 +449,7 @@ pub async fn process_block_range(
     contract: Arc<SignablePythContract>,
     gas_limit: U256,
     chain_state: api::BlockchainState,
+    metrics: Arc<KeeperMetrics>,
 ) {
     let BlockRange {
         from: first_block,
@@ -286,6 +470,7 @@ pub async fn process_block_range(
             contract.clone(),
             gas_limit,
             chain_state.clone(),
+            metrics.clone(),
         )
         .in_current_span()
         .await;
@@ -302,6 +487,7 @@ pub async fn process_single_block_batch(
     contract: Arc<SignablePythContract>,
     gas_limit: U256,
     chain_state: api::BlockchainState,
+    metrics: Arc<KeeperMetrics>,
 ) {
     loop {
         let events_res = chain_state
@@ -313,11 +499,23 @@ pub async fn process_single_block_batch(
             Ok(events) => {
                 tracing::info!(num_of_events = &events.len(), "Processing",);
                 for event in &events {
+                    metrics
+                        .requests
+                        .get_or_create(&AccountLabel {
+                            chain_id: chain_state.id.clone(),
+                            address: chain_state.provider_address.to_string(),
+                        })
+                        .inc();
                     tracing::info!(sequence_number = &event.sequence_number, "Processing event",);
-                    while let Err(e) =
-                        process_event(event.clone(), &chain_state, &contract, gas_limit)
-                            .in_current_span()
-                            .await
+                    while let Err(e) = process_event(
+                        event.clone(),
+                        &chain_state,
+                        &contract,
+                        gas_limit,
+                        metrics.clone(),
+                    )
+                    .in_current_span()
+                    .await
                     {
                         tracing::error!(
                             sequence_number = &event.sequence_number,
@@ -328,6 +526,13 @@ pub async fn process_single_block_batch(
                         time::sleep(RETRY_INTERVAL).await;
                     }
                     tracing::info!(sequence_number = &event.sequence_number, "Processed event",);
+                    metrics
+                        .requests_processed
+                        .get_or_create(&AccountLabel {
+                            chain_id: chain_state.id.clone(),
+                            address: chain_state.provider_address.to_string(),
+                        })
+                        .inc();
                 }
                 tracing::info!(num_of_events = &events.len(), "Processed",);
                 break;
@@ -455,6 +660,7 @@ pub async fn process_new_blocks(
     mut rx: mpsc::Receiver<BlockRange>,
     contract: Arc<SignablePythContract>,
     gas_limit: U256,
+    metrics: Arc<KeeperMetrics>,
 ) {
     tracing::info!("Waiting for new block ranges to process");
     loop {
@@ -464,6 +670,7 @@ pub async fn process_new_blocks(
                 Arc::clone(&contract),
                 gas_limit,
                 chain_state.clone(),
+                metrics.clone(),
             )
             .in_current_span()
             .await;
@@ -478,10 +685,110 @@ pub async fn process_backlog(
     contract: Arc<SignablePythContract>,
     gas_limit: U256,
     chain_state: BlockchainState,
+    metrics: Arc<KeeperMetrics>,
 ) {
     tracing::info!("Processing backlog");
-    process_block_range(backlog_range, contract, gas_limit, chain_state)
+    process_block_range(backlog_range, contract, gas_limit, chain_state, metrics)
         .in_current_span()
         .await;
     tracing::info!("Backlog processed");
 }
+
+
+/// Tracks the balance of the given address on the given chain.
+/// If there is an error, the function will just return.
+#[tracing::instrument(skip_all)]
+pub async fn track_balance(
+    chain_id: String,
+    chain_config: EthereumConfig,
+    address: Address,
+    metrics_registry: Arc<KeeperMetrics>,
+) {
+    let provider = match Provider::<Http>::try_from(&chain_config.geth_rpc_addr) {
+        Ok(r) => r,
+        Err(e) => {
+            tracing::error!("Error while connecting to geth rpc. error: {:?}", e);
+            return;
+        }
+    };
+
+    let balance = match provider.get_balance(address, None).await {
+        // This conversion to u128 is fine as the total balance will never cross the limits
+        // of u128 practically.
+        Ok(r) => r.as_u128(),
+        Err(e) => {
+            tracing::error!("Error while getting balance. error: {:?}", e);
+            return;
+        }
+    };
+    // The f64 conversion is made to be able to serve metrics within the constraints of Prometheus.
+    // The balance is in wei, so we need to divide by 1e18 to convert it to eth.
+    let balance = balance as f64 / 1e18;
+
+    metrics_registry
+        .balance
+        .get_or_create(&AccountLabel {
+            chain_id: chain_id.clone(),
+            address: address.to_string(),
+        })
+        .set(balance);
+}
+
+/// Tracks the collected fees and the hashchain data of the given provider address on the given chain.
+/// If there is an error, the function will just return.
+#[tracing::instrument(skip_all)]
+pub async fn track_provider(
+    chain_id: String,
+    chain_config: EthereumConfig,
+    provider_address: Address,
+    metrics_registry: Arc<KeeperMetrics>,
+) {
+    let contract = match PythContract::from_config(&chain_config) {
+        Ok(r) => r,
+        Err(e) => {
+            tracing::error!("Error while connecting to pythnet contract. error: {:?}", e);
+            return;
+        }
+    };
+
+    let provider_info = match contract.get_provider_info(provider_address).call().await {
+        Ok(info) => info,
+        Err(e) => {
+            tracing::error!("Error while getting provider info. error: {:?}", e);
+            return;
+        }
+    };
+
+    // The f64 conversion is made to be able to serve metrics with the constraints of Prometheus.
+    // The fee is in wei, so we divide by 1e18 to convert it to eth.
+    let collected_fee = provider_info.accrued_fees_in_wei as f64 / 1e18;
+
+    let current_sequence_number = provider_info.sequence_number;
+    let end_sequence_number = provider_info.end_sequence_number;
+
+    metrics_registry
+        .collected_fee
+        .get_or_create(&AccountLabel {
+            chain_id: chain_id.clone(),
+            address: provider_address.to_string(),
+        })
+        .set(collected_fee);
+
+    metrics_registry
+        .current_sequence_number
+        .get_or_create(&AccountLabel {
+            chain_id: chain_id.clone(),
+            address: provider_address.to_string(),
+        })
+        // sequence_number type on chain is u64 but practically it will take
+        // a long time for it to cross the limits of i64.
+        // currently prometheus only supports i64 for Gauge types
+        .set(current_sequence_number as i64);
+    metrics_registry
+        .end_sequence_number
+        .get_or_create(&AccountLabel {
+            chain_id: chain_id.clone(),
+            address: provider_address.to_string(),
+        })
+        .set(end_sequence_number as i64);
+}
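
For reviewers who want to experiment with the new metrics outside the service, here is a minimal, self-contained sketch (not part of this diff) of the prometheus_client pattern the PR relies on: a labelled Family registered once in a shared Registry, updated through get_or_create, and encoded to the Prometheus text format that the /metrics endpoint serves. It assumes prometheus-client 0.21+ with the derive feature; the label set and metric name mirror ChainLabel / block_timestamp_lag above but are purely illustrative.

use prometheus_client::{
    encoding::{text::encode, EncodeLabelSet},
    metrics::{family::Family, gauge::Gauge},
    registry::Registry,
};

// Illustrative label set, shaped like ChainLabel/AccountLabel in this PR.
#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)]
struct ChainLabel {
    chain_id: String,
}

fn main() {
    let mut registry = Registry::default();

    // A gauge family keyed by chain id, registered once against the shared registry.
    let lag = Family::<ChainLabel, Gauge>::default();
    registry.register(
        "block_timestamp_lag",
        "Difference between the server timestamp and the latest block timestamp",
        lag.clone(),
    );

    // get_or_create lazily instantiates the per-label gauge; set/inc update it.
    lag.get_or_create(&ChainLabel {
        chain_id: "ethereum".to_string(),
    })
    .set(3);

    // This is the text a /metrics endpoint would serve for the registry.
    let mut buffer = String::new();
    encode(&mut buffer, &registry).expect("encoding metrics should not fail");
    println!("{buffer}");
}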