Skip to content

Commit

Permalink
Merge pull request onflow#6512 from AndriiDiachuk/add-time-to-seal-me…
Browse files Browse the repository at this point in the history
…trics

Add a Time To Seal metric to access node to track time it takes to seal a transaction
  • Loading branch information
franklywatson authored Oct 24, 2024
2 parents e5c4a1e + e316783 commit 0027ace
Show file tree
Hide file tree
Showing 13 changed files with 145 additions and 3 deletions.
14 changes: 14 additions & 0 deletions cmd/access/node_builder/access_node_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ type AccessNodeConfig struct {
logTxTimeToFinalized bool
logTxTimeToExecuted bool
logTxTimeToFinalizedExecuted bool
logTxTimeToSealed bool
retryEnabled bool
rpcMetricsEnabled bool
executionDataSyncEnabled bool
Expand Down Expand Up @@ -243,6 +244,7 @@ func DefaultAccessNodeConfig() *AccessNodeConfig {
logTxTimeToFinalized: false,
logTxTimeToExecuted: false,
logTxTimeToFinalizedExecuted: false,
logTxTimeToSealed: false,
pingEnabled: false,
retryEnabled: false,
rpcMetricsEnabled: false,
Expand Down Expand Up @@ -304,6 +306,7 @@ type FlowAccessNodeBuilder struct {
CollectionsToMarkFinalized *stdmap.Times
CollectionsToMarkExecuted *stdmap.Times
BlocksToMarkExecuted *stdmap.Times
BlockTransactions *stdmap.IdentifierMap
TransactionMetrics *metrics.TransactionCollector
TransactionValidationMetrics *metrics.TransactionValidationCollector
RestMetrics *metrics.RestCollector
Expand Down Expand Up @@ -1239,6 +1242,10 @@ func (builder *FlowAccessNodeBuilder) extraFlags() {
"log-tx-time-to-finalized-executed",
defaultConfig.logTxTimeToFinalizedExecuted,
"log transaction time to finalized and executed")
flags.BoolVar(&builder.logTxTimeToSealed,
"log-tx-time-to-sealed",
defaultConfig.logTxTimeToSealed,
"log transaction time to sealed")
flags.BoolVar(&builder.pingEnabled,
"ping-enabled",
defaultConfig.pingEnabled,
Expand Down Expand Up @@ -1682,6 +1689,11 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
return err
}

builder.BlockTransactions, err = stdmap.NewIdentifierMap(10000)
if err != nil {
return err
}

builder.BlocksToMarkExecuted, err = stdmap.NewTimes(1 * 300) // assume 1 block per second * 300 seconds

return err
Expand All @@ -1693,6 +1705,7 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
builder.logTxTimeToFinalized,
builder.logTxTimeToExecuted,
builder.logTxTimeToFinalizedExecuted,
builder.logTxTimeToSealed,
)
return nil
}).
Expand Down Expand Up @@ -1727,6 +1740,7 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
builder.BlocksToMarkExecuted,
builder.Storage.Collections,
builder.Storage.Blocks,
builder.BlockTransactions,
)
if err != nil {
return err
Expand Down
7 changes: 7 additions & 0 deletions cmd/observer/node_builder/observer_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ type ObserverServiceConfig struct {
logTxTimeToFinalized bool
logTxTimeToExecuted bool
logTxTimeToFinalizedExecuted bool
logTxTimeToSealed bool
executionDataSyncEnabled bool
executionDataIndexingEnabled bool
executionDataDBMode string
Expand Down Expand Up @@ -222,6 +223,7 @@ func DefaultObserverServiceConfig() *ObserverServiceConfig {
logTxTimeToFinalized: false,
logTxTimeToExecuted: false,
logTxTimeToFinalizedExecuted: false,
logTxTimeToSealed: false,
executionDataSyncEnabled: false,
executionDataIndexingEnabled: false,
executionDataDBMode: execution_data.ExecutionDataDBModeBadger.String(),
Expand Down Expand Up @@ -658,6 +660,10 @@ func (builder *ObserverServiceBuilder) extraFlags() {
"log-tx-time-to-finalized-executed",
defaultConfig.logTxTimeToFinalizedExecuted,
"log transaction time to finalized and executed")
flags.BoolVar(&builder.logTxTimeToSealed,
"log-tx-time-to-sealed",
defaultConfig.logTxTimeToSealed,
"log transaction time to sealed")
flags.BoolVar(&builder.rpcMetricsEnabled, "rpc-metrics-enabled", defaultConfig.rpcMetricsEnabled, "whether to enable the rpc metrics")
flags.BoolVar(&builder.executionDataIndexingEnabled,
"execution-data-indexing-enabled",
Expand Down Expand Up @@ -1670,6 +1676,7 @@ func (builder *ObserverServiceBuilder) enqueueRPCServer() {
builder.logTxTimeToFinalized,
builder.logTxTimeToExecuted,
builder.logTxTimeToFinalizedExecuted,
builder.logTxTimeToSealed,
)
return nil
})
Expand Down
9 changes: 9 additions & 0 deletions engine/access/access_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -641,6 +641,8 @@ func (suite *Suite) TestGetSealedTransaction() {
require.NoError(suite.T(), err)
blocksToMarkExecuted, err := stdmap.NewTimes(100)
require.NoError(suite.T(), err)
blockTransactions, err := stdmap.NewIdentifierMap(100)
require.NoError(suite.T(), err)

bnd, err := backend.New(backend.Params{State: suite.state,
CollectionRPC: suite.collClient,
Expand Down Expand Up @@ -672,6 +674,7 @@ func (suite *Suite) TestGetSealedTransaction() {
blocksToMarkExecuted,
collections,
all.Blocks,
blockTransactions,
)
require.NoError(suite.T(), err)

Expand Down Expand Up @@ -820,6 +823,8 @@ func (suite *Suite) TestGetTransactionResult() {
require.NoError(suite.T(), err)
blocksToMarkExecuted, err := stdmap.NewTimes(100)
require.NoError(suite.T(), err)
blockTransactions, err := stdmap.NewIdentifierMap(100)
require.NoError(suite.T(), err)

bnd, err := backend.New(backend.Params{State: suite.state,
CollectionRPC: suite.collClient,
Expand Down Expand Up @@ -851,6 +856,7 @@ func (suite *Suite) TestGetTransactionResult() {
blocksToMarkExecuted,
collections,
all.Blocks,
blockTransactions,
)
require.NoError(suite.T(), err)

Expand Down Expand Up @@ -1083,6 +1089,8 @@ func (suite *Suite) TestExecuteScript() {
require.NoError(suite.T(), err)
blocksToMarkExecuted, err := stdmap.NewTimes(100)
require.NoError(suite.T(), err)
blockTransactions, err := stdmap.NewIdentifierMap(100)
require.NoError(suite.T(), err)

collectionExecutedMetric, err := indexer.NewCollectionExecutedMetricImpl(
suite.log,
Expand All @@ -1092,6 +1100,7 @@ func (suite *Suite) TestExecuteScript() {
blocksToMarkExecuted,
collections,
all.Blocks,
blockTransactions,
)
require.NoError(suite.T(), err)

Expand Down
3 changes: 3 additions & 0 deletions engine/access/ingestion/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,8 @@ func (s *Suite) SetupTest() {
require.NoError(s.T(), err)
blocksToMarkExecuted, err := stdmap.NewTimes(100)
require.NoError(s.T(), err)
blockTransactions, err := stdmap.NewIdentifierMap(100)
require.NoError(s.T(), err)

s.proto.state.On("Identity").Return(s.obsIdentity, nil)
s.proto.state.On("Params").Return(s.proto.params)
Expand Down Expand Up @@ -177,6 +179,7 @@ func (s *Suite) SetupTest() {
blocksToMarkExecuted,
s.collections,
s.blocks,
blockTransactions,
)
require.NoError(s.T(), err)
}
Expand Down
1 change: 1 addition & 0 deletions model/flow/transaction_timing.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ type TransactionTiming struct {
Received time.Time
Finalized time.Time
Executed time.Time
Sealed time.Time
}

func (t TransactionTiming) ID() Identifier {
Expand Down
4 changes: 4 additions & 0 deletions module/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -1076,6 +1076,10 @@ type TransactionMetrics interface {
// works if the transaction was earlier added as received.
TransactionFinalized(txID flow.Identifier, when time.Time)

// TransactionSealed reports the time spent between the transaction being received and sealed. Reporting only
// works if the transaction was earlier added as received.
TransactionSealed(txID flow.Identifier, when time.Time)

// TransactionExecuted reports the time spent between the transaction being received and executed. Reporting only
// works if the transaction was earlier added as received.
TransactionExecuted(txID flow.Identifier, when time.Time)
Expand Down
1 change: 1 addition & 0 deletions module/metrics/noop.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ func (nc *NoopCollector) ScriptExecutionNotIndexed()
func (nc *NoopCollector) TransactionResultFetched(dur time.Duration, size int) {}
func (nc *NoopCollector) TransactionReceived(txID flow.Identifier, when time.Time) {}
func (nc *NoopCollector) TransactionFinalized(txID flow.Identifier, when time.Time) {}
func (nc *NoopCollector) TransactionSealed(txID flow.Identifier, when time.Time) {}
func (nc *NoopCollector) TransactionExecuted(txID flow.Identifier, when time.Time) {}
func (nc *NoopCollector) TransactionExpired(txID flow.Identifier) {}
func (nc *NoopCollector) TransactionValidated() {}
Expand Down
53 changes: 53 additions & 0 deletions module/metrics/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@ type TransactionCollector struct {
logTimeToFinalized bool
logTimeToExecuted bool
logTimeToFinalizedExecuted bool
logTimeToSealed bool
timeToFinalized prometheus.Summary
timeToExecuted prometheus.Summary
timeToFinalizedExecuted prometheus.Summary
timeToSealed prometheus.Summary
transactionSubmission *prometheus.CounterVec
transactionSize prometheus.Histogram
scriptExecutedDuration *prometheus.HistogramVec
Expand All @@ -40,6 +42,7 @@ func NewTransactionCollector(
logTimeToFinalized bool,
logTimeToExecuted bool,
logTimeToFinalizedExecuted bool,
logTimeToSealed bool,
) *TransactionCollector {

tc := &TransactionCollector{
Expand All @@ -48,6 +51,7 @@ func NewTransactionCollector(
logTimeToFinalized: logTimeToFinalized,
logTimeToExecuted: logTimeToExecuted,
logTimeToFinalizedExecuted: logTimeToFinalizedExecuted,
logTimeToSealed: logTimeToSealed,
timeToFinalized: promauto.NewSummary(prometheus.SummaryOpts{
Name: "time_to_finalized_seconds",
Namespace: namespaceAccess,
Expand Down Expand Up @@ -91,6 +95,20 @@ func NewTransactionCollector(
AgeBuckets: 5,
BufCap: 500,
}),
timeToSealed: promauto.NewSummary(prometheus.SummaryOpts{
Name: "time_to_seal_seconds",
Namespace: namespaceAccess,
Subsystem: subsystemTransactionTiming,
Help: "the duration of how long it took between the transaction was received until it was sealed",
Objectives: map[float64]float64{
0.01: 0.001,
0.5: 0.05,
0.99: 0.001,
},
MaxAge: 10 * time.Minute,
AgeBuckets: 5,
BufCap: 500,
}),
transactionSubmission: promauto.NewCounterVec(prometheus.CounterOpts{
Name: "transaction_submission",
Namespace: namespaceAccess,
Expand Down Expand Up @@ -269,6 +287,27 @@ func (tc *TransactionCollector) TransactionExecuted(txID flow.Identifier, when t
}
}

func (tc *TransactionCollector) TransactionSealed(txID flow.Identifier, when time.Time) {
t, updated := tc.transactionTimings.Adjust(txID, func(t *flow.TransactionTiming) *flow.TransactionTiming {
t.Sealed = when
return t
})

if !updated {
tc.log.Debug().
Str("transaction_id", txID.String()).
Msg("failed to update TransactionSealed metric")
return
}

tc.trackTTS(t, tc.logTimeToSealed)

// remove transaction timing from mempool if sealed
if !t.Sealed.IsZero() {
tc.transactionTimings.Remove(txID)
}
}

func (tc *TransactionCollector) trackTTF(t *flow.TransactionTiming, log bool) {
if t.Received.IsZero() || t.Finalized.IsZero() {
return
Expand Down Expand Up @@ -317,6 +356,20 @@ func (tc *TransactionCollector) trackTTFE(t *flow.TransactionTiming, log bool) {
}
}

func (tc *TransactionCollector) trackTTS(t *flow.TransactionTiming, log bool) {
if t.Received.IsZero() || t.Sealed.IsZero() {
return
}
duration := t.Sealed.Sub(t.Received).Seconds()

tc.timeToSealed.Observe(duration)

if log {
tc.log.Info().Str("transaction_id", t.TransactionID.String()).Float64("duration", duration).
Msg("transaction time to sealed")
}
}

func (tc *TransactionCollector) TransactionSubmissionFailed() {
tc.transactionSubmission.WithLabelValues("failed").Inc()
}
Expand Down
5 changes: 5 additions & 0 deletions module/mock/access_metrics.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions module/mock/transaction_metrics.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

41 changes: 39 additions & 2 deletions module/state_synchronization/indexer/collection_executed_metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ type CollectionExecutedMetricImpl struct {

collections storage.Collections
blocks storage.Blocks

blockTransactions *stdmap.IdentifierMap // Map to track transactions for each block for sealed metrics
}

func NewCollectionExecutedMetricImpl(
Expand All @@ -35,6 +37,7 @@ func NewCollectionExecutedMetricImpl(
blocksToMarkExecuted *stdmap.Times,
collections storage.Collections,
blocks storage.Blocks,
blockTransactions *stdmap.IdentifierMap,
) (*CollectionExecutedMetricImpl, error) {
return &CollectionExecutedMetricImpl{
log: log,
Expand All @@ -44,16 +47,32 @@ func NewCollectionExecutedMetricImpl(
blocksToMarkExecuted: blocksToMarkExecuted,
collections: collections,
blocks: blocks,
blockTransactions: blockTransactions,
}, nil
}

// CollectionFinalized tracks collections to mark finalized
func (c *CollectionExecutedMetricImpl) CollectionFinalized(light flow.LightCollection) {
if ti, found := c.collectionsToMarkFinalized.ByID(light.ID()); found {
lightID := light.ID()
if ti, found := c.collectionsToMarkFinalized.ByID(lightID); found {

block, err := c.blocks.ByCollectionID(lightID)
if err != nil {
c.log.Warn().Err(err).Msg("could not find block by collection ID")
return
}
blockID := block.ID()

for _, t := range light.Transactions {
c.accessMetrics.TransactionFinalized(t, ti)

err = c.blockTransactions.Append(blockID, t)
if err != nil {
c.log.Warn().Err(err).Msg("could not append finalized tx to track sealed transactions")
continue
}
}
c.collectionsToMarkFinalized.Remove(light.ID())
c.collectionsToMarkFinalized.Remove(lightID)
}
}

Expand Down Expand Up @@ -88,6 +107,24 @@ func (c *CollectionExecutedMetricImpl) BlockFinalized(block *flow.Block) {

for _, t := range l.Transactions {
c.accessMetrics.TransactionFinalized(t, now)
err = c.blockTransactions.Append(blockID, t)

if err != nil {
c.log.Warn().Err(err).Msg("could not append finalized tx to track sealed transactions")
continue
}
}
}

// Process block seals
for _, s := range block.Payload.Seals {
transactions, found := c.blockTransactions.Get(s.BlockID)

if found {
for _, t := range transactions {
c.accessMetrics.TransactionSealed(t, now)
}
c.blockTransactions.Remove(s.BlockID)
}
}

Expand Down
Loading

0 comments on commit 0027ace

Please sign in to comment.