diff --git a/.changelog/unreleased/improvements/2096-query-packet-pending.md b/.changelog/unreleased/improvements/2096-query-packet-pending.md new file mode 100644 index 0000000000..0ec1253df3 --- /dev/null +++ b/.changelog/unreleased/improvements/2096-query-packet-pending.md @@ -0,0 +1,4 @@ +- Added `query packet pending` command to list outstanding packet + commitments that are either unreceived or pending acknowledgement + at both ends of a channel. + ([#1862](https://github.com/informalsystems/ibc-rs/issues/1862)) diff --git a/relayer-cli/src/cli_utils.rs b/relayer-cli/src/cli_utils.rs index 9a4d0e408e..c51a3213e2 100644 --- a/relayer-cli/src/cli_utils.rs +++ b/relayer-cli/src/cli_utils.rs @@ -2,9 +2,8 @@ use alloc::sync::Arc; use tokio::runtime::Runtime as TokioRuntime; use ibc::core::ics02_client::client_state::ClientState; -use ibc::core::ics04_channel::channel::IdentifiedChannelEnd; use ibc::core::ics24_host::identifier::{ChainId, ChannelId, PortId}; -use ibc_relayer::chain::counterparty::channel_connection_client; +use ibc_relayer::chain::counterparty::{channel_connection_client, ChannelConnectionClient}; use ibc_relayer::{ chain::{ handle::{BaseChainHandle, ChainHandle}, @@ -80,7 +79,7 @@ pub fn spawn_chain_counterparty( chain_id: &ChainId, port_id: &PortId, channel_id: &ChannelId, -) -> Result<(ChainHandlePair, IdentifiedChannelEnd), Error> { +) -> Result<(ChainHandlePair, ChannelConnectionClient), Error> { let chain = spawn_chain_runtime_generic::(config, chain_id)?; let channel_connection_client = channel_connection_client(&chain, port_id, channel_id).map_err(Error::supervisor)?; @@ -94,6 +93,6 @@ pub fn spawn_chain_counterparty( src: chain, dst: counterparty_chain, }, - channel_connection_client.channel, + channel_connection_client, )) } diff --git a/relayer-cli/src/commands/query/packet.rs b/relayer-cli/src/commands/query/packet.rs index 701fbbc7f8..5ef8b97c42 100644 --- a/relayer-cli/src/commands/query/packet.rs +++ b/relayer-cli/src/commands/query/packet.rs @@ -5,6 +5,7 @@ mod ack; mod acks; mod commitment; mod commitments; +mod pending; mod unreceived_acks; mod unreceived_packets; @@ -27,4 +28,7 @@ pub enum QueryPacketCmds { /// Query unreceived acknowledgments UnreceivedAcks(unreceived_acks::QueryUnreceivedAcknowledgementCmd), + + /// Output a summary of pending packets in both directions + Pending(pending::QueryPendingPacketsCmd), } diff --git a/relayer-cli/src/commands/query/packet/acks.rs b/relayer-cli/src/commands/query/packet/acks.rs index 6190b2cccc..12f2a07e40 100644 --- a/relayer-cli/src/commands/query/packet/acks.rs +++ b/relayer-cli/src/commands/query/packet/acks.rs @@ -36,15 +36,16 @@ impl QueryPacketAcknowledgementsCmd { debug!("Options: {:?}", self); - let (chains, channel) = spawn_chain_counterparty::( + let (chains, chan_conn_cli) = spawn_chain_counterparty::( &config, &self.chain_id, &self.port_id, &self.channel_id, )?; - let (seqs, height) = acknowledgements_on_chain(&chains.src, &chains.dst, &channel) - .map_err(Error::supervisor)?; + let (seqs, height) = + acknowledgements_on_chain(&chains.src, &chains.dst, &chan_conn_cli.channel) + .map_err(Error::supervisor)?; Ok(PacketSeqs { seqs, height }) } diff --git a/relayer-cli/src/commands/query/packet/pending.rs b/relayer-cli/src/commands/query/packet/pending.rs new file mode 100644 index 0000000000..e59a7a1986 --- /dev/null +++ b/relayer-cli/src/commands/query/packet/pending.rs @@ -0,0 +1,95 @@ +use abscissa_core::clap::Parser; +use abscissa_core::{Command, Runnable}; +use serde::Serialize; + +use ibc::core::ics24_host::identifier::{ChainId, ChannelId, PortId}; +use ibc_relayer::chain::counterparty::{ + channel_on_destination, pending_packet_summary, PendingPackets, +}; +use ibc_relayer::chain::handle::BaseChainHandle; + +use crate::cli_utils::spawn_chain_counterparty; +use crate::conclude::Output; +use crate::error::Error; +use crate::prelude::*; + +/// A structure to display pending packet commitment sequence IDs +/// at both ends of a channel. +#[derive(Debug, Serialize)] +struct Summary { + /// The packets sent on the source chain as identified by the command. + src: PendingPackets, + /// The packets sent on the counterparty chain. + dst: PendingPackets, +} + +/// This command does the following: +/// +/// 1. queries the chain to get its counterparty chain, channel and port identifiers (needed in 2) +/// 2. queries both chains for all packet commitments/ sequences for the given port and channel +/// and its counterparty. +/// 3. queries both chains for the unreceived sequences and acks out of the lists obtained in 2. +#[derive(Clone, Command, Debug, Parser)] +pub struct QueryPendingPacketsCmd { + #[clap( + required = true, + help = "identifier of the chain at one end of the channel" + )] + chain_id: ChainId, + + #[clap( + required = true, + help = "port identifier on the chain given by " + )] + port_id: PortId, + + #[clap( + required = true, + help = "channel identifier on the chain given by " + )] + channel_id: ChannelId, +} + +impl QueryPendingPacketsCmd { + fn execute(&self) -> Result { + let config = app_config(); + + let (chains, chan_conn_cli) = spawn_chain_counterparty::( + &config, + &self.chain_id, + &self.port_id, + &self.channel_id, + )?; + + debug!( + chain=%self.chain_id, + "fetched channel from source chain: {:?}", + chan_conn_cli.channel + ); + + let src_summary = pending_packet_summary(&chains.src, &chains.dst, &chan_conn_cli.channel) + .map_err(Error::supervisor)?; + let counterparty_channel = channel_on_destination( + &chan_conn_cli.channel, + &chan_conn_cli.connection, + &chains.dst, + ) + .map_err(Error::supervisor)? + .ok_or_else(|| Error::missing_counterparty_channel_id(chan_conn_cli.channel))?; + let dst_summary = pending_packet_summary(&chains.dst, &chains.src, &counterparty_channel) + .map_err(Error::supervisor)?; + Ok(Summary { + src: src_summary, + dst: dst_summary, + }) + } +} + +impl Runnable for QueryPendingPacketsCmd { + fn run(&self) { + match self.execute() { + Ok(pending) => Output::success(pending).exit(), + Err(e) => Output::error(format!("{}", e)).exit(), + } + } +} diff --git a/relayer-cli/src/commands/query/packet/unreceived_acks.rs b/relayer-cli/src/commands/query/packet/unreceived_acks.rs index cee648b932..f223a086b9 100644 --- a/relayer-cli/src/commands/query/packet/unreceived_acks.rs +++ b/relayer-cli/src/commands/query/packet/unreceived_acks.rs @@ -34,7 +34,7 @@ impl QueryUnreceivedAcknowledgementCmd { let config = app_config(); debug!("Options: {:?}", self); - let (chains, channel) = spawn_chain_counterparty::( + let (chains, chan_conn_cli) = spawn_chain_counterparty::( &config, &self.chain_id, &self.port_id, @@ -43,10 +43,11 @@ impl QueryUnreceivedAcknowledgementCmd { debug!( "fetched from source chain {} the following channel {:?}", - self.chain_id, channel + self.chain_id, chan_conn_cli.channel, ); - unreceived_acknowledgements(&chains.src, &chains.dst, &channel).map_err(Error::supervisor) + unreceived_acknowledgements(&chains.src, &chains.dst, &chan_conn_cli.channel) + .map_err(Error::supervisor) } } diff --git a/relayer-cli/src/commands/query/packet/unreceived_packets.rs b/relayer-cli/src/commands/query/packet/unreceived_packets.rs index 5ff734ee2f..6bf9202e83 100644 --- a/relayer-cli/src/commands/query/packet/unreceived_packets.rs +++ b/relayer-cli/src/commands/query/packet/unreceived_packets.rs @@ -1,9 +1,7 @@ use abscissa_core::clap::Parser; use abscissa_core::{Command, Runnable}; -use serde::Serialize; use ibc::core::ics24_host::identifier::{ChainId, ChannelId, PortId}; -use ibc::Height; use ibc_relayer::chain::counterparty::unreceived_packets; use ibc_relayer::chain::handle::BaseChainHandle; @@ -12,12 +10,6 @@ use crate::conclude::Output; use crate::error::Error; use crate::prelude::*; -#[derive(Serialize, Debug)] -struct PacketSeqs { - height: Height, - seqs: Vec, -} - /// This command does the following: /// 1. queries the chain to get its counterparty chain, channel and port identifiers (needed in 2) /// 2. queries the counterparty chain for all packet commitments/ sequences for a given port and channel @@ -42,7 +34,7 @@ impl QueryUnreceivedPacketsCmd { let config = app_config(); debug!("Options: {:?}", self); - let (chains, channel) = spawn_chain_counterparty::( + let (chains, chan_conn_cli) = spawn_chain_counterparty::( &config, &self.chain_id, &self.port_id, @@ -51,10 +43,11 @@ impl QueryUnreceivedPacketsCmd { debug!( "fetched from source chain {} the following channel {:?}", - self.chain_id, channel + self.chain_id, chan_conn_cli.channel ); - unreceived_packets(&chains.src, &chains.dst, &channel).map_err(Error::supervisor) + unreceived_packets(&chains.src, &chains.dst, &chan_conn_cli.channel) + .map_err(Error::supervisor) } } diff --git a/relayer/src/chain/counterparty.rs b/relayer/src/chain/counterparty.rs index 5aee9a3547..9a5220f051 100644 --- a/relayer/src/chain/counterparty.rs +++ b/relayer/src/chain/counterparty.rs @@ -1,3 +1,5 @@ +use std::collections::HashSet; + use serde::{Deserialize, Serialize}; use tracing::{error, trace}; @@ -328,68 +330,47 @@ pub fn unreceived_packets_sequences( chain: &impl ChainHandle, port_id: &PortId, channel_id: &ChannelId, - counterparty_chain: &impl ChainHandle, - counterparty_port_id: &PortId, - counterparty_channel_id: &ChannelId, -) -> Result<(Vec, Vec, Height), Error> { - // get the packet commitments on the counterparty/ source chain - let (commitments_on_counterparty, counterparty_height) = commitments_on_chain( - counterparty_chain, - counterparty_port_id, - counterparty_channel_id, - )?; + commitments_on_counterparty: Vec, +) -> Result, Error> { if commitments_on_counterparty.is_empty() { - return Ok((vec![], vec![], counterparty_height)); + return Ok(vec![]); } let request = QueryUnreceivedPacketsRequest { port_id: port_id.to_string(), channel_id: channel_id.to_string(), - packet_commitment_sequences: commitments_on_counterparty.clone(), + packet_commitment_sequences: commitments_on_counterparty, }; - let unreceived_recv_msgs = chain + chain .query_unreceived_packets(request) - .map_err(Error::relayer)?; - - Ok(( - commitments_on_counterparty, - unreceived_recv_msgs, - counterparty_height, - )) + .map_err(Error::relayer) } /// Returns the sequences of the written acknowledgments on a given chain and channel (port_id + channel_id), out of /// the commitments still present on the counterparty chain. -pub fn packet_acknowledgedgments( +pub fn packet_acknowledgements( chain: &impl ChainHandle, port_id: &PortId, channel_id: &ChannelId, - counterparty_chain: &impl ChainHandle, - counterparty_port_id: &PortId, - counterparty_channel_id: &ChannelId, + commit_sequences: Vec, ) -> Result<(Vec, Height), Error> { - let (commit_sequences, _) = commitments_on_chain( - counterparty_chain, - counterparty_port_id, - counterparty_channel_id, - )?; + let commit_set = commit_sequences.iter().cloned().collect::>(); - // get the packet acknowledgments on counterparty/source chain + // Get the packet acknowledgments on counterparty/source chain let acks_request = QueryPacketAcknowledgementsRequest { port_id: port_id.to_string(), channel_id: channel_id.to_string(), pagination: ibc_proto::cosmos::base::query::pagination::all(), - packet_commitment_sequences: commit_sequences.clone(), + packet_commitment_sequences: commit_sequences, }; - let (acks, response_height) = chain .query_packet_acknowledgements(acks_request) .map_err(Error::relayer)?; let mut acked_sequences: Vec = acks.into_iter().map(|v| v.sequence).collect(); + acked_sequences.retain(|s| commit_set.contains(s)); acked_sequences.sort_unstable(); - acked_sequences.retain(|s| commit_sequences.contains(s)); Ok((acked_sequences, response_height)) } @@ -401,37 +382,21 @@ pub fn unreceived_acknowledgements_sequences( chain: &impl ChainHandle, port_id: &PortId, channel_id: &ChannelId, - counterparty_chain: &impl ChainHandle, - counterparty_port_id: &PortId, - counterparty_channel_id: &ChannelId, -) -> Result<(Vec, Vec, Height), Error> { - // get the packet commitments on the destination chain - - // get the packet acknowledgments on counterparty chain - let (acks_on_counterparty, counterparty_height) = packet_acknowledgedgments( - counterparty_chain, - counterparty_port_id, - counterparty_channel_id, - chain, - port_id, - channel_id, - )?; + acks_on_counterparty: Vec, +) -> Result, Error> { + if acks_on_counterparty.is_empty() { + return Ok(vec![]); + } let request = QueryUnreceivedAcksRequest { port_id: port_id.to_string(), channel_id: channel_id.to_string(), - packet_ack_sequences: acks_on_counterparty.clone(), + packet_ack_sequences: acks_on_counterparty, }; - let unreceived_ack_msgs = chain + chain .query_unreceived_acknowledgement(request) - .map_err(Error::relayer)?; - - Ok(( - acks_on_counterparty, - unreceived_ack_msgs, - counterparty_height, - )) + .map_err(Error::relayer) } pub fn unreceived_packets( @@ -439,23 +404,24 @@ pub fn unreceived_packets( counterparty_chain: &impl ChainHandle, channel: &IdentifiedChannelEnd, ) -> Result, Error> { - let counterparty_channel_id = channel - .channel_end - .counterparty() + let counterparty = channel.channel_end.counterparty(); + let counterparty_channel_id = counterparty .channel_id .as_ref() .ok_or_else(Error::missing_counterparty_channel_id)?; - let (_, sequences, _) = unreceived_packets_sequences( - chain, - &channel.port_id, - &channel.channel_id, + let (commit_sequences, _) = commitments_on_chain( counterparty_chain, - &channel.channel_end.counterparty().port_id, + &counterparty.port_id, counterparty_channel_id, )?; - Ok(sequences) + unreceived_packets_sequences( + chain, + &channel.port_id, + &channel.channel_id, + commit_sequences, + ) } pub fn acknowledgements_on_chain( @@ -463,20 +429,23 @@ pub fn acknowledgements_on_chain( counterparty_chain: &impl ChainHandle, channel: &IdentifiedChannelEnd, ) -> Result<(Vec, Height), Error> { - let counterparty_channel_id = channel - .channel_end - .counterparty() + let counterparty = channel.channel_end.counterparty(); + let counterparty_channel_id = counterparty .channel_id .as_ref() .ok_or_else(Error::missing_counterparty_channel_id)?; - let (sequences, height) = packet_acknowledgedgments( + let (commitments_on_counterparty, _) = commitments_on_chain( + counterparty_chain, + &counterparty.port_id, + counterparty_channel_id, + )?; + + let (sequences, height) = packet_acknowledgements( chain, &channel.port_id, &channel.channel_id, - counterparty_chain, - &channel.channel_end.counterparty().port_id, - counterparty_channel_id, + commitments_on_counterparty, )?; Ok((sequences, height)) @@ -487,21 +456,78 @@ pub fn unreceived_acknowledgements( counterparty_chain: &impl ChainHandle, channel: &IdentifiedChannelEnd, ) -> Result, Error> { - let counterparty_channel_id = channel - .channel_end - .counterparty() + let counterparty = channel.channel_end.counterparty(); + let counterparty_channel_id = counterparty .channel_id .as_ref() .ok_or_else(Error::missing_counterparty_channel_id)?; - let (_, sequences, _) = unreceived_acknowledgements_sequences( + let (commitments_on_src, _) = + commitments_on_chain(chain, &channel.port_id, &channel.channel_id)?; + + let (acks_on_counterparty, _) = packet_acknowledgements( + counterparty_chain, + &counterparty.port_id, + counterparty_channel_id, + commitments_on_src, + )?; + + unreceived_acknowledgements_sequences( chain, &channel.port_id, &channel.channel_id, + acks_on_counterparty, + ) +} + +/// A structure to display pending packet commitment IDs +/// at one end of a channel. +#[derive(Debug, Serialize)] +pub struct PendingPackets { + /// Not yet received on the counterparty chain. + pub unreceived_packets: Vec, + /// Received on the counterparty chain, + /// but the acknowledgement is not yet received on the local chain. + pub unreceived_acks: Vec, +} + +pub fn pending_packet_summary( + chain: &impl ChainHandle, + counterparty_chain: &impl ChainHandle, + channel: &IdentifiedChannelEnd, +) -> Result { + let counterparty = channel.channel_end.counterparty(); + let counterparty_channel_id = counterparty + .channel_id + .as_ref() + .ok_or_else(Error::missing_counterparty_channel_id)?; + + let (commitments_on_src, _) = + commitments_on_chain(chain, &channel.port_id, &channel.channel_id)?; + + let unreceived = unreceived_packets_sequences( counterparty_chain, - &channel.channel_end.counterparty().port_id, + &counterparty.port_id, counterparty_channel_id, + commitments_on_src.clone(), + )?; + + let (acks_on_counterparty, _) = packet_acknowledgements( + counterparty_chain, + &counterparty.port_id, + counterparty_channel_id, + commitments_on_src, + )?; + + let pending_acks = unreceived_acknowledgements_sequences( + chain, + &channel.port_id, + &channel.channel_id, + acks_on_counterparty, )?; - Ok(sequences) + Ok(PendingPackets { + unreceived_packets: unreceived, + unreceived_acks: pending_acks, + }) } diff --git a/relayer/src/link/relay_path.rs b/relayer/src/link/relay_path.rs index f2f164450c..35eb5b2594 100644 --- a/relayer/src/link/relay_path.rs +++ b/relayer/src/link/relay_path.rs @@ -38,7 +38,8 @@ use ibc_proto::ibc::core::channel::v1::{ }; use crate::chain::counterparty::{ - unreceived_acknowledgements_sequences, unreceived_packets_sequences, + commitments_on_chain, packet_acknowledgements, unreceived_acknowledgements_sequences, + unreceived_packets_sequences, }; use crate::chain::handle::ChainHandle; use crate::chain::tx::TrackedMsgs; @@ -968,13 +969,15 @@ impl RelayPath { let src_channel_id = self.src_channel_id(); let dst_channel_id = self.dst_channel_id(); - let (commit_sequences, sequences, src_response_height) = unreceived_packets_sequences( + let (commit_sequences, src_response_height) = + commitments_on_chain(self.src_chain(), self.src_port_id(), src_channel_id) + .map_err(LinkError::supervisor)?; + + let sequences = unreceived_packets_sequences( self.dst_chain(), self.dst_port_id(), dst_channel_id, - self.src_chain(), - self.src_port_id(), - src_channel_id, + commit_sequences, ) .map_err(LinkError::supervisor)?; @@ -985,13 +988,6 @@ impl RelayPath { return Ok((events_result.into(), query_height)); } - debug!( - "packet seq. that still have commitments on {}: {} (first 10 shown here; total={})", - self.src_chain().id(), - commit_sequences.iter().take(10).join(", "), - commit_sequences.len() - ); - debug!( "recv packets to send out to {} of the ones with commitments on {}: {} (first 10 shown here; total={})", self.dst_chain().id(), @@ -1079,16 +1075,25 @@ impl RelayPath { let src_channel_id = self.src_channel_id(); let dst_channel_id = self.dst_channel_id(); - let (acks_on_src, unreceived_acks_by_dst, src_response_height) = - unreceived_acknowledgements_sequences( - self.dst_chain(), - self.dst_port_id(), - dst_channel_id, - self.src_chain(), - self.src_port_id(), - src_channel_id, - ) - .map_err(LinkError::supervisor)?; + let (commitments_on_counterparty, _) = + commitments_on_chain(self.dst_chain(), self.dst_port_id(), dst_channel_id) + .map_err(LinkError::supervisor)?; + + let (acks_on_src, src_response_height) = packet_acknowledgements( + self.src_chain(), + self.src_port_id(), + src_channel_id, + commitments_on_counterparty, + ) + .map_err(LinkError::supervisor)?; + + let unreceived_acks_by_dst = unreceived_acknowledgements_sequences( + self.dst_chain(), + self.dst_port_id(), + dst_channel_id, + acks_on_src, + ) + .map_err(LinkError::supervisor)?; let query_height = opt_query_height.unwrap_or(src_response_height); @@ -1097,14 +1102,6 @@ impl RelayPath { return Ok((events_result.into(), query_height)); } - debug!( - "packets that have acknowledgments on {}: [{:?}..{:?}] (total={})", - self.src_chain().id(), - acks_on_src.first(), - acks_on_src.last(), - acks_on_src.len() - ); - debug!( "ack packets to send out to {} of the ones with acknowledgments on {}: {} (first 10 shown here; total={})", self.dst_chain().id(), diff --git a/tools/integration-test/src/tests/mod.rs b/tools/integration-test/src/tests/mod.rs index 1cab6d91e0..f35a5c8d8a 100644 --- a/tools/integration-test/src/tests/mod.rs +++ b/tools/integration-test/src/tests/mod.rs @@ -10,6 +10,7 @@ pub mod client_expiration; mod client_settings; pub mod connection_delay; pub mod memo; +mod query_packet; pub mod supervisor; pub mod ternary_transfer; pub mod transfer; diff --git a/tools/integration-test/src/tests/query_packet.rs b/tools/integration-test/src/tests/query_packet.rs new file mode 100644 index 0000000000..f4dd72abdd --- /dev/null +++ b/tools/integration-test/src/tests/query_packet.rs @@ -0,0 +1,144 @@ +use ibc_relayer::chain::counterparty::{channel_on_destination, pending_packet_summary}; +use ibc_relayer::link::{Link, LinkParameters}; + +use ibc_test_framework::prelude::*; +use ibc_test_framework::relayer::channel::query_identified_channel_end; +use ibc_test_framework::relayer::connection::query_identified_connection_end; +use ibc_test_framework::util::random::random_u64_range; + +#[test] +fn test_query_packet_pending() -> Result<(), Error> { + run_binary_channel_test(&QueryPacketPendingTest) +} + +pub struct QueryPacketPendingTest; + +impl TestOverrides for QueryPacketPendingTest { + fn modify_relayer_config(&self, config: &mut Config) { + // Disabling clear_on_start should make the relayer not + // relay any packet it missed before starting. + config.mode.packets.clear_on_start = false; + config.mode.packets.clear_interval = 0; + } + + fn should_spawn_supervisor(&self) -> bool { + false + } +} + +impl BinaryChannelTest for QueryPacketPendingTest { + fn run( + &self, + _config: &TestConfig, + _relayer: RelayerDriver, + chains: ConnectedChains, + channel: ConnectedChannel, + ) -> Result<(), Error> { + let denom_a = chains.node_a.denom(); + + let wallet_a = chains.node_a.wallets().user1().cloned(); + let wallet_b = chains.node_b.wallets().user1().cloned(); + + let amount1 = random_u64_range(1000, 5000); + + info!( + "Performing IBC transfer with amount {}, which should *not* be relayed", + amount1 + ); + + chains.node_a.chain_driver().transfer_token( + &channel.port_a.as_ref(), + &channel.channel_id_a.as_ref(), + &wallet_a.address(), + &wallet_b.address(), + amount1, + &denom_a, + )?; + + sleep(Duration::from_secs(2)); + + let opts = LinkParameters { + src_port_id: channel.port_a.clone().into_value(), + src_channel_id: channel.channel_id_a.into_value(), + }; + let link = Link::new_from_opts( + chains.handle_a().clone(), + chains.handle_b().clone(), + opts, + false, + )?; + + let channel_end = query_identified_channel_end( + chains.handle_a(), + channel.channel_id_a.as_ref(), + channel.port_a.as_ref(), + )?; + + let summary = + pending_packet_summary(chains.handle_a(), chains.handle_b(), channel_end.value())?; + + assert_eq!(summary.unreceived_packets, [1]); + assert!(summary.unreceived_acks.is_empty()); + + // Receive the packet on the destination chain + link.build_and_send_recv_packet_messages()?; + + let summary = + pending_packet_summary(chains.handle_a(), chains.handle_b(), channel_end.value())?; + + assert!(summary.unreceived_packets.is_empty()); + assert_eq!(summary.unreceived_acks, [1]); + + // Acknowledge the packet on the source chain + let link = link.reverse(false)?; + link.build_and_send_ack_packet_messages()?; + + let summary = + pending_packet_summary(chains.handle_a(), chains.handle_b(), channel_end.value())?; + + assert!(summary.unreceived_packets.is_empty()); + assert!(summary.unreceived_acks.is_empty()); + + let denom_b = chains.node_b.denom(); + let amount2 = random_u64_range(1000, 5000); + + chains.node_b.chain_driver().transfer_token( + &channel.port_b.as_ref(), + &channel.channel_id_b.as_ref(), + &wallet_b.address(), + &wallet_a.address(), + amount2, + &denom_b, + )?; + + info!( + "Performing IBC transfer with amount {}, which should *not* be relayed", + amount2 + ); + + sleep(Duration::from_secs(2)); + + // Get the reverse channel end, like the CLI command does + let connection_end = query_identified_connection_end( + chains.handle_a(), + channel.connection.connection_id_a.as_ref(), + )?; + let counterparty_channel_end = channel_on_destination( + channel_end.value(), + connection_end.value(), + chains.handle_b(), + )? + .unwrap(); + + let summary = pending_packet_summary( + chains.handle_b(), + chains.handle_a(), + &counterparty_channel_end, + )?; + + assert_eq!(summary.unreceived_packets, [1]); + assert!(summary.unreceived_acks.is_empty()); + + Ok(()) + } +} diff --git a/tools/test-framework/src/error.rs b/tools/test-framework/src/error.rs index d8d8db24c3..388f80b1dc 100644 --- a/tools/test-framework/src/error.rs +++ b/tools/test-framework/src/error.rs @@ -6,6 +6,7 @@ use flex_error::{define_error, TraceError}; use ibc_relayer::channel::error::ChannelError; use ibc_relayer::connection::ConnectionError; use ibc_relayer::error::Error as RelayerError; +use ibc_relayer::link::error::LinkError; use ibc_relayer::supervisor::error::Error as SupervisorError; use ibc_relayer::transfer::TransferError; use std::io::{Error as IoError, ErrorKind as IoErrorKind}; @@ -49,6 +50,10 @@ define_error! { [ TransferError ] | _ | { "transfer error"}, + Link + [ LinkError ] + | _ | { "link error" }, + Retry { task_name: String, @@ -116,3 +121,9 @@ impl From for Error { Error::transfer(e) } } + +impl From for Error { + fn from(e: LinkError) -> Self { + Error::link(e) + } +} diff --git a/tools/test-framework/src/relayer/channel.rs b/tools/test-framework/src/relayer/channel.rs index de8de2455d..4c0223c77d 100644 --- a/tools/test-framework/src/relayer/channel.rs +++ b/tools/test-framework/src/relayer/channel.rs @@ -1,7 +1,7 @@ use core::time::Duration; use eyre::eyre; use ibc::core::ics04_channel::channel::State as ChannelState; -use ibc::core::ics04_channel::channel::{ChannelEnd, Order}; +use ibc::core::ics04_channel::channel::{ChannelEnd, IdentifiedChannelEnd, Order}; use ibc::Height; use ibc_relayer::chain::handle::ChainHandle; use ibc_relayer::channel::{extract_channel_id, Channel, ChannelSide}; @@ -81,6 +81,19 @@ pub fn query_channel_end( Ok(DualTagged::new(channel_end)) } +pub fn query_identified_channel_end( + handle: &ChainA, + channel_id: TaggedChannelIdRef, + port_id: TaggedPortIdRef, +) -> Result, Error> { + let channel_end = handle.query_channel(port_id.value(), channel_id.value(), Height::zero())?; + Ok(DualTagged::new(IdentifiedChannelEnd::new( + port_id.into_value().clone(), + *channel_id.into_value(), + channel_end, + ))) +} + pub fn assert_eventually_channel_established( handle_a: &ChainA, handle_b: &ChainB, diff --git a/tools/test-framework/src/relayer/connection.rs b/tools/test-framework/src/relayer/connection.rs index 6f058a579d..8e2dfcc93c 100644 --- a/tools/test-framework/src/relayer/connection.rs +++ b/tools/test-framework/src/relayer/connection.rs @@ -4,8 +4,8 @@ use core::time::Duration; use eyre::eyre; -use ibc::core::ics03_connection::connection::ConnectionEnd; use ibc::core::ics03_connection::connection::State as ConnectionState; +use ibc::core::ics03_connection::connection::{ConnectionEnd, IdentifiedConnectionEnd}; use ibc::timestamp::ZERO_DURATION; use ibc::Height; use ibc_relayer::chain::handle::ChainHandle; @@ -93,6 +93,17 @@ pub fn query_connection_end( Ok(DualTagged::new(connection_end)) } +pub fn query_identified_connection_end( + handle: &ChainA, + connection_id: TaggedConnectionIdRef, +) -> Result, Error> { + let connection_end = handle.query_connection(connection_id.value(), Height::zero())?; + Ok(DualTagged::new(IdentifiedConnectionEnd::new( + connection_id.into_value().clone(), + connection_end, + ))) +} + pub fn assert_eventually_connection_established( handle_a: &ChainA, handle_b: &ChainB, diff --git a/tools/test-framework/src/types/tagged/dual.rs b/tools/test-framework/src/types/tagged/dual.rs index 78c741b76a..f1ab3f583f 100644 --- a/tools/test-framework/src/types/tagged/dual.rs +++ b/tools/test-framework/src/types/tagged/dual.rs @@ -389,6 +389,8 @@ impl AsRef for Tagged { } } +impl Copy for Tagged {} + impl Clone for Tagged { fn clone(&self) -> Self { Self::new(self.0.clone()) diff --git a/tools/test-framework/src/types/tagged/mono.rs b/tools/test-framework/src/types/tagged/mono.rs index af7d0564e6..c3631604d2 100644 --- a/tools/test-framework/src/types/tagged/mono.rs +++ b/tools/test-framework/src/types/tagged/mono.rs @@ -360,6 +360,8 @@ impl IntoIterator for Tagged { } } +impl Copy for Tagged {} + impl Clone for Tagged { fn clone(&self) -> Self { Self::new(self.0.clone())