From d23b4d33622137f7245b628d77d15f8ecfef3a0d Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Mon, 4 Mar 2024 12:51:37 -0500 Subject: [PATCH] Memory queue: cancel in-progress writes on queue closed, not producer closed (#38094) Fixes a race condition that could lead to incorrect event totals and occasional panics #37702. Once a producer sends a get request to the memory queue, it must wait on the response unless the queue itself is closed, otherwise it can return a false failure. The previous code mistakenly waited on the done signal for the current producer rather than the queue. This PR adds the queue's done signal to the producer struct, and waits on that once the insert request is sent. --- CHANGELOG.next.asciidoc | 1 + libbeat/publisher/queue/memqueue/produce.go | 39 ++++-- .../publisher/queue/memqueue/queue_test.go | 121 +++++++++++++++--- 3 files changed, 131 insertions(+), 30 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 7731d291ba4..77e78868b6a 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -96,6 +96,7 @@ fields added to events containing the Beats version. {pull}37553[37553] - Update github.com/lestrrat-go/jwx dependency. {pull}37799[37799] - [threatintel] MISP pagination fixes {pull}37898[37898] - Fix file handle leak when handling errors in filestream {pull}37973[37973] +- Fix a race condition that could crash Filebeat with a "negative WaitGroup counter" error {pull}38094[38094] - Prevent HTTPJSON holding response bodies between executions. {issue}35219[35219] {pull}38116[38116] - Fix "failed processing S3 event for object key" error on aws-s3 input when key contains the "+" character {issue}38012[38012] {pull}38125[38125] diff --git a/libbeat/publisher/queue/memqueue/produce.go b/libbeat/publisher/queue/memqueue/produce.go index 954ea055f4a..5ddea468e4c 100644 --- a/libbeat/publisher/queue/memqueue/produce.go +++ b/libbeat/publisher/queue/memqueue/produce.go @@ -36,9 +36,10 @@ type ackProducer struct { } type openState struct { - log *logp.Logger - done chan struct{} - events chan pushRequest + log *logp.Logger + done chan struct{} + queueDone <-chan struct{} + events chan pushRequest } // producerID stores the order of events within a single producer, so multiple @@ -58,9 +59,10 @@ type ackHandler func(count int) func newProducer(b *broker, cb ackHandler, dropCB func(interface{}), dropOnCancel bool) queue.Producer { openState := openState{ - log: b.logger, - done: make(chan struct{}), - events: b.pushChan, + log: b.logger, + done: make(chan struct{}), + queueDone: b.ctx.Done(), + events: b.pushChan, } if cb != nil { @@ -143,27 +145,40 @@ func (st *openState) Close() { func (st *openState) publish(req pushRequest) (queue.EntryID, bool) { select { case st.events <- req: - // If the output is blocked and the queue is full, `req` is written - // to `st.events`, however the queue never writes back to `req.resp`, - // which effectively blocks for ever. So we also need to select on the - // done channel to ensure we don't miss the shutdown signal. + // The events channel is buffered, which means we may successfully + // write to it even if the queue is shutting down. To avoid blocking + // forever during shutdown, we also have to wait on the queue's + // shutdown channel. select { case resp := <-req.resp: return resp, true - case <-st.done: + case <-st.queueDone: st.events = nil return 0, false } case <-st.done: st.events = nil return 0, false + case <-st.queueDone: + st.events = nil + return 0, false } } func (st *openState) tryPublish(req pushRequest) (queue.EntryID, bool) { select { case st.events <- req: - return <-req.resp, true + // The events channel is buffered, which means we may successfully + // write to it even if the queue is shutting down. To avoid blocking + // forever during shutdown, we also have to wait on the queue's + // shutdown channel. + select { + case resp := <-req.resp: + return resp, true + case <-st.queueDone: + st.events = nil + return 0, false + } case <-st.done: st.events = nil return 0, false diff --git a/libbeat/publisher/queue/memqueue/queue_test.go b/libbeat/publisher/queue/memqueue/queue_test.go index 141514483f3..53f8da4b77c 100644 --- a/libbeat/publisher/queue/memqueue/queue_test.go +++ b/libbeat/publisher/queue/memqueue/queue_test.go @@ -27,8 +27,7 @@ import ( "testing" "time" - "gotest.tools/assert" - + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/elastic/beats/v7/libbeat/publisher/queue" @@ -77,17 +76,17 @@ func TestProduceConsumer(t *testing.T) { t.Run("flush", testWith(makeTestQueue(bufferSize, batchSize/2, 100*time.Millisecond))) } -// TestProducerDoesNotBlockWhenCancelled ensures the producer Publish -// does not block indefinitely. +// TestProducerDoesNotBlockWhenQueueClosed ensures the producer Publish +// does not block indefinitely during queue shutdown. // -// Once we get a producer `p` from the queue we want to ensure +// Once we get a producer `p` from the queue `q` we want to ensure // that if p.Publish is called and blocks it will unblock once -// p.Cancel is called. +// `q.Close` is called. // // For this test we start a queue with size 2 and try to add more -// than 2 events to it, p.Publish will block, once we call p.Cancel, +// than 2 events to it, p.Publish will block, once we call q.Close, // we ensure the 3rd event was not successfully published. -func TestProducerDoesNotBlockWhenCancelled(t *testing.T) { +func TestProducerDoesNotBlockWhenQueueClosed(t *testing.T) { q := NewQueue(nil, nil, Settings{ Events: 2, // Queue size @@ -138,8 +137,12 @@ func TestProducerDoesNotBlockWhenCancelled(t *testing.T) { time.Millisecond, "the first two events were not successfully published") - // Cancel the producer, this should unblock its Publish method - p.Cancel() + // Close the queue, this should unblock the pending Publish call. + // It's not enough to just cancel the producer: once the producer + // has successfully sent a request to the queue, it must wait for + // the response unless the queue shuts down, otherwise the pipeline + // event totals will be wrong. + q.Close() require.Eventually( t, @@ -149,6 +152,88 @@ func TestProducerDoesNotBlockWhenCancelled(t *testing.T) { "test not flagged as successful, p.Publish likely blocked indefinitely") } +func TestProducerClosePreservesEventCount(t *testing.T) { + // Check for https://github.com/elastic/beats/issues/37702, a problem + // where canceling a producer while it was waiting on a response + // to an insert request could lead to inaccurate event totals. + + var activeEvents atomic.Int64 + + q := NewQueue(nil, nil, + Settings{ + Events: 3, // Queue size + MaxGetRequest: 2, + FlushTimeout: 10 * time.Millisecond, + }, 1) + + p := q.Producer(queue.ProducerConfig{ + ACK: func(count int) { + activeEvents.Add(-int64(count)) + }, + OnDrop: func(e interface{}) { + //activeEvents.Add(-1) + }, + DropOnCancel: false, + }) + + // Asynchronously, send 4 events to the queue. + // Three will be enqueued, and one will be buffered, + // until we start reading from the queue. + // This needs to run in a goroutine because the buffered + // event will block until the queue handles it. + var wgProducer sync.WaitGroup + wgProducer.Add(1) + go func() { + for i := 0; i < 4; i++ { + event := i + // For proper navigation of the race conditions inherent to this + // test: increment active events before the publish attempt, then + // decrement afterwards if it failed (otherwise the event count + // could become negative even under correct queue operation). + activeEvents.Add(1) + _, ok := p.Publish(event) + if !ok { + activeEvents.Add(-1) + } + } + wgProducer.Done() + }() + + // This sleep is regrettable, but there's no deterministic way to know when + // the producer code has buffered an event in the queue's channel. + // However, the test is written to produce false negatives only: + // - If this test fails, it _always_ indicates a bug. + // - If there is a bug, this test will _often_ fail. + time.Sleep(20 * time.Millisecond) + + // Cancel the producer, then read and acknowledge two batches. If the + // Publish calls and the queue code are working, activeEvents should + // _usually_ end up as 0, but _always_ end up non-negative. + p.Cancel() + + // The queue reads also need to be done in a goroutine, in case the + // producer cancellation signal went through before the Publish + // requests -- if only 2 events entered the queue, then the second + // Get call will block until the queue itself is cancelled. + go func() { + for i := 0; i < 2; i++ { + batch, err := q.Get(2) + // Only error to worry about is queue closing, which isn't + // a test failure. + if err == nil { + batch.Done() + } + } + }() + + // One last sleep to let things percolate, then we close the queue + // to unblock any helpers and verify that the final active event + // count isn't negative. + time.Sleep(10 * time.Millisecond) + q.Close() + assert.False(t, activeEvents.Load() < 0, "active event count should never be negative") +} + func TestQueueMetricsDirect(t *testing.T) { eventsToTest := 5 maxEvents := 10 @@ -190,7 +275,7 @@ func queueTestWithSettings(t *testing.T, settings Settings, eventsToTest int, te // Read events, don't yet ack them batch, err := testQueue.Get(eventsToTest) - assert.NilError(t, err, "error in Get") + assert.NoError(t, err, "error in Get") t.Logf("Got batch of %d events", batch.Count()) queueMetricsAreValid(t, testQueue, 5, settings.Events, 5, fmt.Sprintf("%s - Producer Getting events, no ACK", testName)) @@ -206,7 +291,7 @@ func queueMetricsAreValid(t *testing.T, q queue.Queue, evtCount, evtLimit, occup // wait briefly to avoid races across all the queue channels time.Sleep(time.Millisecond * 100) testMetrics, err := q.Metrics() - assert.NilError(t, err, "error calling metrics for test %s", test) + assert.NoError(t, err, "error calling metrics for test %s", test) assert.Equal(t, testMetrics.EventCount.ValueOr(0), uint64(evtCount), "incorrect EventCount for %s", test) assert.Equal(t, testMetrics.EventLimit.ValueOr(0), uint64(evtLimit), "incorrect EventLimit for %s", test) assert.Equal(t, testMetrics.UnackedConsumedEvents.ValueOr(0), uint64(occupied), "incorrect OccupiedRead for %s", test) @@ -266,18 +351,18 @@ func TestEntryIDs(t *testing.T) { for i := 0; i < entryCount; i++ { batch, err := q.Get(1) - assert.NilError(t, err, "Queue read should succeed") + assert.NoError(t, err, "Queue read should succeed") assert.Equal(t, batch.Count(), 1, "Returned batch should have 1 entry") metrics, err := q.Metrics() - assert.NilError(t, err, "Queue metrics call should succeed") + assert.NoError(t, err, "Queue metrics call should succeed") assert.Equal(t, metrics.OldestEntryID, queue.EntryID(i), fmt.Sprintf("Oldest entry ID before ACKing event %v should be %v", i, i)) batch.Done() waiter.waitForEvents(1) metrics, err = q.Metrics() - assert.NilError(t, err, "Queue metrics call should succeed") + assert.NoError(t, err, "Queue metrics call should succeed") assert.Equal(t, metrics.OldestEntryID, queue.EntryID(i+1), fmt.Sprintf("Oldest entry ID after ACKing event %v should be %v", i, i+1)) @@ -297,7 +382,7 @@ func TestEntryIDs(t *testing.T) { for i := 0; i < entryCount; i++ { batch, err := q.Get(1) - assert.NilError(t, err, "Queue read should succeed") + assert.NoError(t, err, "Queue read should succeed") assert.Equal(t, batch.Count(), 1, "Returned batch should have 1 entry") batches = append(batches, batch) } @@ -318,7 +403,7 @@ func TestEntryIDs(t *testing.T) { // the slight nondeterminism. time.Sleep(1 * time.Millisecond) metrics, err := q.Metrics() - assert.NilError(t, err, "Queue metrics call should succeed") + assert.NoError(t, err, "Queue metrics call should succeed") assert.Equal(t, metrics.OldestEntryID, queue.EntryID(0), fmt.Sprintf("Oldest entry ID after ACKing event %v should be 0", i)) } @@ -326,7 +411,7 @@ func TestEntryIDs(t *testing.T) { batches[0].Done() waiter.waitForEvents(100) metrics, err := q.Metrics() - assert.NilError(t, err, "Queue metrics call should succeed") + assert.NoError(t, err, "Queue metrics call should succeed") assert.Equal(t, metrics.OldestEntryID, queue.EntryID(100), fmt.Sprintf("Oldest entry ID after ACKing event 0 should be %v", queue.EntryID(entryCount)))