From 23b2804d0a7bd71211a9ee43ac9eaa9bacdaf6fa Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Tue, 22 Oct 2024 10:28:08 -0400 Subject: [PATCH] Delete unused/buggy EventACKTracker helper (#41357) --- x-pack/libbeat/common/aws/acker.go | 85 ------------------------- x-pack/libbeat/common/aws/acker_test.go | 69 -------------------- 2 files changed, 154 deletions(-) delete mode 100644 x-pack/libbeat/common/aws/acker.go delete mode 100644 x-pack/libbeat/common/aws/acker_test.go diff --git a/x-pack/libbeat/common/aws/acker.go b/x-pack/libbeat/common/aws/acker.go deleted file mode 100644 index 95fbe14b7744..000000000000 --- a/x-pack/libbeat/common/aws/acker.go +++ /dev/null @@ -1,85 +0,0 @@ -// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one -// or more contributor license agreements. Licensed under the Elastic License; -// you may not use this file except in compliance with the Elastic License. - -package aws - -import ( - "context" - "sync" - - "github.com/elastic/beats/v7/libbeat/beat" - "github.com/elastic/beats/v7/libbeat/common/acker" -) - -// EventACKTracker tracks the publishing state of S3 objects. Specifically -// it tracks the number of message acknowledgements that are pending from the -// output. It can be used to wait until all ACKs have been received for one or -// more S3 objects. -type EventACKTracker struct { - sync.Mutex - PendingACKs int64 - ctx context.Context - cancel context.CancelFunc -} - -func NewEventACKTracker(ctx context.Context) *EventACKTracker { - ctx, cancel := context.WithCancel(ctx) - return &EventACKTracker{ctx: ctx, cancel: cancel} -} - -// Add increments the number of pending ACKs. -func (a *EventACKTracker) Add() { - a.Lock() - a.PendingACKs++ - a.Unlock() -} - -// ACK decrements the number of pending ACKs. -func (a *EventACKTracker) ACK() { - a.Lock() - defer a.Unlock() - - if a.PendingACKs <= 0 { - panic("misuse detected: negative ACK counter") - } - - a.PendingACKs-- - if a.PendingACKs == 0 { - a.cancel() - } -} - -// Wait waits for the number of pending ACKs to be zero. -// Wait must be called sequentially only after every expected -// `Add` calls are made. Failing to do so could reset the pendingACKs -// property to 0 and would results in Wait returning after additional -// calls to `Add` are made without a corresponding `ACK` call. -func (a *EventACKTracker) Wait() { - // If there were never any pending ACKs then cancel the context. (This can - // happen when a document contains no events or cannot be read due to an error). - a.Lock() - if a.PendingACKs == 0 { - a.cancel() - } - a.Unlock() - - // Wait. - <-a.ctx.Done() -} - -// NewEventACKHandler returns a beat ACKer that can receive callbacks when -// an event has been ACKed an output. If the event contains a private metadata -// pointing to an eventACKTracker then it will invoke the trackers ACK() method -// to decrement the number of pending ACKs. -func NewEventACKHandler() beat.EventListener { - return acker.ConnectionOnly( - acker.EventPrivateReporter(func(_ int, privates []interface{}) { - for _, private := range privates { - if ack, ok := private.(*EventACKTracker); ok { - ack.ACK() - } - } - }), - ) -} diff --git a/x-pack/libbeat/common/aws/acker_test.go b/x-pack/libbeat/common/aws/acker_test.go deleted file mode 100644 index 3c470f0b922b..000000000000 --- a/x-pack/libbeat/common/aws/acker_test.go +++ /dev/null @@ -1,69 +0,0 @@ -// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one -// or more contributor license agreements. Licensed under the Elastic License; -// you may not use this file except in compliance with the Elastic License. - -package aws - -import ( - "context" - "testing" - - "github.com/stretchr/testify/assert" - - "github.com/elastic/beats/v7/libbeat/beat" -) - -func TestEventACKTracker(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - t.Cleanup(cancel) - - acker := NewEventACKTracker(ctx) - acker.Add() - acker.ACK() - - assert.EqualValues(t, 0, acker.PendingACKs) - assert.ErrorIs(t, acker.ctx.Err(), context.Canceled) -} - -func TestEventACKTrackerNoACKs(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - t.Cleanup(cancel) - - acker := NewEventACKTracker(ctx) - acker.Wait() - - assert.EqualValues(t, 0, acker.PendingACKs) - assert.ErrorIs(t, acker.ctx.Err(), context.Canceled) -} - -func TestEventACKHandler(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - t.Cleanup(cancel) - - // Create acker. Add one pending ACK. - acker := NewEventACKTracker(ctx) - acker.Add() - - // Create an ACK handler and simulate one ACKed event. - ackHandler := NewEventACKHandler() - ackHandler.AddEvent(beat.Event{Private: acker}, true) - ackHandler.ACKEvents(1) - - assert.EqualValues(t, 0, acker.PendingACKs) - assert.ErrorIs(t, acker.ctx.Err(), context.Canceled) -} - -func TestEventACKHandlerWait(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - t.Cleanup(cancel) - - // Create acker. Add one pending ACK. - acker := NewEventACKTracker(ctx) - acker.Add() - acker.ACK() - acker.Wait() - acker.Add() - - assert.EqualValues(t, 1, acker.PendingACKs) - assert.ErrorIs(t, acker.ctx.Err(), context.Canceled) -}