Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace Record limit methods with DroppedAttributes #5190

Merged
merged 18 commits into from
Apr 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,19 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
### Added

- Add `Recorder` in `go.opentelemetry.io/otel/log/logtest` to facilitate testing the log bridge implementations. (#5134)
- The `DroppedAttributes` is added to the `"go.opentelemetry.io/otel/sdk/log".Record` type.
This method can be used to determine how many log attributes were dropped from the `Record` due to limits being exceeded. (#5190)
- Add span flags to OTLP spans and links exported by `go.opentelemetry.io/otel/exporters/otlp/otlptrace`. (#5194)

### Changed

- Update `go.opentelemetry.io/proto/otlp` from v1.1.0 to v1.2.0. (#5177)

### Removed

- The `AttributeCountLimit` on the `"go.opentelemetry.io/otel/sdk/log".Record` type is removed. (#5190)
- The `AttributeValueLengthLimit` on the `"go.opentelemetry.io/otel/sdk/log".Record` type is removed. (#5190)

## [1.25.0/0.47.0/0.0.8/0.1.0-alpha] 2024-04-05

### Added
Expand Down
6 changes: 2 additions & 4 deletions exporters/stdout/stdoutlog/record.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,8 @@ func (e *Exporter) newRecordJSON(r sdklog.Record) recordJSON {

Attributes: make([]log.KeyValue, 0, r.AttributesLen()),

Resource: r.Resource(),
Scope: r.InstrumentationScope(),
AttributeValueLengthLimit: r.AttributeValueLengthLimit(),
AttributeCountLimit: r.AttributeCountLimit(),
Resource: r.Resource(),
Scope: r.InstrumentationScope(),
}

r.WalkAttributes(func(kv log.KeyValue) bool {
Expand Down
46 changes: 46 additions & 0 deletions sdk/log/logger_norace_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

//go:build !race

package log

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/assert"

"go.opentelemetry.io/otel/log"
"go.opentelemetry.io/otel/sdk/instrumentation"
)

func TestAllocationLimits(t *testing.T) {
// This test is not run with a race detector. The sync.Pool used by parts
// of the SDK has memory optimizations removed for the race detector. Do
// not test performance of the SDK in that state.

const runs = 10

logger := newLogger(NewLoggerProvider(), instrumentation.Scope{})

r := log.Record{}
r.SetTimestamp(time.Date(2000, time.January, 1, 0, 0, 0, 0, time.UTC))
r.SetObservedTimestamp(time.Date(2000, time.January, 1, 0, 0, 0, 0, time.UTC))
r.SetBody(log.StringValue("testing body value"))
r.SetSeverity(log.SeverityInfo)
r.SetSeverityText("testing text")

r.AddAttributes(
log.String("k1", "str"),
log.Float64("k2", 1.0),
log.Int("k3", 2),
log.Bool("k4", true),
log.Bytes("k5", []byte{1}),
)

assert.Equal(t, 0.0, testing.AllocsPerRun(runs, func() {
logger.newRecord(context.Background(), r)
}), "newRecord")
}
25 changes: 0 additions & 25 deletions sdk/log/logger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,28 +273,3 @@ func TestLoggerEnabled(t *testing.T) {
})
}
}

func TestAllocationLimits(t *testing.T) {
const runs = 10

logger := newLogger(NewLoggerProvider(), instrumentation.Scope{})

r := log.Record{}
r.SetTimestamp(time.Date(2000, time.January, 1, 0, 0, 0, 0, time.UTC))
r.SetObservedTimestamp(time.Date(2000, time.January, 1, 0, 0, 0, 0, time.UTC))
r.SetBody(log.StringValue("testing body value"))
r.SetSeverity(log.SeverityInfo)
r.SetSeverityText("testing text")

r.AddAttributes(
log.String("k1", "str"),
log.Float64("k2", 1.0),
log.Int("k3", 2),
log.Bool("k4", true),
log.Bytes("k5", []byte{1}),
)

assert.Equal(t, 0.0, testing.AllocsPerRun(runs, func() {
logger.newRecord(context.Background(), r)
}), "newRecord")
}
174 changes: 154 additions & 20 deletions sdk/log/record.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package log // import "go.opentelemetry.io/otel/sdk/log"

import (
"slices"
"sync"
"time"

"go.opentelemetry.io/otel/log"
Expand All @@ -19,6 +20,20 @@ import (
// cover 95% of all use-cases (https://go.dev/blog/slog#performance).
const attributesInlineCount = 5

// indexPool is a pool of index maps used for de-duplication.
var indexPool = sync.Pool{
New: func() any { return make(map[string]int) },
}

func getIndex() map[string]int {
return indexPool.Get().(map[string]int)
}

func putIndex(index map[string]int) {
clear(index)
indexPool.Put(index)
}

// Record is a log record emitted by the Logger.
type Record struct {
// Do not embed the log.Record. Attributes need to be overwrite-able and
Expand Down Expand Up @@ -48,6 +63,10 @@ type Record struct {
// - Unused array elements are zero-ed. Used to detect mistakes.
back []log.KeyValue

// dropped is the count of attributes that have been dropped when limits
// were reached.
dropped int
MrAlias marked this conversation as resolved.
Show resolved Hide resolved

traceID trace.TraceID
spanID trace.SpanID
traceFlags trace.TraceFlags
Expand Down Expand Up @@ -131,6 +150,99 @@ func (r *Record) WalkAttributes(f func(log.KeyValue) bool) {

// AddAttributes adds attributes to the log record.
func (r *Record) AddAttributes(attrs ...log.KeyValue) {
n := r.AttributesLen()
if n == 0 {
// Avoid the more complex duplicate map lookups bellow.
attrs, r.dropped = dedup(attrs)

var drop int
attrs, drop = head(attrs, r.attributeCountLimit)
r.dropped += drop

r.addAttrs(attrs)
return
}

// Used to find duplicates between attrs and existing attributes in r.
rIndex := r.attrIndex()
defer putIndex(rIndex)

// Unique attrs that need to be added to r. This uses the same underlying
// array as attrs.
//
// Note, do not iterate attrs twice by just calling dedup(attrs) here.
unique := attrs[:0]
// Used to find duplicates within attrs itself. The index value is the
// index of the element in unique.
uIndex := getIndex()
dashpole marked this conversation as resolved.
Show resolved Hide resolved
defer putIndex(uIndex)

// Deduplicate attrs within the scope of all existing attributes.
for _, a := range attrs {
// Last-value-wins for any duplicates in attrs.
idx, found := uIndex[a.Key]
if found {
r.dropped++
unique[idx] = a
continue
}

idx, found = rIndex[a.Key]
if found {
// New attrs overwrite any existing with the same key.
r.dropped++
if idx < 0 {
r.front[-(idx + 1)] = a
} else {
r.back[idx] = a
}
} else {
// Unique attribute.
// TODO: apply truncation to string and []string values.
// TODO: deduplicate map values.
unique = append(unique, a)
uIndex[a.Key] = len(unique) - 1
}
}
attrs = unique

if r.attributeCountLimit > 0 && n+len(attrs) > r.attributeCountLimit {
// Truncate the now unique attributes to comply with limit.
//
// Do not use head(attrs, r.attributeCountLimit - n) here. If
// (r.attributeCountLimit - n) <= 0 attrs needs to be emptied.
last := max(0, (r.attributeCountLimit - n))
r.dropped += len(attrs) - last
attrs = attrs[:last]
}

r.addAttrs(attrs)
}

// attrIndex returns an index map for all attributes in the Record r. The index
// maps the attribute key to location the attribute is stored. If the value is
// < 0 then -(value + 1) (e.g. -1 -> 0, -2 -> 1, -3 -> 2) represents the index
// in r.nFront. Otherwise, the index is the exact index of r.back.
//
// The returned index is taken from the indexPool. It is the callers
// responsibility to return the index to that pool (putIndex) when done.
func (r *Record) attrIndex() map[string]int {
index := getIndex()
dashpole marked this conversation as resolved.
Show resolved Hide resolved
for i := 0; i < r.nFront; i++ {
key := r.front[i].Key
index[key] = -i - 1 // stored in front: negative index.
}
for i := 0; i < len(r.back); i++ {
key := r.back[i].Key
index[key] = i // stored in back: positive index.
}
return index
}

// addAttrs adds attrs to the Record r. This does not validate any limits or
// duplication of attributes, these tasks are left to the caller to handle
// prior to calling.
func (r *Record) addAttrs(attrs []log.KeyValue) {
var i int
for i = 0; i < len(attrs) && r.nFront < len(r.front); i++ {
a := attrs[i]
Expand All @@ -144,6 +256,14 @@ func (r *Record) AddAttributes(attrs ...log.KeyValue) {

// SetAttributes sets (and overrides) attributes to the log record.
func (r *Record) SetAttributes(attrs ...log.KeyValue) {
// TODO: apply truncation to string and []string values.
// TODO: deduplicate map values.
attrs, r.dropped = dedup(attrs)

var drop int
attrs, drop = head(attrs, r.attributeCountLimit)
r.dropped += drop

r.nFront = 0
var i int
for i = 0; i < len(attrs) && r.nFront < len(r.front); i++ {
Expand All @@ -155,11 +275,45 @@ func (r *Record) SetAttributes(attrs ...log.KeyValue) {
r.back = slices.Clone(attrs[i:])
}

// head returns the first n values of kvs along with the number of elements
// dropped. If n is less than or equal to zero, kvs is returned with 0.
func head(kvs []log.KeyValue, n int) (out []log.KeyValue, dropped int) {
if n > 0 && len(kvs) > n {
return kvs[:n], len(kvs) - n
}
return kvs, 0
}

// dedup deduplicates kvs front-to-back with the last value saved.
func dedup(kvs []log.KeyValue) (unique []log.KeyValue, dropped int) {
index := getIndex()
defer putIndex(index)

unique = kvs[:0] // Use the same underlying array as kvs.
for _, a := range kvs {
idx, found := index[a.Key]
MrAlias marked this conversation as resolved.
Show resolved Hide resolved
if found {
dropped++
unique[idx] = a
} else {
unique = append(unique, a)
index[a.Key] = len(unique) - 1
}
}
return unique, dropped
}

// AttributesLen returns the number of attributes in the log record.
func (r *Record) AttributesLen() int {
return r.nFront + len(r.back)
}

// DroppedAttributes returns the number of attributes dropped due to limits
// being reached.
func (r *Record) DroppedAttributes() int {
return r.dropped
}

// TraceID returns the trace ID or empty array.
func (r *Record) TraceID() trace.TraceID {
return r.traceID
Expand Down Expand Up @@ -206,26 +360,6 @@ func (r *Record) InstrumentationScope() instrumentation.Scope {
return *r.scope
}

// AttributeValueLengthLimit is the maximum allowed attribute value length.
//
// This limit only applies to string and string slice attribute values.
// Any string longer than this value should be truncated to this length.
//
// Negative value means no limit should be applied.
func (r *Record) AttributeValueLengthLimit() int {
return r.attributeValueLengthLimit
}

// AttributeCountLimit is the maximum allowed log record attribute count. Any
// attribute added to a log record once this limit is reached should be dropped.
//
// Zero means no attributes should be recorded.
//
// Negative value means no limit should be applied.
func (r *Record) AttributeCountLimit() int {
return r.attributeCountLimit
}

// Clone returns a copy of the record with no shared state. The original record
// and the clone can both be modified without interfering with each other.
func (r *Record) Clone() Record {
Expand Down
Loading