Skip to content

Commit

Permalink
receiver/prometheusreceiver: add option to fallback to collector star…
Browse files Browse the repository at this point in the history
…ttime

This change adds an option to the metric adjuster to use an
approximation of the collector starttime as a fallback for the start
time of scraped cumulative metrics. This is useful when no start time is
found and when the collector starts up alongside its targets (like in
serverless environments or sidecar approaches).

Signed-off-by: Ridwan Sharif <[email protected]>
  • Loading branch information
ridwanmsharif committed Nov 14, 2024
1 parent 6db5d1a commit 05b85e8
Show file tree
Hide file tree
Showing 11 changed files with 189 additions and 26 deletions.
27 changes: 27 additions & 0 deletions .chloggen/starttime-fallback.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: prometheusreceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add `UseCollectorStartTimeFallback` option for the start time metric adjuster to use the collector start time as an approximation of process start time as a fallback.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [36364]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
2 changes: 1 addition & 1 deletion receiver/prometheusreceiver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ The prometheus receiver also supports additional top-level options:
- **trim_metric_suffixes**: [**Experimental**] When set to true, this enables trimming unit and some counter type suffixes from metric names. For example, it would cause `singing_duration_seconds_total` to be trimmed to `singing_duration`. This can be useful when trying to restore the original metric names used in OpenTelemetry instrumentation. Defaults to false.
- **use_start_time_metric**: When set to true, this enables retrieving the start time of all counter metrics from the process_start_time_seconds metric. This is only correct if all counters on that endpoint started after the process start time, and the process is the only actor exporting the metric after the process started. It should not be used in "exporters" which export counters that may have started before the process itself. Use only if you know what you are doing, as this may result in incorrect rate calculations. Defaults to false.
- **start_time_metric_regex**: The regular expression for the start time metric, and is only applied when use_start_time_metric is enabled. Defaults to process_start_time_seconds.

- **use_collector_start_time_fallback**: When set to true, this option enables using the collector start time as the metric start time if the process_start_time_seconds metric yields no result (for example if targets expose no process_start_time_seconds metric). This is useful when the collector start time is a good approximation of the process start time - for example in serverless workloads when the collector is deployed as a sidecar. This is only applied when use_start_time_metric is enabled. Defaults to false.
For example,

```yaml
Expand Down
38 changes: 30 additions & 8 deletions receiver/prometheusreceiver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,21 +23,43 @@ import (
type Config struct {
PrometheusConfig *PromConfig `mapstructure:"config"`
TrimMetricSuffixes bool `mapstructure:"trim_metric_suffixes"`
// UseStartTimeMetric enables retrieving the start time of all counter metrics
// from the process_start_time_seconds metric. This is only correct if all counters on that endpoint
// started after the process start time, and the process is the only actor exporting the metric after
// the process started. It should not be used in "exporters" which export counters that may have
// started before the process itself. Use only if you know what you are doing, as this may result
// in incorrect rate calculations.
UseStartTimeMetric bool `mapstructure:"use_start_time_metric"`
StartTimeMetricRegex string `mapstructure:"start_time_metric_regex"`

// Settings for adjusting metrics. Will default to using an InitialPointAdjuster
// which will use the first scraped point to define the start time for the timeseries.
AdjustOpts MetricAdjusterOpts `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct.

// ReportExtraScrapeMetrics - enables reporting of additional metrics for Prometheus client like scrape_body_size_bytes
ReportExtraScrapeMetrics bool `mapstructure:"report_extra_scrape_metrics"`

TargetAllocator *targetallocator.Config `mapstructure:"target_allocator"`
}

type MetricAdjusterOpts struct {
// UseStartTimeMetric enables retrieving the start time of all counter
// metrics from the process_start_time_seconds metric. This is only correct
// if all counters on that endpoint started after the process start time,
// and the process is the only actor exporting the metric after the process
// started. It should not be used in "exporters" which export counters that
// may have started before the process itself. Use only if you know what you
// are doing, as this may result in incorrect rate calculations.
UseStartTimeMetric bool `mapstructure:"use_start_time_metric"`
StartTimeMetricRegex string `mapstructure:"start_time_metric_regex"`

// UseCollectorStartTimeFallback enables using a fallback start time if a
// start time is otherwise unavailable when adjusting metrics. This would
// happen if the UseStartTimeMetric is used but the application doesn't emit
// a process_start_time_seconds metric or a metric that matches the
// StartTimeMetricRegex provided.
//
// If enabled, the fallback start time used for adjusted metrics is an
// approximation of the collector start time.
//
// This option should be used when the collector start time is a good
// approximation of the process start time - for example in serverless
// workloads when the collector is deployed as a sidecar.
UseCollectorStartTimeFallback bool `mapstructure:"use_collector_start_time_fallback"`
}

// Validate checks the receiver configuration is valid.
func (cfg *Config) Validate() error {
if !containsScrapeConfig(cfg) && cfg.TargetAllocator == nil {
Expand Down
4 changes: 2 additions & 2 deletions receiver/prometheusreceiver/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,9 @@ func TestLoadConfig(t *testing.T) {
r1 := cfg.(*Config)
assert.Equal(t, "demo", r1.PrometheusConfig.ScrapeConfigs[0].JobName)
assert.Equal(t, 5*time.Second, time.Duration(r1.PrometheusConfig.ScrapeConfigs[0].ScrapeInterval))
assert.True(t, r1.UseStartTimeMetric)
assert.True(t, r1.AdjustOpts.UseStartTimeMetric)
assert.True(t, r1.TrimMetricSuffixes)
assert.Equal(t, "^(.+_)*process_start_time_seconds$", r1.StartTimeMetricRegex)
assert.Equal(t, "^(.+_)*process_start_time_seconds$", r1.AdjustOpts.StartTimeMetricRegex)
assert.True(t, r1.ReportExtraScrapeMetrics)

assert.Equal(t, "http://my-targetallocator-service", r1.TargetAllocator.Endpoint)
Expand Down
3 changes: 2 additions & 1 deletion receiver/prometheusreceiver/internal/appendable.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ func NewAppendable(
useStartTimeMetric bool,
startTimeMetricRegex *regexp.Regexp,
useCreatedMetric bool,
useCollectorStartTimeFallback bool,
enableNativeHistograms bool,
externalLabels labels.Labels,
trimSuffixes bool,
Expand All @@ -45,7 +46,7 @@ func NewAppendable(
if !useStartTimeMetric {
metricAdjuster = NewInitialPointAdjuster(set.Logger, gcInterval, useCreatedMetric)
} else {
metricAdjuster = NewStartTimeMetricAdjuster(set.Logger, startTimeMetricRegex)
metricAdjuster = NewStartTimeMetricAdjuster(set.Logger, startTimeMetricRegex, useCollectorStartTimeFallback)
}

obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ReceiverID: set.ID, Transport: transport, ReceiverCreateSettings: set})
Expand Down
16 changes: 14 additions & 2 deletions receiver/prometheusreceiver/internal/starttimemetricadjuster.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package internal // import "github.com/open-telemetry/opentelemetry-collector-co
import (
"errors"
"regexp"
"time"

"go.opentelemetry.io/collector/pdata/pmetric"
"go.uber.org/zap"
Expand All @@ -19,21 +20,32 @@ var (

type startTimeMetricAdjuster struct {
startTimeMetricRegex *regexp.Regexp
fallbackStartTime *time.Time
logger *zap.Logger
}

// NewStartTimeMetricAdjuster returns a new MetricsAdjuster that adjust metrics' start times based on a start time metric.
func NewStartTimeMetricAdjuster(logger *zap.Logger, startTimeMetricRegex *regexp.Regexp) MetricsAdjuster {
func NewStartTimeMetricAdjuster(logger *zap.Logger, startTimeMetricRegex *regexp.Regexp, useCollectorStartTimeFallback bool) MetricsAdjuster {
var fallbackStartTime *time.Time
if useCollectorStartTimeFallback {
now := time.Now()
fallbackStartTime = &now
}
return &startTimeMetricAdjuster{
startTimeMetricRegex: startTimeMetricRegex,
fallbackStartTime: fallbackStartTime,
logger: logger,
}
}

func (stma *startTimeMetricAdjuster) AdjustMetrics(metrics pmetric.Metrics) error {
startTime, err := stma.getStartTime(metrics)
if err != nil {
return err
if stma.fallbackStartTime == nil {
return err
}
stma.logger.Warn("Couldn't get start time for metrics. Using fallback start time.", zap.Error(err))
startTime = float64(stma.fallbackStartTime.Unix())
}

startTimeTs := timestampFromFloat64(startTime)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package internal
import (
"regexp"
"testing"
"time"

"github.com/stretchr/testify/assert"
"go.opentelemetry.io/collector/pdata/pcommon"
Expand Down Expand Up @@ -113,7 +114,7 @@ func TestStartTimeMetricMatch(t *testing.T) {

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
stma := NewStartTimeMetricAdjuster(zap.NewNop(), tt.startTimeMetricRegex)
stma := NewStartTimeMetricAdjuster(zap.NewNop(), tt.startTimeMetricRegex, false)
if tt.expectedErr != nil {
assert.ErrorIs(t, stma.AdjustMetrics(tt.inputs), tt.expectedErr)
return
Expand Down Expand Up @@ -154,3 +155,100 @@ func TestStartTimeMetricMatch(t *testing.T) {
})
}
}

func TestStartTimeMetricFallback(t *testing.T) {
const startTime = pcommon.Timestamp(123 * 1e9)
const currentTime = pcommon.Timestamp(126 * 1e9)
mockStartTime := time.Now().Add(-10 * time.Hour)
mockStartTimeSeconds := float64(mockStartTime.Unix())
processStartTime := mockStartTime.Add(-10 * time.Hour)
processStartTimeSeconds := float64(processStartTime.Unix())

tests := []struct {
name string
inputs pmetric.Metrics
startTimeMetricRegex *regexp.Regexp
expectedStartTime pcommon.Timestamp
expectedErr error
}{
{
name: "regexp_match_sum_metric_no_fallback",
inputs: metrics(
sumMetric("test_sum_metric", doublePoint(nil, startTime, currentTime, 16)),
histogramMetric("test_histogram_metric", histogramPoint(nil, startTime, currentTime, []float64{1, 2}, []uint64{2, 3, 4})),
summaryMetric("test_summary_metric", summaryPoint(nil, startTime, currentTime, 10, 100, []float64{10, 50, 90}, []float64{9, 15, 48})),
sumMetric("example_process_start_time_seconds", doublePoint(nil, startTime, currentTime, processStartTimeSeconds)),
sumMetric("process_start_time_seconds", doublePoint(nil, startTime, currentTime, processStartTimeSeconds)),
exponentialHistogramMetric("test_exponential_histogram_metric", exponentialHistogramPointSimplified(nil, startTime, currentTime, 3, 1, -5, 3)),
),
startTimeMetricRegex: regexp.MustCompile("^.*_process_start_time_seconds$"),
expectedStartTime: timestampFromFloat64(processStartTimeSeconds),
},
{
name: "regexp_match_sum_metric_fallback",
inputs: metrics(
sumMetric("test_sum_metric", doublePoint(nil, startTime, currentTime, 16)),
histogramMetric("test_histogram_metric", histogramPoint(nil, startTime, currentTime, []float64{1, 2}, []uint64{2, 3, 4})),
summaryMetric("test_summary_metric", summaryPoint(nil, startTime, currentTime, 10, 100, []float64{10, 50, 90}, []float64{9, 15, 48})),
),
startTimeMetricRegex: regexp.MustCompile("^.*_process_start_time_seconds$"),
expectedStartTime: timestampFromFloat64(mockStartTimeSeconds),
},
{
name: "match_default_sum_start_time_metric_fallback",
inputs: metrics(
sumMetric("test_sum_metric", doublePoint(nil, startTime, currentTime, 16)),
histogramMetric("test_histogram_metric", histogramPoint(nil, startTime, currentTime, []float64{1, 2}, []uint64{2, 3, 4})),
summaryMetric("test_summary_metric", summaryPoint(nil, startTime, currentTime, 10, 100, []float64{10, 50, 90}, []float64{9, 15, 48})),
),
expectedStartTime: timestampFromFloat64(mockStartTimeSeconds),
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
stma := NewStartTimeMetricAdjuster(zap.NewNop(), tt.startTimeMetricRegex, true)
if tt.expectedErr != nil {
assert.ErrorIs(t, stma.AdjustMetrics(tt.inputs), tt.expectedErr)
return
}

// Make sure the right adjuster is used and one that has the fallback time set.
metricAdjuster, ok := stma.(*startTimeMetricAdjuster)
assert.True(t, ok)
assert.NotNil(t, metricAdjuster.fallbackStartTime)

// To test that the adjuster is using the fallback correctly, override the fallback time to use
// directly.
metricAdjuster.fallbackStartTime = &mockStartTime

assert.NoError(t, stma.AdjustMetrics(tt.inputs))
for i := 0; i < tt.inputs.ResourceMetrics().Len(); i++ {
rm := tt.inputs.ResourceMetrics().At(i)
for j := 0; j < rm.ScopeMetrics().Len(); j++ {
ilm := rm.ScopeMetrics().At(j)
for k := 0; k < ilm.Metrics().Len(); k++ {
metric := ilm.Metrics().At(k)
switch metric.Type() {
case pmetric.MetricTypeSum:
dps := metric.Sum().DataPoints()
for l := 0; l < dps.Len(); l++ {
assert.Equal(t, tt.expectedStartTime, dps.At(l).StartTimestamp())
}
case pmetric.MetricTypeSummary:
dps := metric.Summary().DataPoints()
for l := 0; l < dps.Len(); l++ {
assert.Equal(t, tt.expectedStartTime, dps.At(l).StartTimestamp())
}
case pmetric.MetricTypeHistogram:
dps := metric.Histogram().DataPoints()
for l := 0; l < dps.Len(); l++ {
assert.Equal(t, tt.expectedStartTime, dps.At(l).StartTimestamp())
}
}
}
}
}
})
}
}
7 changes: 4 additions & 3 deletions receiver/prometheusreceiver/metrics_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,8 @@ func (r *pReceiver) initPrometheusComponents(ctx context.Context, logger log.Log
}()

var startTimeMetricRegex *regexp.Regexp
if r.cfg.StartTimeMetricRegex != "" {
startTimeMetricRegex, err = regexp.Compile(r.cfg.StartTimeMetricRegex)
if r.cfg.AdjustOpts.StartTimeMetricRegex != "" {
startTimeMetricRegex, err = regexp.Compile(r.cfg.AdjustOpts.StartTimeMetricRegex)
if err != nil {
return err
}
Expand All @@ -134,9 +134,10 @@ func (r *pReceiver) initPrometheusComponents(ctx context.Context, logger log.Log
r.consumer,
r.settings,
gcInterval(r.cfg.PrometheusConfig),
r.cfg.UseStartTimeMetric,
r.cfg.AdjustOpts.UseStartTimeMetric,
startTimeMetricRegex,
useCreatedMetricGate.IsEnabled(),
r.cfg.AdjustOpts.UseCollectorStartTimeFallback,
enableNativeHistogramsGate.IsEnabled(),
r.cfg.PrometheusConfig.GlobalConfig.ExternalLabels,
r.cfg.TrimMetricSuffixes,
Expand Down
4 changes: 2 additions & 2 deletions receiver/prometheusreceiver/metrics_receiver_helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -687,8 +687,8 @@ func testComponent(t *testing.T, targets []*testData, alterConfig func(*Config),
defer mp.Close()

config := &Config{
PrometheusConfig: cfg,
StartTimeMetricRegex: "",
PrometheusConfig: cfg,
AdjustOpts: MetricAdjusterOpts{StartTimeMetricRegex: ""},
}
if alterConfig != nil {
alterConfig(config)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,11 @@ func testScraperMetrics(t *testing.T, targets []*testData, reportExtraScrapeMetr

cms := new(consumertest.MetricsSink)
receiver := newPrometheusReceiver(receivertest.NewNopSettings(), &Config{
PrometheusConfig: cfg,
UseStartTimeMetric: false,
StartTimeMetricRegex: "",
PrometheusConfig: cfg,
AdjustOpts: MetricAdjusterOpts{
UseStartTimeMetric: false,
StartTimeMetricRegex: "",
},
ReportExtraScrapeMetrics: reportExtraScrapeMetrics,
}, cms)

Expand Down
6 changes: 3 additions & 3 deletions receiver/prometheusreceiver/metrics_receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1424,7 +1424,7 @@ func TestStartTimeMetric(t *testing.T) {
},
}
testComponent(t, targets, func(c *Config) {
c.UseStartTimeMetric = true
c.AdjustOpts.UseStartTimeMetric = true
})
}

Expand Down Expand Up @@ -1475,8 +1475,8 @@ func TestStartTimeMetricRegex(t *testing.T) {
},
}
testComponent(t, targets, func(c *Config) {
c.StartTimeMetricRegex = "^(.+_)*process_start_time_seconds$"
c.UseStartTimeMetric = true
c.AdjustOpts.StartTimeMetricRegex = "^(.+_)*process_start_time_seconds$"
c.AdjustOpts.UseStartTimeMetric = true
})
}

Expand Down

0 comments on commit 05b85e8

Please sign in to comment.