Skip to content

Commit

Permalink
Rm finality blocks and refactor checkpoint_submitter to be resilient …
Browse files Browse the repository at this point in the history
…to edge cases (#2836)

### Description

* #532 wasn't actually fully closed out — `finality_blocks` was still
being used for indexing. Sadly, testnet4 infra was no longer configuring
finality blocks, so we weren't indexing only finalized blocks. Moved fully to `reorg_period`.
* Refactored `checkpoint_submitter` in light of races revealed by the
above ^ problem. It used to assume that message indexing and the
`latest_checkpoint()` call would align with one another, and wasn't
resilient to the tree already being past the correctness checkpoint. A
couple situations were possible before that aren't now:
a. Indexing could run ahead of the `latest_checkpoint()` call, resulting
in the tree ingesting the newly indexed messages and ending up ahead of
the correctness checkpoint :(
b. The `tree()` call that constructs the tree initially could be made
against a block later than the one used by the next `latest_checkpoint()`
call, which would leave the tree ahead of the correctness checkpoint
from the very beginning :(

### Drive-by changes

removed a function that wasn't being used anymore

### Related issues

#532

### Backward compatibility

Removes finality blocks entirely

### Testing

Builds, e2e
  • Loading branch information
tkporter authored Oct 24, 2023
1 parent 3efaafb commit 88346bf
Show file tree
Hide file tree
Showing 9 changed files with 194 additions and 135 deletions.
2 changes: 1 addition & 1 deletion rust/agents/relayer/src/msg/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ mod test {
ChainConf {
domain: domain.clone(),
signer: Default::default(),
finality_blocks: Default::default(),
reorg_period: Default::default(),
addresses: Default::default(),
connection: ChainConnectionConf::Ethereum(hyperlane_ethereum::ConnectionConf::Http {
url: "http://example.com".parse().unwrap(),
Expand Down
237 changes: 156 additions & 81 deletions rust/agents/validator/src/submit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,33 +58,92 @@ impl ValidatorSubmitter {
}
}

#[instrument(err, skip(self, tree), fields(domain=%self.merkle_tree_hook.domain()))]
pub(crate) async fn checkpoint_submitter(
/// Submits signed checkpoints from index 0 until the target checkpoint (inclusive).
/// Runs idly forever once the target checkpoint is reached to avoid exiting the task.
#[instrument(err, skip(self), fields(domain=%self.merkle_tree_hook.domain()))]
pub(crate) async fn backfill_checkpoint_submitter(
self,
mut tree: IncrementalMerkle,
target_checkpoint: Option<Checkpoint>,
target_checkpoint: Checkpoint,
) -> Result<()> {
let mut checkpoint_queue = vec![];
let mut tree = IncrementalMerkle::default();
self.submit_checkpoints_until_correctness_checkpoint(&mut tree, &target_checkpoint)
.await?;

let mut reached_target = false;
info!(
?target_checkpoint,
"Backfill checkpoint submitter successfully reached target checkpoint"
);

while !reached_target {
let correctness_checkpoint = if let Some(c) = target_checkpoint {
c
} else {
// lag by reorg period to match message indexing
let latest_checkpoint = self
.merkle_tree_hook
.latest_checkpoint(self.reorg_period)
.await?;
self.metrics
.latest_checkpoint_observed
.set(latest_checkpoint.index as i64);
latest_checkpoint
};
// TODO: remove this once validator is tolerant of tasks exiting.
loop {
sleep(Duration::from_secs(u64::MAX)).await;
}
}

/// Submits signed checkpoints indefinitely, starting from the `tree`.
///
/// Each iteration fetches the latest (reorg-lagged) checkpoint as the
/// correctness checkpoint, ingests indexed messages until the tree reaches it,
/// then signs and submits the intermediate checkpoints.
#[instrument(err, skip(self, tree), fields(domain=%self.merkle_tree_hook.domain()))]
pub(crate) async fn checkpoint_submitter(self, mut tree: IncrementalMerkle) -> Result<()> {
    loop {
        // Lag by reorg period because this is our correctness checkpoint.
        let latest_checkpoint = self
            .merkle_tree_hook
            .latest_checkpoint(self.reorg_period)
            .await?;
        self.metrics
            .latest_checkpoint_observed
            .set(latest_checkpoint.index as i64);

        // The tree being ahead of the observed checkpoint may occur e.g. if RPC
        // providers are unreliable and make calls against inconsistent block tips.
        //
        // In this case, we just sleep a bit until we fetch a new latest checkpoint
        // that at least meets the tree.
        if tree_exceeds_checkpoint(&latest_checkpoint, &tree) {
            debug!(
                ?latest_checkpoint,
                tree_count = tree.count(),
                "Latest checkpoint is behind tree, sleeping briefly"
            );
            sleep(self.interval).await;
            continue;
        }

        self.submit_checkpoints_until_correctness_checkpoint(&mut tree, &latest_checkpoint)
            .await?;

        self.metrics
            .latest_checkpoint_processed
            .set(latest_checkpoint.index as i64);

        sleep(self.interval).await;
    }
}

/// Submits signed checkpoints relating to the given tree until the correctness checkpoint (inclusive).
/// Only submits the signed checkpoints once the correctness checkpoint is reached.
async fn submit_checkpoints_until_correctness_checkpoint(
&self,
tree: &mut IncrementalMerkle,
correctness_checkpoint: &Checkpoint,
) -> Result<()> {
// This should never be called with a tree that is ahead of the correctness checkpoint.
assert!(
!tree_exceeds_checkpoint(correctness_checkpoint, tree),
"tree (count: {}) is ahead of correctness checkpoint {:?}",
tree.count(),
correctness_checkpoint,
);

// All intermediate checkpoints will be stored here and signed once the correctness
// checkpoint is reached.
let mut checkpoint_queue = vec![];

// If the correctness checkpoint is ahead of the tree, we need to ingest more messages.
//
// tree.index() will panic if the tree is empty, so we use tree.count() instead
// and convert the correctness_checkpoint.index to a count by adding 1.
while correctness_checkpoint.index + 1 > tree.count() as u32 {
if let Some(insertion) = self
.message_db
.retrieve_merkle_tree_insertion_by_leaf_index(&(tree.count() as u32))?
{
Expand All @@ -96,76 +155,85 @@ impl ValidatorSubmitter {
let message_id = insertion.message_id();
tree.ingest(message_id);

let checkpoint = self.checkpoint(&tree);
let checkpoint = self.checkpoint(tree);

checkpoint_queue.push(CheckpointWithMessageId {
checkpoint,
message_id,
});
} else {
// If we haven't yet indexed the next merkle tree insertion but know that
// it will soon exist (because we know the correctness checkpoint), wait a bit and
// try again.
sleep(Duration::from_millis(100)).await
}
}

if checkpoint.index == correctness_checkpoint.index {
// We got to the right height, now lets compare whether we got the right tree
if checkpoint.root != correctness_checkpoint.root {
// Bad news, bail
error!(
?checkpoint,
?correctness_checkpoint,
"Incorrect tree root, something went wrong"
);
bail!("Incorrect tree root, something went wrong");
}
}
// At this point we know that correctness_checkpoint.index == tree.index().
assert_eq!(
correctness_checkpoint.index,
tree.index(),
"correctness checkpoint index {} != tree index {}",
correctness_checkpoint.index,
tree.index(),
);

let checkpoint = self.checkpoint(tree);

// If the tree's checkpoint doesn't match the correctness checkpoint, something went wrong
// and we bail loudly.
if checkpoint != *correctness_checkpoint {
error!(
?checkpoint,
?correctness_checkpoint,
"Incorrect tree root, something went wrong"
);
bail!("Incorrect tree root, something went wrong");
}

// compare against every queued checkpoint to prevent ingesting past target
if checkpoint == correctness_checkpoint {
debug!(index = checkpoint.index, "Reached tree consistency");

// drain and sign all checkpoints in the queue
for queued_checkpoint in checkpoint_queue.drain(..) {
let existing = self
.checkpoint_syncer
.fetch_checkpoint(queued_checkpoint.index)
.await?;
if existing.is_some() {
debug!(
index = queued_checkpoint.index,
"Checkpoint already submitted"
);
continue;
}

let signed_checkpoint = self.signer.sign(queued_checkpoint).await?;
self.checkpoint_syncer
.write_checkpoint(&signed_checkpoint)
.await?;
debug!(
index = queued_checkpoint.index,
"Signed and submitted checkpoint"
);

// small sleep before signing next checkpoint to avoid rate limiting
sleep(Duration::from_millis(100)).await;
}

info!(index = checkpoint.index, "Signed all queued checkpoints");

self.metrics
.latest_checkpoint_processed
.set(checkpoint.index as i64);

// break out of submitter loop if target checkpoint is reached
reached_target = target_checkpoint.is_some();
break;
}
debug!(index = checkpoint.index, "Reached tree consistency");

self.sign_and_submit_checkpoints(checkpoint_queue).await?;

info!(
index = checkpoint.index,
"Signed all queued checkpoints until index"
);

Ok(())
}

/// Signs and submits any previously unsubmitted checkpoints.
async fn sign_and_submit_checkpoints(
&self,
checkpoints: Vec<CheckpointWithMessageId>,
) -> Result<()> {
for queued_checkpoint in checkpoints {
let existing = self
.checkpoint_syncer
.fetch_checkpoint(queued_checkpoint.index)
.await?;
if existing.is_some() {
debug!(
index = queued_checkpoint.index,
"Checkpoint already submitted"
);
continue;
}

sleep(self.interval).await;
}
let signed_checkpoint = self.signer.sign(queued_checkpoint).await?;
self.checkpoint_syncer
.write_checkpoint(&signed_checkpoint)
.await?;
debug!(
index = queued_checkpoint.index,
"Signed and submitted checkpoint"
);

// TODO: remove this once validator is tolerant of tasks exiting
loop {
sleep(Duration::from_secs(u64::MAX)).await;
// small sleep before signing next checkpoint to avoid rate limiting
sleep(Duration::from_millis(100)).await;
}
Ok(())
}

pub(crate) async fn legacy_checkpoint_submitter(self) -> Result<()> {
Expand Down Expand Up @@ -249,6 +317,13 @@ impl ValidatorSubmitter {
}
}

/// Returns whether the tree exceeds the checkpoint, i.e. the tree contains
/// more leaves than the checkpoint accounts for.
fn tree_exceeds_checkpoint(checkpoint: &Checkpoint, tree: &IncrementalMerkle) -> bool {
    // tree.index() will panic if the tree is empty, so we use tree.count() instead
    // and convert the checkpoint index to a count by adding 1.
    //
    // Compare in u64 space: `checkpoint.index + 1` would wrap in release builds
    // when index == u32::MAX, and `tree.count() as u32` would silently truncate
    // for very large trees.
    (checkpoint.index as u64) + 1 < tree.count() as u64
}

#[derive(Clone)]
pub(crate) struct ValidatorSubmitterMetrics {
latest_checkpoint_observed: IntGauge,
Expand Down
14 changes: 8 additions & 6 deletions rust/agents/validator/src/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ use hyperlane_base::{
WatermarkContractSync,
};
use hyperlane_core::{
accumulator::incremental::IncrementalMerkle, Announcement, ChainResult, HyperlaneChain,
HyperlaneContract, HyperlaneDomain, HyperlaneSigner, HyperlaneSignerExt, Mailbox,
MerkleTreeHook, MerkleTreeInsertion, TxOutcome, ValidatorAnnounce, H256, U256,
Announcement, ChainResult, HyperlaneChain, HyperlaneContract, HyperlaneDomain, HyperlaneSigner,
HyperlaneSignerExt, Mailbox, MerkleTreeHook, MerkleTreeInsertion, TxOutcome, ValidatorAnnounce,
H256, U256,
};
use hyperlane_ethereum::{SingletonSigner, SingletonSignerHandle};
use tokio::{task::JoinHandle, time::sleep};
Expand Down Expand Up @@ -167,13 +167,15 @@ impl Validator {
ValidatorSubmitterMetrics::new(&self.core.metrics, &self.origin_chain),
);

let empty_tree = IncrementalMerkle::default();
let reorg_period = NonZeroU64::new(self.reorg_period);
let tip_tree = self
.merkle_tree_hook
.tree(reorg_period)
.await
.expect("failed to get merkle tree");
// This function is only called after we have already checked that the
// merkle tree hook has count > 0, but we assert to be extra sure this is
// the case.
assert!(tip_tree.count() > 0, "merkle tree is empty");
let backfill_target = submitter.checkpoint(&tip_tree);

Expand All @@ -184,14 +186,14 @@ impl Validator {
tasks.push(
tokio::spawn(async move {
backfill_submitter
.checkpoint_submitter(empty_tree, Some(backfill_target))
.backfill_checkpoint_submitter(backfill_target)
.await
})
.instrument(info_span!("BackfillCheckpointSubmitter")),
);

tasks.push(
tokio::spawn(async move { submitter.checkpoint_submitter(tip_tree, None).await })
tokio::spawn(async move { submitter.checkpoint_submitter(tip_tree).await })
.instrument(info_span!("TipCheckpointSubmitter")),
);
tasks.push(
Expand Down
12 changes: 6 additions & 6 deletions rust/chains/hyperlane-ethereum/src/interchain_gas.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ where

/// Builder for an `EthereumInterchainGasPaymasterIndexer`.
pub struct InterchainGasPaymasterIndexerBuilder {
    pub mailbox_address: H160,
    // Number of blocks to lag behind the chain tip when indexing, to avoid reorgs.
    pub reorg_period: u32,
}

#[async_trait]
Expand All @@ -46,7 +46,7 @@ impl BuildableWithProvider for InterchainGasPaymasterIndexerBuilder {
Box::new(EthereumInterchainGasPaymasterIndexer::new(
Arc::new(provider),
locator,
self.finality_blocks,
self.reorg_period,
))
}
}
Expand All @@ -59,22 +59,22 @@ where
{
contract: Arc<EthereumInterchainGasPaymasterInternal<M>>,
provider: Arc<M>,
finality_blocks: u32,
reorg_period: u32,
}

impl<M> EthereumInterchainGasPaymasterIndexer<M>
where
M: Middleware + 'static,
{
/// Create new EthereumInterchainGasPaymasterIndexer
pub fn new(provider: Arc<M>, locator: &ContractLocator, finality_blocks: u32) -> Self {
pub fn new(provider: Arc<M>, locator: &ContractLocator, reorg_period: u32) -> Self {
Self {
contract: Arc::new(EthereumInterchainGasPaymasterInternal::new(
locator.address,
provider.clone(),
)),
provider,
finality_blocks,
reorg_period,
}
}
}
Expand Down Expand Up @@ -121,7 +121,7 @@ where
.await
.map_err(ChainCommunicationError::from_other)?
.as_u32()
.saturating_sub(self.finality_blocks))
.saturating_sub(self.reorg_period))
}
}

Expand Down
Loading

0 comments on commit 88346bf

Please sign in to comment.