From 6451de6ba21f51a57904952412ae0e2484822fd7 Mon Sep 17 00:00:00 2001 From: Trevor Porter Date: Tue, 16 Apr 2024 13:42:16 +0100 Subject: [PATCH] Retry fetching messages when indexing on v2 (#3613) ### Description - Retries infinitely for messages. This avoids skipping a message range and preventing the message processor loop from proceeding - Intentionally does not retry for IGP payments - this is required bc Solana IGP payment indexing is broken atm, so we can expect errors there. In practice this has not been an issue, only message indexing ### Drive-by changes ### Related issues ### Backward compatibility ### Testing --- rust/agents/relayer/src/relayer.rs | 2 + rust/agents/scraper/src/agent.rs | 5 +++ rust/agents/validator/src/validator.rs | 1 + .../templates/relayer-statefulset.yaml | 2 +- rust/hyperlane-base/src/contract_sync/mod.rs | 38 ++++++++++++++++--- rust/hyperlane-base/src/settings/base.rs | 16 ++++++-- .../config/environments/mainnet2/agent.ts | 4 +- 7 files changed, 55 insertions(+), 13 deletions(-) diff --git a/rust/agents/relayer/src/relayer.rs b/rust/agents/relayer/src/relayer.rs index 1d62f5441e..493d00370a 100644 --- a/rust/agents/relayer/src/relayer.rs +++ b/rust/agents/relayer/src/relayer.rs @@ -115,6 +115,7 @@ impl BaseAgent for Relayer { dbs.iter() .map(|(d, db)| (d.clone(), Arc::new(db.clone()) as _)) .collect(), + u32::MAX, ) .await?; let interchain_gas_payment_syncs = settings @@ -125,6 +126,7 @@ impl BaseAgent for Relayer { dbs.iter() .map(|(d, db)| (d.clone(), Arc::new(db.clone()) as _)) .collect(), + 0, ) .await?; diff --git a/rust/agents/scraper/src/agent.rs b/rust/agents/scraper/src/agent.rs index b582f8e2ef..79dfe4e888 100644 --- a/rust/agents/scraper/src/agent.rs +++ b/rust/agents/scraper/src/agent.rs @@ -118,6 +118,7 @@ impl Scraper { self.contract_sync_metrics.clone(), db.clone(), index_settings.clone(), + 0, ) .await, ); @@ -128,6 +129,7 @@ impl Scraper { self.contract_sync_metrics.clone(), db, index_settings.clone(), + 0, ) .await, ); @@ -145,6 +147,7 @@ macro_rules! spawn_sync_task { contract_sync_metrics: Arc, db: HyperlaneSqlDb, index_settings: IndexSettings, + error_retry_count: u32, ) -> Instrumented>> { let sync = self .as_ref() @@ -154,6 +157,7 @@ macro_rules! spawn_sync_task { &metrics.clone(), &contract_sync_metrics.clone(), Arc::new(db.clone()), + error_retry_count, ) .await .unwrap(); @@ -186,6 +190,7 @@ impl Scraper { &metrics.clone(), &contract_sync_metrics.clone(), Arc::new(db.clone()), + u32::MAX, ) .await .unwrap(); diff --git a/rust/agents/validator/src/validator.rs b/rust/agents/validator/src/validator.rs index b27887469d..01f3b0eb97 100644 --- a/rust/agents/validator/src/validator.rs +++ b/rust/agents/validator/src/validator.rs @@ -74,6 +74,7 @@ impl BaseAgent for Validator { &metrics, &contract_sync_metrics, Arc::new(msg_db.clone()), + u32::MAX, ) .await? .into(); diff --git a/rust/helm/hyperlane-agent/templates/relayer-statefulset.yaml b/rust/helm/hyperlane-agent/templates/relayer-statefulset.yaml index 3f5bdead3c..93f93c6a2d 100644 --- a/rust/helm/hyperlane-agent/templates/relayer-statefulset.yaml +++ b/rust/helm/hyperlane-agent/templates/relayer-statefulset.yaml @@ -101,7 +101,7 @@ spec: # Make a query to see if the relayer's Solana index is increasing over the last 5 mins. RELAYER_SOLANA_INDEX_DERIV_QUERY=$(curl 'http://prometheus-server.monitoring.svc.cluster.local/api/v1/query' --data-urlencode 'query=deriv(hyperlane_last_known_message_nonce{phase="processor_loop", hyperlane_deployment="mainnet2", origin="solana", remote="any"}[5m])') - echo "Liveness probe: relayer Solana index deriv: $RELAYER_SOLANA_INDEX_DERIV" > /proc/1/fd/1 + echo "Liveness probe: relayer Solana index deriv: $RELAYER_SOLANA_INDEX_DERIV_QUERY" > /proc/1/fd/1 # This env var will be empty if the value is increasing, so the presence of this # env var is a signal that the relayer is unhealthy. diff --git a/rust/hyperlane-base/src/contract_sync/mod.rs b/rust/hyperlane-base/src/contract_sync/mod.rs index 1ba0a2290e..47b8473f96 100644 --- a/rust/hyperlane-base/src/contract_sync/mod.rs +++ b/rust/hyperlane-base/src/contract_sync/mod.rs @@ -1,17 +1,18 @@ use std::{ - collections::HashSet, fmt::Debug, hash::Hash, marker::PhantomData, sync::Arc, time::Duration, + collections::HashSet, fmt::Debug, hash::Hash, marker::PhantomData, ops::RangeInclusive, + sync::Arc, time::Duration, }; use cursor::*; use derive_new::new; use hyperlane_core::{ utils::fmt_sync_time, ContractSyncCursor, CursorAction, HyperlaneDomain, HyperlaneLogStore, - HyperlaneMessage, HyperlaneMessageStore, HyperlaneWatermarkedLogStore, Indexer, + HyperlaneMessage, HyperlaneMessageStore, HyperlaneWatermarkedLogStore, Indexer, LogMeta, SequenceIndexer, }; pub use metrics::ContractSyncMetrics; use tokio::time::sleep; -use tracing::{debug, info, warn}; +use tracing::{debug, info, instrument, warn}; use crate::settings::IndexSettings; @@ -30,6 +31,7 @@ pub struct ContractSync, I: Indexer> { db: D, indexer: I, metrics: ContractSyncMetrics, + error_retry_count: u32, _phantom: PhantomData, } @@ -74,10 +76,9 @@ where CursorAction::Query(range) => loop { debug!(?range, "Looking for for events in index range"); - let logs = match self.indexer.fetch_logs(range.clone()).await { + let logs = match self.get_logs(range.clone()).await { Ok(logs) => logs, - Err(err) => { - warn!(?err, "Failed to fetch logs"); + Err(_err) => { break SLEEP_DURATION; } }; @@ -112,6 +113,31 @@ where sleep(sleep_duration).await; } } + + #[instrument(skip(self), fields(domain = self.domain().name()))] + async fn get_logs(&self, range: RangeInclusive) -> eyre::Result> { + let mut attempt = 0; + + while attempt <= self.error_retry_count { + // Sleep before retrying + if attempt > 0 { + sleep(SLEEP_DURATION).await; + } + + match self.indexer.fetch_logs(range.clone()).await { + Ok(logs) => { + return Ok(logs); + } + Err(err) => { + warn!(?err, attempt, "Failed to fetch logs"); + } + }; + + attempt += 1; + } + + Err(eyre::eyre!("Failed to fetch logs")) + } } /// A ContractSync for syncing events using a RateLimitedContractSyncCursor diff --git a/rust/hyperlane-base/src/settings/base.rs b/rust/hyperlane-base/src/settings/base.rs index cd096abf3f..9a8b75573b 100644 --- a/rust/hyperlane-base/src/settings/base.rs +++ b/rust/hyperlane-base/src/settings/base.rs @@ -143,6 +143,7 @@ macro_rules! build_indexer_fns { metrics: &CoreMetrics, sync_metrics: &ContractSyncMetrics, db: Arc<$db>, + error_retry_count: u32, ) -> eyre::Result> { let setup = self.chain_setup(domain)?; let indexer = setup.$singular(metrics).await?; @@ -151,6 +152,7 @@ macro_rules! build_indexer_fns { db.clone(), indexer.into(), sync_metrics.clone(), + error_retry_count, ); Ok(Box::new(sync)) @@ -163,11 +165,17 @@ macro_rules! build_indexer_fns { metrics: &CoreMetrics, sync_metrics: &ContractSyncMetrics, dbs: HashMap>, + error_retry_count: u32, ) -> Result>> { - try_join_all( - domains - .map(|d| self.$singular(d, metrics, sync_metrics, dbs.get(d).unwrap().clone())), - ) + try_join_all(domains.map(|d| { + self.$singular( + d, + metrics, + sync_metrics, + dbs.get(d).unwrap().clone(), + error_retry_count, + ) + })) .await? .into_iter() .map(|i| Ok((i.domain().clone(), Arc::from(i)))) diff --git a/typescript/infra/config/environments/mainnet2/agent.ts b/typescript/infra/config/environments/mainnet2/agent.ts index 1a49f6a701..85864ee90f 100644 --- a/typescript/infra/config/environments/mainnet2/agent.ts +++ b/typescript/infra/config/environments/mainnet2/agent.ts @@ -127,7 +127,7 @@ const hyperlane: RootAgentConfig = { connectionType: AgentConnectionType.HttpFallback, docker: { repo, - tag: 'ad870de-20240321-111358', + tag: '475bd1c-20240416-105206', }, blacklist: [ ...releaseCandidateHelloworldMatchingList, @@ -145,7 +145,7 @@ const hyperlane: RootAgentConfig = { }, chainDockerOverrides: { [chainMetadata.solana.name]: { - tag: '49a581b-20240203-151524', + tag: '475bd1c-20240416-105206', }, [chainMetadata.nautilus.name]: { tag: '3b0685f-20230815-110725',