From 9366d3634e70bea85a87d33f6f23e5f9c673f8fa Mon Sep 17 00:00:00 2001 From: rghetia Date: Mon, 28 Oct 2019 17:53:55 -0700 Subject: [PATCH] Add option to provide resource based on metric descriptor. (#231) * Add option to provide resource based on metric descriptor. * change option to accomodate removing labels that are used for the resource * fix doc and review comments. * fix review comment. --- metrics.go | 17 ++- metrics_test.go | 311 ++++++++++++++++++++++++++++++++++++++++++++++++ stackdriver.go | 18 +++ 3 files changed, 345 insertions(+), 1 deletion(-) diff --git a/metrics.go b/metrics.go index 4b1fb4e..eb47357 100644 --- a/metrics.go +++ b/metrics.go @@ -34,6 +34,7 @@ import ( monitoredrespb "google.golang.org/genproto/googleapis/api/monitoredres" monitoringpb "google.golang.org/genproto/googleapis/monitoring/v3" + "contrib.go.opencensus.io/exporter/stackdriver/monitoredresource" "go.opencensus.io/metric/metricdata" "go.opencensus.io/resource" ) @@ -154,12 +155,26 @@ func (se *statsExporter) metricToMpbTs(ctx context.Context, metric *metricdata.M // TODO: (@rghetia) perhaps log this error from labels extraction, if non-nil. continue } + + var rsc *monitoredrespb.MonitoredResource + var mr monitoredresource.Interface + if se.o.ResourceByDescriptor != nil { + labels, mr = se.o.ResourceByDescriptor(&metric.Descriptor, labels) + // TODO(rghetia): optimize this. It is inefficient to convert this for all metrics. + rsc = convertMonitoredResourceToPB(mr) + if rsc.Type == "" { + rsc.Type = "global" + rsc.Labels = nil + } + } else { + rsc = resource + } timeSeries = append(timeSeries, &monitoringpb.TimeSeries{ Metric: &googlemetricpb.Metric{ Type: metricType, Labels: labels, }, - Resource: resource, + Resource: rsc, Points: sdPoints, }) } diff --git a/metrics_test.go b/metrics_test.go index 30fe5eb..e72eb01 100644 --- a/metrics_test.go +++ b/metrics_test.go @@ -31,6 +31,7 @@ import ( monitoredrespb "google.golang.org/genproto/googleapis/api/monitoredres" monitoringpb "google.golang.org/genproto/googleapis/monitoring/v3" + "contrib.go.opencensus.io/exporter/stackdriver/monitoredresource" "go.opencensus.io/metric/metricdata" "go.opencensus.io/resource" "go.opencensus.io/trace" @@ -559,3 +560,313 @@ func TestMetricsToMonitoringMetrics_fromProtoPoint(t *testing.T) { } } } + +func TestResourceByDescriptor(t *testing.T) { + startTimestamp := ×tamp.Timestamp{ + Seconds: 1543160298, + Nanos: 100000090, + } + startTime, _ := ptypes.Timestamp(startTimestamp) + endTimestamp := ×tamp.Timestamp{ + Seconds: 1543160298, + Nanos: 100000997, + } + endTime, _ := ptypes.Timestamp(endTimestamp) + + tests := []struct { + in *metricdata.Metric + want []*monitoringpb.CreateTimeSeriesRequest + wantErr string + }{ + { + in: &metricdata.Metric{ + Descriptor: metricdata.Descriptor{ + Name: "custom_resource_one", + Description: "This is a test", + Unit: metricdata.UnitBytes, + Type: metricdata.TypeCumulativeInt64, + LabelKeys: []metricdata.LabelKey{ + { + Key: "k11", + }, + { + Key: "k12", + }, + }, + }, + Resource: nil, + TimeSeries: []*metricdata.TimeSeries{ + { + StartTime: startTime, + Points: []metricdata.Point{ + { + Time: endTime, + Value: int64(5), + }, + }, + LabelValues: []metricdata.LabelValue{ + { + Value: "v11", + }, + { + Value: "v12", + }, + }, + }, + }, + }, + want: []*monitoringpb.CreateTimeSeriesRequest{ + { + Name: "projects/foo", + TimeSeries: []*monitoringpb.TimeSeries{ + { + Metric: &googlemetricpb.Metric{ + Type: "custom.googleapis.com/opencensus/custom_resource_one", + Labels: map[string]string{ + "k12": "v12", + }, + }, + Resource: &monitoredrespb.MonitoredResource{ + Type: "one", + Labels: map[string]string{ + "k11": "v11", + }, + }, + Points: []*monitoringpb.Point{ + { + Interval: &monitoringpb.TimeInterval{ + StartTime: startTimestamp, + EndTime: endTimestamp, + }, + Value: &monitoringpb.TypedValue{ + Value: &monitoringpb.TypedValue_Int64Value{ + Int64Value: 5, + }, + }, + }, + }, + }, + }, + }, + }, + }, + { + in: &metricdata.Metric{ + Descriptor: metricdata.Descriptor{ + Name: "custom_resource_two", + Description: "This is a test", + Unit: metricdata.UnitBytes, + Type: metricdata.TypeCumulativeInt64, + LabelKeys: []metricdata.LabelKey{ + { + Key: "k21", + }, + { + Key: "k22", + }, + }, + }, + Resource: nil, + TimeSeries: []*metricdata.TimeSeries{ + { + StartTime: startTime, + Points: []metricdata.Point{ + { + Time: endTime, + Value: int64(5), + }, + }, + LabelValues: []metricdata.LabelValue{ + { + Value: "v21", + }, + { + Value: "v22", + }, + }, + }, + }, + }, + want: []*monitoringpb.CreateTimeSeriesRequest{ + { + Name: "projects/foo", + TimeSeries: []*monitoringpb.TimeSeries{ + { + Metric: &googlemetricpb.Metric{ + Type: "custom.googleapis.com/opencensus/custom_resource_two", + Labels: map[string]string{ + "k21": "v21", + }, + }, + Resource: &monitoredrespb.MonitoredResource{ + Type: "two", + Labels: map[string]string{ + "k22": "v22", + }, + }, + Points: []*monitoringpb.Point{ + { + Interval: &monitoringpb.TimeInterval{ + StartTime: startTimestamp, + EndTime: endTimestamp, + }, + Value: &monitoringpb.TypedValue{ + Value: &monitoringpb.TypedValue_Int64Value{ + Int64Value: 5, + }, + }, + }, + }, + }, + }, + }, + }, + }, + { + in: &metricdata.Metric{ + Descriptor: metricdata.Descriptor{ + Name: "custom_resource_other", + Description: "This is a test", + Unit: metricdata.UnitBytes, + Type: metricdata.TypeCumulativeInt64, + LabelKeys: []metricdata.LabelKey{ + { + Key: "k31", + }, + { + Key: "k32", + }, + }, + }, + Resource: nil, + TimeSeries: []*metricdata.TimeSeries{ + { + StartTime: startTime, + Points: []metricdata.Point{ + { + Time: endTime, + Value: int64(5), + }, + }, + LabelValues: []metricdata.LabelValue{ + { + Value: "v31", + }, + { + Value: "v32", + }, + }, + }, + }, + }, + want: []*monitoringpb.CreateTimeSeriesRequest{ + { + Name: "projects/foo", + TimeSeries: []*monitoringpb.TimeSeries{ + { + Metric: &googlemetricpb.Metric{ + Type: "custom.googleapis.com/opencensus/custom_resource_other", + Labels: map[string]string{ + "k31": "v31", + "k32": "v32", + }, + }, + Resource: &monitoredrespb.MonitoredResource{ + Type: "global", + }, + Points: []*monitoringpb.Point{ + { + Interval: &monitoringpb.TimeInterval{ + StartTime: startTimestamp, + EndTime: endTimestamp, + }, + Value: &monitoringpb.TypedValue{ + Value: &monitoringpb.TypedValue_Int64Value{ + Int64Value: 5, + }, + }, + }, + }, + }, + }, + }, + }, + }, + } + + var se = &statsExporter{ + o: Options{ + ProjectID: "foo", + ResourceByDescriptor: getResourceByDescriptor, + }, + } + + for i, tt := range tests { + tsl, err := se.metricToMpbTs(context.Background(), tt.in) + if tt.wantErr != "" { + if err == nil || !strings.Contains(err.Error(), tt.wantErr) { + t.Errorf("#%d: unmatched error. Got\n\t%v\nWant\n\t%v", i, err, tt.wantErr) + } + continue + } + if err != nil { + t.Errorf("#%d: unexpected error: %v", i, err) + continue + } + + got := se.combineTimeSeriesToCreateTimeSeriesRequest(tsl) + // Our saving grace is serialization equality since some + // unexported fields could be present in the various values. + if diff := cmpTSReqs(got, tt.want); diff != "" { + t.Fatalf("Test %d failed. Unexpected CreateTimeSeriesRequests -got +want: %s", i, diff) + } + } +} + +type customResource struct { + rt string + rm map[string]string +} + +var _ monitoredresource.Interface = (*customResource)(nil) + +func (cr *customResource) MonitoredResource() (resType string, labels map[string]string) { + return cr.rt, cr.rm +} + +var crEmpty = &customResource{rt: ""} + +func getResourceByDescriptor(md *metricdata.Descriptor, labels map[string]string) (map[string]string, monitoredresource.Interface) { + switch md.Name { + case "custom_resource_one": + cr := &customResource{ + rt: "one", + rm: map[string]string{ + "k11": labels["k11"], + }, + } + newLabels := removeLabel(labels, cr.rm) + return newLabels, cr + case "custom_resource_two": + cr := &customResource{ + rt: "two", + rm: map[string]string{ + "k22": labels["k22"], + }, + } + newLabels := removeLabel(labels, cr.rm) + return newLabels, cr + default: + return labels, crEmpty + } +} + +func removeLabel(m map[string]string, remove map[string]string) map[string]string { + newM := make(map[string]string) + for k, v := range m { + if _, ok := remove[k]; !ok { + newM[k] = v + } + } + return newM +} diff --git a/stackdriver.go b/stackdriver.go index 5e36c5f..ff2aee7 100644 --- a/stackdriver.go +++ b/stackdriver.go @@ -255,6 +255,24 @@ type Options struct { // to Stackdriver Monitoring. This is only used for Proto metrics export // for now. The minimum number of workers is 1. NumberOfWorkers int + + // ResourceByDescriptor may be provided to supply monitored resource dynamically + // based on the metric Descriptor. Most users will not need to set this, + // but should instead set ResourceDetector. + // + // The MonitoredResource and ResourceDetector fields are ignored if this + // field is set to a non-nil value. + // + // The ResourceByDescriptor is called to derive monitored resources from + // metric.Descriptor and the label map associated with the time-series. + // If any label is used for the derived resource then it will be removed + // from the label map. The remaining labels in the map are returned to + // be used with the time-series. + // + // If the func set to this field does not return valid resource even for one + // time-series then it will result into an error for the entire CreateTimeSeries request + // which may contain more than one time-series. + ResourceByDescriptor func(*metricdata.Descriptor, map[string]string) (map[string]string, monitoredresource.Interface) } const defaultTimeout = 5 * time.Second