From 5d45984ff7addfb0476ea36121722f52c7bfff9f Mon Sep 17 00:00:00 2001 From: Kyle Eckhart Date: Sat, 4 Feb 2023 10:02:24 -0500 Subject: [PATCH] Working implementation using cloud.google.com/go/monitoring/apiv3 Signed-off-by: Kyle Eckhart --- collectors/delta_counter.go | 6 +- collectors/delta_distribution.go | 6 +- collectors/monitoring_collector.go | 308 ++++++++++++++++------------- collectors/monitoring_metrics.go | 26 +-- go.mod | 6 +- go.sum | 15 +- stackdriver_exporter.go | 93 ++++----- 7 files changed, 235 insertions(+), 225 deletions(-) diff --git a/collectors/delta_counter.go b/collectors/delta_counter.go index c3798ab3..0d37d676 100644 --- a/collectors/delta_counter.go +++ b/collectors/delta_counter.go @@ -22,7 +22,7 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" - "google.golang.org/api/monitoring/v3" + "google.golang.org/genproto/googleapis/api/metric" ) type CollectedMetric struct { @@ -36,7 +36,7 @@ type DeltaCounterStore interface { // Increment will use the incoming metricDescriptor and currentValue to either create a new entry or add the incoming // value to an existing entry in the underlying store - Increment(metricDescriptor *monitoring.MetricDescriptor, currentValue *ConstMetric) + Increment(metricDescriptor *metric.MetricDescriptor, currentValue *ConstMetric) // ListMetrics will return all known entries in the store for a metricDescriptorName ListMetrics(metricDescriptorName string) map[string][]*CollectedMetric @@ -62,7 +62,7 @@ func NewInMemoryDeltaCounterStore(logger log.Logger, ttl time.Duration) DeltaCou } } -func (s *inMemoryDeltaCounterStore) Increment(metricDescriptor *monitoring.MetricDescriptor, currentValue *ConstMetric) { +func (s *inMemoryDeltaCounterStore) Increment(metricDescriptor *metric.MetricDescriptor, currentValue *ConstMetric) { if currentValue == nil { return } diff --git a/collectors/delta_distribution.go b/collectors/delta_distribution.go index 83bc96ae..fb6a5fe8 100644 --- a/collectors/delta_distribution.go +++ b/collectors/delta_distribution.go @@ -22,7 +22,7 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" - "google.golang.org/api/monitoring/v3" + "google.golang.org/genproto/googleapis/api/metric" ) type CollectedHistogram struct { @@ -36,7 +36,7 @@ type DeltaDistributionStore interface { // Increment will use the incoming metricDescriptor and currentValue to either create a new entry or add the incoming // value to an existing entry in the underlying store - Increment(metricDescriptor *monitoring.MetricDescriptor, currentValue *HistogramMetric) + Increment(metricDescriptor *metric.MetricDescriptor, currentValue *HistogramMetric) // ListMetrics will return all known entries in the store for a metricDescriptorName ListMetrics(metricDescriptorName string) map[string][]*CollectedHistogram @@ -62,7 +62,7 @@ func NewInMemoryDeltaDistributionStore(logger log.Logger, ttl time.Duration) Del } } -func (s *inMemoryDeltaDistributionStore) Increment(metricDescriptor *monitoring.MetricDescriptor, currentValue *HistogramMetric) { +func (s *inMemoryDeltaDistributionStore) Increment(metricDescriptor *metric.MetricDescriptor, currentValue *HistogramMetric) { if currentValue == nil { return } diff --git a/collectors/monitoring_collector.go b/collectors/monitoring_collector.go index b01367e8..2d70e8b6 100644 --- a/collectors/monitoring_collector.go +++ b/collectors/monitoring_collector.go @@ -21,11 +21,16 @@ import ( "sync" "time" + monitoringv3 "cloud.google.com/go/monitoring/apiv3" + 
"cloud.google.com/go/monitoring/apiv3/v2/monitoringpb" "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/prometheus/client_golang/prometheus" "golang.org/x/net/context" - "google.golang.org/api/monitoring/v3" + "google.golang.org/api/iterator" + "google.golang.org/genproto/googleapis/api/distribution" + "google.golang.org/genproto/googleapis/api/metric" + "google.golang.org/protobuf/types/known/timestamppb" "github.com/prometheus-community/stackdriver_exporter/utils" ) @@ -44,7 +49,7 @@ type MonitoringCollector struct { metricsInterval time.Duration metricsOffset time.Duration metricsIngestDelay bool - monitoringService *monitoring.Service + metricClient *monitoringv3.MetricClient apiCallsTotalMetric prometheus.Counter scrapesTotalMetric prometheus.Counter scrapeErrorsTotalMetric prometheus.Counter @@ -57,6 +62,7 @@ type MonitoringCollector struct { deltaCounterStore DeltaCounterStore deltaDistributionStore DeltaDistributionStore aggregateDeltas bool + timeout time.Duration } type MonitoringCollectorOptions struct { @@ -80,9 +86,11 @@ type MonitoringCollectorOptions struct { DropDelegatedProjects bool // AggregateDeltas decides if DELTA metrics should be treated as a counter using the provided counterStore/distributionStore or a gauge AggregateDeltas bool + // ClientTimeout controls how long each GCP request has to complete + ClientTimeout time.Duration } -func NewMonitoringCollector(projectID string, monitoringService *monitoring.Service, opts MonitoringCollectorOptions, logger log.Logger, counterStore DeltaCounterStore, distributionStore DeltaDistributionStore) (*MonitoringCollector, error) { +func NewMonitoringCollector(projectID string, metricClient *monitoringv3.MetricClient, opts MonitoringCollectorOptions, logger log.Logger, counterStore DeltaCounterStore, distributionStore DeltaDistributionStore) (*MonitoringCollector, error) { const subsystem = "monitoring" apiCallsTotalMetric := prometheus.NewCounter( @@ -152,7 +160,7 @@ func NewMonitoringCollector(projectID string, monitoringService *monitoring.Serv metricsInterval: opts.RequestInterval, metricsOffset: opts.RequestOffset, metricsIngestDelay: opts.IngestDelay, - monitoringService: monitoringService, + metricClient: metricClient, apiCallsTotalMetric: apiCallsTotalMetric, scrapesTotalMetric: scrapesTotalMetric, scrapeErrorsTotalMetric: scrapeErrorsTotalMetric, @@ -165,6 +173,7 @@ func NewMonitoringCollector(projectID string, monitoringService *monitoring.Serv deltaCounterStore: counterStore, deltaDistributionStore: distributionStore, aggregateDeltas: opts.AggregateDeltas, + timeout: opts.ClientTimeout, } return monitoringCollector, nil @@ -206,101 +215,6 @@ func (c *MonitoringCollector) Collect(ch chan<- prometheus.Metric) { } func (c *MonitoringCollector) reportMonitoringMetrics(ch chan<- prometheus.Metric, begun time.Time) error { - metricDescriptorsFunction := func(page *monitoring.ListMetricDescriptorsResponse) error { - var wg = &sync.WaitGroup{} - - c.apiCallsTotalMetric.Inc() - - // It has been noticed that the same metric descriptor can be obtained from different GCP - // projects. When that happens, metrics are fetched twice and it provokes the error: - // "collected metric xxx was collected before with the same name and label values" - // - // Metric descriptor project is irrelevant when it comes to fetch metrics, as they will be - // fetched from all the delegated projects filtering by metric type. Considering that, we - // can filter descriptors to keep just one per type. 
- // - // The following makes sure metric descriptors are unique to avoid fetching more than once - uniqueDescriptors := make(map[string]*monitoring.MetricDescriptor) - for _, descriptor := range page.MetricDescriptors { - uniqueDescriptors[descriptor.Type] = descriptor - } - - errChannel := make(chan error, len(uniqueDescriptors)) - - endTime := time.Now().UTC().Add(c.metricsOffset * -1) - startTime := endTime.Add(c.metricsInterval * -1) - - for _, metricDescriptor := range uniqueDescriptors { - wg.Add(1) - go func(metricDescriptor *monitoring.MetricDescriptor, ch chan<- prometheus.Metric, startTime, endTime time.Time) { - defer wg.Done() - level.Debug(c.logger).Log("msg", "retrieving Google Stackdriver Monitoring metrics for descriptor", "descriptor", metricDescriptor.Type) - filter := fmt.Sprintf("metric.type=\"%s\"", metricDescriptor.Type) - if c.monitoringDropDelegatedProjects { - filter = fmt.Sprintf( - "project=\"%s\" AND metric.type=\"%s\"", - c.projectID, - metricDescriptor.Type) - } - - if c.metricsIngestDelay && - metricDescriptor.Metadata != nil && - metricDescriptor.Metadata.IngestDelay != "" { - ingestDelay := metricDescriptor.Metadata.IngestDelay - ingestDelayDuration, err := time.ParseDuration(ingestDelay) - if err != nil { - level.Error(c.logger).Log("msg", "error parsing ingest delay from metric metadata", "descriptor", metricDescriptor.Type, "err", err, "delay", ingestDelay) - errChannel <- err - return - } - level.Debug(c.logger).Log("msg", "adding ingest delay", "descriptor", metricDescriptor.Type, "delay", ingestDelay) - endTime = endTime.Add(ingestDelayDuration * -1) - startTime = startTime.Add(ingestDelayDuration * -1) - } - - for _, ef := range c.metricsFilters { - if strings.Contains(metricDescriptor.Type, ef.Prefix) { - filter = fmt.Sprintf("%s AND (%s)", filter, ef.Modifier) - } - } - - level.Debug(c.logger).Log("msg", "retrieving Google Stackdriver Monitoring metrics with filter", "filter", filter) - - timeSeriesListCall := c.monitoringService.Projects.TimeSeries.List(utils.ProjectResource(c.projectID)). - Filter(filter). - IntervalStartTime(startTime.Format(time.RFC3339Nano)). 
- IntervalEndTime(endTime.Format(time.RFC3339Nano))
-
- for {
- c.apiCallsTotalMetric.Inc()
- page, err := timeSeriesListCall.Do()
- if err != nil {
- level.Error(c.logger).Log("msg", "error retrieving Time Series metrics for descriptor", "descriptor", metricDescriptor.Type, "err", err)
- errChannel <- err
- break
- }
- if page == nil {
- break
- }
- if err := c.reportTimeSeriesMetrics(page, metricDescriptor, ch, begun); err != nil {
- level.Error(c.logger).Log("msg", "error reporting Time Series metrics for descriptor", "descriptor", metricDescriptor.Type, "err", err)
- errChannel <- err
- break
- }
- if page.NextPageToken == "" {
- break
- }
- timeSeriesListCall.PageToken(page.NextPageToken)
- }
- }(metricDescriptor, ch, startTime, endTime)
- }
-
- wg.Wait()
- close(errChannel)
-
- return <-errChannel
- }
-
 var wg = &sync.WaitGroup{}
 errChannel := make(chan error, len(c.metricsTypePrefixes))
@@ -310,7 +224,6 @@ func (c *MonitoringCollector) reportMonitoringMetrics(ch chan<- prometheus.Metri
 go func(metricsTypePrefix string) {
 defer wg.Done()
 level.Debug(c.logger).Log("msg", "listing Google Stackdriver Monitoring metric descriptors starting with", "prefix", metricsTypePrefix)
- ctx := context.Background()
 filter := fmt.Sprintf("metric.type = starts_with(\"%s\")", metricsTypePrefix)
 if c.monitoringDropDelegatedProjects {
 filter = fmt.Sprintf(
@@ -318,10 +231,40 @@ func (c *MonitoringCollector) reportMonitoringMetrics(ch chan<- prometheus.Metri
 c.projectID,
 metricsTypePrefix)
 }
- if err := c.monitoringService.Projects.MetricDescriptors.List(utils.ProjectResource(c.projectID)).
- Filter(filter).
- Pages(ctx, metricDescriptorsFunction); err != nil {
- errChannel <- err
- }
+
+ req := &monitoringpb.ListMetricDescriptorsRequest{
+ Name: utils.ProjectResource(c.projectID),
+ Filter: filter,
+ }
+
+ ctx, cancel := context.WithTimeout(context.Background(), c.timeout)
+ defer cancel()
+ it := c.metricClient.ListMetricDescriptors(ctx, req)
+
+ apiCalls := 1.0
+ var descriptors []*metric.MetricDescriptor
+ for {
+ // There's nothing exposed in https://pkg.go.dev/google.golang.org/api/iterator@v0.103.0 which lets you
+ // know an API call was initiated. You might think https://pkg.go.dev/google.golang.org/api/iterator@v0.103.0#NewPager
+ // could do it but the pageSize is a superficial page size that the consumer sets and has no impact on API call paging.
+ // If we know there are no items left in the current page and there's a non-empty page token then calling Next() is going to initiate an API call
+ if it.PageInfo().Remaining() == 0 && it.PageInfo().Token != "" {
+ apiCalls += 1.0
+ }
+ descriptor, err := it.Next()
+ if err == iterator.Done {
+ break
+ }
+ if err != nil {
+ errChannel <- fmt.Errorf("error while fetching descriptors for %s, %v", metricsTypePrefix, err)
+ break
+ }
+ descriptors = append(descriptors, descriptor)
+ }
+ c.apiCallsTotalMetric.Add(apiCalls)
+
+ if err := c.collectMetricsForDescriptors(ch, begun, descriptors); err != nil {
+ errChannel <- fmt.Errorf("error while collecting metrics for %s, %v", metricsTypePrefix, err)
+ }
 }(metricsTypePrefix)
 }
@@ -333,15 +276,121 @@ func (c *MonitoringCollector) reportMonitoringMetrics(ch chan<- prometheus.Metri
 return <-errChannel
 }

+func (c *MonitoringCollector) collectMetricsForDescriptors(ch chan<- prometheus.Metric, begun time.Time, descriptors []*metric.MetricDescriptor) error {
+ var wg = &sync.WaitGroup{}
+ // It has been noticed that the same metric descriptor can be obtained from different GCP
+ // projects. When that happens, metrics are fetched twice and it provokes the error:
+ // "collected metric xxx was collected before with the same name and label values"
+ //
+ // Metric descriptor project is irrelevant when it comes to fetch metrics, as they will be
+ // fetched from all the delegated projects filtering by metric type. Considering that, we
+ // can filter descriptors to keep just one per type.
+ //
+ // The following makes sure metric descriptors are unique to avoid fetching more than once
+ uniqueDescriptors := make(map[string]*metric.MetricDescriptor)
+ for _, descriptor := range descriptors {
+ uniqueDescriptors[descriptor.Type] = descriptor
+ }
+
+ errChannel := make(chan error, len(uniqueDescriptors))
+
+ endTime := time.Now().UTC().Add(c.metricsOffset * -1)
+ startTime := endTime.Add(c.metricsInterval * -1)
+
+ for _, metricDescriptor := range uniqueDescriptors {
+ wg.Add(1)
+ go func(metricDescriptor *metric.MetricDescriptor, ch chan<- prometheus.Metric, startTime, endTime time.Time) {
+ defer wg.Done()
+ level.Debug(c.logger).Log("msg", "retrieving Google Stackdriver Monitoring metrics for descriptor", "descriptor", metricDescriptor.Type)
+ filter := fmt.Sprintf("metric.type=\"%s\"", metricDescriptor.Type)
+ if c.monitoringDropDelegatedProjects {
+ filter = fmt.Sprintf(
+ "project=\"%s\" AND metric.type=\"%s\"",
+ c.projectID,
+ metricDescriptor.Type)
+ }
+
+ if c.metricsIngestDelay &&
+ metricDescriptor.Metadata != nil &&
+ metricDescriptor.Metadata.IngestDelay != nil {
+ ingestDelay := metricDescriptor.Metadata.IngestDelay.AsDuration()
+ level.Debug(c.logger).Log("msg", "adding ingest delay", "descriptor", metricDescriptor.Type, "delay", ingestDelay)
+ endTime = endTime.Add(ingestDelay * -1)
+ startTime = startTime.Add(ingestDelay * -1)
+ }
+
+ for _, ef := range c.metricsFilters {
+ if strings.Contains(metricDescriptor.Type, ef.Prefix) {
+ filter = fmt.Sprintf("%s AND (%s)", filter, ef.Modifier)
+ }
+ }
+
+ level.Debug(c.logger).Log("msg", "retrieving Google Stackdriver Monitoring metrics with filter", "filter", filter)
+
+ request := &monitoringpb.ListTimeSeriesRequest{
+ Name: utils.ProjectResource(c.projectID),
+ Filter: filter,
+ Interval: &monitoringpb.TimeInterval{
+ EndTime: &timestamppb.Timestamp{
+ Seconds: endTime.Unix(),
+ Nanos: 0,
+ },
+ StartTime: &timestamppb.Timestamp{
+ Seconds: startTime.Unix(),
+ Nanos: 0,
+ },
+ },
+ View: monitoringpb.ListTimeSeriesRequest_FULL,
+ }
+ ctx, cancel := context.WithTimeout(context.Background(), c.timeout)
+ defer cancel()
+ it := c.metricClient.ListTimeSeries(ctx, request)
+
+ var results []*monitoringpb.TimeSeries
+ apiCalls := 1.0
+ for {
+ // There's nothing exposed in https://pkg.go.dev/google.golang.org/api/iterator@v0.103.0 which lets you
+ // know an API call was initiated. You might think https://pkg.go.dev/google.golang.org/api/iterator@v0.103.0#NewPager
+ // could do it but the pageSize is a superficial page size that the consumer sets and has no impact on API call paging.
+ // If we know there are no items left in the current page and there's a non-empty page token then calling Next() is going to initiate an API call + if it.PageInfo().Remaining() == 0 && it.PageInfo().Token != "" { + apiCalls += 1.0 + } + timeSeries, err := it.Next() + if err == iterator.Done { + break + } + if err != nil { + level.Error(c.logger).Log("msg", "error retrieving Time Series metrics for descriptor", "descriptor", metricDescriptor.Type, "err", err) + errChannel <- err + break + } + results = append(results, timeSeries) + } + c.apiCallsTotalMetric.Add(apiCalls) + + if err := c.reportTimeSeriesMetrics(results, metricDescriptor, ch, begun); err != nil { + level.Error(c.logger).Log("msg", "error reporting Time Series metrics for descriptor", "descriptor", metricDescriptor.Type, "err", err) + errChannel <- err + } + }(metricDescriptor, ch, startTime, endTime) + } + + wg.Wait() + close(errChannel) + + return <-errChannel +} + func (c *MonitoringCollector) reportTimeSeriesMetrics( - page *monitoring.ListTimeSeriesResponse, - metricDescriptor *monitoring.MetricDescriptor, + results []*monitoringpb.TimeSeries, + metricDescriptor *metric.MetricDescriptor, ch chan<- prometheus.Metric, begun time.Time, ) error { var metricValue float64 var metricValueType prometheus.ValueType - var newestTSPoint *monitoring.Point + var newestTSPoint *monitoringpb.Point timeSeriesMetrics, err := NewTimeSeriesMetrics(metricDescriptor, ch, @@ -353,13 +402,10 @@ func (c *MonitoringCollector) reportTimeSeriesMetrics( if err != nil { return fmt.Errorf("error creating the TimeSeriesMetrics %v", err) } - for _, timeSeries := range page.TimeSeries { + for _, timeSeries := range results { newestEndTime := time.Unix(0, 0) for _, point := range timeSeries.Points { - endTime, err := time.Parse(time.RFC3339Nano, point.Interval.EndTime) - if err != nil { - return fmt.Errorf("Error parsing TimeSeries Point interval end time `%s`: %s", point.Interval.EndTime, err) - } + endTime := time.Unix(point.Interval.EndTime.GetSeconds(), int64(point.Interval.EndTime.GetNanos())) if endTime.After(newestEndTime) { newestEndTime = endTime newestTSPoint = point @@ -402,32 +448,32 @@ func (c *MonitoringCollector) reportTimeSeriesMetrics( } switch timeSeries.MetricKind { - case "GAUGE": + case metric.MetricDescriptor_GAUGE: metricValueType = prometheus.GaugeValue - case "DELTA": + case metric.MetricDescriptor_DELTA: if c.aggregateDeltas { metricValueType = prometheus.CounterValue } else { metricValueType = prometheus.GaugeValue } - case "CUMULATIVE": + case metric.MetricDescriptor_CUMULATIVE: metricValueType = prometheus.CounterValue default: continue } switch timeSeries.ValueType { - case "BOOL": + case metric.MetricDescriptor_BOOL: metricValue = 0 - if *newestTSPoint.Value.BoolValue { + if newestTSPoint.Value.GetBoolValue() { metricValue = 1 } - case "INT64": - metricValue = float64(*newestTSPoint.Value.Int64Value) - case "DOUBLE": - metricValue = *newestTSPoint.Value.DoubleValue - case "DISTRIBUTION": - dist := newestTSPoint.Value.DistributionValue + case metric.MetricDescriptor_INT64: + metricValue = float64(newestTSPoint.Value.GetInt64Value()) + case metric.MetricDescriptor_DOUBLE: + metricValue = newestTSPoint.Value.GetDoubleValue() + case metric.MetricDescriptor_DISTRIBUTION: + dist := newestTSPoint.Value.GetDistributionValue() buckets, err := c.generateHistogramBuckets(dist) if err == nil { @@ -449,25 +495,23 @@ func (c *MonitoringCollector) reportTimeSeriesMetrics( } func (c *MonitoringCollector) generateHistogramBuckets( - dist 
*monitoring.Distribution, + dist *distribution.Distribution, ) (map[float64]uint64, error) { - opts := dist.BucketOptions var bucketKeys []float64 - switch { - case opts.ExplicitBuckets != nil: - // @see https://cloud.google.com/monitoring/api/ref_v3/rest/v3/TypedValue#explicit + switch opts := dist.BucketOptions.GetOptions().(type) { + case *distribution.Distribution_BucketOptions_ExplicitBuckets: bucketKeys = make([]float64, len(opts.ExplicitBuckets.Bounds)+1) copy(bucketKeys, opts.ExplicitBuckets.Bounds) - case opts.LinearBuckets != nil: - // @see https://cloud.google.com/monitoring/api/ref_v3/rest/v3/TypedValue#linear + case *distribution.Distribution_BucketOptions_LinearBuckets: + // @see https://pkg.go.dev/google.golang.org/genproto/googleapis/api/distribution#Distribution_BucketOptions_Linear // NumFiniteBuckets is inclusive so bucket count is num+2 num := int(opts.LinearBuckets.NumFiniteBuckets) bucketKeys = make([]float64, num+2) for i := 0; i <= num; i++ { bucketKeys[i] = opts.LinearBuckets.Offset + (float64(i) * opts.LinearBuckets.Width) } - case opts.ExponentialBuckets != nil: - // @see https://cloud.google.com/monitoring/api/ref_v3/rest/v3/TypedValue#exponential + case *distribution.Distribution_BucketOptions_ExponentialBuckets: + // @see https://pkg.go.dev/google.golang.org/genproto/googleapis/api/distribution#Distribution_BucketOptions_Exponential // NumFiniteBuckets is inclusive so bucket count is num+2 num := int(opts.ExponentialBuckets.NumFiniteBuckets) bucketKeys = make([]float64, num+2) diff --git a/collectors/monitoring_metrics.go b/collectors/monitoring_metrics.go index d6bbced7..b339cb27 100644 --- a/collectors/monitoring_metrics.go +++ b/collectors/monitoring_metrics.go @@ -16,15 +16,17 @@ package collectors import ( "time" + "cloud.google.com/go/monitoring/apiv3/v2/monitoringpb" "github.com/prometheus/client_golang/prometheus" - "google.golang.org/api/monitoring/v3" + "google.golang.org/genproto/googleapis/api/distribution" + "google.golang.org/genproto/googleapis/api/metric" "sort" "github.com/prometheus-community/stackdriver_exporter/utils" ) -func buildFQName(timeSeries *monitoring.TimeSeries) string { +func buildFQName(timeSeries *monitoringpb.TimeSeries) string { // The metric name to report is composed by the 3 parts: // 1. namespace is a constant prefix (stackdriver) // 2. 
subsystem is the monitored resource type (ie gce_instance) @@ -33,7 +35,7 @@ func buildFQName(timeSeries *monitoring.TimeSeries) string { } type timeSeriesMetrics struct { - metricDescriptor *monitoring.MetricDescriptor + metricDescriptor *metric.MetricDescriptor ch chan<- prometheus.Metric @@ -46,7 +48,7 @@ type timeSeriesMetrics struct { aggregateDeltas bool } -func NewTimeSeriesMetrics(descriptor *monitoring.MetricDescriptor, +func NewTimeSeriesMetrics(descriptor *metric.MetricDescriptor, ch chan<- prometheus.Metric, fillMissingLabels bool, deltaCounterStore DeltaCounterStore, @@ -88,7 +90,7 @@ type ConstMetric struct { type HistogramMetric struct { fqName string labelKeys []string - dist *monitoring.Distribution + dist *distribution.Distribution buckets map[float64]uint64 labelValues []string reportTime time.Time @@ -96,11 +98,11 @@ type HistogramMetric struct { keysHash uint64 } -func (t *timeSeriesMetrics) CollectNewConstHistogram(timeSeries *monitoring.TimeSeries, reportTime time.Time, labelKeys []string, dist *monitoring.Distribution, buckets map[float64]uint64, labelValues []string, metricKind string) { +func (t *timeSeriesMetrics) CollectNewConstHistogram(timeSeries *monitoringpb.TimeSeries, reportTime time.Time, labelKeys []string, dist *distribution.Distribution, buckets map[float64]uint64, labelValues []string, metricKind metric.MetricDescriptor_MetricKind) { fqName := buildFQName(timeSeries) var v HistogramMetric - if t.fillMissingLabels || (metricKind == "DELTA" && t.aggregateDeltas) { + if t.fillMissingLabels || (metricKind == metric.MetricDescriptor_DELTA && t.aggregateDeltas) { v = HistogramMetric{ fqName: fqName, labelKeys: labelKeys, @@ -113,7 +115,7 @@ func (t *timeSeriesMetrics) CollectNewConstHistogram(timeSeries *monitoring.Time } } - if metricKind == "DELTA" && t.aggregateDeltas { + if metricKind == metric.MetricDescriptor_DELTA && t.aggregateDeltas { t.deltaDistributionStore.Increment(t.metricDescriptor, &v) return } @@ -130,7 +132,7 @@ func (t *timeSeriesMetrics) CollectNewConstHistogram(timeSeries *monitoring.Time t.ch <- t.newConstHistogram(fqName, reportTime, labelKeys, dist, buckets, labelValues) } -func (t *timeSeriesMetrics) newConstHistogram(fqName string, reportTime time.Time, labelKeys []string, dist *monitoring.Distribution, buckets map[float64]uint64, labelValues []string) prometheus.Metric { +func (t *timeSeriesMetrics) newConstHistogram(fqName string, reportTime time.Time, labelKeys []string, dist *distribution.Distribution, buckets map[float64]uint64, labelValues []string) prometheus.Metric { return prometheus.NewMetricWithTimestamp( reportTime, prometheus.MustNewConstHistogram( @@ -143,11 +145,11 @@ func (t *timeSeriesMetrics) newConstHistogram(fqName string, reportTime time.Tim ) } -func (t *timeSeriesMetrics) CollectNewConstMetric(timeSeries *monitoring.TimeSeries, reportTime time.Time, labelKeys []string, metricValueType prometheus.ValueType, metricValue float64, labelValues []string, metricKind string) { +func (t *timeSeriesMetrics) CollectNewConstMetric(timeSeries *monitoringpb.TimeSeries, reportTime time.Time, labelKeys []string, metricValueType prometheus.ValueType, metricValue float64, labelValues []string, metricKind metric.MetricDescriptor_MetricKind) { fqName := buildFQName(timeSeries) var v ConstMetric - if t.fillMissingLabels || (metricKind == "DELTA" && t.aggregateDeltas) { + if t.fillMissingLabels || (metricKind == metric.MetricDescriptor_DELTA && t.aggregateDeltas) { v = ConstMetric{ fqName: fqName, labelKeys: labelKeys, @@ -160,7 
+162,7 @@ func (t *timeSeriesMetrics) CollectNewConstMetric(timeSeries *monitoring.TimeSer } } - if metricKind == "DELTA" && t.aggregateDeltas { + if metricKind == metric.MetricDescriptor_DELTA && t.aggregateDeltas { t.deltaCounterStore.Increment(t.metricDescriptor, &v) return } diff --git a/go.mod b/go.mod index 749e5df3..af6bd3fb 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module github.com/prometheus-community/stackdriver_exporter go 1.18 require ( - github.com/PuerkitoBio/rehttp v1.1.0 + cloud.google.com/go/monitoring v1.8.0 github.com/fatih/camelcase v1.0.0 github.com/go-kit/log v0.2.1 github.com/onsi/ginkgo v1.16.5 @@ -14,6 +14,8 @@ require ( golang.org/x/net v0.5.0 golang.org/x/oauth2 v0.4.0 google.golang.org/api v0.108.0 + google.golang.org/genproto v0.0.0-20221227171554-f9683d7f8bef + google.golang.org/protobuf v1.28.1 gopkg.in/alecthomas/kingpin.v2 v2.2.6 ) @@ -40,9 +42,7 @@ require ( golang.org/x/sys v0.4.0 // indirect golang.org/x/text v0.6.0 // indirect google.golang.org/appengine v1.6.7 // indirect - google.golang.org/genproto v0.0.0-20221227171554-f9683d7f8bef // indirect google.golang.org/grpc v1.51.0 // indirect - google.golang.org/protobuf v1.28.1 // indirect gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 2cc6d242..5309b6fc 100644 --- a/go.sum +++ b/go.sum @@ -5,17 +5,13 @@ cloud.google.com/go/compute v1.14.0/go.mod h1:YfLtxrj9sU4Yxv+sXzZkyPjEyPBZfXHUvj cloud.google.com/go/compute/metadata v0.2.3 h1:mg4jlk7mCAj6xXp9UJ4fjI9VUI5rubuGBW5aJ7UnBMY= cloud.google.com/go/compute/metadata v0.2.3/go.mod h1:VAV5nSsACxMJvgaAuX6Pk2AawlZn8kiOGuCv6gTkwuA= cloud.google.com/go/longrunning v0.3.0 h1:NjljC+FYPV3uh5/OwWT6pVU+doBqMg2x/rZlE+CamDs= +cloud.google.com/go/monitoring v1.8.0 h1:c9riaGSPQ4dUKWB+M1Fl0N+iLxstMbCktdEwYSPGDvA= +cloud.google.com/go/monitoring v1.8.0/go.mod h1:E7PtoMJ1kQXWxPjB6mv2fhC5/15jInuulFdYYtlcvT4= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= -github.com/PuerkitoBio/rehttp v1.1.0 h1:JFZ7OeK+hbJpTxhNB0NDZT47AuXqCU0Smxfjtph7/Rs= -github.com/PuerkitoBio/rehttp v1.1.0/go.mod h1:LUwKPoDbDIA2RL5wYZCNsQ90cx4OJ4AWBmq6KzWZL1s= github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 h1:JYp7IbQjafoB+tBA3gMyHYHrpOtNuDiK/uB5uXxq5wM= github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d h1:UQZhZ2O0vMHr2cI+DC1Mbh0TJxzA3RcLoMsFw+aXw7E= github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho= -github.com/aybabtme/iocontrol v0.0.0-20150809002002-ad15bcfc95a0 h1:0NmehRCgyk5rljDQLKUO+cRJCnduDyn11+zGZIc9Z48= -github.com/aybabtme/iocontrol v0.0.0-20150809002002-ad15bcfc95a0/go.mod h1:6L7zgvqo0idzI7IO8de6ZC051AfXb5ipkIJ7bIA2tGA= -github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8= -github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= @@ -27,7 +23,6 @@ github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ3 github.com/davecgh/go-spew v1.1.0/go.mod 
h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= @@ -116,7 +111,6 @@ github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSS github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= -github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= @@ -143,7 +137,6 @@ golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLL golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20201110031124-69a78807bb2b/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= -golang.org/x/net v0.0.0-20210510120150-4163338589ed/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.5.0 h1:GyT4nK/YDHSqa1c4753ouYCDajOYKTja9Xb/OHtgvSw= golang.org/x/net v0.5.0/go.mod h1:DivGGAXEgPSlEBzxGzZI+ZLohi+xUj054jfeKui00ws= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= @@ -163,16 +156,12 @@ golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20191120155948-bd437916bb0e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210112080510-489259a85091/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.4.0 h1:Zr2JFtRQNX3BCZ8YtxRE9hNJYC8J6I1MVbMg6owUp18= golang.org/x/sys v0.4.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.6.0 
h1:3XmdazWV+ubf7QgHSTWeykHOci5oeekaGJBLkrkaw4k= golang.org/x/text v0.6.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= diff --git a/stackdriver_exporter.go b/stackdriver_exporter.go index 35b7c373..81ea5d5a 100644 --- a/stackdriver_exporter.go +++ b/stackdriver_exporter.go @@ -19,7 +19,7 @@ import ( "os" "strings" - "github.com/PuerkitoBio/rehttp" + "cloud.google.com/go/monitoring/apiv3" "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/prometheus/client_golang/prometheus" @@ -30,7 +30,6 @@ import ( "golang.org/x/net/context" "golang.org/x/oauth2/google" "google.golang.org/api/compute/v1" - "google.golang.org/api/monitoring/v3" "google.golang.org/api/option" "gopkg.in/alecthomas/kingpin.v2" @@ -57,26 +56,10 @@ var ( "google.project-id", "Comma seperated list of Google Project IDs.", ).String() - stackdriverMaxRetries = kingpin.Flag( - "stackdriver.max-retries", "Max number of retries that should be attempted on 503 errors from stackdriver.", - ).Default("0").Int() - - stackdriverHttpTimeout = kingpin.Flag( - "stackdriver.http-timeout", "How long should stackdriver_exporter wait for a result from the Stackdriver API.", + gcpMetricClientTimeout = kingpin.Flag( + "stackdriver.client-timeout", "How long should the collector wait for a response from the GCP Monitoring APIs", ).Default("10s").Duration() - stackdriverMaxBackoffDuration = kingpin.Flag( - "stackdriver.max-backoff", "Max time between each request in an exp backoff scenario.", - ).Default("5s").Duration() - - stackdriverBackoffJitterBase = kingpin.Flag( - "stackdriver.backoff-jitter", "The amount of jitter to introduce in a exp backoff scenario.", - ).Default("1s").Duration() - - stackdriverRetryStatuses = kingpin.Flag( - "stackdriver.retry-statuses", "The HTTP statuses that should trigger a retry.", - ).Default("503").Ints() - // Monitoring collector flags monitoringMetricsTypePrefixes = kingpin.Flag( @@ -130,29 +113,6 @@ func getDefaultGCPProject(ctx context.Context) (*string, error) { return &credentials.ProjectID, nil } -func createMonitoringService(ctx context.Context) (*monitoring.Service, error) { - googleClient, err := google.DefaultClient(ctx, monitoring.MonitoringReadScope) - if err != nil { - return nil, fmt.Errorf("Error creating Google client: %v", err) - } - - googleClient.Timeout = *stackdriverHttpTimeout - googleClient.Transport = rehttp.NewTransport( - googleClient.Transport, // need to wrap DefaultClient transport - rehttp.RetryAll( - rehttp.RetryMaxRetries(*stackdriverMaxRetries), - rehttp.RetryStatuses(*stackdriverRetryStatuses...)), // Cloud support suggests retrying on 503 errors - rehttp.ExpJitterDelay(*stackdriverBackoffJitterBase, *stackdriverMaxBackoffDuration), // Set timeout to <10s as that is prom default timeout - ) - - monitoringService, err := monitoring.NewService(ctx, option.WithHTTPClient(googleClient)) - if err != nil { - return nil, fmt.Errorf("Error creating Google Stackdriver Monitoring service: %v", err) - } - - return monitoringService, nil -} - type handler struct { handler http.Handler logger log.Logger @@ -161,7 +121,7 @@ type handler struct { metricsPrefixes []string metricsExtraFilters []collectors.MetricFilter additionalGatherer prometheus.Gatherer - m *monitoring.Service + metricClient *monitoring.MetricClient } func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { @@ -179,14 +139,14 @@ func (h *handler) ServeHTTP(w http.ResponseWriter, r 
*http.Request) { h.handler.ServeHTTP(w, r) } -func newHandler(projectIDs []string, metricPrefixes []string, metricExtraFilters []collectors.MetricFilter, m *monitoring.Service, logger log.Logger, additionalGatherer prometheus.Gatherer) *handler { +func newHandler(projectIDs []string, metricPrefixes []string, metricExtraFilters []collectors.MetricFilter, metricClient *monitoring.MetricClient, logger log.Logger, additionalGatherer prometheus.Gatherer) *handler { h := &handler{ logger: logger, projectIDs: projectIDs, metricsPrefixes: metricPrefixes, metricsExtraFilters: metricExtraFilters, additionalGatherer: additionalGatherer, - m: m, + metricClient: metricClient, } h.handler = h.innerHandler(nil) @@ -197,16 +157,24 @@ func (h *handler) innerHandler(filters map[string]bool) http.Handler { registry := prometheus.NewRegistry() for _, project := range h.projectIDs { - monitoringCollector, err := collectors.NewMonitoringCollector(project, h.m, collectors.MonitoringCollectorOptions{ - MetricTypePrefixes: h.filterMetricTypePrefixes(filters), - ExtraFilters: h.metricsExtraFilters, - RequestInterval: *monitoringMetricsInterval, - RequestOffset: *monitoringMetricsOffset, - IngestDelay: *monitoringMetricsIngestDelay, - FillMissingLabels: *collectorFillMissingLabels, - DropDelegatedProjects: *monitoringDropDelegatedProjects, - AggregateDeltas: *monitoringMetricsAggregateDeltas, - }, h.logger, collectors.NewInMemoryDeltaCounterStore(h.logger, *monitoringMetricsDeltasTTL), collectors.NewInMemoryDeltaDistributionStore(h.logger, *monitoringMetricsDeltasTTL)) + monitoringCollector, err := collectors.NewMonitoringCollector( + project, + h.metricClient, + collectors.MonitoringCollectorOptions{ + MetricTypePrefixes: h.filterMetricTypePrefixes(filters), + ExtraFilters: h.metricsExtraFilters, + RequestInterval: *monitoringMetricsInterval, + RequestOffset: *monitoringMetricsOffset, + IngestDelay: *monitoringMetricsIngestDelay, + FillMissingLabels: *collectorFillMissingLabels, + DropDelegatedProjects: *monitoringDropDelegatedProjects, + AggregateDeltas: *monitoringMetricsAggregateDeltas, + ClientTimeout: *gcpMetricClientTimeout, + }, + h.logger, + collectors.NewInMemoryDeltaCounterStore(h.logger, *monitoringMetricsDeltasTTL), + collectors.NewInMemoryDeltaDistributionStore(h.logger, *monitoringMetricsDeltasTTL), + ) if err != nil { level.Error(h.logger).Log("err", err) os.Exit(1) @@ -265,7 +233,14 @@ func main() { level.Info(logger).Log("msg", "Build context", "build_context", version.BuildContext()) level.Info(logger).Log("msg", "Using Google Cloud Project ID", "projectID", *projectID) - monitoringService, err := createMonitoringService(ctx) + //TODO do we need a connection pool or is a single connection okay? + //pool := option.WithGRPCConnectionPool(10) + + // Notes from: https://pkg.go.dev/cloud.google.com/go#hdr-Timeouts_and_Cancellation + // The ctx used by the client should never have a timeout as it can interfere with credential refreshing + // Transient errors will be retried when correctness allows. 
Cancellation/timeout should be handled via the context + // provided to a function which makes an API call + metricClient, err := monitoring.NewMetricClient(ctx, option.WithScopes("https://www.googleapis.com/auth/monitoring.read")) if err != nil { level.Error(logger).Log("msg", "failed to create monitoring service", "err", err) os.Exit(1) @@ -277,12 +252,12 @@ func main() { if *metricsPath == *stackdriverMetricsPath { handler := newHandler( - projectIDs, metricsTypePrefixes, metricExtraFilters, monitoringService, logger, prometheus.DefaultGatherer) + projectIDs, metricsTypePrefixes, metricExtraFilters, metricClient, logger, prometheus.DefaultGatherer) http.Handle(*metricsPath, promhttp.InstrumentMetricHandler(prometheus.DefaultRegisterer, handler)) } else { level.Info(logger).Log("msg", "Serving Stackdriver metrics at separate path", "path", *stackdriverMetricsPath) handler := newHandler( - projectIDs, metricsTypePrefixes, metricExtraFilters, monitoringService, logger, nil) + projectIDs, metricsTypePrefixes, metricExtraFilters, metricClient, logger, nil) http.Handle(*stackdriverMetricsPath, promhttp.InstrumentMetricHandler(prometheus.DefaultRegisterer, handler)) http.Handle(*metricsPath, promhttp.Handler()) }
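
Notes on the port follow, each with a small Go sketch. The sketches reuse the imports this patch adds; helper names such as countDescriptorCalls are hypothetical illustrations, not part of the change.

The API-call accounting in reportMonitoringMetrics and collectMetricsForDescriptors leans on two iterator properties: PageInfo().Remaining() is the number of items still buffered from the last response, and PageInfo().Token is the next page token. An empty buffer plus a non-empty token therefore means the upcoming Next() has to issue another request. A minimal sketch of the same loop:

    // countDescriptorCalls drains a descriptor iterator and estimates how many
    // underlying ListMetricDescriptors API requests were made.
    func countDescriptorCalls(ctx context.Context, client *monitoringv3.MetricClient, project string) (float64, error) {
        it := client.ListMetricDescriptors(ctx, &monitoringpb.ListMetricDescriptorsRequest{
            Name: "projects/" + project,
        })
        apiCalls := 1.0 // the first Next() always issues the initial request
        for {
            // Buffer drained and a next-page token remaining: the upcoming
            // Next() has to fetch a fresh page from the API.
            if it.PageInfo().Remaining() == 0 && it.PageInfo().Token != "" {
                apiCalls++
            }
            _, err := it.Next()
            if err == iterator.Done {
                return apiCalls, nil
            }
            if err != nil {
                return apiCalls, err
            }
        }
    }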
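
The ingest-delay handling also gets simpler: the proto descriptor exposes Metadata.IngestDelay as a *durationpb.Duration, so the old time.ParseDuration error path disappears. A sketch of the window arithmetic (scrapeWindow is a hypothetical name):

    // scrapeWindow returns the query window [now-offset-interval, now-offset],
    // pushed further back by the descriptor's ingest delay so that points GCP
    // has not ingested yet are never requested.
    func scrapeWindow(md *metric.MetricDescriptor, offset, interval time.Duration) (time.Time, time.Time) {
        end := time.Now().UTC().Add(-offset)
        start := end.Add(-interval)
        if md.Metadata != nil && md.Metadata.IngestDelay != nil {
            // IngestDelay is a *durationpb.Duration in the proto model, so no
            // string parsing (and no parse-error handling) is needed anymore.
            delay := md.Metadata.IngestDelay.AsDuration()
            end = end.Add(-delay)
            start = start.Add(-delay)
        }
        return start, end
    }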
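
One behavioral detail of the new ListTimeSeriesRequest: the interval timestamps are built from Seconds with Nanos fixed at 0, so requests are truncated to whole seconds, whereas the old string API sent RFC3339Nano values. That looks intentional and harmless at typical scrape granularity; if sub-second precision were ever wanted, timestamppb.New preserves it. A sketch of both constructions (the helper names are hypothetical):

    // secondsInterval mirrors the patch: whole-second timestamps, Nanos left at 0.
    func secondsInterval(start, end time.Time) *monitoringpb.TimeInterval {
        return &monitoringpb.TimeInterval{
            StartTime: &timestamppb.Timestamp{Seconds: start.Unix()},
            EndTime:   &timestamppb.Timestamp{Seconds: end.Unix()},
        }
    }

    // nanosInterval would keep full precision, closest to the old RFC3339Nano form.
    func nanosInterval(start, end time.Time) *monitoringpb.TimeInterval {
        return &monitoringpb.TimeInterval{
            StartTime: timestamppb.New(start),
            EndTime:   timestamppb.New(end),
        }
    }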
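
In generateHistogramBuckets, the typed switch over BucketOptions.GetOptions() replaces the old nil checks on the struct fields. As a worked example of the linear branch: offset 0, width 10 and numFiniteBuckets 3 yield bounds 0, 10, 20 and 30, and the slice is allocated with one extra slot, presumably filled with the +Inf overflow bound by code outside the hunk shown. A standalone sketch under that assumption:

    // linearBucketKeys reproduces the linear branch: bounds at offset + i*width
    // for i in [0, numFinite], plus a final overflow bound.
    func linearBucketKeys(offset, width float64, numFinite int32) []float64 {
        num := int(numFinite)
        keys := make([]float64, num+2) // num+1 finite bounds plus one overflow slot
        for i := 0; i <= num; i++ {
            keys[i] = offset + float64(i)*width
        }
        keys[num+1] = math.Inf(1) // assumed overflow bound, set outside the hunk shown
        return keys
    }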
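
Finally, the dropped rehttp retry/backoff flags are covered by the gRPC client itself, which retries transient errors where correctness allows (per the guidance quoted in main()); the one knob that remains is the per-call deadline, exposed as --stackdriver.client-timeout and threaded through MonitoringCollectorOptions.ClientTimeout. A sketch of the intended lifecycle (listWithDeadline is hypothetical):

    func listWithDeadline(project string, timeout time.Duration) error {
        // Built once from a long-lived context so credential refreshing is
        // never cut short.
        client, err := monitoring.NewMetricClient(context.Background(),
            option.WithScopes("https://www.googleapis.com/auth/monitoring.read"))
        if err != nil {
            return err
        }
        defer client.Close()

        // Each RPC gets its own deadline instead, mirroring ClientTimeout.
        ctx, cancel := context.WithTimeout(context.Background(), timeout)
        defer cancel()

        it := client.ListMetricDescriptors(ctx, &monitoringpb.ListMetricDescriptorsRequest{
            Name: "projects/" + project,
        })
        for {
            if _, err := it.Next(); err == iterator.Done {
                return nil
            } else if err != nil {
                return err // a blown deadline surfaces here as context.DeadlineExceeded
            }
        }
    }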