From 99c9d4d94455c92019eda6409858ec55669e523b Mon Sep 17 00:00:00 2001 From: Marcelo Politzer <251334+mpolitzer@users.noreply.github.com> Date: Mon, 16 Dec 2024 12:30:21 -0300 Subject: [PATCH] feat: FilterClaimSubmission in chunks --- cmd/cartesi-rollups-claimer/root/root.go | 1 + internal/claimer/claimer.go | 3 + internal/claimer/side-effects.go | 87 +++++++++++++++++++----- 3 files changed, 73 insertions(+), 18 deletions(-) diff --git a/cmd/cartesi-rollups-claimer/root/root.go b/cmd/cartesi-rollups-claimer/root/root.go index ba7f002b3..02a56dae1 100644 --- a/cmd/cartesi-rollups-claimer/root/root.go +++ b/cmd/cartesi-rollups-claimer/root/root.go @@ -26,6 +26,7 @@ var ( }, EnableSubmission: true, MaxStartupTime: 10 * time.Second, + ChunkSize: 50000, // hallf of alchemy limit by default } ) diff --git a/internal/claimer/claimer.go b/internal/claimer/claimer.go index d62c65479..1e5a469bd 100644 --- a/internal/claimer/claimer.go +++ b/internal/claimer/claimer.go @@ -74,6 +74,7 @@ type CreateInfo struct { Repository *repository.Database EnableSubmission bool MaxStartupTime time.Duration + ChunkSize uint64 } type Service struct { @@ -84,6 +85,7 @@ type Service struct { EthConn *ethclient.Client TxOpts *bind.TransactOpts claimsInFlight map[address]hash // -> txHash + chunkSize uint64 } func (c *CreateInfo) LoadEnv() { @@ -94,6 +96,7 @@ func (c *CreateInfo) LoadEnv() { c.BlockchainHttpEndpoint.Value = config.GetBlockchainHttpEndpoint() c.PostgresEndpoint.Value = config.GetPostgresEndpoint() c.PollInterval = config.GetClaimerPollingInterval() + c.MaxStartupTime = config.GetMaxStartupTime() c.LogLevel = service.LogLevel(config.GetLogLevel()) c.LogPretty = config.GetLogPrettyEnabled() } diff --git a/internal/claimer/side-effects.go b/internal/claimer/side-effects.go index 9ff7853f8..9c5e11a3c 100644 --- a/internal/claimer/side-effects.go +++ b/internal/claimer/side-effects.go @@ -4,12 +4,17 @@ package claimer import ( + "context" "fmt" + "iter" + "math" "math/big" + "os" "github.com/cartesi/rollups-node/pkg/contracts/iconsensus" "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/ethclient" ) type sideEffects interface { @@ -97,7 +102,7 @@ func (s *Service) findClaimSubmissionEventAndSucc( *claimSubmissionEvent, error, ) { - ic, curr, next, err := s.FindClaimSubmissionEventAndSucc(claim) + ic, curr, next, err := s.FindClaimSubmissionEventAndSucc(claim, s.chunkSize) if err != nil { s.Logger.Error("findClaimSubmissionEventAndSucc:failed", "service", s.Name, @@ -161,10 +166,58 @@ func (s *Service) pollTransaction(txHash hash) (bool, *types.Receipt, error) { return ready, receipt, err } +func chunkedFindSubmissionEvent( + ctx context.Context, + ic *iconsensus.IConsensus, + client *ethclient.Client, + chunk uint64, + start uint64, + end uint64, +) (iter.Seq2[*claimSubmissionEvent, error], error) { + var err error + + if chunk == 0 || end < start { + return nil, os.ErrInvalid + } + if end == math.MaxUint64 { + end, err = client.BlockNumber(ctx) + if err != nil { + return nil, err + } + } + + return func(yield func(*claimSubmissionEvent, error) bool) { + for start < end { + // open range, othewise we get duplicates + limit := min(start+chunk-1, end) + it, err := ic.FilterClaimSubmission(&bind.FilterOpts{ + Context: ctx, + Start: start, + End: &limit, + }, nil, nil) + if err != nil { + yield(nil, err) + return + } + + for it.Next() { + yield(it.Event, nil) + } + if it.Error() != nil { + yield(nil, err) + return + } + + start = limit + 1 + } + }, nil +} + // scan the event stream for a claimSubmission event that matches claim. // return this event and its successor func (s *Service) FindClaimSubmissionEventAndSucc( claim *claimRow, + chunkSize uint64, ) ( *iconsensus.IConsensus, *claimSubmissionEvent, @@ -176,34 +229,32 @@ func (s *Service) FindClaimSubmissionEventAndSucc( return nil, nil, nil, err } - it, err := ic.FilterClaimSubmission(&bind.FilterOpts{ - Context: s.Context, - Start: claim.EpochLastBlock, - }, nil, nil) + it, err := chunkedFindSubmissionEvent(s.Context, ic, s.EthConn, + claim.EpochLastBlock, math.MaxUint64, chunkSize) if err != nil { return nil, nil, nil, err } + next, stop := iter.Pull2(it) + defer stop() - for it.Next() { - event := it.Event + for { + event, err, ok := next() + if err != nil || !ok { + return ic, nil, nil, err + } lastBlock := event.LastProcessedBlockNumber.Uint64() + if claimMatchesEvent(claim, event) { - var succ *claimSubmissionEvent = nil - if it.Next() { - succ = it.Event + succ, err, ok := next() + if err != nil || !ok { + return ic, event, nil, err } - if it.Error() != nil { - return nil, nil, nil, it.Error() - } - return ic, event, succ, nil + return ic, event, succ, err } else if lastBlock > claim.EpochLastBlock { err = fmt.Errorf("claim not found, searched up to %v", event) + return nil, nil, nil, err } } - if it.Error() != nil { - return nil, nil, nil, it.Error() - } - return ic, nil, nil, nil } /* poll a transaction hash for its submission status and receipt */