Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix monitoring metrics for individual collectors #389

Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions collectors/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,3 +72,47 @@ func (d *descriptorCache) Store(prefix string, data []*monitoring.MetricDescript
defer d.lock.Unlock()
d.cache[prefix] = &entry
}

// CollectorCache is a TTL-based cache for MonitoringCollectors, keyed by an
// arbitrary caller-chosen string. Entries expire ttl after they are stored;
// expired entries are ignored by Get but are never evicted from the map
// (they are only reclaimed when overwritten by a later Store).
type CollectorCache struct {
	cache map[string]*collectorCacheEntry // live and expired entries, guarded by lock
	lock  sync.RWMutex                    // guards cache
	ttl   time.Duration                   // lifetime of each entry, measured from Store time
}

// collectorCacheEntry is a cache entry for a MonitoringCollector.
type collectorCacheEntry struct {
	collector *MonitoringCollector // the cached collector
	expiry    time.Time            // instant after which the entry is considered stale
}

// NewCollectorCache returns an empty CollectorCache whose entries expire
// ttl after being stored.
func NewCollectorCache(ttl time.Duration) *CollectorCache {
	c := &CollectorCache{
		ttl:   ttl,
		cache: map[string]*collectorCacheEntry{},
	}
	return c
}
ananya-mallik-ps marked this conversation as resolved.
Show resolved Hide resolved

// Get returns a MonitoringCollector if the key is found and not expired
func (c *CollectorCache) Get(key string) (*MonitoringCollector, bool) {
c.lock.RLock()
defer c.lock.RUnlock()

entry, ok := c.cache[key]
if !ok || time.Now().After(entry.expiry) {
return nil, false
}
ananya-mallik-ps marked this conversation as resolved.
Show resolved Hide resolved
return entry.collector, true
}

// Store inserts collector into the cache under key, replacing any existing
// entry. The new entry expires c.ttl from now.
func (c *CollectorCache) Store(key string, collector *MonitoringCollector) {
	// Compute the expiry before taking the lock to keep the critical
	// section minimal.
	e := &collectorCacheEntry{
		expiry:    time.Now().Add(c.ttl),
		collector: collector,
	}

	c.lock.Lock()
	defer c.lock.Unlock()
	c.cache[key] = e
}
53 changes: 53 additions & 0 deletions collectors/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,3 +74,56 @@ func TestDescriptorCache(t *testing.T) {
t.Error("cache entries should have expired")
}
}

// TestCollectorCache verifies that CollectorCache stores, retrieves, and
// expires MonitoringCollectors.
func TestCollectorCache(t *testing.T) {
	// createCollector builds a minimal collector distinguishable by projectID.
	createCollector := func(id string) *MonitoringCollector {
		return &MonitoringCollector{
			projectID: id,
		}
	}

	t.Run("basic cache Op", func(t *testing.T) {
		// A short TTL keeps the expiry wait (2*ttl below) fast.
		ttl := 20 * time.Millisecond
		cache := NewCollectorCache(ttl)
		collector := createCollector("test-project")
		key := "test-key"

		cache.Store(key, collector)

		if _, found := cache.Get(key); !found {
			t.Error("Collector should be available in cache before TTL")
		}

		// Sleep comfortably past the TTL so the entry is guaranteed stale.
		time.Sleep(2 * ttl)
		if _, found := cache.Get(key); found {
			t.Error("Collector should have expired")
		}
	})

	t.Run("multiple collectors", func(t *testing.T) {
		ttl := 1 * time.Second
		cache := NewCollectorCache(ttl)

		collectors := map[string]*MonitoringCollector{
			"test-key-1": createCollector("test-project-1"),
			"test-key-2": createCollector("test-project-2"),
			"test-key-3": createCollector("test-project-3"),
		}

		for k, v := range collectors {
			cache.Store(k, v)
		}

		// Every stored collector must come back under its own key, unexpired.
		for k, original := range collectors {
			cached, found := cache.Get(k)
			if !found {
				t.Errorf("Collector %s not found in cache", k)
				continue
			}

			if cached.projectID != original.projectID {
				t.Errorf("Wrong collector for key %s. Got projectId %s, want %s", k, cached.projectID, original.projectID)
			}
		}
	})
}
57 changes: 45 additions & 12 deletions stackdriver_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"os"
"slices"
"strings"
"time"

"github.com/PuerkitoBio/rehttp"
"github.com/alecthomas/kingpin/v2"
Expand Down Expand Up @@ -185,6 +186,7 @@ type handler struct {
metricsExtraFilters []collectors.MetricFilter
additionalGatherer prometheus.Gatherer
m *monitoring.Service
collectors *collectors.CollectorCache
}

func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
Expand All @@ -203,35 +205,66 @@ func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}

// newHandler builds the exporter's HTTP handler and its shared collector
// cache. The cache TTL defaults to two hours, raised to the larger of the
// deltas-aggregation TTL and the descriptor-cache TTL when either feature is
// enabled, so cached collectors outlive the state they carry.
func newHandler(projectIDs []string, metricPrefixes []string, metricExtraFilters []collectors.MetricFilter, m *monitoring.Service, logger *slog.Logger, additionalGatherer prometheus.Gatherer) *handler {
	ttl := 2 * time.Hour
	// Add collector caching TTL as max of deltas aggregation or descriptor caching
	if *monitoringMetricsAggregateDeltas || *monitoringDescriptorCacheTTL > 0 {
		if featureTTL := max(*monitoringMetricsDeltasTTL, *monitoringDescriptorCacheTTL); featureTTL > ttl {
			ttl = featureTTL
		}
	}

	logger.Info("Creating collector cache", "ttl", ttl)

	h := &handler{
		logger:              logger,
		projectIDs:          projectIDs,
		metricsPrefixes:     metricPrefixes,
		metricsExtraFilters: metricExtraFilters,
		additionalGatherer:  additionalGatherer,
		m:                   m,
		collectors:          collectors.NewCollectorCache(ttl),
	}

	h.handler = h.innerHandler(nil)
	return h
}

// getCollector returns a MonitoringCollector for the given project and metric
// filters, reusing a cached instance when one exists so that per-collector
// state (e.g. delta-aggregation stores) survives across scrapes.
//
// NOTE(review): two concurrent scrapes that miss the cache may both build a
// collector for the same key; the later Store wins. This wastes one
// construction but is otherwise harmless.
func (h *handler) getCollector(project string, filters map[string]bool) (*collectors.MonitoringCollector, error) {
	filteredPrefixes := h.filterMetricTypePrefixes(filters)
	// The key encodes both project and the effective prefix list, so
	// different filter sets get distinct collectors.
	collectorKey := fmt.Sprintf("%s-%v", project, filteredPrefixes)

	if collector, found := h.collectors.Get(collectorKey); found {
		return collector, nil
	}

	collector, err := collectors.NewMonitoringCollector(project, h.m, collectors.MonitoringCollectorOptions{
		MetricTypePrefixes:        filteredPrefixes,
		ExtraFilters:              h.metricsExtraFilters,
		RequestInterval:           *monitoringMetricsInterval,
		RequestOffset:             *monitoringMetricsOffset,
		IngestDelay:               *monitoringMetricsIngestDelay,
		FillMissingLabels:         *collectorFillMissingLabels,
		DropDelegatedProjects:     *monitoringDropDelegatedProjects,
		AggregateDeltas:           *monitoringMetricsAggregateDeltas,
		DescriptorCacheTTL:        *monitoringDescriptorCacheTTL,
		DescriptorCacheOnlyGoogle: *monitoringDescriptorCacheOnlyGoogle,
	}, h.logger, delta.NewInMemoryCounterStore(h.logger, *monitoringMetricsDeltasTTL), delta.NewInMemoryHistogramStore(h.logger, *monitoringMetricsDeltasTTL))
	if err != nil {
		return nil, err
	}
	h.collectors.Store(collectorKey, collector)
	return collector, nil
}

func (h *handler) innerHandler(filters map[string]bool) http.Handler {
registry := prometheus.NewRegistry()

for _, project := range h.projectIDs {
monitoringCollector, err := collectors.NewMonitoringCollector(project, h.m, collectors.MonitoringCollectorOptions{
MetricTypePrefixes: h.filterMetricTypePrefixes(filters),
ExtraFilters: h.metricsExtraFilters,
RequestInterval: *monitoringMetricsInterval,
RequestOffset: *monitoringMetricsOffset,
IngestDelay: *monitoringMetricsIngestDelay,
FillMissingLabels: *collectorFillMissingLabels,
DropDelegatedProjects: *monitoringDropDelegatedProjects,
AggregateDeltas: *monitoringMetricsAggregateDeltas,
DescriptorCacheTTL: *monitoringDescriptorCacheTTL,
DescriptorCacheOnlyGoogle: *monitoringDescriptorCacheOnlyGoogle,
}, h.logger, delta.NewInMemoryCounterStore(h.logger, *monitoringMetricsDeltasTTL), delta.NewInMemoryHistogramStore(h.logger, *monitoringMetricsDeltasTTL))
monitoringCollector, err := h.getCollector(project, filters)
if err != nil {
h.logger.Error("error creating monitoring collector", "err", err)
os.Exit(1)
Expand Down
Loading