Skip to content

Commit

Permalink
Merge pull request #615 from onflow/petera/add-traces-backfill-mode
Browse files Browse the repository at this point in the history
Add traces backfill option
  • Loading branch information
peterargue authored Oct 17, 2024
2 parents ed2e887 + 03d06c5 commit 5872cfb
Show file tree
Hide file tree
Showing 6 changed files with 135 additions and 41 deletions.
72 changes: 37 additions & 35 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -172,41 +172,43 @@ Running the EVM gateway for mainnet requires additional security and stability m

The application can be configured using the following flags at runtime:

| Flag | Default Value | Description |
|------------------------------|---------------------------------|------------------------------------------------------------------------------------------|
| `database-dir` | `./db` | Path to the directory for the database |
| `rpc-host` | `""` | Host for the RPC API server |
| `rpc-port` | `8545` | Port for the RPC API server |
| `ws-enabled` | `false` | Enable websocket connections |
| `access-node-grpc-host` | `localhost:3569` | Host to the flow access node gRPC API |
| `access-node-spork-hosts` | `""` | Previous spork AN hosts, defined as a comma-separated list (e.g. `"host-1.com,host2.com"`) |
| `flow-network-id` | `flow-emulator` | Flow network ID (options: `flow-emulator`, `flow-testnet`, `flow-mainnet`) |
| `coinbase` | `""` | Coinbase address to use for fee collection |
| `init-cadence-height` | `0` | Cadence block height to start indexing; avoid using on a new network |
| `gas-price` | `1` | Static gas price for EVM transactions |
| `coa-address` | `""` | Flow address holding COA account for submitting transactions |
| `coa-key` | `""` | Private key for the COA address used for transactions |
| `coa-key-file` | `""` | Path to a JSON file of COA keys for key-rotation (exclusive with `coa-key` flag) |
| `coa-resource-create` | `false` | Auto-create the COA resource if it doesn't exist in the Flow COA account |
| `coa-cloud-kms-project-id` | `""` | Project ID for KMS keys (e.g. `flow-evm-gateway`) |
| `coa-cloud-kms-location-id` | `""` | Location ID for KMS key ring (e.g. 'global') |
| `coa-cloud-kms-key-ring-id` | `""` | Key ring ID for KMS keys (e.g. 'tx-signing') |
| `coa-cloud-kms-keys` | `""` | KMS keys and versions, comma-separated (e.g. `"gw-key-6@1,gw-key-7@1"`) |
| `log-level` | `debug` | Log verbosity level (`debug`, `info`, `warn`, `error`, `fatal`, `panic`) |
| `log-writer` | `stderr` | Output method for logs (`stderr`, `console`) |
| `stream-limit` | `10` | Rate-limit for client events sent per second |
| `rate-limit` | `50` | Requests per second limit for clients over any protocol (ws/http) |
| `address-header` | `""` | Header for client IP when server is behind a proxy |
| `heartbeat-interval` | `100` | Interval for AN event subscription heartbeats |
| `stream-timeout` | `3` | Timeout in seconds for sending events to clients |
| `force-start-height` | `0` | Force-set starting Cadence height (local/testing use only) |
| `wallet-api-key` | `""` | ECDSA private key for wallet APIs (local/testing use only) |
| `filter-expiry` | `5m` | Expiry time for idle filters |
| `traces-gcp-bucket` | `""` | GCP bucket name for transaction traces |
| `index-only` | `false` | Run in index-only mode, allowing state queries and indexing but no transaction sending |
| `profiler-enabled` | `false` | Enable the pprof profiler server |
| `profiler-host` | `localhost` | Host for the pprof profiler |
| `profiler-port` | `6060` | Port for the pprof profiler |
| Flag | Default Value | Description |
|--------------------------------|-------------------------------|------------------------------------------------------------------------------------------|
| `database-dir` | `./db` | Path to the directory for the database |
| `rpc-host` | `""` | Host for the RPC API server |
| `rpc-port` | `8545` | Port for the RPC API server |
| `ws-enabled` | `false` | Enable websocket connections |
| `access-node-grpc-host` | `localhost:3569` | Host to the flow access node gRPC API |
| `access-node-spork-hosts` | `""` | Previous spork AN hosts, defined as a comma-separated list (e.g. `"host-1.com,host2.com"`) |
| `flow-network-id` | `flow-emulator` | Flow network ID (options: `flow-emulator`, `flow-testnet`, `flow-mainnet`) |
| `coinbase` | `""` | Coinbase address to use for fee collection |
| `init-cadence-height` | `0` | Cadence block height to start indexing; avoid using on a new network |
| `gas-price` | `1` | Static gas price for EVM transactions |
| `coa-address` | `""` | Flow address holding COA account for submitting transactions |
| `coa-key` | `""` | Private key for the COA address used for transactions |
| `coa-key-file` | `""` | Path to a JSON file of COA keys for key-rotation (exclusive with `coa-key` flag) |
| `coa-resource-create` | `false` | Auto-create the COA resource if it doesn't exist in the Flow COA account |
| `coa-cloud-kms-project-id` | `""` | Project ID for KMS keys (e.g. `flow-evm-gateway`) |
| `coa-cloud-kms-location-id` | `""` | Location ID for KMS key ring (e.g. 'global') |
| `coa-cloud-kms-key-ring-id` | `""` | Key ring ID for KMS keys (e.g. 'tx-signing') |
| `coa-cloud-kms-keys` | `""` | KMS keys and versions, comma-separated (e.g. `"gw-key-6@1,gw-key-7@1"`) |
| `log-level` | `debug` | Log verbosity level (`debug`, `info`, `warn`, `error`, `fatal`, `panic`) |
| `log-writer` | `stderr` | Output method for logs (`stderr`, `console`) |
| `stream-limit` | `10` | Rate-limit for client events sent per second |
| `rate-limit` | `50` | Requests per second limit for clients over any protocol (ws/http) |
| `address-header` | `""` | Header for client IP when server is behind a proxy |
| `heartbeat-interval` | `100` | Interval for AN event subscription heartbeats |
| `stream-timeout` | `3` | Timeout in seconds for sending events to clients |
| `force-start-height` | `0` | Force-set starting Cadence height (local/testing use only) |
| `wallet-api-key` | `""` | ECDSA private key for wallet APIs (local/testing use only) |
| `filter-expiry` | `5m` | Expiry time for idle filters |
| `traces-gcp-bucket` | `""` | GCP bucket name for transaction traces |
| `traces-backfill-start-height` | `0` | Start height for backfilling transaction traces |
| `traces-backfill-end-height` | `0` | End height for backfilling transaction traces |
| `index-only` | `false` | Run in index-only mode, allowing state queries and indexing but no transaction sending |
| `profiler-enabled` | `false` | Enable the pprof profiler server |
| `profiler-host` | `localhost` | Host for the pprof profiler |
| `profiler-port` | `6060` | Port for the pprof profiler |

# Deploying
Deploying the EVM Gateway node comes with some prerequisites as well as expectations and they are best explained in the WIP document: https://flowfoundation.notion.site/EVM-Gateway-Deployment-3c41da6710af40acbaf971e22ce0a9fd
Expand Down
4 changes: 2 additions & 2 deletions api/profiler.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ func NewProfileServer(
}
}

func (h *ProfileServer) ListenAddr() string {
return h.endpoint
func (s *ProfileServer) ListenAddr() string {
return s.endpoint
}

func (s *ProfileServer) Start() {
Expand Down
33 changes: 33 additions & 0 deletions bootstrap/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,39 @@ func (b *Bootstrap) StartTraceDownloader(ctx context.Context) error {
)

StartEngine(ctx, b.traces, l)

if b.config.TracesBackfillStartHeight > 0 {
startHeight := b.config.TracesBackfillStartHeight
if _, err := b.storages.Blocks.GetByHeight(startHeight); err != nil {
return fmt.Errorf("failed to get provided start height %d in db: %w", startHeight, err)
}

cadenceStartHeight, err := b.storages.Blocks.GetCadenceHeight(startHeight)
if err != nil {
return fmt.Errorf("failed to get cadence height for backfill start height %d: %w", startHeight, err)
}

if cadenceStartHeight < b.config.InitCadenceHeight {
b.logger.Warn().
Uint64("evm-start-height", startHeight).
Uint64("cadence-start-height", cadenceStartHeight).
Uint64("init-cadence-height", b.config.InitCadenceHeight).
Msg("backfill start height is before initial cadence height. data may be missing from configured traces bucket")
}

endHeight := b.config.TracesBackfillEndHeight
if endHeight == 0 {
endHeight, err = b.storages.Blocks.LatestEVMHeight()
if err != nil {
return fmt.Errorf("failed to get latest EVM height: %w", err)
}
} else if _, err := b.storages.Blocks.GetByHeight(endHeight); err != nil {
return fmt.Errorf("failed to get provided end height %d in db: %w", endHeight, err)
}

go b.traces.Backfill(startHeight, endHeight)
}

return nil
}

Expand Down
10 changes: 10 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,10 @@ type Config struct {
TracesBucketName string
// TracesEnabled sets whether the node is supporting transaction traces.
TracesEnabled bool
// TracesBackfillStartHeight sets the starting block height for backfilling missing traces.
TracesBackfillStartHeight uint64
// TracesBackfillEndHeight sets the ending block height for backfilling missing traces.
TracesBackfillEndHeight uint64
// WalletEnabled sets whether wallet APIs are enabled
WalletEnabled bool
// WalletKey used for signing transactions
Expand Down Expand Up @@ -158,6 +162,8 @@ func FromFlags() (*Config, error) {
flag.Uint64Var(&forceStartHeight, "force-start-height", 0, "Force set starting Cadence height. WARNING: This should only be used locally or for testing, never in production.")
flag.StringVar(&filterExpiry, "filter-expiry", "5m", "Filter defines the time it takes for an idle filter to expire")
flag.StringVar(&cfg.TracesBucketName, "traces-gcp-bucket", "", "GCP bucket name where transaction traces are stored")
flag.Uint64Var(&cfg.TracesBackfillStartHeight, "traces-backfill-start-height", 0, "evm block height from which to start backfilling missing traces.")
flag.Uint64Var(&cfg.TracesBackfillEndHeight, "traces-backfill-end-height", 0, "evm block height until which to backfill missing traces. If 0, backfill until the latest block")
flag.StringVar(&cloudKMSProjectID, "coa-cloud-kms-project-id", "", "The project ID containing the KMS keys, e.g. 'flow-evm-gateway'")
flag.StringVar(&cloudKMSLocationID, "coa-cloud-kms-location-id", "", "The location ID where the key ring is grouped into, e.g. 'global'")
flag.StringVar(&cloudKMSKeyRingID, "coa-cloud-kms-key-ring-id", "", "The key ring ID where the KMS keys exist, e.g. 'tx-signing'")
Expand Down Expand Up @@ -310,6 +316,10 @@ func FromFlags() (*Config, error) {

cfg.TracesEnabled = cfg.TracesBucketName != ""

if cfg.TracesBackfillStartHeight > 0 && cfg.TracesBackfillEndHeight > 0 && cfg.TracesBackfillStartHeight > cfg.TracesBackfillEndHeight {
return nil, fmt.Errorf("traces backfill start height must be less than the end height")
}

if walletKey != "" {
k, err := gethCrypto.HexToECDSA(walletKey)
if err != nil {
Expand Down
55 changes: 53 additions & 2 deletions services/traces/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,11 +84,11 @@ func (e *Engine) Notify(block *models.Block) {
return
}

go e.indexBlockTraces(block, cadenceID)
go e.indexBlockTraces(block, cadenceID, false)
}

// indexBlockTraces iterates the block transaction hashes and tries to download the traces
func (e *Engine) indexBlockTraces(evmBlock *models.Block, cadenceBlockID flow.Identifier) {
func (e *Engine) indexBlockTraces(evmBlock *models.Block, cadenceBlockID flow.Identifier, skipExisting bool) {
ctx, cancel := context.WithTimeout(context.Background(), downloadTimeout)
defer cancel()

Expand All @@ -107,9 +107,17 @@ func (e *Engine) indexBlockTraces(evmBlock *models.Block, cadenceBlockID flow.Id

l := e.logger.With().
Str("tx-id", h.String()).
Uint64("evm-height", evmBlock.Height).
Str("cadence-block-id", cadenceBlockID.String()).
Logger()

if skipExisting {
if _, err := e.traces.GetTransaction(h); err == nil {
l.Debug().Msg("trace already downloaded")
return
}
}

err := retry.Fibonacci(ctx, time.Second*1, func(ctx context.Context) error {
trace, err := e.downloader.Download(h, cadenceBlockID)
if err != nil {
Expand Down Expand Up @@ -140,3 +148,46 @@ func (e *Engine) Error() <-chan error {
func (e *Engine) Stop() {
e.MarkStopped()
}

// Backfill redownloads traces for blocks from EVM start to end height.
func (e *Engine) Backfill(start uint64, end uint64) {
select {
case <-e.Ready():
case <-e.Done():
return
}

lg := e.logger.With().Uint64("start", start).Uint64("end", end).Logger()

lg.Info().Msg("backfilling traces")
for height := start; height <= end; height++ {
select {
case <-e.Done():
return
case <-e.Stopped():
return
default:
}

l := lg.With().Uint64("evm-height", height).Logger()

block, err := e.blocks.GetByHeight(height)
if err != nil {
l.Error().Err(err).Msg("failed to get block by height")
return
}

if len(block.TransactionHashes) == 0 {
continue
}

cadenceID, err := e.blocks.GetCadenceID(block.Height)
if err != nil {
l.Error().Err(err).Msg("failed to get cadence block ID")
return
}

e.indexBlockTraces(block, cadenceID, true)
}
lg.Info().Msg("done backfilling traces")
}
2 changes: 0 additions & 2 deletions services/traces/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,10 @@ func TestTraceIngestion(t *testing.T) {
return blockID, nil
})

downloadedHashes := make(map[gethCommon.Hash]struct{})
downloader.
On("Download", mock.Anything, mock.Anything).
Return(func(txID gethCommon.Hash, blkID flow.Identifier) (json.RawMessage, error) {
require.Equal(t, blockID, blkID)
downloadedHashes[txID] = struct{}{}
time.Sleep(time.Millisecond * 200) // simulate download delay
return txTrace(txID), nil
})
Expand Down

0 comments on commit 5872cfb

Please sign in to comment.