Skip to content

Commit

Permalink
feat: add expected_network_height metric (#4326)
Browse files Browse the repository at this point in the history
  • Loading branch information
LesnyRumcajs authored May 15, 2024
1 parent 5443b2e commit 0c74df9
Show file tree
Hide file tree
Showing 8 changed files with 147 additions and 46 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,9 @@
- [#4315](https://github.com/ChainSafe/forest/pull/4315) Add support for the
`Filecoin.StateGetNetworkParams` RPC method.

- [#4326](https://github.com/ChainSafe/forest/pull/4326) Added
`expected_network_height` metric to the Prometheus metrics.

### Changed

- [#4170](https://github.com/ChainSafe/forest/pull/4170) Change the default
Expand Down
43 changes: 18 additions & 25 deletions src/chain_sync/chain_muxer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use std::{
time::SystemTime,
};

use crate::blocks::{Block, CreateTipsetError, FullTipset, GossipBlock, Tipset, TipsetKey};
use crate::chain::{ChainStore, Error as ChainStoreError};
use crate::libp2p::{
hello::HelloRequest, NetworkEvent, NetworkMessage, PeerId, PeerManager, PubsubMessage,
Expand All @@ -17,6 +16,10 @@ use crate::message::SignedMessage;
use crate::message_pool::{MessagePool, Provider};
use crate::shim::{clock::SECONDS_IN_DAY, message::Message};
use crate::state_manager::StateManager;
use crate::{
blocks::{Block, CreateTipsetError, FullTipset, GossipBlock, Tipset, TipsetKey},
networks::calculate_expected_epoch,
};
use cid::Cid;
use futures::{
future::{try_join_all, Future},
Expand Down Expand Up @@ -362,7 +365,7 @@ where
mem_pool: Arc<MessagePool<M>>,
genesis: Arc<Tipset>,
message_processing_strategy: PubsubMessageProcessingStrategy,
block_delay: u64,
block_delay: u32,
) -> Result<Option<(FullTipset, PeerId)>, ChainMuxerError> {
let (tipset, source) = match event {
NetworkEvent::HelloRequestInbound { source, request } => {
Expand Down Expand Up @@ -527,7 +530,7 @@ where
let genesis = self.genesis.clone();
let bad_block_cache = self.bad_blocks.clone();
let mem_pool = self.mpool.clone();
let block_delay = self.state_manager.chain_config().block_delay_secs as u64;
let block_delay = self.state_manager.chain_config().block_delay_secs;

let future = async move {
loop {
Expand Down Expand Up @@ -567,23 +570,23 @@ where
let chain_store = self.state_manager.chain_store().clone();
let network = self.network.clone();
let genesis = self.genesis.clone();
let genesis_timestamp = self.genesis.block_headers().first().timestamp as i64;
let genesis_timestamp = self.genesis.block_headers().first().timestamp;
let bad_block_cache = self.bad_blocks.clone();
let mem_pool = self.mpool.clone();
let tipset_sample_size = self.state_manager.sync_config().tipset_sample_size;
let block_delay = self.state_manager.chain_config().block_delay_secs as u64;
let block_delay = self.state_manager.chain_config().block_delay_secs;

let evaluator = async move {
// If `local_epoch >= now_epoch`, return `NetworkHeadEvaluation::InSync`
// and enter FOLLOW mode directly instead of waiting to collect `tipset_sample_size` tipsets.
// Otherwise in some conditions, `forest-cli sync wait` takes very long to exit (only when the node enters FOLLOW mode)
match (
chain_store.heaviest_tipset().epoch(),
get_now_epoch(
chrono::Utc::now().timestamp(),
calculate_expected_epoch(
chrono::Utc::now().timestamp() as u64,
genesis_timestamp,
block_delay as i64,
),
block_delay,
) as i64,
) {
(local_epoch, now_epoch) if local_epoch >= now_epoch => {
return Ok(NetworkHeadEvaluation::InSync)
Expand Down Expand Up @@ -623,11 +626,11 @@ where
}
};

let now_epoch = get_now_epoch(
chrono::Utc::now().timestamp(),
let now_epoch = calculate_expected_epoch(
chrono::Utc::now().timestamp() as u64,
genesis_timestamp,
block_delay as i64,
);
block_delay,
) as i64;
let is_block_valid = |block: &Block| -> bool {
let header = &block.header;
if !header.is_within_clock_drift() {
Expand Down Expand Up @@ -729,7 +732,7 @@ where
let genesis = self.genesis.clone();
let bad_block_cache = self.bad_blocks.clone();
let mem_pool = self.mpool.clone();
let block_delay = self.state_manager.chain_config().block_delay_secs as u64;
let block_delay = self.state_manager.chain_config().block_delay_secs;
let stream_processor: ChainMuxerFuture<(), ChainMuxerError> = Box::pin(async move {
loop {
let event = match p2p_messages.recv_async().await {
Expand Down Expand Up @@ -820,7 +823,7 @@ where
let bad_block_cache = self.bad_blocks.clone();
let mem_pool = self.mpool.clone();
let tipset_sender = self.tipset_sender.clone();
let block_delay = self.state_manager.chain_config().block_delay_secs as u64;
let block_delay = self.state_manager.chain_config().block_delay_secs;
let stream_processor: ChainMuxerFuture<UnexpectedReturnKind, ChainMuxerError> = Box::pin(
async move {
// If a tipset has been provided, pass it to the tipset processor
Expand Down Expand Up @@ -1023,13 +1026,3 @@ where
}
}
}

// The formula matches lotus
// ```go
// sinceGenesis := build.Clock.Now().Sub(genesisTime)
// expectedHeight := int64(sinceGenesis.Seconds()) / int64(build.BlockDelaySecs)
// ```
// See <https://github.com/filecoin-project/lotus/blob/b27c861485695d3f5bb92bcb281abc95f4d90fb6/chain/sync.go#L180>
fn get_now_epoch(now_timestamp: i64, genesis_timestamp: i64, block_delay: i64) -> i64 {
now_timestamp.saturating_sub(genesis_timestamp) / block_delay
}
7 changes: 4 additions & 3 deletions src/chain_sync/validation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ impl<'a> TipsetValidator<'a> {
chainstore: Arc<ChainStore<DB>>,
bad_block_cache: Arc<BadBlockCache>,
genesis_tipset: Arc<Tipset>,
block_delay: u64,
block_delay: u32,
) -> Result<(), Box<TipsetValidationError>> {
// No empty blocks
if self.0.blocks().is_empty() {
Expand Down Expand Up @@ -91,13 +91,14 @@ impl<'a> TipsetValidator<'a> {
pub fn validate_epoch(
&self,
genesis_tipset: Arc<Tipset>,
block_delay: u64,
block_delay: u32,
) -> Result<(), Box<TipsetValidationError>> {
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs();
let max_epoch = ((now - genesis_tipset.min_timestamp()) / block_delay) + MAX_HEIGHT_DRIFT;
let max_epoch =
((now - genesis_tipset.min_timestamp()) / block_delay as u64) + MAX_HEIGHT_DRIFT;
let too_far_ahead_in_time = self.0.epoch() as u64 > max_epoch;
if too_far_ahead_in_time {
Err(Box::new(TipsetValidationError::EpochTooLarge))
Expand Down
29 changes: 18 additions & 11 deletions src/daemon/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use crate::key_management::{
};
use crate::libp2p::{Libp2pConfig, Libp2pService, PeerManager};
use crate::message_pool::{MessagePool, MpoolConfig, MpoolRpcProvider};
use crate::networks::{ChainConfig, NetworkChain};
use crate::networks::{self, ChainConfig, NetworkChain};
use crate::rpc::start_rpc;
use crate::rpc::RPCState;
use crate::shim::address::{CurrentNetwork, Network};
Expand Down Expand Up @@ -196,6 +196,16 @@ pub(super) async fn start(
});
}

// Read Genesis file
// * When snapshot command implemented, this genesis does not need to be
// initialized
let genesis_header = read_genesis_header(
config.client.genesis_file.as_ref(),
chain_config.genesis_bytes(&db).await?.as_deref(),
&db,
)
.await?;

if config.client.enable_metrics_endpoint {
// Start Prometheus server port
let prometheus_listener = TcpListener::bind(config.client.metrics_address)
Expand All @@ -212,17 +222,14 @@ pub(super) async fn start(
.await
.context("Failed to initiate prometheus server")
});
}

// Read Genesis file
// * When snapshot command implemented, this genesis does not need to be
// initialized
let genesis_header = read_genesis_header(
config.client.genesis_file.as_ref(),
chain_config.genesis_bytes(&db).await?.as_deref(),
&db,
)
.await?;
crate::metrics::default_registry().register_collector(Box::new(
networks::metrics::NetworkHeightCollector::new(
chain_config.block_delay_secs,
genesis_header.timestamp,
),
));
}

// Initialize ChainStore
let chain_store = Arc::new(ChainStore::new(
Expand Down
13 changes: 7 additions & 6 deletions src/health/endpoints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::sync::Arc;
use ahash::HashMap;
use axum::extract::{self, Query};

use crate::chain_sync::SyncStage;
use crate::{chain_sync::SyncStage, networks::calculate_expected_epoch};

use super::{AppError, ForestState};

Expand Down Expand Up @@ -109,11 +109,12 @@ fn check_sync_state_not_error(state: &Arc<ForestState>, acc: &mut MessageAccumul
/// in case of forking.
fn check_epoch_up_to_date(state: &Arc<ForestState>, acc: &mut MessageAccumulator) -> bool {
const MAX_EPOCH_DIFF: i64 = 5;
let now_epoch = chrono::Utc::now()
.timestamp()
.saturating_add(state.chain_config.block_delay_secs as i64)
.saturating_sub(state.genesis_timestamp as i64)
/ state.chain_config.block_delay_secs as i64;

let now_epoch = calculate_expected_epoch(
chrono::Utc::now().timestamp() as u64,
state.genesis_timestamp,
state.chain_config.block_delay_secs,
) as i64;

// The current epoch of the node must be not too far behind the network
if state.sync_state.read().epoch() >= now_epoch - MAX_EPOCH_DIFF {
Expand Down
47 changes: 47 additions & 0 deletions src/networks/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Copyright 2019-2024 ChainSafe Systems
// SPDX-License-Identifier: Apache-2.0, MIT

use prometheus_client::{collector::Collector, encoding::EncodeMetric, metrics::gauge::Gauge};

use super::calculate_expected_epoch;

#[derive(Debug)]
pub struct NetworkHeightCollector {
block_delay_secs: u32,
genesis_timestamp: u64,
network_height: Gauge,
}

impl NetworkHeightCollector {
pub fn new(block_delay_secs: u32, genesis_timestamp: u64) -> Self {
Self {
block_delay_secs,
genesis_timestamp,
network_height: Gauge::default(),
}
}
}

impl Collector for NetworkHeightCollector {
fn encode(
&self,
mut encoder: prometheus_client::encoding::DescriptorEncoder,
) -> Result<(), std::fmt::Error> {
let metric_encoder = encoder.encode_descriptor(
"expected_network_height",
"The expected network height based on the current time and the genesis block time",
None,
self.network_height.metric_type(),
)?;

let expected_epoch = calculate_expected_epoch(
chrono::Utc::now().timestamp() as u64,
self.genesis_timestamp,
self.block_delay_secs,
);
self.network_height.set(expected_epoch as i64);
self.network_height.encode(metric_encoder)?;

Ok(())
}
}
49 changes: 49 additions & 0 deletions src/networks/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ pub mod calibnet;
pub mod devnet;
pub mod mainnet;

pub mod metrics;

/// Newest network version for all networks
pub const NEWEST_NETWORK_VERSION: NetworkVersion = NetworkVersion::V17;

Expand Down Expand Up @@ -453,6 +455,20 @@ macro_rules! make_height {
};
}

// The formula matches lotus
// ```go
// sinceGenesis := build.Clock.Now().Sub(genesisTime)
// expectedHeight := int64(sinceGenesis.Seconds()) / int64(build.BlockDelaySecs)
// ```
// See <https://github.com/filecoin-project/lotus/blob/b27c861485695d3f5bb92bcb281abc95f4d90fb6/chain/sync.go#L180>
pub fn calculate_expected_epoch(
now_timestamp: u64,
genesis_timestamp: u64,
block_delay: u32,
) -> u64 {
now_timestamp.saturating_sub(genesis_timestamp) / block_delay as u64
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down Expand Up @@ -534,4 +550,37 @@ mod tests {
let epoch = get_upgrade_height_from_env("FOREST_TEST_VAR_3");
assert_eq!(epoch, None);
}

#[test]
fn test_calculate_expected_epoch() {
// now, genesis, block_delay
assert_eq!(0, calculate_expected_epoch(0, 0, 1));
assert_eq!(5, calculate_expected_epoch(5, 0, 1));

let mainnet_genesis = 1598306400;
let mainnet_block_delay = 30;

assert_eq!(
0,
calculate_expected_epoch(mainnet_genesis, mainnet_genesis, mainnet_block_delay)
);

assert_eq!(
0,
calculate_expected_epoch(
mainnet_genesis + mainnet_block_delay as u64 - 1,
mainnet_genesis,
mainnet_block_delay
)
);

assert_eq!(
1,
calculate_expected_epoch(
mainnet_genesis + mainnet_block_delay as u64,
mainnet_genesis,
mainnet_block_delay
)
);
}
}
2 changes: 1 addition & 1 deletion src/rpc/methods/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ impl RpcMethod<1> for SyncSubmitBlock {
ctx.chain_store.clone(),
ctx.bad_blocks.clone(),
genesis_ts,
ctx.state_manager.chain_config().block_delay_secs as u64,
ctx.state_manager.chain_config().block_delay_secs,
)
.context("failed to validate the tipset")?;

Expand Down

0 comments on commit 0c74df9

Please sign in to comment.