Skip to content

Commit

Permalink
Merge pull request #4496 from Guitarheroua/guitarheroua/3129-access-a…
Browse files Browse the repository at this point in the history
…dd-circuit-breaker
  • Loading branch information
durkmurder authored Jul 28, 2023
2 parents 86d1724 + f66dd13 commit 2aca88e
Show file tree
Hide file tree
Showing 27 changed files with 1,032 additions and 287 deletions.
26 changes: 24 additions & 2 deletions cmd/access/node_builder/access_node_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,12 @@ func DefaultAccessNodeConfig() *AccessNodeConfig {
PreferredExecutionNodeIDs: nil,
FixedExecutionNodeIDs: nil,
ArchiveAddressList: nil,
CircuitBreakerConfig: rpcConnection.CircuitBreakerConfig{
Enabled: false,
RestoreTimeout: 60 * time.Second,
MaxFailures: 5,
MaxRequests: 1,
},
},
MaxMsgSize: grpcutils.DefaultMaxMsgSize,
},
Expand Down Expand Up @@ -690,7 +696,10 @@ func (builder *FlowAccessNodeBuilder) extraFlags() {
flags.StringToIntVar(&builder.apiBurstlimits, "api-burst-limits", defaultConfig.apiBurstlimits, "burst limits for Access API methods e.g. Ping=100,GetTransaction=100 etc.")
flags.BoolVar(&builder.supportsObserver, "supports-observer", defaultConfig.supportsObserver, "true if this staked access node supports observer or follower connections")
flags.StringVar(&builder.PublicNetworkConfig.BindAddress, "public-network-address", defaultConfig.PublicNetworkConfig.BindAddress, "staked access node's public network bind address")

flags.BoolVar(&builder.rpcConf.BackendConfig.CircuitBreakerConfig.Enabled, "circuit-breaker-enabled", defaultConfig.rpcConf.BackendConfig.CircuitBreakerConfig.Enabled, "specifies whether the circuit breaker is enabled for collection and execution API clients.")
flags.DurationVar(&builder.rpcConf.BackendConfig.CircuitBreakerConfig.RestoreTimeout, "circuit-breaker-restore-timeout", defaultConfig.rpcConf.BackendConfig.CircuitBreakerConfig.RestoreTimeout, "duration after which the circuit breaker will restore the connection to the client after closing it due to failures. Default value is 60s")
flags.Uint32Var(&builder.rpcConf.BackendConfig.CircuitBreakerConfig.MaxFailures, "circuit-breaker-max-failures", defaultConfig.rpcConf.BackendConfig.CircuitBreakerConfig.MaxFailures, "maximum number of failed calls to the client that will cause the circuit breaker to close the connection. Default value is 5")
flags.Uint32Var(&builder.rpcConf.BackendConfig.CircuitBreakerConfig.MaxRequests, "circuit-breaker-max-requests", defaultConfig.rpcConf.BackendConfig.CircuitBreakerConfig.MaxRequests, "maximum number of requests to check if connection restored after timeout. Default value is 1")
// ExecutionDataRequester config
flags.BoolVar(&builder.executionDataSyncEnabled, "execution-data-sync-enabled", defaultConfig.executionDataSyncEnabled, "whether to enable the execution data sync protocol")
flags.StringVar(&builder.executionDataDir, "execution-data-dir", defaultConfig.executionDataDir, "directory to use for Execution Data database")
Expand Down Expand Up @@ -754,6 +763,17 @@ func (builder *FlowAccessNodeBuilder) extraFlags() {
return errors.New("state-stream-response-limit must be greater than or equal to 0")
}
}
if builder.rpcConf.BackendConfig.CircuitBreakerConfig.Enabled {
if builder.rpcConf.BackendConfig.CircuitBreakerConfig.MaxFailures == 0 {
return errors.New("circuit-breaker-max-failures must be greater than 0")
}
if builder.rpcConf.BackendConfig.CircuitBreakerConfig.MaxRequests == 0 {
return errors.New("circuit-breaker-max-requests must be greater than 0")
}
if builder.rpcConf.BackendConfig.CircuitBreakerConfig.RestoreTimeout <= 0 {
return errors.New("circuit-breaker-restore-timeout must be greater than 0")
}
}

return nil
})
Expand Down Expand Up @@ -1065,6 +1085,7 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
node.Logger,
accessMetrics,
config.MaxMsgSize,
backendConfig.CircuitBreakerConfig,
),
}

Expand All @@ -1087,7 +1108,8 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
backendConfig.FixedExecutionNodeIDs,
node.Logger,
backend.DefaultSnapshotHistoryLimit,
backendConfig.ArchiveAddressList)
backendConfig.ArchiveAddressList,
backendConfig.CircuitBreakerConfig.Enabled)

engineBuilder, err := rpc.NewBuilder(
node.Logger,
Expand Down
4 changes: 3 additions & 1 deletion cmd/observer/node_builder/observer_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -922,6 +922,7 @@ func (builder *ObserverServiceBuilder) enqueueRPCServer() {
node.Logger,
accessMetrics,
config.MaxMsgSize,
backendConfig.CircuitBreakerConfig,
),
}

Expand All @@ -944,7 +945,8 @@ func (builder *ObserverServiceBuilder) enqueueRPCServer() {
backendConfig.FixedExecutionNodeIDs,
node.Logger,
backend.DefaultSnapshotHistoryLimit,
backendConfig.ArchiveAddressList)
backendConfig.ArchiveAddressList,
backendConfig.CircuitBreakerConfig.Enabled)

observerCollector := metrics.NewObserverCollector()
restHandler, err := restapiproxy.NewRestProxyHandler(
Expand Down
8 changes: 4 additions & 4 deletions cmd/util/cmd/execution-state-extract/export_report.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"EpochCounter": 0,
"PreviousStateCommitment": "1c9f9d343cb8d4610e0b2c1eb74d6ea2f2f8aef2d666281dc22870e3efaa607b",
"CurrentStateCommitment": "1c9f9d343cb8d4610e0b2c1eb74d6ea2f2f8aef2d666281dc22870e3efaa607b",
"ReportSucceeded": true
"EpochCounter": 0,
"PreviousStateCommitment": "1c9f9d343cb8d4610e0b2c1eb74d6ea2f2f8aef2d666281dc22870e3efaa607b",
"CurrentStateCommitment": "1c9f9d343cb8d4610e0b2c1eb74d6ea2f2f8aef2d666281dc22870e3efaa607b",
"ReportSucceeded": true
}
5 changes: 5 additions & 0 deletions engine/access/access_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ func (suite *Suite) RunTest(
suite.log,
backend.DefaultSnapshotHistoryLimit,
nil,
false,
)
handler := access.NewHandler(suite.backend, suite.chainID.Chain(), suite.finalizedHeaderCache, suite.me, access.WithBlockSignerDecoder(suite.signerIndicesDecoder))
f(handler, db, all)
Expand Down Expand Up @@ -329,6 +330,7 @@ func (suite *Suite) TestSendTransactionToRandomCollectionNode() {
suite.log,
backend.DefaultSnapshotHistoryLimit,
nil,
false,
)

handler := access.NewHandler(backend, suite.chainID.Chain(), suite.finalizedHeaderCache, suite.me)
Expand Down Expand Up @@ -655,6 +657,7 @@ func (suite *Suite) TestGetSealedTransaction() {
suite.log,
backend.DefaultSnapshotHistoryLimit,
nil,
false,
)

handler := access.NewHandler(backend, suite.chainID.Chain(), suite.finalizedHeaderCache, suite.me)
Expand Down Expand Up @@ -794,6 +797,7 @@ func (suite *Suite) TestGetTransactionResult() {
suite.log,
backend.DefaultSnapshotHistoryLimit,
nil,
false,
)

handler := access.NewHandler(backend, suite.chainID.Chain(), suite.finalizedHeaderCache, suite.me)
Expand Down Expand Up @@ -985,6 +989,7 @@ func (suite *Suite) TestExecuteScript() {
suite.log,
backend.DefaultSnapshotHistoryLimit,
nil,
false,
)

handler := access.NewHandler(suite.backend, suite.chainID.Chain(), suite.finalizedHeaderCache, suite.me)
Expand Down
3 changes: 2 additions & 1 deletion engine/access/integration_unsecure_grpc_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,8 @@ func (suite *SameGRPCPortTestSuite) SetupTest() {
nil,
suite.log,
0,
nil)
nil,
false)

// create rpc engine builder
rpcEngBuilder, err := rpc.NewBuilder(
Expand Down
3 changes: 2 additions & 1 deletion engine/access/rest_api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,8 @@ func (suite *RestAPITestSuite) SetupTest() {
nil,
suite.log,
0,
nil)
nil,
false)

rpcEngBuilder, err := rpc.NewBuilder(
suite.log,
Expand Down
43 changes: 21 additions & 22 deletions engine/access/rpc/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,10 @@ import (
"strconv"
"time"

"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

lru "github.com/hashicorp/golang-lru"
"github.com/rs/zerolog"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/onflow/flow-go/access"
"github.com/onflow/flow-go/cmd/build"
Expand All @@ -27,9 +26,6 @@ import (
accessproto "github.com/onflow/flow/protobuf/go/flow/access"
)

// maxExecutionNodesCnt is the max number of execution nodes that will be contacted to complete an execution api request
const maxExecutionNodesCnt = 3

// minExecutionNodesCnt is the minimum number of execution nodes expected to have sent the execution receipt for a block
const minExecutionNodesCnt = 2

Expand Down Expand Up @@ -84,13 +80,14 @@ type Backend struct {

// Config defines the configurable options for creating Backend
type Config struct {
ExecutionClientTimeout time.Duration // execution API GRPC client timeout
CollectionClientTimeout time.Duration // collection API GRPC client timeout
ConnectionPoolSize uint // size of the cache for storing collection and execution connections
MaxHeightRange uint // max size of height range requests
PreferredExecutionNodeIDs []string // preferred list of upstream execution node IDs
FixedExecutionNodeIDs []string // fixed list of execution node IDs to choose from if no node ID can be chosen from the PreferredExecutionNodeIDs
ArchiveAddressList []string // the archive node address list to send script executions. when configured, script executions will be all sent to the archive node
ExecutionClientTimeout time.Duration // execution API GRPC client timeout
CollectionClientTimeout time.Duration // collection API GRPC client timeout
ConnectionPoolSize uint // size of the cache for storing collection and execution connections
MaxHeightRange uint // max size of height range requests
PreferredExecutionNodeIDs []string // preferred list of upstream execution node IDs
FixedExecutionNodeIDs []string // fixed list of execution node IDs to choose from if no node ID can be chosen from the PreferredExecutionNodeIDs
ArchiveAddressList []string // the archive node address list to send script executions. when configured, script executions will be all sent to the archive node
CircuitBreakerConfig connection.CircuitBreakerConfig // the configuration for circuit breaker
}

func New(
Expand All @@ -113,6 +110,7 @@ func New(
log zerolog.Logger,
snapshotHistoryLimit int,
archiveAddressList []string,
circuitBreakerEnabled bool,
) *Backend {
retry := newRetry()
if retryEnabled {
Expand All @@ -133,6 +131,9 @@ func New(
archivePorts[idx] = port
}

// create node communicator, that will be used in sub-backend logic for interacting with API calls
nodeCommunicator := NewNodeCommunicator(circuitBreakerEnabled)

b := &Backend{
state: state,
// create the sub-backends
Expand All @@ -146,6 +147,7 @@ func New(
loggedScripts: loggedScripts,
archiveAddressList: archiveAddressList,
archivePorts: archivePorts,
nodeCommunicator: nodeCommunicator,
},
backendTransactions: backendTransactions{
staticCollectionRPC: collectionRPC,
Expand All @@ -161,6 +163,7 @@ func New(
connFactory: connFactory,
previousAccessNodes: historicalAccessNodes,
log: log,
nodeCommunicator: nodeCommunicator,
},
backendEvents: backendEvents{
state: state,
Expand All @@ -169,6 +172,7 @@ func New(
connFactory: connFactory,
log: log,
maxHeightRange: maxHeightRange,
nodeCommunicator: nodeCommunicator,
},
backendBlockHeaders: backendBlockHeaders{
headers: headers,
Expand All @@ -184,6 +188,7 @@ func New(
executionReceipts: executionReceipts,
connFactory: connFactory,
log: log,
nodeCommunicator: nodeCommunicator,
},
backendExecutionResults: backendExecutionResults{
executionResults: executionResults,
Expand Down Expand Up @@ -338,7 +343,7 @@ func (b *Backend) GetLatestProtocolStateSnapshot(_ context.Context) ([]byte, err
return convert.SnapshotToBytes(validSnapshot)
}

// executionNodesForBlockID returns upto maxExecutionNodesCnt number of randomly chosen execution node identities
// executionNodesForBlockID returns upto maxNodesCnt number of randomly chosen execution node identities
// which have executed the given block ID.
// If no such execution node is found, an InsufficientExecutionReceipts error is returned.
func executionNodesForBlockID(
Expand Down Expand Up @@ -409,17 +414,11 @@ func executionNodesForBlockID(
return nil, fmt.Errorf("failed to retreive execution IDs for block ID %v: %w", blockID, err)
}

// randomly choose upto maxExecutionNodesCnt identities
executionIdentitiesRandom, err := subsetENs.Sample(maxExecutionNodesCnt)
if err != nil {
return nil, fmt.Errorf("sampling failed: %w", err)
}

if len(executionIdentitiesRandom) == 0 {
if len(subsetENs) == 0 {
return nil, fmt.Errorf("no matching execution node found for block ID %v", blockID)
}

return executionIdentitiesRandom, nil
return subsetENs, nil
}

// findAllExecutionNodes find all the execution nodes ids from the execution receipts that have been received for the
Expand Down
60 changes: 31 additions & 29 deletions engine/access/rpc/backend/backend_accounts.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"time"

"github.com/hashicorp/go-multierror"
"github.com/rs/zerolog"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
Expand All @@ -25,6 +24,7 @@ type backendAccounts struct {
executionReceipts storage.ExecutionReceipts
connFactory connection.ConnectionFactory
log zerolog.Logger
nodeCommunicator *NodeCommunicator
}

func (b *backendAccounts) GetAccount(ctx context.Context, address flow.Address) (*flow.Account, error) {
Expand Down Expand Up @@ -109,34 +109,39 @@ func (b *backendAccounts) getAccountAtBlockID(
// other ENs are logged and swallowed. If all ENs fail to return a valid response, then an
// error aggregating all failures is returned.
func (b *backendAccounts) getAccountFromAnyExeNode(ctx context.Context, execNodes flow.IdentityList, req *execproto.GetAccountAtBlockIDRequest) (*execproto.GetAccountAtBlockIDResponse, error) {
var errors *multierror.Error
for _, execNode := range execNodes {
// TODO: use the GRPC Client interceptor
start := time.Now()

resp, err := b.tryGetAccount(ctx, execNode, req)
duration := time.Since(start)
if err == nil {
// return if any execution node replied successfully
b.log.Debug().
Str("execution_node", execNode.String()).
var resp *execproto.GetAccountAtBlockIDResponse
errToReturn := b.nodeCommunicator.CallAvailableNode(
execNodes,
func(node *flow.Identity) error {
var err error
// TODO: use the GRPC Client interceptor
start := time.Now()

resp, err = b.tryGetAccount(ctx, node, req)
duration := time.Since(start)
if err == nil {
// return if any execution node replied successfully
b.log.Debug().
Str("execution_node", node.String()).
Hex("block_id", req.GetBlockId()).
Hex("address", req.GetAddress()).
Int64("rtt_ms", duration.Milliseconds()).
Msg("Successfully got account info")
return nil
}
b.log.Error().
Str("execution_node", node.String()).
Hex("block_id", req.GetBlockId()).
Hex("address", req.GetAddress()).
Int64("rtt_ms", duration.Milliseconds()).
Msg("Successfully got account info")
return resp, nil
}
b.log.Error().
Str("execution_node", execNode.String()).
Hex("block_id", req.GetBlockId()).
Hex("address", req.GetAddress()).
Int64("rtt_ms", duration.Milliseconds()).
Err(err).
Msg("failed to execute GetAccount")
errors = multierror.Append(errors, err)
}

return nil, errors.ErrorOrNil()
Err(err).
Msg("failed to execute GetAccount")
return err
},
nil,
)

return resp, errToReturn
}

func (b *backendAccounts) tryGetAccount(ctx context.Context, execNode *flow.Identity, req *execproto.GetAccountAtBlockIDRequest) (*execproto.GetAccountAtBlockIDResponse, error) {
Expand All @@ -148,9 +153,6 @@ func (b *backendAccounts) tryGetAccount(ctx context.Context, execNode *flow.Iden

resp, err := execRPCClient.GetAccountAtBlockID(ctx, req)
if err != nil {
if status.Code(err) == codes.Unavailable {
b.connFactory.InvalidateExecutionAPIClient(execNode.Address)
}
return nil, err
}
return resp, nil
Expand Down
Loading

0 comments on commit 2aca88e

Please sign in to comment.