Skip to content

Commit

Permalink
Simplify filter
Browse files Browse the repository at this point in the history
  • Loading branch information
marctc committed Nov 20, 2023
1 parent 1973c4b commit 810bba9
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 37 deletions.
2 changes: 1 addition & 1 deletion storage/remote/codec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -517,7 +517,7 @@ func TestMetricTypeToMetricTypeProto(t *testing.T) {
}

func TestDecodeWriteRequest(t *testing.T) {
buf, _, err := buildWriteRequest(writeRequestFixture.Timeseries, nil, nil, nil, nil, 0)
buf, _, err := buildWriteRequest(writeRequestFixture.Timeseries, nil, nil, nil, nil)
require.NoError(t, err)

actual, err := DecodeWriteRequest(bytes.NewReader(buf))
Expand Down
47 changes: 24 additions & 23 deletions storage/remote/queue_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -530,7 +530,7 @@ func (t *QueueManager) AppendMetadata(ctx context.Context, metadata []scrape.Met

func (t *QueueManager) sendMetadataWithBackoff(ctx context.Context, metadata []prompb.MetricMetadata, pBuf *proto.Buffer) error {
// Build the WriteRequest with no samples.
req, _, err := buildWriteRequest(nil, metadata, pBuf, nil, nil, 0)
req, _, err := buildWriteRequest(nil, metadata, pBuf, nil, nil)
if err != nil {
return err
}
Expand Down Expand Up @@ -586,21 +586,23 @@ func isSampleOld(sampleAgeLimit time.Duration, ts int64) bool {
return sampleTs.Before(limitTs)
}

func isTimeSeriesOld(sampleAgeLimit time.Duration, ts prompb.TimeSeries) bool {
if sampleAgeLimit == 0 {
// If sampleAgeLimit is unset, then we never skip samples due to their age.
return false
}
switch {
// Only the first element should be set in the series, therefore we only check the first element.
case len(ts.Samples) > 0:
return isSampleOld(sampleAgeLimit, ts.Samples[0].Timestamp)
case len(ts.Histograms) > 0:
return isSampleOld(sampleAgeLimit, ts.Histograms[0].Timestamp)
case len(ts.Exemplars) > 0:
return isSampleOld(sampleAgeLimit, ts.Exemplars[0].Timestamp)
default:
return false
func isTimeSeriesOldFilter(sampleAgeLimit time.Duration) func(ts prompb.TimeSeries) bool {
return func(ts prompb.TimeSeries) bool {
if sampleAgeLimit == 0 {
// If sampleAgeLimit is unset, then we never skip samples due to their age.
return false
}
switch {
// Only the first element should be set in the series, therefore we only check the first element.
case len(ts.Samples) > 0:
return isSampleOld(sampleAgeLimit, ts.Samples[0].Timestamp)
case len(ts.Histograms) > 0:
return isSampleOld(sampleAgeLimit, ts.Histograms[0].Timestamp)
case len(ts.Exemplars) > 0:
return isSampleOld(sampleAgeLimit, ts.Exemplars[0].Timestamp)
default:
return false
}
}
}

Expand Down Expand Up @@ -1535,7 +1537,7 @@ func (s *shards) sendSamples(ctx context.Context, samples []prompb.TimeSeries, s
// sendSamples to the remote storage with backoff for recoverable errors.
func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.TimeSeries, sampleCount, exemplarCount, histogramCount int, pBuf *proto.Buffer, buf *[]byte) error {
// Build the WriteRequest with no metadata.
req, highest, err := buildWriteRequest(samples, nil, pBuf, *buf, nil, 0)
req, highest, err := buildWriteRequest(samples, nil, pBuf, *buf, nil)
if err != nil {
// Failing to build the write request is non-recoverable, since it will
// only error if marshaling the proto to bytes fails.
Expand All @@ -1556,8 +1558,7 @@ func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.Ti
nil,
pBuf,
*buf,
isTimeSeriesOld,
time.Duration(s.qm.cfg.SampleAgeLimit),
isTimeSeriesOldFilter(time.Duration(s.qm.cfg.SampleAgeLimit)),
)
if err != nil {
return err
Expand Down Expand Up @@ -1669,12 +1670,12 @@ func sendWriteRequestWithBackoff(ctx context.Context, cfg config.QueueConfig, l
}
}

func buildTimeSeries(timeSeries []prompb.TimeSeries, filter func(time.Duration, prompb.TimeSeries) bool, ageLimit time.Duration) (int64, []prompb.TimeSeries) {
func buildTimeSeries(timeSeries []prompb.TimeSeries, filter func(prompb.TimeSeries) bool) (int64, []prompb.TimeSeries) {
var highest int64

writeIdx := 0
for i, ts := range timeSeries {
if filter != nil && filter(ageLimit, ts) {
if filter != nil && filter(ts) {
continue
}

Expand All @@ -1698,8 +1699,8 @@ func buildTimeSeries(timeSeries []prompb.TimeSeries, filter func(time.Duration,
return highest, timeSeries
}

func buildWriteRequest(timeSeries []prompb.TimeSeries, metadata []prompb.MetricMetadata, pBuf *proto.Buffer, buf []byte, filter func(time.Duration, prompb.TimeSeries) bool, ageLimit time.Duration) ([]byte, int64, error) {
highest, timeSeries := buildTimeSeries(timeSeries, filter, ageLimit)
func buildWriteRequest(timeSeries []prompb.TimeSeries, metadata []prompb.MetricMetadata, pBuf *proto.Buffer, buf []byte, filter func(prompb.TimeSeries) bool) ([]byte, int64, error) {
highest, timeSeries := buildTimeSeries(timeSeries, filter)

req := &prompb.WriteRequest{
Timeseries: timeSeries,
Expand Down
10 changes: 5 additions & 5 deletions storage/remote/queue_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1403,7 +1403,7 @@ func TestBuildTimeSeries(t *testing.T) {
testCases := []struct {
name string
ts []prompb.TimeSeries
filter func(duration time.Duration, ts prompb.TimeSeries) bool
filter func(ts prompb.TimeSeries) bool
highestTs int64
responseLen int
}{
Expand Down Expand Up @@ -1475,7 +1475,7 @@ func TestBuildTimeSeries(t *testing.T) {
},
},
},
filter: func(duration time.Duration, ts prompb.TimeSeries) bool { return filterTsLimit(1234567892, ts) },
filter: func(ts prompb.TimeSeries) bool { return filterTsLimit(1234567892, ts) },
responseLen: 2,
highestTs: 1234567893,
},
Expand Down Expand Up @@ -1515,7 +1515,7 @@ func TestBuildTimeSeries(t *testing.T) {
},
},
},
filter: func(duration time.Duration, ts prompb.TimeSeries) bool { return filterTsLimit(1234567892, ts) },
filter: func(ts prompb.TimeSeries) bool { return filterTsLimit(1234567892, ts) },
responseLen: 2,
highestTs: 1234567893,
},
Expand Down Expand Up @@ -1555,7 +1555,7 @@ func TestBuildTimeSeries(t *testing.T) {
},
},
},
filter: func(duration time.Duration, ts prompb.TimeSeries) bool { return filterTsLimit(1234567895, ts) },
filter: func(ts prompb.TimeSeries) bool { return filterTsLimit(1234567895, ts) },
responseLen: 2,
highestTs: 1234567897,
},
Expand All @@ -1564,7 +1564,7 @@ func TestBuildTimeSeries(t *testing.T) {
// Run the test cases
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
highest, result := buildTimeSeries(tc.ts, tc.filter, time.Duration(0))
highest, result := buildTimeSeries(tc.ts, tc.filter)
require.NotNil(t, result)
require.Equal(t, tc.responseLen, len(result))
require.Equal(t, tc.highestTs, highest)
Expand Down
16 changes: 8 additions & 8 deletions storage/remote/write_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ import (
)

func TestRemoteWriteHandler(t *testing.T) {
buf, _, err := buildWriteRequest(writeRequestFixture.Timeseries, nil, nil, nil, nil, 0)
buf, _, err := buildWriteRequest(writeRequestFixture.Timeseries, nil, nil, nil, nil)
require.NoError(t, err)

req, err := http.NewRequest("", "", bytes.NewReader(buf))
Expand Down Expand Up @@ -87,7 +87,7 @@ func TestOutOfOrderSample(t *testing.T) {
buf, _, err := buildWriteRequest([]prompb.TimeSeries{{
Labels: []prompb.Label{{Name: "__name__", Value: "test_metric"}},
Samples: []prompb.Sample{{Value: 1, Timestamp: 0}},
}}, nil, nil, nil, nil, 0)
}}, nil, nil, nil, nil)
require.NoError(t, err)

req, err := http.NewRequest("", "", bytes.NewReader(buf))
Expand All @@ -112,7 +112,7 @@ func TestOutOfOrderExemplar(t *testing.T) {
buf, _, err := buildWriteRequest([]prompb.TimeSeries{{
Labels: []prompb.Label{{Name: "__name__", Value: "test_metric"}},
Exemplars: []prompb.Exemplar{{Labels: []prompb.Label{{Name: "foo", Value: "bar"}}, Value: 1, Timestamp: 0}},
}}, nil, nil, nil, nil, 0)
}}, nil, nil, nil, nil)
require.NoError(t, err)

req, err := http.NewRequest("", "", bytes.NewReader(buf))
Expand All @@ -135,7 +135,7 @@ func TestOutOfOrderHistogram(t *testing.T) {
buf, _, err := buildWriteRequest([]prompb.TimeSeries{{
Labels: []prompb.Label{{Name: "__name__", Value: "test_metric"}},
Histograms: []prompb.Histogram{HistogramToHistogramProto(0, &testHistogram), FloatHistogramToHistogramProto(1, testHistogram.ToFloat())},
}}, nil, nil, nil, nil, 0)
}}, nil, nil, nil, nil)
require.NoError(t, err)

req, err := http.NewRequest("", "", bytes.NewReader(buf))
Expand Down Expand Up @@ -164,7 +164,7 @@ func BenchmarkRemoteWritehandler(b *testing.B) {
{Name: "test_label_name_" + num, Value: labelValue + num},
},
Histograms: []prompb.Histogram{HistogramToHistogramProto(0, &testHistogram)},
}}, nil, nil, nil, nil, 0)
}}, nil, nil, nil, nil)
require.NoError(b, err)
req, err := http.NewRequest("", "", bytes.NewReader(buf))
require.NoError(b, err)
Expand All @@ -182,7 +182,7 @@ func BenchmarkRemoteWritehandler(b *testing.B) {
}

func TestCommitErr(t *testing.T) {
buf, _, err := buildWriteRequest(writeRequestFixture.Timeseries, nil, nil, nil, nil, 0)
buf, _, err := buildWriteRequest(writeRequestFixture.Timeseries, nil, nil, nil, nil)
require.NoError(t, err)

req, err := http.NewRequest("", "", bytes.NewReader(buf))
Expand Down Expand Up @@ -219,7 +219,7 @@ func BenchmarkRemoteWriteOOOSamples(b *testing.B) {

handler := NewWriteHandler(log.NewNopLogger(), nil, db.Head())

buf, _, err := buildWriteRequest(genSeriesWithSample(1000, 200*time.Minute.Milliseconds()), nil, nil, nil, nil, 0)
buf, _, err := buildWriteRequest(genSeriesWithSample(1000, 200*time.Minute.Milliseconds()), nil, nil, nil, nil)
require.NoError(b, err)

req, err := http.NewRequest("", "", bytes.NewReader(buf))
Expand All @@ -232,7 +232,7 @@ func BenchmarkRemoteWriteOOOSamples(b *testing.B) {

var bufRequests [][]byte
for i := 0; i < 100; i++ {
buf, _, err = buildWriteRequest(genSeriesWithSample(1000, int64(80+i)*time.Minute.Milliseconds()), nil, nil, nil, nil, 0)
buf, _, err = buildWriteRequest(genSeriesWithSample(1000, int64(80+i)*time.Minute.Milliseconds()), nil, nil, nil, nil)
require.NoError(b, err)
bufRequests = append(bufRequests, buf)
}
Expand Down

0 comments on commit 810bba9

Please sign in to comment.