Skip to content

Commit

Permalink
Merge pull request #653 from onflow/janez/performace-changes
Browse files Browse the repository at this point in the history
Ingestion Performance improvements
  • Loading branch information
janezpodhostnik authored Nov 14, 2024
2 parents b08b495 + ca019da commit b3212e0
Show file tree
Hide file tree
Showing 10 changed files with 193 additions and 50 deletions.
20 changes: 15 additions & 5 deletions cmd/run/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (
var Cmd = &cobra.Command{
Use: "run",
Short: "Runs the EVM Gateway Node",
Run: func(*cobra.Command, []string) {
Run: func(command *cobra.Command, _ []string) {
// create multi-key account
if _, exists := os.LookupEnv("MULTIKEY_MODE"); exists {
bootstrap.RunCreateMultiKeyAccount()
Expand All @@ -40,13 +40,15 @@ var Cmd = &cobra.Command{
os.Exit(1)
}

ctx, cancel := context.WithCancel(context.Background())
ctx, cancel := context.WithCancel(command.Context())
done := make(chan struct{})
ready := make(chan struct{})
go func() {
if err := bootstrap.Run(ctx, cfg, ready); err != nil {
defer close(done)
err := bootstrap.Run(ctx, cfg, ready)
if err != nil {
log.Err(err).Msg("failed to run bootstrap")
cancel()
os.Exit(1)
}
}()

Expand All @@ -55,7 +57,15 @@ var Cmd = &cobra.Command{
osSig := make(chan os.Signal, 1)
signal.Notify(osSig, syscall.SIGINT, syscall.SIGTERM)

<-osSig
select {
case <-osSig:
log.Info().Msg("OS Signal to shutdown received, shutting down")
cancel()
case <-done:
log.Info().Msg("done, shutting down")
cancel()
}

log.Info().Msg("OS Signal to shutdown received, shutting down")
cancel()
},
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ require (
github.com/sethvargo/go-retry v0.2.3
github.com/spf13/cobra v1.8.1
github.com/stretchr/testify v1.9.0
go.uber.org/ratelimit v0.3.1
golang.org/x/exp v0.0.0-20240119083558-1b970713d09a
golang.org/x/sync v0.8.0
google.golang.org/grpc v1.63.2
Expand All @@ -35,6 +36,7 @@ require (
github.com/SaveTheRbtz/mph v0.1.1-0.20240117162131-4166ec7869bc // indirect
github.com/StackExchange/wmi v1.2.1 // indirect
github.com/VictoriaMetrics/fastcache v1.12.2 // indirect
github.com/benbjohnson/clock v1.3.5 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/bits-and-blooms/bitset v1.10.0 // indirect
github.com/btcsuite/btcd/btcec/v2 v2.3.4 // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156 h1:eMwmnE/GDgah
github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156/go.mod h1:Cb/ax3seSYIx7SuZdm2G2xzfwmv3TPSk2ucNfQESPXM=
github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/benbjohnson/clock v1.3.5 h1:VvXlSJBzZpA/zum6Sj74hxwYI2DIxRWuNIoXAzHZz5o=
github.com/benbjohnson/clock v1.3.5/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
Expand Down Expand Up @@ -762,6 +764,8 @@ go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/
go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU=
go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=
go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
go.uber.org/ratelimit v0.3.1 h1:K4qVE+byfv/B3tC+4nYWP7v/6SimcO7HzHekoMNBma0=
go.uber.org/ratelimit v0.3.1/go.mod h1:6euWsTB6U/Nb3X++xEUXA8ciPJvr19Q/0h1+oDcJhRk=
go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
go.uber.org/zap v1.19.1/go.mod h1:j3DNczoxDZroyBnOT1L/Q79cfUMGZxlv/9dzN7SM1rI=
go.uber.org/zap v1.26.0 h1:sI7k6L95XOKS281NhVKOFCUNIvv9e0w4BF8N3u+tCRo=
Expand Down
18 changes: 9 additions & 9 deletions services/ingestion/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,9 +163,17 @@ func (e *Engine) processEvents(events *models.CadenceEvents) error {
Int("cadence-event-length", events.Length()).
Msg("received new cadence evm events")

batch := e.store.NewBatch()
defer func(batch *pebbleDB.Batch) {
err := batch.Close()
if err != nil {
e.log.Fatal().Err(err).Msg("failed to close batch")
}
}(batch)

// if heartbeat interval with no data still update the cadence height
if events.Empty() {
if err := e.blocks.SetLatestCadenceHeight(events.CadenceHeight(), nil); err != nil {
if err := e.blocks.SetLatestCadenceHeight(events.CadenceHeight(), batch); err != nil {
return fmt.Errorf(
"failed to update to latest cadence height: %d, during events ingestion: %w",
events.CadenceHeight(),
Expand All @@ -176,14 +184,6 @@ func (e *Engine) processEvents(events *models.CadenceEvents) error {
return nil // nothing else to do this was heartbeat event with not event payloads
}

batch := e.store.NewBatch()
defer func(batch *pebbleDB.Batch) {
err := batch.Close()
if err != nil {
e.log.Fatal().Err(err).Msg("failed to close batch")
}
}(batch)

// Step 1: Re-execute all transactions on the latest EVM block

// Step 1.1: Notify the `BlocksProvider` of the newly received EVM block
Expand Down
119 changes: 90 additions & 29 deletions services/ingestion/event_subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"sort"

"github.com/onflow/cadence/common"
"github.com/onflow/flow-go/fvm/evm/events"
Expand Down Expand Up @@ -62,7 +63,8 @@ func NewRPCEventSubscriber(
//
// If error is encountered during backfill the subscription will end and the response chanel will be closed.
func (r *RPCEventSubscriber) Subscribe(ctx context.Context) <-chan models.BlockEvents {
eventsChan := make(chan models.BlockEvents)
// buffered channel so that the decoding of the events can happen in parallel to other operations
eventsChan := make(chan models.BlockEvents, 1000)

go func() {
defer func() {
Expand Down Expand Up @@ -190,12 +192,9 @@ func (r *RPCEventSubscriber) subscribe(ctx context.Context, height uint64) <-cha
return eventsChan
}

// backfill will use the provided height and with the client for the provided spork will start backfilling
// events. Before subscribing, it will check what is the latest block in the current spork (defined by height)
// and check for each event it receives whether we reached the end, if we reach the end it will increase
// the height by one (next height), and check if we are still in previous sporks, if so repeat everything,
// otherwise return.
func (r *RPCEventSubscriber) backfill(ctx context.Context, height uint64) <-chan models.BlockEvents {
// backfill returns a channel that is filled with block events from the provided fromCadenceHeight up to the first
// height in the current spork.
func (r *RPCEventSubscriber) backfill(ctx context.Context, fromCadenceHeight uint64) <-chan models.BlockEvents {
eventsChan := make(chan models.BlockEvents)

go func() {
Expand All @@ -204,49 +203,111 @@ func (r *RPCEventSubscriber) backfill(ctx context.Context, height uint64) <-chan
}()

for {
// check if the current height is still in past sporks, and if not return since we are done with backfilling
if !r.client.IsPastSpork(height) {
// check if the current fromCadenceHeight is still in past sporks, and if not return since we are done with backfilling
if !r.client.IsPastSpork(fromCadenceHeight) {
r.logger.Info().
Uint64("height", height).
Uint64("height", fromCadenceHeight).
Msg("completed backfilling")

return
}

latestHeight, err := r.client.GetLatestHeightForSpork(ctx, height)
var err error
fromCadenceHeight, err = r.backfillSporkFromHeight(ctx, fromCadenceHeight, eventsChan)
if err != nil {
r.logger.Error().Err(err).Msg("error backfilling spork")
eventsChan <- models.NewBlockEventsError(err)
return
}

r.logger.Info().
Uint64("start-height", height).
Uint64("last-spork-height", latestHeight).
Msg("backfilling spork")
Uint64("next-cadence-height", fromCadenceHeight).
Msg("reached the end of spork, checking next spork")
}
}()

for ev := range r.subscribe(ctx, height) {
eventsChan <- ev
return eventsChan
}

if ev.Err != nil {
return
}
// maxRangeForGetEvents is the maximum range of blocks that can be fetched using the GetEventsForHeightRange method.
const maxRangeForGetEvents = uint64(249)

// / backfillSporkFromHeight will fill the eventsChan with block events from the provided fromHeight up to the first height in the spork that comes
// after the spork of the provided fromHeight.
func (r *RPCEventSubscriber) backfillSporkFromHeight(ctx context.Context, fromCadenceHeight uint64, eventsChan chan<- models.BlockEvents) (uint64, error) {
evmAddress := common.Address(systemcontracts.SystemContractsForChain(r.chain).EVMContract.Address)

lastHeight, err := r.client.GetLatestHeightForSpork(ctx, fromCadenceHeight)
if err != nil {
eventsChan <- models.NewBlockEventsError(err)
return 0, err
}

r.logger.Debug().Msg(fmt.Sprintf("backfilling [%d / %d]...", ev.Events.CadenceHeight(), latestHeight))
r.logger.Info().
Uint64("start-height", fromCadenceHeight).
Uint64("last-spork-height", lastHeight).
Msg("backfilling spork")

if ev.Events != nil && ev.Events.CadenceHeight() == latestHeight {
height = ev.Events.CadenceHeight() + 1 // go to next height in the next spork
for fromCadenceHeight < lastHeight {
r.logger.Debug().Msg(fmt.Sprintf("backfilling [%d / %d] ...", fromCadenceHeight, lastHeight))

r.logger.Info().
Uint64("next-height", height).
Msg("reached the end of spork, checking next spork")
startHeight := fromCadenceHeight
endHeight := fromCadenceHeight + maxRangeForGetEvents
if endHeight > lastHeight {
endHeight = lastHeight
}

break
}
blockExecutedEvent := common.NewAddressLocation(
nil,
evmAddress,
string(events.EventTypeBlockExecuted),
).ID()

transactionExecutedEvent := common.NewAddressLocation(
nil,
evmAddress,
string(events.EventTypeTransactionExecuted),
).ID()

blocks, err := r.client.GetEventsForHeightRange(ctx, blockExecutedEvent, startHeight, endHeight)
if err != nil {
return 0, fmt.Errorf("failed to get block events: %w", err)
}

transactions, err := r.client.GetEventsForHeightRange(ctx, transactionExecutedEvent, startHeight, endHeight)
if err != nil {
return 0, fmt.Errorf("failed to get block events: %w", err)
}

if len(transactions) != len(blocks) {
return 0, fmt.Errorf("transactions and blocks have different length")
}

// sort both, just in case
sort.Slice(blocks, func(i, j int) bool {
return blocks[i].Height < blocks[j].Height
})
sort.Slice(transactions, func(i, j int) bool {
return transactions[i].Height < transactions[j].Height
})

for i := range transactions {
if transactions[i].Height != blocks[i].Height {
return 0, fmt.Errorf("transactions and blocks have different height")
}

// append the transaction events to the block events
blocks[i].Events = append(blocks[i].Events, transactions[i].Events...)

evmEvents := models.NewBlockEvents(blocks[i])
eventsChan <- evmEvents

// advance the height
fromCadenceHeight = evmEvents.Events.CadenceHeight() + 1
}
}()

return eventsChan
}
return fromCadenceHeight, nil
}

// fetchMissingData is used as a backup mechanism for fetching EVM-related
Expand Down
6 changes: 4 additions & 2 deletions services/ingestion/event_subscriber_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,10 +205,10 @@ func Test_SubscribingWithRetryOnError(t *testing.T) {
prevHeight = eventHeight

if eventHeight == cadenceHeight {
assert.Equal(t, evmBlock, ev.Events.Block())
require.Equal(t, evmBlock, ev.Events.Block())
for i := 0; i < len(txHashes); i++ {
tx := ev.Events.Transactions()[i]
assert.Equal(t, txHashes[i], tx.Hash())
require.Equal(t, txHashes[i], tx.Hash())
}
}
}
Expand Down Expand Up @@ -417,6 +417,8 @@ func setupClientForBackupEventFetching(
cadenceHeight,
).Return([]flow.BlockEvents{evmTxEvents}, nil).Once()

client.GetEventsForHeightRangeFunc = nil

client.SubscribeEventsByBlockHeightFunc = func(
ctx context.Context,
startHeight uint64,
Expand Down
39 changes: 34 additions & 5 deletions services/requester/cross-spork_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,26 +5,35 @@ import (
"fmt"

"github.com/onflow/cadence"
errs "github.com/onflow/flow-evm-gateway/models/errors"
"github.com/onflow/flow-go-sdk"
"github.com/onflow/flow-go-sdk/access"
flowGo "github.com/onflow/flow-go/model/flow"
"github.com/rs/zerolog"
"go.uber.org/ratelimit"
"golang.org/x/exp/slices"

errs "github.com/onflow/flow-evm-gateway/models/errors"
)

type sporkClient struct {
firstHeight uint64
lastHeight uint64
client access.Client
firstHeight uint64
lastHeight uint64
client access.Client
getEventsForHeightRangeLimiter ratelimit.Limiter
}

// contains checks if the provided height is withing the range of available heights
func (s *sporkClient) contains(height uint64) bool {
return height >= s.firstHeight && height <= s.lastHeight
}

func (s *sporkClient) GetEventsForHeightRange(
ctx context.Context, eventType string, startHeight uint64, endHeight uint64,
) ([]flow.BlockEvents, error) {
s.getEventsForHeightRangeLimiter.Take()

return s.client.GetEventsForHeightRange(ctx, eventType, startHeight, endHeight)
}

type sporkClients []*sporkClient

// addSpork will add a new spork host defined by the first and last height boundary in that spork.
Expand All @@ -48,6 +57,8 @@ func (s *sporkClients) add(logger zerolog.Logger, client access.Client) error {
firstHeight: info.NodeRootBlockHeight,
lastHeight: header.Height,
client: client,
// TODO (JanezP): Make this configurable
getEventsForHeightRangeLimiter: ratelimit.New(100, ratelimit.WithoutSlack),
})

// make sure clients are always sorted
Expand Down Expand Up @@ -214,3 +225,21 @@ func (c *CrossSporkClient) SubscribeEventsByBlockHeight(
}
return client.SubscribeEventsByBlockHeight(ctx, startHeight, filter, opts...)
}

func (c *CrossSporkClient) GetEventsForHeightRange(
ctx context.Context, eventType string, startHeight uint64, endHeight uint64,
) ([]flow.BlockEvents, error) {
client, err := c.getClientForHeight(startHeight)
if err != nil {
return nil, err
}
endClient, err := c.getClientForHeight(endHeight)
if err != nil {
return nil, err
}
// there is one client reference per spork, so we can compare the clients
if endClient != client {
return nil, fmt.Errorf("invalid height range, end height %d is not in the same spork as start height %d", endHeight, startHeight)
}
return client.GetEventsForHeightRange(ctx, eventType, startHeight, endHeight)
}
Loading

0 comments on commit b3212e0

Please sign in to comment.