Skip to content

Commit

Permalink
feat: Support SequenceAware cursors for Deliveries (#4889)
Browse files Browse the repository at this point in the history
### Description

Support SequenceAware cursors for Deliveries

### Related issues

- Contributes into
#4271

### Backward compatibility

Yes (need adding column `sequence` to `delivered_message` table before
merging

### Testing

E2E Ethereum and Sealevel tests

---------

Co-authored-by: Danil Nemirovsky <[email protected]>
  • Loading branch information
ameten and ameten authored Dec 2, 2024
1 parent 665a7b8 commit a94c50f
Show file tree
Hide file tree
Showing 7 changed files with 178 additions and 79 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ impl MigrationTrait for Migration {
.big_integer()
.not_null(),
)
.col(ColumnDef::new(DeliveredMessage::Sequence).big_integer())
.foreign_key(
ForeignKey::create()
.from_col(DeliveredMessage::Domain)
Expand Down Expand Up @@ -105,4 +106,6 @@ pub enum DeliveredMessage {
DestinationMailbox,
/// Transaction the delivery was included in
DestinationTxId,
/// Sequence when message was delivered
Sequence,
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ pub struct Model {
pub domain: i32,
pub destination_mailbox: Vec<u8>,
pub destination_tx_id: i64,
pub sequence: Option<i64>,
}

#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
Expand All @@ -29,6 +30,7 @@ pub enum Column {
Domain,
DestinationMailbox,
DestinationTxId,
Sequence,
}

#[derive(Copy, Clone, Debug, EnumIter, DerivePrimaryKey)]
Expand Down Expand Up @@ -59,6 +61,7 @@ impl ColumnTrait for Column {
Self::Domain => ColumnType::Integer.def(),
Self::DestinationMailbox => ColumnType::Binary(BlobSize::Blob(None)).def(),
Self::DestinationTxId => ColumnType::BigInteger.def(),
Self::Sequence => ColumnType::BigInteger.def().null(),
}
}
}
Expand Down
142 changes: 97 additions & 45 deletions rust/main/agents/scraper/src/db/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use sea_orm::{prelude::*, ActiveValue::*, DeriveColumn, EnumIter, Insert, QueryS
use tracing::{debug, instrument, trace};

use hyperlane_core::{
address_to_bytes, bytes_to_address, h256_to_bytes, HyperlaneMessage, LogMeta, H256,
address_to_bytes, bytes_to_address, h256_to_bytes, Delivery, HyperlaneMessage, LogMeta, H256,
};
use migration::OnConflict;

Expand All @@ -18,6 +18,7 @@ use super::generated::{delivered_message, message};
#[derive(Debug, Clone)]
pub struct StorableDelivery<'a> {
pub message_id: H256,
pub sequence: Option<i64>,
pub meta: &'a LogMeta,
/// The database id of the transaction the delivery event occurred in
pub txn_id: i64,
Expand All @@ -31,64 +32,54 @@ pub struct StorableMessage<'a> {
}

impl ScraperDb {
/// Get the dispatched message associated with a nonce.
/// Get the delivered message associated with a sequence.
#[instrument(skip(self))]
pub async fn retrieve_message_by_nonce(
pub async fn retrieve_delivery_by_sequence(
&self,
origin_domain: u32,
origin_mailbox: &H256,
nonce: u32,
) -> Result<Option<HyperlaneMessage>> {
#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
enum QueryAs {
Nonce,
}
if let Some(message) = message::Entity::find()
.filter(message::Column::Origin.eq(origin_domain))
.filter(message::Column::OriginMailbox.eq(address_to_bytes(origin_mailbox)))
.filter(message::Column::Nonce.eq(nonce))
destination_domain: u32,
destination_mailbox: &H256,
sequence: u32,
) -> Result<Option<Delivery>> {
if let Some(delivery) = delivered_message::Entity::find()
.filter(delivered_message::Column::Domain.eq(destination_domain))
.filter(
delivered_message::Column::DestinationMailbox
.eq(address_to_bytes(destination_mailbox)),
)
.filter(delivered_message::Column::Sequence.eq(sequence))
.one(&self.0)
.await?
{
Ok(Some(HyperlaneMessage {
// We do not write version to the DB.
version: 3,
origin: message.origin as u32,
destination: message.destination as u32,
nonce: message.nonce as u32,
sender: bytes_to_address(message.sender)?,
recipient: bytes_to_address(message.recipient)?,
body: message.msg_body.unwrap_or(Vec::new()),
}))
let delivery = H256::from_slice(&delivery.msg_id);
Ok(Some(delivery))
} else {
Ok(None)
}
}

/// Get the tx id associated with a dispatched message.
/// Get the tx id of a delivered message associated with a sequence.
#[instrument(skip(self))]
pub async fn retrieve_dispatched_tx_id(
pub async fn retrieve_delivered_message_tx_id(
&self,
origin_domain: u32,
origin_mailbox: &H256,
nonce: u32,
destination_domain: u32,
destination_mailbox: &H256,
sequence: u32,
) -> Result<Option<i64>> {
#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
enum QueryAs {
Nonce,
}

let tx_id = message::Entity::find()
.filter(message::Column::Origin.eq(origin_domain))
.filter(message::Column::OriginMailbox.eq(address_to_bytes(origin_mailbox)))
.filter(message::Column::Nonce.eq(nonce))
.select_only()
.column_as(message::Column::OriginTxId.max(), QueryAs::Nonce)
.group_by(message::Column::Origin)
.into_values::<i64, QueryAs>()
if let Some(delivery) = delivered_message::Entity::find()
.filter(delivered_message::Column::Domain.eq(destination_domain))
.filter(
delivered_message::Column::DestinationMailbox
.eq(address_to_bytes(destination_mailbox)),
)
.filter(delivered_message::Column::Sequence.eq(sequence))
.one(&self.0)
.await?;
Ok(tx_id)
.await?
{
let txn_id = delivery.destination_tx_id;
Ok(Some(txn_id))
} else {
Ok(None)
}
}

async fn latest_deliveries_id(&self, domain: u32, destination_mailbox: Vec<u8>) -> Result<i64> {
Expand Down Expand Up @@ -147,6 +138,7 @@ impl ScraperDb {
domain: Unchanged(domain as i32),
destination_mailbox: Unchanged(destination_mailbox.clone()),
destination_tx_id: Set(delivery.txn_id),
sequence: Set(delivery.sequence),
})
.collect_vec();

Expand Down Expand Up @@ -180,6 +172,66 @@ impl ScraperDb {
Ok(new_deliveries_count)
}

/// Get the dispatched message associated with a nonce.
#[instrument(skip(self))]
pub async fn retrieve_dispatched_message_by_nonce(
&self,
origin_domain: u32,
origin_mailbox: &H256,
nonce: u32,
) -> Result<Option<HyperlaneMessage>> {
#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
enum QueryAs {
Nonce,
}
if let Some(message) = message::Entity::find()
.filter(message::Column::Origin.eq(origin_domain))
.filter(message::Column::OriginMailbox.eq(address_to_bytes(origin_mailbox)))
.filter(message::Column::Nonce.eq(nonce))
.one(&self.0)
.await?
{
Ok(Some(HyperlaneMessage {
// We do not write version to the DB.
version: 3,
origin: message.origin as u32,
destination: message.destination as u32,
nonce: message.nonce as u32,
sender: bytes_to_address(message.sender)?,
recipient: bytes_to_address(message.recipient)?,
body: message.msg_body.unwrap_or(Vec::new()),
}))
} else {
Ok(None)
}
}

/// Get the tx id associated with a dispatched message.
#[instrument(skip(self))]
pub async fn retrieve_dispatched_tx_id(
&self,
origin_domain: u32,
origin_mailbox: &H256,
nonce: u32,
) -> Result<Option<i64>> {
#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
enum QueryAs {
Nonce,
}

let tx_id = message::Entity::find()
.filter(message::Column::Origin.eq(origin_domain))
.filter(message::Column::OriginMailbox.eq(address_to_bytes(origin_mailbox)))
.filter(message::Column::Nonce.eq(nonce))
.select_only()
.column_as(message::Column::OriginTxId.max(), QueryAs::Nonce)
.group_by(message::Column::Origin)
.into_values::<i64, QueryAs>()
.one(&self.0)
.await?;
Ok(tx_id)
}

async fn latest_dispatched_id(&self, domain: u32, origin_mailbox: Vec<u8>) -> Result<i64> {
let result = message::Entity::find()
.select_only()
Expand Down
41 changes: 37 additions & 4 deletions rust/main/agents/scraper/src/store/deliveries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@ use std::collections::HashMap;
use async_trait::async_trait;
use eyre::Result;

use hyperlane_core::{Delivery, HyperlaneLogStore, Indexed, LogMeta, H512};
use hyperlane_core::{
unwrap_or_none_result, Delivery, HyperlaneLogStore, HyperlaneSequenceAwareIndexerStoreReader,
Indexed, LogMeta, H512,
};

use crate::db::StorableDelivery;
use crate::store::storage::{HyperlaneDbStore, TxnWithId};
Expand All @@ -25,11 +28,18 @@ impl HyperlaneLogStore<Delivery> for HyperlaneDbStore {
let storable = deliveries
.iter()
.filter_map(|(message_id, meta)| {
txns.get(&meta.transaction_id)
.map(|txn| (*message_id.inner(), meta, txn.id))
txns.get(&meta.transaction_id).map(|txn| {
(
*message_id.inner(),
message_id.sequence.map(|v| v as i64),
meta,
txn.id,
)
})
})
.map(|(message_id, meta, txn_id)| StorableDelivery {
.map(|(message_id, sequence, meta, txn_id)| StorableDelivery {
message_id,
sequence,
meta,
txn_id,
});
Expand All @@ -41,3 +51,26 @@ impl HyperlaneLogStore<Delivery> for HyperlaneDbStore {
Ok(stored as u32)
}
}

#[async_trait]
impl HyperlaneSequenceAwareIndexerStoreReader<Delivery> for HyperlaneDbStore {
/// Gets a delivered message by its sequence.
async fn retrieve_by_sequence(&self, sequence: u32) -> Result<Option<Delivery>> {
let delivery = self
.db
.retrieve_delivery_by_sequence(self.domain.id(), &self.mailbox_address, sequence)
.await?;
Ok(delivery)
}

/// Gets the block number at which the log occurred.
async fn retrieve_log_block_number_by_sequence(&self, sequence: u32) -> Result<Option<u64>> {
let tx_id = unwrap_or_none_result!(
self.db
.retrieve_delivered_message_tx_id(self.domain.id(), &self.mailbox_address, sequence)
.await?
);
let block_id = unwrap_or_none_result!(self.db.retrieve_block_id(tx_id).await?);
Ok(self.db.retrieve_block_number(block_id).await?)
}
}
2 changes: 1 addition & 1 deletion rust/main/agents/scraper/src/store/dispatches.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ impl HyperlaneSequenceAwareIndexerStoreReader<HyperlaneMessage> for HyperlaneDbS
async fn retrieve_by_sequence(&self, sequence: u32) -> Result<Option<HyperlaneMessage>> {
let message = self
.db
.retrieve_message_by_nonce(self.domain.id(), &self.mailbox_address, sequence)
.retrieve_dispatched_message_by_nonce(self.domain.id(), &self.mailbox_address, sequence)
.await?;
Ok(message)
}
Expand Down
Loading

0 comments on commit a94c50f

Please sign in to comment.