Skip to content
This repository has been archived by the owner on Oct 3, 2023. It is now read-only.

Commit

Permalink
stats, metrics: deduplicate TimeSeries with non-unique Metric.Type
Browse files Browse the repository at this point in the history
Since existence, the stats exporter was sending Stackdriver Metrics
only split up by chunks of maxUploadSize of 200, but Metric-s with the
exact same Type were still uploaded in the same CreateTimeSeriesRequest
which would cause:

    err: rpc error: code = InvalidArgument desc = One or more TimeSeries could not be written:
    Field timeSeries[?] had an invalid value: Duplicate TimeSeries encountered.
    Only one point can be written per TimeSeries per request.: timeSeries[?]

and the previous remedy just relied on a synchronization of
SetReportingPeriod of 60+s which would aggregate stats/view.Data.

This change now splits up such Metrics so even if uploads are made
in less than 60s, CreateTimeSeriesRequest-s will be uniquely uploaded
and won't cause Stackdriver's backend to trip up.

Fixes #73
  • Loading branch information
odeke-em committed Jan 20, 2019
1 parent 7f88e3b commit 0e2df90
Show file tree
Hide file tree
Showing 5 changed files with 703 additions and 484 deletions.
2 changes: 1 addition & 1 deletion equivalence_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func TestStatsAndMetricsEquivalence(t *testing.T) {
vdl := []*view.Data{vd}
sctreql := se.makeReq(vdl, maxTimeSeriesPerUpload)
tsl, _ := se.protoMetricToTimeSeries(ctx, last.Node, last.Resource, last.Metrics[0])
pctreql := []*monitoringpb.CreateTimeSeriesRequest{se.combineTimeSeriesToCreateTimeSeriesRequest(tsl)}
pctreql := se.combineTimeSeriesToCreateTimeSeriesRequest(tsl)
if !reflect.DeepEqual(sctreql, pctreql) {
t.Errorf("#%d: TimeSeries Mismatch\nStats CreateTimeSeriesRequest:\n\t%v\nProto CreateTimeSeriesRequest:\n\t%v\n",
i, sctreql, pctreql)
Expand Down
77 changes: 67 additions & 10 deletions metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,25 +101,78 @@ func (se *statsExporter) handleMetricsUpload(payloads []*metricPayload) error {
end = len(allTimeSeries)
}
batch := allTimeSeries[start:end]
ctsreq := se.combineTimeSeriesToCreateTimeSeriesRequest(batch)
if err := createTimeSeries(ctx, se.c, ctsreq); err != nil {
// span.SetStatus(trace.Status{Code: 2, Message: err.Error()})
// TODO(@odeke-em, @jbd): Don't fail fast here, perhaps batch errors?
// return err
ctsreql := se.combineTimeSeriesToCreateTimeSeriesRequest(batch)
for _, ctsreq := range ctsreql {
if err := createTimeSeries(ctx, se.c, ctsreq); err != nil {
span.SetStatus(trace.Status{Code: trace.StatusCodeUnknown, Message: err.Error()})
// TODO(@odeke-em): Don't fail fast here, perhaps batch errors?
// return err
}
}
}

return nil
}

func (se *statsExporter) combineTimeSeriesToCreateTimeSeriesRequest(ts []*monitoringpb.TimeSeries) *monitoringpb.CreateTimeSeriesRequest {
func (se *statsExporter) combineTimeSeriesToCreateTimeSeriesRequest(ts []*monitoringpb.TimeSeries) (ctsreql []*monitoringpb.CreateTimeSeriesRequest) {
if len(ts) == 0 {
return nil
}
return &monitoringpb.CreateTimeSeriesRequest{
Name: monitoring.MetricProjectPath(se.o.ProjectID),
TimeSeries: ts,

// Since there are scenarios in which Metrics with the same Type
// can be bunched in the same TimeSeries, we have to ensure that
// we create a unique CreateTimeSeriesRequest with entirely unique Metrics
// per TimeSeries, lest we'll encounter:
//
// err: rpc error: code = InvalidArgument desc = One or more TimeSeries could not be written:
// Field timeSeries[2] had an invalid value: Duplicate TimeSeries encountered.
// Only one point can be written per TimeSeries per request.: timeSeries[2]
//
// This scenario happens when we are using the OpenCensus Agent in which multiple metrics
// are streamed by various client applications.
// See https://github.com/census-ecosystem/opencensus-go-exporter-stackdriver/issues/73
uniqueTimeSeries := make([]*monitoringpb.TimeSeries, 0, len(ts))
nonUniqueTimeSeries := make([]*monitoringpb.TimeSeries, 0, len(ts))
seenMetrics := make(map[string]struct{})

for _, tti := range ts {
signature := tti.Metric.GetType()
if _, alreadySeen := seenMetrics[signature]; !alreadySeen {
uniqueTimeSeries = append(uniqueTimeSeries, tti)
seenMetrics[signature] = struct{}{}
} else {
nonUniqueTimeSeries = append(nonUniqueTimeSeries, tti)
}
}

// UniqueTimeSeries can be bunched up together
// While for each nonUniqueTimeSeries, we have
// to make a unique CreateTimeSeriesRequest.
ctsreql = append(ctsreql, &monitoringpb.CreateTimeSeriesRequest{
Name: monitoring.MetricProjectPath(se.o.ProjectID),
TimeSeries: uniqueTimeSeries,
})

// Now recursively also combine the non-unique TimeSeries
// that were singly added to nonUniqueTimeSeries.
// The reason is that we need optimal combinations
// for optimal combinations because:
// * "a/b/c"
// * "a/b/c"
// * "x/y/z"
// * "a/b/c"
// * "x/y/z"
// * "p/y/z"
// * "d/y/z"
//
// should produce:
// CreateTimeSeries(uniqueTimeSeries) :: ["a/b/c", "x/y/z", "p/y/z", "d/y/z"]
// CreateTimeSeries(nonUniqueTimeSeries) :: ["a/b/c"]
// CreateTimeSeries(nonUniqueTimeSeries) :: ["a/b/c", "x/y/z"]
nonUniqueRequests := se.combineTimeSeriesToCreateTimeSeriesRequest(nonUniqueTimeSeries)
ctsreql = append(ctsreql, nonUniqueRequests...)

return ctsreql
}

// protoMetricToTimeSeries converts a metric into a Stackdriver Monitoring v3 API CreateTimeSeriesRequest
Expand Down Expand Up @@ -468,8 +521,12 @@ func protoResourceToMonitoredResource(rsp *resourcepb.Resource) *monitoredrespb.
Type: "global",
}
}
typ := rsp.Type
if typ == "" {
typ = "global"
}
mrsp := &monitoredrespb.MonitoredResource{
Type: rsp.Type,
Type: typ,
}
if rsp.Labels != nil {
mrsp.Labels = make(map[string]string, len(rsp.Labels))
Expand Down
153 changes: 124 additions & 29 deletions metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"strings"
"testing"

monitoring "cloud.google.com/go/monitoring/apiv3"
"github.com/golang/protobuf/ptypes/timestamp"
distributionpb "google.golang.org/genproto/googleapis/api/distribution"
googlemetricpb "google.golang.org/genproto/googleapis/api/metric"
Expand All @@ -37,7 +38,7 @@ func TestProtoResourceToMonitoringResource(t *testing.T) {
want *monitoredrespb.MonitoredResource
}{
{in: nil, want: &monitoredrespb.MonitoredResource{Type: "global"}},
{in: &resourcepb.Resource{}, want: &monitoredrespb.MonitoredResource{}},
{in: &resourcepb.Resource{}, want: &monitoredrespb.MonitoredResource{Type: "global"}},
{
in: &resourcepb.Resource{
Type: "foo",
Expand Down Expand Up @@ -91,7 +92,7 @@ func TestProtoMetricToCreateTimeSeriesRequest(t *testing.T) {

tests := []struct {
in *metricspb.Metric
want *monitoringpb.CreateTimeSeriesRequest
want []*monitoringpb.CreateTimeSeriesRequest
wantErr string
statsExporter *statsExporter
}{
Expand Down Expand Up @@ -135,33 +136,35 @@ func TestProtoMetricToCreateTimeSeriesRequest(t *testing.T) {
statsExporter: &statsExporter{
o: Options{ProjectID: "foo"},
},
want: &monitoringpb.CreateTimeSeriesRequest{
Name: "projects/foo",
TimeSeries: []*monitoringpb.TimeSeries{
{
Metric: &googlemetricpb.Metric{
Type: "custom.googleapis.com/opencensus/with_metric_descriptor",
},
Resource: &monitoredrespb.MonitoredResource{
Type: "global",
},
Points: []*monitoringpb.Point{
{
Interval: &monitoringpb.TimeInterval{
StartTime: startTimestamp,
EndTime: endTimestamp,
},
Value: &monitoringpb.TypedValue{
Value: &monitoringpb.TypedValue_DistributionValue{
DistributionValue: &distributionpb.Distribution{
Count: 1,
Mean: 11.9,
SumOfSquaredDeviation: 0,
BucketCounts: []int64{0, 1, 0, 0, 0},
BucketOptions: &distributionpb.Distribution_BucketOptions{
Options: &distributionpb.Distribution_BucketOptions_ExplicitBuckets{
ExplicitBuckets: &distributionpb.Distribution_BucketOptions_Explicit{
Bounds: []float64{0, 10, 20, 30, 40},
want: []*monitoringpb.CreateTimeSeriesRequest{
{
Name: "projects/foo",
TimeSeries: []*monitoringpb.TimeSeries{
{
Metric: &googlemetricpb.Metric{
Type: "custom.googleapis.com/opencensus/with_metric_descriptor",
},
Resource: &monitoredrespb.MonitoredResource{
Type: "global",
},
Points: []*monitoringpb.Point{
{
Interval: &monitoringpb.TimeInterval{
StartTime: startTimestamp,
EndTime: endTimestamp,
},
Value: &monitoringpb.TypedValue{
Value: &monitoringpb.TypedValue_DistributionValue{
DistributionValue: &distributionpb.Distribution{
Count: 1,
Mean: 11.9,
SumOfSquaredDeviation: 0,
BucketCounts: []int64{0, 1, 0, 0, 0},
BucketOptions: &distributionpb.Distribution_BucketOptions{
Options: &distributionpb.Distribution_BucketOptions_ExplicitBuckets{
ExplicitBuckets: &distributionpb.Distribution_BucketOptions_Explicit{
Bounds: []float64{0, 10, 20, 30, 40},
},
},
},
},
Expand Down Expand Up @@ -404,6 +407,98 @@ func TestProtoMetricsToMonitoringMetrics_fromProtoPoint(t *testing.T) {
}
}

func TestCombineTimeSeriesAndDeduplication(t *testing.T) {
se := new(statsExporter)

tests := []struct {
in []*monitoringpb.TimeSeries
want []*monitoringpb.CreateTimeSeriesRequest
}{
{
in: []*monitoringpb.TimeSeries{
{
Metric: &googlemetricpb.Metric{
Type: "a/b/c",
},
},
{
Metric: &googlemetricpb.Metric{
Type: "a/b/c",
},
},
{
Metric: &googlemetricpb.Metric{
Type: "A/b/c",
},
},
{
Metric: &googlemetricpb.Metric{
Type: "a/b/c",
},
},
{
Metric: &googlemetricpb.Metric{
Type: "X/Y/Z",
},
},
},
want: []*monitoringpb.CreateTimeSeriesRequest{
{
Name: monitoring.MetricProjectPath(se.o.ProjectID),
TimeSeries: []*monitoringpb.TimeSeries{
{
Metric: &googlemetricpb.Metric{
Type: "a/b/c",
},
},
{
Metric: &googlemetricpb.Metric{
Type: "A/b/c",
},
},
{
Metric: &googlemetricpb.Metric{
Type: "X/Y/Z",
},
},
},
},
{
Name: monitoring.MetricProjectPath(se.o.ProjectID),
TimeSeries: []*monitoringpb.TimeSeries{
{
Metric: &googlemetricpb.Metric{
Type: "a/b/c",
},
},
},
},
{
Name: monitoring.MetricProjectPath(se.o.ProjectID),
TimeSeries: []*monitoringpb.TimeSeries{
{
Metric: &googlemetricpb.Metric{
Type: "a/b/c",
},
},
},
},
},
},
}

for i, tt := range tests {
got := se.combineTimeSeriesToCreateTimeSeriesRequest(tt.in)
want := tt.want
if !reflect.DeepEqual(got, want) {
gj, wj := serializeAsJSON(got), serializeAsJSON(want)
if gj != wj {
t.Errorf("#%d: Unmatched JSON\nGot:\n\t%s\nWant:\n\t%s", i, gj, wj)
}
}
}
}

func serializeAsJSON(v interface{}) string {
blob, _ := json.MarshalIndent(v, "", " ")
return string(blob)
Expand Down
37 changes: 20 additions & 17 deletions stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,35 +208,38 @@ func (e *statsExporter) uploadStats(vds []*view.Data) error {
return nil
}

func (e *statsExporter) makeReq(vds []*view.Data, limit int) []*monitoringpb.CreateTimeSeriesRequest {
func (se *statsExporter) makeReq(vds []*view.Data, limit int) []*monitoringpb.CreateTimeSeriesRequest {
var reqs []*monitoringpb.CreateTimeSeriesRequest
var timeSeries []*monitoringpb.TimeSeries

var allTimeSeries []*monitoringpb.TimeSeries
for _, vd := range vds {
for _, row := range vd.Rows {
tags, resource := e.getMonitoredResource(vd.View, append([]tag.Tag(nil), row.Tags...))
tags, resource := se.getMonitoredResource(vd.View, append([]tag.Tag(nil), row.Tags...))
ts := &monitoringpb.TimeSeries{
Metric: &metricpb.Metric{
Type: e.metricType(vd.View),
Labels: newLabels(e.defaultLabels, tags),
Type: se.metricType(vd.View),
Labels: newLabels(se.defaultLabels, tags),
},
Resource: resource,
Points: []*monitoringpb.Point{newPoint(vd.View, row, vd.Start, vd.End)},
}
timeSeries = append(timeSeries, ts)
if len(timeSeries) == limit {
reqs = append(reqs, &monitoringpb.CreateTimeSeriesRequest{
Name: monitoring.MetricProjectPath(e.o.ProjectID),
TimeSeries: timeSeries,
})
timeSeries = []*monitoringpb.TimeSeries{}
}
allTimeSeries = append(allTimeSeries, ts)
}
}

var timeSeries []*monitoringpb.TimeSeries
for _, ts := range allTimeSeries {
timeSeries = append(timeSeries, ts)
if len(timeSeries) == limit {
ctsreql := se.combineTimeSeriesToCreateTimeSeriesRequest(timeSeries)
reqs = append(reqs, ctsreql...)
timeSeries = timeSeries[:0]
}
}

if len(timeSeries) > 0 {
reqs = append(reqs, &monitoringpb.CreateTimeSeriesRequest{
Name: monitoring.MetricProjectPath(e.o.ProjectID),
TimeSeries: timeSeries,
})
ctsreql := se.combineTimeSeriesToCreateTimeSeriesRequest(timeSeries)
reqs = append(reqs, ctsreql...)
}
return reqs
}
Expand Down
Loading

0 comments on commit 0e2df90

Please sign in to comment.