diff --git a/cmd/access/node_builder/access_node_builder.go b/cmd/access/node_builder/access_node_builder.go index f083aaed0fd..55fd48ace8e 100644 --- a/cmd/access/node_builder/access_node_builder.go +++ b/cmd/access/node_builder/access_node_builder.go @@ -162,6 +162,7 @@ func DefaultAccessNodeConfig() *AccessNodeConfig { PreferredExecutionNodeIDs: nil, FixedExecutionNodeIDs: nil, ArchiveAddressList: nil, + ScriptExecValidation: false, CircuitBreakerConfig: rpcConnection.CircuitBreakerConfig{ Enabled: false, RestoreTimeout: 60 * time.Second, @@ -677,6 +678,7 @@ func (builder *FlowAccessNodeBuilder) extraFlags() { flags.StringVarP(&builder.rpcConf.CollectionAddr, "static-collection-ingress-addr", "", defaultConfig.rpcConf.CollectionAddr, "the address (of the collection node) to send transactions to") flags.StringVarP(&builder.ExecutionNodeAddress, "script-addr", "s", defaultConfig.ExecutionNodeAddress, "the address (of the execution node) forward the script to") flags.StringSliceVar(&builder.rpcConf.BackendConfig.ArchiveAddressList, "archive-address-list", defaultConfig.rpcConf.BackendConfig.ArchiveAddressList, "the list of address of the archive node to forward the script queries to") + flags.BoolVar(&builder.rpcConf.BackendConfig.ScriptExecValidation, "validate-rn-script-exec", defaultConfig.rpcConf.BackendConfig.ScriptExecValidation, "whether to validate script execution results from the archive node with results from the execution node") flags.StringVarP(&builder.rpcConf.HistoricalAccessAddrs, "historical-access-addr", "", defaultConfig.rpcConf.HistoricalAccessAddrs, "comma separated rpc addresses for historical access nodes") flags.DurationVar(&builder.rpcConf.BackendConfig.CollectionClientTimeout, "collection-client-timeout", defaultConfig.rpcConf.BackendConfig.CollectionClientTimeout, "grpc client timeout for a collection node") flags.DurationVar(&builder.rpcConf.BackendConfig.ExecutionClientTimeout, "execution-client-timeout", defaultConfig.rpcConf.BackendConfig.ExecutionClientTimeout, "grpc client timeout for an execution node") @@ -1109,6 +1111,7 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) { node.Logger, backend.DefaultSnapshotHistoryLimit, backendConfig.ArchiveAddressList, + backendConfig.ScriptExecValidation, backendConfig.CircuitBreakerConfig.Enabled) engineBuilder, err := rpc.NewBuilder( diff --git a/cmd/observer/node_builder/observer_builder.go b/cmd/observer/node_builder/observer_builder.go index 1e7687578c2..a8e7abdd4b6 100644 --- a/cmd/observer/node_builder/observer_builder.go +++ b/cmd/observer/node_builder/observer_builder.go @@ -946,6 +946,7 @@ func (builder *ObserverServiceBuilder) enqueueRPCServer() { node.Logger, backend.DefaultSnapshotHistoryLimit, backendConfig.ArchiveAddressList, + backendConfig.ScriptExecValidation, backendConfig.CircuitBreakerConfig.Enabled) observerCollector := metrics.NewObserverCollector() diff --git a/engine/access/access_test.go b/engine/access/access_test.go index b3979979fb9..75a69892389 100644 --- a/engine/access/access_test.go +++ b/engine/access/access_test.go @@ -158,6 +158,7 @@ func (suite *Suite) RunTest( backend.DefaultSnapshotHistoryLimit, nil, false, + false, ) handler := access.NewHandler(suite.backend, suite.chainID.Chain(), suite.finalizedHeaderCache, suite.me, access.WithBlockSignerDecoder(suite.signerIndicesDecoder)) f(handler, db, all) @@ -331,6 +332,7 @@ func (suite *Suite) TestSendTransactionToRandomCollectionNode() { backend.DefaultSnapshotHistoryLimit, nil, false, + false, ) handler := access.NewHandler(backend, suite.chainID.Chain(), suite.finalizedHeaderCache, suite.me) @@ -658,6 +660,7 @@ func (suite *Suite) TestGetSealedTransaction() { backend.DefaultSnapshotHistoryLimit, nil, false, + false, ) handler := access.NewHandler(backend, suite.chainID.Chain(), suite.finalizedHeaderCache, suite.me) @@ -798,6 +801,7 @@ func (suite *Suite) TestGetTransactionResult() { backend.DefaultSnapshotHistoryLimit, nil, false, + false, ) handler := access.NewHandler(backend, suite.chainID.Chain(), suite.finalizedHeaderCache, suite.me) @@ -990,6 +994,7 @@ func (suite *Suite) TestExecuteScript() { backend.DefaultSnapshotHistoryLimit, nil, false, + false, ) handler := access.NewHandler(suite.backend, suite.chainID.Chain(), suite.finalizedHeaderCache, suite.me) diff --git a/engine/access/integration_unsecure_grpc_server_test.go b/engine/access/integration_unsecure_grpc_server_test.go index e2cf78ade5a..b5660eece2d 100644 --- a/engine/access/integration_unsecure_grpc_server_test.go +++ b/engine/access/integration_unsecure_grpc_server_test.go @@ -189,7 +189,9 @@ func (suite *SameGRPCPortTestSuite) SetupTest() { suite.log, 0, nil, - false) + false, + false, + ) // create rpc engine builder rpcEngBuilder, err := rpc.NewBuilder( diff --git a/engine/access/rest_api_test.go b/engine/access/rest_api_test.go index 091c5e2e3ad..0f7a9058dc4 100644 --- a/engine/access/rest_api_test.go +++ b/engine/access/rest_api_test.go @@ -170,7 +170,9 @@ func (suite *RestAPITestSuite) SetupTest() { suite.log, 0, nil, - false) + false, + false, + ) rpcEngBuilder, err := rpc.NewBuilder( suite.log, diff --git a/engine/access/rpc/backend/backend.go b/engine/access/rpc/backend/backend.go index 9b10cc5b539..9c53aada9f6 100644 --- a/engine/access/rpc/backend/backend.go +++ b/engine/access/rpc/backend/backend.go @@ -80,13 +80,14 @@ type Backend struct { // Config defines the configurable options for creating Backend type Config struct { - ExecutionClientTimeout time.Duration // execution API GRPC client timeout - CollectionClientTimeout time.Duration // collection API GRPC client timeout - ConnectionPoolSize uint // size of the cache for storing collection and execution connections - MaxHeightRange uint // max size of height range requests - PreferredExecutionNodeIDs []string // preferred list of upstream execution node IDs - FixedExecutionNodeIDs []string // fixed list of execution node IDs to choose from if no node ID can be chosen from the PreferredExecutionNodeIDs - ArchiveAddressList []string // the archive node address list to send script executions. when configured, script executions will be all sent to the archive node + ExecutionClientTimeout time.Duration // execution API GRPC client timeout + CollectionClientTimeout time.Duration // collection API GRPC client timeout + ConnectionPoolSize uint // size of the cache for storing collection and execution connections + MaxHeightRange uint // max size of height range requests + PreferredExecutionNodeIDs []string // preferred list of upstream execution node IDs + FixedExecutionNodeIDs []string // fixed list of execution node IDs to choose from if no node ID can be chosen from the PreferredExecutionNodeIDs + ArchiveAddressList []string // the archive node address list to send script executions. when configured, script executions will be all sent to the archive node + ScriptExecValidation bool CircuitBreakerConfig connection.CircuitBreakerConfig // the configuration for circuit breaker } @@ -110,6 +111,7 @@ func New( log zerolog.Logger, snapshotHistoryLimit int, archiveAddressList []string, + scriptExecValidation bool, circuitBreakerEnabled bool, ) *Backend { retry := newRetry() @@ -138,16 +140,17 @@ func New( state: state, // create the sub-backends backendScripts: backendScripts{ - headers: headers, - executionReceipts: executionReceipts, - connFactory: connFactory, - state: state, - log: log, - metrics: accessMetrics, - loggedScripts: loggedScripts, - archiveAddressList: archiveAddressList, - archivePorts: archivePorts, - nodeCommunicator: nodeCommunicator, + headers: headers, + executionReceipts: executionReceipts, + connFactory: connFactory, + state: state, + log: log, + metrics: accessMetrics, + loggedScripts: loggedScripts, + archiveAddressList: archiveAddressList, + archivePorts: archivePorts, + scriptExecValidation: scriptExecValidation, + nodeCommunicator: nodeCommunicator, }, backendTransactions: backendTransactions{ staticCollectionRPC: collectionRPC, diff --git a/engine/access/rpc/backend/backend_scripts.go b/engine/access/rpc/backend/backend_scripts.go index 62d32c56211..2d875a460ea 100644 --- a/engine/access/rpc/backend/backend_scripts.go +++ b/engine/access/rpc/backend/backend_scripts.go @@ -1,14 +1,15 @@ package backend import ( + "bytes" "context" "crypto/md5" //nolint:gosec "io" "time" + "github.com/hashicorp/go-multierror" lru "github.com/hashicorp/golang-lru" "github.com/onflow/flow/protobuf/go/flow/access" - execproto "github.com/onflow/flow/protobuf/go/flow/execution" "github.com/rs/zerolog" "google.golang.org/grpc/codes" @@ -26,16 +27,17 @@ import ( const uniqueScriptLoggingTimeWindow = 10 * time.Minute type backendScripts struct { - headers storage.Headers - executionReceipts storage.ExecutionReceipts - state protocol.State - connFactory connection.ConnectionFactory - log zerolog.Logger - metrics module.BackendScriptsMetrics - loggedScripts *lru.Cache - archiveAddressList []string - archivePorts []uint - nodeCommunicator *NodeCommunicator + headers storage.Headers + executionReceipts storage.ExecutionReceipts + state protocol.State + connFactory connection.ConnectionFactory + log zerolog.Logger + metrics module.BackendScriptsMetrics + loggedScripts *lru.Cache + archiveAddressList []string + archivePorts []uint + scriptExecValidation bool + nodeCommunicator *NodeCommunicator } func (b *backendScripts) ExecuteScriptAtLatestBlock( @@ -86,6 +88,10 @@ func (b *backendScripts) ExecuteScriptAtBlockHeight( return b.executeScriptOnExecutor(ctx, blockID, script, arguments) } +func isCadenceScriptError(scriptExecutionErr error) bool { + return scriptExecutionErr == nil || status.Code(scriptExecutionErr) == codes.InvalidArgument +} + // executeScriptOnExecutionNode forwards the request to the execution node using the execution node // grpc client and converts the response back to the access node api response format func (b *backendScripts) executeScriptOnExecutor( @@ -94,17 +100,100 @@ func (b *backendScripts) executeScriptOnExecutor( script []byte, arguments [][]byte, ) ([]byte, error) { - // find few execution nodes which have executed the block earlier and provided an execution receipt for it - executors, err := executionNodesForBlockID(ctx, blockID, b.executionReceipts, b.state, b.log) - if err != nil { - return nil, status.Errorf(codes.Internal, "failed to find script executors at blockId %v: %v", blockID.String(), err) - } // encode to MD5 as low compute/memory lookup key // CAUTION: cryptographically insecure md5 is used here, but only to de-duplicate logs. // *DO NOT* use this hash for any protocol-related or cryptographic functions. insecureScriptHash := md5.Sum(script) //nolint:gosec + // try execution nodes if the script wasn't executed + if b.scriptExecValidation { + execNodeResult, execErr := b.executeScriptOnAvailableExecutionNodes( + ctx, blockID, script, arguments, insecureScriptHash) + // we can only compare the results if there are no errors from RN and EN + // since we cannot distinguish the EN error as caused by the block being pruned or some other reason, + // which may produce a valid RN output but an error for the EN + if isCadenceScriptError(execErr) { + archiveResult, archiveErr := b.executeScriptOnAvailableArchiveNodes(ctx, blockID, script, arguments, + insecureScriptHash) + // return EN results by default + b.compareScriptExecutionResults(execNodeResult, execErr, archiveResult, archiveErr, blockID, + script) + return execNodeResult, execErr + } + return execNodeResult, execErr + } + archiveResult, archiveErr := b.executeScriptOnAvailableArchiveNodes(ctx, blockID, script, arguments, + insecureScriptHash) + // execute on execution nodes if it's not a script error + if !isCadenceScriptError(archiveErr) { + execNodeResult, err := b.executeScriptOnAvailableExecutionNodes( + ctx, blockID, script, arguments, insecureScriptHash) + return execNodeResult, err + } + return archiveResult, archiveErr +} + +func (b *backendScripts) compareScriptExecutionResults( + execNodeResult []byte, + execErr error, + archiveResult []byte, + archiveErr error, + blockID flow.Identifier, + script []byte, +) { + // check errors first + if execErr != nil { + if archiveErr != nil && execErr == archiveErr { + b.metrics.ScriptExecutionErrorMatch() + } else { + b.metrics.ScriptExecutionErrorMismatch() + b.logScriptExecutionComparison(blockID, script, execNodeResult, archiveResult, execErr, archiveErr, + "cadence errors on Archive node and EN are not equal") + } + return + } + if bytes.Equal(execNodeResult, archiveResult) { + b.metrics.ScriptExecutionResultMatch() + } else { + b.metrics.ScriptExecutionResultMismatch() + b.logScriptExecutionComparison(blockID, script, execNodeResult, archiveResult, execErr, archiveErr, + "script execution results on Archive node and EN are not equal") + } +} + +func (b *backendScripts) logScriptExecutionComparison( + blockID flow.Identifier, + script []byte, + execNodeResult []byte, + archiveResult []byte, + executionError error, + archiveError error, + msg string, +) { + // over-log for ease of debug + if executionError != nil || archiveError != nil { + b.log.Debug().Hex("block_id", blockID[:]). + Str("script", string(script)). + AnErr("execution_node_error", executionError). + AnErr("archive_node_error", archiveError). + Msg(msg) + } else { + b.log.Debug().Hex("block_id", blockID[:]). + Str("script", string(script)). + Hex("execution_node_result", execNodeResult). + Hex("archive_node_result", archiveResult). + Msg(msg) + } +} - // try execution on Archive nodes +// executeScriptOnAvailableArchiveNodes executes the given script for a blockID on all archive nodes available +func (b *backendScripts) executeScriptOnAvailableArchiveNodes( + ctx context.Context, + blockID flow.Identifier, + script []byte, + arguments [][]byte, + insecureScriptHash [16]byte, +) ([]byte, error) { + var errors *multierror.Error if len(b.archiveAddressList) > 0 { startTime := time.Now() for idx, rnAddr := range b.archiveAddressList { @@ -133,14 +222,33 @@ func (b *backendScripts) executeScriptOnExecutor( // failures due to unavailable blocks are explicitly marked Not found b.metrics.ScriptExecutionErrorOnArchiveNode() b.log.Error().Err(err).Msg("script execution failed for archive node") + return nil, err default: + errors = multierror.Append(errors, err) continue } } } } + // don't need to distinguish error codes at this point + if errors.ErrorOrNil() != nil { + return nil, rpc.ConvertMultiError(errors, "failed to execute script on archive nodes", codes.Internal) + } + return nil, status.Errorf(codes.Unavailable, "no archive nodes in address list") +} - // try to execute the script on one of the execution nodes found +func (b *backendScripts) executeScriptOnAvailableExecutionNodes( + ctx context.Context, + blockID flow.Identifier, + script []byte, + arguments [][]byte, + insecureScriptHash [16]byte, +) ([]byte, error) { + // find few execution nodes which have executed the block earlier and provided an execution receipt for it + executors, err := executionNodesForBlockID(ctx, blockID, b.executionReceipts, b.state, b.log) + if err != nil { + return nil, status.Errorf(codes.Internal, "failed to find script executors at blockId %v: %v", blockID.String(), err) + } var result []byte hasInvalidArgument := false errToReturn := b.nodeCommunicator.CallAvailableNode( diff --git a/engine/access/rpc/backend/backend_test.go b/engine/access/rpc/backend/backend_test.go index d40ff45890e..9c87d88a4e4 100644 --- a/engine/access/rpc/backend/backend_test.go +++ b/engine/access/rpc/backend/backend_test.go @@ -111,6 +111,7 @@ func (suite *Suite) TestPing() { DefaultSnapshotHistoryLimit, nil, false, + false, ) err := backend.Ping(context.Background()) @@ -147,6 +148,7 @@ func (suite *Suite) TestGetLatestFinalizedBlockHeader() { DefaultSnapshotHistoryLimit, nil, false, + false, ) // query the handler for the latest finalized block @@ -213,6 +215,7 @@ func (suite *Suite) TestGetLatestProtocolStateSnapshot_NoTransitionSpan() { DefaultSnapshotHistoryLimit, nil, false, + false, ) // query the handler for the latest finalized snapshot @@ -286,6 +289,7 @@ func (suite *Suite) TestGetLatestProtocolStateSnapshot_TransitionSpans() { DefaultSnapshotHistoryLimit, nil, false, + false, ) // query the handler for the latest finalized snapshot @@ -352,6 +356,7 @@ func (suite *Suite) TestGetLatestProtocolStateSnapshot_PhaseTransitionSpan() { DefaultSnapshotHistoryLimit, nil, false, + false, ) // query the handler for the latest finalized snapshot @@ -429,6 +434,7 @@ func (suite *Suite) TestGetLatestProtocolStateSnapshot_EpochTransitionSpan() { DefaultSnapshotHistoryLimit, nil, false, + false, ) // query the handler for the latest finalized snapshot @@ -490,6 +496,7 @@ func (suite *Suite) TestGetLatestProtocolStateSnapshot_HistoryLimit() { snapshotHistoryLimit, nil, false, + false, ) // the handler should return a snapshot history limit error @@ -529,6 +536,7 @@ func (suite *Suite) TestGetLatestSealedBlockHeader() { DefaultSnapshotHistoryLimit, nil, false, + false, ) // query the handler for the latest sealed block @@ -576,6 +584,7 @@ func (suite *Suite) TestGetTransaction() { DefaultSnapshotHistoryLimit, nil, false, + false, ) actual, err := backend.GetTransaction(context.Background(), transaction.ID()) @@ -617,6 +626,7 @@ func (suite *Suite) TestGetCollection() { DefaultSnapshotHistoryLimit, nil, false, + false, ) actual, err := backend.GetCollectionByID(context.Background(), expected.ID()) @@ -681,6 +691,7 @@ func (suite *Suite) TestGetTransactionResultByIndex() { DefaultSnapshotHistoryLimit, nil, false, + false, ) suite.execClient. On("GetTransactionResultByIndex", ctx, exeEventReq). @@ -745,6 +756,7 @@ func (suite *Suite) TestGetTransactionResultsByBlockID() { DefaultSnapshotHistoryLimit, nil, false, + false, ) suite.execClient. On("GetTransactionResultsByBlockID", ctx, exeEventReq). @@ -837,6 +849,7 @@ func (suite *Suite) TestTransactionStatusTransition() { DefaultSnapshotHistoryLimit, nil, false, + false, ) // Successfully return empty event list @@ -958,6 +971,7 @@ func (suite *Suite) TestTransactionExpiredStatusTransition() { DefaultSnapshotHistoryLimit, nil, false, + false, ) // should return pending status when we have not observed an expiry block @@ -1126,6 +1140,7 @@ func (suite *Suite) TestTransactionPendingToFinalizedStatusTransition() { DefaultSnapshotHistoryLimit, nil, false, + false, ) preferredENIdentifiers = flow.IdentifierList{receipts[0].ExecutorID} @@ -1185,6 +1200,7 @@ func (suite *Suite) TestTransactionResultUnknown() { DefaultSnapshotHistoryLimit, nil, false, + false, ) // first call - when block under test is greater height than the sealed head, but execution node does not know about Tx @@ -1240,6 +1256,7 @@ func (suite *Suite) TestGetLatestFinalizedBlock() { DefaultSnapshotHistoryLimit, nil, false, + false, ) // query the handler for the latest finalized header @@ -1371,6 +1388,7 @@ func (suite *Suite) TestGetEventsForBlockIDs() { DefaultSnapshotHistoryLimit, nil, false, + false, ) // execute request @@ -1404,6 +1422,7 @@ func (suite *Suite) TestGetEventsForBlockIDs() { DefaultSnapshotHistoryLimit, nil, false, + false, ) // execute request with an empty block id list and expect an empty list of events and no error @@ -1464,6 +1483,7 @@ func (suite *Suite) TestGetExecutionResultByID() { DefaultSnapshotHistoryLimit, nil, false, + false, ) // execute request @@ -1495,6 +1515,7 @@ func (suite *Suite) TestGetExecutionResultByID() { DefaultSnapshotHistoryLimit, nil, false, + false, ) // execute request @@ -1559,6 +1580,7 @@ func (suite *Suite) TestGetExecutionResultByBlockID() { DefaultSnapshotHistoryLimit, nil, false, + false, ) // execute request @@ -1591,6 +1613,7 @@ func (suite *Suite) TestGetExecutionResultByBlockID() { DefaultSnapshotHistoryLimit, nil, false, + false, ) // execute request @@ -1742,6 +1765,7 @@ func (suite *Suite) TestGetEventsForHeightRange() { DefaultSnapshotHistoryLimit, nil, false, + false, ) _, err := backend.GetEventsForHeightRange(ctx, string(flow.EventAccountCreated), maxHeight, minHeight) @@ -1782,6 +1806,7 @@ func (suite *Suite) TestGetEventsForHeightRange() { DefaultSnapshotHistoryLimit, nil, false, + false, ) // execute request @@ -1821,6 +1846,7 @@ func (suite *Suite) TestGetEventsForHeightRange() { DefaultSnapshotHistoryLimit, nil, false, + false, ) actualResp, err := backend.GetEventsForHeightRange(ctx, string(flow.EventAccountCreated), minHeight, maxHeight) @@ -1859,6 +1885,7 @@ func (suite *Suite) TestGetEventsForHeightRange() { DefaultSnapshotHistoryLimit, nil, false, + false, ) _, err := backend.GetEventsForHeightRange(ctx, string(flow.EventAccountCreated), minHeight, minHeight+1) @@ -1897,6 +1924,7 @@ func (suite *Suite) TestGetEventsForHeightRange() { DefaultSnapshotHistoryLimit, nil, false, + false, ) _, err := backend.GetEventsForHeightRange(ctx, string(flow.EventAccountCreated), minHeight, maxHeight) @@ -1975,6 +2003,7 @@ func (suite *Suite) TestGetAccount() { DefaultSnapshotHistoryLimit, nil, false, + false, ) preferredENIdentifiers = flow.IdentifierList{receipts[0].ExecutorID} @@ -2057,6 +2086,7 @@ func (suite *Suite) TestGetAccountAtBlockHeight() { DefaultSnapshotHistoryLimit, nil, false, + false, ) preferredENIdentifiers = flow.IdentifierList{receipts[0].ExecutorID} @@ -2097,6 +2127,7 @@ func (suite *Suite) TestGetNetworkParameters() { DefaultSnapshotHistoryLimit, nil, false, + false, ) params := backend.GetNetworkParameters(context.Background()) @@ -2300,6 +2331,7 @@ func (suite *Suite) TestExecuteScriptOnExecutionNode() { DefaultSnapshotHistoryLimit, nil, false, + false, ) // mock parameters @@ -2377,6 +2409,7 @@ func (suite *Suite) TestExecuteScriptOnArchiveNode() { DefaultSnapshotHistoryLimit, []string{fullArchiveAddress}, false, + false, ) // mock parameters @@ -2426,6 +2459,124 @@ func (suite *Suite) TestExecuteScriptOnArchiveNode() { }) } +// TestExecuteScriptOnArchiveNode tests the method backend.scripts.executeScriptOnArchiveNode for script execution +func (suite *Suite) TestScriptExecutionValidationMode() { + + // create a mock connection factory + var mockPort uint = 9000 + connFactory := new(backendmock.ConnectionFactory) + connFactory.On("GetAccessAPIClientWithPort", mock.Anything, mockPort).Return(suite.archiveClient, &mockCloser{}, nil) + connFactory.On("GetExecutionAPIClient", mock.Anything).Return(suite.execClient, &mockCloser{}, nil) + connFactory.On("InvalidateAccessAPIClient", mock.Anything) + archiveNode := unittest.IdentityFixture(unittest.WithRole(flow.RoleAccess)) + fullArchiveAddress := archiveNode.Address + ":" + strconv.FormatUint(uint64(mockPort), 10) + // create the handler with the mock + backend := New( + suite.state, + nil, + nil, + nil, + suite.headers, + nil, + nil, + suite.receipts, + suite.results, + flow.Mainnet, + metrics.NewNoopCollector(), + connFactory, // the connection factory should be used to get the execution node client + false, + DefaultMaxHeightRange, + nil, + nil, + suite.log, + DefaultSnapshotHistoryLimit, + []string{fullArchiveAddress}, + true, + false, + ) + + // mock parameters + ctx := context.Background() + block := unittest.BlockFixture() + blockID := block.ID() + _, ids := suite.setupReceipts(&block) + suite.state.On("Final").Return(suite.snapshot, nil).Maybe() + suite.snapshot.On("Identities", mock.Anything).Return(ids, nil) + suite.state.On("AtBlockID", mock.Anything).Return(suite.snapshot) + + script := []byte("dummy script") + arguments := [][]byte(nil) + archiveRes := &accessproto.ExecuteScriptResponse{Value: []byte{4, 5, 6}} + archiveReq := &accessproto.ExecuteScriptAtBlockIDRequest{ + BlockId: blockID[:], + Script: script, + Arguments: arguments} + + archiveBlockUnavailableErr := status.Error(codes.NotFound, "placeholder block error") + archiveCadenceErr := status.Error(codes.InvalidArgument, "placeholder cadence error") + internalErr := status.Error(codes.Internal, "placeholder internal error") + + execReq := &execproto.ExecuteScriptAtBlockIDRequest{ + BlockId: blockID[:], + Script: script, + Arguments: arguments} + matchingExecRes := &execproto.ExecuteScriptAtBlockIDResponse{Value: []byte{4, 5, 6}} + mismatchingExecRes := &execproto.ExecuteScriptAtBlockIDResponse{Value: []byte{1, 2, 3}} + + suite.Run("happy path script execution success both en and rn return responses", func() { + suite.archiveClient.On("ExecuteScriptAtBlockID", ctx, archiveReq).Return(archiveRes, nil).Once() + suite.execClient.On("ExecuteScriptAtBlockID", ctx, execReq).Return(matchingExecRes, nil).Once() + res, err := backend.executeScriptOnExecutor(ctx, blockID, script, arguments) + suite.archiveClient.AssertExpectations(suite.T()) + suite.checkResponse(res, err) + assert.Equal(suite.T(), res, matchingExecRes.Value) + }) + + suite.Run("script execution success but mismatching responses", func() { + suite.archiveClient.On("ExecuteScriptAtBlockID", ctx, archiveReq).Return(archiveRes, nil).Once() + suite.execClient.On("ExecuteScriptAtBlockID", ctx, execReq).Return(mismatchingExecRes, nil).Once() + res, err := backend.executeScriptOnExecutor(ctx, blockID, script, arguments) + suite.archiveClient.AssertExpectations(suite.T()) + suite.checkResponse(res, err) + suite.Require().Equal(res, mismatchingExecRes.Value) + }) + + suite.Run("script execution failure on both nodes", func() { + suite.archiveClient.On("ExecuteScriptAtBlockID", ctx, archiveReq).Return(nil, archiveCadenceErr).Once() + suite.execClient.On("ExecuteScriptAtBlockID", ctx, execReq).Return(nil, archiveCadenceErr).Once() + _, err := backend.executeScriptOnExecutor(ctx, blockID, script, arguments) + suite.archiveClient.AssertExpectations(suite.T()) + suite.Require().Error(err) + suite.Require().Equal(status.Code(err), codes.InvalidArgument) + }) + + suite.Run("script execution failure on rn but not en", func() { + suite.archiveClient.On("ExecuteScriptAtBlockID", ctx, archiveReq).Return( + nil, archiveCadenceErr).Once() + suite.execClient.On("ExecuteScriptAtBlockID", ctx, execReq).Return(matchingExecRes, nil).Once() + _, err := backend.executeScriptOnExecutor(ctx, blockID, script, arguments) + suite.Require().NoError(err) + suite.archiveClient.AssertExpectations(suite.T()) + }) + + suite.Run("block not found on rn", func() { + suite.archiveClient.On("ExecuteScriptAtBlockID", ctx, archiveReq).Return( + nil, archiveBlockUnavailableErr).Once() + suite.execClient.On("ExecuteScriptAtBlockID", ctx, execReq).Return(matchingExecRes, nil).Once() + _, err := backend.ExecuteScriptAtBlockID(ctx, blockID, script, arguments) + suite.Require().NoError(err) + suite.archiveClient.AssertExpectations(suite.T()) + }) + + suite.Run("block not found on en", func() { + suite.execClient.On("ExecuteScriptAtBlockID", ctx, execReq).Return(nil, internalErr). + Times(int(ids.Count())) + _, err := backend.ExecuteScriptAtBlockID(ctx, blockID, script, arguments) + suite.archiveClient.AssertExpectations(suite.T()) + suite.Require().Error(err) + }) +} + func (suite *Suite) assertAllExpectations() { suite.snapshot.AssertExpectations(suite.T()) suite.state.AssertExpectations(suite.T()) diff --git a/engine/access/rpc/backend/historical_access_test.go b/engine/access/rpc/backend/historical_access_test.go index 42dd829dbbc..2632d216792 100644 --- a/engine/access/rpc/backend/historical_access_test.go +++ b/engine/access/rpc/backend/historical_access_test.go @@ -57,6 +57,7 @@ func (suite *Suite) TestHistoricalTransactionResult() { DefaultSnapshotHistoryLimit, nil, false, + false, ) // Successfully return the transaction from the historical node @@ -116,6 +117,7 @@ func (suite *Suite) TestHistoricalTransaction() { DefaultSnapshotHistoryLimit, nil, false, + false, ) // Successfully return the transaction from the historical node diff --git a/engine/access/rpc/backend/retry_test.go b/engine/access/rpc/backend/retry_test.go index 2189223118a..07bb77aa8a9 100644 --- a/engine/access/rpc/backend/retry_test.go +++ b/engine/access/rpc/backend/retry_test.go @@ -62,6 +62,7 @@ func (suite *Suite) TestTransactionRetry() { DefaultSnapshotHistoryLimit, nil, false, + false, ) retry := newRetry().SetBackend(backend).Activate() backend.retry = retry @@ -152,6 +153,7 @@ func (suite *Suite) TestSuccessfulTransactionsDontRetry() { DefaultSnapshotHistoryLimit, nil, false, + false, ) retry := newRetry().SetBackend(backend).Activate() backend.retry = retry diff --git a/engine/access/rpc/rate_limit_test.go b/engine/access/rpc/rate_limit_test.go index 8ff5695c3c6..b4b1d1e6980 100644 --- a/engine/access/rpc/rate_limit_test.go +++ b/engine/access/rpc/rate_limit_test.go @@ -168,7 +168,9 @@ func (suite *RateLimitTestSuite) SetupTest() { suite.log, 0, nil, - false) + false, + false, + ) rpcEngBuilder, err := NewBuilder( suite.log, diff --git a/engine/access/secure_grpcr_test.go b/engine/access/secure_grpcr_test.go index 783ed0d3110..5e122951eb0 100644 --- a/engine/access/secure_grpcr_test.go +++ b/engine/access/secure_grpcr_test.go @@ -151,7 +151,9 @@ func (suite *SecureGRPCTestSuite) SetupTest() { suite.log, 0, nil, - false) + false, + false, + ) rpcEngBuilder, err := rpc.NewBuilder( suite.log, diff --git a/module/metrics.go b/module/metrics.go index 2b889b98c44..3c1d6dfdc37 100644 --- a/module/metrics.go +++ b/module/metrics.go @@ -751,6 +751,14 @@ type BackendScriptsMetrics interface { // ScriptExecutionErrorOnArchiveNode records script execution failures in Archive Nodes ScriptExecutionErrorOnExecutionNode() + + ScriptExecutionResultMismatch() + + ScriptExecutionResultMatch() + + ScriptExecutionErrorMismatch() + + ScriptExecutionErrorMatch() } type TransactionMetrics interface { diff --git a/module/metrics/noop.go b/module/metrics/noop.go index 9bf5be48f0d..1d48d16882a 100644 --- a/module/metrics/noop.go +++ b/module/metrics/noop.go @@ -199,6 +199,10 @@ func (nc *NoopCollector) RuntimeTransactionProgramsCacheHit() func (nc *NoopCollector) ScriptExecuted(dur time.Duration, size int) {} func (nc *NoopCollector) ScriptExecutionErrorOnArchiveNode() {} func (nc *NoopCollector) ScriptExecutionErrorOnExecutionNode() {} +func (nc *NoopCollector) ScriptExecutionResultMismatch() {} +func (nc *NoopCollector) ScriptExecutionResultMatch() {} +func (nc *NoopCollector) ScriptExecutionErrorMismatch() {} +func (nc *NoopCollector) ScriptExecutionErrorMatch() {} func (nc *NoopCollector) TransactionResultFetched(dur time.Duration, size int) {} func (nc *NoopCollector) TransactionReceived(txID flow.Identifier, when time.Time) {} func (nc *NoopCollector) TransactionFinalized(txID flow.Identifier, when time.Time) {} diff --git a/module/metrics/transaction.go b/module/metrics/transaction.go index bbcb50414bd..50fca53bf39 100644 --- a/module/metrics/transaction.go +++ b/module/metrics/transaction.go @@ -25,6 +25,7 @@ type TransactionCollector struct { transactionSize prometheus.Histogram scriptExecutedDuration *prometheus.HistogramVec scriptExecutionErrorOnExecutor *prometheus.CounterVec + scriptExecutionComparison *prometheus.CounterVec scriptSize prometheus.Histogram transactionResultDuration *prometheus.HistogramVec } @@ -109,6 +110,12 @@ func NewTransactionCollector( Subsystem: subsystemTransactionSubmission, Help: "histogram for the internal errors for executing a script for a block on the archive node", }, []string{"source"}), + scriptExecutionComparison: promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "script_execution_comparison", + Namespace: namespaceAccess, + Subsystem: subsystemTransactionSubmission, + Help: "histogram for the comparison outcomes of executing a script on the archive and execution node", + }, []string{"outcome"}), transactionResultDuration: promauto.NewHistogramVec(prometheus.HistogramOpts{ Name: "transaction_result_fetched_duration", Namespace: namespaceAccess, @@ -144,15 +151,34 @@ func (tc *TransactionCollector) ScriptExecuted(dur time.Duration, size int) { } func (tc *TransactionCollector) ScriptExecutionErrorOnArchiveNode() { - // record the execution error along with blockID and scriptHash for Archive node + // record the execution error count tc.scriptExecutionErrorOnExecutor.WithLabelValues("archive").Inc() } func (tc *TransactionCollector) ScriptExecutionErrorOnExecutionNode() { - // record the execution error along with blockID and scriptHash for Execution node + // record the execution error count tc.scriptExecutionErrorOnExecutor.WithLabelValues("execution").Inc() } +func (tc *TransactionCollector) ScriptExecutionResultMismatch() { + // record the execution error count + tc.scriptExecutionComparison.WithLabelValues("result_mismatch").Inc() +} + +func (tc *TransactionCollector) ScriptExecutionResultMatch() { + // record the execution error count + tc.scriptExecutionComparison.WithLabelValues("result_match").Inc() +} +func (tc *TransactionCollector) ScriptExecutionErrorMismatch() { + // record the execution error count + tc.scriptExecutionComparison.WithLabelValues("error_mismatch").Inc() +} + +func (tc *TransactionCollector) ScriptExecutionErrorMatch() { + // record the execution error count + tc.scriptExecutionComparison.WithLabelValues("error_match").Inc() +} + // TransactionResult metrics func (tc *TransactionCollector) TransactionResultFetched(dur time.Duration, size int) { diff --git a/module/mock/access_metrics.go b/module/mock/access_metrics.go index 83690a36625..cde9953582c 100644 --- a/module/mock/access_metrics.go +++ b/module/mock/access_metrics.go @@ -73,6 +73,16 @@ func (_m *AccessMetrics) ScriptExecuted(dur time.Duration, size int) { _m.Called(dur, size) } +// ScriptExecutionErrorMatch provides a mock function with given fields: +func (_m *AccessMetrics) ScriptExecutionErrorMatch() { + _m.Called() +} + +// ScriptExecutionErrorMismatch provides a mock function with given fields: +func (_m *AccessMetrics) ScriptExecutionErrorMismatch() { + _m.Called() +} + // ScriptExecutionErrorOnArchiveNode provides a mock function with given fields: func (_m *AccessMetrics) ScriptExecutionErrorOnArchiveNode() { _m.Called() @@ -83,6 +93,16 @@ func (_m *AccessMetrics) ScriptExecutionErrorOnExecutionNode() { _m.Called() } +// ScriptExecutionResultMatch provides a mock function with given fields: +func (_m *AccessMetrics) ScriptExecutionResultMatch() { + _m.Called() +} + +// ScriptExecutionResultMismatch provides a mock function with given fields: +func (_m *AccessMetrics) ScriptExecutionResultMismatch() { + _m.Called() +} + // TotalConnectionsInPool provides a mock function with given fields: connectionCount, connectionPoolSize func (_m *AccessMetrics) TotalConnectionsInPool(connectionCount uint, connectionPoolSize uint) { _m.Called(connectionCount, connectionPoolSize) diff --git a/module/mock/backend_scripts_metrics.go b/module/mock/backend_scripts_metrics.go index 8a4eaf0ab33..60c3fe9e06d 100644 --- a/module/mock/backend_scripts_metrics.go +++ b/module/mock/backend_scripts_metrics.go @@ -18,6 +18,16 @@ func (_m *BackendScriptsMetrics) ScriptExecuted(dur time.Duration, size int) { _m.Called(dur, size) } +// ScriptExecutionErrorMatch provides a mock function with given fields: +func (_m *BackendScriptsMetrics) ScriptExecutionErrorMatch() { + _m.Called() +} + +// ScriptExecutionErrorMismatch provides a mock function with given fields: +func (_m *BackendScriptsMetrics) ScriptExecutionErrorMismatch() { + _m.Called() +} + // ScriptExecutionErrorOnArchiveNode provides a mock function with given fields: func (_m *BackendScriptsMetrics) ScriptExecutionErrorOnArchiveNode() { _m.Called() @@ -28,6 +38,16 @@ func (_m *BackendScriptsMetrics) ScriptExecutionErrorOnExecutionNode() { _m.Called() } +// ScriptExecutionResultMatch provides a mock function with given fields: +func (_m *BackendScriptsMetrics) ScriptExecutionResultMatch() { + _m.Called() +} + +// ScriptExecutionResultMismatch provides a mock function with given fields: +func (_m *BackendScriptsMetrics) ScriptExecutionResultMismatch() { + _m.Called() +} + type mockConstructorTestingTNewBackendScriptsMetrics interface { mock.TestingT Cleanup(func())