Skip to content

Commit

Permalink
manager: Refactored tests.
Browse files Browse the repository at this point in the history
Signed-off-by: bwplotka <[email protected]>
  • Loading branch information
bwplotka committed Oct 1, 2024
1 parent e62f0c8 commit 53c2496
Show file tree
Hide file tree
Showing 2 changed files with 148 additions and 176 deletions.
5 changes: 2 additions & 3 deletions model/textparse/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,10 +91,9 @@ func New(b []byte, contentType string, parseClassicHistograms, skipOMCTSeries bo
}
switch mediaType {
case "application/openmetrics-text":
opts := func(o *openMetricsParserOptions) {
return NewOpenMetricsParser(b, st, func(o *openMetricsParserOptions) {
o.SkipCTSeries = skipOMCTSeries
}
return NewOpenMetricsParser(b, st, opts), nil
}), nil
case "application/vnd.google.protobuf":
return NewProtobufParser(b, parseClassicHistograms, st), nil
default:
Expand Down
319 changes: 146 additions & 173 deletions scrape/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package scrape

import (
"bytes"
"context"
"fmt"
"net/http"
Expand All @@ -30,7 +31,9 @@ import (
"github.com/gogo/protobuf/proto"
"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
"github.com/prometheus/common/expfmt"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/timestamp"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/types/known/timestamppb"
"gopkg.in/yaml.v2"
Expand Down Expand Up @@ -721,7 +724,7 @@ scrape_configs:
require.ElementsMatch(t, []string{"job1", "job3"}, scrapeManager.ScrapePools())
}

func setupScrapeManager(t *testing.T, enableCTZeroIngestion bool) (*collectResultAppender, *Manager) {
func setupScrapeManager(t *testing.T, honorTimestamps bool, enableCTZeroIngestion bool) (*collectResultAppender, *Manager) {
app := &collectResultAppender{}
scrapeManager, err := NewManager(
&Options{
Expand All @@ -742,7 +745,7 @@ func setupScrapeManager(t *testing.T, enableCTZeroIngestion bool) (*collectResul
ScrapeTimeout: model.Duration(5 * time.Second),
ScrapeProtocols: []config.ScrapeProtocol{config.OpenMetricsText1_0_0, config.PrometheusProto},
},
ScrapeConfigs: []*config.ScrapeConfig{{JobName: "test"}},
ScrapeConfigs: []*config.ScrapeConfig{{JobName: "test", HonorTimestamps: honorTimestamps}},
}))

return app, scrapeManager
Expand Down Expand Up @@ -771,181 +774,151 @@ func setupTestServer(t *testing.T, typ string, toWrite []byte) *httptest.Server
return server
}

func prepareWriteData(t *testing.T, mName, typ string, counterSampleProto *dto.Counter, counterSampleText string) []byte {
var toWrite []byte
switch typ {
case "application/vnd.google.protobuf; proto=io.prometheus.client.MetricFamily; encoding=delimited":
ctrType := dto.MetricType_COUNTER
toWrite = protoMarshalDelimited(t, &dto.MetricFamily{
Name: proto.String(mName),
Type: &ctrType,
Metric: []*dto.Metric{{Counter: counterSampleProto}},
// TestManagerCTZeroIngestion tests scrape manager for various CT cases.
func TestManagerCTZeroIngestion(t *testing.T) {
const (
// _total suffix is required, otherwise expfmt with OMText will mark metric as "unknown"
expectedMetricName = "expected_metric_total"
expectedCreatedMetricName = "expected_metric_created"
expectedSampleValue = 17.0
)

for _, testFormat := range []config.ScrapeProtocol{config.PrometheusProto, config.OpenMetricsText1_0_0} {
t.Run(fmt.Sprintf("format=%s", testFormat), func(t *testing.T) {
for _, testWithCT := range []bool{false, true} {
t.Run(fmt.Sprintf("withCT=%v", testWithCT), func(t *testing.T) {
for _, testCTZeroIngest := range []bool{false, true} {
t.Run(fmt.Sprintf("ctZeroIngest=%v", testCTZeroIngest), func(t *testing.T) {
sampleTs := time.Now()
ctTs := time.Time{}
if testWithCT {
ctTs = sampleTs.Add(-2 * time.Minute)
}

// TODO(bwplotka): Add more types than just counter?
encoded := prepareTestEncodedCounter(t, testFormat, expectedMetricName, expectedSampleValue, sampleTs, ctTs)
app, scrapeManager := setupScrapeManager(t, true, testCTZeroIngest)

// Perform the test.
doOneScrape(t, scrapeManager, app, setupTestServer(t, config.ScrapeProtocolsHeaders[testFormat], encoded))

// Verify results.
// Verify what we got vs expectations around CT injection.
samples := findSamplesForMetric(app.resultFloats, expectedMetricName)
if testWithCT && testCTZeroIngest {
require.Len(t, samples, 2)
require.Equal(t, 0.0, samples[0].f)

// TODO(bwplotka): Something is wrong, likely due to expfmt bug.
// OMText
// TS
// n, err = writeOpenMetricsFloat(w, float64(*metric.TimestampMs)/1000)
// CT
// n, err = writeOpenMetricsFloat(w, float64(createdTimestamp.AsTime().UnixNano())/1e9)

require.Equal(t, timestamp.FromTime(ctTs), samples[0].t) // Failing for OM Text
require.Equal(t, expectedSampleValue, samples[1].f)
require.Equal(t, timestamp.FromTime(sampleTs), samples[1].t)
} else {
require.Len(t, samples, 1)
require.Equal(t, expectedSampleValue, samples[0].f)
require.Equal(t, timestamp.FromTime(sampleTs), samples[0].t)
}

// Verify what we got vs expectations around additional _created series for OM text.
// enableCTZeroInjection also kills that _created line.
createdSeriesSamples := findSamplesForMetric(app.resultFloats, expectedCreatedMetricName)
if testFormat == config.OpenMetricsText1_0_0 && testWithCT && !testCTZeroIngest {
// For OM Text, when counter has CT, and feature flag disabled we should see _created lines.
require.Len(t, createdSeriesSamples, 1)
// Conversion taken from common/expfmt.writeOpenMetricsFloat.
// We don't check the ct timestamp as explicit ts was not implemented in expfmt.Encoder,
// but exists in OM https://github.com/OpenObservability/OpenMetrics/blob/main/specification/OpenMetrics.md#:~:text=An%20example%20with%20a%20Metric%20with%20no%20labels%2C%20and%20a%20MetricPoint%20with%20a%20timestamp%20and%20a%20created
// We can implement this, but we want to potentially get rid of OM 1.0 CT lines
require.Equal(t, float64(timestamppb.New(ctTs).AsTime().UnixNano())/1e9, createdSeriesSamples[0].f)
} else {
require.Len(t, createdSeriesSamples, 0)

Check failure on line 840 in scrape/manager_test.go

View workflow job for this annotation

GitHub Actions / golangci-lint

empty: use require.Empty (testifylint)
}
})
}
})
}
})
case "application/openmetrics-text; version=1.0.0; charset=utf-8":
toWrite = []byte(counterSampleText)
}
return toWrite
}

// TestManagerCTZeroIngestion tests scrape manager for CT cases.
func TestManagerCTZeroIngestion(t *testing.T) {
const mName = "expected_counter"
func prepareTestEncodedCounter(t *testing.T, format config.ScrapeProtocol, mName string, v float64, ts time.Time, ct time.Time) (encoded []byte) {
t.Helper()

type expectCTLineAppended struct {
value float64
ts int64
counter := &dto.Counter{Value: proto.Float64(v)}
if !ct.IsZero() {
counter.CreatedTimestamp = timestamppb.New(ct)
}
ctrType := dto.MetricType_COUNTER
inputMetric := &dto.MetricFamily{
Name: proto.String(mName),
Type: &ctrType,
Metric: []*dto.Metric{{
TimestampMs: proto.Int64(timestamp.FromTime(ts)),
Counter: counter,
}},
}
switch format {
case config.PrometheusProto:
return protoMarshalDelimited(t, inputMetric)
case config.OpenMetricsText1_0_0:
buf := &bytes.Buffer{}
require.NoError(t, expfmt.NewEncoder(buf, expfmt.NewFormat(expfmt.TypeOpenMetrics), expfmt.WithCreatedLines(), expfmt.WithUnit()).Encode(inputMetric))
_, _ = buf.WriteString("# EOF")

t.Log("produced OM text to expose:", buf.String())
return buf.Bytes()
default:
t.Fatalf("not implemented format: %v", format)
return nil
}
}

for _, tc := range []struct {
name string
counterSampleProto *dto.Counter
counterSampleText string
enableCTZeroIngestion bool
expectCTLineAppended []expectCTLineAppended
typ string
}{
{
name: "Protobuf disabled with CT on counter",
counterSampleProto: &dto.Counter{
Value: proto.Float64(1.0),
// Timestamp does not matter as long as it exists in this test.
CreatedTimestamp: timestamppb.Now(),
},
typ: "application/vnd.google.protobuf; proto=io.prometheus.client.MetricFamily; encoding=delimited",
},
{
name: "Protobuf enabled with CT on counter",
counterSampleProto: &dto.Counter{
Value: proto.Float64(1.0),
// Timestamp does not matter as long as it exists in this test.
CreatedTimestamp: timestamppb.Now(),
},
enableCTZeroIngestion: true,
typ: "application/vnd.google.protobuf; proto=io.prometheus.client.MetricFamily; encoding=delimited",
},
{
name: "Protobuf enabled without CT on counter",
counterSampleProto: &dto.Counter{
Value: proto.Float64(1.0),
},
enableCTZeroIngestion: true,
typ: "application/vnd.google.protobuf; proto=io.prometheus.client.MetricFamily; encoding=delimited",
},
{
name: "OMText disabled with CT on counter",
counterSampleText: `# TYPE expected_counter counter
expected_counter 17.0 1520879607.789
expected_counter_created 1000
# EOF`,
expectCTLineAppended: []expectCTLineAppended{{
value: 17.0,
ts: 1520879607789,
}},
typ: "application/openmetrics-text; version=1.0.0; charset=utf-8",
},
{
name: "OMText enabled with CT on counter",
counterSampleText: `# TYPE expected_counter counter
expected_counter 17.0 1520879607.789
expected_counter_created 1000
# EOF`,
enableCTZeroIngestion: true,
expectCTLineAppended: []expectCTLineAppended{
{
value: 0.0,
ts: 1000,
},
{
value: 17.0,
ts: 1520879607789,
},
},
typ: "application/openmetrics-text; version=1.0.0; charset=utf-8",
},
{
name: "OMText enabled without CT on counter",
counterSampleText: `# TYPE expected_counter counter
expected_counter 17.0 1520879607.789
# EOF`,
enableCTZeroIngestion: true,
expectCTLineAppended: []expectCTLineAppended{{
value: 17.0,
ts: 1520879607789,
}},
typ: "application/openmetrics-text; version=1.0.0; charset=utf-8",
},
} {
t.Run(tc.name, func(t *testing.T) {
app, scrapeManager := setupScrapeManager(t, tc.enableCTZeroIngestion)

toWrite := prepareWriteData(t, mName, tc.typ, tc.counterSampleProto, tc.counterSampleText)

server := setupTestServer(t, tc.typ, toWrite)
serverURL, err := url.Parse(server.URL)
require.NoError(t, err)

// Add fake target directly into tsets + reload
scrapeManager.updateTsets(map[string][]*targetgroup.Group{
"test": {{
Targets: []model.LabelSet{{
model.SchemeLabel: model.LabelValue(serverURL.Scheme),
model.AddressLabel: model.LabelValue(serverURL.Host),
}},
}},
})
scrapeManager.reload()
func doOneScrape(t *testing.T, manager *Manager, appender *collectResultAppender, server *httptest.Server) {
t.Helper()

var got []float64
// Wait for one scrape.
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
defer cancel()
require.NoError(t, runutil.Retry(100*time.Millisecond, ctx.Done(), func() error {
app.mtx.Lock()
defer app.mtx.Unlock()

// Check if scrape happened and grab the relevant samples, they have to be there - or it's a bug
// and it's not worth waiting.
for _, f := range app.resultFloats {
if f.metric.Get(model.MetricNameLabel) == mName {
got = append(got, f.f)
}
}
serverURL, err := url.Parse(server.URL)
require.NoError(t, err)

if len(app.resultFloats) > 0 {
return nil
}
return fmt.Errorf("expected some samples, got none")
}), "after 1 minute")
scrapeManager.Stop()

switch tc.typ {
case "application/vnd.google.protobuf; proto=io.prometheus.client.MetricFamily; encoding=delimited":
// Check for zero samples, assuming we only injected always one sample.
// Did it contain CT to inject? If yes, was CT zero enabled?
if tc.counterSampleProto.CreatedTimestamp.IsValid() && tc.enableCTZeroIngestion {
require.Len(t, got, 2)
require.Equal(t, 0.0, got[0])
require.Equal(t, tc.counterSampleProto.GetValue(), got[1])
return
}
// Add fake target directly into tsets + reload
manager.updateTsets(map[string][]*targetgroup.Group{
"test": {{
Targets: []model.LabelSet{{
model.SchemeLabel: model.LabelValue(serverURL.Scheme),
model.AddressLabel: model.LabelValue(serverURL.Host),
}},
}},
})
manager.reload()

// Expect only one, valid sample.
require.Len(t, got, 1)
require.Equal(t, tc.counterSampleProto.GetValue(), got[0])
// Wait for one scrape.
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
defer cancel()
require.NoError(t, runutil.Retry(100*time.Millisecond, ctx.Done(), func() error {
appender.mtx.Lock()
defer appender.mtx.Unlock()

case "application/openmetrics-text; version=1.0.0; charset=utf-8":
require.Len(t, got, len(tc.expectCTLineAppended))
for i, e := range tc.expectCTLineAppended {
require.Equal(t, e.value, got[i])
}
// Check if scrape happened and grab the relevant samples.
if len(appender.resultFloats) > 0 {
return nil
}
return fmt.Errorf("expected some float samples, got none")
}), "after 1 minute")
manager.Stop()
}

// We expect _created lines to be appended as a new metric if ct ingestion is disabled
if !tc.enableCTZeroIngestion {
require.Equal(t, "expected_counter_created", app.resultFloats[1].metric.Get(model.MetricNameLabel))
}
}
})
func findSamplesForMetric(floats []floatSample, metricName string) (ret []floatSample) {
for _, f := range floats {
if f.metric.Get(model.MetricNameLabel) == metricName {
ret = append(ret, f)
}
}
return ret
}

func TestUnregisterMetrics(t *testing.T) {
Expand All @@ -962,10 +935,10 @@ func TestUnregisterMetrics(t *testing.T) {
}

func applyConfig(
t *testing.T,
config string,
scrapeManager *Manager,
discoveryManager *discovery.Manager,
t *testing.T,
config string,
scrapeManager *Manager,
discoveryManager *discovery.Manager,
) {
t.Helper()

Expand Down Expand Up @@ -1018,11 +991,11 @@ func writeIntoFile(t *testing.T, content, filePattern string) *os.File {
}

func requireTargets(
t *testing.T,
scrapeManager *Manager,
jobName string,
waitToAppear bool,
expectedTargets []string,
t *testing.T,
scrapeManager *Manager,
jobName string,
waitToAppear bool,
expectedTargets []string,
) {
t.Helper()

Expand Down

0 comments on commit 53c2496

Please sign in to comment.