Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix monitoring metrics for individual collectors #389

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 76 additions & 0 deletions collectors/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,3 +72,79 @@ func (d *descriptorCache) Store(prefix string, data []*monitoring.MetricDescript
defer d.lock.Unlock()
d.cache[prefix] = &entry
}

// CollectorCache is a TTL cache for MonitoringCollectors, keyed by an
// arbitrary string. Entries expire after ttl; NewCollectorCache also starts a
// background goroutine that periodically sweeps expired entries.
type CollectorCache struct {
	cache map[string]*collectorCacheEntry // live entries; guarded by lock
	lock  sync.RWMutex                    // protects cache and the expiry field of its entries
	ttl   time.Duration                   // lifetime applied to an entry on Store and refreshed on Get
}

// collectorCacheEntry pairs a cached MonitoringCollector with the instant
// after which it is considered expired.
type collectorCacheEntry struct {
	collector *MonitoringCollector
	expiry    time.Time // entry is stale once time.Now() is after this instant
}

// NewCollectorCache returns a new CollectorCache with the given TTL.
// It also launches a background sweeper goroutine that evicts expired
// entries.
//
// NOTE(review): the sweeper has no stop mechanism, so the cache (and its
// goroutine) live for the remainder of the process — acceptable for a
// process-lifetime cache, but worth confirming.
func NewCollectorCache(ttl time.Duration) *CollectorCache {
	cc := &CollectorCache{
		ttl:   ttl,
		cache: map[string]*collectorCacheEntry{},
	}
	go cc.cleanup()
	return cc
}

// Get returns the MonitoringCollector stored under key, if present and not
// expired, and extends its lifetime by the cache TTL (sliding expiration).
// An entry that has expired between cleanup sweeps is evicted lazily here.
//
// A write lock is required — not RLock as before — because this method
// mutates shared state on both paths: it deletes an expired entry from the
// map and rewrites entry.expiry on a hit. Doing either under RLock is a data
// race with concurrent callers (concurrent map write / unsynchronized field
// write).
func (c *CollectorCache) Get(key string) (*MonitoringCollector, bool) {
	c.lock.Lock()
	defer c.lock.Unlock()

	entry, ok := c.cache[key]
	if !ok {
		return nil, false
	}

	if time.Now().After(entry.expiry) {
		// Lazy eviction of an entry the periodic sweep has not reached yet.
		delete(c.cache, key)
		return nil, false
	}

	// Refresh the TTL so frequently-used collectors stay cached.
	entry.expiry = time.Now().Add(c.ttl)
	return entry.collector, true
}

// Store inserts collector under key, replacing any existing entry, and stamps
// it with a fresh expiry of now + TTL.
func (c *CollectorCache) Store(key string, collector *MonitoringCollector) {
	e := &collectorCacheEntry{
		collector: collector,
		expiry:    time.Now().Add(c.ttl),
	}

	c.lock.Lock()
	c.cache[key] = e
	c.lock.Unlock()
}

// cleanup periodically removes expired entries. It runs for the lifetime of
// the process, sweeping every five minutes.
func (c *CollectorCache) cleanup() {
	sweep := time.NewTicker(5 * time.Minute)
	defer sweep.Stop()
	for {
		<-sweep.C
		c.removeExpired()
	}
}

// removeExpired deletes every cache entry whose expiry lies in the past.
func (c *CollectorCache) removeExpired() {
	c.lock.Lock()
	defer c.lock.Unlock()

	cutoff := time.Now()
	for k, e := range c.cache {
		if e.expiry.Before(cutoff) {
			delete(c.cache, k)
		}
	}
}
53 changes: 53 additions & 0 deletions collectors/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,3 +74,56 @@ func TestDescriptorCache(t *testing.T) {
t.Error("cache entries should have expired")
}
}

// TestCollectorCache exercises store/get/expiry behavior of CollectorCache.
func TestCollectorCache(t *testing.T) {
	createCollector := func(id string) *MonitoringCollector {
		return &MonitoringCollector{
			projectID: id,
		}
	}

	t.Run("basic cache Op", func(t *testing.T) {
		// Use a short TTL: the original 1s TTL plus a 2*ttl sleep added
		// multiple seconds of wall time to the suite for no extra coverage.
		ttl := 50 * time.Millisecond
		cache := NewCollectorCache(ttl)
		collector := createCollector("test-project")
		key := "test-key"

		cache.Store(key, collector)

		// Use the key variable rather than re-spelling the literal.
		if _, found := cache.Get(key); !found {
			t.Error("Collector should be available in cache before TTL")
		}

		// Wait past the TTL so the entry lazily expires on the next Get.
		time.Sleep(2 * ttl)
		if _, found := cache.Get(key); found {
			t.Error("Collector should have expired")
		}
	})

	t.Run("multiple collectors", func(t *testing.T) {
		// No sleeps here, so a generous TTL avoids flakes on slow CI hosts.
		ttl := time.Minute
		cache := NewCollectorCache(ttl)

		collectors := map[string]*MonitoringCollector{
			"test-key-1": createCollector("test-project-1"),
			"test-key-2": createCollector("test-project-2"),
			"test-key-3": createCollector("test-project-3"),
		}

		for k, v := range collectors {
			cache.Store(k, v)
		}

		for k, original := range collectors {
			cached, found := cache.Get(k)
			if !found {
				t.Errorf("Collector %s not found in cache", k)
				continue
			}

			if cached.projectID != original.projectID {
				t.Errorf("Wrong collector for key %s. Got projectId %s, want %s", k, cached.projectID, original.projectID)
			}
		}
	})
}
56 changes: 44 additions & 12 deletions stackdriver_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"os"
"slices"
"strings"
"time"

"github.com/PuerkitoBio/rehttp"
"github.com/alecthomas/kingpin/v2"
Expand Down Expand Up @@ -185,6 +186,7 @@ type handler struct {
metricsExtraFilters []collectors.MetricFilter
additionalGatherer prometheus.Gatherer
m *monitoring.Service
collectors *collectors.CollectorCache
}

func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
Expand All @@ -203,35 +205,65 @@ func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}

// newHandler builds the HTTP handler that serves collector metrics for the
// given projects, wiring in a CollectorCache so collectors are reused across
// scrapes.
func newHandler(projectIDs []string, metricPrefixes []string, metricExtraFilters []collectors.MetricFilter, m *monitoring.Service, logger *slog.Logger, additionalGatherer prometheus.Gatherer) *handler {
	// Collector cache TTL: when delta aggregation or descriptor caching is
	// enabled, use the larger of the two feature TTLs; otherwise default to
	// two hours.
	ttl := 2 * time.Hour
	if *monitoringMetricsAggregateDeltas || *monitoringDescriptorCacheTTL > 0 {
		ttl = max(*monitoringMetricsDeltasTTL, *monitoringDescriptorCacheTTL)
	}

	logger.Info("Creating collector cache", "ttl", ttl)

	h := &handler{
		logger:              logger,
		projectIDs:          projectIDs,
		metricsPrefixes:     metricPrefixes,
		metricsExtraFilters: metricExtraFilters,
		additionalGatherer:  additionalGatherer,
		m:                   m,
		collectors:          collectors.NewCollectorCache(ttl),
	}

	h.handler = h.innerHandler(nil)
	return h
}

// getCollector returns a MonitoringCollector for the given project and metric
// filters, serving from the collector cache when possible and constructing
// (then caching) a new collector on a miss.
func (h *handler) getCollector(project string, filters map[string]bool) (*collectors.MonitoringCollector, error) {
	filteredPrefixes := h.filterMetricTypePrefixes(filters)
	// Cache key is the project plus the filtered prefix list, so distinct
	// filter sets get distinct collectors.
	cacheKey := fmt.Sprintf("%s-%v", project, filteredPrefixes)

	if cached, ok := h.collectors.Get(cacheKey); ok {
		return cached, nil
	}

	opts := collectors.MonitoringCollectorOptions{
		MetricTypePrefixes:        filteredPrefixes,
		ExtraFilters:              h.metricsExtraFilters,
		RequestInterval:           *monitoringMetricsInterval,
		RequestOffset:             *monitoringMetricsOffset,
		IngestDelay:               *monitoringMetricsIngestDelay,
		FillMissingLabels:         *collectorFillMissingLabels,
		DropDelegatedProjects:     *monitoringDropDelegatedProjects,
		AggregateDeltas:           *monitoringMetricsAggregateDeltas,
		DescriptorCacheTTL:        *monitoringDescriptorCacheTTL,
		DescriptorCacheOnlyGoogle: *monitoringDescriptorCacheOnlyGoogle,
	}
	collector, err := collectors.NewMonitoringCollector(project, h.m, opts, h.logger, delta.NewInMemoryCounterStore(h.logger, *monitoringMetricsDeltasTTL), delta.NewInMemoryHistogramStore(h.logger, *monitoringMetricsDeltasTTL))
	if err != nil {
		return nil, err
	}

	h.collectors.Store(cacheKey, collector)
	return collector, nil
}

func (h *handler) innerHandler(filters map[string]bool) http.Handler {
registry := prometheus.NewRegistry()

for _, project := range h.projectIDs {
monitoringCollector, err := collectors.NewMonitoringCollector(project, h.m, collectors.MonitoringCollectorOptions{
MetricTypePrefixes: h.filterMetricTypePrefixes(filters),
ExtraFilters: h.metricsExtraFilters,
RequestInterval: *monitoringMetricsInterval,
RequestOffset: *monitoringMetricsOffset,
IngestDelay: *monitoringMetricsIngestDelay,
FillMissingLabels: *collectorFillMissingLabels,
DropDelegatedProjects: *monitoringDropDelegatedProjects,
AggregateDeltas: *monitoringMetricsAggregateDeltas,
DescriptorCacheTTL: *monitoringDescriptorCacheTTL,
DescriptorCacheOnlyGoogle: *monitoringDescriptorCacheOnlyGoogle,
}, h.logger, delta.NewInMemoryCounterStore(h.logger, *monitoringMetricsDeltasTTL), delta.NewInMemoryHistogramStore(h.logger, *monitoringMetricsDeltasTTL))
monitoringCollector, err := h.getCollector(project, filters)
if err != nil {
h.logger.Error("error creating monitoring collector", "err", err)
os.Exit(1)
Expand Down
Loading