Skip to content

Commit

Permalink
Merge branch 'master' into janez/version-boundary-to-cadence
Browse files Browse the repository at this point in the history
  • Loading branch information
janezpodhostnik committed Oct 25, 2024
2 parents 2ccf370 + 8d5a3b3 commit d9d5ef4
Show file tree
Hide file tree
Showing 99 changed files with 3,988 additions and 1,591 deletions.
104 changes: 89 additions & 15 deletions cmd/access/node_builder/access_node_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (
"github.com/onflow/flow-go/engine"
"github.com/onflow/flow-go/engine/access/index"
"github.com/onflow/flow-go/engine/access/ingestion"
"github.com/onflow/flow-go/engine/access/ingestion/tx_error_messages"
pingeng "github.com/onflow/flow-go/engine/access/ping"
"github.com/onflow/flow-go/engine/access/rest"
"github.com/onflow/flow-go/engine/access/rest/routes"
Expand All @@ -56,6 +57,7 @@ import (
"github.com/onflow/flow-go/engine/access/subscription"
followereng "github.com/onflow/flow-go/engine/common/follower"
"github.com/onflow/flow-go/engine/common/requester"
commonrpc "github.com/onflow/flow-go/engine/common/rpc"
"github.com/onflow/flow-go/engine/common/stop"
synceng "github.com/onflow/flow-go/engine/common/synchronization"
"github.com/onflow/flow-go/engine/common/version"
Expand Down Expand Up @@ -150,6 +152,7 @@ type AccessNodeConfig struct {
logTxTimeToFinalized bool
logTxTimeToExecuted bool
logTxTimeToFinalizedExecuted bool
logTxTimeToSealed bool
retryEnabled bool
rpcMetricsEnabled bool
executionDataSyncEnabled bool
Expand All @@ -163,7 +166,6 @@ type AccessNodeConfig struct {
executionDataConfig edrequester.ExecutionDataConfig
PublicNetworkConfig PublicNetworkConfig
TxResultCacheSize uint
TxErrorMessagesCacheSize uint
executionDataIndexingEnabled bool
registersDBPath string
checkpointFile string
Expand All @@ -175,6 +177,7 @@ type AccessNodeConfig struct {
programCacheSize uint
checkPayerBalanceMode string
versionControlEnabled bool
storeTxResultErrorMessages bool
stopControlEnabled bool
registerDBPruneThreshold uint64
}
Expand Down Expand Up @@ -241,14 +244,14 @@ func DefaultAccessNodeConfig() *AccessNodeConfig {
logTxTimeToFinalized: false,
logTxTimeToExecuted: false,
logTxTimeToFinalizedExecuted: false,
logTxTimeToSealed: false,
pingEnabled: false,
retryEnabled: false,
rpcMetricsEnabled: false,
nodeInfoFile: "",
apiRatelimits: nil,
apiBurstlimits: nil,
TxResultCacheSize: 0,
TxErrorMessagesCacheSize: 1000,
PublicNetworkConfig: PublicNetworkConfig{
BindAddress: cmd.NotSet,
Metrics: metrics.NewNoopCollector(),
Expand Down Expand Up @@ -280,6 +283,7 @@ func DefaultAccessNodeConfig() *AccessNodeConfig {
programCacheSize: 0,
checkPayerBalanceMode: accessNode.Disabled.String(),
versionControlEnabled: true,
storeTxResultErrorMessages: false,
stopControlEnabled: false,
registerDBPruneThreshold: pruner.DefaultThreshold,
}
Expand All @@ -302,6 +306,7 @@ type FlowAccessNodeBuilder struct {
CollectionsToMarkFinalized *stdmap.Times
CollectionsToMarkExecuted *stdmap.Times
BlocksToMarkExecuted *stdmap.Times
BlockTransactions *stdmap.IdentifierMap
TransactionMetrics *metrics.TransactionCollector
TransactionValidationMetrics *metrics.TransactionValidationCollector
RestMetrics *metrics.RestCollector
Expand Down Expand Up @@ -351,6 +356,9 @@ type FlowAccessNodeBuilder struct {
stateStreamGrpcServer *grpcserver.GrpcServer

stateStreamBackend *statestreambackend.StateStreamBackend
nodeBackend *backend.Backend

TxResultErrorMessagesCore *tx_error_messages.TxErrorMessagesCore
}

func (builder *FlowAccessNodeBuilder) buildFollowerState() *FlowAccessNodeBuilder {
Expand Down Expand Up @@ -1234,14 +1242,17 @@ func (builder *FlowAccessNodeBuilder) extraFlags() {
"log-tx-time-to-finalized-executed",
defaultConfig.logTxTimeToFinalizedExecuted,
"log transaction time to finalized and executed")
flags.BoolVar(&builder.logTxTimeToSealed,
"log-tx-time-to-sealed",
defaultConfig.logTxTimeToSealed,
"log transaction time to sealed")
flags.BoolVar(&builder.pingEnabled,
"ping-enabled",
defaultConfig.pingEnabled,
"whether to enable the ping process that pings all other peers and report the connectivity to metrics")
flags.BoolVar(&builder.retryEnabled, "retry-enabled", defaultConfig.retryEnabled, "whether to enable the retry mechanism at the access node level")
flags.BoolVar(&builder.rpcMetricsEnabled, "rpc-metrics-enabled", defaultConfig.rpcMetricsEnabled, "whether to enable the rpc metrics")
flags.UintVar(&builder.TxResultCacheSize, "transaction-result-cache-size", defaultConfig.TxResultCacheSize, "transaction result cache size.(Disabled by default i.e 0)")
flags.UintVar(&builder.TxErrorMessagesCacheSize, "transaction-error-messages-cache-size", defaultConfig.TxErrorMessagesCacheSize, "transaction error messages cache size.(By default 1000)")
flags.StringVarP(&builder.nodeInfoFile,
"node-info-file",
"",
Expand Down Expand Up @@ -1375,7 +1386,10 @@ func (builder *FlowAccessNodeBuilder) extraFlags() {
"tx-result-query-mode",
defaultConfig.rpcConf.BackendConfig.TxResultQueryMode,
"mode to use when querying transaction results. one of [local-only, execution-nodes-only(default), failover]")

flags.BoolVar(&builder.storeTxResultErrorMessages,
"store-tx-result-error-messages",
defaultConfig.storeTxResultErrorMessages,
"whether to enable storing transaction error messages into the db")
// Script Execution
flags.StringVar(&builder.rpcConf.BackendConfig.ScriptExecutionMode,
"script-execution-mode",
Expand Down Expand Up @@ -1488,9 +1502,6 @@ func (builder *FlowAccessNodeBuilder) extraFlags() {
return errors.New("circuit-breaker-restore-timeout must be greater than 0")
}
}
if builder.TxErrorMessagesCacheSize == 0 {
return errors.New("transaction-error-messages-cache-size must be greater than 0")
}

if builder.checkPayerBalanceMode != accessNode.Disabled.String() && !builder.executionDataIndexingEnabled {
return errors.New("execution-data-indexing-enabled must be set if check-payer-balance is enabled")
Expand Down Expand Up @@ -1603,7 +1614,8 @@ func (builder *FlowAccessNodeBuilder) enqueueRelayNetwork() {
}

func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
var processedBlockHeight storage.ConsumerProgress
var processedFinalizedBlockHeight storage.ConsumerProgress
var processedTxErrorMessagesBlockHeight storage.ConsumerProgress

if builder.executionDataSyncEnabled {
builder.BuildExecutionSyncComponents()
Expand Down Expand Up @@ -1677,6 +1689,11 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
return err
}

builder.BlockTransactions, err = stdmap.NewIdentifierMap(10000)
if err != nil {
return err
}

builder.BlocksToMarkExecuted, err = stdmap.NewTimes(1 * 300) // assume 1 block per second * 300 seconds

return err
Expand All @@ -1688,6 +1705,7 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
builder.logTxTimeToFinalized,
builder.logTxTimeToExecuted,
builder.logTxTimeToFinalizedExecuted,
builder.logTxTimeToSealed,
)
return nil
}).
Expand Down Expand Up @@ -1722,6 +1740,7 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
builder.BlocksToMarkExecuted,
builder.Storage.Collections,
builder.Storage.Blocks,
builder.BlockTransactions,
)
if err != nil {
return err
Expand Down Expand Up @@ -1799,8 +1818,8 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
builder.TxResultsIndex = index.NewTransactionResultsIndex(builder.Reporter, builder.Storage.LightTransactionResults)
return nil
}).
Module("processed block height consumer progress", func(node *cmd.NodeConfig) error {
processedBlockHeight = bstorage.NewConsumerProgress(builder.DB, module.ConsumeProgressIngestionEngineBlockHeight)
Module("processed finalized block height consumer progress", func(node *cmd.NodeConfig) error {
processedFinalizedBlockHeight = bstorage.NewConsumerProgress(builder.DB, module.ConsumeProgressIngestionEngineBlockHeight)
return nil
}).
Module("processed last full block height monotonic consumer progress", func(node *cmd.NodeConfig) error {
Expand All @@ -1817,6 +1836,13 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {

return nil
}).
Module("transaction result error messages storage", func(node *cmd.NodeConfig) error {
if builder.storeTxResultErrorMessages {
builder.Storage.TransactionResultErrorMessages = bstorage.NewTransactionResultErrorMessages(node.Metrics.Cache, node.DB, bstorage.DefaultCacheSize)
}

return nil
}).
Component("version control", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
if !builder.versionControlEnabled {
noop := &module.NoopReadyDoneAware{}
Expand Down Expand Up @@ -1944,7 +1970,7 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {

}

nodeBackend, err := backend.New(backend.Params{
builder.nodeBackend, err = backend.New(backend.Params{
State: node.State,
CollectionRPC: builder.CollectionRPC,
HistoricalAccessNodes: builder.HistoricalAccessRPCs,
Expand All @@ -1954,6 +1980,7 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
Transactions: node.Storage.Transactions,
ExecutionReceipts: node.Storage.Receipts,
ExecutionResults: node.Storage.Results,
TxResultErrorMessages: node.Storage.TransactionResultErrorMessages,
ChainID: node.RootChainID,
AccessMetrics: builder.AccessMetrics,
ConnFactory: connFactory,
Expand All @@ -1965,7 +1992,6 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
SnapshotHistoryLimit: backend.DefaultSnapshotHistoryLimit,
Communicator: backend.NewNodeCommunicator(backendConfig.CircuitBreakerConfig.Enabled),
TxResultCacheSize: builder.TxResultCacheSize,
TxErrorMessagesCacheSize: builder.TxErrorMessagesCacheSize,
ScriptExecutor: builder.ScriptExecutor,
ScriptExecutionMode: scriptExecMode,
CheckPayerBalanceMode: checkPayerBalanceMode,
Expand Down Expand Up @@ -1997,8 +2023,8 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
builder.AccessMetrics,
builder.rpcMetricsEnabled,
builder.Me,
nodeBackend,
nodeBackend,
builder.nodeBackend,
builder.nodeBackend,
builder.secureGrpcServer,
builder.unsecureGrpcServer,
builder.stateStreamBackend,
Expand Down Expand Up @@ -2037,6 +2063,28 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
return nil, fmt.Errorf("could not create requester engine: %w", err)
}

preferredENIdentifiers, err := commonrpc.IdentifierList(builder.rpcConf.BackendConfig.PreferredExecutionNodeIDs)
if err != nil {
return nil, fmt.Errorf("failed to convert node id string to Flow Identifier for preferred EN map: %w", err)
}

fixedENIdentifiers, err := commonrpc.IdentifierList(builder.rpcConf.BackendConfig.FixedExecutionNodeIDs)
if err != nil {
return nil, fmt.Errorf("failed to convert node id string to Flow Identifier for fixed EN map: %w", err)
}

if builder.storeTxResultErrorMessages {
builder.TxResultErrorMessagesCore = tx_error_messages.NewTxErrorMessagesCore(
node.Logger,
node.State,
builder.nodeBackend,
node.Storage.Receipts,
node.Storage.TransactionResultErrorMessages,
preferredENIdentifiers,
fixedENIdentifiers,
)
}

builder.IngestEng, err = ingestion.New(
node.Logger,
node.EngineRegistry,
Expand All @@ -2050,8 +2098,9 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
node.Storage.Results,
node.Storage.Receipts,
builder.collectionExecutedMetric,
processedBlockHeight,
processedFinalizedBlockHeight,
lastFullBlockHeight,
builder.TxResultErrorMessagesCore,
)
if err != nil {
return nil, err
Expand All @@ -2069,6 +2118,31 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
return builder.RequestEng, nil
})

if builder.storeTxResultErrorMessages {
builder.Module("processed error messages block height consumer progress", func(node *cmd.NodeConfig) error {
processedTxErrorMessagesBlockHeight = bstorage.NewConsumerProgress(
builder.DB,
module.ConsumeProgressEngineTxErrorMessagesBlockHeight,
)
return nil
})
builder.Component("transaction result error messages engine", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
engine, err := tx_error_messages.New(
node.Logger,
node.State,
node.Storage.Headers,
processedTxErrorMessagesBlockHeight,
builder.TxResultErrorMessagesCore,
)
if err != nil {
return nil, err
}
builder.FollowerDistributor.AddOnBlockFinalizedConsumer(engine.OnFinalizedBlock)

return engine, nil
})
}

if builder.supportsObserver {
builder.Component("public sync request handler", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
syncRequestHandler, err := synceng.NewRequestHandlerEngine(
Expand Down
1 change: 1 addition & 0 deletions cmd/bootstrap/utils/key_generation.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

sdk "github.com/onflow/flow-go-sdk"
sdkcrypto "github.com/onflow/flow-go-sdk/crypto"

"github.com/onflow/flow-go/fvm/systemcontracts"
"github.com/onflow/flow-go/model/bootstrap"
model "github.com/onflow/flow-go/model/bootstrap"
Expand Down
1 change: 1 addition & 0 deletions cmd/collection/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

client "github.com/onflow/flow-go-sdk/access/grpc"
sdkcrypto "github.com/onflow/flow-go-sdk/crypto"

"github.com/onflow/flow-go/admin/commands"
collectionCommands "github.com/onflow/flow-go/admin/commands/collection"
storageCommands "github.com/onflow/flow-go/admin/commands/storage"
Expand Down
1 change: 1 addition & 0 deletions cmd/consensus/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

client "github.com/onflow/flow-go-sdk/access/grpc"
"github.com/onflow/flow-go-sdk/crypto"

"github.com/onflow/flow-go/cmd"
"github.com/onflow/flow-go/cmd/util/cmd/common"
"github.com/onflow/flow-go/consensus"
Expand Down
Loading

0 comments on commit d9d5ef4

Please sign in to comment.