diff --git a/CHANGELOG.md b/CHANGELOG.md index 4b4f51830f..3890cbeede 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,12 @@ internal API changes are not present. Main (unreleased) ----------------- +### Bugfixes + +- Fixed an issue with `prometheus.scrape` in which targets that move from one + cluster instance to another could have a staleness marker inserted and result + in a gap in metrics (@thampiotr) + v1.1.0-rc.0 ----------- diff --git a/go.mod b/go.mod index 8fe5d40b3d..87f2f5c331 100644 --- a/go.mod +++ b/go.mod @@ -714,9 +714,10 @@ replace ( // * There is a release of Prometheus which contains // prometheus/prometheus#13002 // and prometheus/prometheus#13497 +// and https://github.com/grafana/prometheus/pull/34 // We use the last v1-related tag as the replace statement does not work for v2 // tags without the v2 suffix to the module root. -replace github.com/prometheus/prometheus => github.com/grafana/prometheus v1.8.2-0.20240130142130-51b39f24d406 // cmp_header_order branch +replace github.com/prometheus/prometheus => github.com/grafana/prometheus v1.8.2-0.20240513094155-793c8c9fe88e // cmp_header_order_and_staleness_disabling branch replace gopkg.in/yaml.v2 => github.com/rfratto/go-yaml v0.0.0-20211119180816-77389c3526dc diff --git a/go.sum b/go.sum index 81107247e8..41fba4d1fe 100644 --- a/go.sum +++ b/go.sum @@ -1068,8 +1068,8 @@ github.com/grafana/opentelemetry-collector/service v0.0.0-20240429170914-d1e1018 github.com/grafana/opentelemetry-collector/service v0.0.0-20240429170914-d1e101852ba5/go.mod h1:0djU5YbUIZw4Y+KNYk07tZztXrMK/LNAZyfH8b7f7xA= github.com/grafana/postgres_exporter v0.15.1-0.20240417113938-9358270470dd h1:vNHdecaOmYgSHMEQRgyzWacV++N38Jp8qLZg0RCsfFo= github.com/grafana/postgres_exporter v0.15.1-0.20240417113938-9358270470dd/go.mod h1:kR16GJ0ZwWVQ2osW3pgtDJU1a/GXpufrwio0kLG14cg= -github.com/grafana/prometheus v1.8.2-0.20240130142130-51b39f24d406 h1:LVIOYe5j92m10wluP5hgeHqSkOLnZzcPxhYCkdbLXCE= -github.com/grafana/prometheus v1.8.2-0.20240130142130-51b39f24d406/go.mod h1:SRw624aMAxTfryAcP8rOjg4S/sHHaetx2lyJJ2nM83g= +github.com/grafana/prometheus v1.8.2-0.20240513094155-793c8c9fe88e h1:uQDMlJKE+h6TloPTTiSyA8FSMJeU8mQfg1MY1/UrCKA= +github.com/grafana/prometheus v1.8.2-0.20240513094155-793c8c9fe88e/go.mod h1:SRw624aMAxTfryAcP8rOjg4S/sHHaetx2lyJJ2nM83g= github.com/grafana/pyroscope-go/godeltaprof v0.1.7 h1:C11j63y7gymiW8VugJ9ZW0pWfxTZugdSJyC48olk5KY= github.com/grafana/pyroscope-go/godeltaprof v0.1.7/go.mod h1:Tk376Nbldo4Cha9RgiU7ik8WKFkNpfds98aUzS8omLE= github.com/grafana/pyroscope/api v0.4.0 h1:J86DxoNeLOvtJhB1Cn65JMZkXe682D+RqeoIUiYc/eo= diff --git a/internal/component/discovery/discovery.go b/internal/component/discovery/discovery.go index 453a17bd77..9cd28ba76d 100644 --- a/internal/component/discovery/discovery.go +++ b/internal/component/discovery/discovery.go @@ -8,8 +8,6 @@ import ( "time" "github.com/grafana/alloy/internal/component" - "github.com/grafana/alloy/internal/service/cluster" - "github.com/grafana/ckit/shard" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/discovery" "github.com/prometheus/prometheus/discovery/targetgroup" @@ -20,53 +18,6 @@ import ( // component. type Target map[string]string -// DistributedTargets uses the node's Lookup method to distribute discovery -// targets when a component runs in a cluster. 
-type DistributedTargets struct { - useClustering bool - cluster cluster.Cluster - targets []Target -} - -// NewDistributedTargets creates the abstraction that allows components to -// dynamically shard targets between components. -func NewDistributedTargets(e bool, n cluster.Cluster, t []Target) DistributedTargets { - return DistributedTargets{e, n, t} -} - -// Get distributes discovery targets a clustered environment. -// -// If a cluster size is 1, then all targets will be returned. -func (t *DistributedTargets) Get() []Target { - // TODO(@tpaschalis): Make this into a single code-path to simplify logic. - if !t.useClustering || t.cluster == nil { - return t.targets - } - - peerCount := len(t.cluster.Peers()) - resCap := (len(t.targets) + 1) - if peerCount != 0 { - resCap = (len(t.targets) + 1) / peerCount - } - - res := make([]Target, 0, resCap) - - for _, tgt := range t.targets { - peers, err := t.cluster.Lookup(shard.StringKey(tgt.NonMetaLabels().String()), 1, shard.OpReadWrite) - if err != nil { - // This can only fail in case we ask for more owners than the - // available peers. This will never happen, but in any case we fall - // back to owning the target ourselves. - res = append(res, tgt) - } - if len(peers) == 0 || peers[0].Self { - res = append(res, tgt) - } - } - - return res -} - // Labels converts Target into a set of sorted labels. func (t Target) Labels() labels.Labels { var lset labels.Labels diff --git a/internal/component/discovery/distributed_targets.go b/internal/component/discovery/distributed_targets.go new file mode 100644 index 0000000000..c0b2831b8d --- /dev/null +++ b/internal/component/discovery/distributed_targets.go @@ -0,0 +1,92 @@ +package discovery + +import ( + "github.com/grafana/ckit/peer" + "github.com/grafana/ckit/shard" + + "github.com/grafana/alloy/internal/service/cluster" +) + +// DistributedTargets uses the node's Lookup method to distribute discovery +// targets when a component runs in a cluster. +type DistributedTargets struct { + localTargets []Target + // localTargetKeys is used to cache the key hash computation. Improves time performance by ~20%. + localTargetKeys []shard.Key + remoteTargetKeys map[shard.Key]struct{} +} + +// NewDistributedTargets creates the abstraction that allows components to +// dynamically shard targets between components. +func NewDistributedTargets(clusteringEnabled bool, cluster cluster.Cluster, allTargets []Target) *DistributedTargets { + if !clusteringEnabled || cluster == nil { + cluster = disabledCluster{} + } + + localCap := len(allTargets) + 1 + if peerCount := len(cluster.Peers()); peerCount != 0 { + localCap = (len(allTargets) + 1) / peerCount + } + + localTargets := make([]Target, 0, localCap) + localTargetKeys := make([]shard.Key, 0, localCap) + remoteTargetKeys := make(map[shard.Key]struct{}, len(allTargets)-localCap) + + for _, tgt := range allTargets { + targetKey := keyFor(tgt) + peers, err := cluster.Lookup(targetKey, 1, shard.OpReadWrite) + belongsToLocal := err != nil || len(peers) == 0 || peers[0].Self + + if belongsToLocal { + localTargets = append(localTargets, tgt) + localTargetKeys = append(localTargetKeys, targetKey) + } else { + remoteTargetKeys[targetKey] = struct{}{} + } + } + + return &DistributedTargets{ + localTargets: localTargets, + localTargetKeys: localTargetKeys, + remoteTargetKeys: remoteTargetKeys, + } +} + +// LocalTargets returns the targets that belong to the local cluster node. 
+func (dt *DistributedTargets) LocalTargets() []Target { + return dt.localTargets +} + +// MovedToRemoteInstance returns the set of local targets from prev +// that are no longer local in dt, indicating an active target has moved. +// Only targets which exist in both prev and dt are returned. If prev +// contains an empty list of targets, no targets are returned. +func (dt *DistributedTargets) MovedToRemoteInstance(prev *DistributedTargets) []Target { + if prev == nil { + return nil + } + var movedAwayTargets []Target + for i := 0; i < len(prev.localTargets); i++ { + key := prev.localTargetKeys[i] + if _, exist := dt.remoteTargetKeys[key]; exist { + movedAwayTargets = append(movedAwayTargets, prev.localTargets[i]) + } + } + return movedAwayTargets +} + +func keyFor(tgt Target) shard.Key { + return shard.Key(tgt.NonMetaLabels().Hash()) +} + +type disabledCluster struct{} + +var _ cluster.Cluster = disabledCluster{} + +func (l disabledCluster) Lookup(key shard.Key, replicationFactor int, op shard.Op) ([]peer.Peer, error) { + return nil, nil +} + +func (l disabledCluster) Peers() []peer.Peer { + return nil +} diff --git a/internal/component/discovery/distributed_targets_test.go b/internal/component/discovery/distributed_targets_test.go new file mode 100644 index 0000000000..fb810859aa --- /dev/null +++ b/internal/component/discovery/distributed_targets_test.go @@ -0,0 +1,316 @@ +package discovery + +import ( + "fmt" + "math/rand" + "testing" + + "github.com/grafana/ckit/peer" + "github.com/grafana/ckit/shard" + "github.com/stretchr/testify/require" + + "github.com/grafana/alloy/internal/service/cluster" +) + +var ( + target1 = mkTarget("instance", "1", "host", "pie") + target2 = mkTarget("instance", "2", "host", "cake") + target3 = mkTarget("instance", "3", "host", "muffin") + allTestTargets = []Target{target1, target2, target3} + + peer1Self = peer.Peer{Name: "peer1", Addr: "peer1", Self: true, State: peer.StateParticipant} + peer2 = peer.Peer{Name: "peer2", Addr: "peer2", Self: false, State: peer.StateParticipant} + peer3 = peer.Peer{Name: "peer3", Addr: "peer3", Self: false, State: peer.StateParticipant} + allTestPeers = []peer.Peer{peer1Self, peer2, peer3} + + targetWithLookupError = mkTarget("instance", "-1", "host", "error") + magicErrorKey = keyFor(targetWithLookupError) +) + +var localTargetsTestCases = []struct { + name string + clusteringDisabled bool + cluster cluster.Cluster + allTargets []Target + expectedLocalTargets []Target +}{ + { + name: "all targets are local when clustering disabled", + clusteringDisabled: true, + cluster: &fakeCluster{}, + allTargets: allTestTargets, + expectedLocalTargets: allTestTargets, + }, + { + name: "all targets are local when cluster is nil", + allTargets: allTestTargets, + expectedLocalTargets: allTestTargets, + }, + { + name: "all targets are local when no peers are returned from cluster", + cluster: &fakeCluster{peers: nil}, + allTargets: allTestTargets, + expectedLocalTargets: allTestTargets, + }, + { + name: "only targets assigned to local node are seen as local", + cluster: &fakeCluster{ + peers: allTestPeers, + lookupMap: map[shard.Key][]peer.Peer{ + keyFor(target1): {peer1Self}, + keyFor(target2): {peer2}, + keyFor(target3): {peer1Self}, + }, + }, + allTargets: allTestTargets, + expectedLocalTargets: []Target{ + target1, target3, + }, + }, + { + name: "no targets assigned to local node if no keys match it", + cluster: &fakeCluster{ + peers: allTestPeers, + lookupMap: map[shard.Key][]peer.Peer{ + keyFor(target1): {peer2}, + keyFor(target2): 
{peer2}, + keyFor(target3): {peer3}, + }, + }, + allTargets: allTestTargets, + expectedLocalTargets: []Target{}, + }, + { + name: "additional replica peers do not affect local targets assignment", + cluster: &fakeCluster{ + peers: allTestPeers, + lookupMap: map[shard.Key][]peer.Peer{ + keyFor(target1): {peer1Self, peer2}, + keyFor(target2): {peer1Self, peer2}, + keyFor(target3): {peer2, peer3}, + }, + }, + allTargets: allTestTargets, + expectedLocalTargets: []Target{ + target1, target2, + }, + }, + { + name: "lookup errors fall back to local target assignment", + cluster: &fakeCluster{ + peers: allTestPeers, + lookupMap: map[shard.Key][]peer.Peer{ + magicErrorKey: {peer2}, + keyFor(target1): {peer1Self}, + }, + }, + allTargets: []Target{target1, targetWithLookupError}, + expectedLocalTargets: []Target{ + target1, targetWithLookupError, + }, + }, +} + +func TestDistributedTargets_LocalTargets(t *testing.T) { + for _, tt := range localTargetsTestCases { + t.Run(tt.name, func(t *testing.T) { + dt := NewDistributedTargets(!tt.clusteringDisabled, tt.cluster, tt.allTargets) + localTargets := dt.LocalTargets() + require.Equal(t, tt.expectedLocalTargets, localTargets) + }) + } +} + +var movedToRemoteInstanceTestCases = []struct { + name string + previous *DistributedTargets + current *DistributedTargets + expectedMovedTargets []Target +}{ + { + name: "no previous targets distribution", + previous: nil, + current: testDistTargets(map[shard.Key][]peer.Peer{ + keyFor(target1): {peer1Self}, + keyFor(target2): {peer2}, + keyFor(target3): {peer1Self}, + }), + expectedMovedTargets: nil, + }, + { + name: "nothing moved", + previous: testDistTargets(map[shard.Key][]peer.Peer{ + keyFor(target1): {peer1Self}, + keyFor(target2): {peer2}, + keyFor(target3): {peer1Self}, + }), + current: testDistTargets(map[shard.Key][]peer.Peer{ + keyFor(target1): {peer1Self}, + keyFor(target2): {peer2}, + keyFor(target3): {peer1Self}, + }), + expectedMovedTargets: nil, + }, + { + name: "all moved", + previous: testDistTargets(map[shard.Key][]peer.Peer{ + keyFor(target1): {peer3}, + keyFor(target2): {peer3}, + keyFor(target3): {peer2}, + }), + current: testDistTargets(map[shard.Key][]peer.Peer{ + keyFor(target1): {peer1Self}, + keyFor(target2): {peer2}, + keyFor(target3): {peer1Self}, + }), + expectedMovedTargets: nil, + }, + { + name: "all moved to local", + previous: testDistTargets(map[shard.Key][]peer.Peer{ + keyFor(target1): {peer3}, + keyFor(target2): {peer2}, + keyFor(target3): {peer2}, + }), + current: testDistTargets(map[shard.Key][]peer.Peer{ + keyFor(target1): {peer1Self}, + keyFor(target2): {peer1Self}, + keyFor(target3): {peer1Self}, + }), + expectedMovedTargets: nil, + }, + { + name: "all moved to remote", + previous: testDistTargets(map[shard.Key][]peer.Peer{ + keyFor(target1): {peer1Self}, + keyFor(target2): {peer1Self}, + keyFor(target3): {peer1Self}, + }), + current: testDistTargets(map[shard.Key][]peer.Peer{ + keyFor(target1): {peer3}, + keyFor(target2): {peer2}, + keyFor(target3): {peer2}, + }), + expectedMovedTargets: allTestTargets, + }, + { + name: "subset moved to remote", + previous: testDistTargets(map[shard.Key][]peer.Peer{ + keyFor(target1): {peer1Self}, + keyFor(target2): {peer1Self}, + keyFor(target3): {peer1Self}, + }), + current: testDistTargets(map[shard.Key][]peer.Peer{ + keyFor(target1): {peer3}, + keyFor(target2): {peer1Self}, + keyFor(target3): {peer2}, + }), + expectedMovedTargets: []Target{target1, target3}, + }, +} + +func TestDistributedTargets_MovedToRemoteInstance(t *testing.T) { + 
for _, tt := range movedToRemoteInstanceTestCases { + t.Run(tt.name, func(t *testing.T) { + movedTargets := tt.current.MovedToRemoteInstance(tt.previous) + require.Equal(t, tt.expectedMovedTargets, movedTargets) + }) + } +} + +/* + Recent run on M2 MacBook Air: + + $ go test -count=10 -benchmem ./internal/component/discovery -bench BenchmarkDistributedTargets | tee perf_new.txt + goos: darwin + goarch: arm64 + pkg: github.com/grafana/alloy/internal/component/discovery + BenchmarkDistributedTargets-8 28 42125823 ns/op 52016505 B/op 501189 allocs/op + ... + +Comparison to baseline before optimisations: + + $ benchstat perf_baseline.txt perf_new.txt + goos: darwin + goarch: arm64 + pkg: github.com/grafana/alloy/internal/component/discovery + │ perf_baseline.txt │ perf_new.txt │ + │ sec/op │ sec/op vs base │ + DistributedTargets-8 108.42m ± 14% 41.21m ± 2% -61.99% (p=0.000 n=10) + + │ perf_baseline.txt │ perf_new.txt │ + │ B/op │ B/op vs base │ + DistributedTargets-8 90.06Mi ± 0% 49.66Mi ± 0% -44.86% (p=0.000 n=10) + + │ perf_baseline.txt │ perf_new.txt │ + │ allocs/op │ allocs/op vs base │ + DistributedTargets-8 1572.1k ± 0% 501.2k ± 0% -68.12% (p=0.000 n=10) +*/ +func BenchmarkDistributedTargets(b *testing.B) { + const ( + numTargets = 100_000 + numPeers = 20 + ) + + targets := make([]Target, 0, numTargets) + for i := 0; i < numTargets; i++ { + targets = append(targets, mkTarget("instance", fmt.Sprintf("%d", i), "host", "pie", "location", "kitchen_counter", "flavour", "delicious", "size", "XXL")) + } + + peers := make([]peer.Peer, 0, numPeers) + for i := 0; i < numPeers; i++ { + peerName := fmt.Sprintf("peer_%d", i) + peers = append(peers, peer.Peer{Name: peerName, Addr: peerName, Self: i == 0, State: peer.StateParticipant}) + } + + randomLookupMap := make(map[shard.Key][]peer.Peer) + for _, target := range targets { + randomLookupMap[keyFor(target)] = []peer.Peer{peers[rand.Int()%numPeers]} + } + + fakeCluster := &fakeCluster{ + peers: peers, + lookupMap: randomLookupMap, + } + + b.ResetTimer() + + var prev *DistributedTargets + for i := 0; i < b.N; i++ { + dt := NewDistributedTargets(true, fakeCluster, targets) + _ = dt.LocalTargets() + _ = dt.MovedToRemoteInstance(prev) + prev = dt + } +} + +func mkTarget(kv ...string) Target { + target := make(Target) + for i := 0; i < len(kv); i += 2 { + target[kv[i]] = kv[i+1] + } + return target +} + +func testDistTargets(lookupMap map[shard.Key][]peer.Peer) *DistributedTargets { + return NewDistributedTargets(true, &fakeCluster{ + peers: allTestPeers, + lookupMap: lookupMap, + }, allTestTargets) +} + +type fakeCluster struct { + lookupMap map[shard.Key][]peer.Peer + peers []peer.Peer +} + +func (f *fakeCluster) Lookup(key shard.Key, _ int, _ shard.Op) ([]peer.Peer, error) { + if key == magicErrorKey { + return nil, fmt.Errorf("test error for magic error key") + } + return f.lookupMap[key], nil +} + +func (f *fakeCluster) Peers() []peer.Peer { + return f.peers +} diff --git a/internal/component/loki/source/kubernetes/kubernetes.go b/internal/component/loki/source/kubernetes/kubernetes.go index 0c69b0bc26..62591c4bec 100644 --- a/internal/component/loki/source/kubernetes/kubernetes.go +++ b/internal/component/loki/source/kubernetes/kubernetes.go @@ -188,7 +188,7 @@ func (c *Component) Update(args component.Arguments) error { func (c *Component) resyncTargets(targets []discovery.Target) { distTargets := discovery.NewDistributedTargets(c.args.Clustering.Enabled, c.cluster, targets) - targets = distTargets.Get() + targets = distTargets.LocalTargets() 
tailTargets := make([]*kubetail.Target, 0, len(targets)) for _, target := range targets { diff --git a/internal/component/prometheus/scrape/scrape.go b/internal/component/prometheus/scrape/scrape.go index 75f1933240..4693d08959 100644 --- a/internal/component/prometheus/scrape/scrape.go +++ b/internal/component/prometheus/scrape/scrape.go @@ -8,6 +8,15 @@ import ( "time" "github.com/alecthomas/units" + client_prometheus "github.com/prometheus/client_golang/prometheus" + config_util "github.com/prometheus/common/config" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/config" + "github.com/prometheus/prometheus/discovery/targetgroup" + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/scrape" + "github.com/prometheus/prometheus/storage" + "github.com/grafana/alloy/internal/alloy/logging/level" "github.com/grafana/alloy/internal/component" component_config "github.com/grafana/alloy/internal/component/common/config" @@ -18,13 +27,6 @@ import ( "github.com/grafana/alloy/internal/service/http" "github.com/grafana/alloy/internal/service/labelstore" "github.com/grafana/alloy/internal/useragent" - client_prometheus "github.com/prometheus/client_golang/prometheus" - config_util "github.com/prometheus/common/config" - "github.com/prometheus/common/model" - "github.com/prometheus/prometheus/config" - "github.com/prometheus/prometheus/discovery/targetgroup" - "github.com/prometheus/prometheus/scrape" - "github.com/prometheus/prometheus/storage" ) func init() { @@ -124,13 +126,17 @@ type Component struct { opts component.Options cluster cluster.Cluster - reloadTargets chan struct{} + reloadTargets chan struct{} + targetsGauge client_prometheus.Gauge + movedTargetsCounter client_prometheus.Counter - mut sync.RWMutex - args Arguments - scraper *scrape.Manager - appendable *prometheus.Fanout - targetsGauge client_prometheus.Gauge + mut sync.RWMutex + args Arguments + scraper *scrape.Manager + appendable *prometheus.Fanout + + dtMutex sync.Mutex + distributedTargets *discovery.DistributedTargets } var ( @@ -175,13 +181,22 @@ func New(o component.Options, args Arguments) (*Component, error) { return nil, err } + movedTargetsCounter := client_prometheus.NewCounter(client_prometheus.CounterOpts{ + Name: "prometheus_scrape_targets_moved_total", + Help: "Number of targets that have moved from this cluster node to another one"}) + err = o.Registerer.Register(movedTargetsCounter) + if err != nil { + return nil, err + } + c := &Component{ - opts: o, - cluster: clusterData, - reloadTargets: make(chan struct{}, 1), - scraper: scraper, - appendable: alloyAppendable, - targetsGauge: targetsGauge, + opts: o, + cluster: clusterData, + reloadTargets: make(chan struct{}, 1), + scraper: scraper, + appendable: alloyAppendable, + targetsGauge: targetsGauge, + movedTargetsCounter: movedTargetsCounter, } // Call to Update() to set the receivers and targets once at the start. 
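
The hunks below rework how `prometheus.scrape` splits targets across the cluster and hands scraping over when target ownership changes. As a minimal, standalone sketch of that flow, using only the API introduced in `internal/component/discovery/distributed_targets.go` above (the `example` package and the `distribute` helper are illustrative, not part of the patch):

```go
package example

import (
	"github.com/grafana/alloy/internal/component/discovery"
	"github.com/grafana/alloy/internal/service/cluster"
)

// distribute shards the discovered targets across the cluster and returns both
// the targets this instance should scrape now and the targets it owned in the
// previous distribution that now hash to another peer. A caller keeps the
// returned *DistributedTargets as `prev` for the next call and passes the moved
// targets (converted to *scrape.Target, as populatePromLabels does below) to
// scrape.Manager.DisableEndOfRunStalenessMarkers, so the new owner, not this
// instance, decides when those series go stale.
func distribute(
	clusteringEnabled bool,
	clusterSvc cluster.Cluster,
	prev *discovery.DistributedTargets,
	targets []discovery.Target,
) (local, moved []discovery.Target, next *discovery.DistributedTargets) {
	next = discovery.NewDistributedTargets(clusteringEnabled, clusterSvc, targets)
	local = next.LocalTargets()              // targets owned by this instance
	moved = next.MovedToRemoteInstance(prev) // previously local, now owned by another peer
	return local, moved, next
}
```
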
@@ -213,19 +228,26 @@ func (c *Component) Run(ctx context.Context) error { case <-c.reloadTargets: c.mut.RLock() var ( - targets = c.args.Targets - jobName = c.opts.ID - clusteringEnabled = c.args.Clustering.Enabled + targets = c.args.Targets + jobName = c.opts.ID + args = c.args ) - if c.args.JobName != "" { + c.mut.RUnlock() + + if args.JobName != "" { jobName = c.args.JobName } - c.mut.RUnlock() - promTargets := c.distTargets(targets, jobName, clusteringEnabled) + newTargetGroups, movedTargets := c.distributeTargets(targets, jobName, args) + + // Make sure the targets that moved to another instance are NOT marked as stale. This is specific to how + // Prometheus handles marking series as stale: it is the client's responsibility to inject the + // staleness markers. In our case, for targets that moved to another instance in the cluster, we hand + // over this responsibility to the new owning instance. We must not inject staleness marker here. + c.scraper.DisableEndOfRunStalenessMarkers(jobName, movedTargets) select { - case targetSetsChan <- promTargets: + case targetSetsChan <- newTargetGroups: level.Debug(c.opts.Logger).Log("msg", "passed new targets to scrape manager") case <-ctx.Done(): } @@ -233,6 +255,34 @@ func (c *Component) Run(ctx context.Context) error { } } +func (c *Component) distributeTargets( + targets []discovery.Target, + jobName string, + args Arguments, +) (map[string][]*targetgroup.Group, []*scrape.Target) { + var ( + newDistTargets = discovery.NewDistributedTargets(args.Clustering.Enabled, c.cluster, targets) + oldDistributedTargets *discovery.DistributedTargets + ) + + c.dtMutex.Lock() + oldDistributedTargets, c.distributedTargets = c.distributedTargets, newDistTargets + c.dtMutex.Unlock() + + newLocalTargets := newDistTargets.LocalTargets() + c.targetsGauge.Set(float64(len(newLocalTargets))) + promNewTargets := c.componentTargetsToPromTargetGroups(jobName, newLocalTargets) + + movedTargets := newDistTargets.MovedToRemoteInstance(oldDistributedTargets) + c.movedTargetsCounter.Add(float64(len(movedTargets))) + // For moved targets, we need to populate prom labels in the same way as the scraper does, so that they match + // the currently running scrape loop's targets. This is not needed for new targets, as they will be populated + // by the scrape loop itself during the sync. + promMovedTargets := c.populatePromLabels(movedTargets, jobName, args) + + return promNewTargets, promMovedTargets +} + // Update implements component.Component. func (c *Component) Update(args component.Arguments) error { newArgs := args.(Arguments) @@ -311,20 +361,6 @@ func getPromScrapeConfigs(jobName string, c Arguments) *config.ScrapeConfig { return &dec } -func (c *Component) distTargets( - targets []discovery.Target, - jobName string, - clustering bool, -) map[string][]*targetgroup.Group { - // NOTE(@tpaschalis) First approach, manually building the - // 'clustered' targets implementation every time. - dt := discovery.NewDistributedTargets(clustering, c.cluster, targets) - alloyTargets := dt.Get() - c.targetsGauge.Set(float64(len(alloyTargets))) - promTargets := c.componentTargetsToProm(jobName, alloyTargets) - return promTargets -} - // ScraperStatus reports the status of the scraper's jobs. 
type ScraperStatus struct { TargetStatus []TargetStatus `alloy:"target,block,optional"` @@ -374,7 +410,7 @@ func (c *Component) DebugInfo() interface{} { } } -func (c *Component) componentTargetsToProm(jobName string, tgs []discovery.Target) map[string][]*targetgroup.Group { +func (c *Component) componentTargetsToPromTargetGroups(jobName string, tgs []discovery.Target) map[string][]*targetgroup.Group { promGroup := &targetgroup.Group{Source: jobName} for _, tg := range tgs { promGroup.Targets = append(promGroup.Targets, convertLabelSet(tg)) @@ -383,6 +419,21 @@ func (c *Component) componentTargetsToProm(jobName string, tgs []discovery.Targe return map[string][]*targetgroup.Group{jobName: {promGroup}} } +func (c *Component) populatePromLabels(targets []discovery.Target, jobName string, args Arguments) []*scrape.Target { + lb := labels.NewBuilder(labels.EmptyLabels()) + promTargets, errs := scrape.TargetsFromGroup( + c.componentTargetsToPromTargetGroups(jobName, targets)[jobName][0], + getPromScrapeConfigs(c.opts.ID, args), + false, /* noDefaultScrapePort - always false in this component */ + make([]*scrape.Target, len(targets)), /* targets slice to reuse */ + lb, + ) + for _, err := range errs { + level.Warn(c.opts.Logger).Log("msg", "error while populating labels of targets using prom config", "err", err) + } + return promTargets +} + func convertLabelSet(tg discovery.Target) model.LabelSet { lset := make(model.LabelSet, len(tg)) for k, v := range tg { diff --git a/internal/component/prometheus/scrape/scrape_clustering_test.go b/internal/component/prometheus/scrape/scrape_clustering_test.go new file mode 100644 index 0000000000..ea3964fd79 --- /dev/null +++ b/internal/component/prometheus/scrape/scrape_clustering_test.go @@ -0,0 +1,375 @@ +package scrape + +import ( + "context" + "fmt" + "net" + "sync" + "testing" + "time" + + "github.com/grafana/ckit/peer" + "github.com/grafana/ckit/shard" + client "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/prometheus/model/value" + "github.com/prometheus/prometheus/storage" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/grafana/alloy/internal/component" + "github.com/grafana/alloy/internal/component/discovery" + "github.com/grafana/alloy/internal/service/cluster" + "github.com/grafana/alloy/internal/service/http" + "github.com/grafana/alloy/internal/service/labelstore" + "github.com/grafana/alloy/internal/util" + "github.com/grafana/alloy/internal/util/assertmetrics" + "github.com/grafana/alloy/internal/util/testappender" + "github.com/grafana/alloy/internal/util/testtarget" +) + +const ( + testTimeout = 30 * time.Second +) + +var ( + peer1Self = peer.Peer{Name: "peer1", Addr: "peer1", Self: true, State: peer.StateParticipant} + peer2 = peer.Peer{Name: "peer2", Addr: "peer2", Self: false, State: peer.StateParticipant} + peer3 = peer.Peer{Name: "peer3", Addr: "peer3", Self: false, State: peer.StateParticipant} + + // There is a race condition in prometheus where calls to NewManager can race over a package-global variable when + // calling targetMetadataCache.registerManager(m). This is a workaround to prevent this for now. + //TODO(thampiotr): Open an issue in prometheus to fix this? 
+ promManagerMutex sync.Mutex +) + +type testCase struct { + name string + initialTargetsAssignment map[peer.Peer][]int + updatedTargetsAssignment map[peer.Peer][]int + expectedStalenessInjections []int + expectedMovedTargetsTotal int +} + +var testCases = []testCase{ + { + name: "no targets move", + initialTargetsAssignment: map[peer.Peer][]int{ + peer1Self: {1, 2, 3}, + peer2: {4, 5}, + peer3: {6, 7}, + }, + updatedTargetsAssignment: map[peer.Peer][]int{ + peer1Self: {1, 2, 3}, + peer2: {4, 5}, + peer3: {6, 7}, + }, + expectedMovedTargetsTotal: 0, + }, + { + name: "one target added", + initialTargetsAssignment: map[peer.Peer][]int{ + peer1Self: {1, 2}, + peer2: {4, 5}, + peer3: {6, 7}, + }, + updatedTargetsAssignment: map[peer.Peer][]int{ + peer1Self: {1, 2, 3}, + peer2: {4, 5}, + peer3: {6, 7}, + }, + expectedMovedTargetsTotal: 0, + }, + { + name: "staleness injected when two targets disappear", + initialTargetsAssignment: map[peer.Peer][]int{ + peer1Self: {1, 2, 3}, + peer2: {4, 5}, + peer3: {6, 7}, + }, + updatedTargetsAssignment: map[peer.Peer][]int{ + peer1Self: {2}, + peer2: {4, 5}, + peer3: {6, 7}, + }, + expectedStalenessInjections: []int{1, 3}, + expectedMovedTargetsTotal: 0, + }, + { + name: "no staleness injected when two targets move to other instances", + initialTargetsAssignment: map[peer.Peer][]int{ + peer1Self: {1, 2, 3, 4, 5}, + peer2: {6, 7}, + peer3: {8, 9}, + }, + updatedTargetsAssignment: map[peer.Peer][]int{ + peer1Self: {2, 4, 5}, + peer2: {1, 6, 7}, + peer3: {3, 8, 9}, + }, + expectedMovedTargetsTotal: 2, + }, + { + name: "staleness injected when one target disappeared and another moved", + initialTargetsAssignment: map[peer.Peer][]int{ + peer1Self: {1, 2, 3, 4, 5}, + peer2: {6, 7}, + peer3: {8, 9}, + }, + updatedTargetsAssignment: map[peer.Peer][]int{ + peer1Self: {2, 4, 5}, + peer2: {1, 6, 7}, + peer3: {8, 9}, + }, + expectedStalenessInjections: []int{3}, + expectedMovedTargetsTotal: 1, + }, +} + +func TestDetectingMovedTargets(t *testing.T) { + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + alloyMetricsReg := client.NewRegistry() + fakeCluster := &fakeCluster{ + peers: []peer.Peer{peer1Self, peer2, peer3}, + } + opts := testOptions(t, alloyMetricsReg, fakeCluster) + args := testArgs() + + appender := testappender.NewCollectingAppender() + args.ForwardTo = []storage.Appendable{testappender.ConstantAppendable{Inner: appender}} + + testTargets, shutdownTargets := createTestTargets(tc) + defer shutdownTargets() + + // Set initial targets + args.Targets = getActiveTargets(tc.initialTargetsAssignment, testTargets) + setUpClusterLookup(fakeCluster, tc.initialTargetsAssignment, testTargets) + + // Create and start the component + promManagerMutex.Lock() + s, err := New(opts, args) + promManagerMutex.Unlock() + + require.NoError(t, err) + ctx, cancelRun := context.WithTimeout(context.Background(), testTimeout) + runErr := make(chan error) + go func() { + err := s.Run(ctx) + runErr <- err + }() + + // Verify metrics and scraping of the right targets + waitForMetricValue(t, alloyMetricsReg, "prometheus_scrape_targets_gauge", float64(len(tc.initialTargetsAssignment[peer1Self]))) + waitForMetricValue(t, alloyMetricsReg, "prometheus_scrape_targets_moved_total", float64(0)) + waitForTargetsToBeScraped(t, appender, tc.initialTargetsAssignment[peer1Self]) + + // Update targets + args.Targets = getActiveTargets(tc.updatedTargetsAssignment, testTargets) + setUpClusterLookup(fakeCluster, tc.updatedTargetsAssignment, testTargets) + 
require.NoError(t, s.Update(args)) + + // Verify metrics and scraping of the right targets + waitForMetricValue(t, alloyMetricsReg, "prometheus_scrape_targets_gauge", float64(len(tc.updatedTargetsAssignment[peer1Self]))) + waitForMetricValue(t, alloyMetricsReg, "prometheus_scrape_targets_moved_total", float64(tc.expectedMovedTargetsTotal)) + waitForTargetsToBeScraped(t, appender, tc.updatedTargetsAssignment[peer1Self]) + + // Verify staleness injections + waitForStalenessInjections(t, appender, tc.expectedStalenessInjections) + + cancelRun() + select { + case err := <-runErr: + require.NoError(t, err) + } + }) + } +} + +func testArgs() Arguments { + var args Arguments + args.SetToDefault() + args.Clustering.Enabled = true + args.ScrapeInterval = 100 * time.Millisecond + args.ScrapeTimeout = args.ScrapeInterval + args.HonorLabels = true + return args +} + +func testOptions(t *testing.T, alloyMetricsReg *client.Registry, fakeCluster *fakeCluster) component.Options { + opts := component.Options{ + Logger: util.TestAlloyLogger(t), + Registerer: alloyMetricsReg, + ID: "prometheus.scrape.test", + GetServiceData: func(name string) (interface{}, error) { + switch name { + case http.ServiceName: + return http.Data{ + HTTPListenAddr: "localhost:12345", + MemoryListenAddr: "alloy.internal:1245", + BaseHTTPPath: "/", + DialFunc: (&net.Dialer{}).DialContext, + }, nil + + case cluster.ServiceName: + return fakeCluster, nil + case labelstore.ServiceName: + return labelstore.New(nil, alloyMetricsReg), nil + default: + return nil, fmt.Errorf("service %q does not exist", name) + } + }, + } + return opts +} + +func getActiveTargets(assignment map[peer.Peer][]int, testTargets map[int]*testtarget.TestTarget) []discovery.Target { + active := make([]discovery.Target, 0) + for _, targets := range assignment { + for _, id := range targets { + active = append(active, testTargets[id].Target()) + } + } + return active +} + +func createTestTargets(tc testCase) (map[int]*testtarget.TestTarget, func()) { + testTargets := map[int]*testtarget.TestTarget{} + gatherTestTargets := func(targets []int) { + for _, id := range targets { + if _, ok := testTargets[id]; !ok { + testTargets[id] = testTargetWithId(id) + } + } + } + + for _, targets := range tc.initialTargetsAssignment { + gatherTestTargets(targets) + } + for _, targets := range tc.updatedTargetsAssignment { + gatherTestTargets(targets) + } + + shutdownTargets := func() { + for _, t := range testTargets { + t.Close() + } + } + return testTargets, shutdownTargets +} + +func setUpClusterLookup(fakeCluster *fakeCluster, assignment map[peer.Peer][]int, targets map[int]*testtarget.TestTarget) { + fakeCluster.lookupMap = make(map[shard.Key][]peer.Peer) + for owningPeer, ownedTargets := range assignment { + for _, id := range ownedTargets { + fakeCluster.lookupMap[shard.Key(targets[id].Target().NonMetaLabels().Hash())] = []peer.Peer{owningPeer} + } + } +} + +func waitForTargetsToBeScraped(t *testing.T, appender testappender.CollectingAppender, targets []int) { + require.EventuallyWithT( + t, + func(t *assert.CollectT) { + for _, id := range targets { + verifyTestTargetExposed(t, id, appender) + } + }, + 15*time.Second, + 10*time.Millisecond, + ) +} + +func waitForMetricValue(t *testing.T, alloyMetricsReg *client.Registry, name string, value float64) { + require.EventuallyWithT( + t, + func(t *assert.CollectT) { + assertmetrics.AssertValueInReg( + t, + alloyMetricsReg, + name, + nil, + value, + ) + }, + testTimeout, + 10*time.Millisecond, + ) +} + +func testTargetWithId(id 
int) *testtarget.TestTarget { + t := testtarget.NewTestTarget() + t.AddCounter(client.CounterOpts{ + Name: "test_counter", + Help: "A test counter", + ConstLabels: map[string]string{ + "instance": fmt.Sprintf("%d", id), + }, + }).Add(100 + float64(id)) + t.AddGauge(client.GaugeOpts{ + Name: "test_gauge", + Help: "A test gauge", + ConstLabels: map[string]string{ + "instance": fmt.Sprintf("%d", id), + }, + }).Set(10 + float64(id)) + return t +} + +func verifyTestTargetExposed(t assert.TestingT, id int, appender testappender.CollectingAppender) { + counter := appender.LatestSampleFor(fmt.Sprintf(`{__name__="test_counter", instance="%d", job="prometheus.scrape.test"}`, id)) + assert.NotNil(t, counter) + if counter == nil { + return + } + assert.Equal(t, 100+float64(id), counter.Value) + + gauge := appender.LatestSampleFor(fmt.Sprintf(`{__name__="test_gauge", instance="%d", job="prometheus.scrape.test"}`, id)) + assert.NotNil(t, gauge) + if gauge == nil { + return + } + assert.Equal(t, 10+float64(id), gauge.Value) +} + +func waitForStalenessInjections(t *testing.T, appender testappender.CollectingAppender, expectedTargets []int) { + assert.EventuallyWithT( + t, + func(t *assert.CollectT) { + for _, targetId := range expectedTargets { + verifyStalenessInjectedForTarget(t, targetId, appender) + } + }, + testTimeout, + 10*time.Millisecond, + ) +} + +func verifyStalenessInjectedForTarget(t assert.TestingT, targetId int, appender testappender.CollectingAppender) { + counter := appender.LatestSampleFor(fmt.Sprintf(`{__name__="test_counter", instance="%d", job="prometheus.scrape.test"}`, targetId)) + assert.NotNil(t, counter) + if counter == nil { + return + } + assert.True(t, value.IsStaleNaN(counter.Value)) + + gauge := appender.LatestSampleFor(fmt.Sprintf(`{__name__="test_gauge", instance="%d", job="prometheus.scrape.test"}`, targetId)) + assert.NotNil(t, gauge) + if gauge == nil { + return + } + assert.True(t, value.IsStaleNaN(gauge.Value)) +} + +type fakeCluster struct { + lookupMap map[shard.Key][]peer.Peer + peers []peer.Peer +} + +func (f *fakeCluster) Lookup(key shard.Key, _ int, _ shard.Op) ([]peer.Peer, error) { + return f.lookupMap[key], nil +} + +func (f *fakeCluster) Peers() []peer.Peer { + return f.peers +} diff --git a/internal/component/pyroscope/scrape/scrape.go b/internal/component/pyroscope/scrape/scrape.go index 1abdda6401..906d3f067b 100644 --- a/internal/component/pyroscope/scrape/scrape.go +++ b/internal/component/pyroscope/scrape/scrape.go @@ -7,14 +7,15 @@ import ( "sync" "time" + config_util "github.com/prometheus/common/config" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/discovery/targetgroup" + "github.com/grafana/alloy/internal/alloy/logging/level" "github.com/grafana/alloy/internal/component/pyroscope" "github.com/grafana/alloy/internal/featuregate" "github.com/grafana/alloy/internal/service/cluster" "github.com/grafana/alloy/internal/service/http" - config_util "github.com/prometheus/common/config" - "github.com/prometheus/common/model" - "github.com/prometheus/prometheus/discovery/targetgroup" "github.com/grafana/alloy/internal/component" component_config "github.com/grafana/alloy/internal/component/common/config" @@ -296,9 +297,9 @@ func (c *Component) Run(ctx context.Context) error { case <-c.reloadTargets: c.mut.RLock() var ( - tgs = c.args.Targets - jobName = c.opts.ID - clustering = c.args.Clustering.Enabled + tgs = c.args.Targets + jobName = c.opts.ID + clusteringEnabled = c.args.Clustering.Enabled ) if c.args.JobName != "" { 
jobName = c.args.JobName @@ -307,8 +308,8 @@ func (c *Component) Run(ctx context.Context) error { // NOTE(@tpaschalis) First approach, manually building the // 'clustered' targets implementation every time. - ct := discovery.NewDistributedTargets(clustering, c.cluster, tgs) - promTargets := c.componentTargetsToProm(jobName, ct.Get()) + ct := discovery.NewDistributedTargets(clusteringEnabled, c.cluster, tgs) + promTargets := c.componentTargetsToProm(jobName, ct.LocalTargets()) select { case targetSetsChan <- promTargets: diff --git a/internal/util/assertmetrics/assert_metrics.go b/internal/util/assertmetrics/assert_metrics.go new file mode 100644 index 0000000000..0c61bfb94a --- /dev/null +++ b/internal/util/assertmetrics/assert_metrics.go @@ -0,0 +1,45 @@ +package assertmetrics + +import ( + "fmt" + "io" + "net/http" + "net/http/httptest" + "strings" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" + "github.com/prometheus/prometheus/model/labels" + "github.com/stretchr/testify/assert" +) + +func ReadMetrics(t assert.TestingT, reg *prometheus.Registry) string { + // Start server to expose prom metrics + srv := httptest.NewServer(promhttp.HandlerFor(reg, promhttp.HandlerOpts{})) + defer srv.Close() + + // Get the metrics body + resp, err := http.Get(fmt.Sprintf("%s/metrics", srv.URL)) + assert.NoError(t, err, "error fetching metrics") + + // Return body as text + body, err := io.ReadAll(resp.Body) + assert.NoError(t, err, "error reading response body") + assert.NoError(t, resp.Body.Close(), "error closing response body") + return string(body) +} + +func AssertValueInReg(t assert.TestingT, reg *prometheus.Registry, metricName string, labels labels.Labels, value float64) { + AssertValueInStr(t, ReadMetrics(t, reg), metricName, labels, value) +} + +func AssertValueInStr(t assert.TestingT, allMetrics string, metricName string, labels labels.Labels, value float64) { + ls := "" + if len(labels) != 0 { + ls = strings.Replace(labels.String(), ", ", ",", -1) + } + + // NOTE: currently no support for exemplars or explicit timestamps + expectedMetric := fmt.Sprintf("%s%s %v", metricName, ls, value) + assert.Contains(t, allMetrics, expectedMetric, "expected metric not found") +} diff --git a/internal/util/assertmetrics/assert_metrics_test.go b/internal/util/assertmetrics/assert_metrics_test.go new file mode 100644 index 0000000000..1b42c02710 --- /dev/null +++ b/internal/util/assertmetrics/assert_metrics_test.go @@ -0,0 +1,42 @@ +package assertmetrics + +import ( + "testing" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/prometheus/model/labels" +) + +func TestMetricValue(t *testing.T) { + gauge := prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "some_metric", + Help: "A sample metric", + }) + gauge.Set(42) + + reg := prometheus.NewRegistry() + reg.MustRegister(gauge) + + metrics := ReadMetrics(t, reg) + AssertValueInStr(t, metrics, "some_metric", nil, 42) + + gauge.Set(31337) + AssertValueInReg(t, reg, "some_metric", nil, 31337) +} + +func TestMetricValueWithLabels(t *testing.T) { + gauge := prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "some_metric", + Help: "A sample metric", + ConstLabels: prometheus.Labels{ + "foo": "bar", + "boo": "yah", + }, + }) + gauge.Set(42) + + reg := prometheus.NewRegistry() + reg.MustRegister(gauge) + + AssertValueInReg(t, reg, "some_metric", labels.FromStrings("foo", "bar", "boo", "yah"), 42) +} diff --git a/internal/util/assertmetrics/doc.go 
b/internal/util/assertmetrics/doc.go new file mode 100644 index 0000000000..09f3d60f6f --- /dev/null +++ b/internal/util/assertmetrics/doc.go @@ -0,0 +1,3 @@ +// Package assertmetrics provides utilities for verifying Prometheus metrics in tests given a Registry. See package +// tests for usage examples. +package assertmetrics diff --git a/internal/util/testappender/collectingappender.go b/internal/util/testappender/collectingappender.go new file mode 100644 index 0000000000..68b3f3c56d --- /dev/null +++ b/internal/util/testappender/collectingappender.go @@ -0,0 +1,93 @@ +package testappender + +import ( + "context" + "sync" + + "github.com/prometheus/prometheus/model/exemplar" + "github.com/prometheus/prometheus/model/histogram" + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/model/metadata" + "github.com/prometheus/prometheus/storage" + "golang.org/x/exp/maps" +) + +type MetricSample struct { + Timestamp int64 + Value float64 + Labels labels.Labels +} + +// CollectingAppender is an Appender that collects the samples it receives in a map. Useful for testing and verifying +// the samples that are being written. +type CollectingAppender interface { + storage.Appender + CollectedSamples() map[string]*MetricSample + LatestSampleFor(labels string) *MetricSample +} + +type collectingAppender struct { + mut sync.Mutex + latestSamples map[string]*MetricSample +} + +func NewCollectingAppender() CollectingAppender { + return &collectingAppender{ + latestSamples: map[string]*MetricSample{}, + } +} + +func (c *collectingAppender) CollectedSamples() map[string]*MetricSample { + c.mut.Lock() + defer c.mut.Unlock() + cp := map[string]*MetricSample{} + maps.Copy(cp, c.latestSamples) + return cp +} + +func (c *collectingAppender) LatestSampleFor(labels string) *MetricSample { + c.mut.Lock() + defer c.mut.Unlock() + return c.latestSamples[labels] +} + +func (c *collectingAppender) Append(ref storage.SeriesRef, l labels.Labels, t int64, v float64) (storage.SeriesRef, error) { + c.mut.Lock() + defer c.mut.Unlock() + c.latestSamples[l.String()] = &MetricSample{ + Timestamp: t, + Value: v, + Labels: l, + } + return ref, nil +} + +func (c *collectingAppender) Commit() error { + return nil +} + +func (c *collectingAppender) Rollback() error { + return nil +} + +func (c *collectingAppender) AppendExemplar(ref storage.SeriesRef, l labels.Labels, e exemplar.Exemplar) (storage.SeriesRef, error) { + panic("not implemented yet") +} + +func (c *collectingAppender) AppendHistogram(ref storage.SeriesRef, l labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) { + panic("not implemented yet") +} + +func (c *collectingAppender) UpdateMetadata(ref storage.SeriesRef, l labels.Labels, m metadata.Metadata) (storage.SeriesRef, error) { + panic("not implemented yet") +} + +type ConstantAppendable struct { + Inner CollectingAppender +} + +func (c ConstantAppendable) Appender(_ context.Context) storage.Appender { + return c.Inner +} + +var _ storage.Appendable = &ConstantAppendable{} diff --git a/internal/util/testtarget/test_target.go b/internal/util/testtarget/test_target.go new file mode 100644 index 0000000000..0dab941635 --- /dev/null +++ b/internal/util/testtarget/test_target.go @@ -0,0 +1,63 @@ +package testtarget + +import ( + "net/http/httptest" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" + + "github.com/grafana/alloy/internal/component/discovery" +) + +func 
NewTestTarget() *TestTarget { + // Start server to expose prom metrics + registry := prometheus.NewRegistry() + srv := httptest.NewServer(promhttp.HandlerFor(registry, promhttp.HandlerOpts{})) + return &TestTarget{ + server: srv, + registry: registry, + } +} + +// TestTarget is a test target for prometheus metrics that exposes the metrics from provided registry via HTTP. +// It must be closed after use using Close method. +type TestTarget struct { + server *httptest.Server + registry *prometheus.Registry +} + +func (t *TestTarget) AddCounter(opts prometheus.CounterOpts) prometheus.Counter { + counter := prometheus.NewCounter(opts) + t.registry.MustRegister(counter) + return counter +} + +func (t *TestTarget) AddGauge(opts prometheus.GaugeOpts) prometheus.Gauge { + gauge := prometheus.NewGauge(opts) + t.registry.MustRegister(gauge) + return gauge +} + +func (t *TestTarget) AddHistogram(opts prometheus.HistogramOpts) prometheus.Histogram { + histogram := prometheus.NewHistogram(opts) + t.registry.MustRegister(histogram) + return histogram +} + +func (t *TestTarget) Target() discovery.Target { + return discovery.Target{ + "__address__": t.server.Listener.Addr().String(), + } +} + +func (t *TestTarget) Registry() *prometheus.Registry { + return t.registry +} + +func (t *TestTarget) URL() string { + return t.server.URL +} + +func (t *TestTarget) Close() { + t.server.Close() +} diff --git a/internal/util/testtarget/test_target_test.go b/internal/util/testtarget/test_target_test.go new file mode 100644 index 0000000000..1f45f5841d --- /dev/null +++ b/internal/util/testtarget/test_target_test.go @@ -0,0 +1,60 @@ +package testtarget + +import ( + "testing" + + "github.com/prometheus/client_golang/prometheus" + "github.com/stretchr/testify/require" + + "github.com/grafana/alloy/internal/util/assertmetrics" +) + +func TestTargetHelper(t *testing.T) { + tt := NewTestTarget() + defer tt.Close() + + c := tt.AddCounter(prometheus.CounterOpts{ + Name: "test_counter", + Help: "A test counter", + }) + c.Add(10) + + g := tt.AddGauge(prometheus.GaugeOpts{ + Name: "test_gauge", + Help: "A test gauge", + }) + g.Set(123) + + h := tt.AddHistogram(prometheus.HistogramOpts{ + Name: "test_histogram", + Help: "A test histogram", + }) + h.Observe(3) + + expected := `# HELP test_counter A test counter +# TYPE test_counter counter +test_counter 10 +# HELP test_gauge A test gauge +# TYPE test_gauge gauge +test_gauge 123 +# HELP test_histogram A test histogram +# TYPE test_histogram histogram +test_histogram_bucket{le="0.005"} 0 +test_histogram_bucket{le="0.01"} 0 +test_histogram_bucket{le="0.025"} 0 +test_histogram_bucket{le="0.05"} 0 +test_histogram_bucket{le="0.1"} 0 +test_histogram_bucket{le="0.25"} 0 +test_histogram_bucket{le="0.5"} 0 +test_histogram_bucket{le="1"} 0 +test_histogram_bucket{le="2.5"} 0 +test_histogram_bucket{le="5"} 1 +test_histogram_bucket{le="10"} 1 +test_histogram_bucket{le="+Inf"} 1 +test_histogram_sum 3 +test_histogram_count 1 +` + + actual := assertmetrics.ReadMetrics(t, tt.Registry()) + require.Equal(t, expected, actual) +}
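
The new test utilities above are meant to compose: `testtarget` stands up an HTTP endpoint serving metrics from its own registry, `testappender.CollectingAppender` records whatever a component appends, and `assertmetrics` checks values exposed by a registry. A rough, self-contained sketch of that composition (the `example` package, the `TestHelpersCompose` test, and the `example_gauge` metric are illustrative only, not code from the patch):

```go
package example

import (
	"testing"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/stretchr/testify/require"

	"github.com/grafana/alloy/internal/util/assertmetrics"
	"github.com/grafana/alloy/internal/util/testappender"
	"github.com/grafana/alloy/internal/util/testtarget"
)

func TestHelpersCompose(t *testing.T) {
	// A fake scrape target serving a single gauge over HTTP.
	tt := testtarget.NewTestTarget()
	defer tt.Close()
	tt.AddGauge(prometheus.GaugeOpts{Name: "example_gauge", Help: "illustrative gauge"}).Set(42)

	// tt.Target() yields a discovery.Target with __address__ set, ready to be
	// handed to a scraping component whose ForwardTo points at the appender below.
	_ = tt.Target()

	// Records every sample appended to it; wrap it in ConstantAppendable to use
	// it as a storage.Appendable in component arguments.
	app := testappender.NewCollectingAppender()
	_ = testappender.ConstantAppendable{Inner: app}

	// Assert directly against the target's own registry; nil labels means the
	// metric is expected without a label set.
	assertmetrics.AssertValueInReg(t, tt.Registry(), "example_gauge", nil, 42)

	// Nothing scraped the target in this sketch, so no samples were collected.
	require.Empty(t, app.CollectedSamples())
}
```
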