Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix staleness during target handover #703

Merged
merged 10 commits into from
May 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,12 @@ internal API changes are not present.
Main (unreleased)
-----------------

### Bugfixes

- Fixed an issue with `prometheus.scrape` in which targets that move from one
cluster instance to another could have a staleness marker inserted and result
in a gap in metrics (@thampiotr)

v1.1.0-rc.0
-----------

Expand Down
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -714,9 +714,10 @@ replace (
// * There is a release of Prometheus which contains
// prometheus/prometheus#13002
// and prometheus/prometheus#13497
// and https://github.com/grafana/prometheus/pull/34
// We use the last v1-related tag as the replace statement does not work for v2
// tags without the v2 suffix to the module root.
replace github.com/prometheus/prometheus => github.com/grafana/prometheus v1.8.2-0.20240130142130-51b39f24d406 // cmp_header_order branch
replace github.com/prometheus/prometheus => github.com/grafana/prometheus v1.8.2-0.20240513094155-793c8c9fe88e // cmp_header_order_and_staleness_disabling branch

replace gopkg.in/yaml.v2 => github.com/rfratto/go-yaml v0.0.0-20211119180816-77389c3526dc

Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1068,8 +1068,8 @@ github.com/grafana/opentelemetry-collector/service v0.0.0-20240429170914-d1e1018
github.com/grafana/opentelemetry-collector/service v0.0.0-20240429170914-d1e101852ba5/go.mod h1:0djU5YbUIZw4Y+KNYk07tZztXrMK/LNAZyfH8b7f7xA=
github.com/grafana/postgres_exporter v0.15.1-0.20240417113938-9358270470dd h1:vNHdecaOmYgSHMEQRgyzWacV++N38Jp8qLZg0RCsfFo=
github.com/grafana/postgres_exporter v0.15.1-0.20240417113938-9358270470dd/go.mod h1:kR16GJ0ZwWVQ2osW3pgtDJU1a/GXpufrwio0kLG14cg=
github.com/grafana/prometheus v1.8.2-0.20240130142130-51b39f24d406 h1:LVIOYe5j92m10wluP5hgeHqSkOLnZzcPxhYCkdbLXCE=
github.com/grafana/prometheus v1.8.2-0.20240130142130-51b39f24d406/go.mod h1:SRw624aMAxTfryAcP8rOjg4S/sHHaetx2lyJJ2nM83g=
github.com/grafana/prometheus v1.8.2-0.20240513094155-793c8c9fe88e h1:uQDMlJKE+h6TloPTTiSyA8FSMJeU8mQfg1MY1/UrCKA=
github.com/grafana/prometheus v1.8.2-0.20240513094155-793c8c9fe88e/go.mod h1:SRw624aMAxTfryAcP8rOjg4S/sHHaetx2lyJJ2nM83g=
github.com/grafana/pyroscope-go/godeltaprof v0.1.7 h1:C11j63y7gymiW8VugJ9ZW0pWfxTZugdSJyC48olk5KY=
github.com/grafana/pyroscope-go/godeltaprof v0.1.7/go.mod h1:Tk376Nbldo4Cha9RgiU7ik8WKFkNpfds98aUzS8omLE=
github.com/grafana/pyroscope/api v0.4.0 h1:J86DxoNeLOvtJhB1Cn65JMZkXe682D+RqeoIUiYc/eo=
Expand Down
49 changes: 0 additions & 49 deletions internal/component/discovery/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ import (
"time"

"github.com/grafana/alloy/internal/component"
"github.com/grafana/alloy/internal/service/cluster"
"github.com/grafana/ckit/shard"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/discovery"
"github.com/prometheus/prometheus/discovery/targetgroup"
Expand All @@ -20,53 +18,6 @@ import (
// component.
type Target map[string]string

// DistributedTargets uses the node's Lookup method to distribute discovery
// targets when a component runs in a cluster.
type DistributedTargets struct {
	// useClustering gates distribution; when false, Get returns all targets.
	useClustering bool
	// cluster provides Peers and Lookup for consistent-hash target ownership.
	cluster cluster.Cluster
	// targets is the full set of discovered targets to be distributed.
	targets []Target
}

// NewDistributedTargets creates the abstraction that allows components to
// dynamically shard targets between components.
//
// clusteringEnabled toggles distribution; when false (or cluster is nil),
// Get returns every target. cluster is used to look up target ownership.
func NewDistributedTargets(clusteringEnabled bool, cluster cluster.Cluster, targets []Target) DistributedTargets {
	// Field-keyed literal (instead of positional) so the mapping from
	// argument to field is explicit and resilient to field reordering.
	return DistributedTargets{
		useClustering: clusteringEnabled,
		cluster:       cluster,
		targets:       targets,
	}
}

// Get distributes discovery targets in a clustered environment.
//
// If clustering is disabled or there is no cluster service, all targets are
// returned.
func (t *DistributedTargets) Get() []Target {
	// TODO(@tpaschalis): Make this into a single code-path to simplify logic.
	if !t.useClustering || t.cluster == nil {
		return t.targets
	}

	// Estimate this node's share of targets so the result slice can be
	// pre-sized. The +1 avoids a zero capacity for small target counts.
	peerCount := len(t.cluster.Peers())
	resCap := len(t.targets) + 1
	if peerCount != 0 {
		resCap = (len(t.targets) + 1) / peerCount
	}

	res := make([]Target, 0, resCap)

	for _, tgt := range t.targets {
		peers, err := t.cluster.Lookup(shard.StringKey(tgt.NonMetaLabels().String()), 1, shard.OpReadWrite)
		if err != nil {
			// This can only fail in case we ask for more owners than the
			// available peers. This will never happen, but in any case we fall
			// back to owning the target ourselves.
			//
			// BUGFIX: continue here. Previously control fell through to the
			// len(peers) == 0 check below, which also matches when Lookup
			// errors (peers is nil), so the target was appended twice.
			res = append(res, tgt)
			continue
		}
		if len(peers) == 0 || peers[0].Self {
			res = append(res, tgt)
		}
	}

	return res
}

// Labels converts Target into a set of sorted labels.
func (t Target) Labels() labels.Labels {
var lset labels.Labels
Expand Down
92 changes: 92 additions & 0 deletions internal/component/discovery/distributed_targets.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package discovery

import (
"github.com/grafana/ckit/peer"
"github.com/grafana/ckit/shard"

"github.com/grafana/alloy/internal/service/cluster"
)

// DistributedTargets uses the node's Lookup method to distribute discovery
// targets when a component runs in a cluster.
type DistributedTargets struct {
	// localTargets are the targets owned by the local cluster node.
	localTargets []Target
	// localTargetKeys caches each local target's shard key, index-aligned
	// with localTargets. Caching the key hash computation improves time
	// performance by ~20%.
	localTargetKeys []shard.Key
	// remoteTargetKeys is the set of shard keys for targets owned by other
	// cluster nodes; used to detect targets that moved away from this node.
	remoteTargetKeys map[shard.Key]struct{}
}

// NewDistributedTargets creates the abstraction that allows components to
// dynamically shard targets between components.
//
// When clustering is disabled (or no cluster service is given), a no-op
// cluster is substituted so that every target is owned locally.
func NewDistributedTargets(clusteringEnabled bool, cluster cluster.Cluster, allTargets []Target) *DistributedTargets {
	if !clusteringEnabled || cluster == nil {
		cluster = disabledCluster{}
	}

	// Estimate this node's share of targets so the slices can be pre-sized.
	// The +1 avoids a zero capacity for small target counts.
	localCap := len(allTargets) + 1
	if peerCount := len(cluster.Peers()); peerCount != 0 {
		localCap = (len(allTargets) + 1) / peerCount
	}
	// BUGFIX: clamp the remote-map capacity hint. With no peers (clustering
	// disabled), localCap is len(allTargets)+1, making the difference -1;
	// don't pass a negative size hint to make.
	remoteCap := len(allTargets) - localCap
	if remoteCap < 0 {
		remoteCap = 0
	}

	localTargets := make([]Target, 0, localCap)
	localTargetKeys := make([]shard.Key, 0, localCap)
	remoteTargetKeys := make(map[shard.Key]struct{}, remoteCap)

	for _, tgt := range allTargets {
		targetKey := keyFor(tgt)
		peers, err := cluster.Lookup(targetKey, 1, shard.OpReadWrite)
		// Lookup can only fail when asking for more owners than available
		// peers; in that case we fall back to owning the target ourselves.
		belongsToLocal := err != nil || len(peers) == 0 || peers[0].Self

		if belongsToLocal {
			localTargets = append(localTargets, tgt)
			localTargetKeys = append(localTargetKeys, targetKey)
		} else {
			remoteTargetKeys[targetKey] = struct{}{}
		}
	}

	return &DistributedTargets{
		localTargets:     localTargets,
		localTargetKeys:  localTargetKeys,
		remoteTargetKeys: remoteTargetKeys,
	}
}

// LocalTargets returns the subset of targets owned by this cluster node.
func (d *DistributedTargets) LocalTargets() []Target {
	return d.localTargets
}

// MovedToRemoteInstance returns the set of local targets from prev
// that are no longer local in dt, indicating an active target has moved.
// Only targets which exist in both prev and dt are returned. If prev
// contains an empty list of targets, no targets are returned.
func (dt *DistributedTargets) MovedToRemoteInstance(prev *DistributedTargets) []Target {
	if prev == nil {
		return nil
	}
	// localTargetKeys is index-aligned with localTargets, so ranging over
	// the keys lets us recover the matching target for each moved key.
	var moved []Target
	for idx, key := range prev.localTargetKeys {
		if _, nowRemote := dt.remoteTargetKeys[key]; nowRemote {
			moved = append(moved, prev.localTargets[idx])
		}
	}
	return moved
}

// keyFor returns the shard key for a target, derived from the hash of its
// non-meta labels.
func keyFor(tgt Target) shard.Key {
	return shard.Key(tgt.NonMetaLabels().Hash())
}

// disabledCluster is a no-op cluster.Cluster used when clustering is
// disabled or no cluster service is provided. Its Lookup never resolves an
// owner, which callers treat as "owned locally".
type disabledCluster struct{}

var _ cluster.Cluster = disabledCluster{}

// Lookup always reports no owning peers and no error.
func (disabledCluster) Lookup(key shard.Key, replicationFactor int, op shard.Op) ([]peer.Peer, error) {
	return nil, nil
}

// Peers always reports an empty peer list.
func (disabledCluster) Peers() []peer.Peer {
	return nil
}
Loading
Loading