Commit 6052260
Merge branch 'v3' into mainnet3-env-switch
yorhodes committed Oct 24, 2023
2 parents 31b306c + 88346bf
Showing 69 changed files with 4,204 additions and 1,546 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/rust-skipped.yml
@@ -3,7 +3,7 @@ name: rust
 
 on:
   pull_request:
-    branches: [main]
+    branches: [main, v3]
     paths-ignore:
       - 'rust/**'
@@ -41,36 +41,43 @@ impl MultisigIsmMetadataBuilder for MerkleRootMultisigMetadataBuilder {
             self.highest_known_leaf_index().await,
             debug!("Couldn't get highest known leaf index")
         );
+        unwrap_or_none_result!(
+            leaf_index,
+            self.get_merkle_leaf_id_by_message_id(message.id())
+                .await
+                .context(CTX)?,
+            debug!(
+                ?message,
+                "No merkle leaf found for message id, must have not been enqueued in the tree"
+            )
+        );
         unwrap_or_none_result!(
             quorum_checkpoint,
             checkpoint_syncer
                 .fetch_checkpoint_in_range(
                     validators,
                     threshold as usize,
                     message.nonce,
+                    leaf_index,
                     highest_leaf_index
                 )
                 .await
                 .context(CTX)?,
-            debug!("Couldn't get checkpoint in range")
+            debug!(
+                leaf_index,
+                highest_leaf_index, "Couldn't get checkpoint in range"
+            )
         );
         unwrap_or_none_result!(
             proof,
-            self.get_proof(message.nonce, quorum_checkpoint.checkpoint.checkpoint)
-                .await
-                .context(CTX)?
-        );
-        unwrap_or_none_result!(
-            merkle_leaf_id,
-            self.get_merkle_leaf_id_by_message_id(message.id())
+            self.get_proof(leaf_index, quorum_checkpoint.checkpoint.checkpoint)
                .await
                .context(CTX)?,
-            debug!("Couldn't get merkle proof")
+            debug!(leaf_index, checkpoint=?quorum_checkpoint, "Couldn't get proof")
         );
         Ok(Some(MultisigMetadata::new(
             quorum_checkpoint.checkpoint.checkpoint,
             quorum_checkpoint.signatures,
-            Some(merkle_leaf_id),
+            Some(leaf_index),
             Some(quorum_checkpoint.checkpoint.message_id),
             Some(proof),
         )))
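A note on the control-flow helper used throughout this hunk: each `unwrap_or_none_result!` invocation either binds the `Some` value to the named variable or runs the optional trailing expression (the `debug!` calls above) and returns early with `Ok(None)`. A minimal sketch of such a macro, assuming it behaves like the real definition in hyperlane-core (which may differ in detail):

// Sketch only: an early-return-on-None macro in the style used above.
macro_rules! unwrap_or_none_result {
    ($var:ident, $expr:expr $(, $on_none:expr)?) => {
        let Some($var) = $expr else {
            $($on_none;)?
            return Ok(None);
        };
    };
}

fn find_even(values: &[u32]) -> Result<Option<u32>, String> {
    // Binds `even` on success; logs and returns Ok(None) otherwise.
    unwrap_or_none_result!(
        even,
        values.iter().copied().find(|v| v % 2 == 0),
        println!("no even value found")
    );
    Ok(Some(even * 10))
}

fn main() {
    assert_eq!(find_even(&[3, 4, 5]), Ok(Some(40)));
    assert_eq!(find_even(&[1, 3]), Ok(None));
}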
2 changes: 1 addition & 1 deletion rust/agents/relayer/src/msg/processor.rs
@@ -229,7 +229,7 @@ mod test {
         ChainConf {
             domain: domain.clone(),
             signer: Default::default(),
-            finality_blocks: Default::default(),
+            reorg_period: Default::default(),
             addresses: Default::default(),
             connection: ChainConnectionConf::Ethereum(hyperlane_ethereum::ConnectionConf::Http {
                 url: "http://example.com".parse().unwrap(),
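The `finality_blocks` → `reorg_period` rename lines up with how the value is consumed: as validator.rs further down shows, it is wrapped with `NonZeroU64::new` before use, so a configured period of 0 becomes `None`, i.e. queries run against the unlagged chain tip. A standalone illustration of that conversion (the surrounding context is illustrative only):

use std::num::NonZeroU64;

fn main() {
    // A reorg period of 0 means "no lag": NonZeroU64::new(0) is None.
    assert_eq!(NonZeroU64::new(0), None);
    // Any positive period survives the conversion and lags queries by that many blocks.
    assert_eq!(NonZeroU64::new(14).map(NonZeroU64::get), Some(14));
}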
237 changes: 156 additions & 81 deletions rust/agents/validator/src/submit.rs
@@ -58,33 +58,92 @@ impl ValidatorSubmitter {
         }
     }
 
-    #[instrument(err, skip(self, tree), fields(domain=%self.merkle_tree_hook.domain()))]
-    pub(crate) async fn checkpoint_submitter(
+    /// Submits signed checkpoints from index 0 until the target checkpoint (inclusive).
+    /// Runs idly forever once the target checkpoint is reached to avoid exiting the task.
+    #[instrument(err, skip(self), fields(domain=%self.merkle_tree_hook.domain()))]
+    pub(crate) async fn backfill_checkpoint_submitter(
         self,
-        mut tree: IncrementalMerkle,
-        target_checkpoint: Option<Checkpoint>,
+        target_checkpoint: Checkpoint,
     ) -> Result<()> {
-        let mut checkpoint_queue = vec![];
+        let mut tree = IncrementalMerkle::default();
+        self.submit_checkpoints_until_correctness_checkpoint(&mut tree, &target_checkpoint)
+            .await?;
 
-        let mut reached_target = false;
+        info!(
+            ?target_checkpoint,
+            "Backfill checkpoint submitter successfully reached target checkpoint"
+        );
 
-        while !reached_target {
-            let correctness_checkpoint = if let Some(c) = target_checkpoint {
-                c
-            } else {
-                // lag by reorg period to match message indexing
-                let latest_checkpoint = self
-                    .merkle_tree_hook
-                    .latest_checkpoint(self.reorg_period)
-                    .await?;
-                self.metrics
-                    .latest_checkpoint_observed
-                    .set(latest_checkpoint.index as i64);
-                latest_checkpoint
-            };
+        // TODO: remove this once validator is tolerant of tasks exiting.
+        loop {
+            sleep(Duration::from_secs(u64::MAX)).await;
+        }
+    }
 
+    /// Submits signed checkpoints indefinitely, starting from the `tree`.
+    #[instrument(err, skip(self, tree), fields(domain=%self.merkle_tree_hook.domain()))]
+    pub(crate) async fn checkpoint_submitter(self, mut tree: IncrementalMerkle) -> Result<()> {
+        loop {
+            // Lag by reorg period because this is our correctness checkpoint.
+            let latest_checkpoint = self
+                .merkle_tree_hook
+                .latest_checkpoint(self.reorg_period)
+                .await?;
+            self.metrics
+                .latest_checkpoint_observed
+                .set(latest_checkpoint.index as i64);
+
+            // This may occur e.g. if RPC providers are unreliable and make calls against
+            // inconsistent block tips.
+            //
+            // In this case, we just sleep a bit until we fetch a new latest checkpoint
+            // that at least meets the tree.
+            if tree_exceeds_checkpoint(&latest_checkpoint, &tree) {
+                debug!(
+                    ?latest_checkpoint,
+                    tree_count = tree.count(),
+                    "Latest checkpoint is behind tree, sleeping briefly"
+                );
+                sleep(self.interval).await;
+                continue;
+            }
+
-            // ingest available messages from DB
-            while let Some(insertion) = self
+            self.submit_checkpoints_until_correctness_checkpoint(&mut tree, &latest_checkpoint)
+                .await?;
+
+            self.metrics
+                .latest_checkpoint_processed
+                .set(latest_checkpoint.index as i64);
+
+            sleep(self.interval).await;
+        }
+    }
+
+    /// Submits signed checkpoints relating to the given tree until the correctness checkpoint (inclusive).
+    /// Only submits the signed checkpoints once the correctness checkpoint is reached.
+    async fn submit_checkpoints_until_correctness_checkpoint(
+        &self,
+        tree: &mut IncrementalMerkle,
+        correctness_checkpoint: &Checkpoint,
+    ) -> Result<()> {
+        // This should never be called with a tree that is ahead of the correctness checkpoint.
+        assert!(
+            !tree_exceeds_checkpoint(correctness_checkpoint, tree),
+            "tree (count: {}) is ahead of correctness checkpoint {:?}",
+            tree.count(),
+            correctness_checkpoint,
+        );
+
+        // All intermediate checkpoints will be stored here and signed once the correctness
+        // checkpoint is reached.
+        let mut checkpoint_queue = vec![];
+
+        // If the correctness checkpoint is ahead of the tree, we need to ingest more messages.
+        //
+        // tree.index() will panic if the tree is empty, so we use tree.count() instead
+        // and convert the correctness_checkpoint.index to a count by adding 1.
+        while correctness_checkpoint.index + 1 > tree.count() as u32 {
+            if let Some(insertion) = self
                 .message_db
                 .retrieve_merkle_tree_insertion_by_leaf_index(&(tree.count() as u32))?
             {
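The count-versus-index arithmetic in the ingestion loop above is the subtle part: a checkpoint's `index` names the last ingested leaf, while `tree.count()` is the number of leaves, so the target count is `index + 1`. A self-contained sketch of just that guard, with a plain integer standing in for the tree:

fn main() {
    // correctness_checkpoint.index = 4 means leaves 0..=4 must be ingested,
    // i.e. a final count of 5.
    let correctness_index: u32 = 4;
    let mut count: u32 = 0; // tree.count() of an empty tree

    while correctness_index + 1 > count {
        count += 1; // stands in for tree.ingest(message_id)
    }

    assert_eq!(count, 5);
    assert_eq!(count - 1, correctness_index); // tree.index() == checkpoint.index
}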
@@ -96,76 +155,85 @@ impl ValidatorSubmitter {
                 let message_id = insertion.message_id();
                 tree.ingest(message_id);
 
-                let checkpoint = self.checkpoint(&tree);
+                let checkpoint = self.checkpoint(tree);
 
                 checkpoint_queue.push(CheckpointWithMessageId {
                     checkpoint,
                     message_id,
                 });
+            } else {
+                // If we haven't yet indexed the next merkle tree insertion but know that
+                // it will soon exist (because we know the correctness checkpoint), wait a bit and
+                // try again.
+                sleep(Duration::from_millis(100)).await
+            }
         }
 
-                if checkpoint.index == correctness_checkpoint.index {
-                    // We got to the right height, now lets compare whether we got the right tree
-                    if checkpoint.root != correctness_checkpoint.root {
-                        // Bad news, bail
-                        error!(
-                            ?checkpoint,
-                            ?correctness_checkpoint,
-                            "Incorrect tree root, something went wrong"
-                        );
-                        bail!("Incorrect tree root, something went wrong");
-                    }
-                }
+        // At this point we know that correctness_checkpoint.index == tree.index().
+        assert_eq!(
+            correctness_checkpoint.index,
+            tree.index(),
+            "correctness checkpoint index {} != tree index {}",
+            correctness_checkpoint.index,
+            tree.index(),
+        );
+
+        let checkpoint = self.checkpoint(tree);
+
+        // If the tree's checkpoint doesn't match the correctness checkpoint, something went wrong
+        // and we bail loudly.
+        if checkpoint != *correctness_checkpoint {
+            error!(
+                ?checkpoint,
+                ?correctness_checkpoint,
+                "Incorrect tree root, something went wrong"
+            );
+            bail!("Incorrect tree root, something went wrong");
+        }
+
-                // compare against every queued checkpoint to prevent ingesting past target
-                if checkpoint == correctness_checkpoint {
-                    debug!(index = checkpoint.index, "Reached tree consistency");
 
-                    // drain and sign all checkpoints in the queue
-                    for queued_checkpoint in checkpoint_queue.drain(..) {
-                        let existing = self
-                            .checkpoint_syncer
-                            .fetch_checkpoint(queued_checkpoint.index)
-                            .await?;
-                        if existing.is_some() {
-                            debug!(
-                                index = queued_checkpoint.index,
-                                "Checkpoint already submitted"
-                            );
-                            continue;
-                        }
-
-                        let signed_checkpoint = self.signer.sign(queued_checkpoint).await?;
-                        self.checkpoint_syncer
-                            .write_checkpoint(&signed_checkpoint)
-                            .await?;
-                        debug!(
-                            index = queued_checkpoint.index,
-                            "Signed and submitted checkpoint"
-                        );
-
-                        // small sleep before signing next checkpoint to avoid rate limiting
-                        sleep(Duration::from_millis(100)).await;
-                    }
-
-                    info!(index = checkpoint.index, "Signed all queued checkpoints");
-
-                    self.metrics
-                        .latest_checkpoint_processed
-                        .set(checkpoint.index as i64);
-
-                    // break out of submitter loop if target checkpoint is reached
-                    reached_target = target_checkpoint.is_some();
-                    break;
-                }
+        debug!(index = checkpoint.index, "Reached tree consistency");
+
+        self.sign_and_submit_checkpoints(checkpoint_queue).await?;
+
+        info!(
+            index = checkpoint.index,
+            "Signed all queued checkpoints until index"
+        );
+
+        Ok(())
     }
 
+    /// Signs and submits any previously unsubmitted checkpoints.
+    async fn sign_and_submit_checkpoints(
+        &self,
+        checkpoints: Vec<CheckpointWithMessageId>,
+    ) -> Result<()> {
+        for queued_checkpoint in checkpoints {
+            let existing = self
+                .checkpoint_syncer
+                .fetch_checkpoint(queued_checkpoint.index)
+                .await?;
+            if existing.is_some() {
+                debug!(
+                    index = queued_checkpoint.index,
+                    "Checkpoint already submitted"
+                );
+                continue;
+            }
+
-            sleep(self.interval).await;
-        }
+            let signed_checkpoint = self.signer.sign(queued_checkpoint).await?;
+            self.checkpoint_syncer
+                .write_checkpoint(&signed_checkpoint)
+                .await?;
+            debug!(
+                index = queued_checkpoint.index,
+                "Signed and submitted checkpoint"
+            );
 
-        // TODO: remove this once validator is tolerant of tasks exiting
-        loop {
-            sleep(Duration::from_secs(u64::MAX)).await;
+            // small sleep before signing next checkpoint to avoid rate limiting
+            sleep(Duration::from_millis(100)).await;
         }
+        Ok(())
     }
 
     pub(crate) async fn legacy_checkpoint_submitter(self) -> Result<()> {
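Note that `sign_and_submit_checkpoints` is written to be safely re-runnable: the backfill task re-walks indices that a previous run (or another instance of the validator) may already have signed, so each index is checked before signing. A toy model of that skip logic with an in-memory store (the types here are hypothetical stand-ins for the real checkpoint syncer):

use std::collections::HashSet;

// Hypothetical in-memory stand-in for the checkpoint syncer's storage.
struct MemoryStore {
    written: HashSet<u32>,
}

impl MemoryStore {
    fn fetch_checkpoint(&self, index: u32) -> Option<u32> {
        self.written.contains(&index).then_some(index)
    }
    fn write_checkpoint(&mut self, index: u32) {
        self.written.insert(index);
    }
}

fn main() {
    // Indices 0 and 1 were signed by an earlier run.
    let mut store = MemoryStore {
        written: HashSet::from([0, 1]),
    };
    for index in 0..5u32 {
        if store.fetch_checkpoint(index).is_some() {
            continue; // already submitted, skip re-signing
        }
        store.write_checkpoint(index);
    }
    assert_eq!(store.written.len(), 5);
}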
@@ -249,6 +317,13 @@ impl ValidatorSubmitter {
     }
 }
 
+/// Returns whether the tree exceeds the checkpoint.
+fn tree_exceeds_checkpoint(checkpoint: &Checkpoint, tree: &IncrementalMerkle) -> bool {
+    // tree.index() will panic if the tree is empty, so we use tree.count() instead
+    // and convert the correctness_checkpoint.index to a count by adding 1.
+    checkpoint.index + 1 < tree.count() as u32
+}
+
 #[derive(Clone)]
 pub(crate) struct ValidatorSubmitterMetrics {
     latest_checkpoint_observed: IntGauge,
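The helper added below the impl encodes the same `index + 1` convention: the checkpoint index is converted to a leaf count before comparing against the tree's count. A standalone copy of the comparison with a few concrete cases:

fn tree_exceeds(checkpoint_index: u32, tree_count: u32) -> bool {
    // checkpoint_index + 1 is the leaf count the checkpoint implies.
    checkpoint_index + 1 < tree_count
}

fn main() {
    assert!(!tree_exceeds(4, 5)); // in sync: the checkpoint covers all 5 leaves
    assert!(tree_exceeds(4, 6)); // tree ahead: a 6th leaf has no checkpoint yet
    assert!(!tree_exceeds(4, 3)); // tree behind: the checkpoint is ahead, not the tree
}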
14 changes: 8 additions & 6 deletions rust/agents/validator/src/validator.rs
@@ -10,9 +10,9 @@ use hyperlane_base::{
     WatermarkContractSync,
 };
 use hyperlane_core::{
-    accumulator::incremental::IncrementalMerkle, Announcement, ChainResult, HyperlaneChain,
-    HyperlaneContract, HyperlaneDomain, HyperlaneSigner, HyperlaneSignerExt, Mailbox,
-    MerkleTreeHook, MerkleTreeInsertion, TxOutcome, ValidatorAnnounce, H256, U256,
+    Announcement, ChainResult, HyperlaneChain, HyperlaneContract, HyperlaneDomain, HyperlaneSigner,
+    HyperlaneSignerExt, Mailbox, MerkleTreeHook, MerkleTreeInsertion, TxOutcome, ValidatorAnnounce,
+    H256, U256,
 };
 use hyperlane_ethereum::{SingletonSigner, SingletonSignerHandle};
 use tokio::{task::JoinHandle, time::sleep};
@@ -167,13 +167,15 @@ impl Validator {
             ValidatorSubmitterMetrics::new(&self.core.metrics, &self.origin_chain),
         );
 
-        let empty_tree = IncrementalMerkle::default();
         let reorg_period = NonZeroU64::new(self.reorg_period);
         let tip_tree = self
             .merkle_tree_hook
             .tree(reorg_period)
             .await
             .expect("failed to get merkle tree");
+        // This function is only called after we have already checked that the
+        // merkle tree hook has count > 0, but we assert to be extra sure this is
+        // the case.
+        assert!(tip_tree.count() > 0, "merkle tree is empty");
         let backfill_target = submitter.checkpoint(&tip_tree);
 
@@ -184,14 +186,14 @@ impl Validator {
         tasks.push(
             tokio::spawn(async move {
                 backfill_submitter
-                    .checkpoint_submitter(empty_tree, Some(backfill_target))
+                    .backfill_checkpoint_submitter(backfill_target)
                     .await
             })
             .instrument(info_span!("BackfillCheckpointSubmitter")),
         );
 
         tasks.push(
-            tokio::spawn(async move { submitter.checkpoint_submitter(tip_tree, None).await })
+            tokio::spawn(async move { submitter.checkpoint_submitter(tip_tree).await })
                 .instrument(info_span!("TipCheckpointSubmitter")),
         );
         tasks.push(
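Taken together, the validator now splits checkpoint signing across two tasks: a backfill task that re-signs history from an empty tree up to the checkpoint of the tree snapshot taken at startup, and a tip task that takes ownership of that snapshot and follows the chain from there. A compile-level sketch of that split with stand-in functions (hypothetical names, not the repo's API):

use tokio::task::JoinHandle;

async fn backfill(target_index: u32) {
    // Stand-in: sign checkpoints 0..=target_index starting from an empty tree.
    println!("backfilling checkpoints 0..={target_index}");
}

async fn follow_tip(start_count: u32) {
    // Stand-in: keep ingesting leaves and signing checkpoints from here on.
    println!("following tip from leaf count {start_count}");
}

fn spawn_submitters(tip_count: u32) -> Vec<JoinHandle<()>> {
    // Mirrors the assert in the diff: an empty tree has no checkpoint to target.
    assert!(tip_count > 0, "merkle tree is empty");
    // With tip_count leaves, the latest checkpoint index is tip_count - 1.
    let backfill_target = tip_count - 1;
    vec![
        tokio::spawn(backfill(backfill_target)),
        tokio::spawn(follow_tip(tip_count)),
    ]
}

#[tokio::main]
async fn main() {
    for handle in spawn_submitters(8) {
        handle.await.unwrap();
    }
}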