Skip to content

Commit

Permalink
assert message content
Browse files Browse the repository at this point in the history
  • Loading branch information
joaoluisam committed Dec 21, 2024
1 parent 30f0433 commit 0824d90
Show file tree
Hide file tree
Showing 3 changed files with 108 additions and 24 deletions.
110 changes: 97 additions & 13 deletions integration-tests/ccip-tests/actions/ccip_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -1833,26 +1833,28 @@ func (sourceCCIP *SourceCCIPModule) CCIPMsg(
func (sourceCCIP *SourceCCIPModule) SendRequest(
receiver common.Address,
gasLimit *big.Int,
) (common.Hash, time.Duration, *big.Int, error) {
) (common.Hash, time.Duration, *big.Int, []byte, error) {
var d time.Duration
destChainSelector, err := chainselectors.SelectorFromChainId(sourceCCIP.DestinationChainId)
if err != nil {
return common.Hash{}, d, nil, fmt.Errorf("failed getting the chain selector: %w", err)
return common.Hash{}, d, nil, nil, fmt.Errorf("failed getting the chain selector: %w", err)
}
// form the message for transfer
msg, err := sourceCCIP.CCIPMsg(receiver, sourceCCIP.Common.AllowOutOfOrder, gasLimit)
if err != nil {
return common.Hash{}, d, nil, fmt.Errorf("failed forming the ccip msg: %w", err)
return common.Hash{}, d, nil, nil, fmt.Errorf("failed forming the ccip msg: %w", err)
}

msgData := msg.Data

fee, err := sourceCCIP.Common.Router.GetFee(destChainSelector, msg)
if err != nil {
log.Info().Interface("Msg", msg).Msg("CCIP msg")
reason, _ := blockchain.RPCErrorFromError(err)
if reason != "" {
return common.Hash{}, d, nil, fmt.Errorf("failed getting the fee: %s", reason)
return common.Hash{}, d, nil, nil, fmt.Errorf("failed getting the fee: %s", reason)
}
return common.Hash{}, d, nil, fmt.Errorf("failed getting the fee: %w", err)
return common.Hash{}, d, nil, nil, fmt.Errorf("failed getting the fee: %w", err)
}
log.Info().Str("Fee", fee.String()).Msg("Calculated fee")

Expand All @@ -1868,7 +1870,7 @@ func (sourceCCIP *SourceCCIPModule) SendRequest(
if sendTx != nil {
txHash = sendTx.Hash()
}
return txHash, time.Since(timeNow), nil, fmt.Errorf("failed initiating the transfer ccip-send: %w", err)
return txHash, time.Since(timeNow), nil, nil, fmt.Errorf("failed initiating the transfer ccip-send: %w", err)
}
} else {
sendTx, err = sourceCCIP.Common.Router.CCIPSendAndProcessTx(destChainSelector, msg, fee)
Expand All @@ -1877,7 +1879,7 @@ func (sourceCCIP *SourceCCIPModule) SendRequest(
if sendTx != nil {
txHash = sendTx.Hash()
}
return txHash, time.Since(timeNow), nil, fmt.Errorf("failed initiating the transfer ccip-send: %w", err)
return txHash, time.Since(timeNow), nil, nil, fmt.Errorf("failed initiating the transfer ccip-send: %w", err)
}
}

Expand All @@ -1886,7 +1888,7 @@ func (sourceCCIP *SourceCCIPModule) SendRequest(
Str("Send token transaction", sendTx.Hash().String()).
Str("lane", fmt.Sprintf("%s-->%s", sourceCCIP.Common.ChainClient.GetNetworkName(), sourceCCIP.DestNetworkName)).
Msg("Sending token")
return sendTx.Hash(), time.Since(timeNow), fee, nil
return sendTx.Hash(), time.Since(timeNow), fee, msgData, nil
}

func DefaultSourceCCIPModule(
Expand Down Expand Up @@ -1932,6 +1934,7 @@ type DestCCIPModule struct {
OffRamp *contracts.OffRamp
ReportAcceptedWatcher *sync.Map
ExecStateChangedWatcher *sync.Map
MessageReceivedWatcher *sync.Map
ReportBlessedWatcher *sync.Map
ReportBlessedBySeqNum *sync.Map
NextSeqNumToCommit *atomic.Uint64
Expand Down Expand Up @@ -2642,6 +2645,74 @@ func (destCCIP *DestCCIPModule) AssertSeqNumberExecuted(
}
}

func (destCCIP *DestCCIPModule) AssertMessageContentMatch(
lggr *zerolog.Logger,
messageID string,
expectedContent []byte,
timeout time.Duration,
reqStat *testreporters.RequestStat,
) error {
fmt.Println(reqStat)
lggr.Info().
Str("MsgID", fmt.Sprintf("0x%x", messageID)).
Str("Timeout", timeout.String()).
Msg("Waiting for message content to match")
timer := time.NewTimer(timeout)
defer timer.Stop()
resetTimerCount := 0
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
// Load the message content from the watcher
value, ok := destCCIP.MessageReceivedWatcher.Load(messageID)
if !ok {
lggr.Warn().
Str("MsgID", fmt.Sprintf("0x%x", messageID)).
Msg("Message still not found in MessageReceivedWatcher")
continue
}

receivedContent, ok := value.([]uint8)
if !ok {
lggr.Warn().
Str("MsgID", fmt.Sprintf("0x%x", messageID)).
Msg("Invalid content type in MessageReceivedWatcher")
continue
}

// Compare the received content with the expected content
if string(receivedContent) == string(expectedContent) {
lggr.Info().
Str("MessageID", messageID).
Msg("Message content matches the expected content")
return nil
}

lggr.Warn().
Str("MessageID", messageID).
Str("ReceivedContent", string(receivedContent)).
Str("ExpectedContent", string(expectedContent)).
Msg("Message content mismatch")

case <-timer.C:
// Handle timeout with potential connection issue recovery
if destCCIP.Common.IsConnectionRestoredRecently != nil && !destCCIP.Common.IsConnectionRestoredRecently.Load() {
if resetTimerCount > 2 {
return fmt.Errorf("possible RPC issue - message content did not match for MessageID %s", messageID)
}
timer.Reset(timeout)
resetTimerCount++
lggr.Info().Int("count of reset", resetTimerCount).Msg("Resetting timer to validate message content match")
continue
}

return fmt.Errorf("timeout - message content did not match for MessageID %s", messageID)
}
}
}

func DefaultDestinationCCIPModule(
logger *zerolog.Logger,
testConf *testconfig.CCIPTestGroupConfig,
Expand Down Expand Up @@ -2671,6 +2742,7 @@ func DefaultDestinationCCIPModule(
ReportBlessedBySeqNum: &sync.Map{},
ExecStateChangedWatcher: &sync.Map{},
ReportAcceptedWatcher: &sync.Map{},
MessageReceivedWatcher: &sync.Map{},
}, nil
}

Expand All @@ -2679,6 +2751,7 @@ type CCIPRequest struct {
txHash string
txConfirmationTimestamp time.Time
RequestStat *testreporters.RequestStat
MessageData []byte
}

func CCIPRequestFromTxHash(txHash common.Hash, chainClient blockchain.EVMClient) (CCIPRequest, *types.Receipt, error) {
Expand Down Expand Up @@ -2831,7 +2904,7 @@ func (lane *CCIPLane) RecordStateBeforeTransfer() {
lane.SentReqs = make(map[common.Hash][]CCIPRequest)
}

func (lane *CCIPLane) AddToSentReqs(txHash common.Hash, reqStats []*testreporters.RequestStat) (*types.Receipt, error) {
func (lane *CCIPLane) AddToSentReqs(txHash common.Hash, reqStats []*testreporters.RequestStat, msgData []byte) (*types.Receipt, error) {
request, rcpt, err := CCIPRequestFromTxHash(txHash, lane.Source.Common.ChainClient)
if err != nil {
for _, stat := range reqStats {
Expand All @@ -2846,6 +2919,7 @@ func (lane *CCIPLane) AddToSentReqs(txHash common.Hash, reqStats []*testreporter
txHash: rcpt.TxHash.Hex(),
txConfirmationTimestamp: request.txConfirmationTimestamp,
RequestStat: stat,
MessageData: msgData,
})
lane.NumberOfReq++
}
Expand Down Expand Up @@ -2932,7 +3006,7 @@ func (lane *CCIPLane) Multicall(noOfRequests int, multiSendAddr common.Address)
}
return fmt.Errorf("failed to send the multicall: %w", err)
}
rcpt, err := lane.AddToSentReqs(tx.Hash(), reqStats)
rcpt, err := lane.AddToSentReqs(tx.Hash(), reqStats, nil)
if err != nil {
return err
}
Expand All @@ -2954,7 +3028,7 @@ func (lane *CCIPLane) Multicall(noOfRequests int, multiSendAddr common.Address)
func (lane *CCIPLane) SendRequests(noOfRequests int, gasLimit *big.Int) error {
for i := 1; i <= noOfRequests; i++ {
stat := testreporters.NewCCIPRequestStats(int64(lane.NumberOfReq+i), lane.SourceNetworkName, lane.DestNetworkName)
txHash, txConfirmationDur, fee, err := lane.Source.SendRequest(lane.Dest.ReceiverDapp.EthAddress, gasLimit)
txHash, txConfirmationDur, fee, msgData, err := lane.Source.SendRequest(lane.Dest.ReceiverDapp.EthAddress, gasLimit)
if err != nil {
stat.UpdateState(lane.Logger, 0, testreporters.TX, txConfirmationDur, testreporters.Failure, nil)
return fmt.Errorf("could not send request: %w", err)
Expand All @@ -2971,7 +3045,7 @@ func (lane *CCIPLane) SendRequests(noOfRequests int, gasLimit *big.Int) error {
noOfTokens++
}
}
_, err = lane.AddToSentReqs(txHash, []*testreporters.RequestStat{stat})
_, err = lane.AddToSentReqs(txHash, []*testreporters.RequestStat{stat}, msgData)
if err != nil {
return err
}
Expand Down Expand Up @@ -3246,6 +3320,14 @@ func (lane *CCIPLane) ValidateRequestByTxHash(txHash common.Hash, opts validatio
if opts.phaseExpectedToFail == testreporters.Commit && opts.timeout != 0 {
timeout = opts.timeout
}

err = lane.Dest.AssertMessageContentMatch(lane.Logger, string(msgLog.MessageId[:]), []byte(lane.SentReqs[txHash][0].MessageData), timeout, reqStat)
if err != nil {
return fmt.Errorf("message validation failed: %v", err)
} else {
log.Info().Msg("Message content validation successful")
}

err = lane.Dest.AssertSeqNumberExecuted(lane.Logger, seqNumber, timeout, sourceLogFinalizedAt, reqStat)
if shouldReturn, phaseErr := isPhaseValid(lane.Logger, testreporters.Commit, opts, err); shouldReturn {
return phaseErr
Expand Down Expand Up @@ -3503,7 +3585,9 @@ func (lane *CCIPLane) StartEventWatchers() error {
select {
case e := <-messageReceivedEvent:
log.Info().Msgf("messageReceivedEvent received with data: %+v", e)
// You could store these events in a map or other structure for validation later.
messageId := string(e.MessageId[:])
messageContent := e.Data
lane.Dest.MessageReceivedWatcher.Store(messageId, messageContent)
case <-lane.Context.Done():
return
}
Expand Down
2 changes: 1 addition & 1 deletion integration-tests/ccip-tests/load/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ func (l *LoadArgs) ValidateCurseFollowedByUncurse() {
// try to send requests on lanes on which curse is applied on source RMN and the request should revert
// data-only transfer is sufficient
lane.Source.TransferAmount = []*big.Int{}
failedTx, _, _, err := lane.Source.SendRequest(lane.Dest.ReceiverDapp.EthAddress, big.NewInt(actions.DefaultDestinationGasLimit))
failedTx, _, _, _, err := lane.Source.SendRequest(lane.Dest.ReceiverDapp.EthAddress, big.NewInt(actions.DefaultDestinationGasLimit))
if lane.Source.Common.ChainClient.GetNetworkConfig().MinimumConfirmations > 0 {
require.Error(l.t, err)
} else {
Expand Down
20 changes: 10 additions & 10 deletions integration-tests/ccip-tests/smoke/ccip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ func TestSmokeCCIPRateLimit(t *testing.T) {
tc.lane.Source.Common.ChainClient.GetDefaultWallet(), src.Common.Router.Address(), src.TransferAmount[0]),
)
require.NoError(t, tc.lane.Source.Common.ChainClient.WaitForEvents())
failedTx, _, _, err := tc.lane.Source.SendRequest(
failedTx, _, _, _, err := tc.lane.Source.SendRequest(
tc.lane.Dest.ReceiverDapp.EthAddress,
big.NewInt(actions.DefaultDestinationGasLimit),
)
Expand Down Expand Up @@ -277,7 +277,7 @@ func TestSmokeCCIPRateLimit(t *testing.T) {
// try to send again with amount more than the amount refilled by rate and
// this should fail, as the refill rate is not enough to refill the capacity
src.TransferAmount[0] = new(big.Int).Mul(AggregatedRateLimitRate, big.NewInt(10))
failedTx, _, _, err = tc.lane.Source.SendRequest(tc.lane.Dest.ReceiverDapp.EthAddress, big.NewInt(actions.DefaultDestinationGasLimit))
failedTx, _, _, _, err = tc.lane.Source.SendRequest(tc.lane.Dest.ReceiverDapp.EthAddress, big.NewInt(actions.DefaultDestinationGasLimit))
tc.lane.Logger.Info().Str("tokensToSend", src.TransferAmount[0].String()).Msg("More than Aggregated Rate")
require.NoError(t, err)
require.Error(t, tc.lane.Source.Common.ChainClient.WaitForEvents())
Expand Down Expand Up @@ -341,7 +341,7 @@ func TestSmokeCCIPRateLimit(t *testing.T) {
src.TransferAmount[0] = tokensToSend
tc.lane.Logger.Info().Str("tokensToSend", tokensToSend.String()).Msg("More than Token Pool Capacity")

failedTx, _, _, err = tc.lane.Source.SendRequest(tc.lane.Dest.ReceiverDapp.EthAddress, big.NewInt(actions.DefaultDestinationGasLimit))
failedTx, _, _, _, err = tc.lane.Source.SendRequest(tc.lane.Dest.ReceiverDapp.EthAddress, big.NewInt(actions.DefaultDestinationGasLimit))
require.NoError(t, err)
require.Error(t, tc.lane.Source.Common.ChainClient.WaitForEvents())
errReason, v, err = tc.lane.Source.Common.ChainClient.RevertReasonFromTx(failedTx, lock_release_token_pool.LockReleaseTokenPoolABI)
Expand Down Expand Up @@ -373,7 +373,7 @@ func TestSmokeCCIPRateLimit(t *testing.T) {
src.Common.ChainClient.GetDefaultWallet(), src.Common.Router.Address(), src.TransferAmount[0]),
)
require.NoError(t, tc.lane.Source.Common.ChainClient.WaitForEvents())
failedTx, _, _, err = tc.lane.Source.SendRequest(tc.lane.Dest.ReceiverDapp.EthAddress, big.NewInt(actions.DefaultDestinationGasLimit))
failedTx, _, _, _, err = tc.lane.Source.SendRequest(tc.lane.Dest.ReceiverDapp.EthAddress, big.NewInt(actions.DefaultDestinationGasLimit))
require.NoError(t, err)
require.Error(t, tc.lane.Source.Common.ChainClient.WaitForEvents())
errReason, v, err = tc.lane.Source.Common.ChainClient.RevertReasonFromTx(failedTx, lock_release_token_pool.LockReleaseTokenPoolABI)
Expand Down Expand Up @@ -507,7 +507,7 @@ func TestSmokeCCIPOnRampLimits(t *testing.T) {
src.TransferAmount[bpsTokenIndex] = big.NewInt(0)
src.TransferAmount[aggRateTokenIndex] = overCapacityAmount
src.TransferAmount[bpsAndAggTokenIndex] = big.NewInt(0)
failedTx, _, _, err := tc.lane.Source.SendRequest(tc.lane.Dest.ReceiverDapp.EthAddress, big.NewInt(actions.DefaultDestinationGasLimit))
failedTx, _, _, _, err := tc.lane.Source.SendRequest(tc.lane.Dest.ReceiverDapp.EthAddress, big.NewInt(actions.DefaultDestinationGasLimit))
require.Error(t, err, "Limited token transfer should immediately revert")
errReason, _, err := src.Common.ChainClient.RevertReasonFromTx(failedTx, evm_2_evm_onramp.EVM2EVMOnRampABI)
require.NoError(t, err)
Expand All @@ -519,7 +519,7 @@ func TestSmokeCCIPOnRampLimits(t *testing.T) {

src.TransferAmount[aggRateTokenIndex] = big.NewInt(0)
src.TransferAmount[bpsAndAggTokenIndex] = overCapacityAmount
failedTx, _, _, err = tc.lane.Source.SendRequest(tc.lane.Dest.ReceiverDapp.EthAddress, big.NewInt(actions.DefaultDestinationGasLimit))
failedTx, _, _, _, err = tc.lane.Source.SendRequest(tc.lane.Dest.ReceiverDapp.EthAddress, big.NewInt(actions.DefaultDestinationGasLimit))
require.Error(t, err, "Limited token transfer should immediately revert")
errReason, _, err = src.Common.ChainClient.RevertReasonFromTx(failedTx, evm_2_evm_onramp.EVM2EVMOnRampABI)
require.NoError(t, err)
Expand Down Expand Up @@ -567,7 +567,7 @@ func TestSmokeCCIPOnRampLimits(t *testing.T) {
src.TransferAmount[bpsTokenIndex] = big.NewInt(0)
src.TransferAmount[aggRateTokenIndex] = capacityLimit
src.TransferAmount[bpsAndAggTokenIndex] = big.NewInt(0)
failedTx, _, _, err = tc.lane.Source.SendRequest(tc.lane.Dest.ReceiverDapp.EthAddress, big.NewInt(actions.DefaultDestinationGasLimit))
failedTx, _, _, _, err = tc.lane.Source.SendRequest(tc.lane.Dest.ReceiverDapp.EthAddress, big.NewInt(actions.DefaultDestinationGasLimit))
require.Error(t, err, "Aggregate rate limited token transfer should immediately revert")
errReason, _, err = src.Common.ChainClient.RevertReasonFromTx(failedTx, evm_2_evm_onramp.EVM2EVMOnRampABI)
require.NoError(t, err)
Expand All @@ -579,7 +579,7 @@ func TestSmokeCCIPOnRampLimits(t *testing.T) {

src.TransferAmount[aggRateTokenIndex] = big.NewInt(0)
src.TransferAmount[bpsAndAggTokenIndex] = capacityLimit
failedTx, _, _, err = tc.lane.Source.SendRequest(tc.lane.Dest.ReceiverDapp.EthAddress, big.NewInt(actions.DefaultDestinationGasLimit))
failedTx, _, _, _, err = tc.lane.Source.SendRequest(tc.lane.Dest.ReceiverDapp.EthAddress, big.NewInt(actions.DefaultDestinationGasLimit))
require.Error(t, err, "Aggregate rate limited token transfer should immediately revert")
errReason, _, err = src.Common.ChainClient.RevertReasonFromTx(failedTx, evm_2_evm_onramp.EVM2EVMOnRampABI)
require.NoError(t, err)
Expand Down Expand Up @@ -700,7 +700,7 @@ func TestSmokeCCIPTokenPoolRateLimits(t *testing.T) {
// Send limited token over capacity and ensure it fails
src.TransferAmount[freeTokenIndex] = big.NewInt(0)
src.TransferAmount[limitedTokenIndex] = overCapacityAmount
failedTx, _, _, err := tc.lane.Source.SendRequest(tc.lane.Dest.ReceiverDapp.EthAddress, big.NewInt(actions.DefaultDestinationGasLimit))
failedTx, _, _, _, err := tc.lane.Source.SendRequest(tc.lane.Dest.ReceiverDapp.EthAddress, big.NewInt(actions.DefaultDestinationGasLimit))
require.Error(t, err, "Limited token transfer should immediately revert")
errReason, _, err := src.Common.ChainClient.RevertReasonFromTx(failedTx, lock_release_token_pool.LockReleaseTokenPoolABI)
require.NoError(t, err)
Expand Down Expand Up @@ -731,7 +731,7 @@ func TestSmokeCCIPTokenPoolRateLimits(t *testing.T) {
// Send limited token over rate limit and ensure it fails
src.TransferAmount[freeTokenIndex] = big.NewInt(0)
src.TransferAmount[limitedTokenIndex] = capacityLimit
failedTx, _, _, err = tc.lane.Source.SendRequest(tc.lane.Dest.ReceiverDapp.EthAddress, big.NewInt(actions.DefaultDestinationGasLimit))
failedTx, _, _, _, err = tc.lane.Source.SendRequest(tc.lane.Dest.ReceiverDapp.EthAddress, big.NewInt(actions.DefaultDestinationGasLimit))
require.Error(t, err, "Limited token transfer should immediately revert")
errReason, _, err = src.Common.ChainClient.RevertReasonFromTx(failedTx, lock_release_token_pool.LockReleaseTokenPoolABI)
require.NoError(t, err)
Expand Down

0 comments on commit 0824d90

Please sign in to comment.