Skip to content

Commit

Permalink
Merge pull request prometheus#14200 from aknuds1/feat/promote-attributes
Browse files Browse the repository at this point in the history
OTLP Translator prometheusremotewrite: Support resource attribute promotion
  • Loading branch information
gouthamve authored Jul 21, 2024
2 parents a60e5ce + e2ef0dc commit 1fa9ba8
Show file tree
Hide file tree
Showing 16 changed files with 290 additions and 28 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

## unreleased

* [FEATURE] OTLP receiver: Add new option `otlp.promote_resource_attributes`, for any OTel resource attributes that should be promoted to metric labels. #14200
* [FEATURE] Remote-Write: Add sender and receiver support for [Remote Write 2.0-rc.2](https://prometheus.io/docs/specs/remote_write_spec_2_0/) specification #14395 #14427 #14444
* [ENHANCEMENT] Remote-Write: 1.x messages against Remote Write 2.x Receivers will have now correct values for `prometheus_storage_<samples|histograms|exemplar>_failed_total` in case of partial errors #14444

Expand Down
36 changes: 36 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,9 @@ var (
DefaultExemplarsConfig = ExemplarsConfig{
MaxExemplars: 100000,
}

// DefaultOTLPConfig is the default OTLP configuration.
DefaultOTLPConfig = OTLPConfig{}
)

// Config is the top-level configuration for Prometheus's config files.
Expand All @@ -242,6 +245,7 @@ type Config struct {

RemoteWriteConfigs []*RemoteWriteConfig `yaml:"remote_write,omitempty"`
RemoteReadConfigs []*RemoteReadConfig `yaml:"remote_read,omitempty"`
OTLPConfig OTLPConfig `yaml:"otlp,omitempty"`
}

// SetDirectory joins any relative file paths with dir.
Expand Down Expand Up @@ -1304,3 +1308,35 @@ func getGoGCEnv() int {
}
return DefaultRuntimeConfig.GoGC
}

// OTLPConfig is the configuration for writing to the OTLP endpoint.
type OTLPConfig struct {
PromoteResourceAttributes []string `yaml:"promote_resource_attributes,omitempty"`
}

// UnmarshalYAML implements the yaml.Unmarshaler interface.
func (c *OTLPConfig) UnmarshalYAML(unmarshal func(interface{}) error) error {
*c = DefaultOTLPConfig
type plain OTLPConfig
if err := unmarshal((*plain)(c)); err != nil {
return err
}

seen := map[string]struct{}{}
var err error
for i, attr := range c.PromoteResourceAttributes {
attr = strings.TrimSpace(attr)
if attr == "" {
err = errors.Join(err, fmt.Errorf("empty promoted OTel resource attribute"))
continue
}
if _, exists := seen[attr]; exists {
err = errors.Join(err, fmt.Errorf("duplicated promoted OTel resource attribute %q", attr))
continue
}

seen[attr] = struct{}{}
c.PromoteResourceAttributes[i] = attr
}
return err
}
26 changes: 26 additions & 0 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,12 @@ var expectedConf = &Config{
},
},

OTLPConfig: OTLPConfig{
PromoteResourceAttributes: []string{
"k8s.cluster.name", "k8s.job.name", "k8s.namespace.name",
},
},

RemoteReadConfigs: []*RemoteReadConfig{
{
URL: mustParseURL("http://remote1/read"),
Expand Down Expand Up @@ -1471,6 +1477,26 @@ func TestRemoteWriteRetryOnRateLimit(t *testing.T) {
require.False(t, got.RemoteWriteConfigs[1].QueueConfig.RetryOnRateLimit)
}

func TestOTLPSanitizeResourceAttributes(t *testing.T) {
t.Run("good config", func(t *testing.T) {
want, err := LoadFile(filepath.Join("testdata", "otlp_sanitize_resource_attributes.good.yml"), false, false, log.NewNopLogger())
require.NoError(t, err)

out, err := yaml.Marshal(want)
require.NoError(t, err)
var got Config
require.NoError(t, yaml.UnmarshalStrict(out, &got))

require.Equal(t, []string{"k8s.cluster.name", "k8s.job.name", "k8s.namespace.name"}, got.OTLPConfig.PromoteResourceAttributes)
})

t.Run("bad config", func(t *testing.T) {
_, err := LoadFile(filepath.Join("testdata", "otlp_sanitize_resource_attributes.bad.yml"), false, false, log.NewNopLogger())
require.ErrorContains(t, err, `duplicated promoted OTel resource attribute "k8s.job.name"`)
require.ErrorContains(t, err, `empty promoted OTel resource attribute`)
})
}

func TestLoadConfig(t *testing.T) {
// Parse a valid file that sets a global scrape timeout. This tests whether parsing
// an overwritten default field in the global config permanently changes the default.
Expand Down
3 changes: 3 additions & 0 deletions config/testdata/conf.good.yml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ remote_write:
headers:
name: value

otlp:
promote_resource_attributes: ["k8s.cluster.name", "k8s.job.name", "k8s.namespace.name"]

remote_read:
- url: http://remote1/read
read_recent: true
Expand Down
2 changes: 2 additions & 0 deletions config/testdata/otlp_sanitize_resource_attributes.bad.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
otlp:
promote_resource_attributes: ["k8s.cluster.name", " k8s.job.name ", "k8s.namespace.name", "k8s.job.name", ""]
2 changes: 2 additions & 0 deletions config/testdata/otlp_sanitize_resource_attributes.good.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
otlp:
promote_resource_attributes: ["k8s.cluster.name", " k8s.job.name ", "k8s.namespace.name"]
4 changes: 4 additions & 0 deletions docs/configuration/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,10 @@ alerting:
remote_write:
[ - <remote_write> ... ]

# Settings related to the OTLP receiver feature.
otlp:
[ promote_resource_attributes: [<string>, ...] | default = [ ] ]

# Settings related to the remote read feature.
remote_read:
[ - <remote_read> ... ]
Expand Down
38 changes: 27 additions & 11 deletions storage/remote/otlptranslator/prometheusremotewrite/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,14 +65,14 @@ type bucketBoundsData struct {
bound float64
}

// byBucketBoundsData enables the usage of sort.Sort() with a slice of bucket bounds
// byBucketBoundsData enables the usage of sort.Sort() with a slice of bucket bounds.
type byBucketBoundsData []bucketBoundsData

func (m byBucketBoundsData) Len() int { return len(m) }
func (m byBucketBoundsData) Less(i, j int) bool { return m[i].bound < m[j].bound }
func (m byBucketBoundsData) Swap(i, j int) { m[i], m[j] = m[j], m[i] }

// ByLabelName enables the usage of sort.Sort() with a slice of labels
// ByLabelName enables the usage of sort.Sort() with a slice of labels.
type ByLabelName []prompb.Label

func (a ByLabelName) Len() int { return len(a) }
Expand Down Expand Up @@ -115,14 +115,23 @@ var seps = []byte{'\xff'}
// createAttributes creates a slice of Prometheus Labels with OTLP attributes and pairs of string values.
// Unpaired string values are ignored. String pairs overwrite OTLP labels if collisions happen and
// if logOnOverwrite is true, the overwrite is logged. Resulting label names are sanitized.
func createAttributes(resource pcommon.Resource, attributes pcommon.Map, externalLabels map[string]string,
// If settings.PromoteResourceAttributes is not empty, it's a set of resource attributes that should be promoted to labels.
func createAttributes(resource pcommon.Resource, attributes pcommon.Map, settings Settings,
ignoreAttrs []string, logOnOverwrite bool, extras ...string) []prompb.Label {
resourceAttrs := resource.Attributes()
serviceName, haveServiceName := resourceAttrs.Get(conventions.AttributeServiceName)
instance, haveInstanceID := resourceAttrs.Get(conventions.AttributeServiceInstanceID)

promotedAttrs := make([]prompb.Label, 0, len(settings.PromoteResourceAttributes))
for _, name := range settings.PromoteResourceAttributes {
if value, exists := resourceAttrs.Get(name); exists {
promotedAttrs = append(promotedAttrs, prompb.Label{Name: name, Value: value.AsString()})
}
}
sort.Stable(ByLabelName(promotedAttrs))

// Calculate the maximum possible number of labels we could return so we can preallocate l
maxLabelCount := attributes.Len() + len(externalLabels) + len(extras)/2
maxLabelCount := attributes.Len() + len(settings.ExternalLabels) + len(promotedAttrs) + len(extras)/2

if haveServiceName {
maxLabelCount++
Expand All @@ -132,9 +141,6 @@ func createAttributes(resource pcommon.Resource, attributes pcommon.Map, externa
maxLabelCount++
}

// map ensures no duplicate label name
l := make(map[string]string, maxLabelCount)

// Ensure attributes are sorted by key for consistent merging of keys which
// collide when sanitized.
labels := make([]prompb.Label, 0, maxLabelCount)
Expand All @@ -148,6 +154,8 @@ func createAttributes(resource pcommon.Resource, attributes pcommon.Map, externa
})
sort.Stable(ByLabelName(labels))

// map ensures no duplicate label names.
l := make(map[string]string, maxLabelCount)
for _, label := range labels {
var finalKey = prometheustranslator.NormalizeLabel(label.Name)
if existingValue, alreadyExists := l[finalKey]; alreadyExists {
Expand All @@ -157,6 +165,13 @@ func createAttributes(resource pcommon.Resource, attributes pcommon.Map, externa
}
}

for _, lbl := range promotedAttrs {
normalized := prometheustranslator.NormalizeLabel(lbl.Name)
if _, exists := l[normalized]; !exists {
l[normalized] = lbl.Value
}
}

// Map service.name + service.namespace to job
if haveServiceName {
val := serviceName.AsString()
Expand All @@ -169,7 +184,7 @@ func createAttributes(resource pcommon.Resource, attributes pcommon.Map, externa
if haveInstanceID {
l[model.InstanceLabel] = instance.AsString()
}
for key, value := range externalLabels {
for key, value := range settings.ExternalLabels {
// External labels have already been sanitized
if _, alreadyExists := l[key]; alreadyExists {
// Skip external labels if they are overridden by metric attributes
Expand Down Expand Up @@ -232,7 +247,7 @@ func (c *PrometheusConverter) addHistogramDataPoints(dataPoints pmetric.Histogra
for x := 0; x < dataPoints.Len(); x++ {
pt := dataPoints.At(x)
timestamp := convertTimeStamp(pt.Timestamp())
baseLabels := createAttributes(resource, pt.Attributes(), settings.ExternalLabels, nil, false)
baseLabels := createAttributes(resource, pt.Attributes(), settings, nil, false)

// If the sum is unset, it indicates the _sum metric point should be
// omitted
Expand Down Expand Up @@ -408,7 +423,7 @@ func (c *PrometheusConverter) addSummaryDataPoints(dataPoints pmetric.SummaryDat
for x := 0; x < dataPoints.Len(); x++ {
pt := dataPoints.At(x)
timestamp := convertTimeStamp(pt.Timestamp())
baseLabels := createAttributes(resource, pt.Attributes(), settings.ExternalLabels, nil, false)
baseLabels := createAttributes(resource, pt.Attributes(), settings, nil, false)

// treat sum as a sample in an individual TimeSeries
sum := &prompb.Sample{
Expand Down Expand Up @@ -554,7 +569,8 @@ func addResourceTargetInfo(resource pcommon.Resource, settings Settings, timesta
name = settings.Namespace + "_" + name
}

labels := createAttributes(resource, attributes, settings.ExternalLabels, identifyingAttrs, false, model.MetricNameLabel, name)
settings.PromoteResourceAttributes = nil
labels := createAttributes(resource, attributes, settings, identifyingAttrs, false, model.MetricNameLabel, name)
haveIdentifier := false
for _, l := range labels {
if l.Name == model.JobLabel || l.Name == model.InstanceLabel {
Expand Down
Loading

0 comments on commit 1fa9ba8

Please sign in to comment.