Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make otelarrow admission control metrics consistent #36359

Merged
merged 18 commits into from
Nov 15, 2024
Merged
31 changes: 31 additions & 0 deletions .chloggen/otelarrow-metrics.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: breaking

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: otelarrowreceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: New admission control metrics are consistent across Arrow and OTLP data paths.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [36334]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: |
`otelcol_otelarrow_admission_in_flight_bytes`: new, replaces `otelcol_otel_arrow_receiver_in_flight_bytes`
`otelcol_otelarrow_admission_waiting_bytes`: new, describes waiting requests
`otelcol_otel_arrow_receiver_in_flight_items`: removed
`otelcol_otel_arrow_receiver_in_flight_requests`: removed
# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
39 changes: 34 additions & 5 deletions internal/otelarrow/admission2/boundedqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,24 @@ import (

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/trace"
grpccodes "google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

internalmetadata "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/internal/metadata"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/netstats"
)

// Sentinel gRPC status errors used when a request is rejected.
var (
	// ErrTooMuchWaiting is a ResourceExhausted status: the request is
	// rejected because there is too much pending data.
	ErrTooMuchWaiting = status.Error(grpccodes.ResourceExhausted, "rejecting request, too much pending data")

	// ErrRequestTooLarge is an InvalidArgument status: the request is
	// rejected because it is too large.
	ErrRequestTooLarge = status.Error(grpccodes.InvalidArgument, "rejecting request, request is too large")
)

// BoundedQueue is a LIFO-oriented admission-controlled Queue.
type BoundedQueue struct {
maxLimitAdmit uint64
maxLimitWait uint64
tracer trace.Tracer
maxLimitAdmit uint64
maxLimitWait uint64
tracer trace.Tracer
telemetryBuilder *internalmetadata.TelemetryBuilder

// lock protects currentAdmitted, currentWaiting, and waiters

Expand All @@ -43,13 +48,37 @@ type waiter struct {
// NewBoundedQueue returns a LIFO-oriented Queue implementation which
// admits `maxLimitAdmit` bytes concurrently and allows up to
// `maxLimitWait` bytes to wait for admission.
func NewBoundedQueue(ts component.TelemetrySettings, maxLimitAdmit, maxLimitWait uint64) Queue {
return &BoundedQueue{
// NewBoundedQueue builds a BoundedQueue that admits `maxLimitAdmit`
// bytes concurrently and lets up to `maxLimitWait` bytes wait for
// admission.  The in-flight and waiting byte callbacks are registered
// through the telemetry builder with the receiver `id` as an
// attribute; an error from constructing the builder is returned
// unchanged together with a nil Queue.
func NewBoundedQueue(id component.ID, ts component.TelemetrySettings, maxLimitAdmit, maxLimitWait uint64) (Queue, error) {
	const tracerScope = "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow"

	queue := &BoundedQueue{
		maxLimitAdmit: maxLimitAdmit,
		maxLimitWait:  maxLimitWait,
		waiters:       list.New(),
	}
	queue.tracer = ts.TracerProvider.Tracer(tracerScope)

	// Both callbacks are registered with the same receiver attribute.
	receiverAttr := metric.WithAttributes(
		attribute.String(netstats.ReceiverKey, id.String()),
	)
	inFlightOpt := internalmetadata.WithOtelarrowAdmissionInFlightBytesCallback(queue.inFlightCB, receiverAttr)
	waitingOpt := internalmetadata.WithOtelarrowAdmissionWaitingBytesCallback(queue.waitingCB, receiverAttr)

	builder, err := internalmetadata.NewTelemetryBuilder(ts, inFlightOpt, waitingOpt)
	if err != nil {
		return nil, err
	}

	queue.telemetryBuilder = builder
	return queue, nil
}

// inFlightCB reads the currently admitted byte count under the queue
// lock and returns it as an int64.  It is the callback handed to the
// in-flight bytes metric in NewBoundedQueue.
func (bq *BoundedQueue) inFlightCB() int64 {
	// Note, see https://github.com/open-telemetry/otel-arrow/issues/270
	bq.lock.Lock()
	admitted := bq.currentAdmitted
	bq.lock.Unlock()

	return int64(admitted)
}

// waitingCB reads the currently waiting byte count under the queue
// lock and returns it as an int64.  It is the callback handed to the
// waiting bytes metric in NewBoundedQueue.
func (bq *BoundedQueue) waitingCB() int64 {
	// Note, see https://github.com/open-telemetry/otel-arrow/issues/270
	bq.lock.Lock()
	pending := bq.currentWaiting
	bq.lock.Unlock()

	return int64(pending)
}

// acquireOrGetWaiter returns with three distinct conditions depending
Expand Down
198 changes: 149 additions & 49 deletions internal/otelarrow/admission2/boundedqueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,23 +12,53 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/otel/metric"
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/resource"
"google.golang.org/grpc/codes"
grpccodes "google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/netstats"
)

// Names that verifyMetrics looks for in the collected metric data.
const (
// expectScope is the instrumentation scope name whose metrics are inspected.
expectScope = "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow"
// expectInFlightName is the metric name read as the in-flight byte value.
expectInFlightName = "otelcol_otelarrow_admission_in_flight_bytes"
// expectWaitingName is the metric name read as the waiting byte value.
expectWaitingName = "otelcol_otelarrow_admission_waiting_bytes"
)

// bqTest bundles a BoundedQueue under test with the testing handle and
// the metric reader/provider that newBQTest wires into its telemetry
// settings.
type bqTest struct {
t *testing.T
t *testing.T
// reader is the manual reader that verifyMetrics collects from.
reader *sdkmetric.ManualReader
// provider is the meter provider built around reader in newBQTest.
provider *sdkmetric.MeterProvider
*BoundedQueue
}

var noopTelemetry = componenttest.NewNopTelemetrySettings()

// newBQTest constructs a BoundedQueue with the given admit and wait
// limits, backed by telemetry settings whose meter provider is attached
// to a manual reader so tests can collect metrics on demand.
func newBQTest(t *testing.T, maxAdmit, maxWait uint64) bqTest {
settings := componenttest.NewNopTelemetrySettings()

// Replace the meter provider with one backed by a manual reader so
// verifyMetrics can collect metrics on demand.
reader := sdkmetric.NewManualReader()
provider := sdkmetric.NewMeterProvider(
sdkmetric.WithResource(resource.Empty()),
sdkmetric.WithReader(reader),
)
settings.MeterProvider = provider
// Every telemetry level resolves to the same provider.
settings.LeveledMeterProvider = func(_ configtelemetry.Level) metric.MeterProvider {
return settings.MeterProvider
}

// The component ID string "admission_testing" is what verifyPoint
// matches against the receiver attribute.
bq, err := NewBoundedQueue(component.MustNewID("admission_testing"), settings, maxAdmit, maxWait)
require.NoError(t, err)
return bqTest{
t: t,
BoundedQueue: NewBoundedQueue(noopTelemetry, maxAdmit, maxWait).(*BoundedQueue),
reader: reader,
provider: provider,
BoundedQueue: bq.(*BoundedQueue),
}
}

Expand Down Expand Up @@ -67,87 +97,97 @@ func mkRange(from, to uint64) []uint64 {

func TestBoundedQueueLimits(t *testing.T) {
for _, test := range []struct {
name string
maxLimitAdmit uint64
maxLimitWait uint64
requestSizes []uint64
timeout time.Duration
expectErrs map[string]int
name string
maxLimitAdmit uint64
maxLimitWait uint64
expectAcquired [2]int64
requestSizes []uint64
timeout time.Duration
expectErrs map[string]int
}{
{
name: "simple_no_waiters_25",
maxLimitAdmit: 1000,
maxLimitWait: 0,
requestSizes: mkRepeat(25, 40),
timeout: 0,
expectErrs: map[string]int{},
name: "simple_no_waiters_25",
maxLimitAdmit: 1000,
maxLimitWait: 0,
requestSizes: mkRepeat(25, 40),
expectAcquired: [2]int64{1000, 1000},
timeout: 0,
expectErrs: map[string]int{},
},
{
name: "simple_no_waiters_1",
maxLimitAdmit: 1000,
maxLimitWait: 0,
requestSizes: mkRepeat(1, 1000),
timeout: 0,
expectErrs: map[string]int{},
name: "simple_no_waiters_1",
maxLimitAdmit: 1000,
maxLimitWait: 0,
requestSizes: mkRepeat(1, 1000),
expectAcquired: [2]int64{1000, 1000},
timeout: 0,
expectErrs: map[string]int{},
},
{
name: "without_waiting_remainder",
maxLimitAdmit: 1000,
maxLimitWait: 0,
requestSizes: mkRepeat(30, 40),
timeout: 0,
name: "without_waiting_remainder",
maxLimitAdmit: 1000,
maxLimitWait: 0,
requestSizes: mkRepeat(30, 40),
expectAcquired: [2]int64{990, 990},
timeout: 0,
expectErrs: map[string]int{
// 7 failures with a remainder of 10
// 30 * (40 - 7) = 990
ErrTooMuchWaiting.Error(): 7,
},
},
{
name: "without_waiting_complete",
maxLimitAdmit: 1000,
maxLimitWait: 0,
requestSizes: append(mkRepeat(30, 40), 10),
timeout: 0,
name: "without_waiting_complete",
maxLimitAdmit: 1000,
maxLimitWait: 0,
requestSizes: append(mkRepeat(30, 40), 10),
expectAcquired: [2]int64{1000, 1000},
timeout: 0,
expectErrs: map[string]int{
// 30*33+10 succeed, 7 failures (as above)
ErrTooMuchWaiting.Error(): 7,
},
},
{
name: "with_waiters_timeout",
maxLimitAdmit: 1000,
maxLimitWait: 1000,
requestSizes: mkRepeat(20, 100),
timeout: time.Second,
name: "with_waiters_timeout",
maxLimitAdmit: 1000,
maxLimitWait: 1000,
requestSizes: mkRepeat(20, 100),
expectAcquired: [2]int64{1000, 1000},
timeout: time.Second,
expectErrs: map[string]int{
// 20*50=1000 is half of the requests timing out
status.Error(grpccodes.Canceled, context.DeadlineExceeded.Error()).Error(): 50,
},
},
{
name: "with_size_exceeded",
maxLimitAdmit: 1000,
maxLimitWait: 2000,
requestSizes: []uint64{1001},
timeout: 0,
name: "with_size_exceeded",
maxLimitAdmit: 1000,
maxLimitWait: 2000,
requestSizes: []uint64{1001},
expectAcquired: [2]int64{0, 0},
timeout: 0,
expectErrs: map[string]int{
ErrRequestTooLarge.Error(): 1,
},
},
{
name: "mixed_sizes",
maxLimitAdmit: 45, // 45 is the exact sum of request sizes
maxLimitWait: 0,
requestSizes: mkRange(1, 9),
timeout: 0,
expectErrs: map[string]int{},
name: "mixed_sizes",
maxLimitAdmit: 45, // 45 is the exact sum of request sizes
maxLimitWait: 0,
requestSizes: mkRange(1, 9),
expectAcquired: [2]int64{45, 45},
timeout: 0,
expectErrs: map[string]int{},
},
{
name: "too_many_mixed_sizes",
maxLimitAdmit: 44, // all but one request will succeed
maxLimitWait: 0,
requestSizes: mkRange(1, 9),
timeout: 0,
// worst case is the size=9 fails, so minimum is 44-9+1
expectAcquired: [2]int64{36, 44},
timeout: 0,
expectErrs: map[string]int{
ErrTooMuchWaiting.Error(): 1,
},
Expand Down Expand Up @@ -190,6 +230,12 @@ func TestBoundedQueueLimits(t *testing.T) {

wait1.Wait()

// The in-flight bytes are in-range, none waiting.
inflight, waiting := bq.verifyMetrics(t)
require.LessOrEqual(t, test.expectAcquired[0], inflight)
require.GreaterOrEqual(t, test.expectAcquired[1], inflight)
require.Equal(t, int64(0), waiting)

close(releaseChan)

wait2.Wait()
Expand All @@ -214,10 +260,55 @@ func TestBoundedQueueLimits(t *testing.T) {

// and the final state is all 0.
bq.waitForPending(0, 0)

// metrics are zero
inflight, waiting = bq.verifyMetrics(t)
require.Equal(t, int64(0), inflight)
require.Equal(t, int64(0), waiting)
})
}
}

// verifyPoint returns the value of the single data point held by m.
//
// m must carry an int64 sum with exactly one data point (otherwise the
// test fails), and that point must have the receiver attribute set to
// "admission_testing".  When the data type is wrong or the attribute is
// missing, the test is marked failed and -1 is returned.
func (bq bqTest) verifyPoint(t *testing.T, m metricdata.Metrics) int64 {
	// Attribute failures to the calling test line, not this helper.
	t.Helper()

	sum, ok := m.Data.(metricdata.Sum[int64])
	if !ok {
		t.Errorf("incorrect metric data type: %T", m.Data)
		return -1
	}

	require.Len(t, sum.DataPoints, 1)
	dp := sum.DataPoints[0]
	for _, attr := range dp.Attributes.ToSlice() {
		if attr.Key == netstats.ReceiverKey && attr.Value.AsString() == "admission_testing" {
			return dp.Value
		}
	}

	t.Errorf("point value not found: %v", m.Data)
	return -1
}

// verifyMetrics collects once from the manual reader and returns the
// in-flight and waiting byte values found under expectScope.
//
// A result stays -1 when its metric name does not appear in the
// collected data, and is also -1 when verifyPoint rejects the metric.
// A collection error fails the test.
func (bq bqTest) verifyMetrics(t *testing.T) (inflight int64, waiting int64) {
	// Attribute failures to the calling test line, not this helper.
	t.Helper()

	inflight, waiting = -1, -1

	var rm metricdata.ResourceMetrics
	require.NoError(t, bq.reader.Collect(context.Background(), &rm))

	for _, sm := range rm.ScopeMetrics {
		// Only the otelarrow scope is of interest.
		if sm.Scope.Name != expectScope {
			continue
		}
		for _, m := range sm.Metrics {
			switch m.Name {
			case expectInFlightName:
				inflight = bq.verifyPoint(t, m)
			case expectWaitingName:
				waiting = bq.verifyPoint(t, m)
			}
		}
	}
	return inflight, waiting
}

func TestBoundedQueueLIFO(t *testing.T) {
const maxAdmit = 10

Expand Down Expand Up @@ -252,6 +343,11 @@ func TestBoundedQueueLIFO(t *testing.T) {
notW1 := bq.startWaiter(ctx, secondWait, &relW1)
bq.waitForPending(maxAdmit, maxAdmit)

// The in-flight and waiting bytes are counted.
inflight, waiting := bq.verifyMetrics(t)
require.Equal(t, int64(maxAdmit), inflight)
require.Equal(t, int64(maxAdmit), waiting)

relFirst()

// early is true when releasing the first acquired
Expand All @@ -278,6 +374,10 @@ func TestBoundedQueueLIFO(t *testing.T) {
relW1()

bq.waitForPending(0, 0)

inflight, waiting = bq.verifyMetrics(t)
require.Equal(t, int64(0), inflight)
require.Equal(t, int64(0), waiting)
})
}
}
Expand Down
6 changes: 6 additions & 0 deletions internal/otelarrow/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

//go:generate mdatagen metadata.yaml

package otelarrow // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow"
Loading