diff --git a/collectors/delta_counter.go b/collectors/delta_counter.go deleted file mode 100644 index c3798ab3..00000000 --- a/collectors/delta_counter.go +++ /dev/null @@ -1,152 +0,0 @@ -// Copyright 2022 The Prometheus Authors -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package collectors - -import ( - "fmt" - "sort" - "strings" - "sync" - "time" - - "github.com/go-kit/log" - "github.com/go-kit/log/level" - "google.golang.org/api/monitoring/v3" -) - -type CollectedMetric struct { - metric *ConstMetric - lastCollectedAt time.Time -} - -// DeltaCounterStore defines a set of functions which must be implemented in order to be used as a DeltaCounterStore -// which accumulates DELTA Counter metrics over time -type DeltaCounterStore interface { - - // Increment will use the incoming metricDescriptor and currentValue to either create a new entry or add the incoming - // value to an existing entry in the underlying store - Increment(metricDescriptor *monitoring.MetricDescriptor, currentValue *ConstMetric) - - // ListMetrics will return all known entries in the store for a metricDescriptorName - ListMetrics(metricDescriptorName string) map[string][]*CollectedMetric -} - -type metricEntry struct { - collected map[uint64]*CollectedMetric - mutex *sync.RWMutex -} - -type inMemoryDeltaCounterStore struct { - store *sync.Map - ttl time.Duration - logger log.Logger -} - -// NewInMemoryDeltaCounterStore returns an implementation of DeltaCounterStore which is persisted in-memory -func NewInMemoryDeltaCounterStore(logger log.Logger, ttl time.Duration) DeltaCounterStore { - return &inMemoryDeltaCounterStore{ - store: &sync.Map{}, - logger: logger, - ttl: ttl, - } -} - -func (s *inMemoryDeltaCounterStore) Increment(metricDescriptor *monitoring.MetricDescriptor, currentValue *ConstMetric) { - if currentValue == nil { - return - } - - tmp, _ := s.store.LoadOrStore(metricDescriptor.Name, &metricEntry{ - collected: map[uint64]*CollectedMetric{}, - mutex: &sync.RWMutex{}, - }) - entry := tmp.(*metricEntry) - - key := toCounterKey(currentValue) - - entry.mutex.Lock() - defer entry.mutex.Unlock() - existing := entry.collected[key] - - if existing == nil { - level.Debug(s.logger).Log("msg", "Tracking new counter", "fqName", currentValue.fqName, "key", key, "current_value", currentValue.value, "incoming_time", currentValue.reportTime) - entry.collected[key] = &CollectedMetric{currentValue, time.Now()} - return - } - - if existing.metric.reportTime.Before(currentValue.reportTime) { - level.Debug(s.logger).Log("msg", "Incrementing existing counter", "fqName", currentValue.fqName, "key", key, "current_value", existing.metric.value, "adding", currentValue.value, "last_reported_time", entry.collected[key].metric.reportTime, "incoming_time", currentValue.reportTime) - currentValue.value = currentValue.value + existing.metric.value - existing.metric = currentValue - existing.lastCollectedAt = time.Now() - return - } - - level.Debug(s.logger).Log("msg", "Ignoring old sample for counter", "fqName", currentValue.fqName, "key", key, "last_reported_time", existing.metric.reportTime, "incoming_time", currentValue.reportTime) -} - -func toCounterKey(c *ConstMetric) uint64 { - labels := make(map[string]string) - keysCopy := append([]string{}, c.labelKeys...) - for i := range c.labelKeys { - labels[c.labelKeys[i]] = c.labelValues[i] - } - sort.Strings(keysCopy) - - var keyParts []string - for _, k := range keysCopy { - keyParts = append(keyParts, fmt.Sprintf("%s:%s", k, labels[k])) - } - hashText := fmt.Sprintf("%s|%s", c.fqName, strings.Join(keyParts, "|")) - h := hashNew() - h = hashAdd(h, hashText) - - return h -} - -func (s *inMemoryDeltaCounterStore) ListMetrics(metricDescriptorName string) map[string][]*CollectedMetric { - output := map[string][]*CollectedMetric{} - now := time.Now() - ttlWindowStart := now.Add(-s.ttl) - - tmp, exists := s.store.Load(metricDescriptorName) - if !exists { - return output - } - entry := tmp.(*metricEntry) - - entry.mutex.Lock() - defer entry.mutex.Unlock() - for key, collected := range entry.collected { - //Scan and remove metrics which are outside the TTL - if ttlWindowStart.After(collected.lastCollectedAt) { - level.Debug(s.logger).Log("msg", "Deleting counter entry outside of TTL", "key", key, "fqName", collected.metric.fqName) - delete(entry.collected, key) - continue - } - - metrics, exists := output[collected.metric.fqName] - if !exists { - metrics = make([]*CollectedMetric, 0) - } - metricCopy := *collected.metric - outputEntry := CollectedMetric{ - metric: &metricCopy, - lastCollectedAt: collected.lastCollectedAt, - } - output[collected.metric.fqName] = append(metrics, &outputEntry) - } - - return output -} diff --git a/collectors/delta_distribution.go b/collectors/delta_distribution.go deleted file mode 100644 index 83bc96ae..00000000 --- a/collectors/delta_distribution.go +++ /dev/null @@ -1,172 +0,0 @@ -// Copyright 2022 The Prometheus Authors -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package collectors - -import ( - "fmt" - "sort" - "strings" - "sync" - "time" - - "github.com/go-kit/log" - "github.com/go-kit/log/level" - "google.golang.org/api/monitoring/v3" -) - -type CollectedHistogram struct { - histogram *HistogramMetric - lastCollectedAt time.Time -} - -// DeltaDistributionStore defines a set of functions which must be implemented in order to be used as a DeltaDistributionStore -// which accumulates DELTA histogram metrics over time -type DeltaDistributionStore interface { - - // Increment will use the incoming metricDescriptor and currentValue to either create a new entry or add the incoming - // value to an existing entry in the underlying store - Increment(metricDescriptor *monitoring.MetricDescriptor, currentValue *HistogramMetric) - - // ListMetrics will return all known entries in the store for a metricDescriptorName - ListMetrics(metricDescriptorName string) map[string][]*CollectedHistogram -} - -type histogramEntry struct { - collected map[uint64]*CollectedHistogram - mutex *sync.RWMutex -} - -type inMemoryDeltaDistributionStore struct { - store *sync.Map - ttl time.Duration - logger log.Logger -} - -// NewInMemoryDeltaDistributionStore returns an implementation of DeltaDistributionStore which is persisted in-memory -func NewInMemoryDeltaDistributionStore(logger log.Logger, ttl time.Duration) DeltaDistributionStore { - return &inMemoryDeltaDistributionStore{ - store: &sync.Map{}, - logger: logger, - ttl: ttl, - } -} - -func (s *inMemoryDeltaDistributionStore) Increment(metricDescriptor *monitoring.MetricDescriptor, currentValue *HistogramMetric) { - if currentValue == nil { - return - } - - tmp, _ := s.store.LoadOrStore(metricDescriptor.Name, &histogramEntry{ - collected: map[uint64]*CollectedHistogram{}, - mutex: &sync.RWMutex{}, - }) - entry := tmp.(*histogramEntry) - - key := toHistogramKey(currentValue) - - entry.mutex.Lock() - defer entry.mutex.Unlock() - existing := entry.collected[key] - - if existing == nil { - level.Debug(s.logger).Log("msg", "Tracking new histogram", "fqName", currentValue.fqName, "key", key, "incoming_time", currentValue.reportTime) - entry.collected[key] = &CollectedHistogram{histogram: currentValue, lastCollectedAt: time.Now()} - return - } - - if existing.histogram.reportTime.Before(currentValue.reportTime) { - level.Debug(s.logger).Log("msg", "Incrementing existing histogram", "fqName", currentValue.fqName, "key", key, "last_reported_time", existing.histogram.reportTime, "incoming_time", currentValue.reportTime) - existing.histogram = mergeHistograms(existing.histogram, currentValue) - existing.lastCollectedAt = time.Now() - return - } - - level.Debug(s.logger).Log("msg", "Ignoring old sample for histogram", "fqName", currentValue.fqName, "key", key, "last_reported_time", existing.histogram.reportTime, "incoming_time", currentValue.reportTime) -} - -func toHistogramKey(hist *HistogramMetric) uint64 { - labels := make(map[string]string) - keysCopy := append([]string{}, hist.labelKeys...) - for i := range hist.labelKeys { - labels[hist.labelKeys[i]] = hist.labelValues[i] - } - sort.Strings(keysCopy) - - var keyParts []string - for _, k := range keysCopy { - keyParts = append(keyParts, fmt.Sprintf("%s:%s", k, labels[k])) - } - hashText := fmt.Sprintf("%s|%s", hist.fqName, strings.Join(keyParts, "|")) - h := hashNew() - h = hashAdd(h, hashText) - - return h -} - -func mergeHistograms(existing *HistogramMetric, current *HistogramMetric) *HistogramMetric { - for key, value := range existing.buckets { - current.buckets[key] += value - } - - // Calculate a new mean and overall count - mean := existing.dist.Mean - mean += current.dist.Mean - mean /= 2 - - var count uint64 - for _, v := range current.buckets { - count += v - } - - current.dist.Mean = mean - current.dist.Count = int64(count) - - return current -} - -func (s *inMemoryDeltaDistributionStore) ListMetrics(metricDescriptorName string) map[string][]*CollectedHistogram { - output := map[string][]*CollectedHistogram{} - now := time.Now() - ttlWindowStart := now.Add(-s.ttl) - - tmp, exists := s.store.Load(metricDescriptorName) - if !exists { - return output - } - entry := tmp.(*histogramEntry) - - entry.mutex.Lock() - defer entry.mutex.Unlock() - for key, collected := range entry.collected { - //Scan and remove metrics which are outside the TTL - if ttlWindowStart.After(collected.lastCollectedAt) { - level.Debug(s.logger).Log("msg", "Deleting histogram entry outside of TTL", "key", key, "fqName", collected.histogram.fqName) - delete(entry.collected, key) - continue - } - - metrics, exists := output[collected.histogram.fqName] - if !exists { - metrics = make([]*CollectedHistogram, 0) - } - histCopy := *collected.histogram - outputEntry := CollectedHistogram{ - histogram: &histCopy, - lastCollectedAt: collected.lastCollectedAt, - } - output[collected.histogram.fqName] = append(metrics, &outputEntry) - } - - return output -} diff --git a/collectors/monitoring_collector.go b/collectors/monitoring_collector.go index 124c8bbf..4f9ddb30 100644 --- a/collectors/monitoring_collector.go +++ b/collectors/monitoring_collector.go @@ -54,8 +54,8 @@ type MonitoringCollector struct { collectorFillMissingLabels bool monitoringDropDelegatedProjects bool logger log.Logger - deltaCounterStore DeltaCounterStore - deltaDistributionStore DeltaDistributionStore + counterStore DeltaCounterStore + histogramStore DeltaHistogramStore aggregateDeltas bool descriptorCache DescriptorCache } @@ -110,7 +110,17 @@ func (d *googleDescriptorCache) Store(prefix string, data []*monitoring.MetricDe d.inner.Store(prefix, data) } -func NewMonitoringCollector(projectID string, monitoringService *monitoring.Service, opts MonitoringCollectorOptions, logger log.Logger, counterStore DeltaCounterStore, distributionStore DeltaDistributionStore) (*MonitoringCollector, error) { +type DeltaCounterStore interface { + Increment(metricDescriptor *monitoring.MetricDescriptor, currentValue *ConstMetric) + ListMetrics(metricDescriptorName string) []*ConstMetric +} + +type DeltaHistogramStore interface { + Increment(metricDescriptor *monitoring.MetricDescriptor, currentValue *HistogramMetric) + ListMetrics(metricDescriptorName string) []*HistogramMetric +} + +func NewMonitoringCollector(projectID string, monitoringService *monitoring.Service, opts MonitoringCollectorOptions, logger log.Logger, counterStore DeltaCounterStore, histogramStore DeltaHistogramStore) (*MonitoringCollector, error) { const subsystem = "monitoring" apiCallsTotalMetric := prometheus.NewCounter( @@ -200,8 +210,8 @@ func NewMonitoringCollector(projectID string, monitoringService *monitoring.Serv collectorFillMissingLabels: opts.FillMissingLabels, monitoringDropDelegatedProjects: opts.DropDelegatedProjects, logger: logger, - deltaCounterStore: counterStore, - deltaDistributionStore: distributionStore, + counterStore: counterStore, + histogramStore: histogramStore, aggregateDeltas: opts.AggregateDeltas, descriptorCache: descriptorCache, } @@ -398,11 +408,11 @@ func (c *MonitoringCollector) reportTimeSeriesMetrics( var metricValueType prometheus.ValueType var newestTSPoint *monitoring.Point - timeSeriesMetrics, err := NewTimeSeriesMetrics(metricDescriptor, + timeSeriesMetrics, err := newTimeSeriesMetrics(metricDescriptor, ch, c.collectorFillMissingLabels, - c.deltaCounterStore, - c.deltaDistributionStore, + c.counterStore, + c.histogramStore, c.aggregateDeltas, ) if err != nil { diff --git a/collectors/monitoring_metrics.go b/collectors/monitoring_metrics.go index d6bbced7..a1b08046 100644 --- a/collectors/monitoring_metrics.go +++ b/collectors/monitoring_metrics.go @@ -21,6 +21,7 @@ import ( "sort" + "github.com/prometheus-community/stackdriver_exporter/hash" "github.com/prometheus-community/stackdriver_exporter/utils" ) @@ -41,27 +42,27 @@ type timeSeriesMetrics struct { constMetrics map[string][]*ConstMetric histogramMetrics map[string][]*HistogramMetric - deltaCounterStore DeltaCounterStore - deltaDistributionStore DeltaDistributionStore - aggregateDeltas bool + counterStore DeltaCounterStore + histogramStore DeltaHistogramStore + aggregateDeltas bool } -func NewTimeSeriesMetrics(descriptor *monitoring.MetricDescriptor, +func newTimeSeriesMetrics(descriptor *monitoring.MetricDescriptor, ch chan<- prometheus.Metric, fillMissingLabels bool, - deltaCounterStore DeltaCounterStore, - deltaDistributionStore DeltaDistributionStore, + counterStore DeltaCounterStore, + histogramStore DeltaHistogramStore, aggregateDeltas bool) (*timeSeriesMetrics, error) { return &timeSeriesMetrics{ - metricDescriptor: descriptor, - ch: ch, - fillMissingLabels: fillMissingLabels, - constMetrics: make(map[string][]*ConstMetric), - histogramMetrics: make(map[string][]*HistogramMetric), - deltaCounterStore: deltaCounterStore, - deltaDistributionStore: deltaDistributionStore, - aggregateDeltas: aggregateDeltas, + metricDescriptor: descriptor, + ch: ch, + fillMissingLabels: fillMissingLabels, + constMetrics: make(map[string][]*ConstMetric), + histogramMetrics: make(map[string][]*HistogramMetric), + counterStore: counterStore, + histogramStore: histogramStore, + aggregateDeltas: aggregateDeltas, }, nil } @@ -75,25 +76,28 @@ func (t *timeSeriesMetrics) newMetricDesc(fqName string, labelKeys []string) *pr } type ConstMetric struct { - fqName string - labelKeys []string - valueType prometheus.ValueType - value float64 - labelValues []string - reportTime time.Time - - keysHash uint64 + FqName string + LabelKeys []string + ValueType prometheus.ValueType + Value float64 + LabelValues []string + ReportTime time.Time + CollectionTime time.Time + + KeysHash uint64 } type HistogramMetric struct { - fqName string - labelKeys []string - dist *monitoring.Distribution - buckets map[float64]uint64 - labelValues []string - reportTime time.Time - - keysHash uint64 + FqName string + LabelKeys []string + Mean float64 + Count uint64 + Buckets map[float64]uint64 + LabelValues []string + ReportTime time.Time + CollectionTime time.Time + + KeysHash uint64 } func (t *timeSeriesMetrics) CollectNewConstHistogram(timeSeries *monitoring.TimeSeries, reportTime time.Time, labelKeys []string, dist *monitoring.Distribution, buckets map[float64]uint64, labelValues []string, metricKind string) { @@ -102,19 +106,21 @@ func (t *timeSeriesMetrics) CollectNewConstHistogram(timeSeries *monitoring.Time var v HistogramMetric if t.fillMissingLabels || (metricKind == "DELTA" && t.aggregateDeltas) { v = HistogramMetric{ - fqName: fqName, - labelKeys: labelKeys, - dist: dist, - buckets: buckets, - labelValues: labelValues, - reportTime: reportTime, - - keysHash: hashLabelKeys(labelKeys), + FqName: fqName, + LabelKeys: labelKeys, + Mean: dist.Mean, + Count: uint64(dist.Count), + Buckets: buckets, + LabelValues: labelValues, + ReportTime: reportTime, + CollectionTime: time.Now(), + + KeysHash: hashLabelKeys(labelKeys), } } if metricKind == "DELTA" && t.aggregateDeltas { - t.deltaDistributionStore.Increment(t.metricDescriptor, &v) + t.histogramStore.Increment(t.metricDescriptor, &v) return } @@ -127,16 +133,16 @@ func (t *timeSeriesMetrics) CollectNewConstHistogram(timeSeries *monitoring.Time return } - t.ch <- t.newConstHistogram(fqName, reportTime, labelKeys, dist, buckets, labelValues) + t.ch <- t.newConstHistogram(fqName, reportTime, labelKeys, dist.Mean, uint64(dist.Count), buckets, labelValues) } -func (t *timeSeriesMetrics) newConstHistogram(fqName string, reportTime time.Time, labelKeys []string, dist *monitoring.Distribution, buckets map[float64]uint64, labelValues []string) prometheus.Metric { +func (t *timeSeriesMetrics) newConstHistogram(fqName string, reportTime time.Time, labelKeys []string, mean float64, count uint64, buckets map[float64]uint64, labelValues []string) prometheus.Metric { return prometheus.NewMetricWithTimestamp( reportTime, prometheus.MustNewConstHistogram( t.newMetricDesc(fqName, labelKeys), - uint64(dist.Count), - dist.Mean*float64(dist.Count), // Stackdriver does not provide the sum, but we can fake it + count, + mean*float64(count), // Stackdriver does not provide the sum, but we can fake it buckets, labelValues..., ), @@ -149,19 +155,20 @@ func (t *timeSeriesMetrics) CollectNewConstMetric(timeSeries *monitoring.TimeSer var v ConstMetric if t.fillMissingLabels || (metricKind == "DELTA" && t.aggregateDeltas) { v = ConstMetric{ - fqName: fqName, - labelKeys: labelKeys, - valueType: metricValueType, - value: metricValue, - labelValues: labelValues, - reportTime: reportTime, - - keysHash: hashLabelKeys(labelKeys), + FqName: fqName, + LabelKeys: labelKeys, + ValueType: metricValueType, + Value: metricValue, + LabelValues: labelValues, + ReportTime: reportTime, + CollectionTime: time.Now(), + + KeysHash: hashLabelKeys(labelKeys), } } if metricKind == "DELTA" && t.aggregateDeltas { - t.deltaCounterStore.Increment(t.metricDescriptor, &v) + t.counterStore.Increment(t.metricDescriptor, &v) return } @@ -190,13 +197,13 @@ func (t *timeSeriesMetrics) newConstMetric(fqName string, reportTime time.Time, } func hashLabelKeys(labelKeys []string) uint64 { - dh := hashNew() + dh := hash.New() sortedKeys := make([]string, len(labelKeys)) copy(sortedKeys, labelKeys) sort.Strings(sortedKeys) for _, key := range sortedKeys { - dh = hashAdd(dh, key) - dh = hashAddByte(dh, separatorByte) + dh = hash.Add(dh, key) + dh = hash.AddByte(dh, hash.SeparatorByte) } return dh } @@ -213,7 +220,7 @@ func (t *timeSeriesMetrics) completeConstMetrics(constMetrics map[string][]*Cons if len(vs) > 1 { var needFill bool for i := 1; i < len(vs); i++ { - if vs[0].keysHash != vs[i].keysHash { + if vs[0].KeysHash != vs[i].KeysHash { needFill = true } } @@ -223,7 +230,7 @@ func (t *timeSeriesMetrics) completeConstMetrics(constMetrics map[string][]*Cons } for _, v := range vs { - t.ch <- t.newConstMetric(v.fqName, v.reportTime, v.labelKeys, v.valueType, v.value, v.labelValues) + t.ch <- t.newConstMetric(v.FqName, v.ReportTime, v.LabelKeys, v.ValueType, v.Value, v.LabelValues) } } } @@ -233,7 +240,7 @@ func (t *timeSeriesMetrics) completeHistogramMetrics(histograms map[string][]*Hi if len(vs) > 1 { var needFill bool for i := 1; i < len(vs); i++ { - if vs[0].keysHash != vs[i].keysHash { + if vs[0].KeysHash != vs[i].KeysHash { needFill = true } } @@ -242,41 +249,39 @@ func (t *timeSeriesMetrics) completeHistogramMetrics(histograms map[string][]*Hi } } for _, v := range vs { - t.ch <- t.newConstHistogram(v.fqName, v.reportTime, v.labelKeys, v.dist, v.buckets, v.labelValues) + t.ch <- t.newConstHistogram(v.FqName, v.ReportTime, v.LabelKeys, v.Mean, v.Count, v.Buckets, v.LabelValues) } } } func (t *timeSeriesMetrics) completeDeltaConstMetrics(reportingStartTime time.Time) { - descriptorMetrics := t.deltaCounterStore.ListMetrics(t.metricDescriptor.Name) + descriptorMetrics := t.counterStore.ListMetrics(t.metricDescriptor.Name) now := time.Now().Truncate(time.Minute) constMetrics := map[string][]*ConstMetric{} - for _, metrics := range descriptorMetrics { - for _, collected := range metrics { - // If the metric wasn't collected we should still export it at the next sample time to avoid staleness - if reportingStartTime.After(collected.lastCollectedAt) { - // Ideally we could use monitoring.MetricDescriptorMetadata.SamplePeriod to determine how many - // samples were missed to adjust this but monitoring.MetricDescriptorMetadata is viewed as optional - // for a monitoring.MetricDescriptor - reportingLag := collected.lastCollectedAt.Sub(collected.metric.reportTime).Truncate(time.Minute) - collected.metric.reportTime = now.Add(-reportingLag) - } - if t.fillMissingLabels { - if _, exists := constMetrics[collected.metric.fqName]; !exists { - constMetrics[collected.metric.fqName] = []*ConstMetric{} - } - constMetrics[collected.metric.fqName] = append(constMetrics[collected.metric.fqName], collected.metric) - } else { - t.ch <- t.newConstMetric( - collected.metric.fqName, - collected.metric.reportTime, - collected.metric.labelKeys, - collected.metric.valueType, - collected.metric.value, - collected.metric.labelValues, - ) + for _, collected := range descriptorMetrics { + // If the metric wasn't collected we should still export it at the next sample time to avoid staleness + if reportingStartTime.After(collected.CollectionTime) { + // Ideally we could use monitoring.MetricDescriptorMetadata.SamplePeriod to determine how many + // samples were missed to adjust this but monitoring.MetricDescriptorMetadata is viewed as optional + // for a monitoring.MetricDescriptor + reportingLag := collected.CollectionTime.Sub(collected.ReportTime).Truncate(time.Minute) + collected.ReportTime = now.Add(-reportingLag) + } + if t.fillMissingLabels { + if _, exists := constMetrics[collected.FqName]; !exists { + constMetrics[collected.FqName] = []*ConstMetric{} } + constMetrics[collected.FqName] = append(constMetrics[collected.FqName], collected) + } else { + t.ch <- t.newConstMetric( + collected.FqName, + collected.ReportTime, + collected.LabelKeys, + collected.ValueType, + collected.Value, + collected.LabelValues, + ) } } @@ -286,35 +291,34 @@ func (t *timeSeriesMetrics) completeDeltaConstMetrics(reportingStartTime time.Ti } func (t *timeSeriesMetrics) completeDeltaHistogramMetrics(reportingStartTime time.Time) { - descriptorMetrics := t.deltaDistributionStore.ListMetrics(t.metricDescriptor.Name) + descriptorMetrics := t.histogramStore.ListMetrics(t.metricDescriptor.Name) now := time.Now().Truncate(time.Minute) histograms := map[string][]*HistogramMetric{} - for _, metrics := range descriptorMetrics { - for _, collected := range metrics { - // If the histogram wasn't collected we should still export it at the next sample time to avoid staleness - if reportingStartTime.After(collected.lastCollectedAt) { - // Ideally we could use monitoring.MetricDescriptorMetadata.SamplePeriod to determine how many - // samples were missed to adjust this but monitoring.MetricDescriptorMetadata is viewed as optional - // for a monitoring.MetricDescriptor - reportingLag := collected.lastCollectedAt.Sub(collected.histogram.reportTime).Truncate(time.Minute) - collected.histogram.reportTime = now.Add(-reportingLag) - } - if t.fillMissingLabels { - if _, exists := histograms[collected.histogram.fqName]; !exists { - histograms[collected.histogram.fqName] = []*HistogramMetric{} - } - histograms[collected.histogram.fqName] = append(histograms[collected.histogram.fqName], collected.histogram) - } else { - t.ch <- t.newConstHistogram( - collected.histogram.fqName, - collected.histogram.reportTime, - collected.histogram.labelKeys, - collected.histogram.dist, - collected.histogram.buckets, - collected.histogram.labelValues, - ) + for _, collected := range descriptorMetrics { + // If the histogram wasn't collected we should still export it at the next sample time to avoid staleness + if reportingStartTime.After(collected.CollectionTime) { + // Ideally we could use monitoring.MetricDescriptorMetadata.SamplePeriod to determine how many + // samples were missed to adjust this but monitoring.MetricDescriptorMetadata is viewed as optional + // for a monitoring.MetricDescriptor + reportingLag := collected.CollectionTime.Sub(collected.ReportTime).Truncate(time.Minute) + collected.ReportTime = now.Add(-reportingLag) + } + if t.fillMissingLabels { + if _, exists := histograms[collected.FqName]; !exists { + histograms[collected.FqName] = []*HistogramMetric{} } + histograms[collected.FqName] = append(histograms[collected.FqName], collected) + } else { + t.ch <- t.newConstHistogram( + collected.FqName, + collected.ReportTime, + collected.LabelKeys, + collected.Mean, + collected.Count, + collected.Buckets, + collected.LabelValues, + ) } } @@ -326,21 +330,21 @@ func (t *timeSeriesMetrics) completeDeltaHistogramMetrics(reportingStartTime tim func fillConstMetricsLabels(metrics []*ConstMetric) []*ConstMetric { allKeys := make(map[string]struct{}) for _, metric := range metrics { - for _, key := range metric.labelKeys { + for _, key := range metric.LabelKeys { allKeys[key] = struct{}{} } } for _, metric := range metrics { - if len(metric.labelKeys) != len(allKeys) { + if len(metric.LabelKeys) != len(allKeys) { metricKeys := make(map[string]struct{}) - for _, key := range metric.labelKeys { + for _, key := range metric.LabelKeys { metricKeys[key] = struct{}{} } for key := range allKeys { if _, ok := metricKeys[key]; !ok { - metric.labelKeys = append(metric.labelKeys, key) - metric.labelValues = append(metric.labelValues, "") + metric.LabelKeys = append(metric.LabelKeys, key) + metric.LabelValues = append(metric.LabelValues, "") } } } @@ -352,21 +356,21 @@ func fillConstMetricsLabels(metrics []*ConstMetric) []*ConstMetric { func fillHistogramMetricsLabels(metrics []*HistogramMetric) []*HistogramMetric { allKeys := make(map[string]struct{}) for _, metric := range metrics { - for _, key := range metric.labelKeys { + for _, key := range metric.LabelKeys { allKeys[key] = struct{}{} } } for _, metric := range metrics { - if len(metric.labelKeys) != len(allKeys) { + if len(metric.LabelKeys) != len(allKeys) { metricKeys := make(map[string]struct{}) - for _, key := range metric.labelKeys { + for _, key := range metric.LabelKeys { metricKeys[key] = struct{}{} } for key := range allKeys { if _, ok := metricKeys[key]; !ok { - metric.labelKeys = append(metric.labelKeys, key) - metric.labelValues = append(metric.labelValues, "") + metric.LabelKeys = append(metric.LabelKeys, key) + metric.LabelValues = append(metric.LabelValues, "") } } } diff --git a/delta/counter.go b/delta/counter.go new file mode 100644 index 00000000..068b7f5c --- /dev/null +++ b/delta/counter.go @@ -0,0 +1,132 @@ +// Copyright 2023 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package delta + +import ( + "fmt" + "sort" + "strings" + "sync" + "time" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "google.golang.org/api/monitoring/v3" + + "github.com/prometheus-community/stackdriver_exporter/collectors" + "github.com/prometheus-community/stackdriver_exporter/hash" +) + +type MetricEntry struct { + Collected map[uint64]*collectors.ConstMetric + mutex *sync.RWMutex +} + +type InMemoryCounterStore struct { + store *sync.Map + ttl time.Duration + logger log.Logger +} + +// NewInMemoryCounterStore returns an implementation of CounterStore which is persisted in-memory +func NewInMemoryCounterStore(logger log.Logger, ttl time.Duration) *InMemoryCounterStore { + store := &InMemoryCounterStore{ + store: &sync.Map{}, + logger: logger, + ttl: ttl, + } + + return store +} + +func (s *InMemoryCounterStore) Increment(metricDescriptor *monitoring.MetricDescriptor, currentValue *collectors.ConstMetric) { + if currentValue == nil { + return + } + + tmp, _ := s.store.LoadOrStore(metricDescriptor.Name, &MetricEntry{ + Collected: map[uint64]*collectors.ConstMetric{}, + mutex: &sync.RWMutex{}, + }) + entry := tmp.(*MetricEntry) + + key := toCounterKey(currentValue) + + entry.mutex.Lock() + defer entry.mutex.Unlock() + existing := entry.Collected[key] + + if existing == nil { + level.Debug(s.logger).Log("msg", "Tracking new counter", "fqName", currentValue.FqName, "key", key, "current_value", currentValue.Value, "incoming_time", currentValue.ReportTime) + entry.Collected[key] = currentValue + return + } + + if existing.ReportTime.Before(currentValue.ReportTime) { + level.Debug(s.logger).Log("msg", "Incrementing existing counter", "fqName", currentValue.FqName, "key", key, "current_value", existing.Value, "adding", currentValue.Value, "last_reported_time", existing.ReportTime, "incoming_time", currentValue.ReportTime) + currentValue.Value = currentValue.Value + existing.Value + entry.Collected[key] = currentValue + return + } + + level.Debug(s.logger).Log("msg", "Ignoring old sample for counter", "fqName", currentValue.FqName, "key", key, "last_reported_time", existing.ReportTime, "incoming_time", currentValue.ReportTime) +} + +func toCounterKey(c *collectors.ConstMetric) uint64 { + labels := make(map[string]string) + keysCopy := append([]string{}, c.LabelKeys...) + for i := range c.LabelKeys { + labels[c.LabelKeys[i]] = c.LabelValues[i] + } + sort.Strings(keysCopy) + + var keyParts []string + for _, k := range keysCopy { + keyParts = append(keyParts, fmt.Sprintf("%s:%s", k, labels[k])) + } + hashText := fmt.Sprintf("%s|%s", c.FqName, strings.Join(keyParts, "|")) + h := hash.New() + h = hash.Add(h, hashText) + + return h +} + +func (s *InMemoryCounterStore) ListMetrics(metricDescriptorName string) []*collectors.ConstMetric { + var output []*collectors.ConstMetric + now := time.Now() + ttlWindowStart := now.Add(-s.ttl) + + tmp, exists := s.store.Load(metricDescriptorName) + if !exists { + return output + } + entry := tmp.(*MetricEntry) + + entry.mutex.Lock() + defer entry.mutex.Unlock() + for key, collected := range entry.Collected { + //Scan and remove metrics which are outside the TTL + if ttlWindowStart.After(collected.CollectionTime) { + level.Debug(s.logger).Log("msg", "Deleting counter entry outside of TTL", "key", key, "fqName", collected.FqName) + delete(entry.Collected, key) + continue + } + + //Dereference to create shallow copy + metricCopy := *collected + output = append(output, &metricCopy) + } + + return output +} diff --git a/delta/counter_test.go b/delta/counter_test.go new file mode 100644 index 00000000..31fceceb --- /dev/null +++ b/delta/counter_test.go @@ -0,0 +1,84 @@ +// Copyright 2023 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package delta_test + +import ( + "time" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + "github.com/prometheus/common/promlog" + "google.golang.org/api/monitoring/v3" + + "github.com/prometheus-community/stackdriver_exporter/collectors" + "github.com/prometheus-community/stackdriver_exporter/delta" +) + +var _ = Describe("Counter", func() { + var store *delta.InMemoryCounterStore + var metric *collectors.ConstMetric + descriptor := &monitoring.MetricDescriptor{Name: "This is a metric"} + + BeforeEach(func() { + store = delta.NewInMemoryCounterStore(promlog.New(&promlog.Config{}), time.Minute) + metric = &collectors.ConstMetric{ + FqName: "counter_name", + LabelKeys: []string{"labelKey"}, + ValueType: 1, + Value: 10, + LabelValues: []string{"labelValue"}, + ReportTime: time.Now().Truncate(time.Second), + CollectionTime: time.Now().Truncate(time.Second), + KeysHash: 4321, + } + }) + + It("can return tracked counters", func() { + store.Increment(descriptor, metric) + metrics := store.ListMetrics(descriptor.Name) + + Expect(len(metrics)).To(Equal(1)) + Expect(metrics[0]).To(Equal(metric)) + }) + + It("can increment counters multiple times", func() { + store.Increment(descriptor, metric) + + metric2 := &collectors.ConstMetric{ + FqName: "counter_name", + LabelKeys: []string{"labelKey"}, + ValueType: 1, + Value: 20, + LabelValues: []string{"labelValue"}, + ReportTime: time.Now().Truncate(time.Second).Add(time.Second), + CollectionTime: time.Now().Truncate(time.Second), + KeysHash: 4321, + } + + store.Increment(descriptor, metric2) + + metrics := store.ListMetrics(descriptor.Name) + Expect(len(metrics)).To(Equal(1)) + Expect(metrics[0].Value).To(Equal(float64(30))) + }) + + It("will remove counters outside of TTL", func() { + metric.CollectionTime = metric.CollectionTime.Add(-time.Hour) + + store.Increment(descriptor, metric) + + metrics := store.ListMetrics(descriptor.Name) + Expect(len(metrics)).To(Equal(0)) + }) +}) diff --git a/delta/delta_suite_test.go b/delta/delta_suite_test.go new file mode 100644 index 00000000..f0e625f2 --- /dev/null +++ b/delta/delta_suite_test.go @@ -0,0 +1,26 @@ +// Copyright 2023 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package delta_test + +import ( + "testing" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" +) + +func TestDelta(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Delta Suite") +} diff --git a/delta/histogram.go b/delta/histogram.go new file mode 100644 index 00000000..9672ebcc --- /dev/null +++ b/delta/histogram.go @@ -0,0 +1,151 @@ +// Copyright 2023 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package delta + +import ( + "fmt" + "sort" + "strings" + "sync" + "time" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "google.golang.org/api/monitoring/v3" + + "github.com/prometheus-community/stackdriver_exporter/collectors" + "github.com/prometheus-community/stackdriver_exporter/hash" +) + +type HistogramEntry struct { + Collected map[uint64]*collectors.HistogramMetric + mutex *sync.RWMutex +} + +type InMemoryHistogramStore struct { + store *sync.Map + ttl time.Duration + logger log.Logger +} + +// NewInMemoryHistogramStore returns an implementation of HistogramStore which is persisted in-memory +func NewInMemoryHistogramStore(logger log.Logger, ttl time.Duration) *InMemoryHistogramStore { + store := &InMemoryHistogramStore{ + store: &sync.Map{}, + logger: logger, + ttl: ttl, + } + + return store +} + +func (s *InMemoryHistogramStore) Increment(metricDescriptor *monitoring.MetricDescriptor, currentValue *collectors.HistogramMetric) { + if currentValue == nil { + return + } + + tmp, _ := s.store.LoadOrStore(metricDescriptor.Name, &HistogramEntry{ + Collected: map[uint64]*collectors.HistogramMetric{}, + mutex: &sync.RWMutex{}, + }) + entry := tmp.(*HistogramEntry) + + key := toHistogramKey(currentValue) + + entry.mutex.Lock() + defer entry.mutex.Unlock() + existing := entry.Collected[key] + + if existing == nil { + level.Debug(s.logger).Log("msg", "Tracking new histogram", "fqName", currentValue.FqName, "key", key, "incoming_time", currentValue.ReportTime) + entry.Collected[key] = currentValue + return + } + + if existing.ReportTime.Before(currentValue.ReportTime) { + level.Debug(s.logger).Log("msg", "Incrementing existing histogram", "fqName", currentValue.FqName, "key", key, "last_reported_time", existing.ReportTime, "incoming_time", currentValue.ReportTime) + entry.Collected[key] = mergeHistograms(existing, currentValue) + return + } + + level.Debug(s.logger).Log("msg", "Ignoring old sample for histogram", "fqName", currentValue.FqName, "key", key, "last_reported_time", existing.ReportTime, "incoming_time", currentValue.ReportTime) +} + +func toHistogramKey(hist *collectors.HistogramMetric) uint64 { + labels := make(map[string]string) + keysCopy := append([]string{}, hist.LabelKeys...) + for i := range hist.LabelKeys { + labels[hist.LabelKeys[i]] = hist.LabelValues[i] + } + sort.Strings(keysCopy) + + var keyParts []string + for _, k := range keysCopy { + keyParts = append(keyParts, fmt.Sprintf("%s:%s", k, labels[k])) + } + hashText := fmt.Sprintf("%s|%s", hist.FqName, strings.Join(keyParts, "|")) + h := hash.New() + h = hash.Add(h, hashText) + + return h +} + +func mergeHistograms(existing *collectors.HistogramMetric, current *collectors.HistogramMetric) *collectors.HistogramMetric { + for key, value := range existing.Buckets { + current.Buckets[key] += value + } + + // Calculate a new mean and overall count + mean := existing.Mean + mean += current.Mean + mean /= 2 + + var count uint64 + for _, v := range current.Buckets { + count += v + } + + current.Mean = mean + current.Count = count + + return current +} + +func (s *InMemoryHistogramStore) ListMetrics(metricDescriptorName string) []*collectors.HistogramMetric { + var output []*collectors.HistogramMetric + now := time.Now() + ttlWindowStart := now.Add(-s.ttl) + + tmp, exists := s.store.Load(metricDescriptorName) + if !exists { + return output + } + entry := tmp.(*HistogramEntry) + + entry.mutex.Lock() + defer entry.mutex.Unlock() + for key, collected := range entry.Collected { + //Scan and remove metrics which are outside the TTL + if ttlWindowStart.After(collected.CollectionTime) { + level.Debug(s.logger).Log("msg", "Deleting histogram entry outside of TTL", "key", key, "fqName", collected.FqName) + delete(entry.Collected, key) + continue + } + + copy := *collected + output = append(output, ©) + } + + return output +} diff --git a/delta/histogram_test.go b/delta/histogram_test.go new file mode 100644 index 00000000..ac0e27f4 --- /dev/null +++ b/delta/histogram_test.go @@ -0,0 +1,64 @@ +// Copyright 2023 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package delta_test + +import ( + "time" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + "github.com/prometheus/common/promlog" + "google.golang.org/api/monitoring/v3" + + "github.com/prometheus-community/stackdriver_exporter/collectors" + "github.com/prometheus-community/stackdriver_exporter/delta" +) + +var _ = Describe("HistogramStore", func() { + var store *delta.InMemoryHistogramStore + var histogram *collectors.HistogramMetric + descriptor := &monitoring.MetricDescriptor{Name: "This is a metric"} + + BeforeEach(func() { + store = delta.NewInMemoryHistogramStore(promlog.New(&promlog.Config{}), time.Minute) + histogram = &collectors.HistogramMetric{ + FqName: "histogram_name", + LabelKeys: []string{"labelKey"}, + Mean: 10, + Count: 100, + Buckets: map[float64]uint64{1.00000000000000000001: 1000}, + LabelValues: []string{"labelValue"}, + ReportTime: time.Now().Truncate(time.Second), + CollectionTime: time.Now().Truncate(time.Second), + KeysHash: 8765, + } + }) + + It("can return tracked histograms", func() { + store.Increment(descriptor, histogram) + metrics := store.ListMetrics(descriptor.Name) + + Expect(len(metrics)).To(Equal(1)) + Expect(metrics[0]).To(Equal(histogram)) + }) + + It("will remove histograms outside of TTL", func() { + histogram.CollectionTime = histogram.CollectionTime.Add(-time.Hour) + + store.Increment(descriptor, histogram) + + metrics := store.ListMetrics(descriptor.Name) + Expect(len(metrics)).To(Equal(0)) + }) +}) diff --git a/collectors/fnv.go b/hash/fnv.go similarity index 72% rename from collectors/fnv.go rename to hash/fnv.go index 8891c287..07648036 100644 --- a/collectors/fnv.go +++ b/hash/fnv.go @@ -11,9 +11,9 @@ // See the License for the specific language governing permissions and // limitations under the License. -package collectors +package hash -const separatorByte = 255 +const SeparatorByte = 255 // https://github.com/prometheus/client_golang/blob/master/prometheus/fnv.go // Inline and byte-free variant of hash/fnv's fnv64a. @@ -23,13 +23,13 @@ const ( prime64 = 1099511628211 ) -// hashNew initializies a new fnv64a hash value. -func hashNew() uint64 { +// New initializies a new fnv64a hash value. +func New() uint64 { return offset64 } -// hashAdd adds a string to a fnv64a hash value, returning the updated hash. -func hashAdd(h uint64, s string) uint64 { +// Add adds a string to a fnv64a hash value, returning the updated hash. +func Add(h uint64, s string) uint64 { for i := 0; i < len(s); i++ { h ^= uint64(s[i]) h *= prime64 @@ -37,8 +37,8 @@ func hashAdd(h uint64, s string) uint64 { return h } -// hashAddByte adds a byte to a fnv64a hash value, returning the updated hash. -func hashAddByte(h uint64, b byte) uint64 { +// AddByte adds a byte to a fnv64a hash value, returning the updated hash. +func AddByte(h uint64, b byte) uint64 { h ^= uint64(b) h *= prime64 return h diff --git a/stackdriver_exporter.go b/stackdriver_exporter.go index 65fe6319..3cebe08d 100644 --- a/stackdriver_exporter.go +++ b/stackdriver_exporter.go @@ -37,6 +37,7 @@ import ( "google.golang.org/api/option" "github.com/prometheus-community/stackdriver_exporter/collectors" + "github.com/prometheus-community/stackdriver_exporter/delta" "github.com/prometheus-community/stackdriver_exporter/utils" ) @@ -220,7 +221,7 @@ func (h *handler) innerHandler(filters map[string]bool) http.Handler { AggregateDeltas: *monitoringMetricsAggregateDeltas, DescriptorCacheTTL: *monitoringDescriptorCacheTTL, DescriptorCacheOnlyGoogle: *monitoringDescriptorCacheOnlyGoogle, - }, h.logger, collectors.NewInMemoryDeltaCounterStore(h.logger, *monitoringMetricsDeltasTTL), collectors.NewInMemoryDeltaDistributionStore(h.logger, *monitoringMetricsDeltasTTL)) + }, h.logger, delta.NewInMemoryCounterStore(h.logger, *monitoringMetricsDeltasTTL), delta.NewInMemoryHistogramStore(h.logger, *monitoringMetricsDeltasTTL)) if err != nil { level.Error(h.logger).Log("err", err) os.Exit(1)