Skip to content

Commit

Permalink
Merge pull request onflow#5211 from onflow/alex/receipt-validator_err…
Browse files Browse the repository at this point in the history
…or-handling

receipt validator error handling
  • Loading branch information
AlexHentschel authored May 26, 2024
2 parents 0117852 + ea0bd9f commit 778a287
Show file tree
Hide file tree
Showing 19 changed files with 769 additions and 231 deletions.
54 changes: 29 additions & 25 deletions engine/consensus/matching/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/model/flow/filter"
"github.com/onflow/flow-go/module"
"github.com/onflow/flow-go/module/irrecoverable"
"github.com/onflow/flow-go/module/mempool"
"github.com/onflow/flow-go/module/metrics"
"github.com/onflow/flow-go/module/trace"
Expand Down Expand Up @@ -180,12 +181,18 @@ func (c *Core) processReceipt(receipt *flow.ExecutionReceipt) (bool, error) {
Hex("initial_state", initialState[:]).
Hex("final_state", finalState[:]).Logger()

// if the receipt is for an unknown block, skip it. It will be re-requested
// later by `requestPending` function.
// If the receipt is for an unknown block, skip it.
// Reasoning: If this is an honest receipt, this replica is behind. Chances are high that other leaders will
// already have included the receipt by the time this replica has caught up. If we still need the receipt by
// the time this replica has caught up, it will be re-requested later by `requestPending` function. If it is
// a malicious receipt, discarding it is advantageous for mitigating spamming and resource exhaustion attacks.
executedBlock, err := c.headersDB.ByBlockID(receipt.ExecutionResult.BlockID)
if err != nil {
log.Debug().Msg("discarding receipt for unknown block")
return false, nil
if errors.Is(err, storage.ErrNotFound) {
log.Debug().Msg("dropping execution receipt for unknown block")
return false, nil
}
return false, irrecoverable.NewExceptionf("encountered unexpected storage error attempting to retrieve block %v: %w", receipt.ExecutionResult.BlockID, err)
}

log = log.With().
Expand All @@ -208,31 +215,28 @@ func (c *Core) processReceipt(receipt *flow.ExecutionReceipt) (bool, error) {
childSpan := c.tracer.StartSpanFromParent(receiptSpan, trace.CONMatchProcessReceiptVal)
err = c.receiptValidator.Validate(receipt)
childSpan.End()

if engine.IsUnverifiableInputError(err) {
// If previous result is missing, we can't validate this receipt.
// Although we will request its previous receipt(s),
// we don't want to drop it now, because when the missing previous arrive
// in a wrong order, they will still be dropped, and causing the catch up
// to be inefficient.
// Instead, we cache the receipt in case it arrives earlier than its
// previous receipt.
// For instance, given blocks A <- B <- C <- D <- E, if we receive their receipts
// in the order of [E,C,D,B,A], then:
// if we drop the missing previous receipts, then only A will be processed;
// if we cache the missing previous receipts, then all of them will be processed, because
// once A is processed, we will check if there is a child receipt pending,
// if yes, then process it.
c.pendingReceipts.Add(receipt)
log.Info().Msg("receipt is cached because its previous result is missing")
return false, nil
}

if err != nil {
if module.IsUnknownResultError(err) {
// Previous result is missing. Hence, we can't validate this receipt.
// We want to efficiently handle receipts arriving out of order. Therefore, we cache the
// receipt in `c.pendingReceipts`. On finalization of new blocks, we request receipts
// for all unsealed but finalized blocks. For instance, given blocks
// A <- B <- C <- D <- E, if we receive their receipts in the order of [E,C,D,B,A], then:
// - If we drop the missing previous receipts, then only A will be processed.
// - If we cache the missing previous receipts, then all of them will be processed, because once
// A is processed, we will check if there is a child receipt pending, if yes, then process it.
c.pendingReceipts.Add(receipt)
log.Debug().Msg("receipt is cached because its previous result is missing")
return false, nil
}
if engine.IsInvalidInputError(err) {
log.Err(err).Msg("invalid execution receipt")
log.Err(err).Bool(logging.KeyProtocolViolation, true).Msg("invalid execution receipt")
return false, nil
}
if module.IsUnknownBlockError(err) { // This should never happen
// Above, we successfully retrieved the `executedBlock`. Hence, `UnknownBlockError` here means our state is corrupted!
return false, irrecoverable.NewExceptionf("internal state corruption detected when validating receipt %v for block %v: %w", receipt.ID(), receipt.BlockID, err)
}
return false, fmt.Errorf("failed to validate execution receipt: %w", err)
}

Expand Down
34 changes: 30 additions & 4 deletions engine/consensus/matching/core_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/onflow/flow-go/engine"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/module"
"github.com/onflow/flow-go/module/metrics"
mockmodule "github.com/onflow/flow-go/module/mock"
"github.com/onflow/flow-go/module/trace"
Expand Down Expand Up @@ -154,7 +155,7 @@ func (ms *MatchingSuite) TestOnReceiptValid() {
ms.ReceiptsDB.AssertExpectations(ms.T())
}

// TestOnReceiptInvalid tests that we reject receipts that don't pass the ReceiptValidator
// TestOnReceiptInvalid tests handing of receipts that the ReceiptValidator detects as violating the protocol
func (ms *MatchingSuite) TestOnReceiptInvalid() {
// we use the same Receipt as in TestOnReceiptValid to ensure that the sealing Core is not
// rejecting the receipt for any other reason
Expand All @@ -166,13 +167,38 @@ func (ms *MatchingSuite) TestOnReceiptInvalid() {

// check that _expected_ failure case of invalid receipt is handled without error
ms.receiptValidator.On("Validate", receipt).Return(engine.NewInvalidInputError("")).Once()
_, err := ms.core.processReceipt(receipt)
wasAdded, err := ms.core.processReceipt(receipt)
ms.Require().NoError(err, "invalid receipt should be dropped but not error")
ms.Require().False(wasAdded, "invalid receipt should not be added")
ms.receiptValidator.AssertExpectations(ms.T())
ms.ReceiptsDB.AssertNumberOfCalls(ms.T(), "Store", 0)
}

// check that _unexpected_ failure case causes the error to be escalated
// TestOnReceiptValidatorExceptions tests matching.Core escalates unexpected errors and exceptions.
// We expect that such errors are *not* interpreted as the receipt being invalid.
func (ms *MatchingSuite) TestOnReceiptValidatorExceptions() {
// we use the same Receipt as in TestOnReceiptValid to ensure that the sealing Core is not rejecting the receipt for any other reason
originID := ms.ExeID
receipt := unittest.ExecutionReceiptFixture(
unittest.WithExecutorID(originID),
unittest.WithResult(unittest.ExecutionResultFixture(unittest.WithBlock(&ms.UnfinalizedBlock))),
)

// Check that _unexpected_ failure causes the error to be escalated and is *not* interpreted as an invalid receipt.
ms.receiptValidator.On("Validate", receipt).Return(fmt.Errorf("")).Once()
_, err := ms.core.processReceipt(receipt)
ms.Require().Error(err, "unexpected errors should be escalated")
ms.Require().False(engine.IsInvalidInputError(err), "exceptions should not be misinterpreted as an invalid receipt")

// Check that an `UnknownBlockError` causes the error to be escalated and is *not* interpreted as an invalid receipt.
// Reasoning: For attack resilience, we should discard outdated receipts based on the height of the executed block, _before_ we
// run the expensive receipt validation. Therefore, matching.Core should retrieve the executed block before calling into the
// ReceiptValidator. Hence, if matching.Core finds the executed block, but `ReceiptValidator.Validate(..)` errors saying that
// the executed block is unknown, our state is corrupted or we have a severe internal bug.
ms.receiptValidator.On("Validate", receipt).Return(module.NewUnknownBlockError("")).Once()
_, err = ms.core.processReceipt(receipt)
ms.Require().Error(err, "unexpected errors should be escalated")
ms.Require().False(engine.IsInvalidInputError(err), "exceptions should not be misinterpreted as an invalid receipt")

ms.receiptValidator.AssertExpectations(ms.T())
ms.ReceiptsDB.AssertNumberOfCalls(ms.T(), "Store", 0)
Expand All @@ -192,7 +218,7 @@ func (ms *MatchingSuite) TestOnUnverifiableReceipt() {
ms.PendingReceipts.On("Add", receipt).Return(false).Once()

// check that _expected_ failure case of invalid receipt is handled without error
ms.receiptValidator.On("Validate", receipt).Return(engine.NewUnverifiableInputError("missing parent result")).Once()
ms.receiptValidator.On("Validate", receipt).Return(module.NewUnknownResultError("missing parent result")).Once()
wasAdded, err := ms.core.processReceipt(receipt)
ms.Require().NoError(err, "unverifiable receipt should be cached but not error")
ms.Require().False(wasAdded, "unverifiable receipt should be cached but not added to the node's validated information")
Expand Down
6 changes: 4 additions & 2 deletions model/flow/execution_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ func (er ExecutionResult) ValidateChunksLength() bool {

// FinalStateCommitment returns the Execution Result's commitment to the final
// execution state of the block, i.e. the last chunk's output state.
// Error returns:
//
// This function is side-effect free. The only possible error it returns is of type:
// - ErrNoChunks: if there are no chunks (ExecutionResult is malformed)
func (er ExecutionResult) FinalStateCommitment() (StateCommitment, error) {
if !er.ValidateChunksLength() {
Expand All @@ -65,7 +66,8 @@ func (er ExecutionResult) FinalStateCommitment() (StateCommitment, error) {

// InitialStateCommit returns a commitment to the execution state used as input
// for computing the block, i.e. the leading chunk's input state.
// Error returns:
//
// This function is side-effect free. The only possible error it returns is of type
// - ErrNoChunks: if there are no chunks (ExecutionResult is malformed)
func (er ExecutionResult) InitialStateCommit() (StateCommitment, error) {
if !er.ValidateChunksLength() {
Expand Down
54 changes: 54 additions & 0 deletions module/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package module

import (
"errors"
"fmt"
)

// UnknownBlockError indicates that a referenced block is missing
type UnknownBlockError struct {
err error
}

func NewUnknownBlockError(msg string, args ...interface{}) error {
return UnknownBlockError{
err: fmt.Errorf(msg, args...),
}
}

func (e UnknownBlockError) Unwrap() error {
return e.err
}

func (e UnknownBlockError) Error() string {
return e.err.Error()
}

func IsUnknownBlockError(err error) bool {
var unknownExecutedBlockError UnknownBlockError
return errors.As(err, &unknownExecutedBlockError)
}

// UnknownResultError indicates that a referenced result is missing
type UnknownResultError struct {
err error
}

func NewUnknownResultError(msg string, args ...interface{}) error {
return UnknownResultError{
err: fmt.Errorf(msg, args...),
}
}

func (e UnknownResultError) Unwrap() error {
return e.err
}

func (e UnknownResultError) Error() string {
return e.err.Error()
}

func IsUnknownResultError(err error) bool {
var unknownParentResultError UnknownResultError
return errors.As(err, &unknownParentResultError)
}
8 changes: 4 additions & 4 deletions module/mock/receipt_validator.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

60 changes: 35 additions & 25 deletions module/receipt_validator.go
Original file line number Diff line number Diff line change
@@ -1,41 +1,51 @@
package module

import "github.com/onflow/flow-go/model/flow"
import (
"github.com/onflow/flow-go/model/flow"
)

// ReceiptValidator is an interface which is used for validating
// receipts with respect to current protocol state.
type ReceiptValidator interface {

// Validate verifies that the ExecutionReceipt satisfies
// the following conditions:
// * is from Execution node with positive weight
// * has valid signature
// * chunks are in correct format
// * execution result has a valid parent and satisfies the subgraph check
// Returns nil if all checks passed successfully.
// Validate verifies that the ExecutionReceipt satisfies the following conditions:
// - is from Execution node with positive weight
// - has valid signature
// - chunks are in correct format
// - execution result has a valid parent and satisfies the subgraph check
//
// In order to validate a receipt, both the executed block and the parent result
// referenced in `receipt.ExecutionResult` must be known. We return nil if all checks
// pass successfully.
//
// Expected errors during normal operations:
// * engine.InvalidInputError
// if receipt violates protocol condition
// * engine.UnverifiableInputError
// if receipt's parent result is unknown
Validate(receipts *flow.ExecutionReceipt) error
// - engine.InvalidInputError if receipt violates protocol condition
// - module.UnknownResultError if the receipt's parent result is unknown
// - module.UnknownBlockError if the executed block is unknown
//
// All other error are potential symptoms critical internal failures, such as bugs or state corruption.
Validate(receipt *flow.ExecutionReceipt) error

// ValidatePayload verifies the ExecutionReceipts and ExecutionResults
// in the payload for compliance with the protocol:
// Receipts:
// * are from Execution node with positive weight
// * have valid signature
// * chunks are in correct format
// * no duplicates in fork
// - are from Execution node with positive weight
// - have valid signature
// - chunks are in correct format
// - no duplicates in fork
//
// Results:
// * have valid parents and satisfy the subgraph check
// * extend the execution tree, where the tree root is the latest
// finalized block and only results from this fork are included
// * no duplicates in fork
// - have valid parents and satisfy the subgraph check
// - extend the execution tree, where the tree root is the latest
// finalized block and only results from this fork are included
// - no duplicates in fork
//
// Expected errors during normal operations:
// * engine.InvalidInputError
// if some receipts in the candidate block violate protocol condition
// * engine.UnverifiableInputError
// if for some of the receipts, their respective parent result is unknown
// - engine.InvalidInputError if some receipts in the candidate block violate protocol condition
// - module.UnknownBlockError if the candidate block's _parent_ is unknown
//
// All other error are potential symptoms critical internal failures, such as bugs or state corruption.
// Note that module.UnknownResultError is not possible; we have either an invalid candidate block
// (yields engine.InvalidInputError) or a missing parent block (yields module.UnknownBlockError).
ValidatePayload(candidate *flow.Block) error
}
23 changes: 14 additions & 9 deletions module/validation/common.go
Original file line number Diff line number Diff line change
@@ -1,28 +1,35 @@
package validation

import (
"errors"
"fmt"

"github.com/onflow/flow-go/engine"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/model/flow/filter"
"github.com/onflow/flow-go/module"
protocolstate "github.com/onflow/flow-go/state"
"github.com/onflow/flow-go/state/protocol"
)

// identityForNode ensures that `nodeID` is an authorized member of the network
// at the given block and returns the corresponding node's full identity.
// Error returns:
// - sentinel engine.InvalidInputError is nodeID is NOT an authorized member of the network
// - generic error indicating a fatal internal problem
// - engine.InvalidInputError if nodeID is NOT an authorized member of the network at the given block
// - module.UnknownBlockError if blockID is not known to the protocol state
//
// All other error are potential symptoms critical internal failures, such as bugs or state corruption.
func identityForNode(state protocol.State, blockID flow.Identifier, nodeID flow.Identifier) (*flow.Identity, error) {
// get the identity of the origin node
identity, err := state.AtBlockID(blockID).Identity(nodeID)
if err != nil {
if protocol.IsIdentityNotFound(err) {
return nil, engine.NewInvalidInputErrorf("unknown node identity: %w", err)
}
// unexpected exception
return nil, fmt.Errorf("failed to retrieve node identity: %w", err)
if errors.Is(err, protocolstate.ErrUnknownSnapshotReference) {
return nil, module.NewUnknownBlockError("block %v is unknown: %w", blockID, err)
}
return nil, fmt.Errorf("unexpected exception retrieving node identity: %w", err)
}

return identity, nil
Expand All @@ -33,8 +40,8 @@ func identityForNode(state protocol.State, blockID flow.Identifier, nodeID flow.
// - and has the expected role
// - is an active participant of the current epoch and not ejected (i.e. has `EpochParticipationStatusActive`)
//
// Returns the following errors:
// - sentinel engine.InvalidInputError if any of the above-listed conditions are violated.
// This function is side-effect free. The only possible error it returns is of type
// - engine.InvalidInputError if any of the above-listed conditions are violated.
//
// Note: the method receives the identity as proof of its existence.
// Therefore, we consider the case where the respective identity is unknown to the
Expand All @@ -50,9 +57,7 @@ func ensureNodeHasWeightAndRole(identity *flow.Identity, expectedRole flow.Role)
}
// check if the identity is a valid epoch participant(is active in the current epoch + not ejected)
if !filter.IsValidCurrentEpochParticipant(identity) {
return engine.NewInvalidInputErrorf("node (%x) is not an active participant, instead has status: %s", identity.NodeID,
identity.EpochParticipationStatus.String())
return engine.NewInvalidInputErrorf("node %x is not an active participant, instead has status: %s", identity.NodeID, identity.EpochParticipationStatus.String())
}

return nil
}
Loading

0 comments on commit 778a287

Please sign in to comment.