Skip to content

Commit

Permalink
Merge pull request onflow#6292 from AndriiDiachuk/use-indexed-height-…
Browse files Browse the repository at this point in the history
…checking-payer-balance

[Access] Use Indexed height when checking payer balance
  • Loading branch information
peterargue authored Oct 2, 2024
2 parents 7d782c8 + 0a7f162 commit 21cfe5a
Show file tree
Hide file tree
Showing 12 changed files with 143 additions and 33 deletions.
15 changes: 15 additions & 0 deletions access/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ import (
// ErrUnknownReferenceBlock indicates that a transaction references an unknown block.
var ErrUnknownReferenceBlock = errors.New("unknown reference block")

// IndexReporterNotInitialized is returned when indexReporter is nil because
// execution data syncing and indexing is disabled
var IndexReporterNotInitialized = errors.New("index reported not initialized")

// IncompleteTransactionError indicates that a transaction is missing one or more required fields.
type IncompleteTransactionError struct {
MissingFields []string
Expand Down Expand Up @@ -115,3 +119,14 @@ func IsInsufficientBalanceError(err error) bool {
var balanceError InsufficientBalanceError
return errors.As(err, &balanceError)
}

// IndexedHeightFarBehindError indicates that a node is far behind on indexing.
type IndexedHeightFarBehindError struct {
SealedHeight uint64
IndexedHeight uint64
}

func (e IndexedHeightFarBehindError) Error() string {
return fmt.Sprintf("the difference between the latest sealed height (%d) and indexed height (%d) exceeds the maximum gap allowed",
e.SealedHeight, e.IndexedHeight)
}
28 changes: 28 additions & 0 deletions access/mock/blocks.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

45 changes: 40 additions & 5 deletions access/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,32 @@ import (
"github.com/onflow/flow-go/module"
"github.com/onflow/flow-go/module/execution"
"github.com/onflow/flow-go/module/metrics"
"github.com/onflow/flow-go/module/state_synchronization"
"github.com/onflow/flow-go/state"
"github.com/onflow/flow-go/state/protocol"
)

// DefaultSealedIndexedHeightThreshold is the default number of blocks between sealed and indexed height
// this sets a limit on how far into the past the payer validator will allow for checking the payer's balance.
const DefaultSealedIndexedHeightThreshold = 30

type Blocks interface {
HeaderByID(id flow.Identifier) (*flow.Header, error)
FinalizedHeader() (*flow.Header, error)
SealedHeader() (*flow.Header, error)
IndexedHeight() (uint64, error)
}

type ProtocolStateBlocks struct {
state protocol.State
state protocol.State
indexReporter state_synchronization.IndexReporter
}

func NewProtocolStateBlocks(state protocol.State) *ProtocolStateBlocks {
return &ProtocolStateBlocks{state: state}
func NewProtocolStateBlocks(state protocol.State, indexReporter state_synchronization.IndexReporter) *ProtocolStateBlocks {
return &ProtocolStateBlocks{
state: state,
indexReporter: indexReporter,
}
}

func (b *ProtocolStateBlocks) HeaderByID(id flow.Identifier) (*flow.Header, error) {
Expand All @@ -56,7 +66,19 @@ func (b *ProtocolStateBlocks) FinalizedHeader() (*flow.Header, error) {
}

func (b *ProtocolStateBlocks) SealedHeader() (*flow.Header, error) {

return b.state.Sealed().Head()

}

// IndexedHeight returns the highest indexed height by calling corresponding function of indexReporter.
// Expected errors during normal operation:
// - access.IndexReporterNotInitialized - indexed reporter was not initialized.
func (b *ProtocolStateBlocks) IndexedHeight() (uint64, error) {
if b.indexReporter != nil {
return b.indexReporter.HighestIndexedHeight()
}
return 0, IndexReporterNotInitialized
}

// RateLimiter is an interface for checking if an address is rate limited.
Expand Down Expand Up @@ -464,6 +486,19 @@ func (v *TransactionValidator) checkSufficientBalanceToPayForTransaction(ctx con
return fmt.Errorf("could not fetch block header: %w", err)
}

indexedHeight, err := v.blocks.IndexedHeight()
if err != nil {
return fmt.Errorf("could not get indexed height: %w", err)
}

// we use latest indexed block to get the most up-to-date state data available for executing scripts.
// check here to make sure indexing is within an acceptable tolerance of sealing to avoid issues
// if indexing falls behind
sealedHeight := header.Height
if indexedHeight < sealedHeight-DefaultSealedIndexedHeightThreshold {
return IndexedHeightFarBehindError{SealedHeight: sealedHeight, IndexedHeight: indexedHeight}
}

payerAddress := cadence.NewAddress(tx.Payer)
inclusionEffort := cadence.UFix64(tx.InclusionEffort())
gasLimit := cadence.UFix64(tx.GasLimit)
Expand All @@ -473,7 +508,7 @@ func (v *TransactionValidator) checkSufficientBalanceToPayForTransaction(ctx con
return fmt.Errorf("failed to encode cadence args for script executor: %w", err)
}

result, err := v.scriptExecutor.ExecuteAtBlockHeight(ctx, v.verifyPayerBalanceScript, args, header.Height)
result, err := v.scriptExecutor.ExecuteAtBlockHeight(ctx, v.verifyPayerBalanceScript, args, indexedHeight)
if err != nil {
return fmt.Errorf("script finished with error: %w", err)
}
Expand All @@ -489,7 +524,7 @@ func (v *TransactionValidator) checkSufficientBalanceToPayForTransaction(ctx con
}

// return no error if payer has sufficient balance
if bool(canExecuteTransaction) {
if canExecuteTransaction {
return nil
}

Expand Down
33 changes: 33 additions & 0 deletions access/validator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,10 @@ func (s *TransactionValidatorSuite) TestTransactionValidator_ScriptExecutorInter
scriptExecutor := execmock.NewScriptExecutor(s.T())
assert.NotNil(s.T(), scriptExecutor)

s.blocks.
On("IndexedHeight").
Return(s.header.Height, nil)

scriptExecutor.
On("ExecuteAtBlockHeight", mock.Anything, mock.Anything, mock.Anything, mock.Anything).
Return(nil, errors.New("script executor internal error")).
Expand Down Expand Up @@ -115,6 +119,10 @@ func (s *TransactionValidatorSuite) TestTransactionValidator_SufficientBalance()
actualResponse, err := jsoncdc.Encode(actualResponseValue)
assert.NoError(s.T(), err)

s.blocks.
On("IndexedHeight").
Return(s.header.Height, nil)

scriptExecutor.
On("ExecuteAtBlockHeight", mock.Anything, mock.Anything, mock.Anything, mock.Anything).
Return(actualResponse, nil).
Expand Down Expand Up @@ -142,6 +150,10 @@ func (s *TransactionValidatorSuite) TestTransactionValidator_InsufficientBalance
actualResponse, err := jsoncdc.Encode(actualResponseValue)
assert.NoError(s.T(), err)

s.blocks.
On("IndexedHeight").
Return(s.header.Height, nil)

scriptExecutor.
On("ExecuteAtBlockHeight", mock.Anything, mock.Anything, mock.Anything, mock.Anything).
Return(actualResponse, nil).Twice()
Expand Down Expand Up @@ -175,3 +187,24 @@ func (s *TransactionValidatorSuite) TestTransactionValidator_InsufficientBalance
assert.NoError(s.T(), err)
})
}

func (s *TransactionValidatorSuite) TestTransactionValidator_SealedIndexedHeightThresholdLimit() {
scriptExecutor := execmock.NewScriptExecutor(s.T())

// setting indexed height to be behind of sealed by bigger number than allowed(DefaultSealedIndexedHeightThreshold)
indexedHeight := s.header.Height - 40

s.blocks.
On("IndexedHeight").
Return(indexedHeight, nil)

validator, err := access.NewTransactionValidator(s.blocks, s.chain, s.metrics, s.validatorOptions, scriptExecutor)
assert.NoError(s.T(), err)
assert.NotNil(s.T(), validator)

txBody := unittest.TransactionBodyFixture()

err = validator.Validate(context.Background(), &txBody)
assert.NoError(s.T(), err)

}
14 changes: 8 additions & 6 deletions cmd/access/node_builder/access_node_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1924,9 +1924,16 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
return nil, fmt.Errorf("transaction result query mode 'compare' is not supported")
}

// If execution data syncing and indexing is disabled, pass nil indexReporter
var indexReporter state_synchronization.IndexReporter
if builder.executionDataSyncEnabled && builder.executionDataIndexingEnabled {
indexReporter = builder.Reporter
}

checkPayerBalanceMode, err := accessNode.ParsePayerBalanceMode(builder.checkPayerBalanceMode)
if err != nil {
return nil, fmt.Errorf("could not parse payer balance mode: %w", err)

}

nodeBackend, err := backend.New(backend.Params{
Expand Down Expand Up @@ -1967,18 +1974,13 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
TxResultQueryMode: txResultQueryMode,
TxResultsIndex: builder.TxResultsIndex,
LastFullBlockHeight: lastFullBlockHeight,
IndexReporter: indexReporter,
VersionControl: builder.VersionControl,
})
if err != nil {
return nil, fmt.Errorf("could not initialize backend: %w", err)
}

// If execution data syncing and indexing is disabled, pass nil indexReporter
var indexReporter state_synchronization.IndexReporter
if builder.executionDataSyncEnabled && builder.executionDataIndexingEnabled {
indexReporter = builder.Reporter
}

engineBuilder, err := rpc.NewBuilder(
node.Logger,
node.State,
Expand Down
2 changes: 1 addition & 1 deletion cmd/bootstrap/utils/md5.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package utils
// The google storage API only provides md5 and crc32 hence overriding the linter flag for md5
// #nosec
import (
"crypto/md5"
"crypto/md5" //nolint:gosec
"io"
"os"
)
Expand Down
13 changes: 7 additions & 6 deletions cmd/observer/node_builder/observer_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1937,6 +1937,12 @@ func (builder *ObserverServiceBuilder) enqueueRPCServer() {
return nil, fmt.Errorf("failed to initialize block tracker: %w", err)
}

// If execution data syncing and indexing is disabled, pass nil indexReporter
var indexReporter state_synchronization.IndexReporter
if builder.executionDataSyncEnabled && builder.executionDataIndexingEnabled {
indexReporter = builder.Reporter
}

backendParams := backend.Params{
State: node.State,
Blocks: node.Storage.Blocks,
Expand All @@ -1963,6 +1969,7 @@ func (builder *ObserverServiceBuilder) enqueueRPCServer() {
builder.stateStreamConf.ResponseLimit,
builder.stateStreamConf.ClientSendBufferSize,
),
IndexReporter: indexReporter,
VersionControl: builder.VersionControl,
}

Expand Down Expand Up @@ -1991,12 +1998,6 @@ func (builder *ObserverServiceBuilder) enqueueRPCServer() {
return nil, err
}

// If execution data syncing and indexing is disabled, pass nil indexReporter
var indexReporter state_synchronization.IndexReporter
if builder.executionDataSyncEnabled && builder.executionDataIndexingEnabled {
indexReporter = builder.Reporter
}

engineBuilder, err := rpc.NewBuilder(
node.Logger,
node.State,
Expand Down
13 changes: 5 additions & 8 deletions engine/access/rpc/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/onflow/flow-go/module"
"github.com/onflow/flow-go/module/counters"
"github.com/onflow/flow-go/module/execution"
"github.com/onflow/flow-go/module/state_synchronization"
"github.com/onflow/flow-go/state/protocol"
"github.com/onflow/flow-go/storage"
)
Expand Down Expand Up @@ -120,6 +121,7 @@ type Params struct {
TxResultQueryMode IndexQueryMode
TxResultsIndex *index.TransactionResultsIndex
LastFullBlockHeight *counters.PersistentStrictMonotonicCounter
IndexReporter state_synchronization.IndexReporter
VersionControl *version.VersionControl
}

Expand Down Expand Up @@ -246,13 +248,7 @@ func New(params Params) (*Backend, error) {
versionControl: params.VersionControl,
}

txValidator, err := configureTransactionValidator(
params.State,
params.ChainID,
params.AccessMetrics,
params.ScriptExecutor,
params.CheckPayerBalanceMode,
)
txValidator, err := configureTransactionValidator(params.State, params.ChainID, params.IndexReporter, params.AccessMetrics, params.ScriptExecutor, params.CheckPayerBalanceMode)
if err != nil {
return nil, fmt.Errorf("could not create transaction validator: %w", err)
}
Expand Down Expand Up @@ -319,12 +315,13 @@ func identifierList(ids []string) (flow.IdentifierList, error) {
func configureTransactionValidator(
state protocol.State,
chainID flow.ChainID,
indexReporter state_synchronization.IndexReporter,
transactionMetrics module.TransactionValidationMetrics,
executor execution.ScriptExecutor,
checkPayerBalanceMode access.PayerBalanceMode,
) (*access.TransactionValidator, error) {
return access.NewTransactionValidator(
access.NewProtocolStateBlocks(state),
access.NewProtocolStateBlocks(state, indexReporter),
chainID.Chain(),
transactionMetrics,
access.TransactionValidationOptions{
Expand Down
3 changes: 1 addition & 2 deletions engine/access/rpc/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ import (
"net/http"
"sync"

"github.com/onflow/flow-go/module/state_synchronization"

"github.com/rs/zerolog"
"google.golang.org/grpc/credentials"

Expand All @@ -25,6 +23,7 @@ import (
"github.com/onflow/flow-go/module/events"
"github.com/onflow/flow-go/module/grpcserver"
"github.com/onflow/flow-go/module/irrecoverable"
"github.com/onflow/flow-go/module/state_synchronization"
"github.com/onflow/flow-go/state/protocol"
)

Expand Down
2 changes: 1 addition & 1 deletion engine/collection/ingest/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func New(
logger := log.With().Str("engine", "ingest").Logger()

transactionValidator := access.NewTransactionValidatorWithLimiter(
access.NewProtocolStateBlocks(state),
access.NewProtocolStateBlocks(state, nil),
chain,
access.TransactionValidationOptions{
Expiry: flow.DefaultTransactionExpiry,
Expand Down
4 changes: 2 additions & 2 deletions integration/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ require (
github.com/onflow/crypto v0.25.2
github.com/onflow/flow-core-contracts/lib/go/contracts v1.3.1
github.com/onflow/flow-core-contracts/lib/go/templates v1.3.1
github.com/onflow/flow-emulator v1.0.1-0.20240930092334-2f46b2112195
github.com/onflow/flow-go v0.38.0-preview.0
github.com/onflow/flow-emulator v1.0.1-0.20241002100151-fa253c380189
github.com/onflow/flow-go v0.38.0-preview.0.0.20241001140429-ec4ad1cf1c8a
github.com/onflow/flow-go-sdk v1.0.0
github.com/onflow/flow-go/insecure v0.0.0-00010101000000-000000000000
github.com/onflow/flow/protobuf/go/flow v0.4.7
Expand Down
4 changes: 2 additions & 2 deletions integration/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -2153,8 +2153,8 @@ github.com/onflow/flow-core-contracts/lib/go/contracts v1.3.1 h1:q9tXLIALwQ76bO4
github.com/onflow/flow-core-contracts/lib/go/contracts v1.3.1/go.mod h1:u/mkP/B+PbV33tEG3qfkhhBlydSvAKxfLZSfB4lsJHg=
github.com/onflow/flow-core-contracts/lib/go/templates v1.3.1 h1:FfhMBAb78p6VAWkJ+iqdKLErGQVQgxk5w6DP5ZruWX8=
github.com/onflow/flow-core-contracts/lib/go/templates v1.3.1/go.mod h1:NgbMOYnMh0GN48VsNKZuiwK7uyk38Wyo8jN9+C9QE30=
github.com/onflow/flow-emulator v1.0.1-0.20240930092334-2f46b2112195 h1:buM9uEW5WhFiI9hMDA90lJhokItN1Cmac3ddb0GWSbY=
github.com/onflow/flow-emulator v1.0.1-0.20240930092334-2f46b2112195/go.mod h1:b9gi9kvRfUVHmyz7cTXBsnT12oHOJisvrxpqwtFRMpM=
github.com/onflow/flow-emulator v1.0.1-0.20241002100151-fa253c380189 h1:UCVla50Y50Q2b+o6l22um8nHrD35XYRveLFHQg9EOv0=
github.com/onflow/flow-emulator v1.0.1-0.20241002100151-fa253c380189/go.mod h1:DEfNNXJuEOWqG/NS3RJ8jI+5BOhbENZ2hzKOz14ZPJ0=
github.com/onflow/flow-ft/lib/go/contracts v1.0.0 h1:mToacZ5NWqtlWwk/7RgIl/jeKB/Sy/tIXdw90yKHcV0=
github.com/onflow/flow-ft/lib/go/contracts v1.0.0/go.mod h1:PwsL8fC81cjnUnTfmyL/HOIyHnyaw/JA474Wfj2tl6A=
github.com/onflow/flow-ft/lib/go/templates v1.0.0 h1:6cMS/lUJJ17HjKBfMO/eh0GGvnpElPgBXx7h5aoWJhs=
Expand Down

0 comments on commit 21cfe5a

Please sign in to comment.