diff --git a/access/errors.go b/access/errors.go index a663179f018..310c86b43b5 100644 --- a/access/errors.go +++ b/access/errors.go @@ -12,6 +12,10 @@ import ( // ErrUnknownReferenceBlock indicates that a transaction references an unknown block. var ErrUnknownReferenceBlock = errors.New("unknown reference block") +// IndexReporterNotInitialized is returned when indexReporter is nil because +// execution data syncing and indexing is disabled +var IndexReporterNotInitialized = errors.New("index reported not initialized") + // IncompleteTransactionError indicates that a transaction is missing one or more required fields. type IncompleteTransactionError struct { MissingFields []string @@ -115,3 +119,14 @@ func IsInsufficientBalanceError(err error) bool { var balanceError InsufficientBalanceError return errors.As(err, &balanceError) } + +// IndexedHeightFarBehindError indicates that a node is far behind on indexing. +type IndexedHeightFarBehindError struct { + SealedHeight uint64 + IndexedHeight uint64 +} + +func (e IndexedHeightFarBehindError) Error() string { + return fmt.Sprintf("the difference between the latest sealed height (%d) and indexed height (%d) exceeds the maximum gap allowed", + e.SealedHeight, e.IndexedHeight) +} diff --git a/access/mock/blocks.go b/access/mock/blocks.go index 153c2160321..088a50aa155 100644 --- a/access/mock/blocks.go +++ b/access/mock/blocks.go @@ -72,6 +72,34 @@ func (_m *Blocks) HeaderByID(id flow.Identifier) (*flow.Header, error) { return r0, r1 } +// IndexedHeight provides a mock function with given fields: +func (_m *Blocks) IndexedHeight() (uint64, error) { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for IndexedHeight") + } + + var r0 uint64 + var r1 error + if rf, ok := ret.Get(0).(func() (uint64, error)); ok { + return rf() + } + if rf, ok := ret.Get(0).(func() uint64); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(uint64) + } + + if rf, ok := ret.Get(1).(func() error); ok { + r1 = rf() + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // SealedHeader provides a mock function with given fields: func (_m *Blocks) SealedHeader() (*flow.Header, error) { ret := _m.Called() diff --git a/access/validator.go b/access/validator.go index 61a58069c88..d3169d3a41f 100644 --- a/access/validator.go +++ b/access/validator.go @@ -20,22 +20,32 @@ import ( "github.com/onflow/flow-go/module" "github.com/onflow/flow-go/module/execution" "github.com/onflow/flow-go/module/metrics" + "github.com/onflow/flow-go/module/state_synchronization" "github.com/onflow/flow-go/state" "github.com/onflow/flow-go/state/protocol" ) +// DefaultSealedIndexedHeightThreshold is the default number of blocks between sealed and indexed height +// this sets a limit on how far into the past the payer validator will allow for checking the payer's balance. +const DefaultSealedIndexedHeightThreshold = 30 + type Blocks interface { HeaderByID(id flow.Identifier) (*flow.Header, error) FinalizedHeader() (*flow.Header, error) SealedHeader() (*flow.Header, error) + IndexedHeight() (uint64, error) } type ProtocolStateBlocks struct { - state protocol.State + state protocol.State + indexReporter state_synchronization.IndexReporter } -func NewProtocolStateBlocks(state protocol.State) *ProtocolStateBlocks { - return &ProtocolStateBlocks{state: state} +func NewProtocolStateBlocks(state protocol.State, indexReporter state_synchronization.IndexReporter) *ProtocolStateBlocks { + return &ProtocolStateBlocks{ + state: state, + indexReporter: indexReporter, + } } func (b *ProtocolStateBlocks) HeaderByID(id flow.Identifier) (*flow.Header, error) { @@ -56,7 +66,19 @@ func (b *ProtocolStateBlocks) FinalizedHeader() (*flow.Header, error) { } func (b *ProtocolStateBlocks) SealedHeader() (*flow.Header, error) { + return b.state.Sealed().Head() + +} + +// IndexedHeight returns the highest indexed height by calling corresponding function of indexReporter. +// Expected errors during normal operation: +// - access.IndexReporterNotInitialized - indexed reporter was not initialized. +func (b *ProtocolStateBlocks) IndexedHeight() (uint64, error) { + if b.indexReporter != nil { + return b.indexReporter.HighestIndexedHeight() + } + return 0, IndexReporterNotInitialized } // RateLimiter is an interface for checking if an address is rate limited. @@ -464,6 +486,19 @@ func (v *TransactionValidator) checkSufficientBalanceToPayForTransaction(ctx con return fmt.Errorf("could not fetch block header: %w", err) } + indexedHeight, err := v.blocks.IndexedHeight() + if err != nil { + return fmt.Errorf("could not get indexed height: %w", err) + } + + // we use latest indexed block to get the most up-to-date state data available for executing scripts. + // check here to make sure indexing is within an acceptable tolerance of sealing to avoid issues + // if indexing falls behind + sealedHeight := header.Height + if indexedHeight < sealedHeight-DefaultSealedIndexedHeightThreshold { + return IndexedHeightFarBehindError{SealedHeight: sealedHeight, IndexedHeight: indexedHeight} + } + payerAddress := cadence.NewAddress(tx.Payer) inclusionEffort := cadence.UFix64(tx.InclusionEffort()) gasLimit := cadence.UFix64(tx.GasLimit) @@ -473,7 +508,7 @@ func (v *TransactionValidator) checkSufficientBalanceToPayForTransaction(ctx con return fmt.Errorf("failed to encode cadence args for script executor: %w", err) } - result, err := v.scriptExecutor.ExecuteAtBlockHeight(ctx, v.verifyPayerBalanceScript, args, header.Height) + result, err := v.scriptExecutor.ExecuteAtBlockHeight(ctx, v.verifyPayerBalanceScript, args, indexedHeight) if err != nil { return fmt.Errorf("script finished with error: %w", err) } @@ -489,7 +524,7 @@ func (v *TransactionValidator) checkSufficientBalanceToPayForTransaction(ctx con } // return no error if payer has sufficient balance - if bool(canExecuteTransaction) { + if canExecuteTransaction { return nil } diff --git a/access/validator_test.go b/access/validator_test.go index 5f7c30774e8..58c8bd1e1ab 100644 --- a/access/validator_test.go +++ b/access/validator_test.go @@ -88,6 +88,10 @@ func (s *TransactionValidatorSuite) TestTransactionValidator_ScriptExecutorInter scriptExecutor := execmock.NewScriptExecutor(s.T()) assert.NotNil(s.T(), scriptExecutor) + s.blocks. + On("IndexedHeight"). + Return(s.header.Height, nil) + scriptExecutor. On("ExecuteAtBlockHeight", mock.Anything, mock.Anything, mock.Anything, mock.Anything). Return(nil, errors.New("script executor internal error")). @@ -115,6 +119,10 @@ func (s *TransactionValidatorSuite) TestTransactionValidator_SufficientBalance() actualResponse, err := jsoncdc.Encode(actualResponseValue) assert.NoError(s.T(), err) + s.blocks. + On("IndexedHeight"). + Return(s.header.Height, nil) + scriptExecutor. On("ExecuteAtBlockHeight", mock.Anything, mock.Anything, mock.Anything, mock.Anything). Return(actualResponse, nil). @@ -142,6 +150,10 @@ func (s *TransactionValidatorSuite) TestTransactionValidator_InsufficientBalance actualResponse, err := jsoncdc.Encode(actualResponseValue) assert.NoError(s.T(), err) + s.blocks. + On("IndexedHeight"). + Return(s.header.Height, nil) + scriptExecutor. On("ExecuteAtBlockHeight", mock.Anything, mock.Anything, mock.Anything, mock.Anything). Return(actualResponse, nil).Twice() @@ -175,3 +187,24 @@ func (s *TransactionValidatorSuite) TestTransactionValidator_InsufficientBalance assert.NoError(s.T(), err) }) } + +func (s *TransactionValidatorSuite) TestTransactionValidator_SealedIndexedHeightThresholdLimit() { + scriptExecutor := execmock.NewScriptExecutor(s.T()) + + // setting indexed height to be behind of sealed by bigger number than allowed(DefaultSealedIndexedHeightThreshold) + indexedHeight := s.header.Height - 40 + + s.blocks. + On("IndexedHeight"). + Return(indexedHeight, nil) + + validator, err := access.NewTransactionValidator(s.blocks, s.chain, s.metrics, s.validatorOptions, scriptExecutor) + assert.NoError(s.T(), err) + assert.NotNil(s.T(), validator) + + txBody := unittest.TransactionBodyFixture() + + err = validator.Validate(context.Background(), &txBody) + assert.NoError(s.T(), err) + +} diff --git a/cmd/access/node_builder/access_node_builder.go b/cmd/access/node_builder/access_node_builder.go index 594e861974e..595f797d600 100644 --- a/cmd/access/node_builder/access_node_builder.go +++ b/cmd/access/node_builder/access_node_builder.go @@ -1924,9 +1924,16 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) { return nil, fmt.Errorf("transaction result query mode 'compare' is not supported") } + // If execution data syncing and indexing is disabled, pass nil indexReporter + var indexReporter state_synchronization.IndexReporter + if builder.executionDataSyncEnabled && builder.executionDataIndexingEnabled { + indexReporter = builder.Reporter + } + checkPayerBalanceMode, err := accessNode.ParsePayerBalanceMode(builder.checkPayerBalanceMode) if err != nil { return nil, fmt.Errorf("could not parse payer balance mode: %w", err) + } nodeBackend, err := backend.New(backend.Params{ @@ -1967,18 +1974,13 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) { TxResultQueryMode: txResultQueryMode, TxResultsIndex: builder.TxResultsIndex, LastFullBlockHeight: lastFullBlockHeight, + IndexReporter: indexReporter, VersionControl: builder.VersionControl, }) if err != nil { return nil, fmt.Errorf("could not initialize backend: %w", err) } - // If execution data syncing and indexing is disabled, pass nil indexReporter - var indexReporter state_synchronization.IndexReporter - if builder.executionDataSyncEnabled && builder.executionDataIndexingEnabled { - indexReporter = builder.Reporter - } - engineBuilder, err := rpc.NewBuilder( node.Logger, node.State, diff --git a/cmd/bootstrap/utils/md5.go b/cmd/bootstrap/utils/md5.go index 3abe9c42948..e885ed891e2 100644 --- a/cmd/bootstrap/utils/md5.go +++ b/cmd/bootstrap/utils/md5.go @@ -3,7 +3,7 @@ package utils // The google storage API only provides md5 and crc32 hence overriding the linter flag for md5 // #nosec import ( - "crypto/md5" + "crypto/md5" //nolint:gosec "io" "os" ) diff --git a/cmd/observer/node_builder/observer_builder.go b/cmd/observer/node_builder/observer_builder.go index a1d1dbcf6a1..27cc55dc432 100644 --- a/cmd/observer/node_builder/observer_builder.go +++ b/cmd/observer/node_builder/observer_builder.go @@ -1937,6 +1937,12 @@ func (builder *ObserverServiceBuilder) enqueueRPCServer() { return nil, fmt.Errorf("failed to initialize block tracker: %w", err) } + // If execution data syncing and indexing is disabled, pass nil indexReporter + var indexReporter state_synchronization.IndexReporter + if builder.executionDataSyncEnabled && builder.executionDataIndexingEnabled { + indexReporter = builder.Reporter + } + backendParams := backend.Params{ State: node.State, Blocks: node.Storage.Blocks, @@ -1963,6 +1969,7 @@ func (builder *ObserverServiceBuilder) enqueueRPCServer() { builder.stateStreamConf.ResponseLimit, builder.stateStreamConf.ClientSendBufferSize, ), + IndexReporter: indexReporter, VersionControl: builder.VersionControl, } @@ -1991,12 +1998,6 @@ func (builder *ObserverServiceBuilder) enqueueRPCServer() { return nil, err } - // If execution data syncing and indexing is disabled, pass nil indexReporter - var indexReporter state_synchronization.IndexReporter - if builder.executionDataSyncEnabled && builder.executionDataIndexingEnabled { - indexReporter = builder.Reporter - } - engineBuilder, err := rpc.NewBuilder( node.Logger, node.State, diff --git a/engine/access/rpc/backend/backend.go b/engine/access/rpc/backend/backend.go index 97bd3c371db..a74e7a1851d 100644 --- a/engine/access/rpc/backend/backend.go +++ b/engine/access/rpc/backend/backend.go @@ -23,6 +23,7 @@ import ( "github.com/onflow/flow-go/module" "github.com/onflow/flow-go/module/counters" "github.com/onflow/flow-go/module/execution" + "github.com/onflow/flow-go/module/state_synchronization" "github.com/onflow/flow-go/state/protocol" "github.com/onflow/flow-go/storage" ) @@ -120,6 +121,7 @@ type Params struct { TxResultQueryMode IndexQueryMode TxResultsIndex *index.TransactionResultsIndex LastFullBlockHeight *counters.PersistentStrictMonotonicCounter + IndexReporter state_synchronization.IndexReporter VersionControl *version.VersionControl } @@ -246,13 +248,7 @@ func New(params Params) (*Backend, error) { versionControl: params.VersionControl, } - txValidator, err := configureTransactionValidator( - params.State, - params.ChainID, - params.AccessMetrics, - params.ScriptExecutor, - params.CheckPayerBalanceMode, - ) + txValidator, err := configureTransactionValidator(params.State, params.ChainID, params.IndexReporter, params.AccessMetrics, params.ScriptExecutor, params.CheckPayerBalanceMode) if err != nil { return nil, fmt.Errorf("could not create transaction validator: %w", err) } @@ -319,12 +315,13 @@ func identifierList(ids []string) (flow.IdentifierList, error) { func configureTransactionValidator( state protocol.State, chainID flow.ChainID, + indexReporter state_synchronization.IndexReporter, transactionMetrics module.TransactionValidationMetrics, executor execution.ScriptExecutor, checkPayerBalanceMode access.PayerBalanceMode, ) (*access.TransactionValidator, error) { return access.NewTransactionValidator( - access.NewProtocolStateBlocks(state), + access.NewProtocolStateBlocks(state, indexReporter), chainID.Chain(), transactionMetrics, access.TransactionValidationOptions{ diff --git a/engine/access/rpc/engine.go b/engine/access/rpc/engine.go index a42c8495345..145e3d62143 100644 --- a/engine/access/rpc/engine.go +++ b/engine/access/rpc/engine.go @@ -8,8 +8,6 @@ import ( "net/http" "sync" - "github.com/onflow/flow-go/module/state_synchronization" - "github.com/rs/zerolog" "google.golang.org/grpc/credentials" @@ -25,6 +23,7 @@ import ( "github.com/onflow/flow-go/module/events" "github.com/onflow/flow-go/module/grpcserver" "github.com/onflow/flow-go/module/irrecoverable" + "github.com/onflow/flow-go/module/state_synchronization" "github.com/onflow/flow-go/state/protocol" ) diff --git a/engine/collection/ingest/engine.go b/engine/collection/ingest/engine.go index 438e6bea88b..ae21f71253f 100644 --- a/engine/collection/ingest/engine.go +++ b/engine/collection/ingest/engine.go @@ -61,7 +61,7 @@ func New( logger := log.With().Str("engine", "ingest").Logger() transactionValidator := access.NewTransactionValidatorWithLimiter( - access.NewProtocolStateBlocks(state), + access.NewProtocolStateBlocks(state, nil), chain, access.TransactionValidationOptions{ Expiry: flow.DefaultTransactionExpiry, diff --git a/integration/go.mod b/integration/go.mod index c6c9dc724bb..9bae86babca 100644 --- a/integration/go.mod +++ b/integration/go.mod @@ -24,8 +24,8 @@ require ( github.com/onflow/crypto v0.25.2 github.com/onflow/flow-core-contracts/lib/go/contracts v1.3.1 github.com/onflow/flow-core-contracts/lib/go/templates v1.3.1 - github.com/onflow/flow-emulator v1.0.1-0.20240930092334-2f46b2112195 - github.com/onflow/flow-go v0.38.0-preview.0 + github.com/onflow/flow-emulator v1.0.1-0.20241002100151-fa253c380189 + github.com/onflow/flow-go v0.38.0-preview.0.0.20241001140429-ec4ad1cf1c8a github.com/onflow/flow-go-sdk v1.0.0 github.com/onflow/flow-go/insecure v0.0.0-00010101000000-000000000000 github.com/onflow/flow/protobuf/go/flow v0.4.7 diff --git a/integration/go.sum b/integration/go.sum index db2f27d710f..4128721ef99 100644 --- a/integration/go.sum +++ b/integration/go.sum @@ -2153,8 +2153,8 @@ github.com/onflow/flow-core-contracts/lib/go/contracts v1.3.1 h1:q9tXLIALwQ76bO4 github.com/onflow/flow-core-contracts/lib/go/contracts v1.3.1/go.mod h1:u/mkP/B+PbV33tEG3qfkhhBlydSvAKxfLZSfB4lsJHg= github.com/onflow/flow-core-contracts/lib/go/templates v1.3.1 h1:FfhMBAb78p6VAWkJ+iqdKLErGQVQgxk5w6DP5ZruWX8= github.com/onflow/flow-core-contracts/lib/go/templates v1.3.1/go.mod h1:NgbMOYnMh0GN48VsNKZuiwK7uyk38Wyo8jN9+C9QE30= -github.com/onflow/flow-emulator v1.0.1-0.20240930092334-2f46b2112195 h1:buM9uEW5WhFiI9hMDA90lJhokItN1Cmac3ddb0GWSbY= -github.com/onflow/flow-emulator v1.0.1-0.20240930092334-2f46b2112195/go.mod h1:b9gi9kvRfUVHmyz7cTXBsnT12oHOJisvrxpqwtFRMpM= +github.com/onflow/flow-emulator v1.0.1-0.20241002100151-fa253c380189 h1:UCVla50Y50Q2b+o6l22um8nHrD35XYRveLFHQg9EOv0= +github.com/onflow/flow-emulator v1.0.1-0.20241002100151-fa253c380189/go.mod h1:DEfNNXJuEOWqG/NS3RJ8jI+5BOhbENZ2hzKOz14ZPJ0= github.com/onflow/flow-ft/lib/go/contracts v1.0.0 h1:mToacZ5NWqtlWwk/7RgIl/jeKB/Sy/tIXdw90yKHcV0= github.com/onflow/flow-ft/lib/go/contracts v1.0.0/go.mod h1:PwsL8fC81cjnUnTfmyL/HOIyHnyaw/JA474Wfj2tl6A= github.com/onflow/flow-ft/lib/go/templates v1.0.0 h1:6cMS/lUJJ17HjKBfMO/eh0GGvnpElPgBXx7h5aoWJhs=