diff --git a/.changeset/witty-numbers-sleep.md b/.changeset/witty-numbers-sleep.md new file mode 100644 index 00000000000..d42664d9f76 --- /dev/null +++ b/.changeset/witty-numbers-sleep.md @@ -0,0 +1,5 @@ +--- +"chainlink": patch +--- + +Support for retention in LogPoller's filters registered by ContractTransmitter #changed diff --git a/core/services/relay/evm/contract_transmitter.go b/core/services/relay/evm/contract_transmitter.go index af0f83f6979..724bbbe4aa0 100644 --- a/core/services/relay/evm/contract_transmitter.go +++ b/core/services/relay/evm/contract_transmitter.go @@ -5,6 +5,7 @@ import ( "database/sql" "encoding/hex" "math/big" + "time" "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/accounts/abi" @@ -63,13 +64,29 @@ func NewOCRContractTransmitter( lp logpoller.LogPoller, lggr logger.Logger, reportToEvmTxMeta ReportToEthMetadata, +) (*contractTransmitter, error) { + return NewOCRContractTransmitterWithRetention(ctx, address, caller, contractABI, transmitter, lp, lggr, reportToEvmTxMeta, 0) +} + +func NewOCRContractTransmitterWithRetention( + ctx context.Context, + address gethcommon.Address, + caller contractReader, + contractABI abi.ABI, + transmitter Transmitter, + lp logpoller.LogPoller, + lggr logger.Logger, + reportToEvmTxMeta ReportToEthMetadata, + retention time.Duration, ) (*contractTransmitter, error) { transmitted, ok := contractABI.Events["Transmitted"] if !ok { return nil, errors.New("invalid ABI, missing transmitted") } - err := lp.RegisterFilter(ctx, logpoller.Filter{Name: transmitterFilterName(address), EventSigs: []common.Hash{transmitted.ID}, Addresses: []common.Address{address}}) + // TODO It would be better to keep MaxLogsKept = 1 for the OCR contract transmitter instead of Retention. We are always interested only in the latest log. + // Although MaxLogsKept is present in the Filter struct, it is not supported by LogPoller yet. + err := lp.RegisterFilter(ctx, logpoller.Filter{Name: transmitterFilterName(address), EventSigs: []common.Hash{transmitted.ID}, Addresses: []common.Address{address}, Retention: retention}) if err != nil { return nil, err } diff --git a/core/services/relay/evm/evm.go b/core/services/relay/evm/evm.go index 737a8e7561e..585d20df3ab 100644 --- a/core/services/relay/evm/evm.go +++ b/core/services/relay/evm/evm.go @@ -7,6 +7,7 @@ import ( "fmt" "strings" "sync" + "time" "github.com/ethereum/go-ethereum/accounts/abi" "github.com/ethereum/go-ethereum/common" @@ -172,7 +173,7 @@ func (r *Relayer) NewPluginProvider(rargs commontypes.RelayArgs, pargs commontyp return nil, err } - transmitter, err := newOnChainContractTransmitter(ctx, r.lggr, rargs, pargs.TransmitterID, r.ks.Eth(), configWatcher, configTransmitterOpts{}, OCR2AggregatorTransmissionContractABI) + transmitter, err := newOnChainContractTransmitter(ctx, r.lggr, rargs, pargs.TransmitterID, r.ks.Eth(), configWatcher, configTransmitterOpts{}, OCR2AggregatorTransmissionContractABI, 0) if err != nil { return nil, err } @@ -476,7 +477,7 @@ type configTransmitterOpts struct { } // newOnChainContractTransmitter creates a new contract transmitter. -func newOnChainContractTransmitter(ctx context.Context, lggr logger.Logger, rargs commontypes.RelayArgs, transmitterID string, ethKeystore keystore.Eth, configWatcher *configWatcher, opts configTransmitterOpts, transmissionContractABI abi.ABI) (*contractTransmitter, error) { +func newOnChainContractTransmitter(ctx context.Context, lggr logger.Logger, rargs commontypes.RelayArgs, transmitterID string, ethKeystore keystore.Eth, configWatcher *configWatcher, opts configTransmitterOpts, transmissionContractABI abi.ABI, transmissionContractRetention time.Duration) (*contractTransmitter, error) { var relayConfig types.RelayConfig if err := json.Unmarshal(rargs.RelayConfig, &relayConfig); err != nil { return nil, err @@ -540,7 +541,7 @@ func newOnChainContractTransmitter(ctx context.Context, lggr logger.Logger, rarg return nil, pkgerrors.Wrap(err, "failed to create transmitter") } - return NewOCRContractTransmitter( + return NewOCRContractTransmitterWithRetention( ctx, configWatcher.contractAddress, configWatcher.chain.Client(), @@ -549,6 +550,7 @@ func newOnChainContractTransmitter(ctx context.Context, lggr logger.Logger, rarg configWatcher.chain.LogPoller(), lggr, nil, + transmissionContractRetention, ) } @@ -578,7 +580,7 @@ func (r *Relayer) NewMedianProvider(rargs commontypes.RelayArgs, pargs commontyp reportCodec := evmreportcodec.ReportCodec{} - contractTransmitter, err := newOnChainContractTransmitter(ctx, lggr, rargs, pargs.TransmitterID, r.ks.Eth(), configWatcher, configTransmitterOpts{}, OCR2AggregatorTransmissionContractABI) + contractTransmitter, err := newOnChainContractTransmitter(ctx, lggr, rargs, pargs.TransmitterID, r.ks.Eth(), configWatcher, configTransmitterOpts{}, OCR2AggregatorTransmissionContractABI, 0) if err != nil { return nil, err } diff --git a/core/services/relay/evm/ocr2keeper.go b/core/services/relay/evm/ocr2keeper.go index 78f4b43b43f..a839ce8430e 100644 --- a/core/services/relay/evm/ocr2keeper.go +++ b/core/services/relay/evm/ocr2keeper.go @@ -91,7 +91,7 @@ func (r *ocr2keeperRelayer) NewOCR2KeeperProvider(rargs commontypes.RelayArgs, p } gasLimit := cfgWatcher.chain.Config().EVM().OCR2().Automation().GasLimit() - contractTransmitter, err := newOnChainContractTransmitter(ctx, r.lggr, rargs, pargs.TransmitterID, r.ethKeystore, cfgWatcher, configTransmitterOpts{pluginGasLimit: &gasLimit}, OCR2AggregatorTransmissionContractABI) + contractTransmitter, err := newOnChainContractTransmitter(ctx, r.lggr, rargs, pargs.TransmitterID, r.ethKeystore, cfgWatcher, configTransmitterOpts{pluginGasLimit: &gasLimit}, OCR2AggregatorTransmissionContractABI, 0) if err != nil { return nil, err } diff --git a/core/services/relay/evm/ocr2vrf.go b/core/services/relay/evm/ocr2vrf.go index a108151be47..b83ce0fd81e 100644 --- a/core/services/relay/evm/ocr2vrf.go +++ b/core/services/relay/evm/ocr2vrf.go @@ -64,7 +64,7 @@ func (r *ocr2vrfRelayer) NewDKGProvider(rargs commontypes.RelayArgs, pargs commo if err != nil { return nil, err } - contractTransmitter, err := newOnChainContractTransmitter(ctx, r.lggr, rargs, pargs.TransmitterID, r.ethKeystore, configWatcher, configTransmitterOpts{}, OCR2AggregatorTransmissionContractABI) + contractTransmitter, err := newOnChainContractTransmitter(ctx, r.lggr, rargs, pargs.TransmitterID, r.ethKeystore, configWatcher, configTransmitterOpts{}, OCR2AggregatorTransmissionContractABI, 0) if err != nil { return nil, err } @@ -91,7 +91,7 @@ func (r *ocr2vrfRelayer) NewOCR2VRFProvider(rargs commontypes.RelayArgs, pargs c if err != nil { return nil, err } - contractTransmitter, err := newOnChainContractTransmitter(ctx, r.lggr, rargs, pargs.TransmitterID, r.ethKeystore, configWatcher, configTransmitterOpts{}, OCR2AggregatorTransmissionContractABI) + contractTransmitter, err := newOnChainContractTransmitter(ctx, r.lggr, rargs, pargs.TransmitterID, r.ethKeystore, configWatcher, configTransmitterOpts{}, OCR2AggregatorTransmissionContractABI, 0) if err != nil { return nil, err }