Skip to content

Commit

Permalink
Create validation mode for script exec on RN/EN (#4573)
Browse files Browse the repository at this point in the history
* Create validation mode for script exec on RN/EN

* cleanup builder

* fix lint

* fix observer init

* add check for not found block in RN

* Create validation mode for script exec on RN/EN

* add check for not found block in RN

* exclude errors in comparison and merge with master

* changes per feedback and add metrics

* remove extra RN call

* flatten code

* fix and add tests

* Update tests for expected behavior

* cleanup

* granular debug logs

* lint

* goimports

* fix tests
  • Loading branch information
koko1123 authored Aug 7, 2023
1 parent 04e12b3 commit e2ac1cf
Show file tree
Hide file tree
Showing 17 changed files with 402 additions and 41 deletions.
3 changes: 3 additions & 0 deletions cmd/access/node_builder/access_node_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ func DefaultAccessNodeConfig() *AccessNodeConfig {
PreferredExecutionNodeIDs: nil,
FixedExecutionNodeIDs: nil,
ArchiveAddressList: nil,
ScriptExecValidation: false,
CircuitBreakerConfig: rpcConnection.CircuitBreakerConfig{
Enabled: false,
RestoreTimeout: 60 * time.Second,
Expand Down Expand Up @@ -677,6 +678,7 @@ func (builder *FlowAccessNodeBuilder) extraFlags() {
flags.StringVarP(&builder.rpcConf.CollectionAddr, "static-collection-ingress-addr", "", defaultConfig.rpcConf.CollectionAddr, "the address (of the collection node) to send transactions to")
flags.StringVarP(&builder.ExecutionNodeAddress, "script-addr", "s", defaultConfig.ExecutionNodeAddress, "the address (of the execution node) forward the script to")
flags.StringSliceVar(&builder.rpcConf.BackendConfig.ArchiveAddressList, "archive-address-list", defaultConfig.rpcConf.BackendConfig.ArchiveAddressList, "the list of address of the archive node to forward the script queries to")
flags.BoolVar(&builder.rpcConf.BackendConfig.ScriptExecValidation, "validate-rn-script-exec", defaultConfig.rpcConf.BackendConfig.ScriptExecValidation, "whether to validate script execution results from the archive node with results from the execution node")
flags.StringVarP(&builder.rpcConf.HistoricalAccessAddrs, "historical-access-addr", "", defaultConfig.rpcConf.HistoricalAccessAddrs, "comma separated rpc addresses for historical access nodes")
flags.DurationVar(&builder.rpcConf.BackendConfig.CollectionClientTimeout, "collection-client-timeout", defaultConfig.rpcConf.BackendConfig.CollectionClientTimeout, "grpc client timeout for a collection node")
flags.DurationVar(&builder.rpcConf.BackendConfig.ExecutionClientTimeout, "execution-client-timeout", defaultConfig.rpcConf.BackendConfig.ExecutionClientTimeout, "grpc client timeout for an execution node")
Expand Down Expand Up @@ -1109,6 +1111,7 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
node.Logger,
backend.DefaultSnapshotHistoryLimit,
backendConfig.ArchiveAddressList,
backendConfig.ScriptExecValidation,
backendConfig.CircuitBreakerConfig.Enabled)

engineBuilder, err := rpc.NewBuilder(
Expand Down
1 change: 1 addition & 0 deletions cmd/observer/node_builder/observer_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -946,6 +946,7 @@ func (builder *ObserverServiceBuilder) enqueueRPCServer() {
node.Logger,
backend.DefaultSnapshotHistoryLimit,
backendConfig.ArchiveAddressList,
backendConfig.ScriptExecValidation,
backendConfig.CircuitBreakerConfig.Enabled)

observerCollector := metrics.NewObserverCollector()
Expand Down
5 changes: 5 additions & 0 deletions engine/access/access_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ func (suite *Suite) RunTest(
backend.DefaultSnapshotHistoryLimit,
nil,
false,
false,
)
handler := access.NewHandler(suite.backend, suite.chainID.Chain(), suite.finalizedHeaderCache, suite.me, access.WithBlockSignerDecoder(suite.signerIndicesDecoder))
f(handler, db, all)
Expand Down Expand Up @@ -331,6 +332,7 @@ func (suite *Suite) TestSendTransactionToRandomCollectionNode() {
backend.DefaultSnapshotHistoryLimit,
nil,
false,
false,
)

handler := access.NewHandler(backend, suite.chainID.Chain(), suite.finalizedHeaderCache, suite.me)
Expand Down Expand Up @@ -658,6 +660,7 @@ func (suite *Suite) TestGetSealedTransaction() {
backend.DefaultSnapshotHistoryLimit,
nil,
false,
false,
)

handler := access.NewHandler(backend, suite.chainID.Chain(), suite.finalizedHeaderCache, suite.me)
Expand Down Expand Up @@ -798,6 +801,7 @@ func (suite *Suite) TestGetTransactionResult() {
backend.DefaultSnapshotHistoryLimit,
nil,
false,
false,
)

handler := access.NewHandler(backend, suite.chainID.Chain(), suite.finalizedHeaderCache, suite.me)
Expand Down Expand Up @@ -990,6 +994,7 @@ func (suite *Suite) TestExecuteScript() {
backend.DefaultSnapshotHistoryLimit,
nil,
false,
false,
)

handler := access.NewHandler(suite.backend, suite.chainID.Chain(), suite.finalizedHeaderCache, suite.me)
Expand Down
4 changes: 3 additions & 1 deletion engine/access/integration_unsecure_grpc_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,9 @@ func (suite *SameGRPCPortTestSuite) SetupTest() {
suite.log,
0,
nil,
false)
false,
false,
)

// create rpc engine builder
rpcEngBuilder, err := rpc.NewBuilder(
Expand Down
4 changes: 3 additions & 1 deletion engine/access/rest_api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,9 @@ func (suite *RestAPITestSuite) SetupTest() {
suite.log,
0,
nil,
false)
false,
false,
)

rpcEngBuilder, err := rpc.NewBuilder(
suite.log,
Expand Down
37 changes: 20 additions & 17 deletions engine/access/rpc/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,13 +80,14 @@ type Backend struct {

// Config defines the configurable options for creating Backend
type Config struct {
ExecutionClientTimeout time.Duration // execution API GRPC client timeout
CollectionClientTimeout time.Duration // collection API GRPC client timeout
ConnectionPoolSize uint // size of the cache for storing collection and execution connections
MaxHeightRange uint // max size of height range requests
PreferredExecutionNodeIDs []string // preferred list of upstream execution node IDs
FixedExecutionNodeIDs []string // fixed list of execution node IDs to choose from if no node ID can be chosen from the PreferredExecutionNodeIDs
ArchiveAddressList []string // the archive node address list to send script executions. when configured, script executions will be all sent to the archive node
ExecutionClientTimeout time.Duration // execution API GRPC client timeout
CollectionClientTimeout time.Duration // collection API GRPC client timeout
ConnectionPoolSize uint // size of the cache for storing collection and execution connections
MaxHeightRange uint // max size of height range requests
PreferredExecutionNodeIDs []string // preferred list of upstream execution node IDs
FixedExecutionNodeIDs []string // fixed list of execution node IDs to choose from if no node ID can be chosen from the PreferredExecutionNodeIDs
ArchiveAddressList []string // the archive node address list to send script executions. when configured, script executions will be all sent to the archive node
ScriptExecValidation bool
CircuitBreakerConfig connection.CircuitBreakerConfig // the configuration for circuit breaker
}

Expand All @@ -110,6 +111,7 @@ func New(
log zerolog.Logger,
snapshotHistoryLimit int,
archiveAddressList []string,
scriptExecValidation bool,
circuitBreakerEnabled bool,
) *Backend {
retry := newRetry()
Expand Down Expand Up @@ -138,16 +140,17 @@ func New(
state: state,
// create the sub-backends
backendScripts: backendScripts{
headers: headers,
executionReceipts: executionReceipts,
connFactory: connFactory,
state: state,
log: log,
metrics: accessMetrics,
loggedScripts: loggedScripts,
archiveAddressList: archiveAddressList,
archivePorts: archivePorts,
nodeCommunicator: nodeCommunicator,
headers: headers,
executionReceipts: executionReceipts,
connFactory: connFactory,
state: state,
log: log,
metrics: accessMetrics,
loggedScripts: loggedScripts,
archiveAddressList: archiveAddressList,
archivePorts: archivePorts,
scriptExecValidation: scriptExecValidation,
nodeCommunicator: nodeCommunicator,
},
backendTransactions: backendTransactions{
staticCollectionRPC: collectionRPC,
Expand Down
144 changes: 126 additions & 18 deletions engine/access/rpc/backend/backend_scripts.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
package backend

import (
"bytes"
"context"
"crypto/md5" //nolint:gosec
"io"
"time"

"github.com/hashicorp/go-multierror"
lru "github.com/hashicorp/golang-lru"
"github.com/onflow/flow/protobuf/go/flow/access"

execproto "github.com/onflow/flow/protobuf/go/flow/execution"
"github.com/rs/zerolog"
"google.golang.org/grpc/codes"
Expand All @@ -26,16 +27,17 @@ import (
const uniqueScriptLoggingTimeWindow = 10 * time.Minute

// backendScripts is the sub-backend that serves script execution requests.
//
// Scripts can be executed on archive nodes (archiveAddressList) and on
// execution nodes. When scriptExecValidation is set, a script is run on the
// execution nodes and, where comparable, also on the archive nodes so that the
// two outcomes can be compared and recorded through metrics; the execution
// node outcome is the one returned to the caller.
type backendScripts struct {
	headers              storage.Headers
	executionReceipts    storage.ExecutionReceipts
	state                protocol.State
	connFactory          connection.ConnectionFactory
	log                  zerolog.Logger
	metrics              module.BackendScriptsMetrics
	loggedScripts        *lru.Cache
	archiveAddressList   []string
	archivePorts         []uint
	scriptExecValidation bool
	nodeCommunicator     *NodeCommunicator
}

func (b *backendScripts) ExecuteScriptAtLatestBlock(
Expand Down Expand Up @@ -86,6 +88,10 @@ func (b *backendScripts) ExecuteScriptAtBlockHeight(
return b.executeScriptOnExecutor(ctx, blockID, script, arguments)
}

// isCadenceScriptError reports whether the outcome of a script execution is
// attributable to the script itself rather than to the node that ran it.
//
// NOTE: despite the name, a nil error (successful execution) also yields true.
// Otherwise the result is true only when the gRPC status code of the error is
// codes.InvalidArgument.
func isCadenceScriptError(scriptExecutionErr error) bool {
	if scriptExecutionErr == nil {
		return true
	}
	return status.Code(scriptExecutionErr) == codes.InvalidArgument
}

// executeScriptOnExecutionNode forwards the request to the execution node using the execution node
// grpc client and converts the response back to the access node api response format
func (b *backendScripts) executeScriptOnExecutor(
Expand All @@ -94,17 +100,100 @@ func (b *backendScripts) executeScriptOnExecutor(
script []byte,
arguments [][]byte,
) ([]byte, error) {
// find few execution nodes which have executed the block earlier and provided an execution receipt for it
executors, err := executionNodesForBlockID(ctx, blockID, b.executionReceipts, b.state, b.log)
if err != nil {
return nil, status.Errorf(codes.Internal, "failed to find script executors at blockId %v: %v", blockID.String(), err)
}
// encode to MD5 as low compute/memory lookup key
// CAUTION: cryptographically insecure md5 is used here, but only to de-duplicate logs.
// *DO NOT* use this hash for any protocol-related or cryptographic functions.
insecureScriptHash := md5.Sum(script) //nolint:gosec
// try execution nodes if the script wasn't executed
if b.scriptExecValidation {
execNodeResult, execErr := b.executeScriptOnAvailableExecutionNodes(
ctx, blockID, script, arguments, insecureScriptHash)
// we can only compare the results if there are no errors from RN and EN
// since we cannot distinguish the EN error as caused by the block being pruned or some other reason,
// which may produce a valid RN output but an error for the EN
if isCadenceScriptError(execErr) {
archiveResult, archiveErr := b.executeScriptOnAvailableArchiveNodes(ctx, blockID, script, arguments,
insecureScriptHash)
// return EN results by default
b.compareScriptExecutionResults(execNodeResult, execErr, archiveResult, archiveErr, blockID,
script)
return execNodeResult, execErr
}
return execNodeResult, execErr
}
archiveResult, archiveErr := b.executeScriptOnAvailableArchiveNodes(ctx, blockID, script, arguments,
insecureScriptHash)
// execute on execution nodes if it's not a script error
if !isCadenceScriptError(archiveErr) {
execNodeResult, err := b.executeScriptOnAvailableExecutionNodes(
ctx, blockID, script, arguments, insecureScriptHash)
return execNodeResult, err
}
return archiveResult, archiveErr
}

// compareScriptExecutionResults compares the outcome of a script executed on
// the execution nodes (execNodeResult, execErr) with the outcome of the same
// script executed on the archive nodes (archiveResult, archiveErr).
//
// It returns nothing: the verdict is recorded through b.metrics, and every
// mismatch is additionally written to the debug log together with blockID and
// script.
//
//   - If the execution node returned an error, the two errors are compared and
//     ScriptExecutionErrorMatch / ScriptExecutionErrorMismatch is recorded.
//   - Otherwise the result bytes are compared and ScriptExecutionResultMatch /
//     ScriptExecutionResultMismatch is recorded.
func (b *backendScripts) compareScriptExecutionResults(
	execNodeResult []byte,
	execErr error,
	archiveResult []byte,
	archiveErr error,
	blockID flow.Identifier,
	script []byte,
) {
	// check errors first
	if execErr != nil {
		// Compare the rendered messages rather than the interface values: errors
		// produced by two independent calls (e.g. pointer-backed gRPC status errors)
		// are distinct values, so `==` would practically never report a match.
		if archiveErr != nil && execErr.Error() == archiveErr.Error() {
			b.metrics.ScriptExecutionErrorMatch()
			return
		}
		b.metrics.ScriptExecutionErrorMismatch()
		b.logScriptExecutionComparison(blockID, script, execNodeResult, archiveResult, execErr, archiveErr,
			"cadence errors on Archive node and EN are not equal")
		return
	}

	// The archive error must be checked explicitly: bytes.Equal treats a nil slice
	// and an empty slice as equal, so an empty EN result would otherwise "match"
	// the nil result of a failed archive execution.
	if archiveErr == nil && bytes.Equal(execNodeResult, archiveResult) {
		b.metrics.ScriptExecutionResultMatch()
		return
	}
	b.metrics.ScriptExecutionResultMismatch()
	b.logScriptExecutionComparison(blockID, script, execNodeResult, archiveResult, execErr, archiveErr,
		"script execution results on Archive node and EN are not equal")
}

// logScriptExecutionComparison writes a debug-level log entry describing how a
// script executed on the execution nodes compares with the same script
// executed on the archive nodes.
//
// The entry always carries the block ID and the script source. When either
// side returned an error, both errors are attached; otherwise both result
// payloads are attached hex-encoded. msg is used as the log message.
func (b *backendScripts) logScriptExecutionComparison(
	blockID flow.Identifier,
	script []byte,
	execNodeResult []byte,
	archiveResult []byte,
	executionError error,
	archiveError error,
	msg string,
) {
	// over-log for ease of debug
	event := b.log.Debug().
		Hex("block_id", blockID[:]).
		Str("script", string(script))

	bothSucceeded := executionError == nil && archiveError == nil
	if bothSucceeded {
		event.
			Hex("execution_node_result", execNodeResult).
			Hex("archive_node_result", archiveResult).
			Msg(msg)
		return
	}

	event.
		AnErr("execution_node_error", executionError).
		AnErr("archive_node_error", archiveError).
		Msg(msg)
}

// try execution on Archive nodes
// executeScriptOnAvailableArchiveNodes executes the given script for a blockID on all archive nodes available
func (b *backendScripts) executeScriptOnAvailableArchiveNodes(
ctx context.Context,
blockID flow.Identifier,
script []byte,
arguments [][]byte,
insecureScriptHash [16]byte,
) ([]byte, error) {
var errors *multierror.Error
if len(b.archiveAddressList) > 0 {
startTime := time.Now()
for idx, rnAddr := range b.archiveAddressList {
Expand Down Expand Up @@ -133,14 +222,33 @@ func (b *backendScripts) executeScriptOnExecutor(
// failures due to unavailable blocks are explicitly marked Not found
b.metrics.ScriptExecutionErrorOnArchiveNode()
b.log.Error().Err(err).Msg("script execution failed for archive node")
return nil, err
default:
errors = multierror.Append(errors, err)
continue
}
}
}
}
// don't need to distinguish error codes at this point
if errors.ErrorOrNil() != nil {
return nil, rpc.ConvertMultiError(errors, "failed to execute script on archive nodes", codes.Internal)
}
return nil, status.Errorf(codes.Unavailable, "no archive nodes in address list")
}

// try to execute the script on one of the execution nodes found
func (b *backendScripts) executeScriptOnAvailableExecutionNodes(
ctx context.Context,
blockID flow.Identifier,
script []byte,
arguments [][]byte,
insecureScriptHash [16]byte,
) ([]byte, error) {
// find few execution nodes which have executed the block earlier and provided an execution receipt for it
executors, err := executionNodesForBlockID(ctx, blockID, b.executionReceipts, b.state, b.log)
if err != nil {
return nil, status.Errorf(codes.Internal, "failed to find script executors at blockId %v: %v", blockID.String(), err)
}
var result []byte
hasInvalidArgument := false
errToReturn := b.nodeCommunicator.CallAvailableNode(
Expand Down
Loading

0 comments on commit e2ac1cf

Please sign in to comment.