Skip to content

Commit

Permalink
feat: FilterClaimSubmission in chunks
Browse files Browse the repository at this point in the history
  • Loading branch information
mpolitzer committed Dec 17, 2024
1 parent d0848e6 commit 6ba002a
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 25 deletions.
1 change: 1 addition & 0 deletions cmd/cartesi-rollups-claimer/root/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ var (
},
EnableSubmission: true,
MaxStartupTime: 10 * time.Second,
ChunkSize: 50000, // hallf of alchemy limit by default
}
)

Expand Down
3 changes: 3 additions & 0 deletions internal/claimer/claimer.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ type CreateInfo struct {
Repository *repository.Database
EnableSubmission bool
MaxStartupTime time.Duration
ChunkSize uint64
}

type Service struct {
Expand All @@ -84,6 +85,7 @@ type Service struct {
EthConn *ethclient.Client
TxOpts *bind.TransactOpts
claimsInFlight map[address]hash // -> txHash
chunkSize uint64
}

func (c *CreateInfo) LoadEnv() {
Expand All @@ -108,6 +110,7 @@ func Create(c *CreateInfo, s *Service) error {
}

return service.WithTimeout(c.MaxStartupTime, func() error {
s.chunkSize = c.ChunkSize
s.submissionEnabled = c.EnableSubmission
if s.EthConn == nil {
if c.EthConn == nil {
Expand Down
94 changes: 69 additions & 25 deletions internal/claimer/side-effects.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,17 @@
package claimer

import (
"context"
"fmt"
"iter"
"math"
"math/big"
"os"

"github.com/cartesi/rollups-node/pkg/contracts/iconsensus"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethclient"
)

type sideEffects interface {
Expand Down Expand Up @@ -55,11 +60,9 @@ func (s *Service) selectClaimPairsPerApp() (
computed, accepted, err := s.Repository.SelectClaimPairsPerApp(s.Context)
if err != nil {
s.Logger.Error("selectClaimPairsPerApp:failed",
"service", s.Name,
"error", err)
} else {
s.Logger.Debug("selectClaimPairsPerApp:success",
"service", s.Name,
"len(computed)", len(computed),
"len(accepted)", len(accepted))
}
Expand Down Expand Up @@ -95,7 +98,7 @@ func (s *Service) findClaimSubmissionEventAndSucc(
*claimSubmissionEvent,
error,
) {
ic, curr, next, err := s.FindClaimSubmissionEventAndSucc(claim)
ic, curr, next, err := s.FindClaimSubmissionEventAndSucc(claim, s.chunkSize)
if err != nil {
s.Logger.Error("findClaimSubmissionEventAndSucc:failed",
"claim", claim,
Expand All @@ -120,14 +123,12 @@ func (s *Service) submitClaimToBlockchain(
lastBlockNumber, claim.EpochHash)
if err != nil {
s.Logger.Error("submitClaimToBlockchain:failed",
"service", s.Name,
"appContractAddress", claim.AppContractAddress,
"claimHash", claim.EpochHash,
"error", err)
} else {
txHash = tx.Hash()
s.Logger.Debug("submitClaimToBlockchain:success",
"service", s.Name,
"appContractAddress", claim.AppContractAddress,
"claimHash", claim.EpochHash,
"TxHash", txHash)
Expand All @@ -139,28 +140,73 @@ func (s *Service) pollTransaction(txHash hash) (bool, *types.Receipt, error) {
ready, receipt, err := s.PollTransaction(txHash)
if err != nil {
s.Logger.Error("PollTransaction:failed",
"service", s.Name,
"tx", txHash,
"error", err)
} else if ready {
s.Logger.Debug("PollTransaction:success",
"service", s.Name,
"tx", txHash,
"ready", ready,
"blockNumber", receipt.BlockNumber)
} else {
s.Logger.Debug("PollTransaction:pending",
"service", s.Name,
"tx", txHash,
"ready", ready)
}
return ready, receipt, err
}

func chunkedFindSubmissionEvent(
ctx context.Context,
ic *iconsensus.IConsensus,
client *ethclient.Client,
chunk uint64,
start uint64,
end uint64,
) (iter.Seq2[*claimSubmissionEvent, error], error) {
var err error

if chunk == 0 || end < start {
return nil, os.ErrInvalid
}
if end == math.MaxUint64 {
end, err = client.BlockNumber(ctx)
if err != nil {
return nil, err
}
}

return func(yield func(*claimSubmissionEvent, error) bool) {
for start < end {
// open range, othewise we get duplicates
limit := min(start+chunk-1, end)
it, err := ic.FilterClaimSubmission(&bind.FilterOpts{
Context: ctx,
Start: start,
End: &limit,
}, nil, nil)
if err != nil {
yield(nil, err)
return
}

for it.Next() {
yield(it.Event, nil)
}
if it.Error() != nil {
yield(nil, err)
return
}

start = limit + 1
}
}, nil
}

// scan the event stream for a claimSubmission event that matches claim.
// return this event and its successor
func (s *Service) FindClaimSubmissionEventAndSucc(
claim *claimRow,
chunkSize uint64,
) (
*iconsensus.IConsensus,
*claimSubmissionEvent,
Expand All @@ -172,34 +218,32 @@ func (s *Service) FindClaimSubmissionEventAndSucc(
return nil, nil, nil, err
}

it, err := ic.FilterClaimSubmission(&bind.FilterOpts{
Context: s.Context,
Start: claim.EpochLastBlock,
}, nil, nil)
it, err := chunkedFindSubmissionEvent(s.Context, ic, s.EthConn,
chunkSize, claim.EpochLastBlock, math.MaxUint64)
if err != nil {
return nil, nil, nil, err
}
next, stop := iter.Pull2(it)
defer stop()

for it.Next() {
event := it.Event
for {
event, err, ok := next()
if err != nil || !ok {
return ic, nil, nil, err
}
lastBlock := event.LastProcessedBlockNumber.Uint64()

if claimMatchesEvent(claim, event) {
var succ *claimSubmissionEvent = nil
if it.Next() {
succ = it.Event
succ, err, ok := next()
if err != nil || !ok {
return ic, event, nil, err
}
if it.Error() != nil {
return nil, nil, nil, it.Error()
}
return ic, event, succ, nil
return ic, event, succ, err
} else if lastBlock > claim.EpochLastBlock {
err = fmt.Errorf("claim not found, searched up to %v", event)
return nil, nil, nil, err
}
}
if it.Error() != nil {
return nil, nil, nil, it.Error()
}
return ic, nil, nil, nil
}

/* poll a transaction hash for its submission status and receipt */
Expand Down

0 comments on commit 6ba002a

Please sign in to comment.