diff --git a/metrics.go b/metrics.go new file mode 100644 index 0000000..00f94f3 --- /dev/null +++ b/metrics.go @@ -0,0 +1,436 @@ +// Copyright 2019, OpenCensus Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package stackdriver + +/* +The code in this file is responsible for converting OpenCensus Proto metrics +directly to Stackdriver Metrics. +*/ + +import ( + "context" + "errors" + "fmt" + "github.com/golang/protobuf/ptypes/timestamp" + "go.opencensus.io/trace" + + distributionpb "google.golang.org/genproto/googleapis/api/distribution" + labelpb "google.golang.org/genproto/googleapis/api/label" + googlemetricpb "google.golang.org/genproto/googleapis/api/metric" + monitoredrespb "google.golang.org/genproto/googleapis/api/monitoredres" + monitoringpb "google.golang.org/genproto/googleapis/monitoring/v3" + + "go.opencensus.io/metric/metricdata" + "go.opencensus.io/resource" +) + +var ( + errLableExtraction = errors.New("error extracting labels") + errUnspecifiedMetricKind = errors.New("metric kind is unpsecified") +) + +// ExportMetrics exports OpenCensus Metrics to Stackdriver Monitoring. +func (se *statsExporter) ExportMetrics(ctx context.Context, metrics []*metricdata.Metric) error { + if len(metrics) == 0 { + return nil + } + + for _, metric := range metrics { + se.metricsBundler.Add(metric, 1) + // TODO: [rghetia] handle errors. + } + + return nil +} + +func (se *statsExporter) handleMetricsUpload(metrics []*metricdata.Metric) { + err := se.uploadMetrics(metrics) + if err != nil { + se.o.handleError(err) + } +} + +func (se *statsExporter) uploadMetrics(metrics []*metricdata.Metric) error { + ctx, cancel := se.o.newContextWithTimeout() + defer cancel() + + ctx, span := trace.StartSpan( + ctx, + "contrib.go.opencensus.io/exporter/stackdriver.uploadMetrics", + trace.WithSampler(trace.NeverSample()), + ) + defer span.End() + + for _, metric := range metrics { + // Now create the metric descriptor remotely. + if err := se.createMetricDescriptorFromMetric(ctx, metric); err != nil { + span.SetStatus(trace.Status{Code: trace.StatusCodeUnknown, Message: err.Error()}) + //TODO: [rghetia] record error metrics. + continue + } + } + + var allTimeSeries []*monitoringpb.TimeSeries + for _, metric := range metrics { + tsl, err := se.metricToMpbTs(ctx, metric) + if err != nil { + span.SetStatus(trace.Status{Code: trace.StatusCodeUnknown, Message: err.Error()}) + //TODO: [rghetia] record error metrics. + continue + } + if tsl != nil { + allTimeSeries = append(allTimeSeries, tsl...) + } + } + + // Now batch timeseries up and then export. + for start, end := 0, 0; start < len(allTimeSeries); start = end { + end = start + maxTimeSeriesPerUpload + if end > len(allTimeSeries) { + end = len(allTimeSeries) + } + batch := allTimeSeries[start:end] + ctsreql := se.combineTimeSeriesToCreateTimeSeriesRequest(batch) + for _, ctsreq := range ctsreql { + if err := createTimeSeries(ctx, se.c, ctsreq); err != nil { + span.SetStatus(trace.Status{Code: trace.StatusCodeUnknown, Message: err.Error()}) + // TODO(@rghetia): record error metrics + // return err + } + } + } + + return nil +} + +// metricToMpbTs converts a metric into a list of Stackdriver Monitoring v3 API TimeSeries +// but it doesn't invoke any remote API. +func (se *statsExporter) metricToMpbTs(ctx context.Context, metric *metricdata.Metric) ([]*monitoringpb.TimeSeries, error) { + if metric == nil { + return nil, errNilMetric + } + + resource := metricRscToMpbRsc(metric.Resource) + + metricName := metric.Descriptor.Name + metricType, _ := se.metricTypeFromProto(metricName) + metricLabelKeys := metric.Descriptor.LabelKeys + metricKind, _ := metricDescriptorTypeToMetricKind(metric) + + if metricKind == googlemetricpb.MetricDescriptor_METRIC_KIND_UNSPECIFIED { + // ignore these Timeserieses. TODO [rghetia] log errors. + return nil, nil + } + + timeSeries := make([]*monitoringpb.TimeSeries, 0, len(metric.TimeSeries)) + for _, ts := range metric.TimeSeries { + sdPoints, err := se.metricTsToMpbPoint(ts, metricKind) + if err != nil { + // TODO(@rghetia): record error metrics + continue + } + + // Each TimeSeries has labelValues which MUST be correlated + // with that from the MetricDescriptor + labels, err := metricLabelsToTsLabels(se.defaultLabels, metricLabelKeys, ts.LabelValues) + if err != nil { + // TODO: (@rghetia) perhaps log this error from labels extraction, if non-nil. + continue + } + timeSeries = append(timeSeries, &monitoringpb.TimeSeries{ + Metric: &googlemetricpb.Metric{ + Type: metricType, + Labels: labels, + }, + Resource: resource, + Points: sdPoints, + }) + } + + return timeSeries, nil +} + +func metricLabelsToTsLabels(defaults map[string]labelValue, labelKeys []string, labelValues []metricdata.LabelValue) (map[string]string, error) { + labels := make(map[string]string) + // Fill in the defaults firstly, irrespective of if the labelKeys and labelValues are mismatched. + for key, label := range defaults { + labels[sanitize(key)] = label.val + } + + // Perform this sanity check now. + if len(labelKeys) != len(labelValues) { + return labels, fmt.Errorf("Length mismatch: len(labelKeys)=%d len(labelValues)=%d", len(labelKeys), len(labelValues)) + } + + for i, labelKey := range labelKeys { + labelValue := labelValues[i] + labels[sanitize(labelKey)] = labelValue.Value + } + + return labels, nil +} + +// createMetricDescriptorFromMetric creates a metric descriptor from the OpenCensus metric +// and then creates it remotely using Stackdriver's API. +func (se *statsExporter) createMetricDescriptorFromMetric(ctx context.Context, metric *metricdata.Metric) error { + se.metricMu.Lock() + defer se.metricMu.Unlock() + + name := metric.Descriptor.Name + if _, created := se.metricDescriptors[name]; created { + return nil + } + + // Otherwise, we encountered a cache-miss and + // should create the metric descriptor remotely. + inMD, err := se.metricToMpbMetricDescriptor(metric) + if err != nil { + return err + } + + var md *googlemetricpb.MetricDescriptor + if builtinMetric(inMD.Type) { + gmrdesc := &monitoringpb.GetMetricDescriptorRequest{ + Name: inMD.Name, + } + md, err = getMetricDescriptor(ctx, se.c, gmrdesc) + } else { + + cmrdesc := &monitoringpb.CreateMetricDescriptorRequest{ + Name: fmt.Sprintf("projects/%s", se.o.ProjectID), + MetricDescriptor: inMD, + } + md, err = createMetricDescriptor(ctx, se.c, cmrdesc) + } + + if err == nil { + // Now record the metric as having been created. + se.metricDescriptors[name] = md + } + + return err +} + +func (se *statsExporter) metricToMpbMetricDescriptor(metric *metricdata.Metric) (*googlemetricpb.MetricDescriptor, error) { + if metric == nil { + return nil, errNilMetric + } + + metricType, _ := se.metricTypeFromProto(metric.Descriptor.Name) + displayName := se.displayName(metric.Descriptor.Name) + metricKind, valueType := metricDescriptorTypeToMetricKind(metric) + + sdm := &googlemetricpb.MetricDescriptor{ + Name: fmt.Sprintf("projects/%s/metricDescriptors/%s", se.o.ProjectID, metricType), + DisplayName: displayName, + Description: metric.Descriptor.Description, + Unit: string(metric.Descriptor.Unit), + Type: metricType, + MetricKind: metricKind, + ValueType: valueType, + Labels: metricLableKeysToLabels(se.defaultLabels, metric.Descriptor.LabelKeys), + } + + return sdm, nil +} + +func metricLableKeysToLabels(defaults map[string]labelValue, labelKeys []string) []*labelpb.LabelDescriptor { + labelDescriptors := make([]*labelpb.LabelDescriptor, 0, len(defaults)+len(labelKeys)) + + // Fill in the defaults first. + for key, lbl := range defaults { + labelDescriptors = append(labelDescriptors, &labelpb.LabelDescriptor{ + Key: sanitize(key), + Description: lbl.desc, + ValueType: labelpb.LabelDescriptor_STRING, + }) + } + + // Now fill in those from the metric. + for _, key := range labelKeys { + labelDescriptors = append(labelDescriptors, &labelpb.LabelDescriptor{ + Key: sanitize(key), + Description: "", // TODO: [rghetia] when descriptor is available use that. + ValueType: labelpb.LabelDescriptor_STRING, // We only use string tags + }) + } + return labelDescriptors +} + +func metricDescriptorTypeToMetricKind(m *metricdata.Metric) (googlemetricpb.MetricDescriptor_MetricKind, googlemetricpb.MetricDescriptor_ValueType) { + if m == nil { + return googlemetricpb.MetricDescriptor_METRIC_KIND_UNSPECIFIED, googlemetricpb.MetricDescriptor_VALUE_TYPE_UNSPECIFIED + } + + switch m.Descriptor.Type { + case metricdata.TypeCumulativeInt64: + return googlemetricpb.MetricDescriptor_CUMULATIVE, googlemetricpb.MetricDescriptor_INT64 + + case metricdata.TypeCumulativeFloat64: + return googlemetricpb.MetricDescriptor_CUMULATIVE, googlemetricpb.MetricDescriptor_DOUBLE + + case metricdata.TypeCumulativeDistribution: + return googlemetricpb.MetricDescriptor_CUMULATIVE, googlemetricpb.MetricDescriptor_DISTRIBUTION + + case metricdata.TypeGaugeFloat64: + return googlemetricpb.MetricDescriptor_GAUGE, googlemetricpb.MetricDescriptor_DOUBLE + + case metricdata.TypeGaugeInt64: + return googlemetricpb.MetricDescriptor_GAUGE, googlemetricpb.MetricDescriptor_INT64 + + case metricdata.TypeGaugeDistribution: + return googlemetricpb.MetricDescriptor_GAUGE, googlemetricpb.MetricDescriptor_DISTRIBUTION + + case metricdata.TypeSummary: + // TODO: [rghetia] after upgrading to proto version3, retrun UNRECOGNIZED instead of UNSPECIFIED + return googlemetricpb.MetricDescriptor_METRIC_KIND_UNSPECIFIED, googlemetricpb.MetricDescriptor_VALUE_TYPE_UNSPECIFIED + + default: + // TODO: [rghetia] after upgrading to proto version3, retrun UNRECOGNIZED instead of UNSPECIFIED + return googlemetricpb.MetricDescriptor_METRIC_KIND_UNSPECIFIED, googlemetricpb.MetricDescriptor_VALUE_TYPE_UNSPECIFIED + } +} + +func metricRscToMpbRsc(rs *resource.Resource) *monitoredrespb.MonitoredResource { + if rs == nil { + return &monitoredrespb.MonitoredResource{ + Type: "global", + } + } + typ := rs.Type + if typ == "" { + typ = "global" + } + mrsp := &monitoredrespb.MonitoredResource{ + Type: typ, + } + if rs.Labels != nil { + mrsp.Labels = make(map[string]string, len(rs.Labels)) + for k, v := range rs.Labels { + // TODO: [rghetia] add mapping between OC Labels and SD Labels. + mrsp.Labels[k] = v + } + } + return mrsp +} + +func (se *statsExporter) metricTsToMpbPoint(ts *metricdata.TimeSeries, metricKind googlemetricpb.MetricDescriptor_MetricKind) (sptl []*monitoringpb.Point, err error) { + for _, pt := range ts.Points { + + // If we have a last value aggregation point i.e. MetricDescriptor_GAUGE + // StartTime should be nil. + startTime := timestampProto(ts.StartTime) + if metricKind == googlemetricpb.MetricDescriptor_GAUGE { + startTime = nil + } + + spt, err := metricPointToMpbPoint(startTime, &pt) + if err != nil { + return nil, err + } + sptl = append(sptl, spt) + } + return sptl, nil +} + +func metricPointToMpbPoint(startTime *timestamp.Timestamp, pt *metricdata.Point) (*monitoringpb.Point, error) { + if pt == nil { + return nil, nil + } + + mptv, err := metricPointToMpbValue(pt) + if err != nil { + return nil, err + } + + mpt := &monitoringpb.Point{ + Value: mptv, + Interval: &monitoringpb.TimeInterval{ + StartTime: startTime, + EndTime: timestampProto(pt.Time), + }, + } + return mpt, nil +} + +func metricPointToMpbValue(pt *metricdata.Point) (*monitoringpb.TypedValue, error) { + if pt == nil { + return nil, nil + } + + var err error + var tval *monitoringpb.TypedValue + switch v := pt.Value.(type) { + default: + err = fmt.Errorf("protoToMetricPoint: unknown Data type: %T", pt.Value) + + case int64: + tval = &monitoringpb.TypedValue{ + Value: &monitoringpb.TypedValue_Int64Value{ + Int64Value: v, + }, + } + + case float64: + tval = &monitoringpb.TypedValue{ + Value: &monitoringpb.TypedValue_DoubleValue{ + DoubleValue: v, + }, + } + + case *metricdata.Distribution: + dv := v + var mv *monitoringpb.TypedValue_DistributionValue + var mean float64 + if dv.Count > 0 { + mean = float64(dv.Sum) / float64(dv.Count) + } + mv = &monitoringpb.TypedValue_DistributionValue{ + DistributionValue: &distributionpb.Distribution{ + Count: dv.Count, + Mean: mean, + SumOfSquaredDeviation: dv.SumOfSquaredDeviation, + }, + } + + insertZeroBound := false + if bopts := dv.BucketOptions; bopts != nil { + insertZeroBound = shouldInsertZeroBound(bopts.Bounds...) + mv.DistributionValue.BucketOptions = &distributionpb.Distribution_BucketOptions{ + Options: &distributionpb.Distribution_BucketOptions_ExplicitBuckets{ + ExplicitBuckets: &distributionpb.Distribution_BucketOptions_Explicit{ + // The first bucket bound should be 0.0 because the Metrics first bucket is + // [0, first_bound) but Stackdriver monitoring bucket bounds begin with -infinity + // (first bucket is (-infinity, 0)) + Bounds: addZeroBoundOnCondition(insertZeroBound, bopts.Bounds...), + }, + }, + } + } + mv.DistributionValue.BucketCounts = addZeroBucketCountOnCondition(insertZeroBound, metricBucketToBucketCounts(dv.Buckets)...) + + tval = &monitoringpb.TypedValue{Value: mv} + } + + return tval, err +} + +func metricBucketToBucketCounts(buckets []metricdata.Bucket) []int64 { + bucketCounts := make([]int64, len(buckets)) + for i, bucket := range buckets { + bucketCounts[i] = bucket.Count + } + return bucketCounts +} diff --git a/metrics_test.go b/metrics_test.go new file mode 100644 index 0000000..b456223 --- /dev/null +++ b/metrics_test.go @@ -0,0 +1,525 @@ +// Copyright 2019, OpenCensus Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package stackdriver + +import ( + "context" + "reflect" + "strings" + "testing" + + "github.com/golang/protobuf/ptypes/timestamp" + distributionpb "google.golang.org/genproto/googleapis/api/distribution" + googlemetricpb "google.golang.org/genproto/googleapis/api/metric" + monitoredrespb "google.golang.org/genproto/googleapis/api/monitoredres" + monitoringpb "google.golang.org/genproto/googleapis/monitoring/v3" + + "go.opencensus.io/metric/metricdata" + "go.opencensus.io/resource" + "time" +) + +var se = &statsExporter{ + o: Options{ProjectID: "foo"}, +} + +func TestMetricResourceToMonitoringResource(t *testing.T) { + tests := []struct { + in *resource.Resource + want *monitoredrespb.MonitoredResource + }{ + {in: nil, want: &monitoredrespb.MonitoredResource{Type: "global"}}, + {in: &resource.Resource{}, want: &monitoredrespb.MonitoredResource{Type: "global"}}, + { + in: &resource.Resource{ + Type: "foo", + }, + want: &monitoredrespb.MonitoredResource{ + Type: "foo", + }, + }, + { + in: &resource.Resource{ + Type: "foo", + Labels: map[string]string{}, + }, + want: &monitoredrespb.MonitoredResource{ + Type: "foo", + Labels: map[string]string{}, + }, + }, + { + in: &resource.Resource{ + Type: "foo", + Labels: map[string]string{"a": "A"}, + }, + want: &monitoredrespb.MonitoredResource{ + Type: "foo", + Labels: map[string]string{"a": "A"}, + }, + }, + } + + for i, tt := range tests { + got := metricRscToMpbRsc(tt.in) + if !reflect.DeepEqual(got, tt.want) { + gj, wj := serializeAsJSON(got), serializeAsJSON(tt.want) + if gj != wj { + t.Errorf("#%d: Unmatched JSON\nGot:\n\t%s\nWant:\n\t%s", i, gj, wj) + } + } + } +} + +func TestMetricToCreateTimeSeriesRequest(t *testing.T) { + startTimestamp := ×tamp.Timestamp{ + Seconds: 1543160298, + Nanos: 100000090, + } + endTimestamp := ×tamp.Timestamp{ + Seconds: 1543160298, + Nanos: 100000997, + } + + // TODO:[rghetia] add test for built-in metrics. + tests := []struct { + in *metricdata.Metric + want []*monitoringpb.CreateTimeSeriesRequest + wantErr string + }{ + { + in: &metricdata.Metric{ + Descriptor: metricdata.Descriptor{ + Name: "with_metric_descriptor", + Description: "This is a test", + Unit: metricdata.UnitBytes, + Type: metricdata.TypeGaugeDistribution, + }, + Resource: nil, + TimeSeries: []*metricdata.TimeSeries{ + { + StartTime: timestampToTime(startTimestamp), + Points: []metricdata.Point{ + { + Time: timestampToTime(endTimestamp), + Value: &metricdata.Distribution{ + Count: 1, + Sum: 11.9, + SumOfSquaredDeviation: 0, + Buckets: []metricdata.Bucket{ + {Count: 1}, {}, {}, {}, + }, + BucketOptions: &metricdata.BucketOptions{ + Bounds: []float64{10, 20, 30, 40}, + }, + }, + }, + }, + }, + }, + }, + want: []*monitoringpb.CreateTimeSeriesRequest{ + { + Name: "projects/foo", + TimeSeries: []*monitoringpb.TimeSeries{ + { + Metric: &googlemetricpb.Metric{ + Type: "custom.googleapis.com/opencensus/with_metric_descriptor", + }, + Resource: &monitoredrespb.MonitoredResource{ + Type: "global", + }, + Points: []*monitoringpb.Point{ + { + Interval: &monitoringpb.TimeInterval{ + EndTime: endTimestamp, + }, + Value: &monitoringpb.TypedValue{ + Value: &monitoringpb.TypedValue_DistributionValue{ + DistributionValue: &distributionpb.Distribution{ + Count: 1, + Mean: 11.9, + SumOfSquaredDeviation: 0, + BucketCounts: []int64{0, 1, 0, 0, 0}, + BucketOptions: &distributionpb.Distribution_BucketOptions{ + Options: &distributionpb.Distribution_BucketOptions_ExplicitBuckets{ + ExplicitBuckets: &distributionpb.Distribution_BucketOptions_Explicit{ + Bounds: []float64{0, 10, 20, 30, 40}, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + { + in: &metricdata.Metric{ + Descriptor: metricdata.Descriptor{ + Name: "with_metric_descriptor", + Description: "This is a test", + Unit: metricdata.UnitBytes, + Type: metricdata.TypeCumulativeDistribution, + }, + Resource: nil, + TimeSeries: []*metricdata.TimeSeries{ + { + StartTime: timestampToTime(startTimestamp), + Points: []metricdata.Point{ + { + Time: timestampToTime(endTimestamp), + Value: &metricdata.Distribution{ + Count: 1, + Sum: 11.9, + SumOfSquaredDeviation: 0, + Buckets: []metricdata.Bucket{ + {Count: 1}, {}, {}, {}, + }, + BucketOptions: &metricdata.BucketOptions{ + Bounds: []float64{10, 20, 30, 40}, + }, + }, + }, + }, + }, + }, + }, + want: []*monitoringpb.CreateTimeSeriesRequest{ + { + Name: "projects/foo", + TimeSeries: []*monitoringpb.TimeSeries{ + { + Metric: &googlemetricpb.Metric{ + Type: "custom.googleapis.com/opencensus/with_metric_descriptor", + }, + Resource: &monitoredrespb.MonitoredResource{ + Type: "global", + }, + Points: []*monitoringpb.Point{ + { + Interval: &monitoringpb.TimeInterval{ + StartTime: startTimestamp, + EndTime: endTimestamp, + }, + Value: &monitoringpb.TypedValue{ + Value: &monitoringpb.TypedValue_DistributionValue{ + DistributionValue: &distributionpb.Distribution{ + Count: 1, + Mean: 11.9, + SumOfSquaredDeviation: 0, + BucketCounts: []int64{0, 1, 0, 0, 0}, + BucketOptions: &distributionpb.Distribution_BucketOptions{ + Options: &distributionpb.Distribution_BucketOptions_ExplicitBuckets{ + ExplicitBuckets: &distributionpb.Distribution_BucketOptions_Explicit{ + Bounds: []float64{0, 10, 20, 30, 40}, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + } + + for i, tt := range tests { + tsl, err := se.metricToMpbTs(context.Background(), tt.in) + if tt.wantErr != "" { + if err == nil || !strings.Contains(err.Error(), tt.wantErr) { + t.Errorf("#%d: unmatched error. Got\n\t%v\nWant\n\t%v", i, err, tt.wantErr) + } + continue + } + if err != nil { + t.Errorf("#%d: unexpected error: %v", i, err) + continue + } + + got := se.combineTimeSeriesToCreateTimeSeriesRequest(tsl) + if !reflect.DeepEqual(got, tt.want) { + // Our saving grace is serialization equality since some + // unexported fields could be present in the various values. + gj, wj := serializeAsJSON(got), serializeAsJSON(tt.want) + if gj != wj { + t.Errorf("#%d: Unmatched JSON\nGot:\n\t%s\nWant:\n\t%s", i, gj, wj) + } + } + } +} + +func TestMetricDescriptorToMonitoringMetricDescriptor(t *testing.T) { + tests := []struct { + in *metricdata.Metric + want *googlemetricpb.MetricDescriptor + wantErr string + }{ + {in: nil, wantErr: "non-nil metric"}, + { + in: &metricdata.Metric{}, + want: &googlemetricpb.MetricDescriptor{ + Name: "projects/foo/metricDescriptors/custom.googleapis.com/opencensus", + Type: "custom.googleapis.com/opencensus", + DisplayName: "OpenCensus", + MetricKind: googlemetricpb.MetricDescriptor_GAUGE, + ValueType: googlemetricpb.MetricDescriptor_INT64, + }, + }, + { + in: &metricdata.Metric{ + Descriptor: metricdata.Descriptor{ + Name: "with_metric_descriptor", + Description: "This is with metric descriptor", + Unit: metricdata.UnitBytes, + Type: metricdata.TypeCumulativeInt64, + }, + }, + want: &googlemetricpb.MetricDescriptor{ + Name: "projects/foo/metricDescriptors/custom.googleapis.com/opencensus/with_metric_descriptor", + Type: "custom.googleapis.com/opencensus/with_metric_descriptor", + DisplayName: "OpenCensus/with_metric_descriptor", + Description: "This is with metric descriptor", + Unit: "By", + MetricKind: googlemetricpb.MetricDescriptor_CUMULATIVE, + ValueType: googlemetricpb.MetricDescriptor_INT64, + }, + }, + } + + for i, tt := range tests { + got, err := se.metricToMpbMetricDescriptor(tt.in) + if tt.wantErr != "" { + if err == nil || !strings.Contains(err.Error(), tt.wantErr) { + t.Errorf("#%d: \nGot %v\nWanted error substring %q", i, err, tt.wantErr) + } + continue + } + + if err != nil { + t.Errorf("#%d: Unexpected error: %v", i, err) + continue + } + + if !reflect.DeepEqual(got, tt.want) { + // Our saving grace is serialization equality since some + // unexported fields could be present in the various values. + gj, wj := serializeAsJSON(got), serializeAsJSON(tt.want) + if gj != wj { + t.Errorf("#%d: Unmatched JSON\nGot:\n\t%s\nWant:\n\t%s", i, gj, wj) + } + } + } +} + +func TestMetricTypeToMonitoringMetricKind(t *testing.T) { + tests := []struct { + in metricdata.Type + wantKind googlemetricpb.MetricDescriptor_MetricKind + wantValueType googlemetricpb.MetricDescriptor_ValueType + wantErr string + }{ + { + in: metricdata.TypeCumulativeInt64, + wantKind: googlemetricpb.MetricDescriptor_CUMULATIVE, + wantValueType: googlemetricpb.MetricDescriptor_INT64, + }, + { + in: metricdata.TypeCumulativeFloat64, + wantKind: googlemetricpb.MetricDescriptor_CUMULATIVE, + wantValueType: googlemetricpb.MetricDescriptor_DOUBLE, + }, + { + in: metricdata.TypeGaugeInt64, + wantKind: googlemetricpb.MetricDescriptor_GAUGE, + wantValueType: googlemetricpb.MetricDescriptor_INT64, + }, + { + in: metricdata.TypeGaugeFloat64, + wantKind: googlemetricpb.MetricDescriptor_GAUGE, + wantValueType: googlemetricpb.MetricDescriptor_DOUBLE, + }, + { + in: metricdata.TypeCumulativeDistribution, + wantKind: googlemetricpb.MetricDescriptor_CUMULATIVE, + wantValueType: googlemetricpb.MetricDescriptor_DISTRIBUTION, + }, + { + in: metricdata.TypeGaugeDistribution, + wantKind: googlemetricpb.MetricDescriptor_GAUGE, + wantValueType: googlemetricpb.MetricDescriptor_DISTRIBUTION, + }, + { + in: metricdata.TypeSummary, + wantKind: googlemetricpb.MetricDescriptor_METRIC_KIND_UNSPECIFIED, + wantValueType: googlemetricpb.MetricDescriptor_VALUE_TYPE_UNSPECIFIED, + }, + } + + for i, tt := range tests { + md := &metricdata.Metric{ + Descriptor: metricdata.Descriptor{ + Name: "with_metric_descriptor", + Description: "This is with metric descriptor", + Unit: metricdata.UnitBytes, + Type: tt.in, + }, + } + + got, err := se.metricToMpbMetricDescriptor(md) + if tt.wantErr != "" { + if err == nil || !strings.Contains(err.Error(), tt.wantErr) { + t.Errorf("#%d: \nGot %v\nWanted error substring %q", i, err, tt.wantErr) + } + continue + } + + if err != nil { + t.Errorf("#%d: Unexpected error: %v", i, err) + continue + } + + if got.MetricKind != tt.wantKind { + t.Errorf("got %d, want %d\n", got.MetricKind, tt.wantKind) + } + if got.ValueType != tt.wantValueType { + t.Errorf("got %d, want %d\n", got.ValueType, tt.wantValueType) + } + } +} + +func TestMetricsToMonitoringMetrics_fromProtoPoint(t *testing.T) { + startTimestamp := ×tamp.Timestamp{ + Seconds: 1543160298, + Nanos: 100000090, + } + endTimestamp := ×tamp.Timestamp{ + Seconds: 1543160298, + Nanos: 100000997, + } + + tests := []struct { + in *metricdata.Point + want *monitoringpb.Point + wantErr string + }{ + { + in: &metricdata.Point{ + Time: timestampToTime(endTimestamp), + Value: &metricdata.Distribution{ + Count: 1, + Sum: 11.9, + SumOfSquaredDeviation: 0, + Buckets: []metricdata.Bucket{ + {}, {Count: 1}, {}, {}, {}, + }, + BucketOptions: &metricdata.BucketOptions{ + Bounds: []float64{0, 10, 20, 30, 40}, + }, + }, + }, + want: &monitoringpb.Point{ + Interval: &monitoringpb.TimeInterval{ + StartTime: startTimestamp, + EndTime: endTimestamp, + }, + Value: &monitoringpb.TypedValue{ + Value: &monitoringpb.TypedValue_DistributionValue{ + DistributionValue: &distributionpb.Distribution{ + Count: 1, + Mean: 11.9, + SumOfSquaredDeviation: 0, + BucketCounts: []int64{0, 1, 0, 0, 0}, + BucketOptions: &distributionpb.Distribution_BucketOptions{ + Options: &distributionpb.Distribution_BucketOptions_ExplicitBuckets{ + ExplicitBuckets: &distributionpb.Distribution_BucketOptions_Explicit{ + Bounds: []float64{0, 10, 20, 30, 40}, + }, + }, + }, + }, + }, + }, + }, + }, + { + in: &metricdata.Point{ + Time: timestampToTime(endTimestamp), + Value: float64(50.0), + }, + want: &monitoringpb.Point{ + Interval: &monitoringpb.TimeInterval{ + StartTime: startTimestamp, + EndTime: endTimestamp, + }, + Value: &monitoringpb.TypedValue{ + Value: &monitoringpb.TypedValue_DoubleValue{DoubleValue: 50}, + }, + }, + }, + { + in: &metricdata.Point{ + Time: timestampToTime(endTimestamp), + Value: int64(17), + }, + want: &monitoringpb.Point{ + Interval: &monitoringpb.TimeInterval{ + StartTime: startTimestamp, + EndTime: endTimestamp, + }, + Value: &monitoringpb.TypedValue{ + Value: &monitoringpb.TypedValue_Int64Value{Int64Value: 17}, + }, + }, + }, + } + + for i, tt := range tests { + mpt, err := metricPointToMpbPoint(startTimestamp, tt.in) + if tt.wantErr != "" { + continue + } + + if err != nil { + t.Errorf("#%d: unexpected error: %v", i, err) + continue + } + + if g, w := mpt, tt.want; !reflect.DeepEqual(g, w) { + // Our saving grace is serialization equality since some + // unexported fields could be present in the various values. + gj, wj := serializeAsJSON(g), serializeAsJSON(w) + if gj != wj { + t.Errorf("#%d: Unmatched JSON\nGot:\n\t%s\nWant:\n\t%s", i, gj, wj) + } + } + } +} + +func timestampToTime(ts *timestamp.Timestamp) time.Time { + if ts == nil { + return time.Unix(0, 0).UTC() + } else { + return time.Unix(ts.Seconds, int64(ts.Nanos)).UTC() + } +} diff --git a/stackdriver.go b/stackdriver.go index e42ea9b..be163a5 100644 --- a/stackdriver.go +++ b/stackdriver.go @@ -70,6 +70,7 @@ import ( commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1" metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1" resourcepb "github.com/census-instrumentation/opencensus-proto/gen-go/resource/v1" + "go.opencensus.io/metric/metricdata" ) // Options contains options for configuring the exporter. @@ -350,6 +351,11 @@ func (e *Exporter) ExportMetricsProto(ctx context.Context, node *commonpb.Node, return e.statsExporter.ExportMetricsProto(ctx, node, rsc, metrics) } +// ExportMetrics exports OpenCensus Metrics to Stackdriver Monitoring +func (e *Exporter) ExportMetrics(ctx context.Context, metrics []*metricdata.Metric) error { + return e.statsExporter.ExportMetrics(ctx, metrics) +} + // ExportSpan exports a SpanData to Stackdriver Trace. func (e *Exporter) ExportSpan(sd *trace.SpanData) { if len(e.traceExporter.o.DefaultTraceAttributes) > 0 { diff --git a/stats.go b/stats.go index c0f543e..2f8533b 100644 --- a/stats.go +++ b/stats.go @@ -25,14 +25,15 @@ import ( "sync" "time" - opencensus "go.opencensus.io" + "go.opencensus.io" "go.opencensus.io/stats" "go.opencensus.io/stats/view" "go.opencensus.io/tag" "go.opencensus.io/trace" - monitoring "cloud.google.com/go/monitoring/apiv3" + "cloud.google.com/go/monitoring/apiv3" "github.com/golang/protobuf/ptypes/timestamp" + "go.opencensus.io/metric/metricdata" "google.golang.org/api/option" "google.golang.org/api/support/bundler" distributionpb "google.golang.org/genproto/googleapis/api/distribution" @@ -59,6 +60,7 @@ type statsExporter struct { viewDataBundler *bundler.Bundler protoMetricsBundler *bundler.Bundler + metricsBundler *bundler.Bundler createdViewsMu sync.Mutex createdViews map[string]*metricpb.MetricDescriptor // Views already created remotely @@ -66,6 +68,9 @@ type statsExporter struct { protoMu sync.Mutex protoMetricDescriptors map[string]*metricpb.MetricDescriptor // Saves the metric descriptors that were already created remotely + metricMu sync.Mutex + metricDescriptors map[string]*metricpb.MetricDescriptor // Saves the metric descriptors that were already created remotely + c *monitoring.MetricClient defaultLabels map[string]labelValue } @@ -94,6 +99,7 @@ func newStatsExporter(o Options) (*statsExporter, error) { o: o, createdViews: make(map[string]*metricpb.MetricDescriptor), protoMetricDescriptors: make(map[string]*metricpb.MetricDescriptor), + metricDescriptors: make(map[string]*metricpb.MetricDescriptor), } if o.DefaultMonitoringLabels != nil { @@ -112,13 +118,19 @@ func newStatsExporter(o Options) (*statsExporter, error) { payloads := bundle.([]*metricProtoPayload) e.handleMetricsProtoUpload(payloads) }) + e.metricsBundler = bundler.NewBundler((*metricdata.Metric)(nil), func(bundle interface{}) { + metrics := bundle.([]*metricdata.Metric) + e.handleMetricsUpload(metrics) + }) if delayThreshold := e.o.BundleDelayThreshold; delayThreshold > 0 { e.viewDataBundler.DelayThreshold = delayThreshold e.protoMetricsBundler.DelayThreshold = delayThreshold + e.metricsBundler.DelayThreshold = delayThreshold } if countThreshold := e.o.BundleCountThreshold; countThreshold > 0 { e.viewDataBundler.BundleCountThreshold = countThreshold e.protoMetricsBundler.BundleCountThreshold = countThreshold + e.metricsBundler.BundleCountThreshold = countThreshold } return e, nil } @@ -180,6 +192,7 @@ func (e *statsExporter) handleUpload(vds ...*view.Data) { func (e *statsExporter) Flush() { e.viewDataBundler.Flush() e.protoMetricsBundler.Flush() + e.metricsBundler.Flush() } func (e *statsExporter) uploadStats(vds []*view.Data) error {