# Retry fetching messages when indexing on v2 (#3613)
### Description

- Retries indefinitely when fetching messages. This avoids skipping a message
range, which would prevent the message processor loop from proceeding (see the
sketch below).
- Intentionally does not retry for IGP payments. This is required because
Solana IGP payment indexing is currently broken, so errors are expected there.
In practice this has not been an issue; only message indexing has been affected.
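
For illustration, here is a minimal, self-contained sketch of the retry behavior this change introduces. The helper name, the closure-based fetcher, the `String` error type, and the sleep interval are simplified stand-ins, not the actual types in `contract_sync/mod.rs`, but the loop mirrors the new `get_logs` logic: `error_retry_count = u32::MAX` keeps retrying effectively forever, while `0` allows exactly one attempt.

```rust
use std::{thread::sleep, time::Duration};

// Stand-in for the real backoff interval; kept short so the example runs quickly.
const SLEEP_DURATION: Duration = Duration::from_millis(100);

/// Try `fetch` once, then retry up to `error_retry_count` more times.
/// `u32::MAX` means "retry effectively forever"; `0` means "fail after one attempt".
fn get_logs_with_retries<T>(
    mut fetch: impl FnMut() -> Result<Vec<T>, String>,
    error_retry_count: u32,
) -> Result<Vec<T>, String> {
    let mut attempt: u32 = 0;
    while attempt <= error_retry_count {
        // Sleep before every retry, but not before the first attempt.
        if attempt > 0 {
            sleep(SLEEP_DURATION);
        }
        match fetch() {
            Ok(logs) => return Ok(logs),
            Err(err) => eprintln!("attempt {attempt}: failed to fetch logs: {err}"),
        }
        attempt += 1;
    }
    Err("failed to fetch logs after exhausting retries".to_string())
}

fn main() {
    // IGP-payment-style sync: zero retries, so a single failure surfaces immediately.
    let igp: Result<Vec<u64>, _> = get_logs_with_retries(|| Err("rpc error".to_string()), 0);
    assert!(igp.is_err());

    // Message-style sync: u32::MAX retries; here the second attempt succeeds.
    let mut calls = 0;
    let msgs = get_logs_with_retries(
        || {
            calls += 1;
            if calls < 2 {
                Err("rpc error".to_string())
            } else {
                Ok(vec![1u64, 2, 3])
            }
        },
        u32::MAX,
    );
    assert_eq!(msgs.unwrap(), vec![1, 2, 3]);
}
```

Callers pick the policy per sync: the relayer's message syncs pass `u32::MAX` so a flaky RPC never causes a range to be skipped, while the IGP payment syncs pass `0` because Solana IGP payment indexing is currently expected to error.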

### Drive-by changes

<!--
Are there any minor or drive-by changes also included?
-->

### Related issues

<!--
- Fixes #[issue number here]
-->

### Backward compatibility

<!--
Are these changes backward compatible? Are there any infrastructure
implications, e.g. changes that would prohibit deploying older commits
using this infra tooling?

Yes/No
-->

### Testing

<!--
What kind of testing have these changes undergone?

None/Manual/Unit Tests
-->
tkporter authored Apr 16, 2024
1 parent 30e9107 commit 6451de6
Showing 7 changed files with 55 additions and 13 deletions.
2 changes: 2 additions & 0 deletions rust/agents/relayer/src/relayer.rs
@@ -115,6 +115,7 @@ impl BaseAgent for Relayer {
dbs.iter()
.map(|(d, db)| (d.clone(), Arc::new(db.clone()) as _))
.collect(),
u32::MAX,
)
.await?;
let interchain_gas_payment_syncs = settings
@@ -125,6 +126,7 @@ impl BaseAgent for Relayer {
dbs.iter()
.map(|(d, db)| (d.clone(), Arc::new(db.clone()) as _))
.collect(),
0,
)
.await?;

5 changes: 5 additions & 0 deletions rust/agents/scraper/src/agent.rs
@@ -118,6 +118,7 @@ impl Scraper {
self.contract_sync_metrics.clone(),
db.clone(),
index_settings.clone(),
0,
)
.await,
);
@@ -128,6 +129,7 @@ impl Scraper {
self.contract_sync_metrics.clone(),
db,
index_settings.clone(),
0,
)
.await,
);
@@ -145,6 +147,7 @@ macro_rules! spawn_sync_task {
contract_sync_metrics: Arc<ContractSyncMetrics>,
db: HyperlaneSqlDb,
index_settings: IndexSettings,
error_retry_count: u32,
) -> Instrumented<JoinHandle<eyre::Result<()>>> {
let sync = self
.as_ref()
@@ -154,6 +157,7 @@ macro_rules! spawn_sync_task {
&metrics.clone(),
&contract_sync_metrics.clone(),
Arc::new(db.clone()),
error_retry_count,
)
.await
.unwrap();
@@ -186,6 +190,7 @@ impl Scraper {
&metrics.clone(),
&contract_sync_metrics.clone(),
Arc::new(db.clone()),
u32::MAX,
)
.await
.unwrap();
1 change: 1 addition & 0 deletions rust/agents/validator/src/validator.rs
@@ -74,6 +74,7 @@ impl BaseAgent for Validator {
&metrics,
&contract_sync_metrics,
Arc::new(msg_db.clone()),
u32::MAX,
)
.await?
.into();
@@ -101,7 +101,7 @@ spec:
# Make a query to see if the relayer's Solana index is increasing over the last 5 mins.
RELAYER_SOLANA_INDEX_DERIV_QUERY=$(curl 'http://prometheus-server.monitoring.svc.cluster.local/api/v1/query' --data-urlencode 'query=deriv(hyperlane_last_known_message_nonce{phase="processor_loop", hyperlane_deployment="mainnet2", origin="solana", remote="any"}[5m])')
echo "Liveness probe: relayer Solana index deriv: $RELAYER_SOLANA_INDEX_DERIV" > /proc/1/fd/1
echo "Liveness probe: relayer Solana index deriv: $RELAYER_SOLANA_INDEX_DERIV_QUERY" > /proc/1/fd/1
# This env var will be empty if the value is increasing, so the presence of this
# env var is a signal that the relayer is unhealthy.
38 changes: 32 additions & 6 deletions rust/hyperlane-base/src/contract_sync/mod.rs
@@ -1,17 +1,18 @@
use std::{
collections::HashSet, fmt::Debug, hash::Hash, marker::PhantomData, sync::Arc, time::Duration,
collections::HashSet, fmt::Debug, hash::Hash, marker::PhantomData, ops::RangeInclusive,
sync::Arc, time::Duration,
};

use cursor::*;
use derive_new::new;
use hyperlane_core::{
utils::fmt_sync_time, ContractSyncCursor, CursorAction, HyperlaneDomain, HyperlaneLogStore,
HyperlaneMessage, HyperlaneMessageStore, HyperlaneWatermarkedLogStore, Indexer,
HyperlaneMessage, HyperlaneMessageStore, HyperlaneWatermarkedLogStore, Indexer, LogMeta,
SequenceIndexer,
};
pub use metrics::ContractSyncMetrics;
use tokio::time::sleep;
use tracing::{debug, info, warn};
use tracing::{debug, info, instrument, warn};

use crate::settings::IndexSettings;

@@ -30,6 +31,7 @@ pub struct ContractSync<T, D: HyperlaneLogStore<T>, I: Indexer<T>> {
db: D,
indexer: I,
metrics: ContractSyncMetrics,
error_retry_count: u32,
_phantom: PhantomData<T>,
}

@@ -74,10 +76,9 @@
CursorAction::Query(range) => loop {
debug!(?range, "Looking for events in index range");

let logs = match self.indexer.fetch_logs(range.clone()).await {
let logs = match self.get_logs(range.clone()).await {
Ok(logs) => logs,
Err(err) => {
warn!(?err, "Failed to fetch logs");
Err(_err) => {
break SLEEP_DURATION;
}
};
@@ -112,6 +113,31 @@
sleep(sleep_duration).await;
}
}

#[instrument(skip(self), fields(domain = self.domain().name()))]
async fn get_logs(&self, range: RangeInclusive<u32>) -> eyre::Result<Vec<(T, LogMeta)>> {
let mut attempt = 0;

while attempt <= self.error_retry_count {
// Sleep before retrying
if attempt > 0 {
sleep(SLEEP_DURATION).await;
}

match self.indexer.fetch_logs(range.clone()).await {
Ok(logs) => {
return Ok(logs);
}
Err(err) => {
warn!(?err, attempt, "Failed to fetch logs");
}
};

attempt += 1;
}

Err(eyre::eyre!("Failed to fetch logs"))
}
}

/// A ContractSync for syncing events using a RateLimitedContractSyncCursor
16 changes: 12 additions & 4 deletions rust/hyperlane-base/src/settings/base.rs
@@ -143,6 +143,7 @@ macro_rules! build_indexer_fns {
metrics: &CoreMetrics,
sync_metrics: &ContractSyncMetrics,
db: Arc<$db>,
error_retry_count: u32,
) -> eyre::Result<Box<$ret>> {
let setup = self.chain_setup(domain)?;
let indexer = setup.$singular(metrics).await?;
@@ -151,6 +152,7 @@ macro_rules! build_indexer_fns {
db.clone(),
indexer.into(),
sync_metrics.clone(),
error_retry_count,
);

Ok(Box::new(sync))
@@ -163,11 +165,17 @@ macro_rules! build_indexer_fns {
metrics: &CoreMetrics,
sync_metrics: &ContractSyncMetrics,
dbs: HashMap<HyperlaneDomain, Arc<$db>>,
error_retry_count: u32,
) -> Result<HashMap<HyperlaneDomain, Arc<$ret>>> {
try_join_all(
domains
.map(|d| self.$singular(d, metrics, sync_metrics, dbs.get(d).unwrap().clone())),
)
try_join_all(domains.map(|d| {
self.$singular(
d,
metrics,
sync_metrics,
dbs.get(d).unwrap().clone(),
error_retry_count,
)
}))
.await?
.into_iter()
.map(|i| Ok((i.domain().clone(), Arc::from(i))))
4 changes: 2 additions & 2 deletions typescript/infra/config/environments/mainnet2/agent.ts
@@ -127,7 +127,7 @@ const hyperlane: RootAgentConfig = {
connectionType: AgentConnectionType.HttpFallback,
docker: {
repo,
tag: 'ad870de-20240321-111358',
tag: '475bd1c-20240416-105206',
},
blacklist: [
...releaseCandidateHelloworldMatchingList,
Expand All @@ -145,7 +145,7 @@ const hyperlane: RootAgentConfig = {
},
chainDockerOverrides: {
[chainMetadata.solana.name]: {
tag: '49a581b-20240203-151524',
tag: '475bd1c-20240416-105206',
},
[chainMetadata.nautilus.name]: {
tag: '3b0685f-20230815-110725',
