Skip to content

Commit

Permalink
Merge branch 'master' into janez/fix-metering-invalidation
Browse files Browse the repository at this point in the history
  • Loading branch information
j1010001 authored Sep 12, 2024
2 parents b5da663 + df5769f commit bda6d0c
Show file tree
Hide file tree
Showing 36 changed files with 2,099 additions and 97 deletions.
17 changes: 13 additions & 4 deletions access/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,11 +320,20 @@ func (v *TransactionValidator) checkExpiry(tx *flow.TransactionBody) error {
return nil
}

func (v *TransactionValidator) checkCanBeParsed(tx *flow.TransactionBody) error {
func (v *TransactionValidator) checkCanBeParsed(tx *flow.TransactionBody) (err error) {
defer func() {
if r := recover(); r != nil {
if panicErr, ok := r.(error); ok {
err = InvalidScriptError{ParserErr: panicErr}
} else {
err = InvalidScriptError{ParserErr: fmt.Errorf("non-error-typed panic: %v", r)}
}
}
}()
if v.options.CheckScriptsParse {
_, err := parser.ParseProgram(nil, tx.Script, parser.Config{})
if err != nil {
return InvalidScriptError{ParserErr: err}
_, parseErr := parser.ParseProgram(nil, tx.Script, parser.Config{})
if parseErr != nil {
return InvalidScriptError{ParserErr: parseErr}
}
}

Expand Down
32 changes: 32 additions & 0 deletions cmd/access/node_builder/access_node_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ import (
"github.com/onflow/flow-go/engine/access/subscription"
followereng "github.com/onflow/flow-go/engine/common/follower"
"github.com/onflow/flow-go/engine/common/requester"
"github.com/onflow/flow-go/engine/common/stop"
synceng "github.com/onflow/flow-go/engine/common/synchronization"
"github.com/onflow/flow-go/engine/common/version"
"github.com/onflow/flow-go/engine/execution/computation/query"
Expand Down Expand Up @@ -173,6 +174,7 @@ type AccessNodeConfig struct {
programCacheSize uint
checkPayerBalance bool
versionControlEnabled bool
stopControlEnabled bool
}

type PublicNetworkConfig struct {
Expand Down Expand Up @@ -276,6 +278,7 @@ func DefaultAccessNodeConfig() *AccessNodeConfig {
programCacheSize: 0,
checkPayerBalance: false,
versionControlEnabled: true,
stopControlEnabled: false,
}
}

Expand Down Expand Up @@ -325,6 +328,7 @@ type FlowAccessNodeBuilder struct {
ExecutionDatastoreManager edstorage.DatastoreManager
ExecutionDataTracker tracker.Storage
VersionControl *version.VersionControl
StopControl *stop.StopControl

// The sync engine participants provider is the libp2p peer store for the access node
// which is not available until after the network has started.
Expand Down Expand Up @@ -994,6 +998,10 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess
return nil, err
}

if builder.stopControlEnabled {
builder.StopControl.RegisterHeightRecorder(builder.ExecutionIndexer)
}

return builder.ExecutionIndexer, nil
}, builder.IndexerDependencies)
}
Expand Down Expand Up @@ -1260,6 +1268,10 @@ func (builder *FlowAccessNodeBuilder) extraFlags() {
"version-control-enabled",
defaultConfig.versionControlEnabled,
"whether to enable the version control feature. Default value is true")
flags.BoolVar(&builder.stopControlEnabled,
"stop-control-enabled",
defaultConfig.stopControlEnabled,
"whether to enable the stop control feature. Default value is false")
// ExecutionDataRequester config
flags.BoolVar(&builder.executionDataSyncEnabled,
"execution-data-sync-enabled",
Expand Down Expand Up @@ -1590,6 +1602,8 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
builder.IndexerDependencies.Add(ingestionDependable)
versionControlDependable := module.NewProxiedReadyDoneAware()
builder.IndexerDependencies.Add(versionControlDependable)
stopControlDependable := module.NewProxiedReadyDoneAware()
builder.IndexerDependencies.Add(stopControlDependable)
var lastFullBlockHeight *counters.PersistentStrictMonotonicCounter

builder.
Expand Down Expand Up @@ -1824,6 +1838,24 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {

return versionControl, nil
}).
Component("stop control", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
if !builder.stopControlEnabled {
noop := &module.NoopReadyDoneAware{}
stopControlDependable.Init(noop)
return noop, nil
}

stopControl := stop.NewStopControl(
builder.Logger,
)

builder.VersionControl.AddVersionUpdatesConsumer(stopControl.OnVersionUpdate)

builder.StopControl = stopControl
stopControlDependable.Init(builder.StopControl)

return stopControl, nil
}).
Component("RPC engine", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
config := builder.rpcConf
backendConfig := config.BackendConfig
Expand Down
46 changes: 44 additions & 2 deletions cmd/execution_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ import (
"github.com/onflow/flow-go/engine/execution/checker"
"github.com/onflow/flow-go/engine/execution/computation"
"github.com/onflow/flow-go/engine/execution/computation/committer"
txmetrics "github.com/onflow/flow-go/engine/execution/computation/metrics"
"github.com/onflow/flow-go/engine/execution/ingestion"
"github.com/onflow/flow-go/engine/execution/ingestion/fetcher"
"github.com/onflow/flow-go/engine/execution/ingestion/loader"
Expand Down Expand Up @@ -127,7 +128,7 @@ type ExecutionNode struct {

ingestionUnit *engine.Unit

collector module.ExecutionMetrics
collector *metrics.ExecutionCollector
executionState state.ExecutionState
followerState protocol.FollowerState
committee hotstuff.DynamicCommittee
Expand Down Expand Up @@ -160,6 +161,7 @@ type ExecutionNode struct {
executionDataTracker tracker.Storage
blobService network.BlobService
blobserviceDependable *module.ProxiedReadyDoneAware
metricsProvider txmetrics.TransactionExecutionMetricsProvider
}

func (builder *ExecutionNodeBuilder) LoadComponentsAndModules() {
Expand Down Expand Up @@ -228,6 +230,7 @@ func (builder *ExecutionNodeBuilder) LoadComponentsAndModules() {
Component("block data upload manager", exeNode.LoadBlockUploaderManager).
Component("GCP block data uploader", exeNode.LoadGCPBlockDataUploader).
Component("S3 block data uploader", exeNode.LoadS3BlockDataUploader).
Component("transaction execution metrics", exeNode.LoadTransactionExecutionMetrics).
Component("provider engine", exeNode.LoadProviderEngine).
Component("checker engine", exeNode.LoadCheckerEngine).
Component("ingestion engine", exeNode.LoadIngestionEngine).
Expand Down Expand Up @@ -544,10 +547,27 @@ func (exeNode *ExecutionNode) LoadProviderEngine(

vmCtx := fvm.NewContext(opts...)

var collector module.ExecutionMetrics
collector = exeNode.collector
if exeNode.exeConf.transactionExecutionMetricsEnabled {
// inject the transaction execution metrics
collector = exeNode.collector.WithTransactionCallback(
func(dur time.Duration, stats module.TransactionExecutionResultStats, info module.TransactionExecutionResultInfo) {
exeNode.metricsProvider.Collect(
info.BlockID,
info.BlockHeight,
txmetrics.TransactionExecutionMetrics{
TransactionID: info.TransactionID,
ExecutionTime: dur,
ExecutionEffortWeights: stats.ComputationIntensities,
})
})
}

ledgerViewCommitter := committer.NewLedgerViewCommitter(exeNode.ledgerStorage, node.Tracer)
manager, err := computation.New(
node.Logger,
exeNode.collector,
collector,
node.Tracer,
node.Me,
node.State,
Expand Down Expand Up @@ -1048,6 +1068,9 @@ func (exeNode *ExecutionNode) LoadIngestionEngine(
// consistency of collection can be checked by checking hash, and hash comes from trusted source (blocks from consensus follower)
// hence we not need to check origin
requester.WithValidateStaking(false),
// we have observed execution nodes occasionally fail to retrieve collections using this engine, which can cause temporary execution halts
// setting a retry maximum of 10s results in a much faster recovery from these faults (default is 2m)
requester.WithRetryMaximum(10*time.Second),
)

if err != nil {
Expand Down Expand Up @@ -1127,6 +1150,24 @@ func (exeNode *ExecutionNode) LoadScriptsEngine(node *NodeConfig) (module.ReadyD
return exeNode.scriptsEng, nil
}

func (exeNode *ExecutionNode) LoadTransactionExecutionMetrics(
node *NodeConfig,
) (module.ReadyDoneAware, error) {
lastFinalizedHeader := node.LastFinalizedHeader

metricsProvider := txmetrics.NewTransactionExecutionMetricsProvider(
node.Logger,
exeNode.executionState,
node.Storage.Headers,
lastFinalizedHeader.Height,
exeNode.exeConf.transactionExecutionMetricsBufferSize,
)

node.ProtocolEvents.AddConsumer(metricsProvider)
exeNode.metricsProvider = metricsProvider
return metricsProvider, nil
}

func (exeNode *ExecutionNode) LoadConsensusCommittee(
node *NodeConfig,
) (
Expand Down Expand Up @@ -1328,6 +1369,7 @@ func (exeNode *ExecutionNode) LoadGrpcServer(
exeNode.results,
exeNode.txResults,
node.Storage.Commits,
exeNode.metricsProvider,
node.RootChainID,
signature.NewBlockSignerDecoder(exeNode.committee),
exeNode.exeConf.apiRatelimits,
Expand Down
62 changes: 33 additions & 29 deletions cmd/execution_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,35 +25,37 @@ import (

// ExecutionConfig contains the configs for starting up execution nodes
type ExecutionConfig struct {
rpcConf rpc.Config
triedir string
executionDataDir string
registerDir string
mTrieCacheSize uint32
transactionResultsCacheSize uint
checkpointDistance uint
checkpointsToKeep uint
chunkDataPackDir string
chunkDataPackCacheSize uint
chunkDataPackRequestsCacheSize uint32
requestInterval time.Duration
extensiveLog bool
pauseExecution bool
chunkDataPackQueryTimeout time.Duration
chunkDataPackDeliveryTimeout time.Duration
enableBlockDataUpload bool
gcpBucketName string
s3BucketName string
apiRatelimits map[string]int
apiBurstlimits map[string]int
executionDataAllowedPeers string
executionDataPrunerHeightRangeTarget uint64
executionDataPrunerThreshold uint64
blobstoreRateLimit int
blobstoreBurstLimit int
chunkDataPackRequestWorkers uint
maxGracefulStopDuration time.Duration
importCheckpointWorkerCount int
rpcConf rpc.Config
triedir string
executionDataDir string
registerDir string
mTrieCacheSize uint32
transactionResultsCacheSize uint
checkpointDistance uint
checkpointsToKeep uint
chunkDataPackDir string
chunkDataPackCacheSize uint
chunkDataPackRequestsCacheSize uint32
requestInterval time.Duration
extensiveLog bool
pauseExecution bool
chunkDataPackQueryTimeout time.Duration
chunkDataPackDeliveryTimeout time.Duration
enableBlockDataUpload bool
gcpBucketName string
s3BucketName string
apiRatelimits map[string]int
apiBurstlimits map[string]int
executionDataAllowedPeers string
executionDataPrunerHeightRangeTarget uint64
executionDataPrunerThreshold uint64
blobstoreRateLimit int
blobstoreBurstLimit int
chunkDataPackRequestWorkers uint
maxGracefulStopDuration time.Duration
importCheckpointWorkerCount int
transactionExecutionMetricsEnabled bool
transactionExecutionMetricsBufferSize uint

// evm tracing configuration
evmTracingEnabled bool
Expand Down Expand Up @@ -122,6 +124,8 @@ func (exeConf *ExecutionConfig) SetupFlags(flags *pflag.FlagSet) {
flags.IntVar(&exeConf.blobstoreBurstLimit, "blobstore-burst-limit", 0, "outgoing burst limit for Execution Data blobstore")
flags.DurationVar(&exeConf.maxGracefulStopDuration, "max-graceful-stop-duration", stop.DefaultMaxGracefulStopDuration, "the maximum amount of time stop control will wait for ingestion engine to gracefully shutdown before crashing")
flags.IntVar(&exeConf.importCheckpointWorkerCount, "import-checkpoint-worker-count", 10, "number of workers to import checkpoint file during bootstrap")
flags.BoolVar(&exeConf.transactionExecutionMetricsEnabled, "tx-execution-metrics", true, "enable collection of transaction execution metrics")
flags.UintVar(&exeConf.transactionExecutionMetricsBufferSize, "tx-execution-metrics-buffer-size", 200, "buffer size for transaction execution metrics. The buffer size is the number of blocks that are kept in memory by the metrics provider engine")
flags.BoolVar(&exeConf.evmTracingEnabled, "evm-tracing-enabled", false, "enable EVM tracing, when set it will generate traces and upload them to the GCP bucket provided by the --evm-traces-gcp-bucket. Warning: this might affect speed of execution")
flags.StringVar(&exeConf.evmTracesGCPBucket, "evm-traces-gcp-bucket", "", "define GCP bucket name used for uploading EVM traces, must be used in combination with --evm-tracing-enabled. if left empty the upload step is skipped")

Expand Down
33 changes: 33 additions & 0 deletions cmd/observer/node_builder/observer_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ import (
statestreambackend "github.com/onflow/flow-go/engine/access/state_stream/backend"
"github.com/onflow/flow-go/engine/access/subscription"
"github.com/onflow/flow-go/engine/common/follower"
"github.com/onflow/flow-go/engine/common/stop"
synceng "github.com/onflow/flow-go/engine/common/synchronization"
"github.com/onflow/flow-go/engine/common/version"
"github.com/onflow/flow-go/engine/execution/computation/query"
Expand Down Expand Up @@ -164,6 +165,7 @@ type ObserverServiceConfig struct {
executionDataPruningInterval time.Duration
localServiceAPIEnabled bool
versionControlEnabled bool
stopControlEnabled bool
executionDataDir string
executionDataStartHeight uint64
executionDataConfig edrequester.ExecutionDataConfig
Expand Down Expand Up @@ -239,6 +241,7 @@ func DefaultObserverServiceConfig() *ObserverServiceConfig {
executionDataPruningInterval: pruner.DefaultPruningInterval,
localServiceAPIEnabled: false,
versionControlEnabled: true,
stopControlEnabled: false,
executionDataDir: filepath.Join(homedir, ".flow", "execution_data"),
executionDataStartHeight: 0,
executionDataConfig: edrequester.ExecutionDataConfig{
Expand Down Expand Up @@ -280,6 +283,7 @@ type ObserverServiceBuilder struct {
TxResultsIndex *index.TransactionResultsIndex
IndexerDependencies *cmd.DependencyList
VersionControl *version.VersionControl
StopControl *stop.StopControl

ExecutionDataDownloader execution_data.Downloader
ExecutionDataRequester state_synchronization.ExecutionDataRequester
Expand Down Expand Up @@ -681,6 +685,10 @@ func (builder *ObserverServiceBuilder) extraFlags() {
"version-control-enabled",
defaultConfig.versionControlEnabled,
"whether to enable the version control feature. Default value is true")
flags.BoolVar(&builder.stopControlEnabled,
"stop-control-enabled",
defaultConfig.stopControlEnabled,
"whether to enable the stop control feature. Default value is false")
flags.BoolVar(&builder.localServiceAPIEnabled, "local-service-api-enabled", defaultConfig.localServiceAPIEnabled, "whether to use local indexed data for api queries")
flags.StringVar(&builder.registersDBPath, "execution-state-dir", defaultConfig.registersDBPath, "directory to use for execution-state database")
flags.StringVar(&builder.checkpointFile, "execution-state-checkpoint", defaultConfig.checkpointFile, "execution-state checkpoint file")
Expand Down Expand Up @@ -1523,6 +1531,10 @@ func (builder *ObserverServiceBuilder) BuildExecutionSyncComponents() *ObserverS
return nil, err
}

if builder.stopControlEnabled {
builder.StopControl.RegisterHeightRecorder(builder.ExecutionIndexer)
}

return builder.ExecutionIndexer, nil
}, builder.IndexerDependencies)
}
Expand Down Expand Up @@ -1826,6 +1838,8 @@ func (builder *ObserverServiceBuilder) enqueueRPCServer() {

versionControlDependable := module.NewProxiedReadyDoneAware()
builder.IndexerDependencies.Add(versionControlDependable)
stopControlDependable := module.NewProxiedReadyDoneAware()
builder.IndexerDependencies.Add(stopControlDependable)

builder.Component("version control", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
if !builder.versionControlEnabled {
Expand Down Expand Up @@ -1859,6 +1873,25 @@ func (builder *ObserverServiceBuilder) enqueueRPCServer() {

return versionControl, nil
})
builder.Component("stop control", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
if !builder.stopControlEnabled {
noop := &module.NoopReadyDoneAware{}
stopControlDependable.Init(noop)
return noop, nil
}

stopControl := stop.NewStopControl(
builder.Logger,
)

builder.VersionControl.AddVersionUpdatesConsumer(stopControl.OnVersionUpdate)

builder.StopControl = stopControl
stopControlDependable.Init(builder.StopControl)

return stopControl, nil
})

builder.Component("RPC engine", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
accessMetrics := builder.AccessMetrics
config := builder.rpcConf
Expand Down
Loading

0 comments on commit bda6d0c

Please sign in to comment.