Skip to content

Commit

Permalink
tests and refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
thampiotr committed May 3, 2024
1 parent 85faf86 commit 27e1b00
Show file tree
Hide file tree
Showing 12 changed files with 718 additions and 31 deletions.
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -656,7 +656,6 @@ require (
go.opentelemetry.io/collector/confmap/provider/httpprovider v0.99.0 // indirect
go.opentelemetry.io/collector/confmap/provider/httpsprovider v0.99.0 // indirect
go.opentelemetry.io/collector/filter v0.99.0 // indirect
go.opentelemetry.io/collector/pdata/testdata v0.99.0 // indirect
go.opentelemetry.io/contrib/config v0.5.0 // indirect
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.50.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.50.0 // indirect
Expand Down Expand Up @@ -715,9 +714,10 @@ replace (
// * There is a release of Prometheus which contains
// prometheus/prometheus#13002
// and prometheus/prometheus#13497
// and https://github.com/grafana/prometheus/pull/34
// We use the last v1-related tag as the replace statement does not work for v2
// tags without the v2 suffix to the module root.
replace github.com/prometheus/prometheus => github.com/grafana/prometheus v1.8.2-0.20240130142130-51b39f24d406 // cmp_header_order branch
replace github.com/prometheus/prometheus => github.com/grafana/prometheus v1.8.2-0.20240503114359-53f3689112ff // thampiotr/expose-disabling-staleness-markers

replace gopkg.in/yaml.v2 => github.com/rfratto/go-yaml v0.0.0-20211119180816-77389c3526dc

Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1066,8 +1066,8 @@ github.com/grafana/opentelemetry-collector/service v0.0.0-20240429170914-d1e1018
github.com/grafana/opentelemetry-collector/service v0.0.0-20240429170914-d1e101852ba5/go.mod h1:0djU5YbUIZw4Y+KNYk07tZztXrMK/LNAZyfH8b7f7xA=
github.com/grafana/postgres_exporter v0.15.1-0.20240417113938-9358270470dd h1:vNHdecaOmYgSHMEQRgyzWacV++N38Jp8qLZg0RCsfFo=
github.com/grafana/postgres_exporter v0.15.1-0.20240417113938-9358270470dd/go.mod h1:kR16GJ0ZwWVQ2osW3pgtDJU1a/GXpufrwio0kLG14cg=
github.com/grafana/prometheus v1.8.2-0.20240130142130-51b39f24d406 h1:LVIOYe5j92m10wluP5hgeHqSkOLnZzcPxhYCkdbLXCE=
github.com/grafana/prometheus v1.8.2-0.20240130142130-51b39f24d406/go.mod h1:SRw624aMAxTfryAcP8rOjg4S/sHHaetx2lyJJ2nM83g=
github.com/grafana/prometheus v1.8.2-0.20240503114359-53f3689112ff h1:t81+PfBE+XEserwuBJILA67hOv9g74X4Gs36IzCaRQA=
github.com/grafana/prometheus v1.8.2-0.20240503114359-53f3689112ff/go.mod h1:SRw624aMAxTfryAcP8rOjg4S/sHHaetx2lyJJ2nM83g=
github.com/grafana/pyroscope-go/godeltaprof v0.1.7 h1:C11j63y7gymiW8VugJ9ZW0pWfxTZugdSJyC48olk5KY=
github.com/grafana/pyroscope-go/godeltaprof v0.1.7/go.mod h1:Tk376Nbldo4Cha9RgiU7ik8WKFkNpfds98aUzS8omLE=
github.com/grafana/pyroscope/api v0.4.0 h1:J86DxoNeLOvtJhB1Cn65JMZkXe682D+RqeoIUiYc/eo=
Expand Down
25 changes: 17 additions & 8 deletions internal/component/discovery/distributed_targets.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package discovery

import (
"github.com/grafana/alloy/internal/service/cluster"
"github.com/grafana/ckit/peer"
"github.com/grafana/ckit/shard"

"github.com/grafana/alloy/internal/service/cluster"
)

// DistributedTargets uses the node's Lookup method to distribute discovery
Expand All @@ -17,17 +19,12 @@ type DistributedTargets struct {
// NewDistributedTargets creates the abstraction that allows components to
// dynamically shard targets between components.
func NewDistributedTargets(clusteringEnabled bool, cluster cluster.Cluster, allTargets []Target) *DistributedTargets {
// TODO(@tpaschalis): Make this into a single code-path to simplify logic.
if !clusteringEnabled || cluster == nil {
return &DistributedTargets{
localTargets: allTargets,
remoteTargetKeys: map[shard.Key]struct{}{},
}
cluster = disabledCluster{}
}

peerCount := len(cluster.Peers())
localCap := len(allTargets) + 1
if peerCount != 0 {
if peerCount := len(cluster.Peers()); peerCount != 0 {
localCap = (len(allTargets) + 1) / peerCount
}

Expand Down Expand Up @@ -80,3 +77,15 @@ func (dt *DistributedTargets) MovedToRemoteInstance(prev *DistributedTargets) []
func keyFor(tgt Target) shard.Key {
return shard.Key(tgt.NonMetaLabels().Hash())
}

type disabledCluster struct{}

var _ cluster.Cluster = disabledCluster{}

func (l disabledCluster) Lookup(key shard.Key, replicationFactor int, op shard.Op) ([]peer.Peer, error) {
return nil, nil
}

func (l disabledCluster) Peers() []peer.Peer {
return nil
}
22 changes: 12 additions & 10 deletions internal/component/discovery/distributed_targets_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@ package discovery

import (
"fmt"
"github.com/grafana/alloy/internal/service/cluster"
"math/rand"
"testing"

"github.com/grafana/ckit/peer"
"github.com/grafana/ckit/shard"
"github.com/stretchr/testify/require"
"math/rand"
"testing"

"github.com/grafana/alloy/internal/service/cluster"
)

var (
Expand Down Expand Up @@ -126,13 +128,13 @@ var movedToRemoteInstanceTestCases = []struct {
expectedMovedTargets []Target
}{
{
name: "no previous targets distribution",
name: "no previous targets distribution",
previous: nil,
current: testDistTargets(map[shard.Key][]peer.Peer{
keyFor(target1): {peer1Self},
keyFor(target2): {peer2},
keyFor(target3): {peer1Self},
}),
previous: nil,
expectedMovedTargets: nil,
},
{
Expand Down Expand Up @@ -290,18 +292,18 @@ func mkTarget(kv ...string) Target {
return target
}

type fakeCluster struct {
lookupMap map[shard.Key][]peer.Peer
peers []peer.Peer
}

func testDistTargets(lookupMap map[shard.Key][]peer.Peer) *DistributedTargets {
return NewDistributedTargets(true, &fakeCluster{
peers: allTestPeers,
lookupMap: lookupMap,
}, allTestTargets)
}

type fakeCluster struct {
lookupMap map[shard.Key][]peer.Peer
peers []peer.Peer
}

func (f *fakeCluster) Lookup(key shard.Key, _ int, _ shard.Op) ([]peer.Peer, error) {
if key == magicErrorKey {
return nil, fmt.Errorf("test error for magic error key")
Expand Down
22 changes: 13 additions & 9 deletions internal/component/prometheus/scrape/scrape.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,15 @@ import (
"time"

"github.com/alecthomas/units"
client_prometheus "github.com/prometheus/client_golang/prometheus"
config_util "github.com/prometheus/common/config"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/discovery/targetgroup"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/scrape"
"github.com/prometheus/prometheus/storage"

"github.com/grafana/alloy/internal/alloy/logging/level"
"github.com/grafana/alloy/internal/component"
component_config "github.com/grafana/alloy/internal/component/common/config"
Expand All @@ -18,14 +27,6 @@ import (
"github.com/grafana/alloy/internal/service/http"
"github.com/grafana/alloy/internal/service/labelstore"
"github.com/grafana/alloy/internal/useragent"
client_prometheus "github.com/prometheus/client_golang/prometheus"
config_util "github.com/prometheus/common/config"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/discovery/targetgroup"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/scrape"
"github.com/prometheus/prometheus/storage"
)

func init() {
Expand Down Expand Up @@ -239,7 +240,10 @@ func (c *Component) Run(ctx context.Context) error {

newTargetGroups, movedTargets := c.distributeTargets(targets, jobName, args)

// Make sure the targets that moved to another instance are not marked as stale.
// Make sure the targets that moved to another instance are NOT marked as stale. This is specific to how
// Prometheus handles marking series as stale: it is the client's responsibility to inject the
// staleness markers. In our case, for targets that moved to another instance in the cluster, we hand
// over this responsibility to the new owning instance. We must not inject staleness marker here.
c.scraper.DisableEndOfRunStalenessMarkers(jobName, movedTargets)

select {
Expand Down
Loading

0 comments on commit 27e1b00

Please sign in to comment.