From 7dca39699bebcab5c7a6fec3c3ec4c01f075f31f Mon Sep 17 00:00:00 2001 From: Stephen Buttolph Date: Thu, 6 Jun 2024 13:33:13 -0400 Subject: [PATCH] Replace all chain namespaces with labels (#3053) --- api/metrics/prefix_gatherer.go | 38 ++- api/metrics/prefix_gatherer_test.go | 56 +++- chains/linearizable_vm.go | 3 - chains/manager.go | 310 ++++++++++++++---- database/meterdb/db.go | 16 +- database/meterdb/db_test.go | 4 +- node/node.go | 145 ++++++-- snow/consensus/snowman/consensus_test.go | 2 +- snow/context.go | 4 + snow/networking/benchlist/benchlist.go | 33 +- snow/networking/benchlist/benchlist_test.go | 4 + snow/networking/benchlist/manager.go | 23 +- snow/networking/benchlist/metrics.go | 36 -- snow/networking/handler/handler.go | 11 +- snow/networking/handler/handler_test.go | 7 + snow/networking/handler/health_test.go | 1 + snow/networking/handler/metrics.go | 22 +- snow/networking/router/chain_router.go | 5 +- .../networking/router/chain_router_metrics.go | 17 +- snow/networking/router/chain_router_test.go | 35 +- snow/networking/router/mock_router.go | 8 +- snow/networking/router/router.go | 3 +- snow/networking/router/traced_router.go | 6 +- snow/networking/sender/sender_test.go | 12 +- snow/networking/timeout/manager.go | 18 +- snow/networking/timeout/manager_test.go | 2 +- snow/networking/timeout/metrics.go | 93 ++---- snow/networking/tracker/resource_tracker.go | 29 +- snow/snowtest/snowtest.go | 3 +- tests/e2e/x/transfer/virtuous.go | 20 +- utils/resource/metrics.go | 27 +- utils/resource/usage.go | 2 +- utils/timer/adaptive_timeout_manager.go | 31 +- utils/timer/adaptive_timeout_manager_test.go | 3 +- vms/metervm/block_vm.go | 22 +- vms/metervm/vertex_vm.go | 22 +- vms/platformvm/vm_test.go | 4 +- vms/proposervm/batched_vm_test.go | 2 + vms/proposervm/block_test.go | 3 + vms/proposervm/config.go | 5 + vms/proposervm/post_fork_option_test.go | 2 + vms/proposervm/state_syncable_vm_test.go | 2 + vms/proposervm/vm.go | 18 +- vms/proposervm/vm_test.go | 12 + vms/rpcchainvm/vm_client.go | 17 +- vms/rpcchainvm/vm_server.go | 57 ++-- 46 files changed, 759 insertions(+), 436 deletions(-) delete mode 100644 snow/networking/benchlist/metrics.go diff --git a/api/metrics/prefix_gatherer.go b/api/metrics/prefix_gatherer.go index 1f0b78a24380..fae7adb26e84 100644 --- a/api/metrics/prefix_gatherer.go +++ b/api/metrics/prefix_gatherer.go @@ -4,8 +4,8 @@ package metrics import ( + "errors" "fmt" - "slices" "github.com/prometheus/client_golang/prometheus" "google.golang.org/protobuf/proto" @@ -15,7 +15,11 @@ import ( dto "github.com/prometheus/client_model/go" ) -var _ MultiGatherer = (*prefixGatherer)(nil) +var ( + _ MultiGatherer = (*prefixGatherer)(nil) + + errOverlappingNamespaces = errors.New("prefix could create overlapping namespaces") +) // NewPrefixGatherer returns a new MultiGatherer that merges metrics by adding a // prefix to their names. 
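// Editorial example (not part of the patch): a self-contained model of the
// namespace-boundary check that the new Register loop below relies on. It
// assumes metric.NamespaceSeparatorByte is '_', matching namespaces such as
// "avalanche_db" used elsewhere in this patch.
package main

import "fmt"

// eitherIsPrefixModel mirrors the eitherIsPrefix helper added in this file:
// one name conflicts with another only if the two are equal, or the shorter
// one ends exactly at a namespace boundary ('_') of the longer one.
func eitherIsPrefixModel(a, b string) bool {
	if len(a) > len(b) {
		a, b = b, a // make a the shorter string
	}
	return a == b[:len(a)] && // a is a prefix of b
		(len(a) == 0 || len(a) == len(b) || b[len(a)] == '_')
}

func main() {
	fmt.Println(eitherIsPrefixModel("hello", "hello_world")) // true: overlapping namespaces
	fmt.Println(eitherIsPrefixModel("hello", "helloworld"))  // false: unrelated names
}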
@@ -31,12 +35,14 @@ func (g *prefixGatherer) Register(prefix string, gatherer prometheus.Gatherer) e g.lock.Lock() defer g.lock.Unlock() - // TODO: Restrict prefixes to avoid potential conflicts - if slices.Contains(g.names, prefix) { - return fmt.Errorf("%w: %q", - errDuplicateGatherer, - prefix, - ) + for _, existingPrefix := range g.names { + if eitherIsPrefix(prefix, existingPrefix) { + return fmt.Errorf("%w: %q conflicts with %q", + errOverlappingNamespaces, + prefix, + existingPrefix, + ) + } } g.names = append(g.names, prefix) @@ -64,3 +70,19 @@ func (g *prefixedGatherer) Gather() ([]*dto.MetricFamily, error) { } return metricFamilies, err } + +// eitherIsPrefix returns true if either [a] is a prefix of [b] or [b] is a +// prefix of [a]. +// +// This function accounts for the usage of the namespace boundary, so "hello" is +// not considered a prefix of "helloworld". However, "hello" is considered a +// prefix of "hello_world". +func eitherIsPrefix(a, b string) bool { + if len(a) > len(b) { + a, b = b, a + } + return a == b[:len(a)] && // a is a prefix of b + (len(a) == 0 || // a is empty + len(a) == len(b) || // a is equal to b + b[len(a)] == metric.NamespaceSeparatorByte) // a ends at a namespace boundary of b +} diff --git a/api/metrics/prefix_gatherer_test.go b/api/metrics/prefix_gatherer_test.go index ba37540b01e3..ff2526e6742e 100644 --- a/api/metrics/prefix_gatherer_test.go +++ b/api/metrics/prefix_gatherer_test.go @@ -134,7 +134,7 @@ func TestPrefixGatherer_Register(t *testing.T) { prefixGatherer: firstPrefixGatherer(), prefix: firstPrefixedGatherer.prefix, gatherer: secondPrefixedGatherer.gatherer, - expectedErr: errDuplicateGatherer, + expectedErr: errOverlappingNamespaces, expectedPrefixGatherer: firstPrefixGatherer(), }, } @@ -148,3 +148,57 @@ func TestPrefixGatherer_Register(t *testing.T) { }) } } + +func TestEitherIsPrefix(t *testing.T) { + tests := []struct { + name string + a string + b string + expected bool + }{ + { + name: "empty strings", + a: "", + b: "", + expected: true, + }, + { + name: "an empty string", + a: "", + b: "hello", + expected: true, + }, + { + name: "same strings", + a: "x", + b: "x", + expected: true, + }, + { + name: "different strings", + a: "x", + b: "y", + expected: false, + }, + { + name: "splits namespace", + a: "hello", + b: "hello_world", + expected: true, + }, + { + name: "is prefix before separator", + a: "hello", + b: "helloworld", + expected: false, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + require := require.New(t) + + require.Equal(test.expected, eitherIsPrefix(test.a, test.b)) + require.Equal(test.expected, eitherIsPrefix(test.b, test.a)) + }) + } +} diff --git a/chains/linearizable_vm.go b/chains/linearizable_vm.go index 0521e418667f..e7e99b77cb93 100644 --- a/chains/linearizable_vm.go +++ b/chains/linearizable_vm.go @@ -6,7 +6,6 @@ package chains import ( "context" - "github.com/ava-labs/avalanchego/api/metrics" "github.com/ava-labs/avalanchego/database" "github.com/ava-labs/avalanchego/ids" "github.com/ava-labs/avalanchego/snow" @@ -29,7 +28,6 @@ type initializeOnLinearizeVM struct { vmToInitialize common.VM vmToLinearize *linearizeOnInitializeVM - registerer metrics.MultiGatherer ctx *snow.Context db database.Database genesisBytes []byte @@ -42,7 +40,6 @@ type initializeOnLinearizeVM struct { func (vm *initializeOnLinearizeVM) Linearize(ctx context.Context, stopVertexID ids.ID) error { vm.vmToLinearize.stopVertexID = stopVertexID - vm.ctx.Metrics = vm.registerer return 
vm.vmToInitialize.Initialize( ctx, vm.ctx, diff --git a/chains/manager.go b/chains/manager.go index 8548954e1c5e..bdc6d0ef0180 100644 --- a/chains/manager.go +++ b/chains/manager.go @@ -13,7 +13,6 @@ import ( "sync" "time" - "github.com/prometheus/client_golang/prometheus" "go.uber.org/zap" "github.com/ava-labs/avalanchego/api/health" @@ -74,8 +73,19 @@ import ( ) const ( + ChainLabel = "chain" + defaultChannelSize = 1 initialQueueSize = 3 + + avalancheNamespace = constants.PlatformName + metric.NamespaceSeparator + "avalanche" + handlerNamespace = constants.PlatformName + metric.NamespaceSeparator + "handler" + meterchainvmNamespace = constants.PlatformName + metric.NamespaceSeparator + "meterchainvm" + meterdagvmNamespace = constants.PlatformName + metric.NamespaceSeparator + "meterdagvm" + proposervmNamespace = constants.PlatformName + metric.NamespaceSeparator + "proposervm" + p2pNamespace = constants.PlatformName + metric.NamespaceSeparator + "p2p" + snowmanNamespace = constants.PlatformName + metric.NamespaceSeparator + "snowman" + stakeNamespace = constants.PlatformName + metric.NamespaceSeparator + "stake" ) var ( @@ -207,7 +217,9 @@ type ManagerConfig struct { // ShutdownNodeFunc allows the chain manager to issue a request to shutdown the node ShutdownNodeFunc func(exitCode int) MeterVMEnabled bool // Should each VM be wrapped with a MeterVM - Metrics metrics.MultiGatherer + + Metrics metrics.MultiGatherer + MeterDBMetrics metrics.MultiGatherer FrontierPollFrequency time.Duration ConsensusAppConcurrency int @@ -259,10 +271,60 @@ type manager struct { // snowman++ related interface to allow validators retrieval validatorState validators.State + + avalancheGatherer metrics.MultiGatherer // chainID + handlerGatherer metrics.MultiGatherer // chainID + meterChainVMGatherer metrics.MultiGatherer // chainID + meterDAGVMGatherer metrics.MultiGatherer // chainID + proposervmGatherer metrics.MultiGatherer // chainID + p2pGatherer metrics.MultiGatherer // chainID + snowmanGatherer metrics.MultiGatherer // chainID + stakeGatherer metrics.MultiGatherer // chainID + vmGatherer map[ids.ID]metrics.MultiGatherer // vmID -> chainID } // New returns a new Manager -func New(config *ManagerConfig) Manager { +func New(config *ManagerConfig) (Manager, error) { + avalancheGatherer := metrics.NewLabelGatherer(ChainLabel) + if err := config.Metrics.Register(avalancheNamespace, avalancheGatherer); err != nil { + return nil, err + } + + handlerGatherer := metrics.NewLabelGatherer(ChainLabel) + if err := config.Metrics.Register(handlerNamespace, handlerGatherer); err != nil { + return nil, err + } + + meterChainVMGatherer := metrics.NewLabelGatherer(ChainLabel) + if err := config.Metrics.Register(meterchainvmNamespace, meterChainVMGatherer); err != nil { + return nil, err + } + + meterDAGVMGatherer := metrics.NewLabelGatherer(ChainLabel) + if err := config.Metrics.Register(meterdagvmNamespace, meterDAGVMGatherer); err != nil { + return nil, err + } + + proposervmGatherer := metrics.NewLabelGatherer(ChainLabel) + if err := config.Metrics.Register(proposervmNamespace, proposervmGatherer); err != nil { + return nil, err + } + + p2pGatherer := metrics.NewLabelGatherer(ChainLabel) + if err := config.Metrics.Register(p2pNamespace, p2pGatherer); err != nil { + return nil, err + } + + snowmanGatherer := metrics.NewLabelGatherer(ChainLabel) + if err := config.Metrics.Register(snowmanNamespace, snowmanGatherer); err != nil { + return nil, err + } + + stakeGatherer := metrics.NewLabelGatherer(ChainLabel) + if err := 
config.Metrics.Register(stakeNamespace, stakeGatherer); err != nil { + return nil, err + } + return &manager{ Aliaser: ids.NewAliaser(), ManagerConfig: *config, @@ -270,7 +332,17 @@ func New(config *ManagerConfig) Manager { chainsQueue: buffer.NewUnboundedBlockingDeque[ChainParameters](initialQueueSize), unblockChainCreatorCh: make(chan struct{}), chainCreatorShutdownCh: make(chan struct{}), - } + + avalancheGatherer: avalancheGatherer, + handlerGatherer: handlerGatherer, + meterChainVMGatherer: meterChainVMGatherer, + meterDAGVMGatherer: meterDAGVMGatherer, + proposervmGatherer: proposervmGatherer, + p2pGatherer: p2pGatherer, + snowmanGatherer: snowmanGatherer, + stakeGatherer: stakeGatherer, + vmGatherer: make(map[ids.ID]metrics.MultiGatherer), + }, nil } // QueueChainCreation queues a chain creation request @@ -419,16 +491,17 @@ func (m *manager) buildChain(chainParams ChainParameters, sb subnets.Subnet) (*c return nil, fmt.Errorf("error while creating chain's log %w", err) } - consensusMetrics := prometheus.NewRegistry() - chainNamespace := metric.AppendNamespace(constants.PlatformName, primaryAlias) - if err := m.Metrics.Register(chainNamespace, consensusMetrics); err != nil { - return nil, fmt.Errorf("error while registering chain's metrics %w", err) + snowmanMetrics, err := metrics.MakeAndRegister( + m.snowmanGatherer, + primaryAlias, + ) + if err != nil { + return nil, err } - vmMetrics := metrics.NewMultiGatherer() - vmNamespace := metric.AppendNamespace(chainNamespace, "vm") - if err := m.Metrics.Register(vmNamespace, vmMetrics); err != nil { - return nil, fmt.Errorf("error while registering vm's metrics %w", err) + vmMetrics, err := m.getOrMakeVMRegisterer(chainParams.VMID, primaryAlias) + if err != nil { + return nil, err } ctx := &snow.ConsensusContext{ @@ -454,10 +527,11 @@ func (m *manager) buildChain(chainParams ChainParameters, sb subnets.Subnet) (*c ValidatorState: m.validatorState, ChainDataDir: chainDataDir, }, + PrimaryAlias: primaryAlias, + Registerer: snowmanMetrics, BlockAcceptor: m.BlockAcceptorGroup, TxAcceptor: m.TxAcceptorGroup, VertexAcceptor: m.VertexAcceptorGroup, - Registerer: consensusMetrics, } // Get a factory for the vm we want to use on our chain @@ -551,10 +625,20 @@ func (m *manager) createAvalancheChain( State: snow.Initializing, }) - meterDB, err := meterdb.New("db", ctx.Registerer, m.DB) + primaryAlias := m.PrimaryAliasOrDefault(ctx.ChainID) + meterDBReg, err := metrics.MakeAndRegister( + m.MeterDBMetrics, + primaryAlias, + ) + if err != nil { + return nil, err + } + + meterDB, err := meterdb.New(meterDBReg, m.DB) if err != nil { return nil, err } + prefixDB := prefixdb.New(ctx.ChainID[:], meterDB) vmDB := prefixdb.New(VMDBPrefix, prefixDB) vertexDB := prefixdb.New(VertexDBPrefix, prefixDB) @@ -562,22 +646,19 @@ func (m *manager) createAvalancheChain( txBootstrappingDB := prefixdb.New(TxBootstrappingDBPrefix, prefixDB) blockBootstrappingDB := prefixdb.New(BlockBootstrappingDBPrefix, prefixDB) - // This converts the prefix for all the Avalanche consensus metrics from - // `avalanche_{chainID}_` into `avalanche_{chainID}_avalanche_` so that - // there are no conflicts when registering the Snowman consensus metrics. 
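// (Editorial note, not part of the patch) The removed lines below implemented
// the old scheme: a fresh registry per chain, namespaced as
// "avalanche_{chainAlias}_avalanche". With the label gatherers registered in
// New above, every chain instead shares the "avalanche_avalanche" namespace
// and is distinguished by the "chain" label. For a hypothetical gauge "foo"
// on the C-Chain (and assuming constants.PlatformName is "avalanche"):
//
//	before: avalanche_C_avalanche_foo
//	after:  avalanche_avalanche_foo{chain="C"}
//
// Keeping one stable metric name per component is what makes the per-chain
// prefixes removable.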
- avalancheConsensusMetrics := prometheus.NewRegistry() - primaryAlias := m.PrimaryAliasOrDefault(ctx.ChainID) - chainNamespace := metric.AppendNamespace(constants.PlatformName, primaryAlias) - avalancheDAGNamespace := metric.AppendNamespace(chainNamespace, "avalanche") - if err := m.Metrics.Register(avalancheDAGNamespace, avalancheConsensusMetrics); err != nil { - return nil, fmt.Errorf("error while registering DAG metrics %w", err) + avalancheMetrics, err := metrics.MakeAndRegister( + m.avalancheGatherer, + primaryAlias, + ) + if err != nil { + return nil, err } - vtxBlocker, err := queue.NewWithMissing(vertexBootstrappingDB, "vtx", avalancheConsensusMetrics) + vtxBlocker, err := queue.NewWithMissing(vertexBootstrappingDB, "vtx", avalancheMetrics) if err != nil { return nil, err } - txBlocker, err := queue.New(txBootstrappingDB, "tx", avalancheConsensusMetrics) + txBlocker, err := queue.New(txBootstrappingDB, "tx", avalancheMetrics) if err != nil { return nil, err } @@ -591,7 +672,7 @@ func (m *manager) createAvalancheChain( m.TimeoutManager, p2ppb.EngineType_ENGINE_TYPE_AVALANCHE, sb, - avalancheConsensusMetrics, + avalancheMetrics, ) if err != nil { return nil, fmt.Errorf("couldn't initialize avalanche sender: %w", err) @@ -627,7 +708,15 @@ func (m *manager) createAvalancheChain( dagVM := vm if m.MeterVMEnabled { - dagVM = metervm.NewVertexVM(dagVM) + meterdagvmReg, err := metrics.MakeAndRegister( + m.meterDAGVMGatherer, + primaryAlias, + ) + if err != nil { + return nil, err + } + + dagVM = metervm.NewVertexVM(dagVM, meterdagvmReg) } if m.TracingEnabled { dagVM = tracedvm.NewVertexVM(dagVM, m.Tracer) @@ -645,17 +734,6 @@ func (m *manager) createAvalancheChain( }, ) - avalancheRegisterer := metrics.NewMultiGatherer() - snowmanRegisterer := metrics.NewMultiGatherer() - if err := ctx.Context.Metrics.Register("avalanche", avalancheRegisterer); err != nil { - return nil, err - } - if err := ctx.Context.Metrics.Register("", snowmanRegisterer); err != nil { - return nil, err - } - - ctx.Context.Metrics = avalancheRegisterer - // The channel through which a VM may send messages to the consensus engine // VM uses this channel to notify engine that a block is ready to be made msgChan := make(chan common.Message, defaultChannelSize) @@ -695,14 +773,20 @@ func (m *manager) createAvalancheChain( zap.Uint64("numHistoricalBlocks", numHistoricalBlocks), ) - chainAlias := m.PrimaryAliasOrDefault(ctx.ChainID) - // Note: this does not use [dagVM] to ensure we use the [vm]'s height index. 
untracedVMWrappedInsideProposerVM := NewLinearizeOnInitializeVM(vm) var vmWrappedInsideProposerVM block.ChainVM = untracedVMWrappedInsideProposerVM if m.TracingEnabled { - vmWrappedInsideProposerVM = tracedvm.NewBlockVM(vmWrappedInsideProposerVM, chainAlias, m.Tracer) + vmWrappedInsideProposerVM = tracedvm.NewBlockVM(vmWrappedInsideProposerVM, primaryAlias, m.Tracer) + } + + proposervmReg, err := metrics.MakeAndRegister( + m.proposervmGatherer, + primaryAlias, + ) + if err != nil { + return nil, err } // Note: vmWrappingProposerVM is the VM that the Snowman engines should be @@ -717,11 +801,20 @@ func (m *manager) createAvalancheChain( NumHistoricalBlocks: numHistoricalBlocks, StakingLeafSigner: m.StakingTLSSigner, StakingCertLeaf: m.StakingTLSCert, + Registerer: proposervmReg, }, ) if m.MeterVMEnabled { - vmWrappingProposerVM = metervm.NewBlockVM(vmWrappingProposerVM) + meterchainvmReg, err := metrics.MakeAndRegister( + m.meterChainVMGatherer, + primaryAlias, + ) + if err != nil { + return nil, err + } + + vmWrappingProposerVM = metervm.NewBlockVM(vmWrappingProposerVM, meterchainvmReg) } if m.TracingEnabled { vmWrappingProposerVM = tracedvm.NewBlockVM(vmWrappingProposerVM, "proposervm", m.Tracer) @@ -734,7 +827,6 @@ func (m *manager) createAvalancheChain( vmToInitialize: vmWrappingProposerVM, vmToLinearize: untracedVMWrappedInsideProposerVM, - registerer: snowmanRegisterer, ctx: ctx.Context, db: vmDB, genesisBytes: genesisData, @@ -756,16 +848,32 @@ func (m *manager) createAvalancheChain( sampleK = int(bootstrapWeight) } - connectedValidators, err := tracker.NewMeteredPeers(ctx.Registerer) + stakeReg, err := metrics.MakeAndRegister( + m.stakeGatherer, + primaryAlias, + ) + if err != nil { + return nil, err + } + + connectedValidators, err := tracker.NewMeteredPeers(stakeReg) if err != nil { return nil, fmt.Errorf("error creating peer tracker: %w", err) } vdrs.RegisterSetCallbackListener(ctx.SubnetID, connectedValidators) + p2pReg, err := metrics.MakeAndRegister( + m.p2pGatherer, + primaryAlias, + ) + if err != nil { + return nil, err + } + peerTracker, err := p2p.NewPeerTracker( ctx.Log, "peer_tracker", - ctx.Registerer, + p2pReg, set.Of(ctx.NodeID), nil, ) @@ -773,6 +881,14 @@ func (m *manager) createAvalancheChain( return nil, fmt.Errorf("error creating peer tracker: %w", err) } + handlerReg, err := metrics.MakeAndRegister( + m.handlerGatherer, + primaryAlias, + ) + if err != nil { + return nil, err + } + // Asynchronously passes messages from the network to the consensus engine h, err := handler.New( ctx, @@ -785,6 +901,7 @@ func (m *manager) createAvalancheChain( sb, connectedValidators, peerTracker, + handlerReg, ) if err != nil { return nil, fmt.Errorf("error initializing network handler: %w", err) @@ -867,7 +984,7 @@ func (m *manager) createAvalancheChain( ctx.Log, m.BootstrapMaxTimeGetAncestors, m.BootstrapAncestorsMaxContainersSent, - avalancheConsensusMetrics, + avalancheMetrics, ) if err != nil { return nil, fmt.Errorf("couldn't initialize avalanche base message handler: %w", err) @@ -899,7 +1016,7 @@ func (m *manager) createAvalancheChain( avalancheBootstrapper, err := avbootstrap.New( avalancheBootstrapperConfig, snowmanBootstrapper.Start, - avalancheConsensusMetrics, + avalancheMetrics, ) if err != nil { return nil, fmt.Errorf("error initializing avalanche bootstrapper: %w", err) @@ -923,12 +1040,12 @@ func (m *manager) createAvalancheChain( }) // Register health check for this chain - if err := m.Health.RegisterHealthCheck(chainAlias, h, ctx.SubnetID.String()); err != nil { 
- return nil, fmt.Errorf("couldn't add health check for chain %s: %w", chainAlias, err) + if err := m.Health.RegisterHealthCheck(primaryAlias, h, ctx.SubnetID.String()); err != nil { + return nil, fmt.Errorf("couldn't add health check for chain %s: %w", primaryAlias, err) } return &chain{ - Name: chainAlias, + Name: primaryAlias, Context: ctx, VM: dagVM, Handler: h, @@ -953,10 +1070,20 @@ func (m *manager) createSnowmanChain( State: snow.Initializing, }) - meterDB, err := meterdb.New("db", ctx.Registerer, m.DB) + primaryAlias := m.PrimaryAliasOrDefault(ctx.ChainID) + meterDBReg, err := metrics.MakeAndRegister( + m.MeterDBMetrics, + primaryAlias, + ) if err != nil { return nil, err } + + meterDB, err := meterdb.New(meterDBReg, m.DB) + if err != nil { + return nil, err + } + prefixDB := prefixdb.New(ctx.ChainID[:], meterDB) vmDB := prefixdb.New(VMDBPrefix, prefixDB) bootstrappingDB := prefixdb.New(ChainBootstrappingDBPrefix, prefixDB) @@ -1049,9 +1176,16 @@ func (m *manager) createSnowmanChain( zap.Uint64("numHistoricalBlocks", numHistoricalBlocks), ) - chainAlias := m.PrimaryAliasOrDefault(ctx.ChainID) if m.TracingEnabled { - vm = tracedvm.NewBlockVM(vm, chainAlias, m.Tracer) + vm = tracedvm.NewBlockVM(vm, primaryAlias, m.Tracer) + } + + proposervmReg, err := metrics.MakeAndRegister( + m.proposervmGatherer, + primaryAlias, + ) + if err != nil { + return nil, err } vm = proposervm.New( @@ -1064,11 +1198,20 @@ func (m *manager) createSnowmanChain( NumHistoricalBlocks: numHistoricalBlocks, StakingLeafSigner: m.StakingTLSSigner, StakingCertLeaf: m.StakingTLSCert, + Registerer: proposervmReg, }, ) if m.MeterVMEnabled { - vm = metervm.NewBlockVM(vm) + meterchainvmReg, err := metrics.MakeAndRegister( + m.meterChainVMGatherer, + primaryAlias, + ) + if err != nil { + return nil, err + } + + vm = metervm.NewBlockVM(vm, meterchainvmReg) } if m.TracingEnabled { vm = tracedvm.NewBlockVM(vm, "proposervm", m.Tracer) @@ -1103,16 +1246,32 @@ func (m *manager) createSnowmanChain( sampleK = int(bootstrapWeight) } - connectedValidators, err := tracker.NewMeteredPeers(ctx.Registerer) + stakeReg, err := metrics.MakeAndRegister( + m.stakeGatherer, + primaryAlias, + ) + if err != nil { + return nil, err + } + + connectedValidators, err := tracker.NewMeteredPeers(stakeReg) if err != nil { return nil, fmt.Errorf("error creating peer tracker: %w", err) } vdrs.RegisterSetCallbackListener(ctx.SubnetID, connectedValidators) + p2pReg, err := metrics.MakeAndRegister( + m.p2pGatherer, + primaryAlias, + ) + if err != nil { + return nil, err + } + peerTracker, err := p2p.NewPeerTracker( ctx.Log, "peer_tracker", - ctx.Registerer, + p2pReg, set.Of(ctx.NodeID), nil, ) @@ -1120,6 +1279,14 @@ func (m *manager) createSnowmanChain( return nil, fmt.Errorf("error creating peer tracker: %w", err) } + handlerReg, err := metrics.MakeAndRegister( + m.handlerGatherer, + primaryAlias, + ) + if err != nil { + return nil, err + } + // Asynchronously passes messages from the network to the consensus engine h, err := handler.New( ctx, @@ -1132,6 +1299,7 @@ func (m *manager) createSnowmanChain( sb, connectedValidators, peerTracker, + handlerReg, ) if err != nil { return nil, fmt.Errorf("couldn't initialize message handler: %w", err) @@ -1244,12 +1412,12 @@ func (m *manager) createSnowmanChain( }) // Register health checks - if err := m.Health.RegisterHealthCheck(chainAlias, h, ctx.SubnetID.String()); err != nil { - return nil, fmt.Errorf("couldn't add health check for chain %s: %w", chainAlias, err) + if err := 
m.Health.RegisterHealthCheck(primaryAlias, h, ctx.SubnetID.String()); err != nil { + return nil, fmt.Errorf("couldn't add health check for chain %s: %w", primaryAlias, err) } return &chain{ - Name: chainAlias, + Name: primaryAlias, Context: ctx, VM: vm, Handler: h, @@ -1390,3 +1558,27 @@ func (m *manager) getChainConfig(id ids.ID) (ChainConfig, error) { return ChainConfig{}, nil } + +func (m *manager) getOrMakeVMRegisterer(vmID ids.ID, chainAlias string) (metrics.MultiGatherer, error) { + vmGatherer, ok := m.vmGatherer[vmID] + if !ok { + vmName := constants.VMName(vmID) + vmNamespace := metric.AppendNamespace(constants.PlatformName, vmName) + vmGatherer = metrics.NewLabelGatherer(ChainLabel) + err := m.Metrics.Register( + vmNamespace, + vmGatherer, + ) + if err != nil { + return nil, err + } + m.vmGatherer[vmID] = vmGatherer + } + + chainReg := metrics.NewPrefixGatherer() + err := vmGatherer.Register( + chainAlias, + chainReg, + ) + return chainReg, err +} diff --git a/database/meterdb/db.go b/database/meterdb/db.go index 5f9ef51df168..af41746b32e4 100644 --- a/database/meterdb/db.go +++ b/database/meterdb/db.go @@ -98,7 +98,6 @@ type Database struct { // New returns a new database with added metrics func New( - namespace string, reg prometheus.Registerer, db database.Database, ) (*Database, error) { @@ -106,25 +105,22 @@ func New( db: db, calls: prometheus.NewCounterVec( prometheus.CounterOpts{ - Namespace: namespace, - Name: "calls", - Help: "number of calls to the database", + Name: "calls", + Help: "number of calls to the database", }, methodLabels, ), duration: prometheus.NewGaugeVec( prometheus.GaugeOpts{ - Namespace: namespace, - Name: "duration", - Help: "time spent in database calls (ns)", + Name: "duration", + Help: "time spent in database calls (ns)", }, methodLabels, ), size: prometheus.NewCounterVec( prometheus.CounterOpts{ - Namespace: namespace, - Name: "size", - Help: "size of data passed in database calls", + Name: "size", + Help: "size of data passed in database calls", }, methodLabels, ), diff --git a/database/meterdb/db_test.go b/database/meterdb/db_test.go index 48a8966b2772..57cedc181043 100644 --- a/database/meterdb/db_test.go +++ b/database/meterdb/db_test.go @@ -18,7 +18,7 @@ func TestInterface(t *testing.T) { for name, test := range database.Tests { t.Run(name, func(t *testing.T) { baseDB := memdb.New() - db, err := New("", prometheus.NewRegistry(), baseDB) + db, err := New(prometheus.NewRegistry(), baseDB) require.NoError(t, err) test(t, db) @@ -28,7 +28,7 @@ func TestInterface(t *testing.T) { func newDB(t testing.TB) database.Database { baseDB := memdb.New() - db, err := New("", prometheus.NewRegistry(), baseDB) + db, err := New(prometheus.NewRegistry(), baseDB) require.NoError(t, err) return db } diff --git a/node/node.go b/node/node.go index 847979b9b63a..09fb05d06e86 100644 --- a/node/node.go +++ b/node/node.go @@ -91,9 +91,17 @@ const ( ipResolutionTimeout = 30 * time.Second - apiNamespace = constants.PlatformName + metric.NamespaceSeparator + "api" - dbNamespace = constants.PlatformName + metric.NamespaceSeparator + "db_internal" - networkNamespace = constants.PlatformName + metric.NamespaceSeparator + "network" + apiNamespace = constants.PlatformName + metric.NamespaceSeparator + "api" + benchlistNamespace = constants.PlatformName + metric.NamespaceSeparator + "benchlist" + dbNamespace = constants.PlatformName + metric.NamespaceSeparator + "db" + healthNamespace = constants.PlatformName + metric.NamespaceSeparator + "health" + meterDBNamespace = 
constants.PlatformName + metric.NamespaceSeparator + "meterdb" + networkNamespace = constants.PlatformName + metric.NamespaceSeparator + "network" + processNamespace = constants.PlatformName + metric.NamespaceSeparator + "process" + requestsNamespace = constants.PlatformName + metric.NamespaceSeparator + "requests" + resourceTrackerNamespace = constants.PlatformName + metric.NamespaceSeparator + "resource_tracker" + responsesNamespace = constants.PlatformName + metric.NamespaceSeparator + "responses" + systemResourcesNamespace = constants.PlatformName + metric.NamespaceSeparator + "system_resources" ) var ( @@ -165,7 +173,10 @@ func New( return nil, fmt.Errorf("couldn't initialize tracer: %w", err) } - n.initMetrics() + if err := n.initMetrics(); err != nil { + return nil, fmt.Errorf("couldn't initialize metrics: %w", err) + } + n.initNAT() if err := n.initAPIServer(); err != nil { // Start the API Server return nil, fmt.Errorf("couldn't initialize API server: %w", err) @@ -213,7 +224,7 @@ func New( logger.Warn("sybil control is not enforced") n.vdrs = newOverriddenManager(constants.PrimaryNetworkID, n.vdrs) } - if err := n.initResourceManager(n.MetricsRegisterer); err != nil { + if err := n.initResourceManager(); err != nil { return nil, fmt.Errorf("problem initializing resource manager: %w", err) } n.initCPUTargeter(&config.CPUTargeterConfig) @@ -363,8 +374,8 @@ type Node struct { DoneShuttingDown sync.WaitGroup // Metrics Registerer - MetricsRegisterer *prometheus.Registry - MetricsGatherer metrics.MultiGatherer + MetricsGatherer metrics.MultiGatherer + MeterDBMetricsGatherer metrics.MultiGatherer VMAliaser ids.Aliaser VMManager vms.Manager @@ -531,6 +542,16 @@ func (n *Node) initNetworking(reg prometheus.Registerer) error { // Configure benchlist n.Config.BenchlistConfig.Validators = n.vdrs n.Config.BenchlistConfig.Benchable = n.chainRouter + n.Config.BenchlistConfig.BenchlistRegisterer = metrics.NewLabelGatherer(chains.ChainLabel) + + err = n.MetricsGatherer.Register( + benchlistNamespace, + n.Config.BenchlistConfig.BenchlistRegisterer, + ) + if err != nil { + return err + } + n.benchlistManager = benchlist.NewManager(&n.Config.BenchlistConfig) n.uptimeCalculator = uptime.NewLockedCalculator() @@ -770,7 +791,15 @@ func (n *Node) initDatabase() error { n.DB = versiondb.New(n.DB) } - n.DB, err = meterdb.New("db", n.MetricsRegisterer, n.DB) + meterDBReg, err := metrics.MakeAndRegister( + n.MeterDBMetricsGatherer, + "all", + ) + if err != nil { + return err + } + + n.DB, err = meterdb.New(meterDBReg, n.DB) if err != nil { return err } @@ -891,9 +920,13 @@ func (n *Node) initChains(genesisBytes []byte) error { return n.chainManager.StartChainCreator(platformChain) } -func (n *Node) initMetrics() { - n.MetricsRegisterer = prometheus.NewRegistry() - n.MetricsGatherer = metrics.NewMultiGatherer() +func (n *Node) initMetrics() error { + n.MetricsGatherer = metrics.NewPrefixGatherer() + n.MeterDBMetricsGatherer = metrics.NewLabelGatherer(chains.ChainLabel) + return n.MetricsGatherer.Register( + meterDBNamespace, + n.MeterDBMetricsGatherer, + ) } func (n *Node) initNAT() { @@ -1043,11 +1076,27 @@ func (n *Node) initChainManager(avaxAssetID ids.ID) error { cChainID, ) + requestsReg, err := metrics.MakeAndRegister( + n.MetricsGatherer, + requestsNamespace, + ) + if err != nil { + return err + } + + responseReg, err := metrics.MakeAndRegister( + n.MetricsGatherer, + responsesNamespace, + ) + if err != nil { + return err + } + n.timeoutManager, err = timeout.NewManager( 
&n.Config.AdaptiveTimeoutConfig, n.benchlistManager, - "requests", - n.MetricsRegisterer, + requestsReg, + responseReg, ) if err != nil { return err @@ -1065,8 +1114,7 @@ func (n *Node) initChainManager(avaxAssetID ids.ID) error { n.Config.TrackedSubnets, n.Shutdown, n.Config.RouterHealthConfig, - "requests", - n.MetricsRegisterer, + requestsReg, ) if err != nil { return fmt.Errorf("couldn't initialize chain router: %w", err) @@ -1076,7 +1124,8 @@ func (n *Node) initChainManager(avaxAssetID ids.ID) error { if err != nil { return fmt.Errorf("failed to initialize subnets: %w", err) } - n.chainManager = chains.New( + + n.chainManager, err = chains.New( &chains.ManagerConfig{ SybilProtectionEnabled: n.Config.SybilProtectionEnabled, StakingTLSSigner: n.StakingTLSSigner, @@ -1108,6 +1157,7 @@ func (n *Node) initChainManager(avaxAssetID ids.ID) error { ShutdownNodeFunc: n.Shutdown, MeterVMEnabled: n.Config.MeterVMEnabled, Metrics: n.MetricsGatherer, + MeterDBMetrics: n.MeterDBMetricsGatherer, SubnetConfigs: n.Config.SubnetConfigs, ChainConfigs: n.Config.ChainConfigs, FrontierPollFrequency: n.Config.FrontierPollFrequency, @@ -1125,6 +1175,9 @@ func (n *Node) initChainManager(avaxAssetID ids.ID) error { Subnets: subnets, }, ) + if err != nil { + return err + } // Notify the API server when new chains are created n.chainManager.AddRegistrant(n.APIServer) @@ -1246,19 +1299,23 @@ func (n *Node) initMetricsAPI() error { return nil } - if err := n.MetricsGatherer.Register(constants.PlatformName, n.MetricsRegisterer); err != nil { + processReg, err := metrics.MakeAndRegister( + n.MetricsGatherer, + processNamespace, + ) + if err != nil { return err } // Current state of process metrics. processCollector := collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}) - if err := n.MetricsRegisterer.Register(processCollector); err != nil { + if err := processReg.Register(processCollector); err != nil { return err } // Go process metrics using debug.GCStats. 
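// (Editorial note, not part of the patch) Both collectors in this hunk are now
// registered on processReg rather than on the node's former root registry, so
// they are exported under the "process" namespace created above (i.e. as
// "avalanche_process_*", assuming constants.PlatformName is "avalanche").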
goCollector := collectors.NewGoCollector() - if err := n.MetricsRegisterer.Register(goCollector); err != nil { + if err := processReg.Register(goCollector); err != nil { return err } @@ -1375,11 +1432,18 @@ func (n *Node) initInfoAPI() error { // initHealthAPI initializes the Health API service // Assumes n.Log, n.Net, n.APIServer, n.HTTPLog already initialized func (n *Node) initHealthAPI() error { - healthChecker, err := health.New(n.Log, n.MetricsRegisterer) + healthReg, err := metrics.MakeAndRegister( + n.MetricsGatherer, + healthNamespace, + ) + if err != nil { + return err + } + + n.health, err = health.New(n.Log, healthReg) if err != nil { return err } - n.health = healthChecker if !n.Config.HealthAPIEnabled { n.Log.Info("skipping health API initialization because it has been disabled") @@ -1387,18 +1451,18 @@ func (n *Node) initHealthAPI() error { } n.Log.Info("initializing Health API") - err = healthChecker.RegisterHealthCheck("network", n.Net, health.ApplicationTag) + err = n.health.RegisterHealthCheck("network", n.Net, health.ApplicationTag) if err != nil { return fmt.Errorf("couldn't register network health check: %w", err) } - err = healthChecker.RegisterHealthCheck("router", n.chainRouter, health.ApplicationTag) + err = n.health.RegisterHealthCheck("router", n.chainRouter, health.ApplicationTag) if err != nil { return fmt.Errorf("couldn't register router health check: %w", err) } // TODO: add database health to liveness check - err = healthChecker.RegisterHealthCheck("database", n.DB, health.ApplicationTag) + err = n.health.RegisterHealthCheck("database", n.DB, health.ApplicationTag) if err != nil { return fmt.Errorf("couldn't register database health check: %w", err) } @@ -1430,7 +1494,7 @@ func (n *Node) initHealthAPI() error { return fmt.Errorf("couldn't register resource health check: %w", err) } - handler, err := health.NewGetAndPostHandler(n.Log, healthChecker) + handler, err := health.NewGetAndPostHandler(n.Log, n.health) if err != nil { return err } @@ -1445,7 +1509,7 @@ func (n *Node) initHealthAPI() error { } err = n.APIServer.AddRoute( - health.NewGetHandler(healthChecker.Readiness), + health.NewGetHandler(n.health.Readiness), "health", "/readiness", ) @@ -1454,7 +1518,7 @@ func (n *Node) initHealthAPI() error { } err = n.APIServer.AddRoute( - health.NewGetHandler(healthChecker.Health), + health.NewGetHandler(n.health.Health), "health", "/health", ) @@ -1463,7 +1527,7 @@ func (n *Node) initHealthAPI() error { } return n.APIServer.AddRoute( - health.NewGetHandler(healthChecker.Liveness), + health.NewGetHandler(n.health.Liveness), "health", "/liveness", ) @@ -1513,14 +1577,21 @@ func (n *Node) initAPIAliases(genesisBytes []byte) error { } // Initialize [n.resourceManager]. 
-func (n *Node) initResourceManager(reg prometheus.Registerer) error { +func (n *Node) initResourceManager() error { + systemResourcesRegisterer, err := metrics.MakeAndRegister( + n.MetricsGatherer, + systemResourcesNamespace, + ) + if err != nil { + return err + } resourceManager, err := resource.NewManager( n.Log, n.Config.DatabaseConfig.Path, n.Config.SystemTrackerFrequency, n.Config.SystemTrackerCPUHalflife, n.Config.SystemTrackerDiskHalflife, - reg, + systemResourcesRegisterer, ) if err != nil { return err @@ -1528,7 +1599,19 @@ func (n *Node) initResourceManager(reg prometheus.Registerer) error { n.resourceManager = resourceManager n.resourceManager.TrackProcess(os.Getpid()) - n.resourceTracker, err = tracker.NewResourceTracker(reg, n.resourceManager, &meter.ContinuousFactory{}, n.Config.SystemTrackerProcessingHalflife) + resourceTrackerRegisterer, err := metrics.MakeAndRegister( + n.MetricsGatherer, + resourceTrackerNamespace, + ) + if err != nil { + return err + } + n.resourceTracker, err = tracker.NewResourceTracker( + resourceTrackerRegisterer, + n.resourceManager, + &meter.ContinuousFactory{}, + n.Config.SystemTrackerProcessingHalflife, + ) return err } diff --git a/snow/consensus/snowman/consensus_test.go b/snow/consensus/snowman/consensus_test.go index b4cb5b03a494..bb51790b76f7 100644 --- a/snow/consensus/snowman/consensus_test.go +++ b/snow/consensus/snowman/consensus_test.go @@ -1369,7 +1369,7 @@ func ErrorOnAddDecidedBlockTest(t *testing.T, factory Factory) { require.ErrorIs(err, errUnknownParentBlock) } -func gatherCounterGauge(t *testing.T, reg *prometheus.Registry) map[string]float64 { +func gatherCounterGauge(t *testing.T, reg prometheus.Gatherer) map[string]float64 { ms, err := reg.Gather() require.NoError(t, err) mss := make(map[string]float64) diff --git a/snow/context.go b/snow/context.go index 2fa501571890..26fc67f213a8 100644 --- a/snow/context.go +++ b/snow/context.go @@ -65,6 +65,10 @@ type Registerer interface { type ConsensusContext struct { *Context + // PrimaryAlias is the primary alias of the chain this context exists + // within. + PrimaryAlias string + // Registers all consensus metrics. 
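// (Editorial note, not part of the patch) With this change the Registerer is
// created per chain via metrics.MakeAndRegister on a component-level label
// gatherer, so consensus metrics keep one namespace and carry a "chain" label
// holding PrimaryAlias. A plausible sketch of that helper, inferred from its
// call sites in this patch (the actual implementation may differ):
//
//	func MakeAndRegister(gatherer MultiGatherer, name string) (*prometheus.Registry, error) {
//		reg := prometheus.NewRegistry()
//		if err := gatherer.Register(name, reg); err != nil {
//			return nil, err
//		}
//		return reg, nil
//	}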
Registerer Registerer diff --git a/snow/networking/benchlist/benchlist.go b/snow/networking/benchlist/benchlist.go index 2bf68e049864..453395379435 100644 --- a/snow/networking/benchlist/benchlist.go +++ b/snow/networking/benchlist/benchlist.go @@ -9,11 +9,13 @@ import ( "sync" "time" + "github.com/prometheus/client_golang/prometheus" "go.uber.org/zap" "github.com/ava-labs/avalanchego/ids" "github.com/ava-labs/avalanchego/snow" "github.com/ava-labs/avalanchego/snow/validators" + "github.com/ava-labs/avalanchego/utils" "github.com/ava-labs/avalanchego/utils/heap" "github.com/ava-labs/avalanchego/utils/set" "github.com/ava-labs/avalanchego/utils/timer/mockable" @@ -50,8 +52,9 @@ type failureStreak struct { type benchlist struct { lock sync.RWMutex // Context of the chain this is the benchlist for - ctx *snow.ConsensusContext - metrics metrics + ctx *snow.ConsensusContext + + numBenched, weightBenched prometheus.Gauge // Used to notify the timer that it should recalculate when it should fire resetTimer chan struct{} @@ -99,13 +102,22 @@ func NewBenchlist( minimumFailingDuration, duration time.Duration, maxPortion float64, + reg prometheus.Registerer, ) (Benchlist, error) { if maxPortion < 0 || maxPortion >= 1 { return nil, fmt.Errorf("max portion of benched stake must be in [0,1) but got %f", maxPortion) } benchlist := &benchlist{ - ctx: ctx, + ctx: ctx, + numBenched: prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "benched_num", + Help: "Number of currently benched validators", + }), + weightBenched: prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "benched_weight", + Help: "Weight of currently benched validators", + }), resetTimer: make(chan struct{}, 1), failureStreaks: make(map[ids.NodeID]failureStreak), benchlistSet: set.Set[ids.NodeID]{}, @@ -117,7 +129,12 @@ func NewBenchlist( duration: duration, maxPortion: maxPortion, } - if err := benchlist.metrics.Initialize(ctx.Registerer); err != nil { + + err := utils.Err( + reg.Register(benchlist.numBenched), + reg.Register(benchlist.weightBenched), + ) + if err != nil { return nil, err } @@ -188,7 +205,7 @@ func (b *benchlist) removedExpiredNodes() { b.benchable.Unbenched(b.ctx.ChainID, nodeID) } - b.metrics.numBenched.Set(float64(b.benchedHeap.Len())) + b.numBenched.Set(float64(b.benchedHeap.Len())) benchedStake, err := b.vdrs.SubsetWeight(b.ctx.SubnetID, b.benchlistSet) if err != nil { b.ctx.Log.Error("error calculating benched stake", @@ -197,7 +214,7 @@ func (b *benchlist) removedExpiredNodes() { ) return } - b.metrics.weightBenched.Set(float64(benchedStake)) + b.weightBenched.Set(float64(benchedStake)) } func (b *benchlist) durationToSleep() time.Duration { @@ -338,6 +355,6 @@ func (b *benchlist) bench(nodeID ids.NodeID) { } // Update metrics - b.metrics.numBenched.Set(float64(b.benchedHeap.Len())) - b.metrics.weightBenched.Set(float64(newBenchedStake)) + b.numBenched.Set(float64(b.benchedHeap.Len())) + b.weightBenched.Set(float64(newBenchedStake)) } diff --git a/snow/networking/benchlist/benchlist_test.go b/snow/networking/benchlist/benchlist_test.go index 45568392297e..3a52be818f75 100644 --- a/snow/networking/benchlist/benchlist_test.go +++ b/snow/networking/benchlist/benchlist_test.go @@ -7,6 +7,7 @@ import ( "testing" "time" + "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/require" "github.com/ava-labs/avalanchego/ids" @@ -49,6 +50,7 @@ func TestBenchlistAdd(t *testing.T) { minimumFailingDuration, duration, maxPortion, + prometheus.NewRegistry(), ) require.NoError(err) b := benchIntf.(*benchlist) @@ 
-173,6 +175,7 @@ func TestBenchlistMaxStake(t *testing.T) { minimumFailingDuration, duration, maxPortion, + prometheus.NewRegistry(), ) require.NoError(err) b := benchIntf.(*benchlist) @@ -295,6 +298,7 @@ func TestBenchlistRemove(t *testing.T) { minimumFailingDuration, duration, maxPortion, + prometheus.NewRegistry(), ) require.NoError(err) b := benchIntf.(*benchlist) diff --git a/snow/networking/benchlist/manager.go b/snow/networking/benchlist/manager.go index e6ac45da4400..e19c54410447 100644 --- a/snow/networking/benchlist/manager.go +++ b/snow/networking/benchlist/manager.go @@ -7,6 +7,7 @@ import ( "sync" "time" + "github.com/ava-labs/avalanchego/api/metrics" "github.com/ava-labs/avalanchego/ids" "github.com/ava-labs/avalanchego/snow" "github.com/ava-labs/avalanchego/snow/validators" @@ -39,12 +40,13 @@ type Manager interface { // Config defines the configuration for a benchlist type Config struct { - Benchable Benchable `json:"-"` - Validators validators.Manager `json:"-"` - Threshold int `json:"threshold"` - MinimumFailingDuration time.Duration `json:"minimumFailingDuration"` - Duration time.Duration `json:"duration"` - MaxPortion float64 `json:"maxPortion"` + Benchable Benchable `json:"-"` + Validators validators.Manager `json:"-"` + BenchlistRegisterer metrics.MultiGatherer `json:"-"` + Threshold int `json:"threshold"` + MinimumFailingDuration time.Duration `json:"minimumFailingDuration"` + Duration time.Duration `json:"duration"` + MaxPortion float64 `json:"maxPortion"` } type manager struct { @@ -108,6 +110,14 @@ func (m *manager) RegisterChain(ctx *snow.ConsensusContext) error { return nil } + reg, err := metrics.MakeAndRegister( + m.config.BenchlistRegisterer, + ctx.PrimaryAlias, + ) + if err != nil { + return err + } + benchlist, err := NewBenchlist( ctx, m.config.Benchable, @@ -116,6 +126,7 @@ func (m *manager) RegisterChain(ctx *snow.ConsensusContext) error { m.config.MinimumFailingDuration, m.config.Duration, m.config.MaxPortion, + reg, ) if err != nil { return err diff --git a/snow/networking/benchlist/metrics.go b/snow/networking/benchlist/metrics.go deleted file mode 100644 index 25f9e50f7da8..000000000000 --- a/snow/networking/benchlist/metrics.go +++ /dev/null @@ -1,36 +0,0 @@ -// Copyright (C) 2019-2024, Ava Labs, Inc. All rights reserved. -// See the file LICENSE for licensing terms. 
- -package benchlist - -import ( - "fmt" - - "github.com/prometheus/client_golang/prometheus" -) - -type metrics struct { - numBenched, weightBenched prometheus.Gauge -} - -func (m *metrics) Initialize(registerer prometheus.Registerer) error { - m.numBenched = prometheus.NewGauge(prometheus.GaugeOpts{ - Namespace: "benchlist", - Name: "benched_num", - Help: "Number of currently benched validators", - }) - if err := registerer.Register(m.numBenched); err != nil { - return fmt.Errorf("failed to register num benched statistics due to %w", err) - } - - m.weightBenched = prometheus.NewGauge(prometheus.GaugeOpts{ - Namespace: "benchlist", - Name: "benched_weight", - Help: "Weight of currently benched validators", - }) - if err := registerer.Register(m.weightBenched); err != nil { - return fmt.Errorf("failed to register weight benched statistics due to %w", err) - } - - return nil -} diff --git a/snow/networking/handler/handler.go b/snow/networking/handler/handler.go index 9388d2d66be6..1eb42ca0dcdc 100644 --- a/snow/networking/handler/handler.go +++ b/snow/networking/handler/handler.go @@ -140,6 +140,7 @@ func New( subnet subnets.Subnet, peerTracker commontracker.Peers, p2pTracker *p2p.PeerTracker, + reg prometheus.Registerer, ) (Handler, error) { h := &handler{ ctx: ctx, @@ -160,7 +161,7 @@ func New( var err error - h.metrics, err = newMetrics("handler", h.ctx.Registerer) + h.metrics, err = newMetrics(reg) if err != nil { return nil, fmt.Errorf("initializing handler metrics errored with: %w", err) } @@ -170,8 +171,8 @@ func New( h.ctx.SubnetID, h.validators, cpuTracker, - "handler", - h.ctx.Registerer, + "sync", + reg, ) if err != nil { return nil, fmt.Errorf("initializing sync message queue errored with: %w", err) @@ -181,8 +182,8 @@ func New( h.ctx.SubnetID, h.validators, cpuTracker, - "handler_async", - h.ctx.Registerer, + "async", + reg, ) if err != nil { return nil, fmt.Errorf("initializing async message queue errored with: %w", err) diff --git a/snow/networking/handler/handler_test.go b/snow/networking/handler/handler_test.go index e8ab5f85ebb1..cb24040643f3 100644 --- a/snow/networking/handler/handler_test.go +++ b/snow/networking/handler/handler_test.go @@ -77,6 +77,7 @@ func TestHandlerDropsTimedOutMessages(t *testing.T) { subnets.New(ctx.NodeID, subnets.Config{}), commontracker.NewPeers(), peerTracker, + prometheus.NewRegistry(), ) require.NoError(err) handler := handlerIntf.(*handler) @@ -183,6 +184,7 @@ func TestHandlerClosesOnError(t *testing.T) { subnets.New(ctx.NodeID, subnets.Config{}), commontracker.NewPeers(), peerTracker, + prometheus.NewRegistry(), ) require.NoError(err) handler := handlerIntf.(*handler) @@ -285,6 +287,7 @@ func TestHandlerDropsGossipDuringBootstrapping(t *testing.T) { subnets.New(ctx.NodeID, subnets.Config{}), commontracker.NewPeers(), peerTracker, + prometheus.NewRegistry(), ) require.NoError(err) handler := handlerIntf.(*handler) @@ -375,6 +378,7 @@ func TestHandlerDispatchInternal(t *testing.T) { subnets.New(ctx.NodeID, subnets.Config{}), commontracker.NewPeers(), peerTracker, + prometheus.NewRegistry(), ) require.NoError(err) @@ -460,6 +464,7 @@ func TestHandlerSubnetConnector(t *testing.T) { subnets.New(ctx.NodeID, subnets.Config{}), commontracker.NewPeers(), peerTracker, + prometheus.NewRegistry(), ) require.NoError(err) @@ -641,6 +646,7 @@ func TestDynamicEngineTypeDispatch(t *testing.T) { subnets.New(ids.EmptyNodeID, subnets.Config{}), commontracker.NewPeers(), peerTracker, + prometheus.NewRegistry(), ) require.NoError(err) @@ -723,6 +729,7 @@ func 
TestHandlerStartError(t *testing.T) { subnets.New(ctx.NodeID, subnets.Config{}), commontracker.NewPeers(), peerTracker, + prometheus.NewRegistry(), ) require.NoError(err) diff --git a/snow/networking/handler/health_test.go b/snow/networking/handler/health_test.go index 163332735ea2..789d3464187e 100644 --- a/snow/networking/handler/health_test.go +++ b/snow/networking/handler/health_test.go @@ -93,6 +93,7 @@ func TestHealthCheckSubnet(t *testing.T) { sb, peerTracker, p2pTracker, + prometheus.NewRegistry(), ) require.NoError(err) diff --git a/snow/networking/handler/metrics.go b/snow/networking/handler/metrics.go index 9cd6c9ec4096..f3a21149f26c 100644 --- a/snow/networking/handler/metrics.go +++ b/snow/networking/handler/metrics.go @@ -16,36 +16,32 @@ type metrics struct { messageHandlingTime *prometheus.GaugeVec // op } -func newMetrics(namespace string, reg prometheus.Registerer) (*metrics, error) { +func newMetrics(reg prometheus.Registerer) (*metrics, error) { m := &metrics{ expired: prometheus.NewCounterVec( prometheus.CounterOpts{ - Namespace: namespace, - Name: "expired", - Help: "messages dropped because the deadline expired", + Name: "expired", + Help: "messages dropped because the deadline expired", }, opLabels, ), messages: prometheus.NewCounterVec( prometheus.CounterOpts{ - Namespace: namespace, - Name: "messages", - Help: "messages handled", + Name: "messages", + Help: "messages handled", }, opLabels, ), messageHandlingTime: prometheus.NewGaugeVec( prometheus.GaugeOpts{ - Namespace: namespace, - Name: "message_handling_time", - Help: "time spent handling messages", + Name: "message_handling_time", + Help: "time spent handling messages", }, opLabels, ), lockingTime: prometheus.NewGauge(prometheus.GaugeOpts{ - Namespace: namespace, - Name: "locking_time", - Help: "time spent acquiring the context lock", + Name: "locking_time", + Help: "time spent acquiring the context lock", }), } return m, utils.Err( diff --git a/snow/networking/router/chain_router.go b/snow/networking/router/chain_router.go index 8d471fb768c6..27bf891ab4f9 100644 --- a/snow/networking/router/chain_router.go +++ b/snow/networking/router/chain_router.go @@ -101,8 +101,7 @@ func (cr *ChainRouter) Initialize( trackedSubnets set.Set[ids.ID], onFatal func(exitCode int), healthConfig HealthConfig, - metricsNamespace string, - metricsRegisterer prometheus.Registerer, + reg prometheus.Registerer, ) error { cr.log = log cr.chainHandlers = make(map[ids.ID]handler.Handler) @@ -126,7 +125,7 @@ func (cr *ChainRouter) Initialize( cr.peers[nodeID] = myself // Register metrics - rMetrics, err := newRouterMetrics(metricsNamespace, metricsRegisterer) + rMetrics, err := newRouterMetrics(reg) if err != nil { return err } diff --git a/snow/networking/router/chain_router_metrics.go b/snow/networking/router/chain_router_metrics.go index bc8f26223586..8855acc5ccdf 100644 --- a/snow/networking/router/chain_router_metrics.go +++ b/snow/networking/router/chain_router_metrics.go @@ -16,27 +16,24 @@ type routerMetrics struct { droppedRequests prometheus.Counter } -func newRouterMetrics(namespace string, registerer prometheus.Registerer) (*routerMetrics, error) { +func newRouterMetrics(registerer prometheus.Registerer) (*routerMetrics, error) { rMetrics := &routerMetrics{} rMetrics.outstandingRequests = prometheus.NewGauge( prometheus.GaugeOpts{ - Namespace: namespace, - Name: "outstanding", - Help: "Number of outstanding requests (all types)", + Name: "outstanding", + Help: "Number of outstanding requests (all types)", }, ) 
rMetrics.longestRunningRequest = prometheus.NewGauge( prometheus.GaugeOpts{ - Namespace: namespace, - Name: "longest_running", - Help: "Time (in ns) the longest request took", + Name: "longest_running", + Help: "Time (in ns) the longest request took", }, ) rMetrics.droppedRequests = prometheus.NewCounter( prometheus.CounterOpts{ - Namespace: namespace, - Name: "dropped", - Help: "Number of dropped requests (all types)", + Name: "dropped", + Help: "Number of dropped requests (all types)", }, ) diff --git a/snow/networking/router/chain_router_test.go b/snow/networking/router/chain_router_test.go index c17360f02486..19b889cd2d94 100644 --- a/snow/networking/router/chain_router_test.go +++ b/snow/networking/router/chain_router_test.go @@ -61,7 +61,7 @@ func TestShutdown(t *testing.T) { TimeoutHalflife: 5 * time.Minute, }, benchlist, - "", + prometheus.NewRegistry(), prometheus.NewRegistry(), ) require.NoError(err) @@ -80,7 +80,6 @@ func TestShutdown(t *testing.T) { set.Set[ids.ID]{}, nil, HealthConfig{}, - "", prometheus.NewRegistry(), )) @@ -114,6 +113,7 @@ func TestShutdown(t *testing.T) { subnets.New(chainCtx.NodeID, subnets.Config{}), commontracker.NewPeers(), p2pTracker, + prometheus.NewRegistry(), ) require.NoError(err) @@ -200,7 +200,6 @@ func TestShutdownTimesOut(t *testing.T) { vdrs := validators.NewManager() require.NoError(vdrs.AddStaker(ctx.SubnetID, ids.GenerateTestNodeID(), nil, ids.Empty, 1)) benchlist := benchlist.NewNoBenchlist() - metrics := prometheus.NewRegistry() // Ensure that the Ancestors request does not timeout tm, err := timeout.NewManager( &timer.AdaptiveTimeoutConfig{ @@ -211,8 +210,8 @@ func TestShutdownTimesOut(t *testing.T) { TimeoutHalflife: 5 * time.Minute, }, benchlist, - "", - metrics, + prometheus.NewRegistry(), + prometheus.NewRegistry(), ) require.NoError(err) @@ -231,8 +230,7 @@ func TestShutdownTimesOut(t *testing.T) { set.Set[ids.ID]{}, nil, HealthConfig{}, - "", - metrics, + prometheus.NewRegistry(), )) resourceTracker, err := tracker.NewResourceTracker( @@ -263,6 +261,7 @@ func TestShutdownTimesOut(t *testing.T) { subnets.New(ctx.NodeID, subnets.Config{}), commontracker.NewPeers(), p2pTracker, + prometheus.NewRegistry(), ) require.NoError(err) @@ -360,7 +359,7 @@ func TestRouterTimeout(t *testing.T) { TimeoutHalflife: 5 * time.Minute, }, benchlist.NewNoBenchlist(), - "", + prometheus.NewRegistry(), prometheus.NewRegistry(), ) require.NoError(err) @@ -380,7 +379,6 @@ func TestRouterTimeout(t *testing.T) { set.Set[ids.ID]{}, nil, HealthConfig{}, - "", prometheus.NewRegistry(), )) defer chainRouter.Shutdown(context.Background()) @@ -433,6 +431,7 @@ func TestRouterTimeout(t *testing.T) { subnets.New(ctx.NodeID, subnets.Config{}), commontracker.NewPeers(), p2pTracker, + prometheus.NewRegistry(), ) require.NoError(err) @@ -729,7 +728,7 @@ func TestRouterHonorsRequestedEngine(t *testing.T) { TimeoutHalflife: 5 * time.Minute, }, benchlist.NewNoBenchlist(), - "", + prometheus.NewRegistry(), prometheus.NewRegistry(), ) require.NoError(err) @@ -749,7 +748,6 @@ func TestRouterHonorsRequestedEngine(t *testing.T) { set.Set[ids.ID]{}, nil, HealthConfig{}, - "", prometheus.NewRegistry(), )) defer chainRouter.Shutdown(context.Background()) @@ -954,7 +952,7 @@ func TestValidatorOnlyMessageDrops(t *testing.T) { TimeoutHalflife: 5 * time.Minute, }, benchlist.NewNoBenchlist(), - "", + prometheus.NewRegistry(), prometheus.NewRegistry(), ) require.NoError(err) @@ -974,7 +972,6 @@ func TestValidatorOnlyMessageDrops(t *testing.T) { set.Set[ids.ID]{}, nil, HealthConfig{}, - 
"", prometheus.NewRegistry(), )) defer chainRouter.Shutdown(context.Background()) @@ -1017,6 +1014,7 @@ func TestValidatorOnlyMessageDrops(t *testing.T) { sb, commontracker.NewPeers(), p2pTracker, + prometheus.NewRegistry(), ) require.NoError(err) @@ -1115,7 +1113,7 @@ func TestConnectedSubnet(t *testing.T) { TimeoutHalflife: 5 * time.Minute, }, benchlist.NewNoBenchlist(), - "timeoutManager", + prometheus.NewRegistry(), prometheus.NewRegistry(), ) require.NoError(err) @@ -1140,7 +1138,6 @@ func TestConnectedSubnet(t *testing.T) { trackedSubnets, nil, HealthConfig{}, - "", prometheus.NewRegistry(), )) @@ -1232,7 +1229,7 @@ func TestValidatorOnlyAllowedNodeMessageDrops(t *testing.T) { TimeoutHalflife: 5 * time.Minute, }, benchlist.NewNoBenchlist(), - "", + prometheus.NewRegistry(), prometheus.NewRegistry(), ) require.NoError(err) @@ -1252,7 +1249,6 @@ func TestValidatorOnlyAllowedNodeMessageDrops(t *testing.T) { set.Set[ids.ID]{}, nil, HealthConfig{}, - "", prometheus.NewRegistry(), )) defer chainRouter.Shutdown(context.Background()) @@ -1299,6 +1295,7 @@ func TestValidatorOnlyAllowedNodeMessageDrops(t *testing.T) { sb, commontracker.NewPeers(), p2pTracker, + prometheus.NewRegistry(), ) require.NoError(err) @@ -1582,7 +1579,7 @@ func newChainRouterTest(t *testing.T) (*ChainRouter, *common.EngineTest) { TimeoutHalflife: 5 * time.Minute, }, benchlist.NewNoBenchlist(), - "", + prometheus.NewRegistry(), prometheus.NewRegistry(), ) require.NoError(t, err) @@ -1601,7 +1598,6 @@ func newChainRouterTest(t *testing.T) (*ChainRouter, *common.EngineTest) { set.Set[ids.ID]{}, nil, HealthConfig{}, - "", prometheus.NewRegistry(), )) @@ -1639,6 +1635,7 @@ func newChainRouterTest(t *testing.T) (*ChainRouter, *common.EngineTest) { subnets.New(ctx.NodeID, subnets.Config{}), commontracker.NewPeers(), p2pTracker, + prometheus.NewRegistry(), ) require.NoError(t, err) diff --git a/snow/networking/router/mock_router.go b/snow/networking/router/mock_router.go index c9146a777138..548b32110775 100644 --- a/snow/networking/router/mock_router.go +++ b/snow/networking/router/mock_router.go @@ -125,17 +125,17 @@ func (mr *MockRouterMockRecorder) HealthCheck(arg0 any) *gomock.Call { } // Initialize mocks base method. -func (m *MockRouter) Initialize(nodeID ids.NodeID, log logging.Logger, timeouts timeout.Manager, shutdownTimeout time.Duration, criticalChains set.Set[ids.ID], sybilProtectionEnabled bool, trackedSubnets set.Set[ids.ID], onFatal func(int), healthConfig HealthConfig, metricsNamespace string, metricsRegisterer prometheus.Registerer) error { +func (m *MockRouter) Initialize(nodeID ids.NodeID, log logging.Logger, timeouts timeout.Manager, shutdownTimeout time.Duration, criticalChains set.Set[ids.ID], sybilProtectionEnabled bool, trackedSubnets set.Set[ids.ID], onFatal func(int), healthConfig HealthConfig, reg prometheus.Registerer) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Initialize", nodeID, log, timeouts, shutdownTimeout, criticalChains, sybilProtectionEnabled, trackedSubnets, onFatal, healthConfig, metricsNamespace, metricsRegisterer) + ret := m.ctrl.Call(m, "Initialize", nodeID, log, timeouts, shutdownTimeout, criticalChains, sybilProtectionEnabled, trackedSubnets, onFatal, healthConfig, reg) ret0, _ := ret[0].(error) return ret0 } // Initialize indicates an expected call of Initialize. 
-func (mr *MockRouterMockRecorder) Initialize(nodeID, log, timeouts, shutdownTimeout, criticalChains, sybilProtectionEnabled, trackedSubnets, onFatal, healthConfig, metricsNamespace, metricsRegisterer any) *gomock.Call { +func (mr *MockRouterMockRecorder) Initialize(nodeID, log, timeouts, shutdownTimeout, criticalChains, sybilProtectionEnabled, trackedSubnets, onFatal, healthConfig, reg any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Initialize", reflect.TypeOf((*MockRouter)(nil).Initialize), nodeID, log, timeouts, shutdownTimeout, criticalChains, sybilProtectionEnabled, trackedSubnets, onFatal, healthConfig, metricsNamespace, metricsRegisterer) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Initialize", reflect.TypeOf((*MockRouter)(nil).Initialize), nodeID, log, timeouts, shutdownTimeout, criticalChains, sybilProtectionEnabled, trackedSubnets, onFatal, healthConfig, reg) } // RegisterRequest mocks base method. diff --git a/snow/networking/router/router.go b/snow/networking/router/router.go index 4df5614c25fb..ef4765cb0965 100644 --- a/snow/networking/router/router.go +++ b/snow/networking/router/router.go @@ -36,8 +36,7 @@ type Router interface { trackedSubnets set.Set[ids.ID], onFatal func(exitCode int), healthConfig HealthConfig, - metricsNamespace string, - metricsRegisterer prometheus.Registerer, + reg prometheus.Registerer, ) error Shutdown(context.Context) AddChain(ctx context.Context, chain handler.Handler) diff --git a/snow/networking/router/traced_router.go b/snow/networking/router/traced_router.go index 4c52bce0827a..cbd2b6ed1205 100644 --- a/snow/networking/router/traced_router.go +++ b/snow/networking/router/traced_router.go @@ -47,8 +47,7 @@ func (r *tracedRouter) Initialize( trackedSubnets set.Set[ids.ID], onFatal func(exitCode int), healthConfig HealthConfig, - metricsNamespace string, - metricsRegisterer prometheus.Registerer, + reg prometheus.Registerer, ) error { return r.router.Initialize( nodeID, @@ -60,8 +59,7 @@ func (r *tracedRouter) Initialize( trackedSubnets, onFatal, healthConfig, - metricsNamespace, - metricsRegisterer, + reg, ) } diff --git a/snow/networking/sender/sender_test.go b/snow/networking/sender/sender_test.go index 9453f43e4faa..34f138f6db21 100644 --- a/snow/networking/sender/sender_test.go +++ b/snow/networking/sender/sender_test.go @@ -58,7 +58,7 @@ func TestTimeout(t *testing.T) { TimeoutCoefficient: 1.25, }, benchlist, - "", + prometheus.NewRegistry(), prometheus.NewRegistry(), ) require.NoError(err) @@ -85,7 +85,6 @@ func TestTimeout(t *testing.T) { set.Set[ids.ID]{}, nil, router.HealthConfig{}, - "", prometheus.NewRegistry(), )) @@ -133,6 +132,7 @@ func TestTimeout(t *testing.T) { subnets.New(ctx.NodeID, subnets.Config{}), commontracker.NewPeers(), p2pTracker, + prometheus.NewRegistry(), ) require.NoError(err) @@ -334,7 +334,7 @@ func TestReliableMessages(t *testing.T) { TimeoutCoefficient: 1.25, }, benchlist, - "", + prometheus.NewRegistry(), prometheus.NewRegistry(), ) require.NoError(err) @@ -362,7 +362,6 @@ func TestReliableMessages(t *testing.T) { set.Set[ids.ID]{}, nil, router.HealthConfig{}, - "", prometheus.NewRegistry(), )) @@ -410,6 +409,7 @@ func TestReliableMessages(t *testing.T) { subnets.New(ctx.NodeID, subnets.Config{}), commontracker.NewPeers(), p2pTracker, + prometheus.NewRegistry(), ) require.NoError(err) @@ -491,7 +491,7 @@ func TestReliableMessagesToMyself(t *testing.T) { TimeoutCoefficient: 1.25, }, benchlist, - "", + prometheus.NewRegistry(), 
prometheus.NewRegistry(), ) require.NoError(err) @@ -519,7 +519,6 @@ func TestReliableMessagesToMyself(t *testing.T) { set.Set[ids.ID]{}, nil, router.HealthConfig{}, - "", prometheus.NewRegistry(), )) @@ -567,6 +566,7 @@ func TestReliableMessagesToMyself(t *testing.T) { subnets.New(ctx.NodeID, subnets.Config{}), commontracker.NewPeers(), p2pTracker, + prometheus.NewRegistry(), ) require.NoError(err) diff --git a/snow/networking/timeout/manager.go b/snow/networking/timeout/manager.go index 89a7cc56d869..573dbe712bc5 100644 --- a/snow/networking/timeout/manager.go +++ b/snow/networking/timeout/manager.go @@ -71,27 +71,33 @@ type Manager interface { func NewManager( timeoutConfig *timer.AdaptiveTimeoutConfig, benchlistMgr benchlist.Manager, - metricsNamespace string, - metricsRegister prometheus.Registerer, + requestReg prometheus.Registerer, + responseReg prometheus.Registerer, ) (Manager, error) { tm, err := timer.NewAdaptiveTimeoutManager( timeoutConfig, - metricsNamespace, - metricsRegister, + requestReg, ) if err != nil { return nil, fmt.Errorf("couldn't create timeout manager: %w", err) } + + m, err := newTimeoutMetrics(responseReg) + if err != nil { + return nil, fmt.Errorf("couldn't create timeout metrics: %w", err) + } + return &manager{ - benchlistMgr: benchlistMgr, tm: tm, + benchlistMgr: benchlistMgr, + metrics: m, }, nil } type manager struct { tm timer.AdaptiveTimeoutManager benchlistMgr benchlist.Manager - metrics metrics + metrics *timeoutMetrics stopOnce sync.Once } diff --git a/snow/networking/timeout/manager_test.go b/snow/networking/timeout/manager_test.go index 49a05f78d8d8..d6109002f615 100644 --- a/snow/networking/timeout/manager_test.go +++ b/snow/networking/timeout/manager_test.go @@ -27,7 +27,7 @@ func TestManagerFire(t *testing.T) { TimeoutHalflife: 5 * time.Minute, }, benchlist, - "", + prometheus.NewRegistry(), prometheus.NewRegistry(), ) require.NoError(t, err) diff --git a/snow/networking/timeout/metrics.go b/snow/networking/timeout/metrics.go index 101bda856255..3f217d5f7ad7 100644 --- a/snow/networking/timeout/metrics.go +++ b/snow/networking/timeout/metrics.go @@ -4,7 +4,6 @@ package timeout import ( - "fmt" "sync" "time" @@ -17,83 +16,61 @@ import ( ) const ( - responseNamespace = "response" - opLabel = "op" + chainLabel = "chain" + opLabel = "op" ) -var opLabels = []string{opLabel} +var opLabels = []string{chainLabel, opLabel} -type metrics struct { - lock sync.Mutex - chainToMetrics map[ids.ID]*chainMetrics -} - -func (m *metrics) RegisterChain(ctx *snow.ConsensusContext) error { - m.lock.Lock() - defer m.lock.Unlock() +type timeoutMetrics struct { + messages *prometheus.CounterVec // chain + op + messageLatencies *prometheus.GaugeVec // chain + op - if m.chainToMetrics == nil { - m.chainToMetrics = map[ids.ID]*chainMetrics{} - } - if _, exists := m.chainToMetrics[ctx.ChainID]; exists { - return fmt.Errorf("chain %s has already been registered", ctx.ChainID) - } - cm, err := newChainMetrics(ctx.Registerer) - if err != nil { - return fmt.Errorf("couldn't create metrics for chain %s: %w", ctx.ChainID, err) - } - m.chainToMetrics[ctx.ChainID] = cm - return nil + lock sync.RWMutex + chainIDToAlias map[ids.ID]string } -// Record that a response of type [op] took [latency] -func (m *metrics) Observe(chainID ids.ID, op message.Op, latency time.Duration) { - m.lock.Lock() - defer m.lock.Unlock() - - cm, exists := m.chainToMetrics[chainID] - if !exists { - // TODO should this log an error? 
- return - } - cm.observe(op, latency) -} - -// chainMetrics contains message response time metrics for a chain -type chainMetrics struct { - messages *prometheus.CounterVec // op - messageLatencies *prometheus.GaugeVec // op -} - -func newChainMetrics(reg prometheus.Registerer) (*chainMetrics, error) { - cm := &chainMetrics{ +func newTimeoutMetrics(reg prometheus.Registerer) (*timeoutMetrics, error) { + m := &timeoutMetrics{ messages: prometheus.NewCounterVec( prometheus.CounterOpts{ - Namespace: responseNamespace, - Name: "messages", - Help: "number of responses", + Name: "messages", + Help: "number of responses", }, opLabels, ), messageLatencies: prometheus.NewGaugeVec( prometheus.GaugeOpts{ - Namespace: responseNamespace, - Name: "message_latencies", - Help: "message latencies (ns)", + Name: "message_latencies", + Help: "message latencies (ns)", }, opLabels, ), + chainIDToAlias: make(map[ids.ID]string), } - return cm, utils.Err( - reg.Register(cm.messages), - reg.Register(cm.messageLatencies), + return m, utils.Err( + reg.Register(m.messages), + reg.Register(m.messageLatencies), ) } -func (cm *chainMetrics) observe(op message.Op, latency time.Duration) { +func (m *timeoutMetrics) RegisterChain(ctx *snow.ConsensusContext) error { + m.lock.Lock() + defer m.lock.Unlock() + + m.chainIDToAlias[ctx.ChainID] = ctx.PrimaryAlias + return nil +} + +// Record that a response of type [op] took [latency] +func (m *timeoutMetrics) Observe(chainID ids.ID, op message.Op, latency time.Duration) { + m.lock.RLock() + defer m.lock.RUnlock() + labels := prometheus.Labels{ - opLabel: op.String(), + chainLabel: m.chainIDToAlias[chainID], + opLabel: op.String(), } - cm.messages.With(labels).Inc() - cm.messageLatencies.With(labels).Add(float64(latency)) + m.messages.With(labels).Inc() + m.messageLatencies.With(labels).Add(float64(latency)) } diff --git a/snow/networking/tracker/resource_tracker.go b/snow/networking/tracker/resource_tracker.go index 7b480d242551..d8f5da99192f 100644 --- a/snow/networking/tracker/resource_tracker.go +++ b/snow/networking/tracker/resource_tracker.go @@ -218,7 +218,7 @@ func NewResourceTracker( meters: linked.NewHashmap[ids.NodeID, meter.Meter](), } var err error - t.metrics, err = newCPUTrackerMetrics("resource_tracker", reg) + t.metrics, err = newCPUTrackerMetrics(reg) if err != nil { return nil, fmt.Errorf("initializing resourceTracker metrics errored with: %w", err) } @@ -293,32 +293,27 @@ type trackerMetrics struct { diskSpaceAvailable prometheus.Gauge } -func newCPUTrackerMetrics(namespace string, reg prometheus.Registerer) (*trackerMetrics, error) { +func newCPUTrackerMetrics(reg prometheus.Registerer) (*trackerMetrics, error) { m := &trackerMetrics{ processingTimeMetric: prometheus.NewGauge(prometheus.GaugeOpts{ - Namespace: namespace, - Name: "processing_time", - Help: "Tracked processing time over all nodes. Value expected to be in [0, number of CPU cores], but can go higher due to IO bound processes and thread multiplexing", + Name: "processing_time", + Help: "Tracked processing time over all nodes. Value expected to be in [0, number of CPU cores], but can go higher due to IO bound processes and thread multiplexing", }), cpuMetric: prometheus.NewGauge(prometheus.GaugeOpts{ - Namespace: namespace, - Name: "cpu_usage", - Help: "CPU usage tracked by the resource manager. Value should be in [0, number of CPU cores]", + Name: "cpu_usage", + Help: "CPU usage tracked by the resource manager. 
Value should be in [0, number of CPU cores]", }), diskReadsMetric: prometheus.NewGauge(prometheus.GaugeOpts{ - Namespace: namespace, - Name: "disk_reads", - Help: "Disk reads (bytes/sec) tracked by the resource manager", + Name: "disk_reads", + Help: "Disk reads (bytes/sec) tracked by the resource manager", }), diskWritesMetric: prometheus.NewGauge(prometheus.GaugeOpts{ - Namespace: namespace, - Name: "disk_writes", - Help: "Disk writes (bytes/sec) tracked by the resource manager", + Name: "disk_writes", + Help: "Disk writes (bytes/sec) tracked by the resource manager", }), diskSpaceAvailable: prometheus.NewGauge(prometheus.GaugeOpts{ - Namespace: namespace, - Name: "disk_available_space", - Help: "Available space remaining (bytes) on the database volume", + Name: "disk_available_space", + Help: "Available space remaining (bytes) on the database volume", }), } err := utils.Err( diff --git a/snow/snowtest/snowtest.go b/snow/snowtest/snowtest.go index 86374f766514..3cacc8e873bf 100644 --- a/snow/snowtest/snowtest.go +++ b/snow/snowtest/snowtest.go @@ -40,6 +40,7 @@ func (noOpAcceptor) Accept(*snow.ConsensusContext, ids.ID, []byte) error { func ConsensusContext(ctx *snow.Context) *snow.ConsensusContext { return &snow.ConsensusContext{ Context: ctx, + PrimaryAlias: ctx.ChainID.String(), Registerer: prometheus.NewRegistry(), BlockAcceptor: noOpAcceptor{}, TxAcceptor: noOpAcceptor{}, @@ -89,7 +90,7 @@ func Context(tb testing.TB, chainID ids.ID) *snow.Context { Log: logging.NoLog{}, BCLookup: aliaser, - Metrics: metrics.NewMultiGatherer(), + Metrics: metrics.NewPrefixGatherer(), ValidatorState: validatorState, ChainDataDir: "", diff --git a/tests/e2e/x/transfer/virtuous.go b/tests/e2e/x/transfer/virtuous.go index 35d6afe1b17e..10a2359e7f9e 100644 --- a/tests/e2e/x/transfer/virtuous.go +++ b/tests/e2e/x/transfer/virtuous.go @@ -9,8 +9,10 @@ import ( "math/rand" "time" + "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/require" + "github.com/ava-labs/avalanchego/chains" "github.com/ava-labs/avalanchego/ids" "github.com/ava-labs/avalanchego/snow/choices" "github.com/ava-labs/avalanchego/tests" @@ -28,10 +30,14 @@ import ( const ( totalRounds = 50 - xBlksProcessingMetric = "avalanche_X_blks_processing" - xBlksAcceptedMetric = "avalanche_X_blks_accepted_count" + blksProcessingMetric = "avalanche_snowman_blks_processing" + blksAcceptedMetric = "avalanche_snowman_blks_accepted_count" ) +var xChainMetricLabels = prometheus.Labels{ + chains.ChainLabel: "X", +} + // This test requires that the network not have ongoing blocks and // cannot reliably be run in parallel. 
var _ = e2e.DescribeXChainSerial("[Virtuous Transfer Tx AVAX]", func() { @@ -55,7 +61,7 @@ var _ = e2e.DescribeXChainSerial("[Virtuous Transfer Tx AVAX]", func() { require.NoError(err) for _, metrics := range allNodeMetrics { - xBlksProcessing, ok := tests.GetMetricValue(metrics, xBlksProcessingMetric, nil) + xBlksProcessing, ok := tests.GetMetricValue(metrics, blksProcessingMetric, xChainMetricLabels) if !ok || xBlksProcessing > 0 { return false } @@ -248,13 +254,13 @@ RECEIVER NEW BALANCE (AFTER) : %21d AVAX // +0 since X-chain tx must have been processed and accepted // by now - currentXBlksProcessing, _ := tests.GetMetricValue(mm, xBlksProcessingMetric, nil) - previousXBlksProcessing, _ := tests.GetMetricValue(prev, xBlksProcessingMetric, nil) + currentXBlksProcessing, _ := tests.GetMetricValue(mm, blksProcessingMetric, xChainMetricLabels) + previousXBlksProcessing, _ := tests.GetMetricValue(prev, blksProcessingMetric, xChainMetricLabels) require.Equal(currentXBlksProcessing, previousXBlksProcessing) // +1 since X-chain tx must have been accepted by now - currentXBlksAccepted, _ := tests.GetMetricValue(mm, xBlksAcceptedMetric, nil) - previousXBlksAccepted, _ := tests.GetMetricValue(prev, xBlksAcceptedMetric, nil) + currentXBlksAccepted, _ := tests.GetMetricValue(mm, blksAcceptedMetric, xChainMetricLabels) + previousXBlksAccepted, _ := tests.GetMetricValue(prev, blksAcceptedMetric, xChainMetricLabels) require.Equal(currentXBlksAccepted, previousXBlksAccepted+1) metricsBeforeTx[u] = mm diff --git a/utils/resource/metrics.go b/utils/resource/metrics.go index 3ce87ade258c..42d12f1ccc74 100644 --- a/utils/resource/metrics.go +++ b/utils/resource/metrics.go @@ -17,45 +17,40 @@ type metrics struct { numDiskWritesBytes *prometheus.GaugeVec } -func newMetrics(namespace string, registerer prometheus.Registerer) (*metrics, error) { +func newMetrics(registerer prometheus.Registerer) (*metrics, error) { m := &metrics{ numCPUCycles: prometheus.NewGaugeVec( prometheus.GaugeOpts{ - Namespace: namespace, - Name: "num_cpu_cycles", - Help: "Total number of CPU cycles", + Name: "num_cpu_cycles", + Help: "Total number of CPU cycles", }, []string{"processID"}, ), numDiskReads: prometheus.NewGaugeVec( prometheus.GaugeOpts{ - Namespace: namespace, - Name: "num_disk_reads", - Help: "Total number of disk reads", + Name: "num_disk_reads", + Help: "Total number of disk reads", }, []string{"processID"}, ), numDiskReadBytes: prometheus.NewGaugeVec( prometheus.GaugeOpts{ - Namespace: namespace, - Name: "num_disk_read_bytes", - Help: "Total number of disk read bytes", + Name: "num_disk_read_bytes", + Help: "Total number of disk read bytes", }, []string{"processID"}, ), numDiskWrites: prometheus.NewGaugeVec( prometheus.GaugeOpts{ - Namespace: namespace, - Name: "num_disk_writes", - Help: "Total number of disk writes", + Name: "num_disk_writes", + Help: "Total number of disk writes", }, []string{"processID"}, ), numDiskWritesBytes: prometheus.NewGaugeVec( prometheus.GaugeOpts{ - Namespace: namespace, - Name: "num_disk_write_bytes", - Help: "Total number of disk write bytes", + Name: "num_disk_write_bytes", + Help: "Total number of disk write bytes", }, []string{"processID"}, ), diff --git a/utils/resource/usage.go b/utils/resource/usage.go index 32a9d1965c90..32ffbfe4aa85 100644 --- a/utils/resource/usage.go +++ b/utils/resource/usage.go @@ -94,7 +94,7 @@ func NewManager( diskHalflife time.Duration, metricsRegisterer prometheus.Registerer, ) (Manager, error) { - processMetrics, err := newMetrics("system_resources", 
metricsRegisterer) + processMetrics, err := newMetrics(metricsRegisterer) if err != nil { return nil, err } diff --git a/utils/timer/adaptive_timeout_manager.go b/utils/timer/adaptive_timeout_manager.go index 493769018ba2..5d8670bb56e2 100644 --- a/utils/timer/adaptive_timeout_manager.go +++ b/utils/timer/adaptive_timeout_manager.go @@ -92,8 +92,7 @@ type adaptiveTimeoutManager struct { func NewAdaptiveTimeoutManager( config *AdaptiveTimeoutConfig, - metricsNamespace string, - metricsRegister prometheus.Registerer, + reg prometheus.Registerer, ) (AdaptiveTimeoutManager, error) { switch { case config.InitialTimeout > config.MaximumTimeout: @@ -108,24 +107,20 @@ func NewAdaptiveTimeoutManager( tm := &adaptiveTimeoutManager{ networkTimeoutMetric: prometheus.NewGauge(prometheus.GaugeOpts{ - Namespace: metricsNamespace, - Name: "current_timeout", - Help: "Duration of current network timeout in nanoseconds", + Name: "current_timeout", + Help: "Duration of current network timeout in nanoseconds", }), avgLatency: prometheus.NewGauge(prometheus.GaugeOpts{ - Namespace: metricsNamespace, - Name: "average_latency", - Help: "Average network latency in nanoseconds", + Name: "average_latency", + Help: "Average network latency in nanoseconds", }), numTimeouts: prometheus.NewCounter(prometheus.CounterOpts{ - Namespace: metricsNamespace, - Name: "timeouts", - Help: "Number of timed out requests", + Name: "timeouts", + Help: "Number of timed out requests", }), numPendingTimeouts: prometheus.NewGauge(prometheus.GaugeOpts{ - Namespace: metricsNamespace, - Name: "pending_timeouts", - Help: "Number of pending timeouts", + Name: "pending_timeouts", + Help: "Number of pending timeouts", }), minimumTimeout: config.MinimumTimeout, maximumTimeout: config.MaximumTimeout, @@ -139,10 +134,10 @@ func NewAdaptiveTimeoutManager( tm.averager = math.NewAverager(float64(config.InitialTimeout), config.TimeoutHalflife, tm.clock.Time()) err := utils.Err( - metricsRegister.Register(tm.networkTimeoutMetric), - metricsRegister.Register(tm.avgLatency), - metricsRegister.Register(tm.numTimeouts), - metricsRegister.Register(tm.numPendingTimeouts), + reg.Register(tm.networkTimeoutMetric), + reg.Register(tm.avgLatency), + reg.Register(tm.numTimeouts), + reg.Register(tm.numPendingTimeouts), ) return tm, err } diff --git a/utils/timer/adaptive_timeout_manager_test.go b/utils/timer/adaptive_timeout_manager_test.go index 5b725303f385..e522b525272e 100644 --- a/utils/timer/adaptive_timeout_manager_test.go +++ b/utils/timer/adaptive_timeout_manager_test.go @@ -83,7 +83,7 @@ func TestAdaptiveTimeoutManagerInit(t *testing.T) { } for _, test := range tests { - _, err := NewAdaptiveTimeoutManager(&test.config, "", prometheus.NewRegistry()) + _, err := NewAdaptiveTimeoutManager(&test.config, prometheus.NewRegistry()) require.ErrorIs(t, err, test.expectedErr) } } @@ -97,7 +97,6 @@ func TestAdaptiveTimeoutManager(t *testing.T) { TimeoutHalflife: 5 * time.Minute, TimeoutCoefficient: 1.25, }, - "", prometheus.NewRegistry(), ) require.NoError(t, err) diff --git a/vms/metervm/block_vm.go b/vms/metervm/block_vm.go index 6d951f344b2b..da64f9af01d2 100644 --- a/vms/metervm/block_vm.go +++ b/vms/metervm/block_vm.go @@ -8,7 +8,6 @@ import ( "github.com/prometheus/client_golang/prometheus" - "github.com/ava-labs/avalanchego/api/metrics" "github.com/ava-labs/avalanchego/database" "github.com/ava-labs/avalanchego/ids" "github.com/ava-labs/avalanchego/snow" @@ -32,10 +31,14 @@ type blockVM struct { ssVM block.StateSyncableVM blockMetrics - clock mockable.Clock + 
registry prometheus.Registerer + clock mockable.Clock } -func NewBlockVM(vm block.ChainVM) block.ChainVM { +func NewBlockVM( + vm block.ChainVM, + reg prometheus.Registerer, +) block.ChainVM { buildBlockVM, _ := vm.(block.BuildBlockWithContextChainVM) batchedVM, _ := vm.(block.BatchedChainVM) ssVM, _ := vm.(block.StateSyncableVM) @@ -44,6 +47,7 @@ func NewBlockVM(vm block.ChainVM) block.ChainVM { buildBlockVM: buildBlockVM, batchedVM: batchedVM, ssVM: ssVM, + registry: reg, } } @@ -58,26 +62,16 @@ func (vm *blockVM) Initialize( fxs []*common.Fx, appSender common.AppSender, ) error { - registerer := prometheus.NewRegistry() err := vm.blockMetrics.Initialize( vm.buildBlockVM != nil, vm.batchedVM != nil, vm.ssVM != nil, - registerer, + vm.registry, ) if err != nil { return err } - multiGatherer := metrics.NewMultiGatherer() - if err := chainCtx.Metrics.Register("metervm", registerer); err != nil { - return err - } - if err := chainCtx.Metrics.Register("", multiGatherer); err != nil { - return err - } - chainCtx.Metrics = multiGatherer - return vm.ChainVM.Initialize(ctx, chainCtx, db, genesisBytes, upgradeBytes, configBytes, toEngine, fxs, appSender) } diff --git a/vms/metervm/vertex_vm.go b/vms/metervm/vertex_vm.go index 7cbd47a67475..936a688de99d 100644 --- a/vms/metervm/vertex_vm.go +++ b/vms/metervm/vertex_vm.go @@ -8,7 +8,6 @@ import ( "github.com/prometheus/client_golang/prometheus" - "github.com/ava-labs/avalanchego/api/metrics" "github.com/ava-labs/avalanchego/database" "github.com/ava-labs/avalanchego/snow" "github.com/ava-labs/avalanchego/snow/consensus/snowstorm" @@ -22,16 +21,21 @@ var ( _ snowstorm.Tx = (*meterTx)(nil) ) -func NewVertexVM(vm vertex.LinearizableVMWithEngine) vertex.LinearizableVMWithEngine { +func NewVertexVM( + vm vertex.LinearizableVMWithEngine, + reg prometheus.Registerer, +) vertex.LinearizableVMWithEngine { return &vertexVM{ LinearizableVMWithEngine: vm, + registry: reg, } } type vertexVM struct { vertex.LinearizableVMWithEngine vertexMetrics - clock mockable.Clock + registry prometheus.Registerer + clock mockable.Clock } func (vm *vertexVM) Initialize( @@ -45,20 +49,10 @@ func (vm *vertexVM) Initialize( fxs []*common.Fx, appSender common.AppSender, ) error { - registerer := prometheus.NewRegistry() - if err := vm.vertexMetrics.Initialize(registerer); err != nil { + if err := vm.vertexMetrics.Initialize(vm.registry); err != nil { return err } - multiGatherer := metrics.NewMultiGatherer() - if err := chainCtx.Metrics.Register("metervm", registerer); err != nil { - return err - } - if err := chainCtx.Metrics.Register("", multiGatherer); err != nil { - return err - } - chainCtx.Metrics = multiGatherer - return vm.LinearizableVMWithEngine.Initialize( ctx, chainCtx, diff --git a/vms/platformvm/vm_test.go b/vms/platformvm/vm_test.go index e6e645d74242..24fa707a8e32 100644 --- a/vms/platformvm/vm_test.go +++ b/vms/platformvm/vm_test.go @@ -1429,7 +1429,7 @@ func TestBootstrapPartiallyAccepted(t *testing.T) { TimeoutCoefficient: 1.25, }, benchlist, - "", + prometheus.NewRegistry(), prometheus.NewRegistry(), ) require.NoError(err) @@ -1453,7 +1453,6 @@ func TestBootstrapPartiallyAccepted(t *testing.T) { set.Set[ids.ID]{}, nil, router.HealthConfig{}, - "", prometheus.NewRegistry(), )) @@ -1544,6 +1543,7 @@ func TestBootstrapPartiallyAccepted(t *testing.T) { subnets.New(ctx.NodeID, subnets.Config{}), tracker.NewPeers(), peerTracker, + prometheus.NewRegistry(), ) require.NoError(err) diff --git a/vms/proposervm/batched_vm_test.go b/vms/proposervm/batched_vm_test.go index 
a6e9ffb2b1d6..be134823c894 100644 --- a/vms/proposervm/batched_vm_test.go +++ b/vms/proposervm/batched_vm_test.go @@ -9,6 +9,7 @@ import ( "testing" "time" + "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/require" "github.com/ava-labs/avalanchego/database" @@ -868,6 +869,7 @@ func initTestRemoteProposerVM( NumHistoricalBlocks: DefaultNumHistoricalBlocks, StakingLeafSigner: pTestSigner, StakingCertLeaf: pTestCert, + Registerer: prometheus.NewRegistry(), }, ) diff --git a/vms/proposervm/block_test.go b/vms/proposervm/block_test.go index d8c867058f52..12b18a75d681 100644 --- a/vms/proposervm/block_test.go +++ b/vms/proposervm/block_test.go @@ -12,6 +12,7 @@ import ( "testing" "time" + "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/require" "go.uber.org/mock/gomock" @@ -74,6 +75,7 @@ func TestPostForkCommonComponents_buildChild(t *testing.T) { DurangoTime: time.Unix(0, 0), StakingCertLeaf: &staking.Certificate{}, StakingLeafSigner: pk, + Registerer: prometheus.NewRegistry(), }, ChainVM: innerVM, blockBuilderVM: innerBlockBuilderVM, @@ -386,6 +388,7 @@ func TestPostDurangoBuildChildResetScheduler(t *testing.T) { DurangoTime: time.Unix(0, 0), StakingCertLeaf: &staking.Certificate{}, StakingLeafSigner: pk, + Registerer: prometheus.NewRegistry(), }, ChainVM: block.NewMockChainVM(ctrl), ctx: &snow.Context{ diff --git a/vms/proposervm/config.go b/vms/proposervm/config.go index a7eb4ff0db9b..296f6a60520c 100644 --- a/vms/proposervm/config.go +++ b/vms/proposervm/config.go @@ -7,6 +7,8 @@ import ( "crypto" "time" + "github.com/prometheus/client_golang/prometheus" + "github.com/ava-labs/avalanchego/staking" ) @@ -32,6 +34,9 @@ type Config struct { // Block certificate StakingCertLeaf *staking.Certificate + + // Registerer for prometheus metrics + Registerer prometheus.Registerer } func (c *Config) IsDurangoActivated(timestamp time.Time) bool { diff --git a/vms/proposervm/post_fork_option_test.go b/vms/proposervm/post_fork_option_test.go index 39c6434dddf6..43b7d5f5b90f 100644 --- a/vms/proposervm/post_fork_option_test.go +++ b/vms/proposervm/post_fork_option_test.go @@ -9,6 +9,7 @@ import ( "testing" "time" + "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/require" "github.com/ava-labs/avalanchego/database" @@ -548,6 +549,7 @@ func TestOptionTimestampValidity(t *testing.T) { NumHistoricalBlocks: DefaultNumHistoricalBlocks, StakingLeafSigner: pTestSigner, StakingCertLeaf: pTestCert, + Registerer: prometheus.NewRegistry(), }, ) diff --git a/vms/proposervm/state_syncable_vm_test.go b/vms/proposervm/state_syncable_vm_test.go index 4f44adc0bf74..479c311b5fed 100644 --- a/vms/proposervm/state_syncable_vm_test.go +++ b/vms/proposervm/state_syncable_vm_test.go @@ -8,6 +8,7 @@ import ( "testing" "time" + "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/require" "github.com/ava-labs/avalanchego/database" @@ -65,6 +66,7 @@ func helperBuildStateSyncTestObjects(t *testing.T) (*fullVM, *VM) { NumHistoricalBlocks: DefaultNumHistoricalBlocks, StakingLeafSigner: pTestSigner, StakingCertLeaf: pTestCert, + Registerer: prometheus.NewRegistry(), }, ) diff --git a/vms/proposervm/vm.go b/vms/proposervm/vm.go index dfff407a03d2..4442aca65a9b 100644 --- a/vms/proposervm/vm.go +++ b/vms/proposervm/vm.go @@ -9,10 +9,8 @@ import ( "fmt" "time" - "github.com/prometheus/client_golang/prometheus" "go.uber.org/zap" - "github.com/ava-labs/avalanchego/api/metrics" "github.com/ava-labs/avalanchego/cache" 
"github.com/ava-labs/avalanchego/cache/metercacher" "github.com/ava-labs/avalanchego/database" @@ -130,21 +128,9 @@ func (vm *VM) Initialize( fxs []*common.Fx, appSender common.AppSender, ) error { - // TODO: Add a helper for this metrics override, it is performed in multiple - // places. - registerer := prometheus.NewRegistry() - if err := chainCtx.Metrics.Register("proposervm", registerer); err != nil { - return err - } - multiGatherer := metrics.NewMultiGatherer() - if err := chainCtx.Metrics.Register("", multiGatherer); err != nil { - return err - } - chainCtx.Metrics = multiGatherer - vm.ctx = chainCtx vm.db = versiondb.New(prefixdb.New(dbPrefix, db)) - baseState, err := state.NewMetered(vm.db, "state", registerer) + baseState, err := state.NewMetered(vm.db, "state", vm.Config.Registerer) if err != nil { return err } @@ -153,7 +139,7 @@ func (vm *VM) Initialize( vm.Tree = tree.New() innerBlkCache, err := metercacher.New( "inner_block_cache", - registerer, + vm.Config.Registerer, cache.NewSizedLRU( innerBlkCacheSize, cachedBlockSize, diff --git a/vms/proposervm/vm_test.go b/vms/proposervm/vm_test.go index fb717c203f7f..a2536375d48c 100644 --- a/vms/proposervm/vm_test.go +++ b/vms/proposervm/vm_test.go @@ -12,6 +12,7 @@ import ( "testing" "time" + "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/require" "go.uber.org/mock/gomock" @@ -132,6 +133,7 @@ func initTestProposerVM( NumHistoricalBlocks: DefaultNumHistoricalBlocks, StakingLeafSigner: pTestSigner, StakingCertLeaf: pTestCert, + Registerer: prometheus.NewRegistry(), }, ) @@ -812,6 +814,7 @@ func TestExpiredBuildBlock(t *testing.T) { NumHistoricalBlocks: DefaultNumHistoricalBlocks, StakingLeafSigner: pTestSigner, StakingCertLeaf: pTestCert, + Registerer: prometheus.NewRegistry(), }, ) @@ -1128,6 +1131,7 @@ func TestInnerVMRollback(t *testing.T) { NumHistoricalBlocks: DefaultNumHistoricalBlocks, StakingLeafSigner: pTestSigner, StakingCertLeaf: pTestCert, + Registerer: prometheus.NewRegistry(), }, ) @@ -1206,6 +1210,7 @@ func TestInnerVMRollback(t *testing.T) { NumHistoricalBlocks: DefaultNumHistoricalBlocks, StakingLeafSigner: pTestSigner, StakingCertLeaf: pTestCert, + Registerer: prometheus.NewRegistry(), }, ) @@ -1608,6 +1613,7 @@ func TestRejectedHeightNotIndexed(t *testing.T) { NumHistoricalBlocks: DefaultNumHistoricalBlocks, StakingLeafSigner: pTestSigner, StakingCertLeaf: pTestCert, + Registerer: prometheus.NewRegistry(), }, ) @@ -1779,6 +1785,7 @@ func TestRejectedOptionHeightNotIndexed(t *testing.T) { NumHistoricalBlocks: DefaultNumHistoricalBlocks, StakingLeafSigner: pTestSigner, StakingCertLeaf: pTestCert, + Registerer: prometheus.NewRegistry(), }, ) @@ -1913,6 +1920,7 @@ func TestVMInnerBlkCache(t *testing.T) { NumHistoricalBlocks: DefaultNumHistoricalBlocks, StakingLeafSigner: pTestSigner, StakingCertLeaf: pTestCert, + Registerer: prometheus.NewRegistry(), }, ) @@ -2123,6 +2131,7 @@ func TestVM_VerifyBlockWithContext(t *testing.T) { NumHistoricalBlocks: DefaultNumHistoricalBlocks, StakingLeafSigner: pTestSigner, StakingCertLeaf: pTestCert, + Registerer: prometheus.NewRegistry(), }, ) @@ -2324,6 +2333,7 @@ func TestHistoricalBlockDeletion(t *testing.T) { NumHistoricalBlocks: DefaultNumHistoricalBlocks, StakingLeafSigner: pTestSigner, StakingCertLeaf: pTestCert, + Registerer: prometheus.NewRegistry(), }, ) @@ -2415,6 +2425,7 @@ func TestHistoricalBlockDeletion(t *testing.T) { NumHistoricalBlocks: numHistoricalBlocks, StakingLeafSigner: pTestSigner, StakingCertLeaf: pTestCert, + Registerer: 
prometheus.NewRegistry(), }, ) @@ -2459,6 +2470,7 @@ func TestHistoricalBlockDeletion(t *testing.T) { NumHistoricalBlocks: newNumHistoricalBlocks, StakingLeafSigner: pTestSigner, StakingCertLeaf: pTestCert, + Registerer: prometheus.NewRegistry(), }, ) diff --git a/vms/rpcchainvm/vm_client.go b/vms/rpcchainvm/vm_client.go index 038a728c0ffe..6e6417725f11 100644 --- a/vms/rpcchainvm/vm_client.go +++ b/vms/rpcchainvm/vm_client.go @@ -18,6 +18,7 @@ import ( "google.golang.org/protobuf/types/known/emptypb" "github.com/ava-labs/avalanchego/api/keystore/gkeystore" + "github.com/ava-labs/avalanchego/api/metrics" "github.com/ava-labs/avalanchego/chains/atomic/gsharedmemory" "github.com/ava-labs/avalanchego/database" "github.com/ava-labs/avalanchego/database/rpcdb" @@ -135,15 +136,19 @@ func (vm *VMClient) Initialize( } // Register metrics - registerer := prometheus.NewRegistry() - vm.grpcServerMetrics = grpc_prometheus.NewServerMetrics() - if err := registerer.Register(vm.grpcServerMetrics); err != nil { + serverReg, err := metrics.MakeAndRegister( + chainCtx.Metrics, + "rpcchainvm", + ) + if err != nil { return err } - if err := chainCtx.Metrics.Register("rpcchainvm", registerer); err != nil { + vm.grpcServerMetrics = grpc_prometheus.NewServerMetrics() + if err := serverReg.Register(vm.grpcServerMetrics); err != nil { return err } - if err := chainCtx.Metrics.Register("", vm); err != nil { + + if err := chainCtx.Metrics.Register("plugin", vm); err != nil { return err } @@ -225,7 +230,7 @@ func (vm *VMClient) Initialize( } vm.State, err = chain.NewMeteredState( - registerer, + serverReg, &chain.Config{ DecidedCacheSize: decidedCacheSize, MissingCacheSize: missingCacheSize, diff --git a/vms/rpcchainvm/vm_server.go b/vms/rpcchainvm/vm_server.go index 67a55187426a..b33fd3e5b5fe 100644 --- a/vms/rpcchainvm/vm_server.go +++ b/vms/rpcchainvm/vm_server.go @@ -72,9 +72,9 @@ type VMServer struct { allowShutdown *utils.Atomic[bool] - processMetrics prometheus.Gatherer - db database.Database - log logging.Logger + metrics prometheus.Gatherer + db database.Database + log logging.Logger serverCloser grpcutils.ServerCloser connCloser wrappers.Closer @@ -125,28 +125,47 @@ func (vm *VMServer) Initialize(ctx context.Context, req *vmpb.InitializeRequest) return nil, err } - registerer := prometheus.NewRegistry() + pluginMetrics := metrics.NewPrefixGatherer() + vm.metrics = pluginMetrics + + processMetrics, err := metrics.MakeAndRegister( + pluginMetrics, + "process", + ) + if err != nil { + return nil, err + } // Current state of process metrics processCollector := collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}) - if err := registerer.Register(processCollector); err != nil { + if err := processMetrics.Register(processCollector); err != nil { return nil, err } // Go process metrics using debug.GCStats goCollector := collectors.NewGoCollector() - if err := registerer.Register(goCollector); err != nil { + if err := processMetrics.Register(goCollector); err != nil { + return nil, err + } + + grpcMetrics, err := metrics.MakeAndRegister( + pluginMetrics, + "grpc", + ) + if err != nil { return nil, err } // gRPC client metrics grpcClientMetrics := grpc_prometheus.NewClientMetrics() - if err := registerer.Register(grpcClientMetrics); err != nil { + if err := grpcMetrics.Register(grpcClientMetrics); err != nil { return nil, err } - // Register metrics for each Go plugin processes - vm.processMetrics = registerer + vmMetrics := metrics.NewPrefixGatherer() + if err := pluginMetrics.Register("vm", vmMetrics); 
err != nil { + return nil, err + } // Dial the database dbClientConn, err := grpcutils.Dial( @@ -225,7 +244,7 @@ func (vm *VMServer) Initialize(ctx context.Context, req *vmpb.InitializeRequest) Keystore: keystoreClient, SharedMemory: sharedMemoryClient, BCLookup: bcLookupClient, - Metrics: metrics.NewMultiGatherer(), + Metrics: vmMetrics, // Signs warp messages WarpSigner: warpSignerClient, @@ -567,22 +586,8 @@ func (vm *VMServer) AppGossip(ctx context.Context, req *vmpb.AppGossipMsg) (*emp } func (vm *VMServer) Gather(context.Context, *emptypb.Empty) (*vmpb.GatherResponse, error) { - // Gather metrics registered to snow context Gatherer. These - // metrics are defined by the underlying vm implementation. - mfs, err := vm.ctx.Metrics.Gather() - if err != nil { - return nil, err - } - - // Gather metrics registered by rpcchainvm server Gatherer. These - // metrics are collected for each Go plugin process. - pluginMetrics, err := vm.processMetrics.Gather() - if err != nil { - return nil, err - } - mfs = append(mfs, pluginMetrics...) - - return &vmpb.GatherResponse{MetricFamilies: mfs}, err + metrics, err := vm.metrics.Gather() + return &vmpb.GatherResponse{MetricFamilies: metrics}, err } func (vm *VMServer) GetAncestors(ctx context.Context, req *vmpb.GetAncestorsRequest) (*vmpb.GetAncestorsResponse, error) {
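[With the hunks above applied, every metric the plugin serves hangs off a single prefix gatherer: "process" for the process and Go runtime collectors, "grpc" for the client metrics, and "vm" for whatever the wrapped VM registers, which is why Gather collapses to one call. A sketch of that layout, under the assumption that metrics.MakeAndRegister creates a fresh registry and mounts it at the given prefix (its definition is outside this patch):

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus/collectors"

	"github.com/ava-labs/avalanchego/api/metrics"
)

func main() {
	// Top-level gatherer for the plugin process; everything the server
	// reports in Gather flows through it.
	pluginMetrics := metrics.NewPrefixGatherer()

	// Process-level collectors live under the "process" prefix.
	processReg, err := metrics.MakeAndRegister(pluginMetrics, "process")
	if err != nil {
		panic(err)
	}
	if err := processReg.Register(collectors.NewGoCollector()); err != nil {
		panic(err)
	}

	// The inner VM's gatherer nests under "vm"; pointing chainCtx.Metrics
	// here means everything the VM registers is automatically prefixed.
	vmMetrics := metrics.NewPrefixGatherer()
	if err := pluginMetrics.Register("vm", vmMetrics); err != nil {
		panic(err)
	}

	// One Gather call returns every family, each name carrying its
	// prefix (e.g. a "process_"-prefixed Go runtime metric).
	mfs, err := pluginMetrics.Gather()
	if err != nil {
		panic(err)
	}
	for _, mf := range mfs {
		fmt.Println(mf.GetName())
	}
}

The old server kept vm.ctx.Metrics and vm.processMetrics as two separate gatherers and concatenated their output by hand in Gather; mounting both under one gatherer makes that manual merge unnecessary.]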