diff --git a/collectors/cache.go b/collectors/cache.go
index f9e807d..1761fdc 100644
--- a/collectors/cache.go
+++ b/collectors/cache.go
@@ -72,3 +72,79 @@ func (d *descriptorCache) Store(prefix string, data []*monitoring.MetricDescript
 	defer d.lock.Unlock()
 	d.cache[prefix] = &entry
 }
+
+// CollectorCache is a cache for MonitoringCollectors
+type CollectorCache struct {
+	cache map[string]*collectorCacheEntry
+	lock  sync.RWMutex
+	ttl   time.Duration
+}
+
+// collectorCacheEntry is a cache entry for a MonitoringCollector
+type collectorCacheEntry struct {
+	collector *MonitoringCollector
+	expiry    time.Time
+}
+
+// NewCollectorCache returns a new CollectorCache with the given TTL
+func NewCollectorCache(ttl time.Duration) *CollectorCache {
+	c := &CollectorCache{
+		cache: make(map[string]*collectorCacheEntry),
+		ttl:   ttl,
+	}
+
+	go c.cleanup()
+	return c
+}
+
+// Get returns a MonitoringCollector if the key is found and not expired.
+// If the key is found, its TTL is reset.
+func (c *CollectorCache) Get(key string) (*MonitoringCollector, bool) {
+	c.lock.Lock()
+	defer c.lock.Unlock()
+
+	entry, ok := c.cache[key]
+
+	if !ok {
+		return nil, false
+	}
+
+	if time.Now().After(entry.expiry) {
+		delete(c.cache, key)
+		return nil, false
+	}
+
+	entry.expiry = time.Now().Add(c.ttl)
+	return entry.collector, true
+}
+
+func (c *CollectorCache) Store(key string, collector *MonitoringCollector) {
+	entry := &collectorCacheEntry{
+		collector: collector,
+		expiry:    time.Now().Add(c.ttl),
+	}
+
+	c.lock.Lock()
+	defer c.lock.Unlock()
+	c.cache[key] = entry
+}
+
+func (c *CollectorCache) cleanup() {
+	ticker := time.NewTicker(5 * time.Minute)
+	defer ticker.Stop()
+	for range ticker.C {
+		c.removeExpired()
+	}
+}
+
+func (c *CollectorCache) removeExpired() {
+	c.lock.Lock()
+	defer c.lock.Unlock()
+
+	now := time.Now()
+	for key, entry := range c.cache {
+		if now.After(entry.expiry) {
+			delete(c.cache, key)
+		}
+	}
+}
diff --git a/collectors/cache_test.go b/collectors/cache_test.go
index 7164baf..3be2eec 100644
--- a/collectors/cache_test.go
+++ b/collectors/cache_test.go
@@ -74,3 +74,56 @@ func TestDescriptorCache(t *testing.T) {
 		t.Error("cache entries should have expired")
 	}
 }
+
+func TestCollectorCache(t *testing.T) {
+	createCollector := func(id string) *MonitoringCollector {
+		return &MonitoringCollector{
+			projectID: id,
+		}
+	}
+
+	t.Run("basic cache operations", func(t *testing.T) {
+		ttl := 1 * time.Second
+		cache := NewCollectorCache(ttl)
+		collector := createCollector("test-project")
+		key := "test-key"
+
+		cache.Store(key, collector)
+
+		if _, found := cache.Get("test-key"); !found {
+			t.Error("Collector should be available in cache before TTL")
+		}
+
+		time.Sleep(2 * ttl)
+		if _, found := cache.Get("test-key"); found {
+			t.Error("Collector should have expired")
+		}
+	})
+
+	t.Run("multiple collectors", func(t *testing.T) {
+		ttl := 1 * time.Second
+		cache := NewCollectorCache(ttl)
+
+		collectors := map[string]*MonitoringCollector{
+			"test-key-1": createCollector("test-project-1"),
+			"test-key-2": createCollector("test-project-2"),
+			"test-key-3": createCollector("test-project-3"),
+		}
+
+		for k, v := range collectors {
+			cache.Store(k, v)
+		}
+
+		for k, original := range collectors {
+			cached, found := cache.Get(k)
+			if !found {
+				t.Errorf("Collector %s not found in cache", k)
+				continue
+			}
+
+			if cached.projectID != original.projectID {
+				t.Errorf("Wrong collector for key %s. Got projectID %s, want %s", k, cached.projectID, original.projectID)
+			}
+		}
+	})
+}
diff --git a/stackdriver_exporter.go b/stackdriver_exporter.go
index 4fc1c0a..b2ffcb3 100644
--- a/stackdriver_exporter.go
+++ b/stackdriver_exporter.go
@@ -20,6 +20,7 @@ import (
 	"os"
 	"slices"
 	"strings"
+	"time"
 
 	"github.com/PuerkitoBio/rehttp"
 	"github.com/alecthomas/kingpin/v2"
@@ -185,6 +186,7 @@ type handler struct {
 	metricsExtraFilters []collectors.MetricFilter
 	additionalGatherer  prometheus.Gatherer
 	m                   *monitoring.Service
+	collectors          *collectors.CollectorCache
 }
 
 func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
@@ -203,6 +205,19 @@ func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 }
 
 func newHandler(projectIDs []string, metricPrefixes []string, metricExtraFilters []collectors.MetricFilter, m *monitoring.Service, logger *slog.Logger, additionalGatherer prometheus.Gatherer) *handler {
+	var ttl time.Duration
+	// Set the collector cache TTL to the larger of the delta aggregation TTL and the descriptor cache TTL
+	if *monitoringMetricsAggregateDeltas || *monitoringDescriptorCacheTTL > 0 {
+		ttl = *monitoringMetricsDeltasTTL
+		if *monitoringDescriptorCacheTTL > ttl {
+			ttl = *monitoringDescriptorCacheTTL
+		}
+	} else {
+		ttl = 2 * time.Hour
+	}
+
+	logger.Info("Creating collector cache", "ttl", ttl)
+
 	h := &handler{
 		logger:              logger,
 		projectIDs:          projectIDs,
@@ -210,28 +225,45 @@ func newHandler(projectIDs []string, metricPrefixes []string, metricExtraFilters
 		metricsExtraFilters: metricExtraFilters,
 		additionalGatherer:  additionalGatherer,
 		m:                   m,
+		collectors:          collectors.NewCollectorCache(ttl),
 	}
 	h.handler = h.innerHandler(nil)
 	return h
 }
 
+func (h *handler) getCollector(project string, filters map[string]bool) (*collectors.MonitoringCollector, error) {
+	filteredPrefixes := h.filterMetricTypePrefixes(filters)
+	collectorKey := fmt.Sprintf("%s-%v", project, filteredPrefixes)
+
+	if collector, found := h.collectors.Get(collectorKey); found {
+		return collector, nil
+	}
+
+	collector, err := collectors.NewMonitoringCollector(project, h.m, collectors.MonitoringCollectorOptions{
+		MetricTypePrefixes:        filteredPrefixes,
+		ExtraFilters:              h.metricsExtraFilters,
+		RequestInterval:           *monitoringMetricsInterval,
+		RequestOffset:             *monitoringMetricsOffset,
+		IngestDelay:               *monitoringMetricsIngestDelay,
+		FillMissingLabels:         *collectorFillMissingLabels,
+		DropDelegatedProjects:     *monitoringDropDelegatedProjects,
+		AggregateDeltas:           *monitoringMetricsAggregateDeltas,
+		DescriptorCacheTTL:        *monitoringDescriptorCacheTTL,
+		DescriptorCacheOnlyGoogle: *monitoringDescriptorCacheOnlyGoogle,
+	}, h.logger, delta.NewInMemoryCounterStore(h.logger, *monitoringMetricsDeltasTTL), delta.NewInMemoryHistogramStore(h.logger, *monitoringMetricsDeltasTTL))
+	if err != nil {
+		return nil, err
+	}
+	h.collectors.Store(collectorKey, collector)
+	return collector, nil
+}
+
 func (h *handler) innerHandler(filters map[string]bool) http.Handler {
 	registry := prometheus.NewRegistry()
 
 	for _, project := range h.projectIDs {
-		monitoringCollector, err := collectors.NewMonitoringCollector(project, h.m, collectors.MonitoringCollectorOptions{
-			MetricTypePrefixes:        h.filterMetricTypePrefixes(filters),
-			ExtraFilters:              h.metricsExtraFilters,
-			RequestInterval:           *monitoringMetricsInterval,
-			RequestOffset:             *monitoringMetricsOffset,
-			IngestDelay:               *monitoringMetricsIngestDelay,
-			FillMissingLabels:         *collectorFillMissingLabels,
-			DropDelegatedProjects:     *monitoringDropDelegatedProjects,
-			AggregateDeltas:           *monitoringMetricsAggregateDeltas,
-			DescriptorCacheTTL:        *monitoringDescriptorCacheTTL,
-			DescriptorCacheOnlyGoogle: *monitoringDescriptorCacheOnlyGoogle,
-		}, h.logger, delta.NewInMemoryCounterStore(h.logger, *monitoringMetricsDeltasTTL), delta.NewInMemoryHistogramStore(h.logger, *monitoringMetricsDeltasTTL))
+		monitoringCollector, err := h.getCollector(project, filters)
 		if err != nil {
 			h.logger.Error("error creating monitoring collector", "err", err)
 			os.Exit(1)
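
A minimal, self-contained sketch of the TTL selection that newHandler performs for the collector cache, shown as a standalone Go snippet. The helper chooseTTL is hypothetical and introduced here only for illustration; the flag variables it stands in for (monitoringMetricsAggregateDeltas, monitoringMetricsDeltasTTL, monitoringDescriptorCacheTTL) are the ones referenced in the diff above.

package main

import (
	"fmt"
	"time"
)

// chooseTTL mirrors the logic in newHandler: when delta aggregation or
// descriptor caching is enabled, the collector cache lives as long as the
// larger of the two TTLs; otherwise it falls back to a 2-hour default.
// Illustrative sketch only, not code from the exporter itself.
func chooseTTL(aggregateDeltas bool, deltasTTL, descriptorTTL time.Duration) time.Duration {
	if aggregateDeltas || descriptorTTL > 0 {
		ttl := deltasTTL
		if descriptorTTL > ttl {
			ttl = descriptorTTL
		}
		return ttl
	}
	return 2 * time.Hour
}

func main() {
	// Deltas TTL of 30m with a 1h descriptor cache TTL yields a 1h collector cache.
	fmt.Println(chooseTTL(true, 30*time.Minute, time.Hour)) // 1h0m0s
	// With both features disabled, the collector cache defaults to 2h.
	fmt.Println(chooseTTL(false, 0, 0)) // 2h0m0s
}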